Unix网络编程学习日记(三):多进程全双工socket客户端的实现

时间:2022-12-01 08:52:37

本程序其实写在上两篇博客的程序之前,思想更不成熟,代码更冗杂,bug也可能更多。使用父子进程的方式实现多线程可以实现的功能有些多此一举,事实上本程序的代码量是上一篇中多线程实现的三倍,内存占用是两倍有余。
不过有趣的一点是,这个小程序让我练习了三种进程间通信方法:信号、管道、消息队列。

一、总述

本设计为一网络通信客户端程序,实现以下要求:
1、通过网络连接PC上运行的服务器程序,服务器测试程序不用实现,暂时不用考虑服务器给设备的应答。
2、使用TCP方式连接。
3、客户端端可以通过串口输入指令控制客户端操作。控制指令包含:连接服务器、断开服务器、给服务器发测试语句、给服务器发送一个小文件。
4、客户端可以接受服务器下发的数据打印显示并保存成文件

二、总体设计

本设计的网络、文件操作基于Linux标准库函数,因此具有较强的可移植性,在x86平台、ARM平台均可良好运行。程序分为一个负责接收键盘指令、数据显示、数据存储的主进程,和两个负责接收、读取socket包的子进程。子进程没有任务时均处于阻塞状态,进程间同步由信号和管道的阻塞机制实现,耗费资源较少,且在网络通信请求产生时能够快速响应。接收到的信息形式暂存在链表中,需要储存时出队并以基于行的IO形式存入文件。

三、模块设计

1、信息接收模块

该模块实体为RecvProc,实现了socket包的接收、显示、通知主模块处理的功能。
与主模块间单工异步通信,使用信号和消息队列两种IPC机制。该模块收到socket包时,先和时间戳绑定后存入消息队列,再通过用户自定义信号通知主进程处理消息队列。
Unix网络编程学习日记(三):多进程全双工socket客户端的实现

2、信息发送模块

该模块实体为SendProc,实现了socket包的发送功能。
与主模块间单工同步通信,使用了匿名管道机制。该模块无任务时由匿名管道的read函数实现阻塞,当主模块需要发送消息时,管道连通,阻塞解除。发送子进程读取管道中的待发送信息,传递给send函数发送
Unix网络编程学习日记(三):多进程全双工socket客户端的实现

3、主模块

该模块实体为MainProc,为1、2两子进程的父进程,实现了进程调度、接收键盘指令、数据显示、数据存储等功能。
程序启动时,该模块首先初始化,得到socket、消息队列等资源的描述符,再经由发送、接收模块的构造函数传递给两模块。两模块初始化完成后,由主模块创建子进程,在子进程中运行两模块的业务功能。
Unix网络编程学习日记(三):多进程全双工socket客户端的实现

四、数据结构

本设计使用单向队列暂存接收到的数据,容器为deque。具有首尾数据读写效率高,数据存储在内存中分块,利于大量存储的特点。
Unix网络编程学习日记(三):多进程全双工socket客户端的实现
Unix网络编程学习日记(三):多进程全双工socket客户端的实现

五、用户界面设计

本设计使用命令行进行操作,在程序运行时,提示用户输入服务器ip和端口
remote ip:192.168.51.89
remote port:2345

1、连接服务器

若连接成功则提示

connected successfully 192.168.51.89:2345

否则,则提示错误并显示错误原因,如:
connect error!: Connection refused

2、功能选项

连接成功后,显示功能选项:

1.save the message in buffer into a file
2.send message to the server
3.send file to the server
0.disconnect to the server

当光标前标有“[command]”时,可输入以上选项进入相应功能。亦可输入“?”重新显示该菜单。

3.接收消息

程序启动后,接收进程始终在后台运行。当接收到服务器消息后,消息立即在屏幕显示,以[recv]开头,并附有接收时间。如:

[recv][20170807 10:52:19] test message

4.储存消息

选择选项“1”即进入储存消息功能。此时光标前提示“[path]”,在此输入储存消息的文件路径并回车。若文件不存在则自动生成。然后,屏幕中会显示文件写入的内容,如:

