/**
 * Chart engine
 *
 * Tests located in test/chart
 */

import * as moment from 'moment';
import * as nestedProperty from 'nested-property';
import autobind from 'autobind-decorator';
import Logger from '../logger';
import { Schema } from '../../misc/schema';
import { EntitySchema, getRepository, Repository, LessThan, MoreThanOrEqual } from 'typeorm';
import { isDuplicateKeyValueError } from '../../misc/is-duplicate-key-value-error';

const logger = new Logger('chart', 'white', process.env.NODE_ENV !== 'test');

const utc = moment.utc;

export type Obj = { [key: string]: any };

export type DeepPartial<T> = {
	[P in keyof T]?: DeepPartial<T[P]>;
};

type ArrayValue<T> = {
	[P in keyof T]: T[P] extends number ? T[P][] : ArrayValue<T[P]>;
};

type Span = 'day' | 'hour';

type Log = {
	id: number;

	/**
	 * Aggregation group
	 */
	group: string | null;

	/**
	 * Unix timestamp (seconds) of the aggregation date/time
	 */
	date: number;

	/**
	 * Aggregation span
	 */
	span: Span;

	/**
	 * Used for unique increments
	 */
	unique?: Record<string, any>;
};

/**
 * Converts a camelCase identifier to snake_case by prefixing every
 * upper-case ASCII letter with `_` and lower-casing it.
 */
const camelToSnake = (str: string) => {
	return str.replace(/([A-Z])/g, s => '_' + s.charAt(0).toLowerCase());
};

/**
 * Base class that manages the storage and retrieval of a chart.
 */
export default abstract class Chart<T extends Record<string, any>> {
	/** Prefix put in front of every flattened data column name. */
	private static readonly columnPrefix = '___';
	/** Separator used between path segments in a flattened column name. */
	private static readonly columnDot = '_';

	private name: string;
	public schema: Schema;
	protected repository: Repository<Log>;

	/** Builds the initial data of a new log from the most recent log's data. */
	protected abstract genNewLog(latest: T): DeepPartial<T>;
	protected abstract fetchActual(group?: string): Promise<DeepPartial<T>>;

	/**
	 * Walks the schema and returns one `bigint` column definition per leaf
	 * property, keyed by the flattened column name (`___a_b_c`).
	 */
	@autobind
	private static convertSchemaToFlatColumnDefinitions(schema: Schema) {
		const columns = {} as any;
		const flatColumns = (x: Obj, path?: string) => {
			for (const [k, v] of Object.entries(x)) {
				const p = path ? `${path}${Chart.columnDot}${k}` : k;
				if (v.type === 'object') {
					flatColumns(v.properties, p);
				} else {
					columns[Chart.columnPrefix + p] = {
						type: 'bigint',
					};
				}
			}
		};
		flatColumns(schema.properties!);
		return columns;
	}

	/**
	 * Rebuilds a nested object from a row: every key starting with the column
	 * prefix is turned back into a dotted path and set on the result. Keys
	 * without the prefix (id, date, ...) are ignored.
	 */
	@autobind
	private static convertFlattenColumnsToObject(x: Record<string, number>) {
		const obj = {} as any;
		for (const k of Object.keys(x).filter(k => k.startsWith(Chart.columnPrefix))) {
			// now k is ___x_y_z
			const path = k.slice(Chart.columnPrefix.length).split(Chart.columnDot).join('.');
			nestedProperty.set(obj, path, x[k]);
		}
		return obj;
	}

	/**
	 * Flattens a nested object into `{ ___a_b_c: value }` form.
	 * Nested objects are recursed into; every other value (including `null`)
	 * is stored as-is under its flattened column name.
	 */
	@autobind
	private static convertObjectToFlattenColumns(x: Record<string, any>) {
		const columns = {} as Record<string, number>;
		const flatten = (x: Obj, path?: string) => {
			for (const [k, v] of Object.entries(x)) {
				const p = path ? `${path}${Chart.columnDot}${k}` : k;
				// typeof null is 'object' too; recursing into it would throw
				if (v !== null && typeof v === 'object') {
					flatten(v, p);
				} else {
					columns[Chart.columnPrefix + p] = v;
				}
			}
		};
		flatten(x);
		return columns;
	}

