# 架构介绍

HostnameIP用户
controller (跳板机)222.200.180.102openstack
cpf-1 (虚拟机)10.10.3.183ubuntu
cpf-2 (虚拟机)10.10.1.96ubuntu
cpf-3 (虚拟机)10.10.3.222ubuntu
cpf-4 (虚拟机)10.10.0.176ubuntu

本文在内网环境中的 4 个节点上部署一套包含 Hadoop、Spark、Tez、Flink 的环境,文中提到的跳板机和节点如上所示。

最终搭建起的架构如图所示:(忽略其中的 zipkin, promethus, grafana 本文暂不讨论)

# Hadoop

安装 兼容版本的 Java,我这里使用 Ubuntu 24.02,直接使用 apt 安装 Java 8 :

sudo apt install openjdk-8-jre

通过 which java ,查看软链接,找到 Java 的实际安装目录,在我这里为 /usr/lib/jvm/java-8-openjdk-amd64

下载 Hadoop,我这里选择下载 hadoop-stable 的为 hadoop-3.4.1 。直接 wget 下载然后 tar -zxvf 解压。我下载时国内网速太慢,挂了梯子下载然后 scp 分发。

在所有节点上执行如下:

sudo tar xzf hadoop-3.4.1.tar.gz -C /opt
sudo ln -s /opt/hadoop-3.4.1/ /opt/hadoop
sudo chown -R ubuntu:ubuntu /opt/hadoop
sudo chown -R ubuntu:ubuntu /opt/hadoop-3.4.1
sudo mkdir data
sudo mkdir /data/hadoop
sudo chown -R ubuntu:ubuntu /data/hadoop
mkdir /data/hadoop/data
mkdir /data/hadoop/name
mkdir /data/hadoop/pid
mkdir /data/hadoop/tmp

然后,在 /opt/hadoop/etc/hadoop 目录下,编辑配置文件:

core-site.xml :

# Add these elements:
## Do not forget to change the tmp directory
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://{master}:9000</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/data/hadoop/tmp</value>
  </property>

hadoop-env.sh :

# Add these commands:
## Do not forget to change the PID directory
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export HADOOP_PID_DIR=/data/hadoop/pid

hdfs-site.xml :

# Add these elements:
## dfs.replication property specifies replication factor, you may want to change it. I prefer replication factor of two for small clusters.
## Do not forget to change the data directory
## Do not forget to change the name directory
  <property>
    <name>dfs.namenode.http-address</name>
    <value>{master}:9001</value>
  </property>
  <property>
    <name>dfs.namenode.secondary.http-address</name>
    <value>{master}:9002</value>
  </property>
  <property>
    <name>dfs.webhdfs.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>dfs.replication</name>
    <value>2</value>
  </property>
  <property>
    <name>dfs.data.dir</name>
    <value>/data/hadoop/data</value>
  </property>
  <property>
    <name>dfs.name.dir</name>
    <value>/data/hadoop/name</value>
  </property>
  <property>
    <name>dfs.permission</name>
    <value>false</value>
  </property>

mapred-site.xml :

# Add these elements:
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
  <property>
    <name>mapreduce.application.classpath</name>
    <value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
  </property>
  <property>
    <name>mapred.job.tracker</name>
    <value>{master}:54311</value>
  </property>
# The host and port that the MapReduce job tracker runs at.  If "local", then jobs are run in-process as a single map and reduce task.

workers :

# Add the names of all workers
cpf-2
cpf-3
cpf-4

yarn-site.xml :

# Add these lines
## You may want to change the max-disk-utilization-per-disk-percentage.
## Notice that the value of "yarn.nodemanager.hostname" property is {worker}. We leave it for now, and will change it later
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.env-whitelist</name>
    <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
  </property>
  <property>
    <name>yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage</name>
    <value>95</value>
  </property>
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>{master}</value>
  </property>
  <property>
    <name>yarn.nodemanager.hostname</name>
    <value>{worker}</value>
  </property>

通过 sudo vim /etc/profile.d/apache-01-hadoop.sh 编辑一个 Hadoop 相关环境变量的配置文件,这个文件命名是为了在后续加载 Spark、Tez 环境变量的配置文件之前完成加载,因为后者依赖于这里的一些环境变量。

