Compare commits

...

5 Commits

Author SHA1 Message Date
Mikhail Yasnov 5384dd3430 Add isDisabled method 2021-12-03 18:45:16 +03:00
Mikhail Yasnov 03e0676693 Fix README.md 2021-12-03 12:03:24 +03:00
Mikhail Yasnov 7b1a7cfc5e Fix tests and updating trigger expression 2021-12-03 12:00:36 +03:00
Mikhail Yasnov 7960e5abac Update worker manager code style 2021-11-25 19:28:55 +03:00
Mikhail Yasnov 15fe61bde5 Add spring workers manager 2021-11-18 15:54:47 +03:00
84 changed files with 2572 additions and 0 deletions

View File

@ -199,3 +199,10 @@ Disables Spring OAuth2 resource server for testing.
## s3-storage
Amazon S3 support.
# spring-workers-agent
Framework for clustered scheduling based on Quartz and Spring Data.
Stores all data in a database (Postgres by default).
Used as manageable alternative for Spring's `@Scheduled`.

View File

@ -51,3 +51,6 @@ include("security-resource-server-custom-jwt-configuration")
include("security-resource-server-test-jwt-configuration")
include("security-jwt-common")
include("s3-storage")
include("spring-workers-manager-core")
include("spring-workers-manager-agent")
include("spring-workers-manager-api")

View File

@ -0,0 +1,77 @@
# Workers Agent
Framework for clustered scheduling based on Quartz and Spring Data.
Stores all data in a database (Postgres by default).
Used as manageable alternative for Spring's `@Scheduled`.
## How to
1. Add dependency
```
implementation project(":spring-workers-manager-agent")
```
1. Define a job by annotations to be scheduled in any component.
```
package ru.touchin
@Component
class MyJob {
@ScheduledAction
@InitTrigger(type = "CRON", expression = "0 15 * * * ?")
fun sayHello(){
println("Hello, world!")
}
}
```
1. Enable job in `application.properties`
```
workers.names=ru.touchin.MyJob
```
or:
```
workers.names=*
```
1. Start the application.
## Annotations
### @ScheduledAction
Registers method as action of some job.
Parameters:
- `name` - name of job. Defaults to class full name.
Must be unique in application scope.
### @Trigger
Declares default trigger for the job.
Default triggers are created when launching job first time.
Parameters:
- `name` - Optional name for trigger. Defaults to some (maybe random) string.
Name must be unique in scope of corresponding job.
- `type` - Trigger type. See triggers types.
SpEL expressions are supported like in `@Scheduled` annotation of Spring.
- `expression` - The value for trigger.
SpEL expressions are supported like in `@Scheduled` annotation of Spring.
## Configuration
### Enabling workers
Agent ignores workers by default. To enable worker add its name to `worker.names` property.
Example:
```
worker.names=com.eample.Job1,\
com.example.Job2
```
#### Patterns for names
`workers.names` support Glob-like patterns.
- Asterisk (`*`) symbol is for "zero or more any symbols" (as `.*` in regex)
- Question mark (`?`) is for "any single symbol" (as `.` in regex)
## TODO
- External data source, unrelated to application code.

View File

@ -0,0 +1,21 @@
plugins {
id("kotlin")
id("kotlin-spring")
id("maven-publish")
}
dependencies {
implementation(project(":common-spring"))
implementation(project(":common-spring-jpa"))
implementation(project(":spring-workers-manager-core"))
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.springframework.data:spring-data-jpa")
implementation("org.springframework.boot:spring-boot-starter-quartz")
testImplementation(project(":common-spring-test-jpa"))
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("com.nhaarman.mockitokotlin2:mockito-kotlin")
}

View File

@ -0,0 +1,30 @@
package ru.touchin.spring.workers.manager.agent
import org.springframework.boot.context.event.ApplicationStartedEvent
import org.springframework.context.event.EventListener
import org.springframework.core.annotation.Order
import org.springframework.stereotype.Component
import ru.touchin.common.spring.Ordered
import ru.touchin.spring.workers.manager.agent.config.WorkerInitializer
import ru.touchin.spring.workers.manager.agent.scheduled.WorkerManagerWatcher
import ru.touchin.spring.workers.manager.core.config.LiquibaseRunner
/**
* Prepares required resources and initializes agent.
*/
@Component
class AgentInitializer(
private val liquibase: LiquibaseRunner,
private val workerInitializer: WorkerInitializer,
private val workerWatcher: WorkerManagerWatcher
) {
@EventListener(value = [ApplicationStartedEvent::class])
@Order(Ordered.HIGH)
fun execute() {
liquibase.run()
workerInitializer.init()
workerWatcher.init()
}
}

View File

@ -0,0 +1,80 @@
package ru.touchin.spring.workers.manager.agent.annotation_config
import org.springframework.beans.factory.config.BeanPostProcessor
import org.springframework.stereotype.Component
import ru.touchin.spring.workers.manager.agent.annotation_config.job_factory.AnnotationConfigJobFactory
import ru.touchin.spring.workers.manager.agent.common.base.BaseJob
import java.lang.reflect.Method
/**
* 1. Scans components for [ScheduledAction] annotation
* 2. Keeps metadata of that components
* 3. Creates [BaseJob] for methods created
*/
@Component
class AnnotationConfigCollectingBeanPostProcessor(
private val jobFactories: List<AnnotationConfigJobFactory>
) : BeanPostProcessor {
val jobs: MutableList<BaseJob> = ArrayList()
val jobName2Method: MutableMap<String, Method> = HashMap()
/**
* Bean name -> class of this bean.
*
* Contains only entries for classes with [ScheduledAction] annotation.
*/
private val beanName2OriginalClass: MutableMap<String, Class<*>> = HashMap()
override fun postProcessBeforeInitialization(bean: Any, beanName: String): Any? {
val hasMethodsForScheduling = bean.javaClass.declaredMethods
.any { it.isAnnotationPresent(ScheduledAction::class.java) }
if (hasMethodsForScheduling) {
beanName2OriginalClass[beanName] = bean.javaClass
}
return bean
}
override fun postProcessAfterInitialization(bean: Any, beanName: String): Any? {
val clazz = beanName2OriginalClass[beanName]
?: return bean
val actionMethod = findActionMethod(clazz)
val createdJobs = jobFactories.flatMap { it.create(bean, actionMethod) }
createdJobs.forEach {
jobName2Method[it.getName()] = actionMethod
}
jobs.addAll(createdJobs)
return bean
}
companion object {
private fun findActionMethod(clazz: Class<*>): Method {
return clazz.declaredMethods
.filter { it.isAnnotationPresent(ScheduledAction::class.java) }
.also { annotatedMethods ->
check(annotatedMethods.size <= 1) {
"Class `${clazz.name}` has more that one methods with annotation @Scheduled. " +
"Methods: $annotatedMethods"
}
}
.onEach { annotatedMethod ->
check(annotatedMethod.parameters.isEmpty()) {
"Method ${clazz.name}:${annotatedMethod.name}' must not have arguments for scheduling, " +
"but requires ${annotatedMethod.parameters.size} parameters"
}
}
.single()
}
}
}

View File

@ -0,0 +1,41 @@
package ru.touchin.spring.workers.manager.agent.annotation_config
import org.springframework.stereotype.Component
import org.springframework.util.LinkedMultiValueMap
import org.springframework.util.MultiValueMap
import ru.touchin.spring.workers.manager.agent.annotation_config.trigger_factory.AnnotationConfigTriggerFactory
import ru.touchin.spring.workers.manager.agent.trigger.InitialTriggerDescriptorsProvider
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
import ru.touchin.spring.workers.manager.core.trigger.models.TriggerDescriptorEntity
import ru.touchin.spring.workers.manager.core.trigger.services.dto.CreateTriggerDescriptor
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
import ru.touchin.spring.workers.manager.core.worker.models.WorkerEntity
@Component
class AnnotationConfigInitialTriggerDescriptorsProvider(
private val triggersCollector: AnnotationConfigCollectingBeanPostProcessor,
private val triggerFactories: List<AnnotationConfigTriggerFactory>
) : InitialTriggerDescriptorsProvider {
val jobName2Triggers: MultiValueMap<String, CreateTriggerDescriptor> = LinkedMultiValueMap()
override fun applicableFor(worker: Worker): Boolean {
val actionMethod = triggersCollector.jobName2Method[worker.name]
?: return false
val triggers = triggerFactories.flatMap { it.create(worker, actionMethod) }
if (triggers.isEmpty()) {
return false
}
jobName2Triggers.addAll(worker.name, triggers)
return true
}
override fun createInitialTriggerDescriptors(worker: Worker): List<CreateTriggerDescriptor> {
return jobName2Triggers[worker.name].orEmpty()
}
}

View File

@ -0,0 +1,14 @@
package ru.touchin.spring.workers.manager.agent.annotation_config
import org.springframework.stereotype.Component
import ru.touchin.spring.workers.manager.agent.common.base.BaseJob
import ru.touchin.spring.workers.manager.agent.registry.JobProvider
@Component
class AnnotationConfigJobProvider(
private val jobsCollector: AnnotationConfigCollectingBeanPostProcessor
) : JobProvider {
override fun getJobs(): List<BaseJob> = jobsCollector.jobs
}

View File

@ -0,0 +1,15 @@
package ru.touchin.spring.workers.manager.agent.annotation_config
import java.lang.annotation.Inherited
/**
* Adds default trigger to [ScheduledAction].
* Default trigger is submitted if job is launched first time.
*/
@Inherited
@Target(AnnotationTarget.FUNCTION)
annotation class InitTrigger(
val name: String = "",
val type: String,
val expression: String
)

View File

@ -0,0 +1,10 @@
package ru.touchin.spring.workers.manager.agent.annotation_config
@Target(AnnotationTarget.FUNCTION)
annotation class ScheduledAction(
/**
* Job name. Defaults to class name.
*/
val name: String = ""
)

View File

@ -0,0 +1,19 @@
package ru.touchin.spring.workers.manager.agent.annotation_config.job_factory
import ru.touchin.spring.workers.manager.agent.common.base.BaseJob
import java.lang.reflect.Method
/**
* Invoked when found method with [ScheduledAction] annotation in some bean.
*
* Used to read jobs settings from annotations in components.
*/
interface AnnotationConfigJobFactory {
/**
* Warning: As Spring could substitute actual beans with proxy-objects,
* you must carefully check if [actionMethod] is applicable for [bean].
*/
fun create(bean: Any, actionMethod: Method): List<BaseJob>
}

