From 878970d318827acc03eb8cdf5902bfdbefa95827 Mon Sep 17 00:00:00 2001 From: Laura Hausmann Date: Sun, 8 Oct 2023 16:36:20 +0200 Subject: [PATCH] [mastodon-client] Implement streaming API --- .../backend/src/misc/is-instance-muted.ts | 3 +- .../src/remote/activitypub/models/note.ts | 9 +- .../endpoints/admin/announcements/create.ts | 3 + .../endpoints/admin/announcements/delete.ts | 2 + .../server/api/mastodon/converters/note.ts | 9 +- .../api/mastodon/converters/notification.ts | 19 +- .../api/mastodon/endpoints/streaming.ts | 7 + .../src/server/api/mastodon/helpers/misc.ts | 2 +- .../src/server/api/mastodon/helpers/note.ts | 31 ++- .../server/api/mastodon/helpers/timeline.ts | 1 - .../backend/src/server/api/mastodon/index.ts | 11 + .../server/api/mastodon/streaming/channel.ts | 48 ++++ .../api/mastodon/streaming/channels/direct.ts | 72 +++++ .../api/mastodon/streaming/channels/list.ts | 89 ++++++ .../api/mastodon/streaming/channels/public.ts | 80 ++++++ .../api/mastodon/streaming/channels/tag.ts | 75 +++++ .../api/mastodon/streaming/channels/user.ts | 112 ++++++++ .../server/api/mastodon/streaming/index.ts | 261 ++++++++++++++++++ .../backend/src/server/api/stream/index.ts | 2 - .../backend/src/server/api/stream/types.ts | 12 + packages/backend/src/server/api/streaming.ts | 22 +- packages/backend/src/services/note/delete.ts | 4 +- packages/backend/src/services/note/edit.ts | 4 +- packages/backend/src/services/stream.ts | 16 +- 24 files changed, 865 insertions(+), 29 deletions(-) create mode 100644 packages/backend/src/server/api/mastodon/endpoints/streaming.ts create mode 100644 packages/backend/src/server/api/mastodon/streaming/channel.ts create mode 100644 packages/backend/src/server/api/mastodon/streaming/channels/direct.ts create mode 100644 packages/backend/src/server/api/mastodon/streaming/channels/list.ts create mode 100644 packages/backend/src/server/api/mastodon/streaming/channels/public.ts create mode 100644 packages/backend/src/server/api/mastodon/streaming/channels/tag.ts create mode 100644 packages/backend/src/server/api/mastodon/streaming/channels/user.ts create mode 100644 packages/backend/src/server/api/mastodon/streaming/index.ts diff --git a/packages/backend/src/misc/is-instance-muted.ts b/packages/backend/src/misc/is-instance-muted.ts index 1547d4555..da949e6de 100644 --- a/packages/backend/src/misc/is-instance-muted.ts +++ b/packages/backend/src/misc/is-instance-muted.ts @@ -1,7 +1,8 @@ import type { Packed } from "./schema.js"; +import { Note } from "@/models/entities/note.js"; export function isInstanceMuted( - note: Packed<"Note">, + note: Packed<"Note"> | Note, mutedInstances: Set, ): boolean { if (mutedInstances.has(note?.user?.host ?? "")) return true; diff --git a/packages/backend/src/remote/activitypub/models/note.ts b/packages/backend/src/remote/activitypub/models/note.ts index 1cb348973..bd2e1b73c 100644 --- a/packages/backend/src/remote/activitypub/models/note.ts +++ b/packages/backend/src/remote/activitypub/models/note.ts @@ -45,7 +45,7 @@ import { extractApMentions } from "./mention.js"; import DbResolver from "../db-resolver.js"; import { StatusError } from "@/misc/fetch.js"; import { shouldBlockInstance } from "@/misc/should-block-instance.js"; -import { publishNoteStream } from "@/services/stream.js"; +import { publishNoteStream, publishNoteUpdatesStream } from "@/services/stream.js"; import { extractHashtags } from "@/misc/extract-hashtags.js"; import { UserProfiles } from "@/models/index.js"; import { In } from "typeorm"; @@ -760,5 +760,12 @@ export async function updateNote(value: string | IObject, resolver?: Resolver) { publishNoteStream(note.id, "updated", { updatedAt: update.updatedAt, }); + + const updatedNote = { + ...note, + ...update + }; + + publishNoteUpdatesStream("updated", updatedNote); } } diff --git a/packages/backend/src/server/api/endpoints/admin/announcements/create.ts b/packages/backend/src/server/api/endpoints/admin/announcements/create.ts index 754cc6c89..13e2f5dc9 100644 --- a/packages/backend/src/server/api/endpoints/admin/announcements/create.ts +++ b/packages/backend/src/server/api/endpoints/admin/announcements/create.ts @@ -1,6 +1,7 @@ import define from "../../../define.js"; import { Announcements } from "@/models/index.js"; import { genId } from "@/misc/gen-id.js"; +import { publishBroadcastStream } from "@/services/stream.js"; export const meta = { tags: ["admin"], @@ -85,6 +86,8 @@ export default define(meta, paramDef, async (ps) => { isGoodNews: ps.isGoodNews ?? false, }).then((x) => Announcements.findOneByOrFail(x.identifiers[0])); + publishBroadcastStream("announcementAdded", announcement); + return Object.assign({}, announcement, { createdAt: announcement.createdAt.toISOString(), updatedAt: null, diff --git a/packages/backend/src/server/api/endpoints/admin/announcements/delete.ts b/packages/backend/src/server/api/endpoints/admin/announcements/delete.ts index 5665b94a7..e4f26ddb9 100644 --- a/packages/backend/src/server/api/endpoints/admin/announcements/delete.ts +++ b/packages/backend/src/server/api/endpoints/admin/announcements/delete.ts @@ -1,6 +1,7 @@ import define from "../../../define.js"; import { Announcements } from "@/models/index.js"; import { ApiError } from "../../../error.js"; +import { publishBroadcastStream } from "@/services/stream.js"; export const meta = { tags: ["admin"], @@ -30,5 +31,6 @@ export default define(meta, paramDef, async (ps, me) => { if (announcement == null) throw new ApiError(meta.errors.noSuchAnnouncement); + publishBroadcastStream("announcementDeleted", announcement.id); await Announcements.delete(announcement.id); }); diff --git a/packages/backend/src/server/api/mastodon/converters/note.ts b/packages/backend/src/server/api/mastodon/converters/note.ts index 111ee33ee..8e65c9919 100644 --- a/packages/backend/src/server/api/mastodon/converters/note.ts +++ b/packages/backend/src/server/api/mastodon/converters/note.ts @@ -18,7 +18,8 @@ import { awaitAll } from "@/prelude/await-all.js"; import { UserHelpers } from "@/server/api/mastodon/helpers/user.js"; import { IsNull } from "typeorm"; import { MfmHelpers } from "@/server/api/mastodon/helpers/mfm.js"; -import { MastoContext } from "@/server/api/mastodon/index.js"; +import { getStubMastoContext, MastoContext } from "@/server/api/mastodon/index.js"; +import { NoteHelpers } from "@/server/api/mastodon/helpers/note.js"; export class NoteConverter { public static async encode(note: Note, ctx: MastoContext, recurse: boolean = true): Promise { @@ -164,4 +165,10 @@ export class NoteConverter { }; }); } + + public static async encodeEvent(note: Note, user: ILocalUser | undefined): Promise { + const ctx = getStubMastoContext(user); + NoteHelpers.fixupEventNote(note); + return NoteConverter.encode(note, ctx); + } } diff --git a/packages/backend/src/server/api/mastodon/converters/notification.ts b/packages/backend/src/server/api/mastodon/converters/notification.ts index 58c99050d..dbfde26a2 100644 --- a/packages/backend/src/server/api/mastodon/converters/notification.ts +++ b/packages/backend/src/server/api/mastodon/converters/notification.ts @@ -6,7 +6,8 @@ import { UserHelpers } from "@/server/api/mastodon/helpers/user.js"; import { awaitAll } from "@/prelude/await-all.js"; import { NoteConverter } from "@/server/api/mastodon/converters/note.js"; import { getNote } from "@/server/api/common/getters.js"; -import { MastoContext } from "@/server/api/mastodon/index.js"; +import { getStubMastoContext, MastoContext } from "@/server/api/mastodon/index.js"; +import { Notifications } from "@/models/index.js"; type NotificationType = typeof notificationTypes[number]; @@ -26,11 +27,13 @@ export class NotificationConverter { type: this.encodeNotificationType(notification.type), }; - if (notification.note) { - const isPureRenote = notification.note.renoteId !== null && notification.note.text === null; + const note = notification.note ?? (notification.noteId ? await getNote(notification.noteId, localUser) : null); + + if (note) { + const isPureRenote = note.renoteId !== null && note.text === null; const encodedNote = isPureRenote - ? getNote(notification.note.renoteId!, localUser).then(note => NoteConverter.encode(note, ctx)) - : NoteConverter.encode(notification.note, ctx); + ? getNote(note.renoteId!, localUser).then(note => NoteConverter.encode(note, ctx)) + : NoteConverter.encode(note, ctx); result = Object.assign(result, { status: encodedNote, }); @@ -78,4 +81,10 @@ export class NotificationConverter { throw new Error(`Notification type ${t} not supported`); } } + + public static async encodeEvent(target: Notification["id"], user: ILocalUser): Promise { + const ctx = getStubMastoContext(user); + const notification = await Notifications.findOneByOrFail({ id: target }); + return this.encode(notification, ctx).catch(_ => null); + } } diff --git a/packages/backend/src/server/api/mastodon/endpoints/streaming.ts b/packages/backend/src/server/api/mastodon/endpoints/streaming.ts new file mode 100644 index 000000000..df378bf5c --- /dev/null +++ b/packages/backend/src/server/api/mastodon/endpoints/streaming.ts @@ -0,0 +1,7 @@ +import Router from "@koa/router"; + +export function setupEndpointsStreaming(router: Router): void { + router.get("/v1/streaming/health", async (ctx) => { + ctx.body = "OK"; + }); +} \ No newline at end of file diff --git a/packages/backend/src/server/api/mastodon/helpers/misc.ts b/packages/backend/src/server/api/mastodon/helpers/misc.ts index 5ff385c8f..f03b108b0 100644 --- a/packages/backend/src/server/api/mastodon/helpers/misc.ts +++ b/packages/backend/src/server/api/mastodon/helpers/misc.ts @@ -48,7 +48,7 @@ export class MiscHelpers { email: meta.maintainerEmail || "", version: `4.1.0 (compatible; Iceshrimp ${config.version})`, urls: { - streaming_api: `${config.url.replace(/^http(?=s?:\/\/)/, "ws")}/streaming`, + streaming_api: `${config.url.replace(/^http(?=s?:\/\/)/, "ws")}/mastodon`, }, stats: awaitAll({ user_count: userCount, diff --git a/packages/backend/src/server/api/mastodon/helpers/note.ts b/packages/backend/src/server/api/mastodon/helpers/note.ts index cf33c866f..42bc7b6da 100644 --- a/packages/backend/src/server/api/mastodon/helpers/note.ts +++ b/packages/backend/src/server/api/mastodon/helpers/note.ts @@ -23,13 +23,13 @@ import { VisibilityConverter } from "@/server/api/mastodon/converters/visibility import mfm from "mfm-js"; import { FileConverter } from "@/server/api/mastodon/converters/file.js"; import { MfmHelpers } from "@/server/api/mastodon/helpers/mfm.js"; -import { toArray } from "@/prelude/array.js"; +import { toArray, unique } from "@/prelude/array.js"; import { MastoApiError } from "@/server/api/mastodon/middleware/catch-errors.js"; import { Cache } from "@/misc/cache.js"; import AsyncLock from "async-lock"; import { IdentifiableError } from "@/misc/identifiable-error.js"; import { IsNull } from "typeorm"; -import { MastoContext } from "@/server/api/mastodon/index.js"; +import { getStubMastoContext, MastoContext } from "@/server/api/mastodon/index.js"; export class NoteHelpers { public static postIdempotencyCache = new Cache<{ status?: MastodonEntity.Status }>('postIdempotencyCache', 60 * 60); @@ -412,6 +412,33 @@ export class NoteHelpers { }); } + public static async getConversationFromEvent(noteId: string, user: ILocalUser): Promise { + const ctx = getStubMastoContext(user); + const note = await getNote(noteId, ctx.user); + const conversationId = note.threadId ?? note.id; + const userIds = unique([note.userId].concat(note.visibleUserIds).filter(p => p != ctx.user.id)); + const users = userIds.map(id => UserHelpers.getUserCached(id, ctx).catch(_ => null)); + const accounts = Promise.all(users).then(u => UserConverter.encodeMany(u.filter(u => u) as User[], ctx)); + const res = { + id: conversationId, + accounts: accounts.then(u => u.length > 0 ? u : UserConverter.encodeMany([ctx.user], ctx)), // failsafe to prevent apps from crashing case when all participant users have been deleted + last_status: NoteConverter.encode(note, ctx), + unread: true + }; + + return awaitAll(res); + } + + public static fixupEventNote(note: Note): Note { + note.createdAt = note.createdAt ? new Date(note.createdAt) : note.createdAt; + note.updatedAt = note.updatedAt ? new Date(note.updatedAt) : note.updatedAt; + note.reply = null; + note.renote = null; + note.user = null; + + return note; + } + public static getIdempotencyKey(ctx: MastoContext): string | null { const headers = ctx.headers; const user = ctx.user as ILocalUser; diff --git a/packages/backend/src/server/api/mastodon/helpers/timeline.ts b/packages/backend/src/server/api/mastodon/helpers/timeline.ts index 01c9262fb..a406ca798 100644 --- a/packages/backend/src/server/api/mastodon/helpers/timeline.ts +++ b/packages/backend/src/server/api/mastodon/helpers/timeline.ts @@ -52,7 +52,6 @@ export class TimelineHelpers { generateMutedUserRenotesQueryForNotes(query, user); query.andWhere("note.visibility != 'hidden'"); - query.andWhere("note.visibility != 'specified'"); return PaginationHelpers.execQueryLinkPagination(query, limit, minId !== undefined, ctx); } diff --git a/packages/backend/src/server/api/mastodon/index.ts b/packages/backend/src/server/api/mastodon/index.ts index 80227094b..46993038e 100644 --- a/packages/backend/src/server/api/mastodon/index.ts +++ b/packages/backend/src/server/api/mastodon/index.ts @@ -18,6 +18,9 @@ import { KoaBodyMiddleware } from "@/server/api/mastodon/middleware/koa-body.js" import { NormalizeQueryMiddleware } from "@/server/api/mastodon/middleware/normalize-query.js"; import { PaginationMiddleware } from "@/server/api/mastodon/middleware/pagination.js"; import { SetHeadersMiddleware } from "@/server/api/mastodon/middleware/set-headers.js"; +import { UserHelpers } from "@/server/api/mastodon/helpers/user.js"; +import { ILocalUser } from "@/models/entities/user.js"; +import { setupEndpointsStreaming } from "@/server/api/mastodon/endpoints/streaming.js"; export const logger = apiLogger.createSubLogger("mastodon"); export type MastoContext = RouterContext & DefaultContext; @@ -30,6 +33,7 @@ export function setupMastodonApi(router: Router): void { setupEndpointsFilter(router); setupEndpointsTimeline(router); setupEndpointsNotifications(router); + setupEndpointsStreaming(router); setupEndpointsSearch(router); setupEndpointsMedia(router); setupEndpointsList(router); @@ -45,3 +49,10 @@ function setupMiddleware(router: Router): void { router.use(AuthMiddleware); router.use(CacheMiddleware); } + +export function getStubMastoContext(user: ILocalUser | null | undefined): any { + return { + user: user ?? null, + cache: UserHelpers.getFreshAccountCache() + }; +} \ No newline at end of file diff --git a/packages/backend/src/server/api/mastodon/streaming/channel.ts b/packages/backend/src/server/api/mastodon/streaming/channel.ts new file mode 100644 index 000000000..759533e3c --- /dev/null +++ b/packages/backend/src/server/api/mastodon/streaming/channel.ts @@ -0,0 +1,48 @@ +import { MastodonStreamingConnection } from "."; + +export abstract class MastodonStream { + protected connection: MastodonStreamingConnection; + public readonly chName: string; + public static readonly shouldShare: boolean; + public static readonly requireCredential: boolean; + public static readonly requiredScopes: string[] = []; + + protected get user() { + return this.connection.user; + } + + protected get userProfile() { + return this.connection.userProfile; + } + + protected get following() { + return this.connection.following; + } + + protected get muting() { + return this.connection.muting; + } + + protected get renoteMuting() { + return this.connection.renoteMuting; + } + + protected get blocking() { + return this.connection.blocking; + } + + protected get subscriber() { + return this.connection.subscriber; + } + + protected constructor(connection: MastodonStreamingConnection, name: string) { + this.chName = name; + this.connection = connection; + } + + public abstract init(params: any): void; + + public dispose?(): void; + + public onMessage?(type: string, body: any): void; +} diff --git a/packages/backend/src/server/api/mastodon/streaming/channels/direct.ts b/packages/backend/src/server/api/mastodon/streaming/channels/direct.ts new file mode 100644 index 000000000..70f29f0b3 --- /dev/null +++ b/packages/backend/src/server/api/mastodon/streaming/channels/direct.ts @@ -0,0 +1,72 @@ +import { MastodonStream } from "../channel.js"; +import { Note } from "@/models/entities/note.js"; +import { NoteConverter } from "@/server/api/mastodon/converters/note.js"; +import { StreamMessages } from "@/server/api/stream/types.js"; +import { NoteHelpers } from "@/server/api/mastodon/helpers/note.js"; +import { Packed } from "@/misc/schema.js"; + +export class MastodonStreamDirect extends MastodonStream { + public static shouldShare = true; + public static requireCredential = true; + public static requiredScopes = ['read:statuses']; + + constructor(connection: MastodonStream["connection"], name: string) { + super(connection, name); + this.onNote = this.onNote.bind(this); + this.onNoteEvent = this.onNoteEvent.bind(this); + } + + override get user() { + return this.connection.user!; + } + + public async init() { + this.subscriber.on("notesStream", this.onNote); + this.subscriber.on("noteUpdatesStream", this.onNoteEvent); + } + + private async onNote(note: Note) { + if (!this.shouldProcessNote(note)) return; + + NoteConverter.encodeEvent(note, this.user).then(encoded => { + this.connection.send(this.chName, "update", encoded); + }); + + NoteHelpers.getConversationFromEvent(note.id, this.user).then(conversation => { + this.connection.send(this.chName, "conversation", conversation); + }); + } + + private async onNoteEvent(data: StreamMessages["noteUpdates"]["payload"]) { + const note = data.body; + if (!this.shouldProcessNote(note)) return; + + NoteHelpers.getConversationFromEvent(note.id, this.user).then(conversation => { + this.connection.send(this.chName, "conversation", conversation); + }); + + switch (data.type) { + case "updated": + NoteConverter.encodeEvent(note, this.user).then(encoded => { + this.connection.send(this.chName, "status.update", encoded); + }); + break; + case "deleted": + this.connection.send(this.chName, "delete", note.id); + break; + default: + break; + } + } + + private shouldProcessNote(note: Note | Packed<"Note">): boolean { + if (note.visibility !== "specified") return false; + if (note.userId !== this.user.id && !note.visibleUserIds?.includes(this.user.id)) return false; + return true; + } + + public dispose() { + this.subscriber.off("notesStream", this.onNote); + this.subscriber.off("noteUpdatesStream", this.onNoteEvent); + } +} diff --git a/packages/backend/src/server/api/mastodon/streaming/channels/list.ts b/packages/backend/src/server/api/mastodon/streaming/channels/list.ts new file mode 100644 index 000000000..0cb5968a1 --- /dev/null +++ b/packages/backend/src/server/api/mastodon/streaming/channels/list.ts @@ -0,0 +1,89 @@ +import { MastodonStream } from "../channel.js"; +import { getWordHardMute } from "@/misc/check-word-mute.js"; +import { Note } from "@/models/entities/note.js"; +import { NoteConverter } from "@/server/api/mastodon/converters/note.js"; +import { StreamMessages } from "@/server/api/stream/types.js"; +import { Packed } from "@/misc/schema.js"; +import { User } from "@/models/entities/user.js"; +import { UserListJoinings } from "@/models/index.js"; + +export class MastodonStreamList extends MastodonStream { + public static shouldShare = false; + public static requireCredential = true; + public static requiredScopes = ['read:statuses']; + private readonly listId: string; + private listUsers: User["id"][] = []; + private listUsersClock: NodeJS.Timer; + + constructor(connection: MastodonStream["connection"], name: string, list: string) { + super(connection, name); + this.listId = list; + this.onNote = this.onNote.bind(this); + this.onNoteEvent = this.onNoteEvent.bind(this); + this.updateListUsers = this.updateListUsers.bind(this); + } + + override get user() { + return this.connection.user!; + } + + public async init() { + if (!this.listId) return; + this.subscriber.on("notesStream", this.onNote); + this.subscriber.on("noteUpdatesStream", this.onNoteEvent); + + this.updateListUsers(); + this.listUsersClock = setInterval(this.updateListUsers, 5000); + } + + private async updateListUsers() { + const users = await UserListJoinings.find({ + where: { + userListId: this.listId, + }, + select: ["userId"], + }); + + this.listUsers = users.map((x) => x.userId); + } + + private async onNote(note: Note) { + if (!await this.shouldProcessNote(note)) return; + + const encoded = await NoteConverter.encodeEvent(note, this.user) + this.connection.send(this.chName, "update", encoded); + } + + private async onNoteEvent(data: StreamMessages["noteUpdates"]["payload"]) { + const note = data.body; + if (!await this.shouldProcessNote(note)) return; + + switch (data.type) { + case "updated": + const encoded = await NoteConverter.encodeEvent(note, this.user); + this.connection.send(this.chName, "status.update", encoded); + break; + case "deleted": + this.connection.send(this.chName, "delete", note.id); + break; + default: + break; + } + } + + private async shouldProcessNote(note: Note | Packed<"Note">): Promise { + if (!this.listUsers.includes(note.userId)) return false; + if (note.channelId) return false; + if (note.renote && !note.text && this.renoteMuting.has(note.userId)) return false; + if (this.userProfile && (await getWordHardMute(note, this.user, this.userProfile.mutedWords))) return false; + if (note.visibility === "specified") return !!note.visibleUserIds?.includes(this.user.id); + if (note.visibility === "followers") return this.following.has(note.userId); + return true; + } + + public dispose() { + this.subscriber.off("notesStream", this.onNote); + this.subscriber.off("noteUpdatesStream", this.onNoteEvent); + clearInterval(this.listUsersClock); + } +} diff --git a/packages/backend/src/server/api/mastodon/streaming/channels/public.ts b/packages/backend/src/server/api/mastodon/streaming/channels/public.ts new file mode 100644 index 000000000..57f0ad8e1 --- /dev/null +++ b/packages/backend/src/server/api/mastodon/streaming/channels/public.ts @@ -0,0 +1,80 @@ +import { MastodonStream } from "../channel.js"; +import { getWordHardMute } from "@/misc/check-word-mute.js"; +import { isUserRelated } from "@/misc/is-user-related.js"; +import { isInstanceMuted } from "@/misc/is-instance-muted.js"; +import { Note } from "@/models/entities/note.js"; +import { NoteConverter } from "@/server/api/mastodon/converters/note.js"; +import { StreamMessages } from "@/server/api/stream/types.js"; +import { fetchMeta } from "@/misc/fetch-meta.js"; + +export class MastodonStreamPublic extends MastodonStream { + public static shouldShare = true; + public static requireCredential = false; + private readonly mediaOnly: boolean; + private readonly localOnly: boolean; + private readonly remoteOnly: boolean; + + constructor(connection: MastodonStream["connection"], name: string) { + super(connection, name); + this.mediaOnly = name.endsWith(":media"); + this.localOnly = name.startsWith("public:local"); + this.remoteOnly = name.startsWith("public:remote"); + this.onNote = this.onNote.bind(this); + this.onNoteEvent = this.onNoteEvent.bind(this); + } + + public async init() { + const meta = await fetchMeta(); + if (meta.disableGlobalTimeline) { + if (this.user == null || !(this.user.isAdmin || this.user.isModerator)) + return; + } + + this.subscriber.on("notesStream", this.onNote); + this.subscriber.on("noteUpdatesStream", this.onNoteEvent); + } + + private async onNote(note: Note) { + if (!await this.shouldProcessNote(note)) return; + + const encoded = await NoteConverter.encodeEvent(note, this.user) + this.connection.send(this.chName, "update", encoded); + } + + private async onNoteEvent(data: StreamMessages["noteUpdates"]["payload"]) { + const note = data.body; + if (!await this.shouldProcessNote(note)) return; + + switch (data.type) { + case "updated": + const encoded = await NoteConverter.encodeEvent(note, this.user); + this.connection.send(this.chName, "status.update", encoded); + break; + case "deleted": + this.connection.send(this.chName, "delete", note.id); + break; + default: + break; + } + } + + private async shouldProcessNote(note: Note): Promise { + if (note.visibility !== "public") return false; + if (note.channelId != null) return false; + if (this.mediaOnly && note.fileIds.length < 1) return false; + if (this.localOnly && note.userHost !== null) return false; + if (this.remoteOnly && note.userHost === null) return false; + if (isInstanceMuted(note, new Set(this.userProfile?.mutedInstances ?? []))) return false; + if (isUserRelated(note, this.muting)) return false; + if (isUserRelated(note, this.blocking)) return false; + if (note.renote && !note.text && this.renoteMuting.has(note.userId)) return false; + if (this.userProfile && (await getWordHardMute(note, this.user, this.userProfile.mutedWords))) return false; + + return true; + } + + public dispose() { + this.subscriber.off("notesStream", this.onNote); + this.subscriber.off("noteUpdatesStream", this.onNoteEvent); + } +} diff --git a/packages/backend/src/server/api/mastodon/streaming/channels/tag.ts b/packages/backend/src/server/api/mastodon/streaming/channels/tag.ts new file mode 100644 index 000000000..d145cde85 --- /dev/null +++ b/packages/backend/src/server/api/mastodon/streaming/channels/tag.ts @@ -0,0 +1,75 @@ +import { MastodonStream } from "../channel.js"; +import { getWordHardMute } from "@/misc/check-word-mute.js"; +import { isUserRelated } from "@/misc/is-user-related.js"; +import { isInstanceMuted } from "@/misc/is-instance-muted.js"; +import { Note } from "@/models/entities/note.js"; +import { NoteConverter } from "@/server/api/mastodon/converters/note.js"; +import { StreamMessages } from "@/server/api/stream/types.js"; + +export class MastodonStreamTag extends MastodonStream { + public static shouldShare = false; + public static requireCredential = false; + private readonly localOnly: boolean; + private readonly tag: string; + + constructor(connection: MastodonStream["connection"], name: string, tag: string) { + super(connection, name); + this.tag = tag; + this.localOnly = name.startsWith("hashtag:local"); + this.onNote = this.onNote.bind(this); + this.onNoteEvent = this.onNoteEvent.bind(this); + } + + override get user() { + return this.connection.user!; + } + + public async init() { + if (!this.tag) return; + this.subscriber.on("notesStream", this.onNote); + this.subscriber.on("noteUpdatesStream", this.onNoteEvent); + } + + private async onNote(note: Note) { + if (!await this.shouldProcessNote(note)) return; + + const encoded = await NoteConverter.encodeEvent(note, this.user) + this.connection.send(this.chName, "update", encoded); + } + + private async onNoteEvent(data: StreamMessages["noteUpdates"]["payload"]) { + const note = data.body; + if (!await this.shouldProcessNote(note)) return; + + switch (data.type) { + case "updated": + const encoded = await NoteConverter.encodeEvent(note, this.user); + this.connection.send(this.chName, "status.update", encoded); + break; + case "deleted": + this.connection.send(this.chName, "delete", note.id); + break; + default: + break; + } + } + + private async shouldProcessNote(note: Note): Promise { + if (note.visibility !== "public") return false; + if (note.channelId != null) return false; + if (this.localOnly && note.userHost !== null) return false; + if (!note.tags?.includes(this.tag)) return false; + if (isInstanceMuted(note, new Set(this.userProfile?.mutedInstances ?? []))) return false; + if (isUserRelated(note, this.muting)) return false; + if (isUserRelated(note, this.blocking)) return false; + if (note.renote && !note.text && this.renoteMuting.has(note.userId)) return false; + if (this.userProfile && (await getWordHardMute(note, this.user, this.userProfile.mutedWords))) return false; + + return true; + } + + public dispose() { + this.subscriber.off("notesStream", this.onNote); + this.subscriber.off("noteUpdatesStream", this.onNoteEvent); + } +} diff --git a/packages/backend/src/server/api/mastodon/streaming/channels/user.ts b/packages/backend/src/server/api/mastodon/streaming/channels/user.ts new file mode 100644 index 000000000..3f92df5e5 --- /dev/null +++ b/packages/backend/src/server/api/mastodon/streaming/channels/user.ts @@ -0,0 +1,112 @@ +import { MastodonStream } from "../channel.js"; +import { getWordHardMute } from "@/misc/check-word-mute.js"; +import { isUserRelated } from "@/misc/is-user-related.js"; +import { isInstanceMuted } from "@/misc/is-instance-muted.js"; +import { Note } from "@/models/entities/note.js"; +import { NoteConverter } from "@/server/api/mastodon/converters/note.js"; +import { StreamMessages } from "@/server/api/stream/types.js"; +import { NotificationConverter } from "@/server/api/mastodon/converters/notification.js"; +import { AnnouncementConverter } from "@/server/api/mastodon/converters/announcement.js"; + +export class MastodonStreamUser extends MastodonStream { + public static shouldShare = true; + public static requireCredential = true; + public static requiredScopes = ['read:statuses', 'read:notifications']; + private readonly notificationsOnly: boolean; + + constructor(connection: MastodonStream["connection"], name: string) { + super(connection, name); + this.notificationsOnly = name === "user:notification"; + this.onNote = this.onNote.bind(this); + this.onNoteEvent = this.onNoteEvent.bind(this); + this.onUserEvent = this.onUserEvent.bind(this); + this.onBroadcastEvent = this.onBroadcastEvent.bind(this); + } + + override get user() { + return this.connection.user!; + } + + public async init() { + this.subscriber.on(`mainStream:${this.user.id}`, this.onUserEvent); + if (!this.notificationsOnly) { + this.subscriber.on("notesStream", this.onNote); + this.subscriber.on("noteUpdatesStream", this.onNoteEvent); + this.subscriber.on("broadcast", this.onBroadcastEvent); + } + } + + private async onNote(note: Note) { + if (!await this.shouldProcessNote(note)) return; + + const encoded = await NoteConverter.encodeEvent(note, this.user) + this.connection.send(this.chName, "update", encoded); + } + + private async onNoteEvent(data: StreamMessages["noteUpdates"]["payload"]) { + const note = data.body; + if (!await this.shouldProcessNote(note)) return; + + switch (data.type) { + case "updated": + const encoded = await NoteConverter.encodeEvent(note, this.user); + this.connection.send(this.chName, "status.update", encoded); + break; + case "deleted": + this.connection.send(this.chName, "delete", note.id); + break; + default: + break; + } + } + + private async onUserEvent(data: StreamMessages["main"]["payload"]) { + switch (data.type) { + case "notification": + const encoded = await NotificationConverter.encodeEvent(data.body.id, this.user); + if (encoded) this.connection.send(this.chName, "notification", encoded); + break; + default: + break; + } + } + + private async onBroadcastEvent(data: StreamMessages["broadcast"]["payload"]) { + switch (data.type) { + case "announcementAdded": + // This shouldn't be necessary but is for some reason + data.body.createdAt = new Date(data.body.createdAt); + this.connection.send(this.chName, "announcement", AnnouncementConverter.encode(data.body, false)); + break; + case "announcementDeleted": + this.connection.send(this.chName, "announcement.delete", data.body); + break; + default: + break; + } + } + + + private async shouldProcessNote(note: Note): Promise { + if (note.visibility === "hidden") return false; + if (note.visibility === "specified") return note.userId === this.user.id || note.visibleUserIds?.includes(this.user.id); + if (note.channelId) return false; + if (this.user!.id !== note.userId && !this.following.has(note.userId)) return false; + if (isInstanceMuted(note, new Set(this.userProfile?.mutedInstances ?? []))) return false; + if (isUserRelated(note, this.muting)) return false; + if (isUserRelated(note, this.blocking)) return false; + if (note.renote && !note.text && this.renoteMuting.has(note.userId)) return false; + if (this.userProfile && (await getWordHardMute(note, this.user, this.userProfile.mutedWords))) return false; + + return true; + } + + public dispose() { + this.subscriber.off(`mainStream:${this.user.id}`, this.onUserEvent); + if (!this.notificationsOnly) { + this.subscriber.off("notesStream", this.onNote); + this.subscriber.off("noteUpdatesStream", this.onNoteEvent); + this.subscriber.off("broadcast", this.onBroadcastEvent); + } + } +} diff --git a/packages/backend/src/server/api/mastodon/streaming/index.ts b/packages/backend/src/server/api/mastodon/streaming/index.ts new file mode 100644 index 000000000..60c2eb4c8 --- /dev/null +++ b/packages/backend/src/server/api/mastodon/streaming/index.ts @@ -0,0 +1,261 @@ +import type { EventEmitter } from "events"; +import type * as websocket from "websocket"; +import type { ILocalUser, User } from "@/models/entities/user.js"; +import type { MastodonStream } from "./channel.js"; +import { Blockings, Followings, Mutings, RenoteMutings, UserProfiles, } from "@/models/index.js"; +import type { AccessToken } from "@/models/entities/access-token.js"; +import type { UserProfile } from "@/models/entities/user-profile.js"; +import { StreamEventEmitter, StreamMessages } from "@/server/api/stream/types.js"; +import { apiLogger } from "@/server/api/logger.js"; +import { MastodonStreamUser } from "@/server/api/mastodon/streaming/channels/user.js"; +import { MastodonStreamDirect } from "@/server/api/mastodon/streaming/channels/direct.js"; +import { MastodonStreamPublic } from "@/server/api/mastodon/streaming/channels/public.js"; +import { MastodonStreamList } from "@/server/api/mastodon/streaming/channels/list.js"; +import { ParsedUrlQuery } from "querystring"; +import { toSingleLast } from "@/prelude/array.js"; +import { MastodonStreamTag } from "@/server/api/mastodon/streaming/channels/tag.js"; +import { AuthConverter } from "@/server/api/mastodon/converters/auth.js"; + +const logger = apiLogger.createSubLogger("streaming").createSubLogger("mastodon"); +const channels: Record = { + "user": MastodonStreamUser, + "user:notification": MastodonStreamUser, + "direct": MastodonStreamDirect, + "list": MastodonStreamList, + "public": MastodonStreamPublic, + "public:media": MastodonStreamPublic, + "public:local": MastodonStreamPublic, + "public:local:media": MastodonStreamPublic, + "public:remote": MastodonStreamPublic, + "public:remote:media": MastodonStreamPublic, + "hashtag": MastodonStreamTag, + "hashtag:local": MastodonStreamTag, +} + +export class MastodonStreamingConnection { + public user?: ILocalUser; + public userProfile?: UserProfile | null; + public following: Set = new Set(); + public muting: Set = new Set(); + public renoteMuting: Set = new Set(); + public blocking: Set = new Set(); + public token?: AccessToken; + private wsConnection: websocket.connection; + private channels: MastodonStream[] = []; + public subscriber: StreamEventEmitter; + + constructor( + wsConnection: websocket.connection, + subscriber: EventEmitter, + user: ILocalUser | null | undefined, + token: AccessToken | null | undefined, + query: ParsedUrlQuery, + ) { + const channel = toSingleLast(query.stream); + logger.debug(`New connection on channel: ${channel}`); + this.wsConnection = wsConnection; + this.subscriber = subscriber; + if (user) this.user = user; + if (token) this.token = token; + + this.onMessage = this.onMessage.bind(this); + this.onUserEvent = this.onUserEvent.bind(this); + + this.wsConnection.on("message", this.onMessage); + + if (this.user) { + this.updateFollowing(); + this.updateMuting(); + this.updateRenoteMuting(); + this.updateBlocking(); + this.updateUserProfile(); + + this.subscriber.on(`user:${this.user.id}`, this.onUserEvent); + } + + if (channel) { + const list = toSingleLast(query.list); + const tag = toSingleLast(query.tag); + this.onMessage({ + type: "utf8", + utf8Data: JSON.stringify({ stream: channel, type: "subscribe", list, tag }), + }); + } + } + + private onUserEvent(data: StreamMessages["user"]["payload"]) { + switch (data.type) { + case "follow": + this.following.add(data.body.id); + break; + case "unfollow": + this.following.delete(data.body.id); + break; + case "mute": + this.muting.add(data.body.id); + break; + case "unmute": + this.muting.delete(data.body.id); + break; + + // TODO: renote mute events + // TODO: block events + + case "updateUserProfile": + this.userProfile = data.body; + break; + case "terminate": + this.closeConnection(); + break; + default: + break; + } + } + + private async onMessage(data: websocket.Message) { + if (data.type !== "utf8") return; + if (data.utf8Data == null) return; + + let message: Record; + + try { + message = JSON.parse(data.utf8Data); + } catch (e) { + logger.error("Failed to parse json data, ignoring"); + return; + } + + const { stream, type, list, tag } = message; + + if (!message.stream || !message.type) { + logger.error("Invalid message received, ignoring"); + return; + } + + if (list ?? tag) + logger.info(`${type}: ${stream} ${list ?? tag}`); + else + logger.info(`${type}: ${stream}`); + + switch (type) { + case "subscribe": + this.connectChannel(stream, list, tag); + break; + case "unsubscribe": + this.disconnectChannel(stream); + break; + } + } + + public send(stream: string, event: string, payload: any) { + const json = JSON.stringify({ + stream: [stream], + event: event, + payload: typeof payload === "string" ? payload : JSON.stringify(payload), + }) + + this.wsConnection.send(json); + } + + public connectChannel(channel: string, list?: string, tag?: string) { + if (channels[channel].requireCredential) { + if (this.user == null) { + logger.info(`Refusing connection to channel ${channel} without authentication, terminating connection`); + this.closeConnection(); + return; + } else if (!AuthConverter.decode(channels[channel].requiredScopes).every(p => this.token?.permission.includes(p))) { + logger.info(`Refusing connection to channel ${channel} without required OAuth scopes, terminating connection`); + this.closeConnection(); + return; + } + } + + if ( + channels[channel].shouldShare && + this.channels.some((c) => c.chName === channel) + ) { + return; + } + + let ch: MastodonStream; + + if (channel === "list") { + ch = new channels[channel](this, channel, list); + } else if (channel.startsWith("hashtag")) + ch = new channels[channel](this, channel, tag); + else + ch = new channels[channel](this, channel); + this.channels.push(ch); + ch.init(null); + } + + public disconnectChannel(channelName: string) { + const channel = this.channels.find((c) => c.chName === channelName); + + if (channel) { + if (channel.dispose) channel.dispose(); + this.channels = this.channels.filter((c) => c.chName !== channelName); + } + } + + private async updateFollowing() { + const followings = await Followings.find({ + where: { + followerId: this.user!.id, + }, + select: ["followeeId"], + }); + + this.following = new Set(followings.map((x) => x.followeeId)); + } + + private async updateMuting() { + const mutings = await Mutings.find({ + where: { + muterId: this.user!.id, + }, + select: ["muteeId"], + }); + + this.muting = new Set(mutings.map((x) => x.muteeId)); + } + + private async updateRenoteMuting() { + const renoteMutings = await RenoteMutings.find({ + where: { + muterId: this.user!.id, + }, + select: ["muteeId"], + }); + + this.renoteMuting = new Set(renoteMutings.map((x) => x.muteeId)); + } + + private async updateBlocking() { + const blockings = await Blockings.find({ + where: { + blockeeId: this.user!.id, + }, + select: ["blockerId"], + }); + + this.blocking = new Set(blockings.map((x) => x.blockerId)); + } + + private async updateUserProfile() { + this.userProfile = await UserProfiles.findOneBy({ + userId: this.user!.id, + }); + } + + public closeConnection() { + this.wsConnection.close(); + this.dispose(); + } + + public dispose() { + for (const c of this.channels.filter((c) => c.dispose)) { + if (c.dispose) c.dispose(); + } + } +} diff --git a/packages/backend/src/server/api/stream/index.ts b/packages/backend/src/server/api/stream/index.ts index 763c9c461..465c3f045 100644 --- a/packages/backend/src/server/api/stream/index.ts +++ b/packages/backend/src/server/api/stream/index.ts @@ -56,7 +56,6 @@ export default class Connection { accessToken: string, prepareStream: string | undefined, ) { - console.log("constructor", prepareStream); this.wsConnection = wsConnection; this.subscriber = subscriber; if (user) this.user = user; @@ -85,7 +84,6 @@ export default class Connection { this.subscriber.on(`user:${this.user.id}`, this.onUserEvent); } - console.log("prepare", prepareStream); if (prepareStream) { this.onWsConnectionMessage({ type: "utf8", diff --git a/packages/backend/src/server/api/stream/types.ts b/packages/backend/src/server/api/stream/types.ts index 601c56f41..569d1ae90 100644 --- a/packages/backend/src/server/api/stream/types.ts +++ b/packages/backend/src/server/api/stream/types.ts @@ -15,6 +15,7 @@ import type { Signin } from "@/models/entities/signin.js"; import type { Page } from "@/models/entities/page.js"; import type { Packed } from "@/misc/schema.js"; import type { Webhook } from "@/models/entities/webhook"; +import { Announcement } from "@/models/entities/announcement.js"; //#region Stream type-body definitions export interface InternalStreamTypes { @@ -59,6 +60,8 @@ export interface BroadcastTypes { emojiAdded: { emoji: Packed<"Emoji">; }; + announcementAdded: Announcement; + announcementDeleted: Announcement["id"]; } export interface UserStreamTypes { @@ -162,6 +165,11 @@ type NoteStreamEventTypes = { }; }; +export interface NoteUpdatesStreamTypes { + deleted: Note; + updated: Note; +} + export interface ChannelStreamTypes { typing: User["id"]; } @@ -271,6 +279,10 @@ export type StreamMessages = { name: "notesStream"; payload: Note; }; + noteUpdates: { + name: "noteUpdatesStream"; + payload: EventUnionFromDictionary; + }; }; // API event definitions diff --git a/packages/backend/src/server/api/streaming.ts b/packages/backend/src/server/api/streaming.ts index 14e07b748..c167c6d63 100644 --- a/packages/backend/src/server/api/streaming.ts +++ b/packages/backend/src/server/api/streaming.ts @@ -7,6 +7,10 @@ import { subscriber as redisClient } from "@/db/redis.js"; import { Users } from "@/models/index.js"; import MainStreamConnection from "./stream/index.js"; import authenticate from "./authenticate.js"; +import { apiLogger } from "@/server/api/logger.js"; +import { MastodonStreamingConnection } from "@/server/api/mastodon/streaming/index.js"; + +export const streamingLogger = apiLogger.createSubLogger("streaming"); export const initializeStreamingServer = (server: http.Server) => { // Init websocket server @@ -48,17 +52,15 @@ export const initializeStreamingServer = (server: http.Server) => { redisClient.on("message", onRedisMessage); const host = `https://${request.host}`; const prepareStream = q.stream?.toString(); - console.log("start", q); - const main = new MainStreamConnection( - connection, - ev, - user, - app, - host, - accessToken, - prepareStream, - ); + const isMastodon = request.resourceURL.pathname?.endsWith('/api/v1/streaming'); + if (isMastodon && !request.resourceURL.pathname?.startsWith('/mastodon')) { + streamingLogger.warn(`Received connect from mastodon on incorrect path: ${request.resourceURL.pathname}`); + } + + const main = isMastodon + ? new MastodonStreamingConnection(connection, ev, user, app, q) + : new MainStreamConnection(connection, ev, user, app, host, accessToken, prepareStream); const intervalId = user ? setInterval(() => { diff --git a/packages/backend/src/services/note/delete.ts b/packages/backend/src/services/note/delete.ts index 9b70a5c29..36eb972ef 100644 --- a/packages/backend/src/services/note/delete.ts +++ b/packages/backend/src/services/note/delete.ts @@ -1,5 +1,5 @@ import { Brackets, In } from "typeorm"; -import { publishNoteStream } from "@/services/stream.js"; +import { publishNoteStream, publishNoteUpdatesStream } from "@/services/stream.js"; import renderDelete from "@/remote/activitypub/renderer/delete.js"; import renderAnnounce from "@/remote/activitypub/renderer/announce.js"; import renderUndo from "@/remote/activitypub/renderer/undo.js"; @@ -53,6 +53,8 @@ export default async function ( deletedAt: deletedAt, }); + publishNoteUpdatesStream("deleted", note); + //#region ローカルの投稿なら削除アクティビティを配送 if (Users.isLocalUser(user) && !note.localOnly) { let renote: Note | null = null; diff --git a/packages/backend/src/services/note/edit.ts b/packages/backend/src/services/note/edit.ts index 66ed82d5f..1cf02496f 100644 --- a/packages/backend/src/services/note/edit.ts +++ b/packages/backend/src/services/note/edit.ts @@ -1,6 +1,6 @@ import * as mfm from "mfm-js"; import { - publishNoteStream, + publishNoteStream, publishNoteUpdatesStream, } from "@/services/stream.js"; import DeliverManager from "@/remote/activitypub/deliver-manager.js"; import renderNote from "@/remote/activitypub/renderer/note.js"; @@ -172,6 +172,8 @@ export default async function ( updatedAt: update.updatedAt, }); + publishNoteUpdatesStream("updated", note); + (async () => { const noteActivity = await renderNote(note, false); noteActivity.updated = note.updatedAt.toISOString(); diff --git a/packages/backend/src/services/stream.ts b/packages/backend/src/services/stream.ts index f3846feaf..8aef36788 100644 --- a/packages/backend/src/services/stream.ts +++ b/packages/backend/src/services/stream.ts @@ -20,7 +20,7 @@ import type { MessagingStreamTypes, NoteStreamTypes, UserListStreamTypes, - UserStreamTypes, + UserStreamTypes, NoteUpdatesStreamTypes, } from "@/server/api/stream/types.js"; class Publisher { @@ -104,10 +104,19 @@ class Publisher { type: K, value?: NoteStreamTypes[K], ): void => { - this.publish(`noteStream:${noteId}`, type, { + const object = { id: noteId, body: value, - }); + }; + + this.publish(`noteStream:${noteId}`, type, object); + }; + + public publishNoteUpdatesStream = ( + type: K, + value: NoteUpdatesStreamTypes[K], + ): void => { + this.publish('noteUpdatesStream', type, value); }; public publishChannelStream = ( @@ -215,6 +224,7 @@ export const publishMainStream = publisher.publishMainStream; export const publishDriveStream = publisher.publishDriveStream; export const publishNoteStream = publisher.publishNoteStream; export const publishNotesStream = publisher.publishNotesStream; +export const publishNoteUpdatesStream = publisher.publishNoteUpdatesStream; export const publishChannelStream = publisher.publishChannelStream; export const publishUserListStream = publisher.publishUserListStream; export const publishAntennaStream = publisher.publishAntennaStream;