From 297c29f0203921f17a8bb6a0a312c678719e4cb4 Mon Sep 17 00:00:00 2001 From: usbharu <64310155+usbharu@users.noreply.github.com> Date: Wed, 1 May 2024 00:01:12 +0900 Subject: [PATCH] wip: embedded producer --- build.gradle.kts | 1 + .../interfaces/grpc/AssignmentTaskService.kt | 10 +- owl/producer/default/build.gradle.kts | 2 +- .../defaultimpl/DefaultOwlProducerBuilder.kt | 4 +- owl/producer/embedded/build.gradle.kts | 25 +++++ .../embedded/EmbeddedGrpcOwlProducer.kt | 99 +++++++++++++++++ .../producer/embedded/EmbeddedOwlProducer.kt | 100 ++++++++++++++++++ owl/settings.gradle.kts | 1 + settings.gradle.kts | 5 +- 9 files changed, 237 insertions(+), 10 deletions(-) create mode 100644 owl/producer/embedded/build.gradle.kts create mode 100644 owl/producer/embedded/src/main/kotlin/dev/usbharu/owl/producer/embedded/EmbeddedGrpcOwlProducer.kt create mode 100644 owl/producer/embedded/src/main/kotlin/dev/usbharu/owl/producer/embedded/EmbeddedOwlProducer.kt diff --git a/build.gradle.kts b/build.gradle.kts index ff7abe9d..adddf5bb 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -234,6 +234,7 @@ dependencies { implementation("org.flywaydb:flyway-core") implementation("dev.usbharu:emoji-kt:2.0.0") + implementation("dev.usbharu:default:0.0.1") implementation("org.jsoup:jsoup:1.17.2") implementation("com.googlecode.owasp-java-html-sanitizer:owasp-java-html-sanitizer:20240325.1") diff --git a/owl/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/AssignmentTaskService.kt b/owl/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/AssignmentTaskService.kt index a1840588..4004de82 100644 --- a/owl/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/AssignmentTaskService.kt +++ b/owl/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/AssignmentTaskService.kt @@ -16,9 +16,9 @@ package dev.usbharu.owl.broker.interfaces.grpc -import dev.usbharu.owl.AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineImplBase + +import dev.usbharu.owl.AssignmentTaskServiceGrpcKt import dev.usbharu.owl.Task -import dev.usbharu.owl.Task.TaskRequest import dev.usbharu.owl.broker.external.toTimestamp import dev.usbharu.owl.broker.external.toUUID import dev.usbharu.owl.broker.service.QueuedTaskAssigner @@ -37,15 +37,15 @@ class AssignmentTaskService( private val queuedTaskAssigner: QueuedTaskAssigner, private val propertySerializerFactory: PropertySerializerFactory ) : - AssignmentTaskServiceCoroutineImplBase(coroutineContext) { + AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineImplBase(coroutineContext) { - override fun ready(requests: Flow): Flow { + override fun ready(requests: Flow): Flow { return requests .flatMapMerge { queuedTaskAssigner.ready(it.consumerId.toUUID(), it.numberOfConcurrent) } .map { - TaskRequest + Task.TaskRequest .newBuilder() .setName(it.task.name) .setId(it.task.id.toUUID()) diff --git a/owl/producer/default/build.gradle.kts b/owl/producer/default/build.gradle.kts index 722c7de8..877dd36e 100644 --- a/owl/producer/default/build.gradle.kts +++ b/owl/producer/default/build.gradle.kts @@ -12,7 +12,7 @@ repositories { dependencies { testImplementation("org.jetbrains.kotlin:kotlin-test") - implementation(project(":producer:api")) + api(project(":producer:api")) implementation("io.grpc:grpc-kotlin-stub:1.4.1") implementation("io.grpc:grpc-protobuf:1.61.1") implementation("com.google.protobuf:protobuf-kotlin:3.25.3") diff --git a/owl/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerBuilder.kt b/owl/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerBuilder.kt index 8100088f..4e45e9f4 100644 --- a/owl/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerBuilder.kt +++ b/owl/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerBuilder.kt @@ -14,11 +14,9 @@ * limitations under the License. */ -package dev.usbharu.dev.usbharu.owl.producer.defaultimpl +package dev.usbharu.owl.producer.defaultimpl import dev.usbharu.owl.producer.api.OwlProducerBuilder -import dev.usbharu.owl.producer.defaultimpl.DefaultOwlProducer -import dev.usbharu.owl.producer.defaultimpl.DefaultOwlProducerConfig import io.grpc.ManagedChannelBuilder class DefaultOwlProducerBuilder : OwlProducerBuilder { diff --git a/owl/producer/embedded/build.gradle.kts b/owl/producer/embedded/build.gradle.kts new file mode 100644 index 00000000..9dbf4c0c --- /dev/null +++ b/owl/producer/embedded/build.gradle.kts @@ -0,0 +1,25 @@ +plugins { + kotlin("jvm") +} + +group = "dev.usbharu" +version = "0.0.1" + +repositories { + mavenCentral() +} + +dependencies { + testImplementation(kotlin("test")) + implementation(project(":producer:api")) + implementation(project(":broker")) + implementation(platform("io.insert-koin:koin-bom:3.5.3")) + implementation("io.insert-koin:koin-core") +} + +tasks.test { + useJUnitPlatform() +} +kotlin { + jvmToolchain(17) +} \ No newline at end of file diff --git a/owl/producer/embedded/src/main/kotlin/dev/usbharu/owl/producer/embedded/EmbeddedGrpcOwlProducer.kt b/owl/producer/embedded/src/main/kotlin/dev/usbharu/owl/producer/embedded/EmbeddedGrpcOwlProducer.kt new file mode 100644 index 00000000..41b6dd85 --- /dev/null +++ b/owl/producer/embedded/src/main/kotlin/dev/usbharu/owl/producer/embedded/EmbeddedGrpcOwlProducer.kt @@ -0,0 +1,99 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.producer.embedded + +import dev.usbharu.owl.broker.ModuleContext +import dev.usbharu.owl.broker.OwlBrokerApplication +import dev.usbharu.owl.broker.service.* +import dev.usbharu.owl.common.task.PublishedTask +import dev.usbharu.owl.common.task.Task +import dev.usbharu.owl.common.task.TaskDefinition +import dev.usbharu.owl.producer.api.OwlProducer +import org.koin.core.Koin +import org.koin.core.context.GlobalContext.startKoin +import org.koin.dsl.module +import org.koin.ksp.generated.defaultModule +import java.time.Instant +import java.util.* + +class EmbeddedGrpcOwlProducer( + private val moduleContext: ModuleContext, + private val retryPolicyFactory: RetryPolicyFactory, + private val name: String, + private val port: Int, + private val owlProducer: OwlProducer, +) : OwlProducer { + private lateinit var producerId: UUID + + private lateinit var application: Koin + + private val taskMap: MutableMap, TaskDefinition<*>> = mutableMapOf() + + override suspend fun start() { + application = startKoin { + printLogger() + + val module = module { + single { + retryPolicyFactory + } + } + modules(module, defaultModule, moduleContext.module()) + }.koin + + val producerService = application.get() + + producerId = producerService.registerProducer(RegisterProducerRequest(name, name)) + + application.get().start(port) + } + + override suspend fun registerTask(taskDefinition: TaskDefinition) { + application.get() + .registerTask( + dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinition( + name = taskDefinition.name, + priority = taskDefinition.priority, + maxRetry = taskDefinition.maxRetry, + timeoutMilli = taskDefinition.timeoutMilli, + propertyDefinitionHash = taskDefinition.propertyDefinition.hash(), + retryPolicy = taskDefinition.retryPolicy + ) + ) + + taskMap[taskDefinition.type] = taskDefinition + } + + override suspend fun publishTask(task: T): PublishedTask { + + val taskDefinition = taskMap.getValue(task::class.java) as TaskDefinition + + val publishTask = application.get().publishTask( + PublishTask( + taskDefinition.name, + producerId, + taskDefinition.serialize(task) + ) + ) + + return PublishedTask( + task, + publishTask.id, + Instant.now() + ) + } +} \ No newline at end of file diff --git a/owl/producer/embedded/src/main/kotlin/dev/usbharu/owl/producer/embedded/EmbeddedOwlProducer.kt b/owl/producer/embedded/src/main/kotlin/dev/usbharu/owl/producer/embedded/EmbeddedOwlProducer.kt new file mode 100644 index 00000000..def3b5fb --- /dev/null +++ b/owl/producer/embedded/src/main/kotlin/dev/usbharu/owl/producer/embedded/EmbeddedOwlProducer.kt @@ -0,0 +1,100 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.producer.embedded + +import dev.usbharu.owl.broker.ModuleContext +import dev.usbharu.owl.broker.OwlBrokerApplication +import dev.usbharu.owl.broker.service.* +import dev.usbharu.owl.common.task.PublishedTask +import dev.usbharu.owl.common.task.Task +import dev.usbharu.owl.common.task.TaskDefinition +import dev.usbharu.owl.producer.api.OwlProducer +import org.koin.core.Koin +import org.koin.core.context.GlobalContext.startKoin +import org.koin.dsl.module +import org.koin.ksp.generated.defaultModule +import java.time.Instant +import java.util.* +import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinition as BrokerTaskDefinition + +class EmbeddedOwlProducer( + private val moduleContext: ModuleContext, + private val retryPolicyFactory: RetryPolicyFactory, + private val name: String, + private val port: Int, +) : OwlProducer { + + private lateinit var producerId: UUID + + private lateinit var application: Koin + + private val taskMap: MutableMap, TaskDefinition<*>> = mutableMapOf() + + override suspend fun start() { + application = startKoin { + printLogger() + + val module = module { + single { + retryPolicyFactory + } + } + modules(module, defaultModule, moduleContext.module()) + }.koin + + val producerService = application.get() + + producerId = producerService.registerProducer(RegisterProducerRequest(name, name)) + + application.get().start(port) + } + + override suspend fun registerTask(taskDefinition: TaskDefinition) { + application.get() + .registerTask( + BrokerTaskDefinition( + name = taskDefinition.name, + priority = taskDefinition.priority, + maxRetry = taskDefinition.maxRetry, + timeoutMilli = taskDefinition.timeoutMilli, + propertyDefinitionHash = taskDefinition.propertyDefinition.hash(), + retryPolicy = taskDefinition.retryPolicy + ) + ) + + taskMap[taskDefinition.type] = taskDefinition + } + + override suspend fun publishTask(task: T): PublishedTask { + + val taskDefinition = taskMap.getValue(task::class.java) as TaskDefinition + + val publishTask = application.get().publishTask( + PublishTask( + taskDefinition.name, + producerId, + taskDefinition.serialize(task) + ) + ) + + return PublishedTask( + task, + publishTask.id, + Instant.now() + ) + } +} \ No newline at end of file diff --git a/owl/settings.gradle.kts b/owl/settings.gradle.kts index 87d7071c..450172d1 100644 --- a/owl/settings.gradle.kts +++ b/owl/settings.gradle.kts @@ -11,3 +11,4 @@ findProject(":broker:broker-mongodb")?.name = "broker-mongodb" include("producer:default") findProject(":producer:default")?.name = "default" include("consumer") +include("producer:embedded") \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index d73c9cc5..376e901e 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -1,3 +1,6 @@ +plugins { + id("org.gradle.toolchains.foojay-resolver-convention") version "0.5.0" +} rootProject.name = "hideout" -includeBuild("owl") \ No newline at end of file +includeBuild("owl")