diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbConsumerRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbConsumerRepository.kt index 1c348921..43207580 100644 --- a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbConsumerRepository.kt +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbConsumerRepository.kt @@ -21,7 +21,9 @@ import com.mongodb.client.model.ReplaceOptions import com.mongodb.kotlin.client.coroutine.MongoDatabase import dev.usbharu.owl.broker.domain.model.consumer.Consumer import dev.usbharu.owl.broker.domain.model.consumer.ConsumerRepository +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.singleOrNull +import kotlinx.coroutines.withContext import org.bson.BsonType import org.bson.codecs.pojo.annotations.BsonId import org.bson.codecs.pojo.annotations.BsonRepresentation @@ -32,13 +34,13 @@ import java.util.* class MongodbConsumerRepository(database: MongoDatabase) : ConsumerRepository { private val collection = database.getCollection("consumers") - override suspend fun save(consumer: Consumer): Consumer { + override suspend fun save(consumer: Consumer): Consumer = withContext(Dispatchers.IO) { collection.replaceOne(Filters.eq("_id", consumer.id.toString()), ConsumerMongodb.of(consumer), ReplaceOptions().upsert(true)) - return consumer + return@withContext consumer } - override suspend fun findById(id: UUID): Consumer? { - return collection.find(Filters.eq("_id", id.toString())).singleOrNull()?.toConsumer() + override suspend fun findById(id: UUID): Consumer? = withContext(Dispatchers.IO) { + return@withContext collection.find(Filters.eq("_id", id.toString())).singleOrNull()?.toConsumer() } } diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbProducerRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbProducerRepository.kt index fe93cf88..3593e5f8 100644 --- a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbProducerRepository.kt +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbProducerRepository.kt @@ -21,6 +21,8 @@ import com.mongodb.client.model.ReplaceOptions import com.mongodb.kotlin.client.coroutine.MongoDatabase import dev.usbharu.owl.broker.domain.model.producer.Producer import dev.usbharu.owl.broker.domain.model.producer.ProducerRepository +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.withContext import org.koin.core.annotation.Singleton import java.time.Instant import java.util.* @@ -30,13 +32,13 @@ class MongodbProducerRepository(database: MongoDatabase) : ProducerRepository { private val collection = database.getCollection("producers") - override suspend fun save(producer: Producer): Producer { + override suspend fun save(producer: Producer): Producer = withContext(Dispatchers.IO) { collection.replaceOne( Filters.eq("_id", producer.id.toString()), ProducerMongodb.of(producer), ReplaceOptions().upsert(true) ) - return producer + return@withContext producer } } diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt index a32e1fd6..34201994 100644 --- a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt @@ -16,7 +16,6 @@ package dev.usbharu.owl.broker.mongodb -import com.mongodb.client.model.Filters import com.mongodb.client.model.Filters.* import com.mongodb.client.model.FindOneAndUpdateOptions import com.mongodb.client.model.ReplaceOptions @@ -28,8 +27,11 @@ import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTaskRepository import dev.usbharu.owl.broker.domain.model.task.Task import dev.usbharu.owl.common.property.PropertySerializeUtils import dev.usbharu.owl.common.property.PropertySerializerFactory +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.map +import kotlinx.coroutines.withContext import org.bson.BsonType import org.bson.codecs.pojo.annotations.BsonId import org.bson.codecs.pojo.annotations.BsonRepresentation @@ -45,29 +47,34 @@ class MongodbQueuedTaskRepository( private val collection = database.getCollection("queued_task") override suspend fun save(queuedTask: QueuedTask): QueuedTask { - collection.replaceOne( - eq("_id", queuedTask.task.id.toString()), QueuedTaskMongodb.of(propertySerializerFactory, queuedTask), - ReplaceOptions().upsert(true) - ) + withContext(Dispatchers.IO) { + collection.replaceOne( + eq("_id", queuedTask.task.id.toString()), QueuedTaskMongodb.of(propertySerializerFactory, queuedTask), + ReplaceOptions().upsert(true) + ) + } return queuedTask } override suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id: UUID, update: QueuedTask): QueuedTask { - val findOneAndUpdate = collection.findOneAndUpdate( - and( - eq("_id", id.toString()), - eq(QueuedTaskMongodb::assignedConsumer.name, null) - ), - listOf( - set(QueuedTaskMongodb::assignedConsumer.name, update.assignedConsumer), - set(QueuedTaskMongodb::assignedAt.name, update.assignedAt) - ), - FindOneAndUpdateOptions().upsert(false).returnDocument(ReturnDocument.AFTER) - ) - if (findOneAndUpdate == null) { - TODO() + return withContext(Dispatchers.IO) { + + val findOneAndUpdate = collection.findOneAndUpdate( + and( + eq("_id", id.toString()), + eq(QueuedTaskMongodb::assignedConsumer.name, null) + ), + listOf( + set(QueuedTaskMongodb::assignedConsumer.name, update.assignedConsumer), + set(QueuedTaskMongodb::assignedAt.name, update.assignedAt) + ), + FindOneAndUpdateOptions().upsert(false).returnDocument(ReturnDocument.AFTER) + ) + if (findOneAndUpdate == null) { + TODO() + } + findOneAndUpdate.toQueuedTask(propertySerializerFactory) } - return findOneAndUpdate.toQueuedTask(propertySerializerFactory) } override fun findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority( @@ -79,12 +86,12 @@ class MongodbQueuedTaskRepository( `in`("task.name", tasks), eq(QueuedTaskMongodb::assignedConsumer.name, null) ) - ).map { it.toQueuedTask(propertySerializerFactory) } + ).map { it.toQueuedTask(propertySerializerFactory) }.flowOn(Dispatchers.IO) } override fun findByQueuedAtBeforeAndAssignedConsumerIsNull(instant: Instant): Flow { - return collection.find(Filters.lte(QueuedTaskMongodb::queuedAt.name, instant)) - .map { it.toQueuedTask(propertySerializerFactory) } + return collection.find(lte(QueuedTaskMongodb::queuedAt.name, instant)) + .map { it.toQueuedTask(propertySerializerFactory) }.flowOn(Dispatchers.IO) } } diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskDefinitionRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskDefinitionRepository.kt index a186cd52..ced2b19a 100644 --- a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskDefinitionRepository.kt +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskDefinitionRepository.kt @@ -21,7 +21,9 @@ import com.mongodb.client.model.ReplaceOptions import com.mongodb.kotlin.client.coroutine.MongoDatabase import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinition import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.singleOrNull +import kotlinx.coroutines.withContext import org.bson.BsonType import org.bson.codecs.pojo.annotations.BsonId import org.bson.codecs.pojo.annotations.BsonRepresentation @@ -31,21 +33,21 @@ import org.koin.core.annotation.Singleton class MongodbTaskDefinitionRepository(database: MongoDatabase) : TaskDefinitionRepository { private val collection = database.getCollection("task_definition") - override suspend fun save(taskDefinition: TaskDefinition): TaskDefinition { + override suspend fun save(taskDefinition: TaskDefinition): TaskDefinition = withContext(Dispatchers.IO) { collection.replaceOne( Filters.eq("_id", taskDefinition.name), TaskDefinitionMongodb.of(taskDefinition), ReplaceOptions().upsert(true) ) - return taskDefinition + return@withContext taskDefinition } - override suspend fun deleteByName(name: String) { + override suspend fun deleteByName(name: String): Unit = withContext(Dispatchers.IO) { collection.deleteOne(Filters.eq("_id",name)) } - override suspend fun findByName(name: String): TaskDefinition? { - return collection.find(Filters.eq("_id", name)).singleOrNull()?.toTaskDefinition() + override suspend fun findByName(name: String): TaskDefinition? = withContext(Dispatchers.IO) { + return@withContext collection.find(Filters.eq("_id", name)).singleOrNull()?.toTaskDefinition() } } diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt index 6fe26dde..1403ab5a 100644 --- a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt @@ -23,8 +23,11 @@ import dev.usbharu.owl.broker.domain.model.task.Task import dev.usbharu.owl.broker.domain.model.task.TaskRepository import dev.usbharu.owl.common.property.PropertySerializeUtils import dev.usbharu.owl.common.property.PropertySerializerFactory +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.map +import kotlinx.coroutines.withContext import org.bson.BsonType import org.bson.codecs.pojo.annotations.BsonId import org.bson.codecs.pojo.annotations.BsonRepresentation @@ -37,17 +40,17 @@ class MongodbTaskRepository(database: MongoDatabase, private val propertySeriali TaskRepository { private val collection = database.getCollection("tasks") - override suspend fun save(task: Task): Task { + override suspend fun save(task: Task): Task = withContext(Dispatchers.IO) { collection.replaceOne( Filters.eq("_id", task.id.toString()), TaskMongodb.of(propertySerializerFactory, task), ReplaceOptions().upsert(true) ) - return task + return@withContext task } override fun findByNextRetryBefore(timestamp: Instant): Flow { return collection.find(Filters.lte(TaskMongodb::nextRetry.name, timestamp)) - .map { it.toTask(propertySerializerFactory) } + .map { it.toTask(propertySerializerFactory) }.flowOn(Dispatchers.IO) } }