wip: embedded producer

This commit is contained in:
usbharu 2024-05-01 00:01:12 +09:00
parent f34c9ad53a
commit 297c29f020
9 changed files with 237 additions and 10 deletions

View File

@ -234,6 +234,7 @@ dependencies {
implementation("org.flywaydb:flyway-core")
implementation("dev.usbharu:emoji-kt:2.0.0")
implementation("dev.usbharu:default:0.0.1")
implementation("org.jsoup:jsoup:1.17.2")
implementation("com.googlecode.owasp-java-html-sanitizer:owasp-java-html-sanitizer:20240325.1")

View File

@ -16,9 +16,9 @@
package dev.usbharu.owl.broker.interfaces.grpc
import dev.usbharu.owl.AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineImplBase
import dev.usbharu.owl.AssignmentTaskServiceGrpcKt
import dev.usbharu.owl.Task
import dev.usbharu.owl.Task.TaskRequest
import dev.usbharu.owl.broker.external.toTimestamp
import dev.usbharu.owl.broker.external.toUUID
import dev.usbharu.owl.broker.service.QueuedTaskAssigner
@ -37,15 +37,15 @@ class AssignmentTaskService(
private val queuedTaskAssigner: QueuedTaskAssigner,
private val propertySerializerFactory: PropertySerializerFactory
) :
AssignmentTaskServiceCoroutineImplBase(coroutineContext) {
AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineImplBase(coroutineContext) {
override fun ready(requests: Flow<Task.ReadyRequest>): Flow<TaskRequest> {
override fun ready(requests: Flow<Task.ReadyRequest>): Flow<Task.TaskRequest> {
return requests
.flatMapMerge {
queuedTaskAssigner.ready(it.consumerId.toUUID(), it.numberOfConcurrent)
}
.map {
TaskRequest
Task.TaskRequest
.newBuilder()
.setName(it.task.name)
.setId(it.task.id.toUUID())

View File

@ -12,7 +12,7 @@ repositories {
dependencies {
testImplementation("org.jetbrains.kotlin:kotlin-test")
implementation(project(":producer:api"))
api(project(":producer:api"))
implementation("io.grpc:grpc-kotlin-stub:1.4.1")
implementation("io.grpc:grpc-protobuf:1.61.1")
implementation("com.google.protobuf:protobuf-kotlin:3.25.3")

View File

@ -14,11 +14,9 @@
* limitations under the License.
*/
package dev.usbharu.dev.usbharu.owl.producer.defaultimpl
package dev.usbharu.owl.producer.defaultimpl
import dev.usbharu.owl.producer.api.OwlProducerBuilder
import dev.usbharu.owl.producer.defaultimpl.DefaultOwlProducer
import dev.usbharu.owl.producer.defaultimpl.DefaultOwlProducerConfig
import io.grpc.ManagedChannelBuilder
class DefaultOwlProducerBuilder : OwlProducerBuilder<DefaultOwlProducer, DefaultOwlProducerConfig> {

View File

@ -0,0 +1,25 @@
plugins {
kotlin("jvm")
}
group = "dev.usbharu"
version = "0.0.1"
repositories {
mavenCentral()
}
dependencies {
testImplementation(kotlin("test"))
implementation(project(":producer:api"))
implementation(project(":broker"))
implementation(platform("io.insert-koin:koin-bom:3.5.3"))
implementation("io.insert-koin:koin-core")
}
tasks.test {
useJUnitPlatform()
}
kotlin {
jvmToolchain(17)
}

View File

@ -0,0 +1,99 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.producer.embedded
import dev.usbharu.owl.broker.ModuleContext
import dev.usbharu.owl.broker.OwlBrokerApplication
import dev.usbharu.owl.broker.service.*
import dev.usbharu.owl.common.task.PublishedTask
import dev.usbharu.owl.common.task.Task
import dev.usbharu.owl.common.task.TaskDefinition
import dev.usbharu.owl.producer.api.OwlProducer
import org.koin.core.Koin
import org.koin.core.context.GlobalContext.startKoin
import org.koin.dsl.module
import org.koin.ksp.generated.defaultModule
import java.time.Instant
import java.util.*
class EmbeddedGrpcOwlProducer(
private val moduleContext: ModuleContext,
private val retryPolicyFactory: RetryPolicyFactory,
private val name: String,
private val port: Int,
private val owlProducer: OwlProducer,
) : OwlProducer {
private lateinit var producerId: UUID
private lateinit var application: Koin
private val taskMap: MutableMap<Class<*>, TaskDefinition<*>> = mutableMapOf()
override suspend fun start() {
application = startKoin {
printLogger()
val module = module {
single<RetryPolicyFactory> {
retryPolicyFactory
}
}
modules(module, defaultModule, moduleContext.module())
}.koin
val producerService = application.get<ProducerService>()
producerId = producerService.registerProducer(RegisterProducerRequest(name, name))
application.get<OwlBrokerApplication>().start(port)
}
override suspend fun <T : Task> registerTask(taskDefinition: TaskDefinition<T>) {
application.get<RegisterTaskService>()
.registerTask(
dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinition(
name = taskDefinition.name,
priority = taskDefinition.priority,
maxRetry = taskDefinition.maxRetry,
timeoutMilli = taskDefinition.timeoutMilli,
propertyDefinitionHash = taskDefinition.propertyDefinition.hash(),
retryPolicy = taskDefinition.retryPolicy
)
)
taskMap[taskDefinition.type] = taskDefinition
}
override suspend fun <T : Task> publishTask(task: T): PublishedTask<T> {
val taskDefinition = taskMap.getValue(task::class.java) as TaskDefinition<T>
val publishTask = application.get<TaskPublishService>().publishTask(
PublishTask(
taskDefinition.name,
producerId,
taskDefinition.serialize(task)
)
)
return PublishedTask(
task,
publishTask.id,
Instant.now()
)
}
}

View File

@ -0,0 +1,100 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.producer.embedded
import dev.usbharu.owl.broker.ModuleContext
import dev.usbharu.owl.broker.OwlBrokerApplication
import dev.usbharu.owl.broker.service.*
import dev.usbharu.owl.common.task.PublishedTask
import dev.usbharu.owl.common.task.Task
import dev.usbharu.owl.common.task.TaskDefinition
import dev.usbharu.owl.producer.api.OwlProducer
import org.koin.core.Koin
import org.koin.core.context.GlobalContext.startKoin
import org.koin.dsl.module
import org.koin.ksp.generated.defaultModule
import java.time.Instant
import java.util.*
import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinition as BrokerTaskDefinition
class EmbeddedOwlProducer(
private val moduleContext: ModuleContext,
private val retryPolicyFactory: RetryPolicyFactory,
private val name: String,
private val port: Int,
) : OwlProducer {
private lateinit var producerId: UUID
private lateinit var application: Koin
private val taskMap: MutableMap<Class<*>, TaskDefinition<*>> = mutableMapOf()
override suspend fun start() {
application = startKoin {
printLogger()
val module = module {
single<RetryPolicyFactory> {
retryPolicyFactory
}
}
modules(module, defaultModule, moduleContext.module())
}.koin
val producerService = application.get<ProducerService>()
producerId = producerService.registerProducer(RegisterProducerRequest(name, name))
application.get<OwlBrokerApplication>().start(port)
}
override suspend fun <T : Task> registerTask(taskDefinition: TaskDefinition<T>) {
application.get<RegisterTaskService>()
.registerTask(
BrokerTaskDefinition(
name = taskDefinition.name,
priority = taskDefinition.priority,
maxRetry = taskDefinition.maxRetry,
timeoutMilli = taskDefinition.timeoutMilli,
propertyDefinitionHash = taskDefinition.propertyDefinition.hash(),
retryPolicy = taskDefinition.retryPolicy
)
)
taskMap[taskDefinition.type] = taskDefinition
}
override suspend fun <T : Task> publishTask(task: T): PublishedTask<T> {
val taskDefinition = taskMap.getValue(task::class.java) as TaskDefinition<T>
val publishTask = application.get<TaskPublishService>().publishTask(
PublishTask(
taskDefinition.name,
producerId,
taskDefinition.serialize(task)
)
)
return PublishedTask(
task,
publishTask.id,
Instant.now()
)
}
}

View File

@ -11,3 +11,4 @@ findProject(":broker:broker-mongodb")?.name = "broker-mongodb"
include("producer:default")
findProject(":producer:default")?.name = "default"
include("consumer")
include("producer:embedded")

View File

@ -1,3 +1,6 @@
plugins {
id("org.gradle.toolchains.foojay-resolver-convention") version "0.5.0"
}
rootProject.name = "hideout"
includeBuild("owl")
includeBuild("owl")