diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt index 3420199..3b6c550 100644 --- a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt @@ -62,11 +62,13 @@ class MongodbQueuedTaskRepository( val findOneAndUpdate = collection.findOneAndUpdate( and( eq("_id", id.toString()), - eq(QueuedTaskMongodb::assignedConsumer.name, null) + eq(QueuedTaskMongodb::isActive.name, true) ), listOf( set(QueuedTaskMongodb::assignedConsumer.name, update.assignedConsumer), - set(QueuedTaskMongodb::assignedAt.name, update.assignedAt) + set(QueuedTaskMongodb::assignedAt.name, update.assignedAt), + set(QueuedTaskMongodb::queuedAt.name, update.queuedAt), + set(QueuedTaskMongodb::isActive.name, update.isActive) ), FindOneAndUpdateOptions().upsert(false).returnDocument(ReturnDocument.AFTER) ) @@ -77,20 +79,25 @@ class MongodbQueuedTaskRepository( } } - override fun findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority( + override fun findByTaskNameInAndIsActiveIsTrueAndOrderByPriority( tasks: List, limit: Int ): Flow { return collection.find( and( `in`("task.name", tasks), - eq(QueuedTaskMongodb::assignedConsumer.name, null) + eq(QueuedTaskMongodb::isActive.name, true) ) ).map { it.toQueuedTask(propertySerializerFactory) }.flowOn(Dispatchers.IO) } - override fun findByQueuedAtBeforeAndAssignedConsumerIsNull(instant: Instant): Flow { - return collection.find(lte(QueuedTaskMongodb::queuedAt.name, instant)) + override fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow { + return collection.find( + and( + lte(QueuedTaskMongodb::queuedAt.name, instant), + eq(QueuedTaskMongodb::isActive.name, true) + ) + ) .map { it.toQueuedTask(propertySerializerFactory) }.flowOn(Dispatchers.IO) } } @@ -102,6 +109,8 @@ data class QueuedTaskMongodb( val task: TaskMongodb, val attempt: Int, val queuedAt: Instant, + val isActive: Boolean, + val timeoutAt: Instant?, val assignedConsumer: String?, val assignedAt: Instant? ) { @@ -111,6 +120,8 @@ data class QueuedTaskMongodb( attempt, queuedAt, task.toTask(propertySerializerFactory), + isActive, + timeoutAt, assignedConsumer?.let { UUID.fromString(it) }, assignedAt ) @@ -155,6 +166,7 @@ data class QueuedTaskMongodb( } } } + companion object { fun of(propertySerializerFactory: PropertySerializerFactory, queuedTask: QueuedTask): QueuedTaskMongodb { return QueuedTaskMongodb( @@ -162,6 +174,8 @@ data class QueuedTaskMongodb( TaskMongodb.of(propertySerializerFactory, queuedTask.task), queuedTask.attempt, queuedTask.queuedAt, + queuedTask.isActive, + queuedTask.timeoutAt, queuedTask.assignedConsumer?.toString(), queuedTask.assignedAt ) diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt index 1403ab5..6840c41 100644 --- a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt @@ -27,6 +27,7 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.singleOrNull import kotlinx.coroutines.withContext import org.bson.BsonType import org.bson.codecs.pojo.annotations.BsonId @@ -52,6 +53,10 @@ class MongodbTaskRepository(database: MongoDatabase, private val propertySeriali return collection.find(Filters.lte(TaskMongodb::nextRetry.name, timestamp)) .map { it.toTask(propertySerializerFactory) }.flowOn(Dispatchers.IO) } + + override suspend fun findById(uuid: UUID): Task? = withContext(Dispatchers.IO) { + collection.find(Filters.eq(uuid.toString())).singleOrNull()?.toTask(propertySerializerFactory) + } } data class TaskMongodb( diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTask.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTask.kt index e2ed5f2..f8c0437 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTask.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTask.kt @@ -22,11 +22,14 @@ import java.util.* /** * @param attempt キューされた時点での試行回数より1多い + * @param isActive trueならアサイン可能 falseならアサイン済みかタイムアウト等で無効 */ data class QueuedTask( val attempt: Int, val queuedAt: Instant, val task: Task, + val isActive: Boolean, + val timeoutAt: Instant?, val assignedConsumer: UUID?, - val assignedAt:Instant? + val assignedAt: Instant? ) diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTaskRepository.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTaskRepository.kt index 3de8800..9f3c12a 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTaskRepository.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTaskRepository.kt @@ -28,7 +28,7 @@ interface QueuedTaskRepository { */ suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id:UUID,update:QueuedTask):QueuedTask - fun findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority(tasks:List,limit:Int): Flow + fun findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(tasks: List, limit: Int): Flow - fun findByQueuedAtBeforeAndAssignedConsumerIsNull(instant: Instant): Flow + fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow } \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/TaskRepository.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/TaskRepository.kt index 60159a2..18befd0 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/TaskRepository.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/TaskRepository.kt @@ -18,9 +18,12 @@ package dev.usbharu.owl.broker.domain.model.task import kotlinx.coroutines.flow.Flow import java.time.Instant +import java.util.* interface TaskRepository { suspend fun save(task: Task):Task fun findByNextRetryBefore(timestamp:Instant): Flow + + suspend fun findById(uuid: UUID): Task? } \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskPublishService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskPublishService.kt index 803c893..6aac7f3 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskPublishService.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskPublishService.kt @@ -55,7 +55,7 @@ class TaskPublishService( ) ) PublishedTask.newBuilder().setName(publishedTask.name).setId(publishedTask.id.toUUID()).build() - }catch (e:Error){ + } catch (e: Throwable) { logger.warn("exception ",e) throw StatusException(Status.INTERNAL) } diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/AssignQueuedTaskDecider.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/AssignQueuedTaskDecider.kt index ff12278..e8ff8b4 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/AssignQueuedTaskDecider.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/AssignQueuedTaskDecider.kt @@ -38,7 +38,7 @@ class AssignQueuedTaskDeciderImpl( val consumer = consumerRepository.findById(consumerId) ?: throw RecordNotFoundException("Consumer not found. id: $consumerId") emitAll( - queueStore.findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority( + queueStore.findByTaskNameInAndIsActiveIsTrueAndOrderByPriority( consumer.tasks, numberOfConcurrent ).take(numberOfConcurrent) diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueScanner.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueScanner.kt index 3a1669e..5102571 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueScanner.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueScanner.kt @@ -42,7 +42,7 @@ class QueueScannerImpl(private val queueStore: QueueStore) : QueueScanner { } private fun scanQueue(): Flow { - return queueStore.findByQueuedAtBeforeAndAssignedConsumerIsNull(Instant.now().minusSeconds(10)) + return queueStore.findByQueuedAtBeforeAndIsActiveIsTrue(Instant.now().minusSeconds(10)) } } \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueStore.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueStore.kt index 9fe6b1e..2630915 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueStore.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueStore.kt @@ -28,9 +28,9 @@ interface QueueStore { suspend fun dequeue(queuedTask: QueuedTask) suspend fun dequeueAll(queuedTaskList: List) - fun findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority(tasks: List, limit: Int): Flow + fun findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(tasks: List, limit: Int): Flow - fun findByQueuedAtBeforeAndAssignedConsumerIsNull(instant: Instant): Flow + fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow } @Singleton @@ -51,15 +51,15 @@ class QueueStoreImpl(private val queuedTaskRepository: QueuedTaskRepository) : Q return queuedTaskList.forEach { dequeue(it) } } - override fun findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority( + override fun findByTaskNameInAndIsActiveIsTrueAndOrderByPriority( tasks: List, limit: Int ): Flow { - return queuedTaskRepository.findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority(tasks, limit) + return queuedTaskRepository.findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(tasks, limit) } - override fun findByQueuedAtBeforeAndAssignedConsumerIsNull(instant: Instant): Flow { - return queuedTaskRepository.findByQueuedAtBeforeAndAssignedConsumerIsNull(instant) + override fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow { + return queuedTaskRepository.findByQueuedAtBeforeAndIsActiveIsTrue(instant) } } \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueuedTaskAssigner.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueuedTaskAssigner.kt index a809382..da6c139 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueuedTaskAssigner.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueuedTaskAssigner.kt @@ -28,33 +28,39 @@ import java.time.Instant import java.util.* interface QueuedTaskAssigner { - fun ready(consumerId: UUID,numberOfConcurrent:Int): Flow + fun ready(consumerId: UUID, numberOfConcurrent: Int): Flow } @Singleton class QueuedTaskAssignerImpl( private val taskManagementService: TaskManagementService, private val queueStore: QueueStore -) : QueuedTaskAssigner{ +) : QueuedTaskAssigner { override fun ready(consumerId: UUID, numberOfConcurrent: Int): Flow { return flow { taskManagementService.findAssignableTask(consumerId, numberOfConcurrent) .onEach { - val assignTask = assignTask(it, consumerId) + val assignTask = assignTask(it, consumerId) - if (assignTask != null) { - emit(assignTask) - } + if (assignTask != null) { + emit(assignTask) + } } .collect() } } - private suspend fun assignTask(queuedTask: QueuedTask,consumerId: UUID):QueuedTask?{ + private suspend fun assignTask(queuedTask: QueuedTask, consumerId: UUID): QueuedTask? { return try { - val assignedTaskQueue = queuedTask.copy(assignedConsumer = consumerId, assignedAt = Instant.now()) - logger.trace("Try assign task: {} id: {} consumer: {}",queuedTask.task.name,queuedTask.task.id,consumerId) + val assignedTaskQueue = + queuedTask.copy(assignedConsumer = consumerId, assignedAt = Instant.now(), isActive = false) + logger.trace( + "Try assign task: {} id: {} consumer: {}", + queuedTask.task.name, + queuedTask.task.id, + consumerId + ) queueStore.dequeue(assignedTaskQueue) @@ -72,7 +78,7 @@ class QueuedTaskAssignerImpl( } } - companion object{ + companion object { private val logger = LoggerFactory.getLogger(QueuedTaskAssignerImpl::class.java) } } \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt index 1bbaae7..2cf574d 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt @@ -16,6 +16,7 @@ package dev.usbharu.owl.broker.service +import dev.usbharu.owl.broker.domain.exception.repository.RecordNotFoundException import dev.usbharu.owl.broker.domain.exception.service.TaskNotRegisterException import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask import dev.usbharu.owl.broker.domain.model.task.Task @@ -67,12 +68,7 @@ class TaskManagementServiceImpl( }, launch { queueFlow.onEach { - logger.warn( - "Queue timed out. name: {} id: {} attempt: {}", - it.task.name, - it.task.id, - it.attempt - ) + timeoutQueue(it) }.collect() } ).joinAll() @@ -90,6 +86,8 @@ class TaskManagementServiceImpl( task.attempt + 1, Instant.now(), task, + isActive = true, + timeoutAt = null, null, null ) @@ -98,7 +96,7 @@ class TaskManagementServiceImpl( ?: throw TaskNotRegisterException("Task ${task.name} not definition.") val copy = task.copy( nextRetry = retryPolicyFactory.factory(definedTask.retryPolicy) - .nextRetry(Instant.now(), task.attempt) + .nextRetry(Instant.now(), queuedTask.attempt) ) taskRepository.save(copy) @@ -108,6 +106,26 @@ class TaskManagementServiceImpl( return queuedTask } + private suspend fun timeoutQueue(queuedTask: QueuedTask) { + val timeoutQueue = queuedTask.copy(isActive = false, timeoutAt = Instant.now()) + + queueStore.dequeue(timeoutQueue) + + + val task = taskRepository.findById(timeoutQueue.task.id) + ?: throw RecordNotFoundException("Task not found. id: ${timeoutQueue.task.id}") + val copy = task.copy(attempt = timeoutQueue.attempt) + + logger.warn( + "Queue timed out. name: {} id: {} attempt: {}", + timeoutQueue.task.name, + timeoutQueue.task.id, + timeoutQueue.attempt + ) + taskRepository.save(copy) + } + + companion object { private val logger = LoggerFactory.getLogger(TaskManagementServiceImpl::class.java) } diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskPublishService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskPublishService.kt index a1812c3..5b24922 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskPublishService.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskPublishService.kt @@ -54,7 +54,7 @@ class TaskPublishServiceImpl( ?: throw TaskNotRegisterException("Task ${publishTask.name} not definition.") val published = Instant.now() - val nextRetry = retryPolicyFactory.factory(definition.name).nextRetry(published,0) + val nextRetry = retryPolicyFactory.factory(definition.retryPolicy).nextRetry(published, 0) val task = Task( name = publishTask.name, diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/retry/ExponentialRetryPolicy.kt b/common/src/main/kotlin/dev/usbharu/owl/common/retry/ExponentialRetryPolicy.kt index 8745fc1..753746d 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/retry/ExponentialRetryPolicy.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/retry/ExponentialRetryPolicy.kt @@ -6,6 +6,6 @@ import kotlin.math.roundToLong class ExponentialRetryPolicy(private val firstRetrySeconds: Int = 30) : RetryPolicy { override fun nextRetry(now: Instant, attempt: Int): Instant = - now.plusSeconds(firstRetrySeconds.times((2.0).pow(attempt).roundToLong())) + now.plusSeconds(firstRetrySeconds.times((2.0).pow(attempt).roundToLong()) - 30) } \ No newline at end of file diff --git a/common/src/test/kotlin/dev/usbharu/owl/common/retry/ExponentialRetryPolicyTest.kt b/common/src/test/kotlin/dev/usbharu/owl/common/retry/ExponentialRetryPolicyTest.kt index 53ea7a1..c1a1dc2 100644 --- a/common/src/test/kotlin/dev/usbharu/owl/common/retry/ExponentialRetryPolicyTest.kt +++ b/common/src/test/kotlin/dev/usbharu/owl/common/retry/ExponentialRetryPolicyTest.kt @@ -25,12 +25,12 @@ class ExponentialRetryPolicyTest { fun exponential0() { val nextRetry = ExponentialRetryPolicy().nextRetry(Instant.ofEpochSecond(300), 0) - assertEquals(Instant.ofEpochSecond(330), nextRetry) + assertEquals(Instant.ofEpochSecond(300), nextRetry) } @Test fun exponential1() { val nextRetry = ExponentialRetryPolicy().nextRetry(Instant.ofEpochSecond(300), 1) - assertEquals(Instant.ofEpochSecond(360), nextRetry) + assertEquals(Instant.ofEpochSecond(330), nextRetry) } } \ No newline at end of file