# Java
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
# Haddop & YARN
export PDSH_RCMD_TYPE=ssh
export HADOOP_HOME=/opt/hadoop
export HADOOP_CONF_DIR=/opt/hadoop/etc/hadoop
export HADOOP_MAPRED_HOME=/opt/hadoop
export HADOOP_COMMON_HOME=/opt/hadoop
export HADOOP_HDFS_HOME=/opt/hadoop
export HADOOP_YARN_HOME=/opt/hadoop
export PATH=$HADOOP_HOME/bin:$PATH
export HDFS_NAMENODE_USER="ubuntu"
export HDFS_DATANODE_USER="ubuntu"
export HDFS_SECONDARYNAMENODE_USER="ubuntu"
export YARN_RESOURCEMANAGER_USER="ubuntu"
export YARN_NODEMANAGER_USER="ubuntu"
export LD_LIBRARY_PATH=$HADOOP_HOME/lib/native

然后就可以启动 hadoop 试试看了,常用操作如下:

# 格式化 (清除) hdfs
hadoop namenode -format
# 启动 hdfs 和 yarn 集群
/opt/hadoop/sbin/start-dfs.sh
/opt/hadoop/sbin/start-yarn.sh
# 停止 hdfs 和 yarn 集群
/opt/hadoop/sbin/stop-yarn.sh
/opt/hadoop/sbin/stop-dfs.sh

万一误操作导致 hdfs 文件系统爆了,在所有节点上执行如下脚本:

#!/bin/bash
rm -rf /data/hadoop/data
rm -rf /data/hadoop/name
rm -rf /data/hadoop/pid
rm -rf /data/hadoop/tmp
mkdir -p /data/hadoop/data
mkdir -p /data/hadoop/name
mkdir -p /data/hadoop/pid
mkdir -p /data/hadoop/tmp

# Spark

同样首先还是在所有节点上安装二进制程序到 /opt 下:

sudo tar xzf spark-3.4.4-bin-hadoop3.tgz -C /opt
sudo ln -s /opt/spark-3.4.4-bin-hadoop3/ /opt/spark
sudo chown -R ubuntu:ubuntu /opt/spark
sudo chown -R ubuntu:ubuntu /opt/spark-3.4.4-bin-hadoop3

然后将 hadoop 的相关配置文件复制到 spark 的配置目录下:

cp /opt/hadoop/etc/hadoop/core-site.xml /opt/spark/conf/
cp /opt/hadoop/etc/hadoop/hdfs-site.xml /opt/spark/conf/
cp /opt/hadoop/etc/hadoop/yarn-site.xml /opt/spark/conf/

然后,在 /opt/spark/conf 目录下,编辑配置文件:

workers :

# 写入 worker 节点 hostname (当然不一定要复制模板)
cpf-2
cpf-3
cpf-4

cp spark-env.sh.template spark-env.sh 后在 spark-env.sh 增加如下内容 :

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export SPARK_HOME=/opt/spark
export SPARK_CONF_DIR=$SPARK_HOME/conf
export HADOOP_HOME=/opt/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SPARK_WORKER_DIR=/tmp/spark/work

cp log4j2.properties.template log4j2.properties 使用默认配置即可。

sudo vim /etc/profile.d/apache-02-spark.sh 配置 Spark 相关环境变量:

export SPARK_HOME=/opt/spark
export SPARK_CONF_DIR=$SPARK_HOME/conf
export PATH=$SPARK_HOME/bin:$PATH

最后上传 Spark 相关 jar 文件到 HDFS:

