consumer #2

Merged
usbharu merged 10 commits from consumer into master 2024-04-30 09:26:10 +00:00
5 changed files with 57 additions and 7 deletions
Showing only changes of commit 67e5dd3c1d - Show all commits

View File

@ -16,6 +16,11 @@
package dev.usbharu.owl.common.property
/**
* プロパティで使用される値
*
* @param T プロパティの型
*/
sealed class PropertyValue<T> {
abstract val value: T
abstract val type: PropertyType

View File

@ -25,12 +25,19 @@ import io.grpc.ManagedChannelBuilder
import java.nio.file.Path
import java.util.*
/**
* 単独で起動できるConsumer
*
* @property config Consumerの起動構成
* @property propertySerializerFactory [dev.usbharu.owl.common.property.PropertyValue]のシリアライザーのファクトリ
*/
class StandaloneConsumer(
private val config: StandaloneConsumerConfig,
private val propertySerializerFactory: PropertySerializerFactory
) {
constructor(
path: Path, propertySerializerFactory: PropertySerializerFactory = CustomPropertySerializerFactory(
path: Path,
propertySerializerFactory: PropertySerializerFactory = CustomPropertySerializerFactory(
emptySet()
)
) : this(StandaloneConsumerConfigLoader.load(path), propertySerializerFactory)
@ -52,18 +59,27 @@ class StandaloneConsumer(
.associateBy { it.name }
private val consumer = Consumer(
subscribeStub,
assignmentTaskStub,
taskResultStub,
taskRunnerMap,
propertySerializerFactory,
ConsumerConfig(config.concurrency)
subscribeTaskStub = subscribeStub,
assignmentTaskStub = assignmentTaskStub,
taskResultStub = taskResultStub,
runnerMap = taskRunnerMap,
propertySerializerFactory = propertySerializerFactory,
consumerConfig = ConsumerConfig(config.concurrency)
)
/**
* Consumerを初期化します
*
*/
suspend fun init() {
consumer.init(config.name, config.hostname)
}
/**
* Consumerのワーカーを起動しタスクの受付を開始します
*
* シャットダウンフックに[stop]が登録されます
*/
suspend fun start() {
consumer.start()
Runtime.getRuntime().addShutdownHook(Thread {
@ -71,6 +87,10 @@ class StandaloneConsumer(
})
}
/**
* Consumerを停止します
*
*/
fun stop() {
consumer.stop()
}

View File

@ -16,6 +16,15 @@
package dev.usbharu.owl.consumer
/**
* 単独で起動できるConsumerの構成
*
* @property address brokerのアドレス
* @property port brokerのポート
* @property name Consumerの名前
* @property hostname Consumerのホスト名
* @property concurrency ConsumerのWorkerの最大同時実行数
*/
data class StandaloneConsumerConfig(
val address: String,
val port: Int,

View File

@ -20,7 +20,16 @@ import java.nio.file.Files
import java.nio.file.Path
import java.util.*
/**
* 単独で起動できるConsumerの構成のローダー
*/
object StandaloneConsumerConfigLoader {
/**
* [Path]から構成を読み込みます
*
* @param path 読み込むパス
* @return 読み込まれた構成
*/
fun load(path: Path): StandaloneConsumerConfig {
val properties = Properties()

View File

@ -18,6 +18,13 @@ package dev.usbharu.owl.consumer
import dev.usbharu.owl.common.property.PropertyValue
/**
* タスクの実行結果
*
* @property success 成功したらtrue
* @property result タスクの実行結果のMap
* @property message その他メッセージ
*/
data class TaskResult(
val success: Boolean,
val result: Map<String, PropertyValue<*>>,