Google Cloud Dataflow BigQueryIO.Read NullPointerException

Time: 2021-12-22 14:28:55

I have a streaming job in which I listen to messages from PubSub and then read data from BigQuery. The data is queried using the values received from PubSub, which means I need to form the query dynamically and then pass it to the BigQueryIO.Read.fromQuery() function. Below is the code that is supposed to read data from BigQuery and return TableRows, but it throws a NullPointerException at the point where the read executes.

public class RequestDailyUsageTransform extends PTransform<PCollection<DailyUsageJob>, PCollection<TableRow>> {

    // Never assigned anywhere in this snippet, so this is still null
    // when fromQuery(mQuery) is evaluated at pipeline-construction time.
    private String mQuery;

    private String mForDate;
    private LocalDateTime billingDateTime;

    @Override
    public PCollection<TableRow> apply(PCollection<DailyUsageJob> input) {

        TableReference tableReference = getRequestTableReference();

        return input
                .apply(ParDo.named("RequestUsageQuery")
                        .of(new RequestUsageQueryStringDoFn()))
                .apply(BigQueryIO.Read.named("RequestUsageReader")
                        .fromQuery(mQuery)
                        .from(tableReference)
                        .withoutValidation())
                .apply(ParDo.named("DailyRequestMapper")
                        .of(new DailyRequestMapperDoFn()))
                .apply(ParDo.named("BillDailyRequestUsage")
                        .of(new DailyRequestsBillDoFn(mForDate, billingDateTime)));
    }
}

I also wanted to know how to pass a string that was generated in a DoFn to the BigQueryIO.Read.fromQuery() function.
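
A note on both points: fromQuery() is configured when the pipeline graph is constructed, not while the job runs, so a string produced inside a DoFn at runtime cannot be handed to it. And because mQuery above is never assigned, the read is configured with null, which is consistent with the NullPointerException. A minimal sketch of one way to avoid the null, assuming the query can be fixed at construction time (the constructor shown here is illustrative, not part of the original code):

public class RequestDailyUsageTransform
        extends PTransform<PCollection<DailyUsageJob>, PCollection<TableRow>> {

    private final String mQuery;

    // Hypothetical constructor: supply the query while the pipeline
    // graph is being built, so fromQuery(mQuery) never sees null.
    public RequestDailyUsageTransform(String query) {
        this.mQuery = query;
    }

    // apply(...) is unchanged from the snippet above.
}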

1 Answer

#1

I think in this case the best thing to do would be to run a daily batch job that queries all the data and is keyed by the userid. This will pull in more data than you would like, but it allows you to locate the per-user information. Unfortunately, there is currently no way to perform data-dependent reads.
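
A minimal sketch of that daily batch job, in the same Dataflow SDK 1.x style as the question (the query text, table name, and the userid field are assumptions for illustration):

Pipeline p = Pipeline.create(options);

p.apply(BigQueryIO.Read.named("ReadAllDailyUsage")
        // Illustrative legacy-SQL query: pull the whole day's data once,
        // rather than issuing one read per PubSub message.
        .fromQuery("SELECT userid, request_count FROM [project:dataset.daily_usage]"))
 .apply(ParDo.named("KeyByUserId")
         .of(new DoFn<TableRow, KV<String, TableRow>>() {
             @Override
             public void processElement(ProcessContext c) {
                 TableRow row = c.element();
                 // Key each row by userid so per-user records can be
                 // located downstream (grouped or joined as needed).
                 c.output(KV.of((String) row.get("userid"), row));
             }
         }));

p.run();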
