From 1e57f1ef4b6bc2a99b88648ea5ea9f23200516b3 Mon Sep 17 00:00:00 2001 From: usbharu Date: Wed, 6 Mar 2024 18:27:28 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20MongoDB=E3=81=B8=E3=81=AE=E3=82=A2?= =?UTF-8?q?=E3=82=AF=E3=82=BB=E3=82=B9=E3=82=92IO=E3=82=B9=E3=83=AC?= =?UTF-8?q?=E3=83=83=E3=83=89=E3=81=A7=E8=A1=8C=E3=81=86=E3=82=88=E3=81=86?= =?UTF-8?q?=E3=81=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mongodb/MongodbConsumerRepository.kt | 10 ++-- .../mongodb/MongodbProducerRepository.kt | 6 ++- .../mongodb/MongodbQueuedTaskRepository.kt | 51 +++++++++++-------- .../MongodbTaskDefinitionRepository.kt | 12 +++-- .../broker/mongodb/MongodbTaskRepository.kt | 9 ++-- 5 files changed, 52 insertions(+), 36 deletions(-) diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbConsumerRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbConsumerRepository.kt index 1c34892..4320758 100644 --- a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbConsumerRepository.kt +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbConsumerRepository.kt @@ -21,7 +21,9 @@ import com.mongodb.client.model.ReplaceOptions import com.mongodb.kotlin.client.coroutine.MongoDatabase import dev.usbharu.owl.broker.domain.model.consumer.Consumer import dev.usbharu.owl.broker.domain.model.consumer.ConsumerRepository +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.singleOrNull +import kotlinx.coroutines.withContext import org.bson.BsonType import org.bson.codecs.pojo.annotations.BsonId import org.bson.codecs.pojo.annotations.BsonRepresentation @@ -32,13 +34,13 @@ import java.util.* class MongodbConsumerRepository(database: MongoDatabase) : ConsumerRepository { private val collection = database.getCollection("consumers") - override suspend fun save(consumer: Consumer): Consumer { + override suspend fun save(consumer: Consumer): Consumer = withContext(Dispatchers.IO) { collection.replaceOne(Filters.eq("_id", consumer.id.toString()), ConsumerMongodb.of(consumer), ReplaceOptions().upsert(true)) - return consumer + return@withContext consumer } - override suspend fun findById(id: UUID): Consumer? { - return collection.find(Filters.eq("_id", id.toString())).singleOrNull()?.toConsumer() + override suspend fun findById(id: UUID): Consumer? = withContext(Dispatchers.IO) { + return@withContext collection.find(Filters.eq("_id", id.toString())).singleOrNull()?.toConsumer() } } diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbProducerRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbProducerRepository.kt index fe93cf8..3593e5f 100644 --- a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbProducerRepository.kt +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbProducerRepository.kt @@ -21,6 +21,8 @@ import com.mongodb.client.model.ReplaceOptions import com.mongodb.kotlin.client.coroutine.MongoDatabase import dev.usbharu.owl.broker.domain.model.producer.Producer import dev.usbharu.owl.broker.domain.model.producer.ProducerRepository +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.withContext import org.koin.core.annotation.Singleton import java.time.Instant import java.util.* @@ -30,13 +32,13 @@ class MongodbProducerRepository(database: MongoDatabase) : ProducerRepository { private val collection = database.getCollection("producers") - override suspend fun save(producer: Producer): Producer { + override suspend fun save(producer: Producer): Producer = withContext(Dispatchers.IO) { collection.replaceOne( Filters.eq("_id", producer.id.toString()), ProducerMongodb.of(producer), ReplaceOptions().upsert(true) ) - return producer + return@withContext producer } } diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt index a32e1fd..3420199 100644 --- a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt @@ -16,7 +16,6 @@ package dev.usbharu.owl.broker.mongodb -import com.mongodb.client.model.Filters import com.mongodb.client.model.Filters.* import com.mongodb.client.model.FindOneAndUpdateOptions import com.mongodb.client.model.ReplaceOptions @@ -28,8 +27,11 @@ import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTaskRepository import dev.usbharu.owl.broker.domain.model.task.Task import dev.usbharu.owl.common.property.PropertySerializeUtils import dev.usbharu.owl.common.property.PropertySerializerFactory +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.map +import kotlinx.coroutines.withContext import org.bson.BsonType import org.bson.codecs.pojo.annotations.BsonId import org.bson.codecs.pojo.annotations.BsonRepresentation @@ -45,29 +47,34 @@ class MongodbQueuedTaskRepository( private val collection = database.getCollection("queued_task") override suspend fun save(queuedTask: QueuedTask): QueuedTask { - collection.replaceOne( - eq("_id", queuedTask.task.id.toString()), QueuedTaskMongodb.of(propertySerializerFactory, queuedTask), - ReplaceOptions().upsert(true) - ) + withContext(Dispatchers.IO) { + collection.replaceOne( + eq("_id", queuedTask.task.id.toString()), QueuedTaskMongodb.of(propertySerializerFactory, queuedTask), + ReplaceOptions().upsert(true) + ) + } return queuedTask } override suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id: UUID, update: QueuedTask): QueuedTask { - val findOneAndUpdate = collection.findOneAndUpdate( - and( - eq("_id", id.toString()), - eq(QueuedTaskMongodb::assignedConsumer.name, null) - ), - listOf( - set(QueuedTaskMongodb::assignedConsumer.name, update.assignedConsumer), - set(QueuedTaskMongodb::assignedAt.name, update.assignedAt) - ), - FindOneAndUpdateOptions().upsert(false).returnDocument(ReturnDocument.AFTER) - ) - if (findOneAndUpdate == null) { - TODO() + return withContext(Dispatchers.IO) { + + val findOneAndUpdate = collection.findOneAndUpdate( + and( + eq("_id", id.toString()), + eq(QueuedTaskMongodb::assignedConsumer.name, null) + ), + listOf( + set(QueuedTaskMongodb::assignedConsumer.name, update.assignedConsumer), + set(QueuedTaskMongodb::assignedAt.name, update.assignedAt) + ), + FindOneAndUpdateOptions().upsert(false).returnDocument(ReturnDocument.AFTER) + ) + if (findOneAndUpdate == null) { + TODO() + } + findOneAndUpdate.toQueuedTask(propertySerializerFactory) } - return findOneAndUpdate.toQueuedTask(propertySerializerFactory) } override fun findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority( @@ -79,12 +86,12 @@ class MongodbQueuedTaskRepository( `in`("task.name", tasks), eq(QueuedTaskMongodb::assignedConsumer.name, null) ) - ).map { it.toQueuedTask(propertySerializerFactory) } + ).map { it.toQueuedTask(propertySerializerFactory) }.flowOn(Dispatchers.IO) } override fun findByQueuedAtBeforeAndAssignedConsumerIsNull(instant: Instant): Flow { - return collection.find(Filters.lte(QueuedTaskMongodb::queuedAt.name, instant)) - .map { it.toQueuedTask(propertySerializerFactory) } + return collection.find(lte(QueuedTaskMongodb::queuedAt.name, instant)) + .map { it.toQueuedTask(propertySerializerFactory) }.flowOn(Dispatchers.IO) } } diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskDefinitionRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskDefinitionRepository.kt index a186cd5..ced2b19 100644 --- a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskDefinitionRepository.kt +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskDefinitionRepository.kt @@ -21,7 +21,9 @@ import com.mongodb.client.model.ReplaceOptions import com.mongodb.kotlin.client.coroutine.MongoDatabase import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinition import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.singleOrNull +import kotlinx.coroutines.withContext import org.bson.BsonType import org.bson.codecs.pojo.annotations.BsonId import org.bson.codecs.pojo.annotations.BsonRepresentation @@ -31,21 +33,21 @@ import org.koin.core.annotation.Singleton class MongodbTaskDefinitionRepository(database: MongoDatabase) : TaskDefinitionRepository { private val collection = database.getCollection("task_definition") - override suspend fun save(taskDefinition: TaskDefinition): TaskDefinition { + override suspend fun save(taskDefinition: TaskDefinition): TaskDefinition = withContext(Dispatchers.IO) { collection.replaceOne( Filters.eq("_id", taskDefinition.name), TaskDefinitionMongodb.of(taskDefinition), ReplaceOptions().upsert(true) ) - return taskDefinition + return@withContext taskDefinition } - override suspend fun deleteByName(name: String) { + override suspend fun deleteByName(name: String): Unit = withContext(Dispatchers.IO) { collection.deleteOne(Filters.eq("_id",name)) } - override suspend fun findByName(name: String): TaskDefinition? { - return collection.find(Filters.eq("_id", name)).singleOrNull()?.toTaskDefinition() + override suspend fun findByName(name: String): TaskDefinition? = withContext(Dispatchers.IO) { + return@withContext collection.find(Filters.eq("_id", name)).singleOrNull()?.toTaskDefinition() } } diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt index 6fe26dd..1403ab5 100644 --- a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt @@ -23,8 +23,11 @@ import dev.usbharu.owl.broker.domain.model.task.Task import dev.usbharu.owl.broker.domain.model.task.TaskRepository import dev.usbharu.owl.common.property.PropertySerializeUtils import dev.usbharu.owl.common.property.PropertySerializerFactory +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.map +import kotlinx.coroutines.withContext import org.bson.BsonType import org.bson.codecs.pojo.annotations.BsonId import org.bson.codecs.pojo.annotations.BsonRepresentation @@ -37,17 +40,17 @@ class MongodbTaskRepository(database: MongoDatabase, private val propertySeriali TaskRepository { private val collection = database.getCollection("tasks") - override suspend fun save(task: Task): Task { + override suspend fun save(task: Task): Task = withContext(Dispatchers.IO) { collection.replaceOne( Filters.eq("_id", task.id.toString()), TaskMongodb.of(propertySerializerFactory, task), ReplaceOptions().upsert(true) ) - return task + return@withContext task } override fun findByNextRetryBefore(timestamp: Instant): Flow { return collection.find(Filters.lte(TaskMongodb::nextRetry.name, timestamp)) - .map { it.toTask(propertySerializerFactory) } + .map { it.toTask(propertySerializerFactory) }.flowOn(Dispatchers.IO) } }