From 84d0aa9facec749b75570e703aa14d4d166351f4 Mon Sep 17 00:00:00 2001 From: usbharu <64310155+usbharu@users.noreply.github.com> Date: Mon, 1 Apr 2024 17:14:15 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20Consumer=E3=81=A7=E5=90=8C=E6=99=82?= =?UTF-8?q?=E5=AE=9F=E8=A1=8C=E6=95=B0=E3=82=92=E5=88=B6=E5=BE=A1=E3=81=A7?= =?UTF-8?q?=E3=81=8D=E3=82=8B=E3=82=88=E3=81=86=E3=81=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumer/src/main/kotlin/Main.kt | 109 ++++++++++++++++++++++++------- 1 file changed, 86 insertions(+), 23 deletions(-) diff --git a/consumer/src/main/kotlin/Main.kt b/consumer/src/main/kotlin/Main.kt index 3fa8f95..c3266f3 100644 --- a/consumer/src/main/kotlin/Main.kt +++ b/consumer/src/main/kotlin/Main.kt @@ -1,36 +1,99 @@ package dev.usbharu -import dev.usbharu.owl.AssignmentTaskServiceGrpcKt -import dev.usbharu.owl.readyRequest +import dev.usbharu.owl.* import io.grpc.ManagedChannelBuilder -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.delay +import kotlinx.coroutines.* import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.isActive -import kotlinx.coroutines.withContext +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlin.math.max suspend fun main() { - withContext(Dispatchers.Default) { - var isReady = true - AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub( - ManagedChannelBuilder.forAddress( - "localhost", 50051 - ).build() - ).ready(flow { - while (isActive) { - if (isReady) { - emit(readyRequest { - this.consumerId - }) - } - delay(500) - } - }).onEach { - }.collect() + val channel = ManagedChannelBuilder.forAddress( + "localhost", 50051 + ).build() + val subscribeTaskServiceCoroutineStub = SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub(channel) + val assignmentTaskServiceCoroutineStub = AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub( + channel + ) + val taskResultServiceCoroutineStub = TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub(channel) + + val subscribeTask = subscribeTaskServiceCoroutineStub.subscribeTask(subscribeTaskRequest { + this.name = "" + this.hostname = "" + this.tasks.addAll(listOf()) + }) + + var concurrent = 64 + val concurrentMutex = Mutex() + var processing = 0 + val processingMutex = Mutex() + + coroutineScope { + launch(Dispatchers.Default) { + taskResultServiceCoroutineStub.tasKResult(flow { + assignmentTaskServiceCoroutineStub + .ready( + flow { + while (isActive) { + val andSet = concurrentMutex.withLock { + val andSet = concurrent + concurrent = 0 + andSet + } + if (andSet != 0) { + emit(readyRequest { + this.consumerId = subscribeTask.id + this.numberOfConcurrent = andSet + }) + continue + } + delay(100) + + val withLock = processingMutex.withLock { + processing + } + concurrentMutex.withLock { + concurrent = ((64 - concurrent) - withLock).coerceIn(0, 64 - max(0, withLock)) + } + } + } + ) + .onEach { + processingMutex.withLock { + processing++ + } + try { + emit(taskResult { + + }) + + } catch (e: Exception) { + emit(taskResult { + this.success = false + }) + } finally { + processingMutex.withLock { + processing-- + } + concurrentMutex.withLock { + if (concurrent < 64) { + concurrent++ + } else { + concurrent = 64 + } + } + } + } + .flowOn(Dispatchers.Default) + .collect() + }) + } } } \ No newline at end of file