From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id DED191FF133 for ; Mon, 11 May 2026 15:46:25 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 1F24A1AD68; Mon, 11 May 2026 15:46:23 +0200 (CEST) From: Christian Ebner To: pbs-devel@lists.proxmox.com Subject: [PATCH proxmox-backup v2 3/3] fix #6373: HTTP level client heartbeat for proxy connection keepalive Date: Mon, 11 May 2026 15:46:10 +0200 Message-ID: <20260511134610.675164-4-c.ebner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260511134610.675164-1-c.ebner@proxmox.com> References: <20260511134610.675164-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1778507062780 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.070 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. 
[proxmox.com,lib.rs,haproxy.com] Message-ID-Hash: MVDA67JDBI624YKLY2QXX37GQRCEX3R6 X-Message-ID-Hash: MVDA67JDBI624YKLY2QXX37GQRCEX3R6 X-MailFrom: c.ebner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Backup Server development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: Backup readers and writers can potentially have long periods of idle connections, e.g. a reader if a backup snapshot has been mounted and all relevant chunks are locally cached, a backup session with previous metadata archive not needing to fetch new contents while the backup is ongoing or a backup writer to a datastore with slow backend storage. Proxies such as HAProxy might however close idle connections for better resource handling [0,1], even multiplexed HTTP/2 connections as are being used for the Proxmox Backup Server backup reader/writer protocol. Therefore, perform heartbeat traffic in the HTTP/2 client when no other requests are being sent, tracked by an internal timeout, being reset before sending a request. Since older servers do not provide the new API path, ignore errors as the response is not strictly necessary for the connection to remain established. The heartbeat is currently only being performed if the timeout value in seconds is given via the environment variable PBS_READER_HEARTBEAT_TIMEOUT for readers and PBS_WRITER_HEARTBEAT_TIMEOUT for writers. Testing was performed using HAProxy with the Proxmox Backup Server as backend using the following 5 second connection idle timeouts as configuration parameters in haproxy.cfg: ``` ... defaults ... timeout connect 5000 timeout client 5000 timeout server 5000 .. 
frontend http-in bind *:8007 mode tcp default_backend pbs backend pbs mode tcp http-reuse always server pbs :8007 verify none ``` As command invocation: ``` PBS_READER_HEARTBEAT_TIMEOUT=1 proxmox-backup-client mount \ --repository @: --verbose ``` [0] https://www.haproxy.com/documentation/haproxy-configuration-manual/latest/#4-timeout%20client [1] https://www.haproxy.com/documentation/haproxy-configuration-manual/latest/#4.2-timeout%20http-keep-alive Fixes: https://bugzilla.proxmox.com/show_bug.cgi?id=6373 Signed-off-by: Christian Ebner --- pbs-client/src/backup_reader.rs | 67 ++++++++++++++-- pbs-client/src/backup_writer.rs | 131 +++++++++++++++++++++++++++++--- pbs-client/src/lib.rs | 25 ++++++ 3 files changed, 203 insertions(+), 20 deletions(-) diff --git a/pbs-client/src/backup_reader.rs b/pbs-client/src/backup_reader.rs index 88cba599b..53596511d 100644 --- a/pbs-client/src/backup_reader.rs +++ b/pbs-client/src/backup_reader.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use futures::future::AbortHandle; use serde_json::{json, Value}; +use tokio::sync::mpsc; use pbs_api_types::{BackupArchiveName, BackupDir, BackupNamespace, MANIFEST_BLOB_NAME}; use pbs_datastore::data_blob::DataBlob; @@ -16,28 +17,61 @@ use pbs_datastore::{BackupManifest, PROXMOX_BACKUP_READER_PROTOCOL_ID_V1}; use pbs_tools::crypt_config::CryptConfig; use pbs_tools::sha::sha256; -use super::{H2Client, HttpClient}; +use super::{parse_optional_heartbeat_env, H2Client, HeartBeatMsg, HttpClient}; /// Backup Reader pub struct BackupReader { h2: H2Client, abort: AbortHandle, crypt_config: Option>, + heartbeat: Option>, } impl Drop for BackupReader { fn drop(&mut self) { + self.send_msg_heartbeat(HeartBeatMsg::Abort); self.abort.abort(); } } impl BackupReader { - fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option>) -> Arc { - Arc::new(Self { - h2, - abort, - crypt_config, - }) + fn new( + h2: H2Client, + abort: AbortHandle, + crypt_config: Option>, + ) -> Result, Error> { + if let Some(timeout) = 
parse_optional_heartbeat_env("PBS_READER_HEARTBEAT_TIMEOUT")? { + let (send, mut recv) = mpsc::channel(1); + let backup_reader = Arc::new(Self { + h2, + abort, + crypt_config, + heartbeat: Some(send), + }); + let reader_cloned = Arc::clone(&backup_reader); + + tokio::spawn(async move { + loop { + match tokio::time::timeout(timeout, recv.recv()).await { + Ok(Some(HeartBeatMsg::Reset)) => (), + Ok(Some(HeartBeatMsg::Abort)) | Ok(None) => break, + Err(_elapsed) => { + // connection idle timeout reached, send heatbeat + let _ = reader_cloned.h2.get("heartbeat", None).await; + } + } + } + }); + + Ok(backup_reader) + } else { + Ok(Arc::new(Self { + h2, + abort, + crypt_config, + heartbeat: None, + })) + } } /// Create a new instance by upgrading the connection at '/api2/json/reader' @@ -74,26 +108,30 @@ impl BackupReader { .start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())) .await?; - Ok(BackupReader::new(h2, abort, crypt_config)) + BackupReader::new(h2, abort, crypt_config) } /// Execute a GET request pub async fn get(&self, path: &str, param: Option) -> Result { + self.send_msg_heartbeat(HeartBeatMsg::Reset); self.h2.get(path, param).await } /// Execute a PUT request pub async fn put(&self, path: &str, param: Option) -> Result { + self.send_msg_heartbeat(HeartBeatMsg::Reset); self.h2.put(path, param).await } /// Execute a POST request pub async fn post(&self, path: &str, param: Option) -> Result { + self.send_msg_heartbeat(HeartBeatMsg::Reset); self.h2.post(path, param).await } /// Execute a GET request and send output to a writer pub async fn download(&self, file_name: &str, output: W) -> Result<(), Error> { + self.send_msg_heartbeat(HeartBeatMsg::Reset); let path = "download"; let param = json!({ "file-name": file_name }); self.h2.download(path, Some(param), output).await @@ -103,6 +141,7 @@ impl BackupReader { /// /// This writes random data, and is only useful to test download speed. 
pub async fn speedtest(&self, output: W) -> Result<(), Error> { + self.send_msg_heartbeat(HeartBeatMsg::Reset); self.h2.download("speedtest", None, output).await } @@ -112,12 +151,14 @@ impl BackupReader { digest: &[u8; 32], output: W, ) -> Result<(), Error> { + self.send_msg_heartbeat(HeartBeatMsg::Reset); let path = "chunk"; let param = json!({ "digest": hex::encode(digest) }); self.h2.download(path, Some(param), output).await } pub fn force_close(self) { + self.send_msg_heartbeat(HeartBeatMsg::Abort); self.abort.abort(); } @@ -205,4 +246,14 @@ impl BackupReader { Ok(index) } + + /// Send given message to the heartbeat closure. + /// + /// All errors are being ignored, since they cannot be handled anyways. + fn send_msg_heartbeat(&self, msg: HeartBeatMsg) { + let _ = self + .heartbeat + .as_ref() + .map(|heartbeat| heartbeat.try_send(msg)); + } } diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs index 49aff3fdd..4c246f25c 100644 --- a/pbs-client/src/backup_writer.rs +++ b/pbs-client/src/backup_writer.rs @@ -1,12 +1,15 @@ use std::collections::HashSet; use std::future::Future; +use std::io::Write; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Instant; use anyhow::{bail, format_err, Error}; +use bytes::Bytes; use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt}; use futures::stream::{Stream, StreamExt, TryStreamExt}; +use hyper::http::Request; use openssl::sha::Sha256; use serde_json::{json, Value}; use tokio::io::AsyncReadExt; @@ -28,14 +31,82 @@ use proxmox_human_byte::HumanByte; use proxmox_log::{debug, enabled, info, trace, warn, Level}; use proxmox_time::TimeSpan; +use crate::parse_optional_heartbeat_env; + use super::backup_stats::{BackupStats, UploadCounters, UploadStats}; use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo}; use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo}; -use super::{H2Client, HttpClient}; 
+use super::{H2Client, HeartBeatMsg, HttpClient}; + +#[derive(Clone)] +struct H2ClientWithHeartbeat { + inner: H2Client, + heartbeat: Option>, +} + +impl H2ClientWithHeartbeat { + async fn get(&self, path: &str, param: Option) -> Result { + self.send_msg_heartbeat(HeartBeatMsg::Reset); + self.inner.get(path, param).await + } + + async fn put(&self, path: &str, param: Option) -> Result { + self.send_msg_heartbeat(HeartBeatMsg::Reset); + self.inner.put(path, param).await + } + + async fn post(&self, path: &str, param: Option) -> Result { + self.send_msg_heartbeat(HeartBeatMsg::Reset); + self.inner.post(path, param).await + } + + async fn upload( + &self, + method: &str, + path: &str, + param: Option, + content_type: &str, + data: Vec, + ) -> Result { + self.send_msg_heartbeat(HeartBeatMsg::Reset); + self.inner + .upload(method, path, param, content_type, data) + .await + } + + async fn download( + &self, + path: &str, + param: Option, + output: W, + ) -> Result<(), Error> { + self.send_msg_heartbeat(HeartBeatMsg::Reset); + self.inner.download(path, param, output).await + } + + fn send_request( + &self, + request: Request<()>, + data: Option, + ) -> impl Future> { + self.send_msg_heartbeat(HeartBeatMsg::Reset); + self.inner.send_request(request, data) + } + + /// Send given message to the heartbeat closure. + /// + /// All errors are being ignored, since they cannot be handled anyways. 
+ fn send_msg_heartbeat(&self, msg: HeartBeatMsg) { + let _ = self + .heartbeat + .as_ref() + .map(|heartbeat| heartbeat.try_send(msg)); + } +} pub struct BackupWriter { - h2: H2Client, + h2: H2ClientWithHeartbeat, abort: AbortHandle, crypt_config: Option>, } @@ -101,12 +172,47 @@ pub struct BackupWriterOptions<'a> { } impl BackupWriter { - fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option>) -> Arc { - Arc::new(Self { - h2, - abort, - crypt_config, - }) + fn new( + h2: H2Client, + abort: AbortHandle, + crypt_config: Option>, + ) -> Result, Error> { + if let Some(timeout) = parse_optional_heartbeat_env("PBS_WRITER_HEARTBEAT_TIMEOUT")? { + let (send, mut recv) = mpsc::channel(1); + let backup_writer = Arc::new(Self { + h2: H2ClientWithHeartbeat { + inner: h2, + heartbeat: Some(send), + }, + abort, + crypt_config, + }); + let writer_cloned = Arc::clone(&backup_writer); + + tokio::spawn(async move { + loop { + match tokio::time::timeout(timeout, recv.recv()).await { + Ok(Some(HeartBeatMsg::Reset)) => (), + Ok(Some(HeartBeatMsg::Abort)) | Ok(None) => break, + Err(_elapsed) => { + // connection idle timeout reached, send heatbeat + let _ = writer_cloned.h2.inner.get("heartbeat", None).await; + } + } + } + }); + + Ok(backup_writer) + } else { + Ok(Arc::new(Self { + h2: H2ClientWithHeartbeat { + inner: h2, + heartbeat: None, + }, + abort, + crypt_config, + })) + } } #[allow(clippy::too_many_arguments)] @@ -143,7 +249,7 @@ impl BackupWriter { .start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())) .await?; - Ok(BackupWriter::new(h2, abort, writer_options.crypt_config)) + BackupWriter::new(h2, abort, writer_options.crypt_config) } pub async fn get(&self, path: &str, param: Option) -> Result { @@ -209,6 +315,7 @@ impl BackupWriter { } pub fn cancel(&self) { + self.h2.send_msg_heartbeat(HeartBeatMsg::Abort); self.abort.abort(); } @@ -583,7 +690,7 @@ impl BackupWriter { } fn append_chunk_queue( - h2: H2Client, + h2: H2ClientWithHeartbeat, wid: u64, 
path: String, uploaded: Arc, @@ -759,7 +866,7 @@ impl BackupWriter { // since this is a private method. #[allow(clippy::too_many_arguments)] fn upload_chunk_info_stream( - h2: H2Client, + h2: H2ClientWithHeartbeat, wid: u64, stream: impl Stream>, prefix: &str, @@ -857,7 +964,7 @@ impl BackupWriter { } fn upload_merged_chunk_stream( - h2: H2Client, + h2: H2ClientWithHeartbeat, wid: u64, archive: &BackupArchiveName, prefix: &str, diff --git a/pbs-client/src/lib.rs b/pbs-client/src/lib.rs index 4b8e4e4f4..e12a8ddb5 100644 --- a/pbs-client/src/lib.rs +++ b/pbs-client/src/lib.rs @@ -45,4 +45,29 @@ pub use chunk_stream::{ChunkStream, FixedChunkStream, InjectionData}; mod backup_stats; pub use backup_stats::BackupStats; +use std::str::FromStr; +use std::time::Duration; + +use anyhow::{Context, Error}; + pub const PROXMOX_BACKUP_TCP_KEEPALIVE_TIME: u32 = 120; + +/// Request reset or abort for backup reader/writer heartbeat traffic +pub enum HeartBeatMsg { + Reset, + Abort, +} + +/// Parse optional reader/writer heartbeat timeout value from environment variable as duration. +pub(crate) fn parse_optional_heartbeat_env(env_name: &str) -> Result, Error> { + match std::env::var(env_name) { + Ok(val) => { + let timeout = u64::from_str(&val) + .map(std::time::Duration::from_secs) + .context("failed to parse heartbeat timeout")?; + Ok(Some(timeout)) + } + Err(std::env::VarError::NotPresent) => Ok(None), + Err(err) => Err(Error::from(err).context("failed to parse heartbeat timeout")), + } +} -- 2.47.3