面试:MQ之外的延迟队列实现方案

发布于2024-07-10
字数: 2446
Redis
DelayQueue
面试

面试:MQ之外的延迟队列实现方案

面试被问到除了rabbitmq之类的消息队列,还有什么延迟队列的实现方案,所以今天写一下简单的代码,加深一下印象。

Redis实现延迟队列

实现思路

Redis中的ZSet结构,为每个元素维护了score属性和value属性,并且提供了根据范围查找元素的命令。我们可以将任务执行时间的时间戳作为score,将队列消息作为value,然后将任务存入Redis中,接下来再开启一个定时任务,周期性的轮询Redis,获取时间戳小于当前时间戳的任务,取出后执行任务,执行完毕后再删除任务。

详细步骤

创建任务对象

java 复制代码
import lombok.Data;

import java.io.Serializable;

/**
 * @author lsl
 */
@Data
public class DelayMessage implements Serializable {

    private static final long serialVersionUID = 4513843422294138196L;

    /**
     * 消息ID
     */
    private Integer messageId;

    /**
     * 消息内容
     */
    private String content;

    /**
     * 消息消费时间,时间戳格式,单位:毫秒
     */
    private Long expireTime;
}

创建自定义消息队列,提供添加任务、获取任务、删除任务等方法。

java 复制代码
import com.google.gson.Gson;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * @author lsl
 */
@Component
public class CustomDelayQueue {

    private static final String KEY = "delay_queue";

    public static final Gson GSON = new Gson();

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public void addMessage(DelayMessage message) {
        redisTemplate.opsForZSet().add(KEY, GSON.toJson(message), message.getExpireTime());
    }

    public void removeMessage(DelayMessage message) {
        redisTemplate.opsForZSet().remove(KEY, GSON.toJson(message));
    }

    public List<DelayMessage> getExpireMessage() {
        long startMills = 0;
        long endMills = System.currentTimeMillis();
        Set<Object> messageSet = redisTemplate.opsForZSet().rangeByScore(KEY, startMills, endMills);
        if (messageSet == null || messageSet.isEmpty()) {
            return Collections.emptyList();
        }
        return messageSet.stream().map(item -> GSON.fromJson(item.toString(), DelayMessage.class)).collect(Collectors.toList());
    }
}

接下来开启定时任务轮询Redis

java 复制代码
@Scheduled(cron = "0/1 * * * * *")
public void handlerMsgSingle() {
    List<DelayMessage> expireMessage = getDelayMessages();
    if (expireMessage.isEmpty()) {
        return;
    }
    for (DelayMessage message : expireMessage) {
        if (message == null) {
            return;
        }
        try {
            log.info("开始处理消息 -> {}", message);
            Thread.sleep(1000);
            log.info("消息消费结束 -> {}", message);
            delayQueue.removeMessage(message);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

private List<DelayMessage> getDelayMessages() {
    SimpleDateFormat dateFormat = new SimpleDateFormat(DATETIME_PATTERN);
    String currTime = dateFormat.format(new Date());
    List<DelayMessage> expireMessage = delayQueue.getExpireMessage();
    log.info("{}时刻消息数量: {}", currTime, expireMessage.size());
    return expireMessage;
}

执行结果

bash 复制代码
2024-07-10 11:01:41.014  INFO 10212 --- [   scheduling-1] com.lsl.handler.DelayMessageHandler      : 2024-07-10 11:01:41时刻消息数量: 4
2024-07-10 11:01:41.014  INFO 10212 --- [   scheduling-1] com.lsl.handler.DelayMessageHandler      : 开始处理消息 -> DelayMessage(messageId=96, content=content:96, expireTime=1720580500754)
2024-07-10 11:01:42.023  INFO 10212 --- [   scheduling-1] com.lsl.handler.DelayMessageHandler      : 消息消费结束 -> DelayMessage(messageId=96, content=content:96, expireTime=1720580500754)
2024-07-10 11:01:42.031  INFO 10212 --- [   scheduling-1] com.lsl.handler.DelayMessageHandler      : 开始处理消息 -> DelayMessage(messageId=135, content=content:135, expireTime=1720580500754)
2024-07-10 11:01:43.042  INFO 10212 --- [   scheduling-1] com.lsl.handler.DelayMessageHandler      : 消息消费结束 -> DelayMessage(messageId=135, content=content:135, expireTime=1720580500754)
2024-07-10 11:01:43.043  INFO 10212 --- [   scheduling-1] com.lsl.handler.DelayMessageHandler      : 开始处理消息 -> DelayMessage(messageId=813, content=content:813, expireTime=1720580500754)
2024-07-10 11:01:44.046  INFO 10212 --- [   scheduling-1] com.lsl.handler.DelayMessageHandler      : 消息消费结束 -> DelayMessage(messageId=813, content=content:813, expireTime=1720580500754)
2024-07-10 11:01:44.048  INFO 10212 --- [   scheduling-1] com.lsl.handler.DelayMessageHandler      : 开始处理消息 -> DelayMessage(messageId=881, content=content:881, expireTime=1720580500754)
2024-07-10 11:01:45.049  INFO 10212 --- [   scheduling-1] com.lsl.handler.DelayMessageHandler      : 消息消费结束 -> DelayMessage(messageId=881, content=content:881, expireTime=1720580500754)
2024-07-10 11:01:46.004  INFO 10212 --- [   scheduling-1] com.lsl.handler.DelayMessageHandler      : 2024-07-10 11:01:46时刻消息数量: 0

从日志来看消息不断被消费,但是也有一个问题,因为定时任务是单线程轮询的,所以如果上一个任务没执行完毕,会影响下一个任务的执行时间,如果消息比较多就会造成消息堆积和消费时间不准确的问题。所以可以引入线程池来提高消费消息的能力,每一个消息都单独开一个线程去消费。

java 复制代码
@Scheduled(cron = "0/1 * * * * *")
public void handlerMessage() {
    List<DelayMessage> expireMessage = getDelayMessages();
    if (expireMessage.isEmpty()) {
        return;
    }
    for (DelayMessage message : expireMessage) {
        executor.execute(() -> messageService.processMessageNoLock(message));
    }
}


@Async("executor")
public void processMessageNoLock(DelayMessage message) {
    if (message == null) {
        return;
    }
    try {
        log.info("开始处理消息 -> {}", message);
        Thread.sleep(1000);
        log.info("消息消费结束 -> {}", message);
        delayQueue.removeMessage(message);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

执行结果

bash 复制代码
2024-07-10 11:19:38.013  INFO 23944 --- [   scheduling-1] com.lsl.handler.DelayMessageHandler      : 2024-07-10 11:19:38时刻消息数量: 4
2024-07-10 11:19:38.028  INFO 23944 --- [   MyExecutor-3] com.lsl.service.MessageService           : 开始处理消息 -> DelayMessage(messageId=812, content=content:812, expireTime=1720581577014)
2024-07-10 11:19:38.028  INFO 23944 --- [   MyExecutor-4] com.lsl.service.MessageService           : 开始处理消息 -> DelayMessage(messageId=524, content=content:524, expireTime=1720581577014)
2024-07-10 11:19:38.028  INFO 23944 --- [   MyExecutor-2] com.lsl.service.MessageService           : 开始处理消息 -> DelayMessage(messageId=984, content=content:984, expireTime=1720581577014)
2024-07-10 11:19:38.028  INFO 23944 --- [   MyExecutor-1] com.lsl.service.MessageService           : 开始处理消息 -> DelayMessage(messageId=785, content=content:785, expireTime=1720581577014)
2024-07-10 11:19:39.009  INFO 23944 --- [   scheduling-1] com.lsl.handler.DelayMessageHandler      : 2024-07-10 11:19:39时刻消息数量: 4
2024-07-10 11:19:39.039  INFO 23944 --- [   MyExecutor-4] com.lsl.service.MessageService           : 消息消费结束 -> DelayMessage(messageId=524, content=content:524, expireTime=1720581577014)
2024-07-10 11:19:39.039  INFO 23944 --- [   MyExecutor-3] com.lsl.service.MessageService           : 消息消费结束 -> DelayMessage(messageId=812, content=content:812, expireTime=1720581577014)
2024-07-10 11:19:39.039  INFO 23944 --- [   MyExecutor-2] com.lsl.service.MessageService           : 消息消费结束 -> DelayMessage(messageId=984, content=content:984, expireTime=1720581577014)
2024-07-10 11:19:39.039  INFO 23944 --- [   MyExecutor-1] com.lsl.service.MessageService           : 消息消费结束 -> DelayMessage(messageId=785, content=content:785, expireTime=1720581577014)
2024-07-10 11:19:39.049  INFO 23944 --- [   MyExecutor-2] com.lsl.service.MessageService           : 开始处理消息 -> DelayMessage(messageId=524, content=content:524, expireTime=1720581577014)
2024-07-10 11:19:39.049  INFO 23944 --- [   MyExecutor-3] com.lsl.service.MessageService           : 开始处理消息 -> DelayMessage(messageId=812, content=content:812, expireTime=1720581577014)
2024-07-10 11:19:39.049  INFO 23944 --- [   MyExecutor-1] com.lsl.service.MessageService           : 开始处理消息 -> DelayMessage(messageId=785, content=content:785, expireTime=1720581577014)
2024-07-10 11:19:39.049  INFO 23944 --- [   MyExecutor-4] com.lsl.service.MessageService           : 开始处理消息 -> DelayMessage(messageId=984, content=content:984, expireTime=1720581577014)
2024-07-10 11:19:40.016  INFO 23944 --- [   scheduling-1] com.lsl.handler.DelayMessageHandler      : 2024-07-10 11:19:40时刻消息数量: 0
2024-07-10 11:19:40.060  INFO 23944 --- [   MyExecutor-1] com.lsl.service.MessageService           : 消息消费结束 -> DelayMessage(messageId=785, content=content:785, expireTime=1720581577014)
2024-07-10 11:19:40.060  INFO 23944 --- [   MyExecutor-3] com.lsl.service.MessageService           : 消息消费结束 -> DelayMessage(messageId=812, content=content:812, expireTime=1720581577014)
2024-07-10 11:19:40.060  INFO 23944 --- [   MyExecutor-4] com.lsl.service.MessageService           : 消息消费结束 -> DelayMessage(messageId=984, content=content:984, expireTime=1720581577014)
2024-07-10 11:19:40.060  INFO 23944 --- [   MyExecutor-2] com.lsl.service.MessageService           : 消息消费结束 -> DelayMessage(messageId=524, content=content:524, expireTime=1720581577014)
2024-07-10 11:19:41.014  INFO 23944 --- [   scheduling-1] com.lsl.handler.DelayMessageHandler      : 2024-07-10 11:19:41时刻消息数量: 0

从日志中可以看到消费消息的能力提高了,但是又出现了新的问题,消息被重复消费了,因为在多线程的环境下A线程在消费消息的过程中,B线程并不知道A线程在消费了,所以B线程又去获取了消息进行消费。为此,可以使用setnx来解决,在消费消息之前先去获取锁,如果获取不到,则代表有其他线程在消费了,那么当前线程就不要处理这条消息了。代码如下:

java 复制代码
@Scheduled(cron = "0/1 * * * * *")
public void handlerMessage() {
    List<DelayMessage> expireMessage = getDelayMessages();
    if (expireMessage.isEmpty()) {
        return;
    }
    for (DelayMessage message : expireMessage) {
        executor.execute(() -> messageService.processMessageWithLock(message));
    }
}


@Async("executor")
public void processMessageWithLock(DelayMessage message) {
    try {
        Boolean isProcessed = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY + message.getMessageId(), 0);
        if (Boolean.TRUE.equals(isProcessed)) {
            log.info("开始处理消息 -> {}", message);
            Thread.sleep(1000);
            log.info("消息消费结束 -> {}", message);
            delayQueue.removeMessage(message);
        }
    } catch (Exception e) {
        log.error("{}消费异常, {}", message, e);
    }
}

执行结果:

bash 复制代码
2024-07-10 14:11:34.025  INFO 29096 --- [   scheduling-1] com.lsl.handler.DelayMessageHandler      : 2024-07-10 14:11:34时刻消息数量: 4
2024-07-10 14:11:34.059  INFO 29096 --- [   MyExecutor-1] com.lsl.service.MessageService           : 开始处理消息 -> DelayMessage(messageId=754, content=content:754, expireTime=1720591893982)
2024-07-10 14:11:34.059  INFO 29096 --- [   MyExecutor-2] com.lsl.service.MessageService           : 开始处理消息 -> DelayMessage(messageId=164, content=content:164, expireTime=1720591893982)
2024-07-10 14:11:34.059  INFO 29096 --- [   MyExecutor-3] com.lsl.service.MessageService           : 开始处理消息 -> DelayMessage(messageId=641, content=content:641, expireTime=1720591893982)
2024-07-10 14:11:34.060  INFO 29096 --- [   MyExecutor-4] com.lsl.service.MessageService           : 开始处理消息 -> DelayMessage(messageId=264, content=content:264, expireTime=1720591893982)
2024-07-10 14:11:35.016  INFO 29096 --- [   scheduling-1] com.lsl.handler.DelayMessageHandler      : 2024-07-10 14:11:35时刻消息数量: 4
2024-07-10 14:11:35.075  INFO 29096 --- [   MyExecutor-2] com.lsl.service.MessageService           : 消息消费结束 -> DelayMessage(messageId=164, content=content:164, expireTime=1720591893982)
2024-07-10 14:11:35.075  INFO 29096 --- [   MyExecutor-3] com.lsl.service.MessageService           : 消息消费结束 -> DelayMessage(messageId=641, content=content:641, expireTime=1720591893982)
2024-07-10 14:11:35.075  INFO 29096 --- [   MyExecutor-4] com.lsl.service.MessageService           : 消息消费结束 -> DelayMessage(messageId=264, content=content:264, expireTime=1720591893982)
2024-07-10 14:11:35.075  INFO 29096 --- [   MyExecutor-1] com.lsl.service.MessageService           : 消息消费结束 -> DelayMessage(messageId=754, content=content:754, expireTime=1720591893982)
2024-07-10 14:11:36.010  INFO 29096 --- [   scheduling-1] com.lsl.handler.DelayMessageHandler      : 2024-07-10 14:11:36时刻消息数量: 0

从日志看解决了消息的重复消费问题,不过还是存在以下问题:

  • 某条消息在消费的过程中服务宕机了,那么服务重启后,redis中的锁并没有释放,这样会导致这条消息无法完成消费
  • 随着时间的累积,消费的消息越来越多,会占用redis资源

不过这里只是写个简单的示例代码,就不往下深挖了,后面看看有没有时间吧。

JDK自带延迟队列

JDK自带了一个DelayQueue,通过实现Delayed接口来实现延迟队列,详细代码如下:

任务对象代码:

java 复制代码
import lombok.Data;

import java.io.Serializable;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
* JDK自带DelayQueue任务对象
*@author lsl
*/
@Data
public class DelayTask implements Delayed, Serializable {

    private static final long serialVersionUID = -400765043663885765L;

    private DelayMessage delayMessage;

    /**
     * 获取任务剩余有效时间
     * @param unit the time unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        long remainTime = this.delayMessage.getExpireTime() - System.currentTimeMillis();
        if (remainTime < 0) {
            remainTime = 0;
        }
        return remainTime;
    }

    /**
     * 比较任务剩余有效时间
     * @param o the object to be compared.
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        long remainTime = this.delayMessage.getExpireTime() - o.getDelay(TimeUnit.MILLISECONDS);
        if (remainTime > 0) {
            return 1;
        }
        if (remainTime < 0) {
            return -1;
        }
        return 0;
    }
}

队列服务类:

java 复制代码
import com.lsl.model.DelayMessage;
import com.lsl.model.DelayTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Calendar;
import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executor;

/**
 * @author lsl
 */
@Service
@Slf4j
public class DelayTaskService {

    @Autowired
    Executor executor;

    @Autowired
    MessageService messageService;

    public void handlerDelayTask() {
        DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
        Calendar instance = Calendar.getInstance();
        Random random = new Random(System.currentTimeMillis());
        instance.add(Calendar.SECOND, 30);
        for (int i = 0; i < 3; i++) {
            int msgId = random.nextInt(1000);
            DelayMessage delayMessage = new DelayMessage();
            delayMessage.setMessageId(msgId);
            delayMessage.setContent("delay task:" + msgId);
            delayMessage.setExpireTime(instance.getTime().getTime());
            DelayTask task = new DelayTask();
            task.setDelayMessage(delayMessage);
            delayQueue.put(task);
        }
        log.info("消息发送完成");
        while (true) {
            try {
                DelayTask task = delayQueue.take();
                executor.execute(() -> {
                    messageService.processDelayTask(task);
                });
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

}

public void processDelayTask(DelayTask task) {
    log.info("开始处理延迟任务 -> {}", task);
    try {
        Thread.sleep(1000);
    } catch (Exception e) {
        log.error("{}消费异常, {}", task, e);
    }
    log.info("处理延迟任务结束 -> {}", task);
}

执行结果:

bash 复制代码
2024-07-10 14:24:22.489  INFO 6088 --- [   MyExecutor-2] com.lsl.service.MessageService           : 开始处理延迟任务 -> DelayTask(delayMessage=DelayMessage(messageId=598, content=delay task:598, expireTime=1720592662473))
2024-07-10 14:24:22.489  INFO 6088 --- [   MyExecutor-3] com.lsl.service.MessageService           : 开始处理延迟任务 -> DelayTask(delayMessage=DelayMessage(messageId=474, content=delay task:474, expireTime=1720592662473))
2024-07-10 14:24:22.489  INFO 6088 --- [   MyExecutor-1] com.lsl.service.MessageService           : 开始处理延迟任务 -> DelayTask(delayMessage=DelayMessage(messageId=442, content=delay task:442, expireTime=1720592662473))
2024-07-10 14:24:23.491  INFO 6088 --- [   MyExecutor-1] com.lsl.service.MessageService           : 处理延迟任务结束 -> DelayTask(delayMessage=DelayMessage(messageId=442, content=delay task:442, expireTime=1720592662473))
2024-07-10 14:24:23.491  INFO 6088 --- [   MyExecutor-3] com.lsl.service.MessageService           : 处理延迟任务结束 -> DelayTask(delayMessage=DelayMessage(messageId=474, content=delay task:474, expireTime=1720592662473))
2024-07-10 14:24:23.491  INFO 6088 --- [   MyExecutor-2] com.lsl.service.MessageService           : 处理延迟任务结束 -> DelayTask(delayMessage=DelayMessage(messageId=598, content=delay task:598, expireTime=1720592662473))

最后放一下完整代码地址:git clone https://gitee.com/lslmoney/delay-queue.git