是否有可能以线程安全的方式安全地递增BigInteger,比如使用AtomicReference、w/o锁定?

时间:2022-06-27 16:52:09

A lot of our code is legacy but we are moving to a "Big Data" back-end and I'm trying to evangelize the newer API calls, encourage the use of the latest Spring libraries etc. One of our problems is application layer ID generation. For reasons I don't understand, a higher authority wants sequential BigInteger's. I would have made them random with re-generate and re-try on failed insertions but I done got vetoed.

我们的许多代码都是遗留的,但是我们正在转向“大数据”后端,我正在尝试推广更新的API调用,鼓励使用最新的Spring库等等。我们的问题之一是应用层ID生成。出于我不理解的原因,更高的权限需要顺序大整数。我本来可以让它们随机地重新生成并重试失败的插入,但我做了,被否决了。

Grumbling aside, I'm in a position where I need to increment and get a BigInteger across threads and do it in a safe and performant manner. I've never used AtomicReference before but it looks pretty close to perfect for this application. Right now we have a synchronized code block which hurts our performance pretty badly.

除了抱怨之外,我还需要在线程之间增加和获得一个BigInteger,并以安全、高效的方式进行。我以前从未使用过AtomicReference,但是它看起来非常适合这个应用程序。现在我们有了一个同步的代码块,这会严重影响我们的性能。

Is this the right way to go? Syntax examples?

这条路对吗?语法的例子吗?

I should mention that the way this module works, it hits the database using a Stored Procedure to grab a range of values to use. Tens of thousands at a time so that it only happens maybe once in 20 minutes. This keeps the various servers from stepping on each-other but it also adds the wrinkle of having to set the BigInteger to an arbitrarily subsequent value. Of course, that needs to be thread safe also.

我应该提到这个模块的工作方式,它使用存储过程访问数据库,以获取使用的一系列值。成千上万的人每次只会在20分钟内发生一次。这样可以防止各种服务器之间的交互,但同时也增加了将BigInteger设置为任意后续值的问题。当然,这也需要线程安全。

P.S. I still think my random generation idea is better than handling all this threading stuff. A BigInteger is a ridiculously large number and the odds of ever generating the same one twice have to be close to nil.

另外,我仍然认为我随机生成的想法比处理所有这些线程的东西要好。BigInteger是一个非常大的数字,并且两次生成相同的数字的几率必须接近nil。

3 个解决方案

#1


12  

It is possible using AtomicReference here's a quick draft :

可以使用AtomicReference:

public final class AtomicBigInteger {

    private final AtomicReference<BigInteger> valueHolder = new AtomicReference<>();

    public AtomicBigInteger(BigInteger bigInteger) {
        valueHolder.set(bigInteger);
    }

    public BigInteger incrementAndGet() {
        for (; ; ) {
            BigInteger current = valueHolder.get();
            BigInteger next = current.add(BigInteger.ONE);
            if (valueHolder.compareAndSet(current, next)) {
                return next;
            }
        }
    }
}

It is basically a copy of the AtomicLong code for incrementAndGet()

它基本上是AtomicLong代码的一个副本,用于增量和get ()

#2


4  

This becomes more manageable and easier to understand using the accumulateAndGet or getAndAccumulate introduced in Java 8. These allow you to atomically update the value by supplying an accumulator function that sets the value to the result of the function, and also either returns the previous or calculated result depending on what you need. Here is an example of what that class might look like, followed by a simple example I wrote up that uses it:

使用Java 8中引入的accumulateAndGet或getandaccumul,这会变得更加易于管理和易于理解。通过提供一个将值设置为函数结果的累加器函数,并根据需要返回先前的或计算的结果,这些函数允许您自动更新值。下面是这个类的示例,后面是我编写的一个使用它的简单示例:

import java.math.BigInteger;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

public final class AtomicBigInteger {

  private final AtomicReference<BigInteger> bigInteger;

  public AtomicBigInteger(final BigInteger bigInteger) {
    this.bigInteger = new AtomicReference<>(Objects.requireNonNull(bigInteger));
  }

  // Method references left out for demonstration purposes
  public BigInteger incrementAndGet() {
    return bigInteger.accumulateAndGet(BigInteger.ONE, (previous, x) -> previous.add(x));
  }

  public BigInteger getAndIncrement() {
    return bigInteger.getAndAccumulate(BigInteger.ONE, (previous, x) -> previous.add(x));
  }

