Pthreads n 体问题

时间:2023-03-09 18:56:08
Pthreads n 体问题

▶ 《并行程序设计导论》第六章中讨论了 n 体问题,分别使用了 MPI,Pthreads,OpenMP 来进行实现,这里是 Pthreads 的代码,分为基本算法和简化算法(引力计算量为基本算法的一半,但是消息传递较为复杂)

● 基本算法

 // pth_nbody_basic.c,Pthreads 基本算法
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <pthread.h>
#include "D:\Code\Ptheads\timer.h"
#pragma comment(lib, "pthreadVC2.lib") #define OUTPUT
#define DEBUG
#define DIM 2
#define X 0
#define Y 1
const double G = 6.673e-11;
const int BLOCK = , CYCLIC = ;// 任务分配方式,BLCOK 块调度,CYCLIC 循环调度
typedef double vect_t[DIM];
struct particle_s
{
double m;
vect_t s;
vect_t v;
};
int nThread, n, n_steps, output_freq;
double delta_t;
struct particle_s* curr;
vect_t* forces;
int b_thread_count; // 栅栏计数器
pthread_mutex_t b_mutex; // 栅栏互斥量
pthread_cond_t b_cond_var; // 栅栏条件变量 void Usage(char* prog_name)
{
fprintf(stderr, "usage: %s <nThreads> <nParticles> <nTimestep> <sizeTimestep> <outputFrequency> <g|i>\n", prog_name);
fprintf(stderr, " 'g': inite condition by random\n 'i': inite condition from stdin\n");
exit();
} void Get_args(int argc, char* argv[], char* g_i_p)
{
if (argc != )
Usage(argv[]);
nThread = strtol(argv[], NULL, );
n = strtol(argv[], NULL, );
n_steps = strtol(argv[], NULL, );
delta_t = strtod(argv[], NULL);
output_freq = strtol(argv[], NULL, );
*g_i_p = argv[][];
if (nThread < || n <= || n % nThread || n_steps < || delta_t <= || *g_i_p != 'g' && *g_i_p != 'i')
{
Usage(argv[]);
exit();
}
# ifdef DEBUG
printf("Get_args, nThread%2d n %d n_steps %d delta_t %e output_freq %d g_i %c\n",
nThread, n, n_steps, delta_t, output_freq, *g_i_p);
fflush(stdout);
# endif
} void Gen_init_cond(void)
{
const double mass = 5.0e24, gap = 1.0e5, speed = 3.0e4;
//srand(2);
for (int i = ; i < n; i++)
{
curr[i].m = mass;
curr[i].s[X] = i * gap;
curr[i].s[Y] = 0.0;
curr[i].v[X] = 0.0;
// vel[i][Y] = speed * (2 * rand() / (double)RAND_MAX) - 1);
curr[i].v[Y] = (i % ) ? -speed : speed;
}
} void Get_init_cond(void)
{
printf("For each particle, enter (in order): mass x-coord y-coord x-velocity y-velocity\n");
for (int i = ; i < n; i++)
{
scanf_s("%lf", &curr[i].m);
scanf_s("%lf", &curr[i].s[X]);
scanf_s("%lf", &curr[i].s[Y]);
scanf_s("%lf", &curr[i].v[X]);
scanf_s("%lf", &curr[i].v[Y]);
}
} void Loop_schedule(int my_rank, int nThread, int n, int sched, int* first_p, int* last_p, int* incr_p)
{
if (sched == CYCLIC)// 循环调度,从自己线程编号开始,每隔 nThread 个元素取一个,直到取到 n 为止
{
*first_p = my_rank;
*last_p = n;
*incr_p = nThread;
}
else // 块调度,每线程平均取 n / nThread 个,若不能整除,则前 remainder 个线程每线程多拿一个
{
int quotient = n / nThread;
int remainder = n % nThread;
int myCount;
*incr_p = ;
if (my_rank < remainder)
{
*first_p = my_rank * (quotient + );
*last_p = *first_p + quotient + ;
}
else
{
*first_p = my_rank * quotient + remainder;
*last_p = *first_p + quotient;
}
}
} void Barrier_init(void)// 条件变量等待法同步所有线程
{
b_thread_count = ;
pthread_mutex_init(&b_mutex, NULL);
pthread_cond_init(&b_cond_var, NULL);
} void Barrier(void)
{
pthread_mutex_lock(&b_mutex);
b_thread_count++;
if (b_thread_count == nThread)
{
b_thread_count = ;
pthread_cond_broadcast(&b_cond_var);
}
else
while (pthread_cond_wait(&b_cond_var, &b_mutex) != );
pthread_mutex_unlock(&b_mutex);
} void Barrier_destroy(void)
{
pthread_mutex_destroy(&b_mutex);
pthread_cond_destroy(&b_cond_var);
} void Output_state(double time)
{
printf("Output_state, time = %.2f\n", time);
for (int i = ; i < n; i++)
printf("%2d %10.3e %10.3e %10.3e %10.3e %10.3e\n", i, curr[i].m, curr[i].s[X], curr[i].s[Y], curr[i].v[X], curr[i].v[Y]);
printf("\n");
fflush(stdout);
} void Compute_force(int part)
{
int k;
vect_t f_part_k;
double len, fact;
for (forces[part][X] = forces[part][Y] = 0.0, k = ; k < n; k++)
{
if (k != part)
{
f_part_k[X] = curr[part].s[X] - curr[k].s[X];
f_part_k[Y] = curr[part].s[Y] - curr[k].s[Y];
len = sqrt(f_part_k[X] * f_part_k[X] + f_part_k[Y] * f_part_k[Y]);
fact = -G * curr[part].m * curr[k].m / (len * len * len);
f_part_k[X] *= fact;
f_part_k[Y] *= fact;
forces[part][X] += f_part_k[X];
forces[part][Y] += f_part_k[Y];
# ifdef DEBUG
printf("Compute_force, k%2d> %10.3e %10.3e %10.3e %10.3e\n", k, len, fact, f_part_k[X], f_part_k[Y]);
# endif
}
}
} void Update_part(int part)
{
const double fact = delta_t / curr[part].m;
# ifdef DEBUG
printf("Update_part before, part%2d %10.3e %10.3e %10.3e %10.3e %10.3e %10.3e\n",
part, curr[part].s[X], curr[part].s[Y], curr[part].v[X], curr[part].v[Y], forces[part][X], forces[part][Y]);
fflush(stdout);
# endif
curr[part].s[X] += delta_t * curr[part].v[X];
curr[part].s[Y] += delta_t * curr[part].v[Y];
curr[part].v[X] += fact * forces[part][X];
curr[part].v[Y] += fact * forces[part][Y];
# ifdef DEBUG
printf("Update_part after, part%2d %10.3e %10.3e %10.3e %10.3e)\n",
part, curr[part].s[X], curr[part].s[Y], curr[part].v[X], curr[part].v[Y]);
# endif
} void* Thread_work(void* rank)
{
const long long my_rank = (long long)rank;
int step, part; // 循环变量
int first, last, incr; // 每个线程分配颗粒的起始标号,最后编号,间隔数
Loop_schedule(my_rank, nThread, n, BLOCK, &first, &last, &incr);
for (step = ; step <= n_steps; step++)
{
for (part = first; part < last; Compute_force(part), part += incr);
Barrier();
for (part = first; part < last; Update_part(part), part += incr);
Barrier();
# ifdef OUTPUT
if (step % output_freq == && my_rank == )
Output_state(step * delta_t);
# endif
}
return NULL;
} int main(int argc, char* argv[])
{
char g_i;
long thread;
//double start, finish; // 无法计时
pthread_t* thread_handles; Get_args(argc, argv, &g_i);
curr = (particle_s*)malloc(n * sizeof(struct particle_s));
forces = (vect_t*)malloc(n * sizeof(vect_t));
thread_handles = (pthread_t*)malloc(nThread * sizeof(pthread_t));
if (g_i == 'g')
Gen_init_cond();
else
Get_init_cond();
Barrier_init(); //GET_TIME(start);
# ifdef OUTPUT
Output_state(0.0);
# endif
for (thread = ; thread < nThread; thread++)
pthread_create(&thread_handles[thread], NULL, Thread_work, (void*)thread);
for (thread = ; thread < nThread; thread++)
pthread_join(thread_handles[thread], NULL); //GET_TIME(finish);
//printf("Elapsed time = %e ms\n", (finish - start) * 1000);
Barrier_destroy();
free(thread_handles);
free(curr);
free(forces);
return ;
}

