diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/BooleanPropertyValue.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/BooleanPropertyValue.kt index 0a04685a..b595f31c 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/property/BooleanPropertyValue.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/BooleanPropertyValue.kt @@ -1,10 +1,19 @@ package dev.usbharu.owl.common.property +/** + * Boolean型のプロパティ + * + * @property value プロパティ + */ class BooleanPropertyValue(override val value: Boolean) : PropertyValue() { override val type: PropertyType get() = PropertyType.binary } +/** + * [BooleanPropertyValue]のシリアライザー + * + */ class BooleanPropertySerializer : PropertySerializer { override fun isSupported(propertyValue: PropertyValue<*>): Boolean { return propertyValue.value is Boolean diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/CustomPropertySerializerFactory.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/CustomPropertySerializerFactory.kt index 3f3e8263..c1d0537b 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/property/CustomPropertySerializerFactory.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/CustomPropertySerializerFactory.kt @@ -16,7 +16,11 @@ package dev.usbharu.owl.common.property - +/** + * [Set]でカスタマイズできる[PropertySerializerFactory] + * + * @property propertySerializers [PropertySerializer]の[Set] + */ open class CustomPropertySerializerFactory(private val propertySerializers: Set>) : PropertySerializerFactory { override fun factory(propertyValue: PropertyValue): PropertySerializer { diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/DoublePropertyValue.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/DoublePropertyValue.kt index 13156f20..c201cbaa 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/property/DoublePropertyValue.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/DoublePropertyValue.kt @@ -1,10 +1,19 @@ package dev.usbharu.owl.common.property +/** + * Double型のプロパティ + * + * @property value プロパティ + */ class DoublePropertyValue(override val value: Double) : PropertyValue() { override val type: PropertyType get() = PropertyType.number } +/** + * [DoublePropertyValue]のシリアライザー + * + */ class DoublePropertySerializer : PropertySerializer { override fun isSupported(propertyValue: PropertyValue<*>): Boolean { return propertyValue.value is Double diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/IntegerPropertyValue.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/IntegerPropertyValue.kt index 42782069..9b49c39c 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/property/IntegerPropertyValue.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/IntegerPropertyValue.kt @@ -16,11 +16,20 @@ package dev.usbharu.owl.common.property +/** + * Integer型のプロパティ + * + * @property value プロパティ + */ class IntegerPropertyValue(override val value: Int) : PropertyValue() { override val type: PropertyType get() = PropertyType.number } +/** + * [IntegerPropertyValue]のシリアライザー + * + */ class IntegerPropertySerializer : PropertySerializer { override fun isSupported(propertyValue: PropertyValue<*>): Boolean { return propertyValue.value is Int diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeUtils.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeUtils.kt index 38d7a4b8..248e63f2 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeUtils.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeUtils.kt @@ -16,13 +16,30 @@ package dev.usbharu.owl.common.property +/** + * [PropertySerializer]のユーティリティークラス + */ object PropertySerializeUtils { + /** + * Stringと[PropertyValue]の[Map]から[PropertyValue]をシリアライズし、StringとStringの[Map]として返します + * + * @param serializerFactory シリアライズに使用する[PropertySerializerFactory] + * @param properties シリアライズする[Map] + * @return Stringとシリアライズ済みの[PropertyValue]の[Map] + */ fun serialize( serializerFactory: PropertySerializerFactory, properties: Map> ): Map = properties.map { it.key to serializerFactory.factory(it.value).serialize(it.value) }.toMap() + /** + * Stringとシリアライズ済みの[PropertyValue]の[Map]からシリアライズ済みの[PropertyValue]をデシリアライズし、Stringと[PropertyValue]の[Map]として返します + * + * @param serializerFactory デシリアライズに使用する[PropertySerializerFactory] + * @param properties デシリアライズする[Map] + * @return Stringと[PropertyValue]の[Map] + */ fun deserialize( serializerFactory: PropertySerializerFactory, properties: Map diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializer.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializer.kt index b1c3bb53..950bd9f4 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializer.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializer.kt @@ -16,10 +16,41 @@ package dev.usbharu.owl.common.property +/** + * [PropertyValue]をシリアライズ・デシリアライズします + * + * @param T [PropertyValue]の型 + */ interface PropertySerializer { + /** + * [PropertyValue]をサポートしているかを確認します + * + * @param propertyValue 確認する[PropertyValue] + * @return サポートしている場合true + */ fun isSupported(propertyValue: PropertyValue<*>): Boolean + + /** + * シリアライズ済みの[PropertyValue]から[PropertyValue]をサポートしているかを確認します + * + * @param string 確認するシリアライズ済みの[PropertyValue] + * @return サポートしている場合true + */ fun isSupported(string: String): Boolean + + /** + * [PropertyValue]をシリアライズします + * + * @param propertyValue シリアライズする[PropertyValue] + * @return シリアライズ済みの[PropertyValue] + */ fun serialize(propertyValue: PropertyValue<*>): String + /** + * デシリアライズします + * + * @param string シリアライズ済みの[PropertyValue] + * @return デシリアライズされた[PropertyValue] + */ fun deserialize(string: String): PropertyValue } \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializerFactory.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializerFactory.kt index bc18d270..9983d6cf 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializerFactory.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializerFactory.kt @@ -16,7 +16,25 @@ package dev.usbharu.owl.common.property +/** + * [PropertyValue]のシリアライザーのファクトリ + * + */ interface PropertySerializerFactory { + /** + * [PropertyValue]からシリアライザーを作成します + * + * @param T [PropertyValue]の型 + * @param propertyValue シリアライザーを作成する[PropertyValue] + * @return 作成されたシリアライザー + */ fun factory(propertyValue: PropertyValue): PropertySerializer + + /** + * シリアライズ済みの[PropertyValue]からシリアライザーを作成します + * + * @param string シリアライズ済みの[PropertyValue] + * @return 作成されたシリアライザー + */ fun factory(string: String): PropertySerializer<*> } \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyType.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyType.kt index c9dabae5..4259c62f 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyType.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyType.kt @@ -16,8 +16,26 @@ package dev.usbharu.owl.common.property +/** + * プロパティの型 + * + */ enum class PropertyType { + /** + * 数字 + * + */ number, + + /** + * 文字列 + * + */ string, + + /** + * バイナリ + * + */ binary } \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyValue.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyValue.kt index 440aa356..c251c54f 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyValue.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyValue.kt @@ -16,7 +16,19 @@ package dev.usbharu.owl.common.property +/** + * プロパティで使用される値 + * + * @param T プロパティの型 + */ sealed class PropertyValue { + /** + * プロパティ + */ abstract val value: T + + /** + * プロパティの型 + */ abstract val type: PropertyType } \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/StringPropertyValue.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/StringPropertyValue.kt index a7030d22..5b4cbaa1 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/property/StringPropertyValue.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/StringPropertyValue.kt @@ -1,10 +1,19 @@ package dev.usbharu.owl.common.property +/** + * String型のプロパティ + * + * @property value プロパティ + */ class StringPropertyValue(override val value: String) : PropertyValue() { override val type: PropertyType get() = PropertyType.string } +/** + * [StringPropertyValue]のシリアライザー + * + */ class StringPropertyValueSerializer : PropertySerializer { override fun isSupported(propertyValue: PropertyValue<*>): Boolean { return propertyValue.value is String diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/retry/ExponentialRetryPolicy.kt b/common/src/main/kotlin/dev/usbharu/owl/common/retry/ExponentialRetryPolicy.kt index 753746db..b34cacf5 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/retry/ExponentialRetryPolicy.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/retry/ExponentialRetryPolicy.kt @@ -4,8 +4,14 @@ import java.time.Instant import kotlin.math.pow import kotlin.math.roundToLong +/** + * 指数関数的に待機時間が増えるリトライポリシー + * `firstRetrySeconds x attempt ^ 2 - firstRetrySeconds` + * + * @property firstRetrySeconds + */ class ExponentialRetryPolicy(private val firstRetrySeconds: Int = 30) : RetryPolicy { override fun nextRetry(now: Instant, attempt: Int): Instant = - now.plusSeconds(firstRetrySeconds.times((2.0).pow(attempt).roundToLong()) - 30) + now.plusSeconds(firstRetrySeconds.times((2.0).pow(attempt).roundToLong()) - firstRetrySeconds) } \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/retry/RetryPolicy.kt b/common/src/main/kotlin/dev/usbharu/owl/common/retry/RetryPolicy.kt index 04a73a00..da6e4ec0 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/retry/RetryPolicy.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/retry/RetryPolicy.kt @@ -18,6 +18,19 @@ package dev.usbharu.owl.common.retry import java.time.Instant +/** + * リトライポリシー + * + */ interface RetryPolicy { + /** + * 次のリトライ時刻を返します。 + * + * [attempt]を負の値にしてはいけません + * + * @param now 現在の時刻 + * @param attempt 試行回数 + * @return 次のリトライ時刻 + */ fun nextRetry(now: Instant, attempt: Int): Instant } \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/task/PropertyDefinition.kt b/common/src/main/kotlin/dev/usbharu/owl/common/task/PropertyDefinition.kt index 11f8dcda..cbc96b0b 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/task/PropertyDefinition.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/task/PropertyDefinition.kt @@ -18,7 +18,19 @@ package dev.usbharu.owl.common.task import dev.usbharu.owl.common.property.PropertyType +/** + * プロパティ定義 + * + * @property map プロパティ名とプロパティタイプの[Map] + */ class PropertyDefinition(val map: Map) : Map by map { + /** + * プロパティ定義のハッシュを求めます + * + * ハッシュ値はプロパティ名とプロパティタイプ名を結合したものを結合し、各文字のUTF-16コードと31を掛け続けたものです。 + * + * @return + */ fun hash(): Long { var hash = 1L map.map { it.key + it.value.name }.joinToString("").map { hash *= it.code * 31 } diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/task/PublishedTask.kt b/common/src/main/kotlin/dev/usbharu/owl/common/task/PublishedTask.kt index 57d55ecb..a0f944e5 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/task/PublishedTask.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/task/PublishedTask.kt @@ -17,8 +17,16 @@ package dev.usbharu.owl.common.task import java.time.Instant -import java.util.UUID +import java.util.* +/** + * 公開済みのタスク + * + * @param T タスク + * @property task タスク + * @property id タスクのID + * @property published 公開された時刻 + */ data class PublishedTask( val task: T, val id: UUID, diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/task/Task.kt b/common/src/main/kotlin/dev/usbharu/owl/common/task/Task.kt index 2f196e8e..42d8dc03 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/task/Task.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/task/Task.kt @@ -16,5 +16,9 @@ package dev.usbharu.owl.common.task +/** + * タスク + * + */ open class Task { } \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/task/TaskDefinition.kt b/common/src/main/kotlin/dev/usbharu/owl/common/task/TaskDefinition.kt index d62b94de..b96c12de 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/task/TaskDefinition.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/task/TaskDefinition.kt @@ -18,15 +18,62 @@ package dev.usbharu.owl.common.task import dev.usbharu.owl.common.property.PropertyValue +/** + * タスク定義 + * + * @param T タスク + */ interface TaskDefinition { + /** + * タスク名 + */ val name: String + + /** + * 優先度 + */ val priority: Int + + /** + * 最大リトライ数 + */ val maxRetry: Int + + /** + * リトライポリシー名 + * + * ポリシーの解決は各Brokerに依存しています + */ val retryPolicy: String + + /** + * タスク実行時のタイムアウト(ミリ秒) + */ val timeoutMilli: Long + + /** + * プロパティ定義 + */ val propertyDefinition: PropertyDefinition + + /** + * [Task]の[Class] + */ val type: Class + /** + * タスクをシリアライズします. + * プロパティのシリアライズと混同しないようにしてください。 + * @param task シリアライズするタスク + * @return シリアライズされたタスク + */ fun serialize(task: T): Map> + + /** + * タスクをデシリアライズします。 + * プロパティのデシリアライズと混同しないようにしてください + * @param value デシリアライズするタスク + * @return デシリアライズされたタスク + */ fun deserialize(value: Map>): T } \ No newline at end of file diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt index 1ee51d76..40b44201 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt @@ -16,9 +16,6 @@ package dev.usbharu.owl.consumer -import dev.usbharu.dev.usbharu.owl.consumer.ConsumerConfig -import dev.usbharu.dev.usbharu.owl.consumer.TaskRequest -import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner import dev.usbharu.owl.* import dev.usbharu.owl.Uuid.UUID import dev.usbharu.owl.common.property.PropertySerializeUtils @@ -29,10 +26,23 @@ import org.slf4j.LoggerFactory import java.time.Instant import kotlin.math.max +/** + * Consumer + * + * @property subscribeTaskStub + * @property assignmentTaskStub + * @property taskResultStub + * @property runnerMap + * @property propertySerializerFactory + * @constructor + * TODO + * + * @param consumerConfig + */ class Consumer( - private val subscribeTaskServiceCoroutineStub: SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub, - private val assignmentTaskServiceCoroutineStub: AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub, - private val taskResultServiceCoroutineStub: TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub, + private val subscribeTaskStub: SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub, + private val assignmentTaskStub: AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub, + private val taskResultStub: TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub, private val runnerMap: Map, private val propertySerializerFactory: PropertySerializerFactory, consumerConfig: ConsumerConfig @@ -44,10 +54,17 @@ class Consumer( private val concurrent = MutableStateFlow(consumerConfig.concurrent) private val processing = MutableStateFlow(0) + + /** + * Consumerを初期化します + * + * @param name Consumer名 + * @param hostname Consumerのホスト名 + */ suspend fun init(name: String, hostname: String) { logger.info("Initialize Consumer name: {} hostname: {}", name, hostname) logger.debug("Registered Tasks: {}", runnerMap.keys) - consumerId = subscribeTaskServiceCoroutineStub.subscribeTask(subscribeTaskRequest { + consumerId = subscribeTaskStub.subscribeTask(subscribeTaskRequest { this.name = name this.hostname = hostname this.tasks.addAll(runnerMap.keys) @@ -55,12 +72,16 @@ class Consumer( logger.info("Success initialize consumer. ConsumerID: {}", consumerId) } + /** + * タスクの受付を開始します + * + */ suspend fun start() { coroutineScope = CoroutineScope(Dispatchers.Default) coroutineScope { - taskResultServiceCoroutineStub + taskResultStub .tasKResult(flow { - assignmentTaskServiceCoroutineStub + assignmentTaskStub .ready(flow { while (coroutineScope.isActive) { val andSet = concurrent.getAndUpdate { 0 } @@ -141,6 +162,10 @@ class Consumer( } } + /** + * タスクの受付を停止します + * + */ fun stop() { logger.info("Stop Consumer. consumerID: {}", consumerId) coroutineScope.cancel() diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt index 52bca554..a4609e51 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt @@ -14,8 +14,13 @@ * limitations under the License. */ -package dev.usbharu.dev.usbharu.owl.consumer +package dev.usbharu.owl.consumer +/** + * Consumerの構成 + * + * @property concurrent Consumerのワーカーの同時実行数 + */ data class ConsumerConfig( - val concurrent:Int, + val concurrent: Int ) diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Main.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Main.kt new file mode 100644 index 00000000..31fba291 --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Main.kt @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.consumer + +import kotlinx.coroutines.runBlocking + +fun main() { + val standaloneConsumer = StandaloneConsumer() + + runBlocking { + standaloneConsumer.init() + standaloneConsumer.start() + } + +} \ No newline at end of file diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt new file mode 100644 index 00000000..7d1cf295 --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.consumer + +import dev.usbharu.owl.AssignmentTaskServiceGrpcKt +import dev.usbharu.owl.SubscribeTaskServiceGrpcKt +import dev.usbharu.owl.TaskResultServiceGrpcKt +import dev.usbharu.owl.common.property.CustomPropertySerializerFactory +import dev.usbharu.owl.common.property.PropertySerializerFactory +import io.grpc.ManagedChannelBuilder +import java.nio.file.Path +import java.util.* + +/** + * 単独で起動できるConsumer + * + * @property config Consumerの起動構成 + * @property propertySerializerFactory [dev.usbharu.owl.common.property.PropertyValue]のシリアライザーのファクトリ + */ +class StandaloneConsumer( + private val config: StandaloneConsumerConfig, + private val propertySerializerFactory: PropertySerializerFactory +) { + constructor( + path: Path, + propertySerializerFactory: PropertySerializerFactory = CustomPropertySerializerFactory( + emptySet() + ) + ) : this(StandaloneConsumerConfigLoader.load(path), propertySerializerFactory) + + constructor(string: String) : this(Path.of(string)) + + constructor() : this(Path.of("consumer.properties")) + + private val channel = ManagedChannelBuilder.forAddress(config.address, config.port) + .usePlaintext() + .build() + + private val subscribeStub = SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub(channel) + private val assignmentTaskStub = AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub(channel) + private val taskResultStub = TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub(channel) + + private val taskRunnerMap = ServiceLoader + .load(TaskRunner::class.java) + .associateBy { it.name } + + private val consumer = Consumer( + subscribeTaskStub = subscribeStub, + assignmentTaskStub = assignmentTaskStub, + taskResultStub = taskResultStub, + runnerMap = taskRunnerMap, + propertySerializerFactory = propertySerializerFactory, + consumerConfig = ConsumerConfig(config.concurrency) + ) + + /** + * Consumerを初期化します + * + */ + suspend fun init() { + consumer.init(config.name, config.hostname) + } + + /** + * Consumerのワーカーを起動し、タスクの受付を開始します。 + * + * シャットダウンフックに[stop]が登録されます。 + */ + suspend fun start() { + consumer.start() + Runtime.getRuntime().addShutdownHook(Thread { + consumer.stop() + }) + } + + /** + * Consumerを停止します + * + */ + fun stop() { + consumer.stop() + } + +} \ No newline at end of file diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfig.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfig.kt new file mode 100644 index 00000000..ff31dde8 --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfig.kt @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.consumer + +/** + * 単独で起動できるConsumerの構成 + * + * @property address brokerのアドレス + * @property port brokerのポート + * @property name Consumerの名前 + * @property hostname Consumerのホスト名 + * @property concurrency ConsumerのWorkerの最大同時実行数 + */ +data class StandaloneConsumerConfig( + val address: String, + val port: Int, + val name: String, + val hostname: String, + val concurrency: Int, +) diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfigLoader.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfigLoader.kt new file mode 100644 index 00000000..d11bda43 --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfigLoader.kt @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.consumer + +import java.nio.file.Files +import java.nio.file.Path +import java.util.* + +/** + * 単独で起動できるConsumerの構成のローダー + */ +object StandaloneConsumerConfigLoader { + /** + * [Path]から構成を読み込みます + * + * @param path 読み込むパス + * @return 読み込まれた構成 + */ + fun load(path: Path): StandaloneConsumerConfig { + val properties = Properties() + + properties.load(Files.newInputStream(path)) + + val address = properties.getProperty("address") + val port = properties.getProperty("port").toInt() + val name = properties.getProperty("name") + val hostname = properties.getProperty("hostname") + val concurrency = properties.getProperty("concurrency").toInt() + + return StandaloneConsumerConfig(address, port, name, hostname, concurrency) + } +} \ No newline at end of file diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt index d66e9a73..006856f2 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt @@ -14,12 +14,21 @@ * limitations under the License. */ -package dev.usbharu.dev.usbharu.owl.consumer +package dev.usbharu.owl.consumer import dev.usbharu.owl.common.property.PropertyValue import java.time.Instant import java.util.* +/** + * タスクをConsumerに要求します + * + * @property name タスク名 + * @property id タスクID + * @property attempt 試行回数 + * @property queuedAt タスクがキューに入れられた時間 + * @property properties タスクに渡されたパラメータ + */ data class TaskRequest( val name:String, val id:UUID, diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt index 20858a9b..3e21dfb9 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt @@ -14,10 +14,17 @@ * limitations under the License. */ -package dev.usbharu.dev.usbharu.owl.consumer +package dev.usbharu.owl.consumer import dev.usbharu.owl.common.property.PropertyValue +/** + * タスクの実行結果 + * + * @property success 成功したらtrue + * @property result タスクの実行結果のMap + * @property message その他メッセージ + */ data class TaskResult( val success: Boolean, val result: Map>, diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt index 44513ec1..613b0166 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt @@ -14,8 +14,23 @@ * limitations under the License. */ -package dev.usbharu.dev.usbharu.owl.consumer +package dev.usbharu.owl.consumer -fun interface TaskRunner { - suspend fun run(taskRequest: TaskRequest):TaskResult +/** + * タスクを実行するランナー + * + */ +interface TaskRunner { + /** + * 実行するタスク名 + */ + val name: String + + /** + * タスクを実行する + * + * @param taskRequest 実行するタスク + * @return タスク実行結果 + */ + suspend fun run(taskRequest: TaskRequest): TaskResult } \ No newline at end of file diff --git a/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducer.kt b/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducer.kt index b15d1d28..21495894 100644 --- a/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducer.kt +++ b/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducer.kt @@ -20,10 +20,32 @@ import dev.usbharu.owl.common.task.PublishedTask import dev.usbharu.owl.common.task.Task import dev.usbharu.owl.common.task.TaskDefinition +/** + * タスクを発生させるクライアント + * + */ interface OwlProducer { + /** + * Producerを開始します + * + */ suspend fun start() + /** + * タスク定義を登録します + * + * @param T 登録するタスク + * @param taskDefinition 登録するタスクの定義 + */ suspend fun registerTask(taskDefinition: TaskDefinition) + + /** + * タスクを公開します。タスクは定義済みである必要があります。 + * + * @param T 公開するタスク + * @param task タスクの詳細 + * @return 公開されたタスク + */ suspend fun publishTask(task: T): PublishedTask } diff --git a/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducer.kt b/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducer.kt index 573fba71..33a14e34 100644 --- a/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducer.kt +++ b/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducer.kt @@ -14,7 +14,7 @@ * limitations under the License. */ -package dev.usbharu.dev.usbharu.owl.producer.defaultimpl +package dev.usbharu.owl.producer.defaultimpl import com.google.protobuf.timestamp import dev.usbharu.owl.* diff --git a/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerBuilder.kt b/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerBuilder.kt index 8ebaaf1c..8100088f 100644 --- a/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerBuilder.kt +++ b/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerBuilder.kt @@ -17,6 +17,8 @@ package dev.usbharu.dev.usbharu.owl.producer.defaultimpl import dev.usbharu.owl.producer.api.OwlProducerBuilder +import dev.usbharu.owl.producer.defaultimpl.DefaultOwlProducer +import dev.usbharu.owl.producer.defaultimpl.DefaultOwlProducerConfig import io.grpc.ManagedChannelBuilder class DefaultOwlProducerBuilder : OwlProducerBuilder { diff --git a/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerConfig.kt b/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerConfig.kt index 527e1d04..a5eaddf6 100644 --- a/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerConfig.kt +++ b/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerConfig.kt @@ -14,7 +14,7 @@ * limitations under the License. */ -package dev.usbharu.dev.usbharu.owl.producer.defaultimpl +package dev.usbharu.owl.producer.defaultimpl import dev.usbharu.owl.common.property.PropertySerializerFactory import dev.usbharu.owl.producer.api.OwlProducerConfig