SPARQL Property Paths: Advanced Graph Traversal Made Easy
Master SPARQL property paths to efficiently traverse and query complex graph relationships.
🎯 What are Property Paths?
Property paths in SPARQL allow you to express complex graph traversal patterns concisely.
They're particularly useful for finding connections between resources that may be several steps apart.
🔧 Basic Path Operators
Operator   Description             Example Use Case
/          Sequence Path           Finding grandparents (parent of parent)
|          Alternative Path        Finding either authors or scientists
?          Zero or One Path        Optional relationships
*          Zero or More Path       All ancestors in a family tree
+          One or More Path        All descendants in a family tree
!          Negated Property Path   Finding relationships that are not of a specific type
📝 Basic Examples
Finding Grandparents
Sequence Path Example
PREFIX dbo: <http://dbpedia.org/ontology/>

SELECT ?person ?grandparent
WHERE {
?person dbo:parent/dbo:parent ?grandparent .
}
The / operator connects two properties in sequence, similar to following two separate relationships.
Some SPARQL engines also accept a length quantifier such as {1,3} (for example, dbo:parent{1,3}), which restricts the path to between one and three steps; note that this quantifier is an engine extension rather than part of the SPARQL 1.1 recommendation.
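To illustrate the recursive operators from the table above, here is a sketch using the same dbo:parent property; it returns every ancestor reachable through one or more parent links.
One-or-More Path Example
PREFIX dbo: <http://dbpedia.org/ontology/>

SELECT ?person ?ancestor
WHERE {
?person dbo:parent+ ?ancestor .
}
Swapping + for * would also match the zero-step case, so each ?person would additionally be returned as its own "ancestor".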
Saturday, December 31, 2022
Mastering SPARQL: A Comprehensive Guide to Querying RDF Data
Learn how to effectively query RDF data using SPARQL, from basic queries to advanced federated searches.
📚 Introduction to SPARQL
SPARQL (SPARQL Protocol and RDF Query Language) is the standard query language for RDF (Resource Description Framework) data.
It allows you to query and manipulate data stored in RDF format across different data sources.
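As a first taste of the language, here is a minimal, dataset-agnostic sketch that selects arbitrary triples:
Basic SELECT Example
SELECT ?subject ?predicate ?object
WHERE {
?subject ?predicate ?object .
}
LIMIT 10
Each line inside WHERE is a triple pattern, variables start with ?, and LIMIT caps the number of results returned.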
Building a Scalable GraphQL API with Apollo Server and DataLoader
Learn how to build an efficient GraphQL API using Apollo Server and DataLoader.
We'll create a blog-like system with users, posts, and comments while avoiding the N+1 query problem.
🎯 What We're Building
We'll create a GraphQL API that handles relationships between:
Users who write posts and comments
Posts that belong to users and have comments
Comments that belong to posts and users
User ──┬── Posts
└── Comments ── Posts
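A GraphQL schema matching these relationships might look like the following sketch (field names beyond those used in the example query later in this post are assumptions):
type User {
  id: ID!
  name: String!
  email: String!
  posts: [Post!]!
  comments: [Comment!]!
}

type Post {
  id: ID!
  title: String!
  content: String!
  author: User!
  comments: [Comment!]!
}

type Comment {
  id: ID!
  text: String!
  author: User!
  post: Post!
}

type Query {
  user(id: ID!): User
}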
🚀 Getting Started
First, let's set up our project with the necessary dependencies; at a minimum this stack needs the apollo-server, graphql, and dataloader npm packages.
DataLoader is a key tool for solving the N+1 query problem in GraphQL. It batches and caches database lookups:
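Here is a minimal, self-contained sketch of that batching behavior (the in-memory user list and all names are placeholders, not code from the original post):
import DataLoader from "dataloader";

// Stand-in data source; a real API would query the database here.
const users = [
  { id: "1", name: "Alice" },
  { id: "2", name: "Bob" },
];

// DataLoader collects every load(id) call made in the same tick and invokes
// this batch function once with the full list of requested ids.
const userLoader = new DataLoader(async (ids: readonly string[]) => {
  console.log("Batched lookup for ids:", ids); // one log line per batch, not per id
  // Results must line up with the order of the requested keys.
  return ids.map((id) => users.find((u) => u.id === id) ?? null);
});

// Both loads below are served by a single batched lookup, and repeated ids are cached.
Promise.all([userLoader.load("1"), userLoader.load("2")]).then(([alice, bob]) => {
  console.log(alice?.name, bob?.name);
});
Inside a resolver the idea is the same: call the loader instead of issuing a query per item, and DataLoader coalesces the lookups for you.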
What is the N+1 Problem?
When fetching a list of items and their relations, you might end up making one query for the list and then one additional query per item to fetch its relations, for N+1 queries in total.
query {
user(id: "1") {
name
email
posts {
title
content
comments {
text
author {
name
}
}
}
}
}
🎉 Benefits of This Approach
Efficient data loading with DataLoader's batching and caching
Clean separation of concerns between schema and resolvers
Type-safe API with GraphQL's type system
Flexible querying capabilities for clients
Important Note:
Remember to create new DataLoader instances for each request to prevent data leaks between different users' requests.
🔍 Performance Monitoring
You can monitor DataLoader's effectiveness by looking at the console logs. You should see multiple IDs being loaded in single batches instead of individual queries.
Pro Tip:
Use the Apollo Studio Explorer (available at http://localhost:4000) to test your queries and see the exact data being requested and returned.
Monday, October 5, 2020
Spring Batch Tutorial: Processing Records with Scheduled Jobs
Learn how to implement a Spring Batch application that processes records every 5 minutes using an in-memory H2 database.
🎯 Project Overview
In this tutorial, we'll build a Spring Batch application that:
Uses an in-memory H2 database for storing transactions
Processes pending transactions every 5 minutes
Implements a complete batch processing pipeline
Generates sample data for testing
🛠️ Project Setup
First, let's set up our project with the required dependencies in pom.xml:
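At a minimum the project needs the spring-boot-starter-batch starter and the H2 driver (com.h2database:h2). To illustrate the every-five-minutes trigger described above, here is a hedged sketch of a scheduled job launcher; the class and bean names are assumptions, not taken from the original project.
TransactionJobScheduler.java
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

// Requires @EnableScheduling on a configuration class.
@Component
@RequiredArgsConstructor
public class TransactionJobScheduler {

    private final JobLauncher jobLauncher;
    private final Job transactionJob; // assumed name of the batch job bean

    // Launch the job every 5 minutes; the timestamp parameter makes each run unique.
    @Scheduled(fixedRate = 300000)
    public void runPendingTransactionsJob() throws Exception {
        JobParameters parameters = new JobParametersBuilder()
                .addLong("timestamp", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(transactionJob, parameters);
    }
}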
Apache Kafka Tutorial - Java and Spark Integration
This tutorial demonstrates how to work with Apache Kafka using both Java and Apache Spark, including producer and consumer implementations with auto offset management.
1. Java Implementation
1.1 Kafka Producer in Java
This implementation includes idempotence and safe producer configurations for reliable message delivery.
KafkaProducerExample.java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaProducerExample {
private static final String TOPIC_NAME = "example-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
// Create producer properties
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Create safe producer configs
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
// Create the producer
try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
// Send messages with callbacks
for (int i = 0; i < 10; i++) {
String key = "key_" + i;
String value = "message_" + i;
ProducerRecord<String, String> record =
new ProducerRecord<>(TOPIC_NAME, key, value);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("Message sent successfully! Topic: %s, Partition: %d, Offset: %d%n",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
System.err.println("Error sending message: " + exception.getMessage());
}
});
}
// Flush and close producer
producer.flush();
}
}
}
1.2 Kafka Consumer in Java
This consumer implementation includes auto offset commit and earliest offset reset configuration.
KafkaConsumerExample.java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
private static final String TOPIC_NAME = "example-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "my-consumer-group";
public static void main(String[] args) {
// Create consumer properties
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Auto offset reset config
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Enable auto commit
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// Create the consumer
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
// Subscribe to topic
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
// Poll for messages
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Received message: key = %s, value = %s, topic = %s, partition = %d, offset = %d%n",
record.key(), record.value(), record.topic(), record.partition(), record.offset());
}
}
}
}
}
2. Spark Implementation
2.1 Spark Kafka Producer
This Spark implementation demonstrates how to write data to Kafka using Spark's DataFrame API.
SparkKafkaProducer.scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object SparkKafkaProducer {
def main(args: Array[String]): Unit = {
// Create Spark Session
val spark = SparkSession.builder()
.appName("Spark Kafka Producer")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// Create sample data
val data = (1 to 100).map(i => (s"key_$i", s"message_$i"))
.toDF("key", "value")
// Write to Kafka
data.selectExpr("key", "value")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "spark-topic")
.save()
spark.stop()
}
}
2.2 Spark Kafka Consumer
This implementation uses Spark Structured Streaming to consume messages from Kafka with auto offset management.
SparkKafkaConsumer.scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._
object SparkKafkaConsumer {
def main(args: Array[String]): Unit = {
// Create Spark Session
val spark = SparkSession.builder()
.appName("Spark Kafka Consumer")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// Read from Kafka
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "spark-topic")
.option("startingOffsets", "earliest") // Auto offset reset config
.load()
// Process the stream
val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.outputMode("append")
.format("console")
.option("truncate", "false")
.trigger(Trigger.ProcessingTime("1 second"))
.start()
query.awaitTermination()
}
}
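Customer.java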
@Entity
@Data
@NoArgsConstructor
public class Customer {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@NotBlank(message = "First name is required")
private String firstName;
@Email(message = "Email should be valid")
private String email;
@Min(value = 18, message = "Age should be at least 18")
private Integer age;
private Boolean active = true;
}
🔄 REST Endpoints
Method   Endpoint                          Description
GET      /api/customers                    Get all customers
GET      /api/customers/{id}               Get customer by ID
POST     /api/customers                    Create new customer
PUT      /api/customers/{id}               Update customer
DELETE   /api/customers/{id}               Delete customer
PATCH    /api/customers/{id}/deactivate    Deactivate customer
🎯 Controller Implementation
CustomerController.java
@RestController
@RequestMapping("/api/customers")
@RequiredArgsConstructor
public class CustomerController {
private final CustomerService customerService;
@GetMapping
public ResponseEntity<List<Customer>> getAllCustomers(
@RequestParam(required = false) Boolean activeOnly) {
List<Customer> customers = activeOnly != null && activeOnly
? customerService.getActiveCustomers()
: customerService.getAllCustomers();
return ResponseEntity.ok(customers);
}
@PostMapping
public ResponseEntity<Customer> createCustomer(
@Valid @RequestBody Customer customer) {
return new ResponseEntity<>(
customerService.createCustomer(customer),
HttpStatus.CREATED
);
}
}
⚡ Exception Handling
Global exception handler for consistent error responses:
GlobalExceptionHandler.java
@RestControllerAdvice
public class GlobalExceptionHandler {
@ExceptionHandler(CustomerNotFoundException.class)
public ResponseEntity<ErrorResponse> handleCustomerNotFoundException(
CustomerNotFoundException ex) {
ErrorResponse error = new ErrorResponse(
HttpStatus.NOT_FOUND.value(),
ex.getMessage(),
LocalDateTime.now()
);
return new ResponseEntity<>(error, HttpStatus.NOT_FOUND);
}
}
📝 Sample Data
The application automatically generates sample customer data on startup:
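One common way to do this is a CommandLineRunner that calls the service layer shown earlier; the sketch below is an illustration, and the class name and sample values are assumptions.
SampleDataLoader.java
import lombok.RequiredArgsConstructor;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

// Seeds a few demo customers through the CustomerService used by the controller above.
@Component
@RequiredArgsConstructor
public class SampleDataLoader implements CommandLineRunner {

    private final CustomerService customerService;

    @Override
    public void run(String... args) {
        for (int i = 1; i <= 3; i++) {
            Customer customer = new Customer();   // setters are generated by Lombok's @Data
            customer.setFirstName("Customer " + i);
            customer.setEmail("customer" + i + "@example.com");
            customer.setAge(20 + i);
            customerService.createCustomer(customer);
        }
    }
}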
This tutorial demonstrates how to implement word count using different components of the Hadoop ecosystem.
1. HDFS Example
HDFSWordCount.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
public class HDFSWordCount {
public static void main(String[] args) {
try {
// Initialize HDFS configuration
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
// Path to input file in HDFS
Path inputPath = new Path("/input/sample.txt");
// Read file from HDFS
FSDataInputStream in = fs.open(inputPath);
BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
// Word count map
Map<String, Integer> wordCount = new HashMap<>();
String line;
while ((line = reader.readLine()) != null) {
String[] words = line.toLowerCase().split("\\W+");
for (String word : words) {
if (word.length() > 0) {
wordCount.merge(word, 1, Integer::sum);
}
}
}
// Write results back to HDFS
Path outputPath = new Path("/output/hdfs_wordcount.txt");
FSDataOutputStream out = fs.create(outputPath);
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
for (Map.Entry<String, Integer> entry : wordCount.entrySet()) {
writer.write(entry.getKey() + "\t" + entry.getValue() + "\n");
}
// Cleanup
writer.close();
reader.close();
System.out.println("Word count completed successfully!");
} catch (IOException e) {
e.printStackTrace();
}
}
}
2. MapReduce Example
WordCountMR.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.StringTokenizer;
public class WordCountMR {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Emit (word, 1) for every token in the line
        StringTokenizer itr = new StringTokenizer(value.toString().toLowerCase());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, ONE);
        }
    }
}

public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the counts emitted for each word
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "word count");
    job.setJarByClass(WordCountMR.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // Input path matches the HDFS example above; the output path is an example, adjust as needed
    FileInputFormat.addInputPath(job, new Path("/input/sample.txt"));
    FileOutputFormat.setOutputPath(job, new Path("/output/mr_wordcount"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
3. Pig Example
wordcount.pig
-- Load the input file
raw_input = LOAD '/input/sample.txt' AS (line:chararray);
-- Tokenize the words
words = FOREACH raw_input GENERATE FLATTEN(TOKENIZE(LOWER(line))) AS word;
-- Group words and count occurrences
word_groups = GROUP words BY word;
word_count = FOREACH word_groups GENERATE group AS word, COUNT(words) AS count;
-- Sort results by count in descending order
ordered_word_count = ORDER word_count BY count DESC;
-- Store the results
STORE ordered_word_count INTO '/output/pig_wordcount';
4. Hive Example
wordcount.hql
-- Create a table for raw input text
CREATE TABLE IF NOT EXISTS raw_text (
line STRING
);
-- Load data into the raw_text table
LOAD DATA INPATH '/input/sample.txt' INTO TABLE raw_text;
-- Create a table for word counts
CREATE TABLE IF NOT EXISTS word_counts (
word STRING,
count INT
);
-- Insert processed data into word_counts table
INSERT OVERWRITE TABLE word_counts
SELECT word, COUNT(*) as count
FROM (
SELECT EXPLODE(SPLIT(LOWER(line), '\\W+')) as word
FROM raw_text
WHERE TRIM(line) != ''
) w
WHERE LENGTH(word) > 0
GROUP BY word
ORDER BY count DESC;
5. Spark Examples
Scala Implementation
WordCountSpark.scala
import org.apache.spark.sql.SparkSession
object WordCountSpark {
def main(args: Array[String]): Unit = {
// Create Spark session
val spark = SparkSession.builder()
.appName("Spark Word Count")
.master("local[*]") // Use local mode for testing, remove for cluster deployment
.getOrCreate()
// Import spark implicits for DataFrame operations
import spark.implicits._
try {
// Read input file
val inputPath = "/input/sample.txt"
val outputPath = "/output/spark_wordcount"
// Read text file and split into words
val words = spark.read.textFile(inputPath)
.flatMap(line => line.toLowerCase.split("\\W+"))
.filter(_.nonEmpty)
// Count words
val wordCounts = words.groupBy("value")
.count()
.orderBy($"count".desc)
.withColumnRenamed("value", "word")
// Save results
wordCounts.write
.mode("overwrite")
.format("csv")
.option("header", "true")
.save(outputPath)
// Show results
println("Word Count Results:")
wordCounts.show()
} finally {
// Clean up
spark.stop()
}
}
}
Python Implementation
word_count_spark.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, lower, count, desc
def word_count_spark():
# Create Spark session
spark = SparkSession.builder \
.appName("PySpark Word Count") \
.master("local[*]") \
.getOrCreate()
try:
# Input and output paths
input_path = "/input/sample.txt"
output_path = "/output/pyspark_wordcount"
# Read the input file
df = spark.read.text(input_path)
# Process the text and count words
word_counts = df.select(explode(split(lower(df.value), "\\W+")).alias("word")) \
.filter("word != ''") \
.groupBy("word") \
.agg(count("*").alias("count")) \
.orderBy(desc("count"))
# Save results
word_counts.write \
.mode("overwrite") \
.format("csv") \
.option("header", "true") \
.save(output_path)
# Show results
print("Word Count Results:")
word_counts.show()
finally:
# Clean up
spark.stop()
if __name__ == "__main__":
word_count_spark()