feat: 型安全ジョブキューの骨組みを作成

This commit is contained in:
usbharu 2023-11-26 12:40:59 +09:00
parent 1e8f49b554
commit 25b689b73a
9 changed files with 42 additions and 21 deletions

View File

@ -4,5 +4,5 @@ import dev.usbharu.hideout.core.external.job.HideoutJob
import kjob.core.dsl.JobContextWithProps
interface ApJobService {
suspend fun <T : HideoutJob> processActivity(job: JobContextWithProps<T>, hideoutJob: HideoutJob)
suspend fun <T, R : HideoutJob<T, R>> processActivity(job: JobContextWithProps<R>, hideoutJob: HideoutJob<T, R>)
}

View File

@ -21,7 +21,10 @@ class ApJobServiceImpl(
private val inboxJobProcessor: InboxJobProcessor
) : ApJobService {
@Suppress("REDUNDANT_ELSE_IN_WHEN")
override suspend fun <T : HideoutJob> processActivity(job: JobContextWithProps<T>, hideoutJob: HideoutJob) {
override suspend fun <T, R : HideoutJob<T, R>> processActivity(
job: JobContextWithProps<R>,
hideoutJob: HideoutJob<T, R>
) {
logger.debug("processActivity: ${hideoutJob.name}")
@Suppress("ElseCaseInsteadOfExhaustiveWhen")

View File

@ -11,7 +11,10 @@ import org.springframework.boot.ApplicationRunner
import org.springframework.stereotype.Component
@Component
class JobQueueRunner(private val jobQueueParentService: JobQueueParentService, private val jobs: List<HideoutJob>) :
class JobQueueRunner(
private val jobQueueParentService: JobQueueParentService,
private val jobs: List<HideoutJob<*, *>>
) :
ApplicationRunner {
override fun run(args: ApplicationArguments?) {
LOGGER.info("Init job queue. ${jobs.size}")
@ -26,7 +29,7 @@ class JobQueueRunner(private val jobQueueParentService: JobQueueParentService, p
@Component
class JobQueueWorkerRunner(
private val jobQueueWorkerService: JobQueueWorkerService,
private val jobs: List<HideoutJob>,
private val jobs: List<HideoutJob<*, *>>,
private val apJobService: ApJobService
) : ApplicationRunner {
override fun run(args: ApplicationArguments?) {

View File

@ -6,9 +6,10 @@ import kjob.core.dsl.ScheduleContext
import kjob.core.job.JobProps
import org.springframework.stereotype.Component
abstract class HideoutJob<T, R : HideoutJob<T, R>>(name: String = "") : Job(name) {
abstract fun convert(value: T): ScheduleContext<R>.(R) -> Unit
abstract fun convert(props: JobProps<R>): T
abstract class HideoutJob<out T, out R : HideoutJob<T, R>>(name: String = "") : Job(name) {
abstract fun convert(value: @UnsafeVariance T): ScheduleContext<@UnsafeVariance R>.(@UnsafeVariance R) -> Unit
fun convertUnsafe(props: JobProps<*>): T = convert(props as JobProps<R>)
abstract fun convert(props: JobProps<@UnsafeVariance R>): T
}
data class ReceiveFollowJobParam(

View File

@ -1,6 +1,7 @@
package dev.usbharu.hideout.core.infrastructure.kjobexposed
import dev.usbharu.hideout.core.external.job.HideoutJob
import dev.usbharu.hideout.core.service.job.JobProcessor
import dev.usbharu.hideout.core.service.job.JobQueueWorkerService
import kjob.core.dsl.JobContextWithProps
import kjob.core.dsl.JobRegisterContext
@ -12,7 +13,7 @@ import org.springframework.stereotype.Service
@Service
@ConditionalOnProperty(name = ["hideout.use-mongodb"], havingValue = "false", matchIfMissing = true)
class KJobJobQueueWorkerService() : JobQueueWorkerService {
class KJobJobQueueWorkerService(private val jobQueueProcessorList: List<JobProcessor<*, *>>) : JobQueueWorkerService {
val kjob by lazy {
kjob(ExposedKJob) {
@ -27,5 +28,14 @@ class KJobJobQueueWorkerService() : JobQueueWorkerService {
defines.forEach { job ->
kjob.register(job.first, job.second)
}
for (jobProcessor in jobQueueProcessorList) {
kjob.register(jobProcessor.job()) {
execute {
val param = it.convertUnsafe(props)
jobProcessor.process(param)
}
}
}
}
}

View File

@ -1,15 +1,15 @@
package dev.usbharu.hideout.core.infrastructure.kjobmongodb
import com.mongodb.reactivestreams.client.MongoClient
import dev.usbharu.hideout.core.external.job.HideoutJob
import dev.usbharu.hideout.core.service.job.JobQueueWorkerService
import kjob.core.dsl.JobContextWithProps
import kjob.core.dsl.JobRegisterContext
import kjob.core.dsl.KJobFunctions
import kjob.core.kjob
import kjob.mongo.Mongo
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.stereotype.Service
import dev.usbharu.hideout.core.external.job.HideoutJob as HJ
import kjob.core.dsl.JobContextWithProps as JCWP
@Service
@ConditionalOnProperty(name = ["hideout.use-mongodb"], havingValue = "true", matchIfMissing = false)
@ -23,9 +23,7 @@ class KJobMongoJobQueueWorkerService(private val mongoClient: MongoClient) : Job
}.start()
}
override fun init(
defines: List<Pair<HJ, JobRegisterContext<HJ, JCWP<HJ>>.(HJ) -> KJobFunctions<HJ, JCWP<HJ>>>>
) {
override fun <T, R : HideoutJob<T, R>> init(defines: List<Pair<R, JobRegisterContext<R, JobContextWithProps<R>>.(R) -> KJobFunctions<R, JobContextWithProps<R>>>>) {
defines.forEach { job ->
kjob.register(job.first, job.second)
}

View File

@ -1,6 +1,7 @@
package dev.usbharu.hideout.core.infrastructure.kjobmongodb
import com.mongodb.reactivestreams.client.MongoClient
import dev.usbharu.hideout.core.external.job.HideoutJob
import dev.usbharu.hideout.core.service.job.JobQueueParentService
import kjob.core.Job
import kjob.core.dsl.ScheduleContext
@ -23,10 +24,15 @@ class KjobMongoJobQueueParentService(private val mongoClient: MongoClient) : Job
override fun init(jobDefines: List<Job>) = Unit
@Deprecated("use type safe → scheduleTypeSafe")
override suspend fun <J : Job> schedule(job: J, block: ScheduleContext<J>.(J) -> Unit) {
kjob.schedule(job, block)
}
override suspend fun <T, J : HideoutJob<T, J>> scheduleTypeSafe(job: J, jobProps: T) {
TODO("Not yet implemented")
}
override fun close() {
kjob.shutdown()
}

View File

@ -0,0 +1,8 @@
package dev.usbharu.hideout.core.service.job
import dev.usbharu.hideout.core.external.job.HideoutJob
interface JobProcessor<out T, out R : HideoutJob<T, R>> {
suspend fun process(param: @UnsafeVariance T)
fun job(): R
}

View File

@ -1,8 +0,0 @@
package dev.usbharu.hideout.core.service.job
import dev.usbharu.hideout.core.external.job.HideoutJob
interface JobProcessorService<T, R : HideoutJob<T, R>> {
suspend fun process(param: T)
suspend fun job(): Class<R>
}