Skip to content
Snippets Groups Projects
Verified Commit b1dd3a41 authored by Jan-Niclas Strüwer's avatar Jan-Niclas Strüwer
Browse files

restructured tasks, worker, and task manager

parent b152ddf3
No related branches found
No related tags found
No related merge requests found
......@@ -8,6 +8,5 @@ import org.springframework.boot.runApplication
class DataProviderApplication
fun main(args: Array<String>) {
println("Started app [${Thread.currentThread().name}]")
runApplication<DataProviderApplication>(*args)
}
package de.fraunhofer.iem.dataprovider.git
import de.fraunhofer.iem.dataprovider.taskManager.GitTask
import de.fraunhofer.iem.dataprovider.taskManager.Task
import de.fraunhofer.iem.dataprovider.taskManager.TaskManager
import de.fraunhofer.iem.dataprovider.taskManager.TaskType
......@@ -8,16 +9,13 @@ import org.springframework.stereotype.Service
@Service
class GitService(private val taskManager: TaskManager) {
val list = listOf("https://github.com/eclipse/jgit.git", "https://github.com/secure-software-engineering/phasar", "https://github.com/secure-software-engineering/FlowDroid")
suspend fun cloneGit() {
println("clone git called")
repeat(20) {
val t = Task(TaskType.REPO_CHANGED, it)
for (e in list) {
val t = GitTask(TaskType.REPO_CHANGED, e, taskManager::addTask)
taskManager.addTask(t)
}
// val git: Git = Git.cloneRepository()
// .setURI("https://github.com/eclipse/jgit.git")
// .setDirectory("/path/to/repo")
// .call()
}
}
\ No newline at end of file
......@@ -2,138 +2,68 @@ package de.fraunhofer.iem.dataprovider.taskManager
import de.fraunhofer.iem.dataprovider.logger.getLogger
import jakarta.annotation.PreDestroy
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import org.springframework.stereotype.Component
enum class TaskType{
enum class TaskType {
REPO_CHANGED,
DONE
}
data class Task(val type: TaskType, val payload: Any)
/**
* The Task Manager takes tasks and distributes them to
* underlying workers. Internally it uses a channel
* to manage incoming tasks.
*/
@Component
class TaskManager : Worker("Task Manager") {
private val gitWorker = GitWorker(tasks)
/**
* Used to distribute incoming tasks to the fitting workers
* depending on the task's type.
*/
override suspend fun executeTask(task: Task) {
when (task.type) {
TaskType.REPO_CHANGED -> {
logger.info("[${Thread.currentThread().name}] add task called in ${task.payload}")
gitWorker.addTask(task)
logger.info("[${Thread.currentThread().name}] add task finished in ${task.payload}")
}
TaskType.DONE -> {
logger.info("${Thread.currentThread().name}] Task done ${task.payload}")
}
}
}
@PreDestroy
private fun destruct() {
gitWorker.close()
close()
}
interface Task {
val type: TaskType
suspend fun execute(): Unit
}
class GitWorker(private val responseChannel: Channel<Task>): Worker("Git Worker") {
/**
* Performs git related tasks, like cloning the given repository.
*/
override suspend fun executeTask(task: Task) {
// simulate a long running task
logger.info("[${Thread.currentThread().name}] Starting task ${task.payload} in gitworker")
class GitTask(
override val type: TaskType,
private val gitUrl: String,
private val responseChannel: suspend (task: Task) -> Unit
): Task {
override suspend fun execute() {
println("Cloning $gitUrl")
delay(5000)
responseChannel(DoneTask("Done 1 with gitUrl $gitUrl"))
delay(5000)
responseChannel.send(Task(TaskType.DONE, "Task with payload ${task.payload} finished"))
logger.info("[${Thread.currentThread().name}] Finished task ${task.payload} in gitworker")
responseChannel(DoneTask("Done 2 with gitUrl $gitUrl"))
// val git: Git = Git.cloneRepository()
// .setURI("https://github.com/eclipse/jgit.git")
// .setDirectory("/path/to/repo")
// .call()
}
}
class DoneTask(private val message: String? = null): Task {
override val type: TaskType = TaskType.DONE
override suspend fun execute() {
println("[${Thread.currentThread().name}] I'm done $message")
}
}
/**
* Abstract worker class used as a base for multithreaded worker.
* It uses a fixed thread pool as a base for the coroutineScope.
* The size of the thread pool is equal to the number of cores.
* The Task Manager takes tasks and distributes them to
* underlying workers. Internally it uses a channel
* to manage incoming tasks.
*/
abstract class Worker(private val name: String): IWorker {
// TODO: this could be dangerous to give it unlimited memory
protected val tasks = Channel<Task>(Channel.UNLIMITED)
private val numCores = Runtime.getRuntime().availableProcessors()
// use a fixed size thread pool context with numCores threads
@OptIn(DelicateCoroutinesApi::class)
private val threadPoolContext = newFixedThreadPoolContext(numCores, name)
private val coroutineScope = CoroutineScope(threadPoolContext)
@Component
class TaskManager: IWorker {
protected val logger = getLogger(javaClass)
private val worker = Worker("Worker")
private val logger = getLogger(javaClass)
override suspend fun addTask(task: Task) {
tasks.send(task)
}
init {
logger.info("Starting $numCores workers.")
repeat(numCores) {
logger.info("[${Thread.currentThread().name}] launching coroutine in $name")
launchWorker(it)
}
}
/**
* Wrapper function to execute tasks provided through the channel.
* We use a dedicated number of coroutines to watch the tasks channel.
* Each coroutine starts a new coroutine to execute the task. This is
* done so that the outer coroutines are not blocked by the potentially
* very long-running inner task performed by executeTask.
*/
private fun launchWorker(id: Int) = coroutineScope.launch {
for (task in tasks) {
coroutineScope.launch {
logger.debug("[${Thread.currentThread().name}] Processor #$id-$name received $task")
executeTask(task)
logger.debug("[${Thread.currentThread().name}] Processor #$id-$name finished $task")
}
if (task.type == TaskType.DONE) {
task.execute()
}
logger.info("[${Thread.currentThread().name}] add task called in Task Manager $task")
worker.addTask(task)
logger.info("[${Thread.currentThread().name}] add task finished in Task Manager $task")
}
@PreDestroy
override fun close() {
coroutineScope.launch {
tasks.close()
for (job in coroutineContext[Job]!!.children) {
job.join()
}
threadPoolContext.close()
logger.info("Worker $name has been closed")
}
worker.close()
}
}
interface IWorker {
/**
* Adds a new task to the worker's internal tasks channel.
*/
suspend fun addTask(task: Task)
/**
* Worker dependent function describing the worker's
* handling of the task.
* This function is automatically called.
*/
suspend fun executeTask(task: Task)
/**
* Function used to close the underlying resources.
*/
fun close()
}
\ No newline at end of file
package de.fraunhofer.iem.dataprovider.taskManager
import de.fraunhofer.iem.dataprovider.logger.getLogger
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
interface IWorker {
/**
* Adds a new task to the worker's internal tasks channel.
*/
suspend fun addTask(task: Task)
// TODO: check if we can use autoclosable interface here.
/**
* Function used to close the underlying resources.
*/
fun close()
}
/**
* Abstract worker class used as a base for multithreaded worker.
* It uses a fixed thread pool as a base for the coroutineScope.
* The size of the thread pool is equal to the number of cores.
*/
class Worker(private val name: String): IWorker {
// TODO: this could be dangerous to give it unlimited memory
private val tasks = Channel<Task>(Channel.UNLIMITED)
private val numCores = Runtime.getRuntime().availableProcessors()
// use a fixed size thread pool context with numCores threads
@OptIn(DelicateCoroutinesApi::class)
private val threadPoolContext = newFixedThreadPoolContext(numCores, name)
private val coroutineScope = CoroutineScope(threadPoolContext)
private val logger = getLogger(javaClass)
override suspend fun addTask(task: Task) {
tasks.send(task)
}
init {
logger.info("Starting $numCores workers.")
repeat(numCores) {
logger.info("[${Thread.currentThread().name}] launching coroutine in $name")
launchWorker(it)
}
}
/**
* Wrapper function to execute tasks provided through the channel.
* We use a dedicated number of coroutines to watch the tasks channel.
* Each coroutine starts a new coroutine to execute the task. This is
* done so that the outer coroutines are not blocked by the potentially
* very long-running inner task performed by executeTask.
*/
private fun launchWorker(id: Int) = coroutineScope.launch {
for (task in tasks) {
coroutineScope.launch {
logger.debug("[${Thread.currentThread().name}] Processor #$id-$name received $task")
// TODO: we need to check if we want to wrap this executeTask in a try-catch block
task.execute()
logger.debug("[${Thread.currentThread().name}] Processor #$id-$name finished $task")
}
}
}
override fun close() {
coroutineScope.launch {
tasks.close()
for (job in coroutineContext[Job]!!.children) {
job.join()
}
threadPoolContext.close()
logger.info("Worker $name has been closed")
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment

Consent

On this website, we use the web analytics service Matomo to analyze and review the use of our website. Through the collected statistics, we can improve our offerings and make them more appealing for you. Here, you can decide whether to allow us to process your data and set corresponding cookies for these purposes, in addition to technically necessary cookies. Further information on data protection—especially regarding "cookies" and "Matomo"—can be found in our privacy policy. You can withdraw your consent at any time.