diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt index e974fa7faefbbddfd62f317ab1c031388f430d1c..57da925a5ae0883d5d4ca71469fe05fb21617fe2 100644 --- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt +++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt @@ -22,6 +22,10 @@ class TaskManager : CoroutineScope { // TODO: this could be dangerous to give it unlimited memory private val tasks = Channel<String>(Channel.UNLIMITED) private val numCores = Runtime.getRuntime().availableProcessors() + // use a fixed size thread pool context with numCores threads + private val threadPoolContext = newFixedThreadPoolContext(numCores, "TaskManagerWorker") + + private val logger = getLogger(javaClass) @PostConstruct @@ -33,7 +37,8 @@ class TaskManager : CoroutineScope { } } - private fun launchWorker(id: Int) = launch { + private fun launchWorker(id: Int) = launch(threadPoolContext) { + for (task in tasks) { launch { logger.debug("[${Thread.currentThread().name}] Processor #$id received $task") @@ -57,6 +62,7 @@ class TaskManager : CoroutineScope { @PreDestroy fun stop() { job.cancel() + threadPoolContext.close() } }