Apache Kafka Tutorial - Java and Spark Integration
This tutorial demonstrates how to work with Apache Kafka from both plain Java and Apache Spark, covering producer and consumer implementations and how each manages offsets.
1. Java Implementation
1.1 Kafka Producer in Java
This producer enables idempotence together with the related safe-producer settings (acks=all, effectively unlimited retries, bounded in-flight requests) for reliable message delivery.
KafkaProducerExample.java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaProducerExample {
private static final String TOPIC_NAME = "example-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
// Create producer properties
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Create safe producer configs
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
// Create the producer
try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
// Send messages with callbacks
for (int i = 0; i < 10; i++) {
String key = "key_" + i;
String value = "message_" + i;
ProducerRecord<String, String> record =
new ProducerRecord<>(TOPIC_NAME, key, value);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("Message sent successfully! Topic: %s, Partition: %d, Offset: %d%n",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
System.err.println("Error sending message: " + exception.getMessage());
}
});
}
// Flush outstanding records; the try-with-resources block closes the producer
producer.flush();
}
}
}
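The callback above reports acknowledgements asynchronously. If you need to block until the broker confirms a record (for example in a test), send() returns a Future you can wait on; this is what the ExecutionException import is for. A minimal sketch that could replace the send call inside the try block above (the key and value literals are illustrative):
// Synchronous send: block until the broker acknowledges the record.
// Broker-side failures surface as an ExecutionException from Future.get().
ProducerRecord<String, String> syncRecord =
        new ProducerRecord<>(TOPIC_NAME, "sync_key", "sync_value");
try {
    RecordMetadata metadata = producer.send(syncRecord).get();
    System.out.printf("Acknowledged: partition %d, offset %d%n",
            metadata.partition(), metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    System.err.println("Synchronous send failed: " + e.getMessage());
}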
1.2 Kafka Consumer in Java
This consumer enables automatic offset commits and resets to the earliest offset when the group has no committed offset.
KafkaConsumerExample.java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
private static final String TOPIC_NAME = "example-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "my-consumer-group";
public static void main(String[] args) {
// Create consumer properties
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Auto offset reset config
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Enable auto commit
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// Create the consumer
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
// Subscribe to topic
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
// Poll for messages
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Received message: key = %s, value = %s, topic = %s, partition = %d, offset = %d%n",
record.key(), record.value(), record.topic(), record.partition(), record.offset());
}
}
}
}
}
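Auto commit is convenient, but offsets may be committed before every returned record has actually been processed, which can lose messages after a crash. For at-least-once semantics with explicit control, disable auto commit and commit after each batch. A minimal sketch of the changed configuration and poll loop, assuming the same setup as above (processRecord is a hypothetical application handler):
// Manual offset management: disable auto commit in the consumer properties
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// ... create the consumer and subscribe as before, then:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record); // hypothetical processing logic
    }
    if (!records.isEmpty()) {
        consumer.commitSync(); // commit only after the whole batch has been processed
    }
}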
2. Spark Implementation
2.1 Spark Kafka Producer
This Spark implementation demonstrates how to write a batch DataFrame to Kafka using Spark's DataFrame API; the Kafka sink expects string or binary key and value columns.
SparkKafkaProducer.scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object SparkKafkaProducer {
def main(args: Array[String]): Unit = {
// Create Spark Session
val spark = SparkSession.builder()
.appName("Spark Kafka Producer")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// Create sample data
val data = (1 to 100).map(i => (s"key_$i", s"message_$i"))
.toDF("key", "value")
// Write to Kafka
data.selectExpr("key", "value")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "spark-topic")
.save()
spark.stop()
}
}
2.2 Spark Kafka Consumer
This implementation uses Spark Structured Streaming to consume messages from Kafka. Unlike the Java consumer, Structured Streaming does not commit offsets to a Kafka consumer group; it tracks them itself, and startingOffsets only applies when no offsets have been recorded yet. A checkpointed variant is sketched after the code.
SparkKafkaConsumer.scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._
object SparkKafkaConsumer {
def main(args: Array[String]): Unit = {
// Create Spark Session
val spark = SparkSession.builder()
.appName("Spark Kafka Consumer")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// Read from Kafka
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "spark-topic")
.option("startingOffsets", "earliest") // Auto offset reset config
.load()
// Process the stream
val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.outputMode("append")
.format("console")
.option("truncate", "false")
.trigger(Trigger.ProcessingTime("1 second"))
.start()
query.awaitTermination()
}
}
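Without an explicit checkpointLocation, the query keeps its offsets in a temporary checkpoint that does not reliably survive a restart, so a restarted query falls back to startingOffsets. To make offset tracking durable, configure a checkpoint directory. A minimal sketch of the same consumer expressed with Spark's Java API, with an assumed local checkpoint path (the class name and path are illustrative):
SparkKafkaConsumerWithCheckpoint.java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
public class SparkKafkaConsumerWithCheckpoint {
    public static void main(String[] args) throws Exception {
        // Create Spark Session
        SparkSession spark = SparkSession.builder()
                .appName("Spark Kafka Consumer With Checkpoint")
                .master("local[*]")
                .getOrCreate();
        // Read from Kafka; startingOffsets only applies when no checkpoint exists yet
        Dataset<Row> df = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "spark-topic")
                .option("startingOffsets", "earliest")
                .load();
        // Write to the console, persisting consumed offsets in the checkpoint directory
        StreamingQuery query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
                .writeStream()
                .outputMode("append")
                .format("console")
                .option("truncate", "false")
                .option("checkpointLocation", "/tmp/spark-kafka-checkpoint") // assumed path
                .trigger(Trigger.ProcessingTime("1 second"))
                .start();
        query.awaitTermination();
    }
}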
Dependencies
For the Java implementation:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.1.0</version>
</dependency>
For the Spark implementation:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.1.2</version>
</dependency>
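The Kafka connector expects the core Spark modules (spark-core, spark-sql) to be on the classpath as well; on a cluster they are provided, while a standalone build needs them declared explicitly. A minimal addition, assuming Spark 3.1.2 with Scala 2.12 to match the connector above:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>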