feat: priorityを設定できるように
This commit is contained in:
parent
e9277db29f
commit
9074560d66
|
@ -36,7 +36,7 @@ class MongoModuleContext : ModuleContext {
|
||||||
ConnectionString(
|
ConnectionString(
|
||||||
System.getProperty(
|
System.getProperty(
|
||||||
"owl.broker.mongo.url",
|
"owl.broker.mongo.url",
|
||||||
"mongodb://agent1.build:27017"
|
"mongodb://localhost:27017"
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
|
@ -109,6 +109,7 @@ data class QueuedTaskMongodb(
|
||||||
val task: TaskMongodb,
|
val task: TaskMongodb,
|
||||||
val attempt: Int,
|
val attempt: Int,
|
||||||
val queuedAt: Instant,
|
val queuedAt: Instant,
|
||||||
|
val priority:Int,
|
||||||
val isActive: Boolean,
|
val isActive: Boolean,
|
||||||
val timeoutAt: Instant?,
|
val timeoutAt: Instant?,
|
||||||
val assignedConsumer: String?,
|
val assignedConsumer: String?,
|
||||||
|
@ -117,13 +118,14 @@ data class QueuedTaskMongodb(
|
||||||
|
|
||||||
fun toQueuedTask(propertySerializerFactory: PropertySerializerFactory): QueuedTask {
|
fun toQueuedTask(propertySerializerFactory: PropertySerializerFactory): QueuedTask {
|
||||||
return QueuedTask(
|
return QueuedTask(
|
||||||
attempt,
|
attempt = attempt,
|
||||||
queuedAt,
|
queuedAt = queuedAt,
|
||||||
task.toTask(propertySerializerFactory),
|
task = task.toTask(propertySerializerFactory),
|
||||||
isActive,
|
priority = priority,
|
||||||
timeoutAt,
|
isActive = isActive,
|
||||||
assignedConsumer?.let { UUID.fromString(it) },
|
timeoutAt = timeoutAt,
|
||||||
assignedAt
|
assignedConsumer = assignedConsumer?.let { UUID.fromString(it) },
|
||||||
|
assignedAt = assignedAt
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -174,6 +176,7 @@ data class QueuedTaskMongodb(
|
||||||
TaskMongodb.of(propertySerializerFactory, queuedTask.task),
|
TaskMongodb.of(propertySerializerFactory, queuedTask.task),
|
||||||
queuedTask.attempt,
|
queuedTask.attempt,
|
||||||
queuedTask.queuedAt,
|
queuedTask.queuedAt,
|
||||||
|
queuedTask.priority,
|
||||||
queuedTask.isActive,
|
queuedTask.isActive,
|
||||||
queuedTask.timeoutAt,
|
queuedTask.timeoutAt,
|
||||||
queuedTask.assignedConsumer?.toString(),
|
queuedTask.assignedConsumer?.toString(),
|
||||||
|
|
|
@ -28,6 +28,7 @@ data class QueuedTask(
|
||||||
val attempt: Int,
|
val attempt: Int,
|
||||||
val queuedAt: Instant,
|
val queuedAt: Instant,
|
||||||
val task: Task,
|
val task: Task,
|
||||||
|
val priority: Int,
|
||||||
val isActive: Boolean,
|
val isActive: Boolean,
|
||||||
val timeoutAt: Instant?,
|
val timeoutAt: Instant?,
|
||||||
val assignedConsumer: UUID?,
|
val assignedConsumer: UUID?,
|
||||||
|
|
|
@ -16,18 +16,14 @@
|
||||||
|
|
||||||
package dev.usbharu.owl.broker.interfaces.grpc
|
package dev.usbharu.owl.broker.interfaces.grpc
|
||||||
|
|
||||||
import TaskResultProducer.TaskResults
|
import dev.usbharu.owl.*
|
||||||
import TaskResultSubscribeServiceGrpcKt
|
|
||||||
import dev.usbharu.owl.Uuid
|
|
||||||
import dev.usbharu.owl.broker.external.toUUID
|
import dev.usbharu.owl.broker.external.toUUID
|
||||||
import dev.usbharu.owl.broker.service.TaskManagementService
|
import dev.usbharu.owl.broker.service.TaskManagementService
|
||||||
import dev.usbharu.owl.common.property.PropertySerializeUtils
|
import dev.usbharu.owl.common.property.PropertySerializeUtils
|
||||||
import dev.usbharu.owl.common.property.PropertySerializerFactory
|
import dev.usbharu.owl.common.property.PropertySerializerFactory
|
||||||
import dev.usbharu.owl.taskResult
|
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.map
|
import kotlinx.coroutines.flow.map
|
||||||
import org.koin.core.annotation.Singleton
|
import org.koin.core.annotation.Singleton
|
||||||
import taskResults
|
|
||||||
import kotlin.coroutines.CoroutineContext
|
import kotlin.coroutines.CoroutineContext
|
||||||
import kotlin.coroutines.EmptyCoroutineContext
|
import kotlin.coroutines.EmptyCoroutineContext
|
||||||
|
|
||||||
|
@ -38,7 +34,7 @@ class TaskResultSubscribeService(
|
||||||
coroutineContext: CoroutineContext = EmptyCoroutineContext
|
coroutineContext: CoroutineContext = EmptyCoroutineContext
|
||||||
) :
|
) :
|
||||||
TaskResultSubscribeServiceGrpcKt.TaskResultSubscribeServiceCoroutineImplBase(coroutineContext) {
|
TaskResultSubscribeServiceGrpcKt.TaskResultSubscribeServiceCoroutineImplBase(coroutineContext) {
|
||||||
override fun subscribe(request: Uuid.UUID): Flow<TaskResults> {
|
override fun subscribe(request: Uuid.UUID): Flow<TaskResultProducer.TaskResults> {
|
||||||
return taskManagementService
|
return taskManagementService
|
||||||
.subscribeResult(request.toUUID())
|
.subscribeResult(request.toUUID())
|
||||||
.map {
|
.map {
|
||||||
|
|
|
@ -83,18 +83,20 @@ class TaskManagementServiceImpl(
|
||||||
|
|
||||||
private suspend fun enqueueTask(task: Task): QueuedTask {
|
private suspend fun enqueueTask(task: Task): QueuedTask {
|
||||||
|
|
||||||
val queuedTask = QueuedTask(
|
|
||||||
task.attempt + 1,
|
|
||||||
Instant.now(),
|
|
||||||
task,
|
|
||||||
isActive = true,
|
|
||||||
timeoutAt = null,
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
)
|
|
||||||
|
|
||||||
val definedTask = taskDefinitionRepository.findByName(task.name)
|
val definedTask = taskDefinitionRepository.findByName(task.name)
|
||||||
?: throw TaskNotRegisterException("Task ${task.name} not definition.")
|
?: throw TaskNotRegisterException("Task ${task.name} not definition.")
|
||||||
|
|
||||||
|
val queuedTask = QueuedTask(
|
||||||
|
attempt = task.attempt + 1,
|
||||||
|
queuedAt = Instant.now(),
|
||||||
|
task = task,
|
||||||
|
priority = definedTask.priority,
|
||||||
|
isActive = true,
|
||||||
|
timeoutAt = null,
|
||||||
|
assignedConsumer = null,
|
||||||
|
assignedAt = null
|
||||||
|
)
|
||||||
|
|
||||||
val copy = task.copy(
|
val copy = task.copy(
|
||||||
nextRetry = retryPolicyFactory.factory(definedTask.retryPolicy)
|
nextRetry = retryPolicyFactory.factory(definedTask.retryPolicy)
|
||||||
.nextRetry(Instant.now(), queuedTask.attempt)
|
.nextRetry(Instant.now(), queuedTask.attempt)
|
||||||
|
|
|
@ -2,6 +2,8 @@ syntax = "proto3";
|
||||||
import "uuid.proto";
|
import "uuid.proto";
|
||||||
import "task_result.proto";
|
import "task_result.proto";
|
||||||
|
|
||||||
|
option java_package = "dev.usbharu.owl";
|
||||||
|
|
||||||
message TaskResults {
|
message TaskResults {
|
||||||
string name = 1;
|
string name = 1;
|
||||||
UUID id = 2;
|
UUID id = 2;
|
||||||
|
|
Loading…
Reference in New Issue