Hadoop Ecosystem Tutorial - Word Count Examples
This tutorial demonstrates how to implement word count with five components of the Hadoop ecosystem: HDFS, MapReduce, Pig, Hive, and Spark.
1. HDFS Example
HDFSWordCount.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
public class HDFSWordCount {
public static void main(String[] args) {
try {
// Initialize HDFS configuration
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
// Path to input file in HDFS
Path inputPath = new Path("/input/sample.txt");
// Read file from HDFS
FSDataInputStream in = fs.open(inputPath);
BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
// Word count map
Map<String, Integer> wordCount = new HashMap<>();
String line;
while ((line = reader.readLine()) != null) {
String[] words = line.toLowerCase().split("\\W+");
for (String word : words) {
if (word.length() > 0) {
wordCount.merge(word, 1, Integer::sum);
}
}
}
// Write results back to HDFS
Path outputPath = new Path("/output/hdfs_wordcount.txt");
FSDataOutputStream out = fs.create(outputPath);
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
for (Map.Entry<String, Integer> entry : wordCount.entrySet()) {
writer.write(entry.getKey() + "\t" + entry.getValue() + "\n");
}
// Cleanup
writer.close();
reader.close();
System.out.println("Word count completed successfully!");
} catch (IOException e) {
e.printStackTrace();
}
}
}
2. MapReduce Example
WordCountMR.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.StringTokenizer;
public class WordCountMR {
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private final Text word = new Text();
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Emit (word, 1) for every token in the line
            StringTokenizer itr = new StringTokenizer(value.toString().toLowerCase());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
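    // The original listing breaks off after the mapper; the reducer and driver below
    // are a minimal sketch of the usual completion (the output path /output/mr_wordcount
    // is an assumption made for this tutorial).
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Sum all partial counts emitted for this word
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountMR.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/input/sample.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/output/mr_wordcount"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}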
3. Pig Example
wordcount.pig
-- Load the input file
raw_input = LOAD '/input/sample.txt' AS (line:chararray);
-- Tokenize the words
words = FOREACH raw_input GENERATE FLATTEN(TOKENIZE(LOWER(line))) AS word;
-- Group words and count occurrences
word_groups = GROUP words BY word;
word_count = FOREACH word_groups GENERATE group AS word, COUNT(words) AS count;
-- Sort results by count in descending order
ordered_word_count = ORDER word_count BY count DESC;
-- Store the results
STORE ordered_word_count INTO '/output/pig_wordcount';
4. Hive Example
wordcount.hql
-- Create a table for raw input text
CREATE TABLE IF NOT EXISTS raw_text (
line STRING
);
-- Load data into the raw_text table
LOAD DATA INPATH '/input/sample.txt' INTO TABLE raw_text;
-- Create a table for word counts
CREATE TABLE IF NOT EXISTS word_counts (
word STRING,
count INT
);
-- Insert processed data into word_counts table
INSERT OVERWRITE TABLE word_counts
SELECT word, COUNT(*) as count
FROM (
SELECT EXPLODE(SPLIT(LOWER(line), '\\W+')) as word
FROM raw_text
WHERE TRIM(line) != ''
) w
WHERE LENGTH(word) > 0
GROUP BY word
ORDER BY count DESC;
5. Spark Examples
Scala Implementation
WordCountSpark.scala
import org.apache.spark.sql.SparkSession
object WordCountSpark {
def main(args: Array[String]): Unit = {
// Create Spark session
val spark = SparkSession.builder()
.appName("Spark Word Count")
.master("local[*]") // Use local mode for testing, remove for cluster deployment
.getOrCreate()
// Import spark implicits for DataFrame operations
import spark.implicits._
try {
// Read input file
val inputPath = "/input/sample.txt"
val outputPath = "/output/spark_wordcount"
// Read text file and split into words
val words = spark.read.textFile(inputPath)
.flatMap(line => line.toLowerCase.split("\\W+"))
.filter(_.nonEmpty)
// Count words
val wordCounts = words.groupBy("value")
.count()
.orderBy($"count".desc)
.withColumnRenamed("value", "word")
// Save results
wordCounts.write
.mode("overwrite")
.format("csv")
.option("header", "true")
.save(outputPath)
// Show results
println("Word Count Results:")
wordCounts.show()
} finally {
// Clean up
spark.stop()
}
}
}
Python Implementation
word_count_spark.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, lower, count, desc
def word_count_spark():
# Create Spark session
spark = SparkSession.builder \
.appName("PySpark Word Count") \
.master("local[*]") \
.getOrCreate()
try:
# Input and output paths
input_path = "/input/sample.txt"
output_path = "/output/pyspark_wordcount"
# Read the input file
df = spark.read.text(input_path)
# Process the text and count words
word_counts = df.select(explode(split(lower(df.value), "\\W+")).alias("word")) \
.filter("word != ''") \
.groupBy("word") \
.agg(count("*").alias("count")) \
.orderBy(desc("count"))
# Save results
word_counts.write \
.mode("overwrite") \
.format("csv") \
.option("header", "true") \
.save(output_path)
# Show results
print("Word Count Results:")
word_counts.show()
finally:
# Clean up
spark.stop()
if __name__ == "__main__":
word_count_spark()