feat: タスク実行部分を作成

This commit is contained in:
usbharu 2024-04-03 12:50:49 +09:00
parent 74dbd04f0b
commit 3e23662b95
4 changed files with 114 additions and 1 deletions

View File

@ -1,6 +1,10 @@
package dev.usbharu package dev.usbharu
import dev.usbharu.dev.usbharu.owl.consumer.TaskRequest
import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner
import dev.usbharu.owl.* import dev.usbharu.owl.*
import dev.usbharu.owl.common.property.CustomPropertySerializerFactory
import dev.usbharu.owl.common.property.PropertySerializeUtils
import io.grpc.ManagedChannelBuilder import io.grpc.ManagedChannelBuilder
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.coroutineScope
@ -8,6 +12,8 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
import kotlinx.coroutines.isActive import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import java.time.Instant
import java.util.*
import kotlin.math.max import kotlin.math.max
suspend fun main() { suspend fun main() {
@ -29,6 +35,11 @@ suspend fun main() {
this.tasks.addAll(listOf()) this.tasks.addAll(listOf())
}) })
val map = mapOf<String, TaskRunner>()
val propertySerializerFactory = CustomPropertySerializerFactory(setOf())
val concurrent = MutableStateFlow(64) val concurrent = MutableStateFlow(64)
val processing = MutableStateFlow(0) val processing = MutableStateFlow(0)
@ -66,13 +77,40 @@ suspend fun main() {
processing.update { it + 1 } processing.update { it + 1 }
try { try {
emit(taskResult {
val taskResult = map[it.name]?.run(
TaskRequest(
it.name,
UUID(it.id.mostSignificantUuidBits, it.id.leastSignificantUuidBits),
it.attempt,
Instant.ofEpochSecond(it.queuedAt.seconds, it.queuedAt.nanos.toLong()),
PropertySerializeUtils.deserialize(propertySerializerFactory, it.propertiesMap)
)
)
if (taskResult == null) {
throw Exception()
}
emit(taskResult {
this.success = taskResult.success
this.attempt = it.attempt
this.id = it.id
this.result.putAll(
PropertySerializeUtils.serialize(
propertySerializerFactory,
taskResult.result
)
)
this.message = taskResult.message
}) })
} catch (e: Exception) { } catch (e: Exception) {
emit(taskResult { emit(taskResult {
this.success = false this.success = false
this.attempt = it.attempt
this.id = it.id
this.message = e.localizedMessage
}) })
} finally { } finally {
processing.update { it - 1 } processing.update { it - 1 }

View File

@ -0,0 +1,29 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.dev.usbharu.owl.consumer
import dev.usbharu.owl.common.property.PropertyValue
import java.time.Instant
import java.util.*
data class TaskRequest(
val name:String,
val id:UUID,
val attempt:Int,
val queuedAt: Instant,
val properties:Map<String,PropertyValue<*>>
)

View File

@ -0,0 +1,25 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.dev.usbharu.owl.consumer
import dev.usbharu.owl.common.property.PropertyValue
data class TaskResult(
val success: Boolean,
val result: Map<String, PropertyValue<*>>,
val message: String
)

View File

@ -0,0 +1,21 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.dev.usbharu.owl.consumer
fun interface TaskRunner {
suspend fun run(taskRequest: TaskRequest):TaskResult
}