Monday, December 31, 2018


Apache Kafka Tutorial - Java and Spark Integration


This tutorial demonstrates how to work with Apache Kafka using both Java and Apache Spark, including producer and consumer implementations with auto offset management.


1. Java Implementation

1.1 Kafka Producer in Java

This producer enables idempotence together with the other safe-producer settings (acks=all, effectively unlimited retries, bounded in-flight requests) for reliable message delivery.
KafkaProducerExample.java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class KafkaProducerExample {
    private static final String TOPIC_NAME = "example-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        // Create producer properties
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        
        // Create safe producer configs
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");

        // Create the producer
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            
            // Send messages with callbacks
            for (int i = 0; i < 10; i++) {
                String key = "key_" + i;
                String value = "message_" + i;
                
                ProducerRecord<String, String> record =
                    new ProducerRecord<>(TOPIC_NAME, key, value);

                producer.send(record, (metadata, exception) -> {
                    if (exception == null) {
                        System.out.printf("Message sent successfully! Topic: %s, Partition: %d, Offset: %d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                    } else {
                        System.err.println("Error sending message: " + exception.getMessage());
                    }
                });
            }
            
            // Flush outstanding records; close() is handled by try-with-resources
            producer.flush();
        }
    }
}
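
The callback above handles results asynchronously. If you would rather block until the broker acknowledges each record, so a failure surfaces immediately at the call site, you can call get() on the Future returned by send(). A minimal sketch (the class name is ours; broker, topic, and serializers match the example above):

KafkaSyncProducerExample.java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaSyncProducerExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            // send(...).get() blocks until the broker acknowledges the record,
            // so a failed send throws ExecutionException right here
            RecordMetadata metadata = producer
                .send(new ProducerRecord<>("example-topic", "key_sync", "message_sync"))
                .get();
            System.out.printf("Acknowledged: partition %d, offset %d%n",
                metadata.partition(), metadata.offset());
        }
    }
}

Synchronous sends are easier to reason about but serialize every request, so throughput drops sharply; the callback style above is the usual choice in production.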

1.2 Kafka Consumer in Java

This consumer enables auto offset commit and falls back to the earliest offset when the consumer group has no committed position yet.
KafkaConsumerExample.java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    private static final String TOPIC_NAME = "example-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String GROUP_ID = "my-consumer-group";

    public static void main(String[] args) {
        // Create consumer properties
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        
        // Auto offset reset config
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        // Enable auto commit
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        // Create the consumer
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            // Subscribe to topic
            consumer.subscribe(Collections.singletonList(TOPIC_NAME));

            // Poll for messages
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received message: key = %s, value = %s, topic = %s, partition = %d, offset = %d%n",
                        record.key(), record.value(), record.topic(), record.partition(), record.offset());
                }
            }
        }
    }
}
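
With auto commit enabled, offsets are committed on a timer, so a crash between a commit and the actual processing can skip or replay a batch. When commits must track processing, disable auto commit and commit after each handled batch. A minimal sketch (the class name is ours; everything else mirrors the example above):

KafkaManualCommitConsumer.java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaManualCommitConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Disable auto commit so offsets only advance after records are processed
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList("example-topic"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Processing: offset = %d, value = %s%n",
                        record.offset(), record.value());
                }

                // Commit only after the whole batch is handled: at-least-once delivery
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        }
    }
}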

2. Spark Implementation

2.1 Spark Kafka Producer

This Spark implementation demonstrates how to write data to Kafka using Spark's DataFrame API.
SparkKafkaProducer.scala
import org.apache.spark.sql.SparkSession

object SparkKafkaProducer {
  def main(args: Array[String]): Unit = {
    // Create Spark Session
    val spark = SparkSession.builder()
      .appName("Spark Kafka Producer")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // Create sample data
    val data = (1 to 100).map(i => (s"key_$i", s"message_$i"))
      .toDF("key", "value")

    // Write to Kafka
    data.selectExpr("key", "value")
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "spark-topic")
      .save()

    spark.stop()
  }
}
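
The write above is a one-shot batch write: every row of the DataFrame becomes one Kafka record, and the job exits. A quick way to sanity-check it is a batch read of the same topic, since the Kafka source also supports bounded reads via startingOffsets/endingOffsets. A minimal sketch (the object name is ours; broker and topic match the example above):

SparkKafkaReadBack.scala
import org.apache.spark.sql.SparkSession

object SparkKafkaReadBack {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Spark Kafka Read-Back")
      .master("local[*]")
      .getOrCreate()

    // Bounded (batch) read of the topic we just wrote
    val readBack = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "spark-topic")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    readBack.show(5, truncate = false)

    spark.stop()
  }
}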

2.2 Spark Kafka Consumer

This implementation uses Spark Structured Streaming to consume messages from Kafka. Note that Structured Streaming tracks offsets itself (via checkpointing) rather than committing them to a Kafka consumer group; the startingOffsets option only controls where a brand-new query begins reading.
SparkKafkaConsumer.scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object SparkKafkaConsumer {
  def main(args: Array[String]): Unit = {
    // Create Spark Session
    val spark = SparkSession.builder()
      .appName("Spark Kafka Consumer")
      .master("local[*]")
      .getOrCreate()

    // Read from Kafka
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "spark-topic")
      .option("startingOffsets", "earliest") // Auto offset reset config
      .load()

    // Process the stream
    val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", "false")
      .trigger(Trigger.ProcessingTime("1 second"))
      .start()

    query.awaitTermination()
  }
}
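
The console sink is only for debugging. To forward the stream to another Kafka topic, swap the sink format to kafka; Spark then requires a checkpointLocation, and that checkpoint directory is where Structured Streaming persists its consumed offsets, which is what lets a restarted query resume where it left off. A minimal sketch (the output topic and checkpoint path are ours):

SparkKafkaToKafka.scala
import org.apache.spark.sql.SparkSession

object SparkKafkaToKafka {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Spark Kafka To Kafka")
      .master("local[*]")
      .getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "spark-topic")
      .load()

    // The Kafka sink expects string or binary key/value columns
    val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "spark-topic-copy")
      // Offsets are tracked here, not in a Kafka consumer group
      .option("checkpointLocation", "/tmp/spark-kafka-checkpoint")
      .start()

    query.awaitTermination()
  }
}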

Dependencies

For Java Implementation:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.0</version>
</dependency>

For Spark Implementation:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
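
Note that the connector above only provides the Kafka source and sink; the Spark examples also need Spark SQL itself on the classpath (unless your cluster already provides it). A matching artifact for the connector version above would be:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.1.2</version>
</dependency>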