feat: Consumerで同時実行数を制御できるように

This commit is contained in:
usbharu 2024-04-01 17:14:15 +09:00
parent 6a3551f3b4
commit 84d0aa9fac
Signed by: usbharu
GPG Key ID: 6556747BF94EEBC8
1 changed files with 86 additions and 23 deletions

View File

@ -1,36 +1,99 @@
package dev.usbharu package dev.usbharu
import dev.usbharu.owl.AssignmentTaskServiceGrpcKt import dev.usbharu.owl.*
import dev.usbharu.owl.readyRequest
import io.grpc.ManagedChannelBuilder import io.grpc.ManagedChannelBuilder
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.withContext import kotlinx.coroutines.sync.withLock
import kotlin.math.max
suspend fun main() { suspend fun main() {
withContext(Dispatchers.Default) {
var isReady = true
AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub(
ManagedChannelBuilder.forAddress(
"localhost", 50051
).build()
).ready(flow {
while (isActive) {
if (isReady) {
emit(readyRequest {
this.consumerId
})
}
delay(500)
}
}).onEach {
}.collect() val channel = ManagedChannelBuilder.forAddress(
"localhost", 50051
).build()
val subscribeTaskServiceCoroutineStub = SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub(channel)
val assignmentTaskServiceCoroutineStub = AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub(
channel
)
val taskResultServiceCoroutineStub = TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub(channel)
val subscribeTask = subscribeTaskServiceCoroutineStub.subscribeTask(subscribeTaskRequest {
this.name = ""
this.hostname = ""
this.tasks.addAll(listOf())
})
var concurrent = 64
val concurrentMutex = Mutex()
var processing = 0
val processingMutex = Mutex()
coroutineScope {
launch(Dispatchers.Default) {
taskResultServiceCoroutineStub.tasKResult(flow {
assignmentTaskServiceCoroutineStub
.ready(
flow {
while (isActive) {
val andSet = concurrentMutex.withLock {
val andSet = concurrent
concurrent = 0
andSet
}
if (andSet != 0) {
emit(readyRequest {
this.consumerId = subscribeTask.id
this.numberOfConcurrent = andSet
})
continue
}
delay(100)
val withLock = processingMutex.withLock {
processing
}
concurrentMutex.withLock {
concurrent = ((64 - concurrent) - withLock).coerceIn(0, 64 - max(0, withLock))
}
}
}
)
.onEach {
processingMutex.withLock {
processing++
}
try {
emit(taskResult {
})
} catch (e: Exception) {
emit(taskResult {
this.success = false
})
} finally {
processingMutex.withLock {
processing--
}
concurrentMutex.withLock {
if (concurrent < 64) {
concurrent++
} else {
concurrent = 64
}
}
}
}
.flowOn(Dispatchers.Default)
.collect()
})
}
} }
} }