[path]/tmp/output.txt

write:20170807 10:52:19 test message write:20170807 10:53:50
test message again

write file complete!

5.发送消息

选择选项“2”即进入发送消息功能。此时光标前提示“[msg]”,在此输入待发送的消息并回车,如:

[msg]test message

6.发送文件

选择选项“3”即进入储存消息功能。此时光标前提示“[path]”,在此输入待发送的文件路径并回车。若文件不存或无读取权限则由相应提示,如:

read the file to be send failed: No such file or directory

若读取成功,则输出读取文件并发送的内容,如:

openfile success [send]#BEGIN# [send]20170807 08:36:49 987
[send]20170807 08:36:50 654 [send]20170807 08:36:51 321
[send]20170807 10:52:19 test message [send]20170807 10:53:50
test message again [send]#EOF#

其中,首尾两条并非文件内容,是为将文件与普通消息相区别的标识符。

7.退出程序

选择选项“0”即则退出程序。此时程序会结束子进程,清理系统资源并退出。

六、容灾设计

1.程序被强行终止

为防止程序在终端被用户使用CTRL+C强行终止,导致系统资源没有及时释放,本设计捕捉了SIGINT信号,接收到信号时激活各模块的析构函数,保证释放系统资源、回收垃圾后退出。

2.功能选项输入错误

若功能选项输入错误,则要求用户重新输入

3.参数输入错误

当输入传递给系统函数的参数输入错误,例如文件路径、ip地址等错误时,输出系统维护的错误代码对应的错误提示,并退出当前函数,防止段错误等异常情况发生。


main.c

#include "main_proc.h"
#include "recv_proc.h"
#include "send_proc.h"

MainProc *mainProc;
RecvProc *recvProc;
SendProc *sendProc;

int main(void)
{
void signalMqCaller(int);
void signalClean(int);

mainProc = new MainProc();
recvProc = new RecvProc(mainProc->getMsgid(),
mainProc->getSockfd(),mainProc->getMainpid());
sendProc = new SendProc(mainProc->getSockfd(),mainProc->getPipefd());
signal(SIGUSR1 , signalMqCaller);
signal(SIGINT, signalClean);

mainProc->startRecvProc(recvProc);
mainProc->startSendProc(sendProc);
mainProc->startMainProc();
}

void signalMqCaller(int)
{
mainProc->sigMsgque();
}

void signalClean(int)
{
cout << "catch CTRL+C signal" <<endl;
delete mainProc;
delete recvProc;
delete sendProc;
}

main_proc.cpp

/*
* 主进程,负责用户交互、进程管理和数据管理
*/

#include "main_proc.h"

#define MSQTYPE 1000
#define BUFSIZE 1024
#define SENDDELAY 200000

MainProc::MainProc()
{

/*消息队列初始化*/
key_t unique_key=0;
if((msgid = msgget(unique_key, IPC_PRIVATE | 0666)) == -1)
{
perror("init message queue failed");
exit(-1);
}
sndMsg.mtype = MSQTYPE;

/*匿名管道初始化*/
if(pipe(pipefd) < 0)
{
perror("init pipe failed");
exit(-1);
}

/*socket初始化*/
struct sockaddr_in addr;
if((sockfd = socket(AF_INET,SOCK_STREAM,0)) <0)
{
perror("fail to create socket");
exit(-1);
}
memset(&addr,0,sizeof(struct sockaddr_in));

/*连接到服务器*/
string remote_ip;
int remote_port;
cout << "remote ip:";
// remote_port = 2345;
// remote_ip = string("192.168.17.1");
cin>>remote_ip;
cout << "remote port:";
cin>>remote_port;
cout <<endl;
addr.sin_family = AF_INET;
addr.sin_port = htons(remote_port);
addr.sin_addr.s_addr = inet_addr(remote_ip.data());
if(connect(sockfd,(struct sockaddr *)(&addr),sizeof(struct sockaddr)) < 0)
{
perror("connect error!");
exit(-1);
}

else
{ cout << "-------------------------------------------------"<<endl;
cout << "connected successfully " << remote_ip <<":" << remote_port<<endl;
cout << "-------------------------------------------------"<<endl;
}
mainPid = getpid();
}