  public BigInteger get() {
    return bigInteger.get();
  }
}

An example using it:

一个例子使用它:

import java.math.BigInteger;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ABIExample {

  private static final int AVAILABLE_PROCS = Runtime.getRuntime().availableProcessors();
  private static final int INCREMENT_AMOUNT = 2_500_000;
  private static final int TASK_AMOUNT = AVAILABLE_PROCS * 2;
  private static final BigInteger EXPECTED_VALUE = BigInteger.valueOf(INCREMENT_AMOUNT)
                                                             .multiply(BigInteger
                                                                           .valueOf(TASK_AMOUNT));

  public static void main(String[] args)
      throws InterruptedException, ExecutionException {
    System.out.println("Available processors: " + AVAILABLE_PROCS);


    final ExecutorService executorService = Executors
        .newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    final AtomicBigInteger atomicBigInteger = new AtomicBigInteger(BigInteger.ZERO);

    final List<Callable<Void>> incrementTasks =  IntStream.rangeClosed(1, TASK_AMOUNT)
             .mapToObj(i -> incrementTask(i, atomicBigInteger))
             .collect(Collectors.toList());
    final List<Future<Void>> futures = executorService.invokeAll(incrementTasks);
    for (Future<Void> future : futures) {
      future.get();
    }
    executorService.shutdown();
    executorService.awaitTermination(30, TimeUnit.SECONDS);
    System.out.println("Final value: " + atomicBigInteger.get());
    final boolean areEqual = EXPECTED_VALUE.equals(atomicBigInteger.get());
    System.out.println("Does final value equal expected? - " + areEqual);
  }

  private static Callable<Void> incrementTask(
      final int taskNumber,
      final AtomicBigInteger atomicBigInteger
  ) {
    return () -> {
      for (int increment = 0; increment < INCREMENT_AMOUNT; increment++) {
        atomicBigInteger.incrementAndGet();
      }
      System.out.println("Task #" + taskNumber + " Completed");
      return null;
    };

  }
}

And an output from running the example on my machine:

在我的机器上运行这个例子的输出:

Available processors: 8
Task #3 Completed
Task #8 Completed
Task #7 Completed
Task #6 Completed
Task #5 Completed
Task #2 Completed
Task #4 Completed
Task #1 Completed
Task #9 Completed
Task #10 Completed
Task #11 Completed
Task #13 Completed
Task #16 Completed
Task #12 Completed
Task #14 Completed
Task #15 Completed
Final value: 80000000
Does final value equal expected? - true

#3


0  

This basically tries over and over until the operation was atomic.

这基本上是反复尝试,直到操作是原子的。

Personally I don't like this code as theoretically it could lead to thread starvation (although the person who showed it to me claims it never happens)

就我个人而言,我不喜欢这段代码,因为从理论上讲,它可能会导致线程不足(尽管向我展示它的人声称它从未发生过)

private AtomicReference<BigInteger> ref = new AtomicReference<BigInteger>(BigInteger.ZERO);

public BigInteger incrementAndGet() {
    BigInteger currVal, newVal;
    do {
        currVal = ref.get();
        newVal = currVal.clone();
        newVal.add(BigInteger.ONE);
    } while (!ref.compareAndSet(currVal, newVal));

}

I would go with AtomicLong if possible.

如果可能的话,我会选择原子龙。

#1


12  

It is possible using AtomicReference here's a quick draft :

可以使用AtomicReference:

public final class AtomicBigInteger {

    private final AtomicReference<BigInteger> valueHolder = new AtomicReference<>();

    public AtomicBigInteger(BigInteger bigInteger) {
        valueHolder.set(bigInteger);
    }

    public BigInteger incrementAndGet() {
        for (; ; ) {
            BigInteger current = valueHolder.get();
            BigInteger next = current.add(BigInteger.ONE);
            if (valueHolder.compareAndSet(current, next)) {
                return next;
            }
        }
    }
}

It is basically a copy of the AtomicLong code for incrementAndGet()

它基本上是AtomicLong代码的一个副本,用于增量和get ()

#2


4  

This becomes more manageable and easier to understand using the accumulateAndGet or getAndAccumulate introduced in Java 8. These allow you to atomically update the value by supplying an accumulator function that sets the value to the result of the function, and also either returns the previous or calculated result depending on what you need. Here is an example of what that class might look like, followed by a simple example I wrote up that uses it:

