perf(backend): Use addBulk to add deliver queues (#96)
* perf(backend): Use addBulk to add deliver queues (#11114) * BullMQ -> Bull --------- Co-authored-by: tamaina <tamaina@hotmail.co.jp>
This commit is contained in:
parent
680f367924
commit
3e93450fd4
|
@ -9,7 +9,7 @@ import { DI } from '@/di-symbols.js';
|
||||||
import { bindThis } from '@/decorators.js';
|
import { bindThis } from '@/decorators.js';
|
||||||
import type { Antenna } from '@/server/api/endpoints/i/import-antennas.js';
|
import type { Antenna } from '@/server/api/endpoints/i/import-antennas.js';
|
||||||
import type { DbQueue, DeliverQueue, EndedPollNotificationQueue, InboxQueue, ObjectStorageQueue, RelationshipQueue, SystemQueue, WebhookDeliverQueue } from './QueueModule.js';
|
import type { DbQueue, DeliverQueue, EndedPollNotificationQueue, InboxQueue, ObjectStorageQueue, RelationshipQueue, SystemQueue, WebhookDeliverQueue } from './QueueModule.js';
|
||||||
import type { DbJobData, RelationshipJobData, ThinUser } from '../queue/types.js';
|
import type { DbJobData, DeliverJobData, RelationshipJobData, ThinUser } from '../queue/types.js';
|
||||||
import type httpSignature from '@peertube/http-signature';
|
import type httpSignature from '@peertube/http-signature';
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
|
@ -33,7 +33,7 @@ export class QueueService {
|
||||||
if (content == null) return null;
|
if (content == null) return null;
|
||||||
if (to == null) return null;
|
if (to == null) return null;
|
||||||
|
|
||||||
const data = {
|
const data: DeliverJobData = {
|
||||||
user: {
|
user: {
|
||||||
id: user.id,
|
id: user.id,
|
||||||
},
|
},
|
||||||
|
@ -53,6 +53,39 @@ export class QueueService {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ApDeliverManager-DeliverManager.execute()からinboxesを突っ込んでaddBulkしたい
|
||||||
|
* @param user `{ id: string; }` この関数ではThinUserに変換しないので前もって変換してください
|
||||||
|
* @param content IActivity | null
|
||||||
|
* @param inboxes `Map<string, boolean>` / key: to (inbox url), value: isSharedInbox (whether it is sharedInbox)
|
||||||
|
* @returns void
|
||||||
|
*/
|
||||||
|
@bindThis
|
||||||
|
public async deliverMany(user: ThinUser, content: IActivity | null, inboxes: Map<string, boolean>) {
|
||||||
|
const opts = {
|
||||||
|
attempts: this.config.deliverJobMaxAttempts ?? 12,
|
||||||
|
timeout: 1 * 60 * 1000, // 1min
|
||||||
|
backoff: {
|
||||||
|
type: 'apBackoff',
|
||||||
|
},
|
||||||
|
removeOnComplete: true,
|
||||||
|
removeOnFail: true,
|
||||||
|
};
|
||||||
|
|
||||||
|
await this.deliverQueue.addBulk(Array.from(inboxes.entries()).map(d => ({
|
||||||
|
name: d[0],
|
||||||
|
data: {
|
||||||
|
user,
|
||||||
|
content,
|
||||||
|
to: d[0],
|
||||||
|
isSharedInbox: d[1],
|
||||||
|
} as DeliverJobData,
|
||||||
|
opts,
|
||||||
|
})));
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
@bindThis
|
@bindThis
|
||||||
public inbox(activity: IActivity, signature: httpSignature.IParsedSignature) {
|
public inbox(activity: IActivity, signature: httpSignature.IParsedSignature) {
|
||||||
const data = {
|
const data = {
|
||||||
|
|
|
@ -7,6 +7,7 @@ import type { LocalUser, RemoteUser, User } from '@/models/entities/User.js';
|
||||||
import { QueueService } from '@/core/QueueService.js';
|
import { QueueService } from '@/core/QueueService.js';
|
||||||
import { UserEntityService } from '@/core/entities/UserEntityService.js';
|
import { UserEntityService } from '@/core/entities/UserEntityService.js';
|
||||||
import { bindThis } from '@/decorators.js';
|
import { bindThis } from '@/decorators.js';
|
||||||
|
import { ThinUser } from '@/queue/types.js';
|
||||||
|
|
||||||
interface IRecipe {
|
interface IRecipe {
|
||||||
type: string;
|
type: string;
|
||||||
|
@ -94,7 +95,7 @@ export class ApDeliverManagerService {
|
||||||
}
|
}
|
||||||
|
|
||||||
class DeliverManager {
|
class DeliverManager {
|
||||||
private actor: { id: User['id']; host: null; };
|
private actor: ThinUser;
|
||||||
private activity: any;
|
private activity: any;
|
||||||
private recipes: IRecipe[] = [];
|
private recipes: IRecipe[] = [];
|
||||||
|
|
||||||
|
@ -111,7 +112,13 @@ class DeliverManager {
|
||||||
actor: { id: User['id']; host: null; },
|
actor: { id: User['id']; host: null; },
|
||||||
activity: any,
|
activity: any,
|
||||||
) {
|
) {
|
||||||
this.actor = actor;
|
// 型で弾いてはいるが一応ローカルユーザーかチェック
|
||||||
|
if (actor.host != null) throw new Error('actor.host must be null');
|
||||||
|
|
||||||
|
// パフォーマンス向上のためキューに突っ込むのはidのみに絞る
|
||||||
|
this.actor = {
|
||||||
|
id: actor.id,
|
||||||
|
};
|
||||||
this.activity = activity;
|
this.activity = activity;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -155,9 +162,8 @@ class DeliverManager {
|
||||||
*/
|
*/
|
||||||
@bindThis
|
@bindThis
|
||||||
public async execute() {
|
public async execute() {
|
||||||
if (!this.userEntityService.isLocalUser(this.actor)) return;
|
|
||||||
|
|
||||||
// The value flags whether it is shared or not.
|
// The value flags whether it is shared or not.
|
||||||
|
// key: inbox URL, value: whether it is sharedInbox
|
||||||
const inboxes = new Map<string, boolean>();
|
const inboxes = new Map<string, boolean>();
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -201,9 +207,6 @@ class DeliverManager {
|
||||||
.forEach(recipe => inboxes.set(recipe.to.inbox!, false));
|
.forEach(recipe => inboxes.set(recipe.to.inbox!, false));
|
||||||
|
|
||||||
// deliver
|
// deliver
|
||||||
for (const inbox of inboxes) {
|
this.queueService.deliverMany(this.actor, this.activity, inboxes);
|
||||||
// inbox[0]: inbox, inbox[1]: whether it is sharedInbox
|
|
||||||
this.queueService.deliver(this.actor, this.activity, inbox[0], inbox[1]);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue