diff --git a/consumer/src/main/kotlin/Main.kt b/consumer/src/main/kotlin/Main.kt index e805e8d1..3ff2d4a9 100644 --- a/consumer/src/main/kotlin/Main.kt +++ b/consumer/src/main/kotlin/Main.kt @@ -1,6 +1,10 @@ package dev.usbharu +import dev.usbharu.dev.usbharu.owl.consumer.TaskRequest +import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner import dev.usbharu.owl.* +import dev.usbharu.owl.common.property.CustomPropertySerializerFactory +import dev.usbharu.owl.common.property.PropertySerializeUtils import io.grpc.ManagedChannelBuilder import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.coroutineScope @@ -8,6 +12,8 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.flow.* import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import java.time.Instant +import java.util.* import kotlin.math.max suspend fun main() { @@ -29,6 +35,11 @@ suspend fun main() { this.tasks.addAll(listOf()) }) + + val map = mapOf() + + val propertySerializerFactory = CustomPropertySerializerFactory(setOf()) + val concurrent = MutableStateFlow(64) val processing = MutableStateFlow(0) @@ -66,13 +77,40 @@ suspend fun main() { processing.update { it + 1 } try { - emit(taskResult { + val taskResult = map[it.name]?.run( + TaskRequest( + it.name, + UUID(it.id.mostSignificantUuidBits, it.id.leastSignificantUuidBits), + it.attempt, + Instant.ofEpochSecond(it.queuedAt.seconds, it.queuedAt.nanos.toLong()), + PropertySerializeUtils.deserialize(propertySerializerFactory, it.propertiesMap) + ) + ) + + if (taskResult == null) { + throw Exception() + } + + emit(taskResult { + this.success = taskResult.success + this.attempt = it.attempt + this.id = it.id + this.result.putAll( + PropertySerializeUtils.serialize( + propertySerializerFactory, + taskResult.result + ) + ) + this.message = taskResult.message }) } catch (e: Exception) { emit(taskResult { this.success = false + this.attempt = it.attempt + this.id = it.id + this.message = e.localizedMessage }) } finally { processing.update { it - 1 } diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt new file mode 100644 index 00000000..d66e9a73 --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRequest.kt @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.dev.usbharu.owl.consumer + +import dev.usbharu.owl.common.property.PropertyValue +import java.time.Instant +import java.util.* + +data class TaskRequest( + val name:String, + val id:UUID, + val attempt:Int, + val queuedAt: Instant, + val properties:Map> +) diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt new file mode 100644 index 00000000..20858a9b --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskResult.kt @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.dev.usbharu.owl.consumer + +import dev.usbharu.owl.common.property.PropertyValue + +data class TaskResult( + val success: Boolean, + val result: Map>, + val message: String +) diff --git a/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt new file mode 100644 index 00000000..44513ec1 --- /dev/null +++ b/consumer/src/main/kotlin/dev/usbharu/owl/consumer/TaskRunner.kt @@ -0,0 +1,21 @@ +/* + * Copyright (C) 2024 usbharu + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.usbharu.dev.usbharu.owl.consumer + +fun interface TaskRunner { + suspend fun run(taskRequest: TaskRequest):TaskResult +} \ No newline at end of file