Merge pull request 'consumer' (#2) from consumer into master
Reviewed-on: #2
This commit is contained in:
commit
88574b14c6
|
@ -1,10 +1,19 @@
|
||||||
package dev.usbharu.owl.common.property
|
package dev.usbharu.owl.common.property
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Boolean型のプロパティ
|
||||||
|
*
|
||||||
|
* @property value プロパティ
|
||||||
|
*/
|
||||||
class BooleanPropertyValue(override val value: Boolean) : PropertyValue<Boolean>() {
|
class BooleanPropertyValue(override val value: Boolean) : PropertyValue<Boolean>() {
|
||||||
override val type: PropertyType
|
override val type: PropertyType
|
||||||
get() = PropertyType.binary
|
get() = PropertyType.binary
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* [BooleanPropertyValue]のシリアライザー
|
||||||
|
*
|
||||||
|
*/
|
||||||
class BooleanPropertySerializer : PropertySerializer<Boolean> {
|
class BooleanPropertySerializer : PropertySerializer<Boolean> {
|
||||||
override fun isSupported(propertyValue: PropertyValue<*>): Boolean {
|
override fun isSupported(propertyValue: PropertyValue<*>): Boolean {
|
||||||
return propertyValue.value is Boolean
|
return propertyValue.value is Boolean
|
||||||
|
|
|
@ -16,7 +16,11 @@
|
||||||
|
|
||||||
package dev.usbharu.owl.common.property
|
package dev.usbharu.owl.common.property
|
||||||
|
|
||||||
|
/**
|
||||||
|
* [Set]でカスタマイズできる[PropertySerializerFactory]
|
||||||
|
*
|
||||||
|
* @property propertySerializers [PropertySerializer]の[Set]
|
||||||
|
*/
|
||||||
open class CustomPropertySerializerFactory(private val propertySerializers: Set<PropertySerializer<*>>) :
|
open class CustomPropertySerializerFactory(private val propertySerializers: Set<PropertySerializer<*>>) :
|
||||||
PropertySerializerFactory {
|
PropertySerializerFactory {
|
||||||
override fun <T> factory(propertyValue: PropertyValue<T>): PropertySerializer<T> {
|
override fun <T> factory(propertyValue: PropertyValue<T>): PropertySerializer<T> {
|
||||||
|
|
|
@ -1,10 +1,19 @@
|
||||||
package dev.usbharu.owl.common.property
|
package dev.usbharu.owl.common.property
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Double型のプロパティ
|
||||||
|
*
|
||||||
|
* @property value プロパティ
|
||||||
|
*/
|
||||||
class DoublePropertyValue(override val value: Double) : PropertyValue<Double>() {
|
class DoublePropertyValue(override val value: Double) : PropertyValue<Double>() {
|
||||||
override val type: PropertyType
|
override val type: PropertyType
|
||||||
get() = PropertyType.number
|
get() = PropertyType.number
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* [DoublePropertyValue]のシリアライザー
|
||||||
|
*
|
||||||
|
*/
|
||||||
class DoublePropertySerializer : PropertySerializer<Double> {
|
class DoublePropertySerializer : PropertySerializer<Double> {
|
||||||
override fun isSupported(propertyValue: PropertyValue<*>): Boolean {
|
override fun isSupported(propertyValue: PropertyValue<*>): Boolean {
|
||||||
return propertyValue.value is Double
|
return propertyValue.value is Double
|
||||||
|
|
|
@ -16,11 +16,20 @@
|
||||||
|
|
||||||
package dev.usbharu.owl.common.property
|
package dev.usbharu.owl.common.property
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Integer型のプロパティ
|
||||||
|
*
|
||||||
|
* @property value プロパティ
|
||||||
|
*/
|
||||||
class IntegerPropertyValue(override val value: Int) : PropertyValue<Int>() {
|
class IntegerPropertyValue(override val value: Int) : PropertyValue<Int>() {
|
||||||
override val type: PropertyType
|
override val type: PropertyType
|
||||||
get() = PropertyType.number
|
get() = PropertyType.number
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* [IntegerPropertyValue]のシリアライザー
|
||||||
|
*
|
||||||
|
*/
|
||||||
class IntegerPropertySerializer : PropertySerializer<Int> {
|
class IntegerPropertySerializer : PropertySerializer<Int> {
|
||||||
override fun isSupported(propertyValue: PropertyValue<*>): Boolean {
|
override fun isSupported(propertyValue: PropertyValue<*>): Boolean {
|
||||||
return propertyValue.value is Int
|
return propertyValue.value is Int
|
||||||
|
|
|
@ -16,13 +16,30 @@
|
||||||
|
|
||||||
package dev.usbharu.owl.common.property
|
package dev.usbharu.owl.common.property
|
||||||
|
|
||||||
|
/**
|
||||||
|
* [PropertySerializer]のユーティリティークラス
|
||||||
|
*/
|
||||||
object PropertySerializeUtils {
|
object PropertySerializeUtils {
|
||||||
|
/**
|
||||||
|
* Stringと[PropertyValue]の[Map]から[PropertyValue]をシリアライズし、StringとStringの[Map]として返します
|
||||||
|
*
|
||||||
|
* @param serializerFactory シリアライズに使用する[PropertySerializerFactory]
|
||||||
|
* @param properties シリアライズする[Map]
|
||||||
|
* @return Stringとシリアライズ済みの[PropertyValue]の[Map]
|
||||||
|
*/
|
||||||
fun serialize(
|
fun serialize(
|
||||||
serializerFactory: PropertySerializerFactory,
|
serializerFactory: PropertySerializerFactory,
|
||||||
properties: Map<String, PropertyValue<*>>
|
properties: Map<String, PropertyValue<*>>
|
||||||
): Map<String, String> =
|
): Map<String, String> =
|
||||||
properties.map { it.key to serializerFactory.factory(it.value).serialize(it.value) }.toMap()
|
properties.map { it.key to serializerFactory.factory(it.value).serialize(it.value) }.toMap()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stringとシリアライズ済みの[PropertyValue]の[Map]からシリアライズ済みの[PropertyValue]をデシリアライズし、Stringと[PropertyValue]の[Map]として返します
|
||||||
|
*
|
||||||
|
* @param serializerFactory デシリアライズに使用する[PropertySerializerFactory]
|
||||||
|
* @param properties デシリアライズする[Map]
|
||||||
|
* @return Stringと[PropertyValue]の[Map]
|
||||||
|
*/
|
||||||
fun deserialize(
|
fun deserialize(
|
||||||
serializerFactory: PropertySerializerFactory,
|
serializerFactory: PropertySerializerFactory,
|
||||||
properties: Map<String, String>
|
properties: Map<String, String>
|
||||||
|
|
|
@ -16,10 +16,41 @@
|
||||||
|
|
||||||
package dev.usbharu.owl.common.property
|
package dev.usbharu.owl.common.property
|
||||||
|
|
||||||
|
/**
|
||||||
|
* [PropertyValue]をシリアライズ・デシリアライズします
|
||||||
|
*
|
||||||
|
* @param T [PropertyValue]の型
|
||||||
|
*/
|
||||||
interface PropertySerializer<T> {
|
interface PropertySerializer<T> {
|
||||||
|
/**
|
||||||
|
* [PropertyValue]をサポートしているかを確認します
|
||||||
|
*
|
||||||
|
* @param propertyValue 確認する[PropertyValue]
|
||||||
|
* @return サポートしている場合true
|
||||||
|
*/
|
||||||
fun isSupported(propertyValue: PropertyValue<*>): Boolean
|
fun isSupported(propertyValue: PropertyValue<*>): Boolean
|
||||||
|
|
||||||
|
/**
|
||||||
|
* シリアライズ済みの[PropertyValue]から[PropertyValue]をサポートしているかを確認します
|
||||||
|
*
|
||||||
|
* @param string 確認するシリアライズ済みの[PropertyValue]
|
||||||
|
* @return サポートしている場合true
|
||||||
|
*/
|
||||||
fun isSupported(string: String): Boolean
|
fun isSupported(string: String): Boolean
|
||||||
|
|
||||||
|
/**
|
||||||
|
* [PropertyValue]をシリアライズします
|
||||||
|
*
|
||||||
|
* @param propertyValue シリアライズする[PropertyValue]
|
||||||
|
* @return シリアライズ済みの[PropertyValue]
|
||||||
|
*/
|
||||||
fun serialize(propertyValue: PropertyValue<*>): String
|
fun serialize(propertyValue: PropertyValue<*>): String
|
||||||
|
|
||||||
|
/**
|
||||||
|
* デシリアライズします
|
||||||
|
*
|
||||||
|
* @param string シリアライズ済みの[PropertyValue]
|
||||||
|
* @return デシリアライズされた[PropertyValue]
|
||||||
|
*/
|
||||||
fun deserialize(string: String): PropertyValue<T>
|
fun deserialize(string: String): PropertyValue<T>
|
||||||
}
|
}
|
|
@ -16,7 +16,25 @@
|
||||||
|
|
||||||
package dev.usbharu.owl.common.property
|
package dev.usbharu.owl.common.property
|
||||||
|
|
||||||
|
/**
|
||||||
|
* [PropertyValue]のシリアライザーのファクトリ
|
||||||
|
*
|
||||||
|
*/
|
||||||
interface PropertySerializerFactory {
|
interface PropertySerializerFactory {
|
||||||
|
/**
|
||||||
|
* [PropertyValue]からシリアライザーを作成します
|
||||||
|
*
|
||||||
|
* @param T [PropertyValue]の型
|
||||||
|
* @param propertyValue シリアライザーを作成する[PropertyValue]
|
||||||
|
* @return 作成されたシリアライザー
|
||||||
|
*/
|
||||||
fun <T> factory(propertyValue: PropertyValue<T>): PropertySerializer<T>
|
fun <T> factory(propertyValue: PropertyValue<T>): PropertySerializer<T>
|
||||||
|
|
||||||
|
/**
|
||||||
|
* シリアライズ済みの[PropertyValue]からシリアライザーを作成します
|
||||||
|
*
|
||||||
|
* @param string シリアライズ済みの[PropertyValue]
|
||||||
|
* @return 作成されたシリアライザー
|
||||||
|
*/
|
||||||
fun factory(string: String): PropertySerializer<*>
|
fun factory(string: String): PropertySerializer<*>
|
||||||
}
|
}
|
|
@ -16,8 +16,26 @@
|
||||||
|
|
||||||
package dev.usbharu.owl.common.property
|
package dev.usbharu.owl.common.property
|
||||||
|
|
||||||
|
/**
|
||||||
|
* プロパティの型
|
||||||
|
*
|
||||||
|
*/
|
||||||
enum class PropertyType {
|
enum class PropertyType {
|
||||||
|
/**
|
||||||
|
* 数字
|
||||||
|
*
|
||||||
|
*/
|
||||||
number,
|
number,
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 文字列
|
||||||
|
*
|
||||||
|
*/
|
||||||
string,
|
string,
|
||||||
|
|
||||||
|
/**
|
||||||
|
* バイナリ
|
||||||
|
*
|
||||||
|
*/
|
||||||
binary
|
binary
|
||||||
}
|
}
|
|
@ -16,7 +16,19 @@
|
||||||
|
|
||||||
package dev.usbharu.owl.common.property
|
package dev.usbharu.owl.common.property
|
||||||
|
|
||||||
|
/**
|
||||||
|
* プロパティで使用される値
|
||||||
|
*
|
||||||
|
* @param T プロパティの型
|
||||||
|
*/
|
||||||
sealed class PropertyValue<T> {
|
sealed class PropertyValue<T> {
|
||||||
|
/**
|
||||||
|
* プロパティ
|
||||||
|
*/
|
||||||
abstract val value: T
|
abstract val value: T
|
||||||
|
|
||||||
|
/**
|
||||||
|
* プロパティの型
|
||||||
|
*/
|
||||||
abstract val type: PropertyType
|
abstract val type: PropertyType
|
||||||
}
|
}
|
|
@ -1,10 +1,19 @@
|
||||||
package dev.usbharu.owl.common.property
|
package dev.usbharu.owl.common.property
|
||||||
|
|
||||||
|
/**
|
||||||
|
* String型のプロパティ
|
||||||
|
*
|
||||||
|
* @property value プロパティ
|
||||||
|
*/
|
||||||
class StringPropertyValue(override val value: String) : PropertyValue<String>() {
|
class StringPropertyValue(override val value: String) : PropertyValue<String>() {
|
||||||
override val type: PropertyType
|
override val type: PropertyType
|
||||||
get() = PropertyType.string
|
get() = PropertyType.string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* [StringPropertyValue]のシリアライザー
|
||||||
|
*
|
||||||
|
*/
|
||||||
class StringPropertyValueSerializer : PropertySerializer<String> {
|
class StringPropertyValueSerializer : PropertySerializer<String> {
|
||||||
override fun isSupported(propertyValue: PropertyValue<*>): Boolean {
|
override fun isSupported(propertyValue: PropertyValue<*>): Boolean {
|
||||||
return propertyValue.value is String
|
return propertyValue.value is String
|
||||||
|
|
|
@ -4,8 +4,14 @@ import java.time.Instant
|
||||||
import kotlin.math.pow
|
import kotlin.math.pow
|
||||||
import kotlin.math.roundToLong
|
import kotlin.math.roundToLong
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 指数関数的に待機時間が増えるリトライポリシー
|
||||||
|
* `firstRetrySeconds x attempt ^ 2 - firstRetrySeconds`
|
||||||
|
*
|
||||||
|
* @property firstRetrySeconds
|
||||||
|
*/
|
||||||
class ExponentialRetryPolicy(private val firstRetrySeconds: Int = 30) : RetryPolicy {
|
class ExponentialRetryPolicy(private val firstRetrySeconds: Int = 30) : RetryPolicy {
|
||||||
override fun nextRetry(now: Instant, attempt: Int): Instant =
|
override fun nextRetry(now: Instant, attempt: Int): Instant =
|
||||||
now.plusSeconds(firstRetrySeconds.times((2.0).pow(attempt).roundToLong()) - 30)
|
now.plusSeconds(firstRetrySeconds.times((2.0).pow(attempt).roundToLong()) - firstRetrySeconds)
|
||||||
|
|
||||||
}
|
}
|
|
@ -18,6 +18,19 @@ package dev.usbharu.owl.common.retry
|
||||||
|
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
|
|
||||||
|
/**
|
||||||
|
* リトライポリシー
|
||||||
|
*
|
||||||
|
*/
|
||||||
interface RetryPolicy {
|
interface RetryPolicy {
|
||||||
|
/**
|
||||||
|
* 次のリトライ時刻を返します。
|
||||||
|
*
|
||||||
|
* [attempt]を負の値にしてはいけません
|
||||||
|
*
|
||||||
|
* @param now 現在の時刻
|
||||||
|
* @param attempt 試行回数
|
||||||
|
* @return 次のリトライ時刻
|
||||||
|
*/
|
||||||
fun nextRetry(now: Instant, attempt: Int): Instant
|
fun nextRetry(now: Instant, attempt: Int): Instant
|
||||||
}
|
}
|
|
@ -18,7 +18,19 @@ package dev.usbharu.owl.common.task
|
||||||
|
|
||||||
import dev.usbharu.owl.common.property.PropertyType
|
import dev.usbharu.owl.common.property.PropertyType
|
||||||
|
|
||||||
|
/**
|
||||||
|
* プロパティ定義
|
||||||
|
*
|
||||||
|
* @property map プロパティ名とプロパティタイプの[Map]
|
||||||
|
*/
|
||||||
class PropertyDefinition(val map: Map<String, PropertyType>) : Map<String, PropertyType> by map {
|
class PropertyDefinition(val map: Map<String, PropertyType>) : Map<String, PropertyType> by map {
|
||||||
|
/**
|
||||||
|
* プロパティ定義のハッシュを求めます
|
||||||
|
*
|
||||||
|
* ハッシュ値はプロパティ名とプロパティタイプ名を結合したものを結合し、各文字のUTF-16コードと31を掛け続けたものです。
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
fun hash(): Long {
|
fun hash(): Long {
|
||||||
var hash = 1L
|
var hash = 1L
|
||||||
map.map { it.key + it.value.name }.joinToString("").map { hash *= it.code * 31 }
|
map.map { it.key + it.value.name }.joinToString("").map { hash *= it.code * 31 }
|
||||||
|
|
|
@ -17,8 +17,16 @@
|
||||||
package dev.usbharu.owl.common.task
|
package dev.usbharu.owl.common.task
|
||||||
|
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.UUID
|
import java.util.*
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 公開済みのタスク
|
||||||
|
*
|
||||||
|
* @param T タスク
|
||||||
|
* @property task タスク
|
||||||
|
* @property id タスクのID
|
||||||
|
* @property published 公開された時刻
|
||||||
|
*/
|
||||||
data class PublishedTask<T : Task>(
|
data class PublishedTask<T : Task>(
|
||||||
val task: T,
|
val task: T,
|
||||||
val id: UUID,
|
val id: UUID,
|
||||||
|
|
|
@ -16,5 +16,9 @@
|
||||||
|
|
||||||
package dev.usbharu.owl.common.task
|
package dev.usbharu.owl.common.task
|
||||||
|
|
||||||
|
/**
|
||||||
|
* タスク
|
||||||
|
*
|
||||||
|
*/
|
||||||
open class Task {
|
open class Task {
|
||||||
}
|
}
|
|
@ -18,15 +18,62 @@ package dev.usbharu.owl.common.task
|
||||||
|
|
||||||
import dev.usbharu.owl.common.property.PropertyValue
|
import dev.usbharu.owl.common.property.PropertyValue
|
||||||
|
|
||||||
|
/**
|
||||||
|
* タスク定義
|
||||||
|
*
|
||||||
|
* @param T タスク
|
||||||
|
*/
|
||||||
interface TaskDefinition<T : Task> {
|
interface TaskDefinition<T : Task> {
|
||||||
|
/**
|
||||||
|
* タスク名
|
||||||
|
*/
|
||||||
val name: String
|
val name: String
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 優先度
|
||||||
|
*/
|
||||||
val priority: Int
|
val priority: Int
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 最大リトライ数
|
||||||
|
*/
|
||||||
val maxRetry: Int
|
val maxRetry: Int
|
||||||
|
|
||||||
|
/**
|
||||||
|
* リトライポリシー名
|
||||||
|
*
|
||||||
|
* ポリシーの解決は各Brokerに依存しています
|
||||||
|
*/
|
||||||
val retryPolicy: String
|
val retryPolicy: String
|
||||||
|
|
||||||
|
/**
|
||||||
|
* タスク実行時のタイムアウト(ミリ秒)
|
||||||
|
*/
|
||||||
val timeoutMilli: Long
|
val timeoutMilli: Long
|
||||||
|
|
||||||
|
/**
|
||||||
|
* プロパティ定義
|
||||||
|
*/
|
||||||
val propertyDefinition: PropertyDefinition
|
val propertyDefinition: PropertyDefinition
|
||||||
|
|
||||||
|
/**
|
||||||
|
* [Task]の[Class]
|
||||||
|
*/
|
||||||
val type: Class<T>
|
val type: Class<T>
|
||||||
|
|
||||||
|
/**
|
||||||
|
* タスクをシリアライズします.
|
||||||
|
* プロパティのシリアライズと混同しないようにしてください。
|
||||||
|
* @param task シリアライズするタスク
|
||||||
|
* @return シリアライズされたタスク
|
||||||
|
*/
|
||||||
fun serialize(task: T): Map<String, PropertyValue<*>>
|
fun serialize(task: T): Map<String, PropertyValue<*>>
|
||||||
|
|
||||||
|
/**
|
||||||
|
* タスクをデシリアライズします。
|
||||||
|
* プロパティのデシリアライズと混同しないようにしてください
|
||||||
|
* @param value デシリアライズするタスク
|
||||||
|
* @return デシリアライズされたタスク
|
||||||
|
*/
|
||||||
fun deserialize(value: Map<String, PropertyValue<*>>): T
|
fun deserialize(value: Map<String, PropertyValue<*>>): T
|
||||||
}
|
}
|
|
@ -16,9 +16,6 @@
|
||||||
|
|
||||||
package dev.usbharu.owl.consumer
|
package dev.usbharu.owl.consumer
|
||||||
|
|
||||||
import dev.usbharu.dev.usbharu.owl.consumer.ConsumerConfig
|
|
||||||
import dev.usbharu.dev.usbharu.owl.consumer.TaskRequest
|
|
||||||
import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner
|
|
||||||
import dev.usbharu.owl.*
|
import dev.usbharu.owl.*
|
||||||
import dev.usbharu.owl.Uuid.UUID
|
import dev.usbharu.owl.Uuid.UUID
|
||||||
import dev.usbharu.owl.common.property.PropertySerializeUtils
|
import dev.usbharu.owl.common.property.PropertySerializeUtils
|
||||||
|
@ -29,10 +26,23 @@ import org.slf4j.LoggerFactory
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import kotlin.math.max
|
import kotlin.math.max
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Consumer
|
||||||
|
*
|
||||||
|
* @property subscribeTaskStub
|
||||||
|
* @property assignmentTaskStub
|
||||||
|
* @property taskResultStub
|
||||||
|
* @property runnerMap
|
||||||
|
* @property propertySerializerFactory
|
||||||
|
* @constructor
|
||||||
|
* TODO
|
||||||
|
*
|
||||||
|
* @param consumerConfig
|
||||||
|
*/
|
||||||
class Consumer(
|
class Consumer(
|
||||||
private val subscribeTaskServiceCoroutineStub: SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub,
|
private val subscribeTaskStub: SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub,
|
||||||
private val assignmentTaskServiceCoroutineStub: AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub,
|
private val assignmentTaskStub: AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub,
|
||||||
private val taskResultServiceCoroutineStub: TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub,
|
private val taskResultStub: TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub,
|
||||||
private val runnerMap: Map<String, TaskRunner>,
|
private val runnerMap: Map<String, TaskRunner>,
|
||||||
private val propertySerializerFactory: PropertySerializerFactory,
|
private val propertySerializerFactory: PropertySerializerFactory,
|
||||||
consumerConfig: ConsumerConfig
|
consumerConfig: ConsumerConfig
|
||||||
|
@ -44,10 +54,17 @@ class Consumer(
|
||||||
|
|
||||||
private val concurrent = MutableStateFlow(consumerConfig.concurrent)
|
private val concurrent = MutableStateFlow(consumerConfig.concurrent)
|
||||||
private val processing = MutableStateFlow(0)
|
private val processing = MutableStateFlow(0)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Consumerを初期化します
|
||||||
|
*
|
||||||
|
* @param name Consumer名
|
||||||
|
* @param hostname Consumerのホスト名
|
||||||
|
*/
|
||||||
suspend fun init(name: String, hostname: String) {
|
suspend fun init(name: String, hostname: String) {
|
||||||
logger.info("Initialize Consumer name: {} hostname: {}", name, hostname)
|
logger.info("Initialize Consumer name: {} hostname: {}", name, hostname)
|
||||||
logger.debug("Registered Tasks: {}", runnerMap.keys)
|
logger.debug("Registered Tasks: {}", runnerMap.keys)
|
||||||
consumerId = subscribeTaskServiceCoroutineStub.subscribeTask(subscribeTaskRequest {
|
consumerId = subscribeTaskStub.subscribeTask(subscribeTaskRequest {
|
||||||
this.name = name
|
this.name = name
|
||||||
this.hostname = hostname
|
this.hostname = hostname
|
||||||
this.tasks.addAll(runnerMap.keys)
|
this.tasks.addAll(runnerMap.keys)
|
||||||
|
@ -55,12 +72,16 @@ class Consumer(
|
||||||
logger.info("Success initialize consumer. ConsumerID: {}", consumerId)
|
logger.info("Success initialize consumer. ConsumerID: {}", consumerId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* タスクの受付を開始します
|
||||||
|
*
|
||||||
|
*/
|
||||||
suspend fun start() {
|
suspend fun start() {
|
||||||
coroutineScope = CoroutineScope(Dispatchers.Default)
|
coroutineScope = CoroutineScope(Dispatchers.Default)
|
||||||
coroutineScope {
|
coroutineScope {
|
||||||
taskResultServiceCoroutineStub
|
taskResultStub
|
||||||
.tasKResult(flow {
|
.tasKResult(flow {
|
||||||
assignmentTaskServiceCoroutineStub
|
assignmentTaskStub
|
||||||
.ready(flow {
|
.ready(flow {
|
||||||
while (coroutineScope.isActive) {
|
while (coroutineScope.isActive) {
|
||||||
val andSet = concurrent.getAndUpdate { 0 }
|
val andSet = concurrent.getAndUpdate { 0 }
|
||||||
|
@ -141,6 +162,10 @@ class Consumer(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* タスクの受付を停止します
|
||||||
|
*
|
||||||
|
*/
|
||||||
fun stop() {
|
fun stop() {
|
||||||
logger.info("Stop Consumer. consumerID: {}", consumerId)
|
logger.info("Stop Consumer. consumerID: {}", consumerId)
|
||||||
coroutineScope.cancel()
|
coroutineScope.cancel()
|
||||||
|
|
|
@ -14,8 +14,13 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package dev.usbharu.dev.usbharu.owl.consumer
|
package dev.usbharu.owl.consumer
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Consumerの構成
|
||||||
|
*
|
||||||
|
* @property concurrent Consumerのワーカーの同時実行数
|
||||||
|
*/
|
||||||
data class ConsumerConfig(
|
data class ConsumerConfig(
|
||||||
val concurrent:Int,
|
val concurrent: Int
|
||||||
)
|
)
|
||||||
|
|
|
@ -0,0 +1,29 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2024 usbharu
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package dev.usbharu.owl.consumer
|
||||||
|
|
||||||
|
import kotlinx.coroutines.runBlocking
|
||||||
|
|
||||||
|
fun main() {
|
||||||
|
val standaloneConsumer = StandaloneConsumer()
|
||||||
|
|
||||||
|
runBlocking {
|
||||||
|
standaloneConsumer.init()
|
||||||
|
standaloneConsumer.start()
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,98 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2024 usbharu
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package dev.usbharu.owl.consumer
|
||||||
|
|
||||||
|
import dev.usbharu.owl.AssignmentTaskServiceGrpcKt
|
||||||
|
import dev.usbharu.owl.SubscribeTaskServiceGrpcKt
|
||||||
|
import dev.usbharu.owl.TaskResultServiceGrpcKt
|
||||||
|
import dev.usbharu.owl.common.property.CustomPropertySerializerFactory
|
||||||
|
import dev.usbharu.owl.common.property.PropertySerializerFactory
|
||||||
|
import io.grpc.ManagedChannelBuilder
|
||||||
|
import java.nio.file.Path
|
||||||
|
import java.util.*
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 単独で起動できるConsumer
|
||||||
|
*
|
||||||
|
* @property config Consumerの起動構成
|
||||||
|
* @property propertySerializerFactory [dev.usbharu.owl.common.property.PropertyValue]のシリアライザーのファクトリ
|
||||||
|
*/
|
||||||
|
class StandaloneConsumer(
|
||||||
|
private val config: StandaloneConsumerConfig,
|
||||||
|
private val propertySerializerFactory: PropertySerializerFactory
|
||||||
|
) {
|
||||||
|
constructor(
|
||||||
|
path: Path,
|
||||||
|
propertySerializerFactory: PropertySerializerFactory = CustomPropertySerializerFactory(
|
||||||
|
emptySet()
|
||||||
|
)
|
||||||
|
) : this(StandaloneConsumerConfigLoader.load(path), propertySerializerFactory)
|
||||||
|
|
||||||
|
constructor(string: String) : this(Path.of(string))
|
||||||
|
|
||||||
|
constructor() : this(Path.of("consumer.properties"))
|
||||||
|
|
||||||
|
private val channel = ManagedChannelBuilder.forAddress(config.address, config.port)
|
||||||
|
.usePlaintext()
|
||||||
|
.build()
|
||||||
|
|
||||||
|
private val subscribeStub = SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub(channel)
|
||||||
|
private val assignmentTaskStub = AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub(channel)
|
||||||
|
private val taskResultStub = TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub(channel)
|
||||||
|
|
||||||
|
private val taskRunnerMap = ServiceLoader
|
||||||
|
.load(TaskRunner::class.java)
|
||||||
|
.associateBy { it.name }
|
||||||
|
|
||||||
|
private val consumer = Consumer(
|
||||||
|
subscribeTaskStub = subscribeStub,
|
||||||
|
assignmentTaskStub = assignmentTaskStub,
|
||||||
|
taskResultStub = taskResultStub,
|
||||||
|
runnerMap = taskRunnerMap,
|
||||||
|
propertySerializerFactory = propertySerializerFactory,
|
||||||
|
consumerConfig = ConsumerConfig(config.concurrency)
|
||||||
|
)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Consumerを初期化します
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
suspend fun init() {
|
||||||
|
consumer.init(config.name, config.hostname)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Consumerのワーカーを起動し、タスクの受付を開始します。
|
||||||
|
*
|
||||||
|
* シャットダウンフックに[stop]が登録されます。
|
||||||
|
*/
|
||||||
|
suspend fun start() {
|
||||||
|
consumer.start()
|
||||||
|
Runtime.getRuntime().addShutdownHook(Thread {
|
||||||
|
consumer.stop()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Consumerを停止します
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
fun stop() {
|
||||||
|
consumer.stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2024 usbharu
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package dev.usbharu.owl.consumer
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 単独で起動できるConsumerの構成
|
||||||
|
*
|
||||||
|
* @property address brokerのアドレス
|
||||||
|
* @property port brokerのポート
|
||||||
|
* @property name Consumerの名前
|
||||||
|
* @property hostname Consumerのホスト名
|
||||||
|
* @property concurrency ConsumerのWorkerの最大同時実行数
|
||||||
|
*/
|
||||||
|
data class StandaloneConsumerConfig(
|
||||||
|
val address: String,
|
||||||
|
val port: Int,
|
||||||
|
val name: String,
|
||||||
|
val hostname: String,
|
||||||
|
val concurrency: Int,
|
||||||
|
)
|
|
@ -0,0 +1,46 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2024 usbharu
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package dev.usbharu.owl.consumer
|
||||||
|
|
||||||
|
import java.nio.file.Files
|
||||||
|
import java.nio.file.Path
|
||||||
|
import java.util.*
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 単独で起動できるConsumerの構成のローダー
|
||||||
|
*/
|
||||||
|
object StandaloneConsumerConfigLoader {
|
||||||
|
/**
|
||||||
|
* [Path]から構成を読み込みます
|
||||||
|
*
|
||||||
|
* @param path 読み込むパス
|
||||||
|
* @return 読み込まれた構成
|
||||||
|
*/
|
||||||
|
fun load(path: Path): StandaloneConsumerConfig {
|
||||||
|
val properties = Properties()
|
||||||
|
|
||||||
|
properties.load(Files.newInputStream(path))
|
||||||
|
|
||||||
|
val address = properties.getProperty("address")
|
||||||
|
val port = properties.getProperty("port").toInt()
|
||||||
|
val name = properties.getProperty("name")
|
||||||
|
val hostname = properties.getProperty("hostname")
|
||||||
|
val concurrency = properties.getProperty("concurrency").toInt()
|
||||||
|
|
||||||
|
return StandaloneConsumerConfig(address, port, name, hostname, concurrency)
|
||||||
|
}
|
||||||
|
}
|
|
@ -14,12 +14,21 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package dev.usbharu.dev.usbharu.owl.consumer
|
package dev.usbharu.owl.consumer
|
||||||
|
|
||||||
import dev.usbharu.owl.common.property.PropertyValue
|
import dev.usbharu.owl.common.property.PropertyValue
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
|
/**
|
||||||
|
* タスクをConsumerに要求します
|
||||||
|
*
|
||||||
|
* @property name タスク名
|
||||||
|
* @property id タスクID
|
||||||
|
* @property attempt 試行回数
|
||||||
|
* @property queuedAt タスクがキューに入れられた時間
|
||||||
|
* @property properties タスクに渡されたパラメータ
|
||||||
|
*/
|
||||||
data class TaskRequest(
|
data class TaskRequest(
|
||||||
val name:String,
|
val name:String,
|
||||||
val id:UUID,
|
val id:UUID,
|
||||||
|
|
|
@ -14,10 +14,17 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package dev.usbharu.dev.usbharu.owl.consumer
|
package dev.usbharu.owl.consumer
|
||||||
|
|
||||||
import dev.usbharu.owl.common.property.PropertyValue
|
import dev.usbharu.owl.common.property.PropertyValue
|
||||||
|
|
||||||
|
/**
|
||||||
|
* タスクの実行結果
|
||||||
|
*
|
||||||
|
* @property success 成功したらtrue
|
||||||
|
* @property result タスクの実行結果のMap
|
||||||
|
* @property message その他メッセージ
|
||||||
|
*/
|
||||||
data class TaskResult(
|
data class TaskResult(
|
||||||
val success: Boolean,
|
val success: Boolean,
|
||||||
val result: Map<String, PropertyValue<*>>,
|
val result: Map<String, PropertyValue<*>>,
|
||||||
|
|
|
@ -14,8 +14,23 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package dev.usbharu.dev.usbharu.owl.consumer
|
package dev.usbharu.owl.consumer
|
||||||
|
|
||||||
fun interface TaskRunner {
|
/**
|
||||||
|
* タスクを実行するランナー
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
interface TaskRunner {
|
||||||
|
/**
|
||||||
|
* 実行するタスク名
|
||||||
|
*/
|
||||||
|
val name: String
|
||||||
|
|
||||||
|
/**
|
||||||
|
* タスクを実行する
|
||||||
|
*
|
||||||
|
* @param taskRequest 実行するタスク
|
||||||
|
* @return タスク実行結果
|
||||||
|
*/
|
||||||
suspend fun run(taskRequest: TaskRequest): TaskResult
|
suspend fun run(taskRequest: TaskRequest): TaskResult
|
||||||
}
|
}
|
|
@ -20,10 +20,32 @@ import dev.usbharu.owl.common.task.PublishedTask
|
||||||
import dev.usbharu.owl.common.task.Task
|
import dev.usbharu.owl.common.task.Task
|
||||||
import dev.usbharu.owl.common.task.TaskDefinition
|
import dev.usbharu.owl.common.task.TaskDefinition
|
||||||
|
|
||||||
|
/**
|
||||||
|
* タスクを発生させるクライアント
|
||||||
|
*
|
||||||
|
*/
|
||||||
interface OwlProducer {
|
interface OwlProducer {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Producerを開始します
|
||||||
|
*
|
||||||
|
*/
|
||||||
suspend fun start()
|
suspend fun start()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* タスク定義を登録します
|
||||||
|
*
|
||||||
|
* @param T 登録するタスク
|
||||||
|
* @param taskDefinition 登録するタスクの定義
|
||||||
|
*/
|
||||||
suspend fun <T : Task> registerTask(taskDefinition: TaskDefinition<T>)
|
suspend fun <T : Task> registerTask(taskDefinition: TaskDefinition<T>)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* タスクを公開します。タスクは定義済みである必要があります。
|
||||||
|
*
|
||||||
|
* @param T 公開するタスク
|
||||||
|
* @param task タスクの詳細
|
||||||
|
* @return 公開されたタスク
|
||||||
|
*/
|
||||||
suspend fun <T : Task> publishTask(task: T): PublishedTask<T>
|
suspend fun <T : Task> publishTask(task: T): PublishedTask<T>
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package dev.usbharu.dev.usbharu.owl.producer.defaultimpl
|
package dev.usbharu.owl.producer.defaultimpl
|
||||||
|
|
||||||
import com.google.protobuf.timestamp
|
import com.google.protobuf.timestamp
|
||||||
import dev.usbharu.owl.*
|
import dev.usbharu.owl.*
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
package dev.usbharu.dev.usbharu.owl.producer.defaultimpl
|
package dev.usbharu.dev.usbharu.owl.producer.defaultimpl
|
||||||
|
|
||||||
import dev.usbharu.owl.producer.api.OwlProducerBuilder
|
import dev.usbharu.owl.producer.api.OwlProducerBuilder
|
||||||
|
import dev.usbharu.owl.producer.defaultimpl.DefaultOwlProducer
|
||||||
|
import dev.usbharu.owl.producer.defaultimpl.DefaultOwlProducerConfig
|
||||||
import io.grpc.ManagedChannelBuilder
|
import io.grpc.ManagedChannelBuilder
|
||||||
|
|
||||||
class DefaultOwlProducerBuilder : OwlProducerBuilder<DefaultOwlProducer, DefaultOwlProducerConfig> {
|
class DefaultOwlProducerBuilder : OwlProducerBuilder<DefaultOwlProducer, DefaultOwlProducerConfig> {
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package dev.usbharu.dev.usbharu.owl.producer.defaultimpl
|
package dev.usbharu.owl.producer.defaultimpl
|
||||||
|
|
||||||
import dev.usbharu.owl.common.property.PropertySerializerFactory
|
import dev.usbharu.owl.common.property.PropertySerializerFactory
|
||||||
import dev.usbharu.owl.producer.api.OwlProducerConfig
|
import dev.usbharu.owl.producer.api.OwlProducerConfig
|
||||||
|
|
Loading…
Reference in New Issue