From ccd13c6a7d5e4a539473f400ce3fa3bd7e0d0bdb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Dani=C3=A9l=20Kerkmann?= <d.kerkmann@opentalk.eu>
Date: Tue, 25 Mar 2025 14:51:52 +0100
Subject: [PATCH] fix: recorder timeout for first recording attempt

The access token which is used for the websocket connection and http
requests to the controller will be refreshed before it's expired.
This is useful, because we are not sending an expired token to the
controller.

Closes #204
---
 Cargo.lock  |  26 ++++++++++
 Cargo.toml  |   2 +
 src/http.rs | 136 ++++++++++++++++++++++++----------------------------
 src/main.rs |   1 -
 4 files changed, 91 insertions(+), 74 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index efcd387..380b461 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3210,9 +3210,11 @@ checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde"
 dependencies = [
  "base64 0.22.1",
  "js-sys",
+ "pem",
  "ring",
  "serde",
  "serde_json",
+ "simple_asn1",
 ]
 
 [[package]]
@@ -3763,12 +3765,14 @@ version = "0.14.0"
 dependencies = [
  "anyhow",
  "bytes",
+ "chrono",
  "clap",
  "cocoa",
  "config",
  "env_logger 0.11.7",
  "futures",
  "gstreamer",
+ "jsonwebtoken",
  "lapin",
  "log",
  "objc",
@@ -4109,6 +4113,16 @@ dependencies = [
  "hmac",
 ]
 
+[[package]]
+name = "pem"
+version = "3.0.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "38af38e8470ac9dee3ce1bae1af9c1671fffc44ddfd8bd1d0a3445bf349a8ef3"
+dependencies = [
+ "base64 0.22.1",
+ "serde",
+]
+
 [[package]]
 name = "pem-rfc7468"
 version = "0.7.0"
@@ -5341,6 +5355,18 @@ version = "0.3.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe"
 
+[[package]]
+name = "simple_asn1"
+version = "0.6.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "297f631f50729c8c99b84667867963997ec0b50f32b2a7dbcab828ef0541e8bb"
+dependencies = [
+ "num-bigint",
+ "num-traits",
+ "thiserror 2.0.12",
+ "time",
+]
+
 [[package]]
 name = "siphasher"
 version = "1.0.1"
diff --git a/Cargo.toml b/Cargo.toml
index 283ec6a..76babac 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,11 +16,13 @@ build = "build.rs"
 [dependencies]
 anyhow = { version = "1", features = ["backtrace"] }
 bytes = "1"
+chrono = "0.4"
 compositor = { package = "opentalk-compositor", version = "0.14.0" }
 config = { version = "0.15", default-features = false, features = ["toml"] }
 env_logger = "0.11"
 futures = "0.3"
 gst = { package = "gstreamer", version = "0.23" }
+jsonwebtoken = "9.3"
 lapin = { version = "2.3", default-features = false, features = [
   "rustls-webpki-roots-certs",
 ] }
diff --git a/src/http.rs b/src/http.rs
index 749931d..5615034 100644
--- a/src/http.rs
+++ b/src/http.rs
@@ -4,10 +4,12 @@
 
 //! HTTP calls made by this library (except for websockets)
 
-use std::time::{Duration, Instant};
+use std::time::{Duration, Instant, SystemTime};
 
 use anyhow::{bail, Context, Result};
+use chrono::{serde::ts_seconds::deserialize as from_ts, DateTime, Utc};
 use futures::{SinkExt, StreamExt};
+use jsonwebtoken::{self, decode, DecodingKey, Validation};
 use openidconnect::{
     core::{
         CoreAuthDisplay, CoreAuthPrompt, CoreClient, CoreErrorResponseType, CoreGenderClaim,
@@ -139,24 +141,6 @@ impl HttpClient {
         })
     }
 
-    async fn refresh_access_tokens(&self, invalid_token: AccessToken) -> Result<()> {
-        let mut token = self.access_token.write().await;
-
-        if token.secret() != invalid_token.secret() {
-            return Ok(());
-        }
-
-        let response = self
-            .oidc
-            .exchange_client_credentials()
-            .request_async(&self.client)
-            .await?;
-
-        *token = response.access_token().clone();
-
-        Ok(())
-    }
-
     pub(crate) async fn start(
         &self,
         settings: &ControllerSettings,
@@ -165,45 +149,32 @@ impl HttpClient {
     ) -> Result<String> {
         let uri = format!("{}/services/recording/start", settings.v1_api_base_url());
 
-        // max 10 authentication tries
-        for _ in 0..10 {
-            let token = {
-                // Scope the access to the lock to avoid holding it for the entire loop-body
-                let l = self.access_token.read().await;
-                l.clone()
-            };
+        let token = self.get_valid_access_token().await?;
 
-            let response = self
-                .client
-                .post(&uri)
-                .bearer_auth(token.secret())
-                .json(&StartRequest {
-                    room_id,
-                    breakout_room,
-                })
-                .send()
-                .await?;
+        let response = self
+            .client
+            .post(&uri)
+            .bearer_auth(token.secret())
+            .json(&StartRequest {
+                room_id,
+                breakout_room,
+            })
+            .send()
+            .await?;
 
-            match response.status() {
-                StatusCode::OK => {
-                    let response = response.json::<StartResponse>().await?;
+        match response.status() {
+            StatusCode::OK => {
+                let response = response.json::<StartResponse>().await?;
 
-                    return Ok(response.ticket);
-                }
-                StatusCode::UNAUTHORIZED => {
-                    let ApiError { code } = response.json::<ApiError>().await?;
+                Ok(response.ticket)
+            }
+            StatusCode::UNAUTHORIZED => {
+                let ApiError { code } = response.json::<ApiError>().await?;
 
-                    if code == "unauthorized" {
-                        self.refresh_access_tokens(token).await?;
-                    } else {
-                        bail!(InvalidCredentials);
-                    }
-                }
-                code => bail!("unexpected status code {code:?}"),
+                bail!("failed to authorize, {code:?}");
             }
+            code => bail!("unexpected status code {code:?}"),
         }
-
-        bail!("failed to authorize")
     }
 
     pub(crate) async fn upload_render(
@@ -226,19 +197,10 @@ impl HttpClient {
             urlencoding::encode(&timestamp.to_string()),
         );
 
+        let token = self.get_valid_access_token().await?;
+
         log::debug!("connect websocket to {uri}");
-        let ws_stream = if let Ok((ws_stream, _response)) =
-            self.websocket_connect(uri.clone()).await
-        {
-            ws_stream
-        } else {
-            log::debug!("Unable to connect to the websocket, refresh access token and retry it");
-            self.refresh_access_tokens(self.access_token.read().await.clone())
-                .await
-                .context("unable to refresh the access token")?;
-
-            self.websocket_connect(uri).await?.0
-        };
+        let (ws_stream, _response) = self.websocket_connect(uri.clone(), token).await?;
         let (mut tx, mut rx) = ws_stream.split();
 
         let mut last_pong = Instant::now();
@@ -293,18 +255,31 @@ impl HttpClient {
         Ok(())
     }
 
+    async fn get_valid_access_token(&self) -> Result<AccessToken> {
+        let mut token = self.access_token.write().await;
+
+        // Refresh access token if it's expired
+        if check_if_token_is_expired(token.secret())? {
+            let response = self
+                .oidc
+                .exchange_client_credentials()
+                .request_async(&self.client)
+                .await?;
+
+            *token = response.access_token().clone();
+        }
+
+        Ok(token.clone())
+    }
+
     async fn websocket_connect(
         &self,
         uri: String,
+        token: AccessToken,
     ) -> Result<(
         WebSocketStream<tt::MaybeTlsStream<tokio::net::TcpStream>>,
         openidconnect::http::Response<std::option::Option<Vec<u8>>>,
     )> {
-        let token = {
-            let l = self.access_token.read().await;
-            l.clone()
-        };
-
         let mut request = uri.into_client_request().unwrap();
         request.headers_mut().insert(
             reqwest::header::AUTHORIZATION,
@@ -316,10 +291,25 @@ impl HttpClient {
     }
 }
 
-/// Error returned by the `start` function when the given digits were incorrect
-#[derive(Debug, thiserror::Error)]
-#[error("given credentials were invalid")]
-pub(crate) struct InvalidCredentials;
+#[derive(Deserialize)]
+struct TokenClaims {
+    #[serde(deserialize_with = "from_ts")]
+    exp: DateTime<Utc>,
+}
+
+/// Check if the token is expired or is expiring within a minute
+fn check_if_token_is_expired(token: &str) -> Result<bool> {
+    let mut validation = Validation::default();
+    validation.insecure_disable_signature_validation();
+    validation.validate_exp = false;
+    validation.validate_aud = false;
+
+    let token = decode::<TokenClaims>(token, &DecodingKey::from_secret(&[]), &validation)?;
+
+    let now = DateTime::<Utc>::from(SystemTime::now());
+
+    Ok(now > token.claims.exp + Duration::from_secs(60))
+}
 
 #[derive(Serialize)]
 struct StartRequest<'s> {
diff --git a/src/main.rs b/src/main.rs
index 9eec333..76494ca 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -53,7 +53,6 @@ fn check_plugins() -> Result<()> {
         "compositor",
         "debug",
         "dtls",
-        "fdkaac",
         "pango",
         "png",
         "rtp",
-- 
GitLab