Write a specific PCollection to BigQuery

Date: 2022-11-23 15:36:11

Suppose I create two output PCollections using SideOutputs, and, depending on some condition, I want to write only one of them to BigQuery. How can I do this?

Basically, my use case is that I'm trying to make the choice between WRITE_APPEND and WRITE_TRUNCATE dynamic. I fetch the mode (append/truncate) from a config table that I maintain in BigQuery, so depending on what the config table contains, I must apply either truncate or append.

Using SideOutputs, I was able to create two PCollections (Append and Truncate), one of which will always be empty. The one that contains all the rows must be written to BigQuery. Is this approach correct?

The code that I'm using:

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;

    // `read` (PCollection<String>), `configView` (side-input view of the config table),
    // `schema` (TableSchema) and `LOG` are defined elsewhere in the pipeline.

    // Main output: rows to be written with WRITE_TRUNCATE.
    final TupleTag<TableRow> truncate = new TupleTag<TableRow>() {};
    // Additional output: rows to be written with WRITE_APPEND.
    final TupleTag<TableRow> append = new TupleTag<TableRow>() {};

    PCollectionTuple results = read.apply("convert to table row", ParDo.of(new DoFn<String, TableRow>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Mode ("truncate" or "append") read from the config side input.
            String value = c.sideInput(configView).get(0).toString();
            LOG.info("config: " + value);
            if (value.equals("truncate")) {
                LOG.info("outputting to truncate");
                // Main output, i.e. the `truncate` tag.
                c.output(new TableRow().set("color", c.element()));
            } else {
                LOG.info("outputting to append");
                c.output(append, new TableRow().set("color", c.element()));
            }
        }
    }).withSideInputs(configView).withOutputTags(truncate, TupleTagList.of(append)));

    results.get(truncate).apply("truncate", BigQueryIO.writeTableRows()
            .to("projectid:datasetid.tableid")
            .withSchema(schema)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

    results.get(append).apply("append", BigQueryIO.writeTableRows()
            .to("projectid:datasetid.tableid")
            .withSchema(schema)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

I need to perform only one of the two writes. If I include both, the table is going to get truncated anyway.

P.S. I'm using the Java SDK (Apache Beam 2.1).

1 answer

#1

I believe you are right that if your pipeline includes a write to a BigQuery table with WRITE_TRUNCATE at all, the table currently gets truncated even if there's no data. Feel free to file a JIRA issue requesting more configurable behavior in this case.
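
To make this concrete, here is a toy in-memory model in plain Java (not Beam code, and not how BigQueryIO is implemented) of what that behavior means for the two-branch pipeline in the question. It assumes, as stated above, that a truncating sink clears its table even when its input is empty. `ToyTable`, `ToySinkDemo`, and `runBothBranches` are hypothetical names; the model tries both execution orders because nothing in the pipeline orders the two independent writes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a BigQuery table: just a list of rows. */
class ToyTable {
    final List<String> rows = new ArrayList<>();
}

class ToySinkDemo {
    /** Models WRITE_TRUNCATE as described above: the table is cleared even if `input` is empty. */
    static void writeTruncate(ToyTable table, List<String> input) {
        table.rows.clear();
        table.rows.addAll(input);
    }

    /** Models WRITE_APPEND: existing rows are kept. */
    static void writeAppend(ToyTable table, List<String> input) {
        table.rows.addAll(input);
    }

    /** Runs both branches of the question's pipeline against one table, in the given order. */
    static List<String> runBothBranches(List<String> existing, List<String> newRows,
                                        String mode, boolean truncateFirst) {
        ToyTable table = new ToyTable();
        table.rows.addAll(existing);
        // Side-output routing: every row goes to exactly one branch; the other stays empty.
        boolean truncateMode = "truncate".equals(mode);
        List<String> truncateBranch = truncateMode ? newRows : new ArrayList<String>();
        List<String> appendBranch = truncateMode ? new ArrayList<String>() : newRows;
        if (truncateFirst) {
            writeTruncate(table, truncateBranch);
            writeAppend(table, appendBranch);
        } else {
            writeAppend(table, appendBranch);
            writeTruncate(table, truncateBranch);
        }
        return table.rows;
    }
}
```

With the config set to "append", an existing row such as "red" does not survive in either order: truncate-then-append leaves only the new rows, and append-then-truncate leaves an empty table. That is why splitting the rows with side outputs cannot, on its own, prevent the truncation.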

So if you want the table to conditionally not get truncated, you need to conditionally not include that write transform at all. Is there a way to push the condition to that level (that is, evaluate it while constructing the pipeline), or does the condition actually have to be computed from other data in the pipeline?
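
If the mode can be looked up before the pipeline is built (for example, by querying the config table from the driver program before the pipeline runs), the condition moves to construction time and only one write transform is ever added. The plain-Java sketch below covers just that decision step. `WriteMode` and `fromConfig` are hypothetical names; the constants deliberately mirror `BigQueryIO.Write.WriteDisposition.WRITE_APPEND` and `WRITE_TRUNCATE`, so real pipeline code could pass `WriteDisposition.valueOf(mode.name())` to a single `withWriteDisposition(...)` call.

```java
import java.util.Locale;

/** Hypothetical write mode; constant names match BigQueryIO.Write.WriteDisposition. */
enum WriteMode {
    WRITE_APPEND,
    WRITE_TRUNCATE;

    /**
     * Parses the value stored in the config table ("append" or "truncate").
     * Unknown values are rejected instead of silently falling back to append,
     * which is what the else branch in the question's DoFn would do.
     */
    static WriteMode fromConfig(String value) {
        if (value == null) {
            throw new IllegalArgumentException("config value is missing");
        }
        switch (value.trim().toLowerCase(Locale.ROOT)) {
            case "append":
                return WRITE_APPEND;
            case "truncate":
                return WRITE_TRUNCATE;
            default:
                throw new IllegalArgumentException("unknown write mode: " + value);
        }
    }
}
```

With the mode resolved up front, the pipeline needs only one `BigQueryIO.writeTableRows()` step carrying that disposition, so the side-output split is no longer needed.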

(The only workaround I can think of is to use DynamicDestinations to dynamically choose the name of the table to truncate, and truncate some other dummy empty table instead. I can elaborate on this after you answer the previous question.)
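
The answer does not spell this workaround out, so the following is only a guess at the routing rule it describes, written in plain Java: the truncating write always stays in the pipeline, but its destination is the real table only when the config says "truncate", and a dummy table otherwise. `DummyTruncateRouter`, `truncateDestination`, and the dummy table name are hypothetical. In Beam, this rule would live in a `DynamicDestinations` subclass; how that subclass reads the config value is not shown here.

```java
/** Hypothetical routing rule for the truncating write in the DynamicDestinations workaround. */
class DummyTruncateRouter {
    private final String realTable;
    private final String dummyTable;

    DummyTruncateRouter(String realTable, String dummyTable) {
        this.realTable = realTable;
        this.dummyTable = dummyTable;
    }

    /** Table that the WRITE_TRUNCATE sink should target for the given config mode. */
    String truncateDestination(String mode) {
        // Only a "truncate" config may clear the real table; any other mode
        // sends the truncation to a throwaway table instead.
        return "truncate".equals(mode) ? realTable : dummyTable;
    }
}
```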
