feat: キューのタイムアウトを監視するように

This commit is contained in:
usbharu 2024-03-06 17:44:18 +09:00
parent 43f9bfbc61
commit 154e5efd57
Signed by: usbharu
GPG Key ID: 8CB1087135660B8D
9 changed files with 152 additions and 29 deletions

View File

@ -16,6 +16,7 @@
package dev.usbharu.owl.broker.mongodb package dev.usbharu.owl.broker.mongodb
import com.mongodb.client.model.Filters
import com.mongodb.client.model.Filters.* import com.mongodb.client.model.Filters.*
import com.mongodb.client.model.FindOneAndUpdateOptions import com.mongodb.client.model.FindOneAndUpdateOptions
import com.mongodb.client.model.ReplaceOptions import com.mongodb.client.model.ReplaceOptions
@ -24,6 +25,8 @@ import com.mongodb.client.model.Updates.set
import com.mongodb.kotlin.client.coroutine.MongoDatabase import com.mongodb.kotlin.client.coroutine.MongoDatabase
import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask
import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTaskRepository import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTaskRepository
import dev.usbharu.owl.broker.domain.model.task.Task
import dev.usbharu.owl.common.property.PropertySerializeUtils
import dev.usbharu.owl.common.property.PropertySerializerFactory import dev.usbharu.owl.common.property.PropertySerializerFactory
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
@ -35,12 +38,15 @@ import java.time.Instant
import java.util.* import java.util.*
@Singleton @Singleton
class MongodbQueuedTaskRepository(private val propertySerializerFactory: PropertySerializerFactory,database: MongoDatabase) : QueuedTaskRepository { class MongodbQueuedTaskRepository(
private val propertySerializerFactory: PropertySerializerFactory,
database: MongoDatabase
) : QueuedTaskRepository {
private val collection = database.getCollection<QueuedTaskMongodb>("queued_task") private val collection = database.getCollection<QueuedTaskMongodb>("queued_task")
override suspend fun save(queuedTask: QueuedTask): QueuedTask { override suspend fun save(queuedTask: QueuedTask): QueuedTask {
collection.replaceOne( collection.replaceOne(
eq("_id", queuedTask.task.id.toString()), QueuedTaskMongodb.of(propertySerializerFactory,queuedTask), eq("_id", queuedTask.task.id.toString()), QueuedTaskMongodb.of(propertySerializerFactory, queuedTask),
ReplaceOptions().upsert(true) ReplaceOptions().upsert(true)
) )
return queuedTask return queuedTask
@ -75,6 +81,11 @@ class MongodbQueuedTaskRepository(private val propertySerializerFactory: Propert
) )
).map { it.toQueuedTask(propertySerializerFactory) } ).map { it.toQueuedTask(propertySerializerFactory) }
} }
override fun findByQueuedAtBeforeAndAssignedConsumerIsNull(instant: Instant): Flow<QueuedTask> {
return collection.find(Filters.lte(QueuedTaskMongodb::queuedAt.name, instant))
.map { it.toQueuedTask(propertySerializerFactory) }
}
} }
data class QueuedTaskMongodb( data class QueuedTaskMongodb(
@ -93,16 +104,55 @@ data class QueuedTaskMongodb(
attempt, attempt,
queuedAt, queuedAt,
task.toTask(propertySerializerFactory), task.toTask(propertySerializerFactory),
UUID.fromString(assignedConsumer), assignedConsumer?.let { UUID.fromString(it) },
assignedAt assignedAt
) )
} }
data class TaskMongodb(
val name: String,
val id: String,
val publishProducerId: String,
val publishedAt: Instant,
val nextRetry: Instant,
val completedAt: Instant?,
val attempt: Int,
val properties: Map<String, String>
) {
fun toTask(propertySerializerFactory: PropertySerializerFactory): Task {
return Task(
name = name,
id = UUID.fromString(id),
publishProducerId = UUID.fromString(publishProducerId),
publishedAt = publishedAt,
nextRetry = nextRetry,
completedAt = completedAt,
attempt = attempt,
properties = PropertySerializeUtils.deserialize(propertySerializerFactory, properties)
)
}
companion object {
fun of(propertySerializerFactory: PropertySerializerFactory, task: Task): TaskMongodb {
return TaskMongodb(
task.name,
task.id.toString(),
task.publishProducerId.toString(),
task.publishedAt,
task.nextRetry,
task.completedAt,
task.attempt,
PropertySerializeUtils.serialize(propertySerializerFactory, task.properties)
)
}
}
}
companion object { companion object {
fun of(propertySerializerFactory: PropertySerializerFactory,queuedTask: QueuedTask): QueuedTaskMongodb { fun of(propertySerializerFactory: PropertySerializerFactory, queuedTask: QueuedTask): QueuedTaskMongodb {
return QueuedTaskMongodb( return QueuedTaskMongodb(
queuedTask.task.id.toString(), queuedTask.task.id.toString(),
TaskMongodb.of(propertySerializerFactory,queuedTask.task), TaskMongodb.of(propertySerializerFactory, queuedTask.task),
queuedTask.attempt, queuedTask.attempt,
queuedTask.queuedAt, queuedTask.queuedAt,
queuedTask.assignedConsumer?.toString(), queuedTask.assignedConsumer?.toString(),

View File

@ -25,6 +25,9 @@ import dev.usbharu.owl.common.property.PropertySerializeUtils
import dev.usbharu.owl.common.property.PropertySerializerFactory import dev.usbharu.owl.common.property.PropertySerializerFactory
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import org.bson.BsonType
import org.bson.codecs.pojo.annotations.BsonId
import org.bson.codecs.pojo.annotations.BsonRepresentation
import org.koin.core.annotation.Singleton import org.koin.core.annotation.Singleton
import java.time.Instant import java.time.Instant
import java.util.* import java.util.*
@ -50,6 +53,8 @@ class MongodbTaskRepository(database: MongoDatabase, private val propertySeriali
data class TaskMongodb( data class TaskMongodb(
val name: String, val name: String,
@BsonId
@BsonRepresentation(BsonType.STRING)
val id: String, val id: String,
val publishProducerId: String, val publishProducerId: String,
val publishedAt: Instant, val publishedAt: Instant,

View File

@ -18,6 +18,7 @@ package dev.usbharu.owl.broker
import dev.usbharu.owl.broker.service.DefaultRetryPolicyFactory import dev.usbharu.owl.broker.service.DefaultRetryPolicyFactory
import dev.usbharu.owl.broker.service.RetryPolicyFactory import dev.usbharu.owl.broker.service.RetryPolicyFactory
import dev.usbharu.owl.common.retry.ExponentialRetryPolicy
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import org.koin.core.context.startKoin import org.koin.core.context.startKoin
import org.koin.dsl.module import org.koin.dsl.module
@ -40,7 +41,7 @@ fun main() {
val module = module { val module = module {
single<RetryPolicyFactory> { single<RetryPolicyFactory> {
DefaultRetryPolicyFactory(emptyMap()) DefaultRetryPolicyFactory(mapOf("" to ExponentialRetryPolicy()))
} }
} }
modules(module, defaultModule, moduleContext.module()) modules(module, defaultModule, moduleContext.module())

View File

@ -55,7 +55,7 @@ class OwlBrokerApplication(
) )
return coroutineScope.launch { return coroutineScope.launch {
taskManagementService.startManagement() taskManagementService.startManagement(this)
} }
} }

View File

@ -17,6 +17,7 @@
package dev.usbharu.owl.broker.domain.model.queuedtask package dev.usbharu.owl.broker.domain.model.queuedtask
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import java.time.Instant
import java.util.* import java.util.*
interface QueuedTaskRepository { interface QueuedTaskRepository {
@ -28,4 +29,6 @@ interface QueuedTaskRepository {
suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id:UUID,update:QueuedTask):QueuedTask suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id:UUID,update:QueuedTask):QueuedTask
fun findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority(tasks:List<String>,limit:Int): Flow<QueuedTask> fun findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority(tasks:List<String>,limit:Int): Flow<QueuedTask>
fun findByQueuedAtBeforeAndAssignedConsumerIsNull(instant: Instant): Flow<QueuedTask>
} }

View File

@ -0,0 +1,48 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.broker.service
import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.isActive
import org.koin.core.annotation.Singleton
import java.time.Instant
interface QueueScanner {
fun startScan(): Flow<QueuedTask>
}
@Singleton
class QueueScannerImpl(private val queueStore: QueueStore) : QueueScanner {
override fun startScan(): Flow<QueuedTask> {
return flow {
while (currentCoroutineContext().isActive) {
emitAll(scanQueue())
delay(1000)
}
}
}
private fun scanQueue(): Flow<QueuedTask> {
return queueStore.findByQueuedAtBeforeAndAssignedConsumerIsNull(Instant.now().minusSeconds(10))
}
}

View File

@ -20,6 +20,7 @@ import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask
import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTaskRepository import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTaskRepository
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import org.koin.core.annotation.Singleton import org.koin.core.annotation.Singleton
import java.time.Instant
interface QueueStore { interface QueueStore {
suspend fun enqueue(queuedTask: QueuedTask) suspend fun enqueue(queuedTask: QueuedTask)
@ -28,6 +29,8 @@ interface QueueStore {
suspend fun dequeue(queuedTask: QueuedTask) suspend fun dequeue(queuedTask: QueuedTask)
suspend fun dequeueAll(queuedTaskList: List<QueuedTask>) suspend fun dequeueAll(queuedTaskList: List<QueuedTask>)
fun findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority(tasks: List<String>, limit: Int): Flow<QueuedTask> fun findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority(tasks: List<String>, limit: Int): Flow<QueuedTask>
fun findByQueuedAtBeforeAndAssignedConsumerIsNull(instant: Instant): Flow<QueuedTask>
} }
@Singleton @Singleton
@ -55,4 +58,8 @@ class QueueStoreImpl(private val queuedTaskRepository: QueuedTaskRepository) : Q
return queuedTaskRepository.findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority(tasks, limit) return queuedTaskRepository.findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority(tasks, limit)
} }
override fun findByQueuedAtBeforeAndAssignedConsumerIsNull(instant: Instant): Flow<QueuedTask> {
return queuedTaskRepository.findByQueuedAtBeforeAndAssignedConsumerIsNull(instant)
}
} }

View File

@ -21,10 +21,14 @@ import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask
import dev.usbharu.owl.broker.domain.model.task.Task import dev.usbharu.owl.broker.domain.model.task.Task
import dev.usbharu.owl.broker.domain.model.task.TaskRepository import dev.usbharu.owl.broker.domain.model.task.TaskRepository
import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import org.koin.core.annotation.Singleton import org.koin.core.annotation.Singleton
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import java.time.Instant import java.time.Instant
@ -33,7 +37,7 @@ import java.util.*
interface TaskManagementService { interface TaskManagementService {
suspend fun startManagement() suspend fun startManagement(coroutineScope: CoroutineScope)
fun findAssignableTask(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask> fun findAssignableTask(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask>
} }
@ -44,17 +48,35 @@ class TaskManagementServiceImpl(
private val taskDefinitionRepository: TaskDefinitionRepository, private val taskDefinitionRepository: TaskDefinitionRepository,
private val assignQueuedTaskDecider: AssignQueuedTaskDecider, private val assignQueuedTaskDecider: AssignQueuedTaskDecider,
private val retryPolicyFactory: RetryPolicyFactory, private val retryPolicyFactory: RetryPolicyFactory,
private val taskRepository: TaskRepository private val taskRepository: TaskRepository,
private val queueScanner: QueueScanner
) : TaskManagementService { ) : TaskManagementService {
private var flow: Flow<Task> = flowOf() private var taskFlow: Flow<Task> = flowOf()
override suspend fun startManagement() { private var queueFlow: Flow<QueuedTask> = flowOf()
flow = taskScanner.startScan() override suspend fun startManagement(coroutineScope: CoroutineScope) {
taskFlow = taskScanner.startScan()
flow.onEach { queueFlow = queueScanner.startScan()
enqueueTask(it)
}.collect()
coroutineScope {
listOf(
launch {
taskFlow.onEach {
enqueueTask(it)
}.collect()
},
launch {
queueFlow.onEach {
logger.warn(
"Queue timed out. name: {} id: {} attempt: {}",
it.task.name,
it.task.id,
it.attempt
)
}.collect()
}
).joinAll()
}
} }

View File

@ -1,13 +0,0 @@
package dev.usbharu.owl.broker.service
import org.junit.jupiter.api.Test
class TaskManagementServiceImplTest {
@Test
fun findAssignableTask() {
val taskManagementServiceImpl = TaskManagementServiceImpl()
Thread.sleep(10000)
}
}