Confluent: Developing a Streaming Microservices Application — Kafka

IamVigneshC
15 min read · Dec 30, 2020

Background

https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/

Objectives

  • Persist events into Kafka by producing records that represent customer orders
  • Write a service that validates customer orders
  • Write a service that joins streaming order information with streaming payment information and data from a customer database
  • Define criteria to filter records in a stream and to branch records into separate streams
  • Create a session window to define five-minute windows for processing
  • Create a state store for the Inventory Service
  • Create one persistent query that enriches the orders stream with customer information using a stream-table join

Persist events

An event is simply a thing that happened or occurred. An event in a business is some fact that occurred, such as a sale, an invoice, a trade, or a customer experience, and it is the source of truth. In event-oriented architectures, events are first-class citizens that constantly push data into applications. Client applications can then react to these streams of events in real time and decide what to do next.

In this section, you will persist events into Kafka by producing records that represent customer orders. This event happens in the Orders Service, which provides a REST interface to POST and GET Orders. Posting an Order is essentially a REST call, and it creates the event in Kafka.
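For orientation, the produce call at the heart of this flow is ordinary Kafka producer code. Below is a minimal standalone sketch, assuming a local broker and plain String serializers instead of the Avro-serialized Order beans the real OrdersService uses; the JSON payload is purely illustrative.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class MinimalOrderProducer {
  public static void main(String[] args) {
    final Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumption: a local broker
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    // try-with-resources closes (and flushes) the producer on exit
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // Key = order id, value = a simplified stand-in for the Order bean
      producer.send(new ProducerRecord<>("orders", "order-1",
          "{\"id\":\"order-1\",\"state\":\"CREATED\",\"quantity\":1}"));
    }
  }
}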

  1. In Cloud Shell, start by cloning the Examples repository and the Kafka Streams Examples repository. You will use the latter for testing purposes.
git clone https://github.com/confluentinc/examples
git clone https://github.com/confluentinc/kafka-streams-examples.git

2. Next, run the following to navigate to the kafka-streams-examples folder and switch to the 5.5.1-post branch:

cd ~/kafka-streams-examples/ # change directory and change to 5.5.1-post branch
git checkout 5.5.1-post

3. Next, navigate to the examples/microservices-orders/exercises directory and switch to the 5.5.1-post branch:

cd ~/examples/microservices-orders/exercises # change directory and change to 5.5.1-post branch
git checkout 5.5.1-post

4. Click the Open Editor button on the toolbar of Cloud Shell. (You can switch between Cloud Shell and the code editor using the Open Editor and Open Terminal icons as needed, or click the Open in new window button to keep the Editor open in a separate tab.)

5. Open the OrdersService.java file by navigating to examples/microservices-orders/exercises/OrdersService.java in the Editor.

6. In OrdersService.java, fill in TODO 1.1: create a new ProducerRecord for the orders topic (whose name is given by ORDERS.name()), with the key specified by bean.getId() and the bean itself as the value:

producer.send(new ProducerRecord<>(ORDERS.name(), bean.getId(), bean),

7. Next, fill in TODO 1.2: produce the newly created record using the existing producer, and use the OrdersService#callback function to send the response and the record key.

callback(response, bean.getId()));
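Put together, TODO 1.1 and TODO 1.2 form a single asynchronous produce call:

producer.send(new ProducerRecord<>(ORDERS.name(), bean.getId(), bean),
    callback(response, bean.getId()));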

8. Navigate back to Cloud Shell. To test your code, run the following commands to copy your version of the file to the main project, compile, and run the unit test. This may take a couple of minutes to complete.

# Copy your exercise client application to the project
cp ~/examples/microservices-orders/exercises/OrdersService.java ~/kafka-streams-examples/src/main/java/io/confluent/examples/streams/microservices/.
# Compile the project and resolve any compilation errors
mvn clean compile -DskipTests -f ~/kafka-streams-examples/pom.xml
# Run the test and validate that it passes
mvn compile -Dtest=io.confluent.examples.streams.microservices.OrdersServiceTest test -f ~/kafka-streams-examples/pom.xml

Note: you can proceed to the next section while you compile and run the tests in the background.

Event-driven applications

Service-based architectures are often designed to be request-driven, in which services send commands to other services to tell them what to do, await a response, or send queries to get the resulting state. Building services on a protocol of requests and responses forces a complicated web of synchronous dependencies that bind services together.

In contrast, in an event-driven design, the event stream is the inter-service communication that enables services to cross deployment boundaries and avoids synchronous execution. When and how downstream services respond to those events is within their control, which reduces the coupling between services and enables an architecture with more pluggability. Read more on Build Services on a Backbone of Events.

In this section, you will write a service that validates customer orders. Instead of using a series of synchronous calls to submit and validate orders, the order event itself triggers the OrderDetailsService. When a new order is created, it is written to the topic orders, from which OrderDetailsService has a consumer polling for new records.
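The steps below fill in the pieces of a standard subscribe-poll loop. Roughly assembled, and with serdes, offset management, and error handling omitted, the loop looks like this sketch (the running shutdown flag is an assumption):

consumer.subscribe(Collections.singletonList(Topics.ORDERS.name()));
while (running) {
  // Poll for newly created orders
  final ConsumerRecords<String, Order> records = consumer.poll(Duration.ofMillis(100));
  for (final ConsumerRecord<String, Order> record : records) {
    final Order order = record.value();
    // Validate the order and publish the result back to Kafka
    producer.send(result(order, isValid(order) ? PASS : FAIL));
  }
}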

  1. In the Cloud Shell Editor, navigate to examples/microservices-orders/exercises and open the OrderDetailsService.java file.
  2. In OrderDetailsService.java, implement TODO 2.1: subscribe the existing consumer to a Collections#singletonList with the orders topic whose name is specified by Topics.ORDERS.name()
consumer.subscribe(singletonList(Topics.ORDERS.name()));

3. Implement TODO 2.2: validate the order using OrderDetailsService#isValid and save the validation result to type OrderValidationResult.

isValid(order) ? PASS : FAIL;

4. Adding to the previous command, implement TODO 2.3: create a new record using OrderDetailsService#result() that takes the order and validation result.

result(order, isValid(order) ? PASS : FAIL);

5. Finally, implement TODO 2.4: produce the newly created record using the existing producer.

producer.send(result(order, isValid(order) ? PASS : FAIL));

Note: because TODO 2.2 and TODO 2.3 are folded into the final produce call, only the code for TODO 2.1 and TODO 2.4 actually goes into the file.

6. Navigate back to Cloud Shell. To test your code, run the following commands to copy your version of the file to the main project, compile, and run the unit test. This may take a couple of minutes to complete.

# Copy your exercise client application to the project
cp ~/examples/microservices-orders/exercises/OrderDetailsService.java ~/kafka-streams-examples/src/main/java/io/confluent/examples/streams/microservices/.
# Compile the project and resolve any compilation errors
mvn clean compile -DskipTests -f ~/kafka-streams-examples/pom.xml
# Run the test and validate that it passes
mvn compile -Dtest=io.confluent.examples.streams.microservices.OrderDetailsServiceTest test -f ~/kafka-streams-examples/pom.xml

Enriching streams with joins

Streams can be enriched with data from other streams or tables through joins. A join enriches data by performing lookups in a streaming context where data is updated continuously and concurrently. For example, applications backing an online retail store might enrich new data records with information from multiple databases. In this scenario, it may be that a stream of customer transactions is enriched with sales price, inventory, customer information, etc. These lookups can be performed at very large scale and with a low processing latency.

A popular pattern is to make the information in the databases available in Kafka through so-called change data capture (CDC), together with Kafka’s Connect API to pull in the data from the database. Once the data is in Kafka, client applications can perform very fast and efficient joins of such tables and streams, rather than requiring the application to make a query to a remote database over the network for each record. Read more on an overview of distributed, real-time joins and implementing joins in Kafka Streams.

In this exercise, you will write a service that joins streaming order information with streaming payment information and data from a customer database. First, the payment stream must be rekeyed to use the same key as the order stream before the two can be joined. The resulting stream is then joined with the customer information that was read into Kafka from a customer database by a JDBC source. Additionally, this service performs dynamic routing: an enriched order record is written to a topic that is determined by the value of the level field of the corresponding customer.
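In Kafka Streams terms, the rekey-and-join is a KStream#selectKey followed by a windowed stream-stream join. A rough sketch of that pipeline, where the orders stream variable, the one-minute join window, and the EmailTuple combiner signature are assumptions rather than code copied from the exercise file:

// Rekey payments from payment id to order id so both streams share a key
final KStream<String, Payment> payments = payments_original
    .selectKey((paymentId, payment) -> payment.getOrderId());

// Join orders and payments that arrive within the same time window
final KStream<String, EmailTuple> ordersWithPayments = orders.join(
    payments,
    (order, payment) -> new EmailTuple(order, payment),
    JoinWindows.of(Duration.ofMinutes(1)));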

  1. In the Cloud Shell editor, navigate to examples/microservices-orders/exercises/EmailService.java and open the file.
  2. In EmailService.java, implement TODO 3.1: create a new KStream called payments from payments_original, using KStream#selectKey to rekey on the order id (payment.getOrderId()) instead of the payment id.

You will replace this:

final KStream<String, Payment> payments_original = builder.stream(PAYMENTS.name(),
Consumed.with(PAYMENTS.keySerde(), PAYMENTS.valueSerde()));

With this:

final KStream<String, Payment> payments = builder.stream(PAYMENTS.name(),
Consumed.with(PAYMENTS.keySerde(), PAYMENTS.valueSerde()))
//Rekey payments to be by OrderId for the windowed join
.selectKey((s, payment) -> payment.getOrderId());
3. Next, implement TODO 3.2: do a stream-table join with the customers table, which requires three arguments:
  • The GlobalKTable (customers) for the stream-table join.
  • A KeyValueMapper that extracts the customer id, specified by order.getCustomerId(), from the tuple in the record's value.
  • A method that computes a value for the result record, in this case EmailTuple::setCustomer.
.join(customers, (key1, tuple) -> tuple.order.getCustomerId(), EmailTuple::setCustomer)

4. Implement TODO 3.3: route an enriched order record to a topic that is dynamically determined from the value of the customerLevel field of the corresponding customer.

.to((orderId, orderEnriched, record) -> orderEnriched.getCustomerLevel(), Produced.with(ORDERS_ENRICHED.keySerde(), ORDERS_ENRICHED.valueSerde()));

5. Navigate back to Cloud Shell. To test your code, run the following to copy your version of the file to the main project, compile, and run the unit test. This may take a couple of minutes to complete.

# Copy your exercise client application to the project
cp ~/examples/microservices-orders/exercises/EmailService.java ~/kafka-streams-examples/src/main/java/io/confluent/examples/streams/microservices/.
# Compile the project and resolve any compilation errors
mvn clean compile -DskipTests -f ~/kafka-streams-examples/pom.xml
# Run the test and validate that it passes
mvn compile -Dtest=io.confluent.examples.streams.microservices.EmailServiceTest test -f ~/kafka-streams-examples/pom.xml

Filtering and branching

A stream of events can be captured in a Kafka topic. Client applications can then manipulate this stream based on user-defined criteria, even creating new streams of data that they, or downstream services, can act on. This helps create new streams with more logically consistent data. In some cases, the application may need to filter events from an input stream that match certain criteria, which results in a new stream containing just a subset of records from the original stream. In other cases, the application may need to branch events, whereby each event is tested against a predicate and routed to the stream that matches, which results in multiple new streams split from the original stream.
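As a schematic illustration, assuming a KStream<String, Order> named orders like the one used in the exercises (the quantity threshold is invented for the example), filtering yields one new stream while branching splits the stream into several:

// Filtering: keep only the records that match a single predicate
final KStream<String, Order> createdOrders =
    orders.filter((id, order) -> OrderState.CREATED.equals(order.getState()));

// Branching: route each record to the first predicate it matches
@SuppressWarnings("unchecked")
final KStream<String, Order>[] bySize = orders.branch(
    (id, order) -> order.getQuantity() >= 10,
    (id, order) -> order.getQuantity() < 10);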

In this section, you will define criteria to filter records in a stream. Then you will define further criteria to branch records into two different streams.

  1. In the Cloud Shell Editor, navigate to examples/microservices-orders/exercises and open the FraudService.java file.
  2. In FraudService.java, implement TODO 4.1: filter this stream to include only orders in "CREATED" state, i.e., it should satisfy the predicate OrderState.CREATED.equals(order.getState()).
.filter((id, order) -> OrderState.CREATED.equals(order.getState()));

3. Next, implement TODO 4.2: create a KStream<String, OrderValue> array from the ordersWithTotals stream by branching the records based on OrderValue#getValue.

  • First branched stream: orders where the order value >= FRAUD_LIMIT (the FRAUD_CHECK will fail)
  • Second branched stream: orders where the order value < FRAUD_LIMIT (the FRAUD_CHECK will pass)
@SuppressWarnings("unchecked")
final KStream<String, OrderValue>[] forks = ordersWithTotals.branch(
(id, orderValue) -> orderValue.getValue() >= FRAUD_LIMIT,
(id, orderValue) -> orderValue.getValue() < FRAUD_LIMIT);

4. Navigate back to Cloud Shell. To test your code, run the following to copy your version of the file to the main project, compile, and run the unit test. This may take a couple of minutes to complete.

# Copy your exercise client application to the project
cp ~/examples/microservices-orders/exercises/FraudService.java ~/kafka-streams-examples/src/main/java/io/confluent/examples/streams/microservices/.
# Compile the project and resolve any compilation errors
mvn clean compile -DskipTests -f ~/kafka-streams-examples/pom.xml
# Run the test and validate that it passes
mvn compile -Dtest=io.confluent.examples.streams.microservices.FraudServiceTest test -f ~/kafka-streams-examples/pom.xml

Stateful operations

An aggregation operation takes one input stream or table and yields a new table by combining multiple input records into a single output record. Examples of aggregations are computing a count or a sum, because they combine current record values with previous record values. These are stateful operations because they maintain data during processing. Aggregations are always key-based operations, and the Kafka Streams API ensures that records for the same key are always routed to the same stream processing task. Aggregations are often combined with windowing in order to run computations in real time over a defined window of time.
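For example, counting how many orders each customer places within five-minute session windows is a grouping followed by a windowed aggregation. A minimal sketch, assuming a KStream<String, Order> named orders and default serdes; it is illustrative only, not the topology this service builds:

final KTable<Windowed<String>, Long> ordersPerCustomer = orders
    .groupBy((orderId, order) -> String.valueOf(order.getCustomerId())) // repartition by customer id
    .windowedBy(SessionWindows.with(Duration.ofMinutes(5)))             // five-minute session windows
    .count();                                                           // stateful count per key and window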

In this section, you will create a session window to define five-minute windows for processing. Additionally, you will use a stateful operation, reduce, to collapse duplicate records in a stream. Before running reduce you will group the records to repartition the data, which is generally required before using an aggregation operator.

  1. In the Cloud Shell Editor, navigate to examples/microservices-orders/exercises and open the ValidationsAggregatorService.java file.
  2. In ValidationsAggregatorService.java, implement TODO 5.1: window the data using KGroupedStream#windowedBy, specifically using SessionWindows.with to define 5-minute windows.
.windowedBy(SessionWindows.with(Duration.ofMinutes(5)))

3. Next, implement TODO 5.2: group the records by key using KStream#groupByKey, providing the existing Serialized instance for ORDERS.

.groupByKey(serdes6)

4. Implement TODO 5.3: use the aggregation operator KGroupedStream#reduce to collapse the records in this stream to a single order for a given key.

.reduce((order, v1) -> order)
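TODO 5.2 and TODO 5.3 chain directly: grouping repartitions the orders by key, and the reduce then collapses them to a single order per key. With the surrounding topology (including the windowed stream from TODO 5.1) omitted, the chained fragment is:

.groupByKey(serdes6)
.reduce((order, v1) -> order)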

5. Navigate back to Cloud Shell. To test your code, run the following to copy your version of the file to the main project, compile, and run the unit test. This may take a couple of minutes to complete.

# Copy your exercise client application to the project
cp ~/examples/microservices-orders/exercises/ValidationsAggregatorService.java ~/kafka-streams-examples/src/main/java/io/confluent/examples/streams/microservices/.
# Compile the project and resolve any compilation errors
mvn clean compile -DskipTests -f ~/kafka-streams-examples/pom.xml
# Run the test and validate that it passes
mvn compile -Dtest=io.confluent.examples.streams.microservices.ValidationsAggregatorServiceTest test -f ~/kafka-streams-examples/pom.xml

State stores

Kafka Streams provides so-called state stores, which are disk-resident hash tables held inside the API for the client application. The state store can be used within stream processing applications to store and query data, an important capability when implementing stateful operations. It can be used to remember recently received input records, to track rolling aggregates, to de-duplicate input records, etc.

Each state store is also backed by a Kafka topic and comes with all of Kafka's guarantees. Other applications can interactively query another application's state store. Querying state stores is always read-only to guarantee that the underlying state stores will never be mutated out-of-band (i.e., you cannot add new entries).
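Interactive queries against such a store go through the KafkaStreams handle and return a read-only view. A sketch, assuming a running KafkaStreams instance named streams, the RESERVED_STOCK_STORE_NAME store created in the steps below, and a Product-typed key (both the key type and the someProduct variable are assumptions):

final ReadOnlyKeyValueStore<Product, Long> reservedStock = streams.store(
    StoreQueryParameters.fromNameAndType(RESERVED_STOCK_STORE_NAME, QueryableStoreTypes.keyValueStore()));
final Long reserved = reservedStock.get(someProduct); // read-only lookup; null if nothing is reserved yet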

In this section, you will create a state store for the Inventory Service. This state store is initialized with data from a Kafka topic before the service starts processing, and then it is updated as new orders are created.

  1. In the Cloud Shell editor, navigate to examples/microservices-orders/exercises and open the InventoryService.java file.
  2. In InventoryService.java, implement TODO 6.1: create a state store called RESERVED_STOCK_STORE_NAME, using Stores#keyValueStoreBuilder and Stores#persistentKeyValueStore.
  • The key Serde is derived from the topic specified by WAREHOUSE_INVENTORY
  • The value Serde is derived from Serdes.Long() because it represents a count
.keyValueStoreBuilder(Stores.persistentKeyValueStore(RESERVED_STOCK_STORE_NAME),
Topics.WAREHOUSE_INVENTORY.keySerde(), Serdes.Long())
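For context, the completed TODO 6.1 wraps that call in Stores#keyValueStoreBuilder and registers the builder on the topology. A sketch of the surrounding code; the generic Product key type and the addStateStore registration are assumptions:

final StoreBuilder<KeyValueStore<Product, Long>> reservedStock = Stores
    .keyValueStoreBuilder(Stores.persistentKeyValueStore(RESERVED_STOCK_STORE_NAME),
        Topics.WAREHOUSE_INVENTORY.keySerde(), Serdes.Long());
builder.addStateStore(reservedStock); // makes the store available to processors in this topology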

3. Implement TODO 6.2: update the reserved stock in the KeyValueStore called reservedStocksStore.

  • The key is the product in the order, using OrderBean#getProduct
  • The value is the sum of the current reserved stock and the quantity in the order, using OrderBean#getQuantity
reservedStocksStore.put(order.getProduct(), reserved + order.getQuantity());
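The reserved value used above is read from the same store just before the update. A sketch of the read-check-update sequence; treating a missing entry as zero is an assumption about how the service handles products with no prior reservations:

// Look up the stock already reserved for this product
Long reserved = reservedStocksStore.get(order.getProduct());
if (reserved == null) {
  reserved = 0L; // nothing reserved yet
}
// TODO 6.2: add this order's quantity to the running total
reservedStocksStore.put(order.getProduct(), reserved + order.getQuantity());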

4. Navigate back to Cloud Shell. To test your code, run the following to copy your version of the file to the main project, compile, and run the unit test. This may take a couple of minutes to complete.

# Copy your exercise client application to the project
cp ~/examples/microservices-orders/exercises/InventoryService.java ~/kafka-streams-examples/src/main/java/io/confluent/examples/streams/microservices/.
# Compile the project and resolve any compilation errors
mvn clean compile -DskipTests -f ~/kafka-streams-examples/pom.xml
# Run the test and validate that it passes
mvn compile -Dtest=io.confluent.examples.streams.microservices.InventoryServiceTest test -f ~/kafka-streams-examples/pom.xml

Enrichment with ksqlDB

Confluent ksqlDB is the streaming SQL engine that enables real-time data processing against Apache Kafka. It provides an easy-to-use yet powerful interactive SQL interface for stream processing on Kafka, without requiring you to write code in a programming language such as Java or Python. ksqlDB is scalable, elastic, fault tolerant, and it supports a wide range of streaming operations, including data filtering, transformations, aggregations, joins, windowing, and sessionization.

You can use ksqlDB to merge streams of data in real time by using a SQL-like join syntax. A ksqlDB join and a relational database join are similar in that they both combine data from two sources based on common values. The result of a ksqlDB join is a new stream or table that’s populated with the column values that you specify in a SELECT statement. ksqlDB also supports several aggregate functions, like COUNT and SUM. You can use these to build stateful aggregates on streaming data.

In this section, you will create one persistent query that enriches the orders stream with customer information using a stream-table join. You will create another persistent query that detects fraudulent behavior by counting the number of orders in a given window.

  1. In the Cloud Console, from the Navigation menu, navigate to Compute Engine > VM Instances.
  2. Click SSH for the microservices-vm instance.

3. In your SSH window, run the following script to bootstrap the VM:

wget https://raw.githubusercontent.com/GoogleCloudPlatform/training-data-analyst/master/quests/confluent/bootstrap_vm.sh
chmod +x bootstrap_vm.sh
./bootstrap_vm.sh

4. Once the commands finish executing, exit out of your SSH window.

5. Now, click SSH again to restart the session.

6. Clone the repository, and switch to the 5.5.1-post branch:

git clone https://github.com/confluentinc/examples
cd ~/examples/microservices-orders/exercises # change directory and change to 5.5.1-post branch
git checkout 5.5.1-post

Note: the TODO implementations you wrote earlier won't be used in this last part of the lab; you simply use the files from the freshly cloned repository.

7. Run the following to change to the microservices-orders directory and build and start the Docker containers:
sudo chmod -R 777 ~/examples/microservices-orders/
cd ~/examples/microservices-orders
export LD_LIBRARY_PATH='/usr/bin/openssl'
sudo sysctl -w vm.max_map_count=262144
docker-compose up -d --build

8. After a minute or so, run docker-compose ps to check that everything has started correctly:

docker-compose ps

Note: Make sure ksqldb-server is Up (healthy) and that kafka-setup and ksql-query-setup are in the Exit 0 state before you run the next command; this ensures the queries have been executed. Startup may take up to 5 minutes.

9. Launch the ksqlDB CLI:

docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088

10. From the ksqlDB CLI prompt, execute the following command to see the Kafka topics that were created. You will be using the orders and customers topics.

SHOW TOPICS;

The output lists the topics in the cluster, including orders and customers.

11. Next, you will create some persistent queries. Start by creating a new ksqlDB stream that does a stream-table join between orders and my_customers_table based on customer id:

CREATE STREAM my_orders WITH (kafka_topic='orders', value_format='AVRO');
--Next 3 steps are required to create a TABLE with keys of String type
--1. Create a stream
CREATE STREAM my_customers_with_wrong_format_key WITH (kafka_topic='customers', value_format='AVRO');
--2. Derive a new stream with the required key changes.
--The CAST statement converts the key to the required format.
--The PARTITION BY clause re-partitions the stream based on the new, converted key.
CREATE STREAM my_customers_with_proper_key WITH (KAFKA_TOPIC='my_customers-with-proper-key') AS SELECT CAST(id as BIGINT) as customerid, firstname, lastname, email, address, level FROM my_customers_with_wrong_format_key PARTITION BY CAST(id as BIGINT);
--3. Create the table on the properly keyed stream
CREATE TABLE my_customers_table (rowkey bigint KEY, customerid bigint, firstname varchar, lastname varchar, email varchar, address varchar, level varchar) WITH (KAFKA_TOPIC='my_customers-with-proper-key', VALUE_FORMAT='AVRO', KEY='customerid');
--Join customer information based on customer id
CREATE STREAM my_orders_cust1_joined AS SELECT my_customers_table.customerid AS customerid, firstname, lastname, state, product, quantity, price FROM my_orders LEFT JOIN my_customers_table ON my_orders.customerid = my_customers_table.customerid;

12. Finally, create a new ksqlDB table that flags any customer who submits more than 2 orders in a 30-second time window:

--Fraud alert if a customer submits more than 2 orders in a 30 second time window
CREATE TABLE MY_FRAUD_ORDER AS SELECT CUSTOMERID, LASTNAME, FIRSTNAME, COUNT(*) AS COUNTS FROM my_orders_cust1_joined WINDOW TUMBLING (SIZE 30 SECONDS) GROUP BY CUSTOMERID, LASTNAME, FIRSTNAME HAVING COUNT(*)>2;

13. The CLI parser gives immediate feedback on whether your ksqlDB queries worked. Use the following command to see the rows produced by the query. You can exit the query at any point by typing CTRL + C.

-- Visualize frauds
SELECT * FROM MY_FRAUD_ORDER EMIT CHANGES;

We started by producing records representing customer orders and persisting them into Kafka. We then wrote services that validated customer orders and joined streaming order information with streaming payment information and data from a customer database. Next, we defined criteria to filter records in a stream and to branch records into separate streams. Lastly, we created a session window to define five-minute windows for processing, created a state store for the Inventory Service, and created persistent queries that enriched the orders stream with customer information using a stream-table join and detected potentially fraudulent ordering behavior.
