使用boost 的thread和asio库实现jpeg图片传输

时间:2021-07-15 05:25:55

 

 编写本文的主要目的是熟悉boost threadasio;主要实现下面内容;

1)实现jpeg图片传输;客户端将jpeg图片传输到服务端,服务端接收,打印接收到的jpeg数据长度

2)客户端使用boost thread开一个线程不断的读jpeg图片,然后进行传输,传输使用asio库;

3)服务端使用asio库,进行jpeg图片接收,接收到图片后,打印一行日志,接收到jpg图片的size是多少?

程序中只使用stl和boost库,不使用任何其它不利于移植的库;

实现要点:

客户端:

1)首先使用boost::filesystem,对指定路径下的jpg图片进行枚举;然后将枚举出来的图片放入STL Vector 容器;

2)文中使用了boost thread库,创建线程,为了演示多线程之间的同步,使用了2个线程,一个线程专门用读一帧jpeg图片,另外一个线程用来发送jpeg数据;

3)jpeg发送线程中调用asio库进行发送;将asio的sock单独封一个简单的类(CClientSock);

4)两个线程使用boost的互斥锁进行同步;

 //多线程同步
 typedef boost::try_mutex MUTEX;
 typedef MUTEX::scoped_try_lock LOCK;
  MUTEX iomutex;

5)文中演示了如何使用boost bind库,boost时间库

  boost::this_thread::sleep(boost::posix_time::milliseconds(5));

6)文中很好的使用了STL的vector和List容器;

 

客户端代码如下:

1)sock 封装类

ClientSock.h

#ifndef __CLIENT_SOCK__   
#define __CLIENT_SOCK__   

#include <boost/asio.hpp>  
#include <boost/bind.hpp>  
using namespace boost;

using boost::asio::io_service;     
using boost::asio::ip::tcp;  
using namespace boost::asio; 

#include <string>
#include <iostream>
using namespace std;

class CClientSock
{
public:
	CClientSock(tcp::endpoint& endpoint);
	virtual ~CClientSock(void);

	unsigned int send (char *pBuff,unsigned int len);     
	unsigned int recv (char *pBuff, int len);     
	void close ();     

private:     
	io_service * io_service_;     
	tcp::socket * socket_;     

};

#endif


ClientSock.cpp

#include "ClientSock.h"

CClientSock::CClientSock(tcp::endpoint& endpoint)
{
	//
	io_service_ = new io_service();     
	socket_ = new tcp::socket(*io_service_);
	boost::system::error_code ec;  
	socket_->connect(endpoint, ec); 

	if (ec)     
	{     
		std::cerr << "Error: " << ec << std::endl;     
		throw ec;  
	} 
}

CClientSock::~CClientSock(void)
{
	//
	delete io_service_;
	delete socket_;
}

//
 unsigned int  CClientSock:: send (char *pBuff,unsigned int len)
{
	boost::asio::const_buffers_1 request(pBuff, len);     
	return socket_->send(request);     
}

 //
 unsigned int CClientSock:: recv ( char *pBuff, int len)
{
	//
	//return boost::asio::read( socket_ ,boost::asio::buffer(static_cast<void*>(pBuff),len));
	return 0;
}

 //
 void CClientSock::close ()
{
	//
	  socket_->close();    
}


2)客户端传输封装类

ClientTransfer.h

#ifndef __CLIENT_TRANSFER__   
#define __CLIENT_TRANSFER__   

#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/filesystem.hpp>
#include <boost/timer.hpp>
using namespace boost;

#include <iostream>
#include <string>

#include <vector>
#include <algorithm>
#include <fstream>
#include <List>
using namespace std;

#include "ClientSock.h"

typedef struct  
{
	unsigned char *pBuff;	//图像Buff指针
	unsigned int length;	//Buff总的大小
	unsigned int valid_len;	//有效数据的长度
}image_buff;

#define MAX_BUFF_SIZE 1024*1024
#define MAX_BUFF_NUMBER 4

class CClientTransfer
{
public:
	CClientTransfer(std::string jpgPath);
	virtual ~CClientTransfer(void);

	void StartThread(void);
	void WaitThreadTerminate(void);
private:
	
	bool GetJpegData(image_buff &imageBuff,std::string jpgPath);
	bool scanFilesUseRecursive(const string& rootPath,vector<string>& container=*(new vector<string>()));

	static void ReadJpegThread(void *arg);
	void ReadJpegData(void);
	boost::thread m_ReadThread;		//读Jpeg线程

	//
	static void SendJpegThread(void *arg);
	void SendJpegData(void);
	boost::thread m_SendThread;		//发送Jpeg图片

	std::string m_jpgPath;
	vector<string>	m_vecFilePic;	//jpeg目录
	unsigned int m_PicId;			//jpeg vector 容器中的第几张图片

	//图像Buff 空List
	list <image_buff> IdleList;		
	//发送图像Buff List
	list <image_buff> SendBuffList;
	bool m_bQuit;		//线程结束标志
	
	//多线程同步
	typedef boost::try_mutex MUTEX;
	typedef MUTEX::scoped_try_lock LOCK;
	 MUTEX iomutex;

	 //asio
	 CClientSock *m_sock;
};

#endif


 

ClientTransfer.cpp

#include "ClientTransfer.h"

//参数jpgPath:保存jpg序列的目录
CClientTransfer::CClientTransfer(std::string jpgPath)
{
	m_jpgPath=jpgPath;

	//分配内存
	image_buff desc;
	desc.pBuff = NULL;
	desc.length = MAX_BUFF_SIZE;
	desc.valid_len = 0;

	for (int i = 0; i < MAX_BUFF_NUMBER; ++i) {
		desc.pBuff = new unsigned char [MAX_BUFF_SIZE];
		IdleList.push_back(desc);
	}

	m_PicId=0;
	m_bQuit=false;

	ip::tcp::endpoint ep(ip::address_v4::from_string("127.0.0.1"), 8100); 

	m_sock=new CClientSock(ep);

}

//
CClientTransfer::~CClientTransfer(void)
{
	//
	std::list<image_buff>::iterator iter;

	//注意加锁
	LOCK lock(iomutex);
	for (iter = SendBuffList.begin();iter != SendBuffList.end(); ++iter)
	{
		delete []((*iter).pBuff);
		(*iter).pBuff = NULL;
	}

	SendBuffList.clear();

	for (iter = IdleList.begin();iter != IdleList.end(); ++iter)
	{
		delete []((*iter).pBuff);
		(*iter).pBuff = NULL;
	}

	IdleList.clear();

	if (m_sock)
	{
		delete m_sock;
		m_sock=NULL;
	}

}

//函数功能:从jpgPath中获取jpeg数据
bool CClientTransfer::GetJpegData(image_buff &imageBuff,std::string jpgPath)
{
	//
	bool status=false;

	ifstream ifs(jpgPath.c_str(), std::ios_base::binary);

	unsigned int nJpgLen = 0;

	if ( ifs.is_open() )
	{
		ifs.seekg( 0 , std::ios::end );
		nJpgLen = ifs.tellg();

		if ( nJpgLen > 0 &&nJpgLen<=imageBuff.length)
		{
			ifs.seekg( 0 , std::ios::beg );
			ifs.read((char*)imageBuff.pBuff,nJpgLen);
			imageBuff.valid_len=nJpgLen;

			status=true;
		}else
		{
			status=false;
		}

		ifs.close();
	}

	return status;
}


//函数功能:从文件夹下枚举.jpg图片,放入到jpg的vector容器中
bool CClientTransfer::scanFilesUseRecursive(const string& rootPath,vector<string>& container)
{
	//

	namespace fs = boost::filesystem;    
	fs::path fullpath (rootPath, fs::native);   

	vector<string> &ret = container;   

	//判断路径是否存在
	if(!fs::exists(fullpath))
	{
		return false;
	}    

	fs::recursive_directory_iterator end_iter;   

	for(fs::recursive_directory_iterator iter(fullpath);iter!=end_iter;iter++)
	{    

		try{            
			//是文件名&&.jpg
			if (!fs::is_directory( *iter ) &&(fs::extension(*iter)==".jpg"||fs::extension(*iter)==".JPG"||fs::extension(*iter)==".jpeg"||fs::extension(*iter)==".JPEG") )
			{
				//
				std::string jpgstring=iter->path().string();

				ret.push_back(jpgstring);                
				std::cout << *iter << " is a file" << std::endl;       
			}

		} catch ( const std::exception & ex )
		{            
			std::cerr << ex.what() << std::endl;           
			continue;        
		}    
	} 

	return true;

}

//
void TransferThread(image_buff &imageBuff)
{
	//
	printf("imageBuff len=%d\n",imageBuff.valid_len);

}

void CClientTransfer::ReadJpegThread(void *arg)
{
	//
	CClientTransfer *pThis=(CClientTransfer *)arg;

	while (1)
	{
		if (pThis->m_bQuit)
		{
			break;
		}

		pThis->ReadJpegData();
	}
}

//读jpeg数据
void CClientTransfer::ReadJpegData(void)
{

	image_buff struImageBuff;
	memset(&struImageBuff,0,sizeof(struImageBuff));

	//多线程必须加锁
	if (IdleList.size()>0)
	{
		LOCK lock(iomutex);			//锁定mutex
		struImageBuff=IdleList.front();
		IdleList.pop_front();
	}else
	{
		//使用boost ::时间库替换
		boost::this_thread::sleep(boost::posix_time::milliseconds(5));
		return;

	}

	LOCK lock(iomutex);			//锁定mutex
	if (m_PicId<m_vecFilePic.size())
	{
		//
		std::string jpgPath=m_vecFilePic[m_PicId].c_str();

		if (GetJpegData(struImageBuff,jpgPath))
		{
			m_PicId++;

			SendBuffList.push_back(struImageBuff);
			cout<<"read a jpeg image"<<endl;

		}else
		{
			IdleList.push_back(struImageBuff);
		}
	}else
	{
		IdleList.push_back(struImageBuff);

		cout<<"all jpeg image read complete"<<endl;
	}

}

