Tag: Java

Reactive Programming in a Nutshell

In this guest post, Peter Verhas attempts to explain reactive programming with simple yet effective examples, which drive home the concept of reactive programming.

Reactive programming is a paradigm that focuses more on where the data flows during computation than on how to compute the result. It fits problems that can be described as several computations that depend on one another's output, some of which can nevertheless be executed independently of the others. As a simple example, consider a computation that calculates the value of h from some given b, c, e, and f values, using f1, f2, f3, f4, and f5 as simple computational steps.

If we write these in Java in a conventional way, the methods f1 to f5 will be invoked one after the other. If we have multiple processors and we are able to make the execution parallel, we may also perform some of the methods in parallel. This, of course, assumes that these methods are purely computational methods and do not change the state of the environment, and, in this way, they can be executed independently of one another. For example, f1, f2, and f3 can be executed independently of one another. The execution of the f4 function depends on the output of f3, and the execution of f5 depends on the output of f1, f2, and f4.

If we have two processors, we can execute f1 and f2 together, followed by the execution of f3, then f4, and, finally, f5. That takes four steps. If we look at the preceding calculation not as commands but rather as expressions and how the calculations depend on one another, then we do not dictate the actual execution order, and the environment may decide to calculate f1 and f3 together, then f2 and f4, and, finally, f5, saving one step. This way, we can concentrate on the data flow and let the reactive environment act upon it without putting in extra constraints.
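The data-flow view can be sketched in Java with CompletableFuture. This is only an illustration under assumptions: the bodies of f1 to f5 and the way b, c, e, and f are distributed among them are invented here; only the dependencies (f4 on f3, and f5 on f1, f2, and f4) come from the description above.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical example: the arithmetic inside f1..f5 is a placeholder.
final class DataFlowSketch {
    static int f1(int b, int c) { return b + c; }
    static int f2(int e, int f) { return e * f; }
    static int f3(int e, int c) { return e - c; }
    static int f4(int k) { return k * 2; }
    static int f5(int a, int d, int g) { return a + d + g; }

    // Only the dependencies are declared; the executor decides the actual order.
    static int computeH(int b, int c, int e, int f) {
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> f1(b, c));
        CompletableFuture<Integer> d = CompletableFuture.supplyAsync(() -> f2(e, f));
        CompletableFuture<Integer> k = CompletableFuture.supplyAsync(() -> f3(e, c));
        // f4 can start as soon as f3 has delivered its result.
        CompletableFuture<Integer> g = k.thenApplyAsync(DataFlowSketch::f4);
        // f5 needs the results of f1, f2, and f4.
        return a.thenCombine(d, (av, dv) -> new int[] {av, dv})
                .thenCombine(g, (ad, gv) -> f5(ad[0], ad[1], gv))
                .join();
    }

    public static void main(String[] args) {
        System.out.println(computeH(1, 2, 3, 4));
    }
}
```

Nothing in computeH says whether f1 runs before f3 or the other way around; that decision is left to the thread pool, which is the point of describing the calculation as expressions.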

This is a very simple approach to reactive programming. The description of the calculation in the form of expressions gives the data flow, but in the explanation, we still assumed that the calculation is executed synchronously. If the calculations are executed on different processors located on different machines connected to a network, then the calculation may not and does not need to be synchronous.

Reactive programs can be asynchronously executed if the environment is asynchronous. It may happen that the different calculations, f1 to f4, are implemented and deployed on different machines.

In such a case, the values calculated are sent from one to the other over the network, and the nodes execute the calculation every time there is a change in the inputs. This is very similar to good old analog computers that were created using simple building blocks, and the calculations were done using analog signals.

The program was implemented as an electronic circuit, and when the input voltage or current (usually voltage) changed in the inputs, the analog circuits followed it at light speed, and the result appeared in the output. In such a case, the signal propagation was limited by the speed of light on the wires and analog circuitry speed in the wired modules, which was extremely fast and may beat digital computers.

When we talk about digital computers, the propagation of the signal is digital, and this way, it needs to be sent from one calculation node to the other one, be it some object in the JVM or some program on the network. A node has to execute its calculation if either of the following applies:

  • Some of the values in the input have changed
  • The output of the calculation is needed

If the input has not changed, then the result should eventually be the same as the last time; thus, the calculation does not need to be executed again—it would be a waste of resources. If the result of the calculation is not needed, then there is no need to perform the calculation, even if the result would not be the same as the last one. No one cares.

To accommodate this, reactive environments implement two approaches to propagate the values. The nodes may pull the values from the output of other modules. This will ensure that no calculation that is not needed will be executed. The modules may push their output to the next module that depends on them. This approach will ensure that only changed values ignite calculation. Some of the environments may implement a hybrid solution.
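The difference between the two approaches can be sketched with two small node types. The class names, the doubling calculation, and the counters are hypothetical and exist only to make the behavior visible; real reactive libraries offer far richer abstractions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;

final class PushPullSketch {
    // Pull: the calculation runs only when somebody asks for the output.
    static final class PullNode {
        private final IntSupplier upstream;
        int evaluations = 0;
        PullNode(IntSupplier upstream) { this.upstream = upstream; }
        int get() {
            evaluations++;
            return upstream.getAsInt() * 2;
        }
    }

    // Push: the calculation runs only when the input actually changes.
    static final class PushNode {
        private final List<IntConsumer> subscribers = new ArrayList<>();
        private int last;
        private boolean hasValue = false;
        int evaluations = 0;
        void subscribe(IntConsumer subscriber) { subscribers.add(subscriber); }
        void set(int value) {
            if (hasValue && value == last) {
                return; // unchanged input, nothing to recalculate
            }
            last = value;
            hasValue = true;
            evaluations++;
            for (IntConsumer s : subscribers) {
                s.accept(value * 2);
            }
        }
    }

    public static void main(String[] args) {
        PullNode pull = new PullNode(() -> 21);
        System.out.println("pull evaluations before get: " + pull.evaluations);
        System.out.println("pulled value: " + pull.get());

        PushNode push = new PushNode();
        push.subscribe(v -> System.out.println("pushed value: " + v));
        push.set(1);
        push.set(1); // ignored, the input did not change
        push.set(2);
    }
}
```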

When values change in the system, the change is propagated toward the other nodes that again propagate the changes to another node, and so on. If we imagine the calculation dependencies as a directed graph, then the changes travel towards the transitive closure of the changed values along the nodes connected.

The data may travel with all the values from one node output to the other node input, or only the change may travel. The second approach is more complex because it needs the changed data and also meta information that describes what has changed. On the other hand, the gain may be significant when the output and input set of data is huge, and only a small portion of it is changed.

It may also be important to calculate and propagate only the actual delta of the change when there is a high probability that some of the nodes do not change the output for many of the different inputs. In such a case, the change propagation may stop at the node where there is no real change in spite of the changed input values. This can save a lot of calculation in some of the networks.
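As a rough sketch of that idea, the hypothetical DeltaNode below forwards its output downstream only when the output differs from the previous one. The name and the integer-only interface are assumptions made for brevity.

```java
import java.util.function.IntConsumer;
import java.util.function.IntUnaryOperator;

final class DeltaNode {
    private final IntUnaryOperator function;
    private final IntConsumer downstream;
    private Integer lastOutput = null; // null until the first value arrives

    DeltaNode(IntUnaryOperator function, IntConsumer downstream) {
        this.function = function;
        this.downstream = downstream;
    }

    // Returns true if the change was propagated to the downstream node.
    boolean onInput(int input) {
        int output = function.applyAsInt(input);
        if (lastOutput != null && lastOutput == output) {
            return false; // the input changed, the output did not: stop here
        }
        lastOutput = output;
        downstream.accept(output);
        return true;
    }

    public static void main(String[] args) {
        // The node maps its input to a bucket of ten, so many inputs share one output.
        DeltaNode node = new DeltaNode(x -> x / 10, v -> System.out.println("bucket " + v));
        for (int input : new int[] {11, 12, 19, 20}) {
            System.out.println(input + " propagated: " + node.onInput(input));
        }
    }
}
```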

In the configuration of the data propagation, the directed acyclic graph can be expressed in the code of the program; it can be configured, or it can even be set up and changed during the execution of the code dynamically. When the program code contains the structure of the graph, the routes and the dependencies are fairly static.

To change the data propagation, the code of the program has to be changed, recompiled, and deployed. If there are multiple network node programs, this may even need multiple deployments that should be carefully furnished to avoid different incompatible versions running on different nodes.

There should be similar considerations when the graph is described in some configuration. In such a case, the compilation of the program(s) may not be needed when only the wiring of the graph is changed, but the burden to have compatible configuration on different nodes in the case of a network execution is still there.

Letting the graph change dynamically also does not solve this problem. The setup and the structure are more flexible and, at the same time, more complex. The data propagated along the edges of the graph may contain not only computational data but also data that drives changes in the graph. Many times, this leads to a very flexible model called higher-order reactive programming.

Reactive programming has a lot of benefits, but, at the same time, it may be very complex, sometimes too complex, for simple problems. It is to be considered when the problem to be solved can easily be described using a data graph and simple data propagations. We can separate the description of the problem and the order of the execution of the different blocks. This is the same consideration that we discussed in the previous chapter. We describe more of the "what to do" part and less of the "how to do it" part.

On the other hand, when the reactive system decides the order of execution, what is changed, and how that should be reflected on the output of other blocks, it should do so without knowing the core of the problem that it is solving. In some situations, coding the execution order manually based on the original problem could perform better.

Note

This is similar to the memory management issue. In modern runtime environments, such as the JVM, the Python runtime, Swift, or even Golang, there is some automated memory management. When programming in C, the programmer has full control over memory allocation and memory release.

In the case of real-time applications, where performance and response time are of the utmost importance, there is no way to let an automated garbage collector take time and delay the execution from time to time. In such a case, the C code can be optimized to allocate memory when it is needed and resources are available, and to release it when possible and when there is time to manage memory.

These programs perform better than the ones created for the same purpose using a garbage collector. Still, we do not use C in most applications because we can afford the extra resources needed for automated memory collection. Even though it would be possible to write faster code by managing the memory manually, code that relies on automated memory management is faster than what an average programmer would create in C, and the frequency of programming errors is also much lower.

Just as there are some issues that we have to pay attention to when using automated memory management, we have to pay attention to some issues in a reactive environment, which would not exist in the case of manual coding. Still, we use the reactive approach for its benefits.

The most important issue is to avoid loops in the dependency graph. Although it is perfectly valid to write such definitions of calculations, a reactive system would probably not be able to cope with them. Some reactive systems may resolve simple cases of cyclic dependency, but that is an extra feature, and we generally just have to avoid it. Consider two computations in which a is defined in terms of b, and b is defined in terms of a.

Here, a depends on b, so when b changes, a is calculated. However, b also depends on a, which is recalculated, and, in this way, the system gets into an infinite loop. The preceding example seems to be simple, but that is the feature of a good example. Real-life problems are not simple, and in a distributed environment, it is sometimes extremely hard to find a cyclic dependency.
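When the dependencies are known in one place, looking for such loops can be automated before the nodes are wired together. The following is a minimal sketch, assuming the graph is available as a map from each value to the values it is computed from; the class and method names are hypothetical, and a distributed setup would first have to collect this map.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class CycleCheck {
    // deps maps each value to the values it is computed from.
    static boolean hasCycle(Map<String, List<String>> deps) {
        Set<String> done = new HashSet<>();
        Set<String> inProgress = new HashSet<>();
        for (String node : deps.keySet()) {
            if (visit(node, deps, done, inProgress)) {
                return true;
            }
        }
        return false;
    }

    // Depth-first search; meeting a node that is still being explored means a loop.
    private static boolean visit(String node, Map<String, List<String>> deps,
                                 Set<String> done, Set<String> inProgress) {
        if (done.contains(node)) {
            return false;
        }
        if (!inProgress.add(node)) {
            return true;
        }
        for (String dependency : deps.getOrDefault(node, List.of())) {
            if (visit(dependency, deps, done, inProgress)) {
                return true;
            }
        }
        inProgress.remove(node);
        done.add(node);
        return false;
    }

    public static void main(String[] args) {
        System.out.println(hasCycle(Map.of("a", List.of("b"), "b", List.of("a"))));
        System.out.println(hasCycle(Map.of("a", List.of("b"), "q", List.of("a", "b"))));
    }
}
```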

Another problem is called a glitch. Consider the following definitions: a = b + 3 and q = b + a.

When the parameter b is changed, for example, from 3 to 6, the value of a will change from 6 to 9, and, thus, q will change from 9 to 15. This is very simple. However, the execution order based on the recognition of the changes may first alter the value of q from 9 to 12 before modifying it to 15 in the second step.

This can happen if the calculating node responsible for the calculation of q recognizes the change in b before it recognizes the change in a that follows from the change in b. For a short period of time, the value of q will be 12, which matches neither the previous state nor the changed state. This value is only a glitch in the system: it appears after an input changes and disappears again without any further change in the input.
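The effect can be reproduced with a small, single-threaded simulation. The sketch below assumes the definitions a = b + 3 and q = b + a, which reproduce the values 6, 9, 12, and 15 mentioned above; the method name and the boolean switch that selects the update order are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class GlitchSketch {
    // Returns every value q takes after b changes from oldB to newB.
    static List<Integer> observeQ(int oldB, int newB, boolean updateAFirst) {
        int b = oldB;
        int a = b + 3; // state before the change
        List<Integer> seen = new ArrayList<>();
        b = newB;
        if (updateAFirst) {
            a = b + 3;       // a is refreshed before q looks at it
            seen.add(b + a);
        } else {
            seen.add(b + a); // q reacts to b while a is still stale
            a = b + 3;
            seen.add(b + a); // q reacts again, now to the new a
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("q node first: " + observeQ(3, 6, false));
        System.out.println("a node first: " + observeQ(3, 6, true));
    }
}
```

Updating the nodes in dependency order, a before q, is one way to keep the intermediate value from ever becoming visible.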

If you have ever learned the design of logical circuits, then static hazards may ring a bell. They are exactly the same phenomenon.

Reactive programming also assumes that the calculations are stateless. The individual nodes that perform the calculation may have a state in practice and, in most cases, they do. It is not inherently evil to have a state in some calculation. However, debugging something that has a state is significantly more complex than debugging something that is stateless, and functional.

It is also an important aid to the reactive environment, letting it perform different optimizations based on the fact that the calculations are functional. If the nodes have a state, then the calculations may not be rearranged freely because the outcome may depend on the actual evaluation order. These systems may not really be reactive, or, at least, this may be debated.

If this article piqued your interest in reactive programming and Java in general, you can explore Peter Verhas’s Java Projects – Second Edition to learn the fundamentals of Java 11 programming by building industry grade practical projects. Following a learn-as-you-do approach, Java Projects – Second Edition is perfect for anyone who wants to learn the Java programming language (no prior programming experience required).

How to Use the jcmd Command for the JVM

This article focuses on jcmd, a command-line utility for sending diagnostic commands to a running JVM, and on the diagnostic commands that Java 9 added to it. If the bin folder of the Java installation is on the path, you can invoke it by typing jcmd on the command line. Otherwise, you have to go to the bin directory or prepend jcmd in your examples with the full or relative (to the location of your command-line window) path to the bin folder.

If you open the bin folder of the Java installation, you can find quite a few command-line utilities there. These can be used to diagnose issues and monitor an application deployed with the Java Runtime Environment (JRE). They use different mechanisms to get the data they report. The mechanisms are specific to the Virtual Machine (VM) implementation, operating systems, and release. Typically, only a subset of these tools is applicable to a given issue.

If you run jcmd without arguments and no other Java process is currently running on the machine, you’ll get back only one line, as follows:

This shows that only one Java process is currently running (the jcmd utility itself) and it has the process identifier (PID) of 87863 (which will be different with each run).

Now run a Java program, for example:

The output of jcmd will show (with different PIDs) the following:

If entered without any options, the jcmd utility reports the PIDs of all the currently running Java processes. After getting the PID, you can then use jcmd to request data from the JVM that runs the process:

Alternatively, you can avoid using PID (and calling jcmd without parameters) by referring to the process by the main class of the application:

You can read the JVM documentation for more details about the jcmd utility and how to use it.
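If you would rather trigger jcmd from Java code than from a terminal, the invocation can be sketched as follows. The JcmdSketch class is hypothetical, and the sketch assumes that jcmd is on the PATH and that Java 9 or later is used (for ProcessHandle); VM.version is used because it is a harmless command that only prints version information.

```java
import java.io.IOException;
import java.util.List;

final class JcmdSketch {
    // Builds the command line "jcmd <pid> <diagnostic command>".
    static List<String> command(long pid, String diagnosticCommand) {
        return List.of("jcmd", Long.toString(pid), diagnosticCommand);
    }

    public static void main(String[] args) throws InterruptedException {
        long pid = ProcessHandle.current().pid(); // PID of the JVM running this class
        try {
            Process process = new ProcessBuilder(command(pid, "VM.version"))
                    .inheritIO() // show the jcmd output in this console
                    .start();
            process.waitFor();
        } catch (IOException e) {
            System.err.println("Could not start jcmd, is it on the PATH? " + e.getMessage());
        }
    }
}
```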

How to do it…

jcmd is a utility that allows you to issue commands to a specified Java process:

  1. Get the full list of the jcmd commands available for a particular Java process by executing the following line:

     

Instead of PID/main-class, enter the process identifier or the main class name. The list is specific to JVM, so each listed command requests the data from the specific process.

  2. In JDK 8, the following jcmd commands were available:

JDK 9 introduced the following jcmd commands (JDK 18.3 and JDK 18.9 did not add any new commands):

  • Compiler.queue: Prints the methods queued for compilation with either C1 or C2 (separate queues)
  • Compiler.codelist: Prints n-methods (compiled) with full signature, address range, and state (alive, non-entrant, and zombie), and allows the selection of printing to stdout, a file, XML, or text printout
  • Compiler.codecache: Prints the content of the code cache, where the JIT compiler stores the generated native code to improve performance
  • Compiler.directives_add file: Adds compiler directives from a file to the top of the directives stack
  • Compiler.directives_clear: Clears the compiler directives stack (leaves the default directives only)
  • Compiler.directives_print: Prints all the directives on the compiler directives stack from top to bottom
  • Compiler.directives_remove: Removes the top directive from the compiler directives stack
  • GC.heap_info: Prints the current heap parameters and status
  • GC.finalizer_info: Shows the status of the finalizer thread, which collects objects with a finalizer (that is, a finalize() method)
  • JFR.configure: Allows configuring the Java Flight Recorder
  • JVMTI.data_dump: Prints the Java Virtual Machine Tool Interface data dump
  • JVMTI.agent_load: Loads (attaches) the Java Virtual Machine Tool Interface agent
  • ManagementAgent.status: Prints the status of the remote JMX agent
  • Thread.print: Prints all the threads with stack traces
  • VM.log [option]: Allows setting the JVM log configuration at runtime, after the JVM has started (the availability can be seen using VM.log list)
  • VM.info: Prints the unified JVM info (version and configuration), a list of all threads and their state (without thread dump and heap dump), heap summary, JVM internal events (GC, JIT, safepoint, and so on), memory map with loaded native libraries, VM arguments and environment variables, and details of the operating system and hardware
  • VM.dynlibs: Prints information about dynamic libraries
  • VM.set_flag: Allows setting the JVM writable (also called manageable) flags
  • VM.stringtable and VM.symboltable: Print all UTF-8 string constants
  • VM.class_hierarchy [full-class-name]: Prints all the loaded classes or just a specified class hierarchy
  • VM.classloader_stats: Prints information about the classloader
  • VM.print_touched_methods: Prints all the methods that have been touched (have been read at least) at runtime

As you can see, these new commands belong to several groups, denoted by the prefix compiler, garbage collector (GC), Java Flight Recorder (JFR), Java Virtual Machine Tool Interface (JVMTI), Management Agent (related to remote JMX agent), thread, and VM.

How it works…

  1. To get help for the jcmd utility, run the following command:

Here is the result of the command:

It tells you that the commands can also be read from the file specified after -f, and there is a PerfCounter.print command, which prints all the performance counters (statistics) of the process.

  2. Run the following command:

The output may look similar to this screenshot:

It shows the total heap size and how much of it was used, the size of a region in the young generation and how many regions are allocated, and the parameters of Metaspace and class space.

  3. The following command is very helpful in case you are looking for runaway threads or would like to know what else is going on behind the scenes:

Here is a fragment of the possible output:

  4. This command is probably used most often, as it produces a wealth of information about the hardware, the JVM process as a whole, and the current state of its components:

It starts with a summary, as follows:

The general process description is as follows:

Then the details of the heap are shown (this is only a tiny fragment of it):

It then prints the compilation events, GC heap history, de-optimization events, internal exceptions, events, dynamic libraries, logging options, environment variables, VM arguments, and many parameters of the system running the process.

The jcmd commands give a deep insight into the JVM process, which helps to debug and tune the process for best performance and optimal resource usage.

If you found this article interesting, you can dive into Java 11 Cookbook – Second Edition to explore the new features added to Java 11 that will make your application modular, secure, and fast. Java 11 Cookbook – Second Edition offers a range of software development solutions with simple and straightforward Java 11 code examples to help you build a modern software system.

How to Develop a Real-Time Object Detection Project

Developing a real-time object detection project

You can develop a video object classification application using pre-trained YOLO models (that is, transfer learning), Deeplearning4j (DL4J), and OpenCV that can detect labels such as cars and trees inside a video frame. You can find the relevant code files for this tutorial at https://github.com/PacktPublishing/Java-Deep-Learning-Projects/tree/master/Chapter06. This application is also about extending an image detection problem to video detection. Time to get started!

Step 1 – Loading a pre-trained YOLO model

Since the 1.0.0-alpha release, DL4J has provided a Tiny YOLO model via its model zoo. For this, you need to add a dependency to your Maven pom.xml file:

Apart from this, if possible, make sure that you use CUDA and cuDNN by adding the following dependencies:

Now, use the code below to load the pre-trained Tiny YOLO model as a computation graph. You can use the PASCAL Visual Object Classes (PASCAL VOC) dataset (see more at http://host.robots.ox.ac.uk/pascal/VOC/) to train the YOLO model.

In the above code segment, the createObjectLabels() method refers to the labels from the PASCAL Visual Object Classes (PASCAL VOC) dataset. The signature of the method can be seen as follows:

Now, create a Tiny YOLO model instance:

Take a look at the model architecture and the number of parameters in each layer:

Network summary and layer structure of a pre-trained Tiny YOLO model

Your Tiny YOLO model has around 1.6 million parameters across its 29-layer network. However, the original YOLO 2 model has more layers. You can look at the original YOLO 2 at https://github.com/yhcc/yolo2/blob/master/model_data/model.png.

Step 2 – Generating frames from video clips

To deal with real-time video, you can use video processing tools or frameworks such as JavaCV that can split a video into individual frames. Take the image height and width. For this, include the following dependency in the pom.xml file:

JavaCV uses wrappers from the JavaCPP presets of libraries commonly used by researchers in the field of computer vision (for example, OpenCV and FFmpeg). It provides utility classes to make their functionality easier to use on the Java platform, including Android.

For this project, there are two video clips (each 1 minute long) that should give you a glimpse into an autonomous driving car. This dataset has been downloaded from the following YouTube links:

After downloading them, they were renamed as follows:

  • SelfDrivingCar_Night.mp4
  • SelfDrivingCar_Day.mp4

When you play these clips, you’ll see how Germans drive their cars at 160 km/h or even faster. Now, parse the video (use the day clip first) and look at some of its properties to get an idea of the video quality and the hardware requirements:

The inputted video clip has 1802 frames. The inputted video clip has frame rate of 29.97002997002997.

Now grab each frame and use Java2DFrameConverter to convert frames to JPEG images:

The above code will generate 1,802 JPEG images against an equal number of frames. Take a look at the generated images:

From video clip to video frame to image

Thus, the 1-minute-long video clip has a fair number of frames (about 1,800) and runs at roughly 30 frames per second. In short, this video clip has 720p video quality. So, processing this video requires good hardware; in particular, having a GPU configured should help.

Step 3 – Feeding generated frames into the Tiny YOLO model

Now that you know some properties of the clip, start generating the frames to be passed to the Tiny YOLO pre-trained model. First, look at a less important but transparent approach:

In the above code, you send each frame to the model. Then, you use the Mat class to represent each frame in an n-dimensional, dense, numerical multi-channel (that is, RGB) array.

In other words, you split the video clip into multiple frames and pass them to the Tiny YOLO model, which processes them one by one. This way, you apply a single neural network to the full image.

Step 4 – Real Object detection from image frames

Tiny YOLO extracts the features from each frame as an n-dimensional, dense, numerical multi-channel array. Then, each image is split into a smaller number of rectangles (boxes):

In the above code, the prepareImage() method takes video frames as images, parses them using the NativeImageLoader class, does the necessary preprocessing, and extracts image features that are further converted into an INDArray format, consumable by the model:

Then, the markWithBoundingBox() method is used for non-max suppression in the case of more than one bounding box.

Step 5 – Non-max suppression in case of more than one bounding box

As YOLO predicts more than one bounding box per object, non-max suppression is implemented; it merges all detections that belong to the same object. Therefore, instead of using bx, by, bh, and bw, you can use the top-left and bottom-right points. gridWidth and gridHeight are the number of small boxes you split your image into. In this case, it is 13 x 13, where w and h are the original image frame dimensions:

Finally, remove the detections that overlap the detection with the highest confidence, as follows:
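Independently of the project's own listing, the general idea of greedy non-max suppression can be sketched in plain Java. This is not the DL4J-based code of the project: the Box class, the fromCenter() helper, and the 0.5 overlap threshold used in main() are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class NmsSketch {
    // A detection described by its top-left and bottom-right corners.
    static final class Box {
        final double x1, y1, x2, y2, confidence;
        Box(double x1, double y1, double x2, double y2, double confidence) {
            this.x1 = x1; this.y1 = y1; this.x2 = x2; this.y2 = y2;
            this.confidence = confidence;
        }
    }

    // Converts a center/size description (bx, by, bw, bh) into corner points.
    static Box fromCenter(double bx, double by, double bw, double bh, double confidence) {
        return new Box(bx - bw / 2, by - bh / 2, bx + bw / 2, by + bh / 2, confidence);
    }

    static double area(Box b) {
        return Math.max(0, b.x2 - b.x1) * Math.max(0, b.y2 - b.y1);
    }

    // Intersection over union: 0 for disjoint boxes, 1 for identical ones.
    static double iou(Box a, Box b) {
        double w = Math.max(0, Math.min(a.x2, b.x2) - Math.max(a.x1, b.x1));
        double h = Math.max(0, Math.min(a.y2, b.y2) - Math.max(a.y1, b.y1));
        double intersection = w * h;
        double union = area(a) + area(b) - intersection;
        return union <= 0 ? 0 : intersection / union;
    }

    // Keeps the most confident box, drops everything that overlaps it too much, repeats.
    static List<Box> suppress(List<Box> boxes, double iouThreshold) {
        List<Box> remaining = new ArrayList<>(boxes);
        remaining.sort(Comparator.comparingDouble((Box b) -> b.confidence).reversed());
        List<Box> kept = new ArrayList<>();
        while (!remaining.isEmpty()) {
            Box best = remaining.remove(0);
            kept.add(best);
            remaining.removeIf(other -> iou(best, other) > iouThreshold);
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Box> boxes = List.of(
                new Box(0, 0, 10, 10, 0.9),
                new Box(1, 1, 11, 11, 0.8),    // overlaps the first box heavily
                new Box(20, 20, 30, 30, 0.7)); // a separate object
        System.out.println("kept: " + suppress(boxes, 0.5).size());
    }
}
```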

In the second block, you scaled each image into 416 x 416 x 3 (that is, W x H x 3 RGB channels). This scaled image is then passed to Tiny YOLO for predicting and marking the bounding boxes as follows:

Your Tiny YOLO model predicts the class of an object detected in a bounding box

Once the markObjectWithBoundingBox() method is executed, the following logs containing the predicted class, bx, by, bh, bw, and confidence (that is, the detection threshold) will be generated and shown on the console:

Step 6 – Wrapping up everything and running the application

Up to this point, you know the overall workflow of your approach. You can now wrap up everything and see whether it really works. However, before this, take a look at the functionalities of different Java classes:

  • java: This shows how to grab frames from the video clip and save each frame as a JPEG image. Besides, it also shows some exploratory properties of the video clip.
  • java: This instantiates the Tiny YOLO model and generates the label. It also creates and marks the object with the bounding box. In addition, it shows how to handle non-max suppression for more than one bounding box per object.
  • java: This main class continuously grabs the frames and feeds them to the Tiny YOLO model (until the user presses the Esc key). Then, it predicts the corresponding class of each object successfully detected inside the normal or overlapped bounding boxes with non-max suppression (if required).

In short, first, you create and instantiate the Tiny YOLO model. Then, you grab the frames and treat each frame as a separate JPEG image. Next, you pass all the images to the model and the model does its trick as outlined previously. The whole workflow can now be depicted with some Java code as follows:

Once the preceding class is executed, the application should load the pre-trained model and the UI should be loaded, showing each object being classified:

Your Tiny YOLO model can predict multiple cars simultaneously from a video clip (day)

Now, to see the effectiveness of your model even in night mode, perform a second experiment on the night dataset. To do this, just change one line in the main() method, as follows:

Once the preceding class is executed using this clip, the application should load the pre-trained model and the UI should be loaded, showing each object being classified:

Your Tiny YOLO model can predict multiple cars simultaneously from a video clip (night)

Furthermore, to see the real-time output, play the given screen recording clips showing the output of the application.

If you found this interesting, you can explore Md. Rezaul Karim’s Java Deep Learning Projects to build and deploy powerful neural network models using the latest Java deep learning libraries. Java Deep Learning Projects starts with an overview of deep learning concepts and then delves into advanced projects.

Working with CDI Bean in Java

Creating your CDI bean

A CDI bean is an application component that encapsulates some business logic. Beans can be used either by Java code or by the unified EL (the expression language used in JSP and JSF technologies). The beans’ life cycles are managed by the container, and the beans can be injected into other beans. To define a bean, all you need to do is write a POJO and declare it to be a CDI bean. To declare this, there are two primary approaches:

  • Using annotations
  • Using the xml file

Both ways should work; however, folks prefer using annotations over XML as it’s handy and included in the actual coding context. So, why is XML still there? Well, that’s because annotations are relatively new in Java (released in Java 5). Until they were introduced, there was no other way in Java other than XML to provide configuration information to the application server. And since then, it continued to be just another way, alongside the annotations approach.

Moreover, if both are used together, XML is going to override annotations. Some developers and application administrators tend to perform temporary changes or hot-fixes at times, by overriding some hard-coded programmatic configuration values, using external XML files. It’s worth mentioning that this approach is not a recommended way to actually deploy things into your production.

In this article, you’ll use the annotations approach. Now, start by defining your first CDI bean:

First CDI bean

Perform the below steps:

  1. Defining a CDI bean: Start by creating a new Java class named MyPojo, and then write the following code:

This bean is nothing more than a plain old Java object, annotated with the @Dependent annotation. This annotation declares that your POJO is a CDI component with what is called the dependent scope. The dependent scope tells the CDI context that whenever you request an injection of this bean, a new instance will be created.

  2. Now, inject your new CDI bean into another component. Use a servlet as an example for this. Create a servlet named ExampleServlet and write the following code:

You have used the @Inject annotation to obtain an instance of the MyPojo class. Now run the application and visit http://localhost:8080/EnterpriseApplication1-war/cdi-example-1. You should see a page with the following text:

Congratulations! You have created and used your first CDI bean.

Providing alternative implementations to your bean

One of the greatest features of CDI is that you can provide two or more different implementations to the same bean. This is very useful if you wish to do one of the following:

  • Handle client-specific business logic that is determined at runtime; for example, providing different payment mechanisms for a purchase transaction
  • Support different versions for different deployment scenarios; for example, providing an implementation that handles taxes in the USA and another one for Europe
  • Make test-driven development easier to manage; for example, you can provide a primary implementation for production and another mocked one for testing

To do this, you should first rewrite your bean as an abstract element (abstract class or interface) to provide different implementations according to the basic OOP principles. Now rewrite your bean to be an interface as follows:

Create an implementation class to your new interface:

Now, without any modifications to the servlet class, you can re-run your example; it should give the following output:

What happened at runtime? The container received your request to inject a MyPojo instance. Since the container has detected your annotation over an interface and not a class stuffed with an actual implementation, it has started looking for a concrete class that implements this interface. After this, the container has detected the MyPojoImp class that satisfies this criterion. Therefore, it has instantiated and injected it for you.

Now provide different implementations. For this, you’ll need to create a new class that implements the MyPojo interface. Create a class called AnotherPojoImp as follows:

Seems simple, right? But if you check your servlet code again and put yourself in the container's shoes, how would you be able to determine which implementation should be injected at runtime? If you try to run the previous example, you will end up with the following exception:

You have an ambiguity here, and there should be some means to specify which implementation version should be used at runtime. In CDI, this is achieved using qualifiers.

Using qualifiers

A qualifier is a user-defined annotation that is used to tell the container which version of the bean implementation you wish to use at runtime. The idea of qualifiers is very simple; you define a qualifier, and then you annotate both the bean and the injection point with this qualifier.

Now define your first qualifier for the newly created bean implementation and create a new annotation with the following code:

As you see, the qualifier is a custom-defined annotation, which itself is annotated with the @Qualifier annotation. @Qualifier tells the container that this annotation will act as a qualifier, while @Retention(RUNTIME) tells the JVM that this annotation should be available for reflection use at runtime. This way the container can check for this annotation at runtime. @Target({TYPE, METHOD, FIELD, PARAMETER}) tells the compiler that this annotation can be used on types, methods, fields, and parameters. Note that the @Qualifier annotation is the key annotation here.

Here is the summary of the annotations to make it clearer:

Annotation | Description
@Qualifier | Tells CDI that this annotation is going to be used to distinguish between different implementations of the same interface.
@Retention(RUNTIME) | Tells the JVM that this annotation is intended to be used at runtime. Required for qualifiers.
@Target({TYPE, METHOD, FIELD, PARAMETER}) | Tells the compiler that this annotation can be used on the mentioned syntax elements.

Now, add the @AnotherImp annotation to AnotherPojoImp as follows:

The annotation’s role here is that it tells the container that this version of the class is called AnotherImp. Now you can reference this version by modifying the servlet as follows:


But how can you reference the original implementation MyPojoImp? There are two options available to do this:

  • Defining another qualifier for MyPojoImp, like the earlier example
  • Using the default qualifier

The default qualifier, as the name suggests, is the default one for any CDI bean that has not been explicitly qualified. Although an explicit declaration for the default qualifier is considered redundant and useless, it’s possible to explicitly declare your CDI bean as a default one using the @Default annotation, as shown in the following revision to the MyPojoImp class:

Again, @Default is redundant, but keep in mind that it is implicitly present even when you have not declared it. Now, to reference MyPojoImp from the servlet, rewrite it as follows:

This way, the original MyPojoImp implementation will be injected instead. You can likewise omit the @Default annotation at the injection point, because an unqualified injection point resolves to the default bean.
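To make the resolution rules more concrete, here is a toy model in plain Java. It is only a sketch of the observable behavior (an unqualified request gets the unqualified bean, a qualified request gets the bean carrying that qualifier, and anything other than exactly one match is an error); it is not how a CDI container is implemented. ToyContainer, ToyQualifier, and getMessage() are hypothetical names, and ToyQualifier stands in for javax.inject.Qualifier so that the example needs only the JDK.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for javax.inject.Qualifier.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.ANNOTATION_TYPE)
@interface ToyQualifier {
}

@ToyQualifier
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER})
@interface AnotherImp {
}

interface MyPojo {
    String getMessage(); // illustrative method, not taken from the article
}

// Carries no qualifier, so the toy container treats it as the default bean.
class MyPojoImp implements MyPojo {
    @Override
    public String getMessage() { return "MyPojoImp"; }
}

@AnotherImp
class AnotherPojoImp implements MyPojo {
    @Override
    public String getMessage() { return "AnotherPojoImp"; }
}

class ToyContainer {
    private final List<Class<? extends MyPojo>> beans = new ArrayList<>();

    void register(Class<? extends MyPojo> beanClass) {
        beans.add(beanClass);
    }

    // True when the class carries any annotation that is itself marked @ToyQualifier.
    private static boolean hasQualifier(Class<?> beanClass) {
        for (Annotation annotation : beanClass.getAnnotations()) {
            if (annotation.annotationType().isAnnotationPresent(ToyQualifier.class)) {
                return true;
            }
        }
        return false;
    }

    // A null qualifier models an unqualified (default) injection point.
    MyPojo resolve(Class<? extends Annotation> qualifier) {
        List<Class<? extends MyPojo>> matches = new ArrayList<>();
        for (Class<? extends MyPojo> bean : beans) {
            boolean match = (qualifier == null)
                    ? !hasQualifier(bean)
                    : bean.isAnnotationPresent(qualifier);
            if (match) {
                matches.add(bean);
            }
        }
        if (matches.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one matching bean but found " + matches.size());
        }
        try {
            return matches.get(0).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate bean", e);
        }
    }

    static ToyContainer withBothBeans() {
        ToyContainer container = new ToyContainer();
        container.register(MyPojoImp.class);
        container.register(AnotherPojoImp.class);
        return container;
    }

    public static void main(String[] args) {
        ToyContainer container = withBothBeans();
        System.out.println(container.resolve(null).getMessage());
        System.out.println(container.resolve(AnotherImp.class).getMessage());
    }
}
```

Registering two unqualified implementations and calling resolve(null) makes the toy container throw, which mirrors the ambiguity described earlier.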

If you found this article interesting, you can explore Abdalla Mahmoud’s Developing Middleware in Java EE 8 to use Java features such as JAX-RS, EJBs, and JPAs for building powerful middleware for newer architectures such as the cloud. This book can help you become an expert in developing middleware for a variety of applications.

Building an Event-Driven Reactive Asynchronous System with Spring Boot and Kafka


Spring Boot provides a new strategy for application development with the Spring Framework. It enables you to focus only on the application’s functionality rather than on Spring meta configuration, as Spring Boot requires minimal to zero configuration in the Spring application.

This article shows you how to build a sample real-time streaming application using an event-driven architecture, Spring Cloud Stream, Spring Boot, Apache Kafka, and Spring Netflix Eureka.

Architecture

Here, Netflix Hystrix has been used to implement the circuit breaker pattern and the API Gateway proxy has been configured using Netflix Zuul.

To get started, create an application with three microservices: Account, Customer, and Notification. Whenever you create a customer record or create an account for a customer, the Notification service sends an email and a mobile notification.

All three decoupled services—Account, Customer, and Notification—are independently deployable applications. The Account service can be used to create, read, update, and delete customer accounts. It also sends a message to the Kafka topic when a new account is created.

Similarly, the Customer service is used to create, read, update, and delete a customer in the database. It sends a message to the Kafka topic when a new customer is created. The Notification service listens on the Kafka topics for incoming customer and account messages and processes them by sending email and SMS notifications to the given email address and mobile number.
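The message flow between the three services can be sketched in plain Java. The following is only an in-memory illustration under stated assumptions: InMemoryTopic is a hypothetical stand-in for a Kafka topic (real Kafka adds persistence, partitioning, and network transport), the message strings are invented, and no Spring or Kafka API is involved. It shows the essential idea that publishers return immediately while the listener is invoked asynchronously on another thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a Kafka topic.
class InMemoryTopic {
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();
    private final ExecutorService dispatcher = Executors.newSingleThreadExecutor();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Returns immediately; delivery happens later on the dispatcher thread.
    void publish(String message) {
        dispatcher.submit(() -> subscribers.forEach(s -> s.accept(message)));
    }

    // Stops accepting messages and waits for pending deliveries to finish.
    void close() {
        dispatcher.shutdown();
        try {
            if (!dispatcher.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Pending deliveries did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}

class EventFlowDemo {
    // Publishes one customer event and one account event and
    // returns the notifications produced by the listener side.
    static List<String> run() {
        InMemoryTopic notificationTopic = new InMemoryTopic();
        List<String> sent = new CopyOnWriteArrayList<>();

        // Notification service: reacts to every message on the topic.
        notificationTopic.subscribe(message -> sent.add("EMAIL: " + message));
        notificationTopic.subscribe(message -> sent.add("SMS: " + message));

        // Customer and Account services: publish when a record is created.
        notificationTopic.publish("Customer created");
        notificationTopic.publish("Account created");

        notificationTopic.close();
        return sent;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The publishers never call the Notification service directly; they only know about the topic, which is what keeps the services decoupled.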

The Account and Customer microservices each have their own H2 database, and the Notification service uses MongoDB. In this application, you’ll use the Spring Cloud Stream module to provide abstract messaging mechanisms; it is a framework for building event-driven microservice applications.

This example also uses Netflix Zuul as the edge service for the API Gateway. Zuul is a JVM-based router and is also used as a server-side load balancer by Netflix. Spring integrates closely with Netflix Zuul through the Spring Cloud Netflix Zuul module.

Introducing Spring Cloud Stream

Spring Cloud Stream is a framework for building message-driven microservice applications. It abstracts away the message producer and consumer code from message broker-specific implementations. Spring Cloud Stream provides input and output channels for communicating with the outside world, and binder implementations connect these channels to the message broker. Support for a message broker, such as Kafka or RabbitMQ, can be added simply by including the corresponding binder dependency in the application.

Here’s the Maven dependency for Spring Cloud Stream:

The preceding Maven dependency adds Spring Cloud Stream with its reactive model. Now, to enable the application to connect with the message broker, use the following code:

Here, the @EnableBinding annotation is used to enable connectivity between the application and message broker. This annotation takes one or more interfaces as parameters; in this case, you have passed the NotificationStreams interface as a parameter:

As you can see, the interface declares input and/or output channels. This is your custom interface in this example, but you can also use other interfaces provided by Spring Cloud Stream:

  • Source: This interface can be used for an application that has a single outbound channel
  • Sink: This interface can be used for an application that has a single inbound channel
  • Processor: This interface can be used for an application that has both an inbound and an outbound channel

In the preceding code, the @Input annotation is used to identify an input channel; messages received on this channel enter the application. Similarly, the @Output annotation is used to identify an output channel; published messages leave the application through this channel.

The @Input and @Output annotations take a channel name as a parameter; if a name is not provided, then the name of the annotated method will be used by default. In this application, Kafka is used as a message broker.
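The defaulting rule for channel names can be illustrated with a few lines of reflection. This is a JDK-only sketch, not Spring Cloud Stream code: @Channel is a hypothetical stand-in for annotations such as @Input and @Output, and DemoStreams, ChannelNameResolver, and the channel names are invented for the example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical stand-in for a channel annotation; NOT Spring Cloud Stream's @Input/@Output.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Channel {
    String value() default "";
}

// Hypothetical binding interface with one named and one unnamed channel.
interface DemoStreams {
    @Channel("notification-in")
    Object subscribe();

    @Channel
    Object notificationOut();
}

class ChannelNameResolver {
    // Uses the explicit name when present, otherwise falls back to the method name.
    static String channelName(Method method) {
        Channel channel = method.getAnnotation(Channel.class);
        if (channel == null) {
            throw new IllegalArgumentException(method.getName() + " is not a channel method");
        }
        return channel.value().isEmpty() ? method.getName() : channel.value();
    }

    // Convenience overload that looks up a no-argument method by name.
    static String channelName(Class<?> type, String methodName) {
        try {
            return channelName(type.getMethod(methodName));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(channelName(DemoStreams.class, "subscribe"));
        System.out.println(channelName(DemoStreams.class, "notificationOut"));
    }
}
```

Naming channels explicitly is usually the safer choice, because renaming a method then does not silently change the channel it is bound to.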

Adding Kafka to your application

Apache Kafka is a high-performance, horizontally scalable messaging platform based on the publish-subscribe model. Originally developed at LinkedIn, it is fast, scalable, and distributed by design. Spring Cloud Stream supports binder implementations for Kafka and RabbitMQ. First, you have to install Kafka on your machine.

Installing and running Kafka

Download Kafka from https://kafka.apache.org/downloads and untar it using the following commands:

Now start ZooKeeper and Kafka on Windows:

You can start ZooKeeper and Kafka on Linux using the following commands:

After starting Kafka on your machine, add the Kafka Maven dependency in your application:

Here, Spring Cloud Stream and the Kafka binder are already added. After adding these dependencies, set the configuration properties for Kafka.

Configuration properties for Kafka

Here is the application.yml configuration file for a microservice:

This file configures the address of the Kafka server to connect to, and the Kafka topic used for both the inbound and outbound streams in your code. The contentType properties tell Spring Cloud Stream to send or receive your message objects as strings in the streams.

Service used to write to Kafka

The following service class is responsible for writing to Kafka in your application:

The sendNotification() method uses an injected NotificationStreams object to send messages represented by the Notification object in your application. Now look at the following Controller class, which will trigger sending the message to Kafka.

REST API controller

Here’s a REST controller class that you can use to create a REST API endpoint. This controller will trigger sending a message to Kafka using the NotificationService Spring Bean:

The preceding Controller class of the Customer service depends on NotificationService. The save() method is responsible for creating a customer in the corresponding database; it creates a notification message using the Notification object and sends it to Kafka using the sendNotification() method of NotificationService. Next, look at the other side: how the application listens for this message on the Kafka topic named notification.

Listening to a Kafka topic

Create a NotificationListener class that will be used to listen to messages on the Kafka notification topic and send email and SMS notifications to the customer:

The NotificationListener class has two methods: sendMailNotification() and sendSMSNotification(). These methods will be invoked by Spring Cloud Stream with every new Notification message object on the Kafka notification topic. These methods are annotated with @StreamListener, which marks a method as a listener that receives events for stream processing.
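The idea of a framework invoking every annotated listener method for each incoming message can be sketched with plain reflection. The following is an illustration only and makes several assumptions: @Listener is a hypothetical stand-in for @StreamListener, DemoNotificationListener and ListenerDispatcher are invented names, and messages are plain strings rather than Notification objects. It does not reproduce how Spring Cloud Stream dispatches messages internally.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a listener annotation; NOT Spring's @StreamListener.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Listener {
    String value(); // name of the topic the method listens on
}

class DemoNotificationListener {
    final List<String> log = new ArrayList<>();

    @Listener("notification")
    public void sendMailNotification(String message) {
        log.add("mail:" + message);
    }

    @Listener("notification")
    public void sendSMSNotification(String message) {
        log.add("sms:" + message);
    }
}

class ListenerDispatcher {
    // Invokes every method of target annotated with @Listener(topic)
    // and returns how many methods were invoked.
    static int dispatch(Object target, String topic, String message) {
        int invoked = 0;
        for (Method method : target.getClass().getDeclaredMethods()) {
            Listener listener = method.getAnnotation(Listener.class);
            if (listener != null && listener.value().equals(topic)) {
                try {
                    method.invoke(target, message);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new IllegalStateException("Listener failed: " + method.getName(), e);
                }
                invoked++;
            }
        }
        return invoked;
    }

    public static void main(String[] args) {
        DemoNotificationListener listener = new DemoNotificationListener();
        int invoked = dispatch(listener, "notification", "Customer created");
        System.out.println(invoked + " listener methods invoked");
    }
}
```

Note that the order in which the two methods run is not guaranteed here, because getDeclaredMethods() returns methods in no particular order.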

This article doesn’t have the complete code for this event-driven application; you can find the complete code in the GitHub repository at https://github.com/PacktPublishing/Mastering-Spring-Boot-2.0.

Now run this application to test how the event-driven microservice works. First, ensure that ZooKeeper and Kafka are running. By default, the Kafka server listens on localhost:9092.

Now run EurekaServer, ApiZuulService, AccountService, CustomerService, and NotificationService. Open the Eureka dashboard in the browser:

All services are running now; create a Customer object to trigger the event to Kafka. Here, Postman is used as a REST client. See the following screenshot, where a new customer is created using the http://localhost:8080/api/customers/customer API endpoint through the Zuul API Gateway:

You have entered a new customer record in the database. Whenever a new customer is created, it will trigger a message to Kafka to send email and SMS notifications using the Notification microservice. See the following console output of the Notification microservice:

You have created a new customer using the Customer service, which triggers a notification to be sent to the customer through the Kafka broker. It is a message-driven asynchronous call. Similarly, whenever you create an account record for a new customer, the Notification service receives another notification message through Kafka for the account creation:

Now verify the console of the Notification microservice:

You have successfully created an account record for the customer, which has triggered a message to Kafka to send email and SMS notifications to the customer. You can check the customer record for this customer by visiting http://localhost:8080/api/customers/customer/2001:

As you can see, the customer has complete information including an associated account object.

You’ve now learned to create an event-driven microservice using the Spring Cloud Stream, Kafka Event Bus, Spring Netflix Zuul, and Spring Discovery services.

If you found this article interesting, you can explore Dinesh Rajput’s Mastering Spring Boot 2.0 to learn how to develop, test, and deploy your Spring Boot distributed application and explore various best practices. This book will address challenges related to power that come with Spring Boot’s great configurability and flexibility.