diff --git a/consumer/build.gradle.kts b/consumer/build.gradle.kts new file mode 100644 index 00000000..4137b56a --- /dev/null +++ b/consumer/build.gradle.kts @@ -0,0 +1,54 @@ +plugins { + kotlin("jvm") + id("com.google.protobuf") version "0.9.4" +} + +group = "dev.usbharu" +version = "0.0.1" + +repositories { + mavenCentral() +} + +dependencies { + testImplementation("org.jetbrains.kotlin:kotlin-test") + implementation("io.grpc:grpc-kotlin-stub:1.4.1") + implementation("io.grpc:grpc-protobuf:1.61.1") + implementation("com.google.protobuf:protobuf-kotlin:3.25.3") + implementation("io.grpc:grpc-netty:1.61.1") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0") + implementation(project(":common")) + protobuf(files(project(":broker").dependencyProject.projectDir.toString() + "/src/main/proto")) +} + +tasks.test { + useJUnitPlatform() +} +kotlin { + jvmToolchain(17) +} + +protobuf { + protoc { + artifact = "com.google.protobuf:protoc:3.25.3" + } + plugins { + create("grpc") { + artifact = "io.grpc:protoc-gen-grpc-java:1.61.1" + } + create("grpckt") { + artifact = "io.grpc:protoc-gen-grpc-kotlin:1.4.1:jdk8@jar" + } + } + generateProtoTasks { + all().forEach { + it.plugins { + create("grpc") + create("grpckt") + } + it.builtins { + create("kotlin") + } + } + } +} \ No newline at end of file diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt new file mode 100644 index 00000000..1ee51d76 --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/Consumer.kt @@ -0,0 +1,152 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.owl.consumer + +import dev.usbharu.dev.usbharu.owl.consumer.ConsumerConfig +import dev.usbharu.dev.usbharu.owl.consumer.TaskRequest +import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner +import dev.usbharu.owl.* +import dev.usbharu.owl.Uuid.UUID +import dev.usbharu.owl.common.property.PropertySerializeUtils +import dev.usbharu.owl.common.property.PropertySerializerFactory +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.slf4j.LoggerFactory +import java.time.Instant +import kotlin.math.max + +class Consumer( + private val subscribeTaskServiceCoroutineStub: SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineStub, + private val assignmentTaskServiceCoroutineStub: AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineStub, + private val taskResultServiceCoroutineStub: TaskResultServiceGrpcKt.TaskResultServiceCoroutineStub, + private val runnerMap: Map, + private val propertySerializerFactory: PropertySerializerFactory, + consumerConfig: ConsumerConfig +) { + + private lateinit var consumerId: UUID + + private lateinit var coroutineScope: CoroutineScope + + private val concurrent = MutableStateFlow(consumerConfig.concurrent) + private val processing = MutableStateFlow(0) + suspend fun init(name: String, hostname: String) { + logger.info("Initialize Consumer name: {} hostname: {}", name, hostname) + logger.debug("Registered Tasks: {}", runnerMap.keys) + consumerId = subscribeTaskServiceCoroutineStub.subscribeTask(subscribeTaskRequest { + this.name = name + this.hostname = hostname + this.tasks.addAll(runnerMap.keys) + }).id + logger.info("Success initialize consumer. ConsumerID: {}", consumerId) + } + + suspend fun start() { + coroutineScope = CoroutineScope(Dispatchers.Default) + coroutineScope { + taskResultServiceCoroutineStub + .tasKResult(flow { + assignmentTaskServiceCoroutineStub + .ready(flow { + while (coroutineScope.isActive) { + val andSet = concurrent.getAndUpdate { 0 } + + + if (andSet != 0) { + logger.debug("Request {} tasks.", andSet) + emit(readyRequest { + this.consumerId = consumerId + this.numberOfConcurrent = andSet + }) + continue + } + delay(100) + + concurrent.update { + ((64 - it) - processing.value).coerceIn(0, 64 - max(0, processing.value)) + } + } + }).onEach { + logger.info("Start Task name: {}", it.name) + processing.update { it + 1 } + + try { + + val taskResult = runnerMap.getValue(it.name).run( + TaskRequest( + it.name, + java.util.UUID(it.id.mostSignificantUuidBits, it.id.leastSignificantUuidBits), + it.attempt, + Instant.ofEpochSecond(it.queuedAt.seconds, it.queuedAt.nanos.toLong()), + PropertySerializeUtils.deserialize(propertySerializerFactory, it.propertiesMap) + ) + ) + + emit(taskResult { + this.success = taskResult.success + this.attempt = it.attempt + this.id = it.id + this.result.putAll( + PropertySerializeUtils.serialize( + propertySerializerFactory, taskResult.result + ) + ) + this.message = taskResult.message + }) + logger.info("Success execute task. name: {} success: {}", it.name, taskResult.success) + logger.debug("TRACE RESULT {}", taskResult) + } catch (e: CancellationException) { + logger.warn("Cancelled execute task.", e) + emit(taskResult { + this.success = false + this.attempt = it.attempt + this.id = it.id + this.message = e.localizedMessage + }) + throw e + } catch (e: Exception) { + logger.warn("Failed execute task.", e) + emit(taskResult { + this.success = false + this.attempt = it.attempt + this.id = it.id + this.message = e.localizedMessage + }) + } finally { + processing.update { it - 1 } + concurrent.update { + if (it < 64) { + it + 1 + } else { + 64 + } + } + } + }.flowOn(Dispatchers.Default).collect() + }) + } + } + + fun stop() { + logger.info("Stop Consumer. consumerID: {}", consumerId) + coroutineScope.cancel() + } + + companion object { + private val logger = LoggerFactory.getLogger(Consumer::class.java) + } +} \ No newline at end of file diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt new file mode 100644 index 00000000..52bca554 --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/ConsumerConfig.kt @@ -0,0 +1,21 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.dev.usbharu.owl.consumer + +data class ConsumerConfig( + val concurrent:Int, +) diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt new file mode 100644 index 00000000..d66e9a73 --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.dev.usbharu.owl.consumer + +import dev.usbharu.owl.common.property.PropertyValue +import java.time.Instant +import java.util.* + +data class TaskRequest( + val name:String, + val id:UUID, + val attempt:Int, + val queuedAt: Instant, + val properties:Map> +) diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt new file mode 100644 index 00000000..20858a9b --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.dev.usbharu.owl.consumer + +import dev.usbharu.owl.common.property.PropertyValue + +data class TaskResult( + val success: Boolean, + val result: Map>, + val message: String +) diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt new file mode 100644 index 00000000..44513ec1 --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt @@ -0,0 +1,21 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.dev.usbharu.owl.consumer + +fun interface TaskRunner { + suspend fun run(taskRequest: TaskRequest):TaskResult +} \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index f447b5a2..87d7071c 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -10,3 +10,4 @@ include("broker:broker-mongodb") findProject(":broker:broker-mongodb")?.name = "broker-mongodb" include("producer:default") findProject(":producer:default")?.name = "default" +include("consumer")