Friday 17th May 2024
Ho Chi Minh, Vietnam

1. Why Kafka?

Kafka is an open-source distributed event streaming platform used for building real-time data pipelines and streaming applications. Originally developed at LinkedIn and later open-sourced under the Apache Software Foundation, Apache Kafka is designed to handle high-throughput, fault-tolerant, and scalable streaming data. It’s commonly used in scenarios like real-time data processing, log aggregation, event sourcing, stream processing, metrics monitoring, data integration, microservices communication, IoT, change data capture, and machine learning pipelines.

Kafka does have distinct advantages compared with other messaging systems that make it a preferred choice for certain use cases:

  • Scalability: Kafka is designed to scale horizontally, allowing it to handle massive volumes of data and high-throughput workloads across distributed systems. Its architecture enables seamless scaling by adding more brokers to the cluster.
  • Fault Tolerance: Kafka ensures high availability and fault tolerance by replicating data across multiple brokers in a cluster. This replication mechanism ensures that data remains available even if some nodes fail, enhancing the reliability of the system.
  • Durability: Kafka persists messages to disk, providing durability and enabling consumers to replay messages if needed. This durability ensures that messages are not lost even in the event of hardware failures.
  • Real-time Processing: Kafka facilitates real-time processing of streaming data, making it suitable for use cases such as real-time analytics, monitoring, and event-driven architectures. Its low latency and high throughput capabilities enable timely processing of data streams.
  • Decoupled Architecture: Kafka’s decoupled architecture allows producers and consumers to operate independently, providing flexibility and scalability in designing streaming data applications. Producers can publish messages to topics without needing to know who will consume them, and consumers can subscribe to topics to receive messages of interest.
  • Integration Ecosystem: Kafka has a rich ecosystem of connectors and integrations with various systems and frameworks, including stream processing engines like Apache Flink and Apache Spark, as well as storage systems like Apache Hadoop. This ecosystem facilitates seamless integration with existing infrastructure and enables the building of end-to-end data pipelines.
  • Community and Support: Kafka benefits from a large and active community of developers and users, providing a wealth of resources, documentation, and community-driven support. This ecosystem contributes to the ongoing development and improvement of Kafka, ensuring its continued relevance and adoption in the industry.

2. Kafka’s architecture and basic concepts

Apache Kafka follows a distributed architecture designed for scalability, fault tolerance, and high throughput.

Here’s an overview of Kafka’s architecture:

  1. Broker:
    • Kafka clusters consist of one or more Kafka brokers.
    • A broker is a server responsible for storing and managing topic partitions, handling produce and consume requests, and replicating data across the cluster for fault tolerance.
    • Brokers communicate with each other to ensure data replication and consistency across partitions.
  2. Topic:
    • Topics are logical categories or feeds to which messages are published by producers and from which messages are consumed by consumers.
    • Each topic is divided into one or more partitions.
    • Topics can have multiple consumers (consumer groups) subscribed to them, enabling parallel message processing.
  3. Partition:
    • Each topic is divided into one or more partitions, which are distributed across Kafka brokers in the cluster.
    • Partitions are the unit of parallelism in Kafka, allowing messages within a topic to be processed concurrently by multiple consumers.
    • Messages within a partition are ordered and immutable, ensuring strict message ordering and durability.
  4. Producer:
    • Producers are client applications that publish messages to Kafka topics.
    • Producers send messages to Kafka brokers, specifying the topic and optionally a message key.
    • Producers can choose to receive acknowledgments (acks) from brokers to confirm successful message delivery.
  5. Consumer:
    • Consumers are client applications that subscribe to Kafka topics and consume messages from topic partitions.
    • Consumers can be organized into consumer groups, where each consumer group processes messages independently.
    • Kafka provides scalable and fault-tolerant message consumption through consumer rebalancing and offset management.
  6. Consumer Group:
    • A consumer group is a logical grouping of consumers that jointly consume messages from one or more Kafka topics.
    • Each message within a topic partition is delivered to only one consumer within the group, enabling parallel message processing and load balancing across consumers.
  7. ZooKeeper:
    • Kafka uses Apache ZooKeeper for cluster coordination, metadata management, and leader election.
    • ZooKeeper maintains metadata about brokers, topics, partitions, and consumer groups, ensuring consistency and fault tolerance in the Kafka cluster.

In this article, we dig into the Broker, Producer, Consumer (and consumer group), Topic (and partition), and ZooKeeper; we will go through the rest in the next articles.
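
To make these concepts concrete, here is a minimal sketch using the plain Java Kafka client (not the Spring-based setup used later in the demo). It assumes a broker reachable at localhost:9092 and a topic named example_topic; the callback simply logs which partition and offset the record landed on.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ExampleProducer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // broker address (assumption)
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    props.put("acks", "all"); // wait until all in-sync replicas acknowledge the write

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // Records with the same key are always routed to the same partition
      ProducerRecord<String, String> record =
        new ProducerRecord<>("example_topic", "customer-10", "order created");
      producer.send(record, (metadata, exception) -> {
        if (exception == null) {
          System.out.printf("Sent to partition %d at offset %d%n",
            metadata.partition(), metadata.offset());
        } else {
          exception.printStackTrace();
        }
      });
    }
  }
}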

3. Consumer and Partition relationship

In Apache Kafka, the relationship between consumers and partitions is fundamental to how messages are processed and distributed within a Kafka topic, so it is super important to understand this relationship to use Kafka in the right way.

Each Kafka topic is divided into one or more partitions. Each partition is exclusively owned by a single consumer within a consumer group. This ensures that messages within a partition are processed sequentially by a single consumer instance.

Kafka consumers provide parallelism by allowing multiple instances (consumers) within a consumer group to process messages concurrently. Each consumer within a group is assigned one or more partitions to consume messages from. The number of partitions assigned to a consumer is determined by the number of consumer instances and the partition assignment strategy (e.g., range, round-robin, sticky) used by Kafka. Kafka dynamically rebalances partition assignments among the consumer instances within a group to distribute the workload evenly, while still guaranteeing that messages within a partition are processed in the order they were written.
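
As a rough illustration of this relationship with the plain Java client (again assuming a broker at localhost:9092 and a topic named example_topic), a consumer joins a group simply by setting group.id, and the partitions it currently owns can be inspected via assignment():

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ExampleConsumer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // broker address (assumption)
    props.put("group.id", "example_group");            // consumers sharing this id split the partitions
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(List.of("example_topic"));
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        // assignment() shows which partitions this instance currently owns within the group
        System.out.println("Assigned partitions: " + consumer.assignment());
        for (ConsumerRecord<String, String> record : records) {
          System.out.printf("partition=%d offset=%d key=%s value=%s%n",
            record.partition(), record.offset(), record.key(), record.value());
        }
      }
    }
  }
}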

Let’s consider an example: suppose we have a Kafka cluster with one or many brokers and a topic named example_topic with three partitions (Partition0, Partition1, Partition2).

We also have a consumer group named example_group with three consumer instances (Consumer1, Consumer2, Consumer3) that belong to this group.

  1. Subscription:
    • The consumer group coordinator receives the subscription information from the consumers in example_group. Let’s say the subscription is for the example_topic.
  2. Partition Assignment Strategy:
    • Kafka uses a partition assignment strategy to determine how to distribute the partitions of example_topic among the consumers in example_group.
    • Let’s assume Kafka uses a round-robin assignment strategy. In this case, each consumer in the group will be assigned one partition in a round-robin fashion.
  3. Partition Assignment:
    • After the partition assignment is calculated, the consumer group coordinator communicates the assignments to the consumers.
    • Consumer1 is assigned Partition0, Consumer2 is assigned Partition1, and Consumer3 is assigned Partition2.
  4. Consumption:
    • Each consumer starts consuming messages from the partition(s) it was assigned.
    • Consumer1 consumes messages from Partition0, Consumer2 consumes messages from Partition1, and Consumer3 consumes messages from Partition2.
    • Messages within each partition are processed sequentially by the assigned consumer.
  5. Rebalancing:
    • If a new consumer joins the group or an existing consumer leaves, Kafka triggers a rebalance operation.
    • During rebalancing, the partition assignments are recalculated, and partitions may be reassigned among the consumers to maintain load balance and fault tolerance. The sketch below shows how a consumer can observe these reassignments.
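
As a minimal sketch with the plain Java client (not the Spring setup used later), a ConsumerRebalanceListener passed to subscribe() lets each consumer log which partitions it loses and gains during every rebalance:

import java.util.Collection;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceLogger {
  // Call this instead of a plain consumer.subscribe(topics) to watch rebalances happen
  static void subscribeWithLogging(KafkaConsumer<String, String> consumer) {
    consumer.subscribe(List.of("example_topic"), new ConsumerRebalanceListener() {
      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before a rebalance takes partitions away from this consumer
        System.out.println("Revoked: " + partitions);
      }

      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called after the group coordinator hands this consumer its new partitions
        System.out.println("Assigned: " + partitions);
      }
    });
  }
}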

4. Demonstration

We will create an Order Management system containing three services: Checkout Service, Order Service, and Fulfillment Service communicating via two Kafka topics: OrderCreated and OrderProcessed. Each service can be scaled up horizontally by adding more instances.

The order process is triggered by a REST call sent from the client side to the Checkout Service, as shown below.

4.1. Prerequisites

  • Docker
  • Java 11
  • Gradle
  • Spring boot
  • Spring Cloud

4.2. Build up Kafka locally with Docker

We will use Docker Compose to build the local Kafka environment, including:

  • A zookeeper
  • A broker
  • A Kafka CLI container to create the OrderCreated and OrderProcessed topics (orderCreatedTopic and orderProcessedTopic), each with two partitions
  • A Kafka UI to monitor and manage Apache Kafka clusters

docker-compose.yml

version: '2.1'

services:
  zookeeper:
    container_name: project_name-zookeeper
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    container_name: project_name-kafka
    image: confluentinc/cp-kafka:latest
    ports:
      - "9092:9092"
    expose:
      - "29092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  init-kafka:
    container_name: project_name-init-kafka
    image: confluentinc/cp-kafka:latest
    depends_on:
      - kafka
    entrypoint: [ '/bin/sh', '-c' ]
    command: |
      "
      # blocks until kafka is reachable
      kafka-topics --bootstrap-server kafka:29092 --list

      echo -e 'Creating kafka topics'
      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic orderCreatedTopic --replication-factor 1 --partitions 2
      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic orderProcessedTopic --replication-factor 1 --partitions 2

      echo -e 'Created kafka topics:'
      kafka-topics --bootstrap-server kafka:29092 --list
      "

  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8088:8080
    depends_on:
      - kafka
    environment:
      DYNAMIC_CONFIG_ENABLED: 'true'
      KAFKA_CLUSTERS_0_NAME: wizard_test
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
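
Once the stack is up (for example with docker compose up -d), the topics created by init-kafka can be checked in the Kafka UI at localhost:8088, or programmatically. Here is a small sketch using Kafka's AdminClient against the host listener localhost:9092 from docker-compose.yml:

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

public class TopicCheck {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // host listener from docker-compose.yml

    try (AdminClient admin = AdminClient.create(props)) {
      // Describe both demo topics and print how many partitions each one has
      for (TopicDescription description :
          admin.describeTopics(List.of("orderCreatedTopic", "orderProcessedTopic"))
               .all().get().values()) {
        System.out.printf("%s has %d partitions%n",
          description.name(), description.partitions().size());
      }
    }
  }
}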

4.3. Implementation

We use a single Spring application to implement our three services. To simplify the integration of Kafka into these services, we use Spring Cloud Stream, a framework for building message-driven applications.

To produce Kafka messages to the OrderCreated and OrderProcessed topics, we use StreamBridge, which helps us produce messages from the Checkout Service and the Order Service. We also need to define two consumer beans, one for the Order Service and one for the Fulfillment Service.
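
The services exchange a simple Order payload. Based on how it is used in the code below, a minimal Order.java with Lombok could look like the following (the annotations and field types are assumptions for illustration):

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Order {
  private Integer id;         // order id passed in from the Checkout endpoint
  private Integer customerId; // used as the Kafka message key in Producer.java
}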

Producer.java

Please note that we use customerId as the message key to ensure all events related to the same customer are processed sequentially by assigning them to the same partition.

@Service
@AllArgsConstructor
public class Producer {
  private final StreamBridge streamBridge;

  public void produceOrderCreated(Order order) {
    Message<Order> message = MessageBuilder.withPayload(order)
      .setHeader(KafkaHeaders.MESSAGE_KEY, String.valueOf(order.getCustomerId()))
      .build();
    streamBridge.send("produceOrderCreated-out-0", message);
  }

  public void produceOrderProcessed(Order order) {
    Message<Order> message = MessageBuilder.withPayload(order)
      .setHeader(KafkaHeaders.MESSAGE_KEY, String.valueOf(order.getCustomerId()))
      .build();
    streamBridge.send("produceOrderProcessed-out-0", message);
  }
}
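
This works because Kafka's default partitioner hashes a non-null record key to pick a partition, so equal keys always map to the same partition as long as the partition count does not change. A rough sketch of that mapping, using the murmur2 helpers that ship with the Kafka client:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.utils.Utils;

public class KeyToPartition {
  // Approximates how the default partitioner maps a non-null key to a partition
  static int partitionFor(String key, int numPartitions) {
    byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  }

  public static void main(String[] args) {
    // With two partitions, every event for a given customer id maps to the same partition
    System.out.println("customer 10 -> partition " + partitionFor("10", 2));
    System.out.println("customer 20 -> partition " + partitionFor("20", 2));
  }
}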

CheckoutEndpoint.java

@RestController
@AllArgsConstructor
public class CheckoutEndpoint {

  private final Producer producer;

  @PostMapping("/app/order/{id}")
  public ResponseEntity<String> checkout(@PathVariable Integer id, @RequestParam Integer customerId) {
    System.out.println("Creating order with order id " + id + ", customer id" + customerId);
    producer.produceOrderCreated(Order.builder().id(id).customerId(customerId).build());
    return ResponseEntity.ok("OK");
  }
}

OrderServiceConsumer.java

@Configuration
public class OrderServiceConsumer {

  @Autowired
  private Producer producer;

  @Bean
  public Consumer<Order> consumeOrderCreated() {
    return order -> {
      System.out.println("Processing order: " + order);
      producer.produceOrderProcessed(order);
    };
  }
}

FulfillmentServiceConsumer.java

@Configuration
public class FulfillmentServiceConsumer {

  @Bean
  public Consumer<Order> consumeOrderProcessed() {
    return order -> {
      System.out.println("Processing shipment for order: " + order);
    };
  }

}

The Spring Cloud Stream binder configuration is the most important part of using this framework: it ensures we produce to and consume from the correct topics and partitions via input/output channels. Spring Cloud Stream provides a flexible way to define the binder configuration in the application YML.

application.yml

server:
  port: 8083
  tomcat:
    max-threads: 15


spring:
  cloud:
    function:
      definition: consumeOrderCreated;consumeOrderProcessed (1)
    stream:
      kafka:
        binder:
          brokers: localhost:9092 (2) 
        bindings:
          # checkout service to produce orderCreated message
          produceOrderCreated-out-0: (3)
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
          # order service to produce orderProcessed message
          produceOrderProcessed-out-0: (4)
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
          # order service to consume consumeOrderCreated message
          consumeOrderCreated-in-0: (5)
          # fulfillment service to consume consumeOrderProcessed message
          consumeOrderProcessed-in-0: (6)
      bindings: (7)
        produceOrderCreated-out-0:
          destination: orderCreatedTopic
          producer:
            use-native-encoding: true
        produceOrderProcessed-out-0:
          destination: orderProcessedTopic
          producer:
            use-native-encoding: true
        consumeOrderCreated-in-0:
          destination: orderCreatedTopic
          group: OrderCreatedConsumer
        consumeOrderProcessed-in-0:
          destination: orderProcessedTopic
          group: OrderProcessedConsumer
  1. Define the consumer functions, which are the beans defined in OrderServiceConsumer.java and FulfillmentServiceConsumer.java
  2. Configure the Kafka broker connection
  3. Define the output binding to produce messages to the OrderCreated topic (Note that we don’t need to define this function explicitly in our application; it is only used in the YML file to define the binder and binding. In the application, we use StreamBridge to produce messages to this binding)
  4. Define the output binding to produce messages to the OrderProcessed topic
  5. Define the input binding to consume messages from OrderCreated topic
  6. Define the input binding to consume messages from OrderProcessed topic
  7. Map input/output bindings to the corresponding topic

Note that our consumers are defined in a consumer group which is a collection of Kafka consumer instances that work together to consume messages from one or more Kafka topics. Consumer groups allow for parallel processing of messages across multiple instances while ensuring that each message is processed by only one consumer within the group.

4.4. Demo

Now, let’s start our application, then change the application port to another value (for example 8082) and start a second instance, so that the Order Service and Fulfillment Service each run with two instances. After that, let’s create some orders from 3 customers (id = 10, 20, 30) via the Checkout Service.

Let’s access Kafka UI to observe what happened behind the scenes.

Consumers: there are two consumer groups, as we defined in the YML file; each of them has two consumer members because we started two application instances.

Topics: our topics were created as expected with two partitions.

Messages: you can see that the messages were produced as expected (each order has one OrderCreated message and one OrderProcessed message).

The messages from the same customer ID were assigned to the same partition since we use customer ID as the message key.

Finally, if you open our applications’ logs, you will notice that each message in the subscribed topic partitions is consumed by only one consumer within the group. This allows for parallel processing of messages across multiple consumers, which is such a cool feature. Thanks, Kafka consumer groups!

The demo project can be found on GitHub.

5. Conclusion

In this article, we explored the power and versatility of Apache Kafka within a Docker-compose environment, demonstrating its fundamental concepts of topics, message keys, and consumer groups using Spring Cloud Stream. Through this demonstration, we’ve underscored Kafka’s pivotal role in modern distributed systems architecture, particularly in enabling robust, scalable, and fault-tolerant messaging solutions.

As Kafka continues to dominate the realm of real-time data processing and event-driven architectures, understanding its core concepts becomes increasingly crucial for developers and architects alike. There are many other important components in Kafka such as Schema Registry, ACK mechanism, and message TTL. We will dig into each of them in the next articles.
