diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt index 9ffefe4d8fb540e954c9d9647d08fb067f80b10f..52b245f540565925b0ab6ca15e9fbb312c0a8f07 100644 --- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt +++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt @@ -6,6 +6,7 @@ import de.fraunhofer.iem.dataprovider.taskManager.tasks.ITask import de.fraunhofer.iem.dataprovider.taskManager.tasks.TaskType import jakarta.annotation.PreDestroy import kotlinx.coroutines.* +import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.channels.actor import org.springframework.stereotype.Component @@ -27,15 +28,39 @@ class TaskManager: IWorker { // Should be used for long-running CPU heavy tasks, which potentially block. // private val customWorker = getCustomThreadpoolWorker() - private val logger = getLogger(javaClass) private val mainScope = CoroutineScope(Dispatchers.Default) - private val actor = mainScope.taskActor() + private val actor:SendChannel<ITask> = mainScope.taskActor() + override suspend fun addTask(task: ITask) { - actor.send(task) + actor.send(task) } + @OptIn(ObsoleteCoroutinesApi::class) // This API is meant to be improved since 2017... so this should be good to use. + // IntelliJ might tell u that it is ok to remove <ITask> from the actor declaration. + // IT IS NOT! this will lead to very strange gradle compile errors! private fun CoroutineScope.taskActor() = actor<ITask> { - for (task in channel) { + with(MyActor()) { + for (task in channel) { + onReceive(task) + } + } + } + + @PreDestroy + override fun close() { + actor.close() + worker.close() + ioWorker.close() + } + + /** + * The state in this internal class is thread safe. + * See https://kotlinlang.org/docs/shared-mutable-state-and-concurrency.html#actors + */ + private inner class MyActor { + + private val logger = getLogger(javaClass) + suspend fun onReceive(task: ITask) { logger.info("[${Thread.currentThread().name}] add task called in Task Manager $task") // For test purposes we have the Done task, which prints a control // message printed in the task manager. @@ -49,10 +74,6 @@ class TaskManager: IWorker { } logger.info("[${Thread.currentThread().name}] add task finished in Task Manager $task") } - } - @PreDestroy - override fun close() { - worker.close() } } diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/Worker.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/Worker.kt index 7715df2978aa28f3e60fbc4c5c1763ced717cc48..900b17dc8502f8f3c2f646209b0144ecd3095951 100644 --- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/Worker.kt +++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/Worker.kt @@ -4,6 +4,7 @@ import de.fraunhofer.iem.dataprovider.logger.getLogger import de.fraunhofer.iem.dataprovider.taskManager.tasks.ITask import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel +import java.util.UUID interface IWorker { @@ -41,13 +42,16 @@ class Worker(private val name: String, ): IWorker { // TODO: this could be dangerous to give it unlimited memory - private val tasks = Channel<ITask>(Channel.UNLIMITED) - + private val tasksChannel = Channel<ITask>(Channel.UNLIMITED) + // TODO: the task map should be cleared to reduce memory usage. + // This needs to be triggered from outside a worker as soon as + // all tasks related to the specific tasks are completed. + private val taskMap = HashMap<UUID, Pair<ITask, Job>>() private val logger = getLogger(javaClass) override suspend fun addTask(task: ITask) { - tasks.send(task) + tasksChannel.send(task) } init { @@ -66,19 +70,20 @@ class Worker(private val name: String, * very long-running inner task performed by executeTask. */ private fun launchWorker(id: Int) = coroutineScope.launch { - for (task in tasks) { - coroutineScope.launch { + for (task in tasksChannel) { + val job = coroutineScope.launch { logger.debug("[${Thread.currentThread().name}] Processor #$id-$name received $task") // TODO: we need to check if we want to wrap this executeTask in a try-catch block task.execute() logger.debug("[${Thread.currentThread().name}] Processor #$id-$name finished $task") } + taskMap[task.taskID] = Pair(task, job) } } override fun close() { coroutineScope.launch { - tasks.close() + tasksChannel.close() for (job in coroutineContext[Job]!!.children) { job.join() } diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/GitTask.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/GitTask.kt index 09ff462425c9ee90e3fbe9c0df56140c3e73d6b3..0dddbbf925826a34e6a584f48f60c9afd0bf8fb1 100644 --- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/GitTask.kt +++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/GitTask.kt @@ -8,8 +8,10 @@ data class GitProject(val name: String, val uri: String) class GitTask( override val type: TaskType, private val gitProject: GitProject, + private val outputPath: String, private val responseChannel: suspend (task: ITask) -> Unit ): Task() { + override suspend fun execute() { logger.info("Cloning ${gitProject.name}") diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/GitlabTask.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/GitlabTask.kt index 67e1ff8b5fde47cd69588d0a7349af65419a39d1..5839c41077e647b626a5a44a419d83bd3e71a9fa 100644 --- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/GitlabTask.kt +++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/GitlabTask.kt @@ -18,7 +18,7 @@ sealed class GetGitlabProjectTask( val projectUri = project.httpUrlToRepo val gitProject = GitProject(project.name, projectUri) logger.info("Retrieved project ${project.path} and url $projectUri") - responseChannel(GitTask(this.type, gitProject, responseChannel)) +// responseChannel(GitTask(this.type, gitProject, responseChannel)) } } @@ -27,6 +27,7 @@ class GetGitlabProjectPathTask( private val gitlabConfiguration: GitlabConfiguration, private val responseChannel: suspend (task: ITask) -> Unit ): GetGitlabProjectTask(repoId = repoId, gitlabConfiguration = gitlabConfiguration, responseChannel = responseChannel ) + class GetGitlabProjectIdTask( override val repoId: Long, private val gitlabConfiguration: GitlabConfiguration, diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/ProcessTask.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/ProcessTask.kt index 780ccf2c1aebd2750b87cb07575e07009902f17d..92d6b88dd9a3c826303d1b9ac88e465a7d982464 100644 --- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/ProcessTask.kt +++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/ProcessTask.kt @@ -18,12 +18,14 @@ sealed class ProcessTask(protected open vararg val flags: String): Task() { handleProcessReturn(p1) } } - } abstract suspend fun handleProcessReturn(p: Process) } +/** + * Dummy Task to simulate running external processes. + */ class JavaTask( override val type: TaskType, override vararg val flags: String, diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/Task.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/Task.kt index 5168bacf66da4b688aa8b4621ba3f4798d0dd2f5..72ad0c41c878b4c91c019e490bf61c31b8ac5d4d 100644 --- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/Task.kt +++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/tasks/Task.kt @@ -10,9 +10,11 @@ enum class TaskType { interface ITask { val type: TaskType suspend fun execute() + val taskID: UUID } -sealed class Task(protected val taskID: UUID = UUID.randomUUID()): ITask { +sealed class Task: ITask { + override val taskID: UUID = UUID.randomUUID() protected val logger = getLogger(javaClass) } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 7ea02d250b2e5faad085be0f7d9a54e50ca99b4b..10edc1c6cfd14c290796fb23bd18e44afdc98647 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -4,4 +4,7 @@ OPENCODE_GITLAB_TOKEN=${OPENCODE_TOKEN} GITHUB_URL=https://gitlab.opencode.de/ GITHUB_TOKEN=${GITHUB_TOKEN} spring.r2dbc.url=${R2DBC_URL} + +GIT_PROJECT_PATH=${GIT_PROJECT_PATH} +ODC_OUTPUT_PATH=${ODC_OUTPUT_PATH} #logging.level.root=DEBUG \ No newline at end of file