【发布时间】:2023-04-07 15:37:01
【问题描述】:
我正在尝试使用 python 驱动程序将数据加载到 Cassandra。我能得到的最快速度是每秒 6k 次写入。我正在读取的 csv 有大约 115 万行,导致总插入时间约为 3 分 10 秒。我真的需要将这段时间缩短到 2 分钟或更短,以便及时了解数据。
我的数据由 115 万行和 52 列组成。
目前我正在使用 session.execute_async 函数来插入数据。更改我一次允许的异步请求数量似乎可以加快速度。似乎在大约 5-6k 个请求之后阻塞会导致最快的插入率。
我确实尝试过批量插入,但速度非常慢。
这是我目前将数据插入 Cassandra 的方法。
# insert data into cassandra table
execution_profile = ExecutionProfile(request_timeout=10)
profiles = {'node1': execution_profile}
auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
cluster = Cluster(['11.111.11.11'], 9042, auth_provider=auth_provider, execution_profiles=profiles)
session = cluster.connect() # connect to your keyspace
# Read csv rows into cassandra
count = 0
futures = []
with open('/massaged.csv') as f:
next(f) #skip the header row
for line in f:
query = SimpleStatement("INSERT INTO hrrr.hrrr_18hr( loc_id,utc,sfc_vis,sfc_gust,sfc_pres,sfc_hgt,sfc_tmp,sfc_snow_0Xacc,sfc_cnwat,sfc_weasd,sfc_snowc,sfc_snod,two_m_tmp,two_m_pot,two_m_spfh,two_m_dpt,two_m_rh,ten_m_ugrd,ten_m_vgrd,ten_m_wind_1hr_max,ten_m_maxuw_1hr_max,ten_m_maxvw_1hr_max,sfc_cpofp,sfc_prate,sfc_apcp_0Xacc,sfc_weasd_0Xacc,sfc_frozr_0Xacc,sfc_frzr_0Xacc,sfc_ssrun_1hr_acc,sfc_bgrun_1hr_acc,sfc_apcp_1hr_acc,sfc_weasd_1hr_acc,sfc_frozr_1hr_acc,sfc_csnow,sfc_cicep,sfc_cfrzr,sfc_crain,sfc_sfcr,sfc_fricv,sfc_shtfl,sfc_lhtfl,sfc_gflux,sfc_vgtyp,sfc_cape,sfc_cin,sfc_dswrf,sfc_dlwrf,sfc_uswrf,sfc_ulwrf,sfc_vbdsf,sfc_vddsf,sfc_hpbl) VALUES (%s)" %(line), consistency_level=ConsistencyLevel.ONE)
futures.append(session.execute_async(query, execution_profile='node1'))
count += 1
if count % 5000 == 0:
for f in futures:
f.result() # blocks until remaining inserts are completed.
futures = []
print("rows processed: " + str(count))
# Catch any remaining async requests that haven't finished
for f in futures:
f.result() # blocks until remaining inserts are completed.
print("rows processed: " + str(count))
我需要将插入时间缩短到大约 2 分钟或更短(大约每秒 10K 次插入)。我应该使用多处理来实现这一点还是我错误地使用了 execute_async 函数?
更新
根据 Alex 的建议,我尝试实现一个准备好的语句。这是我想出的,但它似乎要慢得多?对我做错了什么有什么想法吗?
hrrr_prepared = session.prepare("INSERT INTO hrrr.hrrr_18hr( loc_id,utc,...,sfc_hpbl) VALUES (?, ..., ?)")
for row in range(0, len(data)):
futures.append(session.execute_async(hrrr_prepared, tuple(data.iloc[row])))
count += 1
if count % 5000 == 0:
for f in futures:
f.result() # blocks until remaining inserts are completed.
futures = []
print("rows processed: " + str(count))
注意:为了便于阅读,我将“...”放在准备好的语句中,实际代码没有。
【问题讨论】:
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:如何使用 Python 驱动程序加速将 execute_async 插入 Cassandra - Python技术站