Compare commits

...

10 Commits

113 changed files with 652 additions and 507 deletions

View File

@ -27,7 +27,7 @@ jobs:
core: ${{ steps.filter.outputs.core }}
mastodon: ${{ steps.filter.outputs.mastodon }}
activitypub: ${{ steps.filter.outputs.ap }}
owl: $${{ steps.filter.outputs.owl }}
owl: ${{ steps.filter.outputs.owl }}
steps:
- name: Checkout
uses: actions/checkout@v4
@ -166,7 +166,8 @@ jobs:
gradle-home-cache-cleanup: true
- name: Build
run: ./owl/gradlew :owl:classes --no-daemon
working-directory: owl
run: ./gradlew :classes --no-daemon
hideout-core-unit-test:
needs:
@ -311,7 +312,8 @@ jobs:
gradle-home-cache-cleanup: true
- name: Unit Test
run: ./owl/gradlew :owl:koverXmlReport
working-directory: owl
run: ./gradlew :koverXmlReport --rerun-tasks
- name: JUnit Test Report
uses: mikepenz/action-junit-report@v4
@ -389,7 +391,8 @@ jobs:
- name: owl Lint
if: always()
run: ./owl/gradlew :owl:detektMain
working-directory: owl
run: ./gradlew :detektMain
- name: Auto Commit
if: ${{ always() }}

View File

