本人原创,转载请注明出处!欢迎大家加入 giraph 技术交流群 : 228591158 giraph中aggregator的用法请参考官方文档:http://giraph.apache.org/aggregators.html ,本文重点在解析giraph如何实现 aggregators 。 基本原理 :在每个超级步中,每个worker计算
本人原创,转载请注明出处!欢迎大家加入giraph 技术交流群: 228591158
giraph中aggregator的用法请参考官方文档:http://giraph.apache.org/aggregators.html ,本文重点在解析giraph如何实现aggregators。
基本原理:在每个超级步中,每个worker计算本地的聚集值。超级步计算完成后,把本地的聚集值发送给master汇总。在mastercompute()执行后,把全局的聚集值回发给所有的workers。
缺点:当某个应用(或算法)使用了多个聚集器(aggregators),master要完成所有聚集器的计算。因为master要接受、处理、发送大量的数据,无论是在计算方面还是网络通信层次,都会导致master成为系统瓶颈。
改进:采用分片聚集 (sharded aggregators) . 在每个超级步的最后,每个聚集器被派发给一个worker,该worker接受和聚集其他workers发送给该聚集器的值。然后workers把自己的所有的聚集器发送给master,这样master就无需执行任何聚集,只是接收每个聚集器的最终值。在mastercompute.compute执行后,master不是直接把所有的聚集器发送给所有的workers,而是发送给聚集器所属的worker,然后每个worker再把其上的聚集器发送给所有的workers.
首先给出master worker间, worker worker间通信协议,在每个类中的dorequest(serverdata serverdata)方法中会解析并存储收到的消息。
1). org.apache.giraph.comm.requests.sendworkeraggregatorsrequest 类 . worker --> worker owner
功能:每个worker把当前超步的局部 aggregated values 发送到该aggregator的拥有者。
2). org.apache.giraph.comm.requests.sendaggregatorstomasterrequest 类. worker owner--> master
功能:每个worker把自己所拥有的aggregator的最终 aggregated values 发送给 master。
3). org.apache.giraph.comm.requests.sendaggregatorstoownerrequest 类. master --> worker owner.
功能:master把最终的 aggregated values 或aggregators 发送给该aggregator的拥有者。
4). org.apache.giraph.comm.requests.sendaggregatorstoworkerrequest 类。 worker owner--> worker
功能: 发送最终的 aggregated values 到 其他workers。发送者为该aggregator的拥有者,接受者为除发送者之外的所有workers。
idc49rosvlayvu1hc3rlcknvbxb1dguuy29tchv0zsgpt723qnbqu/g1w7xevtu8r8b3jimymdu0mdu++c6qxusz9cq8jimymdu0mds8l3n0cm9uzz6ho7tttdoxupazrly2sr2/qsq8o6xnyxn0zxjdb21wdxrllmnvbxb1dguokbe9t6iyxbvxtcpby8v509bwzxj0zxguy29tchv0zsgp1nq12jc49rosvlayvb7bvk+1xcyjmja1nda7oam8l3a+cjxwpjeuiltttdowupazrly2sr2/qsq8o6xcc3btzxj2awnltwfzdgvytfftw01hc3rlckfnz3jlz2f0b3jiyw5kbgvywoc1xgzpbmlzafn1cgvyu3rlcchnyxn0zxjdbgllbnqgbwfzdgvyq2xpzw50ksc3vbeosng+27yvxvffybeiuphxb3jrzxkjrl7bvk/g97xedmfsdwxoqsnp0ru49rosvlayvbxeyku+1r7bvk8miziwntqwo6oozmluywwgywdncmvnyxrlzcb2ywx1zxojqaostdrsu7tozqqz9cq8jimymdu0mduho8/iupiz9k1hc3rlckfnz3jlz2f0b3jiyw5kbgvytcta4lzms9c52m+1o6zi58/co7o8l3a+cjxwpjxpbwcgc3jjpq==http://www.2cto.com/uploadfile/collfiles/20140523/201405230851307.jpg alt=\>
finishsuperstep(masterclient masterclient) 方法核心内容如下:
喎?http://www.2cto.com/kf/ware/vc/ target=_blank class=keylink>vcd4kphbyzsbjbgfzcz0=brush:sql;> /** * finalize aggregators for current superstep and share them with workers */ public void finishsuperstep(masterclient masterclient) { for (aggregatorwrapper aggregator : aggregatormap.values()) { if (aggregator.ischanged()) { // if master compute changed the value, use the one he chose aggregator.setpreviousaggregatedvalue( aggregator.getcurrentaggregatedvalue()); // reset aggregator for the next superstep aggregator.resetcurrentaggregator(); } } /** * 把聚集器发送给所属的worker。发送内容: * 1). name of the aggregator * 2). class of the aggregator * 3). value of the aggretator */ try { for (map.entry> entry : aggregatormap.entryset()) { masterclient.sendaggregator(entry.getkey(), entry.getvalue().getaggregatorclass(), entry.getvalue().getpreviousaggregatedvalue()); } masterclient.finishsendingaggregatedvalues(); } catch (ioexception e) { throw new illegalstateexception(finishsuperstep: + ioexception occurred while sending aggregators, e); } }
问题1:如何确定aggregator的worker owner ?
答:根据aggregator的name来确定它所属的worker,计算方法如下:
/** * 根据aggregatorname和所有的workers列表来计算aggregator所属的worker * 参数aggregatorname:name of the aggregator * 参数workers: workers的list列表 * 返回值:worker which owns the aggregator */public static workerinfo getowner(string aggregatorname,list workers) { //用aggregatorname的hashcode()值模以 workers的总数目 int index = math.abs(aggregatorname.hashcode() % workers.size()); return workers.get(index); //返回aggregator所属的worker}
问题2:worker 如何判断自身是否接收完自己所拥有的aggregators?
答:master给某个worker发送aggregators时,同时发送到该worker的aggregators数目。使用的 sendaggregatorstoownerrequest类对消息进行封装和解析。
2. worker接受master发送的aggregator,worker把接收到的聚集体值发送给其他所有workers,然后每个workers就会得到上一个超级步的全局聚集值。
由前文知道,每个worker都有一个serverdata对象,serverdata类中关于aggregator的两个成员变量如下:
// 保存worker在当前超步拥有的aggregatorsprivate final owneraggregatorserverdata owneraggregator;// 保存前一个超步的aggregatorsprivate final allaggregatorserverdata allaggregatordata;
可以看到,owneraggregatordata用来存储在当前超步master发送给worker的聚集器,allaggregatordata用来保存上一个超级步全局的聚集值。owneraggregatordata和allaggregatordata值的初始化在sendaggregatorstoownerrequest 类中的dorequest(serverdata serverdata)方法中,如下:
public void dorequest(serverdata serverdata) { datainput input = getdatainput(); allaggregatorserverdata aggregatordata = serverdata.getallaggregatordata(); try { //收到的aggregators数目。在countingoutputstream类中有计数器counter, //每向输出流中添加一个聚集器对象,计数加1. 发送时,在flush方法中把该值插入到输出流最前面。 int numaggregators = input.readint(); for (int i = 0; i maxbytesperaggregatorrequest) { requestprocessor.sendaggregatedvaluestomaster( aggregatoroutput.flush()); } } requestprocessor.sendaggregatedvaluestomaster(aggregatoroutput.flush()); // wait for master to receive aggregated values before proceeding serviceworker.getworkerclient().waitallrequests(); owneraggregatordata.reset(); }
调用关系如下:
4. 大同步后,master调用masteraggregatorhandler类的preparesusperstep(masterclient)方法,收集聚集器的值。方法内容如下:
public void preparesuperstep(masterclient masterclient) { // 收集上次超级步的聚集值,为master compute 做准备 for (aggregatorwrapper aggregator : aggregatormap.values()) { // 如果是 persistent aggregator,则累加 if (aggregator.ispersistent()) { aggregator.aggregatecurrent(aggregator.getpreviousaggregatedvalue()); } aggregator.setpreviousaggregatedvalue( aggregator.getcurrentaggregatedvalue()); aggregator.resetcurrentaggregator(); progressable.progress(); } }
然后调用mastercompute.compute()方法(可能会修改聚集器的值),在该方法内若根据聚集器的值调用了mastercompute类的haltcompute()方法来终止matercompute,则表明要结束整个job。那么master就会通知所有workers要结束整个作业;在该方法内若没有调用mastercompute类的haltcompute()方法,则回到步骤1继续进行迭代。
备注:job迭代结束条件有三,满足其一就行:
1) 达到最大迭代次数
2) 没有活跃顶点且没有消息在传递
3) 终止mastercompute计算
总结:为解决在多个aggregator条件下,master成为系统瓶颈的问题。采取了把所有aggregator派发给某一部分workers,由这些workers完成全局的聚集值的计算与发送,master只需要与这些workers进行简单数据通信即可,大大降低了master的工作量。
追加:下面用图示方法说明上述执行过程。
实验条件:
1). 一个master,四个worker
2). 两个aggregators,记为a1和a2。
1. master把aggregators发送给workers,收到aggregator的worker就作为该aggregator的owner。下图中master把a1发送给worker1,a2发送给worker3.那么worker1就作为a1的owner,worker3就是a2的owner。该步骤在masteraggregatorhandler类的finishsuperstep(masterclient masterclient) 方法中完成,使用的是sendaggregatorstoownerrequest 通信协议。注:每个owner worker 可能有多个聚集器。
图1 master分发aggregator
2. workers接受master发送的aggregator,然后把aggregator发送给其他workers。worker1要把a1分别发送给worker2、worker3和worker4;worker3要把a2分别发送给worker1、worker2和worker4。该步骤在workeraggregatorhandler类的preparesuperstep( workeraggregatorrequestprocessor requestprocessor)方法中完成,使用的是sendaggregatorstomasterrequest 通信协议。此步骤完成后,每个worker上都有了聚集器a1和a2(具体为上一个超步的全局最终聚集值)。
3. 每个worker调用vertex.compute()方法开始计算,收集本地的aggregator聚集值。对聚集体a1来说,worker1、worker2、worker3、worker4的本地聚集值依次记为:
a11 、a12、 a13、a14;对聚集器a2来说,worker1、worker2、worker3、worker4的本地聚集值依次记为:
a21 、a22、 a23、a24 。计算完成后,每个worker就要把本地的聚集值发送给聚集器的owner,聚集器的owner在接受的时候会合并聚集。那么a11 、a12、 a13、a14要发送给worker1进行全局聚集得到a1’,a21 、a22、 a23、a24 要发送给worker3进行全局聚集得到a2’ 。
公式如下:
此部分采用的是sendworkeraggregatorsrequest通信协议。worker1和worker3要把汇总的a1和a2的新值:a1’ 和a2’发送给master,供下一次超级步的mastercompute.compute()方法使用采用的是sendaggregatorstomasterrequest通信协议。此部分在workeraggregatorhandler类的finishsuperstep( workeraggregatorrequestprocessor requestprocessor)方法中完成。过程如下图所示:
4. master收到worker1发送的a1’ 和woker3发送的a2’后,此步骤在masteraggregatorhandler类的preparesusperstep(masterclient)方法中完成。然后调用mastercompute.compute()方法,此方法可能会修改聚集器的值,如得到a1’’和a2’’。在mastercompute.compute()方法内若根据聚集器的值调用了mastercompute类的haltcompute()方法来终止matercompute,则表明要结束整个job。那么master就会通知所有workers要结束整个作业;在该方法内若没有调用mastercompute类的haltcompute()方法,则回到步骤1继续进行迭代,继续把a1’’发送给worker1,a2’’发送给worker3。
完!
本人原创,转载请注明出处!欢迎大家加入giraph 技术交流群: 228591158