[c++实践]内存对齐与伪共享

时间:2023-01-09 16:15:45

内存对齐与伪共享

时间测试类

该类会在后续的测试中用于运行时间测试。

// public/timer.h
#include <chrono>
#include <iostream>
#include <functional>

struct ScopeTimer
{
ScopeTimer(const char *msg):_msg(msg),_now(std::chrono::high_resolution_clock::now()){}


~ScopeTimer(){
std::cout << _msg << ",espaced " << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - _now).count() << "ms" << std::endl;
}
const char *_msg;
std::chrono::high_resolution_clock::time_point _now;
};

void test_cycle(std::function<void()> f,long cycle,const char *msg)
{
ScopeTimer s(msg);
for (long index = 0;index < cycle;++index)
{
f();
}
}

内存对齐

内存对齐规则

我们知道,在我们写结构体时,默认情况下编译器会自动对这些数据结构进行对齐,对齐的规则为按照最大的​​pod​​类型进行对齐。同时,我们也可以强制指定对齐的大小,此时将按照默认与指定的字节中较小的进行对齐。在程序员看来,内存是一个一个字节的,但是现代操作系统在进行内存管理时会要求内存按照N字节进行对齐,一般是​​4​​字节。假设有如下几个结构体,按照不同的对齐规则,有不同的大小。

#include <iostream>

// 使用默认对齐
struct t1
{
char c;
int a;
char b;
short d;
};


// 强制1字节对齐
#pragma pack(1)
struct t2
{
char c;
int a;
char b;
short d;
};
struct t3
{
char c;
int a;
short d;
};
// 恢复默认对齐
#pragma pack()



int main()
{
std::cout << "sizeof(t1):" << sizeof(t1) << std::endl; // 12
std::cout << "sizeof(t2):" << sizeof(t2) << std::endl; // 8
std::cout << "sizeof(t3):" << sizeof(t3) << std::endl; // 7
return 0;
}

内存对齐对读取次数的影响

同样的,对于上面的2个结构体​​t1​​和​​t2​​,在要访问成员变量​​a​​时,内存访问的次数是不一样的。对于​​t1​​,由于是​​4​​字节对齐,​​t1.a​​本身就是符合内存对齐要求的,因此只需要一次存取。对于​​t2​​,由于是​​1​​字节强制对齐,​​t2.a​​在内存中的布局如下:

struct t2
{
char c; // 0
int a; // 1、2、3、4
char b; // 5
short d; // 6、7
};

要访问​​t2.a​​,首先要将​​0~3​​字节和​​4~7​​字节分​​2​​次读取到内存中,然后再从​​1~4​​字节获取变量​​t2.a​​的值,因此未对齐的数据结构的访问速度要比对齐的数据结构访问次数慢不止2倍。经过测试,实际的访问速度基本没有差别,是测试代码有问题??测试代码如下:

#include <thread>
#include <iostream>
#include <chrono>
#include "../public/timer.h"


// 使用默认对齐
struct t1
{
char c{0};
int a{0};
char b{0};
short d{0};
};


// 强制1字节对齐
#pragma pack(1)
struct t2
{
char c{0};
int a{0};
char b{0};
short d{0};
};
#pragma pack()


long cycle{10000};


