Hadoop Big Data Tools and the Ecosystem - MapReduce Programming: Hands-On

An In-Depth Look at MapReduce Principles, with Hands-On Programming

1. The MapReduce Programming Model in Detail

1.1 The Core Idea of MapReduce

Divide and conquer: a large-scale data processing job is broken into many small tasks that run in parallel across the cluster.

Input Data → Split → Map → Shuffle → Reduce → Output

1.2 MapReduce Execution Flow

Input Format
    ↓
Input Split (logical splits)
    ↓
Map Phase (the map() function)
    ↓
Shuffle & Sort
    ↓
Reduce Phase (the reduce() function)
    ↓
Output Format
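
As a concrete illustration of these stages, here is roughly how the WordCount job from the next section would move two hypothetical input lines through the pipeline:

Input:    "Hello World"              "Hello Hadoop"
Map:      <Hello,1> <World,1>        <Hello,1> <Hadoop,1>
Shuffle:  <Hadoop,[1]>  <Hello,[1,1]>  <World,[1]>
Reduce:   <Hadoop,1>  <Hello,2>  <World,1>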

2. A Deep Dive into the WordCount Source Code

2.1 The Classic WordCount Program

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    /**
     * Mapper: tokenizes each line of text and emits <word, 1>
     */
    public static class TokenizerMapper 
         extends Mapper<Object, Text, Text, IntWritable>{

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {

            // Tokenize the line with StringTokenizer
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);  // Emit <word, 1>
            }
        }
    }

    /**
     * Reducer: sums the occurrence counts for each word
     */
    public static class IntSumReducer 
         extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, 
                           Context context
                           ) throws IOException, InterruptedException {

            int sum = 0;
            // Accumulate the occurrences of this word
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);  // Emit <word, total count>
        }
    }

    /**
     * Driver: configures and submits the job
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");

        // Job settings
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);  // Reuse the Reducer as the Combiner
        job.setReducerClass(IntSumReducer.class);

        // Output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job and wait for it to finish
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

2.2 Compiling and Packaging

# Create the project directory structure
mkdir -p wordcount/src/wordcount
mkdir wordcount/classes
mkdir wordcount/lib

# Save the code above as wordcount/src/wordcount/WordCount.java

# Compile the Java program
javac -classpath $(hadoop classpath) \
      -d wordcount/classes \
      wordcount/src/wordcount/WordCount.java

# Create the JAR
jar -cvf wordcount.jar -C wordcount/classes .

# Prepare test data
echo "Hello World Hello Hadoop" > input1.txt
echo "Hello MapReduce Goodbye Hadoop" > input2.txt
hdfs dfs -mkdir -p /user/hadoop/input/wordcount
hdfs dfs -put input*.txt /user/hadoop/input/wordcount/

3. Custom Mapper/Reducer/Combiner in Practice

3.1 A Mapper for More Complex Data

/**
 * Example Mapper for structured (CSV) data
 * Input:  name,age,city,salary
 * Output: <city, salary info>
 */
public static class CitySalaryMapper 
     extends Mapper<Object, Text, Text, Text> {

    private Text city = new Text();
    private Text salaryInfo = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {

        String line = value.toString();
        String[] fields = line.split(",");

        if (fields.length == 4) {
            String name = fields[0].trim();
            int age = Integer.parseInt(fields[1].trim());
            String cityName = fields[2].trim();
            double salary = Double.parseDouble(fields[3].trim());

            // Only keep records with age between 18 and 65
            if (age >= 18 && age <= 65) {
                city.set(cityName);
                // Value format: age:salary
                salaryInfo.set(age + ":" + salary);
                context.write(city, salaryInfo);
            }
        }
    }
}
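
For example, with hypothetical input records like the ones below, the mapper emits one <city, age:salary> pair per valid line (records outside the 18-65 age range or with a wrong field count are dropped):

Input:    Alice,28,Beijing,15000.0
          Bob,45,Shanghai,22000.0
          Carol,17,Beijing,8000.0      (filtered out: age < 18)
Output:   <Beijing, 28:15000.0>
          <Shanghai, 45:22000.0>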

3.2 A More Advanced Reducer

/**
 * Computes the average salary and the age distribution for each city
 */
public static class CitySalaryReducer 
     extends Reducer<Text, Text, Text, Text> {

    private Text result = new Text();

    public void reduce(Text key, Iterable<Text> values, 
                       Context context
                       ) throws IOException, InterruptedException {

        double totalSalary = 0;
        int count = 0;
        int under30 = 0, between30_50 = 0, over50 = 0;
        double minSalary = Double.MAX_VALUE;
        double maxSalary = -Double.MAX_VALUE;  // note: Double.MIN_VALUE is the smallest positive double, not the most negative value

        for (Text val : values) {
            String[] parts = val.toString().split(":");
            int age = Integer.parseInt(parts[0]);
            double salary = Double.parseDouble(parts[1]);

            totalSalary += salary;
            count++;

            // Age distribution
            if (age < 30) under30++;
            else if (age <= 50) between30_50++;
            else over50++;

            // Salary range
            if (salary < minSalary) minSalary = salary;
            if (salary > maxSalary) maxSalary = salary;
        }

        double avgSalary = totalSalary / count;

        // Output format: avgSalary|minSalary|maxSalary|count|under30|age30to50|over50
        String output = String.format("%.2f|%.2f|%.2f|%d|%d|%d|%d", 
            avgSalary, minSalary, maxSalary, count, 
            under30, between30_50, over50);

        result.set(output);
        context.write(key, result);
    }
}

3.3 Optimizing with a Custom Combiner

/**
 * Combiner: performs partial aggregation on the map side to cut shuffle traffic.
 * Caveat: a Combiner's output must be something the Reducer can still consume. This one
 * changes the value format to "totalSalary:totalCount:details", which the CitySalaryReducer
 * in 3.2 does not parse, so it is illustrative only; the SalaryWritable approach in
 * section 4 sidesteps the problem.
 */
public static class SalaryCombiner 
     extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values, 
                       Context context
                       ) throws IOException, InterruptedException {

        double totalSalary = 0;
        int totalCount = 0;
        StringBuilder ageSalaryBuilder = new StringBuilder();

        for (Text val : values) {
            String[] parts = val.toString().split(":");
            double salary = Double.parseDouble(parts[1]);

            totalSalary += salary;
            totalCount++;

            // Accumulate the age:salary pairs, separated by semicolons
            if (ageSalaryBuilder.length() > 0) {
                ageSalaryBuilder.append(";");
            }
            ageSalaryBuilder.append(val.toString());
        }

        // Emit the partial aggregate: totalSalary:totalCount:details
        Text output = new Text(totalSalary + ":" + totalCount + ":" + ageSalaryBuilder.toString());
        context.write(key, output);
    }
}
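
With the caveat above in mind, this is roughly how a combiner is wired into the driver; a minimal sketch, assuming a hypothetical CitySalaryAnalysis driver class that reuses the Mapper and Reducer from 3.1 and 3.2:

Job job = Job.getInstance(new Configuration(), "city salary");
job.setJarByClass(CitySalaryAnalysis.class);   // hypothetical driver class
job.setMapperClass(CitySalaryMapper.class);
job.setCombinerClass(SalaryCombiner.class);    // map-side partial aggregation
job.setReducerClass(CitySalaryReducer.class);
job.setMapOutputKeyClass(Text.class);          // map output types (here they match the final types)
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);             // final output types
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));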

4. Serialization and InputFormat/OutputFormat

4.1 A Custom Writable Data Type

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

/**
 * Custom data type for salary statistics
 */
public class SalaryWritable implements Writable {
    private double totalSalary;
    private int count;
    private double minSalary;
    private double maxSalary;

    // Default constructor (required by Hadoop's serialization machinery)
    public SalaryWritable() {
        this.totalSalary = 0;
        this.count = 0;
        this.minSalary = Double.MAX_VALUE;
        this.maxSalary = -Double.MAX_VALUE;  // not Double.MIN_VALUE, which is the smallest positive double
    }

    public SalaryWritable(double salary) {
        this.totalSalary = salary;
        this.count = 1;
        this.minSalary = salary;
        this.maxSalary = salary;
    }

    // Merge another SalaryWritable into this one
    public void merge(SalaryWritable other) {
        this.totalSalary += other.totalSalary;
        this.count += other.count;
        this.minSalary = Math.min(this.minSalary, other.minSalary);
        this.maxSalary = Math.max(this.maxSalary, other.maxSalary);
    }

    public double getAverageSalary() {
        return count == 0 ? 0 : totalSalary / count;
    }

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeDouble(totalSalary);
        out.writeInt(count);
        out.writeDouble(minSalary);
        out.writeDouble(maxSalary);
    }

    // Deserialization
    @Override
    public void readFields(DataInput in) throws IOException {
        totalSalary = in.readDouble();
        count = in.readInt();
        minSalary = in.readDouble();
        maxSalary = in.readDouble();
    }

    // Getters
    public double getTotalSalary() { return totalSalary; }
    public int getCount() { return count; }
    public double getMinSalary() { return minSalary; }
    public double getMaxSalary() { return maxSalary; }

    @Override
    public String toString() {
        return String.format("%.2f,%.2f,%.2f,%d", 
            getAverageSalary(), minSalary, maxSalary, count);
    }
}
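
SalaryWritable is only ever used as a value here, so implementing Writable is sufficient. If a custom type had to serve as a key, it would also need to implement WritableComparable so the shuffle can sort and partition it; a minimal, hypothetical sketch:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Hypothetical key type: keys must define a total ordering for the shuffle's sort phase
public class CityKey implements WritableComparable<CityKey> {
    private String city = "";

    public void set(String city) { this.city = city; }

    @Override
    public void write(DataOutput out) throws IOException { out.writeUTF(city); }

    @Override
    public void readFields(DataInput in) throws IOException { city = in.readUTF(); }

    @Override
    public int compareTo(CityKey other) { return city.compareTo(other.city); }

    // hashCode() must be stable so HashPartitioner sends equal keys to the same reducer
    @Override
    public int hashCode() { return city.hashCode(); }

    @Override
    public boolean equals(Object o) {
        return o instanceof CityKey && city.equals(((CityKey) o).city);
    }
}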

4.2 Mapper/Reducer Using the Custom Writable

/**
 * Mapper that emits the custom data type
 */
public static class CustomSalaryMapper 
     extends Mapper<Object, Text, Text, SalaryWritable> {

    private Text city = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {

        String line = value.toString();
        String[] fields = line.split(",");

        if (fields.length == 4) {
            String cityName = fields[2].trim();
            double salary = Double.parseDouble(fields[3].trim());

            city.set(cityName);
            context.write(city, new SalaryWritable(salary));
        }
    }
}

/**
 * Reducer that merges the custom data type
 */
public static class CustomSalaryReducer 
     extends Reducer<Text, SalaryWritable, Text, SalaryWritable> {

    public void reduce(Text key, Iterable<SalaryWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {

        SalaryWritable result = new SalaryWritable();

        for (SalaryWritable val : values) {
            result.merge(val);
        }

        context.write(key, result);
    }
}
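
Because the map output value type is now SalaryWritable rather than the default, the driver must declare it explicitly. A minimal sketch, assuming a hypothetical CitySalaryStats driver class; since merge() is associative, the reducer can also be registered as the combiner:

Job job = Job.getInstance(new Configuration(), "city salary stats");
job.setJarByClass(CitySalaryStats.class);          // hypothetical driver class
job.setMapperClass(CustomSalaryMapper.class);
job.setCombinerClass(CustomSalaryReducer.class);   // safe here: merging partial stats is associative
job.setReducerClass(CustomSalaryReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(SalaryWritable.class);  // custom value type must be declared
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(SalaryWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);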

4.3 A Custom InputFormat

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

/**
 * Custom JSON input format (one JSON record per line)
 */
public class JsonInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        return new JsonRecordReader();
    }

    // Prevent files from being split (multi-line JSON generally cannot be split safely)
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
}

/**
 * JSON record reader (delegates line reading to LineRecordReader)
 */
public class JsonRecordReader extends RecordReader<LongWritable, Text> {
    private LineRecordReader lineReader;
    private Text value;

    public JsonRecordReader() {
        lineReader = new LineRecordReader();
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) 
            throws IOException, InterruptedException {
        lineReader.initialize(split, context);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!lineReader.nextKeyValue()) {
            return false;
        }

        // JSON parsing/validation logic could be added here
        value = lineReader.getCurrentValue();
        return true;
    }

    @Override
    public LongWritable getCurrentKey() {
        return lineReader.getCurrentKey();
    }

    @Override
    public Text getCurrentValue() {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return lineReader.getProgress();
    }

    @Override
    public void close() throws IOException {
        lineReader.close();
    }
}
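
Using the custom format from the driver only takes one extra call; a minimal sketch (the input path is illustrative):

job.setInputFormatClass(JsonInputFormat.class);                          // read input with the custom format
FileInputFormat.addInputPath(job, new Path("/user/hadoop/input/json"));  // hypothetical input directory
// A custom OutputFormat would be registered the same way via job.setOutputFormatClass(...)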

5. MapReduce Performance Tuning Tips

5.1 Resource Configuration

/**
 * Driver with tuned configuration
 * (uses the same imports as WordCount, plus org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat)
 */
public class OptimizedWordCount {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // === Performance tuning settings ===

        // 1. Map task memory
        conf.set("mapreduce.map.memory.mb", "2048");
        conf.set("mapreduce.map.java.opts", "-Xmx1638m");

        // 2. Reduce task memory
        conf.set("mapreduce.reduce.memory.mb", "4096");
        conf.set("mapreduce.reduce.java.opts", "-Xmx3276m");

        // 3. Task parallelism
        conf.set("mapreduce.job.maps", "10");    // hint only: the actual map count is driven by the number of input splits
        conf.set("mapreduce.job.reduces", "5");  // number of reduce tasks

        // 4. Enable compression (Snappy for intermediate map output, Gzip for the final output)
        conf.set("mapreduce.map.output.compress", "true");
        conf.set("mapreduce.map.output.compress.codec", 
                 "org.apache.hadoop.io.compress.SnappyCodec");
        conf.set("mapreduce.output.fileoutputformat.compress", "true");
        conf.set("mapreduce.output.fileoutputformat.compress.codec",
                 "org.apache.hadoop.io.compress.GzipCodec");

        // 5. Shuffle tuning
        conf.set("mapreduce.task.io.sort.mb", "100");    // sort buffer size in MB (100 is the default; raise it for large map outputs)
        conf.set("mapreduce.task.io.sort.factor", "50"); // number of streams merged at once

        Job job = Job.getInstance(conf, "optimized word count");

        // Job configuration
        job.setJarByClass(OptimizedWordCount.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setCombinerClass(WordCount.IntSumReducer.class);
        job.setReducerClass(WordCount.IntSumReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Input format that combines small files into fewer splits
        job.setInputFormatClass(CombineTextInputFormat.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
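
Rather than hard-coding these properties, they can be passed at submit time if the driver goes through ToolRunner, which lets GenericOptionsParser pick up -D options from the command line. A minimal sketch (WordCountTool is a hypothetical class name):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Tool-based driver: settings arrive via -D key=value instead of being hard-coded
public class WordCountTool extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "word count (tool)");  // getConf() already contains the -D overrides
        job.setJarByClass(WordCountTool.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setCombinerClass(WordCount.IntSumReducer.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new WordCountTool(), args));
    }
}

It could then be submitted with, for example: hadoop jar wordcount.jar WordCountTool -D mapreduce.reduce.memory.mb=4096 -D mapreduce.job.reduces=5 /user/hadoop/input/wordcount /user/hadoop/output/wordcount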

5.2 Dealing with Data Skew

/**
 * Skew-aware Mapper: spreads suspected hot keys across reducers by salting them with a random suffix
 * (requires java.util.Random in addition to the usual imports)
 */
public static class SkewAwareMapper 
     extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    private Random random = new Random();

    // Detect hot keys and spread them across reducers
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {

        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            String token = itr.nextToken();

            // Append a random suffix to suspected hot keys
            if (isHotKey(token)) {
                String distributedKey = token + "_" + random.nextInt(10);
                word.set(distributedKey);
            } else {
                word.set(token);
            }

            context.write(word, one);
        }
    }

    private boolean isHotKey(String word) {
        // Naive hot-key detection; in practice this would come from sampling statistics
        return word.length() <= 3; // assume short words are hot
    }
}

/**
 * Skew-aware Reducer: first-pass aggregation; salted keys are marked "_partial" for a second job
 */
public static class SkewAwareReducer 
     extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }

        // Salted keys: strip the random suffix and mark the partial sum for re-aggregation
        String keyStr = key.toString();
        if (keyStr.contains("_")) {
            String originalKey = keyStr.split("_")[0];
            context.write(new Text(originalKey + "_partial"), new IntWritable(sum));
        } else {
            result.set(sum);
            context.write(key, result);
        }
    }
}

/**
 * Reducer for the second (merge) aggregation job
 */
public static class SecondAggregationReducer 
     extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }

        // Merge the partial results
        String keyStr = key.toString();
        if (keyStr.endsWith("_partial")) {
            String originalKey = keyStr.replace("_partial", "");
            context.write(new Text(originalKey), new IntWritable(sum));
        } else {
            result.set(sum);
            context.write(key, result);
        }
    }
}
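
The salting approach requires two chained jobs: the first produces the salted partial counts, the second merges them. A rough driver sketch (class and path names are illustrative; job 1 writes a SequenceFile, via SequenceFileOutputFormat from org.apache.hadoop.mapreduce.lib.output, so that job 2 can read <Text, IntWritable> pairs back through an identity mapper and SequenceFileInputFormat):

Configuration conf = new Configuration();
Path input = new Path(args[0]);
Path intermediate = new Path(args[1] + "_tmp");
Path output = new Path(args[1]);

// Job 1: salted aggregation
Job job1 = Job.getInstance(conf, "skew-aware count");
job1.setJarByClass(SkewAwareWordCount.class);              // hypothetical enclosing driver class
job1.setMapperClass(SkewAwareMapper.class);
job1.setReducerClass(SkewAwareReducer.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);
job1.setOutputFormatClass(SequenceFileOutputFormat.class); // keep <Text, IntWritable> types on disk
FileInputFormat.addInputPath(job1, input);
FileOutputFormat.setOutputPath(job1, intermediate);
if (!job1.waitForCompletion(true)) System.exit(1);

// Job 2: identity map over the intermediate data, then merge the "_partial" results
Job job2 = Job.getInstance(conf, "second aggregation");
job2.setJarByClass(SkewAwareWordCount.class);
job2.setInputFormatClass(SequenceFileInputFormat.class);
job2.setMapperClass(Mapper.class);                         // the default Mapper is an identity mapper
job2.setReducerClass(SecondAggregationReducer.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job2, intermediate);
FileOutputFormat.setOutputPath(job2, output);
System.exit(job2.waitForCompletion(true) ? 0 : 1);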

5.3 Using Counters

/**
 * Mapper that uses counters for data-quality monitoring
 */
public static class CounterAwareMapper 
     extends Mapper<Object, Text, Text, IntWritable> {

    // Counter enum
    enum MyCounters {
        TOTAL_WORDS,
        SHORT_WORDS,
        LONG_WORDS,
        INVALID_RECORDS
    }

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {

        String line = value.toString();

        // Skip blank lines
        if (line.trim().isEmpty()) {
            context.getCounter(MyCounters.INVALID_RECORDS).increment(1);
            return;
        }

        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            String token = itr.nextToken();

            // Word-length statistics
            if (token.length() < 3) {
                context.getCounter(MyCounters.SHORT_WORDS).increment(1);
            } else if (token.length() > 10) {
                context.getCounter(MyCounters.LONG_WORDS).increment(1);
            }

            context.getCounter(MyCounters.TOTAL_WORDS).increment(1);

            word.set(token);
            context.write(word, one);
        }
    }
}
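
Counter values show up in the job's web UI and console summary, and the driver can also read them programmatically once the job completes; a minimal sketch:

if (job.waitForCompletion(true)) {
    org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
    long totalWords = counters.findCounter(CounterAwareMapper.MyCounters.TOTAL_WORDS).getValue();
    long invalid = counters.findCounter(CounterAwareMapper.MyCounters.INVALID_RECORDS).getValue();
    System.out.println("Total words: " + totalWords + ", invalid records: " + invalid);
}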

6. Hands-On Project: Website Access Log Analysis

6.1 Log Analysis Mapper

/**
 * Mapper for website access log analysis
 * Log format: IP - - [timestamp] "method path protocol" status size "referer" "user agent"
 * (requires java.text.SimpleDateFormat, java.text.ParseException, java.util.Date and
 *  java.util.Locale in addition to the Hadoop imports)
 */
public static class LogAnalysisMapper 
     extends Mapper<Object, Text, Text, Text> {

    private Text outputKey = new Text();
    private Text outputValue = new Text();
    // Locale.ENGLISH so month abbreviations such as "Dec" parse regardless of the system locale
    private SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
    private SimpleDateFormat hourFormat = new SimpleDateFormat("yyyy-MM-dd HH");

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {

        String logEntry = value.toString();

        try {
            // Parse the log line
            LogRecord record = parseLogEntry(logEntry);
            if (record != null) {
                // Requests per hour
                String hourKey = hourFormat.format(record.getTimestamp());
                outputKey.set("hour_" + hourKey);
                outputValue.set("count:1");
                context.write(outputKey, outputValue);

                // Requests per IP
                outputKey.set("ip_" + record.getIp());
                outputValue.set("count:1");
                context.write(outputKey, outputValue);

                // Requests per status code
                outputKey.set("status_" + record.getStatusCode());
                outputValue.set("count:1");
                context.write(outputKey, outputValue);

                // Requests per URL path
                if (record.getPath() != null && !record.getPath().isEmpty()) {
                    outputKey.set("path_" + record.getPath());
                    outputValue.set("count:1");
                    context.write(outputKey, outputValue);
                }
            }
        } catch (Exception e) {
            context.getCounter("LogAnalysis", "PARSE_ERRORS").increment(1);
        }
    }

    private LogRecord parseLogEntry(String logEntry) throws ParseException {
        // Simplified log parsing; a production parser should use a proper regular expression
        String[] parts = logEntry.split(" ");
        if (parts.length < 9) return null;  // parts[8] (the status code) must exist

        LogRecord record = new LogRecord();
        record.setIp(parts[0]);

        // Parse the timestamp (simplified; the timezone offset after the space is ignored)
        String timeStr = logEntry.substring(logEntry.indexOf('[') + 1, logEntry.indexOf(']'));
        record.setTimestamp(dateFormat.parse(timeStr));

        // Parse the request line
        String request = parts[5] + " " + parts[6];
        if (request.startsWith("\"")) {
            String[] requestParts = request.substring(1).split(" ");
            if (requestParts.length >= 2) {
                record.setMethod(requestParts[0]);
                record.setPath(requestParts[1]);
            }
        }

        // Status code
        record.setStatusCode(Integer.parseInt(parts[8]));

        return record;
    }
}

// Log record POJO
static class LogRecord {
    private String ip;
    private Date timestamp;
    private String method;
    private String path;
    private int statusCode;

    // getters and setters
    public String getIp() { return ip; }
    public void setIp(String ip) { this.ip = ip; }
    public Date getTimestamp() { return timestamp; }
    public void setTimestamp(Date timestamp) { this.timestamp = timestamp; }
    public String getMethod() { return method; }
    public void setMethod(String method) { this.method = method; }
    public String getPath() { return path; }
    public void setPath(String path) { this.path = path; }
    public int getStatusCode() { return statusCode; }
    public void setStatusCode(int statusCode) { this.statusCode = statusCode; }
}
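
The article does not include the matching reducer. Since every value the mapper emits has the form "count:1", a simple summing reducer is enough; a minimal sketch (assumes org.apache.hadoop.io.LongWritable is imported):

/**
 * Sketch of a matching reducer: sums the "count:N" values for each dimension key
 * (hour_*, ip_*, status_*, path_*).
 */
public static class LogAnalysisReducer extends Reducer<Text, Text, Text, LongWritable> {

    private LongWritable result = new LongWritable();

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long total = 0;
        for (Text val : values) {
            // Values look like "count:N" (N is 1 straight from the mapper)
            String[] parts = val.toString().split(":");
            total += Long.parseLong(parts[1]);
        }
        result.set(total);
        context.write(key, result);   // e.g. "status_200    1234"
    }
}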

6.2 Running and Monitoring MapReduce Jobs

# 1. Prepare test data
cat > weblog.txt << EOF
192.168.1.100 - - [25/Dec/2023:10:15:32 +0800] "GET /index.html HTTP/1.1" 200 1234 "-" "Mozilla/5.0"
192.168.1.101 - - [25/Dec/2023:10:16:45 +0800] "POST /login HTTP/1.1" 302 567 "-" "Mozilla/5.0"
192.168.1.102 - - [25/Dec/2023:10:17:12 +0800] "GET /products HTTP/1.1" 200 2345 "-" "Chrome/120.0"
192.168.1.100 - - [25/Dec/2023:11:20:30 +0800] "GET /contact HTTP/1.1" 200 890 "-" "Safari/16.0"
EOF

hdfs dfs -mkdir -p /user/hadoop/input/logs
hdfs dfs -put weblog.txt /user/hadoop/input/logs/

# 2. Run the WordCount example
hadoop jar wordcount.jar WordCount /user/hadoop/input/wordcount /user/hadoop/output/wordcount

# 3. Check job status
mapred job -list
mapred job -status <job_id>

# 4. Monitor job progress
# via the YARN web UI: http://hadoop-master:8088

# 5. Inspect the output
hdfs dfs -cat /user/hadoop/output/wordcount/part-r-00000 | head -20

# 6. Inspect counters (the counter group is the fully-qualified name of the MyCounters enum;
#    quote it so the shell does not expand the $)
mapred job -counter <job_id> 'WordCount$MyCounters' TOTAL_WORDS

6.3 Performance Test Script

#!/bin/bash
# mapreduce-benchmark.sh

# Generate test data
echo "Generating test data..."
for i in {1..5}; do
    shuf /usr/share/dict/words | head -10000 > test_data_$i.txt
done
hdfs dfs -mkdir -p /user/hadoop/input/benchmark
hdfs dfs -put test_data_*.txt /user/hadoop/input/benchmark/

# Run the benchmark
echo "Starting benchmark..."
time hadoop jar wordcount.jar WordCount \
    /user/hadoop/input/benchmark \
    /user/hadoop/output/benchmark

# Clean up test data
hdfs dfs -rm -r /user/hadoop/input/benchmark
hdfs dfs -rm -r /user/hadoop/output/benchmark

echo "性能测试完成"

Summary

In this article you have covered:

  • ✅ The core principles of the MapReduce programming model
  • ✅ An in-depth analysis of the WordCount source code
  • ✅ Developing custom Mappers, Reducers, and Combiners
  • ✅ Serialization and InputFormat/OutputFormat
  • ✅ Performance tuning and data-processing techniques
  • ✅ A hands-on project: website log analysis

Key Points

  1. Map phase: input splitting and local processing
  2. Shuffle phase: partitioning, sorting, and Combiner optimization
  3. Reduce phase: global aggregation and result output
  4. Performance: memory configuration, compression, and data-skew handling

Practice Suggestions

# Hands-on exercises
1. Implement and run the WordCount program
2. Develop a custom log-analysis MapReduce job
3. Try different InputFormats and OutputFormats
4. Practice the performance-tuning configuration
