
Time: 2021-11-12 15:21:52

I have multiple Cloud PubSub subscriptions whose names share a certain prefix, and I want to read from all of them with Apache Beam. I extended the PTransform class and implemented expand() to read from each subscription and apply a Flatten transform to the resulting PCollectionList (one PCollection per subscription). The problem is passing the subscription prefix into expand() as a ValueProvider: expand() is called at template creation time, not when the job is launched, so the value is not available yet. With a single subscription, however, I can pass the ValueProvider directly to PubsubIO.readStrings().fromSubscription().

Here's some sample code.


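import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

// PubsubHelper and LogHelper are custom helper classes (not shown).
public class MultiPubSubIO extends PTransform<PBegin, PCollection<PubsubMessage>> {

    private final ValueProvider<String> prefixPubsub;

    public MultiPubSubIO(@Nullable String name, ValueProvider<String> prefixPubsub) {
        super(name);
        this.prefixPubsub = prefixPubsub;
    }

    @Override
    public PCollection<PubsubMessage> expand(PBegin input) {
        List<String> myList = null;

        try {
            // prefixPubsub.get() throws here when building a template: expand() runs at
            // template creation time, but the prefix is only supplied when the job is launched.
            myList = PubsubHelper.getAllSubscription("projectID", prefixPubsub.get());
        } catch (Exception e) {
            LogHelper.error(String.format("Error getting list of subscription : %s", e.toString()));
        }

        if (myList == null || myList.isEmpty()) {
            // expand() must return a PCollection, so fail instead of returning null.
            throw new IllegalStateException(
                    String.format("No subscription with prefix %s found", prefixPubsub));
        }

        // One read per subscription (not per prefix); every step needs a unique name.
        List<PCollection<PubsubMessage>> collectionList = new ArrayList<>();
        for (int i = 0; i < myList.size(); i++) {
            collectionList.add(input.apply("ReadPubSub" + i,
                    PubsubIO.readMessagesWithAttributes().fromSubscription(myList.get(i))));
        }

        return PCollectionList.of(collectionList)
                .apply("FlattenPcollections", Flatten.pCollections());
    }

    public static MultiPubSubIO read(ValueProvider<String> prefixPubsub) {
        return new MultiPubSubIO(null, prefixPubsub);
    }
}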

So I'm wondering how to make this transform accept a ValueProvider the same way PubsubIO.read().fromSubscription() does. Or am I missing something?
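The question does not show PubsubHelper.getAllSubscription. Below is a hypothetical sketch of the prefix-matching part it presumably performs, assuming it receives full subscription paths of the form projects/{project}/subscriptions/{name} (fetching that list from the Pub/Sub admin API is omitted). SubscriptionPrefixFilter and matching are illustrative names, not part of Beam or the Pub/Sub client. Note that the sketch needs the prefix as a concrete String, which is exactly why expand() ends up calling prefixPubsub.get().

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical sketch of the prefix matching inside PubsubHelper.getAllSubscription. */
final class SubscriptionPrefixFilter {

    private SubscriptionPrefixFilter() {}

    /**
     * Keeps the full subscription paths ("projects/{project}/subscriptions/{name}")
     * that belong to projectId and whose short name starts with prefix.
     */
    static List<String> matching(List<String> fullPaths, String projectId, String prefix) {
        String base = "projects/" + projectId + "/subscriptions/";
        return fullPaths.stream()
                .filter(path -> path.startsWith(base))                          // right project only
                .filter(path -> path.substring(base.length()).startsWith(prefix)) // match short name
                .sorted() // deterministic order, so per-subscription step names stay stable
                .collect(Collectors.toList());
    }
}
```

Because the prefix is an ordinary String here, this only works when the prefix is known while the pipeline graph is being built.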


Links I have already looked at:


  • extract-value-from-valueprovider-in-apache-beam - the answer there talks about using a DoFn, while I need a PTransform that receives a PBegin.

1 solution



Unfortunately this is not possible currently:


  • It is not possible for the value of a ValueProvider to affect transform expansion - at expansion time, it is unknown; by the time it is known, the pipeline shape is already fixed.

  • There is currently no transform like PubsubIO.read() that can accept a PCollection of topic names. Eventually there will be (it is enabled by [Splittable DoFn](http://s.apache.org/splittable-do-fn)), but it will take a while - nobody is working on this currently.

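The first point can be illustrated with a toy model in plain Java. This is not Beam's ValueProvider implementation; LaunchTimeValue and expand are hypothetical names. The step list is built and frozen during construction, before the launch-time value exists, so supplying the value later cannot add read steps.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model (plain Java, not Beam): a value supplied at launch cannot change a graph built earlier. */
final class ExpansionTimeModel {

    private ExpansionTimeModel() {}

    /** Hypothetical stand-in for a runtime-only ValueProvider in a classic template. */
    static final class LaunchTimeValue {
        private String value; // stays null while the pipeline graph is constructed

        boolean isAccessible() {
            return value != null;
        }

        String get() {
            if (value == null) {
                throw new IllegalStateException("Value only available at runtime");
            }
            return value;
        }

        void supplyAtLaunch(String v) {
            value = v;
        }
    }

    /** "Expands" one read step per subscription known now, plus a flatten, then freezes the list. */
    static List<String> expand(List<String> subscriptions) {
        List<String> steps = new ArrayList<>();
        for (int i = 0; i < subscriptions.size(); i++) {
            steps.add("ReadPubSub" + i + " <- " + subscriptions.get(i));
        }
        steps.add("FlattenPcollections");
        return Collections.unmodifiableList(steps);
    }
}
```

In the model, nothing re-runs expand() once supplyAtLaunch is called, so the graph keeps whatever shape it had at construction time.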