diff --git a/build.gradle.kts b/build.gradle.kts index 54780dc5..b7e81f91 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -28,6 +28,14 @@ repositories { mavenCentral() } +kotlin { + target { + compilations.all { + kotlinOptions.jvmTarget = JavaVersion.VERSION_11.toString() + } + } +} + dependencies { implementation("io.ktor:ktor-server-core-jvm:$ktor_version") implementation("io.ktor:ktor-server-auth-jvm:$ktor_version") @@ -63,6 +71,8 @@ dependencies { implementation("tech.barbero.http-messages-signing:http-messages-signing-core:1.0.0") testImplementation("org.junit.jupiter:junit-jupiter:5.8.1") + + implementation("org.drewcarlson:kjob-core:0.6.0") } jib { diff --git a/src/main/kotlin/dev/usbharu/hideout/service/job/JobQueueService.kt b/src/main/kotlin/dev/usbharu/hideout/service/job/JobQueueService.kt new file mode 100644 index 00000000..94486f45 --- /dev/null +++ b/src/main/kotlin/dev/usbharu/hideout/service/job/JobQueueService.kt @@ -0,0 +1,10 @@ +package dev.usbharu.hideout.service.job + +import kjob.core.Job +import kjob.core.dsl.ScheduleContext + +interface JobQueueService { + + fun init(jobDefines:List) + suspend fun schedule(job: J, block: ScheduleContext.(J) -> Unit) +} diff --git a/src/main/kotlin/dev/usbharu/hideout/service/job/JobWorkerService.kt b/src/main/kotlin/dev/usbharu/hideout/service/job/JobWorkerService.kt new file mode 100644 index 00000000..86e8d69c --- /dev/null +++ b/src/main/kotlin/dev/usbharu/hideout/service/job/JobWorkerService.kt @@ -0,0 +1,10 @@ +package dev.usbharu.hideout.service.job + +import kjob.core.Job +import kjob.core.dsl.JobContextWithProps +import kjob.core.dsl.JobRegisterContext +import kjob.core.dsl.KJobFunctions + +interface JobWorkerService { + fun init(defines: List>.(Job) -> KJobFunctions>>>) +} diff --git a/src/main/kotlin/dev/usbharu/hideout/service/job/KJobJobQueueService.kt b/src/main/kotlin/dev/usbharu/hideout/service/job/KJobJobQueueService.kt new file mode 100644 index 00000000..f09d22d0 --- /dev/null +++ b/src/main/kotlin/dev/usbharu/hideout/service/job/KJobJobQueueService.kt @@ -0,0 +1,24 @@ +package dev.usbharu.hideout.service.job + +import dev.usbharu.kjob.exposed.ExposedKJob +import kjob.core.Job +import kjob.core.KJob +import kjob.core.dsl.ScheduleContext +import kjob.core.kjob +import org.jetbrains.exposed.sql.Database + +class KJobJobQueueService(private val database: Database) : JobQueueService { + + val kjob: KJob = kjob(ExposedKJob) { + connectionDatabase = database + isWorker = false + }.start() + + override fun init(jobDefines: List) { + + } + + override suspend fun schedule(job: J,block:ScheduleContext.(J)->Unit) { + kjob.schedule(job,block) + } +} diff --git a/src/main/kotlin/dev/usbharu/hideout/service/job/KJobJobWorkerService.kt b/src/main/kotlin/dev/usbharu/hideout/service/job/KJobJobWorkerService.kt new file mode 100644 index 00000000..5ee5a6ba --- /dev/null +++ b/src/main/kotlin/dev/usbharu/hideout/service/job/KJobJobWorkerService.kt @@ -0,0 +1,28 @@ +package dev.usbharu.hideout.service.job + +import dev.usbharu.kjob.exposed.ExposedKJob +import kjob.core.Job +import kjob.core.dsl.JobContextWithProps +import kjob.core.dsl.JobRegisterContext +import kjob.core.dsl.KJobFunctions +import kjob.core.kjob +import org.jetbrains.exposed.sql.Database + +class KJobJobWorkerService(private val database: Database) : JobWorkerService { + + val kjob by lazy { + kjob(ExposedKJob) { + connectionDatabase = database + nonBlockingMaxJobs = 10 + blockingMaxJobs = 10 + jobExecutionPeriodInSeconds = 10 + }.start() + } + + override fun init(defines: List>.(Job) -> KJobFunctions>>>) { + defines.forEach { job -> + kjob.register(job.first, job.second) + } + } + +} diff --git a/src/main/kotlin/dev/usbharu/kjob/exposed/ExposedJobRepository.kt b/src/main/kotlin/dev/usbharu/kjob/exposed/ExposedJobRepository.kt new file mode 100644 index 00000000..6533358a --- /dev/null +++ b/src/main/kotlin/dev/usbharu/kjob/exposed/ExposedJobRepository.kt @@ -0,0 +1,290 @@ +package dev.usbharu.kjob.exposed + +import kjob.core.job.JobProgress +import kjob.core.job.JobSettings +import kjob.core.job.JobStatus +import kjob.core.job.ScheduledJob +import kjob.core.repository.JobRepository +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow +import kotlinx.serialization.encodeToString +import kotlinx.serialization.json.* +import org.jetbrains.exposed.dao.id.LongIdTable +import org.jetbrains.exposed.sql.* +import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq +import org.jetbrains.exposed.sql.SqlExpressionBuilder.inList +import org.jetbrains.exposed.sql.SqlExpressionBuilder.isNull +import org.jetbrains.exposed.sql.SqlExpressionBuilder.plus +import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction +import org.jetbrains.exposed.sql.transactions.transaction +import java.time.Clock +import java.time.Instant +import java.util.* + +class ExposedJobRepository( + private val database: Database, + private val tableName: String, + private val clock: Clock, + private val json: Json +) : + JobRepository { + + class Jobs(tableName: String) : LongIdTable(tableName) { + val status = text("status") + val runAt = long("runAt").nullable() + val statusMessage = text("statusMessage").nullable() + val retries = integer("retries") + val kjobId = char("kjobId", 36).nullable() + val createdAt = long("createdAt") + val updatedAt = long("updatedAt") + val jobId = text("jobId") + val name = text("name") + val properties = text("properties").nullable() + val step = integer("step") + val max = integer("max").nullable() + val startedAt = long("startedAt").nullable() + val completedAt = long("completedAt").nullable() + } + + val jobs: Jobs = Jobs(tableName) + + fun createTable() { + transaction(database) { + SchemaUtils.create(jobs) + } + } + + suspend fun query(block: suspend () -> T): T = newSuspendedTransaction(Dispatchers.IO) { block() } + + + override suspend fun completeProgress(id: String): Boolean { + val now = Instant.now(clock).toEpochMilli() + return query { + jobs.update({ jobs.id eq id.toLong() }) { + it[jobs.completedAt] = now + it[jobs.updatedAt] = now + } == 1 + } + } + + override suspend fun exist(jobId: String): Boolean { + return query { + jobs.select(jobs.jobId eq jobId).empty().not() + } + } + + override suspend fun findNext(names: Set, status: Set, limit: Int): Flow { + return query { + jobs.select( + jobs.status.inList(list = status.map { it.name }) + .and(if (names.isEmpty()) Op.TRUE else jobs.name.inList(names)) + ).limit(limit) + .map { it.toScheduledJob() }.asFlow() + } + } + + override suspend fun get(id: String): ScheduledJob? { + val single = query { jobs.select(jobs.id eq id.toLong()).singleOrNull() } ?: return null + return single.toScheduledJob() + + } + + override suspend fun reset(id: String, oldKjobId: UUID?): Boolean { + return query { + jobs.update({ jobs.id eq id.toLong() and if (oldKjobId == null) jobs.kjobId.isNull() else jobs.kjobId eq oldKjobId.toString() }) { + it[jobs.status] = JobStatus.CREATED.name + it[jobs.statusMessage] = null + it[jobs.kjobId] = null + it[jobs.step] = 0 + it[jobs.max] = null + it[jobs.startedAt] = null + it[jobs.completedAt] = null + it[jobs.updatedAt] = Instant.now(clock).toEpochMilli() + } == 1 + } + } + + override suspend fun save(jobSettings: JobSettings, runAt: Instant?): ScheduledJob { + val now = Instant.now(clock) + val scheduledJob = + ScheduledJob("", JobStatus.CREATED, runAt, null, 0, null, now, now, jobSettings, JobProgress(0)) + val id = query { + jobs.insert { + it[jobs.status] = scheduledJob.status.name + it[jobs.createdAt] = scheduledJob.createdAt.toEpochMilli() + it[jobs.updatedAt] = scheduledJob.updatedAt.toEpochMilli() + it[jobs.jobId] = scheduledJob.settings.id + it[jobs.name] = scheduledJob.settings.name + it[jobs.properties] = scheduledJob.settings.properties.stringify() + it[jobs.runAt] = scheduledJob.runAt?.toEpochMilli() + it[jobs.statusMessage] = null + it[jobs.retries] = 0 + it[jobs.kjobId] = null + it[jobs.step] = 0 + it[jobs.max] = null + it[jobs.startedAt] = null + it[jobs.completedAt] = null + }[jobs.id].value + } + return scheduledJob.copy(id = id.toString()) + } + + override suspend fun setProgressMax(id: String, max: Long): Boolean { + val now = Instant.now(clock).toEpochMilli() + return query { + jobs.update({ jobs.id eq id.toLong() }) { + it[jobs.max] = max.toInt() + it[jobs.updatedAt] = now + } == 1 + } + } + + override suspend fun startProgress(id: String): Boolean { + val now = Instant.now(clock).toEpochMilli() + return query { + jobs.update({ jobs.id eq id.toLong() }) { + it[jobs.startedAt] = now + it[jobs.updatedAt] = now + } == 1 + } + } + + override suspend fun stepProgress(id: String, step: Long): Boolean { + val now = Instant.now(clock).toEpochMilli() + return query { + jobs.update({ jobs.id eq id.toLong() }) { + it[jobs.step] = jobs.step + step.toInt() + it[jobs.updatedAt] = now + } == 1 + } + } + + override suspend fun update( + id: String, + oldKjobId: UUID?, + kjobId: UUID?, + status: JobStatus, + statusMessage: String?, + retries: Int + ): Boolean { + return query { + jobs.update({ (jobs.id eq id.toLong()) and if (oldKjobId == null) jobs.kjobId.isNull() else jobs.kjobId eq oldKjobId.toString() }) { + it[jobs.status] = status.name + it[jobs.retries] = retries + it[jobs.updatedAt] = Instant.now(clock).toEpochMilli() + it[jobs.id] = id.toLong() + it[jobs.statusMessage] = statusMessage + it[jobs.kjobId] = kjobId.toString() + } == 1 + } + } + + private fun String?.parseJsonMap(): Map { + this ?: return emptyMap() + return json.parseToJsonElement(this).jsonObject.mapValues { (_, el) -> + if (el is JsonObject) { + val t = el["t"]?.jsonPrimitive?.content ?: error("Cannot get jsonPrimitive") + val value = el["v"]?.jsonArray ?: error("Cannot get jsonArray") + when (t) { + "s" -> value.map { it.jsonPrimitive.content } + "d" -> value.map { it.jsonPrimitive.double } + "l" -> value.map { it.jsonPrimitive.long } + "i" -> value.map { it.jsonPrimitive.int } + "b" -> value.map { it.jsonPrimitive.boolean } + else -> error("Unknown type prefix '$t'") + }.toList() + } else { + val content = el.jsonPrimitive.content + val t = content.substringBefore(':') + val value = content.substringAfter(':') + when (t) { + "s" -> value + "d" -> value.toDouble() + "l" -> value.toLong() + "i" -> value.toInt() + "b" -> value.toBoolean() + else -> error("Unknown type prefix '$t'") + } + } + } + } + + private fun Map.stringify(): String? { + if (isEmpty()) { + return null + } + + fun listSerialize(value: List<*>): JsonElement { + return if (value.isEmpty()) { + buildJsonObject { + put("t", "s") + putJsonArray("v") {} + } + } else { + val (t, values) = when (val item = value.first()) { + is Double -> "d" to (value as List).map(::JsonPrimitive) + is Long -> "l" to (value as List).map(::JsonPrimitive) + is Int -> "i" to (value as List).map(::JsonPrimitive) + is String -> "s" to (value as List).map(::JsonPrimitive) + is Boolean -> "b" to (value as List).map(::JsonPrimitive) + else -> error("Cannot serialize unsupported list property value: $value") + } + buildJsonObject { + put("t", t) + put("v", JsonArray(values)) + } + } + } + + fun createJsonPrimitive(string: String, value: Any) = JsonPrimitive("$string:$value") + + val jsonObject = JsonObject( + mapValues { (_, value) -> + when (value) { + is List<*> -> listSerialize(value) + is Double -> createJsonPrimitive("d", value) + is Long -> createJsonPrimitive("l", value) + is Int -> createJsonPrimitive("i", value) + is String -> createJsonPrimitive("s", value) + is Boolean -> createJsonPrimitive("b", value) + else -> error("Cannot serialize unsupported property value: $value") + } + } + ) + return json.encodeToString(jsonObject) + } + + private fun ResultRow.toScheduledJob(): ScheduledJob { + val single = this + jobs.run { + return ScheduledJob( + id = single[this.id].value.toString(), + status = JobStatus.valueOf(single[status]), + runAt = single[runAt]?.let { Instant.ofEpochMilli(it) }, + statusMessage = single[statusMessage], + retries = single[retries], + kjobId = single[kjobId]?.let { + try { + UUID.fromString(it) + } catch (e: IllegalArgumentException) { + null + } + }, + createdAt = Instant.ofEpochMilli(single[createdAt]), + updatedAt = Instant.ofEpochMilli(single[updatedAt]), + settings = JobSettings( + id = single[jobId], + name = single[name], + properties = single[properties].parseJsonMap() + ), + progress = JobProgress( + step = single[step].toLong(), + max = single[max]?.toLong(), + startedAt = single[startedAt]?.let { Instant.ofEpochMilli(it) }, + completedAt = single[completedAt]?.let { Instant.ofEpochMilli(it) } + ) + ) + } + } +} diff --git a/src/main/kotlin/dev/usbharu/kjob/exposed/ExposedKJob.kt b/src/main/kotlin/dev/usbharu/kjob/exposed/ExposedKJob.kt new file mode 100644 index 00000000..76d00008 --- /dev/null +++ b/src/main/kotlin/dev/usbharu/kjob/exposed/ExposedKJob.kt @@ -0,0 +1,50 @@ +package dev.usbharu.kjob.exposed + +import kjob.core.BaseKJob +import kjob.core.KJob +import kjob.core.KJobFactory +import kotlinx.coroutines.runBlocking +import org.jetbrains.exposed.sql.Database +import java.time.Clock + +class ExposedKJob(config: Configuration) : BaseKJob(config) { + + companion object : KJobFactory { + override fun create(configure: Configuration.() -> Unit): KJob { + return ExposedKJob(Configuration().apply(configure)) + } + } + + class Configuration : BaseKJob.Configuration() { + var connectionString: String? = null + var driverClassName: String? = null + var connectionDatabase: Database? = null + + var jobTableName = "kjobJobs" + + var lockTableName = "kjobLocks" + + var expireLockInMinutes = 5L + } + + private val database: Database = config.connectionDatabase ?: Database.connect( + requireNotNull(config.connectionString), + requireNotNull(config.driverClassName) + ) + + override val jobRepository: ExposedJobRepository + get() = ExposedJobRepository(database, config.jobTableName, Clock.systemUTC(), config.json) + override val lockRepository: ExposedLockRepository + get() = ExposedLockRepository(database, config, clock) + + override fun start(): KJob { + jobRepository.createTable() + lockRepository.createTable() + return super.start() + } + + override fun shutdown() = runBlocking { + super.shutdown() + lockRepository.clearExpired() + } +} diff --git a/src/main/kotlin/dev/usbharu/kjob/exposed/ExposedLockRepository.kt b/src/main/kotlin/dev/usbharu/kjob/exposed/ExposedLockRepository.kt new file mode 100644 index 00000000..76d6fa44 --- /dev/null +++ b/src/main/kotlin/dev/usbharu/kjob/exposed/ExposedLockRepository.kt @@ -0,0 +1,74 @@ +package dev.usbharu.kjob.exposed + +import kjob.core.job.Lock +import kjob.core.repository.LockRepository +import kotlinx.coroutines.Dispatchers +import org.jetbrains.exposed.dao.id.UUIDTable +import org.jetbrains.exposed.sql.* +import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq +import org.jetbrains.exposed.sql.SqlExpressionBuilder.greater +import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction +import org.jetbrains.exposed.sql.transactions.transaction +import java.time.Clock +import java.time.Instant +import java.util.* +import kotlin.time.Duration.Companion.minutes + +class ExposedLockRepository( + private val database: Database, + private val config: ExposedKJob.Configuration, + private val clock: Clock +) : LockRepository { + + class Locks(tableName: String) : UUIDTable(tableName) { + val updatedAt = long("updatedAt") + val expiresAt = long("expiresAt") + } + + val locks: Locks = Locks(config.lockTableName) + + fun createTable() { + transaction(database) { + SchemaUtils.create(locks) + } + } + + suspend fun query(block: suspend () -> T): T = newSuspendedTransaction(Dispatchers.IO) { block() } + + override suspend fun exists(id: UUID): Boolean { + val now = Instant.now(clock) + return query { + locks.select(locks.id eq id and locks.expiresAt.greater(now.toEpochMilli())).empty().not() + } + } + + override suspend fun ping(id: UUID): Lock { + val now = Instant.now(clock) + val expiresAt = now.plusSeconds(config.expireLockInMinutes.minutes.inWholeSeconds) + val lock = Lock(id, now) + query { + if (locks.select(locks.id eq id).limit(1) + .map { Lock(it[locks.id].value, Instant.ofEpochMilli(it[locks.expiresAt])) }.isEmpty() + ) { + locks.insert { + it[locks.id] = id + it[locks.updatedAt] = now.toEpochMilli() + it[locks.expiresAt] = expiresAt.toEpochMilli() + } + } else { + locks.update({ locks.id eq id }) { + it[locks.updatedAt] = now.toEpochMilli() + it[locks.expiresAt] = expiresAt.toEpochMilli() + } + } + } + return lock + } + + suspend fun clearExpired() { + val now = Instant.now(clock).toEpochMilli() + query { + locks.deleteWhere { locks.expiresAt greater now } + } + } +}