feat: priorityを設定できるように

This commit is contained in:
usbharu 2024-03-18 14:35:02 +09:00
parent fbafee4e5e
commit cbfcc21dee
6 changed files with 28 additions and 24 deletions

View File

@ -36,7 +36,7 @@ class MongoModuleContext : ModuleContext {
ConnectionString( ConnectionString(
System.getProperty( System.getProperty(
"owl.broker.mongo.url", "owl.broker.mongo.url",
"mongodb://agent1.build:27017" "mongodb://localhost:27017"
) )
) )
) )

View File

@ -109,6 +109,7 @@ data class QueuedTaskMongodb(
val task: TaskMongodb, val task: TaskMongodb,
val attempt: Int, val attempt: Int,
val queuedAt: Instant, val queuedAt: Instant,
val priority:Int,
val isActive: Boolean, val isActive: Boolean,
val timeoutAt: Instant?, val timeoutAt: Instant?,
val assignedConsumer: String?, val assignedConsumer: String?,
@ -117,13 +118,14 @@ data class QueuedTaskMongodb(
fun toQueuedTask(propertySerializerFactory: PropertySerializerFactory): QueuedTask { fun toQueuedTask(propertySerializerFactory: PropertySerializerFactory): QueuedTask {
return QueuedTask( return QueuedTask(
attempt, attempt = attempt,
queuedAt, queuedAt = queuedAt,
task.toTask(propertySerializerFactory), task = task.toTask(propertySerializerFactory),
isActive, priority = priority,
timeoutAt, isActive = isActive,
assignedConsumer?.let { UUID.fromString(it) }, timeoutAt = timeoutAt,
assignedAt assignedConsumer = assignedConsumer?.let { UUID.fromString(it) },
assignedAt = assignedAt
) )
} }
@ -174,6 +176,7 @@ data class QueuedTaskMongodb(
TaskMongodb.of(propertySerializerFactory, queuedTask.task), TaskMongodb.of(propertySerializerFactory, queuedTask.task),
queuedTask.attempt, queuedTask.attempt,
queuedTask.queuedAt, queuedTask.queuedAt,
queuedTask.priority,
queuedTask.isActive, queuedTask.isActive,
queuedTask.timeoutAt, queuedTask.timeoutAt,
queuedTask.assignedConsumer?.toString(), queuedTask.assignedConsumer?.toString(),

View File

@ -28,6 +28,7 @@ data class QueuedTask(
val attempt: Int, val attempt: Int,
val queuedAt: Instant, val queuedAt: Instant,
val task: Task, val task: Task,
val priority: Int,
val isActive: Boolean, val isActive: Boolean,
val timeoutAt: Instant?, val timeoutAt: Instant?,
val assignedConsumer: UUID?, val assignedConsumer: UUID?,

View File

@ -16,18 +16,14 @@
package dev.usbharu.owl.broker.interfaces.grpc package dev.usbharu.owl.broker.interfaces.grpc
import TaskResultProducer.TaskResults import dev.usbharu.owl.*
import TaskResultSubscribeServiceGrpcKt
import dev.usbharu.owl.Uuid
import dev.usbharu.owl.broker.external.toUUID import dev.usbharu.owl.broker.external.toUUID
import dev.usbharu.owl.broker.service.TaskManagementService import dev.usbharu.owl.broker.service.TaskManagementService
import dev.usbharu.owl.common.property.PropertySerializeUtils import dev.usbharu.owl.common.property.PropertySerializeUtils
import dev.usbharu.owl.common.property.PropertySerializerFactory import dev.usbharu.owl.common.property.PropertySerializerFactory
import dev.usbharu.owl.taskResult
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import org.koin.core.annotation.Singleton import org.koin.core.annotation.Singleton
import taskResults
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.EmptyCoroutineContext
@ -38,7 +34,7 @@ class TaskResultSubscribeService(
coroutineContext: CoroutineContext = EmptyCoroutineContext coroutineContext: CoroutineContext = EmptyCoroutineContext
) : ) :
TaskResultSubscribeServiceGrpcKt.TaskResultSubscribeServiceCoroutineImplBase(coroutineContext) { TaskResultSubscribeServiceGrpcKt.TaskResultSubscribeServiceCoroutineImplBase(coroutineContext) {
override fun subscribe(request: Uuid.UUID): Flow<TaskResults> { override fun subscribe(request: Uuid.UUID): Flow<TaskResultProducer.TaskResults> {
return taskManagementService return taskManagementService
.subscribeResult(request.toUUID()) .subscribeResult(request.toUUID())
.map { .map {

View File

@ -83,18 +83,20 @@ class TaskManagementServiceImpl(
private suspend fun enqueueTask(task: Task): QueuedTask { private suspend fun enqueueTask(task: Task): QueuedTask {
val queuedTask = QueuedTask(
task.attempt + 1,
Instant.now(),
task,
isActive = true,
timeoutAt = null,
null,
null
)
val definedTask = taskDefinitionRepository.findByName(task.name) val definedTask = taskDefinitionRepository.findByName(task.name)
?: throw TaskNotRegisterException("Task ${task.name} not definition.") ?: throw TaskNotRegisterException("Task ${task.name} not definition.")
val queuedTask = QueuedTask(
attempt = task.attempt + 1,
queuedAt = Instant.now(),
task = task,
priority = definedTask.priority,
isActive = true,
timeoutAt = null,
assignedConsumer = null,
assignedAt = null
)
val copy = task.copy( val copy = task.copy(
nextRetry = retryPolicyFactory.factory(definedTask.retryPolicy) nextRetry = retryPolicyFactory.factory(definedTask.retryPolicy)
.nextRetry(Instant.now(), queuedTask.attempt) .nextRetry(Instant.now(), queuedTask.attempt)

View File

@ -2,6 +2,8 @@ syntax = "proto3";
import "uuid.proto"; import "uuid.proto";
import "task_result.proto"; import "task_result.proto";
option java_package = "dev.usbharu.owl";
message TaskResults { message TaskResults {
string name = 1; string name = 1;
UUID id = 2; UUID id = 2;