View File

@ -0,0 +1,50 @@
package ru.touchin.spring.workers.manager.agent.annotation_config.job_factory
import org.springframework.stereotype.Component
import ru.touchin.spring.workers.manager.agent.annotation_config.ScheduledAction
import ru.touchin.spring.workers.manager.agent.common.base.BaseJob
import java.lang.reflect.Method
/**
* Creates job instances for every annotated action method.
*/
@Component
class ScheduledActionAnnotationConfigJobFactory : AnnotationConfigJobFactory {
override fun create(bean: Any, actionMethod: Method): List<BaseJob> {
val job = createJobForBean(bean, actionMethod)
return listOf(job)
}
companion object {
private fun createJobForBean(bean: Any, annotatedMethod: Method): BaseJob {
val targetMethod = bean.javaClass.getMethod(annotatedMethod)
val annotation = annotatedMethod.getAnnotation(ScheduledAction::class.java)
val jobName: String = annotation.name.takeIf { it.isNotBlank() }
?: annotatedMethod.declaringClass.name
return createJob(jobName) { targetMethod.invoke(bean) }
}
private fun Class<*>.getMethod(sampleMethod: Method): Method {
return getMethod(sampleMethod.name, *sampleMethod.parameterTypes)
.apply { isAccessible = true }
}
private fun createJob(jobName: String, func: () -> Unit): BaseJob = object : BaseJob {
override fun getName() = jobName
override fun run() {
func.invoke()
}
}
}
}

View File

@ -0,0 +1,14 @@
package ru.touchin.spring.workers.manager.agent.annotation_config.trigger_factory
import ru.touchin.spring.workers.manager.core.trigger.services.dto.CreateTriggerDescriptor
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
import java.lang.reflect.Method
/**
* Used to create initial triggers for new workers
*/
interface AnnotationConfigTriggerFactory {
fun create(worker: Worker, actionMethod: Method): List<CreateTriggerDescriptor>
}

View File

@ -0,0 +1,49 @@
package ru.touchin.spring.workers.manager.agent.annotation_config.trigger_factory
import org.springframework.context.EmbeddedValueResolverAware
import org.springframework.stereotype.Component
import org.springframework.util.StringValueResolver
import ru.touchin.spring.workers.manager.agent.annotation_config.InitTrigger
import ru.touchin.spring.workers.manager.core.trigger.enums.TriggerType
import ru.touchin.spring.workers.manager.core.trigger.services.dto.CreateTriggerDescriptor
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
import java.lang.reflect.Method
/**
* Creates triggers for methods annotated with [ru.touchin.spring.workers.manager.agent.annotation_config.InitTrigger] annotation
*/
@Component
class TriggerAnnotationConfigTriggerFactory
: AnnotationConfigTriggerFactory,
EmbeddedValueResolverAware {
lateinit var valueResolver: StringValueResolver
override fun setEmbeddedValueResolver(resolver: StringValueResolver) {
valueResolver = resolver
}
override fun create(worker: Worker, actionMethod: Method): List<CreateTriggerDescriptor> {
val triggerAnnotation = actionMethod.getAnnotation(InitTrigger::class.java)
?: return emptyList()
val resolvedType = valueResolver.resolveStringValue(triggerAnnotation.type)
val triggerType = TriggerType.find(resolvedType)
?: throw IllegalArgumentException("Trigger type for name $resolvedType dies not exist")
val expression = valueResolver.resolveStringValue(triggerAnnotation.expression)
?: throw NullPointerException("Trigger for worker '${worker.name}' has null expression")
val trigger = CreateTriggerDescriptor(
name = "${triggerType.name}_${expression.replace(" ", "_")}",
type = triggerType,
workerName = worker.name,
expression = valueResolver.resolveStringValue(triggerAnnotation.expression)
?: throw NullPointerException("Trigger for worker '${worker.name}' has null expression"),
disabledAt = null,
)
return listOf(trigger)
}
}

View File

@ -0,0 +1,9 @@
package ru.touchin.spring.workers.manager.agent.common.base
interface BaseJob {
fun run()
fun getName(): String = this::class.java.name
}

View File

@ -0,0 +1,54 @@
package ru.touchin.spring.workers.manager.agent.common.utils
/**
* Glob is simple replacement for Regex.
* It uses only two special chars: * (any substring) and ? (any char)
* This implementation has no support for characters escaping.
* Glob has no capturing features.
*/
object Glob {
private const val ANY_SUBSTRING_SYMBOL = '*'
private const val ANY_CHAR_SYMBOL = '?'
/**
* Example: "I love patterns" matches "I * pa?t?er?s"
*
* Based on [StackOverflow answer](https://stackoverflow.com/a/3687031)
*/
fun matches(text: String, pattern: String): Boolean {
val starPosition = pattern.indexOf(ANY_SUBSTRING_SYMBOL)
val headPattern: String = if (starPosition == -1) pattern else pattern.substring(0, starPosition)
if (headPattern.length > text.length) {
return false
}
// handle the part up to the first *
for (i in headPattern.indices) {
if (
headPattern[i] != ANY_CHAR_SYMBOL
&& headPattern[i] != text[i]
) {
return false
}
}
if (starPosition == -1) {
return headPattern.length == text.length
}
val tailPattern: String = pattern.substring(starPosition + 1)
for (i in headPattern.length..text.length) {
if (matches(text.substring(i), tailPattern)) {
return true
}
}
return false
}
}

View File

@ -0,0 +1,10 @@
package ru.touchin.spring.workers.manager.agent.config
import org.springframework.context.annotation.Import
import ru.touchin.spring.workers.manager.WorkersManagerConfiguration
/**
* Annotation to enable Workers Manager module in Spring components via annotations.
*/
@Import(WorkersManagerConfiguration::class)
annotation class EnableWorkersManager

View File

@ -0,0 +1,39 @@
package ru.touchin.spring.workers.manager.agent.config
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional
import ru.touchin.spring.workers.manager.agent.trigger.services.TriggerDescriptorService
import ru.touchin.spring.workers.manager.agent.registry.JobDefinitionsRegistry
import ru.touchin.spring.workers.manager.core.worker.services.WorkerCoreService
import ru.touchin.spring.workers.manager.core.worker.services.WorkersStateService
@Component
class WorkerInitializer(
private val triggerDescriptorAgentService: TriggerDescriptorService,
private val jobDefinitionsRegistry: JobDefinitionsRegistry,
private val workerCoreService: WorkerCoreService,
private val workerStateService: WorkersStateService,
) {
@Transactional
fun init() {
initWorkers(jobDefinitionsRegistry.jobNames)
}
private fun initWorkers(jobNames: Set<String>) {
jobNames.forEach(this::getOrCreateWorkerWithTriggers)
}
private fun getOrCreateWorkerWithTriggers(name: String) {
workerCoreService.getWithLock(name)
?.let { workerStateService.start(it.name) }
?: createWorkerWithTriggers(name)
}
private fun createWorkerWithTriggers(name: String) {
val worker = workerCoreService.create(name)
triggerDescriptorAgentService.createDefaultTriggerDescriptors(worker)
}
}

View File

@ -0,0 +1,36 @@
package ru.touchin.spring.workers.manager.agent.quartz
import org.quartz.Job
import org.quartz.JobBuilder
import org.quartz.JobDataMap
import org.quartz.JobExecutionContext
import org.quartz.JobExecutionException
typealias JobFunction = (JobExecutionContext) -> Unit
class RunnableJob : Job {
override fun execute(context: JobExecutionContext) {
try {
@Suppress("UNCHECKED_CAST")
(context.jobDetail.jobDataMap[ACTION] as JobFunction).also { action ->
action.invoke(context)
}
} catch (e: Exception) {
throw JobExecutionException(e)
}
}
companion object {
private const val ACTION = "ACTION"
fun initJobBuilder(action: JobFunction): JobBuilder {
return JobBuilder
.newJob(RunnableJob::class.java)
.usingJobData(JobDataMap(mapOf(ACTION to action)))
}
}
}

View File

@ -0,0 +1,64 @@
package ru.touchin.spring.workers.manager.agent.registry
import org.quartz.JobDetail
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component
import org.springframework.util.LinkedMultiValueMap
import ru.touchin.spring.workers.manager.agent.worker.executors.WorkerActionExecutor
import ru.touchin.spring.workers.manager.agent.quartz.RunnableJob
import ru.touchin.spring.workers.manager.agent.common.base.BaseJob
import ru.touchin.spring.workers.manager.agent.common.utils.Glob
@Component
class JobDefinitionsRegistry(
@Value("#{'\${workers.names}'.split(',')}")
workersNamesPatterns: Set<String>,
providers: List<JobProvider>,
private val workerActionExecutor: WorkerActionExecutor
) {
final val jobs: Map<String, BaseJob>
private final val jobDetails: Map<String, JobDetail>
final val jobNames: Set<String>
init {
val allJobs = providers.flatMap(JobProvider::getJobs)
val name2jobsList = LinkedMultiValueMap<String, BaseJob>()
allJobs.forEach { job ->
name2jobsList.add(job.getName(), job)
}
name2jobsList.forEach { (name, jobs) ->
check(jobs.size <= 1) { "There are ${jobs.size} jobs with name '$name'. Job names must be unique" }
}
val name2job = name2jobsList
.toSingleValueMap()
.filterKeys { name -> workersNamesPatterns.any { pattern -> Glob.matches(name, pattern) } }
jobNames = name2job.keys
jobs = name2job
jobDetails = name2job
.mapValues { (_, job) ->
createJobDetail(job)
}
}
fun getJobDetail(jobName: String): JobDetail? = jobDetails[jobName]
private fun createJobDetail(job: BaseJob): JobDetail {
return RunnableJob
.initJobBuilder { context ->
workerActionExecutor.executeJobAction(job, context.trigger)
}
.withIdentity(job.getName())
.build()
}
}

View File

