1. Hive Data Warehouse Setup and HQL
1.1 Hive Architecture and Installation
Hive Architecture Overview
User interfaces (CLI / Beeline / JDBC)
↓
Hive Driver
↓
Metastore (metadata storage)
↓
Hadoop cluster (MapReduce/Tez/Spark)
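Once Hive is installed and configured (next subsection), this flow can be traced end to end with a quick smoke test; a minimal sketch, assuming the default HiveServer2 port 10000 and a hadoop user:
# Start the metastore service and HiveServer2 in the background
nohup hive --service metastore > /tmp/metastore.log 2>&1 &
nohup hive --service hiveserver2 > /tmp/hiveserver2.log 2>&1 &
# Connect through the Beeline user interface; the statement passes driver → metastore → Hadoop
beeline -u jdbc:hive2://localhost:10000 -n hadoop -e "SHOW DATABASES;"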
Hive Installation and Configuration
# 1. Download and install Hive
wget https://downloads.apache.org/hive/hive-3.1.3/apache-hive-3.1.3-bin.tar.gz
tar -xzf apache-hive-3.1.3-bin.tar.gz -C /usr/local/
cd /usr/local && ln -s apache-hive-3.1.3-bin hive
# 2. Configure environment variables
echo 'export HIVE_HOME=/usr/local/hive' >> /etc/profile
echo 'export PATH=$HIVE_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
# 3. Configure Hive
mkdir -p /usr/local/hive/warehouse
chmod -R 777 /usr/local/hive/warehouse
# Edit hive-site.xml (see below)
hive-site.xml Configuration
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- Hive metastore connection (embedded Derby) -->
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:derby:;databaseName=/usr/local/hive/metastore_db;create=true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>org.apache.derby.jdbc.EmbeddedDriver</value>
</property>
<!-- HDFS data warehouse directory -->
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<!-- Automatically run small jobs in local mode -->
<property>
<name>hive.exec.mode.local.auto</name>
<value>true</value>
</property>
<!-- Show the current database in the CLI prompt -->
<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
</property>
<!-- Print column headers in query results -->
<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>
</configuration>
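With hive-site.xml in place, the Derby metastore schema still needs to be initialized once before Hive is first used; a minimal sketch using the schematool utility that ships with Hive:
# Initialize the embedded Derby metastore schema (run once)
schematool -dbType derby -initSchema
# Sanity check: the CLI should start and reach the metastore
hive -e "SHOW DATABASES;"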
1.2 Hive Data Model and Table Types
Managed (Internal) Tables vs. External Tables
-- Create a managed (internal) table
CREATE TABLE IF NOT EXISTS students_managed (
student_id INT,
name STRING,
gender STRING,
subject STRING,
score INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
-- Create an external table
CREATE EXTERNAL TABLE IF NOT EXISTS students_external (
student_id INT,
name STRING,
gender STRING,
subject STRING,
score INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/hive/external/students';
-- View detailed table information
DESCRIBE FORMATTED students_managed;
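The practical difference between the two table types shows up on DROP: dropping the managed table deletes its data directory under the warehouse, while dropping the external table removes only the metadata. A sketch, assuming the default warehouse path and the LOCATION used above:
hive -e "DROP TABLE students_managed;"
hdfs dfs -ls /user/hive/warehouse/students_managed    # gone: data deleted together with the table
hive -e "DROP TABLE students_external;"
hdfs dfs -ls /user/hive/external/students             # still there: only the metadata was dropped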
Partitioned Tables in Practice
-- Create a partitioned table (partitioned by subject)
CREATE TABLE students_partitioned (
student_id INT,
name STRING,
gender STRING,
score INT
)
PARTITIONED BY (subject STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
-- Load data into partitions
LOAD DATA LOCAL INPATH '/home/hadoop/students_math.csv'
INTO TABLE students_partitioned
PARTITION (subject='math');
LOAD DATA LOCAL INPATH '/home/hadoop/students_english.csv'
INTO TABLE students_partitioned
PARTITION (subject='english');
-- List partitions
SHOW PARTITIONS students_partitioned;
-- Query a partition (partition pruning)
SELECT * FROM students_partitioned
WHERE subject = 'math' AND score > 90;
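Partitions can also be filled dynamically from another table instead of one LOAD per partition; a sketch, assuming the students_managed table created earlier (wrapped in hive -e so it can be run from the shell like the other commands):
hive -e "
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE students_partitioned PARTITION (subject)
SELECT student_id, name, gender, score, subject FROM students_managed;
"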
Bucketed Table Optimization
-- Create a bucketed table
CREATE TABLE students_bucketed (
student_id INT,
name STRING,
gender STRING,
subject STRING,
score INT
)
CLUSTERED BY (student_id) INTO 4 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
-- Enforce bucketing on insert (needed on Hive 1.x; from Hive 2.0 bucketing is always enforced)
SET hive.enforce.bucketing = true;
-- Insert data from another table into the bucketed table
INSERT OVERWRITE TABLE students_bucketed
SELECT student_id, name, gender, subject, score
FROM students_managed;
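To confirm the bucketing took effect, count the files under the table directory and sample a single bucket; a sketch assuming the default warehouse path:
# Each bucket is written as one file; expect 4 part files here
hdfs dfs -ls /user/hive/warehouse/students_bucketed
# Bucket sampling reads roughly 1/4 of the data
hive -e "SELECT * FROM students_bucketed TABLESAMPLE(BUCKET 1 OUT OF 4 ON student_id);"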
1.3 Advanced HQL Queries in Practice
Data Preparation
-- Create the student scores table
CREATE TABLE student_scores (
student_id INT,
name STRING,
class STRING,
math_score INT,
english_score INT,
science_score INT,
exam_date STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
-- Insert sample data
INSERT INTO student_scores VALUES
(1, '张三', 'ClassA', 85, 92, 78, '2023-09-01'),
(2, '李四', 'ClassA', 92, 88, 95, '2023-09-01'),
(3, '王五', 'ClassB', 78, 85, 82, '2023-09-01'),
(4, '赵六', 'ClassB', 88, 79, 91, '2023-09-01'),
(5, '钱七', 'ClassA', 95, 96, 89, '2023-09-01');
Complex Query Examples
-- 1. Window function analysis
SELECT
name,
class,
math_score,
english_score,
science_score,
-- Total and average score
(math_score + english_score + science_score) as total_score,
ROUND((math_score + english_score + science_score) / 3.0, 2) as avg_score,
-- Rank within the class
RANK() OVER (PARTITION BY class ORDER BY (math_score + english_score + science_score) DESC) as class_rank,
-- School-wide rank
RANK() OVER (ORDER BY (math_score + english_score + science_score) DESC) as school_rank,
-- Moving average
AVG(math_score) OVER (PARTITION BY class ORDER BY student_id ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as math_moving_avg
FROM student_scores;
-- 2. Multi-dimensional aggregation
SELECT
class,
COUNT(*) as student_count,
ROUND(AVG(math_score), 2) as avg_math,
ROUND(AVG(english_score), 2) as avg_english,
ROUND(AVG(science_score), 2) as avg_science,
MAX(math_score) as max_math,
MIN(math_score) as min_math,
-- Excellence rate (scores of 90 and above)
ROUND(SUM(CASE WHEN math_score >= 90 THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as math_excellent_rate,
-- Pass rate (scores of 60 and above)
ROUND(SUM(CASE WHEN math_score >= 60 THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as math_pass_rate
FROM student_scores
GROUP BY class;
-- 3. Conditional queries and CASE expressions
SELECT
name,
class,
math_score,
CASE
WHEN math_score >= 90 THEN 'Excellent'
WHEN math_score >= 80 THEN 'Good'
WHEN math_score >= 70 THEN 'Average'
WHEN math_score >= 60 THEN 'Pass'
ELSE 'Fail'
END as math_grade,
CASE
WHEN math_score >= 90 AND english_score >= 90 AND science_score >= 90 THEN 'Straight-A student'
WHEN math_score < 60 OR english_score < 60 OR science_score < 60 THEN 'Needs tutoring'
ELSE 'Regular student'
END as student_level
FROM student_scores;
Advanced Join Operations
-- Create supplementary tables
CREATE TABLE student_info (
student_id INT,
birth_date STRING,
address STRING,
parent_phone STRING
);
CREATE TABLE class_info (
class STRING,
head_teacher STRING,
class_size INT
);
-- Multi-table join query
SELECT
s.name,
s.class,
s.math_score,
i.birth_date,
c.head_teacher,
-- Compute age from birth_date (simplified; YEAR works directly on a 'yyyy-MM-dd' string)
YEAR(CURRENT_DATE()) - YEAR(i.birth_date) as age
FROM student_scores s
JOIN student_info i ON s.student_id = i.student_id
JOIN class_info c ON s.class = c.class
WHERE s.math_score > 80;
1.4 Hive Performance Optimization
Optimization Parameters
-- 1. Enable vectorized query execution
SET hive.vectorized.execution.enabled = true;
SET hive.vectorized.execution.reduce.enabled = true;
-- 2. Use the Tez execution engine
SET hive.execution.engine=tez;
-- 3. Enable the cost-based optimizer (CBO)
SET hive.cbo.enable=true;
SET hive.compute.query.using.stats=true;
SET hive.stats.fetch.column.stats=true;
SET hive.stats.fetch.partition.stats=true;
-- 4. Parallel execution of independent stages
SET hive.exec.parallel=true;
SET hive.exec.parallel.thread.number=8;
-- 5. Merge small files
SET hive.merge.mapfiles=true;
SET hive.merge.mapredfiles=true;
SET hive.merge.size.per.task=256000000;
SET hive.merge.smallfiles.avgsize=16000000;
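The CBO and stats-related settings above only help when statistics actually exist, so tables should be analyzed after loading; a sketch for the sample table:
hive -e "
ANALYZE TABLE student_scores COMPUTE STATISTICS;
ANALYZE TABLE student_scores COMPUTE STATISTICS FOR COLUMNS;
"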
Execution Plan Analysis
-- View the query execution plan
EXPLAIN
SELECT class, AVG(math_score) as avg_math
FROM student_scores
GROUP BY class;
-- View the detailed execution plan
EXPLAIN EXTENDED
SELECT class, AVG(math_score) as avg_math
FROM student_scores
GROUP BY class;
-- View input dependencies
EXPLAIN DEPENDENCY
SELECT class, AVG(math_score) as avg_math
FROM student_scores
GROUP BY class;
2. Sqoop Data Import and Export in Practice
2.1 Sqoop Architecture and Installation
Sqoop Architecture
RDBMS (MySQL/Oracle) ↔ Sqoop Client ↔ Hadoop (HDFS/Hive/HBase)
Sqoop Installation and Configuration
# 1. Download and install Sqoop
wget https://archive.apache.org/dist/sqoop/1.4.7/sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz
tar -xzf sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz -C /usr/local/
cd /usr/local && ln -s sqoop-1.4.7.bin__hadoop-2.6.0 sqoop
# 2. Configure environment variables
echo 'export SQOOP_HOME=/usr/local/sqoop' >> /etc/profile
echo 'export PATH=$SQOOP_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
# 3. Install the MySQL JDBC connector
cp mysql-connector-java-8.0.28.jar $SQOOP_HOME/lib/
# 4. Configure sqoop-site.xml (optional)
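Before running any imports, it is worth confirming that Sqoop can reach both Hadoop and MySQL; a sketch, assuming the root/hadoop123 credentials used throughout this section:
sqoop version
# List MySQL databases through the JDBC driver to verify connectivity
sqoop list-databases \
--connect jdbc:mysql://localhost:3306 \
--username root \
--password hadoop123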
2.2 Importing Data from MySQL to HDFS
Prepare MySQL Test Data
-- Create the test database and tables
CREATE DATABASE school;
USE school;
-- Students table
CREATE TABLE students (
id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(50),
gender ENUM('Male', 'Female'),
age INT,
class VARCHAR(20),
created_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Scores table
CREATE TABLE scores (
id INT PRIMARY KEY AUTO_INCREMENT,
student_id INT,
subject VARCHAR(20),
score INT,
exam_date DATE,
FOREIGN KEY (student_id) REFERENCES students(id)
);
-- Insert test data
INSERT INTO students (name, gender, age, class) VALUES
('张三', 'Male', 18, 'ClassA'),
('李四', 'Male', 17, 'ClassA'),
('王芳', 'Female', 18, 'ClassB'),
('赵敏', 'Female', 17, 'ClassB');
INSERT INTO scores (student_id, subject, score, exam_date) VALUES
(1, 'Math', 85, '2023-09-01'),
(1, 'English', 92, '2023-09-01'),
(2, 'Math', 78, '2023-09-01'),
(2, 'English', 88, '2023-09-01'),
(3, 'Math', 92, '2023-09-01'),
(3, 'English', 95, '2023-09-01'),
(4, 'Math', 88, '2023-09-01'),
(4, 'English', 79, '2023-09-01');
Basic Import Operations
# 1. Import the full table into HDFS
sqoop import \
--connect jdbc:mysql://localhost:3306/school \
--username root \
--password hadoop123 \
--table students \
--target-dir /user/hadoop/students_import \
--m 1
# 2. Import specific columns
sqoop import \
--connect jdbc:mysql://localhost:3306/school \
--username root \
--password hadoop123 \
--table students \
--columns "id,name,class" \
--target-dir /user/hadoop/students_basic \
--m 1
# 3. Import with a free-form query
sqoop import \
--connect jdbc:mysql://localhost:3306/school \
--username root \
--password hadoop123 \
--query 'SELECT s.name, s.class, sc.subject, sc.score FROM students s JOIN scores sc ON s.id = sc.student_id WHERE $CONDITIONS' \
--target-dir /user/hadoop/student_scores \
--split-by s.id \
--m 2
# 4. Incremental import (timestamp-based)
sqoop import \
--connect jdbc:mysql://localhost:3306/school \
--username root \
--password hadoop123 \
--table students \
--target-dir /user/hadoop/students_incremental \
--check-column created_time \
--incremental lastmodified \
--last-value "2023-01-01 00:00:00" \
--m 1
# 5. Incremental import (based on an auto-increment ID)
sqoop import \
--connect jdbc:mysql://localhost:3306/school \
--username root \
--password hadoop123 \
--table students \
--target-dir /user/hadoop/students_append \
--check-column id \
--incremental append \
--last-value 2 \
--m 1
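An import produces plain delimited files in the target directory (comma-separated by default), so the result of the full-table import can be checked directly:
hdfs dfs -ls /user/hadoop/students_import
# With a single mapper there is one output file
hdfs dfs -cat /user/hadoop/students_import/part-m-00000 | head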
Importing into Hive
# 1. Import directly into Hive
sqoop import \
--connect jdbc:mysql://localhost:3306/school \
--username root \
--password hadoop123 \
--table students \
--hive-import \
--hive-table students_hive \
--create-hive-table \
--m 1
# 2. Import into a Hive partitioned table
sqoop import \
--connect jdbc:mysql://localhost:3306/school \
--username root \
--password hadoop123 \
--query 'SELECT id, name, gender, age, class FROM students WHERE $CONDITIONS' \
--split-by id \
--hive-import \
--hive-table students_partitioned \
--hive-partition-key class \
--hive-partition-value "ClassA" \
--m 2
# 3. Import via HCatalog (recommended)
sqoop import \
--connect jdbc:mysql://localhost:3306/school \
--username root \
--password hadoop123 \
--table scores \
--hcatalog-database default \
--hcatalog-table scores_hive \
--m 2
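Note that the HCatalog variant expects the scores_hive table to already exist unless --create-hcatalog-table is added. Whichever route is used, the result can be verified from Hive:
hive -e "SHOW TABLES; SELECT * FROM students_hive LIMIT 5;"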
2.3 Exporting Data from HDFS to MySQL
Export Operations
# 1. Basic export
sqoop export \
--connect jdbc:mysql://localhost:3306/school \
--username root \
--password hadoop123 \
--table student_export \
--export-dir /user/hadoop/students_import \
--input-fields-terminated-by ',' \
--m 1
# 2. Update export (update existing records)
sqoop export \
--connect jdbc:mysql://localhost:3306/school \
--username root \
--password hadoop123 \
--table students \
--export-dir /user/hadoop/students_update \
--update-key id \
--update-mode allowinsert \
--input-fields-terminated-by ',' \
--m 1
# 3. Batched export
sqoop export \
--connect jdbc:mysql://localhost:3306/school \
--username root \
--password hadoop123 \
--table student_batch \
--export-dir /user/hadoop/students_batch \
--batch \
--input-fields-terminated-by ',' \
--m 2
Preparing Data for Export
# Stage the export data in HDFS
hdfs dfs -mkdir -p /user/hadoop/export_data
echo "5,孙武,Male,19,ClassC" | hdfs dfs -put - /user/hadoop/export_data/part-m-00000
echo "6,周瑜,Male,18,ClassC" | hdfs dfs -put - /user/hadoop/export_data/part-m-00001
# Create the target table in MySQL
mysql -u root -p school -e "
CREATE TABLE student_export (
id INT,
name VARCHAR(50),
gender VARCHAR(10),
age INT,
class VARCHAR(20)
);
"
2.4 Advanced Sqoop Features
Job Management
# 1. Create a Sqoop job (for recurring incremental imports)
sqoop job \
--create daily_student_import \
-- import \
--connect jdbc:mysql://localhost:3306/school \
--username root \
--password-file /user/hadoop/.password \
--table students \
--target-dir /user/hadoop/students_daily \
--incremental append \
--check-column id \
--last-value 0
# 2. Run the job
sqoop job --exec daily_student_import
# 3. List and inspect jobs
sqoop job --list
sqoop job --show daily_student_import
# 4. Delete the job
sqoop job --delete daily_student_import
Secure Password Management
# 1. Use a password file
echo -n "hadoop123" > /user/hadoop/.password
hdfs dfs -put /user/hadoop/.password /user/hadoop/
hdfs dfs -chmod 400 /user/hadoop/.password
# Use the password file in a Sqoop command
sqoop import \
--connect jdbc:mysql://localhost:3306/school \
--username root \
--password-file /user/hadoop/.password \
--table students \
--target-dir /user/hadoop/students_secure
# 2. Use a password alias (requires the Hadoop Credential Provider)
hadoop credential create mysql.password.alias -provider jceks://hdfs/user/hadoop/password.jceks -value hadoop123
sqoop import \
--connect jdbc:mysql://localhost:3306/school \
--username root \
--password-alias mysql.password.alias \
--table students \
--target-dir /user/hadoop/students_alias
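Depending on how the cluster is configured, Sqoop may also need to be told where the credential store lives; one option (a sketch, not part of the original setup) is to pass the provider path as a generic Hadoop -D option before the tool arguments:
sqoop import \
-Dhadoop.security.credential.provider.path=jceks://hdfs/user/hadoop/password.jceks \
--connect jdbc:mysql://localhost:3306/school \
--username root \
--password-alias mysql.password.alias \
--table students \
--target-dir /user/hadoop/students_alias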
3. Flume Log Collection Configuration
3.1 Flume Architecture and Installation
Flume Architecture Components
Source → Channel → Sink
↓ ↓ ↓
Data source   Temporary buffer   Data destination
Flume Installation and Configuration
# 1. Download and install Flume
wget https://archive.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
tar -xzf apache-flume-1.9.0-bin.tar.gz -C /usr/local/
cd /usr/local && ln -s apache-flume-1.9.0-bin flume
# 2. Configure environment variables
echo 'export FLUME_HOME=/usr/local/flume' >> /etc/profile
echo 'export PATH=$FLUME_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
# 3. Configure JAVA_HOME (the distribution ships only a template for flume-env.sh)
cp $FLUME_HOME/conf/flume-env.sh.template $FLUME_HOME/conf/flume-env.sh
echo 'export JAVA_HOME=/usr/local/jdk1.8.0_371' >> $FLUME_HOME/conf/flume-env.sh
3.2 Basic Data Collection Configuration
Tailing a Log File into HDFS
# Create the Flume configuration file: file_to_hdfs.conf
cat > $FLUME_HOME/conf/file_to_hdfs.conf << 'EOF'
# Define the agent's components
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1
# Configure the source (tail a log file)
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /var/log/application.log
agent1.sources.source1.batchSize = 100
agent1.sources.source1.channels = channel1
# Configure the channel (file channel)
agent1.channels.channel1.type = file
agent1.channels.channel1.checkpointDir = /data/flume/checkpoint
agent1.channels.channel1.dataDirs = /data/flume/data
agent1.channels.channel1.capacity = 100000
agent1.channels.channel1.transactionCapacity = 1000
# Configure the sink (HDFS sink)
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://hadoop-master:9000/user/flume/logs/%Y-%m-%d
agent1.sinks.sink1.hdfs.filePrefix = application-
agent1.sinks.sink1.hdfs.fileSuffix = .log
agent1.sinks.sink1.hdfs.rollInterval = 3600
agent1.sinks.sink1.hdfs.rollSize = 134217728
agent1.sinks.sink1.hdfs.rollCount = 0
agent1.sinks.sink1.hdfs.batchSize = 1000
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
# The exec source adds no timestamp header, so use local time to resolve %Y-%m-%d in the path
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.sink1.channel = channel1
EOF
# Create the required directories
mkdir -p /data/flume/{checkpoint,data}
mkdir -p /var/log/
# Start the Flume agent
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/file_to_hdfs.conf \
--name agent1 \
-Dflume.root.logger=INFO,console
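To verify the agent is shipping data, append a line to the watched log and look for the dated directory in HDFS; a sketch (exact file names depend on the roll settings above, and in-progress files carry a .tmp suffix):
echo "flume smoke test $(date)" >> /var/log/application.log
hdfs dfs -ls /user/flume/logs/$(date +%Y-%m-%d)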
Network Data Collection Configuration
# Create the netcat collection config: netcat_to_logger.conf
cat > $FLUME_HOME/conf/netcat_to_logger.conf << 'EOF'
# Define the agent
agent2.sources = netcat-source
agent2.channels = memory-channel
agent2.sinks = logger-sink
# Configure the netcat source
agent2.sources.netcat-source.type = netcat
agent2.sources.netcat-source.bind = 0.0.0.0
agent2.sources.netcat-source.port = 44444
agent2.sources.netcat-source.channels = memory-channel
# Configure the memory channel
agent2.channels.memory-channel.type = memory
agent2.channels.memory-channel.capacity = 1000
agent2.channels.memory-channel.transactionCapacity = 100
# Configure the logger sink
agent2.sinks.logger-sink.type = logger
agent2.sinks.logger-sink.channel = memory-channel
EOF
# Start the netcat agent
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/netcat_to_logger.conf \
--name agent2 \
-Dflume.root.logger=INFO,console &
# Test the network collection
echo "Hello Flume" | nc localhost 44444
3.3 Advanced Data Collection Scenarios
Multiplexing Collection Configuration
# Create the multiplexing config: multiplexing.conf
cat > $FLUME_HOME/conf/multiplexing.conf << 'EOF'
# Agent definition
agent3.sources = http-source
agent3.channels = mem-channel-1 mem-channel-2
agent3.sinks = hdfs-sink kafka-sink
# HTTP source configuration
agent3.sources.http-source.type = http
agent3.sources.http-source.port = 8000
agent3.sources.http-source.handler = org.apache.flume.source.http.JSONHandler
agent3.sources.http-source.channels = mem-channel-1 mem-channel-2
# Multiplexing selector configuration
agent3.sources.http-source.selector.type = multiplexing
agent3.sources.http-source.selector.header = type
agent3.sources.http-source.selector.mapping.log = mem-channel-1
agent3.sources.http-source.selector.mapping.metric = mem-channel-2
agent3.sources.http-source.selector.default = mem-channel-1
# Memory channel 1 (log data)
agent3.channels.mem-channel-1.type = memory
agent3.channels.mem-channel-1.capacity = 10000
agent3.channels.mem-channel-1.transactionCapacity = 1000
# Memory channel 2 (metric data)
agent3.channels.mem-channel-2.type = memory
agent3.channels.mem-channel-2.capacity = 5000
agent3.channels.mem-channel-2.transactionCapacity = 500
# HDFS sink (stores log data)
agent3.sinks.hdfs-sink.type = hdfs
agent3.sinks.hdfs-sink.hdfs.path = hdfs://hadoop-master:9000/user/flume/logs/%Y-%m-%d
agent3.sinks.hdfs-sink.hdfs.filePrefix = http-log
agent3.sinks.hdfs-sink.hdfs.rollInterval = 0
agent3.sinks.hdfs-sink.hdfs.rollSize = 268435456
agent3.sinks.hdfs-sink.hdfs.rollCount = 0
agent3.sinks.hdfs-sink.hdfs.batchSize = 1000
# Resolve %Y-%m-%d from local time in case events arrive without a timestamp header
agent3.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
agent3.sinks.hdfs-sink.channel = mem-channel-1
# Kafka sink (real-time metric data)
agent3.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
agent3.sinks.kafka-sink.kafka.topic = metrics
agent3.sinks.kafka-sink.kafka.bootstrap.servers = kafka-server:9092
agent3.sinks.kafka-sink.kafka.flumeBatchSize = 100
agent3.sinks.kafka-sink.kafka.producer.acks = 1
agent3.sinks.kafka-sink.channel = mem-channel-2
EOF
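The JSONHandler expects a JSON array of events, each with a headers map and a body, and the type header decides which channel an event is routed to; a test sketch once the agent is running:
# "type":"metric" is routed to mem-channel-2 and on to Kafka
curl -X POST -H 'Content-Type: application/json' \
-d '[{"headers":{"type":"metric"},"body":"cpu_load=0.73"}]' \
http://localhost:8000
# "type":"log" is routed to mem-channel-1 and on to HDFS
curl -X POST -H 'Content-Type: application/json' \
-d '[{"headers":{"type":"log"},"body":"GET /index.html 200"}]' \
http://localhost:8000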
Load Balancing and High Availability Configuration
# Create the load-balancing config: load_balancer.conf
cat > $FLUME_HOME/conf/load_balancer.conf << 'EOF'
agent4.sources = exec-source
agent4.channels = file-channel
agent4.sinks = hdfs-sink1 hdfs-sink2 hdfs-sink3
# Exec Source
agent4.sources.exec-source.type = exec
agent4.sources.exec-source.command = tail -F /var/log/load_balanced.log
agent4.sources.exec-source.channels = file-channel
# File Channel
agent4.channels.file-channel.type = file
agent4.channels.file-channel.checkpointDir = /data/flume/lb_checkpoint
agent4.channels.file-channel.dataDirs = /data/flume/lb_data
# HDFS Sinks
agent4.sinks.hdfs-sink1.type = hdfs
agent4.sinks.hdfs-sink1.hdfs.path = hdfs://hadoop-master:9000/user/flume/lb/logs1
agent4.sinks.hdfs-sink1.hdfs.filePrefix = events-
agent4.sinks.hdfs-sink1.channel = file-channel
agent4.sinks.hdfs-sink2.type = hdfs
agent4.sinks.hdfs-sink2.hdfs.path = hdfs://hadoop-master:9000/user/flume/lb/logs2
agent4.sinks.hdfs-sink2.hdfs.filePrefix = events-
agent4.sinks.hdfs-sink2.channel = file-channel
agent4.sinks.hdfs-sink3.type = hdfs
agent4.sinks.hdfs-sink3.hdfs.path = hdfs://hadoop-master:9000/user/flume/lb/logs3
agent4.sinks.hdfs-sink3.hdfs.filePrefix = events-
agent4.sinks.hdfs-sink3.channel = file-channel
# Load-balancing sink group configuration
agent4.sinkgroups = sg1
agent4.sinkgroups.sg1.sinks = hdfs-sink1 hdfs-sink2 hdfs-sink3
agent4.sinkgroups.sg1.processor.type = load_balance
agent4.sinkgroups.sg1.processor.selector = round_robin
agent4.sinkgroups.sg1.processor.backoff = true
EOF
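When availability matters more than throughput, the same sink group can use a failover processor instead, which always routes to the highest-priority healthy sink; a hedged sketch of the alternative settings (the snippet file name is illustrative):
cat > $FLUME_HOME/conf/failover_snippet.conf << 'EOF'
# Alternative to load_balance: failover between the three HDFS sinks
agent4.sinkgroups.sg1.processor.type = failover
agent4.sinkgroups.sg1.processor.priority.hdfs-sink1 = 10
agent4.sinkgroups.sg1.processor.priority.hdfs-sink2 = 5
agent4.sinkgroups.sg1.processor.priority.hdfs-sink3 = 1
agent4.sinkgroups.sg1.processor.maxpenalty = 10000
EOF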
3.4 Interceptors and Data Transformation
Custom Interceptor Implementation
// A custom interceptor that adds a timestamp header to each event
package com.hadoop.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.List;
import java.util.ArrayList;
public class TimestampInterceptor implements Interceptor {
@Override
public void initialize() {}
@Override
public Event intercept(Event event) {
// Add a timestamp header to the event
event.getHeaders().put("timestamp", String.valueOf(System.currentTimeMillis()));
return event;
}
@Override
public List<Event> intercept(List<Event> events) {
List<Event> intercepted = new ArrayList<>();
for (Event event : events) {
Event interceptedEvent = intercept(event);
intercepted.add(interceptedEvent);
}
return intercepted;
}
@Override
public void close() {}
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new TimestampInterceptor();
}
@Override
public void configure(Context context) {}
}
}
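To use this interceptor, compile it against the Flume libraries and drop the jar into $FLUME_HOME/lib so the i3 configuration below can load it; a build sketch (directory layout and jar name are illustrative):
mkdir -p build
# flume-ng-core on the lib classpath provides Event, Context and Interceptor
javac -cp "$FLUME_HOME/lib/*" -d build TimestampInterceptor.java
jar cf flume-custom-interceptor.jar -C build .
cp flume-custom-interceptor.jar $FLUME_HOME/lib/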
Flume Configuration Using Interceptors
cat > $FLUME_HOME/conf/interceptor.conf << 'EOF'
agent5.sources = exec-source
agent5.channels = mem-channel
agent5.sinks = hdfs-sink
# Source with Interceptors
agent5.sources.exec-source.type = exec
agent5.sources.exec-source.command = tail -F /var/log/app.log
agent5.sources.exec-source.interceptors = i1 i2 i3
agent5.sources.exec-source.interceptors.i1.type = timestamp
agent5.sources.exec-source.interceptors.i2.type = host
agent5.sources.exec-source.interceptors.i2.useIP = false
agent5.sources.exec-source.interceptors.i2.hostHeader = hostname
agent5.sources.exec-source.interceptors.i3.type = com.hadoop.flume.interceptor.TimestampInterceptor$Builder
agent5.sources.exec-source.channels = mem-channel
# Channel
agent5.channels.mem-channel.type = memory
agent5.channels.mem-channel.capacity = 10000
# HDFS Sink with Headers
agent5.sinks.hdfs-sink.type = hdfs
agent5.sinks.hdfs-sink.hdfs.path = hdfs://hadoop-master:9000/user/flume/intercepted/%Y-%m-%d
agent5.sinks.hdfs-sink.hdfs.filePrefix = events-%{hostname}
agent5.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
agent5.sinks.hdfs-sink.channel = mem-channel
EOF
4. Data Pipeline Design in Practice
4.1 Complete Data Pipeline Architecture
E-commerce Log Analytics Pipeline Design
Web Servers → Flume → Kafka → Spark Streaming → HDFS → Hive → Presto
↓ ↓ ↓ ↓ ↓ ↓
Nginx logs   Real-time collection   Message buffering   Stream processing   Cold storage   Ad-hoc queries
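The nginx-logs topic has to exist before Flume starts producing into it; a sketch assuming a recent Kafka release (older clusters use --zookeeper instead of --bootstrap-server):
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic nginx-logs \
--partitions 3 \
--replication-factor 2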
4.2 End-to-End Data Pipeline Implementation
1. Flume-to-Kafka Configuration
cat > $FLUME_HOME/conf/flume_to_kafka.conf << 'EOF'
agent6.sources = exec-source
agent6.channels = mem-channel
agent6.sinks = kafka-sink
# Source configuration
agent6.sources.exec-source.type = exec
agent6.sources.exec-source.command = tail -F /var/log/nginx/access.log
agent6.sources.exec-source.interceptors = i1
agent6.sources.exec-source.interceptors.i1.type = timestamp
agent6.sources.exec-source.channels = mem-channel
# Channel configuration
agent6.channels.mem-channel.type = memory
agent6.channels.mem-channel.capacity = 10000
agent6.channels.mem-channel.transactionCapacity = 1000
# Kafka sink configuration
agent6.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
agent6.sinks.kafka-sink.kafka.topic = nginx-logs
agent6.sinks.kafka-sink.kafka.bootstrap.servers = kafka1:9092,kafka2:9092,kafka3:9092
agent6.sinks.kafka-sink.kafka.flumeBatchSize = 100
agent6.sinks.kafka-sink.kafka.producer.acks = 1
agent6.sinks.kafka-sink.kafka.producer.compression.type = snappy
agent6.sinks.kafka-sink.channel = mem-channel
EOF
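Once nginx traffic is flowing, the first hop of the pipeline can be checked by consuming a few records straight from the topic; a sketch using the stock console consumer:
kafka-console-consumer.sh \
--bootstrap-server kafka1:9092 \
--topic nginx-logs \
--from-beginning \
--max-messages 5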
2. Hive Table Design
-- Create the ODS-layer nginx log table
CREATE TABLE ods_nginx_logs (
log_time TIMESTAMP,
client_ip STRING,
request_method STRING,
request_url STRING,
status_code INT,
response_size INT,
referer STRING,
user_agent STRING,
processing_time DOUBLE,
server_name STRING
)
PARTITIONED BY (dt STRING, hour STRING)
STORED AS PARQUET
TBLPROPERTIES ('parquet.compression'='SNAPPY');
-- Create the DWD-layer user action table
CREATE TABLE dwd_user_actions (
user_id STRING,
session_id STRING,
action_time TIMESTAMP,
page_url STRING,
action_type STRING,
product_id STRING,
category_id STRING,
stay_duration INT
)
PARTITIONED BY (dt STRING)
STORED AS PARQUET;
-- Create the DWS-layer user profile table
CREATE TABLE dws_user_profiles (
user_id STRING,
total_visits INT,
total_orders INT,
favorite_categories ARRAY<STRING>,
avg_order_value DOUBLE,
last_visit_date STRING,
user_segment STRING
)
STORED AS PARQUET;
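Because ods_nginx_logs is partitioned by dt and hour, partitions written to HDFS by the downstream job must be registered in the metastore before they are queryable; a sketch:
hive -e "
MSCK REPAIR TABLE ods_nginx_logs;
SHOW PARTITIONS ods_nginx_logs;
"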
3. Data Quality Check Script
#!/bin/bash
# data_quality_check.sh
echo "=== Data Quality Check Report ==="
echo "Check time: $(date)"
# HDFS data check
echo -e "\n1. HDFS data check"
hdfs dfs