ClickHouse Auto Increment ID 自增ID

ClickHouse Auto Increment ID 自增ID

·

2 min read

操作 ClickHouse 的时候有这样一个需求:从一个表中读取这个表的一列数据,然后添加一列自增 ID 再插入新表,这个 ID 必须从 1 开始到 1亿结束。ClickHouse中不支持自增 ID,且 ClickHouse 自带的 generateUUIDIP44 只能生成 UUID 类型,虽然 UUID 可以转换为 string 和 int,但是没法从 0 开始自增。

ClickHouse 中还有种方法可以生成自增整数:

select * from numbers(110)
/**
number
1
2
3
4
5
6
7
8
9
10
*/

但是如果使用这个函数的话:

Insert into table new_table(id,name) select * from numbers(1,10),(select name from old_tablel limit 10)

这会导致产生 100 条数据,数据是 id 和 name 字段的关联,即10 x 10,这不是想要的结果!

有两种方法实现这个需求:

一、SQL 实现

用 arrayJoin() 和 range() 函数来实现。但是 仅适用于小数据量,因为这种方式也是关联两表实现的,因此会产生非常大的内存需求!

Insert into table new_table(id,name) select arrayJoin(range(1,10,1)),name from old_table limit 10

range(1,10,1) 会产生从 1 到 10 的数据,步长为1,产生的结果是一个数组:

select range(1,10,1)
/**
[1,2,3,4,5,6,7,8,9]
*/

因此需要将数组变为列示数组:

select arrayJoin(range(1,10,1))
/**
1
2
3
4
5
6
7
8
9
*/

二、python 脚本实现

因为 SQL 的方式没法满足一亿数据的自增 ID,因此适用 python 脚本将数据拉取,然后添加 ID 后插入新表

#!/usr/bin/env python3
from clickhouse_driver import Client

dim = 128
max_size=10000000
block_size=100000

clickhouse_host="localhost"
clickhouse_port="9000"
clickhouse_user='default'
clickhouse_password=''

old_table="database.old_table"


client = Client(host=clickhouse_host,
                port=clickhouse_port,
                user=clickhouse_user,
                password=clickhouse_password
                )

settings = {'max_block_size': block_size}
SQL='SELECT name from {0} limit {1}'.format(old_table,max_size)

rows_gen = client.execute_iter(
    SQL.format(clickhouse_db_table=old_table), settings=settings
)


client2 = Client(host=clickhouse_host,
                port=clickhouse_port,
                user=clickhouse_user,
                password=clickhouse_password
                )

i=0
dict_list=[]
for row in rows_gen:
    i+=1
    dict_obj={"id":i,"name":row[0]}
    dict_list.append(dict_obj)

    if i%block_size==0:
        print(i)
        client2.execute("INSERT INTO database.new_table (id,name) values", dict_list) 
        dict_list=[]

    if i==max_size:
        break
  • 需要 ClickHouse 的 python API

    pip install clickhouse-driver
    
  • clickhouse-driver API 的文档:Quickstart — clickhouse-driver 0.2.4 documentation

  • client.execute_iter 方法是通过流式传输来传输数据,避免数据太大,一次传输不了

  • 在插入数据的过程中分批插入,i%block_size 保证每次到达 block_size 的时候才插入数据,如果 block_size 足够大的时候插入速度并不是很慢,一亿数据只需要几分钟。如果不进行批次控制,每条都执行 client.execute() 插入的话会非常耗时!

  • 注意每批次插入完成后需要清空一下 dict_list[] 数组,不然数据会重复,而且可能会导致 OOM!