@ -0,0 +1,9 @@
package ru.touchin.spring.workers.manager.agent.registry
import ru.touchin.spring.workers.manager.agent.common.base.BaseJob
interface JobProvider {
fun getJobs(): List<BaseJob>
}

View File

@ -0,0 +1,15 @@
package ru.touchin.spring.workers.manager.agent.registry
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import ru.touchin.spring.workers.manager.agent.common.base.BaseJob
@Component
class SimpleJobProvider : JobProvider {
@Autowired(required = false)
var jobBeans: List<BaseJob> = emptyList()
override fun getJobs(): List<BaseJob> = jobBeans
}

View File

@ -0,0 +1,37 @@
package ru.touchin.spring.workers.manager.agent.registry
import org.quartz.Trigger
import org.springframework.stereotype.Component
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
import java.util.concurrent.ConcurrentHashMap
@Component
class TriggersRegistry {
// Concurrent impl to prevent java.util.ConcurrentModificationException
private val descriptors2triggers: MutableMap<TriggerDescriptor, Trigger> = ConcurrentHashMap()
fun getTriggerByDescriptor(descriptor: TriggerDescriptor): Trigger? {
return descriptors2triggers[descriptor]
}
fun getDescriptorByTrigger(trigger: Trigger): TriggerDescriptor? {
return descriptors2triggers
.entries
.firstOrNull { it.value == trigger }
?.key
}
fun getDescriptors(): List<TriggerDescriptor> {
return descriptors2triggers.keys.toList()
}
fun putTrigger(descriptor: TriggerDescriptor, trigger: Trigger) {
descriptors2triggers[descriptor] = trigger
}
fun remove(triggerDescriptors: List<TriggerDescriptor>) {
return triggerDescriptors.forEach { descriptors2triggers.remove(it) }
}
}

View File

@ -0,0 +1,67 @@
package ru.touchin.spring.workers.manager.agent.scheduled
import org.quartz.Scheduler
import org.quartz.SimpleScheduleBuilder
import org.quartz.TriggerBuilder
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component
import ru.touchin.spring.workers.manager.agent.quartz.RunnableJob
import ru.touchin.spring.workers.manager.agent.registry.JobDefinitionsRegistry
import ru.touchin.spring.workers.manager.agent.scheduled.services.SchedulerService
import ru.touchin.spring.workers.manager.agent.registry.TriggersRegistry
import ru.touchin.spring.workers.manager.core.trigger.services.TriggerDescriptorCoreService
/**
* Class is proceeding regular synchronisation trigger descriptors from db and quartz scheduled triggers
*/
@Component
class WorkerManagerWatcher(
@Value("\${workers.watcher.sync.interval}")
private val watcherSyncInterval: Long,
private val jobDefinitionsRegistry: JobDefinitionsRegistry,
private val scheduleTriggerService: SchedulerService,
private val triggersRegistry: TriggersRegistry,
private val triggerDescriptorCoreService: TriggerDescriptorCoreService,
private val quartzScheduler: Scheduler
) {
fun init() {
val systemJobDetail = RunnableJob
.initJobBuilder { sync() }
.withIdentity(SYSTEM_JOB_NAME, SYSTEM_JOB_GROUP)
.build()
val systemTrigger = TriggerBuilder.newTrigger()
.forJob(systemJobDetail)
.withIdentity("${SYSTEM_JOB_NAME}_trigger")
.withSchedule(SimpleScheduleBuilder
.simpleSchedule()
.repeatForever()
.withIntervalInMilliseconds(watcherSyncInterval)
)
.build()
quartzScheduler.scheduleJob(systemJobDetail, systemTrigger)
}
fun sync() {
val currentTriggerDescriptors = triggersRegistry.getDescriptors()
val actualTriggerDescriptors = jobDefinitionsRegistry.jobs
.flatMap { (jobName, _) -> triggerDescriptorCoreService.getByWorkerName(jobName) }
.filter { !it.isDisabled() }
val deletedTriggerDescriptors = currentTriggerDescriptors - actualTriggerDescriptors.toSet()
scheduleTriggerService.unscheduleTriggers(deletedTriggerDescriptors)
triggersRegistry.remove(deletedTriggerDescriptors)
val newTriggerDescriptors = actualTriggerDescriptors - currentTriggerDescriptors.toSet()
scheduleTriggerService.scheduleTriggers(newTriggerDescriptors)
}
companion object {
private const val SYSTEM_JOB_GROUP = "SYSTEM"
private const val SYSTEM_JOB_NAME = "system_worker_check"
}
}

View File

@ -0,0 +1,10 @@
package ru.touchin.spring.workers.manager.agent.scheduled.services
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
interface SchedulerService {
fun scheduleTriggers(triggerDescriptors: List<TriggerDescriptor>)
fun unscheduleTriggers(triggerDescriptors: List<TriggerDescriptor>)
}

View File

@ -0,0 +1,85 @@
package ru.touchin.spring.workers.manager.agent.scheduled.services
import org.quartz.CronScheduleBuilder
import org.quartz.JobDetail
import org.quartz.ScheduleBuilder
import org.quartz.Scheduler
import org.quartz.SimpleScheduleBuilder
import org.quartz.Trigger
import org.quartz.TriggerBuilder
import org.springframework.stereotype.Service
import ru.touchin.spring.workers.manager.agent.registry.JobDefinitionsRegistry
import ru.touchin.spring.workers.manager.agent.registry.TriggersRegistry
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
import ru.touchin.spring.workers.manager.core.trigger.enums.TriggerType
@Service
class SchedulerServiceImpl(
private val quartzScheduler: Scheduler,
private val jobDefinitionsRegistry: JobDefinitionsRegistry,
private val triggersRegistry: TriggersRegistry
) : SchedulerService {
override fun scheduleTriggers(
triggerDescriptors: List<TriggerDescriptor>
) {
triggerDescriptors.forEach(this::scheduleTrigger)
}
override fun unscheduleTriggers(
triggerDescriptors: List<TriggerDescriptor>
) {
triggerDescriptors.forEach { descriptor ->
val trigger = triggersRegistry.getTriggerByDescriptor(descriptor)
if (trigger != null) {
quartzScheduler.unscheduleJob(trigger.key)
}
}
}
private fun scheduleTrigger(descriptor: TriggerDescriptor) {
val jobDetail = jobDefinitionsRegistry.getJobDetail(descriptor.workerName)
if (jobDetail != null) {
val trigger = createTrigger(jobDetail, descriptor)
triggersRegistry.putTrigger(descriptor, trigger)
if (!quartzScheduler.checkExists(trigger.key)) {
if (quartzScheduler.checkExists(jobDetail.key)) {
quartzScheduler.scheduleJob(trigger)
} else {
quartzScheduler.scheduleJob(jobDetail, trigger)
}
}
}
}
private fun createTrigger(job: JobDetail, triggerDescriptor: TriggerDescriptor): Trigger {
return TriggerBuilder.newTrigger().forJob(job)
.withIdentity(createTriggerName(job, triggerDescriptor))
.withSchedule(getScheduleBuilder(triggerDescriptor))
.build()
}
private fun createTriggerName(job: JobDetail, triggerDescriptor: TriggerDescriptor) =
"${job.key.name}_${triggerDescriptor.id}_trigger"
private fun getScheduleBuilder(triggerDescriptor: TriggerDescriptor): ScheduleBuilder<out Trigger> {
return when (triggerDescriptor.type) {
TriggerType.CRON -> cronSchedule(triggerDescriptor.expression)
TriggerType.FIXED_RATE -> fixedRateSchedule(triggerDescriptor.expression.toLong())
TriggerType.FIXED_DELAY -> fixedRateSchedule(triggerDescriptor.expression.toLong())
}
}
private fun fixedRateSchedule(interval: Long): SimpleScheduleBuilder {
return SimpleScheduleBuilder.simpleSchedule().repeatForever().withIntervalInMilliseconds(interval)
}
private fun cronSchedule(expression: String): CronScheduleBuilder {
return CronScheduleBuilder.cronSchedule(expression)
}
}

View File

@ -0,0 +1,14 @@
package ru.touchin.spring.workers.manager.agent.trigger
import ru.touchin.spring.workers.manager.core.trigger.services.dto.CreateTriggerDescriptor
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
/**
* When the agent is starting first time, then initial (default) triggers should be created for new workers.
*/
interface InitialTriggerDescriptorsProvider {
fun applicableFor(worker: Worker): Boolean
fun createInitialTriggerDescriptors(worker: Worker): List<CreateTriggerDescriptor>
}

View File

@ -0,0 +1,20 @@
package ru.touchin.spring.workers.manager.agent.trigger
import org.springframework.core.annotation.Order
import org.springframework.stereotype.Component
import ru.touchin.common.spring.Ordered
import ru.touchin.spring.workers.manager.core.trigger.services.dto.CreateTriggerDescriptor
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
/**
* When no triggers could be created by any provider, then no triggers created.
*/
@Component
@Order(Ordered.LOWER)
class NoInitialTriggersDescriptorsProvider : InitialTriggerDescriptorsProvider {
override fun applicableFor(worker: Worker): Boolean = true
override fun createInitialTriggerDescriptors(worker: Worker): List<CreateTriggerDescriptor> = emptyList()
}

View File

@ -0,0 +1,10 @@
package ru.touchin.spring.workers.manager.agent.trigger.services
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
interface TriggerDescriptorService {
fun createDefaultTriggerDescriptors(worker: Worker): List<TriggerDescriptor>
}

View File

@ -0,0 +1,25 @@
package ru.touchin.spring.workers.manager.agent.trigger.services
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import ru.touchin.spring.workers.manager.agent.trigger.InitialTriggerDescriptorsProvider
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
import ru.touchin.spring.workers.manager.core.trigger.services.TriggerDescriptorCoreService
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
@Service
class TriggerDescriptorServiceImpl(
private val triggerDescriptorCoreService: TriggerDescriptorCoreService,
private val initialTriggerDescriptorProviders: List<InitialTriggerDescriptorsProvider>,
) : TriggerDescriptorService {
@Transactional
override fun createDefaultTriggerDescriptors(worker: Worker): List<TriggerDescriptor> {
return triggerDescriptorCoreService.create(
initialTriggerDescriptorProviders
.filter { it.applicableFor(worker) }
.flatMap { it.createInitialTriggerDescriptors(worker) }
)
}
}

View File

@ -0,0 +1,14 @@
package ru.touchin.spring.workers.manager.agent.worker.executors
import org.quartz.Trigger
import ru.touchin.spring.workers.manager.agent.common.base.BaseJob
import ru.touchin.spring.workers.manager.core.execution.dto.Execution
import ru.touchin.spring.workers.manager.core.execution.models.ExecutionEntity
interface WorkerActionExecutor {
fun prepareExecution(job: BaseJob, currentTrigger: Trigger): Execution?
fun finishWorkerProcessing(job: BaseJob)
fun executeJobAction(job: BaseJob, currentTrigger: Trigger)
}

View File

@ -0,0 +1,77 @@
package ru.touchin.spring.workers.manager.agent.worker.executors
import org.quartz.Trigger
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import ru.touchin.spring.workers.manager.agent.common.base.BaseJob
import ru.touchin.spring.workers.manager.agent.registry.TriggersRegistry
import ru.touchin.spring.workers.manager.core.execution.dto.Execution
import ru.touchin.spring.workers.manager.core.execution.services.ExecutionCoreService
import ru.touchin.spring.workers.manager.core.execution.services.dto.CreateExecution
import ru.touchin.spring.workers.manager.core.worker.enums.WorkerStatus
import ru.touchin.spring.workers.manager.core.worker.services.WorkerCoreService
import ru.touchin.spring.workers.manager.core.worker.services.dto.UpdateWorker
@Service
class WorkerActionExecutorImpl(
private val executionCoreService: ExecutionCoreService,
private val workerCoreService: WorkerCoreService,
private val triggersRegistry: TriggersRegistry,
) : WorkerActionExecutor {
@Autowired
lateinit var workerActionExecutor: WorkerActionExecutor
override fun executeJobAction(job: BaseJob, currentTrigger: Trigger) {
val execution = workerActionExecutor.prepareExecution(job, currentTrigger)
execution?.let {
job.run()
executionCoreService.setFinished(execution.id)
}
workerActionExecutor.finishWorkerProcessing(job)
}
@Transactional
override fun prepareExecution(job: BaseJob, currentTrigger: Trigger): Execution? {
val currentWorker = workerCoreService.getWithLock(job.getName())
return currentWorker
?.takeIf { !it.isStopped() }
?.let { worker ->
val currentTriggerDescriptor = triggersRegistry.getDescriptorByTrigger(currentTrigger)
currentTriggerDescriptor?.let {
setWorkerStatus(worker.name, WorkerStatus.PROCESSING)
executionCoreService.create(
CreateExecution(
workerName = worker.name,
triggerId = currentTriggerDescriptor.id,
)
)
}
}
}
@Transactional
override fun finishWorkerProcessing(job: BaseJob) {
val currentWorker = workerCoreService.getWithLock(job.getName())
?: return
setWorkerStatus(currentWorker.name, WorkerStatus.IDLE)
}
private fun setWorkerStatus(name: String, status: WorkerStatus) {
workerCoreService.update(
UpdateWorker(
name = name,
status = status,
)
)
}
}

View File

@ -0,0 +1,10 @@
package ru.touchin.spring.workers.manager
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.test.context.TestConfiguration
import org.springframework.context.annotation.Import
@SpringBootApplication
@TestConfiguration
@Import(WorkersManagerConfiguration::class)
class TestApplication

View File

@ -0,0 +1,32 @@
package ru.touchin.spring.workers.manager.agent.common.utils
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
internal class GlobTest {
@Test
fun matches() {
assertMatches("", "*****")
assertMatches("my perfect text", "my*text")
assertMatches("my perfect text", "my*?text")
assertMatches("moon", "????")
assertMatches("(abc)", "?abc?")
assertMatches("****", "????")
assertNotMatches("", "?")
assertNotMatches("mo", "????")
assertNotMatches("moonmoon", "????")
assertNotMatches("my perfect text", "our*text")
}
private fun assertMatches(text: String, pattern: String) {
assertTrue(Glob.matches(text, pattern))
}
private fun assertNotMatches(text: String, pattern: String) {
assertFalse(Glob.matches(text, pattern))
}
}

View File

@ -0,0 +1,85 @@
package ru.touchin.spring.workers.manager.agent.config
import com.nhaarman.mockitokotlin2.doNothing
import com.nhaarman.mockitokotlin2.mock
import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatchers.anyString
import org.mockito.Mockito
import org.mockito.Mockito.doAnswer
import org.mockito.Mockito.never
import org.mockito.Mockito.verify
import ru.touchin.spring.workers.manager.agent.common.base.BaseJob
import ru.touchin.spring.workers.manager.agent.registry.JobDefinitionsRegistry
import ru.touchin.spring.workers.manager.agent.registry.JobProvider
import ru.touchin.spring.workers.manager.agent.trigger.services.TriggerDescriptorService
import ru.touchin.spring.workers.manager.agent.worker.executors.WorkerActionExecutorImpl
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
import ru.touchin.spring.workers.manager.core.worker.enums.WorkerStatus
import ru.touchin.spring.workers.manager.core.worker.services.WorkerCoreService
import ru.touchin.spring.workers.manager.core.worker.services.WorkersStateService
import ru.touchin.spring.workers.manager.utils.MockitoUtils.anyx
internal class WorkerInitializerTest {
private val baseJob = mock<BaseJob> {
on(it.getName()).thenReturn(BASE_WORKER_NAME)
}
private val simpleJobProvider = mock<JobProvider> {
on(it.getJobs()).thenReturn(listOf(baseJob))
}
private val workerActionExecutor = Mockito.mock(WorkerActionExecutorImpl::class.java)
private val jobDefinitionsRegistry = JobDefinitionsRegistry(setOf(BASE_WORKER_NAME), listOf(simpleJobProvider), workerActionExecutor)
private val workerCoreService = Mockito.mock(WorkerCoreService::class.java)
private val workersStateService = Mockito.mock(WorkersStateService::class.java)
private val triggerDescriptorService = Mockito.mock(TriggerDescriptorService::class.java)
private val workerInitializer = WorkerInitializer(
triggerDescriptorService,
jobDefinitionsRegistry,
workerCoreService,
workersStateService,
)
private val baseWorker = Worker(
name = BASE_WORKER_NAME,
status = WorkerStatus.IDLE,
disabledAt = null,
stoppedAt = null,
parallelExecutionEnabled = false,
)
@Test
fun checkSyncWithExistingWorker() {
doAnswer { baseWorker }.`when`(workerCoreService).getWithLock(BASE_WORKER_NAME)
doNothing().`when`(workersStateService).start(anyx())
workerInitializer.init()
verify(workersStateService).start(baseWorker.name)
verify(workerCoreService, never()).create(anyString())
verify(triggerDescriptorService, never()).createDefaultTriggerDescriptors(baseWorker)
}
@Test
fun checkSyncAndCreateNewWorker() {
doAnswer { null }.`when`(workerCoreService).getWithLock(BASE_WORKER_NAME)
doAnswer { baseWorker }.`when`(workerCoreService).create(BASE_WORKER_NAME)
workerInitializer.init()
verify(workerCoreService).create(BASE_WORKER_NAME)
verify(triggerDescriptorService).createDefaultTriggerDescriptors(baseWorker)
}
companion object {
private const val BASE_WORKER_NAME = "baseWorker"
}
}

View File

@ -0,0 +1,134 @@
package ru.touchin.spring.workers.manager.agent.scheduled
import com.nhaarman.mockitokotlin2.mock
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatchers
import org.mockito.Mockito
import org.mockito.Mockito.anyString
import org.mockito.Mockito.doAnswer
import org.mockito.Mockito.verify
import org.quartz.Scheduler
import ru.touchin.spring.workers.manager.agent.common.base.BaseJob
import ru.touchin.spring.workers.manager.agent.registry.JobDefinitionsRegistry
import ru.touchin.spring.workers.manager.agent.registry.JobProvider
import ru.touchin.spring.workers.manager.agent.registry.TriggersRegistry
import ru.touchin.spring.workers.manager.agent.scheduled.services.SchedulerServiceImpl
import ru.touchin.spring.workers.manager.agent.worker.executors.WorkerActionExecutorImpl
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
import ru.touchin.spring.workers.manager.core.trigger.enums.TriggerType
import ru.touchin.spring.workers.manager.core.trigger.services.TriggerDescriptorCoreService
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
import ru.touchin.spring.workers.manager.core.worker.enums.WorkerStatus
import java.util.*
internal class WorkerManagerWatcherTest {
private val baseJob = mock<BaseJob> {
on(it.getName()).thenReturn(BASE_WORKER_NAME)
}
private val simpleJobProvider = mock<JobProvider> {
on(it.getJobs()).thenReturn(listOf(baseJob))
}
private val workerActionExecutor = Mockito.mock(WorkerActionExecutorImpl::class.java)
private val jobDefinitionsRegistry = JobDefinitionsRegistry(setOf(BASE_WORKER_NAME), listOf(simpleJobProvider), workerActionExecutor)
private val triggerRegistry = Mockito.mock(TriggersRegistry::class.java)
private val watcherSyncInterval = 1000L
private val triggerDescriptorCoreService = Mockito.mock(TriggerDescriptorCoreService::class.java)
private val scheduleTriggerService = Mockito.mock(SchedulerServiceImpl::class.java)
private val quartzScheduler = Mockito.mock(Scheduler::class.java)
private val workerManagerWatcher = WorkerManagerWatcher(
watcherSyncInterval,
jobDefinitionsRegistry,
scheduleTriggerService,
triggerRegistry,
triggerDescriptorCoreService,
quartzScheduler
)
private val baseWorker = Worker(
name = BASE_WORKER_NAME,
status = WorkerStatus.IDLE,
disabledAt = null,
stoppedAt = null,
parallelExecutionEnabled = false,
)
private val triggerDescriptorId1 = UUID.fromString("514fb14b-d7ea-4ace-b200-4e06e80c37b7")
private val triggerDescriptorId2 = UUID.fromString("94d2ef98-6fc9-4d41-a66a-35b73a2448e0")
private val triggerDescriptorId3 = UUID.fromString("2a7719ec-6a65-481e-947b-08537ada2337")
private val currentTriggerDescriptor1 = createTriggerDescriptor(triggerDescriptorId1, TriggerType.CRON, baseWorker)
private val actualTriggerDescriptor1 = createTriggerDescriptor(triggerDescriptorId2, TriggerType.FIXED_RATE, baseWorker)
private val actualTriggerDescriptor2 = createTriggerDescriptor(triggerDescriptorId3, TriggerType.FIXED_RATE, baseWorker)
private val currentTriggers = listOf(currentTriggerDescriptor1)
@BeforeEach
fun setUp() {
doAnswer { currentTriggers }
.`when`(triggerRegistry).getDescriptors()
}
@Test
fun checkSyncWithOnlyNewTriggersWithoutRemoving() {
doAnswer { listOf(currentTriggerDescriptor1, actualTriggerDescriptor1) }
.`when`(triggerDescriptorCoreService).getByWorkerName(anyString())
workerManagerWatcher.sync()
verify(triggerRegistry).remove(emptyList())
verify(scheduleTriggerService).unscheduleTriggers(emptyList())
verify(scheduleTriggerService).scheduleTriggers(listOf(actualTriggerDescriptor1))
}
@Test
fun checkSyncWithOnlyRemoveIrrelevantTriggers() {
doAnswer { emptyList<TriggerDescriptor>() }
.`when`(triggerDescriptorCoreService).getByWorkerName(ArgumentMatchers.anyString())
workerManagerWatcher.sync()
verify(triggerRegistry).remove(currentTriggers)
verify(scheduleTriggerService).unscheduleTriggers(currentTriggers)
verify(scheduleTriggerService).scheduleTriggers(emptyList())
}
@Test
fun checkSyncWithSaveNewAndRemoveIrrelevantTriggers() {
doAnswer { listOf(actualTriggerDescriptor2, actualTriggerDescriptor1) }
.`when`(triggerDescriptorCoreService).getByWorkerName(ArgumentMatchers.anyString())
workerManagerWatcher.sync()
verify(triggerRegistry).remove(currentTriggers)
verify(scheduleTriggerService).unscheduleTriggers(currentTriggers)
verify(scheduleTriggerService).scheduleTriggers(setOf(actualTriggerDescriptor2, actualTriggerDescriptor1).toList())
}
private fun createTriggerDescriptor(id: UUID, type: TriggerType, worker: Worker): TriggerDescriptor {
return TriggerDescriptor(
id = id,
name = id.toString(),
type = type,
expression = "expression",
workerName = worker.name,
disabledAt = null,
deletedAt = null,
)
}
companion object {
private const val BASE_WORKER_NAME = "baseWorker"
}
}

