Asynchronous calls over HTTP seem to be a common problem in distributed applications. In my case, I met a situation where the client wanted to be notified of messages received in the back end. I also met a problem where I had to run a very long transaction upon an HTTP call…
What to do in such cases? Fortunately, several server-push solutions exist, but given the legacy stack I had (Spring and Apache CXF), the CXF Continuations API appeared to be the appropriate choice.
I found few mentions of this API on the Web (except this blog post), and above all it lacks concrete examples. That is a good reason to write a post with an example on my blog.
What is the Continuations API?
The continuations API enables the server to suspend the processing of a request and resume it later (e.g. when the long-running process is over, or when an event occurs).
In other words, this is a way to implement long polling, with a scalability advantage: while a request is suspended, its thread goes back to the HTTP request-handling thread pool and can serve other client requests.
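To illustrate why this scales, here is a minimal sketch using only the JDK. It is not the CXF Continuations API: the class `ParkedRequestsSketch` and its methods are hypothetical names of mine. The idea is simply that a handler which records a pending response and returns at once lets a small pool accept many more requests than it has threads.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, JDK only: this is not the CXF Continuations API.
final class ParkedRequestsSketch {

    /** Accepts `requests` calls on `threads` request threads and parks each pending response. */
    static List<CompletableFuture<String>> park(int requests, int threads) {
        ExecutorService requestPool = Executors.newFixedThreadPool(threads);
        List<CompletableFuture<String>> parked = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < requests; i++) {
            // The handler only records a pending response and returns at once,
            // so its thread is immediately free for the next request.
            requestPool.execute(() -> parked.add(new CompletableFuture<>()));
        }
        requestPool.shutdown();
        try {
            if (!requestPool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("handlers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return parked;
    }

    /** Simulates the awaited back-end event: every parked response is completed. */
    static List<String> completeAll(List<CompletableFuture<String>> parked) {
        List<String> responses = new ArrayList<>();
        for (int i = 0; i < parked.size(); i++) {
            parked.get(i).complete("Hello " + i);
            responses.add(parked.get(i).join());
        }
        return responses;
    }
}
```

Calling `ParkedRequestsSketch.park(10, 2)` leaves ten responses pending although only two request threads exist. With a handler that blocked until the event arrived, the same two threads could hold at most two requests at a time.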
The Continuations API seems to be an idea introduced by the Jetty web server, and it somewhat overlaps with the asynchronous capabilities of Servlet 3.0.
CXF reuses the idea of continuations and makes it protocol-agnostic (although HTTP is the prime interest to my mind). Let’s use this API in an example.
An example
The code of this example is available in my blog samples repository on GitHub: see the project continuation-sample.
The REST service class
The REST service logic is implemented in the MyRestService class.
@Service("myRestService")
@Path("/my_rest_service")
public class MyRestService {

    @Context
    private MessageContext context;

    @Autowired
    private TaskExecutor taskExecutor;

    @GET
    @Path("/say_hello/{arg}")
    @Produces(MediaType.TEXT_PLAIN)
    public String sayHello(@PathParam("arg") final String arg) throws Exception {
        // get the Continuation API object
        ContinuationProvider continuationProvider =
                (ContinuationProvider) context.get(ContinuationProvider.class.getName());
        final Continuation continuation = continuationProvider.getContinuation();
        synchronized (continuation) {
            if (continuation.isNew()) {
                // it means this is a new request
                // we execute a task asynchronously
                FutureTask<String> futureResponse = new FutureTask<String>(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        // execute the asynchronous job here
                        Thread.sleep(5000);
                        // resume the request
                        continuation.resume();
                        return "Hello " + arg;
                    }
                });
                taskExecutor.execute(futureResponse);
                // we store the future task in the continuation object for future retrieval
                continuation.setObject(futureResponse);
                // and we suspend the request
                continuation.suspend(6000);
                // this will not be returned to the client
                return null;
            } else {
                // it means the request has been resumed or that a timeout occurred
                @SuppressWarnings("unchecked")
                FutureTask<String> futureTask = (FutureTask<String>) continuation.getObject();
                // get() blocks until the asynchronous job has produced its result
                return futureTask.get();
            }
        }
    }
}
We retrieve the continuations API from the request MessageContext object. This context object is injected by CXF at lines 5-6 with the @Context annotation. In the context of REST services, we retrieve an org.apache.cxf.jaxrs.ext.MessageContext object. If we were in a Web service context (over SOAP), we would have a javax.xml.ws.WebServiceContext.
The continuations API is exposed through the Continuation object, which is obtained at lines 16-18. This is the object which enables us to suspend and resume the request.
Here we are! See continuation.suspend() and continuation.resume() in the code. The suspend() operation puts the request on hold. The calling thread nevertheless continues executing the code, but the value it returns is not sent to the client. The long value passed to suspend() is a timeout (6000 milliseconds in the example) after which the request is resumed if resume() has not been called in the meantime.
On the other hand, the resume() method causes sayHello() to be executed again. Hence the continuation.isNew() check, which returns false when the request has just been resumed. This also justifies the synchronized block, which prevents potential inconsistencies.
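The "handler runs twice" model can be surprising, so here is a toy sketch of it using only the JDK. `ToyContinuation` borrows the method names used above but is a hypothetical stand-in of mine, not the CXF class, and its `dispatch` method only imitates what a container does (a real one would release the thread instead of waiting).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Toy stand-in that borrows the CXF method names; it is NOT the CXF class.
final class ToyContinuation {
    private final CountDownLatch resumed = new CountDownLatch(1);
    private volatile boolean fresh = true;
    private volatile boolean suspended;
    private volatile long timeoutMillis;
    private volatile Object attachment;

    boolean isNew() { return fresh; }
    void setObject(Object o) { attachment = o; }
    Object getObject() { return attachment; }
    void suspend(long timeout) { timeoutMillis = timeout; suspended = true; }
    void resume() { resumed.countDown(); }

    /** Toy dispatcher: runs the handler, and runs it again if it suspended. */
    static String dispatch(Function<ToyContinuation, String> handler) {
        ToyContinuation c = new ToyContinuation();
        String first = handler.apply(c);
        if (!c.suspended) {
            return first; // answered in a single pass
        }
        // The value returned by the first pass is discarded.
        // A real container would free the thread here; the toy simply waits.
        try {
            c.resumed.await(c.timeoutMillis, TimeUnit.MILLISECONDS); // resume() or timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        c.fresh = false;
        return handler.apply(c); // second pass: isNew() is now false
    }
}

final class ToyContinuationDemo {
    static int invocations = 0;

    static String sayHello(ToyContinuation c, String arg) {
        invocations++;
        if (c.isNew()) {
            CompletableFuture<String> job = new CompletableFuture<>();
            new Thread(() -> {
                job.complete("Hello " + arg); // stands for the long-running work
                c.resume();
            }).start();
            c.setObject(job);
            c.suspend(1000);
            return null; // never reaches the client
        }
        // Resumed (or timed out): fetch the result stored during the first pass.
        return (String) ((CompletableFuture<?>) c.getObject()).join();
    }
}
```

Running `ToyContinuation.dispatch(c -> ToyContinuationDemo.sayHello(c, "world"))` invokes `sayHello` twice: the first pass returns a null that is thrown away, and the second pass returns the stored result.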
The Continuation object also has other useful methods: have a look at the CXF Javadoc. In the example, for instance, we use the getObject() and setObject() methods to carry a FutureTask over to the resumed request.
A word about the Spring task executor
In the example, we simulate a long-running transaction with a simple Thread.sleep() in a separate thread.
This separate thread is run by a Spring TaskExecutor, autowired on line 9.
You can find its declaration in the Spring context file (context.xml). That file also declares the CXF REST server, but that part is fairly conventional.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jaxrs="http://cxf.apache.org/jaxrs"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
           http://cxf.apache.org/jaxrs http://cxf.apache.org/schemas/jaxrs.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <import resource="classpath:META-INF/cxf/cxf.xml" />
    <import resource="classpath:META-INF/cxf/cxf-servlet.xml" />

    <!-- CXF Rest service setup -->
    <context:component-scan base-package="com.clempinch.sample.continuation" />

    <jaxrs:server id="restContainer" address="/">
        <jaxrs:serviceBeans>
            <ref bean="myRestService" />
        </jaxrs:serviceBeans>
    </jaxrs:server>

    <!-- Task executor service -->
    <task:executor id="myExecutorService" pool-size="5-10" queue-capacity="10" />

</beans>
The Spring task executor is useful for implementing asynchronous calls and allows fine-grained configuration of thread pools, execution queues… One could also imagine implementing the callbacks in a JMS listener or with asynchronous EJBs.
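To get a feel for what pool-size="5-10" and queue-capacity="10" mean in practice, here is a rough JDK-only analogue. It is a sketch under my own assumptions (5 core threads, 10 maximum threads, a bounded queue of 10, a 60-second keep-alive, and the JDK's default rejection behaviour); the class name `ExecutorSketch` is hypothetical and this is not the Spring bean itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Rough JDK analogue of pool-size="5-10" queue-capacity="10" (assumed mapping).
final class ExecutorSketch {

    static ThreadPoolExecutor newExecutor() {
        return new ThreadPoolExecutor(
                5,                              // core threads
                10,                             // maximum threads
                60L, TimeUnit.SECONDS,          // idle time before extra threads die
                new LinkedBlockingQueue<>(10)); // bounded waiting queue
    }

    /** Returns {pool size, queued tasks, 1 if a 21st task was rejected else 0}. */
    static int[] saturate() {
        ThreadPoolExecutor executor = newExecutor();
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            // Tasks 1-5 start core threads, 6-15 fill the queue, 16-20 start extra threads.
            for (int i = 0; i < 20; i++) {
                executor.execute(blocked);
            }
            int rejected = 0;
            try {
                executor.execute(blocked); // task 21: queue full, pool at maximum
            } catch (RejectedExecutionException e) {
                rejected = 1;
            }
            return new int[] { executor.getPoolSize(), executor.getQueue().size(), rejected };
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }
}
```

The point worth remembering is the order: extra threads beyond the core size are only created once the queue is full, so with these settings at most 20 long-running jobs can be in flight or waiting before new ones are refused.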
The final configuration touch
Finally, to make the continuations API effective, you should configure the CXF servlet to support asynchronous calls. This is done in web.xml by adding the async-supported element set to true.
<?xml version="1.0" encoding="UTF-8"?>
<web-app version="3.0"
         xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3.0.xsd">

    <context-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>WEB-INF/spring/context.xml</param-value>
    </context-param>

    <listener>
        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
    </listener>

    <servlet>
        <servlet-name>CXFServlet</servlet-name>
        <servlet-class>org.apache.cxf.transport.servlet.CXFServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
        <async-supported>true</async-supported>
    </servlet>

    <servlet-mapping>
        <servlet-name>CXFServlet</servlet-name>
        <url-pattern>/*</url-pattern>
    </servlet-mapping>

</web-app>
You should also note that this feature was added in Servlet 3.0 (hence the 3.0 namespace declaration). If you do not specify the async-supported element, the ContinuationProvider will not be found in the context, which results in a NullPointerException when the REST service retrieves the continuation (lines 16-18).
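A bare NullPointerException is not a very helpful hint that the configuration is wrong, so one option is to fail fast with an explicit message. The sketch below is a hypothetical helper of mine (`ContextLookup.require`), in which a plain Map stands in for the CXF MessageContext so that it stays self-contained.

```java
import java.util.Map;

// Hypothetical helper; a plain Map stands in for the CXF MessageContext.
final class ContextLookup {

    /** Looks up an object stored under its class name, failing with a clear message if absent. */
    static <T> T require(Map<String, Object> context, Class<T> type) {
        Object value = context.get(type.getName());
        if (value == null) {
            throw new IllegalStateException("No " + type.getSimpleName()
                    + " in the context: is async-supported set to true"
                    + " on the servlet and on every filter?");
        }
        return type.cast(value);
    }
}
```

In the REST service, the same null check could be applied to the result of context.get(ContinuationProvider.class.getName()) before calling getContinuation().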
If you have servlet filters, make sure they also set async-supported to true. There are none in this example, but if HTTP requests pass through security or logging filters, each of them must set async-supported to true.
Deployment
The example can be built with Maven:
mvn package
Deploy the resulting WAR to any Web application server that supports Servlet 3.0. I tried with Tomcat 7 and it worked.
Go to: http://[server_address]/continuation_example-0.0.1-SNAPSHOT/my_rest_service/say_hello/world
You will see that the response takes 5 seconds to arrive.