feat: Consumerを読み込んで起動できるように

This commit is contained in:
usbharu 2024-05-11 19:04:57 +09:00
parent 918de02c86
commit 5cdf78483d
25 changed files with 392 additions and 121 deletions

View File

@ -16,7 +16,9 @@
package dev.usbharu.hideout.application.config package dev.usbharu.hideout.application.config
import com.fasterxml.jackson.databind.ObjectMapper
import dev.usbharu.owl.broker.ModuleContext import dev.usbharu.owl.broker.ModuleContext
import dev.usbharu.owl.common.property.*
import dev.usbharu.owl.common.retry.RetryPolicyFactory import dev.usbharu.owl.common.retry.RetryPolicyFactory
import dev.usbharu.owl.producer.api.OWL import dev.usbharu.owl.producer.api.OWL
import dev.usbharu.owl.producer.api.OwlProducer import dev.usbharu.owl.producer.api.OwlProducer
@ -24,6 +26,7 @@ import dev.usbharu.owl.producer.defaultimpl.DEFAULT
import dev.usbharu.owl.producer.embedded.EMBEDDED import dev.usbharu.owl.producer.embedded.EMBEDDED
import dev.usbharu.owl.producer.embedded.EMBEDDED_GRPC import dev.usbharu.owl.producer.embedded.EMBEDDED_GRPC
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.boot.context.properties.ConfigurationProperties import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.Configuration
@ -32,7 +35,10 @@ import java.util.*
@Configuration @Configuration
class OwlConfig(private val producerConfig: ProducerConfig) { class OwlConfig(private val producerConfig: ProducerConfig) {
@Bean @Bean
fun producer(@Autowired(required = false) retryPolicyFactory: RetryPolicyFactory? = null): OwlProducer { fun producer(
@Autowired(required = false) retryPolicyFactory: RetryPolicyFactory? = null,
@Qualifier("activitypub") objectMapper: ObjectMapper,
): OwlProducer {
return when (producerConfig.mode) { return when (producerConfig.mode) {
ProducerMode.EMBEDDED -> { ProducerMode.EMBEDDED -> {
OWL(EMBEDDED) { OWL(EMBEDDED) {
@ -46,6 +52,17 @@ class OwlConfig(private val producerConfig: ProducerConfig) {
if (moduleContext != null) { if (moduleContext != null) {
this.moduleContext = moduleContext this.moduleContext = moduleContext
} }
this.propertySerializerFactory = CustomPropertySerializerFactory(
setOf(
IntegerPropertySerializer(),
StringPropertyValueSerializer(),
DoublePropertySerializer(),
BooleanPropertySerializer(),
LongPropertySerializer(),
FloatPropertySerializer(),
ObjectPropertySerializer(objectMapper),
)
)
} }
} }

View File

@ -18,6 +18,9 @@ package dev.usbharu.hideout.core.external.job
import dev.usbharu.hideout.activitypub.service.common.ActivityType import dev.usbharu.hideout.activitypub.service.common.ActivityType
import dev.usbharu.httpsignature.common.HttpRequest import dev.usbharu.httpsignature.common.HttpRequest
import dev.usbharu.owl.common.property.ObjectPropertyValue
import dev.usbharu.owl.common.property.PropertyValue
import dev.usbharu.owl.common.property.StringPropertyValue
import dev.usbharu.owl.common.task.Task import dev.usbharu.owl.common.task.Task
import dev.usbharu.owl.common.task.TaskDefinition import dev.usbharu.owl.common.task.TaskDefinition
import org.springframework.stereotype.Component import org.springframework.stereotype.Component
@ -33,4 +36,22 @@ data class InboxTask(
data object InboxTaskDef : TaskDefinition<InboxTask> { data object InboxTaskDef : TaskDefinition<InboxTask> {
override val type: Class<InboxTask> override val type: Class<InboxTask>
get() = InboxTask::class.java get() = InboxTask::class.java
override fun serialize(task: InboxTask): Map<String, PropertyValue<*>> {
return mapOf(
"json" to StringPropertyValue(task.json),
"type" to ObjectPropertyValue(task.type),
"httpRequest" to ObjectPropertyValue(task.httpRequest),
"headers" to ObjectPropertyValue(task.headers),
)
}
override fun deserialize(value: Map<String, PropertyValue<*>>): InboxTask {
return InboxTask(
value.getValue("json").value as String,
value.getValue("type").value as ActivityType,
value.getValue("httpRequest").value as HttpRequest,
value.getValue("headers").value as Map<String, List<String>>,
)
}
} }

View File

@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="INFO">
<AppenderRef ref="Console"/>
</Root>
<Logger name="dev.usbharu.owl.broker.service.QueuedTaskAssignerImpl" level="TRACE">
<AppenderRef ref="Console"/>
</Logger>
</Loggers>
</Configuration>

View File

@ -45,12 +45,21 @@ dependencies {
implementation("dev.usbharu:http-signature:1.0.0") implementation("dev.usbharu:http-signature:1.0.0")
implementation("org.springframework.boot:spring-boot-starter") implementation("org.springframework.boot:spring-boot-starter")
implementation("org.jetbrains.kotlin:kotlin-reflect") implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.springframework.boot:spring-boot-starter-log4j2")
implementation(libs.jackson.databind) implementation(libs.jackson.databind)
implementation(libs.jackson.module.kotlin) implementation(libs.jackson.module.kotlin)
implementation(libs.bundles.coroutines)
testImplementation("org.springframework.boot:spring-boot-starter-test") testImplementation("org.springframework.boot:spring-boot-starter-test")
} }
configurations {
all {
exclude("org.springframework.boot", "spring-boot-starter-logging")
exclude("ch.qos.logback", "logback-classic")
}
}
tasks.test { tasks.test {
useJUnitPlatform() useJUnitPlatform()
} }

View File

@ -0,0 +1,26 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.hideout
import dev.usbharu.owl.consumer.TaskRunner
import dev.usbharu.owl.consumer.TaskRunnerLoader
import org.springframework.stereotype.Component
@Component
class SpringTaskRunnerLoader(private val taskRunners: List<TaskRunner>) : TaskRunnerLoader {
override fun load(): Map<String, TaskRunner> = taskRunners.associateBy { it.name }
}

View File

@ -0,0 +1,36 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.hideout
import dev.usbharu.owl.consumer.StandaloneConsumer
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.springframework.boot.ApplicationArguments
import org.springframework.boot.ApplicationRunner
import org.springframework.stereotype.Component
@Component
class WorkerRunner(private val springTaskRunnerLoader: SpringTaskRunnerLoader) : ApplicationRunner {
override fun run(args: ApplicationArguments?) {
GlobalScope.launch(Dispatchers.Default) {
val consumer = StandaloneConsumer(taskRunnerLoader = springTaskRunnerLoader)
consumer.init()
consumer.start()
}
}
}

View File

@ -11,7 +11,7 @@ serialization = "1.6.3"
kjob = "0.6.0" kjob = "0.6.0"
tika = "2.9.1" tika = "2.9.1"
owl = "0.0.1" owl = "0.0.1"
jackson = "2.17.1" jackson = "2.15.4"
[libraries] [libraries]

View File

@ -66,7 +66,7 @@ class MongodbQueuedTaskRepository(
eq(QueuedTaskMongodb::isActive.name, true) eq(QueuedTaskMongodb::isActive.name, true)
), ),
listOf( listOf(
set(QueuedTaskMongodb::assignedConsumer.name, update.assignedConsumer), set(QueuedTaskMongodb::assignedConsumer.name, update.assignedConsumer?.toString()),
set(QueuedTaskMongodb::assignedAt.name, update.assignedAt), set(QueuedTaskMongodb::assignedAt.name, update.assignedAt),
set(QueuedTaskMongodb::queuedAt.name, update.queuedAt), set(QueuedTaskMongodb::queuedAt.name, update.queuedAt),
set(QueuedTaskMongodb::isActive.name, update.isActive) set(QueuedTaskMongodb::isActive.name, update.isActive)

View File

@ -44,7 +44,7 @@ fun main() {
DefaultRetryPolicyFactory(mapOf("" to ExponentialRetryPolicy())) DefaultRetryPolicyFactory(mapOf("" to ExponentialRetryPolicy()))
} }
} }
modules(module, defaultModule, moduleContext.module()) modules(defaultModule, module, moduleContext.module())
} }
val application = koin.koin.get<OwlBrokerApplication>() val application = koin.koin.get<OwlBrokerApplication>()

View File

@ -34,7 +34,8 @@ class OwlBrokerApplication(
private val subscribeTaskService: SubscribeTaskService, private val subscribeTaskService: SubscribeTaskService,
private val taskPublishService: TaskPublishService, private val taskPublishService: TaskPublishService,
private val taskManagementService: TaskManagementService, private val taskManagementService: TaskManagementService,
private val taskResultSubscribeService: TaskResultSubscribeService private val taskResultSubscribeService: TaskResultSubscribeService,
private val taskResultService: TaskResultService,
) { ) {
private lateinit var server: Server private lateinit var server: Server
@ -47,6 +48,7 @@ class OwlBrokerApplication(
.addService(subscribeTaskService) .addService(subscribeTaskService)
.addService(taskPublishService) .addService(taskPublishService)
.addService(taskResultSubscribeService) .addService(taskResultSubscribeService)
.addService(taskResultService)
.build() .build()
server.start() server.start()

View File

@ -24,10 +24,13 @@ import dev.usbharu.owl.broker.external.toUUID
import dev.usbharu.owl.broker.service.QueuedTaskAssigner import dev.usbharu.owl.broker.service.QueuedTaskAssigner
import dev.usbharu.owl.common.property.PropertySerializeUtils import dev.usbharu.owl.common.property.PropertySerializeUtils
import dev.usbharu.owl.common.property.PropertySerializerFactory import dev.usbharu.owl.common.property.PropertySerializerFactory
import io.grpc.Status
import io.grpc.StatusException
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapMerge import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import org.koin.core.annotation.Singleton import org.koin.core.annotation.Singleton
import org.slf4j.LoggerFactory
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.EmptyCoroutineContext
@ -40,7 +43,9 @@ class AssignmentTaskService(
AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineImplBase(coroutineContext) { AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineImplBase(coroutineContext) {
override fun ready(requests: Flow<Task.ReadyRequest>): Flow<Task.TaskRequest> { override fun ready(requests: Flow<Task.ReadyRequest>): Flow<Task.TaskRequest> {
return requests
return try {
requests
.flatMapMerge { .flatMapMerge {
queuedTaskAssigner.ready(it.consumerId.toUUID(), it.numberOfConcurrent) queuedTaskAssigner.ready(it.consumerId.toUUID(), it.numberOfConcurrent)
} }
@ -51,8 +56,21 @@ class AssignmentTaskService(
.setId(it.task.id.toUUID()) .setId(it.task.id.toUUID())
.setAttempt(it.attempt) .setAttempt(it.attempt)
.setQueuedAt(it.queuedAt.toTimestamp()) .setQueuedAt(it.queuedAt.toTimestamp())
.putAllProperties(PropertySerializeUtils.serialize(propertySerializerFactory, it.task.properties)) .putAllProperties(
PropertySerializeUtils.serialize(
propertySerializerFactory,
it.task.properties
)
)
.build() .build()
} }
} catch (e: Exception) {
logger.warn("Error while reading requests", e)
throw StatusException(Status.INTERNAL.withDescription("Error while reading requests").withCause(e))
}
}
companion object {
private val logger = LoggerFactory.getLogger(AssignmentTaskService::class.java)
} }
} }

View File

@ -24,13 +24,19 @@ import dev.usbharu.owl.broker.external.toUUID
import dev.usbharu.owl.broker.service.TaskManagementService import dev.usbharu.owl.broker.service.TaskManagementService
import dev.usbharu.owl.common.property.PropertySerializeUtils import dev.usbharu.owl.common.property.PropertySerializeUtils
import dev.usbharu.owl.common.property.PropertySerializerFactory import dev.usbharu.owl.common.property.PropertySerializerFactory
import io.grpc.Status
import io.grpc.StatusException
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import org.koin.core.annotation.Singleton
import org.slf4j.LoggerFactory
import java.util.* import java.util.*
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.EmptyCoroutineContext
@Singleton
class TaskResultService( class TaskResultService(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
private val taskManagementService: TaskManagementService, private val taskManagementService: TaskManagementService,
@ -38,6 +44,7 @@ class TaskResultService(
) : ) :
TaskResultServiceGrpcKt.TaskResultServiceCoroutineImplBase(coroutineContext) { TaskResultServiceGrpcKt.TaskResultServiceCoroutineImplBase(coroutineContext) {
override suspend fun tasKResult(requests: Flow<TaskResultOuterClass.TaskResult>): Empty { override suspend fun tasKResult(requests: Flow<TaskResultOuterClass.TaskResult>): Empty {
try {
requests.onEach { requests.onEach {
taskManagementService.queueProcessed( taskManagementService.queueProcessed(
TaskResult( TaskResult(
@ -50,6 +57,16 @@ class TaskResultService(
) )
) )
}.collect() }.collect()
} catch (e: CancellationException) {
throw e
} catch (e: Exception) {
logger.warn("Error while executing task results", e)
throw StatusException(Status.INTERNAL.withDescription("Error while executing task results").withCause(e))
}
return Empty.getDefaultInstance() return Empty.getDefaultInstance()
} }
companion object {
private val logger = LoggerFactory.getLogger(TaskResultService::class.java)
}
} }

View File

@ -26,6 +26,8 @@ class DefaultPropertySerializerFactory :
IntegerPropertySerializer(), IntegerPropertySerializer(),
StringPropertyValueSerializer(), StringPropertyValueSerializer(),
DoublePropertySerializer(), DoublePropertySerializer(),
BooleanPropertySerializer() BooleanPropertySerializer(),
LongPropertySerializer(),
FloatPropertySerializer(),
) )
) )

View File

@ -18,10 +18,7 @@ package dev.usbharu.owl.broker.service
import dev.usbharu.owl.broker.domain.exception.service.QueueCannotDequeueException import dev.usbharu.owl.broker.domain.exception.service.QueueCannotDequeueException
import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import org.koin.core.annotation.Singleton import org.koin.core.annotation.Singleton
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import java.time.Instant import java.time.Instant
@ -37,6 +34,7 @@ class QueuedTaskAssignerImpl(
private val queueStore: QueueStore private val queueStore: QueueStore
) : QueuedTaskAssigner { ) : QueuedTaskAssigner {
override fun ready(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask> { override fun ready(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask> {
logger.trace("Ready {}/{}", numberOfConcurrent, consumerId)
return flow { return flow {
taskManagementService.findAssignableTask(consumerId, numberOfConcurrent) taskManagementService.findAssignableTask(consumerId, numberOfConcurrent)
.onEach { .onEach {
@ -46,6 +44,7 @@ class QueuedTaskAssignerImpl(
emit(assignTask) emit(assignTask)
} }
} }
.catch { logger.warn("Failed to assign task {}", consumerId, it) }
.collect() .collect()
} }
} }

View File

@ -25,6 +25,7 @@ class ObjectPropertyValue(override val value: Any) : PropertyValue<Any>() {
class ObjectPropertySerializer(private val objectMapper: ObjectMapper) : PropertySerializer<Any> { class ObjectPropertySerializer(private val objectMapper: ObjectMapper) : PropertySerializer<Any> {
override fun isSupported(propertyValue: PropertyValue<*>): Boolean { override fun isSupported(propertyValue: PropertyValue<*>): Boolean {
println(propertyValue::class.java)
return propertyValue is ObjectPropertyValue return propertyValue is ObjectPropertyValue
} }
@ -39,11 +40,11 @@ class ObjectPropertySerializer(private val objectMapper: ObjectMapper) : Propert
} }
override fun deserialize(string: String): PropertyValue<Any> { override fun deserialize(string: String): PropertyValue<Any> {
//todo jacksonに読み込ませるStringがjackson:classname:jsonになっているのでjsonだけを読み込ませる
return ObjectPropertyValue( return ObjectPropertyValue(
objectMapper.readValue( objectMapper.readValue(
string, string,
Class.forName(string.substringAfter("jackson:").substringBeforeLast(":")) Class.forName(string.substringAfter("jackson:").substringBefore(":"))
) )
) )

View File

@ -24,7 +24,8 @@ package dev.usbharu.owl.common.property
open class CustomPropertySerializerFactory(private val propertySerializers: Set<PropertySerializer<*>>) : open class CustomPropertySerializerFactory(private val propertySerializers: Set<PropertySerializer<*>>) :
PropertySerializerFactory { PropertySerializerFactory {
override fun <T> factory(propertyValue: PropertyValue<T>): PropertySerializer<T> { override fun <T> factory(propertyValue: PropertyValue<T>): PropertySerializer<T> {
return propertySerializers.first { it.isSupported(propertyValue) } as PropertySerializer<T> return propertySerializers.firstOrNull { it.isSupported(propertyValue) } as PropertySerializer<T>?
?: throw IllegalArgumentException("PropertySerializer not found: $propertyValue")
} }
override fun factory(string: String): PropertySerializer<*> { override fun factory(string: String): PropertySerializer<*> {

View File

@ -0,0 +1,30 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.common.property
class PropertySerializeException : RuntimeException {
constructor() : super()
constructor(message: String?) : super(message)
constructor(message: String?, cause: Throwable?) : super(message, cause)
constructor(cause: Throwable?) : super(cause)
constructor(message: String?, cause: Throwable?, enableSuppression: Boolean, writableStackTrace: Boolean) : super(
message,
cause,
enableSuppression,
writableStackTrace
)
}

View File

@ -29,9 +29,16 @@ object PropertySerializeUtils {
*/ */
fun serialize( fun serialize(
serializerFactory: PropertySerializerFactory, serializerFactory: PropertySerializerFactory,
properties: Map<String, PropertyValue<*>> properties: Map<String, PropertyValue<*>>,
): Map<String, String> = ): Map<String, String> {
properties.map { it.key to serializerFactory.factory(it.value).serialize(it.value) }.toMap() return properties.map {
try {
it.key to serializerFactory.factory(it.value).serialize(it.value)
} catch (e: Exception) {
throw PropertySerializeException("Failed to serialize property in ${serializerFactory.javaClass}", e)
}
}.toMap()
}
/** /**
* Stringとシリアライズ済みの[PropertyValue][Map]からシリアライズ済みの[PropertyValue]をデシリアライズしStringと[PropertyValue][Map]として返します * Stringとシリアライズ済みの[PropertyValue][Map]からシリアライズ済みの[PropertyValue]をデシリアライズしStringと[PropertyValue][Map]として返します

View File

@ -31,4 +31,9 @@ abstract class PropertyValue<T> {
* プロパティの型 * プロパティの型
*/ */
abstract val type: PropertyType abstract val type: PropertyType
override fun toString(): String {
return "PropertyValue(value=$value, type=$type)"
}
} }

View File

@ -107,7 +107,11 @@ interface TaskDefinition<T : Task> {
*/ */
fun deserialize(value: Map<String, PropertyValue<*>>): T { fun deserialize(value: Map<String, PropertyValue<*>>): T {
val task = type.getDeclaredConstructor().newInstance() val task = try {
type.getDeclaredConstructor().newInstance()
} catch (e: Exception) {
throw IllegalArgumentException("Unable to deserialize value $value for type ${type.name}", e)
}
type.fields.associateBy { it.name }.mapValues { type.fields.associateBy { it.name }.mapValues {
when { when {

View File

@ -81,28 +81,13 @@ class Consumer(
suspend fun start() { suspend fun start() {
coroutineScope = CoroutineScope(Dispatchers.Default) coroutineScope = CoroutineScope(Dispatchers.Default)
coroutineScope { coroutineScope {
while (isActive) {
try {
taskResultStub taskResultStub
.tasKResult(flow { .tasKResult(flow {
assignmentTaskStub assignmentTaskStub
.ready(flow { .ready(flow {
while (coroutineScope.isActive) { requestTask()
val andSet = concurrent.getAndUpdate { 0 }
if (andSet != 0) {
logger.debug("Request {} tasks.", andSet)
emit(readyRequest {
this.consumerId = consumerId
this.numberOfConcurrent = andSet
})
continue
}
delay(100)
concurrent.update {
((64 - it) - processing.value).coerceIn(0, 64 - max(0, processing.value))
}
}
}).onEach { }).onEach {
logger.info("Start Task name: {}", it.name) logger.info("Start Task name: {}", it.name)
processing.update { it + 1 } processing.update { it + 1 }
@ -112,10 +97,16 @@ class Consumer(
val taskResult = runnerMap.getValue(it.name).run( val taskResult = runnerMap.getValue(it.name).run(
TaskRequest( TaskRequest(
it.name, it.name,
java.util.UUID(it.id.mostSignificantUuidBits, it.id.leastSignificantUuidBits), java.util.UUID(
it.id.mostSignificantUuidBits,
it.id.leastSignificantUuidBits
),
it.attempt, it.attempt,
Instant.ofEpochSecond(it.queuedAt.seconds, it.queuedAt.nanos.toLong()), Instant.ofEpochSecond(it.queuedAt.seconds, it.queuedAt.nanos.toLong()),
PropertySerializeUtils.deserialize(propertySerializerFactory, it.propertiesMap) PropertySerializeUtils.deserialize(
propertySerializerFactory,
it.propertiesMap
)
) )
) )
@ -130,7 +121,11 @@ class Consumer(
) )
this.message = taskResult.message this.message = taskResult.message
}) })
logger.info("Success execute task. name: {} success: {}", it.name, taskResult.success) logger.info(
"Success execute task. name: {} success: {}",
it.name,
taskResult.success
)
logger.debug("TRACE RESULT {}", taskResult) logger.debug("TRACE RESULT {}", taskResult)
} catch (e: CancellationException) { } catch (e: CancellationException) {
logger.warn("Cancelled execute task.", e) logger.warn("Cancelled execute task.", e)
@ -161,6 +156,41 @@ class Consumer(
} }
}.flowOn(Dispatchers.Default).collect() }.flowOn(Dispatchers.Default).collect()
}) })
} catch (e: CancellationException) {
throw e
} catch (e: Exception) {
logger.warn("Consumer error", e)
}
delay(1000)
}
}
}
private suspend fun FlowCollector<Task.ReadyRequest>.requestTask() {
while (coroutineScope.isActive) {
val andSet = concurrent.getAndUpdate { 0 }
if (andSet != 0) {
logger.debug("Request {} tasks.", andSet)
try {
emit(readyRequest {
this.consumerId = this@Consumer.consumerId
this.numberOfConcurrent = andSet
})
} catch (e: CancellationException) {
throw e
} catch (e: Exception) {
logger.warn("Failed request task.", e)
}
continue
}
delay(100)
concurrent.update {
((64 - it) - processing.value).coerceIn(0, 64 - max(0, processing.value))
}
} }
} }

View File

@ -52,7 +52,11 @@ class StandaloneConsumer(
constructor( constructor(
propertySerializerFactory: PropertySerializerFactory = CustomPropertySerializerFactory(emptySet()), propertySerializerFactory: PropertySerializerFactory = CustomPropertySerializerFactory(emptySet()),
taskRunnerLoader: TaskRunnerLoader = ServiceLoaderTaskRunnerLoader(), taskRunnerLoader: TaskRunnerLoader = ServiceLoaderTaskRunnerLoader(),
) : this(Path.of("consumer.properties"), propertySerializerFactory, taskRunnerLoader) ) : this(
Path.of(StandaloneConsumer::class.java.getClassLoader().getResource("consumer.properties").toURI()),
propertySerializerFactory,
taskRunnerLoader
)
private val channel = ManagedChannelBuilder.forAddress(config.address, config.port) private val channel = ManagedChannelBuilder.forAddress(config.address, config.port)
.usePlaintext() .usePlaintext()
@ -68,7 +72,7 @@ class StandaloneConsumer(
taskResultStub = taskResultStub, taskResultStub = taskResultStub,
taskRunnerLoader = taskRunnerLoader, taskRunnerLoader = taskRunnerLoader,
propertySerializerFactory = propertySerializerFactory, propertySerializerFactory = propertySerializerFactory,
consumerConfig = ConsumerConfig(config.concurrency) consumerConfig = ConsumerConfig(config.concurrency),
) )
/** /**

View File

@ -0,0 +1,20 @@
#
# Copyright (C) 2024 usbharu
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
address=localhost
port=50051
name=owl
hostname=localhost
concurrency=10

View File

@ -20,6 +20,7 @@ import dev.usbharu.owl.broker.OwlBrokerApplication
import dev.usbharu.owl.broker.domain.exception.InvalidRepositoryException import dev.usbharu.owl.broker.domain.exception.InvalidRepositoryException
import dev.usbharu.owl.broker.domain.model.producer.ProducerRepository import dev.usbharu.owl.broker.domain.model.producer.ProducerRepository
import dev.usbharu.owl.broker.service.* import dev.usbharu.owl.broker.service.*
import dev.usbharu.owl.common.property.PropertySerializerFactory
import dev.usbharu.owl.common.retry.RetryPolicyFactory import dev.usbharu.owl.common.retry.RetryPolicyFactory
import dev.usbharu.owl.common.task.PublishedTask import dev.usbharu.owl.common.task.PublishedTask
import dev.usbharu.owl.common.task.Task import dev.usbharu.owl.common.task.Task
@ -51,8 +52,11 @@ class EmbeddedOwlProducer(
single<RetryPolicyFactory> { single<RetryPolicyFactory> {
embeddedOwlProducerConfig.retryPolicyFactory embeddedOwlProducerConfig.retryPolicyFactory
} }
single<PropertySerializerFactory> {
embeddedOwlProducerConfig.propertySerializerFactory
} }
modules(module, defaultModule, embeddedOwlProducerConfig.moduleContext.module()) }
modules(defaultModule, module, embeddedOwlProducerConfig.moduleContext.module())
}.koin }.koin
application.getOrNull<ProducerRepository>() application.getOrNull<ProducerRepository>()

View File

@ -17,12 +17,14 @@
package dev.usbharu.owl.producer.embedded package dev.usbharu.owl.producer.embedded
import dev.usbharu.owl.broker.ModuleContext import dev.usbharu.owl.broker.ModuleContext
import dev.usbharu.owl.common.property.CustomPropertySerializerFactory
import dev.usbharu.owl.common.retry.RetryPolicyFactory import dev.usbharu.owl.common.retry.RetryPolicyFactory
import dev.usbharu.owl.producer.api.OwlProducerConfig import dev.usbharu.owl.producer.api.OwlProducerConfig
class EmbeddedOwlProducerConfig : OwlProducerConfig { class EmbeddedOwlProducerConfig : OwlProducerConfig {
lateinit var moduleContext: ModuleContext lateinit var moduleContext: ModuleContext
lateinit var retryPolicyFactory: RetryPolicyFactory lateinit var retryPolicyFactory: RetryPolicyFactory
lateinit var propertySerializerFactory: CustomPropertySerializerFactory
lateinit var name: String lateinit var name: String
lateinit var port: String lateinit var port: String
} }