mirror of https://github.com/usbharu/Hideout.git
commit
f0c4eb93f8
|
@ -2,11 +2,6 @@ plugins {
|
|||
// alias(libs.plugins.kotlin.jvm)
|
||||
kotlin("jvm")
|
||||
id("com.google.protobuf") version "0.9.4"
|
||||
id("com.google.devtools.ksp") version "1.9.25-1.0.20"
|
||||
}
|
||||
|
||||
apply {
|
||||
plugin("com.google.devtools.ksp")
|
||||
}
|
||||
|
||||
|
||||
|
@ -26,10 +21,7 @@ dependencies {
|
|||
implementation(project(":owl-common"))
|
||||
implementation("org.apache.logging.log4j:log4j-slf4j2-impl:2.23.1")
|
||||
implementation(platform("io.insert-koin:koin-bom:3.5.6"))
|
||||
implementation(platform("io.insert-koin:koin-annotations-bom:1.3.1"))
|
||||
implementation("io.insert-koin:koin-core")
|
||||
compileOnly("io.insert-koin:koin-annotations")
|
||||
ksp("io.insert-koin:koin-ksp-compiler:1.3.1")
|
||||
}
|
||||
|
||||
tasks.test {
|
||||
|
|
|
@ -1,11 +1,6 @@
|
|||
plugins {
|
||||
application
|
||||
kotlin("jvm")
|
||||
id("com.google.devtools.ksp") version "1.9.25-1.0.20"
|
||||
}
|
||||
|
||||
apply {
|
||||
plugin("com.google.devtools.ksp")
|
||||
}
|
||||
|
||||
group = "dev.usbharu"
|
||||
|
@ -22,8 +17,6 @@ dependencies {
|
|||
implementation(platform("io.insert-koin:koin-bom:3.5.6"))
|
||||
implementation(platform("io.insert-koin:koin-annotations-bom:1.3.1"))
|
||||
implementation("io.insert-koin:koin-core")
|
||||
compileOnly("io.insert-koin:koin-annotations")
|
||||
ksp("io.insert-koin:koin-ksp-compiler:1.3.1")
|
||||
}
|
||||
|
||||
tasks.test {
|
||||
|
|
|
@ -1,25 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2024 usbharu
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package dev.usbharu.owl.broker.mongodb
|
||||
|
||||
import org.koin.core.annotation.ComponentScan
|
||||
import org.koin.core.annotation.Module
|
||||
|
||||
@Module
|
||||
@ComponentScan("dev.usbharu.owl.broker.mongodb")
|
||||
class MongoModule
|
||||
|
|
@ -20,15 +20,20 @@ import com.mongodb.ConnectionString
|
|||
import com.mongodb.MongoClientSettings
|
||||
import com.mongodb.kotlin.client.coroutine.MongoClient
|
||||
import dev.usbharu.owl.broker.ModuleContext
|
||||
import dev.usbharu.owl.broker.domain.model.consumer.ConsumerRepository
|
||||
import dev.usbharu.owl.broker.domain.model.producer.ProducerRepository
|
||||
import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTaskRepository
|
||||
import dev.usbharu.owl.broker.domain.model.task.TaskRepository
|
||||
import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository
|
||||
import dev.usbharu.owl.broker.domain.model.taskresult.TaskResultRepository
|
||||
import org.bson.UuidRepresentation
|
||||
import org.koin.core.module.Module
|
||||
import org.koin.dsl.module
|
||||
import org.koin.ksp.generated.module
|
||||
|
||||
class MongoModuleContext : ModuleContext {
|
||||
override fun module(): Module {
|
||||
val module = MongoModule().module
|
||||
module.includes(module {
|
||||
|
||||
return module {
|
||||
single {
|
||||
val clientSettings =
|
||||
MongoClientSettings.builder()
|
||||
|
@ -46,7 +51,12 @@ class MongoModuleContext : ModuleContext {
|
|||
MongoClient.create(clientSettings)
|
||||
.getDatabase(System.getProperty("owl.broker.mongo.database", "mongo-test"))
|
||||
}
|
||||
})
|
||||
return module
|
||||
single<ConsumerRepository> { MongodbConsumerRepository(get()) }
|
||||
single<ProducerRepository> { MongodbProducerRepository(get()) }
|
||||
single<QueuedTaskRepository> { MongodbQueuedTaskRepository(get(), get()) }
|
||||
single<TaskDefinitionRepository> { MongodbTaskDefinitionRepository(get()) }
|
||||
single<TaskRepository> { MongodbTaskRepository(get(), get()) }
|
||||
single<TaskResultRepository> { MongodbTaskResultRepository(get(), get()) }
|
||||
}
|
||||
}
|
||||
}
|
|
@ -27,10 +27,8 @@ import kotlinx.coroutines.withContext
|
|||
import org.bson.BsonType
|
||||
import org.bson.codecs.pojo.annotations.BsonId
|
||||
import org.bson.codecs.pojo.annotations.BsonRepresentation
|
||||
import org.koin.core.annotation.Singleton
|
||||
import java.util.*
|
||||
|
||||
@Singleton
|
||||
class MongodbConsumerRepository(database: MongoDatabase) : ConsumerRepository {
|
||||
|
||||
private val collection = database.getCollection<ConsumerMongodb>("consumers")
|
||||
|
|
|
@ -23,11 +23,9 @@ import dev.usbharu.owl.broker.domain.model.producer.Producer
|
|||
import dev.usbharu.owl.broker.domain.model.producer.ProducerRepository
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.withContext
|
||||
import org.koin.core.annotation.Singleton
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
|
||||
@Singleton
|
||||
class MongodbProducerRepository(database: MongoDatabase) : ProducerRepository {
|
||||
|
||||
private val collection = database.getCollection<ProducerMongodb>("producers")
|
||||
|
|
|
@ -36,11 +36,9 @@ import kotlinx.coroutines.withContext
|
|||
import org.bson.BsonType
|
||||
import org.bson.codecs.pojo.annotations.BsonId
|
||||
import org.bson.codecs.pojo.annotations.BsonRepresentation
|
||||
import org.koin.core.annotation.Singleton
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
|
||||
@Singleton
|
||||
class MongodbQueuedTaskRepository(
|
||||
private val propertySerializerFactory: PropertySerializerFactory,
|
||||
database: MongoDatabase
|
||||
|
|
|
@ -27,9 +27,7 @@ import kotlinx.coroutines.withContext
|
|||
import org.bson.BsonType
|
||||
import org.bson.codecs.pojo.annotations.BsonId
|
||||
import org.bson.codecs.pojo.annotations.BsonRepresentation
|
||||
import org.koin.core.annotation.Singleton
|
||||
|
||||
@Singleton
|
||||
class MongodbTaskDefinitionRepository(database: MongoDatabase) : TaskDefinitionRepository {
|
||||
|
||||
private val collection = database.getCollection<TaskDefinitionMongodb>("task_definition")
|
||||
|
|
|
@ -33,11 +33,10 @@ import kotlinx.coroutines.withContext
|
|||
import org.bson.BsonType
|
||||
import org.bson.codecs.pojo.annotations.BsonId
|
||||
import org.bson.codecs.pojo.annotations.BsonRepresentation
|
||||
import org.koin.core.annotation.Singleton
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
|
||||
@Singleton
|
||||
|
||||
class MongodbTaskRepository(database: MongoDatabase, private val propertySerializerFactory: PropertySerializerFactory) :
|
||||
TaskRepository {
|
||||
|
||||
|
|
|
@ -31,10 +31,8 @@ import kotlinx.coroutines.withContext
|
|||
import org.bson.BsonType
|
||||
import org.bson.codecs.pojo.annotations.BsonId
|
||||
import org.bson.codecs.pojo.annotations.BsonRepresentation
|
||||
import org.koin.core.annotation.Singleton
|
||||
import java.util.*
|
||||
|
||||
@Singleton
|
||||
class MongodbTaskResultRepository(
|
||||
database: MongoDatabase,
|
||||
private val propertySerializerFactory: PropertySerializerFactory
|
||||
|
|
|
@ -16,18 +16,73 @@
|
|||
|
||||
package dev.usbharu.owl.broker
|
||||
|
||||
import dev.usbharu.owl.broker.interfaces.grpc.*
|
||||
import dev.usbharu.owl.broker.service.*
|
||||
import dev.usbharu.owl.broker.service.ProducerService
|
||||
import dev.usbharu.owl.broker.service.TaskPublishService
|
||||
import dev.usbharu.owl.common.property.PropertySerializerFactory
|
||||
import dev.usbharu.owl.common.retry.DefaultRetryPolicyFactory
|
||||
import dev.usbharu.owl.common.retry.ExponentialRetryPolicy
|
||||
import dev.usbharu.owl.common.retry.RetryPolicyFactory
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.koin.core.context.startKoin
|
||||
import org.koin.dsl.module
|
||||
import org.koin.ksp.generated.defaultModule
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.util.*
|
||||
|
||||
val logger = LoggerFactory.getLogger("MAIN")
|
||||
|
||||
val mainModule = module {
|
||||
single<AssignQueuedTaskDecider> {
|
||||
AssignQueuedTaskDeciderImpl(get(), get())
|
||||
}
|
||||
single<TaskScanner> { TaskScannerImpl(get()) }
|
||||
single<TaskPublishService> { TaskPublishServiceImpl(get(), get(), get()) }
|
||||
single<TaskManagementService> {
|
||||
TaskManagementServiceImpl(
|
||||
taskScanner = get(),
|
||||
queueStore = get(),
|
||||
taskDefinitionRepository = get(),
|
||||
assignQueuedTaskDecider = get(),
|
||||
retryPolicyFactory = get(),
|
||||
taskRepository = get(),
|
||||
queueScanner = get(),
|
||||
taskResultRepository = get()
|
||||
)
|
||||
}
|
||||
single<RegisterTaskService> { RegisterTaskServiceImpl(get()) }
|
||||
single<QueueStore> { QueueStoreImpl(get()) }
|
||||
single<QueueScanner> { QueueScannerImpl(get()) }
|
||||
single<QueuedTaskAssigner> { QueuedTaskAssignerImpl(get(), get()) }
|
||||
single<ProducerService> { ProducerServiceImpl(get()) }
|
||||
single<PropertySerializerFactory> { DefaultPropertySerializerFactory() }
|
||||
single<ConsumerService> { ConsumerServiceImpl(get()) }
|
||||
single {
|
||||
OwlBrokerApplication(
|
||||
assignmentTaskService = get(),
|
||||
definitionTaskService = get(),
|
||||
producerService = get(),
|
||||
subscribeTaskService = get(),
|
||||
taskPublishService = get(),
|
||||
taskManagementService = get(),
|
||||
taskResultSubscribeService = get(),
|
||||
taskResultService = get()
|
||||
)
|
||||
}
|
||||
single { AssignmentTaskService(queuedTaskAssigner = get(), propertySerializerFactory = get()) }
|
||||
single { DefinitionTaskService(registerTaskService = get()) }
|
||||
single { dev.usbharu.owl.broker.interfaces.grpc.ProducerService(producerService = get()) }
|
||||
single { SubscribeTaskService(consumerService = get()) }
|
||||
single {
|
||||
dev.usbharu.owl.broker.interfaces.grpc.TaskPublishService(
|
||||
taskPublishService = get(),
|
||||
propertySerializerFactory = get()
|
||||
)
|
||||
}
|
||||
single { TaskResultService(taskManagementService = get(), propertySerializerFactory = get()) }
|
||||
single { TaskResultSubscribeService(taskManagementService = get(), propertySerializerFactory = get()) }
|
||||
}
|
||||
|
||||
fun main() {
|
||||
val moduleContexts = ServiceLoader.load(ModuleContext::class.java)
|
||||
|
||||
|
@ -43,8 +98,9 @@ fun main() {
|
|||
single<RetryPolicyFactory> {
|
||||
DefaultRetryPolicyFactory(mapOf("" to ExponentialRetryPolicy()))
|
||||
}
|
||||
|
||||
}
|
||||
modules(defaultModule, module, moduleContext.module())
|
||||
modules(mainModule, module, moduleContext.module())
|
||||
}
|
||||
|
||||
val application = koin.koin.get<OwlBrokerApplication>()
|
||||
|
|
|
@ -24,9 +24,7 @@ import kotlinx.coroutines.CoroutineScope
|
|||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.launch
|
||||
import org.koin.core.annotation.Singleton
|
||||
|
||||
@Singleton
|
||||
class OwlBrokerApplication(
|
||||
private val assignmentTaskService: AssignmentTaskService,
|
||||
private val definitionTaskService: DefinitionTaskService,
|
||||
|
|
|
@ -29,12 +29,11 @@ import io.grpc.StatusException
|
|||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.flatMapMerge
|
||||
import kotlinx.coroutines.flow.map
|
||||
import org.koin.core.annotation.Singleton
|
||||
import org.slf4j.LoggerFactory
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
import kotlin.coroutines.EmptyCoroutineContext
|
||||
|
||||
@Singleton
|
||||
|
||||
class AssignmentTaskService(
|
||||
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||
private val queuedTaskAssigner: QueuedTaskAssigner,
|
||||
|
|
|
@ -22,11 +22,9 @@ import dev.usbharu.owl.DefinitionTask.TaskDefined
|
|||
import dev.usbharu.owl.DefinitionTaskServiceGrpcKt.DefinitionTaskServiceCoroutineImplBase
|
||||
import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinition
|
||||
import dev.usbharu.owl.broker.service.RegisterTaskService
|
||||
import org.koin.core.annotation.Singleton
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
import kotlin.coroutines.EmptyCoroutineContext
|
||||
|
||||
@Singleton
|
||||
class DefinitionTaskService(coroutineContext: CoroutineContext = EmptyCoroutineContext,private val registerTaskService: RegisterTaskService) :
|
||||
DefinitionTaskServiceCoroutineImplBase(coroutineContext) {
|
||||
override suspend fun register(request: DefinitionTask.TaskDefinition): TaskDefined {
|
||||
|
|
|
@ -21,11 +21,10 @@ import dev.usbharu.owl.ProducerServiceGrpcKt.ProducerServiceCoroutineImplBase
|
|||
import dev.usbharu.owl.broker.external.toUUID
|
||||
import dev.usbharu.owl.broker.service.ProducerService
|
||||
import dev.usbharu.owl.broker.service.RegisterProducerRequest
|
||||
import org.koin.core.annotation.Singleton
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
import kotlin.coroutines.EmptyCoroutineContext
|
||||
|
||||
@Singleton
|
||||
|
||||
class ProducerService(
|
||||
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||
private val producerService: ProducerService
|
||||
|
|
|
@ -21,11 +21,9 @@ import dev.usbharu.owl.SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineI
|
|||
import dev.usbharu.owl.broker.external.toUUID
|
||||
import dev.usbharu.owl.broker.service.ConsumerService
|
||||
import dev.usbharu.owl.broker.service.RegisterConsumerRequest
|
||||
import org.koin.core.annotation.Singleton
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
import kotlin.coroutines.EmptyCoroutineContext
|
||||
|
||||
@Singleton
|
||||
class SubscribeTaskService(
|
||||
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||
private val consumerService: ConsumerService
|
||||
|
|
|
@ -27,12 +27,10 @@ import dev.usbharu.owl.common.property.PropertySerializeUtils
|
|||
import dev.usbharu.owl.common.property.PropertySerializerFactory
|
||||
import io.grpc.Status
|
||||
import io.grpc.StatusException
|
||||
import org.koin.core.annotation.Singleton
|
||||
import org.slf4j.LoggerFactory
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
import kotlin.coroutines.EmptyCoroutineContext
|
||||
|
||||
@Singleton
|
||||
class TaskPublishService(
|
||||
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||
private val taskPublishService: TaskPublishService,
|
||||
|
|
|
@ -30,13 +30,11 @@ import kotlinx.coroutines.CancellationException
|
|||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import org.koin.core.annotation.Singleton
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.util.*
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
import kotlin.coroutines.EmptyCoroutineContext
|
||||
|
||||
@Singleton
|
||||
class TaskResultService(
|
||||
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||
private val taskManagementService: TaskManagementService,
|
||||
|
|
|
@ -23,11 +23,9 @@ import dev.usbharu.owl.common.property.PropertySerializeUtils
|
|||
import dev.usbharu.owl.common.property.PropertySerializerFactory
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.map
|
||||
import org.koin.core.annotation.Singleton
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
import kotlin.coroutines.EmptyCoroutineContext
|
||||
|
||||
@Singleton
|
||||
class TaskResultSubscribeService(
|
||||
private val taskManagementService: TaskManagementService,
|
||||
private val propertySerializerFactory: PropertySerializerFactory,
|
||||
|
|
|
@ -23,12 +23,10 @@ import kotlinx.coroutines.flow.Flow
|
|||
import kotlinx.coroutines.flow.emitAll
|
||||
import kotlinx.coroutines.flow.flow
|
||||
import kotlinx.coroutines.flow.take
|
||||
import org.koin.core.annotation.Singleton
|
||||
import java.util.*
|
||||
interface AssignQueuedTaskDecider {
|
||||
fun findAssignableQueue(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask>
|
||||
}
|
||||
@Singleton
|
||||
class AssignQueuedTaskDeciderImpl(
|
||||
private val consumerRepository: ConsumerRepository,
|
||||
private val queueStore: QueueStore
|
||||
|
|
|
@ -18,7 +18,6 @@ package dev.usbharu.owl.broker.service
|
|||
|
||||
import dev.usbharu.owl.broker.domain.model.consumer.Consumer
|
||||
import dev.usbharu.owl.broker.domain.model.consumer.ConsumerRepository
|
||||
import org.koin.core.annotation.Singleton
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.util.*
|
||||
|
||||
|
@ -26,7 +25,6 @@ interface ConsumerService {
|
|||
suspend fun registerConsumer(registerConsumerRequest: RegisterConsumerRequest): UUID
|
||||
}
|
||||
|
||||
@Singleton
|
||||
class ConsumerServiceImpl(private val consumerRepository: ConsumerRepository) : ConsumerService {
|
||||
override suspend fun registerConsumer(registerConsumerRequest: RegisterConsumerRequest): UUID {
|
||||
val id = UUID.randomUUID()
|
||||
|
|
|
@ -17,9 +17,7 @@
|
|||
package dev.usbharu.owl.broker.service
|
||||
|
||||
import dev.usbharu.owl.common.property.*
|
||||
import org.koin.core.annotation.Singleton
|
||||
|
||||
@Singleton(binds = [PropertySerializerFactory::class])
|
||||
class DefaultPropertySerializerFactory :
|
||||
CustomPropertySerializerFactory(
|
||||
setOf(
|
||||
|
|
|
@ -18,16 +18,15 @@ package dev.usbharu.owl.broker.service
|
|||
|
||||
import dev.usbharu.owl.broker.domain.model.producer.Producer
|
||||
import dev.usbharu.owl.broker.domain.model.producer.ProducerRepository
|
||||
import org.koin.core.annotation.Singleton
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
|
||||
interface ProducerService {
|
||||
suspend fun registerProducer(producer: RegisterProducerRequest):UUID
|
||||
suspend fun registerProducer(producer: RegisterProducerRequest): UUID
|
||||
}
|
||||
|
||||
@Singleton
|
||||
|
||||
class ProducerServiceImpl(private val producerRepository: ProducerRepository) : ProducerService {
|
||||
override suspend fun registerProducer(producer: RegisterProducerRequest): UUID {
|
||||
|
||||
|
@ -43,11 +42,11 @@ class ProducerServiceImpl(private val producerRepository: ProducerRepository) :
|
|||
|
||||
producerRepository.save(saveProducer)
|
||||
|
||||
logger.info("Register a new Producer. name: {} hostname: {}",saveProducer.name,saveProducer.hostname)
|
||||
logger.info("Register a new Producer. name: {} hostname: {}", saveProducer.name, saveProducer.hostname)
|
||||
return id
|
||||
}
|
||||
|
||||
companion object{
|
||||
companion object {
|
||||
private val logger = LoggerFactory.getLogger(ProducerServiceImpl::class.java)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,14 +23,13 @@ import kotlinx.coroutines.flow.Flow
|
|||
import kotlinx.coroutines.flow.emitAll
|
||||
import kotlinx.coroutines.flow.flow
|
||||
import kotlinx.coroutines.isActive
|
||||
import org.koin.core.annotation.Singleton
|
||||
import java.time.Instant
|
||||
|
||||
interface QueueScanner {
|
||||
fun startScan(): Flow<QueuedTask>
|
||||
}
|
||||
|
||||
@Singleton
|
||||
|
||||
class QueueScannerImpl(private val queueStore: QueueStore) : QueueScanner {
|
||||
override fun startScan(): Flow<QueuedTask> {
|
||||
return flow {
|
||||
|
|
|
@ -19,7 +19,6 @@ package dev.usbharu.owl.broker.service
|
|||
import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask
|
||||
import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTaskRepository
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import org.koin.core.annotation.Singleton
|
||||
import java.time.Instant
|
||||
|
||||
interface QueueStore {
|
||||
|
@ -33,7 +32,7 @@ interface QueueStore {
|
|||
fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow<QueuedTask>
|
||||
}
|
||||
|
||||
@Singleton
|
||||
|
||||
class QueueStoreImpl(private val queuedTaskRepository: QueuedTaskRepository) : QueueStore {
|
||||
override suspend fun enqueue(queuedTask: QueuedTask) {
|
||||
queuedTaskRepository.save(queuedTask)
|
||||
|
|
|
@ -19,7 +19,6 @@ package dev.usbharu.owl.broker.service
|
|||
import dev.usbharu.owl.broker.domain.exception.service.QueueCannotDequeueException
|
||||
import dev.usbharu.owl.broker.domain.model.queuedtask.QueuedTask
|
||||
import kotlinx.coroutines.flow.*
|
||||
import org.koin.core.annotation.Singleton
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
|
@ -28,7 +27,7 @@ interface QueuedTaskAssigner {
|
|||
fun ready(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask>
|
||||
}
|
||||
|
||||
@Singleton
|
||||
|
||||
class QueuedTaskAssignerImpl(
|
||||
private val taskManagementService: TaskManagementService,
|
||||
private val queueStore: QueueStore
|
||||
|
|
|
@ -19,7 +19,6 @@ package dev.usbharu.owl.broker.service
|
|||
import dev.usbharu.owl.broker.domain.exception.service.IncompatibleTaskException
|
||||
import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinition
|
||||
import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository
|
||||
import org.koin.core.annotation.Singleton
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
interface RegisterTaskService {
|
||||
|
@ -28,7 +27,7 @@ interface RegisterTaskService {
|
|||
suspend fun unregisterTask(name:String)
|
||||
}
|
||||
|
||||
@Singleton
|
||||
|
||||
class RegisterTaskServiceImpl(private val taskDefinitionRepository: TaskDefinitionRepository) : RegisterTaskService {
|
||||
override suspend fun registerTask(taskDefinition: TaskDefinition) {
|
||||
val definedTask = taskDefinitionRepository.findByName(taskDefinition.name)
|
||||
|
|
|
@ -27,7 +27,6 @@ import dev.usbharu.owl.broker.domain.model.taskresult.TaskResultRepository
|
|||
import dev.usbharu.owl.common.retry.RetryPolicyFactory
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.flow.*
|
||||
import org.koin.core.annotation.Singleton
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
|
@ -43,7 +42,6 @@ interface TaskManagementService {
|
|||
fun subscribeResult(producerId: UUID): Flow<TaskResults>
|
||||
}
|
||||
|
||||
@Singleton
|
||||
class TaskManagementServiceImpl(
|
||||
private val taskScanner: TaskScanner,
|
||||
private val queueStore: QueueStore,
|
||||
|
|
|
@ -22,7 +22,6 @@ import dev.usbharu.owl.broker.domain.model.task.TaskRepository
|
|||
import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinitionRepository
|
||||
import dev.usbharu.owl.common.property.PropertyValue
|
||||
import dev.usbharu.owl.common.retry.RetryPolicyFactory
|
||||
import org.koin.core.annotation.Singleton
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
|
@ -43,7 +42,6 @@ data class PublishedTask(
|
|||
val id: UUID
|
||||
)
|
||||
|
||||
@Singleton
|
||||
class TaskPublishServiceImpl(
|
||||
private val taskRepository: TaskRepository,
|
||||
private val taskDefinitionRepository: TaskDefinitionRepository,
|
||||
|
|
|
@ -24,7 +24,6 @@ import kotlinx.coroutines.flow.Flow
|
|||
import kotlinx.coroutines.flow.emitAll
|
||||
import kotlinx.coroutines.flow.flow
|
||||
import kotlinx.coroutines.isActive
|
||||
import org.koin.core.annotation.Singleton
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.time.Instant
|
||||
|
||||
|
@ -33,7 +32,6 @@ interface TaskScanner {
|
|||
fun startScan(): Flow<Task>
|
||||
}
|
||||
|
||||
@Singleton
|
||||
class TaskScannerImpl(private val taskRepository: TaskRepository) :
|
||||
TaskScanner {
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package dev.usbharu.owl.producer.embedded
|
||||
|
||||
import dev.usbharu.owl.broker.OwlBrokerApplication
|
||||
import dev.usbharu.owl.broker.mainModule
|
||||
import dev.usbharu.owl.common.retry.RetryPolicyFactory
|
||||
import dev.usbharu.owl.common.task.PublishedTask
|
||||
import dev.usbharu.owl.common.task.Task
|
||||
|
@ -25,7 +26,6 @@ import dev.usbharu.owl.producer.api.OwlProducer
|
|||
import org.koin.core.Koin
|
||||
import org.koin.core.context.GlobalContext.startKoin
|
||||
import org.koin.dsl.module
|
||||
import org.koin.ksp.generated.defaultModule
|
||||
|
||||
class EmbeddedGrpcOwlProducer(
|
||||
private val config: EmbeddedGrpcOwlProducerConfig,
|
||||
|
@ -42,7 +42,7 @@ class EmbeddedGrpcOwlProducer(
|
|||
config.retryPolicyFactory
|
||||
}
|
||||
}
|
||||
modules(module, defaultModule, config.moduleContext.module())
|
||||
modules(mainModule, module, config.moduleContext.module())
|
||||
}.koin
|
||||
|
||||
application.get<OwlBrokerApplication>().start(config.port.toInt())
|
||||
|
|
|
@ -19,6 +19,7 @@ package dev.usbharu.owl.producer.embedded
|
|||
import dev.usbharu.owl.broker.OwlBrokerApplication
|
||||
import dev.usbharu.owl.broker.domain.exception.InvalidRepositoryException
|
||||
import dev.usbharu.owl.broker.domain.model.producer.ProducerRepository
|
||||
import dev.usbharu.owl.broker.mainModule
|
||||
import dev.usbharu.owl.broker.service.*
|
||||
import dev.usbharu.owl.common.property.PropertySerializerFactory
|
||||
import dev.usbharu.owl.common.retry.RetryPolicyFactory
|
||||
|
@ -30,7 +31,6 @@ import org.koin.core.Koin
|
|||
import org.koin.core.context.GlobalContext
|
||||
import org.koin.core.context.GlobalContext.startKoin
|
||||
import org.koin.dsl.module
|
||||
import org.koin.ksp.generated.defaultModule
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinition as BrokerTaskDefinition
|
||||
|
@ -60,7 +60,7 @@ class EmbeddedOwlProducer(
|
|||
embeddedOwlProducerConfig.propertySerializerFactory
|
||||
}
|
||||
}
|
||||
modules(defaultModule, module, embeddedOwlProducerConfig.moduleContext.module())
|
||||
modules(mainModule, module, embeddedOwlProducerConfig.moduleContext.module())
|
||||
}.koin
|
||||
|
||||
application.getOrNull<ProducerRepository>()
|
||||
|
|
Loading…
Reference in New Issue