搭建Flink-1.11.2集群

把 Flink 运行在 YARN 上有两种方式,第一种方式是建立一个长期运行的 Flink YARN Session,然后向这个 Session 提交 Flink Job,多个任务同时运行时会共享资源。第二种方式是为单个任务启动一个 Flink 集群,这个任务会独占 Flink 集群的所有资源,任务结束即代表集群被回收。
另外,Flink on YARN 模式需要系统中设置了 YARN_CONF_DIR 或 HADOOP_CONF_DIR 环境变量,如果未设置,请在 ~/.profile 中加入以下内容,然后使用 source ~/.profile 命令使修改立即生效。

# 在这条命令前定义HADOOP_HOME环境变量
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop

添加环境变量(Flink 1.11必需)
vim ~/.bash_profile

export HADOOP_CLASSPATH=`hadoop classpath`

使用下列命令来启动一个拥有 2 个 TaskManager 的 Flink 集群,每个 TaskManager 有 2 GB 内存,2 个 slot。
./bin/yarn-session.sh -n 2 -jm 1024m -tm 2048
启动 YARN Session 以后会输出 JobManager 的 Web Interface 地址.
仔细一看,Task Managers,Task Slots 怎么都是 0 呢?难道是哪里出了问题?其实并没有问题,从某个版本开始 Flink 允许动态分配资源,在没有任务的时候不分配 TaskManager。接下来我们就提交一个任务试试。
因为启动 YARN Session 以后 Flink Client 会一直在前台运行,所以先用 Ctrl + Z 快捷键把 Client 转到后台,然后再提交任务。

./bin/flink run ./examples/batch/WordCount.jar

下面这条命令会为 wordcount 任务启动一个独占的 Flink 集群,任务结束集群即被回收。其中 -m 选项指定 Flink 集群的启动模式,-yn 选项指定 TaskManager 的数目。

./bin/flink run -m YARN-cluster -yn 2 ./examples/batch/WordCount.jar

虚拟内存不够(https://stackoverflow.com/questions/21005643/container-is-running-beyond-memory-limits)

  • mapred-site.xml
<property>
        <name>mapreduce.map.memory.mb</name>
        <value>2048</value>
</property>

<property>
        <name>mapreduce.reduce.memory.mb</name>
        <value>2048</value>
</property>
<property>
        <name>mapreduce.reduce.java.opts</name>
        <value>-Xmx1638m</value>
</property>

<property>
        <name>mapreduce.map.java.opts</name>
        <value>-Xmx1638m</value>
</property>
  • yarn-site.xml
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>