java nio原理分析这里主要围绕着java nio展开,从java nio的基本使用,到介绍linux下nio api,再到java selector其底层的实现原理。
java nio基本使用
linux下的nio系统调用介绍
selector原理
channel和buffer之间的堆外内存
java nio基本使用从jdk nio文档里面可以发现,java将其划分成了三大块:channel,buffer以及多路复用selector。channel的存在,封装了对什么实体的连接通道(如网络/文件);buffer封装了对数据的缓冲存储,最后对于selector则是提供了一种可以以单线程非阻塞的方式,来处理多个连接。
基本应用示例nio的基本步骤是,创建selector和serversocketchannel,然后注册channel的accept事件,调用select方法,等待连接的到来,以及接收连接后将其注册到selector中。下面的为echo server的示例:
public class selectordemo {
public static void main(string[] args) throws ioexception {
selector selector = selector.open();
serversocketchannel socketchannel = serversocketchannel.open();
socketchannel.bind(new inetsocketaddress(8080));
socketchannel.configureblocking(false);
socketchannel.register(selector, selectionkey.op_accept);
while (true) {
int ready = selector.select();
if (ready == 0) {
continue;
} else if (ready < 0) {
break;
}
set<selectionkey> keys = selector.selectedkeys();
iterator<selectionkey> iterator = keys.iterator();
while (iterator.hasnext()) {
selectionkey key = iterator.next();
if (key.isacceptable()) {
serversocketchannel channel = (serversocketchannel) key.channel();
socketchannel accept = channel.accept();
if (accept == null) {
continue;
}
accept.configureblocking(false);
accept.register(selector, selectionkey.op_read);
} else if (key.isreadable()) {
// 读事件
deal((socketchannel) key.channel(), key);
} else if (key.iswritable()) {
// 写事件
resp((socketchannel) key.channel(), key);
}
// 注:处理完成后要从中移除掉
iterator.remove();
}
}
selector.close();
socketchannel.close();
}
private static void deal(socketchannel channel, selectionkey key) throws ioexception {
bytebuffer buffer = bytebuffer.allocate(1024);
bytebuffer responsebuffer = bytebuffer.allocate(1024);
int read = channel.read(buffer);
if (read > 0) {
buffer.flip();
responsebuffer.put(buffer);
} else if (read == -1) {
system.out.println(socket close);
channel.close();
return;
}
key.interestops(selectionkey.op_read | selectionkey.op_write);
key.attach(responsebuffer);
}
private static void resp(socketchannel channel, selectionkey key) throws ioexception {
bytebuffer buffer = (bytebuffer) key.attachment();
buffer.flip();
channel.write(buffer);
if (!buffer.hasremaining()) {
key.attach(null);
key.interestops(selectionkey.op_read);
}
}
}
linux下的nio系统调用介绍在linux环境下,提供了几种方式可以实现nio,如epoll,poll,select等。对于select/poll,每次调用,都是从外部传入fd和监听事件,这就导致每次调用的时候,都需要将这些数据从用户态复制到内核态,就导致了每次调用代价比较大,而且每次从select/poll返回回来,都是全量的数据,需要自行去遍历检查哪些是ready的。对于epoll,则为增量式的,系统内部维护了所需要的fd和监听事件,要注册的时候,调用epoll_ctl即可,而每次调用,不再需要传入了,返回的时候,只返回ready的监听事件和fd。下面作个简单的伪代码:
具体的可以看以前的文章:
// 1. 创建server socket
// 2. 绑定地址
// 3. 监听端口
// 4. 创建epoll
int epollfd = epoll_create(1024);
// 5. 注册监听事件
struct epoll_event event;
event.events = epollin | epollrdhup | epollet;
event.data.fd = serverfd;
epoll_ctl(epollfd, epoll_ctl_add, serverfd, &event);
while(true) {
readynums = epoll_wait( epollfd, events, 1024, -1 );
if ( readynums < 0 )
{
printf("epoll_wait error\n");
exit(-1);
}
for ( i = 0; i < readynums; ++i)
{
if ( events[i].data.fd == serverfd )
{
clientfd = accept( serverfd, null, null );
// 注册监听事件
...
}else if ( events[i].events & epollin )
{
// 处理读事件
}else if ( events[i].events & epollrdhup )
{
// 关闭连接事件
close( events[i].data.fd );
}
}
selector原理selectionkey从java顶层使用者角度来看,channel通过注册,返回selectionkey,而selector.select方法,也是通过返回selectionkey来使用。那么这里为什么会需要这个类呢?这个类有什么作用?无论是任何语言,其实都脱离不了系统底层的支持,通过上述linux下的基本应用,可以知道,通过系统调用,向其传递和返回的都是fd以及事件这些参数,那么站在设计角度来看,就需要有一个映射关系,使得可以关联起来,这里有channel封装的是通过,如果将ready事件这些参数放在里面,不太合适,这个时候,selectionkey出现了,在selectionkey内部,保存channel的引用以及一些事件信息,然后selector通过fd找到selectionkey来进行关联。在底层ep里面,就有一个属性:map<integer,selectionkeyimpl> fdtokey。epollselectorimpl在linux 2.6+版本,java nio采用的epoll(即epollselectorimpl类),对于2.4.x的,则使用poll(即pollselectorimpl类),这里以epoll为例。
select方法顶层selector,通过调用select方法,最终会调用到epollselectorimpl.doselect方法,通过该方法,可以看到,其首先会处理一些不再注册的事件,调用pollwrapper.poll(timeout);,然后再进行一次清理,最后,可以看到需要处理映射关系
protected int doselect(long timeout)
throws ioexception
{
if (closed)
throw new closedselectorexception();
// 处理一些不再注册的事件
processderegisterqueue();
try {
begin();
pollwrapper.poll(timeout);
} finally {
end();
}
// 再进行一次清理
processderegisterqueue();
int numkeysupdated = updateselectedkeys();
if (pollwrapper.interrupted()) {
// clear the wakeup pipe
pollwrapper.puteventops(pollwrapper.interruptedindex(), 0);
synchronized (interruptlock) {
pollwrapper.clearinterrupted();
ioutil.drain(fd0);
interrupttriggered = false;
}
}
return numkeysupdated;
}
private int updateselectedkeys() {
int entries = pollwrapper.updated;
int numkeysupdated = 0;
for (int i=0; i<entries; i++) {
// 获取fd
int nextfd = pollwrapper.getdescriptor(i);
// 根据fd找到对应的selectionkey
selectionkeyimpl ski = fdtokey.get(integer.valueof(nextfd));
// ski is null in the case of an interrupt
if (ski != null) {
// 找到该fd的ready事件
int rops = pollwrapper.geteventops(i);
if (selectedkeys.contains(ski)) {
// 将底层的事件转换为java封装的事件,selectionkey.op_read等
if (ski.channel.translateandsetreadyops(rops, ski)) {
numkeysupdated++;
}
} else {
// 没有在原有的selectedkey里面,说明是在等待过程中加入的
ski.channel.translateandsetreadyops(rops, ski);
if ((ski.nioreadyops() & ski.niointerestops()) != 0) {
// 需要更新selectedkeys集合
selectedkeys.add(ski);
numkeysupdated++;
}
}
}
}
// 返回ready的channel个数
return numkeysupdated;
}
epollarraywrapperepollarraywrapper封装了底层的调用,里面包含几个native方法,如:
private native int epollcreate();
private native void epollctl(int epfd, int opcode, int fd, int events);
private native int epollwait(long polladdress, int numfds, long timeout,
int epfd) throws ioexception;
在openjdk的native目录(native/sun/nio/ch)里面可以找到对应的实现epollarraywrapper.c。
(这里顺带提一下,要实现native方法,可以在类里的方法加上native关键字,然后编译成class文件,再转换输出.h,c/c++底层实现该头文件的方法,编译成so库,放到对应目录即可)
在初始化文件方法里面,可以看到,是通过动态解析加载进来的,最终调用的epoll_create等方法。
jniexport void jnicall
java_sun_nio_ch_epollarraywrapper_init(jnienv *env, jclass this)
{
epoll_create_func = (epoll_create_t) dlsym(rtld_default, "epoll_create");
epoll_ctl_func = (epoll_ctl_t) dlsym(rtld_default, "epoll_ctl");
epoll_wait_func = (epoll_wait_t) dlsym(rtld_default, "epoll_wait");
if ((epoll_create_func == null) || (epoll_ctl_func == null) ||
(epoll_wait_func == null)) {
jnu_throwinternalerror(env, "unable to get address of epoll functions, pre-2.6 kernel?");
}
}
channel和buffer之间的堆外内存经常会听见别人说,堆外内存容易泄漏,以及netty框架里面采用了堆外内存,减少拷贝提高性能。那么这里面的堆外内存指的是什么?之前怀着一个好奇心,通过read方法,最后追踪到socketchannelimpl里面read方法,里面调用了ioutil的read方法。里面会首先判断传入的buffer是不是directbuffer,如果不是(则是heapbytebuffer),则会创建一个临时的directbuffer,然后再将其复制到堆内。ioutil.read方法:
static int read(filedescriptor var0, bytebuffer var1, long var2, nativedispatcher var4, object var5) throws ioexception {
if(var1.isreadonly()) {
throw new illegalargumentexception("read-only buffer");
} else if(var1 instanceof directbuffer) {
// 为堆外内存,则直接读取
return readintonativebuffer(var0, var1, var2, var4, var5);
} else {
// 为堆内内存,先获取临时堆外内存
bytebuffer var6 = util.gettemporarydirectbuffer(var1.remaining());
int var8;
try {
// 读取到堆外内存
int var7 = readintonativebuffer(var0, var6, var2, var4, var5);
var6.flip();
if(var7 > 0) {
// 复制到堆内
var1.put(var6);
}
var8 = var7;
} finally {
// 释放临时堆外内存
util.offerfirsttemporarydirectbuffer(var6);
}
return var8;
}
}
这里有一个问题就是,为什么会需要directbuffer以及堆外内存?通过对directbytebuffer的创建来分析,可以知道,通过unsafe.allocatememory(size);来分配内存的,而对于该方法来说,可以说是直接调用malloc返回,这一块内存是不受gc管理的,也就是所说的:堆外内存容易泄漏。但是对于使用directbytebuffer来说,会创建一个deallocator,注册到cleaner里面,当对象被回收的时候,则会被直接,从而释放掉内存,减少内存泄漏。要用堆外内存,从上面的创建来看,堆外内存创建后,以long型地址保存的,而堆内内存会受到gc影响,对象会被移动,如果采用堆内内存,进行系统调用的时候,那么gc就需要停止,否则就会有问题,基于这一点,采用了堆外内存(这一块参考了r大的理解:)。
注:堆外内存的创建(unsafe.cpp):
// 仅仅作了对齐以及将长度放在数组前方就返回了
unsafe_entry(jlong, unsafe_allocatememory(jnienv *env, jobject unsafe, jlong size))
unsafewrapper(unsafe_allocatememory);
size_t sz = (size_t)size;
if (sz != (julong)size || size < 0) {
throw_0(vmsymbols::java_lang_illegalargumentexception());
}
if (sz == 0) {
return 0;
}
sz = round_to(sz, heapwordsize);
void* x = os::malloc(sz);
if (x == null) {
throw_0(vmsymbols::java_lang_outofmemoryerror());
}
//copy::fill_to_words((heapword*)x, sz / heapwordsize);
return addr_to_java(x);
unsafe_end
以上就是java nio原理分析与基本使用的详细内容。