This commit is contained in:
syuilo 2024-09-20 13:08:04 +09:00
parent e896b08722
commit 480ec9803a
13 changed files with 93 additions and 20 deletions

View File

@ -103,6 +103,14 @@ redis:
# #prefix: example-prefix # #prefix: example-prefix
# #db: 1 # #db: 1
#redisForReactions:
# host: redis
# port: 6379
# #family: 0 # 0=Both, 4=IPv4, 6=IPv6
# #pass: example-pass
# #prefix: example-prefix
# #db: 1
# ┌───────────────────────────┐ # ┌───────────────────────────┐
#───┘ MeiliSearch configuration └───────────────────────────── #───┘ MeiliSearch configuration └─────────────────────────────

View File

@ -106,6 +106,14 @@ redis:
# #prefix: example-prefix # #prefix: example-prefix
# #db: 1 # #db: 1
#redisForReactions:
# host: redis
# port: 6379
# #family: 0 # 0=Both, 4=IPv4, 6=IPv6
# #pass: example-pass
# #prefix: example-prefix
# #db: 1
# ┌───────────────────────────┐ # ┌───────────────────────────┐
#───┘ MeiliSearch configuration └───────────────────────────── #───┘ MeiliSearch configuration └─────────────────────────────

View File

@ -172,6 +172,16 @@ redis:
# # You can specify more ioredis options... # # You can specify more ioredis options...
# #username: example-username # #username: example-username
#redisForReactions:
# host: localhost
# port: 6379
# #family: 0 # 0=Both, 4=IPv4, 6=IPv6
# #pass: example-pass
# #prefix: example-prefix
# #db: 1
# # You can specify more ioredis options...
# #username: example-username
# ┌───────────────────────────┐ # ┌───────────────────────────┐
#───┘ MeiliSearch configuration └───────────────────────────── #───┘ MeiliSearch configuration └─────────────────────────────

View File

@ -103,6 +103,14 @@ redis:
# #prefix: example-prefix # #prefix: example-prefix
# #db: 1 # #db: 1
#redisForReactions:
# host: redis
# port: 6379
# #family: 0 # 0=Both, 4=IPv4, 6=IPv6
# #pass: example-pass
# #prefix: example-prefix
# #db: 1
# ┌───────────────────────────┐ # ┌───────────────────────────┐
#───┘ MeiliSearch configuration └───────────────────────────── #───┘ MeiliSearch configuration └─────────────────────────────

View File

@ -124,6 +124,14 @@ redis:
# #prefix: example-prefix # #prefix: example-prefix
# #db: 1 # #db: 1
#redisForReactions:
# host: redis
# port: 6379
# #family: 0 # 0=Both, 4=IPv4, 6=IPv6
# #pass: example-pass
# #prefix: example-prefix
# #db: 1
# ┌───────────────────────────┐ # ┌───────────────────────────┐
#───┘ MeiliSearch configuration └───────────────────────────── #───┘ MeiliSearch configuration └─────────────────────────────

View File

