* [pbs-devel] [PATCH v5 proxmox-backup] client: pxar: fix race in pxar backup stream
@ 2025-01-24 12:46 Christian Ebner
2025-02-11 11:30 ` [pbs-devel] applied: " Fabian Grünbichler
0 siblings, 1 reply; 2+ messages in thread
From: Christian Ebner @ 2025-01-24 12:46 UTC (permalink / raw)
To: pbs-devel
Fixes a race condition where the backup upload stream can miss an
error returned by pxar::create_archive, because the error state is
only set after the backup stream was already polled.
On instantiation, `PxarBackupStream` spawns a future handling the
pxar archive creation, which sends the encoded pxar archive stream
(or streams in case of split archives) through a channel, received
by the pxar backup stream on polling.
In case this channel is closed as signaled by returning an error, the
poll logic will propagate an eventual error occurred during pxar
creation by taking it from the `PxarBackupStream`.
As this error might not have been set just yet, this can lead to
incorrectly terminating a backup snapshot with success, eventhough an
error occurred.
To fix this, introduce a dedicated notifier for each stream instance
and wait for the archiver to signal it has finished via this
notification channel. In addition, extend the `PxarBackupStream` by a
`finished` flag to allow early return on subsequent polls, which
would otherwise block, waiting for a new notification.
In case of premature termination of the pxar backup stream, no
additional measures have to been taken, as the abort handle already
terminates the archive creation.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
Changes since version 4 (thanks a lot Fabian for comments!):
- Drop unneeded shared state, only store error
- Use dedicated notification channel instances per stream, so each
channel gets notified for sure.
- Use `finished` flag per `PxarBackupStream` instance, allowing on early
return if the archiver finish has already been seen, avoiding to block
again on awaiting a notification.
pbs-client/src/pxar_backup_stream.rs | 34 +++++++++++++++++++++++++---
1 file changed, 31 insertions(+), 3 deletions(-)
diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs
index 2bfb5cf29..1303e8503 100644
--- a/pbs-client/src/pxar_backup_stream.rs
+++ b/pbs-client/src/pxar_backup_stream.rs
@@ -11,6 +11,7 @@ use futures::stream::Stream;
use nix::dir::Dir;
use nix::fcntl::OFlag;
use nix::sys::stat::Mode;
+use tokio::sync::Notify;
use proxmox_async::blocking::TokioWriterAdapter;
use proxmox_io::StdChannelWriter;
@@ -31,6 +32,8 @@ pub struct PxarBackupStream {
pub suggested_boundaries: Option<std::sync::mpsc::Receiver<u64>>,
handle: Option<AbortHandle>,
error: Arc<Mutex<Option<Error>>>,
+ finished: bool,
+ archiver_finished_notification: Arc<Notify>,
}
impl Drop for PxarBackupStream {
@@ -80,6 +83,10 @@ impl PxarBackupStream {
let error = Arc::new(Mutex::new(None));
let error2 = Arc::clone(&error);
+ let stream_notifier = Arc::new(Notify::new());
+ let stream_notification_receiver = stream_notifier.clone();
+ let payload_stream_notifier = Arc::new(Notify::new());
+ let payload_stream_notification_receiver = payload_stream_notifier.clone();
let handler = async move {
if let Err(err) = crate::pxar::create_archive(
dir,
@@ -101,6 +108,10 @@ impl PxarBackupStream {
let mut error = error2.lock().unwrap();
*error = Some(err);
}
+
+ // Notify upload streams that archiver is finished (with or without error)
+ stream_notifier.notify_one();
+ payload_stream_notifier.notify_one();
};
let (handle, registration) = AbortHandle::new_pair();
@@ -112,6 +123,8 @@ impl PxarBackupStream {
suggested_boundaries: None,
handle: Some(handle.clone()),
error: Arc::clone(&error),
+ finished: false,
+ archiver_finished_notification: stream_notification_receiver,
};
let backup_payload_stream = payload_rx.map(|rx| Self {
@@ -119,6 +132,8 @@ impl PxarBackupStream {
suggested_boundaries: suggested_boundaries_rx,
handle: Some(handle),
error,
+ finished: false,
+ archiver_finished_notification: payload_stream_notification_receiver,
});
Ok((backup_stream, backup_payload_stream))
@@ -141,18 +156,31 @@ impl Stream for PxarBackupStream {
type Item = Result<Vec<u8>, Error>;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
+ let this = self.get_mut();
+ if this.finished {
+ // Channel has already been finished and eventual errors propagated,
+ // early return to avoid blocking on further archiver finished notifications
+ // by subsequent polls.
+ return Poll::Ready(None);
+ }
{
// limit lock scope
- let mut error = self.error.lock().unwrap();
+ let mut error = this.error.lock().unwrap();
if let Some(err) = error.take() {
return Poll::Ready(Some(Err(err)));
}
}
- match proxmox_async::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
+ match proxmox_async::runtime::block_in_place(|| this.rx.as_ref().unwrap().recv()) {
Ok(data) => Poll::Ready(Some(data)),
Err(_) => {
- let mut error = self.error.lock().unwrap();
+ // Wait for archiver to finish
+ proxmox_async::runtime::block_on(this.archiver_finished_notification.notified());
+ // Never block for archiver finished notification on subsequent calls.
+ // Eventual error will already have been propagated.
+ this.finished = true;
+
+ let mut error = this.error.lock().unwrap();
if let Some(err) = error.take() {
return Poll::Ready(Some(Err(err)));
}
--
2.39.5
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 2+ messages in thread
* [pbs-devel] applied: [PATCH v5 proxmox-backup] client: pxar: fix race in pxar backup stream
2025-01-24 12:46 [pbs-devel] [PATCH v5 proxmox-backup] client: pxar: fix race in pxar backup stream Christian Ebner
@ 2025-02-11 11:30 ` Fabian Grünbichler
0 siblings, 0 replies; 2+ messages in thread
From: Fabian Grünbichler @ 2025-02-11 11:30 UTC (permalink / raw)
To: Proxmox Backup Server development discussion
On January 24, 2025 1:46 pm, Christian Ebner wrote:
> Fixes a race condition where the backup upload stream can miss an
> error returned by pxar::create_archive, because the error state is
> only set after the backup stream was already polled.
>
> On instantiation, `PxarBackupStream` spawns a future handling the
> pxar archive creation, which sends the encoded pxar archive stream
> (or streams in case of split archives) through a channel, received
> by the pxar backup stream on polling.
>
> In case this channel is closed as signaled by returning an error, the
> poll logic will propagate an eventual error occurred during pxar
> creation by taking it from the `PxarBackupStream`.
>
> As this error might not have been set just yet, this can lead to
> incorrectly terminating a backup snapshot with success, eventhough an
> error occurred.
>
> To fix this, introduce a dedicated notifier for each stream instance
> and wait for the archiver to signal it has finished via this
> notification channel. In addition, extend the `PxarBackupStream` by a
> `finished` flag to allow early return on subsequent polls, which
> would otherwise block, waiting for a new notification.
>
> In case of premature termination of the pxar backup stream, no
> additional measures have to been taken, as the abort handle already
> terminates the archive creation.
>
> Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
> ---
> Changes since version 4 (thanks a lot Fabian for comments!):
> - Drop unneeded shared state, only store error
> - Use dedicated notification channel instances per stream, so each
> channel gets notified for sure.
> - Use `finished` flag per `PxarBackupStream` instance, allowing on early
> return if the archiver finish has already been seen, avoiding to block
> again on awaiting a notification.
>
> pbs-client/src/pxar_backup_stream.rs | 34 +++++++++++++++++++++++++---
> 1 file changed, 31 insertions(+), 3 deletions(-)
>
> diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs
> index 2bfb5cf29..1303e8503 100644
> --- a/pbs-client/src/pxar_backup_stream.rs
> +++ b/pbs-client/src/pxar_backup_stream.rs
> @@ -11,6 +11,7 @@ use futures::stream::Stream;
> use nix::dir::Dir;
> use nix::fcntl::OFlag;
> use nix::sys::stat::Mode;
> +use tokio::sync::Notify;
>
> use proxmox_async::blocking::TokioWriterAdapter;
> use proxmox_io::StdChannelWriter;
> @@ -31,6 +32,8 @@ pub struct PxarBackupStream {
> pub suggested_boundaries: Option<std::sync::mpsc::Receiver<u64>>,
> handle: Option<AbortHandle>,
> error: Arc<Mutex<Option<Error>>>,
> + finished: bool,
> + archiver_finished_notification: Arc<Notify>,
> }
>
> impl Drop for PxarBackupStream {
> @@ -80,6 +83,10 @@ impl PxarBackupStream {
>
> let error = Arc::new(Mutex::new(None));
> let error2 = Arc::clone(&error);
> + let stream_notifier = Arc::new(Notify::new());
> + let stream_notification_receiver = stream_notifier.clone();
> + let payload_stream_notifier = Arc::new(Notify::new());
> + let payload_stream_notification_receiver = payload_stream_notifier.clone();
> let handler = async move {
> if let Err(err) = crate::pxar::create_archive(
> dir,
> @@ -101,6 +108,10 @@ impl PxarBackupStream {
> let mut error = error2.lock().unwrap();
> *error = Some(err);
> }
> +
> + // Notify upload streams that archiver is finished (with or without error)
> + stream_notifier.notify_one();
> + payload_stream_notifier.notify_one();
> };
>
> let (handle, registration) = AbortHandle::new_pair();
> @@ -112,6 +123,8 @@ impl PxarBackupStream {
> suggested_boundaries: None,
> handle: Some(handle.clone()),
> error: Arc::clone(&error),
> + finished: false,
> + archiver_finished_notification: stream_notification_receiver,
> };
>
> let backup_payload_stream = payload_rx.map(|rx| Self {
> @@ -119,6 +132,8 @@ impl PxarBackupStream {
> suggested_boundaries: suggested_boundaries_rx,
> handle: Some(handle),
> error,
> + finished: false,
> + archiver_finished_notification: payload_stream_notification_receiver,
> });
>
> Ok((backup_stream, backup_payload_stream))
> @@ -141,18 +156,31 @@ impl Stream for PxarBackupStream {
> type Item = Result<Vec<u8>, Error>;
>
> fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
> + let this = self.get_mut();
> + if this.finished {
> + // Channel has already been finished and eventual errors propagated,
> + // early return to avoid blocking on further archiver finished notifications
> + // by subsequent polls.
> + return Poll::Ready(None);
> + }
> {
> // limit lock scope
> - let mut error = self.error.lock().unwrap();
> + let mut error = this.error.lock().unwrap();
> if let Some(err) = error.take() {
> return Poll::Ready(Some(Err(err)));
> }
> }
>
> - match proxmox_async::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
> + match proxmox_async::runtime::block_in_place(|| this.rx.as_ref().unwrap().recv()) {
> Ok(data) => Poll::Ready(Some(data)),
> Err(_) => {
> - let mut error = self.error.lock().unwrap();
> + // Wait for archiver to finish
> + proxmox_async::runtime::block_on(this.archiver_finished_notification.notified());
> + // Never block for archiver finished notification on subsequent calls.
> + // Eventual error will already have been propagated.
> + this.finished = true;
> +
> + let mut error = this.error.lock().unwrap();
> if let Some(err) = error.take() {
> return Poll::Ready(Some(Err(err)));
> }
> --
> 2.39.5
>
>
>
> _______________________________________________
> pbs-devel mailing list
> pbs-devel@lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
>
>
>
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 2+ messages in thread
end of thread, other threads:[~2025-02-11 11:31 UTC | newest]
Thread overview: 2+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2025-01-24 12:46 [pbs-devel] [PATCH v5 proxmox-backup] client: pxar: fix race in pxar backup stream Christian Ebner
2025-02-11 11:30 ` [pbs-devel] applied: " Fabian Grünbichler
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox