Bull任务队列实现浅析

2021年2月20日 20:00

使用(简单):

const Bull = require('bull');
const Queue = new Bull('Queue');
// 添加任务
Queue.add({
  foo: 'bar'
});
// 任务处理函数
Queue.process(async job => {
  return doSomething(job.data);
});
// 监听任务
Queue.on('completed', (job, resulve) => {
  console.log('Job completed');
});

Job的生命周期:

job-lifecycle

结论(从不同方面来讲):

  • 如何实现并发(concurrency)? 使用Promise.all(其实就算没有指定并发数量,也是用的Promise.all,只不过数组中只有一个promise)

  • 如何指定优先级?add任务的时候,可以显示设定优先级priority,优先级最高为1,最低为自定义的任何正整数

  • 如何在当前任务中插入指定优先级的任务?add任务的使用,

    const myJob = ,await myqueue.add({ foo: 'bar' }, { priority: 3 });

    这里并非使用JavaScript数组去查找,而是通过lua脚本来调用redis,关键代码(lua脚本):

    		-- Priority add
        rcall("ZADD", KEYS[6], priority, jobId)
        local count = rcall("ZCOUNT", KEYS[6], 0, priority)
      
        local len = rcall("LLEN", target)
        local id = rcall("LINDEX", target, len - (count-1))
        if id then
          rcall("LINSERT", target, "BEFORE", id, jobId)
        else
          rcall("RPUSH", target, jobId)
        end
    

    有优先级的队列比标准队列会稍微慢一点,官方有解释:

    Keep in mind that priority queues are a bit slower than a standard queue (currently insertion time O(n), n being the number of jobs currently waiting in the queue, instead of O(1) for standard queues).

    主要是插入的时候相比标准队列,时间复杂度会从O(1)飙到O(n)

  • 从最底层来说,通过JavaScript来调用lua脚本,用过lua脚本来调用redis(应该是使用Redis的事务来一次执行多个命令),Bull是基于Redis的PUSH-POP来实现队列的,然后通过PUBLISH-SUBSCRIBE来做事件分发