Reactive Programming in Java EE Tutorial

Sep 3, 2018

Reactive programming: Go with the flow

When do reactive streams in Enterprise Java make sense? In this article, Arne Limburg explores the fundamentals of reactive streams in Enterprise Java and goes over how it can be useful for developers.

In a classic Enterprise Java application, there is a clear link between a request and a thread. When a web request arrives at the server, a thread is taken from the corresponding thread pool and is responsible for the complete processing of the request (Fig. 1). This also applies if a third-party system (for example, a database) is called during the request. The request thread then waits, blocked, until the call to the third-party system returns (Fig. 2). Any requests arriving in parallel are processed by additional threads (Fig. 3).



 

Fig. 1: The thread used is responsible for the complete processing of the request.

 

Fig. 2: The request thread waits, blocked, until the call to the third-party system returns.

 

Fig. 3: Any parallel incoming requests are processed by an additional thread.

 




The idea behind reactive programming is to avoid such blocked states. In this case, the call would be sent to the third-party system without waiting for an immediate answer. As a result, the thread would be free to process other requests after making the call (Fig. 4). The reply would come later in the form of an event, and processing would continue when that event occurs.

 

Fig. 4: As a result, the thread would be free to process any other requests after making the call.

Fewer blocked threads would mean that a system programmed in this way would need fewer threads altogether (in extreme cases like Node.js, only one). Additionally, the system would be more efficient thanks to reduced context switching.
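To make the idea more tangible, here is a minimal sketch in plain Java. It is not taken from a real application: the class name, the simulated third-party system (a CompletableFuture that completes after a delay of 200 ms), and the log messages are assumptions made purely for illustration. A single "request thread" sends two calls and is free again immediately; the replies arrive later as events and are processed on the same thread.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: all names and the simulated delay are illustrative only.
class NonBlockingCallSketch {

  // Simulated third-party system: the reply arrives about 200 ms later as an event.
  // (CompletableFuture.delayedExecutor requires Java 9 or later.)
  static CompletableFuture<String> callThirdParty(String request) {
    Executor later = CompletableFuture.delayedExecutor(200, TimeUnit.MILLISECONDS);
    return CompletableFuture.supplyAsync(() -> "reply to " + request, later);
  }

  // Sends the call on the request thread, then releases the thread.
  // The reply is handled on the request thread once the event occurs.
  static CompletableFuture<Void> handle(
      String request, Executor requestThread, List<String> log) {
    return CompletableFuture
      .runAsync(() -> log.add("sent " + request), requestThread)
      .thenCompose(v -> callThirdParty(request))
      .thenAcceptAsync(log::add, requestThread);
  }

  // Handles two requests with ONE request thread and returns the event log.
  static List<String> handleTwoRequests() {
    List<String> log = new CopyOnWriteArrayList<>();
    ExecutorService requestThread = Executors.newSingleThreadExecutor();
    try {
      CompletableFuture<Void> a = handle("A", requestThread, log);
      CompletableFuture<Void> b = handle("B", requestThread, log);
      CompletableFuture.allOf(a, b).join(); // wait only so the demo can print the log
    } finally {
      requestThread.shutdown();
    }
    return log;
  }

  public static void main(String[] args) {
    // Both "sent" entries appear before any reply: the thread did not block on A.
    handleTwoRequests().forEach(System.out::println);
  }
}
```

In a blocking variant, the single thread would have to wait for the reply to A before it could even send B.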

Less memory needed thanks to reactive programming

Yet another advantage of reactive programming appears when it comes to memory requirements. Because individual events are processed one at a time, only the event currently being processed needs to be held in memory. Let’s suppose that all the customers from the database need to be loaded through a REST call. In a classic Enterprise Java application, all the customers would first be loaded from the database, mapped via JPA into the domain model, and then returned to the client via JAX-RS. All the customers would thus be held in memory first as Java objects and then additionally as JSON objects, before being sent to the client in full.

In a reactive server, the database would send an event with the first loaded customer, which could then be converted to the domain model and streamed to the client. The remaining customers would be processed the same way. Therefore, there would always be just one customer at a time in memory. All in all, the server would require less memory.
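The difference can be sketched with a few lines of plain Java. Note the assumptions: a lazy java.util.stream pipeline stands in for a reactive stream, a generated sequence of strings stands in for the database cursor, and a StringBuilder stands in for the client connection. None of this is a real JPA or JAX-RS API. The sketch only counts how many customers are held at the same time in each style.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical sketch: a lazy stream stands in for a reactive data source.
class CustomerStreamingSketch {

  // Simulated database cursor that hands out customers lazily, one per pull.
  static Stream<String> customerCursor(int count) {
    return IntStream.rangeClosed(1, count).mapToObj(i -> "customer-" + i);
  }

  static String toJson(String customer) {
    return "{\"name\":\"" + customer + "\"}";
  }

  // Classic style: load everything, convert everything, then send everything.
  // Returns the peak number of customer objects held at once.
  static int classicPeak(int count, StringBuilder client) {
    List<String> all = customerCursor(count).collect(Collectors.toList());
    List<String> json = all.stream()
      .map(CustomerStreamingSketch::toJson)
      .collect(Collectors.toList());
    json.forEach(client::append);
    return all.size();
  }

  // Streaming style: each customer is loaded, converted, and sent
  // before the next one is pulled from the cursor.
  static int streamingPeak(int count, StringBuilder client) {
    int[] inFlight = {0};
    int[] peak = {0};
    customerCursor(count)
      .peek(c -> peak[0] = Math.max(peak[0], ++inFlight[0]))
      .map(CustomerStreamingSketch::toJson)
      .forEach(j -> {
        client.append(j);
        inFlight[0]--;
      });
    return peak[0];
  }

  public static void main(String[] args) {
    System.out.println(classicPeak(1000, new StringBuilder()));   // prints 1000
    System.out.println(streamingPeak(1000, new StringBuilder())); // prints 1
  }
}
```

Both variants send exactly the same data to the client; only the number of objects that exist at the same time differs.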

Reactive Programming in Java EE

At first glance, reactive programming seems to have a few advantages over the classic Enterprise Java model. The question now is whether the two programming models can be combined. For a long time now, Java EE has offered a number of APIs that provide mechanisms for processing streams, events, or messages. Many of these can already do so asynchronously or can even wait for events to occur. Reactive programming is therefore already possible with some of these technologies. However, there is no unified API for it. Instead, every Enterprise Java standard that offers something in this area comes with its own API.

Here, I would like to show you how the individual APIs can be combined, based on different implementations of a case study. The use case I would like to look at is the status update of a customer order. In the first variant, the order status is updated via WebSockets. Based on this, a CDI event is sent asynchronously (Listing 1). Clients can register for certain orders via REST and long polling (Listing 2). When the REST resource receives the CDI event, the status update is delivered to all registered clients (Listing 3). Additionally, you can listen for the status update via JMS (Listing 4) and send it to the client using server-sent events (Listing 5). The entire chain uses Java EE 8 technologies and can be implemented as completely non-blocking.

 

Listing 1

@ApplicationScoped
@ServerEndpoint(value = "/status", encoders = OrderStatusConverter.class)
public class WebsocketEndpoint {

  @Inject
  @Updated
  private Event<OrderStatus> orderStatusUpdatedEvent;

  @OnMessage
  public void onStatusUpdate(OrderStatus status) {
    orderStatusUpdatedEvent.fireAsync(status);
  }
}

Listing 2

ClientBuilder
  .newClient()
  .target("http://.../orders/" + orderNumber + "/status")
  .request()
  .async()
  .get(new InvocationCallback<OrderStatus>() {
    public void completed(OrderStatus status) {
      ...
    }

    public void failed(Throwable error) {
      ...
    }
  });

Listing 3

@ApplicationScoped
@Path("/orders")
public class OrderResource {

  private Map<OrderNumber, List<AsyncResponse>> statusUpdateRequests
      = new HashMap<>();

  @GET
  @Path("/{orderNumber}/status")
  public void getStatus(
      @PathParam("orderNumber") OrderNumber orderNumber,
      @Suspended AsyncResponse statusUpdateRequest) {
      statusUpdateRequests
          .computeIfAbsent(orderNumber, o -> new ArrayList<>())
          .add(statusUpdateRequest);
  }

  public void updateStatus(
      @ObservesAsync @Updated OrderStatus status) {
      Optional
          .ofNullable(statusUpdateRequests
              .remove(status.getOrderNumber()))
          .ifPresent(l -> l.stream()
              .filter(r -> !r.isCancelled())
              .forEach(r -> r.resume(status)));
  }
}

Listing 4

@Resource(lookup = "jms/statusTopic")
private Topic topic;
@Inject
private JMSContext context;
@Inject
@Updated
private Event<OrderStatus> orderStatusUpdatedEvent;

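@PostConstruct
public void waitForStatusUpdates() {
  context
    .createConsumer(topic)
    .setMessageListener(m -> {
      try {
        // getBody needs the expected body type and throws the checked JMSException
        orderStatusUpdatedEvent.fireAsync(m.getBody(OrderStatus.class));
      } catch (JMSException e) {
        throw new IllegalStateException(e);
      }
    });
}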

Listing 5

@ApplicationScoped
@Path("/orders")
public class OrderResource {

  @Context
  private Sse sse;
  private Map<OrderNumber, List<SseEventSink>> statusUpdateRequests
    = new HashMap<>();

  @GET
  @Path("/{orderNumber}/status")
  @Produces(MediaType.SERVER_SENT_EVENTS)
  public void getStatus(
      @PathParam("orderNumber") OrderNumber orderNumber,
      @Context SseEventSink statusUpdateRequest) {
    statusUpdateRequests
      .computeIfAbsent(orderNumber, o -> new ArrayList<>())
      .add(statusUpdateRequest);
  }

  public void updateStatus(
    @ObservesAsync @Updated OrderStatus status) {
    OutboundSseEvent event = sse.newEventBuilder()
      .id(...)
      .name("statusUpdate")
      .data(status.toString())
      .build();
    Optional
      .ofNullable(statusUpdateRequests
        .remove(status.getOrderNumber()))
      .ifPresent(l -> l.stream()
        .forEach(s -> s.send(event)));
  }
}

Non-blocking sending

Sending messages can also be implemented in a non-blocking way in the various technologies. In WebSockets (Listing 6) as well as JMS (Listing 7), we can ensure without much effort that the sender does not need to wait for the message to be delivered, but is instead informed of the outcome via a callback.

Since version 3.1, the Servlet standard has offered callback methods that we can implement when streaming larger volumes of data. The callbacks are executed when data is available (when streaming from client to server, Listing 8) or when the line is free for writing (when streaming from server to client, Listing 9).

 

Listing 6

public void updateStatus(
    @ObservesAsync @Updated OrderStatus status) {
  websocketSession
    .getAsyncRemote()
    .sendObject(status, result -> ...);
}

Listing 7

public void updateStatus(
    @ObservesAsync @Updated OrderStatus status) {
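  // createProducer() takes no arguments; the destination is passed to send().
  // Assumes OrderStatus is Serializable so it can be sent as an object message.
  jmsContext.createProducer()
    .setAsync(new CompletionListener() {
      public void onCompletion(Message message) {
        ...
      }
      public void onException(
        Message message,
        Exception exception) {
        ...
      }
    })
    .send(topic, status);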
}

Listing 8

AsyncContext context = servletRequest.startAsync();
context.getRequest()
  .getInputStream()
  .setReadListener(new ReadListener() {
    public void onDataAvailable() {
      ...
    }

    public void onAllDataRead() {
      ...
    }

    public void onError(Throwable t) {
      ...
    }
  });

Listing 9

AsyncContext context = servletRequest.startAsync();
context.getResponse()
  .getOutputStream()
  .setWriteListener(new WriteListener() {
    // called when data can be written without blocking
    public void onWritePossible() {
      ...
    }

    public void onError(Throwable t) {
      ...
    }
  });

What is missing in Java EE?

Until now, I have shown some of the capabilities that Java EE has to offer. However, there is an important area of asynchrony that I have not yet covered: transactions and persistence. Currently, in a standard EE application, a transaction is still always tied to the current thread. There is also no way to transfer the active transaction from one thread to another. The situation is similar with the JPA specification. Here as well, there have been no signs of asynchrony to date, let alone reactivity. Yet long-running database access in particular seems to be a perfect candidate for streaming and asynchronous processing.
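Why thread binding gets in the way of callbacks can be illustrated with an analogy in plain Java. The sketch below is not how any particular application server manages transactions; it merely assumes that the "current transaction" is kept in a thread-bound slot (here a ThreadLocal with the hypothetical name CURRENT_TX). As soon as processing continues on another thread, the context is no longer visible there.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: a ThreadLocal stands in for a thread-bound transaction.
class ThreadBoundContextSketch {

  // Illustrative stand-in for a container's thread-bound transaction context.
  static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

  // The thread that started the "transaction" can see it.
  static String txSeenBySameThread() {
    CURRENT_TX.set("tx-1");
    try {
      return CURRENT_TX.get();
    } finally {
      CURRENT_TX.remove();
    }
  }

  // A callback running on another thread does not see it.
  static String txSeenByOtherThread() {
    ExecutorService callbackThread = Executors.newSingleThreadExecutor();
    CURRENT_TX.set("tx-1");
    try {
      return CompletableFuture.supplyAsync(CURRENT_TX::get, callbackThread).join();
    } finally {
      CURRENT_TX.remove();
      callbackThread.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(txSeenBySameThread());  // prints "tx-1"
    System.out.println(txSeenByOtherThread()); // prints "null"
  }
}
```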

The question as to why nothing has happened in the JPA standard yet in terms of asynchrony can be answered quickly. JPA wraps database access via JDBC, and in JDBC there is simply no way to run database queries asynchronously. Yet a remedy is in sight. Work is progressing on a solution called ADBA [1] that would provide asynchronous access to relational databases in Java in a standardized manner. Once this specification reaches maturity, the step to an asynchronous JPA will be much closer.

The thread binding of transactions is also still an open issue. A look into the JBatch specification shows that an adaptation of this pattern is quite possible. However, the path to make that possible seems to be much longer in the case of transactions.






Java 9 reactive streams

Based on the example shown here, we can deduce which existing APIs would already allow for something like reactive programming. Still, we can also clearly observe that each Java EE specification brings its own callback interface, which can be more or less useful at times. Above all, as we have seen, you need to take matters into your own hands if you want to combine them. Moreover, it should be noted that the APIs are indeed suited to technically offer what lies behind reactive programming, namely non-blocking and stream orientation. However, they do not provide the comfort level needed to write readable and maintainable software using reactive programming. To do that, we need to fall back on third-party libraries that also support the reactive paradigm in operation like Akka [2], Vert.x [3], and Project Reactor [4].

What’s missing is a common API that would connect the Java EE technologies with each other and with third-party libraries. Since Java 8, the CompletableFuture class has been available and offers a common API for asynchronous processing of individual data records. It wasn’t until Java 9 that the processing of reactive streams became standard. Here, the reactive streams initiative [5] has been picked up and standardized through the Flow class and its nested interfaces. The Flow.Publisher interface is implemented by a class that provides a data flow, while the Flow.Subscriber interface is implemented by a class that receives a data flow.
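A minimal sketch of the two roles, using only the JDK (Java 9 or later): SubmissionPublisher is the Flow.Publisher implementation that ships with the JDK, while the class names FlowSketch and CollectingSubscriber and the status strings are made up for this example. The subscriber requests one item at a time, which is how the Flow API lets the receiver control the pace.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: class names and status values are illustrative only.
class FlowSketch {

  // Receives a data flow; asks for one item at a time and collects the items.
  static class CollectingSubscriber implements Flow.Subscriber<String> {
    final List<String> received = new CopyOnWriteArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
      this.subscription = subscription;
      subscription.request(1); // signal demand for the first item
    }

    @Override
    public void onNext(String item) {
      received.add(item);
      subscription.request(1); // ready for the next item
    }

    @Override
    public void onError(Throwable error) {
      done.countDown();
    }

    @Override
    public void onComplete() {
      done.countDown();
    }
  }

  // Publishes the given statuses and returns what the subscriber received.
  static List<String> publish(String... statuses) {
    CollectingSubscriber subscriber = new CollectingSubscriber();
    try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(subscriber);
      for (String status : statuses) {
        publisher.submit(status);
      }
    } // close() signals onComplete once the submitted items are delivered
    try {
      subscriber.done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return subscriber.received;
  }

  public static void main(String[] args) {
    System.out.println(publish("ORDERED", "SHIPPED", "DELIVERED"));
  }
}
```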

Even now, a variety of third-party libraries can be seamlessly combined using this interface, such as Project Reactor with Akka. In the future, it would be desirable to make this possible with Jakarta EE technologies as well.

Reactive streams in Jakarta EE

Support for Java 9 reactive streams in Jakarta EE has not been planned up to now. The reasons for this are more organizational than technical: the transition from Oracle to Eclipse is still pending, and the reorganization as an open source specification has yet to be completed. They haven’t even changed over the name from Java EE yet for Java 8. The conversion of the specifications and implementations is also encountering technical difficulties due to the modularization associated with Project Jigsaw in Java 9. The specification groups have quite simply not had enough time yet to deal extensively with Java 9 and the related reactive streams. A blog post by James Roper from February 2018 [6] and a look into the Jakarta EE mailing list do show, though, that they are slowly starting to address this issue.

Application scenarios of reactive programming

If necessary, we can already break the bond between thread and request in Java EE today and proceed in a clearly more asynchronous, streaming-oriented, and even more reactive manner. This immediately raises the question of when that is necessary at all, given basic considerations [7] indicating that any server is able to process a “normal” load. Do we really need reactive programming at all? Or is it just a toy for technology-loving nerds who want to squeeze the very last bit of performance out of their servers?

Yet if you take a look at the list of companies standing behind reactive third-party libraries like Akka and Vert.x, it becomes clear that this is in no way an irrelevant or niche paradigm. In today’s world, processing large volumes of data is becoming more and more important; any approach that supports this automatically gains in significance as well. That reactive programming is such an approach, in terms of performance as well as memory usage, is something I explained at the beginning of this article. But what does this mean for the day-to-day business of developers?

In most application cases, reactive programming will still not be needed as classic server paradigms can handle normal loads. However, each application will have use cases in which it makes sense to switch to reactive programming. The number of these use cases may increase considerably in the years to come. Reactive programming will not replace classic enterprise computing, but it will certainly complement it. Therefore, it is important that support for reactive programming is also provided in classic server technologies. To have this happen through a common API still remains an unfulfilled dream for the future, at least for now.

Conclusion

As we have seen, there already are quite a few APIs in Java EE that are designed to process data on demand and thus implement reactive programming. The link between a request and a thread has also long since eroded, even in the standard version. For the time being, each API defines its own interface to accomplish that. Standardization is urgently needed here. A suitable standard for this has already been available since Java 9 in the Reactive Streams API. With a switch from Java EE to Jakarta EE and hopefully an accompanying future update of the Java version to Java 9, it could be possible that the Reactive Streams API will find its way to the various standards. But for now, no first release of Jakarta EE can be seen on the horizon. Only MicroProfile features a quick and high-quality release sequence; however, the target platform is still Java 8, which blocks the integration of reactive streams for the time being.

Additionally, Java still lacks the basics for reactive streams in a few places. This is the case for transactions and JPA, for example. The basic rule applies here that the lack of non-blocking data access means no reactive JPA. Progress is being made on the required basis with ADBA, but it has not been completed yet and is therefore not included in Java 9 or Java 10.

It will still be a while until the reactive streams standard finds its way into standard Enterprise Java. If reactive programming should nevertheless seem useful in individual use cases, you can already use one of the many special solutions included in the individual specifications. The alternative is to fall back on one of the third-party libraries. The developer must then build the interaction between the standards on their own.

 


Arne Limburg is a software architect at open knowledge GmbH in Oldenburg. He has many years of experience as a developer, architect and consultant in the Java environment and has also been active in the Android environment since day one.

Links & literature

[7] Fasel, Marc: “Performance Comparison between Node.js and Java EE”: https://dzone.com/articles/performance-comparison-between
