feat: EmbeddedOwlProducerを追加

This commit is contained in:
usbharu 2024-05-01 22:21:00 +09:00
parent 297c29f020
commit 949b0a935f
1 changed files with 3 additions and 40 deletions

View File

@ -18,7 +18,7 @@ package dev.usbharu.owl.producer.embedded
import dev.usbharu.owl.broker.ModuleContext
import dev.usbharu.owl.broker.OwlBrokerApplication
import dev.usbharu.owl.broker.service.*
import dev.usbharu.owl.broker.service.RetryPolicyFactory
import dev.usbharu.owl.common.task.PublishedTask
import dev.usbharu.owl.common.task.Task
import dev.usbharu.owl.common.task.TaskDefinition
@ -27,22 +27,16 @@ import org.koin.core.Koin
import org.koin.core.context.GlobalContext.startKoin
import org.koin.dsl.module
import org.koin.ksp.generated.defaultModule
import java.time.Instant
import java.util.*
class EmbeddedGrpcOwlProducer(
private val moduleContext: ModuleContext,
private val retryPolicyFactory: RetryPolicyFactory,
private val name: String,
private val port: Int,
private val owlProducer: OwlProducer,
) : OwlProducer {
private lateinit var producerId: UUID
private lateinit var application: Koin
private val taskMap: MutableMap<Class<*>, TaskDefinition<*>> = mutableMapOf()
override suspend fun start() {
application = startKoin {
printLogger()
@ -55,45 +49,14 @@ class EmbeddedGrpcOwlProducer(
modules(module, defaultModule, moduleContext.module())
}.koin
val producerService = application.get<ProducerService>()
producerId = producerService.registerProducer(RegisterProducerRequest(name, name))
application.get<OwlBrokerApplication>().start(port)
}
override suspend fun <T : Task> registerTask(taskDefinition: TaskDefinition<T>) {
application.get<RegisterTaskService>()
.registerTask(
dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinition(
name = taskDefinition.name,
priority = taskDefinition.priority,
maxRetry = taskDefinition.maxRetry,
timeoutMilli = taskDefinition.timeoutMilli,
propertyDefinitionHash = taskDefinition.propertyDefinition.hash(),
retryPolicy = taskDefinition.retryPolicy
)
)
taskMap[taskDefinition.type] = taskDefinition
owlProducer.registerTask(taskDefinition)
}
override suspend fun <T : Task> publishTask(task: T): PublishedTask<T> {
val taskDefinition = taskMap.getValue(task::class.java) as TaskDefinition<T>
val publishTask = application.get<TaskPublishService>().publishTask(
PublishTask(
taskDefinition.name,
producerId,
taskDefinition.serialize(task)
)
)
return PublishedTask(
task,
publishTask.id,
Instant.now()
)
return owlProducer.publishTask(task)
}
}