enhance(backend): improve cache

This commit is contained in:
syuilo 2023-04-05 10:21:10 +09:00
parent 625fed8838
commit f44504097c
32 changed files with 264 additions and 448 deletions

View File

@ -1,6 +1,6 @@
import { Inject, Injectable } from '@nestjs/common'; import { Inject, Injectable } from '@nestjs/common';
import Redis from 'ioredis'; import Redis from 'ioredis';
import type { UserProfile, UsersRepository } from '@/models/index.js'; import type { BlockingsRepository, ChannelFollowingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, UserProfile, UserProfilesRepository, UsersRepository } from '@/models/index.js';
import { MemoryKVCache, RedisKVCache } from '@/misc/cache.js'; import { MemoryKVCache, RedisKVCache } from '@/misc/cache.js';
import type { LocalUser, User } from '@/models/entities/User.js'; import type { LocalUser, User } from '@/models/entities/User.js';
import { DI } from '@/di-symbols.js'; import { DI } from '@/di-symbols.js';
@ -16,7 +16,12 @@ export class CacheService implements OnApplicationShutdown {
public localUserByIdCache: MemoryKVCache<LocalUser>; public localUserByIdCache: MemoryKVCache<LocalUser>;
public uriPersonCache: MemoryKVCache<User | null>; public uriPersonCache: MemoryKVCache<User | null>;
public userProfileCache: RedisKVCache<UserProfile>; public userProfileCache: RedisKVCache<UserProfile>;
public userMutingsCache: RedisKVCache<string[]>; public userMutingsCache: RedisKVCache<Set<string>>;
public userBlockingCache: RedisKVCache<Set<string>>;
public userBlockedCache: RedisKVCache<Set<string>>; // NOTE: 「被」Blockキャッシュ
public renoteMutingsCache: RedisKVCache<Set<string>>;
public userFollowingsCache: RedisKVCache<Set<string>>;
public userFollowingChannelsCache: RedisKVCache<Set<string>>;
constructor( constructor(
@Inject(DI.redis) @Inject(DI.redis)
@ -28,6 +33,24 @@ export class CacheService implements OnApplicationShutdown {
@Inject(DI.usersRepository) @Inject(DI.usersRepository)
private usersRepository: UsersRepository, private usersRepository: UsersRepository,
@Inject(DI.userProfilesRepository)
private userProfilesRepository: UserProfilesRepository,
@Inject(DI.mutingsRepository)
private mutingsRepository: MutingsRepository,
@Inject(DI.blockingsRepository)
private blockingsRepository: BlockingsRepository,
@Inject(DI.renoteMutingsRepository)
private renoteMutingsRepository: RenoteMutingsRepository,
@Inject(DI.followingsRepository)
private followingsRepository: FollowingsRepository,
@Inject(DI.channelFollowingsRepository)
private channelFollowingsRepository: ChannelFollowingsRepository,
private userEntityService: UserEntityService, private userEntityService: UserEntityService,
) { ) {
//this.onMessage = this.onMessage.bind(this); //this.onMessage = this.onMessage.bind(this);
@ -36,8 +59,62 @@ export class CacheService implements OnApplicationShutdown {
this.localUserByNativeTokenCache = new MemoryKVCache<LocalUser | null>(Infinity); this.localUserByNativeTokenCache = new MemoryKVCache<LocalUser | null>(Infinity);
this.localUserByIdCache = new MemoryKVCache<LocalUser>(Infinity); this.localUserByIdCache = new MemoryKVCache<LocalUser>(Infinity);
this.uriPersonCache = new MemoryKVCache<User | null>(Infinity); this.uriPersonCache = new MemoryKVCache<User | null>(Infinity);
this.userProfileCache = new RedisKVCache<UserProfile>(this.redisClient, 'userProfile', 1000 * 60 * 60 * 24, 1000 * 60);
this.userMutingsCache = new RedisKVCache<string[]>(this.redisClient, 'userMutings', 1000 * 60 * 60 * 24, 1000 * 60); this.userProfileCache = new RedisKVCache<UserProfile>(this.redisClient, 'userProfile', {
lifetime: 1000 * 60 * 30, // 30m
memoryCacheLifetime: 1000 * 60, // 1m
fetcher: (key) => this.userProfilesRepository.findOneByOrFail({ userId: key }),
toRedisConverter: (value) => JSON.stringify(value),
fromRedisConverter: (value) => JSON.parse(value), // TODO: date型の考慮
});
this.userMutingsCache = new RedisKVCache<Set<string>>(this.redisClient, 'userMutings', {
lifetime: 1000 * 60 * 30, // 30m
memoryCacheLifetime: 1000 * 60, // 1m
fetcher: (key) => this.mutingsRepository.find({ where: { muterId: key }, select: ['muteeId'] }).then(xs => new Set(xs.map(x => x.muteeId))),
toRedisConverter: (value) => JSON.stringify(Array.from(value)),
fromRedisConverter: (value) => new Set(JSON.parse(value)),
});
this.userBlockingCache = new RedisKVCache<Set<string>>(this.redisClient, 'userBlocking', {
lifetime: 1000 * 60 * 30, // 30m
memoryCacheLifetime: 1000 * 60, // 1m
fetcher: (key) => this.blockingsRepository.find({ where: { blockerId: key }, select: ['blockeeId'] }).then(xs => new Set(xs.map(x => x.blockeeId))),
toRedisConverter: (value) => JSON.stringify(Array.from(value)),
fromRedisConverter: (value) => new Set(JSON.parse(value)),
});
this.userBlockedCache = new RedisKVCache<Set<string>>(this.redisClient, 'userBlocked', {
lifetime: 1000 * 60 * 30, // 30m
memoryCacheLifetime: 1000 * 60, // 1m
fetcher: (key) => this.blockingsRepository.find({ where: { blockeeId: key }, select: ['blockerId'] }).then(xs => new Set(xs.map(x => x.blockerId))),
toRedisConverter: (value) => JSON.stringify(Array.from(value)),
fromRedisConverter: (value) => new Set(JSON.parse(value)),
});
this.renoteMutingsCache = new RedisKVCache<Set<string>>(this.redisClient, 'renoteMutings', {
lifetime: 1000 * 60 * 30, // 30m
memoryCacheLifetime: 1000 * 60, // 1m
fetcher: (key) => this.renoteMutingsRepository.find({ where: { muterId: key }, select: ['muteeId'] }).then(xs => new Set(xs.map(x => x.muteeId))),
toRedisConverter: (value) => JSON.stringify(Array.from(value)),
fromRedisConverter: (value) => new Set(JSON.parse(value)),
});
this.userFollowingsCache = new RedisKVCache<Set<string>>(this.redisClient, 'userFollowings', {
lifetime: 1000 * 60 * 30, // 30m
memoryCacheLifetime: 1000 * 60, // 1m
fetcher: (key) => this.followingsRepository.find({ where: { followerId: key }, select: ['followeeId'] }).then(xs => new Set(xs.map(x => x.followeeId))),
toRedisConverter: (value) => JSON.stringify(Array.from(value)),
fromRedisConverter: (value) => new Set(JSON.parse(value)),
});
this.userFollowingChannelsCache = new RedisKVCache<Set<string>>(this.redisClient, 'userFollowingChannels', {
lifetime: 1000 * 60 * 30, // 30m
memoryCacheLifetime: 1000 * 60, // 1m
fetcher: (key) => this.channelFollowingsRepository.find({ where: { followerId: key }, select: ['followeeId'] }).then(xs => new Set(xs.map(x => x.followeeId))),
toRedisConverter: (value) => JSON.stringify(Array.from(value)),
fromRedisConverter: (value) => new Set(JSON.parse(value)),
});
this.redisSubscriber.on('message', this.onMessage); this.redisSubscriber.on('message', this.onMessage);
} }

View File

@ -36,8 +36,5 @@ export class DeleteAccountService {
await this.usersRepository.update(user.id, { await this.usersRepository.update(user.id, {
isDeleted: true, isDeleted: true,
}); });
// Terminate streaming
this.globalEventService.publishUserEvent(user.id, 'terminate', {});
} }
} }