@ -52,6 +52,7 @@ repositories {
dependencies {
implementation("dev.usbharu:hideout-core:0.0.1")
implementation("dev.usbharu:hideout-mastodon:1.0-SNAPSHOT")
implementation("dev.usbharu:hideout-activitypub:1.0-SNAPSHOT")
}
tasks.register("run") {

View File

@ -1,5 +1,9 @@
import kotlinx.kover.gradle.plugin.dsl.CoverageUnit
plugins {
kotlin("jvm") version "1.9.25"
alias(libs.plugins.kotlin.jvm)
alias(libs.plugins.detekt)
alias(libs.plugins.kover)
}
group = "dev.usbharu"
@ -11,6 +15,7 @@ repositories {
dependencies {
testImplementation(kotlin("test"))
detektPlugins(libs.detekt.formatting)
}
tasks.test {
@ -18,4 +23,93 @@ tasks.test {
}
kotlin {
jvmToolchain(21)
}
}
configurations {
matching { it.name == "detekt" }.all {
resolutionStrategy.eachDependency {
if (requested.group == "org.jetbrains.kotlin") {
useVersion(io.gitlab.arturbosch.detekt.getSupportedKotlinVersion())
}
}
}
all {
exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
}
}
tasks {
withType<io.gitlab.arturbosch.detekt.Detekt> {
exclude("**/generated/**")
setSource("src/main/kotlin")
exclude("build/")
configureEach {
exclude("**/org/koin/ksp/generated/**", "**/generated/**")
}
}
withType<io.gitlab.arturbosch.detekt.DetektCreateBaselineTask>() {
configureEach {
exclude("**/org/koin/ksp/generated/**", "**/generated/**")
}
}
withType<Test> {
useJUnitPlatform()
}
}
project.gradle.taskGraph.whenReady {
if (this.hasTask(":koverGenerateArtifact")) {
val task = this.allTasks.find { it.name == "test" }
val verificationTask = task as VerificationTask
verificationTask.ignoreFailures = true
}
}
detekt {
parallel = true
config.setFrom(files("../detekt.yml"))
buildUponDefaultConfig = true
basePath = "${rootDir.absolutePath}/src/main/kotlin"
autoCorrect = true
}
kover {
currentProject {
sources {
}
}
reports {
verify {
rule {
bound {
minValue = 50
coverageUnits = CoverageUnit.INSTRUCTION
}
}
}
total {
xml {
title = "Hideout ActivityPub"
xmlFile = file("$buildDir/reports/kover/hideout-activitypub.xml")
}
}
filters {
excludes {
annotatedBy("org.springframework.context.annotation.Configuration")
annotatedBy("org.springframework.boot.context.properties.ConfigurationProperties")
packages(
"dev.usbharu.hideout.controller.mastodon.generated",
"dev.usbharu.hideout.domain.mastodon.model.generated"
)
packages("org.springframework")
packages("org.jetbrains")
}
}
}
}

0
hideout-activitypub/gradlew vendored Normal file → Executable file
View File

View File

@ -3,3 +3,14 @@ plugins {
}
rootProject.name = "hideout-activitypub"
dependencyResolutionManagement {
repositories {
mavenCentral()
}
versionCatalogs {
create("libs") {
from(files("../libs.versions.toml"))
}
}
}

View File

@ -2,4 +2,4 @@ package dev.usbharu
fun main() {
println("Hello World!")
}
}

View File

@ -1,6 +1,10 @@
import kotlinx.kover.gradle.plugin.dsl.CoverageUnit
plugins {
alias(libs.plugins.kotlin.jvm)
id("maven-publish")
alias(libs.plugins.kover)
alias(libs.plugins.detekt)
}
@ -27,6 +31,8 @@ subprojects {
apply {
plugin("org.jetbrains.kotlin.jvm")
plugin("maven-publish")
plugin(rootProject.libs.plugins.kover.get().pluginId)
plugin(rootProject.libs.plugins.detekt.get().pluginId)
}
kotlin {
jvmToolchain(21)
@ -35,12 +41,42 @@ subprojects {
dependencies {
implementation("org.slf4j:slf4j-api:2.0.15")
testImplementation("org.junit.jupiter:junit-jupiter:5.10.3")
detektPlugins(rootProject.libs.detekt.formatting)
}
tasks.test {
useJUnitPlatform()
detekt {
parallel = true
config.setFrom(files("$rootDir/../detekt.yml"))
buildUponDefaultConfig = true
basePath = "${projectDir}/src/main/kotlin"
autoCorrect = true
}
project.gradle.taskGraph.whenReady {
if (this.hasTask(":koverGenerateArtifact")) {
val task = this.allTasks.find { it.name == "test" }
val verificationTask = task as VerificationTask
verificationTask.ignoreFailures = true
}
}
tasks {
withType<io.gitlab.arturbosch.detekt.Detekt> {
exclude("**/generated/**")
setSource("src/main/kotlin")
exclude("build/")
configureEach {
exclude("**/org/koin/ksp/generated/**", "**/generated/**")
}
}
withType<io.gitlab.arturbosch.detekt.DetektCreateBaselineTask>() {
configureEach {
exclude("**/org/koin/ksp/generated/**", "**/generated/**")
}
}
withType<Test> {
useJUnitPlatform()
}
}
publishing {
@ -69,4 +105,63 @@ subprojects {
}
}
}
}
}
dependencies {
kover(project(":owl-broker"))
kover(project(":owl-broker:owl-broker-mongodb"))
kover(project(":owl-common"))
kover(project(":owl-common:owl-common-serialize-jackson"))
kover(project(":owl-consumer"))
kover(project(":owl-producer"))
kover(project(":owl-producer:owl-producer-api"))
kover(project(":owl-producer:owl-producer-default"))
kover(project(":owl-producer:owl-producer-embedded"))
}
detekt {
parallel = true
config.setFrom(files("../detekt.yml"))
buildUponDefaultConfig = true
basePath = "${projectDir}/src/main/kotlin"
autoCorrect = true
}
project.gradle.taskGraph.whenReady {
if (this.hasTask(":koverGenerateArtifact")) {
val task = this.allTasks.find { it.name == "test" }
val verificationTask = task as VerificationTask
verificationTask.ignoreFailures = true
}
}
kover {
currentProject {
sources {
excludedSourceSets.addAll("grpc", "grpckt")
}
}
reports {
verify {
rule {
bound {
minValue = 50
coverageUnits = CoverageUnit.INSTRUCTION
}
}
}
total {
xml {
title = "Owl"
xmlFile = file("$buildDir/reports/kover/owl.xml")
}
filters {
excludes {
packages("dev.usbharu.owl.generated")
}
}
}
}
}

View File

@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.10.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.10-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME

View File

@ -35,7 +35,7 @@ protobuf {
artifact = libs.protoc.gen.grpc.java.get().toString()
}
create("grpckt") {
artifact = libs.protoc.gen.grpc.kotlin.get().toString() + "jdk8@jar"
artifact = libs.protoc.gen.grpc.kotlin.get().toString() + ":jdk8@jar"
}
}
generateProtoTasks {

View File

@ -32,7 +32,6 @@ import org.koin.dsl.module
class MongoModuleContext : ModuleContext {
override fun module(): Module {
return module {
single {
val clientSettings =
@ -47,7 +46,6 @@ class MongoModuleContext : ModuleContext {
)
.uuidRepresentation(UuidRepresentation.STANDARD).build()
MongoClient.create(clientSettings)
.getDatabase(System.getProperty("owl.broker.mongo.database", "mongo-test"))
}
@ -59,4 +57,4 @@ class MongoModuleContext : ModuleContext {
single<TaskResultRepository> { MongodbTaskResultRepository(get(), get()) }
}
}
}
}

View File

@ -33,7 +33,11 @@ class MongodbConsumerRepository(database: MongoDatabase) : ConsumerRepository {
private val collection = database.getCollection<ConsumerMongodb>("consumers")
override suspend fun save(consumer: Consumer): Consumer = withContext(Dispatchers.IO) {
collection.replaceOne(Filters.eq("_id", consumer.id.toString()), ConsumerMongodb.of(consumer), ReplaceOptions().upsert(true))
collection.replaceOne(
Filters.eq("_id", consumer.id.toString()),
ConsumerMongodb.of(consumer),
ReplaceOptions().upsert(true)
)
return@withContext consumer
}
@ -49,15 +53,19 @@ data class ConsumerMongodb(
val name: String,
val hostname: String,
val tasks: List<String>
){
) {
fun toConsumer():Consumer{
fun toConsumer(): Consumer {
return Consumer(
UUID.fromString(id), name, hostname, tasks
UUID.fromString(id),
name,
hostname,
tasks
)
}
companion object{
fun of(consumer: Consumer):ConsumerMongodb{
companion object {
fun of(consumer: Consumer): ConsumerMongodb {
return ConsumerMongodb(
consumer.id.toString(),
consumer.name,
@ -66,4 +74,4 @@ data class ConsumerMongodb(
)
}
}
}
}

View File

@ -48,7 +48,8 @@ class MongodbQueuedTaskRepository(
override suspend fun save(queuedTask: QueuedTask): QueuedTask {
withContext(Dispatchers.IO) {
collection.replaceOne(
eq("_id", queuedTask.task.id.toString()), QueuedTaskMongodb.of(propertySerializerFactory, queuedTask),
eq("_id", queuedTask.task.id.toString()),
QueuedTaskMongodb.of(propertySerializerFactory, queuedTask),
ReplaceOptions().upsert(true)
)
}
@ -57,7 +58,6 @@ class MongodbQueuedTaskRepository(
override suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id: UUID, update: QueuedTask): QueuedTask {
return withContext(Dispatchers.IO) {
val findOneAndUpdate = collection.findOneAndUpdate(
and(
eq("_id", id.toString()),
@ -108,7 +108,7 @@ data class QueuedTaskMongodb(
val task: TaskMongodb,
val attempt: Int,
val queuedAt: Instant,
val priority:Int,
val priority: Int,
val isActive: Boolean,
val timeoutAt: Instant?,
val assignedConsumer: String?,
@ -155,14 +155,14 @@ data class QueuedTaskMongodb(
companion object {
fun of(propertySerializerFactory: PropertySerializerFactory, task: Task): TaskMongodb {
return TaskMongodb(
task.name,
task.id.toString(),
task.publishProducerId.toString(),
task.publishedAt,
task.nextRetry,
task.completedAt,
task.attempt,
PropertySerializeUtils.serialize(propertySerializerFactory, task.properties)
name = task.name,
id = task.id.toString(),
publishProducerId = task.publishProducerId.toString(),
publishedAt = task.publishedAt,
nextRetry = task.nextRetry,
completedAt = task.completedAt,
attempt = task.attempt,
properties = PropertySerializeUtils.serialize(propertySerializerFactory, task.properties)
)
}
}
@ -171,16 +171,16 @@ data class QueuedTaskMongodb(
companion object {
fun of(propertySerializerFactory: PropertySerializerFactory, queuedTask: QueuedTask): QueuedTaskMongodb {
return QueuedTaskMongodb(
queuedTask.task.id.toString(),
TaskMongodb.of(propertySerializerFactory, queuedTask.task),
queuedTask.attempt,
queuedTask.queuedAt,
queuedTask.priority,
queuedTask.isActive,
queuedTask.timeoutAt,
queuedTask.assignedConsumer?.toString(),
queuedTask.assignedAt
id = queuedTask.task.id.toString(),
task = TaskMongodb.of(propertySerializerFactory, queuedTask.task),
attempt = queuedTask.attempt,
queuedAt = queuedTask.queuedAt,
priority = queuedTask.priority,
isActive = queuedTask.isActive,
timeoutAt = queuedTask.timeoutAt,
assignedConsumer = queuedTask.assignedConsumer?.toString(),
assignedAt = queuedTask.assignedAt
)
}
}
}
}

View File

@ -41,7 +41,7 @@ class MongodbTaskDefinitionRepository(database: MongoDatabase) : TaskDefinitionR
}
override suspend fun deleteByName(name: String): Unit = withContext(Dispatchers.IO) {
collection.deleteOne(Filters.eq("_id",name))
collection.deleteOne(Filters.eq("_id", name))
}
override suspend fun findByName(name: String): TaskDefinition? = withContext(Dispatchers.IO) {
@ -82,4 +82,4 @@ data class TaskDefinitionMongodb(
)
}
}
}
}

View File

@ -36,27 +36,29 @@ import org.bson.codecs.pojo.annotations.BsonRepresentation
import java.time.Instant
import java.util.*
class MongodbTaskRepository(database: MongoDatabase, private val propertySerializerFactory: PropertySerializerFactory) :
TaskRepository {
private val collection = database.getCollection<TaskMongodb>("tasks")
override suspend fun save(task: Task): Task = withContext(Dispatchers.IO) {
collection.replaceOne(
Filters.eq("_id", task.id.toString()), TaskMongodb.of(propertySerializerFactory, task),
Filters.eq("_id", task.id.toString()),
TaskMongodb.of(propertySerializerFactory, task),
ReplaceOptions().upsert(true)
)
return@withContext task
}
override suspend fun saveAll(tasks: List<Task>): Unit = withContext(Dispatchers.IO) {
collection.bulkWrite(tasks.map {
ReplaceOneModel(
Filters.eq(it.id.toString()),
TaskMongodb.of(propertySerializerFactory, it),
ReplaceOptions().upsert(true)
)
})
collection.bulkWrite(
tasks.map {
ReplaceOneModel(
Filters.eq(it.id.toString()),
TaskMongodb.of(propertySerializerFactory, it),
ReplaceOptions().upsert(true)
)
}
)
}
override fun findByNextRetryBeforeAndCompletedAtIsNull(timestamp: Instant): Flow<Task> {
@ -75,12 +77,13 @@ class MongodbTaskRepository(database: MongoDatabase, private val propertySeriali
override suspend fun findByIdAndUpdate(id: UUID, task: Task) {
collection.replaceOne(
Filters.eq("_id", task.id.toString()), TaskMongodb.of(propertySerializerFactory, task),
Filters.eq("_id", task.id.toString()),
TaskMongodb.of(propertySerializerFactory, task),
ReplaceOptions().upsert(false)
)
}
override suspend fun findByPublishProducerIdAndCompletedAtIsNotNull(publishProducerId: UUID): Flow<Task> {
override fun findByPublishProducerIdAndCompletedAtIsNotNull(publishProducerId: UUID): Flow<Task> {
return collection
.find(Filters.eq(TaskMongodb::publishProducerId.name, publishProducerId.toString()))
.map { it.toTask(propertySerializerFactory) }
@ -116,15 +119,15 @@ data class TaskMongodb(
companion object {
fun of(propertySerializerFactory: PropertySerializerFactory, task: Task): TaskMongodb {
return TaskMongodb(
task.name,
task.id.toString(),
task.publishProducerId.toString(),
task.publishedAt,
task.nextRetry,
task.completedAt,
task.attempt,
PropertySerializeUtils.serialize(propertySerializerFactory, task.properties)
name = task.name,
id = task.id.toString(),
publishProducerId = task.publishProducerId.toString(),
publishedAt = task.publishedAt,
nextRetry = task.nextRetry,
completedAt = task.completedAt,
attempt = task.attempt,
properties = PropertySerializeUtils.serialize(propertySerializerFactory, task.properties)
)
}
}
}
}

View File

@ -41,14 +41,17 @@ class MongodbTaskResultRepository(
private val collection = database.getCollection<TaskResultMongodb>("task_results")
override suspend fun save(taskResult: TaskResult): TaskResult = withContext(Dispatchers.IO) {
collection.replaceOne(
Filters.eq(taskResult.id.toString()), TaskResultMongodb.of(propertySerializerFactory, taskResult),
Filters.eq(taskResult.id.toString()),
TaskResultMongodb.of(propertySerializerFactory, taskResult),
ReplaceOptions().upsert(true)
)
return@withContext taskResult
}
override fun findByTaskId(id: UUID): Flow<TaskResult> {
return collection.find(Filters.eq(id.toString())).map { it.toTaskResult(propertySerializerFactory) }.flowOn(Dispatchers.IO)
return collection.find(
Filters.eq(id.toString())
).map { it.toTaskResult(propertySerializerFactory) }.flowOn(Dispatchers.IO)
}
}
@ -65,27 +68,25 @@ data class TaskResultMongodb(
fun toTaskResult(propertySerializerFactory: PropertySerializerFactory): TaskResult {
return TaskResult(
UUID.fromString(id),
UUID.fromString(taskId),
success,
attempt,
PropertySerializeUtils.deserialize(propertySerializerFactory, result),
message
id = UUID.fromString(id),
taskId = UUID.fromString(taskId),
success = success,
attempt = attempt,
result = PropertySerializeUtils.deserialize(propertySerializerFactory, result),
message = message
)
}
companion object {
fun of(propertySerializerFactory: PropertySerializerFactory, taskResult: TaskResult): TaskResultMongodb {
return TaskResultMongodb(
taskResult.id.toString(),
taskResult.taskId.toString(),
taskResult.success,
taskResult.attempt,
PropertySerializeUtils.serialize(propertySerializerFactory, taskResult.result),
taskResult.message
id = taskResult.id.toString(),
taskId = taskResult.taskId.toString(),
success = taskResult.success,
attempt = taskResult.attempt,
result = PropertySerializeUtils.serialize(propertySerializerFactory, taskResult.result),
message = taskResult.message
)
}
}
}
}

View File

@ -7,12 +7,14 @@ import dev.usbharu.owl.broker.domain.model.consumer.Consumer
import kotlinx.coroutines.runBlocking
import org.bson.UuidRepresentation
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import java.util.*
class MongodbConsumerRepositoryTest {
@Test
@Disabled
fun name() {
val clientSettings =

View File

@ -90,7 +90,6 @@ fun main() {
logger.info("Use module name: {}", moduleContext)
val koin = startKoin {
printLogger()
@ -98,7 +97,6 @@ fun main() {
single<RetryPolicyFactory> {
DefaultRetryPolicyFactory(mapOf("" to ExponentialRetryPolicy()))
}
}
modules(mainModule, module, moduleContext.module())
}
@ -108,4 +106,4 @@ fun main() {
runBlocking {
application.start(50051).join()
}
}
}

View File

@ -19,11 +19,9 @@ package dev.usbharu.owl.broker
import org.koin.core.module.Module
interface ModuleContext {
fun module():Module
fun module(): Module
}
data object EmptyModuleContext : ModuleContext {
override fun module(): Module {
return org.koin.dsl.module { }
}
}
override fun module(): Module = org.koin.dsl.module { }
}

View File

@ -38,7 +38,7 @@ class OwlBrokerApplication(
private lateinit var server: Server
fun start(port: Int,coroutineScope: CoroutineScope = GlobalScope):Job {
fun start(port: Int, coroutineScope: CoroutineScope = GlobalScope): Job {
server = ServerBuilder.forPort(port)
.addService(assignmentTaskService)
.addService(definitionTaskService)
@ -64,5 +64,4 @@ class OwlBrokerApplication(
fun stop() {
server.shutdown()
}
}
}

View File

@ -27,4 +27,4 @@ class InvalidRepositoryException : RuntimeException {
enableSuppression,
writableStackTrace
)
}
}

View File

@ -27,4 +27,4 @@ class FailedSaveException : RuntimeException {
enableSuppression,
writableStackTrace
)
}
}

View File

@ -27,4 +27,4 @@ open class RecordNotFoundException : RuntimeException {
enableSuppression,
writableStackTrace
)
}
}

View File

@ -27,4 +27,4 @@ class IncompatibleTaskException : RuntimeException {
enableSuppression,
writableStackTrace
)
}
}

View File

@ -27,4 +27,4 @@ class QueueCannotDequeueException : RuntimeException {
enableSuppression,
writableStackTrace
)
}
}

View File

@ -27,4 +27,4 @@ class TaskNotRegisterException : RuntimeException {
enableSuppression,
writableStackTrace
)
}
}

View File

@ -19,7 +19,7 @@ package dev.usbharu.owl.broker.domain.model.consumer
import java.util.*
interface ConsumerRepository {
suspend fun save(consumer: Consumer):Consumer
suspend fun save(consumer: Consumer): Consumer
suspend fun findById(id:UUID):Consumer?
}
suspend fun findById(id: UUID): Consumer?
}

View File

@ -20,9 +20,9 @@ import java.time.Instant
import java.util.*
data class Producer(
val id:UUID,
val name:String,
val hostname:String,
val registeredTask:List<String>,
val id: UUID,
val name: String,
val hostname: String,
val registeredTask: List<String>,
val createdAt: Instant
)

View File

@ -17,5 +17,5 @@
package dev.usbharu.owl.broker.domain.model.producer
interface ProducerRepository {
suspend fun save(producer: Producer):Producer
}
suspend fun save(producer: Producer): Producer
}

View File

@ -21,14 +21,14 @@ import java.time.Instant
import java.util.*
interface QueuedTaskRepository {
suspend fun save(queuedTask: QueuedTask):QueuedTask
suspend fun save(queuedTask: QueuedTask): QueuedTask
/**
* トランザクションの代わり
*/
suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id:UUID,update:QueuedTask):QueuedTask
suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id: UUID, update: QueuedTask): QueuedTask
fun findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(tasks: List<String>, limit: Int): Flow<QueuedTask>
fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow<QueuedTask>
}
}

