要将Oracle数据库中的数据同步到Kafka中,可以使用Kafka Connect JDBC插件。Kafka Connect是Kafka的一个可扩展框架,可以通过插件来实现与外部系统的集成。JDBC插件可实现与关系型数据库的连接。
下面是将Oracle同步数据到Kafka的攻略:
准备工作
-
下载并安装Oracle JDBC驱动器。
-
把Kafka的JDBC连接器JAR包下载到连接器安装目录下。
-
编辑Kafka的连接器运行脚本(connect-standalone.sh/connect-distributed.sh),添加JDBC连接器JAR包的路径(包含CLASSPATH环境变量的路径)。
创建Kafka Connect的配置文件
-
新建一个JSON文件,定义以下字段:
{
"name": "OracleSourceConnector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"topics": "OracleData",
"connection.url": "<Oracle Database Connection URL>",
"connection.user": "<Oracle User>",
"connection.password": "<Oracle Password>",
"mode": "incrementing",
"incrementing.column.name": "<name of the column to be used as a row identifier for incremental updates>",
"topic.prefix": "<the prefix for the generated topics>"
}
} -
定义上述配置文件中的每个字段:
-
name
: 连接器的名称,必须是唯一的。 -
connector.class
: 使用的连接器类,此处为JDBC连接器。 -
tasks.max
: 连接器使用的工作线程数。 -
topics
: 数据写入的Kafka主题。 -
connection.url
: Oracle数据库的连接URL。 -
connection.user
: Oracle的用户名。 -
connection.password
: Oracle用户密码。 -
mode
:- incremental - 用来将表中未同步的最新数据同步到Kafka主题中。
- bulk - 将整张表数据全部发送到Kafka主题中。
-
incrementing.column.name
: 指定作为表的标志列的列名。 -
topic.prefix
: 生成的主题名称的前缀。
启动连接器
- 运行下面的命令启动连接器:
$ kafka/bin/connect-standalone.sh kafka/config/connect-standalone.properties oracle-jdbc.properties
- 查看Kafka日志检查是否成功。
示例1
假设Oracle中有名为employees
的表,并且要同步该表的全部数据:
-
修改配置文件中的
name
、topics
和incrementing.column.name
:{
"name": "OracleEmployees",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"topics": "employees",
"connection.url": "<Oracle Database Connection URL>",
"connection.user": "<Oracle User>",
"connection.password": "<Oracle Password>",
"mode": "bulk",
"topic.prefix": "oracle-",
"incrementing.column.name": "employee_id"
}
} -
启动连接器:
$ kafka/bin/connect-standalone.sh kafka/config/connect-standalone.properties oracle-employees.properties
- 查看Kafka日志,检查主题是否同步成功。
示例2
以下是将Oracle中的数据归档到Kafka的例子。归档的数据是某个特定的时间段内已更新的记录,使用incrementing
模式:
- 定义一个名为
employees_incremental
的配置文件:
{
"name": "OracleEmployeesIncremental",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"topics": "employees_archived",
"connection.url": "<Oracle Database Connection URL>",
"connection.user": "<Oracle User>",
"connection.password": "<Oracle Password>",
"mode": "incrementing",
"timestamp.column.name": "last_modified",
"incrementing.column.name": "employee_id",
"topic.prefix": "oracle-"
}
}
- 启动连接器
$ kafka/bin/connect-standalone.sh kafka/config/connect-standalone.properties oracle-employees-incremental.properties
- 查看Kafka日志,检查主题是否同步成功。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Oracle同步数据到kafka的方法 - Python技术站