Tuesday, January 31, 2023

SPARQL Property Paths: Advanced Graph Traversal Made Easy

Master SPARQL property paths to efficiently traverse and query complex graph relationships.

🎯 What are Property Paths?

Property paths in SPARQL allow you to express complex graph traversal patterns concisely. They're particularly useful for finding connections between resources that may be several steps apart.

🔧 Basic Path Operators

Operator Description Example Use Case
/ Sequence Path Finding grandparents (parent of parent)
| Alternative Path Finding either authors or scientists
? Zero or One Path Optional relationships
* Zero or More Path All ancestors in a family tree
+ One or More Path All descendants in a family tree
! Negated Property Path Finding relationships that are not of a specific type

📝 Basic Examples

Finding Grandparents

Sequence Path Example
SELECT ?person ?grandparent
WHERE {
    ?person dbo:parent/dbo:parent ?grandparent .
}
The / operator chains two properties in sequence: the query follows dbo:parent twice without having to name the intermediate person.

Finding Authors or Scientists

Alternative Path Example
SELECT ?person ?name
WHERE {
    ?work dbo:author|dbo:discoverer ?person .
    ?person foaf:name ?name .
}
The | operator matches either property, so ?person is returned whether it is linked to a work as its author or to a discovery as its discoverer. Note that alternatives apply to properties, not to classes: you cannot write rdf:type followed by an alternation of class IRIs to mean "either type".

🌟 Advanced Path Patterns

Academic Genealogy

One or More Path Example
SELECT ?student ?advisor
WHERE {
    ?student dbo:doctoralAdvisor+ ?advisor .
}
The + operator matches one or more repetitions of the relationship, perfect for tracing academic lineage across any number of generations. Note that SPARQL 1.1 does not report how many hops a matched path took, so a "generation" counter cannot be computed directly; if you need it, enumerate fixed-length paths (dbo:doctoralAdvisor, dbo:doctoralAdvisor/dbo:doctoralAdvisor, and so on) and tag each with its own BIND.

Finding Connected Artists

Complex Path Example
SELECT ?artist1 ?artist2 ?path
WHERE {
    ?artist1 rdf:type dbo:Artist .
    ?artist2 rdf:type dbo:Artist .
    ?artist1 (dbo:collaborator|dbo:influenced)* ?intermediate .
    ?intermediate (dbo:collaborator|dbo:influenced)* ?artist2 .
    BIND(CONCAT(STR(?artist1), " -> ", STR(?intermediate), " -> ", STR(?artist2)) AS ?path)
    FILTER(?artist1 != ?artist2)
}

💡 Best Practices

Follow these guidelines for effective property paths:
  • Use property paths for traversing known relationship patterns
  • Be careful with * and + on large datasets
  • Consider using LIMIT to restrict results when using recursive paths
  • Use named graphs to limit the scope of path traversal
  • Combine property paths with FILTER for more precise results, as in the sketch below
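
As a rough illustration of these guidelines, the sketch below keeps an unbounded + traversal in check with a FILTER and a LIMIT (the dbo:parent property and the "Habsburg" string are just placeholders):

Bounded Traversal Sketch
SELECT ?person ?ancestor ?name
WHERE {
    ?person dbo:parent+ ?ancestor .
    ?ancestor foaf:name ?name .
    FILTER(CONTAINS(STR(?name), "Habsburg"))
}
LIMIT 100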

⚠️ Common Pitfalls

Watch out for:
  • Unbounded path expressions (* and +) on large datasets
  • Complex path patterns that may timeout
  • Forgetting to filter out self-references in cyclic paths
  • Not considering the direction of relationships

🎯 Use Cases

  • Social network analysis (finding connections between people)
  • Genealogical research (family trees)
  • Citation networks (academic papers and their references)
  • Organizational hierarchies
  • Knowledge graph navigation

🚀 Advanced Techniques

Limited Length Paths

Path Length Restriction Example
SELECT DISTINCT ?author1 ?author2
WHERE {
    ?author1 rdf:type dbo:Author .
    ?author2 rdf:type dbo:Author .
    ?author1 (dbo:coAuthor){1,3} ?author2 .
    FILTER(?author1 != ?author2)
}
The {1,3} syntax restricts the path length to between 1 and 3 steps. Be aware that this quantifier appeared in early SPARQL 1.1 drafts and is supported by some engines only as an extension; it is not part of the final W3C recommendation, so on standards-only endpoints you have to spell the bounded path out by hand, as shown below.
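
A portable rewrite of the same query, expressing the bounded path as an alternation of fixed-length sequences:

Portable Bounded Path
SELECT DISTINCT ?author1 ?author2
WHERE {
    ?author1 (dbo:coAuthor
              | dbo:coAuthor/dbo:coAuthor
              | dbo:coAuthor/dbo:coAuthor/dbo:coAuthor) ?author2 .
    FILTER(?author1 != ?author2)
}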

Saturday, December 31, 2022

Mastering SPARQL: A Comprehensive Guide to Querying RDF Data

Learn how to effectively query RDF data using SPARQL, from basic queries to advanced federated searches.

📚 Introduction to SPARQL

SPARQL (SPARQL Protocol and RDF Query Language) is the standard query language for RDF (Resource Description Framework) data. It allows you to query and manipulate data stored in RDF format across different data sources.

🔍 Basic Queries

Let's start with some basic SPARQL queries:

Finding Capital Cities in Europe

Basic SELECT Query
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX dbo: <http://dbpedia.org/ontology/>
PREFIX dbr: <http://dbpedia.org/resource/>

SELECT ?city ?country ?population
WHERE {
    ?city rdf:type dbo:City ;
          dbo:country ?country ;
          dbo:populationTotal ?population .
    ?country dbo:capital ?city ;
             dbo:continent dbr:Europe .
}
ORDER BY DESC(?population)
LIMIT 10
Key Points:
  • PREFIX declarations define shortcuts for URIs
  • SELECT specifies which variables to include in results
  • WHERE clause defines the pattern to match
  • ORDER BY and LIMIT control result ordering and quantity

🔄 Advanced Queries

Using OPTIONAL and FILTER

Advanced Query Example
SELECT ?scientist ?discovery ?year
WHERE {
    ?scientist rdf:type dbo:Scientist .
    OPTIONAL {
        ?discovery dbo:discoverer ?scientist ;
                  dbo:discoveryYear ?year .
    }
    FILTER(!BOUND(?year) || ?year > 1900)
}
Pro Tip: Use OPTIONAL when some data might be missing and you still want results for the main pattern. When you filter on an optional variable, guard the condition with !BOUND(...) as above; otherwise rows where the variable is unbound are silently dropped and the OPTIONAL gains you nothing.

✏️ Update Operations

Inserting and Modifying Data

Update Query Example
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX ex: <http://example.org/>

INSERT DATA {
    ex:JohnDoe rdf:type ex:Person ;
               ex:name "John Doe" ;
               ex:age 30 .
}

DELETE {
    ex:JohnDoe ex:age ?oldAge .
}
INSERT {
    ex:JohnDoe ex:age 31 .
}
WHERE {
    ex:JohnDoe ex:age ?oldAge .
}

🌐 Federated Queries

Federated queries allow you to query multiple SPARQL endpoints in a single query:

Federated Query Example
SELECT ?person ?dbpediaBirth ?wikidataBirth
WHERE {
    # DBpedia part
    ?person rdf:type dbo:Scientist ;
            dbo:birthDate ?dbpediaBirth ;
            owl:sameAs ?wikidataPerson .   # link to the corresponding Wikidata entity

    # Wikidata part
    SERVICE <https://query.wikidata.org/sparql> {
        ?wikidataPerson wdt:P31 wd:Q5 ;
                        wdt:P569 ?wikidataBirth .
    }
}

📊 Common SPARQL Patterns

Pattern Use Case
FILTER Restrict results based on conditions
OPTIONAL Include optional patterns without failing the query
UNION Combine results from alternative patterns
GROUP BY Aggregate results based on variables
SERVICE Query external SPARQL endpoints
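
For instance, the GROUP BY pattern can aggregate the city data from the first query; a sketch counting cities per country might look like this:

Aggregation Example
SELECT ?country (COUNT(?city) AS ?cityCount)
WHERE {
    ?city rdf:type dbo:City ;
          dbo:country ?country .
}
GROUP BY ?country
ORDER BY DESC(?cityCount)
LIMIT 10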

🎯 Best Practices

Follow these guidelines for better queries:
  • Always use appropriate PREFIX declarations
  • Start with specific patterns and add OPTIONAL patterns later
  • Use LIMIT and OFFSET for pagination (see the example below)
  • Include FILTER clauses as early as possible
  • Use named graphs for better data organization
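
A minimal pagination sketch (page size 50, second page):

Pagination Example
SELECT ?city ?population
WHERE {
    ?city rdf:type dbo:City ;
          dbo:populationTotal ?population .
}
ORDER BY DESC(?population)
LIMIT 50
OFFSET 50
The explicit ORDER BY keeps pages stable; without it the endpoint is free to return rows in a different order on every request.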

Thursday, December 31, 2020

graphql

Building a Scalable GraphQL API with Apollo Server and DataLoader

Learn how to build an efficient GraphQL API using Apollo Server and DataLoader. We'll create a blog-like system with users, posts, and comments while avoiding the N+1 query problem.

🎯 What We're Building

We'll create a GraphQL API that handles relationships between:

  • Users who write posts and comments
  • Posts that belong to users and have comments
  • Comments that belong to posts and users
User ──┬── Posts
       └── Comments ── Posts
            

🚀 Getting Started

First, let's set up our project with the necessary dependencies:

package.json
{
  "name": "graphql-apollo-tutorial",
  "version": "1.0.0",
  "type": "module",
  "dependencies": {
    "@apollo/server": "^4.9.5",
    "dataloader": "^2.2.2",
    "graphql": "^16.8.1"
  }
}

📦 Setting Up DataLoaders

DataLoader is a key tool for solving the N+1 query problem in GraphQL. It batches and caches database lookups:

What is the N+1 Problem?
When fetching a list of items and their relations, you might end up making one query for the list and N additional queries for each item's relations.
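To make this concrete, here is a hypothetical, self-contained sketch (fetchUserById stands in for a real database call) showing the extra lookups piling up:

N+1 Illustration (hypothetical)
// One pretend database call per user id.
const fetchUserById = async (id) => {
  console.log(`SELECT * FROM users WHERE id = ${id}`);
  return { id, name: `User ${id}` };
};

const posts = [
  { id: 1, userId: 1 },
  { id: 2, userId: 2 },
  { id: 3, userId: 1 },
];

// 1 query for the posts + one lookup per post = N+1 round trips.
const authors = await Promise.all(posts.map(post => fetchUserById(post.userId)));
console.log(authors.map(author => author.name));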
dataloaders/index.js
import DataLoader from 'dataloader';
// users, posts and comments are assumed to be in-memory sample arrays exported
// from a data module (e.g. ../data.js) that is not shown in this post.
import { users, posts, comments } from '../data.js';

// Minimal groupBy helper (lodash.groupBy would work just as well).
const groupBy = (items, keyFn) =>
  items.reduce((acc, item) => {
    const key = keyFn(item);
    (acc[key] = acc[key] || []).push(item);
    return acc;
  }, {});

export const createLoaders = () => ({
  userLoader: new DataLoader(async (userIds) => {
    console.log('Batch loading users:', userIds);
    return userIds.map(id => users.find(user => user.id === id));
  }),

  userPostsLoader: new DataLoader(async (userIds) => {
    console.log('Batch loading posts for users:', userIds);
    const postsByUser = groupBy(posts, post => post.userId);
    return userIds.map(userId => postsByUser[userId] || []);
  }),

  postCommentsLoader: new DataLoader(async (postIds) => {
    console.log('Batch loading comments for posts:', postIds);
    const commentsByPost = groupBy(comments, comment => comment.postId);
    return postIds.map(postId => commentsByPost[postId] || []);
  })
});

📝 Defining the Schema

Our GraphQL schema defines the types and their relationships:

schema.js
# Root query type used by the resolvers and the example query below.
type Query {
  user(id: ID!): User
  posts: [Post!]!
}

type User {
  id: ID!
  name: String!
  email: String!
  posts: [Post!]!
}

type Post {
  id: ID!
  title: String!
  content: String!
  author: User!
  comments: [Comment!]!
}

type Comment {
  id: ID!
  text: String!
  post: Post!
  author: User!
}

🔧 Implementing Resolvers

Resolvers define how to fetch the data for each field:

resolvers.js
export const resolvers = {
  Query: {
    user: (_, { id }, { loaders }) => {
      return loaders.userLoader.load(id);
    },
    posts: (_, __, { posts }) => posts,
  },

  User: {
    posts: (parent, _, { loaders }) => {
      return loaders.userPostsLoader.load(parent.id);
    },
  },

  Post: {
    author: (parent, _, { loaders }) => {
      return loaders.userLoader.load(parent.userId);
    },
    comments: (parent, _, { loaders }) => {
      return loaders.postCommentsLoader.load(parent.id);
    },
  },

  Comment: {
    // Assumes each comment record stores the commenting user's id as userId.
    author: (parent, _, { loaders }) => {
      return loaders.userLoader.load(parent.userId);
    },
  }
};

🌟 Example Queries

Here's how to query our API:

Query Example
query {
  user(id: "1") {
    name
    email
    posts {
      title
      content
      comments {
        text
        author {
          name
        }
      }
    }
  }
}

🎉 Benefits of This Approach

  • Efficient data loading with DataLoader's batching and caching
  • Clean separation of concerns between schema and resolvers
  • Type-safe API with GraphQL's type system
  • Flexible querying capabilities for clients
Important Note:
Remember to create new DataLoader instances for each request to prevent data leaks between different users' requests.
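
For completeness, here is a minimal bootstrap sketch for Apollo Server 4, assuming schema.js exports the SDL above as a typeDefs string, the loader factory sits in dataloaders/index.js, and sample data lives in a data.js module; the important part is that the context function builds a new set of loaders on every request:

index.js (sketch)
import { ApolloServer } from '@apollo/server';
import { startStandaloneServer } from '@apollo/server/standalone';
import { typeDefs } from './schema.js';
import { resolvers } from './resolvers.js';
import { createLoaders } from './dataloaders/index.js';
import { posts } from './data.js'; // assumed in-memory sample data module

const server = new ApolloServer({ typeDefs, resolvers });

const { url } = await startStandaloneServer(server, {
  // Fresh DataLoader instances per request: the cache never leaks across users.
  context: async () => ({ loaders: createLoaders(), posts }),
  listen: { port: 4000 },
});

console.log(`Server ready at ${url}`);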

🔍 Performance Monitoring

You can monitor DataLoader's effectiveness by looking at the console logs. You should see multiple IDs being loaded in single batches instead of individual queries.

Pro Tip:
Use the Apollo Studio Explorer (available at http://localhost:4000) to test your queries and see the exact data being requested and returned.

Monday, October 5, 2020

Spring Batch Tutorial: Processing Records with Scheduled Jobs

Learn how to implement a Spring Batch application that processes records every 5 minutes using an in-memory H2 database.

🎯 Project Overview

In this tutorial, we'll build a Spring Batch application that:

  • Uses an in-memory H2 database for storing transactions
  • Processes pending transactions every 5 minutes
  • Implements a complete batch processing pipeline
  • Generates sample data for testing

🛠️ Project Setup

First, let's set up our project with the required dependencies in pom.xml:

pom.xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

📦 Domain Model

We'll create a simple Transaction entity to represent our data:

Transaction.java
@Entity
@Data
@NoArgsConstructor
public class Transaction {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String accountId;
    private BigDecimal amount;
    private String type;
    private LocalDateTime timestamp;
    private String status;
}
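
The reader and writer in the batch configuration below go through a Spring Data repository that is not listed in this post; a minimal sketch with the paged findByStatus signature that RepositoryItemReader expects could be:

TransactionRepository.java (sketch)
public interface TransactionRepository extends JpaRepository<Transaction, Long> {
    // Paged lookup used by the RepositoryItemReader in BatchConfig.
    Page<Transaction> findByStatus(String status, Pageable pageable);
}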

⚙️ Batch Configuration

The batch job configuration is the heart of our application:

BatchConfig.java
@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
public class BatchConfig {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final TransactionRepository transactionRepository;

    @Bean
    public ItemReader<Transaction> reader() {
        RepositoryItemReader<Transaction> reader = new RepositoryItemReader<>();
        reader.setRepository(transactionRepository);
        reader.setMethodName("findByStatus");
        reader.setArguments(java.util.Arrays.asList("PENDING"));
        reader.setSort(Collections.singletonMap("id", Sort.Direction.ASC));
        return reader;
    }

    @Bean
    public ItemProcessor<Transaction, Transaction> processor() {
        return transaction -> {
            transaction.setStatus("PROCESSED");
            return transaction;
        };
    }

    @Bean
    public ItemWriter<Transaction> writer() {
        return transactionRepository::saveAll;
    }

    @Bean
    public Step processTransactionsStep() {
        return stepBuilderFactory.get("processTransactionsStep")
                .<Transaction, Transaction>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    @Bean
    public Job processTransactionsJob() {
        return jobBuilderFactory.get("processTransactionsJob")
                .incrementer(new RunIdIncrementer())
                .flow(processTransactionsStep())
                .end()
                .build();
    }
}

⏰ Job Scheduler

The scheduler runs our batch job every 5 minutes:

BatchJobScheduler.java
@Component
@EnableScheduling
@RequiredArgsConstructor
@Slf4j
public class BatchJobScheduler {
    private final JobLauncher jobLauncher;
    private final Job processTransactionsJob;

    @Scheduled(fixedRate = 300000) // run every 5 minutes
    public void runBatchJob() {
        try {
            JobParameters jobParameters = new JobParametersBuilder()
                    .addLong("time", System.currentTimeMillis())
                    .toJobParameters();

            jobLauncher.run(processTransactionsJob, jobParameters);
        } catch (Exception e) {
            log.error("Failed to run batch job", e);
        }
    }
}

🔄 Sample Data Generation

We generate sample transactions on application startup:

TransactionService.java
@Service
@RequiredArgsConstructor
public class TransactionService {
    private final TransactionRepository transactionRepository;

    // Assumed to run once at startup; a CommandLineRunner would work just as well.
    @PostConstruct
    public void generateSampleTransactions() {
        for (int i = 0; i < 50; i++) {
            Transaction transaction = new Transaction();
            transaction.setAccountId(UUID.randomUUID().toString());
            transaction.setAmount(BigDecimal.valueOf(Math.random() * 1000));
            transaction.setType(Math.random() > 0.5 ? "CREDIT" : "DEBIT");
            transaction.setTimestamp(LocalDateTime.now());
            transaction.setStatus("PENDING");
            
            transactionRepository.save(transaction);
        }
    }
}

💡 Key Features

This implementation includes:
  • Chunk-based processing (10 items per chunk)
  • Automatic job parameter incrementation
  • Error handling and logging
  • Scheduled execution every 5 minutes
  • In-memory H2 database for easy testing

🚀 Running the Application

  1. Clone the repository
  2. Run mvn clean install
  3. Start the application
  4. Access the H2 console at http://localhost:8080/h2-console (enabled in the properties shown below)
  5. Watch the logs for batch job execution every 5 minutes
Note: The H2 database is in-memory, so data will be reset when the application restarts.
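
An application.properties along these lines (assumed, not shown in the post) wires up the in-memory database and exposes the H2 console referenced in step 4:

application.properties (sketch)
spring.datasource.url=jdbc:h2:mem:batchdb
spring.datasource.driver-class-name=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=
spring.h2.console.enabled=true
spring.jpa.hibernate.ddl-auto=update
# create the Spring Batch metadata tables in the embedded database
spring.batch.initialize-schema=embedded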

🔍 Monitoring

You can monitor the batch jobs through:

  • Application logs
  • H2 console for database inspection
  • Spring Batch tables (BATCH_JOB_EXECUTION, BATCH_STEP_EXECUTION)

Monday, December 31, 2018

Kafka

Apache Kafka Tutorial - Java and Spark Integration

This tutorial demonstrates how to work with Apache Kafka using both Java and Apache Spark, including producer and consumer implementations with auto offset management.

1. Java Implementation

1.1 Kafka Producer in Java

This implementation includes idempotence and safe producer configurations for reliable message delivery.
KafkaProducerExample.java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerExample {
    private static final String TOPIC_NAME = "example-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        // Create producer properties
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        
        // Create safe producer configs
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");

        // Create the producer
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            
            // Send messages with callbacks
            for (int i = 0; i < 10; i++) {
                String key = "key_" + i;
                String value = "message_" + i;
                
                ProducerRecord<String, String> record =
                    new ProducerRecord<>(TOPIC_NAME, key, value);

                producer.send(record, (metadata, exception) -> {
                    if (exception == null) {
                        System.out.printf("Message sent successfully! Topic: %s, Partition: %d, Offset: %d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                    } else {
                        System.err.println("Error sending message: " + exception.getMessage());
                    }
                });
            }
            
            // Flush and close producer
            producer.flush();
        }
    }
}

1.2 Kafka Consumer in Java

This consumer implementation includes auto offset commit and earliest offset reset configuration.
KafkaConsumerExample.java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    private static final String TOPIC_NAME = "example-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String GROUP_ID = "my-consumer-group";

    public static void main(String[] args) {
        // Create consumer properties
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        
        // Auto offset reset config
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        // Enable auto commit
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        // Create the consumer
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            // Subscribe to topic
            consumer.subscribe(Collections.singletonList(TOPIC_NAME));

            // Poll for messages
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received message: key = %s, value = %s, topic = %s, partition = %d, offset = %d%n",
                        record.key(), record.value(), record.topic(), record.partition(), record.offset());
                }
            }
        }
    }
}

2. Spark Implementation

2.1 Spark Kafka Producer

This Spark implementation demonstrates how to write data to Kafka using Spark's DataFrame API.
SparkKafkaProducer.scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SparkKafkaProducer {
  def main(args: Array[String]): Unit = {
    // Create Spark Session
    val spark = SparkSession.builder()
      .appName("Spark Kafka Producer")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // Create sample data
    val data = (1 to 100).map(i => (s"key_$i", s"message_$i"))
      .toDF("key", "value")

    // Write to Kafka
    data.selectExpr("key", "value")
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "spark-topic")
      .save()

    spark.stop()
  }
}

2.2 Spark Kafka Consumer

This implementation uses Spark Structured Streaming to consume messages from Kafka with auto offset management.
SparkKafkaConsumer.scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._

object SparkKafkaConsumer {
  def main(args: Array[String]): Unit = {
    // Create Spark Session
    val spark = SparkSession.builder()
      .appName("Spark Kafka Consumer")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // Read from Kafka
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "spark-topic")
      .option("startingOffsets", "earliest") // Auto offset reset config
      .load()

    // Process the stream
    val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", "false")
      .trigger(Trigger.ProcessingTime("1 second"))
      .start()

    query.awaitTermination()
  }
}

Dependencies

For Java Implementation:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.0</version>
</dependency>

For Spark Implementation:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.1.2</version>
</dependency>

Wednesday, June 20, 2018

Building a RESTful API with Spring Boot and H2 Database

Learn how to create a production-ready REST API using Spring Boot, JPA, and H2 Database with complete CRUD operations and proper error handling.

🎯 Project Overview

We'll build a Customer Management API that includes:

  • CRUD operations for customer data
  • Input validation
  • Exception handling
  • Swagger/OpenAPI documentation
  • In-memory H2 database
  • Sample data generation

🛠️ Project Setup

First, let's set up our project with the required dependencies in pom.xml:

pom.xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <scope>runtime</scope>
    </dependency>
</dependencies>
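
The entity and controller code below also lean on Lombok, and the project overview promises Swagger/OpenAPI documentation; neither is covered by the dependencies above. One possible addition (the Springfox artifacts and versions are an assumption; any OpenAPI tooling would do):

Additional dependencies (sketch)
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>io.springfox</groupId>
    <artifactId>springfox-swagger2</artifactId>
    <version>2.9.2</version>
</dependency>
<dependency>
    <groupId>io.springfox</groupId>
    <artifactId>springfox-swagger-ui</artifactId>
    <version>2.9.2</version>
</dependency>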

📦 Domain Model

Create the Customer entity with validation:

Customer.java
@Entity
@Data
@NoArgsConstructor
public class Customer {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @NotBlank(message = "First name is required")
    private String firstName;

    @NotBlank(message = "Last name is required")
    private String lastName;

    @Email(message = "Email should be valid")
    private String email;

    private String phone;

    @Min(value = 18, message = "Age should be at least 18")
    private Integer age;

    private Boolean active = true;
}

🔄 REST Endpoints

Method Endpoint Description
GET /api/customers Get all customers
GET /api/customers/{id} Get customer by ID
POST /api/customers Create new customer
PUT /api/customers/{id} Update customer
DELETE /api/customers/{id} Delete customer
PATCH /api/customers/{id}/deactivate Deactivate customer

🎯 Controller Implementation

CustomerController.java
@RestController
@RequestMapping("/api/customers")
@RequiredArgsConstructor
public class CustomerController {
    private final CustomerService customerService;

    @GetMapping
    public ResponseEntity<List<Customer>> getAllCustomers(
            @RequestParam(required = false) Boolean activeOnly) {
        List<Customer> customers = activeOnly != null && activeOnly
            ? customerService.getActiveCustomers()
            : customerService.getAllCustomers();
        return ResponseEntity.ok(customers);
    }

    @PostMapping
    public ResponseEntity<Customer> createCustomer(
            @Valid @RequestBody Customer customer) {
        return new ResponseEntity<>(
            customerService.createCustomer(customer), 
            HttpStatus.CREATED
        );
    }
}
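
The controller delegates to a CustomerService that the post does not list; a minimal sketch, together with the Spring Data repository it would need (the findByActiveTrue derived query is an assumption), might look like this:

CustomerService.java (sketch)
@Service
@RequiredArgsConstructor
public class CustomerService {
    private final CustomerRepository customerRepository;

    public List<Customer> getAllCustomers() {
        return customerRepository.findAll();
    }

    public List<Customer> getActiveCustomers() {
        // Derived query: customers whose active flag is true.
        return customerRepository.findByActiveTrue();
    }

    public Customer createCustomer(Customer customer) {
        return customerRepository.save(customer);
    }
}

CustomerRepository.java (sketch)
public interface CustomerRepository extends JpaRepository<Customer, Long> {
    List<Customer> findByActiveTrue();
}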

⚡ Exception Handling

Global exception handler for consistent error responses:

GlobalExceptionHandler.java
@RestControllerAdvice
public class GlobalExceptionHandler {
    @ExceptionHandler(CustomerNotFoundException.class)
    public ResponseEntity<ErrorResponse> handleCustomerNotFoundException(
            CustomerNotFoundException ex) {
        ErrorResponse error = new ErrorResponse(
            HttpStatus.NOT_FOUND.value(),
            ex.getMessage(),
            LocalDateTime.now()
        );
        return new ResponseEntity<>(error, HttpStatus.NOT_FOUND);
    }
}
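
The handler relies on two small support classes that the post does not show; minimal sketches, with field names inferred from the constructor call above:

ErrorResponse.java and CustomerNotFoundException.java (sketch)
@Data
@AllArgsConstructor
public class ErrorResponse {
    private int status;
    private String message;
    private LocalDateTime timestamp;
}

public class CustomerNotFoundException extends RuntimeException {
    public CustomerNotFoundException(String message) {
        super(message);
    }
}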

📝 Sample Data

The application automatically generates sample customer data on startup:

Sample Data
{
    "firstName": "John",
    "lastName": "Doe",
    "email": "john.doe@example.com",
    "phone": "+1234567890",
    "age": 30
}

🚀 Running the Application

  1. Clone the repository
  2. Run mvn clean install
  3. Start the application
  4. Access Swagger UI at http://localhost:8080/swagger-ui.html
  5. Access H2 Console at http://localhost:8080/h2-console
Testing the API:

You can use tools like Postman or curl to test the endpoints. For example:

curl http://localhost:8080/api/customers

Tuesday, March 28, 2017

Hadoop

Hadoop Ecosystem Tutorial - Word Count Examples

This tutorial demonstrates how to implement word count using different components of the Hadoop ecosystem.

1. HDFS Example

HDFSWordCount.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class HDFSWordCount {
    public static void main(String[] args) {
        try {
            // Initialize HDFS configuration
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);

            // Path to input file in HDFS
            Path inputPath = new Path("/input/sample.txt");
            
            // Read file from HDFS
            FSDataInputStream in = fs.open(inputPath);
            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
            
            // Word count map
            Map<String, Integer> wordCount = new HashMap<>();
            
            String line;
            while ((line = reader.readLine()) != null) {
                String[] words = line.toLowerCase().split("\\W+");
                for (String word : words) {
                    if (word.length() > 0) {
                        wordCount.merge(word, 1, Integer::sum);
                    }
                }
            }
            
            // Write results back to HDFS
            Path outputPath = new Path("/output/hdfs_wordcount.txt");
            FSDataOutputStream out = fs.create(outputPath);
            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
            
            for (Map.Entry<String, Integer> entry : wordCount.entrySet()) {
                writer.write(entry.getKey() + "\t" + entry.getValue() + "\n");
            }
            
            // Cleanup
            writer.close();
            reader.close();
            System.out.println("Word count completed successfully!");
            
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

2. MapReduce Example

WordCountMR.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.StringTokenizer;

public class WordCountMR {
    
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken().toLowerCase());
                context.write(word, one);
            }
        }
    }
    
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
    
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        
        job.setJarByClass(WordCountMR.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

3. PIG Example

wordcount.pig
-- Load the input file
raw_input = LOAD '/input/sample.txt' AS (line:chararray);

-- Tokenize the words
words = FOREACH raw_input GENERATE FLATTEN(TOKENIZE(LOWER(line))) AS word;

-- Group words and count occurrences
word_groups = GROUP words BY word;
word_count = FOREACH word_groups GENERATE group AS word, COUNT(words) AS count;

-- Sort results by count in descending order
ordered_word_count = ORDER word_count BY count DESC;

-- Store the results
STORE ordered_word_count INTO '/output/pig_wordcount';

4. Hive Example

wordcount.hql
-- Create a table for raw input text
CREATE TABLE IF NOT EXISTS raw_text (
    line STRING
);

-- Load data into the raw_text table
LOAD DATA INPATH '/input/sample.txt' INTO TABLE raw_text;

-- Create a table for word counts
CREATE TABLE IF NOT EXISTS word_counts (
    word STRING,
    count INT
);

-- Insert processed data into word_counts table
INSERT OVERWRITE TABLE word_counts
SELECT word, COUNT(*) as count
FROM (
    SELECT EXPLODE(SPLIT(LOWER(line), '\\W+')) as word
    FROM raw_text
    WHERE TRIM(line) != ''
) w
WHERE LENGTH(word) > 0
GROUP BY word
ORDER BY count DESC;

5. Spark Examples

Scala Implementation

WordCountSpark.scala
import org.apache.spark.sql.SparkSession

object WordCountSpark {
  def main(args: Array[String]): Unit = {
    // Create Spark session
    val spark = SparkSession.builder()
      .appName("Spark Word Count")
      .master("local[*]")  // Use local mode for testing, remove for cluster deployment
      .getOrCreate()

    // Import spark implicits for DataFrame operations
    import spark.implicits._

    try {
      // Read input file
      val inputPath = "/input/sample.txt"
      val outputPath = "/output/spark_wordcount"

      // Read text file and split into words
      val words = spark.read.textFile(inputPath)
        .flatMap(line => line.toLowerCase.split("\\W+"))
        .filter(_.nonEmpty)

      // Count words
      val wordCounts = words.groupBy("value")
        .count()
        .orderBy($"count".desc)
        .withColumnRenamed("value", "word")

      // Save results
      wordCounts.write
        .mode("overwrite")
        .format("csv")
        .option("header", "true")
        .save(outputPath)

      // Show results
      println("Word Count Results:")
      wordCounts.show()

    } finally {
      // Clean up
      spark.stop()
    }
  }
}

Python Implementation

word_count_spark.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, lower, count, desc

def word_count_spark():
    # Create Spark session
    spark = SparkSession.builder \
        .appName("PySpark Word Count") \
        .master("local[*]") \
        .getOrCreate()

    try:
        # Input and output paths
        input_path = "/input/sample.txt"
        output_path = "/output/pyspark_wordcount"

        # Read the input file
        df = spark.read.text(input_path)

        # Process the text and count words
        word_counts = df.select(explode(split(lower(df.value), "\\W+")).alias("word")) \
            .filter("word != ''") \
            .groupBy("word") \
            .agg(count("*").alias("count")) \
            .orderBy(desc("count"))

        # Save results
        word_counts.write \
            .mode("overwrite") \
            .format("csv") \
            .option("header", "true") \
            .save(output_path)

        # Show results
        print("Word Count Results:")
        word_counts.show()

    finally:
        # Clean up
        spark.stop()

if __name__ == "__main__":
    word_count_spark()