事件驱动之Twsited异步网络框架

时间:2023-03-09 03:53:22
事件驱动之Twsited异步网络框架

在这之前先了解下什么是事件驱动编程

传统的编程是如下线性模式的:
开始--->代码块A--->代码块B--->代码块C--->代码块D--->......--->结束
每一个代码块里是完成各种各样事情的代码,但编程者知道代码块A,B,C,D...的执行顺序,唯一能够改变这个流程的是数据。输入不同的数据,根据条件语句判断,流程或许就改为A--->C--->E...--->结束。每一次程序运行顺序或许都不同,但它的控制流程是由输入数据和你编写的程序决定的。如果你知道这个程序当前的运行状态(包括输入数据和程序本身),那你就知道接下来甚至一直到结束它的运行流程。
对于事件驱动型程序模型,它的流程大致如下:
开始--->初始化--->等待
与上面传统编程模式不同,事件驱动程序在启动之后,就在那等待,等待什么呢?等待被事件触发。传统编程下也有“等待”的时候,比如在代码块D中,你定义了一个input(),需要用户输入数据。但这与下面的等待不同,传统编程的“等待”,比如input(),你作为程序编写者是知道或者强制用户输入某个东西的,或许是数字,或许是文件名称,如果用户输入错误,你还需要提醒他,并请他重新输入。事件驱动程序的等待则是完全不知道,也不强制用户输入或者干什么。只要某一事件发生,那程序就会做出相应的“反应”。这些事件包括:输入信息、鼠标、敲击键盘上某个键还有系统内部定时器触发。

Twsited异步网络框架

Twisted是一个事件驱动的网络框架,其中包含了诸多功能,例如:网络协议、线程、数据库管理、网络操作、电子邮件等。事件驱动之Twsited异步网络框架

事件驱动

简而言之,事件驱动分为二个部分:第一,注册事件;第二,触发事件。

自定义事件驱动框架,命名为:“弑君者”:

#!/usr/bin/env python
# -*- coding:utf-8 -*- # event_drive.py event_list = [] def run():
for event in event_list:
obj = event()
obj.execute() class BaseHandler(object):
"""
用户必须继承该类,从而规范所有类的方法(类似于接口的功能)
"""
def execute(self):
raise Exception('you must overwrite execute')

程序员使用“弑君者框架”:

#!/usr/bin/env python
# -*- coding:utf-8 -*- from source import event_drive class MyHandler(event_drive.BaseHandler): def execute(self):
print 'event-drive execute MyHandler' event_drive.event_list.append(MyHandler) # 注册一个事件
event_drive.run() # 执行事件

Protocols

Protocols描述了如何以异步的方式处理网络中的事件。HTTP、DNS以及IMAP是应用层协议中的例子。Protocols实现了IProtocol接口,它包含如下的方法:

makeConnection               在transport对象和服务器之间建立一条连接
connectionMade 连接建立起来后调用
dataReceived 接收数据时调用
connectionLost 关闭连接时调用

Transports

Transports代表网络中两个通信结点之间的连接。Transports负责描述连接的细节,比如连接是面向流式的还是面向数据报的,流控以及可靠性。TCP、UDP和Unix套接字可作为transports的例子。它们被设计为“满足最小功能单元,同时具有最大程度的可复用性”,而且从协议实现中分离出来,这让许多协议可以采用相同类型的传输。Transports实现了ITransports接口,它包含如下的方法:

write                   以非阻塞的方式按顺序依次将数据写到物理连接上
writeSequence 将一个字符串列表写到物理连接上
loseConnection 将所有挂起的数据写入,然后关闭连接
getPeer 取得连接中对端的地址信息
getHost 取得连接中本端的地址信息

将transports从协议中分离出来也使得对这两个层次的测试变得更加简单。可以通过简单地写入一个字符串来模拟传输,用这种方式来检查。

EchoServer

