diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt index c44cf96..a32e1fd 100644 --- a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt @@ -16,6 +16,7 @@ package dev.usbharu.owl.broker.mongodb +import com.mongodb.client.model.Filters import com.mongodb.client.model.Filters.* import com.mongodb.client.model.FindOneAndUpdateOptions import com.mongodb.client.model.ReplaceOptions @@ -24,6 +25,8 @@ import com.mongodb.client.model.Updates.set import com.mongodb.kotlin.client.coroutine.MongoDatabase import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTaskRepository +import dev.usbharu.owl.broker.domain.model.task.Task +import dev.usbharu.owl.common.property.PropertySerializeUtils import dev.usbharu.owl.common.property.PropertySerializerFactory import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map @@ -35,12 +38,15 @@ import java.time.Instant import java.util.* @Singleton -class MongodbQueuedTaskRepository(private val propertySerializerFactory: PropertySerializerFactory,database: MongoDatabase) : QueuedTaskRepository { +class MongodbQueuedTaskRepository( + private val propertySerializerFactory: PropertySerializerFactory, + database: MongoDatabase +) : QueuedTaskRepository { private val collection = database.getCollection("queued_task") override suspend fun save(queuedTask: QueuedTask): QueuedTask { collection.replaceOne( - eq("_id", queuedTask.task.id.toString()), QueuedTaskMongodb.of(propertySerializerFactory,queuedTask), + eq("_id", queuedTask.task.id.toString()), QueuedTaskMongodb.of(propertySerializerFactory, queuedTask), ReplaceOptions().upsert(true) ) return queuedTask @@ -75,6 +81,11 @@ class MongodbQueuedTaskRepository(private val propertySerializerFactory: Propert ) ).map { it.toQueuedTask(propertySerializerFactory) } } + + override fun findByQueuedAtBeforeAndAssignedConsumerIsNull(instant: Instant): Flow { + return collection.find(Filters.lte(QueuedTaskMongodb::queuedAt.name, instant)) + .map { it.toQueuedTask(propertySerializerFactory) } + } } data class QueuedTaskMongodb( @@ -93,16 +104,55 @@ data class QueuedTaskMongodb( attempt, queuedAt, task.toTask(propertySerializerFactory), - UUID.fromString(assignedConsumer), + assignedConsumer?.let { UUID.fromString(it) }, assignedAt ) } + data class TaskMongodb( + val name: String, + val id: String, + val publishProducerId: String, + val publishedAt: Instant, + val nextRetry: Instant, + val completedAt: Instant?, + val attempt: Int, + val properties: Map + ) { + + fun toTask(propertySerializerFactory: PropertySerializerFactory): Task { + return Task( + name = name, + id = UUID.fromString(id), + publishProducerId = UUID.fromString(publishProducerId), + publishedAt = publishedAt, + nextRetry = nextRetry, + completedAt = completedAt, + attempt = attempt, + properties = PropertySerializeUtils.deserialize(propertySerializerFactory, properties) + ) + } + + companion object { + fun of(propertySerializerFactory: PropertySerializerFactory, task: Task): TaskMongodb { + return TaskMongodb( + task.name, + task.id.toString(), + task.publishProducerId.toString(), + task.publishedAt, + task.nextRetry, + task.completedAt, + task.attempt, + PropertySerializeUtils.serialize(propertySerializerFactory, task.properties) + ) + } + } + } companion object { - fun of(propertySerializerFactory: PropertySerializerFactory,queuedTask: QueuedTask): QueuedTaskMongodb { + fun of(propertySerializerFactory: PropertySerializerFactory, queuedTask: QueuedTask): QueuedTaskMongodb { return QueuedTaskMongodb( queuedTask.task.id.toString(), - TaskMongodb.of(propertySerializerFactory,queuedTask.task), + TaskMongodb.of(propertySerializerFactory, queuedTask.task), queuedTask.attempt, queuedTask.queuedAt, queuedTask.assignedConsumer?.toString(), diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt index 9dd5b4e..6fe26dd 100644 --- a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt @@ -25,6 +25,9 @@ import dev.usbharu.owl.common.property.PropertySerializeUtils import dev.usbharu.owl.common.property.PropertySerializerFactory import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map +import org.bson.BsonType +import org.bson.codecs.pojo.annotations.BsonId +import org.bson.codecs.pojo.annotations.BsonRepresentation import org.koin.core.annotation.Singleton import java.time.Instant import java.util.* @@ -50,6 +53,8 @@ class MongodbTaskRepository(database: MongoDatabase, private val propertySeriali data class TaskMongodb( val name: String, + @BsonId + @BsonRepresentation(BsonType.STRING) val id: String, val publishProducerId: String, val publishedAt: Instant, diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/Main.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/Main.kt index 092ffc6..2dd9e40 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/Main.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/Main.kt @@ -18,6 +18,7 @@ package dev.usbharu.owl.broker import dev.usbharu.owl.broker.service.DefaultRetryPolicyFactory import dev.usbharu.owl.broker.service.RetryPolicyFactory +import dev.usbharu.owl.common.retry.ExponentialRetryPolicy import kotlinx.coroutines.runBlocking import org.koin.core.context.startKoin import org.koin.dsl.module @@ -40,7 +41,7 @@ fun main() { val module = module { single { - DefaultRetryPolicyFactory(emptyMap()) + DefaultRetryPolicyFactory(mapOf("" to ExponentialRetryPolicy())) } } modules(module, defaultModule, moduleContext.module()) diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/OwlBrokerApplication.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/OwlBrokerApplication.kt index a6cc617..92c203a 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/OwlBrokerApplication.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/OwlBrokerApplication.kt @@ -55,7 +55,7 @@ class OwlBrokerApplication( ) return coroutineScope.launch { - taskManagementService.startManagement() + taskManagementService.startManagement(this) } } diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTaskRepository.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTaskRepository.kt index 049b92a..3de8800 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTaskRepository.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTaskRepository.kt @@ -17,6 +17,7 @@ package dev.usbharu.owl.broker.domain.model.queuedtask import kotlinx.coroutines.flow.Flow +import java.time.Instant import java.util.* interface QueuedTaskRepository { @@ -28,4 +29,6 @@ interface QueuedTaskRepository { suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id:UUID,update:QueuedTask):QueuedTask fun findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority(tasks:List,limit:Int): Flow + + fun findByQueuedAtBeforeAndAssignedConsumerIsNull(instant: Instant): Flow } \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueScanner.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueScanner.kt new file mode 100644 index 0000000..3a1669e --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueScanner.kt @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.service + +import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.isActive +import org.koin.core.annotation.Singleton +import java.time.Instant + +interface QueueScanner { + fun startScan(): Flow +} + +@Singleton +class QueueScannerImpl(private val queueStore: QueueStore) : QueueScanner { + override fun startScan(): Flow { + return flow { + while (currentCoroutineContext().isActive) { + emitAll(scanQueue()) + delay(1000) + } + } + } + + private fun scanQueue(): Flow { + return queueStore.findByQueuedAtBeforeAndAssignedConsumerIsNull(Instant.now().minusSeconds(10)) + } + +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueStore.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueStore.kt index bb49f6b..9fe6b1e 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueStore.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueStore.kt @@ -20,6 +20,7 @@ import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTaskRepository import kotlinx.coroutines.flow.Flow import org.koin.core.annotation.Singleton +import java.time.Instant interface QueueStore { suspend fun enqueue(queuedTask: QueuedTask) @@ -28,6 +29,8 @@ interface QueueStore { suspend fun dequeue(queuedTask: QueuedTask) suspend fun dequeueAll(queuedTaskList: List) fun findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority(tasks: List, limit: Int): Flow + + fun findByQueuedAtBeforeAndAssignedConsumerIsNull(instant: Instant): Flow } @Singleton @@ -55,4 +58,8 @@ class QueueStoreImpl(private val queuedTaskRepository: QueuedTaskRepository) : Q return queuedTaskRepository.findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority(tasks, limit) } + override fun findByQueuedAtBeforeAndAssignedConsumerIsNull(instant: Instant): Flow { + return queuedTaskRepository.findByQueuedAtBeforeAndAssignedConsumerIsNull(instant) + } + } \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt index 6b3af78..1bbaae7 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt @@ -21,10 +21,14 @@ import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask import dev.usbharu.owl.broker.domain.model.task.Task import dev.usbharu.owl.broker.domain.model.task.TaskRepository import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.joinAll +import kotlinx.coroutines.launch import org.koin.core.annotation.Singleton import org.slf4j.LoggerFactory import java.time.Instant @@ -33,7 +37,7 @@ import java.util.* interface TaskManagementService { - suspend fun startManagement() + suspend fun startManagement(coroutineScope: CoroutineScope) fun findAssignableTask(consumerId: UUID, numberOfConcurrent: Int): Flow } @@ -44,17 +48,35 @@ class TaskManagementServiceImpl( private val taskDefinitionRepository: TaskDefinitionRepository, private val assignQueuedTaskDecider: AssignQueuedTaskDecider, private val retryPolicyFactory: RetryPolicyFactory, - private val taskRepository: TaskRepository + private val taskRepository: TaskRepository, + private val queueScanner: QueueScanner ) : TaskManagementService { - private var flow: Flow = flowOf() - override suspend fun startManagement() { - flow = taskScanner.startScan() - - flow.onEach { - enqueueTask(it) - }.collect() + private var taskFlow: Flow = flowOf() + private var queueFlow: Flow = flowOf() + override suspend fun startManagement(coroutineScope: CoroutineScope) { + taskFlow = taskScanner.startScan() + queueFlow = queueScanner.startScan() + coroutineScope { + listOf( + launch { + taskFlow.onEach { + enqueueTask(it) + }.collect() + }, + launch { + queueFlow.onEach { + logger.warn( + "Queue timed out. name: {} id: {} attempt: {}", + it.task.name, + it.task.id, + it.attempt + ) + }.collect() + } + ).joinAll() + } } diff --git a/broker/src/test/kotlin/dev/usbharu/owl/broker/service/TaskManagementServiceImplTest.kt b/broker/src/test/kotlin/dev/usbharu/owl/broker/service/TaskManagementServiceImplTest.kt deleted file mode 100644 index 6d968be..0000000 --- a/broker/src/test/kotlin/dev/usbharu/owl/broker/service/TaskManagementServiceImplTest.kt +++ /dev/null @@ -1,13 +0,0 @@ -package dev.usbharu.owl.broker.service - -import org.junit.jupiter.api.Test - -class TaskManagementServiceImplTest { - - @Test - fun findAssignableTask() { - val taskManagementServiceImpl = TaskManagementServiceImpl() - - Thread.sleep(10000) - } -} \ No newline at end of file