Improve queue configuration

Resolve #4157
Resolve #4158
This commit is contained in:
syuilo 2019-02-06 13:51:02 +09:00
parent e8a089b731
commit f0eb26f7e8
3 changed files with 71 additions and 30 deletions

View File

@ -5,11 +5,15 @@ program
.version(pkg.version)
.option('--no-daemons', 'Disable daemon processes (for debbuging)')
.option('--disable-clustering', 'Disable clustering')
.option('--disable-queue', 'Disable job queue')
.option('--disable-queue', 'Disable job queue processing')
.option('--only-queue', 'Pocessing job queue only')
.option('--quiet', 'Suppress all logs')
.option('--verbose', 'Enable all logs')
.option('--slow', 'Delay all requests (for debbuging)')
.option('--color', 'This option is a dummy for some external program\'s (e.g. forever) issue.')
.parse(process.argv);
if (process.env.MK_DISABLE_QUEUE) program.disableQueue = true;
if (process.env.MK_ONLY_QUEUE) program.onlyQueue = true;
export { program };

View File

@ -35,6 +35,11 @@ const ev = new Xev();
function main() {
process.title = `Misskey (${cluster.isMaster ? 'master' : 'worker'})`;
if (program.onlyQueue) {
queueMain();
return;
}
if (cluster.isMaster || program.disableClustering) {
masterMain();
@ -53,12 +58,7 @@ function main() {
}
}
/**
* Init master process
*/
async function masterMain() {
let config: Config;
function greet() {
if (!program.quiet) {
//#region Misskey logo
const v = `v${pkg.version}`;
@ -75,10 +75,34 @@ async function masterMain() {
bootLogger.info('Welcome to Misskey!');
bootLogger.info(`Misskey v${pkg.version}`, true);
bootLogger.info('Misskey is maintained by @syuilo, @AyaMorisawa, @mei23, and @acid-chicken.');
}
/**
* Init master process
*/
async function masterMain() {
greet();
let config: Config;
try {
// initialize app
config = await init();
if (config.port == null) {
bootLogger.error('The port is not configured. Please configure port.', true);
process.exit(1);
}
if (process.platform === 'linux' && isWellKnownPort(config.port) && !isRoot()) {
bootLogger.error('You need root privileges to listen on well-known port on Linux', true);
process.exit(1);
}
if (!await isPortAvailable(config.port)) {
bootLogger.error(`Port ${config.port} is already in use`, true);
process.exit(1);
}
} catch (e) {
bootLogger.error('Fatal error occurred during initialization', true);
process.exit(1);
@ -90,6 +114,9 @@ async function masterMain() {
await spawnWorkers(config.clusterLimit);
}
// start queue
require('./queue').default();
bootLogger.succ(`Now listening on port ${config.port} on ${config.url}`, true);
}
@ -100,15 +127,35 @@ async function workerMain() {
// start server
await require('./server').default();
// start processor
require('./queue').default();
if (cluster.isWorker) {
// Send a 'ready' message to parent process
process.send('ready');
}
}
async function queueMain() {
greet();
try {
// initialize app
await init();
} catch (e) {
bootLogger.error('Fatal error occurred during initialization', true);
process.exit(1);
}
bootLogger.succ('Misskey initialized');
// start processor
const queue = require('./queue').default();
if (queue) {
bootLogger.succ('Queue started', true);
} else {
bootLogger.error('Queue not available');
}
}
const runningNodejsVersion = process.version.slice(1).split('.').map(x => parseInt(x, 10));
const requiredNodejsVersion = [10, 0, 0];
const satisfyNodejsVersion = !lessThan(runningNodejsVersion, requiredNodejsVersion);
@ -170,21 +217,6 @@ async function init(): Promise<Config> {
configLogger.succ('Loaded');
if (config.port == null) {
bootLogger.error('The port is not configured. Please configure port.', true);
process.exit(1);
}
if (process.platform === 'linux' && isWellKnownPort(config.port) && !isRoot()) {
bootLogger.error('You need root privileges to listen on well-known port on Linux', true);
process.exit(1);
}
if (!await isPortAvailable(config.port)) {
bootLogger.error(`Port ${config.port} is already in use`, true);
process.exit(1);
}
// Try to connect to MongoDB
try {
await checkMongoDB(config, bootLogger);

View File

@ -4,13 +4,15 @@ import config from '../config';
import { ILocalUser } from '../models/user';
import { program } from '../argv';
import handler from './processors';
import { queueLogger } from './logger';
const enableQueue = config.redis != null && !program.disableQueue;
const enableQueue = !program.disableQueue;
const queueAvailable = config.redis != null;
const queue = initializeQueue();
function initializeQueue() {
if (enableQueue) {
if (queueAvailable) {
return new Queue('misskey', {
redis: {
port: config.redis.port,
@ -30,7 +32,7 @@ function initializeQueue() {
}
export function createHttpJob(data: any) {
if (enableQueue) {
if (queueAvailable) {
return queue.createJob(data)
.retries(4)
.backoff('exponential', 16384) // 16s
@ -52,7 +54,7 @@ export function deliver(user: ILocalUser, content: any, to: any) {
}
export function createExportNotesJob(user: ILocalUser) {
if (!enableQueue) throw 'queue disabled';
if (!queueAvailable) throw 'queue unavailable';
return queue.createJob({
type: 'exportNotes',
@ -62,7 +64,10 @@ export function createExportNotesJob(user: ILocalUser) {
}
export default function() {
if (enableQueue) {
if (queueAvailable && enableQueue) {
queue.process(128, handler);
queueLogger.succ('Processing started');
}
return queue;
}