Flink provides a Docker-based playground that lets users explore and learn Flink.

https://ci.apache.org/projects/flink/flink-docs-release-1.10/getting-started/docker-playgrounds/flink-operations-playground.html

https://github.com/apache/flink-playgrounds

Setting up the environment

Download

git clone --branch release-1.10 https://github.com/apache/flink-playgrounds.git

Look at the contents of flink-playgrounds/operations-playground/docker-compose.yaml:

version: "2.1"
services:
  client:
    build: ../docker/ops-playground-image
    image: apache/flink-ops-playground:1-FLINK-1.10-scala_2.11
    command: "flink run -d -p 2 /opt/ClickCountJob.jar --bootstrap.servers kafka:9092 --checkpointing --event-time"
    depends_on:
      - jobmanager
      - kafka
    volumes:
      - ./conf:/opt/flink/conf
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  clickevent-generator:
    image: apache/flink-ops-playground:1-FLINK-1.10-scala_2.11
    command: "java -classpath /opt/ClickCountJob.jar:/opt/flink/lib/* org.apache.flink.playgrounds.ops.clickcount.ClickEventGenerator --bootstrap.servers kafka:9092 --topic input"
    depends_on:
      - kafka
  jobmanager:
    image: flink:1.10.0-scala_2.11
    command: "jobmanager.sh start-foreground"
    ports:
      - 8081:8081
    volumes:
      - ./conf:/opt/flink/conf
      - flink-checkpoints-directory:/tmp/flink-checkpoints-directory
      - /tmp/flink-savepoints-directory:/tmp/flink-savepoints-directory
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink:1.10.0-scala_2.11
    depends_on:
      - jobmanager
    command: "taskmanager.sh start-foreground"
    volumes:
      - ./conf:/opt/flink/conf
      - flink-checkpoints-directory:/tmp/flink-checkpoints-directory
      - /tmp/flink-savepoints-directory:/tmp/flink-savepoints-directory
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
  kafka:
    image: wurstmeister/kafka:2.12-2.2.1
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_CREATE_TOPICS: "input:2:1, output:2:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    ports:
      - 9094:9094
volumes:
  flink-checkpoints-directory:

This file defines several containers: kafka, zookeeper, jobmanager, taskmanager, clickevent-generator and client. The client image is built from a Dockerfile, clickevent-generator reuses that same image, and the remaining services pull prebuilt images directly.

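Building the client image runs Maven, so for users in mainland China it is best to switch to a domestic mirror.
Add a settings.xml file under flink-playgrounds/docker/ops-playground-image with the following content: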

<?xml version="1.0" encoding="UTF-8"?>

<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
    <mirrors>
       <mirror>
          <id>aliyunmaven</id>
          <mirrorOf>central</mirrorOf>
          <name>aliyun maven</name>
          <url>https://maven.aliyun.com/repository/public</url>
       </mirror>
    </mirrors>
</settings>

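Then edit flink-playgrounds/docker/ops-playground-image/Dockerfile and add the following two lines (they only take effect if they come before the step that invokes Maven):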

COPY ./settings.xml /usr/share/maven/conf/settings.xml
COPY ./settings.xml /root/.m2/settings.xml

Build and start the services

cd flink-playgrounds/operations-playground
docker-compose build
docker-compose up -d

List the newly built local images

docker images

List the containers

docker-compose ps

The cluster made up of kafka, zookeeper, jobmanager and taskmanager is running, clickevent-generator is producing data, and client has submitted the job successfully and exited (Exit 0).

                    Name                                  Command               State                   Ports
-----------------------------------------------------------------------------------------------------------------------------
operations-playground_clickevent-generator_1   /docker-entrypoint.sh java ...   Up       6123/tcp, 8081/tcp
operations-playground_client_1                 /docker-entrypoint.sh flin ...   Exit 0
operations-playground_jobmanager_1             /docker-entrypoint.sh jobm ...   Up       6123/tcp, 0.0.0.0:8081->8081/tcp
operations-playground_kafka_1                  start-kafka.sh                   Up       0.0.0.0:9094->9094/tcp
operations-playground_taskmanager_1            /docker-entrypoint.sh task ...   Up       6123/tcp, 8081/tcp
operations-playground_zookeeper_1              /bin/sh -c /usr/sbin/sshd  ...   Up       2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
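
To spot exited containers without reading the whole table, the State column can be filtered. The following is a minimal sketch, assuming the docker-compose ps layout shown above; exited_containers is a hypothetical helper name, and the sample row is copied from that output.

```shell
# Hypothetical helper: print the name of every container whose State column
# reads "Exit <code>". Expects `docker-compose ps` output on stdin.
exited_containers() {
  awk '/ Exit [0-9]+/ { print $1 }'
}

# Sample row copied from the output above.
sample='operations-playground_client_1                 /docker-entrypoint.sh flin ...   Exit 0'
printf '%s\n' "$sample" | exited_containers
```

Against a live playground this would be used as docker-compose ps | exited_containers. In this setup only the client container is expected to show up, since it exits after submitting the job.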

The following command stops and removes all the containers:

docker-compose down

Individual containers can also be managed with plain docker commands.

Web UI

Open http://localhost:8081 in a browser to see the web UI.
It shows a job named Click Event Count in the running state.

Checking the cluster status

The logs can be followed with:

docker-compose logs -f jobmanager
docker-compose logs -f taskmanager

Open a shell in one of the containers and try the flink command:

root@68b39518cf16:/# flink -v
Version: 1.10.0, Commit ID: aa4eb8f

root@68b39518cf16:/# flink help
"help" is not a valid action.

Valid actions are "run", "list", "info", "savepoint", "stop", or "cancel".

Specify the version option (-v or --version) to print Flink version.

Specify the help option (-h or --help) to get help on the command.

root@68b39518cf16:/# flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
10.05.2020 11:19:18 : c763a6a422d9392da4e9a9678fb66287 : Click Event Count (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

root@68b39518cf16:/# flink --help
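
Later steps need the JobID that flink list prints. As a minimal sketch, assuming the " : "-separated line format shown in the session above, the ID can be pulled out with awk; running_job_ids is a hypothetical helper name, and the sample line is copied from that session.

```shell
# Hypothetical helper: print the JobID of every job that `flink list`
# reports as RUNNING. Expects the command output on stdin.
# Fields are separated by " : ", so the colons inside the timestamp
# (11:19:18) do not split the line.
running_job_ids() {
  awk -F' : ' '/\(RUNNING\)$/ { print $2 }'
}

# Sample line copied from the session above.
sample='10.05.2020 11:19:18 : c763a6a422d9392da4e9a9678fb66287 : Click Event Count (RUNNING)'
printf '%s\n' "$sample" | running_job_ids
```

Inside a container this would be used as flink list | running_job_ids. The header and footer lines of the listing do not end in (RUNNING), so they are skipped.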

Jobs can also be queried through the REST API:

curl localhost:8081/jobs

The clickevent-generator container runs a long-lived Java program that keeps sending data to the Kafka input topic.
The messages it sends can be inspected with:

docker-compose exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 --topic input

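The client container used flink run to submit the long-running Click Event Count job,
which continuously reads from the input topic, aggregates the events, and writes the results to the output topic: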

docker-compose exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 --topic output

None of these containers ships with the ps command, so install it yourself:

apt-get update
apt-get install procps

If a command line is too long, ps may not display all of it; redirect the output to a file and read it there:

ps -ef > temp.txt

On the jobmanager, the Java main class that was started is

org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint

On the taskmanager, the Java main class that was started is

org.apache.flink.runtime.taskexecutor.TaskManagerRunner

The generator container likewise runs a Java program that generates data and sends it to Kafka.

Failure recovery

First attach a consumer to the Kafka output topic:

docker-compose exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 --topic output

Then kill the taskmanager:

docker-compose kill taskmanager

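The TaskManager disappears from the web UI, although the UI still shows the job as running at first; the JobManager needs a few seconds, up to the configured heartbeat timeout, to notice the loss.
flink list then shows the job state as RESTARTING.
At the same time, the output topic stops receiving data.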

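What happens behind the scenes: once the JobManager detects that the TaskManager is unavailable, it cancels the affected running job and immediately resubmits it. The resubmitted job then waits until a TaskManager becomes available.

Restart the TaskManager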

docker-compose up -d taskmanager

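The TaskManager and the job both recover, and the output topic starts receiving data again.

The job restores its state from the last checkpoint and quickly works through all the data that accumulated in the meantime.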

Upgrading and rescaling

The flink stop command stops a job and writes a savepoint:

lin@Ubuntu-VM-1:$ docker-compose run --no-deps client flink stop c2cbdb0bca115d2375706b2c95d8a323
Suspending job "c2cbdb0bca115d2375706b2c95d8a323" with a savepoint.
Savepoint completed. Path: file:/tmp/flink-savepoints-directory/savepoint-c2cbdb-8d350f39371f
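
The savepoint path printed here is needed again when the job is restarted with -s. As a minimal sketch, assuming the "Savepoint completed. Path: file:..." line format shown above, the path can be captured with sed; savepoint_path is a hypothetical helper name, and the sample line is copied from that output.

```shell
# Hypothetical helper: extract the savepoint path (without the "file:"
# scheme prefix) from the output of `flink stop`. Expects it on stdin.
savepoint_path() {
  sed -n 's/^Savepoint completed\. Path: file:\(.*\)$/\1/p'
}

# Sample line copied from the output above.
sample='Savepoint completed. Path: file:/tmp/flink-savepoints-directory/savepoint-c2cbdb-8d350f39371f'
printf '%s\n' "$sample" | savepoint_path
```

The result has the same form as the path passed to flink run -s in the restart step of this walkthrough. Lines that do not match, such as the "Suspending job" line, produce no output.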

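Note that although the command is issued through docker-compose run, the savepoint ends up on the host: docker-compose.yaml bind-mounts /tmp/flink-savepoints-directory from the host into the jobmanager and taskmanager containers.

To cancel a job outright instead of stopping it with a savepoint, use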

flink cancel <job-id>

The output topic now receives no more data.

Suppose we have changed the program or its configuration and want to start the job again:

docker-compose run --no-deps client flink run \
    -s /tmp/flink-savepoints-directory/savepoint-c2cbdb-8d350f39371f/ \
    -d /opt/ClickCountJob.jar \
    --bootstrap.servers kafka:9092 \
    --checkpointing \
    --event-time

Job has been submitted with JobID 29094005784e0e164f79ae7ce28fc250

The output topic receives data again; the job resumes from the savepoint and works through the data that accumulated while it was stopped.

Now stop the job again:

lin@Ubuntu-VM-1:$ docker-compose run --no-deps client flink stop 29094005784e0e164f79ae7ce28fc250
Suspending job "29094005784e0e164f79ae7ce28fc250" with a savepoint.
Savepoint completed. Path: file:/tmp/flink-savepoints-directory/savepoint-290940-e4915465904b

Restart it with a parallelism of 3:

docker-compose run --no-deps client flink run \
    -p 3 \
    -s /tmp/flink-savepoints-directory/savepoint-290940-e4915465904b \
    -d /opt/ClickCountJob.jar \
    --bootstrap.servers kafka:9092 \
    --checkpointing \
    --event-time

Job has been submitted with JobID f6011eec5bf4b66f1770e9f03441a8a3

This time, however, no data arrives on the output topic,

even though the job status is reported as RUNNING:

root@13e5e296d72f:/opt/flink# flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
11.05.2020 17:04:34 : f6011eec5bf4b66f1770e9f03441a8a3 : Click Event Count (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

This is because there is currently only one TaskManager, and each TaskManager is configured with only 2 slots.

See flink-playgrounds/operations-playground/conf/flink-conf.yaml:

jobmanager.rpc.address: jobmanager
blob.server.port: 6124
query.server.port: 6125

taskmanager.memory.process.size: 1568m
taskmanager.numberOfTaskSlots: 2

state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints-directory
state.savepoints.dir: file:///tmp/flink-savepoints-directory

heartbeat.interval: 1000
heartbeat.timeout: 5000

A parallelism of 3 therefore needs at least two TaskManagers, so let's start another one:

lin@Ubuntu-VM-1:$ docker-compose scale taskmanager=2
WARNING: The scale command is deprecated. Use the up command with the --scale flag instead.
Starting operations-playground_taskmanager_1 ... done
Creating operations-playground_taskmanager_2 ... done

docker-compose ps now shows two TaskManagers:

operations-playground_taskmanager_1      /docker-entrypoint.sh task ...   Up       6123/tcp, 8081/tcp
operations-playground_taskmanager_2      /docker-entrypoint.sh task ...   Up       6123/tcp, 8081/tcp

Shortly afterwards, data shows up on the output topic again.
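
The slot arithmetic behind this step can be sketched as follows. It assumes, as in the walkthrough above, that a job needs as many slots as its parallelism; required_taskmanagers is a hypothetical helper name, not part of Flink or the playground.

```shell
# Hypothetical helper: smallest number of TaskManagers that provides at
# least <parallelism> slots, i.e. ceil(parallelism / slots_per_tm),
# computed with integer arithmetic.
required_taskmanagers() {
  parallelism=$1
  slots_per_tm=$2
  echo $(( (parallelism + slots_per_tm - 1) / slots_per_tm ))
}

# Parallelism 3 with taskmanager.numberOfTaskSlots: 2, as in this playground.
required_taskmanagers 3 2
```

With the playground's values this prints 2, matching the second TaskManager that had to be started. The deprecation warning in the transcript also points to docker-compose up with the --scale flag as the replacement for docker-compose scale.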

Job options: --checkpointing and --event-time

  • --checkpointing enables checkpointing, which is Flink’s fault-tolerance mechanism. If you run without it and go through failure and recovery, you should see that data is actually lost.

  • --event-time enables event time semantics for your Job. When disabled, the Job will assign events to windows based on the wall-clock time instead of the timestamp of the ClickEvent. Consequently, the number of events per window will not be exactly one thousand anymore.

Metrics

The following commands are run on the host:

curl "localhost:8081/jobs/f6011eec5bf4b66f1770e9f03441a8a3/metrics"

[{
	"id": "numberOfFailedCheckpoints"
}, {
	"id": "lastCheckpointSize"
}, {
	"id": "lastCheckpointExternalPath"
}, {
	"id": "totalNumberOfCheckpoints"
}, {
	"id": "lastCheckpointRestoreTimestamp"
}, {
	"id": "lastCheckpointAlignmentBuffered"
}, {
	"id": "restartingTime"
}, {
	"id": "uptime"
}, {
	"id": "numberOfInProgressCheckpoints"
}, {
	"id": "downtime"
}, {
	"id": "numberOfCompletedCheckpoints"
}, {
	"id": "numRestarts"
}, {
	"id": "fullRestarts"
}, {
	"id": "lastCheckpointDuration"
}]

curl "localhost:8081/jobs/f6011eec5bf4b66f1770e9f03441a8a3/metrics?get=lastCheckpointSize"

[{"id":"lastCheckpointSize","value":"12466"}]
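
For scripting, the bare value can be cut out of such a single-metric response. This is a minimal sketch that assumes the compact one-metric JSON shape shown above and uses sed rather than a real JSON parser; metric_value is a hypothetical helper name.

```shell
# Hypothetical helper: print the "value" field of a single-metric response
# from the metrics endpoint. Expects the JSON on stdin. It is not a general
# JSON parser: with several metrics in one response it would print only the
# last value.
metric_value() {
  sed -n 's/.*"value":"\([^"]*\)".*/\1/p'
}

# Sample response copied from the output above.
sample='[{"id":"lastCheckpointSize","value":"12466"}]'
printf '%s\n' "$sample" | metric_value
```

On the host this would be combined with the curl call above by piping its output into metric_value. For anything beyond a quick check, a proper JSON tool is the safer choice.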

curl "localhost:8081/jobs/f6011eec5bf4b66f1770e9f03441a8a3"

{
	"jid": "f6011eec5bf4b66f1770e9f03441a8a3",
	"name": "Click Event Count",
	"isStoppable": false,
	"state": "RUNNING",
	"start-time": 1589216674777,
	"end-time": -1,
	"duration": 407088,
	"now": 1589217081865,
	"timestamps": {
		"RECONCILING": 0,
		"RUNNING": 1589216674820,
		"CANCELED": 0,
		"RESTARTING": 0,
		"FAILING": 0,
		"FAILED": 0,
		"FINISHED": 0,
		"CANCELLING": 0,
		"SUSPENDED": 0,
		"CREATED": 1589216674777
	},
	"vertices": [{
		"id": "bc764cd8ddf7a0cff126f51c16239658",
		"name": "Source: ClickEvent Source",
		"parallelism": 3,
		"status": "RUNNING",
		"start-time": 1589216946118,
		"end-time": -1,
		"duration": 135747,
		"tasks": {
			"FAILED": 0,
			"CANCELING": 0,
			"SCHEDULED": 0,
			"RUNNING": 3,
			"RECONCILING": 0,
			"CREATED": 0,
			"FINISHED": 0,
			"CANCELED": 0,
			"DEPLOYING": 0
		},
		"metrics": {
			"read-bytes": 0,
			"read-bytes-complete": true,
			"write-bytes": 5979476,
			"write-bytes-complete": true,
			"read-records": 0,
			"read-records-complete": true,
			"write-records": 197511,
			"write-records-complete": true
		}
	}, {
		"id": "0a448493b4782967b150582570326227",
		"name": "Timestamps/Watermarks",
		"parallelism": 3,
		"status": "RUNNING",
		"start-time": 1589216946147,
		"end-time": -1,
		"duration": 135718,
		"tasks": {
			"FAILED": 0,
			"CANCELING": 0,
			"SCHEDULED": 0,
			"RUNNING": 3,
			"RECONCILING": 0,
			"CREATED": 0,
			"FINISHED": 0,
			"CANCELED": 0,
			"DEPLOYING": 0
		},
		"metrics": {
			"read-bytes": 5998885,
			"read-bytes-complete": true,
			"write-bytes": 5998076,
			"write-bytes-complete": true,
			"read-records": 197499,
			"read-records-complete": true,
			"write-records": 197499,
			"write-records-complete": true
		}
	}, {
		"id": "ea632d67b7d595e5b851708ae9ad79d6",
		"name": "ClickEvent Counter",
		"parallelism": 3,
		"status": "RUNNING",
		"start-time": 1589216946172,
		"end-time": -1,
		"duration": 135693,
		"tasks": {
			"FAILED": 0,
			"CANCELING": 0,
			"SCHEDULED": 0,
			"RUNNING": 3,
			"RECONCILING": 0,
			"CREATED": 0,
			"FINISHED": 0,
			"CANCELED": 0,
			"DEPLOYING": 0
		},
		"metrics": {
			"read-bytes": 6032525,
			"read-bytes-complete": true,
			"write-bytes": 27913,
			"write-bytes-complete": true,
			"read-records": 197458,
			"read-records-complete": true,
			"write-records": 198,
			"write-records-complete": true
		}
	}, {
		"id": "6d2677a0ecc3fd8df0b72ec675edf8f4",
		"name": "Sink: ClickEventStatistics Sink",
		"parallelism": 3,
		"status": "RUNNING",
		"start-time": 1589216946208,
		"end-time": -1,
		"duration": 135657,
		"tasks": {
			"FAILED": 0,
			"CANCELING": 0,
			"SCHEDULED": 0,
			"RUNNING": 3,
			"RECONCILING": 0,
			"CREATED": 0,
			"FINISHED": 0,
			"CANCELED": 0,
			"DEPLOYING": 0
		},
		"metrics": {
			"read-bytes": 36133,
			"read-bytes-complete": true,
			"write-bytes": 0,
			"write-bytes-complete": true,
			"read-records": 198,
			"read-records-complete": true,
			"write-records": 0,
			"write-records-complete": true
		}
	}],
	"status-counts": {
		"FAILED": 0,
		"CANCELING": 0,
		"SCHEDULED": 0,
		"RUNNING": 4,
		"RECONCILING": 0,
		"CREATED": 0,
		"FINISHED": 0,
		"CANCELED": 0,
		"DEPLOYING": 0
	},
	"plan": {
		"jid": "f6011eec5bf4b66f1770e9f03441a8a3",
		"name": "Click Event Count",
		"nodes": [{
			"id": "6d2677a0ecc3fd8df0b72ec675edf8f4",
			"parallelism": 3,
			"operator": "",
			"operator_strategy": "",
			"description": "Sink: ClickEventStatistics Sink",
			"inputs": [{
				"num": 0,
				"id": "ea632d67b7d595e5b851708ae9ad79d6",
				"ship_strategy": "FORWARD",
				"exchange": "pipelined_bounded"
			}],
			"optimizer_properties": {}
		}, {
			"id": "ea632d67b7d595e5b851708ae9ad79d6",
			"parallelism": 3,
			"operator": "",
			"operator_strategy": "",
			"description": "ClickEvent Counter",
			"inputs": [{
				"num": 0,
				"id": "0a448493b4782967b150582570326227",
				"ship_strategy": "HASH",
				"exchange": "pipelined_bounded"
			}],
			"optimizer_properties": {}
		}, {
			"id": "0a448493b4782967b150582570326227",
			"parallelism": 3,
			"operator": "",
			"operator_strategy": "",
			"description": "Timestamps/Watermarks",
			"inputs": [{
				"num": 0,
				"id": "bc764cd8ddf7a0cff126f51c16239658",
				"ship_strategy": "FORWARD",
				"exchange": "pipelined_bounded"
			}],
			"optimizer_properties": {}
		}, {
			"id": "bc764cd8ddf7a0cff126f51c16239658",
			"parallelism": 3,
			"operator": "",
			"operator_strategy": "",
			"description": "Source: ClickEvent Source",
			"optimizer_properties": {}
		}]
	}
}