Compare commits
5 Commits
master
...
feature/wo
| Author | SHA1 | Date |
|---|---|---|
|
|
5384dd3430 | |
|
|
03e0676693 | |
|
|
7b1a7cfc5e | |
|
|
7960e5abac | |
|
|
15fe61bde5 |
|
|
@ -199,3 +199,10 @@ Disables Spring OAuth2 resource server for testing.
|
|||
## s3-storage
|
||||
|
||||
Amazon S3 support.
|
||||
|
||||
# spring-workers-agent
|
||||
|
||||
Framework for clustered scheduling based on Quartz and Spring Data.
|
||||
Stores all data in a database (Postgres by default).
|
||||
|
||||
Used as manageable alternative for Spring's `@Scheduled`.
|
||||
|
|
|
|||
|
|
@ -51,3 +51,6 @@ include("security-resource-server-custom-jwt-configuration")
|
|||
include("security-resource-server-test-jwt-configuration")
|
||||
include("security-jwt-common")
|
||||
include("s3-storage")
|
||||
include("spring-workers-manager-core")
|
||||
include("spring-workers-manager-agent")
|
||||
include("spring-workers-manager-api")
|
||||
|
|
|
|||
|
|
@ -0,0 +1,77 @@
|
|||
# Workers Agent
|
||||
|
||||
Framework for clustered scheduling based on Quartz and Spring Data.
|
||||
Stores all data in a database (Postgres by default).
|
||||
|
||||
Used as manageable alternative for Spring's `@Scheduled`.
|
||||
|
||||
## How to
|
||||
1. Add dependency
|
||||
```
|
||||
implementation project(":spring-workers-manager-agent")
|
||||
```
|
||||
1. Define a job by annotations to be scheduled in any component.
|
||||
```
|
||||
package ru.touchin
|
||||
|
||||
@Component
|
||||
class MyJob {
|
||||
|
||||
@ScheduledAction
|
||||
@InitTrigger(type = "CRON", expression = "0 15 * * * ?")
|
||||
fun sayHello(){
|
||||
println("Hello, world!")
|
||||
}
|
||||
|
||||
}
|
||||
```
|
||||
|
||||
1. Enable job in `application.properties`
|
||||
|
||||
```
|
||||
workers.names=ru.touchin.MyJob
|
||||
```
|
||||
or:
|
||||
```
|
||||
workers.names=*
|
||||
```
|
||||
|
||||
1. Start the application.
|
||||
|
||||
## Annotations
|
||||
### @ScheduledAction
|
||||
Registers method as action of some job.
|
||||
|
||||
Parameters:
|
||||
- `name` - name of job. Defaults to class full name.
|
||||
Must be unique in application scope.
|
||||
|
||||
### @Trigger
|
||||
Declares default trigger for the job.
|
||||
Default triggers are created when launching job first time.
|
||||
|
||||
Parameters:
|
||||
- `name` - Optional name for trigger. Defaults to some (maybe random) string.
|
||||
Name must be unique in scope of corresponding job.
|
||||
- `type` - Trigger type. See triggers types.
|
||||
SpEL expressions are supported like in `@Scheduled` annotation of Spring.
|
||||
- `expression` - The value for trigger.
|
||||
SpEL expressions are supported like in `@Scheduled` annotation of Spring.
|
||||
|
||||
## Configuration
|
||||
### Enabling workers
|
||||
|
||||
Agent ignores workers by default. To enable worker add its name to `worker.names` property.
|
||||
Example:
|
||||
```
|
||||
worker.names=com.eample.Job1,\
|
||||
com.example.Job2
|
||||
```
|
||||
|
||||
#### Patterns for names
|
||||
`workers.names` support Glob-like patterns.
|
||||
- Asterisk (`*`) symbol is for "zero or more any symbols" (as `.*` in regex)
|
||||
- Question mark (`?`) is for "any single symbol" (as `.` in regex)
|
||||
|
||||
## TODO
|
||||
- External data source, unrelated to application code.
|
||||
|
|
@ -0,0 +1,21 @@
|
|||
plugins {
|
||||
id("kotlin")
|
||||
id("kotlin-spring")
|
||||
id("maven-publish")
|
||||
}
|
||||
|
||||
dependencies {
|
||||
implementation(project(":common-spring"))
|
||||
implementation(project(":common-spring-jpa"))
|
||||
implementation(project(":spring-workers-manager-core"))
|
||||
|
||||
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
|
||||
|
||||
implementation("org.springframework.data:spring-data-jpa")
|
||||
implementation("org.springframework.boot:spring-boot-starter-quartz")
|
||||
|
||||
testImplementation(project(":common-spring-test-jpa"))
|
||||
|
||||
testImplementation("org.springframework.boot:spring-boot-starter-test")
|
||||
testImplementation("com.nhaarman.mockitokotlin2:mockito-kotlin")
|
||||
}
|
||||
|
|
@ -0,0 +1,30 @@
|
|||
package ru.touchin.spring.workers.manager.agent
|
||||
|
||||
import org.springframework.boot.context.event.ApplicationStartedEvent
|
||||
import org.springframework.context.event.EventListener
|
||||
import org.springframework.core.annotation.Order
|
||||
import org.springframework.stereotype.Component
|
||||
import ru.touchin.common.spring.Ordered
|
||||
import ru.touchin.spring.workers.manager.agent.config.WorkerInitializer
|
||||
import ru.touchin.spring.workers.manager.agent.scheduled.WorkerManagerWatcher
|
||||
import ru.touchin.spring.workers.manager.core.config.LiquibaseRunner
|
||||
|
||||
/**
|
||||
* Prepares required resources and initializes agent.
|
||||
*/
|
||||
@Component
|
||||
class AgentInitializer(
|
||||
private val liquibase: LiquibaseRunner,
|
||||
private val workerInitializer: WorkerInitializer,
|
||||
private val workerWatcher: WorkerManagerWatcher
|
||||
) {
|
||||
|
||||
@EventListener(value = [ApplicationStartedEvent::class])
|
||||
@Order(Ordered.HIGH)
|
||||
fun execute() {
|
||||
liquibase.run()
|
||||
workerInitializer.init()
|
||||
workerWatcher.init()
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,80 @@
|
|||
package ru.touchin.spring.workers.manager.agent.annotation_config
|
||||
|
||||
import org.springframework.beans.factory.config.BeanPostProcessor
|
||||
import org.springframework.stereotype.Component
|
||||
import ru.touchin.spring.workers.manager.agent.annotation_config.job_factory.AnnotationConfigJobFactory
|
||||
import ru.touchin.spring.workers.manager.agent.common.base.BaseJob
|
||||
import java.lang.reflect.Method
|
||||
|
||||
/**
|
||||
* 1. Scans components for [ScheduledAction] annotation
|
||||
* 2. Keeps metadata of that components
|
||||
* 3. Creates [BaseJob] for methods created
|
||||
*/
|
||||
@Component
|
||||
class AnnotationConfigCollectingBeanPostProcessor(
|
||||
private val jobFactories: List<AnnotationConfigJobFactory>
|
||||
) : BeanPostProcessor {
|
||||
|
||||
val jobs: MutableList<BaseJob> = ArrayList()
|
||||
|
||||
val jobName2Method: MutableMap<String, Method> = HashMap()
|
||||
|
||||
/**
|
||||
* Bean name -> class of this bean.
|
||||
*
|
||||
* Contains only entries for classes with [ScheduledAction] annotation.
|
||||
*/
|
||||
private val beanName2OriginalClass: MutableMap<String, Class<*>> = HashMap()
|
||||
|
||||
override fun postProcessBeforeInitialization(bean: Any, beanName: String): Any? {
|
||||
val hasMethodsForScheduling = bean.javaClass.declaredMethods
|
||||
.any { it.isAnnotationPresent(ScheduledAction::class.java) }
|
||||
|
||||
if (hasMethodsForScheduling) {
|
||||
beanName2OriginalClass[beanName] = bean.javaClass
|
||||
}
|
||||
|
||||
return bean
|
||||
}
|
||||
|
||||
override fun postProcessAfterInitialization(bean: Any, beanName: String): Any? {
|
||||
val clazz = beanName2OriginalClass[beanName]
|
||||
?: return bean
|
||||
|
||||
val actionMethod = findActionMethod(clazz)
|
||||
|
||||
val createdJobs = jobFactories.flatMap { it.create(bean, actionMethod) }
|
||||
|
||||
createdJobs.forEach {
|
||||
jobName2Method[it.getName()] = actionMethod
|
||||
}
|
||||
|
||||
jobs.addAll(createdJobs)
|
||||
|
||||
return bean
|
||||
}
|
||||
|
||||
companion object {
|
||||
|
||||
private fun findActionMethod(clazz: Class<*>): Method {
|
||||
return clazz.declaredMethods
|
||||
.filter { it.isAnnotationPresent(ScheduledAction::class.java) }
|
||||
.also { annotatedMethods ->
|
||||
check(annotatedMethods.size <= 1) {
|
||||
"Class `${clazz.name}` has more that one methods with annotation @Scheduled. " +
|
||||
"Methods: $annotatedMethods"
|
||||
}
|
||||
}
|
||||
.onEach { annotatedMethod ->
|
||||
check(annotatedMethod.parameters.isEmpty()) {
|
||||
"Method ${clazz.name}:${annotatedMethod.name}' must not have arguments for scheduling, " +
|
||||
"but requires ${annotatedMethod.parameters.size} parameters"
|
||||
}
|
||||
}
|
||||
.single()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,41 @@
|
|||
package ru.touchin.spring.workers.manager.agent.annotation_config
|
||||
|
||||
import org.springframework.stereotype.Component
|
||||
import org.springframework.util.LinkedMultiValueMap
|
||||
import org.springframework.util.MultiValueMap
|
||||
import ru.touchin.spring.workers.manager.agent.annotation_config.trigger_factory.AnnotationConfigTriggerFactory
|
||||
import ru.touchin.spring.workers.manager.agent.trigger.InitialTriggerDescriptorsProvider
|
||||
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
|
||||
import ru.touchin.spring.workers.manager.core.trigger.models.TriggerDescriptorEntity
|
||||
import ru.touchin.spring.workers.manager.core.trigger.services.dto.CreateTriggerDescriptor
|
||||
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
|
||||
import ru.touchin.spring.workers.manager.core.worker.models.WorkerEntity
|
||||
|
||||
@Component
|
||||
class AnnotationConfigInitialTriggerDescriptorsProvider(
|
||||
private val triggersCollector: AnnotationConfigCollectingBeanPostProcessor,
|
||||
private val triggerFactories: List<AnnotationConfigTriggerFactory>
|
||||
) : InitialTriggerDescriptorsProvider {
|
||||
|
||||
val jobName2Triggers: MultiValueMap<String, CreateTriggerDescriptor> = LinkedMultiValueMap()
|
||||
|
||||
override fun applicableFor(worker: Worker): Boolean {
|
||||
val actionMethod = triggersCollector.jobName2Method[worker.name]
|
||||
?: return false
|
||||
|
||||
val triggers = triggerFactories.flatMap { it.create(worker, actionMethod) }
|
||||
|
||||
if (triggers.isEmpty()) {
|
||||
return false
|
||||
}
|
||||
|
||||
jobName2Triggers.addAll(worker.name, triggers)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
override fun createInitialTriggerDescriptors(worker: Worker): List<CreateTriggerDescriptor> {
|
||||
return jobName2Triggers[worker.name].orEmpty()
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
package ru.touchin.spring.workers.manager.agent.annotation_config
|
||||
|
||||
import org.springframework.stereotype.Component
|
||||
import ru.touchin.spring.workers.manager.agent.common.base.BaseJob
|
||||
import ru.touchin.spring.workers.manager.agent.registry.JobProvider
|
||||
|
||||
@Component
|
||||
class AnnotationConfigJobProvider(
|
||||
private val jobsCollector: AnnotationConfigCollectingBeanPostProcessor
|
||||
) : JobProvider {
|
||||
|
||||
override fun getJobs(): List<BaseJob> = jobsCollector.jobs
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,15 @@
|
|||
package ru.touchin.spring.workers.manager.agent.annotation_config
|
||||
|
||||
import java.lang.annotation.Inherited
|
||||
|
||||
/**
|
||||
* Adds default trigger to [ScheduledAction].
|
||||
* Default trigger is submitted if job is launched first time.
|
||||
*/
|
||||
@Inherited
|
||||
@Target(AnnotationTarget.FUNCTION)
|
||||
annotation class InitTrigger(
|
||||
val name: String = "",
|
||||
val type: String,
|
||||
val expression: String
|
||||
)
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
package ru.touchin.spring.workers.manager.agent.annotation_config
|
||||
|
||||
@Target(AnnotationTarget.FUNCTION)
|
||||
annotation class ScheduledAction(
|
||||
/**
|
||||
* Job name. Defaults to class name.
|
||||
*/
|
||||
val name: String = ""
|
||||
)
|
||||
|
||||
|
|
@ -0,0 +1,19 @@
|
|||
package ru.touchin.spring.workers.manager.agent.annotation_config.job_factory
|
||||
|
||||
import ru.touchin.spring.workers.manager.agent.common.base.BaseJob
|
||||
import java.lang.reflect.Method
|
||||
|
||||
/**
|
||||
* Invoked when found method with [ScheduledAction] annotation in some bean.
|
||||
*
|
||||
* Used to read jobs settings from annotations in components.
|
||||
*/
|
||||
interface AnnotationConfigJobFactory {
|
||||
|
||||
/**
|
||||
* Warning: As Spring could substitute actual beans with proxy-objects,
|
||||
* you must carefully check if [actionMethod] is applicable for [bean].
|
||||
*/
|
||||
fun create(bean: Any, actionMethod: Method): List<BaseJob>
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,50 @@
|
|||
package ru.touchin.spring.workers.manager.agent.annotation_config.job_factory
|
||||
|
||||
import org.springframework.stereotype.Component
|
||||
import ru.touchin.spring.workers.manager.agent.annotation_config.ScheduledAction
|
||||
import ru.touchin.spring.workers.manager.agent.common.base.BaseJob
|
||||
import java.lang.reflect.Method
|
||||
|
||||
/**
|
||||
* Creates job instances for every annotated action method.
|
||||
*/
|
||||
@Component
|
||||
class ScheduledActionAnnotationConfigJobFactory : AnnotationConfigJobFactory {
|
||||
|
||||
override fun create(bean: Any, actionMethod: Method): List<BaseJob> {
|
||||
val job = createJobForBean(bean, actionMethod)
|
||||
|
||||
return listOf(job)
|
||||
}
|
||||
|
||||
companion object {
|
||||
|
||||
private fun createJobForBean(bean: Any, annotatedMethod: Method): BaseJob {
|
||||
val targetMethod = bean.javaClass.getMethod(annotatedMethod)
|
||||
val annotation = annotatedMethod.getAnnotation(ScheduledAction::class.java)
|
||||
|
||||
val jobName: String = annotation.name.takeIf { it.isNotBlank() }
|
||||
?: annotatedMethod.declaringClass.name
|
||||
|
||||
return createJob(jobName) { targetMethod.invoke(bean) }
|
||||
}
|
||||
|
||||
private fun Class<*>.getMethod(sampleMethod: Method): Method {
|
||||
return getMethod(sampleMethod.name, *sampleMethod.parameterTypes)
|
||||
.apply { isAccessible = true }
|
||||
}
|
||||
|
||||
private fun createJob(jobName: String, func: () -> Unit): BaseJob = object : BaseJob {
|
||||
|
||||
override fun getName() = jobName
|
||||
|
||||
override fun run() {
|
||||
func.invoke()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
package ru.touchin.spring.workers.manager.agent.annotation_config.trigger_factory
|
||||
|
||||
import ru.touchin.spring.workers.manager.core.trigger.services.dto.CreateTriggerDescriptor
|
||||
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
|
||||
import java.lang.reflect.Method
|
||||
|
||||
/**
|
||||
* Used to create initial triggers for new workers
|
||||
*/
|
||||
interface AnnotationConfigTriggerFactory {
|
||||
|
||||
fun create(worker: Worker, actionMethod: Method): List<CreateTriggerDescriptor>
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,49 @@
|
|||
package ru.touchin.spring.workers.manager.agent.annotation_config.trigger_factory
|
||||
|
||||
import org.springframework.context.EmbeddedValueResolverAware
|
||||
import org.springframework.stereotype.Component
|
||||
import org.springframework.util.StringValueResolver
|
||||
import ru.touchin.spring.workers.manager.agent.annotation_config.InitTrigger
|
||||
import ru.touchin.spring.workers.manager.core.trigger.enums.TriggerType
|
||||
import ru.touchin.spring.workers.manager.core.trigger.services.dto.CreateTriggerDescriptor
|
||||
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
|
||||
import java.lang.reflect.Method
|
||||
|
||||
/**
|
||||
* Creates triggers for methods annotated with [ru.touchin.spring.workers.manager.agent.annotation_config.InitTrigger] annotation
|
||||
*/
|
||||
@Component
|
||||
class TriggerAnnotationConfigTriggerFactory
|
||||
: AnnotationConfigTriggerFactory,
|
||||
EmbeddedValueResolverAware {
|
||||
|
||||
lateinit var valueResolver: StringValueResolver
|
||||
|
||||
override fun setEmbeddedValueResolver(resolver: StringValueResolver) {
|
||||
valueResolver = resolver
|
||||
}
|
||||
|
||||
override fun create(worker: Worker, actionMethod: Method): List<CreateTriggerDescriptor> {
|
||||
val triggerAnnotation = actionMethod.getAnnotation(InitTrigger::class.java)
|
||||
?: return emptyList()
|
||||
|
||||
val resolvedType = valueResolver.resolveStringValue(triggerAnnotation.type)
|
||||
val triggerType = TriggerType.find(resolvedType)
|
||||
?: throw IllegalArgumentException("Trigger type for name $resolvedType dies not exist")
|
||||
|
||||
val expression = valueResolver.resolveStringValue(triggerAnnotation.expression)
|
||||
?: throw NullPointerException("Trigger for worker '${worker.name}' has null expression")
|
||||
|
||||
val trigger = CreateTriggerDescriptor(
|
||||
name = "${triggerType.name}_${expression.replace(" ", "_")}",
|
||||
type = triggerType,
|
||||
workerName = worker.name,
|
||||
expression = valueResolver.resolveStringValue(triggerAnnotation.expression)
|
||||
?: throw NullPointerException("Trigger for worker '${worker.name}' has null expression"),
|
||||
disabledAt = null,
|
||||
)
|
||||
|
||||
return listOf(trigger)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,9 @@
|
|||
package ru.touchin.spring.workers.manager.agent.common.base
|
||||
|
||||
interface BaseJob {
|
||||
|
||||
fun run()
|
||||
|
||||
fun getName(): String = this::class.java.name
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,54 @@
|
|||
package ru.touchin.spring.workers.manager.agent.common.utils
|
||||
|
||||
/**
|
||||
* Glob is simple replacement for Regex.
|
||||
* It uses only two special chars: * (any substring) and ? (any char)
|
||||
* This implementation has no support for characters escaping.
|
||||
* Glob has no capturing features.
|
||||
*/
|
||||
object Glob {
|
||||
|
||||
private const val ANY_SUBSTRING_SYMBOL = '*'
|
||||
|
||||
private const val ANY_CHAR_SYMBOL = '?'
|
||||
|
||||
/**
|
||||
* Example: "I love patterns" matches "I * pa?t?er?s"
|
||||
*
|
||||
* Based on [StackOverflow answer](https://stackoverflow.com/a/3687031)
|
||||
*/
|
||||
fun matches(text: String, pattern: String): Boolean {
|
||||
val starPosition = pattern.indexOf(ANY_SUBSTRING_SYMBOL)
|
||||
|
||||
val headPattern: String = if (starPosition == -1) pattern else pattern.substring(0, starPosition)
|
||||
|
||||
if (headPattern.length > text.length) {
|
||||
return false
|
||||
}
|
||||
|
||||
// handle the part up to the first *
|
||||
for (i in headPattern.indices) {
|
||||
if (
|
||||
headPattern[i] != ANY_CHAR_SYMBOL
|
||||
&& headPattern[i] != text[i]
|
||||
) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if (starPosition == -1) {
|
||||
return headPattern.length == text.length
|
||||
}
|
||||
|
||||
val tailPattern: String = pattern.substring(starPosition + 1)
|
||||
|
||||
for (i in headPattern.length..text.length) {
|
||||
if (matches(text.substring(i), tailPattern)) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
package ru.touchin.spring.workers.manager.agent.config
|
||||
|
||||
import org.springframework.context.annotation.Import
|
||||
import ru.touchin.spring.workers.manager.WorkersManagerConfiguration
|
||||
|
||||
/**
|
||||
* Annotation to enable Workers Manager module in Spring components via annotations.
|
||||
*/
|
||||
@Import(WorkersManagerConfiguration::class)
|
||||
annotation class EnableWorkersManager
|
||||
|
|
@ -0,0 +1,39 @@
|
|||
package ru.touchin.spring.workers.manager.agent.config
|
||||
|
||||
import org.springframework.stereotype.Component
|
||||
import org.springframework.transaction.annotation.Transactional
|
||||
import ru.touchin.spring.workers.manager.agent.trigger.services.TriggerDescriptorService
|
||||
import ru.touchin.spring.workers.manager.agent.registry.JobDefinitionsRegistry
|
||||
import ru.touchin.spring.workers.manager.core.worker.services.WorkerCoreService
|
||||
import ru.touchin.spring.workers.manager.core.worker.services.WorkersStateService
|
||||
|
||||
@Component
|
||||
class WorkerInitializer(
|
||||
private val triggerDescriptorAgentService: TriggerDescriptorService,
|
||||
private val jobDefinitionsRegistry: JobDefinitionsRegistry,
|
||||
private val workerCoreService: WorkerCoreService,
|
||||
private val workerStateService: WorkersStateService,
|
||||
) {
|
||||
|
||||
@Transactional
|
||||
fun init() {
|
||||
initWorkers(jobDefinitionsRegistry.jobNames)
|
||||
}
|
||||
|
||||
private fun initWorkers(jobNames: Set<String>) {
|
||||
jobNames.forEach(this::getOrCreateWorkerWithTriggers)
|
||||
}
|
||||
|
||||
private fun getOrCreateWorkerWithTriggers(name: String) {
|
||||
workerCoreService.getWithLock(name)
|
||||
?.let { workerStateService.start(it.name) }
|
||||
?: createWorkerWithTriggers(name)
|
||||
}
|
||||
|
||||
private fun createWorkerWithTriggers(name: String) {
|
||||
val worker = workerCoreService.create(name)
|
||||
|
||||
triggerDescriptorAgentService.createDefaultTriggerDescriptors(worker)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,36 @@
|
|||
package ru.touchin.spring.workers.manager.agent.quartz
|
||||
|
||||
import org.quartz.Job
|
||||
import org.quartz.JobBuilder
|
||||
import org.quartz.JobDataMap
|
||||
import org.quartz.JobExecutionContext
|
||||
import org.quartz.JobExecutionException
|
||||
|
||||
typealias JobFunction = (JobExecutionContext) -> Unit
|
||||
|
||||
class RunnableJob : Job {
|
||||
|
||||
override fun execute(context: JobExecutionContext) {
|
||||
try {
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
(context.jobDetail.jobDataMap[ACTION] as JobFunction).also { action ->
|
||||
action.invoke(context)
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
throw JobExecutionException(e)
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
|
||||
private const val ACTION = "ACTION"
|
||||
|
||||
fun initJobBuilder(action: JobFunction): JobBuilder {
|
||||
return JobBuilder
|
||||
.newJob(RunnableJob::class.java)
|
||||
.usingJobData(JobDataMap(mapOf(ACTION to action)))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,64 @@
|
|||
package ru.touchin.spring.workers.manager.agent.registry
|
||||
|
||||
import org.quartz.JobDetail
|
||||
import org.springframework.beans.factory.annotation.Value
|
||||
import org.springframework.stereotype.Component
|
||||
import org.springframework.util.LinkedMultiValueMap
|
||||
import ru.touchin.spring.workers.manager.agent.worker.executors.WorkerActionExecutor
|
||||
import ru.touchin.spring.workers.manager.agent.quartz.RunnableJob
|
||||
import ru.touchin.spring.workers.manager.agent.common.base.BaseJob
|
||||
import ru.touchin.spring.workers.manager.agent.common.utils.Glob
|
||||
|
||||
@Component
|
||||
class JobDefinitionsRegistry(
|
||||
@Value("#{'\${workers.names}'.split(',')}")
|
||||
workersNamesPatterns: Set<String>,
|
||||
providers: List<JobProvider>,
|
||||
private val workerActionExecutor: WorkerActionExecutor
|
||||
) {
|
||||
|
||||
final val jobs: Map<String, BaseJob>
|
||||
|
||||
private final val jobDetails: Map<String, JobDetail>
|
||||
|
||||
final val jobNames: Set<String>
|
||||
|
||||
init {
|
||||
val allJobs = providers.flatMap(JobProvider::getJobs)
|
||||
|
||||
val name2jobsList = LinkedMultiValueMap<String, BaseJob>()
|
||||
|
||||
allJobs.forEach { job ->
|
||||
name2jobsList.add(job.getName(), job)
|
||||
}
|
||||
|
||||
name2jobsList.forEach { (name, jobs) ->
|
||||
check(jobs.size <= 1) { "There are ${jobs.size} jobs with name '$name'. Job names must be unique" }
|
||||
}
|
||||
|
||||
val name2job = name2jobsList
|
||||
.toSingleValueMap()
|
||||
.filterKeys { name -> workersNamesPatterns.any { pattern -> Glob.matches(name, pattern) } }
|
||||
|
||||
jobNames = name2job.keys
|
||||
|
||||
jobs = name2job
|
||||
|
||||
jobDetails = name2job
|
||||
.mapValues { (_, job) ->
|
||||
createJobDetail(job)
|
||||
}
|
||||
}
|
||||
|
||||
fun getJobDetail(jobName: String): JobDetail? = jobDetails[jobName]
|
||||
|
||||
private fun createJobDetail(job: BaseJob): JobDetail {
|
||||
return RunnableJob
|
||||
.initJobBuilder { context ->
|
||||
workerActionExecutor.executeJobAction(job, context.trigger)
|
||||
}
|
||||
.withIdentity(job.getName())
|
||||
.build()
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,9 @@
|
|||
package ru.touchin.spring.workers.manager.agent.registry
|
||||
|
||||
import ru.touchin.spring.workers.manager.agent.common.base.BaseJob
|
||||
|
||||
interface JobProvider {
|
||||
|
||||
fun getJobs(): List<BaseJob>
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,15 @@
|
|||
package ru.touchin.spring.workers.manager.agent.registry
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.stereotype.Component
|
||||
import ru.touchin.spring.workers.manager.agent.common.base.BaseJob
|
||||
|
||||
@Component
|
||||
class SimpleJobProvider : JobProvider {
|
||||
|
||||
@Autowired(required = false)
|
||||
var jobBeans: List<BaseJob> = emptyList()
|
||||
|
||||
override fun getJobs(): List<BaseJob> = jobBeans
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,37 @@
|
|||
package ru.touchin.spring.workers.manager.agent.registry
|
||||
|
||||
import org.quartz.Trigger
|
||||
import org.springframework.stereotype.Component
|
||||
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
@Component
|
||||
class TriggersRegistry {
|
||||
|
||||
// Concurrent impl to prevent java.util.ConcurrentModificationException
|
||||
private val descriptors2triggers: MutableMap<TriggerDescriptor, Trigger> = ConcurrentHashMap()
|
||||
|
||||
fun getTriggerByDescriptor(descriptor: TriggerDescriptor): Trigger? {
|
||||
return descriptors2triggers[descriptor]
|
||||
}
|
||||
|
||||
fun getDescriptorByTrigger(trigger: Trigger): TriggerDescriptor? {
|
||||
return descriptors2triggers
|
||||
.entries
|
||||
.firstOrNull { it.value == trigger }
|
||||
?.key
|
||||
}
|
||||
|
||||
fun getDescriptors(): List<TriggerDescriptor> {
|
||||
return descriptors2triggers.keys.toList()
|
||||
}
|
||||
|
||||
fun putTrigger(descriptor: TriggerDescriptor, trigger: Trigger) {
|
||||
descriptors2triggers[descriptor] = trigger
|
||||
}
|
||||
|
||||
fun remove(triggerDescriptors: List<TriggerDescriptor>) {
|
||||
return triggerDescriptors.forEach { descriptors2triggers.remove(it) }
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,67 @@
|
|||
package ru.touchin.spring.workers.manager.agent.scheduled
|
||||
|
||||
import org.quartz.Scheduler
|
||||
import org.quartz.SimpleScheduleBuilder
|
||||
import org.quartz.TriggerBuilder
|
||||
import org.springframework.beans.factory.annotation.Value
|
||||
import org.springframework.stereotype.Component
|
||||
import ru.touchin.spring.workers.manager.agent.quartz.RunnableJob
|
||||
import ru.touchin.spring.workers.manager.agent.registry.JobDefinitionsRegistry
|
||||
import ru.touchin.spring.workers.manager.agent.scheduled.services.SchedulerService
|
||||
import ru.touchin.spring.workers.manager.agent.registry.TriggersRegistry
|
||||
import ru.touchin.spring.workers.manager.core.trigger.services.TriggerDescriptorCoreService
|
||||
|
||||
/**
|
||||
* Class is proceeding regular synchronisation trigger descriptors from db and quartz scheduled triggers
|
||||
*/
|
||||
@Component
|
||||
class WorkerManagerWatcher(
|
||||
@Value("\${workers.watcher.sync.interval}")
|
||||
private val watcherSyncInterval: Long,
|
||||
private val jobDefinitionsRegistry: JobDefinitionsRegistry,
|
||||
private val scheduleTriggerService: SchedulerService,
|
||||
private val triggersRegistry: TriggersRegistry,
|
||||
private val triggerDescriptorCoreService: TriggerDescriptorCoreService,
|
||||
private val quartzScheduler: Scheduler
|
||||
) {
|
||||
|
||||
fun init() {
|
||||
val systemJobDetail = RunnableJob
|
||||
.initJobBuilder { sync() }
|
||||
.withIdentity(SYSTEM_JOB_NAME, SYSTEM_JOB_GROUP)
|
||||
.build()
|
||||
|
||||
val systemTrigger = TriggerBuilder.newTrigger()
|
||||
.forJob(systemJobDetail)
|
||||
.withIdentity("${SYSTEM_JOB_NAME}_trigger")
|
||||
.withSchedule(SimpleScheduleBuilder
|
||||
.simpleSchedule()
|
||||
.repeatForever()
|
||||
.withIntervalInMilliseconds(watcherSyncInterval)
|
||||
)
|
||||
.build()
|
||||
|
||||
quartzScheduler.scheduleJob(systemJobDetail, systemTrigger)
|
||||
}
|
||||
|
||||
fun sync() {
|
||||
val currentTriggerDescriptors = triggersRegistry.getDescriptors()
|
||||
|
||||
val actualTriggerDescriptors = jobDefinitionsRegistry.jobs
|
||||
.flatMap { (jobName, _) -> triggerDescriptorCoreService.getByWorkerName(jobName) }
|
||||
.filter { !it.isDisabled() }
|
||||
|
||||
val deletedTriggerDescriptors = currentTriggerDescriptors - actualTriggerDescriptors.toSet()
|
||||
scheduleTriggerService.unscheduleTriggers(deletedTriggerDescriptors)
|
||||
triggersRegistry.remove(deletedTriggerDescriptors)
|
||||
|
||||
val newTriggerDescriptors = actualTriggerDescriptors - currentTriggerDescriptors.toSet()
|
||||
scheduleTriggerService.scheduleTriggers(newTriggerDescriptors)
|
||||
}
|
||||
|
||||
companion object {
|
||||
private const val SYSTEM_JOB_GROUP = "SYSTEM"
|
||||
private const val SYSTEM_JOB_NAME = "system_worker_check"
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
package ru.touchin.spring.workers.manager.agent.scheduled.services
|
||||
|
||||
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
|
||||
|
||||
interface SchedulerService {
|
||||
|
||||
fun scheduleTriggers(triggerDescriptors: List<TriggerDescriptor>)
|
||||
fun unscheduleTriggers(triggerDescriptors: List<TriggerDescriptor>)
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,85 @@
|
|||
package ru.touchin.spring.workers.manager.agent.scheduled.services
|
||||
|
||||
import org.quartz.CronScheduleBuilder
|
||||
import org.quartz.JobDetail
|
||||
import org.quartz.ScheduleBuilder
|
||||
import org.quartz.Scheduler
|
||||
import org.quartz.SimpleScheduleBuilder
|
||||
import org.quartz.Trigger
|
||||
import org.quartz.TriggerBuilder
|
||||
import org.springframework.stereotype.Service
|
||||
import ru.touchin.spring.workers.manager.agent.registry.JobDefinitionsRegistry
|
||||
import ru.touchin.spring.workers.manager.agent.registry.TriggersRegistry
|
||||
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
|
||||
import ru.touchin.spring.workers.manager.core.trigger.enums.TriggerType
|
||||
|
||||
@Service
|
||||
class SchedulerServiceImpl(
|
||||
private val quartzScheduler: Scheduler,
|
||||
private val jobDefinitionsRegistry: JobDefinitionsRegistry,
|
||||
private val triggersRegistry: TriggersRegistry
|
||||
) : SchedulerService {
|
||||
|
||||
override fun scheduleTriggers(
|
||||
triggerDescriptors: List<TriggerDescriptor>
|
||||
) {
|
||||
triggerDescriptors.forEach(this::scheduleTrigger)
|
||||
}
|
||||
|
||||
override fun unscheduleTriggers(
|
||||
triggerDescriptors: List<TriggerDescriptor>
|
||||
) {
|
||||
triggerDescriptors.forEach { descriptor ->
|
||||
val trigger = triggersRegistry.getTriggerByDescriptor(descriptor)
|
||||
|
||||
if (trigger != null) {
|
||||
quartzScheduler.unscheduleJob(trigger.key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun scheduleTrigger(descriptor: TriggerDescriptor) {
|
||||
val jobDetail = jobDefinitionsRegistry.getJobDetail(descriptor.workerName)
|
||||
|
||||
if (jobDetail != null) {
|
||||
val trigger = createTrigger(jobDetail, descriptor)
|
||||
|
||||
triggersRegistry.putTrigger(descriptor, trigger)
|
||||
|
||||
if (!quartzScheduler.checkExists(trigger.key)) {
|
||||
if (quartzScheduler.checkExists(jobDetail.key)) {
|
||||
quartzScheduler.scheduleJob(trigger)
|
||||
} else {
|
||||
quartzScheduler.scheduleJob(jobDetail, trigger)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun createTrigger(job: JobDetail, triggerDescriptor: TriggerDescriptor): Trigger {
|
||||
return TriggerBuilder.newTrigger().forJob(job)
|
||||
.withIdentity(createTriggerName(job, triggerDescriptor))
|
||||
.withSchedule(getScheduleBuilder(triggerDescriptor))
|
||||
.build()
|
||||
}
|
||||
|
||||
private fun createTriggerName(job: JobDetail, triggerDescriptor: TriggerDescriptor) =
|
||||
"${job.key.name}_${triggerDescriptor.id}_trigger"
|
||||
|
||||
private fun getScheduleBuilder(triggerDescriptor: TriggerDescriptor): ScheduleBuilder<out Trigger> {
|
||||
return when (triggerDescriptor.type) {
|
||||
TriggerType.CRON -> cronSchedule(triggerDescriptor.expression)
|
||||
TriggerType.FIXED_RATE -> fixedRateSchedule(triggerDescriptor.expression.toLong())
|
||||
TriggerType.FIXED_DELAY -> fixedRateSchedule(triggerDescriptor.expression.toLong())
|
||||
}
|
||||
}
|
||||
|
||||
private fun fixedRateSchedule(interval: Long): SimpleScheduleBuilder {
|
||||
return SimpleScheduleBuilder.simpleSchedule().repeatForever().withIntervalInMilliseconds(interval)
|
||||
}
|
||||
|
||||
private fun cronSchedule(expression: String): CronScheduleBuilder {
|
||||
return CronScheduleBuilder.cronSchedule(expression)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
package ru.touchin.spring.workers.manager.agent.trigger
|
||||
|
||||
import ru.touchin.spring.workers.manager.core.trigger.services.dto.CreateTriggerDescriptor
|
||||
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
|
||||
|
||||
/**
|
||||
* When the agent is starting first time, then initial (default) triggers should be created for new workers.
|
||||
*/
|
||||
interface InitialTriggerDescriptorsProvider {
|
||||
|
||||
fun applicableFor(worker: Worker): Boolean
|
||||
fun createInitialTriggerDescriptors(worker: Worker): List<CreateTriggerDescriptor>
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,20 @@
|
|||
package ru.touchin.spring.workers.manager.agent.trigger
|
||||
|
||||
import org.springframework.core.annotation.Order
|
||||
import org.springframework.stereotype.Component
|
||||
import ru.touchin.common.spring.Ordered
|
||||
import ru.touchin.spring.workers.manager.core.trigger.services.dto.CreateTriggerDescriptor
|
||||
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
|
||||
|
||||
/**
|
||||
* When no triggers could be created by any provider, then no triggers created.
|
||||
*/
|
||||
@Component
|
||||
@Order(Ordered.LOWER)
|
||||
class NoInitialTriggersDescriptorsProvider : InitialTriggerDescriptorsProvider {
|
||||
|
||||
override fun applicableFor(worker: Worker): Boolean = true
|
||||
|
||||
override fun createInitialTriggerDescriptors(worker: Worker): List<CreateTriggerDescriptor> = emptyList()
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
package ru.touchin.spring.workers.manager.agent.trigger.services
|
||||
|
||||
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
|
||||
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
|
||||
|
||||
interface TriggerDescriptorService {
|
||||
|
||||
fun createDefaultTriggerDescriptors(worker: Worker): List<TriggerDescriptor>
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,25 @@
|
|||
package ru.touchin.spring.workers.manager.agent.trigger.services
|
||||
|
||||
import org.springframework.stereotype.Service
|
||||
import org.springframework.transaction.annotation.Transactional
|
||||
import ru.touchin.spring.workers.manager.agent.trigger.InitialTriggerDescriptorsProvider
|
||||
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
|
||||
import ru.touchin.spring.workers.manager.core.trigger.services.TriggerDescriptorCoreService
|
||||
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
|
||||
|
||||
@Service
|
||||
class TriggerDescriptorServiceImpl(
|
||||
private val triggerDescriptorCoreService: TriggerDescriptorCoreService,
|
||||
private val initialTriggerDescriptorProviders: List<InitialTriggerDescriptorsProvider>,
|
||||
) : TriggerDescriptorService {
|
||||
|
||||
@Transactional
|
||||
override fun createDefaultTriggerDescriptors(worker: Worker): List<TriggerDescriptor> {
|
||||
return triggerDescriptorCoreService.create(
|
||||
initialTriggerDescriptorProviders
|
||||
.filter { it.applicableFor(worker) }
|
||||
.flatMap { it.createInitialTriggerDescriptors(worker) }
|
||||
)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
package ru.touchin.spring.workers.manager.agent.worker.executors
|
||||
|
||||
import org.quartz.Trigger
|
||||
import ru.touchin.spring.workers.manager.agent.common.base.BaseJob
|
||||
import ru.touchin.spring.workers.manager.core.execution.dto.Execution
|
||||
import ru.touchin.spring.workers.manager.core.execution.models.ExecutionEntity
|
||||
|
||||
interface WorkerActionExecutor {
|
||||
|
||||
fun prepareExecution(job: BaseJob, currentTrigger: Trigger): Execution?
|
||||
fun finishWorkerProcessing(job: BaseJob)
|
||||
fun executeJobAction(job: BaseJob, currentTrigger: Trigger)
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,77 @@
|
|||
package ru.touchin.spring.workers.manager.agent.worker.executors
|
||||
|
||||
import org.quartz.Trigger
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.stereotype.Service
|
||||
import org.springframework.transaction.annotation.Transactional
|
||||
import ru.touchin.spring.workers.manager.agent.common.base.BaseJob
|
||||
import ru.touchin.spring.workers.manager.agent.registry.TriggersRegistry
|
||||
import ru.touchin.spring.workers.manager.core.execution.dto.Execution
|
||||
import ru.touchin.spring.workers.manager.core.execution.services.ExecutionCoreService
|
||||
import ru.touchin.spring.workers.manager.core.execution.services.dto.CreateExecution
|
||||
import ru.touchin.spring.workers.manager.core.worker.enums.WorkerStatus
|
||||
import ru.touchin.spring.workers.manager.core.worker.services.WorkerCoreService
|
||||
import ru.touchin.spring.workers.manager.core.worker.services.dto.UpdateWorker
|
||||
|
||||
@Service
|
||||
class WorkerActionExecutorImpl(
|
||||
private val executionCoreService: ExecutionCoreService,
|
||||
private val workerCoreService: WorkerCoreService,
|
||||
private val triggersRegistry: TriggersRegistry,
|
||||
) : WorkerActionExecutor {
|
||||
|
||||
@Autowired
|
||||
lateinit var workerActionExecutor: WorkerActionExecutor
|
||||
|
||||
override fun executeJobAction(job: BaseJob, currentTrigger: Trigger) {
|
||||
val execution = workerActionExecutor.prepareExecution(job, currentTrigger)
|
||||
|
||||
execution?.let {
|
||||
job.run()
|
||||
|
||||
executionCoreService.setFinished(execution.id)
|
||||
}
|
||||
|
||||
workerActionExecutor.finishWorkerProcessing(job)
|
||||
}
|
||||
|
||||
@Transactional
|
||||
override fun prepareExecution(job: BaseJob, currentTrigger: Trigger): Execution? {
|
||||
val currentWorker = workerCoreService.getWithLock(job.getName())
|
||||
|
||||
return currentWorker
|
||||
?.takeIf { !it.isStopped() }
|
||||
?.let { worker ->
|
||||
val currentTriggerDescriptor = triggersRegistry.getDescriptorByTrigger(currentTrigger)
|
||||
|
||||
currentTriggerDescriptor?.let {
|
||||
setWorkerStatus(worker.name, WorkerStatus.PROCESSING)
|
||||
|
||||
executionCoreService.create(
|
||||
CreateExecution(
|
||||
workerName = worker.name,
|
||||
triggerId = currentTriggerDescriptor.id,
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Transactional
|
||||
override fun finishWorkerProcessing(job: BaseJob) {
|
||||
val currentWorker = workerCoreService.getWithLock(job.getName())
|
||||
?: return
|
||||
|
||||
setWorkerStatus(currentWorker.name, WorkerStatus.IDLE)
|
||||
}
|
||||
|
||||
private fun setWorkerStatus(name: String, status: WorkerStatus) {
|
||||
workerCoreService.update(
|
||||
UpdateWorker(
|
||||
name = name,
|
||||
status = status,
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
package ru.touchin.spring.workers.manager
|
||||
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication
|
||||
import org.springframework.boot.test.context.TestConfiguration
|
||||
import org.springframework.context.annotation.Import
|
||||
|
||||
@SpringBootApplication
|
||||
@TestConfiguration
|
||||
@Import(WorkersManagerConfiguration::class)
|
||||
class TestApplication
|
||||
|
|
@ -0,0 +1,32 @@
|
|||
package ru.touchin.spring.workers.manager.agent.common.utils
|
||||
|
||||
import org.junit.jupiter.api.Assertions.assertFalse
|
||||
import org.junit.jupiter.api.Assertions.assertTrue
|
||||
import org.junit.jupiter.api.Test
|
||||
|
||||
internal class GlobTest {
|
||||
|
||||
@Test
|
||||
fun matches() {
|
||||
assertMatches("", "*****")
|
||||
assertMatches("my perfect text", "my*text")
|
||||
assertMatches("my perfect text", "my*?text")
|
||||
assertMatches("moon", "????")
|
||||
assertMatches("(abc)", "?abc?")
|
||||
assertMatches("****", "????")
|
||||
|
||||
assertNotMatches("", "?")
|
||||
assertNotMatches("mo", "????")
|
||||
assertNotMatches("moonmoon", "????")
|
||||
assertNotMatches("my perfect text", "our*text")
|
||||
}
|
||||
|
||||
private fun assertMatches(text: String, pattern: String) {
|
||||
assertTrue(Glob.matches(text, pattern))
|
||||
}
|
||||
|
||||
private fun assertNotMatches(text: String, pattern: String) {
|
||||
assertFalse(Glob.matches(text, pattern))
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,85 @@
|
|||
package ru.touchin.spring.workers.manager.agent.config
|
||||
|
||||
import com.nhaarman.mockitokotlin2.doNothing
|
||||
import com.nhaarman.mockitokotlin2.mock
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.mockito.ArgumentMatchers.anyString
|
||||
import org.mockito.Mockito
|
||||
import org.mockito.Mockito.doAnswer
|
||||
import org.mockito.Mockito.never
|
||||
import org.mockito.Mockito.verify
|
||||
import ru.touchin.spring.workers.manager.agent.common.base.BaseJob
|
||||
import ru.touchin.spring.workers.manager.agent.registry.JobDefinitionsRegistry
|
||||
import ru.touchin.spring.workers.manager.agent.registry.JobProvider
|
||||
import ru.touchin.spring.workers.manager.agent.trigger.services.TriggerDescriptorService
|
||||
import ru.touchin.spring.workers.manager.agent.worker.executors.WorkerActionExecutorImpl
|
||||
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
|
||||
import ru.touchin.spring.workers.manager.core.worker.enums.WorkerStatus
|
||||
import ru.touchin.spring.workers.manager.core.worker.services.WorkerCoreService
|
||||
import ru.touchin.spring.workers.manager.core.worker.services.WorkersStateService
|
||||
import ru.touchin.spring.workers.manager.utils.MockitoUtils.anyx
|
||||
|
||||
internal class WorkerInitializerTest {
|
||||
|
||||
private val baseJob = mock<BaseJob> {
|
||||
on(it.getName()).thenReturn(BASE_WORKER_NAME)
|
||||
}
|
||||
|
||||
private val simpleJobProvider = mock<JobProvider> {
|
||||
on(it.getJobs()).thenReturn(listOf(baseJob))
|
||||
}
|
||||
|
||||
private val workerActionExecutor = Mockito.mock(WorkerActionExecutorImpl::class.java)
|
||||
|
||||
private val jobDefinitionsRegistry = JobDefinitionsRegistry(setOf(BASE_WORKER_NAME), listOf(simpleJobProvider), workerActionExecutor)
|
||||
private val workerCoreService = Mockito.mock(WorkerCoreService::class.java)
|
||||
|
||||
private val workersStateService = Mockito.mock(WorkersStateService::class.java)
|
||||
|
||||
private val triggerDescriptorService = Mockito.mock(TriggerDescriptorService::class.java)
|
||||
|
||||
private val workerInitializer = WorkerInitializer(
|
||||
triggerDescriptorService,
|
||||
jobDefinitionsRegistry,
|
||||
workerCoreService,
|
||||
workersStateService,
|
||||
)
|
||||
|
||||
private val baseWorker = Worker(
|
||||
name = BASE_WORKER_NAME,
|
||||
status = WorkerStatus.IDLE,
|
||||
disabledAt = null,
|
||||
stoppedAt = null,
|
||||
parallelExecutionEnabled = false,
|
||||
)
|
||||
|
||||
@Test
|
||||
fun checkSyncWithExistingWorker() {
|
||||
doAnswer { baseWorker }.`when`(workerCoreService).getWithLock(BASE_WORKER_NAME)
|
||||
doNothing().`when`(workersStateService).start(anyx())
|
||||
|
||||
workerInitializer.init()
|
||||
|
||||
verify(workersStateService).start(baseWorker.name)
|
||||
verify(workerCoreService, never()).create(anyString())
|
||||
verify(triggerDescriptorService, never()).createDefaultTriggerDescriptors(baseWorker)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun checkSyncAndCreateNewWorker() {
|
||||
doAnswer { null }.`when`(workerCoreService).getWithLock(BASE_WORKER_NAME)
|
||||
doAnswer { baseWorker }.`when`(workerCoreService).create(BASE_WORKER_NAME)
|
||||
|
||||
workerInitializer.init()
|
||||
|
||||
verify(workerCoreService).create(BASE_WORKER_NAME)
|
||||
verify(triggerDescriptorService).createDefaultTriggerDescriptors(baseWorker)
|
||||
}
|
||||
|
||||
companion object {
|
||||
|
||||
private const val BASE_WORKER_NAME = "baseWorker"
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,134 @@
|
|||
package ru.touchin.spring.workers.manager.agent.scheduled
|
||||
|
||||
import com.nhaarman.mockitokotlin2.mock
|
||||
import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.mockito.ArgumentMatchers
|
||||
import org.mockito.Mockito
|
||||
import org.mockito.Mockito.anyString
|
||||
import org.mockito.Mockito.doAnswer
|
||||
import org.mockito.Mockito.verify
|
||||
import org.quartz.Scheduler
|
||||
import ru.touchin.spring.workers.manager.agent.common.base.BaseJob
|
||||
import ru.touchin.spring.workers.manager.agent.registry.JobDefinitionsRegistry
|
||||
import ru.touchin.spring.workers.manager.agent.registry.JobProvider
|
||||
import ru.touchin.spring.workers.manager.agent.registry.TriggersRegistry
|
||||
import ru.touchin.spring.workers.manager.agent.scheduled.services.SchedulerServiceImpl
|
||||
import ru.touchin.spring.workers.manager.agent.worker.executors.WorkerActionExecutorImpl
|
||||
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
|
||||
import ru.touchin.spring.workers.manager.core.trigger.enums.TriggerType
|
||||
import ru.touchin.spring.workers.manager.core.trigger.services.TriggerDescriptorCoreService
|
||||
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
|
||||
import ru.touchin.spring.workers.manager.core.worker.enums.WorkerStatus
|
||||
import java.util.*
|
||||
|
||||
internal class WorkerManagerWatcherTest {
|
||||
|
||||
private val baseJob = mock<BaseJob> {
|
||||
on(it.getName()).thenReturn(BASE_WORKER_NAME)
|
||||
}
|
||||
|
||||
private val simpleJobProvider = mock<JobProvider> {
|
||||
on(it.getJobs()).thenReturn(listOf(baseJob))
|
||||
}
|
||||
|
||||
private val workerActionExecutor = Mockito.mock(WorkerActionExecutorImpl::class.java)
|
||||
|
||||
private val jobDefinitionsRegistry = JobDefinitionsRegistry(setOf(BASE_WORKER_NAME), listOf(simpleJobProvider), workerActionExecutor)
|
||||
|
||||
private val triggerRegistry = Mockito.mock(TriggersRegistry::class.java)
|
||||
private val watcherSyncInterval = 1000L
|
||||
|
||||
private val triggerDescriptorCoreService = Mockito.mock(TriggerDescriptorCoreService::class.java)
|
||||
private val scheduleTriggerService = Mockito.mock(SchedulerServiceImpl::class.java)
|
||||
private val quartzScheduler = Mockito.mock(Scheduler::class.java)
|
||||
|
||||
private val workerManagerWatcher = WorkerManagerWatcher(
|
||||
watcherSyncInterval,
|
||||
jobDefinitionsRegistry,
|
||||
scheduleTriggerService,
|
||||
triggerRegistry,
|
||||
triggerDescriptorCoreService,
|
||||
quartzScheduler
|
||||
)
|
||||
|
||||
private val baseWorker = Worker(
|
||||
name = BASE_WORKER_NAME,
|
||||
status = WorkerStatus.IDLE,
|
||||
disabledAt = null,
|
||||
stoppedAt = null,
|
||||
parallelExecutionEnabled = false,
|
||||
)
|
||||
|
||||
private val triggerDescriptorId1 = UUID.fromString("514fb14b-d7ea-4ace-b200-4e06e80c37b7")
|
||||
private val triggerDescriptorId2 = UUID.fromString("94d2ef98-6fc9-4d41-a66a-35b73a2448e0")
|
||||
private val triggerDescriptorId3 = UUID.fromString("2a7719ec-6a65-481e-947b-08537ada2337")
|
||||
private val currentTriggerDescriptor1 = createTriggerDescriptor(triggerDescriptorId1, TriggerType.CRON, baseWorker)
|
||||
private val actualTriggerDescriptor1 = createTriggerDescriptor(triggerDescriptorId2, TriggerType.FIXED_RATE, baseWorker)
|
||||
private val actualTriggerDescriptor2 = createTriggerDescriptor(triggerDescriptorId3, TriggerType.FIXED_RATE, baseWorker)
|
||||
private val currentTriggers = listOf(currentTriggerDescriptor1)
|
||||
|
||||
@BeforeEach
|
||||
fun setUp() {
|
||||
doAnswer { currentTriggers }
|
||||
.`when`(triggerRegistry).getDescriptors()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun checkSyncWithOnlyNewTriggersWithoutRemoving() {
|
||||
doAnswer { listOf(currentTriggerDescriptor1, actualTriggerDescriptor1) }
|
||||
.`when`(triggerDescriptorCoreService).getByWorkerName(anyString())
|
||||
|
||||
workerManagerWatcher.sync()
|
||||
|
||||
verify(triggerRegistry).remove(emptyList())
|
||||
verify(scheduleTriggerService).unscheduleTriggers(emptyList())
|
||||
|
||||
verify(scheduleTriggerService).scheduleTriggers(listOf(actualTriggerDescriptor1))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun checkSyncWithOnlyRemoveIrrelevantTriggers() {
|
||||
doAnswer { emptyList<TriggerDescriptor>() }
|
||||
.`when`(triggerDescriptorCoreService).getByWorkerName(ArgumentMatchers.anyString())
|
||||
|
||||
workerManagerWatcher.sync()
|
||||
|
||||
verify(triggerRegistry).remove(currentTriggers)
|
||||
verify(scheduleTriggerService).unscheduleTriggers(currentTriggers)
|
||||
|
||||
verify(scheduleTriggerService).scheduleTriggers(emptyList())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun checkSyncWithSaveNewAndRemoveIrrelevantTriggers() {
|
||||
doAnswer { listOf(actualTriggerDescriptor2, actualTriggerDescriptor1) }
|
||||
.`when`(triggerDescriptorCoreService).getByWorkerName(ArgumentMatchers.anyString())
|
||||
|
||||
workerManagerWatcher.sync()
|
||||
|
||||
verify(triggerRegistry).remove(currentTriggers)
|
||||
verify(scheduleTriggerService).unscheduleTriggers(currentTriggers)
|
||||
|
||||
verify(scheduleTriggerService).scheduleTriggers(setOf(actualTriggerDescriptor2, actualTriggerDescriptor1).toList())
|
||||
}
|
||||
|
||||
private fun createTriggerDescriptor(id: UUID, type: TriggerType, worker: Worker): TriggerDescriptor {
|
||||
return TriggerDescriptor(
|
||||
id = id,
|
||||
name = id.toString(),
|
||||
type = type,
|
||||
expression = "expression",
|
||||
workerName = worker.name,
|
||||
disabledAt = null,
|
||||
deletedAt = null,
|
||||
)
|
||||
}
|
||||
|
||||
companion object {
|
||||
|
||||
private const val BASE_WORKER_NAME = "baseWorker"
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,17 @@
|
|||
package ru.touchin.spring.workers.manager.utils
|
||||
|
||||
import org.mockito.ArgumentMatchers
|
||||
import org.mockito.Mockito
|
||||
|
||||
object MockitoUtils {
|
||||
fun <T> anyx(matcher: ((T) -> Boolean)? = null): T {
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
return if (matcher == null) {
|
||||
ArgumentMatchers.any() ?: (null as T)
|
||||
} else {
|
||||
Mockito.argThat(matcher) ?: (null as T)
|
||||
}
|
||||
}
|
||||
|
||||
fun <T> anyx(sample: T): T = anyx { it == sample }
|
||||
}
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
spring:
|
||||
config:
|
||||
import: "test-slow.yml"
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
spring:
|
||||
config:
|
||||
import: "test.yml"
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
plugins {
|
||||
id("kotlin")
|
||||
id("kotlin-spring")
|
||||
id("maven-publish")
|
||||
}
|
||||
|
||||
dependencies {
|
||||
implementation(project(":spring-workers-manager-core"))
|
||||
|
||||
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
|
||||
|
||||
implementation("org.springframework.data:spring-data-jpa")
|
||||
implementation("org.springframework.boot:spring-boot-starter-web")
|
||||
}
|
||||
|
|
@ -0,0 +1,60 @@
|
|||
package ru.touchin.spring.workers.manager.api.trigger.controllers
|
||||
|
||||
import org.springframework.web.bind.annotation.PathVariable
|
||||
import org.springframework.web.bind.annotation.PostMapping
|
||||
import org.springframework.web.bind.annotation.PutMapping
|
||||
import org.springframework.web.bind.annotation.RequestBody
|
||||
import org.springframework.web.bind.annotation.RequestMapping
|
||||
import org.springframework.web.bind.annotation.RestController
|
||||
import ru.touchin.spring.workers.manager.api.trigger.controllers.dto.TriggerChangeRequest
|
||||
import ru.touchin.spring.workers.manager.api.trigger.services.TriggerDescriptorApiService
|
||||
import ru.touchin.spring.workers.manager.api.trigger.services.dto.CreateTrigger
|
||||
import ru.touchin.spring.workers.manager.api.trigger.services.dto.UpdateTrigger
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/workers/{workerName}/triggers")
|
||||
class TriggerController(
|
||||
private val triggerDescriptorApiService: TriggerDescriptorApiService
|
||||
) {
|
||||
|
||||
@PostMapping
|
||||
fun createTrigger(
|
||||
@PathVariable
|
||||
workerName: String,
|
||||
@RequestBody
|
||||
body: TriggerChangeRequest
|
||||
) {
|
||||
triggerDescriptorApiService.create(
|
||||
CreateTrigger(
|
||||
workerName = workerName,
|
||||
triggerName = body.name,
|
||||
type = body.type,
|
||||
expression = body.expression,
|
||||
disabled = body.disabled,
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
@PutMapping("/{triggerName}")
|
||||
fun changeTrigger(
|
||||
@PathVariable
|
||||
triggerName: String,
|
||||
@PathVariable
|
||||
workerName: String,
|
||||
@RequestBody
|
||||
body: TriggerChangeRequest
|
||||
) {
|
||||
triggerDescriptorApiService.update(
|
||||
UpdateTrigger(
|
||||
workerName = workerName,
|
||||
oldTriggerName = triggerName,
|
||||
newTriggerName = body.name,
|
||||
type = body.type,
|
||||
expression = body.expression,
|
||||
disabled = body.disabled,
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
package ru.touchin.spring.workers.manager.api.trigger.controllers.dto
|
||||
|
||||
import ru.touchin.spring.workers.manager.core.trigger.enums.TriggerType
|
||||
|
||||
data class TriggerChangeRequest(
|
||||
val name: String,
|
||||
val type: TriggerType,
|
||||
val expression: String,
|
||||
val disabled: Boolean,
|
||||
)
|
||||
|
|
@ -0,0 +1,11 @@
|
|||
package ru.touchin.spring.workers.manager.api.trigger.services
|
||||
|
||||
import ru.touchin.spring.workers.manager.api.trigger.services.dto.CreateTrigger
|
||||
import ru.touchin.spring.workers.manager.api.trigger.services.dto.UpdateTrigger
|
||||
|
||||
interface TriggerDescriptorApiService {
|
||||
|
||||
fun create(create: CreateTrigger)
|
||||
fun update(update: UpdateTrigger)
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,61 @@
|
|||
package ru.touchin.spring.workers.manager.api.trigger.services
|
||||
|
||||
import org.springframework.stereotype.Service
|
||||
import org.springframework.transaction.annotation.Transactional
|
||||
import ru.touchin.spring.workers.manager.api.trigger.services.dto.CreateTrigger
|
||||
import ru.touchin.spring.workers.manager.api.trigger.services.dto.UpdateTrigger
|
||||
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
|
||||
import ru.touchin.spring.workers.manager.core.trigger.exceptions.TriggerNotFoundException
|
||||
import ru.touchin.spring.workers.manager.core.trigger.services.TriggerDescriptorCoreService
|
||||
import ru.touchin.spring.workers.manager.core.trigger.services.dto.CreateTriggerDescriptor
|
||||
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
|
||||
import ru.touchin.spring.workers.manager.core.worker.services.WorkerCoreService
|
||||
import java.time.ZonedDateTime
|
||||
|
||||
@Service
|
||||
class TriggerDescriptorApiServiceImpl(
|
||||
private val workerCoreService: WorkerCoreService,
|
||||
private val triggerDescriptorCoreService: TriggerDescriptorCoreService,
|
||||
) : TriggerDescriptorApiService {
|
||||
|
||||
@Transactional
|
||||
override fun create(create: CreateTrigger) {
|
||||
val worker = workerCoreService.get(create.workerName)
|
||||
|
||||
triggerDescriptorCoreService.create(
|
||||
CreateTriggerDescriptor(
|
||||
expression = create.expression,
|
||||
name = create.triggerName,
|
||||
type = create.type,
|
||||
disabledAt = if (create.disabled) ZonedDateTime.now() else null,
|
||||
workerName = worker.name,
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
@Transactional
|
||||
override fun update(update: UpdateTrigger) {
|
||||
val worker = workerCoreService.get(update.workerName)
|
||||
|
||||
val triggerDescriptor = getTriggerDescriptor(worker, update.oldTriggerName)
|
||||
|
||||
triggerDescriptorCoreService.setDeleted(triggerDescriptor.id)
|
||||
|
||||
triggerDescriptorCoreService.create(
|
||||
CreateTriggerDescriptor(
|
||||
expression = update.expression,
|
||||
name = update.newTriggerName,
|
||||
type = update.type,
|
||||
disabledAt = if (update.disabled) ZonedDateTime.now() else null,
|
||||
workerName = worker.name,
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
private fun getTriggerDescriptor(worker: Worker, triggerName: String): TriggerDescriptor {
|
||||
return triggerDescriptorCoreService.getByWorkerName(worker.name)
|
||||
.firstOrNull { it.name == triggerName }
|
||||
?: throw TriggerNotFoundException(triggerName)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,11 @@
|
|||
package ru.touchin.spring.workers.manager.api.trigger.services.dto
|
||||
|
||||
import ru.touchin.spring.workers.manager.core.trigger.enums.TriggerType
|
||||
|
||||
data class CreateTrigger(
|
||||
val workerName: String,
|
||||
val triggerName: String,
|
||||
val type: TriggerType,
|
||||
val expression: String,
|
||||
val disabled: Boolean,
|
||||
)
|
||||
|
|
@ -0,0 +1,12 @@
|
|||
package ru.touchin.spring.workers.manager.api.trigger.services.dto
|
||||
|
||||
import ru.touchin.spring.workers.manager.core.trigger.enums.TriggerType
|
||||
|
||||
data class UpdateTrigger(
|
||||
val workerName: String,
|
||||
val oldTriggerName: String,
|
||||
val newTriggerName: String,
|
||||
val type: TriggerType,
|
||||
val expression: String,
|
||||
val disabled: Boolean,
|
||||
)
|
||||
|
|
@ -0,0 +1,65 @@
|
|||
package ru.touchin.spring.workers.manager.api.worker.controllers
|
||||
|
||||
import org.springframework.web.bind.annotation.GetMapping
|
||||
import org.springframework.web.bind.annotation.PathVariable
|
||||
import org.springframework.web.bind.annotation.PostMapping
|
||||
import org.springframework.web.bind.annotation.RequestMapping
|
||||
import org.springframework.web.bind.annotation.RestController
|
||||
import ru.touchin.spring.workers.manager.api.worker.controllers.dto.WorkerResponse
|
||||
import ru.touchin.spring.workers.manager.api.worker.services.WorkerApiService
|
||||
import ru.touchin.spring.workers.manager.core.worker.services.WorkersStateService
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/workers")
|
||||
class WorkerController(
|
||||
private val workerStateService: WorkersStateService,
|
||||
private val workerApiService: WorkerApiService
|
||||
) {
|
||||
|
||||
@GetMapping
|
||||
fun getWorkers(): List<WorkerResponse> {
|
||||
return workerApiService.getWorkers()
|
||||
}
|
||||
|
||||
@GetMapping("/{workerName}")
|
||||
fun getWorker(
|
||||
@PathVariable
|
||||
workerName: String
|
||||
): WorkerResponse {
|
||||
return workerApiService.getWorker(workerName)
|
||||
}
|
||||
|
||||
@PostMapping("/{workerName}/stop")
|
||||
fun stop(
|
||||
@PathVariable
|
||||
workerName: String
|
||||
) {
|
||||
workerStateService.stop(workerName)
|
||||
}
|
||||
|
||||
@PostMapping("/{workerName}/start")
|
||||
fun start(
|
||||
@PathVariable
|
||||
workerName: String
|
||||
) {
|
||||
workerStateService.start(workerName)
|
||||
}
|
||||
|
||||
@PostMapping("/{workerName}/disable")
|
||||
fun disable(
|
||||
@PathVariable
|
||||
workerName: String
|
||||
) {
|
||||
workerStateService.disable(workerName)
|
||||
}
|
||||
|
||||
@PostMapping("/{workerName}/enable")
|
||||
fun enable(
|
||||
@PathVariable
|
||||
workerName: String
|
||||
) {
|
||||
workerStateService.enable(workerName)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,21 @@
|
|||
package ru.touchin.spring.workers.manager.api.worker.controllers.dto
|
||||
|
||||
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
|
||||
import ru.touchin.spring.workers.manager.core.worker.enums.WorkerStatus
|
||||
import java.time.ZonedDateTime
|
||||
|
||||
data class WorkerResponse(
|
||||
val workerName: String,
|
||||
val stoppedAt: ZonedDateTime?,
|
||||
val disabledAt: ZonedDateTime?,
|
||||
val status: WorkerStatus,
|
||||
val parallelExecutionEnabled: Boolean
|
||||
)
|
||||
|
||||
fun Worker.toWorkerResponse() = WorkerResponse(
|
||||
workerName = this.name,
|
||||
stoppedAt = this.stoppedAt,
|
||||
disabledAt = this.disabledAt,
|
||||
status = this.status,
|
||||
parallelExecutionEnabled = this.parallelExecutionEnabled,
|
||||
)
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
package ru.touchin.spring.workers.manager.api.worker.services
|
||||
|
||||
import ru.touchin.spring.workers.manager.api.worker.controllers.dto.WorkerResponse
|
||||
|
||||
interface WorkerApiService {
|
||||
|
||||
fun getWorker(workerName: String): WorkerResponse
|
||||
fun getWorkers(): List<WorkerResponse>
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,22 @@
|
|||
package ru.touchin.spring.workers.manager.api.worker.services
|
||||
|
||||
import org.springframework.stereotype.Service
|
||||
import ru.touchin.spring.workers.manager.api.worker.controllers.dto.WorkerResponse
|
||||
import ru.touchin.spring.workers.manager.api.worker.controllers.dto.toWorkerResponse
|
||||
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
|
||||
import ru.touchin.spring.workers.manager.core.worker.services.WorkerCoreService
|
||||
|
||||
@Service
|
||||
class WorkerApiServiceImpl(
|
||||
private val workerCoreService: WorkerCoreService
|
||||
) : WorkerApiService {
|
||||
|
||||
override fun getWorker(workerName: String): WorkerResponse {
|
||||
return workerCoreService.get(workerName).toWorkerResponse()
|
||||
}
|
||||
|
||||
override fun getWorkers(): List<WorkerResponse> {
|
||||
return workerCoreService.getAll().map(Worker::toWorkerResponse)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1 @@
|
|||
# Touch Spring Workers Manager
|
||||
|
|
@ -0,0 +1,17 @@
|
|||
plugins {
|
||||
id("kotlin")
|
||||
id("kotlin-spring")
|
||||
id("maven-publish")
|
||||
}
|
||||
|
||||
dependencies {
|
||||
api(project(":common"))
|
||||
|
||||
implementation(project(":common-spring-jpa"))
|
||||
|
||||
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
|
||||
|
||||
implementation("org.liquibase:liquibase-core")
|
||||
implementation("org.springframework.boot:spring-boot-starter-data-jpa")
|
||||
implementation("org.springframework.boot:spring-boot-starter-quartz")
|
||||
}
|
||||
|
|
@ -0,0 +1,38 @@
|
|||
package ru.touchin.spring.workers.manager
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
|
||||
import org.springframework.boot.autoconfigure.domain.EntityScan
|
||||
import org.springframework.boot.context.properties.ConfigurationPropertiesScan
|
||||
import org.springframework.cache.annotation.EnableCaching
|
||||
import org.springframework.context.annotation.ComponentScan
|
||||
import org.springframework.context.annotation.Configuration
|
||||
import org.springframework.data.jpa.repository.config.EnableJpaAuditing
|
||||
import org.springframework.data.jpa.repository.config.EnableJpaRepositories
|
||||
import ru.touchin.common.spring.jpa.EnableJpaAuditingExtra
|
||||
|
||||
/**
|
||||
* Configuration which brings to context all the components, required to support workers manager module via annotations.
|
||||
*
|
||||
* You could @[org.springframework.context.annotation.Import] this configuration into your application or use [EnableWorkersManager] annotation to do it automatically
|
||||
*/
|
||||
@ComponentScan
|
||||
@EntityScan
|
||||
@EnableJpaRepositories
|
||||
@EnableCaching
|
||||
@ConfigurationPropertiesScan
|
||||
class WorkersManagerConfiguration {
|
||||
|
||||
companion object {
|
||||
const val SCHEMA: String = "workers"
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies `@EnableJpaAuditing` only if it was not already applied.
|
||||
* Enabling `@EnableJpaAuditing` twice will lead to application context failure.
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnMissingBean(name=["jpaAuditingHandler"])
|
||||
@EnableJpaAuditingExtra
|
||||
class JpaAuditingNonConflictingDeclaration
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,34 @@
|
|||
package ru.touchin.spring.workers.manager.core.config
|
||||
|
||||
import liquibase.Contexts
|
||||
import liquibase.Liquibase
|
||||
import liquibase.database.Database
|
||||
import liquibase.database.DatabaseFactory
|
||||
import liquibase.database.jvm.JdbcConnection
|
||||
import liquibase.resource.ClassLoaderResourceAccessor
|
||||
import org.springframework.stereotype.Component
|
||||
import ru.touchin.spring.workers.manager.WorkersManagerConfiguration.Companion.SCHEMA
|
||||
import javax.sql.DataSource
|
||||
|
||||
@Component
|
||||
class LiquibaseRunner(
|
||||
private val dataSource: DataSource
|
||||
) {
|
||||
|
||||
fun run() = dataSource.connection.use { connection ->
|
||||
val database: Database = DatabaseFactory.getInstance()
|
||||
.findCorrectDatabaseImplementation(JdbcConnection(connection))
|
||||
.apply { defaultSchemaName = SCHEMA }
|
||||
|
||||
val liquibase = Liquibase(MASTER_CHANGELOG_PATH, ClassLoaderResourceAccessor(), database)
|
||||
|
||||
liquibase.changeLogParameters.set("schemaName", SCHEMA)
|
||||
|
||||
liquibase.update(Contexts())
|
||||
}
|
||||
|
||||
companion object {
|
||||
private const val MASTER_CHANGELOG_PATH = "workers/db/changelog/db.changelog-master.yaml"
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,15 @@
|
|||
package ru.touchin.spring.workers.manager.core.execution.converters
|
||||
|
||||
import ru.touchin.spring.workers.manager.core.execution.dto.Execution
|
||||
import ru.touchin.spring.workers.manager.core.execution.models.ExecutionEntity
|
||||
import ru.touchin.spring.workers.manager.core.trigger.converters.toTriggerDescriptor
|
||||
|
||||
fun ExecutionEntity.toExecution(): Execution {
|
||||
return Execution(
|
||||
id = id!!,
|
||||
triggerDescriptor = triggerDescriptor?.toTriggerDescriptor(),
|
||||
status = status,
|
||||
startedAt = startedAt,
|
||||
finishedAt = finishedAt,
|
||||
)
|
||||
}
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
package ru.touchin.spring.workers.manager.core.execution.dto
|
||||
|
||||
import ru.touchin.spring.workers.manager.core.execution.enums.ExecutionStatus
|
||||
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
|
||||
import java.time.ZonedDateTime
|
||||
import java.util.*
|
||||
|
||||
data class Execution(
|
||||
val id: UUID,
|
||||
val triggerDescriptor: TriggerDescriptor?,
|
||||
val status: ExecutionStatus,
|
||||
val startedAt: ZonedDateTime?,
|
||||
val finishedAt: ZonedDateTime?,
|
||||
)
|
||||
|
|
@ -0,0 +1,8 @@
|
|||
package ru.touchin.spring.workers.manager.core.execution.enums
|
||||
|
||||
enum class ExecutionStatus {
|
||||
|
||||
PROCESSING,
|
||||
FINISHED
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,6 @@
|
|||
package ru.touchin.spring.workers.manager.core.execution.exceptions
|
||||
|
||||
import ru.touchin.common.exceptions.CommonNotFoundException
|
||||
import java.util.*
|
||||
|
||||
class ExecutionNotFoundException(id: UUID) : CommonNotFoundException("Execution not found id=$id")
|
||||
|
|
@ -0,0 +1,43 @@
|
|||
package ru.touchin.spring.workers.manager.core.execution.models
|
||||
|
||||
import ru.touchin.common.spring.jpa.models.BaseUuidIdEntity
|
||||
import ru.touchin.spring.workers.manager.WorkersManagerConfiguration.Companion.SCHEMA
|
||||
import ru.touchin.spring.workers.manager.core.execution.enums.ExecutionStatus
|
||||
import ru.touchin.spring.workers.manager.core.trigger.models.TriggerDescriptorEntity
|
||||
import java.time.ZonedDateTime
|
||||
import javax.persistence.Column
|
||||
import javax.persistence.Entity
|
||||
import javax.persistence.EnumType
|
||||
import javax.persistence.Enumerated
|
||||
import javax.persistence.JoinColumn
|
||||
import javax.persistence.ManyToOne
|
||||
import javax.persistence.Table
|
||||
|
||||
@Entity
|
||||
@Table(name = "executions", schema = SCHEMA)
|
||||
class ExecutionEntity : BaseUuidIdEntity() {
|
||||
|
||||
@Column(name = "worker_name", nullable = false)
|
||||
lateinit var workerName: String
|
||||
|
||||
@ManyToOne
|
||||
@JoinColumn(name = "trigger_id", nullable = true)
|
||||
var triggerDescriptor: TriggerDescriptorEntity? = null
|
||||
|
||||
@Enumerated(EnumType.STRING)
|
||||
@Column(name = "status", nullable = false)
|
||||
lateinit var status: ExecutionStatus
|
||||
|
||||
@Column(name = "error_message", nullable = true)
|
||||
var errorMessage: String? = null
|
||||
|
||||
@Column(name = "error_code", nullable = true)
|
||||
var errorCode: Int? = null
|
||||
|
||||
@Column(name = "started_at", nullable = true)
|
||||
var startedAt: ZonedDateTime? = null
|
||||
|
||||
@Column(name = "finished_at", nullable = true)
|
||||
var finishedAt: ZonedDateTime? = null
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
package ru.touchin.spring.workers.manager.core.execution.repositories
|
||||
|
||||
import org.springframework.data.jpa.repository.JpaRepository
|
||||
import org.springframework.data.repository.findByIdOrNull
|
||||
import ru.touchin.spring.workers.manager.core.execution.exceptions.ExecutionNotFoundException
|
||||
import ru.touchin.spring.workers.manager.core.execution.models.ExecutionEntity
|
||||
import java.util.*
|
||||
|
||||
interface ExecutionRepository : JpaRepository<ExecutionEntity, UUID>
|
||||
|
||||
fun ExecutionRepository.findByIdOrThrow(id: UUID): ExecutionEntity {
|
||||
return findByIdOrNull(id)
|
||||
?: throw ExecutionNotFoundException(id)
|
||||
}
|
||||
|
|
@ -0,0 +1,12 @@
|
|||
package ru.touchin.spring.workers.manager.core.execution.services
|
||||
|
||||
import ru.touchin.spring.workers.manager.core.execution.dto.Execution
|
||||
import ru.touchin.spring.workers.manager.core.execution.services.dto.CreateExecution
|
||||
import java.util.*
|
||||
|
||||
interface ExecutionCoreService{
|
||||
|
||||
fun create(create: CreateExecution): Execution
|
||||
fun setFinished(id: UUID): Execution
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,49 @@
|
|||
package ru.touchin.spring.workers.manager.core.execution.services
|
||||
|
||||
import org.springframework.stereotype.Service
|
||||
import org.springframework.transaction.annotation.Transactional
|
||||
import ru.touchin.spring.workers.manager.core.execution.converters.toExecution
|
||||
import ru.touchin.spring.workers.manager.core.execution.dto.Execution
|
||||
import ru.touchin.spring.workers.manager.core.execution.enums.ExecutionStatus
|
||||
import ru.touchin.spring.workers.manager.core.execution.models.ExecutionEntity
|
||||
import ru.touchin.spring.workers.manager.core.execution.repositories.ExecutionRepository
|
||||
import ru.touchin.spring.workers.manager.core.execution.repositories.findByIdOrThrow
|
||||
import ru.touchin.spring.workers.manager.core.execution.services.dto.CreateExecution
|
||||
import ru.touchin.spring.workers.manager.core.trigger.repositories.TriggerDescriptorRepository
|
||||
import ru.touchin.spring.workers.manager.core.trigger.repositories.findByIdOrThrow
|
||||
import java.time.ZonedDateTime
|
||||
import java.util.*
|
||||
|
||||
@Service
|
||||
class ExecutionCoreServiceImpl(
|
||||
private val executionRepository: ExecutionRepository,
|
||||
private val triggerDescriptorRepository: TriggerDescriptorRepository,
|
||||
) : ExecutionCoreService {
|
||||
|
||||
@Transactional
|
||||
override fun create(create: CreateExecution): Execution {
|
||||
val entity = ExecutionEntity().apply {
|
||||
startedAt = ZonedDateTime.now()
|
||||
workerName = create.workerName
|
||||
triggerDescriptor = triggerDescriptorRepository.findByIdOrThrow(create.triggerId)
|
||||
status = ExecutionStatus.PROCESSING
|
||||
}
|
||||
|
||||
return executionRepository.save(entity)
|
||||
.toExecution()
|
||||
}
|
||||
|
||||
@Transactional
|
||||
override fun setFinished(id: UUID): Execution {
|
||||
val entity = executionRepository.findByIdOrThrow(id)
|
||||
|
||||
entity.apply {
|
||||
finishedAt = ZonedDateTime.now()
|
||||
status = ExecutionStatus.FINISHED
|
||||
}
|
||||
|
||||
return executionRepository.save(entity)
|
||||
.toExecution()
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,8 @@
|
|||
package ru.touchin.spring.workers.manager.core.execution.services.dto
|
||||
|
||||
import java.util.*
|
||||
|
||||
data class CreateExecution(
|
||||
val workerName: String,
|
||||
val triggerId: UUID,
|
||||
)
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
package ru.touchin.spring.workers.manager.core.trigger.converters
|
||||
|
||||
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
|
||||
import ru.touchin.spring.workers.manager.core.trigger.models.TriggerDescriptorEntity
|
||||
|
||||
fun TriggerDescriptorEntity.toTriggerDescriptor(): TriggerDescriptor {
|
||||
return TriggerDescriptor(
|
||||
id = id!!,
|
||||
name = triggerName,
|
||||
type = type,
|
||||
expression = expression,
|
||||
workerName = worker.workerName,
|
||||
disabledAt = disabledAt,
|
||||
deletedAt = deletedAt,
|
||||
)
|
||||
}
|
||||
|
|
@ -0,0 +1,36 @@
|
|||
package ru.touchin.spring.workers.manager.core.trigger.dto
|
||||
|
||||
import ru.touchin.spring.workers.manager.core.trigger.enums.TriggerType
|
||||
import java.time.ZonedDateTime
|
||||
import java.util.*
|
||||
|
||||
data class TriggerDescriptor(
|
||||
val id: UUID,
|
||||
val name: String,
|
||||
val type: TriggerType,
|
||||
val expression: String,
|
||||
val workerName: String,
|
||||
val disabledAt: ZonedDateTime?,
|
||||
val deletedAt: ZonedDateTime?,
|
||||
) {
|
||||
|
||||
fun isDeleted() = deletedAt != null
|
||||
|
||||
fun isDisabled() = disabledAt != null
|
||||
|
||||
override fun equals(other: Any?): Boolean {
|
||||
if (this === other) return true
|
||||
if (other !is TriggerDescriptor) return false
|
||||
|
||||
if (id != other.id || expression != other.expression || type != other.type) {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
override fun hashCode(): Int {
|
||||
return id.hashCode()
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,49 @@
|
|||
package ru.touchin.spring.workers.manager.core.trigger.enums
|
||||
|
||||
enum class TriggerType {
|
||||
|
||||
/**
|
||||
* Uses CRON expression for scheduling.
|
||||
* Expression example: `0 * * * * *`
|
||||
*/
|
||||
CRON,
|
||||
|
||||
/**
|
||||
* Uses numeric expressions, which mean time period in milliseconds between
|
||||
* end of previous execution and start of the next one.
|
||||
* ```
|
||||
* ------------ TIMELINE -------------------
|
||||
* [ EXECUTION ]............................
|
||||
* .............< DELAY >...................
|
||||
* ......................[ NEXT EXECUTION ]
|
||||
* -----------------------------------------
|
||||
* ```
|
||||
*/
|
||||
FIXED_DELAY,
|
||||
|
||||
/**
|
||||
* Uses numeric expressions, which mean time period in milliseconds between
|
||||
* start of previous execution and start of the next one.
|
||||
* ```
|
||||
* ------------ TIMELINE -------------------
|
||||
* [ EXECUTION ]............................
|
||||
* < DELAY >................................
|
||||
* .........[ NEXT EXECUTION ]..............
|
||||
* .........< DELAY >.......................
|
||||
* ..................[ 3RD EXECUTION ].....
|
||||
* -----------------------------------------
|
||||
* ```
|
||||
*/
|
||||
FIXED_RATE;
|
||||
|
||||
companion object {
|
||||
|
||||
fun find(name: String?): TriggerType? {
|
||||
name ?: return null
|
||||
|
||||
return values().find { it.name == name }
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,11 @@
|
|||
package ru.touchin.spring.workers.manager.core.trigger.exceptions
|
||||
|
||||
import ru.touchin.common.exceptions.CommonNotFoundException
|
||||
import java.util.*
|
||||
|
||||
class TriggerNotFoundException : CommonNotFoundException {
|
||||
|
||||
constructor(id: UUID) : super("TriggerDescriptor not found id=$id")
|
||||
constructor(name: String) : super("TriggerDescriptor code not found name=$name")
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,40 @@
|
|||
package ru.touchin.spring.workers.manager.core.trigger.models
|
||||
|
||||
import ru.touchin.common.spring.jpa.models.BaseUuidIdEntity
|
||||
import ru.touchin.spring.workers.manager.WorkersManagerConfiguration.Companion.SCHEMA
|
||||
import ru.touchin.spring.workers.manager.core.trigger.enums.TriggerType
|
||||
import ru.touchin.spring.workers.manager.core.worker.models.WorkerEntity
|
||||
import java.time.ZonedDateTime
|
||||
import javax.persistence.Column
|
||||
import javax.persistence.Entity
|
||||
import javax.persistence.EnumType
|
||||
import javax.persistence.Enumerated
|
||||
import javax.persistence.JoinColumn
|
||||
import javax.persistence.ManyToOne
|
||||
import javax.persistence.Table
|
||||
|
||||
@Entity
|
||||
@Table(name = "triggers", schema = SCHEMA)
|
||||
class TriggerDescriptorEntity : BaseUuidIdEntity() {
|
||||
|
||||
@Column(name = "trigger_name", nullable = false)
|
||||
lateinit var triggerName: String
|
||||
|
||||
@Enumerated(EnumType.STRING)
|
||||
@Column(name = "type", nullable = false)
|
||||
lateinit var type: TriggerType
|
||||
|
||||
@ManyToOne
|
||||
@JoinColumn(name = "worker_name", nullable = false)
|
||||
lateinit var worker: WorkerEntity
|
||||
|
||||
@Column(name = "expression", nullable = false)
|
||||
lateinit var expression: String
|
||||
|
||||
@Column(name = "disabled_at", nullable = true)
|
||||
var disabledAt: ZonedDateTime? = null
|
||||
|
||||
@Column(name = "deleted_at", nullable = true)
|
||||
var deletedAt: ZonedDateTime? = null
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,27 @@
|
|||
package ru.touchin.spring.workers.manager.core.trigger.repositories
|
||||
|
||||
import org.springframework.data.jpa.repository.JpaRepository
|
||||
import org.springframework.data.jpa.repository.Query
|
||||
import org.springframework.data.repository.findByIdOrNull
|
||||
import ru.touchin.spring.workers.manager.core.trigger.exceptions.TriggerNotFoundException
|
||||
import ru.touchin.spring.workers.manager.core.trigger.models.TriggerDescriptorEntity
|
||||
import java.util.*
|
||||
|
||||
interface TriggerDescriptorRepository : JpaRepository<TriggerDescriptorEntity, UUID> {
|
||||
|
||||
@Query(
|
||||
"""
|
||||
SELECT td
|
||||
FROM TriggerDescriptorEntity td
|
||||
WHERE td.worker.workerName = :workerName
|
||||
AND td.deletedAt IS NULL
|
||||
"""
|
||||
)
|
||||
fun findAll(workerName: String): List<TriggerDescriptorEntity>
|
||||
|
||||
}
|
||||
|
||||
fun TriggerDescriptorRepository.findByIdOrThrow(id: UUID): TriggerDescriptorEntity {
|
||||
return findByIdOrNull(id)
|
||||
?: throw TriggerNotFoundException(id)
|
||||
}
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
package ru.touchin.spring.workers.manager.core.trigger.services
|
||||
|
||||
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
|
||||
import ru.touchin.spring.workers.manager.core.trigger.services.dto.CreateTriggerDescriptor
|
||||
import java.util.*
|
||||
|
||||
interface TriggerDescriptorCoreService {
|
||||
|
||||
fun create(create: CreateTriggerDescriptor): TriggerDescriptor
|
||||
fun create(create: List<CreateTriggerDescriptor>): List<TriggerDescriptor>
|
||||
fun setDeleted(id: UUID): TriggerDescriptor
|
||||
fun getByWorkerName(workerName: String): List<TriggerDescriptor>
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,68 @@
|
|||
package ru.touchin.spring.workers.manager.core.trigger.services
|
||||
|
||||
import org.springframework.stereotype.Service
|
||||
import org.springframework.transaction.annotation.Transactional
|
||||
import ru.touchin.spring.workers.manager.core.trigger.converters.toTriggerDescriptor
|
||||
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
|
||||
import ru.touchin.spring.workers.manager.core.trigger.models.TriggerDescriptorEntity
|
||||
import ru.touchin.spring.workers.manager.core.trigger.repositories.TriggerDescriptorRepository
|
||||
import ru.touchin.spring.workers.manager.core.trigger.repositories.findByIdOrThrow
|
||||
import ru.touchin.spring.workers.manager.core.trigger.services.dto.CreateTriggerDescriptor
|
||||
import ru.touchin.spring.workers.manager.core.worker.repositories.WorkerRepository
|
||||
import ru.touchin.spring.workers.manager.core.worker.repositories.findByNameOrThrow
|
||||
import java.time.ZonedDateTime
|
||||
import java.util.*
|
||||
|
||||
@Service
|
||||
class TriggerDescriptorCoreServiceImpl(
|
||||
private val triggerDescriptorRepository: TriggerDescriptorRepository,
|
||||
private val workerRepository: WorkerRepository,
|
||||
) : TriggerDescriptorCoreService {
|
||||
|
||||
@Transactional
|
||||
override fun create(create: CreateTriggerDescriptor): TriggerDescriptor {
|
||||
val entity = TriggerDescriptorEntity().apply {
|
||||
expression = create.expression
|
||||
triggerName = create.name
|
||||
type = create.type
|
||||
worker = workerRepository.findByNameOrThrow(create.workerName)
|
||||
disabledAt = create.disabledAt
|
||||
}
|
||||
|
||||
return triggerDescriptorRepository.save(entity)
|
||||
.toTriggerDescriptor()
|
||||
}
|
||||
|
||||
@Transactional
|
||||
override fun create(create: List<CreateTriggerDescriptor>): List<TriggerDescriptor> {
|
||||
val entities = create.map { dto ->
|
||||
TriggerDescriptorEntity().apply {
|
||||
expression = dto.expression
|
||||
triggerName = dto.name
|
||||
type = dto.type
|
||||
worker = workerRepository.findByNameOrThrow(dto.workerName)
|
||||
disabledAt = dto.disabledAt
|
||||
}
|
||||
}
|
||||
|
||||
return triggerDescriptorRepository.saveAll(entities)
|
||||
.map(TriggerDescriptorEntity::toTriggerDescriptor)
|
||||
}
|
||||
|
||||
@Transactional
|
||||
override fun setDeleted(id: UUID): TriggerDescriptor {
|
||||
val entity = triggerDescriptorRepository.findByIdOrThrow(id)
|
||||
|
||||
entity.apply { deletedAt = ZonedDateTime.now() }
|
||||
|
||||
return triggerDescriptorRepository.save(entity)
|
||||
.toTriggerDescriptor()
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
override fun getByWorkerName(workerName: String): List<TriggerDescriptor> {
|
||||
return triggerDescriptorRepository.findAll(workerName)
|
||||
.map(TriggerDescriptorEntity::toTriggerDescriptor)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,12 @@
|
|||
package ru.touchin.spring.workers.manager.core.trigger.services.dto
|
||||
|
||||
import ru.touchin.spring.workers.manager.core.trigger.enums.TriggerType
|
||||
import java.time.ZonedDateTime
|
||||
|
||||
data class CreateTriggerDescriptor(
|
||||
val name: String,
|
||||
val type: TriggerType,
|
||||
val expression: String,
|
||||
val disabledAt: ZonedDateTime?,
|
||||
val workerName: String,
|
||||
)
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
package ru.touchin.spring.workers.manager.core.worker.converters
|
||||
|
||||
import ru.touchin.spring.workers.manager.core.trigger.converters.toTriggerDescriptor
|
||||
import ru.touchin.spring.workers.manager.core.trigger.models.TriggerDescriptorEntity
|
||||
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
|
||||
import ru.touchin.spring.workers.manager.core.worker.models.WorkerEntity
|
||||
|
||||
fun WorkerEntity.toWorker(): Worker {
|
||||
return Worker(
|
||||
name = workerName,
|
||||
stoppedAt = stoppedAt,
|
||||
disabledAt = disabledAt,
|
||||
status = status,
|
||||
parallelExecutionEnabled = parallelExecutionEnabled,
|
||||
)
|
||||
}
|
||||
|
|
@ -0,0 +1,19 @@
|
|||
package ru.touchin.spring.workers.manager.core.worker.dto
|
||||
|
||||
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
|
||||
import ru.touchin.spring.workers.manager.core.worker.enums.WorkerStatus
|
||||
import java.time.ZonedDateTime
|
||||
|
||||
data class Worker(
|
||||
val name: String,
|
||||
val stoppedAt: ZonedDateTime?,
|
||||
val disabledAt: ZonedDateTime?,
|
||||
val status: WorkerStatus,
|
||||
val parallelExecutionEnabled: Boolean,
|
||||
) {
|
||||
|
||||
fun isStopped() = stoppedAt != null
|
||||
|
||||
fun isDisabled() = disabledAt != null
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,8 @@
|
|||
package ru.touchin.spring.workers.manager.core.worker.enums
|
||||
|
||||
enum class WorkerStatus {
|
||||
|
||||
IDLE,
|
||||
PROCESSING
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,7 @@
|
|||
package ru.touchin.spring.workers.manager.core.worker.exceptions
|
||||
|
||||
import ru.touchin.common.exceptions.CommonNotFoundException
|
||||
|
||||
class WorkerNotFoundException(name: String): CommonNotFoundException(
|
||||
"Worker not found name=$name"
|
||||
)
|
||||
|
|
@ -0,0 +1,41 @@
|
|||
package ru.touchin.spring.workers.manager.core.worker.models
|
||||
|
||||
import ru.touchin.common.spring.jpa.models.BaseEntity
|
||||
import ru.touchin.spring.workers.manager.WorkersManagerConfiguration.Companion.SCHEMA
|
||||
import ru.touchin.spring.workers.manager.core.trigger.models.TriggerDescriptorEntity
|
||||
import ru.touchin.spring.workers.manager.core.worker.enums.WorkerStatus
|
||||
import java.time.ZonedDateTime
|
||||
import javax.persistence.Column
|
||||
import javax.persistence.Entity
|
||||
import javax.persistence.EnumType
|
||||
import javax.persistence.Enumerated
|
||||
import javax.persistence.FetchType
|
||||
import javax.persistence.Id
|
||||
import javax.persistence.OneToMany
|
||||
import javax.persistence.Table
|
||||
|
||||
@Entity
|
||||
@Table(name = "workers", schema = SCHEMA)
|
||||
class WorkerEntity : BaseEntity() {
|
||||
|
||||
@Id
|
||||
@Column(name = "worker_name", unique = true)
|
||||
lateinit var workerName: String
|
||||
|
||||
@Enumerated(EnumType.STRING)
|
||||
@Column(name = "status", nullable = false)
|
||||
lateinit var status: WorkerStatus
|
||||
|
||||
@Column(name = "disabled_at", nullable = true)
|
||||
var disabledAt: ZonedDateTime? = null
|
||||
|
||||
@Column(name = "stopped_at", nullable = true)
|
||||
var stoppedAt: ZonedDateTime? = null
|
||||
|
||||
@Column(name = "parallel_execution_enabled", nullable = false)
|
||||
var parallelExecutionEnabled: Boolean = false
|
||||
|
||||
@OneToMany(mappedBy = "worker", orphanRemoval = true, fetch = FetchType.LAZY)
|
||||
lateinit var triggerDescriptors: Set<TriggerDescriptorEntity>
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,36 @@
|
|||
package ru.touchin.spring.workers.manager.core.worker.repositories
|
||||
|
||||
import org.springframework.data.jpa.repository.JpaRepository
|
||||
import org.springframework.data.jpa.repository.Lock
|
||||
import org.springframework.data.jpa.repository.Query
|
||||
import ru.touchin.spring.workers.manager.core.worker.exceptions.WorkerNotFoundException
|
||||
import ru.touchin.spring.workers.manager.core.worker.models.WorkerEntity
|
||||
import javax.persistence.LockModeType
|
||||
|
||||
interface WorkerRepository : JpaRepository<WorkerEntity, String> {
|
||||
|
||||
@Query(
|
||||
"""
|
||||
SELECT w
|
||||
FROM WorkerEntity w
|
||||
WHERE w.workerName = :workerName
|
||||
"""
|
||||
)
|
||||
@Lock(LockModeType.PESSIMISTIC_WRITE)
|
||||
fun findWithLock(workerName: String): WorkerEntity?
|
||||
|
||||
@Query(
|
||||
"""
|
||||
SELECT w
|
||||
FROM WorkerEntity w
|
||||
WHERE w.workerName = :workerName
|
||||
"""
|
||||
)
|
||||
fun findByName(workerName: String): WorkerEntity?
|
||||
|
||||
}
|
||||
|
||||
fun WorkerRepository.findByNameOrThrow(name: String): WorkerEntity {
|
||||
return findByName(name)
|
||||
?: throw WorkerNotFoundException(name)
|
||||
}
|
||||
|
|
@ -0,0 +1,17 @@
|
|||
package ru.touchin.spring.workers.manager.core.worker.services
|
||||
|
||||
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
|
||||
import ru.touchin.spring.workers.manager.core.worker.exceptions.WorkerNotFoundException
|
||||
import ru.touchin.spring.workers.manager.core.worker.models.WorkerEntity
|
||||
import ru.touchin.spring.workers.manager.core.worker.services.dto.UpdateWorker
|
||||
|
||||
interface WorkerCoreService {
|
||||
|
||||
fun create(name: String): Worker
|
||||
fun update(update: UpdateWorker): Worker
|
||||
fun getWithLock(name: String): Worker?
|
||||
fun get(name: String): Worker = getOrNull(name) ?: throw WorkerNotFoundException(name)
|
||||
fun getOrNull(name: String): Worker?
|
||||
fun getAll(): List<Worker>
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,56 @@
|
|||
package ru.touchin.spring.workers.manager.core.worker.services
|
||||
|
||||
import org.springframework.stereotype.Service
|
||||
import org.springframework.transaction.annotation.Transactional
|
||||
import ru.touchin.spring.workers.manager.core.worker.converters.toWorker
|
||||
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
|
||||
import ru.touchin.spring.workers.manager.core.worker.enums.WorkerStatus
|
||||
import ru.touchin.spring.workers.manager.core.worker.models.WorkerEntity
|
||||
import ru.touchin.spring.workers.manager.core.worker.repositories.WorkerRepository
|
||||
import ru.touchin.spring.workers.manager.core.worker.repositories.findByNameOrThrow
|
||||
import ru.touchin.spring.workers.manager.core.worker.services.dto.UpdateWorker
|
||||
|
||||
@Service
|
||||
class WorkerCoreServiceImpl(
|
||||
private val workerRepository: WorkerRepository,
|
||||
) : WorkerCoreService {
|
||||
|
||||
@Transactional
|
||||
override fun create(name: String): Worker {
|
||||
val entity = WorkerEntity().apply {
|
||||
workerName = name
|
||||
status = WorkerStatus.IDLE
|
||||
}
|
||||
|
||||
return workerRepository.save(entity)
|
||||
.toWorker()
|
||||
}
|
||||
|
||||
@Transactional
|
||||
override fun update(update: UpdateWorker): Worker {
|
||||
val entity = workerRepository.findByNameOrThrow(update.name)
|
||||
.apply { status = update.status }
|
||||
|
||||
return workerRepository.save(entity)
|
||||
.toWorker()
|
||||
}
|
||||
|
||||
@Transactional
|
||||
override fun getWithLock(name: String): Worker? {
|
||||
return workerRepository.findWithLock(name)
|
||||
?.toWorker()
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
override fun getOrNull(name: String): Worker? {
|
||||
return workerRepository.findByName(name)
|
||||
?.toWorker()
|
||||
}
|
||||
|
||||
@Transactional(readOnly = true)
|
||||
override fun getAll(): List<Worker> {
|
||||
return workerRepository.findAll()
|
||||
.map(WorkerEntity::toWorker)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,51 @@
|
|||
package ru.touchin.spring.workers.manager.core.worker.services
|
||||
|
||||
|
||||
import org.springframework.stereotype.Service
|
||||
import org.springframework.transaction.annotation.Transactional
|
||||
import ru.touchin.spring.workers.manager.core.worker.models.WorkerEntity
|
||||
import ru.touchin.spring.workers.manager.core.worker.repositories.WorkerRepository
|
||||
import java.time.ZonedDateTime
|
||||
|
||||
@Service
|
||||
class WorkerStateServiceImpl(
|
||||
private val workerRepository: WorkerRepository
|
||||
) : WorkersStateService {
|
||||
|
||||
@Transactional
|
||||
override fun stop(name: String) {
|
||||
updateWorkerWithLock(name) {
|
||||
it.stoppedAt = ZonedDateTime.now()
|
||||
}
|
||||
}
|
||||
|
||||
@Transactional
|
||||
override fun start(name: String) {
|
||||
updateWorkerWithLock(name) {
|
||||
it.stoppedAt = null
|
||||
}
|
||||
}
|
||||
|
||||
@Transactional
|
||||
override fun disable(name: String) {
|
||||
updateWorkerWithLock(name) {
|
||||
it.disabledAt = ZonedDateTime.now()
|
||||
}
|
||||
}
|
||||
|
||||
@Transactional
|
||||
override fun enable(name: String) {
|
||||
updateWorkerWithLock(name) {
|
||||
it.disabledAt = null
|
||||
}
|
||||
}
|
||||
|
||||
private fun updateWorkerWithLock(name: String, updater: (WorkerEntity) -> Unit) {
|
||||
val worker = workerRepository.findWithLock(name)
|
||||
?.apply(updater)
|
||||
?: throw NoSuchElementException("Worker with name $name not found")
|
||||
|
||||
workerRepository.save(worker)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
package ru.touchin.spring.workers.manager.core.worker.services
|
||||
|
||||
interface WorkersStateService {
|
||||
|
||||
fun stop(name: String)
|
||||
fun start(name: String)
|
||||
fun disable(name: String)
|
||||
fun enable(name: String)
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,8 @@
|
|||
package ru.touchin.spring.workers.manager.core.worker.services.dto
|
||||
|
||||
import ru.touchin.spring.workers.manager.core.worker.enums.WorkerStatus
|
||||
|
||||
data class UpdateWorker(
|
||||
val name: String,
|
||||
val status: WorkerStatus,
|
||||
)
|
||||
|
|
@ -0,0 +1,165 @@
|
|||
databaseChangeLog:
|
||||
- changeSet:
|
||||
id: create-table-workers
|
||||
author: sonkate
|
||||
preConditions:
|
||||
- onFail: MARK_RAN
|
||||
not:
|
||||
tableExists:
|
||||
tableName: workers
|
||||
changes:
|
||||
- createTable:
|
||||
tableName: workers
|
||||
columns:
|
||||
- column:
|
||||
name: worker_name
|
||||
type: VARCHAR(250)
|
||||
constraints:
|
||||
nullable: false
|
||||
primaryKey: true
|
||||
primaryKeyName: PK_WORKERS
|
||||
- column:
|
||||
name: status
|
||||
type: VARCHAR(250)
|
||||
constraints:
|
||||
nullable: false
|
||||
- column:
|
||||
name: parallel_execution_enabled
|
||||
type: BOOLEAN
|
||||
defaultValueBoolean: false
|
||||
constraints:
|
||||
nullable: false
|
||||
- column:
|
||||
name: created_at
|
||||
type: TIMESTAMP WITH TIME ZONE
|
||||
defaultValueDate: CURRENT_TIMESTAMP
|
||||
constraints:
|
||||
nullable: false
|
||||
- column:
|
||||
name: updated_at
|
||||
type: TIMESTAMP WITH TIME ZONE
|
||||
- column:
|
||||
name: stopped_at
|
||||
type: TIMESTAMP WITH TIME ZONE
|
||||
- column:
|
||||
name: disabled_at
|
||||
type: TIMESTAMP WITH TIME ZONE
|
||||
- changeSet:
|
||||
id: create-table-triggers
|
||||
author: sonkate
|
||||
preConditions:
|
||||
- onFail: MARK_RAN
|
||||
not:
|
||||
tableExists:
|
||||
tableName: triggers
|
||||
changes:
|
||||
- createTable:
|
||||
tableName: triggers
|
||||
columns:
|
||||
- column:
|
||||
name: id
|
||||
type: UUID
|
||||
constraints:
|
||||
nullable: false
|
||||
primaryKey: true
|
||||
primaryKeyName: PK_TRIGGERS
|
||||
- column:
|
||||
name: worker_name
|
||||
type: VARCHAR(250)
|
||||
constraints:
|
||||
nullable: false
|
||||
foreignKeyName: fk_trigger_worker
|
||||
references: workers(worker_name)
|
||||
- column:
|
||||
name: type
|
||||
type: VARCHAR(250)
|
||||
constraints:
|
||||
nullable: false
|
||||
- column:
|
||||
name: trigger_name
|
||||
type: VARCHAR(250)
|
||||
constraints:
|
||||
nullable: false
|
||||
- column:
|
||||
name: expression
|
||||
type: VARCHAR(250)
|
||||
constraints:
|
||||
nullable: false
|
||||
- column:
|
||||
name: created_at
|
||||
type: TIMESTAMP WITH TIME ZONE
|
||||
defaultValueDate: CURRENT_TIMESTAMP
|
||||
constraints:
|
||||
nullable: false
|
||||
- column:
|
||||
name: updated_at
|
||||
type: TIMESTAMP WITH TIME ZONE
|
||||
- column:
|
||||
name: deleted_at
|
||||
type: TIMESTAMP WITH TIME ZONE
|
||||
- column:
|
||||
name: disabled_at
|
||||
type: TIMESTAMP WITH TIME ZONE
|
||||
- changeSet:
|
||||
id: create-table-executions
|
||||
author: sonkate
|
||||
preConditions:
|
||||
- onFail: MARK_RAN
|
||||
not:
|
||||
tableExists:
|
||||
tableName: executions
|
||||
changes:
|
||||
- createTable:
|
||||
tableName: executions
|
||||
columns:
|
||||
- column:
|
||||
name: id
|
||||
type: UUID
|
||||
constraints:
|
||||
nullable: false
|
||||
primaryKey: true
|
||||
primaryKeyName: PK_EXECUTIONS
|
||||
- column:
|
||||
name: worker_name
|
||||
type: VARCHAR(250)
|
||||
constraints:
|
||||
nullable: false
|
||||
- column:
|
||||
name: trigger_id
|
||||
type: UUID
|
||||
- column:
|
||||
constraints:
|
||||
nullable: false
|
||||
name: status
|
||||
type: VARCHAR(250)
|
||||
- column:
|
||||
name: error_message
|
||||
type: VARCHAR(250)
|
||||
- column:
|
||||
name: error_code
|
||||
type: int
|
||||
- column:
|
||||
name: created_at
|
||||
type: TIMESTAMP WITH TIME ZONE
|
||||
defaultValueDate: CURRENT_TIMESTAMP
|
||||
constraints:
|
||||
nullable: false
|
||||
- column:
|
||||
name: updated_at
|
||||
type: TIMESTAMP WITH TIME ZONE
|
||||
- column:
|
||||
name: started_at
|
||||
type: TIMESTAMP WITH TIME ZONE
|
||||
- column:
|
||||
name: finished_at
|
||||
type: TIMESTAMP WITH TIME ZONE
|
||||
- changeSet:
|
||||
id: create-index-for-executions
|
||||
author: sonkate
|
||||
changes:
|
||||
- sql:
|
||||
sql: |
|
||||
CREATE INDEX fk_trigger_worker_idx
|
||||
ON ${schemaName}.triggers (worker_name ASC)
|
||||
WHERE deleted_at IS NULL;
|
||||
endDelimeter: ;
|
||||
Loading…
Reference in New Issue