From 5cdf78483db1ad208dc4e2c1813bae5d6d102427 Mon Sep 17 00:00:00 2001 From: usbharu <64310155+usbharu@users.noreply.github.com> Date: Sat, 11 May 2024 19:04:57 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20Consumer=E3=82=92=E8=AA=AD=E3=81=BF?= =?UTF-8?q?=E8=BE=BC=E3=82=93=E3=81=A7=E8=B5=B7=E5=8B=95=E3=81=A7=E3=81=8D?= =?UTF-8?q?=E3=82=8B=E3=82=88=E3=81=86=E3=81=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../hideout/application/config/OwlConfig.kt | 19 +- .../hideout/core/external/job/InboxTask.kt | 21 ++ hideout-core/src/main/resources/log4j2.xml | 16 ++ hideout-worker/build.gradle.kts | 9 + .../usbharu/hideout/SpringTaskRunnerLoader.kt | 26 +++ .../dev/usbharu/hideout/WorkerRunner.kt | 36 ++++ libs.versions.toml | 2 +- .../mongodb/MongodbQueuedTaskRepository.kt | 2 +- .../kotlin/dev/usbharu/owl/broker/Main.kt | 2 +- .../owl/broker/OwlBrokerApplication.kt | 4 +- .../interfaces/grpc/AssignmentTaskService.kt | 46 +++-- .../interfaces/grpc/TaskResultService.kt | 39 ++-- .../DefaultPropertySerializerFactory.kt | 4 +- .../owl/broker/service/QueuedTaskAssigner.kt | 7 +- .../common/property/ObjectPropertyValue.kt | 5 +- .../CustomPropertySerializerFactory.kt | 3 +- .../property/PropertySerializeException.kt | 30 +++ .../common/property/PropertySerializeUtils.kt | 13 +- .../owl/common/property/PropertyValue.kt | 5 + .../usbharu/owl/common/task/TaskDefinition.kt | 6 +- .../dev/usbharu/owl/consumer/Consumer.kt | 182 ++++++++++-------- .../owl/consumer/StandaloneConsumer.kt | 8 +- .../src/main/resources/consumer.properties | 20 ++ .../producer/embedded/EmbeddedOwlProducer.kt | 6 +- .../embedded/EmbeddedOwlProducerConfig.kt | 2 + 25 files changed, 392 insertions(+), 121 deletions(-) create mode 100644 hideout-core/src/main/resources/log4j2.xml create mode 100644 hideout-worker/src/main/kotlin/dev/usbharu/hideout/SpringTaskRunnerLoader.kt create mode 100644 hideout-worker/src/main/kotlin/dev/usbharu/hideout/WorkerRunner.kt create mode 100644 owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeException.kt create mode 100644 owl/owl-consumer/src/main/resources/consumer.properties diff --git a/hideout-core/src/main/kotlin/dev/usbharu/hideout/application/config/OwlConfig.kt b/hideout-core/src/main/kotlin/dev/usbharu/hideout/application/config/OwlConfig.kt index 51d3be21..e5454e23 100644 --- a/hideout-core/src/main/kotlin/dev/usbharu/hideout/application/config/OwlConfig.kt +++ b/hideout-core/src/main/kotlin/dev/usbharu/hideout/application/config/OwlConfig.kt @@ -16,7 +16,9 @@ package dev.usbharu.hideout.application.config +import com.fasterxml.jackson.databind.ObjectMapper import dev.usbharu.owl.broker.ModuleContext +import dev.usbharu.owl.common.property.* import dev.usbharu.owl.common.retry.RetryPolicyFactory import dev.usbharu.owl.producer.api.OWL import dev.usbharu.owl.producer.api.OwlProducer @@ -24,6 +26,7 @@ import dev.usbharu.owl.producer.defaultimpl.DEFAULT import dev.usbharu.owl.producer.embedded.EMBEDDED import dev.usbharu.owl.producer.embedded.EMBEDDED_GRPC import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Qualifier import org.springframework.boot.context.properties.ConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration @@ -32,7 +35,10 @@ import java.util.* @Configuration class OwlConfig(private val producerConfig: ProducerConfig) { @Bean - fun producer(@Autowired(required = false) retryPolicyFactory: RetryPolicyFactory? = null): OwlProducer { + fun producer( + @Autowired(required = false) retryPolicyFactory: RetryPolicyFactory? = null, + @Qualifier("activitypub") objectMapper: ObjectMapper, + ): OwlProducer { return when (producerConfig.mode) { ProducerMode.EMBEDDED -> { OWL(EMBEDDED) { @@ -46,6 +52,17 @@ class OwlConfig(private val producerConfig: ProducerConfig) { if (moduleContext != null) { this.moduleContext = moduleContext } + this.propertySerializerFactory = CustomPropertySerializerFactory( + setOf( + IntegerPropertySerializer(), + StringPropertyValueSerializer(), + DoublePropertySerializer(), + BooleanPropertySerializer(), + LongPropertySerializer(), + FloatPropertySerializer(), + ObjectPropertySerializer(objectMapper), + ) + ) } } diff --git a/hideout-core/src/main/kotlin/dev/usbharu/hideout/core/external/job/InboxTask.kt b/hideout-core/src/main/kotlin/dev/usbharu/hideout/core/external/job/InboxTask.kt index b3e342b0..de6b926f 100644 --- a/hideout-core/src/main/kotlin/dev/usbharu/hideout/core/external/job/InboxTask.kt +++ b/hideout-core/src/main/kotlin/dev/usbharu/hideout/core/external/job/InboxTask.kt @@ -18,6 +18,9 @@ package dev.usbharu.hideout.core.external.job import dev.usbharu.hideout.activitypub.service.common.ActivityType import dev.usbharu.httpsignature.common.HttpRequest +import dev.usbharu.owl.common.property.ObjectPropertyValue +import dev.usbharu.owl.common.property.PropertyValue +import dev.usbharu.owl.common.property.StringPropertyValue import dev.usbharu.owl.common.task.Task import dev.usbharu.owl.common.task.TaskDefinition import org.springframework.stereotype.Component @@ -33,4 +36,22 @@ data class InboxTask( data object InboxTaskDef : TaskDefinition { override val type: Class get() = InboxTask::class.java + + override fun serialize(task: InboxTask): Map> { + return mapOf( + "json" to StringPropertyValue(task.json), + "type" to ObjectPropertyValue(task.type), + "httpRequest" to ObjectPropertyValue(task.httpRequest), + "headers" to ObjectPropertyValue(task.headers), + ) + } + + override fun deserialize(value: Map>): InboxTask { + return InboxTask( + value.getValue("json").value as String, + value.getValue("type").value as ActivityType, + value.getValue("httpRequest").value as HttpRequest, + value.getValue("headers").value as Map>, + ) + } } diff --git a/hideout-core/src/main/resources/log4j2.xml b/hideout-core/src/main/resources/log4j2.xml new file mode 100644 index 00000000..e1d64a3b --- /dev/null +++ b/hideout-core/src/main/resources/log4j2.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/hideout-worker/build.gradle.kts b/hideout-worker/build.gradle.kts index 5ae0e94c..db74eb66 100644 --- a/hideout-worker/build.gradle.kts +++ b/hideout-worker/build.gradle.kts @@ -45,12 +45,21 @@ dependencies { implementation("dev.usbharu:http-signature:1.0.0") implementation("org.springframework.boot:spring-boot-starter") implementation("org.jetbrains.kotlin:kotlin-reflect") + implementation("org.springframework.boot:spring-boot-starter-log4j2") implementation(libs.jackson.databind) implementation(libs.jackson.module.kotlin) + implementation(libs.bundles.coroutines) testImplementation("org.springframework.boot:spring-boot-starter-test") } +configurations { + all { + exclude("org.springframework.boot", "spring-boot-starter-logging") + exclude("ch.qos.logback", "logback-classic") + } +} + tasks.test { useJUnitPlatform() } diff --git a/hideout-worker/src/main/kotlin/dev/usbharu/hideout/SpringTaskRunnerLoader.kt b/hideout-worker/src/main/kotlin/dev/usbharu/hideout/SpringTaskRunnerLoader.kt new file mode 100644 index 00000000..6ff0b4a3 --- /dev/null +++ b/hideout-worker/src/main/kotlin/dev/usbharu/hideout/SpringTaskRunnerLoader.kt @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.hideout + +import dev.usbharu.owl.consumer.TaskRunner +import dev.usbharu.owl.consumer.TaskRunnerLoader +import org.springframework.stereotype.Component + +@Component +class SpringTaskRunnerLoader(private val taskRunners: List) : TaskRunnerLoader { + override fun load(): Map = taskRunners.associateBy { it.name } +} \ No newline at end of file diff --git a/hideout-worker/src/main/kotlin/dev/usbharu/hideout/WorkerRunner.kt b/hideout-worker/src/main/kotlin/dev/usbharu/hideout/WorkerRunner.kt new file mode 100644 index 00000000..012e5ee5 --- /dev/null +++ b/hideout-worker/src/main/kotlin/dev/usbharu/hideout/WorkerRunner.kt @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.hideout + +import dev.usbharu.owl.consumer.StandaloneConsumer +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.launch +import org.springframework.boot.ApplicationArguments +import org.springframework.boot.ApplicationRunner +import org.springframework.stereotype.Component + +@Component +class WorkerRunner(private val springTaskRunnerLoader: SpringTaskRunnerLoader) : ApplicationRunner { + override fun run(args: ApplicationArguments?) { + GlobalScope.launch(Dispatchers.Default) { + val consumer = StandaloneConsumer(taskRunnerLoader = springTaskRunnerLoader) + consumer.init() + consumer.start() + } + } +} \ No newline at end of file diff --git a/libs.versions.toml b/libs.versions.toml index 2d43afd5..85f71822 100644 --- a/libs.versions.toml +++ b/libs.versions.toml @@ -11,7 +11,7 @@ serialization = "1.6.3" kjob = "0.6.0" tika = "2.9.1" owl = "0.0.1" -jackson = "2.17.1" +jackson = "2.15.4" [libraries] diff --git a/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt b/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt index 343b5c21..a20f1d02 100644 --- a/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt +++ b/owl/owl-broker/owl-broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt @@ -66,7 +66,7 @@ class MongodbQueuedTaskRepository( eq(QueuedTaskMongodb::isActive.name, true) ), listOf( - set(QueuedTaskMongodb::assignedConsumer.name, update.assignedConsumer), + set(QueuedTaskMongodb::assignedConsumer.name, update.assignedConsumer?.toString()), set(QueuedTaskMongodb::assignedAt.name, update.assignedAt), set(QueuedTaskMongodb::queuedAt.name, update.queuedAt), set(QueuedTaskMongodb::isActive.name, update.isActive) diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/Main.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/Main.kt index 7f3d71b4..265a9487 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/Main.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/Main.kt @@ -44,7 +44,7 @@ fun main() { DefaultRetryPolicyFactory(mapOf("" to ExponentialRetryPolicy())) } } - modules(module, defaultModule, moduleContext.module()) + modules(defaultModule, module, moduleContext.module()) } val application = koin.koin.get() diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/OwlBrokerApplication.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/OwlBrokerApplication.kt index 66696f23..6ed8527e 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/OwlBrokerApplication.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/OwlBrokerApplication.kt @@ -34,7 +34,8 @@ class OwlBrokerApplication( private val subscribeTaskService: SubscribeTaskService, private val taskPublishService: TaskPublishService, private val taskManagementService: TaskManagementService, - private val taskResultSubscribeService: TaskResultSubscribeService + private val taskResultSubscribeService: TaskResultSubscribeService, + private val taskResultService: TaskResultService, ) { private lateinit var server: Server @@ -47,6 +48,7 @@ class OwlBrokerApplication( .addService(subscribeTaskService) .addService(taskPublishService) .addService(taskResultSubscribeService) + .addService(taskResultService) .build() server.start() diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/AssignmentTaskService.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/AssignmentTaskService.kt index 4004de82..9c71b8e0 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/AssignmentTaskService.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/AssignmentTaskService.kt @@ -24,10 +24,13 @@ import dev.usbharu.owl.broker.external.toUUID import dev.usbharu.owl.broker.service.QueuedTaskAssigner import dev.usbharu.owl.common.property.PropertySerializeUtils import dev.usbharu.owl.common.property.PropertySerializerFactory +import io.grpc.Status +import io.grpc.StatusException import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flatMapMerge import kotlinx.coroutines.flow.map import org.koin.core.annotation.Singleton +import org.slf4j.LoggerFactory import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext @@ -40,19 +43,34 @@ class AssignmentTaskService( AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineImplBase(coroutineContext) { override fun ready(requests: Flow): Flow { - return requests - .flatMapMerge { - queuedTaskAssigner.ready(it.consumerId.toUUID(), it.numberOfConcurrent) - } - .map { - Task.TaskRequest - .newBuilder() - .setName(it.task.name) - .setId(it.task.id.toUUID()) - .setAttempt(it.attempt) - .setQueuedAt(it.queuedAt.toTimestamp()) - .putAllProperties(PropertySerializeUtils.serialize(propertySerializerFactory, it.task.properties)) - .build() - } + + return try { + requests + .flatMapMerge { + queuedTaskAssigner.ready(it.consumerId.toUUID(), it.numberOfConcurrent) + } + .map { + Task.TaskRequest + .newBuilder() + .setName(it.task.name) + .setId(it.task.id.toUUID()) + .setAttempt(it.attempt) + .setQueuedAt(it.queuedAt.toTimestamp()) + .putAllProperties( + PropertySerializeUtils.serialize( + propertySerializerFactory, + it.task.properties + ) + ) + .build() + } + } catch (e: Exception) { + logger.warn("Error while reading requests", e) + throw StatusException(Status.INTERNAL.withDescription("Error while reading requests").withCause(e)) + } + } + + companion object { + private val logger = LoggerFactory.getLogger(AssignmentTaskService::class.java) } } \ No newline at end of file diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultService.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultService.kt index 613480b9..1a82a7ef 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultService.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskResultService.kt @@ -24,13 +24,19 @@ import dev.usbharu.owl.broker.external.toUUID import dev.usbharu.owl.broker.service.TaskManagementService import dev.usbharu.owl.common.property.PropertySerializeUtils import dev.usbharu.owl.common.property.PropertySerializerFactory +import io.grpc.Status +import io.grpc.StatusException +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.onEach +import org.koin.core.annotation.Singleton +import org.slf4j.LoggerFactory import java.util.* import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext +@Singleton class TaskResultService( coroutineContext: CoroutineContext = EmptyCoroutineContext, private val taskManagementService: TaskManagementService, @@ -38,18 +44,29 @@ class TaskResultService( ) : TaskResultServiceGrpcKt.TaskResultServiceCoroutineImplBase(coroutineContext) { override suspend fun tasKResult(requests: Flow): Empty { - requests.onEach { - taskManagementService.queueProcessed( - TaskResult( - id = UUID.randomUUID(), - taskId = it.id.toUUID(), - success = it.success, - attempt = it.attempt, - result = PropertySerializeUtils.deserialize(propertySerializerFactory, it.resultMap), - message = it.message + try { + requests.onEach { + taskManagementService.queueProcessed( + TaskResult( + id = UUID.randomUUID(), + taskId = it.id.toUUID(), + success = it.success, + attempt = it.attempt, + result = PropertySerializeUtils.deserialize(propertySerializerFactory, it.resultMap), + message = it.message + ) ) - ) - }.collect() + }.collect() + } catch (e: CancellationException) { + throw e + } catch (e: Exception) { + logger.warn("Error while executing task results", e) + throw StatusException(Status.INTERNAL.withDescription("Error while executing task results").withCause(e)) + } return Empty.getDefaultInstance() } + + companion object { + private val logger = LoggerFactory.getLogger(TaskResultService::class.java) + } } \ No newline at end of file diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/DefaultPropertySerializerFactory.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/DefaultPropertySerializerFactory.kt index d35c6e06..b1caaf51 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/DefaultPropertySerializerFactory.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/DefaultPropertySerializerFactory.kt @@ -26,6 +26,8 @@ class DefaultPropertySerializerFactory : IntegerPropertySerializer(), StringPropertyValueSerializer(), DoublePropertySerializer(), - BooleanPropertySerializer() + BooleanPropertySerializer(), + LongPropertySerializer(), + FloatPropertySerializer(), ) ) \ No newline at end of file diff --git a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueuedTaskAssigner.kt b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueuedTaskAssigner.kt index da6c1390..a529bbe4 100644 --- a/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueuedTaskAssigner.kt +++ b/owl/owl-broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueuedTaskAssigner.kt @@ -18,10 +18,7 @@ package dev.usbharu.owl.broker.service import dev.usbharu.owl.broker.domain.exception.service.QueueCannotDequeueException import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.* import org.koin.core.annotation.Singleton import org.slf4j.LoggerFactory import java.time.Instant @@ -37,6 +34,7 @@ class QueuedTaskAssignerImpl( private val queueStore: QueueStore ) : QueuedTaskAssigner { override fun ready(consumerId: UUID, numberOfConcurrent: Int): Flow { + logger.trace("Ready {}/{}", numberOfConcurrent, consumerId) return flow { taskManagementService.findAssignableTask(consumerId, numberOfConcurrent) .onEach { @@ -46,6 +44,7 @@ class QueuedTaskAssignerImpl( emit(assignTask) } } + .catch { logger.warn("Failed to assign task {}", consumerId, it) } .collect() } } diff --git a/owl/owl-common/owl-common-serialize-jackson/src/main/kotlin/dev/usbharu/owl/common/property/ObjectPropertyValue.kt b/owl/owl-common/owl-common-serialize-jackson/src/main/kotlin/dev/usbharu/owl/common/property/ObjectPropertyValue.kt index 10682e1d..7d92a350 100644 --- a/owl/owl-common/owl-common-serialize-jackson/src/main/kotlin/dev/usbharu/owl/common/property/ObjectPropertyValue.kt +++ b/owl/owl-common/owl-common-serialize-jackson/src/main/kotlin/dev/usbharu/owl/common/property/ObjectPropertyValue.kt @@ -25,6 +25,7 @@ class ObjectPropertyValue(override val value: Any) : PropertyValue() { class ObjectPropertySerializer(private val objectMapper: ObjectMapper) : PropertySerializer { override fun isSupported(propertyValue: PropertyValue<*>): Boolean { + println(propertyValue::class.java) return propertyValue is ObjectPropertyValue } @@ -39,11 +40,11 @@ class ObjectPropertySerializer(private val objectMapper: ObjectMapper) : Propert } override fun deserialize(string: String): PropertyValue { - +//todo jacksonに読み込ませるStringがjackson:classname:jsonになっているのでjsonだけを読み込ませる return ObjectPropertyValue( objectMapper.readValue( string, - Class.forName(string.substringAfter("jackson:").substringBeforeLast(":")) + Class.forName(string.substringAfter("jackson:").substringBefore(":")) ) ) diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/CustomPropertySerializerFactory.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/CustomPropertySerializerFactory.kt index c1d0537b..00d7a3f3 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/CustomPropertySerializerFactory.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/CustomPropertySerializerFactory.kt @@ -24,7 +24,8 @@ package dev.usbharu.owl.common.property open class CustomPropertySerializerFactory(private val propertySerializers: Set>) : PropertySerializerFactory { override fun factory(propertyValue: PropertyValue): PropertySerializer { - return propertySerializers.first { it.isSupported(propertyValue) } as PropertySerializer + return propertySerializers.firstOrNull { it.isSupported(propertyValue) } as PropertySerializer? + ?: throw IllegalArgumentException("PropertySerializer not found: $propertyValue") } override fun factory(string: String): PropertySerializer<*> { diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeException.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeException.kt new file mode 100644 index 00000000..4acc597d --- /dev/null +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeException.kt @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.common.property + +class PropertySerializeException : RuntimeException { + constructor() : super() + constructor(message: String?) : super(message) + constructor(message: String?, cause: Throwable?) : super(message, cause) + constructor(cause: Throwable?) : super(cause) + constructor(message: String?, cause: Throwable?, enableSuppression: Boolean, writableStackTrace: Boolean) : super( + message, + cause, + enableSuppression, + writableStackTrace + ) +} \ No newline at end of file diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeUtils.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeUtils.kt index 248e63f2..94c411c4 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeUtils.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeUtils.kt @@ -29,9 +29,16 @@ object PropertySerializeUtils { */ fun serialize( serializerFactory: PropertySerializerFactory, - properties: Map> - ): Map = - properties.map { it.key to serializerFactory.factory(it.value).serialize(it.value) }.toMap() + properties: Map>, + ): Map { + return properties.map { + try { + it.key to serializerFactory.factory(it.value).serialize(it.value) + } catch (e: Exception) { + throw PropertySerializeException("Failed to serialize property in ${serializerFactory.javaClass}", e) + } + }.toMap() + } /** * Stringとシリアライズ済みの[PropertyValue]の[Map]からシリアライズ済みの[PropertyValue]をデシリアライズし、Stringと[PropertyValue]の[Map]として返します diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyValue.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyValue.kt index 90910a8c..d04ca229 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyValue.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyValue.kt @@ -31,4 +31,9 @@ abstract class PropertyValue { * プロパティの型 */ abstract val type: PropertyType + override fun toString(): String { + return "PropertyValue(value=$value, type=$type)" + } + + } \ No newline at end of file diff --git a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/task/TaskDefinition.kt b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/task/TaskDefinition.kt index 0b7ec1dd..a4ef6a5b 100644 --- a/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/task/TaskDefinition.kt +++ b/owl/owl-common/src/main/kotlin/dev/usbharu/owl/common/task/TaskDefinition.kt @@ -107,7 +107,11 @@ interface TaskDefinition { */ fun deserialize(value: Map>): T { - val task = type.getDeclaredConstructor().newInstance() + val task = try { + type.getDeclaredConstructor().newInstance() + } catch (e: Exception) { + throw IllegalArgumentException("Unable to deserialize value $value for type ${type.name}", e) + } type.fields.associateBy { it.name }.mapValues { when { diff --git a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt index 777e2d4a..886bacc4 100644 --- a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt +++ b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt @@ -81,86 +81,116 @@ class Consumer( suspend fun start() { coroutineScope = CoroutineScope(Dispatchers.Default) coroutineScope { - taskResultStub - .tasKResult(flow { - assignmentTaskStub - .ready(flow { - while (coroutineScope.isActive) { - val andSet = concurrent.getAndUpdate { 0 } + while (isActive) { + try { + taskResultStub + .tasKResult(flow { + assignmentTaskStub + .ready(flow { + requestTask() + }).onEach { + logger.info("Start Task name: {}", it.name) + processing.update { it + 1 } + try { - if (andSet != 0) { - logger.debug("Request {} tasks.", andSet) - emit(readyRequest { - this.consumerId = consumerId - this.numberOfConcurrent = andSet - }) - continue - } - delay(100) - - concurrent.update { - ((64 - it) - processing.value).coerceIn(0, 64 - max(0, processing.value)) - } - } - }).onEach { - logger.info("Start Task name: {}", it.name) - processing.update { it + 1 } - - try { - - val taskResult = runnerMap.getValue(it.name).run( - TaskRequest( - it.name, - java.util.UUID(it.id.mostSignificantUuidBits, it.id.leastSignificantUuidBits), - it.attempt, - Instant.ofEpochSecond(it.queuedAt.seconds, it.queuedAt.nanos.toLong()), - PropertySerializeUtils.deserialize(propertySerializerFactory, it.propertiesMap) - ) - ) - - emit(taskResult { - this.success = taskResult.success - this.attempt = it.attempt - this.id = it.id - this.result.putAll( - PropertySerializeUtils.serialize( - propertySerializerFactory, taskResult.result + val taskResult = runnerMap.getValue(it.name).run( + TaskRequest( + it.name, + java.util.UUID( + it.id.mostSignificantUuidBits, + it.id.leastSignificantUuidBits + ), + it.attempt, + Instant.ofEpochSecond(it.queuedAt.seconds, it.queuedAt.nanos.toLong()), + PropertySerializeUtils.deserialize( + propertySerializerFactory, + it.propertiesMap + ) + ) ) - ) - this.message = taskResult.message - }) - logger.info("Success execute task. name: {} success: {}", it.name, taskResult.success) - logger.debug("TRACE RESULT {}", taskResult) - } catch (e: CancellationException) { - logger.warn("Cancelled execute task.", e) - emit(taskResult { - this.success = false - this.attempt = it.attempt - this.id = it.id - this.message = e.localizedMessage - }) - throw e - } catch (e: Exception) { - logger.warn("Failed execute task.", e) - emit(taskResult { - this.success = false - this.attempt = it.attempt - this.id = it.id - this.message = e.localizedMessage - }) - } finally { - processing.update { it - 1 } - concurrent.update { - if (it < 64) { - it + 1 - } else { - 64 + + emit(taskResult { + this.success = taskResult.success + this.attempt = it.attempt + this.id = it.id + this.result.putAll( + PropertySerializeUtils.serialize( + propertySerializerFactory, taskResult.result + ) + ) + this.message = taskResult.message + }) + logger.info( + "Success execute task. name: {} success: {}", + it.name, + taskResult.success + ) + logger.debug("TRACE RESULT {}", taskResult) + } catch (e: CancellationException) { + logger.warn("Cancelled execute task.", e) + emit(taskResult { + this.success = false + this.attempt = it.attempt + this.id = it.id + this.message = e.localizedMessage + }) + throw e + } catch (e: Exception) { + logger.warn("Failed execute task.", e) + emit(taskResult { + this.success = false + this.attempt = it.attempt + this.id = it.id + this.message = e.localizedMessage + }) + } finally { + processing.update { it - 1 } + concurrent.update { + if (it < 64) { + it + 1 + } else { + 64 + } + } } - } - } - }.flowOn(Dispatchers.Default).collect() - }) + }.flowOn(Dispatchers.Default).collect() + }) + } catch (e: CancellationException) { + throw e + } catch (e: Exception) { + logger.warn("Consumer error", e) + } + + delay(1000) + } + } + } + + private suspend fun FlowCollector.requestTask() { + while (coroutineScope.isActive) { + val andSet = concurrent.getAndUpdate { 0 } + + + if (andSet != 0) { + logger.debug("Request {} tasks.", andSet) + try { + emit(readyRequest { + this.consumerId = this@Consumer.consumerId + this.numberOfConcurrent = andSet + }) + } catch (e: CancellationException) { + throw e + } catch (e: Exception) { + logger.warn("Failed request task.", e) + } + continue + } + delay(100) + + concurrent.update { + ((64 - it) - processing.value).coerceIn(0, 64 - max(0, processing.value)) + } } } diff --git a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt index 0e54a92a..34f75a05 100644 --- a/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt +++ b/owl/owl-consumer/src/main/kotlin/dev/usbharu/owl/consumer/StandaloneConsumer.kt @@ -52,7 +52,11 @@ class StandaloneConsumer( constructor( propertySerializerFactory: PropertySerializerFactory = CustomPropertySerializerFactory(emptySet()), taskRunnerLoader: TaskRunnerLoader = ServiceLoaderTaskRunnerLoader(), - ) : this(Path.of("consumer.properties"), propertySerializerFactory, taskRunnerLoader) + ) : this( + Path.of(StandaloneConsumer::class.java.getClassLoader().getResource("consumer.properties").toURI()), + propertySerializerFactory, + taskRunnerLoader + ) private val channel = ManagedChannelBuilder.forAddress(config.address, config.port) .usePlaintext() @@ -68,7 +72,7 @@ class StandaloneConsumer( taskResultStub = taskResultStub, taskRunnerLoader = taskRunnerLoader, propertySerializerFactory = propertySerializerFactory, - consumerConfig = ConsumerConfig(config.concurrency) + consumerConfig = ConsumerConfig(config.concurrency), ) /** diff --git a/owl/owl-consumer/src/main/resources/consumer.properties b/owl/owl-consumer/src/main/resources/consumer.properties new file mode 100644 index 00000000..05da7435 --- /dev/null +++ b/owl/owl-consumer/src/main/resources/consumer.properties @@ -0,0 +1,20 @@ +# +# Copyright (C) 2024 usbharu +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +address=localhost +port=50051 +name=owl +hostname=localhost +concurrency=10 \ No newline at end of file diff --git a/owl/owl-producer/owl-producer-embedded/src/main/kotlin/dev/usbharu/owl/producer/embedded/EmbeddedOwlProducer.kt b/owl/owl-producer/owl-producer-embedded/src/main/kotlin/dev/usbharu/owl/producer/embedded/EmbeddedOwlProducer.kt index bdf950bf..0f9c5647 100644 --- a/owl/owl-producer/owl-producer-embedded/src/main/kotlin/dev/usbharu/owl/producer/embedded/EmbeddedOwlProducer.kt +++ b/owl/owl-producer/owl-producer-embedded/src/main/kotlin/dev/usbharu/owl/producer/embedded/EmbeddedOwlProducer.kt @@ -20,6 +20,7 @@ import dev.usbharu.owl.broker.OwlBrokerApplication import dev.usbharu.owl.broker.domain.exception.InvalidRepositoryException import dev.usbharu.owl.broker.domain.model.producer.ProducerRepository import dev.usbharu.owl.broker.service.* +import dev.usbharu.owl.common.property.PropertySerializerFactory import dev.usbharu.owl.common.retry.RetryPolicyFactory import dev.usbharu.owl.common.task.PublishedTask import dev.usbharu.owl.common.task.Task @@ -51,8 +52,11 @@ class EmbeddedOwlProducer( single { embeddedOwlProducerConfig.retryPolicyFactory } + single { + embeddedOwlProducerConfig.propertySerializerFactory + } } - modules(module, defaultModule, embeddedOwlProducerConfig.moduleContext.module()) + modules(defaultModule, module, embeddedOwlProducerConfig.moduleContext.module()) }.koin application.getOrNull() diff --git a/owl/owl-producer/owl-producer-embedded/src/main/kotlin/dev/usbharu/owl/producer/embedded/EmbeddedOwlProducerConfig.kt b/owl/owl-producer/owl-producer-embedded/src/main/kotlin/dev/usbharu/owl/producer/embedded/EmbeddedOwlProducerConfig.kt index 086ad5bc..61e60220 100644 --- a/owl/owl-producer/owl-producer-embedded/src/main/kotlin/dev/usbharu/owl/producer/embedded/EmbeddedOwlProducerConfig.kt +++ b/owl/owl-producer/owl-producer-embedded/src/main/kotlin/dev/usbharu/owl/producer/embedded/EmbeddedOwlProducerConfig.kt @@ -17,12 +17,14 @@ package dev.usbharu.owl.producer.embedded import dev.usbharu.owl.broker.ModuleContext +import dev.usbharu.owl.common.property.CustomPropertySerializerFactory import dev.usbharu.owl.common.retry.RetryPolicyFactory import dev.usbharu.owl.producer.api.OwlProducerConfig class EmbeddedOwlProducerConfig : OwlProducerConfig { lateinit var moduleContext: ModuleContext lateinit var retryPolicyFactory: RetryPolicyFactory + lateinit var propertySerializerFactory: CustomPropertySerializerFactory lateinit var name: String lateinit var port: String }