port= 80 ) def sendPoolRequest(req: HttpRequest

时间:2022-02-19 08:22:50

   Akka-http的客户端Api应该是以HttpRequest操纵为主轴的网上动静交换模式编程工具。我们知道:Akka-http是搭建在Akka-stream之上的。所以,Akka-http在客户端构建与处事器的连接通道也可以用Akka-stream的Flow来暗示。这个Flow可以通过挪用Http.outgoingConnection来获取:

/** * Creates a [[akka.stream.scaladsl.Flow]] representing a prospective HTTP client connection to the given endpoint. * Every materialization of the produced flow will attempt to establish a new outgoing connection. * * To configure additional settings for requests made using this method, * use the `akka.http.client` config section or pass in a [[akka.http.scaladsl.settings.ClientConnectionSettings]] explicitly. */ def outgoingConnection(host: String, port: Int = 80, localAddress: Option[InetSocketAddress] = None, settings: ClientConnectionSettings = ClientConnectionSettings(system), log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = _outgoingConnection(host, port, settings.withLocalAddressOverride(localAddress), ConnectionContext.noEncryption(), ClientTransport.TCP, log)

我们看到:这个函数实现了对Server端地点host+port的设定,,返回的功效类型是Flow[HttpRequest,HttpResponse,Future[OutgoingConnection]]。这个Flow代表将输入的HttpRequest转换成输出的HttpResponse。这个转换过程包孕了与Server之间的Request,Response动静交换。下面我们试着用这个Flow来向Server端发送request,并获取response:

val connFlow: Flow[HttpRequest,HttpResponse,Future[Http.OutgoingConnection]] = Http().outgoingConnection("akka.io") def sendHttpRequest(req: HttpRequest) = { Source.single(req) .via(connFlow) .runWith(Sink.head) } sendHttpRequest(HttpRequest(uri="/")) .andThen{ case Success(resp) => println(s"got response: ${resp.status.intValue()}") case Failure(err) => println(s"request failed: ${err.getMessage}") } .andThen {case _ => sys.terminate()}

上面的这种模式就是所谓Connection-Level-Client-Side-Api。这种模式可以让用户有更洪流平的*度控制connection的构建、使用及在connection上发送request的方法。一般来讲,当返回response的entity被完全消耗后系统会自动close connection,这套api还供给了一些手动要领可以在有需要的情况下手动进行connection close,如下:

//close connection by cancelling response entity resp.entity.dataBytes.runWith(Sink.cancelled) //close connection by receiving response with close header Http().bindAndHandleSync( { req ? HttpResponse(headers = headers.Connection("close") :: Nil) }, "akka.io", 80)(mat)

Akka-http客户端api还有一种实用的Host-Level-Client-Side-Api模式。这套api能自动针对每个端点维护一个连接池(connection-pool),用户只需对连接池进行配置。系统凭据连接池配置自动维护池内线程的生、死、动、停。akka-http.host-connection-pool配置中max-connections,max-open-requests,pipelining-limit等控制着connection、在途request的数量,需要出格注意。针对某个端点的连接池是通过Http().cachedHostConnectionPool(endPoint)获取的。同样,获取的也是一个client-flow实例。因为系统自动维护着线程池,所以client-flow实例可以任意引用,无论挪用次数与挪用时间间隔。cachedHostConnectionPool()函数界说如下:

/** * Same as [[#cachedHostConnectionPool]] but for encrypted (HTTPS) connections. * * If an explicit [[ConnectionContext]] is given then it rather than the configured default [[ConnectionContext]] will be used * for encryption on the connections. * * To configure additional settings for the pool (and requests made using it), * use the `akka.http.host-connection-pool` config section or pass in a [[ConnectionPoolSettings]] explicitly. */ def cachedHostConnectionPoolHttps[T](host: String, port: Int = 443, connectionContext: HttpsConnectionContext = defaultClientHttpsContext, settings: ConnectionPoolSettings = defaultConnectionPoolSettings, log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { val cps = ConnectionPoolSetup(settings, connectionContext, log) val setup = HostConnectionPoolSetup(host, port, cps) cachedHostConnectionPool(setup) }