View File

@ -0,0 +1,17 @@
package ru.touchin.spring.workers.manager.utils
import org.mockito.ArgumentMatchers
import org.mockito.Mockito
object MockitoUtils {
fun <T> anyx(matcher: ((T) -> Boolean)? = null): T {
@Suppress("UNCHECKED_CAST")
return if (matcher == null) {
ArgumentMatchers.any() ?: (null as T)
} else {
Mockito.argThat(matcher) ?: (null as T)
}
}
fun <T> anyx(sample: T): T = anyx { it == sample }
}

View File

@ -0,0 +1,3 @@
spring:
config:
import: "test-slow.yml"

View File

@ -0,0 +1,3 @@
spring:
config:
import: "test.yml"

View File

@ -0,0 +1,14 @@
plugins {
id("kotlin")
id("kotlin-spring")
id("maven-publish")
}
dependencies {
implementation(project(":spring-workers-manager-core"))
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.springframework.data:spring-data-jpa")
implementation("org.springframework.boot:spring-boot-starter-web")
}

View File

@ -0,0 +1,60 @@
package ru.touchin.spring.workers.manager.api.trigger.controllers
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.PutMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
import ru.touchin.spring.workers.manager.api.trigger.controllers.dto.TriggerChangeRequest
import ru.touchin.spring.workers.manager.api.trigger.services.TriggerDescriptorApiService
import ru.touchin.spring.workers.manager.api.trigger.services.dto.CreateTrigger
import ru.touchin.spring.workers.manager.api.trigger.services.dto.UpdateTrigger
@RestController
@RequestMapping("/workers/{workerName}/triggers")
class TriggerController(
private val triggerDescriptorApiService: TriggerDescriptorApiService
) {
@PostMapping
fun createTrigger(
@PathVariable
workerName: String,
@RequestBody
body: TriggerChangeRequest
) {
triggerDescriptorApiService.create(
CreateTrigger(
workerName = workerName,
triggerName = body.name,
type = body.type,
expression = body.expression,
disabled = body.disabled,
)
)
}
@PutMapping("/{triggerName}")
fun changeTrigger(
@PathVariable
triggerName: String,
@PathVariable
workerName: String,
@RequestBody
body: TriggerChangeRequest
) {
triggerDescriptorApiService.update(
UpdateTrigger(
workerName = workerName,
oldTriggerName = triggerName,
newTriggerName = body.name,
type = body.type,
expression = body.expression,
disabled = body.disabled,
)
)
}
}

View File

@ -0,0 +1,10 @@
package ru.touchin.spring.workers.manager.api.trigger.controllers.dto
import ru.touchin.spring.workers.manager.core.trigger.enums.TriggerType
data class TriggerChangeRequest(
val name: String,
val type: TriggerType,
val expression: String,
val disabled: Boolean,
)

View File

@ -0,0 +1,11 @@
package ru.touchin.spring.workers.manager.api.trigger.services
import ru.touchin.spring.workers.manager.api.trigger.services.dto.CreateTrigger
import ru.touchin.spring.workers.manager.api.trigger.services.dto.UpdateTrigger
interface TriggerDescriptorApiService {
fun create(create: CreateTrigger)
fun update(update: UpdateTrigger)
}

View File

@ -0,0 +1,61 @@
package ru.touchin.spring.workers.manager.api.trigger.services
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import ru.touchin.spring.workers.manager.api.trigger.services.dto.CreateTrigger
import ru.touchin.spring.workers.manager.api.trigger.services.dto.UpdateTrigger
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
import ru.touchin.spring.workers.manager.core.trigger.exceptions.TriggerNotFoundException
import ru.touchin.spring.workers.manager.core.trigger.services.TriggerDescriptorCoreService
import ru.touchin.spring.workers.manager.core.trigger.services.dto.CreateTriggerDescriptor
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
import ru.touchin.spring.workers.manager.core.worker.services.WorkerCoreService
import java.time.ZonedDateTime
@Service
class TriggerDescriptorApiServiceImpl(
private val workerCoreService: WorkerCoreService,
private val triggerDescriptorCoreService: TriggerDescriptorCoreService,
) : TriggerDescriptorApiService {
@Transactional
override fun create(create: CreateTrigger) {
val worker = workerCoreService.get(create.workerName)
triggerDescriptorCoreService.create(
CreateTriggerDescriptor(
expression = create.expression,
name = create.triggerName,
type = create.type,
disabledAt = if (create.disabled) ZonedDateTime.now() else null,
workerName = worker.name,
)
)
}
@Transactional
override fun update(update: UpdateTrigger) {
val worker = workerCoreService.get(update.workerName)
val triggerDescriptor = getTriggerDescriptor(worker, update.oldTriggerName)
triggerDescriptorCoreService.setDeleted(triggerDescriptor.id)
triggerDescriptorCoreService.create(
CreateTriggerDescriptor(
expression = update.expression,
name = update.newTriggerName,
type = update.type,
disabledAt = if (update.disabled) ZonedDateTime.now() else null,
workerName = worker.name,
)
)
}
private fun getTriggerDescriptor(worker: Worker, triggerName: String): TriggerDescriptor {
return triggerDescriptorCoreService.getByWorkerName(worker.name)
.firstOrNull { it.name == triggerName }
?: throw TriggerNotFoundException(triggerName)
}
}

View File

@ -0,0 +1,11 @@
package ru.touchin.spring.workers.manager.api.trigger.services.dto
import ru.touchin.spring.workers.manager.core.trigger.enums.TriggerType
data class CreateTrigger(
val workerName: String,
val triggerName: String,
val type: TriggerType,
val expression: String,
val disabled: Boolean,
)

View File

@ -0,0 +1,12 @@
package ru.touchin.spring.workers.manager.api.trigger.services.dto
import ru.touchin.spring.workers.manager.core.trigger.enums.TriggerType
data class UpdateTrigger(
val workerName: String,
val oldTriggerName: String,
val newTriggerName: String,
val type: TriggerType,
val expression: String,
val disabled: Boolean,
)

View File

@ -0,0 +1,65 @@
package ru.touchin.spring.workers.manager.api.worker.controllers
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
import ru.touchin.spring.workers.manager.api.worker.controllers.dto.WorkerResponse
import ru.touchin.spring.workers.manager.api.worker.services.WorkerApiService
import ru.touchin.spring.workers.manager.core.worker.services.WorkersStateService
@RestController
@RequestMapping("/workers")
class WorkerController(
private val workerStateService: WorkersStateService,
private val workerApiService: WorkerApiService
) {
@GetMapping
fun getWorkers(): List<WorkerResponse> {
return workerApiService.getWorkers()
}
@GetMapping("/{workerName}")
fun getWorker(
@PathVariable
workerName: String
): WorkerResponse {
return workerApiService.getWorker(workerName)
}
@PostMapping("/{workerName}/stop")
fun stop(
@PathVariable
workerName: String
) {
workerStateService.stop(workerName)
}
@PostMapping("/{workerName}/start")
fun start(
@PathVariable
workerName: String
) {
workerStateService.start(workerName)
}
@PostMapping("/{workerName}/disable")
fun disable(
@PathVariable
workerName: String
) {
workerStateService.disable(workerName)
}
@PostMapping("/{workerName}/enable")
fun enable(
@PathVariable
workerName: String
) {
workerStateService.enable(workerName)
}
}

View File

