From e9277db29fa815e4f6cfdf995397ae6214a5956b Mon Sep 17 00:00:00 2001 From: usbharu <64310155+usbharu@users.noreply.github.com> Date: Sat, 9 Mar 2024 13:36:16 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20Producer=E3=81=AB=E3=82=BF=E3=82=B9?= =?UTF-8?q?=E3=82=AF=E5=AE=8C=E4=BA=86=E3=82=92=E9=80=9A=E7=9F=A5=E3=81=99?= =?UTF-8?q?=E3=82=8B=E6=96=B9=E6=B3=95=E3=82=92=E8=BF=BD=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../broker/mongodb/MongodbTaskRepository.kt | 13 +++ .../mongodb/MongodbTaskResultRepository.kt | 93 +++++++++++++++++++ .../owl/broker/OwlBrokerApplication.kt | 4 +- .../domain/model/task/TaskRepository.kt | 2 + .../domain/model/taskresult/TaskResult.kt | 1 + .../model/taskresult/TaskResultRepository.kt | 4 + .../interfaces/grpc/TaskResultService.kt | 4 +- .../grpc/TaskResultSubscribeService.kt | 62 +++++++++++++ .../broker/service/TaskManagementService.kt | 61 +++++++++--- .../usbharu/owl/broker/service/TaskResults.kt | 28 ++++++ .../src/main/proto/task_result_producer.proto | 15 +++ 11 files changed, 274 insertions(+), 13 deletions(-) create mode 100644 broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskResultRepository.kt create mode 100644 broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultSubscribeService.kt create mode 100644 broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskResults.kt create mode 100644 broker/src/main/proto/task_result_producer.proto diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt index 836007c..2d7215a 100644 --- a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt @@ -73,6 +73,19 @@ class MongodbTaskRepository(database: MongoDatabase, private val propertySeriali override suspend fun findById(uuid: UUID): Task? = withContext(Dispatchers.IO) { collection.find(Filters.eq(uuid.toString())).singleOrNull()?.toTask(propertySerializerFactory) } + + override suspend fun findByIdAndUpdate(id: UUID, task: Task) { + collection.replaceOne( + Filters.eq("_id", task.id.toString()), TaskMongodb.of(propertySerializerFactory, task), + ReplaceOptions().upsert(false) + ) + } + + override suspend fun findByPublishProducerIdAndCompletedAtIsNotNull(publishProducerId: UUID): Flow { + return collection + .find(Filters.eq(TaskMongodb::publishProducerId.name, publishProducerId.toString())) + .map { it.toTask(propertySerializerFactory) } + } } data class TaskMongodb( diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskResultRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskResultRepository.kt new file mode 100644 index 0000000..ed000fe --- /dev/null +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskResultRepository.kt @@ -0,0 +1,93 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.mongodb + +import com.mongodb.client.model.Filters +import com.mongodb.client.model.ReplaceOptions +import com.mongodb.kotlin.client.coroutine.MongoDatabase +import dev.usbharu.owl.broker.domain.model.taskresult.TaskResult +import dev.usbharu.owl.broker.domain.model.taskresult.TaskResultRepository +import dev.usbharu.owl.common.property.PropertySerializeUtils +import dev.usbharu.owl.common.property.PropertySerializerFactory +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.withContext +import org.bson.BsonType +import org.bson.codecs.pojo.annotations.BsonId +import org.bson.codecs.pojo.annotations.BsonRepresentation +import org.koin.core.annotation.Singleton +import java.util.* + +@Singleton +class MongodbTaskResultRepository( + database: MongoDatabase, + private val propertySerializerFactory: PropertySerializerFactory +) : TaskResultRepository { + + private val collection = database.getCollection("task_results") + override suspend fun save(taskResult: TaskResult): TaskResult = withContext(Dispatchers.IO) { + collection.replaceOne( + Filters.eq(taskResult.id.toString()), TaskResultMongodb.of(propertySerializerFactory, taskResult), + ReplaceOptions().upsert(true) + ) + return@withContext taskResult + } + + override fun findByTaskId(id: UUID): Flow { + return collection.find(Filters.eq(id.toString())).map { it.toTaskResult(propertySerializerFactory) }.flowOn(Dispatchers.IO) + } +} + +data class TaskResultMongodb( + @BsonId + @BsonRepresentation(BsonType.STRING) + val id: String, + val taskId: String, + val success: Boolean, + val attempt: Int, + val result: Map, + val message: String +) { + + fun toTaskResult(propertySerializerFactory: PropertySerializerFactory): TaskResult { + return TaskResult( + UUID.fromString(id), + UUID.fromString(taskId), + success, + attempt, + PropertySerializeUtils.deserialize(propertySerializerFactory, result), + message + ) + } + + companion object { + fun of(propertySerializerFactory: PropertySerializerFactory, taskResult: TaskResult): TaskResultMongodb { + return TaskResultMongodb( + taskResult.id.toString(), + taskResult.taskId.toString(), + taskResult.success, + taskResult.attempt, + PropertySerializeUtils.serialize(propertySerializerFactory, taskResult.result), + taskResult.message + ) + + } + + } +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/OwlBrokerApplication.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/OwlBrokerApplication.kt index 92c203a..66696f2 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/OwlBrokerApplication.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/OwlBrokerApplication.kt @@ -33,7 +33,8 @@ class OwlBrokerApplication( private val producerService: ProducerService, private val subscribeTaskService: SubscribeTaskService, private val taskPublishService: TaskPublishService, - private val taskManagementService: TaskManagementService + private val taskManagementService: TaskManagementService, + private val taskResultSubscribeService: TaskResultSubscribeService ) { private lateinit var server: Server @@ -45,6 +46,7 @@ class OwlBrokerApplication( .addService(producerService) .addService(subscribeTaskService) .addService(taskPublishService) + .addService(taskResultSubscribeService) .build() server.start() diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/TaskRepository.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/TaskRepository.kt index 474d41f..009dea2 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/TaskRepository.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/TaskRepository.kt @@ -30,4 +30,6 @@ interface TaskRepository { suspend fun findById(uuid: UUID): Task? suspend fun findByIdAndUpdate(id:UUID,task: Task) + + suspend fun findByPublishProducerIdAndCompletedAtIsNotNull(publishProducerId:UUID):Flow } \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskresult/TaskResult.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskresult/TaskResult.kt index e727ced..b024d04 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskresult/TaskResult.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskresult/TaskResult.kt @@ -21,6 +21,7 @@ import java.util.* data class TaskResult( val id: UUID, + val taskId:UUID, val success: Boolean, val attempt: Int, val result: Map>, diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskresult/TaskResultRepository.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskresult/TaskResultRepository.kt index d58072a..4118cfe 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskresult/TaskResultRepository.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskresult/TaskResultRepository.kt @@ -16,6 +16,10 @@ package dev.usbharu.owl.broker.domain.model.taskresult +import kotlinx.coroutines.flow.Flow +import java.util.* + interface TaskResultRepository { suspend fun save(taskResult: TaskResult):TaskResult + fun findByTaskId(id:UUID): Flow } \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultService.kt index d73ac89..613480b 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultService.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultService.kt @@ -27,6 +27,7 @@ import dev.usbharu.owl.common.property.PropertySerializerFactory import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.onEach +import java.util.* import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext @@ -40,7 +41,8 @@ class TaskResultService( requests.onEach { taskManagementService.queueProcessed( TaskResult( - id = it.id.toUUID(), + id = UUID.randomUUID(), + taskId = it.id.toUUID(), success = it.success, attempt = it.attempt, result = PropertySerializeUtils.deserialize(propertySerializerFactory, it.resultMap), diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultSubscribeService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultSubscribeService.kt new file mode 100644 index 0000000..9b93081 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultSubscribeService.kt @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.interfaces.grpc + +import TaskResultProducer.TaskResults +import TaskResultSubscribeServiceGrpcKt +import dev.usbharu.owl.Uuid +import dev.usbharu.owl.broker.external.toUUID +import dev.usbharu.owl.broker.service.TaskManagementService +import dev.usbharu.owl.common.property.PropertySerializeUtils +import dev.usbharu.owl.common.property.PropertySerializerFactory +import dev.usbharu.owl.taskResult +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map +import org.koin.core.annotation.Singleton +import taskResults +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +@Singleton +class TaskResultSubscribeService( + private val taskManagementService: TaskManagementService, + private val propertySerializerFactory: PropertySerializerFactory, + coroutineContext: CoroutineContext = EmptyCoroutineContext +) : + TaskResultSubscribeServiceGrpcKt.TaskResultSubscribeServiceCoroutineImplBase(coroutineContext) { + override fun subscribe(request: Uuid.UUID): Flow { + return taskManagementService + .subscribeResult(request.toUUID()) + .map { + taskResults { + id = it.id.toUUID() + name = it.name + attempt = it.attempt + success = it.success + results.addAll(it.results.map { + taskResult { + id = it.taskId.toUUID() + success = it.success + attempt = it.attempt + result.putAll(PropertySerializeUtils.serialize(propertySerializerFactory, it.result)) + message = it.message + } + }) + } + } + } +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt index 36abe9c..4cd78e5 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt @@ -23,14 +23,9 @@ import dev.usbharu.owl.broker.domain.model.task.Task import dev.usbharu.owl.broker.domain.model.task.TaskRepository import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository import dev.usbharu.owl.broker.domain.model.taskresult.TaskResult -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.flowOf -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.joinAll -import kotlinx.coroutines.launch +import dev.usbharu.owl.broker.domain.model.taskresult.TaskResultRepository +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* import org.koin.core.annotation.Singleton import org.slf4j.LoggerFactory import java.time.Instant @@ -43,6 +38,8 @@ interface TaskManagementService { fun findAssignableTask(consumerId: UUID, numberOfConcurrent: Int): Flow suspend fun queueProcessed(taskResult: TaskResult) + + fun subscribeResult(producerId: UUID): Flow } @Singleton @@ -53,7 +50,8 @@ class TaskManagementServiceImpl( private val assignQueuedTaskDecider: AssignQueuedTaskDecider, private val retryPolicyFactory: RetryPolicyFactory, private val taskRepository: TaskRepository, - private val queueScanner: QueueScanner + private val queueScanner: QueueScanner, + private val taskResultRepository: TaskResultRepository ) : TaskManagementService { private var taskFlow: Flow = flowOf() @@ -132,8 +130,49 @@ class TaskManagementServiceImpl( val task = taskRepository.findById(taskResult.id) ?: throw RecordNotFoundException("Task not found. id: ${taskResult.id}") - taskRepository.findByIdAndUpdate(taskResult.id,task.copy(completedAt = Instant.now())) -//todo タスク完了後の処理を書く + val taskDefinition = taskDefinitionRepository.findByName(task.name) + ?: throw TaskNotRegisterException("Task ${task.name} not definition.") + + val completedAt = if (taskResult.success) { + Instant.now() + } else if (taskResult.attempt >= taskDefinition.maxRetry) { + Instant.now() + } else { + null + } + + taskResultRepository.save(taskResult) + + taskRepository.findByIdAndUpdate( + taskResult.id, + task.copy(completedAt = completedAt, attempt = taskResult.attempt) + ) + + } + + override fun subscribeResult(producerId: UUID): Flow { + return flow { + + while (currentCoroutineContext().isActive) { + taskRepository + .findByPublishProducerIdAndCompletedAtIsNotNull(producerId) + .onEach { + val results = taskResultRepository.findByTaskId(it.id).toList() + emit( + TaskResults( + it.name, + it.id, + results.any { it.success }, + it.attempt, + results + ) + ) + } + delay(500) + } + + } + } companion object { diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskResults.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskResults.kt new file mode 100644 index 0000000..a61dbb0 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskResults.kt @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.service + +import dev.usbharu.owl.broker.domain.model.taskresult.TaskResult +import java.util.* + +data class TaskResults( + val name:String, + val id:UUID, + val success:Boolean, + val attempt:Int, + val results: List +) diff --git a/broker/src/main/proto/task_result_producer.proto b/broker/src/main/proto/task_result_producer.proto new file mode 100644 index 0000000..8ab59a0 --- /dev/null +++ b/broker/src/main/proto/task_result_producer.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; +import "uuid.proto"; +import "task_result.proto"; + +message TaskResults { + string name = 1; + UUID id = 2; + bool success = 3; + int32 attempt = 4; + repeated TaskResult results = 5; +} + +service TaskResultSubscribeService { + rpc subscribe(UUID) returns (stream TaskResults); +} \ No newline at end of file