Merge pull request #76 from MisskeyIO/fix-redis
fix(backend): イベント用redis分離が上手く動かない問題を修正
This commit is contained in:
		
						commit
						23db82d400
					
				|  | @ -37,8 +37,24 @@ const $redis: Provider = { | |||
| 	inject: [DI.config], | ||||
| }; | ||||
| 
 | ||||
| const $redisForPubsub: Provider = { | ||||
| 	provide: DI.redisForPubsub, | ||||
| const $redisForPub: Provider = { | ||||
| 	provide: DI.redisForPub, | ||||
| 	useFactory: (config) => { | ||||
| 		const redis = new Redis({ | ||||
| 			port: config.redisForPubsub.port, | ||||
| 			host: config.redisForPubsub.host, | ||||
| 			family: config.redisForPubsub.family == null ? 0 : config.redisForPubsub.family, | ||||
| 			password: config.redisForPubsub.pass, | ||||
| 			keyPrefix: `${config.redisForPubsub.prefix}:`, | ||||
| 			db: config.redisForPubsub.db ?? 0, | ||||
| 		}); | ||||
| 		return redis; | ||||
| 	}, | ||||
| 	inject: [DI.config], | ||||
| }; | ||||
| 
 | ||||
| const $redisForSub: Provider = { | ||||
| 	provide: DI.redisForSub, | ||||
| 	useFactory: (config) => { | ||||
| 		const redis = new Redis({ | ||||
| 			port: config.redisForPubsub.port, | ||||
|  | @ -57,14 +73,15 @@ const $redisForPubsub: Provider = { | |||
| @Global() | ||||
| @Module({ | ||||
| 	imports: [RepositoryModule], | ||||
| 	providers: [$config, $db, $redis, $redisForPubsub], | ||||
| 	exports: [$config, $db, $redis, $redisForPubsub, RepositoryModule], | ||||
| 	providers: [$config, $db, $redis, $redisForPub, $redisForSub], | ||||
| 	exports: [$config, $db, $redis, $redisForPub, $redisForSub, RepositoryModule], | ||||
| }) | ||||
| export class GlobalModule implements OnApplicationShutdown { | ||||
| 	constructor( | ||||
| 		@Inject(DI.db) private db: DataSource, | ||||
| 		@Inject(DI.redis) private redisClient: Redis.Redis, | ||||
| 		@Inject(DI.redisForPubsub) private redisForPubsub: Redis.Redis, | ||||
| 		@Inject(DI.redisForPub) private redisForPub: Redis.Redis, | ||||
| 		@Inject(DI.redisForSub) private redisForSub: Redis.Redis, | ||||
| 	) {} | ||||
| 
 | ||||
| 	async onApplicationShutdown(signal: string): Promise<void> { | ||||
|  | @ -79,7 +96,8 @@ export class GlobalModule implements OnApplicationShutdown { | |||
| 		await Promise.all([ | ||||
| 			this.db.destroy(), | ||||
| 			this.redisClient.disconnect(), | ||||
| 			this.redisForPubsub.disconnect(), | ||||
| 			this.redisForPub.disconnect(), | ||||
| 			this.redisForSub.disconnect(), | ||||
| 		]); | ||||
| 	} | ||||
| } | ||||
|  |  | |||
|  | @ -27,8 +27,8 @@ export class AntennaService implements OnApplicationShutdown { | |||
| 		@Inject(DI.redis) | ||||
| 		private redisClient: Redis.Redis, | ||||
| 
 | ||||
| 		@Inject(DI.redisForPubsub) | ||||
| 		private redisForPubsub: Redis.Redis, | ||||
| 		@Inject(DI.redisForSub) | ||||
| 		private redisForSub: Redis.Redis, | ||||
| 
 | ||||
| 		@Inject(DI.mutingsRepository) | ||||
| 		private mutingsRepository: MutingsRepository, | ||||
|  | @ -52,12 +52,12 @@ export class AntennaService implements OnApplicationShutdown { | |||
| 		this.antennasFetched = false; | ||||
| 		this.antennas = []; | ||||
| 
 | ||||
| 		this.redisForPubsub.on('message', this.onRedisMessage); | ||||
| 		this.redisForSub.on('message', this.onRedisMessage); | ||||
| 	} | ||||
| 
 | ||||
| 	@bindThis | ||||
| 	public onApplicationShutdown(signal?: string | undefined) { | ||||
| 		this.redisForPubsub.off('message', this.onRedisMessage); | ||||
| 		this.redisForSub.off('message', this.onRedisMessage); | ||||
| 	} | ||||
| 
 | ||||
| 	@bindThis | ||||
|  |  | |||
|  | @ -27,8 +27,8 @@ export class CacheService implements OnApplicationShutdown { | |||
| 		@Inject(DI.redis) | ||||
| 		private redisClient: Redis.Redis, | ||||
| 
 | ||||
| 		@Inject(DI.redisForPubsub) | ||||
| 		private redisForPubsub: Redis.Redis, | ||||
| 		@Inject(DI.redisForSub) | ||||
| 		private redisForSub: Redis.Redis, | ||||
| 
 | ||||
| 		@Inject(DI.usersRepository) | ||||
| 		private usersRepository: UsersRepository, | ||||
|  | @ -116,7 +116,7 @@ export class CacheService implements OnApplicationShutdown { | |||
| 			fromRedisConverter: (value) => new Set(JSON.parse(value)), | ||||
| 		}); | ||||
| 
 | ||||
| 		this.redisForPubsub.on('message', this.onMessage); | ||||
| 		this.redisForSub.on('message', this.onMessage); | ||||
| 	} | ||||
| 
 | ||||
| 	@bindThis | ||||
|  | @ -167,6 +167,6 @@ export class CacheService implements OnApplicationShutdown { | |||
| 
 | ||||
| 	@bindThis | ||||
| 	public onApplicationShutdown(signal?: string | undefined) { | ||||
| 		this.redisForPubsub.off('message', this.onMessage); | ||||
| 		this.redisForSub.off('message', this.onMessage); | ||||
| 	} | ||||
| } | ||||
|  |  | |||
|  | @ -26,8 +26,8 @@ export class GlobalEventService { | |||
| 		@Inject(DI.config) | ||||
| 		private config: Config, | ||||
| 
 | ||||
| 		@Inject(DI.redis) | ||||
| 		private redisClient: Redis.Redis, | ||||
| 		@Inject(DI.redisForPub) | ||||
| 		private redisForPub: Redis.Redis, | ||||
| 	) { | ||||
| 	} | ||||
| 
 | ||||
|  | @ -37,7 +37,7 @@ export class GlobalEventService { | |||
| 			{ type: type, body: null } : | ||||
| 			{ type: type, body: value }; | ||||
| 
 | ||||
| 		this.redisClient.publish(this.config.host, JSON.stringify({ | ||||
| 		this.redisForPub.publish(this.config.host, JSON.stringify({ | ||||
| 			channel: channel, | ||||
| 			message: message, | ||||
| 		})); | ||||
|  |  | |||
|  | @ -14,8 +14,8 @@ export class MetaService implements OnApplicationShutdown { | |||
| 	private intervalId: NodeJS.Timer; | ||||
| 
 | ||||
| 	constructor( | ||||
| 		@Inject(DI.redisForPubsub) | ||||
| 		private redisForPubsub: Redis.Redis, | ||||
| 		@Inject(DI.redisForSub) | ||||
| 		private redisForSub: Redis.Redis, | ||||
| 
 | ||||
| 		@Inject(DI.db) | ||||
| 		private db: DataSource, | ||||
|  | @ -33,7 +33,7 @@ export class MetaService implements OnApplicationShutdown { | |||
| 			}, 1000 * 60 * 5); | ||||
| 		} | ||||
| 
 | ||||
| 		this.redisForPubsub.on('message', this.onMessage); | ||||
| 		this.redisForSub.on('message', this.onMessage); | ||||
| 	} | ||||
| 
 | ||||
| 	@bindThis | ||||
|  | @ -122,6 +122,6 @@ export class MetaService implements OnApplicationShutdown { | |||
| 	@bindThis | ||||
| 	public onApplicationShutdown(signal?: string | undefined) { | ||||
| 		clearInterval(this.intervalId); | ||||
| 		this.redisForPubsub.off('message', this.onMessage); | ||||
| 		this.redisForSub.off('message', this.onMessage); | ||||
| 	} | ||||
| } | ||||
|  |  | |||
|  | @ -64,8 +64,8 @@ export class RoleService implements OnApplicationShutdown { | |||
| 	public static NotAssignedError = class extends Error {}; | ||||
| 
 | ||||
| 	constructor( | ||||
| 		@Inject(DI.redisForPubsub) | ||||
| 		private redisForPubsub: Redis.Redis, | ||||
| 		@Inject(DI.redisForSub) | ||||
| 		private redisForSub: Redis.Redis, | ||||
| 
 | ||||
| 		@Inject(DI.usersRepository) | ||||
| 		private usersRepository: UsersRepository, | ||||
|  | @ -87,7 +87,7 @@ export class RoleService implements OnApplicationShutdown { | |||
| 		this.rolesCache = new MemorySingleCache<Role[]>(1000 * 60 * 60 * 1); | ||||
| 		this.roleAssignmentByUserIdCache = new MemoryKVCache<RoleAssignment[]>(1000 * 60 * 60 * 1); | ||||
| 
 | ||||
| 		this.redisForPubsub.on('message', this.onMessage); | ||||
| 		this.redisForSub.on('message', this.onMessage); | ||||
| 	} | ||||
| 
 | ||||
| 	@bindThis | ||||
|  | @ -400,6 +400,6 @@ export class RoleService implements OnApplicationShutdown { | |||
| 
 | ||||
| 	@bindThis | ||||
| 	public onApplicationShutdown(signal?: string | undefined) { | ||||
| 		this.redisForPubsub.off('message', this.onMessage); | ||||
| 		this.redisForSub.off('message', this.onMessage); | ||||
| 	} | ||||
| } | ||||
|  |  | |||
|  | @ -13,14 +13,14 @@ export class WebhookService implements OnApplicationShutdown { | |||
| 	private webhooks: Webhook[] = []; | ||||
| 
 | ||||
| 	constructor( | ||||
| 		@Inject(DI.redisForPubsub) | ||||
| 		private redisForPubsub: Redis.Redis, | ||||
| 		@Inject(DI.redisForSub) | ||||
| 		private redisForSub: Redis.Redis, | ||||
| 
 | ||||
| 		@Inject(DI.webhooksRepository) | ||||
| 		private webhooksRepository: WebhooksRepository, | ||||
| 	) { | ||||
| 		//this.onMessage = this.onMessage.bind(this);
 | ||||
| 		this.redisForPubsub.on('message', this.onMessage); | ||||
| 		this.redisForSub.on('message', this.onMessage); | ||||
| 	} | ||||
| 
 | ||||
| 	@bindThis | ||||
|  | @ -82,6 +82,6 @@ export class WebhookService implements OnApplicationShutdown { | |||
| 
 | ||||
| 	@bindThis | ||||
| 	public onApplicationShutdown(signal?: string | undefined) { | ||||
| 		this.redisForPubsub.off('message', this.onMessage); | ||||
| 		this.redisForSub.off('message', this.onMessage); | ||||
| 	} | ||||
| } | ||||
|  |  | |||
|  | @ -2,7 +2,8 @@ export const DI = { | |||
| 	config: Symbol('config'), | ||||
| 	db: Symbol('db'), | ||||
| 	redis: Symbol('redis'), | ||||
| 	redisForPubsub: Symbol('redisForPubsub'), | ||||
| 	redisForPub: Symbol('redisForPub'), | ||||
| 	redisForSub: Symbol('redisForSub'), | ||||
| 
 | ||||
| 	//#region Repositories
 | ||||
| 	usersRepository: Symbol('usersRepository'), | ||||
|  |  | |||
|  | @ -22,8 +22,8 @@ export class StreamingApiServerService { | |||
| 		@Inject(DI.config) | ||||
| 		private config: Config, | ||||
| 
 | ||||
| 		@Inject(DI.redisForPubsub) | ||||
| 		private redisForPubsub: Redis.Redis, | ||||
| 		@Inject(DI.redisForSub) | ||||
| 		private redisForSub: Redis.Redis, | ||||
| 
 | ||||
| 		@Inject(DI.usersRepository) | ||||
| 		private usersRepository: UsersRepository, | ||||
|  | @ -81,7 +81,7 @@ export class StreamingApiServerService { | |||
| 				ev.emit(parsed.channel, parsed.message); | ||||
| 			} | ||||
| 
 | ||||
| 			this.redisForPubsub.on('message', onRedisMessage); | ||||
| 			this.redisForSub.on('message', onRedisMessage); | ||||
| 
 | ||||
| 			const main = new MainStreamConnection( | ||||
| 				this.channelsService, | ||||
|  | @ -111,7 +111,7 @@ export class StreamingApiServerService { | |||
| 			connection.once('close', () => { | ||||
| 				ev.removeAllListeners(); | ||||
| 				main.dispose(); | ||||
| 				this.redisForPubsub.off('message', onRedisMessage); | ||||
| 				this.redisForSub.off('message', onRedisMessage); | ||||
| 				if (intervalId) clearInterval(intervalId); | ||||
| 			}); | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue