From 827c378ac15f52427c6c7eb589fe115891002658 Mon Sep 17 00:00:00 2001 From: MeiMei <30769358+mei23@users.noreply.github.com> Date: Mon, 9 Sep 2019 22:46:45 +0900 Subject: [PATCH] AP Lock (#5410) --- package.json | 1 + src/misc/app-lock.ts | 22 ++++++ .../activitypub/kernel/announce/note.ts | 79 ++++++++++--------- src/remote/activitypub/kernel/create/note.ts | 16 +++- src/remote/activitypub/models/note.ts | 37 ++++----- yarn.lock | 5 ++ 6 files changed, 100 insertions(+), 60 deletions(-) create mode 100644 src/misc/app-lock.ts diff --git a/package.json b/package.json index c8e65b8b4c..ce82682669 100644 --- a/package.json +++ b/package.json @@ -199,6 +199,7 @@ "recaptcha-promise": "0.1.3", "reconnecting-websocket": "4.2.0", "redis": "2.8.0", + "redis-lock": "0.1.4", "reflect-metadata": "0.1.13", "rename": "1.0.4", "request": "2.88.0", diff --git a/src/misc/app-lock.ts b/src/misc/app-lock.ts new file mode 100644 index 0000000000..30579ed934 --- /dev/null +++ b/src/misc/app-lock.ts @@ -0,0 +1,22 @@ +import redis from '../db/redis'; +import { promisify } from 'util'; + +/** + * Retry delay (ms) for lock acquisition + */ +const retryDelay = 100; + +const lock: (key: string, timeout?: number) => Promise<() => void> + = redis + ? promisify(require('redis-lock')(redis, retryDelay)) + : async () => () => { }; + +/** + * Get AP Object lock + * @param uri AP object ID + * @param timeout Lock timeout (ms), The timeout releases previous lock. + * @returns Unlock function + */ +export function getApLock(uri: string, timeout = 30 * 1000) { + return lock(`ap-object:${uri}`, timeout); +} diff --git a/src/remote/activitypub/kernel/announce/note.ts b/src/remote/activitypub/kernel/announce/note.ts index 2a07f50c8a..f0594a57b7 100644 --- a/src/remote/activitypub/kernel/announce/note.ts +++ b/src/remote/activitypub/kernel/announce/note.ts @@ -7,6 +7,7 @@ import { resolvePerson } from '../../models/person'; import { apLogger } from '../../logger'; import { extractDbHost } from '../../../../misc/convert-host'; import { fetchMeta } from '../../../../misc/fetch-meta'; +import { getApLock } from '../../../../misc/app-lock'; const logger = apLogger; @@ -25,47 +26,53 @@ export default async function(resolver: Resolver, actor: IRemoteUser, activity: const meta = await fetchMeta(); if (meta.blockedHosts.includes(extractDbHost(uri))) return; - // 既に同じURIを持つものが登録されていないかチェック - const exist = await fetchNote(uri); - if (exist) { - return; - } + const unlock = await getApLock(uri); - // Announce対象をresolve - let renote; try { - renote = await resolveNote(note); - } catch (e) { - // 対象が4xxならスキップ - if (e.statusCode >= 400 && e.statusCode < 500) { - logger.warn(`Ignored announce target ${note.inReplyTo} - ${e.statusCode}`); + // 既に同じURIを持つものが登録されていないかチェック + const exist = await fetchNote(uri); + if (exist) { return; } - logger.warn(`Error in announce target ${note.inReplyTo} - ${e.statusCode || e}`); - throw e; + + // Announce対象をresolve + let renote; + try { + renote = await resolveNote(note); + } catch (e) { + // 対象が4xxならスキップ + if (e.statusCode >= 400 && e.statusCode < 500) { + logger.warn(`Ignored announce target ${note.inReplyTo} - ${e.statusCode}`); + return; + } + logger.warn(`Error in announce target ${note.inReplyTo} - ${e.statusCode || e}`); + throw e; + } + + logger.info(`Creating the (Re)Note: ${uri}`); + + //#region Visibility + const to = getApIds(activity.to); + const cc = getApIds(activity.cc); + + const visibility = getVisibility(to, cc, actor); + + let visibleUsers: User[] = []; + if (visibility == 'specified') { + visibleUsers = await Promise.all(to.map(uri => resolvePerson(uri))); + } + //#endergion + + await post(actor, { + createdAt: activity.published ? new Date(activity.published) : null, + renote, + visibility, + visibleUsers, + uri + }); + } finally { + unlock(); } - - logger.info(`Creating the (Re)Note: ${uri}`); - - //#region Visibility - const to = getApIds(activity.to); - const cc = getApIds(activity.cc); - - const visibility = getVisibility(to, cc, actor); - - let visibleUsers: User[] = []; - if (visibility == 'specified') { - visibleUsers = await Promise.all(to.map(uri => resolvePerson(uri))); - } - //#endergion - - await post(actor, { - createdAt: activity.published ? new Date(activity.published) : null, - renote, - visibility, - visibleUsers, - uri - }); } type visibility = 'public' | 'home' | 'followers' | 'specified'; diff --git a/src/remote/activitypub/kernel/create/note.ts b/src/remote/activitypub/kernel/create/note.ts index 70e61bdf1b..a28eaa11fb 100644 --- a/src/remote/activitypub/kernel/create/note.ts +++ b/src/remote/activitypub/kernel/create/note.ts @@ -1,13 +1,23 @@ import Resolver from '../../resolver'; import { IRemoteUser } from '../../../../models/entities/user'; import { createNote, fetchNote } from '../../models/note'; +import { getApId } from '../../type'; +import { getApLock } from '../../../../misc/app-lock'; /** * 投稿作成アクティビティを捌きます */ export default async function(resolver: Resolver, actor: IRemoteUser, note: any, silent = false): Promise { - const exist = await fetchNote(note); - if (exist == null) { - await createNote(note); + const uri = getApId(note); + + const unlock = await getApLock(uri); + + try { + const exist = await fetchNote(note); + if (exist == null) { + await createNote(note); + } + } finally { + unlock(); } } diff --git a/src/remote/activitypub/models/note.ts b/src/remote/activitypub/models/note.ts index 9afbb39151..6430813589 100644 --- a/src/remote/activitypub/models/note.ts +++ b/src/remote/activitypub/models/note.ts @@ -22,6 +22,7 @@ import { Emoji } from '../../../models/entities/emoji'; import { genId } from '../../../misc/gen-id'; import { fetchMeta } from '../../../misc/fetch-meta'; import { ensure } from '../../../prelude/ensure'; +import { getApLock } from '../../../misc/app-lock'; const logger = apLogger; @@ -257,30 +258,24 @@ export async function resolveNote(value: string | IObject, resolver?: Resolver): const meta = await fetchMeta(); if (meta.blockedHosts.includes(extractDbHost(uri))) throw { statusCode: 451 }; - //#region このサーバーに既に登録されていたらそれを返す - const exist = await fetchNote(uri); + const unlock = await getApLock(uri); - if (exist) { - return exist; - } - //#endregion + try { + //#region このサーバーに既に登録されていたらそれを返す + const exist = await fetchNote(uri); - // リモートサーバーからフェッチしてきて登録 - // ここでuriの代わりに添付されてきたNote Objectが指定されていると、サーバーフェッチを経ずにノートが生成されるが - // 添付されてきたNote Objectは偽装されている可能性があるため、常にuriを指定してサーバーフェッチを行う。 - return await createNote(uri, resolver, true).catch(e => { - if (e.name === 'duplicated') { - return fetchNote(uri).then(note => { - if (note == null) { - throw new Error('something happened'); - } else { - return note; - } - }); - } else { - throw e; + if (exist) { + return exist; } - }); + //#endregion + + // リモートサーバーからフェッチしてきて登録 + // ここでuriの代わりに添付されてきたNote Objectが指定されていると、サーバーフェッチを経ずにノートが生成されるが + // 添付されてきたNote Objectは偽装されている可能性があるため、常にuriを指定してサーバーフェッチを行う。 + return await createNote(uri, resolver, true); + } finally { + unlock(); + } } export async function extractEmojis(tags: ITag[], host: string): Promise { diff --git a/yarn.lock b/yarn.lock index 1b20c58129..5481200849 100644 --- a/yarn.lock +++ b/yarn.lock @@ -9561,6 +9561,11 @@ redis-errors@^1.0.0, redis-errors@^1.2.0: resolved "https://registry.yarnpkg.com/redis-errors/-/redis-errors-1.2.0.tgz#eb62d2adb15e4eaf4610c04afe1529384250abad" integrity sha1-62LSrbFeTq9GEMBK/hUpOEJQq60= +redis-lock@0.1.4: + version "0.1.4" + resolved "https://registry.yarnpkg.com/redis-lock/-/redis-lock-0.1.4.tgz#e83590bee22b5f01cdb65bfbd88d988045356272" + integrity sha512-7/+zu86XVQfJVx1nHTzux5reglDiyUCDwmW7TSlvVezfhH2YLc/Rc8NE0ejQG+8/0lwKzm29/u/4+ogKeLosiA== + redis-parser@^2.6.0: version "2.6.0" resolved "https://registry.yarnpkg.com/redis-parser/-/redis-parser-2.6.0.tgz#52ed09dacac108f1a631c07e9b69941e7a19504b"