pipelinedb continuous view 操作

时间:2023-03-09 15:56:32
pipelinedb continuous view 操作

continuous view 是 pipelinedb的核心,类似一个view,但是数据是合并了stream以及table的数据输入数据,并且是
实时根据输入数据进行更新的

语法

CREATE CONTINUOUS VIEW name AS query

query是一个pg 的select 格式的语法,格式如下:

SELECT [ DISTINCT [ ON ( expression [, ...] ) ] ]
expression [ [ AS ] output_name ] [, ...]
[ FROM from_item [, ...] ]
[ WHERE condition ]
[ GROUP BY expression [, ...] ]
[ WINDOW window_name AS ( window_definition ) [, ...] ] where from_item can be one of: stream_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
table_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
from_item [ NATURAL ] join_type from_item [ ON join_condition ]

环境准备

项目使用docker运行同时结合hasura graphql 引擎

  • docker-compose
version: '3.6'
services:
postgres:
image: pipelinedb/pipelinedb
ports:
- "5432:5432"
graphql-engine:
image: hasura/graphql-engine:v1.0.0-alpha06
ports:
- "8080:8080"
depends_on:
- "postgres"
command: >
/bin/sh -c "
graphql-engine --database-url postgres://pipeline:pipeline@postgres:5432/pipeline serve --enable-console;
"

数据源从基本数据tabley以及stream 获取(比较综合的例子)

  • 创建基本表
CREATE TABLE userlogin (
id SERIAL PRIMARY KEY,
username text NOT NULL,
userid integer NOT NULL,
usertype text NOT NULL,
logintype text NOT NULL
); ```
* 创建stream:
```code
CREATE STREAM loginlogs (logintype text, userid integer);
  • 创建continuous view
CREATE CONTINUOUS VIEW userloginview AS 

select a.logintype,b.username,b.userid, b.logintype as logintype_ from loginlogs a join  userlogin b

on a.userid=b.userid
  • 插入数据&&查询
    pipelinedb continuous view 操作
insert into  loginlogs(logintype,userid) values ('mobile',333),('pc',333),('web',333)
select * from userloginview

pipelinedb continuous view 操作

  • graphql 集成
    pipelinedb continuous view 操作
  • graphql 查询
    pipelinedb continuous view 操作
  • 说明
    实际使用中我们的view一般都是一个聚合函数的操作,比如统计状态,异常信息排查,同时view 支持ttl 可以支持有效期控制

官方提供的一个比较有意思的demo

  • 延迟百分比 90 95 99 延迟占比
CREATE CONTINUOUS VIEW latency AS
SELECT percentile_cont(array[90, 95, 99]) WITHIN GROUP (ORDER BY latency)
FROM latency_stream;
  • 最新5分钟广告的曝光
CREATE CONTINUOUS VIEW imps AS
SELECT COUNT(*) FROM imps_stream
WHERE (arrival_timestamp > clock_timestamp() - interval '5 minutes');

参考资料

http://docs.pipelinedb.com/continuous-views.html