diff --git a/Cargo.lock b/Cargo.lock index efcd38742aee32f0744517d2722134be2efcacba..380b461ed32d48de4cc4e9647fb76ca275d33b2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3210,9 +3210,11 @@ checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde" dependencies = [ "base64 0.22.1", "js-sys", + "pem", "ring", "serde", "serde_json", + "simple_asn1", ] [[package]] @@ -3763,12 +3765,14 @@ version = "0.14.0" dependencies = [ "anyhow", "bytes", + "chrono", "clap", "cocoa", "config", "env_logger 0.11.7", "futures", "gstreamer", + "jsonwebtoken", "lapin", "log", "objc", @@ -4109,6 +4113,16 @@ dependencies = [ "hmac", ] +[[package]] +name = "pem" +version = "3.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38af38e8470ac9dee3ce1bae1af9c1671fffc44ddfd8bd1d0a3445bf349a8ef3" +dependencies = [ + "base64 0.22.1", + "serde", +] + [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -5341,6 +5355,18 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe" +[[package]] +name = "simple_asn1" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "297f631f50729c8c99b84667867963997ec0b50f32b2a7dbcab828ef0541e8bb" +dependencies = [ + "num-bigint", + "num-traits", + "thiserror 2.0.12", + "time", +] + [[package]] name = "siphasher" version = "1.0.1" diff --git a/Cargo.toml b/Cargo.toml index 283ec6afe27ac3a8177211bbffd4842adb187b76..76babac825a641785d6dfa82df5170c54906eda2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,11 +16,13 @@ build = "build.rs" [dependencies] anyhow = { version = "1", features = ["backtrace"] } bytes = "1" +chrono = "0.4" compositor = { package = "opentalk-compositor", version = "0.14.0" } config = { version = "0.15", default-features = false, features = ["toml"] } env_logger = "0.11" futures = "0.3" gst = { package = "gstreamer", version = "0.23" } +jsonwebtoken = "9.3" lapin = { version = "2.3", default-features = false, features = [ "rustls-webpki-roots-certs", ] } diff --git a/src/http.rs b/src/http.rs index 749931d6c58352a6131bfbbb895cf0295d0e1b1a..5615034aec680c2c106da56825db714cf73de90b 100644 --- a/src/http.rs +++ b/src/http.rs @@ -4,10 +4,12 @@ //! HTTP calls made by this library (except for websockets) -use std::time::{Duration, Instant}; +use std::time::{Duration, Instant, SystemTime}; use anyhow::{bail, Context, Result}; +use chrono::{serde::ts_seconds::deserialize as from_ts, DateTime, Utc}; use futures::{SinkExt, StreamExt}; +use jsonwebtoken::{self, decode, DecodingKey, Validation}; use openidconnect::{ core::{ CoreAuthDisplay, CoreAuthPrompt, CoreClient, CoreErrorResponseType, CoreGenderClaim, @@ -139,24 +141,6 @@ impl HttpClient { }) } - async fn refresh_access_tokens(&self, invalid_token: AccessToken) -> Result<()> { - let mut token = self.access_token.write().await; - - if token.secret() != invalid_token.secret() { - return Ok(()); - } - - let response = self - .oidc - .exchange_client_credentials() - .request_async(&self.client) - .await?; - - *token = response.access_token().clone(); - - Ok(()) - } - pub(crate) async fn start( &self, settings: &ControllerSettings, @@ -165,45 +149,32 @@ impl HttpClient { ) -> Result<String> { let uri = format!("{}/services/recording/start", settings.v1_api_base_url()); - // max 10 authentication tries - for _ in 0..10 { - let token = { - // Scope the access to the lock to avoid holding it for the entire loop-body - let l = self.access_token.read().await; - l.clone() - }; + let token = self.get_valid_access_token().await?; - let response = self - .client - .post(&uri) - .bearer_auth(token.secret()) - .json(&StartRequest { - room_id, - breakout_room, - }) - .send() - .await?; + let response = self + .client + .post(&uri) + .bearer_auth(token.secret()) + .json(&StartRequest { + room_id, + breakout_room, + }) + .send() + .await?; - match response.status() { - StatusCode::OK => { - let response = response.json::<StartResponse>().await?; + match response.status() { + StatusCode::OK => { + let response = response.json::<StartResponse>().await?; - return Ok(response.ticket); - } - StatusCode::UNAUTHORIZED => { - let ApiError { code } = response.json::<ApiError>().await?; + Ok(response.ticket) + } + StatusCode::UNAUTHORIZED => { + let ApiError { code } = response.json::<ApiError>().await?; - if code == "unauthorized" { - self.refresh_access_tokens(token).await?; - } else { - bail!(InvalidCredentials); - } - } - code => bail!("unexpected status code {code:?}"), + bail!("failed to authorize, {code:?}"); } + code => bail!("unexpected status code {code:?}"), } - - bail!("failed to authorize") } pub(crate) async fn upload_render( @@ -226,19 +197,10 @@ impl HttpClient { urlencoding::encode(×tamp.to_string()), ); + let token = self.get_valid_access_token().await?; + log::debug!("connect websocket to {uri}"); - let ws_stream = if let Ok((ws_stream, _response)) = - self.websocket_connect(uri.clone()).await - { - ws_stream - } else { - log::debug!("Unable to connect to the websocket, refresh access token and retry it"); - self.refresh_access_tokens(self.access_token.read().await.clone()) - .await - .context("unable to refresh the access token")?; - - self.websocket_connect(uri).await?.0 - }; + let (ws_stream, _response) = self.websocket_connect(uri.clone(), token).await?; let (mut tx, mut rx) = ws_stream.split(); let mut last_pong = Instant::now(); @@ -293,18 +255,31 @@ impl HttpClient { Ok(()) } + async fn get_valid_access_token(&self) -> Result<AccessToken> { + let mut token = self.access_token.write().await; + + // Refresh access token if it's expired + if check_if_token_is_expired(token.secret())? { + let response = self + .oidc + .exchange_client_credentials() + .request_async(&self.client) + .await?; + + *token = response.access_token().clone(); + } + + Ok(token.clone()) + } + async fn websocket_connect( &self, uri: String, + token: AccessToken, ) -> Result<( WebSocketStream<tt::MaybeTlsStream<tokio::net::TcpStream>>, openidconnect::http::Response<std::option::Option<Vec<u8>>>, )> { - let token = { - let l = self.access_token.read().await; - l.clone() - }; - let mut request = uri.into_client_request().unwrap(); request.headers_mut().insert( reqwest::header::AUTHORIZATION, @@ -316,10 +291,25 @@ impl HttpClient { } } -/// Error returned by the `start` function when the given digits were incorrect -#[derive(Debug, thiserror::Error)] -#[error("given credentials were invalid")] -pub(crate) struct InvalidCredentials; +#[derive(Deserialize)] +struct TokenClaims { + #[serde(deserialize_with = "from_ts")] + exp: DateTime<Utc>, +} + +/// Check if the token is expired or is expiring within a minute +fn check_if_token_is_expired(token: &str) -> Result<bool> { + let mut validation = Validation::default(); + validation.insecure_disable_signature_validation(); + validation.validate_exp = false; + validation.validate_aud = false; + + let token = decode::<TokenClaims>(token, &DecodingKey::from_secret(&[]), &validation)?; + + let now = DateTime::<Utc>::from(SystemTime::now()); + + Ok(now > token.claims.exp + Duration::from_secs(60)) +} #[derive(Serialize)] struct StartRequest<'s> { diff --git a/src/main.rs b/src/main.rs index 9eec333d89ccee1b7604197247f93ce8d0278654..76494ca5e823048d30bb946eb0de1be93ed2d7b7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -53,7 +53,6 @@ fn check_plugins() -> Result<()> { "compositor", "debug", "dtls", - "fdkaac", "pango", "png", "rtp",