From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 2DCD01FF170 for ; Tue, 3 Dec 2024 12:28:48 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id A72491CF1; Tue, 3 Dec 2024 12:28:54 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Tue, 3 Dec 2024 12:27:55 +0100 Message-Id: <20241203112756.63872-2-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.5 In-Reply-To: <20241203112756.63872-1-c.ebner@proxmox.com> References: <20241203112756.63872-1-c.ebner@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.030 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [PATCH proxmox-backup 2/3] api: reader: gracefully handle reader client disconnects X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" Currently, if a reader client disconnects after finishing its work, the connection will be closed by the client without notifying the server. The future handling the connection on the server side will then return with a connection error, and in consequence the reader worker task will log with error state. This can cause confusion [0], as this is not an error but normal behaviour. 
Instead of failing, provide an API endpoint that lets the client request a graceful close of the connection. This will trigger a signal to the future handling the connection, to gracefully close it. Report in the community forum: [0] https://forum.proxmox.com/threads/158306/ Signed-off-by: Christian Ebner --- Cargo.toml | 1 + src/api2/reader/environment.rs | 12 +++++- src/api2/reader/mod.rs | 74 ++++++++++++++++++++++++++++++++-- 3 files changed, 83 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d14f320a6..e3ddb1942 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -185,6 +185,7 @@ num-traits.workspace = true once_cell.workspace = true openssl.workspace = true percent-encoding.workspace = true +pin-project-lite.workspace = true regex.workspace = true rustyline.workspace = true serde.workspace = true diff --git a/src/api2/reader/environment.rs b/src/api2/reader/environment.rs index 3b2f06f43..ac8970865 100644 --- a/src/api2/reader/environment.rs +++ b/src/api2/reader/environment.rs @@ -1,7 +1,8 @@ use std::collections::HashSet; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; use serde_json::{json, Value}; +use tokio::sync::oneshot; use proxmox_router::{RpcEnvironment, RpcEnvironmentType}; @@ -23,6 +24,7 @@ pub struct ReaderEnvironment { pub worker: Arc, pub datastore: Arc, pub backup_dir: BackupDir, + connection_shutdown_trigger: Arc>>>, allowed_chunks: Arc>>, } @@ -33,6 +35,7 @@ impl ReaderEnvironment { worker: Arc, datastore: Arc, backup_dir: BackupDir, + connection_shutdown_trigger: oneshot::Sender<()>, ) -> Self { Self { result_attributes: json!({}), @@ -43,6 +46,7 @@ impl ReaderEnvironment { debug: tracing::enabled!(tracing::Level::DEBUG), formatter: JSON_FORMATTER, backup_dir, + connection_shutdown_trigger: Arc::new(Mutex::new(Some(connection_shutdown_trigger))), allowed_chunks: Arc::new(RwLock::new(HashSet::new())), } } @@ -69,6 +73,12 @@ impl ReaderEnvironment { pub fn check_chunk_access(&self, digest: [u8; 32]) -> bool 
{ self.allowed_chunks.read().unwrap().contains(&digest) } + + pub fn connection_shutdown(&self) { + if let Some(trigger) = self.connection_shutdown_trigger.lock().unwrap().take() { + let _ = trigger.send(()); + } + } } impl RpcEnvironment for ReaderEnvironment { diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs index 50f80de43..973b8f257 100644 --- a/src/api2/reader/mod.rs +++ b/src/api2/reader/mod.rs @@ -1,13 +1,20 @@ //! Backup reader/restore protocol (HTTP2 upgrade) +use std::pin::Pin; +use std::task::Poll; + use anyhow::{bail, format_err, Error}; use futures::*; use hex::FromHex; use hyper::header::{self, HeaderValue, CONNECTION, UPGRADE}; use hyper::http::request::Parts; +use hyper::server::conn::Connection; +use hyper::upgrade::Upgraded; use hyper::{Body, Request, Response, StatusCode}; +use pin_project_lite::pin_project; use serde::Deserialize; use serde_json::Value; +use tokio::sync::oneshot; use proxmox_rest_server::{H2Service, WorkerTask}; use proxmox_router::{ @@ -156,12 +163,15 @@ fn upgrade_to_backup_reader_protocol( move |worker| async move { let _guard = _guard; + let (connection_shutdown_trigger, connection_shutdown_receiver) = + oneshot::channel::<()>(); let mut env = ReaderEnvironment::new( env_type, auth_id, worker.clone(), datastore, backup_dir, + connection_shutdown_trigger, ); env.debug = debug; @@ -192,9 +202,10 @@ fn upgrade_to_backup_reader_protocol( http.http2_initial_connection_window_size(window_size); http.http2_max_frame_size(4 * 1024 * 1024); - http.serve_connection(conn, service) - .map_err(Error::from) - .await + let connection = http.serve_connection(conn, service); + let graceful_shutdown_connection = + GracefulShutdownConnection::new(connection, connection_shutdown_receiver); + graceful_shutdown_connection.await.map_err(Error::from) }; futures::select! { @@ -222,12 +233,53 @@ fn upgrade_to_backup_reader_protocol( .boxed() } +pin_project! 
{ + struct GracefulShutdownConnection { + #[pin] + connection: Connection, ExecInheritLogContext>, + #[pin] + shutdown_receiver: oneshot::Receiver<()>, + shutdown_state: Option>, + } +} + +impl Future for GracefulShutdownConnection { + type Output = Result<(), hyper::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + let mut this = self.project(); + + if this.shutdown_state.is_none() { + if let Poll::Ready(shutdown) = this.shutdown_receiver.poll(cx) { + let _ = this.shutdown_state.insert(shutdown); + this.connection.as_mut().graceful_shutdown(); + } + } + + this.connection.poll(cx) + } +} + +impl GracefulShutdownConnection { + fn new( + connection: Connection, ExecInheritLogContext>, + shutdown_receiver: oneshot::Receiver<()>, + ) -> Self { + Self { + connection, + shutdown_receiver, + shutdown_state: None, + } + } +} + const READER_API_SUBDIRS: SubdirMap = &[ ("chunk", &Router::new().download(&API_METHOD_DOWNLOAD_CHUNK)), ( "download", &Router::new().download(&API_METHOD_DOWNLOAD_FILE), ), + ("finish", &Router::new().post(&API_METHOD_FINISH)), ("speedtest", &Router::new().download(&API_METHOD_SPEEDTEST)), ]; @@ -347,6 +399,22 @@ fn download_chunk( .boxed() } +#[sortable] +pub const API_METHOD_FINISH: ApiMethod = ApiMethod::new( + &ApiHandler::Sync(&finish), + &ObjectSchema::new("Signal the reader instance is finished", &[]), +); + +fn finish( + _param: Value, + _info: &ApiMethod, + rpcenv: &mut dyn RpcEnvironment, +) -> Result { + let env: &ReaderEnvironment = rpcenv.as_ref(); + env.connection_shutdown(); + Ok(Value::Null) +} + /* this is too slow fn download_chunk_old( _parts: Parts, -- 2.39.5 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel