From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <pbs-devel-bounces@lists.proxmox.com>
Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68])
	by lore.proxmox.com (Postfix) with ESMTPS id 3E1061FF15E
	for <inbox@lore.proxmox.com>; Tue, 11 Feb 2025 12:31:23 +0100 (CET)
Received: from firstgate.proxmox.com (localhost [127.0.0.1])
	by firstgate.proxmox.com (Proxmox) with ESMTP id 66A8A29916;
	Tue, 11 Feb 2025 12:31:20 +0100 (CET)
Date: Tue, 11 Feb 2025 12:30:43 +0100
From: Fabian =?iso-8859-1?q?Gr=FCnbichler?= <f.gruenbichler@proxmox.com>
To: Proxmox Backup Server development discussion <pbs-devel@lists.proxmox.com>
References: <20250124124635.291858-1-c.ebner@proxmox.com>
In-Reply-To: <20250124124635.291858-1-c.ebner@proxmox.com>
MIME-Version: 1.0
User-Agent: astroid/0.16.0 (https://github.com/astroidmail/astroid)
Message-Id: <1739273353.vtm1zn5r0f.astroid@yuna.none>
X-SPAM-LEVEL: Spam detection results:  0
 AWL 0.044 Adjusted score from AWL reputation of From: address
 BAYES_00                 -1.9 Bayes spam probability is 0 to 1%
 DMARC_MISSING             0.1 Missing DMARC policy
 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment
 RCVD_IN_VALIDITY_CERTIFIED_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to
 Validity was blocked. See
 https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more
 information.
 RCVD_IN_VALIDITY_RPBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to
 Validity was blocked. See
 https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more
 information.
 RCVD_IN_VALIDITY_SAFE_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to
 Validity was blocked. See
 https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more
 information.
 SPF_HELO_NONE           0.001 SPF: HELO does not publish an SPF Record
 SPF_PASS               -0.001 SPF: sender matches SPF record
 URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See
 http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more
 information. [proxmox.com]
Subject: [pbs-devel] applied: [PATCH v5 proxmox-backup] client: pxar: fix
 race in pxar backup stream
X-BeenThere: pbs-devel@lists.proxmox.com
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: Proxmox Backup Server development discussion
 <pbs-devel.lists.proxmox.com>
List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=unsubscribe>
List-Archive: <http://lists.proxmox.com/pipermail/pbs-devel/>
List-Post: <mailto:pbs-devel@lists.proxmox.com>
List-Help: <mailto:pbs-devel-request@lists.proxmox.com?subject=help>
List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=subscribe>
Reply-To: Proxmox Backup Server development discussion
 <pbs-devel@lists.proxmox.com>
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: 7bit
Errors-To: pbs-devel-bounces@lists.proxmox.com
Sender: "pbs-devel" <pbs-devel-bounces@lists.proxmox.com>

On January 24, 2025 1:46 pm, Christian Ebner wrote:
> Fixes a race condition where the backup upload stream can miss an
> error returned by pxar::create_archive, because the error state is
> only set after the backup stream was already polled.
> 
> On instantiation, `PxarBackupStream` spawns a future handling the
> pxar archive creation, which sends the encoded pxar archive stream
> (or streams in case of split archives) through a channel, received
> by the pxar backup stream on polling.
> 
> If this channel is closed, which `recv()` signals by returning an
> error, the poll logic propagates any error that occurred during pxar
> creation by taking it from the `PxarBackupStream`.
> 
> As this error might not have been set yet, the backup snapshot can
> incorrectly be finished with success, even though an error
> occurred.
> 
> To fix this, introduce a dedicated notifier for each stream instance
> and wait for the archiver to signal it has finished via this
> notification channel. In addition, extend the `PxarBackupStream` by a
> `finished` flag to allow early return on subsequent polls, which
> would otherwise block, waiting for a new notification.
> 
> In case of premature termination of the pxar backup stream, no
> additional measures have to be taken, as the abort handle already
> terminates the archive creation.
> 
> Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
> ---
> Changes since version 4 (thanks a lot Fabian for comments!):
> - Drop unneeded shared state, only store error
> - Use dedicated notification channel instances per stream, so each
>   channel gets notified for sure.
> - Use a `finished` flag per `PxarBackupStream` instance, allowing an
>   early return if the archiver finish has already been seen, which
>   avoids blocking again while awaiting a notification.
> 
>  pbs-client/src/pxar_backup_stream.rs | 34 +++++++++++++++++++++++++---
>  1 file changed, 31 insertions(+), 3 deletions(-)
> 
> diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs
> index 2bfb5cf29..1303e8503 100644
> --- a/pbs-client/src/pxar_backup_stream.rs
> +++ b/pbs-client/src/pxar_backup_stream.rs
> @@ -11,6 +11,7 @@ use futures::stream::Stream;
>  use nix::dir::Dir;
>  use nix::fcntl::OFlag;
>  use nix::sys::stat::Mode;
> +use tokio::sync::Notify;
>  
>  use proxmox_async::blocking::TokioWriterAdapter;
>  use proxmox_io::StdChannelWriter;
> @@ -31,6 +32,8 @@ pub struct PxarBackupStream {
>      pub suggested_boundaries: Option<std::sync::mpsc::Receiver<u64>>,
>      handle: Option<AbortHandle>,
>      error: Arc<Mutex<Option<Error>>>,
> +    finished: bool,
> +    archiver_finished_notification: Arc<Notify>,
>  }
>  
>  impl Drop for PxarBackupStream {
> @@ -80,6 +83,10 @@ impl PxarBackupStream {
>  
>          let error = Arc::new(Mutex::new(None));
>          let error2 = Arc::clone(&error);
> +        let stream_notifier = Arc::new(Notify::new());
> +        let stream_notification_receiver = stream_notifier.clone();
> +        let payload_stream_notifier = Arc::new(Notify::new());
> +        let payload_stream_notification_receiver = payload_stream_notifier.clone();
>          let handler = async move {
>              if let Err(err) = crate::pxar::create_archive(
>                  dir,
> @@ -101,6 +108,10 @@ impl PxarBackupStream {
>                  let mut error = error2.lock().unwrap();
>                  *error = Some(err);
>              }
> +
> +            // Notify upload streams that archiver is finished (with or without error)
> +            stream_notifier.notify_one();
> +            payload_stream_notifier.notify_one();
>          };
>  
>          let (handle, registration) = AbortHandle::new_pair();
> @@ -112,6 +123,8 @@ impl PxarBackupStream {
>              suggested_boundaries: None,
>              handle: Some(handle.clone()),
>              error: Arc::clone(&error),
> +            finished: false,
> +            archiver_finished_notification: stream_notification_receiver,
>          };
>  
>          let backup_payload_stream = payload_rx.map(|rx| Self {
> @@ -119,6 +132,8 @@ impl PxarBackupStream {
>              suggested_boundaries: suggested_boundaries_rx,
>              handle: Some(handle),
>              error,
> +            finished: false,
> +            archiver_finished_notification: payload_stream_notification_receiver,
>          });
>  
>          Ok((backup_stream, backup_payload_stream))
> @@ -141,18 +156,31 @@ impl Stream for PxarBackupStream {
>      type Item = Result<Vec<u8>, Error>;
>  
>      fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
> +        let this = self.get_mut();
> +        if this.finished {
> +            // Channel has already been finished and eventual errors propagated,
> +            // early return to avoid blocking on further archiver finished notifications
> +            // by subsequent polls.
> +            return Poll::Ready(None);
> +        }
>          {
>              // limit lock scope
> -            let mut error = self.error.lock().unwrap();
> +            let mut error = this.error.lock().unwrap();
>              if let Some(err) = error.take() {
>                  return Poll::Ready(Some(Err(err)));
>              }
>          }
>  
> -        match proxmox_async::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
> +        match proxmox_async::runtime::block_in_place(|| this.rx.as_ref().unwrap().recv()) {
>              Ok(data) => Poll::Ready(Some(data)),
>              Err(_) => {
> -                let mut error = self.error.lock().unwrap();
> +                // Wait for archiver to finish
> +                proxmox_async::runtime::block_on(this.archiver_finished_notification.notified());
> +                // Never block for archiver finished notification on subsequent calls.
> +                // Eventual error will already have been propagated.
> +                this.finished = true;
> +
> +                let mut error = this.error.lock().unwrap();
>                  if let Some(err) = error.take() {
>                      return Poll::Ready(Some(Err(err)));
>                  }
> -- 
> 2.39.5
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel@lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 
> 
> 
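For readers of the archive, the ordering problem described in the commit
message can be sketched in isolation. This is only an illustration, not the
applied code: it uses std threads with a Mutex/Condvar pair as a stand-in
for `tokio::sync::Notify` so that it runs without external crates, and the
names `FinishedSignal` and `drain` are hypothetical. The 50 ms sleep is an
assumption that merely widens the window between the channel closing and
the error being stored.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for `tokio::sync::Notify`: a one-shot "finished" flag.
#[derive(Default)]
struct FinishedSignal {
    done: Mutex<bool>,
    cond: Condvar,
}

impl FinishedSignal {
    /// Mark the producer as finished and wake any waiter.
    fn notify(&self) {
        *self.done.lock().unwrap() = true;
        self.cond.notify_all();
    }

    /// Block until `notify` has been called (returns at once if it already was).
    fn wait(&self) {
        let mut done = self.done.lock().unwrap();
        while !*done {
            done = self.cond.wait(done).unwrap();
        }
    }
}

/// Drain the channel, then read the producer's error state.
/// With `wait_for_archiver == false` the error may be missed (the race).
fn drain(wait_for_archiver: bool) -> (Vec<u8>, Option<String>) {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();
    let error = Arc::new(Mutex::new(None::<String>));
    let finished = Arc::new(FinishedSignal::default());

    let (error2, finished2) = (Arc::clone(&error), Arc::clone(&finished));
    let archiver = thread::spawn(move || {
        tx.send(vec![1, 2, 3]).unwrap();
        // The channel closes first ...
        drop(tx);
        thread::sleep(Duration::from_millis(50));
        // ... and the error is only stored afterwards.
        *error2.lock().unwrap() = Some("archive creation failed".to_string());
        // Signal completion, with or without error.
        finished2.notify();
    });

    let mut data = Vec::new();
    while let Ok(chunk) = rx.recv() {
        data.extend(chunk);
    }
    // recv() failed, so the channel is closed. Wait for the producer to
    // finish before trusting the error state.
    if wait_for_archiver {
        finished.wait();
    }
    let err = error.lock().unwrap().take();
    archiver.join().unwrap();
    (data, err)
}

fn main() {
    let (data, err) = drain(true);
    println!("{} bytes received, error: {:?}", data.len(), err);
}
```

Calling `drain(false)` skips the wait and will typically report no error,
which corresponds to the snapshot being finished with success by mistake;
that outcome depends on timing, so it is not asserted here.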