/*析构函数,回收资源*/
MainProc::~MainProc()
{
msgctl(msgid, IPC_RMID, 0);
shutdown(sockfd,SHUT_RDWR);
close(sockfd);
kill(recvPid,SIGTERM);
if(wait(NULL) == -1)
{
perror("no child processclose to be closed");
exit(0);
}
}

int MainProc::getMsgid()
{

return msgid;
}

int MainProc::getSockfd()
{
return sockfd;
}

int MainProc::getMainpid()
{
return mainPid;
}

int* MainProc::getPipefd()
{
return pipefd;
}

/*消息队列处理函数,已和SIGUSR1信号绑定*/
void MainProc::sigMsgque()
{
if(( msgrcv(msgid, (struct msgbuf *)&rcvMsg, BUFSIZE, MSQTYPE, IPC_NOWAIT)) != -1)
{
/*新的消息结点从队尾入列*/
socketData.push(pair<time_t,string>(rcvMsg.revTime,string(rcvMsg.mtext)));
}
else
{
perror("msg queue rev error");
msgctl(msgid, IPC_RMID, 0);
exit(-1);
}
}

/*菜单中功能1:文件存储,将此前接收到的消息和时间戳储存到文件*/
void MainProc::saveFile()
{
string filePath;
struct tm *pTime;
char tmpStr[BUFSIZE];
FILE *fp;
cout << "[path]" <<endl;
cin >> filePath;
fp = fopen(filePath.data(),"at");
if(fp == NULL)
{
perror("open file failed");
return;
}
else
{
while(socketData.size() != 0)
{
/*格式化输出:time+data*/
pTime = localtime(&socketData.front().first);
sprintf(tmpStr,"%4d%02d%02d %02d:%02d:%02d ", pTime->tm_year + 1900,
pTime->tm_mon + 1, pTime->tm_mday, pTime->tm_hour,
pTime->tm_min, pTime->tm_sec);
strcat(tmpStr,socketData.front().second.data());
strcat(tmpStr,"\n");
if(fputs(tmpStr,fp) == EOF)
{
perror("write file failed");
return;
}
else
{
cout << "write:" << tmpStr << endl;
}
/*每次读取头结点,读取后头结点出队*/
socketData.pop();
}
cout << "write file complete!" << endl;
}
fflush(fp);
fclose(fp);

}

/*匿名管道信息发送函数*/
void MainProc::sendPipe(const char *msg)
{
if(sizeof(msg) > BUFSIZE)
{
cout << "msg is too long";
return;
}
close(pipefd[0]);
if(write(pipefd[1],msg,BUFSIZE) < 0)
{
perror("send to pipe failed");
}
usleep(SENDDELAY);
}

/*菜单中功能2:发送信息*/
void MainProc::sendMsg()
{
string sendBuf;
cout<<endl<<"[msg]";
getline(cin,sendBuf);
//cout << "getline:"<<sendBuf << endl;

sendPipe(sendBuf.data());

}

/*菜单中功能3:发送文件*/
void MainProc::sendFile()
{
string sendPath;
char readBuf[BUFSIZE];

//sendPath = string("/home/ljy/1.txt");
cout<<endl<<"[path]";
getline(cin,sendPath);
FILE *sendfd = NULL;
sendfd = fopen(sendPath.data(),"r+");
if(sendfd == NULL)
{
perror("read the file to be send failed");
return;
}
else
{
/*按行读取文件,每行发送一次,发送格式为#BEGIN#-data-#EOF#*/
cout << "openfile success"<<endl;
sendPipe("#BEGIN#\n");
while((fgets(readBuf,BUFSIZE,sendfd))!=NULL)
{
puts(readBuf);
sendPipe(readBuf);
}
sendPipe("#EOF#\n");

fclose(sendfd);

}

}

void MainProc::showMenu()
{
cout << endl;
cout << "1.save the message in buffer into a file" <<endl;
cout << "2.send message to the server" << endl;
cout << "3.send file to the server" << endl;
cout << "0.disconnect to the server" <<endl<<endl;

}

/*信息接收进程建立*/
void MainProc::startRecvProc(RecvProc *recvProc)
{
recvPid = fork();
if(recvPid == 0)
{
for(;;)
{
recvProc->recvSock();
}
}
}

/*信息发送进程建立*/
void MainProc::startSendProc(SendProc *sendProc)
{
sendPid = fork();
if(sendPid == 0)
{
for(;;)
{
sendProc->sendSock();

}
}
}

/*主进程建立*/
void MainProc::startMainProc()
{
char select;
showMenu();
while(cout<<endl<<"[command]"&&cin >> select)
{
getchar();
switch(select)
{
case '?':
{
showMenu();
}break;
case '0':
{
delete this;
exit(0);
}break;
case '1':
{
saveFile();
}break;
case '2':
{
sendMsg();
}break;
case '3':
{
sendFile();
}break;
default:break;
}
}
}

main_proc.h

#ifndef MAIN_PROC_H
#define MAIN_PROC_H

#include "recv_proc.h"
#include "send_proc.h"

#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <error.h>
#include <list>
#include <queue>
#include <string>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/socket.h>
#include <string.h>
#include <signal.h>
#include <time.h>

using namespace std;

class MainProc
{
public:
MainProc();
~MainProc();
void menu();
int getMsgid();
int getSockfd();
int getMainpid();
int *getPipefd();
void startRecvProc(RecvProc *recvProc);
void startSendProc(SendProc *sendProc);
void startMainProc();
void sigMsgque();
private:
msgqueBuf sndMsg, rcvMsg;
int msgid,pipefd[2],sockfd;
int mainPid,recvPid,sendPid;
queue< pair<time_t,string> > socketData;
void saveFile();
void sendPipe(const char *msg);
void sendMsg();
void sendFile();
void showMenu();



};

#endif // MAIN_PROC_H

recv_proc.cpp

/*
* 接收进程,接收服务器送来的socket包,与时间戳配对后存入消息队列,
* 并通过信号通知主进程处理
*/

#include "recv_proc.h"

using namespace std;

#define MSQTYPE 1000
#define BUFSIZE 1024

RecvProc::RecvProc(int mid, int sfd, int mpid)
{
msgid = mid;
sockfd = sfd;
mainPid = mpid;
}

RecvProc::~RecvProc()
{

}
void RecvProc::recvSock()
{
/*socket接收*/
memset(buf, 0,sizeof(buf));
if(recv(sockfd,buf,sizeof(buf),0) <=0)
{
perror("socket recv error");
exit(EXIT_FAILURE);
}
/*格式化输出:[recv][time]data*/
time(&currentTime);
pTime = localtime(&currentTime);
cout <<endl<<"[recv]";
printf("[%4d%02d%02d %02d:%02d:%02d] ", pTime->tm_year + 1900,
pTime->tm_mon + 1, pTime->tm_mday, pTime->tm_hour,
pTime->tm_min, pTime->tm_sec);
cout<<buf << endl;
/*储存到消息队列*/
strcpy(sndMsg.mtext,buf);
sndMsg.mtype = MSQTYPE;
sndMsg.revTime = currentTime;
if(msgsnd(msgid, (struct msgbuf *)&sndMsg, BUFSIZE, 0) == -1)
{
perror("msgsnd error");
exit(EXIT_FAILURE);
}
/*发送信号给主进程,通知处理消息队列中的数据*/
kill(mainPid,SIGUSR1);
}

recv_proc.h

#ifndef SEND_PROC_H
#define SEND_PROC_H

#include <iostream>
#include <unistd.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <error.h>

using namespace std;

class SendProc
{
public:
SendProc(int sock, int pipe[]);
void sendSock();
private:
char buf[1024];
int sockfd,*pipefd;

};

#endif // SEND_PROC_H