apache beam动态目标自定义目标数据类型

时间:2021-12-01 14:18:45

I am using AvroIO.writeCustomTypeToGenericRecords to write messages based on the type of the event message. The DestinationT is a custom bean class that implements Serializable. I am getting the following error while running the code:

我正在使用AvroIO.writeCustomTypeToGenericRecords根据事件消息的类型编写消息。 DestinationT是自定义bean类,实现可序列化,我在运行代码时遇到以下错误:

java.lang.RuntimeException: org.apache.beam.sdk.coders.Coder$NonDeterministicException: org.apache.beam.sdk.coders.SerializableCoder@5d436f5b is not deterministic because: Java Serialization may be non-deterministic.

java.lang.RuntimeException:org.apache.beam.sdk.coders.Coder $ NonDeterministicException:org.apache.beam.sdk.coders.SerializableCoder@5d436f5b不确定,因为:Java序列化可能是不确定的。

Looks like I have to create coder for this custom bean class.

看起来我必须为这个自定义bean类创建编码器。

1 个解决方案

#1


0  

I was able to solve this problem by creating a bean class, for example:

我可以通过创建Bean类来解决这个问题

/**
 * Simple mutable bean with a single string property.
 *
 * <p>It keeps a public no-argument constructor and a getter/setter pair so that
 * reflection-based JSON mappers can serialize and re-create instances.
 * Equality and hashing are value-based (on {@code field1}) so that two beans
 * carrying the same data compare equal.
 */
public class TestBean {

  private String field1;

  /** Creates an empty bean; {@code field1} is {@code null} until set. */
  public TestBean() {
  }

  /**
   * Creates a bean with the given property value.
   *
   * @param field1 value of the {@code field1} property, may be {@code null}
   */
  public TestBean(String field1) {
    this.field1 = field1;
  }

  /**
   * @return the current value of {@code field1}, possibly {@code null}
   */
  public String getField1() {
    return field1;
  }

  /**
   * @param field1 new value of {@code field1}, may be {@code null}
   */
  public void setField1(String field1) {
    this.field1 = field1;
  }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (!(other instanceof TestBean)) {
      return false;
    }
    return java.util.Objects.equals(field1, ((TestBean) other).field1);
  }

  @Override
  public int hashCode() {
    return java.util.Objects.hashCode(field1);
  }

  @Override
  public String toString() {
    return "TestBean{field1=" + field1 + "}";
  }
}

Then I added a coder for it:

然后添加编码器

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.protobuf.ByteString;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.values.TypeDescriptor;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * this is coder for TestBean
 */
/**
 * Coder for {@link TestBean} that encodes a bean as its JSON representation,
 * written to the stream with {@link StringUtf8Coder}.
 *
 * <p>Obtain the shared instance through {@link #of()}; the constructor is private.
 */
public class TestBeanCoder
        extends AtomicCoder<TestBean> {

  /** The single shared instance returned by {@link #of()}. */
  private static final TestBeanCoder INSTANCE = new TestBeanCoder();

  private static final TypeDescriptor<TestBean> TYPE_DESCRIPTOR =
          new TypeDescriptor<TestBean>() {
          };

  /**
   * JSON mapper shared by all encode/decode calls. Kept static so it is built
   * once and is not part of the coder's own serialized state.
   */
  private static final ObjectMapper MAPPER =
          new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);

  /**
   * Returns the singleton coder. The static factory must be named {@code of()}
   * so that it can be discovered when the coder is registered by class.
   *
   * @return the shared {@code TestBeanCoder} instance
   */
  public static TestBeanCoder of() {
    return INSTANCE;
  }

  /**
   * Private constructor: instances are only handed out through {@link #of()}.
   */
  private TestBeanCoder() {
  }

  /**
   * Encodes the bean as a JSON string onto the given stream.
   *
   * @param value the bean to encode; must not be {@code null}
   * @param outStream the stream to write the encoded form to
   * @throws IOException if JSON serialization or the underlying write fails
   * @throws CoderException if {@code value} is {@code null}
   */
  @Override
  public void encode(TestBean value, OutputStream outStream)
          throws IOException, CoderException {
    if (value == null) {
      throw new CoderException("cannot encode a null TestBean");
    }
    String json = MAPPER.writeValueAsString(value);
    StringUtf8Coder.of().encode(json, outStream);
  }

  /**
   * Decodes a bean previously written by {@link #encode}.
   *
   * @param inStream the stream to read the JSON string from
   * @return the reconstructed bean
   * @throws IOException if reading or JSON parsing fails
   */
  @Override
  public TestBean decode(InputStream inStream) throws IOException {
    String json = StringUtf8Coder.of().decode(inStream);
    return MAPPER.readValue(json, TestBean.class);
  }

  /**
   * Declares this coder deterministic by not throwing.
   *
   * <p>NOTE(review): this assumes the JSON mapper always emits the bean's
   * properties in the same order - confirm if more fields are added.
   */
  @Override
  public void verifyDeterministic() {
    // Intentionally empty: not throwing marks the coder as deterministic.
  }

  /**
   * NOTE(review): returning {@code true} is only correct if
   * {@code TestBean.equals} compares field values - confirm against the bean.
   */
  @Override
  public boolean consistentWithEquals() {
    return true;
  }

  @Override
  public boolean isRegisterByteSizeObserverCheap(TestBean value) {
    return true;
  }

  @Override
  public TypeDescriptor<TestBean> getEncodedTypeDescriptor() {
    return TYPE_DESCRIPTOR;
  }
}

