public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: "Fabian Grünbichler" <f.gruenbichler@proxmox.com>
To: Proxmox Backup Server development discussion
	<pbs-devel@lists.proxmox.com>
Subject: [pbs-devel] applied: [PATCH v5 proxmox-backup] client: pxar: fix race in pxar backup stream
Date: Tue, 11 Feb 2025 12:30:43 +0100	[thread overview]
Message-ID: <1739273353.vtm1zn5r0f.astroid@yuna.none> (raw)
In-Reply-To: <20250124124635.291858-1-c.ebner@proxmox.com>

On January 24, 2025 1:46 pm, Christian Ebner wrote:
> Fixes a race condition where the backup upload stream can miss an
> error returned by pxar::create_archive, because the error state is
> only set after the backup stream was already polled.
> 
> On instantiation, `PxarBackupStream` spawns a future handling the
> pxar archive creation, which sends the encoded pxar archive stream
> (or streams in case of split archives) through a channel, received
> by the pxar backup stream on polling.
> 
> In case this channel is closed as signaled by returning an error, the
> poll logic will propagate an eventual error occurred during pxar
> creation by taking it from the `PxarBackupStream`.
> 
> As this error might not have been set just yet, this can lead to
> incorrectly terminating a backup snapshot with success, eventhough an
> error occurred.
> 
> To fix this, introduce a dedicated notifier for each stream instance
> and wait for the archiver to signal it has finished via this
> notification channel. In addition, extend the `PxarBackupStream` by a
> `finished` flag to allow early return on subsequent polls, which
> would otherwise block, waiting for a new notification.
> 
> In case of premature termination of the pxar backup stream, no
> additional measures have to been taken, as the abort handle already
> terminates the archive creation.
> 
> Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
> ---
> Changes since version 4 (thanks a lot Fabian for comments!):
> - Drop unneeded shared state, only store error
> - Use dedicated notification channel instances per stream, so each
>   channel gets notified for sure.
> - Use `finished` flag per `PxarBackupStream` instance, allowing on early
>   return if the archiver finish has already been seen, avoiding to block
>   again on awaiting a notification.
> 
>  pbs-client/src/pxar_backup_stream.rs | 34 +++++++++++++++++++++++++---
>  1 file changed, 31 insertions(+), 3 deletions(-)
> 
> diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs
> index 2bfb5cf29..1303e8503 100644
> --- a/pbs-client/src/pxar_backup_stream.rs
> +++ b/pbs-client/src/pxar_backup_stream.rs
> @@ -11,6 +11,7 @@ use futures::stream::Stream;
>  use nix::dir::Dir;
>  use nix::fcntl::OFlag;
>  use nix::sys::stat::Mode;
> +use tokio::sync::Notify;
>  
>  use proxmox_async::blocking::TokioWriterAdapter;
>  use proxmox_io::StdChannelWriter;
> @@ -31,6 +32,8 @@ pub struct PxarBackupStream {
>      pub suggested_boundaries: Option<std::sync::mpsc::Receiver<u64>>,
>      handle: Option<AbortHandle>,
>      error: Arc<Mutex<Option<Error>>>,
> +    finished: bool,
> +    archiver_finished_notification: Arc<Notify>,
>  }
>  
>  impl Drop for PxarBackupStream {
> @@ -80,6 +83,10 @@ impl PxarBackupStream {
>  
>          let error = Arc::new(Mutex::new(None));
>          let error2 = Arc::clone(&error);
> +        let stream_notifier = Arc::new(Notify::new());
> +        let stream_notification_receiver = stream_notifier.clone();
> +        let payload_stream_notifier = Arc::new(Notify::new());
> +        let payload_stream_notification_receiver = payload_stream_notifier.clone();
>          let handler = async move {
>              if let Err(err) = crate::pxar::create_archive(
>                  dir,
> @@ -101,6 +108,10 @@ impl PxarBackupStream {
>                  let mut error = error2.lock().unwrap();
>                  *error = Some(err);
>              }
> +
> +            // Notify upload streams that archiver is finished (with or without error)
> +            stream_notifier.notify_one();
> +            payload_stream_notifier.notify_one();
>          };
>  
>          let (handle, registration) = AbortHandle::new_pair();
> @@ -112,6 +123,8 @@ impl PxarBackupStream {
>              suggested_boundaries: None,
>              handle: Some(handle.clone()),
>              error: Arc::clone(&error),
> +            finished: false,
> +            archiver_finished_notification: stream_notification_receiver,
>          };
>  
>          let backup_payload_stream = payload_rx.map(|rx| Self {
> @@ -119,6 +132,8 @@ impl PxarBackupStream {
>              suggested_boundaries: suggested_boundaries_rx,
>              handle: Some(handle),
>              error,
> +            finished: false,
> +            archiver_finished_notification: payload_stream_notification_receiver,
>          });
>  
>          Ok((backup_stream, backup_payload_stream))
> @@ -141,18 +156,31 @@ impl Stream for PxarBackupStream {
>      type Item = Result<Vec<u8>, Error>;
>  
>      fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
> +        let this = self.get_mut();
> +        if this.finished {
> +            // Channel has already been finished and eventual errors propagated,
> +            // early return to avoid blocking on further archiver finished notifications
> +            // by subsequent polls.
> +            return Poll::Ready(None);
> +        }
>          {
>              // limit lock scope
> -            let mut error = self.error.lock().unwrap();
> +            let mut error = this.error.lock().unwrap();
>              if let Some(err) = error.take() {
>                  return Poll::Ready(Some(Err(err)));
>              }
>          }
>  
> -        match proxmox_async::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
> +        match proxmox_async::runtime::block_in_place(|| this.rx.as_ref().unwrap().recv()) {
>              Ok(data) => Poll::Ready(Some(data)),
>              Err(_) => {
> -                let mut error = self.error.lock().unwrap();
> +                // Wait for archiver to finish
> +                proxmox_async::runtime::block_on(this.archiver_finished_notification.notified());
> +                // Never block for archiver finished notification on subsequent calls.
> +                // Eventual error will already have been propagated.
> +                this.finished = true;
> +
> +                let mut error = this.error.lock().unwrap();
>                  if let Some(err) = error.take() {
>                      return Poll::Ready(Some(Err(err)));
>                  }
> -- 
> 2.39.5
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel@lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 
> 
> 


_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


      reply	other threads:[~2025-02-11 11:31 UTC|newest]

Thread overview: 2+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2025-01-24 12:46 [pbs-devel] " Christian Ebner
2025-02-11 11:30 ` Fabian Grünbichler [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1739273353.vtm1zn5r0f.astroid@yuna.none \
    --to=f.gruenbichler@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal