From 9074560d663c685b3969f112fd4953f0161b9764 Mon Sep 17 00:00:00 2001 From: usbharu <64310155+usbharu@users.noreply.github.com> Date: Mon, 18 Mar 2024 14:35:02 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20priority=E3=82=92=E8=A8=AD=E5=AE=9A?= =?UTF-8?q?=E3=81=A7=E3=81=8D=E3=82=8B=E3=82=88=E3=81=86=E3=81=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../owl/broker/mongodb/MongoModuleContext.kt | 2 +- .../mongodb/MongodbQueuedTaskRepository.kt | 17 ++++++++------ .../domain/model/queuedtask/QueuedTask.kt | 1 + .../grpc/TaskResultSubscribeService.kt | 8 ++----- .../broker/service/TaskManagementService.kt | 22 ++++++++++--------- .../src/main/proto/task_result_producer.proto | 2 ++ 6 files changed, 28 insertions(+), 24 deletions(-) diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongoModuleContext.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongoModuleContext.kt index eece963..e3ab269 100644 --- a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongoModuleContext.kt +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongoModuleContext.kt @@ -36,7 +36,7 @@ class MongoModuleContext : ModuleContext { ConnectionString( System.getProperty( "owl.broker.mongo.url", - "mongodb://agent1.build:27017" + "mongodb://localhost:27017" ) ) ) diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt index 3b6c550..0cfc7f0 100644 --- a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt @@ -109,6 +109,7 @@ data class QueuedTaskMongodb( val task: TaskMongodb, val attempt: Int, val queuedAt: Instant, + val priority:Int, val isActive: Boolean, val timeoutAt: Instant?, val assignedConsumer: String?, @@ -117,13 +118,14 @@ data class QueuedTaskMongodb( fun toQueuedTask(propertySerializerFactory: PropertySerializerFactory): QueuedTask { return QueuedTask( - attempt, - queuedAt, - task.toTask(propertySerializerFactory), - isActive, - timeoutAt, - assignedConsumer?.let { UUID.fromString(it) }, - assignedAt + attempt = attempt, + queuedAt = queuedAt, + task = task.toTask(propertySerializerFactory), + priority = priority, + isActive = isActive, + timeoutAt = timeoutAt, + assignedConsumer = assignedConsumer?.let { UUID.fromString(it) }, + assignedAt = assignedAt ) } @@ -174,6 +176,7 @@ data class QueuedTaskMongodb( TaskMongodb.of(propertySerializerFactory, queuedTask.task), queuedTask.attempt, queuedTask.queuedAt, + queuedTask.priority, queuedTask.isActive, queuedTask.timeoutAt, queuedTask.assignedConsumer?.toString(), diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTask.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTask.kt index f8c0437..b9dab65 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTask.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTask.kt @@ -28,6 +28,7 @@ data class QueuedTask( val attempt: Int, val queuedAt: Instant, val task: Task, + val priority: Int, val isActive: Boolean, val timeoutAt: Instant?, val assignedConsumer: UUID?, diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultSubscribeService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultSubscribeService.kt index 9b93081..287dc44 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultSubscribeService.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultSubscribeService.kt @@ -16,18 +16,14 @@ package dev.usbharu.owl.broker.interfaces.grpc -import TaskResultProducer.TaskResults -import TaskResultSubscribeServiceGrpcKt -import dev.usbharu.owl.Uuid +import dev.usbharu.owl.* import dev.usbharu.owl.broker.external.toUUID import dev.usbharu.owl.broker.service.TaskManagementService import dev.usbharu.owl.common.property.PropertySerializeUtils import dev.usbharu.owl.common.property.PropertySerializerFactory -import dev.usbharu.owl.taskResult import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map import org.koin.core.annotation.Singleton -import taskResults import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext @@ -38,7 +34,7 @@ class TaskResultSubscribeService( coroutineContext: CoroutineContext = EmptyCoroutineContext ) : TaskResultSubscribeServiceGrpcKt.TaskResultSubscribeServiceCoroutineImplBase(coroutineContext) { - override fun subscribe(request: Uuid.UUID): Flow { + override fun subscribe(request: Uuid.UUID): Flow { return taskManagementService .subscribeResult(request.toUUID()) .map { diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt index 4cd78e5..066dafa 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt @@ -83,18 +83,20 @@ class TaskManagementServiceImpl( private suspend fun enqueueTask(task: Task): QueuedTask { - val queuedTask = QueuedTask( - task.attempt + 1, - Instant.now(), - task, - isActive = true, - timeoutAt = null, - null, - null - ) - val definedTask = taskDefinitionRepository.findByName(task.name) ?: throw TaskNotRegisterException("Task ${task.name} not definition.") + + val queuedTask = QueuedTask( + attempt = task.attempt + 1, + queuedAt = Instant.now(), + task = task, + priority = definedTask.priority, + isActive = true, + timeoutAt = null, + assignedConsumer = null, + assignedAt = null + ) + val copy = task.copy( nextRetry = retryPolicyFactory.factory(definedTask.retryPolicy) .nextRetry(Instant.now(), queuedTask.attempt) diff --git a/broker/src/main/proto/task_result_producer.proto b/broker/src/main/proto/task_result_producer.proto index 8ab59a0..6102a02 100644 --- a/broker/src/main/proto/task_result_producer.proto +++ b/broker/src/main/proto/task_result_producer.proto @@ -2,6 +2,8 @@ syntax = "proto3"; import "uuid.proto"; import "task_result.proto"; +option java_package = "dev.usbharu.owl"; + message TaskResults { string name = 1; UUID id = 2;