diff --git a/hideout-core/build.gradle.kts b/hideout-core/build.gradle.kts index 2537e715..ae7b5128 100644 --- a/hideout-core/build.gradle.kts +++ b/hideout-core/build.gradle.kts @@ -188,10 +188,8 @@ dependencies { implementation(libs.bundles.exposed) implementation(libs.bundles.coroutines) implementation(libs.bundles.ktor.client) - implementation(libs.bundles.serialization) implementation(libs.bundles.apache.tika) implementation(libs.bundles.openapi) - implementation(libs.bundles.kjob) implementation(libs.bundles.owl.producer) implementation(libs.bundles.spring.boot.oauth2) implementation(libs.bundles.spring.boot.data.mongodb) diff --git a/hideout-core/src/main/kotlin/dev/usbharu/hideout/activitypub/service/activity/follow/APFollowProcessor.kt b/hideout-core/src/main/kotlin/dev/usbharu/hideout/activitypub/service/activity/follow/APFollowProcessor.kt index 751c59f3..3cc8cc68 100644 --- a/hideout-core/src/main/kotlin/dev/usbharu/hideout/activitypub/service/activity/follow/APFollowProcessor.kt +++ b/hideout-core/src/main/kotlin/dev/usbharu/hideout/activitypub/service/activity/follow/APFollowProcessor.kt @@ -16,7 +16,6 @@ package dev.usbharu.hideout.activitypub.service.activity.follow -import com.fasterxml.jackson.databind.ObjectMapper import dev.usbharu.hideout.activitypub.domain.model.Follow import dev.usbharu.hideout.activitypub.service.common.AbstractActivityPubProcessor import dev.usbharu.hideout.activitypub.service.common.ActivityPubProcessContext @@ -29,7 +28,6 @@ import org.springframework.stereotype.Service @Service class APFollowProcessor( transaction: Transaction, - private val objectMapper: ObjectMapper, private val owlProducer: OwlProducer, ) : AbstractActivityPubProcessor(transaction) { diff --git a/hideout-core/src/main/kotlin/dev/usbharu/hideout/core/infrastructure/kjobexposed/ExposedJobRepository.kt b/hideout-core/src/main/kotlin/dev/usbharu/hideout/core/infrastructure/kjobexposed/ExposedJobRepository.kt deleted file mode 100644 index 50c60379..00000000 --- a/hideout-core/src/main/kotlin/dev/usbharu/hideout/core/infrastructure/kjobexposed/ExposedJobRepository.kt +++ /dev/null @@ -1,331 +0,0 @@ -/* - * Copyright (C) 2024 usbharu - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dev.usbharu.hideout.core.infrastructure.kjobexposed - -import kjob.core.job.JobProgress -import kjob.core.job.JobSettings -import kjob.core.job.JobStatus -import kjob.core.job.ScheduledJob -import kjob.core.repository.JobRepository -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.asFlow -import kotlinx.serialization.encodeToString -import kotlinx.serialization.json.* -import org.jetbrains.exposed.dao.id.LongIdTable -import org.jetbrains.exposed.sql.* -import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq -import org.jetbrains.exposed.sql.SqlExpressionBuilder.inList -import org.jetbrains.exposed.sql.SqlExpressionBuilder.plus -import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction -import org.jetbrains.exposed.sql.transactions.transaction -import java.time.Clock -import java.time.Instant -import java.util.* - -class ExposedJobRepository( - private val database: Database, - private val tableName: String, - private val clock: Clock, - private val json: Json -) : - JobRepository { - - class Jobs(tableName: String) : LongIdTable(tableName) { - val status = text("status") - val runAt = long("runAt").nullable() - val statusMessage = text("statusMessage").nullable() - val retries = integer("retries") - val kjobId = char("kjobId", 36).nullable() - val createdAt = long("createdAt") - val updatedAt = long("updatedAt") - val jobId = text("jobId") - val name = text("name") - val properties = text("properties").nullable() - val step = integer("step") - val max = integer("max").nullable() - val startedAt = long("startedAt").nullable() - val completedAt = long("completedAt").nullable() - } - - val jobs: Jobs = Jobs(tableName) - - fun createTable() { - transaction(database) { - SchemaUtils.create(jobs) - } - } - - @Suppress("InjectDispatcher") - suspend fun query(block: suspend () -> T): T = newSuspendedTransaction(Dispatchers.IO) { block() } - - override suspend fun completeProgress(id: String): Boolean { - val now = Instant.now(clock).toEpochMilli() - return query { - jobs.update({ jobs.id eq id.toLong() }) { - it[jobs.completedAt] = now - it[jobs.updatedAt] = now - } == 1 - } - } - - override suspend fun exist(jobId: String): Boolean { - return query { - jobs.selectAll().where(jobs.jobId eq jobId).empty().not() - } - } - - @Suppress("SuspendFunWithFlowReturnType") - override suspend fun findNext(names: Set, status: Set, limit: Int): Flow { - return query { - jobs.selectAll().where( - jobs.status.inList(list = status.map { it.name }) - .and(if (names.isEmpty()) Op.TRUE else jobs.name.inList(names)) - ).limit(limit) - .map { it.toScheduledJob() }.asFlow() - } - } - - override suspend fun get(id: String): ScheduledJob? { - val single = query { jobs.selectAll().where(jobs.id eq id.toLong()).singleOrNull() } ?: return null - return single.toScheduledJob() - } - - override suspend fun reset(id: String, oldKjobId: UUID?): Boolean { - return query { - jobs.update({ - jobs.id eq id.toLong() and if (oldKjobId == null) { - jobs.kjobId.isNull() - } else { - jobs.kjobId eq oldKjobId.toString() - } - }) { - it[jobs.status] = JobStatus.CREATED.name - it[jobs.statusMessage] = null - it[jobs.kjobId] = null - it[jobs.step] = 0 - it[jobs.max] = null - it[jobs.startedAt] = null - it[jobs.completedAt] = null - it[jobs.updatedAt] = Instant.now(clock).toEpochMilli() - } == 1 - } - } - - override suspend fun save(jobSettings: JobSettings, runAt: Instant?): ScheduledJob { - val now = Instant.now(clock) - val scheduledJob = - ScheduledJob( - id = "", - status = JobStatus.CREATED, - runAt = runAt, - statusMessage = null, - retries = 0, - kjobId = null, - createdAt = now, - updatedAt = now, - settings = jobSettings, - progress = JobProgress(0) - ) - val id = query { - jobs.insert { - it[jobs.status] = scheduledJob.status.name - it[jobs.createdAt] = scheduledJob.createdAt.toEpochMilli() - it[jobs.updatedAt] = scheduledJob.updatedAt.toEpochMilli() - it[jobs.jobId] = scheduledJob.settings.id - it[jobs.name] = scheduledJob.settings.name - it[jobs.properties] = scheduledJob.settings.properties.stringify() - it[jobs.runAt] = scheduledJob.runAt?.toEpochMilli() - it[jobs.statusMessage] = null - it[jobs.retries] = 0 - it[jobs.kjobId] = null - it[jobs.step] = 0 - it[jobs.max] = null - it[jobs.startedAt] = null - it[jobs.completedAt] = null - }[jobs.id].value - } - return scheduledJob.copy(id = id.toString()) - } - - override suspend fun setProgressMax(id: String, max: Long): Boolean { - val now = Instant.now(clock).toEpochMilli() - return query { - jobs.update({ jobs.id eq id.toLong() }) { - it[jobs.max] = max.toInt() - it[jobs.updatedAt] = now - } == 1 - } - } - - override suspend fun startProgress(id: String): Boolean { - val now = Instant.now(clock).toEpochMilli() - return query { - jobs.update({ jobs.id eq id.toLong() }) { - it[jobs.startedAt] = now - it[jobs.updatedAt] = now - } == 1 - } - } - - override suspend fun stepProgress(id: String, step: Long): Boolean { - val now = Instant.now(clock).toEpochMilli() - return query { - jobs.update({ jobs.id eq id.toLong() }) { - it[jobs.step] = jobs.step + step.toInt() - it[jobs.updatedAt] = now - } == 1 - } - } - - override suspend fun update( - id: String, - oldKjobId: UUID?, - kjobId: UUID?, - status: JobStatus, - statusMessage: String?, - retries: Int - ): Boolean { - return query { - jobs.update({ - (jobs.id eq id.toLong()) and if (oldKjobId == null) { - jobs.kjobId.isNull() - } else { - jobs.kjobId eq oldKjobId.toString() - } - }) { - it[jobs.status] = status.name - it[jobs.retries] = retries - it[jobs.updatedAt] = Instant.now(clock).toEpochMilli() - it[jobs.id] = id.toLong() - it[jobs.statusMessage] = statusMessage - it[jobs.kjobId] = kjobId.toString() - } == 1 - } - } - - @Suppress("CyclomaticComplexMethod") - private fun String?.parseJsonMap(): Map { - this ?: return emptyMap() - return json.parseToJsonElement(this).jsonObject.mapValues { (_, el) -> - if (el is JsonObject) { - val t = el["t"]?.run { jsonPrimitive.content } ?: error("Cannot get jsonPrimitive") - val value = el["v"]?.jsonArray ?: error("Cannot get jsonArray") - when (t) { - "s" -> value.map { it.jsonPrimitive.content } - "d" -> value.map { it.jsonPrimitive.double } - "l" -> value.map { it.jsonPrimitive.long } - "i" -> value.map { it.jsonPrimitive.int } - "b" -> value.map { it.jsonPrimitive.boolean } - else -> error("Unknown type prefix '$t'") - }.toList() - } else { - val content = el.jsonPrimitive.content - val t = content.substringBefore(':') - val value = content.substringAfter(':') - when (t) { - "s" -> value - "d" -> value.toDouble() - "l" -> value.toLong() - "i" -> value.toInt() - "b" -> value.toBoolean() - else -> error("Unknown type prefix '$t'") - } - } - } - } - - @Suppress("CyclomaticComplexMethod") - private fun Map.stringify(): String? { - if (isEmpty()) { - return null - } - - @Suppress("UNCHECKED_CAST") - fun listSerialize(value: List<*>): JsonElement { - return if (value.isEmpty()) { - buildJsonObject { - put("t", "s") - putJsonArray("v") {} - } - } else { - val (t, values) = when (val item = value.first()) { - is Double -> "d" to (value as List).map(::JsonPrimitive) - is Long -> "l" to (value as List).map(::JsonPrimitive) - is Int -> "i" to (value as List).map(::JsonPrimitive) - is String -> "s" to (value as List).map(::JsonPrimitive) - is Boolean -> "b" to (value as List).map(::JsonPrimitive) - else -> error("Cannot serialize unsupported list property value: $item") - } - buildJsonObject { - put("t", t) - put("v", JsonArray(values)) - } - } - } - - fun createJsonPrimitive(string: String, value: Any) = JsonPrimitive("$string:$value") - - val jsonObject = JsonObject( - mapValues { (_, value) -> - when (value) { - is List<*> -> listSerialize(value) - is Double -> createJsonPrimitive("d", value) - is Long -> createJsonPrimitive("l", value) - is Int -> createJsonPrimitive("i", value) - is String -> createJsonPrimitive("s", value) - is Boolean -> createJsonPrimitive("b", value) - else -> error("Cannot serialize unsupported property value: $value") - } - } - ) - return json.encodeToString(jsonObject) - } - - private fun ResultRow.toScheduledJob(): ScheduledJob { - val single = this - - return ScheduledJob( - id = single[jobs.id].value.toString(), - status = JobStatus.valueOf(single[jobs.status]), - runAt = single[jobs.runAt]?.let { Instant.ofEpochMilli(it) }, - statusMessage = single[jobs.statusMessage], - retries = single[jobs.retries], - kjobId = single[jobs.kjobId]?.let { - try { - @Suppress("SwallowedException") - UUID.fromString(it) - } catch (ignored: IllegalArgumentException) { - null - } - }, - createdAt = Instant.ofEpochMilli(single[jobs.createdAt]), - updatedAt = Instant.ofEpochMilli(single[jobs.updatedAt]), - settings = JobSettings( - id = single[jobs.jobId], - name = single[jobs.name], - properties = single[jobs.properties].parseJsonMap() - ), - progress = JobProgress( - step = single[jobs.step].toLong(), - max = single[jobs.max]?.toLong(), - startedAt = single[jobs.startedAt]?.let { Instant.ofEpochMilli(it) }, - completedAt = single[jobs.completedAt]?.let { Instant.ofEpochMilli(it) } - ) - ) - } -} diff --git a/hideout-core/src/main/kotlin/dev/usbharu/hideout/core/infrastructure/kjobexposed/ExposedKJob.kt b/hideout-core/src/main/kotlin/dev/usbharu/hideout/core/infrastructure/kjobexposed/ExposedKJob.kt deleted file mode 100644 index b2f5c49d..00000000 --- a/hideout-core/src/main/kotlin/dev/usbharu/hideout/core/infrastructure/kjobexposed/ExposedKJob.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (C) 2024 usbharu - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dev.usbharu.hideout.core.infrastructure.kjobexposed - -import kjob.core.BaseKJob -import kjob.core.KJob -import kjob.core.KJobFactory -import kotlinx.coroutines.runBlocking -import org.jetbrains.exposed.sql.Database -import java.time.Clock - -class ExposedKJob(config: Configuration) : BaseKJob(config) { - - private val database: Database = config.connectionDatabase ?: Database.connect( - requireNotNull(config.connectionString), - requireNotNull(config.driverClassName) - ) - - override val jobRepository: ExposedJobRepository - get() = ExposedJobRepository(database, config.jobTableName, Clock.systemUTC(), config.json) - - override val lockRepository: ExposedLockRepository - get() = ExposedLockRepository(database, config, clock) - - override fun start(): KJob { - jobRepository.createTable() - lockRepository.createTable() - return super.start() - } - - override fun shutdown(): Unit = runBlocking { - super.shutdown() - lockRepository.clearExpired() - } - - companion object : KJobFactory { - override fun create(configure: Configuration.() -> Unit): KJob = ExposedKJob(Configuration().apply(configure)) - } - - class Configuration : BaseKJob.Configuration() { - var connectionString: String? = null - var driverClassName: String? = null - var connectionDatabase: Database? = null - - var jobTableName: String = "kjobJobs" - - var lockTableName: String = "kjobLocks" - - var expireLockInMinutes: Long = 5L - } -} diff --git a/hideout-core/src/main/kotlin/dev/usbharu/hideout/core/infrastructure/kjobexposed/ExposedLockRepository.kt b/hideout-core/src/main/kotlin/dev/usbharu/hideout/core/infrastructure/kjobexposed/ExposedLockRepository.kt deleted file mode 100644 index f087f224..00000000 --- a/hideout-core/src/main/kotlin/dev/usbharu/hideout/core/infrastructure/kjobexposed/ExposedLockRepository.kt +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright (C) 2024 usbharu - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dev.usbharu.hideout.core.infrastructure.kjobexposed - -import kjob.core.job.Lock -import kjob.core.repository.LockRepository -import kotlinx.coroutines.Dispatchers -import org.jetbrains.exposed.dao.id.UUIDTable -import org.jetbrains.exposed.sql.* -import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq -import org.jetbrains.exposed.sql.SqlExpressionBuilder.greater -import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction -import org.jetbrains.exposed.sql.transactions.transaction -import java.time.Clock -import java.time.Instant -import java.util.* -import kotlin.time.Duration.Companion.minutes - -class ExposedLockRepository( - private val database: Database, - private val config: ExposedKJob.Configuration, - private val clock: Clock -) : LockRepository { - - class Locks(tableName: String) : UUIDTable(tableName) { - val updatedAt = long("updatedAt") - val expiresAt = long("expiresAt") - } - - val locks: Locks = Locks(config.lockTableName) - - fun createTable() { - transaction(database) { - SchemaUtils.create(locks) - } - } - - @Suppress("InjectDispatcher") - suspend fun query(block: suspend () -> T): T = newSuspendedTransaction(Dispatchers.IO) { block() } - - override suspend fun exists(id: UUID): Boolean { - val now = Instant.now(clock) - return query { - locks.selectAll().where(locks.id eq id and locks.expiresAt.greater(now.toEpochMilli())).empty().not() - } - } - - override suspend fun ping(id: UUID): Lock { - val now = Instant.now(clock) - val expiresAt = now.plusSeconds(config.expireLockInMinutes.minutes.inWholeSeconds) - val lock = Lock(id, now) - query { - if (locks.selectAll().where(locks.id eq id).limit(1) - .map { Lock(it[locks.id].value, Instant.ofEpochMilli(it[locks.expiresAt])) }.isEmpty() - ) { - locks.insert { - it[locks.id] = id - it[locks.updatedAt] = now.toEpochMilli() - it[locks.expiresAt] = expiresAt.toEpochMilli() - } - } else { - locks.update({ locks.id eq id }) { - it[locks.updatedAt] = now.toEpochMilli() - it[locks.expiresAt] = expiresAt.toEpochMilli() - } - } - } - return lock - } - - suspend fun clearExpired() { - val now = Instant.now(clock).toEpochMilli() - query { - locks.deleteWhere { locks.expiresAt greater now } - } - } -}