您好,欢迎访问一九零五行业门户网

Java NIO原理分析与基本使用

java nio原理分析这里主要围绕着java nio展开,从java nio的基本使用,到介绍linux下nio api,再到java selector其底层的实现原理。
java nio基本使用
linux下的nio系统调用介绍
selector原理
channel和buffer之间的堆外内存
java nio基本使用从jdk nio文档里面可以发现,java将其划分成了三大块:channel,buffer以及多路复用selector。channel的存在,封装了对什么实体的连接通道(如网络/文件);buffer封装了对数据的缓冲存储,最后对于selector则是提供了一种可以以单线程非阻塞的方式,来处理多个连接。
基本应用示例nio的基本步骤是,创建selector和serversocketchannel,然后注册channel的accept事件,调用select方法,等待连接的到来,以及接收连接后将其注册到selector中。下面的为echo server的示例:
public class selectordemo {     public static void main(string[] args) throws ioexception {         selector selector = selector.open();         serversocketchannel socketchannel = serversocketchannel.open();         socketchannel.bind(new inetsocketaddress(8080));         socketchannel.configureblocking(false);         socketchannel.register(selector, selectionkey.op_accept);         while (true) {             int ready = selector.select();             if (ready == 0) {                 continue;             } else if (ready < 0) { break; } set<selectionkey> keys = selector.selectedkeys();             iterator<selectionkey> iterator = keys.iterator();             while (iterator.hasnext()) {                 selectionkey key = iterator.next();                 if (key.isacceptable()) {                     serversocketchannel channel = (serversocketchannel) key.channel();                     socketchannel accept = channel.accept();                     if (accept == null) {                         continue;                     }                     accept.configureblocking(false);                     accept.register(selector, selectionkey.op_read);                 } else if (key.isreadable()) {                     // 读事件                     deal((socketchannel) key.channel(), key);                 } else if (key.iswritable()) {                     // 写事件                     resp((socketchannel) key.channel(), key);                 }                 // 注:处理完成后要从中移除掉                 iterator.remove();             }         }         selector.close();         socketchannel.close();     }     private static void deal(socketchannel channel, selectionkey key) throws ioexception {         bytebuffer buffer = bytebuffer.allocate(1024);         bytebuffer responsebuffer = bytebuffer.allocate(1024);         int read = channel.read(buffer);         if (read > 0) {             buffer.flip();             responsebuffer.put(buffer);         } else if (read == -1) {             system.out.println(socket close);             channel.close();             return;         }         key.interestops(selectionkey.op_read | selectionkey.op_write);         key.attach(responsebuffer);     }     private static void resp(socketchannel channel, selectionkey key) throws ioexception {         bytebuffer buffer = (bytebuffer) key.attachment();         buffer.flip();         channel.write(buffer);         if (!buffer.hasremaining()) {             key.attach(null);             key.interestops(selectionkey.op_read);         }     } }
linux下的nio系统调用介绍在linux环境下,提供了几种方式可以实现nio,如epoll,poll,select等。对于select/poll,每次调用,都是从外部传入fd和监听事件,这就导致每次调用的时候,都需要将这些数据从用户态复制到内核态,就导致了每次调用代价比较大,而且每次从select/poll返回回来,都是全量的数据,需要自行去遍历检查哪些是ready的。对于epoll,则为增量式的,系统内部维护了所需要的fd和监听事件,要注册的时候,调用epoll_ctl即可,而每次调用,不再需要传入了,返回的时候,只返回ready的监听事件和fd。下面作个简单的伪代码:
具体的可以看以前的文章:
// 1. 创建server socket // 2. 绑定地址 // 3. 监听端口 // 4. 创建epoll int epollfd = epoll_create(1024); // 5. 注册监听事件 struct epoll_event event; event.events = epollin | epollrdhup | epollet; event.data.fd = serverfd; epoll_ctl(epollfd, epoll_ctl_add, serverfd, &event); while(true) {     readynums = epoll_wait( epollfd, events, 1024, -1 );          if ( readynums < 0 ) { printf("epoll_wait error\n"); exit(-1); } for ( i = 0; i < readynums; ++i) { if ( events[i].data.fd == serverfd ) { clientfd = accept( serverfd, null, null ); // 注册监听事件 ... }else if ( events[i].events & epollin ) { // 处理读事件 }else if ( events[i].events & epollrdhup ) { // 关闭连接事件 close( events[i].data.fd ); } }
selector原理selectionkey从java顶层使用者角度来看,channel通过注册,返回selectionkey,而selector.select方法,也是通过返回selectionkey来使用。那么这里为什么会需要这个类呢?这个类有什么作用?无论是任何语言,其实都脱离不了系统底层的支持,通过上述linux下的基本应用,可以知道,通过系统调用,向其传递和返回的都是fd以及事件这些参数,那么站在设计角度来看,就需要有一个映射关系,使得可以关联起来,这里有channel封装的是通过,如果将ready事件这些参数放在里面,不太合适,这个时候,selectionkey出现了,在selectionkey内部,保存channel的引用以及一些事件信息,然后selector通过fd找到selectionkey来进行关联。在底层ep里面,就有一个属性:map<integer,selectionkeyimpl> fdtokey。epollselectorimpl在linux 2.6+版本,java nio采用的epoll(即epollselectorimpl类),对于2.4.x的,则使用poll(即pollselectorimpl类),这里以epoll为例。
select方法顶层selector,通过调用select方法,最终会调用到epollselectorimpl.doselect方法,通过该方法,可以看到,其首先会处理一些不再注册的事件,调用pollwrapper.poll(timeout);,然后再进行一次清理,最后,可以看到需要处理映射关系
protected int doselect(long timeout)     throws ioexception {     if (closed)         throw new closedselectorexception();     // 处理一些不再注册的事件     processderegisterqueue();     try {         begin();         pollwrapper.poll(timeout);     } finally {         end();     }     // 再进行一次清理     processderegisterqueue();     int numkeysupdated = updateselectedkeys();     if (pollwrapper.interrupted()) {         // clear the wakeup pipe         pollwrapper.puteventops(pollwrapper.interruptedindex(), 0);         synchronized (interruptlock) {             pollwrapper.clearinterrupted();             ioutil.drain(fd0);             interrupttriggered = false;         }     }     return numkeysupdated; } private int updateselectedkeys() {     int entries = pollwrapper.updated;     int numkeysupdated = 0;     for (int i=0; i<entries; i++) { // 获取fd int nextfd = pollwrapper.getdescriptor(i); // 根据fd找到对应的selectionkey selectionkeyimpl ski = fdtokey.get(integer.valueof(nextfd)); // ski is null in the case of an interrupt if (ski != null) { // 找到该fd的ready事件 int rops = pollwrapper.geteventops(i); if (selectedkeys.contains(ski)) { // 将底层的事件转换为java封装的事件,selectionkey.op_read等 if (ski.channel.translateandsetreadyops(rops, ski)) { numkeysupdated++; } } else { // 没有在原有的selectedkey里面,说明是在等待过程中加入的 ski.channel.translateandsetreadyops(rops, ski); if ((ski.nioreadyops() & ski.niointerestops()) != 0) { // 需要更新selectedkeys集合 selectedkeys.add(ski); numkeysupdated++; } } } } // 返回ready的channel个数 return numkeysupdated; }
epollarraywrapperepollarraywrapper封装了底层的调用,里面包含几个native方法,如:
private native int epollcreate(); private native void epollctl(int epfd, int opcode, int fd, int events); private native int epollwait(long polladdress, int numfds, long timeout, int epfd) throws ioexception;
在openjdk的native目录(native/sun/nio/ch)里面可以找到对应的实现epollarraywrapper.c。
(这里顺带提一下,要实现native方法,可以在类里的方法加上native关键字,然后编译成class文件,再转换输出.h,c/c++底层实现该头文件的方法,编译成so库,放到对应目录即可)
在初始化文件方法里面,可以看到,是通过动态解析加载进来的,最终调用的epoll_create等方法。
jniexport void jnicall java_sun_nio_ch_epollarraywrapper_init(jnienv *env, jclass this) { epoll_create_func = (epoll_create_t) dlsym(rtld_default, "epoll_create"); epoll_ctl_func = (epoll_ctl_t) dlsym(rtld_default, "epoll_ctl"); epoll_wait_func = (epoll_wait_t) dlsym(rtld_default, "epoll_wait"); if ((epoll_create_func == null) || (epoll_ctl_func == null) || (epoll_wait_func == null)) { jnu_throwinternalerror(env, "unable to get address of epoll functions, pre-2.6 kernel?"); } }
channel和buffer之间的堆外内存经常会听见别人说,堆外内存容易泄漏,以及netty框架里面采用了堆外内存,减少拷贝提高性能。那么这里面的堆外内存指的是什么?之前怀着一个好奇心,通过read方法,最后追踪到socketchannelimpl里面read方法,里面调用了ioutil的read方法。里面会首先判断传入的buffer是不是directbuffer,如果不是(则是heapbytebuffer),则会创建一个临时的directbuffer,然后再将其复制到堆内。ioutil.read方法:
static int read(filedescriptor var0, bytebuffer var1, long var2, nativedispatcher var4, object var5) throws ioexception { if(var1.isreadonly()) { throw new illegalargumentexception("read-only buffer"); } else if(var1 instanceof directbuffer) { // 为堆外内存,则直接读取 return readintonativebuffer(var0, var1, var2, var4, var5); } else { // 为堆内内存,先获取临时堆外内存 bytebuffer var6 = util.gettemporarydirectbuffer(var1.remaining()); int var8; try { // 读取到堆外内存 int var7 = readintonativebuffer(var0, var6, var2, var4, var5); var6.flip(); if(var7 > 0) {                 // 复制到堆内                 var1.put(var6);             }             var8 = var7;         } finally {             // 释放临时堆外内存             util.offerfirsttemporarydirectbuffer(var6);         }         return var8;     } }
这里有一个问题就是,为什么会需要directbuffer以及堆外内存?通过对directbytebuffer的创建来分析,可以知道,通过unsafe.allocatememory(size);来分配内存的,而对于该方法来说,可以说是直接调用malloc返回,这一块内存是不受gc管理的,也就是所说的:堆外内存容易泄漏。但是对于使用directbytebuffer来说,会创建一个deallocator,注册到cleaner里面,当对象被回收的时候,则会被直接,从而释放掉内存,减少内存泄漏。要用堆外内存,从上面的创建来看,堆外内存创建后,以long型地址保存的,而堆内内存会受到gc影响,对象会被移动,如果采用堆内内存,进行系统调用的时候,那么gc就需要停止,否则就会有问题,基于这一点,采用了堆外内存(这一块参考了r大的理解:)。
注:堆外内存的创建(unsafe.cpp):
// 仅仅作了对齐以及将长度放在数组前方就返回了 unsafe_entry(jlong, unsafe_allocatememory(jnienv *env, jobject unsafe, jlong size))   unsafewrapper(unsafe_allocatememory);   size_t sz = (size_t)size;   if (sz != (julong)size || size < 0) {     throw_0(vmsymbols::java_lang_illegalargumentexception());   }   if (sz == 0) {     return 0;   }   sz = round_to(sz, heapwordsize);   void* x = os::malloc(sz);   if (x == null) {     throw_0(vmsymbols::java_lang_outofmemoryerror());   }   //copy::fill_to_words((heapword*)x, sz / heapwordsize);   return addr_to_java(x); unsafe_end
以上就是java nio原理分析与基本使用的详细内容。
其它类似信息

推荐信息