一个基于python的即时通信程序

时间:2023-03-09 01:57:02
一个基于python的即时通信程序

5月17日更新:

广播信息、用户列表、信息确认列表以及通信信息,从原来的用字符串存储改为使用字典来存储,使代码更清晰,更容易扩展,具体更改的格式如下:

广播信息(上线):
{
'status': 信息状态标志,
'user_info': 本机的用户名和主机名,
'pub_key': 本机生成的公钥,
}
广播信息(下线):
{
'status': 信息状态标志,
'user_info': 本机的用户名和主机名,
} 用户列表的元素:
{
'user_info': 对应用户的用户名和主机名,
'pub_key': 对应用户的公钥,
'addr': 用户对应的ip,
} 信息确认列表的元素:
{
'confirm_seq': 信息序列号,
'user': 发送信息的用户的用户名,
'msg': 发送的信息,
'addr': 信息的目的ip和端口,
} 通信信息:
{
'status': 信息序列号,
'user': 发送信息的用户的用户名,
'msg': 发送的信息,
}

更新后的代码如下:

!/usr/bin/env python
#coding=utf-8
#author: cjyfff
#blog: http://www.cnblogs.com/cjyfff/ import socket
import os
import threading
import traceback
import rsa user_list = []
confirm_list = []
username = os.environ['USER']
hostname = os.popen('hostname').read()
(pubkey, privkey) = rsa.newkeys(1024)
pub = pubkey.save_pkcs1() class MyThread(threading.Thread):
'''这个类用于创建新的线程''' def __init__(self, func, args, name=''):
threading.Thread.__init__(self)
self.name = name
self.func = func
self.args = args def run(self):
apply(self.func, self.args) def broadcast(broADDR, status):
'''发送广播信息模块
用于发送广播信息给其他主机,通知其他主机本主机上线\下线状态,以及发送本机的信息给其他主机。
这个模块会在广播信息前添加上status这个参数的值。在本程序中,当需要通知其他主机,本机已经上线时,
会传递"online"给status,当需要通知其他主机本机即将下线时,会传递"offline"给status。
'''
global username, hostname, pub def broadcast_send(oMsg):
udpSock2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udpSock2.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
udpSock2.sendto(oMsg, broADDR)
oMsg = {}
if status == 'online':
oMsg = {
'status': status,
'user_info': ' '.join([username, hostname]),
'pub_key': pub,
}
broadcast_send(str(oMsg))
elif status == 'offline':
oMsg = {
'status': status,
'user_info': ' '.join([username, hostname]),
}
broadcast_send(str(oMsg)) def recv_msg(localADDR, BUFSIZ, udpSock, port):
'''信息接收模块
这个模块的主要功能是,跟据接收到的广播信息更新用户列表,以及处理对端发送过来信息
'''
global user_list, confirm_list, username, hostname, pub, privkey while True:
try:
data, addr = udpSock.recvfrom(BUFSIZ)
except:
break
if data:
data = eval(data)
addr = addr[0]
if data['status'] == 'online':
user_list_info = {
'user_info': data['user_info'],
'pub_key': data['pub_key'],
'addr': addr,
}
if user_list_info not in user_list:
user_list.append(user_list_info)
# 把对方添加上用户列表的同时,还要把自己的信息发给对方,以便对方更新用户列表
respond_msg = {
'status': 'respon_online',
'user_info': ' '.join([username, hostname]),
'pub_key': pub,
}
udpSock.sendto(str(respond_msg), (addr, port)) elif data['status'] == 'offline':
user_list_info = {
'user_info': data['user_info'],
}
for i in xrange(len(user_list)):
for k, v in user_list[i].iteritems():
if user_list_info['user_info'] == v:
del user_list[i] elif data['status'] == 'respon_online':
user_list_info = {
'user_info': data['user_info'],
'pub_key': data['pub_key'],
'addr': addr,
}
if user_list_info not in user_list:
user_list.append(user_list_info) elif data['status'] == 'quit':
print "对方已断开连接,请输入'quit'或'q'返回主菜单"
continue elif data['status'] == 'local_quit':
continue else:
confirm_msg = data['status']
# 假如收到的确认标志和确认表中的某项匹配,删除该项
for i in xrange(len(confirm_list)):
if confirm_list[i]['confirm_seq'] == confirm_msg:
del confirm_list[i]
if not data['msg']:
continue
addr_list = []
for x in user_list:
# 提取出用户表中所有用户的地址,存到addr_list中:
addr_list.append(x['addr']) # 检查发送信息的用户的地址是否在用户列表当中:
if addr in addr_list:
# 反馈收到确认信息给对方:
confirm_res = {'status': confirm_msg, 'msg': 0}
udpSock.sendto(str(confirm_res), (addr, port))
# 打印信息:
data_user = data['user']
try:
data_msg = rsa.decrypt((data['msg']), privkey)
except DecryptionError:
print "解码出现异常,请重新连接"
continue
print data_user, ":", data_msg def print_userlist():
'''打印用户列表模块'''
global user_list
user_list_len = len(user_list)
print "当前有%d个用户在线:" % user_list_len
for i in xrange(user_list_len):
print "ID:", i+1, ":", user_list[i]['user_info'].strip('\n'), \
"come from:", user_list[i]['addr'] def send_msg(udpSock, cli_addr, cli_pub_key, port):
'''信息发送模块'''
import random
global username, confirm_list
quit_list = ['q', 'quit', 'exit']
cli_pub_key_rip = rsa.PublicKey.load_pkcs1(cli_pub_key) while True:
msg = raw_input("> ")
if msg in quit_list:
# quit_msg_to_local用于通知本机对话结束,回收socket
quit_msg_to_local = {'status': 'local_quit'}
quit_msg_to_cli = {'status': 'quit'}
udpSock.sendto(str(quit_msg_to_local), ('localhost', port))
udpSock.sendto(str(quit_msg_to_cli), cli_addr)
break random_num = random.randint(0, 1000)
msg = rsa.encrypt(msg, cli_pub_key_rip)
output_msg = {
'status': str(random_num),
'user': username,
'msg': msg,
}
confirm_list_member = {
'confirm_seq': str(random_num),
'user': username,
'msg': msg,
'addr': cli_addr,
}
confirm_list.append(confirm_list_member) udpSock.sendto(str(output_msg), cli_addr) def confirm_successd(udpSock):
'''确认信息到达模块
采用类似于最久未使用(LRU)算法,每隔5秒钟检查一下信息确认列表(confirm_list),当信息确认列表长度大于5时(
也就是说未确认接收的信息大于5),把信息确认列表前一半的信息再一次发送。
'''
import time
global confirm_list while True:
confirm_list_len = len(confirm_list)
if confirm_list_len > 5:
for i in xrange(confirm_list_len/2):
repeat_output_msg = {
'status': confirm_list[i]['confirm_seq'],
'user': confirm_list[i]['user'],
'msg': confirm_list[i]['msg'],
}
#msg = confirm_list[i][0]
addr = confirm_list[i]['addr']
udpSock.sendto(str(repeat_output_msg), addr)
time.sleep(5)
else:
time.sleep(5) def option(udpSock, BUFSIZ, broADDR, port):
'''选项菜单模块'''
while True:
print '''
请输入您的选项:
1 显示用户列表
2 连接到指定用户,并开始对话
3 退出
'''
action = raw_input("> ")
if action is '':
print_userlist() elif action is '':
client_option = raw_input("您想连接到哪个用户?,请输入对应的id号:\n")
try:
# 获取对端的地址
cli_addr = (user_list[int(client_option)-1]['addr'], port)
cli_pub_key = user_list[int(client_option)-1]['pub_key']
except IndexError:
print "没有这个用户,请重新选择:"
continue
print "已建立好连接,可以开始对话,输入quit或q可以结束会话"
threads = []
t2 = MyThread(send_msg, (udpSock, cli_addr, cli_pub_key, port), send_msg.__name__)
threads.append(t2)
t3 = MyThread(confirm_successd, (udpSock, ), confirm_successd.__name__)
threads.append(t3)
for t in threads:
t.setDaemon(True)
t.start()
t2.join()#send_msg中止之前,让父线程一直在阻塞状态
print "连接中断,返回主菜单" elif action is '':
broadcast(broADDR, 'offline')
udpSock.close()
print "再见!"
break else:
pass def main():
'''主函数'''
host = ''
port = 2425
broADDR = ('<broadcast>', port)
localADDR = (host, port)
BUFSIZ = 1024
try:
broadcast(broADDR, 'online')
udpSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udpSock.bind(localADDR)
t1 = MyThread(recv_msg, (localADDR, BUFSIZ, udpSock, port, ),
recv_msg.__name__)
t1.setDaemon(True)
t1.start()
option(udpSock, BUFSIZ, broADDR, port)
except (KeyboardInterrupt, SystemError):
udpSock.close()
raise
except:
traceback.print_exc if __name__ == '__main__':
main()

cjyfffIM_v0.3

4月23日更新:

已实现RSA加密功能

4月18日:

额。。。本来想用弄一个类似于“飞鸽传书”那样的软件的,目前已经实现了一部分功能了,还有一部分功能没有实现,暂时把这篇文章当作是开发文档,以后添加了新功能后再修改这篇文章吧。有什么错漏的地方请各位大牛提出哈。

目前已经实现了的功能:

1 自动发现局域网内也运行了本程序的机器,把该机器添加到客户列表,假如对方下线的话,自动在客户列表中删除对应的信息。

2 具备确认机制,对方收到我方发送的信息后会反馈回来确认信息,没有收到确认的信息将会在一段时间之后重新发送。

3 信息采用RSA加密

待实现的功能:

1 实现文件传输功能

2 优化代码,使代码的可读性增强

程序各个模块的简单逻辑关系如下图:

一个基于python的即时通信程序

各个模块功能表述:

一、选项菜单模块(option)

这个模块有3个选项,分别是:

选项1 打印用户列表。通过调用print_userlist()函数把当前用户列表中的用户打印出来。

选项2 与指定的用户建立连接。根据用户输入的id号,与用户列表中的指定用户建立连接。

选项3 退出程序。在退出前首先会调用发送广播信息模块(broadcast),向局域网广播一条信息通知本机即将下线,然后关闭socket,最后再退出程序。

二、发送广播信息模块(broadcast)

这个模块的作用是在程序启动(退出)时,向局域网内的其他机器发送广播,通知其他机器在各自的用户列表中添加(删除)此用户。

假设本机的用户名是Mike,主机名是Mike‘PC

本机上线的广播信息将是:online^Mike Mike’PC‘^Mike’PC的rsa公钥

本机下线的广播信息将是:offline^Mike Mike’PC’

三、信息发送模块(send_msg)

这个模块运行在一个循环当中,不断的处理用户的输入。

假如用户输入退出指令('q', 'quit', 'exit'),这时候这个模块首先向本机发送一个“local^quit”信息,让本机的信息接收模块(recv_msg)停止接收数据,同时发送一个“quit”给对方,通知对方连接即将中断,然后退出循环,让程序回到选项菜单模块(option)。

假如用户输入的不是退出指令,那么就认为用户将要发送的是正常信息。这里要提一下这个程序中确认机制的实现原理:本机在发送一个消息出去的时候,会在消息的头部加上一个(0~9999)的随机数作为确认标记,同时把这个消息添加到信息确认列表(confirm_list)。对端收到这条消息后,会把确认标记发送回来,然后本机就会根据所接收到的确认标记删除信息确认列表(confirm_list)所对应的条目,这样就认为一条消息对方已经成功接收。

回到具体实现的过程,这个模块会在输入的信息之前加上一个(0~9999)的随机数作为标记,同时加上用户名。例如本机Mike用户向对端一个ip地址为192.168.1.10的用户发送一个“Hello”,那么经这个模块发送出去的信息可能是这样:“1255^Mike^Hello”。同时这个模块会在信息确认列表(confirm_list)中添加上“[1255^Mike^Hello,192.168.1.10]”这样的一条记录。

四、信息接收模块(recv_msg)

这个模块的主要功能是,跟据接收到的广播信息更新用户列表(confirm_list),以及处理对端发送过来信息。

