consumer #1

Merged
usbharu merged 5 commits from consumer into master 2024-04-03 04:47:45 +00:00
4 changed files with 114 additions and 1 deletions
Showing only changes of commit 31bf8c9e5e - Show all commits

View File

@ -1,6 +1,10 @@
package dev.usbharu
import dev.usbharu.dev.usbharu.owl.consumer.TaskRequest
import dev.usbharu.dev.usbharu.owl.consumer.TaskRunner
import dev.usbharu.owl.*
import dev.usbharu.owl.common.property.CustomPropertySerializerFactory
import dev.usbharu.owl.common.property.PropertySerializeUtils
import io.grpc.ManagedChannelBuilder
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
@ -8,6 +12,8 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import java.time.Instant
import java.util.*
import kotlin.math.max
suspend fun main() {
@ -29,6 +35,11 @@ suspend fun main() {
this.tasks.addAll(listOf())
})
val map = mapOf<String, TaskRunner>()
val propertySerializerFactory = CustomPropertySerializerFactory(setOf())
val concurrent = MutableStateFlow(64)
val processing = MutableStateFlow(0)
@ -66,13 +77,40 @@ suspend fun main() {
processing.update { it + 1 }
try {
emit(taskResult {
val taskResult = map[it.name]?.run(
TaskRequest(
it.name,
UUID(it.id.mostSignificantUuidBits, it.id.leastSignificantUuidBits),
it.attempt,
Instant.ofEpochSecond(it.queuedAt.seconds, it.queuedAt.nanos.toLong()),
PropertySerializeUtils.deserialize(propertySerializerFactory, it.propertiesMap)
)
)
if (taskResult == null) {
throw Exception()
}
emit(taskResult {
this.success = taskResult.success
this.attempt = it.attempt
this.id = it.id
this.result.putAll(
PropertySerializeUtils.serialize(
propertySerializerFactory,
taskResult.result
)
)
this.message = taskResult.message
})
} catch (e: Exception) {
emit(taskResult {
this.success = false
this.attempt = it.attempt
this.id = it.id
this.message = e.localizedMessage
})
} finally {
processing.update { it - 1 }

View File

@ -0,0 +1,29 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.dev.usbharu.owl.consumer
import dev.usbharu.owl.common.property.PropertyValue
import java.time.Instant
import java.util.*
data class TaskRequest(
val name:String,
val id:UUID,
val attempt:Int,
val queuedAt: Instant,
val properties:Map<String,PropertyValue<*>>
)

View File

@ -0,0 +1,25 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.dev.usbharu.owl.consumer
import dev.usbharu.owl.common.property.PropertyValue
data class TaskResult(
val success: Boolean,
val result: Map<String, PropertyValue<*>>,
val message: String
)

View File

@ -0,0 +1,21 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.dev.usbharu.owl.consumer
fun interface TaskRunner {
suspend fun run(taskRequest: TaskRequest):TaskResult
}