C++并发编程框架Theron(6)——File reader(2)

时间:2022-02-13 17:58:31

1 前言
  在上一篇博文中我主要介绍了一个异步功能的文件读取服务程序,但是程序有一点小小的不足就是不能并行处理多文件:尽管文件请求是在一个独立线程下异步处理的,但是他们仍然是严格按先后顺序来执行操作的。在这篇博文中,我们将使用一系列线程Workers去并发处理读取请求,而不是仅仅一个异步的线程。在本文当中,我们会学到怎样从actors集合中构建一个复杂的系统,并且使用actor抽象化隐藏在消息传递交流的背后的线程操作。
2 多个actors 
  为了整合一系列Worker actors,我们需要一个中心分发功能的actor来管理这些workers,并且有序派遣Readquests给空闲的workers:

// 一个派遣actor处理工作事例
// 派遣actor会在内部创建和控制一系列workers.
// 它协调workers并行处理工作事例
template <class WorkMessage>
class Dispatcher : public Theron::Actor
{
public:
Dispatcher(Theron::Framework &framework, const int workerCount) : Theron::Actor(framework)
{
// 创建workers并且将它们加入空闲的列中
for (int i = 0; i < workerCount; ++i)
{
mWorkers.push_back(new WorkerType(framework));
mFreeQueue.push(mWorkers.back()->GetAddress());
}
RegisterHandler(this, &Dispatcher::Handler);
}
~Dispatcher()
{
// 释放workers
const int workerCount(static_cast<int>(mWorkers.size()));
for (int i = 0; i < workerCount; ++i)
{
delete mWorkers[i];
}
}
private:
typedef Worker<WorkMessage> WorkerType;
// 从客户端处理读取请求
void Handler(const WorkMessage &message, const Theron::Address from)
{
// 是否这个工作事例被处理过?
if (message.Processed())
{
// 发送结果返回给请求的调用者
Send(message, message.Client());
// 将worker加入到空间的列中
mFreeQueue.push(from);
}
else
{
// 将没有处理的工作事例放入工作列中
mWorkQueue.push(message);
}
// 服务于工作队列
while (!mWorkQueue.empty() && !mFreeQueue.empty())
{
Send(mWorkQueue.front(), mFreeQueue.front());
mFreeQueue.pop();
mWorkQueue.pop();
}
}
std::vector<WorkerType *> mWorkers; // 拥有的workers的指针
std::queue<Theron::Address> mFreeQueue; // 空闲workers队列.
std::queue<WorkMessage> mWorkQueue; // 未处理工作消息队列
};

  这块我还要说明一下,我们创建的Dispatcher是作为一个类模板,来强调就像worker一样,它并不是仅仅用来读取文件,还可以用来并行处理其他形式的工作。
  Workers群体是使用new运算符在构造器中被动态创建的,储存在一个指针向量中,并且是在析构函数中使用delete运算符被销毁。
  构造器也将workers的地址推送进空闲的worker队列中,可以根据它们的地址被识别。这个队列被用来记录哪些workers在当前是空闲的,并且可以用来处理work队列中收到的工作消息。
  实际的行动是在Handler()方法中。当收到一条WorkMessage,Handler()方法会检查该消息是不是已经看过。如果是新消息会被放置到工作队列中,不然就直接返回之前检测的结果给请求的客户端。
  当Handler()方法反馈完一个结果消息,它会重新将这个worker的地址发送加到空闲workers队列中,从而它可以被用来服务于未来的工作请求。最终整个过程形成了一个工作服务环结构,即我们使用空闲worker队列中的空闲的workers服务于work队列中未处理的工作消息。
3 多个frameworks 
  在上面介绍中我们已经了解到,Theron::Framework是一个管理类,用来主持actors集合。在这一小节我会展现Frameworks是如何可以用来控制线程来执行文件读取请求的。
  多个frameworks在一个单应用中被创建,并且管理着各自的actors集体。在每个frameworks内部,每个framework实例使用自己的worker线程集合来执行它管理的actors的消息处理函数(之前理论中有介绍过)。
  从消息传递来看,frameworks之间的壁垒如同透明般一样:在一个framework中的actors完全可以给其他frameworks的actors发送消息,就像一个framework中两个actor之间通信那样简单。Actor地址是全局独一无二的,不仅可以用来分别出actor,而且可以分别拥有它的framework,所以不同frameworks中的actors通信畅行无阻。
  到目前为止我们介绍的frameworks都是依靠默认构造函数创建的。但是也可以提供一个Parameters构造器parameter,允许控制worker线程数量和哪个处理器核被执行。

    Theron::Framework::Parameters frameworkParams; 
