How to increase Dataflow read parallelism from Cassandra

Time: 2022-12-03 15:21:23

I am trying to export a lot of data (2 TB, ~30 billion rows) from Cassandra to BigQuery. All my infrastructure is on GCP. My Cassandra cluster has 4 nodes (4 vCPUs, 26 GB memory, 2000 GB PD (HDD) each). There is one seed node in the cluster. I need to transform my data before writing to BQ, so I am using Dataflow. The worker type is n1-highmem-2. Workers and Cassandra instances are in the same zone, europe-west1-c. My limits for Cassandra:

(screenshot: Cassandra rate limits)

The part of my pipeline code responsible for the read transform is shown below under "Pipeline read code".

Autoscaling

The problem is that when I don't set --numWorkers, autoscaling settles on a worker count like this (2 workers on average):

(screenshot: autoscaling worker count over time)

Load balancing

When I set --numWorkers=15, the read rate doesn't increase and only 2 workers communicate with Cassandra (I can tell from iftop, and only these workers have ~60% CPU load).

At the same time the Cassandra nodes don't have much load (CPU usage 20-30%). Network and disk usage on the seed node is about 2 times higher than on the other nodes, but not too high, I think:

(screenshots: seed node network and disk usage)

And for a non-seed node:

(screenshots: non-seed node network and disk usage)

Pipeline launch warnings

I get some warnings when the pipeline is launching:

WARNING: Size estimation of the source failed: 
org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@7569ea63
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /10.132.9.101:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.101:9042] Cannot connect), /10.132.9.102:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.102:9042] Cannot connect), /10.132.9.103:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.103:9042] Cannot connect), /10.132.9.104:9042 [only showing errors of first 3 hosts, use getErrors() for more details])

My Cassandra cluster is in a GCE local network, and it seems that some queries are made from my local machine and cannot reach the cluster (I am launching the pipeline with the Dataflow Eclipse plugin as described here). These queries are for size estimation of the tables. Can I specify the size estimate by hand, or launch the pipeline from a GCE instance? Or can I ignore these warnings? Do they affect the read rate?

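(For reference, a minimal sketch of how the launch from a GCE instance could look, assuming the standard DataflowPipelineOptions from the Beam Dataflow runner; the project and bucket names are placeholders, not real values:)

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class LaunchFromGce {
    public static void main(String[] args) {
        // Build Dataflow options in code so the job is submitted from a VM in
        // the same network as the Cassandra cluster, not from a local machine.
        // "my-project" and the temp bucket are placeholders.
        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation().as(DataflowPipelineOptions.class);
        options.setRunner(DataflowRunner.class);
        options.setProject("my-project");
        options.setTempLocation("gs://my-bucket/tmp");
        options.setNumWorkers(15);
        Pipeline p = Pipeline.create(options);
        // ... apply the CassandraIO read and transforms shown below, then run:
        p.run();
    }
}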

I've tried launching the pipeline from a GCE VM. There are no more connectivity problems. I don't have varchar columns in my tables, but I get warnings like this (no codec in the DataStax driver for [varchar <-> java.lang.Long]):

WARNING: Can't estimate the size
com.datastax.driver.core.exceptions.CodecNotFoundException: Codec not found for requested operation: [varchar <-> java.lang.Long]
        at com.datastax.driver.core.CodecRegistry.notFound(CodecRegistry.java:741)
        at com.datastax.driver.core.CodecRegistry.createCodec(CodecRegistry.java:588)
        at com.datastax.driver.core.CodecRegistry.access$500(CodecRegistry.java:137)
        at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:246)
        at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:232)
        at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
        at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
        at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
        at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
        at com.google.common.cache.LocalCache.get(LocalCache.java:4053)
        at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
        at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
        at com.datastax.driver.core.CodecRegistry.lookupCodec(CodecRegistry.java:522)
        at com.datastax.driver.core.CodecRegistry.codecFor(CodecRegistry.java:485)
        at com.datastax.driver.core.CodecRegistry.codecFor(CodecRegistry.java:467)
        at com.datastax.driver.core.AbstractGettableByIndexData.codecFor(AbstractGettableByIndexData.java:69)
        at com.datastax.driver.core.AbstractGettableByIndexData.getLong(AbstractGettableByIndexData.java:152)
        at com.datastax.driver.core.AbstractGettableData.getLong(AbstractGettableData.java:26)
        at com.datastax.driver.core.AbstractGettableData.getLong(AbstractGettableData.java:95)
        at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl.getTokenRanges(CassandraServiceImpl.java:279)
        at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl.getEstimatedSizeBytes(CassandraServiceImpl.java:135)
        at org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource.getEstimatedSizeBytes(CassandraIO.java:308)
        at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.startDynamicSplitThread(BoundedReadEvaluatorFactory.java:166)
        at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.processElement(BoundedReadEvaluatorFactory.java:142)
        at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:146)
        at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:110)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

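(Judging from the stack trace, the size estimation in CassandraServiceImpl.getTokenRanges reads the system.size_estimates table and calls getLong on its columns. The sketch below is my reading of what it roughly does, not the actual Beam source; the codec error would mean the driver reports those columns as varchar in my setup:)

// Rough sketch (my reading, not the actual Beam source) of the size estimation:
// sum partitions_count * mean_partition_size over system.size_estimates.
// session is a com.datastax.driver.core.Session connected to the cluster.
ResultSet rs = session.execute(
        "SELECT partitions_count, mean_partition_size FROM system.size_estimates"
                + " WHERE keyspace_name = ? AND table_name = ?",
        keyspaceName, tableName);
long estimatedSizeBytes = 0;
for (Row row : rs) {
    // getLong throws CodecNotFoundException if the driver maps the column as varchar
    estimatedSizeBytes += row.getLong("partitions_count") * row.getLong("mean_partition_size");
}
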
Pipeline read code

// Read data from Cassandra table
PCollection<Model> pcollection = p.apply(CassandraIO.<Model>read()
        .withHosts(Arrays.asList("10.10.10.101", "10.10.10.102", "10.10.10.103", "10.10.10.104")).withPort(9042)
        .withKeyspace(keyspaceName).withTable(tableName)
        .withEntity(Model.class).withCoder(SerializableCoder.of(Model.class))
        .withConsistencyLevel(CASSA_CONSISTENCY_LEVEL));

// Transform pcollection to KV PCollection by rowName
PCollection<KV<Long, Model>> pcollection_by_rowName = pcollection
        .apply(ParDo.of(new DoFn<Model, KV<Long, Model>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                c.output(KV.of(c.element().rowName, c.element()));
            }
        }));
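
(For context, downstream the pipeline writes to BigQuery roughly as in this sketch; toTableRow() and the table name are placeholders for illustration, not my actual code:)

// Sketch of the downstream write: convert each Model to a TableRow and append
// to an existing BigQuery table (toTableRow() is a hypothetical converter).
pcollection_by_rowName
        .apply(ParDo.of(new DoFn<KV<Long, Model>, TableRow>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                c.output(toTableRow(c.element().getValue()));
            }
        }))
        .apply(BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table")
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));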

Number of splits (Stackdriver log)

W  Number of splits is less than 0 (0), fallback to 1 
I  Number of splits is 1 
W  Number of splits is less than 0 (0), fallback to 1 
I  Number of splits is 1 
W  Number of splits is less than 0 (0), fallback to 1 
I  Number of splits is 1 

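(I think this fallback follows from CassandraIO's split arithmetic: numSplits is the integer division estimatedSizeBytes / desiredBundleSizeBytes, so when size estimation fails and returns 0, the division yields 0 and the source falls back to a single split, i.e. one bundle that only a single worker can read. A simplified sketch, based on my reading of the 2.2-era source:)

// Simplified sketch of the stock split computation (my reading of the source):
long numSplits = 1;
if (desiredBundleSizeBytes > 0) {
    numSplits = estimatedSizeBytes / desiredBundleSizeBytes; // 0 / x == 0 when estimation failed
}
if (numSplits <= 0) {
    numSplits = 1; // fallback: the whole table becomes a single bundle
}
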
What I've tried

No effect:

  1. set read consistency level to ONE
  2. nodetool setstreamthroughput 1000, nodetool setinterdcstreamthroughput 1000
  3. increase Cassandra read concurrency (in cassandra.yaml): concurrent_reads: 32
  4. set different numbers of workers (1-40)

Some effect:

  1. I've set numSplits = 10 as @jkff proposed. Now I can see in the logs:

I  Murmur3Partitioner detected, splitting 
W  Can't estimate the size 
W  Can't estimate the size 
W  Number of splits is less than 0 (0), fallback to 10 
I  Number of splits is 10 
W  Number of splits is less than 0 (0), fallback to 10 
I  Number of splits is 10 
I  Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@6d83ee93 produced 10 bundles with total serialized response size 20799 
I  Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@25d02f5c produced 10 bundles with total serialized response size 19359 
I  Splitting source [0, 1) produced 1 bundles with total serialized response size 1091 
I  Murmur3Partitioner detected, splitting 
W  Can't estimate the size 
I  Splitting source [0, 0) produced 0 bundles with total serialized response size 76 
W  Number of splits is less than 0 (0), fallback to 10 
I  Number of splits is 10 
I  Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@2661dcf3 produced 10 bundles with total serialized response size 18527 

But I've got another exception:

java.io.IOException: Failed to start reading from source: org.apache.beam.sdk.io.cassandra.Cassandra...
(5d6339652002918d): java.io.IOException: Failed to start reading from source: org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@5f18c296
    at com.google.cloud.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:582)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:347)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:183)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:148)
    at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:68)
    at com.google.cloud.dataflow.worker.DataflowWorker.executeWork(DataflowWorker.java:336)
    at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:294)
    at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:53 mismatched character 'p' expecting '$'
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:58)
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:24)
    at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:68)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:43)
    at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl$CassandraReaderImpl.start(CassandraServiceImpl.java:80)
    at com.google.cloud.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:579)
    ... 14 more
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:53 mismatched character 'p' expecting '$'
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:144)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:179)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:186)
    at com.datastax.driver.core.RequestHandler.access$2500(RequestHandler.java:50)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:817)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:651)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1077)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1000)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:642)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:565)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:479)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:441)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    ... 1 more

Maybe there is a mistake here: CassandraServiceImpl.java#L220. The SyntaxError above ("mismatched character 'p' expecting '$'") reads like the generated split query still contains the literal $pk placeholder, which the CQL parser rejects (after a '$' it expects a second '$').

And this statement looks like a typo: CassandraServiceImpl.java#L207

Changes I've made to the CassandraIO code

As @jkff proposed, I've changed CassandraIO the way I needed:

@VisibleForTesting
protected List<BoundedSource<T>> split(CassandraIO.Read<T> spec,
                                              long desiredBundleSizeBytes,
                                              long estimatedSizeBytes) {
  long numSplits = 1;
  List<BoundedSource<T>> sourceList = new ArrayList<>();
  if (desiredBundleSizeBytes > 0) {
    numSplits = estimatedSizeBytes / desiredBundleSizeBytes;
  }
  if (numSplits <= 0) {
    LOG.warn("Number of splits is less than 0 ({}), fallback to 10", numSplits);
    numSplits = 10;
  }

  LOG.info("Number of splits is {}", numSplits);

  Long startRange = MIN_TOKEN;
  Long endRange = MAX_TOKEN;
  Long startToken, endToken;

  // Partition key column per table; "$pk" is the stock placeholder and must
  // never reach Cassandra unsubstituted (see the SyntaxError above).
  String pk = "$pk";
  switch (spec.table()) {
  case "table1":
          pk = "table1_pk";
          break;
  case "table2":
  case "table3":
          pk = "table23_pk";
          break;
  }

  endToken = startRange;
  // Divide each endpoint separately to avoid overflowing the long token range.
  Long incrementValue = endRange / numSplits - startRange / numSplits;
  String splitQuery;
  if (numSplits == 1) {
    // we have a single split
    splitQuery = QueryBuilder.select().from(spec.keyspace(), spec.table()).toString();
    sourceList.add(new CassandraIO.CassandraSource<T>(spec, splitQuery));
  } else {
    // we have more than one split
    for (int i = 0; i < numSplits; i++) {
      startToken = endToken;
      endToken = startToken + incrementValue;
      Select.Where builder = QueryBuilder.select().from(spec.keyspace(), spec.table()).where();
      if (i > 0) {
        builder = builder.and(QueryBuilder.gte("token(" + pk + ")", startToken));
      }
      if (i < (numSplits - 1)) {
        builder = builder.and(QueryBuilder.lt("token(" + pk + ")", endToken));
      }
      sourceList.add(new CassandraIO.CassandraSource(spec, builder.toString()));
    }
  }
  return sourceList;
}

1 Answer

#1

I think this should be classified as a bug in CassandraIO. I filed BEAM-3424. You can try building your own version of Beam with that default of 1 changed to 100 or something like that, while this issue is being fixed.

I also filed BEAM-3425 for the bug during size estimation.

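(Note: newer Beam releases expose this directly on CassandraIO; if I recall correctly the method is withMinNumberOfSplits, so on a recent SDK something like the sketch below should avoid patching Beam. Treat the method name as an assumption and verify it against your SDK version.)

// Hedged sketch for newer Beam SDKs: force a minimum split count so a failed
// size estimation no longer collapses the read into a single bundle.
// withMinNumberOfSplits is assumed to exist in your Beam version; verify.
PCollection<Model> pcollection = p.apply(CassandraIO.<Model>read()
        .withHosts(hosts).withPort(9042)
        .withKeyspace(keyspaceName).withTable(tableName)
        .withMinNumberOfSplits(64) // e.g. a few splits per worker core
        .withEntity(Model.class).withCoder(SerializableCoder.of(Model.class)));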
