diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/git/GitService.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/git/GitService.kt index bce20555dc5225a1fbb759fbbee03a1770961f3a..79bb629da319043501a79aa88a734a61ab8f2159 100644 --- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/git/GitService.kt +++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/git/GitService.kt @@ -1,6 +1,8 @@ package de.fraunhofer.iem.dataprovider.git +import de.fraunhofer.iem.dataprovider.taskManager.Task import de.fraunhofer.iem.dataprovider.taskManager.TaskManager +import de.fraunhofer.iem.dataprovider.taskManager.TaskType import org.springframework.stereotype.Service @Service @@ -10,7 +12,8 @@ class GitService(private val taskManager: TaskManager) { suspend fun cloneGit() { println("clone git called") repeat(20) { - taskManager.addTask("Task no $it") + val t = Task(TaskType.REPO_CHANGED, it) + taskManager.addTask(t) } // val git: Git = Git.cloneRepository() // .setURI("https://github.com/eclipse/jgit.git") diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt index 795e8be7476275c331c1e782d1498a391b7a3b54..4505b69d59e28b5fa46b7614e70a50ce691f4996 100644 --- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt +++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt @@ -1,87 +1,53 @@ package de.fraunhofer.iem.dataprovider.taskManager import de.fraunhofer.iem.dataprovider.logger.getLogger -import jakarta.annotation.PostConstruct -import jakarta.annotation.PreDestroy import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import org.springframework.stereotype.Component -import kotlin.coroutines.CoroutineContext enum class TaskType{ - REPO_CHANGED + REPO_CHANGED, + DONE } data class Task(val type: TaskType, val payload: Any) @Component -class TaskManager : CoroutineScope { +class TaskManager : Worker("Task Manager") { - private val job = SupervisorJob() - override val coroutineContext: CoroutineContext - get() = Dispatchers.Default + job + private val gitWorker = GitWorker(tasks) - // TODO: this could be dangerous to give it unlimited memory - private val tasks = Channel<Task>(Channel.UNLIMITED) - private val numCores = Runtime.getRuntime().availableProcessors() - // use a fixed size thread pool context with numCores threads -// private val threadPoolContext = newFixedThreadPoolContext(numCores, "TaskManagerWorker") - - private val gitWorker = GitWorker() - - private val logger = getLogger(javaClass) - - @PostConstruct - fun startWorkers() { - logger.info("Starting $numCores workers.") - repeat(numCores-1) { - logger.info("[${Thread.currentThread().name}] launching coroutine") - val j = launchWorker(it) - } - } - - private fun launchWorker(id: Int) = launch { - - for (task in tasks) { - when (task.type) { - TaskType.REPO_CHANGED -> { - logger.info("[${Thread.currentThread().name}] add task called in $id") - gitWorker.addTask(task) - logger.info("[${Thread.currentThread().name}] add task finished in $id") - } + override suspend fun executeTask(task: Task) { + when (task.type) { + TaskType.REPO_CHANGED -> { + logger.info("[${Thread.currentThread().name}] add task called in ${task.payload}") + gitWorker.addTask(task) + logger.info("[${Thread.currentThread().name}] add task finished in ${task.payload}") + } + TaskType.DONE -> { + logger.info("${Thread.currentThread().name}] Task done ${task.payload}") } } } - - - suspend fun addTask(task: String) { - tasks.send(Task(TaskType.REPO_CHANGED, task)) - } - - @PreDestroy - fun stop() { - job.cancel() - } } -class GitWorker: Worker() { +class GitWorker(private val responseChannel: Channel<Task>): Worker("Git Worker") { + override suspend fun executeTask(task: Task) { // simulate a long running task logger.info("[${Thread.currentThread().name}] Starting task ${task.payload} in gitworker") delay(5000) + responseChannel.send(Task(TaskType.DONE, "Task with payload ${task.payload} finished")) logger.info("[${Thread.currentThread().name}] Finished task ${task.payload} in gitworker") } } -abstract class Worker: IWorker, CoroutineScope { - - private val job = SupervisorJob() - override val coroutineContext: CoroutineContext - get() = Dispatchers.Default + job +abstract class Worker(private val name: String): IWorker { // TODO: this could be dangerous to give it unlimited memory - private val tasks = Channel<Task>(Channel.UNLIMITED) + protected val tasks = Channel<Task>(Channel.UNLIMITED) private val numCores = Runtime.getRuntime().availableProcessors() // use a fixed size thread pool context with numCores threads - private val threadPoolContext = newFixedThreadPoolContext(numCores, "Worker") + private val threadPoolContext = newFixedThreadPoolContext(numCores, name) + private val coroutineScope = CoroutineScope(threadPoolContext) protected val logger = getLogger(javaClass) override suspend fun addTask(task: Task) { @@ -90,32 +56,37 @@ abstract class Worker: IWorker, CoroutineScope { init { logger.info("Starting $numCores workers.") - repeat(numCores-1) { - logger.info("[${Thread.currentThread().name}] launching coroutine") + repeat(numCores) { + logger.info("[${Thread.currentThread().name}] launching coroutine in $name") launchWorker(it) } } - private fun launchWorker(id: Int) = launch(threadPoolContext) { + private fun launchWorker(id: Int) = coroutineScope.launch { for (task in tasks) { - launch { - logger.debug("[${Thread.currentThread().name}] Processor #$id received $task") + coroutineScope.launch { + logger.debug("[${Thread.currentThread().name}] Processor #$id-$name received $task") executeTask(task) - logger.debug("[${Thread.currentThread().name}] Processor #$id finished $task") + logger.debug("[${Thread.currentThread().name}] Processor #$id-$name finished $task") } } } override fun close() { - threadPoolContext.close() + coroutineScope.launch { + tasks.close() + for (job in coroutineContext[Job]!!.children) { + job.join() + } + threadPoolContext.close() + logger.info("Worker $name has been closed") + } } } interface IWorker { suspend fun addTask(task: Task) suspend fun executeTask(task: Task) - fun close() - } \ No newline at end of file