feat: Producerのデフォルト実装を追加

This commit is contained in:
usbharu 2024-03-31 12:12:20 +09:00
parent fad394bbab
commit 1c576f9463
Signed by: usbharu
GPG Key ID: 8CB1087135660B8D
15 changed files with 222 additions and 125 deletions

View File

@ -3,7 +3,6 @@ syntax = "proto3";
import "google/protobuf/timestamp.proto"; import "google/protobuf/timestamp.proto";
import "uuid.proto"; import "uuid.proto";
import "property.proto";
option java_package = "dev.usbharu.owl"; option java_package = "dev.usbharu.owl";

View File

@ -18,6 +18,12 @@ package dev.usbharu.owl.common.task
import dev.usbharu.owl.common.property.PropertyType import dev.usbharu.owl.common.property.PropertyType
class PropertyDefinition(val map: Map<String, PropertyType>) : Map<String, PropertyType> by map{ class PropertyDefinition(val map: Map<String, PropertyType>) : Map<String, PropertyType> by map {
fun hash(): Long {
var hash = 1L
map.map { it.key + it.value.name }.joinToString("").map { hash *= it.code * 31 }
return hash
}
} }

View File

@ -17,15 +17,15 @@
package dev.usbharu.owl.common.task package dev.usbharu.owl.common.task
import dev.usbharu.owl.common.property.PropertyValue import dev.usbharu.owl.common.property.PropertyValue
import dev.usbharu.owl.common.retry.RetryPolicy
interface TaskDefinition<T : Task> { interface TaskDefinition<T : Task> {
val name: String val name: String
val priority: Int val priority: Int
val maxRetry: Int val maxRetry: Int
val retryPolicy:RetryPolicy val retryPolicy: String
val timeoutMilli: Long val timeoutMilli: Long
val propertyDefinition: PropertyDefinition val propertyDefinition: PropertyDefinition
val type: Class<T>
fun serialize(task: T): Map<String, PropertyValue<*>> fun serialize(task: T): Map<String, PropertyValue<*>>
fun deserialize(value: Map<String, PropertyValue<*>>): T fun deserialize(value: Map<String, PropertyValue<*>>): T

View File

@ -22,6 +22,8 @@ import dev.usbharu.owl.common.task.TaskDefinition
interface OwlProducer { interface OwlProducer {
suspend fun start()
suspend fun <T : Task> registerTask(taskDefinition: TaskDefinition<T>) suspend fun <T : Task> registerTask(taskDefinition: TaskDefinition<T>)
suspend fun <T : Task> publishTask(task: T): PublishedTask<T> suspend fun <T : Task> publishTask(task: T): PublishedTask<T>
} }

View File

@ -16,7 +16,9 @@
package dev.usbharu.owl.producer.api package dev.usbharu.owl.producer.api
interface OwlProducerBuilder<T : OwlProducerConfig> { interface OwlProducerBuilder<P : OwlProducer, T : OwlProducerConfig> {
fun config(): T fun config(): T
fun apply(owlProducerConfig: T) fun apply(owlProducerConfig: T)
fun build(): P
} }

View File

@ -16,6 +16,9 @@
package dev.usbharu.owl.producer.api package dev.usbharu.owl.producer.api
fun <T : OwlProducerBuilder<C>, C : OwlProducerConfig> OWL(owlProducerBuilder: T, config: C.() -> Unit) { fun <P : OwlProducer, T : OwlProducerBuilder<P, C>, C : OwlProducerConfig> OWL(
owlProducerBuilder.apply(owlProducerBuilder.config().apply { config() }) owlProducerBuilder: T,
configBlock: C.() -> Unit
) {
owlProducerBuilder.apply(owlProducerBuilder.config().apply { configBlock() })
} }

View File

@ -0,0 +1,55 @@
plugins {
kotlin("jvm")
id("com.google.protobuf") version "0.9.4"
}
group = "dev.usbharu"
version = "0.0.1"
repositories {
mavenCentral()
}
dependencies {
testImplementation("org.jetbrains.kotlin:kotlin-test")
implementation(project(":producer:api"))
implementation("io.grpc:grpc-kotlin-stub:1.4.1")
implementation("io.grpc:grpc-protobuf:1.61.1")
implementation("com.google.protobuf:protobuf-kotlin:3.25.3")
implementation("io.grpc:grpc-netty:1.61.1")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0")
implementation(project(":common"))
protobuf(files(project(":broker").dependencyProject.projectDir.toString() + "/src/main/proto"))
}
tasks.test {
useJUnitPlatform()
}
kotlin {
jvmToolchain(17)
}
protobuf {
protoc {
artifact = "com.google.protobuf:protoc:3.25.3"
}
plugins {
create("grpc") {
artifact = "io.grpc:protoc-gen-grpc-java:1.61.1"
}
create("grpckt") {
artifact = "io.grpc:protoc-gen-grpc-kotlin:1.4.1:jdk8@jar"
}
}
generateProtoTasks {
all().forEach {
it.plugins {
create("grpc")
create("grpckt")
}
it.builtins {
create("kotlin")
}
}
}
}

View File

@ -0,0 +1,90 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.dev.usbharu.owl.producer.defaultimpl
import com.google.protobuf.timestamp
import dev.usbharu.owl.*
import dev.usbharu.owl.Uuid.UUID
import dev.usbharu.owl.common.property.PropertySerializeUtils
import dev.usbharu.owl.common.task.PublishedTask
import dev.usbharu.owl.common.task.Task
import dev.usbharu.owl.common.task.TaskDefinition
import dev.usbharu.owl.producer.api.OwlProducer
import java.time.Instant
class DefaultOwlProducer(private val defaultOwlProducerConfig: DefaultOwlProducerConfig) : OwlProducer {
lateinit var producerId: UUID
lateinit var producerServiceCoroutineStub: ProducerServiceGrpcKt.ProducerServiceCoroutineStub
lateinit var defineTaskServiceCoroutineStub: DefinitionTaskServiceGrpcKt.DefinitionTaskServiceCoroutineStub
lateinit var taskPublishServiceCoroutineStub: TaskPublishServiceGrpcKt.TaskPublishServiceCoroutineStub
val map = mutableMapOf<Class<*>, TaskDefinition<*>>()
override suspend fun start() {
producerServiceCoroutineStub =
ProducerServiceGrpcKt.ProducerServiceCoroutineStub(defaultOwlProducerConfig.channel)
producerId = producerServiceCoroutineStub.registerProducer(producer {
this.name = defaultOwlProducerConfig.name
this.hostname = defaultOwlProducerConfig.hostname
}).id
defineTaskServiceCoroutineStub =
DefinitionTaskServiceGrpcKt.DefinitionTaskServiceCoroutineStub(defaultOwlProducerConfig.channel)
taskPublishServiceCoroutineStub =
TaskPublishServiceGrpcKt.TaskPublishServiceCoroutineStub(defaultOwlProducerConfig.channel)
}
override suspend fun <T : Task> registerTask(taskDefinition: TaskDefinition<T>) {
defineTaskServiceCoroutineStub.register(taskDefinition {
this.producerId = this@DefaultOwlProducer.producerId
this.name = taskDefinition.name
this.maxRetry = taskDefinition.maxRetry
this.priority = taskDefinition.priority
this.retryPolicy = taskDefinition.retryPolicy
this.timeoutMilli = taskDefinition.timeoutMilli
this.propertyDefinitionHash = taskDefinition.propertyDefinition.hash()
})
}
override suspend fun <T : Task> publishTask(task: T): PublishedTask<T> {
val taskDefinition = map.getValue(task::class.java) as TaskDefinition<T>
val properties = PropertySerializeUtils.serialize(
defaultOwlProducerConfig.propertySerializerFactory,
taskDefinition.serialize(task)
)
val now = Instant.now()
val publishTask = taskPublishServiceCoroutineStub.publishTask(
dev.usbharu.owl.publishTask {
this.producerId = this@DefaultOwlProducer.producerId
this.publishedAt = timestamp {
this.seconds = now.epochSecond
this.nanos = now.nano
}
this.name = taskDefinition.name
this.properties.putAll(properties)
}
)
return PublishedTask(
task,
java.util.UUID(publishTask.id.mostSignificantUuidBits, publishTask.id.leastSignificantUuidBits),
now
)
}
}

View File

@ -0,0 +1,47 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.dev.usbharu.owl.producer.defaultimpl
import dev.usbharu.owl.producer.api.OwlProducerBuilder
import io.grpc.ManagedChannelBuilder
class DefaultOwlProducerBuilder : OwlProducerBuilder<DefaultOwlProducer, DefaultOwlProducerConfig> {
var config: DefaultOwlProducerConfig = config()
override fun config(): DefaultOwlProducerConfig {
val defaultOwlProducerConfig = DefaultOwlProducerConfig()
with(defaultOwlProducerConfig) {
channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build()
}
return defaultOwlProducerConfig
}
override fun build(): DefaultOwlProducer {
return DefaultOwlProducer(
config
)
}
override fun apply(owlProducerConfig: DefaultOwlProducerConfig) {
this.config = owlProducerConfig
}
}
val DEFAULT by lazy { DefaultOwlProducerBuilder() }

View File

@ -14,14 +14,15 @@
* limitations under the License. * limitations under the License.
*/ */
package dev.usbharu.producer.impl package dev.usbharu.dev.usbharu.owl.producer.defaultimpl
import dev.usbharu.owl.common.task.PublishedTask import dev.usbharu.owl.common.property.PropertySerializerFactory
import dev.usbharu.owl.common.task.Task import dev.usbharu.owl.producer.api.OwlProducerConfig
import dev.usbharu.owl.common.task.TaskDefinition import io.grpc.Channel
interface OwlTaskDatasource { class DefaultOwlProducerConfig : OwlProducerConfig {
lateinit var channel: Channel
suspend fun <T:Task> registerTask(definition: TaskDefinition<T>) lateinit var name: String
suspend fun <T:Task> publishTask(publishedTask: PublishedTask<T>) lateinit var hostname: String
lateinit var propertySerializerFactory: PropertySerializerFactory
} }

View File

@ -1,21 +0,0 @@
plugins {
kotlin("jvm")
}
group = "dev.usbharu"
version = "0.0.1"
repositories {
mavenCentral()
}
dependencies {
implementation(project(":producer:api"))
}
tasks.test {
useJUnitPlatform()
}
kotlin {
jvmToolchain(17)
}

View File

@ -1,32 +0,0 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.producer.impl
import dev.usbharu.owl.common.task.PublishedTask
import dev.usbharu.owl.common.task.Task
import dev.usbharu.owl.common.task.TaskDefinition
import dev.usbharu.owl.producer.api.OwlProducer
class DefaultOwlProducer : OwlProducer {
override suspend fun <T : Task> registerTask(taskDefinition: TaskDefinition<T>) {
TODO("Not yet implemented")
}
override suspend fun <T : Task> publishTask(task: T): PublishedTask<T> {
TODO("Not yet implemented")
}
}

View File

@ -1,23 +0,0 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.producer.impl.datasource
import dev.usbharu.producer.impl.OwlTaskDatasource
interface DatasourceFactory {
suspend fun create():OwlTaskDatasource
}

View File

@ -1,32 +0,0 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.producer.impl.datasource
import dev.usbharu.producer.impl.OwlTaskDatasource
import java.util.ServiceLoader
import kotlin.jvm.optionals.getOrElse
import kotlin.jvm.optionals.getOrNull
class ServiceProviderDatasourceFactory : DatasourceFactory {
override suspend fun create(): OwlTaskDatasource {
val serviceLoader: ServiceLoader<OwlTaskDatasource> = ServiceLoader.load(OwlTaskDatasource::class.java)
return serviceLoader.findFirst().getOrElse {
throw IllegalStateException("")
}
}
}

View File

@ -5,8 +5,8 @@ rootProject.name = "owl"
include("common") include("common")
include("producer:api") include("producer:api")
findProject(":producer:api")?.name = "api" findProject(":producer:api")?.name = "api"
include("producer:impl")
findProject(":producer:impl")?.name = "impl"
include("broker") include("broker")
include("broker:broker-mongodb") include("broker:broker-mongodb")
findProject(":broker:broker-mongodb")?.name = "broker-mongodb" findProject(":broker:broker-mongodb")?.name = "broker-mongodb"
include("producer:default")
findProject(":producer:default")?.name = "default"