	/**
	 * Turns an increment object into a map of column name to SQL expression
	 * factory. Positive values become `"col" + v`, negative values become
	 * `"col" - |v|`, and zero / non-numeric values are omitted.
	 */
	@autobind
	private static convertQuery(x: Record<string, any>) {
		const query: Record<string, Function> = {};

		const columns = Chart.convertObjectToFlattenColumns(x);

		for (const [k, v] of Object.entries(columns)) {
			if (v > 0) query[k] = () => `"${k}" + ${v}`;
			// Use the magnitude: `"col" - -5` would add instead of subtract
			if (v < 0) query[k] = () => `"${k}" - ${Math.abs(v)}`;
		}

		return query;
	}

	@autobind
	private static momentToTimestamp(x: moment.Moment): Log['date'] {
		return x.unix();
	}

	/**
	 * Returns the start of the period that lies `ago` periods before the
	 * period containing the given (UTC) date parts.
	 */
	private static periodStart(span: Span, [y, m, d, h]: [number, number, number, number], ago = 0): moment.Moment {
		switch (span) {
			case 'day': return utc([y, m, d]).subtract(ago, 'days');
			case 'hour': return utc([y, m, d, h]).subtract(ago, 'hours');
			default: throw new Error(`unknown span: ${span}`);
		}
	}

	/**
	 * Builds the entity schema (table `__chart__<snake_case name>`) for a chart:
	 * the fixed bookkeeping columns plus one flattened column per schema leaf.
	 */
	@autobind
	public static schemaToEntity(name: string, schema: Schema): EntitySchema {
		return new EntitySchema({
			name: `__chart__${camelToSnake(name)}`,
			columns: {
				id: {
					type: 'integer',
					primary: true,
					generated: true
				},
				date: {
					type: 'integer',
				},
				group: {
					type: 'varchar',
					length: 128,
					nullable: true
				},
				span: {
					type: 'enum',
					enum: ['hour', 'day']
				},
				unique: {
					type: 'jsonb',
					default: {}
				},
				...Chart.convertSchemaToFlatColumnDefinitions(schema)
			},
		});
	}

	/**
	 * @param name chart name, used for the table name and log messages
	 * @param schema schema describing the chart's data shape
	 * @param grouped when true, `group` is part of the unique constraint
	 */
	constructor(name: string, schema: Schema, grouped = false) {
		this.name = name;
		this.schema = schema;
		const entity = Chart.schemaToEntity(name, schema);

		const keys = ['span', 'date'];
		if (grouped) keys.push('group');

		entity.options.uniques = [{
			columns: keys
		}];

		this.repository = getRepository(entity);
	}

	/**
	 * Creates the data for a new log. Starts from `genNewLog(latest)` (or an
	 * empty object when there is no previous log) and sets every schema leaf
	 * that is still null/undefined to 0.
	 */
	@autobind
	private getNewLog(latest: T | null): T {
		const log = latest ? this.genNewLog(latest) : {};
		const fillMissing = (x: Obj, path?: string) => {
			for (const [k, v] of Object.entries(x)) {
				const p = path ? `${path}.${k}` : k;
				if (v.type === 'object') {
					fillMissing(v.properties, p);
				} else if (nestedProperty.get(log, p) == null) {
					nestedProperty.set(log, p, 0);
				}
			}
		};
		fillMissing(this.schema.properties!);
		return log as T;
	}

	/** Returns the current UTC `[year, month, date, hour]` (month as reported by moment). */
	@autobind
	private getCurrentDate(): [number, number, number, number] {
		const now = moment().utc();

		const y = now.year();
		const m = now.month();
		const d = now.date();
		const h = now.hour();

		return [y, m, d, h];
	}

	/** Returns the log with the greatest `date` for the span/group, or null if none exists. */
	@autobind
	private getLatestLog(span: Span, group: string | null = null): Promise<Log | null> {
		return this.repository.findOne({
			group: group,
			span: span
		}, {
			order: {
				date: -1
			}
		}).then(x => x || null);
	}

