编写一个简易的Java NIO Reactor库

时间:2022-09-09 11:12:17

开源地址

https://github.com/sea-boat/net-reactor

源码设计

接收器Acceptor

/**
 * Accepts client sockets in NIO (non-blocking) mode and hands every accepted
 * connection over to a {@link Reactor} taken from the {@link ReactorPool}.
 *
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>849586227@qq.com</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 */
public final class Acceptor extends Thread {

    private static final Logger LOGGER = LoggerFactory.getLogger(Acceptor.class);
    private static final AcceptIdGenerator ID_GENERATOR = new AcceptIdGenerator();

    /** Listen backlog passed to {@code bind}. */
    private static final int BACKLOG = 100;
    /** Value in bytes used for SO_RCVBUF and SO_SNDBUF. */
    private static final int SOCKET_BUFFER_SIZE = 1024;
    /** Upper bound, in milliseconds, of one blocking select call. */
    private static final long SELECT_TIMEOUT_MILLIS = 1000L;

    private final int port;
    private final Selector selector;
    private final ServerSocketChannel serverChannel;
    private final ReactorPool reactorPool;
    /**
     * Number of client sockets accepted so far. Only the acceptor thread
     * writes it; volatile so that other threads calling the getter see
     * an up-to-date value.
     */
    private volatile long acceptCount;

    /**
     * Opens the selector and the listening channel and registers the channel
     * for accept events. The thread itself still has to be started by the caller.
     *
     * @param reactorPool pool that supplies the reactor for each accepted connection
     * @param name        name given to this thread
     * @param bindIp      local address to bind to
     * @param port        local port to bind to
     * @throws IOException if opening, configuring or binding the channel fails
     */
    public Acceptor(ReactorPool reactorPool, String name, String bindIp,
            int port) throws IOException {
        super.setName(name);
        this.port = port;
        this.reactorPool = reactorPool;

        Selector openedSelector = null;
        ServerSocketChannel openedChannel = null;
        try {
            openedSelector = Selector.open();
            openedChannel = ServerSocketChannel.open();
            openedChannel.configureBlocking(false);
            openedChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            openedChannel.setOption(StandardSocketOptions.SO_RCVBUF, SOCKET_BUFFER_SIZE);
            openedChannel.bind(new InetSocketAddress(bindIp, port), BACKLOG);
            openedChannel.register(openedSelector, SelectionKey.OP_ACCEPT);
        } catch (IOException | RuntimeException e) {
            // Do not leak the selector / listening socket when setup fails half-way.
            closeQuietly(openedChannel);
            closeQuietly(openedSelector);
            throw e;
        }
        this.selector = openedSelector;
        this.serverChannel = openedChannel;
    }

    /**
     * @return the port passed to the constructor
     */
    public int getPort() {
        return port;
    }

    /**
     * @return how many client sockets have been accepted and handed to a reactor
     */
    public long getAcceptCount() {
        return acceptCount;
    }

    /**
     * Select loop: waits for accept readiness and accepts one client per
     * ready key. Never returns; failures are logged and the loop continues.
     */
    @Override
    public void run() {
        for (;;) {
            try {
                selector.select(SELECT_TIMEOUT_MILLIS);
                Set<SelectionKey> keys = selector.selectedKeys();
                try {
                    for (SelectionKey key : keys) {
                        if (key.isValid() && key.isAcceptable()) {
                            accept();
                        } else {
                            key.cancel();
                        }
                    }
                } finally {
                    // The selector never removes keys from the selected set itself.
                    keys.clear();
                }
            } catch (Throwable e) {
                LOGGER.warn(getName(), e);
            }
        }
    }

    /**
     * Accepts one client socket, configures it and posts it to the next reactor.
     * On any failure the half-configured channel is closed and the error is logged.
     */
    private void accept() {
        SocketChannel channel = null;
        try {
            channel = serverChannel.accept();
            if (channel == null) {
                // Non-blocking accept: no connection is pending after all.
                return;
            }
            channel.configureBlocking(false);
            channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
            channel.setOption(StandardSocketOptions.SO_RCVBUF, SOCKET_BUFFER_SIZE);
            channel.setOption(StandardSocketOptions.SO_SNDBUF, SOCKET_BUFFER_SIZE);
            channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
            reactorPool.getNextReactor().postRegister(
                    new FrontendConnection(channel, ID_GENERATOR.getId()));
            // Count only connections that were really accepted and dispatched.
            ++acceptCount;
        } catch (Throwable e) {
            closeChannel(channel);
            LOGGER.warn(getName(), e);
        }
    }

