Reading from multiple Pubsub subscriptions using ValueProvider

Time: 2021-11-12 15:21:52

I have multiple Cloud PubSub subscriptions, selected by a common name prefix, that I need to read with Apache Beam. I extended the PTransform class and implemented expand() to read from each subscription and apply a Flatten transform to the resulting PCollectionList (one PCollection per subscription). The problem is passing the subscription prefix into expand() as a ValueProvider: expand() is called when the template is created, not when the job is launched, so the value is not available yet. However, if I read only one subscription, I can pass a ValueProvider directly to PubsubIO.readStrings().fromSubscription().
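
For comparison, the single-subscription case that does work can be sketched as follows. This is a minimal sketch, not the asker's code: it assumes the Beam Java SDK (`beam-sdks-java-core` plus `beam-sdks-java-io-google-cloud-platform`) is on the classpath, and `SingleSubscriptionRead`, `ReadOptions` and `inputSubscription` are hypothetical names. The key point is that the `ValueProvider` is handed to `fromSubscription()` unchanged, so `get()` is never called while the pipeline graph is built.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.values.PCollection;

public class SingleSubscriptionRead {

    // Hypothetical options interface. A ValueProvider-typed option may be left
    // unset when a template is created and supplied when the job is launched.
    public interface ReadOptions extends PipelineOptions {
        @Description("Full path: projects/<project>/subscriptions/<name>")
        ValueProvider<String> getInputSubscription();

        void setInputSubscription(ValueProvider<String> value);
    }

    // Builds the graph without calling get() ourselves: if the value is not
    // available yet, PubsubIO keeps the ValueProvider and resolves it at run time.
    public static PCollection<PubsubMessage> readOne(Pipeline p, ReadOptions options) {
        return p.apply("ReadPubSub",
                PubsubIO.readMessagesWithAttributes()
                        .fromSubscription(options.getInputSubscription()));
    }

    public static void main(String[] args) {
        ReadOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(ReadOptions.class);
        Pipeline p = Pipeline.create(options);
        readOne(p, options);
        p.run();
    }
}
```

This works because the graph has exactly one read step regardless of the subscription's value; only the step's input is deferred.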

Here's some sample code.

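```java
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

// PubsubHelper and LogHelper are the asker's own utility classes (not shown).
public class MultiPubSubIO extends PTransform<PBegin, PCollection<PubsubMessage>> {

    private final ValueProvider<String> prefixPubsub;

    public MultiPubSubIO(@Nullable String name, ValueProvider<String> prefixPubsub) {
        super(name);
        this.prefixPubsub = prefixPubsub;
    }

    @Override
    public PCollection<PubsubMessage> expand(PBegin input) {
        List<String> myList = null;

        try {
            // Throws when creating a template: prefixPubsub is a runtime
            // parameter, so get() is not available while expand() runs.
            myList = PubsubHelper.getAllSubscription("projectID", prefixPubsub.get());
        } catch (Exception e) {
            LogHelper.error(String.format("Error getting list of subscription : %s", e.toString()));
        }

        List<PCollection<PubsubMessage>> collectionList = new ArrayList<>();

        if (myList != null && !myList.isEmpty()) {
            for (String subs : myList) {
                // Read each matched subscription (not the prefix); step names must be unique.
                PCollection<PubsubMessage> pCollection = input
                        .apply("ReadPubSub-" + subs, PubsubIO.readMessagesWithAttributes().fromSubscription(subs));
                collectionList.add(pCollection);
            }

            return PCollectionList.of(collectionList)
                    .apply("FlattenPcollections", Flatten.pCollections());
        } else {
            LogHelper.error(String.format("No subscription with prefix %s found", prefixPubsub));
            // expand() must not return null; fail pipeline construction instead.
            throw new IllegalStateException("No subscription found for prefix " + prefixPubsub);
        }
    }

    public static MultiPubSubIO read(ValueProvider<String> prefixPubsub) {
        return new MultiPubSubIO(null, prefixPubsub);
    }
}
```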

So I'm wondering how to read from a ValueProvider the same way PubsubIO.read().fromSubscription() does. Or am I missing something?

Links I've already checked:

  • extract-value-from-valueprovider-in-apache-beam: the answer there suggests using a DoFn, but I need a PTransform that takes a PBegin as input.

1 solution

#1

Unfortunately, this is currently not possible:

  • It is not possible for the value of a ValueProvider to affect transform expansion: at expansion time the value is unknown, and by the time it is known, the pipeline shape is already fixed.

  • There is currently no transform like PubsubIO.read() that can accept a PCollection of topic names. Eventually there will be (it is enabled by [Splittable DoFn](http://s.apache.org/splittable-do-fn)), but it will take a while; nobody is working on this currently.
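
The first point can be illustrated with a dependency-free toy model. This is not Beam code: `Deferred`, `Graph`, `addSingleRead` and `addReadPerMatch` are hypothetical stand-ins for a runtime `ValueProvider`, pipeline construction and transforms, assumed only to mirror the behaviour described above (a value that becomes readable only after the graph has been built).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class ExpansionVsRuntime {

    // Toy stand-in for a runtime ValueProvider: get() fails until launch time.
    public static final class Deferred<T> {
        private T value;
        private boolean resolved;

        public void resolve(T v) {
            value = v;
            resolved = true;
        }

        public T get() {
            if (!resolved) {
                throw new IllegalStateException("value only available at run time");
            }
            return value;
        }
    }

    // Toy pipeline graph: steps are recorded at construction time and only
    // executed by run(); once run() has started, no more steps can be added.
    public static final class Graph {
        private final List<Supplier<String>> steps = new ArrayList<>();
        private boolean frozen;

        public void add(Supplier<String> step) {
            if (frozen) {
                throw new IllegalStateException("graph shape is already fixed");
            }
            steps.add(step);
        }

        public List<String> run() {
            frozen = true;
            List<String> results = new ArrayList<>();
            for (Supplier<String> step : steps) {
                results.add(step.get());
            }
            return results;
        }
    }

    // Works: one step whose input is read lazily when the step executes
    // (analogous to passing a ValueProvider to fromSubscription()).
    public static void addSingleRead(Graph graph, Deferred<String> subscription) {
        graph.add(() -> "read " + subscription.get());
    }

    // Fails if called before the value is resolved: the number of steps depends
    // on the deferred value (analogous to calling get() inside expand()).
    public static void addReadPerMatch(Graph graph, Deferred<String> prefix, List<String> all) {
        for (String name : all) {
            if (name.startsWith(prefix.get())) {
                graph.add(() -> "read " + name);
            }
        }
    }
}
```

In this toy, resolving the prefix first would let `addReadPerMatch` succeed, but that corresponds to knowing the prefix while the pipeline is constructed, which a runtime `ValueProvider` does not allow.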
