diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt index d5b2b63483349c19a939369a561d58eb1990b1b0..d45f6b7492a7c34edcfe7105af79e79de78e6de3 100644 --- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt +++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt @@ -2,7 +2,8 @@ package de.fraunhofer.iem.dataprovider.taskManager import de.fraunhofer.iem.dataprovider.logger.getLogger import jakarta.annotation.PreDestroy -import kotlinx.coroutines.delay +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.actor import org.springframework.stereotype.Component enum class TaskType { @@ -11,7 +12,7 @@ enum class TaskType { } interface Task { val type: TaskType - suspend fun execute(): Unit + suspend fun execute() } class GitTask( @@ -48,16 +49,34 @@ class DoneTask(private val message: String? = null): Task { @Component class TaskManager: IWorker { + // The used default dispatcher is ok for CPU-bound workloads. However, + // if they block for a long time it's better to use a custom thread + // pool solution. private val worker = Worker("Worker") - private val logger = getLogger(javaClass) + // Should be used for long-running tasks, which DON'T use CPU power and + // are non-blocking + // private val ioWorker = Worker("IO-Worker", CoroutineScope(Dispatchers.IO)) + // Should be used for long-running CPU heavy tasks, which potentially block. + // private val customWorker = getCustomThreadpoolWorker() + private val logger = getLogger(javaClass) + private val mainScope = CoroutineScope(Dispatchers.Default) + private val actor = mainScope.taskActor() override suspend fun addTask(task: Task) { - if (task.type == TaskType.DONE) { - task.execute() + actor.send(task) + } + + private fun CoroutineScope.taskActor() = actor<Task> { + for (task in channel) { + // For test purposes we have the Done task, which prints a control + // message printed in the task manager. + if (task.type == TaskType.DONE) { + task.execute() + } + logger.info("[${Thread.currentThread().name}] add task called in Task Manager $task") + worker.addTask(task) + logger.info("[${Thread.currentThread().name}] add task finished in Task Manager $task") } - logger.info("[${Thread.currentThread().name}] add task called in Task Manager $task") - worker.addTask(task) - logger.info("[${Thread.currentThread().name}] add task finished in Task Manager $task") } @PreDestroy @@ -65,5 +84,3 @@ class TaskManager: IWorker { worker.close() } } - - diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/Worker.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/Worker.kt index 708d5ed49850c7637a428f9b8405d3638bd0d63f..e8c8d0586d1a15ed8c22778f628ef62aa6df725f 100644 --- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/Worker.kt +++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/Worker.kt @@ -18,20 +18,30 @@ interface IWorker { fun close() } + +fun getCustomThreadpoolWorker(): Worker { + val numCores = Runtime.getRuntime().availableProcessors() + // use a fixed size thread pool context with numCores threads + // TODO: Threadpool needs to be closed + @OptIn(DelicateCoroutinesApi::class) + val threadPoolContext = newFixedThreadPoolContext(numCores, "CustomThreadWorkerPool") + val coroutineScope = CoroutineScope(threadPoolContext) + return Worker("Custom-ThreadPool-Worker", coroutineScope) +} + /** * Abstract worker class used as a base for multithreaded worker. * It uses a fixed thread pool as a base for the coroutineScope. * The size of the thread pool is equal to the number of cores. */ -class Worker(private val name: String): IWorker { +class Worker(private val name: String, + private val coroutineScope: CoroutineScope = CoroutineScope(Dispatchers.Default), + numberOfWorkers: Int = Runtime.getRuntime().availableProcessors() +): IWorker { // TODO: this could be dangerous to give it unlimited memory private val tasks = Channel<Task>(Channel.UNLIMITED) - private val numCores = Runtime.getRuntime().availableProcessors() - // use a fixed size thread pool context with numCores threads - @OptIn(DelicateCoroutinesApi::class) - private val threadPoolContext = newFixedThreadPoolContext(numCores, name) - private val coroutineScope = CoroutineScope(threadPoolContext) + private val logger = getLogger(javaClass) @@ -40,8 +50,8 @@ class Worker(private val name: String): IWorker { } init { - logger.info("Starting $numCores workers.") - repeat(numCores) { + logger.info("Starting $numberOfWorkers workers.") + repeat(numberOfWorkers) { logger.info("[${Thread.currentThread().name}] launching coroutine in $name") launchWorker(it) } @@ -71,7 +81,6 @@ class Worker(private val name: String): IWorker { for (job in coroutineContext[Job]!!.children) { job.join() } - threadPoolContext.close() logger.info("Worker $name has been closed") } }