This is a walkthrough of an efficient, real-time MySQL-to-Elasticsearch sync implemented with Debezium. The concrete steps are as follows:
## Preparation

- Install MySQL, Elasticsearch, Kibana, Kafka, and Kafka Connect with the Debezium MySQL connector plugin (Debezium connectors run inside Kafka Connect, so Kafka is required), set up their environment variables, and make sure every service starts correctly.
- Enable the binlog so Debezium can capture MySQL data changes: in the MySQL configuration file `my.cnf`, set `log-bin=mysql-bin`, `binlog_format=ROW`, and a unique `server-id`, then restart the MySQL service. You can confirm the settings with the check shown below.
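A quick way to confirm that the binlog settings Debezium needs are in effect is to query the server variables; a minimal sketch, assuming the same `root`/`root` credentials used in the connector configuration below:

```shell
# Confirm the binlog is enabled and row-based; server_id must be non-zero
mysql -uroot -proot -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'server_id');"
```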
## Configure the Connectors

- Configure the Debezium MySQL source connector and start it:
```json
{
  "name": "db-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "root",
    "database.server.id": "1",
    "database.server.name": "db",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "db.history"
  }
}
```
This configuration tells Debezium to connect to the local MySQL server and to store the database schema history in a Kafka topic. Change events are written to Kafka topics named `<server.name>.<database>.<table>`, so changes to the `users` table in the `db` database land in the topic `db.db.users`.
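"Starting" the connector means registering this JSON with the Kafka Connect REST API. A minimal sketch, assuming Kafka Connect runs in distributed mode on its default port 8083 and the JSON above is saved as `db-connector.json` (both the port and the file name are assumptions):

```shell
# Register the MySQL source connector with Kafka Connect
curl -X POST "http://localhost:8083/connectors" \
  -H 'Content-Type: application/json' \
  -d @db-connector.json

# Verify that the connector and its task are RUNNING
curl -s "http://localhost:8083/connectors/db-connector/status"
```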
- Configure the Elasticsearch sink connector and start it. Debezium only ships source connectors, so the Elasticsearch side is handled by the Kafka Connect Elasticsearch sink connector; a typical configuration (exact property names can vary slightly between connector versions) looks like this:
```json
{
  "name": "es-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "db.db.users",
    "connection.url": "http://localhost:9200",
    "connection.username": "elastic",
    "connection.password": "changeme",
    "type.name": "doc",
    "key.ignore": "false",
    "transforms": "unwrap,key,route",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "id",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "db\\.db\\.(.*)",
    "transforms.route.replacement": "$1"
  }
}
```
This configuration makes the sink connector consume the `db.db.users` change topic from Kafka and write the rows into the local Elasticsearch service. The `unwrap` transform (Debezium's `ExtractNewRecordState` SMT) flattens the Debezium change envelope into a plain row document, the `key` transform uses the `id` column as the Elasticsearch document `_id`, and the `route` transform strips the `db.db.` topic prefix so the documents end up in an index named `users`.
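The sink connector is registered the same way; again a sketch, assuming the Kafka Connect REST API on port 8083 and the JSON above saved as `es-connector.json`:

```shell
# Register the Elasticsearch sink connector
curl -X POST "http://localhost:8083/connectors" \
  -H 'Content-Type: application/json' \
  -d @es-connector.json

# Both connectors should now be listed and RUNNING
curl -s "http://localhost:8083/connectors"
curl -s "http://localhost:8083/connectors/es-connector/status"
```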
## Verification

- Create a new table in MySQL:
```sql
CREATE TABLE `db`.`users` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(45) NOT NULL,
  `email` VARCHAR(45) NOT NULL,
  `password` VARCHAR(45) NOT NULL,
  PRIMARY KEY (`id`)
);
```
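The sink connector usually creates the target index only when it writes its first document, so right after creating the table the `users` index may not exist yet. A quick way to watch for it is the `_cat/indices` API (add `-u elastic:changeme` if Elasticsearch security is enabled; credentials are an assumption carried over from the sink configuration):

```shell
# List existing indices; the users index appears once the sink writes its first document
curl "http://localhost:9200/_cat/indices?v"
```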
- Verify against Elasticsearch (for example from the Kibana Dev Tools console, or with curl) that the new table is being synced:
```shell
curl -X GET "http://localhost:9200/users/_search" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match_all": {}
  }
}
'
```
Since the table is still empty, the expected result contains no hits (if the sink connector has not created the index yet, Elasticsearch returns an `index_not_found_exception` instead):
```json
{
  "took": 4,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 0,
    "max_score": null,
    "hits": []
  }
}
```
- Insert a new user row in MySQL:
```sql
INSERT INTO `db`.`users` (`name`, `email`, `password`) VALUES ('test', 'test@example.com', 'test');
```
- Query Elasticsearch again to confirm the new row was synced:
```shell
curl -X GET "http://localhost:9200/users/_search" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match_all": {}
  }
}
'
```
The expected result is:
```json
{
  "took": 3,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "users",
        "_type": "doc",
        "_id": "1",
        "_score": 1,
        "_source": {
          "id": 1,
          "name": "test",
          "email": "test@example.com",
          "password": "test"
        }
      }
    ]
  }
}
```
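If the document does not show up, it helps to confirm that the change event actually reached Kafka before suspecting the sink side; a sketch, assuming a standard Kafka installation with the console consumer script under `bin/`:

```shell
# Dump the raw Debezium change events for the users table
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic db.db.users --from-beginning
```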
That completes the walkthrough of an efficient, real-time MySQL-to-Elasticsearch sync with Debezium. I hope it helps.