1. The MapReduce Programming Model in Detail
1.1 The Core Idea of MapReduce
Divide and conquer: a large-scale data processing job is broken into many small tasks that run in parallel across the cluster.
Input Data → Split → Map → Shuffle → Reduce → Output
1.2 MapReduce Execution Flow
Input Format
↓
Input Split (logical splits)
↓
Map Phase (the map() function)
↓
Shuffle & Sort
↓
Reduce Phase (the reduce() function)
↓
Output Format
2. A Deep Dive into the WordCount Source Code
2.1 The Classic WordCount Program
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
/**
* Mapper: tokenizes each line of text and emits <word, 1>
*/
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
// Tokenize the line with StringTokenizer
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one); // emit <word, 1>
}
}
}
/**
* Reducer: sums up the counts for each word
*/
public static class IntSumReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
// Accumulate the occurrences of this word
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result); // emit <word, total count>
}
}
/**
* Driver: configures and submits the job
*/
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
// Job settings
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class); // reuse the Reducer as a Combiner
job.setReducerClass(IntSumReducer.class);
// Output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// Input and output paths
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// Submit the job and wait for it to finish
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
2.2 Compiling and Packaging
# Create the project directory layout
mkdir -p wordcount/src/wordcount
mkdir wordcount/classes
mkdir wordcount/lib
# Save the code above as wordcount/src/wordcount/WordCount.java
# Compile the Java program
javac -classpath $(hadoop classpath) \
-d wordcount/classes \
wordcount/src/wordcount/WordCount.java
# Build the JAR
jar -cvf wordcount.jar -C wordcount/classes .
# Prepare test data
echo "Hello World Hello Hadoop" > input1.txt
echo "Hello MapReduce Goodbye Hadoop" > input2.txt
hdfs dfs -put input*.txt /user/hadoop/input/wordcount/
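With these two input files, the job behaves roughly as follows. This is a worked trace (the combiner step is an optimization that is not guaranteed to run), not actual program output:

Map over input1.txt:             <Hello,1> <World,1> <Hello,1> <Hadoop,1>
Map over input2.txt:             <Hello,1> <MapReduce,1> <Goodbye,1> <Hadoop,1>
Combiner (per map, if it runs):  input1 → <Hello,2> <World,1> <Hadoop,1>
Shuffle & Sort (group by key):   Goodbye→[1], Hadoop→[1,1], Hello→[2,1], MapReduce→[1], World→[1]
Reduce (sum per key):            Goodbye 1, Hadoop 2, Hello 3, MapReduce 1, World 1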
3. Hands-On: Custom Mapper, Reducer, and Combiner
3.1 A Mapper for Structured Data
/**
* Example Mapper for structured (CSV) input
* Input:  name,age,city,salary
* Output: <city, salary info>
*/
public static class CitySalaryMapper
extends Mapper<Object, Text, Text, Text> {
private Text city = new Text();
private Text salaryInfo = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(",");
if (fields.length == 4) {
String name = fields[0].trim();
int age = Integer.parseInt(fields[1].trim());
String cityName = fields[2].trim();
double salary = Double.parseDouble(fields[3].trim());
// Only keep records with an age between 18 and 65
if (age >= 18 && age <= 65) {
city.set(cityName);
// Value format: age:salary
salaryInfo.set(age + ":" + salary);
context.write(city, salaryInfo);
}
}
}
}
3.2 A More Advanced Reducer
/**
* Computes the average salary and age distribution for each city
*/
public static class CitySalaryReducer
extends Reducer<Text, Text, Text, Text> {
private Text result = new Text();
public void reduce(Text key, Iterable<Text> values,
Context context
) throws IOException, InterruptedException {
double totalSalary = 0;
int count = 0;
int under30 = 0, between30_50 = 0, over50 = 0;
double minSalary = Double.MAX_VALUE;
double maxSalary = Double.MIN_VALUE;
for (Text val : values) {
String[] parts = val.toString().split(":");
int age = Integer.parseInt(parts[0]);
double salary = Double.parseDouble(parts[1]);
totalSalary += salary;
count++;
// Age distribution
if (age < 30) under30++;
else if (age <= 50) between30_50++;
else over50++;
// Track the salary range
if (salary < minSalary) minSalary = salary;
if (salary > maxSalary) maxSalary = salary;
}
double avgSalary = totalSalary / count;
// Output format: avgSalary|minSalary|maxSalary|count|under-30|30-50|over-50
String output = String.format("%.2f|%.2f|%.2f|%d|%d|%d|%d",
avgSalary, minSalary, maxSalary, count,
under30, between30_50, over50);
result.set(output);
context.write(key, result);
}
}
3.3 Optimizing with a Custom Combiner
/**
* Combiner: performs partial aggregation on the map side to reduce network traffic.
* NOTE: a Combiner's output must be in the same format the Reducer expects as input.
* This combiner emits "totalSalary:count:detail" values, so it needs a Reducer adapted
* to that format -- it is not compatible with CitySalaryReducer (3.2) as written.
*/
public static class SalaryCombiner
extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values,
Context context
) throws IOException, InterruptedException {
double totalSalary = 0;
int totalCount = 0;
StringBuilder ageSalaryBuilder = new StringBuilder();
for (Text val : values) {
String[] parts = val.toString().split(":");
double salary = Double.parseDouble(parts[1]);
totalSalary += salary;
totalCount++;
// Accumulate the age:salary pairs, separated by semicolons
if (ageSalaryBuilder.length() > 0) {
ageSalaryBuilder.append(";");
}
ageSalaryBuilder.append(val.toString());
}
// Emit the partial aggregate in the form totalSalary:count:detailRecords
Text output = new Text(totalSalary + ":" + totalCount + ":" + ageSalaryBuilder.toString());
context.write(key, output);
}
}
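For completeness, a driver wiring these classes together might look like the sketch below. This driver is not part of the original listing; the class name and paths are illustrative, and CitySalaryMapper/CitySalaryReducer are assumed to be visible from it (for example as static nested classes of the driver). The combiner is deliberately left out because, as noted above, SalaryCombiner's output format does not match what CitySalaryReducer parses.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CitySalaryDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "city salary analysis");

        job.setJarByClass(CitySalaryDriver.class);
        job.setMapperClass(CitySalaryMapper.class);
        // No combiner: SalaryCombiner's output is not in the age:salary format the reducer expects.
        job.setReducerClass(CitySalaryReducer.class);

        // Both the map output and the final output are <Text, Text>.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}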
4. Serialization and InputFormat/OutputFormat
4.1 A Custom Writable Data Type
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/**
* A custom data type for salary statistics
*/
public class SalaryWritable implements Writable {
private double totalSalary;
private int count;
private double minSalary;
private double maxSalary;
// No-arg constructor (required: Hadoop instantiates Writables reflectively during deserialization)
public SalaryWritable() {
this.totalSalary = 0;
this.count = 0;
this.minSalary = Double.MAX_VALUE;
this.maxSalary = Double.MIN_VALUE;
}
public SalaryWritable(double salary) {
this.totalSalary = salary;
this.count = 1;
this.minSalary = salary;
this.maxSalary = salary;
}
// Merge another SalaryWritable into this one
public void merge(SalaryWritable other) {
this.totalSalary += other.totalSalary;
this.count += other.count;
this.minSalary = Math.min(this.minSalary, other.minSalary);
this.maxSalary = Math.max(this.maxSalary, other.maxSalary);
}
public double getAverageSalary() {
return count == 0 ? 0 : totalSalary / count;
}
// Serialization
@Override
public void write(DataOutput out) throws IOException {
out.writeDouble(totalSalary);
out.writeInt(count);
out.writeDouble(minSalary);
out.writeDouble(maxSalary);
}
// Deserialization (fields must be read in the same order they were written)
@Override
public void readFields(DataInput in) throws IOException {
totalSalary = in.readDouble();
count = in.readInt();
minSalary = in.readDouble();
maxSalary = in.readDouble();
}
// Getters
public double getTotalSalary() { return totalSalary; }
public int getCount() { return count; }
public double getMinSalary() { return minSalary; }
public double getMaxSalary() { return maxSalary; }
@Override
public String toString() {
return String.format("%.2f,%.2f,%.2f,%d",
getAverageSalary(), minSalary, maxSalary, count);
}
}
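A quick way to sanity-check a custom Writable is an in-memory serialize/deserialize round trip. The snippet below is an illustrative self-test, not part of the original article; it uses only standard java.io stream classes and the SalaryWritable defined above.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class SalaryWritableRoundTripTest {
    public static void main(String[] args) throws Exception {
        SalaryWritable original = new SalaryWritable(8500.0);
        original.merge(new SalaryWritable(12000.0));

        // Serialize with write()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize with readFields()
        SalaryWritable copy = new SalaryWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // The copy should report the same statistics as the original
        System.out.println("original: " + original);
        System.out.println("copy:     " + copy);
    }
}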
4.2 Using the Custom Writable in a Mapper/Reducer
/**
* Mapper that emits the custom data type
*/
public static class CustomSalaryMapper
extends Mapper<Object, Text, Text, SalaryWritable> {
private Text city = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(",");
if (fields.length == 4) {
String cityName = fields[2].trim();
double salary = Double.parseDouble(fields[3].trim());
city.set(cityName);
context.write(city, new SalaryWritable(salary));
}
}
}
/**
* Reducer that aggregates the custom data type
*/
public static class CustomSalaryReducer
extends Reducer<Text, SalaryWritable, Text, SalaryWritable> {
public void reduce(Text key, Iterable<SalaryWritable> values,
Context context
) throws IOException, InterruptedException {
SalaryWritable result = new SalaryWritable();
for (SalaryWritable val : values) {
result.merge(val);
}
context.write(key, result);
}
}
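A driver for this job (not shown in the original; the class name and paths are illustrative, and the mapper/reducer are assumed to be visible as nested classes) mainly has to declare the custom Writable as the output value type. Because the map output types equal the final output types here, setOutputKeyClass/setOutputValueClass is enough; if they differed, setMapOutputKeyClass/setMapOutputValueClass would also be required. Reusing CustomSalaryReducer as the Combiner is safe because merge() is associative and commutative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CustomSalaryDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "salary stats with custom writable");
        job.setJarByClass(CustomSalaryDriver.class);
        job.setMapperClass(CustomSalaryMapper.class);
        job.setCombinerClass(CustomSalaryReducer.class); // valid: merge() is order-independent
        job.setReducerClass(CustomSalaryReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(SalaryWritable.class);   // declare the custom value type

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}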
4.3 A Custom InputFormat
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
/**
* A custom input format for JSON files (one record per line)
*/
public class JsonInputFormat extends FileInputFormat<LongWritable, Text> {
@Override
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit split, TaskAttemptContext context) {
return new JsonRecordReader();
}
// Keep files unsplittable (multi-record JSON files generally cannot be split at arbitrary byte offsets)
@Override
protected boolean isSplitable(JobContext context, Path file) {
return false;
}
}
/**
* Record reader for JSON lines
*/
public class JsonRecordReader extends RecordReader<LongWritable, Text> { // in its own source file, or as a static nested class of JsonInputFormat
private LineRecordReader lineReader;
private Text value;
public JsonRecordReader() {
lineReader = new LineRecordReader();
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
lineReader.initialize(split, context);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!lineReader.nextKeyValue()) {
return false;
}
// JSON parsing/validation could be plugged in here
value = lineReader.getCurrentValue();
return true;
}
@Override
public LongWritable getCurrentKey() {
return lineReader.getCurrentKey();
}
@Override
public Text getCurrentValue() {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return lineReader.getProgress();
}
@Override
public void close() throws IOException {
lineReader.close();
}
}
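Using the custom input format is a single call in the driver; downstream code still receives <LongWritable, Text> records, just as with TextInputFormat. The mapper below is a minimal illustration (its name and the per-line counting logic are assumptions, not from the article):

// In the driver:
job.setInputFormatClass(JsonInputFormat.class);

// A mapper consuming records delivered by JsonRecordReader:
public static class JsonLineMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text outKey = new Text("json_lines");

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // value holds one line of the JSON file; real code would parse it with a JSON
        // library (e.g. Jackson) and emit fields of interest instead of a simple count.
        context.write(outKey, ONE);
    }
}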
5. MapReduce Performance Tuning
5.1 Resource Configuration
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* Driver with tuned configuration
*/
public class OptimizedWordCount {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// === Performance tuning settings ===
// 1. Map task memory
conf.set("mapreduce.map.memory.mb", "2048");
conf.set("mapreduce.map.java.opts", "-Xmx1638m");
// 2. Reduce task memory
conf.set("mapreduce.reduce.memory.mb", "4096");
conf.set("mapreduce.reduce.java.opts", "-Xmx3276m");
// 3. Task parallelism
conf.set("mapreduce.job.maps", "10"); // map count is only a hint; the real number is driven by input splits
conf.set("mapreduce.job.reduces", "5"); // number of reduce tasks
// 4. Enable compression (map output and final output)
conf.set("mapreduce.map.output.compress", "true");
conf.set("mapreduce.map.output.compress.codec",
"org.apache.hadoop.io.compress.SnappyCodec");
conf.set("mapreduce.output.fileoutputformat.compress", "true");
conf.set("mapreduce.output.fileoutputformat.compress.codec",
"org.apache.hadoop.io.compress.GzipCodec");
// 5. Tune shuffle parameters
conf.set("mapreduce.task.io.sort.mb", "100"); // map-side sort buffer size (MB)
conf.set("mapreduce.task.io.sort.factor", "50"); // number of streams merged at once
Job job = Job.getInstance(conf, "optimized word count");
// Job wiring
job.setJarByClass(OptimizedWordCount.class);
job.setMapperClass(WordCount.TokenizerMapper.class);
job.setCombinerClass(WordCount.IntSumReducer.class);
job.setReducerClass(WordCount.IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// Input and output paths
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// Input format that combines small files into fewer splits
job.setInputFormatClass(CombineTextInputFormat.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
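Hard-coding tuning values ties them to a recompile. A common alternative, sketched below on the same WordCount classes (this variant is not in the original article; ConfigurableWordCount is an illustrative name), is to implement Tool and submit through ToolRunner so any of the settings above can be overridden at submit time with -D key=value.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ConfigurableWordCount extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // getConf() already contains any -D overrides parsed by ToolRunner.
        Job job = Job.getInstance(getConf(), "configurable word count");
        job.setJarByClass(ConfigurableWordCount.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setCombinerClass(WordCount.IntSumReducer.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // Example: hadoop jar app.jar ConfigurableWordCount -D mapreduce.job.reduces=5 <in> <out>
        System.exit(ToolRunner.run(new Configuration(), new ConfigurableWordCount(), args));
    }
}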
5.2 Handling Data Skew
/**
* Skew-aware Mapper: spreads suspected hot keys across reducers by appending a random salt
*/
public static class SkewAwareMapper
extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private Random random = new Random();
// Hot-key detection and salting
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
String token = itr.nextToken();
// Append a random suffix (salt) to suspected hot keys
if (isHotKey(token)) {
String distributedKey = token + "_" + random.nextInt(10);
word.set(distributedKey);
} else {
word.set(token);
}
context.write(word, one);
}
}
private boolean isHotKey(String word) {
// Naive heuristic for demonstration: treat short words as hot keys.
return word.length() <= 3; // a real job would rely on sampling or a known hot-key list
}
}
/**
* First-stage Reducer for skewed data: salted keys are only partially aggregated here and need a second pass
*/
public static class SkewAwareReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
// Salted keys carry a "_<digit>" suffix added by the mapper: strip it and mark the
// result as partial so a follow-up job can finish the aggregation.
String keyStr = key.toString();
if (keyStr.matches(".*_\\d+$")) {
String originalKey = keyStr.substring(0, keyStr.lastIndexOf('_'));
context.write(new Text(originalKey + "_partial"), new IntWritable(sum));
} else {
result.set(sum);
context.write(key, result);
}
}
}
/**
* Reducer for the second aggregation pass (run as a separate follow-up job over the first job's output; a chaining sketch follows at the end of this subsection)
*/
public static class SecondAggregationReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
// Merge the partial results produced by the first stage
String keyStr = key.toString();
if (keyStr.endsWith("_partial")) {
String originalKey = keyStr.replace("_partial", "");
context.write(new Text(originalKey), new IntWritable(sum));
} else {
result.set(sum);
context.write(key, result);
}
}
}
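The two reducers above imply a two-job pipeline: stage 1 salts and partially aggregates, stage 2 re-reads stage 1's output and collapses the *_partial keys. The driver below is a hedged sketch of that chaining, not from the original article: the driver class, the ReparseMapper, and the staging path are illustrative, and SkewAwareMapper, SkewAwareReducer, and SecondAggregationReducer are assumed to be visible (e.g., as nested classes of the driver).

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SkewAwareDriver {

    /** Stage-2 mapper: re-parses the "key<TAB>count" lines written by stage 1. */
    public static class ReparseMapper extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] kv = value.toString().split("\t");
            if (kv.length == 2) {
                context.write(new Text(kv[0]), new IntWritable(Integer.parseInt(kv[1])));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path input = new Path(args[0]);
        Path staging = new Path(args[1] + "_stage1"); // intermediate output, illustrative
        Path output = new Path(args[1]);

        // Stage 1: salt hot keys and aggregate partially
        Job stage1 = Job.getInstance(conf, "skew-aware count, stage 1");
        stage1.setJarByClass(SkewAwareDriver.class);
        stage1.setMapperClass(SkewAwareMapper.class);
        stage1.setReducerClass(SkewAwareReducer.class);
        stage1.setOutputKeyClass(Text.class);
        stage1.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(stage1, input);
        FileOutputFormat.setOutputPath(stage1, staging);
        if (!stage1.waitForCompletion(true)) System.exit(1);

        // Stage 2: collapse the *_partial keys into final counts
        Job stage2 = Job.getInstance(conf, "skew-aware count, stage 2");
        stage2.setJarByClass(SkewAwareDriver.class);
        stage2.setMapperClass(ReparseMapper.class);
        stage2.setReducerClass(SecondAggregationReducer.class);
        stage2.setOutputKeyClass(Text.class);
        stage2.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(stage2, staging);
        FileOutputFormat.setOutputPath(stage2, output);
        System.exit(stage2.waitForCompletion(true) ? 0 : 1);
    }
}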
5.3 Using Counters
/**
* Uses counters for data-quality monitoring
*/
public static class CounterAwareMapper
extends Mapper<Object, Text, Text, IntWritable> {
// Counter enum
enum MyCounters {
TOTAL_WORDS,
SHORT_WORDS,
LONG_WORDS,
INVALID_RECORDS
}
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String line = value.toString();
// Skip empty lines
if (line.trim().isEmpty()) {
context.getCounter(MyCounters.INVALID_RECORDS).increment(1);
return;
}
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
String token = itr.nextToken();
// Word-length statistics
if (token.length() < 3) {
context.getCounter(MyCounters.SHORT_WORDS).increment(1);
} else if (token.length() > 10) {
context.getCounter(MyCounters.LONG_WORDS).increment(1);
}
context.getCounter(MyCounters.TOTAL_WORDS).increment(1);
word.set(token);
context.write(word, one);
}
}
}
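Counters are aggregated by the framework and can be read back in the driver once the job finishes, which makes automated data-quality checks easy; they also show up in the job history UI. The helper below is an illustrative sketch (reportCounters is a hypothetical name, the 1000-record threshold is arbitrary, and MyCounters is assumed to be visible from the driver, e.g., same package or enclosing class):

import java.io.IOException;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;

// Call from the driver after job.waitForCompletion(true) has returned.
public static void reportCounters(Job job) throws IOException {
    Counters counters = job.getCounters();
    long total   = counters.findCounter(CounterAwareMapper.MyCounters.TOTAL_WORDS).getValue();
    long shortW  = counters.findCounter(CounterAwareMapper.MyCounters.SHORT_WORDS).getValue();
    long invalid = counters.findCounter(CounterAwareMapper.MyCounters.INVALID_RECORDS).getValue();
    System.out.printf("total words: %d, short words: %d, invalid records: %d%n", total, shortW, invalid);
    if (invalid > 1000) { // threshold is illustrative
        System.err.println("Unusually many invalid records -- check the input before trusting the results.");
    }
}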
6. Hands-On Project: Website Access Log Analysis
6.1 Log Analysis Mapper
/**
* Mapper for website access logs
* Log format: IP - - [timestamp] "method path protocol" status size "referer" "user agent"
* Requires: java.text.SimpleDateFormat, java.text.ParseException, java.util.Date, java.util.Locale
*/
public static class LogAnalysisMapper
extends Mapper<Object, Text, Text, Text> {
private Text outputKey = new Text();
private Text outputValue = new Text();
private SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH); // English locale so "Dec" etc. parse everywhere
private SimpleDateFormat hourFormat = new SimpleDateFormat("yyyy-MM-dd HH");
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String logEntry = value.toString();
try {
// Parse the log line
LogRecord record = parseLogEntry(logEntry);
if (record != null) {
// Requests per hour
String hourKey = hourFormat.format(record.getTimestamp());
outputKey.set("hour_" + hourKey);
outputValue.set("count:1");
context.write(outputKey, outputValue);
// Requests per IP
outputKey.set("ip_" + record.getIp());
outputValue.set("count:1");
context.write(outputKey, outputValue);
// Requests per status code
outputKey.set("status_" + record.getStatusCode());
outputValue.set("count:1");
context.write(outputKey, outputValue);
// Requests per URL path
if (record.getPath() != null && !record.getPath().isEmpty()) {
outputKey.set("path_" + record.getPath());
outputValue.set("count:1");
context.write(outputKey, outputValue);
}
}
} catch (Exception e) {
context.getCounter("LogAnalysis", "PARSE_ERRORS").increment(1);
}
}
private LogRecord parseLogEntry(String logEntry) throws ParseException {
// Simplified log parsing (a production job would use a regular expression for the combined log format)
String[] parts = logEntry.split(" ");
if (parts.length < 7) return null;
LogRecord record = new LogRecord();
record.setIp(parts[0]);
// Parse the timestamp (simplified: the time-zone offset after the space is ignored)
String timeStr = logEntry.substring(logEntry.indexOf('[') + 1, logEntry.indexOf(']'));
record.setTimestamp(dateFormat.parse(timeStr));
// Parse the request line
String request = parts[5] + " " + parts[6];
if (request.startsWith("\"")) {
String[] requestParts = request.substring(1).split(" ");
if (requestParts.length >= 2) {
record.setMethod(requestParts[0]);
record.setPath(requestParts[1]);
}
}
// Status code
record.setStatusCode(Integer.parseInt(parts[8]));
return record;
}
}
// Plain data holder for one parsed log line
static class LogRecord {
private String ip;
private Date timestamp;
private String method;
private String path;
private int statusCode;
// getters and setters
public String getIp() { return ip; }
public void setIp(String ip) { this.ip = ip; }
public Date getTimestamp() { return timestamp; }
public void setTimestamp(Date timestamp) { this.timestamp = timestamp; }
public String getMethod() { return method; }
public void setMethod(String method) { this.method = method; }
public String getPath() { return path; }
public void setPath(String path) { this.path = path; }
public int getStatusCode() { return statusCode; }
public void setStatusCode(int statusCode) { this.statusCode = statusCode; }
}
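The mapper emits values of the form count:1 under several key prefixes (hour_, ip_, status_, path_), but the matching reducer is not shown in the article. A minimal sketch, under the assumption that every value keeps the count:N format, could look like this; because it only adds counts, it could also be registered as the Combiner.

/**
 * Sums the "count:N" values emitted by LogAnalysisMapper.
 */
public static class LogAnalysisReducer extends Reducer<Text, Text, Text, Text> {
    private final Text result = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long total = 0;
        for (Text val : values) {
            // Values look like "count:1" (or "count:N" after a combine pass)
            String[] parts = val.toString().split(":");
            if (parts.length == 2) {
                total += Long.parseLong(parts[1]);
            }
        }
        result.set("count:" + total);
        context.write(key, result); // e.g.  hour_2023-12-25 10 <TAB> count:3
    }
}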
6.2 Running and Monitoring MapReduce Jobs
# 1. Prepare test data
cat > weblog.txt << EOF
192.168.1.100 - - [25/Dec/2023:10:15:32 +0800] "GET /index.html HTTP/1.1" 200 1234 "-" "Mozilla/5.0"
192.168.1.101 - - [25/Dec/2023:10:16:45 +0800] "POST /login HTTP/1.1" 302 567 "-" "Mozilla/5.0"
192.168.1.102 - - [25/Dec/2023:10:17:12 +0800] "GET /products HTTP/1.1" 200 2345 "-" "Chrome/120.0"
192.168.1.100 - - [25/Dec/2023:11:20:30 +0800] "GET /contact HTTP/1.1" 200 890 "-" "Safari/16.0"
EOF
hdfs dfs -put weblog.txt /user/hadoop/input/logs/
# 2. Run the WordCount example
hadoop jar wordcount.jar WordCount /user/hadoop/input/wordcount /user/hadoop/output/wordcount
# 3. Check job status
mapred job -list
mapred job -status <job_id>
# 4. Monitor job progress
# via the YARN web UI: http://hadoop-master:8088
# 5. View the output
hdfs dfs -cat /user/hadoop/output/wordcount/part-r-00000 | head -20
# 6. Inspect counters (the group is the fully-qualified name of the counter enum,
#    with nested classes separated by $; quote it so the shell does not expand the $)
mapred job -counter <job_id> 'WordCount$MyCounters' TOTAL_WORDS
6.3 Benchmark Script
#!/bin/bash
# mapreduce-benchmark.sh
# Generate test data
echo "Generating test data..."
for i in {1..5}; do
shuf /usr/share/dict/words | head -10000 > test_data_$i.txt
done
hdfs dfs -put test_data_*.txt /user/hadoop/input/benchmark/
# Run the benchmark
echo "Starting the benchmark..."
time hadoop jar wordcount.jar WordCount \
/user/hadoop/input/benchmark \
/user/hadoop/output/benchmark
# Clean up test data
hdfs dfs -rm -r /user/hadoop/input/benchmark
hdfs dfs -rm -r /user/hadoop/output/benchmark
echo "性能测试完成"
Summary
After working through this article, you should have covered:
- ✅ The core principles of the MapReduce programming model
- ✅ An in-depth reading of the WordCount source code
- ✅ Developing custom Mappers, Reducers, and Combiners
- ✅ Serialization and custom InputFormat/OutputFormat
- ✅ Performance tuning and data-processing techniques
- ✅ A hands-on project: website access log analysis
Key points:
- Map phase: input splitting and local processing
- Shuffle phase: partitioning, sorting, and Combiner optimization
- Reduce phase: global aggregation and result output
- Performance tuning: memory configuration, compression, and data-skew handling
Suggested practice:
# Hands-on exercises
1. Implement and run the WordCount program
2. Build a custom log-analysis MapReduce job
3. Try out different InputFormat and OutputFormat implementations
4. Apply the performance-tuning settings in practice