View File

@ -14,7 +14,6 @@ import type {
MainStreamTypes, MainStreamTypes,
NoteStreamTypes, NoteStreamTypes,
UserListStreamTypes, UserListStreamTypes,
UserStreamTypes,
} from '@/server/api/stream/types.js'; } from '@/server/api/stream/types.js';
import type { Packed } from '@/misc/json-schema.js'; import type { Packed } from '@/misc/json-schema.js';
import { DI } from '@/di-symbols.js'; import { DI } from '@/di-symbols.js';
@ -49,11 +48,6 @@ export class GlobalEventService {
this.publish('internal', type, typeof value === 'undefined' ? null : value); this.publish('internal', type, typeof value === 'undefined' ? null : value);
} }
@bindThis
public publishUserEvent<K extends keyof UserStreamTypes>(userId: User['id'], type: K, value?: UserStreamTypes[K]): void {
this.publish(`user:${userId}`, type, typeof value === 'undefined' ? null : value);
}
@bindThis @bindThis
public publishBroadcastStream<K extends keyof BroadcastTypes>(type: K, value?: BroadcastTypes[K]): void { public publishBroadcastStream<K extends keyof BroadcastTypes>(type: K, value?: BroadcastTypes[K]): void {
this.publish('broadcast', type, typeof value === 'undefined' ? null : value); this.publish('broadcast', type, typeof value === 'undefined' ? null : value);

View File

@ -73,7 +73,7 @@ export class NotificationService implements OnApplicationShutdown {
type: Notification['type'], type: Notification['type'],
data: Partial<Notification>, data: Partial<Notification>,
): Promise<Notification | null> { ): Promise<Notification | null> {
const profile = await this.cacheService.userProfileCache.fetch(notifieeId, () => this.userProfilesRepository.findOneByOrFail({ userId: notifieeId })); const profile = await this.cacheService.userProfileCache.fetch(notifieeId);
const isMuted = profile.mutingNotificationTypes.includes(type); const isMuted = profile.mutingNotificationTypes.includes(type);
if (isMuted) return null; if (isMuted) return null;
@ -82,8 +82,8 @@ export class NotificationService implements OnApplicationShutdown {
return null; return null;
} }
const mutings = await this.cacheService.userMutingsCache.fetch(notifieeId, () => this.mutingsRepository.findBy({ muterId: notifieeId }).then(xs => xs.map(x => x.muteeId))); const mutings = await this.cacheService.userMutingsCache.fetch(notifieeId);
if (mutings.includes(data.notifierId)) { if (mutings.has(data.notifierId)) {
return null; return null;
} }
} }

View File

