Writing to Cloud SQL using the Dataflow JdbcIO API

Date: 2022-01-30 15:33:40

I have a requirement to write a PCollection of Strings to Cloud SQL using the Cloud Dataflow API.

    pipeline.apply(TextIO.read().from("gs://***/sampleBigtable.csv"))
        .apply(JdbcIO.write()
            .withDataSourceConfiguration(DataSourceConfiguration
                .create("org.postgresql.Driver", "jdbc:postgresql://***:5432/test")
                .withUsername("**").withPassword("password10"))
            .withStatement("insert into person values(?,?)")
            .withPreparedStatementSetter(
                new JdbcIO.PreparedStatementSetter<Object>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void setParameters(Object arg0, PreparedStatement query)
                            throws Exception {
                        // TODO Auto-generated method stub
                        query.setString(1, "Hello");
                        query.setString(2, "Hi");
                    }
                }));

This is the sample code I am trying; it is a very simplified version of what I want to do.

Also, is it feasible to write to Cloud SQL from Dataflow using a ParDo that executes simple insert statements?

1 solution

#1


The previous transform outputs a PCollection<String>, so you need to specify String as the input type when calling JdbcIO.write(), that is, JdbcIO.<String>write().

Something like this:

    pipeline
        .apply(TextIO.read().from("gs://***/sampleBigtable.csv"))
        .apply(JdbcIO.<String>write()
            .withDataSourceConfiguration(
                DataSourceConfiguration.create("org.postgresql.Driver", "jdbc:postgresql://***:5432/test")
                    .withUsername("**")
                    .withPassword("password10"))
            .withStatement("insert into person values(?,?)")
            .withPreparedStatementSetter((element, query) -> {
                query.setInt(1, 1);
                query.setString(2, "Hello");
            }));
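
The setter above binds constants and never reads element, so every input line would insert the same row. Below is a minimal sketch of binding the values from each CSV line instead. It assumes (the question does not say this) that every line has the form id,name and that the person table has an integer column followed by a text column. PersonRowBinder is a hypothetical helper name, and it depends only on java.sql, so it can be checked without running a pipeline.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;

/** Hypothetical helper: binds one "id,name" CSV line to "insert into person values(?,?)". */
final class PersonRowBinder {
    private PersonRowBinder() {}

    /**
     * Splits the line on its first comma and binds both fields.
     * Assumption: field 1 is an integer id, field 2 is a text name.
     */
    static void bind(String line, PreparedStatement query) throws SQLException {
        String[] fields = line.split(",", 2);
        if (fields.length != 2) {
            throw new IllegalArgumentException("Expected 2 comma-separated fields: " + line);
        }
        // JDBC parameter indexes start at 1 and follow the order of the ? placeholders.
        query.setInt(1, Integer.parseInt(fields[0].trim()));
        query.setString(2, fields[1].trim());
    }
}
```

If those assumptions hold, the method has the same (element, query) shape as the lambda above, so it could presumably be passed as .withPreparedStatementSetter(PersonRowBinder::bind).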
