Java io.netty.util.ReferenceCountUtil 代码实例

时间:2023-03-09 00:12:13
Java io.netty.util.ReferenceCountUtil 代码实例

原文:https://www.helplib.com/Java_API_Classes/article_64580

以下是展示如何使用io.netty.util.ReferenceCountUtil的最佳示例。 我们使用了代码质量辨别算法从开源项目中提取出了最佳的优秀示例。

实例 1

复制代码
private static void testPerformOpeningHandshake0(boolean subProtocol) {
EmbeddedChannel ch = new EmbeddedChannel(
new HttpObjectAggregator(42), new HttpRequestDecoder(), new HttpResponseEncoder());
FullHttpRequest req = ReferenceCountUtil.releaseLater(
new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, "/chat"));
req.headers().set(Names.HOST, "server.example.com");
req.headers().set(Names.UPGRADE, WEBSOCKET.toLowerCase());
req.headers().set(Names.CONNECTION, "Upgrade");
req.headers().set(Names.SEC_WEBSOCKET_KEY, "dGhlIHNhbXBsZSBub25jZQ==");
req.headers().set(Names.SEC_WEBSOCKET_ORIGIN, "http://example.com");
req.headers().set(Names.SEC_WEBSOCKET_PROTOCOL, "chat, superchat");
req.headers().set(Names.SEC_WEBSOCKET_VERSION, "13");
if (subProtocol) {
new WebSocketServerHandshaker13(
"ws://example.com/chat", "chat", false, Integer.MAX_VALUE).handshake(ch, req);
} else {
new WebSocketServerHandshaker13(
"ws://example.com/chat", null, false, Integer.MAX_VALUE).handshake(ch, req);
}
ByteBuf resBuf = (ByteBuf) ch.readOutbound();
EmbeddedChannel ch2 = new EmbeddedChannel(new HttpResponseDecoder());
ch2.writeInbound(resBuf);
HttpResponse res = (HttpResponse) ch2.readInbound();
Assert.assertEquals(
"s3pPLMBiTxaQ9kYGzzhZRbK+xOo=", res.headers().get(Names.SEC_WEBSOCKET_ACCEPT));
if (subProtocol) {
Assert.assertEquals("chat", res.headers().get(Names.SEC_WEBSOCKET_PROTOCOL));
} else {
Assert.assertNull(res.headers().get(Names.SEC_WEBSOCKET_PROTOCOL));
}
ReferenceCountUtil.release(res);
}

实例 2

复制代码
@Test
public void testHttpUpgradeRequest() throws Exception {
EmbeddedChannel ch = createChannel(new MockOutboundHandler());
ChannelHandlerContext handshakerCtx = ch.pipeline().context(WebSocketServerProtocolHandshakeHandler.class);
writeUpgradeRequest(ch);
assertEquals(SWITCHING_PROTOCOLS, ReferenceCountUtil.releaseLater(responses.remove()).getStatus());
assertNotNull(WebSocketServerProtocolHandler.getHandshaker(handshakerCtx));
}

实例 3

复制代码
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Class<?> messageClass = msg.getClass();
if (!handshaker.isHandshakeComplete()) {
ctx.pipeline().remove(HttpObjectAggregator.class);
handshaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
httpChannel = new NettyHttpChannel(tcpStream, new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/")) {
@Override
protected void doSubscribeHeaders(Subscriber<? super Void> s) {
Publishers.<Void>empty().subscribe(s);
}
};
NettyHttpWSClientHandler.super.channelActive(ctx);
super.channelRead(ctx, msg);
return;
}
if (TextWebSocketFrame.class.isAssignableFrom(messageClass)) {
try {
//don't inflate the String bytes now
channelSubscriber.onNext(new StringBuffer(((TextWebSocketFrame) msg).content().nioBuffer()));
} finally {
ReferenceCountUtil.release(msg);
}
} else if (CloseWebSocketFrame.class.isAssignableFrom(messageClass)) {
ctx.close();
} else {
doRead(ctx, ((WebSocketFrame)msg).content());
}
}

