From 77c12cba8d4b53b206ef68d7e497fb6887246bc1 Mon Sep 17 00:00:00 2001 From: Kaity A Date: Sun, 19 Mar 2023 08:26:47 +0000 Subject: [PATCH] Add sonic full-text search support (#9714) This pull request adds support for the [sonic](https://github.com/valeriansaliou/sonic) full text indexing server into Calckey. In addition to this, a stateful endpoint has been added that will completely (re-)index all notes into any (elasticsearch and/or sonic) indexing server defined in your config at `/api/admin/search/index-all`. It can (optionally) take input data to define the starting point, such as: ``` {"cursor": "9beg3lx6ad"} ``` Currently if both sonic and elasticsearch are defined in the config, sonic will take precedence for searching, but both indexes will continue to be updated for new note creations. Future enhancements may include the ability to choose which indexer to use (or combine multiple). Co-authored-by: Kaitlyn Allan Reviewed-on: https://codeberg.org/calckey/calckey/pulls/9714 Co-authored-by: Kaity A Co-committed-by: Kaity A --- .config/example.yml | 10 +++ .gitignore | 1 + packages/backend/package.json | 1 + packages/backend/src/config/types.ts | 7 ++ packages/backend/src/db/sonic.ts | 51 ++++++++++++ packages/backend/src/queue/index.ts | 14 ++++ .../processors/background/index-all-notes.ts | 76 ++++++++++++++++++ .../src/queue/processors/background/index.ts | 15 ++++ packages/backend/src/queue/queues.ts | 2 + packages/backend/src/server/api/endpoints.ts | 2 + .../server/api/endpoints/admin/queue/stats.ts | 8 ++ .../api/endpoints/admin/search/index-all.ts | 28 +++++++ .../src/server/api/endpoints/notes/search.ts | 79 ++++++++++++++++++- packages/backend/src/services/note/create.ts | 41 +++++++--- pnpm-lock.yaml | 7 ++ 15 files changed, 328 insertions(+), 14 deletions(-) create mode 100644 packages/backend/src/db/sonic.ts create mode 100644 packages/backend/src/queue/processors/background/index-all-notes.ts create mode 100644 packages/backend/src/queue/processors/background/index.ts create mode 100644 packages/backend/src/server/api/endpoints/admin/search/index-all.ts diff --git a/.config/example.yml b/.config/example.yml index 02661b7fb..b7b56f228 100644 --- a/.config/example.yml +++ b/.config/example.yml @@ -72,6 +72,16 @@ redis: # user: # pass: +# ┌─────────────────────┐ +#───┘ Sonic configuration └───────────────────────────────────── + +#sonic: +# host: localhost +# port: 1491 +# auth: SecretPassword +# collection: notes +# bucket: default + # ┌───────────────┐ #───┘ ID generation └─────────────────────────────────────────── diff --git a/.gitignore b/.gitignore index 52139614c..5e1d4a26d 100644 --- a/.gitignore +++ b/.gitignore @@ -44,6 +44,7 @@ ormconfig.json packages/backend/assets/instance.css packages/backend/assets/sounds/None.mp3 +!packages/backend/src/db # blender backups *.blend1 diff --git a/packages/backend/package.json b/packages/backend/package.json index ce6efde9c..e7bec1a22 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -112,6 +112,7 @@ "seedrandom": "^3.0.5", "semver": "7.3.8", "sharp": "0.31.3", + "sonic-channel": "^1.3.1", "speakeasy": "2.0.0", "stringz": "2.1.0", "summaly": "2.7.0", diff --git a/packages/backend/src/config/types.ts b/packages/backend/src/config/types.ts index ed9b0ece0..a7cdc89cf 100644 --- a/packages/backend/src/config/types.ts +++ b/packages/backend/src/config/types.ts @@ -32,6 +32,13 @@ export type Source = { pass?: string; index?: string; }; + sonic: { + host: string; + port: number; + auth?: string; + collection?: string; + bucket?: string; + }; proxy?: string; proxySmtp?: string; diff --git a/packages/backend/src/db/sonic.ts b/packages/backend/src/db/sonic.ts new file mode 100644 index 000000000..6c4d28f70 --- /dev/null +++ b/packages/backend/src/db/sonic.ts @@ -0,0 +1,51 @@ +import * as SonicChannel from "sonic-channel"; +import { dbLogger } from "./logger.js"; + +import config from "@/config/index.js"; + +const logger = dbLogger.createSubLogger("sonic", "gray", false); + +logger.info("Connecting to Sonic"); + +const handlers = (type: string): SonicChannel.Handlers => ( + { + connected: () => { + logger.succ(`Connected to Sonic ${type}`); + }, + disconnected: (error) => { + logger.warn(`Disconnected from Sonic ${type}, error: ${error}`); + }, + error: (error) => { + logger.warn(`Sonic ${type} error: ${error}`); + }, + retrying: () => { + logger.info(`Sonic ${type} retrying`); + }, + timeout: () => { + logger.warn(`Sonic ${type} timeout`); + }, + } +) + +const hasConfig = + config.sonic + && ( config.sonic.host + || config.sonic.port + || config.sonic.auth + ) + +const host = hasConfig ? config.sonic.host ?? "localhost" : ""; +const port = hasConfig ? config.sonic.port ?? 1491 : 0; +const auth = hasConfig ? config.sonic.auth ?? "SecretPassword" : ""; +const collection = hasConfig ? config.sonic.collection ?? "main" : ""; +const bucket = hasConfig ? config.sonic.bucket ?? "default" : ""; + +export default hasConfig + ? { + search: new SonicChannel.Search({host, port, auth}).connect(handlers("search")), + ingest: new SonicChannel.Ingest({host, port, auth}).connect(handlers("ingest")), + + collection, + bucket, + } + : null; diff --git a/packages/backend/src/queue/index.ts b/packages/backend/src/queue/index.ts index c40b3c6ae..c387efe92 100644 --- a/packages/backend/src/queue/index.ts +++ b/packages/backend/src/queue/index.ts @@ -13,6 +13,7 @@ import processDb from "./processors/db/index.js"; import processObjectStorage from "./processors/object-storage/index.js"; import processSystemQueue from "./processors/system/index.js"; import processWebhookDeliver from "./processors/webhook-deliver.js"; +import processBackground from "./processors/background/index.js"; import { endedPollNotification } from "./processors/ended-poll-notification.js"; import { queueLogger } from "./logger.js"; import { getJobInfo } from "./get-job-info.js"; @@ -24,6 +25,7 @@ import { objectStorageQueue, endedPollNotificationQueue, webhookDeliverQueue, + backgroundQueue, } from "./queues.js"; import type { ThinUser } from "./types.js"; @@ -418,6 +420,17 @@ export function createCleanRemoteFilesJob() { ); } +export function createIndexAllNotesJob(data = {}) { + return backgroundQueue.add( + "indexAllNotes", + data, + { + removeOnComplete: true, + removeOnFail: true, + }, + ); +} + export function webhookDeliver( webhook: Webhook, type: typeof webhookEventTypes[number], @@ -454,6 +467,7 @@ export default function () { webhookDeliverQueue.process(64, processWebhookDeliver); processDb(dbQueue); processObjectStorage(objectStorageQueue); + processBackground(backgroundQueue); systemQueue.add( "tickCharts", diff --git a/packages/backend/src/queue/processors/background/index-all-notes.ts b/packages/backend/src/queue/processors/background/index-all-notes.ts new file mode 100644 index 000000000..f03275886 --- /dev/null +++ b/packages/backend/src/queue/processors/background/index-all-notes.ts @@ -0,0 +1,76 @@ +import type Bull from "bull"; + +import { queueLogger } from "../../logger.js"; +import { Notes } from "@/models/index.js"; +import { MoreThan } from "typeorm"; +import { index } from "@/services/note/create.js" +import { Note } from "@/models/entities/note.js"; + +const logger = queueLogger.createSubLogger("index-all-notes"); + +export default async function indexAllNotes( + job: Bull.Job>, + done: ()=>void, +): Promise { + logger.info("Indexing all notes..."); + + let cursor: string|null = job.data.cursor as string ?? null; + let indexedCount: number = job.data.indexedCount as number ?? 0; + let total: number = job.data.total as number ?? 0; + + let running = true; + const take = 50000; + const batch = 100; + while (running) { + logger.info(`Querying for ${take} notes ${indexedCount}/${total ? total : '?'} at ${cursor}`); + + let notes: Note[] = []; + try { + notes = await Notes.find({ + where: { + ...(cursor ? { id: MoreThan(cursor) } : {}), + }, + take: take, + order: { + id: 1, + }, + }); + } catch (e) { + logger.error(`Failed to query notes ${e}`); + continue; + } + + if (notes.length === 0) { + job.progress(100); + running = false; + break; + } + + try { + const count = await Notes.count(); + total = count; + job.update({ indexedCount, cursor, total }) + } catch (e) { + } + + for (let i = 0; i < notes.length; i += batch) { + const chunk = notes.slice(i, i + batch); + await Promise.all(chunk.map(note => index(note))); + + indexedCount += chunk.length; + const pct = (indexedCount / total)*100; + job.update({ indexedCount, cursor, total }) + job.progress(+(pct.toFixed(1))); + logger.info(`Indexed notes ${indexedCount}/${total ? total : '?'}`); + } + cursor = notes[notes.length - 1].id; + job.update({ indexedCount, cursor, total }) + + if (notes.length < take) { + running = false; + } + } + + done(); + logger.info("All notes have been indexed."); +} diff --git a/packages/backend/src/queue/processors/background/index.ts b/packages/backend/src/queue/processors/background/index.ts new file mode 100644 index 000000000..cf96b67ef --- /dev/null +++ b/packages/backend/src/queue/processors/background/index.ts @@ -0,0 +1,15 @@ +import type Bull from "bull"; +import indexAllNotes from "./index-all-notes.js"; + +const jobs = { + indexAllNotes, +} as Record< + string, + Bull.ProcessCallbackFunction> +>; + +export default function (q: Bull.Queue) { + for (const [k, v] of Object.entries(jobs)) { + q.process(k, 16, v); + } +} diff --git a/packages/backend/src/queue/queues.ts b/packages/backend/src/queue/queues.ts index 12d9d6620..6d7fffcb3 100644 --- a/packages/backend/src/queue/queues.ts +++ b/packages/backend/src/queue/queues.ts @@ -27,6 +27,7 @@ export const webhookDeliverQueue = initializeQueue( "webhookDeliver", 64, ); +export const backgroundQueue = initializeQueue>("bg"); export const queues = [ systemQueue, @@ -36,4 +37,5 @@ export const queues = [ dbQueue, objectStorageQueue, webhookDeliverQueue, + backgroundQueue, ]; diff --git a/packages/backend/src/server/api/endpoints.ts b/packages/backend/src/server/api/endpoints.ts index bfafbaa62..ba0e721b9 100644 --- a/packages/backend/src/server/api/endpoints.ts +++ b/packages/backend/src/server/api/endpoints.ts @@ -51,6 +51,7 @@ import * as ep___admin_relays_list from "./endpoints/admin/relays/list.js"; import * as ep___admin_relays_remove from "./endpoints/admin/relays/remove.js"; import * as ep___admin_resetPassword from "./endpoints/admin/reset-password.js"; import * as ep___admin_resolveAbuseUserReport from "./endpoints/admin/resolve-abuse-user-report.js"; +import * as ep___admin_search_indexAll from "./endpoints/admin/search/index-all.js"; import * as ep___admin_sendEmail from "./endpoints/admin/send-email.js"; import * as ep___admin_serverInfo from "./endpoints/admin/server-info.js"; import * as ep___admin_showModerationLogs from "./endpoints/admin/show-moderation-logs.js"; @@ -393,6 +394,7 @@ const eps = [ ["admin/relays/remove", ep___admin_relays_remove], ["admin/reset-password", ep___admin_resetPassword], ["admin/resolve-abuse-user-report", ep___admin_resolveAbuseUserReport], + ["admin/search/index-all", ep___admin_search_indexAll], ["admin/send-email", ep___admin_sendEmail], ["admin/server-info", ep___admin_serverInfo], ["admin/show-moderation-logs", ep___admin_showModerationLogs], diff --git a/packages/backend/src/server/api/endpoints/admin/queue/stats.ts b/packages/backend/src/server/api/endpoints/admin/queue/stats.ts index ecd67d893..4a437c3d1 100644 --- a/packages/backend/src/server/api/endpoints/admin/queue/stats.ts +++ b/packages/backend/src/server/api/endpoints/admin/queue/stats.ts @@ -3,6 +3,7 @@ import { inboxQueue, dbQueue, objectStorageQueue, + backgroundQueue, } from "@/queue/queues.js"; import define from "../../../define.js"; @@ -37,6 +38,11 @@ export const meta = { nullable: false, ref: "QueueCount", }, + backgroundQueue: { + optional: false, + nullable: false, + ref: "QueueCount", + }, }, }, } as const; @@ -52,11 +58,13 @@ export default define(meta, paramDef, async (ps) => { const inboxJobCounts = await inboxQueue.getJobCounts(); const dbJobCounts = await dbQueue.getJobCounts(); const objectStorageJobCounts = await objectStorageQueue.getJobCounts(); + const backgroundJobCounts = await backgroundQueue.getJobCounts(); return { deliver: deliverJobCounts, inbox: inboxJobCounts, db: dbJobCounts, objectStorage: objectStorageJobCounts, + backgroundQueue: backgroundJobCounts, }; }); diff --git a/packages/backend/src/server/api/endpoints/admin/search/index-all.ts b/packages/backend/src/server/api/endpoints/admin/search/index-all.ts new file mode 100644 index 000000000..135b48ecc --- /dev/null +++ b/packages/backend/src/server/api/endpoints/admin/search/index-all.ts @@ -0,0 +1,28 @@ +import define from "../../../define.js"; +import { createIndexAllNotesJob } from "@/queue/index.js"; + +export const meta = { + tags: ["admin"], + + requireCredential: true, + requireModerator: true, +} as const; + +export const paramDef = { + type: "object", + properties: { + cursor: { + type: "string", + format: "misskey:id", + nullable: true, + default: null, + }, + }, + required: [], +} as const; + +export default define(meta, paramDef, async (ps, _me) => { + createIndexAllNotesJob({ + cursor: ps.cursor ?? undefined, + }); +}); diff --git a/packages/backend/src/server/api/endpoints/notes/search.ts b/packages/backend/src/server/api/endpoints/notes/search.ts index ce60436db..5e431d4f7 100644 --- a/packages/backend/src/server/api/endpoints/notes/search.ts +++ b/packages/backend/src/server/api/endpoints/notes/search.ts @@ -1,7 +1,9 @@ import { In } from "typeorm"; import { Notes } from "@/models/index.js"; +import { Note } from "@/models/entities/note.js"; import config from "@/config/index.js"; import es from "../../../../db/elasticsearch.js"; +import sonic from "../../../../db/sonic.js"; import define from "../../define.js"; import { makePaginationQuery } from "../../common/make-pagination-query.js"; import { generateVisibilityQuery } from "../../common/generate-visibility-query.js"; @@ -59,7 +61,7 @@ export const paramDef = { } as const; export default define(meta, paramDef, async (ps, me) => { - if (es == null) { + if (es == null && sonic == null) { const query = makePaginationQuery( Notes.createQueryBuilder("note"), ps.sinceId, @@ -92,9 +94,82 @@ export default define(meta, paramDef, async (ps, me) => { if (me) generateMutedUserQuery(query, me); if (me) generateBlockedUserQuery(query, me); - const notes = await query.take(ps.limit).getMany(); + const notes: Note[] = await query.take(ps.limit).getMany(); return await Notes.packMany(notes, me); + } else if (sonic) { + let start = 0; + const chunkSize = 100; + + // Use sonic to fetch and step through all search results that could match the requirements + const ids = []; + while (true) { + const results = await sonic.search.query( + sonic.collection, + sonic.bucket, + ps.query, + { + limit: chunkSize, + offset: start, + }, + ); + + start += chunkSize; + + if (results.length === 0) { + break; + } + + const res = results + .map((k) => JSON.parse(k)) + .filter((key) => { + if (ps.userId && key.userId !== ps.userId) { + return false; + } + if (ps.channelId && key.channelId !== ps.channelId) { + return false; + } + if (ps.sinceId && key.id <= ps.sinceId) { + return false; + } + if (ps.untilId && key.id >= ps.untilId) { + return false; + } + return true; + }) + .map((key) => key.id); + + ids.push(...res); + } + + // Sort all the results by note id DESC (newest first) + ids.sort((a, b) => b - a); + + // Fetch the notes from the database until we have enough to satisfy the limit + start = 0; + const found = []; + while (found.length < ps.limit && start < ids.length) { + const chunk = ids.slice(start, start + chunkSize); + const notes: Note[] = await Notes.find({ + where: { + id: In(chunk), + }, + order: { + id: "DESC", + }, + }); + + // The notes are checked for visibility and muted/blocked users when packed + found.push(...await Notes.packMany(notes, me)); + start += chunkSize; + } + + // If we have more results than the limit, trim them + if (found.length > ps.limit) { + found.length = ps.limit; + } + + return found; } else { const userQuery = ps.userId != null diff --git a/packages/backend/src/services/note/create.ts b/packages/backend/src/services/note/create.ts index 968aed880..6c7fd9ad5 100644 --- a/packages/backend/src/services/note/create.ts +++ b/packages/backend/src/services/note/create.ts @@ -1,5 +1,6 @@ import * as mfm from "mfm-js"; import es from "../../db/elasticsearch.js"; +import sonic from "../../db/sonic.js"; import { publishMainStream, publishNotesStream, @@ -588,7 +589,7 @@ export default async ( } // Register to search database - index(note); + await index(note); }); async function renderNoteOrRenoteActivity(data: Option, note: Note) { @@ -728,18 +729,34 @@ async function insertNote( } } -function index(note: Note) { - if (note.text == null || config.elasticsearch == null) return; +export async function index(note: Note): Promise { + if (!note.text) return; - es!.index({ - index: config.elasticsearch.index || "misskey_note", - id: note.id.toString(), - body: { - text: normalizeForSearch(note.text), - userId: note.userId, - userHost: note.userHost, - }, - }); + if (config.elasticsearch && es) { + es.index({ + index: config.elasticsearch.index || "misskey_note", + id: note.id.toString(), + body: { + text: normalizeForSearch(note.text), + userId: note.userId, + userHost: note.userHost, + }, + }); + } + + if (sonic) { + await sonic.ingest.push( + sonic.collection, + sonic.bucket, + JSON.stringify({ + id: note.id, + userId: note.userId, + userHost: note.userHost, + channelId: note.channelId, + }), + note.text, + ); + } } async function notifyToWatchersOfRenotee( diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index de1200477..37bc6c805 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -196,6 +196,7 @@ importers: seedrandom: ^3.0.5 semver: 7.3.8 sharp: 0.31.3 + sonic-channel: ^1.3.1 speakeasy: 2.0.0 strict-event-emitter-types: 2.0.0 stringz: 2.1.0 @@ -310,6 +311,7 @@ importers: seedrandom: 3.0.5 semver: 7.3.8 sharp: 0.31.3 + sonic-channel: 1.3.1 speakeasy: 2.0.0 stringz: 2.1.0 summaly: 2.7.0 @@ -11594,6 +11596,11 @@ packages: smart-buffer: 4.2.0 dev: false + /sonic-channel/1.3.1: + resolution: {integrity: sha512-+K4IZVFE7Tf2DB4EFZ23xo7a/+gJaiOHhFzXVZpzkX6Rs/rvf4YbSxnEGdYw8mrTcjtpG+jLVQEhP8sNTtN5VA==} + engines: {node: '>= 6.0.0'} + dev: false + /sort-keys-length/1.0.1: resolution: {integrity: sha512-GRbEOUqCxemTAk/b32F2xa8wDTs+Z1QHOkbhJDQTvv/6G3ZkbJ+frYWsTcc7cBB3Fu4wy4XlLCuNtJuMn7Gsvw==} engines: {node: '>=0.10.0'}