Merge pull request #59 from usbharu/feature/kjob-mongodb

feat: KJobがMongoDBでも動くように
This commit is contained in:
usbharu 2023-09-27 14:53:52 +09:00 committed by GitHub
commit 1f646fbff7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 67 additions and 230 deletions

View File

@ -128,6 +128,7 @@ dependencies {
implementation("org.drewcarlson:kjob-core:0.6.0") implementation("org.drewcarlson:kjob-core:0.6.0")
implementation("org.drewcarlson:kjob-mongo:0.6.0")
testImplementation("org.slf4j:slf4j-simple:2.0.7") testImplementation("org.slf4j:slf4j-simple:2.0.7")
detektPlugins("io.gitlab.arturbosch.detekt:detekt-formatting:1.22.0") detektPlugins("io.gitlab.arturbosch.detekt:detekt-formatting:1.22.0")

View File

@ -37,7 +37,6 @@ class MastodonAccountApiController(
reason: String? reason: String?
): ResponseEntity<Unit> = runBlocking { ): ResponseEntity<Unit> = runBlocking {
transaction.transaction { transaction.transaction {
accountApiService.registerAccount(UserCreateDto(username, username, "", password)) accountApiService.registerAccount(UserCreateDto(username, username, "", password))
} }
val httpHeaders = HttpHeaders() val httpHeaders = HttpHeaders()

View File

@ -37,7 +37,7 @@ class APReactionServiceImpl(
@Qualifier("activitypub") private val objectMapper: ObjectMapper, @Qualifier("activitypub") private val objectMapper: ObjectMapper,
private val applicationConfig: ApplicationConfig private val applicationConfig: ApplicationConfig
) : APReactionService { ) : APReactionService {
override suspend fun reaction(like: Reaction) { override suspend fun reaction(like: Reaction) {
val followers = followerQueryService.findFollowersById(like.userId) val followers = followerQueryService.findFollowersById(like.userId)
val user = userQueryService.findById(like.userId) val user = userQueryService.findById(like.userId)

View File

@ -7,9 +7,11 @@ import kjob.core.dsl.ScheduleContext
import kjob.core.kjob import kjob.core.kjob
import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.Database
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
@Service @Service
@ConditionalOnProperty(name = ["hideout.job-queue.type"], havingValue = "rdb")
class KJobJobQueueParentService(private val database: Database) : JobQueueParentService { class KJobJobQueueParentService(private val database: Database) : JobQueueParentService {
private val logger = LoggerFactory.getLogger(this::class.java) private val logger = LoggerFactory.getLogger(this::class.java)

View File

@ -5,11 +5,13 @@ import kjob.core.dsl.JobRegisterContext
import kjob.core.dsl.KJobFunctions import kjob.core.dsl.KJobFunctions
import kjob.core.kjob import kjob.core.kjob
import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.Database
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import dev.usbharu.hideout.domain.model.job.HideoutJob as HJ import dev.usbharu.hideout.domain.model.job.HideoutJob as HJ
import kjob.core.dsl.JobContextWithProps as JCWP import kjob.core.dsl.JobContextWithProps as JCWP
@Service @Service
@ConditionalOnProperty(name = ["hideout.job-queue.type"], havingValue = "rdb", matchIfMissing = true)
class KJobJobQueueWorkerService(private val database: Database) : JobQueueWorkerService { class KJobJobQueueWorkerService(private val database: Database) : JobQueueWorkerService {
val kjob by lazy { val kjob by lazy {
@ -17,7 +19,7 @@ class KJobJobQueueWorkerService(private val database: Database) : JobQueueWorker
connectionDatabase = database connectionDatabase = database
nonBlockingMaxJobs = 10 nonBlockingMaxJobs = 10
blockingMaxJobs = 10 blockingMaxJobs = 10
jobExecutionPeriodInSeconds = 10 jobExecutionPeriodInSeconds = 1
}.start() }.start()
} }

View File

@ -0,0 +1,31 @@
package dev.usbharu.hideout.service.job
import kjob.core.dsl.JobRegisterContext
import kjob.core.dsl.KJobFunctions
import kjob.core.kjob
import kjob.mongo.Mongo
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.stereotype.Service
import dev.usbharu.hideout.domain.model.job.HideoutJob as HJ
import kjob.core.dsl.JobContextWithProps as JCWP
@Service
@ConditionalOnProperty(name = ["hideout.job-queue.type"], havingValue = "nosql")
class KJobMongoJobQueueWorkerService : JobQueueWorkerService {
val kjob by lazy {
kjob(Mongo) {
connectionString = "mongodb://localhost"
nonBlockingMaxJobs = 10
blockingMaxJobs = 10
jobExecutionPeriodInSeconds = 1
}.start()
}
override fun init(
defines: List<Pair<HJ, JobRegisterContext<HJ, JCWP<HJ>>.(HJ) -> KJobFunctions<HJ, JCWP<HJ>>>>
) {
defines.forEach { job ->
kjob.register(job.first, job.second)
}
}
}

View File

@ -0,0 +1,27 @@
package dev.usbharu.hideout.service.job
import kjob.core.Job
import kjob.core.dsl.ScheduleContext
import kjob.core.kjob
import kjob.mongo.Mongo
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.stereotype.Service
@Service
@ConditionalOnProperty(name = ["hideout.job-queue.type"], havingValue = "nosql")
class KjobMongoJobQueueParentService : JobQueueParentService {
override fun init(jobDefines: List<Job>) = Unit
private val kjob = kjob(Mongo) {
connectionString = "mongodb://localhost"
databaseName = "kjob"
jobCollection = "kjob-jobs"
lockCollection = "kjob-locks"
expireLockInMinutes = 5L
isWorker = false
}.start()
override suspend fun <J : Job> schedule(job: J, block: ScheduleContext<J>.(J) -> Unit) {
kjob.schedule(job, block)
}
}

View File

@ -5,6 +5,8 @@ hideout:
driver: "org.h2.Driver" driver: "org.h2.Driver"
user: "" user: ""
password: "" password: ""
job-queue:
type: "nosql"
spring: spring:
jackson: jackson:
serialization: serialization:

View File

@ -1,227 +0,0 @@
@file:OptIn(ExperimentalCoroutinesApi::class)
package dev.usbharu.hideout.service.auth
import com.auth0.jwt.JWT
import com.auth0.jwt.algorithms.Algorithm
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import dev.usbharu.hideout.config.Config
import dev.usbharu.hideout.config.ConfigData
import dev.usbharu.hideout.domain.model.hideout.entity.Jwt
import dev.usbharu.hideout.domain.model.hideout.entity.JwtRefreshToken
import dev.usbharu.hideout.domain.model.hideout.entity.User
import dev.usbharu.hideout.domain.model.hideout.form.RefreshToken
import dev.usbharu.hideout.exception.InvalidRefreshTokenException
import dev.usbharu.hideout.query.JwtRefreshTokenQueryService
import dev.usbharu.hideout.query.UserQueryService
import dev.usbharu.hideout.repository.JwtRefreshTokenRepository
import dev.usbharu.hideout.service.core.MetaService
import dev.usbharu.hideout.util.Base64Util
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.mockito.kotlin.doReturn
import org.mockito.kotlin.doThrow
import org.mockito.kotlin.mock
import java.security.KeyPairGenerator
import java.security.interfaces.RSAPrivateKey
import java.security.interfaces.RSAPublicKey
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.*
import kotlin.test.assertEquals
import kotlin.test.assertNotEquals
class JwtServiceImplTest {
@Test
fun `createToken トークンを作成できる`() = runTest {
Config.configData = ConfigData(url = "https://example.com", objectMapper = jacksonObjectMapper())
val kid = UUID.randomUUID()
val keyPairGenerator = KeyPairGenerator.getInstance("RSA")
keyPairGenerator.initialize(2048)
val generateKeyPair = keyPairGenerator.generateKeyPair()
val metaService = mock<MetaService> {
onBlocking { getJwtMeta() } doReturn Jwt(
kid,
Base64Util.encode(generateKeyPair.private.encoded),
Base64Util.encode(generateKeyPair.public.encoded)
)
}
val refreshTokenRepository = mock<JwtRefreshTokenRepository> {
onBlocking { generateId() } doReturn 1L
}
val jwtService = JwtServiceImpl(metaService, refreshTokenRepository, mock(), mock())
val token = jwtService.createToken(
User.of(
id = 1L,
name = "test",
domain = "example.com",
screenName = "testUser",
description = "",
password = "hashedPassword",
inbox = "https://example.com/inbox",
outbox = "https://example.com/outbox",
url = "https://example.com",
publicKey = "-----BEGIN PUBLIC KEY-----...-----END PUBLIC KEY-----",
privateKey = "-----BEGIN PUBLIC KEY-----...-----END PUBLIC KEY-----",
createdAt = Instant.now()
)
)
assertNotEquals("", token.token)
assertNotEquals("", token.refreshToken)
val verify = JWT.require(
Algorithm.RSA256(
generateKeyPair.public as RSAPublicKey,
generateKeyPair.private as RSAPrivateKey
)
)
.withAudience("https://example.com/users/test")
.withIssuer("https://example.com")
.acceptLeeway(3L)
.build()
.verify(token.token)
assertEquals(kid.toString(), verify.keyId)
}
@Test
fun `refreshToken リフレッシュトークンからトークンを作成できる`() = runTest {
Config.configData = ConfigData(url = "https://example.com", objectMapper = jacksonObjectMapper())
val kid = UUID.randomUUID()
val keyPairGenerator = KeyPairGenerator.getInstance("RSA")
keyPairGenerator.initialize(2048)
val generateKeyPair = keyPairGenerator.generateKeyPair()
val refreshTokenRepository = mock<JwtRefreshTokenRepository> {
onBlocking { generateId() } doReturn 2L
}
val jwtRefreshTokenQueryService = mock<JwtRefreshTokenQueryService> {
onBlocking { findByToken("refreshToken") } doReturn JwtRefreshToken(
id = 1L,
userId = 1L,
refreshToken = "refreshToken",
createdAt = Instant.now().minus(60, ChronoUnit.MINUTES),
expiresAt = Instant.now().plus(14, ChronoUnit.DAYS).minus(60, ChronoUnit.MINUTES)
)
}
val userService = mock<UserQueryService> {
onBlocking { findById(1L) } doReturn User.of(
id = 1L,
name = "test",
domain = "example.com",
screenName = "testUser",
description = "",
password = "hashedPassword",
inbox = "https://example.com/inbox",
outbox = "https://example.com/outbox",
url = "https://example.com",
publicKey = "-----BEGIN PUBLIC KEY-----...-----BEGIN PUBLIC KEY-----",
privateKey = "-----BEGIN PRIVATE KEY-----...-----BEGIN PRIVATE KEY-----",
createdAt = Instant.now()
)
}
val metaService = mock<MetaService> {
onBlocking { getJwtMeta() } doReturn Jwt(
kid,
Base64Util.encode(generateKeyPair.private.encoded),
Base64Util.encode(generateKeyPair.public.encoded)
)
}
val jwtService = JwtServiceImpl(metaService, refreshTokenRepository, userService, jwtRefreshTokenQueryService)
val refreshToken = jwtService.refreshToken(RefreshToken("refreshToken"))
assertNotEquals("", refreshToken.token)
assertNotEquals("", refreshToken.refreshToken)
val verify = JWT.require(
Algorithm.RSA256(
generateKeyPair.public as RSAPublicKey,
generateKeyPair.private as RSAPrivateKey
)
)
.withAudience("https://example.com/users/test")
.withIssuer("https://example.com")
.acceptLeeway(3L)
.build()
.verify(refreshToken.token)
assertEquals(kid.toString(), verify.keyId)
}
@Test
fun `refreshToken 無効なリフレッシュトークンは失敗する`() = runTest {
val refreshTokenRepository = mock<JwtRefreshTokenQueryService> {
onBlocking { findByToken("InvalidRefreshToken") } doThrow NoSuchElementException()
}
val kid = UUID.randomUUID()
val keyPairGenerator = KeyPairGenerator.getInstance("RSA")
keyPairGenerator.initialize(2048)
val generateKeyPair = keyPairGenerator.generateKeyPair()
val metaService = mock<MetaService> {
onBlocking { getJwtMeta() } doReturn Jwt(
kid,
Base64Util.encode(generateKeyPair.private.encoded),
Base64Util.encode(generateKeyPair.public.encoded)
)
}
val jwtService = JwtServiceImpl(metaService, mock(), mock(), refreshTokenRepository)
assertThrows<InvalidRefreshTokenException> { jwtService.refreshToken(RefreshToken("InvalidRefreshToken")) }
}
@Test
fun `refreshToken 未来に作成されたリフレッシュトークンは失敗する`() = runTest {
val refreshTokenRepository = mock<JwtRefreshTokenQueryService> {
onBlocking { findByToken("refreshToken") } doReturn JwtRefreshToken(
id = 1L,
userId = 1L,
refreshToken = "refreshToken",
createdAt = Instant.now().plus(10, ChronoUnit.MINUTES),
expiresAt = Instant.now().plus(10, ChronoUnit.MINUTES).plus(14, ChronoUnit.DAYS)
)
}
val kid = UUID.randomUUID()
val keyPairGenerator = KeyPairGenerator.getInstance("RSA")
keyPairGenerator.initialize(2048)
val generateKeyPair = keyPairGenerator.generateKeyPair()
val metaService = mock<MetaService> {
onBlocking { getJwtMeta() } doReturn Jwt(
kid,
Base64Util.encode(generateKeyPair.private.encoded),
Base64Util.encode(generateKeyPair.public.encoded)
)
}
val jwtService = JwtServiceImpl(metaService, mock(), mock(), refreshTokenRepository)
assertThrows<InvalidRefreshTokenException> { jwtService.refreshToken(RefreshToken("refreshToken")) }
}
@Test
fun `refreshToken 期限切れのリフレッシュトークンでは失敗する`() = runTest {
val refreshTokenRepository = mock<JwtRefreshTokenQueryService> {
onBlocking { findByToken("refreshToken") } doReturn JwtRefreshToken(
id = 1L,
userId = 1L,
refreshToken = "refreshToken",
createdAt = Instant.now().minus(30, ChronoUnit.DAYS),
expiresAt = Instant.now().minus(16, ChronoUnit.DAYS)
)
}
val kid = UUID.randomUUID()
val keyPairGenerator = KeyPairGenerator.getInstance("RSA")
keyPairGenerator.initialize(2048)
val generateKeyPair = keyPairGenerator.generateKeyPair()
val metaService = mock<MetaService> {
onBlocking { getJwtMeta() } doReturn Jwt(
kid,
Base64Util.encode(generateKeyPair.private.encoded),
Base64Util.encode(generateKeyPair.public.encoded)
)
}
val jwtService = JwtServiceImpl(metaService, mock(), mock(), refreshTokenRepository)
assertThrows<InvalidRefreshTokenException> { jwtService.refreshToken(RefreshToken("refreshToken")) }
}
}