● 输出结果,由于 time.h 有点问题,没能进行及时,但是结果还是正确的。

D:\Code\Ptheads\PthreadsProjectTemp\x64\Debug>PthreadsProjectTemp.exe      g
Output_state, time = 0.00
5.000e+24 0.000e+00 0.000e+00 0.000e+00 3.000e+04
5.000e+24 1.000e+05 0.000e+00 0.000e+00 -3.000e+04
5.000e+24 2.000e+05 0.000e+00 0.000e+00 3.000e+04
5.000e+24 3.000e+05 0.000e+00 0.000e+00 -3.000e+04
5.000e+24 4.000e+05 0.000e+00 0.000e+00 3.000e+04
5.000e+24 5.000e+05 0.000e+00 0.000e+00 -3.000e+04
5.000e+24 6.000e+05 0.000e+00 0.000e+00 3.000e+04
5.000e+24 7.000e+05 0.000e+00 0.000e+00 -3.000e+04
5.000e+24 8.000e+05 0.000e+00 0.000e+00 3.000e+04
5.000e+24 9.000e+05 0.000e+00 0.000e+00 -3.000e+04
5.000e+24 1.000e+06 0.000e+00 0.000e+00 3.000e+04
5.000e+24 1.100e+06 0.000e+00 0.000e+00 -3.000e+04
5.000e+24 1.200e+06 0.000e+00 0.000e+00 3.000e+04
5.000e+24 1.300e+06 0.000e+00 0.000e+00 -3.000e+04
5.000e+24 1.400e+06 0.000e+00 0.000e+00 3.000e+04
5.000e+24 1.500e+06 0.000e+00 0.000e+00 -3.000e+04 Output_state, time = 1.00
5.000e+24 0.000e+00 3.000e+04 5.273e+04 3.000e+04
5.000e+24 1.000e+05 -3.000e+04 1.922e+04 -3.000e+04
5.000e+24 2.000e+05 3.000e+04 1.071e+04 3.000e+04
5.000e+24 3.000e+05 -3.000e+04 6.802e+03 -3.000e+04
5.000e+24 4.000e+05 3.000e+04 4.485e+03 3.000e+04
5.000e+24 5.000e+05 -3.000e+04 2.875e+03 -3.000e+04
5.000e+24 6.000e+05 3.000e+04 1.614e+03 3.000e+04
5.000e+24 7.000e+05 -3.000e+04 5.213e+02 -3.000e+04
5.000e+24 8.000e+05 3.000e+04 -5.213e+02 3.000e+04
5.000e+24 9.000e+05 -3.000e+04 -1.614e+03 -3.000e+04
5.000e+24 1.000e+06 3.000e+04 -2.875e+03 3.000e+04
5.000e+24 1.100e+06 -3.000e+04 -4.485e+03 -3.000e+04
5.000e+24 1.200e+06 3.000e+04 -6.802e+03 3.000e+04
5.000e+24 1.300e+06 -3.000e+04 -1.071e+04 -3.000e+04
5.000e+24 1.400e+06 3.000e+04 -1.922e+04 3.000e+04
5.000e+24 1.500e+06 -3.000e+04 -5.273e+04 -3.000e+04 Output_state, time = 2.00
5.000e+24 5.273e+04 6.000e+04 9.288e+04 1.641e+04
5.000e+24 1.192e+05 -6.000e+04 3.818e+04 -3.791e+03
5.000e+24 2.107e+05 6.000e+04 2.116e+04 3.791e+03
5.000e+24 3.068e+05 -6.000e+04 1.356e+04 -3.101e+03
5.000e+24 4.045e+05 6.000e+04 8.930e+03 3.101e+03
5.000e+24 5.029e+05 -6.000e+04 5.739e+03 -2.959e+03
5.000e+24 6.016e+05 6.000e+04 3.218e+03 2.959e+03
5.000e+24 7.005e+05 -6.000e+04 1.043e+03 -2.929e+03
5.000e+24 7.995e+05 6.000e+04 -1.043e+03 2.929e+03
5.000e+24 8.984e+05 -6.000e+04 -3.218e+03 -2.959e+03
5.000e+24 9.971e+05 6.000e+04 -5.739e+03 2.959e+03
5.000e+24 1.096e+06 -6.000e+04 -8.930e+03 -3.101e+03
5.000e+24 1.193e+06 6.000e+04 -1.356e+04 3.101e+03
5.000e+24 1.289e+06 -6.000e+04 -2.116e+04 -3.791e+03
5.000e+24 1.381e+06 6.000e+04 -3.818e+04 3.791e+03
5.000e+24 1.447e+06 -6.000e+04 -9.288e+04 -1.641e+04 Output_state, time = 3.00
5.000e+24 1.456e+05 7.641e+04 1.273e+05 -1.575e+03
5.000e+24 1.574e+05 -6.379e+04 5.867e+04 2.528e+04
5.000e+24 2.319e+05 6.379e+04 2.685e+04 -2.069e+04
5.000e+24 3.204e+05 -6.310e+04 1.884e+04 2.228e+04
5.000e+24 4.134e+05 6.310e+04 1.235e+04 -2.151e+04
5.000e+24 5.086e+05 -6.296e+04 8.111e+03 2.179e+04
5.000e+24 6.048e+05 6.296e+04 4.537e+03 -2.163e+04
5.000e+24 7.016e+05 -6.293e+04 1.493e+03 2.169e+04
5.000e+24 7.984e+05 6.293e+04 -1.493e+03 -2.169e+04
5.000e+24 8.952e+05 -6.296e+04 -4.537e+03 2.163e+04
5.000e+24 9.914e+05 6.296e+04 -8.111e+03 -2.179e+04
5.000e+24 1.087e+06 -6.310e+04 -1.235e+04 2.151e+04
5.000e+24 1.180e+06 6.310e+04 -1.884e+04 -2.228e+04
5.000e+24 1.268e+06 -6.379e+04 -2.685e+04 2.069e+04
5.000e+24 1.343e+06 6.379e+04 -5.867e+04 -2.528e+04
5.000e+24 1.354e+06 -7.641e+04 -1.273e+05 1.575e+03

