Skip to content
Snippets Groups Projects
Verified Commit b152ddf3 authored by Jan-Niclas Strüwer's avatar Jan-Niclas Strüwer
Browse files

added code comments

parent cb7187d3
No related branches found
No related tags found
No related merge requests found
package de.fraunhofer.iem.dataprovider.taskManager package de.fraunhofer.iem.dataprovider.taskManager
import de.fraunhofer.iem.dataprovider.logger.getLogger import de.fraunhofer.iem.dataprovider.logger.getLogger
import jakarta.annotation.PreDestroy
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import org.springframework.stereotype.Component import org.springframework.stereotype.Component
...@@ -10,11 +11,21 @@ enum class TaskType{ ...@@ -10,11 +11,21 @@ enum class TaskType{
DONE DONE
} }
data class Task(val type: TaskType, val payload: Any) data class Task(val type: TaskType, val payload: Any)
/**
* The Task Manager takes tasks and distributes them to
* underlying workers. Internally it uses a channel
* to manage incoming tasks.
*/
@Component @Component
class TaskManager : Worker("Task Manager") { class TaskManager : Worker("Task Manager") {
private val gitWorker = GitWorker(tasks) private val gitWorker = GitWorker(tasks)
/**
* Used to distribute incoming tasks to the fitting workers
* depending on the task's type.
*/
override suspend fun executeTask(task: Task) { override suspend fun executeTask(task: Task) {
when (task.type) { when (task.type) {
TaskType.REPO_CHANGED -> { TaskType.REPO_CHANGED -> {
...@@ -27,10 +38,19 @@ class TaskManager : Worker("Task Manager") { ...@@ -27,10 +38,19 @@ class TaskManager : Worker("Task Manager") {
} }
} }
} }
@PreDestroy
private fun destruct() {
gitWorker.close()
close()
}
} }
class GitWorker(private val responseChannel: Channel<Task>): Worker("Git Worker") { class GitWorker(private val responseChannel: Channel<Task>): Worker("Git Worker") {
/**
* Performs git related tasks, like cloning the given repository.
*/
override suspend fun executeTask(task: Task) { override suspend fun executeTask(task: Task) {
// simulate a long running task // simulate a long running task
logger.info("[${Thread.currentThread().name}] Starting task ${task.payload} in gitworker") logger.info("[${Thread.currentThread().name}] Starting task ${task.payload} in gitworker")
...@@ -40,16 +60,24 @@ class GitWorker(private val responseChannel: Channel<Task>): Worker("Git Worker" ...@@ -40,16 +60,24 @@ class GitWorker(private val responseChannel: Channel<Task>): Worker("Git Worker"
} }
} }
/**
* Abstract worker class used as a base for multithreaded worker.
* It uses a fixed thread pool as a base for the coroutineScope.
* The size of the thread pool is equal to the number of cores.
*/
abstract class Worker(private val name: String): IWorker { abstract class Worker(private val name: String): IWorker {
// TODO: this could be dangerous to give it unlimited memory // TODO: this could be dangerous to give it unlimited memory
protected val tasks = Channel<Task>(Channel.UNLIMITED) protected val tasks = Channel<Task>(Channel.UNLIMITED)
private val numCores = Runtime.getRuntime().availableProcessors() private val numCores = Runtime.getRuntime().availableProcessors()
// use a fixed size thread pool context with numCores threads // use a fixed size thread pool context with numCores threads
@OptIn(DelicateCoroutinesApi::class)
private val threadPoolContext = newFixedThreadPoolContext(numCores, name) private val threadPoolContext = newFixedThreadPoolContext(numCores, name)
private val coroutineScope = CoroutineScope(threadPoolContext) private val coroutineScope = CoroutineScope(threadPoolContext)
protected val logger = getLogger(javaClass) protected val logger = getLogger(javaClass)
override suspend fun addTask(task: Task) { override suspend fun addTask(task: Task) {
tasks.send(task) tasks.send(task)
} }
...@@ -62,8 +90,14 @@ abstract class Worker(private val name: String): IWorker { ...@@ -62,8 +90,14 @@ abstract class Worker(private val name: String): IWorker {
} }
} }
/**
* Wrapper function to execute tasks provided through the channel.
* We use a dedicated number of coroutines to watch the tasks channel.
* Each coroutine starts a new coroutine to execute the task. This is
* done so that the outer coroutines are not blocked by the potentially
* very long-running inner task performed by executeTask.
*/
private fun launchWorker(id: Int) = coroutineScope.launch { private fun launchWorker(id: Int) = coroutineScope.launch {
for (task in tasks) { for (task in tasks) {
coroutineScope.launch { coroutineScope.launch {
logger.debug("[${Thread.currentThread().name}] Processor #$id-$name received $task") logger.debug("[${Thread.currentThread().name}] Processor #$id-$name received $task")
...@@ -86,7 +120,20 @@ abstract class Worker(private val name: String): IWorker { ...@@ -86,7 +120,20 @@ abstract class Worker(private val name: String): IWorker {
} }
interface IWorker { interface IWorker {
/**
* Adds a new task to the worker's internal tasks channel.
*/
suspend fun addTask(task: Task) suspend fun addTask(task: Task)
/**
* Worker dependent function describing the worker's
* handling of the task.
* This function is automatically called.
*/
suspend fun executeTask(task: Task) suspend fun executeTask(task: Task)
/**
* Function used to close the underlying resources.
*/
fun close() fun close()
} }
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment

Consent

On this website, we use the web analytics service Matomo to analyze and review the use of our website. Through the collected statistics, we can improve our offerings and make them more appealing for you. Here, you can decide whether to allow us to process your data and set corresponding cookies for these purposes, in addition to technically necessary cookies. Further information on data protection—especially regarding "cookies" and "Matomo"—can be found in our privacy policy. You can withdraw your consent at any time.