挂接P2P通道-- ESFramework 4.0 进阶(08)

时间:2022-01-06 08:47:37

最新版本的ESFramework/ESPlus提供了基于TCP和UDP的P2P通道,而无论我们是使用基于TCP的P2P通道,还是使用基于UDP的P2P通道,ESPlus保证所有的P2P通信都是可靠的。这是因为ESPlus在原始UDP的基础上模拟TCP的机制进行了再次封装,以使UDP像TCP一样可靠。在客户端之间需要高频通信的分布式系统中(如IM系统等),可靠的P2P通信将为您节省巨大的带宽和服务器成本。详情请参见:ESFramework 开发手册(04) -- 可靠的P2P 以及 ESFramework 使用技巧 -- 部署P2P服务器 

ESFramework 4.0 进阶(07)-- 消息同步调用一文中我们介绍了客户端与服务器进行交互的一种常见情况:客户端向服务器发送请求消息,服务器处理完毕后返回应答消息给客户端。还有一种常见情况是,客户端需要发送一个消息给另外一个在线的用户。一般,这样的P2P消息是通过服务器中转的。很多情况下,中转不会有很大的问题,但是对于那种类似用户之间需要视频会话、文件传输等高频率、大尺寸消息交互的应用来说,所有的消息都经过服务器中转,就会大大地增加服务器的压力。对于这种需求,使用P2P通道是最常见的解决方案。即,用户与用户之间交互的消息通过P2P通道来发送。

一.P2P通道管理器IP2PChannelManager

  ESFramework使用ESFramework.Passive.IP2PChannelManager来管理所有的P2P通道,IP2PChannelManager接口定义如下:


    public interface IP2PChannelManager
    {    
        /// <summary>
        /// 通向目标用户的P2P通道是否可用。
        /// </summary>
        /// <param name="destUserID">目标用户ID</param>        
        bool P2PChannelUsable(string destUserID) ;         /// <summary>
        /// 通过P2PChannel发送消息。
        /// </summary>
        /// <param name="destUserID">目标用户ID</param>
        /// <param name="msg">要发送的消息</param>
        /// <param name="dataPriority">发送的优先级</param>
        void SendMessage(string destUserID, IMessage msg, DataPriority dataPriority);
    }   

如果应用中,不需要使用P2P通道,则可以直接使用null object模式的ESFramework.Passive.EmptyP2PChannelManager类。

实现P2P通道的最常见方式就是P2P打洞,而ESPlus提供了现成基于TCP的P2P通道管理器ESPlus.Application.P2PSession.Passive.Tcp.TcpChannelManager和基于UDP的P2P通道管理器ESPlus.Application.P2PSession.Passive.Udp.UdpChannelManager供我们使用。但是,用起来并不简单,要想成功的部署支持P2P的应用,还需要部署NAPT服务器和P2P服务器,以支持P2P打洞和P2P通道的建立。这些已经不是ESFramework的核心内容,这里就不展开了。

如果使用者能实现自己的P2P通道(比如基于UPnP),那么只要实现IP2PChannelManager接口,并挂接到MessageTransceiver就可以了。

二. 消息收发器IMessageTransceiver

ESFramework通过ESFramework.Passive.IMessageTransceiver来支持P2P通道的挂接,IMessageTransceiver的主要作用是向使用者屏蔽了P2P消息是经过服务器中转的还是经由P2P通道发送的,这使得底层的通道选择对于上层应用开发者是透明的。

