Google Cloud DataFlow PubSubIO does not read the full topic

Time: 2022-02-18 14:37:14

I'm trying to run a pipeline in Google Cloud DataFlow in "Streaming" mode. The pipeline should read from a PubSub topic, but it doesn't actually read anything from the topic unless I delete the topic, re-create it, and re-publish all the messages to it AFTER the pipeline has started.


Is there any way to make the pipeline read already-published messages?


2 Answers

#1



Please create a custom subscription in Pub/Sub using the Cloud Console. Then, in the pipeline code, try something like this:


    PCollection<TableRow> datastream = p.apply(
        PubsubIO.Read.named("Read device iot data from PubSub")
            .subscription(String.format("projects/%s/subscriptions/%s", <ProjectId>, <Subscriptionname>))
            .timestampLabel("ts")
            .withCoder(TableRowJsonCoder.of()));
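
The snippet above uses the old Dataflow 1.x SDK API. On a current Apache Beam 2.x Java SDK, a roughly equivalent read looks like the sketch below; the subscription path and the "ts" timestamp attribute are placeholders carried over from the example above, not values your pipeline necessarily uses.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.StreamingOptions;
    import org.apache.beam.sdk.values.PCollection;

    public class ReadFromSubscription {
      public static void main(String[] args) {
        StreamingOptions options =
            PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);
        options.setStreaming(true);
        Pipeline p = Pipeline.create(options);

        // Read message payloads as strings from an existing subscription.
        // withTimestampAttribute("ts") mirrors timestampLabel("ts") above.
        // Note: PubsubIO.readStrings().fromTopic(...) also exists, but then the
        // subscription is only created when the pipeline starts, so messages
        // published before that point are never seen -- which is exactly the
        // behaviour described in the question.
        PCollection<String> messages =
            p.apply("Read device iot data from PubSub",
                PubsubIO.readStrings()
                    .fromSubscription("projects/<ProjectId>/subscriptions/<Subscriptionname>")
                    .withTimestampAttribute("ts"));

        p.run();
      }
    }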

Please note that when you configure the read, you can point it at either a topic or a subscription name.


In the code above I read from a subscription that I created explicitly in the Pub/Sub console. The advantage of an explicit subscription is that it retains the data published to the topic even while your Dataflow pipeline is offline, so no data is lost.

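For completeness, the subscription does not have to be created in the Cloud Console. A minimal sketch of creating a pull subscription with the google-cloud-pubsub Java client is shown below; the project, topic, and subscription names are placeholders, and the 10-second ack deadline is just an illustrative default.

    import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
    import com.google.pubsub.v1.PushConfig;

    public class CreateIotSubscription {
      public static void main(String[] args) throws Exception {
        // Placeholder resource names -- substitute your own project, topic and subscription.
        String subscription = "projects/<ProjectId>/subscriptions/<Subscriptionname>";
        String topic = "projects/<ProjectId>/topics/<TopicName>";

        try (SubscriptionAdminClient client = SubscriptionAdminClient.create()) {
          // A pull subscription (default PushConfig) with a 10-second ack deadline.
          // Messages published to the topic from this point on are retained on the
          // subscription until the Dataflow pipeline pulls and acknowledges them.
          client.createSubscription(subscription, topic, PushConfig.getDefaultInstance(), 10);
        }
      }
    }

Once the subscription exists, messages published before the pipeline starts are kept and delivered as soon as the pipeline begins pulling, which addresses the original problem of already-published messages being missed.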

#2



It sounds like supplying a Pub/Sub subscription (more details in the Pub/Sub I/O documentation) would solve your problem. Messages published after the subscription is created are buffered, so they can be read when the pipeline starts.

