From 4bc9e3abd28cff8e8e9f7867a72685bb0c39dc67 Mon Sep 17 00:00:00 2001 From: usbharu Date: Tue, 17 Sep 2024 22:02:29 +0900 Subject: [PATCH] style: fix lint --- owl/build.gradle.kts | 20 +- .../owl/broker/mongodb/MongoModuleContext.kt | 4 +- .../mongodb/MongodbConsumerRepository.kt | 22 ++- .../mongodb/MongodbProducerRepository.kt | 2 +- .../mongodb/MongodbQueuedTaskRepository.kt | 42 ++--- .../MongodbTaskDefinitionRepository.kt | 4 +- .../broker/mongodb/MongodbTaskRepository.kt | 43 +++-- .../mongodb/MongodbTaskResultRepository.kt | 35 ++-- .../kotlin/dev/usbharu/owl/broker/Main.kt | 4 +- .../dev/usbharu/owl/broker/ModuleContext.kt | 8 +- .../owl/broker/OwlBrokerApplication.kt | 5 +- .../exception/InvalidRepositoryException.kt | 2 +- .../repository/FailedSaveException.kt | 2 +- .../repository/RecordNotFoundException.kt | 2 +- .../service/IncompatibleTaskException.kt | 2 +- .../service/QueueCannotDequeueException.kt | 2 +- .../service/TaskNotRegisterException.kt | 2 +- .../model/consumer/ConsumerRepository.kt | 6 +- .../broker/domain/model/producer/Producer.kt | 8 +- .../model/producer/ProducerRepository.kt | 4 +- .../model/queuedtask/QueuedTaskRepository.kt | 6 +- .../owl/broker/domain/model/task/Task.kt | 6 +- .../domain/model/task/TaskRepository.kt | 12 +- .../model/taskdefinition/TaskDefinition.kt | 2 +- .../TaskDefinitionRepository.kt | 6 +- .../domain/model/taskresult/TaskResult.kt | 2 +- .../model/taskresult/TaskResultRepository.kt | 6 +- .../owl/broker/external/GrpcExtension.kt | 2 +- .../interfaces/grpc/AssignmentTaskService.kt | 5 +- .../interfaces/grpc/DefinitionTaskService.kt | 19 +- .../broker/interfaces/grpc/ProducerService.kt | 10 +- .../interfaces/grpc/SubscribeTaskService.kt | 2 +- .../interfaces/grpc/TaskPublishService.kt | 7 +- .../interfaces/grpc/TaskResultService.kt | 2 +- .../grpc/TaskResultSubscribeService.kt | 20 +- .../broker/service/AssignQueuedTaskDecider.kt | 4 +- .../owl/broker/service/ConsumerService.kt | 2 +- .../DefaultPropertySerializerFactory.kt | 2 +- .../owl/broker/service/ProducerService.kt | 4 +- .../owl/broker/service/QueueScanner.kt | 19 +- .../usbharu/owl/broker/service/QueueStore.kt | 21 +-- .../owl/broker/service/QueuedTaskAssigner.kt | 4 +- .../owl/broker/service/RegisterTaskService.kt | 15 +- .../broker/service/TaskManagementService.kt | 12 +- .../owl/broker/service/TaskPublishService.kt | 19 +- .../usbharu/owl/broker/service/TaskResults.kt | 8 +- .../usbharu/owl/broker/service/TaskScanner.kt | 2 +- .../src/main/kotlin/dev/usbharu/Main.kt | 2 +- .../common/property/ObjectPropertyValue.kt | 3 +- .../dev/usbharu/owl/common/ReflectionUtils.kt | 5 +- .../common/property/BooleanPropertyValue.kt | 19 +- .../CustomPropertySerializerFactory.kt | 11 +- .../common/property/DoublePropertyValue.kt | 19 +- .../owl/common/property/FloatPropertyValue.kt | 19 +- .../common/property/IntegerPropertyValue.kt | 19 +- .../owl/common/property/LongPropertyValue.kt | 19 +- .../property/PropertySerializeException.kt | 2 +- .../common/property/PropertySerializeUtils.kt | 2 +- .../owl/common/property/PropertySerializer.kt | 2 +- .../property/PropertySerializerFactory.kt | 2 +- .../owl/common/property/PropertyType.kt | 2 +- .../owl/common/property/PropertyValue.kt | 8 +- .../common/property/StringPropertyValue.kt | 18 +- .../common/retry/ExponentialRetryPolicy.kt | 3 +- .../usbharu/owl/common/retry/RetryPolicy.kt | 2 +- .../owl/common/retry/RetryPolicyFactory.kt | 7 +- .../retry/RetryPolicyNotFoundException.kt | 2 +- .../owl/common/task/PropertyDefinition.kt | 2 - .../usbharu/owl/common/task/PublishedTask.kt | 2 +- .../dev/usbharu/owl/common/task/Task.kt | 2 +- .../usbharu/owl/common/task/TaskDefinition.kt | 3 +- .../owl/consumer/AbstractTaskRunner.kt | 3 +- .../dev/usbharu/owl/consumer/Consumer.kt | 171 ++++++++++-------- .../kotlin/dev/usbharu/owl/consumer/Main.kt | 3 +- .../consumer/ServiceLoaderTaskRunnerLoader.kt | 2 +- .../owl/consumer/StandaloneConsumer.kt | 11 +- .../StandaloneConsumerConfigLoader.kt | 2 +- .../dev/usbharu/owl/consumer/TaskRequest.kt | 8 +- .../dev/usbharu/owl/consumer/TaskResult.kt | 2 +- .../dev/usbharu/owl/consumer/TaskRunner.kt | 2 +- .../usbharu/owl/consumer/TaskRunnerLoader.kt | 2 +- .../owl/producer/api/OwlProducerBuilder.kt | 2 +- .../producer/api/OwlProducerBuilderConfig.kt | 2 +- .../owl/producer/api/OwlProducerConfig.kt | 2 +- .../defaultimpl/DefaultOwlProducer.kt | 33 ++-- .../defaultimpl/DefaultOwlProducerBuilder.kt | 2 +- .../defaultimpl/DefaultOwlProducerConfig.kt | 2 +- .../embedded/EmbeddedGrpcOwlProducer.kt | 2 +- .../EmbeddedGrpcOwlProducerBuilder.kt | 2 +- .../embedded/EmbeddedGrpcOwlProducerConfig.kt | 2 +- .../producer/embedded/EmbeddedOwlProducer.kt | 3 +- .../embedded/EmbeddedOwlProducerBuilder.kt | 3 +- 92 files changed, 413 insertions(+), 461 deletions(-) diff --git a/owl/build.gradle.kts b/owl/build.gradle.kts index 5ea6ae9e..5443b6b6 100644 --- a/owl/build.gradle.kts +++ b/owl/build.gradle.kts @@ -53,7 +53,6 @@ subprojects { autoCorrect = true } - project.gradle.taskGraph.whenReady { if (this.hasTask(":koverGenerateArtifact")) { val task = this.allTasks.find { println(it.name);it.name == "test" } @@ -61,8 +60,23 @@ subprojects { verificationTask.ignoreFailures = true } } - tasks.test { - useJUnitPlatform() + tasks { + withType { + exclude("**/generated/**") + setSource("src/main/kotlin") + exclude("build/") + configureEach { + exclude("**/org/koin/ksp/generated/**", "**/generated/**") + } + } + withType() { + configureEach { + exclude("**/org/koin/ksp/generated/**", "**/generated/**") + } + } + withType { + useJUnitPlatform() + } } publishing { diff --git a/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongoModuleContext.kt b/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongoModuleContext.kt index 5c1af4fb..472207ea 100644 --- a/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongoModuleContext.kt +++ b/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongoModuleContext.kt @@ -32,7 +32,6 @@ import org.koin.dsl.module class MongoModuleContext : ModuleContext { override fun module(): Module { - return module { single { val clientSettings = @@ -47,7 +46,6 @@ class MongoModuleContext : ModuleContext { ) .uuidRepresentation(UuidRepresentation.STANDARD).build() - MongoClient.create(clientSettings) .getDatabase(System.getProperty("owl.broker.mongo.database", "mongo-test")) } @@ -59,4 +57,4 @@ class MongoModuleContext : ModuleContext { single { MongodbTaskResultRepository(get(), get()) } } } -} \ No newline at end of file +} diff --git a/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbConsumerRepository.kt b/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbConsumerRepository.kt index 4310b956..3d4eef8d 100644 --- a/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbConsumerRepository.kt +++ b/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbConsumerRepository.kt @@ -33,7 +33,11 @@ class MongodbConsumerRepository(database: MongoDatabase) : ConsumerRepository { private val collection = database.getCollection("consumers") override suspend fun save(consumer: Consumer): Consumer = withContext(Dispatchers.IO) { - collection.replaceOne(Filters.eq("_id", consumer.id.toString()), ConsumerMongodb.of(consumer), ReplaceOptions().upsert(true)) + collection.replaceOne( + Filters.eq("_id", consumer.id.toString()), + ConsumerMongodb.of(consumer), + ReplaceOptions().upsert(true) + ) return@withContext consumer } @@ -49,15 +53,19 @@ data class ConsumerMongodb( val name: String, val hostname: String, val tasks: List -){ +) { - fun toConsumer():Consumer{ + fun toConsumer(): Consumer { return Consumer( - UUID.fromString(id), name, hostname, tasks + UUID.fromString(id), + name, + hostname, + tasks ) } - companion object{ - fun of(consumer: Consumer):ConsumerMongodb{ + + companion object { + fun of(consumer: Consumer): ConsumerMongodb { return ConsumerMongodb( consumer.id.toString(), consumer.name, @@ -66,4 +74,4 @@ data class ConsumerMongodb( ) } } -} \ No newline at end of file +} diff --git a/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbProducerRepository.kt b/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbProducerRepository.kt index 76d9a755..6fba0352 100644 --- a/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbProducerRepository.kt +++ b/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbProducerRepository.kt @@ -68,4 +68,4 @@ data class ProducerMongodb( ) } } -} \ No newline at end of file +} diff --git a/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt b/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt index 833baca9..720ac9dc 100644 --- a/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt +++ b/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt @@ -48,7 +48,8 @@ class MongodbQueuedTaskRepository( override suspend fun save(queuedTask: QueuedTask): QueuedTask { withContext(Dispatchers.IO) { collection.replaceOne( - eq("_id", queuedTask.task.id.toString()), QueuedTaskMongodb.of(propertySerializerFactory, queuedTask), + eq("_id", queuedTask.task.id.toString()), + QueuedTaskMongodb.of(propertySerializerFactory, queuedTask), ReplaceOptions().upsert(true) ) } @@ -57,7 +58,6 @@ class MongodbQueuedTaskRepository( override suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id: UUID, update: QueuedTask): QueuedTask { return withContext(Dispatchers.IO) { - val findOneAndUpdate = collection.findOneAndUpdate( and( eq("_id", id.toString()), @@ -108,7 +108,7 @@ data class QueuedTaskMongodb( val task: TaskMongodb, val attempt: Int, val queuedAt: Instant, - val priority:Int, + val priority: Int, val isActive: Boolean, val timeoutAt: Instant?, val assignedConsumer: String?, @@ -155,14 +155,14 @@ data class QueuedTaskMongodb( companion object { fun of(propertySerializerFactory: PropertySerializerFactory, task: Task): TaskMongodb { return TaskMongodb( - task.name, - task.id.toString(), - task.publishProducerId.toString(), - task.publishedAt, - task.nextRetry, - task.completedAt, - task.attempt, - PropertySerializeUtils.serialize(propertySerializerFactory, task.properties) + name = task.name, + id = task.id.toString(), + publishProducerId = task.publishProducerId.toString(), + publishedAt = task.publishedAt, + nextRetry = task.nextRetry, + completedAt = task.completedAt, + attempt = task.attempt, + properties = PropertySerializeUtils.serialize(propertySerializerFactory, task.properties) ) } } @@ -171,16 +171,16 @@ data class QueuedTaskMongodb( companion object { fun of(propertySerializerFactory: PropertySerializerFactory, queuedTask: QueuedTask): QueuedTaskMongodb { return QueuedTaskMongodb( - queuedTask.task.id.toString(), - TaskMongodb.of(propertySerializerFactory, queuedTask.task), - queuedTask.attempt, - queuedTask.queuedAt, - queuedTask.priority, - queuedTask.isActive, - queuedTask.timeoutAt, - queuedTask.assignedConsumer?.toString(), - queuedTask.assignedAt + id = queuedTask.task.id.toString(), + task = TaskMongodb.of(propertySerializerFactory, queuedTask.task), + attempt = queuedTask.attempt, + queuedAt = queuedTask.queuedAt, + priority = queuedTask.priority, + isActive = queuedTask.isActive, + timeoutAt = queuedTask.timeoutAt, + assignedConsumer = queuedTask.assignedConsumer?.toString(), + assignedAt = queuedTask.assignedAt ) } } -} \ No newline at end of file +} diff --git a/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskDefinitionRepository.kt b/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskDefinitionRepository.kt index f3b384a1..e51177cc 100644 --- a/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskDefinitionRepository.kt +++ b/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskDefinitionRepository.kt @@ -41,7 +41,7 @@ class MongodbTaskDefinitionRepository(database: MongoDatabase) : TaskDefinitionR } override suspend fun deleteByName(name: String): Unit = withContext(Dispatchers.IO) { - collection.deleteOne(Filters.eq("_id",name)) + collection.deleteOne(Filters.eq("_id", name)) } override suspend fun findByName(name: String): TaskDefinition? = withContext(Dispatchers.IO) { @@ -82,4 +82,4 @@ data class TaskDefinitionMongodb( ) } } -} \ No newline at end of file +} diff --git a/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt b/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt index 2745b0cd..e0299652 100644 --- a/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt +++ b/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt @@ -36,27 +36,29 @@ import org.bson.codecs.pojo.annotations.BsonRepresentation import java.time.Instant import java.util.* - class MongodbTaskRepository(database: MongoDatabase, private val propertySerializerFactory: PropertySerializerFactory) : TaskRepository { private val collection = database.getCollection("tasks") override suspend fun save(task: Task): Task = withContext(Dispatchers.IO) { collection.replaceOne( - Filters.eq("_id", task.id.toString()), TaskMongodb.of(propertySerializerFactory, task), + Filters.eq("_id", task.id.toString()), + TaskMongodb.of(propertySerializerFactory, task), ReplaceOptions().upsert(true) ) return@withContext task } override suspend fun saveAll(tasks: List): Unit = withContext(Dispatchers.IO) { - collection.bulkWrite(tasks.map { - ReplaceOneModel( - Filters.eq(it.id.toString()), - TaskMongodb.of(propertySerializerFactory, it), - ReplaceOptions().upsert(true) - ) - }) + collection.bulkWrite( + tasks.map { + ReplaceOneModel( + Filters.eq(it.id.toString()), + TaskMongodb.of(propertySerializerFactory, it), + ReplaceOptions().upsert(true) + ) + } + ) } override fun findByNextRetryBeforeAndCompletedAtIsNull(timestamp: Instant): Flow { @@ -75,12 +77,13 @@ class MongodbTaskRepository(database: MongoDatabase, private val propertySeriali override suspend fun findByIdAndUpdate(id: UUID, task: Task) { collection.replaceOne( - Filters.eq("_id", task.id.toString()), TaskMongodb.of(propertySerializerFactory, task), + Filters.eq("_id", task.id.toString()), + TaskMongodb.of(propertySerializerFactory, task), ReplaceOptions().upsert(false) ) } - override suspend fun findByPublishProducerIdAndCompletedAtIsNotNull(publishProducerId: UUID): Flow { + override fun findByPublishProducerIdAndCompletedAtIsNotNull(publishProducerId: UUID): Flow { return collection .find(Filters.eq(TaskMongodb::publishProducerId.name, publishProducerId.toString())) .map { it.toTask(propertySerializerFactory) } @@ -116,15 +119,15 @@ data class TaskMongodb( companion object { fun of(propertySerializerFactory: PropertySerializerFactory, task: Task): TaskMongodb { return TaskMongodb( - task.name, - task.id.toString(), - task.publishProducerId.toString(), - task.publishedAt, - task.nextRetry, - task.completedAt, - task.attempt, - PropertySerializeUtils.serialize(propertySerializerFactory, task.properties) + name = task.name, + id = task.id.toString(), + publishProducerId = task.publishProducerId.toString(), + publishedAt = task.publishedAt, + nextRetry = task.nextRetry, + completedAt = task.completedAt, + attempt = task.attempt, + properties = PropertySerializeUtils.serialize(propertySerializerFactory, task.properties) ) } } -} \ No newline at end of file +} diff --git a/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskResultRepository.kt b/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskResultRepository.kt index 2336a45c..cc671a65 100644 --- a/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskResultRepository.kt +++ b/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskResultRepository.kt @@ -41,14 +41,17 @@ class MongodbTaskResultRepository( private val collection = database.getCollection("task_results") override suspend fun save(taskResult: TaskResult): TaskResult = withContext(Dispatchers.IO) { collection.replaceOne( - Filters.eq(taskResult.id.toString()), TaskResultMongodb.of(propertySerializerFactory, taskResult), + Filters.eq(taskResult.id.toString()), + TaskResultMongodb.of(propertySerializerFactory, taskResult), ReplaceOptions().upsert(true) ) return@withContext taskResult } override fun findByTaskId(id: UUID): Flow { - return collection.find(Filters.eq(id.toString())).map { it.toTaskResult(propertySerializerFactory) }.flowOn(Dispatchers.IO) + return collection.find( + Filters.eq(id.toString()) + ).map { it.toTaskResult(propertySerializerFactory) }.flowOn(Dispatchers.IO) } } @@ -65,27 +68,25 @@ data class TaskResultMongodb( fun toTaskResult(propertySerializerFactory: PropertySerializerFactory): TaskResult { return TaskResult( - UUID.fromString(id), - UUID.fromString(taskId), - success, - attempt, - PropertySerializeUtils.deserialize(propertySerializerFactory, result), - message + id = UUID.fromString(id), + taskId = UUID.fromString(taskId), + success = success, + attempt = attempt, + result = PropertySerializeUtils.deserialize(propertySerializerFactory, result), + message = message ) } companion object { fun of(propertySerializerFactory: PropertySerializerFactory, taskResult: TaskResult): TaskResultMongodb { return TaskResultMongodb( - taskResult.id.toString(), - taskResult.taskId.toString(), - taskResult.success, - taskResult.attempt, - PropertySerializeUtils.serialize(propertySerializerFactory, taskResult.result), - taskResult.message + id = taskResult.id.toString(), + taskId = taskResult.taskId.toString(), + success = taskResult.success, + attempt = taskResult.attempt, + result = PropertySerializeUtils.serialize(propertySerializerFactory, taskResult.result), + message = taskResult.message ) - } - } -} \ No newline at end of file +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/Main.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/Main.kt index d5f25364..2a0ebb1f 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/Main.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/Main.kt @@ -90,7 +90,6 @@ fun main() { logger.info("Use module name: {}", moduleContext) - val koin = startKoin { printLogger() @@ -98,7 +97,6 @@ fun main() { single { DefaultRetryPolicyFactory(mapOf("" to ExponentialRetryPolicy())) } - } modules(mainModule, module, moduleContext.module()) } @@ -108,4 +106,4 @@ fun main() { runBlocking { application.start(50051).join() } -} \ No newline at end of file +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/ModuleContext.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/ModuleContext.kt index fbb32f7c..a75dffb3 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/ModuleContext.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/ModuleContext.kt @@ -19,11 +19,9 @@ package dev.usbharu.owl.broker import org.koin.core.module.Module interface ModuleContext { - fun module():Module + fun module(): Module } data object EmptyModuleContext : ModuleContext { - override fun module(): Module { - return org.koin.dsl.module { } - } -} \ No newline at end of file + override fun module(): Module = org.koin.dsl.module { } +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/OwlBrokerApplication.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/OwlBrokerApplication.kt index a67661ca..96b824ce 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/OwlBrokerApplication.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/OwlBrokerApplication.kt @@ -38,7 +38,7 @@ class OwlBrokerApplication( private lateinit var server: Server - fun start(port: Int,coroutineScope: CoroutineScope = GlobalScope):Job { + fun start(port: Int, coroutineScope: CoroutineScope = GlobalScope): Job { server = ServerBuilder.forPort(port) .addService(assignmentTaskService) .addService(definitionTaskService) @@ -64,5 +64,4 @@ class OwlBrokerApplication( fun stop() { server.shutdown() } - -} \ No newline at end of file +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/InvalidRepositoryException.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/InvalidRepositoryException.kt index 9019c314..a48232fa 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/InvalidRepositoryException.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/InvalidRepositoryException.kt @@ -27,4 +27,4 @@ class InvalidRepositoryException : RuntimeException { enableSuppression, writableStackTrace ) -} \ No newline at end of file +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/repository/FailedSaveException.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/repository/FailedSaveException.kt index 4cb68305..782ac6ab 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/repository/FailedSaveException.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/repository/FailedSaveException.kt @@ -27,4 +27,4 @@ class FailedSaveException : RuntimeException { enableSuppression, writableStackTrace ) -} \ No newline at end of file +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/repository/RecordNotFoundException.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/repository/RecordNotFoundException.kt index 62921c46..2a28a662 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/repository/RecordNotFoundException.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/repository/RecordNotFoundException.kt @@ -27,4 +27,4 @@ open class RecordNotFoundException : RuntimeException { enableSuppression, writableStackTrace ) -} \ No newline at end of file +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/IncompatibleTaskException.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/IncompatibleTaskException.kt index 9f940bc2..54fe2717 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/IncompatibleTaskException.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/IncompatibleTaskException.kt @@ -27,4 +27,4 @@ class IncompatibleTaskException : RuntimeException { enableSuppression, writableStackTrace ) -} \ No newline at end of file +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/QueueCannotDequeueException.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/QueueCannotDequeueException.kt index 49b47dbf..c89df8cc 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/QueueCannotDequeueException.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/QueueCannotDequeueException.kt @@ -27,4 +27,4 @@ class QueueCannotDequeueException : RuntimeException { enableSuppression, writableStackTrace ) -} \ No newline at end of file +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/TaskNotRegisterException.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/TaskNotRegisterException.kt index ca894165..bc134711 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/TaskNotRegisterException.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/exception/service/TaskNotRegisterException.kt @@ -27,4 +27,4 @@ class TaskNotRegisterException : RuntimeException { enableSuppression, writableStackTrace ) -} \ No newline at end of file +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/consumer/ConsumerRepository.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/consumer/ConsumerRepository.kt index 34ebb0af..47289625 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/consumer/ConsumerRepository.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/consumer/ConsumerRepository.kt @@ -19,7 +19,7 @@ package dev.usbharu.owl.broker.domain.model.consumer import java.util.* interface ConsumerRepository { - suspend fun save(consumer: Consumer):Consumer + suspend fun save(consumer: Consumer): Consumer - suspend fun findById(id:UUID):Consumer? -} \ No newline at end of file + suspend fun findById(id: UUID): Consumer? +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/producer/Producer.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/producer/Producer.kt index eebbafd4..4bf96d77 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/producer/Producer.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/producer/Producer.kt @@ -20,9 +20,9 @@ import java.time.Instant import java.util.* data class Producer( - val id:UUID, - val name:String, - val hostname:String, - val registeredTask:List, + val id: UUID, + val name: String, + val hostname: String, + val registeredTask: List, val createdAt: Instant ) diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/producer/ProducerRepository.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/producer/ProducerRepository.kt index 932cef10..67aff038 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/producer/ProducerRepository.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/producer/ProducerRepository.kt @@ -17,5 +17,5 @@ package dev.usbharu.owl.broker.domain.model.producer interface ProducerRepository { - suspend fun save(producer: Producer):Producer -} \ No newline at end of file + suspend fun save(producer: Producer): Producer +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTaskRepository.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTaskRepository.kt index 9f3c12a7..0382eff9 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTaskRepository.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTaskRepository.kt @@ -21,14 +21,14 @@ import java.time.Instant import java.util.* interface QueuedTaskRepository { - suspend fun save(queuedTask: QueuedTask):QueuedTask + suspend fun save(queuedTask: QueuedTask): QueuedTask /** * トランザクションの代わり */ - suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id:UUID,update:QueuedTask):QueuedTask + suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id: UUID, update: QueuedTask): QueuedTask fun findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(tasks: List, limit: Int): Flow fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow -} \ No newline at end of file +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/Task.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/Task.kt index 8bf3a162..03dfb63f 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/Task.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/Task.kt @@ -24,11 +24,11 @@ import java.util.* * @param attempt 失敗を含めて試行した回数 */ data class Task( - val name:String, + val name: String, val id: UUID, - val publishProducerId:UUID, + val publishProducerId: UUID, val publishedAt: Instant, - val nextRetry:Instant, + val nextRetry: Instant, val completedAt: Instant? = null, val attempt: Int, val properties: Map> diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/TaskRepository.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/TaskRepository.kt index 009dea2d..6f6294db 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/TaskRepository.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/TaskRepository.kt @@ -21,15 +21,15 @@ import java.time.Instant import java.util.* interface TaskRepository { - suspend fun save(task: Task):Task + suspend fun save(task: Task): Task - suspend fun saveAll(tasks:List) + suspend fun saveAll(tasks: List) - fun findByNextRetryBeforeAndCompletedAtIsNull(timestamp:Instant): Flow + fun findByNextRetryBeforeAndCompletedAtIsNull(timestamp: Instant): Flow suspend fun findById(uuid: UUID): Task? - suspend fun findByIdAndUpdate(id:UUID,task: Task) + suspend fun findByIdAndUpdate(id: UUID, task: Task) - suspend fun findByPublishProducerIdAndCompletedAtIsNotNull(publishProducerId:UUID):Flow -} \ No newline at end of file + fun findByPublishProducerIdAndCompletedAtIsNotNull(publishProducerId: UUID): Flow +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskdefinition/TaskDefinition.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskdefinition/TaskDefinition.kt index 8fcb8f1e..a775d727 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskdefinition/TaskDefinition.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskdefinition/TaskDefinition.kt @@ -22,5 +22,5 @@ data class TaskDefinition( val maxRetry: Int, val timeoutMilli: Long, val propertyDefinitionHash: Long, - val retryPolicy:String + val retryPolicy: String ) diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskdefinition/TaskDefinitionRepository.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskdefinition/TaskDefinitionRepository.kt index 2a1c3c6a..185a70a5 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskdefinition/TaskDefinitionRepository.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskdefinition/TaskDefinitionRepository.kt @@ -18,7 +18,7 @@ package dev.usbharu.owl.broker.domain.model.taskdefinition interface TaskDefinitionRepository { suspend fun save(taskDefinition: TaskDefinition): TaskDefinition - suspend fun deleteByName(name:String) + suspend fun deleteByName(name: String) - suspend fun findByName(name:String):TaskDefinition? -} \ No newline at end of file + suspend fun findByName(name: String): TaskDefinition? +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskresult/TaskResult.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskresult/TaskResult.kt index b024d04c..b2480e65 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskresult/TaskResult.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskresult/TaskResult.kt @@ -21,7 +21,7 @@ import java.util.* data class TaskResult( val id: UUID, - val taskId:UUID, + val taskId: UUID, val success: Boolean, val attempt: Int, val result: Map>, diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskresult/TaskResultRepository.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskresult/TaskResultRepository.kt index 4118cfed..492b2851 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskresult/TaskResultRepository.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskresult/TaskResultRepository.kt @@ -20,6 +20,6 @@ import kotlinx.coroutines.flow.Flow import java.util.* interface TaskResultRepository { - suspend fun save(taskResult: TaskResult):TaskResult - fun findByTaskId(id:UUID): Flow -} \ No newline at end of file + suspend fun save(taskResult: TaskResult): TaskResult + fun findByTaskId(id: UUID): Flow +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/external/GrpcExtension.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/external/GrpcExtension.kt index 93159174..601cfe5a 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/external/GrpcExtension.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/external/GrpcExtension.kt @@ -32,4 +32,4 @@ fun UUID.toUUID(): Uuid.UUID = Uuid fun Timestamp.toInstant(): Instant = Instant.ofEpochSecond(seconds, nanos.toLong()) -fun Instant.toTimestamp():Timestamp = Timestamp.newBuilder().setSeconds(this.epochSecond).setNanos(this.nano).build() \ No newline at end of file +fun Instant.toTimestamp(): Timestamp = Timestamp.newBuilder().setSeconds(this.epochSecond).setNanos(this.nano).build() diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/AssignmentTaskService.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/AssignmentTaskService.kt index e3f0752c..f0f46d6a 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/AssignmentTaskService.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/AssignmentTaskService.kt @@ -16,7 +16,6 @@ package dev.usbharu.owl.broker.interfaces.grpc - import dev.usbharu.owl.broker.external.toTimestamp import dev.usbharu.owl.broker.external.toUUID import dev.usbharu.owl.broker.service.QueuedTaskAssigner @@ -33,7 +32,6 @@ import org.slf4j.LoggerFactory import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext - class AssignmentTaskService( coroutineContext: CoroutineContext = EmptyCoroutineContext, private val queuedTaskAssigner: QueuedTaskAssigner, @@ -42,7 +40,6 @@ class AssignmentTaskService( AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineImplBase(coroutineContext) { override fun ready(requests: Flow): Flow { - return try { requests .flatMapMerge { @@ -72,4 +69,4 @@ class AssignmentTaskService( companion object { private val logger = LoggerFactory.getLogger(AssignmentTaskService::class.java) } -} \ No newline at end of file +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/DefinitionTaskService.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/DefinitionTaskService.kt index 59afb0d1..a6436405 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/DefinitionTaskService.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/DefinitionTaskService.kt @@ -25,17 +25,20 @@ import dev.usbharu.owl.generated.DefinitionTaskServiceGrpcKt.DefinitionTaskServi import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext -class DefinitionTaskService(coroutineContext: CoroutineContext = EmptyCoroutineContext,private val registerTaskService: RegisterTaskService) : +class DefinitionTaskService( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + private val registerTaskService: RegisterTaskService +) : DefinitionTaskServiceCoroutineImplBase(coroutineContext) { override suspend fun register(request: DefinitionTask.TaskDefinition): TaskDefined { registerTaskService.registerTask( TaskDefinition( - request.name, - request.priority, - request.maxRetry, - request.timeoutMilli, - request.propertyDefinitionHash, - request.retryPolicy + name = request.name, + priority = request.priority, + maxRetry = request.maxRetry, + timeoutMilli = request.timeoutMilli, + propertyDefinitionHash = request.propertyDefinitionHash, + retryPolicy = request.retryPolicy ) ) return TaskDefined @@ -50,4 +53,4 @@ class DefinitionTaskService(coroutineContext: CoroutineContext = EmptyCoroutineC registerTaskService.unregisterTask(request.name) return Empty.getDefaultInstance() } -} \ No newline at end of file +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/ProducerService.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/ProducerService.kt index d2157a2d..358d6827 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/ProducerService.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/ProducerService.kt @@ -24,18 +24,20 @@ import dev.usbharu.owl.generated.ProducerServiceGrpcKt.ProducerServiceCoroutineI import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext - class ProducerService( coroutineContext: CoroutineContext = EmptyCoroutineContext, private val producerService: ProducerService ) : ProducerServiceCoroutineImplBase(coroutineContext) { - override suspend fun registerProducer(request: ProducerOuterClass.Producer): ProducerOuterClass.RegisterProducerResponse { + override suspend fun registerProducer( + request: ProducerOuterClass.Producer + ): ProducerOuterClass.RegisterProducerResponse { val registerProducer = producerService.registerProducer( RegisterProducerRequest( - request.name, request.hostname + request.name, + request.hostname ) ) return ProducerOuterClass.RegisterProducerResponse.newBuilder().setId(registerProducer.toUUID()).build() } -} \ No newline at end of file +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/SubscribeTaskService.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/SubscribeTaskService.kt index d8ed8386..a1eadee4 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/SubscribeTaskService.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/SubscribeTaskService.kt @@ -34,4 +34,4 @@ class SubscribeTaskService( consumerService.registerConsumer(RegisterConsumerRequest(request.name, request.hostname, request.tasksList)) return Consumer.SubscribeTaskResponse.newBuilder().setId(id.toUUID()).build() } -} \ No newline at end of file +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskPublishService.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskPublishService.kt index 8a7bff55..22aff532 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskPublishService.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskPublishService.kt @@ -39,13 +39,9 @@ class TaskPublishService( TaskPublishServiceCoroutineImplBase(coroutineContext) { override suspend fun publishTask(request: PublishTaskOuterClass.PublishTask): PublishedTask { - logger.warn("aaaaaaaaaaa") - - return try { - val publishedTask = taskPublishService.publishTask( PublishTask( request.name, @@ -61,7 +57,6 @@ class TaskPublishService( } override suspend fun publishTasks(request: PublishTaskOuterClass.PublishTasks): PublishedTasks { - val tasks = request.propertiesArrayList.map { PublishTask( request.name, @@ -79,4 +74,4 @@ class TaskPublishService( private val logger = LoggerFactory.getLogger(dev.usbharu.owl.broker.interfaces.grpc.TaskPublishService::class.java) } -} \ No newline at end of file +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultService.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultService.kt index 300d3cf5..4c5411e3 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultService.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultService.kt @@ -67,4 +67,4 @@ class TaskResultService( companion object { private val logger = LoggerFactory.getLogger(TaskResultService::class.java) } -} \ No newline at end of file +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultSubscribeService.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultSubscribeService.kt index 5855cc6d..6ac87395 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultSubscribeService.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultSubscribeService.kt @@ -41,16 +41,18 @@ class TaskResultSubscribeService( name = it.name attempt = it.attempt success = it.success - results.addAll(it.results.map { - taskResult { - id = it.taskId.toUUID() - success = it.success - attempt = it.attempt - result.putAll(PropertySerializeUtils.serialize(propertySerializerFactory, it.result)) - message = it.message + results.addAll( + it.results.map { + taskResult { + id = it.taskId.toUUID() + success = it.success + attempt = it.attempt + result.putAll(PropertySerializeUtils.serialize(propertySerializerFactory, it.result)) + message = it.message + } } - }) + ) } } } -} \ No newline at end of file +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/AssignQueuedTaskDecider.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/AssignQueuedTaskDecider.kt index 44dfa36a..e58a56b3 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/AssignQueuedTaskDecider.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/AssignQueuedTaskDecider.kt @@ -42,7 +42,5 @@ class AssignQueuedTaskDeciderImpl( ).take(numberOfConcurrent) ) } - } - -} \ No newline at end of file +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/ConsumerService.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/ConsumerService.kt index 84b21095..3b7da438 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/ConsumerService.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/ConsumerService.kt @@ -57,4 +57,4 @@ data class RegisterConsumerRequest( val name: String, val hostname: String, val tasks: List -) \ No newline at end of file +) diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/DefaultPropertySerializerFactory.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/DefaultPropertySerializerFactory.kt index 0eed7b40..84baaf23 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/DefaultPropertySerializerFactory.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/DefaultPropertySerializerFactory.kt @@ -28,4 +28,4 @@ class DefaultPropertySerializerFactory : LongPropertySerializer(), FloatPropertySerializer(), ) - ) \ No newline at end of file + ) diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/ProducerService.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/ProducerService.kt index d781ba45..796d6698 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/ProducerService.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/ProducerService.kt @@ -26,10 +26,8 @@ interface ProducerService { suspend fun registerProducer(producer: RegisterProducerRequest): UUID } - class ProducerServiceImpl(private val producerRepository: ProducerRepository) : ProducerService { override suspend fun registerProducer(producer: RegisterProducerRequest): UUID { - val id = UUID.randomUUID() val saveProducer = Producer( @@ -54,4 +52,4 @@ class ProducerServiceImpl(private val producerRepository: ProducerRepository) : data class RegisterProducerRequest( val name: String, val hostname: String -) \ No newline at end of file +) diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueScanner.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueScanner.kt index 6368c93e..ced1fa20 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueScanner.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueScanner.kt @@ -29,19 +29,14 @@ interface QueueScanner { fun startScan(): Flow } - class QueueScannerImpl(private val queueStore: QueueStore) : QueueScanner { - override fun startScan(): Flow { - return flow { - while (currentCoroutineContext().isActive) { - emitAll(scanQueue()) - delay(1000) - } + override fun startScan(): Flow = flow { + while (currentCoroutineContext().isActive) { + emitAll(scanQueue()) + delay(1000) } } - private fun scanQueue(): Flow { - return queueStore.findByQueuedAtBeforeAndIsActiveIsTrue(Instant.now().minusSeconds(10)) - } - -} \ No newline at end of file + private fun scanQueue(): Flow = + queueStore.findByQueuedAtBeforeAndIsActiveIsTrue(Instant.now().minusSeconds(10)) +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueStore.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueStore.kt index 990e2b76..2dbb6aaa 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueStore.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueStore.kt @@ -32,33 +32,24 @@ interface QueueStore { fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow } - class QueueStoreImpl(private val queuedTaskRepository: QueuedTaskRepository) : QueueStore { override suspend fun enqueue(queuedTask: QueuedTask) { queuedTaskRepository.save(queuedTask) } - override suspend fun enqueueAll(queuedTaskList: List) { - queuedTaskList.forEach { enqueue(it) } - } + override suspend fun enqueueAll(queuedTaskList: List) = queuedTaskList.forEach { enqueue(it) } override suspend fun dequeue(queuedTask: QueuedTask) { queuedTaskRepository.findByTaskIdAndAssignedConsumerIsNullAndUpdate(queuedTask.task.id, queuedTask) } - override suspend fun dequeueAll(queuedTaskList: List) { - return queuedTaskList.forEach { dequeue(it) } - } + override suspend fun dequeueAll(queuedTaskList: List) = queuedTaskList.forEach { dequeue(it) } override fun findByTaskNameInAndIsActiveIsTrueAndOrderByPriority( tasks: List, limit: Int - ): Flow { - return queuedTaskRepository.findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(tasks, limit) - } + ): Flow = queuedTaskRepository.findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(tasks, limit) - override fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow { - return queuedTaskRepository.findByQueuedAtBeforeAndIsActiveIsTrue(instant) - } - -} \ No newline at end of file + override fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow = + queuedTaskRepository.findByQueuedAtBeforeAndIsActiveIsTrue(instant) +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueuedTaskAssigner.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueuedTaskAssigner.kt index 33c95413..caa061af 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueuedTaskAssigner.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueuedTaskAssigner.kt @@ -27,7 +27,6 @@ interface QueuedTaskAssigner { fun ready(consumerId: UUID, numberOfConcurrent: Int): Flow } - class QueuedTaskAssignerImpl( private val taskManagementService: TaskManagementService, private val queueStore: QueueStore @@ -49,7 +48,6 @@ class QueuedTaskAssignerImpl( private suspend fun assignTask(queuedTask: QueuedTask, consumerId: UUID): QueuedTask? { return try { - val assignedTaskQueue = queuedTask.copy(assignedConsumer = consumerId, assignedAt = Instant.now(), isActive = false) logger.trace( @@ -78,4 +76,4 @@ class QueuedTaskAssignerImpl( companion object { private val logger = LoggerFactory.getLogger(QueuedTaskAssignerImpl::class.java) } -} \ No newline at end of file +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/RegisterTaskService.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/RegisterTaskService.kt index 58945d42..e9812bce 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/RegisterTaskService.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/RegisterTaskService.kt @@ -24,33 +24,34 @@ import org.slf4j.LoggerFactory interface RegisterTaskService { suspend fun registerTask(taskDefinition: TaskDefinition) - suspend fun unregisterTask(name:String) + suspend fun unregisterTask(name: String) } - class RegisterTaskServiceImpl(private val taskDefinitionRepository: TaskDefinitionRepository) : RegisterTaskService { override suspend fun registerTask(taskDefinition: TaskDefinition) { val definedTask = taskDefinitionRepository.findByName(taskDefinition.name) if (definedTask != null) { logger.debug("Task already defined. name: ${taskDefinition.name}") if (taskDefinition.propertyDefinitionHash != definedTask.propertyDefinitionHash) { - throw IncompatibleTaskException("Task ${taskDefinition.name} has already been defined, and the parameters are incompatible.") + throw IncompatibleTaskException( + "Task ${taskDefinition.name} has already been defined, and the parameters are incompatible." + ) } return } taskDefinitionRepository.save(taskDefinition) - logger.info("Register a new task. name: {}",taskDefinition.name) + logger.info("Register a new task. name: {}", taskDefinition.name) } // todo すでにpublish済みのタスクをどうするか決めさせる override suspend fun unregisterTask(name: String) { taskDefinitionRepository.deleteByName(name) - logger.info("Unregister a task. name: {}",name) + logger.info("Unregister a task. name: {}", name) } - companion object{ + companion object { private val logger = LoggerFactory.getLogger(RegisterTaskServiceImpl::class.java) } -} \ No newline at end of file +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt index 29133704..c076fecd 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt @@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory import java.time.Instant import java.util.* - interface TaskManagementService { suspend fun startManagement(coroutineScope: CoroutineScope) @@ -75,13 +74,11 @@ class TaskManagementServiceImpl( } } - override fun findAssignableTask(consumerId: UUID, numberOfConcurrent: Int): Flow { return assignQueuedTaskDecider.findAssignableQueue(consumerId, numberOfConcurrent) } private suspend fun enqueueTask(task: Task): QueuedTask { - val definedTask = taskDefinitionRepository.findByName(task.name) ?: throw TaskNotRegisterException("Task ${task.name} not definition.") @@ -113,7 +110,6 @@ class TaskManagementServiceImpl( queueStore.dequeue(timeoutQueue) - val task = taskRepository.findById(timeoutQueue.task.id) ?: throw RecordNotFoundException("Task not found. id: ${timeoutQueue.task.id}") val copy = task.copy(attempt = timeoutQueue.attempt) @@ -148,12 +144,10 @@ class TaskManagementServiceImpl( taskResult.taskId, task.copy(completedAt = completedAt, attempt = taskResult.attempt) ) - } override fun subscribeResult(producerId: UUID): Flow { return flow { - while (currentCoroutineContext().isActive) { taskRepository .findByPublishProducerIdAndCompletedAtIsNotNull(producerId) @@ -163,7 +157,7 @@ class TaskManagementServiceImpl( TaskResults( it.name, it.id, - results.any { it.success }, + results.any { taskResult -> taskResult.success }, it.attempt, results ) @@ -171,12 +165,10 @@ class TaskManagementServiceImpl( } delay(500) } - } - } companion object { private val logger = LoggerFactory.getLogger(TaskManagementServiceImpl::class.java) } -} \ No newline at end of file +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskPublishService.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskPublishService.kt index a6604d10..d1dfbe56 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskPublishService.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskPublishService.kt @@ -78,7 +78,6 @@ class TaskPublishServiceImpl( } override suspend fun publishTasks(list: List): List { - val first = list.first() val definition = taskDefinitionRepository.findByName(first.name) @@ -90,14 +89,14 @@ class TaskPublishServiceImpl( val tasks = list.map { Task( - it.name, - UUID.randomUUID(), - first.producerId, - published, - nextRetry, - null, - 0, - it.properties + name = it.name, + id = UUID.randomUUID(), + publishProducerId = first.producerId, + publishedAt = published, + nextRetry = nextRetry, + completedAt = null, + attempt = 0, + properties = it.properties ) } @@ -111,4 +110,4 @@ class TaskPublishServiceImpl( companion object { private val logger = LoggerFactory.getLogger(TaskPublishServiceImpl::class.java) } -} \ No newline at end of file +} diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskResults.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskResults.kt index a61dbb0c..e4e48d3c 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskResults.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskResults.kt @@ -20,9 +20,9 @@ import dev.usbharu.owl.broker.domain.model.taskresult.TaskResult import java.util.* data class TaskResults( - val name:String, - val id:UUID, - val success:Boolean, - val attempt:Int, + val name: String, + val id: UUID, + val success: Boolean, + val attempt: Int, val results: List ) diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskScanner.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskScanner.kt index 5d469533..1966a168 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskScanner.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskScanner.kt @@ -49,4 +49,4 @@ class TaskScannerImpl(private val taskRepository: TaskRepository) : companion object { private val logger = LoggerFactory.getLogger(TaskScannerImpl::class.java) } -} \ No newline at end of file +} diff --git a/owl/owl-common/owl-common-serialize-jackson/src/main/kotlin/dev/usbharu/Main.kt b/owl/owl-common/owl-common-serialize-jackson/src/main/kotlin/dev/usbharu/Main.kt index 64eb428e..511cd2b6 100644 --- a/owl/owl-common/owl-common-serialize-jackson/src/main/kotlin/dev/usbharu/Main.kt +++ b/owl/owl-common/owl-common-serialize-jackson/src/main/kotlin/dev/usbharu/Main.kt @@ -18,4 +18,4 @@ package dev.usbharu fun main() { println("Hello World!") -} \ No newline at end of file +} diff --git a/owl/owl-common/owl-common-serialize-jackson/src/main/kotlin/dev/usbharu/owl/common/property/ObjectPropertyValue.kt b/owl/owl-common/owl-common-serialize-jackson/src/main/kotlin/dev/usbharu/owl/common/property/ObjectPropertyValue.kt index 6583d4f1..52fbb844 100644 --- a/owl/owl-common/owl-common-serialize-jackson/src/main/kotlin/dev/usbharu/owl/common/property/ObjectPropertyValue.kt +++ b/owl/owl-common/owl-common-serialize-jackson/src/main/kotlin/dev/usbharu/owl/common/property/ObjectPropertyValue.kt @@ -45,6 +45,5 @@ class ObjectPropertySerializer(private val objectMapper: ObjectMapper) : Propert Class.forName(string.substringAfter("jackson:").substringBefore(":")) ) ) - } -} \ No newline at end of file +} diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/ReflectionUtils.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/ReflectionUtils.kt index f0a6ff90..e72502a6 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/ReflectionUtils.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/ReflectionUtils.kt @@ -23,4 +23,7 @@ val Class<*>.allFields: List superclass.allFields + declaredFields } else { declaredFields.toList() - }.map { it.trySetAccessible();it } \ No newline at end of file + }.map { + it.trySetAccessible() + it + } diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/BooleanPropertyValue.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/BooleanPropertyValue.kt index b595f31c..3774f1a2 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/BooleanPropertyValue.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/BooleanPropertyValue.kt @@ -15,19 +15,12 @@ class BooleanPropertyValue(override val value: Boolean) : PropertyValue * */ class BooleanPropertySerializer : PropertySerializer { - override fun isSupported(propertyValue: PropertyValue<*>): Boolean { - return propertyValue.value is Boolean - } + override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is Boolean - override fun isSupported(string: String): Boolean { - return string.startsWith("bool:") - } + override fun isSupported(string: String): Boolean = string.startsWith("bool:") - override fun serialize(propertyValue: PropertyValue<*>): String { - return "bool:" + propertyValue.value.toString() - } + override fun serialize(propertyValue: PropertyValue<*>): String = "bool:" + propertyValue.value.toString() - override fun deserialize(string: String): PropertyValue { - return BooleanPropertyValue(string.replace("bool:", "").toBoolean()) - } -} \ No newline at end of file + override fun deserialize(string: String): PropertyValue = + BooleanPropertyValue(string.replace("bool:", "").toBoolean()) +} diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/CustomPropertySerializerFactory.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/CustomPropertySerializerFactory.kt index 00d7a3f3..e7ea1e53 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/CustomPropertySerializerFactory.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/CustomPropertySerializerFactory.kt @@ -23,12 +23,9 @@ package dev.usbharu.owl.common.property */ open class CustomPropertySerializerFactory(private val propertySerializers: Set>) : PropertySerializerFactory { - override fun factory(propertyValue: PropertyValue): PropertySerializer { - return propertySerializers.firstOrNull { it.isSupported(propertyValue) } as PropertySerializer? + override fun factory(propertyValue: PropertyValue): PropertySerializer = + propertySerializers.firstOrNull { it.isSupported(propertyValue) } as PropertySerializer? ?: throw IllegalArgumentException("PropertySerializer not found: $propertyValue") - } - override fun factory(string: String): PropertySerializer<*> { - return propertySerializers.first { it.isSupported(string) } - } -} \ No newline at end of file + override fun factory(string: String): PropertySerializer<*> = propertySerializers.first { it.isSupported(string) } +} diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/DoublePropertyValue.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/DoublePropertyValue.kt index c201cbaa..035c2114 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/DoublePropertyValue.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/DoublePropertyValue.kt @@ -15,19 +15,12 @@ class DoublePropertyValue(override val value: Double) : PropertyValue() * */ class DoublePropertySerializer : PropertySerializer { - override fun isSupported(propertyValue: PropertyValue<*>): Boolean { - return propertyValue.value is Double - } + override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is Double - override fun isSupported(string: String): Boolean { - return string.startsWith("double:") - } + override fun isSupported(string: String): Boolean = string.startsWith("double:") - override fun serialize(propertyValue: PropertyValue<*>): String { - return "double:" + propertyValue.value.toString() - } + override fun serialize(propertyValue: PropertyValue<*>): String = "double:" + propertyValue.value.toString() - override fun deserialize(string: String): PropertyValue { - return DoublePropertyValue(string.replace("double:", "").toDouble()) - } -} \ No newline at end of file + override fun deserialize(string: String): PropertyValue = + DoublePropertyValue(string.replace("double:", "").toDouble()) +} diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/FloatPropertyValue.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/FloatPropertyValue.kt index 0f18c832..dbb6b68d 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/FloatPropertyValue.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/FloatPropertyValue.kt @@ -26,19 +26,12 @@ class FloatPropertyValue(override val value: Float) : PropertyValue() { * */ class FloatPropertySerializer : PropertySerializer { - override fun isSupported(propertyValue: PropertyValue<*>): Boolean { - return propertyValue.value is Float - } + override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is Float - override fun isSupported(string: String): Boolean { - return string.startsWith("float:") - } + override fun isSupported(string: String): Boolean = string.startsWith("float:") - override fun serialize(propertyValue: PropertyValue<*>): String { - return "float:" + propertyValue.value.toString() - } + override fun serialize(propertyValue: PropertyValue<*>): String = "float:" + propertyValue.value.toString() - override fun deserialize(string: String): PropertyValue { - return FloatPropertyValue(string.replace("float:", "").toFloat()) - } -} \ No newline at end of file + override fun deserialize(string: String): PropertyValue = + FloatPropertyValue(string.replace("float:", "").toFloat()) +} diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/IntegerPropertyValue.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/IntegerPropertyValue.kt index 9b49c39c..c593fc24 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/IntegerPropertyValue.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/IntegerPropertyValue.kt @@ -31,19 +31,12 @@ class IntegerPropertyValue(override val value: Int) : PropertyValue() { * */ class IntegerPropertySerializer : PropertySerializer { - override fun isSupported(propertyValue: PropertyValue<*>): Boolean { - return propertyValue.value is Int - } + override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is Int - override fun isSupported(string: String): Boolean { - return string.startsWith("int32:") - } + override fun isSupported(string: String): Boolean = string.startsWith("int32:") - override fun serialize(propertyValue: PropertyValue<*>): String { - return "int32:" + propertyValue.value.toString() - } + override fun serialize(propertyValue: PropertyValue<*>): String = "int32:" + propertyValue.value.toString() - override fun deserialize(string: String): PropertyValue { - return IntegerPropertyValue(string.replace("int32:", "").toInt()) - } -} \ No newline at end of file + override fun deserialize(string: String): PropertyValue = + IntegerPropertyValue(string.replace("int32:", "").toInt()) +} diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/LongPropertyValue.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/LongPropertyValue.kt index 660b0b69..ebbb80df 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/LongPropertyValue.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/LongPropertyValue.kt @@ -27,19 +27,12 @@ class LongPropertyValue(override val value: Long) : PropertyValue() { * */ class LongPropertySerializer : PropertySerializer { - override fun isSupported(propertyValue: PropertyValue<*>): Boolean { - return propertyValue.value is Long - } + override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is Long - override fun isSupported(string: String): Boolean { - return string.startsWith("int64:") - } + override fun isSupported(string: String): Boolean = string.startsWith("int64:") - override fun serialize(propertyValue: PropertyValue<*>): String { - return "int64:" + propertyValue.value.toString() - } + override fun serialize(propertyValue: PropertyValue<*>): String = "int64:" + propertyValue.value.toString() - override fun deserialize(string: String): PropertyValue { - return LongPropertyValue(string.replace("int64:", "").toLong()) - } -} \ No newline at end of file + override fun deserialize(string: String): PropertyValue = + LongPropertyValue(string.replace("int64:", "").toLong()) +} diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeException.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeException.kt index 4acc597d..b1d29663 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeException.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeException.kt @@ -27,4 +27,4 @@ class PropertySerializeException : RuntimeException { enableSuppression, writableStackTrace ) -} \ No newline at end of file +} diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeUtils.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeUtils.kt index 817f3d24..9ae57a50 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeUtils.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeUtils.kt @@ -59,4 +59,4 @@ object PropertySerializeUtils { } }.toMap() } -} \ No newline at end of file +} diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializer.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializer.kt index 950bd9f4..62e1ceff 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializer.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializer.kt @@ -53,4 +53,4 @@ interface PropertySerializer { * @return デシリアライズされた[PropertyValue] */ fun deserialize(string: String): PropertyValue -} \ No newline at end of file +} diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializerFactory.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializerFactory.kt index 9983d6cf..22ac4466 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializerFactory.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializerFactory.kt @@ -37,4 +37,4 @@ interface PropertySerializerFactory { * @return 作成されたシリアライザー */ fun factory(string: String): PropertySerializer<*> -} \ No newline at end of file +} diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyType.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyType.kt index 4259c62f..047e7655 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyType.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyType.kt @@ -38,4 +38,4 @@ enum class PropertyType { * */ binary -} \ No newline at end of file +} diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyValue.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyValue.kt index d04ca229..cbedabf3 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyValue.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyValue.kt @@ -31,9 +31,5 @@ abstract class PropertyValue { * プロパティの型 */ abstract val type: PropertyType - override fun toString(): String { - return "PropertyValue(value=$value, type=$type)" - } - - -} \ No newline at end of file + override fun toString(): String = "PropertyValue(value=$value, type=$type)" +} diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/StringPropertyValue.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/StringPropertyValue.kt index 5b4cbaa1..edd5c393 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/StringPropertyValue.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/StringPropertyValue.kt @@ -15,19 +15,11 @@ class StringPropertyValue(override val value: String) : PropertyValue() * */ class StringPropertyValueSerializer : PropertySerializer { - override fun isSupported(propertyValue: PropertyValue<*>): Boolean { - return propertyValue.value is String - } + override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is String - override fun isSupported(string: String): Boolean { - return string.startsWith("str:") - } + override fun isSupported(string: String): Boolean = string.startsWith("str:") - override fun serialize(propertyValue: PropertyValue<*>): String { - return "str:" + propertyValue.value - } + override fun serialize(propertyValue: PropertyValue<*>): String = "str:" + propertyValue.value - override fun deserialize(string: String): PropertyValue { - return StringPropertyValue(string.replace("str:", "")) - } -} \ No newline at end of file + override fun deserialize(string: String): PropertyValue = StringPropertyValue(string.replace("str:", "")) +} diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/retry/ExponentialRetryPolicy.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/retry/ExponentialRetryPolicy.kt index b34cacf5..78918a04 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/retry/ExponentialRetryPolicy.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/retry/ExponentialRetryPolicy.kt @@ -13,5 +13,4 @@ import kotlin.math.roundToLong class ExponentialRetryPolicy(private val firstRetrySeconds: Int = 30) : RetryPolicy { override fun nextRetry(now: Instant, attempt: Int): Instant = now.plusSeconds(firstRetrySeconds.times((2.0).pow(attempt).roundToLong()) - firstRetrySeconds) - -} \ No newline at end of file +} diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/retry/RetryPolicy.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/retry/RetryPolicy.kt index da6e4ec0..755dd45e 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/retry/RetryPolicy.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/retry/RetryPolicy.kt @@ -33,4 +33,4 @@ interface RetryPolicy { * @return 次のリトライ時刻 */ fun nextRetry(now: Instant, attempt: Int): Instant -} \ No newline at end of file +} diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/retry/RetryPolicyFactory.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/retry/RetryPolicyFactory.kt index 6948e7bf..75590e84 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/retry/RetryPolicyFactory.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/retry/RetryPolicyFactory.kt @@ -16,7 +16,6 @@ package dev.usbharu.owl.common.retry - import org.slf4j.LoggerFactory interface RetryPolicyFactory { @@ -24,9 +23,7 @@ interface RetryPolicyFactory { } class DefaultRetryPolicyFactory(private val map: Map) : RetryPolicyFactory { - override fun factory(name: String): RetryPolicy { - return map[name] ?: throwException(name) - } + override fun factory(name: String): RetryPolicy = map[name] ?: throwException(name) private fun throwException(name: String): Nothing { logger.warn("RetryPolicy not found. name: {}", name) @@ -36,4 +33,4 @@ class DefaultRetryPolicyFactory(private val map: Map) : Ret companion object { private val logger = LoggerFactory.getLogger(RetryPolicyFactory::class.java) } -} \ No newline at end of file +} diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/retry/RetryPolicyNotFoundException.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/retry/RetryPolicyNotFoundException.kt index dba37f35..d16ed33f 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/retry/RetryPolicyNotFoundException.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/retry/RetryPolicyNotFoundException.kt @@ -27,4 +27,4 @@ class RetryPolicyNotFoundException : RuntimeException { enableSuppression, writableStackTrace ) -} \ No newline at end of file +} diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/task/PropertyDefinition.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/task/PropertyDefinition.kt index cbc96b0b..ed1a2b3f 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/task/PropertyDefinition.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/task/PropertyDefinition.kt @@ -36,6 +36,4 @@ class PropertyDefinition(val map: Map) : Map( val task: T, val id: UUID, val published: Instant -) \ No newline at end of file +) diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/task/Task.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/task/Task.kt index f29d5d2d..e83a2807 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/task/Task.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/task/Task.kt @@ -20,4 +20,4 @@ package dev.usbharu.owl.common.task * タスク * */ -open class Task \ No newline at end of file +open class Task diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/task/TaskDefinition.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/task/TaskDefinition.kt index 5cced498..57b04aed 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/task/TaskDefinition.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/task/TaskDefinition.kt @@ -107,7 +107,6 @@ interface TaskDefinition { * @return デシリアライズされたタスク */ fun deserialize(value: Map>): T { - val task = try { type.getDeclaredConstructor().newInstance() } catch (e: Exception) { @@ -127,4 +126,4 @@ interface TaskDefinition { return task } -} \ No newline at end of file +} diff --git a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/AbstractTaskRunner.kt b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/AbstractTaskRunner.kt index e1a5125e..18bbff55 100644 --- a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/AbstractTaskRunner.kt +++ b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/AbstractTaskRunner.kt @@ -29,5 +29,4 @@ abstract class AbstractTaskRunner>(private val t } abstract suspend fun typedRun(typedParam: T, taskRequest: TaskRequest): TaskResult - -} \ No newline at end of file +} diff --git a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt index ee925335..0069465d 100644 --- a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt +++ b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt @@ -66,11 +66,13 @@ class Consumer( suspend fun init(name: String, hostname: String) { logger.info("Initialize Consumer name: {} hostname: {}", name, hostname) logger.debug("Registered Tasks: {}", runnerMap.keys) - consumerId = subscribeTaskStub.subscribeTask(subscribeTaskRequest { - this.name = name - this.hostname = hostname - this.tasks.addAll(runnerMap.keys) - }).id + consumerId = subscribeTaskStub.subscribeTask( + subscribeTaskRequest { + this.name = name + this.hostname = hostname + this.tasks.addAll(runnerMap.keys) + } + ).id logger.info("Success initialize consumer. ConsumerID: {}", consumerId) } @@ -84,78 +86,92 @@ class Consumer( while (isActive) { try { taskResultStub - .tasKResult(flow { - assignmentTaskStub - .ready(flow { - requestTask() - }).onEach { - logger.info("Start Task name: {} id: {}", it.name, it.id) - processing.update { it + 1 } + .tasKResult( + flow { + assignmentTaskStub + .ready( + flow { + requestTask() + } + ).onEach { + logger.info("Start Task name: {} id: {}", it.name, it.id) + processing.update { it + 1 } - try { - val taskResult = runnerMap.getValue(it.name).run( - TaskRequest( + try { + val taskResult = runnerMap.getValue(it.name).run( + TaskRequest( + it.name, + java.util.UUID( + it.id.mostSignificantUuidBits, + it.id.leastSignificantUuidBits + ), + it.attempt, + Instant.ofEpochSecond( + it.queuedAt.seconds, + it.queuedAt.nanos.toLong() + ), + PropertySerializeUtils.deserialize( + propertySerializerFactory, + it.propertiesMap + ) + ) + ) + + emit( + taskResult { + this.success = taskResult.success + this.attempt = it.attempt + this.id = it.id + this.result.putAll( + PropertySerializeUtils.serialize( + propertySerializerFactory, + taskResult.result + ) + ) + this.message = taskResult.message + } + ) + logger.info( + "Success execute task. name: {} success: {}", it.name, - java.util.UUID( - it.id.mostSignificantUuidBits, - it.id.leastSignificantUuidBits - ), - it.attempt, - Instant.ofEpochSecond(it.queuedAt.seconds, it.queuedAt.nanos.toLong()), - PropertySerializeUtils.deserialize( - propertySerializerFactory, - it.propertiesMap - ) + taskResult.success ) - ) - - emit(taskResult { - this.success = taskResult.success - this.attempt = it.attempt - this.id = it.id - this.result.putAll( - PropertySerializeUtils.serialize( - propertySerializerFactory, taskResult.result - ) + logger.debug("TRACE RESULT {}", taskResult) + } catch (e: CancellationException) { + logger.warn("Cancelled execute task.", e) + emit( + taskResult { + this.success = false + this.attempt = it.attempt + this.id = it.id + this.message = e.localizedMessage + } ) - this.message = taskResult.message - }) - logger.info( - "Success execute task. name: {} success: {}", - it.name, - taskResult.success - ) - logger.debug("TRACE RESULT {}", taskResult) - } catch (e: CancellationException) { - logger.warn("Cancelled execute task.", e) - emit(taskResult { - this.success = false - this.attempt = it.attempt - this.id = it.id - this.message = e.localizedMessage - }) - throw e - } catch (e: Exception) { - logger.warn("Failed execute task. name: {} id: {}", it.name, it.id, e) - emit(taskResult { - this.success = false - this.attempt = it.attempt - this.id = it.id - this.message = e.localizedMessage - }) - } finally { - logger.debug(" Task name: {} id: {}", it.name, it.id) - processing.update { it - 1 } - concurrent.update { - if (it < 64) { - it + 1 - } else { - 64 + throw e + } catch (e: Exception) { + logger.warn("Failed execute task. name: {} id: {}", it.name, it.id, e) + emit( + taskResult { + this.success = false + this.attempt = it.attempt + this.id = it.id + this.message = e.localizedMessage + } + ) + } finally { + logger.debug(" Task name: {} id: {}", it.name, it.id) + processing.update { it - 1 } + concurrent.update { + if (it < 64) { + it + 1 + } else { + 64 + } } } - } - }.flowOn(Dispatchers.Default).collect() - }) + }.flowOn(Dispatchers.Default).collect() + } + ) } catch (e: CancellationException) { throw e } catch (e: Exception) { @@ -171,14 +187,15 @@ class Consumer( while (coroutineScope.isActive) { val andSet = concurrent.getAndUpdate { 0 } - if (andSet != 0) { logger.debug("Request {} tasks.", andSet) try { - emit(readyRequest { - this.consumerId = this@Consumer.consumerId - this.numberOfConcurrent = andSet - }) + emit( + readyRequest { + this.consumerId = this@Consumer.consumerId + this.numberOfConcurrent = andSet + } + ) } catch (e: CancellationException) { throw e } catch (e: Exception) { @@ -206,4 +223,4 @@ class Consumer( companion object { private val logger = LoggerFactory.getLogger(Consumer::class.java) } -} \ No newline at end of file +} diff --git a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/Main.kt b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/Main.kt index 31fba291..9ba808b1 100644 --- a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/Main.kt +++ b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/Main.kt @@ -25,5 +25,4 @@ fun main() { standaloneConsumer.init() standaloneConsumer.start() } - -} \ No newline at end of file +} diff --git a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/ServiceLoaderTaskRunnerLoader.kt b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/ServiceLoaderTaskRunnerLoader.kt index a34fc37b..1a639b81 100644 --- a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/ServiceLoaderTaskRunnerLoader.kt +++ b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/ServiceLoaderTaskRunnerLoader.kt @@ -26,4 +26,4 @@ class ServiceLoaderTaskRunnerLoader : TaskRunnerLoader { override fun load(): Map { return taskRunnerMap } -} \ No newline at end of file +} diff --git a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt index a9b42635..215d9ca1 100644 --- a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt +++ b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt @@ -90,9 +90,11 @@ class StandaloneConsumer( */ suspend fun start() { consumer.start() - Runtime.getRuntime().addShutdownHook(Thread { - consumer.stop() - }) + Runtime.getRuntime().addShutdownHook( + Thread { + consumer.stop() + } + ) } /** @@ -102,5 +104,4 @@ class StandaloneConsumer( fun stop() { consumer.stop() } - -} \ No newline at end of file +} diff --git a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfigLoader.kt b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfigLoader.kt index d11bda43..b4c65667 100644 --- a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfigLoader.kt +++ b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfigLoader.kt @@ -43,4 +43,4 @@ object StandaloneConsumerConfigLoader { return StandaloneConsumerConfig(address, port, name, hostname, concurrency) } -} \ No newline at end of file +} diff --git a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt index 006856f2..fa735fc9 100644 --- a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt +++ b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt @@ -30,9 +30,9 @@ import java.util.* * @property properties タスクに渡されたパラメータ */ data class TaskRequest( - val name:String, - val id:UUID, - val attempt:Int, + val name: String, + val id: UUID, + val attempt: Int, val queuedAt: Instant, - val properties:Map> + val properties: Map> ) diff --git a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt index 07d4bd9f..d6b6bf78 100644 --- a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt +++ b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt @@ -35,4 +35,4 @@ data class TaskResult( return TaskResult(true, result, "") } } -} \ No newline at end of file +} diff --git a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt index 613b0166..234a7a68 100644 --- a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt +++ b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt @@ -33,4 +33,4 @@ interface TaskRunner { * @return タスク実行結果 */ suspend fun run(taskRequest: TaskRequest): TaskResult -} \ No newline at end of file +} diff --git a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunnerLoader.kt b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunnerLoader.kt index 2581e5de..5b9f211b 100644 --- a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunnerLoader.kt +++ b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunnerLoader.kt @@ -18,4 +18,4 @@ package dev.usbharu.owl.consumer interface TaskRunnerLoader { fun load(): Map -} \ No newline at end of file +} diff --git a/owl/owl-producer/owl-producer-api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducerBuilder.kt b/owl/owl-producer/owl-producer-api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducerBuilder.kt index 0678266e..f0b288cb 100644 --- a/owl/owl-producer/owl-producer-api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducerBuilder.kt +++ b/owl/owl-producer/owl-producer-api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducerBuilder.kt @@ -43,4 +43,4 @@ interface OwlProducerBuilder

{ * @return 作成された[OwlProducer] */ fun build(): P -} \ No newline at end of file +} diff --git a/owl/owl-producer/owl-producer-api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducerBuilderConfig.kt b/owl/owl-producer/owl-producer-api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducerBuilderConfig.kt index efedddf0..b0d11048 100644 --- a/owl/owl-producer/owl-producer-api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducerBuilderConfig.kt +++ b/owl/owl-producer/owl-producer-api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducerBuilderConfig.kt @@ -31,4 +31,4 @@ fun

, C : OwlProducerConfig> OWL( ): P { owlProducerBuilder.apply(owlProducerBuilder.config().apply { configBlock() }) return owlProducerBuilder.build() -} \ No newline at end of file +} diff --git a/owl/owl-producer/owl-producer-api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducerConfig.kt b/owl/owl-producer/owl-producer-api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducerConfig.kt index 9334b0fd..82618a45 100644 --- a/owl/owl-producer/owl-producer-api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducerConfig.kt +++ b/owl/owl-producer/owl-producer-api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducerConfig.kt @@ -20,4 +20,4 @@ package dev.usbharu.owl.producer.api * [OwlProducer]の構成 * */ -interface OwlProducerConfig \ No newline at end of file +interface OwlProducerConfig diff --git a/owl/owl-producer/owl-producer-default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducer.kt b/owl/owl-producer/owl-producer-default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducer.kt index 065eb28e..62bbb7b1 100644 --- a/owl/owl-producer/owl-producer-default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducer.kt +++ b/owl/owl-producer/owl-producer-default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducer.kt @@ -36,10 +36,12 @@ class DefaultOwlProducer(private val defaultOwlProducerConfig: DefaultOwlProduce override suspend fun start() { producerServiceCoroutineStub = ProducerServiceGrpcKt.ProducerServiceCoroutineStub(defaultOwlProducerConfig.channel) - producerId = producerServiceCoroutineStub.registerProducer(producer { - this.name = defaultOwlProducerConfig.name - this.hostname = defaultOwlProducerConfig.hostname - }).id + producerId = producerServiceCoroutineStub.registerProducer( + producer { + this.name = defaultOwlProducerConfig.name + this.hostname = defaultOwlProducerConfig.hostname + } + ).id defineTaskServiceCoroutineStub = DefinitionTaskServiceGrpcKt.DefinitionTaskServiceCoroutineStub(defaultOwlProducerConfig.channel) @@ -48,17 +50,18 @@ class DefaultOwlProducer(private val defaultOwlProducerConfig: DefaultOwlProduce TaskPublishServiceGrpcKt.TaskPublishServiceCoroutineStub(defaultOwlProducerConfig.channel) } - override suspend fun registerTask(taskDefinition: TaskDefinition) { - defineTaskServiceCoroutineStub.register(taskDefinition { - this.producerId = this@DefaultOwlProducer.producerId - this.name = taskDefinition.name - this.maxRetry = taskDefinition.maxRetry - this.priority = taskDefinition.priority - this.retryPolicy = taskDefinition.retryPolicy - this.timeoutMilli = taskDefinition.timeoutMilli - this.propertyDefinitionHash = taskDefinition.propertyDefinition.hash() - }) + defineTaskServiceCoroutineStub.register( + taskDefinition { + this.producerId = this@DefaultOwlProducer.producerId + this.name = taskDefinition.name + this.maxRetry = taskDefinition.maxRetry + this.priority = taskDefinition.priority + this.retryPolicy = taskDefinition.retryPolicy + this.timeoutMilli = taskDefinition.timeoutMilli + this.propertyDefinitionHash = taskDefinition.propertyDefinition.hash() + } + ) } override suspend fun publishTask(task: T): PublishedTask { @@ -91,4 +94,4 @@ class DefaultOwlProducer(private val defaultOwlProducerConfig: DefaultOwlProduce override suspend fun stop() { defaultOwlProducerConfig.channel.shutdownNow() } -} \ No newline at end of file +} diff --git a/owl/owl-producer/owl-producer-default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerBuilder.kt b/owl/owl-producer/owl-producer-default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerBuilder.kt index 4e45e9f4..cc7f6787 100644 --- a/owl/owl-producer/owl-producer-default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerBuilder.kt +++ b/owl/owl-producer/owl-producer-default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerBuilder.kt @@ -44,4 +44,4 @@ class DefaultOwlProducerBuilder : OwlProducerBuilder publishTask(task: T): PublishedTask { - val taskDefinition = taskMap.getValue(task::class.java) as TaskDefinition val publishTask = application.get().publishTask( @@ -117,4 +116,4 @@ class EmbeddedOwlProducer( override suspend fun stop() { brokerApplication.stop() } -} \ No newline at end of file +} diff --git a/owl/owl-producer/owl-producer-embedded/src/main/kotlin/dev/usbharu/owl/producer/embedded/EmbeddedOwlProducerBuilder.kt b/owl/owl-producer/owl-producer-embedded/src/main/kotlin/dev/usbharu/owl/producer/embedded/EmbeddedOwlProducerBuilder.kt index e92b6fc9..37b75808 100644 --- a/owl/owl-producer/owl-producer-embedded/src/main/kotlin/dev/usbharu/owl/producer/embedded/EmbeddedOwlProducerBuilder.kt +++ b/owl/owl-producer/owl-producer-embedded/src/main/kotlin/dev/usbharu/owl/producer/embedded/EmbeddedOwlProducerBuilder.kt @@ -46,7 +46,6 @@ class EmbeddedOwlProducerBuilder : OwlProducerBuilder