Newer
Older
package de.fraunhofer.iem.dataprovider.taskManager
import de.fraunhofer.iem.dataprovider.gitlab.GitConfiguration
import de.fraunhofer.iem.dataprovider.logger.getLogger
import de.fraunhofer.iem.dataprovider.taskManager.tasks.CloneGitTask
import de.fraunhofer.iem.dataprovider.taskManager.tasks.GetGitlabProjectTask
import de.fraunhofer.iem.dataprovider.taskManager.tasks.GitProject
import de.fraunhofer.iem.dataprovider.taskManager.tasks.OdcTask
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ObsoleteCoroutinesApi

Jan-Niclas Strüwer
committed
import kotlinx.coroutines.channels.SendChannel
import org.springframework.stereotype.Component
import java.util.*
/**
 * Root of the event hierarchy understood by [TaskManager].
 *
 * Instances are submitted through [TaskManager.addEvent], which forwards them to the
 * manager's internal actor channel.
 */
sealed class Event

/**
 * Event carrying the id of a repository together with the Git configuration to use for it.
 *
 * In the handler visible in this file, receiving it schedules a `GetGitlabProjectTask`
 * for [repoId] and [gitConfiguration] on the IO worker.
 *
 * @property gitConfiguration configuration passed on unchanged to the scheduled task.
 * @property repoId id of the repository; presumably the GitLab project id (TODO confirm).
 */
class RepoChangedEvent(
    val gitConfiguration: GitConfiguration,
    val repoId: Long
) : Event()

/**
 * Base type of all events that carry a [taskId].
 *
 * Presumably emitted by a task once it has finished, with [taskId] identifying that
 * task (TODO confirm against the task implementations).
 */
sealed class TaskDone : Event() {
    /** Identifier of the task this event refers to. */
    abstract val taskId: UUID
}

/**
 * [TaskDone] event carrying the output directory of a Git clone.
 *
 * In the handler visible in this file, receiving it schedules an `OdcTask` that is
 * given [outputDirectory] for both of its path arguments.
 *
 * @property outputDirectory directory path passed on to the follow-up task.
 */
class GitCloneDone(
    override val taskId: UUID,
    val outputDirectory: String
) : TaskDone()

/**
 * [TaskDone] event carrying a resolved [GitProject].
 *
 * In the handler visible in this file, receiving it schedules a `CloneGitTask` for
 * [gitProject].
 *
 * @property gitProject project description passed on to the follow-up task.
 */
class GetGitlabProjectDone(
    override val taskId: UUID,
    val gitProject: GitProject
) : TaskDone()

/**
 * [TaskDone] event carrying a free-form [message].
 *
 * No dedicated handler branch for this type is visible in this file; it appears to
 * fall through to the generic "no special handling" branch (TODO confirm).
 *
 * @property message text supplied by the sender of the event.
 */
class ProcessTaskDone(
    override val taskId: UUID,
    val message: String
) : TaskDone()
/**
 * The Task Manager takes events and distributes the resulting tasks to the
 * underlying workers. Internally it uses an actor channel
 * to manage incoming events.
 */
class TaskManager(private val config: Config) {
// The used default dispatcher is ok for CPU-bound workloads. However,
// if they block for a long time it's better to use a custom thread
// pool solution.
private val worker = Worker("Worker")
// Should be used for long-running tasks, which DON'T use CPU power and
// are non-blocking
private val ioWorker = Worker("IO-Worker", CoroutineScope(Dispatchers.IO))
// Should be used for long-running CPU heavy tasks, which potentially block.
// private val customWorker = getCustomThreadpoolWorker()
private val mainScope = CoroutineScope(Dispatchers.Default)
private val actor: SendChannel<Event> = mainScope.taskActor()

Jan-Niclas Strüwer
committed
suspend fun addEvent(event: Event) {
actor.send(event)

Jan-Niclas Strüwer
committed
/**
 * Starts the actor coroutine that consumes the events sent through [addEvent].
 *
 * One [MyActor] instance is created for the actor, and every event taken from the
 * channel is handed to it one at a time, in the order the channel delivers them.
 */
// The actor API has been marked obsolete (pending a redesign) since 2017, so relying on it is considered fine here.
// IntelliJ may suggest removing the explicit <Event> type argument from the actor call.
// DO NOT remove it! Doing so leads to very strange Gradle compile errors.
@OptIn(ObsoleteCoroutinesApi::class)
private fun CoroutineScope.taskActor() = actor<Event> {
    val handler = MyActor()
    for (event in channel) {
        handler.onReceive(event)
    }
}
@PreDestroy

Jan-Niclas Strüwer
committed
actor.close()
worker.close()
ioWorker.close()
}
/**
 * The state in this internal class is thread safe, because it is only touched
 * from the actor coroutine, which handles one event at a time.
 * See https://kotlinlang.org/docs/shared-mutable-state-and-concurrency.html#actors
 * It is used to manage e.g. fixed task sequences, as well as to react to general events.
 */
private inner class MyActor {
private val logger = getLogger(javaClass)
suspend fun onReceive(event: Event) {
logger.info("[${Thread.currentThread().name}] add task called in Task Manager $event")
when (event) {
is RepoChangedEvent -> {
ioWorker.addTask(GetGitlabProjectTask(event.repoId, event.gitConfiguration, ::addEvent))
is GetGitlabProjectDone -> {
worker.addTask(
CloneGitTask(
event.gitProject,
::addEvent,
is GitCloneDone -> {
worker.addTask(OdcTask(event.outputDirectory, event.outputDirectory, ::addEvent))
}
else -> {
logger.info("Received event without special handling associated $event")
logger.info("[${Thread.currentThread().name}] add task finished in Task Manager $event")