int main()
{
std::thread x1(
[](){
struct t1 a;
ScopeTimer s("inc t1.a");
for (long index = 0;index < cycle;++index)
{
a.a++;
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
}
);


std::thread x2([](){
struct t2 a;
ScopeTimer s("inc t2.a");
for (long index = 0;index < cycle;++index)
{
a.a++;
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
});


x1.join();
x2.join();
return 0;
}

//inc t2.a,espaced 7110ms
//inc t1.a,espaced 7115ms

嗯,这是由于​​cache line​​造成的吗??还是说内存对齐与否造成的影响本身是可以忽略的??后续在介绍​​cpu cache line​​之后,在排除了缓存命中问题后,使用数组来再次测试内存对齐与否的性能差异。

cpu cache line

现代​​cpu​​都带有缓存,一般分为3级,离​​cpu​​越近的缓存存取速度越快,同时缓存的容量越小。现代​​cpu​​的一级缓存一般大小为​​4~64k​​,并且存取时是以​​cache line​​的形式进行的。我们日常使用的​​cpu cache line​​一般​​64​​字节,也就是​​cpu​​在读取数据时会一次性的从上级内存将​​64​​字节的数据读取到当前缓存中。因此,当我们要读取一个​​long​​类型的数据时,​​cpu​​实际上会将和它临近的一些字节一起读取到一级缓存中,以满足一次读取一个​​cache line​​的要求。

伪共享

如果​​cpu​​只有一个核,在多线程编程时,每个线程进行切换时,都需要将当前线程的上下文进行保存,然后加载下次要运行的线程的上下文,这就叫做上下文切换。现代​​cpu​​一般都会有多个核,因此实际运行时会有多个线程并行运行,每个核都有独立的缓存,正常情况下并行运行的​​2​​个线程如果没有访问或者修改相同内存是不会相互影响。但由于​​cache line​​的存在,如果一个线程修改了运行在另外一个核上线程​​cache line​​上的某一数据,则此时​​cpu​​需要重新加载该​​cache line​​上的数据。我们可以通过以下代码来证明该现象的存在:

#include "../public/timer.h"
#include <thread>

struct Array
{
long size{100};
long curIndex{0};
};

void incIndex(struct Array &arr)
{
arr.curIndex++;
}

void getSize(struct Array &arr)
{
long s = arr.size;
}

constexpr long maxIndex{100000000};

int main(int argc,char **argv)
{
struct Array arr;
{
ScopeTimer s("main");
long index{0};
while(index++ < maxIndex)
{
incIndex(arr);
}
}


std::thread t1([&arr](){
ScopeTimer s("thread1");
long index{0};
while(index++ < maxIndex)
{
incIndex(arr);
}
});


std::thread t2([&arr](){
ScopeTimer s("thread2");
long index{0};
while(index++ < maxIndex)
{
getSize(arr);
}
});


t1.join();
t2.join();
return 0;
}

//main,espaced 250ms
//thread2,espaced 606ms
//thread1,espaced 699ms

​main​​函数中的输出证明对​​Array​​的​​N​​次递增只需要​​250ms​​,但是当我们在独立线程中让线程​​1​​对​​curIndex​​进行递增,让线程​​2​​获取​​size​​的值,可以看到他们的运行时间都有答复提升。这是由于线程​​1​​和线程​​2​​分别运行在不同的​​cpu​​核心上,线程​​2​​的​​cpu​​会同时将​​curIndex​​和​​size​​同时读取到​​cache line​​,当线程​​1​​修改了​​curIndex​​的时候,会造成线程​​2​​中的​​curIndex​​的值发生改变,虽然线程​​2​​不关心​​curIndex​​,但是此时​​cpu​​还是需要重新从内存获取整个​​cache line​​的数据,因此造成运行时间的大幅提升。如果在两个线程都运行​​getSize​​,可以看到两个线程运行的时间都在​​200ms​​左右。

解决伪共享

虽然两个线程访问的数据是独立的,但是可能会存在某一线程修改的数据,在另外一个线程的​​cache line​​中,这样会造成另外一个线程需要重新存取整个​​cache line​​,这种现象叫做伪共享。要解决伪共享,就要避免多个线程的​​cache line​​相互影响,此时我们可以通过强制补充不需要的数据,让我们要访问的数据相互隔离,避免​​cache line​​的影响。测试代码如下:

#include "../public/timer.h"
#include <thread>

struct Array
{
long size{100};
char padding[64-sizeof(long)];
long curIndex{0};
};

void incIndex(struct Array &arr)
{
arr.curIndex++;
}

void getSize(struct Array &arr)
{
long s = arr.size;
}

constexpr long maxIndex{100000000};
int main(int argc,char **argv)
{
struct Array arr;
{
ScopeTimer s("main incIndex");
long index{0};
while(index++ < maxIndex)
{
incIndex(arr);
}
}
{
ScopeTimer s("main getSize");
long index{0};
while(index++ < maxIndex)
{
getSize(arr);
}
}

std::thread t1([&arr](){
ScopeTimer s("thread1 incIndex");
long index{0};
while(index++ < maxIndex)
{
incIndex(arr);
}
});

std::thread t2([&arr](){
ScopeTimer s("thread2 getSize");
long index{0};
while(index++ < maxIndex)
{
getSize(arr);
}
});

t1.join();
t2.join();
return 0;
}



//main incIndex,espaced 244ms
//main getSize,espaced 236ms
//thread2 getSize,espaced 192ms
//thread1 incIndex,espaced 259ms

由于我们在要访问的数据中添加了​​padding​​,此时两个线程要访问/修改的数据都是相互独立的,可以看到运行的时间基本和他们在​​main​​函数中依次运行时大致一致。

测试代码

屏蔽cache line测试内存对齐的影响

在​​内存对齐对读取次数的影响​​的测试中,我们看到对齐和不对齐实际的测试结果和我们预期不符,两者的运行时间大致一致。这可能是由于​​cache line​​的原因造成的。这里我们创建2个结构体,大小都是​​64​​字节,然后动态创建数组,并对数组中的每个元素的处于​​4​​字节对齐位置和非对齐位置进行累加,这样可以避免由于​​cache line​​对同一元素进行访问时由于缓存造成​​N​​次循环中实际只有第一次访问时是存在差异的。通过测试,我们可以看到内存对齐的访问速度是非内存对齐的​​9​​倍!!!

#include <thread>
#include <iostream>
#include <chrono>
#include "../public/timer.h"


// 使用默认对齐
struct t5
{
char c{0};
int a{0};
char b{0};
short d{0};
char x[64-12];
};


// 强制1字节对齐
#pragma pack(1)
struct t6
{
char c;
int a;
char x[64-5];
};
#pragma pack()
constexpr long cycle{100000000};

int main()
{
std::cout << "sizeof(t5):" << sizeof(t5) << std::endl; // 64
std::cout << "sizeof(t6):" << sizeof(t6) << std::endl; // 64

// 提前申请内存,避免内存申请造成的差异
struct t5 * a1 = new struct t5[cycle];
struct t6 * a2 = new struct t6[cycle];

// 对内存对齐的N个元素分别进行1次访问
{
ScopeTimer s("inc t5.a");
for (long index = 0;index < cycle;++index)
{
a1[index].a++;
}
}

// 对非内存对齐的N个元素分别进行1次访问
{
ScopeTimer s("inc t6.a");
for (long index = 0;index < cycle;++index)
{
a2[index].a++;
}
}
}

//sizeof(t5):64
//sizeof(t6):64
//inc t5.a,espaced 568ms
//inc t6.a,espaced 4611ms