feat: OWLでタスクを実行できるように

This commit is contained in:
usbharu 2024-05-11 22:50:03 +09:00
parent 5cdf78483d
commit 9c7fda27ea
10 changed files with 90 additions and 25 deletions

View File

@ -17,6 +17,9 @@
package dev.usbharu.hideout.core.external.job
import dev.usbharu.hideout.activitypub.domain.model.Follow
import dev.usbharu.owl.common.property.ObjectPropertyValue
import dev.usbharu.owl.common.property.PropertyValue
import dev.usbharu.owl.common.property.StringPropertyValue
import dev.usbharu.owl.common.task.Task
import dev.usbharu.owl.common.task.TaskDefinition
import org.springframework.stereotype.Component
@ -32,4 +35,19 @@ data object ReceiveFollowTaskDef : TaskDefinition<ReceiveFollowTask> {
override val type: Class<ReceiveFollowTask>
get() = ReceiveFollowTask::class.java
override fun serialize(task: ReceiveFollowTask): Map<String, PropertyValue<*>> {
return mapOf(
"actor" to StringPropertyValue(task.actor),
"follow" to ObjectPropertyValue(task.follow),
"targetActor" to StringPropertyValue(task.targetActor)
)
}
override fun deserialize(value: Map<String, PropertyValue<*>>): ReceiveFollowTask {
return ReceiveFollowTask(
value.getValue("actor").value as String,
value.getValue("follow").value as Follow,
value.getValue("targetActor").value as String,
)
}
}

View File

@ -9,8 +9,6 @@
<Root level="INFO">
<AppenderRef ref="Console"/>
</Root>
<Logger name="dev.usbharu.owl.broker.service.QueuedTaskAssignerImpl" level="TRACE">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="dev.usbharu.owl.broker.service.QueuedTaskAssignerImpl" level="TRACE"/>
</Loggers>
</Configuration>

View File