from twisted.internet import protocol
from twisted.internet import reactor #reactor是twisted事件循环的核心,它提供了一些服务的基本接口,像网络通信、线程和事件的分发 class Echo(protocol.Protocol): #必须自己定义一个类,继承protocol,成为protocol的子类
def dataReceived(self, data): # 只要twisted收到数据就会调用此方法
self.transport.write(data) #写入数据到物理链接 类似send发送数据 def main():
factory = protocol.ServerFactory() #protocol.ServerFactor是一个基础工厂类,里面为空,但是你必须要定义,将你自定义的类作为参数传入
factory.protocol = Echo #定义类下的protocol变量的值,默认在ServerFactory里protocol字段值是None,在此将类的内存地址传入给protocol
reactor.listenTCP(1234,factory)#reactor绑定端口和传入protocol的值(类名),客户端连接过来的操作在自定义的类里完成
reactor.run() # 启动分发器 | 类似于select的监听,有连接就触发重新定义类里的操作 (因为在上面已经绑定了类) if __name__ == '__main__':
main()

EchoClient

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from twisted.internet import reactor, protocol # a client protocol class EchoClient(protocol.Protocol):
"""Once connected, send a message, then print the result."""
def connectionMade(self): #连接一建立成功,就会自动调用 此方法
print("connection is build, sending data...")
self.transport.write(bytes("hello alex!","utf8")) # 向server端send一条消息 def dataReceived(self, data): # 接受server端返回的消息
"As soon as any data is received, write it back."
print ("Server said:", data.decode())
self.transport.loseConnection()# 调用该方法,自动执行conectionLost方法
# exit('exit') def connectionLost(self, reason):
print ("====connection lost===") class EchoFactory(protocol.ClientFactory): # client也必须定义自己的类,继承ClientFactory类,重写方法
protocol = EchoClient #父类调用了protocol,但是在父类里protocol为空,类似于handle,所以你要在这里重新给protocol赋值(类的内存地址),客户端执行的操作在该这定义 def clientConnectionFailed(self, connector, reason): # 父类里的该方法为空,你必须自己定义 (如果连接失败了,直接执行该方法)
print ("Connection failed - goodbye!")
reactor.stop() def clientConnectionLost(self, connector, reason): # 同理(如果连接关闭了,执行该方法,类似于socketserver的finish)
print ("Connection lost - goodbye!")
reactor.stop() # this connects the protocol to a server running on port 8000
def main():
f = EchoFactory() # 实例化自定义的类
reactor.connectTCP("localhost", 1234, f) #连接server,传入类
reactor.run() # this only runs if the module was *not* imported
if __name__ == '__main__':
main()

运行服务器端脚本将启动一个TCP服务器,监听端口1234上的连接。服务器采用的是Echo协议,数据经TCP transport对象写出。运行客户端脚本将对服务器发起一个TCP连接,回显服务器端的回应然后终止连接并停止reactor事件循环。这里的Factory用来对连接的双方生成protocol对象实例。两端的通信是异步的,connectTCP负责注册回调函数到reactor事件循环中,当socket上有数据可读时通知回调处理。

其实在这里也可以用上次socketclient的脚本来访问server,只不过twisted里实现了这种方法 ,上述是按照他规定的方法来完成交互。

一个传送文件的例子 

