diff --git a/consumer/src/main/kotlin/Main.kt b/consumer/src/main/kotlin/Main.kt deleted file mode 100644 index 3ff2d4a9..00000000 --- a/consumer/src/main/kotlin/Main.kt +++ /dev/null @@ -1,131 +0,0 @@ -package dev.usbharu - -import dev.usbharu.dev.usbharu.owl.consumer.TaskRequest -import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner -import dev.usbharu.owl.* -import dev.usbharu.owl.common.property.CustomPropertySerializerFactory -import dev.usbharu.owl.common.property.PropertySerializeUtils -import io.grpc.ManagedChannelBuilder -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.* -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch -import java.time.Instant -import java.util.* -import kotlin.math.max - -suspend fun main() { - - val channel = ManagedChannelBuilder.forAddress( - "localhost", 50051 - ).build() - val subscribeTaskServiceCoroutineStub = SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub(channel) - - val assignmentTaskServiceCoroutineStub = AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub( - channel - ) - - val taskResultServiceCoroutineStub = TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub(channel) - - val subscribeTask = subscribeTaskServiceCoroutineStub.subscribeTask(subscribeTaskRequest { - this.name = "" - this.hostname = "" - this.tasks.addAll(listOf()) - }) - - - val map = mapOf() - - val propertySerializerFactory = CustomPropertySerializerFactory(setOf()) - - val concurrent = MutableStateFlow(64) - val processing = MutableStateFlow(0) - - - coroutineScope { - launch(Dispatchers.Default) { - taskResultServiceCoroutineStub.tasKResult(flow { - assignmentTaskServiceCoroutineStub - .ready( - flow { - while (this@coroutineScope.isActive) { - - val andSet = concurrent.getAndUpdate { - 0 - } - - - if (andSet != 0) { - emit(readyRequest { - this.consumerId = subscribeTask.id - this.numberOfConcurrent = andSet - }) - continue - } - delay(100) - - concurrent.update { - ((64 - it) - processing.value).coerceIn(0, 64 - max(0, processing.value)) - } - } - } - ) - .onEach { - - processing.update { it + 1 } - - try { - - val taskResult = map[it.name]?.run( - TaskRequest( - it.name, - UUID(it.id.mostSignificantUuidBits, it.id.leastSignificantUuidBits), - it.attempt, - Instant.ofEpochSecond(it.queuedAt.seconds, it.queuedAt.nanos.toLong()), - PropertySerializeUtils.deserialize(propertySerializerFactory, it.propertiesMap) - ) - ) - - if (taskResult == null) { - throw Exception() - } - - emit(taskResult { - this.success = taskResult.success - this.attempt = it.attempt - this.id = it.id - this.result.putAll( - PropertySerializeUtils.serialize( - propertySerializerFactory, - taskResult.result - ) - ) - this.message = taskResult.message - }) - - } catch (e: Exception) { - emit(taskResult { - this.success = false - this.attempt = it.attempt - this.id = it.id - this.message = e.localizedMessage - }) - } finally { - processing.update { it - 1 } - concurrent.update { - if (it < 64) { - it + 1 - } else { - 64 - } - } - } - } - .flowOn(Dispatchers.Default) - .collect() - }) - } - } -} \ No newline at end of file diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt new file mode 100644 index 00000000..1ee51d76 --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt @@ -0,0 +1,152 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.consumer + +import dev.usbharu.dev.usbharu.owl.consumer.ConsumerConfig +import dev.usbharu.dev.usbharu.owl.consumer.TaskRequest +import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner +import dev.usbharu.owl.* +import dev.usbharu.owl.Uuid.UUID +import dev.usbharu.owl.common.property.PropertySerializeUtils +import dev.usbharu.owl.common.property.PropertySerializerFactory +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.slf4j.LoggerFactory +import java.time.Instant +import kotlin.math.max + +class Consumer( + private val subscribeTaskServiceCoroutineStub: SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub, + private val assignmentTaskServiceCoroutineStub: AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub, + private val taskResultServiceCoroutineStub: TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub, + private val runnerMap: Map, + private val propertySerializerFactory: PropertySerializerFactory, + consumerConfig: ConsumerConfig +) { + + private lateinit var consumerId: UUID + + private lateinit var coroutineScope: CoroutineScope + + private val concurrent = MutableStateFlow(consumerConfig.concurrent) + private val processing = MutableStateFlow(0) + suspend fun init(name: String, hostname: String) { + logger.info("Initialize Consumer name: {} hostname: {}", name, hostname) + logger.debug("Registered Tasks: {}", runnerMap.keys) + consumerId = subscribeTaskServiceCoroutineStub.subscribeTask(subscribeTaskRequest { + this.name = name + this.hostname = hostname + this.tasks.addAll(runnerMap.keys) + }).id + logger.info("Success initialize consumer. ConsumerID: {}", consumerId) + } + + suspend fun start() { + coroutineScope = CoroutineScope(Dispatchers.Default) + coroutineScope { + taskResultServiceCoroutineStub + .tasKResult(flow { + assignmentTaskServiceCoroutineStub + .ready(flow { + while (coroutineScope.isActive) { + val andSet = concurrent.getAndUpdate { 0 } + + + if (andSet != 0) { + logger.debug("Request {} tasks.", andSet) + emit(readyRequest { + this.consumerId = consumerId + this.numberOfConcurrent = andSet + }) + continue + } + delay(100) + + concurrent.update { + ((64 - it) - processing.value).coerceIn(0, 64 - max(0, processing.value)) + } + } + }).onEach { + logger.info("Start Task name: {}", it.name) + processing.update { it + 1 } + + try { + + val taskResult = runnerMap.getValue(it.name).run( + TaskRequest( + it.name, + java.util.UUID(it.id.mostSignificantUuidBits, it.id.leastSignificantUuidBits), + it.attempt, + Instant.ofEpochSecond(it.queuedAt.seconds, it.queuedAt.nanos.toLong()), + PropertySerializeUtils.deserialize(propertySerializerFactory, it.propertiesMap) + ) + ) + + emit(taskResult { + this.success = taskResult.success + this.attempt = it.attempt + this.id = it.id + this.result.putAll( + PropertySerializeUtils.serialize( + propertySerializerFactory, taskResult.result + ) + ) + this.message = taskResult.message + }) + logger.info("Success execute task. name: {} success: {}", it.name, taskResult.success) + logger.debug("TRACE RESULT {}", taskResult) + } catch (e: CancellationException) { + logger.warn("Cancelled execute task.", e) + emit(taskResult { + this.success = false + this.attempt = it.attempt + this.id = it.id + this.message = e.localizedMessage + }) + throw e + } catch (e: Exception) { + logger.warn("Failed execute task.", e) + emit(taskResult { + this.success = false + this.attempt = it.attempt + this.id = it.id + this.message = e.localizedMessage + }) + } finally { + processing.update { it - 1 } + concurrent.update { + if (it < 64) { + it + 1 + } else { + 64 + } + } + } + }.flowOn(Dispatchers.Default).collect() + }) + } + } + + fun stop() { + logger.info("Stop Consumer. consumerID: {}", consumerId) + coroutineScope.cancel() + } + + companion object { + private val logger = LoggerFactory.getLogger(Consumer::class.java) + } +} \ No newline at end of file diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt new file mode 100644 index 00000000..52bca554 --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt @@ -0,0 +1,21 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.dev.usbharu.owl.consumer + +data class ConsumerConfig( + val concurrent:Int, +)