您好,欢迎访问一九零五行业门户网

Java NIO 反应堆模式

java nio 反应堆模式简单模型
一般nio里反应堆模式都是这样:一个acceptor(当然多个也行,不过一般场景一个够了)负责accept事件,把接收到socket channel注册到按某种算法从reactor池中取出的一个reactor上,注册的事件为读,写等,之后这个socket channel的所有io事件都和acceptor没关系,都由被注册到的那个reactor来负责。
每个acceptor和每个reactor都各自持有一个selector
当然每个acceptor和reactor都得是一个线程(起码在逻辑上得是线程)
简单实现,三个类nioacceptor、nioreactor和reactorpool:
package cc.lixiaohui.demo.dp.reator; import java.io.ioexception; import java.net.inetsocketaddress; import java.nio.channels.selectionkey; import java.nio.channels.selector; import java.nio.channels.serversocketchannel; import java.nio.channels.socketchannel; import java.util.objects; import java.util.set; import org.slf4j.logger; import org.slf4j.loggerfactory; /** * acceptor负责处理selectionkey.op_accept事件, 将接收到的socketchannel注册到reactor上去 */ public class nioacceptor { private int port; private string host; private selector selector; // java nio selector private final serversocketchannel serverchannel; // java nio serversocketchannel private reactorpool reactorpool; // nioreactor池 private thread thread; // 工作线程 private volatile boolean stop = false; private static final logger logger = loggerfactory.getlogger(nioacceptor.class); public nioacceptor(int port, string host, int reactorpoolsize) throws ioexception { this.port = port; this.host = objects.requirenonnull(host); this.reactorpool = new reactorpool(reactorpoolsize); selector = selector.open(); // 创建selector serverchannel = serversocketchannel.open(); // new server socket channel serverchannel.configureblocking(false); // in non-blocking mode serverchannel.bind(new inetsocketaddress(host, port)); // bind serverchannel.register(selector, selectionkey.op_accept); // } public void stop() throws interruptedexception { stop = true; thread.join(); } public void start() { thread = new thread(new accepttask(this)); thread.start(); } private static class accepttask implements runnable { nioacceptor acceptor; accepttask(nioacceptor acceptor) { this.acceptor = acceptor; } public void run() { final selector selector = acceptor.selector; set<selectionkey> keys = null; while (!acceptor.stop) { // 运行中 try { selector.select(1000l); // select, 最多等1秒 keys = selector.selectedkeys(); try { for (selectionkey key : keys) { if (key.isvalid() && key.isacceptable()) { // 可accept socketchannel channel = acceptor.serverchannel.accept(); channel.configureblocking(false); // 取下一个reactor并把socketchannel加入到reactor的注册队列 acceptor.reactorpool.nextreactor().postregistry(channel); } else { key.cancel(); } } } finally { keys.clear(); } } catch (ioexception e) { logger.error("", e); } } } } }
/** * reactor负责selectionkey.op_read | selectionkey.op_write等事件 */ public class nioreactor { /** 待注册的{@link socketchannel} 队列 */ private queue<socketchannel> registerqueue = new concurrentlinkedqueue<socketchannel>(); private selector selector; private volatile boolean stop = false; private thread thread; private static final logger logger = loggerfactory.getlogger(nioreactor.class); public nioreactor() throws ioexception { selector = selector.open(); } public void postregistry(socketchannel channel) { registerqueue.add(channel); selector.wakeup(); // 唤醒selector, 以便让其即时处理注册 } public nioreactor start() { thread = new thread(new reacttask(this)); thread.start(); return this; } public void stop() throws interruptedexception { stop = true; thread.join(); } /** * 处理队列里面的待注册的socketchannel */ private void doregister(selector selector) { while (!registerqueue.isempty()) { socketchannel channel = registerqueue.poll(); try { // 注册读事件, 写事件无需注册, 写事件是业务驱动的, 当往channel写入 数据未写完时再注册写事件 channel.register(selector, selectionkey.op_read); } catch (closedchannelexception e) { logger.error("", e); } } } private void handlewrite(selectionkey key) { // todo 业务写 } private void handleread(selectionkey key) { // todo 业务读 } private static class reacttask implements runnable { nioreactor reactor; reacttask(nioreactor reactor) { this.reactor = reactor; } public void run() { set<selectionkey> keys = null; while (!reactor.stop) { final selector selector = reactor.selector; try { selector.select(500l); reactor.doregister(selector); // 处理注册 keys = selector.selectedkeys(); for (selectionkey key : keys) { try { if (!key.isvalid()) { // not valid key.cancel(); continue; } if (key.isreadable()) { // 可读 reactor.handleread(key); } if (key.iswritable()) { // 可写 reactor.handlewrite(key); } } catch (throwable t) { logger.error("", t); continue; } } } catch (ioexception e) { logger.error("", e); } } } } }
reactorpool用来管理reactor:
public class reactorpool extends linkedlist<nioreactor>{ private static final long serialversionuid = 6525233920805533099l; private final int capacity; public reactorpool(int size) { this.capacity = size; } // 轮询算法取下一个reactor public nioreactor nextreactor() throws ioexception { // 新建或从头部拿一个reactor nioreactor reactor = size() < capacity ? new nioreactor().start() : poll(); add(reactor);// 加到尾部 return reactor; } }
其它类似信息

推荐信息