    /**
     * Closes a client channel and its socket; close failures are only logged.
     *
     * @param channel channel to close, may be {@code null}
     */
    private static void closeChannel(SocketChannel channel) {
        if (channel == null) {
            return;
        }
        Socket socket = channel.socket();
        if (socket != null) {
            try {
                socket.close();
                LOGGER.info("channel close.");
            } catch (IOException e) {
                LOGGER.warn("IOException happens when closing socket : ", e);
            }
        }
        try {
            channel.close();
        } catch (IOException e) {
            LOGGER.warn("IOException happens when closing channel : ", e);
        }
    }

    /**
     * Closes a resource during failed construction, logging instead of throwing
     * so that the original setup exception is the one the caller sees.
     *
     * @param resource resource to close, may be {@code null}
     */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            LOGGER.warn("IOException happens when closing channel : ", e);
        }
    }

    /**
     * Generates connection ids in the range 1..0xffffffff, wrapping around
     * to 1 after the maximum has been handed out.
     */
    private static class AcceptIdGenerator {
        private static final long MAX_VALUE = 0xffffffffL;
        private long acceptId = 0L;
        private final Object lock = new Object();

        private long getId() {
            synchronized (lock) {
                if (acceptId >= MAX_VALUE) {
                    acceptId = 0L;
                }
                return ++acceptId;
            }
        }
    }
}

Reactor类

/**
 * A reactor thread: owns one {@link Selector}, registers the connections
 * posted to it and dispatches read and write readiness for them.
 *
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>849586227@qq.com</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 */
public final class Reactor extends Thread {
    private static final Logger LOGGER = LoggerFactory.getLogger(Reactor.class);
    /** Upper bound, in milliseconds, of one blocking select call. */
    private static final long SELECT_TIMEOUT_MILLIS = 500L;

    private final Selector selector;
    /** Connections waiting to be registered with {@link #selector} by the reactor thread. */
    private final ConcurrentLinkedQueue<FrontendConnection> queue;
    private final Handler handler;
    private long doCount;

    /**
     * @param name    name given to this thread; ignored when {@code null}
     * @param handler callback invoked after data has been read from a connection
     * @throws IOException if the selector cannot be opened
     */
    public Reactor(String name, Handler handler) throws IOException {
        if (name != null) {
            // Make the name visible in thread dumps and log output.
            super.setName(name);
        }
        this.selector = Selector.open();
        this.queue = new ConcurrentLinkedQueue<FrontendConnection>();
        this.handler = handler;
    }

    /**
     * Queues a connection for registration and wakes the selector so that the
     * reactor thread picks it up without waiting for the select timeout.
     *
     * @param frontendConnection connection to register with this reactor
     */
    final void postRegister(FrontendConnection frontendConnection) {
        queue.offer(frontendConnection);
        this.selector.wakeup();
    }

    /**
     * Select loop: registers pending connections, then processes every
     * selected key. Never returns; failures are logged and the loop continues.
     */
    @Override
    public void run() {
        Set<SelectionKey> keys = null;
        for (;;) {
            ++doCount;
            try {
                selector.select(SELECT_TIMEOUT_MILLIS);
                register(selector);
                keys = selector.selectedKeys();
                for (SelectionKey key : keys) {
                    process(key);
                }
            } catch (Throwable e) {
                LOGGER.warn("exception happens selecting : ", e);
            } finally {
                if (keys != null) {
                    keys.clear();
                }
            }
        }
    }

    /**
     * Handles one selected key. A failure is confined to the key's own
     * connection so that the remaining ready keys are still processed.
     */
    private void process(SelectionKey key) {
        Object attach = key.attachment();
        if (attach == null || !key.isValid()) {
            key.cancel();
            return;
        }
        FrontendConnection connection = (FrontendConnection) attach;
        try {
            if (key.isReadable()) {
                connection.read();
                handler.handle(connection);
            }
            // The handler may have closed the channel, hence the second validity check.
            if (key.isValid() && key.isWritable()) {
                connection.write();
            }
        } catch (IOException e) {
            // An I/O failure means the connection is unusable: release it.
            closeQuietly(connection);
            LOGGER.warn("IOException happens : ", e);
        } catch (Throwable e) {
            LOGGER.warn("Throwable happens : ", e);
        }
    }

    /**
     * Registers every queued connection with the given selector. A connection
     * that cannot be registered is closed so its socket is not leaked.
     */
    private void register(Selector selector) {
        FrontendConnection c;
        while ((c = queue.poll()) != null) {
            try {
                c.register(selector);
            } catch (Throwable e) {
                LOGGER.warn("ClosedChannelException happens : ", e);
                closeQuietly(c);
            }
        }
    }

