package de.fraunhofer.iem.dataprovider.taskManager import de.fraunhofer.iem.dataprovider.logger.getLogger import jakarta.annotation.PostConstruct import jakarta.annotation.PreDestroy import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import org.springframework.stereotype.Component import kotlin.coroutines.CoroutineContext enum class TaskType{ REPO_CHANGED } data class Task(val type: TaskType, val payload: Any) @Component class TaskManager : CoroutineScope { private val job = SupervisorJob() override val coroutineContext: CoroutineContext get() = Dispatchers.Default + job // TODO: this could be dangerous to give it unlimited memory private val tasks = Channel<Task>(Channel.UNLIMITED) private val numCores = Runtime.getRuntime().availableProcessors() // use a fixed size thread pool context with numCores threads // private val threadPoolContext = newFixedThreadPoolContext(numCores, "TaskManagerWorker") private val gitWorker = GitWorker() private val logger = getLogger(javaClass) @PostConstruct fun startWorkers() { logger.info("Starting $numCores workers.") repeat(numCores-1) { logger.info("[${Thread.currentThread().name}] launching coroutine") val j = launchWorker(it) } } private fun launchWorker(id: Int) = launch { for (task in tasks) { when (task.type) { TaskType.REPO_CHANGED -> { logger.info("[${Thread.currentThread().name}] add task called in $id") gitWorker.addTask(task) logger.info("[${Thread.currentThread().name}] add task finished in $id") } } } } suspend fun addTask(task: String) { tasks.send(Task(TaskType.REPO_CHANGED, task)) } @PreDestroy fun stop() { job.cancel() } } class GitWorker: Worker() { override suspend fun executeTask(task: Task) { // simulate a long running task logger.info("[${Thread.currentThread().name}] Starting task ${task.payload} in gitworker") delay(5000) logger.info("[${Thread.currentThread().name}] Finished task ${task.payload} in gitworker") } } abstract class Worker: IWorker, CoroutineScope { private val job = SupervisorJob() override val coroutineContext: CoroutineContext get() = Dispatchers.Default + job // TODO: this could be dangerous to give it unlimited memory private val tasks = Channel<Task>(Channel.UNLIMITED) private val numCores = Runtime.getRuntime().availableProcessors() // use a fixed size thread pool context with numCores threads private val threadPoolContext = newFixedThreadPoolContext(numCores, "Worker") protected val logger = getLogger(javaClass) override suspend fun addTask(task: Task) { tasks.send(task) } init { logger.info("Starting $numCores workers.") repeat(numCores-1) { logger.info("[${Thread.currentThread().name}] launching coroutine") launchWorker(it) } } private fun launchWorker(id: Int) = launch(threadPoolContext) { for (task in tasks) { launch { logger.debug("[${Thread.currentThread().name}] Processor #$id received $task") executeTask(task) logger.debug("[${Thread.currentThread().name}] Processor #$id finished $task") } } } override fun close() { threadPoolContext.close() } } interface IWorker { suspend fun addTask(task: Task) suspend fun executeTask(task: Task) fun close() }