commit fbdec67606cb9463b5e4c37d0066c0d96827be0e Author: usbharu <64310155+usbharu@users.noreply.github.com> Date: Mon Mar 4 18:24:57 2024 +0900 first commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bf3e1b2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,43 @@ +.gradle +build/ +!gradle/wrapper/gradle-wrapper.jar +!**/src/main/**/build/ +!**/src/test/**/build/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr +out/ +!**/src/main/**/out/ +!**/src/test/**/out/ + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache +bin/ +!**/src/main/**/bin/ +!**/src/test/**/bin/ + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store +/.idea/ diff --git a/broker/broker-mongodb/build.gradle.kts b/broker/broker-mongodb/build.gradle.kts new file mode 100644 index 0000000..8268414 --- /dev/null +++ b/broker/broker-mongodb/build.gradle.kts @@ -0,0 +1,33 @@ +plugins { + kotlin("jvm") + id("com.google.devtools.ksp") version "1.9.22-1.0.17" +} + +apply { + plugin("com.google.devtools.ksp") +} + +group = "dev.usbharu" +version = "0.0.1" + +repositories { + mavenCentral() +} + +dependencies { + implementation("org.mongodb:mongodb-driver-kotlin-coroutine:5.0.0") + compileOnly(project(":broker")) + implementation(project(":common")) + implementation(platform("io.insert-koin:koin-bom:3.5.3")) + implementation(platform("io.insert-koin:koin-annotations-bom:1.3.1")) + implementation("io.insert-koin:koin-core") + compileOnly("io.insert-koin:koin-annotations") + ksp("io.insert-koin:koin-ksp-compiler:1.3.1") +} + +tasks.test { + useJUnitPlatform() +} +kotlin { + jvmToolchain(17) +} \ No newline at end of file diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongoModule.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongoModule.kt new file mode 100644 index 0000000..33f4086 --- /dev/null +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongoModule.kt @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.mongodb + +import org.koin.core.annotation.ComponentScan +import org.koin.core.annotation.Module + +@Module +@ComponentScan("dev.usbharu.owl.broker.mongodb") +class MongoModule { +} + diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongoModuleContext.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongoModuleContext.kt new file mode 100644 index 0000000..0fa424d --- /dev/null +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongoModuleContext.kt @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.mongodb + +import dev.usbharu.owl.broker.ModuleContext +import org.koin.ksp.generated.module + +class MongoModuleContext : ModuleContext { + override fun module(): org.koin.core.module.Module { + return MongoModule().module + } +} \ No newline at end of file diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbConsumerRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbConsumerRepository.kt new file mode 100644 index 0000000..1c34892 --- /dev/null +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbConsumerRepository.kt @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.mongodb + +import com.mongodb.client.model.Filters +import com.mongodb.client.model.ReplaceOptions +import com.mongodb.kotlin.client.coroutine.MongoDatabase +import dev.usbharu.owl.broker.domain.model.consumer.Consumer +import dev.usbharu.owl.broker.domain.model.consumer.ConsumerRepository +import kotlinx.coroutines.flow.singleOrNull +import org.bson.BsonType +import org.bson.codecs.pojo.annotations.BsonId +import org.bson.codecs.pojo.annotations.BsonRepresentation +import org.koin.core.annotation.Singleton +import java.util.* + +@Singleton +class MongodbConsumerRepository(database: MongoDatabase) : ConsumerRepository { + + private val collection = database.getCollection("consumers") + override suspend fun save(consumer: Consumer): Consumer { + collection.replaceOne(Filters.eq("_id", consumer.id.toString()), ConsumerMongodb.of(consumer), ReplaceOptions().upsert(true)) + return consumer + } + + override suspend fun findById(id: UUID): Consumer? { + return collection.find(Filters.eq("_id", id.toString())).singleOrNull()?.toConsumer() + } +} + +data class ConsumerMongodb( + @BsonId + @BsonRepresentation(BsonType.STRING) + val id: String, + val name: String, + val hostname: String, + val tasks: List +){ + + fun toConsumer():Consumer{ + return Consumer( + UUID.fromString(id), name, hostname, tasks + ) + } + companion object{ + fun of(consumer: Consumer):ConsumerMongodb{ + return ConsumerMongodb( + consumer.id.toString(), + consumer.name, + consumer.hostname, + consumer.tasks + ) + } + } +} \ No newline at end of file diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbProducerRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbProducerRepository.kt new file mode 100644 index 0000000..fe93cf8 --- /dev/null +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbProducerRepository.kt @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.mongodb + +import com.mongodb.client.model.Filters +import com.mongodb.client.model.ReplaceOptions +import com.mongodb.kotlin.client.coroutine.MongoDatabase +import dev.usbharu.owl.broker.domain.model.producer.Producer +import dev.usbharu.owl.broker.domain.model.producer.ProducerRepository +import org.koin.core.annotation.Singleton +import java.time.Instant +import java.util.* + +@Singleton +class MongodbProducerRepository(database: MongoDatabase) : ProducerRepository { + + private val collection = database.getCollection("producers") + + override suspend fun save(producer: Producer): Producer { + collection.replaceOne( + Filters.eq("_id", producer.id.toString()), + ProducerMongodb.of(producer), + ReplaceOptions().upsert(true) + ) + return producer + } +} + +data class ProducerMongodb( + val id: String, + val name: String, + val hostname: String, + val registeredTask: List, + val createdAt: Instant +) { + fun toProducer(): Producer { + return Producer( + id = UUID.fromString(id), + name = name, + hostname = hostname, + registeredTask = registeredTask, + createdAt = createdAt + ) + } + + companion object { + fun of(producer: Producer): ProducerMongodb { + return ProducerMongodb( + id = producer.id.toString(), + name = producer.name, + hostname = producer.hostname, + registeredTask = producer.registeredTask, + createdAt = producer.createdAt + ) + } + } +} \ No newline at end of file diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt new file mode 100644 index 0000000..c44cf96 --- /dev/null +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbQueuedTaskRepository.kt @@ -0,0 +1,113 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.mongodb + +import com.mongodb.client.model.Filters.* +import com.mongodb.client.model.FindOneAndUpdateOptions +import com.mongodb.client.model.ReplaceOptions +import com.mongodb.client.model.ReturnDocument +import com.mongodb.client.model.Updates.set +import com.mongodb.kotlin.client.coroutine.MongoDatabase +import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask +import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTaskRepository +import dev.usbharu.owl.common.property.PropertySerializerFactory +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map +import org.bson.BsonType +import org.bson.codecs.pojo.annotations.BsonId +import org.bson.codecs.pojo.annotations.BsonRepresentation +import org.koin.core.annotation.Singleton +import java.time.Instant +import java.util.* + +@Singleton +class MongodbQueuedTaskRepository(private val propertySerializerFactory: PropertySerializerFactory,database: MongoDatabase) : QueuedTaskRepository { + + private val collection = database.getCollection("queued_task") + override suspend fun save(queuedTask: QueuedTask): QueuedTask { + collection.replaceOne( + eq("_id", queuedTask.task.id.toString()), QueuedTaskMongodb.of(propertySerializerFactory,queuedTask), + ReplaceOptions().upsert(true) + ) + return queuedTask + } + + override suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id: UUID, update: QueuedTask): QueuedTask { + val findOneAndUpdate = collection.findOneAndUpdate( + and( + eq("_id", id.toString()), + eq(QueuedTaskMongodb::assignedConsumer.name, null) + ), + listOf( + set(QueuedTaskMongodb::assignedConsumer.name, update.assignedConsumer), + set(QueuedTaskMongodb::assignedAt.name, update.assignedAt) + ), + FindOneAndUpdateOptions().upsert(false).returnDocument(ReturnDocument.AFTER) + ) + if (findOneAndUpdate == null) { + TODO() + } + return findOneAndUpdate.toQueuedTask(propertySerializerFactory) + } + + override fun findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority( + tasks: List, + limit: Int + ): Flow { + return collection.find( + and( + `in`("task.name", tasks), + eq(QueuedTaskMongodb::assignedConsumer.name, null) + ) + ).map { it.toQueuedTask(propertySerializerFactory) } + } +} + +data class QueuedTaskMongodb( + @BsonId + @BsonRepresentation(BsonType.STRING) + val id: String, + val task: TaskMongodb, + val attempt: Int, + val queuedAt: Instant, + val assignedConsumer: String?, + val assignedAt: Instant? +) { + + fun toQueuedTask(propertySerializerFactory: PropertySerializerFactory): QueuedTask { + return QueuedTask( + attempt, + queuedAt, + task.toTask(propertySerializerFactory), + UUID.fromString(assignedConsumer), + assignedAt + ) + } + + companion object { + fun of(propertySerializerFactory: PropertySerializerFactory,queuedTask: QueuedTask): QueuedTaskMongodb { + return QueuedTaskMongodb( + queuedTask.task.id.toString(), + TaskMongodb.of(propertySerializerFactory,queuedTask.task), + queuedTask.attempt, + queuedTask.queuedAt, + queuedTask.assignedConsumer?.toString(), + queuedTask.assignedAt + ) + } + } +} \ No newline at end of file diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskDefinitionRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskDefinitionRepository.kt new file mode 100644 index 0000000..a186cd5 --- /dev/null +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskDefinitionRepository.kt @@ -0,0 +1,85 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.mongodb + +import com.mongodb.client.model.Filters +import com.mongodb.client.model.ReplaceOptions +import com.mongodb.kotlin.client.coroutine.MongoDatabase +import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinition +import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository +import kotlinx.coroutines.flow.singleOrNull +import org.bson.BsonType +import org.bson.codecs.pojo.annotations.BsonId +import org.bson.codecs.pojo.annotations.BsonRepresentation +import org.koin.core.annotation.Singleton + +@Singleton +class MongodbTaskDefinitionRepository(database: MongoDatabase) : TaskDefinitionRepository { + + private val collection = database.getCollection("task_definition") + override suspend fun save(taskDefinition: TaskDefinition): TaskDefinition { + collection.replaceOne( + Filters.eq("_id", taskDefinition.name), + TaskDefinitionMongodb.of(taskDefinition), + ReplaceOptions().upsert(true) + ) + return taskDefinition + } + + override suspend fun deleteByName(name: String) { + collection.deleteOne(Filters.eq("_id",name)) + } + + override suspend fun findByName(name: String): TaskDefinition? { + return collection.find(Filters.eq("_id", name)).singleOrNull()?.toTaskDefinition() + } +} + +data class TaskDefinitionMongodb( + @BsonId + @BsonRepresentation(BsonType.STRING) + val name: String, + val priority: Int, + val maxRetry: Int, + val timeoutMilli: Long, + val propertyDefinitionHash: Long, + val retryPolicy: String +) { + fun toTaskDefinition(): TaskDefinition { + return TaskDefinition( + name = name, + priority = priority, + maxRetry = maxRetry, + timeoutMilli = timeoutMilli, + propertyDefinitionHash = propertyDefinitionHash, + retryPolicy = retryPolicy + ) + } + + companion object { + fun of(taskDefinition: TaskDefinition): TaskDefinitionMongodb { + return TaskDefinitionMongodb( + name = taskDefinition.name, + priority = taskDefinition.priority, + maxRetry = taskDefinition.maxRetry, + timeoutMilli = taskDefinition.timeoutMilli, + propertyDefinitionHash = taskDefinition.propertyDefinitionHash, + retryPolicy = taskDefinition.retryPolicy + ) + } + } +} \ No newline at end of file diff --git a/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt new file mode 100644 index 0000000..9dd5b4e --- /dev/null +++ b/broker/broker-mongodb/src/main/kotlin/dev/usbharu/owl/broker/mongodb/MongodbTaskRepository.kt @@ -0,0 +1,89 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.mongodb + +import com.mongodb.client.model.Filters +import com.mongodb.client.model.ReplaceOptions +import com.mongodb.kotlin.client.coroutine.MongoDatabase +import dev.usbharu.owl.broker.domain.model.task.Task +import dev.usbharu.owl.broker.domain.model.task.TaskRepository +import dev.usbharu.owl.common.property.PropertySerializeUtils +import dev.usbharu.owl.common.property.PropertySerializerFactory +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map +import org.koin.core.annotation.Singleton +import java.time.Instant +import java.util.* + +@Singleton +class MongodbTaskRepository(database: MongoDatabase, private val propertySerializerFactory: PropertySerializerFactory) : + TaskRepository { + + private val collection = database.getCollection("tasks") + override suspend fun save(task: Task): Task { + collection.replaceOne( + Filters.eq("_id", task.id.toString()), TaskMongodb.of(propertySerializerFactory, task), + ReplaceOptions().upsert(true) + ) + return task + } + + override fun findByNextRetryBefore(timestamp: Instant): Flow { + return collection.find(Filters.lte(TaskMongodb::nextRetry.name, timestamp)) + .map { it.toTask(propertySerializerFactory) } + } +} + +data class TaskMongodb( + val name: String, + val id: String, + val publishProducerId: String, + val publishedAt: Instant, + val nextRetry: Instant, + val completedAt: Instant?, + val attempt: Int, + val properties: Map +) { + + fun toTask(propertySerializerFactory: PropertySerializerFactory): Task { + return Task( + name = name, + id = UUID.fromString(id), + publishProducerId = UUID.fromString(publishProducerId), + publishedAt = publishedAt, + nextRetry = nextRetry, + completedAt = completedAt, + attempt = attempt, + properties = PropertySerializeUtils.deserialize(propertySerializerFactory, properties) + ) + } + + companion object { + fun of(propertySerializerFactory: PropertySerializerFactory, task: Task): TaskMongodb { + return TaskMongodb( + task.name, + task.id.toString(), + task.publishProducerId.toString(), + task.publishedAt, + task.nextRetry, + task.completedAt, + task.attempt, + PropertySerializeUtils.serialize(propertySerializerFactory, task.properties) + ) + } + } +} \ No newline at end of file diff --git a/broker/broker-mongodb/src/test/kotlin/dev/usbharu/owl/broker/mongodb/MongodbConsumerRepositoryTest.kt b/broker/broker-mongodb/src/test/kotlin/dev/usbharu/owl/broker/mongodb/MongodbConsumerRepositoryTest.kt new file mode 100644 index 0000000..b71c5ea --- /dev/null +++ b/broker/broker-mongodb/src/test/kotlin/dev/usbharu/owl/broker/mongodb/MongodbConsumerRepositoryTest.kt @@ -0,0 +1,48 @@ +package dev.usbharu.owl.broker.mongodb + +import com.mongodb.ConnectionString +import com.mongodb.MongoClientSettings +import com.mongodb.kotlin.client.coroutine.MongoClient +import dev.usbharu.owl.broker.domain.model.consumer.Consumer +import kotlinx.coroutines.runBlocking +import org.bson.UuidRepresentation +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import java.util.* + + +class MongodbConsumerRepositoryTest { + @Test + fun name() { + + val clientSettings = + MongoClientSettings.builder().applyConnectionString(ConnectionString("mongodb://localhost:27017")) + .uuidRepresentation(UuidRepresentation.STANDARD).build() + + + val database = MongoClient.create(clientSettings).getDatabase("mongo-test") + + val mongodbConsumerRepository = MongodbConsumerRepository(database) + + val consumer = Consumer( + UUID.randomUUID(), + name = "test", + hostname = "aaa", + tasks = listOf("a", "b", "c") + ) + runBlocking { + mongodbConsumerRepository.save(consumer) + + val findById = mongodbConsumerRepository.findById(UUID.randomUUID()) + assertEquals(null, findById) + + val findById1 = mongodbConsumerRepository.findById(consumer.id) + assertEquals(consumer, findById1) + + mongodbConsumerRepository.save(consumer.copy(name = "test2")) + + val findById2 = mongodbConsumerRepository.findById(consumer.id) + assertEquals(consumer.copy(name = "test2"), findById2) + } + } +} \ No newline at end of file diff --git a/broker/build.gradle.kts b/broker/build.gradle.kts new file mode 100644 index 0000000..c16d61b --- /dev/null +++ b/broker/build.gradle.kts @@ -0,0 +1,67 @@ +plugins { + kotlin("jvm") + id("com.google.protobuf") version "0.9.4" + id("com.google.devtools.ksp") version "1.9.22-1.0.17" +} + +apply { + plugin("com.google.devtools.ksp") +} + + +group = "dev.usbharu" +version = "0.0.1" + +repositories { + mavenCentral() +} + +dependencies { + implementation("io.grpc:grpc-kotlin-stub:1.4.1") + implementation("io.grpc:grpc-protobuf:1.61.1") + implementation("com.google.protobuf:protobuf-kotlin:3.25.3") + implementation("io.grpc:grpc-netty:1.61.1") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0") + implementation("org.mongodb:mongodb-driver-kotlin-coroutine:4.11.0") + implementation("org.mongodb:bson-kotlinx:4.11.0") + implementation(project(":common")) + runtimeOnly(project(":broker:broker-mongodb")) + implementation("org.apache.logging.log4j:log4j-slf4j2-impl:2.23.0") + implementation(platform("io.insert-koin:koin-bom:3.5.3")) + implementation(platform("io.insert-koin:koin-annotations-bom:1.3.1")) + implementation("io.insert-koin:koin-core") + compileOnly("io.insert-koin:koin-annotations") + ksp("io.insert-koin:koin-ksp-compiler:1.3.1") +} + +tasks.test { + useJUnitPlatform() +} +kotlin { + jvmToolchain(17) +} + +protobuf { + protoc { + artifact = "com.google.protobuf:protoc:3.25.3" + } + plugins { + create("grpc") { + artifact = "io.grpc:protoc-gen-grpc-java:1.61.1" + } + create("grpckt") { + artifact = "io.grpc:protoc-gen-grpc-kotlin:1.4.1:jdk8@jar" + } + } + generateProtoTasks { + all().forEach { + it.plugins { + create("grpc") + create("grpckt") + } + it.builtins { + create("kotlin") + } + } + } +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/Main.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/Main.kt new file mode 100644 index 0000000..4241caf --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/Main.kt @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker + +import com.mongodb.ConnectionString +import com.mongodb.MongoClientSettings +import com.mongodb.kotlin.client.coroutine.MongoClient +import dev.usbharu.owl.broker.service.DefaultRetryPolicyFactory +import dev.usbharu.owl.broker.service.RetryPolicyFactory +import dev.usbharu.owl.common.property.PropertySerializerFactory +import dev.usbharu.owl.common.property.PropertySerializerFactoryImpl +import kotlinx.coroutines.runBlocking +import org.bson.UuidRepresentation +import org.koin.core.context.startKoin +import org.koin.dsl.module +import org.koin.ksp.generated.defaultModule + +fun main() { + + val moduleContext = + Class.forName("dev.usbharu.owl.broker.mongodb.MongoModuleContext").newInstance() as ModuleContext + + + +// println(File(Thread.currentThread().contextClassLoader.getResource("dev/usbharu/owl/broker/mongodb").file).listFiles().joinToString()) + + val koin = startKoin { + printLogger() + + val module = module { + single { + val clientSettings = + MongoClientSettings.builder().applyConnectionString(ConnectionString("mongodb://localhost:27017")) + .uuidRepresentation(UuidRepresentation.STANDARD).build() + + + MongoClient.create(clientSettings).getDatabase("mongo-test") + } + single { + PropertySerializerFactoryImpl() + } + single { + DefaultRetryPolicyFactory(emptyMap()) + } + } + modules(module,defaultModule, moduleContext.module()) + } + + val application = koin.koin.get() + + runBlocking { + application.start(50051).join() + } +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/ModuleContext.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/ModuleContext.kt new file mode 100644 index 0000000..1478f4f --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/ModuleContext.kt @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker + +import org.koin.core.module.Module + +interface ModuleContext { + fun module():Module +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/OwlBrokerApplication.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/OwlBrokerApplication.kt new file mode 100644 index 0000000..a6cc617 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/OwlBrokerApplication.kt @@ -0,0 +1,66 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker + +import dev.usbharu.owl.broker.interfaces.grpc.* +import dev.usbharu.owl.broker.service.TaskManagementService +import io.grpc.Server +import io.grpc.ServerBuilder +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.launch +import org.koin.core.annotation.Singleton + +@Singleton +class OwlBrokerApplication( + private val assignmentTaskService: AssignmentTaskService, + private val definitionTaskService: DefinitionTaskService, + private val producerService: ProducerService, + private val subscribeTaskService: SubscribeTaskService, + private val taskPublishService: TaskPublishService, + private val taskManagementService: TaskManagementService +) { + + private lateinit var server: Server + + fun start(port: Int,coroutineScope: CoroutineScope = GlobalScope):Job { + server = ServerBuilder.forPort(port) + .addService(assignmentTaskService) + .addService(definitionTaskService) + .addService(producerService) + .addService(subscribeTaskService) + .addService(taskPublishService) + .build() + + server.start() + Runtime.getRuntime().addShutdownHook( + Thread { + server.shutdown() + } + ) + + return coroutineScope.launch { + taskManagementService.startManagement() + } + } + + fun stop() { + server.shutdown() + } + +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/consumer/Consumer.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/consumer/Consumer.kt new file mode 100644 index 0000000..27fe78b --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/consumer/Consumer.kt @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.domain.model.consumer + +import java.util.* + +data class Consumer( + val id: UUID, + val name: String, + val hostname: String, + val tasks: List +) diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/consumer/ConsumerRepository.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/consumer/ConsumerRepository.kt new file mode 100644 index 0000000..34ebb0a --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/consumer/ConsumerRepository.kt @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.domain.model.consumer + +import java.util.* + +interface ConsumerRepository { + suspend fun save(consumer: Consumer):Consumer + + suspend fun findById(id:UUID):Consumer? +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/producer/Producer.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/producer/Producer.kt new file mode 100644 index 0000000..0a5e491 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/producer/Producer.kt @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.domain.model.producer + +import java.time.Instant +import java.util.UUID + +data class Producer( + val id:UUID, + val name:String, + val hostname:String, + val registeredTask:List, + val createdAt: Instant +) diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/producer/ProducerRepository.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/producer/ProducerRepository.kt new file mode 100644 index 0000000..932cef1 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/producer/ProducerRepository.kt @@ -0,0 +1,21 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.domain.model.producer + +interface ProducerRepository { + suspend fun save(producer: Producer):Producer +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTask.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTask.kt new file mode 100644 index 0000000..e2ed5f2 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTask.kt @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.domain.model.queuedtask + +import dev.usbharu.owl.broker.domain.model.task.Task +import java.time.Instant +import java.util.* + +/** + * @param attempt キューされた時点での試行回数より1多い + */ +data class QueuedTask( + val attempt: Int, + val queuedAt: Instant, + val task: Task, + val assignedConsumer: UUID?, + val assignedAt:Instant? +) diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTaskRepository.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTaskRepository.kt new file mode 100644 index 0000000..049b92a --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/queuedtask/QueuedTaskRepository.kt @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.domain.model.queuedtask + +import kotlinx.coroutines.flow.Flow +import java.util.* + +interface QueuedTaskRepository { + suspend fun save(queuedTask: QueuedTask):QueuedTask + + /** + * トランザクションの代わり + */ + suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id:UUID,update:QueuedTask):QueuedTask + + fun findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority(tasks:List,limit:Int): Flow +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/Task.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/Task.kt new file mode 100644 index 0000000..07347c0 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/Task.kt @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.domain.model.task + +import dev.usbharu.owl.common.property.PropertyValue +import java.time.Instant +import java.util.* + +/** + * @param attempt 失敗を含めて試行した回数 + */ +data class Task( + val name:String, + val id: UUID, + val publishProducerId:UUID, + val publishedAt: Instant, + val nextRetry:Instant, + val completedAt: Instant? = null, + val attempt: Int, + val properties: Map +) diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/TaskRepository.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/TaskRepository.kt new file mode 100644 index 0000000..60159a2 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/task/TaskRepository.kt @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.domain.model.task + +import kotlinx.coroutines.flow.Flow +import java.time.Instant + +interface TaskRepository { + suspend fun save(task: Task):Task + + fun findByNextRetryBefore(timestamp:Instant): Flow +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskdefinition/TaskDefinition.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskdefinition/TaskDefinition.kt new file mode 100644 index 0000000..8fcb8f1 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskdefinition/TaskDefinition.kt @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.domain.model.taskdefinition + +data class TaskDefinition( + val name: String, + val priority: Int, + val maxRetry: Int, + val timeoutMilli: Long, + val propertyDefinitionHash: Long, + val retryPolicy:String +) diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskdefinition/TaskDefinitionRepository.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskdefinition/TaskDefinitionRepository.kt new file mode 100644 index 0000000..2a1c3c6 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/domain/model/taskdefinition/TaskDefinitionRepository.kt @@ -0,0 +1,24 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.domain.model.taskdefinition + +interface TaskDefinitionRepository { + suspend fun save(taskDefinition: TaskDefinition): TaskDefinition + suspend fun deleteByName(name:String) + + suspend fun findByName(name:String):TaskDefinition? +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/external/GrpcExtension.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/external/GrpcExtension.kt new file mode 100644 index 0000000..4271b55 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/external/GrpcExtension.kt @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.external + +import com.google.protobuf.Timestamp +import dev.usbharu.owl.Uuid +import java.time.Instant +import java.util.* + +fun Uuid.UUID.toUUID(): UUID = UUID(mostSignificantUuidBits, leastSignificantUuidBits) + +fun UUID.toUUID(): Uuid.UUID = Uuid + .UUID + .newBuilder() + .setMostSignificantUuidBits(mostSignificantBits) + .setLeastSignificantUuidBits(leastSignificantBits) + .build() + +fun Timestamp.toInstant(): Instant = Instant.ofEpochSecond(seconds, nanos.toLong()) + +fun Instant.toTimestamp():Timestamp = Timestamp.newBuilder().setSeconds(this.epochSecond).setNanos(this.nano).build() \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/AssignmentTaskService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/AssignmentTaskService.kt new file mode 100644 index 0000000..a184058 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/AssignmentTaskService.kt @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.interfaces.grpc + +import dev.usbharu.owl.AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineImplBase +import dev.usbharu.owl.Task +import dev.usbharu.owl.Task.TaskRequest +import dev.usbharu.owl.broker.external.toTimestamp +import dev.usbharu.owl.broker.external.toUUID +import dev.usbharu.owl.broker.service.QueuedTaskAssigner +import dev.usbharu.owl.common.property.PropertySerializeUtils +import dev.usbharu.owl.common.property.PropertySerializerFactory +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flatMapMerge +import kotlinx.coroutines.flow.map +import org.koin.core.annotation.Singleton +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +@Singleton +class AssignmentTaskService( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + private val queuedTaskAssigner: QueuedTaskAssigner, + private val propertySerializerFactory: PropertySerializerFactory +) : + AssignmentTaskServiceCoroutineImplBase(coroutineContext) { + + override fun ready(requests: Flow): Flow { + return requests + .flatMapMerge { + queuedTaskAssigner.ready(it.consumerId.toUUID(), it.numberOfConcurrent) + } + .map { + TaskRequest + .newBuilder() + .setName(it.task.name) + .setId(it.task.id.toUUID()) + .setAttempt(it.attempt) + .setQueuedAt(it.queuedAt.toTimestamp()) + .putAllProperties(PropertySerializeUtils.serialize(propertySerializerFactory, it.task.properties)) + .build() + } + } +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/DefinitionTaskService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/DefinitionTaskService.kt new file mode 100644 index 0000000..3ce2b5b --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/DefinitionTaskService.kt @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.interfaces.grpc + +import com.google.protobuf.Empty +import dev.usbharu.owl.DefinitionTask +import dev.usbharu.owl.DefinitionTask.TaskDefined +import dev.usbharu.owl.DefinitionTaskServiceGrpcKt.DefinitionTaskServiceCoroutineImplBase +import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinition +import dev.usbharu.owl.broker.service.RegisterTaskService +import org.koin.core.annotation.Singleton +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +@Singleton +class DefinitionTaskService(coroutineContext: CoroutineContext = EmptyCoroutineContext,private val registerTaskService: RegisterTaskService) : + DefinitionTaskServiceCoroutineImplBase(coroutineContext) { + override suspend fun register(request: DefinitionTask.TaskDefinition): TaskDefined { + registerTaskService.registerTask( + TaskDefinition( + request.name, + request.priority, + request.maxRetry, + request.timeoutMilli, + request.propertyDefinitionHash, + request.retryPolicy + ) + ) + return TaskDefined + .newBuilder() + .setTaskId( + request.name + ) + .build() + } + + override suspend fun unregister(request: DefinitionTask.TaskUnregister): Empty { + registerTaskService.unregisterTask(request.name) + return Empty.getDefaultInstance() + } +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/ProducerService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/ProducerService.kt new file mode 100644 index 0000000..c01bec6 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/ProducerService.kt @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.interfaces.grpc + +import dev.usbharu.owl.ProducerOuterClass +import dev.usbharu.owl.ProducerServiceGrpcKt.ProducerServiceCoroutineImplBase +import dev.usbharu.owl.broker.external.toUUID +import dev.usbharu.owl.broker.service.ProducerService +import dev.usbharu.owl.broker.service.RegisterProducerRequest +import org.koin.core.annotation.Singleton +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +@Singleton +class ProducerService( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + private val producerService: ProducerService +) : + ProducerServiceCoroutineImplBase(coroutineContext) { + override suspend fun registerProducer(request: ProducerOuterClass.Producer): ProducerOuterClass.RegisterProducerResponse { + val registerProducer = producerService.registerProducer( + RegisterProducerRequest( + request.name, request.hostname + ) + ) + return ProducerOuterClass.RegisterProducerResponse.newBuilder().setId(registerProducer.toUUID()).build() + } +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/SubscribeTaskService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/SubscribeTaskService.kt new file mode 100644 index 0000000..f521ca0 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/SubscribeTaskService.kt @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.interfaces.grpc + +import dev.usbharu.owl.Consumer +import dev.usbharu.owl.SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineImplBase +import dev.usbharu.owl.broker.external.toUUID +import dev.usbharu.owl.broker.service.ConsumerService +import dev.usbharu.owl.broker.service.RegisterConsumerRequest +import org.koin.core.annotation.Singleton +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +@Singleton +class SubscribeTaskService( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + private val consumerService: ConsumerService +) : + SubscribeTaskServiceCoroutineImplBase(coroutineContext) { + override suspend fun subscribeTask(request: Consumer.SubscribeTaskRequest): Consumer.SubscribeTaskResponse { + val id = + consumerService.registerConsumer(RegisterConsumerRequest(request.name, request.hostname, request.tasksList)) + return Consumer.SubscribeTaskResponse.newBuilder().setId(id.toUUID()).build() + } +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskPublishService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskPublishService.kt new file mode 100644 index 0000000..803c893 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/interfaces/grpc/TaskPublishService.kt @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.interfaces.grpc + +import dev.usbharu.owl.PublishTaskOuterClass +import dev.usbharu.owl.PublishTaskOuterClass.PublishedTask +import dev.usbharu.owl.TaskPublishServiceGrpcKt.TaskPublishServiceCoroutineImplBase +import dev.usbharu.owl.broker.external.toUUID +import dev.usbharu.owl.broker.service.PublishTask +import dev.usbharu.owl.broker.service.TaskPublishService +import dev.usbharu.owl.common.property.PropertySerializeUtils +import dev.usbharu.owl.common.property.PropertySerializerFactory +import io.grpc.Status +import io.grpc.StatusException +import org.koin.core.annotation.Singleton +import org.slf4j.LoggerFactory +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +@Singleton +class TaskPublishService( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + private val taskPublishService: TaskPublishService, + private val propertySerializerFactory: PropertySerializerFactory +) : + TaskPublishServiceCoroutineImplBase(coroutineContext) { + + override suspend fun publishTask(request: PublishTaskOuterClass.PublishTask): PublishedTask { + + logger.warn("aaaaaaaaaaa") + + + + return try { + + val publishedTask = taskPublishService.publishTask( + PublishTask( + request.name, + request.producerId.toUUID(), + PropertySerializeUtils.deserialize(propertySerializerFactory, request.propertiesMap) + ) + ) + PublishedTask.newBuilder().setName(publishedTask.name).setId(publishedTask.id.toUUID()).build() + }catch (e:Error){ + logger.warn("exception ",e) + throw StatusException(Status.INTERNAL) + } + + + } + + companion object{ + private val logger = LoggerFactory.getLogger(dev.usbharu.owl.broker.interfaces.grpc.TaskPublishService::class.java) + } +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/AssignQueuedTaskDecider.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/AssignQueuedTaskDecider.kt new file mode 100644 index 0000000..8e89722 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/AssignQueuedTaskDecider.kt @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.service + +import dev.usbharu.owl.broker.domain.model.consumer.ConsumerRepository +import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.take +import org.koin.core.annotation.Singleton +import java.util.* +interface AssignQueuedTaskDecider { + fun findAssignableQueue(consumerId: UUID, numberOfConcurrent: Int): Flow +} +@Singleton +class AssignQueuedTaskDeciderImpl( + private val consumerRepository: ConsumerRepository, + private val queueStore: QueueStore +) : AssignQueuedTaskDecider { + override fun findAssignableQueue(consumerId: UUID, numberOfConcurrent: Int): Flow { + return flow { + val consumer = consumerRepository.findById(consumerId) ?: TODO() + emitAll( + queueStore.findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority( + consumer.tasks, + numberOfConcurrent + ).take(numberOfConcurrent) + ) + } + + } + +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/ConsumerService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/ConsumerService.kt new file mode 100644 index 0000000..62bb6df --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/ConsumerService.kt @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.service + +import dev.usbharu.owl.broker.domain.model.consumer.Consumer +import dev.usbharu.owl.broker.domain.model.consumer.ConsumerRepository +import org.koin.core.annotation.Single +import org.koin.core.annotation.Singleton +import org.slf4j.LoggerFactory +import java.util.* + +interface ConsumerService { + suspend fun registerConsumer(registerConsumerRequest: RegisterConsumerRequest): UUID +} + +@Singleton +class ConsumerServiceImpl(private val consumerRepository: ConsumerRepository) : ConsumerService { + override suspend fun registerConsumer(registerConsumerRequest: RegisterConsumerRequest): UUID { + val id = UUID.randomUUID() + + consumerRepository.save( + Consumer( + id, + registerConsumerRequest.name, + registerConsumerRequest.hostname, + registerConsumerRequest.tasks + ) + ) + + logger.info( + "Register a new Consumer. name: {} hostname: {} tasks: {}", + registerConsumerRequest.name, + registerConsumerRequest.hostname, + registerConsumerRequest.tasks.size + ) + + return id + } + + companion object { + private val logger = LoggerFactory.getLogger(ConsumerServiceImpl::class.java) + } +} + +data class RegisterConsumerRequest( + val name: String, + val hostname: String, + val tasks: List +) \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/ProducerService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/ProducerService.kt new file mode 100644 index 0000000..1c803a0 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/ProducerService.kt @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.service + +import dev.usbharu.owl.broker.domain.model.producer.Producer +import dev.usbharu.owl.broker.domain.model.producer.ProducerRepository +import org.koin.core.annotation.Singleton +import org.slf4j.LoggerFactory +import java.time.Instant +import java.util.* + +interface ProducerService { + suspend fun registerProducer(producer: RegisterProducerRequest):UUID +} + +@Singleton +class ProducerServiceImpl(private val producerRepository: ProducerRepository) : ProducerService { + override suspend fun registerProducer(producer: RegisterProducerRequest): UUID { + + val id = UUID.randomUUID() + + val saveProducer = Producer( + id = id, + name = producer.name, + hostname = producer.hostname, + registeredTask = emptyList(), + createdAt = Instant.now() + ) + + producerRepository.save(saveProducer) + + logger.info("Register a new Producer. name: {} hostname: {}",saveProducer.name,saveProducer.hostname) + return id + } + + companion object{ + private val logger = LoggerFactory.getLogger(ProducerServiceImpl::class.java) + } +} + +data class RegisterProducerRequest( + val name: String, + val hostname: String +) \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueStore.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueStore.kt new file mode 100644 index 0000000..bb49f6b --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueueStore.kt @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.service + +import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask +import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTaskRepository +import kotlinx.coroutines.flow.Flow +import org.koin.core.annotation.Singleton + +interface QueueStore { + suspend fun enqueue(queuedTask: QueuedTask) + suspend fun enqueueAll(queuedTaskList: List) + + suspend fun dequeue(queuedTask: QueuedTask) + suspend fun dequeueAll(queuedTaskList: List) + fun findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority(tasks: List, limit: Int): Flow +} + +@Singleton +class QueueStoreImpl(private val queuedTaskRepository: QueuedTaskRepository) : QueueStore { + override suspend fun enqueue(queuedTask: QueuedTask) { + queuedTaskRepository.save(queuedTask) + } + + override suspend fun enqueueAll(queuedTaskList: List) { + queuedTaskList.forEach { enqueue(it) } + } + + override suspend fun dequeue(queuedTask: QueuedTask) { + queuedTaskRepository.findByTaskIdAndAssignedConsumerIsNullAndUpdate(queuedTask.task.id, queuedTask) + } + + override suspend fun dequeueAll(queuedTaskList: List) { + return queuedTaskList.forEach { dequeue(it) } + } + + override fun findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority( + tasks: List, + limit: Int + ): Flow { + return queuedTaskRepository.findByTaskNameInAndAssignedConsumerIsNullAndOrderByPriority(tasks, limit) + } + +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueuedTaskAssigner.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueuedTaskAssigner.kt new file mode 100644 index 0000000..40dde20 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/QueuedTaskAssigner.kt @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.service + +import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.onEach +import org.koin.core.annotation.Singleton +import org.slf4j.LoggerFactory +import java.time.Instant +import java.util.* + +interface QueuedTaskAssigner { + fun ready(consumerId: UUID,numberOfConcurrent:Int): Flow +} + +@Singleton +class QueuedTaskAssignerImpl( + private val taskManagementService: TaskManagementService, + private val queueStore: QueueStore +) : QueuedTaskAssigner{ + override fun ready(consumerId: UUID, numberOfConcurrent: Int): Flow { + return flow { + taskManagementService.findAssignableTask(consumerId, numberOfConcurrent) + .onEach { + val assignTask = assignTask(it, consumerId) + + if (assignTask != null) { + emit(assignTask) + } + } + .collect() + } + } + + private suspend fun assignTask(queuedTask: QueuedTask,consumerId: UUID):QueuedTask?{ + return try { + + val assignedTaskQueue = queuedTask.copy(assignedConsumer = consumerId, assignedAt = Instant.now()) + logger.trace("Try assign task: {} id: {} consumer: {}",queuedTask.task.name,queuedTask.task.id,consumerId) + + queueStore.dequeue(assignedTaskQueue) + + logger.debug( + "Assign Task. name: {} id: {} attempt: {} consumer: {}", + queuedTask.task.name, + queuedTask.task.id, + queuedTask.attempt, + queuedTask.assignedConsumer + ) + assignedTaskQueue + } catch (e: Exception) { + TODO("Not yet implemented") + } + } + + companion object{ + private val logger = LoggerFactory.getLogger(QueuedTaskAssignerImpl::class.java) + } +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/RegisterTaskService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/RegisterTaskService.kt new file mode 100644 index 0000000..c3b8c46 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/RegisterTaskService.kt @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.service + +import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinition +import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository +import org.koin.core.annotation.Singleton +import org.slf4j.LoggerFactory + +interface RegisterTaskService { + suspend fun registerTask(taskDefinition: TaskDefinition) + + suspend fun unregisterTask(name:String) +} + +@Singleton +class RegisterTaskServiceImpl(private val taskDefinitionRepository: TaskDefinitionRepository) : RegisterTaskService { + override suspend fun registerTask(taskDefinition: TaskDefinition) { + taskDefinitionRepository.save(taskDefinition) + + logger.info("Register a new task. name: {}",taskDefinition.name) + } + + // todo すでにpublish済みのタスクをどうするか決めさせる + override suspend fun unregisterTask(name: String) { + taskDefinitionRepository.deleteByName(name) + + logger.info("Unregister a task. name: {}",name) + } + + companion object{ + private val logger = LoggerFactory.getLogger(RegisterTaskServiceImpl::class.java) + } +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/RetryPolicy.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/RetryPolicy.kt new file mode 100644 index 0000000..fb4c667 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/RetryPolicy.kt @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.service + +import java.time.Instant +import kotlin.math.pow +import kotlin.math.roundToLong + +interface RetryPolicy { + fun nextRetry(now: Instant, attempt: Int): Instant +} + +class ExponentialRetryPolicy(private val firstRetrySeconds: Int = 30) : RetryPolicy { + override fun nextRetry(now: Instant, attempt: Int): Instant = + now.plusSeconds(firstRetrySeconds.toDouble().pow(attempt + 1.0).roundToLong()) + +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/RetryPolicyFactory.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/RetryPolicyFactory.kt new file mode 100644 index 0000000..86ad27b --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/RetryPolicyFactory.kt @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.service + +interface RetryPolicyFactory { + fun factory(name:String):RetryPolicy +} + +class DefaultRetryPolicyFactory(private val map: Map) : RetryPolicyFactory { + override fun factory(name: String): RetryPolicy { + return map[name]?: ExponentialRetryPolicy() + } +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt new file mode 100644 index 0000000..d575124 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskManagementService.kt @@ -0,0 +1,89 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.service + +import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask +import dev.usbharu.owl.broker.domain.model.task.Task +import dev.usbharu.owl.broker.domain.model.task.TaskRepository +import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.onEach +import org.koin.core.annotation.Singleton +import org.slf4j.LoggerFactory +import java.time.Instant +import java.util.* + + +interface TaskManagementService { + + suspend fun startManagement() + fun findAssignableTask(consumerId: UUID, numberOfConcurrent: Int): Flow +} + +@Singleton +class TaskManagementServiceImpl( + private val taskScanner: TaskScanner, + private val queueStore: QueueStore, + private val taskDefinitionRepository: TaskDefinitionRepository, + private val assignQueuedTaskDecider: AssignQueuedTaskDecider, + private val retryPolicyFactory: RetryPolicyFactory, + private val taskRepository: TaskRepository +) : TaskManagementService { + + private var flow:Flow = flowOf() + override suspend fun startManagement() { + flow = taskScanner.startScan() + + flow.onEach { + enqueueTask(it) + }.collect() + + } + + + override fun findAssignableTask(consumerId: UUID, numberOfConcurrent: Int): Flow { + return assignQueuedTaskDecider.findAssignableQueue(consumerId, numberOfConcurrent) + } + + private suspend fun enqueueTask(task: Task):QueuedTask{ + + val queuedTask = QueuedTask( + task.attempt + 1, + Instant.now(), + task, + null, + null + ) + + val copy = task.copy( + nextRetry = retryPolicyFactory.factory(taskDefinitionRepository.findByName(task.name)?.retryPolicy.orEmpty()) + .nextRetry(Instant.now(), task.attempt) + ) + + taskRepository.save(copy) + + queueStore.enqueue(queuedTask) + logger.debug("Enqueue Task. {} {}", task.name, task.id) + return queuedTask + } + + companion object{ + private val logger = LoggerFactory.getLogger(TaskManagementServiceImpl::class.java) + } +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskPublishService.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskPublishService.kt new file mode 100644 index 0000000..1aee8a6 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskPublishService.kt @@ -0,0 +1,81 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.service + +import dev.usbharu.owl.broker.domain.model.task.Task +import dev.usbharu.owl.broker.domain.model.task.TaskRepository +import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository +import dev.usbharu.owl.common.property.PropertyValue +import org.koin.core.annotation.Singleton +import org.slf4j.LoggerFactory +import java.time.Instant +import java.util.* + +interface TaskPublishService { + suspend fun publishTask(publishTask: PublishTask): PublishedTask +} + +data class PublishTask( + val name: String, + val producerId: UUID, + val properties: Map +) + +data class PublishedTask( + val name: String, + val id: UUID +) + +@Singleton +class TaskPublishServiceImpl( + private val taskRepository: TaskRepository, + private val taskDefinitionRepository:TaskDefinitionRepository, + private val retryPolicyFactory: RetryPolicyFactory +) : TaskPublishService { + override suspend fun publishTask(publishTask: PublishTask): PublishedTask { + val id = UUID.randomUUID() + + val definition = taskDefinitionRepository.findByName(publishTask.name) ?: TODO() + + val published = Instant.now() + val nextRetry = retryPolicyFactory.factory(definition.name).nextRetry(published,0) + + val task = Task( + name = publishTask.name, + id = id, + publishProducerId = publishTask.producerId, + publishedAt = published, + completedAt = null, + attempt = 0, + properties = publishTask.properties, + nextRetry = nextRetry + ) + + taskRepository.save(task) + + logger.debug("Published task #{} name: {}", task.id, task.name) + + return PublishedTask( + name = publishTask.name, + id = id + ) + } + + companion object { + private val logger = LoggerFactory.getLogger(TaskPublishServiceImpl::class.java) + } +} \ No newline at end of file diff --git a/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskScanner.kt b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskScanner.kt new file mode 100644 index 0000000..ff579e5 --- /dev/null +++ b/broker/src/main/kotlin/dev/usbharu/owl/broker/service/TaskScanner.kt @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.broker.service + +import dev.usbharu.owl.broker.domain.model.task.Task +import dev.usbharu.owl.broker.domain.model.task.TaskRepository +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.isActive +import org.koin.core.annotation.Singleton +import org.slf4j.LoggerFactory +import java.time.Instant + +interface TaskScanner { + + fun startScan(): Flow +} + +@Singleton +class TaskScannerImpl(private val taskRepository: TaskRepository) : + TaskScanner { + + override fun startScan(): Flow = flow { + while (currentCoroutineContext().isActive) { + emitAll(scanTask()) + delay(500) + } + } + + private fun scanTask(): Flow { + return taskRepository.findByNextRetryBefore(Instant.now()) + } + + companion object { + private val logger = LoggerFactory.getLogger(TaskScannerImpl::class.java) + } +} \ No newline at end of file diff --git a/broker/src/main/proto/consumer.proto b/broker/src/main/proto/consumer.proto new file mode 100644 index 0000000..252d4a2 --- /dev/null +++ b/broker/src/main/proto/consumer.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; +import "uuid.proto"; + +option java_package = "dev.usbharu.owl"; + +message SubscribeTaskRequest { + string name = 1; + string hostname = 2; + repeated string tasks = 3;; +} + +message SubscribeTaskResponse { + UUID id = 1; +} + +service SubscribeTaskService { + rpc SubscribeTask (SubscribeTaskRequest) returns (SubscribeTaskResponse); +} \ No newline at end of file diff --git a/broker/src/main/proto/definition_task.proto b/broker/src/main/proto/definition_task.proto new file mode 100644 index 0000000..6cab525 --- /dev/null +++ b/broker/src/main/proto/definition_task.proto @@ -0,0 +1,31 @@ +syntax = "proto3"; + +option java_package = "dev.usbharu.owl"; + +import "google/protobuf/empty.proto"; +import "uuid.proto"; + + +message TaskDefinition { + string name = 1; + int32 priority = 2; + int32 max_retry = 3; + int64 timeout_milli = 4; + int64 property_definition_hash = 5; + UUID producer_id = 6; + string retryPolicy = 7; +} + +message TaskDefined { + string task_id = 1; +} + +message TaskUnregister { + string name = 1; + UUID producer_id = 2; +} + +service DefinitionTaskService { + rpc register(TaskDefinition) returns (TaskDefined); + rpc unregister(TaskUnregister) returns (google.protobuf.Empty); +} \ No newline at end of file diff --git a/broker/src/main/proto/producer.proto b/broker/src/main/proto/producer.proto new file mode 100644 index 0000000..b4bbcd7 --- /dev/null +++ b/broker/src/main/proto/producer.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; + +import "uuid.proto"; + +option java_package = "dev.usbharu.owl"; + +message Producer { + string name = 1; + string hostname = 2; +} + +message RegisterProducerResponse { + UUID id = 1; +} + +service ProducerService { + rpc registerProducer (Producer) returns (RegisterProducerResponse); +} \ No newline at end of file diff --git a/broker/src/main/proto/property.proto b/broker/src/main/proto/property.proto new file mode 100644 index 0000000..138e7e2 --- /dev/null +++ b/broker/src/main/proto/property.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; + +import "google/protobuf/empty.proto"; + +option java_package = "dev.usbharu.owl"; + +message Property{ + oneof value { + google.protobuf.Empty empty = 1; + string string = 2; + int32 integer = 3; + } +} \ No newline at end of file diff --git a/broker/src/main/proto/publish_task.proto b/broker/src/main/proto/publish_task.proto new file mode 100644 index 0000000..447a4bc --- /dev/null +++ b/broker/src/main/proto/publish_task.proto @@ -0,0 +1,25 @@ +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; + +import "uuid.proto"; +import "property.proto"; + +option java_package = "dev.usbharu.owl"; + + +message PublishTask { + string name = 1; + google.protobuf.Timestamp publishedAt = 2; + map properties = 3; + UUID producer_id = 4; +} + +message PublishedTask { + string name = 1; + UUID id = 2; +} + +service TaskPublishService { + rpc publishTask (PublishTask) returns (PublishedTask); +} diff --git a/broker/src/main/proto/task.proto b/broker/src/main/proto/task.proto new file mode 100644 index 0000000..48566fd --- /dev/null +++ b/broker/src/main/proto/task.proto @@ -0,0 +1,23 @@ +syntax = "proto3"; +import "uuid.proto"; +import "google/protobuf/timestamp.proto"; +import "property.proto"; + +option java_package = "dev.usbharu.owl"; + +message ReadyRequest { + int32 number_of_concurrent = 1; + UUID consumer_id = 2; +} + +message TaskRequest { + string name = 1; + UUID id = 2; + int32 attempt = 4; + google.protobuf.Timestamp queuedAt = 5; + map properties = 6; +} + +service AssignmentTaskService { + rpc ready (stream ReadyRequest) returns (stream TaskRequest); +} \ No newline at end of file diff --git a/broker/src/main/proto/task_result.proto b/broker/src/main/proto/task_result.proto new file mode 100644 index 0000000..43a07ae --- /dev/null +++ b/broker/src/main/proto/task_result.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; +import "uuid.proto"; +import "google/protobuf/empty.proto"; +import "property.proto"; + +option java_package = "dev.usbharu.owl"; + +message TaskResult { + UUID id = 1; + bool success = 2; + int32 attempt = 3; + map result = 4; +} \ No newline at end of file diff --git a/broker/src/main/proto/uuid.proto b/broker/src/main/proto/uuid.proto new file mode 100644 index 0000000..26d6100 --- /dev/null +++ b/broker/src/main/proto/uuid.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +option java_package = "dev.usbharu.owl"; + +message UUID { + uint64 most_significant_uuid_bits = 1; + uint64 least_significant_uuid_bits = 2; +} \ No newline at end of file diff --git a/broker/src/main/resources/log4j2.xml b/broker/src/main/resources/log4j2.xml new file mode 100644 index 0000000..e202a6f --- /dev/null +++ b/broker/src/main/resources/log4j2.xml @@ -0,0 +1,40 @@ + + + + + + + %d{yyyy/MM/dd HH:mm:ss.SSS} [%t] %-6p %c{10}#%M:%L | %m%n + + + + + ${format1} + + + + + + + + + + + + + + \ No newline at end of file diff --git a/broker/src/test/kotlin/dev/usbharu/owl/broker/service/TaskManagementServiceImplTest.kt b/broker/src/test/kotlin/dev/usbharu/owl/broker/service/TaskManagementServiceImplTest.kt new file mode 100644 index 0000000..6d968be --- /dev/null +++ b/broker/src/test/kotlin/dev/usbharu/owl/broker/service/TaskManagementServiceImplTest.kt @@ -0,0 +1,13 @@ +package dev.usbharu.owl.broker.service + +import org.junit.jupiter.api.Test + +class TaskManagementServiceImplTest { + + @Test + fun findAssignableTask() { + val taskManagementServiceImpl = TaskManagementServiceImpl() + + Thread.sleep(10000) + } +} \ No newline at end of file diff --git a/build.gradle.kts b/build.gradle.kts new file mode 100644 index 0000000..3503056 --- /dev/null +++ b/build.gradle.kts @@ -0,0 +1,37 @@ +plugins { + kotlin("jvm") version "1.9.22" +} + + +allprojects { + group = "dev.usbharu" + version = "0.0.1" + + + repositories { + mavenCentral() + } +} + +subprojects { + apply { + plugin("org.jetbrains.kotlin.jvm") + } + kotlin { + jvmToolchain(17) + } + + dependencies { + implementation("org.slf4j:slf4j-api:2.0.12") + testImplementation("org.junit.jupiter:junit-jupiter:5.10.2") + + + } + + + tasks.test { + useJUnitPlatform() + } + + +} \ No newline at end of file diff --git a/common/build.gradle.kts b/common/build.gradle.kts new file mode 100644 index 0000000..58cc541 --- /dev/null +++ b/common/build.gradle.kts @@ -0,0 +1,21 @@ +plugins { + kotlin("jvm") +} + +group = "dev.usbharu" +version = "1.0-SNAPSHOT" + +repositories { + mavenCentral() +} + +dependencies { + testImplementation("org.jetbrains.kotlin:kotlin-test") +} + +tasks.test { + useJUnitPlatform() +} +kotlin { + jvmToolchain(17) +} \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/IntegerPropertyValue.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/IntegerPropertyValue.kt new file mode 100644 index 0000000..4098410 --- /dev/null +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/IntegerPropertyValue.kt @@ -0,0 +1,22 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.common.property + +class IntegerPropertyValue(override val value: Int) : PropertyValue() { + override val type: PropertyType + get() = PropertyType.integer +} \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeUtils.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeUtils.kt new file mode 100644 index 0000000..b5e151d --- /dev/null +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializeUtils.kt @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.common.property + +object PropertySerializeUtils { + fun serialize( + serializerFactory: PropertySerializerFactory, + properties: Map + ): Map = + properties.map { it.key to serializerFactory.factory(it.value).serialize(it.value) }.toMap() + + fun deserialize( + serializerFactory: PropertySerializerFactory, + properties: Map + ): Map = + properties.map { it.key to serializerFactory.factory(it.value).deserialize(it.value) }.toMap() +} \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializer.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializer.kt new file mode 100644 index 0000000..85194fe --- /dev/null +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializer.kt @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.common.property + +interface PropertySerializer { + fun isSupported(propertyValue: PropertyValue): Boolean + fun isSupported(string: String): Boolean + fun serialize(propertyValue: PropertyValue): String + + fun deserialize(string: String): PropertyValue +} \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializerFactory.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializerFactory.kt new file mode 100644 index 0000000..9caaa8a --- /dev/null +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializerFactory.kt @@ -0,0 +1,22 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.common.property + +interface PropertySerializerFactory { + fun factory(propertyValue: PropertyValue): PropertySerializer + fun factory(string: String): PropertySerializer +} \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializerFactoryImpl.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializerFactoryImpl.kt new file mode 100644 index 0000000..f891f7a --- /dev/null +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertySerializerFactoryImpl.kt @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.common.property + + +class PropertySerializerFactoryImpl : PropertySerializerFactory { + override fun factory(propertyValue: PropertyValue): PropertySerializer { + TODO("Not yet implemented") + } + + override fun factory(string: String): PropertySerializer { + TODO("Not yet implemented") + } +} \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyType.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyType.kt new file mode 100644 index 0000000..6694114 --- /dev/null +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyType.kt @@ -0,0 +1,22 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.common.property + +enum class PropertyType { + integer, + string +} \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyValue.kt b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyValue.kt new file mode 100644 index 0000000..6b7f392 --- /dev/null +++ b/common/src/main/kotlin/dev/usbharu/owl/common/property/PropertyValue.kt @@ -0,0 +1,22 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.common.property + +sealed class PropertyValue { + abstract val value:Any + abstract val type: PropertyType +} \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/retry/RetryPolicy.kt b/common/src/main/kotlin/dev/usbharu/owl/common/retry/RetryPolicy.kt new file mode 100644 index 0000000..9042917 --- /dev/null +++ b/common/src/main/kotlin/dev/usbharu/owl/common/retry/RetryPolicy.kt @@ -0,0 +1,20 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.common.retry + +interface RetryPolicy { +} \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/task/PropertyDefinition.kt b/common/src/main/kotlin/dev/usbharu/owl/common/task/PropertyDefinition.kt new file mode 100644 index 0000000..44c149b --- /dev/null +++ b/common/src/main/kotlin/dev/usbharu/owl/common/task/PropertyDefinition.kt @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.common.task + +import dev.usbharu.owl.common.property.PropertyType + +class PropertyDefinition(val map: Map) : Map by map{ + +} diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/task/PublishedTask.kt b/common/src/main/kotlin/dev/usbharu/owl/common/task/PublishedTask.kt new file mode 100644 index 0000000..57d55ec --- /dev/null +++ b/common/src/main/kotlin/dev/usbharu/owl/common/task/PublishedTask.kt @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.common.task + +import java.time.Instant +import java.util.UUID + +data class PublishedTask( + val task: T, + val id: UUID, + val published: Instant +) \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/task/Task.kt b/common/src/main/kotlin/dev/usbharu/owl/common/task/Task.kt new file mode 100644 index 0000000..2f196e8 --- /dev/null +++ b/common/src/main/kotlin/dev/usbharu/owl/common/task/Task.kt @@ -0,0 +1,20 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.common.task + +open class Task { +} \ No newline at end of file diff --git a/common/src/main/kotlin/dev/usbharu/owl/common/task/TaskDefinition.kt b/common/src/main/kotlin/dev/usbharu/owl/common/task/TaskDefinition.kt new file mode 100644 index 0000000..6a8e7f3 --- /dev/null +++ b/common/src/main/kotlin/dev/usbharu/owl/common/task/TaskDefinition.kt @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.common.task + +import dev.usbharu.owl.common.property.PropertyValue +import dev.usbharu.owl.common.retry.RetryPolicy + +interface TaskDefinition { + val name: String + val priority: Int + val maxRetry: Int + val retryPolicy:RetryPolicy + val timeoutMilli: Long + val propertyDefinition: PropertyDefinition + + fun serialize(task: T): Map + fun deserialize(value: Map): T +} \ No newline at end of file diff --git a/gradle.properties b/gradle.properties new file mode 100644 index 0000000..7fc6f1f --- /dev/null +++ b/gradle.properties @@ -0,0 +1 @@ +kotlin.code.style=official diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..249e583 Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..c380688 --- /dev/null +++ b/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,6 @@ +#Sun Feb 18 23:33:24 JST 2024 +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-bin.zip +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew new file mode 100644 index 0000000..1b6c787 --- /dev/null +++ b/gradlew @@ -0,0 +1,234 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit + +APP_NAME="Gradle" +APP_BASE_NAME=${0##*/} + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + +# Collect all arguments for the java command; +# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of +# shell script including quotes and variable substitutions, so put them in +# double quotes to make sure that they get re-expanded; and +# * put everything else in single quotes, so that it's not re-expanded. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat new file mode 100644 index 0000000..107acd3 --- /dev/null +++ b/gradlew.bat @@ -0,0 +1,89 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/producer/api/build.gradle.kts b/producer/api/build.gradle.kts new file mode 100644 index 0000000..7e049bf --- /dev/null +++ b/producer/api/build.gradle.kts @@ -0,0 +1,21 @@ +plugins { + kotlin("jvm") +} + +group = "dev.usbharu" +version = "0.0.1" + +repositories { + mavenCentral() +} + +dependencies { + api(project(":common")) +} + +tasks.test { + useJUnitPlatform() +} +kotlin { + jvmToolchain(17) +} \ No newline at end of file diff --git a/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducer.kt b/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducer.kt new file mode 100644 index 0000000..e4df89a --- /dev/null +++ b/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducer.kt @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.producer.api + +import dev.usbharu.owl.common.task.PublishedTask +import dev.usbharu.owl.common.task.Task +import dev.usbharu.owl.common.task.TaskDefinition + +interface OwlProducer { + + suspend fun registerTask(taskDefinition: TaskDefinition) + suspend fun publishTask(task: T): PublishedTask +} diff --git a/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducerFactory.kt b/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducerFactory.kt new file mode 100644 index 0000000..419689b --- /dev/null +++ b/producer/api/src/main/kotlin/dev/usbharu/owl/producer/api/OwlProducerFactory.kt @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.producer.api + +object OwlProducerFactory { + fun createProducer():OwlProducer{ + TODO() + } +} \ No newline at end of file diff --git a/producer/impl/build.gradle.kts b/producer/impl/build.gradle.kts new file mode 100644 index 0000000..87bf53a --- /dev/null +++ b/producer/impl/build.gradle.kts @@ -0,0 +1,21 @@ +plugins { + kotlin("jvm") +} + +group = "dev.usbharu" +version = "0.0.1" + +repositories { + mavenCentral() +} + +dependencies { + implementation(project(":producer:api")) +} + +tasks.test { + useJUnitPlatform() +} +kotlin { + jvmToolchain(17) +} \ No newline at end of file diff --git a/producer/impl/src/main/kotlin/dev/usbharu/producer/impl/DefaultOwlProducer.kt b/producer/impl/src/main/kotlin/dev/usbharu/producer/impl/DefaultOwlProducer.kt new file mode 100644 index 0000000..1607a62 --- /dev/null +++ b/producer/impl/src/main/kotlin/dev/usbharu/producer/impl/DefaultOwlProducer.kt @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.producer.impl + +import dev.usbharu.owl.common.task.PublishedTask +import dev.usbharu.owl.common.task.Task +import dev.usbharu.owl.common.task.TaskDefinition +import dev.usbharu.owl.producer.api.OwlProducer + +class DefaultOwlProducer : OwlProducer { + override suspend fun registerTask(taskDefinition: TaskDefinition) { + TODO("Not yet implemented") + } + + override suspend fun publishTask(task: T): PublishedTask { + TODO("Not yet implemented") + } +} \ No newline at end of file diff --git a/producer/impl/src/main/kotlin/dev/usbharu/producer/impl/OwlTaskDatasource.kt b/producer/impl/src/main/kotlin/dev/usbharu/producer/impl/OwlTaskDatasource.kt new file mode 100644 index 0000000..7c9410a --- /dev/null +++ b/producer/impl/src/main/kotlin/dev/usbharu/producer/impl/OwlTaskDatasource.kt @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.producer.impl + +import dev.usbharu.owl.common.task.PublishedTask +import dev.usbharu.owl.common.task.Task +import dev.usbharu.owl.common.task.TaskDefinition + +interface OwlTaskDatasource { + + suspend fun registerTask(definition: TaskDefinition) + suspend fun publishTask(publishedTask: PublishedTask) +} \ No newline at end of file diff --git a/producer/impl/src/main/kotlin/dev/usbharu/producer/impl/datasource/DatasourceFactory.kt b/producer/impl/src/main/kotlin/dev/usbharu/producer/impl/datasource/DatasourceFactory.kt new file mode 100644 index 0000000..293ba15 --- /dev/null +++ b/producer/impl/src/main/kotlin/dev/usbharu/producer/impl/datasource/DatasourceFactory.kt @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.producer.impl.datasource + +import dev.usbharu.producer.impl.OwlTaskDatasource + +interface DatasourceFactory { + suspend fun create():OwlTaskDatasource +} \ No newline at end of file diff --git a/producer/impl/src/main/kotlin/dev/usbharu/producer/impl/datasource/ServiceProviderDatasourceFactory.kt b/producer/impl/src/main/kotlin/dev/usbharu/producer/impl/datasource/ServiceProviderDatasourceFactory.kt new file mode 100644 index 0000000..f198a27 --- /dev/null +++ b/producer/impl/src/main/kotlin/dev/usbharu/producer/impl/datasource/ServiceProviderDatasourceFactory.kt @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.producer.impl.datasource + +import dev.usbharu.producer.impl.OwlTaskDatasource +import java.util.ServiceLoader +import kotlin.jvm.optionals.getOrElse +import kotlin.jvm.optionals.getOrNull + +class ServiceProviderDatasourceFactory : DatasourceFactory { + override suspend fun create(): OwlTaskDatasource { + val serviceLoader: ServiceLoader = ServiceLoader.load(OwlTaskDatasource::class.java) + + return serviceLoader.findFirst().getOrElse { + throw IllegalStateException("") + } + } +} \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts new file mode 100644 index 0000000..8e2a06b --- /dev/null +++ b/settings.gradle.kts @@ -0,0 +1,12 @@ +plugins { + id("org.gradle.toolchains.foojay-resolver-convention") version "0.5.0" +} +rootProject.name = "owl" +include("common") +include("producer:api") +findProject(":producer:api")?.name = "api" +include("producer:impl") +findProject(":producer:impl")?.name = "impl" +include("broker") +include("broker:broker-mongodb") +findProject(":broker:broker-mongodb")?.name = "broker-mongodb"