mirror of https://github.com/usbharu/Hideout.git
Compare commits
No commits in common. "80918e1c2e40baba0932f540759fbe0f1d18b8f3" and "acb9efdfef622e79288b56458603190e79e03bb3" have entirely different histories.
80918e1c2e
...
acb9efdfef
|
@ -27,7 +27,7 @@ jobs:
|
||||||
core: ${{ steps.filter.outputs.core }}
|
core: ${{ steps.filter.outputs.core }}
|
||||||
mastodon: ${{ steps.filter.outputs.mastodon }}
|
mastodon: ${{ steps.filter.outputs.mastodon }}
|
||||||
activitypub: ${{ steps.filter.outputs.ap }}
|
activitypub: ${{ steps.filter.outputs.ap }}
|
||||||
owl: ${{ steps.filter.outputs.owl }}
|
owl: $${{ steps.filter.outputs.owl }}
|
||||||
steps:
|
steps:
|
||||||
- name: Checkout
|
- name: Checkout
|
||||||
uses: actions/checkout@v4
|
uses: actions/checkout@v4
|
||||||
|
@ -166,8 +166,7 @@ jobs:
|
||||||
gradle-home-cache-cleanup: true
|
gradle-home-cache-cleanup: true
|
||||||
|
|
||||||
- name: Build
|
- name: Build
|
||||||
working-directory: owl
|
run: ./owl/gradlew :owl:classes --no-daemon
|
||||||
run: ./gradlew :classes --no-daemon
|
|
||||||
|
|
||||||
hideout-core-unit-test:
|
hideout-core-unit-test:
|
||||||
needs:
|
needs:
|
||||||
|
@ -312,8 +311,7 @@ jobs:
|
||||||
gradle-home-cache-cleanup: true
|
gradle-home-cache-cleanup: true
|
||||||
|
|
||||||
- name: Unit Test
|
- name: Unit Test
|
||||||
working-directory: owl
|
run: ./owl/gradlew :owl:koverXmlReport
|
||||||
run: ./gradlew :koverXmlReport --rerun-tasks
|
|
||||||
|
|
||||||
- name: JUnit Test Report
|
- name: JUnit Test Report
|
||||||
uses: mikepenz/action-junit-report@v4
|
uses: mikepenz/action-junit-report@v4
|
||||||
|
@ -391,8 +389,7 @@ jobs:
|
||||||
|
|
||||||
- name: owl Lint
|
- name: owl Lint
|
||||||
if: always()
|
if: always()
|
||||||
working-directory: owl
|
run: ./owl/gradlew :owl:detektMain
|
||||||
run: ./gradlew :detektMain
|
|
||||||
|
|
||||||
- name: Auto Commit
|
- name: Auto Commit
|
||||||
if: ${{ always() }}
|
if: ${{ always() }}
|
||||||
|
|
|
@ -52,7 +52,6 @@ repositories {
|
||||||
dependencies {
|
dependencies {
|
||||||
implementation("dev.usbharu:hideout-core:0.0.1")
|
implementation("dev.usbharu:hideout-core:0.0.1")
|
||||||
implementation("dev.usbharu:hideout-mastodon:1.0-SNAPSHOT")
|
implementation("dev.usbharu:hideout-mastodon:1.0-SNAPSHOT")
|
||||||
implementation("dev.usbharu:hideout-activitypub:1.0-SNAPSHOT")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tasks.register("run") {
|
tasks.register("run") {
|
||||||
|
|
|
@ -1,9 +1,5 @@
|
||||||
import kotlinx.kover.gradle.plugin.dsl.CoverageUnit
|
|
||||||
|
|
||||||
plugins {
|
plugins {
|
||||||
alias(libs.plugins.kotlin.jvm)
|
kotlin("jvm") version "1.9.25"
|
||||||
alias(libs.plugins.detekt)
|
|
||||||
alias(libs.plugins.kover)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
group = "dev.usbharu"
|
group = "dev.usbharu"
|
||||||
|
@ -15,7 +11,6 @@ repositories {
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
testImplementation(kotlin("test"))
|
testImplementation(kotlin("test"))
|
||||||
detektPlugins(libs.detekt.formatting)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tasks.test {
|
tasks.test {
|
||||||
|
@ -24,92 +19,3 @@ tasks.test {
|
||||||
kotlin {
|
kotlin {
|
||||||
jvmToolchain(21)
|
jvmToolchain(21)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
configurations {
|
|
||||||
matching { it.name == "detekt" }.all {
|
|
||||||
resolutionStrategy.eachDependency {
|
|
||||||
if (requested.group == "org.jetbrains.kotlin") {
|
|
||||||
useVersion(io.gitlab.arturbosch.detekt.getSupportedKotlinVersion())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
all {
|
|
||||||
exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tasks {
|
|
||||||
withType<io.gitlab.arturbosch.detekt.Detekt> {
|
|
||||||
exclude("**/generated/**")
|
|
||||||
setSource("src/main/kotlin")
|
|
||||||
exclude("build/")
|
|
||||||
configureEach {
|
|
||||||
exclude("**/org/koin/ksp/generated/**", "**/generated/**")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
withType<io.gitlab.arturbosch.detekt.DetektCreateBaselineTask>() {
|
|
||||||
configureEach {
|
|
||||||
exclude("**/org/koin/ksp/generated/**", "**/generated/**")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
withType<Test> {
|
|
||||||
useJUnitPlatform()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
project.gradle.taskGraph.whenReady {
|
|
||||||
if (this.hasTask(":koverGenerateArtifact")) {
|
|
||||||
val task = this.allTasks.find { it.name == "test" }
|
|
||||||
val verificationTask = task as VerificationTask
|
|
||||||
verificationTask.ignoreFailures = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
detekt {
|
|
||||||
parallel = true
|
|
||||||
config.setFrom(files("../detekt.yml"))
|
|
||||||
buildUponDefaultConfig = true
|
|
||||||
basePath = "${rootDir.absolutePath}/src/main/kotlin"
|
|
||||||
autoCorrect = true
|
|
||||||
}
|
|
||||||
|
|
||||||
kover {
|
|
||||||
currentProject {
|
|
||||||
sources {
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
reports {
|
|
||||||
verify {
|
|
||||||
rule {
|
|
||||||
bound {
|
|
||||||
minValue = 50
|
|
||||||
coverageUnits = CoverageUnit.INSTRUCTION
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
total {
|
|
||||||
xml {
|
|
||||||
title = "Hideout ActivityPub"
|
|
||||||
xmlFile = file("$buildDir/reports/kover/hideout-activitypub.xml")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
filters {
|
|
||||||
excludes {
|
|
||||||
annotatedBy("org.springframework.context.annotation.Configuration")
|
|
||||||
annotatedBy("org.springframework.boot.context.properties.ConfigurationProperties")
|
|
||||||
packages(
|
|
||||||
"dev.usbharu.hideout.controller.mastodon.generated",
|
|
||||||
"dev.usbharu.hideout.domain.mastodon.model.generated"
|
|
||||||
)
|
|
||||||
packages("org.springframework")
|
|
||||||
packages("org.jetbrains")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -3,14 +3,3 @@ plugins {
|
||||||
}
|
}
|
||||||
rootProject.name = "hideout-activitypub"
|
rootProject.name = "hideout-activitypub"
|
||||||
|
|
||||||
dependencyResolutionManagement {
|
|
||||||
repositories {
|
|
||||||
mavenCentral()
|
|
||||||
}
|
|
||||||
|
|
||||||
versionCatalogs {
|
|
||||||
create("libs") {
|
|
||||||
from(files("../libs.versions.toml"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,10 +1,6 @@
|
||||||
import kotlinx.kover.gradle.plugin.dsl.CoverageUnit
|
|
||||||
|
|
||||||
plugins {
|
plugins {
|
||||||
alias(libs.plugins.kotlin.jvm)
|
alias(libs.plugins.kotlin.jvm)
|
||||||
id("maven-publish")
|
id("maven-publish")
|
||||||
alias(libs.plugins.kover)
|
|
||||||
alias(libs.plugins.detekt)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -31,8 +27,6 @@ subprojects {
|
||||||
apply {
|
apply {
|
||||||
plugin("org.jetbrains.kotlin.jvm")
|
plugin("org.jetbrains.kotlin.jvm")
|
||||||
plugin("maven-publish")
|
plugin("maven-publish")
|
||||||
plugin(rootProject.libs.plugins.kover.get().pluginId)
|
|
||||||
plugin(rootProject.libs.plugins.detekt.get().pluginId)
|
|
||||||
}
|
}
|
||||||
kotlin {
|
kotlin {
|
||||||
jvmToolchain(21)
|
jvmToolchain(21)
|
||||||
|
@ -41,42 +35,12 @@ subprojects {
|
||||||
dependencies {
|
dependencies {
|
||||||
implementation("org.slf4j:slf4j-api:2.0.15")
|
implementation("org.slf4j:slf4j-api:2.0.15")
|
||||||
testImplementation("org.junit.jupiter:junit-jupiter:5.10.3")
|
testImplementation("org.junit.jupiter:junit-jupiter:5.10.3")
|
||||||
detektPlugins(rootProject.libs.detekt.formatting)
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
detekt {
|
tasks.test {
|
||||||
parallel = true
|
useJUnitPlatform()
|
||||||
config.setFrom(files("$rootDir/../detekt.yml"))
|
|
||||||
buildUponDefaultConfig = true
|
|
||||||
basePath = "${projectDir}/src/main/kotlin"
|
|
||||||
autoCorrect = true
|
|
||||||
}
|
|
||||||
|
|
||||||
project.gradle.taskGraph.whenReady {
|
|
||||||
if (this.hasTask(":koverGenerateArtifact")) {
|
|
||||||
val task = this.allTasks.find { it.name == "test" }
|
|
||||||
val verificationTask = task as VerificationTask
|
|
||||||
verificationTask.ignoreFailures = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tasks {
|
|
||||||
withType<io.gitlab.arturbosch.detekt.Detekt> {
|
|
||||||
exclude("**/generated/**")
|
|
||||||
setSource("src/main/kotlin")
|
|
||||||
exclude("build/")
|
|
||||||
configureEach {
|
|
||||||
exclude("**/org/koin/ksp/generated/**", "**/generated/**")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
withType<io.gitlab.arturbosch.detekt.DetektCreateBaselineTask>() {
|
|
||||||
configureEach {
|
|
||||||
exclude("**/org/koin/ksp/generated/**", "**/generated/**")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
withType<Test> {
|
|
||||||
useJUnitPlatform()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
publishing {
|
publishing {
|
||||||
|
@ -106,62 +70,3 @@ subprojects {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
dependencies {
|
|
||||||
kover(project(":owl-broker"))
|
|
||||||
kover(project(":owl-broker:owl-broker-mongodb"))
|
|
||||||
kover(project(":owl-common"))
|
|
||||||
kover(project(":owl-common:owl-common-serialize-jackson"))
|
|
||||||
kover(project(":owl-consumer"))
|
|
||||||
kover(project(":owl-producer"))
|
|
||||||
kover(project(":owl-producer:owl-producer-api"))
|
|
||||||
kover(project(":owl-producer:owl-producer-default"))
|
|
||||||
kover(project(":owl-producer:owl-producer-embedded"))
|
|
||||||
}
|
|
||||||
|
|
||||||
detekt {
|
|
||||||
parallel = true
|
|
||||||
config.setFrom(files("../detekt.yml"))
|
|
||||||
buildUponDefaultConfig = true
|
|
||||||
basePath = "${projectDir}/src/main/kotlin"
|
|
||||||
autoCorrect = true
|
|
||||||
}
|
|
||||||
|
|
||||||
project.gradle.taskGraph.whenReady {
|
|
||||||
if (this.hasTask(":koverGenerateArtifact")) {
|
|
||||||
val task = this.allTasks.find { it.name == "test" }
|
|
||||||
val verificationTask = task as VerificationTask
|
|
||||||
verificationTask.ignoreFailures = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
kover {
|
|
||||||
currentProject {
|
|
||||||
sources {
|
|
||||||
excludedSourceSets.addAll("grpc", "grpckt")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
reports {
|
|
||||||
verify {
|
|
||||||
rule {
|
|
||||||
bound {
|
|
||||||
minValue = 50
|
|
||||||
coverageUnits = CoverageUnit.INSTRUCTION
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
total {
|
|
||||||
xml {
|
|
||||||
title = "Owl"
|
|
||||||
xmlFile = file("$buildDir/reports/kover/owl.xml")
|
|
||||||
}
|
|
||||||
filters {
|
|
||||||
excludes {
|
|
||||||
packages("dev.usbharu.owl.generated")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
distributionBase=GRADLE_USER_HOME
|
distributionBase=GRADLE_USER_HOME
|
||||||
distributionPath=wrapper/dists
|
distributionPath=wrapper/dists
|
||||||
distributionUrl=https\://services.gradle.org/distributions/gradle-8.10-bin.zip
|
distributionUrl=https\://services.gradle.org/distributions/gradle-8.10.1-bin.zip
|
||||||
networkTimeout=10000
|
networkTimeout=10000
|
||||||
validateDistributionUrl=true
|
validateDistributionUrl=true
|
||||||
zipStoreBase=GRADLE_USER_HOME
|
zipStoreBase=GRADLE_USER_HOME
|
||||||
|
|
|
@ -35,7 +35,7 @@ protobuf {
|
||||||
artifact = libs.protoc.gen.grpc.java.get().toString()
|
artifact = libs.protoc.gen.grpc.java.get().toString()
|
||||||
}
|
}
|
||||||
create("grpckt") {
|
create("grpckt") {
|
||||||
artifact = libs.protoc.gen.grpc.kotlin.get().toString() + ":jdk8@jar"
|
artifact = libs.protoc.gen.grpc.kotlin.get().toString() + "jdk8@jar"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
generateProtoTasks {
|
generateProtoTasks {
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.koin.dsl.module
|
||||||
|
|
||||||
class MongoModuleContext : ModuleContext {
|
class MongoModuleContext : ModuleContext {
|
||||||
override fun module(): Module {
|
override fun module(): Module {
|
||||||
|
|
||||||
return module {
|
return module {
|
||||||
single {
|
single {
|
||||||
val clientSettings =
|
val clientSettings =
|
||||||
|
@ -46,6 +47,7 @@ class MongoModuleContext : ModuleContext {
|
||||||
)
|
)
|
||||||
.uuidRepresentation(UuidRepresentation.STANDARD).build()
|
.uuidRepresentation(UuidRepresentation.STANDARD).build()
|
||||||
|
|
||||||
|
|
||||||
MongoClient.create(clientSettings)
|
MongoClient.create(clientSettings)
|
||||||
.getDatabase(System.getProperty("owl.broker.mongo.database", "mongo-test"))
|
.getDatabase(System.getProperty("owl.broker.mongo.database", "mongo-test"))
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,11 +33,7 @@ class MongodbConsumerRepository(database: MongoDatabase) : ConsumerRepository {
|
||||||
|
|
||||||
private val collection = database.getCollection<ConsumerMongodb>("consumers")
|
private val collection = database.getCollection<ConsumerMongodb>("consumers")
|
||||||
override suspend fun save(consumer: Consumer): Consumer = withContext(Dispatchers.IO) {
|
override suspend fun save(consumer: Consumer): Consumer = withContext(Dispatchers.IO) {
|
||||||
collection.replaceOne(
|
collection.replaceOne(Filters.eq("_id", consumer.id.toString()), ConsumerMongodb.of(consumer), ReplaceOptions().upsert(true))
|
||||||
Filters.eq("_id", consumer.id.toString()),
|
|
||||||
ConsumerMongodb.of(consumer),
|
|
||||||
ReplaceOptions().upsert(true)
|
|
||||||
)
|
|
||||||
return@withContext consumer
|
return@withContext consumer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -53,19 +49,15 @@ data class ConsumerMongodb(
|
||||||
val name: String,
|
val name: String,
|
||||||
val hostname: String,
|
val hostname: String,
|
||||||
val tasks: List<String>
|
val tasks: List<String>
|
||||||
) {
|
){
|
||||||
|
|
||||||
fun toConsumer(): Consumer {
|
fun toConsumer():Consumer{
|
||||||
return Consumer(
|
return Consumer(
|
||||||
UUID.fromString(id),
|
UUID.fromString(id), name, hostname, tasks
|
||||||
name,
|
|
||||||
hostname,
|
|
||||||
tasks
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
companion object{
|
||||||
companion object {
|
fun of(consumer: Consumer):ConsumerMongodb{
|
||||||
fun of(consumer: Consumer): ConsumerMongodb {
|
|
||||||
return ConsumerMongodb(
|
return ConsumerMongodb(
|
||||||
consumer.id.toString(),
|
consumer.id.toString(),
|
||||||
consumer.name,
|
consumer.name,
|
||||||
|
|
|
@ -48,8 +48,7 @@ class MongodbQueuedTaskRepository(
|
||||||
override suspend fun save(queuedTask: QueuedTask): QueuedTask {
|
override suspend fun save(queuedTask: QueuedTask): QueuedTask {
|
||||||
withContext(Dispatchers.IO) {
|
withContext(Dispatchers.IO) {
|
||||||
collection.replaceOne(
|
collection.replaceOne(
|
||||||
eq("_id", queuedTask.task.id.toString()),
|
eq("_id", queuedTask.task.id.toString()), QueuedTaskMongodb.of(propertySerializerFactory, queuedTask),
|
||||||
QueuedTaskMongodb.of(propertySerializerFactory, queuedTask),
|
|
||||||
ReplaceOptions().upsert(true)
|
ReplaceOptions().upsert(true)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
@ -58,6 +57,7 @@ class MongodbQueuedTaskRepository(
|
||||||
|
|
||||||
override suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id: UUID, update: QueuedTask): QueuedTask {
|
override suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id: UUID, update: QueuedTask): QueuedTask {
|
||||||
return withContext(Dispatchers.IO) {
|
return withContext(Dispatchers.IO) {
|
||||||
|
|
||||||
val findOneAndUpdate = collection.findOneAndUpdate(
|
val findOneAndUpdate = collection.findOneAndUpdate(
|
||||||
and(
|
and(
|
||||||
eq("_id", id.toString()),
|
eq("_id", id.toString()),
|
||||||
|
@ -108,7 +108,7 @@ data class QueuedTaskMongodb(
|
||||||
val task: TaskMongodb,
|
val task: TaskMongodb,
|
||||||
val attempt: Int,
|
val attempt: Int,
|
||||||
val queuedAt: Instant,
|
val queuedAt: Instant,
|
||||||
val priority: Int,
|
val priority:Int,
|
||||||
val isActive: Boolean,
|
val isActive: Boolean,
|
||||||
val timeoutAt: Instant?,
|
val timeoutAt: Instant?,
|
||||||
val assignedConsumer: String?,
|
val assignedConsumer: String?,
|
||||||
|
@ -155,14 +155,14 @@ data class QueuedTaskMongodb(
|
||||||
companion object {
|
companion object {
|
||||||
fun of(propertySerializerFactory: PropertySerializerFactory, task: Task): TaskMongodb {
|
fun of(propertySerializerFactory: PropertySerializerFactory, task: Task): TaskMongodb {
|
||||||
return TaskMongodb(
|
return TaskMongodb(
|
||||||
name = task.name,
|
task.name,
|
||||||
id = task.id.toString(),
|
task.id.toString(),
|
||||||
publishProducerId = task.publishProducerId.toString(),
|
task.publishProducerId.toString(),
|
||||||
publishedAt = task.publishedAt,
|
task.publishedAt,
|
||||||
nextRetry = task.nextRetry,
|
task.nextRetry,
|
||||||
completedAt = task.completedAt,
|
task.completedAt,
|
||||||
attempt = task.attempt,
|
task.attempt,
|
||||||
properties = PropertySerializeUtils.serialize(propertySerializerFactory, task.properties)
|
PropertySerializeUtils.serialize(propertySerializerFactory, task.properties)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -171,15 +171,15 @@ data class QueuedTaskMongodb(
|
||||||
companion object {
|
companion object {
|
||||||
fun of(propertySerializerFactory: PropertySerializerFactory, queuedTask: QueuedTask): QueuedTaskMongodb {
|
fun of(propertySerializerFactory: PropertySerializerFactory, queuedTask: QueuedTask): QueuedTaskMongodb {
|
||||||
return QueuedTaskMongodb(
|
return QueuedTaskMongodb(
|
||||||
id = queuedTask.task.id.toString(),
|
queuedTask.task.id.toString(),
|
||||||
task = TaskMongodb.of(propertySerializerFactory, queuedTask.task),
|
TaskMongodb.of(propertySerializerFactory, queuedTask.task),
|
||||||
attempt = queuedTask.attempt,
|
queuedTask.attempt,
|
||||||
queuedAt = queuedTask.queuedAt,
|
queuedTask.queuedAt,
|
||||||
priority = queuedTask.priority,
|
queuedTask.priority,
|
||||||
isActive = queuedTask.isActive,
|
queuedTask.isActive,
|
||||||
timeoutAt = queuedTask.timeoutAt,
|
queuedTask.timeoutAt,
|
||||||
assignedConsumer = queuedTask.assignedConsumer?.toString(),
|
queuedTask.assignedConsumer?.toString(),
|
||||||
assignedAt = queuedTask.assignedAt
|
queuedTask.assignedAt
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,7 +41,7 @@ class MongodbTaskDefinitionRepository(database: MongoDatabase) : TaskDefinitionR
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun deleteByName(name: String): Unit = withContext(Dispatchers.IO) {
|
override suspend fun deleteByName(name: String): Unit = withContext(Dispatchers.IO) {
|
||||||
collection.deleteOne(Filters.eq("_id", name))
|
collection.deleteOne(Filters.eq("_id",name))
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun findByName(name: String): TaskDefinition? = withContext(Dispatchers.IO) {
|
override suspend fun findByName(name: String): TaskDefinition? = withContext(Dispatchers.IO) {
|
||||||
|
|
|
@ -36,29 +36,27 @@ import org.bson.codecs.pojo.annotations.BsonRepresentation
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
|
|
||||||
class MongodbTaskRepository(database: MongoDatabase, private val propertySerializerFactory: PropertySerializerFactory) :
|
class MongodbTaskRepository(database: MongoDatabase, private val propertySerializerFactory: PropertySerializerFactory) :
|
||||||
TaskRepository {
|
TaskRepository {
|
||||||
|
|
||||||
private val collection = database.getCollection<TaskMongodb>("tasks")
|
private val collection = database.getCollection<TaskMongodb>("tasks")
|
||||||
override suspend fun save(task: Task): Task = withContext(Dispatchers.IO) {
|
override suspend fun save(task: Task): Task = withContext(Dispatchers.IO) {
|
||||||
collection.replaceOne(
|
collection.replaceOne(
|
||||||
Filters.eq("_id", task.id.toString()),
|
Filters.eq("_id", task.id.toString()), TaskMongodb.of(propertySerializerFactory, task),
|
||||||
TaskMongodb.of(propertySerializerFactory, task),
|
|
||||||
ReplaceOptions().upsert(true)
|
ReplaceOptions().upsert(true)
|
||||||
)
|
)
|
||||||
return@withContext task
|
return@withContext task
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun saveAll(tasks: List<Task>): Unit = withContext(Dispatchers.IO) {
|
override suspend fun saveAll(tasks: List<Task>): Unit = withContext(Dispatchers.IO) {
|
||||||
collection.bulkWrite(
|
collection.bulkWrite(tasks.map {
|
||||||
tasks.map {
|
ReplaceOneModel(
|
||||||
ReplaceOneModel(
|
Filters.eq(it.id.toString()),
|
||||||
Filters.eq(it.id.toString()),
|
TaskMongodb.of(propertySerializerFactory, it),
|
||||||
TaskMongodb.of(propertySerializerFactory, it),
|
ReplaceOptions().upsert(true)
|
||||||
ReplaceOptions().upsert(true)
|
)
|
||||||
)
|
})
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun findByNextRetryBeforeAndCompletedAtIsNull(timestamp: Instant): Flow<Task> {
|
override fun findByNextRetryBeforeAndCompletedAtIsNull(timestamp: Instant): Flow<Task> {
|
||||||
|
@ -77,13 +75,12 @@ class MongodbTaskRepository(database: MongoDatabase, private val propertySeriali
|
||||||
|
|
||||||
override suspend fun findByIdAndUpdate(id: UUID, task: Task) {
|
override suspend fun findByIdAndUpdate(id: UUID, task: Task) {
|
||||||
collection.replaceOne(
|
collection.replaceOne(
|
||||||
Filters.eq("_id", task.id.toString()),
|
Filters.eq("_id", task.id.toString()), TaskMongodb.of(propertySerializerFactory, task),
|
||||||
TaskMongodb.of(propertySerializerFactory, task),
|
|
||||||
ReplaceOptions().upsert(false)
|
ReplaceOptions().upsert(false)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun findByPublishProducerIdAndCompletedAtIsNotNull(publishProducerId: UUID): Flow<Task> {
|
override suspend fun findByPublishProducerIdAndCompletedAtIsNotNull(publishProducerId: UUID): Flow<Task> {
|
||||||
return collection
|
return collection
|
||||||
.find(Filters.eq(TaskMongodb::publishProducerId.name, publishProducerId.toString()))
|
.find(Filters.eq(TaskMongodb::publishProducerId.name, publishProducerId.toString()))
|
||||||
.map { it.toTask(propertySerializerFactory) }
|
.map { it.toTask(propertySerializerFactory) }
|
||||||
|
@ -119,14 +116,14 @@ data class TaskMongodb(
|
||||||
companion object {
|
companion object {
|
||||||
fun of(propertySerializerFactory: PropertySerializerFactory, task: Task): TaskMongodb {
|
fun of(propertySerializerFactory: PropertySerializerFactory, task: Task): TaskMongodb {
|
||||||
return TaskMongodb(
|
return TaskMongodb(
|
||||||
name = task.name,
|
task.name,
|
||||||
id = task.id.toString(),
|
task.id.toString(),
|
||||||
publishProducerId = task.publishProducerId.toString(),
|
task.publishProducerId.toString(),
|
||||||
publishedAt = task.publishedAt,
|
task.publishedAt,
|
||||||
nextRetry = task.nextRetry,
|
task.nextRetry,
|
||||||
completedAt = task.completedAt,
|
task.completedAt,
|
||||||
attempt = task.attempt,
|
task.attempt,
|
||||||
properties = PropertySerializeUtils.serialize(propertySerializerFactory, task.properties)
|
PropertySerializeUtils.serialize(propertySerializerFactory, task.properties)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,17 +41,14 @@ class MongodbTaskResultRepository(
|
||||||
private val collection = database.getCollection<TaskResultMongodb>("task_results")
|
private val collection = database.getCollection<TaskResultMongodb>("task_results")
|
||||||
override suspend fun save(taskResult: TaskResult): TaskResult = withContext(Dispatchers.IO) {
|
override suspend fun save(taskResult: TaskResult): TaskResult = withContext(Dispatchers.IO) {
|
||||||
collection.replaceOne(
|
collection.replaceOne(
|
||||||
Filters.eq(taskResult.id.toString()),
|
Filters.eq(taskResult.id.toString()), TaskResultMongodb.of(propertySerializerFactory, taskResult),
|
||||||
TaskResultMongodb.of(propertySerializerFactory, taskResult),
|
|
||||||
ReplaceOptions().upsert(true)
|
ReplaceOptions().upsert(true)
|
||||||
)
|
)
|
||||||
return@withContext taskResult
|
return@withContext taskResult
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun findByTaskId(id: UUID): Flow<TaskResult> {
|
override fun findByTaskId(id: UUID): Flow<TaskResult> {
|
||||||
return collection.find(
|
return collection.find(Filters.eq(id.toString())).map { it.toTaskResult(propertySerializerFactory) }.flowOn(Dispatchers.IO)
|
||||||
Filters.eq(id.toString())
|
|
||||||
).map { it.toTaskResult(propertySerializerFactory) }.flowOn(Dispatchers.IO)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,25 +65,27 @@ data class TaskResultMongodb(
|
||||||
|
|
||||||
fun toTaskResult(propertySerializerFactory: PropertySerializerFactory): TaskResult {
|
fun toTaskResult(propertySerializerFactory: PropertySerializerFactory): TaskResult {
|
||||||
return TaskResult(
|
return TaskResult(
|
||||||
id = UUID.fromString(id),
|
UUID.fromString(id),
|
||||||
taskId = UUID.fromString(taskId),
|
UUID.fromString(taskId),
|
||||||
success = success,
|
success,
|
||||||
attempt = attempt,
|
attempt,
|
||||||
result = PropertySerializeUtils.deserialize(propertySerializerFactory, result),
|
PropertySerializeUtils.deserialize(propertySerializerFactory, result),
|
||||||
message = message
|
message
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
fun of(propertySerializerFactory: PropertySerializerFactory, taskResult: TaskResult): TaskResultMongodb {
|
fun of(propertySerializerFactory: PropertySerializerFactory, taskResult: TaskResult): TaskResultMongodb {
|
||||||
return TaskResultMongodb(
|
return TaskResultMongodb(
|
||||||
id = taskResult.id.toString(),
|
taskResult.id.toString(),
|
||||||
taskId = taskResult.taskId.toString(),
|
taskResult.taskId.toString(),
|
||||||
success = taskResult.success,
|
taskResult.success,
|
||||||
attempt = taskResult.attempt,
|
taskResult.attempt,
|
||||||
result = PropertySerializeUtils.serialize(propertySerializerFactory, taskResult.result),
|
PropertySerializeUtils.serialize(propertySerializerFactory, taskResult.result),
|
||||||
message = taskResult.message
|
taskResult.message
|
||||||
)
|
)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -7,14 +7,12 @@ import dev.usbharu.owl.broker.domain.model.consumer.Consumer
|
||||||
import kotlinx.coroutines.runBlocking
|
import kotlinx.coroutines.runBlocking
|
||||||
import org.bson.UuidRepresentation
|
import org.bson.UuidRepresentation
|
||||||
import org.junit.jupiter.api.Assertions.assertEquals
|
import org.junit.jupiter.api.Assertions.assertEquals
|
||||||
import org.junit.jupiter.api.Disabled
|
|
||||||
import org.junit.jupiter.api.Test
|
import org.junit.jupiter.api.Test
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
|
|
||||||
class MongodbConsumerRepositoryTest {
|
class MongodbConsumerRepositoryTest {
|
||||||
@Test
|
@Test
|
||||||
@Disabled
|
|
||||||
fun name() {
|
fun name() {
|
||||||
|
|
||||||
val clientSettings =
|
val clientSettings =
|
||||||
|
|
|
@ -90,6 +90,7 @@ fun main() {
|
||||||
|
|
||||||
logger.info("Use module name: {}", moduleContext)
|
logger.info("Use module name: {}", moduleContext)
|
||||||
|
|
||||||
|
|
||||||
val koin = startKoin {
|
val koin = startKoin {
|
||||||
printLogger()
|
printLogger()
|
||||||
|
|
||||||
|
@ -97,6 +98,7 @@ fun main() {
|
||||||
single<RetryPolicyFactory> {
|
single<RetryPolicyFactory> {
|
||||||
DefaultRetryPolicyFactory(mapOf("" to ExponentialRetryPolicy()))
|
DefaultRetryPolicyFactory(mapOf("" to ExponentialRetryPolicy()))
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
modules(mainModule, module, moduleContext.module())
|
modules(mainModule, module, moduleContext.module())
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,9 +19,11 @@ package dev.usbharu.owl.broker
|
||||||
import org.koin.core.module.Module
|
import org.koin.core.module.Module
|
||||||
|
|
||||||
interface ModuleContext {
|
interface ModuleContext {
|
||||||
fun module(): Module
|
fun module():Module
|
||||||
}
|
}
|
||||||
|
|
||||||
data object EmptyModuleContext : ModuleContext {
|
data object EmptyModuleContext : ModuleContext {
|
||||||
override fun module(): Module = org.koin.dsl.module { }
|
override fun module(): Module {
|
||||||
|
return org.koin.dsl.module { }
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -38,7 +38,7 @@ class OwlBrokerApplication(
|
||||||
|
|
||||||
private lateinit var server: Server
|
private lateinit var server: Server
|
||||||
|
|
||||||
fun start(port: Int, coroutineScope: CoroutineScope = GlobalScope): Job {
|
fun start(port: Int,coroutineScope: CoroutineScope = GlobalScope):Job {
|
||||||
server = ServerBuilder.forPort(port)
|
server = ServerBuilder.forPort(port)
|
||||||
.addService(assignmentTaskService)
|
.addService(assignmentTaskService)
|
||||||
.addService(definitionTaskService)
|
.addService(definitionTaskService)
|
||||||
|
@ -64,4 +64,5 @@ class OwlBrokerApplication(
|
||||||
fun stop() {
|
fun stop() {
|
||||||
server.shutdown()
|
server.shutdown()
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -19,7 +19,7 @@ package dev.usbharu.owl.broker.domain.model.consumer
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
interface ConsumerRepository {
|
interface ConsumerRepository {
|
||||||
suspend fun save(consumer: Consumer): Consumer
|
suspend fun save(consumer: Consumer):Consumer
|
||||||
|
|
||||||
suspend fun findById(id: UUID): Consumer?
|
suspend fun findById(id:UUID):Consumer?
|
||||||
}
|
}
|
|
@ -20,9 +20,9 @@ import java.time.Instant
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
data class Producer(
|
data class Producer(
|
||||||
val id: UUID,
|
val id:UUID,
|
||||||
val name: String,
|
val name:String,
|
||||||
val hostname: String,
|
val hostname:String,
|
||||||
val registeredTask: List<String>,
|
val registeredTask:List<String>,
|
||||||
val createdAt: Instant
|
val createdAt: Instant
|
||||||
)
|
)
|
||||||
|
|
|
@ -17,5 +17,5 @@
|
||||||
package dev.usbharu.owl.broker.domain.model.producer
|
package dev.usbharu.owl.broker.domain.model.producer
|
||||||
|
|
||||||
interface ProducerRepository {
|
interface ProducerRepository {
|
||||||
suspend fun save(producer: Producer): Producer
|
suspend fun save(producer: Producer):Producer
|
||||||
}
|
}
|
|
@ -21,12 +21,12 @@ import java.time.Instant
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
interface QueuedTaskRepository {
|
interface QueuedTaskRepository {
|
||||||
suspend fun save(queuedTask: QueuedTask): QueuedTask
|
suspend fun save(queuedTask: QueuedTask):QueuedTask
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* トランザクションの代わり
|
* トランザクションの代わり
|
||||||
*/
|
*/
|
||||||
suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id: UUID, update: QueuedTask): QueuedTask
|
suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id:UUID,update:QueuedTask):QueuedTask
|
||||||
|
|
||||||
fun findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(tasks: List<String>, limit: Int): Flow<QueuedTask>
|
fun findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(tasks: List<String>, limit: Int): Flow<QueuedTask>
|
||||||
|
|
||||||
|
|
|
@ -24,11 +24,11 @@ import java.util.*
|
||||||
* @param attempt 失敗を含めて試行した回数
|
* @param attempt 失敗を含めて試行した回数
|
||||||
*/
|
*/
|
||||||
data class Task(
|
data class Task(
|
||||||
val name: String,
|
val name:String,
|
||||||
val id: UUID,
|
val id: UUID,
|
||||||
val publishProducerId: UUID,
|
val publishProducerId:UUID,
|
||||||
val publishedAt: Instant,
|
val publishedAt: Instant,
|
||||||
val nextRetry: Instant,
|
val nextRetry:Instant,
|
||||||
val completedAt: Instant? = null,
|
val completedAt: Instant? = null,
|
||||||
val attempt: Int,
|
val attempt: Int,
|
||||||
val properties: Map<String, PropertyValue<*>>
|
val properties: Map<String, PropertyValue<*>>
|
||||||
|
|
|
@ -21,15 +21,15 @@ import java.time.Instant
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
interface TaskRepository {
|
interface TaskRepository {
|
||||||
suspend fun save(task: Task): Task
|
suspend fun save(task: Task):Task
|
||||||
|
|
||||||
suspend fun saveAll(tasks: List<Task>)
|
suspend fun saveAll(tasks:List<Task>)
|
||||||
|
|
||||||
fun findByNextRetryBeforeAndCompletedAtIsNull(timestamp: Instant): Flow<Task>
|
fun findByNextRetryBeforeAndCompletedAtIsNull(timestamp:Instant): Flow<Task>
|
||||||
|
|
||||||
suspend fun findById(uuid: UUID): Task?
|
suspend fun findById(uuid: UUID): Task?
|
||||||
|
|
||||||
suspend fun findByIdAndUpdate(id: UUID, task: Task)
|
suspend fun findByIdAndUpdate(id:UUID,task: Task)
|
||||||
|
|
||||||
fun findByPublishProducerIdAndCompletedAtIsNotNull(publishProducerId: UUID): Flow<Task>
|
suspend fun findByPublishProducerIdAndCompletedAtIsNotNull(publishProducerId:UUID):Flow<Task>
|
||||||
}
|
}
|
|
@ -22,5 +22,5 @@ data class TaskDefinition(
|
||||||
val maxRetry: Int,
|
val maxRetry: Int,
|
||||||
val timeoutMilli: Long,
|
val timeoutMilli: Long,
|
||||||
val propertyDefinitionHash: Long,
|
val propertyDefinitionHash: Long,
|
||||||
val retryPolicy: String
|
val retryPolicy:String
|
||||||
)
|
)
|
||||||
|
|
|
@ -18,7 +18,7 @@ package dev.usbharu.owl.broker.domain.model.taskdefinition
|
||||||
|
|
||||||
interface TaskDefinitionRepository {
|
interface TaskDefinitionRepository {
|
||||||
suspend fun save(taskDefinition: TaskDefinition): TaskDefinition
|
suspend fun save(taskDefinition: TaskDefinition): TaskDefinition
|
||||||
suspend fun deleteByName(name: String)
|
suspend fun deleteByName(name:String)
|
||||||
|
|
||||||
suspend fun findByName(name: String): TaskDefinition?
|
suspend fun findByName(name:String):TaskDefinition?
|
||||||
}
|
}
|
|
@ -21,7 +21,7 @@ import java.util.*
|
||||||
|
|
||||||
data class TaskResult(
|
data class TaskResult(
|
||||||
val id: UUID,
|
val id: UUID,
|
||||||
val taskId: UUID,
|
val taskId:UUID,
|
||||||
val success: Boolean,
|
val success: Boolean,
|
||||||
val attempt: Int,
|
val attempt: Int,
|
||||||
val result: Map<String, PropertyValue<*>>,
|
val result: Map<String, PropertyValue<*>>,
|
||||||
|
|
|
@ -20,6 +20,6 @@ import kotlinx.coroutines.flow.Flow
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
interface TaskResultRepository {
|
interface TaskResultRepository {
|
||||||
suspend fun save(taskResult: TaskResult): TaskResult
|
suspend fun save(taskResult: TaskResult):TaskResult
|
||||||
fun findByTaskId(id: UUID): Flow<TaskResult>
|
fun findByTaskId(id:UUID): Flow<TaskResult>
|
||||||
}
|
}
|
|
@ -17,7 +17,7 @@
|
||||||
package dev.usbharu.owl.broker.external
|
package dev.usbharu.owl.broker.external
|
||||||
|
|
||||||
import com.google.protobuf.Timestamp
|
import com.google.protobuf.Timestamp
|
||||||
import dev.usbharu.owl.generated.Uuid
|
import dev.usbharu.owl.Uuid
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
|
@ -32,4 +32,4 @@ fun UUID.toUUID(): Uuid.UUID = Uuid
|
||||||
|
|
||||||
fun Timestamp.toInstant(): Instant = Instant.ofEpochSecond(seconds, nanos.toLong())
|
fun Timestamp.toInstant(): Instant = Instant.ofEpochSecond(seconds, nanos.toLong())
|
||||||
|
|
||||||
fun Instant.toTimestamp(): Timestamp = Timestamp.newBuilder().setSeconds(this.epochSecond).setNanos(this.nano).build()
|
fun Instant.toTimestamp():Timestamp = Timestamp.newBuilder().setSeconds(this.epochSecond).setNanos(this.nano).build()
|
|
@ -16,13 +16,14 @@
|
||||||
|
|
||||||
package dev.usbharu.owl.broker.interfaces.grpc
|
package dev.usbharu.owl.broker.interfaces.grpc
|
||||||
|
|
||||||
|
|
||||||
|
import dev.usbharu.owl.AssignmentTaskServiceGrpcKt
|
||||||
|
import dev.usbharu.owl.Task
|
||||||
import dev.usbharu.owl.broker.external.toTimestamp
|
import dev.usbharu.owl.broker.external.toTimestamp
|
||||||
import dev.usbharu.owl.broker.external.toUUID
|
import dev.usbharu.owl.broker.external.toUUID
|
||||||
import dev.usbharu.owl.broker.service.QueuedTaskAssigner
|
import dev.usbharu.owl.broker.service.QueuedTaskAssigner
|
||||||
import dev.usbharu.owl.common.property.PropertySerializeUtils
|
import dev.usbharu.owl.common.property.PropertySerializeUtils
|
||||||
import dev.usbharu.owl.common.property.PropertySerializerFactory
|
import dev.usbharu.owl.common.property.PropertySerializerFactory
|
||||||
import dev.usbharu.owl.generated.AssignmentTaskServiceGrpcKt
|
|
||||||
import dev.usbharu.owl.generated.Task
|
|
||||||
import io.grpc.Status
|
import io.grpc.Status
|
||||||
import io.grpc.StatusException
|
import io.grpc.StatusException
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
|
@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory
|
||||||
import kotlin.coroutines.CoroutineContext
|
import kotlin.coroutines.CoroutineContext
|
||||||
import kotlin.coroutines.EmptyCoroutineContext
|
import kotlin.coroutines.EmptyCoroutineContext
|
||||||
|
|
||||||
|
|
||||||
class AssignmentTaskService(
|
class AssignmentTaskService(
|
||||||
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||||
private val queuedTaskAssigner: QueuedTaskAssigner,
|
private val queuedTaskAssigner: QueuedTaskAssigner,
|
||||||
|
@ -40,6 +42,7 @@ class AssignmentTaskService(
|
||||||
AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineImplBase(coroutineContext) {
|
AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineImplBase(coroutineContext) {
|
||||||
|
|
||||||
override fun ready(requests: Flow<Task.ReadyRequest>): Flow<Task.TaskRequest> {
|
override fun ready(requests: Flow<Task.ReadyRequest>): Flow<Task.TaskRequest> {
|
||||||
|
|
||||||
return try {
|
return try {
|
||||||
requests
|
requests
|
||||||
.flatMapMerge {
|
.flatMapMerge {
|
||||||
|
|
|
@ -17,28 +17,25 @@
|
||||||
package dev.usbharu.owl.broker.interfaces.grpc
|
package dev.usbharu.owl.broker.interfaces.grpc
|
||||||
|
|
||||||
import com.google.protobuf.Empty
|
import com.google.protobuf.Empty
|
||||||
|
import dev.usbharu.owl.DefinitionTask
|
||||||
|
import dev.usbharu.owl.DefinitionTask.TaskDefined
|
||||||
|
import dev.usbharu.owl.DefinitionTaskServiceGrpcKt.DefinitionTaskServiceCoroutineImplBase
|
||||||
import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinition
|
import dev.usbharu.owl.broker.domain.model.taskdefinition.TaskDefinition
|
||||||
import dev.usbharu.owl.broker.service.RegisterTaskService
|
import dev.usbharu.owl.broker.service.RegisterTaskService
|
||||||
import dev.usbharu.owl.generated.DefinitionTask
|
|
||||||
import dev.usbharu.owl.generated.DefinitionTask.TaskDefined
|
|
||||||
import dev.usbharu.owl.generated.DefinitionTaskServiceGrpcKt.DefinitionTaskServiceCoroutineImplBase
|
|
||||||
import kotlin.coroutines.CoroutineContext
|
import kotlin.coroutines.CoroutineContext
|
||||||
import kotlin.coroutines.EmptyCoroutineContext
|
import kotlin.coroutines.EmptyCoroutineContext
|
||||||
|
|
||||||
class DefinitionTaskService(
|
class DefinitionTaskService(coroutineContext: CoroutineContext = EmptyCoroutineContext,private val registerTaskService: RegisterTaskService) :
|
||||||
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
|
||||||
private val registerTaskService: RegisterTaskService
|
|
||||||
) :
|
|
||||||
DefinitionTaskServiceCoroutineImplBase(coroutineContext) {
|
DefinitionTaskServiceCoroutineImplBase(coroutineContext) {
|
||||||
override suspend fun register(request: DefinitionTask.TaskDefinition): TaskDefined {
|
override suspend fun register(request: DefinitionTask.TaskDefinition): TaskDefined {
|
||||||
registerTaskService.registerTask(
|
registerTaskService.registerTask(
|
||||||
TaskDefinition(
|
TaskDefinition(
|
||||||
name = request.name,
|
request.name,
|
||||||
priority = request.priority,
|
request.priority,
|
||||||
maxRetry = request.maxRetry,
|
request.maxRetry,
|
||||||
timeoutMilli = request.timeoutMilli,
|
request.timeoutMilli,
|
||||||
propertyDefinitionHash = request.propertyDefinitionHash,
|
request.propertyDefinitionHash,
|
||||||
retryPolicy = request.retryPolicy
|
request.retryPolicy
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
return TaskDefined
|
return TaskDefined
|
||||||
|
|
|
@ -16,26 +16,24 @@
|
||||||
|
|
||||||
package dev.usbharu.owl.broker.interfaces.grpc
|
package dev.usbharu.owl.broker.interfaces.grpc
|
||||||
|
|
||||||
|
import dev.usbharu.owl.ProducerOuterClass
|
||||||
|
import dev.usbharu.owl.ProducerServiceGrpcKt.ProducerServiceCoroutineImplBase
|
||||||
import dev.usbharu.owl.broker.external.toUUID
|
import dev.usbharu.owl.broker.external.toUUID
|
||||||
import dev.usbharu.owl.broker.service.ProducerService
|
import dev.usbharu.owl.broker.service.ProducerService
|
||||||
import dev.usbharu.owl.broker.service.RegisterProducerRequest
|
import dev.usbharu.owl.broker.service.RegisterProducerRequest
|
||||||
import dev.usbharu.owl.generated.ProducerOuterClass
|
|
||||||
import dev.usbharu.owl.generated.ProducerServiceGrpcKt.ProducerServiceCoroutineImplBase
|
|
||||||
import kotlin.coroutines.CoroutineContext
|
import kotlin.coroutines.CoroutineContext
|
||||||
import kotlin.coroutines.EmptyCoroutineContext
|
import kotlin.coroutines.EmptyCoroutineContext
|
||||||
|
|
||||||
|
|
||||||
class ProducerService(
|
class ProducerService(
|
||||||
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||||
private val producerService: ProducerService
|
private val producerService: ProducerService
|
||||||
) :
|
) :
|
||||||
ProducerServiceCoroutineImplBase(coroutineContext) {
|
ProducerServiceCoroutineImplBase(coroutineContext) {
|
||||||
override suspend fun registerProducer(
|
override suspend fun registerProducer(request: ProducerOuterClass.Producer): ProducerOuterClass.RegisterProducerResponse {
|
||||||
request: ProducerOuterClass.Producer
|
|
||||||
): ProducerOuterClass.RegisterProducerResponse {
|
|
||||||
val registerProducer = producerService.registerProducer(
|
val registerProducer = producerService.registerProducer(
|
||||||
RegisterProducerRequest(
|
RegisterProducerRequest(
|
||||||
request.name,
|
request.name, request.hostname
|
||||||
request.hostname
|
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
return ProducerOuterClass.RegisterProducerResponse.newBuilder().setId(registerProducer.toUUID()).build()
|
return ProducerOuterClass.RegisterProducerResponse.newBuilder().setId(registerProducer.toUUID()).build()
|
||||||
|
|
|
@ -16,11 +16,11 @@
|
||||||
|
|
||||||
package dev.usbharu.owl.broker.interfaces.grpc
|
package dev.usbharu.owl.broker.interfaces.grpc
|
||||||
|
|
||||||
|
import dev.usbharu.owl.Consumer
|
||||||
|
import dev.usbharu.owl.SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineImplBase
|
||||||
import dev.usbharu.owl.broker.external.toUUID
|
import dev.usbharu.owl.broker.external.toUUID
|
||||||
import dev.usbharu.owl.broker.service.ConsumerService
|
import dev.usbharu.owl.broker.service.ConsumerService
|
||||||
import dev.usbharu.owl.broker.service.RegisterConsumerRequest
|
import dev.usbharu.owl.broker.service.RegisterConsumerRequest
|
||||||
import dev.usbharu.owl.generated.Consumer
|
|
||||||
import dev.usbharu.owl.generated.SubscribeTaskServiceGrpcKt.SubscribeTaskServiceCoroutineImplBase
|
|
||||||
import kotlin.coroutines.CoroutineContext
|
import kotlin.coroutines.CoroutineContext
|
||||||
import kotlin.coroutines.EmptyCoroutineContext
|
import kotlin.coroutines.EmptyCoroutineContext
|
||||||
|
|
||||||
|
|
|
@ -16,15 +16,15 @@
|
||||||
|
|
||||||
package dev.usbharu.owl.broker.interfaces.grpc
|
package dev.usbharu.owl.broker.interfaces.grpc
|
||||||
|
|
||||||
|
import dev.usbharu.owl.PublishTaskOuterClass
|
||||||
|
import dev.usbharu.owl.PublishTaskOuterClass.PublishedTask
|
||||||
|
import dev.usbharu.owl.PublishTaskOuterClass.PublishedTasks
|
||||||
|
import dev.usbharu.owl.TaskPublishServiceGrpcKt.TaskPublishServiceCoroutineImplBase
|
||||||
import dev.usbharu.owl.broker.external.toUUID
|
import dev.usbharu.owl.broker.external.toUUID
|
||||||
import dev.usbharu.owl.broker.service.PublishTask
|
import dev.usbharu.owl.broker.service.PublishTask
|
||||||
import dev.usbharu.owl.broker.service.TaskPublishService
|
import dev.usbharu.owl.broker.service.TaskPublishService
|
||||||
import dev.usbharu.owl.common.property.PropertySerializeUtils
|
import dev.usbharu.owl.common.property.PropertySerializeUtils
|
||||||
import dev.usbharu.owl.common.property.PropertySerializerFactory
|
import dev.usbharu.owl.common.property.PropertySerializerFactory
|
||||||
import dev.usbharu.owl.generated.PublishTaskOuterClass
|
|
||||||
import dev.usbharu.owl.generated.PublishTaskOuterClass.PublishedTask
|
|
||||||
import dev.usbharu.owl.generated.PublishTaskOuterClass.PublishedTasks
|
|
||||||
import dev.usbharu.owl.generated.TaskPublishServiceGrpcKt.TaskPublishServiceCoroutineImplBase
|
|
||||||
import io.grpc.Status
|
import io.grpc.Status
|
||||||
import io.grpc.StatusException
|
import io.grpc.StatusException
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
|
@ -39,9 +39,13 @@ class TaskPublishService(
|
||||||
TaskPublishServiceCoroutineImplBase(coroutineContext) {
|
TaskPublishServiceCoroutineImplBase(coroutineContext) {
|
||||||
|
|
||||||
override suspend fun publishTask(request: PublishTaskOuterClass.PublishTask): PublishedTask {
|
override suspend fun publishTask(request: PublishTaskOuterClass.PublishTask): PublishedTask {
|
||||||
|
|
||||||
logger.warn("aaaaaaaaaaa")
|
logger.warn("aaaaaaaaaaa")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
return try {
|
return try {
|
||||||
|
|
||||||
val publishedTask = taskPublishService.publishTask(
|
val publishedTask = taskPublishService.publishTask(
|
||||||
PublishTask(
|
PublishTask(
|
||||||
request.name,
|
request.name,
|
||||||
|
@ -57,6 +61,7 @@ class TaskPublishService(
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun publishTasks(request: PublishTaskOuterClass.PublishTasks): PublishedTasks {
|
override suspend fun publishTasks(request: PublishTaskOuterClass.PublishTasks): PublishedTasks {
|
||||||
|
|
||||||
val tasks = request.propertiesArrayList.map {
|
val tasks = request.propertiesArrayList.map {
|
||||||
PublishTask(
|
PublishTask(
|
||||||
request.name,
|
request.name,
|
||||||
|
|
|
@ -17,13 +17,13 @@
|
||||||
package dev.usbharu.owl.broker.interfaces.grpc
|
package dev.usbharu.owl.broker.interfaces.grpc
|
||||||
|
|
||||||
import com.google.protobuf.Empty
|
import com.google.protobuf.Empty
|
||||||
|
import dev.usbharu.owl.TaskResultOuterClass
|
||||||
|
import dev.usbharu.owl.TaskResultServiceGrpcKt
|
||||||
import dev.usbharu.owl.broker.domain.model.taskresult.TaskResult
|
import dev.usbharu.owl.broker.domain.model.taskresult.TaskResult
|
||||||
import dev.usbharu.owl.broker.external.toUUID
|
import dev.usbharu.owl.broker.external.toUUID
|
||||||
import dev.usbharu.owl.broker.service.TaskManagementService
|
import dev.usbharu.owl.broker.service.TaskManagementService
|
||||||
import dev.usbharu.owl.common.property.PropertySerializeUtils
|
import dev.usbharu.owl.common.property.PropertySerializeUtils
|
||||||
import dev.usbharu.owl.common.property.PropertySerializerFactory
|
import dev.usbharu.owl.common.property.PropertySerializerFactory
|
||||||
import dev.usbharu.owl.generated.TaskResultOuterClass
|
|
||||||
import dev.usbharu.owl.generated.TaskResultServiceGrpcKt
|
|
||||||
import io.grpc.Status
|
import io.grpc.Status
|
||||||
import io.grpc.StatusException
|
import io.grpc.StatusException
|
||||||
import kotlinx.coroutines.CancellationException
|
import kotlinx.coroutines.CancellationException
|
||||||
|
|
|
@ -16,11 +16,11 @@
|
||||||
|
|
||||||
package dev.usbharu.owl.broker.interfaces.grpc
|
package dev.usbharu.owl.broker.interfaces.grpc
|
||||||
|
|
||||||
|
import dev.usbharu.owl.*
|
||||||
import dev.usbharu.owl.broker.external.toUUID
|
import dev.usbharu.owl.broker.external.toUUID
|
||||||
import dev.usbharu.owl.broker.service.TaskManagementService
|
import dev.usbharu.owl.broker.service.TaskManagementService
|
||||||
import dev.usbharu.owl.common.property.PropertySerializeUtils
|
import dev.usbharu.owl.common.property.PropertySerializeUtils
|
||||||
import dev.usbharu.owl.common.property.PropertySerializerFactory
|
import dev.usbharu.owl.common.property.PropertySerializerFactory
|
||||||
import dev.usbharu.owl.generated.*
|
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.map
|
import kotlinx.coroutines.flow.map
|
||||||
import kotlin.coroutines.CoroutineContext
|
import kotlin.coroutines.CoroutineContext
|
||||||
|
@ -41,17 +41,15 @@ class TaskResultSubscribeService(
|
||||||
name = it.name
|
name = it.name
|
||||||
attempt = it.attempt
|
attempt = it.attempt
|
||||||
success = it.success
|
success = it.success
|
||||||
results.addAll(
|
results.addAll(it.results.map {
|
||||||
it.results.map {
|
taskResult {
|
||||||
taskResult {
|
id = it.taskId.toUUID()
|
||||||
id = it.taskId.toUUID()
|
success = it.success
|
||||||
success = it.success
|
attempt = it.attempt
|
||||||
attempt = it.attempt
|
result.putAll(PropertySerializeUtils.serialize(propertySerializerFactory, it.result))
|
||||||
result.putAll(PropertySerializeUtils.serialize(propertySerializerFactory, it.result))
|
message = it.message
|
||||||
message = it.message
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
)
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,5 +42,7 @@ class AssignQueuedTaskDeciderImpl(
|
||||||
).take(numberOfConcurrent)
|
).take(numberOfConcurrent)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -26,8 +26,10 @@ interface ProducerService {
|
||||||
suspend fun registerProducer(producer: RegisterProducerRequest): UUID
|
suspend fun registerProducer(producer: RegisterProducerRequest): UUID
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class ProducerServiceImpl(private val producerRepository: ProducerRepository) : ProducerService {
|
class ProducerServiceImpl(private val producerRepository: ProducerRepository) : ProducerService {
|
||||||
override suspend fun registerProducer(producer: RegisterProducerRequest): UUID {
|
override suspend fun registerProducer(producer: RegisterProducerRequest): UUID {
|
||||||
|
|
||||||
val id = UUID.randomUUID()
|
val id = UUID.randomUUID()
|
||||||
|
|
||||||
val saveProducer = Producer(
|
val saveProducer = Producer(
|
||||||
|
|
|
@ -29,14 +29,19 @@ interface QueueScanner {
|
||||||
fun startScan(): Flow<QueuedTask>
|
fun startScan(): Flow<QueuedTask>
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class QueueScannerImpl(private val queueStore: QueueStore) : QueueScanner {
|
class QueueScannerImpl(private val queueStore: QueueStore) : QueueScanner {
|
||||||
override fun startScan(): Flow<QueuedTask> = flow {
|
override fun startScan(): Flow<QueuedTask> {
|
||||||
while (currentCoroutineContext().isActive) {
|
return flow {
|
||||||
emitAll(scanQueue())
|
while (currentCoroutineContext().isActive) {
|
||||||
delay(1000)
|
emitAll(scanQueue())
|
||||||
|
delay(1000)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun scanQueue(): Flow<QueuedTask> =
|
private fun scanQueue(): Flow<QueuedTask> {
|
||||||
queueStore.findByQueuedAtBeforeAndIsActiveIsTrue(Instant.now().minusSeconds(10))
|
return queueStore.findByQueuedAtBeforeAndIsActiveIsTrue(Instant.now().minusSeconds(10))
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -32,24 +32,33 @@ interface QueueStore {
|
||||||
fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow<QueuedTask>
|
fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow<QueuedTask>
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class QueueStoreImpl(private val queuedTaskRepository: QueuedTaskRepository) : QueueStore {
|
class QueueStoreImpl(private val queuedTaskRepository: QueuedTaskRepository) : QueueStore {
|
||||||
override suspend fun enqueue(queuedTask: QueuedTask) {
|
override suspend fun enqueue(queuedTask: QueuedTask) {
|
||||||
queuedTaskRepository.save(queuedTask)
|
queuedTaskRepository.save(queuedTask)
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun enqueueAll(queuedTaskList: List<QueuedTask>) = queuedTaskList.forEach { enqueue(it) }
|
override suspend fun enqueueAll(queuedTaskList: List<QueuedTask>) {
|
||||||
|
queuedTaskList.forEach { enqueue(it) }
|
||||||
|
}
|
||||||
|
|
||||||
override suspend fun dequeue(queuedTask: QueuedTask) {
|
override suspend fun dequeue(queuedTask: QueuedTask) {
|
||||||
queuedTaskRepository.findByTaskIdAndAssignedConsumerIsNullAndUpdate(queuedTask.task.id, queuedTask)
|
queuedTaskRepository.findByTaskIdAndAssignedConsumerIsNullAndUpdate(queuedTask.task.id, queuedTask)
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun dequeueAll(queuedTaskList: List<QueuedTask>) = queuedTaskList.forEach { dequeue(it) }
|
override suspend fun dequeueAll(queuedTaskList: List<QueuedTask>) {
|
||||||
|
return queuedTaskList.forEach { dequeue(it) }
|
||||||
|
}
|
||||||
|
|
||||||
override fun findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(
|
override fun findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(
|
||||||
tasks: List<String>,
|
tasks: List<String>,
|
||||||
limit: Int
|
limit: Int
|
||||||
): Flow<QueuedTask> = queuedTaskRepository.findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(tasks, limit)
|
): Flow<QueuedTask> {
|
||||||
|
return queuedTaskRepository.findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(tasks, limit)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow<QueuedTask> {
|
||||||
|
return queuedTaskRepository.findByQueuedAtBeforeAndIsActiveIsTrue(instant)
|
||||||
|
}
|
||||||
|
|
||||||
override fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow<QueuedTask> =
|
|
||||||
queuedTaskRepository.findByQueuedAtBeforeAndIsActiveIsTrue(instant)
|
|
||||||
}
|
}
|
|
@ -27,6 +27,7 @@ interface QueuedTaskAssigner {
|
||||||
fun ready(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask>
|
fun ready(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask>
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class QueuedTaskAssignerImpl(
|
class QueuedTaskAssignerImpl(
|
||||||
private val taskManagementService: TaskManagementService,
|
private val taskManagementService: TaskManagementService,
|
||||||
private val queueStore: QueueStore
|
private val queueStore: QueueStore
|
||||||
|
@ -48,6 +49,7 @@ class QueuedTaskAssignerImpl(
|
||||||
|
|
||||||
private suspend fun assignTask(queuedTask: QueuedTask, consumerId: UUID): QueuedTask? {
|
private suspend fun assignTask(queuedTask: QueuedTask, consumerId: UUID): QueuedTask? {
|
||||||
return try {
|
return try {
|
||||||
|
|
||||||
val assignedTaskQueue =
|
val assignedTaskQueue =
|
||||||
queuedTask.copy(assignedConsumer = consumerId, assignedAt = Instant.now(), isActive = false)
|
queuedTask.copy(assignedConsumer = consumerId, assignedAt = Instant.now(), isActive = false)
|
||||||
logger.trace(
|
logger.trace(
|
||||||
|
|
|
@ -24,34 +24,33 @@ import org.slf4j.LoggerFactory
|
||||||
interface RegisterTaskService {
|
interface RegisterTaskService {
|
||||||
suspend fun registerTask(taskDefinition: TaskDefinition)
|
suspend fun registerTask(taskDefinition: TaskDefinition)
|
||||||
|
|
||||||
suspend fun unregisterTask(name: String)
|
suspend fun unregisterTask(name:String)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class RegisterTaskServiceImpl(private val taskDefinitionRepository: TaskDefinitionRepository) : RegisterTaskService {
|
class RegisterTaskServiceImpl(private val taskDefinitionRepository: TaskDefinitionRepository) : RegisterTaskService {
|
||||||
override suspend fun registerTask(taskDefinition: TaskDefinition) {
|
override suspend fun registerTask(taskDefinition: TaskDefinition) {
|
||||||
val definedTask = taskDefinitionRepository.findByName(taskDefinition.name)
|
val definedTask = taskDefinitionRepository.findByName(taskDefinition.name)
|
||||||
if (definedTask != null) {
|
if (definedTask != null) {
|
||||||
logger.debug("Task already defined. name: ${taskDefinition.name}")
|
logger.debug("Task already defined. name: ${taskDefinition.name}")
|
||||||
if (taskDefinition.propertyDefinitionHash != definedTask.propertyDefinitionHash) {
|
if (taskDefinition.propertyDefinitionHash != definedTask.propertyDefinitionHash) {
|
||||||
throw IncompatibleTaskException(
|
throw IncompatibleTaskException("Task ${taskDefinition.name} has already been defined, and the parameters are incompatible.")
|
||||||
"Task ${taskDefinition.name} has already been defined, and the parameters are incompatible."
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
taskDefinitionRepository.save(taskDefinition)
|
taskDefinitionRepository.save(taskDefinition)
|
||||||
|
|
||||||
logger.info("Register a new task. name: {}", taskDefinition.name)
|
logger.info("Register a new task. name: {}",taskDefinition.name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo すでにpublish済みのタスクをどうするか決めさせる
|
// todo すでにpublish済みのタスクをどうするか決めさせる
|
||||||
override suspend fun unregisterTask(name: String) {
|
override suspend fun unregisterTask(name: String) {
|
||||||
taskDefinitionRepository.deleteByName(name)
|
taskDefinitionRepository.deleteByName(name)
|
||||||
|
|
||||||
logger.info("Unregister a task. name: {}", name)
|
logger.info("Unregister a task. name: {}",name)
|
||||||
}
|
}
|
||||||
|
|
||||||
companion object {
|
companion object{
|
||||||
private val logger = LoggerFactory.getLogger(RegisterTaskServiceImpl::class.java)
|
private val logger = LoggerFactory.getLogger(RegisterTaskServiceImpl::class.java)
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
|
|
||||||
interface TaskManagementService {
|
interface TaskManagementService {
|
||||||
|
|
||||||
suspend fun startManagement(coroutineScope: CoroutineScope)
|
suspend fun startManagement(coroutineScope: CoroutineScope)
|
||||||
|
@ -74,11 +75,13 @@ class TaskManagementServiceImpl(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
override fun findAssignableTask(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask> {
|
override fun findAssignableTask(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask> {
|
||||||
return assignQueuedTaskDecider.findAssignableQueue(consumerId, numberOfConcurrent)
|
return assignQueuedTaskDecider.findAssignableQueue(consumerId, numberOfConcurrent)
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun enqueueTask(task: Task): QueuedTask {
|
private suspend fun enqueueTask(task: Task): QueuedTask {
|
||||||
|
|
||||||
val definedTask = taskDefinitionRepository.findByName(task.name)
|
val definedTask = taskDefinitionRepository.findByName(task.name)
|
||||||
?: throw TaskNotRegisterException("Task ${task.name} not definition.")
|
?: throw TaskNotRegisterException("Task ${task.name} not definition.")
|
||||||
|
|
||||||
|
@ -110,6 +113,7 @@ class TaskManagementServiceImpl(
|
||||||
|
|
||||||
queueStore.dequeue(timeoutQueue)
|
queueStore.dequeue(timeoutQueue)
|
||||||
|
|
||||||
|
|
||||||
val task = taskRepository.findById(timeoutQueue.task.id)
|
val task = taskRepository.findById(timeoutQueue.task.id)
|
||||||
?: throw RecordNotFoundException("Task not found. id: ${timeoutQueue.task.id}")
|
?: throw RecordNotFoundException("Task not found. id: ${timeoutQueue.task.id}")
|
||||||
val copy = task.copy(attempt = timeoutQueue.attempt)
|
val copy = task.copy(attempt = timeoutQueue.attempt)
|
||||||
|
@ -144,10 +148,12 @@ class TaskManagementServiceImpl(
|
||||||
taskResult.taskId,
|
taskResult.taskId,
|
||||||
task.copy(completedAt = completedAt, attempt = taskResult.attempt)
|
task.copy(completedAt = completedAt, attempt = taskResult.attempt)
|
||||||
)
|
)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun subscribeResult(producerId: UUID): Flow<TaskResults> {
|
override fun subscribeResult(producerId: UUID): Flow<TaskResults> {
|
||||||
return flow {
|
return flow {
|
||||||
|
|
||||||
while (currentCoroutineContext().isActive) {
|
while (currentCoroutineContext().isActive) {
|
||||||
taskRepository
|
taskRepository
|
||||||
.findByPublishProducerIdAndCompletedAtIsNotNull(producerId)
|
.findByPublishProducerIdAndCompletedAtIsNotNull(producerId)
|
||||||
|
@ -157,7 +163,7 @@ class TaskManagementServiceImpl(
|
||||||
TaskResults(
|
TaskResults(
|
||||||
it.name,
|
it.name,
|
||||||
it.id,
|
it.id,
|
||||||
results.any { taskResult -> taskResult.success },
|
results.any { it.success },
|
||||||
it.attempt,
|
it.attempt,
|
||||||
results
|
results
|
||||||
)
|
)
|
||||||
|
@ -165,7 +171,9 @@ class TaskManagementServiceImpl(
|
||||||
}
|
}
|
||||||
delay(500)
|
delay(500)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
|
|
|
@ -78,6 +78,7 @@ class TaskPublishServiceImpl(
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun publishTasks(list: List<PublishTask>): List<PublishedTask> {
|
override suspend fun publishTasks(list: List<PublishTask>): List<PublishedTask> {
|
||||||
|
|
||||||
val first = list.first()
|
val first = list.first()
|
||||||
|
|
||||||
val definition = taskDefinitionRepository.findByName(first.name)
|
val definition = taskDefinitionRepository.findByName(first.name)
|
||||||
|
@ -89,14 +90,14 @@ class TaskPublishServiceImpl(
|
||||||
|
|
||||||
val tasks = list.map {
|
val tasks = list.map {
|
||||||
Task(
|
Task(
|
||||||
name = it.name,
|
it.name,
|
||||||
id = UUID.randomUUID(),
|
UUID.randomUUID(),
|
||||||
publishProducerId = first.producerId,
|
first.producerId,
|
||||||
publishedAt = published,
|
published,
|
||||||
nextRetry = nextRetry,
|
nextRetry,
|
||||||
completedAt = null,
|
null,
|
||||||
attempt = 0,
|
0,
|
||||||
properties = it.properties
|
it.properties
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,9 +20,9 @@ import dev.usbharu.owl.broker.domain.model.taskresult.TaskResult
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
data class TaskResults(
|
data class TaskResults(
|
||||||
val name: String,
|
val name:String,
|
||||||
val id: UUID,
|
val id:UUID,
|
||||||
val success: Boolean,
|
val success:Boolean,
|
||||||
val attempt: Int,
|
val attempt:Int,
|
||||||
val results: List<TaskResult>
|
val results: List<TaskResult>
|
||||||
)
|
)
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
syntax = "proto3";
|
syntax = "proto3";
|
||||||
import "uuid.proto";
|
import "uuid.proto";
|
||||||
|
|
||||||
option java_package = "dev.usbharu.owl.generated";
|
option java_package = "dev.usbharu.owl";
|
||||||
|
|
||||||
message SubscribeTaskRequest {
|
message SubscribeTaskRequest {
|
||||||
string name = 1;
|
string name = 1;
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
syntax = "proto3";
|
syntax = "proto3";
|
||||||
|
|
||||||
option java_package = "dev.usbharu.owl.generated";
|
option java_package = "dev.usbharu.owl";
|
||||||
|
|
||||||
import "google/protobuf/empty.proto";
|
import "google/protobuf/empty.proto";
|
||||||
import "uuid.proto";
|
import "uuid.proto";
|
||||||
|
|
|
@ -2,7 +2,7 @@ syntax = "proto3";
|
||||||
|
|
||||||
import "uuid.proto";
|
import "uuid.proto";
|
||||||
|
|
||||||
option java_package = "dev.usbharu.owl.generated";
|
option java_package = "dev.usbharu.owl";
|
||||||
|
|
||||||
message Producer {
|
message Producer {
|
||||||
string name = 1;
|
string name = 1;
|
||||||
|
|
|
@ -2,7 +2,7 @@ syntax = "proto3";
|
||||||
|
|
||||||
import "google/protobuf/empty.proto";
|
import "google/protobuf/empty.proto";
|
||||||
|
|
||||||
option java_package = "dev.usbharu.owl.generated";
|
option java_package = "dev.usbharu.owl";
|
||||||
|
|
||||||
message Property{
|
message Property{
|
||||||
oneof value {
|
oneof value {
|
||||||
|
|
|
@ -4,7 +4,7 @@ import "google/protobuf/timestamp.proto";
|
||||||
|
|
||||||
import "uuid.proto";
|
import "uuid.proto";
|
||||||
|
|
||||||
option java_package = "dev.usbharu.owl.generated";
|
option java_package = "dev.usbharu.owl";
|
||||||
|
|
||||||
|
|
||||||
message PublishTask {
|
message PublishTask {
|
||||||
|
|
|
@ -3,7 +3,7 @@ import "uuid.proto";
|
||||||
import "google/protobuf/timestamp.proto";
|
import "google/protobuf/timestamp.proto";
|
||||||
import "property.proto";
|
import "property.proto";
|
||||||
|
|
||||||
option java_package = "dev.usbharu.owl.generated";
|
option java_package = "dev.usbharu.owl";
|
||||||
|
|
||||||
message ReadyRequest {
|
message ReadyRequest {
|
||||||
int32 number_of_concurrent = 1;
|
int32 number_of_concurrent = 1;
|
||||||
|
|
|
@ -3,7 +3,7 @@ import "uuid.proto";
|
||||||
import "google/protobuf/empty.proto";
|
import "google/protobuf/empty.proto";
|
||||||
import "property.proto";
|
import "property.proto";
|
||||||
|
|
||||||
option java_package = "dev.usbharu.owl.generated";
|
option java_package = "dev.usbharu.owl";
|
||||||
|
|
||||||
message TaskResult {
|
message TaskResult {
|
||||||
UUID id = 1;
|
UUID id = 1;
|
||||||
|
|
|
@ -2,7 +2,7 @@ syntax = "proto3";
|
||||||
import "uuid.proto";
|
import "uuid.proto";
|
||||||
import "task_result.proto";
|
import "task_result.proto";
|
||||||
|
|
||||||
option java_package = "dev.usbharu.owl.generated";
|
option java_package = "dev.usbharu.owl";
|
||||||
|
|
||||||
message TaskResults {
|
message TaskResults {
|
||||||
string name = 1;
|
string name = 1;
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
syntax = "proto3";
|
syntax = "proto3";
|
||||||
|
|
||||||
option java_package = "dev.usbharu.owl.generated";
|
option java_package = "dev.usbharu.owl";
|
||||||
|
|
||||||
message UUID {
|
message UUID {
|
||||||
uint64 most_significant_uuid_bits = 1;
|
uint64 most_significant_uuid_bits = 1;
|
||||||
|
|
|
@ -45,5 +45,6 @@ class ObjectPropertySerializer(private val objectMapper: ObjectMapper) : Propert
|
||||||
Class.forName(string.substringAfter("jackson:").substringBefore(":"))
|
Class.forName(string.substringAfter("jackson:").substringBefore(":"))
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -23,7 +23,4 @@ val Class<*>.allFields: List<Field>
|
||||||
superclass.allFields + declaredFields
|
superclass.allFields + declaredFields
|
||||||
} else {
|
} else {
|
||||||
declaredFields.toList()
|
declaredFields.toList()
|
||||||
}.map {
|
}.map { it.trySetAccessible();it }
|
||||||
it.trySetAccessible()
|
|
||||||
it
|
|
||||||
}
|
|
|
@ -15,12 +15,19 @@ class BooleanPropertyValue(override val value: Boolean) : PropertyValue<Boolean>
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
class BooleanPropertySerializer : PropertySerializer<Boolean> {
|
class BooleanPropertySerializer : PropertySerializer<Boolean> {
|
||||||
override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is Boolean
|
override fun isSupported(propertyValue: PropertyValue<*>): Boolean {
|
||||||
|
return propertyValue.value is Boolean
|
||||||
|
}
|
||||||
|
|
||||||
override fun isSupported(string: String): Boolean = string.startsWith("bool:")
|
override fun isSupported(string: String): Boolean {
|
||||||
|
return string.startsWith("bool:")
|
||||||
|
}
|
||||||
|
|
||||||
override fun serialize(propertyValue: PropertyValue<*>): String = "bool:" + propertyValue.value.toString()
|
override fun serialize(propertyValue: PropertyValue<*>): String {
|
||||||
|
return "bool:" + propertyValue.value.toString()
|
||||||
|
}
|
||||||
|
|
||||||
override fun deserialize(string: String): PropertyValue<Boolean> =
|
override fun deserialize(string: String): PropertyValue<Boolean> {
|
||||||
BooleanPropertyValue(string.replace("bool:", "").toBoolean())
|
return BooleanPropertyValue(string.replace("bool:", "").toBoolean())
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -23,9 +23,12 @@ package dev.usbharu.owl.common.property
|
||||||
*/
|
*/
|
||||||
open class CustomPropertySerializerFactory(private val propertySerializers: Set<PropertySerializer<*>>) :
|
open class CustomPropertySerializerFactory(private val propertySerializers: Set<PropertySerializer<*>>) :
|
||||||
PropertySerializerFactory {
|
PropertySerializerFactory {
|
||||||
override fun <T> factory(propertyValue: PropertyValue<T>): PropertySerializer<T> =
|
override fun <T> factory(propertyValue: PropertyValue<T>): PropertySerializer<T> {
|
||||||
propertySerializers.firstOrNull { it.isSupported(propertyValue) } as PropertySerializer<T>?
|
return propertySerializers.firstOrNull { it.isSupported(propertyValue) } as PropertySerializer<T>?
|
||||||
?: throw IllegalArgumentException("PropertySerializer not found: $propertyValue")
|
?: throw IllegalArgumentException("PropertySerializer not found: $propertyValue")
|
||||||
|
}
|
||||||
|
|
||||||
override fun factory(string: String): PropertySerializer<*> = propertySerializers.first { it.isSupported(string) }
|
override fun factory(string: String): PropertySerializer<*> {
|
||||||
|
return propertySerializers.first { it.isSupported(string) }
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -15,12 +15,19 @@ class DoublePropertyValue(override val value: Double) : PropertyValue<Double>()
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
class DoublePropertySerializer : PropertySerializer<Double> {
|
class DoublePropertySerializer : PropertySerializer<Double> {
|
||||||
override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is Double
|
override fun isSupported(propertyValue: PropertyValue<*>): Boolean {
|
||||||
|
return propertyValue.value is Double
|
||||||
|
}
|
||||||
|
|
||||||
override fun isSupported(string: String): Boolean = string.startsWith("double:")
|
override fun isSupported(string: String): Boolean {
|
||||||
|
return string.startsWith("double:")
|
||||||
|
}
|
||||||
|
|
||||||
override fun serialize(propertyValue: PropertyValue<*>): String = "double:" + propertyValue.value.toString()
|
override fun serialize(propertyValue: PropertyValue<*>): String {
|
||||||
|
return "double:" + propertyValue.value.toString()
|
||||||
|
}
|
||||||
|
|
||||||
override fun deserialize(string: String): PropertyValue<Double> =
|
override fun deserialize(string: String): PropertyValue<Double> {
|
||||||
DoublePropertyValue(string.replace("double:", "").toDouble())
|
return DoublePropertyValue(string.replace("double:", "").toDouble())
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -26,12 +26,19 @@ class FloatPropertyValue(override val value: Float) : PropertyValue<Float>() {
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
class FloatPropertySerializer : PropertySerializer<Float> {
|
class FloatPropertySerializer : PropertySerializer<Float> {
|
||||||
override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is Float
|
override fun isSupported(propertyValue: PropertyValue<*>): Boolean {
|
||||||
|
return propertyValue.value is Float
|
||||||
|
}
|
||||||
|
|
||||||
override fun isSupported(string: String): Boolean = string.startsWith("float:")
|
override fun isSupported(string: String): Boolean {
|
||||||
|
return string.startsWith("float:")
|
||||||
|
}
|
||||||
|
|
||||||
override fun serialize(propertyValue: PropertyValue<*>): String = "float:" + propertyValue.value.toString()
|
override fun serialize(propertyValue: PropertyValue<*>): String {
|
||||||
|
return "float:" + propertyValue.value.toString()
|
||||||
|
}
|
||||||
|
|
||||||
override fun deserialize(string: String): PropertyValue<Float> =
|
override fun deserialize(string: String): PropertyValue<Float> {
|
||||||
FloatPropertyValue(string.replace("float:", "").toFloat())
|
return FloatPropertyValue(string.replace("float:", "").toFloat())
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -31,12 +31,19 @@ class IntegerPropertyValue(override val value: Int) : PropertyValue<Int>() {
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
class IntegerPropertySerializer : PropertySerializer<Int> {
|
class IntegerPropertySerializer : PropertySerializer<Int> {
|
||||||
override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is Int
|
override fun isSupported(propertyValue: PropertyValue<*>): Boolean {
|
||||||
|
return propertyValue.value is Int
|
||||||
|
}
|
||||||
|
|
||||||
override fun isSupported(string: String): Boolean = string.startsWith("int32:")
|
override fun isSupported(string: String): Boolean {
|
||||||
|
return string.startsWith("int32:")
|
||||||
|
}
|
||||||
|
|
||||||
override fun serialize(propertyValue: PropertyValue<*>): String = "int32:" + propertyValue.value.toString()
|
override fun serialize(propertyValue: PropertyValue<*>): String {
|
||||||
|
return "int32:" + propertyValue.value.toString()
|
||||||
|
}
|
||||||
|
|
||||||
override fun deserialize(string: String): PropertyValue<Int> =
|
override fun deserialize(string: String): PropertyValue<Int> {
|
||||||
IntegerPropertyValue(string.replace("int32:", "").toInt())
|
return IntegerPropertyValue(string.replace("int32:", "").toInt())
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -27,12 +27,19 @@ class LongPropertyValue(override val value: Long) : PropertyValue<Long>() {
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
class LongPropertySerializer : PropertySerializer<Long> {
|
class LongPropertySerializer : PropertySerializer<Long> {
|
||||||
override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is Long
|
override fun isSupported(propertyValue: PropertyValue<*>): Boolean {
|
||||||
|
return propertyValue.value is Long
|
||||||
|
}
|
||||||
|
|
||||||
override fun isSupported(string: String): Boolean = string.startsWith("int64:")
|
override fun isSupported(string: String): Boolean {
|
||||||
|
return string.startsWith("int64:")
|
||||||
|
}
|
||||||
|
|
||||||
override fun serialize(propertyValue: PropertyValue<*>): String = "int64:" + propertyValue.value.toString()
|
override fun serialize(propertyValue: PropertyValue<*>): String {
|
||||||
|
return "int64:" + propertyValue.value.toString()
|
||||||
|
}
|
||||||
|
|
||||||
override fun deserialize(string: String): PropertyValue<Long> =
|
override fun deserialize(string: String): PropertyValue<Long> {
|
||||||
LongPropertyValue(string.replace("int64:", "").toLong())
|
return LongPropertyValue(string.replace("int64:", "").toLong())
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -31,5 +31,9 @@ abstract class PropertyValue<T> {
|
||||||
* プロパティの型
|
* プロパティの型
|
||||||
*/
|
*/
|
||||||
abstract val type: PropertyType
|
abstract val type: PropertyType
|
||||||
override fun toString(): String = "PropertyValue(value=$value, type=$type)"
|
override fun toString(): String {
|
||||||
|
return "PropertyValue(value=$value, type=$type)"
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
|
@ -15,11 +15,19 @@ class StringPropertyValue(override val value: String) : PropertyValue<String>()
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
class StringPropertyValueSerializer : PropertySerializer<String> {
|
class StringPropertyValueSerializer : PropertySerializer<String> {
|
||||||
override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is String
|
override fun isSupported(propertyValue: PropertyValue<*>): Boolean {
|
||||||
|
return propertyValue.value is String
|
||||||
|
}
|
||||||
|
|
||||||
override fun isSupported(string: String): Boolean = string.startsWith("str:")
|
override fun isSupported(string: String): Boolean {
|
||||||
|
return string.startsWith("str:")
|
||||||
|
}
|
||||||
|
|
||||||
override fun serialize(propertyValue: PropertyValue<*>): String = "str:" + propertyValue.value
|
override fun serialize(propertyValue: PropertyValue<*>): String {
|
||||||
|
return "str:" + propertyValue.value
|
||||||
|
}
|
||||||
|
|
||||||
override fun deserialize(string: String): PropertyValue<String> = StringPropertyValue(string.replace("str:", ""))
|
override fun deserialize(string: String): PropertyValue<String> {
|
||||||
|
return StringPropertyValue(string.replace("str:", ""))
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -13,4 +13,5 @@ import kotlin.math.roundToLong
|
||||||
class ExponentialRetryPolicy(private val firstRetrySeconds: Int = 30) : RetryPolicy {
|
class ExponentialRetryPolicy(private val firstRetrySeconds: Int = 30) : RetryPolicy {
|
||||||
override fun nextRetry(now: Instant, attempt: Int): Instant =
|
override fun nextRetry(now: Instant, attempt: Int): Instant =
|
||||||
now.plusSeconds(firstRetrySeconds.times((2.0).pow(attempt).roundToLong()) - firstRetrySeconds)
|
now.plusSeconds(firstRetrySeconds.times((2.0).pow(attempt).roundToLong()) - firstRetrySeconds)
|
||||||
|
|
||||||
}
|
}
|
|
@ -16,6 +16,7 @@
|
||||||
|
|
||||||
package dev.usbharu.owl.common.retry
|
package dev.usbharu.owl.common.retry
|
||||||
|
|
||||||
|
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
|
|
||||||
interface RetryPolicyFactory {
|
interface RetryPolicyFactory {
|
||||||
|
@ -23,7 +24,9 @@ interface RetryPolicyFactory {
|
||||||
}
|
}
|
||||||
|
|
||||||
class DefaultRetryPolicyFactory(private val map: Map<String, RetryPolicy>) : RetryPolicyFactory {
|
class DefaultRetryPolicyFactory(private val map: Map<String, RetryPolicy>) : RetryPolicyFactory {
|
||||||
override fun factory(name: String): RetryPolicy = map[name] ?: throwException(name)
|
override fun factory(name: String): RetryPolicy {
|
||||||
|
return map[name] ?: throwException(name)
|
||||||
|
}
|
||||||
|
|
||||||
private fun throwException(name: String): Nothing {
|
private fun throwException(name: String): Nothing {
|
||||||
logger.warn("RetryPolicy not found. name: {}", name)
|
logger.warn("RetryPolicy not found. name: {}", name)
|
||||||
|
|
|
@ -36,4 +36,6 @@ class PropertyDefinition(val map: Map<String, PropertyType>) : Map<String, Prope
|
||||||
map.map { it.key + it.value.name }.joinToString("").map { hash *= it.code * 31 }
|
map.map { it.key + it.value.name }.joinToString("").map { hash *= it.code * 31 }
|
||||||
return hash
|
return hash
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -107,6 +107,7 @@ interface TaskDefinition<T : Task> {
|
||||||
* @return デシリアライズされたタスク
|
* @return デシリアライズされたタスク
|
||||||
*/
|
*/
|
||||||
fun deserialize(value: Map<String, PropertyValue<*>>): T {
|
fun deserialize(value: Map<String, PropertyValue<*>>): T {
|
||||||
|
|
||||||
val task = try {
|
val task = try {
|
||||||
type.getDeclaredConstructor().newInstance()
|
type.getDeclaredConstructor().newInstance()
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
|
|
|
@ -33,7 +33,7 @@ protobuf {
|
||||||
artifact = libs.protoc.gen.grpc.java.get().toString()
|
artifact = libs.protoc.gen.grpc.java.get().toString()
|
||||||
}
|
}
|
||||||
create("grpckt") {
|
create("grpckt") {
|
||||||
artifact = libs.protoc.gen.grpc.kotlin.get().toString() + ":jdk8@jar"
|
artifact = libs.protoc.gen.grpc.kotlin.get().toString() + "jdk8@jar"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
generateProtoTasks {
|
generateProtoTasks {
|
||||||
|
|
|
@ -29,4 +29,5 @@ abstract class AbstractTaskRunner<T : Task, D : TaskDefinition<T>>(private val t
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract suspend fun typedRun(typedParam: T, taskRequest: TaskRequest): TaskResult
|
abstract suspend fun typedRun(typedParam: T, taskRequest: TaskRequest): TaskResult
|
||||||
|
|
||||||
}
|
}
|
|
@ -16,10 +16,10 @@
|
||||||
|
|
||||||
package dev.usbharu.owl.consumer
|
package dev.usbharu.owl.consumer
|
||||||
|
|
||||||
|
import dev.usbharu.owl.*
|
||||||
|
import dev.usbharu.owl.Uuid.UUID
|
||||||
import dev.usbharu.owl.common.property.PropertySerializeUtils
|
import dev.usbharu.owl.common.property.PropertySerializeUtils
|
||||||
import dev.usbharu.owl.common.property.PropertySerializerFactory
|
import dev.usbharu.owl.common.property.PropertySerializerFactory
|
||||||
import dev.usbharu.owl.generated.*
|
|
||||||
import dev.usbharu.owl.generated.Uuid.UUID
|
|
||||||
import kotlinx.coroutines.*
|
import kotlinx.coroutines.*
|
||||||
import kotlinx.coroutines.flow.*
|
import kotlinx.coroutines.flow.*
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
|
@ -66,13 +66,11 @@ class Consumer(
|
||||||
suspend fun init(name: String, hostname: String) {
|
suspend fun init(name: String, hostname: String) {
|
||||||
logger.info("Initialize Consumer name: {} hostname: {}", name, hostname)
|
logger.info("Initialize Consumer name: {} hostname: {}", name, hostname)
|
||||||
logger.debug("Registered Tasks: {}", runnerMap.keys)
|
logger.debug("Registered Tasks: {}", runnerMap.keys)
|
||||||
consumerId = subscribeTaskStub.subscribeTask(
|
consumerId = subscribeTaskStub.subscribeTask(subscribeTaskRequest {
|
||||||
subscribeTaskRequest {
|
this.name = name
|
||||||
this.name = name
|
this.hostname = hostname
|
||||||
this.hostname = hostname
|
this.tasks.addAll(runnerMap.keys)
|
||||||
this.tasks.addAll(runnerMap.keys)
|
}).id
|
||||||
}
|
|
||||||
).id
|
|
||||||
logger.info("Success initialize consumer. ConsumerID: {}", consumerId)
|
logger.info("Success initialize consumer. ConsumerID: {}", consumerId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -86,92 +84,78 @@ class Consumer(
|
||||||
while (isActive) {
|
while (isActive) {
|
||||||
try {
|
try {
|
||||||
taskResultStub
|
taskResultStub
|
||||||
.tasKResult(
|
.tasKResult(flow {
|
||||||
flow {
|
assignmentTaskStub
|
||||||
assignmentTaskStub
|
.ready(flow {
|
||||||
.ready(
|
requestTask()
|
||||||
flow {
|
}).onEach {
|
||||||
requestTask()
|
logger.info("Start Task name: {} id: {}", it.name, it.id)
|
||||||
}
|
processing.update { it + 1 }
|
||||||
).onEach {
|
|
||||||
logger.info("Start Task name: {} id: {}", it.name, it.id)
|
|
||||||
processing.update { it + 1 }
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
val taskResult = runnerMap.getValue(it.name).run(
|
val taskResult = runnerMap.getValue(it.name).run(
|
||||||
TaskRequest(
|
TaskRequest(
|
||||||
it.name,
|
it.name,
|
||||||
java.util.UUID(
|
java.util.UUID(
|
||||||
it.id.mostSignificantUuidBits,
|
it.id.mostSignificantUuidBits,
|
||||||
it.id.leastSignificantUuidBits
|
it.id.leastSignificantUuidBits
|
||||||
),
|
),
|
||||||
it.attempt,
|
it.attempt,
|
||||||
Instant.ofEpochSecond(
|
Instant.ofEpochSecond(it.queuedAt.seconds, it.queuedAt.nanos.toLong()),
|
||||||
it.queuedAt.seconds,
|
PropertySerializeUtils.deserialize(
|
||||||
it.queuedAt.nanos.toLong()
|
propertySerializerFactory,
|
||||||
),
|
it.propertiesMap
|
||||||
PropertySerializeUtils.deserialize(
|
|
||||||
propertySerializerFactory,
|
|
||||||
it.propertiesMap
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
)
|
||||||
|
|
||||||
emit(
|
emit(taskResult {
|
||||||
taskResult {
|
this.success = taskResult.success
|
||||||
this.success = taskResult.success
|
this.attempt = it.attempt
|
||||||
this.attempt = it.attempt
|
this.id = it.id
|
||||||
this.id = it.id
|
this.result.putAll(
|
||||||
this.result.putAll(
|
PropertySerializeUtils.serialize(
|
||||||
PropertySerializeUtils.serialize(
|
propertySerializerFactory, taskResult.result
|
||||||
propertySerializerFactory,
|
)
|
||||||
taskResult.result
|
|
||||||
)
|
|
||||||
)
|
|
||||||
this.message = taskResult.message
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
logger.info(
|
this.message = taskResult.message
|
||||||
"Success execute task. name: {} success: {}",
|
})
|
||||||
it.name,
|
logger.info(
|
||||||
taskResult.success
|
"Success execute task. name: {} success: {}",
|
||||||
)
|
it.name,
|
||||||
logger.debug("TRACE RESULT {}", taskResult)
|
taskResult.success
|
||||||
} catch (e: CancellationException) {
|
)
|
||||||
logger.warn("Cancelled execute task.", e)
|
logger.debug("TRACE RESULT {}", taskResult)
|
||||||
emit(
|
} catch (e: CancellationException) {
|
||||||
taskResult {
|
logger.warn("Cancelled execute task.", e)
|
||||||
this.success = false
|
emit(taskResult {
|
||||||
this.attempt = it.attempt
|
this.success = false
|
||||||
this.id = it.id
|
this.attempt = it.attempt
|
||||||
this.message = e.localizedMessage
|
this.id = it.id
|
||||||
}
|
this.message = e.localizedMessage
|
||||||
)
|
})
|
||||||
throw e
|
throw e
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
logger.warn("Failed execute task. name: {} id: {}", it.name, it.id, e)
|
logger.warn("Failed execute task. name: {} id: {}", it.name, it.id, e)
|
||||||
emit(
|
emit(taskResult {
|
||||||
taskResult {
|
this.success = false
|
||||||
this.success = false
|
this.attempt = it.attempt
|
||||||
this.attempt = it.attempt
|
this.id = it.id
|
||||||
this.id = it.id
|
this.message = e.localizedMessage
|
||||||
this.message = e.localizedMessage
|
})
|
||||||
}
|
} finally {
|
||||||
)
|
logger.debug(" Task name: {} id: {}", it.name, it.id)
|
||||||
} finally {
|
processing.update { it - 1 }
|
||||||
logger.debug(" Task name: {} id: {}", it.name, it.id)
|
concurrent.update {
|
||||||
processing.update { it - 1 }
|
if (it < 64) {
|
||||||
concurrent.update {
|
it + 1
|
||||||
if (it < 64) {
|
} else {
|
||||||
it + 1
|
64
|
||||||
} else {
|
|
||||||
64
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}.flowOn(Dispatchers.Default).collect()
|
}
|
||||||
}
|
}.flowOn(Dispatchers.Default).collect()
|
||||||
)
|
})
|
||||||
} catch (e: CancellationException) {
|
} catch (e: CancellationException) {
|
||||||
throw e
|
throw e
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
|
@ -187,15 +171,14 @@ class Consumer(
|
||||||
while (coroutineScope.isActive) {
|
while (coroutineScope.isActive) {
|
||||||
val andSet = concurrent.getAndUpdate { 0 }
|
val andSet = concurrent.getAndUpdate { 0 }
|
||||||
|
|
||||||
|
|
||||||
if (andSet != 0) {
|
if (andSet != 0) {
|
||||||
logger.debug("Request {} tasks.", andSet)
|
logger.debug("Request {} tasks.", andSet)
|
||||||
try {
|
try {
|
||||||
emit(
|
emit(readyRequest {
|
||||||
readyRequest {
|
this.consumerId = this@Consumer.consumerId
|
||||||
this.consumerId = this@Consumer.consumerId
|
this.numberOfConcurrent = andSet
|
||||||
this.numberOfConcurrent = andSet
|
})
|
||||||
}
|
|
||||||
)
|
|
||||||
} catch (e: CancellationException) {
|
} catch (e: CancellationException) {
|
||||||
throw e
|
throw e
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
|
|
|
@ -25,4 +25,5 @@ fun main() {
|
||||||
standaloneConsumer.init()
|
standaloneConsumer.init()
|
||||||
standaloneConsumer.start()
|
standaloneConsumer.start()
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -16,11 +16,11 @@
|
||||||
|
|
||||||
package dev.usbharu.owl.consumer
|
package dev.usbharu.owl.consumer
|
||||||
|
|
||||||
|
import dev.usbharu.owl.AssignmentTaskServiceGrpcKt
|
||||||
|
import dev.usbharu.owl.SubscribeTaskServiceGrpcKt
|
||||||
|
import dev.usbharu.owl.TaskResultServiceGrpcKt
|
||||||
import dev.usbharu.owl.common.property.CustomPropertySerializerFactory
|
import dev.usbharu.owl.common.property.CustomPropertySerializerFactory
|
||||||
import dev.usbharu.owl.common.property.PropertySerializerFactory
|
import dev.usbharu.owl.common.property.PropertySerializerFactory
|
||||||
import dev.usbharu.owl.generated.AssignmentTaskServiceGrpcKt
|
|
||||||
import dev.usbharu.owl.generated.SubscribeTaskServiceGrpcKt
|
|
||||||
import dev.usbharu.owl.generated.TaskResultServiceGrpcKt
|
|
||||||
import io.grpc.ManagedChannelBuilder
|
import io.grpc.ManagedChannelBuilder
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
|
|
||||||
|
@ -90,11 +90,9 @@ class StandaloneConsumer(
|
||||||
*/
|
*/
|
||||||
suspend fun start() {
|
suspend fun start() {
|
||||||
consumer.start()
|
consumer.start()
|
||||||
Runtime.getRuntime().addShutdownHook(
|
Runtime.getRuntime().addShutdownHook(Thread {
|
||||||
Thread {
|
consumer.stop()
|
||||||
consumer.stop()
|
})
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -104,4 +102,5 @@ class StandaloneConsumer(
|
||||||
fun stop() {
|
fun stop() {
|
||||||
consumer.stop()
|
consumer.stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -30,9 +30,9 @@ import java.util.*
|
||||||
* @property properties タスクに渡されたパラメータ
|
* @property properties タスクに渡されたパラメータ
|
||||||
*/
|
*/
|
||||||
data class TaskRequest(
|
data class TaskRequest(
|
||||||
val name: String,
|
val name:String,
|
||||||
val id: UUID,
|
val id:UUID,
|
||||||
val attempt: Int,
|
val attempt:Int,
|
||||||
val queuedAt: Instant,
|
val queuedAt: Instant,
|
||||||
val properties: Map<String, PropertyValue<*>>
|
val properties:Map<String,PropertyValue<*>>
|
||||||
)
|
)
|
||||||
|
|
|
@ -34,7 +34,7 @@ protobuf {
|
||||||
artifact = libs.protoc.gen.grpc.java.get().toString()
|
artifact = libs.protoc.gen.grpc.java.get().toString()
|
||||||
}
|
}
|
||||||
create("grpckt") {
|
create("grpckt") {
|
||||||
artifact = libs.protoc.gen.grpc.kotlin.get().toString() + ":jdk8@jar"
|
artifact = libs.protoc.gen.grpc.kotlin.get().toString() + "jdk8@jar"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
generateProtoTasks {
|
generateProtoTasks {
|
||||||
|
|
|
@ -17,12 +17,12 @@
|
||||||
package dev.usbharu.owl.producer.defaultimpl
|
package dev.usbharu.owl.producer.defaultimpl
|
||||||
|
|
||||||
import com.google.protobuf.timestamp
|
import com.google.protobuf.timestamp
|
||||||
|
import dev.usbharu.owl.*
|
||||||
|
import dev.usbharu.owl.Uuid.UUID
|
||||||
import dev.usbharu.owl.common.property.PropertySerializeUtils
|
import dev.usbharu.owl.common.property.PropertySerializeUtils
|
||||||
import dev.usbharu.owl.common.task.PublishedTask
|
import dev.usbharu.owl.common.task.PublishedTask
|
||||||
import dev.usbharu.owl.common.task.Task
|
import dev.usbharu.owl.common.task.Task
|
||||||
import dev.usbharu.owl.common.task.TaskDefinition
|
import dev.usbharu.owl.common.task.TaskDefinition
|
||||||
import dev.usbharu.owl.generated.*
|
|
||||||
import dev.usbharu.owl.generated.Uuid.UUID
|
|
||||||
import dev.usbharu.owl.producer.api.OwlProducer
|
import dev.usbharu.owl.producer.api.OwlProducer
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
|
|
||||||
|
@ -36,12 +36,10 @@ class DefaultOwlProducer(private val defaultOwlProducerConfig: DefaultOwlProduce
|
||||||
override suspend fun start() {
|
override suspend fun start() {
|
||||||
producerServiceCoroutineStub =
|
producerServiceCoroutineStub =
|
||||||
ProducerServiceGrpcKt.ProducerServiceCoroutineStub(defaultOwlProducerConfig.channel)
|
ProducerServiceGrpcKt.ProducerServiceCoroutineStub(defaultOwlProducerConfig.channel)
|
||||||
producerId = producerServiceCoroutineStub.registerProducer(
|
producerId = producerServiceCoroutineStub.registerProducer(producer {
|
||||||
producer {
|
this.name = defaultOwlProducerConfig.name
|
||||||
this.name = defaultOwlProducerConfig.name
|
this.hostname = defaultOwlProducerConfig.hostname
|
||||||
this.hostname = defaultOwlProducerConfig.hostname
|
}).id
|
||||||
}
|
|
||||||
).id
|
|
||||||
|
|
||||||
defineTaskServiceCoroutineStub =
|
defineTaskServiceCoroutineStub =
|
||||||
DefinitionTaskServiceGrpcKt.DefinitionTaskServiceCoroutineStub(defaultOwlProducerConfig.channel)
|
DefinitionTaskServiceGrpcKt.DefinitionTaskServiceCoroutineStub(defaultOwlProducerConfig.channel)
|
||||||
|
@ -50,18 +48,17 @@ class DefaultOwlProducer(private val defaultOwlProducerConfig: DefaultOwlProduce
|
||||||
TaskPublishServiceGrpcKt.TaskPublishServiceCoroutineStub(defaultOwlProducerConfig.channel)
|
TaskPublishServiceGrpcKt.TaskPublishServiceCoroutineStub(defaultOwlProducerConfig.channel)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
override suspend fun <T : Task> registerTask(taskDefinition: TaskDefinition<T>) {
|
override suspend fun <T : Task> registerTask(taskDefinition: TaskDefinition<T>) {
|
||||||
defineTaskServiceCoroutineStub.register(
|
defineTaskServiceCoroutineStub.register(taskDefinition {
|
||||||
taskDefinition {
|
this.producerId = this@DefaultOwlProducer.producerId
|
||||||
this.producerId = this@DefaultOwlProducer.producerId
|
this.name = taskDefinition.name
|
||||||
this.name = taskDefinition.name
|
this.maxRetry = taskDefinition.maxRetry
|
||||||
this.maxRetry = taskDefinition.maxRetry
|
this.priority = taskDefinition.priority
|
||||||
this.priority = taskDefinition.priority
|
this.retryPolicy = taskDefinition.retryPolicy
|
||||||
this.retryPolicy = taskDefinition.retryPolicy
|
this.timeoutMilli = taskDefinition.timeoutMilli
|
||||||
this.timeoutMilli = taskDefinition.timeoutMilli
|
this.propertyDefinitionHash = taskDefinition.propertyDefinition.hash()
|
||||||
this.propertyDefinitionHash = taskDefinition.propertyDefinition.hash()
|
})
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun <T : Task> publishTask(task: T): PublishedTask<T> {
|
override suspend fun <T : Task> publishTask(task: T): PublishedTask<T> {
|
||||||
|
@ -72,7 +69,7 @@ class DefaultOwlProducer(private val defaultOwlProducerConfig: DefaultOwlProduce
|
||||||
)
|
)
|
||||||
val now = Instant.now()
|
val now = Instant.now()
|
||||||
val publishTask = taskPublishServiceCoroutineStub.publishTask(
|
val publishTask = taskPublishServiceCoroutineStub.publishTask(
|
||||||
dev.usbharu.owl.generated.publishTask {
|
dev.usbharu.owl.publishTask {
|
||||||
this.producerId = this@DefaultOwlProducer.producerId
|
this.producerId = this@DefaultOwlProducer.producerId
|
||||||
|
|
||||||
this.publishedAt = timestamp {
|
this.publishedAt = timestamp {
|
||||||
|
|
|
@ -96,6 +96,7 @@ class EmbeddedOwlProducer(
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun <T : Task> publishTask(task: T): PublishedTask<T> {
|
override suspend fun <T : Task> publishTask(task: T): PublishedTask<T> {
|
||||||
|
|
||||||
val taskDefinition = taskMap.getValue(task::class.java) as TaskDefinition<T>
|
val taskDefinition = taskMap.getValue(task::class.java) as TaskDefinition<T>
|
||||||
|
|
||||||
val publishTask = application.get<TaskPublishService>().publishTask(
|
val publishTask = application.get<TaskPublishService>().publishTask(
|
||||||
|
|
|
@ -46,6 +46,7 @@ class EmbeddedOwlProducerBuilder : OwlProducerBuilder<EmbeddedOwlProducer, Embed
|
||||||
override fun apply(owlProducerConfig: EmbeddedOwlProducerConfig) {
|
override fun apply(owlProducerConfig: EmbeddedOwlProducerConfig) {
|
||||||
this.config = owlProducerConfig
|
this.config = owlProducerConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
val EMBEDDED by lazy { EmbeddedOwlProducerBuilder() }
|
val EMBEDDED by lazy { EmbeddedOwlProducerBuilder() }
|
|
@ -21,7 +21,6 @@ rootProject.name = "hideout"
|
||||||
|
|
||||||
includeBuild("hideout-core")
|
includeBuild("hideout-core")
|
||||||
includeBuild("hideout-mastodon")
|
includeBuild("hideout-mastodon")
|
||||||
includeBuild("hideout-activitypub")
|
|
||||||
|
|
||||||
dependencyResolutionManagement {
|
dependencyResolutionManagement {
|
||||||
repositories {
|
repositories {
|
||||||
|
|
Loading…
Reference in New Issue