From b1dd3a41d2271ed32689531739057d4ebe6e61f9 Mon Sep 17 00:00:00 2001 From: Jan-Niclas Struewer <j.n.struewer@gmail.com> Date: Tue, 25 Apr 2023 12:16:10 +0200 Subject: [PATCH] restructured tasks, worker, and task manager --- .../dataprovider/DataProviderApplication.kt | 1 - .../iem/dataprovider/git/GitService.kt | 10 +- .../dataprovider/taskManager/TaskManager.kt | 150 +++++------------- .../iem/dataprovider/taskManager/Worker.kt | 78 +++++++++ 4 files changed, 122 insertions(+), 117 deletions(-) create mode 100644 src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/Worker.kt diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/DataProviderApplication.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/DataProviderApplication.kt index dbc39208..ed3c240b 100644 --- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/DataProviderApplication.kt +++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/DataProviderApplication.kt @@ -8,6 +8,5 @@ import org.springframework.boot.runApplication class DataProviderApplication fun main(args: Array<String>) { - println("Started app [${Thread.currentThread().name}]") runApplication<DataProviderApplication>(*args) } diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/git/GitService.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/git/GitService.kt index 79bb629d..1b40b291 100644 --- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/git/GitService.kt +++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/git/GitService.kt @@ -1,5 +1,6 @@ package de.fraunhofer.iem.dataprovider.git +import de.fraunhofer.iem.dataprovider.taskManager.GitTask import de.fraunhofer.iem.dataprovider.taskManager.Task import de.fraunhofer.iem.dataprovider.taskManager.TaskManager import de.fraunhofer.iem.dataprovider.taskManager.TaskType @@ -8,16 +9,13 @@ import org.springframework.stereotype.Service @Service class GitService(private val taskManager: TaskManager) { + val list = listOf("https://github.com/eclipse/jgit.git", "https://github.com/secure-software-engineering/phasar", "https://github.com/secure-software-engineering/FlowDroid") suspend fun cloneGit() { println("clone git called") - repeat(20) { - val t = Task(TaskType.REPO_CHANGED, it) + for (e in list) { + val t = GitTask(TaskType.REPO_CHANGED, e, taskManager::addTask) taskManager.addTask(t) } - // val git: Git = Git.cloneRepository() -// .setURI("https://github.com/eclipse/jgit.git") -// .setDirectory("/path/to/repo") -// .call() } } \ No newline at end of file diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt index 1f40ff8d..d5b2b634 100644 --- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt +++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt @@ -2,138 +2,68 @@ package de.fraunhofer.iem.dataprovider.taskManager import de.fraunhofer.iem.dataprovider.logger.getLogger import jakarta.annotation.PreDestroy -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay import org.springframework.stereotype.Component -enum class TaskType{ +enum class TaskType { REPO_CHANGED, DONE } -data class Task(val type: TaskType, val payload: Any) - -/** - * The Task Manager takes tasks and distributes them to - * underlying workers. Internally it uses a channel - * to manage incoming tasks. - */ -@Component -class TaskManager : Worker("Task Manager") { - - private val gitWorker = GitWorker(tasks) - - /** - * Used to distribute incoming tasks to the fitting workers - * depending on the task's type. - */ - override suspend fun executeTask(task: Task) { - when (task.type) { - TaskType.REPO_CHANGED -> { - logger.info("[${Thread.currentThread().name}] add task called in ${task.payload}") - gitWorker.addTask(task) - logger.info("[${Thread.currentThread().name}] add task finished in ${task.payload}") - } - TaskType.DONE -> { - logger.info("${Thread.currentThread().name}] Task done ${task.payload}") - } - } - } - - @PreDestroy - private fun destruct() { - gitWorker.close() - close() - } +interface Task { + val type: TaskType + suspend fun execute(): Unit } -class GitWorker(private val responseChannel: Channel<Task>): Worker("Git Worker") { - - /** - * Performs git related tasks, like cloning the given repository. - */ - override suspend fun executeTask(task: Task) { - // simulate a long running task - logger.info("[${Thread.currentThread().name}] Starting task ${task.payload} in gitworker") +class GitTask( + override val type: TaskType, + private val gitUrl: String, + private val responseChannel: suspend (task: Task) -> Unit +): Task { + override suspend fun execute() { + println("Cloning $gitUrl") + delay(5000) + responseChannel(DoneTask("Done 1 with gitUrl $gitUrl")) delay(5000) - responseChannel.send(Task(TaskType.DONE, "Task with payload ${task.payload} finished")) - logger.info("[${Thread.currentThread().name}] Finished task ${task.payload} in gitworker") + responseChannel(DoneTask("Done 2 with gitUrl $gitUrl")) + // val git: Git = Git.cloneRepository() +// .setURI("https://github.com/eclipse/jgit.git") +// .setDirectory("/path/to/repo") +// .call() } +} +class DoneTask(private val message: String? = null): Task { + override val type: TaskType = TaskType.DONE + override suspend fun execute() { + println("[${Thread.currentThread().name}] I'm done $message") + } } + /** - * Abstract worker class used as a base for multithreaded worker. - * It uses a fixed thread pool as a base for the coroutineScope. - * The size of the thread pool is equal to the number of cores. + * The Task Manager takes tasks and distributes them to + * underlying workers. Internally it uses a channel + * to manage incoming tasks. */ -abstract class Worker(private val name: String): IWorker { - - // TODO: this could be dangerous to give it unlimited memory - protected val tasks = Channel<Task>(Channel.UNLIMITED) - private val numCores = Runtime.getRuntime().availableProcessors() - // use a fixed size thread pool context with numCores threads - @OptIn(DelicateCoroutinesApi::class) - private val threadPoolContext = newFixedThreadPoolContext(numCores, name) - private val coroutineScope = CoroutineScope(threadPoolContext) +@Component +class TaskManager: IWorker { - protected val logger = getLogger(javaClass) + private val worker = Worker("Worker") + private val logger = getLogger(javaClass) override suspend fun addTask(task: Task) { - tasks.send(task) - } - - init { - logger.info("Starting $numCores workers.") - repeat(numCores) { - logger.info("[${Thread.currentThread().name}] launching coroutine in $name") - launchWorker(it) - } - } - - /** - * Wrapper function to execute tasks provided through the channel. - * We use a dedicated number of coroutines to watch the tasks channel. - * Each coroutine starts a new coroutine to execute the task. This is - * done so that the outer coroutines are not blocked by the potentially - * very long-running inner task performed by executeTask. - */ - private fun launchWorker(id: Int) = coroutineScope.launch { - for (task in tasks) { - coroutineScope.launch { - logger.debug("[${Thread.currentThread().name}] Processor #$id-$name received $task") - executeTask(task) - logger.debug("[${Thread.currentThread().name}] Processor #$id-$name finished $task") - } + if (task.type == TaskType.DONE) { + task.execute() } + logger.info("[${Thread.currentThread().name}] add task called in Task Manager $task") + worker.addTask(task) + logger.info("[${Thread.currentThread().name}] add task finished in Task Manager $task") } + @PreDestroy override fun close() { - coroutineScope.launch { - tasks.close() - for (job in coroutineContext[Job]!!.children) { - job.join() - } - threadPoolContext.close() - logger.info("Worker $name has been closed") - } + worker.close() } } -interface IWorker { - /** - * Adds a new task to the worker's internal tasks channel. - */ - suspend fun addTask(task: Task) - - /** - * Worker dependent function describing the worker's - * handling of the task. - * This function is automatically called. - */ - suspend fun executeTask(task: Task) - /** - * Function used to close the underlying resources. - */ - fun close() -} \ No newline at end of file diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/Worker.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/Worker.kt new file mode 100644 index 00000000..708d5ed4 --- /dev/null +++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/Worker.kt @@ -0,0 +1,78 @@ +package de.fraunhofer.iem.dataprovider.taskManager + +import de.fraunhofer.iem.dataprovider.logger.getLogger +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel + + +interface IWorker { + /** + * Adds a new task to the worker's internal tasks channel. + */ + suspend fun addTask(task: Task) + + // TODO: check if we can use autoclosable interface here. + /** + * Function used to close the underlying resources. + */ + fun close() +} + +/** + * Abstract worker class used as a base for multithreaded worker. + * It uses a fixed thread pool as a base for the coroutineScope. + * The size of the thread pool is equal to the number of cores. + */ +class Worker(private val name: String): IWorker { + + // TODO: this could be dangerous to give it unlimited memory + private val tasks = Channel<Task>(Channel.UNLIMITED) + private val numCores = Runtime.getRuntime().availableProcessors() + // use a fixed size thread pool context with numCores threads + @OptIn(DelicateCoroutinesApi::class) + private val threadPoolContext = newFixedThreadPoolContext(numCores, name) + private val coroutineScope = CoroutineScope(threadPoolContext) + + private val logger = getLogger(javaClass) + + override suspend fun addTask(task: Task) { + tasks.send(task) + } + + init { + logger.info("Starting $numCores workers.") + repeat(numCores) { + logger.info("[${Thread.currentThread().name}] launching coroutine in $name") + launchWorker(it) + } + } + + /** + * Wrapper function to execute tasks provided through the channel. + * We use a dedicated number of coroutines to watch the tasks channel. + * Each coroutine starts a new coroutine to execute the task. This is + * done so that the outer coroutines are not blocked by the potentially + * very long-running inner task performed by executeTask. + */ + private fun launchWorker(id: Int) = coroutineScope.launch { + for (task in tasks) { + coroutineScope.launch { + logger.debug("[${Thread.currentThread().name}] Processor #$id-$name received $task") + // TODO: we need to check if we want to wrap this executeTask in a try-catch block + task.execute() + logger.debug("[${Thread.currentThread().name}] Processor #$id-$name finished $task") + } + } + } + + override fun close() { + coroutineScope.launch { + tasks.close() + for (job in coroutineContext[Job]!!.children) { + job.join() + } + threadPoolContext.close() + logger.info("Worker $name has been closed") + } + } +} -- GitLab