下面是“Spring Boot + Canal 实现数据库实时监控”的完整攻略。
1. 简介
Canal 是阿里巴巴开源的一款用于数据库增量日志解析的工具,它基于 MySQL 构建,实现了 MySQL 数据库增量日志的实时采集,并提供了增量日志解析的功能,目前 Canal 的客户端支持 Spring Boot。
本文将介绍如何使用 Spring Boot 和 Canal 实现数据库实时监控的方法。
2. 准备工作
为了实现数据库实时监控,需要完成以下准备工作:
- 安装 MySQL 数据库,并创建需要监控的数据库和表;
- 下载 Canal 并进行配置;
- 创建 Spring Boot 项目,并添加 Canal 的依赖。
3. 配置 Canal
- 下载 Canal 并解压到指定目录;
- 进入 Canal 下的 conf 目录,复制并重命名其中的示例文件
example/instance.properties
为your_instance.properties
,并修改其中的以下配置项:
canal.instance.master.address = your_mysql_ip:your_mysql_port
canal.instance.master.jdbc.url = jdbc:mysql://your_mysql_ip:your_mysql_port?characterEncoding=UTF-8&useUnicode=true
canal.instance.master.jdbc.username = your_mysql_username
canal.instance.master.jdbc.password = your_mysql_password
canal.instance.dbUsername = your_db_username
canal.instance.dbPassword = your_db_password
canal.instance.defaultDatabaseName = your_db_name
其中,canal.instance.master.address
表示 MySQL 数据库的 IP 和端口号,canal.instance.master.jdbc
表示 MySQL 数据库连接的 URL 和用户名密码,canal.instance.defaultDatabaseName
为默认需要监控的数据库名。
- 将
conf
目录下的canal.properties
中的canal.destinations
修改为需要监控的实例名(与刚刚创建的your_instance.properties
中的canal.instance.mysql.slaveId
值一致),并添加,如:
canal.destinations = example, your_instance
- 修改
canal.conf
中的canal.instance.filter.regex
为需要监控的表名或正则表达式,如:
canal.instance.filter.regex = your_db_name\.your_table_name
- 启动 Canal:
cd canal.deployer-1.1.4
bin/startup.sh
- 验证 Canal 是否运行正常,可以访问以下地址验证:
http://localhost:8080/canal/admin/index
4. 配置 Spring Boot
- 在
pom.xml
中添加 Canal 客户端的依赖:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
- 创建 Canal 客户端的配置类
CanalConfig
,设置 Canal 客户端的参数:
```
@Configuration
public class CanalConfig {
@Bean
public CanalConnector canalConnector(@Value("${canal.hostname}") String hostname,
@Value("${canal.port}") int port,
@Value("${canal.destination}") String destination,
@Value("${canal.username}") String username,
@Value("${canal.password}") String password) {
return CanalConnectors.newSingleConnector(new InetSocketAddress(hostname, port),
destination, username, password);
}
}
```
其中,canal.hostname
、canal.port
、canal.destination
、canal.username
和 canal.password
都是从配置文件中读取的参数。
- 创建实现了 Canal 客户端监听器接口
CanalListener
的类,并在其中定义自己想要实现的数据库操作。
```
@Component
public class MyCanalListener implements CanalListener {
@Override
public void onMessage(CanalMessage canalMessage) {
// TODO 实现自己想要的数据库操作
}
}
```
- 使用
@EnableCanalClient
注解开启 Canal 客户端并监听:
```
@SpringBootApplication
@EnableCanalClient
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
```
以上为 Spring Boot 和 Canal 的配置过程。
5. 示例
示例 1:打印学生表变更的信息
假设需要监控的数据库名为 test
,表名为 student
,需要实现学生表变更时打印变更的信息。
- 修改
your_instance.properties
中的配置:
canal.instance.filter.regex = test\.student
- 实现
MyCanalListener
类:
```
@Component
public class MyCanalListener implements CanalListener {
@Override
public void onMessage(CanalMessage canalMessage) {
List<CanalEntry.Entry> entries = canalMessage.getEntries();
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
CanalEntry.EventType eventType = rowChange.getEventType();
if (eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE) {
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
JSONObject jsonObject = new JSONObject();
for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
jsonObject.put(column.getName(), column.getValue());
}
System.out.println("Canal监听到Database: " + entry.getHeader().getSchemaName() + ", TableName: " + entry.getHeader().getTableName() +
", EventType: " + eventType + ", AfterData: " + jsonObject.toJSONString());
}
}
}
}
}
}
```
其中,通过 entry.getHeader().getSchemaName()
和 entry.getHeader().getTableName()
获取了监听到的数据库名和表名,通过 rowChange.getEventType()
获取操作类型,通过遍历 rowData.getAfterColumnsList()
获取变更后的数据,并打印变更信息。
示例 2:将订单表的变更记录到 Redis
假设需要监控的数据库名为 test
,表名为 order
,需要实现订单表变更时将变更记录到 Redis 中。
- 修改
your_instance.properties
中的配置:
canal.instance.filter.regex = test\.order
- 在
application.properties
中添加 Redis 的连接和相关配置:
spring.redis.host = localhost
spring.redis.port = 6379
spring.redis.database = 0
spring.redis.password =
- 实现
MyCanalListener
类:
```
@Component
public class MyCanalListener implements CanalListener {
private final RedisTemplate<String, String> redisTemplate;
public MyCanalListener(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
}
@Override
public void onMessage(CanalMessage canalMessage) {
List<CanalEntry.Entry> entries = canalMessage.getEntries();
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
CanalEntry.EventType eventType = rowChange.getEventType();
if (eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE) {
String key = entry.getHeader().getTableName() + ":" + rowChange.getRowDatas(0).getAfterColumns(0).getValue();
Map<String, String> valueMap = new HashMap<>();
for (CanalEntry.Column column : rowChange.getRowDatasList().get(0).getAfterColumnsList()) {
valueMap.put(column.getName(), column.getValue());
}
redisTemplate.opsForHash().putAll(key, valueMap);
System.out.println("Canal监听到Database: " + entry.getHeader().getSchemaName() + ", TableName: " + entry.getHeader().getTableName() +
", EventType: " + eventType + ", AfterData: " + valueMap);
}
}
}
}
}
```
其中,通过 RedisTemplate 实例将变更信息存储到 Redis 中。
以上就是示例的具体过程。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Spring Boot + Canal 实现数据库实时监控 - Python技术站