@ -1,40 +1,26 @@
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import { Inject, Injectable } from '@nestjs/common';
import Redis from 'ioredis';
import { IdService } from '@/core/IdService.js'; import { IdService } from '@/core/IdService.js';
import type { User } from '@/models/entities/User.js'; import type { User } from '@/models/entities/User.js';
import type { Blocking } from '@/models/entities/Blocking.js'; import type { Blocking } from '@/models/entities/Blocking.js';
import { QueueService } from '@/core/QueueService.js'; import { QueueService } from '@/core/QueueService.js';
import { GlobalEventService } from '@/core/GlobalEventService.js'; import { GlobalEventService } from '@/core/GlobalEventService.js';
import PerUserFollowingChart from '@/core/chart/charts/per-user-following.js';
import { DI } from '@/di-symbols.js'; import { DI } from '@/di-symbols.js';
import type { UsersRepository, FollowingsRepository, FollowRequestsRepository, BlockingsRepository, UserListsRepository, UserListJoiningsRepository } from '@/models/index.js'; import type { FollowRequestsRepository, BlockingsRepository, UserListsRepository, UserListJoiningsRepository } from '@/models/index.js';
import Logger from '@/logger.js'; import Logger from '@/logger.js';
import { UserEntityService } from '@/core/entities/UserEntityService.js'; import { UserEntityService } from '@/core/entities/UserEntityService.js';
import { ApRendererService } from '@/core/activitypub/ApRendererService.js'; import { ApRendererService } from '@/core/activitypub/ApRendererService.js';
import { LoggerService } from '@/core/LoggerService.js'; import { LoggerService } from '@/core/LoggerService.js';
import { WebhookService } from '@/core/WebhookService.js'; import { WebhookService } from '@/core/WebhookService.js';
import { bindThis } from '@/decorators.js'; import { bindThis } from '@/decorators.js';
import { MemoryKVCache } from '@/misc/cache.js'; import { CacheService } from '@/core/CacheService.js';
import { StreamMessages } from '@/server/api/stream/types.js'; import { UserFollowingService } from '@/core/UserFollowingService.js';
@Injectable() @Injectable()
export class UserBlockingService implements OnApplicationShutdown { export class UserBlockingService {
private logger: Logger; private logger: Logger;
// キーがユーザーIDで、値がそのユーザーがブロックしているユーザーのIDのリストなキャッシュ
private blockingsByUserIdCache: MemoryKVCache<User['id'][]>;
constructor( constructor(
@Inject(DI.redisSubscriber)
private redisSubscriber: Redis.Redis,
@Inject(DI.usersRepository)
private usersRepository: UsersRepository,
@Inject(DI.followingsRepository)
private followingsRepository: FollowingsRepository,
@Inject(DI.followRequestsRepository) @Inject(DI.followRequestsRepository)
private followRequestsRepository: FollowRequestsRepository, private followRequestsRepository: FollowRequestsRepository,
@ -47,47 +33,17 @@ export class UserBlockingService implements OnApplicationShutdown {
@Inject(DI.userListJoiningsRepository) @Inject(DI.userListJoiningsRepository)
private userListJoiningsRepository: UserListJoiningsRepository, private userListJoiningsRepository: UserListJoiningsRepository,
private cacheService: CacheService,
private userFollowingService: UserFollowingService,
private userEntityService: UserEntityService, private userEntityService: UserEntityService,
private idService: IdService, private idService: IdService,
private queueService: QueueService, private queueService: QueueService,
private globalEventService: GlobalEventService, private globalEventService: GlobalEventService,
private webhookService: WebhookService, private webhookService: WebhookService,
private apRendererService: ApRendererService, private apRendererService: ApRendererService,
private perUserFollowingChart: PerUserFollowingChart,
private loggerService: LoggerService, private loggerService: LoggerService,
) { ) {
this.logger = this.loggerService.getLogger('user-block'); this.logger = this.loggerService.getLogger('user-block');
this.blockingsByUserIdCache = new MemoryKVCache<User['id'][]>(Infinity);
this.redisSubscriber.on('message', this.onMessage);
}
@bindThis
private async onMessage(_: string, data: string): Promise<void> {
const obj = JSON.parse(data);
if (obj.channel === 'internal') {
const { type, body } = obj.message as StreamMessages['internal']['payload'];
switch (type) {
case 'blockingCreated': {
const cached = this.blockingsByUserIdCache.get(body.blockerId);
if (cached) {
this.blockingsByUserIdCache.set(body.blockerId, [...cached, ...[body.blockeeId]]);
}
break;
}
case 'blockingDeleted': {
const cached = this.blockingsByUserIdCache.get(body.blockerId);
if (cached) {
this.blockingsByUserIdCache.set(body.blockerId, cached.filter(x => x !== body.blockeeId));
}
break;
}
default:
break;
}
}
} }
@bindThis @bindThis
@ -95,8 +51,8 @@ export class UserBlockingService implements OnApplicationShutdown {
await Promise.all([ await Promise.all([
this.cancelRequest(blocker, blockee), this.cancelRequest(blocker, blockee),
this.cancelRequest(blockee, blocker), this.cancelRequest(blockee, blocker),
this.unFollow(blocker, blockee), this.userFollowingService.unfollow(blocker, blockee),
this.unFollow(blockee, blocker), this.userFollowingService.unfollow(blockee, blocker),
this.removeFromList(blockee, blocker), this.removeFromList(blockee, blocker),
]); ]);
@ -111,6 +67,9 @@ export class UserBlockingService implements OnApplicationShutdown {
await this.blockingsRepository.insert(blocking); await this.blockingsRepository.insert(blocking);
this.cacheService.userBlockingCache.refresh(blocker.id);
this.cacheService.userBlockedCache.refresh(blockee.id);
this.globalEventService.publishInternalEvent('blockingCreated', { this.globalEventService.publishInternalEvent('blockingCreated', {
blockerId: blocker.id, blockerId: blocker.id,
blockeeId: blockee.id, blockeeId: blockee.id,
@ -148,7 +107,6 @@ export class UserBlockingService implements OnApplicationShutdown {
this.userEntityService.pack(followee, follower, { this.userEntityService.pack(followee, follower, {
detail: true, detail: true,
}).then(async packed => { }).then(async packed => {
this.globalEventService.publishUserEvent(follower.id, 'unfollow', packed);
this.globalEventService.publishMainStream(follower.id, 'unfollow', packed); this.globalEventService.publishMainStream(follower.id, 'unfollow', packed);
const webhooks = (await this.webhookService.getActiveWebhooks()).filter(x => x.userId === follower.id && x.on.includes('unfollow')); const webhooks = (await this.webhookService.getActiveWebhooks()).filter(x => x.userId === follower.id && x.on.includes('unfollow'));
@ -173,54 +131,6 @@ export class UserBlockingService implements OnApplicationShutdown {
} }
} }
@bindThis
private async unFollow(follower: User, followee: User) {
const following = await this.followingsRepository.findOneBy({
followerId: follower.id,
followeeId: followee.id,
});
if (following == null) {
return;
}
await Promise.all([
this.followingsRepository.delete(following.id),
this.usersRepository.decrement({ id: follower.id }, 'followingCount', 1),
this.usersRepository.decrement({ id: followee.id }, 'followersCount', 1),
this.perUserFollowingChart.update(follower, followee, false),
]);
// Publish unfollow event
if (this.userEntityService.isLocalUser(follower)) {
this.userEntityService.pack(followee, follower, {
detail: true,
}).then(async packed => {
this.globalEventService.publishUserEvent(follower.id, 'unfollow', packed);
this.globalEventService.publishMainStream(follower.id, 'unfollow', packed);
const webhooks = (await this.webhookService.getActiveWebhooks()).filter(x => x.userId === follower.id && x.on.includes('unfollow'));
for (const webhook of webhooks) {
this.queueService.webhookDeliver(webhook, 'unfollow', {
user: packed,
});
}
});
}
// リモートにフォローをしていたらUndoFollow送信
if (this.userEntityService.isLocalUser(follower) && this.userEntityService.isRemoteUser(followee)) {
const content = this.apRendererService.addContext(this.apRendererService.renderUndo(this.apRendererService.renderFollow(follower, followee), follower));
this.queueService.deliver(follower, content, followee.inbox, false);
}
// リモートからフォローをされていたらRejectFollow送信
if (this.userEntityService.isLocalUser(followee) && this.userEntityService.isRemoteUser(follower)) {
const content = this.apRendererService.addContext(this.apRendererService.renderReject(this.apRendererService.renderFollow(follower, followee), followee));
this.queueService.deliver(followee, content, follower.inbox, false);
}
}
@bindThis @bindThis
private async removeFromList(listOwner: User, user: User) { private async removeFromList(listOwner: User, user: User) {
const userLists = await this.userListsRepository.findBy({ const userLists = await this.userListsRepository.findBy({
@ -254,6 +164,9 @@ export class UserBlockingService implements OnApplicationShutdown {
await this.blockingsRepository.delete(blocking.id); await this.blockingsRepository.delete(blocking.id);
this.cacheService.userBlockingCache.refresh(blocker.id);
this.cacheService.userBlockedCache.refresh(blockee.id);
this.globalEventService.publishInternalEvent('blockingDeleted', { this.globalEventService.publishInternalEvent('blockingDeleted', {
blockerId: blocker.id, blockerId: blocker.id,
blockeeId: blockee.id, blockeeId: blockee.id,
@ -268,17 +181,6 @@ export class UserBlockingService implements OnApplicationShutdown {
@bindThis @bindThis
public async checkBlocked(blockerId: User['id'], blockeeId: User['id']): Promise<boolean> { public async checkBlocked(blockerId: User['id'], blockeeId: User['id']): Promise<boolean> {
const blockedUserIds = await this.blockingsByUserIdCache.fetch(blockerId, () => this.blockingsRepository.find({ return (await this.cacheService.userBlockingCache.fetch(blockerId)).has(blockeeId);
where: {
blockerId,
},
select: ['blockeeId'],
}).then(records => records.map(record => record.blockeeId)));
return blockedUserIds.includes(blockeeId);
}
@bindThis
public onApplicationShutdown(signal?: string | undefined) {
this.redisSubscriber.off('message', this.onMessage);
} }
} }

View File

@ -18,6 +18,7 @@ import { ApRendererService } from '@/core/activitypub/ApRendererService.js';
import { bindThis } from '@/decorators.js'; import { bindThis } from '@/decorators.js';
import { UserBlockingService } from '@/core/UserBlockingService.js'; import { UserBlockingService } from '@/core/UserBlockingService.js';
import { MetaService } from '@/core/MetaService.js'; import { MetaService } from '@/core/MetaService.js';
import { CacheService } from '@/core/CacheService.js';
import Logger from '../logger.js'; import Logger from '../logger.js';
const logger = new Logger('following/create'); const logger = new Logger('following/create');
@ -53,6 +54,7 @@ export class UserFollowingService {
@Inject(DI.instancesRepository) @Inject(DI.instancesRepository)
private instancesRepository: InstancesRepository, private instancesRepository: InstancesRepository,
private cacheService: CacheService,
private userEntityService: UserEntityService, private userEntityService: UserEntityService,
private userBlockingService: UserBlockingService, private userBlockingService: UserBlockingService,
private idService: IdService, private idService: IdService,
@ -172,6 +174,8 @@ export class UserFollowingService {
} }
}); });
this.cacheService.userFollowingsCache.refresh(follower.id);
const req = await this.followRequestsRepository.findOneBy({ const req = await this.followRequestsRepository.findOneBy({
followeeId: followee.id, followeeId: followee.id,
followerId: follower.id, followerId: follower.id,
@ -225,7 +229,6 @@ export class UserFollowingService {
this.userEntityService.pack(followee.id, follower, { this.userEntityService.pack(followee.id, follower, {
detail: true, detail: true,
}).then(async packed => { }).then(async packed => {
this.globalEventService.publishUserEvent(follower.id, 'follow', packed as Packed<'UserDetailedNotMe'>);
this.globalEventService.publishMainStream(follower.id, 'follow', packed as Packed<'UserDetailedNotMe'>); this.globalEventService.publishMainStream(follower.id, 'follow', packed as Packed<'UserDetailedNotMe'>);
const webhooks = (await this.webhookService.getActiveWebhooks()).filter(x => x.userId === follower.id && x.on.includes('follow')); const webhooks = (await this.webhookService.getActiveWebhooks()).filter(x => x.userId === follower.id && x.on.includes('follow'));
@ -279,6 +282,8 @@ export class UserFollowingService {
await this.followingsRepository.delete(following.id); await this.followingsRepository.delete(following.id);
this.cacheService.userFollowingsCache.refresh(follower.id);
this.decrementFollowing(follower, followee); this.decrementFollowing(follower, followee);
// Publish unfollow event // Publish unfollow event
@ -286,7 +291,6 @@ export class UserFollowingService {
this.userEntityService.pack(followee.id, follower, { this.userEntityService.pack(followee.id, follower, {
detail: true, detail: true,
}).then(async packed => { }).then(async packed => {
this.globalEventService.publishUserEvent(follower.id, 'unfollow', packed);
this.globalEventService.publishMainStream(follower.id, 'unfollow', packed); this.globalEventService.publishMainStream(follower.id, 'unfollow', packed);
const webhooks = (await this.webhookService.getActiveWebhooks()).filter(x => x.userId === follower.id && x.on.includes('unfollow')); const webhooks = (await this.webhookService.getActiveWebhooks()).filter(x => x.userId === follower.id && x.on.includes('unfollow'));
@ -579,7 +583,6 @@ export class UserFollowingService {
detail: true, detail: true,
}); });
this.globalEventService.publishUserEvent(follower.id, 'unfollow', packedFollowee);
this.globalEventService.publishMainStream(follower.id, 'unfollow', packedFollowee); this.globalEventService.publishMainStream(follower.id, 'unfollow', packedFollowee);
const webhooks = (await this.webhookService.getActiveWebhooks()).filter(x => x.userId === follower.id && x.on.includes('unfollow')); const webhooks = (await this.webhookService.getActiveWebhooks()).filter(x => x.userId === follower.id && x.on.includes('unfollow'));

View File

@ -1,34 +1,47 @@
import { Inject, Injectable } from '@nestjs/common'; import { Inject, Injectable } from '@nestjs/common';
import type { UsersRepository, MutingsRepository } from '@/models/index.js'; import { In } from 'typeorm';
import type { MutingsRepository, Muting } from '@/models/index.js';
import { IdService } from '@/core/IdService.js'; import { IdService } from '@/core/IdService.js';
import { QueueService } from '@/core/QueueService.js';
import { GlobalEventService } from '@/core/GlobalEventService.js';
import type { User } from '@/models/entities/User.js'; import type { User } from '@/models/entities/User.js';
import { DI } from '@/di-symbols.js'; import { DI } from '@/di-symbols.js';
import { bindThis } from '@/decorators.js'; import { bindThis } from '@/decorators.js';
import { CacheService } from '@/core/CacheService';
@Injectable() @Injectable()
export class UserMutingService { export class UserMutingService {
constructor( constructor(
@Inject(DI.usersRepository)
private usersRepository: UsersRepository,
@Inject(DI.mutingsRepository) @Inject(DI.mutingsRepository)
private mutingsRepository: MutingsRepository, private mutingsRepository: MutingsRepository,
private idService: IdService, private idService: IdService,
private queueService: QueueService, private cacheService: CacheService,
private globalEventService: GlobalEventService,
) { ) {
} }
@bindThis @bindThis
public async mute(user: User, target: User): Promise<void> { public async mute(user: User, target: User, expiresAt: Date | null = null): Promise<void> {
await this.mutingsRepository.insert({ await this.mutingsRepository.insert({
id: this.idService.genId(), id: this.idService.genId(),
createdAt: new Date(), createdAt: new Date(),
expiresAt: expiresAt ?? null,
muterId: user.id, muterId: user.id,
muteeId: target.id, muteeId: target.id,
}); });
this.cacheService.userMutingsCache.refresh(user.id);
}
@bindThis
public async unmute(mutings: Muting[]): Promise<void> {
if (mutings.length === 0) return;
await this.mutingsRepository.delete({
id: In(mutings.map(m => m.id)),
});
const muterIds = [...new Set(mutings.map(m => m.muterId))];
for (const muterId of muterIds) {
this.cacheService.userMutingsCache.refresh(muterId);
}
} }
} }

View File

@ -1,29 +1,29 @@
import Redis from 'ioredis'; import Redis from 'ioredis';
import { bindThis } from '@/decorators.js'; import { bindThis } from '@/decorators.js';
// redis通すとDateのインスタンスはstringに変換されるので
type Serialized<T> = {
[K in keyof T]:
T[K] extends Date
? string
: T[K] extends (Date | null)
? (string | null)
: T[K] extends Record<string, any>
? Serialized<T[K]>
: T[K];
};
export class RedisKVCache<T> { export class RedisKVCache<T> {
private redisClient: Redis.Redis; private redisClient: Redis.Redis;
private name: string; private name: string;
private lifetime: number; private lifetime: number;
private memoryCache: MemoryKVCache<T>; private memoryCache: MemoryKVCache<T>;
private fetcher: (key: string) => Promise<T>;
private toRedisConverter: (value: T) => string;
private fromRedisConverter: (value: string) => T;
constructor(redisClient: RedisKVCache<never>['redisClient'], name: RedisKVCache<never>['name'], lifetime: RedisKVCache<never>['lifetime'], memoryCacheLifetime: number) { constructor(redisClient: RedisKVCache<T>['redisClient'], name: RedisKVCache<T>['name'], opts: {
lifetime: RedisKVCache<T>['lifetime'];
memoryCacheLifetime: number;
fetcher: RedisKVCache<T>['fetcher'];
toRedisConverter: RedisKVCache<T>['toRedisConverter'];
fromRedisConverter: RedisKVCache<T>['fromRedisConverter'];
}) {
this.redisClient = redisClient; this.redisClient = redisClient;
this.name = name; this.name = name;
this.lifetime = lifetime; this.lifetime = opts.lifetime;
this.memoryCache = new MemoryKVCache(memoryCacheLifetime); this.memoryCache = new MemoryKVCache(opts.memoryCacheLifetime);
this.fetcher = opts.fetcher;
this.toRedisConverter = opts.toRedisConverter;
this.fromRedisConverter = opts.fromRedisConverter;
} }
@bindThis @bindThis
@ -32,25 +32,25 @@ export class RedisKVCache<T> {
if (this.lifetime === Infinity) { if (this.lifetime === Infinity) {
await this.redisClient.set( await this.redisClient.set(
`kvcache:${this.name}:${key}`, `kvcache:${this.name}:${key}`,
JSON.stringify(value), this.toRedisConverter(value),
); );
} else { } else {
await this.redisClient.set( await this.redisClient.set(
`kvcache:${this.name}:${key}`, `kvcache:${this.name}:${key}`,
JSON.stringify(value), this.toRedisConverter(value),
'ex', Math.round(this.lifetime / 1000), 'ex', Math.round(this.lifetime / 1000),
); );
} }
} }
@bindThis @bindThis
public async get(key: string): Promise<Serialized<T> | T | undefined> { public async get(key: string): Promise<T | undefined> {
const memoryCached = this.memoryCache.get(key); const memoryCached = this.memoryCache.get(key);
if (memoryCached !== undefined) return memoryCached; if (memoryCached !== undefined) return memoryCached;
const cached = await this.redisClient.get(`kvcache:${this.name}:${key}`); const cached = await this.redisClient.get(`kvcache:${this.name}:${key}`);
if (cached == null) return undefined; if (cached == null) return undefined;
return JSON.parse(cached); return this.fromRedisConverter(cached);
} }
@bindThis @bindThis
@ -61,28 +61,28 @@ export class RedisKVCache<T> {
/** /**
* fetcherを呼び出して結果をキャッシュ& * fetcherを呼び出して結果をキャッシュ&
* optional: キャッシュが存在してもvalidatorでfalseを返すとキャッシュ無効扱いにします
*/ */
@bindThis @bindThis
public async fetch(key: string, fetcher: () => Promise<T>, validator?: (cachedValue: Serialized<T> | T) => boolean): Promise<Serialized<T> | T> { public async fetch(key: string): Promise<T> {
const cachedValue = await this.get(key); const cachedValue = await this.get(key);
if (cachedValue !== undefined) { if (cachedValue !== undefined) {
if (validator) {
if (validator(cachedValue)) {
// Cache HIT // Cache HIT
return cachedValue; return cachedValue;
} }
} else {
// Cache HIT
return cachedValue;
}
}
// Cache MISS // Cache MISS
const value = await fetcher(); const value = await this.fetcher(key);
this.set(key, value); this.set(key, value);
return value; return value;
} }
@bindThis
public async refresh(key: string) {
const value = await this.fetcher(key);
this.set(key, value);
// TODO: イベント発行して他プロセスのメモリキャッシュも更新できるようにする
}
} }
// TODO: メモリ節約のためあまり参照されないキーを定期的に削除できるようにする? // TODO: メモリ節約のためあまり参照されないキーを定期的に削除できるようにする?

View File

@ -4,10 +4,10 @@ import { DI } from '@/di-symbols.js';
import type { MutingsRepository } from '@/models/index.js'; import type { MutingsRepository } from '@/models/index.js';
import type { Config } from '@/config.js'; import type { Config } from '@/config.js';
import type Logger from '@/logger.js'; import type Logger from '@/logger.js';
import { GlobalEventService } from '@/core/GlobalEventService.js'; import { bindThis } from '@/decorators.js';
import { UserMutingService } from '@/core/UserMutingService.js';
import { QueueLoggerService } from '../QueueLoggerService.js'; import { QueueLoggerService } from '../QueueLoggerService.js';
import type Bull from 'bull'; import type Bull from 'bull';
import { bindThis } from '@/decorators.js';
@Injectable() @Injectable()
export class CheckExpiredMutingsProcessorService { export class CheckExpiredMutingsProcessorService {
@ -20,7 +20,7 @@ export class CheckExpiredMutingsProcessorService {
@Inject(DI.mutingsRepository) @Inject(DI.mutingsRepository)
private mutingsRepository: MutingsRepository, private mutingsRepository: MutingsRepository,
private globalEventService: GlobalEventService, private userMutingService: UserMutingService,
private queueLoggerService: QueueLoggerService, private queueLoggerService: QueueLoggerService,
) { ) {
this.logger = this.queueLoggerService.logger.createSubLogger('check-expired-mutings'); this.logger = this.queueLoggerService.logger.createSubLogger('check-expired-mutings');
@ -37,13 +37,7 @@ export class CheckExpiredMutingsProcessorService {
.getMany(); .getMany();
if (expired.length > 0) { if (expired.length > 0) {
await this.mutingsRepository.delete({ await this.userMutingService.unmute(expired);
id: In(expired.map(m => m.id)),
});
for (const m of expired) {
this.globalEventService.publishUserEvent(m.muterId, 'unmute', m.mutee!);
}
} }
this.logger.succ('All expired mutings checked.'); this.logger.succ('All expired mutings checked.');

View File

@ -9,6 +9,7 @@ import { NoteReadService } from '@/core/NoteReadService.js';
import { GlobalEventService } from '@/core/GlobalEventService.js'; import { GlobalEventService } from '@/core/GlobalEventService.js';
import { NotificationService } from '@/core/NotificationService.js'; import { NotificationService } from '@/core/NotificationService.js';
import { bindThis } from '@/decorators.js'; import { bindThis } from '@/decorators.js';
import { CacheService } from '@/core/CacheService.js';
import { AuthenticateService } from './AuthenticateService.js'; import { AuthenticateService } from './AuthenticateService.js';
import MainStreamConnection from './stream/index.js'; import MainStreamConnection from './stream/index.js';
import { ChannelsService } from './stream/ChannelsService.js'; import { ChannelsService } from './stream/ChannelsService.js';
@ -45,7 +46,7 @@ export class StreamingApiServerService {
@Inject(DI.userProfilesRepository) @Inject(DI.userProfilesRepository)
private userProfilesRepository: UserProfilesRepository, private userProfilesRepository: UserProfilesRepository,
private globalEventService: GlobalEventService, private cacheService: CacheService,
private noteReadService: NoteReadService, private noteReadService: NoteReadService,
private authenticateService: AuthenticateService, private authenticateService: AuthenticateService,
private channelsService: ChannelsService, private channelsService: ChannelsService,
@ -73,8 +74,6 @@ export class StreamingApiServerService {
return; return;
} }
const connection = request.accept();
const ev = new EventEmitter(); const ev = new EventEmitter();
async function onRedisMessage(_: string, data: string): Promise<void> { async function onRedisMessage(_: string, data: string): Promise<void> {
@ -85,19 +84,19 @@ export class StreamingApiServerService {
this.redisSubscriber.on('message', onRedisMessage); this.redisSubscriber.on('message', onRedisMessage);
const main = new MainStreamConnection( const main = new MainStreamConnection(
this.followingsRepository,
this.mutingsRepository,
this.renoteMutingsRepository,
this.blockingsRepository,
this.channelFollowingsRepository,
this.userProfilesRepository,
this.channelsService, this.channelsService,
this.globalEventService,
this.noteReadService, this.noteReadService,
this.notificationService, this.notificationService,
connection, ev, user, miapp, this.cacheService,
ev, user, miapp,
); );
await main.init();
const connection = request.accept();
main.init2(connection);
const intervalId = user ? setInterval(() => { const intervalId = user ? setInterval(() => {
this.usersRepository.update(user.id, { this.usersRepository.update(user.id, {
lastActiveDate: new Date(), lastActiveDate: new Date(),

View File

@ -61,11 +61,6 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
await this.usersRepository.update(user.id, { await this.usersRepository.update(user.id, {
isDeleted: true, isDeleted: true,
}); });
if (this.userEntityService.isLocalUser(user)) {
// Terminate streaming
this.globalEventService.publishUserEvent(user.id, 'terminate', {});
}
}); });
} }
} }

View File

@ -62,11 +62,6 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
targetId: user.id, targetId: user.id,
}); });
// Terminate streaming
if (this.userEntityService.isLocalUser(user)) {
this.globalEventService.publishUserEvent(user.id, 'terminate', {});
}
(async () => { (async () => {
await this.userSuspendService.doPostSuspend(user).catch(e => {}); await this.userSuspendService.doPostSuspend(user).catch(e => {});
await this.unFollowAll(user).catch(e => {}); await this.unFollowAll(user).catch(e => {});

View File

@ -41,7 +41,6 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
private channelFollowingsRepository: ChannelFollowingsRepository, private channelFollowingsRepository: ChannelFollowingsRepository,
private idService: IdService, private idService: IdService,
private globalEventService: GlobalEventService,
) { ) {
super(meta, paramDef, async (ps, me) => { super(meta, paramDef, async (ps, me) => {
const channel = await this.channelsRepository.findOneBy({ const channel = await this.channelsRepository.findOneBy({
@ -58,8 +57,6 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
followerId: me.id, followerId: me.id,
followeeId: channel.id, followeeId: channel.id,
}); });
this.globalEventService.publishUserEvent(me.id, 'followChannel', channel);
}); });
} }
} }

View File

@ -38,8 +38,6 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
@Inject(DI.channelFollowingsRepository) @Inject(DI.channelFollowingsRepository)
private channelFollowingsRepository: ChannelFollowingsRepository, private channelFollowingsRepository: ChannelFollowingsRepository,
private globalEventService: GlobalEventService,
) { ) {
super(meta, paramDef, async (ps, me) => { super(meta, paramDef, async (ps, me) => {
const channel = await this.channelsRepository.findOneBy({ const channel = await this.channelsRepository.findOneBy({
@ -54,8 +52,6 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
followerId: me.id, followerId: me.id,
followeeId: channel.id, followeeId: channel.id,
}); });
this.globalEventService.publishUserEvent(me.id, 'unfollowChannel', channel);
}); });
} }
} }

View File

@ -54,11 +54,6 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
// Publish event // Publish event
this.globalEventService.publishInternalEvent('userTokenRegenerated', { id: me.id, oldToken, newToken }); this.globalEventService.publishInternalEvent('userTokenRegenerated', { id: me.id, oldToken, newToken });
this.globalEventService.publishMainStream(me.id, 'myTokenRegenerated'); this.globalEventService.publishMainStream(me.id, 'myTokenRegenerated');
// Terminate streaming
setTimeout(() => {
this.globalEventService.publishUserEvent(me.id, 'terminate', {});
}, 5000);
}); });
} }
} }

View File

@ -35,9 +35,6 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
id: ps.tokenId, id: ps.tokenId,
userId: me.id, userId: me.id,
}); });
// Terminate streaming
this.globalEventService.publishUserEvent(me.id, 'terminate');
} }
}); });
} }

View File

@ -284,7 +284,6 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
// Publish meUpdated event // Publish meUpdated event
this.globalEventService.publishMainStream(user.id, 'meUpdated', iObj); this.globalEventService.publishMainStream(user.id, 'meUpdated', iObj);
this.globalEventService.publishUserEvent(user.id, 'updateUserProfile', updatedProfile);
// 鍵垢を解除したとき、溜まっていたフォローリクエストがあるならすべて承認 // 鍵垢を解除したとき、溜まっていたフォローリクエストがあるならすべて承認
if (user.isLocked && ps.isLocked === false) { if (user.isLocked && ps.isLocked === false) {

View File

@ -1,13 +1,10 @@
import { Inject, Injectable } from '@nestjs/common'; import { Inject, Injectable } from '@nestjs/common';
import ms from 'ms'; import ms from 'ms';
import { Endpoint } from '@/server/api/endpoint-base.js'; import { Endpoint } from '@/server/api/endpoint-base.js';
import { IdService } from '@/core/IdService.js';
import type { MutingsRepository } from '@/models/index.js'; import type { MutingsRepository } from '@/models/index.js';
import type { Muting } from '@/models/entities/Muting.js';
import { GlobalEventService } from '@/core/GlobalEventService.js';
import { DI } from '@/di-symbols.js'; import { DI } from '@/di-symbols.js';
import { GetterService } from '@/server/api/GetterService.js'; import { GetterService } from '@/server/api/GetterService.js';
import { CacheService } from '@/core/CacheService.js'; import { UserMutingService } from '@/core/UserMutingService.js';
import { ApiError } from '../../error.js'; import { ApiError } from '../../error.js';
export const meta = { export const meta = {
@ -63,10 +60,8 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
@Inject(DI.mutingsRepository) @Inject(DI.mutingsRepository)
private mutingsRepository: MutingsRepository, private mutingsRepository: MutingsRepository,
private globalEventService: GlobalEventService,
private getterService: GetterService, private getterService: GetterService,
private idService: IdService, private userMutingService: UserMutingService,
private cacheService: CacheService,
) { ) {
super(meta, paramDef, async (ps, me) => { super(meta, paramDef, async (ps, me) => {
const muter = me; const muter = me;
@ -96,17 +91,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
return; return;
} }
// Create mute await this.userMutingService.mute(muter, mutee, ps.expiresAt ? new Date(ps.expiresAt) : null);
await this.mutingsRepository.insert({
id: this.idService.genId(),
createdAt: new Date(),
expiresAt: ps.expiresAt ? new Date(ps.expiresAt) : null,
muterId: muter.id,
muteeId: mutee.id,
} as Muting);
this.cacheService.userMutingsCache.delete(muter.id);
this.globalEventService.publishUserEvent(me.id, 'mute', mutee);
}); });
} }
} }

View File

@ -1,10 +1,10 @@
import { Inject, Injectable } from '@nestjs/common'; import { Inject, Injectable } from '@nestjs/common';
import { Endpoint } from '@/server/api/endpoint-base.js'; import { Endpoint } from '@/server/api/endpoint-base.js';
import type { MutingsRepository } from '@/models/index.js'; import type { MutingsRepository } from '@/models/index.js';
import { GlobalEventService } from '@/core/GlobalEventService.js';
import { DI } from '@/di-symbols.js'; import { DI } from '@/di-symbols.js';
import { ApiError } from '../../error.js';
import { GetterService } from '@/server/api/GetterService.js'; import { GetterService } from '@/server/api/GetterService.js';
import { UserMutingService } from '@/core/UserMutingService.js';
import { ApiError } from '../../error.js';
export const meta = { export const meta = {
tags: ['account'], tags: ['account'],
@ -49,7 +49,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
@Inject(DI.mutingsRepository) @Inject(DI.mutingsRepository)
private mutingsRepository: MutingsRepository, private mutingsRepository: MutingsRepository,
private globalEventService: GlobalEventService, private userMutingService: UserMutingService,
private getterService: GetterService, private getterService: GetterService,
) { ) {
super(meta, paramDef, async (ps, me) => { super(meta, paramDef, async (ps, me) => {
@ -76,12 +76,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
throw new ApiError(meta.errors.notMuting); throw new ApiError(meta.errors.notMuting);
} }
// Delete mute await this.userMutingService.unmute([exist]);
await this.mutingsRepository.delete({
id: exist.id,
});
this.globalEventService.publishUserEvent(me.id, 'unmute', mutee);
}); });
} }
} }

View File

@ -80,8 +80,6 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
await this.renoteMutingsRepository.delete({ await this.renoteMutingsRepository.delete({
id: exist.id, id: exist.id,
}); });
// publishUserEvent(user.id, 'unmute', mutee);
}); });
} }
} }

