feat: Producerにタスク完了を通知する方法を追加

This commit is contained in:
usbharu 2024-03-09 13:36:16 +09:00
parent dab52d25a4
commit e9277db29f
Signed by: usbharu
GPG Key ID: 6556747BF94EEBC8
11 changed files with 274 additions and 13 deletions

View File

@ -73,6 +73,19 @@ class MongodbTaskRepository(database: MongoDatabase, private val propertySeriali
override suspend fun findById(uuid: UUID): Task? = withContext(Dispatchers.IO) {
collection.find(Filters.eq(uuid.toString())).singleOrNull()?.toTask(propertySerializerFactory)
}
override suspend fun findByIdAndUpdate(id: UUID, task: Task) {
collection.replaceOne(
Filters.eq("_id", task.id.toString()), TaskMongodb.of(propertySerializerFactory, task),
ReplaceOptions().upsert(false)
)
}
override suspend fun findByPublishProducerIdAndCompletedAtIsNotNull(publishProducerId: UUID): Flow<Task> {
return collection
.find(Filters.eq(TaskMongodb::publishProducerId.name, publishProducerId.toString()))
.map { it.toTask(propertySerializerFactory) }
}
}
data class TaskMongodb(

View File

@ -0,0 +1,93 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.broker.mongodb
import com.mongodb.client.model.Filters
import com.mongodb.client.model.ReplaceOptions
import com.mongodb.kotlin.client.coroutine.MongoDatabase
import dev.usbharu.owl.broker.domain.model.taskresult.TaskResult
import dev.usbharu.owl.broker.domain.model.taskresult.TaskResultRepository
import dev.usbharu.owl.common.property.PropertySerializeUtils
import dev.usbharu.owl.common.property.PropertySerializerFactory
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.withContext
import org.bson.BsonType
import org.bson.codecs.pojo.annotations.BsonId
import org.bson.codecs.pojo.annotations.BsonRepresentation
import org.koin.core.annotation.Singleton
import java.util.*
@Singleton
class MongodbTaskResultRepository(
database: MongoDatabase,
private val propertySerializerFactory: PropertySerializerFactory
) : TaskResultRepository {
private val collection = database.getCollection<TaskResultMongodb>("task_results")
override suspend fun save(taskResult: TaskResult): TaskResult = withContext(Dispatchers.IO) {
collection.replaceOne(
Filters.eq(taskResult.id.toString()), TaskResultMongodb.of(propertySerializerFactory, taskResult),
ReplaceOptions().upsert(true)
)
return@withContext taskResult
}
override fun findByTaskId(id: UUID): Flow<TaskResult> {
return collection.find(Filters.eq(id.toString())).map { it.toTaskResult(propertySerializerFactory) }.flowOn(Dispatchers.IO)
}
}
data class TaskResultMongodb(
@BsonId
@BsonRepresentation(BsonType.STRING)
val id: String,
val taskId: String,
val success: Boolean,
val attempt: Int,
val result: Map<String, String>,
val message: String
) {
fun toTaskResult(propertySerializerFactory: PropertySerializerFactory): TaskResult {
return TaskResult(
UUID.fromString(id),
UUID.fromString(taskId),
success,
attempt,
PropertySerializeUtils.deserialize(propertySerializerFactory, result),
message
)
}
companion object {
fun of(propertySerializerFactory: PropertySerializerFactory, taskResult: TaskResult): TaskResultMongodb {
return TaskResultMongodb(
taskResult.id.toString(),
taskResult.taskId.toString(),
taskResult.success,
taskResult.attempt,
PropertySerializeUtils.serialize(propertySerializerFactory, taskResult.result),
taskResult.message
)
}
}
}

View File

@ -33,7 +33,8 @@ class OwlBrokerApplication(
private val producerService: ProducerService,
private val subscribeTaskService: SubscribeTaskService,
private val taskPublishService: TaskPublishService,
private val taskManagementService: TaskManagementService
private val taskManagementService: TaskManagementService,
private val taskResultSubscribeService: TaskResultSubscribeService
) {
private lateinit var server: Server
@ -45,6 +46,7 @@ class OwlBrokerApplication(
.addService(producerService)
.addService(subscribeTaskService)
.addService(taskPublishService)
.addService(taskResultSubscribeService)
.build()
server.start()

View File

@ -30,4 +30,6 @@ interface TaskRepository {
suspend fun findById(uuid: UUID): Task?
suspend fun findByIdAndUpdate(id:UUID,task: Task)
suspend fun findByPublishProducerIdAndCompletedAtIsNotNull(publishProducerId:UUID):Flow<Task>
}

View File

@ -21,6 +21,7 @@ import java.util.*
data class TaskResult(
val id: UUID,
val taskId:UUID,
val success: Boolean,
val attempt: Int,
val result: Map<String, PropertyValue<*>>,

View File

@ -16,6 +16,10 @@
package dev.usbharu.owl.broker.domain.model.taskresult
import kotlinx.coroutines.flow.Flow
import java.util.*
interface TaskResultRepository {
suspend fun save(taskResult: TaskResult):TaskResult
fun findByTaskId(id:UUID): Flow<TaskResult>
}

View File

@ -27,6 +27,7 @@ import dev.usbharu.owl.common.property.PropertySerializerFactory
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import java.util.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
@ -40,7 +41,8 @@ class TaskResultService(
requests.onEach {
taskManagementService.queueProcessed(
TaskResult(
id = it.id.toUUID(),
id = UUID.randomUUID(),
taskId = it.id.toUUID(),
success = it.success,
attempt = it.attempt,
result = PropertySerializeUtils.deserialize(propertySerializerFactory, it.resultMap),

View File

@ -0,0 +1,62 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.broker.interfaces.grpc
import TaskResultProducer.TaskResults
import TaskResultSubscribeServiceGrpcKt
import dev.usbharu.owl.Uuid
import dev.usbharu.owl.broker.external.toUUID
import dev.usbharu.owl.broker.service.TaskManagementService
import dev.usbharu.owl.common.property.PropertySerializeUtils
import dev.usbharu.owl.common.property.PropertySerializerFactory
import dev.usbharu.owl.taskResult
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import org.koin.core.annotation.Singleton
import taskResults
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
@Singleton
class TaskResultSubscribeService(
private val taskManagementService: TaskManagementService,
private val propertySerializerFactory: PropertySerializerFactory,
coroutineContext: CoroutineContext = EmptyCoroutineContext
) :
TaskResultSubscribeServiceGrpcKt.TaskResultSubscribeServiceCoroutineImplBase(coroutineContext) {
override fun subscribe(request: Uuid.UUID): Flow<TaskResults> {
return taskManagementService
.subscribeResult(request.toUUID())
.map {
taskResults {
id = it.id.toUUID()
name = it.name
attempt = it.attempt
success = it.success
results.addAll(it.results.map {
taskResult {
id = it.taskId.toUUID()
success = it.success
attempt = it.attempt
result.putAll(PropertySerializeUtils.serialize(propertySerializerFactory, it.result))
message = it.message
}
})
}
}
}
}

View File

@ -23,14 +23,9 @@ import dev.usbharu.owl.broker.domain.model.task.Task
import dev.usbharu.owl.broker.domain.model.task.TaskRepository
import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository
import dev.usbharu.owl.broker.domain.model.taskresult.TaskResult
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import dev.usbharu.owl.broker.domain.model.taskresult.TaskResultRepository
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.koin.core.annotation.Singleton
import org.slf4j.LoggerFactory
import java.time.Instant
@ -43,6 +38,8 @@ interface TaskManagementService {
fun findAssignableTask(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask>
suspend fun queueProcessed(taskResult: TaskResult)
fun subscribeResult(producerId: UUID): Flow<TaskResults>
}
@Singleton
@ -53,7 +50,8 @@ class TaskManagementServiceImpl(
private val assignQueuedTaskDecider: AssignQueuedTaskDecider,
private val retryPolicyFactory: RetryPolicyFactory,
private val taskRepository: TaskRepository,
private val queueScanner: QueueScanner
private val queueScanner: QueueScanner,
private val taskResultRepository: TaskResultRepository
) : TaskManagementService {
private var taskFlow: Flow<Task> = flowOf()
@ -132,8 +130,49 @@ class TaskManagementServiceImpl(
val task = taskRepository.findById(taskResult.id)
?: throw RecordNotFoundException("Task not found. id: ${taskResult.id}")
taskRepository.findByIdAndUpdate(taskResult.id,task.copy(completedAt = Instant.now()))
//todo タスク完了後の処理を書く
val taskDefinition = taskDefinitionRepository.findByName(task.name)
?: throw TaskNotRegisterException("Task ${task.name} not definition.")
val completedAt = if (taskResult.success) {
Instant.now()
} else if (taskResult.attempt >= taskDefinition.maxRetry) {
Instant.now()
} else {
null
}
taskResultRepository.save(taskResult)
taskRepository.findByIdAndUpdate(
taskResult.id,
task.copy(completedAt = completedAt, attempt = taskResult.attempt)
)
}
override fun subscribeResult(producerId: UUID): Flow<TaskResults> {
return flow {
while (currentCoroutineContext().isActive) {
taskRepository
.findByPublishProducerIdAndCompletedAtIsNotNull(producerId)
.onEach {
val results = taskResultRepository.findByTaskId(it.id).toList()
emit(
TaskResults(
it.name,
it.id,
results.any { it.success },
it.attempt,
results
)
)
}
delay(500)
}
}
}
companion object {

View File

@ -0,0 +1,28 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.broker.service
import dev.usbharu.owl.broker.domain.model.taskresult.TaskResult
import java.util.*
data class TaskResults(
val name:String,
val id:UUID,
val success:Boolean,
val attempt:Int,
val results: List<TaskResult>
)

View File

@ -0,0 +1,15 @@
syntax = "proto3";
import "uuid.proto";
import "task_result.proto";
message TaskResults {
string name = 1;
UUID id = 2;
bool success = 3;
int32 attempt = 4;
repeated TaskResult results = 5;
}
service TaskResultSubscribeService {
rpc subscribe(UUID) returns (stream TaskResults);
}