diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt index 6840c41..875c420 100644 --- a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt @@ -17,6 +17,7 @@ package dev.usbharu.owl.broker.mongodb import com.mongodb.client.model.Filters +import com.mongodb.client.model.ReplaceOneModel import com.mongodb.client.model.ReplaceOptions import com.mongodb.kotlin.client.coroutine.MongoDatabase import dev.usbharu.owl.broker.domain.model.task.Task @@ -49,6 +50,16 @@ class MongodbTaskRepository(database: MongoDatabase, private val propertySeriali return@withContext task } + override suspend fun saveAll(tasks: List): Unit = withContext(Dispatchers.IO) { + collection.bulkWrite(tasks.map { + ReplaceOneModel( + Filters.eq(it.id.toString()), + TaskMongodb.of(propertySerializerFactory, it), + ReplaceOptions().upsert(true) + ) + }) + } + override fun findByNextRetryBefore(timestamp: Instant): Flow { return collection.find(Filters.lte(TaskMongodb::nextRetry.name, timestamp)) .map { it.toTask(propertySerializerFactory) }.flowOn(Dispatchers.IO) diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/TaskRepository.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/TaskRepository.kt index 18befd0..38e001b 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/TaskRepository.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/TaskRepository.kt @@ -23,6 +23,8 @@ import java.util.* interface TaskRepository { suspend fun save(task: Task):Task + suspend fun saveAll(tasks:List) + fun findByNextRetryBefore(timestamp:Instant): Flow suspend fun findById(uuid: UUID): Task? diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskPublishService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskPublishService.kt index 6aac7f3..e305cb4 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskPublishService.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskPublishService.kt @@ -18,6 +18,7 @@ package dev.usbharu.owl.broker.interfaces.grpc import dev.usbharu.owl.PublishTaskOuterClass import dev.usbharu.owl.PublishTaskOuterClass.PublishedTask +import dev.usbharu.owl.PublishTaskOuterClass.PublishedTasks import dev.usbharu.owl.TaskPublishServiceGrpcKt.TaskPublishServiceCoroutineImplBase import dev.usbharu.owl.broker.external.toUUID import dev.usbharu.owl.broker.service.PublishTask @@ -56,14 +57,28 @@ class TaskPublishService( ) PublishedTask.newBuilder().setName(publishedTask.name).setId(publishedTask.id.toUUID()).build() } catch (e: Throwable) { - logger.warn("exception ",e) + logger.warn("exception ", e) throw StatusException(Status.INTERNAL) } - - } - companion object{ - private val logger = LoggerFactory.getLogger(dev.usbharu.owl.broker.interfaces.grpc.TaskPublishService::class.java) + override suspend fun publishTasks(request: PublishTaskOuterClass.PublishTasks): PublishTaskOuterClass.PublishedTasks { + + val tasks = request.propertiesArrayList.map { + PublishTask( + request.name, + request.producerId.toUUID(), + PropertySerializeUtils.deserialize(propertySerializerFactory, it.propertiesMap) + ) + } + + val publishTasks = taskPublishService.publishTasks(tasks) + + return PublishedTasks.newBuilder().setName(request.name).addAllId(publishTasks.map { it.id.toUUID() }).build() + } + + companion object { + private val logger = + LoggerFactory.getLogger(dev.usbharu.owl.broker.interfaces.grpc.TaskPublishService::class.java) } } \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskPublishService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskPublishService.kt index 5b24922..8446bf4 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskPublishService.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskPublishService.kt @@ -28,6 +28,7 @@ import java.util.* interface TaskPublishService { suspend fun publishTask(publishTask: PublishTask): PublishedTask + suspend fun publishTasks(list: List): List } data class PublishTask( @@ -44,11 +45,11 @@ data class PublishedTask( @Singleton class TaskPublishServiceImpl( private val taskRepository: TaskRepository, - private val taskDefinitionRepository:TaskDefinitionRepository, + private val taskDefinitionRepository: TaskDefinitionRepository, private val retryPolicyFactory: RetryPolicyFactory ) : TaskPublishService { override suspend fun publishTask(publishTask: PublishTask): PublishedTask { - val id = UUID.randomUUID() + val id = UUID.randomUUID() val definition = taskDefinitionRepository.findByName(publishTask.name) ?: throw TaskNotRegisterException("Task ${publishTask.name} not definition.") @@ -77,6 +78,37 @@ class TaskPublishServiceImpl( ) } + override suspend fun publishTasks(list: List): List { + + val first = list.first() + + val definition = taskDefinitionRepository.findByName(first.name) + ?: throw TaskNotRegisterException("Task ${first.name} not definition.") + + val published = Instant.now() + + val nextRetry = retryPolicyFactory.factory(definition.retryPolicy).nextRetry(published, 0) + + val tasks = list.map { + Task( + it.name, + UUID.randomUUID(), + first.producerId, + published, + nextRetry, + null, + 0, + it.properties + ) + } + + taskRepository.saveAll(tasks) + + logger.debug("Published {} tasks. name: {}", tasks.size, first.name) + + return tasks.map { PublishedTask(it.name, it.id) } + } + companion object { private val logger = LoggerFactory.getLogger(TaskPublishServiceImpl::class.java) } diff --git a/broker/src/main/proto/publish_task.proto b/broker/src/main/proto/publish_task.proto index 447a4bc..9194077 100644 --- a/broker/src/main/proto/publish_task.proto +++ b/broker/src/main/proto/publish_task.proto @@ -15,11 +15,28 @@ message PublishTask { UUID producer_id = 4; } +message Properties { + map properties = 1; +} + +message PublishTasks { + string name = 1; + google.protobuf.Timestamp publishedAt = 2; + repeated Properties propertiesArray = 3; + UUID producer_id = 4; +} + message PublishedTask { string name = 1; UUID id = 2; } +message PublishedTasks { + string name = 1; + repeated UUID id = 2; +} + service TaskPublishService { rpc publishTask (PublishTask) returns (PublishedTask); + rpc publishTasks(PublishTasks) returns (PublishedTasks); }