Register the coder with the pipeline:

用管道注册同一个编码器

// Build a provider that resolves TestBean to TestBeanCoder via its static of() factory,
// then register that provider with the pipeline's coder registry.
val coder = CoderProviders.fromStaticMethods(classOf[TestBean], classOf[TestBeanCoder])
pipeline.getCoderRegistry.registerCoderProvider(coder)

The same can be done with Scala case classes as well.

同样可以通过scala案例类来完成。

#1


0  

I was able to solve this problem by creating a bean class, for example:

我可以通过创建Bean类来解决这个问题

/**
 * Simple mutable bean with a single string property.
 *
 * <p>It keeps a public no-argument constructor and a getter/setter pair so that
 * reflection-based JSON mappers can serialize and re-create instances.
 * Equality and hashing are value-based (on {@code field1}) so that two beans
 * carrying the same data compare equal.
 */
public class TestBean {

  private String field1;

  /** Creates an empty bean; {@code field1} is {@code null} until set. */
  public TestBean() {
  }

  /**
   * Creates a bean with the given property value.
   *
   * @param field1 value of the {@code field1} property, may be {@code null}
   */
  public TestBean(String field1) {
    this.field1 = field1;
  }

  /**
   * @return the current value of {@code field1}, possibly {@code null}
   */
  public String getField1() {
    return field1;
  }

  /**
   * @param field1 new value of {@code field1}, may be {@code null}
   */
  public void setField1(String field1) {
    this.field1 = field1;
  }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (!(other instanceof TestBean)) {
      return false;
    }
    return java.util.Objects.equals(field1, ((TestBean) other).field1);
  }

  @Override
  public int hashCode() {
    return java.util.Objects.hashCode(field1);
  }

  @Override
  public String toString() {
    return "TestBean{field1=" + field1 + "}";
  }
}

Then I added a coder for it:

然后添加编码器

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.protobuf.ByteString;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.values.TypeDescriptor;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * this is coder for TestBean
 */
/**
 * Coder for {@link TestBean} that encodes a bean as its JSON representation,
 * written to the stream with {@link StringUtf8Coder}.
 *
 * <p>Obtain the shared instance through {@link #of()}; the constructor is private.
 */
public class TestBeanCoder
        extends AtomicCoder<TestBean> {

  /** The single shared instance returned by {@link #of()}. */
  private static final TestBeanCoder INSTANCE = new TestBeanCoder();

  private static final TypeDescriptor<TestBean> TYPE_DESCRIPTOR =
          new TypeDescriptor<TestBean>() {
          };

  /**
   * JSON mapper shared by all encode/decode calls. Kept static so it is built
   * once and is not part of the coder's own serialized state.
   */
  private static final ObjectMapper MAPPER =
          new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);

  /**
   * Returns the singleton coder. The static factory must be named {@code of()}
   * so that it can be discovered when the coder is registered by class.
   *
   * @return the shared {@code TestBeanCoder} instance
   */
  public static TestBeanCoder of() {
    return INSTANCE;
  }

  /**
   * Private constructor: instances are only handed out through {@link #of()}.
   */
  private TestBeanCoder() {
  }

  /**
   * Encodes the bean as a JSON string onto the given stream.
   *
   * @param value the bean to encode; must not be {@code null}
   * @param outStream the stream to write the encoded form to
   * @throws IOException if JSON serialization or the underlying write fails
   * @throws CoderException if {@code value} is {@code null}
   */
  @Override
  public void encode(TestBean value, OutputStream outStream)
          throws IOException, CoderException {
    if (value == null) {
      throw new CoderException("cannot encode a null TestBean");
    }
    String json = MAPPER.writeValueAsString(value);
    StringUtf8Coder.of().encode(json, outStream);
  }

  /**
   * Decodes a bean previously written by {@link #encode}.
   *
   * @param inStream the stream to read the JSON string from
   * @return the reconstructed bean
   * @throws IOException if reading or JSON parsing fails
   */
  @Override
  public TestBean decode(InputStream inStream) throws IOException {
    String json = StringUtf8Coder.of().decode(inStream);
    return MAPPER.readValue(json, TestBean.class);
  }

  /**
   * Declares this coder deterministic by not throwing.
   *
   * <p>NOTE(review): this assumes the JSON mapper always emits the bean's
   * properties in the same order - confirm if more fields are added.
   */
  @Override
  public void verifyDeterministic() {
    // Intentionally empty: not throwing marks the coder as deterministic.
  }

  /**
   * NOTE(review): returning {@code true} is only correct if
   * {@code TestBean.equals} compares field values - confirm against the bean.
   */
  @Override
  public boolean consistentWithEquals() {
    return true;
  }

  @Override
  public boolean isRegisterByteSizeObserverCheap(TestBean value) {
    return true;
  }

  @Override
  public TypeDescriptor<TestBean> getEncodedTypeDescriptor() {
    return TYPE_DESCRIPTOR;
  }
}

Register the coder with the pipeline:

用管道注册同一个编码器

// Build a provider that resolves TestBean to TestBeanCoder via its static of() factory,
// then register that provider with the pipeline's coder registry.
val coder = CoderProviders.fromStaticMethods(classOf[TestBean], classOf[TestBeanCoder])
pipeline.getCoderRegistry.registerCoderProvider(coder)

The same can be done with Scala case classes as well.

同样可以通过scala案例类来完成。