您好,欢迎访问一九零五行业门户网

怎么使用SpringBoot定时任务实现数据同步

前言业务的需求是,通过中台调用api接口获得,设备数据,要求现实设备数据的同步。
方案一:通过轮询接口的方式执行 pulldata() 方法实现数据同步
该方式的原理是先清空之前的所有数据,然后重新插入通过api调用获取的最新数据。该方法的优点,逻辑简单。缺点是,频繁删除、插入数据。再调用查询数据时候,某一时刻,数据全部删除,还没及时插入的时候。数据可能有异常。
方案二:通过轮询接口的方式执行 pulldatanew() 方法实现数据同步
该方式的原理是先查询数据库,已有数据,然后和通过api调用获取的最新数据进行比对,找出数据中增量、减量和变量,进行同步更新。该方法的优点,减少对数据库的频繁操作,提升性能。缺点:无发现明显缺点。
package com.hxtx.spacedata.task; import com.alibaba.fastjson.json;import com.alibaba.fastjson.jsonarray;import com.alibaba.fastjson.jsonobject;import com.baomidou.mybatisplus.core.conditions.query.lambdaquerywrapper;import com.baomidou.mybatisplus.core.toolkit.collectionutils;import com.google.api.client.util.lists;import com.hxtx.spacedata.common.domain.responsedto;import com.hxtx.spacedata.config.springcontextutil;import com.hxtx.spacedata.controller.file.filesminiocontroller;import com.hxtx.spacedata.domain.entity.entityconfig.entitypointentity;import com.hxtx.spacedata.service.entityconfig.entitypointservice;import com.hxtx.spacedata.util.httpproxyutil;import lombok.extern.slf4j.slf4j;import org.springframework.beans.factory.annotation.autowired;import org.springframework.beans.factory.annotation.value;import org.springframework.scheduling.annotation.scheduled;import org.springframework.stereotype.component;import org.springframework.transaction.annotation.transactional; import java.util.list;import java.util.map;import java.util.objects;import java.util.stream.collectors; /** * 中台设备数据 定时任务执行 * * @author tarzan liu * @version 1.0.0 * @description * @date 2020/12/07 */@component@slf4jpublic class entitypointtask { @autowired private entitypointservice entitypointservice; @value("${middleground.server.host}") private string host; @value("${middleground.server.port}") private string port; private static filesminiocontroller filesminiocontroller = springcontextutil.getbean(filesminiocontroller.class); /** * 设备定义点数据拉取 * * @author tarzan liu * @date 2020/12/2 */ @scheduled(cron = "0/30 * * * * ?") // 30秒校验一次 public void pulldatataskbycorn() { string result = httpproxyutil.sendget("http://" + host + ":" + port + "/interface/system/list"); jsonobject jsonobject = json.parseobject(result); if (objects.nonnull(jsonobject)) { jsonarray array = jsonobject.getjsonarray("data"); if (array != null && array.size() != 0) { for (int i = 0; i < array.size(); i++) { jsonobject obj = array.getjsonobject(i); string systemid = obj.getstring("id"); pulldatanew(systemid); } } } } @transactional(rollbackfor = throwable.class) public responsedto<string> pulldata(string code) { list<entitypointentity> list = lists.newarraylist(); string result = httpproxyutil.sendget("http://" + host + ":" + port + "/interface/defintionview/listbysystemid/" + code); jsonobject jsonobject = json.parseobject(result); if (objects.nonnull(jsonobject)) { jsonarray array = jsonobject.getjsonarray("data"); if (array != null && array.size() != 0) { for (int i = 0; i < array.size(); i++) { jsonobject obj = array.getjsonobject(i); string pointid = obj.getstring("pointid"); string name = obj.getstring("name"); list.add(entitypointentity.builder().pointid(pointid).name(name).code(code).build()); } list<entitypointentity> existlist = entitypointservice.list(new lambdaquerywrapper<entitypointentity>().eq(entitypointentity::getcode, code).isnotnull(entitypointentity::getvalue)); if (collectionutils.isnotempty(existlist)) { map<string, string> existmap = existlist.stream().collect(collectors.tomap(entitypointentity::getpointid, entitypointentity::getvalue)); list.foreach(e -> { string value = existmap.get(e.getpointid()); if (value != null) { e.setvalue(value); } }); } entitypointservice.remove(new lambdaquerywrapper<entitypointentity>().eq(entitypointentity::getcode, code)); entitypointservice.savebatch(list); } } return responsedto.succ(); } @transactional(rollbackfor = throwable.class) public responsedto<string> pulldatanew(string code) { string result = httpproxyutil.sendget("http://" + host + ":" + port + "/interface/defintionview/listbysystemid/" + code); jsonobject jsonobject = json.parseobject(result); if (objects.nonnull(jsonobject)) { jsonarray data = jsonobject.getjsonarray("data"); list<entitypointentity> list = data.tojavalist(entitypointentity.class); if (collectionutils.isnotempty(list)) { list.foreach(e -> e.setcode(code)); list<entitypointentity> existlist = entitypointservice.list(new lambdaquerywrapper<entitypointentity>().eq(entitypointentity::getcode, code)); if (collectionutils.isnotempty(existlist)) { //存在map map<string, string> existmap = existlist.stream().collect(collectors.tomap(entitypointentity::getpointid, entitypointentity::getname)); //传输map map<string, string> datamap = list.stream().collect(collectors.tomap(entitypointentity::getpointid, entitypointentity::getname)); //增量 list<entitypointentity> increment = list.stream().filter(e -> existmap.get(e.getpointid()) == null).collect(collectors.tolist()); if (collectionutils.isnotempty(increment)) { entitypointservice.savebatch(increment); } //减量 list<entitypointentity> decrement = existlist.stream().filter(e -> datamap.get(e.getpointid()) == null).collect(collectors.tolist()); if (collectionutils.isnotempty(decrement)) { entitypointservice.removebyids(decrement.stream().map(entitypointentity::getid).collect(collectors.tolist())); } //变量 list<entitypointentity> variable = existlist.stream().filter(e -> datamap.get(e.getpointid()) != null && !datamap.get(e.getpointid()).equals(e.getname())).collect(collectors.tolist()); if (collectionutils.isnotempty(variable)) { variable.foreach(e -> { e.setname(datamap.get(e.getpointid())); }); entitypointservice.updatebatchbyid(variable); } } else { entitypointservice.savebatch(list); } } } return responsedto.succ(); } }
数据库对应实体类
import com.baomidou.mybatisplus.annotation.idtype;import com.baomidou.mybatisplus.annotation.tableid;import com.baomidou.mybatisplus.annotation.tablename;import lombok.allargsconstructor;import lombok.builder;import lombok.data;import lombok.noargsconstructor; import java.io.serializable;import java.util.date; @builder@noargsconstructor@allargsconstructor@data@tablename(value = "t_entity_point")public class entitypointentity implements serializable { private static final long serialversionuid = 2181036545424452651l; /** * 定义点id */ @tableid(value = "id", type = idtype.assign_id) private long id; /** * 定义点id */ private string pointid; /** * 名称 */ private string name; /** * 绘制数据 */ private string value; /** * 编码 */ private string code; /** * 创建时间 */ private date createtime; }
http请求代理工具类
import lombok.extern.slf4j.slf4j;import org.apache.http.consts;import org.apache.http.httpentity;import org.apache.http.httpstatus;import org.apache.http.namevaluepair;import org.apache.http.client.config.requestconfig;import org.apache.http.client.entity.urlencodedformentity;import org.apache.http.client.methods.closeablehttpresponse;import org.apache.http.client.methods.httppost;import org.apache.http.conn.ssl.sslconnectionsocketfactory;import org.apache.http.conn.ssl.truststrategy;import org.apache.http.impl.client.closeablehttpclient;import org.apache.http.impl.client.httpclients;import org.apache.http.message.basicnamevaluepair;import org.apache.http.ssl.sslcontextbuilder;import org.apache.http.util.entityutils; import javax.net.ssl.sslcontext;import java.io.bufferedreader;import java.io.inputstreamreader;import java.io.printwriter;import java.net.url;import java.net.urlconnection;import java.security.cert.certificateexception;import java.security.cert.x509certificate;import java.util.arraylist;import java.util.list;import java.util.map; /** * http请求代理类 * * @author tarzan liu * @description 发送get post请求 */@slf4jpublic class httpproxyutil { /** * 使用urlconnection进行get请求 * * @param api_url * @return */ public static string sendget(string api_url) { return sendget(api_url, "", "utf-8"); } /** * 使用urlconnection进行get请求 * * @param api_url * @param param * @return */ public static string sendget(string api_url, string param) { return sendget(api_url, param, "utf-8"); } /** * 使用urlconnection进行get请求 * * @param api_url 请求路径 * @param param 请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值, 可以为空 * @param charset 字符集 * @return */ public static string sendget(string api_url, string param, string charset) { stringbuffer buffer = new stringbuffer(); try { // 判断有无参数,若是拼接好的url,就不必再拼接了 if (param != null && !"".equals(param)) { api_url = api_url + "?" + param; } log.info("请求的路径是:" + api_url); url realurl = new url(api_url); // 打开联接 urlconnection conn = realurl.openconnection(); // 设置通用的请求属性 conn.setrequestproperty("accept", "*/*"); conn.setrequestproperty("connection", "keep-alive"); conn.setrequestproperty("user-agent", "mozilla/4.0(compatible; msie 6.0; windows nt 5.1; sv1)"); conn.setconnecttimeout(12000); //设置连接主机超时(单位:毫秒) conn.setreadtimeout(12000); // 设置从主机读取数据超时(单位:毫秒) conn.connect(); // 建立实际的联接 // 定义 bufferedreader输入流来读取url的相应 try (bufferedreader in = new bufferedreader(new inputstreamreader(conn.getinputstream(), charset))) { string line; while ((line = in.readline()) != null) {// buffer.append("\n"+line); buffer.append(line); } } } catch (exception e) { log.error("发送get请求出现异常! " + e.getmessage()); return null; } // log.info("响应返回数据:" + buffer.tostring()); return buffer.tostring(); } /** * 使用urlconnection进行post请求 * * @param api_url 请求路径 * @param param 请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值,最好不为空 * @return */ public static string sendpost(string api_url, string param) { return sendpost(api_url, param, "utf-8"); } /** * 使用urlconnection进行post请求 * * @param api_url 请求路径 * @param param 请求格式有name1=value1&name2=value2、json、xml、map或其他形式,具体要看接收方的取值,最好不为空 * @param charset 字符集 * @return */ public static string sendpost(string api_url, string param, string charset) { stringbuffer buffer = new stringbuffer(); try { log.info("请求的路径是:" + api_url + ",参数是:" + param); url realurl = new url(api_url); // 打开联接 urlconnection conn = realurl.openconnection(); // 设置通用的请求属性 conn.setrequestproperty("accept", "*/*"); conn.setrequestproperty("connection", "keep-alive"); conn.setrequestproperty("user-agent", "mozilla/4.0(compatible; msie 6.0; windows nt 5.1; sv1)"); conn.setconnecttimeout(12000); //设置连接主机超时(单位:毫秒) conn.setreadtimeout(12000); // 设置从主机读取数据超时(单位:毫秒) // 发送post请求必须设置如下两行 conn.setdooutput(true); conn.setdoinput(true); // 获取urlconnection对象对应的输出流 try (printwriter out = new printwriter(conn.getoutputstream())) { out.print(param); // 发送请求参数 out.flush();// flush输出流的缓冲 } // 定义 bufferedreader输入流来读取url的相应,得指明使用utf-8编码,否则到api服务器xml的中文不能被成功识别 try (bufferedreader in = new bufferedreader(new inputstreamreader(conn.getinputstream(), charset))) { string line; while ((line = in.readline()) != null) {// buffer.append("\n"+line); buffer.append(line); } } } catch (exception e) { log.error("发送post请求出现异常! " + e.getmessage()); e.printstacktrace(); } log.info("响应返回数据:" + buffer.tostring()); return buffer.tostring(); } public static closeablehttpclient createsslclientdefault() throws exception { sslcontext sslcontext = new sslcontextbuilder().loadtrustmaterial(null, new alltruststrategy()).build(); sslconnectionsocketfactory sslsf = new sslconnectionsocketfactory(sslcontext); return httpclients.custom().setsslsocketfactory(sslsf).build(); } // 加载证书 private static class alltruststrategy implements truststrategy { public boolean istrusted(x509certificate[] x509certificates, string s) throws certificateexception { return true; } } /** * 支持https请求 * * @param url * @param param * @return * @throws exception */ public static string sendhttpclientpost(string url, map<string, string> param) throws exception { closeablehttpclient httpclient = createsslclientdefault(); httppost httppost = null; closeablehttpresponse response = null; string result = ""; try { // 发起http的post请求 httppost = new httppost(url); list<namevaluepair> paramlist = new arraylist<namevaluepair>(); for (string key : param.keyset()) { paramlist.add(new basicnamevaluepair(key, param.get(key))); } log.info("http请求地址:" + url + ",参数:" + paramlist.tostring()); // utf8+url编码 httppost.setentity(new urlencodedformentity(paramlist, consts.utf_8)); httppost.setconfig(requestconfig.custom().setconnecttimeout(30000).setsockettimeout(30000).build()); response = httpclient.execute(httppost); httpentity entity = response.getentity(); int statuscode = response.getstatusline().getstatuscode(); if (httpstatus.sc_ok == statuscode) { // 如果响应码是200 } result = entityutils.tostring(entity); log.info("状态码:" + statuscode + ",响应信息:" + result); } finally { if (response != null) { response.close(); } if (httppost != null) { httppost.releaseconnection(); } httpclient.close(); } return result; }}
以上就是怎么使用springboot定时任务实现数据同步的详细内容。
其它类似信息

推荐信息