@ -16,19 +16,37 @@
package dev.usbharu.hideout
import com.fasterxml.jackson.databind.ObjectMapper
import dev.usbharu.owl.common.property.*
import dev.usbharu.owl.consumer.StandaloneConsumer
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.boot.ApplicationArguments
import org.springframework.boot.ApplicationRunner
import org.springframework.stereotype.Component
@Component
class WorkerRunner(private val springTaskRunnerLoader: SpringTaskRunnerLoader) : ApplicationRunner {
class WorkerRunner(
private val springTaskRunnerLoader: SpringTaskRunnerLoader,
@Qualifier("activitypub") private val objectMapper: ObjectMapper,
) : ApplicationRunner {
override fun run(args: ApplicationArguments?) {
GlobalScope.launch(Dispatchers.Default) {
val consumer = StandaloneConsumer(taskRunnerLoader = springTaskRunnerLoader)
val consumer = StandaloneConsumer(
taskRunnerLoader = springTaskRunnerLoader, propertySerializerFactory = CustomPropertySerializerFactory(
setOf(
IntegerPropertySerializer(),
StringPropertyValueSerializer(),
DoublePropertySerializer(),
BooleanPropertySerializer(),
LongPropertySerializer(),
FloatPropertySerializer(),
ObjectPropertySerializer(objectMapper),
)
)
)
consumer.init()
consumer.start()
}

View File

@ -34,7 +34,6 @@ class QueuedTaskAssignerImpl(
private val queueStore: QueueStore
) : QueuedTaskAssigner {
override fun ready(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask> {
logger.trace("Ready {}/{}", numberOfConcurrent, consumerId)
return flow {
taskManagementService.findAssignableTask(consumerId, numberOfConcurrent)
.onEach {
@ -65,10 +64,10 @@ class QueuedTaskAssignerImpl(
logger.debug(
"Assign Task. name: {} id: {} attempt: {} consumer: {}",
queuedTask.task.name,
queuedTask.task.id,
queuedTask.attempt,
queuedTask.assignedConsumer
assignedTaskQueue.task.name,
assignedTaskQueue.task.id,
assignedTaskQueue.attempt,
assignedTaskQueue.assignedConsumer
)
assignedTaskQueue
} catch (e: QueueCannotDequeueException) {

View File

@ -130,8 +130,8 @@ class TaskManagementServiceImpl(
}
override suspend fun queueProcessed(taskResult: TaskResult) {
val task = taskRepository.findById(taskResult.id)
?: throw RecordNotFoundException("Task not found. id: ${taskResult.id}")
val task = taskRepository.findById(taskResult.taskId)
?: throw RecordNotFoundException("Task not found. id: ${taskResult.taskId}")
val taskDefinition = taskDefinitionRepository.findByName(task.name)
?: throw TaskNotRegisterException("Task ${task.name} not definition.")
@ -147,7 +147,7 @@ class TaskManagementServiceImpl(
taskResultRepository.save(taskResult)
taskRepository.findByIdAndUpdate(
taskResult.id,
taskResult.taskId,
task.copy(completedAt = completedAt, attempt = taskResult.attempt)
)

View File

@ -25,7 +25,6 @@ class ObjectPropertyValue(override val value: Any) : PropertyValue<Any>() {
class ObjectPropertySerializer(private val objectMapper: ObjectMapper) : PropertySerializer<Any> {
override fun isSupported(propertyValue: PropertyValue<*>): Boolean {
println(propertyValue::class.java)
return propertyValue is ObjectPropertyValue
}
@ -40,10 +39,9 @@ class ObjectPropertySerializer(private val objectMapper: ObjectMapper) : Propert
}
override fun deserialize(string: String): PropertyValue<Any> {
//todo jacksonに読み込ませるStringがjackson:classname:jsonになっているのでjsonだけを読み込ませる
return ObjectPropertyValue(
objectMapper.readValue(
string,
string.substringAfter("jackson:").substringAfter(":"),
Class.forName(string.substringAfter("jackson:").substringBefore(":"))
)
)

View File

@ -0,0 +1,26 @@
/*
* Copyright (C) 2024 usbharu
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.usbharu.owl.common
import java.lang.reflect.Field
val Class<*>.allFields: List<Field>
get() = if (superclass != null) {
superclass.allFields + declaredFields
} else {
declaredFields.toList()
}.map { it.trySetAccessible();it }

View File

@ -49,7 +49,14 @@ object PropertySerializeUtils {
*/
fun deserialize(
serializerFactory: PropertySerializerFactory,
properties: Map<String, String>
): Map<String, PropertyValue<*>> =
properties.map { it.key to serializerFactory.factory(it.value).deserialize(it.value) }.toMap()
properties: Map<String, String>,
): Map<String, PropertyValue<*>> {
return properties.map {
try {
it.key to serializerFactory.factory(it.value).deserialize(it.value)
} catch (e: Exception) {
throw PropertySerializeException("Failed to deserialize property in ${serializerFactory.javaClass}", e)
}
}.toMap()
}
}

View File

@ -16,6 +16,7 @@
package dev.usbharu.owl.common.task
import dev.usbharu.owl.common.allFields
import dev.usbharu.owl.common.property.*
/**
@ -61,7 +62,7 @@ interface TaskDefinition<T : Task> {
*/
val propertyDefinition: PropertyDefinition
get() {
val mapValues = type.fields.associate { it.name to it.type }.mapValues {
val mapValues = type.allFields.associate { it.name to it.type }.mapValues {
when {
it.value === Int::class.java -> PropertyType.number
it.value === String::class.java -> PropertyType.string
@ -86,7 +87,7 @@ interface TaskDefinition<T : Task> {
* @return シリアライズされたタスク
*/
fun serialize(task: T): Map<String, PropertyValue<*>> {
return type.fields.associateBy { it.name }.mapValues {
return type.allFields.associateBy { it.name }.mapValues {
when {
it.value.type === Int::class.java -> IntegerPropertyValue(it.value.getInt(task))
it.value.type === String::class.java -> StringPropertyValue(it.value.get(task) as String)
@ -113,7 +114,7 @@ interface TaskDefinition<T : Task> {
throw IllegalArgumentException("Unable to deserialize value $value for type ${type.name}", e)
}
type.fields.associateBy { it.name }.mapValues {
type.allFields.associateBy { it.name }.mapValues {
when {
it.value.type === Int::class.java -> it.value.setInt(task, value.getValue(it.key).value as Int)
it.value.type === Double::class.java -> it.value.setDouble(task, value.getValue(it.key).value as Double)

View File

@ -89,11 +89,10 @@ class Consumer(
.ready(flow {
requestTask()
}).onEach {
logger.info("Start Task name: {}", it.name)
logger.info("Start Task name: {} id: {}", it.name, it.id)
processing.update { it + 1 }
try {
val taskResult = runnerMap.getValue(it.name).run(
TaskRequest(
it.name,
@ -137,7 +136,7 @@ class Consumer(
})
throw e
} catch (e: Exception) {
logger.warn("Failed execute task.", e)
logger.warn("Failed execute task. name: {} id: {}", it.name, it.id, e)
emit(taskResult {
this.success = false
this.attempt = it.attempt
@ -145,6 +144,7 @@ class Consumer(
this.message = e.localizedMessage
})
} finally {
logger.debug(" Task name: {} id: {}", it.name, it.id)
processing.update { it - 1 }
concurrent.update {
if (it < 64) {