View File

@ -24,11 +24,11 @@ import java.util.*
* @param attempt 失敗を含めて試行した回数
*/
data class Task(
val name:String,
val name: String,
val id: UUID,
val publishProducerId:UUID,
val publishProducerId: UUID,
val publishedAt: Instant,
val nextRetry:Instant,
val nextRetry: Instant,
val completedAt: Instant? = null,
val attempt: Int,
val properties: Map<String, PropertyValue<*>>

View File

@ -21,15 +21,15 @@ import java.time.Instant
import java.util.*
interface TaskRepository {
suspend fun save(task: Task):Task
suspend fun save(task: Task): Task
suspend fun saveAll(tasks:List<Task>)
suspend fun saveAll(tasks: List<Task>)
fun findByNextRetryBeforeAndCompletedAtIsNull(timestamp:Instant): Flow<Task>
fun findByNextRetryBeforeAndCompletedAtIsNull(timestamp: Instant): Flow<Task>
suspend fun findById(uuid: UUID): Task?
suspend fun findByIdAndUpdate(id:UUID,task: Task)
suspend fun findByIdAndUpdate(id: UUID, task: Task)
suspend fun findByPublishProducerIdAndCompletedAtIsNotNull(publishProducerId:UUID):Flow<Task>
}
fun findByPublishProducerIdAndCompletedAtIsNotNull(publishProducerId: UUID): Flow<Task>
}

View File

@ -22,5 +22,5 @@ data class TaskDefinition(
val maxRetry: Int,
val timeoutMilli: Long,
val propertyDefinitionHash: Long,
val retryPolicy:String
val retryPolicy: String
)

View File

@ -18,7 +18,7 @@ package dev.usbharu.owl.broker.domain.model.taskdefinition
interface TaskDefinitionRepository {
suspend fun save(taskDefinition: TaskDefinition): TaskDefinition
suspend fun deleteByName(name:String)
suspend fun deleteByName(name: String)
suspend fun findByName(name:String):TaskDefinition?
}
suspend fun findByName(name: String): TaskDefinition?
}

