基于spring4.0配置分布式ehcache,以及相关使用

时间:2023-03-09 14:55:50
基于spring4.0配置分布式ehcache,以及相关使用

说明:本文是基于RMI手动同步的方式,使用程序动态注入配置缓存,抛弃传统的ehcache.xml配置方式

1,注入cacheManager管理所有缓存,添加各个缓存名及相关参数配置:

思路大致是:

在项目的主配置类中,通过@Bean注入cacheManager,统一管理所有cache,传统的用xml方式配置太累赘,其实只需要更该chacheName,其他参数变化不是很大,考虑用程序封装做到最大程度的代码重用所以采取拼接JSON串的形式,配置缓存名,及所有相关需要设置好的参数。然后通过解析JSON,拼接成带主机IP的RmiUrl,以manual手动rmi同步的方式配置peerDiscovery成员发现,再通过配置cacheManagerPeerListenerFactory这个类启动本机监听程序,来发现其他主机发来的同步请求。最后再用@EnableCaching注解开启spring支持对ehcache注解使用

下面是项目配置好的详细代码:

 package com.cshtong.tower.web.init;

 import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties; import javax.annotation.Resource; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.ehcache.EhCacheCacheManager;
import org.springframework.cache.interceptor.KeyGenerator;
import org.springframework.cache.interceptor.SimpleKeyGenerator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.annotation.EnableTransactionManagement; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.cshtong.tower.service.UserService;
import com.cshtong.tower.util.StringUtil; import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheException;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.config.CacheConfiguration;
import net.sf.ehcache.config.DiskStoreConfiguration;
import net.sf.ehcache.config.FactoryConfiguration;
import net.sf.ehcache.distribution.RMICacheManagerPeerListenerFactory;
import net.sf.ehcache.distribution.RMICacheManagerPeerProviderFactory;
import net.sf.ehcache.distribution.RMICacheReplicatorFactory;
import net.sf.ehcache.store.MemoryStoreEvictionPolicy; @EnableCaching
@Configuration
@EnableTransactionManagement
public class EhcacheConfig extends RMICacheReplicatorFactory{ private static Logger logger = LoggerFactory.getLogger(EhcacheConfig.class);
private static final String EHCACHE_PROPERTIES = "ehcache.properties"; /** 最大缓存数量 0 = no limit. **/
private static final Integer MAX_CACHE = 0;
/** 缓存失效时间间隔/秒 **/
private static final Integer TIME_TOLIVE_SECONDS = 24*60*60;
/** 缓存闲置多少秒后自动销毁 **/
private static final Integer TIME_TOIDLE_SECONDS = 60*60;
/** 磁盘失效线程运行时间间隔/秒。 **/
private static final Integer DISK_EXPIRY_Thread_INTERVAL_SENCONDS = 100; @Resource
UserService userService; /** 注入cacheManager **/
@Bean
public org.springframework.cache.CacheManager cacheManager() {
org.springframework.cache.CacheManager cm = new EhCacheCacheManager(putCache());
return cm;
} @Bean
public KeyGenerator keyGenerator() {
return new SimpleKeyGenerator();
} @Bean
public CacheManager putCache(){ String rmiUrls = initRmiURLs(); net.sf.ehcache.config.Configuration cf = new net.sf.ehcache.config.Configuration();
Properties pro = initRmiUrlsProperties();
if (null!=rmiUrls) {
/** 临时文件目录 **/
cf.diskStore(new DiskStoreConfiguration().path("java.io.tmpdir"))
/**成员发现 peerDiscovery 方式:manual:手动,rmiUrls:主机名+端口号+缓存名,用来接受或者发送信息的接口,不同的缓存或者机器用“|”隔开 **/
.cacheManagerPeerProviderFactory(new FactoryConfiguration<FactoryConfiguration<?>>()
.className(RMICacheManagerPeerProviderFactory.class.getName())
.properties("peerDiscovery=manual,rmiUrls=" + rmiUrls)
);
/** 本机监听程序,来发现其他主机发来的同步请求 hostName=192.168.1.112 这里默认是本机可以不配置 **/
cf.cacheManagerPeerListenerFactory(new FactoryConfiguration<FactoryConfiguration<?>>()
.className(RMICacheManagerPeerListenerFactory.class.getName())
.properties("port="+ pro.getProperty("rmiPortNumber") +",socketTimeoutMillis=2000")
);
}else{
logger.debug("The rmiUrls is null!!");
} CacheManager cm = null; try {
cm = CacheManager.create(cf);
} catch (UncheckedIOException e) {
logger.debug("Fail to load config. message:{}", e.getMessage());
} List<Cache> array = null;
try {
array = cacheCollection();
} catch (CacheException e) {
logger.error("collect cache Failed");
} for (Cache list:array) {
cm.addCache(list);
} return cm;
} public static List<Cache> cacheCollection(){
String listParameters = cacheParametersCollection();
return cacheConf(listParameters);
} /**
* 添加缓存的参数
* 相关属性:
name : "缓存名称,cache的唯一标识(ehcache会把这个cache放到HashMap里)
maxElementsInMemory : 缓存最大个数,0没有限制。
eternal="false" : 对象是否永久有效,一但设置了,timeout将不起作用。 (必须设置)
maxEntriesLocalHeap="1000" : 堆内存中最大缓存对象数,0没有限制(必须设置)
maxEntriesLocalDisk= "1000" : 硬盘最大缓存个数。
overflowToDisk="false" : 当缓存达到maxElementsInMemory值是,是否允许溢出到磁盘(必须设设置)(内存不足时,是否启用磁盘缓存。)
diskSpoolBufferSizeMB : 这个参数设置DiskStore(磁盘缓存)的缓存区大小。默认是30MB。每个Cache都应该有自己的一个缓冲区。
diskPersistent="false" : 磁盘缓存在JVM重新启动时是否保持(默认为false)硬盘持久化
timeToIdleSeconds="0" : 导致元素过期的访问间隔(秒为单位),即当缓存闲置n秒后销毁。 当eternal为false时,这个属性才有效,0表示可以永远空闲,默认为0
timeToLiveSeconds="0" : 元素在缓存里存在的时间(秒为单位),即当缓存存活n秒后销毁. 0 表示永远存在不过期
memoryStoreEvictionPolicy="LFU" : 当达到maxElementsInMemory时,如何强制进行驱逐默认使用"最近使用(LRU)"策略,其它还有先入先出FIFO,最少使用LFU,较少使用LRU
diskExpiryThreadIntervalSeconds :磁盘失效线程运行时间间隔,默认是120秒。
clearOnFlush : 内存数量最大时是否清除。 cacheEventListenerFactory : 给缓存添加监听
replicateAsynchronously=true : 异步的方式
replicatePuts=true,replicateUpdates=true,replicateUpdatesViaCopy=true,replicateRemovals= true : 在put,update,copy,remove操作是否复制
cationIntervalMillis=1000 : 同步时间1s
bootstrapCacheLoaderFactory 启动是指一启动就同步数据
* @return
*/
public static String cacheParametersCollection(){ String listParameters = "[" ; /** 排班:列表详情 **/
listParameters += "{'cacheName':'schedule_listPatrol','maxEntriesLocalHeap':'0'}"; /** APP:用户当月的排班 **/
listParameters += "{'cacheName':'owner_schedule','maxEntriesLocalHeap':'0'}"; /**考勤统计 :echarts图表相关数据**/
listParameters += "{'cacheName':'attendance_findByOrgIds','maxEntriesLocalHeap':'0'}";
/**考勤统计 :主界面表格相关的数据**/
listParameters += "{'cacheName':'attendance_findByOrgIdsAndPage','maxEntriesLocalHeap':'0','timeToIdleSeconds':'0'}"; /** 机构 **/
listParameters += "{'cacheName':'org_findAll','maxEntriesLocalHeap':'0'}"; /** 用户信息 key=userId **/
listParameters += "{'cacheName':'list_userInfo','maxEntriesLocalHeap':'0'}"; /** APP 勤务圈 **/
listParameters += "{'cacheName':'app_message','maxElementsInMemory':'0','maxEntriesLocalHeap':'0'}"; /** 报警,最近1km内的用户 **/
listParameters += "{'cacheName':'rescue_users','maxElementsInMemory':'0','maxEntriesLocalHeap':'0'}"; /** 报警,最近1km内的用户 **/
listParameters += "{'cacheName':'app_message_typeName','maxElementsInMemory':'0','maxEntriesLocalHeap':'0'}"; listParameters += "]"; return listParameters;
} /**
* 添加成员发现: //主机ip+端口号/
* @return
*/
public static List<String> cacheManagerPeerProviderCollection(){
Properties pro = initRmiUrlsProperties(); List<String> ip = new ArrayList<>();
try {
ip.add(pro.get("machine1").toString());
ip.add(pro.get("machine2").toString());
} catch (Exception e) {
logger.error("Fail to provider cacheManagerPeer. config file [{}], message:{}", EHCACHE_PROPERTIES, e.getMessage());
} InetAddress ia;
try {
ia = InetAddress.getLocalHost();
String localip = ia.getHostAddress();
for (int i = 0; i < ip.size(); i++) {
/** 过滤本机ip **/
if (localip.equalsIgnoreCase(ip.get(i))) {
ip.remove(i);
}
}
} catch (UnknownHostException Host) {
Host.printStackTrace();
logger.error("Unknown to host Address. config file [{}], message:{}", EHCACHE_PROPERTIES, Host.getMessage());
} List<String> peer = new ArrayList<>();
for (int j = 0; j < ip.size(); j++) {
peer.add("//" + ip.get(j) + ":" + pro.getProperty("rmiPortNumber").toString());
} return peer;
} public static String initRmiURLs(){
String rmiUrls = "";
String listParameters = cacheParametersCollection();
JSONArray array = initCacheParameters(listParameters);
for (Iterator<Object> iterator = array.iterator(); iterator.hasNext();) {
JSONObject obj = (JSONObject)iterator.next();
String cacheName = obj.get("cacheName").toString();
List<String> peer = cacheManagerPeerProviderCollection();
for (String list:peer) {
rmiUrls += list + cacheName + "|";
}
}
if (!"".equals(rmiUrls)) {
rmiUrls = rmiUrls.substring(0,rmiUrls.length()-1);
}
return rmiUrls;
} public static JSONArray initCacheParameters(String listParameters){
JSONArray array = null;
try {
array = JSONArray.parseArray(listParameters);
} catch (Exception e) {
logger.error("Fail to init The cache parameters. message:{}", e.getMessage());
}
return array;
} public static Properties initRmiUrlsProperties(){
InputStream resourcesStream = EhcacheConfig.class.getClassLoader().getResourceAsStream(EHCACHE_PROPERTIES);
Properties pro = new Properties();
try {
pro.load(resourcesStream);
} catch (IOException e) {
logger.error("Fail to load config. config file [{}], message:{}", EHCACHE_PROPERTIES, e.getMessage());
}
return pro;
} /**
* @param listPatrolParameters 缓存参数JSON数组
* @return 缓存的集合
*/
@SuppressWarnings("deprecation")
public static List<Cache> cacheConf(String listParameters){ List<Cache> listCache = new ArrayList<>();
JSONArray array = initCacheParameters(listParameters);
for (Iterator<Object> iterator = array.iterator(); iterator.hasNext();) {
JSONObject obj = (JSONObject)iterator.next(); String cacheName = obj.get("cacheName").toString();
String maxElementsInMemory = obj.getString("maxElementsInMemory");
String maxEntriesLocalHeap = obj.getString("maxEntriesLocalHeap");
String timeToLiveSeconds = obj.getString("timeToLiveSeconds");
String timeToIdleSeconds = obj.getString("timeToIdleSeconds"); RMICacheReplicatorFactory rmi = new RMICacheReplicatorFactory();
Properties pro = initRmiUrlsProperties();
rmi.createCacheEventListener(pro); CacheConfiguration cacheConfiguration = new CacheConfiguration(cacheName,StringUtil.isNull(maxEntriesLocalHeap)?MAX_CACHE:Integer.parseInt(maxEntriesLocalHeap))
.memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LFU)
.maxElementsInMemory(StringUtil.isNull(maxElementsInMemory)?MAX_CACHE:Integer.parseInt(maxElementsInMemory))
.overflowToDisk(true)
.eternal(false)
.timeToLiveSeconds(StringUtil.isNull(timeToLiveSeconds)?TIME_TOLIVE_SECONDS:Integer.parseInt(timeToLiveSeconds))
.timeToIdleSeconds(StringUtil.isNull(timeToIdleSeconds)?TIME_TOIDLE_SECONDS:Integer.parseInt(timeToIdleSeconds))
.diskPersistent(false)
.diskExpiryThreadIntervalSeconds(DISK_EXPIRY_Thread_INTERVAL_SENCONDS)
.clearOnFlush(true)
.cacheEventListenerFactory(new CacheConfiguration.CacheEventListenerFactoryConfiguration().className(RMICacheReplicatorFactory.class.getName()));
Cache cache = new Cache(cacheConfiguration); listCache.add(cache);
} return listCache;
}
}

properties文件:

 #RMICacheReplicatorFactory properties
replicateAsynchronously=true
replicatePuts=true
replicatePutsViaCopy=false
replicateUpdates=true
replicateUpdatesViaCopy=false
replicateRemovals=true
asynchronousReplicationIntervalMillis=1000
asynchronousReplicationMaximumBatchSize=1000 #RMI URLs
machine1=//主机ip+端口号/ #RMI port
rmiPortNumber=8010

到这里就注入好了缓存名为listParameters里面cacheName的所有缓存,如果后续添加或修改缓存,只需更改listParameters的相关属性,如果在集群环境,也只需在porperties文件中添加machine..配置即可。

2,基于注解方式的缓存使用:

1,为方便重用所有缓存建议在service层使用,当方法第一次执行时将返回值以key-value对象写进缓存,之后在执行该方法时,如果缓存的condition满足则直接取缓存返回,实际上方法都不会进!

2,在修改或添加方法使用@CachePut,查询方法使用@Cacheable,删除方法使用@CacheEvict,注意:缓存一定要配合使用,例如一个list方法将相应值缓存起来,如果有针对该值CUD操作时,一定要将新的返回值@CachePut,否则会出现数据脏读的情况!如果更新或修改方法的返回值与list不相同即该缓存@CacheEvict,否则会报数据映射错误!

3,注解说明:

@Cacheable (value="缓存值/SpEL/不填默认返回值", key="缓存key/SpEL") : 应用到读取数据的方法上,即可缓存的方法,如查找方法:先从缓存中读取,如果没有再调用方法获取数据,然后把数据添加到缓存中。如图:

基于spring4.0配置分布式ehcache,以及相关使用

@CachePut 应用到写数据的方法上,如新增/修改方法,调用方法时会自动把相应的数据放入缓存: 如图:

基于spring4.0配置分布式ehcache,以及相关使用

@CacheEvict: 移除数据,如图:

基于spring4.0配置分布式ehcache,以及相关使用

@Caching 组合多个Cache注解使用,如图:

基于spring4.0配置分布式ehcache,以及相关使用

注解参数说明:

value:必须指定,其表示当前方法的返回值是会被缓存在哪个Cache上的,对应Cache的名称, listparamentes中的cacheName

key:通过Spring的EL表达式来指定我们的key。这里的EL表达式可以使用方法参数及它们对应的属性。使用方法参数时我们可以直接使用“#参数名”或者“#p参数index。我们统一采用方法的参数做唯一key,没有参数不写!

condition:有的时候我们可能并不希望缓存一个方法所有的返回结果,condition属性默认为空,表示将缓存所有的调用情形。其值是通过SpringEL表达式来指定的,当为true时表示进行缓存处理;当为false时表示不进行缓存处理,即每次调用该方法时该方法都会执行一次。如下示例表示只有当user的id为偶数时才会进行缓存

 @Cacheable(value="users",key="#user.id",condition="#user.id%2==0")
public User find(User user) {
System.out.println("find user by user " + user);
return user;
}

3,基于程序代码方式的缓存相关使用:

当注解不能完全满足需求,或需要在程序代码中实现动态操作时,就需要对ehcache实现相关封装,从而现对缓存手动进行增删改。可以考虑写成util,我这里是以service的形式现实的封装及相关调用,仅供参考。

 package com.cshtong.tower.service;

 import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List; import javax.annotation.Resource; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service; import com.alibaba.fastjson.JSONObject;
import com.cshtong.tower.model.MessageType;
import com.cshtong.tower.model.User;
import com.cshtong.tower.repository.MessageTypeRepository; import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Ehcache;
import net.sf.ehcache.Element; @Service
public class EhcacheServiceImpl implements EhcacheService{ private static Logger logger = LoggerFactory.getLogger(EhcacheServiceImpl.class); @Resource
private UserService userSerivce; @Resource
private MessageTypeRepository messageTypeRepository; public static CacheManager cacheManager(){ CacheManager cm = null;
try {
cm = CacheManager.newInstance();
} catch (UncheckedIOException e) {
logger.error("Fail to load config, message:{}", e.getMessage());
}
return cm;
} /**
* key可以为空
*/
@SuppressWarnings({"deprecation" })
@Override
public com.cshtong.tower.model.Ehcache findCache(String cacheName, String key) {
com.cshtong.tower.model.Ehcache eh = null;
if (null == key) {
Ehcache cache = cacheManager().getEhcache(cacheName);
eh = new com.cshtong.tower.model.Ehcache();
eh.setCacheName(cache.getName());
List<String> listKey = new ArrayList<>();
for (int j = 0; j < cache.getKeys().size(); j++) {
listKey.add(cache.getKeys().get(j).toString());
}
eh.setKeys(listKey);
eh.setSize(cache.getSize());
/*eh.setHitrate(cache.getStatistics().cacheHitRatio());*/
eh.setDiskStoreSize(cache.getDiskStoreSize());
eh.setMemoryStoreSize(cache.getMemoryStoreSize());
eh.setStatus(cache.getStatus().toString());
}else{
Element el = cacheManager().getEhcache(cacheName).get(key);
if (el == null) {
el = cacheManager().getEhcache(cacheName).get(Integer.parseInt(key));
}
eh = new com.cshtong.tower.model.Ehcache();
eh.setKeyHit(el.getHitCount());
eh.setLastUpdateTime(el.getLastUpdateTime());
eh.setValues(el.getValue());
}
return eh;
} @SuppressWarnings("deprecation")
@Override
public List<com.cshtong.tower.model.Ehcache> listAllEhcahce() {
List<com.cshtong.tower.model.Ehcache> list = new ArrayList<>();
String[] cache = cacheManager().getCacheNames();
List<Ehcache> cachelist = new ArrayList<>();
for (int i = 0; i < cache.length; i++) {
Ehcache c = cacheManager().getEhcache(cache[i]);
cachelist.add(c);
}
for (int i = 0; i < cachelist.size(); i++) {
com.cshtong.tower.model.Ehcache eh = new com.cshtong.tower.model.Ehcache();
eh.setCacheName(cachelist.get(i).getName());
List<String> listKey = new ArrayList<>();
for (int j = 0; j < cachelist.get(i).getKeys().size(); j++) {
listKey.add(cachelist.get(i).getKeys().get(j).toString());
}
eh.setKeys(listKey);
eh.setSize(cachelist.get(i).getSize());
eh.setStatus(cachelist.get(i).getStatus().toString());
eh.setMemoryStoreSize(cachelist.get(i).getMemoryStoreSize());
eh.setDiskStoreSize(cachelist.get(i).getDiskStoreSize());
list.add(eh);
}
return list;
} /**
* 获取缓存
* @param cacheName
* @param key
* @return json字符串
*/
@Override
public String getCache(String cacheName, Object key) {
logger.debug("Getting Cache that name is " + cacheName + "and key is" + key);
Element el = cacheManager().getCache(cacheName).get(key);
return JSONObject.toJSONString(el.getObjectValue());
} @Override
public Cache getCache(String name) {
logger.debug("Getting Cache that name is " + name);
return cacheManager().getCache(name);
} /**
* 获取所有的缓存
* @param names
* @return
*/
@Override
public String[] getCacheNames() {
logger.debug("All of the Cache is " + cacheManager().getCacheNames());
return cacheManager().getCacheNames();
} @Override
public void update(String cacheName, Object key, Object value) {
try {
remove(cacheName, key);
put(cacheName, key, value);
} catch (Exception e) {
logger.debug("Fail to update the Cache,which is " + cacheManager().getCacheNames());
}
} @Override
public void put(String cacheName, Object key, Object value) {
logger.debug("add Cache is " + cacheManager().getCacheNames() + ",and key is " + key + ",and value is" + value);
Element el = new Element(key, value);
cacheManager().getCache(cacheName).put(el);
} @Override
public boolean ishasCache(String cacheName, Object key) {
boolean rs = false;
Element value = cacheManager().getCache(cacheName).get(key);
if (null != value) {
rs = true;
}
return rs;
} /**
* 根据缓存名清除缓存key value
* @param name
*/
@Override
public void evictName(String name) {
logger.debug("delete Cache that name is " + name);
cacheManager().getCache(name).removeAll();
} /**
* 根据缓存名对应的key清除缓存
*/
@Override
public void remove(String cacheName, Object key) {
logger.debug("Delete Cache that Name is "+ cacheName +"and key is " + key );
Cache cache = cacheManager().getCache(cacheName);
cache.remove(key);
} /**
* 清除当前cacheManager的所有缓存
*/
@Override
public void clear() {
logger.debug("clear all cache!!");
cacheManager().clearAll();
} }

以上愚见,只是个人的理解,仅供参考。如有不对的地方,欢迎指正批评....