package de.fraunhofer.iem.dataprovider.taskManager import de.fraunhofer.iem.dataprovider.gitlab.GitConfiguration import de.fraunhofer.iem.dataprovider.logger.getLogger import de.fraunhofer.iem.dataprovider.taskManager.tasks.CloneGitTask import de.fraunhofer.iem.dataprovider.taskManager.tasks.GetGitlabProjectTask import de.fraunhofer.iem.dataprovider.taskManager.tasks.GitProject import de.fraunhofer.iem.dataprovider.taskManager.tasks.OdcTask import jakarta.annotation.PreDestroy import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ObsoleteCoroutinesApi import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.channels.actor import org.springframework.stereotype.Component import java.util.* sealed class Event class RepoChangedEvent(val gitConfiguration: GitConfiguration, val repoId: Long) : Event() sealed class TaskDone : Event() { abstract val taskId: UUID } class GitCloneDone(override val taskId: UUID, val outputDirectory: String) : TaskDone() class GetGitlabProjectDone(override val taskId: UUID, val gitProject: GitProject) : TaskDone() class ProcessTaskDone(override val taskId: UUID, val message: String) : TaskDone() /** * The Task Manager takes tasks and distributes them to * underlying workers. Internally it uses a channel * to manage incoming tasks. */ @Component class TaskManager(private val config: Config) { // The used default dispatcher is ok for CPU-bound workloads. However, // if they block for a long time it's better to use a custom thread // pool solution. private val worker = Worker("Worker") // Should be used for long-running tasks, which DON'T use CPU power and // are non-blocking private val ioWorker = Worker("IO-Worker", CoroutineScope(Dispatchers.IO)) // Should be used for long-running CPU heavy tasks, which potentially block. // private val customWorker = getCustomThreadpoolWorker() private val mainScope = CoroutineScope(Dispatchers.Default) private val actor: SendChannel<Event> = mainScope.taskActor() suspend fun addEvent(event: Event) { actor.send(event) } @OptIn(ObsoleteCoroutinesApi::class) // This API is meant to be improved since 2017... so this should be good to use. // IntelliJ might tell u that it is ok to remove <ITask> from the actor declaration. // IT IS NOT! this will lead to very strange gradle compile errors! private fun CoroutineScope.taskActor() = actor<Event> { with(MyActor()) { for (task in channel) { onReceive(task) } } } @PreDestroy fun close() { actor.close() worker.close() ioWorker.close() } /** * The state in this internal class is thread safe. * See https://kotlinlang.org/docs/shared-mutable-state-and-concurrency.html#actors * It is used to manage e.g. fixed task sequences, as well as react to general events. */ private inner class MyActor { private val logger = getLogger(javaClass) suspend fun onReceive(event: Event) { logger.info("[${Thread.currentThread().name}] add task called in Task Manager $event") when (event) { is RepoChangedEvent -> { ioWorker.addTask(GetGitlabProjectTask(event.repoId, event.gitConfiguration, ::addEvent)) } is GetGitlabProjectDone -> { worker.addTask( CloneGitTask( event.gitProject, ::addEvent, config.gitProjectPath ) ) } is GitCloneDone -> { worker.addTask(OdcTask(event.outputDirectory, event.outputDirectory, ::addEvent)) } else -> { logger.info("Received event without special handling associated $event") } } logger.info("[${Thread.currentThread().name}] add task finished in Task Manager $event") } } }