View File

@ -21,7 +21,7 @@ import java.util.*
data class TaskResult(
val id: UUID,
val taskId:UUID,
val taskId: UUID,
val success: Boolean,
val attempt: Int,
val result: Map<String, PropertyValue<*>>,

View File

@ -20,6 +20,6 @@ import kotlinx.coroutines.flow.Flow
import java.util.*
interface TaskResultRepository {
suspend fun save(taskResult: TaskResult):TaskResult
fun findByTaskId(id:UUID): Flow<TaskResult>
}
suspend fun save(taskResult: TaskResult): TaskResult
fun findByTaskId(id: UUID): Flow<TaskResult>
}

View File

@ -17,7 +17,7 @@
package dev.usbharu.owl.broker.external
import com.google.protobuf.Timestamp
import dev.usbharu.owl.Uuid
import dev.usbharu.owl.generated.Uuid
import java.time.Instant
import java.util.*
@ -32,4 +32,4 @@ fun UUID.toUUID(): Uuid.UUID = Uuid
fun Timestamp.toInstant(): Instant = Instant.ofEpochSecond(seconds, nanos.toLong())
fun Instant.toTimestamp():Timestamp = Timestamp.newBuilder().setSeconds(this.epochSecond).setNanos(this.nano).build()
fun Instant.toTimestamp(): Timestamp = Timestamp.newBuilder().setSeconds(this.epochSecond).setNanos(this.nano).build()

View File

@ -16,14 +16,13 @@
package dev.usbharu.owl.broker.interfaces.grpc
import dev.usbharu.owl.AssignmentTaskServiceGrpcKt
import dev.usbharu.owl.Task
import dev.usbharu.owl.broker.external.toTimestamp
import dev.usbharu.owl.broker.external.toUUID
import dev.usbharu.owl.broker.service.QueuedTaskAssigner
import dev.usbharu.owl.common.property.PropertySerializeUtils
import dev.usbharu.owl.common.property.PropertySerializerFactory
import dev.usbharu.owl.generated.AssignmentTaskServiceGrpcKt
import dev.usbharu.owl.generated.Task
import io.grpc.Status
import io.grpc.StatusException
import kotlinx.coroutines.flow.Flow
@ -33,7 +32,6 @@ import org.slf4j.LoggerFactory
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
class AssignmentTaskService(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
private val queuedTaskAssigner: QueuedTaskAssigner,
@ -42,7 +40,6 @@ class AssignmentTaskService(
AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineImplBase(coroutineContext) {
override fun ready(requests: Flow<Task.ReadyRequest>): Flow<Task.TaskRequest> {
return try {
requests
.flatMapMerge {
@ -72,4 +69,4 @@ class AssignmentTaskService(
companion object {
private val logger = LoggerFactory.getLogger(AssignmentTaskService::class.java)
}
}
}

View File

@ -17,25 +17,28 @@
package dev.usbharu.owl.broker.interfaces.grpc
import com.google.protobuf.Empty
import dev.usbharu.owl.DefinitionTask
import dev.usbharu.owl.DefinitionTask.TaskDefined
import dev.usbharu.owl.DefinitionTaskServiceGrpcKt.DefinitionTaskServiceCoroutineImplBase
import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinition
import dev.usbharu.owl.broker.service.RegisterTaskService
import dev.usbharu.owl.generated.DefinitionTask
import dev.usbharu.owl.generated.DefinitionTask.TaskDefined
import dev.usbharu.owl.generated.DefinitionTaskServiceGrpcKt.DefinitionTaskServiceCoroutineImplBase
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
class DefinitionTaskService(coroutineContext: CoroutineContext = EmptyCoroutineContext,private val registerTaskService: RegisterTaskService) :
class DefinitionTaskService(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
private val registerTaskService: RegisterTaskService
) :
DefinitionTaskServiceCoroutineImplBase(coroutineContext) {
override suspend fun register(request: DefinitionTask.TaskDefinition): TaskDefined {
registerTaskService.registerTask(
TaskDefinition(
request.name,
request.priority,
request.maxRetry,
request.timeoutMilli,
request.propertyDefinitionHash,
request.retryPolicy
name = request.name,
priority = request.priority,
maxRetry = request.maxRetry,
timeoutMilli = request.timeoutMilli,
propertyDefinitionHash = request.propertyDefinitionHash,
retryPolicy = request.retryPolicy
)
)
return TaskDefined
@ -50,4 +53,4 @@ class DefinitionTaskService(coroutineContext: CoroutineContext = EmptyCoroutineC
registerTaskService.unregisterTask(request.name)
return Empty.getDefaultInstance()
}
}
}

View File

@ -16,26 +16,28 @@
package dev.usbharu.owl.broker.interfaces.grpc
import dev.usbharu.owl.ProducerOuterClass
import dev.usbharu.owl.ProducerServiceGrpcKt.ProducerServiceCoroutineImplBase
import dev.usbharu.owl.broker.external.toUUID
import dev.usbharu.owl.broker.service.ProducerService
import dev.usbharu.owl.broker.service.RegisterProducerRequest
import dev.usbharu.owl.generated.ProducerOuterClass
import dev.usbharu.owl.generated.ProducerServiceGrpcKt.ProducerServiceCoroutineImplBase
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
class ProducerService(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
private val producerService: ProducerService
) :
ProducerServiceCoroutineImplBase(coroutineContext) {
override suspend fun registerProducer(request: ProducerOuterClass.Producer): ProducerOuterClass.RegisterProducerResponse {
override suspend fun registerProducer(
request: ProducerOuterClass.Producer
): ProducerOuterClass.RegisterProducerResponse {
val registerProducer = producerService.registerProducer(
RegisterProducerRequest(
request.name, request.hostname
request.name,
request.hostname
)
)
return ProducerOuterClass.RegisterProducerResponse.newBuilder().setId(registerProducer.toUUID()).build()
}
}
}

View File

@ -16,11 +16,11 @@
package dev.usbharu.owl.broker.interfaces.grpc
import dev.usbharu.owl.Consumer
import dev.usbharu.owl.SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineImplBase
import dev.usbharu.owl.broker.external.toUUID
import dev.usbharu.owl.broker.service.ConsumerService
import dev.usbharu.owl.broker.service.RegisterConsumerRequest
import dev.usbharu.owl.generated.Consumer
import dev.usbharu.owl.generated.SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineImplBase
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
@ -34,4 +34,4 @@ class SubscribeTaskService(
consumerService.registerConsumer(RegisterConsumerRequest(request.name, request.hostname, request.tasksList))
return Consumer.SubscribeTaskResponse.newBuilder().setId(id.toUUID()).build()
}
}
}

View File

@ -16,15 +16,15 @@
package dev.usbharu.owl.broker.interfaces.grpc
import dev.usbharu.owl.PublishTaskOuterClass
import dev.usbharu.owl.PublishTaskOuterClass.PublishedTask
import dev.usbharu.owl.PublishTaskOuterClass.PublishedTasks
import dev.usbharu.owl.TaskPublishServiceGrpcKt.TaskPublishServiceCoroutineImplBase
import dev.usbharu.owl.broker.external.toUUID
import dev.usbharu.owl.broker.service.PublishTask
import dev.usbharu.owl.broker.service.TaskPublishService
import dev.usbharu.owl.common.property.PropertySerializeUtils
import dev.usbharu.owl.common.property.PropertySerializerFactory
import dev.usbharu.owl.generated.PublishTaskOuterClass
import dev.usbharu.owl.generated.PublishTaskOuterClass.PublishedTask
import dev.usbharu.owl.generated.PublishTaskOuterClass.PublishedTasks
import dev.usbharu.owl.generated.TaskPublishServiceGrpcKt.TaskPublishServiceCoroutineImplBase
import io.grpc.Status
import io.grpc.StatusException
import org.slf4j.LoggerFactory
@ -39,13 +39,9 @@ class TaskPublishService(
TaskPublishServiceCoroutineImplBase(coroutineContext) {
override suspend fun publishTask(request: PublishTaskOuterClass.PublishTask): PublishedTask {
logger.warn("aaaaaaaaaaa")
return try {
val publishedTask = taskPublishService.publishTask(
PublishTask(
request.name,
@ -61,7 +57,6 @@ class TaskPublishService(
}
override suspend fun publishTasks(request: PublishTaskOuterClass.PublishTasks): PublishedTasks {
val tasks = request.propertiesArrayList.map {
PublishTask(
request.name,
@ -79,4 +74,4 @@ class TaskPublishService(
private val logger =
LoggerFactory.getLogger(dev.usbharu.owl.broker.interfaces.grpc.TaskPublishService::class.java)
}
}
}

View File

@ -17,13 +17,13 @@
package dev.usbharu.owl.broker.interfaces.grpc
import com.google.protobuf.Empty
import dev.usbharu.owl.TaskResultOuterClass
import dev.usbharu.owl.TaskResultServiceGrpcKt
import dev.usbharu.owl.broker.domain.model.taskresult.TaskResult
import dev.usbharu.owl.broker.external.toUUID
import dev.usbharu.owl.broker.service.TaskManagementService
import dev.usbharu.owl.common.property.PropertySerializeUtils
import dev.usbharu.owl.common.property.PropertySerializerFactory
import dev.usbharu.owl.generated.TaskResultOuterClass
import dev.usbharu.owl.generated.TaskResultServiceGrpcKt
import io.grpc.Status
import io.grpc.StatusException
import kotlinx.coroutines.CancellationException
@ -67,4 +67,4 @@ class TaskResultService(
companion object {
private val logger = LoggerFactory.getLogger(TaskResultService::class.java)
}
}
}

View File

@ -16,11 +16,11 @@
package dev.usbharu.owl.broker.interfaces.grpc
import dev.usbharu.owl.*
import dev.usbharu.owl.broker.external.toUUID
import dev.usbharu.owl.broker.service.TaskManagementService
import dev.usbharu.owl.common.property.PropertySerializeUtils
import dev.usbharu.owl.common.property.PropertySerializerFactory
import dev.usbharu.owl.generated.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlin.coroutines.CoroutineContext
@ -41,16 +41,18 @@ class TaskResultSubscribeService(
name = it.name
attempt = it.attempt
success = it.success
results.addAll(it.results.map {
taskResult {
id = it.taskId.toUUID()
success = it.success
attempt = it.attempt
result.putAll(PropertySerializeUtils.serialize(propertySerializerFactory, it.result))
message = it.message
results.addAll(
it.results.map {
taskResult {
id = it.taskId.toUUID()
success = it.success
attempt = it.attempt
result.putAll(PropertySerializeUtils.serialize(propertySerializerFactory, it.result))
message = it.message
}
}
})
)
}
}
}
}
}

View File

@ -42,7 +42,5 @@ class AssignQueuedTaskDeciderImpl(
).take(numberOfConcurrent)
)
}
}
}
}

View File

@ -57,4 +57,4 @@ data class RegisterConsumerRequest(
val name: String,
val hostname: String,
val tasks: List<String>
)
)