@ -78,11 +78,19 @@ const $redisForTimelines: Provider = {
inject: [DI.config], inject: [DI.config],
}; };
const $redisForReactions: Provider = {
provide: DI.redisForReactions,
useFactory: (config: Config) => {
return new Redis.Redis(config.redisForReactions);
},
inject: [DI.config],
};
@Global() @Global()
@Module({ @Module({
imports: [RepositoryModule], imports: [RepositoryModule],
providers: [$config, $db, $meilisearch, $redis, $redisForPub, $redisForSub, $redisForTimelines], providers: [$config, $db, $meilisearch, $redis, $redisForPub, $redisForSub, $redisForTimelines, $redisForReactions],
exports: [$config, $db, $meilisearch, $redis, $redisForPub, $redisForSub, $redisForTimelines, RepositoryModule], exports: [$config, $db, $meilisearch, $redis, $redisForPub, $redisForSub, $redisForTimelines, $redisForReactions, RepositoryModule],
}) })
export class GlobalModule implements OnApplicationShutdown { export class GlobalModule implements OnApplicationShutdown {
constructor( constructor(
@ -91,6 +99,7 @@ export class GlobalModule implements OnApplicationShutdown {
@Inject(DI.redisForPub) private redisForPub: Redis.Redis, @Inject(DI.redisForPub) private redisForPub: Redis.Redis,
@Inject(DI.redisForSub) private redisForSub: Redis.Redis, @Inject(DI.redisForSub) private redisForSub: Redis.Redis,
@Inject(DI.redisForTimelines) private redisForTimelines: Redis.Redis, @Inject(DI.redisForTimelines) private redisForTimelines: Redis.Redis,
@Inject(DI.redisForReactions) private redisForReactions: Redis.Redis,
) { } ) { }
public async dispose(): Promise<void> { public async dispose(): Promise<void> {
@ -103,6 +112,7 @@ export class GlobalModule implements OnApplicationShutdown {
this.redisForPub.disconnect(), this.redisForPub.disconnect(),
this.redisForSub.disconnect(), this.redisForSub.disconnect(),
this.redisForTimelines.disconnect(), this.redisForTimelines.disconnect(),
this.redisForReactions.disconnect(),
]); ]);
} }

View File

