refactor(backend): ノートのエクスポート処理でStreams APIを使うように (#13465)
* refactor(backend): ノートのエクスポート処理でStreams APIを使うように * fixup! refactor(backend): ノートのエクスポート処理でStreams APIを使うように `await`忘れにより、ジョブがすぐに完了したことになり削除されてしまっていた。 それによって、`NoteStream`内での`updateProgress`メソッドの呼び出しで、`Missing key for job`のエラーが発生することがあった。 --------- Co-authored-by: syuilo <4439005+syuilo@users.noreply.github.com>
This commit is contained in:
parent
0d47877db1
commit
b7d9d16201
|
@ -0,0 +1,31 @@
|
|||
import * as fs from 'node:fs/promises';
|
||||
import type { PathLike } from 'node:fs';
|
||||
|
||||
/**
|
||||
* `fs.createWriteStream()`相当のことを行う`WritableStream` (Web標準)
|
||||
*/
|
||||
export class FileWriterStream extends WritableStream<Uint8Array> {
|
||||
constructor(path: PathLike) {
|
||||
let file: fs.FileHandle | null = null;
|
||||
|
||||
super({
|
||||
start: async () => {
|
||||
file = await fs.open(path, 'a');
|
||||
},
|
||||
write: async (chunk, controller) => {
|
||||
if (file === null) {
|
||||
controller.error();
|
||||
throw new Error();
|
||||
}
|
||||
|
||||
await file.write(chunk);
|
||||
},
|
||||
close: async () => {
|
||||
await file?.close();
|
||||
},
|
||||
abort: async () => {
|
||||
await file?.close();
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
import { TransformStream } from 'node:stream/web';
|
||||
|
||||
/**
|
||||
* ストリームに流れてきた各データについて`JSON.stringify()`した上で、それらを一つの配列にまとめる
|
||||
*/
|
||||
export class JsonArrayStream extends TransformStream<unknown, string> {
|
||||
constructor() {
|
||||
/** 最初の要素かどうかを変数に記録 */
|
||||
let isFirst = true;
|
||||
|
||||
super({
|
||||
start(controller) {
|
||||
controller.enqueue('[');
|
||||
},
|
||||
flush(controller) {
|
||||
controller.enqueue(']');
|
||||
},
|
||||
transform(chunk, controller) {
|
||||
if (isFirst) {
|
||||
isFirst = false;
|
||||
} else {
|
||||
// 妥当なJSON配列にするためには最初以外の要素の前に`,`を挿入しなければならない
|
||||
controller.enqueue(',\n');
|
||||
}
|
||||
|
||||
controller.enqueue(JSON.stringify(chunk));
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
|
@ -3,7 +3,7 @@
|
|||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
import * as fs from 'node:fs';
|
||||
import { ReadableStream, TextEncoderStream } from 'node:stream/web';
|
||||
import { Inject, Injectable } from '@nestjs/common';
|
||||
import { MoreThan } from 'typeorm';
|
||||
import { format as dateFormat } from 'date-fns';
|
||||
|
@ -18,10 +18,82 @@ import { bindThis } from '@/decorators.js';
|
|||
import { DriveFileEntityService } from '@/core/entities/DriveFileEntityService.js';
|
||||
import { Packed } from '@/misc/json-schema.js';
|
||||
import { IdService } from '@/core/IdService.js';
|
||||
import { JsonArrayStream } from '@/misc/JsonArrayStream.js';
|
||||
import { FileWriterStream } from '@/misc/FileWriterStream.js';
|
||||
import { QueueLoggerService } from '../QueueLoggerService.js';
|
||||
import type * as Bull from 'bullmq';
|
||||
import type { DbJobDataWithUser } from '../types.js';
|
||||
|
||||
class NoteStream extends ReadableStream<Record<string, unknown>> {
|
||||
constructor(
|
||||
job: Bull.Job,
|
||||
notesRepository: NotesRepository,
|
||||
pollsRepository: PollsRepository,
|
||||
driveFileEntityService: DriveFileEntityService,
|
||||
idService: IdService,
|
||||
userId: string,
|
||||
) {
|
||||
let exportedNotesCount = 0;
|
||||
let cursor: MiNote['id'] | null = null;
|
||||
|
||||
const serialize = (
|
||||
note: MiNote,
|
||||
poll: MiPoll | null,
|
||||
files: Packed<'DriveFile'>[],
|
||||
): Record<string, unknown> => {
|
||||
return {
|
||||
id: note.id,
|
||||
text: note.text,
|
||||
createdAt: idService.parse(note.id).date.toISOString(),
|
||||
fileIds: note.fileIds,
|
||||
files: files,
|
||||
replyId: note.replyId,
|
||||
renoteId: note.renoteId,
|
||||
poll: poll,
|
||||
cw: note.cw,
|
||||
visibility: note.visibility,
|
||||
visibleUserIds: note.visibleUserIds,
|
||||
localOnly: note.localOnly,
|
||||
reactionAcceptance: note.reactionAcceptance,
|
||||
};
|
||||
};
|
||||
|
||||
super({
|
||||
async pull(controller): Promise<void> {
|
||||
const notes = await notesRepository.find({
|
||||
where: {
|
||||
userId,
|
||||
...(cursor !== null ? { id: MoreThan(cursor) } : {}),
|
||||
},
|
||||
take: 100, // 100件ずつ取得
|
||||
order: { id: 1 },
|
||||
});
|
||||
|
||||
if (notes.length === 0) {
|
||||
job.updateProgress(100);
|
||||
controller.close();
|
||||
}
|
||||
|
||||
cursor = notes.at(-1)?.id ?? null;
|
||||
|
||||
for (const note of notes) {
|
||||
const poll = note.hasPoll
|
||||
? await pollsRepository.findOneByOrFail({ noteId: note.id }) // N+1
|
||||
: null;
|
||||
const files = await driveFileEntityService.packManyByIds(note.fileIds); // N+1
|
||||
const content = serialize(note, poll, files);
|
||||
|
||||
controller.enqueue(content);
|
||||
exportedNotesCount++;
|
||||
}
|
||||
|
||||
const total = await notesRepository.countBy({ userId });
|
||||
job.updateProgress(exportedNotesCount / total);
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class ExportNotesProcessorService {
|
||||
private logger: Logger;
|
||||
|
@ -59,67 +131,19 @@ export class ExportNotesProcessorService {
|
|||
this.logger.info(`Temp file is ${path}`);
|
||||
|
||||
try {
|
||||
const stream = fs.createWriteStream(path, { flags: 'a' });
|
||||
// メモリが足りなくならないようにストリームで処理する
|
||||
await new NoteStream(
|
||||
job,
|
||||
this.notesRepository,
|
||||
this.pollsRepository,
|
||||
this.driveFileEntityService,
|
||||
this.idService,
|
||||
user.id,
|
||||
)
|
||||
.pipeThrough(new JsonArrayStream())
|
||||
.pipeThrough(new TextEncoderStream())
|
||||
.pipeTo(new FileWriterStream(path));
|
||||
|
||||
const write = (text: string): Promise<void> => {
|
||||
return new Promise<void>((res, rej) => {
|
||||
stream.write(text, err => {
|
||||
if (err) {
|
||||
this.logger.error(err);
|
||||
rej(err);
|
||||
} else {
|
||||
res();
|
||||
}
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
await write('[');
|
||||
|
||||
let exportedNotesCount = 0;
|
||||
let cursor: MiNote['id'] | null = null;
|
||||
|
||||
while (true) {
|
||||
const notes = await this.notesRepository.find({
|
||||
where: {
|
||||
userId: user.id,
|
||||
...(cursor ? { id: MoreThan(cursor) } : {}),
|
||||
},
|
||||
take: 100,
|
||||
order: {
|
||||
id: 1,
|
||||
},
|
||||
}) as MiNote[];
|
||||
|
||||
if (notes.length === 0) {
|
||||
job.updateProgress(100);
|
||||
break;
|
||||
}
|
||||
|
||||
cursor = notes.at(-1)?.id ?? null;
|
||||
|
||||
for (const note of notes) {
|
||||
let poll: MiPoll | undefined;
|
||||
if (note.hasPoll) {
|
||||
poll = await this.pollsRepository.findOneByOrFail({ noteId: note.id });
|
||||
}
|
||||
const files = await this.driveFileEntityService.packManyByIds(note.fileIds);
|
||||
const content = JSON.stringify(this.serialize(note, poll, files));
|
||||
const isFirst = exportedNotesCount === 0;
|
||||
await write(isFirst ? content : ',\n' + content);
|
||||
exportedNotesCount++;
|
||||
}
|
||||
|
||||
const total = await this.notesRepository.countBy({
|
||||
userId: user.id,
|
||||
});
|
||||
|
||||
job.updateProgress(exportedNotesCount / total);
|
||||
}
|
||||
|
||||
await write(']');
|
||||
|
||||
stream.end();
|
||||
this.logger.succ(`Exported to: ${path}`);
|
||||
|
||||
const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json';
|
||||
|
@ -130,22 +154,4 @@ export class ExportNotesProcessorService {
|
|||
cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
private serialize(note: MiNote, poll: MiPoll | null = null, files: Packed<'DriveFile'>[]): Record<string, unknown> {
|
||||
return {
|
||||
id: note.id,
|
||||
text: note.text,
|
||||
createdAt: this.idService.parse(note.id).date.toISOString(),
|
||||
fileIds: note.fileIds,
|
||||
files: files,
|
||||
replyId: note.replyId,
|
||||
renoteId: note.renoteId,
|
||||
poll: poll,
|
||||
cw: note.cw,
|
||||
visibility: note.visibility,
|
||||
visibleUserIds: note.visibleUserIds,
|
||||
localOnly: note.localOnly,
|
||||
reactionAcceptance: note.reactionAcceptance,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue