feat: タスク完了時の処理を追加
This commit is contained in:
parent
138edd5cbf
commit
dab52d25a4
|
@ -60,8 +60,13 @@ class MongodbTaskRepository(database: MongoDatabase, private val propertySeriali
|
|||
})
|
||||
}
|
||||
|
||||
override fun findByNextRetryBefore(timestamp: Instant): Flow<Task> {
|
||||
return collection.find(Filters.lte(TaskMongodb::nextRetry.name, timestamp))
|
||||
override fun findByNextRetryBeforeAndCompletedAtIsNull(timestamp: Instant): Flow<Task> {
|
||||
return collection.find(
|
||||
Filters.and(
|
||||
Filters.lte(TaskMongodb::nextRetry.name, timestamp),
|
||||
Filters.eq(TaskMongodb::completedAt.name, null)
|
||||
)
|
||||
)
|
||||
.map { it.toTask(propertySerializerFactory) }.flowOn(Dispatchers.IO)
|
||||
}
|
||||
|
||||
|
|
|
@ -25,7 +25,9 @@ interface TaskRepository {
|
|||
|
||||
suspend fun saveAll(tasks:List<Task>)
|
||||
|
||||
fun findByNextRetryBefore(timestamp:Instant): Flow<Task>
|
||||
fun findByNextRetryBeforeAndCompletedAtIsNull(timestamp:Instant): Flow<Task>
|
||||
|
||||
suspend fun findById(uuid: UUID): Task?
|
||||
|
||||
suspend fun findByIdAndUpdate(id:UUID,task: Task)
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Copyright (C) 2024 usbharu
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package dev.usbharu.owl.broker.domain.model.taskresult
|
||||
|
||||
import dev.usbharu.owl.common.property.PropertyValue
|
||||
import java.util.*
|
||||
|
||||
data class TaskResult(
|
||||
val id: UUID,
|
||||
val success: Boolean,
|
||||
val attempt: Int,
|
||||
val result: Map<String, PropertyValue<*>>,
|
||||
val message: String
|
||||
)
|
|
@ -0,0 +1,21 @@
|
|||
/*
|
||||
* Copyright (C) 2024 usbharu
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package dev.usbharu.owl.broker.domain.model.taskresult
|
||||
|
||||
interface TaskResultRepository {
|
||||
suspend fun save(taskResult: TaskResult):TaskResult
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Copyright (C) 2024 usbharu
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package dev.usbharu.owl.broker.interfaces.grpc
|
||||
|
||||
import com.google.protobuf.Empty
|
||||
import dev.usbharu.owl.TaskResultOuterClass
|
||||
import dev.usbharu.owl.TaskResultServiceGrpcKt
|
||||
import dev.usbharu.owl.broker.domain.model.taskresult.TaskResult
|
||||
import dev.usbharu.owl.broker.external.toUUID
|
||||
import dev.usbharu.owl.broker.service.TaskManagementService
|
||||
import dev.usbharu.owl.common.property.PropertySerializeUtils
|
||||
import dev.usbharu.owl.common.property.PropertySerializerFactory
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
import kotlin.coroutines.EmptyCoroutineContext
|
||||
|
||||
class TaskResultService(
|
||||
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||
private val taskManagementService: TaskManagementService,
|
||||
private val propertySerializerFactory: PropertySerializerFactory
|
||||
) :
|
||||
TaskResultServiceGrpcKt.TaskResultServiceCoroutineImplBase(coroutineContext) {
|
||||
override suspend fun tasKResult(requests: Flow<TaskResultOuterClass.TaskResult>): Empty {
|
||||
requests.onEach {
|
||||
taskManagementService.queueProcessed(
|
||||
TaskResult(
|
||||
id = it.id.toUUID(),
|
||||
success = it.success,
|
||||
attempt = it.attempt,
|
||||
result = PropertySerializeUtils.deserialize(propertySerializerFactory, it.resultMap),
|
||||
message = it.message
|
||||
)
|
||||
)
|
||||
}.collect()
|
||||
return Empty.getDefaultInstance()
|
||||
}
|
||||
}
|
|
@ -22,6 +22,7 @@ import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask
|
|||
import dev.usbharu.owl.broker.domain.model.task.Task
|
||||
import dev.usbharu.owl.broker.domain.model.task.TaskRepository
|
||||
import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository
|
||||
import dev.usbharu.owl.broker.domain.model.taskresult.TaskResult
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
|
@ -40,6 +41,8 @@ interface TaskManagementService {
|
|||
|
||||
suspend fun startManagement(coroutineScope: CoroutineScope)
|
||||
fun findAssignableTask(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask>
|
||||
|
||||
suspend fun queueProcessed(taskResult: TaskResult)
|
||||
}
|
||||
|
||||
@Singleton
|
||||
|
@ -125,6 +128,13 @@ class TaskManagementServiceImpl(
|
|||
taskRepository.save(copy)
|
||||
}
|
||||
|
||||
override suspend fun queueProcessed(taskResult: TaskResult) {
|
||||
val task = taskRepository.findById(taskResult.id)
|
||||
?: throw RecordNotFoundException("Task not found. id: ${taskResult.id}")
|
||||
|
||||
taskRepository.findByIdAndUpdate(taskResult.id,task.copy(completedAt = Instant.now()))
|
||||
//todo タスク完了後の処理を書く
|
||||
}
|
||||
|
||||
companion object {
|
||||
private val logger = LoggerFactory.getLogger(TaskManagementServiceImpl::class.java)
|
||||
|
|
|
@ -45,7 +45,7 @@ class TaskScannerImpl(private val taskRepository: TaskRepository) :
|
|||
}
|
||||
|
||||
private fun scanTask(): Flow<Task> {
|
||||
return taskRepository.findByNextRetryBefore(Instant.now())
|
||||
return taskRepository.findByNextRetryBeforeAndCompletedAtIsNull(Instant.now())
|
||||
}
|
||||
|
||||
companion object {
|
||||
|
|
|
@ -10,4 +10,9 @@ message TaskResult {
|
|||
bool success = 2;
|
||||
int32 attempt = 3;
|
||||
map<string, string> result = 4;
|
||||
string message = 5;
|
||||
}
|
||||
|
||||
service TaskResultService{
|
||||
rpc tasKResult(stream TaskResult) returns (google.protobuf.Empty);
|
||||
}
|
Loading…
Reference in New Issue