@ -0,0 +1,21 @@
package ru.touchin.spring.workers.manager.api.worker.controllers.dto
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
import ru.touchin.spring.workers.manager.core.worker.enums.WorkerStatus
import java.time.ZonedDateTime
data class WorkerResponse(
val workerName: String,
val stoppedAt: ZonedDateTime?,
val disabledAt: ZonedDateTime?,
val status: WorkerStatus,
val parallelExecutionEnabled: Boolean
)
fun Worker.toWorkerResponse() = WorkerResponse(
workerName = this.name,
stoppedAt = this.stoppedAt,
disabledAt = this.disabledAt,
status = this.status,
parallelExecutionEnabled = this.parallelExecutionEnabled,
)

View File

@ -0,0 +1,10 @@
package ru.touchin.spring.workers.manager.api.worker.services
import ru.touchin.spring.workers.manager.api.worker.controllers.dto.WorkerResponse
interface WorkerApiService {
fun getWorker(workerName: String): WorkerResponse
fun getWorkers(): List<WorkerResponse>
}

View File

@ -0,0 +1,22 @@
package ru.touchin.spring.workers.manager.api.worker.services
import org.springframework.stereotype.Service
import ru.touchin.spring.workers.manager.api.worker.controllers.dto.WorkerResponse
import ru.touchin.spring.workers.manager.api.worker.controllers.dto.toWorkerResponse
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
import ru.touchin.spring.workers.manager.core.worker.services.WorkerCoreService
@Service
class WorkerApiServiceImpl(
private val workerCoreService: WorkerCoreService
) : WorkerApiService {
override fun getWorker(workerName: String): WorkerResponse {
return workerCoreService.get(workerName).toWorkerResponse()
}
override fun getWorkers(): List<WorkerResponse> {
return workerCoreService.getAll().map(Worker::toWorkerResponse)
}
}

View File

@ -0,0 +1 @@
# Touch Spring Workers Manager

View File

@ -0,0 +1,17 @@
plugins {
id("kotlin")
id("kotlin-spring")
id("maven-publish")
}
dependencies {
api(project(":common"))
implementation(project(":common-spring-jpa"))
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.liquibase:liquibase-core")
implementation("org.springframework.boot:spring-boot-starter-data-jpa")
implementation("org.springframework.boot:spring-boot-starter-quartz")
}

View File

@ -0,0 +1,38 @@
package ru.touchin.spring.workers.manager
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.domain.EntityScan
import org.springframework.boot.context.properties.ConfigurationPropertiesScan
import org.springframework.cache.annotation.EnableCaching
import org.springframework.context.annotation.ComponentScan
import org.springframework.context.annotation.Configuration
import org.springframework.data.jpa.repository.config.EnableJpaAuditing
import org.springframework.data.jpa.repository.config.EnableJpaRepositories
import ru.touchin.common.spring.jpa.EnableJpaAuditingExtra
/**
* Configuration which brings to context all the components, required to support workers manager module via annotations.
*
* You could @[org.springframework.context.annotation.Import] this configuration into your application or use [EnableWorkersManager] annotation to do it automatically
*/
@ComponentScan
@EntityScan
@EnableJpaRepositories
@EnableCaching
@ConfigurationPropertiesScan
class WorkersManagerConfiguration {
companion object {
const val SCHEMA: String = "workers"
}
/**
* Applies `@EnableJpaAuditing` only if it was not already applied.
* Enabling `@EnableJpaAuditing` twice will lead to application context failure.
*/
@Configuration
@ConditionalOnMissingBean(name=["jpaAuditingHandler"])
@EnableJpaAuditingExtra
class JpaAuditingNonConflictingDeclaration
}

View File

@ -0,0 +1,34 @@
package ru.touchin.spring.workers.manager.core.config
import liquibase.Contexts
import liquibase.Liquibase
import liquibase.database.Database
import liquibase.database.DatabaseFactory
import liquibase.database.jvm.JdbcConnection
import liquibase.resource.ClassLoaderResourceAccessor
import org.springframework.stereotype.Component
import ru.touchin.spring.workers.manager.WorkersManagerConfiguration.Companion.SCHEMA
import javax.sql.DataSource
@Component
class LiquibaseRunner(
private val dataSource: DataSource
) {
fun run() = dataSource.connection.use { connection ->
val database: Database = DatabaseFactory.getInstance()
.findCorrectDatabaseImplementation(JdbcConnection(connection))
.apply { defaultSchemaName = SCHEMA }
val liquibase = Liquibase(MASTER_CHANGELOG_PATH, ClassLoaderResourceAccessor(), database)
liquibase.changeLogParameters.set("schemaName", SCHEMA)
liquibase.update(Contexts())
}
companion object {
private const val MASTER_CHANGELOG_PATH = "workers/db/changelog/db.changelog-master.yaml"
}
}

View File

@ -0,0 +1,15 @@
package ru.touchin.spring.workers.manager.core.execution.converters
import ru.touchin.spring.workers.manager.core.execution.dto.Execution
import ru.touchin.spring.workers.manager.core.execution.models.ExecutionEntity
import ru.touchin.spring.workers.manager.core.trigger.converters.toTriggerDescriptor
fun ExecutionEntity.toExecution(): Execution {
return Execution(
id = id!!,
triggerDescriptor = triggerDescriptor?.toTriggerDescriptor(),
status = status,
startedAt = startedAt,
finishedAt = finishedAt,
)
}

View File

@ -0,0 +1,14 @@
package ru.touchin.spring.workers.manager.core.execution.dto
import ru.touchin.spring.workers.manager.core.execution.enums.ExecutionStatus
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
import java.time.ZonedDateTime
import java.util.*
data class Execution(
val id: UUID,
val triggerDescriptor: TriggerDescriptor?,
val status: ExecutionStatus,
val startedAt: ZonedDateTime?,
val finishedAt: ZonedDateTime?,
)

View File

@ -0,0 +1,8 @@
package ru.touchin.spring.workers.manager.core.execution.enums
enum class ExecutionStatus {
PROCESSING,
FINISHED
}

View File

@ -0,0 +1,6 @@
package ru.touchin.spring.workers.manager.core.execution.exceptions
import ru.touchin.common.exceptions.CommonNotFoundException
import java.util.*
class ExecutionNotFoundException(id: UUID) : CommonNotFoundException("Execution not found id=$id")

View File

@ -0,0 +1,43 @@
package ru.touchin.spring.workers.manager.core.execution.models
import ru.touchin.common.spring.jpa.models.BaseUuidIdEntity
import ru.touchin.spring.workers.manager.WorkersManagerConfiguration.Companion.SCHEMA
import ru.touchin.spring.workers.manager.core.execution.enums.ExecutionStatus
import ru.touchin.spring.workers.manager.core.trigger.models.TriggerDescriptorEntity
import java.time.ZonedDateTime
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.EnumType
import javax.persistence.Enumerated
import javax.persistence.JoinColumn
import javax.persistence.ManyToOne
import javax.persistence.Table
@Entity
@Table(name = "executions", schema = SCHEMA)
class ExecutionEntity : BaseUuidIdEntity() {
@Column(name = "worker_name", nullable = false)
lateinit var workerName: String
@ManyToOne
@JoinColumn(name = "trigger_id", nullable = true)
var triggerDescriptor: TriggerDescriptorEntity? = null
@Enumerated(EnumType.STRING)
@Column(name = "status", nullable = false)
lateinit var status: ExecutionStatus
@Column(name = "error_message", nullable = true)
var errorMessage: String? = null
@Column(name = "error_code", nullable = true)
var errorCode: Int? = null
@Column(name = "started_at", nullable = true)
var startedAt: ZonedDateTime? = null
@Column(name = "finished_at", nullable = true)
var finishedAt: ZonedDateTime? = null
}

View File

@ -0,0 +1,14 @@
package ru.touchin.spring.workers.manager.core.execution.repositories
import org.springframework.data.jpa.repository.JpaRepository
import org.springframework.data.repository.findByIdOrNull
import ru.touchin.spring.workers.manager.core.execution.exceptions.ExecutionNotFoundException
import ru.touchin.spring.workers.manager.core.execution.models.ExecutionEntity
import java.util.*
interface ExecutionRepository : JpaRepository<ExecutionEntity, UUID>
fun ExecutionRepository.findByIdOrThrow(id: UUID): ExecutionEntity {
return findByIdOrNull(id)
?: throw ExecutionNotFoundException(id)
}

View File

@ -0,0 +1,12 @@
package ru.touchin.spring.workers.manager.core.execution.services
import ru.touchin.spring.workers.manager.core.execution.dto.Execution
import ru.touchin.spring.workers.manager.core.execution.services.dto.CreateExecution
import java.util.*
interface ExecutionCoreService{
fun create(create: CreateExecution): Execution
fun setFinished(id: UUID): Execution
}

View File

@ -0,0 +1,49 @@
package ru.touchin.spring.workers.manager.core.execution.services
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import ru.touchin.spring.workers.manager.core.execution.converters.toExecution
import ru.touchin.spring.workers.manager.core.execution.dto.Execution
import ru.touchin.spring.workers.manager.core.execution.enums.ExecutionStatus
import ru.touchin.spring.workers.manager.core.execution.models.ExecutionEntity
import ru.touchin.spring.workers.manager.core.execution.repositories.ExecutionRepository
import ru.touchin.spring.workers.manager.core.execution.repositories.findByIdOrThrow
import ru.touchin.spring.workers.manager.core.execution.services.dto.CreateExecution
import ru.touchin.spring.workers.manager.core.trigger.repositories.TriggerDescriptorRepository
import ru.touchin.spring.workers.manager.core.trigger.repositories.findByIdOrThrow
import java.time.ZonedDateTime
import java.util.*
@Service
class ExecutionCoreServiceImpl(
private val executionRepository: ExecutionRepository,
private val triggerDescriptorRepository: TriggerDescriptorRepository,
) : ExecutionCoreService {
@Transactional
override fun create(create: CreateExecution): Execution {
val entity = ExecutionEntity().apply {
startedAt = ZonedDateTime.now()
workerName = create.workerName
triggerDescriptor = triggerDescriptorRepository.findByIdOrThrow(create.triggerId)
status = ExecutionStatus.PROCESSING
}
return executionRepository.save(entity)
.toExecution()
}
@Transactional
override fun setFinished(id: UUID): Execution {
val entity = executionRepository.findByIdOrThrow(id)
entity.apply {
finishedAt = ZonedDateTime.now()
status = ExecutionStatus.FINISHED
}
return executionRepository.save(entity)
.toExecution()
}
}

