From 7bdb4719f50c587fa2dc1b01dc9b99a66596628b Mon Sep 17 00:00:00 2001 From: usbharu Date: Sun, 31 Mar 2024 12:12:20 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20Producer=E3=81=AE=E3=83=87=E3=83=95?= =?UTF-8?q?=E3=82=A9=E3=83=AB=E3=83=88=E5=AE=9F=E8=A3=85=E3=82=92=E8=BF=BD?= =?UTF-8?q?=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- broker/src/main/proto/publish_task.proto | 1 - .../owl/common/task/PropertyDefinition.kt | 8 +- .../usbharu/owl/common/task/TaskDefinition.kt | 4 +- .../usbharu/owl/producer/api/OwlProducer.kt | 2 + .../owl/producer/api/OwlProducerBuilder.kt | 4 +- .../producer/api/OwlProducerBuilderConfig.kt | 7 +- producer/default/build.gradle.kts | 55 ++++++++++++ .../defaultimpl/DefaultOwlProducer.kt | 90 +++++++++++++++++++ .../defaultimpl/DefaultOwlProducerBuilder.kt | 47 ++++++++++ .../defaultimpl/DefaultOwlProducerConfig.kt} | 17 ++-- producer/impl/build.gradle.kts | 21 ----- .../producer/impl/DefaultOwlProducer.kt | 32 ------- .../impl/datasource/DatasourceFactory.kt | 23 ----- .../ServiceProviderDatasourceFactory.kt | 32 ------- settings.gradle.kts | 4 +- 15 files changed, 222 insertions(+), 125 deletions(-) create mode 100644 producer/default/build.gradle.kts create mode 100644 producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducer.kt create mode 100644 producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerBuilder.kt rename producer/{impl/src/main/kotlin/dev/usbharu/producer/impl/OwlTaskDatasource.kt => default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerConfig.kt} (58%) delete mode 100644 producer/impl/build.gradle.kts delete mode 100644 producer/impl/src/main/kotlin/dev/usbharu/producer/impl/DefaultOwlProducer.kt delete mode 100644 producer/impl/src/main/kotlin/dev/usbharu/producer/impl/datasource/DatasourceFactory.kt delete mode 100644 producer/impl/src/main/kotlin/dev/usbharu/producer/impl/datasource/ServiceProviderDatasourceFactory.kt diff --git a/broker/src/main/proto/publish_task.proto b/broker/src/main/proto/publish_task.proto index 9194077a..620e6396 100644 --- a/broker/src/main/proto/publish_task.proto +++ b/broker/src/main/proto/publish_task.proto @@ -3,7 +3,6 @@ syntax = "proto3"; import "google/protobuf/timestamp.proto"; import "uuid.proto"; -import "property.proto"; option java_package = "dev.usbharu.owl"; diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/task/PropertyDefinition.kt b/common/src/main/kotlin/dev/usbharu/owl/common/task/PropertyDefinition.kt index 44c149b2..11f8dcda 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/task/PropertyDefinition.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/task/PropertyDefinition.kt @@ -18,6 +18,12 @@ package dev.usbharu.owl.common.task import dev.usbharu.owl.common.property.PropertyType -class PropertyDefinition(val map: Map) : Map by map{ +class PropertyDefinition(val map: Map) : Map by map { + fun hash(): Long { + var hash = 1L + map.map { it.key + it.value.name }.joinToString("").map { hash *= it.code * 31 } + return hash + } + } diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/task/TaskDefinition.kt b/common/src/main/kotlin/dev/usbharu/owl/common/task/TaskDefinition.kt index 8c766c34..d62b94de 100644 --- a/common/src/main/kotlin/dev/usbharu/owl/common/task/TaskDefinition.kt +++ b/common/src/main/kotlin/dev/usbharu/owl/common/task/TaskDefinition.kt @@ -17,15 +17,15 @@ package dev.usbharu.owl.common.task import dev.usbharu.owl.common.property.PropertyValue -import dev.usbharu.owl.common.retry.RetryPolicy interface TaskDefinition { val name: String val priority: Int val maxRetry: Int - val retryPolicy:RetryPolicy + val retryPolicy: String val timeoutMilli: Long val propertyDefinition: PropertyDefinition + val type: Class fun serialize(task: T): Map> fun deserialize(value: Map>): T diff --git a/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducer.kt b/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducer.kt index e4df89ab..b15d1d28 100644 --- a/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducer.kt +++ b/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducer.kt @@ -22,6 +22,8 @@ import dev.usbharu.owl.common.task.TaskDefinition interface OwlProducer { + suspend fun start() + suspend fun registerTask(taskDefinition: TaskDefinition) suspend fun publishTask(task: T): PublishedTask } diff --git a/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducerBuilder.kt b/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducerBuilder.kt index e49d5f3e..4d1c21ab 100644 --- a/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducerBuilder.kt +++ b/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducerBuilder.kt @@ -16,7 +16,9 @@ package dev.usbharu.owl.producer.api -interface OwlProducerBuilder { +interface OwlProducerBuilder

{ fun config(): T fun apply(owlProducerConfig: T) + + fun build(): P } \ No newline at end of file diff --git a/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducerBuilderConfig.kt b/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducerBuilderConfig.kt index d4a5288e..2b6548f0 100644 --- a/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducerBuilderConfig.kt +++ b/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducerBuilderConfig.kt @@ -16,6 +16,9 @@ package dev.usbharu.owl.producer.api -fun , C : OwlProducerConfig> OWL(owlProducerBuilder: T, config: C.() -> Unit) { - owlProducerBuilder.apply(owlProducerBuilder.config().apply { config() }) +fun

, C : OwlProducerConfig> OWL( + owlProducerBuilder: T, + configBlock: C.() -> Unit +) { + owlProducerBuilder.apply(owlProducerBuilder.config().apply { configBlock() }) } \ No newline at end of file diff --git a/producer/default/build.gradle.kts b/producer/default/build.gradle.kts new file mode 100644 index 00000000..722c7de8 --- /dev/null +++ b/producer/default/build.gradle.kts @@ -0,0 +1,55 @@ +plugins { + kotlin("jvm") + id("com.google.protobuf") version "0.9.4" +} + +group = "dev.usbharu" +version = "0.0.1" + +repositories { + mavenCentral() +} + +dependencies { + testImplementation("org.jetbrains.kotlin:kotlin-test") + implementation(project(":producer:api")) + implementation("io.grpc:grpc-kotlin-stub:1.4.1") + implementation("io.grpc:grpc-protobuf:1.61.1") + implementation("com.google.protobuf:protobuf-kotlin:3.25.3") + implementation("io.grpc:grpc-netty:1.61.1") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0") + implementation(project(":common")) + protobuf(files(project(":broker").dependencyProject.projectDir.toString() + "/src/main/proto")) +} + +tasks.test { + useJUnitPlatform() +} +kotlin { + jvmToolchain(17) +} + +protobuf { + protoc { + artifact = "com.google.protobuf:protoc:3.25.3" + } + plugins { + create("grpc") { + artifact = "io.grpc:protoc-gen-grpc-java:1.61.1" + } + create("grpckt") { + artifact = "io.grpc:protoc-gen-grpc-kotlin:1.4.1:jdk8@jar" + } + } + generateProtoTasks { + all().forEach { + it.plugins { + create("grpc") + create("grpckt") + } + it.builtins { + create("kotlin") + } + } + } +} \ No newline at end of file diff --git a/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducer.kt b/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducer.kt new file mode 100644 index 00000000..573fba71 --- /dev/null +++ b/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducer.kt @@ -0,0 +1,90 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.dev.usbharu.owl.producer.defaultimpl + +import com.google.protobuf.timestamp +import dev.usbharu.owl.* +import dev.usbharu.owl.Uuid.UUID +import dev.usbharu.owl.common.property.PropertySerializeUtils +import dev.usbharu.owl.common.task.PublishedTask +import dev.usbharu.owl.common.task.Task +import dev.usbharu.owl.common.task.TaskDefinition +import dev.usbharu.owl.producer.api.OwlProducer +import java.time.Instant + +class DefaultOwlProducer(private val defaultOwlProducerConfig: DefaultOwlProducerConfig) : OwlProducer { + + lateinit var producerId: UUID + lateinit var producerServiceCoroutineStub: ProducerServiceGrpcKt.ProducerServiceCoroutineStub + lateinit var defineTaskServiceCoroutineStub: DefinitionTaskServiceGrpcKt.DefinitionTaskServiceCoroutineStub + lateinit var taskPublishServiceCoroutineStub: TaskPublishServiceGrpcKt.TaskPublishServiceCoroutineStub + val map = mutableMapOf, TaskDefinition<*>>() + override suspend fun start() { + producerServiceCoroutineStub = + ProducerServiceGrpcKt.ProducerServiceCoroutineStub(defaultOwlProducerConfig.channel) + producerId = producerServiceCoroutineStub.registerProducer(producer { + this.name = defaultOwlProducerConfig.name + this.hostname = defaultOwlProducerConfig.hostname + }).id + + defineTaskServiceCoroutineStub = + DefinitionTaskServiceGrpcKt.DefinitionTaskServiceCoroutineStub(defaultOwlProducerConfig.channel) + + taskPublishServiceCoroutineStub = + TaskPublishServiceGrpcKt.TaskPublishServiceCoroutineStub(defaultOwlProducerConfig.channel) + } + + + override suspend fun registerTask(taskDefinition: TaskDefinition) { + defineTaskServiceCoroutineStub.register(taskDefinition { + this.producerId = this@DefaultOwlProducer.producerId + this.name = taskDefinition.name + this.maxRetry = taskDefinition.maxRetry + this.priority = taskDefinition.priority + this.retryPolicy = taskDefinition.retryPolicy + this.timeoutMilli = taskDefinition.timeoutMilli + this.propertyDefinitionHash = taskDefinition.propertyDefinition.hash() + }) + } + + override suspend fun publishTask(task: T): PublishedTask { + val taskDefinition = map.getValue(task::class.java) as TaskDefinition + val properties = PropertySerializeUtils.serialize( + defaultOwlProducerConfig.propertySerializerFactory, + taskDefinition.serialize(task) + ) + val now = Instant.now() + val publishTask = taskPublishServiceCoroutineStub.publishTask( + dev.usbharu.owl.publishTask { + this.producerId = this@DefaultOwlProducer.producerId + + this.publishedAt = timestamp { + this.seconds = now.epochSecond + this.nanos = now.nano + } + this.name = taskDefinition.name + this.properties.putAll(properties) + } + ) + + return PublishedTask( + task, + java.util.UUID(publishTask.id.mostSignificantUuidBits, publishTask.id.leastSignificantUuidBits), + now + ) + } +} \ No newline at end of file diff --git a/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerBuilder.kt b/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerBuilder.kt new file mode 100644 index 00000000..8ebaaf1c --- /dev/null +++ b/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerBuilder.kt @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.dev.usbharu.owl.producer.defaultimpl + +import dev.usbharu.owl.producer.api.OwlProducerBuilder +import io.grpc.ManagedChannelBuilder + +class DefaultOwlProducerBuilder : OwlProducerBuilder { + + var config: DefaultOwlProducerConfig = config() + + override fun config(): DefaultOwlProducerConfig { + val defaultOwlProducerConfig = DefaultOwlProducerConfig() + + with(defaultOwlProducerConfig) { + channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build() + } + + return defaultOwlProducerConfig + } + + override fun build(): DefaultOwlProducer { + return DefaultOwlProducer( + config + ) + } + + override fun apply(owlProducerConfig: DefaultOwlProducerConfig) { + this.config = owlProducerConfig + } +} + +val DEFAULT by lazy { DefaultOwlProducerBuilder() } \ No newline at end of file diff --git a/producer/impl/src/main/kotlin/dev/usbharu/producer/impl/OwlTaskDatasource.kt b/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerConfig.kt similarity index 58% rename from producer/impl/src/main/kotlin/dev/usbharu/producer/impl/OwlTaskDatasource.kt rename to producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerConfig.kt index 7c9410aa..527e1d04 100644 --- a/producer/impl/src/main/kotlin/dev/usbharu/producer/impl/OwlTaskDatasource.kt +++ b/producer/default/src/main/kotlin/dev/usbharu/owl/producer/defaultimpl/DefaultOwlProducerConfig.kt @@ -14,14 +14,15 @@ * limitations under the License. */ -package dev.usbharu.producer.impl +package dev.usbharu.dev.usbharu.owl.producer.defaultimpl -import dev.usbharu.owl.common.task.PublishedTask -import dev.usbharu.owl.common.task.Task -import dev.usbharu.owl.common.task.TaskDefinition +import dev.usbharu.owl.common.property.PropertySerializerFactory +import dev.usbharu.owl.producer.api.OwlProducerConfig +import io.grpc.Channel -interface OwlTaskDatasource { - - suspend fun registerTask(definition: TaskDefinition) - suspend fun publishTask(publishedTask: PublishedTask) +class DefaultOwlProducerConfig : OwlProducerConfig { + lateinit var channel: Channel + lateinit var name: String + lateinit var hostname: String + lateinit var propertySerializerFactory: PropertySerializerFactory } \ No newline at end of file diff --git a/producer/impl/build.gradle.kts b/producer/impl/build.gradle.kts deleted file mode 100644 index 87bf53a1..00000000 --- a/producer/impl/build.gradle.kts +++ /dev/null @@ -1,21 +0,0 @@ -plugins { - kotlin("jvm") -} - -group = "dev.usbharu" -version = "0.0.1" - -repositories { - mavenCentral() -} - -dependencies { - implementation(project(":producer:api")) -} - -tasks.test { - useJUnitPlatform() -} -kotlin { - jvmToolchain(17) -} \ No newline at end of file diff --git a/producer/impl/src/main/kotlin/dev/usbharu/producer/impl/DefaultOwlProducer.kt b/producer/impl/src/main/kotlin/dev/usbharu/producer/impl/DefaultOwlProducer.kt deleted file mode 100644 index 1607a62b..00000000 --- a/producer/impl/src/main/kotlin/dev/usbharu/producer/impl/DefaultOwlProducer.kt +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (C) 2024 usbharu - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dev.usbharu.producer.impl - -import dev.usbharu.owl.common.task.PublishedTask -import dev.usbharu.owl.common.task.Task -import dev.usbharu.owl.common.task.TaskDefinition -import dev.usbharu.owl.producer.api.OwlProducer - -class DefaultOwlProducer : OwlProducer { - override suspend fun registerTask(taskDefinition: TaskDefinition) { - TODO("Not yet implemented") - } - - override suspend fun publishTask(task: T): PublishedTask { - TODO("Not yet implemented") - } -} \ No newline at end of file diff --git a/producer/impl/src/main/kotlin/dev/usbharu/producer/impl/datasource/DatasourceFactory.kt b/producer/impl/src/main/kotlin/dev/usbharu/producer/impl/datasource/DatasourceFactory.kt deleted file mode 100644 index 293ba15b..00000000 --- a/producer/impl/src/main/kotlin/dev/usbharu/producer/impl/datasource/DatasourceFactory.kt +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright (C) 2024 usbharu - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dev.usbharu.producer.impl.datasource - -import dev.usbharu.producer.impl.OwlTaskDatasource - -interface DatasourceFactory { - suspend fun create():OwlTaskDatasource -} \ No newline at end of file diff --git a/producer/impl/src/main/kotlin/dev/usbharu/producer/impl/datasource/ServiceProviderDatasourceFactory.kt b/producer/impl/src/main/kotlin/dev/usbharu/producer/impl/datasource/ServiceProviderDatasourceFactory.kt deleted file mode 100644 index f198a270..00000000 --- a/producer/impl/src/main/kotlin/dev/usbharu/producer/impl/datasource/ServiceProviderDatasourceFactory.kt +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (C) 2024 usbharu - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dev.usbharu.producer.impl.datasource - -import dev.usbharu.producer.impl.OwlTaskDatasource -import java.util.ServiceLoader -import kotlin.jvm.optionals.getOrElse -import kotlin.jvm.optionals.getOrNull - -class ServiceProviderDatasourceFactory : DatasourceFactory { - override suspend fun create(): OwlTaskDatasource { - val serviceLoader: ServiceLoader = ServiceLoader.load(OwlTaskDatasource::class.java) - - return serviceLoader.findFirst().getOrElse { - throw IllegalStateException("") - } - } -} \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index 8e2a06bc..f447b5a2 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -5,8 +5,8 @@ rootProject.name = "owl" include("common") include("producer:api") findProject(":producer:api")?.name = "api" -include("producer:impl") -findProject(":producer:impl")?.name = "impl" include("broker") include("broker:broker-mongodb") findProject(":broker:broker-mongodb")?.name = "broker-mongodb" +include("producer:default") +findProject(":producer:default")?.name = "default"