//发送Jpeg数据
void CClientTransfer::SendJpegThread(void *arg)
{
	//
	CClientTransfer *pThis=(CClientTransfer *)arg;

	while (1)
	{
		if (pThis->m_bQuit)
		{
			//
			break;
		}

		pThis->SendJpegData();
	}
}

void CClientTransfer::SendJpegData(void)
{
	//
	image_buff struImageBuff;
	memset(&struImageBuff,0,sizeof(struImageBuff));

	//加锁
	if (SendBuffList.size()>0)
	{
		LOCK lock(iomutex);			//锁定mutex
		struImageBuff=SendBuffList.front();
		SendBuffList.pop_front();

	}else
	{

		boost::this_thread::sleep(boost::posix_time::milliseconds(5));
		return;
	}

	//调用boost Asio发送数据
	m_sock->send((char*)struImageBuff.pBuff,struImageBuff.valid_len);

	LOCK lock(iomutex);			//锁定mutex
	IdleList.push_back(struImageBuff);

	cout<<"send a jpeg image"<<endl;

}

void CClientTransfer::StartThread(void)
{

	scanFilesUseRecursive(m_jpgPath,m_vecFilePic);

	m_ReadThread= boost::thread(boost::bind(&CClientTransfer::ReadJpegThread,this));
	m_SendThread=boost::thread(boost::bind(&CClientTransfer::SendJpegThread,this));
}

//
void CClientTransfer::WaitThreadTerminate(void)
{
	//
	m_ReadThread.join();
	m_SendThread.join();

}


 

3)客户端传输类调用代码

client.cpp

#include "stdafx.h"
#include "ClientTransfer.h"

int _tmain(int argc, _TCHAR* argv[])
{

	CClientTransfer clientTransfer("D:\\Tool\\");
	clientTransfer.StartThread();

	clientTransfer.WaitThreadTerminate();

	return 0;
}


 

服务端:

1)做一个简单的服务器,对jpeg图片进行接收,服务器代码如下:

// boostAsio.cpp : 定义控制台应用程序的入口点。
//

#include "stdafx.h"

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <iostream>
using boost::asio::ip::tcp;
#define max_len 1024*1024

class clientSession
	:public boost::enable_shared_from_this<clientSession>
{
public:
	clientSession(boost::asio::io_service& ioservice)
		:m_socket(ioservice)
	{
		memset(data_,'\0',sizeof(data_));
	}

	~clientSession()
	{
		//

	}

	tcp::socket& socket()
	{
		return m_socket;
	}

	void start()
	{
		//boost::asio::async_write(m_socket,
		//	boost::asio::buffer("link successed!"),
		//	boost::bind(&clientSession::handle_write,shared_from_this(),
		//	boost::asio::placeholders::error));


		m_socket.async_read_some(boost::asio::buffer(data_,max_len),boost::bind(&clientSession::handle_read,shared_from_this()
			,boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
	}
private:
	void handle_write(const boost::system::error_code& error)
	{

		if(error)
		{
			m_socket.close();
		}

	}

	void handle_read(const boost::system::error_code& error,size_t bytes_transferred)
	{

		if(!error)
		{
			//std::cout << data_ << std::endl;
			std::cout<<"read jpeg size="<<bytes_transferred<<std::endl;

			m_socket.async_read_some(boost::asio::buffer(data_,max_len),boost::bind(&clientSession::handle_read,shared_from_this()
				,boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred));
		}
		else
		{
			m_socket.close();
		}

	}
private:
	tcp::socket m_socket;
	char data_[max_len];
};


class serverApp
{
	typedef boost::shared_ptr<clientSession> session_ptr;
public:
	serverApp(boost::asio::io_service& ioservice,tcp::endpoint& endpoint)
		:m_ioservice(ioservice),
		acceptor_(ioservice,endpoint)
	{
		session_ptr new_session(new clientSession(ioservice));
		acceptor_.async_accept(new_session->socket(),
			boost::bind(&serverApp::handle_accept,this,boost::asio::placeholders::error,
			new_session));
	}
	~serverApp()
	{
	}
private:
	void handle_accept(const boost::system::error_code& error,session_ptr& session)
	{
		if(!error)
		{
			std::cout << "get a new client!" << std::endl;
	
			session->start();

			session_ptr new_session(new clientSession(m_ioservice));
			acceptor_.async_accept(new_session->socket(),
				boost::bind(&serverApp::handle_accept,this,boost::asio::placeholders::error,
				new_session));
		}
	}
private:
	boost::asio::io_service& m_ioservice;
	tcp::acceptor acceptor_;
};

int main(int argc , char* argv[])
{
	boost::asio::io_service myIoService;
	short port = 8100;

	tcp::endpoint endPoint(tcp::v4(),port);

	serverApp sa(myIoService,endPoint);

	myIoService.run();
	return 0;
}