Skip to content
Snippets Groups Projects
Verified Commit e0929a27 authored by Jan-Niclas Strüwer's avatar Jan-Niclas Strüwer
Browse files

started standalone worker implementation

parent f59ef65d
No related branches found
No related tags found
No related merge requests found
...@@ -20,11 +20,12 @@ class TaskManager : CoroutineScope { ...@@ -20,11 +20,12 @@ class TaskManager : CoroutineScope {
get() = Dispatchers.Default + job get() = Dispatchers.Default + job
// TODO: this could be dangerous to give it unlimited memory // TODO: this could be dangerous to give it unlimited memory
private val tasks = Channel<String>(Channel.UNLIMITED) private val tasks = Channel<Task>(Channel.UNLIMITED)
private val numCores = Runtime.getRuntime().availableProcessors() private val numCores = Runtime.getRuntime().availableProcessors()
// use a fixed size thread pool context with numCores threads // use a fixed size thread pool context with numCores threads
private val threadPoolContext = newFixedThreadPoolContext(numCores, "TaskManagerWorker") // private val threadPoolContext = newFixedThreadPoolContext(numCores, "TaskManagerWorker")
private val gitWorker = GitWorker()
private val logger = getLogger(javaClass) private val logger = getLogger(javaClass)
...@@ -33,36 +34,88 @@ class TaskManager : CoroutineScope { ...@@ -33,36 +34,88 @@ class TaskManager : CoroutineScope {
logger.info("Starting $numCores workers.") logger.info("Starting $numCores workers.")
repeat(numCores-1) { repeat(numCores-1) {
logger.info("[${Thread.currentThread().name}] launching coroutine") logger.info("[${Thread.currentThread().name}] launching coroutine")
launchWorker(it) val j = launchWorker(it)
} }
} }
private fun launchWorker(id: Int) = launch(threadPoolContext) { private fun launchWorker(id: Int) = launch {
for (task in tasks) { for (task in tasks) {
launch { when (task.type) {
logger.debug("[${Thread.currentThread().name}] Processor #$id received $task") TaskType.REPO_CHANGED -> {
executeTask(task) logger.info("[${Thread.currentThread().name}] add task called in $id")
logger.debug("[${Thread.currentThread().name}] Processor #$id finished $task") gitWorker.addTask(task)
logger.info("[${Thread.currentThread().name}] add task finished in $id")
}
} }
} }
} }
private suspend fun executeTask(task: String) {
suspend fun addTask(task: String) {
tasks.send(Task(TaskType.REPO_CHANGED, task))
}
@PreDestroy
fun stop() {
job.cancel()
}
}
class GitWorker: Worker() {
override suspend fun executeTask(task: Task) {
// simulate a long running task // simulate a long running task
logger.info("[${Thread.currentThread().name}] Starting task ${task.payload} in gitworker")
delay(5000) delay(5000)
logger.info("[${Thread.currentThread().name}] Finished task ${task.payload} in gitworker")
} }
fun addTask(task: String) { }
launch { abstract class Worker: IWorker, CoroutineScope {
tasks.send(task)
private val job = SupervisorJob()
override val coroutineContext: CoroutineContext
get() = Dispatchers.Default + job
// TODO: this could be dangerous to give it unlimited memory
private val tasks = Channel<Task>(Channel.UNLIMITED)
private val numCores = Runtime.getRuntime().availableProcessors()
// use a fixed size thread pool context with numCores threads
private val threadPoolContext = newFixedThreadPoolContext(numCores, "Worker")
protected val logger = getLogger(javaClass)
override suspend fun addTask(task: Task) {
tasks.send(task)
}
init {
logger.info("Starting $numCores workers.")
repeat(numCores-1) {
logger.info("[${Thread.currentThread().name}] launching coroutine")
launchWorker(it)
} }
} }
@PreDestroy private fun launchWorker(id: Int) = launch(threadPoolContext) {
fun stop() {
job.cancel() for (task in tasks) {
threadPoolContext.close() launch {
logger.debug("[${Thread.currentThread().name}] Processor #$id received $task")
executeTask(task)
logger.debug("[${Thread.currentThread().name}] Processor #$id finished $task")
}
}
} }
override fun close() {
threadPoolContext.close()
}
} }
interface IWorker {
suspend fun addTask(task: Task)
suspend fun executeTask(task: Task)
fun close()
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment

Consent

On this website, we use the web analytics service Matomo to analyze and review the use of our website. Through the collected statistics, we can improve our offerings and make them more appealing for you. Here, you can decide whether to allow us to process your data and set corresponding cookies for these purposes, in addition to technically necessary cookies. Further information on data protection—especially regarding "cookies" and "Matomo"—can be found in our privacy policy. You can withdraw your consent at any time.