hdfs dfs -mkdir -p /user/spark/share/lib
hadoop fs -put /opt/spark/jars/* /user/spark/share/lib/

# Tez

sudo tar xzf apache-tez-0.10.5-bin.tar.gz -C /opt
sudo ln -s /opt/apache-tez-0.10.5-bin/ /opt/tez
sudo chown -R ubuntu:ubuntu /opt/tez
sudo chown -R ubuntu:ubuntu /opt/apache-tez-0.10.5-bin

将解压出来的 share/tez.tar.gz 上传到 hdfs 中:

hadoop fs -mkdir -p /apps/tez
hadoop fs -copyFromLocal /opt/tez/share/tez.tar.gz /apps/tez/

准备一个配置文件 tez-site.xml ,在 /opt/tez/conf 目录中将有自动生成的配置文件模板,所以我们的配置文件也放在这里。这里仅配置一项即可:

<configuration>
	<property>
		<name>tez.lib.uris</name>
		<value>${fs.defaultFS}/apps/tez/tez.tar.gz</value>
	</property>
</configuration>

sudo vim /etc/profile.d/apache-03-tez.sh 配置 Spark 相关环境变量:

export TEZ_HOME=/opt/tez
export TEZ_CONF_DIR=${TEZ_HOME}/conf
export TEZ_JARS=${TEZ_HOME}
export HADOOP_CLASSPATH=${HADOOP_CLASSPATH}:${TEZ_CONF_DIR}:${TEZ_JARS}/*:${TEZ_JARS}/lib/*

Flink on YARN 支持如下几种模式:

  • Session 模式,先将 Flink 集群作为一个作业提交给 YARN 来启动集群,然后再提交作业,且后续提交的作业共享 Flink 集群的资源。
    • Attached 模式,顾名思义前台运行;
    • Dettached 模式,顾名思义后台运行;
  • Application 模式,直接提交作业。提交后在 YARN 上启动一个 Flink 集群,应用程序 jar 的 main() 方法将在 YARN 中的 JobManager 上执行。应用程序一完成,集群就会关闭。

接下来部署一个 Dettached Session 模式的 Flink 集群,这将同时提供一个 Web UI 供提交监控 Flink 作业。

首先还是在所有节点上安装二进制程序到 /opt 下:

sudo tar xzf flink-2.0.0.tar.gz -C /opt
sudo ln -s /opt/flink-2.0.0/ /opt/flink
sudo chown -R ubuntu:ubuntu /opt/flink
sudo chown -R ubuntu:ubuntu /opt/flink-2.0.0

然后修改配置文件 /opt/flink/conf/config.yaml ,指定 Flink 版本所兼容的 Java 版本,例如这里 Flink 2.0.0 需要 Java 17+ 版本,我将其指定为 apt 安装的 Java 17 安装路径:

env:
  java:
    home: /usr/lib/jvm/java-17-openjdk-amd64
#
# ......
#
rest:
  port: xxxxx # 绑定 Web UI 到指定端口

上面配置修改了 rest.port 绑定 Web UI 端口,因为默认的 8081 端口已经被 Hadoop 和 YARN 占用了,避免每次启动都是随机端口。尽管如此,还是无法绑定到指定 address 上,因为提交 Flink 集群后,前端分配到哪个节点是不确定的,只能在启动后确定。

但是,我们的 Hadoop 和 YARN 依然依赖于 Java 8 所以当前环境变量 JAVA_HOME 指向的 Java 路径是 Java 8 的,并且 Flink 也依赖于这个环境变量!所以即使加了上面的配置启动集群时还是会有 Java 版本不兼容的报错。所以只好启动的时候指定一个高版本 Java 路径:

JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64 ./bin/yarn-session.sh -d

然后会输出 Web UI 的访问方式,以及 Flink 集群的 application id 用于停止集群。当然,YARN 面板上随时可以查到这个 Flink 集群的 application id,所以并不一定需要记下它。

当然,停止集群的时候也要指定 Java 路径:

echo "stop" | JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64 ./bin/yarn-session.sh -id <application_id>

但是接下来还有可能遇到一个问题导致提交的作业永远无法顺利执行,就是资源限制问题。和 Spark 和 Tez 不太一样,Flink 需要一个作业作为 Flink 集群,若我们使用同一个用户启动 Flink 集群,再提交其他非 Flink 作业时可能会发现作业卡住永远没有进展,是因为当前用户 AM 资源上限被 Flink 集群这个作业占完了。似乎能修改配置解决,有比较好的解决方案了后续再补上...

# 访问内网环境

~/.ssh/config 中添加如下配置:

Host hadoop-tunnel
  HostName 222.200.180.102
  User openstack
  # HDFS Web UI (namenode)
  LocalForward 9870 10.10.3.183:9870
  # HDFS Web UI (secondary namenode)
  LocalForward 9868 10.10.3.183:9868
  # YARN Web UI
  LocalForward 8088 10.10.3.183:8088
  LocalForward 8042 10.10.1.96:8042
  LocalForward 8043 10.10.3.222:8042
  LocalForward 8044 10.10.0.176:8042
  # zipkin Web UI
  LocalForward 9411 10.10.3.183:9411
  # JobHistory Web UI
  LocalForward 19888 10.10.3.183:19888
  # Promethues Web UI
  LocalForward 9090 10.10.3.183:9090
  # Node_Exporter Web UI
  LocalForward 9100 10.10.3.183:9100
  LocalForward 9200 10.10.1.96:9100
  LocalForward 9300 10.10.3.222:9100
  LocalForward 9400 10.10.0.176:9100
  # Grafana Web UI 监控可视化
  LocalForward 3000 10.10.3.183:3000

Host cpf-1
  HostName 10.10.3.183
  User ubuntu
  ProxyCommand ssh -W %h:%p hadoop-tunnel

Host cpf-2
  HostName 10.10.1.96
  User ubuntu
  ProxyCommand ssh -W %h:%p hadoop-tunnel

Host cpf-3
  HostName 10.10.3.222
  User ubuntu
  ProxyCommand ssh -W %h:%p hadoop-tunnel

Host cpf-4
  HostName 10.10.0.176
  User ubuntu
  ProxyCommand ssh -W %h:%p hadoop-tunnel

基于上述配置:

ssh -N hadoop-tunnel  # 会挂着转发全部 Web 应用
ssh hadoop-tunnel     # 登录跳板机,顺便转发全部 Web 应用
ssh cpf-1             # 登录虚拟机

# 提交任务

除了 Flink 都需要连接到虚拟机上提交。

# Spark

需要通过 spark-submit 来提交。

由于是 Spark on YARN 的部署方式,提交的任务中,SparkSession 需要增加一项配置 .config("spark.hadoop.fs.defaultFS", "hdfs://cpf-1:9000") 来指定使用 YARN 和 hadoop,并且在提交时指定 --master yarn --deploy-mode cluster

spark-submit
  --master yarn \
  --deploy-mode cluster \
  xxxApp

使用 Spot 追踪,假设追踪后端为 zipkin 并且部署在 http://cpf-1:9411,需要增加:

spark-submit
  --jars spot-complete-3.5_2.12-0.1.1.jar \
  --conf spark.extraListeners=com.xebia.data.spot.TelemetrySparkListener \
  --conf spark.otel.traces.exporter=zipkin \
  --conf spark.otel.exporter.zipkin.endpoint=http://cpf-1:9411/api/v2/spans \
  xxxApp

这里的 spot-complete-3.5_2.12-x.x.x.jar 从 这个 fork 的 CI (build) 下载,因为原项目仓库依赖缺少 opentelemetry-exporter-zipkin 会导致用 zipkin 测试失败。

python 测试为例如下:

from pyspark.sql import SparkSession
spark = SparkSession\
        .builder\
        .config("spark.hadoop.fs.defaultFS", "hdfs://cpf-1:9000")\
        .appName("test01")\
        .getOrCreate()
def get_python_version(_):
  import sys
  print("Python version:", sys.version)
def get_packages(_):
  import pkg_resources
  print({pkg.key: pkg.version for pkg in pkg_resources.working_set})
# Driver 端的 Python 版本
get_python_version(114)
# Executor 端的 Python 版本
spark.sparkContext.parallelize([1], 1).map(get_python_version).collect()
# 列出所有 Driver 端的包名
get_packages(514)
# 列出所有 Executor 端的包名
spark.sparkContext.parallelize([1], 1).map(get_packages).collect()
spark.stop()

提交测试并追踪:

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --jars spot-complete-3.5_2.12-0.1.1.jar \
  --conf spark.extraListeners=com.xebia.data.spot.TelemetrySparkListener \
  --conf spark.otel.traces.exporter=zipkin \
  --conf spark.otel.exporter.zipkin.endpoint=http://cpf-1:9411/api/v2/spans \
  test.py

可以直接通过 Web UI 提交,也可以登陆虚拟机通过指令提交,只是提交时必须指定 JAVA_HOME (默认的 JAVA_HOME 是给 hadoop 和 yarn 用的 java 8):

JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64 \
  /opt/flink/bin/flink run \
  /opt/flink/examples/streaming/TopSpeedWindowing.jar

# Tez

虚拟机上通过指令按照普通的 MapReduce 任务提交即可,例如:

hadoop \
  jar tez-examples-0.10.5.jar \
  orderedwordcount \
  /tmp/test/wordcount-input \
  /tmp/test/wordcount-output