consumer #1

Merged
usbharu merged 5 commits from consumer into master 2024-04-03 04:47:45 +00:00
1 changed files with 25 additions and 31 deletions
Showing only changes of commit 68926e3dbd - Show all commits

View File

@ -2,13 +2,12 @@ package dev.usbharu
import dev.usbharu.owl.*
import io.grpc.ManagedChannelBuilder
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlin.math.max
suspend fun main() {
@ -30,10 +29,9 @@ suspend fun main() {
this.tasks.addAll(listOf())
})
var concurrent = 64
val concurrentMutex = Mutex()
var processing = 0
val processingMutex = Mutex()
val concurrent = MutableStateFlow(64)
val processing = MutableStateFlow(0)
coroutineScope {
launch(Dispatchers.Default) {
@ -41,12 +39,13 @@ suspend fun main() {
assignmentTaskServiceCoroutineStub
.ready(
flow {
while (isActive) {
val andSet = concurrentMutex.withLock {
val andSet = concurrent
concurrent = 0
andSet
while (this@coroutineScope.isActive) {
val andSet = concurrent.getAndUpdate {
0
}
if (andSet != 0) {
emit(readyRequest {
this.consumerId = subscribeTask.id
@ -56,19 +55,16 @@ suspend fun main() {
}
delay(100)
val withLock = processingMutex.withLock {
processing
}
concurrentMutex.withLock {
concurrent = ((64 - concurrent) - withLock).coerceIn(0, 64 - max(0, withLock))
concurrent.update {
((64 - it) - processing.value).coerceIn(0, 64 - max(0, processing.value))
}
}
}
)
.onEach {
processingMutex.withLock {
processing++
}
processing.update { it + 1 }
try {
emit(taskResult {
@ -79,14 +75,12 @@ suspend fun main() {
this.success = false
})
} finally {
processingMutex.withLock {
processing--
}
concurrentMutex.withLock {
if (concurrent < 64) {
concurrent++
processing.update { it - 1 }
concurrent.update {
if (it < 64) {
it + 1
} else {
concurrent = 64
64
}
}
}