Merge pull request 'consumer' (#1) from consumer into master

Reviewed-on: #1
This commit is contained in:
usbharu 2024-04-03 04:47:44 +00:00
commit 8df9a55a54
7 changed files with 303 additions and 0 deletions

54
consumer/build.gradle.kts Normal file
View File

@ -0,0 +1,54 @@
plugins {
kotlin("jvm")
id("com.google.protobuf") version "0.9.4"
}
group = "dev.usbharu"
version = "0.0.1"
repositories {
mavenCentral()
}
dependencies {
testImplementation("org.jetbrains.kotlin:kotlin-test")
implementation("io.grpc:grpc-kotlin-stub:1.4.1")
implementation("io.grpc:grpc-protobuf:1.61.1")
implementation("com.google.protobuf:protobuf-kotlin:3.25.3")
implementation("io.grpc:grpc-netty:1.61.1")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0")
implementation(project(":common"))
protobuf(files(project(":broker").dependencyProject.projectDir.toString() + "/src/main/proto"))
}
tasks.test {
useJUnitPlatform()
}
kotlin {
jvmToolchain(17)
}
protobuf {
protoc {
artifact = "com.google.protobuf:protoc:3.25.3"
}
plugins {
create("grpc") {
artifact = "io.grpc:protoc-gen-grpc-java:1.61.1"
}
create("grpckt") {
artifact = "io.grpc:protoc-gen-grpc-kotlin:1.4.1:jdk8@jar"
}
}
generateProtoTasks {
all().forEach {
it.plugins {
create("grpc")
create("grpckt")
}
it.builtins {
create("kotlin")
}
}
}
}

View File

@ -0,0 +1,152 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.consumer
import dev.usbharu.dev.usbharu.owl.consumer.ConsumerConfig
import dev.usbharu.dev.usbharu.owl.consumer.TaskRequest
import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner
import dev.usbharu.owl.*
import dev.usbharu.owl.Uuid.UUID
import dev.usbharu.owl.common.property.PropertySerializeUtils
import dev.usbharu.owl.common.property.PropertySerializerFactory
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.slf4j.LoggerFactory
import java.time.Instant
import kotlin.math.max
class Consumer(
private val subscribeTaskServiceCoroutineStub: SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub,
private val assignmentTaskServiceCoroutineStub: AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub,
private val taskResultServiceCoroutineStub: TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub,
private val runnerMap: Map<String, TaskRunner>,
private val propertySerializerFactory: PropertySerializerFactory,
consumerConfig: ConsumerConfig
) {
private lateinit var consumerId: UUID
private lateinit var coroutineScope: CoroutineScope
private val concurrent = MutableStateFlow(consumerConfig.concurrent)
private val processing = MutableStateFlow(0)
suspend fun init(name: String, hostname: String) {
logger.info("Initialize Consumer name: {} hostname: {}", name, hostname)
logger.debug("Registered Tasks: {}", runnerMap.keys)
consumerId = subscribeTaskServiceCoroutineStub.subscribeTask(subscribeTaskRequest {
this.name = name
this.hostname = hostname
this.tasks.addAll(runnerMap.keys)
}).id
logger.info("Success initialize consumer. ConsumerID: {}", consumerId)
}
suspend fun start() {
coroutineScope = CoroutineScope(Dispatchers.Default)
coroutineScope {
taskResultServiceCoroutineStub
.tasKResult(flow {
assignmentTaskServiceCoroutineStub
.ready(flow {
while (coroutineScope.isActive) {
val andSet = concurrent.getAndUpdate { 0 }
if (andSet != 0) {
logger.debug("Request {} tasks.", andSet)
emit(readyRequest {
this.consumerId = consumerId
this.numberOfConcurrent = andSet
})
continue
}
delay(100)
concurrent.update {
((64 - it) - processing.value).coerceIn(0, 64 - max(0, processing.value))
}
}
}).onEach {
logger.info("Start Task name: {}", it.name)
processing.update { it + 1 }
try {
val taskResult = runnerMap.getValue(it.name).run(
TaskRequest(
it.name,
java.util.UUID(it.id.mostSignificantUuidBits, it.id.leastSignificantUuidBits),
it.attempt,
Instant.ofEpochSecond(it.queuedAt.seconds, it.queuedAt.nanos.toLong()),
PropertySerializeUtils.deserialize(propertySerializerFactory, it.propertiesMap)
)
)
emit(taskResult {
this.success = taskResult.success
this.attempt = it.attempt
this.id = it.id
this.result.putAll(
PropertySerializeUtils.serialize(
propertySerializerFactory, taskResult.result
)
)
this.message = taskResult.message
})
logger.info("Success execute task. name: {} success: {}", it.name, taskResult.success)
logger.debug("TRACE RESULT {}", taskResult)
} catch (e: CancellationException) {
logger.warn("Cancelled execute task.", e)
emit(taskResult {
this.success = false
this.attempt = it.attempt
this.id = it.id
this.message = e.localizedMessage
})
throw e
} catch (e: Exception) {
logger.warn("Failed execute task.", e)
emit(taskResult {
this.success = false
this.attempt = it.attempt
this.id = it.id
this.message = e.localizedMessage
})
} finally {
processing.update { it - 1 }
concurrent.update {
if (it < 64) {
it + 1
} else {
64
}
}
}
}.flowOn(Dispatchers.Default).collect()
})
}
}
fun stop() {
logger.info("Stop Consumer. consumerID: {}", consumerId)
coroutineScope.cancel()
}
companion object {
private val logger = LoggerFactory.getLogger(Consumer::class.java)
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.dev.usbharu.owl.consumer
data class ConsumerConfig(
val concurrent:Int,
)

View File

@ -0,0 +1,29 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.dev.usbharu.owl.consumer
import dev.usbharu.owl.common.property.PropertyValue
import java.time.Instant
import java.util.*
data class TaskRequest(
val name:String,
val id:UUID,
val attempt:Int,
val queuedAt: Instant,
val properties:Map<String,PropertyValue<*>>
)

View File

@ -0,0 +1,25 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.dev.usbharu.owl.consumer
import dev.usbharu.owl.common.property.PropertyValue
data class TaskResult(
val success: Boolean,
val result: Map<String, PropertyValue<*>>,
val message: String
)

View File

@ -0,0 +1,21 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.dev.usbharu.owl.consumer
fun interface TaskRunner {
suspend fun run(taskRequest: TaskRequest):TaskResult
}

View File

@ -10,3 +10,4 @@ include("broker:broker-mongodb")
findProject(":broker:broker-mongodb")?.name = "broker-mongodb"
include("producer:default")
findProject(":producer:default")?.name = "default"
include("consumer")