feat: キューがタイムアウトしたらそれを保存するように

This commit is contained in:
usbharu 2024-03-06 21:17:29 +09:00
parent 1e57f1ef4b
commit dcc1f582a9
Signed by: usbharu
GPG Key ID: 8CB1087135660B8D
14 changed files with 88 additions and 39 deletions

View File

@ -62,11 +62,13 @@ class MongodbQueuedTaskRepository(
val findOneAndUpdate = collection.findOneAndUpdate(
and(
eq("_id", id.toString()),
eq(QueuedTaskMongodb::assignedConsumer.name, null)
eq(QueuedTaskMongodb::isActive.name, true)
),
listOf(
set(QueuedTaskMongodb::assignedConsumer.name, update.assignedConsumer),
set(QueuedTaskMongodb::assignedAt.name, update.assignedAt)
set(QueuedTaskMongodb::assignedAt.name, update.assignedAt),
set(QueuedTaskMongodb::queuedAt.name, update.queuedAt),
set(QueuedTaskMongodb::isActive.name, update.isActive)
),
FindOneAndUpdateOptions().upsert(false).returnDocument(ReturnDocument.AFTER)
)
@ -77,20 +79,25 @@ class MongodbQueuedTaskRepository(
}
}
override fun findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority(
override fun findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(
tasks: List<String>,
limit: Int
): Flow<QueuedTask> {
return collection.find<QueuedTaskMongodb>(
and(
`in`("task.name", tasks),
eq(QueuedTaskMongodb::assignedConsumer.name, null)
eq(QueuedTaskMongodb::isActive.name, true)
)
).map { it.toQueuedTask(propertySerializerFactory) }.flowOn(Dispatchers.IO)
}
override fun findByQueuedAtBeforeAndAssignedConsumerIsNull(instant: Instant): Flow<QueuedTask> {
return collection.find(lte(QueuedTaskMongodb::queuedAt.name, instant))
override fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow<QueuedTask> {
return collection.find(
and(
lte(QueuedTaskMongodb::queuedAt.name, instant),
eq(QueuedTaskMongodb::isActive.name, true)
)
)
.map { it.toQueuedTask(propertySerializerFactory) }.flowOn(Dispatchers.IO)
}
}
@ -102,6 +109,8 @@ data class QueuedTaskMongodb(
val task: TaskMongodb,
val attempt: Int,
val queuedAt: Instant,
val isActive: Boolean,
val timeoutAt: Instant?,
val assignedConsumer: String?,
val assignedAt: Instant?
) {
@ -111,6 +120,8 @@ data class QueuedTaskMongodb(
attempt,
queuedAt,
task.toTask(propertySerializerFactory),
isActive,
timeoutAt,
assignedConsumer?.let { UUID.fromString(it) },
assignedAt
)
@ -155,6 +166,7 @@ data class QueuedTaskMongodb(
}
}
}
companion object {
fun of(propertySerializerFactory: PropertySerializerFactory, queuedTask: QueuedTask): QueuedTaskMongodb {
return QueuedTaskMongodb(
@ -162,6 +174,8 @@ data class QueuedTaskMongodb(
TaskMongodb.of(propertySerializerFactory, queuedTask.task),
queuedTask.attempt,
queuedTask.queuedAt,
queuedTask.isActive,
queuedTask.timeoutAt,
queuedTask.assignedConsumer?.toString(),
queuedTask.assignedAt
)

View File

@ -27,6 +27,7 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.singleOrNull
import kotlinx.coroutines.withContext
import org.bson.BsonType
import org.bson.codecs.pojo.annotations.BsonId
@ -52,6 +53,10 @@ class MongodbTaskRepository(database: MongoDatabase, private val propertySeriali
return collection.find(Filters.lte(TaskMongodb::nextRetry.name, timestamp))
.map { it.toTask(propertySerializerFactory) }.flowOn(Dispatchers.IO)
}
override suspend fun findById(uuid: UUID): Task? = withContext(Dispatchers.IO) {
collection.find(Filters.eq(uuid.toString())).singleOrNull()?.toTask(propertySerializerFactory)
}
}
data class TaskMongodb(

View File

@ -22,11 +22,14 @@ import java.util.*
/**
* @param attempt キューされた時点での試行回数より1多い
* @param isActive trueならアサイン可能 falseならアサイン済みかタイムアウト等で無効
*/
data class QueuedTask(
val attempt: Int,
val queuedAt: Instant,
val task: Task,
val isActive: Boolean,
val timeoutAt: Instant?,
val assignedConsumer: UUID?,
val assignedAt:Instant?
val assignedAt: Instant?
)

View File

@ -28,7 +28,7 @@ interface QueuedTaskRepository {
*/
suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id:UUID,update:QueuedTask):QueuedTask
fun findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority(tasks:List<String>,limit:Int): Flow<QueuedTask>
fun findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(tasks: List<String>, limit: Int): Flow<QueuedTask>
fun findByQueuedAtBeforeAndAssignedConsumerIsNull(instant: Instant): Flow<QueuedTask>
fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow<QueuedTask>
}

View File

@ -18,9 +18,12 @@ package dev.usbharu.owl.broker.domain.model.task
import kotlinx.coroutines.flow.Flow
import java.time.Instant
import java.util.*
interface TaskRepository {
suspend fun save(task: Task):Task
fun findByNextRetryBefore(timestamp:Instant): Flow<Task>
suspend fun findById(uuid: UUID): Task?
}

View File

@ -55,7 +55,7 @@ class TaskPublishService(
)
)
PublishedTask.newBuilder().setName(publishedTask.name).setId(publishedTask.id.toUUID()).build()
}catch (e:Error){
} catch (e: Throwable) {
logger.warn("exception ",e)
throw StatusException(Status.INTERNAL)
}

