Streaming a BigQuery table into Google Pub/Sub

Time: 2021-12-22 14:29:25

I have a Google BigQuery table and I want to stream the entire table into a Pub/Sub topic.


What would be the easiest/fastest way to do it?


Thank you in advance,


2 solutions

#1


2  

That really depends on the size of the table.


If it's a small table (a few thousand records, a couple dozen columns), then you could set up a process to query the entire table, convert the response into a JSON array, and push it to Pub/Sub.

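For the small-table case, a rough sketch of that query-and-publish process with the google-cloud-bigquery and google-cloud-pubsub client libraries (all project, dataset, table, and topic names here are placeholders) might look like:

import json

from google.cloud import bigquery, pubsub_v1

# Placeholder names -- substitute your own project, dataset, table and topic.
bq = bigquery.Client(project="my-project")
publisher = pubsub_v1.PublisherClient()
topic = publisher.topic_path("my-project", "my-topic")

# Read the whole (small) table and publish each row as a JSON message.
futures = []
for row in bq.list_rows("my-project.my_dataset.my_table"):
    payload = json.dumps(dict(row), default=str).encode("utf-8")
    futures.append(publisher.publish(topic, payload))

# Wait for every publish to complete.
for future in futures:
    future.result()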

If it's a big table (millions/billions of records, hundreds of columns) you'd have to export it to a file, and then prepare/ship it to Pub/Sub.


It also depends on your partitioning policy - if your tables are set up to partition by date you might be able to, again, query instead of export.


Last but not least, it also depends on the frequency - is this a one-time deal (then export) or a continuous process (then use table decorators to query only the latest data)?

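To illustrate the table-decorator idea (legacy SQL only; the offset and names below are made up for the example), a query that reads only rows added in the last hour could look like:

from google.cloud import bigquery

bq = bigquery.Client(project="my-project")  # placeholder project

# The range decorator "@-3600000-" selects rows added in the last hour
# (times are in milliseconds, negative means relative to now);
# table decorators require legacy SQL.
job = bq.query(
    "SELECT * FROM [my-project:my_dataset.my_table@-3600000-]",
    job_config=bigquery.QueryJobConfig(use_legacy_sql=True),
)
rows = list(job.result())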

Need some more information if you want a truly helpful answer.


Edit


Based on your comments about the size of the table, I think the best way would be to have a script that would:


  1. Export the table to GCS as newline-delimited JSON


  2. Process the file (read it line by line) and send each line to Pub/Sub


There are client libraries for most programming languages. I've done similar things with Python, and it's fairly straightforward; a rough sketch is below.

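A rough sketch of those two steps with the Python client libraries (all project, dataset, table, bucket, and topic names below are placeholders, and each exported file is read fully into memory) could be:

from google.cloud import bigquery, pubsub_v1, storage

# Placeholders -- substitute your own project, dataset, table, bucket and topic.
project = "my-project"
bq = bigquery.Client(project=project)

# 1. Export the table to GCS as newline-delimited JSON.
extract_config = bigquery.ExtractJobConfig(
    destination_format=bigquery.DestinationFormat.NEWLINE_DELIMITED_JSON)
bq.extract_table(
    "my-project.my_dataset.my_table",
    "gs://my-bucket/export/table-*.json",
    job_config=extract_config,
).result()

# 2. Read the exported file(s) line by line and publish each line to Pub/Sub.
publisher = pubsub_v1.PublisherClient()
topic = publisher.topic_path(project, "my-topic")
gcs = storage.Client(project=project)
for blob in gcs.list_blobs("my-bucket", prefix="export/"):
    for line in blob.download_as_bytes().splitlines():
        publisher.publish(topic, line)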

#2


4  

The easiest way I know of is going through Google Cloud Dataflow, which natively knows how to access BigQuery and Pub/Sub.


In theory it should be as easy as the following Python lines:


p = beam.Pipeline(options=pipeline_options)
tablerows = p | 'read' >> beam.io.Read(
  beam.io.BigQuerySource('clouddataflow-readonly:samples.weather_stations'))
tablerows | 'write' >> beam.io.Write(
  beam.io.PubSubSink('projects/fh-dataflow/topics/bq2pubsub-topic'))

This combination of Python/Dataflow/BigQuery/PubSub doesn't work today (Python Dataflow is in beta, but keep an eye on the changelog).

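For what it's worth, today's Apache Beam Python SDK does expose both transforms; a rough sketch with the current names (beam.io.ReadFromBigQuery and beam.io.WriteToPubSub), where the topic is a placeholder, WriteToPubSub expects bytes, and the pipeline normally has to run in streaming mode:

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Sketch only -- the topic name is a placeholder, and ReadFromBigQuery
# may also need a GCS temp_location for its export step.
options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    (p
     | 'read' >> beam.io.ReadFromBigQuery(
           table='clouddataflow-readonly:samples.weather_stations')
     | 'to bytes' >> beam.Map(lambda row: json.dumps(row, default=str).encode('utf-8'))
     | 'write' >> beam.io.WriteToPubSub(topic='projects/my-project/topics/bq2pubsub-topic'))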

We can do the same with Java, and it works well - I just tested it. It runs both locally and in the hosted Dataflow runner:


Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

// Read all rows from the public weather_stations sample table.
PCollection<TableRow> weatherData = p.apply(
        BigQueryIO.Read.named("ReadWeatherStations").from("clouddataflow-readonly:samples.weather_stations"));
// Convert each TableRow to its string form and publish it to the Pub/Sub topic.
weatherData.apply(ParDo.named("tableRow2string").of(new DoFn<TableRow, String>() {
    @Override
    public void processElement(DoFn<TableRow, String>.ProcessContext c) throws Exception {
        c.output(c.element().toString());
    }
})).apply(PubsubIO.Write.named("WriteToPubsub").topic("projects/myproject/topics/bq2pubsub-topic"));

p.run();

Test if the messages are there with:


gcloud --project myproject beta pubsub subscriptions pull --auto-ack sub1
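
(This assumes a subscription named sub1 already exists on the topic; if it doesn't, one way to create it first would be, for example:)

gcloud --project myproject beta pubsub subscriptions create sub1 --topic bq2pubsub-topic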

Hosted Dataflow screenshot:
