diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt index 875c420..836007c 100644 --- a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt @@ -60,8 +60,13 @@ class MongodbTaskRepository(database: MongoDatabase, private val propertySeriali }) } - override fun findByNextRetryBefore(timestamp: Instant): Flow { - return collection.find(Filters.lte(TaskMongodb::nextRetry.name, timestamp)) + override fun findByNextRetryBeforeAndCompletedAtIsNull(timestamp: Instant): Flow { + return collection.find( + Filters.and( + Filters.lte(TaskMongodb::nextRetry.name, timestamp), + Filters.eq(TaskMongodb::completedAt.name, null) + ) + ) .map { it.toTask(propertySerializerFactory) }.flowOn(Dispatchers.IO) } diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/TaskRepository.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/TaskRepository.kt index 38e001b..474d41f 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/TaskRepository.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/TaskRepository.kt @@ -25,7 +25,9 @@ interface TaskRepository { suspend fun saveAll(tasks:List) - fun findByNextRetryBefore(timestamp:Instant): Flow + fun findByNextRetryBeforeAndCompletedAtIsNull(timestamp:Instant): Flow suspend fun findById(uuid: UUID): Task? + + suspend fun findByIdAndUpdate(id:UUID,task: Task) } \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskresult/TaskResult.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskresult/TaskResult.kt new file mode 100644 index 0000000..e727ced --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskresult/TaskResult.kt @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.domain.model.taskresult + +import dev.usbharu.owl.common.property.PropertyValue +import java.util.* + +data class TaskResult( + val id: UUID, + val success: Boolean, + val attempt: Int, + val result: Map>, + val message: String +) diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskresult/TaskResultRepository.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskresult/TaskResultRepository.kt new file mode 100644 index 0000000..d58072a --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskresult/TaskResultRepository.kt @@ -0,0 +1,21 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.domain.model.taskresult + +interface TaskResultRepository { + suspend fun save(taskResult: TaskResult):TaskResult +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultService.kt new file mode 100644 index 0000000..d73ac89 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultService.kt @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.interfaces.grpc + +import com.google.protobuf.Empty +import dev.usbharu.owl.TaskResultOuterClass +import dev.usbharu.owl.TaskResultServiceGrpcKt +import dev.usbharu.owl.broker.domain.model.taskresult.TaskResult +import dev.usbharu.owl.broker.external.toUUID +import dev.usbharu.owl.broker.service.TaskManagementService +import dev.usbharu.owl.common.property.PropertySerializeUtils +import dev.usbharu.owl.common.property.PropertySerializerFactory +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.onEach +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +class TaskResultService( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + private val taskManagementService: TaskManagementService, + private val propertySerializerFactory: PropertySerializerFactory +) : + TaskResultServiceGrpcKt.TaskResultServiceCoroutineImplBase(coroutineContext) { + override suspend fun tasKResult(requests: Flow): Empty { + requests.onEach { + taskManagementService.queueProcessed( + TaskResult( + id = it.id.toUUID(), + success = it.success, + attempt = it.attempt, + result = PropertySerializeUtils.deserialize(propertySerializerFactory, it.resultMap), + message = it.message + ) + ) + }.collect() + return Empty.getDefaultInstance() + } +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt index 2cf574d..36abe9c 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt @@ -22,6 +22,7 @@ import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask import dev.usbharu.owl.broker.domain.model.task.Task import dev.usbharu.owl.broker.domain.model.task.TaskRepository import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository +import dev.usbharu.owl.broker.domain.model.taskresult.TaskResult import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.Flow @@ -40,6 +41,8 @@ interface TaskManagementService { suspend fun startManagement(coroutineScope: CoroutineScope) fun findAssignableTask(consumerId: UUID, numberOfConcurrent: Int): Flow + + suspend fun queueProcessed(taskResult: TaskResult) } @Singleton @@ -125,6 +128,13 @@ class TaskManagementServiceImpl( taskRepository.save(copy) } + override suspend fun queueProcessed(taskResult: TaskResult) { + val task = taskRepository.findById(taskResult.id) + ?: throw RecordNotFoundException("Task not found. id: ${taskResult.id}") + + taskRepository.findByIdAndUpdate(taskResult.id,task.copy(completedAt = Instant.now())) +//todo タスク完了後の処理を書く + } companion object { private val logger = LoggerFactory.getLogger(TaskManagementServiceImpl::class.java) diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskScanner.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskScanner.kt index ff579e5..3204f40 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskScanner.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskScanner.kt @@ -45,7 +45,7 @@ class TaskScannerImpl(private val taskRepository: TaskRepository) : } private fun scanTask(): Flow { - return taskRepository.findByNextRetryBefore(Instant.now()) + return taskRepository.findByNextRetryBeforeAndCompletedAtIsNull(Instant.now()) } companion object { diff --git a/broker/src/main/proto/task_result.proto b/broker/src/main/proto/task_result.proto index 43a07ae..642f742 100644 --- a/broker/src/main/proto/task_result.proto +++ b/broker/src/main/proto/task_result.proto @@ -10,4 +10,9 @@ message TaskResult { bool success = 2; int32 attempt = 3; map result = 4; + string message = 5; +} + +service TaskResultService{ + rpc tasKResult(stream TaskResult) returns (google.protobuf.Empty); } \ No newline at end of file