View File

@ -38,7 +38,7 @@ class AssignQueuedTaskDeciderImpl(
val consumer = consumerRepository.findById(consumerId)
?: throw RecordNotFoundException("Consumer not found. id: $consumerId")
emitAll(
queueStore.findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority(
queueStore.findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(
consumer.tasks,
numberOfConcurrent
).take(numberOfConcurrent)

View File

@ -42,7 +42,7 @@ class QueueScannerImpl(private val queueStore: QueueStore) : QueueScanner {
}
private fun scanQueue(): Flow<QueuedTask> {
return queueStore.findByQueuedAtBeforeAndAssignedConsumerIsNull(Instant.now().minusSeconds(10))
return queueStore.findByQueuedAtBeforeAndIsActiveIsTrue(Instant.now().minusSeconds(10))
}
}

View File

@ -28,9 +28,9 @@ interface QueueStore {
suspend fun dequeue(queuedTask: QueuedTask)
suspend fun dequeueAll(queuedTaskList: List<QueuedTask>)
fun findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority(tasks: List<String>, limit: Int): Flow<QueuedTask>
fun findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(tasks: List<String>, limit: Int): Flow<QueuedTask>
fun findByQueuedAtBeforeAndAssignedConsumerIsNull(instant: Instant): Flow<QueuedTask>
fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow<QueuedTask>
}
@Singleton
@ -51,15 +51,15 @@ class QueueStoreImpl(private val queuedTaskRepository: QueuedTaskRepository) : Q
return queuedTaskList.forEach { dequeue(it) }
}
override fun findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority(
override fun findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(
tasks: List<String>,
limit: Int
): Flow<QueuedTask> {
return queuedTaskRepository.findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority(tasks, limit)
return queuedTaskRepository.findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(tasks, limit)
}
override fun findByQueuedAtBeforeAndAssignedConsumerIsNull(instant: Instant): Flow<QueuedTask> {
return queuedTaskRepository.findByQueuedAtBeforeAndAssignedConsumerIsNull(instant)
override fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow<QueuedTask> {
return queuedTaskRepository.findByQueuedAtBeforeAndIsActiveIsTrue(instant)
}
}

View File

@ -28,33 +28,39 @@ import java.time.Instant
import java.util.*
interface QueuedTaskAssigner {
fun ready(consumerId: UUID,numberOfConcurrent:Int): Flow<QueuedTask>
fun ready(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask>
}
@Singleton
class QueuedTaskAssignerImpl(
private val taskManagementService: TaskManagementService,
private val queueStore: QueueStore
) : QueuedTaskAssigner{
) : QueuedTaskAssigner {
override fun ready(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask> {
return flow {
taskManagementService.findAssignableTask(consumerId, numberOfConcurrent)
.onEach {
val assignTask = assignTask(it, consumerId)
val assignTask = assignTask(it, consumerId)
if (assignTask != null) {
emit(assignTask)
}
if (assignTask != null) {
emit(assignTask)
}
}
.collect()
}
}
private suspend fun assignTask(queuedTask: QueuedTask,consumerId: UUID):QueuedTask?{
private suspend fun assignTask(queuedTask: QueuedTask, consumerId: UUID): QueuedTask? {
return try {
val assignedTaskQueue = queuedTask.copy(assignedConsumer = consumerId, assignedAt = Instant.now())
logger.trace("Try assign task: {} id: {} consumer: {}",queuedTask.task.name,queuedTask.task.id,consumerId)
val assignedTaskQueue =
queuedTask.copy(assignedConsumer = consumerId, assignedAt = Instant.now(), isActive = false)
logger.trace(
"Try assign task: {} id: {} consumer: {}",
queuedTask.task.name,
queuedTask.task.id,
consumerId
)
queueStore.dequeue(assignedTaskQueue)
@ -72,7 +78,7 @@ class QueuedTaskAssignerImpl(
}
}
companion object{
companion object {
private val logger = LoggerFactory.getLogger(QueuedTaskAssignerImpl::class.java)
}
}

View File

@ -16,6 +16,7 @@
package dev.usbharu.owl.broker.service
import dev.usbharu.owl.broker.domain.exception.repository.RecordNotFoundException
import dev.usbharu.owl.broker.domain.exception.service.TaskNotRegisterException
import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask
import dev.usbharu.owl.broker.domain.model.task.Task
@ -67,12 +68,7 @@ class TaskManagementServiceImpl(
},
launch {
queueFlow.onEach {
logger.warn(
"Queue timed out. name: {} id: {} attempt: {}",
it.task.name,
it.task.id,
it.attempt
)
timeoutQueue(it)
}.collect()
}
).joinAll()
@ -90,6 +86,8 @@ class TaskManagementServiceImpl(
task.attempt + 1,
Instant.now(),
task,
isActive = true,
timeoutAt = null,
null,
null
)
@ -98,7 +96,7 @@ class TaskManagementServiceImpl(
?: throw TaskNotRegisterException("Task ${task.name} not definition.")
val copy = task.copy(
nextRetry = retryPolicyFactory.factory(definedTask.retryPolicy)
.nextRetry(Instant.now(), task.attempt)
.nextRetry(Instant.now(), queuedTask.attempt)
)
taskRepository.save(copy)
@ -108,6 +106,26 @@ class TaskManagementServiceImpl(
return queuedTask
}
private suspend fun timeoutQueue(queuedTask: QueuedTask) {
val timeoutQueue = queuedTask.copy(isActive = false, timeoutAt = Instant.now())
queueStore.dequeue(timeoutQueue)
val task = taskRepository.findById(timeoutQueue.task.id)
?: throw RecordNotFoundException("Task not found. id: ${timeoutQueue.task.id}")
val copy = task.copy(attempt = timeoutQueue.attempt)
logger.warn(
"Queue timed out. name: {} id: {} attempt: {}",
timeoutQueue.task.name,
timeoutQueue.task.id,
timeoutQueue.attempt
)
taskRepository.save(copy)
}
companion object {
private val logger = LoggerFactory.getLogger(TaskManagementServiceImpl::class.java)
}

View File

@ -54,7 +54,7 @@ class TaskPublishServiceImpl(
?: throw TaskNotRegisterException("Task ${publishTask.name} not definition.")
val published = Instant.now()
val nextRetry = retryPolicyFactory.factory(definition.name).nextRetry(published,0)
val nextRetry = retryPolicyFactory.factory(definition.retryPolicy).nextRetry(published, 0)
val task = Task(
name = publishTask.name,

View File

@ -6,6 +6,6 @@ import kotlin.math.roundToLong
class ExponentialRetryPolicy(private val firstRetrySeconds: Int = 30) : RetryPolicy {
override fun nextRetry(now: Instant, attempt: Int): Instant =
now.plusSeconds(firstRetrySeconds.times((2.0).pow(attempt).roundToLong()))
now.plusSeconds(firstRetrySeconds.times((2.0).pow(attempt).roundToLong()) - 30)
}

View File

@ -25,12 +25,12 @@ class ExponentialRetryPolicyTest {
fun exponential0() {
val nextRetry = ExponentialRetryPolicy().nextRetry(Instant.ofEpochSecond(300), 0)
assertEquals(Instant.ofEpochSecond(330), nextRetry)
assertEquals(Instant.ofEpochSecond(300), nextRetry)
}
@Test
fun exponential1() {
val nextRetry = ExponentialRetryPolicy().nextRetry(Instant.ofEpochSecond(300), 1)
assertEquals(Instant.ofEpochSecond(360), nextRetry)
assertEquals(Instant.ofEpochSecond(330), nextRetry)
}
}