feat: Producerにタスク完了を通知する方法を追加

This commit is contained in:
usbharu 2024-03-09 13:36:16 +09:00
parent 159dd943ec
commit fbafee4e5e
11 changed files with 274 additions and 13 deletions

View File

@ -73,6 +73,19 @@ class MongodbTaskRepository(database: MongoDatabase, private val propertySeriali
override suspend fun findById(uuid: UUID): Task? = withContext(Dispatchers.IO) { override suspend fun findById(uuid: UUID): Task? = withContext(Dispatchers.IO) {
collection.find(Filters.eq(uuid.toString())).singleOrNull()?.toTask(propertySerializerFactory) collection.find(Filters.eq(uuid.toString())).singleOrNull()?.toTask(propertySerializerFactory)
} }
override suspend fun findByIdAndUpdate(id: UUID, task: Task) {
collection.replaceOne(
Filters.eq("_id", task.id.toString()), TaskMongodb.of(propertySerializerFactory, task),
ReplaceOptions().upsert(false)
)
}
override suspend fun findByPublishProducerIdAndCompletedAtIsNotNull(publishProducerId: UUID): Flow<Task> {
return collection
.find(Filters.eq(TaskMongodb::publishProducerId.name, publishProducerId.toString()))
.map { it.toTask(propertySerializerFactory) }
}
} }
data class TaskMongodb( data class TaskMongodb(

View File

@ -0,0 +1,93 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.broker.mongodb
import com.mongodb.client.model.Filters
import com.mongodb.client.model.ReplaceOptions
import com.mongodb.kotlin.client.coroutine.MongoDatabase
import dev.usbharu.owl.broker.domain.model.taskresult.TaskResult
import dev.usbharu.owl.broker.domain.model.taskresult.TaskResultRepository
import dev.usbharu.owl.common.property.PropertySerializeUtils
import dev.usbharu.owl.common.property.PropertySerializerFactory
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.withContext
import org.bson.BsonType
import org.bson.codecs.pojo.annotations.BsonId
import org.bson.codecs.pojo.annotations.BsonRepresentation
import org.koin.core.annotation.Singleton
import java.util.*
@Singleton
class MongodbTaskResultRepository(
database: MongoDatabase,
private val propertySerializerFactory: PropertySerializerFactory
) : TaskResultRepository {
private val collection = database.getCollection<TaskResultMongodb>("task_results")
override suspend fun save(taskResult: TaskResult): TaskResult = withContext(Dispatchers.IO) {
collection.replaceOne(
Filters.eq(taskResult.id.toString()), TaskResultMongodb.of(propertySerializerFactory, taskResult),
ReplaceOptions().upsert(true)
)
return@withContext taskResult
}
override fun findByTaskId(id: UUID): Flow<TaskResult> {
return collection.find(Filters.eq(id.toString())).map { it.toTaskResult(propertySerializerFactory) }.flowOn(Dispatchers.IO)
}
}
data class TaskResultMongodb(
@BsonId
@BsonRepresentation(BsonType.STRING)
val id: String,
val taskId: String,
val success: Boolean,
val attempt: Int,
val result: Map<String, String>,
val message: String
) {
fun toTaskResult(propertySerializerFactory: PropertySerializerFactory): TaskResult {
return TaskResult(
UUID.fromString(id),
UUID.fromString(taskId),
success,
attempt,
PropertySerializeUtils.deserialize(propertySerializerFactory, result),
message
)
}
companion object {
fun of(propertySerializerFactory: PropertySerializerFactory, taskResult: TaskResult): TaskResultMongodb {
return TaskResultMongodb(
taskResult.id.toString(),
taskResult.taskId.toString(),
taskResult.success,
taskResult.attempt,
PropertySerializeUtils.serialize(propertySerializerFactory, taskResult.result),
taskResult.message
)
}
}
}

View File

@ -33,7 +33,8 @@ class OwlBrokerApplication(
private val producerService: ProducerService, private val producerService: ProducerService,
private val subscribeTaskService: SubscribeTaskService, private val subscribeTaskService: SubscribeTaskService,
private val taskPublishService: TaskPublishService, private val taskPublishService: TaskPublishService,
private val taskManagementService: TaskManagementService private val taskManagementService: TaskManagementService,
private val taskResultSubscribeService: TaskResultSubscribeService
) { ) {
private lateinit var server: Server private lateinit var server: Server
@ -45,6 +46,7 @@ class OwlBrokerApplication(
.addService(producerService) .addService(producerService)
.addService(subscribeTaskService) .addService(subscribeTaskService)
.addService(taskPublishService) .addService(taskPublishService)
.addService(taskResultSubscribeService)
.build() .build()
server.start() server.start()

View File

@ -30,4 +30,6 @@ interface TaskRepository {
suspend fun findById(uuid: UUID): Task? suspend fun findById(uuid: UUID): Task?
suspend fun findByIdAndUpdate(id:UUID,task: Task) suspend fun findByIdAndUpdate(id:UUID,task: Task)
suspend fun findByPublishProducerIdAndCompletedAtIsNotNull(publishProducerId:UUID):Flow<Task>
} }

View File

@ -21,6 +21,7 @@ import java.util.*
data class TaskResult( data class TaskResult(
val id: UUID, val id: UUID,
val taskId:UUID,
val success: Boolean, val success: Boolean,
val attempt: Int, val attempt: Int,
val result: Map<String, PropertyValue<*>>, val result: Map<String, PropertyValue<*>>,

View File

@ -16,6 +16,10 @@
package dev.usbharu.owl.broker.domain.model.taskresult package dev.usbharu.owl.broker.domain.model.taskresult
import kotlinx.coroutines.flow.Flow
import java.util.*
interface TaskResultRepository { interface TaskResultRepository {
suspend fun save(taskResult: TaskResult):TaskResult suspend fun save(taskResult: TaskResult):TaskResult
fun findByTaskId(id:UUID): Flow<TaskResult>
} }

View File

@ -27,6 +27,7 @@ import dev.usbharu.owl.common.property.PropertySerializerFactory
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import java.util.*
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.EmptyCoroutineContext
@ -40,7 +41,8 @@ class TaskResultService(
requests.onEach { requests.onEach {
taskManagementService.queueProcessed( taskManagementService.queueProcessed(
TaskResult( TaskResult(
id = it.id.toUUID(), id = UUID.randomUUID(),
taskId = it.id.toUUID(),
success = it.success, success = it.success,
attempt = it.attempt, attempt = it.attempt,
result = PropertySerializeUtils.deserialize(propertySerializerFactory, it.resultMap), result = PropertySerializeUtils.deserialize(propertySerializerFactory, it.resultMap),

View File

@ -0,0 +1,62 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.broker.interfaces.grpc
import TaskResultProducer.TaskResults
import TaskResultSubscribeServiceGrpcKt
import dev.usbharu.owl.Uuid
import dev.usbharu.owl.broker.external.toUUID
import dev.usbharu.owl.broker.service.TaskManagementService
import dev.usbharu.owl.common.property.PropertySerializeUtils
import dev.usbharu.owl.common.property.PropertySerializerFactory
import dev.usbharu.owl.taskResult
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import org.koin.core.annotation.Singleton
import taskResults
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
@Singleton
class TaskResultSubscribeService(
private val taskManagementService: TaskManagementService,
private val propertySerializerFactory: PropertySerializerFactory,
coroutineContext: CoroutineContext = EmptyCoroutineContext
) :
TaskResultSubscribeServiceGrpcKt.TaskResultSubscribeServiceCoroutineImplBase(coroutineContext) {
override fun subscribe(request: Uuid.UUID): Flow<TaskResults> {
return taskManagementService
.subscribeResult(request.toUUID())
.map {
taskResults {
id = it.id.toUUID()
name = it.name
attempt = it.attempt
success = it.success
results.addAll(it.results.map {
taskResult {
id = it.taskId.toUUID()
success = it.success
attempt = it.attempt
result.putAll(PropertySerializeUtils.serialize(propertySerializerFactory, it.result))
message = it.message
}
})
}
}
}
}

View File

@ -23,14 +23,9 @@ import dev.usbharu.owl.broker.domain.model.task.Task
import dev.usbharu.owl.broker.domain.model.task.TaskRepository import dev.usbharu.owl.broker.domain.model.task.TaskRepository
import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository
import dev.usbharu.owl.broker.domain.model.taskresult.TaskResult import dev.usbharu.owl.broker.domain.model.taskresult.TaskResult
import kotlinx.coroutines.CoroutineScope import dev.usbharu.owl.broker.domain.model.taskresult.TaskResultRepository
import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import org.koin.core.annotation.Singleton import org.koin.core.annotation.Singleton
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import java.time.Instant import java.time.Instant
@ -43,6 +38,8 @@ interface TaskManagementService {
fun findAssignableTask(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask> fun findAssignableTask(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask>
suspend fun queueProcessed(taskResult: TaskResult) suspend fun queueProcessed(taskResult: TaskResult)
fun subscribeResult(producerId: UUID): Flow<TaskResults>
} }
@Singleton @Singleton
@ -53,7 +50,8 @@ class TaskManagementServiceImpl(
private val assignQueuedTaskDecider: AssignQueuedTaskDecider, private val assignQueuedTaskDecider: AssignQueuedTaskDecider,
private val retryPolicyFactory: RetryPolicyFactory, private val retryPolicyFactory: RetryPolicyFactory,
private val taskRepository: TaskRepository, private val taskRepository: TaskRepository,
private val queueScanner: QueueScanner private val queueScanner: QueueScanner,
private val taskResultRepository: TaskResultRepository
) : TaskManagementService { ) : TaskManagementService {
private var taskFlow: Flow<Task> = flowOf() private var taskFlow: Flow<Task> = flowOf()
@ -132,8 +130,49 @@ class TaskManagementServiceImpl(
val task = taskRepository.findById(taskResult.id) val task = taskRepository.findById(taskResult.id)
?: throw RecordNotFoundException("Task not found. id: ${taskResult.id}") ?: throw RecordNotFoundException("Task not found. id: ${taskResult.id}")
taskRepository.findByIdAndUpdate(taskResult.id,task.copy(completedAt = Instant.now())) val taskDefinition = taskDefinitionRepository.findByName(task.name)
//todo タスク完了後の処理を書く ?: throw TaskNotRegisterException("Task ${task.name} not definition.")
val completedAt = if (taskResult.success) {
Instant.now()
} else if (taskResult.attempt >= taskDefinition.maxRetry) {
Instant.now()
} else {
null
}
taskResultRepository.save(taskResult)
taskRepository.findByIdAndUpdate(
taskResult.id,
task.copy(completedAt = completedAt, attempt = taskResult.attempt)
)
}
override fun subscribeResult(producerId: UUID): Flow<TaskResults> {
return flow {
while (currentCoroutineContext().isActive) {
taskRepository
.findByPublishProducerIdAndCompletedAtIsNotNull(producerId)
.onEach {
val results = taskResultRepository.findByTaskId(it.id).toList()
emit(
TaskResults(
it.name,
it.id,
results.any { it.success },
it.attempt,
results
)
)
}
delay(500)
}
}
} }
companion object { companion object {

View File

@ -0,0 +1,28 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.broker.service
import dev.usbharu.owl.broker.domain.model.taskresult.TaskResult
import java.util.*
data class TaskResults(
val name:String,
val id:UUID,
val success:Boolean,
val attempt:Int,
val results: List<TaskResult>
)

View File

@ -0,0 +1,15 @@
syntax = "proto3";
import "uuid.proto";
import "task_result.proto";
message TaskResults {
string name = 1;
UUID id = 2;
bool success = 3;
int32 attempt = 4;
repeated TaskResult results = 5;
}
service TaskResultSubscribeService {
rpc subscribe(UUID) returns (stream TaskResults);
}