您好,欢迎访问一九零五行业门户网

HDFS读文件过程分析:读取文件的Block数据

我们可以从java.io.inputstream类中看到,抽象出一个read方法,用来读取已经打开的inputstream实例中的字节,每次调用read方法,会读取一个字节数据,该方法抽象定义,如下所示: public abstract int read() throws ioexception; hadoop的dfsclient.dfsinpu
我们可以从java.io.inputstream类中看到,抽象出一个read方法,用来读取已经打开的inputstream实例中的字节,每次调用read方法,会读取一个字节数据,该方法抽象定义,如下所示:
public abstract int read() throws ioexception;
hadoop的dfsclient.dfsinputstream类实现了该抽象逻辑,如果我们清楚了如何从hdfs中读取一个文件的一个block的一个字节的原理,更加抽象的顶层只需要迭代即可获取到该文件的全部数据。
从hdfs读文件过程分析:获取文件对应的block列表(http://shiyanjun.cn/archives/925.html)中,我们已经获取到一个文件对应的block列表信息,打开一个文件,接下来就要读取实际的物理块数据,我们从下面的几个方面来详细说明读取数据的过程。
client从datanode读取文件的一个字节
下面,我们通过分析dfsclient.dfsinputstream中实现的代码,读取hdfs上文件的内容。首先从下面的方法开始:
@override public synchronized int read() throws ioexception { int ret = read( onebytebuf, 0, 1 ); return ( ret 上面调用read(onebytebuf, 0, 1)读取一个字节到单字节缓冲区onebytebuf中,具体实现见如下方法:
@override public synchronized int read(byte buf[], int off, int len) throws ioexception { checkopen(); // 检查client是否正在运行 if (closed) { throw new ioexception(stream closed); } failures = 0; if (pos 0) { try { if (pos > blockend) { // blockend表示文件的长度(字节数) currentnode = blockseekto(pos); // 找到第pos个字节数据所在的datanode(实际根据该字节数据所在的block元数据来定位) } int reallen = (int) math.min((long) len, (blockend - pos + 1l)); int result = readbuffer(buf, off, reallen); // 读取一个字节到缓冲区中 if (result >= 0) { pos += result; // 每成功读取result个字节,pos增加result } else { // got a eos from reader though we expect more data on it. throw new ioexception(unexpected eos from the reader); } if (stats != null && result != -1) { stats.incrementbytesread(result); } return result; } catch (checksumexception ce) { throw ce; } catch (ioexception e) { if (retries == 1) { log.warn(dfs read: + stringutils.stringifyexception(e)); } blockend = -1; if (currentnode != null) { addtodeadnodes(currentnode); } if (--retries == 0) { throw e; } } } } return -1; }
读取文件数据的一个字节,具体过程如下:
检查流对象是否处于打开状态(前面已经获取到文件对应的block列表的元数据,并打开一个inputstream对象)从文件的第一个block开始读取,首先需要找到第一个block对应的数据块所在的datanode,可以从缓存的block列表中查询到(如果查找不到,则会与namenode进行一次rpc通信请求获取到)打开一个到该读取的block所在datanode节点的流,准备读取block数据建立了到datanode的连接后,读取一个字节数据到字节缓冲区中,返回读取的字节数(1个字节)在读取的过程中,以字节为单位,通过判断某个偏移位置的字节属于哪个block(根据block元数据所限定的字节偏移范围),在根据这个block去定位某一个datanode节点,这样就可连续地读取一个文件的全部数据(组成文件的、连续的多个block数据块)。
查找待读取的一个字节所在的datanode节点
上面public synchronized int read(byte buf[], int off, int len) throws ioexception方法,调用了blockseekto方法来获取,文件某个字节索引位置的数据所在的datanode节点。其实,很容易就能想到,想要获取到数据所在的datanode节点,一定是从block元数据中计算得到,然后根据client缓存的block映射列表,找到block对应的datanode列表,我们看一下blockseekto方法的代码实现:
private synchronized datanodeinfo blockseekto(long target) throws ioexception { ... ... datanodeinfo chosennode = null; int refetchtoken = 1; // only need to get a new access token once while (true) { locatedblock targetblock = getblockat(target, true); // 获取字节偏移位置为target的字节数据所在的block元数据对象 assert (target==this.pos) : wrong postion + pos + expect + target; long offsetintoblock = target - targetblock.getstartoffset(); dnaddrpair retval = choosedatanode(targetblock); // 选择一个datanode去读取数据 chosennode = retval.info; inetsocketaddress targetaddr = retval.addr; // 先尝试从本地读取数据,如果数据不在本地,则正常去读取远程的datanode节点 block blk = targetblock.getblock(); token accesstoken = targetblock.getblocktoken(); if (shouldtryshortcircuitread(targetaddr)) { try { blockreader = getlocalblockreader(conf, src, blk, accesstoken, chosennode, dfsclient.this.sockettimeout, offsetintoblock); // 创建一个用来读取本地数据的blockreader对象 return chosennode; } catch (accesscontrolexception ex) { log.warn(short circuit access failed , ex); //disable short circuit reads shortcircuitlocalreads = false; } catch (ioexception ex) { if (refetchtoken > 0 && tokenrefetchneeded(ex, targetaddr)) { /* get a new access token and retry. */ refetchtoken--; fetchblockat(target); continue; } else { log.info(failed to read + targetblock.getblock() + on local machine + stringutils.stringifyexception(ex)); log.info(try reading via the datanode on + targetaddr); } } } // 本地读取失败,按照更一般的方式去读取远程的datanode节点来获取数据 try { s = socketfactory.createsocket(); log.debug(connecting to + targetaddr); netutils.connect(s, targetaddr, getrandomlocalinterfaceaddr(), sockettimeout); s.setsotimeout(sockettimeout); blockreader = remoteblockreader.newblockreader(s, src, blk.getblockid(), accesstoken, blk.getgenerationstamp(), offsetintoblock, blk.getnumbytes() - offsetintoblock, buffersize, verifychecksum, clientname); // 创建一个远程的blockreader对象 return chosennode; } catch (ioexception ex) { if (refetchtoken > 0 && tokenrefetchneeded(ex, targetaddr)) { refetchtoken--; fetchblockat(target); } else { log.warn(failed to connect to + targetaddr + , add to deadnodes and continue + ex); if (log.isdebugenabled()) { log.debug(connection failure, ex); } // put chosen node into dead list, continue addtodeadnodes(chosennode); // 读取失败,会将选择的datanode加入到client的dead node列表,为下次读取选择合适的datanode读取文件数据提供参考元数据信息 } if (s != null) { try { s.close(); } catch (ioexception iex) { } } s = null; } } }
上面代码中,主要包括如下几个要点:
选择合适的datanode节点,提高读取效率在读取文件的时候,首先会从namenode获取文件对应的block列表元数据,返回的block列表是按照datanode的网络拓扑结构进行排序过的(本地节点优先,其次是同一机架节点),而且,client还维护了一个dead node列表,只要此时bock对应的datanode列表中节点不出现在dead node列表中就会被返回,用来作为读取数据的datanode节点。
如果client为集群datanode节点,尝试从本地读取block通过调用choosedatanode方法返回一个datanode结点,通过判断,如果该节点地址是本地地址,并且该节点上对应的block元数据信息的状态不是正在创建的状态,则满足从本地读取数据块的条件,然后会创建一个localblockreader对象,直接从本地读取。在创建localblockreader对象的过程中,会先从缓存中查找一个本地datanode相关的localdatanodeinfo对象,该对象定义了与从本地datanode读取数据的重要信息,以及缓存了待读取block对应的本地路径信息,可以从localdatanodeinfo类定义的属性来说明:
private clientdatanodeprotocol proxy = null; private final map cache;
如果缓存中存在待读取的block的相关信息,可以直接进行读取;否则,会创建一个proxy对象,以及计算待读取block的路径信息blocklocalpathinfo,最后再加入到缓存,为后续可能的读取加速。我们看一下如果没有从缓存中找到localdatanodeinfo信息(尤其是blocklocalpathinfo),则会执行如下逻辑:
// make rpc to local datanode to find local pathnames of blocks pathinfo = proxy.getblocklocalpathinfo(blk, token);
上面proxy为clientdatanodeprotocol类型,client与datanode进行rpc通信的协议,rpc调用getblocklocalpathinfo获取block对应的本地路径信息,可以在datanode类中查看具体实现,如下所示:
blocklocalpathinfo info = data.getblocklocalpathinfo(block);
datanode调用fsdataset(实现接口fsdatasetinterface)的getblocklocalpathinfo,如下所示:
@override //fsdatasetinterface public blocklocalpathinfo getblocklocalpathinfo(block block) throws ioexception { file datafile = getblockfile(block); // 获取本地block在本地datanode文件系统中的文件路径 file metafile = getmetafile(datafile, block); // 获取本地block在本地datanode文件系统中的元数据的文件路径 blocklocalpathinfo info = new blocklocalpathinfo(block, datafile.getabsolutepath(), metafile.getabsolutepath()); return info; }
接着可以直接去读取该block文件(如果需要检查校验和文件,会读取block的元数据文件metafile):
... // blockreaderlocal类的newblockreader静态方法 // get a local file system file blkfile = new file(pathinfo.getblockpath()); datain = new fileinputstream(blkfile); if (!skipchecksum) { // 如果检查block的校验和 // get the metadata file file metafile = new file(pathinfo.getmetapath()); checksumin = new fileinputstream(metafile); // read and handle the common header here. for now just a version blockmetadataheader header = blockmetadataheader.readheader(new datainputstream(checksumin)); short version = header.getversion(); if (version != fsdataset.metadata_version) { log.warn(wrong version ( + version + ) for metadata file for + blk + ignoring ...); } datachecksum checksum = header.getchecksum(); localblockreader = new blockreaderlocal(conf, file, blk, token, startoffset, length, pathinfo, checksum, true, datain, checksumin); } else { localblockreader = new blockreaderlocal(conf, file, blk, token, startoffset, length, pathinfo, datain); }
在上面代码中,返回了blocklocalpathinfo,但是很可能在这个过程中block被删除了,在删除block的时候,namenode会调度指派该datanode删除该block,恰好在这个时间间隔内block对应的blocklocalpathinfo信息已经失效(文件已经被删除),所以上面这段代码再try中会抛出异常,并在catch中捕获到io异常,会从缓存中再清除掉失效的block到blocklocalpathinfo的映射信息。
如果client非集群datanode节点,远程读取block如果client不是datanode本地节点,则只能跨网络节点远程读取,首先创建socket连接:
s = socketfactory.createsocket(); log.debug(connecting to + targetaddr); netutils.connect(s, targetaddr, getrandomlocalinterfaceaddr(), sockettimeout); s.setsotimeout(sockettimeout);
建立client到目标datanode(targetaddr)的连接,然后同样也是创建一个远程blockreader对象remoteblockreader来辅助读取block数据。创建remoteblockreader过程中,首先向目标datanode发送rpc请求:
// in and out will be closed when sock is closed (by the caller) dataoutputstream out = new dataoutputstream(new bufferedoutputstream(netutils.getoutputstream(sock,hdfsconstants.write_timeout))); //write the header. out.writeshort( datatransferprotocol.data_transfer_version ); // client与datanode之间传输数据的版本号 out.write( datatransferprotocol.op_read_block ); // 传输操作类型:读取block out.writelong( blockid ); // block id out.writelong( genstamp ); // 时间戳信息 out.writelong( startoffset ); // block起始偏移量 out.writelong( len ); // block长度 text.writestring(out, clientname); // 客户端标识 accesstoken.write(out); out.flush();
然后获取到datainputstream对象来读取datanode的响应信息:
datainputstream in = new datainputstream( new bufferedinputstream(netutils.getinputstream(sock), buffersize));
最后,返回一个对象remoteblockreader:
return new remoteblockreader(file, blockid, in, checksum, verifychecksum, startoffset, firstchunkoffset, sock);
借助blockreader来读取block字节
我们再回到blockseekto方法中,待读取block所在的datanode信息、blockreader信息都已经具备,接着就可以从包含输入流(inputstream)对象的blockreader中读取数据块中一个字节数据:
int result = readbuffer(buf, off, reallen);
将block数据中一个字节读取到buf中,如下所示:
private synchronized int readbuffer(byte buf[], int off, int len) throws ioexception { ioexception ioe; boolean retrycurrentnode = true; while (true) { // retry as many times as seektonewsource allows. try { return blockreader.read(buf, off, len); // 调用blockreader的read方法读取字节数据到buf中 } catch ( checksumexception ce ) { log.warn(found checksum error for + currentblock + from + currentnode.getname() + at + ce.getpos()); reportchecksumfailure(src, currentblock, currentnode); ioe = ce; retrycurrentnode = false; // 只尝试读取当前选择的datanode一次,失败的话就会被加入到client的dead node列表中 } catch ( ioexception e ) { if (!retrycurrentnode) { log.warn(exception while reading from + currentblock + of + src + from + currentnode + : + stringutils.stringifyexception(e)); } ioe = e; } boolean sourcefound = false; if (retrycurrentnode) { /* possibly retry the same node so that transient errors don't * result in application level failures (e.g. datanode could have * closed the connection because the client is idle for too long). */ sourcefound = seektoblocksource(pos); } else { addtodeadnodes(currentnode); // 加入到client的dead node列表中 sourcefound = seektonewsource(pos); // 从当前选择的datanode上读取数据失败,会再次选择一个datanode,这里seektonewsource方法内部调用了blockseekto方法去选择一个datanode } if (!sourcefound) { throw ioe; } retrycurrentnode = false; } }
通过blockreaderlocal或者remoteblockreader来读取block数据,逻辑非常类似,主要是控制读取字节的偏移量,记录偏移量的状态信息,详细可以查看它们的源码。
datanode节点处理读文件block请求
我们可以在datanode端看一下,如何处理一个读取block的请求。如果client与datanode不是同一个节点,则为远程读取文件block,首先client需要发送一个请求头信息,代码如下所示:
//write the header. out.writeshort( datatransferprotocol.data_transfer_version ); // client与datanode之间传输数据的版本号 out.write( datatransferprotocol.op_read_block ); // 传输操作类型:读取block out.writelong( blockid ); // block id out.writelong( genstamp ); // 时间戳信息 out.writelong( startoffset ); // block起始偏移量 out.writelong( len ); // block长度 text.writestring(out, clientname); // 客户端标识 accesstoken.write(out); out.flush();
datanode节点端通过验证数据传输版本号(datatransferprotocol.data_transfer_version)一致以后,会判断传输操作类型,如果是读操作datatransferprotocol.op_read_block,则会通过client建立的socket来创建一个outputstream对象,然后通过blocksender向client发送block数据,代码如下所示:
try { blocksender = new blocksender(block, startoffset, length, true, true, false, datanode, clienttracefmt); // 创建blocksender对象 } catch(ioexception e) { out.writeshort(datatransferprotocol.op_status_error); throw e; } out.writeshort(datatransferprotocol.op_status_success); // 回复一个响应header信息:成功状态 long read = blocksender.sendblock(out, basestream, null); // 发送请求的block数据
原文地址:hdfs读文件过程分析:读取文件的block数据, 感谢原作者分享。
其它类似信息

推荐信息