View File

@ -23,16 +23,16 @@ export default abstract class Channel {
return this.connection.following; return this.connection.following;
} }
protected get muting() { protected get userIdsWhoMeMuting() {
return this.connection.muting; return this.connection.userIdsWhoMeMuting;
} }
protected get renoteMuting() { protected get userIdsWhoMeMutingRenotes() {
return this.connection.renoteMuting; return this.connection.userIdsWhoMeMutingRenotes;
} }
protected get blocking() { protected get userIdsWhoBlockingMe() {
return this.connection.blocking; return this.connection.userIdsWhoBlockingMe;
} }
protected get followingChannels() { protected get followingChannels() {

View File

@ -35,11 +35,11 @@ class AntennaChannel extends Channel {
const note = await this.noteEntityService.pack(data.body.id, this.user, { detail: true }); const note = await this.noteEntityService.pack(data.body.id, this.user, { detail: true });
// 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
if (isUserRelated(note, this.muting)) return; if (isUserRelated(note, this.userIdsWhoMeMuting)) return;
// 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する
if (isUserRelated(note, this.blocking)) return; if (isUserRelated(note, this.userIdsWhoBlockingMe)) return;
if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return; if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return;
this.connection.cacheNote(note); this.connection.cacheNote(note);

View File

@ -47,11 +47,11 @@ class ChannelChannel extends Channel {
} }
// 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
if (isUserRelated(note, this.muting)) return; if (isUserRelated(note, this.userIdsWhoMeMuting)) return;
// 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する
if (isUserRelated(note, this.blocking)) return; if (isUserRelated(note, this.userIdsWhoBlockingMe)) return;
if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return; if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return;
this.connection.cacheNote(note); this.connection.cacheNote(note);

View File

@ -64,11 +64,11 @@ class GlobalTimelineChannel extends Channel {
if (isInstanceMuted(note, new Set<string>(this.userProfile?.mutedInstances ?? []))) return; if (isInstanceMuted(note, new Set<string>(this.userProfile?.mutedInstances ?? []))) return;
// 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
if (isUserRelated(note, this.muting)) return; if (isUserRelated(note, this.userIdsWhoMeMuting)) return;
// 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する
if (isUserRelated(note, this.blocking)) return; if (isUserRelated(note, this.userIdsWhoBlockingMe)) return;
if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return; if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return;
// 流れてきたNoteがミュートすべきNoteだったら無視する // 流れてきたNoteがミュートすべきNoteだったら無視する
// TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある) // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある)

View File

@ -46,11 +46,11 @@ class HashtagChannel extends Channel {
} }
// 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
if (isUserRelated(note, this.muting)) return; if (isUserRelated(note, this.userIdsWhoMeMuting)) return;
// 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する
if (isUserRelated(note, this.blocking)) return; if (isUserRelated(note, this.userIdsWhoBlockingMe)) return;
if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return; if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return;
this.connection.cacheNote(note); this.connection.cacheNote(note);

View File

@ -24,7 +24,6 @@ class HomeTimelineChannel extends Channel {
@bindThis @bindThis
public async init(params: any) { public async init(params: any) {
// Subscribe events
this.subscriber.on('notesStream', this.onNote); this.subscriber.on('notesStream', this.onNote);
} }
@ -38,7 +37,7 @@ class HomeTimelineChannel extends Channel {
} }
// Ignore notes from instances the user has muted // Ignore notes from instances the user has muted
if (isInstanceMuted(note, new Set<string>(this.userProfile?.mutedInstances ?? []))) return; if (isInstanceMuted(note, new Set<string>(this.userProfile!.mutedInstances ?? []))) return;
if (['followers', 'specified'].includes(note.visibility)) { if (['followers', 'specified'].includes(note.visibility)) {
note = await this.noteEntityService.pack(note.id, this.user!, { note = await this.noteEntityService.pack(note.id, this.user!, {
@ -71,18 +70,18 @@ class HomeTimelineChannel extends Channel {
} }
// 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
if (isUserRelated(note, this.muting)) return; if (isUserRelated(note, this.userIdsWhoMeMuting)) return;
// 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する
if (isUserRelated(note, this.blocking)) return; if (isUserRelated(note, this.userIdsWhoBlockingMe)) return;
if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return; if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return;
// 流れてきたNoteがミュートすべきNoteだったら無視する // 流れてきたNoteがミュートすべきNoteだったら無視する
// TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある) // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある)
// 現状では、ワードミュートにおけるMutedNoteレコードの追加処理はストリーミングに流す処理と並列で行われるため、 // 現状では、ワードミュートにおけるMutedNoteレコードの追加処理はストリーミングに流す処理と並列で行われるため、
// レコードが追加されるNoteでも追加されるより先にここのストリーミングの処理に到達することが起こる。 // レコードが追加されるNoteでも追加されるより先にここのストリーミングの処理に到達することが起こる。
// そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる // そのためレコードが存在するかのチェックでは不十分なので、改めてcheckWordMuteを呼んでいる
if (this.userProfile && await checkWordMute(note, this.user, this.userProfile.mutedWords)) return; if (await checkWordMute(note, this.user, this.userProfile!.mutedWords)) return;
this.connection.cacheNote(note); this.connection.cacheNote(note);

View File

@ -72,7 +72,7 @@ class HybridTimelineChannel extends Channel {
} }
// Ignore notes from instances the user has muted // Ignore notes from instances the user has muted
if (isInstanceMuted(note, new Set<string>(this.userProfile?.mutedInstances ?? []))) return; if (isInstanceMuted(note, new Set<string>(this.userProfile!.mutedInstances ?? []))) return;
// 関係ない返信は除外 // 関係ない返信は除外
if (note.reply && !this.user!.showTimelineReplies) { if (note.reply && !this.user!.showTimelineReplies) {
@ -82,11 +82,11 @@ class HybridTimelineChannel extends Channel {
} }
// 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
if (isUserRelated(note, this.muting)) return; if (isUserRelated(note, this.userIdsWhoMeMuting)) return;
// 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する
if (isUserRelated(note, this.blocking)) return; if (isUserRelated(note, this.userIdsWhoBlockingMe)) return;
if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return; if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return;
// 流れてきたNoteがミュートすべきNoteだったら無視する // 流れてきたNoteがミュートすべきNoteだったら無視する
// TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある) // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある)

View File

@ -61,11 +61,11 @@ class LocalTimelineChannel extends Channel {
} }
// 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
if (isUserRelated(note, this.muting)) return; if (isUserRelated(note, this.userIdsWhoMeMuting)) return;
// 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する
if (isUserRelated(note, this.blocking)) return; if (isUserRelated(note, this.userIdsWhoBlockingMe)) return;
if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return; if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return;
// 流れてきたNoteがミュートすべきNoteだったら無視する // 流れてきたNoteがミュートすべきNoteだったら無視する
// TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある) // TODO: 将来的には、単にMutedNoteテーブルにレコードがあるかどうかで判定したい(以下の理由により難しそうではある)

View File

@ -26,7 +26,7 @@ class MainChannel extends Channel {
case 'notification': { case 'notification': {
// Ignore notifications from instances the user has muted // Ignore notifications from instances the user has muted
if (isUserFromMutedInstance(data.body, new Set<string>(this.userProfile?.mutedInstances ?? []))) return; if (isUserFromMutedInstance(data.body, new Set<string>(this.userProfile?.mutedInstances ?? []))) return;
if (data.body.userId && this.muting.has(data.body.userId)) return; if (data.body.userId && this.userIdsWhoMeMuting.has(data.body.userId)) return;
if (data.body.note && data.body.note.isHidden) { if (data.body.note && data.body.note.isHidden) {
const note = await this.noteEntityService.pack(data.body.note.id, this.user, { const note = await this.noteEntityService.pack(data.body.note.id, this.user, {
@ -40,7 +40,7 @@ class MainChannel extends Channel {
case 'mention': { case 'mention': {
if (isInstanceMuted(data.body, new Set<string>(this.userProfile?.mutedInstances ?? []))) return; if (isInstanceMuted(data.body, new Set<string>(this.userProfile?.mutedInstances ?? []))) return;
if (this.muting.has(data.body.userId)) return; if (this.userIdsWhoMeMuting.has(data.body.userId)) return;
if (data.body.isHidden) { if (data.body.isHidden) {
const note = await this.noteEntityService.pack(data.body.id, this.user, { const note = await this.noteEntityService.pack(data.body.id, this.user, {
detail: true, detail: true,

View File

@ -89,11 +89,11 @@ class UserListChannel extends Channel {
} }
// 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する // 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
if (isUserRelated(note, this.muting)) return; if (isUserRelated(note, this.userIdsWhoMeMuting)) return;
// 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する // 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する
if (isUserRelated(note, this.blocking)) return; if (isUserRelated(note, this.userIdsWhoBlockingMe)) return;
if (note.renote && !note.text && isUserRelated(note, this.renoteMuting)) return; if (note.renote && !note.text && isUserRelated(note, this.userIdsWhoMeMutingRenotes)) return;
this.send('note', note); this.send('note', note);
} }

View File

@ -1,13 +1,11 @@
import type { User } from '@/models/entities/User.js'; import type { User } from '@/models/entities/User.js';
import type { Channel as ChannelModel } from '@/models/entities/Channel.js';
import type { FollowingsRepository, MutingsRepository, RenoteMutingsRepository, UserProfilesRepository, ChannelFollowingsRepository, BlockingsRepository } from '@/models/index.js';
import type { AccessToken } from '@/models/entities/AccessToken.js'; import type { AccessToken } from '@/models/entities/AccessToken.js';
import type { UserProfile } from '@/models/entities/UserProfile.js';
import type { Packed } from '@/misc/json-schema.js'; import type { Packed } from '@/misc/json-schema.js';
import type { GlobalEventService } from '@/core/GlobalEventService.js';
import type { NoteReadService } from '@/core/NoteReadService.js'; import type { NoteReadService } from '@/core/NoteReadService.js';
import type { NotificationService } from '@/core/NotificationService.js'; import type { NotificationService } from '@/core/NotificationService.js';
import { bindThis } from '@/decorators.js'; import { bindThis } from '@/decorators.js';
import { CacheService } from '@/core/CacheService.js';
import { UserProfile } from '@/models/index.js';
import type { ChannelsService } from './ChannelsService.js'; import type { ChannelsService } from './ChannelsService.js';
import type * as websocket from 'websocket'; import type * as websocket from 'websocket';
import type { EventEmitter } from 'events'; import type { EventEmitter } from 'events';
@ -19,106 +17,71 @@ import type { StreamEventEmitter, StreamMessages } from './types.js';
*/ */
export default class Connection { export default class Connection {
public user?: User; public user?: User;
public userProfile?: UserProfile | null;
public following: Set<User['id']> = new Set();
public muting: Set<User['id']> = new Set();
public renoteMuting: Set<User['id']> = new Set();
public blocking: Set<User['id']> = new Set(); // "被"blocking
public followingChannels: Set<ChannelModel['id']> = new Set();
public token?: AccessToken; public token?: AccessToken;
private wsConnection: websocket.connection; private wsConnection: websocket.connection;
public subscriber: StreamEventEmitter; public subscriber: StreamEventEmitter;
private channels: Channel[] = []; private channels: Channel[] = [];
private subscribingNotes: any = {}; private subscribingNotes: any = {};
private cachedNotes: Packed<'Note'>[] = []; private cachedNotes: Packed<'Note'>[] = [];
public userProfile: UserProfile | null = null;
public following: Set<string> = new Set();
public followingChannels: Set<string> = new Set();
public userIdsWhoMeMuting: Set<string> = new Set();
public userIdsWhoBlockingMe: Set<string> = new Set();
public userIdsWhoMeMutingRenotes: Set<string> = new Set();
private fetchIntervalId: NodeJS.Timer | null = null;
constructor( constructor(
private followingsRepository: FollowingsRepository,
private mutingsRepository: MutingsRepository,
private renoteMutingsRepository: RenoteMutingsRepository,
private blockingsRepository: BlockingsRepository,
private channelFollowingsRepository: ChannelFollowingsRepository,
private userProfilesRepository: UserProfilesRepository,
private channelsService: ChannelsService, private channelsService: ChannelsService,
private globalEventService: GlobalEventService,
private noteReadService: NoteReadService, private noteReadService: NoteReadService,
private notificationService: NotificationService, private notificationService: NotificationService,
private cacheService: CacheService,
wsConnection: websocket.connection,
subscriber: EventEmitter, subscriber: EventEmitter,
user: User | null | undefined, user: User | null | undefined,
token: AccessToken | null | undefined, token: AccessToken | null | undefined,
) { ) {
this.wsConnection = wsConnection;
this.subscriber = subscriber; this.subscriber = subscriber;
if (user) this.user = user; if (user) this.user = user;
if (token) this.token = token; if (token) this.token = token;
}
//this.onWsConnectionMessage = this.onWsConnectionMessage.bind(this); @bindThis
//this.onUserEvent = this.onUserEvent.bind(this); public async fetch() {
//this.onNoteStreamMessage = this.onNoteStreamMessage.bind(this); if (this.user == null) return;
//this.onBroadcastMessage = this.onBroadcastMessage.bind(this); const [userProfile, following, followingChannels, userIdsWhoMeMuting, userIdsWhoBlockingMe, userIdsWhoMeMutingRenotes] = await Promise.all([
this.cacheService.userProfileCache.fetch(this.user.id),
this.cacheService.userFollowingsCache.fetch(this.user.id),
this.cacheService.userFollowingChannelsCache.fetch(this.user.id),
this.cacheService.userMutingsCache.fetch(this.user.id),
this.cacheService.userBlockedCache.fetch(this.user.id),
this.cacheService.renoteMutingsCache.fetch(this.user.id),
]);
this.userProfile = userProfile;
this.following = following;
this.followingChannels = followingChannels;
this.userIdsWhoMeMuting = userIdsWhoMeMuting;
this.userIdsWhoBlockingMe = userIdsWhoBlockingMe;
this.userIdsWhoMeMutingRenotes = userIdsWhoMeMutingRenotes;
}
@bindThis
public async init() {
if (this.user != null) {
await this.fetch();
this.fetchIntervalId = setInterval(this.fetch, 1000 * 10);
}
}
@bindThis
public async init2(wsConnection: websocket.connection) {
this.wsConnection = wsConnection;
this.wsConnection.on('message', this.onWsConnectionMessage); this.wsConnection.on('message', this.onWsConnectionMessage);
this.subscriber.on('broadcast', data => { this.subscriber.on('broadcast', data => {
this.onBroadcastMessage(data); this.onBroadcastMessage(data);
}); });
if (this.user) {
this.updateFollowing();
this.updateMuting();
this.updateRenoteMuting();
this.updateBlocking();
this.updateFollowingChannels();
this.updateUserProfile();
this.subscriber.on(`user:${this.user.id}`, this.onUserEvent);
}
}
@bindThis
private onUserEvent(data: StreamMessages['user']['payload']) { // { type, body }と展開するとそれぞれ型が分離してしまう
switch (data.type) {
case 'follow':
this.following.add(data.body.id);
break;
case 'unfollow':
this.following.delete(data.body.id);
break;
case 'mute':
this.muting.add(data.body.id);
break;
case 'unmute':
this.muting.delete(data.body.id);
break;
// TODO: renote mute events
// TODO: block events
case 'followChannel':
this.followingChannels.add(data.body.id);
break;
case 'unfollowChannel':
this.followingChannels.delete(data.body.id);
break;
case 'updateUserProfile':
this.userProfile = data.body;
break;
case 'terminate':
this.wsConnection.close();
this.dispose();
break;
default:
break;
}
} }
/** /**
@ -318,78 +281,12 @@ export default class Connection {
} }
} }
@bindThis
private async updateFollowing() {
const followings = await this.followingsRepository.find({
where: {
followerId: this.user!.id,
},
select: ['followeeId'],
});
this.following = new Set<string>(followings.map(x => x.followeeId));
}
@bindThis
private async updateMuting() {
const mutings = await this.mutingsRepository.find({
where: {
muterId: this.user!.id,
},
select: ['muteeId'],
});
this.muting = new Set<string>(mutings.map(x => x.muteeId));
}
@bindThis
private async updateRenoteMuting() {
const renoteMutings = await this.renoteMutingsRepository.find({
where: {
muterId: this.user!.id,
},
select: ['muteeId'],
});
this.renoteMuting = new Set<string>(renoteMutings.map(x => x.muteeId));
}
@bindThis
private async updateBlocking() { // ここでいうBlockingは被Blockingの意
const blockings = await this.blockingsRepository.find({
where: {
blockeeId: this.user!.id,
},
select: ['blockerId'],
});
this.blocking = new Set<string>(blockings.map(x => x.blockerId));
}
@bindThis
private async updateFollowingChannels() {
const followings = await this.channelFollowingsRepository.find({
where: {
followerId: this.user!.id,
},
select: ['followeeId'],
});
this.followingChannels = new Set<string>(followings.map(x => x.followeeId));
}
@bindThis
private async updateUserProfile() {
this.userProfile = await this.userProfilesRepository.findOneBy({
userId: this.user!.id,
});
}
/** /**
* *
*/ */
@bindThis @bindThis
public dispose() { public dispose() {
if (this.fetchIntervalId) clearInterval(this.fetchIntervalId);
for (const c of this.channels.filter(c => c.dispose)) { for (const c of this.channels.filter(c => c.dispose)) {
if (c.dispose) c.dispose(); if (c.dispose) c.dispose();
} }

View File

@ -38,6 +38,11 @@ export interface InternalStreamTypes {
antennaDeleted: Antenna; antennaDeleted: Antenna;
antennaUpdated: Antenna; antennaUpdated: Antenna;
metaUpdated: Meta; metaUpdated: Meta;
followChannel: { userId: User['id']; channelId: Channel['id']; };
unfollowChannel: { userId: User['id']; channelId: Channel['id']; };
updateUserProfile: UserProfile;
mute: { muterId: User['id']; muteeId: User['id']; };
unmute: { muterId: User['id']; muteeId: User['id']; };
} }
export interface BroadcastTypes { export interface BroadcastTypes {
@ -56,18 +61,6 @@ export interface BroadcastTypes {
}; };
} }
export interface UserStreamTypes {
terminate: Record<string, unknown>;
followChannel: Channel;
unfollowChannel: Channel;
updateUserProfile: UserProfile;
mute: User;
unmute: User;
follow: Packed<'UserDetailedNotMe'>;
unfollow: Packed<'User'>;
userAdded: Packed<'User'>;
}
export interface MainStreamTypes { export interface MainStreamTypes {
notification: Packed<'Notification'>; notification: Packed<'Notification'>;
mention: Packed<'Note'>; mention: Packed<'Note'>;
@ -200,10 +193,6 @@ export type StreamMessages = {
name: 'broadcast'; name: 'broadcast';
payload: EventUnionFromDictionary<SerializedAll<BroadcastTypes>>; payload: EventUnionFromDictionary<SerializedAll<BroadcastTypes>>;
}; };
user: {
name: `user:${User['id']}`;
payload: EventUnionFromDictionary<SerializedAll<UserStreamTypes>>;
};
main: { main: {
name: `mainStream:${User['id']}`; name: `mainStream:${User['id']}`;
payload: EventUnionFromDictionary<SerializedAll<MainStreamTypes>>; payload: EventUnionFromDictionary<SerializedAll<MainStreamTypes>>;