feat: タスク完了時の処理を追加

This commit is contained in:
usbharu 2024-03-07 16:29:26 +09:00
parent 18ee65e237
commit 159dd943ec
8 changed files with 128 additions and 4 deletions

View File

@ -60,8 +60,13 @@ class MongodbTaskRepository(database: MongoDatabase, private val propertySeriali
}) })
} }
override fun findByNextRetryBefore(timestamp: Instant): Flow<Task> { override fun findByNextRetryBeforeAndCompletedAtIsNull(timestamp: Instant): Flow<Task> {
return collection.find(Filters.lte(TaskMongodb::nextRetry.name, timestamp)) return collection.find(
Filters.and(
Filters.lte(TaskMongodb::nextRetry.name, timestamp),
Filters.eq(TaskMongodb::completedAt.name, null)
)
)
.map { it.toTask(propertySerializerFactory) }.flowOn(Dispatchers.IO) .map { it.toTask(propertySerializerFactory) }.flowOn(Dispatchers.IO)
} }

View File

@ -25,7 +25,9 @@ interface TaskRepository {
suspend fun saveAll(tasks:List<Task>) suspend fun saveAll(tasks:List<Task>)
fun findByNextRetryBefore(timestamp:Instant): Flow<Task> fun findByNextRetryBeforeAndCompletedAtIsNull(timestamp:Instant): Flow<Task>
suspend fun findById(uuid: UUID): Task? suspend fun findById(uuid: UUID): Task?
suspend fun findByIdAndUpdate(id:UUID,task: Task)
} }

View File

@ -0,0 +1,28 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.broker.domain.model.taskresult
import dev.usbharu.owl.common.property.PropertyValue
import java.util.*
data class TaskResult(
val id: UUID,
val success: Boolean,
val attempt: Int,
val result: Map<String, PropertyValue<*>>,
val message: String
)

View File

@ -0,0 +1,21 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.broker.domain.model.taskresult
interface TaskResultRepository {
suspend fun save(taskResult: TaskResult):TaskResult
}

View File

@ -0,0 +1,53 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.broker.interfaces.grpc
import com.google.protobuf.Empty
import dev.usbharu.owl.TaskResultOuterClass
import dev.usbharu.owl.TaskResultServiceGrpcKt
import dev.usbharu.owl.broker.domain.model.taskresult.TaskResult
import dev.usbharu.owl.broker.external.toUUID
import dev.usbharu.owl.broker.service.TaskManagementService
import dev.usbharu.owl.common.property.PropertySerializeUtils
import dev.usbharu.owl.common.property.PropertySerializerFactory
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
class TaskResultService(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
private val taskManagementService: TaskManagementService,
private val propertySerializerFactory: PropertySerializerFactory
) :
TaskResultServiceGrpcKt.TaskResultServiceCoroutineImplBase(coroutineContext) {
override suspend fun tasKResult(requests: Flow<TaskResultOuterClass.TaskResult>): Empty {
requests.onEach {
taskManagementService.queueProcessed(
TaskResult(
id = it.id.toUUID(),
success = it.success,
attempt = it.attempt,
result = PropertySerializeUtils.deserialize(propertySerializerFactory, it.resultMap),
message = it.message
)
)
}.collect()
return Empty.getDefaultInstance()
}
}

View File

@ -22,6 +22,7 @@ import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask
import dev.usbharu.owl.broker.domain.model.task.Task import dev.usbharu.owl.broker.domain.model.task.Task
import dev.usbharu.owl.broker.domain.model.task.TaskRepository import dev.usbharu.owl.broker.domain.model.task.TaskRepository
import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository
import dev.usbharu.owl.broker.domain.model.taskresult.TaskResult
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
@ -40,6 +41,8 @@ interface TaskManagementService {
suspend fun startManagement(coroutineScope: CoroutineScope) suspend fun startManagement(coroutineScope: CoroutineScope)
fun findAssignableTask(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask> fun findAssignableTask(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask>
suspend fun queueProcessed(taskResult: TaskResult)
} }
@Singleton @Singleton
@ -125,6 +128,13 @@ class TaskManagementServiceImpl(
taskRepository.save(copy) taskRepository.save(copy)
} }
override suspend fun queueProcessed(taskResult: TaskResult) {
val task = taskRepository.findById(taskResult.id)
?: throw RecordNotFoundException("Task not found. id: ${taskResult.id}")
taskRepository.findByIdAndUpdate(taskResult.id,task.copy(completedAt = Instant.now()))
//todo タスク完了後の処理を書く
}
companion object { companion object {
private val logger = LoggerFactory.getLogger(TaskManagementServiceImpl::class.java) private val logger = LoggerFactory.getLogger(TaskManagementServiceImpl::class.java)

View File

@ -45,7 +45,7 @@ class TaskScannerImpl(private val taskRepository: TaskRepository) :
} }
private fun scanTask(): Flow<Task> { private fun scanTask(): Flow<Task> {
return taskRepository.findByNextRetryBefore(Instant.now()) return taskRepository.findByNextRetryBeforeAndCompletedAtIsNull(Instant.now())
} }
companion object { companion object {

View File

@ -10,4 +10,9 @@ message TaskResult {
bool success = 2; bool success = 2;
int32 attempt = 3; int32 attempt = 3;
map<string, string> result = 4; map<string, string> result = 4;
string message = 5;
}
service TaskResultService{
rpc tasKResult(stream TaskResult) returns (google.protobuf.Empty);
} }