您好,欢迎访问一九零五行业门户网

HDFS写文件过程分析

hdfs是一个分布式文件系统,在hdfs上写文件的过程与我们平时使用的单机文件系统非常不同,从宏观上来看,在hdfs文件系统上创建并写一个文件,流程如下图(来自《hadoop:the definitive guide》一书)所示: 具体过程描述如下: client调用distributedfilesy
hdfs是一个分布式文件系统,在hdfs上写文件的过程与我们平时使用的单机文件系统非常不同,从宏观上来看,在hdfs文件系统上创建并写一个文件,流程如下图(来自《hadoop:the definitive guide》一书)所示:
具体过程描述如下:
client调用distributedfilesystem对象的create方法,创建一个文件输出流(fsdataoutputstream)对象通过distributedfilesystem对象与hadoop集群的namenode进行一次rpc远程调用,在hdfs的namespace中创建一个文件条目(entry),该条目没有任何的block通过fsdataoutputstream对象,向datanode写入数据,数据首先被写入fsdataoutputstream对象内部的buffer中,然后数据被分割成一个个packet数据包以packet最小单位,基于socket连接发送到按特定算法选择的hdfs集群中一组datanode(正常是3个,可能大于等于1)中的一个节点上,在这组datanode组成的pipeline上依次传输packet这组datanode组成的pipeline反方向上,发送ack,最终由pipeline中第一个datanode节点将pipeline ack发送给client完成向文件写入数据,client在文件输出流(fsdataoutputstream)对象上调用close方法,关闭流调用distributedfilesystem对象的complete方法,通知namenode文件写入成功下面代码使用hadoop的api来实现向hdfs的文件写入数据,同样也包括创建一个文件和写数据两个主要过程,代码如下所示:
static string[] contents = new string[] { aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa, bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb, cccccccccccccccccccccccccccccccccccccccccccccccccccccccccc, dddddddddddddddddddddddddddddddd, eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee, }; public static void main(string[] args) { string file = hdfs://h1:8020/data/test/test.log; path path = new path(file); configuration conf = new configuration(); filesystem fs = null; fsdataoutputstream output = null; try { fs = path.getfilesystem(conf); output = fs.create(path); // 创建文件 for(string line : contents) { // 写入数据 output.write(line.getbytes(utf-8)); output.flush(); } } catch (ioexception e) { e.printstacktrace(); } finally { try { output.close(); } catch (ioexception e) { e.printstacktrace(); } } }
结合上面的示例代码,我们先从fs.create(path);开始,可以看到filesystem的实现distributedfilesystem中给出了最终返回fsdataoutputstream对象的抽象逻辑,代码如下所示:
public fsdataoutputstream create(path f, fspermission permission, boolean overwrite, int buffersize, short replication, long blocksize, progressable progress) throws ioexception { statistics.incrementwriteops(1); return new fsdataoutputstream (dfs.create(getpathname(f), permission, overwrite, true, replication, blocksize, progress, buffersize), statistics); }
上面,dfsclient dfs的create方法中创建了一个outputstream对象,在dfsclient的create方法:
public outputstream create(string src, fspermission permission, boolean overwrite, boolean createparent, short replication, long blocksize, progressable progress, int buffersize ) throws ioexception { ... ...}
创建了一个dfsoutputstream对象,如下所示:
final dfsoutputstream result = new dfsoutputstream(src, masked, overwrite, createparent, replication, blocksize, progress, buffersize, conf.getint(io.bytes.per.checksum, 512));
下面,我们从dfsoutputstream类开始,说明其内部实现原理。
dfsoutputstream内部原理
打开一个dfsoutputstream流,client会写数据到流内部的一个缓冲区中,然后数据被分解成多个packet,每个packet大小为64k字节,每个packet又由一组chunk和这组chunk对应的checksum数据组成,默认chunk大小为512字节,每个checksum是对512字节数据计算的校验和数据。
当client写入的字节流数据达到一个packet的长度,这个packet会被构建出来,然后会被放到队列dataqueue中,接着datastreamer线程会不断地从dataqueue队列中取出packet,发送到复制pipeline中的第一个datanode上,并将该packet从dataqueue队列中移到ackqueue队列中。responseprocessor线程接收从datanode发送过来的ack,如果是一个成功的ack,表示复制pipeline中的所有datanode都已经接收到这个packet,responseprocessor线程将packet从队列ackqueue中删除。
在发送过程中,如果发生错误,所有未完成的packet都会从ackqueue队列中移除掉,然后重新创建一个新的pipeline,排除掉出错的那些datanode节点,接着datastreamer线程继续从dataqueue队列中发送packet。
下面是dfsoutputstream的结构及其原理,如图所示:
我们从下面3个方面来描述内部流程:
创建packetclient写数据时,会将字节流数据缓存到内部的缓冲区中,当长度满足一个chunk大小(512b)时,便会创建一个packet对象,然后向该packet对象中写chunk checksum校验和数据,以及实际数据块chunk data,校验和数据是基于实际数据块计算得到的。每次满足一个chunk大小时,都会向packet中写上述数据内容,直到达到一个packet对象大小(64k),就会将该packet对象放入到dataqueue队列中,等待datastreamer线程取出并发送到datanode节点。
发送packetdatastreamer线程从dataqueue队列中取出packet对象,放到ackqueue队列中,然后向datanode节点发送这个packet对象所对应的数据。
接收ack发送一个packet数据包以后,会有一个用来接收ack的responseprocessor线程,如果收到成功的ack,则表示一个packet发送成功。如果成功,则responseprocessor线程会将ackqueue队列中对应的packet删除。
dfsoutputstream初始化
首先看一下,dfsoutputstream的初始化过程,构造方法如下所示:
dfsoutputstream(string src, fspermission masked, boolean overwrite, boolean createparent, short replication, long blocksize, progressable progress, int buffersize, int bytesperchecksum) throws ioexception { this(src, blocksize, progress, bytesperchecksum, replication); computepacketchunksize(writepacketsize, bytesperchecksum); // 默认 writepacketsize=64*1024(即64k),bytesperchecksum=512(没512个字节计算一个校验和), try { if (createparent) { // createparent为true表示,如果待创建的文件的父级目录不存在,则自动创建 namenode.create(src, masked, clientname, overwrite, replication, blocksize); } else { namenode.create(src, masked, clientname, overwrite, false, replication, blocksize); } } catch(remoteexception re) { throw re.unwrapremoteexception(accesscontrolexception.class, filealreadyexistsexception.class, filenotfoundexception.class, nsquotaexceededexception.class, dsquotaexceededexception.class); } streamer.start(); // 启动一个datastreamer线程,用来将写入的字节流打包成packet,然后发送到对应的datanode节点上 }上面computepacketchunksize方法计算了一个packet的相关参数,我们结合代码来查看,如下所示: int chunksize = csize + checksum.getchecksumsize(); int n = datanode.pkt_header_len + size_of_integer; chunksperpacket = math.max((psize - n + chunksize-1)/chunksize, 1); packetsize = n + chunksize*chunksperpacket;
我们用默认的参数值替换上面的参数,得到:
int chunksize = 512 + 4; int n = 21 + 4; chunksperpacket = math.max((64*1024 - 25 + 516-1)/516, 1); // 127 packetsize = 25 + 516*127;
上面对应的参数,说明如下表所示:
参数名称 参数值 参数含义
chunksize 512+4=516 每个chunk的字节数(数据+校验和)
csize 512 每个chunk数据的字节数
psize 64*1024 每个packet的最大字节数(不包含header)
datanode.pkt_header_len 21 每个packet的header的字节数
chunksperpacket 127 组成每个packet的chunk的个数
packetsize 25+516*127=65557 每个packet的字节数(一个header+一组chunk)
在计算好一个packet相关的参数以后,调用create方法与namenode进行rpc请求,请求创建文件:
if (createparent) { // createparent为true表示,如果待创建的文件的父级目录不存在,则自动创建 namenode.create(src, masked, clientname, overwrite, replication, blocksize); } else { namenode.create(src, masked, clientname, overwrite, false, replication, blocksize); }
远程调用上面方法,会在fsnamesystem中创建对应的文件路径,并初始化与该创建的文件相关的一些信息,如租约(向datanode节点写数据的凭据)。文件在fsnamesystem中创建成功,就要初始化并启动一个datastreamer线程,用来向datanode写数据,后面我们详细说明具体处理逻辑。
packet结构与定义
client向hdfs写数据,数据会被组装成packet,然后发送到datanode节点。packet分为两类,一类是实际数据包,另一类是heatbeat包。一个packet数据包的组成结构,如图所示:
上图中,一个packet是由header和data两部分组成,其中header部分包含了一个packet的概要属性信息,如下表所示:
字段名称 字段类型 字段长度 字段含义
pktlen int 4 4 + datalen + checksumlen
offsetinblock long 8 packet在block中偏移量
seqno long 8 packet序列号,在同一个block唯一
lastpacketinblock boolean 1 是否是一个block的最后一个packet
datalen int 4 datapos – datastart,不包含header和checksum的长度
data部分是一个packet的实际数据部分,主要包括一个4字节校验和(checksum)与一个chunk部分,chunk部分最大为512字节。
在构建一个packet的过程中,首先将字节流数据写入一个buffer缓冲区中,也就是从偏移量为25的位置(checksumstart)开始写packet数据的chunk checksum部分,从偏移量为533的位置(datastart)开始写packet数据的chunk data部分,直到一个packet创建完成为止。如果一个packet的大小未能达到最大长度,也就是上图对应的缓冲区中,chunk checksum与chunk data之间还保留了一段未被写过的缓冲区位置,这种情况说明,已经在写一个文件的最后一个block的最后一个packet。在发送这个packet之前,会检查chunksum与chunk data之间的缓冲区是否为空白缓冲区(gap),如果有则将chunk data部分向前移动,使得chunk data 1与chunk checksum n相邻,然后才会被发送到datanode节点。
我们看一下packet对应的packet类定义,定义了如下一些字段:
bytebuffer buffer; // only one of buf and buffer is non-null byte[] buf; long seqno; // sequencenumber of buffer in block long offsetinblock; // 该packet在block中的偏移量 boolean lastpacketinblock; // is this the last packet in block? int numchunks; // number of chunks currently in packet int maxchunks; // 一个packet中包含的chunk的个数 int datastart; int datapos; int checksumstart; int checksumpos;
packet类有一个默认的没有参数的构造方法,它是用来做heatbeat的,如下所示:
packet() { this.lastpacketinblock = false; this.numchunks = 0; this.offsetinblock = 0; this.seqno = heart_beat_seqno; // 值为-1 buffer = null; int packetsize = datanode.pkt_header_len + size_of_integer; // 21+4=25 buf = new byte[packetsize]; checksumstart = datastart = packetsize; checksumpos = checksumstart; datapos = datastart; maxchunks = 0; }
通过代码可以看到,一个heatbeat的内容,实际上只有一个长度为25字节的header数据。通过this.seqno = heart_beat_seqno;的值可以判断一个packet是否是heatbeat包,如果seqno为-1表示这是一个heatbeat包。
client发送packet数据
可以dfsclient类中看到,发送一个packet之前,首先需要向选定的datanode发送一个header数据包,表明要向datanode写数据,该header的数据结构,如图所示:
上图显示的是client发送packet到第一个datanode节点的header数据结构,主要包括待发送的packet所在的block(先向namenode分配block id等信息)的相关信息、pipeline中另外2个datanode的信息、访问令牌(access token)和校验和信息,header中各个字段及其类型,详见下表:
字段名称 字段类型 字段长度 字段含义
transfer version short 2 client与datanode之间数据传输版本号,由常量datatransferprotocol.data_transfer_version定义,值为17
op int 4 操作类型,由常量datatransferprotocol.op_write_block定义,值为80
blkid long 8 block的id值,由namenode分配
gs long 8 时间戳(generation stamp),namenode分配blkid的时候生成的时间戳
dncnt int 4 datanode复制pipeline中datanode节点的数量
recovery flag boolean 1 recover标志
client text client主机的名称,在使用text进行序列化的时候,实际包含长度len与主机名称字符串clienthost
srcnode boolean 1 是否发送src node的信息,默认值为false,不发送src node的信息
nonsrcdncnt int 4 由client写的该header数据,该数不包含pipeline中第一个节点(即为dncnt-1)
dn2 datanodeinfo datanode信息,包括storageid、infoport、ipcport、capacity、dfsused、remaining、lastupdate、xceivercount、location、hostname、adminstate
dn3 datanodeinfo datanode信息,包括storageid、infoport、ipcport、capacity、dfsused、remaining、lastupdate、xceivercount、location、hostname、adminstate
access token token 访问令牌信息,包括identifierlength、identifier、pwdlength、pwd、kindlength、kind、servicelength、service
checksum header datachecksum 1+4 校验和header信息,包括type、bytesperchecksum
header数据包发送成功,client会收到一个成功响应码(datatransferprotocol.op_status_success = 0),接着将packet数据发送到pipeline中第一个datanode上,如下所示:
packet one = null; one = dataqueue.getfirst(); // regular data packet bytebuffer buf = one.getbuffer(); // write out data to remote datanode blockstream.write(buf.array(), buf.position(), buf.remaining()); if (one.lastpacketinblock) { // 如果是block中的最后一个packet,还要写入一个0标识该block已经写入完成 blockstream.writeint(0); // indicate end-of-block }
否则,如果失败,则会与namenode进行rpc调用,删除该block,并把该pipeline中第一个datanode加入到excludednodes列表中,代码如下所示:
if (!success) { log.info(abandoning + block); namenode.abandonblock(block, src, clientname); if (errorindex datanode端服务组件
数据最终会发送到datanode节点上,在一个datanode上,数据在各个组件之间流动,流程如下图所示:
datanode服务中创建一个后台线程dataxceiverserver,它是一个socketserver,用来接收来自client(或者datanode pipeline中的非最后一个datanode节点)的写数据请求,然后在dataxceiverserver中将连接过来的socket直接派发给一个独立的后台线程dataxceiver进行处理。所以,client写数据时连接一个datanode pipeline的结构,实际流程如图所示:
每个datanode服务中的dataxceiver后台线程接收到来自前一个节点(client/datanode)的socket连接,首先读取header数据:
block block = new block(in.readlong(), dataxceiverserver.estimateblocksize, in.readlong()); log.info(receiving + block + src: + remoteaddress + dest: + localaddress); int pipelinesize = in.readint(); // num of datanodes in entire pipeline boolean isrecovery = in.readboolean(); // is this part of recovery? string client = text.readstring(in); // working on behalf of this client boolean hassrcdatanode = in.readboolean(); // is src node info present if (hassrcdatanode) { srcdatanode = new datanodeinfo(); srcdatanode.readfields(in); } int numtargets = in.readint(); if (numtargets accesstoken = new token(); accesstoken.readfields(in);
上面代码中,读取header的数据,与前一个client/datanode写入header字段的顺序相对应,不再累述。在完成读取header数据后,当前datanode会首先将header数据再发送到pipeline中下一个datanode结点,当然该datanode肯定不是pipeline中最后一个datanode节点。接着,该datanode会接收来自前一个client/datanode节点发送的packet数据,接收packet数据的逻辑实际上在blockreceiver中完成,包括将来自前一个client/datanode节点发送的packet数据写入本地磁盘。在blockreceiver中,首先会将接收到的packet数据发送写入到pipeline中下一个datanode节点,然后再将接收到的数据写入到本地磁盘的block文件中。
datanode持久化packet数据
在datanode节点的blockreceiver中进行packet数据的持久化,一个packet是一个block中一个数据分组,我们首先看一下,一个block在持久化到磁盘上的物理存储结构,如下图所示:
每个block文件(如上图中blk_1084013198文件)都对应一个meta文件(如上图中blk_1084013198_10273532.meta文件),block文件是一个一个chunk的二进制数据(每个chunk的大小是512字节),而meta文件是与每一个chunk对应的checksum数据,是序列化形式存储。
写文件过程中client/datanode与namenode进行rpc调用
client在hdfs文件系统中写文件过程中,会发生多次与namenode节点进行rpc调用来完成写数据相关操作,主要是在如下时机进行rpc调用:
写文件开始时创建文件:client调用create在namenode节点的namespace中创建一个标识该文件的条目在client连接pipeline中第一个datanode节点之前,client调用addblock分配一个block(blkid+datanode列表+租约)如果与pipeline中第一个datanode节点连接失败,client调用abandonblock放弃一个已经分配的block一个block已经写入到datanode节点磁盘,client调用fsync让namenode持久化block的位置信息数据文件写完以后,client调用complete方法通知namenode写入文件成功datanode节点接收到并成功持久化一个block的数据后,datanode调用blockreceived方法通知namenode已经接收到block具体rpc调用的详细过程,可以参考源码。
原文地址:hdfs写文件过程分析, 感谢原作者分享。
其它类似信息

推荐信息