下面是详细讲解Python操作ES的方式及与Mysql数据同步过程的完整攻略。
Python操作ES的方式
安装elasticsearch-py库
使用pip安装elasticsearch-py库:
pip install elasticsearch
连接Elasticsearch
连接Elasticsearch的方式:
from elasticsearch import Elasticsearch
es = Elasticsearch(['http://localhost:9200'])
以上代码中,通过指定Elasticsearch的URL和端口号,建立连接并返回一个Elasticsearch的对象。
创建索引
创建索引的方式:
es.indices.create(index='my_index', ignore=400)
以上代码中,创建了一个名为my_index的新索引。如果索引已存在,则忽略错误(ignore=400)。
删除索引
删除索引的方式:
es.indices.delete(index='my_index', ignore=[400, 404])
以上代码中,删除名为my_index的索引。如果索引不存在,则忽略错误(ignore=[400, 404])。
增加文档
增加文档的方式:
es.index(index='my_index', doc_type='my_type', body={'name': 'Lucy', 'age': 20})
以上代码中,创建了一个名为my_index的索引,类型为my_type,文档内容为{'name': 'Lucy', 'age': 20}。
查询文档
查询文档的方式:
es.search(index='my_index', body={'query': {'match': {'name': 'Lucy'}}})
以上代码中,从名为my_index的索引中查询,返回匹配name字段为Lucy的文档。
Mysql数据同步过程示例
MySQL是一种关系型数据库,可使用Python操作MySQL并将数据同步到Elasticsearch中。
以下是一个将MySQL中的数据同步到Elasticsearch的过程示例:
创建数据库表
创建数据表的SQL语句:
CREATE TABLE `users` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(50) NOT NULL,
`age` int(11) NOT NULL,
`address` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
连接MySQL
连接MySQL的方式:
import pymysql.cursors
connection = pymysql.connect(host='localhost',
user='user',
password='passwd',
db='db_name',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
以上代码中,通过指定MySQL数据库的host、用户名、密码、数据库名和字符编码等参数,建立连接并返回一个连接对象。
从MySQL中读取数据
读取MySQL中数据的方式:
with connection.cursor() as cursor:
# 查询数据的SQL语句
sql = "SELECT `name`, `age`, `address` FROM `users`"
cursor.execute(sql)
result = cursor.fetchall()
以上代码中,使用with...as语句打开一个游标,并使用游标执行SQL查询语句。执行的结果保存在result变量中。
同步数据到Elasticsearch
同步数据到Elasticsearch的方式:
from elasticsearch import Elasticsearch
es = Elasticsearch(['http://localhost:9200'])
for row in result:
es.index(index='my_index', doc_type='my_type', body=row)
以上代码中,使用for循环遍历MySQL查询结果数组,并将每行数据插入到Elasticsearch的my_index索引中。
至此,已经完成MySQL数据库与Elasticsearch的数据同步过程示例。
另外,可以使用Python的定时任务包schedule,实现定时同步MySQL数据到Elasticsearch的功能。例如:
import schedule
import time
def sync_data():
# 数据同步代码
pass
schedule.every().day.at("03:00").do(sync_data)
while True:
schedule.run_pending()
time.sleep(1)
以上代码中,使用schedule包每天03:00定时执行同步数据的函数sync_data()。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Python操作ES的方式及与Mysql数据同步过程示例 - Python技术站