View File

@ -28,4 +28,4 @@ class DefaultPropertySerializerFactory :
LongPropertySerializer(),
FloatPropertySerializer(),
)
)
)

View File

@ -26,10 +26,8 @@ interface ProducerService {
suspend fun registerProducer(producer: RegisterProducerRequest): UUID
}
class ProducerServiceImpl(private val producerRepository: ProducerRepository) : ProducerService {
override suspend fun registerProducer(producer: RegisterProducerRequest): UUID {
val id = UUID.randomUUID()
val saveProducer = Producer(
@ -54,4 +52,4 @@ class ProducerServiceImpl(private val producerRepository: ProducerRepository) :
data class RegisterProducerRequest(
val name: String,
val hostname: String
)
)

View File

@ -29,19 +29,14 @@ interface QueueScanner {
fun startScan(): Flow<QueuedTask>
}
class QueueScannerImpl(private val queueStore: QueueStore) : QueueScanner {
override fun startScan(): Flow<QueuedTask> {
return flow {
while (currentCoroutineContext().isActive) {
emitAll(scanQueue())
delay(1000)
}
override fun startScan(): Flow<QueuedTask> = flow {
while (currentCoroutineContext().isActive) {
emitAll(scanQueue())
delay(1000)
}
}
private fun scanQueue(): Flow<QueuedTask> {
return queueStore.findByQueuedAtBeforeAndIsActiveIsTrue(Instant.now().minusSeconds(10))
}
}
private fun scanQueue(): Flow<QueuedTask> =
queueStore.findByQueuedAtBeforeAndIsActiveIsTrue(Instant.now().minusSeconds(10))
}

View File

@ -32,33 +32,24 @@ interface QueueStore {
fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow<QueuedTask>
}
class QueueStoreImpl(private val queuedTaskRepository: QueuedTaskRepository) : QueueStore {
override suspend fun enqueue(queuedTask: QueuedTask) {
queuedTaskRepository.save(queuedTask)
}
override suspend fun enqueueAll(queuedTaskList: List<QueuedTask>) {
queuedTaskList.forEach { enqueue(it) }
}
override suspend fun enqueueAll(queuedTaskList: List<QueuedTask>) = queuedTaskList.forEach { enqueue(it) }
override suspend fun dequeue(queuedTask: QueuedTask) {
queuedTaskRepository.findByTaskIdAndAssignedConsumerIsNullAndUpdate(queuedTask.task.id, queuedTask)
}
override suspend fun dequeueAll(queuedTaskList: List<QueuedTask>) {
return queuedTaskList.forEach { dequeue(it) }
}
override suspend fun dequeueAll(queuedTaskList: List<QueuedTask>) = queuedTaskList.forEach { dequeue(it) }
override fun findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(
tasks: List<String>,
limit: Int
): Flow<QueuedTask> {
return queuedTaskRepository.findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(tasks, limit)
}
): Flow<QueuedTask> = queuedTaskRepository.findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(tasks, limit)
override fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow<QueuedTask> {
return queuedTaskRepository.findByQueuedAtBeforeAndIsActiveIsTrue(instant)
}
}
override fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow<QueuedTask> =
queuedTaskRepository.findByQueuedAtBeforeAndIsActiveIsTrue(instant)
}

View File

@ -27,7 +27,6 @@ interface QueuedTaskAssigner {
fun ready(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask>
}
class QueuedTaskAssignerImpl(
private val taskManagementService: TaskManagementService,
private val queueStore: QueueStore
@ -49,7 +48,6 @@ class QueuedTaskAssignerImpl(
private suspend fun assignTask(queuedTask: QueuedTask, consumerId: UUID): QueuedTask? {
return try {
val assignedTaskQueue =
queuedTask.copy(assignedConsumer = consumerId, assignedAt = Instant.now(), isActive = false)
logger.trace(
@ -78,4 +76,4 @@ class QueuedTaskAssignerImpl(
companion object {
private val logger = LoggerFactory.getLogger(QueuedTaskAssignerImpl::class.java)
}
}
}

View File

@ -24,33 +24,34 @@ import org.slf4j.LoggerFactory
interface RegisterTaskService {
suspend fun registerTask(taskDefinition: TaskDefinition)
suspend fun unregisterTask(name:String)
suspend fun unregisterTask(name: String)
}
class RegisterTaskServiceImpl(private val taskDefinitionRepository: TaskDefinitionRepository) : RegisterTaskService {
override suspend fun registerTask(taskDefinition: TaskDefinition) {
val definedTask = taskDefinitionRepository.findByName(taskDefinition.name)
if (definedTask != null) {
logger.debug("Task already defined. name: ${taskDefinition.name}")
if (taskDefinition.propertyDefinitionHash != definedTask.propertyDefinitionHash) {
throw IncompatibleTaskException("Task ${taskDefinition.name} has already been defined, and the parameters are incompatible.")
throw IncompatibleTaskException(
"Task ${taskDefinition.name} has already been defined, and the parameters are incompatible."
)
}
return
}
taskDefinitionRepository.save(taskDefinition)
logger.info("Register a new task. name: {}",taskDefinition.name)
logger.info("Register a new task. name: {}", taskDefinition.name)
}
// todo すでにpublish済みのタスクをどうするか決めさせる
override suspend fun unregisterTask(name: String) {
taskDefinitionRepository.deleteByName(name)
logger.info("Unregister a task. name: {}",name)
logger.info("Unregister a task. name: {}", name)
}
companion object{
companion object {
private val logger = LoggerFactory.getLogger(RegisterTaskServiceImpl::class.java)
}
}
}

View File

@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory
import java.time.Instant
import java.util.*
interface TaskManagementService {
suspend fun startManagement(coroutineScope: CoroutineScope)
@ -75,13 +74,11 @@ class TaskManagementServiceImpl(
}
}
override fun findAssignableTask(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask> {
return assignQueuedTaskDecider.findAssignableQueue(consumerId, numberOfConcurrent)
}
private suspend fun enqueueTask(task: Task): QueuedTask {
val definedTask = taskDefinitionRepository.findByName(task.name)
?: throw TaskNotRegisterException("Task ${task.name} not definition.")
@ -113,7 +110,6 @@ class TaskManagementServiceImpl(
queueStore.dequeue(timeoutQueue)
val task = taskRepository.findById(timeoutQueue.task.id)
?: throw RecordNotFoundException("Task not found. id: ${timeoutQueue.task.id}")
val copy = task.copy(attempt = timeoutQueue.attempt)
@ -148,12 +144,10 @@ class TaskManagementServiceImpl(
taskResult.taskId,
task.copy(completedAt = completedAt, attempt = taskResult.attempt)
)
}
override fun subscribeResult(producerId: UUID): Flow<TaskResults> {
return flow {
while (currentCoroutineContext().isActive) {
taskRepository
.findByPublishProducerIdAndCompletedAtIsNotNull(producerId)
@ -163,7 +157,7 @@ class TaskManagementServiceImpl(
TaskResults(
it.name,
it.id,
results.any { it.success },
results.any { taskResult -> taskResult.success },
it.attempt,
results
)
@ -171,12 +165,10 @@ class TaskManagementServiceImpl(
}
delay(500)
}
}
}
companion object {
private val logger = LoggerFactory.getLogger(TaskManagementServiceImpl::class.java)
}
}
}

View File

@ -78,7 +78,6 @@ class TaskPublishServiceImpl(
}
override suspend fun publishTasks(list: List<PublishTask>): List<PublishedTask> {
val first = list.first()
val definition = taskDefinitionRepository.findByName(first.name)
@ -90,14 +89,14 @@ class TaskPublishServiceImpl(
val tasks = list.map {
Task(
it.name,
UUID.randomUUID(),
first.producerId,
published,
nextRetry,
null,
0,
it.properties
name = it.name,
id = UUID.randomUUID(),
publishProducerId = first.producerId,
publishedAt = published,
nextRetry = nextRetry,
completedAt = null,
attempt = 0,
properties = it.properties
)
}
@ -111,4 +110,4 @@ class TaskPublishServiceImpl(
companion object {
private val logger = LoggerFactory.getLogger(TaskPublishServiceImpl::class.java)
}
}
}

View File

@ -20,9 +20,9 @@ import dev.usbharu.owl.broker.domain.model.taskresult.TaskResult
import java.util.*
data class TaskResults(
val name:String,
val id:UUID,
val success:Boolean,
val attempt:Int,
val name: String,
val id: UUID,
val success: Boolean,
val attempt: Int,
val results: List<TaskResult>
)

View File

@ -49,4 +49,4 @@ class TaskScannerImpl(private val taskRepository: TaskRepository) :
companion object {
private val logger = LoggerFactory.getLogger(TaskScannerImpl::class.java)
}
}
}

View File

@ -1,7 +1,7 @@
syntax = "proto3";
import "uuid.proto";
option java_package = "dev.usbharu.owl";
option java_package = "dev.usbharu.owl.generated";
message SubscribeTaskRequest {
string name = 1;

View File

@ -1,6 +1,6 @@
syntax = "proto3";
option java_package = "dev.usbharu.owl";
option java_package = "dev.usbharu.owl.generated";
import "google/protobuf/empty.proto";
import "uuid.proto";

View File

@ -2,7 +2,7 @@ syntax = "proto3";
import "uuid.proto";
option java_package = "dev.usbharu.owl";
option java_package = "dev.usbharu.owl.generated";
message Producer {
string name = 1;

View File

@ -2,7 +2,7 @@ syntax = "proto3";
import "google/protobuf/empty.proto";
option java_package = "dev.usbharu.owl";
option java_package = "dev.usbharu.owl.generated";
message Property{
oneof value {

View File

@ -4,7 +4,7 @@ import "google/protobuf/timestamp.proto";
import "uuid.proto";
option java_package = "dev.usbharu.owl";
option java_package = "dev.usbharu.owl.generated";
message PublishTask {

View File

@ -3,7 +3,7 @@ import "uuid.proto";
import "google/protobuf/timestamp.proto";
import "property.proto";
option java_package = "dev.usbharu.owl";
option java_package = "dev.usbharu.owl.generated";
message ReadyRequest {
int32 number_of_concurrent = 1;

View File

@ -3,7 +3,7 @@ import "uuid.proto";
import "google/protobuf/empty.proto";
import "property.proto";
option java_package = "dev.usbharu.owl";
option java_package = "dev.usbharu.owl.generated";
message TaskResult {
UUID id = 1;

View File

@ -2,7 +2,7 @@ syntax = "proto3";
import "uuid.proto";
import "task_result.proto";
option java_package = "dev.usbharu.owl";
option java_package = "dev.usbharu.owl.generated";
message TaskResults {
string name = 1;

View File

@ -1,6 +1,6 @@
syntax = "proto3";
option java_package = "dev.usbharu.owl";
option java_package = "dev.usbharu.owl.generated";
message UUID {
uint64 most_significant_uuid_bits = 1;

View File

@ -18,4 +18,4 @@ package dev.usbharu
fun main() {
println("Hello World!")
}
}

View File

@ -45,6 +45,5 @@ class ObjectPropertySerializer(private val objectMapper: ObjectMapper) : Propert
Class.forName(string.substringAfter("jackson:").substringBefore(":"))
)
)
}
}
}

View File

@ -23,4 +23,7 @@ val Class<*>.allFields: List<Field>
superclass.allFields + declaredFields
} else {
declaredFields.toList()
}.map { it.trySetAccessible();it }
}.map {
it.trySetAccessible()
it
}

View File

@ -15,19 +15,12 @@ class BooleanPropertyValue(override val value: Boolean) : PropertyValue<Boolean>
*
*/
class BooleanPropertySerializer : PropertySerializer<Boolean> {
override fun isSupported(propertyValue: PropertyValue<*>): Boolean {
return propertyValue.value is Boolean
}
override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is Boolean
override fun isSupported(string: String): Boolean {
return string.startsWith("bool:")
}
override fun isSupported(string: String): Boolean = string.startsWith("bool:")
override fun serialize(propertyValue: PropertyValue<*>): String {
return "bool:" + propertyValue.value.toString()
}
override fun serialize(propertyValue: PropertyValue<*>): String = "bool:" + propertyValue.value.toString()
override fun deserialize(string: String): PropertyValue<Boolean> {
return BooleanPropertyValue(string.replace("bool:", "").toBoolean())
}
}
override fun deserialize(string: String): PropertyValue<Boolean> =
BooleanPropertyValue(string.replace("bool:", "").toBoolean())
}

View File

@ -23,12 +23,9 @@ package dev.usbharu.owl.common.property
*/
open class CustomPropertySerializerFactory(private val propertySerializers: Set<PropertySerializer<*>>) :
PropertySerializerFactory {
override fun <T> factory(propertyValue: PropertyValue<T>): PropertySerializer<T> {
return propertySerializers.firstOrNull { it.isSupported(propertyValue) } as PropertySerializer<T>?
override fun <T> factory(propertyValue: PropertyValue<T>): PropertySerializer<T> =
propertySerializers.firstOrNull { it.isSupported(propertyValue) } as PropertySerializer<T>?
?: throw IllegalArgumentException("PropertySerializer not found: $propertyValue")
}
override fun factory(string: String): PropertySerializer<*> {
return propertySerializers.first { it.isSupported(string) }
}
}
override fun factory(string: String): PropertySerializer<*> = propertySerializers.first { it.isSupported(string) }
}

View File

@ -15,19 +15,12 @@ class DoublePropertyValue(override val value: Double) : PropertyValue<Double>()
*
*/
class DoublePropertySerializer : PropertySerializer<Double> {
override fun isSupported(propertyValue: PropertyValue<*>): Boolean {
return propertyValue.value is Double
}
override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is Double
override fun isSupported(string: String): Boolean {
return string.startsWith("double:")
}
override fun isSupported(string: String): Boolean = string.startsWith("double:")
override fun serialize(propertyValue: PropertyValue<*>): String {
return "double:" + propertyValue.value.toString()
}
override fun serialize(propertyValue: PropertyValue<*>): String = "double:" + propertyValue.value.toString()
override fun deserialize(string: String): PropertyValue<Double> {
return DoublePropertyValue(string.replace("double:", "").toDouble())
}
}
override fun deserialize(string: String): PropertyValue<Double> =
DoublePropertyValue(string.replace("double:", "").toDouble())
}

View File

@ -26,19 +26,12 @@ class FloatPropertyValue(override val value: Float) : PropertyValue<Float>() {
*
*/
class FloatPropertySerializer : PropertySerializer<Float> {
override fun isSupported(propertyValue: PropertyValue<*>): Boolean {
return propertyValue.value is Float
}
override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is Float
override fun isSupported(string: String): Boolean {
return string.startsWith("float:")
}
override fun isSupported(string: String): Boolean = string.startsWith("float:")
override fun serialize(propertyValue: PropertyValue<*>): String {
return "float:" + propertyValue.value.toString()
}
override fun serialize(propertyValue: PropertyValue<*>): String = "float:" + propertyValue.value.toString()
override fun deserialize(string: String): PropertyValue<Float> {
return FloatPropertyValue(string.replace("float:", "").toFloat())
}
}
override fun deserialize(string: String): PropertyValue<Float> =
FloatPropertyValue(string.replace("float:", "").toFloat())
}

View File

@ -31,19 +31,12 @@ class IntegerPropertyValue(override val value: Int) : PropertyValue<Int>() {
*
*/
class IntegerPropertySerializer : PropertySerializer<Int> {
override fun isSupported(propertyValue: PropertyValue<*>): Boolean {
return propertyValue.value is Int
}
override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is Int
override fun isSupported(string: String): Boolean {
return string.startsWith("int32:")
}
override fun isSupported(string: String): Boolean = string.startsWith("int32:")
override fun serialize(propertyValue: PropertyValue<*>): String {
return "int32:" + propertyValue.value.toString()
}
override fun serialize(propertyValue: PropertyValue<*>): String = "int32:" + propertyValue.value.toString()
override fun deserialize(string: String): PropertyValue<Int> {
return IntegerPropertyValue(string.replace("int32:", "").toInt())
}
}
override fun deserialize(string: String): PropertyValue<Int> =
IntegerPropertyValue(string.replace("int32:", "").toInt())
}

View File

@ -27,19 +27,12 @@ class LongPropertyValue(override val value: Long) : PropertyValue<Long>() {
*
*/
class LongPropertySerializer : PropertySerializer<Long> {
override fun isSupported(propertyValue: PropertyValue<*>): Boolean {
return propertyValue.value is Long
}
override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is Long
override fun isSupported(string: String): Boolean {
return string.startsWith("int64:")
}
override fun isSupported(string: String): Boolean = string.startsWith("int64:")
override fun serialize(propertyValue: PropertyValue<*>): String {
return "int64:" + propertyValue.value.toString()
}
override fun serialize(propertyValue: PropertyValue<*>): String = "int64:" + propertyValue.value.toString()
override fun deserialize(string: String): PropertyValue<Long> {
return LongPropertyValue(string.replace("int64:", "").toLong())
}
}
override fun deserialize(string: String): PropertyValue<Long> =
LongPropertyValue(string.replace("int64:", "").toLong())
}

View File

@ -27,4 +27,4 @@ class PropertySerializeException : RuntimeException {
enableSuppression,
writableStackTrace
)
}
}

View File

@ -59,4 +59,4 @@ object PropertySerializeUtils {
}
}.toMap()
}
}
}

