建立Stream及视图
pipeline.execute("create stream caesar(name text,info json);")
#创建stream,字段name和info,info为json类型
pipeline.execute("create continuous view caesar_view as select name,info from caesar;")
# 创建源于caesar流的caesar_view视图,包括字段name和info
数据插入
模板:
import psycopg2
from psycopg2.extras import Json
conn = psycopg2.connect("dbname='pipeline' user='caesar' password='123' host='localhost' port=5432")
pipeline = conn.cursor()
caesar = {"name":"*","info":{"age":19,"id":2}}
caesar = {key:Json(value) if isinstance(value,dict) else value for key,value in caesar.items()}
sql = """insert into caesar (name,info) values ('%(name)s',%(info)s) ;""" %(caesar)
pipeline.execute(sql)
conn.commit()
1.使用psycopg2建立数据库连接
2.from psycopg2.extras import Json用以转化字典为json存入数据库
3. {key:Json(value) if isinstance(value,dict) else value for key,value in caesar.items()} 将字典数据转为json
批量插入
#数据list
caesar = [{"name":"ri","info":{"age":20,"id":3}}]
#将list中字典中value为字典的转为json
caesar_list=[]
for ca in caesar:
caesar_list.append({key:Json(value) if isinstance(value,dict) else value for key,value in ca.items()})
sql = """insert into caesar (name, info) values (%(name)s,%(info)s);"""
#批量存入数据库
pipeline.executemany(sql,caesar_list)
数据查询
执行查询语句 select * from caesar_view where name='river';
rows = pipeline.fetchall() 返回数据元组列表,查询插入的json,返回后为字典