Saturday, September 08, 2018

Ruminating on Kafka consumer parallelism

Many developers struggle to understand the nuances of parallelism in Kafka. Here are a few points from the Kafka documentation site that should help.
  • Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.
  • Publishers can publish events into different partitions of Kafka. The producer is responsible for choosing which record to assign to which partition within the topic. This can be done in a round-robin fashion simply to balance load or it can be done according to some semantic partition function (say based on some key in the record).
  • The partitions in the log serve several purposes. First, they allow the log to scale beyond a size that will fit on a single server. Each individual partition must fit on the servers that host it, but a topic may have many partitions so it can handle an arbitrary amount of data. Second they act as the unit of parallelism. 
Unlike other messaging middleware, parallel consumption of messages (aka load-balanced consumers) in Kafka is ONLY POSSIBLE using partitions. 

Kafka keeps one offset per [consumer-group, topic, partition], and each partition is read by exactly one consumer instance within a consumer group. Hence there cannot be more active consumer instances within a single consumer group than there are partitions; any extra instances simply sit idle.
So if you have only one partition, you can have only one active consumer (within a particular consumer-group). You can of course have consumers across different consumer-groups, but then every group receives its own copy of each message, so the messages are duplicated and not load-balanced.
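To make the partition limit concrete, here is a toy model of how the partitions of one topic get shared inside one consumer group. It is only a sketch: it is not Kafka's own assignor code, and the class name, method name and consumer names (c1, c2, c3) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of partition sharing within ONE consumer group (not Kafka's assignor code).
class PartitionAssignmentSketch {

    // Deals partitions 0..partitionCount-1 to the consumers in round-robin order.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 3 consumers, 2 partitions: the third consumer gets nothing to read.
        System.out.println(assign(Arrays.asList("c1", "c2", "c3"), 2));
        // 2 consumers, 4 partitions: each consumer reads two partitions.
        System.out.println(assign(Arrays.asList("c1", "c2"), 4));
    }
}
```

With 2 partitions and 3 consumers, the third consumer ends up with an empty list, which is the idle-consumer case described above.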

Batch ETL to Stream Processing

Many of our customers are moving their traditional ETL jobs to real-time stream processing.
The following article is a good read on why Kafka is an excellent choice for unified batch processing and stream processing.

https://www.infoq.com/articles/batch-etl-streams-kafka

Snippets from the article:

  • Several recent data trends are driving a dramatic change in the old-world batch Extract-Transform-Load (ETL) architecture: data platforms operate at company-wide scale; there are many more types of data sources; and stream data is increasingly ubiquitous
  • Enterprise Application Integration (EAI) was an early take on real-time ETL, but the technologies used were often not scalable. This led to a difficult choice with data integration in the old world: real-time but not scalable, or scalable but batch.
  • Apache Kafka is an open source streaming platform that was developed seven years ago within LinkedIn.
  • Kafka enables the building of streaming data pipelines from “source” to “sink” through the Kafka Connect API and the Kafka Streams API.
  • Logs unify batch and stream processing. A log can be consumed via batched “windows”, or in real time by examining each element as it arrives.

Thursday, August 30, 2018

Tips and Tricks for Thread Dumps

Tip 1#: To list the threads spawned by the JVM (and count them), run the following command: ps -eLF
The output has one row per thread. The 'LWP' column (light-weight process ID) is the native thread ID, 'NLWP' is the number of threads in the process, and 'C' is the CPU utilization. The LWP value can be correlated with the thread-dump obtained from the JVM, where it appears in hexadecimal as the 'nid' field.

Tip 2#: The thread-dump can be obtained by using the following command: jstack PID
If you are using the Spring Boot framework, you can also use the Actuator endpoints to download the thread-dump.
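One practical detail when correlating Tip 1 with Tip 2: ps prints the thread ID in decimal, while a HotSpot thread dump prints it in hexadecimal after "nid=". A tiny helper like the one below does the conversion. The class name is made up, and the value 12345 is just a sample.

```java
// Converts the decimal LWP value shown by ps into the "nid" form used in a HotSpot thread dump.
class NidLookup {

    static String toNid(long lwp) {
        return "0x" + Long.toHexString(lwp);
    }

    public static void main(String[] args) {
        // Pass the LWP value as the first argument; 12345 is only a sample value.
        long lwp = Long.parseLong(args.length > 0 ? args[0] : "12345");
        System.out.println("nid=" + toNid(lwp));
    }
}
```

You can then search the dump for that string, e.g. jstack PID | grep "nid=0x3039".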

Tip 3#: This thread-dump file can be uploaded to a cool online tool, http://fastthread.io/, which gives a beautiful report on the threads running inside the JVM that can be analyzed.

Wednesday, August 29, 2018

Ruminating on Thread Pool sizes and names

Recently we were analyzing the thread-dumps of some JVMs in production and found a large number of threads created in certain thread-pools. Since the name of the thread-pool was generic (e.g. thread-pool-3, etc.) it was very difficult to diagnose the code that spawned the thread-pool.

The following code snippets should help developers in properly naming their thread-pools and also limiting the number of threads. In a cloud environment, the default number of threads in a pool will vary based on the CPUs available; e.g. in the absence of an explicit thread-pool size, the thread pool may grow to hundreds of threads.
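A minimal sketch of both ideas using only java.util.concurrent: a ThreadFactory that stamps a meaningful prefix on every thread, and a fixed-size pool so the thread count is explicit rather than derived from the CPU count. The class name and the "order-callback" prefix are just examples.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadPools {

    // Thread factory that names every thread "<prefix>-<n>", so it is easy to spot in a thread dump.
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        return runnable -> new Thread(runnable, prefix + "-" + counter.getAndIncrement());
    }

    // Fixed-size pool: the number of threads is stated explicitly by the caller.
    static ExecutorService newNamedPool(String prefix, int size) {
        return Executors.newFixedThreadPool(size, namedFactory(prefix));
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = newNamedPool("order-callback", 4);
        // Print the name of the worker thread that ran the task.
        System.out.println(pool.submit(() -> Thread.currentThread().getName()).get());
        pool.shutdown();
    }
}
```

Most libraries that create their own pools accept an ExecutorService or a ThreadFactory somewhere in their configuration, so the same factory can usually be passed in there.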

We had seen this happen with the RabbitMQ Java client. By default, the RabbitMQ Java client uses a thread pool for callback messages (ACK), and the size of this thread-pool depends on the number of CPUs. Since the JVM is not aware of the Docker config, all the processors on the host machine are considered and a large thread pool is created.

More details available here - https://github.com/logstash-plugins/logstash-input-rabbitmq/issues/93


Wednesday, July 18, 2018

Creating a RabbitMQ pipeline using listeners/publishers

In Event Driven Architectures, you often have to create a pipeline of event processing. One of my teams was using the Spring AMQP library and wanted to implement the following basic steps.
1. Read a message from the queue using a RabbitMQ channel.
2. Do some processing and transform the message.
3. Publish the message downstream with the same channel.

The sample code given below will help developers in implementing this.

Another scenario is multi-threaded code that publishes messages to RabbitMQ. In that case, you can use the RabbitTemplate with channel caching. Sample code given below.

Wednesday, July 04, 2018

Wednesday, June 27, 2018

Simple multi-threading code

Very often, we have to process a list of objects. Using a for-loop would process these objects in sequence. But if we want to process them in parallel, then the following code snippet will help.
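A minimal sketch of this with a plain ExecutorService: every list item becomes a task, invokeAll() runs the tasks on a fixed-size pool, and the results come back in the same order as the input. The class and method names are only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

class ParallelListProcessor {

    // Applies 'work' to every item on a fixed-size pool and returns the results in input order.
    static <T, R> List<R> processAll(List<T> items, Function<T, R> work, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<R>> tasks = new ArrayList<>();
            for (T item : items) {
                tasks.add(() -> work.apply(item));
            }
            List<R> results = new ArrayList<>();
            // invokeAll blocks until every task has finished (or failed).
            for (Future<R> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("alpha", "beta", "gamma");
        System.out.println(processAll(names, String::toUpperCase, 3));
    }
}
```

For simple CPU-bound work, items.parallelStream().map(...) is even shorter, but an explicit pool lets you control the number of threads.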


Simple Utils for File & Stream IO

I still see many developers reinventing the wheel when it comes to IO/Stream operations. There are so many open-source libraries for doing this for you today :)

Given below are some code snippets for the most common use-cases for IO.
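The original snippets are not reproduced here, so the sketch below is a stand-in that relies only on java.nio.file.Files from the JDK rather than any particular open-source library. It covers three common cases: writing text to a file, reading a whole file, and saving an InputStream to a file. All class and method names are illustrative.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class IoSnippets {

    // Writes text to a file, replacing any existing content.
    static void writeText(Path file, String text) throws IOException {
        Files.write(file, text.getBytes(StandardCharsets.UTF_8));
    }

    // Reads a whole (small) file into a String.
    static String readText(Path file) throws IOException {
        return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
    }

    // Drains an InputStream into a file and returns the number of bytes copied.
    static long saveStream(InputStream in, Path target) throws IOException {
        return Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
    }

    // Exercises the helpers against a temp file and returns what was read back.
    static String roundTrip(String text) {
        try {
            Path file = Files.createTempFile("io-demo", ".txt");
            try {
                writeText(file, text);
                String fromFile = readText(file);
                saveStream(new ByteArrayInputStream(fromFile.getBytes(StandardCharsets.UTF_8)), file);
                return readText(file);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("line one\nline two"));
    }
}
```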


Wednesday, May 23, 2018

Ruminating on Agile estimates

Over the past few years, we have been using story points for estimation, rather than using man-hours.
For a quick introduction to agile estimation, please peruse the following links that give a good overview.

https://rubygarage.org/blog/how-to-estimate-with-story-points
https://rubygarage.org/blog/how-to-estimate-project-cost
https://rubygarage.org/blog/3-reasons-to-estimate-with-story-points

But folks still struggle to understand the advantages of estimating by story points. During the planning poker session, all team members discuss each story and arrive at its story points through consensus. Thus each team member has skin in the game and is involved in the estimation process.

The time needed to complete a story point will vary based on a developer’s level of experience, but the amount of work is correctly estimated using story points.

IMHO, velocity should be calculated only after 2-3 sprints. This average velocity (#story-points/sprint) can be used to estimate the calendar timelines for the project.
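The arithmetic behind this is simple enough to sketch. The numbers below (three sprints of 18, 22 and 20 points, and a 130-point backlog) are invented purely for illustration.

```java
// Turns completed story points per sprint into an average velocity and a rough sprint count.
class VelocityEstimate {

    // Average story points completed per sprint.
    static double averageVelocity(int[] completedPointsPerSprint) {
        int sum = 0;
        for (int points : completedPointsPerSprint) {
            sum += points;
        }
        return (double) sum / completedPointsPerSprint.length;
    }

    // Number of sprints needed for the remaining backlog, rounded up to whole sprints.
    static int sprintsRemaining(int backlogPoints, double averageVelocity) {
        return (int) Math.ceil(backlogPoints / averageVelocity);
    }

    public static void main(String[] args) {
        double velocity = averageVelocity(new int[] {18, 22, 20});
        System.out.println("Average velocity: " + velocity);
        System.out.println("Sprints remaining: " + sprintsRemaining(130, velocity));
    }
}
```

Multiplying the sprint count by the sprint length then gives the calendar estimate.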


Thursday, March 29, 2018

Cool Java client library for secure FTP

My team was looking for a library to PUT and GET files from an SFTP server. We used the cool Java library called JSch (www.jcraft.com/jsch/).

Sample code to download and upload files to the SFTP server is given below.


Sunday, March 18, 2018

Spring WebSocket STOMP tips and tricks

Recently we successfully implemented secure Websockets in one of our projects and learned a lot of tricks to get things working together. Given below are some tips that would help teams embarking on implementing WebSockets in their programs.

1)  Spring uses the STOMP protocol for Web Sockets. The other popular protocol for Web Sockets is WAMP, but Spring does not support it. Hence if you are using Spring, make sure that your Android, iOS and JS  libraries support STOMP.

2)  The Spring websocket library by default also supports SockJS as a fall-back for web JS clients. If your use-case only entails supporting Android and iOS clients, then disable SockJS in your Spring configuration. A use-case may work over SockJS on a web client but fail in native mobile code.

3) The default SockJS implementation of Spring sends a server heartbeat frame (char 'h') every 25 seconds to the clients. Hence there is no timeout on the socket connection for JS clients. But on mobile apps (pure STOMP), there is no heartbeat configured by default. Hence we have to explicitly set the heartbeat on the server OR on the client to keep the connection alive when idle. Otherwise, idle connections get dropped after 1 minute. Reference link: https://stackoverflow.com/questions/28841505/spring-4-stomp-websockets-heartbeat
Sample server side code below.

4) One Android websocket library that works very well with Spring websockets is https://github.com/NaikSoftware/StompProtocolAndroid. But unfortunately, this library does not support heartbeats. Hence you have to explicitly send heartbeats using sample code like this - https://github.com/NaikSoftware/StompProtocolAndroid/issues/18.

5) On iOS, the following socket library worked very well with Spring websockets - https://github.com/rguldener/WebsocketStompKit. We faced an issue where this library was throwing Array Index Range exceptions on server-side heartbeat messages, but we made small changes in the library to skip processing for heartbeat messages.

6)  It is recommended to give the URL scheme as wss://, although we found that https:// was also working fine. If you have SockJS enabled on Spring, then please append /websocket to the URL; otherwise you would get an exception about an invalid protocol upgrade. Hence clients should connect to /{endpoint}/websocket. https://github.com/rstoyanchev/spring-websocket-portfolio/issues/14

7)  Both libraries also support sending headers during the CONNECT step. Very often, teams send the authorization token in a header during the CONNECT step, which can be used to authenticate the client. To access these headers on the server, we need to read the nativeHeaders map. Sample code - https://github.com/rsparkyc/WebSocketServer/blob/master/src/main/java/hello/ConnectionListener.java
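To make tip 3 a bit more concrete at the protocol level: in STOMP 1.1/1.2 the client advertises heartbeats with a heart-beat:<cx>,<cy> header on the CONNECT frame, the server answers with heart-beat:<sx>,<sy> on CONNECTED, and each direction is negotiated separately. The sketch below builds such a frame and applies the negotiation rule from the STOMP specification. It is plain Java and not the Spring or mobile library API; the class name, method names and host name are made up.

```java
// Frame-level illustration of STOMP heartbeats (not the Spring or mobile client API).
class StompHeartbeat {

    // Builds a STOMP CONNECT frame that advertises heart-beat:<cx>,<cy>.
    // cx = smallest interval (ms) at which the client can send heartbeats, 0 = cannot send.
    // cy = interval (ms) at which the client would like to receive heartbeats, 0 = does not want any.
    static String connectFrame(String host, int cx, int cy) {
        return "CONNECT\n"
                + "accept-version:1.1,1.2\n"
                + "host:" + host + "\n"
                + "heart-beat:" + cx + "," + cy + "\n"
                + "\n"
                + '\u0000'; // every STOMP frame ends with a NUL byte
    }

    // Negotiated interval (ms) for ONE direction; 0 means no heartbeats in that direction.
    static int negotiated(int senderCanSendEvery, int receiverWantsEvery) {
        if (senderCanSendEvery == 0 || receiverWantsEvery == 0) {
            return 0;
        }
        return Math.max(senderCanSendEvery, receiverWantsEvery);
    }

    public static void main(String[] args) {
        System.out.print(connectFrame("example.org", 10000, 10000).replace('\u0000', ' '));
        // Client can send every 10s, server wants to hear from it every 20s.
        System.out.println("client -> server interval: " + negotiated(10000, 20000) + " ms");
        // One side advertises 0, so no heartbeats flow in that direction.
        System.out.println("server -> client interval: " + negotiated(0, 10000) + " ms");
    }
}
```

If either side advertises 0 for a direction, no heartbeats flow that way, which is why an idle pure-STOMP connection can go silent unless heartbeats are configured explicitly.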

  

Tuesday, March 13, 2018

Ruminating on Jackson JSON Parsing

In my previous post, we discussed how we can extract an arbitrary JSON value out of a JSON string. Very often, developers face another error while using Jackson - com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field error.

This happens when your JSON string has an attribute that is not present in your POJO and you are trying to deserialize it. Your POJO might not be interested in these fields or these fields could be optional.

To resolve this error, you have two options:

Option 1:  Disable the error checking on the Jackson Mapper class as follows.
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)

Option 2: If you have access to the POJO object, then you can annotate it as follows
@JsonIgnoreProperties(ignoreUnknown = true)

Saturday, March 10, 2018

Identifying RabbitMQ consumers

One of the challenges my team was facing was to accurately identify the consumers of a RabbitMQ queue. Say you have 5 consumers and you want to kill one of them through the dashboard; you need to know which consumer is which.

Each consumer of RabbitMQ has a tag value that can be used to uniquely identify it. By default, RabbitMQ assigns some random number as the consumer tag, but just by looking at this random string tag there is no way to identify the actual consumer. 

To resolve this, you need to create a ConsumerTagStrategy and associate it with the MessageListenerContainer. Code snippet given below: 

Tuesday, February 27, 2018

Ruminating on BPM vs Case Management

Most BPM tools today support both BPMN (business process modeling notation) as well as CMMN (Case Management Modeling Notation). But when to use what?

It all depends on the process that you want to model. Given below are some tips that can be used to decide whether to model the process as a traditional BPM or a case management solution.

Traditional BPM: 
  • If your process is a predefined and ordered sequence of tasks - e.g. sending out an insurance renewal message, onboarding an employee, etc. 
  • The order of the steps rarely changes - i.e. the process is repeatable.
  • Business users cannot dynamically change the process. The process determines the sequence of events. 
Case Management:
  • When the process does not have a strict ordering of steps - e.g. settling a claim.
  • The process depends on the knowledge worker, who decides the next steps. 
  • External events (submission of documents) determine what next step the knowledge worker will take.  
  • Case management empowers knowledge workers and provides them with access to all the information concerning the case. The knowledge worker then uses his discretion and control to move the case towards the next steps. 
Using the above guidelines, you can model your process using BPMN or CMMN. Business rules can be modeled as DMN (decision modeling notation). 

Monday, February 19, 2018

Adding custom filters to Spring Security

One of my teams was looking for an option for adding filters on the Spring Security OAuth Server.

As we know, the Spring Security OAuth2 Server is a complex mesh of filters that get the job done in implementing all the grant types of the OAuth specification - https://docs.spring.io/spring-security/site/docs/current/reference/html/security-filter-chain.html#filter-ordering

The team wanted to add additional filters to this pipeline of Security filters. There are many ways of achieving this:

Option 1: Create a custom filter by extending the Spring GenericFilterBean class. You can set the order by using the @Order annotation.

Option 2: Register the filter manually in the WebSecurityConfigurerAdapter class using the addFilterAfter/addFilterBefore methods.

Option 3: Set the property "security.filter-order=5" in your application.properties. Now you can add up to 4 custom filters and set their order to 1, 2, 3 or 4.
Another option is to manually set the order (without annotations) using a FilterRegistrationBean in any @Configuration class.

The following 2 blogs helped us explore all options and use the appropriate one.
https://mtyurt.net/post/spring-how-to-insert-a-filter-before-springsecurityfilterchain.html
http://anilkc.me/understanding-spring-security-filter-chain/








Monday, January 29, 2018

Ruminating on the V model of software testing

In the V model of software testing, the fundamental concept is to interweave testing activities into each and every step of the development cycle. Testing is NOT a separate phase in the SDLC, but rather testing activities are carried out right from the start of the requirements phase.

  • During requirements analysis, the UAT test cases are written. In fact, many teams have started using user stories and acceptance criteria as test cases. 
  • System test cases and Performance test cases are written during the Architecture definition and Design phase. 
  • Integration test cases are written during the coding and unit testing phase. 
A good illustration of this is given at http://www.softwaretestinghelp.com/what-is-stlc-v-model/


Monday, January 22, 2018

Ruminating on Netflix Conductor Orchestration

We have been evaluating the Netflix open source Conductor project and are intrigued by the design decisions made in it.

Netflix Conductor can be used for orchestration of microservices. Microservices are typically loosely coupled using pub/sub semantics and leveraging a resilient message broker such as Kafka. But this simple and proven event driven architecture did not suffice for the needs of Netflix.

A good article on the challenges faced by the Netflix team and the genesis of Conductor is available here. Snippets from the article:

"Pub/sub model worked for simplest of the flows, but quickly highlighted some of the issues associated with the approach:

  • Process flows are “embedded” within the code of multiple applications - e.g. If you have a pipeline of publishers and subscribers, it becomes difficult to understand the big picture without proper design documentation. 
  • There is tight coupling and assumptions around input/output, SLAs etc, making it harder to adapt to changing needs  - i.e. How will you monitor that the response for a particular task is completed within a 3 second SLA? If not done within this timeframe, we need to mark this task as a failure. Doing this is very difficult with pub/sub unless we code this ourselves.
  • No easy way to monitor the progress - When did the process complete? At what step did the transaction fail?

To address all the above issues, the Netflix team created Conductor. Conductor servers are also stateless and can be deployed on multiple servers to handle scale and availability needs.

Ruminating on idempotency of REST APIs

Most REST services are stateless - i.e. they do not store state in memory and can be scaled out horizontally.

There is another concept called an 'idempotent' operation. An operation is considered to be idempotent if making the same call with the same input parameters repeatedly leaves the system in the same state as making it once. In other words, the service does not differentiate between one call or a million calls.

Typically all GET, HEAD and OPTIONS calls are safe and idempotent. PUT and DELETE are also considered idempotent: repeated calls can return different responses (e.g. a second DELETE may return a 404), but the state on the server does not change after the first call. A good video describing this is here - http://www.restapitutorial.com/lessons/idempotency.html

POST calls are NOT idempotent, as every call creates a new record. 
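The difference is easy to see with a toy in-memory store. This is only an illustration of the semantics, not a REST framework; the class and its method names are invented.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy in-memory resource store that mimics the state changes behind POST, PUT and DELETE.
class IdempotencyDemo {

    private final Map<Integer, String> records = new LinkedHashMap<>();
    private int nextId = 1;

    // POST: every call creates a new record under a fresh id, so repeating it changes state again.
    int post(String value) {
        int id = nextId++;
        records.put(id, value);
        return id;
    }

    // PUT: create-or-replace at a known id; repeating the same call leaves the same state.
    void put(int id, String value) {
        records.put(id, value);
    }

    // DELETE: true if something was removed (think 200/204), false if it was already gone (think 404).
    boolean delete(int id) {
        return records.remove(id) != null;
    }

    int size() {
        return records.size();
    }

    public static void main(String[] args) {
        IdempotencyDemo store = new IdempotencyDemo();
        store.put(100, "alice");
        store.put(100, "alice");
        System.out.println("after two identical PUTs: " + store.size());
        store.post("bob");
        store.post("bob");
        System.out.println("after two identical POSTs: " + store.size());
        System.out.println("first DELETE: " + store.delete(100) + ", second DELETE: " + store.delete(100));
    }
}
```

Two identical PUTs leave one record, two identical POSTs add two, and the second DELETE gets a different answer without changing anything.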

Wednesday, January 17, 2018

Why GPU computing and Deep Learning are a match made in heaven?

Deep learning is a branch of machine learning that uses multi-layered neural networks for solving a number of challenging AI problems.

GPU (Graphics Processing Unit) architecture is fundamentally different from CPU architecture. A single GPU core is significantly slower than a CPU core, but a single GPU might have thousands of cores while a CPU usually has not more than 12 cores.

Hence any task that can be parallelized over multiple cores is a perfect fit for GPUs. Now it so happens that deep learning algorithms involve a lot of matrix multiplications, which are excellent candidates for parallel processing over the cores of a GPU. Hence GPUs make deep learning algorithms run faster by an order of magnitude. Even training of models is much faster and hence you can expedite the GTM of your AI solutions.

A good example of how GPUs are better for AI solutions is AlexNet, a well-known image classification deep network. A modern GPU costing about $1000 takes 2.5 days to fully train AlexNet on the very large ImageNet dataset. On the other hand, it takes a CPU costing several thousand dollars nearly 43 days.

A good video demonstrating the difference between GPU and CPU is here:


Timeout for REST calls

Quite often, we need a REST API to respond within a specified time frame, say 1500 ms.
Implementing this using the excellent Spring RestTemplate is very simple. A good blog explaining how to use RestTemplate is available here - http://www.baeldung.com/rest-template
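As a JDK-only illustration of the idea (this is not the RestTemplate configuration itself), the sketch below sets both the connect and the read timeout on an HttpURLConnection. The class name, the localhost URL and the 1500 ms value are just examples.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URI;

class TimeoutClient {

    // Prepares (but does not yet send) a GET request with connect and read timeouts in milliseconds.
    static HttpURLConnection open(String url, int timeoutMillis) {
        try {
            HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
            conn.setConnectTimeout(timeoutMillis); // max time to establish the TCP connection
            conn.setReadTimeout(timeoutMillis);    // max time to wait for data once connected
            conn.setRequestMethod("GET");
            return conn;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical endpoint; nothing is sent until getResponseCode() or getInputStream() is called.
        HttpURLConnection conn = open("http://localhost:8080/ping", 1500);
        System.out.println("connect timeout = " + conn.getConnectTimeout() + " ms");
        System.out.println("read timeout = " + conn.getReadTimeout() + " ms");
    }
}
```

Note that the two timeouts apply separately, so the worst-case wait can be longer than a single timeout value. If a timeout is hit, the call fails with a java.net.SocketTimeoutException.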

   

Monday, January 08, 2018

The curious case of Java Heap Memory settings on Docker containers

On docker containers, developers often come across OOM (out-of-memory) errors and get baffled because most of their applications are stateless applications such as REST services.

So why do docker containers throw OOM errors even when there are no long-living objects in memory and all short-lived objects should be garbage collected?
The answer lies in how the JVM calculates the max heap size that should be allocated to the Java process.

By default, the JVM assigns 1/4 of the total physical memory as the max heap size for the Java runtime. But this is the physical memory of the server (or VM) and not of the docker container.
So let's say your server has a memory of 16 GB on which you are running 8 docker containers with each container configured for 2 GB. But your JVM has no understanding of the docker max memory size. The JVM will allocate 1/4 of 16 GB = 4 GB as the max heap size. Hence garbage collection MAY not run unless the full heap is utilized and your JVM memory may go beyond 2 GB.
When this happens, docker or Kubernetes would kill your JVM process with an OOM error.

You can simulate the above scenario and print the Java heap memory stats to understand the issue. Print the values of runtime.freeMemory(), runtime.maxMemory() and runtime.totalMemory() to understand the patterns of memory allocation and garbage collection.
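A minimal sketch for printing these values is given below (the class name is arbitrary). Running it inside a container without any -Xmx setting shows what max heap the JVM has picked for itself.

```java
// Prints the heap figures exposed by java.lang.Runtime, in megabytes.
class HeapStats {

    static String describe() {
        Runtime rt = Runtime.getRuntime();
        long mb = 1024 * 1024;
        long max = rt.maxMemory();     // upper bound the heap may grow to (-Xmx or the JVM default)
        long total = rt.totalMemory(); // heap currently reserved by the JVM
        long free = rt.freeMemory();   // unused part of the reserved heap
        long used = total - free;
        return String.format("max=%d MB, total=%d MB, free=%d MB, used=%d MB",
                max / mb, total / mb, free / mb, used / mb);
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```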

The diagram below gives a good illustration of the memory stats of JVM.



So how do we solve the problem? There is a very simple solution - just explicitly set the max heap size of the JVM when launching the program - e.g. java -Xmx2000m -jar myjar.jar
This will ensure that the garbage collection runs appropriately and an OOM error does not occur.
A good article with more details on this is available here.

Also, it is important to understand that the total memory consumed by a Java process (called the Resident Set Size, RSS) is equal to the heap size + PermGen size (Metaspace since Java 8) + native memory (required for thread stacks, file pointers). If you have a large number of threads, then native memory consumption can also be high (No. of threads * -Xss).

Max memory = [-Xmx] + [-XX:MaxPermSize or -XX:MaxMetaspaceSize] + [number_of_threads * (-Xss)] 
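Plugging some numbers into this formula shows why the heap setting alone is not the whole story. The figures below (1400 MB heap, 256 MB metaspace, 200 threads with 1 MB stacks) are hypothetical, and the result is only a rough lower bound since other native allocations are not counted.

```java
// Rough memory budget: heap + metaspace + thread stacks, all in megabytes.
class MemoryBudget {

    static long estimateMb(long heapMb, long metaspaceMb, int threads, long stackMbPerThread) {
        return heapMb + metaspaceMb + threads * stackMbPerThread;
    }

    public static void main(String[] args) {
        // e.g. -Xmx1400m, -XX:MaxMetaspaceSize=256m, 200 threads with -Xss1m
        System.out.println("Estimated footprint: " + estimateMb(1400, 256, 200, 1) + " MB");
    }
}
```

So for a container limited to 2 GB, the heap has to be set well below 2 GB to leave room for the non-heap parts.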

Since JDK 8u131, you can also utilize the following -XX parameters:
-XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap //This lets the JVM derive its default max heap size from the CGroup memory limit set on the Linux kernel
-XX:MaxRAM=<size> //This tells the JVM how much memory to assume is available when it sizes the default heap; it is not a hard cap on on-heap plus off-heap memory.

In Java 10, a lot of changes have been made to the JDK to make it container friendly and you would not need to specify any parameters.

Other articles worth perusing are:
http://trustmeiamadeveloper.com/2016/03/18/where-is-my-memory-java/
https://developers.redhat.com/blog/2017/04/04/openjdk-and-containers/
https://dzone.com/articles/how-to-decrease-jvm-memory-consumption-in-docker-u
https://plumbr.io/blog/memory-leaks/why-does-my-java-process-consume-more-memory-than-xmx
http://stas-blogspot.blogspot.com/2011/07/most-complete-list-of-xx-options-for.html

If you wish to check all the -XX options for a JVM, then you can run the following command.
java -XX:+UnlockDiagnosticVMOptions -XX:+PrintFlagsFinal -version

If you have a debug build of Java, then you can also try:
java -XX:+UnlockDiagnosticVMOptions -XX:+UnlockExperimentalVMOptions -XX:+PrintFlagsFinal -XX:+PrintFlagsWithComments -version

There are also tools that you can utilize to understand the best memory options to configure, such as the Cloud Foundry Java Buildpack Memory Calculator.
https://github.com/cloudfoundry/java-buildpack-memory-calculator
https://github.com/dsyer/spring-boot-memory-blog/blob/master/cf.md