Flink provides a Docker-based playground for getting to know and learning Flink:
https://github.com/apache/flink-playgrounds
Setting Up the Environment
Download
git clone --branch release-1.10 https://github.com/apache/flink-playgrounds.git
Take a look at the contents of flink-playgrounds/operations-playground/docker-compose.yaml:
version: "2.1"
services:
  client:
    build: ../docker/ops-playground-image
    image: apache/flink-ops-playground:1-FLINK-1.10-scala_2.11
    command: "flink run -d -p 2 /opt/ClickCountJob.jar --bootstrap.servers kafka:9092 --checkpointing --event-time"
    depends_on:
      - jobmanager
      - kafka
    volumes:
      - ./conf:/opt/flink/conf
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  clickevent-generator:
    image: apache/flink-ops-playground:1-FLINK-1.10-scala_2.11
    command: "java -classpath /opt/ClickCountJob.jar:/opt/flink/lib/* org.apache.flink.playgrounds.ops.clickcount.ClickEventGenerator --bootstrap.servers kafka:9092 --topic input"
    depends_on:
      - kafka
  jobmanager:
    image: flink:1.10.0-scala_2.11
    command: "jobmanager.sh start-foreground"
    ports:
      - 8081:8081
    volumes:
      - ./conf:/opt/flink/conf
      - flink-checkpoints-directory:/tmp/flink-checkpoints-directory
      - /tmp/flink-savepoints-directory:/tmp/flink-savepoints-directory
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink:1.10.0-scala_2.11
    depends_on:
      - jobmanager
    command: "taskmanager.sh start-foreground"
    volumes:
      - ./conf:/opt/flink/conf
      - flink-checkpoints-directory:/tmp/flink-checkpoints-directory
      - /tmp/flink-savepoints-directory:/tmp/flink-savepoints-directory
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
  kafka:
    image: wurstmeister/kafka:2.12-2.2.1
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_CREATE_TOPICS: "input:2:1, output:2:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    ports:
      - 9094:9094
volumes:
  flink-checkpoints-directory:
As you can see, this file defines several containers: kafka, zookeeper, jobmanager, taskmanager, clickevent-generator, and client. The client image is built from a Dockerfile, clickevent-generator reuses the image built for client, and the remaining images are pulled directly.
Building the client image uses Maven, so it is best to switch to a mirror close to you (a Chinese mirror in this case).
Add a settings.xml file under flink-playgrounds/docker/ops-playground-image with the following content:
<?xml version="1.0" encoding="UTF-8"?>
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
<mirrors>
<mirror>
<id>aliyunmaven</id>
<mirrorOf>central</mirrorOf>
<name>aliyun maven</name>
<url>https://maven.aliyun.com/repository/public</url>
</mirror>
</mirrors>
</settings>
Then edit flink-playgrounds/docker/ops-playground-image/Dockerfile and add the following two lines:
COPY ./settings.xml /usr/share/maven/conf/settings.xml
COPY ./settings.xml /root/.m2/settings.xml
Build and Start the Services
cd flink-playgrounds/operations-playground
docker-compose build
docker-compose up -d
Check the newly built local images:
docker images
Check the running containers:
docker-compose ps
You can see the cluster of kafka, zookeeper, jobmanager, and taskmanager running, clickevent-generator producing data, and client having successfully submitted the job and exited:
Name Command State Ports
-----------------------------------------------------------------------------------------------------------------------------
operations-playground_clickevent-generator_1 /docker-entrypoint.sh java ... Up 6123/tcp, 8081/tcp
operations-playground_client_1 /docker-entrypoint.sh flin ... Exit 0
operations-playground_jobmanager_1 /docker-entrypoint.sh jobm ... Up 6123/tcp, 0.0.0.0:8081->8081/tcp
operations-playground_kafka_1 start-kafka.sh Up 0.0.0.0:9094->9094/tcp
operations-playground_taskmanager_1 /docker-entrypoint.sh task ... Up 6123/tcp, 8081/tcp
operations-playground_zookeeper_1 /bin/sh -c /usr/sbin/sshd ... Up 2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
You can shut down all the containers with:
docker-compose down
Individual containers can also be managed with plain docker commands.
The Web UI
Open http://localhost:8081 to see the web UI.
There is a job named Click Event Count running.
Inspecting the Cluster
The logs can be viewed with:
docker-compose logs -f jobmanager
docker-compose logs -f taskmanager
Log into one of the containers and try the flink command:
root@68b39518cf16:/# flink -v
Version: 1.10.0, Commit ID: aa4eb8f
root@68b39518cf16:/# flink help
"help" is not a valid action.
Valid actions are "run", "list", "info", "savepoint", "stop", or "cancel".
Specify the version option (-v or --version) to print Flink version.
Specify the help option (-h or --help) to get help on the command.
root@68b39518cf16:/# flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
10.05.2020 11:19:18 : c763a6a422d9392da4e9a9678fb66287 : Click Event Count (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
root@68b39518cf16:/# flink --help
Jobs can also be inspected through the REST API:
curl localhost:8081/jobs
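The response is a small JSON document listing job ids and states. A minimal sketch of pulling out the running job id, assuming the response shape used below (based on Flink 1.10's REST API; verify against your own curl output):

```python
import json

# Sample /jobs response; the id matches the one `flink list` printed above.
# The shape ({"jobs": [{"id": ..., "status": ...}]}) is an assumption --
# check the actual output of `curl localhost:8081/jobs`.
response = '{"jobs":[{"id":"c763a6a422d9392da4e9a9678fb66287","status":"RUNNING"}]}'

def running_job_ids(payload: str) -> list:
    """Return the ids of all jobs whose status is RUNNING."""
    jobs = json.loads(payload)["jobs"]
    return [j["id"] for j in jobs if j["status"] == "RUNNING"]

print(running_job_ids(response))  # ['c763a6a422d9392da4e9a9678fb66287']
```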
The clickevent-generator container runs a long-lived Java program
that keeps sending data to Kafka's input topic. You can watch what it sends:
docker-compose exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic input
The client container used flink run to submit the long-running Click Event Count Flink job,
which continuously reads from input, aggregates, and writes to the output topic:
docker-compose exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic output
None of these containers has the ps command installed; install it yourself:
apt-get update
apt-get install procps
If a command line is very long, ps may truncate it; redirect the output to a file first and inspect that:
ps -ef > temp.txt
On jobmanager you can see that the Java entry point is
org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
On taskmanager it is
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
The generator container likewise runs a Java program that produces data and sends it to Kafka.
Failure Recovery
First attach to Kafka's output topic:
docker-compose exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic output
Then kill the taskmanager:
docker-compose kill taskmanager
On the UI the TaskManager disappears (though, oddly, it still shows a job as running).
The flink list command shows the job state change to RESTARTING,
and the output topic stops receiving data.
What actually happens is that once the JobManager notices the TaskManager is unavailable, it cancels the affected running job and immediately resubmits it; the resubmitted job then waits until a TaskManager becomes available.
Restart the TaskManager:
docker-compose up -d taskmanager
Both the TaskManager and the job recover, and the output topic starts producing again.
Under the hood the job restores its state from the last checkpoint and quickly works through all the accumulated data.
Upgrading and Rescaling
The flink stop command stops a job while taking a savepoint:
lin@Ubuntu-VM-1:$ docker-compose run --no-deps client flink stop c2cbdb0bca115d2375706b2c95d8a323
Suspending job "c2cbdb0bca115d2375706b2c95d8a323" with a savepoint.
Savepoint completed. Path: file:/tmp/flink-savepoints-directory/savepoint-c2cbdb-8d350f39371f
Note that this is run via docker-compose run, and the savepoint is written to the host, since /tmp/flink-savepoints-directory is bind-mounted from it.
To cancel a job without taking a savepoint, use:
flink cancel <job-id>
The output topic now produces nothing.
Suppose we have changed the program or its configuration; restart it from the savepoint:
docker-compose run --no-deps client flink run \
-s /tmp/flink-savepoints-directory/savepoint-c2cbdb-8d350f39371f/ \
-d /opt/ClickCountJob.jar \
--bootstrap.servers kafka:9092 \
--checkpointing \
--event-time
Job has been submitted with JobID 29094005784e0e164f79ae7ce28fc250
The output topic produces data again; the job resumes from the savepoint and processes the accumulated data.
Now stop the job once more:
lin@Ubuntu-VM-1:$ docker-compose run --no-deps client flink stop 29094005784e0e164f79ae7ce28fc250
Suspending job "29094005784e0e164f79ae7ce28fc250" with a savepoint.
Savepoint completed. Path: file:/tmp/flink-savepoints-directory/savepoint-290940-e4915465904b
And restart it with a parallelism of 3:
docker-compose run --no-deps client flink run \
-p 3 \
-s /tmp/flink-savepoints-directory/savepoint-290940-e4915465904b \
-d /opt/ClickCountJob.jar \
--bootstrap.servers kafka:9092 \
--checkpointing \
--event-time
Job has been submitted with JobID f6011eec5bf4b66f1770e9f03441a8a3
But this time the output topic produces no data,
even though the job state shows RUNNING:
root@13e5e296d72f:/opt/flink# flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
11.05.2020 17:04:34 : f6011eec5bf4b66f1770e9f03441a8a3 : Click Event Count (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
This is because there is only one TaskManager, and each TaskManager is configured with just 2 slots.
See flink-playgrounds/operations-playground/conf/flink-conf.yaml:
jobmanager.rpc.address: jobmanager
blob.server.port: 6124
query.server.port: 6125
taskmanager.memory.process.size: 1568m
taskmanager.numberOfTaskSlots: 2
state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints-directory
state.savepoints.dir: file:///tmp/flink-savepoints-directory
heartbeat.interval: 1000
heartbeat.timeout: 5000
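The slot arithmetic behind this: a job needs as many slots as its parallelism, and the cluster offers taskmanagers × taskmanager.numberOfTaskSlots. A small sketch (the function name is mine, not Flink's):

```python
import math

def taskmanagers_needed(parallelism: int, slots_per_tm: int) -> int:
    """Minimum number of TaskManagers to provide `parallelism` slots."""
    return math.ceil(parallelism / slots_per_tm)

# With taskmanager.numberOfTaskSlots: 2 (from flink-conf.yaml above):
print(taskmanagers_needed(2, 2))  # 1 -- the original job at -p 2 fits
print(taskmanagers_needed(3, 2))  # 2 -- parallelism 3 needs a second TM
```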
So a parallelism of 3 needs at least two TaskManagers; start another one:
lin@Ubuntu-VM-1:$ docker-compose scale taskmanager=2
WARNING: The scale command is deprecated. Use the up command with the --scale flag instead.
Starting operations-playground_taskmanager_1 ... done
Creating operations-playground_taskmanager_2 ... done
docker-compose ps now shows two TaskManagers:
operations-playground_taskmanager_1 /docker-entrypoint.sh task ... Up 6123/tcp, 8081/tcp
operations-playground_taskmanager_2 /docker-entrypoint.sh task ... Up 6123/tcp, 8081/tcp
Soon the output topic has data again.
Flags: --checkpointing and --event-time
--checkpointing enables checkpointing, which is Flink's fault-tolerance mechanism. If you run without it and go through failure and recovery, you will see that data is actually lost.
--event-time enables event-time semantics for the job. When disabled, the job assigns events to windows based on wall-clock time instead of the timestamp of the ClickEvent. Consequently, the number of events per window will no longer be exactly one thousand.
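The effect of event time on windowing can be illustrated with the usual tumbling-window assignment rule (window start = timestamp minus timestamp modulo window size); this is a generic sketch, not the playground's actual code:

```python
def window_start(timestamp_ms: int, window_size_ms: int) -> int:
    """Start of the tumbling window a record with this timestamp falls into."""
    return timestamp_ms - (timestamp_ms % window_size_ms)

# With hypothetical 15-second windows (15000 ms), two events 1 ms apart can
# still land in different windows -- under event time what matters is the
# record's timestamp, not when it arrives.
print(window_start(29_999, 15_000))  # 15000
print(window_start(30_000, 15_000))  # 30000
```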
Metrics
The following commands are run on the host:
curl "localhost:8081/jobs/f6011eec5bf4b66f1770e9f03441a8a3/metrics"
[{
"id": "numberOfFailedCheckpoints"
}, {
"id": "lastCheckpointSize"
}, {
"id": "lastCheckpointExternalPath"
}, {
"id": "totalNumberOfCheckpoints"
}, {
"id": "lastCheckpointRestoreTimestamp"
}, {
"id": "lastCheckpointAlignmentBuffered"
}, {
"id": "restartingTime"
}, {
"id": "uptime"
}, {
"id": "numberOfInProgressCheckpoints"
}, {
"id": "downtime"
}, {
"id": "numberOfCompletedCheckpoints"
}, {
"id": "numRestarts"
}, {
"id": "fullRestarts"
}, {
"id": "lastCheckpointDuration"
}]
curl "localhost:8081/jobs/f6011eec5bf4b66f1770e9f03441a8a3/metrics?get=lastCheckpointSize"
[{"id":"lastCheckpointSize","value":"12466"}]
curl "localhost:8081/jobs/f6011eec5bf4b66f1770e9f03441a8a3"
{
"jid": "f6011eec5bf4b66f1770e9f03441a8a3",
"name": "Click Event Count",
"isStoppable": false,
"state": "RUNNING",
"start-time": 1589216674777,
"end-time": -1,
"duration": 407088,
"now": 1589217081865,
"timestamps": {
"RECONCILING": 0,
"RUNNING": 1589216674820,
"CANCELED": 0,
"RESTARTING": 0,
"FAILING": 0,
"FAILED": 0,
"FINISHED": 0,
"CANCELLING": 0,
"SUSPENDED": 0,
"CREATED": 1589216674777
},
"vertices": [{
"id": "bc764cd8ddf7a0cff126f51c16239658",
"name": "Source: ClickEvent Source",
"parallelism": 3,
"status": "RUNNING",
"start-time": 1589216946118,
"end-time": -1,
"duration": 135747,
"tasks": {
"FAILED": 0,
"CANCELING": 0,
"SCHEDULED": 0,
"RUNNING": 3,
"RECONCILING": 0,
"CREATED": 0,
"FINISHED": 0,
"CANCELED": 0,
"DEPLOYING": 0
},
"metrics": {
"read-bytes": 0,
"read-bytes-complete": true,
"write-bytes": 5979476,
"write-bytes-complete": true,
"read-records": 0,
"read-records-complete": true,
"write-records": 197511,
"write-records-complete": true
}
}, {
"id": "0a448493b4782967b150582570326227",
"name": "Timestamps/Watermarks",
"parallelism": 3,
"status": "RUNNING",
"start-time": 1589216946147,
"end-time": -1,
"duration": 135718,
"tasks": {
"FAILED": 0,
"CANCELING": 0,
"SCHEDULED": 0,
"RUNNING": 3,
"RECONCILING": 0,
"CREATED": 0,
"FINISHED": 0,
"CANCELED": 0,
"DEPLOYING": 0
},
"metrics": {
"read-bytes": 5998885,
"read-bytes-complete": true,
"write-bytes": 5998076,
"write-bytes-complete": true,
"read-records": 197499,
"read-records-complete": true,
"write-records": 197499,
"write-records-complete": true
}
}, {
"id": "ea632d67b7d595e5b851708ae9ad79d6",
"name": "ClickEvent Counter",
"parallelism": 3,
"status": "RUNNING",
"start-time": 1589216946172,
"end-time": -1,
"duration": 135693,
"tasks": {
"FAILED": 0,
"CANCELING": 0,
"SCHEDULED": 0,
"RUNNING": 3,
"RECONCILING": 0,
"CREATED": 0,
"FINISHED": 0,
"CANCELED": 0,
"DEPLOYING": 0
},
"metrics": {
"read-bytes": 6032525,
"read-bytes-complete": true,
"write-bytes": 27913,
"write-bytes-complete": true,
"read-records": 197458,
"read-records-complete": true,
"write-records": 198,
"write-records-complete": true
}
}, {
"id": "6d2677a0ecc3fd8df0b72ec675edf8f4",
"name": "Sink: ClickEventStatistics Sink",
"parallelism": 3,
"status": "RUNNING",
"start-time": 1589216946208,
"end-time": -1,
"duration": 135657,
"tasks": {
"FAILED": 0,
"CANCELING": 0,
"SCHEDULED": 0,
"RUNNING": 3,
"RECONCILING": 0,
"CREATED": 0,
"FINISHED": 0,
"CANCELED": 0,
"DEPLOYING": 0
},
"metrics": {
"read-bytes": 36133,
"read-bytes-complete": true,
"write-bytes": 0,
"write-bytes-complete": true,
"read-records": 198,
"read-records-complete": true,
"write-records": 0,
"write-records-complete": true
}
}],
"status-counts": {
"FAILED": 0,
"CANCELING": 0,
"SCHEDULED": 0,
"RUNNING": 4,
"RECONCILING": 0,
"CREATED": 0,
"FINISHED": 0,
"CANCELED": 0,
"DEPLOYING": 0
},
"plan": {
"jid": "f6011eec5bf4b66f1770e9f03441a8a3",
"name": "Click Event Count",
"nodes": [{
"id": "6d2677a0ecc3fd8df0b72ec675edf8f4",
"parallelism": 3,
"operator": "",
"operator_strategy": "",
"description": "Sink: ClickEventStatistics Sink",
"inputs": [{
"num": 0,
"id": "ea632d67b7d595e5b851708ae9ad79d6",
"ship_strategy": "FORWARD",
"exchange": "pipelined_bounded"
}],
"optimizer_properties": {}
}, {
"id": "ea632d67b7d595e5b851708ae9ad79d6",
"parallelism": 3,
"operator": "",
"operator_strategy": "",
"description": "ClickEvent Counter",
"inputs": [{
"num": 0,
"id": "0a448493b4782967b150582570326227",
"ship_strategy": "HASH",
"exchange": "pipelined_bounded"
}],
"optimizer_properties": {}
}, {
"id": "0a448493b4782967b150582570326227",
"parallelism": 3,
"operator": "",
"operator_strategy": "",
"description": "Timestamps/Watermarks",
"inputs": [{
"num": 0,
"id": "bc764cd8ddf7a0cff126f51c16239658",
"ship_strategy": "FORWARD",
"exchange": "pipelined_bounded"
}],
"optimizer_properties": {}
}, {
"id": "bc764cd8ddf7a0cff126f51c16239658",
"parallelism": 3,
"operator": "",
"operator_strategy": "",
"description": "Source: ClickEvent Source",
"optimizer_properties": {}
}]
}
}
Unless otherwise noted, articles on this site are original; if you repost, please credit the source: Flink : Docker Playground - Python技术站