feat: 重複排除

This commit is contained in:
usbharu 2023-12-19 17:00:15 +09:00
parent f98178b37c
commit aa2741d614
26 changed files with 334 additions and 110 deletions

2
.gitignore vendored
View File

@ -43,3 +43,5 @@ out/
/tomcat-e2e/ /tomcat-e2e/
/e2eTest.log /e2eTest.log
/files/ /files/
*.log

View File

@ -49,7 +49,7 @@ class NoteQueryServiceImpl(private val postRepository: PostRepository, private v
val replyId = this[Posts.replyId] val replyId = this[Posts.replyId]
val replyTo = if (replyId != null) { val replyTo = if (replyId != null) {
try { try {
postRepository.findById(replyId).url postRepository.findById(replyId)?.url ?: throw FailedToGetResourcesException()
} catch (e: FailedToGetResourcesException) { } catch (e: FailedToGetResourcesException) {
logger.warn("Failed to get replyId: $replyId", e) logger.warn("Failed to get replyId: $replyId", e)
null null

View File

@ -30,7 +30,7 @@ class APLikeProcessor(
val personWithEntity = apUserService.fetchPersonWithEntity(actor) val personWithEntity = apUserService.fetchPersonWithEntity(actor)
try { try {
apNoteService.fetchNoteAsync(target).await() apNoteService.fetchNote(target)
} catch (e: FailedToGetActivityPubResourceException) { } catch (e: FailedToGetActivityPubResourceException) {
logger.debug("FAILED failed to get {}", target) logger.debug("FAILED failed to get {}", target)
logger.trace("", e) logger.trace("", e)

View File

@ -229,7 +229,6 @@ class APServiceImpl(
props[it.json] = json props[it.json] = json
props[it.type] = type.name props[it.type] = type.name
val writeValueAsString = objectMapper.writeValueAsString(httpRequest) val writeValueAsString = objectMapper.writeValueAsString(httpRequest)
println(writeValueAsString)
props[it.httpRequest] = writeValueAsString props[it.httpRequest] = writeValueAsString
props[it.headers] = objectMapper.writeValueAsString(map) props[it.headers] = objectMapper.writeValueAsString(map)
} }

View File

@ -18,7 +18,7 @@ abstract class AbstractActivityPubProcessor<T : Object>(
if (activity.isAuthorized.not() && allowUnauthorized.not()) { if (activity.isAuthorized.not() && allowUnauthorized.not()) {
throw HttpSignatureUnauthorizedException() throw HttpSignatureUnauthorizedException()
} }
logger.info("START ActivityPub process") logger.info("START ActivityPub process. {}", this.type())
try { try {
transaction.transaction { transaction.transaction {
internalProcess(activity) internalProcess(activity)
@ -27,7 +27,7 @@ abstract class AbstractActivityPubProcessor<T : Object>(
logger.warn("FAILED ActivityPub process", e) logger.warn("FAILED ActivityPub process", e)
throw FailedProcessException("Failed process", e) throw FailedProcessException("Failed process", e)
} }
logger.info("SUCCESS ActivityPub process") logger.info("SUCCESS ActivityPub process. {}", this.type())
} }
abstract suspend fun internalProcess(activity: ActivityPubProcessContext<T>) abstract suspend fun internalProcess(activity: ActivityPubProcessContext<T>)

View File

@ -15,28 +15,11 @@ import dev.usbharu.hideout.core.service.media.MediaService
import dev.usbharu.hideout.core.service.media.RemoteMedia import dev.usbharu.hideout.core.service.media.RemoteMedia
import dev.usbharu.hideout.core.service.post.PostService import dev.usbharu.hideout.core.service.post.PostService
import io.ktor.client.plugins.* import io.ktor.client.plugins.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.slf4j.MDCContext
import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import org.springframework.cache.annotation.Cacheable
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.time.Instant import java.time.Instant
interface APNoteService { interface APNoteService {
@Cacheable("fetchNote")
fun fetchNoteAsync(url: String, targetActor: String? = null): Deferred<Note> {
return CoroutineScope(Dispatchers.IO + MDCContext()).async {
newSuspendedTransaction(MDCContext()) {
fetchNote(url, targetActor)
}
}
}
suspend fun fetchNote(url: String, targetActor: String? = null): Note suspend fun fetchNote(url: String, targetActor: String? = null): Note
suspend fun fetchNote(note: Note, targetActor: String? = null): Note suspend fun fetchNote(note: Note, targetActor: String? = null): Note
} }
@ -77,7 +60,7 @@ class APNoteServiceImpl(
) )
throw FailedToGetActivityPubResourceException("Could not retrieve $url.", e) throw FailedToGetActivityPubResourceException("Could not retrieve $url.", e)
} }
val savedNote = saveNote(note, targetActor, url) val savedNote = saveIfMissing(note, targetActor, url)
logger.debug("SUCCESS Fetch Note url: {}", url) logger.debug("SUCCESS Fetch Note url: {}", url)
return savedNote return savedNote
} }
@ -89,11 +72,15 @@ class APNoteServiceImpl(
): Note { ): Note {
requireNotNull(note.id) { "id is null" } requireNotNull(note.id) { "id is null" }
return try { return try {
noteQueryService.findByApid(note.id).first noteQueryService.findByApid(note.id).first
} catch (_: FailedToGetResourcesException) { } catch (e: FailedToGetResourcesException) {
saveNote(note, targetActor, url) saveNote(note, targetActor, url)
} }
} }
private suspend fun saveNote(note: Note, targetActor: String?, url: String): Note { private suspend fun saveNote(note: Note, targetActor: String?, url: String): Note {
@ -102,6 +89,10 @@ class APNoteServiceImpl(
targetActor targetActor
) )
if (postRepository.existByApIdWithLock(note.id)) {
return note
}
logger.debug("VISIBILITY url: {} to: {} cc: {}", note.id, note.to, note.cc) logger.debug("VISIBILITY url: {} to: {} cc: {}", note.id, note.to, note.cc)
val visibility = val visibility =

View File

@ -12,8 +12,8 @@ import dev.usbharu.hideout.core.domain.model.actor.Actor
import dev.usbharu.hideout.core.domain.model.actor.ActorRepository import dev.usbharu.hideout.core.domain.model.actor.ActorRepository
import dev.usbharu.hideout.core.service.user.RemoteUserCreateDto import dev.usbharu.hideout.core.service.user.RemoteUserCreateDto
import dev.usbharu.hideout.core.service.user.UserService import dev.usbharu.hideout.core.service.user.UserService
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
interface APUserService { interface APUserService {
suspend fun getPersonByName(name: String): Person suspend fun getPersonByName(name: String): Person
@ -76,7 +76,6 @@ class APUserServiceImpl(
override suspend fun fetchPerson(url: String, targetActor: String?): Person = override suspend fun fetchPerson(url: String, targetActor: String?): Person =
fetchPersonWithEntity(url, targetActor).first fetchPersonWithEntity(url, targetActor).first
@Transactional
override suspend fun fetchPersonWithEntity(url: String, targetActor: String?): Pair<Person, Actor> { override suspend fun fetchPersonWithEntity(url: String, targetActor: String?): Pair<Person, Actor> {
val userEntity = actorRepository.findByUrl(url) val userEntity = actorRepository.findByUrl(url)
@ -142,4 +141,8 @@ class APUserServiceImpl(
following = actorEntity.following, following = actorEntity.following,
manuallyApprovesFollowers = actorEntity.locked manuallyApprovesFollowers = actorEntity.locked
) )
companion object {
private val logger = LoggerFactory.getLogger(APUserServiceImpl::class.java)
}
} }

View File

@ -1,24 +1,37 @@
package dev.usbharu.hideout.application.infrastructure.exposed package dev.usbharu.hideout.application.infrastructure.exposed
import dev.usbharu.hideout.application.external.Transaction import dev.usbharu.hideout.application.external.Transaction
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.slf4j.MDCContext import kotlinx.coroutines.slf4j.MDCContext
import org.jetbrains.exposed.sql.StdOutSqlLogger import org.jetbrains.exposed.sql.Slf4jSqlDebugLogger
import org.jetbrains.exposed.sql.addLogger import org.jetbrains.exposed.sql.addLogger
import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction
import org.jetbrains.exposed.sql.transactions.transaction
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.sql.Connection
@Service @Service
class ExposedTransaction : Transaction { class ExposedTransaction : Transaction {
override suspend fun <T> transaction(block: suspend () -> T): T { override suspend fun <T> transaction(block: suspend () -> T): T {
return newSuspendedTransaction(MDCContext()) { // return newSuspendedTransaction(MDCContext(), transactionIsolation = java.sql.Connection.TRANSACTION_READ_COMMITTED) {
addLogger(StdOutSqlLogger) // warnLongQueriesDuration = 1000
block() // addLogger(Slf4jSqlDebugLogger)
// block()
// }
return transaction(transactionIsolation = Connection.TRANSACTION_READ_COMMITTED) {
debug = true
warnLongQueriesDuration = 1000
addLogger(Slf4jSqlDebugLogger)
runBlocking(MDCContext()) {
block()
}
} }
} }
override suspend fun <T> transaction(transactionLevel: Int, block: suspend () -> T): T { override suspend fun <T> transaction(transactionLevel: Int, block: suspend () -> T): T {
return newSuspendedTransaction(MDCContext(), transactionIsolation = transactionLevel) { return newSuspendedTransaction(MDCContext(), transactionIsolation = transactionLevel) {
addLogger(StdOutSqlLogger) addLogger(Slf4jSqlDebugLogger)
block() block()
} }
} }

View File

@ -0,0 +1,7 @@
package dev.usbharu.hideout.core.domain.exception
import dev.usbharu.hideout.core.domain.exception.resource.ResourceAccessException
interface SQLExceptionTranslator {
fun translate(message: String, sql: String? = null, exception: Exception): ResourceAccessException
}

View File

@ -0,0 +1,19 @@
package dev.usbharu.hideout.core.domain.exception
import dev.usbharu.hideout.core.domain.exception.resource.DuplicateException
import dev.usbharu.hideout.core.domain.exception.resource.ResourceAccessException
import org.springframework.dao.DataAccessException
import org.springframework.dao.DuplicateKeyException
class SpringDataAccessExceptionSQLExceptionTranslator : SQLExceptionTranslator {
override fun translate(message: String, sql: String?, exception: Exception): ResourceAccessException {
if (exception !is DataAccessException) {
throw IllegalArgumentException("exception must be DataAccessException.")
}
return when (exception) {
is DuplicateKeyException -> DuplicateException(message, exception.rootCause)
else -> ResourceAccessException(message, exception.rootCause)
}
}
}

View File

@ -0,0 +1,21 @@
package dev.usbharu.hideout.core.domain.exception.resource
import java.io.Serial
class DuplicateException : ResourceAccessException {
constructor() : super()
constructor(message: String?) : super(message)
constructor(message: String?, cause: Throwable?) : super(message, cause)
constructor(cause: Throwable?) : super(cause)
constructor(message: String?, cause: Throwable?, enableSuppression: Boolean, writableStackTrace: Boolean) : super(
message,
cause,
enableSuppression,
writableStackTrace
)
companion object {
@Serial
private const val serialVersionUID: Long = 7092046653037974417L
}
}

View File

@ -1,8 +1,6 @@
package dev.usbharu.hideout.core.domain.exception.resource package dev.usbharu.hideout.core.domain.exception.resource
import dev.usbharu.hideout.core.domain.exception.HideoutException open class NotFoundException : ResourceAccessException {
open class NotFoundException : HideoutException {
constructor() : super() constructor() : super()
constructor(message: String?) : super(message) constructor(message: String?) : super(message)
constructor(message: String?, cause: Throwable?) : super(message, cause) constructor(message: String?, cause: Throwable?) : super(message, cause)

View File

@ -0,0 +1,23 @@
package dev.usbharu.hideout.core.domain.exception.resource
import java.io.Serial
class PostNotFoundException : NotFoundException {
constructor() : super()
constructor(message: String?) : super(message)
constructor(message: String?, cause: Throwable?) : super(message, cause)
constructor(cause: Throwable?) : super(cause)
constructor(message: String?, cause: Throwable?, enableSuppression: Boolean, writableStackTrace: Boolean) : super(
message,
cause,
enableSuppression,
writableStackTrace
)
companion object {
@Serial
private const val serialVersionUID: Long = 1315818410686905717L
fun withApId(apId: String): PostNotFoundException = PostNotFoundException("apId: $apId was not found.")
}
}

View File

@ -0,0 +1,16 @@
package dev.usbharu.hideout.core.domain.exception.resource
import dev.usbharu.hideout.core.domain.exception.HideoutException
open class ResourceAccessException : HideoutException {
constructor() : super()
constructor(message: String?) : super(message)
constructor(message: String?, cause: Throwable?) : super(message, cause)
constructor(cause: Throwable?) : super(cause)
constructor(message: String?, cause: Throwable?, enableSuppression: Boolean, writableStackTrace: Boolean) : super(
message,
cause,
enableSuppression,
writableStackTrace
)
}

View File

@ -6,7 +6,14 @@ import org.springframework.stereotype.Repository
@Repository @Repository
interface PostRepository { interface PostRepository {
suspend fun generateId(): Long suspend fun generateId(): Long
suspend fun save(post: Post): Boolean suspend fun save(post: Post): Post
suspend fun delete(id: Long) suspend fun delete(id: Long)
suspend fun findById(id: Long): Post suspend fun findById(id: Long): Post?
suspend fun findByUrl(url: String): Post?
suspend fun findByUrl2(url: String): Post? {
throw Exception()
}
suspend fun findByApId(apId: String): Post?
suspend fun existByApIdWithLock(apId: String): Boolean
} }

View File

@ -24,7 +24,7 @@ class MediaQueryServiceImpl : MediaQueryService {
} }
override suspend fun findByRemoteUrl(remoteUrl: String): MediaEntity { override suspend fun findByRemoteUrl(remoteUrl: String): MediaEntity {
return Media.select { Media.remoteUrl eq remoteUrl } return Media.select { Media.remoteUrl eq remoteUrl }.forUpdate()
.singleOr { FailedToGetResourcesException("remoteUrl: $remoteUrl is duplicate or not exist.", it) } .singleOr { FailedToGetResourcesException("remoteUrl: $remoteUrl is duplicate or not exist.", it) }
.toMedia() .toMedia()
} }

View File

@ -0,0 +1,64 @@
package dev.usbharu.hideout.core.infrastructure.exposedrepository
import dev.usbharu.hideout.core.domain.exception.SpringDataAccessExceptionSQLExceptionTranslator
import org.slf4j.Logger
import org.springframework.beans.factory.annotation.Value
import org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator
import java.sql.SQLException
abstract class AbstractRepository {
protected abstract val logger: Logger
private val sqlErrorCodeSQLExceptionTranslator = SQLErrorCodeSQLExceptionTranslator()
private val springDataAccessExceptionSQLExceptionTranslator = SpringDataAccessExceptionSQLExceptionTranslator()
@Value("\${hideout.debug.trace-query-exception:false}")
private var traceQueryException: Boolean = false
@Value("\${hideout.debug.trace-query-call:false}")
private var traceQueryCall: Boolean = false
protected suspend fun <T> query(block: () -> T): T = try {
if (traceQueryCall) {
logger.trace(
"""
***** QUERY CALL STACK TRACE *****
${Throwable().stackTrace.joinToString("\n")}
***** QUERY CALL STACK TRACE *****
"""
)
}
block.invoke()
} catch (e: SQLException) {
if (traceQueryException) {
logger.trace("FAILED EXECUTE SQL", e)
}
if (e.cause !is SQLException) {
throw e
}
val dataAccessException =
sqlErrorCodeSQLExceptionTranslator.translate("Failed to persist entity", null, e.cause as SQLException)
?: throw e
if (traceQueryException) {
logger.trace("EXCEPTION TRANSLATED TO", dataAccessException)
}
val translate = springDataAccessExceptionSQLExceptionTranslator.translate(
"Failed to persist entity",
null,
dataAccessException
)
if (traceQueryException) {
logger.trace("EXCEPTION TRANSLATED TO", translate)
}
throw translate
}
}

View File

@ -8,6 +8,8 @@ import dev.usbharu.hideout.core.domain.model.actor.ActorRepository
import org.jetbrains.exposed.sql.* import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.jetbrains.exposed.sql.javatime.timestamp import org.jetbrains.exposed.sql.javatime.timestamp
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Repository import org.springframework.stereotype.Repository
@Repository @Repository
@ -15,12 +17,15 @@ class ActorRepositoryImpl(
private val idGenerateService: IdGenerateService, private val idGenerateService: IdGenerateService,
private val actorResultRowMapper: ResultRowMapper<Actor>, private val actorResultRowMapper: ResultRowMapper<Actor>,
private val actorQueryMapper: QueryMapper<Actor> private val actorQueryMapper: QueryMapper<Actor>
) : ) : ActorRepository, AbstractRepository() {
ActorRepository {
override suspend fun save(actor: Actor): Actor = query {
override suspend fun save(actor: Actor): Actor {
val singleOrNull = Actors.select { Actors.id eq actor.id }.forUpdate().empty() val singleOrNull = Actors.select { Actors.id eq actor.id }.forUpdate().empty()
if (singleOrNull) { if (singleOrNull) {
Actors.insert { Actors.insert {
it[id] = actor.id it[id] = actor.id
it[name] = actor.name it[name] = actor.name
@ -66,52 +71,63 @@ class ActorRepositoryImpl(
it[lastPostAt] = actor.lastPostDate it[lastPostAt] = actor.lastPostDate
} }
} }
return actor return@query actor
} }
override suspend fun findById(id: Long): Actor? = override suspend fun findById(id: Long): Actor? = query {
Actors.select { Actors.id eq id }.singleOrNull()?.let(actorResultRowMapper::map) return@query Actors.select { Actors.id eq id }.singleOrNull()?.let(actorResultRowMapper::map)
}
override suspend fun findByIdWithLock(id: Long): Actor? = override suspend fun findByIdWithLock(id: Long): Actor? = query {
Actors.select { Actors.id eq id }.forUpdate().singleOrNull()?.let(actorResultRowMapper::map) return@query Actors.select { Actors.id eq id }.forUpdate().singleOrNull()?.let(actorResultRowMapper::map)
}
override suspend fun findAll(limit: Int, offset: Long): List<Actor> = override suspend fun findAll(limit: Int, offset: Long): List<Actor> = query {
Actors.selectAll().limit(limit, offset).let(actorQueryMapper::map) return@query Actors.selectAll().limit(limit, offset).let(actorQueryMapper::map)
}
override suspend fun findByName(name: String): List<Actor> = override suspend fun findByName(name: String): List<Actor> = query {
Actors.select { Actors.name eq name }.let(actorQueryMapper::map) return@query Actors.select { Actors.name eq name }.let(actorQueryMapper::map)
}
override suspend fun findByNameAndDomain(name: String, domain: String): Actor? = Actors override suspend fun findByNameAndDomain(name: String, domain: String): Actor? = query {
.select { Actors.name eq name and (Actors.domain eq domain) } return@query Actors.select { Actors.name eq name and (Actors.domain eq domain) }.singleOrNull()
.singleOrNull() ?.let(actorResultRowMapper::map)
?.let(actorResultRowMapper::map) }
override suspend fun findByNameAndDomainWithLock(name: String, domain: String): Actor? = Actors override suspend fun findByNameAndDomainWithLock(name: String, domain: String): Actor? = query {
.select { Actors.name eq name and (Actors.domain eq domain) } return@query Actors.select { Actors.name eq name and (Actors.domain eq domain) }.forUpdate().singleOrNull()
.forUpdate() ?.let(actorResultRowMapper::map)
.singleOrNull() }
?.let(actorResultRowMapper::map)
override suspend fun findByUrl(url: String): Actor? = Actors.select { Actors.url eq url } override suspend fun findByUrl(url: String): Actor? = query {
.singleOrNull() return@query Actors.select { Actors.url eq url }.singleOrNull()?.let(actorResultRowMapper::map)
?.let(actorResultRowMapper::map) }
override suspend fun findByUrlWithLock(url: String): Actor? = Actors.select { Actors.url eq url }.forUpdate() override suspend fun findByUrlWithLock(url: String): Actor? = query {
.singleOrNull() return@query Actors.select { Actors.url eq url }.forUpdate().singleOrNull()?.let(actorResultRowMapper::map)
?.let(actorResultRowMapper::map) }
override suspend fun findByIds(ids: List<Long>): List<Actor> = override suspend fun findByIds(ids: List<Long>): List<Actor> = query {
Actors.select { Actors.id inList ids }.let(actorQueryMapper::map) return@query Actors.select { Actors.id inList ids }.let(actorQueryMapper::map)
}
override suspend fun findByKeyId(keyId: String): Actor? = Actors.select { Actors.keyId eq keyId } override suspend fun findByKeyId(keyId: String): Actor? = query {
.singleOrNull() return@query Actors.select { Actors.keyId eq keyId }.singleOrNull()?.let(actorResultRowMapper::map)
?.let(actorResultRowMapper::map) }
override suspend fun delete(id: Long) { override suspend fun delete(id: Long): Unit = query {
Actors.deleteWhere { Actors.id.eq(id) } Actors.deleteWhere { Actors.id.eq(id) }
} }
override suspend fun nextId(): Long = idGenerateService.generateId() override suspend fun nextId(): Long = idGenerateService.generateId()
companion object {
private val logger = LoggerFactory.getLogger(ActorRepositoryImpl::class.java)
}
override val logger: Logger
get() = Companion.logger
} }
object Actors : Table("actors") { object Actors : Table("actors") {
@ -120,16 +136,14 @@ object Actors : Table("actors") {
val domain: Column<String> = varchar("domain", length = 1000) val domain: Column<String> = varchar("domain", length = 1000)
val screenName: Column<String> = varchar("screen_name", length = 300) val screenName: Column<String> = varchar("screen_name", length = 300)
val description: Column<String> = varchar( val description: Column<String> = varchar(
"description", "description", length = 10000
length = 10000
) )
val inbox: Column<String> = varchar("inbox", length = 1000).uniqueIndex() val inbox: Column<String> = varchar("inbox", length = 1000).uniqueIndex()
val outbox: Column<String> = varchar("outbox", length = 1000).uniqueIndex() val outbox: Column<String> = varchar("outbox", length = 1000).uniqueIndex()
val url: Column<String> = varchar("url", length = 1000).uniqueIndex() val url: Column<String> = varchar("url", length = 1000).uniqueIndex()
val publicKey: Column<String> = varchar("public_key", length = 10000) val publicKey: Column<String> = varchar("public_key", length = 10000)
val privateKey: Column<String?> = varchar( val privateKey: Column<String?> = varchar(
"private_key", "private_key", length = 10000
length = 10000
).nullable() ).nullable()
val createdAt: Column<Long> = long("created_at") val createdAt: Column<Long> = long("created_at")
val keyId = varchar("key_id", length = 1000) val keyId = varchar("key_id", length = 1000)

View File

@ -19,7 +19,7 @@ class MediaRepositoryImpl(private val idGenerateService: IdGenerateService) : Me
override suspend fun save(media: EntityMedia): EntityMedia { override suspend fun save(media: EntityMedia): EntityMedia {
if (Media.select { if (Media.select {
Media.id eq media.id Media.id eq media.id
}.singleOrNull() != null }.forUpdate().singleOrNull() != null
) { ) {
Media.update({ Media.id eq media.id }) { Media.update({ Media.id eq media.id }) {
it[name] = media.name it[name] = media.name

View File

@ -2,24 +2,24 @@ package dev.usbharu.hideout.core.infrastructure.exposedrepository
import dev.usbharu.hideout.application.infrastructure.exposed.QueryMapper import dev.usbharu.hideout.application.infrastructure.exposed.QueryMapper
import dev.usbharu.hideout.application.service.id.IdGenerateService import dev.usbharu.hideout.application.service.id.IdGenerateService
import dev.usbharu.hideout.core.domain.exception.FailedToGetResourcesException
import dev.usbharu.hideout.core.domain.model.post.Post import dev.usbharu.hideout.core.domain.model.post.Post
import dev.usbharu.hideout.core.domain.model.post.PostRepository import dev.usbharu.hideout.core.domain.model.post.PostRepository
import dev.usbharu.hideout.util.singleOr
import org.jetbrains.exposed.sql.* import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Repository import org.springframework.stereotype.Repository
@Repository @Repository
class PostRepositoryImpl( class PostRepositoryImpl(
private val idGenerateService: IdGenerateService, private val idGenerateService: IdGenerateService,
private val postQueryMapper: QueryMapper<Post> private val postQueryMapper: QueryMapper<Post>
) : PostRepository { ) : PostRepository, AbstractRepository() {
override suspend fun generateId(): Long = idGenerateService.generateId() override suspend fun generateId(): Long = idGenerateService.generateId()
override suspend fun save(post: Post): Boolean { override suspend fun save(post: Post): Post = query {
val singleOrNull = Posts.select { Posts.id eq post.id }.singleOrNull() val singleOrNull = Posts.select { Posts.id eq post.id }.forUpdate().singleOrNull()
if (singleOrNull == null) { if (singleOrNull == null) {
Posts.insert { Posts.insert {
it[id] = post.id it[id] = post.id
@ -61,18 +61,44 @@ class PostRepositoryImpl(
it[deleted] = post.delted it[deleted] = post.delted
} }
} }
return singleOrNull == null return@query post
} }
override suspend fun findById(id: Long): Post = override suspend fun findById(id: Long): Post? = query {
Posts.leftJoin(PostsMedia) return@query Posts.leftJoin(PostsMedia)
.select { Posts.id eq id } .select { Posts.id eq id }
.let(postQueryMapper::map) .let(postQueryMapper::map)
.singleOr { FailedToGetResourcesException("id: $id was not found.", it) } .singleOrNull()
}
override suspend fun delete(id: Long) { override suspend fun findByUrl(url: String): Post? = query {
return@query Posts.leftJoin(PostsMedia)
.select { Posts.url eq url }
.let(postQueryMapper::map)
.singleOrNull()
}
override suspend fun findByApId(apId: String): Post? = query {
return@query Posts.leftJoin(PostsMedia)
.select { Posts.apId eq apId }
.let(postQueryMapper::map)
.singleOrNull()
}
override suspend fun existByApIdWithLock(apId: String): Boolean = query {
return@query Posts.select { Posts.apId eq apId }.forUpdate().empty().not()
}
override suspend fun delete(id: Long): Unit = query {
Posts.deleteWhere { Posts.id eq id } Posts.deleteWhere { Posts.id eq id }
} }
override val logger: Logger
get() = Companion.logger
companion object {
private val logger = LoggerFactory.getLogger(PostRepositoryImpl::class.java)
}
} }
object Posts : Table() { object Posts : Table() {

View File

@ -7,8 +7,11 @@ import dev.usbharu.hideout.core.service.job.JobQueueWorkerService
import kjob.core.dsl.JobContextWithProps import kjob.core.dsl.JobContextWithProps
import kjob.core.dsl.JobRegisterContext import kjob.core.dsl.JobRegisterContext
import kjob.core.dsl.KJobFunctions import kjob.core.dsl.KJobFunctions
import kjob.core.job.JobExecutionType
import kjob.core.kjob import kjob.core.kjob
import kjob.mongo.Mongo import kjob.mongo.Mongo
import kotlinx.coroutines.CancellationException
import org.slf4j.MDC
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
@ -24,6 +27,8 @@ class KJobMongoJobQueueWorkerService(
nonBlockingMaxJobs = 10 nonBlockingMaxJobs = 10
blockingMaxJobs = 10 blockingMaxJobs = 10
jobExecutionPeriodInSeconds = 1 jobExecutionPeriodInSeconds = 1
maxRetries = 3
defaultJobExecutor = JobExecutionType.NON_BLOCKING
}.start() }.start()
} }
@ -36,9 +41,22 @@ class KJobMongoJobQueueWorkerService(
} }
for (jobProcessor in jobQueueProcessorList) { for (jobProcessor in jobQueueProcessorList) {
kjob.register(jobProcessor.job()) { kjob.register(jobProcessor.job()) {
execute { execute {
val param = it.convertUnsafe(props) try {
jobProcessor.process(param) MDC.put("x-job-id", this.jobId)
val param = it.convertUnsafe(props)
jobProcessor.process(param)
} catch (e: CancellationException) {
throw e
} catch (e: Exception) {
logger.warn("FAILED Excute Job. job name: {} job id: {}", it.name, this.jobId, e)
throw e
} finally {
MDC.remove("x-job-id")
}
}.onError {
logger.warn("FAILED Excute Job. job name: {} job id: {}", this.jobName, this.jobId, error)
} }
} }
} }

View File

@ -1,11 +1,15 @@
package dev.usbharu.hideout.core.infrastructure.mongorepository package dev.usbharu.hideout.core.infrastructure.mongorepository
import dev.usbharu.hideout.application.service.id.IdGenerateService import dev.usbharu.hideout.application.service.id.IdGenerateService
import dev.usbharu.hideout.core.domain.exception.resource.DuplicateException
import dev.usbharu.hideout.core.domain.exception.resource.ResourceAccessException
import dev.usbharu.hideout.core.domain.model.timeline.Timeline import dev.usbharu.hideout.core.domain.model.timeline.Timeline
import dev.usbharu.hideout.core.domain.model.timeline.TimelineRepository import dev.usbharu.hideout.core.domain.model.timeline.TimelineRepository
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.dao.DataAccessException
import org.springframework.dao.DuplicateKeyException
import org.springframework.stereotype.Repository import org.springframework.stereotype.Repository
@Repository @Repository
@ -23,8 +27,15 @@ class MongoTimelineRepositoryWrapper(
} }
} }
override suspend fun saveAll(timelines: List<Timeline>): List<Timeline> = override suspend fun saveAll(timelines: List<Timeline>): List<Timeline> {
mongoTimelineRepository.saveAll(timelines) try {
return mongoTimelineRepository.saveAll(timelines)
} catch (e: DuplicateKeyException) {
throw DuplicateException("Timeline duplicate.", e)
} catch (e: DataAccessException) {
throw ResourceAccessException(e)
}
}
override suspend fun findByUserId(id: Long): List<Timeline> { override suspend fun findByUserId(id: Long): List<Timeline> {
return withContext(Dispatchers.IO) { return withContext(Dispatchers.IO) {

View File

@ -2,6 +2,8 @@ package dev.usbharu.hideout.core.service.post
import dev.usbharu.hideout.activitypub.service.activity.create.ApSendCreateService import dev.usbharu.hideout.activitypub.service.activity.create.ApSendCreateService
import dev.usbharu.hideout.core.domain.exception.UserNotFoundException import dev.usbharu.hideout.core.domain.exception.UserNotFoundException
import dev.usbharu.hideout.core.domain.exception.resource.DuplicateException
import dev.usbharu.hideout.core.domain.exception.resource.PostNotFoundException
import dev.usbharu.hideout.core.domain.model.actor.Actor import dev.usbharu.hideout.core.domain.model.actor.Actor
import dev.usbharu.hideout.core.domain.model.actor.ActorRepository import dev.usbharu.hideout.core.domain.model.actor.ActorRepository
import dev.usbharu.hideout.core.domain.model.post.Post import dev.usbharu.hideout.core.domain.model.post.Post
@ -9,9 +11,7 @@ import dev.usbharu.hideout.core.domain.model.post.PostRepository
import dev.usbharu.hideout.core.domain.model.reaction.ReactionRepository import dev.usbharu.hideout.core.domain.model.reaction.ReactionRepository
import dev.usbharu.hideout.core.query.PostQueryService import dev.usbharu.hideout.core.query.PostQueryService
import dev.usbharu.hideout.core.service.timeline.TimelineService import dev.usbharu.hideout.core.service.timeline.TimelineService
import org.jetbrains.exposed.exceptions.ExposedSQLException
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import org.springframework.dao.DuplicateKeyException
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.time.Instant import java.time.Instant
@ -79,18 +79,12 @@ class PostServiceImpl(
private suspend fun internalCreate(post: Post, isLocal: Boolean, actor: Actor): Post { private suspend fun internalCreate(post: Post, isLocal: Boolean, actor: Actor): Post {
return try { return try {
if (postRepository.save(post)) { val save = postRepository.save(post)
try { timelineService.publishTimeline(post, isLocal)
timelineService.publishTimeline(post, isLocal) actorRepository.save(actor.incrementPostsCount())
actorRepository.save(actor.incrementPostsCount()) save
} catch (e: DuplicateKeyException) { } catch (e: DuplicateException) {
logger.trace("Timeline already exists.", e) postRepository.findByApId(post.apId) ?: throw PostNotFoundException.withApId(post.apId)
}
}
post
} catch (e: ExposedSQLException) {
logger.warn("FAILED Save to post. url: ${post.apId}", e)
postQueryService.findByApId(post.apId)
} }
} }

View File

@ -33,6 +33,7 @@ class InMemoryCacheManager : CacheManager {
val processed = try { val processed = try {
block() block()
} catch (e: Exception) { } catch (e: Exception) {
e.printStackTrace()
cacheKey.remove(key) cacheKey.remove(key)
throw e throw e
} }
@ -46,7 +47,7 @@ class InMemoryCacheManager : CacheManager {
override suspend fun getOrWait(key: String): ResolveResponse { override suspend fun getOrWait(key: String): ResolveResponse {
while (valueStore.contains(key).not()) { while (valueStore.contains(key).not()) {
if (cacheKey.containsKey(key).not()) { if (cacheKey.containsKey(key).not()) {
throw IllegalStateException("Invalid cache key.") throw IllegalStateException("Invalid cache key. $key")
} }
delay(1) delay(1)
} }

View File

@ -3,6 +3,7 @@ package dev.usbharu.hideout.core.service.user
import dev.usbharu.hideout.activitypub.service.activity.delete.APSendDeleteService import dev.usbharu.hideout.activitypub.service.activity.delete.APSendDeleteService
import dev.usbharu.hideout.application.config.ApplicationConfig import dev.usbharu.hideout.application.config.ApplicationConfig
import dev.usbharu.hideout.core.domain.exception.FailedToGetResourcesException import dev.usbharu.hideout.core.domain.exception.FailedToGetResourcesException
import dev.usbharu.hideout.core.domain.exception.resource.DuplicateException
import dev.usbharu.hideout.core.domain.exception.resource.UserNotFoundException import dev.usbharu.hideout.core.domain.exception.resource.UserNotFoundException
import dev.usbharu.hideout.core.domain.model.actor.Actor import dev.usbharu.hideout.core.domain.model.actor.Actor
import dev.usbharu.hideout.core.domain.model.actor.ActorRepository import dev.usbharu.hideout.core.domain.model.actor.ActorRepository
@ -15,10 +16,8 @@ import dev.usbharu.hideout.core.domain.model.userdetails.UserDetailRepository
import dev.usbharu.hideout.core.query.DeletedActorQueryService import dev.usbharu.hideout.core.query.DeletedActorQueryService
import dev.usbharu.hideout.core.service.instance.InstanceService import dev.usbharu.hideout.core.service.instance.InstanceService
import dev.usbharu.hideout.core.service.post.PostService import dev.usbharu.hideout.core.service.post.PostService
import org.jetbrains.exposed.exceptions.ExposedSQLException
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.time.Instant import java.time.Instant
@Service @Service
@ -72,7 +71,6 @@ class UserServiceImpl(
return save return save
} }
@Transactional
override suspend fun createRemoteUser(user: RemoteUserCreateDto): Actor { override suspend fun createRemoteUser(user: RemoteUserCreateDto): Actor {
logger.info("START Create New remote user. name: {} url: {}", user.name, user.url) logger.info("START Create New remote user. name: {} url: {}", user.name, user.url)
@ -113,8 +111,7 @@ class UserServiceImpl(
val save = actorRepository.save(userEntity) val save = actorRepository.save(userEntity)
logger.warn("SUCCESS Create New remote user. id: {} name: {} url: {}", userEntity.id, user.name, user.url) logger.warn("SUCCESS Create New remote user. id: {} name: {} url: {}", userEntity.id, user.name, user.url)
save save
} catch (_: ExposedSQLException) { } catch (_: DuplicateException) {
logger.warn("FAILED User already exists. name: {} url: {}", user.name, user.url)
actorRepository.findByUrl(user.url)!! actorRepository.findByUrl(user.url)!!
} }
} }

View File

@ -15,7 +15,7 @@
</appender> </appender>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder> <encoder>
<pattern>%d{YYYY-MM-dd HH:mm:ss.SSS} [%thread] %-5level [%X{x-request-id}] [%X{x-job-id}] %logger{36} - <pattern>%d{YYYY-MM-dd HH:mm:ss.SSS} [%thread] %-5level [%X{x-request-id},%X{x-job-id}] %logger{36} -
%msg%n %msg%n
</pattern> </pattern>
</encoder> </encoder>