假如收到以“online”开头的信息,这个模块会认为这是对端发送过来的通知上线的广播信息,于是便会在信息中提取出用户名以及主机名,再加上对端的ip地址和端口,添加到用户列表中。并且以一条以“respon_online”开头的信息反馈给对方本机的信息,以便对方也可以更新用户列表。例如收到从192.168.1.11发送过来的一条“online^Kate Kate'PC'^Kate'PC'的rsa公钥”这样一条广播信息后,本机将在用户列表中添加上“[['Kate Kate'PC', Kate'PC'的rsa公钥], ('192.168.1.11', 12345)]”(这个端口号是随机分配的),同时本机返回一条这样的信息给对方:respon_online^'Mike Mike'PC'^Mike'PC'的rsa公钥。

假如是本机收到以“respon_online”开头的信息的话,那就跟上面“online”的情况一样,提取出用户名、主机名、ip地址和端口,添加到用户列表(confirm_list)上。

假如收到的是以“offline”开头的信息,就提取出用户名、主机名、ip地址和端口,检查用户列表(confirm_list)中有没有对应的条目,假如有的话就删除掉对应的条目。

假如收到的是“quit”信息,说明对端即将断开连接,这个时候本模块将提示用户输入退出命令,以便退出连接。

假如收到的是“local^quit”信息,说明本机即将断开连接,这个时候本模块将返回模块的开头,准备接收新的信息。

假如接收到的信息不满足以上的条件,就会被认为是用户间发送的正常消息:

首先要提取消息头部的确认标志。如果收到的信息除了确认标志外没有其他内容了,那么这条消息会被认为是对端在收到本机发送出去的信息后,反馈回来的确认信息,因此接下来的工作就是根据确认标志,查找信息确认列表(confirm_list)所对应的条目并删除。

假如处理确认标志外还有其他内容,那么这条信息就是对端用户所输入的信息,于是首先提取出确认标志返回给对端,然后再本机上打印出对方所输入的内容。

五、确认信息到达模块(confirm_successd)

这个模块采用类似于最久未使用(LRU)算法,每隔5秒钟检查一下信息确认列表(confirm_list),当信息确认列表长度大于5时(也就是说未确认接收的信息大于5),把信息确认列表前一半的信息再一次发送。

最后是这个程序的代码:

#! /usr/bin/env python
#coding=utf-8
#author: cjyfff
#blog: http://www.cnblogs.com/cjyfff/ import socket
import os
import pwd
import threading
import traceback
import random
import time
import rsa user_list = []
confirm_list = []
username = pwd.getpwuid(os.getuid())[0]
hostname = os.popen('hostname').read()
(pubkey, privkey) = rsa.newkeys(1024)
pub = pubkey.save_pkcs1() class MyThread(threading.Thread):
'''这个类用于创建新的线程''' def __init__(self, func, args, name=''):
threading.Thread.__init__(self)
self.name = name
self.func = func
self.args = args def run(self):
apply(self.func, self.args) def broadcast(broADDR, status):
'''发送广播信息模块
用于发送广播信息给其他主机,通知其他主机本主机上线\下线状态,以及发送本机的信息给其他主机。
这个模块会在广播信息前添加上status这个参数的值。在本程序中,当需要通知其他主机,本机已经上线时,
会传递"online"给status,当需要通知其他主机本机即将下线时,会传递"offline"给status。
'''
global username, hostname, pub def broadcast_send(oMsg):
udpSock2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udpSock2.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
udpSock2.sendto(oMsg, broADDR) if status == 'online':
oMsg = status + "^" + username + ' ' + hostname + "^" + pub
broadcast_send(oMsg)
elif status == 'offline':
oMsg = status + "^" + username + ' ' + hostname
broadcast_send(oMsg) def recv_msg(localADDR, BUFSIZ, udpSock, port):
'''信息接收模块
这个模块的主要功能是,跟据接收到的广播信息更新用户列表,以及处理对端发送过来信息
'''
global user_list, confirm_list, username, hostname, pub, privkey while True:
try:
data, addr = udpSock.recvfrom(BUFSIZ)
except:
break if data.startswith('online'):
data = data.split('^')[1:]
if [data, addr] not in user_list:
user_list.append([data, addr])
# 把对方添加上用户列表的同时,还要把自己的信息发给对方,以便对方把更新用户列表
res_msg = 'respon_online^' + username + ' ' + hostname + "^" + pub
udpSock.sendto(res_msg, (addr[0], port)) elif data.startswith('offline'):
data = data.split('^')[1]
for i in xrange(len(user_list)):
if user_list[i][0][0] == data:
del user_list[i] elif data.startswith('respon_online'):
data = data.split('^')[1:]
if [data, addr] not in user_list:
user_list.append([data, addr]) elif data == 'quit':
print "对方已断开连接,请输入'quit'或'q'返回主菜单"
continue elif data == 'local^quit':
continue else:
confirm_recv = data.split('^')[0]
# 假如收到的确认标志和确认表中的某项匹配,删除该项
for i in xrange(len(confirm_list)):
if confirm_list[i][0].split('^')[0] == confirm_recv:
del confirm_list[i]
data = data.split('^')[1:]
if not data:
continue
addr_list = []
for x in user_list:
# 提取出用户表中所有用户的地址,存到addr_list中:
addr_list.append(x[1][0])
addr = addr[0]
# 检查发送信息的用户的地址是否在用户列表当中:
if addr in addr_list:
# 反馈收到确认信息给对方:
udpSock.sendto(str(confirm_recv), (addr, port))
# 打印信息:
data_name = data[0]
data_msg = rsa.decrypt((data[1]), privkey)
print data_name, ":", data_msg def print_userlist():
'''打印用户列表模块'''
global user_list
print "当前有%d个用户在线:" % len(user_list)
for i in xrange(len(user_list)):
print "ID: ", i+1, ":", user_list[i][0][0] def send_msg(udpSock, cli_addr, cli_pub, port):
'''信息发送模块'''
global username, user_list, confirm_list
quit_list = ['q', 'quit', 'exit']
cli_pubkey = rsa.PublicKey.load_pkcs1(cli_pub) while True:
msg = raw_input("> ")
if msg in quit_list:
udpSock.sendto('local^quit', ('localhost', port))
udpSock.sendto('quit', cli_addr)
break random_num = random.randint(0, 1000)
msg = rsa.encrypt(msg, cli_pubkey)
out_msg = '%s' % random_num + '^' + username + '^' + msg
confirm_list.append([out_msg, cli_addr])
udpSock.sendto(out_msg, cli_addr) def confirm_successd(udpSock):
'''确认信息到达模块
采用类似于最久未使用(LRU)算法,每隔5秒钟检查一下信息确认列表(confirm_list),当信息确认列表长度大于5时(
也就是说未确认接收的信息大于5),把信息确认列表前一半的信息再一次发送。
'''
global confirm_list while True:
lenght = len(confirm_list)
if lenght > 5:
for i in xrange(lenght/2):
msg = confirm_list[i][0]
addr = confirm_list[i][1]
udpSock.sendto(msg, addr)
time.sleep(5)
else:
time.sleep(5) def option(udpSock, BUFSIZ, broADDR, port):
'''选项菜单模块'''
while True:
print '''
输入您的选项:
显示用户列表
连接到指定用户,并开始对话
退出
'''
action = raw_input("> ")
if action is '':
print_userlist() elif action is '':
client_id = raw_input("您想连接到哪个用户?,请输入对应的id号:\n")
try:
# 获取对端的地址
cli_addr = (user_list[int(client_id)-1][1][0], port)
cli_pub = user_list[int(client_id)-1][0][1]
except IndexError:
print "没有这个用户,请重新选择:"
continue
print "已建立好连接,可以开始对话"
threads = []
t2 = MyThread(send_msg, (udpSock, cli_addr, cli_pub, port), send_msg.__name__)
threads.append(t2)
t3 = MyThread(confirm_successd, (udpSock, ), confirm_successd.__name__)
threads.append(t3)
for t in threads:
t.setDaemon(True)
t.start()
t2.join()#send_msg中止之前,让父线程一直在阻塞状态 print "连接中断,返回主菜单" elif action is '':
broadcast(broADDR, 'offline')
udpSock.close()
print "再见!"
break else:
pass def main():
'''主函数'''
host = ''
port = 2425
broADDR = ('<broadcast>', port)
localADDR = (host, port)
BUFSIZ = 1024
try:
broadcast(broADDR, 'online')
udpSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udpSock.bind(localADDR)
t1 = MyThread(recv_msg, (localADDR, BUFSIZ, udpSock, port, ),
recv_msg.__name__)
t1.setDaemon(True)
t1.start()
option(udpSock, BUFSIZ, broADDR, port)
except (KeyboardInterrupt, SystemError):
udpSock.close()
raise
except:
traceback.print_exc if __name__ == '__main__':
main()

cjyfffIM_v0.1