diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt index 57da925a5ae0883d5d4ca71469fe05fb21617fe2..795e8be7476275c331c1e782d1498a391b7a3b54 100644 --- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt +++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt @@ -20,11 +20,12 @@ class TaskManager : CoroutineScope { get() = Dispatchers.Default + job // TODO: this could be dangerous to give it unlimited memory - private val tasks = Channel<String>(Channel.UNLIMITED) + private val tasks = Channel<Task>(Channel.UNLIMITED) private val numCores = Runtime.getRuntime().availableProcessors() // use a fixed size thread pool context with numCores threads - private val threadPoolContext = newFixedThreadPoolContext(numCores, "TaskManagerWorker") +// private val threadPoolContext = newFixedThreadPoolContext(numCores, "TaskManagerWorker") + private val gitWorker = GitWorker() private val logger = getLogger(javaClass) @@ -33,36 +34,88 @@ class TaskManager : CoroutineScope { logger.info("Starting $numCores workers.") repeat(numCores-1) { logger.info("[${Thread.currentThread().name}] launching coroutine") - launchWorker(it) + val j = launchWorker(it) } } - private fun launchWorker(id: Int) = launch(threadPoolContext) { + private fun launchWorker(id: Int) = launch { for (task in tasks) { - launch { - logger.debug("[${Thread.currentThread().name}] Processor #$id received $task") - executeTask(task) - logger.debug("[${Thread.currentThread().name}] Processor #$id finished $task") + when (task.type) { + TaskType.REPO_CHANGED -> { + logger.info("[${Thread.currentThread().name}] add task called in $id") + gitWorker.addTask(task) + logger.info("[${Thread.currentThread().name}] add task finished in $id") + } } } } - private suspend fun executeTask(task: String) { + + suspend fun addTask(task: String) { + tasks.send(Task(TaskType.REPO_CHANGED, task)) + } + + @PreDestroy + fun stop() { + job.cancel() + } +} + +class GitWorker: Worker() { + override suspend fun executeTask(task: Task) { // simulate a long running task + logger.info("[${Thread.currentThread().name}] Starting task ${task.payload} in gitworker") delay(5000) + logger.info("[${Thread.currentThread().name}] Finished task ${task.payload} in gitworker") } - fun addTask(task: String) { - launch { - tasks.send(task) +} +abstract class Worker: IWorker, CoroutineScope { + + private val job = SupervisorJob() + override val coroutineContext: CoroutineContext + get() = Dispatchers.Default + job + + // TODO: this could be dangerous to give it unlimited memory + private val tasks = Channel<Task>(Channel.UNLIMITED) + private val numCores = Runtime.getRuntime().availableProcessors() + // use a fixed size thread pool context with numCores threads + private val threadPoolContext = newFixedThreadPoolContext(numCores, "Worker") + + protected val logger = getLogger(javaClass) + override suspend fun addTask(task: Task) { + tasks.send(task) + } + + init { + logger.info("Starting $numCores workers.") + repeat(numCores-1) { + logger.info("[${Thread.currentThread().name}] launching coroutine") + launchWorker(it) } } - @PreDestroy - fun stop() { - job.cancel() - threadPoolContext.close() + private fun launchWorker(id: Int) = launch(threadPoolContext) { + + for (task in tasks) { + launch { + logger.debug("[${Thread.currentThread().name}] Processor #$id received $task") + executeTask(task) + logger.debug("[${Thread.currentThread().name}] Processor #$id finished $task") + } + } } + override fun close() { + threadPoolContext.close() + } } + +interface IWorker { + suspend fun addTask(task: Task) + suspend fun executeTask(task: Task) + + fun close() + +} \ No newline at end of file