consumer #2

Merged
usbharu merged 10 commits from consumer into master 2024-04-30 09:26:10 +00:00
6 changed files with 152 additions and 47 deletions
Showing only changes of commit 5a589dbab5 - Show all commits

View File

@ -16,7 +16,6 @@
package dev.usbharu.owl.consumer
import dev.usbharu.dev.usbharu.owl.consumer.ConsumerConfig
import dev.usbharu.dev.usbharu.owl.consumer.TaskRequest
import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner
import dev.usbharu.owl.*
@ -30,9 +29,9 @@ import java.time.Instant
import kotlin.math.max
class Consumer(
private val subscribeTaskServiceCoroutineStub: SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub,
private val assignmentTaskServiceCoroutineStub: AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub,
private val taskResultServiceCoroutineStub: TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub,
private val subscribeTaskStub: SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub,
private val assignmentTaskStub: AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub,
private val taskResultStub: TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub,
private val runnerMap: Map<String, TaskRunner>,
private val propertySerializerFactory: PropertySerializerFactory,
consumerConfig: ConsumerConfig
@ -47,7 +46,7 @@ class Consumer(
suspend fun init(name: String, hostname: String) {
logger.info("Initialize Consumer name: {} hostname: {}", name, hostname)
logger.debug("Registered Tasks: {}", runnerMap.keys)
consumerId = subscribeTaskServiceCoroutineStub.subscribeTask(subscribeTaskRequest {
consumerId = subscribeTaskStub.subscribeTask(subscribeTaskRequest {
this.name = name
this.hostname = hostname
this.tasks.addAll(runnerMap.keys)
@ -58,9 +57,9 @@ class Consumer(
suspend fun start() {
coroutineScope = CoroutineScope(Dispatchers.Default)
coroutineScope {
taskResultServiceCoroutineStub
taskResultStub
.tasKResult(flow {
assignmentTaskServiceCoroutineStub
assignmentTaskStub
.ready(flow {
while (coroutineScope.isActive) {
val andSet = concurrent.getAndUpdate { 0 }

View File

@ -14,10 +14,8 @@
* limitations under the License.
*/
package dev.usbharu.dev.usbharu.owl.consumer
package dev.usbharu.owl.consumer
data class ConsumerConfig(
val concurrent:Int,
val address: String,
val port: Int
val concurrent: Int
)

View File

@ -16,47 +16,14 @@
package dev.usbharu.owl.consumer
import dev.usbharu.dev.usbharu.owl.consumer.ConsumerConfig
import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner
import dev.usbharu.owl.AssignmentTaskServiceGrpcKt
import dev.usbharu.owl.SubscribeTaskServiceGrpcKt
import dev.usbharu.owl.TaskResultServiceGrpcKt
import dev.usbharu.owl.common.property.CustomPropertySerializerFactory
import io.grpc.ManagedChannelBuilder
import kotlinx.coroutines.runBlocking
import java.util.*
fun main() {
val consumerConfig = ConsumerConfig(20, "localhost", 50051)
val channel = ManagedChannelBuilder.forAddress(consumerConfig.address, consumerConfig.port).usePlaintext().build()
val subscribeStub = SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub(channel)
val assignmentTaskStub = AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub(channel)
val taskResultStub = TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub(channel)
val customPropertySerializerFactory = CustomPropertySerializerFactory(emptySet())
val taskRunnerMap = ServiceLoader
.load(TaskRunner::class.java)
.associateBy { it::class.qualifiedName!! }
.filterNot { it.key.isBlank() }
val consumer = Consumer(
subscribeStub,
assignmentTaskStub,
taskResultStub,
taskRunnerMap,
customPropertySerializerFactory,
consumerConfig
)
val standaloneConsumer = StandaloneConsumer()
runBlocking {
consumer.init("consumer", "consumer-1")
consumer.start()
Runtime.getRuntime().addShutdownHook(Thread {
consumer.stop()
})
standaloneConsumer.init()
standaloneConsumer.start()
}
}

View File

@ -0,0 +1,80 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.consumer
import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner
import dev.usbharu.owl.AssignmentTaskServiceGrpcKt
import dev.usbharu.owl.SubscribeTaskServiceGrpcKt
import dev.usbharu.owl.TaskResultServiceGrpcKt
import dev.usbharu.owl.common.property.CustomPropertySerializerFactory
import dev.usbharu.owl.common.property.PropertySerializerFactory
import io.grpc.ManagedChannelBuilder
import java.nio.file.Path
import java.util.*
class StandaloneConsumer(
private val config: StandaloneConsumerConfig,
private val propertySerializerFactory: PropertySerializerFactory
) {
constructor(
path: Path, propertySerializerFactory: PropertySerializerFactory = CustomPropertySerializerFactory(
emptySet()
)
) : this(StandaloneConsumerConfigLoader.load(path), propertySerializerFactory)
constructor(string: String) : this(Path.of(string))
constructor() : this(Path.of("consumer.properties"))
private val channel = ManagedChannelBuilder.forAddress(config.address, config.port)
.usePlaintext()
.build()
private val subscribeStub = SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub(channel)
private val assignmentTaskStub = AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub(channel)
private val taskResultStub = TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub(channel)
private val taskRunnerMap = ServiceLoader
.load(TaskRunner::class.java)
.associateBy { it::class.qualifiedName!! }
.filterNot { it.key.isBlank() }
private val consumer = Consumer(
subscribeStub,
assignmentTaskStub,
taskResultStub,
taskRunnerMap,
propertySerializerFactory,
ConsumerConfig(config.concurrency)
)
suspend fun init() {
consumer.init(config.name, config.hostname)
}
suspend fun start() {
consumer.start()
Runtime.getRuntime().addShutdownHook(Thread {
consumer.stop()
})
}
fun stop() {
consumer.stop()
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.consumer
data class StandaloneConsumerConfig(
val address: String,
val port: Int,
val name: String,
val hostname: String,
val concurrency: Int,
)

View File

@ -0,0 +1,36 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.consumer
import java.nio.file.Files
import java.nio.file.Path
import java.util.*
object StandaloneConsumerConfigLoader {
fun load(path: Path): StandaloneConsumerConfig {
val properties = Properties()
properties.load(Files.newInputStream(path))
val address = properties.getProperty("address")
val port = properties.getProperty("port").toInt()
val name = properties.getProperty("name")
val hostname = properties.getProperty("hostname")
return StandaloneConsumerConfig(address, port, name, hostname)
}
}