实例 4

复制代码
@SuppressWarnings("unchecked")
protected final void doRead(ChannelHandlerContext ctx, Object msg) {
try {
if (null == channelSubscriber || msg == Unpooled.EMPTY_BUFFER) {
ReferenceCountUtil.release(msg);
return;
}
NettyBuffer buffer = NettyBuffer.create(msg);
try {
channelSubscriber.onNext(buffer);
}
finally {
if (buffer.getByteBuf() != null) {
if (buffer.getByteBuf()
.refCnt() != 0) {
ReferenceCountUtil.release(buffer.getByteBuf());
}
}
}
}
catch (Throwable err) {
Exceptions.throwIfFatal(err);
if (channelSubscriber != null) {
channelSubscriber.onError(err);
}
else {
throw err;
}
}
}

实例 5

复制代码
/**
* Test try to reproduce issue #1335
*/
@Test
public void testBindMultiple() throws Exception {
DefaultChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
NioEventLoopGroup group = new NioEventLoopGroup();
try {
for (int i = 0; i < 100; i++) {
Bootstrap udpBootstrap = new Bootstrap();
udpBootstrap.group(group).channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// Discard
ReferenceCountUtil.release(msg);
}
});
DatagramChannel datagramChannel = (DatagramChannel) udpBootstrap
.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
channelGroup.add(datagramChannel);
}
Assert.assertEquals(100, channelGroup.size());
} finally {
channelGroup.close().sync();
group.shutdownGracefully().sync();
}
}

实例 6

复制代码
@BeforeClass
public static void init() {
// Configure a test server
group = new LocalEventLoopGroup();
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// Discard
ReferenceCountUtil.release(msg);
}
});
}
});
localAddr = (LocalAddress) sb.bind(LocalAddress.ANY).syncUninterruptibly().channel().localAddress();
}

实例 7

复制代码
@BeforeClass
public static void init() {
// Configure a test server
group = new LocalEventLoopGroup();
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// Discard
ReferenceCountUtil.release(msg);
}
});
}
});
localAddr = (LocalAddress) sb.bind(LocalAddress.ANY).syncUninterruptibly().channel().localAddress();
}

实例 8

复制代码
@Override
public ChannelGroupFuture write(Object message, ChannelMatcher matcher) {
if (message == null) {
throw new NullPointerException("message");
}
if (matcher == null) {
throw new NullPointerException("matcher");
}
Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
for (Channel c: nonServerChannels) {
if (matcher.matches(c)) {
futures.put(c, c.write(safeDuplicate(message)));
}
}
ReferenceCountUtil.release(message);
return new DefaultChannelGroupFuture(this, futures, executor);
}

实例 9

复制代码
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (isRemote(ctx)) {
ByteBuf payload = (ByteBuf) msg;
byte[] data = getPayloadFromByteBuf(payload);
writeBuffer(data);
return;
}
ReferenceCountUtil.retain(msg);
// propagate the data to rest of handlers in pipeline
ctx.fireChannelRead(msg);
}

实例 10

复制代码
@Override
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
// The first message must be authentication response
if (this.authenticationUrl != null && (this.cookies == null || this.cookies.isEmpty())) {
HttpResponse response = (HttpResponse) msg;
CharSequence cookieData = response.headers().get(new AsciiString("set-cookie"));
if (cookieData != null) {
this.cookies = ServerCookieDecoder.decode(cookieData.toString());
if (this.cookies == null || this.cookies.isEmpty()) {
throw new WebSocketAuthenticationFailureException("Could not authenticate");
}
if (log.isDebugEnabled()) {
for (Cookie cookie : this.cookies) {
log.debug("Server says must set cookie with name {} and value {}", cookie.name(), cookie.value());
}
}
} else {
throw new ITException("Could not authenticate");
}
if (log.isDebugEnabled()) {
log.debug("Authentication succeeded for user {}", this.user);
}
handShaker.handshake(ctx.channel());
return;
}
// The second one must be the response for web socket handshake
if (!handShaker.isHandshakeComplete()) {
handShaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
if (log.isDebugEnabled()) {
log.debug("Web socket client connected for user {}", this.user);
}
handshakeFuture.setSuccess();
return;
}
// Take the byte buff and send it up to Stomp decoder
if (msg instanceof WebSocketFrame) {
if (log.isDebugEnabled()) {
if (msg instanceof TextWebSocketFrame) {
log.debug("Received text frame {}", ((TextWebSocketFrame) msg).text());
}
}
ReferenceCountUtil.retain(msg);
ctx.fireChannelRead(((WebSocketFrame) msg).content());
}
}

