Python学习之--socket续集

时间:2023-03-08 18:39:23
IO多路复用:

I/O多路复用指:通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。

一个很简单的linux例子,select,poll,epoll 都是IO多路复用的机制。

select

select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。
select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。
select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。
另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。 poll poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。
poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。
另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。 epoll 直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。
epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。
epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。
另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

IO多路复用使用的场合:

(1)当客户处理多个描述字时(一般是交互式输入和网络套接口),必须使用I/O复用。
(2)当一个客户同时处理多个套接口时,而这种情况是可能的,但很少出现。
(3)如果一个TCP服务器既要处理监听套接口,又要处理已连接套接口,一般也要用到I/O复用。
(4)如果一个服务器即要处理TCP,又要处理UDP,一般要使用I/O复用。
(5)如果一个服务器要处理多个服务或多个协议,一般要使用I/O复用。
与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。

 

Python中的IO多路复用:

Python中有一个select模块,其中提供了:select、poll、epoll三个方法,分别调用系统的 select,poll,epoll 从而实现IO多路复用。

Windows Python:
提供: select
Mac Python:
提供: select
Linux Python:
提供: select、poll、epoll

利用select监听终端操作实例:

#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'wang'
#f = file() , obj = socket(), sys.stdin = 终端输入
#select.select监听用户输入,如果用户输入内容,select 会感知 sys.sdtin 改变,将改变的文件句柄保存至列表,并将列表作为select第一个参数返回,如果用户未输入内容,select 第一个参数 = [], import select
import threading
import sys while True:
readable, writeable, error = select.select([sys.stdin,],[],[],1)
if sys.stdin in readable:
print 'select get stdin',sys.stdin.readline()

利用select实现伪同时处理多个Socket客户端请求:服务端

#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'wang'
import socket
import select sk1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sk1.bind(('127.0.0.1',8002))
sk1.listen(5)
sk1.setblocking(0) inputs = [sk1,] while True:
readable_list, writeable_list, error_list = select.select(inputs, [], inputs, 1)
for r in readable_list:
# 当客户端第一次连接服务端时
if sk1 == r:
print 'accept'
request, address = r.accept()
request.setblocking(0)
inputs.append(request)
# 当客户端连接上服务端之后,再次发送数据时
else:
received = r.recv(1024)
# 当正常接收客户端发送的数据时
if received:
print 'received data:', received
# 当客户端关闭程序时
else:
inputs.remove(r) sk1.close()

利用select实现伪同时处理多个Socket客户端请求:客户端

#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'wang'
import socket ip_port = ('127.0.0.1',8002)
sk = socket.socket()
sk.connect(ip_port) while True:
inp = raw_input('please input:')
sk.sendall(inp)
sk.close()

执行结果

Python学习之--socket续集Python学习之--socket续集

python进程池:multiprocessing.pool

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

使用进程池实例:
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'wang' import multiprocessing
import time def func(msg):
print "msg:", msg
time.sleep(3)
print "end" if __name__ == "__main__":
pool = multiprocessing.Pool(processes = 3)
for i in xrange(4):
msg = "hello %d" %(i)
pool.apply_async(func, (msg, )) #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去 print "SOS~ SOS~ SOS~~~~~~~~~~~~~~~~~~~~~~"
pool.close()
pool.join() #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
print "Sub-process(es) done."

执行结果:

SOS~ SOS~ SOS~~~~~~~~~~~~~~~~~~~~~~
msg: hello 0
msg: hello 1
msg: hello 2
end
#等待后出现
msg: hello 3
end
end
end
Sub-process(es) done.

函数解释

apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的
close() 关闭pool,使其不在接受新的任务。
terminate() 结束工作进程,不在处理未完成的任务。
join() 主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。

执行说明

创建一个进程池pool,并设定进程的数量为3,xrange(4)会相继产生四个对象[0, 1, 2, 4],四个对象被提交到pool中,因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当其中一个执行完事后才空出一个进程处理对象3,所以会出现输出“msg: hello 3”出现在"end"后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出“mMsg: SOS~ SOS~ SOS~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()处等待各个进程的结束。

 

使用进程池(阻塞)实例:
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'wang' import multiprocessing
import time def func(msg):
print "msg:", msg
time.sleep(3)
print "end" if __name__ == "__main__":
pool = multiprocessing.Pool(processes = 3)
for i in xrange(4):
msg = "hello %d" %(i)
pool.apply(func, (msg, )) #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去 print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
pool.close()
pool.join() #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
print "Sub-process(es) done."

执行结果:

msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
msg: hello 3
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.

 

Python线程

Threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。

小例子:

#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'wang'
import threading
import time def show(arg):
time.sleep(1)
print 'thread'+str(arg) for i in range(10):
t = threading.Thread(target=show, args=(i,)) #创建多线程
t.start() print 'main thread stop'

运行结果:

main thread stop
thread0thread1thread2 thread7
thread6thread5thread4thread3 thread9
thread8

结果是乱掉的,同时执行,没有锁,主机已经凌乱了~。~

代码创建了10个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令。

更多方法:

start        线程准备就绪,等待CPU调度
setName 为线程设置名称
getName 获取线程名称
setDaemon 设置为后台线程或前台线程(默认)
如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止
如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
join 逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
run 线程被cpu调度后执行Thread类对象的run方法

为了避免上述问题,引入了线程锁的概念

#未加线程锁代码,输出结果是凌乱的
import threading
import time gl_num = 0 def show(arg):
global gl_num
time.sleep(1)
gl_num +=1
print gl_num for i in range(10):
t = threading.Thread(target=show, args=(i,))
t.start() print 'main thread stop' #加上线程锁的代码
import threading
import time gl_num = 0 lock = threading.RLock() def Func():
    #上锁
lock.acquire()
global gl_num
gl_num +=1
time.sleep(1)
print gl_num
    #解锁
lock.release() for i in range(10):
t = threading.Thread(target=Func)
t.start()

 

多线程与多进程的区别:

多线程之间可以共享内存,进程之间无法共享内存,都是为了让多个CPU同时处理某个请求,如果想利用多CPU,则需要写多进程。IO密集型,用多线程,计算密集型用多进程,由于进程之间的数据需要各自持有一份,所以创建进程需要的非常大的开销。

 

事件:

python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

clear:将“Flag”设置为False
set:将“Flag”设置为True

小例子:

#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'wang'
import threading def do(event):
print 'start'
event.wait()
print 'execute' event_obj = threading.Event()
for i in range(10):
#创建10个线程对象
t = threading.Thread(target=do, args=(event_obj,))
#执行start,执行完之后进入wait
t.start() event_obj.clear()
inp = raw_input('input:')
if inp == 'true':
#当输入为true时,执行execute
event_obj.set()

输出结果:

start
start
start
start
start
start
start
start
start
start
input:true #输入为true时,执行下面的execute
execute
executeexecuteexecute executeexecuteexecute executeexecuteexecute