An asynchronous REST service with CXF and the continuations API

Asynchronous calls over HTTP seems to be a common problem in distributed applications. In my case, I met a situation where the client wanted to be notified by some messages received in the back end. I also met a problem where I had to run a very long transaction upon an HTTP call…

What to do in such cases? Fortunately, server push several solutions exist, but looking at the legacy stack I had (Spring and Apache CXF), it appeared the use of the Continuation API was the appropriate choice.

I found few mention of this API on the Web – except this blog post – and above all it lacks concrete examples. So this is a good reason to have a post in my blog with an example.

What is the Continuations API?

The continuations API enables the server to suspend the request thread and resume it later (e.g. when the long-running process is over, or when an event occurs).

In other words, this is a way to implement long-polling, with a scalability advantage because suspended request threads are back to the HTTP request handling thread pool, then being able to serve other client requests.

Continuations API seems to be an idea introduced by the Jetty web server. And it somehow seems to overlap with the latest Servlet 3.0 Async capabilities.

CXF reuses the idea of continuation and make it protocol agnostic (although HTTP is the prime interest to my mind). Let’s use this API in an example.

An example

The code of this example is available in my blog samples repository on Github – see the project continuation-sample.

The REST service class

The REST service logic is implemented in the MyRestService class.

@Service("myRestService")
@Path("/my_rest_service")
public class MyRestService {

	@Context
	private MessageContext context;

	@Autowired
	private TaskExecutor taskExecutor;

	@GET
	@Path("/say_hello/{arg}")
	@Produces(MediaType.TEXT_PLAIN)
	public String sayHello(@PathParam("arg") final String arg) throws Exception {
		// get the Continuation API object
		ContinuationProvider continuationProvider = (ContinuationProvider) context.get(ContinuationProvider.class.getName());
		final Continuation continuation = continuationProvider.getContinuation();
		synchronized (continuation) {
			if(continuation.isNew()) {
				// it means this is a new request
				// we execute a task asynchronously
				FutureTask futureResponse = new FutureTask(new Callable() {

					@Override
					public String call() throws Exception {
						// execute the asynchronous job here
						Thread.sleep(5000);
						// resume the request
						continuation.resume();
						return "Hello "+arg;
					}
				});
				taskExecutor.execute(futureResponse);
				// we store the future task in the continuation object for future retrieval
				continuation.setObject(futureResponse);
				// and we suspend the request
				continuation.suspend(6000);
				// this will not be returned to the client
				return null;
			} else {
				// it means the request has been resumed or that a timeout occurred
				FutureTask futureTask = (FutureTask) continuation.getObject();
				return futureTask.get();
			}
		}
	}
}

We retrieve the continuations API from the request MessageContext object. This context object is injected by CXF at line 5-6 with the @Context annotation. In the context of REST services, we retrieve a org.apache.cxf.jaxrs.ext.MessageContext object. If we were in a Web service context (over SOAP), we would have a javax.xml.ws.WebServiceContext.

The continuations API is implemented in the Continuation object which is obtained at line 16-17-18. This is the object which will enable us to suspend/resume the request thread.

Here we are! See the continuation.suspend() and continuation.resume() in the code. The suspend() operation will cause the request to be on hold. The execution thread will however continue the code execution but the returned response won’t be returned to the client. We can pass an optional long value to the suspend method, this is a timeout which will resume the request if the resume() operation is not cqlled in the meantime.

On the other hand, the resume() method will cause the reexecution of the sayHello() service. Hence the c.isNew() check, which is false if the request has just been resumed. This also justify the synchronized block to prevent potential incoherence.

The Continuation object has also other useful methods: have a look at the CXF Javadoc. In the example we for example use the getObject() and setObject() methods to convey a FutureTask after a request resume.

A word about the Spring task executor

In the example, we simulate a long-running transaction with a simple Thread.sleep() in a separate Thread.

This separate thread is executed with a Spring ThreadExecutor autowired on line 9.

You can find its declaration in the Spring context declaration file (context.xml). There is also the declaration of the CXF REST server, but it is fairly conventional.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jaxrs="http://cxf.apache.org/jaxrs"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
       http://cxf.apache.org/jaxrs http://cxf.apache.org/schemas/jaxrs.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
       ">
 
    <import resource="classpath:META-INF/cxf/cxf.xml" />
    <import resource="classpath:META-INF/cxf/cxf-servlet.xml" />
    
    <!-- CXF Rest service setup -->
    <context:component-scan base-package="com.clempinch.sample.continuation" />
     <jaxrs:server id="restContainer" address="/">
        <jaxrs:serviceBeans>
            <ref bean="myRestService" />
        </jaxrs:serviceBeans>
    </jaxrs:server>
     
    <!-- Task executor service  -->
    <task:executor id="myExecutorService" pool-size="5-10" queue-capacity="10" />
 </beans>

The Spring thread executor service is useful to implement asynchronous calls and enable fine configuration of pools, execution queues… We could also imagine that you implement callbacks in JMS listener or with asynchronous EJBs.

The final configuration touch

Finally, to make the continuations API effective, you should configure the CXF servlet to support asynchronous calls. This is done in the web.xml by adding the async-supported tag set to true.

<?xml version="1.0" encoding="UTF-8"?>
<web-app version="3.0"
    xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3.0.xsd">
     
    <context-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>WEB-INF/spring/context.xml</param-value>
    </context-param>
    <listener>
        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
    </listener>
 
    <servlet>
        <servlet-name>CXFServlet</servlet-name>
        <servlet-class>org.apache.cxf.transport.servlet.CXFServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
        <async-supported>true</async-supported>
    </servlet>
    <servlet-mapping>
        <servlet-name>CXFServlet</servlet-name>
        <url-pattern>/*</url-pattern>
    </servlet-mapping>
</web-app>

You should also note that this feature has been added with the Servlet 3.0 version (hence the 3.0 namespace declaration). If you do not specify the async-supported tag, the ContinuationProvider will not be found in the context and this will result in a NullPointerException at line 16 of the REST Service.

If you have servlet filters, be sure also that they set the async-supported to true. This is not the case in this example, but if you have security or logging filter the HTTP requests comes through, they should all set async-supported to true.

Deployment

The example can be built with Maven:

mvn package

Deploy the resulting WAR to any Web application server. I tried with Tomcat 7 and it worked.

Go to: http://[server_address]/continuation_example-0.0.1-SNAPSHOT/my_rest_service/say_hello/world

You will see that the response will takes 5 seconds to be issued.