实例 11

复制代码
@Override
protected void encode(ChannelHandlerContext ctx, DefaultHttpMessage defaultHttpMessage, List out) throws Exception {
if (defaultHttpMessage.headers().contains(HttpHeaders.CONTENT_LENGTH, "", true)) {
defaultHttpMessage.headers().remove(HttpHeaders.CONTENT_LENGTH);
}
ReferenceCountUtil.retain(defaultHttpMessage);
out.add(defaultHttpMessage);
}

实例 12

复制代码
private static Object safeDuplicate(Object message) {
if (message instanceof ByteBuf) {
return ((ByteBuf) message).duplicate().retain();
} else if (message instanceof ByteBufHolder) {
return ((ByteBufHolder) message).duplicate().retain();
} else {
return ReferenceCountUtil.retain(message);
}
}

实例 13

复制代码
@Override
public void onNext(T t) {
// Retain so that post-buffer, the ByteBuf does not get released.
// Release will be done after reading from the subject.
ReferenceCountUtil.retain(t);
state.bufferedObserver.onNext(t);
// Schedule timeout once and when not subscribed yet.
if (state.casTimeoutScheduled() && state.state == State.STATES.UNSUBSCRIBED.ordinal()) {
timeoutScheduler.subscribe(new Action1<Long>() { // Schedule timeout after the first content arrives.
@Override
public void call(Long aLong) {
disposeIfNotSubscribed();
}
});
}
}

实例 14

复制代码
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
Channel channel = ctx.channel();
if (msg instanceof HttpRequest) {
HttpRequest request = (HttpRequest) msg;
if (handleRequest(request, channel, ctx)) {
if (httpMethodInfoBuilder.getHttpResourceModel()
.isStreamingReqSupported() &&
channel.pipeline().get("aggregator") != null) {
channel.pipeline().remove("aggregator");
} else if (!httpMethodInfoBuilder.getHttpResourceModel()
.isStreamingReqSupported() &&
channel.pipeline().get("aggregator") == null) {
channel.pipeline().addAfter("router", "aggregator",
new HttpObjectAggregator(Integer.MAX_VALUE));
}
}
ReferenceCountUtil.retain(msg);
ctx.fireChannelRead(msg);
} else if (msg instanceof HttpContent) {
ReferenceCountUtil.retain(msg);
ctx.fireChannelRead(msg);
}
}

实例 15

复制代码
@Override
public void onData(final ByteBuf input) {
// We need to retain until the serializer gets around to processing it.
ReferenceCountUtil.retain(input);
serializer.execute(new Runnable() {
@Override
public void run() {
if (isTraceBytes()) {
TRACE_BYTES.info("Received: {}", ByteBufUtil.hexDump(input));
}
ByteBuffer source = input.nioBuffer();
do {
ByteBuffer buffer = protonTransport.getInputBuffer();
int limit = Math.min(buffer.remaining(), source.remaining());
ByteBuffer duplicate = source.duplicate();
duplicate.limit(source.position() + limit);
buffer.put(duplicate);
protonTransport.processInput();
source.position(source.position() + limit);
} while (source.hasRemaining());
ReferenceCountUtil.release(input);
// Process the state changes from the latest data and then answer back
// any pending updates to the Broker.
processUpdates();
pumpToProtonTransport();
}
});
}