feat: 例外処理を追加

This commit is contained in:
usbharu 2024-03-05 17:46:36 +09:00
parent 718d3f3157
commit 9182b040fc
12 changed files with 223 additions and 15 deletions

View File

@ -0,0 +1,30 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.broker.domain.exception.repository
class FailedSaveException : RuntimeException {
constructor() : super()
constructor(message: String?) : super(message)
constructor(message: String?, cause: Throwable?) : super(message, cause)
constructor(cause: Throwable?) : super(cause)
constructor(message: String?, cause: Throwable?, enableSuppression: Boolean, writableStackTrace: Boolean) : super(
message,
cause,
enableSuppression,
writableStackTrace
)
}

View File

@ -0,0 +1,30 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.broker.domain.exception.repository
open class RecordNotFoundException : RuntimeException {
constructor() : super()
constructor(message: String?) : super(message)
constructor(message: String?, cause: Throwable?) : super(message, cause)
constructor(cause: Throwable?) : super(cause)
constructor(message: String?, cause: Throwable?, enableSuppression: Boolean, writableStackTrace: Boolean) : super(
message,
cause,
enableSuppression,
writableStackTrace
)
}

View File

@ -0,0 +1,30 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.broker.domain.exception.service
class IncompatibleTaskException : RuntimeException {
constructor() : super()
constructor(message: String?) : super(message)
constructor(message: String?, cause: Throwable?) : super(message, cause)
constructor(cause: Throwable?) : super(cause)
constructor(message: String?, cause: Throwable?, enableSuppression: Boolean, writableStackTrace: Boolean) : super(
message,
cause,
enableSuppression,
writableStackTrace
)
}

View File

@ -0,0 +1,30 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.broker.domain.exception.service
class QueueCannotDequeueException : RuntimeException {
constructor() : super()
constructor(message: String?) : super(message)
constructor(message: String?, cause: Throwable?) : super(message, cause)
constructor(cause: Throwable?) : super(cause)
constructor(message: String?, cause: Throwable?, enableSuppression: Boolean, writableStackTrace: Boolean) : super(
message,
cause,
enableSuppression,
writableStackTrace
)
}

View File

@ -0,0 +1,30 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.broker.domain.exception.service
class RetryPolicyNotFoundException : RuntimeException {
constructor() : super()
constructor(message: String?) : super(message)
constructor(message: String?, cause: Throwable?) : super(message, cause)
constructor(cause: Throwable?) : super(cause)
constructor(message: String?, cause: Throwable?, enableSuppression: Boolean, writableStackTrace: Boolean) : super(
message,
cause,
enableSuppression,
writableStackTrace
)
}

View File

@ -0,0 +1,30 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.broker.domain.exception.service
class TaskNotRegisterException : RuntimeException {
constructor() : super()
constructor(message: String?) : super(message)
constructor(message: String?, cause: Throwable?) : super(message, cause)
constructor(cause: Throwable?) : super(cause)
constructor(message: String?, cause: Throwable?, enableSuppression: Boolean, writableStackTrace: Boolean) : super(
message,
cause,
enableSuppression,
writableStackTrace
)
}

View File

