From 9fb2b4e5c81f3249be1144079a4da8683e2a6c29 Mon Sep 17 00:00:00 2001 From: Jan-Niclas Struewer <j.n.struewer@gmail.com> Date: Wed, 3 May 2023 11:32:09 +0200 Subject: [PATCH] upated task manager to use an inner class to capsulate mutally exclusive actor state --- .../dataprovider/taskManager/TaskManager.kt | 37 +++++++++++++++---- .../iem/dataprovider/taskManager/Worker.kt | 17 ++++++--- .../dataprovider/taskManager/tasks/GitTask.kt | 2 + .../taskManager/tasks/GitlabTask.kt | 3 +- .../taskManager/tasks/ProcessTask.kt | 4 +- .../dataprovider/taskManager/tasks/Task.kt | 4 +- src/main/resources/application.properties | 3 ++ 7 files changed, 53 insertions(+), 17 deletions(-) diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt index 9ffefe4d..52b245f5 100644 --- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt +++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt @@ -6,6 +6,7 @@ import de.fraunhofer.iem.dataprovider.taskManager.tasks.ITask import de.fraunhofer.iem.dataprovider.taskManager.tasks.TaskType import jakarta.annotation.PreDestroy import kotlinx.coroutines.* +import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.channels.actor import org.springframework.stereotype.Component @@ -27,15 +28,39 @@ class TaskManager: IWorker { // Should be used for long-running CPU heavy tasks, which potentially block. // private val customWorker = getCustomThreadpoolWorker() - private val logger = getLogger(javaClass) private val mainScope = CoroutineScope(Dispatchers.Default) - private val actor = mainScope.taskActor() + private val actor:SendChannel<ITask> = mainScope.taskActor() + override suspend fun addTask(task: ITask) { - actor.send(task) + actor.send(task) } + @OptIn(ObsoleteCoroutinesApi::class) // This API is meant to be improved since 2017... so this should be good to use. + // IntelliJ might tell u that it is ok to remove <ITask> from the actor declaration. + // IT IS NOT! this will lead to very strange gradle compile errors! private fun CoroutineScope.taskActor() = actor<ITask> { - for (task in channel) { + with(MyActor()) { + for (task in channel) { + onReceive(task) + } + } + } + + @PreDestroy + override fun close() { + actor.close() + worker.close() + ioWorker.close() + } + + /** + * The state in this internal class is thread safe. + * See https://kotlinlang.org/docs/shared-mutable-state-and-concurrency.html#actors + */ + private inner class MyActor { + + private val logger = getLogger(javaClass) + suspend fun onReceive(task: ITask) { logger.info("[${Thread.currentThread().name}] add task called in Task Manager $task") // For test purposes we have the Done task, which prints a control // message printed in the task manager. @@ -49,10 +74,6 @@ class TaskManager: IWorker { } logger.info("[${Thread.currentThread().name}] add task finished in Task Manager $task") } - } - @PreDestroy - override fun close() { - worker.close() } } diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/Worker.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/Worker.kt index 7715df29..900b17dc 100644 --- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/Worker.kt +++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/Worker.kt @@ -4,6 +4,7 @@ import de.fraunhofer.iem.dataprovider.logger.getLogger import de.fraunhofer.iem.dataprovider.taskManager.tasks.ITask import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel +import java.util.UUID interface IWorker { @@ -41,13 +42,16 @@ class Worker(private val name: String, ): IWorker { // TODO: this could be dangerous to give it unlimited memory - private val tasks = Channel<ITask>(Channel.UNLIMITED) - + private val tasksChannel = Channel<ITask>(Channel.UNLIMITED) + // TODO: the task map should be cleared to reduce memory usage. + // This needs to be triggered from outside a worker as soon as + // all tasks related to the specific tasks are completed. + private val taskMap = HashMap<UUID, Pair<ITask, Job>>() private val logger = getLogger(javaClass) override suspend fun addTask(task: ITask) { - tasks.send(task) + tasksChannel.send(task) } init { @@ -66,19 +70,20 @@ class Worker(private val name: String, * very long-running inner task performed by executeTask. */ private fun launchWorker(id: Int) = coroutineScope.launch { - for (task in tasks) { - coroutineScope.launch { + for (task in tasksChannel) { + val job = coroutineScope.launch { logger.debug("[${Thread.currentThread().name}] Processor #$id-$name received $task") // TODO: we need to check if we want to wrap this executeTask in a try-catch block task.execute() logger.debug("[${Thread.currentThread().name}] Processor #$id-$name finished $task") } + taskMap[task.taskID] = Pair(task, job) } } override fun close() { coroutineScope.launch { - tasks.close() + tasksChannel.close() for (job in coroutineContext[Job]!!.children) { job.join() } diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/GitTask.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/GitTask.kt index 09ff4624..0dddbbf9 100644 --- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/GitTask.kt +++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/GitTask.kt @@ -8,8 +8,10 @@ data class GitProject(val name: String, val uri: String) class GitTask( override val type: TaskType, private val gitProject: GitProject, + private val outputPath: String, private val responseChannel: suspend (task: ITask) -> Unit ): Task() { + override suspend fun execute() { logger.info("Cloning ${gitProject.name}") diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/GitlabTask.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/GitlabTask.kt index 67e1ff8b..5839c410 100644 --- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/GitlabTask.kt +++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/GitlabTask.kt @@ -18,7 +18,7 @@ sealed class GetGitlabProjectTask( val projectUri = project.httpUrlToRepo val gitProject = GitProject(project.name, projectUri) logger.info("Retrieved project ${project.path} and url $projectUri") - responseChannel(GitTask(this.type, gitProject, responseChannel)) +// responseChannel(GitTask(this.type, gitProject, responseChannel)) } } @@ -27,6 +27,7 @@ class GetGitlabProjectPathTask( private val gitlabConfiguration: GitlabConfiguration, private val responseChannel: suspend (task: ITask) -> Unit ): GetGitlabProjectTask(repoId = repoId, gitlabConfiguration = gitlabConfiguration, responseChannel = responseChannel ) + class GetGitlabProjectIdTask( override val repoId: Long, private val gitlabConfiguration: GitlabConfiguration, diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/ProcessTask.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/ProcessTask.kt index 780ccf2c..92d6b88d 100644 --- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/ProcessTask.kt +++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/ProcessTask.kt @@ -18,12 +18,14 @@ sealed class ProcessTask(protected open vararg val flags: String): Task() { handleProcessReturn(p1) } } - } abstract suspend fun handleProcessReturn(p: Process) } +/** + * Dummy Task to simulate running external processes. + */ class JavaTask( override val type: TaskType, override vararg val flags: String, diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/Task.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/Task.kt index 5168bacf..72ad0c41 100644 --- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/Task.kt +++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/Task.kt @@ -10,9 +10,11 @@ enum class TaskType { interface ITask { val type: TaskType suspend fun execute() + val taskID: UUID } -sealed class Task(protected val taskID: UUID = UUID.randomUUID()): ITask { +sealed class Task: ITask { + override val taskID: UUID = UUID.randomUUID() protected val logger = getLogger(javaClass) } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 7ea02d25..10edc1c6 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -4,4 +4,7 @@ OPENCODE_GITLAB_TOKEN=${OPENCODE_TOKEN} GITHUB_URL=https://gitlab.opencode.de/ GITHUB_TOKEN=${GITHUB_TOKEN} spring.r2dbc.url=${R2DBC_URL} + +GIT_PROJECT_PATH=${GIT_PROJECT_PATH} +ODC_OUTPUT_PATH=${ODC_OUTPUT_PATH} #logging.level.root=DEBUG \ No newline at end of file -- GitLab