feat: タスクをまとめて追加できるように

This commit is contained in:
usbharu 2024-03-07 12:15:07 +09:00
parent dcc1f582a9
commit ce888930cc
Signed by: usbharu
GPG Key ID: 6556747BF94EEBC8
5 changed files with 84 additions and 7 deletions

View File

@ -17,6 +17,7 @@
package dev.usbharu.owl.broker.mongodb package dev.usbharu.owl.broker.mongodb
import com.mongodb.client.model.Filters import com.mongodb.client.model.Filters
import com.mongodb.client.model.ReplaceOneModel
import com.mongodb.client.model.ReplaceOptions import com.mongodb.client.model.ReplaceOptions
import com.mongodb.kotlin.client.coroutine.MongoDatabase import com.mongodb.kotlin.client.coroutine.MongoDatabase
import dev.usbharu.owl.broker.domain.model.task.Task import dev.usbharu.owl.broker.domain.model.task.Task
@ -49,6 +50,16 @@ class MongodbTaskRepository(database: MongoDatabase, private val propertySeriali
return@withContext task return@withContext task
} }
override suspend fun saveAll(tasks: List<Task>): Unit = withContext(Dispatchers.IO) {
collection.bulkWrite(tasks.map {
ReplaceOneModel(
Filters.eq(it.id.toString()),
TaskMongodb.of(propertySerializerFactory, it),
ReplaceOptions().upsert(true)
)
})
}
override fun findByNextRetryBefore(timestamp: Instant): Flow<Task> { override fun findByNextRetryBefore(timestamp: Instant): Flow<Task> {
return collection.find(Filters.lte(TaskMongodb::nextRetry.name, timestamp)) return collection.find(Filters.lte(TaskMongodb::nextRetry.name, timestamp))
.map { it.toTask(propertySerializerFactory) }.flowOn(Dispatchers.IO) .map { it.toTask(propertySerializerFactory) }.flowOn(Dispatchers.IO)

View File

@ -23,6 +23,8 @@ import java.util.*
interface TaskRepository { interface TaskRepository {
suspend fun save(task: Task):Task suspend fun save(task: Task):Task
suspend fun saveAll(tasks:List<Task>)
fun findByNextRetryBefore(timestamp:Instant): Flow<Task> fun findByNextRetryBefore(timestamp:Instant): Flow<Task>
suspend fun findById(uuid: UUID): Task? suspend fun findById(uuid: UUID): Task?

View File

@ -18,6 +18,7 @@ package dev.usbharu.owl.broker.interfaces.grpc
import dev.usbharu.owl.PublishTaskOuterClass import dev.usbharu.owl.PublishTaskOuterClass
import dev.usbharu.owl.PublishTaskOuterClass.PublishedTask import dev.usbharu.owl.PublishTaskOuterClass.PublishedTask
import dev.usbharu.owl.PublishTaskOuterClass.PublishedTasks
import dev.usbharu.owl.TaskPublishServiceGrpcKt.TaskPublishServiceCoroutineImplBase import dev.usbharu.owl.TaskPublishServiceGrpcKt.TaskPublishServiceCoroutineImplBase
import dev.usbharu.owl.broker.external.toUUID import dev.usbharu.owl.broker.external.toUUID
import dev.usbharu.owl.broker.service.PublishTask import dev.usbharu.owl.broker.service.PublishTask
@ -59,11 +60,25 @@ class TaskPublishService(
logger.warn("exception ", e) logger.warn("exception ", e)
throw StatusException(Status.INTERNAL) throw StatusException(Status.INTERNAL)
} }
}
override suspend fun publishTasks(request: PublishTaskOuterClass.PublishTasks): PublishTaskOuterClass.PublishedTasks {
val tasks = request.propertiesArrayList.map {
PublishTask(
request.name,
request.producerId.toUUID(),
PropertySerializeUtils.deserialize(propertySerializerFactory, it.propertiesMap)
)
}
val publishTasks = taskPublishService.publishTasks(tasks)
return PublishedTasks.newBuilder().setName(request.name).addAllId(publishTasks.map { it.id.toUUID() }).build()
} }
companion object { companion object {
private val logger = LoggerFactory.getLogger(dev.usbharu.owl.broker.interfaces.grpc.TaskPublishService::class.java) private val logger =
LoggerFactory.getLogger(dev.usbharu.owl.broker.interfaces.grpc.TaskPublishService::class.java)
} }
} }

View File

@ -28,6 +28,7 @@ import java.util.*
interface TaskPublishService { interface TaskPublishService {
suspend fun publishTask(publishTask: PublishTask): PublishedTask suspend fun publishTask(publishTask: PublishTask): PublishedTask
suspend fun publishTasks(list: List<PublishTask>): List<PublishedTask>
} }
data class PublishTask( data class PublishTask(
@ -77,6 +78,37 @@ class TaskPublishServiceImpl(
) )
} }
override suspend fun publishTasks(list: List<PublishTask>): List<PublishedTask> {
val first = list.first()
val definition = taskDefinitionRepository.findByName(first.name)
?: throw TaskNotRegisterException("Task ${first.name} not definition.")
val published = Instant.now()
val nextRetry = retryPolicyFactory.factory(definition.retryPolicy).nextRetry(published, 0)
val tasks = list.map {
Task(
it.name,
UUID.randomUUID(),
first.producerId,
published,
nextRetry,
null,
0,
it.properties
)
}
taskRepository.saveAll(tasks)
logger.debug("Published {} tasks. name: {}", tasks.size, first.name)
return tasks.map { PublishedTask(it.name, it.id) }
}
companion object { companion object {
private val logger = LoggerFactory.getLogger(TaskPublishServiceImpl::class.java) private val logger = LoggerFactory.getLogger(TaskPublishServiceImpl::class.java)
} }

View File

@ -15,11 +15,28 @@ message PublishTask {
UUID producer_id = 4; UUID producer_id = 4;
} }
message Properties {
map<string,string> properties = 1;
}
message PublishTasks {
string name = 1;
google.protobuf.Timestamp publishedAt = 2;
repeated Properties propertiesArray = 3;
UUID producer_id = 4;
}
message PublishedTask { message PublishedTask {
string name = 1; string name = 1;
UUID id = 2; UUID id = 2;
} }
message PublishedTasks {
string name = 1;
repeated UUID id = 2;
}
service TaskPublishService { service TaskPublishService {
rpc publishTask (PublishTask) returns (PublishedTask); rpc publishTask (PublishTask) returns (PublishedTask);
rpc publishTasks(PublishTasks) returns (PublishedTasks);
} }