java nio 反应堆模式简单模型
一般nio里反应堆模式都是这样:一个acceptor(当然多个也行,不过一般场景一个够了)负责accept事件,把接收到socket channel注册到按某种算法从reactor池中取出的一个reactor上,注册的事件为读,写等,之后这个socket channel的所有io事件都和acceptor没关系,都由被注册到的那个reactor来负责。
每个acceptor和每个reactor都各自持有一个selector
当然每个acceptor和reactor都得是一个线程(起码在逻辑上得是线程)
简单实现,三个类nioacceptor、nioreactor和reactorpool:
package cc.lixiaohui.demo.dp.reator;
import java.io.ioexception;
import java.net.inetsocketaddress;
import java.nio.channels.selectionkey;
import java.nio.channels.selector;
import java.nio.channels.serversocketchannel;
import java.nio.channels.socketchannel;
import java.util.objects;
import java.util.set;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
/**
* acceptor负责处理selectionkey.op_accept事件, 将接收到的socketchannel注册到reactor上去
*/
public class nioacceptor {
private int port;
private string host;
private selector selector; // java nio selector
private final serversocketchannel serverchannel; // java nio serversocketchannel
private reactorpool reactorpool; // nioreactor池
private thread thread; // 工作线程
private volatile boolean stop = false;
private static final logger logger = loggerfactory.getlogger(nioacceptor.class);
public nioacceptor(int port, string host, int reactorpoolsize) throws ioexception {
this.port = port;
this.host = objects.requirenonnull(host);
this.reactorpool = new reactorpool(reactorpoolsize);
selector = selector.open(); // 创建selector
serverchannel = serversocketchannel.open(); // new server socket channel
serverchannel.configureblocking(false); // in non-blocking mode
serverchannel.bind(new inetsocketaddress(host, port)); // bind
serverchannel.register(selector, selectionkey.op_accept); //
}
public void stop() throws interruptedexception {
stop = true;
thread.join();
}
public void start() {
thread = new thread(new accepttask(this));
thread.start();
}
private static class accepttask implements runnable {
nioacceptor acceptor;
accepttask(nioacceptor acceptor) {
this.acceptor = acceptor;
}
public void run() {
final selector selector = acceptor.selector;
set<selectionkey> keys = null;
while (!acceptor.stop) { // 运行中
try {
selector.select(1000l); // select, 最多等1秒
keys = selector.selectedkeys();
try {
for (selectionkey key : keys) {
if (key.isvalid() && key.isacceptable()) { // 可accept
socketchannel channel = acceptor.serverchannel.accept();
channel.configureblocking(false);
// 取下一个reactor并把socketchannel加入到reactor的注册队列
acceptor.reactorpool.nextreactor().postregistry(channel);
} else {
key.cancel();
}
}
} finally {
keys.clear();
}
} catch (ioexception e) {
logger.error("", e);
}
}
}
}
}
/**
* reactor负责selectionkey.op_read | selectionkey.op_write等事件
*/
public class nioreactor {
/** 待注册的{@link socketchannel} 队列 */
private queue<socketchannel> registerqueue = new concurrentlinkedqueue<socketchannel>();
private selector selector;
private volatile boolean stop = false;
private thread thread;
private static final logger logger = loggerfactory.getlogger(nioreactor.class);
public nioreactor() throws ioexception {
selector = selector.open();
}
public void postregistry(socketchannel channel) {
registerqueue.add(channel);
selector.wakeup(); // 唤醒selector, 以便让其即时处理注册
}
public nioreactor start() {
thread = new thread(new reacttask(this));
thread.start();
return this;
}
public void stop() throws interruptedexception {
stop = true;
thread.join();
}
/**
* 处理队列里面的待注册的socketchannel
*/
private void doregister(selector selector) {
while (!registerqueue.isempty()) {
socketchannel channel = registerqueue.poll();
try {
// 注册读事件, 写事件无需注册, 写事件是业务驱动的, 当往channel写入 数据未写完时再注册写事件
channel.register(selector, selectionkey.op_read);
} catch (closedchannelexception e) {
logger.error("", e);
}
}
}
private void handlewrite(selectionkey key) {
// todo 业务写
}
private void handleread(selectionkey key) {
// todo 业务读
}
private static class reacttask implements runnable {
nioreactor reactor;
reacttask(nioreactor reactor) {
this.reactor = reactor;
}
public void run() {
set<selectionkey> keys = null;
while (!reactor.stop) {
final selector selector = reactor.selector;
try {
selector.select(500l);
reactor.doregister(selector); // 处理注册
keys = selector.selectedkeys();
for (selectionkey key : keys) {
try {
if (!key.isvalid()) { // not valid
key.cancel();
continue;
}
if (key.isreadable()) { // 可读
reactor.handleread(key);
}
if (key.iswritable()) { // 可写
reactor.handlewrite(key);
}
} catch (throwable t) {
logger.error("", t);
continue;
}
}
} catch (ioexception e) {
logger.error("", e);
}
}
}
}
}
reactorpool用来管理reactor:
public class reactorpool extends linkedlist<nioreactor>{
private static final long serialversionuid = 6525233920805533099l;
private final int capacity;
public reactorpool(int size) {
this.capacity = size;
}
// 轮询算法取下一个reactor
public nioreactor nextreactor() throws ioexception {
// 新建或从头部拿一个reactor
nioreactor reactor = size() < capacity ? new nioreactor().start() : poll();
add(reactor);// 加到尾部
return reactor;
}
}