您好,欢迎访问一九零五行业门户网

Java如何实现多线程大批量同步数据

背景:最近遇到个功能,两个月有300w+的数据,之后还在累加。因一开始该数据就全部存储在MySQL表,现需要展示在页面,还需要关联另一张表的数据,而且产品要求页面的查询条件多达20个,最终这个功能卡得要死,基本查不出来数据。
最后是打算把这两张表的数据同时存储到MongoDB中去,以提高查询效率。
一开始同步的时候,采用单线程,循环以分页的模式去同步这两张表数据,结果是…一晚上,只同步了30w数据,特慢!!!
最后,改造了一番,2小时,就成功同步了300w+数据。
以下是主要逻辑。
线程的个数请根据你自己的服务器性能酌情设置。
思路先通过count查出结果集的总条数,设置每个线程分页查询的条数,通过总条数和单次条数得到线程数量,通过改变limit的下标实现分批查询。
代码实现package com.github.admin.controller.loans;import com.baomidou.mybatisplus.mapper.entitywrapper;import com.github.admin.model.entity.casecheckcallrecord;import com.github.admin.model.entity.duyan.duyancallrecorddetail;import com.github.admin.model.entity.loans.casecallremarkrecord;import com.github.admin.service.duyan.duyancallrecorddetailservice;import com.github.admin.service.loans.casecallremarkrecordservice;import com.github.common.constant.mongodbconstant;import com.github.common.util.dingdingmsgsendutils;import com.github.common.util.listutils;import com.github.common.util.response;import com.github.common.util.concurrent.executors;import lombok.extern.slf4j.slf4j;import org.apache.commons.collections.collectionutils;import org.springframework.beans.beanutils;import org.springframework.beans.factory.disposablebean;import org.springframework.beans.factory.annotation.autowired;import org.springframework.beans.factory.annotation.value;import org.springframework.data.mongodb.core.mongotemplate;import org.springframework.data.mongodb.core.query.criteria;import org.springframework.web.bind.annotation.getmapping;import org.springframework.web.bind.annotation.requestmapping;import org.springframework.web.bind.annotation.restcontroller;import java.util.arraylist;import java.util.list;import java.util.map;import java.util.concurrent.callable;import java.util.concurrent.executorservice;import java.util.concurrent.future;/** * 多线程同步历史数据 * @author songfayuan * @date 2019-09-26 15:38 */@slf4j@restcontroller@requestmapping("/demo")public class synchronizehistoricaldatacontroller implements disposablebean { private executorservice executor = executors.newfixedthreadpool(10, "synchronizehistoricaldatacontroller"); //newfixedthreadpool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。 @value("${spring.profiles.active}") private string profile; @autowired private duyancallrecorddetailservice duyancallrecorddetailservice; @autowired private mongotemplate mongotemplate; @autowired private 
casecallremarkrecordservice casecallremarkrecordservice; /** * 多线程同步通话记录历史数据 * @param params * @return * @throws exception */ @getmapping("/synchistorydata") public response synchistorydata(map<string, object> params) throws exception { executor.execute(new runnable() { @override public void run() { try { logichandler(params); } catch (exception e) { log.warn("多线程同步稽查通话记录历史数据才处理异常,errmsg = {}", e); dingdingmsgsendutils.senddingdinggroupmsg("【系统消息】" + profile + "环境,多线程同步稽查通话记录历史数据才处理异常,errmsg = "+e); } } }); return response.success("请求成功"); } /** * 处理数据逻辑 * @param params * @throws exception */ private void logichandler(map<string, object> params) throws exception { /******返回结果:多线程处理完的最终数据******/ list<duyancallrecorddetail> result = new arraylist<>(); /******查询数据库总的数据条数******/ int count = this.duyancallrecorddetailservice.selectcount(new entitywrapper<duyancallrecorddetail>() .eq("is_delete", 0) .eq("platform_type", 1)); dingdingmsgsendutils.senddingdinggroupmsg("【系统消息】" + profile + "环境,本次需要同步" + count + "条历史稽查通话记录数据。");// int count = 2620266; /******限制每次查询的条数******/ int num = 1000; /******计算需要查询的次数******/ int times = count / num; if (count % num != 0) { times = times + 1; } /******每个线程开始查询的行数******/ int offset = 0; /******添加任务******/ list<callable<list<duyancallrecorddetail>>> tasks = new arraylist<>(); for (int i = 0; i < times; i++) { callable<list<duyancallrecorddetail>> qfe = new thredquery(duyancallrecorddetailservice, params, offset, num); tasks.add(qfe); offset = offset + num; } /******为避免太多任务的最终数据全部存在list导致内存溢出,故将任务再次拆分单独处理******/ list<list<callable<list<duyancallrecorddetail>>>> smalllist = listutils.partition(tasks, 10); for (list<callable<list<duyancallrecorddetail>>> callablelist : smalllist) { if (collectionutils.isnotempty(callablelist)) {// executor.execute(new runnable() {// @override// public void run() {// log.info("任务拆分执行开始:线程{}拆分处理开始...", thread.currentthread().getname());//// log.info("任务拆分执行结束:线程{}拆分处理开始...", 
thread.currentthread().getname());// }// }); try { list<future<list<duyancallrecorddetail>>> futures = executor.invokeall(callablelist); /******处理线程返回结果******/ if (futures != null && futures.size() > 0) { for (future<list<duyancallrecorddetail>> future : futures) { list<duyancallrecorddetail> duyancallrecorddetaillist = future.get(); if (collectionutils.isnotempty(duyancallrecorddetaillist)){ executor.execute(new runnable() { @override public void run() { /******异步存储******/ log.info("异步存储mongodb开始:线程{}拆分处理开始...", thread.currentthread().getname()); savemongodb(duyancallrecorddetaillist); log.info("异步存储mongodb结束:线程{}拆分处理开始...", thread.currentthread().getname()); } }); } //result.addall(future.get()); } } } catch (exception e) { log.warn("任务拆分执行异常,errmsg = {}", e); dingdingmsgsendutils.senddingdinggroupmsg("【系统消息】" + profile + "环境,任务拆分执行异常,errmsg = "+e); } } } } /** * 数据存储mongodb * @param duyancallrecorddetaillist */ private void savemongodb(list<duyancallrecorddetail> duyancallrecorddetaillist) { for (duyancallrecorddetail duyancallrecorddetail : duyancallrecorddetaillist) { /******重复数据不同步mongodb******/ org.springframework.data.mongodb.core.query.query query = new org.springframework.data.mongodb.core.query.query(); query.addcriteria(criteria.where("calluuid").is(duyancallrecorddetail.getcalluuid())); list<casecheckcallrecord> casecheckcallrecordlist = mongotemplate.find(query, casecheckcallrecord.class, mongodbconstant.case_check_call_record); if (collectionutils.isnotempty(casecheckcallrecordlist)) { log.warn("call_uuid = {}在mongodb已经存在数据,后面数据将被舍弃...", duyancallrecorddetail.getcalluuid()); continue; } /******关联填写的记录******/ casecallremarkrecord casecallremarkrecord = this.casecallremarkrecordservice.selectone(new entitywrapper<casecallremarkrecord>() .eq("is_delete", 0) .eq("call_uuid", duyancallrecorddetail.getcalluuid())); casecheckcallrecord casecheckcallrecord = new casecheckcallrecord(); beanutils.copyproperties(duyancallrecorddetail, casecheckcallrecord); //补充 
casecheckcallrecord.setcollectoruserid(duyancallrecorddetail.getuserid()); if (casecallremarkrecord != null) { //补充 casecheckcallrecord.setcalleename(casecallremarkrecord.getcontactname()); } log.info("正在存储数据到mongodb:{}", casecheckcallrecord.tostring()); this.mongotemplate.save(casecheckcallrecord, mongodbconstant.case_check_call_record); } } @override public void destroy() throws exception { executor.shutdown(); }}class thredquery implements callable<list<duyancallrecorddetail>> { /******需要通过构造方法把对应的业务service传进来 实际用的时候把类型变为对应的类型******/ private duyancallrecorddetailservice myservice; /******查询条件 根据条件来定义该类的属性******/ private map<string, object> params; /******分页index******/ private int offset; /******数量******/ private int num; public thredquery(duyancallrecorddetailservice myservice, map<string, object> params, int offset, int num) { this.myservice = myservice; this.params = params; this.offset = offset; this.num = num; } @override public list<duyancallrecorddetail> call() throws exception { /******通过service查询得到对应结果******/ list<duyancallrecorddetail> duyancallrecorddetaillist = myservice.selectlist(new entitywrapper<duyancallrecorddetail>() .eq("is_delete", 0) .eq("platform_type", 1) .last("limit "+offset+", "+num)); return duyancallrecorddetaillist; }}
ListUtils 工具类
package com.github.common.util;import com.google.common.collect.lists;import lombok.extern.slf4j.slf4j;import java.io.*;import java.util.arraylist;import java.util.list;/** * 描述:list工具类 * @author songfayuan * 2018年7月22日下午2:23:22 */@slf4jpublic class listutils { /** * 描述:list集合深拷贝 * @param src * @return * @throws ioexception * @throws classnotfoundexception * @author songfayuan * 2018年7月22日下午2:35:23 */ public static <t> list<t> deepcopy(list<t> src) { try { bytearrayoutputstream byteout = new bytearrayoutputstream(); objectoutputstream out = new objectoutputstream(byteout); out.writeobject(src); bytearrayinputstream bytein = new bytearrayinputstream(byteout.tobytearray()); objectinputstream in = new objectinputstream(bytein); @suppresswarnings("unchecked") list<t> dest = (list<t>) in.readobject(); return dest; } catch (classnotfoundexception e) { e.printstacktrace(); return null; } catch (ioexception e) { e.printstacktrace(); return null; } } /** * 描述:对象深拷贝 * @param src * @return * @throws ioexception * @throws classnotfoundexception * @author songfayuan * 2018年12月14日 */ public static <t> t objdeepcopy(t src) { try { bytearrayoutputstream byteout = new bytearrayoutputstream(); objectoutputstream out = new objectoutputstream(byteout); out.writeobject(src); bytearrayinputstream bytein = new bytearrayinputstream(byteout.tobytearray()); objectinputstream in = new objectinputstream(bytein); @suppresswarnings("unchecked") t dest = (t) in.readobject(); return dest; } catch (classnotfoundexception e) { log.error("errmsg = {}", e); return null; } catch (ioexception e) { log.error("errmsg = {}", e); return null; } } /** * 将一个list均分成n个list,主要通过偏移量来实现的 * @author songfayuan * 2018年12月14日 */ public static <t> list<list<t>> averageassign(list<t> source, int n) { list<list<t>> result = new arraylist<list<t>>(); int remaider = source.size() % n; //(先计算出余数) int number = source.size() / n; //然后是商 int offset = 0;//偏移量 for (int i = 0; i < n; i++) { list<t> value = null; if (remaider > 
0) { value = source.sublist(i * number + offset, (i + 1) * number + offset + 1); remaider--; offset++; } else { value = source.sublist(i * number + offset, (i + 1) * number + offset); } result.add(value); } return result; } /** * list按指定长度分割 * @param list the list to return consecutive sublists of (需要分隔的list) * @param size the desired size of each sublist (the last may be smaller) (分隔的长度) * @author songfayuan * @date 2019-07-07 21:37 */ public static <t> list<list<t>> partition(list<t> list, int size){ return lists.partition(list, size); // 使用guava } /** * 测试 * @param args */ public static void main(string[] args) { list<integer> biglist = new arraylist<>(); for (int i = 0; i < 101; i++){ biglist.add(i); } log.info("biglist长度为:{}", biglist.size()); log.info("biglist为:{}", biglist); list<list<integer>> smallists = partition(biglist, 20); log.info("smallists长度为:{}", smallists.size()); for (list<integer> smallist : smallists) { log.info("拆分结果:{},长度为:{}", smallist, smallist.size()); } }}
以上就是Java如何实现多线程大批量同步数据的详细内容。
其它类似信息

推荐信息