netty12---线程池简单源码

时间:2023-03-09 02:07:49
netty12---线程池简单源码

netty12---线程池简单源码

package com.cn;

import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean; import com.cn.pool.NioSelectorRunnablePool; /**
* 抽象selector线程基类
*/
public abstract class AbstractNioSelector implements Runnable { /**
* 线程池
*/
private final Executor executor; /**
* 选择器
*/
protected Selector selector; /**
* 选择器wakenUp状态标记
*/
protected final AtomicBoolean wakenUp = new AtomicBoolean(); /**
* 线程安全任务队列
*/
private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>(); /**
* 线程名称
*/
private String threadName; /**
* 线程池管理对象
*/
protected NioSelectorRunnablePool selectorRunnablePool; AbstractNioSelector(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
this.executor = executor;
this.threadName = threadName;
this.selectorRunnablePool = selectorRunnablePool;
openSelector();
} /**
* 获取selector并启动线程,一个线程拥有了select才能为多个客服服务。
*/
private void openSelector() {
try {
this.selector = Selector.open();
} catch (IOException e) {
throw new RuntimeException("Failed to create a selector.");
}
executor.execute(this);//像线程池中加入一个任务,并执行任务的run方法。运行当前任务,执行run方法。从线程池拿出一个线程执行这个任务。
} @Override
public void run() { Thread.currentThread().setName(this.threadName);//给当前线程付一个名字 while (true) {
try {
wakenUp.set(false); select(selector);//接口,执行NioServerBoss或者NioServerWorker的select方法 processTaskQueue();//执行完任务队列里面的任务 process(selector);//接口,执行NioServerBoss或者NioServerWorker的process方法
} catch (Exception e) {
// ignore
}
} } /**
* 注册一个任务并激活selector
*
* @param task
*/
protected final void registerTask(Runnable task) {
taskQueue.add(task); Selector selector = this.selector; if (selector != null) {
if (wakenUp.compareAndSet(false, true)) {//wakenUp是不是false,是false就置为true,
selector.wakeup();
}
} else {
taskQueue.remove(task);
}
} /**
* 执行队列里的任务
*/
private void processTaskQueue() {
for (;;) {
final Runnable task = taskQueue.poll();
if (task == null) {
break;
}
task.run();//task是runnable元素
}
} /**
* 获取线程管理对象
* @return
*/
public NioSelectorRunnablePool getSelectorRunnablePool() {
return selectorRunnablePool;
} /**
* select抽象方法
子类有重写
*/
protected abstract int select(Selector selector) throws IOException; /**
* selector的业务处理
子类有重写
*/
protected abstract void process(Selector selector) throws IOException; }
package com.cn;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor; import com.cn.pool.Boss;
import com.cn.pool.NioSelectorRunnablePool;
import com.cn.pool.Worker;
/**
* boss实现类,每一个NioServerBoss再一个线程里面
*/
public class NioServerBoss extends AbstractNioSelector implements Boss{ public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
super(executor, threadName, selectorRunnablePool);
} @Override
protected void process(Selector selector) throws IOException {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
if (selectedKeys.isEmpty()) {
return;
} for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey key = i.next();
i.remove();
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 新客户端
SocketChannel channel = server.accept();
// 设置为非阻塞
channel.configureBlocking(false);
// 获取一个worker
Worker nextworker = getSelectorRunnablePool().nextWorker();//通过线程管理对象获取一个worker(runnable任务对象),
// 注册新客户端接入任务,将新的连接请求交给worker。
nextworker.registerNewChannelTask(channel);//往别的任务队列里面加任务
//安卓里面,子线程不能改变UI,要改变就要向主线程的任务队列里面加任务。 System.out.println("新客户端链接");
}
} public void registerAcceptChannelTask(final ServerSocketChannel serverChannel){
final Selector selector = this.selector;
registerTask(new Runnable() {
@Override
public void run() {
try {
//注册serverChannel到selector
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
}
});
} @Override
protected int select(Selector selector) throws IOException {
return selector.select();
}
}
package com.cn;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor; import com.cn.pool.NioSelectorRunnablePool;
import com.cn.pool.Worker;
/**
* worker实现类,每一个NioServerWorker再一个线程里面
*/
public class NioServerWorker extends AbstractNioSelector implements Worker{ public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
super(executor, threadName, selectorRunnablePool);
} @Override
protected void process(Selector selector) throws IOException {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey) ite.next();
// 移除,防止重复处理
ite.remove(); // 得到事件发生的Socket通道
SocketChannel channel = (SocketChannel) key.channel(); // 数据总长度
int ret = 0;
boolean failure = true;
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取数据
try {
ret = channel.read(buffer);
failure = false;
} catch (Exception e) {
// ignore
}
//判断是否连接已断开
if (ret <= 0 || failure) {
key.cancel();
System.out.println("客户端断开连接");
}else{
System.out.println("收到数据:" + new String(buffer.array())); //回写数据
ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes());
channel.write(outBuffer);// 将消息回送给客户端
}
}
} /**
* 加入一个新的socket客户端
*/
public void registerNewChannelTask(final SocketChannel channel){
final Selector selector = this.selector;
registerTask(new Runnable() {
@Override
public void run() {
try {
//将客户端注册到selector中
channel.register(selector, SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
}
});
} @Override
protected int select(Selector selector) throws IOException {
return selector.select(60000);
} }
package com.cn;

import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel; import com.cn.pool.Boss;
import com.cn.pool.NioSelectorRunnablePool;
/**
* 服务类
*/
public class ServerBootstrap { private NioSelectorRunnablePool selectorRunnablePool; public ServerBootstrap(NioSelectorRunnablePool selectorRunnablePool) {
this.selectorRunnablePool = selectorRunnablePool;
} /**
* 监听端口
* @param localAddress
*/
public void bind(final SocketAddress localAddress){
try {
// 获得一个ServerSocket通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 设置通道为非阻塞
serverChannel.configureBlocking(false);
// 将该通道对应的ServerSocket绑定到port端口
serverChannel.socket().bind(localAddress); //获取一个boss线程
Boss nextBoss = selectorRunnablePool.nextBoss();
//向boss注册一个ServerSocket通道
nextBoss.registerAcceptChannelTask(serverChannel);
} catch (Exception e) {
e.printStackTrace();
}
}
}
package com.cn;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import com.cn.pool.NioSelectorRunnablePool;
/**
* 启动函数
*/
public class Start { public static void main(String[] args) { //管理线程池的,初始化2个线程池,一个boss一个work,
NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); //获取服务类
ServerBootstrap bootstrap = new ServerBootstrap(nioSelectorRunnablePool); //绑定端口
bootstrap.bind(new InetSocketAddress(10101)); System.out.println("start");
} }
package com.cn.pool;

import java.nio.channels.ServerSocketChannel;
/**
* boss接口
*/
public interface Boss { /**
* 加入一个新的ServerSocket,监听连接
*/
public void registerAcceptChannelTask(ServerSocketChannel serverChannel);
}
package com.cn.pool;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import com.cn.NioServerBoss;
import com.cn.NioServerWorker;
/**
* selector线程管理者
*
* 线程池是有多个线程,每个线程里面有一个任务队列,线程run的时候会从任务队列取一个任务出来,执行任务的run方法,
队列里面没有任务就阻塞等待新的任务进来。
*/
public class NioSelectorRunnablePool { /**
* boss任务数组,boss用来监听端口的,
*/
private final AtomicInteger bossIndex = new AtomicInteger();
private Boss[] bosses; /**
* worker任务数组,用来处理事件的,
*/
private final AtomicInteger workerIndex = new AtomicInteger();
private Worker[] workeres; //boss和worker是一个线程池
public NioSelectorRunnablePool(Executor boss, Executor worker) {
initBoss(boss, 1);//boss是一个线程池。
initWorker(worker, Runtime.getRuntime().availableProcessors() * 2);
} /**
* 初始化boss线程池的runable任务数组
* @param boss
* @param count
*/
private void initBoss(Executor boss, int count) {
this.bosses = new NioServerBoss[count];
//this.bosses是一个数组,里面是一个个的NioServerBoss,
//NioServerBoss是runnable任务对象。runnable对象里面有线程池、选择器、线程名、线程管理者。
//executor.execute(this);通过NioServerBoss里面的线程池把任务对象NioServerBoss自己运行起来。
//所有的NioServerBoss任务对象都是通过boss线程池来调度的。
for (int i = 0; i < bosses.length; i++) {
bosses[i] = new NioServerBoss(boss, "boss thread " + (i+1), this);//this是NioSelectorRunnablePool线程池管理者。
//boss thread 是任务runable的名字
}
} /**
* 初始化worker线程池的runable任务数组
* @param worker
* @param count
*/
private void initWorker(Executor worker, int count) {
this.workeres = new NioServerWorker[2/*count*/];
for (int i = 0; i < workeres.length; i++) {
//所有的NioServerWorker任务对象都是通过worker线程池来调度的。
workeres[i] = new NioServerWorker(worker, "worker thread " + (i+1), this);
}
//boss线程池里面有8个NioServerBoss.runable对象(8个大任务,开了8个线程),
//每一个NioServerWorker再一个线程里面。8个NioServerBoss.runable对象一开始就去run,
//每个NioServerBoss.runable对象里面有一个任务队列taskQueue,队列里面是一个个的Runnable对象。 /*
public static void main(String[] args) {
//创建一个线程池,可回收的,没任务就回收了。newCachedThreadPool可以很大。60秒没任务就回收。
ExecutorService pool = Executors.newCachedThreadPool();//线程池
for(int i = 1; i < 5; i++){//4个任务,一个任务就是一个Runnable
pool.execute(new Runnable() {//没有返回值
@Override
public void run() {
try {
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread name: " + Thread.currentThread().getName()); }
});
try {
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
pool.shutdown();//任务执行完就关了。
/*thread name: pool-1-thread-1
thread name: pool-1-thread-2
thread name: pool-1-thread-1
thread name: pool-1-thread-2 线程执行完了会回收,不一定开4个线程*/
} /**
* 获取一个worker的runable任务,给每个work平均分配
*/
public Worker nextWorker() {
return workeres[Math.abs(workerIndex.getAndIncrement() % workeres.length)];
} /**
* 获取一个boss的runable任务
*/
public Boss nextBoss() {
return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];
}
}
package com.cn.pool;

import java.nio.channels.SocketChannel;
/**
* worker接口
*/
public interface Worker { /**
* 加入一个新的客户端会话,监听客户端的处理
*/
public void registerNewChannelTask(SocketChannel channel); }