From eb6edc1473d07f7a8da44d180614545816cf483d Mon Sep 17 00:00:00 2001 From: anatawa12 Date: Wed, 2 Apr 2025 09:15:43 +0900 Subject: [PATCH] =?UTF-8?q?chore:=20XADD=E3=82=92retry=E3=81=99=E3=82=8B?= =?UTF-8?q?=E3=82=88=E3=81=86=E3=81=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../backend/src/core/NotificationService.ts | 47 ++++++++++++------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/packages/backend/src/core/NotificationService.ts b/packages/backend/src/core/NotificationService.ts index cca0868d77..eeade4569b 100644 --- a/packages/backend/src/core/NotificationService.ts +++ b/packages/backend/src/core/NotificationService.ts @@ -7,6 +7,7 @@ import { setTimeout } from 'node:timers/promises'; import * as Redis from 'ioredis'; import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import { In } from 'typeorm'; +import { ReplyError } from 'ioredis'; import { DI } from '@/di-symbols.js'; import type { UsersRepository } from '@/models/_.js'; import type { MiUser } from '@/models/User.js'; @@ -145,22 +146,36 @@ export class NotificationService implements OnApplicationShutdown { } } - const notification = { - id: this.idService.gen(), - createdAt: new Date(), - type: type, - ...(notifierId ? { - notifierId, - } : {}), - ...data, - } as any as FilterUnionByProperty; + const createdAt = new Date(); + let notification: FilterUnionByProperty; + let redisId: string; - // TODO: 同一ミリ秒に生成されたときにランダム部分が昇順じゃなくてXADDが失敗する可能性があるのでid再生成しながらリトライするべきかも。またはidをRedisの生成したものから生成するようにする。 - const redisIdPromise = this.redisClient.xadd( - `notificationTimeline:${notifieeId}`, - 'MAXLEN', '~', this.config.perUserNotificationsMaxCount.toString(), - this.toXListId(notification.id), - 'data', JSON.stringify(notification)); + do { + notification = { + id: this.idService.gen(), + createdAt, + type: type, + ...(notifierId ? { + notifierId, + } : {}), + ...data, + } as unknown as FilterUnionByProperty; + + try { + redisId = (await this.redisClient.xadd( + `notificationTimeline:${notifieeId}`, + 'MAXLEN', '~', this.config.perUserNotificationsMaxCount.toString(), + this.toXListId(notification.id), + 'data', JSON.stringify(notification)))!; + } catch (e) { + // The ID specified in XADD is equal or smaller than the target stream top item で失敗することがあるのでリトライ + if (e instanceof ReplyError) continue; + throw e; + } + + break; + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + } while (true); const packed = await this.notificationEntityService.pack(notification, notifieeId, {}); @@ -174,7 +189,7 @@ export class NotificationService implements OnApplicationShutdown { const interval = notification.type === 'test' ? 0 : 2000; setTimeout(interval, 'unread notification', { signal: this.#shutdownController.signal }).then(async () => { const latestReadNotificationId = await this.redisClient.get(`latestReadNotification:${notifieeId}`); - if (latestReadNotificationId && (latestReadNotificationId >= (await redisIdPromise)!)) return; + if (latestReadNotificationId && (latestReadNotificationId >= redisId)) return; this.globalEventService.publishMainStream(notifieeId, 'unreadNotification', packed); this.pushNotificationService.pushNotification(notifieeId, 'notification', packed);