From cb7187d334db061ac3eb1bce7a4f1c106c5bbc0a Mon Sep 17 00:00:00 2001
From: Jan-Niclas Struewer <j.n.struewer@gmail.com>
Date: Thu, 20 Apr 2023 15:37:42 +0200
Subject: [PATCH] created abstract worker

---
 .../iem/dataprovider/git/GitService.kt        |  5 +-
 .../dataprovider/taskManager/TaskManager.kt   | 97 +++++++------------
 2 files changed, 38 insertions(+), 64 deletions(-)

diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/git/GitService.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/git/GitService.kt
index bce20555..79bb629d 100644
--- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/git/GitService.kt
+++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/git/GitService.kt
@@ -1,6 +1,8 @@
 package de.fraunhofer.iem.dataprovider.git
 
+import de.fraunhofer.iem.dataprovider.taskManager.Task
 import de.fraunhofer.iem.dataprovider.taskManager.TaskManager
+import de.fraunhofer.iem.dataprovider.taskManager.TaskType
 import org.springframework.stereotype.Service
 
 @Service
@@ -10,7 +12,8 @@ class GitService(private val taskManager: TaskManager) {
     suspend fun cloneGit() {
         println("clone git called")
         repeat(20) {
-            taskManager.addTask("Task no $it")
+            val t = Task(TaskType.REPO_CHANGED, it)
+            taskManager.addTask(t)
         }
         //        val git: Git = Git.cloneRepository()
 //            .setURI("https://github.com/eclipse/jgit.git")
diff --git a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt
index 795e8be7..4505b69d 100644
--- a/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt
+++ b/src/main/kotlin/de/fraunhofer/iem/dataprovider/taskManager/TaskManager.kt
@@ -1,87 +1,53 @@
 package de.fraunhofer.iem.dataprovider.taskManager
 
 import de.fraunhofer.iem.dataprovider.logger.getLogger
-import jakarta.annotation.PostConstruct
-import jakarta.annotation.PreDestroy
 import kotlinx.coroutines.*
 import kotlinx.coroutines.channels.Channel
 import org.springframework.stereotype.Component
-import kotlin.coroutines.CoroutineContext
 
 enum class TaskType{
-    REPO_CHANGED
+    REPO_CHANGED,
+    DONE
 }
 data class Task(val type: TaskType, val payload: Any)
 @Component
-class TaskManager : CoroutineScope {
+class TaskManager : Worker("Task Manager") {
 
-    private val job = SupervisorJob()
-    override val coroutineContext: CoroutineContext
-        get() = Dispatchers.Default + job
+    private val gitWorker = GitWorker(tasks)
 
-    // TODO: this could be dangerous to give it unlimited memory
-    private val tasks = Channel<Task>(Channel.UNLIMITED)
-    private val numCores = Runtime.getRuntime().availableProcessors()
-    // use a fixed size thread pool context with numCores threads
-//    private val threadPoolContext = newFixedThreadPoolContext(numCores, "TaskManagerWorker")
-
-    private val gitWorker = GitWorker()
-
-    private val logger = getLogger(javaClass)
-
-    @PostConstruct
-    fun startWorkers() {
-        logger.info("Starting $numCores workers.")
-        repeat(numCores-1) {
-            logger.info("[${Thread.currentThread().name}] launching coroutine")
-            val j = launchWorker(it)
-        }
-    }
-
-    private fun launchWorker(id: Int) = launch {
-
-        for (task in tasks) {
-            when (task.type) {
-                TaskType.REPO_CHANGED -> {
-                    logger.info("[${Thread.currentThread().name}] add task called in $id")
-                    gitWorker.addTask(task)
-                    logger.info("[${Thread.currentThread().name}] add task finished in $id")
-                }
+    override suspend fun executeTask(task: Task) {
+        when (task.type) {
+            TaskType.REPO_CHANGED -> {
+                logger.info("[${Thread.currentThread().name}] add task called in ${task.payload}")
+                gitWorker.addTask(task)
+                logger.info("[${Thread.currentThread().name}] add task finished in ${task.payload}")
+            }
+            TaskType.DONE -> {
+                logger.info("${Thread.currentThread().name}] Task done ${task.payload}")
             }
         }
     }
-
-
-    suspend fun addTask(task: String) {
-        tasks.send(Task(TaskType.REPO_CHANGED, task))
-    }
-
-    @PreDestroy
-    fun stop() {
-        job.cancel()
-    }
 }
 
-class GitWorker: Worker() {
+class GitWorker(private val responseChannel: Channel<Task>): Worker("Git Worker") {
+
     override suspend fun executeTask(task: Task) {
         // simulate a long running task
         logger.info("[${Thread.currentThread().name}] Starting task ${task.payload} in gitworker")
         delay(5000)
+        responseChannel.send(Task(TaskType.DONE, "Task with payload ${task.payload} finished"))
         logger.info("[${Thread.currentThread().name}] Finished task ${task.payload} in gitworker")
     }
 
 }
-abstract class Worker: IWorker, CoroutineScope {
-
-    private val job = SupervisorJob()
-    override val coroutineContext: CoroutineContext
-        get() = Dispatchers.Default + job
+abstract class Worker(private val name: String): IWorker {
 
     // TODO: this could be dangerous to give it unlimited memory
-    private val tasks = Channel<Task>(Channel.UNLIMITED)
+    protected val tasks = Channel<Task>(Channel.UNLIMITED)
     private val numCores = Runtime.getRuntime().availableProcessors()
     // use a fixed size thread pool context with numCores threads
-    private val threadPoolContext = newFixedThreadPoolContext(numCores, "Worker")
+    private val threadPoolContext = newFixedThreadPoolContext(numCores, name)
+    private val coroutineScope = CoroutineScope(threadPoolContext)
 
     protected val logger = getLogger(javaClass)
     override suspend fun addTask(task: Task) {
@@ -90,32 +56,37 @@ abstract class Worker: IWorker, CoroutineScope {
 
     init {
         logger.info("Starting $numCores workers.")
-        repeat(numCores-1) {
-            logger.info("[${Thread.currentThread().name}] launching coroutine")
+        repeat(numCores) {
+            logger.info("[${Thread.currentThread().name}] launching coroutine in $name")
             launchWorker(it)
         }
     }
 
-    private fun launchWorker(id: Int) = launch(threadPoolContext) {
+    private fun launchWorker(id: Int) = coroutineScope.launch {
 
         for (task in tasks) {
-            launch {
-                logger.debug("[${Thread.currentThread().name}] Processor #$id received $task")
+            coroutineScope.launch {
+                logger.debug("[${Thread.currentThread().name}] Processor #$id-$name received $task")
                 executeTask(task)
-                logger.debug("[${Thread.currentThread().name}] Processor #$id finished $task")
+                logger.debug("[${Thread.currentThread().name}] Processor #$id-$name finished $task")
             }
         }
     }
 
     override fun close() {
-        threadPoolContext.close()
+        coroutineScope.launch {
+            tasks.close()
+            for (job in coroutineContext[Job]!!.children) {
+                job.join()
+            }
+            threadPoolContext.close()
+            logger.info("Worker $name has been closed")
+        }
     }
 }
 
 interface IWorker {
     suspend fun addTask(task: Task)
     suspend fun executeTask(task: Task)
-
     fun close()
-
 }
\ No newline at end of file
-- 
GitLab