diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/repository/FailedSaveException.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/repository/FailedSaveException.kt new file mode 100644 index 0000000..4cb6830 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/repository/FailedSaveException.kt @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.domain.exception.repository + +class FailedSaveException : RuntimeException { + constructor() : super() + constructor(message: String?) : super(message) + constructor(message: String?, cause: Throwable?) : super(message, cause) + constructor(cause: Throwable?) : super(cause) + constructor(message: String?, cause: Throwable?, enableSuppression: Boolean, writableStackTrace: Boolean) : super( + message, + cause, + enableSuppression, + writableStackTrace + ) +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/repository/RecordNotFoundException.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/repository/RecordNotFoundException.kt new file mode 100644 index 0000000..62921c4 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/repository/RecordNotFoundException.kt @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.domain.exception.repository + +open class RecordNotFoundException : RuntimeException { + constructor() : super() + constructor(message: String?) : super(message) + constructor(message: String?, cause: Throwable?) : super(message, cause) + constructor(cause: Throwable?) : super(cause) + constructor(message: String?, cause: Throwable?, enableSuppression: Boolean, writableStackTrace: Boolean) : super( + message, + cause, + enableSuppression, + writableStackTrace + ) +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/IncompatibleTaskException.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/IncompatibleTaskException.kt new file mode 100644 index 0000000..9f940bc --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/IncompatibleTaskException.kt @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.domain.exception.service + +class IncompatibleTaskException : RuntimeException { + constructor() : super() + constructor(message: String?) : super(message) + constructor(message: String?, cause: Throwable?) : super(message, cause) + constructor(cause: Throwable?) : super(cause) + constructor(message: String?, cause: Throwable?, enableSuppression: Boolean, writableStackTrace: Boolean) : super( + message, + cause, + enableSuppression, + writableStackTrace + ) +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/QueueCannotDequeueException.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/QueueCannotDequeueException.kt new file mode 100644 index 0000000..49b47db --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/QueueCannotDequeueException.kt @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.domain.exception.service + +class QueueCannotDequeueException : RuntimeException { + constructor() : super() + constructor(message: String?) : super(message) + constructor(message: String?, cause: Throwable?) : super(message, cause) + constructor(cause: Throwable?) : super(cause) + constructor(message: String?, cause: Throwable?, enableSuppression: Boolean, writableStackTrace: Boolean) : super( + message, + cause, + enableSuppression, + writableStackTrace + ) +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/RetryPolicyNotFoundException.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/RetryPolicyNotFoundException.kt new file mode 100644 index 0000000..dd6954b --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/RetryPolicyNotFoundException.kt @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.domain.exception.service + +class RetryPolicyNotFoundException : RuntimeException { + constructor() : super() + constructor(message: String?) : super(message) + constructor(message: String?, cause: Throwable?) : super(message, cause) + constructor(cause: Throwable?) : super(cause) + constructor(message: String?, cause: Throwable?, enableSuppression: Boolean, writableStackTrace: Boolean) : super( + message, + cause, + enableSuppression, + writableStackTrace + ) +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/TaskNotRegisterException.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/TaskNotRegisterException.kt new file mode 100644 index 0000000..ca89416 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/TaskNotRegisterException.kt @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.domain.exception.service + +class TaskNotRegisterException : RuntimeException { + constructor() : super() + constructor(message: String?) : super(message) + constructor(message: String?, cause: Throwable?) : super(message, cause) + constructor(cause: Throwable?) : super(cause) + constructor(message: String?, cause: Throwable?, enableSuppression: Boolean, writableStackTrace: Boolean) : super( + message, + cause, + enableSuppression, + writableStackTrace + ) +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/AssignQueuedTaskDecider.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/AssignQueuedTaskDecider.kt index 8e89722..ff12278 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/AssignQueuedTaskDecider.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/AssignQueuedTaskDecider.kt @@ -16,6 +16,7 @@ package dev.usbharu.owl.broker.service +import dev.usbharu.owl.broker.domain.exception.repository.RecordNotFoundException import dev.usbharu.owl.broker.domain.model.consumer.ConsumerRepository import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask import kotlinx.coroutines.flow.Flow @@ -34,7 +35,8 @@ class AssignQueuedTaskDeciderImpl( ) : AssignQueuedTaskDecider { override fun findAssignableQueue(consumerId: UUID, numberOfConcurrent: Int): Flow { return flow { - val consumer = consumerRepository.findById(consumerId) ?: TODO() + val consumer = consumerRepository.findById(consumerId) + ?: throw RecordNotFoundException("Consumer not found. id: $consumerId") emitAll( queueStore.findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority( consumer.tasks, diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueuedTaskAssigner.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueuedTaskAssigner.kt index 40dde20..a809382 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueuedTaskAssigner.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueuedTaskAssigner.kt @@ -16,6 +16,7 @@ package dev.usbharu.owl.broker.service +import dev.usbharu.owl.broker.domain.exception.service.QueueCannotDequeueException import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.collect @@ -65,8 +66,9 @@ class QueuedTaskAssignerImpl( queuedTask.assignedConsumer ) assignedTaskQueue - } catch (e: Exception) { - TODO("Not yet implemented") + } catch (e: QueueCannotDequeueException) { + logger.debug("Failed dequeue queue", e) + return null } } diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/RegisterTaskService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/RegisterTaskService.kt index c3b8c46..32436a1 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/RegisterTaskService.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/RegisterTaskService.kt @@ -16,6 +16,7 @@ package dev.usbharu.owl.broker.service +import dev.usbharu.owl.broker.domain.exception.service.IncompatibleTaskException import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinition import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository import org.koin.core.annotation.Singleton @@ -30,6 +31,14 @@ interface RegisterTaskService { @Singleton class RegisterTaskServiceImpl(private val taskDefinitionRepository: TaskDefinitionRepository) : RegisterTaskService { override suspend fun registerTask(taskDefinition: TaskDefinition) { + val definedTask = taskDefinitionRepository.findByName(taskDefinition.name) + if (definedTask != null) { + logger.debug("Task already defined. name: ${taskDefinition.name}") + if (taskDefinition.propertyDefinitionHash != definedTask.propertyDefinitionHash) { + throw IncompatibleTaskException("Task ${taskDefinition.name} has already been defined, and the parameters are incompatible.") + } + return + } taskDefinitionRepository.save(taskDefinition) logger.info("Register a new task. name: {}",taskDefinition.name) diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/RetryPolicyFactory.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/RetryPolicyFactory.kt index 3b4d851..58df5d6 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/RetryPolicyFactory.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/RetryPolicyFactory.kt @@ -16,15 +16,25 @@ package dev.usbharu.owl.broker.service -import dev.usbharu.owl.common.retry.ExponentialRetryPolicy +import dev.usbharu.owl.broker.domain.exception.service.RetryPolicyNotFoundException import dev.usbharu.owl.common.retry.RetryPolicy +import org.slf4j.LoggerFactory interface RetryPolicyFactory { fun factory(name: String): RetryPolicy } -class DefaultRetryPolicyFactory(private val map: Map) : RetryPolicyFactory { +class DefaultRetryPolicyFactory(private val map: Map) : RetryPolicyFactory { override fun factory(name: String): RetryPolicy { - return map[name]?: ExponentialRetryPolicy() + return map[name] ?: throwException(name) + } + + private fun throwException(name: String): Nothing { + logger.warn("RetryPolicy not found. name: {}", name) + throw RetryPolicyNotFoundException("RetryPolicy not found. name: $name") + } + + companion object { + private val logger = LoggerFactory.getLogger(RetryPolicyFactory::class.java) } } \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt index d575124..6b3af78 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt @@ -16,6 +16,7 @@ package dev.usbharu.owl.broker.service +import dev.usbharu.owl.broker.domain.exception.service.TaskNotRegisterException import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask import dev.usbharu.owl.broker.domain.model.task.Task import dev.usbharu.owl.broker.domain.model.task.TaskRepository @@ -46,13 +47,13 @@ class TaskManagementServiceImpl( private val taskRepository: TaskRepository ) : TaskManagementService { - private var flow:Flow = flowOf() + private var flow: Flow = flowOf() override suspend fun startManagement() { flow = taskScanner.startScan() - flow.onEach { - enqueueTask(it) - }.collect() + flow.onEach { + enqueueTask(it) + }.collect() } @@ -61,7 +62,7 @@ class TaskManagementServiceImpl( return assignQueuedTaskDecider.findAssignableQueue(consumerId, numberOfConcurrent) } - private suspend fun enqueueTask(task: Task):QueuedTask{ + private suspend fun enqueueTask(task: Task): QueuedTask { val queuedTask = QueuedTask( task.attempt + 1, @@ -71,19 +72,21 @@ class TaskManagementServiceImpl( null ) + val definedTask = taskDefinitionRepository.findByName(task.name) + ?: throw TaskNotRegisterException("Task ${task.name} not definition.") val copy = task.copy( - nextRetry = retryPolicyFactory.factory(taskDefinitionRepository.findByName(task.name)?.retryPolicy.orEmpty()) + nextRetry = retryPolicyFactory.factory(definedTask.retryPolicy) .nextRetry(Instant.now(), task.attempt) ) taskRepository.save(copy) queueStore.enqueue(queuedTask) - logger.debug("Enqueue Task. {} {}", task.name, task.id) + logger.debug("Enqueue Task. name: {} id: {} attempt: {}", task.name, task.id, queuedTask.attempt) return queuedTask } - companion object{ + companion object { private val logger = LoggerFactory.getLogger(TaskManagementServiceImpl::class.java) } } \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskPublishService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskPublishService.kt index 288bc16..a1812c3 100644 --- a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskPublishService.kt +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskPublishService.kt @@ -16,6 +16,7 @@ package dev.usbharu.owl.broker.service +import dev.usbharu.owl.broker.domain.exception.service.TaskNotRegisterException import dev.usbharu.owl.broker.domain.model.task.Task import dev.usbharu.owl.broker.domain.model.task.TaskRepository import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository @@ -49,7 +50,8 @@ class TaskPublishServiceImpl( override suspend fun publishTask(publishTask: PublishTask): PublishedTask { val id = UUID.randomUUID() - val definition = taskDefinitionRepository.findByName(publishTask.name) ?: TODO() + val definition = taskDefinitionRepository.findByName(publishTask.name) + ?: throw TaskNotRegisterException("Task ${publishTask.name} not definition.") val published = Instant.now() val nextRetry = retryPolicyFactory.factory(definition.name).nextRetry(published,0)