使用场景

常有一些使用场景,比如操作远程服务失败,需要在指定时间后重试。业务方面,比如有订单在X分钟后没有完成操作则变更状态等需求,这里使用RedisTemplete,利用expired消息实现了一个基于Redis在指时间后进行操作的服务实现。

redis 远程访问

redis-cli -h {redis_host} -p {redis_port}
配置
修改Redis配置文件/etc/redis/redis.conf,找到bind那行配置:

bind 127.0.0.1
bind 0.0.0.0

指定配置文件然后重启Redis服务即可:

$ sudo redis-server /etc/redis/redis.conf

键空间通知

这里需要配置 notify-keyspace-events 的参数为 “Ex”。x 代表了过期事件。 notify-keyspace-events "Ex"

添加键值事件订阅
127.0.0.1:6379> psubscribe __keyevent@0__:expired

实现思路

  • 定义一个TimeKey表示该字段已重试的次数,在开始尝试重试操作的时候校验重试次数是否达到上限,并确定下次重试时间,目前实现是以1s,10s,1min,10min,1h,3h,6h的间隔重试。
  • 定义一个EntityKey表示该重试传入的实体,这里使用了Json序列化实体,当然也可以用其他的方法序列化,可以通过RedisSerializer<T>接口实现序列化与反序列化过程,并在使用RedisTemplete或者初始化时候setKeySerializer(RedisSerializer<?> serializer)setValueSerializer(RedisSerializer<?> serializer)
  • 定义一个NotifyKey表示订阅过期事件的key,当接受到该key过期事件的时候,检索EntityKey并分发到指定的RetryImplement。

配置redisconfig

要监听过期事件,需要配置RedisMessageListenerContainer

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import javax.annotation.Resource;
import java.util.HashSet;
import java.util.Set;
/**
* Created by Sequarius on 2016/9/7.
*/
@Configuration
public class RedisConfig {
@Resource
private JedisConnectionFactory jedisConnectionFactory;
@Resource
private ExpiresMessageController expiresMessageController;
@Bean
public StringRedisSerializer stringRedisSerializer(){
return new StringRedisSerializer();
}
@Bean
public GenericJackson2JsonRedisSerializer genericJackson2JsonRedisSerializer(){
return new GenericJackson2JsonRedisSerializer();
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer() {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(jedisConnectionFactory);
container.setConnectionFactory(jedisConnectionFactory);
Set<Topic> topics = new HashSet<Topic>() {{
add(new ChannelTopic("__keyevent@0__:expired"));
}};
container.addMessageListener(expiresMessageController, topics);
return container;
}
@Bean
public RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory,
StringRedisSerializer serializer,
GenericJackson2JsonRedisSerializer jackson2JsonRedisSerializer
) {
RedisTemplate template = new RedisTemplate();
template.setConnectionFactory(connectionFactory);
template.setKeySerializer(serializer);
template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
return template;
}
}

实现MessageListener接口

MessageListener是接受message的一个回调接口,在public void onMessage(Message message, byte[] bytes)中回调接收到的消息,message有两个属性,一个是’byte[]’的MessageBody,一个也是byte[]的MessageChannel,该属性表示该message发生的时间频段,在这个项目下为"__keyevent@0__:expired",即为0号库的过期事件。

这里使用了类名+Tag的方式标注一个unique的重试实体,在分发过程中检索对应的RetryListener初始化传入的类名进行消息分发。

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
/**
* Created by Sequarius on 2016/9/7.
*/
@Component
public class ExpiresMessageController implements MessageListener {
@Resource
private RedisTemplate redisTemplate;
@Resource
private StringRedisTemplate stringRedisTemplate;
private Map<String, SubMessageListener> subMessageListeners;
@PostConstruct
void init() {
subMessageListeners = new HashMap<>();
}
@Override
public void onMessage(Message message, byte[] bytes) {
byte[] bodyBytes = message.getBody();
String notifyKey = (String) stringRedisTemplate.getValueSerializer().deserialize(bodyBytes);
//no subscribe for us
if (!notifyKey.startsWith(Constant.PREFIX_PAYMENT_KEY)) {
return;
}
String subScribeObjName = notifyKey.replaceAll(Constant.PREFIX_PAYMENT_KEY, "");
// Object o = redisTemplate.opsForValue().get(Constant.PREFIX_PAYMENT_ENTITY_KEY + subScribeObjName);
String[] objInfo = subScribeObjName.split(":");
String className = objInfo[0];
String tag=objInfo[1];
log.debug("getmessage=={}", notifyKey);
log.debug("className=={}", className);
try {
Class target=Class.forName(className);
redisTemplate.setValueSerializer(new FastJsonSerializer<>(target));
Object o = redisTemplate.opsForValue().get(Constant.PREFIX_PAYMENT_ENTITY_KEY + subScribeObjName);
subMessageListeners.get(className).onMessage(o,tag);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
public <E,T> void addSubMessageListener(Class<?> clazz, SubMessageListener<E,T> subMessageListener) {
log.debug("puted:{}",clazz.getName());
subMessageListeners.put(clazz.getName(), subMessageListener);
}
}

实现序列化接口

由于项目使用的FastJson,就用FastJson实现了一个简单的序列化与反序列化,当然在初始化传入了class,以便在反序列化时使用

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
import java.nio.charset.Charset;
/**
* Created by Sequarius on 2016/9/8.
*/
@Slf4j
public class FastJsonSerializer<T> implements RedisSerializer<T> {
public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
private Class<T> targetClass;
@Override
public byte[] serialize(T t) throws SerializationException {
if (t == null) {
return new byte[0];
}
try {
return JSON.toJSONString(t, SerializerFeature.WriteClassName)
.getBytes(DEFAULT_CHARSET);
} catch (Exception e) {
log.debug(e.getMessage(), e);
return new byte[0];
}
}
public FastJsonSerializer(Class<T> targetClass) {
this.targetClass = targetClass;
}
@Override
public T deserialize(byte[] bytes) throws SerializationException {
if (bytes == null || bytes.length <= 0) {
return null;
}
String jsonStr = new String(bytes, DEFAULT_CHARSET);
try {
return JSON.parseObject(jsonStr, targetClass);
} catch (Exception e) {
log.debug(e.getMessage(), e);
return null;
}
}
}

SubMessageListener

自定义的一个消息分发,分发给指定的订阅任务,而避免了全局的广播

public interface SubMessageListener<E,T> {
void onMessage(E entity,T tag);
}

BaseRetry

定义了一个基础的retry方法,提供了尝试重试等基础方法的实现,实现类自需要定义setMessageListener()并在初始化后调用便可以接收到指定Entity的消息。

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* Created by Sequarius on 2016/9/7.
*/
@Slf4j
public abstract class BaseRetry<E, T> {
@Resource
private ExpiresMessageController expiresMessageController;
@Resource
private RedisTemplate redisTemplate;
@Resource
private StringRedisTemplate stringRedisTemplate;
public Class<E> clazz;
// 重试尝试间隔为1s,10s,1min,10min,1h,3h,6h一次递增
private final List<Long> RETRY_TABLE = new ArrayList<Long>() {
{
add(0L);
add(1L);
add(10L);
add(60L);
add(10 * 60L);
add(60 * 60L);
add(3 * 60 * 60L);
add(6 * 60 * 60L);
}
};
public BaseRetry(Class<E> clazz) {
this.clazz = clazz;
}
public void retry(E entity, T tag) {
RetryEntity retryEntity = new RetryEntity(entity, tag).invoke();
String timeKey = retryEntity.getTimeKey();
String entryKey = retryEntity.getEntryKey();
StringBuffer entityTag = retryEntity.getEntityTag();
String memoryValue = stringRedisTemplate.opsForValue().get(timeKey);
int lastRetryTime = (memoryValue == null) ? 0 : Integer.valueOf(memoryValue);
//first Retry save entity
if (lastRetryTime == 0) {
redisTemplate.setDefaultSerializer(new FastJsonSerializer<>(clazz));
redisTemplate.opsForValue().set(entryKey, entity);
}
//max Retry delete entity
if (lastRetryTime >= RETRY_TABLE.size() - 1) {
log.warn("retry in max times entityTag=={}", entityTag.toString());
log.warn("retry entity=={}",stringRedisTemplate.opsForValue().get(entryKey));
redisTemplate.delete(entryKey);
stringRedisTemplate.delete(timeKey);
throw new RetryTimeOutOfRangeException();
}
log.debug("time key={};entrykey={};lastRetryTime={}", timeKey, entryKey, lastRetryTime);
//add 1 times
stringRedisTemplate.opsForValue().increment(timeKey, 1);
stringRedisTemplate.opsForValue().set(entityTag.insert(0, Constant.PREFIX_PAYMENT_KEY).toString(),
"notify_key", RETRY_TABLE.get(lastRetryTime + 1), TimeUnit.SECONDS);
}
//
protected void addListenner(Class<?> clazz, SubMessageListener listener) {
expiresMessageController.addSubMessageListener(clazz, listener);
}
public abstract void setMessageListener();
public void retrySuccess(E entity, T tag){
RetryEntity retryEntity = new RetryEntity(entity, tag).invoke();
redisTemplate.delete(retryEntity.getTimeKey());
redisTemplate.delete(retryEntity.getEntryKey());
}
private class RetryEntity {
private E entity;
private T tag;
private StringBuffer entityTag;
private String timeKey;
private String entryKey;
public RetryEntity(E entity, T tag) {
this.entity = entity;
this.tag = tag;
}
public StringBuffer getEntityTag() {
return entityTag;
}
public String getTimeKey() {
return timeKey;
}
public String getEntryKey() {
return entryKey;
}
public RetryEntity invoke() {
entityTag = new StringBuffer(entity.getClass().getName())
.append(":").append(tag.toString());
//retry times key
timeKey = new StringBuffer(Constant.PREFIX_PAYMENT_TIME_KEY).append(entityTag).toString();
//retry entity key
entryKey = new StringBuffer(Constant.PREFIX_PAYMENT_ENTITY_KEY).append(entityTag).toString();
return this;
}
}
}

一个简单的http失败重试示例

为了简单起见,这里使用了String作为Entity,通常建议使用自定义bean作为Entity

import lombok.extern.slf4j.Slf4j;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.io.IOException;
/**
* Created by Sequarius on 2016/9/12.
*/
@Component
@Slf4j
public class HttpGetRequestRetry extends BaseRetry<String, String> {
@Resource
OkHttpClient okHttpClient;
public HttpGetRequestRetry() {
super(String.class);
}
@Override
@PostConstruct
public void setMessageListener() {
addListenner(String.class, new SubMessageListener<String, String>() {
@Override
public void onMessage(String url, String tag) {
makeRequest(url, tag);
}
});
}
public void makeRequest(String url, String tag) {
Request request = new Request.Builder().url(url).get().build();
try {
Response response = okHttpClient.newCall(request).execute();
String body = response.body().string();
int responseCode = response.code();
if (responseCode != HttpStatus.OK.value() || !body.equals("SUCCESS")) {
log.warn("call back url={} ,response code={},body={},fail, try again ", url, responseCode, body);
this.retry(url, tag);
}else{
log.debug("call back url={} ,response code={},body={},ok ", url, responseCode, body);
retrySuccess(url,tag);
}
} catch (IOException e) {
log.debug(e.getMessage(), e);
try {
this.retry(url, tag);
} catch (RetryTimeOutOfRangeException e1) {
log.error(e1.getMessage(), e);
}
}
}
}

至此,对于使用HttpGetRequestRetrymakeRequest(String url, String tag)的Http请求便可以在失败时完成重试了。