View File

@ -53,4 +53,4 @@ interface PropertySerializer<T> {
* @return デシリアライズされた[PropertyValue]
*/
fun deserialize(string: String): PropertyValue<T>
}
}

View File

@ -37,4 +37,4 @@ interface PropertySerializerFactory {
* @return 作成されたシリアライザー
*/
fun factory(string: String): PropertySerializer<*>
}
}

View File

@ -38,4 +38,4 @@ enum class PropertyType {
*
*/
binary
}
}

View File

@ -31,9 +31,5 @@ abstract class PropertyValue<T> {
* プロパティの型
*/
abstract val type: PropertyType
override fun toString(): String {
return "PropertyValue(value=$value, type=$type)"
}
}
override fun toString(): String = "PropertyValue(value=$value, type=$type)"
}

View File

@ -15,19 +15,11 @@ class StringPropertyValue(override val value: String) : PropertyValue<String>()
*
*/
class StringPropertyValueSerializer : PropertySerializer<String> {
override fun isSupported(propertyValue: PropertyValue<*>): Boolean {
return propertyValue.value is String
}
override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is String
override fun isSupported(string: String): Boolean {
return string.startsWith("str:")
}
override fun isSupported(string: String): Boolean = string.startsWith("str:")
override fun serialize(propertyValue: PropertyValue<*>): String {
return "str:" + propertyValue.value
}
override fun serialize(propertyValue: PropertyValue<*>): String = "str:" + propertyValue.value
override fun deserialize(string: String): PropertyValue<String> {
return StringPropertyValue(string.replace("str:", ""))
}
}
override fun deserialize(string: String): PropertyValue<String> = StringPropertyValue(string.replace("str:", ""))
}

