diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt index 1ee51d76..d8a8aadd 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt @@ -16,7 +16,6 @@ package dev.usbharu.owl.consumer -import dev.usbharu.dev.usbharu.owl.consumer.ConsumerConfig import dev.usbharu.dev.usbharu.owl.consumer.TaskRequest import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner import dev.usbharu.owl.* @@ -30,9 +29,9 @@ import java.time.Instant import kotlin.math.max class Consumer( - private val subscribeTaskServiceCoroutineStub: SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub, - private val assignmentTaskServiceCoroutineStub: AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub, - private val taskResultServiceCoroutineStub: TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub, + private val subscribeTaskStub: SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub, + private val assignmentTaskStub: AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub, + private val taskResultStub: TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub, private val runnerMap: Map, private val propertySerializerFactory: PropertySerializerFactory, consumerConfig: ConsumerConfig @@ -47,7 +46,7 @@ class Consumer( suspend fun init(name: String, hostname: String) { logger.info("Initialize Consumer name: {} hostname: {}", name, hostname) logger.debug("Registered Tasks: {}", runnerMap.keys) - consumerId = subscribeTaskServiceCoroutineStub.subscribeTask(subscribeTaskRequest { + consumerId = subscribeTaskStub.subscribeTask(subscribeTaskRequest { this.name = name this.hostname = hostname this.tasks.addAll(runnerMap.keys) @@ -58,9 +57,9 @@ class Consumer( suspend fun start() { coroutineScope = CoroutineScope(Dispatchers.Default) coroutineScope { - taskResultServiceCoroutineStub + taskResultStub .tasKResult(flow { - assignmentTaskServiceCoroutineStub + assignmentTaskStub .ready(flow { while (coroutineScope.isActive) { val andSet = concurrent.getAndUpdate { 0 } diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt index 3ee08539..9c340af2 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt @@ -14,10 +14,8 @@ * limitations under the License. */ -package dev.usbharu.dev.usbharu.owl.consumer +package dev.usbharu.owl.consumer data class ConsumerConfig( - val concurrent:Int, - val address: String, - val port: Int + val concurrent: Int ) diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Main.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Main.kt index fd8ce00b..31fba291 100644 --- a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Main.kt +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Main.kt @@ -16,47 +16,14 @@ package dev.usbharu.owl.consumer -import dev.usbharu.dev.usbharu.owl.consumer.ConsumerConfig -import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner -import dev.usbharu.owl.AssignmentTaskServiceGrpcKt -import dev.usbharu.owl.SubscribeTaskServiceGrpcKt -import dev.usbharu.owl.TaskResultServiceGrpcKt -import dev.usbharu.owl.common.property.CustomPropertySerializerFactory -import io.grpc.ManagedChannelBuilder import kotlinx.coroutines.runBlocking -import java.util.* fun main() { - - val consumerConfig = ConsumerConfig(20, "localhost", 50051) - - val channel = ManagedChannelBuilder.forAddress(consumerConfig.address, consumerConfig.port).usePlaintext().build() - val subscribeStub = SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub(channel) - val assignmentTaskStub = AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub(channel) - val taskResultStub = TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub(channel) - val customPropertySerializerFactory = CustomPropertySerializerFactory(emptySet()) - - val taskRunnerMap = ServiceLoader - .load(TaskRunner::class.java) - .associateBy { it::class.qualifiedName!! } - .filterNot { it.key.isBlank() } - - val consumer = Consumer( - subscribeStub, - assignmentTaskStub, - taskResultStub, - taskRunnerMap, - customPropertySerializerFactory, - consumerConfig - ) + val standaloneConsumer = StandaloneConsumer() runBlocking { - consumer.init("consumer", "consumer-1") - consumer.start() - - Runtime.getRuntime().addShutdownHook(Thread { - consumer.stop() - }) + standaloneConsumer.init() + standaloneConsumer.start() } } \ No newline at end of file diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt new file mode 100644 index 00000000..0c430a21 --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt @@ -0,0 +1,80 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.consumer + +import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner +import dev.usbharu.owl.AssignmentTaskServiceGrpcKt +import dev.usbharu.owl.SubscribeTaskServiceGrpcKt +import dev.usbharu.owl.TaskResultServiceGrpcKt +import dev.usbharu.owl.common.property.CustomPropertySerializerFactory +import dev.usbharu.owl.common.property.PropertySerializerFactory +import io.grpc.ManagedChannelBuilder +import java.nio.file.Path +import java.util.* + +class StandaloneConsumer( + private val config: StandaloneConsumerConfig, + private val propertySerializerFactory: PropertySerializerFactory +) { + constructor( + path: Path, propertySerializerFactory: PropertySerializerFactory = CustomPropertySerializerFactory( + emptySet() + ) + ) : this(StandaloneConsumerConfigLoader.load(path), propertySerializerFactory) + + constructor(string: String) : this(Path.of(string)) + + constructor() : this(Path.of("consumer.properties")) + + private val channel = ManagedChannelBuilder.forAddress(config.address, config.port) + .usePlaintext() + .build() + + private val subscribeStub = SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub(channel) + private val assignmentTaskStub = AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub(channel) + private val taskResultStub = TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub(channel) + + private val taskRunnerMap = ServiceLoader + .load(TaskRunner::class.java) + .associateBy { it::class.qualifiedName!! } + .filterNot { it.key.isBlank() } + + private val consumer = Consumer( + subscribeStub, + assignmentTaskStub, + taskResultStub, + taskRunnerMap, + propertySerializerFactory, + ConsumerConfig(config.concurrency) + ) + + suspend fun init() { + consumer.init(config.name, config.hostname) + } + + suspend fun start() { + consumer.start() + Runtime.getRuntime().addShutdownHook(Thread { + consumer.stop() + }) + } + + fun stop() { + consumer.stop() + } + +} \ No newline at end of file diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfig.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfig.kt new file mode 100644 index 00000000..b551a0f4 --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfig.kt @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.consumer + +data class StandaloneConsumerConfig( + val address: String, + val port: Int, + val name: String, + val hostname: String, + val concurrency: Int, +) diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfigLoader.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfigLoader.kt new file mode 100644 index 00000000..0fff6235 --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumerConfigLoader.kt @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.consumer + +import java.nio.file.Files +import java.nio.file.Path +import java.util.* + +object StandaloneConsumerConfigLoader { + fun load(path: Path): StandaloneConsumerConfig { + val properties = Properties() + + properties.load(Files.newInputStream(path)) + + val address = properties.getProperty("address") + val port = properties.getProperty("port").toInt() + val name = properties.getProperty("name") + val hostname = properties.getProperty("hostname") + + return StandaloneConsumerConfig(address, port, name, hostname) + } +} \ No newline at end of file