From 17ae665441f2733a85eabc4413e3e0818ec8539a Mon Sep 17 00:00:00 2001 From: usbharu Date: Mon, 15 Apr 2024 11:28:29 +0900 Subject: [PATCH 1/9] =?UTF-8?q?feat:=20wip=20consumer=20Main=E3=81=8B?= =?UTF-8?q?=E3=82=89=E8=B5=B7=E5=8B=95=E3=81=A7=E3=81=8D=E3=82=8B=E3=82=88?= =?UTF-8?q?=E3=81=86=E3=81=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../usbharu/owl/consumer/ConsumerConfig.kt | 2 + .../kotlin/dev/usbharu/owl/consumer/Main.kt | 45 +++++++++++++++++++ 2 files changed, 47 insertions(+) create mode 100644 consumer/src/main/kotlin/dev/usbharu/owl/consumer/Main.kt diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt index 52bca554..3ee08539 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt @@ -18,4 +18,6 @@ package dev.usbharu.dev.usbharu.owl.consumer data class ConsumerConfig( val concurrent:Int, + val address: String, + val port: Int ) diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Main.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Main.kt new file mode 100644 index 00000000..785d2808 --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Main.kt @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.consumer + +import dev.usbharu.dev.usbharu.owl.consumer.ConsumerConfig +import dev.usbharu.owl.AssignmentTaskServiceGrpcKt +import dev.usbharu.owl.SubscribeTaskServiceGrpcKt +import dev.usbharu.owl.TaskResultServiceGrpcKt +import dev.usbharu.owl.common.property.CustomPropertySerializerFactory +import io.grpc.ManagedChannelBuilder +import kotlinx.coroutines.runBlocking + +fun main() { + + val consumerConfig = ConsumerConfig(20, "localhost", 50051) + + val channel = ManagedChannelBuilder.forAddress(consumerConfig.address, consumerConfig.port).usePlaintext().build() + val subscribeTaskServiceCoroutineStub = SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub(channel) + val assignmentTaskServiceCoroutineStub = AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub(channel) + val taskResultServiceCoroutineStub = TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub(channel) + val customPropertySerializerFactory = CustomPropertySerializerFactory(emptySet()) + val consumer = Consumer( + subscribeTaskServiceCoroutineStub, assignmentTaskServiceCoroutineStub, taskResultServiceCoroutineStub, + emptyMap(), customPropertySerializerFactory, consumerConfig + ) + + runBlocking { + consumer.start() + } + +} \ No newline at end of file From 416467b5ec09e20d98d1956a3edfe6e72d4f7cbf Mon Sep 17 00:00:00 2001 From: usbharu Date: Mon, 15 Apr 2024 11:49:05 +0900 Subject: [PATCH 2/9] =?UTF-8?q?feat:=20wip=20consumer=20SPI=E3=81=A7TaskRu?= =?UTF-8?q?nner=E3=82=92=E3=83=AD=E3=83=BC=E3=83=89=E3=81=A7=E3=81=8D?= =?UTF-8?q?=E3=82=8B=E3=82=88=E3=81=86=E3=81=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kotlin/dev/usbharu/owl/consumer/Main.kt | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Main.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Main.kt index 785d2808..fd8ce00b 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Main.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Main.kt @@ -17,29 +17,46 @@ package dev.usbharu.owl.consumer import dev.usbharu.dev.usbharu.owl.consumer.ConsumerConfig +import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner import dev.usbharu.owl.AssignmentTaskServiceGrpcKt import dev.usbharu.owl.SubscribeTaskServiceGrpcKt import dev.usbharu.owl.TaskResultServiceGrpcKt import dev.usbharu.owl.common.property.CustomPropertySerializerFactory import io.grpc.ManagedChannelBuilder import kotlinx.coroutines.runBlocking +import java.util.* fun main() { val consumerConfig = ConsumerConfig(20, "localhost", 50051) val channel = ManagedChannelBuilder.forAddress(consumerConfig.address, consumerConfig.port).usePlaintext().build() - val subscribeTaskServiceCoroutineStub = SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub(channel) - val assignmentTaskServiceCoroutineStub = AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub(channel) - val taskResultServiceCoroutineStub = TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub(channel) + val subscribeStub = SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub(channel) + val assignmentTaskStub = AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub(channel) + val taskResultStub = TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub(channel) val customPropertySerializerFactory = CustomPropertySerializerFactory(emptySet()) + + val taskRunnerMap = ServiceLoader + .load(TaskRunner::class.java) + .associateBy { it::class.qualifiedName!! } + .filterNot { it.key.isBlank() } + val consumer = Consumer( - subscribeTaskServiceCoroutineStub, assignmentTaskServiceCoroutineStub, taskResultServiceCoroutineStub, - emptyMap(), customPropertySerializerFactory, consumerConfig + subscribeStub, + assignmentTaskStub, + taskResultStub, + taskRunnerMap, + customPropertySerializerFactory, + consumerConfig ) runBlocking { + consumer.init("consumer", "consumer-1") consumer.start() + + Runtime.getRuntime().addShutdownHook(Thread { + consumer.stop() + }) } } \ No newline at end of file From 786740e2946109daf6375b822e8e283ee1053851 Mon Sep 17 00:00:00 2001 From: usbharu Date: Mon, 15 Apr 2024 14:46:10 +0900 Subject: [PATCH 3/9] =?UTF-8?q?feat:=20Consumer=E3=81=AE=E3=81=BF=E3=81=A7?= =?UTF-8?q?=E8=B5=B7=E5=8B=95=E3=81=99=E3=82=8B=E3=81=A8=E3=81=8D=E3=81=AE?= =?UTF-8?q?=E3=82=A8=E3=83=B3=E3=83=88=E3=83=AA=E3=83=BC=E3=83=9D=E3=82=A4?= =?UTF-8?q?=E3=83=B3=E3=83=88=E3=82=92=E8=BF=BD=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dev/usbharu/owl/consumer/Consumer.kt | 13 ++- .../usbharu/owl/consumer/ConsumerConfig.kt | 6 +- .../kotlin/dev/usbharu/owl/consumer/Main.kt | 39 +-------- .../owl/consumer/StandaloneConsumer.kt | 80 +++++++++++++++++++ .../owl/consumer/StandaloneConsumerConfig.kt | 25 ++++++ .../StandaloneConsumerConfigLoader.kt | 36 +++++++++ 6 files changed, 152 insertions(+), 47 deletions(-) create mode 100644 consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt create mode 100644 consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfig.kt create mode 100644 consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfigLoader.kt diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt index 1ee51d76..d8a8aadd 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt @@ -16,7 +16,6 @@ package dev.usbharu.owl.consumer -import dev.usbharu.dev.usbharu.owl.consumer.ConsumerConfig import dev.usbharu.dev.usbharu.owl.consumer.TaskRequest import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner import dev.usbharu.owl.* @@ -30,9 +29,9 @@ import java.time.Instant import kotlin.math.max class Consumer( - private val subscribeTaskServiceCoroutineStub: SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub, - private val assignmentTaskServiceCoroutineStub: AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub, - private val taskResultServiceCoroutineStub: TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub, + private val subscribeTaskStub: SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub, + private val assignmentTaskStub: AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub, + private val taskResultStub: TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub, private val runnerMap: Map, private val propertySerializerFactory: PropertySerializerFactory, consumerConfig: ConsumerConfig @@ -47,7 +46,7 @@ class Consumer( suspend fun init(name: String, hostname: String) { logger.info("Initialize Consumer name: {} hostname: {}", name, hostname) logger.debug("Registered Tasks: {}", runnerMap.keys) - consumerId = subscribeTaskServiceCoroutineStub.subscribeTask(subscribeTaskRequest { + consumerId = subscribeTaskStub.subscribeTask(subscribeTaskRequest { this.name = name this.hostname = hostname this.tasks.addAll(runnerMap.keys) @@ -58,9 +57,9 @@ class Consumer( suspend fun start() { coroutineScope = CoroutineScope(Dispatchers.Default) coroutineScope { - taskResultServiceCoroutineStub + taskResultStub .tasKResult(flow { - assignmentTaskServiceCoroutineStub + assignmentTaskStub .ready(flow { while (coroutineScope.isActive) { val andSet = concurrent.getAndUpdate { 0 } diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt index 3ee08539..9c340af2 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt @@ -14,10 +14,8 @@ * limitations under the License. */ -package dev.usbharu.dev.usbharu.owl.consumer +package dev.usbharu.owl.consumer data class ConsumerConfig( - val concurrent:Int, - val address: String, - val port: Int + val concurrent: Int ) diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Main.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Main.kt index fd8ce00b..31fba291 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Main.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Main.kt @@ -16,47 +16,14 @@ package dev.usbharu.owl.consumer -import dev.usbharu.dev.usbharu.owl.consumer.ConsumerConfig -import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner -import dev.usbharu.owl.AssignmentTaskServiceGrpcKt -import dev.usbharu.owl.SubscribeTaskServiceGrpcKt -import dev.usbharu.owl.TaskResultServiceGrpcKt -import dev.usbharu.owl.common.property.CustomPropertySerializerFactory -import io.grpc.ManagedChannelBuilder import kotlinx.coroutines.runBlocking -import java.util.* fun main() { - - val consumerConfig = ConsumerConfig(20, "localhost", 50051) - - val channel = ManagedChannelBuilder.forAddress(consumerConfig.address, consumerConfig.port).usePlaintext().build() - val subscribeStub = SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub(channel) - val assignmentTaskStub = AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub(channel) - val taskResultStub = TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub(channel) - val customPropertySerializerFactory = CustomPropertySerializerFactory(emptySet()) - - val taskRunnerMap = ServiceLoader - .load(TaskRunner::class.java) - .associateBy { it::class.qualifiedName!! } - .filterNot { it.key.isBlank() } - - val consumer = Consumer( - subscribeStub, - assignmentTaskStub, - taskResultStub, - taskRunnerMap, - customPropertySerializerFactory, - consumerConfig - ) + val standaloneConsumer = StandaloneConsumer() runBlocking { - consumer.init("consumer", "consumer-1") - consumer.start() - - Runtime.getRuntime().addShutdownHook(Thread { - consumer.stop() - }) + standaloneConsumer.init() + standaloneConsumer.start() } } \ No newline at end of file diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt new file mode 100644 index 00000000..0c430a21 --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt @@ -0,0 +1,80 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.consumer + +import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner +import dev.usbharu.owl.AssignmentTaskServiceGrpcKt +import dev.usbharu.owl.SubscribeTaskServiceGrpcKt +import dev.usbharu.owl.TaskResultServiceGrpcKt +import dev.usbharu.owl.common.property.CustomPropertySerializerFactory +import dev.usbharu.owl.common.property.PropertySerializerFactory +import io.grpc.ManagedChannelBuilder +import java.nio.file.Path +import java.util.* + +class StandaloneConsumer( + private val config: StandaloneConsumerConfig, + private val propertySerializerFactory: PropertySerializerFactory +) { + constructor( + path: Path, propertySerializerFactory: PropertySerializerFactory = CustomPropertySerializerFactory( + emptySet() + ) + ) : this(StandaloneConsumerConfigLoader.load(path), propertySerializerFactory) + + constructor(string: String) : this(Path.of(string)) + + constructor() : this(Path.of("consumer.properties")) + + private val channel = ManagedChannelBuilder.forAddress(config.address, config.port) + .usePlaintext() + .build() + + private val subscribeStub = SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub(channel) + private val assignmentTaskStub = AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub(channel) + private val taskResultStub = TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub(channel) + + private val taskRunnerMap = ServiceLoader + .load(TaskRunner::class.java) + .associateBy { it::class.qualifiedName!! } + .filterNot { it.key.isBlank() } + + private val consumer = Consumer( + subscribeStub, + assignmentTaskStub, + taskResultStub, + taskRunnerMap, + propertySerializerFactory, + ConsumerConfig(config.concurrency) + ) + + suspend fun init() { + consumer.init(config.name, config.hostname) + } + + suspend fun start() { + consumer.start() + Runtime.getRuntime().addShutdownHook(Thread { + consumer.stop() + }) + } + + fun stop() { + consumer.stop() + } + +} \ No newline at end of file diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfig.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfig.kt new file mode 100644 index 00000000..b551a0f4 --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfig.kt @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.consumer + +data class StandaloneConsumerConfig( + val address: String, + val port: Int, + val name: String, + val hostname: String, + val concurrency: Int, +) diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfigLoader.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfigLoader.kt new file mode 100644 index 00000000..0fff6235 --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfigLoader.kt @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.consumer + +import java.nio.file.Files +import java.nio.file.Path +import java.util.* + +object StandaloneConsumerConfigLoader { + fun load(path: Path): StandaloneConsumerConfig { + val properties = Properties() + + properties.load(Files.newInputStream(path)) + + val address = properties.getProperty("address") + val port = properties.getProperty("port").toInt() + val name = properties.getProperty("name") + val hostname = properties.getProperty("hostname") + + return StandaloneConsumerConfig(address, port, name, hostname) + } +} \ No newline at end of file From 86d0366ab5cd66e6596bd51cc0c108da0e886ea8 Mon Sep 17 00:00:00 2001 From: usbharu Date: Mon, 15 Apr 2024 14:52:01 +0900 Subject: [PATCH 4/9] =?UTF-8?q?fix:=20=E3=83=91=E3=83=A9=E3=83=A1=E3=83=BC?= =?UTF-8?q?=E3=82=BF=E3=83=BC=E3=82=92=E5=BF=98=E3=82=8C=E3=81=A6=E3=81=9F?= =?UTF-8?q?=E3=81=AE=E3=82=92=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dev/usbharu/owl/consumer/StandaloneConsumerConfigLoader.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfigLoader.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfigLoader.kt index 0fff6235..2ee599ca 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfigLoader.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfigLoader.kt @@ -30,7 +30,8 @@ object StandaloneConsumerConfigLoader { val port = properties.getProperty("port").toInt() val name = properties.getProperty("name") val hostname = properties.getProperty("hostname") + val concurrency = properties.getProperty("concurrency").toInt() - return StandaloneConsumerConfig(address, port, name, hostname) + return StandaloneConsumerConfig(address, port, name, hostname, concurrency) } } \ No newline at end of file From acd3555f97b9478da3035684b7234ec5c5f2a416 Mon Sep 17 00:00:00 2001 From: usbharu Date: Mon, 15 Apr 2024 15:05:51 +0900 Subject: [PATCH 5/9] =?UTF-8?q?fix:=20=E3=83=91=E3=83=83=E3=82=B1=E3=83=BC?= =?UTF-8?q?=E3=82=B8=E5=90=8D=E3=82=92=E4=BF=AE=E6=AD=A3=20map=E3=81=AB?= =?UTF-8?q?=E6=A0=BC=E7=B4=8D=E3=81=99=E3=82=8B=E3=81=A8=E3=81=8D=E3=81=AE?= =?UTF-8?q?=E3=82=AD=E3=83=BC=E5=90=8D=E3=82=92=E5=A4=89=E6=9B=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt | 2 -- .../kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt | 4 +--- .../main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt | 2 +- .../src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt | 2 +- .../src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt | 7 ++++--- 5 files changed, 7 insertions(+), 10 deletions(-) diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt index d8a8aadd..0670500e 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt @@ -16,8 +16,6 @@ package dev.usbharu.owl.consumer -import dev.usbharu.dev.usbharu.owl.consumer.TaskRequest -import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner import dev.usbharu.owl.* import dev.usbharu.owl.Uuid.UUID import dev.usbharu.owl.common.property.PropertySerializeUtils diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt index 0c430a21..2bdfd87c 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt @@ -16,7 +16,6 @@ package dev.usbharu.owl.consumer -import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner import dev.usbharu.owl.AssignmentTaskServiceGrpcKt import dev.usbharu.owl.SubscribeTaskServiceGrpcKt import dev.usbharu.owl.TaskResultServiceGrpcKt @@ -50,8 +49,7 @@ class StandaloneConsumer( private val taskRunnerMap = ServiceLoader .load(TaskRunner::class.java) - .associateBy { it::class.qualifiedName!! } - .filterNot { it.key.isBlank() } + .associateBy { it.name } private val consumer = Consumer( subscribeStub, diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt index d66e9a73..db3a3e4c 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt @@ -14,7 +14,7 @@ * limitations under the License. */ -package dev.usbharu.dev.usbharu.owl.consumer +package dev.usbharu.owl.consumer import dev.usbharu.owl.common.property.PropertyValue import java.time.Instant diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt index 20858a9b..0c4f5d33 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt @@ -14,7 +14,7 @@ * limitations under the License. */ -package dev.usbharu.dev.usbharu.owl.consumer +package dev.usbharu.owl.consumer import dev.usbharu.owl.common.property.PropertyValue diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt index 44513ec1..b772101f 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt @@ -14,8 +14,9 @@ * limitations under the License. */ -package dev.usbharu.dev.usbharu.owl.consumer +package dev.usbharu.owl.consumer -fun interface TaskRunner { - suspend fun run(taskRequest: TaskRequest):TaskResult +interface TaskRunner { + val name: String + suspend fun run(taskRequest: TaskRequest): TaskResult } \ No newline at end of file From 5349b1a060f1ef9f1b5f81edeb5cccb22925e53c Mon Sep 17 00:00:00 2001 From: usbharu Date: Mon, 22 Apr 2024 10:58:44 +0900 Subject: [PATCH 6/9] =?UTF-8?q?doc:=20OwlProducer=E3=81=AE=E3=83=89?= =?UTF-8?q?=E3=82=AD=E3=83=A5=E3=83=A1=E3=83=B3=E3=83=88=E3=82=92=E8=BF=BD?= =?UTF-8?q?=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../usbharu/owl/producer/api/OwlProducer.kt | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducer.kt b/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducer.kt index b15d1d28..21495894 100644 --- a/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducer.kt +++ b/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducer.kt @@ -20,10 +20,32 @@ import dev.usbharu.owl.common.task.PublishedTask import dev.usbharu.owl.common.task.Task import dev.usbharu.owl.common.task.TaskDefinition +/** + * タスクを発生させるクライアント + * + */ interface OwlProducer { + /** + * Producerを開始します + * + */ suspend fun start() + /** + * タスク定義を登録します + * + * @param T 登録するタスク + * @param taskDefinition 登録するタスクの定義 + */ suspend fun registerTask(taskDefinition: TaskDefinition) + + /** + * タスクを公開します。タスクは定義済みである必要があります。 + * + * @param T 公開するタスク + * @param task タスクの詳細 + * @return 公開されたタスク + */ suspend fun publishTask(task: T): PublishedTask } From 4e7600d3bf486c27e6a145e4e9240a1096b9b183 Mon Sep 17 00:00:00 2001 From: usbharu Date: Mon, 22 Apr 2024 12:28:37 +0900 Subject: [PATCH 7/9] =?UTF-8?q?doc:=20=E3=83=89=E3=82=AD=E3=83=A5=E3=83=A1?= =?UTF-8?q?=E3=83=B3=E3=83=88=E3=82=92=E8=BF=BD=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../owl/common/property/PropertyValue.kt | 5 +++ .../owl/consumer/StandaloneConsumer.kt | 34 +++++++++++++++---- .../owl/consumer/StandaloneConsumerConfig.kt | 9 +++++ .../StandaloneConsumerConfigLoader.kt | 9 +++++ .../dev/usbharu/owl/consumer/TaskResult.kt | 7 ++++ 5 files changed, 57 insertions(+), 7 deletions(-) diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyValue.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyValue.kt index 440aa356..f5fc04f7 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyValue.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyValue.kt @@ -16,6 +16,11 @@ package dev.usbharu.owl.common.property +/** + * プロパティで使用される値 + * + * @param T プロパティの型 + */ sealed class PropertyValue { abstract val value: T abstract val type: PropertyType diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt index 2bdfd87c..7d1cf295 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt @@ -25,12 +25,19 @@ import io.grpc.ManagedChannelBuilder import java.nio.file.Path import java.util.* +/** + * 単独で起動できるConsumer + * + * @property config Consumerの起動構成 + * @property propertySerializerFactory [dev.usbharu.owl.common.property.PropertyValue]のシリアライザーのファクトリ + */ class StandaloneConsumer( private val config: StandaloneConsumerConfig, private val propertySerializerFactory: PropertySerializerFactory ) { constructor( - path: Path, propertySerializerFactory: PropertySerializerFactory = CustomPropertySerializerFactory( + path: Path, + propertySerializerFactory: PropertySerializerFactory = CustomPropertySerializerFactory( emptySet() ) ) : this(StandaloneConsumerConfigLoader.load(path), propertySerializerFactory) @@ -52,18 +59,27 @@ class StandaloneConsumer( .associateBy { it.name } private val consumer = Consumer( - subscribeStub, - assignmentTaskStub, - taskResultStub, - taskRunnerMap, - propertySerializerFactory, - ConsumerConfig(config.concurrency) + subscribeTaskStub = subscribeStub, + assignmentTaskStub = assignmentTaskStub, + taskResultStub = taskResultStub, + runnerMap = taskRunnerMap, + propertySerializerFactory = propertySerializerFactory, + consumerConfig = ConsumerConfig(config.concurrency) ) + /** + * Consumerを初期化します + * + */ suspend fun init() { consumer.init(config.name, config.hostname) } + /** + * Consumerのワーカーを起動し、タスクの受付を開始します。 + * + * シャットダウンフックに[stop]が登録されます。 + */ suspend fun start() { consumer.start() Runtime.getRuntime().addShutdownHook(Thread { @@ -71,6 +87,10 @@ class StandaloneConsumer( }) } + /** + * Consumerを停止します + * + */ fun stop() { consumer.stop() } diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfig.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfig.kt index b551a0f4..ff31dde8 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfig.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfig.kt @@ -16,6 +16,15 @@ package dev.usbharu.owl.consumer +/** + * 単独で起動できるConsumerの構成 + * + * @property address brokerのアドレス + * @property port brokerのポート + * @property name Consumerの名前 + * @property hostname Consumerのホスト名 + * @property concurrency ConsumerのWorkerの最大同時実行数 + */ data class StandaloneConsumerConfig( val address: String, val port: Int, diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfigLoader.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfigLoader.kt index 2ee599ca..d11bda43 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfigLoader.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfigLoader.kt @@ -20,7 +20,16 @@ import java.nio.file.Files import java.nio.file.Path import java.util.* +/** + * 単独で起動できるConsumerの構成のローダー + */ object StandaloneConsumerConfigLoader { + /** + * [Path]から構成を読み込みます + * + * @param path 読み込むパス + * @return 読み込まれた構成 + */ fun load(path: Path): StandaloneConsumerConfig { val properties = Properties() diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt index 0c4f5d33..3e21dfb9 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt @@ -18,6 +18,13 @@ package dev.usbharu.owl.consumer import dev.usbharu.owl.common.property.PropertyValue +/** + * タスクの実行結果 + * + * @property success 成功したらtrue + * @property result タスクの実行結果のMap + * @property message その他メッセージ + */ data class TaskResult( val success: Boolean, val result: Map>, From 3315fe20e12ead5c2df72d931a8f3c57d1188218 Mon Sep 17 00:00:00 2001 From: usbharu Date: Mon, 22 Apr 2024 14:58:26 +0900 Subject: [PATCH 8/9] =?UTF-8?q?doc:=20=E3=83=89=E3=82=AD=E3=83=A5=E3=83=A1?= =?UTF-8?q?=E3=83=B3=E3=83=88=E3=82=92=E8=BF=BD=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../CustomPropertySerializerFactory.kt | 6 +++- .../owl/common/property/PropertySerializer.kt | 31 +++++++++++++++++++ .../property/PropertySerializerFactory.kt | 18 +++++++++++ .../dev/usbharu/owl/consumer/Consumer.kt | 28 +++++++++++++++++ .../usbharu/owl/consumer/ConsumerConfig.kt | 5 +++ .../dev/usbharu/owl/consumer/TaskRequest.kt | 9 ++++++ .../dev/usbharu/owl/consumer/TaskRunner.kt | 14 +++++++++ .../defaultimpl/DefaultOwlProducer.kt | 2 +- .../defaultimpl/DefaultOwlProducerBuilder.kt | 2 ++ .../defaultimpl/DefaultOwlProducerConfig.kt | 2 +- 10 files changed, 114 insertions(+), 3 deletions(-) diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/CustomPropertySerializerFactory.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/CustomPropertySerializerFactory.kt index 3f3e8263..c1d0537b 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/property/CustomPropertySerializerFactory.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/CustomPropertySerializerFactory.kt @@ -16,7 +16,11 @@ package dev.usbharu.owl.common.property - +/** + * [Set]でカスタマイズできる[PropertySerializerFactory] + * + * @property propertySerializers [PropertySerializer]の[Set] + */ open class CustomPropertySerializerFactory(private val propertySerializers: Set>) : PropertySerializerFactory { override fun factory(propertyValue: PropertyValue): PropertySerializer { diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializer.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializer.kt index b1c3bb53..950bd9f4 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializer.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializer.kt @@ -16,10 +16,41 @@ package dev.usbharu.owl.common.property +/** + * [PropertyValue]をシリアライズ・デシリアライズします + * + * @param T [PropertyValue]の型 + */ interface PropertySerializer { + /** + * [PropertyValue]をサポートしているかを確認します + * + * @param propertyValue 確認する[PropertyValue] + * @return サポートしている場合true + */ fun isSupported(propertyValue: PropertyValue<*>): Boolean + + /** + * シリアライズ済みの[PropertyValue]から[PropertyValue]をサポートしているかを確認します + * + * @param string 確認するシリアライズ済みの[PropertyValue] + * @return サポートしている場合true + */ fun isSupported(string: String): Boolean + + /** + * [PropertyValue]をシリアライズします + * + * @param propertyValue シリアライズする[PropertyValue] + * @return シリアライズ済みの[PropertyValue] + */ fun serialize(propertyValue: PropertyValue<*>): String + /** + * デシリアライズします + * + * @param string シリアライズ済みの[PropertyValue] + * @return デシリアライズされた[PropertyValue] + */ fun deserialize(string: String): PropertyValue } \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializerFactory.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializerFactory.kt index bc18d270..9983d6cf 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializerFactory.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializerFactory.kt @@ -16,7 +16,25 @@ package dev.usbharu.owl.common.property +/** + * [PropertyValue]のシリアライザーのファクトリ + * + */ interface PropertySerializerFactory { + /** + * [PropertyValue]からシリアライザーを作成します + * + * @param T [PropertyValue]の型 + * @param propertyValue シリアライザーを作成する[PropertyValue] + * @return 作成されたシリアライザー + */ fun factory(propertyValue: PropertyValue): PropertySerializer + + /** + * シリアライズ済みの[PropertyValue]からシリアライザーを作成します + * + * @param string シリアライズ済みの[PropertyValue] + * @return 作成されたシリアライザー + */ fun factory(string: String): PropertySerializer<*> } \ No newline at end of file diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt index 0670500e..40b44201 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt @@ -26,6 +26,19 @@ import org.slf4j.LoggerFactory import java.time.Instant import kotlin.math.max +/** + * Consumer + * + * @property subscribeTaskStub + * @property assignmentTaskStub + * @property taskResultStub + * @property runnerMap + * @property propertySerializerFactory + * @constructor + * TODO + * + * @param consumerConfig + */ class Consumer( private val subscribeTaskStub: SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub, private val assignmentTaskStub: AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub, @@ -41,6 +54,13 @@ class Consumer( private val concurrent = MutableStateFlow(consumerConfig.concurrent) private val processing = MutableStateFlow(0) + + /** + * Consumerを初期化します + * + * @param name Consumer名 + * @param hostname Consumerのホスト名 + */ suspend fun init(name: String, hostname: String) { logger.info("Initialize Consumer name: {} hostname: {}", name, hostname) logger.debug("Registered Tasks: {}", runnerMap.keys) @@ -52,6 +72,10 @@ class Consumer( logger.info("Success initialize consumer. ConsumerID: {}", consumerId) } + /** + * タスクの受付を開始します + * + */ suspend fun start() { coroutineScope = CoroutineScope(Dispatchers.Default) coroutineScope { @@ -138,6 +162,10 @@ class Consumer( } } + /** + * タスクの受付を停止します + * + */ fun stop() { logger.info("Stop Consumer. consumerID: {}", consumerId) coroutineScope.cancel() diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt index 9c340af2..a4609e51 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt @@ -16,6 +16,11 @@ package dev.usbharu.owl.consumer +/** + * Consumerの構成 + * + * @property concurrent Consumerのワーカーの同時実行数 + */ data class ConsumerConfig( val concurrent: Int ) diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt index db3a3e4c..006856f2 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt @@ -20,6 +20,15 @@ import dev.usbharu.owl.common.property.PropertyValue import java.time.Instant import java.util.* +/** + * タスクをConsumerに要求します + * + * @property name タスク名 + * @property id タスクID + * @property attempt 試行回数 + * @property queuedAt タスクがキューに入れられた時間 + * @property properties タスクに渡されたパラメータ + */ data class TaskRequest( val name:String, val id:UUID, diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt index b772101f..613b0166 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt @@ -16,7 +16,21 @@ package dev.usbharu.owl.consumer +/** + * タスクを実行するランナー + * + */ interface TaskRunner { + /** + * 実行するタスク名 + */ val name: String + + /** + * タスクを実行する + * + * @param taskRequest 実行するタスク + * @return タスク実行結果 + */ suspend fun run(taskRequest: TaskRequest): TaskResult } \ No newline at end of file diff --git a/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducer.kt b/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducer.kt index 573fba71..33a14e34 100644 --- a/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducer.kt +++ b/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducer.kt @@ -14,7 +14,7 @@ * limitations under the License. */ -package dev.usbharu.dev.usbharu.owl.producer.defaultimpl +package dev.usbharu.owl.producer.defaultimpl import com.google.protobuf.timestamp import dev.usbharu.owl.* diff --git a/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerBuilder.kt b/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerBuilder.kt index 8ebaaf1c..8100088f 100644 --- a/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerBuilder.kt +++ b/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerBuilder.kt @@ -17,6 +17,8 @@ package dev.usbharu.dev.usbharu.owl.producer.defaultimpl import dev.usbharu.owl.producer.api.OwlProducerBuilder +import dev.usbharu.owl.producer.defaultimpl.DefaultOwlProducer +import dev.usbharu.owl.producer.defaultimpl.DefaultOwlProducerConfig import io.grpc.ManagedChannelBuilder class DefaultOwlProducerBuilder : OwlProducerBuilder { diff --git a/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerConfig.kt b/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerConfig.kt index 527e1d04..a5eaddf6 100644 --- a/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerConfig.kt +++ b/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerConfig.kt @@ -14,7 +14,7 @@ * limitations under the License. */ -package dev.usbharu.dev.usbharu.owl.producer.defaultimpl +package dev.usbharu.owl.producer.defaultimpl import dev.usbharu.owl.common.property.PropertySerializerFactory import dev.usbharu.owl.producer.api.OwlProducerConfig From c6e40caf502b8d74a0b1878c1b26c4f657b6aa4b Mon Sep 17 00:00:00 2001 From: usbharu Date: Mon, 22 Apr 2024 15:58:51 +0900 Subject: [PATCH 9/9] =?UTF-8?q?doc:=20=E3=83=89=E3=82=AD=E3=83=A5=E3=83=A1?= =?UTF-8?q?=E3=83=B3=E3=83=88=E3=82=92=E8=BF=BD=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/property/BooleanPropertyValue.kt | 9 ++++ .../common/property/DoublePropertyValue.kt | 9 ++++ .../common/property/IntegerPropertyValue.kt | 9 ++++ .../common/property/PropertySerializeUtils.kt | 17 +++++++ .../owl/common/property/PropertyType.kt | 18 +++++++ .../owl/common/property/PropertyValue.kt | 7 +++ .../common/property/StringPropertyValue.kt | 9 ++++ .../common/retry/ExponentialRetryPolicy.kt | 8 +++- .../usbharu/owl/common/retry/RetryPolicy.kt | 13 +++++ .../owl/common/task/PropertyDefinition.kt | 12 +++++ .../usbharu/owl/common/task/PublishedTask.kt | 10 +++- .../dev/usbharu/owl/common/task/Task.kt | 4 ++ .../usbharu/owl/common/task/TaskDefinition.kt | 47 +++++++++++++++++++ 13 files changed, 170 insertions(+), 2 deletions(-) diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/BooleanPropertyValue.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/BooleanPropertyValue.kt index 0a04685a..b595f31c 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/property/BooleanPropertyValue.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/BooleanPropertyValue.kt @@ -1,10 +1,19 @@ package dev.usbharu.owl.common.property +/** + * Boolean型のプロパティ + * + * @property value プロパティ + */ class BooleanPropertyValue(override val value: Boolean) : PropertyValue() { override val type: PropertyType get() = PropertyType.binary } +/** + * [BooleanPropertyValue]のシリアライザー + * + */ class BooleanPropertySerializer : PropertySerializer { override fun isSupported(propertyValue: PropertyValue<*>): Boolean { return propertyValue.value is Boolean diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/DoublePropertyValue.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/DoublePropertyValue.kt index 13156f20..c201cbaa 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/property/DoublePropertyValue.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/DoublePropertyValue.kt @@ -1,10 +1,19 @@ package dev.usbharu.owl.common.property +/** + * Double型のプロパティ + * + * @property value プロパティ + */ class DoublePropertyValue(override val value: Double) : PropertyValue() { override val type: PropertyType get() = PropertyType.number } +/** + * [DoublePropertyValue]のシリアライザー + * + */ class DoublePropertySerializer : PropertySerializer { override fun isSupported(propertyValue: PropertyValue<*>): Boolean { return propertyValue.value is Double diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/IntegerPropertyValue.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/IntegerPropertyValue.kt index 42782069..9b49c39c 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/property/IntegerPropertyValue.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/IntegerPropertyValue.kt @@ -16,11 +16,20 @@ package dev.usbharu.owl.common.property +/** + * Integer型のプロパティ + * + * @property value プロパティ + */ class IntegerPropertyValue(override val value: Int) : PropertyValue() { override val type: PropertyType get() = PropertyType.number } +/** + * [IntegerPropertyValue]のシリアライザー + * + */ class IntegerPropertySerializer : PropertySerializer { override fun isSupported(propertyValue: PropertyValue<*>): Boolean { return propertyValue.value is Int diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeUtils.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeUtils.kt index 38d7a4b8..248e63f2 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeUtils.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeUtils.kt @@ -16,13 +16,30 @@ package dev.usbharu.owl.common.property +/** + * [PropertySerializer]のユーティリティークラス + */ object PropertySerializeUtils { + /** + * Stringと[PropertyValue]の[Map]から[PropertyValue]をシリアライズし、StringとStringの[Map]として返します + * + * @param serializerFactory シリアライズに使用する[PropertySerializerFactory] + * @param properties シリアライズする[Map] + * @return Stringとシリアライズ済みの[PropertyValue]の[Map] + */ fun serialize( serializerFactory: PropertySerializerFactory, properties: Map> ): Map = properties.map { it.key to serializerFactory.factory(it.value).serialize(it.value) }.toMap() + /** + * Stringとシリアライズ済みの[PropertyValue]の[Map]からシリアライズ済みの[PropertyValue]をデシリアライズし、Stringと[PropertyValue]の[Map]として返します + * + * @param serializerFactory デシリアライズに使用する[PropertySerializerFactory] + * @param properties デシリアライズする[Map] + * @return Stringと[PropertyValue]の[Map] + */ fun deserialize( serializerFactory: PropertySerializerFactory, properties: Map diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyType.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyType.kt index c9dabae5..4259c62f 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyType.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyType.kt @@ -16,8 +16,26 @@ package dev.usbharu.owl.common.property +/** + * プロパティの型 + * + */ enum class PropertyType { + /** + * 数字 + * + */ number, + + /** + * 文字列 + * + */ string, + + /** + * バイナリ + * + */ binary } \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyValue.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyValue.kt index f5fc04f7..c251c54f 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyValue.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyValue.kt @@ -22,6 +22,13 @@ package dev.usbharu.owl.common.property * @param T プロパティの型 */ sealed class PropertyValue { + /** + * プロパティ + */ abstract val value: T + + /** + * プロパティの型 + */ abstract val type: PropertyType } \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/StringPropertyValue.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/StringPropertyValue.kt index a7030d22..5b4cbaa1 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/property/StringPropertyValue.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/StringPropertyValue.kt @@ -1,10 +1,19 @@ package dev.usbharu.owl.common.property +/** + * String型のプロパティ + * + * @property value プロパティ + */ class StringPropertyValue(override val value: String) : PropertyValue() { override val type: PropertyType get() = PropertyType.string } +/** + * [StringPropertyValue]のシリアライザー + * + */ class StringPropertyValueSerializer : PropertySerializer { override fun isSupported(propertyValue: PropertyValue<*>): Boolean { return propertyValue.value is String diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/retry/ExponentialRetryPolicy.kt b/common/src/main/kotlin/dev/usbharu/owl/common/retry/ExponentialRetryPolicy.kt index 753746db..b34cacf5 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/retry/ExponentialRetryPolicy.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/retry/ExponentialRetryPolicy.kt @@ -4,8 +4,14 @@ import java.time.Instant import kotlin.math.pow import kotlin.math.roundToLong +/** + * 指数関数的に待機時間が増えるリトライポリシー + * `firstRetrySeconds x attempt ^ 2 - firstRetrySeconds` + * + * @property firstRetrySeconds + */ class ExponentialRetryPolicy(private val firstRetrySeconds: Int = 30) : RetryPolicy { override fun nextRetry(now: Instant, attempt: Int): Instant = - now.plusSeconds(firstRetrySeconds.times((2.0).pow(attempt).roundToLong()) - 30) + now.plusSeconds(firstRetrySeconds.times((2.0).pow(attempt).roundToLong()) - firstRetrySeconds) } \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/retry/RetryPolicy.kt b/common/src/main/kotlin/dev/usbharu/owl/common/retry/RetryPolicy.kt index 04a73a00..da6e4ec0 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/retry/RetryPolicy.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/retry/RetryPolicy.kt @@ -18,6 +18,19 @@ package dev.usbharu.owl.common.retry import java.time.Instant +/** + * リトライポリシー + * + */ interface RetryPolicy { + /** + * 次のリトライ時刻を返します。 + * + * [attempt]を負の値にしてはいけません + * + * @param now 現在の時刻 + * @param attempt 試行回数 + * @return 次のリトライ時刻 + */ fun nextRetry(now: Instant, attempt: Int): Instant } \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/task/PropertyDefinition.kt b/common/src/main/kotlin/dev/usbharu/owl/common/task/PropertyDefinition.kt index 11f8dcda..cbc96b0b 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/task/PropertyDefinition.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/task/PropertyDefinition.kt @@ -18,7 +18,19 @@ package dev.usbharu.owl.common.task import dev.usbharu.owl.common.property.PropertyType +/** + * プロパティ定義 + * + * @property map プロパティ名とプロパティタイプの[Map] + */ class PropertyDefinition(val map: Map) : Map by map { + /** + * プロパティ定義のハッシュを求めます + * + * ハッシュ値はプロパティ名とプロパティタイプ名を結合したものを結合し、各文字のUTF-16コードと31を掛け続けたものです。 + * + * @return + */ fun hash(): Long { var hash = 1L map.map { it.key + it.value.name }.joinToString("").map { hash *= it.code * 31 } diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/task/PublishedTask.kt b/common/src/main/kotlin/dev/usbharu/owl/common/task/PublishedTask.kt index 57d55ecb..a0f944e5 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/task/PublishedTask.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/task/PublishedTask.kt @@ -17,8 +17,16 @@ package dev.usbharu.owl.common.task import java.time.Instant -import java.util.UUID +import java.util.* +/** + * 公開済みのタスク + * + * @param T タスク + * @property task タスク + * @property id タスクのID + * @property published 公開された時刻 + */ data class PublishedTask( val task: T, val id: UUID, diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/task/Task.kt b/common/src/main/kotlin/dev/usbharu/owl/common/task/Task.kt index 2f196e8e..42d8dc03 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/task/Task.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/task/Task.kt @@ -16,5 +16,9 @@ package dev.usbharu.owl.common.task +/** + * タスク + * + */ open class Task { } \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/task/TaskDefinition.kt b/common/src/main/kotlin/dev/usbharu/owl/common/task/TaskDefinition.kt index d62b94de..b96c12de 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/task/TaskDefinition.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/task/TaskDefinition.kt @@ -18,15 +18,62 @@ package dev.usbharu.owl.common.task import dev.usbharu.owl.common.property.PropertyValue +/** + * タスク定義 + * + * @param T タスク + */ interface TaskDefinition { + /** + * タスク名 + */ val name: String + + /** + * 優先度 + */ val priority: Int + + /** + * 最大リトライ数 + */ val maxRetry: Int + + /** + * リトライポリシー名 + * + * ポリシーの解決は各Brokerに依存しています + */ val retryPolicy: String + + /** + * タスク実行時のタイムアウト(ミリ秒) + */ val timeoutMilli: Long + + /** + * プロパティ定義 + */ val propertyDefinition: PropertyDefinition + + /** + * [Task]の[Class] + */ val type: Class + /** + * タスクをシリアライズします. + * プロパティのシリアライズと混同しないようにしてください。 + * @param task シリアライズするタスク + * @return シリアライズされたタスク + */ fun serialize(task: T): Map> + + /** + * タスクをデシリアライズします。 + * プロパティのデシリアライズと混同しないようにしてください + * @param value デシリアライズするタスク + * @return デシリアライズされたタスク + */ fun deserialize(value: Map>): T } \ No newline at end of file