From d4c9d41f98ed7aefcc8e338393d26fc89d3c53ea Mon Sep 17 00:00:00 2001 From: syuilo Date: Mon, 4 Feb 2019 13:35:58 +0900 Subject: [PATCH] Enable job queue Resolve #3216 --- src/index.ts | 3 +++ src/queue/index.ts | 33 ++++++++++++++++++++++++++++++++- 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/src/index.ts b/src/index.ts index 69e64737d..67c1ecb90 100644 --- a/src/index.ts +++ b/src/index.ts @@ -100,6 +100,9 @@ async function workerMain() { // start server await require('./server').default(); + // start processor + require('./queue').default(); + if (cluster.isWorker) { // Send a 'ready' message to parent process process.send('ready'); diff --git a/src/queue/index.ts b/src/queue/index.ts index 431d4cb3e..5e5f9dae4 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -1,9 +1,34 @@ +import * as Queue from 'bee-queue'; +import config from '../config'; import http from './processors/http'; import { ILocalUser } from '../models/user'; import Logger from '../misc/logger'; +const enableQueue = config.redis != null; + +const queue = new Queue('misskey', { + redis: { + port: config.redis.port, + host: config.redis.host, + password: config.redis.pass + }, + + removeOnSuccess: true, + removeOnFailure: true, + getEvents: false, + sendEvents: false, + storeJobs: false +}); + export function createHttpJob(data: any) { - return http({ data }, () => {}); + if (enableQueue) { + return queue.createJob(data) + .retries(4) + .backoff('exponential', 16384) // 16s + .save(); + } else { + return http({ data }, () => {}); + } } export function deliver(user: ILocalUser, content: any, to: any) { @@ -18,3 +43,9 @@ export function deliver(user: ILocalUser, content: any, to: any) { } export const queueLogger = new Logger('queue'); + +export default function() { + if (enableQueue) { + queue.process(128, http); + } +}