View File

@ -13,5 +13,4 @@ import kotlin.math.roundToLong
class ExponentialRetryPolicy(private val firstRetrySeconds: Int = 30) : RetryPolicy {
override fun nextRetry(now: Instant, attempt: Int): Instant =
now.plusSeconds(firstRetrySeconds.times((2.0).pow(attempt).roundToLong()) - firstRetrySeconds)
}
}

View File

@ -33,4 +33,4 @@ interface RetryPolicy {
* @return 次のリトライ時刻
*/
fun nextRetry(now: Instant, attempt: Int): Instant
}
}

View File

@ -16,7 +16,6 @@
package dev.usbharu.owl.common.retry
import org.slf4j.LoggerFactory
interface RetryPolicyFactory {
@ -24,9 +23,7 @@ interface RetryPolicyFactory {
}
class DefaultRetryPolicyFactory(private val map: Map<String, RetryPolicy>) : RetryPolicyFactory {
override fun factory(name: String): RetryPolicy {
return map[name] ?: throwException(name)
}
override fun factory(name: String): RetryPolicy = map[name] ?: throwException(name)
private fun throwException(name: String): Nothing {
logger.warn("RetryPolicy not found. name: {}", name)
@ -36,4 +33,4 @@ class DefaultRetryPolicyFactory(private val map: Map<String, RetryPolicy>) : Ret
companion object {
private val logger = LoggerFactory.getLogger(RetryPolicyFactory::class.java)
}
}
}

View File

@ -27,4 +27,4 @@ class RetryPolicyNotFoundException : RuntimeException {
enableSuppression,
writableStackTrace
)
}
}

View File

@ -36,6 +36,4 @@ class PropertyDefinition(val map: Map<String, PropertyType>) : Map<String, Prope
map.map { it.key + it.value.name }.joinToString("").map { hash *= it.code * 31 }
return hash
}
}

View File

@ -31,4 +31,4 @@ data class PublishedTask<T : Task>(
val task: T,
val id: UUID,
val published: Instant
)
)

View File

@ -20,4 +20,4 @@ package dev.usbharu.owl.common.task
* タスク
*
*/
open class Task
open class Task

View File

@ -107,7 +107,6 @@ interface TaskDefinition<T : Task> {
* @return デシリアライズされたタスク
*/
fun deserialize(value: Map<String, PropertyValue<*>>): T {
val task = try {
type.getDeclaredConstructor().newInstance()
} catch (e: Exception) {
@ -127,4 +126,4 @@ interface TaskDefinition<T : Task> {
return task
}
}
}

View File

@ -33,7 +33,7 @@ protobuf {
artifact = libs.protoc.gen.grpc.java.get().toString()
}
create("grpckt") {
artifact = libs.protoc.gen.grpc.kotlin.get().toString() + "jdk8@jar"
artifact = libs.protoc.gen.grpc.kotlin.get().toString() + ":jdk8@jar"
}
}
generateProtoTasks {

View File

@ -29,5 +29,4 @@ abstract class AbstractTaskRunner<T : Task, D : TaskDefinition<T>>(private val t
}
abstract suspend fun typedRun(typedParam: T, taskRequest: TaskRequest): TaskResult
}
}

