* [pbs-devel] [PATCH v3 proxmox-backup 3/5] api: reader: handle reader client disconnects
2025-01-09 14:06 [pbs-devel] [PATCH v3 proxmox-backup 0/5] handle reader client disconnects Christian Ebner
2025-01-09 14:06 ` [pbs-devel] [PATCH v3 proxmox-backup 1/5] client: reader: drop dead code Christian Ebner
2025-01-09 14:06 ` [pbs-devel] [PATCH v3 proxmox-backup 2/5] backup debug: diff: refactor backup reader creation Christian Ebner
@ 2025-01-09 14:06 ` Christian Ebner
2025-01-09 14:06 ` [pbs-devel] [PATCH v3 proxmox-backup 4/5] client: reader: add finish method to signal client state to server Christian Ebner
2025-01-09 14:06 ` [pbs-devel] [PATCH v3 proxmox-backup 5/5] client: backup reader: call finish before dropping backup readers Christian Ebner
4 siblings, 0 replies; 6+ messages in thread
From: Christian Ebner @ 2025-01-09 14:06 UTC (permalink / raw)
To: pbs-devel
Currently, if a reader client disconnects after finishing its work,
the connection will be closed by the client without notifying the
server. The future handling the connection on the server side will
then return with a connection error, and as a consequence the reader
worker task will end in an error state. This can cause confusion [0],
as this is not an error but normal behaviour.
Instead of failing, provide an api endpoint for the client to signal
that it has finished its operation. The server sets the connection environment
state accordingly, and suppresses the disconnection error if the flag
has been set. This follows the same logic used for the backup writer,
introduced by commit b428af97 ("backup: avoid Transport endpoint is
not connected error").
Report in the community forum:
[0] https://forum.proxmox.com/threads/158306/
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 2:
- use api macro for handler declaration
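
For illustration, here is a minimal, self-contained sketch of the
connection state tracking this patch adds. The names mirror the diff
below, but this is a standalone example, not the patch itself:

use std::sync::{Arc, Mutex};

// Mirrors the ConnectionState introduced in ReaderEnvironment below: a
// shared flag which the new "finish" handler sets and which the
// connection future checks before treating a disconnect as an error.
struct ConnectionState {
    client_finished: bool,
}

struct ReaderState {
    connection_state: Arc<Mutex<ConnectionState>>,
}

impl ReaderState {
    fn new() -> Self {
        Self {
            connection_state: Arc::new(Mutex::new(ConnectionState {
                client_finished: false,
            })),
        }
    }

    // Called by the "finish" API handler.
    fn finish(&self) {
        self.connection_state.lock().unwrap().client_finished = true;
    }

    // Checked when serve_connection() returns: a connection error is only
    // propagated if the client did not signal completion beforehand.
    fn client_finished(&self) -> bool {
        self.connection_state.lock().unwrap().client_finished
    }
}

fn main() {
    let state = ReaderState::new();
    assert!(!state.client_finished());
    state.finish();
    assert!(state.client_finished());
}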
src/api2/reader/environment.rs | 20 +++++++++++++++++++-
src/api2/reader/mod.rs | 27 +++++++++++++++++++++++----
2 files changed, 42 insertions(+), 5 deletions(-)
diff --git a/src/api2/reader/environment.rs b/src/api2/reader/environment.rs
index 3b2f06f43..3cdc8e394 100644
--- a/src/api2/reader/environment.rs
+++ b/src/api2/reader/environment.rs
@@ -1,5 +1,5 @@
use std::collections::HashSet;
-use std::sync::{Arc, RwLock};
+use std::sync::{Arc, Mutex, RwLock};
use serde_json::{json, Value};
@@ -24,6 +24,11 @@ pub struct ReaderEnvironment {
pub datastore: Arc<DataStore>,
pub backup_dir: BackupDir,
allowed_chunks: Arc<RwLock<HashSet<[u8; 32]>>>,
+ connection_state: Arc<Mutex<ConnectionState>>,
+}
+
+struct ConnectionState {
+ client_finished: bool,
}
impl ReaderEnvironment {
@@ -44,6 +49,9 @@ impl ReaderEnvironment {
formatter: JSON_FORMATTER,
backup_dir,
allowed_chunks: Arc::new(RwLock::new(HashSet::new())),
+ connection_state: Arc::new(Mutex::new(ConnectionState {
+ client_finished: false,
+ })),
}
}
@@ -69,6 +77,16 @@ impl ReaderEnvironment {
pub fn check_chunk_access(&self, digest: [u8; 32]) -> bool {
self.allowed_chunks.read().unwrap().contains(&digest)
}
+
+ pub(crate) fn client_finished(&self) -> bool {
+ let state = self.connection_state.lock().unwrap();
+ state.client_finished
+ }
+
+ pub(crate) fn finish(&self) {
+ let mut state = self.connection_state.lock().unwrap();
+ state.client_finished = true;
+ }
}
impl RpcEnvironment for ReaderEnvironment {
diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
index 50f80de43..5744f59a0 100644
--- a/src/api2/reader/mod.rs
+++ b/src/api2/reader/mod.rs
@@ -14,7 +14,7 @@ use proxmox_router::{
http_err, list_subdirs_api_method, ApiHandler, ApiMethod, ApiResponseFuture, Permission,
Router, RpcEnvironment, SubdirMap,
};
-use proxmox_schema::{BooleanSchema, ObjectSchema};
+use proxmox_schema::{api, BooleanSchema, ObjectSchema};
use proxmox_sortable_macro::sortable;
use proxmox_sys::fs::lock_dir_noblock_shared;
@@ -192,9 +192,16 @@ fn upgrade_to_backup_reader_protocol(
http.http2_initial_connection_window_size(window_size);
http.http2_max_frame_size(4 * 1024 * 1024);
- http.serve_connection(conn, service)
- .map_err(Error::from)
- .await
+ if let Err(err) = http.serve_connection(conn, service).await {
+ // Avoid Transport endpoint is not connected (os error 107)
+ // fixme: find a better way to test for that error
+ if !(err.to_string().starts_with("connection error")
+ && env2.client_finished())
+ {
+ return Err(Error::from(err));
+ }
+ }
+ Ok(())
};
futures::select! {
@@ -228,6 +235,7 @@ const READER_API_SUBDIRS: SubdirMap = &[
"download",
&Router::new().download(&API_METHOD_DOWNLOAD_FILE),
),
+ ("finish", &Router::new().post(&API_METHOD_FINISH)),
("speedtest", &Router::new().download(&API_METHOD_SPEEDTEST)),
];
@@ -347,6 +355,17 @@ fn download_chunk(
.boxed()
}
+#[api]
+/// Signal the reader instance finished successfully
+fn finish(
+ _info: &ApiMethod,
+ rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Value, Error> {
+ let env: &ReaderEnvironment = rpcenv.as_ref();
+ env.finish();
+ Ok(Value::Null)
+}
+
/* this is too slow
fn download_chunk_old(
_parts: Parts,
--
2.39.5
* [pbs-devel] [PATCH v3 proxmox-backup 5/5] client: backup reader: call finish before dropping backup readers
2025-01-09 14:06 [pbs-devel] [PATCH v3 proxmox-backup 0/5] handle reader client disconnects Christian Ebner
` (3 preceding siblings ...)
2025-01-09 14:06 ` [pbs-devel] [PATCH v3 proxmox-backup 4/5] client: reader: add finish method to signal client state to server Christian Ebner
@ 2025-01-09 14:06 ` Christian Ebner
4 siblings, 0 replies; 6+ messages in thread
From: Christian Ebner @ 2025-01-09 14:06 UTC (permalink / raw)
To: pbs-devel
Signal to the backup server that the readers have terminated
successfully, so the server gracefully handles the disconnections and
does not log them as errors.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 2:
- not present in previous version
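
For context, the pattern applied at the various call sites below boils
down to the following sketch. read_then_finish is a made-up helper used
only for illustration; finish() is the new BackupReader method added in
the previous patch of this series, and the actual download steps are
elided:

use std::sync::Arc;

use anyhow::Error;
use pbs_client::BackupReader;

// Sketch of the pattern this patch applies: once a reader connection is
// no longer needed, call finish() so the server marks the session as
// cleanly terminated before the client drops the connection.
async fn read_then_finish(reader: Arc<BackupReader>) -> Result<(), Error> {
    // ... download manifest, indexes and chunks as before ...

    // Signal successful completion; the disconnect that follows when the
    // reader is dropped is then expected and no longer logged as an error
    // by the reader worker task on the server.
    reader.finish().await?;

    Ok(())
}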
examples/download-speed.rs | 2 ++
proxmox-backup-client/src/catalog.rs | 2 ++
proxmox-backup-client/src/main.rs | 7 +++++++
proxmox-backup-client/src/mount.rs | 3 +++
proxmox-file-restore/src/main.rs | 19 +++++++++++++-----
src/bin/proxmox_backup_debug/diff.rs | 13 ++++++++----
src/server/pull.rs | 7 ++++++-
src/server/push.rs | 2 +-
src/server/sync.rs | 30 +++++++++++++++++-----------
9 files changed, 62 insertions(+), 23 deletions(-)
diff --git a/examples/download-speed.rs b/examples/download-speed.rs
index fe700982b..3583135fb 100644
--- a/examples/download-speed.rs
+++ b/examples/download-speed.rs
@@ -62,6 +62,8 @@ async fn run() -> Result<(), Error> {
(bytes as f64) / (elapsed * 1024.0 * 1024.0)
);
+ client.finish().await?;
+
Ok(())
}
diff --git a/proxmox-backup-client/src/catalog.rs b/proxmox-backup-client/src/catalog.rs
index b1b22ff24..60c59137c 100644
--- a/proxmox-backup-client/src/catalog.rs
+++ b/proxmox-backup-client/src/catalog.rs
@@ -152,6 +152,7 @@ async fn dump_catalog(param: Value) -> Result<Value, Error> {
catalog_reader.dump()?;
record_repository(&repo);
+ client.finish().await?;
Ok(Value::Null)
}
@@ -287,6 +288,7 @@ async fn catalog_shell(param: Value) -> Result<(), Error> {
state.shell().await?;
record_repository(&repo);
+ client.finish().await?;
Ok(())
}
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index 632a29170..a19591cd9 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -999,6 +999,7 @@ async fn create_backup(
let mut catalog = None;
let mut catalog_result_rx = None;
+ let mut prev_backup_reader = None;
let log_file = |desc: &str, file: &str, target: &str| {
let what = if dry_run { "Would upload" } else { "Upload" };
@@ -1103,6 +1104,8 @@ async fn create_backup(
true,
)
.await?;
+ // Allows finishing the backup reader instance
+ prev_backup_reader = Some(backup_reader.clone());
previous_ref = prepare_reference(
&target,
manifest.clone(),
@@ -1250,6 +1253,9 @@ async fn create_backup(
.await?;
client.finish().await?;
+ if let Some(backup_reader) = prev_backup_reader {
+ backup_reader.finish().await?;
+ }
let end_time = std::time::Instant::now();
let elapsed = end_time.duration_since(start_time);
@@ -1744,6 +1750,7 @@ async fn restore(
)
.await?;
}
+ client.finish().await?;
Ok(Value::Null)
}
diff --git a/proxmox-backup-client/src/mount.rs b/proxmox-backup-client/src/mount.rs
index a5fee8329..275339a4f 100644
--- a/proxmox-backup-client/src/mount.rs
+++ b/proxmox-backup-client/src/mount.rs
@@ -299,6 +299,8 @@ async fn mount_do(param: Value, pipe: Option<OwnedFd>) -> Result<Value, Error> {
// exit on interrupted
}
}
+
+ client.finish().await?;
} else if server_archive_name.archive_type() == ArchiveType::FixedIndex {
let file_info = manifest.lookup_file_info(&server_archive_name)?;
let index = client
@@ -361,6 +363,7 @@ async fn mount_do(param: Value, pipe: Option<OwnedFd>) -> Result<Value, Error> {
}
log::info!("Image unmapped");
+ client.finish().await?;
} else {
bail!("unknown archive file extension (expected .pxar or .img)");
}
diff --git a/proxmox-file-restore/src/main.rs b/proxmox-file-restore/src/main.rs
index 572e2d188..ee83519a2 100644
--- a/proxmox-file-restore/src/main.rs
+++ b/proxmox-file-restore/src/main.rs
@@ -122,7 +122,7 @@ async fn list_files(
let (manifest, _) = client.download_manifest().await?;
manifest.check_fingerprint(crypt_config.as_ref().map(Arc::as_ref))?;
- match path {
+ let result = match path {
ExtractPath::ListArchives => {
let mut entries = vec![];
for file in manifest.files() {
@@ -207,7 +207,11 @@ async fn list_files(
};
data_list(driver, details, file, path).await
}
- }
+ };
+
+ client.finish().await?;
+
+ result
}
#[api(
@@ -487,9 +491,13 @@ async fn extract(
.await?;
let reader = if let Some(payload_archive_name) = payload_archive_name {
- let (payload_reader, payload_size) =
- get_remote_pxar_reader(&payload_archive_name, client, &manifest, crypt_config)
- .await?;
+ let (payload_reader, payload_size) = get_remote_pxar_reader(
+ &payload_archive_name,
+ client.clone(),
+ &manifest,
+ crypt_config,
+ )
+ .await?;
pxar::PxarVariant::Split(reader, (payload_reader, payload_size))
} else {
pxar::PxarVariant::Unified(reader)
@@ -542,6 +550,7 @@ async fn extract(
bail!("cannot extract '{orig_path}'");
}
}
+ client.finish().await?;
Ok(())
}
diff --git a/src/bin/proxmox_backup_debug/diff.rs b/src/bin/proxmox_backup_debug/diff.rs
index fc65f3120..4462a1187 100644
--- a/src/bin/proxmox_backup_debug/diff.rs
+++ b/src/bin/proxmox_backup_debug/diff.rs
@@ -163,8 +163,10 @@ async fn diff_archive(
compare_contents: bool,
output_params: &OutputParams,
) -> Result<(), Error> {
- let (index_a, accessor_a) = open_dynamic_index(snapshot_a, file_name, repo_params).await?;
- let (index_b, accessor_b) = open_dynamic_index(snapshot_b, file_name, repo_params).await?;
+ let (index_a, accessor_a, backup_reader_a) =
+ open_dynamic_index(snapshot_a, file_name, repo_params).await?;
+ let (index_b, accessor_b, backup_reader_b) =
+ open_dynamic_index(snapshot_b, file_name, repo_params).await?;
// vecs of chunk digests, in their correct order
let chunks_a = chunk_digests_for_index(&index_a);
@@ -217,6 +219,9 @@ async fn diff_archive(
show_file_list(&added_files, &deleted_files, &modified_files, output_params)?;
+ backup_reader_a.finish().await?;
+ backup_reader_b.finish().await?;
+
Ok(())
}
@@ -248,7 +253,7 @@ async fn open_dynamic_index(
snapshot: &str,
archive_name: &BackupArchiveName,
params: &RepoParams,
-) -> Result<(DynamicIndexReader, Accessor), Error> {
+) -> Result<(DynamicIndexReader, Accessor, Arc<BackupReader>), Error> {
let backup_dir = match snapshot.parse::<BackupPart>()? {
BackupPart::Dir(dir) => dir,
BackupPart::Group(_group) => {
@@ -291,7 +296,7 @@ async fn open_dynamic_index(
let reader: Arc<dyn ReadAt + Send + Sync> = Arc::new(LocalDynamicReadAt::new(reader));
let accessor = Accessor::new(pxar::PxarVariant::Unified(reader), archive_size).await?;
- Ok((lookup_index, accessor))
+ Ok((lookup_index, accessor, backup_reader))
}
/// Get a list of chunk digests for an index file.
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 516abfe5d..1b914501f 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -608,13 +608,18 @@ async fn pull_group(
.store
.backup_dir(target_ns.clone(), from_snapshot.clone())?;
- let reader = params
+ let (reader, backup_reader) = params
.source
.reader(source_namespace, &from_snapshot)
.await?;
let result =
pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone(), corrupt).await;
+ if let Some(backup_reader) = backup_reader {
+ // ignore errors
+ let _result = backup_reader.finish().await;
+ }
+
progress.done_snapshots = pos as u64 + 1;
info!("percentage done: {progress}");
diff --git a/src/server/push.rs b/src/server/push.rs
index 6498f316b..c326bad1f 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -787,7 +787,7 @@ pub(crate) async fn push_snapshot(
.backup_dir(namespace.clone(), snapshot.clone())?;
// Reader locks the snapshot
- let reader = params.source.reader(namespace, snapshot).await?;
+ let (reader, _) = params.source.reader(namespace, snapshot).await?;
// Does not lock the manifest, but the reader already assures a locked snapshot
let source_manifest = match backup_dir.load_manifest() {
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 0bd7a7a85..42ddd967f 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -255,7 +255,7 @@ pub(crate) trait SyncSource: Send + Sync {
&self,
ns: &BackupNamespace,
dir: &BackupDir,
- ) -> Result<Arc<dyn SyncSourceReader>, Error>;
+ ) -> Result<(Arc<dyn SyncSourceReader>, Option<Arc<BackupReader>>), Error>;
}
pub(crate) struct RemoteSource {
@@ -402,13 +402,16 @@ impl SyncSource for RemoteSource {
&self,
ns: &BackupNamespace,
dir: &BackupDir,
- ) -> Result<Arc<dyn SyncSourceReader>, Error> {
+ ) -> Result<(Arc<dyn SyncSourceReader>, Option<Arc<BackupReader>>), Error> {
let backup_reader =
BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
- Ok(Arc::new(RemoteSourceReader {
- backup_reader,
- dir: dir.clone(),
- }))
+ Ok((
+ Arc::new(RemoteSourceReader {
+ backup_reader: backup_reader.clone(),
+ dir: dir.clone(),
+ }),
+ Some(backup_reader),
+ ))
}
}
@@ -475,18 +478,21 @@ impl SyncSource for LocalSource {
&self,
ns: &BackupNamespace,
dir: &BackupDir,
- ) -> Result<Arc<dyn SyncSourceReader>, Error> {
+ ) -> Result<(Arc<dyn SyncSourceReader>, Option<Arc<BackupReader>>), Error> {
let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
&dir.full_path(),
"snapshot",
"locked by another operation",
)?;
- Ok(Arc::new(LocalSourceReader {
- _dir_lock: Arc::new(Mutex::new(dir_lock)),
- path: dir.full_path(),
- datastore: dir.datastore().clone(),
- }))
+ Ok((
+ Arc::new(LocalSourceReader {
+ _dir_lock: Arc::new(Mutex::new(dir_lock)),
+ path: dir.full_path(),
+ datastore: dir.datastore().clone(),
+ }),
+ None,
+ ))
}
}
--
2.39.5