    /** Closes a connection, logging instead of propagating a close failure. */
    private static void closeQuietly(FrontendConnection connection) {
        try {
            connection.close();
        } catch (IOException e) {
            LOGGER.warn("IOException happens when closing connection : ", e);
        }
    }

    /**
     * @return the live queue of connections still waiting for registration
     */
    final Queue<FrontendConnection> getRegisterQueue() {
        return queue;
    }

    /**
     * @return the number of select-loop iterations started so far
     */
    final long getReactCount() {
        return doCount;
    }

}

Reactor池

/**
 * A fixed-size pool of started {@link Reactor} threads. Connections are
 * spread over the reactors in round-robin order.
 *
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>849586227@qq.com</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 */
public class ReactorPool {
    /** Prefix of the reactor names; the reactor index is appended after a dash. */
    private static final String NAME_PREFIX = "reactor";

    private final Reactor[] reactors;
    /** Index of the reactor returned last; guarded by {@code this}. */
    private int nextReactor;

    /**
     * Creates and starts {@code poolSize} reactors that share one handler.
     *
     * @param poolSize number of reactor threads, must be positive
     * @param handler  handler passed to every reactor
     * @throws IllegalArgumentException if {@code poolSize} is not positive
     * @throws IOException              if a reactor cannot be created
     */
    public ReactorPool(int poolSize, Handler handler) throws IOException {
        if (poolSize <= 0) {
            throw new IllegalArgumentException("poolSize must be positive: " + poolSize);
        }
        reactors = new Reactor[poolSize];
        for (int i = 0; i < poolSize; i++) {
            Reactor reactor = new Reactor(NAME_PREFIX + "-" + i, handler);
            reactors[i] = reactor;
            reactor.start();
        }
    }

    /**
     * Returns the next reactor in round-robin order.
     *
     * <p>Synchronized because advance-and-wrap is a read-modify-write
     * sequence: without mutual exclusion two callers could move the index
     * past the end of the array.
     *
     * @return the reactor following the one returned by the previous call
     */
    public synchronized Reactor getNextReactor() {
        nextReactor = (nextReactor + 1) % reactors.length;
        return reactors[nextReactor];
    }
}

前端连接抽象

/**
 * Abstraction of one frontend (client) connection: wraps the socket channel,
 * the buffer filled by the latest read and a queue of buffers to send.
 *
 * <p>This class does no locking of its own; {@link #read()} and
 * {@link #write()} must not be called concurrently for the same connection.
 *
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>849586227@qq.com</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 */
public class FrontendConnection {
    /** Capacity in bytes of the buffer allocated for every read. */
    private static final int BUFFER_SIZE = 1024;

    private final long id;
    private final SocketChannel channel;
    private SelectionKey selectionKey;
    private ByteBuffer readBuffer;
    /**
     * Buffer that was only partially sent because the socket accepted no
     * more bytes; already flipped, it is resumed by the next {@link #write()}.
     */
    private ByteBuffer pendingWrite;
    protected ConcurrentLinkedQueue<ByteBuffer> writeQueue = new ConcurrentLinkedQueue<ByteBuffer>();

    /**
     * @param channel socket channel of the client
     * @param id      identifier of this connection
     */
    public FrontendConnection(SocketChannel channel, long id) {
        this.id = id;
        this.channel = channel;
    }

    public SocketChannel getChannel() {
        return channel;
    }

    public long getId() {
        return id;
    }

    /**
     * Reads available bytes from the channel into a freshly allocated buffer,
     * which is afterwards available through {@link #getReadBuffer()}.
     *
     * @throws EOFException if the channel has reached end-of-stream
     * @throws IOException  if the read fails
     */
    public void read() throws IOException {
        readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
        int len = channel.read(readBuffer);
        if (len < 0) {
            // End-of-stream: report it so the caller can close the connection
            // instead of being woken up for a dead socket over and over.
            throw new EOFException();
        }
    }

    /**
     * Closes the underlying channel.
     *
     * @throws IOException if closing fails
     */
    public void close() throws IOException {
        channel.close();
    }

