From f3a1761f1df5597c333dba1708881101abe94a36 Mon Sep 17 00:00:00 2001 From: usbharu Date: Mon, 1 Apr 2024 15:22:31 +0900 Subject: [PATCH 1/5] feat: wip consumer --- consumer/build.gradle.kts | 54 ++++++++++++++++++++++++++++++++ consumer/src/main/kotlin/Main.kt | 36 +++++++++++++++++++++ settings.gradle.kts | 1 + 3 files changed, 91 insertions(+) create mode 100644 consumer/build.gradle.kts create mode 100644 consumer/src/main/kotlin/Main.kt diff --git a/consumer/build.gradle.kts b/consumer/build.gradle.kts new file mode 100644 index 00000000..4137b56a --- /dev/null +++ b/consumer/build.gradle.kts @@ -0,0 +1,54 @@ +plugins { + kotlin("jvm") + id("com.google.protobuf") version "0.9.4" +} + +group = "dev.usbharu" +version = "0.0.1" + +repositories { + mavenCentral() +} + +dependencies { + testImplementation("org.jetbrains.kotlin:kotlin-test") + implementation("io.grpc:grpc-kotlin-stub:1.4.1") + implementation("io.grpc:grpc-protobuf:1.61.1") + implementation("com.google.protobuf:protobuf-kotlin:3.25.3") + implementation("io.grpc:grpc-netty:1.61.1") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0") + implementation(project(":common")) + protobuf(files(project(":broker").dependencyProject.projectDir.toString() + "/src/main/proto")) +} + +tasks.test { + useJUnitPlatform() +} +kotlin { + jvmToolchain(17) +} + +protobuf { + protoc { + artifact = "com.google.protobuf:protoc:3.25.3" + } + plugins { + create("grpc") { + artifact = "io.grpc:protoc-gen-grpc-java:1.61.1" + } + create("grpckt") { + artifact = "io.grpc:protoc-gen-grpc-kotlin:1.4.1:jdk8@jar" + } + } + generateProtoTasks { + all().forEach { + it.plugins { + create("grpc") + create("grpckt") + } + it.builtins { + create("kotlin") + } + } + } +} \ No newline at end of file diff --git a/consumer/src/main/kotlin/Main.kt b/consumer/src/main/kotlin/Main.kt new file mode 100644 index 00000000..3fa8f959 --- /dev/null +++ b/consumer/src/main/kotlin/Main.kt @@ -0,0 +1,36 @@ +package dev.usbharu + +import dev.usbharu.owl.AssignmentTaskServiceGrpcKt +import dev.usbharu.owl.readyRequest +import io.grpc.ManagedChannelBuilder +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.isActive +import kotlinx.coroutines.withContext + +suspend fun main() { + withContext(Dispatchers.Default) { + var isReady = true + AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub( + ManagedChannelBuilder.forAddress( + "localhost", 50051 + ).build() + ).ready(flow { + while (isActive) { + if (isReady) { + emit(readyRequest { + this.consumerId + }) + } + delay(500) + } + }).onEach { + + }.collect() + + + } +} \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index f447b5a2..87d7071c 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -10,3 +10,4 @@ include("broker:broker-mongodb") findProject(":broker:broker-mongodb")?.name = "broker-mongodb" include("producer:default") findProject(":producer:default")?.name = "default" +include("consumer") From 2e68e2ab1b346d629677fc53594b3a171fa57605 Mon Sep 17 00:00:00 2001 From: usbharu <64310155+usbharu@users.noreply.github.com> Date: Mon, 1 Apr 2024 17:14:15 +0900 Subject: [PATCH 2/5] =?UTF-8?q?feat:=20Consumer=E3=81=A7=E5=90=8C=E6=99=82?= =?UTF-8?q?=E5=AE=9F=E8=A1=8C=E6=95=B0=E3=82=92=E5=88=B6=E5=BE=A1=E3=81=A7?= =?UTF-8?q?=E3=81=8D=E3=82=8B=E3=82=88=E3=81=86=E3=81=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumer/src/main/kotlin/Main.kt | 109 ++++++++++++++++++++++++------- 1 file changed, 86 insertions(+), 23 deletions(-) diff --git a/consumer/src/main/kotlin/Main.kt b/consumer/src/main/kotlin/Main.kt index 3fa8f959..c3266f38 100644 --- a/consumer/src/main/kotlin/Main.kt +++ b/consumer/src/main/kotlin/Main.kt @@ -1,36 +1,99 @@ package dev.usbharu -import dev.usbharu.owl.AssignmentTaskServiceGrpcKt -import dev.usbharu.owl.readyRequest +import dev.usbharu.owl.* import io.grpc.ManagedChannelBuilder -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.delay +import kotlinx.coroutines.* import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.isActive -import kotlinx.coroutines.withContext +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlin.math.max suspend fun main() { - withContext(Dispatchers.Default) { - var isReady = true - AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub( - ManagedChannelBuilder.forAddress( - "localhost", 50051 - ).build() - ).ready(flow { - while (isActive) { - if (isReady) { - emit(readyRequest { - this.consumerId - }) - } - delay(500) - } - }).onEach { - }.collect() + val channel = ManagedChannelBuilder.forAddress( + "localhost", 50051 + ).build() + val subscribeTaskServiceCoroutineStub = SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub(channel) + val assignmentTaskServiceCoroutineStub = AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub( + channel + ) + val taskResultServiceCoroutineStub = TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub(channel) + + val subscribeTask = subscribeTaskServiceCoroutineStub.subscribeTask(subscribeTaskRequest { + this.name = "" + this.hostname = "" + this.tasks.addAll(listOf()) + }) + + var concurrent = 64 + val concurrentMutex = Mutex() + var processing = 0 + val processingMutex = Mutex() + + coroutineScope { + launch(Dispatchers.Default) { + taskResultServiceCoroutineStub.tasKResult(flow { + assignmentTaskServiceCoroutineStub + .ready( + flow { + while (isActive) { + val andSet = concurrentMutex.withLock { + val andSet = concurrent + concurrent = 0 + andSet + } + if (andSet != 0) { + emit(readyRequest { + this.consumerId = subscribeTask.id + this.numberOfConcurrent = andSet + }) + continue + } + delay(100) + + val withLock = processingMutex.withLock { + processing + } + concurrentMutex.withLock { + concurrent = ((64 - concurrent) - withLock).coerceIn(0, 64 - max(0, withLock)) + } + } + } + ) + .onEach { + processingMutex.withLock { + processing++ + } + try { + emit(taskResult { + + }) + + } catch (e: Exception) { + emit(taskResult { + this.success = false + }) + } finally { + processingMutex.withLock { + processing-- + } + concurrentMutex.withLock { + if (concurrent < 64) { + concurrent++ + } else { + concurrent = 64 + } + } + } + } + .flowOn(Dispatchers.Default) + .collect() + }) + } } } \ No newline at end of file From 74dbd04f0bae0accbc55de46c5b280128a99da8d Mon Sep 17 00:00:00 2001 From: usbharu <64310155+usbharu@users.noreply.github.com> Date: Mon, 1 Apr 2024 17:41:53 +0900 Subject: [PATCH 3/5] =?UTF-8?q?feat:=20Mutex=E3=81=A7=E3=81=AF=E3=81=AA?= =?UTF-8?q?=E3=81=8FStateFlow=E3=82=92=E4=BD=BF=E3=81=86=E3=82=88=E3=81=86?= =?UTF-8?q?=E3=81=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumer/src/main/kotlin/Main.kt | 56 ++++++++++++++------------------ 1 file changed, 25 insertions(+), 31 deletions(-) diff --git a/consumer/src/main/kotlin/Main.kt b/consumer/src/main/kotlin/Main.kt index c3266f38..e805e8d1 100644 --- a/consumer/src/main/kotlin/Main.kt +++ b/consumer/src/main/kotlin/Main.kt @@ -2,13 +2,12 @@ package dev.usbharu import dev.usbharu.owl.* import io.grpc.ManagedChannelBuilder -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.flowOn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch import kotlin.math.max suspend fun main() { @@ -30,10 +29,9 @@ suspend fun main() { this.tasks.addAll(listOf()) }) - var concurrent = 64 - val concurrentMutex = Mutex() - var processing = 0 - val processingMutex = Mutex() + val concurrent = MutableStateFlow(64) + val processing = MutableStateFlow(0) + coroutineScope { launch(Dispatchers.Default) { @@ -41,12 +39,13 @@ suspend fun main() { assignmentTaskServiceCoroutineStub .ready( flow { - while (isActive) { - val andSet = concurrentMutex.withLock { - val andSet = concurrent - concurrent = 0 - andSet + while (this@coroutineScope.isActive) { + + val andSet = concurrent.getAndUpdate { + 0 } + + if (andSet != 0) { emit(readyRequest { this.consumerId = subscribeTask.id @@ -56,19 +55,16 @@ suspend fun main() { } delay(100) - val withLock = processingMutex.withLock { - processing - } - concurrentMutex.withLock { - concurrent = ((64 - concurrent) - withLock).coerceIn(0, 64 - max(0, withLock)) + concurrent.update { + ((64 - it) - processing.value).coerceIn(0, 64 - max(0, processing.value)) } } } ) .onEach { - processingMutex.withLock { - processing++ - } + + processing.update { it + 1 } + try { emit(taskResult { @@ -79,14 +75,12 @@ suspend fun main() { this.success = false }) } finally { - processingMutex.withLock { - processing-- - } - concurrentMutex.withLock { - if (concurrent < 64) { - concurrent++ + processing.update { it - 1 } + concurrent.update { + if (it < 64) { + it + 1 } else { - concurrent = 64 + 64 } } } From 3e23662b95e1aa7ceb871e072d90262914abc80f Mon Sep 17 00:00:00 2001 From: usbharu <64310155+usbharu@users.noreply.github.com> Date: Wed, 3 Apr 2024 12:50:49 +0900 Subject: [PATCH 4/5] =?UTF-8?q?feat:=20=E3=82=BF=E3=82=B9=E3=82=AF?= =?UTF-8?q?=E5=AE=9F=E8=A1=8C=E9=83=A8=E5=88=86=E3=82=92=E4=BD=9C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumer/src/main/kotlin/Main.kt | 40 ++++++++++++++++++- .../dev/usbharu/owl/consumer/TaskRequest.kt | 29 ++++++++++++++ .../dev/usbharu/owl/consumer/TaskResult.kt | 25 ++++++++++++ .../dev/usbharu/owl/consumer/TaskRunner.kt | 21 ++++++++++ 4 files changed, 114 insertions(+), 1 deletion(-) create mode 100644 consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt create mode 100644 consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt create mode 100644 consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt diff --git a/consumer/src/main/kotlin/Main.kt b/consumer/src/main/kotlin/Main.kt index e805e8d1..3ff2d4a9 100644 --- a/consumer/src/main/kotlin/Main.kt +++ b/consumer/src/main/kotlin/Main.kt @@ -1,6 +1,10 @@ package dev.usbharu +import dev.usbharu.dev.usbharu.owl.consumer.TaskRequest +import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner import dev.usbharu.owl.* +import dev.usbharu.owl.common.property.CustomPropertySerializerFactory +import dev.usbharu.owl.common.property.PropertySerializeUtils import io.grpc.ManagedChannelBuilder import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.coroutineScope @@ -8,6 +12,8 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.flow.* import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import java.time.Instant +import java.util.* import kotlin.math.max suspend fun main() { @@ -29,6 +35,11 @@ suspend fun main() { this.tasks.addAll(listOf()) }) + + val map = mapOf() + + val propertySerializerFactory = CustomPropertySerializerFactory(setOf()) + val concurrent = MutableStateFlow(64) val processing = MutableStateFlow(0) @@ -66,13 +77,40 @@ suspend fun main() { processing.update { it + 1 } try { - emit(taskResult { + val taskResult = map[it.name]?.run( + TaskRequest( + it.name, + UUID(it.id.mostSignificantUuidBits, it.id.leastSignificantUuidBits), + it.attempt, + Instant.ofEpochSecond(it.queuedAt.seconds, it.queuedAt.nanos.toLong()), + PropertySerializeUtils.deserialize(propertySerializerFactory, it.propertiesMap) + ) + ) + + if (taskResult == null) { + throw Exception() + } + + emit(taskResult { + this.success = taskResult.success + this.attempt = it.attempt + this.id = it.id + this.result.putAll( + PropertySerializeUtils.serialize( + propertySerializerFactory, + taskResult.result + ) + ) + this.message = taskResult.message }) } catch (e: Exception) { emit(taskResult { this.success = false + this.attempt = it.attempt + this.id = it.id + this.message = e.localizedMessage }) } finally { processing.update { it - 1 } diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt new file mode 100644 index 00000000..d66e9a73 --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.dev.usbharu.owl.consumer + +import dev.usbharu.owl.common.property.PropertyValue +import java.time.Instant +import java.util.* + +data class TaskRequest( + val name:String, + val id:UUID, + val attempt:Int, + val queuedAt: Instant, + val properties:Map> +) diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt new file mode 100644 index 00000000..20858a9b --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.dev.usbharu.owl.consumer + +import dev.usbharu.owl.common.property.PropertyValue + +data class TaskResult( + val success: Boolean, + val result: Map>, + val message: String +) diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt new file mode 100644 index 00000000..44513ec1 --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt @@ -0,0 +1,21 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.dev.usbharu.owl.consumer + +fun interface TaskRunner { + suspend fun run(taskRequest: TaskRequest):TaskResult +} \ No newline at end of file From df35023c02565d2e1210ab46ebb0be2027b79f15 Mon Sep 17 00:00:00 2001 From: usbharu <64310155+usbharu@users.noreply.github.com> Date: Wed, 3 Apr 2024 13:36:16 +0900 Subject: [PATCH 5/5] =?UTF-8?q?feat:=20Consumer=E3=82=92=E4=BD=9C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumer/src/main/kotlin/Main.kt | 131 --------------- .../dev/usbharu/owl/consumer/Consumer.kt | 152 ++++++++++++++++++ .../usbharu/owl/consumer/ConsumerConfig.kt | 21 +++ 3 files changed, 173 insertions(+), 131 deletions(-) delete mode 100644 consumer/src/main/kotlin/Main.kt create mode 100644 consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt create mode 100644 consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt diff --git a/consumer/src/main/kotlin/Main.kt b/consumer/src/main/kotlin/Main.kt deleted file mode 100644 index 3ff2d4a9..00000000 --- a/consumer/src/main/kotlin/Main.kt +++ /dev/null @@ -1,131 +0,0 @@ -package dev.usbharu - -import dev.usbharu.dev.usbharu.owl.consumer.TaskRequest -import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner -import dev.usbharu.owl.* -import dev.usbharu.owl.common.property.CustomPropertySerializerFactory -import dev.usbharu.owl.common.property.PropertySerializeUtils -import io.grpc.ManagedChannelBuilder -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.* -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch -import java.time.Instant -import java.util.* -import kotlin.math.max - -suspend fun main() { - - val channel = ManagedChannelBuilder.forAddress( - "localhost", 50051 - ).build() - val subscribeTaskServiceCoroutineStub = SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub(channel) - - val assignmentTaskServiceCoroutineStub = AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub( - channel - ) - - val taskResultServiceCoroutineStub = TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub(channel) - - val subscribeTask = subscribeTaskServiceCoroutineStub.subscribeTask(subscribeTaskRequest { - this.name = "" - this.hostname = "" - this.tasks.addAll(listOf()) - }) - - - val map = mapOf() - - val propertySerializerFactory = CustomPropertySerializerFactory(setOf()) - - val concurrent = MutableStateFlow(64) - val processing = MutableStateFlow(0) - - - coroutineScope { - launch(Dispatchers.Default) { - taskResultServiceCoroutineStub.tasKResult(flow { - assignmentTaskServiceCoroutineStub - .ready( - flow { - while (this@coroutineScope.isActive) { - - val andSet = concurrent.getAndUpdate { - 0 - } - - - if (andSet != 0) { - emit(readyRequest { - this.consumerId = subscribeTask.id - this.numberOfConcurrent = andSet - }) - continue - } - delay(100) - - concurrent.update { - ((64 - it) - processing.value).coerceIn(0, 64 - max(0, processing.value)) - } - } - } - ) - .onEach { - - processing.update { it + 1 } - - try { - - val taskResult = map[it.name]?.run( - TaskRequest( - it.name, - UUID(it.id.mostSignificantUuidBits, it.id.leastSignificantUuidBits), - it.attempt, - Instant.ofEpochSecond(it.queuedAt.seconds, it.queuedAt.nanos.toLong()), - PropertySerializeUtils.deserialize(propertySerializerFactory, it.propertiesMap) - ) - ) - - if (taskResult == null) { - throw Exception() - } - - emit(taskResult { - this.success = taskResult.success - this.attempt = it.attempt - this.id = it.id - this.result.putAll( - PropertySerializeUtils.serialize( - propertySerializerFactory, - taskResult.result - ) - ) - this.message = taskResult.message - }) - - } catch (e: Exception) { - emit(taskResult { - this.success = false - this.attempt = it.attempt - this.id = it.id - this.message = e.localizedMessage - }) - } finally { - processing.update { it - 1 } - concurrent.update { - if (it < 64) { - it + 1 - } else { - 64 - } - } - } - } - .flowOn(Dispatchers.Default) - .collect() - }) - } - } -} \ No newline at end of file diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt new file mode 100644 index 00000000..1ee51d76 --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt @@ -0,0 +1,152 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.consumer + +import dev.usbharu.dev.usbharu.owl.consumer.ConsumerConfig +import dev.usbharu.dev.usbharu.owl.consumer.TaskRequest +import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner +import dev.usbharu.owl.* +import dev.usbharu.owl.Uuid.UUID +import dev.usbharu.owl.common.property.PropertySerializeUtils +import dev.usbharu.owl.common.property.PropertySerializerFactory +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.slf4j.LoggerFactory +import java.time.Instant +import kotlin.math.max + +class Consumer( + private val subscribeTaskServiceCoroutineStub: SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub, + private val assignmentTaskServiceCoroutineStub: AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub, + private val taskResultServiceCoroutineStub: TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub, + private val runnerMap: Map, + private val propertySerializerFactory: PropertySerializerFactory, + consumerConfig: ConsumerConfig +) { + + private lateinit var consumerId: UUID + + private lateinit var coroutineScope: CoroutineScope + + private val concurrent = MutableStateFlow(consumerConfig.concurrent) + private val processing = MutableStateFlow(0) + suspend fun init(name: String, hostname: String) { + logger.info("Initialize Consumer name: {} hostname: {}", name, hostname) + logger.debug("Registered Tasks: {}", runnerMap.keys) + consumerId = subscribeTaskServiceCoroutineStub.subscribeTask(subscribeTaskRequest { + this.name = name + this.hostname = hostname + this.tasks.addAll(runnerMap.keys) + }).id + logger.info("Success initialize consumer. ConsumerID: {}", consumerId) + } + + suspend fun start() { + coroutineScope = CoroutineScope(Dispatchers.Default) + coroutineScope { + taskResultServiceCoroutineStub + .tasKResult(flow { + assignmentTaskServiceCoroutineStub + .ready(flow { + while (coroutineScope.isActive) { + val andSet = concurrent.getAndUpdate { 0 } + + + if (andSet != 0) { + logger.debug("Request {} tasks.", andSet) + emit(readyRequest { + this.consumerId = consumerId + this.numberOfConcurrent = andSet + }) + continue + } + delay(100) + + concurrent.update { + ((64 - it) - processing.value).coerceIn(0, 64 - max(0, processing.value)) + } + } + }).onEach { + logger.info("Start Task name: {}", it.name) + processing.update { it + 1 } + + try { + + val taskResult = runnerMap.getValue(it.name).run( + TaskRequest( + it.name, + java.util.UUID(it.id.mostSignificantUuidBits, it.id.leastSignificantUuidBits), + it.attempt, + Instant.ofEpochSecond(it.queuedAt.seconds, it.queuedAt.nanos.toLong()), + PropertySerializeUtils.deserialize(propertySerializerFactory, it.propertiesMap) + ) + ) + + emit(taskResult { + this.success = taskResult.success + this.attempt = it.attempt + this.id = it.id + this.result.putAll( + PropertySerializeUtils.serialize( + propertySerializerFactory, taskResult.result + ) + ) + this.message = taskResult.message + }) + logger.info("Success execute task. name: {} success: {}", it.name, taskResult.success) + logger.debug("TRACE RESULT {}", taskResult) + } catch (e: CancellationException) { + logger.warn("Cancelled execute task.", e) + emit(taskResult { + this.success = false + this.attempt = it.attempt + this.id = it.id + this.message = e.localizedMessage + }) + throw e + } catch (e: Exception) { + logger.warn("Failed execute task.", e) + emit(taskResult { + this.success = false + this.attempt = it.attempt + this.id = it.id + this.message = e.localizedMessage + }) + } finally { + processing.update { it - 1 } + concurrent.update { + if (it < 64) { + it + 1 + } else { + 64 + } + } + } + }.flowOn(Dispatchers.Default).collect() + }) + } + } + + fun stop() { + logger.info("Stop Consumer. consumerID: {}", consumerId) + coroutineScope.cancel() + } + + companion object { + private val logger = LoggerFactory.getLogger(Consumer::class.java) + } +} \ No newline at end of file diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt new file mode 100644 index 00000000..52bca554 --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt @@ -0,0 +1,21 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.dev.usbharu.owl.consumer + +data class ConsumerConfig( + val concurrent:Int, +)