feat: MongoDBへのアクセスをIOスレッドで行うように

This commit is contained in:
usbharu 2024-03-06 18:27:28 +09:00
parent 154e5efd57
commit 1e57f1ef4b
Signed by: usbharu
GPG Key ID: 8CB1087135660B8D
5 changed files with 52 additions and 36 deletions

View File

@ -21,7 +21,9 @@ import com.mongodb.client.model.ReplaceOptions
import com.mongodb.kotlin.client.coroutine.MongoDatabase
import dev.usbharu.owl.broker.domain.model.consumer.Consumer
import dev.usbharu.owl.broker.domain.model.consumer.ConsumerRepository
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.singleOrNull
import kotlinx.coroutines.withContext
import org.bson.BsonType
import org.bson.codecs.pojo.annotations.BsonId
import org.bson.codecs.pojo.annotations.BsonRepresentation
@ -32,13 +34,13 @@ import java.util.*
class MongodbConsumerRepository(database: MongoDatabase) : ConsumerRepository {
private val collection = database.getCollection<ConsumerMongodb>("consumers")
override suspend fun save(consumer: Consumer): Consumer {
override suspend fun save(consumer: Consumer): Consumer = withContext(Dispatchers.IO) {
collection.replaceOne(Filters.eq("_id", consumer.id.toString()), ConsumerMongodb.of(consumer), ReplaceOptions().upsert(true))
return consumer
return@withContext consumer
}
override suspend fun findById(id: UUID): Consumer? {
return collection.find(Filters.eq("_id", id.toString())).singleOrNull()?.toConsumer()
override suspend fun findById(id: UUID): Consumer? = withContext(Dispatchers.IO) {
return@withContext collection.find(Filters.eq("_id", id.toString())).singleOrNull()?.toConsumer()
}
}

View File

@ -21,6 +21,8 @@ import com.mongodb.client.model.ReplaceOptions
import com.mongodb.kotlin.client.coroutine.MongoDatabase
import dev.usbharu.owl.broker.domain.model.producer.Producer
import dev.usbharu.owl.broker.domain.model.producer.ProducerRepository
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import org.koin.core.annotation.Singleton
import java.time.Instant
import java.util.*
@ -30,13 +32,13 @@ class MongodbProducerRepository(database: MongoDatabase) : ProducerRepository {
private val collection = database.getCollection<ProducerMongodb>("producers")
override suspend fun save(producer: Producer): Producer {
override suspend fun save(producer: Producer): Producer = withContext(Dispatchers.IO) {
collection.replaceOne(
Filters.eq("_id", producer.id.toString()),
ProducerMongodb.of(producer),
ReplaceOptions().upsert(true)
)
return producer
return@withContext producer
}
}

View File

@ -16,7 +16,6 @@
package dev.usbharu.owl.broker.mongodb
import com.mongodb.client.model.Filters
import com.mongodb.client.model.Filters.*
import com.mongodb.client.model.FindOneAndUpdateOptions
import com.mongodb.client.model.ReplaceOptions
@ -28,8 +27,11 @@ import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTaskRepository
import dev.usbharu.owl.broker.domain.model.task.Task
import dev.usbharu.owl.common.property.PropertySerializeUtils
import dev.usbharu.owl.common.property.PropertySerializerFactory
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.withContext
import org.bson.BsonType
import org.bson.codecs.pojo.annotations.BsonId
import org.bson.codecs.pojo.annotations.BsonRepresentation
@ -45,29 +47,34 @@ class MongodbQueuedTaskRepository(
private val collection = database.getCollection<QueuedTaskMongodb>("queued_task")
override suspend fun save(queuedTask: QueuedTask): QueuedTask {
collection.replaceOne(
eq("_id", queuedTask.task.id.toString()), QueuedTaskMongodb.of(propertySerializerFactory, queuedTask),
ReplaceOptions().upsert(true)
)
withContext(Dispatchers.IO) {
collection.replaceOne(
eq("_id", queuedTask.task.id.toString()), QueuedTaskMongodb.of(propertySerializerFactory, queuedTask),
ReplaceOptions().upsert(true)
)
}
return queuedTask
}
override suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id: UUID, update: QueuedTask): QueuedTask {
val findOneAndUpdate = collection.findOneAndUpdate(
and(
eq("_id", id.toString()),
eq(QueuedTaskMongodb::assignedConsumer.name, null)
),
listOf(
set(QueuedTaskMongodb::assignedConsumer.name, update.assignedConsumer),
set(QueuedTaskMongodb::assignedAt.name, update.assignedAt)
),
FindOneAndUpdateOptions().upsert(false).returnDocument(ReturnDocument.AFTER)
)
if (findOneAndUpdate == null) {
TODO()
return withContext(Dispatchers.IO) {
val findOneAndUpdate = collection.findOneAndUpdate(
and(
eq("_id", id.toString()),
eq(QueuedTaskMongodb::assignedConsumer.name, null)
),
listOf(
set(QueuedTaskMongodb::assignedConsumer.name, update.assignedConsumer),
set(QueuedTaskMongodb::assignedAt.name, update.assignedAt)
),
FindOneAndUpdateOptions().upsert(false).returnDocument(ReturnDocument.AFTER)
)
if (findOneAndUpdate == null) {
TODO()
}
findOneAndUpdate.toQueuedTask(propertySerializerFactory)
}
return findOneAndUpdate.toQueuedTask(propertySerializerFactory)
}
override fun findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority(
@ -79,12 +86,12 @@ class MongodbQueuedTaskRepository(
`in`("task.name", tasks),
eq(QueuedTaskMongodb::assignedConsumer.name, null)
)
).map { it.toQueuedTask(propertySerializerFactory) }
).map { it.toQueuedTask(propertySerializerFactory) }.flowOn(Dispatchers.IO)
}
override fun findByQueuedAtBeforeAndAssignedConsumerIsNull(instant: Instant): Flow<QueuedTask> {
return collection.find(Filters.lte(QueuedTaskMongodb::queuedAt.name, instant))
.map { it.toQueuedTask(propertySerializerFactory) }
return collection.find(lte(QueuedTaskMongodb::queuedAt.name, instant))
.map { it.toQueuedTask(propertySerializerFactory) }.flowOn(Dispatchers.IO)
}
}

View File

@ -21,7 +21,9 @@ import com.mongodb.client.model.ReplaceOptions
import com.mongodb.kotlin.client.coroutine.MongoDatabase
import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinition
import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.singleOrNull
import kotlinx.coroutines.withContext
import org.bson.BsonType
import org.bson.codecs.pojo.annotations.BsonId
import org.bson.codecs.pojo.annotations.BsonRepresentation
@ -31,21 +33,21 @@ import org.koin.core.annotation.Singleton
class MongodbTaskDefinitionRepository(database: MongoDatabase) : TaskDefinitionRepository {
private val collection = database.getCollection<TaskDefinitionMongodb>("task_definition")
override suspend fun save(taskDefinition: TaskDefinition): TaskDefinition {
override suspend fun save(taskDefinition: TaskDefinition): TaskDefinition = withContext(Dispatchers.IO) {
collection.replaceOne(
Filters.eq("_id", taskDefinition.name),
TaskDefinitionMongodb.of(taskDefinition),
ReplaceOptions().upsert(true)
)
return taskDefinition
return@withContext taskDefinition
}
override suspend fun deleteByName(name: String) {
override suspend fun deleteByName(name: String): Unit = withContext(Dispatchers.IO) {
collection.deleteOne(Filters.eq("_id",name))
}
override suspend fun findByName(name: String): TaskDefinition? {
return collection.find(Filters.eq("_id", name)).singleOrNull()?.toTaskDefinition()
override suspend fun findByName(name: String): TaskDefinition? = withContext(Dispatchers.IO) {
return@withContext collection.find(Filters.eq("_id", name)).singleOrNull()?.toTaskDefinition()
}
}

View File

@ -23,8 +23,11 @@ import dev.usbharu.owl.broker.domain.model.task.Task
import dev.usbharu.owl.broker.domain.model.task.TaskRepository
import dev.usbharu.owl.common.property.PropertySerializeUtils
import dev.usbharu.owl.common.property.PropertySerializerFactory
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.withContext
import org.bson.BsonType
import org.bson.codecs.pojo.annotations.BsonId
import org.bson.codecs.pojo.annotations.BsonRepresentation
@ -37,17 +40,17 @@ class MongodbTaskRepository(database: MongoDatabase, private val propertySeriali
TaskRepository {
private val collection = database.getCollection<TaskMongodb>("tasks")
override suspend fun save(task: Task): Task {
override suspend fun save(task: Task): Task = withContext(Dispatchers.IO) {
collection.replaceOne(
Filters.eq("_id", task.id.toString()), TaskMongodb.of(propertySerializerFactory, task),
ReplaceOptions().upsert(true)
)
return task
return@withContext task
}
override fun findByNextRetryBefore(timestamp: Instant): Flow<Task> {
return collection.find(Filters.lte(TaskMongodb::nextRetry.name, timestamp))
.map { it.toTask(propertySerializerFactory) }
.map { it.toTask(propertySerializerFactory) }.flowOn(Dispatchers.IO)
}
}