View File

@ -16,10 +16,10 @@
package dev.usbharu.owl.consumer
import dev.usbharu.owl.*
import dev.usbharu.owl.Uuid.UUID
import dev.usbharu.owl.common.property.PropertySerializeUtils
import dev.usbharu.owl.common.property.PropertySerializerFactory
import dev.usbharu.owl.generated.*
import dev.usbharu.owl.generated.Uuid.UUID
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.slf4j.LoggerFactory
@ -66,11 +66,13 @@ class Consumer(
suspend fun init(name: String, hostname: String) {
logger.info("Initialize Consumer name: {} hostname: {}", name, hostname)
logger.debug("Registered Tasks: {}", runnerMap.keys)
consumerId = subscribeTaskStub.subscribeTask(subscribeTaskRequest {
this.name = name
this.hostname = hostname
this.tasks.addAll(runnerMap.keys)
}).id
consumerId = subscribeTaskStub.subscribeTask(
subscribeTaskRequest {
this.name = name
this.hostname = hostname
this.tasks.addAll(runnerMap.keys)
}
).id
logger.info("Success initialize consumer. ConsumerID: {}", consumerId)
}
@ -84,78 +86,92 @@ class Consumer(
while (isActive) {
try {
taskResultStub
.tasKResult(flow {
assignmentTaskStub
.ready(flow {
requestTask()
}).onEach {
logger.info("Start Task name: {} id: {}", it.name, it.id)
processing.update { it + 1 }
.tasKResult(
flow {
assignmentTaskStub
.ready(
flow {
requestTask()
}
).onEach {
logger.info("Start Task name: {} id: {}", it.name, it.id)
processing.update { it + 1 }
try {
val taskResult = runnerMap.getValue(it.name).run(
TaskRequest(
try {
val taskResult = runnerMap.getValue(it.name).run(
TaskRequest(
it.name,
java.util.UUID(
it.id.mostSignificantUuidBits,
it.id.leastSignificantUuidBits
),
it.attempt,
Instant.ofEpochSecond(
it.queuedAt.seconds,
it.queuedAt.nanos.toLong()
),
PropertySerializeUtils.deserialize(
propertySerializerFactory,
it.propertiesMap
)
)
)
emit(
taskResult {
this.success = taskResult.success
this.attempt = it.attempt
this.id = it.id
this.result.putAll(
PropertySerializeUtils.serialize(
propertySerializerFactory,
taskResult.result
)
)
this.message = taskResult.message
}
)
logger.info(
"Success execute task. name: {} success: {}",
it.name,
java.util.UUID(
it.id.mostSignificantUuidBits,
it.id.leastSignificantUuidBits
),
it.attempt,
Instant.ofEpochSecond(it.queuedAt.seconds, it.queuedAt.nanos.toLong()),
PropertySerializeUtils.deserialize(
propertySerializerFactory,
it.propertiesMap
)
taskResult.success
)
)
emit(taskResult {
this.success = taskResult.success
this.attempt = it.attempt
this.id = it.id
this.result.putAll(
PropertySerializeUtils.serialize(
propertySerializerFactory, taskResult.result
)
logger.debug("TRACE RESULT {}", taskResult)
} catch (e: CancellationException) {
logger.warn("Cancelled execute task.", e)
emit(
taskResult {
this.success = false
this.attempt = it.attempt
this.id = it.id
this.message = e.localizedMessage
}
)
this.message = taskResult.message
})
logger.info(
"Success execute task. name: {} success: {}",
it.name,
taskResult.success
)
logger.debug("TRACE RESULT {}", taskResult)
} catch (e: CancellationException) {
logger.warn("Cancelled execute task.", e)
emit(taskResult {
this.success = false
this.attempt = it.attempt
this.id = it.id
this.message = e.localizedMessage
})
throw e
} catch (e: Exception) {
logger.warn("Failed execute task. name: {} id: {}", it.name, it.id, e)
emit(taskResult {
this.success = false
this.attempt = it.attempt
this.id = it.id
this.message = e.localizedMessage
})
} finally {
logger.debug(" Task name: {} id: {}", it.name, it.id)
processing.update { it - 1 }
concurrent.update {
if (it < 64) {
it + 1
} else {
64
throw e
} catch (e: Exception) {
logger.warn("Failed execute task. name: {} id: {}", it.name, it.id, e)
emit(
taskResult {
this.success = false
this.attempt = it.attempt
this.id = it.id
this.message = e.localizedMessage
}
)
} finally {
logger.debug(" Task name: {} id: {}", it.name, it.id)
processing.update { it - 1 }
concurrent.update {
if (it < 64) {
it + 1
} else {
64
}
}
}
}
}.flowOn(Dispatchers.Default).collect()
})
}.flowOn(Dispatchers.Default).collect()
}
)
} catch (e: CancellationException) {
throw e
} catch (e: Exception) {
@ -171,14 +187,15 @@ class Consumer(
while (coroutineScope.isActive) {
val andSet = concurrent.getAndUpdate { 0 }
if (andSet != 0) {
logger.debug("Request {} tasks.", andSet)
try {
emit(readyRequest {
this.consumerId = this@Consumer.consumerId
this.numberOfConcurrent = andSet
})
emit(
readyRequest {
this.consumerId = this@Consumer.consumerId
this.numberOfConcurrent = andSet
}
)
} catch (e: CancellationException) {
throw e
} catch (e: Exception) {
@ -206,4 +223,4 @@ class Consumer(
companion object {
private val logger = LoggerFactory.getLogger(Consumer::class.java)
}
}
}

View File

@ -25,5 +25,4 @@ fun main() {
standaloneConsumer.init()
standaloneConsumer.start()
}
}
}

View File

@ -26,4 +26,4 @@ class ServiceLoaderTaskRunnerLoader : TaskRunnerLoader {
override fun load(): Map<String, TaskRunner> {
return taskRunnerMap
}
}
}

View File

@ -16,11 +16,11 @@
package dev.usbharu.owl.consumer
import dev.usbharu.owl.AssignmentTaskServiceGrpcKt
import dev.usbharu.owl.SubscribeTaskServiceGrpcKt
import dev.usbharu.owl.TaskResultServiceGrpcKt
import dev.usbharu.owl.common.property.CustomPropertySerializerFactory
import dev.usbharu.owl.common.property.PropertySerializerFactory
import dev.usbharu.owl.generated.AssignmentTaskServiceGrpcKt
import dev.usbharu.owl.generated.SubscribeTaskServiceGrpcKt
import dev.usbharu.owl.generated.TaskResultServiceGrpcKt
import io.grpc.ManagedChannelBuilder
import java.nio.file.Path
@ -90,9 +90,11 @@ class StandaloneConsumer(
*/
suspend fun start() {
consumer.start()
Runtime.getRuntime().addShutdownHook(Thread {
consumer.stop()
})
Runtime.getRuntime().addShutdownHook(
Thread {
consumer.stop()
}
)
}
/**
@ -102,5 +104,4 @@ class StandaloneConsumer(
fun stop() {
consumer.stop()
}
}
}

View File

@ -43,4 +43,4 @@ object StandaloneConsumerConfigLoader {
return StandaloneConsumerConfig(address, port, name, hostname, concurrency)
}
}
}

View File

@ -30,9 +30,9 @@ import java.util.*
* @property properties タスクに渡されたパラメータ
*/
data class TaskRequest(
val name:String,
val id:UUID,
val attempt:Int,
val name: String,
val id: UUID,
val attempt: Int,
val queuedAt: Instant,
val properties:Map<String,PropertyValue<*>>
val properties: Map<String, PropertyValue<*>>
)

View File

@ -35,4 +35,4 @@ data class TaskResult(
return TaskResult(true, result, "")
}
}
}
}

View File

@ -33,4 +33,4 @@ interface TaskRunner {
* @return タスク実行結果
*/
suspend fun run(taskRequest: TaskRequest): TaskResult
}
}

View File

@ -18,4 +18,4 @@ package dev.usbharu.owl.consumer
interface TaskRunnerLoader {
fun load(): Map<String, TaskRunner>
}
}

Some files were not shown because too many files have changed in this diff Show More