Tag: spring cloud

Building an Event-Driven Reactive Asynchronous System with Spring Boot and Kafka

Building an event-driven Reactive Asynchronous System

Spring Boot provides a new strategy for application development with the Spring Framework. It enables you to focus only on the application’s functionality rather than on Spring meta configuration, as Spring Boot requires minimal to zero configuration in the Spring application.

This article will show you how to build a sample project that demonstrates how to create a real-time streaming application using event-driven architecture, Spring Cloud Stream, Spring Boot, Apache Kafka, and Spring Netflix Eureka.

Architecture

Here, Netflix Hystrix has been used to implement the circuit breaker pattern and the API Gateway proxy has been configured using Netflix Zuul.

To get started, create an application with three microservices: Account, Customer, and Notification. Whenever you create a customer record or create an account for a customer, a notification service sends an email and a mobile notification.

All three decoupled services—Account, Customer, and Notification—are independently deployable applications. The Account service can be used to create, read, update, and delete customer accounts. It also sends a message to the Kafka topic when a new account is created.

Similarly, the Customer service is used to create, read, update, and delete a customer in the database. It sends a message to the Kafka topic when a new customer is created, and the Notification service sends email and SMS notifications. The Notification service listens on topics from incoming customer and account messages and then processes these messages by sending notifications to the given email and mobile.

The Account and Customer microservices have their own H2 database, and the Notification service uses MongoDB. In this application, you’ll use the Spring Cloud Stream module to provide abstract messaging mechanisms; it is a framework for building event-driven microservice applications.

This example also uses the edge service for API Gateway using Netflix Zuul. Zuul is a JVM-based router and is also used as server-side load balancer by Netflix. Spring has a strong bonding with Netflix Zuul and provides a Spring Cloud Netflix Zuul module.

Introducing Spring Cloud Streaming

Spring Cloud Stream is a framework for building message-driven microservice applications. It abstracts away the message producer and consumer code from message broker-specific implementations. Spring Cloud Stream provides input and output channels for servicing communications to the outside world. It provides the message broker’s connectivity to the Spring Cloud Stream. Message brokers, such as Kafka and RabbitMQ, can be easily added by injecting a binding dependency to the application code.

Here’s the Maven dependency for Spring Cloud Stream:

<dependency>

<groupId>org.springframework.cloud</groupId>

<artifactId>spring-cloud-stream-reactive</artifactId>

</dependency>

In the above Maven dependency, you have the Spring Cloud Stream dependency reactive model. Now to enable the application to connect with the message broker, use the following code:

@EnableBinding(NotificationStreams.class)

public class StreamsConfig {

}

Here, the @EnableBinding annotation is used to enable connectivity between the application and message broker. This annotation takes one or more interfaces as parameters; in this case, you have passed the NotificationStreams interface as a parameter:

public interface NotificationStreams {

String INPUT = "notification-in";

String OUTPUT = "notification-out";

@Input(INPUT)

SubscribableChannel subscribe();

@Output(OUTPUT)

MessageChannel notifyTo();

}

As you can see, the interface declares input and/or output channels. This is your custom interface in this example, but you can also use other interfaces provided by Spring Cloud Stream:

  • Source: This interface can be used for an application that has a single outbound channel
  • Sink: This interface can be used for an application that has a single inbound channel
  • Processor: This interface can be used for an application that has both an inbound and an outbound channel

In the preceding code, the @Input annotation is used to identify an input channel. Using this identifier, it receives a message, which enters the application. Similarly, the @Output annotation is used to identify an output channel; using this identifier, published messages leave the application.

The @Input and @Output annotations take the name parameter as a channel name; if a name is not provided, then the name of the annotated method will be used by default. In this application, Kafka is used as a message broker.

Adding Kafka to your application

Apache Kafka is a publish-subscribe-based high-performance and horizontally scalable messaging platform. Developed by LinkedIn, it is fast, scalable, and distributed by design. Spring Cloud Stream supports binder implementations for Kafka and RabbitMQ. First, you have to install Kafka in your machine.

Installing and running Kafka

Download Kafka from https://kafka.apache.org/downloads and untar it using the following commands:

> tar -xzf kafka_2.12-1.1.0.tgz

> cd kafka_2.12-1.1.0

Now start ZooKeeper and Kafka on Windows:

> bin\windows\zookeeper-server-start.bat configzookeeper.properties

> bin\windows\kafka-server-start.bat configserver.properties

You can start ZooKeeper and Kafka on Linux using the following commands:

> bin/zookeeper-server-start.sh config/zookeeper.properties

> bin/kafka-server-start.sh config/server.properties

After starting Kafka on your machine, add the Kafka Maven dependency in your application:

<dependency>

<groupId>org.springframework.cloud</groupId>

<artifactId>spring-cloud-stream-binder-kafka</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.cloud</groupId>

<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>

</dependency>