@ -16,6 +16,7 @@
package dev.usbharu.owl.broker.service package dev.usbharu.owl.broker.service
import dev.usbharu.owl.broker.domain.exception.repository.RecordNotFoundException
import dev.usbharu.owl.broker.domain.model.consumer.ConsumerRepository import dev.usbharu.owl.broker.domain.model.consumer.ConsumerRepository
import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
@ -34,7 +35,8 @@ class AssignQueuedTaskDeciderImpl(
) : AssignQueuedTaskDecider { ) : AssignQueuedTaskDecider {
override fun findAssignableQueue(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask> { override fun findAssignableQueue(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask> {
return flow { return flow {
val consumer = consumerRepository.findById(consumerId) ?: TODO() val consumer = consumerRepository.findById(consumerId)
?: throw RecordNotFoundException("Consumer not found. id: $consumerId")
emitAll( emitAll(
queueStore.findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority( queueStore.findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority(
consumer.tasks, consumer.tasks,

View File

@ -16,6 +16,7 @@
package dev.usbharu.owl.broker.service package dev.usbharu.owl.broker.service
import dev.usbharu.owl.broker.domain.exception.service.QueueCannotDequeueException
import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collect
@ -65,8 +66,9 @@ class QueuedTaskAssignerImpl(
queuedTask.assignedConsumer queuedTask.assignedConsumer
) )
assignedTaskQueue assignedTaskQueue
} catch (e: Exception) { } catch (e: QueueCannotDequeueException) {
TODO("Not yet implemented") logger.debug("Failed dequeue queue", e)
return null
} }
} }

View File

@ -16,6 +16,7 @@
package dev.usbharu.owl.broker.service package dev.usbharu.owl.broker.service
import dev.usbharu.owl.broker.domain.exception.service.IncompatibleTaskException
import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinition import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinition
import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository
import org.koin.core.annotation.Singleton import org.koin.core.annotation.Singleton
@ -30,6 +31,14 @@ interface RegisterTaskService {
@Singleton @Singleton
class RegisterTaskServiceImpl(private val taskDefinitionRepository: TaskDefinitionRepository) : RegisterTaskService { class RegisterTaskServiceImpl(private val taskDefinitionRepository: TaskDefinitionRepository) : RegisterTaskService {
override suspend fun registerTask(taskDefinition: TaskDefinition) { override suspend fun registerTask(taskDefinition: TaskDefinition) {
val definedTask = taskDefinitionRepository.findByName(taskDefinition.name)
if (definedTask != null) {
logger.debug("Task already defined. name: ${taskDefinition.name}")
if (taskDefinition.propertyDefinitionHash != definedTask.propertyDefinitionHash) {
throw IncompatibleTaskException("Task ${taskDefinition.name} has already been defined, and the parameters are incompatible.")
}
return
}
taskDefinitionRepository.save(taskDefinition) taskDefinitionRepository.save(taskDefinition)
logger.info("Register a new task. name: {}",taskDefinition.name) logger.info("Register a new task. name: {}",taskDefinition.name)

View File

@ -16,15 +16,25 @@
package dev.usbharu.owl.broker.service package dev.usbharu.owl.broker.service
import dev.usbharu.owl.common.retry.ExponentialRetryPolicy import dev.usbharu.owl.broker.domain.exception.service.RetryPolicyNotFoundException
import dev.usbharu.owl.common.retry.RetryPolicy import dev.usbharu.owl.common.retry.RetryPolicy
import org.slf4j.LoggerFactory
interface RetryPolicyFactory { interface RetryPolicyFactory {
fun factory(name: String): RetryPolicy fun factory(name: String): RetryPolicy
} }
class DefaultRetryPolicyFactory(private val map: Map<String,RetryPolicy>) : RetryPolicyFactory { class DefaultRetryPolicyFactory(private val map: Map<String, RetryPolicy>) : RetryPolicyFactory {
override fun factory(name: String): RetryPolicy { override fun factory(name: String): RetryPolicy {
return map[name]?: ExponentialRetryPolicy() return map[name] ?: throwException(name)
}
private fun throwException(name: String): Nothing {
logger.warn("RetryPolicy not found. name: {}", name)
throw RetryPolicyNotFoundException("RetryPolicy not found. name: $name")
}
companion object {
private val logger = LoggerFactory.getLogger(RetryPolicyFactory::class.java)
} }
} }

View File

@ -16,6 +16,7 @@
package dev.usbharu.owl.broker.service package dev.usbharu.owl.broker.service
import dev.usbharu.owl.broker.domain.exception.service.TaskNotRegisterException
import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask
import dev.usbharu.owl.broker.domain.model.task.Task import dev.usbharu.owl.broker.domain.model.task.Task
import dev.usbharu.owl.broker.domain.model.task.TaskRepository import dev.usbharu.owl.broker.domain.model.task.TaskRepository
@ -46,13 +47,13 @@ class TaskManagementServiceImpl(
private val taskRepository: TaskRepository private val taskRepository: TaskRepository
) : TaskManagementService { ) : TaskManagementService {
private var flow:Flow<Task> = flowOf() private var flow: Flow<Task> = flowOf()
override suspend fun startManagement() { override suspend fun startManagement() {
flow = taskScanner.startScan() flow = taskScanner.startScan()
flow.onEach { flow.onEach {
enqueueTask(it) enqueueTask(it)
}.collect() }.collect()
} }
@ -61,7 +62,7 @@ class TaskManagementServiceImpl(
return assignQueuedTaskDecider.findAssignableQueue(consumerId, numberOfConcurrent) return assignQueuedTaskDecider.findAssignableQueue(consumerId, numberOfConcurrent)
} }
private suspend fun enqueueTask(task: Task):QueuedTask{ private suspend fun enqueueTask(task: Task): QueuedTask {
val queuedTask = QueuedTask( val queuedTask = QueuedTask(
task.attempt + 1, task.attempt + 1,
@ -71,19 +72,21 @@ class TaskManagementServiceImpl(
null null
) )
val definedTask = taskDefinitionRepository.findByName(task.name)
?: throw TaskNotRegisterException("Task ${task.name} not definition.")
val copy = task.copy( val copy = task.copy(
nextRetry = retryPolicyFactory.factory(taskDefinitionRepository.findByName(task.name)?.retryPolicy.orEmpty()) nextRetry = retryPolicyFactory.factory(definedTask.retryPolicy)
.nextRetry(Instant.now(), task.attempt) .nextRetry(Instant.now(), task.attempt)
) )
taskRepository.save(copy) taskRepository.save(copy)
queueStore.enqueue(queuedTask) queueStore.enqueue(queuedTask)
logger.debug("Enqueue Task. {} {}", task.name, task.id) logger.debug("Enqueue Task. name: {} id: {} attempt: {}", task.name, task.id, queuedTask.attempt)
return queuedTask return queuedTask
} }
companion object{ companion object {
private val logger = LoggerFactory.getLogger(TaskManagementServiceImpl::class.java) private val logger = LoggerFactory.getLogger(TaskManagementServiceImpl::class.java)
} }
} }

View File

@ -16,6 +16,7 @@
package dev.usbharu.owl.broker.service package dev.usbharu.owl.broker.service
import dev.usbharu.owl.broker.domain.exception.service.TaskNotRegisterException
import dev.usbharu.owl.broker.domain.model.task.Task import dev.usbharu.owl.broker.domain.model.task.Task
import dev.usbharu.owl.broker.domain.model.task.TaskRepository import dev.usbharu.owl.broker.domain.model.task.TaskRepository
import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository
@ -49,7 +50,8 @@ class TaskPublishServiceImpl(
override suspend fun publishTask(publishTask: PublishTask): PublishedTask { override suspend fun publishTask(publishTask: PublishTask): PublishedTask {
val id = UUID.randomUUID() val id = UUID.randomUUID()
val definition = taskDefinitionRepository.findByName(publishTask.name) ?: TODO() val definition = taskDefinitionRepository.findByName(publishTask.name)
?: throw TaskNotRegisterException("Task ${publishTask.name} not definition.")
val published = Instant.now() val published = Instant.now()
val nextRetry = retryPolicyFactory.factory(definition.name).nextRetry(published,0) val nextRetry = retryPolicyFactory.factory(definition.name).nextRetry(published,0)