From 68926e3dbd9826c1665ac1f44e1712f91df44271 Mon Sep 17 00:00:00 2001 From: usbharu <64310155+usbharu@users.noreply.github.com> Date: Mon, 1 Apr 2024 17:41:53 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20Mutex=E3=81=A7=E3=81=AF=E3=81=AA?= =?UTF-8?q?=E3=81=8FStateFlow=E3=82=92=E4=BD=BF=E3=81=86=E3=82=88=E3=81=86?= =?UTF-8?q?=E3=81=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumer/src/main/kotlin/Main.kt | 56 ++++++++++++++------------------ 1 file changed, 25 insertions(+), 31 deletions(-) diff --git a/consumer/src/main/kotlin/Main.kt b/consumer/src/main/kotlin/Main.kt index c3266f3..e805e8d 100644 --- a/consumer/src/main/kotlin/Main.kt +++ b/consumer/src/main/kotlin/Main.kt @@ -2,13 +2,12 @@ package dev.usbharu import dev.usbharu.owl.* import io.grpc.ManagedChannelBuilder -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.flowOn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch import kotlin.math.max suspend fun main() { @@ -30,10 +29,9 @@ suspend fun main() { this.tasks.addAll(listOf()) }) - var concurrent = 64 - val concurrentMutex = Mutex() - var processing = 0 - val processingMutex = Mutex() + val concurrent = MutableStateFlow(64) + val processing = MutableStateFlow(0) + coroutineScope { launch(Dispatchers.Default) { @@ -41,12 +39,13 @@ suspend fun main() { assignmentTaskServiceCoroutineStub .ready( flow { - while (isActive) { - val andSet = concurrentMutex.withLock { - val andSet = concurrent - concurrent = 0 - andSet + while (this@coroutineScope.isActive) { + + val andSet = concurrent.getAndUpdate { + 0 } + + if (andSet != 0) { emit(readyRequest { this.consumerId = subscribeTask.id @@ -56,19 +55,16 @@ suspend fun main() { } delay(100) - val withLock = processingMutex.withLock { - processing - } - concurrentMutex.withLock { - concurrent = ((64 - concurrent) - withLock).coerceIn(0, 64 - max(0, withLock)) + concurrent.update { + ((64 - it) - processing.value).coerceIn(0, 64 - max(0, processing.value)) } } } ) .onEach { - processingMutex.withLock { - processing++ - } + + processing.update { it + 1 } + try { emit(taskResult { @@ -79,14 +75,12 @@ suspend fun main() { this.success = false }) } finally { - processingMutex.withLock { - processing-- - } - concurrentMutex.withLock { - if (concurrent < 64) { - concurrent++ + processing.update { it - 1 } + concurrent.update { + if (it < 64) { + it + 1 } else { - concurrent = 64 + 64 } } }