您好,欢迎访问一九零五行业门户网

怎么使用Java多线程实现第三方数据同步

一、场景最近的一项开发任务是同步第三方数据,而第三方数据一般有存量数据和增量数据,存量数据有100w+。在得知此需求时,进行了一定的信息检索和工具学习,提前获取存量数据到目标库,再使用kettle进行存量数据转换;增量数据则根据业务方规定的请求时间,通过定时任务去获取增量数据并进行数据转换。在数据获取和转换时,我们应该要记录每一次的请求信息,便于溯源和数据对账!!!
二、获取数据的方式2.1 递归方式
使用递归方式时,要求数据量少,否则会出现栈溢出或堆溢出!!!并且递归方式是单线程,所以会导致同步速度很慢!!!
/** * 数据同步 - 递归方式 * 此处存量数据只需要请求到数据并保存数据库即可,后期通过kettle进行转换。 * data为自定义实体类,这里仅做示例!!!*/ private void fetchandsavedb(int pageindex, int pagesize) throws exception { log.info("【数据同步 - 存量】,第{}次同步,", pageindex); list<data> datas= getdatabypage(pageindex,pagesize); if (collectionutils.isnotempty(datas)) { dataservice.saveorupdatebatch(datas); log.info("【数据同步 - 存量】,第{}次同步,同步成功", pageindex); if (datas.size() < pagesize) { log.info("【数据同步 - 存量】,第{}次同步,获取数据小于每页获取条数,证明已全部同步完毕!!!", pageindex); return; } // 递归操作-直到数据同步完毕 fetchandsavedb(pageindex + 1, pagesize); } else { log.info("【数据同步 - 存量】,第{}次同步,获取数据为空,证明已全部同步完毕!!!", pageindex); return; } } /** * 获取分页数据,data为自定义实体类,这里仅做示例!!! */ private list<data> getdatabypage(int pageindex, int pagesize) throws exception { //通过feign调用第三方接口获取数据 string data = datafeignservice.fetchalldata(pagesize, pageindex); jsonobject jsonobject = jsonobject.parseobject(data); jsonarray datalist = jsonobject.getjsonarray("datalist"); list<data> datas = datalist.tojavalist(data.class); return datas; }
2.2 多线程方式
由于递归方式是单线程,考虑到数据的庞大,且易造成内存溢出,因此将递归更换成多线程方式,不仅避免了内存溢出的情况,且速度大大的提升!!!
public void synalldata() { // 定义原子变量 - 页数 atomicinteger pageindex = new atomicinteger(0); // 创建线程池 executorservice fixedthreadpool = executors.newfixedthreadpool(10); // 100万数据 int total = 1000000;//数据总量 int times = total / 1000; if (total % 1000!= 0) { times = times + 1; } localdatetime beginlocaldatetime = localdatetime.now(); log.info("【数据同步 - 存量】开始同步时间:{}", beginlocaldatetime.format(datetimeformatter.ofpattern("yyyy-mm-dd hh:mm:ss"))); for (int index = 1; index <= times; index++) { fixedthreadpool.submit(new runnable() { @override public void run() { try { multifetchandsavedb(pageindex.incrementandget(), 1000); } catch (exception e) { log.error("并发获取并保存数据异常:{}", e); } } }); } localdatetime endlocaldatetime = localdatetime.now(); log.info("【数据同步 - 存量】同步结束时间:{},总共耗时:{}分钟", endlocaldatetime.format(datetimeformatter.ofpattern("yyyy-mm-dd hh:mm:ss")), duration.between(beginlocaldatetime, endlocaldatetime).tominutes()); } /** * 数据同步 - 【多线程方式】 * * @throws exception */ private void multifetchandsavedb(int pageindex, int pagesize) throws exception { log.info("【数据同步 - 存量】,第{}次同步,", pageindex); list<data> datas= getdatabypage(pageindex, pagesize);//getdatabypage()同上2.1 if (collectionutils.isnotempty(datas)) { log.info("【数据同步 - 存量】,第{}次同步,同步成功", pageindex); if (datas.size() < pagesize) { log.info("【数据同步 - 存量】,第{}次同步,获取数据小于每页获取条数,证明已全部同步完毕!!!", pageindex); return; } } else { log.info("【数据同步 - 存量】,第{}次同步,获取数据为空,证明已全部同步完毕!!!", pageindex); return; } }
三、增量数据如何对接增量数据需要写定时任务,可使用scheduled注解,并需要将增量数据存放到目标库中且进行数据转换!
以上就是怎么使用java多线程实现第三方数据同步的详细内容。
其它类似信息

推荐信息