This tutorial contains a sample app that demonstrates akka-stream-kafka.
akka-stream-kafka is a wrapper that allows interacting with Apache Kafka like with a Reactive Stream, which is a standard for asynchronous stream processing with non-blocking backpressure.
akka-stream-kafka allows reading from Kafka like from a reactive Source, or writing to its representation as a Sink. The library offers a simple DSL for constructing reactive Sinks/Streams/Flows with various configuration options for the underlying Kafka queue. It also supports manual commit to achieve at-least-once delivery.
What is our example application's logic?
Our flow begins with a random number generator which publishes numbers to
a Kafka queue using a reactive stream. At the same time, two consumer streams read from this topic and expose
read data in two different ways. The LoggingConsumer
starts with application and writes all the
numbers to a log. The HomeController
represents a Play web endpoint which initializes another
stream and emits read numbers via a WebSocket.
Let's now take a look at all the individual components and how they work together.
Starts embedded Kafka service and actors: RandomNumberWriter
+ LoggingConsumer
. Also
maintains a shutdown hook responsible for graceful shutdown.
An actor which creates and runs the producer stream. This stream uses Kafka as a sink and sends generated numbers to it, using default ack settings. Materialized value of type `Control` is used for graceful shutdown of the stream. In case of stream failure, the exception will handled by restarting the actor which will re-initialize the stream.
This actor creates a consumer stream which reads data from our topic and writes the numbers to log (which
is configured as STDOUT). Our processing can be parallelized, which is demonstrated with mapAsync(2)(processMessage)
.
We consider data as "processed" after it gets written to the log, then
all the messages flow further into a grouping stage (groupWithin
). This stage finishes grouping
after it collects 10 messages or 15 seconds pass, whatever happens first. Such group gets then committed to Kafka.
Error handling and shutdown control are managed in the same way as in case of `RandomNumberWriter`. When our actor restarts, it will resume the stream which should start consuming from last committed offset. Since we commit messages in batches after processing, our approach guarantees at-least-once delivery semantics.
There may be more than one consumer for a topic. Our HomeController
allows connecting using, for example,
a web browser. Such request will open a websocket emitting numbers read by its own stream. This consumer stream
has a different groupId than our logging consumer.
Error handling and shutdown are maintained by Play framework, so we can just create a `Source` and pass it further to the framework.
It is important that we close the consumers and publishers before actor system goes down. The
DemoLifecycle
uses Play shutdown hook which sends appropriate messages to writer/reader actors.
These actor will then use `Control` objects materialized by streams to close the flows and all underlying Kafka connections.
To start the app, you just need to execute run
in the sbt console. Next, open localhost:9000 in a
web browser. The console output should look
similar to following:
09:52:54.941 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
09:52:54.945 INFO reactivekafka.DemoLifecycle - Starting embedded Kafka
09:52:55.556 INFO reactivekafka.DemoLifecycle - Embedded Kafka ready
09:52:55.686 INFO reactivekafka.RandomNumberWriter - Initializing writer
09:52:55.746 INFO reactivekafka.RandomNumberWriter - Writer now running, writing random numbers to topic RandomNumbers
09:52:57.562 INFO reactivekafka.LoggingConsumer - Initializing logging consumer
09:52:57.589 INFO reactivekafka.LoggingConsumer - Logging consumer started
09:52:57.672 INFO play.api.Play - Application started (Dev)
09:52:58.761 INFO reactivekafka.LoggingConsumer - Consumed number: -881358456
09:52:58.762 INFO reactivekafka.LoggingConsumer - Consumed number: -1211016723
09:52:58.762 INFO reactivekafka.LoggingConsumer - Consumed number: 82910467
09:52:59.737 INFO reactivekafka.LoggingConsumer - Consumed number: -1250504005
Output in the browser should display same numbers. If you now push ctrl+D
in the console, you
will see indications that the clean shutdown hooks are doing their job:
09:54:32.453 INFO reactivekafka.DemoLifecycle - Shutting down application...
09:54:32.454 INFO reactivekafka.RandomNumberWriter - Stopping Kafka producer stream and actor
09:54:32.455 INFO reactivekafka.LoggingConsumer - Shutting down logging consumer stream and actor
You may notice that the logback.xml
configuration file contains directives that silence some errors
and warnings.
These messages are related to running Kafka/Zookeeper in embedded mode and do not really affect our application,
so we don't want them to pollute our output in this example. You can always try to run the app on your local
ZK/Kafka
instance and disable usage of EmbeddedKafka
in the DemoLifecycle
actor.