CAS原子操作实现无锁及性能分析

时间:2023-01-03 12:15:04

原文地址:http://blog.csdn.net/chen19870707/article/details/41083183

CAS原子操作实现无锁及性能分析

 

  • Author:Echo Chen(陈斌)

  • Email:chenb19870707@gmail.com

  • Blog:Blog.csdn.net/chen19870707

  • Date:Nov 13th, 2014

    最近在研究nginx的自旋锁的时候,又见到了GCC CAS原子操作,于是决定动手分析下CAS实现的无锁到底性能如何,网上关于CAS实现无锁的文章很多,但少有研究这种无锁的性能提升的文章,这里就以实验结果和我自己的理解逐步展开。

  • 1.什么是CAS原子操作

    在研究无锁之前,我们需要首先了解一下CAS原子操作——Compare & Set,或是 Compare & Swap,现在几乎所CAS原子操作实现无锁及性能分析有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令。

    大家应该还记得操作系统里面关于“原子操作”的概念,一个操作是原子的(atomic),如果这个操作所处的层(layer)的更高层不能发现其内部实现与结构。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。有了这个原子操作这个保证我们就可以实现无锁了。

    CAS原子操作在*中的代码描述如下:

       1: int compare_and_swap(int* reg, int oldval, int newval)
       2: {
       3:   ATOMIC();
       4:   int old_reg_val = *reg;
       5:   if (old_reg_val == oldval)
       6:      *reg = newval;
       7:   END_ATOMIC();
       8:   return old_reg_val;
       9: }
  • 也就是检查内存*reg里的值是不是oldval,如果是的话,则对其赋值newval。上面的代码总是返回old_reg_value,调用者如果需要知道是否更新成功还需要做进一步判断,为了方便,它可以变种为直接返回是否更新成功,如下:

  •    1: bool compare_and_swap (int *accum, int *dest, int newval)
       2: {
       3:   if ( *accum == *dest ) {
       4:       *dest = newval;
       5:       return true;
       6:   }
       7:   return false;
       8: }

    除了CAS还有以下原子操作:

    • Fetch And Add,一般用来对变量做 +1 的原子操作。
         1: << atomic >>
         2: function FetchAndAdd(address location, int inc) {
         3:     int value := *location
         4:     *location := value + inc
         5:     return value
         6: }
       Test-and-set,写值到某个内存位置并传回其旧值。汇编指令BST。
         1: #define LOCKED 1
         2:  
         3: int TestAndSet(int* lockPtr) {
         4:     int oldValue;
         5:  
         6:     // Start of atomic segment
         7:     // The following statements should be interpreted as pseudocode for
         8:     // illustrative purposes only.
         9:     // Traditional compilation of this code will not guarantee atomicity, the
        10:     // use of shared memory (i.e. not-cached values), protection from compiler
        11:     // optimization, or other required properties.
        12:     oldValue = *lockPtr;
        13:     *lockPtr = LOCKED;
        14:     // End of atomic segment
        15:  
        16:     return oldValue;
        17: }
       
    • Test and Test-and-set,用来实现多核环境下互斥锁,
         1: boolean locked := false // shared lock variable
         2: procedure EnterCritical() {
         3:   do {
         4:     while (locked == true) skip // spin until lock seems free
         5:   } while TestAndSet(locked) // actual atomic locking
         6: }

    2.CAS 在各个平台下的实现

     

    2.1 Linux GCC 支持的 CAS

    GCC4.1+版本中支持CAS的原子操作(完整的原子操作可参看 GCC Atomic Builtins

       1: bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
       2: type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)

    2.2  Windows支持的CAS

    在Windows下,你可以使用下面的Windows API来完成CAS:(完整的Windows原子操作可参看MSDN的InterLocked Functions

  •    1: InterlockedCompareExchange ( __inout LONG volatile *Target,
       2:                                 __in LONG Exchange,
       3:                                 __in LONG Comperand);

    2.3  C++ 11支持的CAS

    C++11中的STL中的atomic类的函数可以让你跨平台。(完整的C++11的原子操作可参看 Atomic Operation Library

       1: template< class T >
       2: bool atomic_compare_exchange_weak( std::atomic<T>* obj,
       3:                                    T* expected, T desired );
       4: template< class T >
       5: bool atomic_compare_exchange_weak( volatile std::atomic<T>* obj,
       6:                                    T* expected, T desired );
     
  • 3.CAS原子操作实现无锁的性能分析

     

    • CAS原子操作实现无锁及性能分析3.1测试方法描述

     
             这里由于只是比较性能,所以采用很简单的方式,创建10个线程并发执行,每个线程中循环对全局变量count进行++操作(i++),循环加2000000次,这必然会涉及到并发互斥操作,在同一台机器上分析 加普通互斥锁、CAS实现的无锁、Fetch And Add实现的无锁消耗的时间,然后进行分析。

     

    3.2 加普通互斥锁代码

       1: #include <stdio.h>
       2: #include <stdlib.h>
       3: #include <pthread.h>
       4: #include <time.h>
       5: #include "timer.h"
       6:  
       7: pthread_mutex_t mutex_lock;
       8: static volatile int count = 0;
       9: void *test_func(void *arg)
      10: {
      11:         int i = 0;
      12:         for(i = 0; i < 2000000; i++)
      13:         {
      14:                 pthread_mutex_lock(&mutex_lock);
      15:                 count++;
      16:                 pthread_mutex_unlock(&mutex_lock);
      17:         }
      18:         return NULL;
      19: }
      20:  
      21: int main(int argc, const char *argv[])
      22: {
      23:     Timer timer; // 为了计时,临时封装的一个类Timer。
      24:     timer.Start();    // 计时开始
      25:     pthread_mutex_init(&mutex_lock, NULL);
      26:     pthread_t thread_ids[10];
      27:     int i = 0;
      28:     for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
      29:     {
      30:         pthread_create(&thread_ids[i], NULL, test_func, NULL);
      31:     }
      32:  
      33:     for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
      34:     {
      35:         pthread_join(thread_ids[i], NULL);
      36:     }
      37:  
      38:     timer.Stop();// 计时结束
      39:     timer.Cost_time();// 打印花费时间
      40:     printf("结果:count = %d\n",count);
      41:  
      42:     return 0;
      43: }

    注:Timer类仅作统计时间用,其实现在文章最后给出。

     

    3.2 CAS实现的无锁

     

       1: #include <stdio.h>
       2: #include <stdlib.h>
       3: #include <pthread.h>
       4: #include <unistd.h>
       5: #include <time.h>
       6: #include "timer.h"
       7:  
       8: int mutex = 0;
       9: int lock = 0;
      10: int unlock = 1;
      11:  
      12: static volatile int count = 0;
      13: void *test_func(void *arg)
      14: {
      15:         int i = 0;
      16:         for(i = 0; i < 2000000; i++)
      17:     {
      18:         while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100000);
      19:          count++;
      20:          __sync_bool_compare_and_swap (&mutex, unlock, 0);
      21:         }
      22:         return NULL;
      23: }
      24:  
      25: int main(int argc, const char *argv[])
      26: {
      27:     Timer timer;
      28:     timer.Start();
      29:     pthread_t thread_ids[10];
      30:     int i = 0;
      31:  
      32:     for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
      33:     {
      34:             pthread_create(&thread_ids[i], NULL, test_func, NULL);
      35:     }
      36:  
      37:     for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
      38:     {
      39:             pthread_join(thread_ids[i], NULL);
      40:     }
      41:  
      42:     timer.Stop();
      43:     timer.Cost_time();
      44:     printf("结果:count = %d\n",count);
      45:  
      46:     return 0;
      47: }
      48:  

     

    3.4 Fetch And Add 原子操作

     
       1: #include <stdio.h>
       2: #include <stdlib.h>
       3: #include <pthread.h>
       4: #include <unistd.h>
       5: #include <time.h>
       6: #include "timer.h"
       7:  
       8: static volatile int count = 0;
       9: void *test_func(void *arg)
      10: {
      11:         int i = 0;
      12:         for(i = 0; i < 2000000; i++)
      13:         {
      14:             __sync_fetch_and_add(&count, 1);
      15:         }
      16:         return NULL;
      17: }
      18:  
      19: int main(int argc, const char *argv[])
      20: {
      21:     Timer timer;
      22:     timer.Start();
      23:     pthread_t thread_ids[10];
      24:     int i = 0;
      25:  
      26:     for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++){
      27:             pthread_create(&thread_ids[i], NULL, test_func, NULL);
      28:     }
      29:  
      30:     for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++){
      31:             pthread_join(thread_ids[i], NULL);
      32:     }
      33:  
      34:     timer.Stop();
      35:     timer.Cost_time();
      36:     printf("结果:count = %d\n",count);
      37:     return 0;
      38: }
      39:  
     

    4 实验结果和分析

    在同一台机器上,各运行以上3份代码10次,并统计平均值,其结果如下:(单位微秒)


    CAS原子操作实现无锁及性能分析

    由此可见,无锁操作在性能上远远优于加锁操作,消耗时间仅为加锁操作的1/3左右,无锁编程方式确实能够比传统加锁方式效率高,经上面测试可以发现,可以快到3倍左右。所以在极力推荐在高并发程序中采用无锁编程的方式可以进一步提高程序效率。

    5.时间统计类Timer

    timer.h

       1: #ifndef TIMER_H
       2: #define TIMER_H
       3:  
       4: #include <sys/time.h>
       5: class Timer
       6: {
       7: public:    
       8:     Timer();
       9:     // 开始计时时间
      10:     void Start();
      11:     // 终止计时时间
      12:     void Stop();
      13:     // 重新设定
      14:     void Reset();
      15:     // 耗时时间
      16:     void Cost_time();
      17: private:
      18:     struct timeval t1;
      19:     struct timeval t2;
      20:     bool b1,b2;
      21: };
      22: #endif

    timer.cpp

       1: #include "timer.h"
       2: #include <stdio.h>
       3:  
       4: Timer::Timer()
       5: {
       6:     b1 = false;
       7:     b2 = false;
       8: }
       9: void Timer::Start()
      10: {
      11:     gettimeofday(&t1,NULL);  
      12:     b1 = true;
      13:     b2 = false;
      14: }
      15:  
      16: void Timer::Stop()
      17: {    
      18:     if (b1 == true)
      19:     {
      20:         gettimeofday(&t2,NULL);  
      21:         b2 = true;
      22:     }
      23: }
      24:  
      25: void Timer::Reset()
      26: {    
      27:     b1 = false;
      28:     b2 = false;
      29: }
      30:  
      31: void Timer::Cost_time()
      32: {
      33:     if (b1 == false)
      34:     {
      35:         printf("计时出错,应该先执行Start(),然后执行Stop(),再来执行Cost_time()");
      36:         return ;
      37:     }
      38:     else if (b2 == false)
      39:     {
      40:         printf("计时出错,应该执行完Stop(),再来执行Cost_time()");
      41:         return ;
      42:     }
      43:     else
      44:     {
      45:         int usec,sec;
      46:         bool borrow = false;
      47:         if (t2.tv_usec > t1.tv_usec)
      48:         {
      49:             usec = t2.tv_usec - t1.tv_usec;
      50:         }
      51:         else
      52:         {
      53:             borrow = true;
      54:             usec = t2.tv_usec+1000000 - t1.tv_usec;
      55:         }
      56:  
      57:         if (borrow)
      58:         {
      59:             sec = t2.tv_sec-1 - t1.tv_sec;
      60:         }
      61:         else
      62:         {
      63:             sec = t2.tv_sec - t1.tv_sec;
      64:         }
      65:         printf("花费时间:%d秒 %d微秒\n",sec,usec);
      66:     }
      67: }
      68:  
  • 6.参考

  • 1.http://blog.csdn.net/hzhsan/article/details/25837189      

  •  2.http://coolshell.cn/articles/8239.html
  • -

  • Echo Chen:Blog.csdn.net/chen19870707

       -

  • 7
     
    1
      

    我的同类文章

    猜你在找
    Linux快速学习以及监控分析实战【小强测试出品】
    Linux环境C语言编程基础
    C语言指针与汇编内存地址
    C语言指针与汇编内存地址(二)
    1.16.ARM裸机第十六部分-shell原理和问答机制引入
    透过 Linux 内核看无锁编程
    透过 Linux 内核看无锁编程
    透过 Linux 内核看无锁编程
    透过 Linux 内核看无锁编程
    透过 Linux 内核看无锁编程
    查看评论
    9楼  workrun2015-09-22 22:01发表 [回复]
    CAS原子操作实现无锁及性能分析
    多谢分享,学习了~
    8楼  luoyuyou2015-01-16 14:51发表 [回复]
    CAS原子操作实现无锁及性能分析
    楼主文章不错~
    7楼  bxps9992014-12-22 16:03发表 [回复]
    CAS原子操作实现无锁及性能分析
    学习了
    6楼  fubar2014-12-02 17:09发表 [回复]
    CAS原子操作实现无锁及性能分析
    pthread mutex implementation also makes use of CAS under the hood. The major difference, with your test, is that pthread mutex would make a system call to block calling thread until the lock is available, while your CAS implementation makes a fixed-time sleep.
    Re:  l0vehj2014-12-05 10:37发表 [回复]
    CAS原子操作实现无锁及性能分析
    回复fubar:usleep is also a system call, is't it?
    why the version which used usleep is much faster then the mutex one?
    Re:  chen198707072014-12-04 19:45发表 [回复]
    CAS原子操作实现无锁及性能分析
    回复fubar:Thanks for your remarks! I'll learn more about it.
    Re:  TiGEr_zZ2014-12-04 18:26发表 [回复]
    CAS原子操作实现无锁及性能分析
    这篇文章,包括很多人对无锁的理解都是非常错误的,这里其实使用了spin锁,实际上也是基于锁的。
    另外fubar说的很对,CAS本身没什么神秘的到处都在用。无锁只是说基于CAS来实现,因为它被证明是充分必要条件,而其他原子操作要不不够,要不像ll/sc更强不被广泛支持,两者不是等同的概念。
    5楼  scgywx2014-11-26 16:22发表 [回复]
    CAS原子操作实现无锁及性能分析
    学习了,谢谢!
    另外有个疑问,CAS实现的无锁,如果拿不到锁就sleep 100ms,,如果时间改小一点会不会有所提高呢?毕竟如果有一次拿到就浪费了100ms了。。
    Re:  何以诚2014-12-18 22:09发表 [回复]
    CAS原子操作实现无锁及性能分析
    回复scgywx:有一种极端情况:若是它能拿到锁的时候永远都在sleep咋办。。。
    Re:  chen198707072014-11-26 17:37发表 [回复]
    CAS原子操作实现无锁及性能分析
    回复scgywx:应该会,你可以测测,我没试
    4楼  alaclp2014-11-25 09:38发表 [回复]
    CAS原子操作实现无锁及性能分析
    能够用C++ 11特性, 别使用pthread, 写个线程安全的计数器
    在linux下测试没找到pthread
    3楼  zenny_chen2014-11-17 12:36发表 [回复]
    CAS原子操作实现无锁及性能分析
    CAS没有LL-SC那么灵活强大
    Re:  workrun2015-09-22 23:16发表 [回复]
    CAS原子操作实现无锁及性能分析
    回复zenny_chen:LL-SC 很强大么,把 一个CAS操作分开来做,还要考虑到期间硬件中断和缓存行被修改的问题,貌似store-conditional 失败率更高
    Re:  zenny_chen2015-09-23 20:44发表 [回复]
    CAS原子操作实现无锁及性能分析
    回复workrun:LL-SC可不是将一个CAS分开来。LL-SC比起CAS来有两个优势:
    LL/SC has two advantages over CAS when designing a load-store architecture: reads and writes are separate instructions, as required by the design philosophy (and pipeline architecture); and both instructions can be performed using only two registers (address and value), fitting naturally into LSA encoding schemes. CAS, on the other hand, requires three registers (address, old value, new value) and a dependency between the value read and the value written. x86, being a CISC architecture, does not have this constraint; though modern chips may well translate a CAS instruction into separate LL/SC micro-operations internally.
    Re:  workrun2015-09-24 13:07发表 [回复]
    CAS原子操作实现无锁及性能分析
    回复zenny_chen:难道它不是把读写操作分开来的么,读取-监视-条件写,可以看一下这篇http://blogs.msdn.com/b/oldnewthing/archive/2013/09/13/10448736.aspx,
    请求CPU监视一个内存地址依赖于CPU自己的实现。但要记住一件事情,CPU在同一时间只能监视一个内存地址,并且这个时间是很短暂的。如果你的代码被抢占了或者在load-link后有一个硬件中断到来,那么你的store-conditional将会失败,因为CPU因为硬件中断而分心了,完全忘记了你要求它监视的内存地址(即使CPU成功的记住了它,也不会记太久,因为硬件中断几乎都会执行自己的load-link指令,因此会替换成它自己要求监视的内存地址)。
    另外,CPU可能会有点懒,在监视时并不监视内存地址,而是监视cache line,如果有人修改了一个不同的内存位置,但是刚好跟要被监视的内存地址在同一个cache line里,store-conditional操作也会失败,即使它事实上可以成功完成。ARM架构的CPU是太懒了,以至于任何向同一块2048字节写入的操作都会导致store-conditional失败。
    Re:  lojunren2014-11-30 21:28发表 [回复]
    CAS原子操作实现无锁及性能分析
    回复zenny_chen:X86系列的CPU是没有LL-SC指令的
    Re:  chen198707072014-11-17 15:23发表 [回复]
    CAS原子操作实现无锁及性能分析
    回复zenny_chen:我要研究下,谢谢共享知识
    Re:  zenny_chen2014-12-01 10:33发表 [回复]
    CAS原子操作实现无锁及性能分析
    回复chen19870707:呼呼,推荐阅读:http://en.wikipedia.org/wiki/Non-blocking_algorithm
    其中,see also里的Load-Link/Store-Conditional就是LL-SC
    2楼  丁国华2014-11-16 08:05发表 [回复]
    CAS原子操作实现无锁及性能分析
    谢谢分享 学习了 `(*∩_∩*)′
    1楼  soledadzz2014-11-14 17:59发表 [回复] [引用] [举报]
    CAS原子操作实现无锁及性能分析
    您的文章已被推荐到博客首页和个人页侧边栏推荐文章,感谢您的分享。