使用Java 8中引入的accumulateAndGet或getandaccumul,这会变得更加易于管理和易于理解。通过提供一个将值设置为函数结果的累加器函数,并根据需要返回先前的或计算的结果,这些函数允许您自动更新值。下面是这个类的示例,后面是我编写的一个使用它的简单示例:

import java.math.BigInteger;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

public final class AtomicBigInteger {

  private final AtomicReference<BigInteger> bigInteger;

  public AtomicBigInteger(final BigInteger bigInteger) {
    this.bigInteger = new AtomicReference<>(Objects.requireNonNull(bigInteger));
  }

  // Method references left out for demonstration purposes
  public BigInteger incrementAndGet() {
    return bigInteger.accumulateAndGet(BigInteger.ONE, (previous, x) -> previous.add(x));
  }

  public BigInteger getAndIncrement() {
    return bigInteger.getAndAccumulate(BigInteger.ONE, (previous, x) -> previous.add(x));
  }

  public BigInteger get() {
    return bigInteger.get();
  }
}

An example using it:

一个例子使用它:

import java.math.BigInteger;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ABIExample {

  private static final int AVAILABLE_PROCS = Runtime.getRuntime().availableProcessors();
  private static final int INCREMENT_AMOUNT = 2_500_000;
  private static final int TASK_AMOUNT = AVAILABLE_PROCS * 2;
  private static final BigInteger EXPECTED_VALUE = BigInteger.valueOf(INCREMENT_AMOUNT)
                                                             .multiply(BigInteger
                                                                           .valueOf(TASK_AMOUNT));

  public static void main(String[] args)
      throws InterruptedException, ExecutionException {
    System.out.println("Available processors: " + AVAILABLE_PROCS);


    final ExecutorService executorService = Executors
        .newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    final AtomicBigInteger atomicBigInteger = new AtomicBigInteger(BigInteger.ZERO);

    final List<Callable<Void>> incrementTasks =  IntStream.rangeClosed(1, TASK_AMOUNT)
             .mapToObj(i -> incrementTask(i, atomicBigInteger))
             .collect(Collectors.toList());
    final List<Future<Void>> futures = executorService.invokeAll(incrementTasks);
    for (Future<Void> future : futures) {
      future.get();
    }
    executorService.shutdown();
    executorService.awaitTermination(30, TimeUnit.SECONDS);
    System.out.println("Final value: " + atomicBigInteger.get());
    final boolean areEqual = EXPECTED_VALUE.equals(atomicBigInteger.get());
    System.out.println("Does final value equal expected? - " + areEqual);
  }

  private static Callable<Void> incrementTask(
      final int taskNumber,
      final AtomicBigInteger atomicBigInteger
  ) {
    return () -> {
      for (int increment = 0; increment < INCREMENT_AMOUNT; increment++) {
        atomicBigInteger.incrementAndGet();
      }
      System.out.println("Task #" + taskNumber + " Completed");
      return null;
    };

  }
}

And an output from running the example on my machine:

在我的机器上运行这个例子的输出:

Available processors: 8
Task #3 Completed
Task #8 Completed
Task #7 Completed
Task #6 Completed
Task #5 Completed
Task #2 Completed
Task #4 Completed
Task #1 Completed
Task #9 Completed
Task #10 Completed
Task #11 Completed
Task #13 Completed
Task #16 Completed
Task #12 Completed
Task #14 Completed
Task #15 Completed
Final value: 80000000
Does final value equal expected? - true

#3


0  

This basically tries over and over until the operation was atomic.

这基本上是反复尝试,直到操作是原子的。

Personally I don't like this code as theoretically it could lead to thread starvation (although the person who showed it to me claims it never happens)

就我个人而言,我不喜欢这段代码,因为从理论上讲,它可能会导致线程不足(尽管向我展示它的人声称它从未发生过)

private AtomicReference<BigInteger> ref = new AtomicReference<BigInteger>(BigInteger.ZERO);

public BigInteger incrementAndGet() {
    BigInteger currVal, newVal;
    do {
        currVal = ref.get();
        newVal = currVal.clone();
        newVal.add(BigInteger.ONE);
    } while (!ref.compareAndSet(currVal, newVal));

}

I would go with AtomicLong if possible.

如果可能的话,我会选择原子龙。