From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH proxmox-backup v2 3/3] fix #6373: HTTP level client heartbeat for proxy connection keepalive
Date: Mon, 11 May 2026 15:46:10 +0200
Message-ID: <20260511134610.675164-4-c.ebner@proxmox.com>
In-Reply-To: <20260511134610.675164-1-c.ebner@proxmox.com>
Backup readers and writers can have long periods of idle connection,
e.g. a reader for a mounted backup snapshot whose relevant chunks are
all cached locally, a backup session using a previous metadata archive
that does not need to fetch new contents while the backup is ongoing,
or a backup writer to a datastore with slow backend storage.
Proxies such as HAProxy might, however, close idle connections to
free resources [0,1], even the multiplexed HTTP/2 connections used by
the Proxmox Backup Server backup reader/writer protocol.
Therefore, send heartbeat traffic from the HTTP/2 client whenever no
other requests are being sent. Idleness is tracked by an internal
timeout, which is reset before each request is sent.
Since older servers do not provide the new API path, errors on the
heartbeat request are ignored: the response is not strictly necessary
for the connection to remain established, only the traffic is.
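A minimal, std-only sketch of this idle timer, for illustration only: `std::sync::mpsc::sync_channel` and `Receiver::recv_timeout` stand in for the tokio channel and `tokio::time::timeout` used in the patch, a thread stands in for the spawned task, and `FakeClient` and `ClientWithHeartbeat` are hypothetical stand-ins for `H2Client` and its heartbeat wrapper. The fake client fails every heartbeat request, as an older server without the endpoint would.

```rust
use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Messages for the heartbeat loop, mirroring `HeartBeatMsg` from the patch.
pub enum HeartBeatMsg {
    Reset,
    Abort,
}

/// Hypothetical stand-in for `H2Client`: records every requested path.
#[derive(Clone, Default)]
pub struct FakeClient {
    pub requests: Arc<Mutex<Vec<String>>>,
}

impl FakeClient {
    pub fn get(&self, path: &str) -> Result<(), String> {
        self.requests.lock().unwrap().push(path.to_string());
        if path == "heartbeat" {
            // Behave like an older server that lacks the heartbeat endpoint.
            return Err("404 Not Found".to_string());
        }
        Ok(())
    }
}

/// Wait for messages; if none arrives within `timeout`, send a heartbeat.
fn heartbeat_loop(client: FakeClient, recv: Receiver<HeartBeatMsg>, timeout: Duration) {
    loop {
        match recv.recv_timeout(timeout) {
            // A regular request was sent: restart the idle timer.
            Ok(HeartBeatMsg::Reset) => (),
            // Explicit abort, or every sender dropped: stop the loop.
            Ok(HeartBeatMsg::Abort) | Err(RecvTimeoutError::Disconnected) => break,
            // Idle timeout reached: send a heartbeat and ignore any error,
            // the request alone already produces traffic on the connection.
            Err(RecvTimeoutError::Timeout) => {
                let _ = client.get("heartbeat");
            }
        }
    }
}

/// Client wrapper that resets the idle timer before every regular request.
pub struct ClientWithHeartbeat {
    inner: FakeClient,
    heartbeat: SyncSender<HeartBeatMsg>,
}

impl ClientWithHeartbeat {
    pub fn new(inner: FakeClient, timeout: Duration) -> (Self, JoinHandle<()>) {
        // Capacity 1: while a reset is pending, further resets are redundant.
        let (send, recv) = sync_channel::<HeartBeatMsg>(1);
        // The thread owns a clone of the inner client, not the wrapper, so
        // dropping the wrapper drops the sender and ends the loop.
        let client = inner.clone();
        let handle = thread::spawn(move || heartbeat_loop(client, recv, timeout));
        (Self { inner, heartbeat: send }, handle)
    }

    pub fn get(&self, path: &str) -> Result<(), String> {
        self.send_msg(HeartBeatMsg::Reset);
        self.inner.get(path)
    }

    fn send_msg(&self, msg: HeartBeatMsg) {
        // A full or disconnected channel cannot be handled, so ignore errors.
        let _ = self.heartbeat.try_send(msg);
    }
}

impl Drop for ClientWithHeartbeat {
    fn drop(&mut self) {
        self.send_msg(HeartBeatMsg::Abort);
    }
}
```

Each regular request resets the timer, so heartbeats are only sent once the connection has been idle for `timeout`, and then roughly once per `timeout` until the wrapper is dropped. Keeping only a clone of the inner client in the heartbeat thread avoids a reference cycle that would otherwise keep the wrapper, and thereby the loop, alive forever.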
The heartbeat is currently only performed if a timeout value in
seconds is set via the PBS_READER_HEARTBEAT_TIMEOUT environment
variable for readers or PBS_WRITER_HEARTBEAT_TIMEOUT for writers.
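A std-only sketch of that lookup, with the `VarError` handling split into its own function so the cases can be exercised without touching the process environment. The function names are hypothetical, and plain `String` errors stand in for the `anyhow` errors used in the patch:

```rust
use std::env::VarError;
use std::time::Duration;

/// Interpret the result of `std::env::var` as an optional heartbeat timeout.
///
/// Unset variable: heartbeat disabled (`Ok(None)`); whole number of seconds:
/// heartbeat enabled; anything else is reported as an error.
pub fn parse_heartbeat_value(raw: Result<String, VarError>) -> Result<Option<Duration>, String> {
    match raw {
        Ok(val) => val
            .parse::<u64>()
            .map(|secs| Some(Duration::from_secs(secs)))
            .map_err(|err| format!("failed to parse heartbeat timeout {val:?}: {err}")),
        Err(VarError::NotPresent) => Ok(None),
        // The variable is set, but its value is not valid Unicode.
        Err(err) => Err(format!("failed to parse heartbeat timeout: {err}")),
    }
}

/// Read e.g. `PBS_READER_HEARTBEAT_TIMEOUT` from the process environment.
pub fn heartbeat_timeout_from_env(name: &str) -> Result<Option<Duration>, String> {
    parse_heartbeat_value(std::env::var(name))
}
```

With this behavior, `PBS_READER_HEARTBEAT_TIMEOUT=1` enables a 1-second idle timeout, an unset variable leaves the heartbeat disabled, and a value such as `1s` is rejected.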
Testing was performed using HAProxy with the Proxmox Backup Server
as backend, configured with the following 5-second connection idle
timeouts in haproxy.cfg:
```
...
defaults
...
timeout connect 5000
timeout client 5000
timeout server 5000
...
frontend http-in
bind *:8007
mode tcp
default_backend pbs
backend pbs
mode tcp
http-reuse always
server pbs <PBS-IP>:8007 verify none
```
The client was invoked as follows:
```
PBS_READER_HEARTBEAT_TIMEOUT=1 proxmox-backup-client mount <snapshot> <archive> \
<mountpoint> --repository <user-and-realm>@<PROXY-IP>:<datastore> --verbose
```
[0] https://www.haproxy.com/documentation/haproxy-configuration-manual/latest/#4-timeout%20client
[1] https://www.haproxy.com/documentation/haproxy-configuration-manual/latest/#4.2-timeout%20http-keep-alive
Fixes: https://bugzilla.proxmox.com/show_bug.cgi?id=6373
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-client/src/backup_reader.rs | 67 ++++++++++++++--
pbs-client/src/backup_writer.rs | 131 +++++++++++++++++++++++++++++---
pbs-client/src/lib.rs | 25 ++++++
3 files changed, 203 insertions(+), 20 deletions(-)
diff --git a/pbs-client/src/backup_reader.rs b/pbs-client/src/backup_reader.rs
index 88cba599b..53596511d 100644
--- a/pbs-client/src/backup_reader.rs
+++ b/pbs-client/src/backup_reader.rs
@@ -5,6 +5,7 @@ use std::sync::Arc;
use futures::future::AbortHandle;
use serde_json::{json, Value};
+use tokio::sync::mpsc;
use pbs_api_types::{BackupArchiveName, BackupDir, BackupNamespace, MANIFEST_BLOB_NAME};
use pbs_datastore::data_blob::DataBlob;
@@ -16,28 +17,61 @@ use pbs_datastore::{BackupManifest, PROXMOX_BACKUP_READER_PROTOCOL_ID_V1};
use pbs_tools::crypt_config::CryptConfig;
use pbs_tools::sha::sha256;
-use super::{H2Client, HttpClient};
+use super::{parse_optional_heartbeat_env, H2Client, HeartBeatMsg, HttpClient};
/// Backup Reader
pub struct BackupReader {
h2: H2Client,
abort: AbortHandle,
crypt_config: Option<Arc<CryptConfig>>,
+ heartbeat: Option<mpsc::Sender<HeartBeatMsg>>,
}
impl Drop for BackupReader {
fn drop(&mut self) {
+ self.send_msg_heartbeat(HeartBeatMsg::Abort);
self.abort.abort();
}
}
impl BackupReader {
- fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>) -> Arc<Self> {
- Arc::new(Self {
- h2,
- abort,
- crypt_config,
- })
+ fn new(
+ h2: H2Client,
+ abort: AbortHandle,
+ crypt_config: Option<Arc<CryptConfig>>,
+ ) -> Result<Arc<Self>, Error> {
+ if let Some(timeout) = parse_optional_heartbeat_env("PBS_READER_HEARTBEAT_TIMEOUT")? {
+ let (send, mut recv) = mpsc::channel(1);
+ let backup_reader = Arc::new(Self {
+ h2,
+ abort,
+ crypt_config,
+ heartbeat: Some(send),
+ });
+ let h2_cloned = backup_reader.h2.clone();
+
+ tokio::spawn(async move {
+ loop {
+ match tokio::time::timeout(timeout, recv.recv()).await {
+ Ok(Some(HeartBeatMsg::Reset)) => (),
+ Ok(Some(HeartBeatMsg::Abort)) | Ok(None) => break,
+ Err(_elapsed) => {
+ // connection idle timeout reached, send heartbeat
+ let _ = h2_cloned.get("heartbeat", None).await;
+ }
+ }
+ }
+ });
+
+ Ok(backup_reader)
+ } else {
+ Ok(Arc::new(Self {
+ h2,
+ abort,
+ crypt_config,
+ heartbeat: None,
+ }))
+ }
}
/// Create a new instance by upgrading the connection at '/api2/json/reader'
@@ -74,26 +108,30 @@ impl BackupReader {
.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
.await?;
- Ok(BackupReader::new(h2, abort, crypt_config))
+ BackupReader::new(h2, abort, crypt_config)
}
/// Execute a GET request
pub async fn get(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
+ self.send_msg_heartbeat(HeartBeatMsg::Reset);
self.h2.get(path, param).await
}
/// Execute a PUT request
pub async fn put(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
+ self.send_msg_heartbeat(HeartBeatMsg::Reset);
self.h2.put(path, param).await
}
/// Execute a POST request
pub async fn post(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
+ self.send_msg_heartbeat(HeartBeatMsg::Reset);
self.h2.post(path, param).await
}
/// Execute a GET request and send output to a writer
pub async fn download<W: Write + Send>(&self, file_name: &str, output: W) -> Result<(), Error> {
+ self.send_msg_heartbeat(HeartBeatMsg::Reset);
let path = "download";
let param = json!({ "file-name": file_name });
self.h2.download(path, Some(param), output).await
@@ -103,6 +141,7 @@ impl BackupReader {
///
/// This writes random data, and is only useful to test download speed.
pub async fn speedtest<W: Write + Send>(&self, output: W) -> Result<(), Error> {
+ self.send_msg_heartbeat(HeartBeatMsg::Reset);
self.h2.download("speedtest", None, output).await
}
@@ -112,12 +151,14 @@ impl BackupReader {
digest: &[u8; 32],
output: W,
) -> Result<(), Error> {
+ self.send_msg_heartbeat(HeartBeatMsg::Reset);
let path = "chunk";
let param = json!({ "digest": hex::encode(digest) });
self.h2.download(path, Some(param), output).await
}
pub fn force_close(self) {
+ self.send_msg_heartbeat(HeartBeatMsg::Abort);
self.abort.abort();
}
@@ -205,4 +246,14 @@ impl BackupReader {
Ok(index)
}
+
+ /// Send the given message to the heartbeat task.
+ ///
+ /// All errors are ignored, since they cannot be handled anyway.
+ fn send_msg_heartbeat(&self, msg: HeartBeatMsg) {
+ let _ = self
+ .heartbeat
+ .as_ref()
+ .map(|heartbeat| heartbeat.try_send(msg));
+ }
}
diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index 49aff3fdd..4c246f25c 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -1,12 +1,15 @@
use std::collections::HashSet;
use std::future::Future;
+use std::io::Write;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use anyhow::{bail, format_err, Error};
+use bytes::Bytes;
use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt};
use futures::stream::{Stream, StreamExt, TryStreamExt};
+use hyper::http::Request;
use openssl::sha::Sha256;
use serde_json::{json, Value};
use tokio::io::AsyncReadExt;
@@ -28,14 +31,82 @@ use proxmox_human_byte::HumanByte;
use proxmox_log::{debug, enabled, info, trace, warn, Level};
use proxmox_time::TimeSpan;
+use crate::parse_optional_heartbeat_env;
+
use super::backup_stats::{BackupStats, UploadCounters, UploadStats};
use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo};
use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
-use super::{H2Client, HttpClient};
+use super::{H2Client, HeartBeatMsg, HttpClient};
+
+#[derive(Clone)]
+struct H2ClientWithHeartbeat {
+ inner: H2Client,
+ heartbeat: Option<mpsc::Sender<HeartBeatMsg>>,
+}
+
+impl H2ClientWithHeartbeat {
+ async fn get(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
+ self.send_msg_heartbeat(HeartBeatMsg::Reset);
+ self.inner.get(path, param).await
+ }
+
+ async fn put(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
+ self.send_msg_heartbeat(HeartBeatMsg::Reset);
+ self.inner.put(path, param).await
+ }
+
+ async fn post(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
+ self.send_msg_heartbeat(HeartBeatMsg::Reset);
+ self.inner.post(path, param).await
+ }
+
+ async fn upload(
+ &self,
+ method: &str,
+ path: &str,
+ param: Option<Value>,
+ content_type: &str,
+ data: Vec<u8>,
+ ) -> Result<Value, Error> {
+ self.send_msg_heartbeat(HeartBeatMsg::Reset);
+ self.inner
+ .upload(method, path, param, content_type, data)
+ .await
+ }
+
+ async fn download<W: Write + Send>(
+ &self,
+ path: &str,
+ param: Option<Value>,
+ output: W,
+ ) -> Result<(), Error> {
+ self.send_msg_heartbeat(HeartBeatMsg::Reset);
+ self.inner.download(path, param, output).await
+ }
+
+ fn send_request(
+ &self,
+ request: Request<()>,
+ data: Option<Bytes>,
+ ) -> impl Future<Output = Result<h2::client::ResponseFuture, Error>> {
+ self.send_msg_heartbeat(HeartBeatMsg::Reset);
+ self.inner.send_request(request, data)
+ }
+
+ /// Send the given message to the heartbeat task.
+ ///
+ /// All errors are ignored, since they cannot be handled anyway.
+ fn send_msg_heartbeat(&self, msg: HeartBeatMsg) {
+ let _ = self
+ .heartbeat
+ .as_ref()
+ .map(|heartbeat| heartbeat.try_send(msg));
+ }
+}
pub struct BackupWriter {
- h2: H2Client,
+ h2: H2ClientWithHeartbeat,
abort: AbortHandle,
crypt_config: Option<Arc<CryptConfig>>,
}
@@ -101,12 +172,47 @@ pub struct BackupWriterOptions<'a> {
}
impl BackupWriter {
- fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>) -> Arc<Self> {
- Arc::new(Self {
- h2,
- abort,
- crypt_config,
- })
+ fn new(
+ h2: H2Client,
+ abort: AbortHandle,
+ crypt_config: Option<Arc<CryptConfig>>,
+ ) -> Result<Arc<Self>, Error> {
+ if let Some(timeout) = parse_optional_heartbeat_env("PBS_WRITER_HEARTBEAT_TIMEOUT")? {
+ let (send, mut recv) = mpsc::channel(1);
+ let backup_writer = Arc::new(Self {
+ h2: H2ClientWithHeartbeat {
+ inner: h2,
+ heartbeat: Some(send),
+ },
+ abort,
+ crypt_config,
+ });
+ let h2_cloned = backup_writer.h2.inner.clone();
+
+ tokio::spawn(async move {
+ loop {
+ match tokio::time::timeout(timeout, recv.recv()).await {
+ Ok(Some(HeartBeatMsg::Reset)) => (),
+ Ok(Some(HeartBeatMsg::Abort)) | Ok(None) => break,
+ Err(_elapsed) => {
+ // connection idle timeout reached, send heartbeat
+ let _ = h2_cloned.get("heartbeat", None).await;
+ }
+ }
+ }
+ });
+
+ Ok(backup_writer)
+ } else {
+ Ok(Arc::new(Self {
+ h2: H2ClientWithHeartbeat {
+ inner: h2,
+ heartbeat: None,
+ },
+ abort,
+ crypt_config,
+ }))
+ }
}
#[allow(clippy::too_many_arguments)]
@@ -143,7 +249,7 @@ impl BackupWriter {
.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
.await?;
- Ok(BackupWriter::new(h2, abort, writer_options.crypt_config))
+ BackupWriter::new(h2, abort, writer_options.crypt_config)
}
pub async fn get(&self, path: &str, param: Option<Value>) -> Result<Value, Error> {
@@ -209,6 +315,7 @@ impl BackupWriter {
}
pub fn cancel(&self) {
+ self.h2.send_msg_heartbeat(HeartBeatMsg::Abort);
self.abort.abort();
}
@@ -583,7 +690,7 @@ impl BackupWriter {
}
fn append_chunk_queue(
- h2: H2Client,
+ h2: H2ClientWithHeartbeat,
wid: u64,
path: String,
uploaded: Arc<AtomicUsize>,
@@ -759,7 +866,7 @@ impl BackupWriter {
// since this is a private method.
#[allow(clippy::too_many_arguments)]
fn upload_chunk_info_stream(
- h2: H2Client,
+ h2: H2ClientWithHeartbeat,
wid: u64,
stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
prefix: &str,
@@ -857,7 +964,7 @@ impl BackupWriter {
}
fn upload_merged_chunk_stream(
- h2: H2Client,
+ h2: H2ClientWithHeartbeat,
wid: u64,
archive: &BackupArchiveName,
prefix: &str,
diff --git a/pbs-client/src/lib.rs b/pbs-client/src/lib.rs
index 4b8e4e4f4..e12a8ddb5 100644
--- a/pbs-client/src/lib.rs
+++ b/pbs-client/src/lib.rs
@@ -45,4 +45,29 @@ pub use chunk_stream::{ChunkStream, FixedChunkStream, InjectionData};
mod backup_stats;
pub use backup_stats::BackupStats;
+use std::str::FromStr;
+use std::time::Duration;
+
+use anyhow::{Context, Error};
+
pub const PROXMOX_BACKUP_TCP_KEEPALIVE_TIME: u32 = 120;
+
+/// Request reset or abort for backup reader/writer heartbeat traffic
+pub enum HeartBeatMsg {
+ Reset,
+ Abort,
+}
+
+/// Parse optional reader/writer heartbeat timeout value from environment variable as duration.
+pub(crate) fn parse_optional_heartbeat_env(env_name: &str) -> Result<Option<Duration>, Error> {
+ match std::env::var(env_name) {
+ Ok(val) => {
+ let timeout = u64::from_str(&val)
+ .map(Duration::from_secs)
+ .context("failed to parse heartbeat timeout")?;
+ Ok(Some(timeout))
+ }
+ Err(std::env::VarError::NotPresent) => Ok(None),
+ Err(err) => Err(Error::from(err).context("failed to parse heartbeat timeout")),
+ }
+}
--
2.47.3
Thread overview:
2026-05-11 13:46 [PATCH proxmox{,-backup} v2 0/3] fix #6373: HTTP level keepalive for http2 backup reader/writer connection Christian Ebner
2026-05-11 13:46 ` [PATCH proxmox v2 1/3] rest-server: add request logfilter by method and path in h2 service Christian Ebner
2026-05-11 13:46 ` [PATCH proxmox-backup v2 2/3] api: add heartbeat endpoint for backup reader/writer http/2 clients Christian Ebner
2026-05-11 13:46 ` Christian Ebner [this message]