Thursday, December 22, 2016

Ruminating on AMQP internals and JMS equivalent semantics

Many Java developers often use JMS APIs to communicate with the message broker. The JMS API abstracts away the internal implementation complexities of the message broker and provides a unified interface for the developer.

But if you are interested in understanding the internals of AMQP, then the following old tutorial on the Spring site is still the best -

By understanding the core concepts of exchange, queue and binding keys, you can envisage multiple integration patterns such as pub/sub, req/reply, etc.

Jotting down snippets from the above sites on the similarities between JMS and AMQP.

JMS has queues and topics. A message sent on a JMS queue is consumed by no more than one client. A message sent on a JMS topic may be consumed by multiple consumers. AMQP only has queues. AMQP producers don't publish directly to queues. A message is published to an exchange, which through its bindings may get sent to one queue or multiple queues, effectively emulating JMS queues and topics.

AMQP has exchanges, bindings, and queues. Messages are first published to an exchange with a routing key. Bindings define which queue(s) the message is piped to. Consumers subscribing to that queue then receive a copy of the message. If more than one consumer subscribes to the same queue, the messages are dispensed in a round-robin fashion.

It is very important to understand the difference between a routing key and a binding key. Publishers publish messages to an AMQP exchange with a routing key, which is typically of the form {company}.{product-category}.{appliance-type}.{appliance-id}.....

You create a queue on AMQP by specifying a binding key. E.g., if your binding key is {company}.{product-category}.#, then all messages for that product category would come to this queue.

While creating subscribers, you have two options - either bind to an existing queue (by giving the queue name) or create a private queue for the subscriber by specifying a binding key.
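To make the routing-key vs. binding-key distinction concrete, here is a small broker-free Java sketch of the matching rules a topic exchange applies: '*' matches exactly one dot-separated word, '#' matches zero or more words. The class name and the sample keys (acme, sensors, etc.) are made up for illustration; a real broker such as RabbitMQ implements this matching internally.

```java
public class TopicMatch {
    // Returns true if a message published with this routing key would be
    // delivered to a queue bound with this binding key.
    // '*' matches exactly one dot-separated word, '#' matches zero or more.
    public static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] b, int i, String[] r, int j) {
        if (i == b.length) return j == r.length;      // binding key exhausted
        if (b[i].equals("#")) {                       // '#': try consuming 0..n words
            for (int k = j; k <= r.length; k++)
                if (match(b, i + 1, r, k)) return true;
            return false;
        }
        if (j == r.length) return false;              // routing key exhausted
        if (b[i].equals("*") || b[i].equals(r[j]))    // '*' or a literal word match
            return match(b, i + 1, r, j + 1);
        return false;
    }

    public static void main(String[] args) {
        // A queue bound with "acme.sensors.#" receives every message in that category:
        System.out.println(matches("acme.sensors.#", "acme.sensors.thermostat.42")); // true
        System.out.println(matches("acme.sensors.#", "acme.pumps.p1"));              // false
        System.out.println(matches("acme.*.thermostat.*", "acme.sensors.thermostat.42")); // true
    }
}
```

This is also why a binding key like {company}.{product-category}.# funnels the whole product category into one queue, while each publisher still stamps a fully qualified routing key on every message.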

Thursday, December 01, 2016

Ruminating on non-blocking REST services

In a typical REST service environment, a thread is allocated to each incoming HTTP request for processing. So if you have configured your container to start 50 threads, then you can handle 50 concurrent HTTP requests; any additional HTTP request is queued until a thread is free.

Today, using the principles of non-blocking IO and reactive programming, we can break the tight coupling between a thread and a web request. The Servlet 3 specification also supports async requests, as explained in this article - The core idea is to delegate the long-running or asynchronous processing to another background thread (Task Executor), so that the HTTP handler threads are not starved.

One might argue that we are just moving the 'blocking thread' bottleneck from the HTTP threads to the backend thread pool (Task Executors). But this does result in better throughput, as we can serve many more concurrent HTTP clients.
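The hand-off described above can be sketched in plain Java, without a servlet container: the request-handling thread submits the long-running work to a separate task-executor pool and returns a future immediately, so it is free to accept the next request. The class name, pool size, and "processed:" prefix below are illustrative, not taken from any framework.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncHandOff {
    // Stands in for Spring's TaskExecutor: a pool separate from the HTTP handler threads.
    private static final ExecutorService taskExecutor = Executors.newFixedThreadPool(4);

    // Simulates an async controller method: instead of blocking the caller's
    // thread, it returns immediately; the task executor completes the future later.
    public static CompletableFuture<String> handleRequest(String payload) {
        return CompletableFuture.supplyAsync(() -> {
            // The long-running backend call would go here.
            return "processed:" + payload;
        }, taskExecutor);
    }

    public static void main(String[] args) {
        CompletableFuture<String> response = handleRequest("order-1");
        // The calling ("HTTP") thread is free to accept more requests here.
        System.out.println(response.join()); // prints "processed:order-1"
        taskExecutor.shutdown();
    }
}
```

In Spring MVC the same effect is achieved by returning a Callable or DeferredResult from the controller method, with the container completing the response when the background work finishes.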

The Spring MVC documentation on Async REST MVC is worth a perusal to understand the main concepts -

A good article that demonstrates how Spring MVC can be used to build non-blocking REST services that call a backend service exposed via JMS - All the source code for this example is available here. One must understand the basics of the Spring Integration framework to work with the above example.

The following discussion thread on Stack Overflow would also give a good idea of how to implement this -

The following blog-post shows another example using Spring Boot on Docker -

Concurrency and scaling strategies for MDPs and MDBs

Message Driven Beans offer a lot of advantages over a standalone JMS consumer as listed in this blog-post. The Spring framework provides us with another lightweight alternative called MDP (Message Driven POJOs) that has all the goodies of MDB, but without any heavy JEE server baggage.

The strategy for implementing a scalable and load-balanced message consumption solution is a bit different in MDBs vs. MDPs.

An MDB is managed by the JEE container and is 'thread-safe' by default. The JEE container maintains a pool of MDBs and allows only one thread to execute an MDB at a time. Thus if you configure your JEE container to spawn 10 MDBs, you can have 10 JMS consumers processing messages concurrently.

Spring MDPs are typically managed by the DMLC (DefaultMessageListenerContainer). Each MDP is typically a singleton, but can have multiple threads running through it. Hence an MDP is NOT 'thread-safe' by default and we have to make sure that our MDPs are stateless - e.g. do not have instance variables, etc. The DMLC can be configured for min/max concurrent consumers to dynamically scale the number of consumers. All connection, session and consumer objects are cached by Spring to improve performance. Jotting down some important stuff to remember regarding DMLC from the Spring Java Docs.

"On startup, DMLC obtains a fixed number of JMS Sessions to invoke the listener, and optionally allows for dynamic adaptation at runtime (up to a maximum number). Actual MessageListener execution happens in asynchronous work units which are created through Spring's TaskExecutor abstraction. By default, the specified number of invoker tasks will be created on startup, according to the "concurrentConsumers" setting."
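As an illustration of those settings, a DMLC can be declared in Spring XML with a dynamic consumer range. The bean names and the 5-to-10 range below are just examples; connectionFactory, orderQueue and orderListener are assumed to be defined elsewhere in the application context.

```xml
<!-- Illustrative only: referenced beans are assumed to exist elsewhere. -->
<bean id="listenerContainer"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="orderQueue"/>
    <property name="messageListener" ref="orderListener"/>
    <!-- Start with 5 concurrent consumers, scale up to 10 under load -->
    <property name="concurrentConsumers" value="5"/>
    <property name="maxConcurrentConsumers" value="10"/>
</bean>
```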

It is also possible in Spring to create a pool of MDPs and give one to each TaskExecutor, but we have never tested this and never felt the need for this. Making your MDPs stateless and hence 'thread-safe' would suffice for almost all business needs. Nevertheless, if you want to create a pool of MDP objects, then this link would help -

Another good article on Spring MDP scaling that is worth perusing is here -

Implementing a Request Response with Messaging (JMS)

Quite often, we need to implement a request/response paradigm on JMS - e.g. calling a backend function on a mainframe or a third-party interface that has an AMQP endpoint. To implement this, we have the following three options:

Option 1: The client creates a temporary queue and embeds the name-address of this queue in the message header (JMSReplyTo message header) before sending it to the server. The server processes the request message and writes the response message on to the temp queue mentioned in the JMSReplyTo header.
The advantage of this approach is that the server does not need to know the destination of the client response message in advance.
An important point to remember is that a temporary queue is only valid till the connection object is open. Temporary queues are generally light-weight, but it depends on the implementation of the MOM (message-oriented middleware).

Option 2: Create a permanent queue for response messages from the server. The client sets a correlation ID on the message before it is sent to the server. The client then listens on the response queue using a JMS selector - using the correlation ID header value as the selector property. This ensures that only the appropriate message from the response queue is delivered to the appropriate client.

Option 3: Use a combination of Option 1 and Option 2. Each client creates a temporary queue and a JMS consumer on startup. We need this option when the client does not block for each request - the client can keep sending messages, listen for multiple response messages on the temp queue, and then match requests to responses using the correlation ID.
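The correlation-ID bookkeeping at the heart of Option 3 can be sketched without any JMS provider: the client keeps a map of pending requests keyed by correlation ID, and the listener on its private reply queue completes the matching future when a response arrives. The class and method names here are hypothetical; the commented-out lines indicate where the real JMS calls would go.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class CorrelatedRequests {
    // Pending requests, keyed by the correlation ID stamped on each outgoing message.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called when sending a request: remember the future under the correlation ID.
    // In real code the message would also be sent here, with
    // message.setJMSCorrelationID(correlationId) and JMSReplyTo set to the temp queue.
    public CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Called by the consumer on the temporary reply queue for each response message
    // (the ID would come from message.getJMSCorrelationID() in real code).
    public void onResponse(String correlationId, String responseBody) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future != null) {
            future.complete(responseBody);
        } // else: a late or unknown response; typically logged and dropped
    }

    public static void main(String[] args) {
        CorrelatedRequests client = new CorrelatedRequests();
        CompletableFuture<String> reply = client.register("req-1");
        // The client is free to send further requests without blocking here.
        client.onResponse("req-1", "OK");   // reply-queue listener delivers the response
        System.out.println(reply.join());   // prints "OK"
    }
}
```

The same map-of-futures structure works for Option 2 as well; the only difference is that the responses arrive on a shared permanent queue filtered by a JMS selector instead of a per-client temporary queue.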

Spring has implemented this design pattern in the JMS Outbound Gateway class -

The JMS spec also contains an API for basic req/res using temporary queues - QueueRequestor and TopicRequestor. So if your MOM supports JMS, then you can use this basic implementation if it meets your needs.

The following links would serve as a good read on this topic.