问题基于提出的需求,我认为主要有以下两个问题:
因为有本地缓存,如何保证数据一致性。当一个节点数据改变,其他节点的数据如何失效?
数据不对,需要重新同步,缓存如何失效?
流程图接下来就是配合产品和其他开发人员画出流程图,如下:
使用一张配置表,记录是否需要缓存,是否开启缓存,来达到通知时候缓存失效的情况。
因为项目要求一般,即使消息丢失,也不会存在太大的影响,所以最终选择了 redis 里面的订阅、发布功能,实现通知其他节点失效本地缓存。
开发上面问题清楚了,流程图也清楚了。那就准备开始写 bug 了。整体思路是自定义注解实现切面,尽量降低对业务代码的耦合度。
cacheconfig主要是在代码中解释,定义一个 cachemanager 以便与业务结合。需要特别注意最大可缓存条数,以免占用程序内存过多导致内存爆满。当然也不能太小了,因为还要考虑命中率的问题。所以这就得结合实际得业务来确定最终的大小。
@bean(name = jkddcx)@primarypublic cachemanager cachemanager() { caffeinecachemanager cachemanager = new caffeinecachemanager(); cachemanager.setcaffeine(caffeine.newbuilder() // 设置最后一次写入或访问后经过固定时间过期 .expireafteraccess(expire, time_unit) //设置本地缓存写入后过期时间 .expireafterwrite(expire, time_unit) // 初始的缓存空间大小 .initialcapacity(500) // 缓存的最大条数 .maximumsize(1000));// 使用人数 * 5 (每个人不同的入参 5 条)\ return cachemanager;}
@caffeinecache自定义注解,把可以用到的参数都能加上。
@target({ elementtype.method ,elementtype.type})@retention(retentionpolicy.runtime)@documentedpublic @interface caffeinecache { public string moudleid() default ""; //用于在数据库中配置参数 public string methodid() default ""; public string cachaname() default ""; //动态切换实际的 cachemanager public string cachemanager() default "";}
cachemessagelistener缓存监听器,主要是保证多节点数据一致性的问题。当一个节点缓存更新,通知其他的节点相应处理。使用 redis 的发布订阅功能,并实现messagelistener 接口来实现主要技术。
当然下面还有个细节就是一般生产环境是禁用 redis#keys 命令的,所以得换个方式扫描对应的 key。
public class cachemessagelistener implements messagelistener { @override public void onmessage(message message, byte[] pattern) { cachemessage cachemessage = (cachemessage) redistemplate.getvalueserializer().deserialize(message.getbody()); logger.info("收到redis清除缓存消息, 开始清除本地缓存, the cachename is {}, the key is {}", cachemessage.getcachename(), cachemessage.getkey());// rediscaffeinecachemanager.clearlocal(cachemessage.getcachename(), cachemessage.getkey()); /** * 如果是一个类上使用了 注解 @caffeinecache ,那么所有接口都会缓存。 * 下面的逻辑是:除了当前模块的接口访问的入参 key,其他的 redis 缓存都会被清除 * (比如此模块的表更新了,但是当前调用此接口只是缓存了当前这个入参的redis,其他的数据删除) */ string prefixkey = redisconstant.wxymg_data_cache + cachemessage.getcachename(); set<string> keys = redistemplate.execute((rediscallback<set<string>>) connection -> { set<string> keystmp = new hashset<>(); cursor<byte[]> cursor = connection.scan(new scanoptions.scanoptionsbuilder(). match(prefixkey + "*"). count(50).build()); while (cursor.hasnext()) { keystmp.add(new string(cursor.next())); } return keystmp; }); iterator iterator = keys.iterator(); while (iterator.hasnext()) { if (iterator.next().tostring().equals(cachemessage.getkey())) { iterator.remove(); } } redistemplate.delete(keys); cacheconfig.cachemanager().getcache(cachemessage.getcachename()).clear(); //cachename 下的都删除 }}
caffeinecacheaspect然后就是切面的逻辑处理,里面的内容和 流程图 一模一样,只是使用代码实现了需求。
其中:下面的代码是 redis 发布消息。
redistemplate.convertandsend(cacheconfig.topic, new cachemessage(caffeinecache.cachaname(), rediskey));
cachemessage这是在 redis 发布消息的时候一个消息体,也是自定义的,可以加更多的参数属性
public class cachemessage implements serializable { private static final long serialversionuid = -1l; private string cachename; private object key; public cachemessage(string cachename, object key) { super(); this.cachename = cachename; this.key = key; }}
以上就是怎么使用caffeine_redis自定义二级缓存的详细内容。