clickhouse相关

时间:2025-04-24 10:29:37

目录

1.创建一个表(使用MergeTree引擎)

建表注意点

() 参数

解决重复数据问题

读取得到df,读取得到DataFrame格式的数据


1.创建一个表(使用MergeTree引擎)

create table t_order_mt(
 id UInt32,
 sku_id String,
 total_amount Decimal(16,2),
 create_time Datetime
) engine =MergeTree
 partition by toYYYYMMDD(create_time)
 primary key (id)
 order by (id,sku_id);


建表注意点

order by 设定了分区内的数据按照哪些字段顺序进行有序保存;
order by 是 MergeTree 中唯一一个必填项,甚至比 primary key 还重要,因为当用户不设置主键的情况,很多处理会依照 order by 的字段进行处理(比如后面会讲的去重和汇总);
要求:主键必须是 order by 字段的前缀字段,比如 order by 字段是 (id,sku_id) 那么主键必须是 id 或者(id,sku_id)


() 参数

ENGINE [=] MergeTree(date-column [, sampling_expression], (primary, key), index_granularity)

date-column— Date类型的列的名称。ClickHouse 根据该列自动按月创建分区。分区名称采用"YYYYMM"格式。
sampling_expression— 抽样表达式。
(primary, key)- 首要的关键。类型:元组()
index_granularity— 索引的粒度。索引“标记”之间的数据行数。值 8192 适用于大多数任务。


解决重复数据问题

方法一:
4.1 可以使用 ReplacingMergeTree,它能够在合并分区时删除重复的数据,但是只能对同一分区的数据去重,且去重依据是order by排序键,而不是主键。

4.2 
指定表引擎:
ENGINE = ReplacingMergeTree([ver])
参数:ver,版本列。版本列的类型为UInt*、Date或DateTime。可选参数。

4.3 合并的时候,ReplacingMergeTree从所有相同主键的行中选择一行留下,如果ver未指定,选择最后一条。如果指定了ver列,选择ver值最大的版本。

4.4 执行optimize手动触发合并
optimize table replace_test final;
示例:可以用ReplacingMergeTree引擎创建EPBI表,把EPBI表不设置分区(将会创建一个all分区),将PRIPID放在order by排序键里,将数据汇聚时间放在版本号里,导入数据后手动触发分区。
 
CREATE TABLE MARKET_SUPERVISE_MONITOR_ACTIVITY_BASE_SHENGJU.MSM_ACT_BASE_EPBI_GT1
(

    `PRIPID` String COMMENT '主体身份代码',

    `UNISCID` Nullable(String) COMMENT '统一社会信用代码',

    `ENTNAME` Nullable(String) COMMENT '企业(机构)名称',

    `REGNO` Nullable(String) COMMENT '注册号',

    `ESTDATE` Nullable(Date) COMMENT '成立日期',

    `REGSTATE` Nullable(String) COMMENT '登记状态',

    `TRADE` Nullable(String) COMMENT '产业类型',

    `INDUSTRYPHY` Nullable(String) COMMENT '行业门类',

    `INDUSTRYCO` Nullable(String) COMMENT '行业大类',

    `REGCAP` Nullable(Float64) COMMENT '注册资本(金,单位:万元)',

    `REGCAPCUR` Nullable(String) COMMENT '注册资本(金)币种',

    `REGCAPLEVEL` Nullable(String) COMMENT '注册资本规模',

    `REPORTTYPE` Nullable(String) COMMENT '企业性质',

    `PROVINCE` Nullable(String) COMMENT '省',

    `CITY` Nullable(String) COMMENT '市',

    `COUNTY` Nullable(String) COMMENT '县',

    `FTZ` Nullable(String) COMMENT '自贸区',

    `FTZYN` Nullable(String) COMMENT '是否自贸区',

    `DOMDISTRICT` Nullable(String) COMMENT '住所所在行政区划',

    `DOM` Nullable(String) COMMENT '住所',

    `TOWN` Nullable(String) COMMENT '是否城镇',

    `XZDATE` Date DEFAULT now() COMMENT '新增日期',

    `S_EXT_DATATIME` Datetime COMMENT '数据汇总时间'
)
engine = ReplacingMergeTree(S_EXT_DATATIME)
 order by (PRIPID);
 
CREATE TABLE MARKET_SUPERVISE_MONITOR_ACTIVITY_BASE_SHENGJU.MSM_ACT_BASE_EPBI_NZ1
(

    `PRIPID` String COMMENT '主体身份代码',

    `UNISCID` Nullable(String) COMMENT '统一社会信用代码',

    `ENTNAME` Nullable(String) COMMENT '企业(机构)名称',

    `REGNO` Nullable(String) COMMENT '注册号',

    `ESTDATE` Nullable(Date) COMMENT '成立日期',

    `REGSTATE` Nullable(String) COMMENT '登记状态',

    `TRADE` Nullable(String) COMMENT '产业类型',

    `INDUSTRYPHY` Nullable(String) COMMENT '行业门类',

    `INDUSTRYCO` Nullable(String) COMMENT '行业大类',

    `EI` Nullable(String) COMMENT '战略新兴产业',

    `REGCAP` Nullable(Float64) COMMENT '注册资本(金,单位:万元)',

    `REGCAPCUR` Nullable(String) COMMENT '注册资本(金)币种',

    `REGCAPLEVEL` Nullable(String) COMMENT '注册资本规模',

    `PROVINCE` Nullable(String) COMMENT '省',

    `CITY` Nullable(String) COMMENT '市',

    `COUNTY` Nullable(String) COMMENT '县',

    `FTZ` Nullable(String) COMMENT '自贸区',

    `FTZYN` Nullable(String) COMMENT '是否自贸区',

    `DOMDISTRICT` Nullable(String) COMMENT '住所所在行政区划',

    `DOM` Nullable(String) COMMENT '住所',

    `GOVNUM` Nullable(Int64) COMMENT '企事业单位或社会团体成员总数',

    `XZDATE` Date DEFAULT now() COMMENT '新增日期',

    `S_EXT_DATATIME` Datetime COMMENT '数据汇总时间'
)
engine = ReplacingMergeTree(S_EXT_DATATIME)
 order by (PRIPID);

方法二:
用MergeTree引擎创建表,按照PRIPID分区,删除重复数据时,按照删除分区来删除。排序键设置成立日期。


读取得到df,读取得到DataFrame格式的数据

from clickhouse_driver import Client
def read_ck():
    client_ck = Client(host='',
                       port='',
                       user='',
                       password='',
                       send_receive_timeout=600,
                       )
    sql = ''
    data = client_ck.query_dataframe(sql)
    return data