Here, Spring Cloud Stream and Kafka binder is already added. After adding these dependencies, set the configuration properties for Kafka.

Configuration properties for Kafka

Here is the application.yml configuration file for a microservice:

spring:

  application:

    name: customer-service

  cloud:

    stream:

      kafka:

        binder:

          brokers:

          - localhost:9092

      bindings:

          notification-in:

            destination: notification

            contentType: application/json

          notification-out:

            destination: notification

            contentType: application/json

This file configures the address of the Kafka server to connect to, and the Kafka topic used for both the inbound and outbound streams in your code. The contentType properties tell Spring Cloud Stream to send or receive your message objects as strings in the streams.

Service used to write to Kafka

The following service class is responsible for writing to Kafka in your application:

@Servicepublic class NotificationService {



 private final NotificationStreams notificationStreams;



 public NotificationService(NotificationStreams notificationStreams) {

  super();

  this.notificationStreams = notificationStreams;

 }



 public void sendNotification(final Notification notification) {

  MessageChannel messageChannel = notificationStreams.notifyTo();

  messageChannel.send(MessageBuilder.withPayload(notification)

    .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)

    .build());

 }

}

The sentNotification() method uses an injected NotificationStreams object to send messages represented by the Notification object in your application. Now look at the following Controller class, which will trigger sending the message to Kafka.

Rest API controller

Here’s a Rest Controller class that you can use to create a REST API endpoint. This controller will trigger sending a message to Kafka using the NotificationService Spring Bean:

@RestController

public class CustomerController {

...

@Autowired

CustomerRepository customerRepository;

@Autowired

AccountService accountService;

@Autowired

NotificationService notificationService;

@PostMapping(value = "/customer")

public Customer save (@RequestBody Customer customer){

Notification notification = new Notification("Customer is created", "admin@dineshonjava.com", "9852XXX122");

notificationService.sendNotification(notification);

return customerRepository.save(customer);

}

...

...

}

The preceding Controller class of Customer service has a dependency with NotificationService. The save() method is responsible for creating a customer in the corresponding database; it creates a notification message using the Notification object and sends it to Kafka using the sendNotification() method of NotificationService. Here’s another side of how Kafka listens to this message using the topic name notification.

Listening to a Kafka topic

Create a listener NotificationListener class that will be used to listen to messages on the Kafka notification topic and send email and SMS notifications to the customer:

@Component

public class NotificationListener {

@StreamListener(NotificationStreams.INPUT)

public void sendMailNotification(@Payload Notification notification) {

System.out.println("Sent notification to email: "+notification.getEmail()+" Message: "+notification.getMessage());

}

@StreamListener(NotificationStreams.INPUT)

public void sendSMSNotification(@Payload Notification notification) {

System.out.println("Notified with SMS to mobile: "+notification.getMobile()+" Message: "+notification.getMessage());

}

}

The NotificationListener class has two methods: sendMailNotification() and sendSMSNotification(). These methods will be invoked by Spring Cloud Stream with every new Notification message object on the Kafka notification topic. These methods are annotated with @StreamListener. This annotation makes the method listener receive events for stream processing.

This article doesn’t have the complete code for this event-driven application; you can find the complete code in the GitHub repository at https://github.com/PacktPublishing/Mastering-Spring-Boot-2.0.

Now run this application to test how the event-driven microservice works. First, ensure that you run Kafka and Zookeeper. The Kafka server will be run at http://localhost:9092.

Now run EurekaServer, ApiZuulService, AccountService, CustomerService, and NotificationService. Open Eureka dashboard on the browser:

All services are running now; create a Customer object to trigger the event to Kafka. Here, Postman is used as a REST client. See the following diagram, where you created a new customer using the http://localhost:8080/api/customers/customer API endpoint through Zuul API Gateway:

You have entered a new customer record in the database. Whenever a new customer is created, it will trigger a message to Kafka to send email and SMS notifications using the Notification microservice. See the following console output of the Notification microservice:

You have created a new customer using the Customer service, which will trigger a notification to be sent to the customer using the Kafka broker. It is a message-driven asynchronous call. Similarly, whenever you create an account record for a new customer, Kafka will listen for another new notification message for the account creation:

Now verify the console of the Notification microservice:

You have successfully created an account record for the customer, which has triggered a message to Kafka to send email and SMS notifications to the customer. You can check the customer record for this customer by visiting http://localhost:8080/api/customers/customer/2001:

As you can see, the customer has complete information including an associated account object.

You’ve now learned to create an event-driven microservice using the Spring Cloud Stream, Kafka Event Bus, Spring Netflix Zuul, and Spring Discovery services.

If you found this article interesting, you can explore Dinesh Rajput’s Mastering Spring Boot 2.0 to learn how to develop, test, and deploy your Spring Boot distributed application and explore various best practices. This book will address challenges related to power that come with Spring Boot’s great configurability and flexibility.