IMessageTransceiver接口定义如下:


    public interface IMessageTransceiver
    {
        IP2PChannelManager P2PChannelManager { set; }
        IServerAgent ServerAgent { set; }
        IContractHelper ContractHelper { set; }
        IResponseManager ResponseManager { set; }   
        IMessagePipe MessagePipe { set; }         /// <summary>
        /// 提交数据。
        /// (1)如果为非P2P消息,则直接向服务器提交。
        /// (2)如果消息为P2P(即接收者是另一个在线用户),且目标用户的P2PChannel可用,则将通过P2PChannel向该用户发送。否则,也是直接向服务器提交。     
        /// </summary>
        /// <param name="msg">要提交的消息</param>
        /// <param name="dataPriority">消息发送的优先级</param>
        void CommitRequest(IMessage msg, DataPriority dataPriority);         /// <summary>
        /// 提交数据并返回应答。如果resMessageType不为null,且超时仍然没有回复,则抛出超时异常。
        /// (1)如果dataPriority != DataPriority.CanBeDiscarded ,则resMessageType只能为null。  
        /// (2)如果为非P2P消息,则直接向服务器提交。
        /// (3)如果消息为P2P(即接收者是另一个在线用户),且目标用户的P2PChannel可用,则将通过P2PChannel向该用户发送。否则,也是直接向服务器提交。     
        /// </summary>
        /// <param name="msg">要提交的消息</param>
        /// <param name="dataPriority">消息发送的优先级</param>
        /// <param name="resMessageType">期望应答消息的类型</param>
        /// <returns>应答消息</returns>     
        IMessage CommitRequest(IMessage msg, DataPriority dataPriority, int? resMessageType);         /// <summary>
        /// 向服务器提交数据。该消息即使消息为P2P,且P2P通道可用,也一定要经过服务器中转。
        /// </summary>
        /// <param name="requestMsg">要提交的消息</param>
        /// <param name="dataPriority">消息发送的优先级</param>
        /// <param name="resMessageType">期望应答消息的类型</param>
        /// <returns>应答消息</returns>
        IMessage CommitRequestToServer(IMessage requestMsg, DataPriority dataPriority, int? resMessageType);            
    }

(1)当消息的接收者(IMessage.Header.DestUserID)为其它在线用户时,MessageTransceiver会先查看是否存在可用的P2P通道,如果存在,则直接交由IP2PChannelManager发送;否则,还是通过IServerAgent发送给服务器中转。

(2)如果消息的接收者为服务器,则通过IServerAgent直接向服务器提交。

(3)如果消息为请求消息且需要应答,则使用第二个CommitRequest方法。注意,这个请求消息可能是被服务器处理,也可能是被另外一个在线客户端处理,这取决于消息接收者(IMessage.Header.DestUserID)是服务器还是另外一个在线用户。

(4)对于有些类型的P2P消息,我们的项目可能需要该消息必须经过服务器中转,即使对应的P2P通道存在,也一定要中转(比如,消息要被服务端记录),那么这个时候就可以直接调用CommitRequestToServer方法。

(5)IMessageTransceiver之所以要依赖于IMessagePipe,是因为要保证消息在使用P2P通道发送之前,也必须先经过消息骨架流程。

(6)IMessageTransceiver为何要依赖于回复管理器IResponseManager了,这是因为当请求消息经过P2P通道发送时,是不需要调用IServerAgent,所以MessageTransceiver需要自己从回复管理器中提取匹配的回复消息。

站在客户端的角度,我们看到从IRegularSender到IServerAgent、再到IMessageTransceiver,是一个逐步增强的过程,并且后一个组件的实现都是建立在前一个组件之上的。IRegularSender解决了发送的消息必须经过MessagePipe,IServerAgent对消息同步调用提供支持,而IMessageTransceiver又向使用者屏蔽了底层消息发送的通道。

还有一点要特别指出的是,P2P通道的接收端可以直接将接收到的消息交给MessageDispatcher去分派处理,这样就复用了我们的消息处理骨架流程。

三.示范代码

在客户端的编程开发中,发送消息时我们最好是使用IMessageTransceiver而不是IRegularSender或IServerAgent,即使暂时用不到P2P通道也没关系,以后如果有启动P2P通道的需求,那么代码就不用修改,只要在配置文件中使用正式的P2P通道管理器对象来替换EmptyP2PChannelManager即可。

最后,我们来构造一个IMessageTransceiver实例:


    IContractHelper contractHelper = ......;
    IResponseManager responseManager = ......;
    IServerAgent serverAgent = ......;
    IMessageTransceiver messageTransceiver = new MessageTransceiver();
    messageTransceiver.ContractHelper = contractHelper;
    messageTransceiver.P2PChannelManager = new EmptyP2PChannelManager();
    messageTransceiver.ResponseManager = responseManager;
    messageTransceiver.ServerAgent = serverAgent;     //IMessage response = messageTransceiver.CommitRequest(requestMessage, DataPriority.Common ,102);