	/**
	 * Returns the log of the current period (today / this hour), creating it
	 * from the most recent log when it does not exist yet.
	 */
	@autobind
	private async getCurrentLog(span: Span, group: string | null = null): Promise<Log> {
		const current = Chart.periodStart(span, this.getCurrentDate());

		// Log of the current period (today, or the current hour)
		const currentLog = await this.repository.findOne({
			span: span,
			date: Chart.momentToTimestamp(current),
			// NOTE(review): a null group adds no group condition here, unlike the other queries — confirm intended
			...(group ? { group: group } : {})
		});

		// If it exists we are done
		if (currentLog != null) {
			return currentLog;
		}

		let data: T;

		// This is the first chart update since the period changed, so fetch
		// the most recent log.
		// * With a span of "day", for example, nothing may have happened
		// * yesterday that updated the chart, in which case no log was created
		// * for it at all. So we take "the most recent" log rather than
		// * assuming "yesterday's".
		const latest = await this.getLatestLog(span, group);

		if (latest != null) {
			const obj = Chart.convertFlattenColumnsToObject(
				latest as Record<string, any>);

			// Create the data for an empty log
			data = this.getNewLog(obj);
		} else {
			// No log exists at all
			// (first chart update after the instance was set up)

			// Create the initial log data
			data = this.getNewLog(null);

			logger.info(`${this.name}: Initial commit created`);
		}

		try {
			// Insert the new log
			return await this.repository.save({
				group: group,
				span: span,
				date: Chart.momentToTimestamp(current),
				...Chart.convertObjectToFlattenColumns(data)
			});
		} catch (e) {
			// duplicate key error
			// Another chart engine process running in parallel may have
			// inserted the same log; in that case fetch the newest log again.
			if (isDuplicateKeyValueError(e)) {
				return await this.getLatestLog(span, group) as Log;
			}

			logger.error(e);
			throw e;
		}
	}

	/**
	 * Applies the given column updates to both the current 'day' and the
	 * current 'hour' log.
	 *
	 * When `uniqueKey` is given, a log whose `unique[uniqueKey]` already
	 * contains `uniqueValue` is left untouched; otherwise the value is
	 * appended to that list as part of the same update.
	 *
	 * @param query column name to SQL expression factory (see convertQuery)
	 * @param group aggregation group, or null
	 * @param uniqueKey key inside the `unique` jsonb column
	 * @param uniqueValue value recorded under `uniqueKey`
	 */
	@autobind
	protected commit(query: Record<string, Function>, group: string | null = null, uniqueKey?: string, uniqueValue?: string): Promise<any> {
		const update = async (log: Log) => {
			// For a unique increment, bail out if the value is already recorded under the key
			if (
				uniqueKey && log.unique &&
				log.unique[uniqueKey] &&
				log.unique[uniqueKey].includes(uniqueValue)
			) return;

			// Work on a copy: the day and hour updates run concurrently and must
			// not see each other's `unique` expression, nor mutate the caller's object.
			const values: Record<string, Function> = { ...query };

			// Append the value to the unique-increment key
			// NOTE(review): uniqueKey / uniqueValue are interpolated directly into SQL.
			// Callers must only pass trusted values; consider parameterizing.
			if (uniqueKey && log.unique) {
				if (log.unique[uniqueKey]) {
					const sql = `jsonb_set("unique", '{${uniqueKey}}', ("unique"->>'${uniqueKey}')::jsonb || '["${uniqueValue}"]'::jsonb)`;
					values['unique'] = () => sql;
				} else {
					const sql = `jsonb_set("unique", '{${uniqueKey}}', '["${uniqueValue}"]')`;
					values['unique'] = () => sql;
				}
			}

			// Nothing to write (e.g. an all-zero increment)
			if (Object.keys(values).length === 0) return;

			// Update the log
			await this.repository.createQueryBuilder()
				.update()
				.set(values)
				.where('id = :id', { id: log.id })
				.execute();
		};

		return Promise.all([
			this.getCurrentLog('day', group).then(log => update(log)),
			this.getCurrentLog('hour', group).then(log => update(log)),
		]);
	}

	/** Adds the given (possibly negative) amounts to the current logs. */
	@autobind
	protected async inc(inc: DeepPartial<T>, group: string | null = null): Promise<void> {
		await this.commit(Chart.convertQuery(inc as any), group);
	}

