style: fix lint

This commit is contained in:
usbharu 2024-09-17 22:02:29 +09:00
parent 1a2142e83b
commit 4bc9e3abd2
Signed by: usbharu
GPG Key ID: 95CBCF7046307B77
92 changed files with 413 additions and 461 deletions

View File

@ -53,7 +53,6 @@ subprojects {
autoCorrect = true autoCorrect = true
} }
project.gradle.taskGraph.whenReady { project.gradle.taskGraph.whenReady {
if (this.hasTask(":koverGenerateArtifact")) { if (this.hasTask(":koverGenerateArtifact")) {
val task = this.allTasks.find { println(it.name);it.name == "test" } val task = this.allTasks.find { println(it.name);it.name == "test" }
@ -61,8 +60,23 @@ subprojects {
verificationTask.ignoreFailures = true verificationTask.ignoreFailures = true
} }
} }
tasks.test { tasks {
useJUnitPlatform() withType<io.gitlab.arturbosch.detekt.Detekt> {
exclude("**/generated/**")
setSource("src/main/kotlin")
exclude("build/")
configureEach {
exclude("**/org/koin/ksp/generated/**", "**/generated/**")
}
}
withType<io.gitlab.arturbosch.detekt.DetektCreateBaselineTask>() {
configureEach {
exclude("**/org/koin/ksp/generated/**", "**/generated/**")
}
}
withType<Test> {
useJUnitPlatform()
}
} }
publishing { publishing {

View File

@ -32,7 +32,6 @@ import org.koin.dsl.module
class MongoModuleContext : ModuleContext { class MongoModuleContext : ModuleContext {
override fun module(): Module { override fun module(): Module {
return module { return module {
single { single {
val clientSettings = val clientSettings =
@ -47,7 +46,6 @@ class MongoModuleContext : ModuleContext {
) )
.uuidRepresentation(UuidRepresentation.STANDARD).build() .uuidRepresentation(UuidRepresentation.STANDARD).build()
MongoClient.create(clientSettings) MongoClient.create(clientSettings)
.getDatabase(System.getProperty("owl.broker.mongo.database", "mongo-test")) .getDatabase(System.getProperty("owl.broker.mongo.database", "mongo-test"))
} }

View File

@ -33,7 +33,11 @@ class MongodbConsumerRepository(database: MongoDatabase) : ConsumerRepository {
private val collection = database.getCollection<ConsumerMongodb>("consumers") private val collection = database.getCollection<ConsumerMongodb>("consumers")
override suspend fun save(consumer: Consumer): Consumer = withContext(Dispatchers.IO) { override suspend fun save(consumer: Consumer): Consumer = withContext(Dispatchers.IO) {
collection.replaceOne(Filters.eq("_id", consumer.id.toString()), ConsumerMongodb.of(consumer), ReplaceOptions().upsert(true)) collection.replaceOne(
Filters.eq("_id", consumer.id.toString()),
ConsumerMongodb.of(consumer),
ReplaceOptions().upsert(true)
)
return@withContext consumer return@withContext consumer
} }
@ -49,15 +53,19 @@ data class ConsumerMongodb(
val name: String, val name: String,
val hostname: String, val hostname: String,
val tasks: List<String> val tasks: List<String>
){ ) {
fun toConsumer():Consumer{ fun toConsumer(): Consumer {
return Consumer( return Consumer(
UUID.fromString(id), name, hostname, tasks UUID.fromString(id),
name,
hostname,
tasks
) )
} }
companion object{
fun of(consumer: Consumer):ConsumerMongodb{ companion object {
fun of(consumer: Consumer): ConsumerMongodb {
return ConsumerMongodb( return ConsumerMongodb(
consumer.id.toString(), consumer.id.toString(),
consumer.name, consumer.name,

View File

@ -48,7 +48,8 @@ class MongodbQueuedTaskRepository(
override suspend fun save(queuedTask: QueuedTask): QueuedTask { override suspend fun save(queuedTask: QueuedTask): QueuedTask {
withContext(Dispatchers.IO) { withContext(Dispatchers.IO) {
collection.replaceOne( collection.replaceOne(
eq("_id", queuedTask.task.id.toString()), QueuedTaskMongodb.of(propertySerializerFactory, queuedTask), eq("_id", queuedTask.task.id.toString()),
QueuedTaskMongodb.of(propertySerializerFactory, queuedTask),
ReplaceOptions().upsert(true) ReplaceOptions().upsert(true)
) )
} }
@ -57,7 +58,6 @@ class MongodbQueuedTaskRepository(
override suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id: UUID, update: QueuedTask): QueuedTask { override suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id: UUID, update: QueuedTask): QueuedTask {
return withContext(Dispatchers.IO) { return withContext(Dispatchers.IO) {
val findOneAndUpdate = collection.findOneAndUpdate( val findOneAndUpdate = collection.findOneAndUpdate(
and( and(
eq("_id", id.toString()), eq("_id", id.toString()),
@ -108,7 +108,7 @@ data class QueuedTaskMongodb(
val task: TaskMongodb, val task: TaskMongodb,
val attempt: Int, val attempt: Int,
val queuedAt: Instant, val queuedAt: Instant,
val priority:Int, val priority: Int,
val isActive: Boolean, val isActive: Boolean,
val timeoutAt: Instant?, val timeoutAt: Instant?,
val assignedConsumer: String?, val assignedConsumer: String?,
@ -155,14 +155,14 @@ data class QueuedTaskMongodb(
companion object { companion object {
fun of(propertySerializerFactory: PropertySerializerFactory, task: Task): TaskMongodb { fun of(propertySerializerFactory: PropertySerializerFactory, task: Task): TaskMongodb {
return TaskMongodb( return TaskMongodb(
task.name, name = task.name,
task.id.toString(), id = task.id.toString(),
task.publishProducerId.toString(), publishProducerId = task.publishProducerId.toString(),
task.publishedAt, publishedAt = task.publishedAt,
task.nextRetry, nextRetry = task.nextRetry,
task.completedAt, completedAt = task.completedAt,
task.attempt, attempt = task.attempt,
PropertySerializeUtils.serialize(propertySerializerFactory, task.properties) properties = PropertySerializeUtils.serialize(propertySerializerFactory, task.properties)
) )
} }
} }
@ -171,15 +171,15 @@ data class QueuedTaskMongodb(
companion object { companion object {
fun of(propertySerializerFactory: PropertySerializerFactory, queuedTask: QueuedTask): QueuedTaskMongodb { fun of(propertySerializerFactory: PropertySerializerFactory, queuedTask: QueuedTask): QueuedTaskMongodb {
return QueuedTaskMongodb( return QueuedTaskMongodb(
queuedTask.task.id.toString(), id = queuedTask.task.id.toString(),
TaskMongodb.of(propertySerializerFactory, queuedTask.task), task = TaskMongodb.of(propertySerializerFactory, queuedTask.task),
queuedTask.attempt, attempt = queuedTask.attempt,
queuedTask.queuedAt, queuedAt = queuedTask.queuedAt,
queuedTask.priority, priority = queuedTask.priority,
queuedTask.isActive, isActive = queuedTask.isActive,
queuedTask.timeoutAt, timeoutAt = queuedTask.timeoutAt,
queuedTask.assignedConsumer?.toString(), assignedConsumer = queuedTask.assignedConsumer?.toString(),
queuedTask.assignedAt assignedAt = queuedTask.assignedAt
) )
} }
} }

View File

@ -41,7 +41,7 @@ class MongodbTaskDefinitionRepository(database: MongoDatabase) : TaskDefinitionR
} }
override suspend fun deleteByName(name: String): Unit = withContext(Dispatchers.IO) { override suspend fun deleteByName(name: String): Unit = withContext(Dispatchers.IO) {
collection.deleteOne(Filters.eq("_id",name)) collection.deleteOne(Filters.eq("_id", name))
} }
override suspend fun findByName(name: String): TaskDefinition? = withContext(Dispatchers.IO) { override suspend fun findByName(name: String): TaskDefinition? = withContext(Dispatchers.IO) {

View File

@ -36,27 +36,29 @@ import org.bson.codecs.pojo.annotations.BsonRepresentation
import java.time.Instant import java.time.Instant
import java.util.* import java.util.*
class MongodbTaskRepository(database: MongoDatabase, private val propertySerializerFactory: PropertySerializerFactory) : class MongodbTaskRepository(database: MongoDatabase, private val propertySerializerFactory: PropertySerializerFactory) :
TaskRepository { TaskRepository {
private val collection = database.getCollection<TaskMongodb>("tasks") private val collection = database.getCollection<TaskMongodb>("tasks")
override suspend fun save(task: Task): Task = withContext(Dispatchers.IO) { override suspend fun save(task: Task): Task = withContext(Dispatchers.IO) {
collection.replaceOne( collection.replaceOne(
Filters.eq("_id", task.id.toString()), TaskMongodb.of(propertySerializerFactory, task), Filters.eq("_id", task.id.toString()),
TaskMongodb.of(propertySerializerFactory, task),
ReplaceOptions().upsert(true) ReplaceOptions().upsert(true)
) )
return@withContext task return@withContext task
} }
override suspend fun saveAll(tasks: List<Task>): Unit = withContext(Dispatchers.IO) { override suspend fun saveAll(tasks: List<Task>): Unit = withContext(Dispatchers.IO) {
collection.bulkWrite(tasks.map { collection.bulkWrite(
ReplaceOneModel( tasks.map {
Filters.eq(it.id.toString()), ReplaceOneModel(
TaskMongodb.of(propertySerializerFactory, it), Filters.eq(it.id.toString()),
ReplaceOptions().upsert(true) TaskMongodb.of(propertySerializerFactory, it),
) ReplaceOptions().upsert(true)
}) )
}
)
} }
override fun findByNextRetryBeforeAndCompletedAtIsNull(timestamp: Instant): Flow<Task> { override fun findByNextRetryBeforeAndCompletedAtIsNull(timestamp: Instant): Flow<Task> {
@ -75,12 +77,13 @@ class MongodbTaskRepository(database: MongoDatabase, private val propertySeriali
override suspend fun findByIdAndUpdate(id: UUID, task: Task) { override suspend fun findByIdAndUpdate(id: UUID, task: Task) {
collection.replaceOne( collection.replaceOne(
Filters.eq("_id", task.id.toString()), TaskMongodb.of(propertySerializerFactory, task), Filters.eq("_id", task.id.toString()),
TaskMongodb.of(propertySerializerFactory, task),
ReplaceOptions().upsert(false) ReplaceOptions().upsert(false)
) )
} }
override suspend fun findByPublishProducerIdAndCompletedAtIsNotNull(publishProducerId: UUID): Flow<Task> { override fun findByPublishProducerIdAndCompletedAtIsNotNull(publishProducerId: UUID): Flow<Task> {
return collection return collection
.find(Filters.eq(TaskMongodb::publishProducerId.name, publishProducerId.toString())) .find(Filters.eq(TaskMongodb::publishProducerId.name, publishProducerId.toString()))
.map { it.toTask(propertySerializerFactory) } .map { it.toTask(propertySerializerFactory) }
@ -116,14 +119,14 @@ data class TaskMongodb(
companion object { companion object {
fun of(propertySerializerFactory: PropertySerializerFactory, task: Task): TaskMongodb { fun of(propertySerializerFactory: PropertySerializerFactory, task: Task): TaskMongodb {
return TaskMongodb( return TaskMongodb(
task.name, name = task.name,
task.id.toString(), id = task.id.toString(),
task.publishProducerId.toString(), publishProducerId = task.publishProducerId.toString(),
task.publishedAt, publishedAt = task.publishedAt,
task.nextRetry, nextRetry = task.nextRetry,
task.completedAt, completedAt = task.completedAt,
task.attempt, attempt = task.attempt,
PropertySerializeUtils.serialize(propertySerializerFactory, task.properties) properties = PropertySerializeUtils.serialize(propertySerializerFactory, task.properties)
) )
} }
} }

View File

@ -41,14 +41,17 @@ class MongodbTaskResultRepository(
private val collection = database.getCollection<TaskResultMongodb>("task_results") private val collection = database.getCollection<TaskResultMongodb>("task_results")
override suspend fun save(taskResult: TaskResult): TaskResult = withContext(Dispatchers.IO) { override suspend fun save(taskResult: TaskResult): TaskResult = withContext(Dispatchers.IO) {
collection.replaceOne( collection.replaceOne(
Filters.eq(taskResult.id.toString()), TaskResultMongodb.of(propertySerializerFactory, taskResult), Filters.eq(taskResult.id.toString()),
TaskResultMongodb.of(propertySerializerFactory, taskResult),
ReplaceOptions().upsert(true) ReplaceOptions().upsert(true)
) )
return@withContext taskResult return@withContext taskResult
} }
override fun findByTaskId(id: UUID): Flow<TaskResult> { override fun findByTaskId(id: UUID): Flow<TaskResult> {
return collection.find(Filters.eq(id.toString())).map { it.toTaskResult(propertySerializerFactory) }.flowOn(Dispatchers.IO) return collection.find(
Filters.eq(id.toString())
).map { it.toTaskResult(propertySerializerFactory) }.flowOn(Dispatchers.IO)
} }
} }
@ -65,27 +68,25 @@ data class TaskResultMongodb(
fun toTaskResult(propertySerializerFactory: PropertySerializerFactory): TaskResult { fun toTaskResult(propertySerializerFactory: PropertySerializerFactory): TaskResult {
return TaskResult( return TaskResult(
UUID.fromString(id), id = UUID.fromString(id),
UUID.fromString(taskId), taskId = UUID.fromString(taskId),
success, success = success,
attempt, attempt = attempt,
PropertySerializeUtils.deserialize(propertySerializerFactory, result), result = PropertySerializeUtils.deserialize(propertySerializerFactory, result),
message message = message
) )
} }
companion object { companion object {
fun of(propertySerializerFactory: PropertySerializerFactory, taskResult: TaskResult): TaskResultMongodb { fun of(propertySerializerFactory: PropertySerializerFactory, taskResult: TaskResult): TaskResultMongodb {
return TaskResultMongodb( return TaskResultMongodb(
taskResult.id.toString(), id = taskResult.id.toString(),
taskResult.taskId.toString(), taskId = taskResult.taskId.toString(),
taskResult.success, success = taskResult.success,
taskResult.attempt, attempt = taskResult.attempt,
PropertySerializeUtils.serialize(propertySerializerFactory, taskResult.result), result = PropertySerializeUtils.serialize(propertySerializerFactory, taskResult.result),
taskResult.message message = taskResult.message
) )
} }
} }
} }

View File

@ -90,7 +90,6 @@ fun main() {
logger.info("Use module name: {}", moduleContext) logger.info("Use module name: {}", moduleContext)
val koin = startKoin { val koin = startKoin {
printLogger() printLogger()
@ -98,7 +97,6 @@ fun main() {
single<RetryPolicyFactory> { single<RetryPolicyFactory> {
DefaultRetryPolicyFactory(mapOf("" to ExponentialRetryPolicy())) DefaultRetryPolicyFactory(mapOf("" to ExponentialRetryPolicy()))
} }
} }
modules(mainModule, module, moduleContext.module()) modules(mainModule, module, moduleContext.module())
} }

View File

@ -19,11 +19,9 @@ package dev.usbharu.owl.broker
import org.koin.core.module.Module import org.koin.core.module.Module
interface ModuleContext { interface ModuleContext {
fun module():Module fun module(): Module
} }
data object EmptyModuleContext : ModuleContext { data object EmptyModuleContext : ModuleContext {
override fun module(): Module { override fun module(): Module = org.koin.dsl.module { }
return org.koin.dsl.module { }
}
} }

View File

@ -38,7 +38,7 @@ class OwlBrokerApplication(
private lateinit var server: Server private lateinit var server: Server
fun start(port: Int,coroutineScope: CoroutineScope = GlobalScope):Job { fun start(port: Int, coroutineScope: CoroutineScope = GlobalScope): Job {
server = ServerBuilder.forPort(port) server = ServerBuilder.forPort(port)
.addService(assignmentTaskService) .addService(assignmentTaskService)
.addService(definitionTaskService) .addService(definitionTaskService)
@ -64,5 +64,4 @@ class OwlBrokerApplication(
fun stop() { fun stop() {
server.shutdown() server.shutdown()
} }
} }

View File

@ -19,7 +19,7 @@ package dev.usbharu.owl.broker.domain.model.consumer
import java.util.* import java.util.*
interface ConsumerRepository { interface ConsumerRepository {
suspend fun save(consumer: Consumer):Consumer suspend fun save(consumer: Consumer): Consumer
suspend fun findById(id:UUID):Consumer? suspend fun findById(id: UUID): Consumer?
} }

View File

@ -20,9 +20,9 @@ import java.time.Instant
import java.util.* import java.util.*
data class Producer( data class Producer(
val id:UUID, val id: UUID,
val name:String, val name: String,
val hostname:String, val hostname: String,
val registeredTask:List<String>, val registeredTask: List<String>,
val createdAt: Instant val createdAt: Instant
) )

View File

@ -17,5 +17,5 @@
package dev.usbharu.owl.broker.domain.model.producer package dev.usbharu.owl.broker.domain.model.producer
interface ProducerRepository { interface ProducerRepository {
suspend fun save(producer: Producer):Producer suspend fun save(producer: Producer): Producer
} }

View File

@ -21,12 +21,12 @@ import java.time.Instant
import java.util.* import java.util.*
interface QueuedTaskRepository { interface QueuedTaskRepository {
suspend fun save(queuedTask: QueuedTask):QueuedTask suspend fun save(queuedTask: QueuedTask): QueuedTask
/** /**
* トランザクションの代わり * トランザクションの代わり
*/ */
suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id:UUID,update:QueuedTask):QueuedTask suspend fun findByTaskIdAndAssignedConsumerIsNullAndUpdate(id: UUID, update: QueuedTask): QueuedTask
fun findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(tasks: List<String>, limit: Int): Flow<QueuedTask> fun findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(tasks: List<String>, limit: Int): Flow<QueuedTask>

View File

@ -24,11 +24,11 @@ import java.util.*
* @param attempt 失敗を含めて試行した回数 * @param attempt 失敗を含めて試行した回数
*/ */
data class Task( data class Task(
val name:String, val name: String,
val id: UUID, val id: UUID,
val publishProducerId:UUID, val publishProducerId: UUID,
val publishedAt: Instant, val publishedAt: Instant,
val nextRetry:Instant, val nextRetry: Instant,
val completedAt: Instant? = null, val completedAt: Instant? = null,
val attempt: Int, val attempt: Int,
val properties: Map<String, PropertyValue<*>> val properties: Map<String, PropertyValue<*>>

View File

@ -21,15 +21,15 @@ import java.time.Instant
import java.util.* import java.util.*
interface TaskRepository { interface TaskRepository {
suspend fun save(task: Task):Task suspend fun save(task: Task): Task
suspend fun saveAll(tasks:List<Task>) suspend fun saveAll(tasks: List<Task>)
fun findByNextRetryBeforeAndCompletedAtIsNull(timestamp:Instant): Flow<Task> fun findByNextRetryBeforeAndCompletedAtIsNull(timestamp: Instant): Flow<Task>
suspend fun findById(uuid: UUID): Task? suspend fun findById(uuid: UUID): Task?
suspend fun findByIdAndUpdate(id:UUID,task: Task) suspend fun findByIdAndUpdate(id: UUID, task: Task)
suspend fun findByPublishProducerIdAndCompletedAtIsNotNull(publishProducerId:UUID):Flow<Task> fun findByPublishProducerIdAndCompletedAtIsNotNull(publishProducerId: UUID): Flow<Task>
} }

View File

@ -22,5 +22,5 @@ data class TaskDefinition(
val maxRetry: Int, val maxRetry: Int,
val timeoutMilli: Long, val timeoutMilli: Long,
val propertyDefinitionHash: Long, val propertyDefinitionHash: Long,
val retryPolicy:String val retryPolicy: String
) )

View File

@ -18,7 +18,7 @@ package dev.usbharu.owl.broker.domain.model.taskdefinition
interface TaskDefinitionRepository { interface TaskDefinitionRepository {
suspend fun save(taskDefinition: TaskDefinition): TaskDefinition suspend fun save(taskDefinition: TaskDefinition): TaskDefinition
suspend fun deleteByName(name:String) suspend fun deleteByName(name: String)
suspend fun findByName(name:String):TaskDefinition? suspend fun findByName(name: String): TaskDefinition?
} }

View File

@ -21,7 +21,7 @@ import java.util.*
data class TaskResult( data class TaskResult(
val id: UUID, val id: UUID,
val taskId:UUID, val taskId: UUID,
val success: Boolean, val success: Boolean,
val attempt: Int, val attempt: Int,
val result: Map<String, PropertyValue<*>>, val result: Map<String, PropertyValue<*>>,

View File

@ -20,6 +20,6 @@ import kotlinx.coroutines.flow.Flow
import java.util.* import java.util.*
interface TaskResultRepository { interface TaskResultRepository {
suspend fun save(taskResult: TaskResult):TaskResult suspend fun save(taskResult: TaskResult): TaskResult
fun findByTaskId(id:UUID): Flow<TaskResult> fun findByTaskId(id: UUID): Flow<TaskResult>
} }

View File

@ -32,4 +32,4 @@ fun UUID.toUUID(): Uuid.UUID = Uuid
fun Timestamp.toInstant(): Instant = Instant.ofEpochSecond(seconds, nanos.toLong()) fun Timestamp.toInstant(): Instant = Instant.ofEpochSecond(seconds, nanos.toLong())
fun Instant.toTimestamp():Timestamp = Timestamp.newBuilder().setSeconds(this.epochSecond).setNanos(this.nano).build() fun Instant.toTimestamp(): Timestamp = Timestamp.newBuilder().setSeconds(this.epochSecond).setNanos(this.nano).build()

View File

@ -16,7 +16,6 @@
package dev.usbharu.owl.broker.interfaces.grpc package dev.usbharu.owl.broker.interfaces.grpc
import dev.usbharu.owl.broker.external.toTimestamp import dev.usbharu.owl.broker.external.toTimestamp
import dev.usbharu.owl.broker.external.toUUID import dev.usbharu.owl.broker.external.toUUID
import dev.usbharu.owl.broker.service.QueuedTaskAssigner import dev.usbharu.owl.broker.service.QueuedTaskAssigner
@ -33,7 +32,6 @@ import org.slf4j.LoggerFactory
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.EmptyCoroutineContext
class AssignmentTaskService( class AssignmentTaskService(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
private val queuedTaskAssigner: QueuedTaskAssigner, private val queuedTaskAssigner: QueuedTaskAssigner,
@ -42,7 +40,6 @@ class AssignmentTaskService(
AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineImplBase(coroutineContext) { AssignmentTaskServiceGrpcKt.AssignmentTaskServiceCoroutineImplBase(coroutineContext) {
override fun ready(requests: Flow<Task.ReadyRequest>): Flow<Task.TaskRequest> { override fun ready(requests: Flow<Task.ReadyRequest>): Flow<Task.TaskRequest> {
return try { return try {
requests requests
.flatMapMerge { .flatMapMerge {

View File

@ -25,17 +25,20 @@ import dev.usbharu.owl.generated.DefinitionTaskServiceGrpcKt.DefinitionTaskServi
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.EmptyCoroutineContext
class DefinitionTaskService(coroutineContext: CoroutineContext = EmptyCoroutineContext,private val registerTaskService: RegisterTaskService) : class DefinitionTaskService(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
private val registerTaskService: RegisterTaskService
) :
DefinitionTaskServiceCoroutineImplBase(coroutineContext) { DefinitionTaskServiceCoroutineImplBase(coroutineContext) {
override suspend fun register(request: DefinitionTask.TaskDefinition): TaskDefined { override suspend fun register(request: DefinitionTask.TaskDefinition): TaskDefined {
registerTaskService.registerTask( registerTaskService.registerTask(
TaskDefinition( TaskDefinition(
request.name, name = request.name,
request.priority, priority = request.priority,
request.maxRetry, maxRetry = request.maxRetry,
request.timeoutMilli, timeoutMilli = request.timeoutMilli,
request.propertyDefinitionHash, propertyDefinitionHash = request.propertyDefinitionHash,
request.retryPolicy retryPolicy = request.retryPolicy
) )
) )
return TaskDefined return TaskDefined

View File

@ -24,16 +24,18 @@ import dev.usbharu.owl.generated.ProducerServiceGrpcKt.ProducerServiceCoroutineI
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.EmptyCoroutineContext
class ProducerService( class ProducerService(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
private val producerService: ProducerService private val producerService: ProducerService
) : ) :
ProducerServiceCoroutineImplBase(coroutineContext) { ProducerServiceCoroutineImplBase(coroutineContext) {
override suspend fun registerProducer(request: ProducerOuterClass.Producer): ProducerOuterClass.RegisterProducerResponse { override suspend fun registerProducer(
request: ProducerOuterClass.Producer
): ProducerOuterClass.RegisterProducerResponse {
val registerProducer = producerService.registerProducer( val registerProducer = producerService.registerProducer(
RegisterProducerRequest( RegisterProducerRequest(
request.name, request.hostname request.name,
request.hostname
) )
) )
return ProducerOuterClass.RegisterProducerResponse.newBuilder().setId(registerProducer.toUUID()).build() return ProducerOuterClass.RegisterProducerResponse.newBuilder().setId(registerProducer.toUUID()).build()

View File

@ -39,13 +39,9 @@ class TaskPublishService(
TaskPublishServiceCoroutineImplBase(coroutineContext) { TaskPublishServiceCoroutineImplBase(coroutineContext) {
override suspend fun publishTask(request: PublishTaskOuterClass.PublishTask): PublishedTask { override suspend fun publishTask(request: PublishTaskOuterClass.PublishTask): PublishedTask {
logger.warn("aaaaaaaaaaa") logger.warn("aaaaaaaaaaa")
return try { return try {
val publishedTask = taskPublishService.publishTask( val publishedTask = taskPublishService.publishTask(
PublishTask( PublishTask(
request.name, request.name,
@ -61,7 +57,6 @@ class TaskPublishService(
} }
override suspend fun publishTasks(request: PublishTaskOuterClass.PublishTasks): PublishedTasks { override suspend fun publishTasks(request: PublishTaskOuterClass.PublishTasks): PublishedTasks {
val tasks = request.propertiesArrayList.map { val tasks = request.propertiesArrayList.map {
PublishTask( PublishTask(
request.name, request.name,

View File

@ -41,15 +41,17 @@ class TaskResultSubscribeService(
name = it.name name = it.name
attempt = it.attempt attempt = it.attempt
success = it.success success = it.success
results.addAll(it.results.map { results.addAll(
taskResult { it.results.map {
id = it.taskId.toUUID() taskResult {
success = it.success id = it.taskId.toUUID()
attempt = it.attempt success = it.success
result.putAll(PropertySerializeUtils.serialize(propertySerializerFactory, it.result)) attempt = it.attempt
message = it.message result.putAll(PropertySerializeUtils.serialize(propertySerializerFactory, it.result))
message = it.message
}
} }
}) )
} }
} }
} }

View File

@ -42,7 +42,5 @@ class AssignQueuedTaskDeciderImpl(
).take(numberOfConcurrent) ).take(numberOfConcurrent)
) )
} }
} }
} }

View File

@ -26,10 +26,8 @@ interface ProducerService {
suspend fun registerProducer(producer: RegisterProducerRequest): UUID suspend fun registerProducer(producer: RegisterProducerRequest): UUID
} }
class ProducerServiceImpl(private val producerRepository: ProducerRepository) : ProducerService { class ProducerServiceImpl(private val producerRepository: ProducerRepository) : ProducerService {
override suspend fun registerProducer(producer: RegisterProducerRequest): UUID { override suspend fun registerProducer(producer: RegisterProducerRequest): UUID {
val id = UUID.randomUUID() val id = UUID.randomUUID()
val saveProducer = Producer( val saveProducer = Producer(

View File

@ -29,19 +29,14 @@ interface QueueScanner {
fun startScan(): Flow<QueuedTask> fun startScan(): Flow<QueuedTask>
} }
class QueueScannerImpl(private val queueStore: QueueStore) : QueueScanner { class QueueScannerImpl(private val queueStore: QueueStore) : QueueScanner {
override fun startScan(): Flow<QueuedTask> { override fun startScan(): Flow<QueuedTask> = flow {
return flow { while (currentCoroutineContext().isActive) {
while (currentCoroutineContext().isActive) { emitAll(scanQueue())
emitAll(scanQueue()) delay(1000)
delay(1000)
}
} }
} }
private fun scanQueue(): Flow<QueuedTask> { private fun scanQueue(): Flow<QueuedTask> =
return queueStore.findByQueuedAtBeforeAndIsActiveIsTrue(Instant.now().minusSeconds(10)) queueStore.findByQueuedAtBeforeAndIsActiveIsTrue(Instant.now().minusSeconds(10))
}
} }

View File

@ -32,33 +32,24 @@ interface QueueStore {
fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow<QueuedTask> fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow<QueuedTask>
} }
class QueueStoreImpl(private val queuedTaskRepository: QueuedTaskRepository) : QueueStore { class QueueStoreImpl(private val queuedTaskRepository: QueuedTaskRepository) : QueueStore {
override suspend fun enqueue(queuedTask: QueuedTask) { override suspend fun enqueue(queuedTask: QueuedTask) {
queuedTaskRepository.save(queuedTask) queuedTaskRepository.save(queuedTask)
} }
override suspend fun enqueueAll(queuedTaskList: List<QueuedTask>) { override suspend fun enqueueAll(queuedTaskList: List<QueuedTask>) = queuedTaskList.forEach { enqueue(it) }
queuedTaskList.forEach { enqueue(it) }
}
override suspend fun dequeue(queuedTask: QueuedTask) { override suspend fun dequeue(queuedTask: QueuedTask) {
queuedTaskRepository.findByTaskIdAndAssignedConsumerIsNullAndUpdate(queuedTask.task.id, queuedTask) queuedTaskRepository.findByTaskIdAndAssignedConsumerIsNullAndUpdate(queuedTask.task.id, queuedTask)
} }
override suspend fun dequeueAll(queuedTaskList: List<QueuedTask>) { override suspend fun dequeueAll(queuedTaskList: List<QueuedTask>) = queuedTaskList.forEach { dequeue(it) }
return queuedTaskList.forEach { dequeue(it) }
}
override fun findByTaskNameInAndIsActiveIsTrueAndOrderByPriority( override fun findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(
tasks: List<String>, tasks: List<String>,
limit: Int limit: Int
): Flow<QueuedTask> { ): Flow<QueuedTask> = queuedTaskRepository.findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(tasks, limit)
return queuedTaskRepository.findByTaskNameInAndIsActiveIsTrueAndOrderByPriority(tasks, limit)
}
override fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow<QueuedTask> {
return queuedTaskRepository.findByQueuedAtBeforeAndIsActiveIsTrue(instant)
}
override fun findByQueuedAtBeforeAndIsActiveIsTrue(instant: Instant): Flow<QueuedTask> =
queuedTaskRepository.findByQueuedAtBeforeAndIsActiveIsTrue(instant)
} }

View File

@ -27,7 +27,6 @@ interface QueuedTaskAssigner {
fun ready(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask> fun ready(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask>
} }
class QueuedTaskAssignerImpl( class QueuedTaskAssignerImpl(
private val taskManagementService: TaskManagementService, private val taskManagementService: TaskManagementService,
private val queueStore: QueueStore private val queueStore: QueueStore
@ -49,7 +48,6 @@ class QueuedTaskAssignerImpl(
private suspend fun assignTask(queuedTask: QueuedTask, consumerId: UUID): QueuedTask? { private suspend fun assignTask(queuedTask: QueuedTask, consumerId: UUID): QueuedTask? {
return try { return try {
val assignedTaskQueue = val assignedTaskQueue =
queuedTask.copy(assignedConsumer = consumerId, assignedAt = Instant.now(), isActive = false) queuedTask.copy(assignedConsumer = consumerId, assignedAt = Instant.now(), isActive = false)
logger.trace( logger.trace(

View File

@ -24,33 +24,34 @@ import org.slf4j.LoggerFactory
interface RegisterTaskService { interface RegisterTaskService {
suspend fun registerTask(taskDefinition: TaskDefinition) suspend fun registerTask(taskDefinition: TaskDefinition)
suspend fun unregisterTask(name:String) suspend fun unregisterTask(name: String)
} }
class RegisterTaskServiceImpl(private val taskDefinitionRepository: TaskDefinitionRepository) : RegisterTaskService { class RegisterTaskServiceImpl(private val taskDefinitionRepository: TaskDefinitionRepository) : RegisterTaskService {
override suspend fun registerTask(taskDefinition: TaskDefinition) { override suspend fun registerTask(taskDefinition: TaskDefinition) {
val definedTask = taskDefinitionRepository.findByName(taskDefinition.name) val definedTask = taskDefinitionRepository.findByName(taskDefinition.name)
if (definedTask != null) { if (definedTask != null) {
logger.debug("Task already defined. name: ${taskDefinition.name}") logger.debug("Task already defined. name: ${taskDefinition.name}")
if (taskDefinition.propertyDefinitionHash != definedTask.propertyDefinitionHash) { if (taskDefinition.propertyDefinitionHash != definedTask.propertyDefinitionHash) {
throw IncompatibleTaskException("Task ${taskDefinition.name} has already been defined, and the parameters are incompatible.") throw IncompatibleTaskException(
"Task ${taskDefinition.name} has already been defined, and the parameters are incompatible."
)
} }
return return
} }
taskDefinitionRepository.save(taskDefinition) taskDefinitionRepository.save(taskDefinition)
logger.info("Register a new task. name: {}",taskDefinition.name) logger.info("Register a new task. name: {}", taskDefinition.name)
} }
// todo すでにpublish済みのタスクをどうするか決めさせる // todo すでにpublish済みのタスクをどうするか決めさせる
override suspend fun unregisterTask(name: String) { override suspend fun unregisterTask(name: String) {
taskDefinitionRepository.deleteByName(name) taskDefinitionRepository.deleteByName(name)
logger.info("Unregister a task. name: {}",name) logger.info("Unregister a task. name: {}", name)
} }
companion object{ companion object {
private val logger = LoggerFactory.getLogger(RegisterTaskServiceImpl::class.java) private val logger = LoggerFactory.getLogger(RegisterTaskServiceImpl::class.java)
} }
} }

View File

@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory
import java.time.Instant import java.time.Instant
import java.util.* import java.util.*
interface TaskManagementService { interface TaskManagementService {
suspend fun startManagement(coroutineScope: CoroutineScope) suspend fun startManagement(coroutineScope: CoroutineScope)
@ -75,13 +74,11 @@ class TaskManagementServiceImpl(
} }
} }
override fun findAssignableTask(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask> { override fun findAssignableTask(consumerId: UUID, numberOfConcurrent: Int): Flow<QueuedTask> {
return assignQueuedTaskDecider.findAssignableQueue(consumerId, numberOfConcurrent) return assignQueuedTaskDecider.findAssignableQueue(consumerId, numberOfConcurrent)
} }
private suspend fun enqueueTask(task: Task): QueuedTask { private suspend fun enqueueTask(task: Task): QueuedTask {
val definedTask = taskDefinitionRepository.findByName(task.name) val definedTask = taskDefinitionRepository.findByName(task.name)
?: throw TaskNotRegisterException("Task ${task.name} not definition.") ?: throw TaskNotRegisterException("Task ${task.name} not definition.")
@ -113,7 +110,6 @@ class TaskManagementServiceImpl(
queueStore.dequeue(timeoutQueue) queueStore.dequeue(timeoutQueue)
val task = taskRepository.findById(timeoutQueue.task.id) val task = taskRepository.findById(timeoutQueue.task.id)
?: throw RecordNotFoundException("Task not found. id: ${timeoutQueue.task.id}") ?: throw RecordNotFoundException("Task not found. id: ${timeoutQueue.task.id}")
val copy = task.copy(attempt = timeoutQueue.attempt) val copy = task.copy(attempt = timeoutQueue.attempt)
@ -148,12 +144,10 @@ class TaskManagementServiceImpl(
taskResult.taskId, taskResult.taskId,
task.copy(completedAt = completedAt, attempt = taskResult.attempt) task.copy(completedAt = completedAt, attempt = taskResult.attempt)
) )
} }
override fun subscribeResult(producerId: UUID): Flow<TaskResults> { override fun subscribeResult(producerId: UUID): Flow<TaskResults> {
return flow { return flow {
while (currentCoroutineContext().isActive) { while (currentCoroutineContext().isActive) {
taskRepository taskRepository
.findByPublishProducerIdAndCompletedAtIsNotNull(producerId) .findByPublishProducerIdAndCompletedAtIsNotNull(producerId)
@ -163,7 +157,7 @@ class TaskManagementServiceImpl(
TaskResults( TaskResults(
it.name, it.name,
it.id, it.id,
results.any { it.success }, results.any { taskResult -> taskResult.success },
it.attempt, it.attempt,
results results
) )
@ -171,9 +165,7 @@ class TaskManagementServiceImpl(
} }
delay(500) delay(500)
} }
} }
} }
companion object { companion object {

View File

@ -78,7 +78,6 @@ class TaskPublishServiceImpl(
} }
override suspend fun publishTasks(list: List<PublishTask>): List<PublishedTask> { override suspend fun publishTasks(list: List<PublishTask>): List<PublishedTask> {
val first = list.first() val first = list.first()
val definition = taskDefinitionRepository.findByName(first.name) val definition = taskDefinitionRepository.findByName(first.name)
@ -90,14 +89,14 @@ class TaskPublishServiceImpl(
val tasks = list.map { val tasks = list.map {
Task( Task(
it.name, name = it.name,
UUID.randomUUID(), id = UUID.randomUUID(),
first.producerId, publishProducerId = first.producerId,
published, publishedAt = published,
nextRetry, nextRetry = nextRetry,
null, completedAt = null,
0, attempt = 0,
it.properties properties = it.properties
) )
} }

View File

@ -20,9 +20,9 @@ import dev.usbharu.owl.broker.domain.model.taskresult.TaskResult
import java.util.* import java.util.*
data class TaskResults( data class TaskResults(
val name:String, val name: String,
val id:UUID, val id: UUID,
val success:Boolean, val success: Boolean,
val attempt:Int, val attempt: Int,
val results: List<TaskResult> val results: List<TaskResult>
) )

View File

@ -45,6 +45,5 @@ class ObjectPropertySerializer(private val objectMapper: ObjectMapper) : Propert
Class.forName(string.substringAfter("jackson:").substringBefore(":")) Class.forName(string.substringAfter("jackson:").substringBefore(":"))
) )
) )
} }
} }

View File

@ -23,4 +23,7 @@ val Class<*>.allFields: List<Field>
superclass.allFields + declaredFields superclass.allFields + declaredFields
} else { } else {
declaredFields.toList() declaredFields.toList()
}.map { it.trySetAccessible();it } }.map {
it.trySetAccessible()
it
}

View File

@ -15,19 +15,12 @@ class BooleanPropertyValue(override val value: Boolean) : PropertyValue<Boolean>
* *
*/ */
class BooleanPropertySerializer : PropertySerializer<Boolean> { class BooleanPropertySerializer : PropertySerializer<Boolean> {
override fun isSupported(propertyValue: PropertyValue<*>): Boolean { override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is Boolean
return propertyValue.value is Boolean
}
override fun isSupported(string: String): Boolean { override fun isSupported(string: String): Boolean = string.startsWith("bool:")
return string.startsWith("bool:")
}
override fun serialize(propertyValue: PropertyValue<*>): String { override fun serialize(propertyValue: PropertyValue<*>): String = "bool:" + propertyValue.value.toString()
return "bool:" + propertyValue.value.toString()
}
override fun deserialize(string: String): PropertyValue<Boolean> { override fun deserialize(string: String): PropertyValue<Boolean> =
return BooleanPropertyValue(string.replace("bool:", "").toBoolean()) BooleanPropertyValue(string.replace("bool:", "").toBoolean())
}
} }

View File

@ -23,12 +23,9 @@ package dev.usbharu.owl.common.property
*/ */
open class CustomPropertySerializerFactory(private val propertySerializers: Set<PropertySerializer<*>>) : open class CustomPropertySerializerFactory(private val propertySerializers: Set<PropertySerializer<*>>) :
PropertySerializerFactory { PropertySerializerFactory {
override fun <T> factory(propertyValue: PropertyValue<T>): PropertySerializer<T> { override fun <T> factory(propertyValue: PropertyValue<T>): PropertySerializer<T> =
return propertySerializers.firstOrNull { it.isSupported(propertyValue) } as PropertySerializer<T>? propertySerializers.firstOrNull { it.isSupported(propertyValue) } as PropertySerializer<T>?
?: throw IllegalArgumentException("PropertySerializer not found: $propertyValue") ?: throw IllegalArgumentException("PropertySerializer not found: $propertyValue")
}
override fun factory(string: String): PropertySerializer<*> { override fun factory(string: String): PropertySerializer<*> = propertySerializers.first { it.isSupported(string) }
return propertySerializers.first { it.isSupported(string) }
}
} }

View File

@ -15,19 +15,12 @@ class DoublePropertyValue(override val value: Double) : PropertyValue<Double>()
* *
*/ */
class DoublePropertySerializer : PropertySerializer<Double> { class DoublePropertySerializer : PropertySerializer<Double> {
override fun isSupported(propertyValue: PropertyValue<*>): Boolean { override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is Double
return propertyValue.value is Double
}
override fun isSupported(string: String): Boolean { override fun isSupported(string: String): Boolean = string.startsWith("double:")
return string.startsWith("double:")
}
override fun serialize(propertyValue: PropertyValue<*>): String { override fun serialize(propertyValue: PropertyValue<*>): String = "double:" + propertyValue.value.toString()
return "double:" + propertyValue.value.toString()
}
override fun deserialize(string: String): PropertyValue<Double> { override fun deserialize(string: String): PropertyValue<Double> =
return DoublePropertyValue(string.replace("double:", "").toDouble()) DoublePropertyValue(string.replace("double:", "").toDouble())
}
} }

View File

@ -26,19 +26,12 @@ class FloatPropertyValue(override val value: Float) : PropertyValue<Float>() {
* *
*/ */
class FloatPropertySerializer : PropertySerializer<Float> { class FloatPropertySerializer : PropertySerializer<Float> {
override fun isSupported(propertyValue: PropertyValue<*>): Boolean { override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is Float
return propertyValue.value is Float
}
override fun isSupported(string: String): Boolean { override fun isSupported(string: String): Boolean = string.startsWith("float:")
return string.startsWith("float:")
}
override fun serialize(propertyValue: PropertyValue<*>): String { override fun serialize(propertyValue: PropertyValue<*>): String = "float:" + propertyValue.value.toString()
return "float:" + propertyValue.value.toString()
}
override fun deserialize(string: String): PropertyValue<Float> { override fun deserialize(string: String): PropertyValue<Float> =
return FloatPropertyValue(string.replace("float:", "").toFloat()) FloatPropertyValue(string.replace("float:", "").toFloat())
}
} }

View File

@ -31,19 +31,12 @@ class IntegerPropertyValue(override val value: Int) : PropertyValue<Int>() {
* *
*/ */
class IntegerPropertySerializer : PropertySerializer<Int> { class IntegerPropertySerializer : PropertySerializer<Int> {
override fun isSupported(propertyValue: PropertyValue<*>): Boolean { override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is Int
return propertyValue.value is Int
}
override fun isSupported(string: String): Boolean { override fun isSupported(string: String): Boolean = string.startsWith("int32:")
return string.startsWith("int32:")
}
override fun serialize(propertyValue: PropertyValue<*>): String { override fun serialize(propertyValue: PropertyValue<*>): String = "int32:" + propertyValue.value.toString()
return "int32:" + propertyValue.value.toString()
}
override fun deserialize(string: String): PropertyValue<Int> { override fun deserialize(string: String): PropertyValue<Int> =
return IntegerPropertyValue(string.replace("int32:", "").toInt()) IntegerPropertyValue(string.replace("int32:", "").toInt())
}
} }

View File

@ -27,19 +27,12 @@ class LongPropertyValue(override val value: Long) : PropertyValue<Long>() {
* *
*/ */
class LongPropertySerializer : PropertySerializer<Long> { class LongPropertySerializer : PropertySerializer<Long> {
override fun isSupported(propertyValue: PropertyValue<*>): Boolean { override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is Long
return propertyValue.value is Long
}
override fun isSupported(string: String): Boolean { override fun isSupported(string: String): Boolean = string.startsWith("int64:")
return string.startsWith("int64:")
}
override fun serialize(propertyValue: PropertyValue<*>): String { override fun serialize(propertyValue: PropertyValue<*>): String = "int64:" + propertyValue.value.toString()
return "int64:" + propertyValue.value.toString()
}
override fun deserialize(string: String): PropertyValue<Long> { override fun deserialize(string: String): PropertyValue<Long> =
return LongPropertyValue(string.replace("int64:", "").toLong()) LongPropertyValue(string.replace("int64:", "").toLong())
}
} }

View File

@ -31,9 +31,5 @@ abstract class PropertyValue<T> {
* プロパティの型 * プロパティの型
*/ */
abstract val type: PropertyType abstract val type: PropertyType
override fun toString(): String { override fun toString(): String = "PropertyValue(value=$value, type=$type)"
return "PropertyValue(value=$value, type=$type)"
}
} }

View File

@ -15,19 +15,11 @@ class StringPropertyValue(override val value: String) : PropertyValue<String>()
* *
*/ */
class StringPropertyValueSerializer : PropertySerializer<String> { class StringPropertyValueSerializer : PropertySerializer<String> {
override fun isSupported(propertyValue: PropertyValue<*>): Boolean { override fun isSupported(propertyValue: PropertyValue<*>): Boolean = propertyValue.value is String
return propertyValue.value is String
}
override fun isSupported(string: String): Boolean { override fun isSupported(string: String): Boolean = string.startsWith("str:")
return string.startsWith("str:")
}
override fun serialize(propertyValue: PropertyValue<*>): String { override fun serialize(propertyValue: PropertyValue<*>): String = "str:" + propertyValue.value
return "str:" + propertyValue.value
}
override fun deserialize(string: String): PropertyValue<String> { override fun deserialize(string: String): PropertyValue<String> = StringPropertyValue(string.replace("str:", ""))
return StringPropertyValue(string.replace("str:", ""))
}
} }

View File

@ -13,5 +13,4 @@ import kotlin.math.roundToLong
class ExponentialRetryPolicy(private val firstRetrySeconds: Int = 30) : RetryPolicy { class ExponentialRetryPolicy(private val firstRetrySeconds: Int = 30) : RetryPolicy {
override fun nextRetry(now: Instant, attempt: Int): Instant = override fun nextRetry(now: Instant, attempt: Int): Instant =
now.plusSeconds(firstRetrySeconds.times((2.0).pow(attempt).roundToLong()) - firstRetrySeconds) now.plusSeconds(firstRetrySeconds.times((2.0).pow(attempt).roundToLong()) - firstRetrySeconds)
} }

View File

@ -16,7 +16,6 @@
package dev.usbharu.owl.common.retry package dev.usbharu.owl.common.retry
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
interface RetryPolicyFactory { interface RetryPolicyFactory {
@ -24,9 +23,7 @@ interface RetryPolicyFactory {
} }
class DefaultRetryPolicyFactory(private val map: Map<String, RetryPolicy>) : RetryPolicyFactory { class DefaultRetryPolicyFactory(private val map: Map<String, RetryPolicy>) : RetryPolicyFactory {
override fun factory(name: String): RetryPolicy { override fun factory(name: String): RetryPolicy = map[name] ?: throwException(name)
return map[name] ?: throwException(name)
}
private fun throwException(name: String): Nothing { private fun throwException(name: String): Nothing {
logger.warn("RetryPolicy not found. name: {}", name) logger.warn("RetryPolicy not found. name: {}", name)

View File

@ -36,6 +36,4 @@ class PropertyDefinition(val map: Map<String, PropertyType>) : Map<String, Prope
map.map { it.key + it.value.name }.joinToString("").map { hash *= it.code * 31 } map.map { it.key + it.value.name }.joinToString("").map { hash *= it.code * 31 }
return hash return hash
} }
} }

View File

@ -107,7 +107,6 @@ interface TaskDefinition<T : Task> {
* @return デシリアライズされたタスク * @return デシリアライズされたタスク
*/ */
fun deserialize(value: Map<String, PropertyValue<*>>): T { fun deserialize(value: Map<String, PropertyValue<*>>): T {
val task = try { val task = try {
type.getDeclaredConstructor().newInstance() type.getDeclaredConstructor().newInstance()
} catch (e: Exception) { } catch (e: Exception) {

View File

@ -29,5 +29,4 @@ abstract class AbstractTaskRunner<T : Task, D : TaskDefinition<T>>(private val t
} }
abstract suspend fun typedRun(typedParam: T, taskRequest: TaskRequest): TaskResult abstract suspend fun typedRun(typedParam: T, taskRequest: TaskRequest): TaskResult
} }

View File

@ -66,11 +66,13 @@ class Consumer(
suspend fun init(name: String, hostname: String) { suspend fun init(name: String, hostname: String) {
logger.info("Initialize Consumer name: {} hostname: {}", name, hostname) logger.info("Initialize Consumer name: {} hostname: {}", name, hostname)
logger.debug("Registered Tasks: {}", runnerMap.keys) logger.debug("Registered Tasks: {}", runnerMap.keys)
consumerId = subscribeTaskStub.subscribeTask(subscribeTaskRequest { consumerId = subscribeTaskStub.subscribeTask(
this.name = name subscribeTaskRequest {
this.hostname = hostname this.name = name
this.tasks.addAll(runnerMap.keys) this.hostname = hostname
}).id this.tasks.addAll(runnerMap.keys)
}
).id
logger.info("Success initialize consumer. ConsumerID: {}", consumerId) logger.info("Success initialize consumer. ConsumerID: {}", consumerId)
} }
@ -84,78 +86,92 @@ class Consumer(
while (isActive) { while (isActive) {
try { try {
taskResultStub taskResultStub
.tasKResult(flow { .tasKResult(
assignmentTaskStub flow {
.ready(flow { assignmentTaskStub
requestTask() .ready(
}).onEach { flow {
logger.info("Start Task name: {} id: {}", it.name, it.id) requestTask()
processing.update { it + 1 } }
).onEach {
logger.info("Start Task name: {} id: {}", it.name, it.id)
processing.update { it + 1 }
try { try {
val taskResult = runnerMap.getValue(it.name).run( val taskResult = runnerMap.getValue(it.name).run(
TaskRequest( TaskRequest(
it.name,
java.util.UUID(
it.id.mostSignificantUuidBits,
it.id.leastSignificantUuidBits
),
it.attempt,
Instant.ofEpochSecond(
it.queuedAt.seconds,
it.queuedAt.nanos.toLong()
),
PropertySerializeUtils.deserialize(
propertySerializerFactory,
it.propertiesMap
)
)
)
emit(
taskResult {
this.success = taskResult.success
this.attempt = it.attempt
this.id = it.id
this.result.putAll(
PropertySerializeUtils.serialize(
propertySerializerFactory,
taskResult.result
)
)
this.message = taskResult.message
}
)
logger.info(
"Success execute task. name: {} success: {}",
it.name, it.name,
java.util.UUID( taskResult.success
it.id.mostSignificantUuidBits,
it.id.leastSignificantUuidBits
),
it.attempt,
Instant.ofEpochSecond(it.queuedAt.seconds, it.queuedAt.nanos.toLong()),
PropertySerializeUtils.deserialize(
propertySerializerFactory,
it.propertiesMap
)
) )
) logger.debug("TRACE RESULT {}", taskResult)
} catch (e: CancellationException) {
emit(taskResult { logger.warn("Cancelled execute task.", e)
this.success = taskResult.success emit(
this.attempt = it.attempt taskResult {
this.id = it.id this.success = false
this.result.putAll( this.attempt = it.attempt
PropertySerializeUtils.serialize( this.id = it.id
propertySerializerFactory, taskResult.result this.message = e.localizedMessage
) }
) )
this.message = taskResult.message throw e
}) } catch (e: Exception) {
logger.info( logger.warn("Failed execute task. name: {} id: {}", it.name, it.id, e)
"Success execute task. name: {} success: {}", emit(
it.name, taskResult {
taskResult.success this.success = false
) this.attempt = it.attempt
logger.debug("TRACE RESULT {}", taskResult) this.id = it.id
} catch (e: CancellationException) { this.message = e.localizedMessage
logger.warn("Cancelled execute task.", e) }
emit(taskResult { )
this.success = false } finally {
this.attempt = it.attempt logger.debug(" Task name: {} id: {}", it.name, it.id)
this.id = it.id processing.update { it - 1 }
this.message = e.localizedMessage concurrent.update {
}) if (it < 64) {
throw e it + 1
} catch (e: Exception) { } else {
logger.warn("Failed execute task. name: {} id: {}", it.name, it.id, e) 64
emit(taskResult { }
this.success = false
this.attempt = it.attempt
this.id = it.id
this.message = e.localizedMessage
})
} finally {
logger.debug(" Task name: {} id: {}", it.name, it.id)
processing.update { it - 1 }
concurrent.update {
if (it < 64) {
it + 1
} else {
64
} }
} }
} }.flowOn(Dispatchers.Default).collect()
}.flowOn(Dispatchers.Default).collect() }
}) )
} catch (e: CancellationException) { } catch (e: CancellationException) {
throw e throw e
} catch (e: Exception) { } catch (e: Exception) {
@ -171,14 +187,15 @@ class Consumer(
while (coroutineScope.isActive) { while (coroutineScope.isActive) {
val andSet = concurrent.getAndUpdate { 0 } val andSet = concurrent.getAndUpdate { 0 }
if (andSet != 0) { if (andSet != 0) {
logger.debug("Request {} tasks.", andSet) logger.debug("Request {} tasks.", andSet)
try { try {
emit(readyRequest { emit(
this.consumerId = this@Consumer.consumerId readyRequest {
this.numberOfConcurrent = andSet this.consumerId = this@Consumer.consumerId
}) this.numberOfConcurrent = andSet
}
)
} catch (e: CancellationException) { } catch (e: CancellationException) {
throw e throw e
} catch (e: Exception) { } catch (e: Exception) {

View File

@ -25,5 +25,4 @@ fun main() {
standaloneConsumer.init() standaloneConsumer.init()
standaloneConsumer.start() standaloneConsumer.start()
} }
} }

View File

@ -90,9 +90,11 @@ class StandaloneConsumer(
*/ */
suspend fun start() { suspend fun start() {
consumer.start() consumer.start()
Runtime.getRuntime().addShutdownHook(Thread { Runtime.getRuntime().addShutdownHook(
consumer.stop() Thread {
}) consumer.stop()
}
)
} }
/** /**
@ -102,5 +104,4 @@ class StandaloneConsumer(
fun stop() { fun stop() {
consumer.stop() consumer.stop()
} }
} }

View File

@ -30,9 +30,9 @@ import java.util.*
* @property properties タスクに渡されたパラメータ * @property properties タスクに渡されたパラメータ
*/ */
data class TaskRequest( data class TaskRequest(
val name:String, val name: String,
val id:UUID, val id: UUID,
val attempt:Int, val attempt: Int,
val queuedAt: Instant, val queuedAt: Instant,
val properties:Map<String,PropertyValue<*>> val properties: Map<String, PropertyValue<*>>
) )

View File

@ -36,10 +36,12 @@ class DefaultOwlProducer(private val defaultOwlProducerConfig: DefaultOwlProduce
override suspend fun start() { override suspend fun start() {
producerServiceCoroutineStub = producerServiceCoroutineStub =
ProducerServiceGrpcKt.ProducerServiceCoroutineStub(defaultOwlProducerConfig.channel) ProducerServiceGrpcKt.ProducerServiceCoroutineStub(defaultOwlProducerConfig.channel)
producerId = producerServiceCoroutineStub.registerProducer(producer { producerId = producerServiceCoroutineStub.registerProducer(
this.name = defaultOwlProducerConfig.name producer {
this.hostname = defaultOwlProducerConfig.hostname this.name = defaultOwlProducerConfig.name
}).id this.hostname = defaultOwlProducerConfig.hostname
}
).id
defineTaskServiceCoroutineStub = defineTaskServiceCoroutineStub =
DefinitionTaskServiceGrpcKt.DefinitionTaskServiceCoroutineStub(defaultOwlProducerConfig.channel) DefinitionTaskServiceGrpcKt.DefinitionTaskServiceCoroutineStub(defaultOwlProducerConfig.channel)
@ -48,17 +50,18 @@ class DefaultOwlProducer(private val defaultOwlProducerConfig: DefaultOwlProduce
TaskPublishServiceGrpcKt.TaskPublishServiceCoroutineStub(defaultOwlProducerConfig.channel) TaskPublishServiceGrpcKt.TaskPublishServiceCoroutineStub(defaultOwlProducerConfig.channel)
} }
override suspend fun <T : Task> registerTask(taskDefinition: TaskDefinition<T>) { override suspend fun <T : Task> registerTask(taskDefinition: TaskDefinition<T>) {
defineTaskServiceCoroutineStub.register(taskDefinition { defineTaskServiceCoroutineStub.register(
this.producerId = this@DefaultOwlProducer.producerId taskDefinition {
this.name = taskDefinition.name this.producerId = this@DefaultOwlProducer.producerId
this.maxRetry = taskDefinition.maxRetry this.name = taskDefinition.name
this.priority = taskDefinition.priority this.maxRetry = taskDefinition.maxRetry
this.retryPolicy = taskDefinition.retryPolicy this.priority = taskDefinition.priority
this.timeoutMilli = taskDefinition.timeoutMilli this.retryPolicy = taskDefinition.retryPolicy
this.propertyDefinitionHash = taskDefinition.propertyDefinition.hash() this.timeoutMilli = taskDefinition.timeoutMilli
}) this.propertyDefinitionHash = taskDefinition.propertyDefinition.hash()
}
)
} }
override suspend fun <T : Task> publishTask(task: T): PublishedTask<T> { override suspend fun <T : Task> publishTask(task: T): PublishedTask<T> {

View File

@ -96,7 +96,6 @@ class EmbeddedOwlProducer(
} }
override suspend fun <T : Task> publishTask(task: T): PublishedTask<T> { override suspend fun <T : Task> publishTask(task: T): PublishedTask<T> {
val taskDefinition = taskMap.getValue(task::class.java) as TaskDefinition<T> val taskDefinition = taskMap.getValue(task::class.java) as TaskDefinition<T>
val publishTask = application.get<TaskPublishService>().publishTask( val publishTask = application.get<TaskPublishService>().publishTask(

View File

@ -46,7 +46,6 @@ class EmbeddedOwlProducerBuilder : OwlProducerBuilder<EmbeddedOwlProducer, Embed
override fun apply(owlProducerConfig: EmbeddedOwlProducerConfig) { override fun apply(owlProducerConfig: EmbeddedOwlProducerConfig) {
this.config = owlProducerConfig this.config = owlProducerConfig
} }
} }
val EMBEDDED by lazy { EmbeddedOwlProducerBuilder() } val EMBEDDED by lazy { EmbeddedOwlProducerBuilder() }