    /**
     * Sends the queued buffers. Every queued buffer is expected to be in
     * "filling" state (position at the end of the data); it is flipped here
     * exactly once before being sent.
     *
     * <p>When the socket accepts no more bytes the unsent remainder is kept,
     * write interest is enabled on the selection key and the method returns;
     * the next call resumes with that remainder. Once everything has been
     * sent, write interest is removed again.
     *
     * @throws EOFException if the channel reports a negative write count
     * @throws IOException  if the write fails
     */
    public void write() throws IOException {
        for (;;) {
            ByteBuffer buffer = pendingWrite;
            if (buffer == null) {
                buffer = writeQueue.poll();
                if (buffer == null) {
                    break;
                }
                buffer.flip();
            }
            while (buffer.hasRemaining()) {
                int len = channel.write(buffer);
                if (len < 0) {
                    throw new EOFException();
                }
                if (len == 0) {
                    // Socket send buffer is full: keep the remainder and wait
                    // for the selector to report the channel writable.
                    pendingWrite = buffer;
                    selectionKey.interestOps(selectionKey.interestOps()
                            | SelectionKey.OP_WRITE);
                    selectionKey.selector().wakeup();
                    return;
                }
            }
            pendingWrite = null;
        }
        // Nothing left to send: stop listening for write readiness.
        selectionKey.interestOps(selectionKey.interestOps()
                & ~SelectionKey.OP_WRITE);
    }

    /**
     * @return the buffer filled by the latest {@link #read()}, or {@code null}
     *         if nothing has been read yet
     */
    public ByteBuffer getReadBuffer() {
        return readBuffer;
    }

    /**
     * @return the live queue of buffers waiting to be sent by {@link #write()}
     */
    public ConcurrentLinkedQueue<ByteBuffer> getWriteQueue() {
        return writeQueue;
    }

    /**
     * Registers the channel with the selector for read events, attaching this
     * connection to the resulting selection key.
     *
     * @param selector selector to register with
     * @throws Throwable if the registration fails
     */
    public void register(Selector selector) throws Throwable {
        selectionKey = channel.register(selector, SelectionKey.OP_READ, this);
    }

}

处理

/**
 * Callback through which application logic receives data read by a reactor.
 *
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>849586227@qq.com</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 */
public interface Handler {

    /**
     * Called by a reactor right after it has read from the connection; the
     * bytes just read are available via the connection's read buffer.
     *
     * @param connection the connection the data was read from
     */
    void handle(FrontendConnection connection);

}

定义自己的处理

/**
*
* @author seaboat
* @date 2016-08-25
* @version 1.0
* <pre><b>email: </b>849586227@qq.com</pre>
* <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
* <p>Demo.</p>
*/

public class MyHandler implements Handler {

private static final Logger LOGGER = LoggerFactory
.getLogger(MyHandler.class);
private long readSize;

/**
* The logic to deal with the received data.
*
* It means that reactor will trigger this function once the data is received.
*/

public void handle(FrontendConnection connection) {
Buffer buff = connection.getReadBuffer();
readSize = +readSize + buff.position();
LOGGER.info(connection.getId() + " connection has receive " + readSize);
if (readSize % 5 == 0) {
ByteBuffer sendBuffer = ByteBuffer.allocate(10);;
sendBuffer.wrap("hello".getBytes());
connection.getWriteQueue().add(sendBuffer);
try {
connection.write();
} catch (IOException e) {
LOGGER.warn("IOException", e);
}
}
}

}

启动

/**
 * Entry point that wires a handler, a reactor pool and an acceptor together
 * and then keeps the main thread alive.
 *
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>849586227@qq.com</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 */
public class Bootstrap {
    private static final Logger LOGGER = LoggerFactory
            .getLogger(Bootstrap.class);
    private static String acceptorName = "acceptor-thread";
    private static String host = "localhost";
    private static int port = 6789;

    /**
     * Starts one reactor per available processor plus the acceptor thread,
     * then idles forever. Any failure is logged and ends the JVM with exit
     * status -1.
     *
     * @param args command-line arguments, not used
     */
    public static void main(String[] args) {
        try {
            LOGGER.info("starting up ......");
            int reactorCount = Runtime.getRuntime().availableProcessors();
            Handler demoHandler = new MyHandler();
            ReactorPool pool = new ReactorPool(reactorCount, demoHandler);
            Acceptor acceptor = new Acceptor(pool, acceptorName, host, port);
            acceptor.start();
            LOGGER.info("started up successfully.");
            idleForever();
        } catch (Throwable e) {
            LOGGER.error(" launch error", e);
            System.exit(-1);
        }
    }

    /** Parks the calling thread by sleeping in five-minute slices, indefinitely. */
    private static void idleForever() throws InterruptedException {
        for (;;) {
            Thread.sleep(300 * 1000);
        }
    }
}

net-reactor

It’s a simple, easy-to-use network framework in NIO mode, written in Java.

how-to

Simply use it like this:

Handler handler = new MyHandler();
ReactorPool reactorPool = new ReactorPool(Runtime.getRuntime().availableProcessors(), handler);
new Acceptor(reactorPool, acceptorName, host, port).start();