	/** Like `inc`, but skipped for logs that already recorded `value` under `key`. */
	@autobind
	protected async incIfUnique(inc: DeepPartial<T>, key: string, value: string, group: string | null = null): Promise<void> {
		await this.commit(Chart.convertQuery(inc as any), group, key, value);
	}

	/**
	 * Returns the chart data of the last `range` periods, newest first, as an
	 * object of arrays (one array per schema leaf). Periods without a log are
	 * filled from the closest older log via `getNewLog`.
	 *
	 * @param span aggregation span
	 * @param range number of periods to return; for `range <= 0` an empty object is returned
	 * @param group aggregation group, or null
	 */
	@autobind
	public async getChart(span: Span, range: number, group: string | null = null): Promise<ArrayValue<T>> {
		const now = this.getCurrentDate();

		const gt = Chart.periodStart(span, now, range);

		// Fetch the logs
		let logs = await this.repository.find({
			where: {
				group: group,
				span: span,
				date: MoreThanOrEqual(Chart.momentToTimestamp(gt))
			},
			order: {
				date: -1
			},
		});

		// There is not a single log in the requested range
		if (logs.length === 0) {
			// Fetch the most recent log
			// (gaps cannot be filled without at least one log)
			const recentLog = await this.repository.findOne({
				group: group,
				span: span
			}, {
				order: {
					date: -1
				},
			});

			if (recentLog) {
				logs = [recentLog];
			}

		// There is no log at the oldest position of the requested range
		} else if (!utc(logs[logs.length - 1].date * 1000).isSame(gt)) {
			// Fetch the newest log as of the oldest position of the requested
			// range and append it (otherwise the gap could not be filled)
			const outdatedLog = await this.repository.findOne({
				group: group,
				span: span,
				date: LessThan(Chart.momentToTimestamp(gt))
			}, {
				order: {
					date: -1
				},
			});

			if (outdatedLog) {
				logs.push(outdatedLog);
			}
		}

		const chart: T[] = [];

		// Shape the result: iterate oldest to newest, unshifting so that the
		// newest period ends up first
		for (let i = (range - 1); i >= 0; i--) {
			const current = Chart.periodStart(span, now, i);

			const log = logs.find(l => utc(l.date * 1000).isSame(current));

			if (log) {
				const data = Chart.convertFlattenColumnsToObject(log as Record<string, any>);
				chart.unshift(data);
			} else {
				// Fill the gap (logs are ordered newest first, so this is the closest older log)
				const latest = logs.find(l => utc(l.date * 1000).isBefore(current));
				const data = latest ? Chart.convertFlattenColumnsToObject(latest as Record<string, any>) : null;
				chart.unshift(this.getNewLog(data));
			}
		}

		const res: ArrayValue<T> = {} as any;

		/**
		 * Turns
		 * [{ foo: 1, bar: 5 }, { foo: 2, bar: 6 }, { foo: 3, bar: 7 }]
		 * into
		 * { foo: [1, 2, 3], bar: [5, 6, 7] }
		 */
		const dive = (x: Obj, path?: string) => {
			for (const [k, v] of Object.entries(x)) {
				const p = path ? `${path}.${k}` : k;
				if (v !== null && typeof v === 'object') {
					dive(v, p);
				} else {
					nestedProperty.set(res, p, chart.map(s => nestedProperty.get(s, p)));
				}
			}
		};

		// With range <= 0 there is no entry to take the shape from
		if (chart.length > 0) {
			dive(chart[0]);
		}

		return res;
	}
}

/**
 * Converts a log schema into the schema of the chart response: every
 * `number` becomes an array of numbers, objects are converted recursively.
 * The input schema is not modified.
 */
export function convertLog(logSchema: Schema): Schema {
	const v: Schema = JSON.parse(JSON.stringify(logSchema)); // copy
	if (v.type === 'number') {
		v.type = 'array';
		v.items = {
			type: 'number'
		};
	} else if (v.type === 'object') {
		for (const k of Object.keys(v.properties!)) {
			v.properties![k] = convertLog(v.properties![k]);
		}
	}
	return v;
}