Monday, December 31, 2018

Kafka

Apache Kafka Tutorial - Java and Spark Integration

This tutorial demonstrates how to work with Apache Kafka using both Java and Apache Spark, including producer and consumer implementations with auto offset management.

1. Java Implementation

1.1 Kafka Producer in Java

This implementation includes idempotence and safe producer configurations for reliable message delivery.
KafkaProducerExample.java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class KafkaProducerExample {
    private static final String TOPIC_NAME = "example-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        // Create producer properties
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        
        // Create safe producer configs
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");

        // Create the producer
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            
            // Send messages with callbacks
            for (int i = 0; i < 10; i++) {
                String key = "key_" + i;
                String value = "message_" + i;
                
                ProducerRecord<String, String> record =
                    new ProducerRecord<>(TOPIC_NAME, key, value);

                producer.send(record, (metadata, exception) -> {
                    if (exception == null) {
                        System.out.printf("Message sent successfully! Topic: %s, Partition: %d, Offset: %d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                    } else {
                        System.err.println("Error sending message: " + exception.getMessage());
                    }
                });
            }
            
            // Flush and close producer
            producer.flush();
        }
    }
}
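
Both examples assume that example-topic already exists (or that the broker auto-creates topics on first use). If you prefer to create it explicitly from Java, the kafka-clients AdminClient API can do it. The sketch below uses 3 partitions and replication factor 1, which suits a single-broker development setup; adjust both for a real cluster.
CreateTopicExample.java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // "example-topic" with 3 partitions, replication factor 1 (dev setup)
            NewTopic topic = new NewTopic("example-topic", 3, (short) 1);
            // createTopics is asynchronous; all().get() blocks until it completes
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}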

1.2 Kafka Consumer in Java

This consumer implementation includes auto offset commit and earliest offset reset configuration.
KafkaConsumerExample.java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    private static final String TOPIC_NAME = "example-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String GROUP_ID = "my-consumer-group";

    public static void main(String[] args) {
        // Create consumer properties
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        
        // Auto offset reset config
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        // Enable auto commit
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        // Create the consumer
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            // Subscribe to topic
            consumer.subscribe(Collections.singletonList(TOPIC_NAME));

            // Poll for messages
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received message: key = %s, value = %s, topic = %s, partition = %d, offset = %d%n",
                        record.key(), record.value(), record.topic(), record.partition(), record.offset());
                }
            }
        }
    }
}
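
Auto commit is convenient, but offsets are committed on a timer rather than after your processing finishes, so a crash at the wrong moment can cause messages to be re-delivered or skipped. When you need at-least-once processing, a common alternative is to disable auto commit and commit manually once a batch has been handled. A minimal sketch of that variant:
ManualCommitConsumerExample.java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ManualCommitConsumerExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Take over offset management from the consumer
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList("example-topic"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // Replace with your actual processing logic
                    System.out.printf("Processing: key = %s, value = %s%n",
                        record.key(), record.value());
                }
                if (!records.isEmpty()) {
                    // Commit only after the whole batch has been processed
                    consumer.commitSync();
                }
            }
        }
    }
}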

2. Spark Implementation

2.1 Spark Kafka Producer

This Spark implementation demonstrates how to write data to Kafka using Spark's DataFrame API. Note that this is a one-shot batch write; for continuous output you would use writeStream instead.
SparkKafkaProducer.scala
import org.apache.spark.sql.SparkSession

object SparkKafkaProducer {
  def main(args: Array[String]): Unit = {
    // Create Spark Session
    val spark = SparkSession.builder()
      .appName("Spark Kafka Producer")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // Create sample data
    val data = (1 to 100).map(i => (s"key_$i", s"message_$i"))
      .toDF("key", "value")

    // Write to Kafka
    data.selectExpr("key", "value")
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "spark-topic")
      .save()

    spark.stop()
  }
}

2.2 Spark Kafka Consumer

This implementation uses Spark Structured Streaming to consume messages from Kafka. Unlike the Java consumer above, Structured Streaming manages offsets itself (persisting them to a checkpoint directory when one is configured) rather than committing them to a Kafka consumer group; the startingOffsets option only applies the first time a query starts.
SparkKafkaConsumer.scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object SparkKafkaConsumer {
  def main(args: Array[String]): Unit = {
    // Create Spark Session
    val spark = SparkSession.builder()
      .appName("Spark Kafka Consumer")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // Read from Kafka
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "spark-topic")
      .option("startingOffsets", "earliest") // Auto offset reset config
      .load()

    // Process the stream
    val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", "false")
      .trigger(Trigger.ProcessingTime("1 second"))
      .start()

    query.awaitTermination()
  }
}

Dependencies

For Java Implementation:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.0</version>
</dependency>

For Spark Implementation:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.1.2</version>
</dependency>

Wednesday, June 20, 2018

Building a RESTful API with Spring Boot and H2 Database

Learn how to create a production-ready REST API using Spring Boot, JPA, and H2 Database with complete CRUD operations and proper error handling.

🎯 Project Overview

We'll build a Customer Management API that includes:

  • CRUD operations for customer data
  • Input validation
  • Exception handling
  • Swagger/OpenAPI documentation
  • In-memory H2 database
  • Sample data generation

🛠️ Project Setup

First, let's set up our project with the required dependencies in pom.xml:

pom.xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <scope>runtime</scope>
    </dependency>
    <!-- Lombok provides the @Data, @NoArgsConstructor and @RequiredArgsConstructor annotations used below -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

📦 Domain Model

Create the Customer entity with validation:

Customer.java
@Entity
@Data
@NoArgsConstructor
public class Customer {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @NotBlank(message = "First name is required")
    private String firstName;

    @NotBlank(message = "Last name is required")
    private String lastName;

    @Email(message = "Email should be valid")
    private String email;

    private String phone;

    @Min(value = 18, message = "Age should be at least 18")
    private Integer age;

    private Boolean active = true;
}

🔄 REST Endpoints

Method   Endpoint                           Description
------   --------                           -----------
GET      /api/customers                     Get all customers
GET      /api/customers/{id}                Get customer by ID
POST     /api/customers                     Create a new customer
PUT      /api/customers/{id}                Update a customer
DELETE   /api/customers/{id}                Delete a customer
PATCH    /api/customers/{id}/deactivate     Deactivate a customer

🎯 Controller Implementation

CustomerController.java
@RestController
@RequestMapping("/api/customers")
@RequiredArgsConstructor
public class CustomerController {
    private final CustomerService customerService;

    @GetMapping
    public ResponseEntity<List<Customer>> getAllCustomers(
            @RequestParam(required = false) Boolean activeOnly) {
        List<Customer> customers = activeOnly != null && activeOnly
            ? customerService.getActiveCustomers()
            : customerService.getAllCustomers();
        return ResponseEntity.ok(customers);
    }

    @PostMapping
    public ResponseEntity<Customer> createCustomer(
            @Valid @RequestBody Customer customer) {
        return new ResponseEntity<>(
            customerService.createCustomer(customer), 
            HttpStatus.CREATED
        );
    }
}
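
The controller delegates to a CustomerService, which the post doesn't show. A minimal sketch of what it could look like, assuming a Spring Data JPA repository (the CustomerRepository interface and its findByActiveTrue() method are hypothetical names, not taken from the post):

CustomerService.java
@Service
@RequiredArgsConstructor
public class CustomerService {
    // Hypothetical repository: interface CustomerRepository
    //     extends JpaRepository<Customer, Long> { List<Customer> findByActiveTrue(); }
    private final CustomerRepository customerRepository;

    public List<Customer> getAllCustomers() {
        return customerRepository.findAll();
    }

    public List<Customer> getActiveCustomers() {
        // findByActiveTrue() is a Spring Data derived query on the "active" field
        return customerRepository.findByActiveTrue();
    }

    public Customer createCustomer(Customer customer) {
        return customerRepository.save(customer);
    }
}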

⚡ Exception Handling

Global exception handler for consistent error responses:

GlobalExceptionHandler.java
@RestControllerAdvice
public class GlobalExceptionHandler {
    @ExceptionHandler(CustomerNotFoundException.class)
    public ResponseEntity<ErrorResponse> handleCustomerNotFoundException(
            CustomerNotFoundException ex) {
        ErrorResponse error = new ErrorResponse(
            HttpStatus.NOT_FOUND.value(),
            ex.getMessage(),
            LocalDateTime.now()
        );
        return new ResponseEntity<>(error, HttpStatus.NOT_FOUND);
    }
}
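
The handler references two classes the post doesn't define. A minimal sketch of each, with the ErrorResponse shape inferred from the constructor call above:

CustomerNotFoundException.java / ErrorResponse.java
public class CustomerNotFoundException extends RuntimeException {
    public CustomerNotFoundException(Long id) {
        super("Customer not found with id: " + id);
    }
}

// Shape inferred from new ErrorResponse(status, message, timestamp) above
@Data
@AllArgsConstructor
public class ErrorResponse {
    private int status;              // HTTP status code
    private String message;          // human-readable error message
    private LocalDateTime timestamp; // when the error occurred
}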

📝 Sample Data

The application automatically generates sample customer data on startup:

Sample Data
{
    "firstName": "John",
    "lastName": "Doe",
    "email": "john.doe@example.com",
    "phone": "+1234567890",
    "age": 30
}
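
One common way to seed data like this on startup is a CommandLineRunner bean; the post doesn't show its loader, so the following is only a sketch (reusing the hypothetical CustomerRepository from the service section):

SampleDataLoader.java
@Component
@RequiredArgsConstructor
public class SampleDataLoader implements CommandLineRunner {
    private final CustomerRepository customerRepository;

    @Override
    public void run(String... args) {
        // Setters are generated by Lombok's @Data on the Customer entity
        Customer customer = new Customer();
        customer.setFirstName("John");
        customer.setLastName("Doe");
        customer.setEmail("john.doe@example.com");
        customer.setPhone("+1234567890");
        customer.setAge(30);
        customerRepository.save(customer); // persisted when the application starts
    }
}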

🚀 Running the Application

  1. Clone the repository
  2. Run mvn clean install
  3. Start the application (for example with mvn spring-boot:run)
  4. Access Swagger UI at http://localhost:8080/swagger-ui.html
  5. Access H2 Console at http://localhost:8080/h2-console (the console must be enabled; see the properties snippet below)
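
The H2 console is disabled by default in Spring Boot, so step 5 assumes something like the following in application.properties (the datasource values here are illustrative):

application.properties
spring.h2.console.enabled=true
spring.datasource.url=jdbc:h2:mem:customerdb
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=
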
Testing the API:

You can use tools like Postman or curl to test the endpoints. For example:

curl http://localhost:8080/api/customers
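
To create a customer, you can POST a JSON body shaped like the sample data above (the field values here are just an example):

curl -X POST http://localhost:8080/api/customers \
  -H "Content-Type: application/json" \
  -d '{"firstName":"Jane","lastName":"Doe","email":"jane.doe@example.com","phone":"+1987654321","age":25}'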