@ -49,6 +49,7 @@ type Source = {
redisForPubsub?: RedisOptionsSource; redisForPubsub?: RedisOptionsSource;
redisForJobQueue?: RedisOptionsSource; redisForJobQueue?: RedisOptionsSource;
redisForTimelines?: RedisOptionsSource; redisForTimelines?: RedisOptionsSource;
redisForReactions?: RedisOptionsSource;
meilisearch?: { meilisearch?: {
host: string; host: string;
port: string; port: string;
@ -171,6 +172,7 @@ export type Config = {
redisForPubsub: RedisOptions & RedisOptionsSource; redisForPubsub: RedisOptions & RedisOptionsSource;
redisForJobQueue: RedisOptions & RedisOptionsSource; redisForJobQueue: RedisOptions & RedisOptionsSource;
redisForTimelines: RedisOptions & RedisOptionsSource; redisForTimelines: RedisOptions & RedisOptionsSource;
redisForReactions: RedisOptions & RedisOptionsSource;
sentryForBackend: { options: Partial<Sentry.NodeOptions>; enableNodeProfiling: boolean; } | undefined; sentryForBackend: { options: Partial<Sentry.NodeOptions>; enableNodeProfiling: boolean; } | undefined;
sentryForFrontend: { options: Partial<Sentry.NodeOptions> } | undefined; sentryForFrontend: { options: Partial<Sentry.NodeOptions> } | undefined;
perChannelMaxNoteCacheCount: number; perChannelMaxNoteCacheCount: number;
@ -251,6 +253,7 @@ export function loadConfig(): Config {
redisForPubsub: config.redisForPubsub ? convertRedisOptions(config.redisForPubsub, host) : redis, redisForPubsub: config.redisForPubsub ? convertRedisOptions(config.redisForPubsub, host) : redis,
redisForJobQueue: config.redisForJobQueue ? convertRedisOptions(config.redisForJobQueue, host) : redis, redisForJobQueue: config.redisForJobQueue ? convertRedisOptions(config.redisForJobQueue, host) : redis,
redisForTimelines: config.redisForTimelines ? convertRedisOptions(config.redisForTimelines, host) : redis, redisForTimelines: config.redisForTimelines ? convertRedisOptions(config.redisForTimelines, host) : redis,
redisForReactions: config.redisForReactions ? convertRedisOptions(config.redisForReactions, host) : redis,
sentryForBackend: config.sentryForBackend, sentryForBackend: config.sentryForBackend,
sentryForFrontend: config.sentryForFrontend, sentryForFrontend: config.sentryForFrontend,
id: config.id, id: config.id,

View File

@ -4,7 +4,6 @@
*/ */
import { Inject, Injectable } from '@nestjs/common'; import { Inject, Injectable } from '@nestjs/common';
import * as Redis from 'ioredis';
import { DI } from '@/di-symbols.js'; import { DI } from '@/di-symbols.js';
import type { EmojisRepository, NoteReactionsRepository, UsersRepository, NotesRepository } from '@/models/_.js'; import type { EmojisRepository, NoteReactionsRepository, UsersRepository, NotesRepository } from '@/models/_.js';
import { IdentifiableError } from '@/misc/identifiable-error.js'; import { IdentifiableError } from '@/misc/identifiable-error.js';
@ -72,9 +71,6 @@ const decodeCustomEmojiRegexp = /^:([\w+-]+)(?:@([\w.-]+))?:$/;
@Injectable() @Injectable()
export class ReactionService { export class ReactionService {
constructor( constructor(
@Inject(DI.redis)
private redisClient: Redis.Redis, // TODO: 専用のRedisインスタンスにする
@Inject(DI.usersRepository) @Inject(DI.usersRepository)
private usersRepository: UsersRepository, private usersRepository: UsersRepository,
@ -198,10 +194,8 @@ export class ReactionService {
} }
} }
const rbt = true;
// Increment reactions count // Increment reactions count
if (rbt) { if (meta.enableReactionsBuffering) {
this.reactionsBufferingService.create(note, reaction); this.reactionsBufferingService.create(note, reaction);
// for debugging // for debugging

View File

@ -19,8 +19,8 @@ export class ReactionsBufferingService {
@Inject(DI.config) @Inject(DI.config)
private config: Config, private config: Config,
@Inject(DI.redis) @Inject(DI.redisForReactions)
private redisClient: Redis.Redis, // TODO: 専用のRedisインスタンスにする private redisForReactions: Redis.Redis, // TODO: 専用のRedisインスタンスにする
@Inject(DI.notesRepository) @Inject(DI.notesRepository)
private notesRepository: NotesRepository, private notesRepository: NotesRepository,
@ -29,17 +29,17 @@ export class ReactionsBufferingService {
@bindThis @bindThis
public async create(note: MiNote, reaction: string) { public async create(note: MiNote, reaction: string) {
this.redisClient.hincrby(`${REDIS_PREFIX}:${note.id}`, reaction, 1); this.redisForReactions.hincrby(`${REDIS_PREFIX}:${note.id}`, reaction, 1);
} }
@bindThis @bindThis
public async delete(note: MiNote, reaction: string) { public async delete(note: MiNote, reaction: string) {
this.redisClient.hincrby(`${REDIS_PREFIX}:${note.id}`, reaction, -1); this.redisForReactions.hincrby(`${REDIS_PREFIX}:${note.id}`, reaction, -1);
} }
@bindThis @bindThis
public async get(noteId: MiNote['id']): Promise<Record<string, number>> { public async get(noteId: MiNote['id']): Promise<Record<string, number>> {
const result = await this.redisClient.hgetall(`${REDIS_PREFIX}:${noteId}`); const result = await this.redisForReactions.hgetall(`${REDIS_PREFIX}:${noteId}`);
const delta = {} as Record<string, number>; const delta = {} as Record<string, number>;
for (const [name, count] of Object.entries(result)) { for (const [name, count] of Object.entries(result)) {
delta[name] = parseInt(count); delta[name] = parseInt(count);
@ -51,7 +51,7 @@ export class ReactionsBufferingService {
public async getMany(noteIds: MiNote['id'][]): Promise<Map<MiNote['id'], Record<string, number>>> { public async getMany(noteIds: MiNote['id'][]): Promise<Map<MiNote['id'], Record<string, number>>> {
const deltas = new Map<MiNote['id'], Record<string, number>>(); const deltas = new Map<MiNote['id'], Record<string, number>>();
const pipeline = this.redisClient.pipeline(); const pipeline = this.redisForReactions.pipeline();
for (const noteId of noteIds) { for (const noteId of noteIds) {
pipeline.hgetall(`${REDIS_PREFIX}:${noteId}`); pipeline.hgetall(`${REDIS_PREFIX}:${noteId}`);
} }
@ -77,8 +77,13 @@ export class ReactionsBufferingService {
let cursor = '0'; let cursor = '0';
do { do {
// https://github.com/redis/ioredis#transparent-key-prefixing // https://github.com/redis/ioredis#transparent-key-prefixing
const result = await this.redisClient.scan(cursor, 'MATCH', `${this.config.redis.prefix}:${REDIS_PREFIX}:*`, 'COUNT', '1000'); const result = await this.redisForReactions.scan(
console.log(result); cursor,
'MATCH',
`${this.config.redis.prefix}:${REDIS_PREFIX}:*`,
'COUNT',
'1000');
cursor = result[0]; cursor = result[0];
bufferedNoteIds.push(...result[1].map(x => x.replace(`${this.config.redis.prefix}:${REDIS_PREFIX}:`, ''))); bufferedNoteIds.push(...result[1].map(x => x.replace(`${this.config.redis.prefix}:${REDIS_PREFIX}:`, '')));
} while (cursor !== '0'); } while (cursor !== '0');
@ -86,7 +91,7 @@ export class ReactionsBufferingService {
const deltas = await this.getMany(bufferedNoteIds); const deltas = await this.getMany(bufferedNoteIds);
// clear // clear
const pipeline = this.redisClient.pipeline(); const pipeline = this.redisForReactions.pipeline();
for (const noteId of bufferedNoteIds) { for (const noteId of bufferedNoteIds) {
pipeline.del(`${REDIS_PREFIX}:${noteId}`); pipeline.del(`${REDIS_PREFIX}:${noteId}`);
} }

View File

@ -16,6 +16,7 @@ import { bindThis } from '@/decorators.js';
import { DebounceLoader } from '@/misc/loader.js'; import { DebounceLoader } from '@/misc/loader.js';
import { IdService } from '@/core/IdService.js'; import { IdService } from '@/core/IdService.js';
import { ReactionsBufferingService } from '@/core/ReactionsBufferingService.js'; import { ReactionsBufferingService } from '@/core/ReactionsBufferingService.js';
import { MetaService } from '@/core/MetaService.js';
import type { OnModuleInit } from '@nestjs/common'; import type { OnModuleInit } from '@nestjs/common';
import type { CustomEmojiService } from '../CustomEmojiService.js'; import type { CustomEmojiService } from '../CustomEmojiService.js';
import type { ReactionService } from '../ReactionService.js'; import type { ReactionService } from '../ReactionService.js';
@ -42,6 +43,7 @@ export class NoteEntityService implements OnModuleInit {
private reactionService: ReactionService; private reactionService: ReactionService;
private reactionsBufferingService: ReactionsBufferingService; private reactionsBufferingService: ReactionsBufferingService;
private idService: IdService; private idService: IdService;
private metaService: MetaService;
private noteLoader = new DebounceLoader(this.findNoteOrFail); private noteLoader = new DebounceLoader(this.findNoteOrFail);
constructor( constructor(
@ -73,6 +75,8 @@ export class NoteEntityService implements OnModuleInit {
//private customEmojiService: CustomEmojiService, //private customEmojiService: CustomEmojiService,
//private reactionService: ReactionService, //private reactionService: ReactionService,
//private reactionsBufferingService: ReactionsBufferingService, //private reactionsBufferingService: ReactionsBufferingService,
//private idService: IdService,
//private metaService: MetaService,
) { ) {
} }
@ -83,6 +87,7 @@ export class NoteEntityService implements OnModuleInit {
this.reactionService = this.moduleRef.get('ReactionService'); this.reactionService = this.moduleRef.get('ReactionService');
this.reactionsBufferingService = this.moduleRef.get('ReactionsBufferingService'); this.reactionsBufferingService = this.moduleRef.get('ReactionsBufferingService');
this.idService = this.moduleRef.get('IdService'); this.idService = this.moduleRef.get('IdService');
this.metaService = this.moduleRef.get('MetaService');
} }
@bindThis @bindThis
@ -417,8 +422,9 @@ export class NoteEntityService implements OnModuleInit {
) { ) {
if (notes.length === 0) return []; if (notes.length === 0) return [];
const rbt = true; const meta = await this.metaService.fetch();
const reactionsDeltas = rbt ? await this.reactionsBufferingService.getMany(notes.map(x => x.id)) : new Map();
const reactionsDeltas = meta.enableReactionsBuffering ? await this.reactionsBufferingService.getMany(notes.map(x => x.id)) : new Map();
const meId = me ? me.id : null; const meId = me ? me.id : null;
const myReactionsMap = new Map<MiNote['id'], string | null>(); const myReactionsMap = new Map<MiNote['id'], string | null>();

View File

@ -11,6 +11,7 @@ export const DI = {
redisForPub: Symbol('redisForPub'), redisForPub: Symbol('redisForPub'),
redisForSub: Symbol('redisForSub'), redisForSub: Symbol('redisForSub'),
redisForTimelines: Symbol('redisForTimelines'), redisForTimelines: Symbol('redisForTimelines'),
redisForReactions: Symbol('redisForReactions'),
//#region Repositories //#region Repositories
usersRepository: Symbol('usersRepository'), usersRepository: Symbol('usersRepository'),

View File

@ -7,6 +7,7 @@ import { Inject, Injectable } from '@nestjs/common';
import type Logger from '@/logger.js'; import type Logger from '@/logger.js';
import { bindThis } from '@/decorators.js'; import { bindThis } from '@/decorators.js';
import { ReactionsBufferingService } from '@/core/ReactionsBufferingService.js'; import { ReactionsBufferingService } from '@/core/ReactionsBufferingService.js';
import { MetaService } from '@/core/MetaService.js';
import { QueueLoggerService } from '../QueueLoggerService.js'; import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq'; import type * as Bull from 'bullmq';
@ -16,6 +17,7 @@ export class BakeBufferedReactionsProcessorService {
constructor( constructor(
private reactionsBufferingService: ReactionsBufferingService, private reactionsBufferingService: ReactionsBufferingService,
private metaService: MetaService,
private queueLoggerService: QueueLoggerService, private queueLoggerService: QueueLoggerService,
) { ) {
this.logger = this.queueLoggerService.logger.createSubLogger('bake-buffered-reactions'); this.logger = this.queueLoggerService.logger.createSubLogger('bake-buffered-reactions');
@ -23,6 +25,12 @@ export class BakeBufferedReactionsProcessorService {
@bindThis @bindThis
public async process(): Promise<void> { public async process(): Promise<void> {
const meta = await this.metaService.fetch();
if (!meta.enableReactionsBuffering) {
this.logger.info('Reactions buffering is disabled. Skipping...');
return;
}
this.logger.info('Baking buffered reactions...'); this.logger.info('Baking buffered reactions...');
await this.reactionsBufferingService.bake(); await this.reactionsBufferingService.bake();

View File

@ -27,6 +27,9 @@ export class HealthServerService {
@Inject(DI.redisForTimelines) @Inject(DI.redisForTimelines)
private redisForTimelines: Redis.Redis, private redisForTimelines: Redis.Redis,
@Inject(DI.redisForReactions)
private redisForReactions: Redis.Redis,
@Inject(DI.db) @Inject(DI.db)
private db: DataSource, private db: DataSource,
@ -43,6 +46,7 @@ export class HealthServerService {
this.redisForPub.ping(), this.redisForPub.ping(),
this.redisForSub.ping(), this.redisForSub.ping(),
this.redisForTimelines.ping(), this.redisForTimelines.ping(),
this.redisForReactions.ping(),
this.db.query('SELECT 1'), this.db.query('SELECT 1'),
...(this.meilisearch ? [this.meilisearch.health()] : []), ...(this.meilisearch ? [this.meilisearch.health()] : []),
]).then(() => 200, () => 503)); ]).then(() => 200, () => 503));