server side

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#_*_coding:utf-8_*_
# This is the Twisted Fast Poetry Server, version 1.0 import optparse, os # optparse 类似于sys.argv 处理用户的输入的参数 from twisted.internet.protocol import ServerFactory, Protocol def parse_args():
usage = """usage: %prog [options] poetry-file This is the Fast Poetry Server, Twisted edition.
Run it like this: python twisted_sendfile.py <path-to-poetry-file> If you are in the base directory of the twisted-intro package,
you could run it like this: python twisted-server-1/fastpoetry.py poetry/ecstasy.txt to serve up John Donne's Ecstasy, which I know you want to do.
""" parser = optparse.OptionParser(usage) #创建实例
help = "The port to listen on. Default to a random available port."
parser.add_option('--port', type='int', help=help)#设置参数,如果用户输入的是--port,规定必须为int
help = "The interface to listen on. Default is localhost."
parser.add_option('--iface', help=help, default='localhost')# 指定地址 默认localhost
options, args = parser.parse_args() # 解析用户输入的参数
print("--arg:",args) #解析用户除参数以为的字符串,这里为文件名
print("-->",options) #解析用户输入的参数,类型为字典--> {'iface': 'localhost', 'port': 123}
if len(args) != 1: #判断用户是否输入文件里,这里只能指定一个文件
parser.error('Provide exactly one poetry file.')
poetry_file = args[0] if not os.path.exists(args[0]): #判断文件是否存在
parser.error('No such file: %s' % poetry_file) return options, poetry_file #返回用户输入的字符串 class PoetryProtocol(Protocol): #handle
def connectionMade(self):
self.transport.write(self.factory.poem)
self.transport.loseConnection() class PoetryFactory(ServerFactory): #基础类
protocol = PoetryProtocol# 导入自定义的类,因为ServerFactory类里并没有做任何事情,所以我们只能亲历而为了
def __init__(self, poem):#因为基类无法添加形参,自定义一个构造方法,让用户传入文件内容
self.poem = poem def main():
options, poetry_file = parse_args()#options 用户传入的参数,poetry_file 文件名
poem = open(poetry_file).read() # 打开文件
factory = PoetryFactory(bytes(poem,"utf8")) # 初始化基类,传入读取的数据
from twisted.internet import reactor # 导入事件分发器
port = reactor.listenTCP(options.port or 9000, factory,
interface=options.iface) # 绑定端口,默认9000,interface 指定主机地址
print ('Serving %s on %s.' % (poetry_file, port.getHost()))
reactor.run() if __name__ == '__main__':
main()

client side

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# This is the Twisted Get Poetry Now! client, version 3.0. # NOTE: This should not be used as the basis for production code. import optparse from twisted.internet.protocol import Protocol, ClientFactory def parse_args():
usage = """usage: %prog [options] [hostname]:port ... This is the Get Poetry Now! client, Twisted version 3.0
Run it like this: python get-poetry-1.py port1 port2 port3 ... 指定服务器端的端口,可以同时接受多个文件
"""
parser = optparse.OptionParser(usage)
_, addresses = parser.parse_args() # _表示过滤掉第一个值
print('==addr:',_,addresses) #==addr: {} ['111', '222', '333']
if not addresses:# 如果没输入地址,打印help
print (parser.format_help())
parser.exit() def parse_address(addr):# 区分用户指定的主机和端口,默认为127.0.0.1
if ':' not in addr:
host = '127.0.0.1'
port = addr
else:
host, port = addr.split(':', 1)
if not port.isdigit():
parser.error('Ports must be integers.')
return host, int(port) # 返回(主机地址,端口)
#return parse_address(addresses)
return map(parse_address, addresses)
class PoetryProtocol(Protocol): # handle
poem = ''
def dataReceived(self, data): # 接受数据
self.poem += data
#self.factory = PoetryClientFactory
print('[%s] recv:[%s]' %(self.transport.getPeer(),len(self.poem))) def connectionLost(self, reason): # 连接关闭后执行此函数
self.poemReceived(self.poem) def poemReceived(self, poem): #
self.factory.poem_finished(poem)
class PoetryClientFactory(ClientFactory):#定义基类继承指定的类
protocol = PoetryProtocol #绑定和server交互的类
def __init__(self, callback):
self.callback = callback
def poem_finished(self, poem):
self.callback(poem) #执行回调的那个函数
#self.get_poem(poem)
def get_poetry(host, port, callback):
"""
Download a poem from the given host and port and invoke
callback(poem)
when the poem is complete.
"""
from twisted.internet import reactor
factory = PoetryClientFactory(callback) # 实例化ProtryClientFactory 将传入的回调函数传入
reactor.connectTCP(host, port, factory) # 链接
def poetry_main():
addresses = parse_args() #((172.0.0.1,9000),(...))
from twisted.internet import reactor
poems = []
def got_poem(poem):
poems.append(poem)
if len(poems) == len(addresses):
reactor.stop() for address in addresses:#循环列表 获取地址和端口
host, port = address
get_poetry(host, port, got_poem) # 将got_poem函数链接函数
reactor.run() print("main loop done...")
#for poem in poems:
# Eprint poem if __name__ == '__main__':
poetry_main()