How can I add a header row to Dataflow output?

Time: 2022-09-17 11:42:41

I am producing an output CSV file with a Dataflow pipeline, and I'd like to write a header row and then append all of my output after it. How can I do that?

My flow roughly looks like this:

    PCollection<String> output = data.apply(ParDo.of(new DoFn<String, String>() {
        private static final long serialVersionUID = 0;

        @Override
        public void processElement(ProcessContext c) {
            // Produce CSV output
        }
    })).apply(TextIO.Write.named("WriteData").to(options.getOutput()));

Thanks!

2 answers

#1


2  

The proper way to do this is to use the Custom Sink API. Derive from FileBasedSink, FileBasedWriteOperation, and FileBasedWriter (for example, naming your classes CSVSink, CSVWriteOperation, and CSVWriter, respectively).

The only non-trivial logic will be in CSVWriter: write the header in its writeHeader() method and the CSV entries in write().
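
A rough, SDK-free illustration of what the CSVWriter callbacks would do for one output file: set up the channel, write the header once, then write one line per element. The class name CsvWriterSketch, the header "id,name", and the sample rows are hypothetical. The method names only mirror the writer callbacks; this class does not extend the real FileBasedWriter, and prepareWrite() is assumed to be the SDK's channel-setup hook, so check the exact signatures against the SDK version you use.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical, SDK-free sketch of the CSVWriter logic described above.
// The method names mirror the writer callbacks, but this class does NOT
// extend the Dataflow FileBasedWriter; check the real signatures in your SDK.
public class CsvWriterSketch {
    private final String header;         // hypothetical header, e.g. "id,name"
    private WritableByteChannel channel; // destination for one output file

    public CsvWriterSketch(String header) {
        this.header = header;
    }

    // Assumed counterpart of the SDK's channel-setup hook: keep the channel
    // that this writer's output file is written to.
    public void prepareWrite(WritableByteChannel channel) {
        this.channel = channel;
    }

    // Called once per output file, before any record is written.
    public void writeHeader() {
        writeLine(header);
    }

    // Called once per element; the element is an already-formatted CSV line.
    public void write(String csvLine) {
        writeLine(csvLine);
    }

    // Encode a line as UTF-8 plus a newline and write all of it to the channel.
    private void writeLine(String line) {
        ByteBuffer buf = ByteBuffer.wrap((line + "\n").getBytes(StandardCharsets.UTF_8));
        try {
            while (buf.hasRemaining()) {
                channel.write(buf);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        CsvWriterSketch writer = new CsvWriterSketch("id,name");
        writer.prepareWrite(Channels.newChannel(out));
        writer.writeHeader();
        for (String line : Arrays.asList("1,alice", "2,bob")) {
            writer.write(line);
        }
        // Prints the header line followed by the two records.
        System.out.print(new String(out.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

Because the header is written per writer, that is, per output file, a job that produces several output shards should be expected to start each shard with its own header line.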

Then use the sink in your pipeline via the Write.to() transform, in place of the TextIO.Write you are currently using.

A good example built into the SDK is the XML sink.

#2


0  

You can output the header from DoFn#startBundle().