Hadoop Big Data Tools and Ecosystem - Hadoop Operations and Optimization

Production Hadoop Cluster Operations and Performance Optimization

1. Hardware Planning and Capacity Management

1.1 Production Environment Hardware Planning

Cluster sizing formula

#!/bin/bash
# cluster_capacity_planner.sh

# Input parameters
EXPECTED_DATA_SIZE_TB=1000  # Expected data volume (TB)
AVG_FILE_SIZE_MB=256        # Average file size (MB)
REPLICATION_FACTOR=3        # Replication factor
DAILY_GROWTH_RATE=0.0005    # Daily growth rate (0.05%/day, roughly 20%/year)
RETENTION_DAYS=1095         # Data retention period (3 years)

# Print the inputs
echo "=== Hadoop Cluster Capacity Planning ==="
echo "Expected data volume: $EXPECTED_DATA_SIZE_TB TB"
echo "Average file size: $AVG_FILE_SIZE_MB MB"
echo "Replication factor: $REPLICATION_FACTOR"
echo "Daily growth rate: $(echo "$DAILY_GROWTH_RATE * 100" | bc)%"
echo "Retention period: $RETENTION_DAYS days"

# Raw storage requirement (logical data x replication)
RAW_STORAGE_TB=$(echo "scale=2; $EXPECTED_DATA_SIZE_TB * $REPLICATION_FACTOR" | bc)
echo "Raw storage requirement: $RAW_STORAGE_TB TB"

# Storage requirement after 3 years of compound daily growth
# (bc's ^ operator requires an integer exponent, which RETENTION_DAYS is)
FUTURE_STORAGE_TB=$(echo "scale=2; $RAW_STORAGE_TB * (1 + $DAILY_GROWTH_RATE)^$RETENTION_DAYS" | bc)
echo "Storage requirement after 3 years: $FUTURE_STORAGE_TB TB"

# Number of DataNodes (assuming 60TB usable per node)
NODE_CAPACITY_TB=60
DATA_NODES=$(echo "scale=0; ($FUTURE_STORAGE_TB / $NODE_CAPACITY_TB) + 1" | bc)
echo "Recommended DataNode count: $DATA_NODES"

# NameNode memory requirement (~1GB per million blocks, plus a 16GB base)
TOTAL_BLOCKS=$(echo "scale=0; $EXPECTED_DATA_SIZE_TB * 1024 * 1024 / $AVG_FILE_SIZE_MB" | bc)
NN_MEMORY_GB=$(echo "scale=0; $TOTAL_BLOCKS / 1000000 + 16" | bc)
echo "NameNode memory requirement: $NN_MEMORY_GB GB"

echo "Planning complete!"

Recommended node hardware configuration

Node Type        CPU       Memory   Storage             Network   Count
NameNode         16 cores  64GB     2×1TB SSD (RAID1)   10Gbps    2 (HA)
DataNode         32 cores  128GB    12×8TB HDD (JBOD)   10Gbps    as needed
ResourceManager  16 cores  32GB     2×500GB SSD         10Gbps    2 (HA)
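
The NameNode heap should track the block-count estimate from the sizing script above, not just the table's RAM figure. A minimal sketch of pinning the heap in hadoop-env.sh, assuming the 64GB NameNode host recommended here (values illustrative, adjust to your block count):

# hadoop-env.sh (illustrative sizing for a 64GB NameNode host)
# Leave roughly 25% of RAM for the OS and off-heap overhead
export HDFS_NAMENODE_OPTS="-Xms48g -Xmx48g -XX:+UseG1GC $HDFS_NAMENODE_OPTS"
export HDFS_DATANODE_OPTS="-Xms4g -Xmx4g $HDFS_DATANODE_OPTS"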

1.2 Storage Planning and Disk Configuration

Multi-disk configuration optimization

#!/bin/bash
# disk_configuration.sh

# DataNode disk layout (one directory per physical disk, JBOD)
DISK_PATHS=(
  "/data01/hadoop"
  "/data02/hadoop"
  "/data03/hadoop"
  "/data04/hadoop"
  "/data05/hadoop"
  "/data06/hadoop"
  "/data07/hadoop"
  "/data08/hadoop"
  "/data09/hadoop"
  "/data10/hadoop"
  "/data11/hadoop"
  "/data12/hadoop"
)

# Create the data directories
for path in "${DISK_PATHS[@]}"; do
  mkdir -p "$path/datanode"
  chown -R hdfs:hadoop "$path"
  chmod 755 "$path"
done

# Build the comma-separated value for hdfs-site.xml
# (a trailing comma would create an empty storage location, so strip it)
DATA_DIRS=$(printf "file://%s/datanode," "${DISK_PATHS[@]}")
DATA_DIRS=${DATA_DIRS%,}

echo "Multi-disk DataNode configuration:"
echo "<property>"
echo "  <name>dfs.datanode.data.dir</name>"
echo "  <value>$DATA_DIRS</value>"
echo "</property>"

Storage tiering strategy

<!-- hdfs-site.xml storage-tier configuration -->
<configuration>
  <!-- Storage policies (enabled by default since Hadoop 2.6) -->
  <property>
    <name>dfs.storage.policy.enabled</name>
    <value>true</value>
  </property>

  <!-- Tag each data directory with its storage type. All tiers must be
       listed in a single dfs.datanode.data.dir property; repeating the
       property would simply override the earlier value. -->
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>[SSD]file:///ssd_data/datanode,[DISK]file:///hdd_data/datanode,[ARCHIVE]file:///archive_data/datanode</value>
  </property>
</configuration>
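
Tagging directories only declares what storage a DataNode offers; policies are then attached to paths with the hdfs storagepolicies tool (the paths below are illustrative):

# Attach policies to paths and verify them (example paths)
hdfs storagepolicies -setStoragePolicy -path /data/hot -policy HOT
hdfs storagepolicies -setStoragePolicy -path /data/archive -policy COLD
hdfs storagepolicies -getStoragePolicy -path /data/hot

# Migrate existing blocks to match their assigned policy
hdfs mover -p /data/archive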

2. Performance Monitoring and Metrics Analysis

2.1 Key Performance Indicators (KPIs)

HDFS key metrics

#!/bin/bash
# hdfs_kpi_monitor.sh

echo "=== HDFS Key Performance Indicators ==="
echo "Collected at: $(date)"

# Run the expensive commands once and parse the captured output
REPORT=$(hdfs dfsadmin -report 2>/dev/null)
FSCK=$(hdfs fsck / 2>/dev/null)

# 1. Cluster capacity
echo -e "\n1. Cluster capacity:"
echo "$REPORT" | grep -E "Configured Capacity|Present Capacity|DFS Used|Non DFS Used"

# 2. DataNode status
echo -e "\n2. DataNode status:"
LIVE_NODES=$(echo "$REPORT" | grep "Live datanodes" | grep -o '[0-9]\+' | head -1)
TOTAL_CAPACITY=$(echo "$REPORT" | grep "Configured Capacity" | head -1 | awk '{print $3$4}')
USED_CAPACITY=$(echo "$REPORT" | grep "DFS Used:" | head -1 | awk '{print $3$4}')
USED_PERCENT=$(echo "$REPORT" | grep "DFS Used%" | head -1 | awk '{print $3}')

echo "Live nodes: $LIVE_NODES"
echo "Total capacity: $TOTAL_CAPACITY"
echo "Used capacity: $USED_CAPACITY"
echo "Usage: $USED_PERCENT"

# 3. Block status (note: fsck prints "Under-replicated" with a hyphen)
echo -e "\n3. Block status:"
TOTAL_BLOCKS=$(echo "$FSCK" | grep "Total blocks" | grep -o '[0-9]\+' | head -1)
UNDER_REPLICATED=$(echo "$FSCK" | grep -i "under.replicated" | grep -o '[0-9]\+' | head -1)
MISSING_BLOCKS=$(echo "$FSCK" | grep "Missing blocks:" | head -1 | grep -o '[0-9]\+' | head -1)
CORRUPT_BLOCKS=$(echo "$FSCK" | grep -i "corrupt blocks" | grep -o '[0-9]\+' | head -1)

echo "Total blocks: ${TOTAL_BLOCKS:-0}"
echo "Under-replicated blocks: ${UNDER_REPLICATED:-0}"
echo "Missing blocks: ${MISSING_BLOCKS:-0}"
echo "Corrupt blocks: ${CORRUPT_BLOCKS:-0}"

# 4. Network topology
echo -e "\n4. Network topology:"
hdfs dfsadmin -printTopology 2>/dev/null | head -10

# 5. Health score
HEALTH_SCORE=100

if [ "${MISSING_BLOCKS:-0}" -gt 0 ]; then
  HEALTH_SCORE=$((HEALTH_SCORE - 30))
fi

if [ "${UNDER_REPLICATED:-0}" -gt 100 ]; then
  HEALTH_SCORE=$((HEALTH_SCORE - 20))
fi

if [ "${LIVE_NODES:-0}" -lt 3 ]; then
  HEALTH_SCORE=$((HEALTH_SCORE - 10))
fi

echo -e "\nCluster health score: $HEALTH_SCORE/100"

# Alert check
if [ "$HEALTH_SCORE" -lt 80 ]; then
  echo "WARNING: cluster health is degraded!"
fi
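
To turn these point-in-time numbers into trends, the script can be scheduled via cron; a minimal sketch, assuming the script and log paths below:

# /etc/cron.d/hdfs-kpi (illustrative schedule and paths)
*/10 * * * * hdfs /opt/scripts/hdfs_kpi_monitor.sh >> /var/log/hadoop-kpi/hdfs_kpi.log 2>&1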

YARN key metrics

#!/usr/bin/env python3
# yarn_kpi_monitor.py

import requests
from datetime import datetime

class YARNKPIMonitor:
    def __init__(self, rm_host="hadoop-master", rm_port=8088):
        self.base_url = f"http://{rm_host}:{rm_port}/ws/v1/cluster"

    def get_cluster_metrics(self):
        """Fetch cluster-level metrics from the ResourceManager REST API."""
        response = requests.get(f"{self.base_url}/metrics", timeout=5)
        return response.json()['clusterMetrics']

    def get_scheduler_metrics(self):
        """Fetch scheduler metrics."""
        response = requests.get(f"{self.base_url}/scheduler", timeout=5)
        return response.json()['scheduler']['schedulerInfo']

    def calculate_kpis(self):
        """Compute the key performance indicators."""
        metrics = self.get_cluster_metrics()
        scheduler_info = self.get_scheduler_metrics()

        kpis = {
            'timestamp': datetime.now().isoformat(),
            'resource_utilization': {},
            'application_metrics': {},
            'queue_metrics': {},
            'health_score': 100
        }

        # Resource utilization (guard against a cluster reporting 0 totals)
        total_memory = metrics['totalMB']
        used_memory = metrics['allocatedMB']
        total_vcores = metrics['totalVirtualCores']
        used_vcores = metrics['allocatedVirtualCores']

        kpis['resource_utilization']['memory_usage_percent'] = (
            round((used_memory / total_memory) * 100, 2) if total_memory else 0.0)
        kpis['resource_utilization']['vcore_usage_percent'] = (
            round((used_vcores / total_vcores) * 100, 2) if total_vcores else 0.0)
        kpis['resource_utilization']['active_nodes'] = metrics['activeNodes']
        kpis['resource_utilization']['lost_nodes'] = metrics['lostNodes']

        # Application metrics
        kpis['application_metrics']['running_apps'] = metrics['appsRunning']
        kpis['application_metrics']['pending_apps'] = metrics['appsPending']
        kpis['application_metrics']['completed_apps'] = metrics['appsCompleted']
        kpis['application_metrics']['failed_apps'] = metrics['appsFailed']

        # Queue metrics
        if 'queues' in scheduler_info and 'queue' in scheduler_info['queues']:
            for queue in scheduler_info['queues']['queue']:
                queue_name = queue['queueName']
                kpis['queue_metrics'][queue_name] = {
                    'used_capacity': queue.get('usedCapacity', 0),
                    'absolute_used_capacity': queue.get('absoluteUsedCapacity', 0),
                    'num_applications': queue.get('numApplications', 0)
                }

        # Health scoring
        if kpis['resource_utilization']['lost_nodes'] > 0:
            kpis['health_score'] -= 20

        if kpis['application_metrics']['pending_apps'] > 10:
            kpis['health_score'] -= 15

        if kpis['resource_utilization']['memory_usage_percent'] > 90:
            kpis['health_score'] -= 25

        kpis['health_score'] = max(0, kpis['health_score'])

        return kpis

    def generate_report(self):
        """Print a KPI report."""
        kpis = self.calculate_kpis()

        print("=== YARN Key Performance Indicators Report ===")
        print(f"Report time: {kpis['timestamp']}")

        print(f"\n1. Resource utilization:")
        print(f"   Memory usage: {kpis['resource_utilization']['memory_usage_percent']}%")
        print(f"   vCore usage: {kpis['resource_utilization']['vcore_usage_percent']}%")
        print(f"   Active nodes: {kpis['resource_utilization']['active_nodes']}")
        print(f"   Lost nodes: {kpis['resource_utilization']['lost_nodes']}")

        print(f"\n2. Application status:")
        print(f"   Running: {kpis['application_metrics']['running_apps']}")
        print(f"   Pending: {kpis['application_metrics']['pending_apps']}")
        print(f"   Completed: {kpis['application_metrics']['completed_apps']}")
        print(f"   Failed: {kpis['application_metrics']['failed_apps']}")

        print(f"\n3. Queue status:")
        for queue_name, metrics in kpis['queue_metrics'].items():
            print(f"   {queue_name}: {metrics['used_capacity']}% used, {metrics['num_applications']} applications")

        print(f"\n4. Health score: {kpis['health_score']}/100")

        # Alerting thresholds
        if kpis['health_score'] < 80:
            print("\n⚠️  WARNING: cluster health needs attention!")
        if kpis['health_score'] < 60:
            print("\n🚨 CRITICAL: cluster health is poor, act immediately!")

if __name__ == "__main__":
    monitor = YARNKPIMonitor()
    monitor.generate_report()

2.2 Real-Time Monitoring Dashboards

Monitoring with Prometheus + Grafana

# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  # Note: the NameNode /jmx endpoint and the ResourceManager REST API return
  # JSON rather than the Prometheus text format, so in practice these two
  # jobs need the JMX exporter or the custom exporter below in front of them.
  - job_name: 'hadoop-hdfs'
    static_configs:
      - targets: ['hadoop-master:9870']
    metrics_path: '/jmx'

  - job_name: 'hadoop-yarn'
    static_configs:
      - targets: ['hadoop-master:8088']
    metrics_path: '/ws/v1/cluster/metrics'

  - job_name: 'hadoop-custom-exporter'
    static_configs:
      - targets: ['hadoop-master:8080']

  - job_name: 'node-exporter'
    static_configs:
      - targets: ['hadoop-master:9100', 'hadoop-slave1:9100', 'hadoop-slave2:9100']
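
Before pointing Prometheus at these targets, each endpoint can be checked by hand from the Prometheus host (hostnames as assumed throughout this series):

# Sanity-check each scrape target
curl -s http://hadoop-master:9870/jmx | head -20                  # NameNode JMX (JSON)
curl -s http://hadoop-master:8088/ws/v1/cluster/metrics           # RM REST API (JSON)
curl -s http://hadoop-master:9100/metrics | head -20              # node-exporter (Prometheus format)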

Custom metrics exporter

#!/usr/bin/env python3
# hadoop_metrics_exporter.py

from http.server import HTTPServer, BaseHTTPRequestHandler
import requests
import threading
import time

class HadoopMetricsCollector:
    def __init__(self):
        self.metrics = {}

    def collect_hdfs_metrics(self):
        """Collect HDFS metrics from the NameNode JMX endpoint."""
        try:
            response = requests.get("http://hadoop-master:9870/jmx", timeout=5)
            data = response.json()

            hdfs_metrics = {}
            for bean in data['beans']:
                if 'Hadoop:service=NameNode,name' in bean['name']:
                    for key, value in bean.items():
                        if key not in ['modelerType', 'name']:
                            hdfs_metrics[f"hdfs_{key}"] = value

            self.metrics.update(hdfs_metrics)

        except Exception as e:
            print(f"HDFS metrics collection failed: {e}")

    def collect_yarn_metrics(self):
        """Collect YARN metrics from the ResourceManager REST API."""
        try:
            response = requests.get("http://hadoop-master:8088/ws/v1/cluster/metrics", timeout=5)
            data = response.json()['clusterMetrics']

            yarn_metrics = {
                'yarn_apps_pending': data['appsPending'],
                'yarn_apps_running': data['appsRunning'],
                'yarn_apps_completed': data['appsCompleted'],
                'yarn_containers_allocated': data['containersAllocated'],
                'yarn_memory_used_mb': data['allocatedMB'],
                'yarn_memory_total_mb': data['totalMB'],
                'yarn_vcores_used': data['allocatedVirtualCores'],
                'yarn_vcores_total': data['totalVirtualCores']
            }

            self.metrics.update(yarn_metrics)

        except Exception as e:
            print(f"YARN metrics collection failed: {e}")

    def collect_all_metrics(self):
        """Collection loop: refresh all metrics every 15 seconds."""
        while True:
            self.collect_hdfs_metrics()
            self.collect_yarn_metrics()
            time.sleep(15)

class MetricsHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == '/metrics':
            self.send_response(200)
            self.send_header('Content-Type', 'text/plain; version=0.0.4')
            self.end_headers()

            # Expose only plain numeric values in the Prometheus text format
            # (bool is excluded explicitly since it is a subclass of int)
            metrics_text = ""
            for key, value in collector.metrics.items():
                if isinstance(value, (int, float)) and not isinstance(value, bool):
                    metrics_text += f"hadoop_{key} {value}\n"

            self.wfile.write(metrics_text.encode())
        else:
            self.send_response(404)
            self.end_headers()

if __name__ == "__main__":
    collector = HadoopMetricsCollector()

    # Start the background collection thread
    collector_thread = threading.Thread(target=collector.collect_all_metrics)
    collector_thread.daemon = True
    collector_thread.start()

    # Start the HTTP server
    server = HTTPServer(('0.0.0.0', 8080), MetricsHandler)
    print("Hadoop metrics exporter listening on port 8080")
    server.serve_forever()
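
A quick smoke test, assuming the exporter runs on the monitoring host:

# Start the exporter, allow one 15s collection cycle, then scrape it once
python3 hadoop_metrics_exporter.py &
sleep 20
curl -s http://localhost:8080/metrics | head -10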

3. Fault Diagnosis and Handling

3.1 Common Troubleshooting Runbooks

HDFS troubleshooting

#!/bin/bash
# hdfs_troubleshooting.sh

echo "=== HDFS Troubleshooting Tool ==="

# 1. Service status
echo -e "\n1. Service status check:"
jps | grep -E "NameNode|DataNode|JournalNode"

# 2. Network connectivity
echo -e "\n2. Network connectivity:"
ping -c 3 hadoop-master
ping -c 3 hadoop-slave1

# 3. Disk space
echo -e "\n3. Disk space check:"
df -h | grep -E "/data|/ssd"

# 4. HDFS status
echo -e "\n4. HDFS status:"
hdfs dfsadmin -report 2>/dev/null | head -20 || echo "HDFS service unavailable"

# 5. Safe mode
echo -e "\n5. Safe mode:"
hdfs dfsadmin -safemode get 2>/dev/null || echo "Unable to get safe mode status"

# 6. Recent error logs
echo -e "\n6. Recent error logs:"
tail -50 /var/log/hadoop-hdfs/hdfs-audit.log | grep -i "error\|exception\|failed" | tail -5
tail -50 /var/log/hadoop-hdfs/hadoop-hdfs-namenode-*.log | grep -i "error\|exception\|failed" | tail -5

# 7. Block status
echo -e "\n7. Block status check:"
hdfs fsck / -files -blocks -locations 2>/dev/null | grep -i "corrupt\|missing" || echo "Unable to check block status"

# 8. Listening ports (9870/9000 for Hadoop 3, 50070/50075 for Hadoop 2)
echo -e "\n8. Listening ports:"
netstat -tlnp | grep -E "9870|9000|50070|50075"

# 9. Firewall
echo -e "\n9. Firewall status:"
systemctl status firewalld 2>/dev/null | grep "Active:" || echo "Firewall service not running"

# 10. Diagnostic suggestions
echo -e "\n10. Diagnostic suggestions:"
if ! jps | grep -q "NameNode"; then
    echo "❌ NameNode is down, try: sudo -u hdfs hdfs --daemon start namenode"
fi

if ! jps | grep -q "DataNode"; then
    echo "❌ DataNode is down, try: sudo -u hdfs hdfs --daemon start datanode"
fi

if hdfs dfsadmin -safemode get 2>/dev/null | grep -q "ON"; then
    echo "⚠️  HDFS is in safe mode; once block reports look healthy, try: hdfs dfsadmin -safemode leave"
fi

echo -e "\nTroubleshooting complete!"

YARN troubleshooting

#!/bin/bash
# yarn_troubleshooting.sh

echo "=== YARN Troubleshooting Tool ==="

# 1. Service status
echo -e "\n1. Service status check:"
jps | grep -E "ResourceManager|NodeManager"

# 2. ResourceManager HA state
echo -e "\n2. ResourceManager status:"
yarn rmadmin -getServiceState rm1 2>/dev/null || echo "ResourceManager unavailable"

# 3. NodeManager status
echo -e "\n3. NodeManager status:"
yarn node -list 2>/dev/null | head -10 || echo "Unable to get node list"

# 4. Application status
echo -e "\n4. Application status:"
yarn application -list 2>/dev/null | head -5 || echo "Unable to get application list"

# 5. Resource usage
echo -e "\n5. Resource usage:"
yarn top 2>/dev/null | head -10 || echo "Unable to get resource usage"

# 6. Recent error logs
echo -e "\n6. Recent error logs:"
tail -50 /var/log/hadoop-yarn/yarn-audit.log | grep -i "error\|exception\|failed" | tail -5
tail -50 /var/log/hadoop-yarn/yarn-resourcemanager-*.log | grep -i "error\|exception\|failed" | tail -5

# 7. Web UI reachability
echo -e "\n7. Web UI reachability:"
curl -s http://hadoop-master:8088/ws/v1/cluster/info > /dev/null && echo "✅ ResourceManager Web UI reachable" || echo "❌ ResourceManager Web UI unreachable"

# 8. Diagnostic suggestions
echo -e "\n8. Diagnostic suggestions:"
if ! jps | grep -q "ResourceManager"; then
    echo "❌ ResourceManager is down, try: sudo -u yarn yarn --daemon start resourcemanager"
fi

if ! jps | grep -q "NodeManager"; then
    echo "❌ NodeManager is down, try: sudo -u yarn yarn --daemon start nodemanager"
fi

if yarn node -list 2>/dev/null | grep -q "NEW"; then
    echo "⚠️  New nodes have not registered; check the NodeManager logs"
fi

echo -e "\nTroubleshooting complete!"

3.2 Performance Bottleneck Analysis

MapReduce performance analysis tool

#!/bin/bash
# mr_performance_analyzer.sh

JOB_ID=$1

if [ -z "$JOB_ID" ]; then
    echo "Usage: $0 <job_id>"
    exit 1
fi

echo "=== MapReduce Job Performance Analysis: $JOB_ID ==="

# 1. Job details
echo -e "\n1. Basic job information:"
mapred job -status "$JOB_ID" 2>/dev/null || echo "Job does not exist or is inaccessible"

# 2. Key counters
echo -e "\n2. Key counters:"
mapred job -counter "$JOB_ID" "org.apache.hadoop.mapreduce.TaskCounter" "MAP_INPUT_RECORDS" 2>/dev/null
mapred job -counter "$JOB_ID" "org.apache.hadoop.mapreduce.TaskCounter" "REDUCE_INPUT_RECORDS" 2>/dev/null
mapred job -counter "$JOB_ID" "org.apache.hadoop.mapreduce.FileSystemCounter" "HDFS_BYTES_READ" 2>/dev/null
mapred job -counter "$JOB_ID" "org.apache.hadoop.mapreduce.FileSystemCounter" "HDFS_BYTES_WRITTEN" 2>/dev/null

# 3. Task runtime analysis
echo -e "\n3. Task runtime analysis:"
mapred job -history "$JOB_ID" 2>/dev/null | grep -A 10 "Task Summary" || echo "Unable to get job history"

# 4. Data skew check
echo -e "\n4. Data skew check:"
MAP_RECORDS=$(mapred job -counter "$JOB_ID" "org.apache.hadoop.mapreduce.TaskCounter" "MAP_INPUT_RECORDS" 2>/dev/null | awk '{print $3}')
NUM_MAPS=$(mapred job -status "$JOB_ID" 2>/dev/null | grep "Number of maps" | awk '{print $4}')

if [ -n "$MAP_RECORDS" ] && [ -n "$NUM_MAPS" ] && [ "$NUM_MAPS" -gt 0 ]; then
    AVG_RECORDS_PER_MAP=$((MAP_RECORDS / NUM_MAPS))
    echo "Average records per map task: $AVG_RECORDS_PER_MAP"

    if [ "$AVG_RECORDS_PER_MAP" -gt 1000000 ]; then
        echo "⚠️  Consider more map tasks or a smaller input split size"
    fi
fi

# 5. Resource usage (YARN expects the application id, not the job id)
echo -e "\n5. Resource usage analysis:"
APP_ID=${JOB_ID/job_/application_}
yarn application -status "$APP_ID" 2>/dev/null | grep -E "Memory|VCores" || echo "Unable to get resource usage information"

# 6. Optimization suggestions
echo -e "\n6. Optimization suggestions:"
echo "• Check whether the input data is evenly distributed"
echo "• Confirm the map and reduce task counts are reasonable"
echo "• Look for data skew"
echo "• Confirm a Combiner is enabled where applicable"
echo "• Check shuffle-phase performance"

echo -e "\nAnalysis complete!"

4. Version Upgrades and Migration

4.1 Rolling Upgrade Strategy

HDFS rolling upgrade procedure

#!/bin/bash
# hdfs_rolling_upgrade.sh

set -e

echo "=== HDFS rolling upgrade starting ==="
echo "Current version: $(hdfs version | head -1)"
echo "Target version: Hadoop 3.3.4"

# 1. Preparation
echo -e "\n1. Preparation..."
hdfs dfsadmin -safemode enter
hdfs dfsadmin -saveNamespace
hdfs dfsadmin -safemode leave

# Create the rollback fsimage and mark the rolling upgrade as started;
# poll with 'query' until it reports "Proceed with rolling upgrade"
hdfs dfsadmin -rollingUpgrade prepare
hdfs dfsadmin -rollingUpgrade query

# Back up configuration
BACKUP_DIR="/backup/hadoop_upgrade_$(date +%Y%m%d)"
mkdir -p "$BACKUP_DIR"
cp -r "$HADOOP_HOME/etc/hadoop" "$BACKUP_DIR/config_backup"

# 2. Upgrade JournalNodes
echo -e "\n2. Upgrading JournalNodes..."
for journal_node in hadoop-master hadoop-slave1 hadoop-slave2; do
    echo "Upgrading JournalNode on $journal_node..."
    ssh $journal_node "systemctl stop hadoop-journalnode"
    # Perform the actual package upgrade here
    ssh $journal_node "systemctl start hadoop-journalnode"
    sleep 30
done

# 3. Upgrade NameNodes (standby first)
echo -e "\n3. Upgrading the standby NameNode..."
ssh hadoop-slave1 "systemctl stop hadoop-namenode"
# Upgrade the package
ssh hadoop-slave1 "systemctl start hadoop-namenode"

# Wait for the standby NameNode to catch up
sleep 60

# 4. Fail over to the upgraded NameNode
echo -e "\n4. Failing over to the upgraded NameNode..."
hdfs haadmin -failover nn1 nn2

# 5. Upgrade the former active NameNode
echo -e "\n5. Upgrading the former active NameNode..."
systemctl stop hadoop-namenode
# Upgrade the package
systemctl start hadoop-namenode

# 6. Upgrade DataNodes
echo -e "\n6. Upgrading DataNodes..."
for datanode in hadoop-master hadoop-slave1 hadoop-slave2; do
    echo "Upgrading DataNode on $datanode..."
    ssh $datanode "systemctl stop hadoop-datanode"
    # Upgrade the package
    ssh $datanode "systemctl start hadoop-datanode"
    sleep 30
done

# 7. Final checks
echo -e "\n7. Final checks..."
hdfs dfsadmin -report | head -10
hdfs haadmin -getServiceState nn1
hdfs haadmin -getServiceState nn2

# 8. Finalize only after the cluster has been verified healthy;
# rollback is no longer possible once finalized
hdfs dfsadmin -rollingUpgrade finalize

echo -e "\n✅ HDFS rolling upgrade complete!"

4.2 Data Migration Strategy

Cross-cluster data migration

#!/bin/bash
# cross_cluster_migration.sh

SOURCE_CLUSTER="hdfs://old-cluster:8020"
TARGET_CLUSTER="hdfs://new-cluster:8020"
MIGRATION_LIST="/tmp/migration_paths.txt"

# Generate the list of paths to migrate
cat > "$MIGRATION_LIST" << 'EOF'
/user/production
/data/warehouse
/logs/application
/tmp/important
EOF

echo "=== Starting cross-cluster data migration ==="

# 1. Pre-checks
echo -e "\n1. Pre-checks..."
while read path; do
    if [ -n "$path" ]; then
        echo "Checking source path: $SOURCE_CLUSTER$path"
        hadoop fs -test -e "$SOURCE_CLUSTER$path" && echo "✅ exists" || echo "❌ missing"
    fi
done < "$MIGRATION_LIST"

# 2. Run the migration
echo -e "\n2. Running migration..."
while read path; do
    if [ -n "$path" ]; then
        echo "Migrating: $SOURCE_CLUSTER$path -> $TARGET_CLUSTER$path"

        # -update copies only changed files; -delete removes target files
        # absent from the source (use only when the target must mirror the source)
        hadoop distcp \
            -update \
            -delete \
            -m 100 \
            -bandwidth 100 \
            "$SOURCE_CLUSTER$path" \
            "$TARGET_CLUSTER$path"

        if [ $? -eq 0 ]; then
            echo "✅ Migration succeeded: $path"
        else
            echo "❌ Migration failed: $path"
            exit 1
        fi
    fi
done < "$MIGRATION_LIST"

# 3. Verify the migration
echo -e "\n3. Verifying migration..."
while read path; do
    if [ -n "$path" ]; then
        SOURCE_SIZE=$(hadoop fs -du -s "$SOURCE_CLUSTER$path" 2>/dev/null | awk '{print $1}' || echo 0)
        TARGET_SIZE=$(hadoop fs -du -s "$TARGET_CLUSTER$path" 2>/dev/null | awk '{print $1}' || echo 0)

        if [ "$SOURCE_SIZE" -eq "$TARGET_SIZE" ]; then
            echo "✅ Verification passed: $path"
        else
            echo "❌ Verification failed: $path (source: $SOURCE_SIZE, target: $TARGET_SIZE)"
        fi
    fi
done < "$MIGRATION_LIST"

echo -e "\n🎉 Data migration complete!"

5. Best Practices Summary

5.1 Operations Checklists

Daily checklist

#!/bin/bash
# daily_ops_checklist.sh

echo "=== Hadoop Cluster Daily Operations Checklist ==="
echo "Check time: $(date)"

CHECKLIST=(
    "HDFS service status"
    "YARN service status"
    "Cluster capacity monitoring"
    "Node health"
    "Block replication status"
    "Safe mode status"
    "Job status"
    "Log error review"
    "Backup verification"
    "Monitoring system status"
)

echo -e "\nItems covered:"
printf '• %s\n' "${CHECKLIST[@]}"

FAILED_CHECKS=()

# 1. HDFS services
echo -e "\n1. HDFS service check:"
jps | grep -q NameNode && echo "✅ NameNode running" || { echo "❌ NameNode down"; FAILED_CHECKS+=("NameNode"); }
jps | grep -q DataNode && echo "✅ DataNode running" || { echo "❌ DataNode down"; FAILED_CHECKS+=("DataNode"); }

# 2. YARN services
echo -e "\n2. YARN service check:"
jps | grep -q ResourceManager && echo "✅ ResourceManager running" || { echo "❌ ResourceManager down"; FAILED_CHECKS+=("ResourceManager"); }
jps | grep -q NodeManager && echo "✅ NodeManager running" || { echo "❌ NodeManager down"; FAILED_CHECKS+=("NodeManager"); }

# 3. Cluster capacity
echo -e "\n3. Cluster capacity check:"
USAGE_PERCENT=$(hdfs dfsadmin -report 2>/dev/null | grep "DFS Used%" | head -1 | awk '{print $3}' | sed 's/%//')
if [ -n "$USAGE_PERCENT" ]; then
    if [ "$(echo "$USAGE_PERCENT > 85" | bc)" -eq 1 ]; then
        echo "❌ Cluster capacity alert: ${USAGE_PERCENT}%"
        FAILED_CHECKS+=("cluster capacity")
    else
        echo "✅ Cluster capacity normal: ${USAGE_PERCENT}%"
    fi
fi

# 4. Block replication (note: fsck prints "Under-replicated" with a hyphen)
echo -e "\n4. Block replication check:"
UNDER_REPLICATED=$(hdfs fsck / 2>/dev/null | grep -i "under.replicated" | grep -o '[0-9]\+' | head -1)
if [ -n "$UNDER_REPLICATED" ] && [ "$UNDER_REPLICATED" -gt 10 ]; then
    echo "❌ Under-replicated blocks: $UNDER_REPLICATED"
    FAILED_CHECKS+=("block replication")
else
    echo "✅ Block replication normal"
fi

# 5. Summary report
echo -e "\n=== Check summary ==="
if [ ${#FAILED_CHECKS[@]} -eq 0 ]; then
    echo "🎉 All checks passed; the cluster is healthy!"
else
    echo "⚠️  Found ${#FAILED_CHECKS[@]} issue(s):"
    printf '• %s\n' "${FAILED_CHECKS[@]}"
    echo "Please address the issues above promptly."
fi

5.2 Performance Optimization Checklist

Configuration audit script

#!/bin/bash
# performance_optimization_checklist.sh

echo "=== Hadoop Performance Optimization Checklist ==="

OPTIMIZATIONS=()

# 1. HDFS configuration (the property name and value sit on separate
#    lines in the XML, so grep the name and look nearby for the value)
echo -e "\n1. HDFS configuration:"
if grep -A2 "<name>dfs.blocksize</name>" "$HADOOP_HOME/etc/hadoop/hdfs-site.xml" 2>/dev/null | grep -q "134217728"; then
    echo "✅ Block size tuned to 128MB"
    OPTIMIZATIONS+=("block size")
else
    echo "❌ Consider setting dfs.blocksize=134217728"
fi

# 2. YARN configuration
echo -e "\n2. YARN configuration:"
if grep -q "yarn.nodemanager.resource.memory-mb" "$HADOOP_HOME/etc/hadoop/yarn-site.xml" 2>/dev/null; then
    echo "✅ NodeManager memory configured"
    OPTIMIZATIONS+=("memory configuration")
else
    echo "❌ Consider configuring yarn.nodemanager.resource.memory-mb"
fi

# 3. MapReduce configuration
echo -e "\n3. MapReduce configuration:"
if grep -q "mapreduce.map.memory.mb" "$HADOOP_HOME/etc/hadoop/mapred-site.xml" 2>/dev/null; then
    echo "✅ Map task memory configured"
    OPTIMIZATIONS+=("map memory configuration")
else
    echo "❌ Consider configuring mapreduce.map.memory.mb"
fi

# 4. Compression
echo -e "\n4. Compression configuration:"
if grep -A2 "<name>mapreduce.map.output.compress</name>" "$HADOOP_HOME/etc/hadoop/mapred-site.xml" 2>/dev/null | grep -q "true"; then
    echo "✅ Map output compression enabled"
    OPTIMIZATIONS+=("compression")
else
    echo "❌ Consider enabling mapreduce.map.output.compress"
fi

# 5. Summary
echo -e "\n=== Optimization summary ==="
if [ ${#OPTIMIZATIONS[@]} -eq 0 ]; then
    echo "📋 No applied optimizations detected; review the items below."
else
    echo "✅ ${#OPTIMIZATIONS[@]} optimization(s) applied:"
    printf '• %s\n' "${OPTIMIZATIONS[@]}"
fi

echo -e "\nRecommended optimizations:"
echo "• Set the HDFS block size to 128MB or 256MB"
echo "• Size YARN container memory and vCores appropriately"
echo "• Enable map output compression"
echo "• Set sensible map and reduce task counts"
echo "• Use a Combiner to cut shuffle traffic"
echo "• Enable speculative execution"
echo "• Configure a sensible replica placement policy"

Series Recap

Across the six articles in this series, you have covered the full skill set from getting started with Hadoop to enterprise-grade operations:

🎯 Knowledge Map

Part 1: Fully Distributed Hadoop Setup and HDFS Basics

  • Cluster planning and deployment
  • HDFS architecture and operations
  • Basic operations commands

Part 2: MapReduce Internals and Programming in Practice

  • The MapReduce programming model
  • Advanced development techniques
  • Performance optimization strategies

Part 3: YARN Architecture and Multi-Tenant Resource Management

  • Resource scheduling principles
  • Multi-tenant environment setup
  • Queue management and monitoring

Part 4: Hadoop Ecosystem Tools in Practice

  • The Hive data warehouse
  • Sqoop data migration
  • Flume log collection

Part 5: Advanced Hadoop Features

  • Kerberos authentication
  • HDFS high availability
  • Cluster monitoring and backup

Part 6: Production Operations and Optimization (this article)

  • Capacity planning and management
  • Performance monitoring and analysis
  • Fault diagnosis and handling
  • Version upgrades and migration
  • Best practices summary

🚀 Career Path

Junior engineer → master Parts 1-3
Mid-level engineer → master Parts 1-4
Senior engineer → master all six parts
Architect → understand the material deeply and design enterprise-grade solutions

📚 Suggestions for Continued Learning

  1. Hands-on projects: build a complete data platform and process real business data
  2. Source code study: read the source of the core Hadoop components
  3. Community involvement: follow the Hadoop community, join discussions, and contribute
  4. Broaden your stack: learn next-generation compute engines such as Spark and Flink
  5. Cloud native: learn to deploy Hadoop on Kubernetes and cloud platforms