View File

@ -0,0 +1,8 @@
package ru.touchin.spring.workers.manager.core.execution.services.dto
import java.util.*
data class CreateExecution(
val workerName: String,
val triggerId: UUID,
)

View File

@ -0,0 +1,16 @@
package ru.touchin.spring.workers.manager.core.trigger.converters
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
import ru.touchin.spring.workers.manager.core.trigger.models.TriggerDescriptorEntity
fun TriggerDescriptorEntity.toTriggerDescriptor(): TriggerDescriptor {
return TriggerDescriptor(
id = id!!,
name = triggerName,
type = type,
expression = expression,
workerName = worker.workerName,
disabledAt = disabledAt,
deletedAt = deletedAt,
)
}

View File

@ -0,0 +1,36 @@
package ru.touchin.spring.workers.manager.core.trigger.dto
import ru.touchin.spring.workers.manager.core.trigger.enums.TriggerType
import java.time.ZonedDateTime
import java.util.*
data class TriggerDescriptor(
val id: UUID,
val name: String,
val type: TriggerType,
val expression: String,
val workerName: String,
val disabledAt: ZonedDateTime?,
val deletedAt: ZonedDateTime?,
) {
fun isDeleted() = deletedAt != null
fun isDisabled() = disabledAt != null
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is TriggerDescriptor) return false
if (id != other.id || expression != other.expression || type != other.type) {
return false
}
return true
}
override fun hashCode(): Int {
return id.hashCode()
}
}

View File

@ -0,0 +1,49 @@
package ru.touchin.spring.workers.manager.core.trigger.enums
enum class TriggerType {
/**
* Uses CRON expression for scheduling.
* Expression example: `0 * * * * *`
*/
CRON,
/**
* Uses numeric expressions, which mean time period in milliseconds between
* end of previous execution and start of the next one.
* ```
* ------------ TIMELINE -------------------
* [ EXECUTION ]............................
* .............< DELAY >...................
* ......................[ NEXT EXECUTION ]
* -----------------------------------------
* ```
*/
FIXED_DELAY,
/**
* Uses numeric expressions, which mean time period in milliseconds between
* start of previous execution and start of the next one.
* ```
* ------------ TIMELINE -------------------
* [ EXECUTION ]............................
* < DELAY >................................
* .........[ NEXT EXECUTION ]..............
* .........< DELAY >.......................
* ..................[ 3RD EXECUTION ].....
* -----------------------------------------
* ```
*/
FIXED_RATE;
companion object {
fun find(name: String?): TriggerType? {
name ?: return null
return values().find { it.name == name }
}
}
}

View File

@ -0,0 +1,11 @@
package ru.touchin.spring.workers.manager.core.trigger.exceptions
import ru.touchin.common.exceptions.CommonNotFoundException
import java.util.*
class TriggerNotFoundException : CommonNotFoundException {
constructor(id: UUID) : super("TriggerDescriptor not found id=$id")
constructor(name: String) : super("TriggerDescriptor code not found name=$name")
}

View File

@ -0,0 +1,40 @@
package ru.touchin.spring.workers.manager.core.trigger.models
import ru.touchin.common.spring.jpa.models.BaseUuidIdEntity
import ru.touchin.spring.workers.manager.WorkersManagerConfiguration.Companion.SCHEMA
import ru.touchin.spring.workers.manager.core.trigger.enums.TriggerType
import ru.touchin.spring.workers.manager.core.worker.models.WorkerEntity
import java.time.ZonedDateTime
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.EnumType
import javax.persistence.Enumerated
import javax.persistence.JoinColumn
import javax.persistence.ManyToOne
import javax.persistence.Table
@Entity
@Table(name = "triggers", schema = SCHEMA)
class TriggerDescriptorEntity : BaseUuidIdEntity() {
@Column(name = "trigger_name", nullable = false)
lateinit var triggerName: String
@Enumerated(EnumType.STRING)
@Column(name = "type", nullable = false)
lateinit var type: TriggerType
@ManyToOne
@JoinColumn(name = "worker_name", nullable = false)
lateinit var worker: WorkerEntity
@Column(name = "expression", nullable = false)
lateinit var expression: String
@Column(name = "disabled_at", nullable = true)
var disabledAt: ZonedDateTime? = null
@Column(name = "deleted_at", nullable = true)
var deletedAt: ZonedDateTime? = null
}

View File

@ -0,0 +1,27 @@
package ru.touchin.spring.workers.manager.core.trigger.repositories
import org.springframework.data.jpa.repository.JpaRepository
import org.springframework.data.jpa.repository.Query
import org.springframework.data.repository.findByIdOrNull
import ru.touchin.spring.workers.manager.core.trigger.exceptions.TriggerNotFoundException
import ru.touchin.spring.workers.manager.core.trigger.models.TriggerDescriptorEntity
import java.util.*
interface TriggerDescriptorRepository : JpaRepository<TriggerDescriptorEntity, UUID> {
@Query(
"""
SELECT td
FROM TriggerDescriptorEntity td
WHERE td.worker.workerName = :workerName
AND td.deletedAt IS NULL
"""
)
fun findAll(workerName: String): List<TriggerDescriptorEntity>
}
fun TriggerDescriptorRepository.findByIdOrThrow(id: UUID): TriggerDescriptorEntity {
return findByIdOrNull(id)
?: throw TriggerNotFoundException(id)
}

View File

@ -0,0 +1,14 @@
package ru.touchin.spring.workers.manager.core.trigger.services
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
import ru.touchin.spring.workers.manager.core.trigger.services.dto.CreateTriggerDescriptor
import java.util.*
interface TriggerDescriptorCoreService {
fun create(create: CreateTriggerDescriptor): TriggerDescriptor
fun create(create: List<CreateTriggerDescriptor>): List<TriggerDescriptor>
fun setDeleted(id: UUID): TriggerDescriptor
fun getByWorkerName(workerName: String): List<TriggerDescriptor>
}

View File

@ -0,0 +1,68 @@
package ru.touchin.spring.workers.manager.core.trigger.services
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import ru.touchin.spring.workers.manager.core.trigger.converters.toTriggerDescriptor
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
import ru.touchin.spring.workers.manager.core.trigger.models.TriggerDescriptorEntity
import ru.touchin.spring.workers.manager.core.trigger.repositories.TriggerDescriptorRepository
import ru.touchin.spring.workers.manager.core.trigger.repositories.findByIdOrThrow
import ru.touchin.spring.workers.manager.core.trigger.services.dto.CreateTriggerDescriptor
import ru.touchin.spring.workers.manager.core.worker.repositories.WorkerRepository
import ru.touchin.spring.workers.manager.core.worker.repositories.findByNameOrThrow
import java.time.ZonedDateTime
import java.util.*
@Service
class TriggerDescriptorCoreServiceImpl(
private val triggerDescriptorRepository: TriggerDescriptorRepository,
private val workerRepository: WorkerRepository,
) : TriggerDescriptorCoreService {
@Transactional
override fun create(create: CreateTriggerDescriptor): TriggerDescriptor {
val entity = TriggerDescriptorEntity().apply {
expression = create.expression
triggerName = create.name
type = create.type
worker = workerRepository.findByNameOrThrow(create.workerName)
disabledAt = create.disabledAt
}
return triggerDescriptorRepository.save(entity)
.toTriggerDescriptor()
}
@Transactional
override fun create(create: List<CreateTriggerDescriptor>): List<TriggerDescriptor> {
val entities = create.map { dto ->
TriggerDescriptorEntity().apply {
expression = dto.expression
triggerName = dto.name
type = dto.type
worker = workerRepository.findByNameOrThrow(dto.workerName)
disabledAt = dto.disabledAt
}
}
return triggerDescriptorRepository.saveAll(entities)
.map(TriggerDescriptorEntity::toTriggerDescriptor)
}
@Transactional
override fun setDeleted(id: UUID): TriggerDescriptor {
val entity = triggerDescriptorRepository.findByIdOrThrow(id)
entity.apply { deletedAt = ZonedDateTime.now() }
return triggerDescriptorRepository.save(entity)
.toTriggerDescriptor()
}
@Transactional(readOnly = true)
override fun getByWorkerName(workerName: String): List<TriggerDescriptor> {
return triggerDescriptorRepository.findAll(workerName)
.map(TriggerDescriptorEntity::toTriggerDescriptor)
}
}

View File

@ -0,0 +1,12 @@
package ru.touchin.spring.workers.manager.core.trigger.services.dto
import ru.touchin.spring.workers.manager.core.trigger.enums.TriggerType
import java.time.ZonedDateTime
data class CreateTriggerDescriptor(
val name: String,
val type: TriggerType,
val expression: String,
val disabledAt: ZonedDateTime?,
val workerName: String,
)

View File

@ -0,0 +1,16 @@
package ru.touchin.spring.workers.manager.core.worker.converters
import ru.touchin.spring.workers.manager.core.trigger.converters.toTriggerDescriptor
import ru.touchin.spring.workers.manager.core.trigger.models.TriggerDescriptorEntity
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
import ru.touchin.spring.workers.manager.core.worker.models.WorkerEntity
fun WorkerEntity.toWorker(): Worker {
return Worker(
name = workerName,
stoppedAt = stoppedAt,
disabledAt = disabledAt,
status = status,
parallelExecutionEnabled = parallelExecutionEnabled,
)
}

View File

@ -0,0 +1,19 @@
package ru.touchin.spring.workers.manager.core.worker.dto
import ru.touchin.spring.workers.manager.core.trigger.dto.TriggerDescriptor
import ru.touchin.spring.workers.manager.core.worker.enums.WorkerStatus
import java.time.ZonedDateTime
data class Worker(
val name: String,
val stoppedAt: ZonedDateTime?,
val disabledAt: ZonedDateTime?,
val status: WorkerStatus,
val parallelExecutionEnabled: Boolean,
) {
fun isStopped() = stoppedAt != null
fun isDisabled() = disabledAt != null
}

View File

@ -0,0 +1,8 @@
package ru.touchin.spring.workers.manager.core.worker.enums
enum class WorkerStatus {
IDLE,
PROCESSING
}

View File

@ -0,0 +1,7 @@
package ru.touchin.spring.workers.manager.core.worker.exceptions
import ru.touchin.common.exceptions.CommonNotFoundException
class WorkerNotFoundException(name: String): CommonNotFoundException(
"Worker not found name=$name"
)

View File

@ -0,0 +1,41 @@
package ru.touchin.spring.workers.manager.core.worker.models
import ru.touchin.common.spring.jpa.models.BaseEntity
import ru.touchin.spring.workers.manager.WorkersManagerConfiguration.Companion.SCHEMA
import ru.touchin.spring.workers.manager.core.trigger.models.TriggerDescriptorEntity
import ru.touchin.spring.workers.manager.core.worker.enums.WorkerStatus
import java.time.ZonedDateTime
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.EnumType
import javax.persistence.Enumerated
import javax.persistence.FetchType
import javax.persistence.Id
import javax.persistence.OneToMany
import javax.persistence.Table
@Entity
@Table(name = "workers", schema = SCHEMA)
class WorkerEntity : BaseEntity() {
@Id
@Column(name = "worker_name", unique = true)
lateinit var workerName: String
@Enumerated(EnumType.STRING)
@Column(name = "status", nullable = false)
lateinit var status: WorkerStatus
@Column(name = "disabled_at", nullable = true)
var disabledAt: ZonedDateTime? = null
@Column(name = "stopped_at", nullable = true)
var stoppedAt: ZonedDateTime? = null
@Column(name = "parallel_execution_enabled", nullable = false)
var parallelExecutionEnabled: Boolean = false
@OneToMany(mappedBy = "worker", orphanRemoval = true, fetch = FetchType.LAZY)
lateinit var triggerDescriptors: Set<TriggerDescriptorEntity>
}

View File

@ -0,0 +1,36 @@
package ru.touchin.spring.workers.manager.core.worker.repositories
import org.springframework.data.jpa.repository.JpaRepository
import org.springframework.data.jpa.repository.Lock
import org.springframework.data.jpa.repository.Query
import ru.touchin.spring.workers.manager.core.worker.exceptions.WorkerNotFoundException
import ru.touchin.spring.workers.manager.core.worker.models.WorkerEntity
import javax.persistence.LockModeType
interface WorkerRepository : JpaRepository<WorkerEntity, String> {
@Query(
"""
SELECT w
FROM WorkerEntity w
WHERE w.workerName = :workerName
"""
)
@Lock(LockModeType.PESSIMISTIC_WRITE)
fun findWithLock(workerName: String): WorkerEntity?
@Query(
"""
SELECT w
FROM WorkerEntity w
WHERE w.workerName = :workerName
"""
)
fun findByName(workerName: String): WorkerEntity?
}
fun WorkerRepository.findByNameOrThrow(name: String): WorkerEntity {
return findByName(name)
?: throw WorkerNotFoundException(name)
}

View File

@ -0,0 +1,17 @@
package ru.touchin.spring.workers.manager.core.worker.services
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
import ru.touchin.spring.workers.manager.core.worker.exceptions.WorkerNotFoundException
import ru.touchin.spring.workers.manager.core.worker.models.WorkerEntity
import ru.touchin.spring.workers.manager.core.worker.services.dto.UpdateWorker
interface WorkerCoreService {
fun create(name: String): Worker
fun update(update: UpdateWorker): Worker
fun getWithLock(name: String): Worker?
fun get(name: String): Worker = getOrNull(name) ?: throw WorkerNotFoundException(name)
fun getOrNull(name: String): Worker?
fun getAll(): List<Worker>
}

View File

@ -0,0 +1,56 @@
package ru.touchin.spring.workers.manager.core.worker.services
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import ru.touchin.spring.workers.manager.core.worker.converters.toWorker
import ru.touchin.spring.workers.manager.core.worker.dto.Worker
import ru.touchin.spring.workers.manager.core.worker.enums.WorkerStatus
import ru.touchin.spring.workers.manager.core.worker.models.WorkerEntity
import ru.touchin.spring.workers.manager.core.worker.repositories.WorkerRepository
import ru.touchin.spring.workers.manager.core.worker.repositories.findByNameOrThrow
import ru.touchin.spring.workers.manager.core.worker.services.dto.UpdateWorker
@Service
class WorkerCoreServiceImpl(
private val workerRepository: WorkerRepository,
) : WorkerCoreService {
@Transactional
override fun create(name: String): Worker {
val entity = WorkerEntity().apply {
workerName = name
status = WorkerStatus.IDLE
}
return workerRepository.save(entity)
.toWorker()
}
@Transactional
override fun update(update: UpdateWorker): Worker {
val entity = workerRepository.findByNameOrThrow(update.name)
.apply { status = update.status }
return workerRepository.save(entity)
.toWorker()
}
@Transactional
override fun getWithLock(name: String): Worker? {
return workerRepository.findWithLock(name)
?.toWorker()
}
@Transactional(readOnly = true)
override fun getOrNull(name: String): Worker? {
return workerRepository.findByName(name)
?.toWorker()
}
@Transactional(readOnly = true)
override fun getAll(): List<Worker> {
return workerRepository.findAll()
.map(WorkerEntity::toWorker)
}
}

View File

@ -0,0 +1,51 @@
package ru.touchin.spring.workers.manager.core.worker.services
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import ru.touchin.spring.workers.manager.core.worker.models.WorkerEntity
import ru.touchin.spring.workers.manager.core.worker.repositories.WorkerRepository
import java.time.ZonedDateTime
@Service
class WorkerStateServiceImpl(
private val workerRepository: WorkerRepository
) : WorkersStateService {
@Transactional
override fun stop(name: String) {
updateWorkerWithLock(name) {
it.stoppedAt = ZonedDateTime.now()
}
}
@Transactional
override fun start(name: String) {
updateWorkerWithLock(name) {
it.stoppedAt = null
}
}
@Transactional
override fun disable(name: String) {
updateWorkerWithLock(name) {
it.disabledAt = ZonedDateTime.now()
}
}
@Transactional
override fun enable(name: String) {
updateWorkerWithLock(name) {
it.disabledAt = null
}
}
private fun updateWorkerWithLock(name: String, updater: (WorkerEntity) -> Unit) {
val worker = workerRepository.findWithLock(name)
?.apply(updater)
?: throw NoSuchElementException("Worker with name $name not found")
workerRepository.save(worker)
}
}

View File

@ -0,0 +1,10 @@
package ru.touchin.spring.workers.manager.core.worker.services
interface WorkersStateService {
fun stop(name: String)
fun start(name: String)
fun disable(name: String)
fun enable(name: String)
}

View File

@ -0,0 +1,8 @@
package ru.touchin.spring.workers.manager.core.worker.services.dto
import ru.touchin.spring.workers.manager.core.worker.enums.WorkerStatus
data class UpdateWorker(
val name: String,
val status: WorkerStatus,
)

View File

@ -0,0 +1,165 @@
databaseChangeLog:
- changeSet:
id: create-table-workers
author: sonkate
preConditions:
- onFail: MARK_RAN
not:
tableExists:
tableName: workers
changes:
- createTable:
tableName: workers
columns:
- column:
name: worker_name
type: VARCHAR(250)
constraints:
nullable: false
primaryKey: true
primaryKeyName: PK_WORKERS
- column:
name: status
type: VARCHAR(250)
constraints:
nullable: false
- column:
name: parallel_execution_enabled
type: BOOLEAN
defaultValueBoolean: false
constraints:
nullable: false
- column:
name: created_at
type: TIMESTAMP WITH TIME ZONE
defaultValueDate: CURRENT_TIMESTAMP
constraints:
nullable: false
- column:
name: updated_at
type: TIMESTAMP WITH TIME ZONE
- column:
name: stopped_at
type: TIMESTAMP WITH TIME ZONE
- column:
name: disabled_at
type: TIMESTAMP WITH TIME ZONE
- changeSet:
id: create-table-triggers
author: sonkate
preConditions:
- onFail: MARK_RAN
not:
tableExists:
tableName: triggers
changes:
- createTable:
tableName: triggers
columns:
- column:
name: id
type: UUID
constraints:
nullable: false
primaryKey: true
primaryKeyName: PK_TRIGGERS
- column:
name: worker_name
type: VARCHAR(250)
constraints:
nullable: false
foreignKeyName: fk_trigger_worker
references: workers(worker_name)
- column:
name: type
type: VARCHAR(250)
constraints:
nullable: false
- column:
name: trigger_name
type: VARCHAR(250)
constraints:
nullable: false
- column:
name: expression
type: VARCHAR(250)
constraints:
nullable: false
- column:
name: created_at
type: TIMESTAMP WITH TIME ZONE
defaultValueDate: CURRENT_TIMESTAMP
constraints:
nullable: false
- column:
name: updated_at
type: TIMESTAMP WITH TIME ZONE
- column:
name: deleted_at
type: TIMESTAMP WITH TIME ZONE
- column:
name: disabled_at
type: TIMESTAMP WITH TIME ZONE
- changeSet:
id: create-table-executions
author: sonkate
preConditions:
- onFail: MARK_RAN
not:
tableExists:
tableName: executions
changes:
- createTable:
tableName: executions
columns:
- column:
name: id
type: UUID
constraints:
nullable: false
primaryKey: true
primaryKeyName: PK_EXECUTIONS
- column:
name: worker_name
type: VARCHAR(250)
constraints:
nullable: false
- column:
name: trigger_id
type: UUID
- column:
constraints:
nullable: false
name: status
type: VARCHAR(250)
- column:
name: error_message
type: VARCHAR(250)
- column:
name: error_code
type: int
- column:
name: created_at
type: TIMESTAMP WITH TIME ZONE
defaultValueDate: CURRENT_TIMESTAMP
constraints:
nullable: false
- column:
name: updated_at
type: TIMESTAMP WITH TIME ZONE
- column:
name: started_at
type: TIMESTAMP WITH TIME ZONE
- column:
name: finished_at
type: TIMESTAMP WITH TIME ZONE
- changeSet:
id: create-index-for-executions
author: sonkate
changes:
- sql:
sql: |
CREATE INDEX fk_trigger_worker_idx
ON ${schemaName}.triggers (worker_name ASC)
WHERE deleted_at IS NULL;
endDelimeter: ;