diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt index 4505b69d59e28b5fa46b7614e70a50ce691f4996..1f40ff8d78a36787b7ada7473454626ddbe4e10a 100644 --- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt +++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt @@ -1,6 +1,7 @@ package de.fraunhofer.iem.dataprovider.taskManager import de.fraunhofer.iem.dataprovider.logger.getLogger +import jakarta.annotation.PreDestroy import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import org.springframework.stereotype.Component @@ -10,11 +11,21 @@ enum class TaskType{ DONE } data class Task(val type: TaskType, val payload: Any) + +/** + * The Task Manager takes tasks and distributes them to + * underlying workers. Internally it uses a channel + * to manage incoming tasks. + */ @Component class TaskManager : Worker("Task Manager") { private val gitWorker = GitWorker(tasks) + /** + * Used to distribute incoming tasks to the fitting workers + * depending on the task's type. + */ override suspend fun executeTask(task: Task) { when (task.type) { TaskType.REPO_CHANGED -> { @@ -27,10 +38,19 @@ class TaskManager : Worker("Task Manager") { } } } + + @PreDestroy + private fun destruct() { + gitWorker.close() + close() + } } class GitWorker(private val responseChannel: Channel<Task>): Worker("Git Worker") { + /** + * Performs git related tasks, like cloning the given repository. + */ override suspend fun executeTask(task: Task) { // simulate a long running task logger.info("[${Thread.currentThread().name}] Starting task ${task.payload} in gitworker") @@ -40,16 +60,24 @@ class GitWorker(private val responseChannel: Channel<Task>): Worker("Git Worker" } } + +/** + * Abstract worker class used as a base for multithreaded worker. + * It uses a fixed thread pool as a base for the coroutineScope. + * The size of the thread pool is equal to the number of cores. + */ abstract class Worker(private val name: String): IWorker { // TODO: this could be dangerous to give it unlimited memory protected val tasks = Channel<Task>(Channel.UNLIMITED) private val numCores = Runtime.getRuntime().availableProcessors() // use a fixed size thread pool context with numCores threads + @OptIn(DelicateCoroutinesApi::class) private val threadPoolContext = newFixedThreadPoolContext(numCores, name) private val coroutineScope = CoroutineScope(threadPoolContext) protected val logger = getLogger(javaClass) + override suspend fun addTask(task: Task) { tasks.send(task) } @@ -62,8 +90,14 @@ abstract class Worker(private val name: String): IWorker { } } + /** + * Wrapper function to execute tasks provided through the channel. + * We use a dedicated number of coroutines to watch the tasks channel. + * Each coroutine starts a new coroutine to execute the task. This is + * done so that the outer coroutines are not blocked by the potentially + * very long-running inner task performed by executeTask. + */ private fun launchWorker(id: Int) = coroutineScope.launch { - for (task in tasks) { coroutineScope.launch { logger.debug("[${Thread.currentThread().name}] Processor #$id-$name received $task") @@ -86,7 +120,20 @@ abstract class Worker(private val name: String): IWorker { } interface IWorker { + /** + * Adds a new task to the worker's internal tasks channel. + */ suspend fun addTask(task: Task) + + /** + * Worker dependent function describing the worker's + * handling of the task. + * This function is automatically called. + */ suspend fun executeTask(task: Task) + + /** + * Function used to close the underlying resources. + */ fun close() } \ No newline at end of file