Production Hadoop Cluster Operations and Performance Optimization
1. Hardware Planning and Capacity Management
1.1 Production Hardware Planning
Cluster sizing formula
#!/bin/bash
# cluster_capacity_planner.sh
# Input parameters
EXPECTED_DATA_SIZE_TB=1000   # Expected data volume (TB)
AVG_FILE_SIZE_MB=256         # Average file size (MB)
HDFS_BLOCK_SIZE_MB=128       # HDFS block size (MB)
REPLICATION_FACTOR=3         # Replication factor
ANNUAL_GROWTH_RATE=0.05      # Annual growth rate
RETENTION_YEARS=3            # Data retention period (years)
# Print the inputs
echo "=== Hadoop Cluster Capacity Planning ==="
echo "Expected data volume: $EXPECTED_DATA_SIZE_TB TB"
echo "Average file size: $AVG_FILE_SIZE_MB MB"
echo "Replication factor: $REPLICATION_FACTOR"
echo "Annual growth rate: $(echo "$ANNUAL_GROWTH_RATE * 100" | bc)%"
echo "Retention period: $RETENTION_YEARS years"
# Raw storage requirement (logical data x replication factor)
RAW_STORAGE_TB=$(echo "scale=2; $EXPECTED_DATA_SIZE_TB * $REPLICATION_FACTOR" | bc)
echo "Raw storage requirement: $RAW_STORAGE_TB TB"
# Requirement after compounded annual growth over the retention period
FUTURE_STORAGE_TB=$(echo "scale=4; $RAW_STORAGE_TB * (1 + $ANNUAL_GROWTH_RATE)^$RETENTION_YEARS" | bc)
echo "Storage requirement after $RETENTION_YEARS years: $FUTURE_STORAGE_TB TB"
# DataNode count (assuming 60 TB usable per node; +1 rounds the truncated division up)
NODE_CAPACITY_TB=60
DATA_NODES=$(echo "scale=0; ($FUTURE_STORAGE_TB / $NODE_CAPACITY_TB) + 1" | bc)
echo "Recommended DataNode count: $DATA_NODES"
# NameNode heap estimate (~1 GB per million blocks plus a 16 GB base)
TOTAL_BLOCKS=$(echo "scale=0; $EXPECTED_DATA_SIZE_TB * 1024 * 1024 / $HDFS_BLOCK_SIZE_MB" | bc)
NN_MEMORY_GB=$(echo "scale=0; $TOTAL_BLOCKS / 1000000 + 16" | bc)
echo "NameNode memory requirement: $NN_MEMORY_GB GB"
echo "Planning complete!"
Recommended node hardware configurations
| Node type | CPU | Memory | Storage | Network | Count |
|---|---|---|---|---|---|
| NameNode | 16 cores | 64 GB | 2×1 TB SSD, RAID1 | 10 Gbps | 2 (HA) |
| DataNode | 32 cores | 128 GB | 12×8 TB HDD, JBOD | 10 Gbps | as needed |
| ResourceManager | 16 cores | 32 GB | 2×500 GB SSD | 10 Gbps | 2 (HA) |
1.2 Storage Planning and Disk Configuration
Multi-disk configuration
#!/bin/bash
# disk_configuration.sh
# DataNode data disks (one mount point per physical disk)
DISK_PATHS=(
"/data01/hadoop"
"/data02/hadoop"
"/data03/hadoop"
"/data04/hadoop"
"/data05/hadoop"
"/data06/hadoop"
"/data07/hadoop"
"/data08/hadoop"
"/data09/hadoop"
"/data10/hadoop"
"/data11/hadoop"
"/data12/hadoop"
)
# Create the data directories
for path in "${DISK_PATHS[@]}"; do
mkdir -p "$path/datanode"
chown -R hdfs:hadoop "$path"
chmod 755 "$path"
done
# Build the comma-separated dfs.datanode.data.dir value
# (join with commas; a trailing comma would break the property)
DATA_DIRS=$(printf "file://%s/datanode," "${DISK_PATHS[@]}")
DATA_DIRS=${DATA_DIRS%,}
echo "Multi-disk DataNode configuration for hdfs-site.xml:"
echo "<property>"
echo " <name>dfs.datanode.data.dir</name>"
echo " <value>$DATA_DIRS</value>"
echo "</property>"
Storage tiering strategy
<!-- hdfs-site.xml storage policy configuration -->
<configuration>
<!-- Enable storage policies (already the default on current releases) -->
<property>
<name>dfs.storage.policy.enabled</name>
<value>true</value>
</property>
<!-- Tag each volume with its storage type. A property may only appear once
per file, so the SSD, DISK and ARCHIVE volumes all share one value. -->
<property>
<name>dfs.datanode.data.dir</name>
<value>[SSD]file:///ssd_data/datanode,[DISK]file:///hdd_data/datanode,[ARCHIVE]file:///archive_data/datanode</value>
</property>
</configuration>
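Tagging volumes only tells each DataNode what media it has; directories still need a policy applied before placement changes. A sketch of the usual workflow with the hdfs storagepolicies tool (paths and policy choices are placeholders):
# List the built-in policies (HOT, WARM, COLD, ONE_SSD, ALL_SSD, ...)
hdfs storagepolicies -listPolicies
# Keep the active dataset SSD-backed, push old data to archive media
hdfs storagepolicies -setStoragePolicy -path /data/warehouse/current -policy ONE_SSD
hdfs storagepolicies -setStoragePolicy -path /data/warehouse/history -policy COLD
hdfs storagepolicies -getStoragePolicy -path /data/warehouse/history
# Migrate existing blocks so they match the new policies
hdfs mover -p /data/warehouse/history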
2. Performance Monitoring and Metrics Analysis
2.1 Key Performance Indicators (KPIs)
HDFS key metrics
#!/bin/bash
# hdfs_kpi_monitor.sh
echo "=== HDFS Key Performance Indicators ==="
echo "Collected at: $(date)"
# Run the expensive commands once and parse the cached output;
# a full fsck can take a while on a large namespace
REPORT=$(hdfs dfsadmin -report 2>/dev/null)
FSCK=$(hdfs fsck / 2>/dev/null)
# 1. Cluster capacity
echo -e "\n1. Cluster capacity:"
echo "$REPORT" | grep -E "Configured Capacity|Present Capacity|DFS Used|Non DFS Used"
# 2. DataNode status
echo -e "\n2. DataNode status:"
LIVE_NODES=$(echo "$REPORT" | grep "Live datanodes" | grep -o '[0-9]\+' | head -1)
TOTAL_CAPACITY=$(echo "$REPORT" | grep "Configured Capacity" | head -1 | awk '{print $3, $4}')
USED_CAPACITY=$(echo "$REPORT" | grep "DFS Used:" | head -1 | awk '{print $3, $4}')
USED_PERCENT=$(echo "$REPORT" | grep "DFS Used%" | head -1 | awk '{print $3}')
echo "Live nodes: ${LIVE_NODES:-0}"
echo "Total capacity: $TOTAL_CAPACITY"
echo "Used capacity: $USED_CAPACITY"
echo "Usage: $USED_PERCENT"
# 3. Block status
echo -e "\n3. Block status:"
TOTAL_BLOCKS=$(echo "$FSCK" | grep "Total blocks" | grep -o '[0-9]\+' | head -1)
UNDER_REPLICATED=$(echo "$FSCK" | grep -i "under.replicated blocks" | grep -o '[0-9]\+' | head -1)
MISSING_BLOCKS=$(echo "$FSCK" | grep -i "missing blocks" | grep -o '[0-9]\+' | head -1)
CORRUPT_BLOCKS=$(echo "$FSCK" | grep -i "corrupt blocks" | grep -o '[0-9]\+' | head -1)
echo "Total blocks: ${TOTAL_BLOCKS:-0}"
echo "Under-replicated blocks: ${UNDER_REPLICATED:-0}"
echo "Missing blocks: ${MISSING_BLOCKS:-0}"
echo "Corrupt blocks: ${CORRUPT_BLOCKS:-0}"
# 4. Network topology
echo -e "\n4. Network topology:"
hdfs dfsadmin -printTopology 2>/dev/null | head -10
# 5. Health score
HEALTH_SCORE=100
if [ "${MISSING_BLOCKS:-0}" -gt 0 ]; then
HEALTH_SCORE=$((HEALTH_SCORE - 30))
fi
if [ "${UNDER_REPLICATED:-0}" -gt 100 ]; then
HEALTH_SCORE=$((HEALTH_SCORE - 20))
fi
if [ "${LIVE_NODES:-0}" -lt 3 ]; then
HEALTH_SCORE=$((HEALTH_SCORE - 10))
fi
echo -e "\nCluster health score: $HEALTH_SCORE/100"
# Alerting
if [ "$HEALTH_SCORE" -lt 80 ]; then
echo "WARNING: cluster health is degraded!"
fi
YARN key metrics
#!/usr/bin/env python3
# yarn_kpi_monitor.py
import requests
from datetime import datetime

class YARNKPIMonitor:
    def __init__(self, rm_host="hadoop-master", rm_port=8088):
        self.base_url = f"http://{rm_host}:{rm_port}/ws/v1/cluster"

    def get_cluster_metrics(self):
        """Fetch cluster-level metrics from the ResourceManager REST API."""
        response = requests.get(f"{self.base_url}/metrics", timeout=10)
        return response.json()['clusterMetrics']

    def get_scheduler_metrics(self):
        """Fetch scheduler metrics."""
        response = requests.get(f"{self.base_url}/scheduler", timeout=10)
        return response.json()['scheduler']['schedulerInfo']

    def calculate_kpis(self):
        """Compute the key performance indicators."""
        metrics = self.get_cluster_metrics()
        scheduler_info = self.get_scheduler_metrics()
        kpis = {
            'timestamp': datetime.now().isoformat(),
            'resource_utilization': {},
            'application_metrics': {},
            'queue_metrics': {},
            'health_score': 100
        }
        # Resource utilization (avoid division by zero on an empty cluster)
        total_memory = metrics['totalMB'] or 1
        used_memory = metrics['allocatedMB']
        total_vcores = metrics['totalVirtualCores'] or 1
        used_vcores = metrics['allocatedVirtualCores']
        kpis['resource_utilization']['memory_usage_percent'] = round((used_memory / total_memory) * 100, 2)
        kpis['resource_utilization']['vcore_usage_percent'] = round((used_vcores / total_vcores) * 100, 2)
        kpis['resource_utilization']['active_nodes'] = metrics['activeNodes']
        kpis['resource_utilization']['lost_nodes'] = metrics['lostNodes']
        # Application metrics
        kpis['application_metrics']['running_apps'] = metrics['appsRunning']
        kpis['application_metrics']['pending_apps'] = metrics['appsPending']
        kpis['application_metrics']['completed_apps'] = metrics['appsCompleted']
        kpis['application_metrics']['failed_apps'] = metrics['appsFailed']
        # Queue metrics
        if 'queues' in scheduler_info and 'queue' in scheduler_info['queues']:
            for queue in scheduler_info['queues']['queue']:
                queue_name = queue['queueName']
                kpis['queue_metrics'][queue_name] = {
                    'used_capacity': queue.get('usedCapacity', 0),
                    'absolute_used_capacity': queue.get('absoluteUsedCapacity', 0),
                    'num_applications': queue.get('numApplications', 0)
                }
        # Health score
        if kpis['resource_utilization']['lost_nodes'] > 0:
            kpis['health_score'] -= 20
        if kpis['application_metrics']['pending_apps'] > 10:
            kpis['health_score'] -= 15
        if kpis['resource_utilization']['memory_usage_percent'] > 90:
            kpis['health_score'] -= 25
        kpis['health_score'] = max(0, kpis['health_score'])
        return kpis

    def generate_report(self):
        """Print the KPI report."""
        kpis = self.calculate_kpis()
        print("=== YARN Key Performance Indicator Report ===")
        print(f"Report time: {kpis['timestamp']}")
        print(f"\n1. Resource utilization:")
        print(f"   Memory usage: {kpis['resource_utilization']['memory_usage_percent']}%")
        print(f"   vCore usage: {kpis['resource_utilization']['vcore_usage_percent']}%")
        print(f"   Active nodes: {kpis['resource_utilization']['active_nodes']}")
        print(f"   Lost nodes: {kpis['resource_utilization']['lost_nodes']}")
        print(f"\n2. Application status:")
        print(f"   Running: {kpis['application_metrics']['running_apps']}")
        print(f"   Pending: {kpis['application_metrics']['pending_apps']}")
        print(f"   Completed: {kpis['application_metrics']['completed_apps']}")
        print(f"   Failed: {kpis['application_metrics']['failed_apps']}")
        print(f"\n3. Queue status:")
        for queue_name, metrics in kpis['queue_metrics'].items():
            print(f"   {queue_name}: {metrics['used_capacity']}% used, {metrics['num_applications']} applications")
        print(f"\n4. Health score: {kpis['health_score']}/100")
        # Alerting
        if kpis['health_score'] < 80:
            print("\n⚠️ Warning: cluster health needs attention!")
        if kpis['health_score'] < 60:
            print("\n🚨 Critical: cluster health is poor, act immediately!")

if __name__ == "__main__":
    monitor = YARNKPIMonitor()
    monitor.generate_report()
2.2 Real-Time Monitoring Dashboards
Monitoring with Prometheus + Grafana
# prometheus.yml
# Note: the NameNode (/jmx) and ResourceManager (/ws/v1/cluster/metrics)
# endpoints return JSON, which Prometheus cannot scrape directly. Expose
# them through an exporter: jmx_exporter, or the custom exporter below.
global:
  scrape_interval: 15s
  evaluation_interval: 15s
scrape_configs:
  - job_name: 'hadoop'
    static_configs:
      - targets: ['hadoop-master:8080']   # custom exporter from the next section
  - job_name: 'node-exporter'
    static_configs:
      - targets: ['hadoop-master:9100', 'hadoop-slave1:9100', 'hadoop-slave2:9100']
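If you prefer scraping the daemons directly, Prometheus's jmx_exporter can be attached to each process as a Java agent instead of running a separate exporter. A sketch for hadoop-env.sh, assuming the agent jar and per-daemon config files live under /opt/jmx_exporter (paths and ports are placeholders; Hadoop 2.x uses the HADOOP_NAMENODE_OPTS-style variable names):
# hadoop-env.sh
export HDFS_NAMENODE_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7070:/opt/jmx_exporter/namenode.yaml $HDFS_NAMENODE_OPTS"
export HDFS_DATANODE_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7071:/opt/jmx_exporter/datanode.yaml $HDFS_DATANODE_OPTS"
Each daemon then serves Prometheus-format metrics on its agent port, and the scrape config above can point at those ports instead.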
Custom metrics exporter
#!/usr/bin/env python3
# hadoop_metrics_exporter.py
from http.server import HTTPServer, BaseHTTPRequestHandler
import re
import requests
import threading
import time

class HadoopMetricsCollector:
    def __init__(self):
        self.metrics = {}

    def collect_hdfs_metrics(self):
        """Collect HDFS metrics from the NameNode JMX endpoint."""
        try:
            response = requests.get("http://hadoop-master:9870/jmx", timeout=10)
            data = response.json()
            hdfs_metrics = {}
            for bean in data['beans']:
                if 'Hadoop:service=NameNode,name' in bean['name']:
                    for key, value in bean.items():
                        if key not in ['modelerType', 'name']:
                            # JMX keys may contain characters that are invalid
                            # in Prometheus metric names; normalize them
                            safe_key = re.sub(r'[^a-zA-Z0-9_]', '_', key)
                            hdfs_metrics[f"hdfs_{safe_key}"] = value
            self.metrics.update(hdfs_metrics)
        except Exception as e:
            print(f"HDFS metrics collection failed: {e}")

    def collect_yarn_metrics(self):
        """Collect YARN metrics from the ResourceManager REST API."""
        try:
            response = requests.get("http://hadoop-master:8088/ws/v1/cluster/metrics", timeout=10)
            data = response.json()['clusterMetrics']
            yarn_metrics = {
                'yarn_apps_pending': data['appsPending'],
                'yarn_apps_running': data['appsRunning'],
                'yarn_apps_completed': data['appsCompleted'],
                'yarn_containers_allocated': data['containersAllocated'],
                'yarn_memory_used_mb': data['allocatedMB'],
                'yarn_memory_total_mb': data['totalMB'],
                'yarn_vcores_used': data['allocatedVirtualCores'],
                'yarn_vcores_total': data['totalVirtualCores']
            }
            self.metrics.update(yarn_metrics)
        except Exception as e:
            print(f"YARN metrics collection failed: {e}")

    def collect_all_metrics(self):
        """Collection loop: refresh every 15 seconds."""
        while True:
            self.collect_hdfs_metrics()
            self.collect_yarn_metrics()
            time.sleep(15)

class MetricsHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == '/metrics':
            self.send_response(200)
            self.send_header('Content-Type', 'text/plain')
            self.end_headers()
            # Emit only numeric values in Prometheus exposition format
            # (bool is excluded because it is a subclass of int)
            metrics_text = ""
            for key, value in collector.metrics.items():
                if isinstance(value, (int, float)) and not isinstance(value, bool):
                    metrics_text += f"hadoop_{key} {value}\n"
            self.wfile.write(metrics_text.encode())
        else:
            self.send_response(404)
            self.end_headers()

if __name__ == "__main__":
    collector = HadoopMetricsCollector()
    # Background collection thread
    collector_thread = threading.Thread(target=collector.collect_all_metrics)
    collector_thread.daemon = True
    collector_thread.start()
    # HTTP server exposing /metrics
    server = HTTPServer(('0.0.0.0', 8080), MetricsHandler)
    print("Hadoop metrics exporter listening on port 8080")
    server.serve_forever()
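Once the exporter is running, the endpoint can be spot-checked before wiring it into Prometheus. The output below is illustrative, not from a real cluster:
curl -s http://hadoop-master:8080/metrics | head -3
# hadoop_yarn_apps_running 3
# hadoop_yarn_memory_used_mb 24576
# hadoop_yarn_memory_total_mb 98304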
3. Fault Diagnosis and Troubleshooting
3.1 Common Troubleshooting Runbooks
HDFS troubleshooting
#!/bin/bash
# hdfs_troubleshooting.sh
echo "=== HDFS Troubleshooting Tool ==="
# 1. Service status
echo -e "\n1. Service status:"
jps | grep -E "NameNode|DataNode|JournalNode"
# 2. Network connectivity
echo -e "\n2. Network connectivity:"
ping -c 3 hadoop-master
ping -c 3 hadoop-slave1
# 3. Disk space
echo -e "\n3. Disk space:"
df -h | grep -E "/data|/ssd"
# 4. HDFS status
echo -e "\n4. HDFS status:"
hdfs dfsadmin -report 2>/dev/null | head -20 || echo "HDFS service unavailable"
# 5. Safe mode
echo -e "\n5. Safe mode:"
hdfs dfsadmin -safemode get 2>/dev/null || echo "Unable to read safe mode status"
# 6. Recent errors in the logs
echo -e "\n6. Recent error log entries:"
tail -50 /var/log/hadoop-hdfs/hdfs-audit.log | grep -i "error\|exception\|failed" | tail -5
tail -50 /var/log/hadoop-hdfs/hadoop-hdfs-namenode-*.log | grep -i "error\|exception\|failed" | tail -5
# 7. Block status
echo -e "\n7. Block status:"
hdfs fsck / -files -blocks -locations 2>/dev/null | grep -i "corrupt\|missing" || echo "Unable to check block status"
# 8. Listening ports (9870/9000 on Hadoop 3; 50070/50075 are the legacy Hadoop 2 ports)
echo -e "\n8. Listening ports:"
netstat -tlnp | grep -E "9870|9000|50070|50075"
# 9. Firewall
echo -e "\n9. Firewall status:"
systemctl status firewalld 2>/dev/null | grep "Active:" || echo "Firewall service not running"
# 10. Suggested actions
echo -e "\n10. Suggested actions:"
if ! jps | grep -q "NameNode"; then
echo "❌ NameNode is down, try: sudo -u hdfs hdfs --daemon start namenode"
fi
if ! jps | grep -q "DataNode"; then
echo "❌ DataNode is down, try: sudo -u hdfs hdfs --daemon start datanode"
fi
if hdfs dfsadmin -safemode get 2>/dev/null | grep -q "ON"; then
echo "⚠️ HDFS is in safe mode; once block reports look healthy: hdfs dfsadmin -safemode leave"
fi
echo -e "\nTroubleshooting pass complete!"
YARN troubleshooting
#!/bin/bash
# yarn_troubleshooting.sh
echo "=== YARN Troubleshooting Tool ==="
# 1. Service status
echo -e "\n1. Service status:"
jps | grep -E "ResourceManager|NodeManager"
# 2. ResourceManager state
echo -e "\n2. ResourceManager state:"
yarn rmadmin -getServiceState rm1 2>/dev/null || echo "ResourceManager unavailable"
# 3. NodeManager status
echo -e "\n3. NodeManager status:"
yarn node -list 2>/dev/null | head -10 || echo "Unable to list nodes"
# 4. Application status
echo -e "\n4. Application status:"
yarn application -list 2>/dev/null | head -5 || echo "Unable to list applications"
# 5. Resource usage
echo -e "\n5. Resource usage:"
yarn top 2>/dev/null | head -10 || echo "Unable to read resource usage"
# 6. Recent errors in the logs
echo -e "\n6. Recent error log entries:"
tail -50 /var/log/hadoop-yarn/yarn-audit.log | grep -i "error\|exception\|failed" | tail -5
tail -50 /var/log/hadoop-yarn/*resourcemanager*.log | grep -i "error\|exception\|failed" | tail -5
# 7. Web UI reachability
echo -e "\n7. Web UI reachability:"
curl -s http://hadoop-master:8088/ws/v1/cluster/info > /dev/null && echo "✅ ResourceManager Web UI reachable" || echo "❌ ResourceManager Web UI unreachable"
# 8. Suggested actions
echo -e "\n8. Suggested actions:"
if ! jps | grep -q "ResourceManager"; then
echo "❌ ResourceManager is down, try: sudo -u yarn yarn --daemon start resourcemanager"
fi
if ! jps | grep -q "NodeManager"; then
echo "❌ NodeManager is down, try: sudo -u yarn yarn --daemon start nodemanager"
fi
if yarn node -list 2>/dev/null | grep -q "NEW"; then
echo "⚠️ Nodes stuck in NEW state; check the NodeManager logs"
fi
echo -e "\nTroubleshooting pass complete!"
3.2 Performance Bottleneck Analysis
MapReduce performance analysis tool
#!/bin/bash
# mr_performance_analyzer.sh
JOB_ID=$1
if [ -z "$JOB_ID" ]; then
echo "Usage: $0 <job_id>"
exit 1
fi
echo "=== MapReduce Job Performance Analysis: $JOB_ID ==="
# 1. Job details
echo -e "\n1. Basic job information:"
mapred job -status $JOB_ID 2>/dev/null || echo "Job does not exist or is not accessible"
# 2. Key counters
echo -e "\n2. Key counters:"
mapred job -counter $JOB_ID "org.apache.hadoop.mapreduce.TaskCounter" "MAP_INPUT_RECORDS" 2>/dev/null
mapred job -counter $JOB_ID "org.apache.hadoop.mapreduce.TaskCounter" "REDUCE_INPUT_RECORDS" 2>/dev/null
mapred job -counter $JOB_ID "org.apache.hadoop.mapreduce.FileSystemCounter" "HDFS_BYTES_READ" 2>/dev/null
mapred job -counter $JOB_ID "org.apache.hadoop.mapreduce.FileSystemCounter" "HDFS_BYTES_WRITTEN" 2>/dev/null
# 3. Task execution times
echo -e "\n3. Task execution time analysis:"
mapred job -history $JOB_ID 2>/dev/null | grep -A 10 "Task Summary" || echo "History not available"
# 4. Data skew check (average records per map is only a rough proxy;
# real skew shows up as a handful of much slower tasks)
echo -e "\n4. Data skew check:"
MAP_RECORDS=$(mapred job -counter $JOB_ID "org.apache.hadoop.mapreduce.TaskCounter" "MAP_INPUT_RECORDS" 2>/dev/null | awk '{print $NF}')
NUM_MAPS=$(mapred job -status $JOB_ID 2>/dev/null | grep "Number of maps" | awk '{print $NF}')
if [ -n "$MAP_RECORDS" ] && [ -n "$NUM_MAPS" ] && [ "$NUM_MAPS" -gt 0 ]; then
AVG_RECORDS_PER_MAP=$((MAP_RECORDS / NUM_MAPS))
echo "Average records per map: $AVG_RECORDS_PER_MAP"
if [ $AVG_RECORDS_PER_MAP -gt 1000000 ]; then
echo "⚠️ Consider more maps or a smaller input split size"
fi
fi
# 5. Resource usage (YARN uses application_* IDs, so convert the job ID)
echo -e "\n5. Resource usage analysis:"
APP_ID=${JOB_ID/job_/application_}
yarn application -status $APP_ID 2>/dev/null | grep -E "Memory|VCores" || echo "Resource usage not available"
# 6. Optimization suggestions
echo -e "\n6. Optimization suggestions:"
echo "• Check whether input data is evenly distributed"
echo "• Confirm that map and reduce counts are reasonable"
echo "• Look for data skew"
echo "• Confirm whether a Combiner is enabled"
echo "• Check shuffle-phase performance"
echo -e "\nAnalysis complete!"
4. Version Upgrades and Migration
4.1 Rolling Upgrade Strategy
HDFS rolling upgrade procedure
#!/bin/bash
# hdfs_rolling_upgrade.sh
set -e
echo "=== HDFS rolling upgrade starting ==="
echo "Current version: $(hdfs version | head -1)"
echo "Target version: Hadoop 3.3.4"
# 1. Preparation: create the rollback image. Wait until
# "hdfs dfsadmin -rollingUpgrade query" reports
# "Proceed with rolling upgrade" before continuing.
echo -e "\n1. Preparing..."
hdfs dfsadmin -rollingUpgrade prepare
hdfs dfsadmin -rollingUpgrade query
# Back up the configuration
BACKUP_DIR="/backup/hadoop_upgrade_$(date +%Y%m%d)"
mkdir -p $BACKUP_DIR
cp -r $HADOOP_HOME/etc/hadoop $BACKUP_DIR/config_backup
# 2. Upgrade the JournalNodes, one at a time
echo -e "\n2. Upgrading JournalNodes..."
for journal_node in hadoop-master hadoop-slave1 hadoop-slave2; do
echo "Upgrading JournalNode on $journal_node..."
ssh $journal_node "systemctl stop hadoop-journalnode"
# Install the new packages here
ssh $journal_node "systemctl start hadoop-journalnode"
sleep 30
done
# 3. Upgrade the standby NameNode first
# (per the Apache docs the new-version NameNode should start with the
# "-rollingUpgrade started" option; bake it into the service unit)
echo -e "\n3. Upgrading the standby NameNode..."
ssh hadoop-slave1 "systemctl stop hadoop-namenode"
# Install the new packages here
ssh hadoop-slave1 "systemctl start hadoop-namenode"
# Give the standby time to catch up
sleep 60
# 4. Fail over to the upgraded NameNode
echo -e "\n4. Failing over to the upgraded NameNode..."
hdfs haadmin -failover nn1 nn2
# 5. Upgrade the former active NameNode
echo -e "\n5. Upgrading the former active NameNode..."
systemctl stop hadoop-namenode
# Install the new packages here
systemctl start hadoop-namenode
# 6. Upgrade the DataNodes, one at a time
echo -e "\n6. Upgrading DataNodes..."
for datanode in hadoop-master hadoop-slave1 hadoop-slave2; do
echo "Upgrading DataNode on $datanode..."
ssh $datanode "systemctl stop hadoop-datanode"
# Install the new packages here
ssh $datanode "systemctl start hadoop-datanode"
sleep 30
done
# 7. Final checks
echo -e "\n7. Final checks..."
hdfs dfsadmin -report | head -10
hdfs haadmin -getServiceState nn1
hdfs haadmin -getServiceState nn2
# 8. After a soak period, finalize; rollback is impossible afterwards:
# hdfs dfsadmin -rollingUpgrade finalize
echo -e "\n✅ HDFS rolling upgrade complete!"
4.2 Data Migration Strategy
Cross-cluster data migration
#!/bin/bash
# cross_cluster_migration.sh
SOURCE_CLUSTER="hdfs://old-cluster:8020"
TARGET_CLUSTER="hdfs://new-cluster:8020"
MIGRATION_LIST="/tmp/migration_paths.txt"
# Build the list of paths to migrate
cat > $MIGRATION_LIST << 'EOF'
/user/production
/data/warehouse
/logs/application
/tmp/important
EOF
echo "=== Starting cross-cluster data migration ==="
# 1. Pre-checks
echo -e "\n1. Pre-checks..."
while read path; do
if [ -n "$path" ]; then
echo "Checking source path: $SOURCE_CLUSTER$path"
hadoop fs -test -e $SOURCE_CLUSTER$path && echo "✅ exists" || echo "❌ missing"
fi
done < $MIGRATION_LIST
# 2. Run the migration
echo -e "\n2. Migrating..."
while read path; do
if [ -n "$path" ]; then
echo "Migrating: $SOURCE_CLUSTER$path -> $TARGET_CLUSTER$path"
# -update copies only changed files; -delete makes the target mirror the
# source by removing target files absent from the source, so use it only
# when the target tree is owned by this migration
hadoop distcp \
-update \
-delete \
-m 100 \
-bandwidth 100 \
$SOURCE_CLUSTER$path \
$TARGET_CLUSTER$path
if [ $? -eq 0 ]; then
echo "✅ Migration succeeded: $path"
else
echo "❌ Migration failed: $path"
exit 1
fi
fi
done < $MIGRATION_LIST
# 3. Verification (byte totals can differ legitimately if the source
# is still being written to during the copy)
echo -e "\n3. Verifying..."
while read path; do
if [ -n "$path" ]; then
SOURCE_SIZE=$(hadoop fs -du -s $SOURCE_CLUSTER$path 2>/dev/null | awk '{print $1}' || echo 0)
TARGET_SIZE=$(hadoop fs -du -s $TARGET_CLUSTER$path 2>/dev/null | awk '{print $1}' || echo 0)
if [ "$SOURCE_SIZE" -eq "$TARGET_SIZE" ]; then
echo "✅ Verified: $path"
else
echo "❌ Verification failed: $path (source: $SOURCE_SIZE, target: $TARGET_SIZE)"
fi
fi
done < $MIGRATION_LIST
echo -e "\n🎉 Data migration complete!"
5. Best-Practice Summary
5.1 Operations Checklists
Daily checklist
#!/bin/bash
# daily_ops_checklist.sh
echo "=== Hadoop Cluster Daily Operations Checklist ==="
echo "Checked at: $(date)"
CHECKLIST=(
"HDFS service status"
"YARN service status"
"Cluster capacity"
"Node health"
"Block replication"
"Safe mode status"
"Job status"
"Log error scan"
"Backup verification"
"Monitoring system status"
)
echo "Items covered:"
printf '• %s\n' "${CHECKLIST[@]}"
FAILED_CHECKS=()
# 1. HDFS services
echo -e "\n1. HDFS services:"
jps | grep -q NameNode && echo "✅ NameNode running" || { echo "❌ NameNode down"; FAILED_CHECKS+=("NameNode"); }
jps | grep -q DataNode && echo "✅ DataNode running" || { echo "❌ DataNode down"; FAILED_CHECKS+=("DataNode"); }
# 2. YARN services
echo -e "\n2. YARN services:"
jps | grep -q ResourceManager && echo "✅ ResourceManager running" || { echo "❌ ResourceManager down"; FAILED_CHECKS+=("ResourceManager"); }
jps | grep -q NodeManager && echo "✅ NodeManager running" || { echo "❌ NodeManager down"; FAILED_CHECKS+=("NodeManager"); }
# 3. Cluster capacity
echo -e "\n3. Cluster capacity:"
USAGE_PERCENT=$(hdfs dfsadmin -report 2>/dev/null | grep "DFS Used%" | head -1 | awk '{print $3}' | sed 's/%//')
if [ -n "$USAGE_PERCENT" ]; then
if [ $(echo "$USAGE_PERCENT > 85" | bc) -eq 1 ]; then
echo "❌ Capacity alert: ${USAGE_PERCENT}%"
FAILED_CHECKS+=("cluster capacity")
else
echo "✅ Capacity normal: ${USAGE_PERCENT}%"
fi
fi
# 4. Block replication
echo -e "\n4. Block replication:"
UNDER_REPLICATED=$(hdfs fsck / 2>/dev/null | grep -i "under.replicated blocks" | grep -o '[0-9]\+' | head -1)
if [ -n "$UNDER_REPLICATED" ] && [ "$UNDER_REPLICATED" -gt 10 ]; then
echo "❌ Under-replicated blocks: $UNDER_REPLICATED"
FAILED_CHECKS+=("block replication")
else
echo "✅ Block replication healthy"
fi
# 5. Summary
echo -e "\n=== Check Summary ==="
if [ ${#FAILED_CHECKS[@]} -eq 0 ]; then
echo "🎉 All checks passed; the cluster is healthy!"
else
echo "⚠️ ${#FAILED_CHECKS[@]} issue(s) found:"
printf '• %s\n' "${FAILED_CHECKS[@]}"
echo "Please address the issues above promptly."
fi
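To make this part of the daily routine rather than an on-demand script, schedule it from cron; the paths below are placeholders (note that % must be escaped inside a crontab entry):
# Run every morning at 07:00 and keep a dated report
0 7 * * * /opt/hadoop-ops/daily_ops_checklist.sh > /var/log/hadoop-ops/checklist_$(date +\%F).log 2>&1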
5.2 Performance Optimization Checklist
Optimization audit script
#!/bin/bash
# performance_optimization_checklist.sh
echo "=== Hadoop Performance Optimization Checklist ==="
OPTIMIZATIONS=()
# Note: in a typical *-site.xml a property's <name> and <value> sit on
# adjacent lines, so the checks below grep one line ahead
# 1. HDFS configuration
echo -e "\n1. HDFS configuration:"
if grep -A1 "dfs.blocksize" $HADOOP_HOME/etc/hadoop/hdfs-site.xml 2>/dev/null | grep -q "134217728"; then
echo "✅ Block size set to 128MB"
OPTIMIZATIONS+=("block size")
else
echo "❌ Consider setting dfs.blocksize=134217728"
fi
# 2. YARN configuration
echo -e "\n2. YARN configuration:"
if grep -q "yarn.nodemanager.resource.memory-mb" $HADOOP_HOME/etc/hadoop/yarn-site.xml 2>/dev/null; then
echo "✅ NodeManager memory configured"
OPTIMIZATIONS+=("memory configuration")
else
echo "❌ Consider configuring yarn.nodemanager.resource.memory-mb"
fi
# 3. MapReduce configuration
echo -e "\n3. MapReduce configuration:"
if grep -q "mapreduce.map.memory.mb" $HADOOP_HOME/etc/hadoop/mapred-site.xml 2>/dev/null; then
echo "✅ Map task memory configured"
OPTIMIZATIONS+=("map memory configuration")
else
echo "❌ Consider configuring mapreduce.map.memory.mb"
fi
# 4. Compression
echo -e "\n4. Compression:"
if grep -A1 "mapreduce.map.output.compress" $HADOOP_HOME/etc/hadoop/mapred-site.xml 2>/dev/null | grep -q "true"; then
echo "✅ Map output compression enabled"
OPTIMIZATIONS+=("compression")
else
echo "❌ Consider enabling mapreduce.map.output.compress"
fi
# 5. Summary
echo -e "\n=== Optimization Summary ==="
if [ ${#OPTIMIZATIONS[@]} -eq 0 ]; then
echo "📋 No applied optimizations detected; review the items above."
else
echo "✅ ${#OPTIMIZATIONS[@]} optimization(s) applied:"
printf '• %s\n' "${OPTIMIZATIONS[@]}"
fi
echo -e "\nRecommended optimizations:"
echo "• Raise the HDFS block size to 128MB or 256MB"
echo "• Size YARN container memory and vCores sensibly"
echo "• Enable map output compression"
echo "• Tune map and reduce task counts"
echo "• Use a Combiner to cut shuffle traffic"
echo "• Enable speculative execution"
echo "• Configure a sensible block placement policy"
Wrap-Up
Across the six articles in this series you have covered the full path from Hadoop basics to enterprise-grade operations:
🎯 Knowledge Map
Part 1: Fully distributed Hadoop deployment and HDFS fundamentals
- Cluster planning and deployment
- HDFS architecture and operations
- Basic operations commands
Part 2: MapReduce internals and hands-on programming
- The MapReduce programming model
- Advanced development techniques
- Performance optimization strategies
Part 3: YARN architecture and multi-tenant resource management
- Resource scheduling internals
- Multi-tenant setup
- Queue management and monitoring
Part 4: Hands-on with the Hadoop ecosystem
- Hive data warehousing
- Sqoop data transfer
- Flume log collection
Part 5: Advanced Hadoop features
- Kerberos authentication
- HDFS high availability
- Cluster monitoring and backup
Part 6: Production operations and optimization (this article)
- Capacity planning and management
- Performance monitoring and analysis
- Fault diagnosis and handling
- Version upgrades and migration
- Best-practice summary
🚀 Career Progression
Junior engineer → comfortable with Parts 1-3
Mid-level engineer → comfortable with Parts 1-4
Senior engineer → comfortable with all six parts
Architect → deep understanding, able to design enterprise-grade platforms
📚 Suggestions for Further Learning
- Practice projects: build a complete data platform and run real business data through it
- Source code: read the core Hadoop components' source
- Community: follow the Hadoop community, join discussions and contribute
- Broaden out: learn next-generation engines such as Spark and Flink
- Cloud native: learn to run Hadoop on Kubernetes and public cloud platforms