feat: KJobを導入

This commit is contained in:
usbharu 2023-04-05 16:59:16 +09:00
parent d93c8af068
commit 07500651ae
8 changed files with 496 additions and 0 deletions

View File

@ -28,6 +28,14 @@ repositories {
mavenCentral()
}
kotlin {
target {
compilations.all {
kotlinOptions.jvmTarget = JavaVersion.VERSION_11.toString()
}
}
}
dependencies {
implementation("io.ktor:ktor-server-core-jvm:$ktor_version")
implementation("io.ktor:ktor-server-auth-jvm:$ktor_version")
@ -63,6 +71,8 @@ dependencies {
implementation("tech.barbero.http-messages-signing:http-messages-signing-core:1.0.0")
testImplementation("org.junit.jupiter:junit-jupiter:5.8.1")
implementation("org.drewcarlson:kjob-core:0.6.0")
}
jib {

View File

@ -0,0 +1,10 @@
package dev.usbharu.hideout.service.job
import kjob.core.Job
import kjob.core.dsl.ScheduleContext
interface JobQueueService {
fun init(jobDefines:List<Job>)
suspend fun <J : Job> schedule(job: J, block: ScheduleContext<J>.(J) -> Unit)
}

View File

@ -0,0 +1,10 @@
package dev.usbharu.hideout.service.job
import kjob.core.Job
import kjob.core.dsl.JobContextWithProps
import kjob.core.dsl.JobRegisterContext
import kjob.core.dsl.KJobFunctions
interface JobWorkerService {
fun init(defines: List<Pair<Job, JobRegisterContext<Job, JobContextWithProps<Job>>.(Job) -> KJobFunctions<Job, JobContextWithProps<Job>>>>)
}

View File

@ -0,0 +1,24 @@
package dev.usbharu.hideout.service.job
import dev.usbharu.kjob.exposed.ExposedKJob
import kjob.core.Job
import kjob.core.KJob
import kjob.core.dsl.ScheduleContext
import kjob.core.kjob
import org.jetbrains.exposed.sql.Database
class KJobJobQueueService(private val database: Database) : JobQueueService {
val kjob: KJob = kjob(ExposedKJob) {
connectionDatabase = database
isWorker = false
}.start()
override fun init(jobDefines: List<Job>) {
}
override suspend fun <J : Job> schedule(job: J,block:ScheduleContext<J>.(J)->Unit) {
kjob.schedule(job,block)
}
}

View File

@ -0,0 +1,28 @@
package dev.usbharu.hideout.service.job
import dev.usbharu.kjob.exposed.ExposedKJob
import kjob.core.Job
import kjob.core.dsl.JobContextWithProps
import kjob.core.dsl.JobRegisterContext
import kjob.core.dsl.KJobFunctions
import kjob.core.kjob
import org.jetbrains.exposed.sql.Database
class KJobJobWorkerService(private val database: Database) : JobWorkerService {
val kjob by lazy {
kjob(ExposedKJob) {
connectionDatabase = database
nonBlockingMaxJobs = 10
blockingMaxJobs = 10
jobExecutionPeriodInSeconds = 10
}.start()
}
override fun init(defines: List<Pair<Job,JobRegisterContext<Job, JobContextWithProps<Job>>.(Job) -> KJobFunctions<Job, JobContextWithProps<Job>>>>) {
defines.forEach { job ->
kjob.register(job.first, job.second)
}
}
}

View File

@ -0,0 +1,290 @@
package dev.usbharu.kjob.exposed
import kjob.core.job.JobProgress
import kjob.core.job.JobSettings
import kjob.core.job.JobStatus
import kjob.core.job.ScheduledJob
import kjob.core.repository.JobRepository
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.*
import org.jetbrains.exposed.dao.id.LongIdTable
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.jetbrains.exposed.sql.SqlExpressionBuilder.inList
import org.jetbrains.exposed.sql.SqlExpressionBuilder.isNull
import org.jetbrains.exposed.sql.SqlExpressionBuilder.plus
import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction
import org.jetbrains.exposed.sql.transactions.transaction
import java.time.Clock
import java.time.Instant
import java.util.*
class ExposedJobRepository(
private val database: Database,
private val tableName: String,
private val clock: Clock,
private val json: Json
) :
JobRepository {
class Jobs(tableName: String) : LongIdTable(tableName) {
val status = text("status")
val runAt = long("runAt").nullable()
val statusMessage = text("statusMessage").nullable()
val retries = integer("retries")
val kjobId = char("kjobId", 36).nullable()
val createdAt = long("createdAt")
val updatedAt = long("updatedAt")
val jobId = text("jobId")
val name = text("name")
val properties = text("properties").nullable()
val step = integer("step")
val max = integer("max").nullable()
val startedAt = long("startedAt").nullable()
val completedAt = long("completedAt").nullable()
}
val jobs: Jobs = Jobs(tableName)
fun createTable() {
transaction(database) {
SchemaUtils.create(jobs)
}
}
suspend fun <T> query(block: suspend () -> T): T = newSuspendedTransaction(Dispatchers.IO) { block() }
override suspend fun completeProgress(id: String): Boolean {
val now = Instant.now(clock).toEpochMilli()
return query {
jobs.update({ jobs.id eq id.toLong() }) {
it[jobs.completedAt] = now
it[jobs.updatedAt] = now
} == 1
}
}
override suspend fun exist(jobId: String): Boolean {
return query {
jobs.select(jobs.jobId eq jobId).empty().not()
}
}
override suspend fun findNext(names: Set<String>, status: Set<JobStatus>, limit: Int): Flow<ScheduledJob> {
return query {
jobs.select(
jobs.status.inList(list = status.map { it.name })
.and(if (names.isEmpty()) Op.TRUE else jobs.name.inList(names))
).limit(limit)
.map { it.toScheduledJob() }.asFlow()
}
}
override suspend fun get(id: String): ScheduledJob? {
val single = query { jobs.select(jobs.id eq id.toLong()).singleOrNull() } ?: return null
return single.toScheduledJob()
}
override suspend fun reset(id: String, oldKjobId: UUID?): Boolean {
return query {
jobs.update({ jobs.id eq id.toLong() and if (oldKjobId == null) jobs.kjobId.isNull() else jobs.kjobId eq oldKjobId.toString() }) {
it[jobs.status] = JobStatus.CREATED.name
it[jobs.statusMessage] = null
it[jobs.kjobId] = null
it[jobs.step] = 0
it[jobs.max] = null
it[jobs.startedAt] = null
it[jobs.completedAt] = null
it[jobs.updatedAt] = Instant.now(clock).toEpochMilli()
} == 1
}
}
override suspend fun save(jobSettings: JobSettings, runAt: Instant?): ScheduledJob {
val now = Instant.now(clock)
val scheduledJob =
ScheduledJob("", JobStatus.CREATED, runAt, null, 0, null, now, now, jobSettings, JobProgress(0))
val id = query {
jobs.insert {
it[jobs.status] = scheduledJob.status.name
it[jobs.createdAt] = scheduledJob.createdAt.toEpochMilli()
it[jobs.updatedAt] = scheduledJob.updatedAt.toEpochMilli()
it[jobs.jobId] = scheduledJob.settings.id
it[jobs.name] = scheduledJob.settings.name
it[jobs.properties] = scheduledJob.settings.properties.stringify()
it[jobs.runAt] = scheduledJob.runAt?.toEpochMilli()
it[jobs.statusMessage] = null
it[jobs.retries] = 0
it[jobs.kjobId] = null
it[jobs.step] = 0
it[jobs.max] = null
it[jobs.startedAt] = null
it[jobs.completedAt] = null
}[jobs.id].value
}
return scheduledJob.copy(id = id.toString())
}
override suspend fun setProgressMax(id: String, max: Long): Boolean {
val now = Instant.now(clock).toEpochMilli()
return query {
jobs.update({ jobs.id eq id.toLong() }) {
it[jobs.max] = max.toInt()
it[jobs.updatedAt] = now
} == 1
}
}
override suspend fun startProgress(id: String): Boolean {
val now = Instant.now(clock).toEpochMilli()
return query {
jobs.update({ jobs.id eq id.toLong() }) {
it[jobs.startedAt] = now
it[jobs.updatedAt] = now
} == 1
}
}
override suspend fun stepProgress(id: String, step: Long): Boolean {
val now = Instant.now(clock).toEpochMilli()
return query {
jobs.update({ jobs.id eq id.toLong() }) {
it[jobs.step] = jobs.step + step.toInt()
it[jobs.updatedAt] = now
} == 1
}
}
override suspend fun update(
id: String,
oldKjobId: UUID?,
kjobId: UUID?,
status: JobStatus,
statusMessage: String?,
retries: Int
): Boolean {
return query {
jobs.update({ (jobs.id eq id.toLong()) and if (oldKjobId == null) jobs.kjobId.isNull() else jobs.kjobId eq oldKjobId.toString() }) {
it[jobs.status] = status.name
it[jobs.retries] = retries
it[jobs.updatedAt] = Instant.now(clock).toEpochMilli()
it[jobs.id] = id.toLong()
it[jobs.statusMessage] = statusMessage
it[jobs.kjobId] = kjobId.toString()
} == 1
}
}
private fun String?.parseJsonMap(): Map<String, Any> {
this ?: return emptyMap()
return json.parseToJsonElement(this).jsonObject.mapValues { (_, el) ->
if (el is JsonObject) {
val t = el["t"]?.jsonPrimitive?.content ?: error("Cannot get jsonPrimitive")
val value = el["v"]?.jsonArray ?: error("Cannot get jsonArray")
when (t) {
"s" -> value.map { it.jsonPrimitive.content }
"d" -> value.map { it.jsonPrimitive.double }
"l" -> value.map { it.jsonPrimitive.long }
"i" -> value.map { it.jsonPrimitive.int }
"b" -> value.map { it.jsonPrimitive.boolean }
else -> error("Unknown type prefix '$t'")
}.toList()
} else {
val content = el.jsonPrimitive.content
val t = content.substringBefore(':')
val value = content.substringAfter(':')
when (t) {
"s" -> value
"d" -> value.toDouble()
"l" -> value.toLong()
"i" -> value.toInt()
"b" -> value.toBoolean()
else -> error("Unknown type prefix '$t'")
}
}
}
}
private fun Map<String, Any>.stringify(): String? {
if (isEmpty()) {
return null
}
fun listSerialize(value: List<*>): JsonElement {
return if (value.isEmpty()) {
buildJsonObject {
put("t", "s")
putJsonArray("v") {}
}
} else {
val (t, values) = when (val item = value.first()) {
is Double -> "d" to (value as List<Double>).map(::JsonPrimitive)
is Long -> "l" to (value as List<Long>).map(::JsonPrimitive)
is Int -> "i" to (value as List<Int>).map(::JsonPrimitive)
is String -> "s" to (value as List<String>).map(::JsonPrimitive)
is Boolean -> "b" to (value as List<Boolean>).map(::JsonPrimitive)
else -> error("Cannot serialize unsupported list property value: $value")
}
buildJsonObject {
put("t", t)
put("v", JsonArray(values))
}
}
}
fun createJsonPrimitive(string: String, value: Any) = JsonPrimitive("$string:$value")
val jsonObject = JsonObject(
mapValues { (_, value) ->
when (value) {
is List<*> -> listSerialize(value)
is Double -> createJsonPrimitive("d", value)
is Long -> createJsonPrimitive("l", value)
is Int -> createJsonPrimitive("i", value)
is String -> createJsonPrimitive("s", value)
is Boolean -> createJsonPrimitive("b", value)
else -> error("Cannot serialize unsupported property value: $value")
}
}
)
return json.encodeToString(jsonObject)
}
private fun ResultRow.toScheduledJob(): ScheduledJob {
val single = this
jobs.run {
return ScheduledJob(
id = single[this.id].value.toString(),
status = JobStatus.valueOf(single[status]),
runAt = single[runAt]?.let { Instant.ofEpochMilli(it) },
statusMessage = single[statusMessage],
retries = single[retries],
kjobId = single[kjobId]?.let {
try {
UUID.fromString(it)
} catch (e: IllegalArgumentException) {
null
}
},
createdAt = Instant.ofEpochMilli(single[createdAt]),
updatedAt = Instant.ofEpochMilli(single[updatedAt]),
settings = JobSettings(
id = single[jobId],
name = single[name],
properties = single[properties].parseJsonMap()
),
progress = JobProgress(
step = single[step].toLong(),
max = single[max]?.toLong(),
startedAt = single[startedAt]?.let { Instant.ofEpochMilli(it) },
completedAt = single[completedAt]?.let { Instant.ofEpochMilli(it) }
)
)
}
}
}

View File

@ -0,0 +1,50 @@
package dev.usbharu.kjob.exposed
import kjob.core.BaseKJob
import kjob.core.KJob
import kjob.core.KJobFactory
import kotlinx.coroutines.runBlocking
import org.jetbrains.exposed.sql.Database
import java.time.Clock
class ExposedKJob(config: Configuration) : BaseKJob<ExposedKJob.Configuration>(config) {
companion object : KJobFactory<ExposedKJob, Configuration> {
override fun create(configure: Configuration.() -> Unit): KJob {
return ExposedKJob(Configuration().apply(configure))
}
}
class Configuration : BaseKJob.Configuration() {
var connectionString: String? = null
var driverClassName: String? = null
var connectionDatabase: Database? = null
var jobTableName = "kjobJobs"
var lockTableName = "kjobLocks"
var expireLockInMinutes = 5L
}
private val database: Database = config.connectionDatabase ?: Database.connect(
requireNotNull(config.connectionString),
requireNotNull(config.driverClassName)
)
override val jobRepository: ExposedJobRepository
get() = ExposedJobRepository(database, config.jobTableName, Clock.systemUTC(), config.json)
override val lockRepository: ExposedLockRepository
get() = ExposedLockRepository(database, config, clock)
override fun start(): KJob {
jobRepository.createTable()
lockRepository.createTable()
return super.start()
}
override fun shutdown() = runBlocking {
super.shutdown()
lockRepository.clearExpired()
}
}

View File

@ -0,0 +1,74 @@
package dev.usbharu.kjob.exposed
import kjob.core.job.Lock
import kjob.core.repository.LockRepository
import kotlinx.coroutines.Dispatchers
import org.jetbrains.exposed.dao.id.UUIDTable
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.jetbrains.exposed.sql.SqlExpressionBuilder.greater
import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction
import org.jetbrains.exposed.sql.transactions.transaction
import java.time.Clock
import java.time.Instant
import java.util.*
import kotlin.time.Duration.Companion.minutes
class ExposedLockRepository(
private val database: Database,
private val config: ExposedKJob.Configuration,
private val clock: Clock
) : LockRepository {
class Locks(tableName: String) : UUIDTable(tableName) {
val updatedAt = long("updatedAt")
val expiresAt = long("expiresAt")
}
val locks: Locks = Locks(config.lockTableName)
fun createTable() {
transaction(database) {
SchemaUtils.create(locks)
}
}
suspend fun <T> query(block: suspend () -> T): T = newSuspendedTransaction(Dispatchers.IO) { block() }
override suspend fun exists(id: UUID): Boolean {
val now = Instant.now(clock)
return query {
locks.select(locks.id eq id and locks.expiresAt.greater(now.toEpochMilli())).empty().not()
}
}
override suspend fun ping(id: UUID): Lock {
val now = Instant.now(clock)
val expiresAt = now.plusSeconds(config.expireLockInMinutes.minutes.inWholeSeconds)
val lock = Lock(id, now)
query {
if (locks.select(locks.id eq id).limit(1)
.map { Lock(it[locks.id].value, Instant.ofEpochMilli(it[locks.expiresAt])) }.isEmpty()
) {
locks.insert {
it[locks.id] = id
it[locks.updatedAt] = now.toEpochMilli()
it[locks.expiresAt] = expiresAt.toEpochMilli()
}
} else {
locks.update({ locks.id eq id }) {
it[locks.updatedAt] = now.toEpochMilli()
it[locks.expiresAt] = expiresAt.toEpochMilli()
}
}
}
return lock
}
suspend fun clearExpired() {
val now = Instant.now(clock).toEpochMilli()
query {
locks.deleteWhere { locks.expiresAt greater now }
}
}
}