
An asynchronous REST service with CXF and the continuations API

Asynchronous calls over HTTP seem to be a common problem in distributed applications. In my case, I had a client that wanted to be notified of messages received in the back end. I also had to run a very long transaction in response to an HTTP call…

What to do in such cases? Fortunately, several server-push solutions exist, but given the legacy stack I had (Spring and Apache CXF), the Continuations API appeared to be the appropriate choice.

I found few mentions of this API on the Web – except this blog post – and above all it lacks concrete examples. That is a good reason to write a post with an example.

What is the Continuations API?

The continuations API enables the server to suspend a request and resume it later (e.g. when the long-running process is over, or when an event occurs).

In other words, it is a way to implement long polling with a scalability advantage: while a request is suspended, its thread goes back to the HTTP request-handling thread pool and can serve other client requests.

The Continuations API seems to be an idea introduced by the Jetty web server, and it overlaps somewhat with the asynchronous capabilities of Servlet 3.0.

CXF reuses the idea of continuations and makes it protocol-agnostic (although HTTP is the prime interest to my mind). Let’s use this API in an example.

An example

The code of this example is available in my blog samples repository on GitHub – see the project continuation-sample.

The REST service class

The REST service logic is implemented in the MyRestService class.

@Service("myRestService")
@Path("/my_rest_service")
public class MyRestService {

	@Context
	private MessageContext context;

	@Autowired
	private TaskExecutor taskExecutor;

	@GET
	@Path("/say_hello/{arg}")
	@Produces(MediaType.TEXT_PLAIN)
	public String sayHello(@PathParam("arg") final String arg) throws Exception {
		// get the Continuation API object
		ContinuationProvider continuationProvider = (ContinuationProvider) context.get(ContinuationProvider.class.getName());
		final Continuation continuation = continuationProvider.getContinuation();
		synchronized (continuation) {
			if(continuation.isNew()) {
				// it means this is a new request
				// we execute a task asynchronously
				FutureTask<String> futureResponse = new FutureTask<String>(new Callable<String>() {

					@Override
					public String call() throws Exception {
						// execute the asynchronous job here
						Thread.sleep(5000);
						// resume the request
						continuation.resume();
						return "Hello "+arg;
					}
				});
				taskExecutor.execute(futureResponse);
				// we store the future task in the continuation object for future retrieval
				continuation.setObject(futureResponse);
				// and we suspend the request
				continuation.suspend(6000);
				// this will not be returned to the client
				return null;
			} else {
				// it means the request has been resumed or that a timeout occurred
				FutureTask<String> futureTask = (FutureTask<String>) continuation.getObject();
				return futureTask.get();
			}
		}
	}
}

We retrieve the continuations API from the request MessageContext object. This context object is injected by CXF into the context field (lines 5-6 of the listing) thanks to the @Context annotation. For REST services, we get an org.apache.cxf.jaxrs.ext.MessageContext object. In a Web service context (over SOAP), we would have a javax.xml.ws.WebServiceContext instead.

The continuations API is implemented in the Continuation object, which is obtained through the ContinuationProvider at lines 16-18 of the listing. This is the object that enables us to suspend and resume the request.

Here we are! See the calls to continuation.suspend() and continuation.resume() in the code. The suspend() operation puts the request on hold. The current thread nevertheless continues executing the method, but the value it returns is not sent to the client. We can pass a long value to the suspend method: this is a timeout after which the request is resumed if resume() has not been called in the meantime.

On the other hand, the resume() method causes sayHello() to be executed again. Hence the continuation.isNew() check, which is false once the request has been resumed. This also justifies the synchronized block, which prevents inconsistencies between the request thread and the task thread.

The Continuation object also has other useful methods: have a look at the CXF Javadoc. In the example, we use the getObject() and setObject() methods to carry the FutureTask from the first execution to the one that follows the resume.
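
To make the two-pass control flow easier to follow, here is a minimal sketch in plain Java. It does not use CXF at all: SketchContinuation and ReentrySketch are hypothetical stand-ins written for this illustration, and the "long" job is run inline instead of on a separate thread so that the result is deterministic.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical stand-in for a continuation. This is NOT the CXF class; it only
// mimics the operations used in the service above.
class SketchContinuation {
    private boolean fresh = true;
    private Object attachment;

    boolean isNew() { return fresh; }
    void setObject(Object o) { attachment = o; }
    Object getObject() { return attachment; }
    // A real container would also release the request thread and arm the timeout.
    void suspend(long timeout) { fresh = false; }
}

class ReentrySketch {
    // Same shape as sayHello(): the first pass creates the job and suspends,
    // the second pass picks up the result.
    @SuppressWarnings("unchecked")
    static String sayHello(SketchContinuation continuation, String arg) {
        synchronized (continuation) {
            if (continuation.isNew()) {
                FutureTask<String> job = new FutureTask<String>(() -> "Hello " + arg);
                continuation.setObject(job);
                continuation.suspend(6000);
                return null; // discarded, never sent to the client
            }
            FutureTask<String> job = (FutureTask<String>) continuation.getObject();
            try {
                return job.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    // Plays the role of the container: one call for the new request, then the job
    // completes (inline here), then the second call that resume() would trigger.
    static String dispatch(String arg) {
        SketchContinuation continuation = new SketchContinuation();
        sayHello(continuation, arg);
        ((FutureTask<?>) continuation.getObject()).run();
        return sayHello(continuation, arg);
    }

    public static void main(String[] args) {
        System.out.println(dispatch("world")); // prints "Hello world"
    }
}
```

The point of the sketch is only that the service method runs twice for a single client request, and that the object stored in the continuation is what links the two runs.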

A word about the Spring task executor

In the example, we simulate a long-running transaction with a simple Thread.sleep() in a separate thread.

This separate thread is run by a Spring TaskExecutor, autowired on line 9 of the listing.

You can find its declaration in the Spring context declaration file (context.xml). There is also the declaration of the CXF REST server, but it is fairly conventional.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jaxrs="http://cxf.apache.org/jaxrs"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
       http://cxf.apache.org/jaxrs http://cxf.apache.org/schemas/jaxrs.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
       ">
 
    <import resource="classpath:META-INF/cxf/cxf.xml" />
    <import resource="classpath:META-INF/cxf/cxf-servlet.xml" />
    
    <!-- CXF Rest service setup -->
    <context:component-scan base-package="com.clempinch.sample.continuation" />
     <jaxrs:server id="restContainer" address="/">
        <jaxrs:serviceBeans>
            <ref bean="myRestService" />
        </jaxrs:serviceBeans>
    </jaxrs:server>
     
    <!-- Task executor service  -->
    <task:executor id="myExecutorService" pool-size="5-10" queue-capacity="10" />
 </beans>

The Spring task executor is useful to implement asynchronous calls and allows fine-grained configuration of pools, execution queues… You could also imagine implementing the callbacks in a JMS listener or with asynchronous EJBs.
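
To see what pool-size="5-10" and queue-capacity="10" mean in practice, here is a rough plain-JDK sketch of a pool with the same sizing. It is not the Spring bean itself: ExecutorSketch is a hypothetical class, and the 60-second keep-alive is an assumption of this sketch. With such a configuration, extra threads beyond the first 5 are only created once the queue is full, and a task is rejected when the 10 threads are busy and the 10 queue slots are taken.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecutorSketch {
    // Plain-JDK pool sized like pool-size="5-10" queue-capacity="10".
    static ThreadPoolExecutor newExecutor() {
        return new ThreadPoolExecutor(
                5,                     // core pool size
                10,                    // maximum pool size
                60, TimeUnit.SECONDS,  // keep-alive for the extra threads (assumed value)
                new LinkedBlockingQueue<Runnable>(10));
    }

    // Submits blocking tasks and counts how many the pool accepts before rejecting.
    static int countAccepted(int submitted) {
        ThreadPoolExecutor executor = newExecutor();
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < submitted; i++) {
                try {
                    executor.execute(() -> {
                        try {
                            release.await(); // keep the thread busy until released
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    accepted++;
                } catch (RejectedExecutionException e) {
                    // all threads are busy and the queue is full
                }
            }
        } finally {
            release.countDown();
            executor.shutdown();
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(countAccepted(25)); // prints 20: 10 running + 10 queued
    }
}
```

For a service like the one above, this means the pool sizing bounds the number of long-running jobs that can be in flight or waiting, so it should be chosen with the expected number of concurrent suspended requests in mind.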

The final configuration touch

Finally, to make the continuations API effective, you should configure the CXF servlet to support asynchronous calls. This is done in the web.xml by adding the async-supported tag set to true.

<?xml version="1.0" encoding="UTF-8"?>
<web-app version="3.0"
    xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3.0.xsd">
     
    <context-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>WEB-INF/spring/context.xml</param-value>
    </context-param>
    <listener>
        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
    </listener>
 
    <servlet>
        <servlet-name>CXFServlet</servlet-name>
        <servlet-class>org.apache.cxf.transport.servlet.CXFServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
        <async-supported>true</async-supported>
    </servlet>
    <servlet-mapping>
        <servlet-name>CXFServlet</servlet-name>
        <url-pattern>/*</url-pattern>
    </servlet-mapping>
</web-app>

You should also note that this feature was added in Servlet 3.0 (hence the 3.0 namespace declaration). If you do not specify the async-supported tag, the ContinuationProvider will not be found in the context and this will result in a NullPointerException at line 16 of the REST service.

If you have servlet filters, be sure that they also set async-supported to true. This is not the case in this example, but if the HTTP requests go through security or logging filters, all of them should set async-supported to true.

Deployment

The example can be built with Maven:

mvn package

Deploy the resulting WAR to any Web application server. I tried with Tomcat 7 and it worked.

Go to: http://[server_address]/continuation_example-0.0.1-SNAPSHOT/my_rest_service/say_hello/world

You will see that the response takes 5 seconds to be issued.

Java source code generation and Maven

Imagine you are working on a project where you use some Java code generated by a tool.
For the moment, you generate the code and integrate it into the project by hand:

[Figure: Original Maven project structure]

How would you automate this code generation step in your Maven build?

1. Create a separate Maven project where you will package the result of the code generation.

We package the result of the code generation in a separate JAR. Therefore, we need a different project structure.

[Figure: New Maven project structure]

We create a parent project with two child Maven modules: one with the original code, the other where the generated code will be located.
The parent POM looks like this (the module directory names are assumed here to match the artifactIds of the two child projects):

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.clempinch</groupId>
	<artifactId>myproject</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>pom</packaging>

	<modules>
		<!-- The reactor orders modules by their dependencies,
		     so generated-code is built before sample -->
		<module>generated-code</module>
		<module>sample</module>
	</modules>

</project>

2. Automate the code generation in Maven.

If a Maven plugin is not already provided to generate the source code, then you can probably use the Exec Maven plugin to do the job.

Here is the pom.xml of the code generation project (we assume you generate the code via a Java application – hence the use of the exec:java goal).

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.clempinch</groupId>
    <artifactId>myproject</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>
  <groupId>com.clempinch</groupId>
  <artifactId>generated-code</artifactId>

  <dependencies>
    <!-- For generating source code -->
    <dependency>
      <groupId>com.clempinch</groupId>
      <artifactId>some-generation-jar</artifactId>
      <version>1.0.0</version>
      <scope>runtime</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <executions>
          <execution>
            <id>generateCode</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>java</goal>
            </goals>
            <configuration>
              <mainClass>com.clempinch.generation.MainClass</mainClass>
              <classpathScope>runtime</classpathScope>
              <sourceRoot>${project.build.directory}/generated-sources/main/java</sourceRoot>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <executions>
          <execution>
            <id>addtoSourceFolder</id>
            <goals>
              <goal>add-source</goal>
            </goals>
            <phase>process-sources</phase>
            <configuration>
              <sources>
                <source>${project.build.directory}/generated-sources/generation_place</source>
              </sources>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
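
The generator itself is outside the scope of this post. Purely as an illustration, a main class like the one run by exec:java could look like the sketch below. Everything in it is hypothetical: the class name GeneratorSketch, the generated class Generated, and the convention that the output directory is passed as the first program argument.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical generator: writes one Java source file under the given source root.
class GeneratorSketch {

    static Path generate(Path sourceRoot) {
        String source = "package com.clempinch.generated;\n\n"
                + "public class Generated {\n"
                + "    public static final String GREETING = \"hello\";\n"
                + "}\n";
        try {
            // The directory layout must mirror the package name.
            Path packageDir = sourceRoot.resolve("com/clempinch/generated");
            Files.createDirectories(packageDir);
            Path file = packageDir.resolve("Generated.java");
            Files.write(file, source.getBytes(StandardCharsets.UTF_8));
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Assumption of this sketch: the output directory is the first argument.
        Path root = Paths.get(args.length > 0 ? args[0] : "target/generated-sources/main/java");
        System.out.println("Generated " + generate(root));
    }
}
```

Whatever the real generator does, the important point is that the directory it writes to is the one declared as a source folder in the build, otherwise the generated classes will not be compiled into the JAR.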

3. Add the dependency to your initial project.

The pom.xml of the initial project must declare a dependency on the generated-code JAR package:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>	
	<parent>
		<groupId>com.clempinch</groupId>
		<artifactId>myproject</artifactId>
		<version>0.0.1-SNAPSHOT</version>
	</parent>
	<groupId>com.clempinch</groupId>
	<artifactId>sample</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<dependencies>
		<dependency>
			<groupId>com.clempinch</groupId>
			<artifactId>generated-code</artifactId>
			<version>0.0.1-SNAPSHOT</version>
		</dependency>
	</dependencies>
</project>

4. Build the project from the parent POM.

mvn clean install

Now everything should be done automatically: the code generation and the build of the project.

Monitor an Active MQ queue with Hermes JMS

Hermes JMS is a great tool to monitor the activity on your JMS broker. In particular, it is quite useful for seeing which messages transit through the queues and topics (you can also record them and replay them, which is pretty cool).

[Figure: HermesJMS]

However, when you monitor the messages transiting through a queue, Hermes JMS only takes a snapshot of what is pending, i.e. not yet delivered. That means that if a consumer is attached to this queue, you will barely see any message, since in most cases they are consumed as soon as they are sent… This is a problem when you want to see the flow of messages, record it and replay it.

Mirrored queues

I am using Apache ActiveMQ as a JMS Broker. My solution for this broker is to use mirrored queues.

This feature of ActiveMQ lets you route messages from one destination to another without any external intervention. So if we route messages from the queue we want to monitor to a topic, messages coming to the queue will be automatically sent to the topic. If we subscribe to this topic with Hermes JMS, we are able to see all the messages that have been sent to the original queue. Indeed, topics are easier to monitor with Hermes JMS: we see the continuous flow of messages as they arrive.

How is this done in ActiveMQ? It requires very little configuration in your activemq.xml.

First you need to configure the broker instance to enable virtual topics:

<broker brokerName="localhost" 
   useVirtualTopics="true" 
   dataDirectory="${activemq.data}" ... >

Then, if we assume the queue to monitor is called test.queue and you want its messages to be routed to a topic called test.queue.mirror, you have to create a destination interceptor:

<destinationInterceptors>
	<virtualDestinationInterceptor>
		<virtualDestinations>
			<compositeQueue name="test.queue" forwardOnly="false">
				<forwardTo>
					<topic physicalName="test.queue.mirror"/>
				</forwardTo>
			</compositeQueue>
		</virtualDestinations>
	</virtualDestinationInterceptor>		
</destinationInterceptors>

Be sure the queue and topic destinations exist as well:

<amq:destinations>
	<amq:queue physicalName="test.queue"></amq:queue>
	<amq:topic physicalName="test.queue.mirror"></amq:topic>
</amq:destinations>

Back to Hermes JMS

Here we are! Restart ActiveMQ and you will notice that when you send a message to test.queue, this message is also sent to test.queue.mirror.

Start Hermes JMS and connect to the topic: you see the flow of messages sent to the queue as they arrive.

[Figure: Hermes JMS with mirrored queue]

Transient and final instance variables in Java

In Java, transient and final are two important keywords, and it is legal to combine them as modifiers of an instance variable.

But how does such a variable behave when it comes to object serialization? Since it is transient, we can guess that it is not serialized and that, when the object is deserialized, it is initialized to its default value, as for a regular non-final instance variable.

But in fact, it depends! See the code below:

public class Diplodocus implements Serializable {

	private final transient String s1 = "test";
	private final transient String s2;
	private final transient String s3 = new String("hello");
	private final transient String s4 = s1 + s1 + 1;
	private final transient int i1 = 7;
	private final transient int i2 = 7 * 3;
	private final transient int i3 = Integer.MIN_VALUE;
	private final transient int[] a1 = {1,2,3};

	public Diplodocus() {
		s2 = "s2";
	}

	public static void main(String[] args) {

		File f = new File("diplo");
		try {
			FileOutputStream fos = new FileOutputStream(f);
			ObjectOutputStream oos = new ObjectOutputStream(fos);
			oos.writeObject(new Diplodocus());
			oos.flush();
			oos.close();

			FileInputStream fis = new FileInputStream(f);
			ObjectInputStream ois = new ObjectInputStream(fis);
			Diplodocus diplo = (Diplodocus) ois.readObject();
			ois.close();
			System.out.println("s1 = "+diplo.s1);
			System.out.println("s2 = "+diplo.s2);
			System.out.println("s3 = "+diplo.s3);
			System.out.println("s4 = "+diplo.s4);
			System.out.println("i1 = "+diplo.i1);
			System.out.println("i2 = "+diplo.i2);
			System.out.println("i3 = "+diplo.i3);
			System.out.println("a1 = "+diplo.a1);

		} catch (IOException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		}
	}
}

What we expect here is that, after deserialization, the instance variables will be equal to their default initialization values: null for objects like Strings and 0 for the integers.

But, in fact the code has the following output:

s1 = test
s2 = null
s3 = null
s4 = testtest1
i1 = 7
i2 = 21
i3 = -2147483648
a1 = null

What happened?

During deserialization, the constructor of the object is not called. This is a special object instantiation process handled by the JVM. Regular non-final transient instance variables are assigned their default values.

But what happens for final variables? There is a little trick.

If the final variable is blank, meaning that it is first initialized in the object constructor(s) (like s2 on line 13), deserialization leaves it at its default value. That is why s2 = null.

On the contrary, a final variable that is explicitly initialized keeps its value, but only if the initialization is done in a certain manner: with what the Java specification calls a compile-time constant expression (§15.28). More precisely, the compiler inlines the value of such a constant wherever the field is read, so the printed value does not depend on what deserialization put in the field.

Constant expressions are the ones you are allowed to use in the case labels of a switch statement. A constant expression is an expression that can be resolved at compile time with no ambiguity: it gives the guarantee that the resulting value will never change during the execution.

In our case, the following are considered constant expressions, and that is why the final variables keep their values (even though they are transient):

  • literals (like the String s1 on line 3, or the integer i1 on line 7)
  • + operations between literals and references to constant final variables (like s4 on line 6)
  • * operations between integers (like i2 on line 8)
  • references to constants (like i3 on line 9)

There are other types of constant expressions (e.g. cast operations to a primitive type or to String). I suggest you have a look at the specification for more details [1].

However, object instantiations are not considered to be constant expressions, and that is why the corresponding variables end up with their default values (like s3 and a1 on lines 5 and 10).
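
The link with switch statements can be checked with a small sketch. The class and constant names below are made up for the illustration; the case labels compile only because each of them is a compile-time constant expression.

```java
class ConstantExpressionSketch {
    static final String PREFIX = "te";         // initialized with a literal
    static final String WORD = PREFIX + "st";  // + between a constant and a literal
    static final int PRODUCT = 7 * 3;          // * between integer literals

    static String classify(String s) {
        switch (s) {
            case WORD:           // reference to a constant
                return "word";
            case PREFIX + "a":   // constant expression written in place
                return "drink";
            default:
                return "other";
        }
    }

    static String classify(int i) {
        switch (i) {
            case PRODUCT:
                return "product";
            case Integer.MIN_VALUE:  // reference to a library constant
                return "min";
            default:
                return "other";
        }
    }

    // A field such as
    //     static final String GREETING = new String("hello");
    // could not be used as a case label: an object creation is not a
    // constant expression, so the compiler would reject it.

    public static void main(String[] args) {
        System.out.println(classify("test")); // prints "word"
        System.out.println(classify(21));     // prints "product"
    }
}
```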

Conclusion

Be careful when you use transient final instance variables! Depending on how they are initialized, they may or may not have a value after deserialization. And the worst part is that, since they are final, you cannot modify them directly afterwards.

  1. or simply check this: http://docs.oracle.com/javase/specs/jls/se7/html/jls-15.html#jls-15.28
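
One possible way around the problem, sketched here under the assumption that the transient final value can be recomputed from the serialized fields, is the standard readResolve() hook: the deserialized object is replaced by a new one built through the constructor. The Sauropod class and its fields are hypothetical names used only for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class Sauropod implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String name;             // serialized normally
    private final transient String label;  // blank final: null right after deserialization

    Sauropod(String name) {
        this.name = name;
        this.label = "Sauropod " + name;
    }

    String label() {
        return label;
    }

    // Called by the serialization mechanism once the fields have been read;
    // the object returned here replaces the deserialized one.
    private Object readResolve() {
        return new Sauropod(name);
    }

    // Serializes and deserializes in memory, to keep the example self-contained.
    static Sauropod roundTrip(Sauropod original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Sauropod) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new Sauropod("Dippy")).label()); // prints "Sauropod Dippy"
    }
}
```

This only fits simple value-like classes: with subclasses or cyclic object graphs, readResolve() needs more care.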