boost的asio接收单路大数据量udp包的方法

时间:2023-03-09 08:15:28
boost的asio接收单路大数据量udp包的方法

开发windows客户端接收RTP视频流,当h264视频达到1080P 60fps的时候,按包来调用recvfrom的函数压力比较大,存在丢包的问题,windows的完成端口的性能效果当然可以解决这个问题,而boost的asio在windows上是基于完成端口来开发的,所以采用boost的asio和环形缓冲区的方法,可以解决接收单路大数据量udp包中丢包的问题。

需要引入的头文件为:

  1. #include "CircledBuffer.h"
  2. #include <iostream>
  3. #include <boost/asio.hpp>
  4. #include <boost/bind.hpp>

其中CircledBuffer.h是自定义的缓冲区的类,之后会有介绍,boost的两个文件是asio必需的两个文件。

需要定义的全局变量为:

  1. using boost::asio::ip::udp;
  2. boost::asio::io_service service;
  3. boost::asio::ip::udp::socket sock(service);
  4. boost::asio::ip::udp::endpoint sender_ep;
  5. CircledBuffer readBuffer;
  6. PacketBuffer* packet;

其中io_service是用来标示启动的,后面会调用run。sock和endpoint类似于描述符和sockaddr_in的关系。CircledBuffer和PacketBuffer*,是自定义缓冲区。

主函数为:

  1. int main(int argc, char* argv[]) {
  2. boost::asio::ip::udp::endpoint ep( boost::asio::ip::address::from_string("192.168.1.206"),
  3. 9002);
  4. sock.open(ep.protocol());
  5. sock.set_option(boost::asio::ip::udp::socket::reuse_address(true));
  6. boost::asio::socket_base::receive_buffer_size recv_option(8*65534);
  7. sock.set_option(recv_option);
  8. sock.bind(ep);
  9. packet = readBuffer.GetLast();
  10. sock.async_receive_from(boost::asio::buffer(packet->data, packet->bufferSize), sender_ep, &on_read);
  11. service.run();
  12. }

初始化ep和sock,其中udp接收的数量比较大的话,需要设定receive_buffer_size,然后bind,设置接受buffer为packet。

介绍一下async_receive_from函数,它有三个参数,分别为接收的buffer,远端的ep,注意与本端的ep不同,远端的ep不用初始化设置,再就是buffer收满后的回调函数。

回调函数的内容是:

  1. void on_read(const boost::system::error_code & err, std::size_t
  2. read_bytes) {
  3. std::cout << "read: " << read_bytes << std::endl;
  4. readBuffer.MoveNext();
  5. packet= readBuffer.GetLast();
  6. sock.async_receive_from(boost::asio::buffer(packet->data, packet->bufferSize), sender_ep, &on_read);
  7. }

与main函数的接收部分一致,这里用了不断的自身回调,来实现while recvfrom的功能。

补充说一句,用申请好的CircledBuffer,便于后期的多线程或者异步strand的处理,而不阻塞接收。

缓冲区类的代码:

头文件:

  1. #ifndef CIRCLED_BUFFER_H
  2. #define CIRCLED_BUFFER_H
  3. #include <memory.h>
  4. #include <boost/atomic.hpp>
  5. #define CIRCLED_BUFFER_SIZE 300
  6. #define BUFFER_SIZE 2000
  7. struct PacketBuffer
  8. {
  9. PacketBuffer(){bufferSize=BUFFER_SIZE;dataSize=0;}
  10. unsigned int bufferSize;
  11. unsigned int dataSize;
  12. char data[BUFFER_SIZE];
  13. PacketBuffer& operator=(PacketBuffer& other)
  14. {
  15. memcpy(data,other.data,other.dataSize);
  16. dataSize = other.dataSize;
  17. bufferSize = other.bufferSize;
  18. return *this;
  19. }
  20. };
  21. class CircledBuffer
  22. {
  23. public:
  24. CircledBuffer(unsigned int bufSize=CIRCLED_BUFFER_SIZE);
  25. public:
  26. ~CircledBuffer(void);
  27. PacketBuffer* GetAt(unsigned int idx){return &packets[idx];}
  28. PacketBuffer* GetLast()
  29. {
  30. return GetAt(writeIndex.load(boost::memory_order_consume));
  31. };
  32. void MoveNext()
  33. {
  34. unsigned int idx = writeIndex.load(boost::memory_order_relaxed);
  35. writeIndex.store((idx+1)%bufferSize,boost::memory_order_release);
  36. };
  37. unsigned int GetLastIndex(){return writeIndex.load(boost::memory_order_consume);};
  38. unsigned int GetSize(){return bufferSize;};
  39. protected:
  40. boost::atomic<unsigned int> writeIndex;
  41. unsigned int bufferSize;
  42. PacketBuffer* packets;
  43. };
  44. #endif

缓冲区类的构造函数与析构函数

  1. #include "CircledBuffer.h"
  2. CircledBuffer::CircledBuffer(unsigned int bufSize)
  3. :bufferSize(bufSize),
  4. writeIndex(0)
  5. {
  6. packets = new PacketBuffer[bufSize];
  7. }
  8. CircledBuffer::~CircledBuffer(void)
  9. {
  10. delete []packets;
  11. }

源代码下载链接