Skip to content
Snippets Groups Projects
Verified Commit b126c297 authored by Jan-Niclas Strüwer's avatar Jan-Niclas Strüwer
Browse files

Exception handling inside the worker to keep crashing tasks from crashing the whole application

parent 7794764e
No related branches found
No related tags found
1 merge request!3refactoring
...@@ -45,13 +45,14 @@ dependencies { ...@@ -45,13 +45,14 @@ dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
implementation("org.gitlab4j:gitlab4j-api:6.0.0-rc.1") implementation("org.gitlab4j:gitlab4j-api:6.0.0-rc.1")
implementation("org.eclipse.jgit:org.eclipse.jgit:6.5.0.202303070854-r") implementation("org.eclipse.jgit:org.eclipse.jgit:6.5.0.202303070854-r")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.5.0") implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.5.1")
implementation("org.springframework.boot:spring-boot-starter-actuator") implementation("org.springframework.boot:spring-boot-starter-actuator")
developmentOnly("org.springframework.boot:spring-boot-devtools") developmentOnly("org.springframework.boot:spring-boot-devtools")
runtimeOnly("org.postgresql:postgresql") runtimeOnly("org.postgresql:postgresql")
testImplementation("org.springframework.boot:spring-boot-starter-test") testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.security:spring-security-test") testImplementation("org.springframework.security:spring-security-test")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.6.4") testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.7.1")
testImplementation("io.mockk:mockk:1.13.5")
} }
tasks.withType<KotlinCompile> { tasks.withType<KotlinCompile> {
......
...@@ -63,6 +63,7 @@ class Worker( ...@@ -63,6 +63,7 @@ class Worker(
} }
} }
/** /**
* Wrapper function to execute tasks provided through the channel. * Wrapper function to execute tasks provided through the channel.
* We use a dedicated number of coroutines to watch the tasks channel. * We use a dedicated number of coroutines to watch the tasks channel.
...@@ -75,8 +76,15 @@ class Worker( ...@@ -75,8 +76,15 @@ class Worker(
val job = coroutineScope.launch { val job = coroutineScope.launch {
logger.debug("[{}] Processor #{}-{} received {}", Thread.currentThread().name, id, name, task) logger.debug("[{}] Processor #{}-{} received {}", Thread.currentThread().name, id, name, task)
// TODO: we need to check if we want to wrap this executeTask in a try-catch block // TODO: we need to check if we want to wrap this executeTask in a try-catch block
task.execute() try {
logger.debug("[{}] Processor #{}-{} finished {}", Thread.currentThread().name, id, name, task) task.execute()
} catch (e: Throwable) {
logger.error("Exception during task execution occurred. TaskID: ${task.taskID}")
} finally {
logger.debug("[{}] Processor #{}-{} finished {}", Thread.currentThread().name, id, name, task)
taskMap.remove(task.taskID)
}
} }
taskMap[task.taskID] = Pair(task, job) taskMap[task.taskID] = Pair(task, job)
} }
......
package de.fraunhofer.iem.dataprovider
import de.fraunhofer.iem.dataprovider.taskManager.Worker
import de.fraunhofer.iem.dataprovider.taskManager.events.Event
import de.fraunhofer.iem.dataprovider.taskManager.tasks.Task
import io.mockk.mockk
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test
import java.util.concurrent.atomic.AtomicInteger
class WorkerTest {
private class ExceptionTask(override val responseChannel: suspend (event: Event) -> Unit) : Task() {
override suspend fun execute() {
throw Exception("Execution crashed!")
}
}
private class WaitingTask(override val responseChannel: suspend (event: Event) -> Unit) : Task() {
override suspend fun execute() {
val testEvent = mockk<Event>()
responseChannel(testEvent)
}
}
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun workerExceptionTest() = runTest {
val worker = Worker("Worker", this)
val counter = AtomicInteger()
suspend fun responseChannelDummy(event: Event): Unit {
val curr = counter.incrementAndGet()
println("counter $counter")
if (curr == 2) {
worker.close()
}
}
worker.addTask(WaitingTask(::responseChannelDummy))
worker.addTask(ExceptionTask(::responseChannelDummy))
worker.addTask(WaitingTask(::responseChannelDummy))
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment

Consent

On this website, we use the web analytics service Matomo to analyze and review the use of our website. Through the collected statistics, we can improve our offerings and make them more appealing for you. Here, you can decide whether to allow us to process your data and set corresponding cookies for these purposes, in addition to technically necessary cookies. Further information on data protection—especially regarding "cookies" and "Matomo"—can be found in our privacy policy. You can withdraw your consent at any time.