Using MySQL as an input source and writing to Google BigQuery

Date: 2021-09-18 14:35:22

I have an Apache Beam task that reads from a MySQL source using JDBC and should write the data as-is to a BigQuery table. No transformation is performed at this point (that will come later); for now I just want the database output written directly into BigQuery.

This is the main method trying to perform this operation:

    public static void main(String[] args) {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

        Pipeline p = Pipeline.create(options);

        // Build the table schema for the output table.
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("phone").setType("STRING"));
        fields.add(new TableFieldSchema().setName("url").setType("STRING"));
        TableSchema schema = new TableSchema().setFields(fields);

        p.apply(JdbcIO.<KV<String, String>>read()
         .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
             "com.mysql.jdbc.Driver", "jdbc:mysql://host:3306/db_name")
             .withUsername("user")
             .withPassword("pass"))
             .withQuery("SELECT phone_number, identity_profile_image FROM scraper_caller_identities LIMIT 100")
             .withRowMapper(new JdbcIO.RowMapper<KV<String, String>>() {
                public KV<String, String> mapRow(ResultSet resultSet) throws Exception {
                return KV.of(resultSet.getString(1), resultSet.getString(2));
             }
          })
         .apply(BigQueryIO.Write
            .to(options.getOutput())
            .withSchema(schema)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)));

        p.run();
    }

But when I execute the template using Maven, I get the following error:

    Test.java:[184,6] cannot find symbol
      symbol:   method apply(com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.Bound)
      location: class org.apache.beam.sdk.io.jdbc.JdbcIO.Read<com.google.cloud.dataflow.sdk.values.KV<java.lang.String,java.lang.String>>

It seems that I'm not passing BigQueryIO.Write the kind of collection it expects, and that's what I'm struggling with at the moment.

How can I make the data coming from MySQL meet BigQuery's expectations in this case?

1 answer

I think that you need to provide a PCollection<TableRow> to BigQueryIO.Write instead of the PCollection<KV<String,String>> type that the RowMapper is outputting.

Also, make sure you use column-name/value pairs when setting the TableRow. Note: I think your KVs hold the phone and URL values themselves (e.g. {"555-555-1234": "http://www.url.com"}), not column-name/value pairs (e.g. {"phone": "555-555-1234", "url": "http://www.url.com"}).
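
To make the column-name point concrete, here is a minimal, dependency-free sketch that assumes only the `phone` and `url` schema columns from the question. It uses a plain `java.util.Map` as a stand-in for BigQuery's `TableRow` (in the Google API client, `TableRow` is itself a map from column name to value); the `RowMapping` class and its methods are hypothetical illustrations, not part of Beam. In the real pipeline the equivalent change, assuming the Beam 2.x `JdbcIO` API, would be to read with `JdbcIO.<TableRow>read()` and return `new TableRow().set("phone", resultSet.getString(1)).set("url", resultSet.getString(2))` from `mapRow`; depending on the Beam version you may also need `.withCoder(TableRowJsonCoder.of())` on the read.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper modelling what the RowMapper should emit for each MySQL row.
// A Map<String, Object> stands in for BigQuery's TableRow (an assumption made to
// keep the sketch free of Beam/BigQuery dependencies).
final class RowMapping {

    // Column names from the BigQuery schema built in the question's main().
    static final Set<String> SCHEMA_COLUMNS = Set.of("phone", "url");

    // Shape of the original KV output: the two *values* form the pair,
    // so the phone number ends up as a key. BigQuery cannot map this to columns.
    static Map<String, Object> valuesOnly(String phone, String url) {
        Map<String, Object> pair = new LinkedHashMap<>();
        pair.put(phone, url);
        return pair;
    }

    // Shape BigQuery expects: each schema column name paired with its value.
    static Map<String, Object> toRow(String phone, String url) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("phone", phone);
        row.put("url", url);
        return row;
    }

    // True only if the row's keys are exactly the schema's column names.
    static boolean matchesSchema(Map<String, Object> row) {
        return row.keySet().equals(SCHEMA_COLUMNS);
    }

    public static void main(String[] args) {
        Map<String, Object> bad = valuesOnly("555-555-1234", "http://www.url.com");
        Map<String, Object> good = toRow("555-555-1234", "http://www.url.com");
        System.out.println(bad + " matches schema: " + matchesSchema(bad));
        // prints {555-555-1234=http://www.url.com} matches schema: false
        System.out.println(good + " matches schema: " + matchesSchema(good));
        // prints {phone=555-555-1234, url=http://www.url.com} matches schema: true
    }
}
```

Separately, the location in the compiler error (`JdbcIO.Read`) indicates that `.apply(BigQueryIO.Write...)` is being called on the `JdbcIO.Read` transform itself, because it sits inside the parentheses of `p.apply(...)`; the write has to be applied to the `PCollection` that `p.apply(...)` returns. The error also mixes `com.google.cloud.dataflow.sdk` classes with `org.apache.beam` ones, which suggests both the Dataflow SDK 1.x and Apache Beam are on the classpath; that mix is likely to cause further type mismatches.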

See the example here: https://beam.apache.org/documentation/sdks/javadoc/0.5.0/

Please give this a try and let me know whether it works for you. Hope this helps.