操作 ClickHouse 的时候有这样一个需求:从一个表中读取这个表的一列数据,然后添加一列自增 ID 再插入新表,这个 ID 必须从 1 开始到 1亿结束。ClickHouse中不支持自增 ID,且 ClickHouse 自带的 generateUUIDIP44 只能生成 UUID 类型,虽然 UUID 可以转换为 string 和 int,但是没法从 0 开始自增。
ClickHouse 中还有种方法可以生成自增整数:
select * from numbers(1,10)
/**
number
1
2
3
4
5
6
7
8
9
10
*/
但是如果使用这个函数的话:
Insert into table new_table(id,name) select * from numbers(1,10),(select name from old_tablel limit 10)
这会导致产生 100 条数据,数据是 id 和 name 字段的关联,即10 x 10,这不是想要的结果!
有两种方法实现这个需求:
一、SQL 实现
用 arrayJoin() 和 range() 函数来实现。但是 仅适用于小数据量,因为这种方式也是关联两表实现的,因此会产生非常大的内存需求!
Insert into table new_table(id,name) select arrayJoin(range(1,10,1)),name from old_table limit 10
range(1,10,1) 会产生从 1 到 10 的数据,步长为1,产生的结果是一个数组:
select range(1,10,1)
/**
[1,2,3,4,5,6,7,8,9]
*/
因此需要将数组变为列示数组:
select arrayJoin(range(1,10,1))
/**
1
2
3
4
5
6
7
8
9
*/
二、python 脚本实现
因为 SQL 的方式没法满足一亿数据的自增 ID,因此适用 python 脚本将数据拉取,然后添加 ID 后插入新表
#!/usr/bin/env python3
from clickhouse_driver import Client
dim = 128
max_size=10000000
block_size=100000
clickhouse_host="localhost"
clickhouse_port="9000"
clickhouse_user='default'
clickhouse_password=''
old_table="database.old_table"
client = Client(host=clickhouse_host,
port=clickhouse_port,
user=clickhouse_user,
password=clickhouse_password
)
settings = {'max_block_size': block_size}
SQL='SELECT name from {0} limit {1}'.format(old_table,max_size)
rows_gen = client.execute_iter(
SQL.format(clickhouse_db_table=old_table), settings=settings
)
client2 = Client(host=clickhouse_host,
port=clickhouse_port,
user=clickhouse_user,
password=clickhouse_password
)
i=0
dict_list=[]
for row in rows_gen:
i+=1
dict_obj={"id":i,"name":row[0]}
dict_list.append(dict_obj)
if i%block_size==0:
print(i)
client2.execute("INSERT INTO database.new_table (id,name) values", dict_list)
dict_list=[]
if i==max_size:
break
需要 ClickHouse 的 python API
pip install clickhouse-driver
clickhouse-driver API 的文档:Quickstart — clickhouse-driver 0.2.4 documentation
client.execute_iter 方法是通过流式传输来传输数据,避免数据太大,一次传输不了
在插入数据的过程中分批插入,i%block_size 保证每次到达 block_size 的时候才插入数据,如果 block_size 足够大的时候插入速度并不是很慢,一亿数据只需要几分钟。如果不进行批次控制,每条都执行 client.execute() 插入的话会非常耗时!
注意每批次插入完成后需要清空一下 dict_list[] 数组,不然数据会重复,而且可能会导致 OOM!