● 简化算法

 //pth_nbody_red.c,Pthreads 简化算法
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <pthread.h>
#include "D:\Code\Ptheads\timer.h"
#pragma comment(lib, "pthreadVC2.lib") //#define OUTPUT
//#define DEBUG
#define DIM 2
#define X 0
#define Y 1
const double G = 6.673e-11;
const int BLOCK = , CYCLIC = ;// 任务分配方式,BLCOK 块调度,CYCLIC 循环调度
typedef double vect_t[DIM];
struct particle_s
{
double m;
vect_t s;
vect_t v;
};
int nThread, n, n_steps, output_freq;
double delta_t;
struct particle_s* curr;
vect_t *forces, *loc_forces; // 多一个 loc_forces
int b_thread_count;
pthread_mutex_t b_mutex;
pthread_cond_t b_cond_var; void Usage(char* prog_name)
{
fprintf(stderr, "usage: %s <nThreads> <nParticles> <nTimestep> <sizeTimestep> <outputFrequency> <g|i>\n", prog_name);
fprintf(stderr, " 'g': inite condition by random\n 'i': inite condition from stdin\n");
exit();
} void Get_args(int argc, char* argv[], char* g_i_p)
{
if (argc != )
Usage(argv[]);
nThread = strtol(argv[], NULL, );
n = strtol(argv[], NULL, );
n_steps = strtol(argv[], NULL, );
delta_t = strtod(argv[], NULL);
output_freq = strtol(argv[], NULL, );
*g_i_p = argv[][];
if (nThread < || n <= || n % nThread || n_steps < || delta_t <= || *g_i_p != 'g' && *g_i_p != 'i')
{
Usage(argv[]);
exit();
}
# ifdef DEBUG
printf("Get_args, nThread%2d n %d n_steps %d delta_t %e output_freq %d g_i %c\n",
nThread, n, n_steps, delta_t, output_freq, *g_i_p);
fflush(stdout);
# endif
} void Gen_init_cond(void)
{
const double mass = 5.0e24, gap = 1.0e5, speed = 3.0e4;
//srand(2);
for (int i = ; i < n; i++)
{
curr[i].m = mass;
curr[i].s[X] = i * gap;
curr[i].s[Y] = 0.0;
curr[i].v[X] = 0.0;
// vel[i][Y] = speed * (2 * rand() / (double)RAND_MAX) - 1);
curr[i].v[Y] = (i % ) ? -speed : speed;
}
} void Get_init_cond(void)
{
printf("For each particle, enter (in order): mass x-coord y-coord x-velocity y-velocity\n");
for (int i = ; i < n; i++)
{
scanf_s("%lf", &curr[i].m);
scanf_s("%lf", &curr[i].s[X]);
scanf_s("%lf", &curr[i].s[Y]);
scanf_s("%lf", &curr[i].v[X]);
scanf_s("%lf", &curr[i].v[Y]);
}
} void Loop_schedule(int my_rank, int nThread, int n, int sched, int* first_p, int* last_p, int* incr_p)
{
if (sched == CYCLIC)
{
*first_p = my_rank;
*last_p = n;
*incr_p = nThread;
}
else
{
int quotient = n / nThread;
int remainder = n % nThread;
int myCount;
*incr_p = ;
if (my_rank < remainder)
{
*first_p = my_rank * (quotient + );
*last_p = *first_p + quotient + ;
}
else
{
*first_p = my_rank * quotient + remainder;
*last_p = *first_p + quotient;
}
}
} void Barrier_init(void)
{
b_thread_count = ;
pthread_mutex_init(&b_mutex, NULL);
pthread_cond_init(&b_cond_var, NULL);
} void Barrier(void)
{
pthread_mutex_lock(&b_mutex);
b_thread_count++;
if (b_thread_count == nThread)
{
b_thread_count = ;
pthread_cond_broadcast(&b_cond_var);
}
else
while (pthread_cond_wait(&b_cond_var, &b_mutex) != );
pthread_mutex_unlock(&b_mutex);
} void Barrier_destroy(void)
{
pthread_mutex_destroy(&b_mutex);
pthread_cond_destroy(&b_cond_var);
} void Output_state(double time)
{
printf("Output_state, time = %.2f\n", time);
for (int i = ; i < n; i++)
printf("%2d %10.3e %10.3e %10.3e %10.3e %10.3e\n", i, curr[i].m, curr[i].s[X], curr[i].s[Y], curr[i].v[X], curr[i].v[Y]);
printf("\n");
fflush(stdout);
} void Compute_force(int part, vect_t loc_forces[])// 多一个参数 loc_forces[]
{
int k;
vect_t f_part_k;
double len, fact;
for (k = part + ; k < n; k++)
{
f_part_k[X] = curr[part].s[X] - curr[k].s[X];
f_part_k[Y] = curr[part].s[Y] - curr[k].s[Y];
len = sqrt(f_part_k[X] * f_part_k[X] + f_part_k[Y] * f_part_k[Y]);
fact = -G * curr[part].m * curr[k].m / (len * len * len);
f_part_k[X] *= fact;
f_part_k[Y] *= fact;
loc_forces[part][X] += f_part_k[X];
loc_forces[part][Y] += f_part_k[Y];
loc_forces[k][X] -= f_part_k[X];
loc_forces[k][Y] -= f_part_k[Y];
# ifdef DEBUG
printf("Compute_force, k%2d> %10.3e %10.3e %10.3e %10.3e\n", k, len, fact, f_part_k[X], f_part_k[Y]);
# endif
}
} void Update_part(int part)
{
const double fact = delta_t / curr[part].m;
# ifdef DEBUG
printf("Update_part before, part%2d %10.3e %10.3e %10.3e %10.3e %10.3e %10.3e\n",
part, curr[part].s[X], curr[part].s[Y], curr[part].v[X], curr[part].v[Y], forces[part][X], forces[part][Y]);
fflush(stdout);
# endif
curr[part].s[X] += delta_t * curr[part].v[X];
curr[part].s[Y] += delta_t * curr[part].v[Y];
curr[part].v[X] += fact * forces[part][X];
curr[part].v[Y] += fact * forces[part][Y];
# ifdef DEBUG
printf("Update_part after, part%2d %10.3e %10.3e %10.3e %10.3e)\n",
part, curr[part].s[X], curr[part].s[Y], curr[part].v[X], curr[part].v[Y]);
# endif
} void* Thread_work(void* rank)
{
const long long my_rank = (long long)rank;
int thread;
int step, part;
int bfirst, blast, bincr;// 计算采用循环调度,更新数据采用块调度
int cfirst, clast, cincr; Loop_schedule(my_rank, nThread, n, BLOCK, &bfirst, &blast, &bincr);
Loop_schedule(my_rank, nThread, n, CYCLIC, &cfirst, &clast, &cincr);
for (step = ; step <= n_steps; step++)
{
memset(loc_forces + my_rank*n, , n * sizeof(vect_t));
Barrier();
for (part = cfirst; part < clast; Compute_force(part, loc_forces + my_rank*n), part += cincr);
Barrier();
for (part = bfirst; part < blast; part += bincr)
{
forces[part][X] = forces[part][Y] = 0.0;
for (thread = ; thread < nThread; thread++)
{
forces[part][X] += loc_forces[thread*n + part][X];
forces[part][Y] += loc_forces[thread*n + part][Y];
}
}
Barrier();
for (part = bfirst; part < blast; Update_part(part), part += bincr);
Barrier();
# ifdef OUTPUT
if (step % output_freq == && my_rank == )
Output_state(step * delta_t);
# endif
}
return NULL;
} int main(int argc, char* argv[])
{
char g_i;
long thread;
//double start, finish;
pthread_t* thread_handles; Get_args(argc, argv, &g_i);
curr = (particle_s*)malloc(n * sizeof(struct particle_s));
forces = (vect_t*)malloc(n * sizeof(vect_t));
loc_forces = (vect_t*)malloc(nThread * n * sizeof(vect_t));// 多一个 loc_force 的初始化
thread_handles = (pthread_t*)malloc(nThread * sizeof(pthread_t));
if (g_i == 'g')
Gen_init_cond();
else
Get_init_cond();
Barrier_init(); //GET_TIME(start);
# ifdef OUTPUT
Output_state(0.0);
# endif
for (thread = ; thread < nThread; thread++)
pthread_create(&thread_handles[thread], NULL, Thread_work, (void*)thread);
for (thread = ; thread < nThread; thread++)
pthread_join(thread_handles[thread], NULL); //GET_TIME(finish);
//printf("Elapsed time = %e ms\n", (finish - start) * 1000);
Barrier_destroy();
free(thread_handles);
free(curr);
free(forces);
free(loc_forces);
return ;
}

● 输出结果,与基本算法类似。直观上,pthreads 简化算法花费的时间是 MPI,OpenMP,Pthreads 中最少的,好像 6 秒以内就算完了 1024 体 3600 秒的模拟