frameworkParams.mThreadCount = 16;
frameworkParams.mProcessorMask = 0xF;

Theron::Framework framework(frameworkParams);

  通过设置这个framework中线程池的大小,我们可以配置软件同时接触其管理的actors的数量。此外,通过限制一个framework的worker线程集被执行占用系统核的资源量,我们可以确保其他剩余系统资源可以用来处理事,包括非actor代码和其他frameworks的actors。
  main()函数如下,我们也构建一个Framework用来完成文件读取服务:

int main(int argc, char *argv[]) 
{
// Theron::Framework对象可以被有参构造
// 我们创建一个拥有n个线程的framework, n是文件的数量,我们希望可以并行读取所有文件。
// 我们也限制工作线程为前两个处理器核
Theron::Framework::Parameters frameworkParams;
frameworkParams.mThreadCount = MAX_FILES;
frameworkParams.mProcessorMask = (1UL << 0) | (1UL << 1);
Theron::Framework framework(frameworkParams);
if (argc < 2)
{
printf("Expected up to 16 file name arguments.\n");
}
// 注册一个receiver和一个处理函数,用来捕获结果消息
Theron::Receiver receiver;
Theron::Catcher<ReadRequest> resultCatcher;
receiver.RegisterHandler(&resultCatcher, &Theron::Catcher<ReadRequest>::Push);
// 创建一个dispatcher用来处理工作, 一共有MAX_FILES个workers.
Dispatcher<ReadRequest> dispatcher(framework, MAX_FILES);
// 发送工作请求, 每个工作请求是命令行上输入的文件名
for (int i = 0; i < MAX_FILES && i + 1 < argc; ++i)
{
unsigned char *const buffer = new unsigned char[MAX_FILE_SIZE];
const ReadRequest message(
receiver.GetAddress(),
argv[i + 1],
buffer,
MAX_FILE_SIZE);
framework.Send(message, receiver.GetAddress(), dispatcher.GetAddress());
}
// 等待所有线程反馈
for (int i = 1; i < argc; ++i)
{
receiver.Wait();
}
// 处理结果,我们仅打印文件的字节大小
ReadRequest result;
Theron::Address from;
while (!resultCatcher.Empty())
{
resultCatcher.Pop(result, from);
printf("Read %d bytes from file '%s'\n", result.FileSize(), result.FileName());

// 释放空间
delete [] result.Buffer();
}
}

  我们使用mThreadCount个Parameters结构来具体化framework中worker线程的数量。创建更多的线程允许更多的actors可以在同一时刻被执行,但也会增加开销。通过具体化MAX_FILES作为工作线程总量,我们支持同时处理MAX_FILES阅读操作。
  而在指定系统处理该程序的处理器核方面,我们使用mProcessMask成员来指定工作线程的处理器亲昵度。亲昵度是按位被指定的(每一位对应一个处理器)。我们假设现在的硬件系统都包含至少两个核,我们限制IO framework在前两个,剩下其他核给其他frameworks。因为这个仅是一个实例,我们实际上不需要创建任何其他的actors,但是如果我们需要的话,我们还要创建它们在另一个独立的Framework。
4 小结 
  因为系统文件的读取调用可以阻塞ms级甚至s级的开销。通过在一个独立Framework隔离文件读取服务的actors,我们可以贡献一个线程池来执行这些潜在的文件阅读。并且通过具体化framework的线程池,我们可以控制多少线程被允许在此被占用。接着,甚至所有的线程都被突如其来的许多并行文件读取请求所占用,我们知道其他frameworks的工作线程也不会受到任何影响,所以他们的actors并不会被饿死。
  以上是个人学习记录,由于能力和时间有限,如果有错误望读者纠正,谢谢!
  转载请注明出处:http://blog.csdn.net/FX677588/article/details/75268270


  参考文献:
  Theron框架官网http://www.theron-library.com/