面试:MQ之外的延迟队列实现方案
面试:MQ之外的延迟队列实现方案
面试被问到除了rabbitmq之类的消息队列,还有什么延迟队列的实现方案,所以今天写一下简单的代码,加深一下印象。
Redis实现延迟队列
实现思路
Redis中的ZSet结构,为每个元素维护了score属性和value属性,并且提供了根据范围查找元素的命令。我们可以将任务执行时间的时间戳作为score,将队列消息作为value,然后将任务存入Redis中,接下来再开启一个定时任务,周期性的轮询Redis,获取时间戳小于当前时间戳的任务,取出后执行任务,执行完毕后再删除任务。
详细步骤
创建任务对象
import lombok.Data;
import java.io.Serializable;
/**
* @author lsl
*/
@Data
public class DelayMessage implements Serializable {
private static final long serialVersionUID = 4513843422294138196L;
/**
* 消息ID
*/
private Integer messageId;
/**
* 消息内容
*/
private String content;
/**
* 消息消费时间,时间戳格式,单位:毫秒
*/
private Long expireTime;
}
创建自定义消息队列,提供添加任务、获取任务、删除任务等方法。
import com.google.gson.Gson;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @author lsl
*/
@Component
public class CustomDelayQueue {
private static final String KEY = "delay_queue";
public static final Gson GSON = new Gson();
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public void addMessage(DelayMessage message) {
redisTemplate.opsForZSet().add(KEY, GSON.toJson(message), message.getExpireTime());
}
public void removeMessage(DelayMessage message) {
redisTemplate.opsForZSet().remove(KEY, GSON.toJson(message));
}
public List<DelayMessage> getExpireMessage() {
long startMills = 0;
long endMills = System.currentTimeMillis();
Set<Object> messageSet = redisTemplate.opsForZSet().rangeByScore(KEY, startMills, endMills);
if (messageSet == null || messageSet.isEmpty()) {
return Collections.emptyList();
}
return messageSet.stream().map(item -> GSON.fromJson(item.toString(), DelayMessage.class)).collect(Collectors.toList());
}
}
接下来开启定时任务轮询Redis
@Scheduled(cron = "0/1 * * * * *")
public void handlerMsgSingle() {
List<DelayMessage> expireMessage = getDelayMessages();
if (expireMessage.isEmpty()) {
return;
}
for (DelayMessage message : expireMessage) {
if (message == null) {
return;
}
try {
log.info("开始处理消息 -> {}", message);
Thread.sleep(1000);
log.info("消息消费结束 -> {}", message);
delayQueue.removeMessage(message);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
private List<DelayMessage> getDelayMessages() {
SimpleDateFormat dateFormat = new SimpleDateFormat(DATETIME_PATTERN);
String currTime = dateFormat.format(new Date());
List<DelayMessage> expireMessage = delayQueue.getExpireMessage();
log.info("{}时刻消息数量: {}", currTime, expireMessage.size());
return expireMessage;
}
执行结果
2024-07-10 11:01:41.014 INFO 10212 --- [ scheduling-1] com.lsl.handler.DelayMessageHandler : 2024-07-10 11:01:41时刻消息数量: 4
2024-07-10 11:01:41.014 INFO 10212 --- [ scheduling-1] com.lsl.handler.DelayMessageHandler : 开始处理消息 -> DelayMessage(messageId=96, content=content:96, expireTime=1720580500754)
2024-07-10 11:01:42.023 INFO 10212 --- [ scheduling-1] com.lsl.handler.DelayMessageHandler : 消息消费结束 -> DelayMessage(messageId=96, content=content:96, expireTime=1720580500754)
2024-07-10 11:01:42.031 INFO 10212 --- [ scheduling-1] com.lsl.handler.DelayMessageHandler : 开始处理消息 -> DelayMessage(messageId=135, content=content:135, expireTime=1720580500754)
2024-07-10 11:01:43.042 INFO 10212 --- [ scheduling-1] com.lsl.handler.DelayMessageHandler : 消息消费结束 -> DelayMessage(messageId=135, content=content:135, expireTime=1720580500754)
2024-07-10 11:01:43.043 INFO 10212 --- [ scheduling-1] com.lsl.handler.DelayMessageHandler : 开始处理消息 -> DelayMessage(messageId=813, content=content:813, expireTime=1720580500754)
2024-07-10 11:01:44.046 INFO 10212 --- [ scheduling-1] com.lsl.handler.DelayMessageHandler : 消息消费结束 -> DelayMessage(messageId=813, content=content:813, expireTime=1720580500754)
2024-07-10 11:01:44.048 INFO 10212 --- [ scheduling-1] com.lsl.handler.DelayMessageHandler : 开始处理消息 -> DelayMessage(messageId=881, content=content:881, expireTime=1720580500754)
2024-07-10 11:01:45.049 INFO 10212 --- [ scheduling-1] com.lsl.handler.DelayMessageHandler : 消息消费结束 -> DelayMessage(messageId=881, content=content:881, expireTime=1720580500754)
2024-07-10 11:01:46.004 INFO 10212 --- [ scheduling-1] com.lsl.handler.DelayMessageHandler : 2024-07-10 11:01:46时刻消息数量: 0
从日志来看消息不断被消费,但是也有一个问题,因为定时任务是单线程轮询的,所以如果上一个任务没执行完毕,会影响下一个任务的执行时间,如果消息比较多就会造成消息堆积和消费时间不准确的问题。所以可以引入线程池来提高消费消息的能力,每一个消息都单独开一个线程去消费。
@Scheduled(cron = "0/1 * * * * *")
public void handlerMessage() {
List<DelayMessage> expireMessage = getDelayMessages();
if (expireMessage.isEmpty()) {
return;
}
for (DelayMessage message : expireMessage) {
executor.execute(() -> messageService.processMessageNoLock(message));
}
}
@Async("executor")
public void processMessageNoLock(DelayMessage message) {
if (message == null) {
return;
}
try {
log.info("开始处理消息 -> {}", message);
Thread.sleep(1000);
log.info("消息消费结束 -> {}", message);
delayQueue.removeMessage(message);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
执行结果
2024-07-10 11:19:38.013 INFO 23944 --- [ scheduling-1] com.lsl.handler.DelayMessageHandler : 2024-07-10 11:19:38时刻消息数量: 4
2024-07-10 11:19:38.028 INFO 23944 --- [ MyExecutor-3] com.lsl.service.MessageService : 开始处理消息 -> DelayMessage(messageId=812, content=content:812, expireTime=1720581577014)
2024-07-10 11:19:38.028 INFO 23944 --- [ MyExecutor-4] com.lsl.service.MessageService : 开始处理消息 -> DelayMessage(messageId=524, content=content:524, expireTime=1720581577014)
2024-07-10 11:19:38.028 INFO 23944 --- [ MyExecutor-2] com.lsl.service.MessageService : 开始处理消息 -> DelayMessage(messageId=984, content=content:984, expireTime=1720581577014)
2024-07-10 11:19:38.028 INFO 23944 --- [ MyExecutor-1] com.lsl.service.MessageService : 开始处理消息 -> DelayMessage(messageId=785, content=content:785, expireTime=1720581577014)
2024-07-10 11:19:39.009 INFO 23944 --- [ scheduling-1] com.lsl.handler.DelayMessageHandler : 2024-07-10 11:19:39时刻消息数量: 4
2024-07-10 11:19:39.039 INFO 23944 --- [ MyExecutor-4] com.lsl.service.MessageService : 消息消费结束 -> DelayMessage(messageId=524, content=content:524, expireTime=1720581577014)
2024-07-10 11:19:39.039 INFO 23944 --- [ MyExecutor-3] com.lsl.service.MessageService : 消息消费结束 -> DelayMessage(messageId=812, content=content:812, expireTime=1720581577014)
2024-07-10 11:19:39.039 INFO 23944 --- [ MyExecutor-2] com.lsl.service.MessageService : 消息消费结束 -> DelayMessage(messageId=984, content=content:984, expireTime=1720581577014)
2024-07-10 11:19:39.039 INFO 23944 --- [ MyExecutor-1] com.lsl.service.MessageService : 消息消费结束 -> DelayMessage(messageId=785, content=content:785, expireTime=1720581577014)
2024-07-10 11:19:39.049 INFO 23944 --- [ MyExecutor-2] com.lsl.service.MessageService : 开始处理消息 -> DelayMessage(messageId=524, content=content:524, expireTime=1720581577014)
2024-07-10 11:19:39.049 INFO 23944 --- [ MyExecutor-3] com.lsl.service.MessageService : 开始处理消息 -> DelayMessage(messageId=812, content=content:812, expireTime=1720581577014)
2024-07-10 11:19:39.049 INFO 23944 --- [ MyExecutor-1] com.lsl.service.MessageService : 开始处理消息 -> DelayMessage(messageId=785, content=content:785, expireTime=1720581577014)
2024-07-10 11:19:39.049 INFO 23944 --- [ MyExecutor-4] com.lsl.service.MessageService : 开始处理消息 -> DelayMessage(messageId=984, content=content:984, expireTime=1720581577014)
2024-07-10 11:19:40.016 INFO 23944 --- [ scheduling-1] com.lsl.handler.DelayMessageHandler : 2024-07-10 11:19:40时刻消息数量: 0
2024-07-10 11:19:40.060 INFO 23944 --- [ MyExecutor-1] com.lsl.service.MessageService : 消息消费结束 -> DelayMessage(messageId=785, content=content:785, expireTime=1720581577014)
2024-07-10 11:19:40.060 INFO 23944 --- [ MyExecutor-3] com.lsl.service.MessageService : 消息消费结束 -> DelayMessage(messageId=812, content=content:812, expireTime=1720581577014)
2024-07-10 11:19:40.060 INFO 23944 --- [ MyExecutor-4] com.lsl.service.MessageService : 消息消费结束 -> DelayMessage(messageId=984, content=content:984, expireTime=1720581577014)
2024-07-10 11:19:40.060 INFO 23944 --- [ MyExecutor-2] com.lsl.service.MessageService : 消息消费结束 -> DelayMessage(messageId=524, content=content:524, expireTime=1720581577014)
2024-07-10 11:19:41.014 INFO 23944 --- [ scheduling-1] com.lsl.handler.DelayMessageHandler : 2024-07-10 11:19:41时刻消息数量: 0
从日志中可以看到消费消息的能力提高了,但是又出现了新的问题,消息被重复消费了,因为在多线程的环境下A线程在消费消息的过程中,B线程并不知道A线程在消费了,所以B线程又去获取了消息进行消费。为此,可以使用setnx来解决,在消费消息之前先去获取锁,如果获取不到,则代表有其他线程在消费了,那么当前线程就不要处理这条消息了。代码如下:
@Scheduled(cron = "0/1 * * * * *")
public void handlerMessage() {
List<DelayMessage> expireMessage = getDelayMessages();
if (expireMessage.isEmpty()) {
return;
}
for (DelayMessage message : expireMessage) {
executor.execute(() -> messageService.processMessageWithLock(message));
}
}
@Async("executor")
public void processMessageWithLock(DelayMessage message) {
try {
Boolean isProcessed = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY + message.getMessageId(), 0);
if (Boolean.TRUE.equals(isProcessed)) {
log.info("开始处理消息 -> {}", message);
Thread.sleep(1000);
log.info("消息消费结束 -> {}", message);
delayQueue.removeMessage(message);
}
} catch (Exception e) {
log.error("{}消费异常, {}", message, e);
}
}
执行结果:
2024-07-10 14:11:34.025 INFO 29096 --- [ scheduling-1] com.lsl.handler.DelayMessageHandler : 2024-07-10 14:11:34时刻消息数量: 4
2024-07-10 14:11:34.059 INFO 29096 --- [ MyExecutor-1] com.lsl.service.MessageService : 开始处理消息 -> DelayMessage(messageId=754, content=content:754, expireTime=1720591893982)
2024-07-10 14:11:34.059 INFO 29096 --- [ MyExecutor-2] com.lsl.service.MessageService : 开始处理消息 -> DelayMessage(messageId=164, content=content:164, expireTime=1720591893982)
2024-07-10 14:11:34.059 INFO 29096 --- [ MyExecutor-3] com.lsl.service.MessageService : 开始处理消息 -> DelayMessage(messageId=641, content=content:641, expireTime=1720591893982)
2024-07-10 14:11:34.060 INFO 29096 --- [ MyExecutor-4] com.lsl.service.MessageService : 开始处理消息 -> DelayMessage(messageId=264, content=content:264, expireTime=1720591893982)
2024-07-10 14:11:35.016 INFO 29096 --- [ scheduling-1] com.lsl.handler.DelayMessageHandler : 2024-07-10 14:11:35时刻消息数量: 4
2024-07-10 14:11:35.075 INFO 29096 --- [ MyExecutor-2] com.lsl.service.MessageService : 消息消费结束 -> DelayMessage(messageId=164, content=content:164, expireTime=1720591893982)
2024-07-10 14:11:35.075 INFO 29096 --- [ MyExecutor-3] com.lsl.service.MessageService : 消息消费结束 -> DelayMessage(messageId=641, content=content:641, expireTime=1720591893982)
2024-07-10 14:11:35.075 INFO 29096 --- [ MyExecutor-4] com.lsl.service.MessageService : 消息消费结束 -> DelayMessage(messageId=264, content=content:264, expireTime=1720591893982)
2024-07-10 14:11:35.075 INFO 29096 --- [ MyExecutor-1] com.lsl.service.MessageService : 消息消费结束 -> DelayMessage(messageId=754, content=content:754, expireTime=1720591893982)
2024-07-10 14:11:36.010 INFO 29096 --- [ scheduling-1] com.lsl.handler.DelayMessageHandler : 2024-07-10 14:11:36时刻消息数量: 0
从日志看解决了消息的重复消费问题,不过还是存在以下问题:
- 某条消息在消费的过程中服务宕机了,那么服务重启后,redis中的锁并没有释放,这样会导致这条消息无法完成消费
- 随着时间的累积,消费的消息越来越多,会占用redis资源
不过这里只是写个简单的示例代码,就不往下深挖了,后面看看有没有时间吧。
JDK自带延迟队列
JDK自带了一个DelayQueue,通过实现Delayed接口来实现延迟队列,详细代码如下:
任务对象代码:
import lombok.Data;
import java.io.Serializable;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* JDK自带DelayQueue任务对象
*@author lsl
*/
@Data
public class DelayTask implements Delayed, Serializable {
private static final long serialVersionUID = -400765043663885765L;
private DelayMessage delayMessage;
/**
* 获取任务剩余有效时间
* @param unit the time unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
long remainTime = this.delayMessage.getExpireTime() - System.currentTimeMillis();
if (remainTime < 0) {
remainTime = 0;
}
return remainTime;
}
/**
* 比较任务剩余有效时间
* @param o the object to be compared.
* @return
*/
@Override
public int compareTo(Delayed o) {
long remainTime = this.delayMessage.getExpireTime() - o.getDelay(TimeUnit.MILLISECONDS);
if (remainTime > 0) {
return 1;
}
if (remainTime < 0) {
return -1;
}
return 0;
}
}
队列服务类:
import com.lsl.model.DelayMessage;
import com.lsl.model.DelayTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Calendar;
import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executor;
/**
* @author lsl
*/
@Service
@Slf4j
public class DelayTaskService {
@Autowired
Executor executor;
@Autowired
MessageService messageService;
public void handlerDelayTask() {
DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
Calendar instance = Calendar.getInstance();
Random random = new Random(System.currentTimeMillis());
instance.add(Calendar.SECOND, 30);
for (int i = 0; i < 3; i++) {
int msgId = random.nextInt(1000);
DelayMessage delayMessage = new DelayMessage();
delayMessage.setMessageId(msgId);
delayMessage.setContent("delay task:" + msgId);
delayMessage.setExpireTime(instance.getTime().getTime());
DelayTask task = new DelayTask();
task.setDelayMessage(delayMessage);
delayQueue.put(task);
}
log.info("消息发送完成");
while (true) {
try {
DelayTask task = delayQueue.take();
executor.execute(() -> {
messageService.processDelayTask(task);
});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
public void processDelayTask(DelayTask task) {
log.info("开始处理延迟任务 -> {}", task);
try {
Thread.sleep(1000);
} catch (Exception e) {
log.error("{}消费异常, {}", task, e);
}
log.info("处理延迟任务结束 -> {}", task);
}
执行结果:
2024-07-10 14:24:22.489 INFO 6088 --- [ MyExecutor-2] com.lsl.service.MessageService : 开始处理延迟任务 -> DelayTask(delayMessage=DelayMessage(messageId=598, content=delay task:598, expireTime=1720592662473))
2024-07-10 14:24:22.489 INFO 6088 --- [ MyExecutor-3] com.lsl.service.MessageService : 开始处理延迟任务 -> DelayTask(delayMessage=DelayMessage(messageId=474, content=delay task:474, expireTime=1720592662473))
2024-07-10 14:24:22.489 INFO 6088 --- [ MyExecutor-1] com.lsl.service.MessageService : 开始处理延迟任务 -> DelayTask(delayMessage=DelayMessage(messageId=442, content=delay task:442, expireTime=1720592662473))
2024-07-10 14:24:23.491 INFO 6088 --- [ MyExecutor-1] com.lsl.service.MessageService : 处理延迟任务结束 -> DelayTask(delayMessage=DelayMessage(messageId=442, content=delay task:442, expireTime=1720592662473))
2024-07-10 14:24:23.491 INFO 6088 --- [ MyExecutor-3] com.lsl.service.MessageService : 处理延迟任务结束 -> DelayTask(delayMessage=DelayMessage(messageId=474, content=delay task:474, expireTime=1720592662473))
2024-07-10 14:24:23.491 INFO 6088 --- [ MyExecutor-2] com.lsl.service.MessageService : 处理延迟任务结束 -> DelayTask(delayMessage=DelayMessage(messageId=598, content=delay task:598, expireTime=1720592662473))
最后放一下完整代码地址:git clone https://gitee.com/lslmoney/delay-queue.git。
