From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <pbs-devel-bounces@lists.proxmox.com>
Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9])
	by lore.proxmox.com (Postfix) with ESMTPS id 375871FF176
	for <inbox@lore.proxmox.com>; Sat,  7 Dec 2024 12:08:23 +0100 (CET)
Received: from firstgate.proxmox.com (localhost [127.0.0.1])
	by firstgate.proxmox.com (Proxmox) with ESMTP id BE6C61276C;
	Sat,  7 Dec 2024 12:08:23 +0100 (CET)
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Sat,  7 Dec 2024 12:07:20 +0100
Message-Id: <20241207110720.205989-1-c.ebner@proxmox.com>
X-Mailer: git-send-email 2.39.5
MIME-Version: 1.0
Subject: [pbs-devel] [PATCH v4 proxmox-backup] client: pxar: fix race in
 pxar backup stream
X-BeenThere: pbs-devel@lists.proxmox.com
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: Proxmox Backup Server development discussion
 <pbs-devel.lists.proxmox.com>
List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=unsubscribe>
List-Archive: <http://lists.proxmox.com/pipermail/pbs-devel/>
List-Post: <mailto:pbs-devel@lists.proxmox.com>
List-Help: <mailto:pbs-devel-request@lists.proxmox.com?subject=help>
List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=subscribe>
Reply-To: Proxmox Backup Server development discussion
 <pbs-devel@lists.proxmox.com>
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: 7bit
Errors-To: pbs-devel-bounces@lists.proxmox.com
Sender: "pbs-devel" <pbs-devel-bounces@lists.proxmox.com>

Fixes a race condition where the backup upload stream can miss an
error returned by pxar::create_archive, because the error state is
only set after the backup stream was already polled.

On instantiation, `PxarBackupStream` spawns a future handling the
pxar archive creation, which sends the encoded pxar archive stream
(or streams in case of split archives) through a channel, received
by the pxar backup stream on polling.

If this channel is closed, which is signaled by the receive call
returning an error, the poll logic propagates any error that occurred
during pxar creation by taking it from the `PxarBackupStream`.

As this error might not have been set yet, this can lead to
incorrectly terminating a backup snapshot with success, even though
an error occurred.

To fix this, introduce `ArchiverState` to hold a finished flag as
well as the error, and add a notification channel allowing the
archiver future to signal the waiting stream. As the notification
waiter would block on subsequent polls even if it has already been
notified about the archive creation finishing, or might not have been
registered yet when the notification was sent out, only block and
wait for notifications if the finished flag in the `ArchiverState` is
not set. If it is set, there is no need to wait for a notification,
as the archiver is guaranteed to have finished.

In case of premature termination of the pxar backup stream, no
additional measures have to be taken, as the abort handle already
terminates the archive creation.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 3:
- fix a possible deadlock encountered during further testing by
  strictly limiting the archiver state's mutex lock scope.
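
For illustration only, a minimal sketch of the lock scope limiting
mentioned above. It uses only the standard library; the reduced
`ArchiverState` and the helpers `is_finished`, `lock_is_free` and
`lock_scope_demo` are hypothetical names made up for this example and
are not part of the patch. The idea is that the guard should be gone
before anything blocks, so that the other side can still take the
lock.

```rust
use std::sync::Mutex;
use std::thread;

// Reduced stand-in for the `ArchiverState` of the patch (assumption:
// only the flag is needed to show the locking pattern).
struct ArchiverState {
    finished: bool,
}

// Read the flag while holding the lock only for this one statement.
// The guard is a temporary and is released by the end of the `let`.
fn is_finished(state: &Mutex<ArchiverState>) -> bool {
    let finished = { state.lock().unwrap().finished };
    finished
}

// Probe from a second thread whether the mutex can be taken right now.
fn lock_is_free(state: &Mutex<ArchiverState>) -> bool {
    thread::scope(|s| s.spawn(|| state.try_lock().is_ok()).join().unwrap())
}

// Returns (free while a guard is held, free after the scoped read).
fn lock_scope_demo() -> (bool, bool) {
    let state = Mutex::new(ArchiverState { finished: false });

    let free_while_guard_held = {
        // A long-lived guard: anything blocking in here would keep the
        // other side from updating the state.
        let _guard = state.lock().unwrap();
        lock_is_free(&state)
    };

    // The scoped read leaves no guard behind.
    let _finished = is_finished(&state);
    let free_after_scoped_read = lock_is_free(&state);

    (free_while_guard_held, free_after_scoped_read)
}
```

Calling `lock_scope_demo()` yields `(false, true)`: the mutex is
unavailable while a guard is alive and available again right after the
scoped read.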

 pbs-client/src/pxar_backup_stream.rs | 61 +++++++++++++++++++++-------
 1 file changed, 47 insertions(+), 14 deletions(-)
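
For readers who want to try the finished flag handshake from the
commit message in isolation, here is a rough, dependency-free sketch.
Note that it swaps `tokio::sync::Notify` for `std::sync::Condvar` and
uses a plain thread instead of the archiver future, so it is not the
code of this patch. `String` stands in for the error type, and
`wait_for_archiver` is a hypothetical helper.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Same shape as the `ArchiverState` in the patch, with `String` as a
// stand-in for the real error type (assumption for this sketch).
struct ArchiverState {
    finished: bool,
    error: Option<String>,
}

// Hypothetical helper: runs a fake archiver and returns the error the
// consumer sees once it has made sure the archiver is finished.
fn wait_for_archiver(fail: bool) -> Option<String> {
    let shared = Arc::new((
        Mutex::new(ArchiverState { finished: false, error: None }),
        Condvar::new(),
    ));
    let shared2 = Arc::clone(&shared);

    let archiver = thread::spawn(move || {
        // Simulate the window in which the consumer already noticed
        // the closed channel, but the error has not been stored yet.
        thread::sleep(Duration::from_millis(50));
        let (lock, finished_cv) = &*shared2;
        {
            let mut state = lock.lock().unwrap();
            state.finished = true;
            if fail {
                state.error = Some("archive creation failed".to_string());
            }
        } // guard dropped here, before notifying
        finished_cv.notify_all();
    });

    let (lock, finished_cv) = &*shared;
    let mut state = lock.lock().unwrap();
    // Only wait if the archiver has not finished yet. If the flag is
    // already set, the notification may have been sent before we got
    // here and waiting for it would block forever.
    while !state.finished {
        state = finished_cv.wait(state).unwrap();
    }
    let error = state.error.take();
    drop(state);

    archiver.join().unwrap();
    error
}
```

With `wait_for_archiver(true)` the late error is still observed, while
`wait_for_archiver(false)` returns `None`. The flag check is what
makes a notification sent before the wait harmless.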

diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs
index 2bfb5cf29..3fb1927d0 100644
--- a/pbs-client/src/pxar_backup_stream.rs
+++ b/pbs-client/src/pxar_backup_stream.rs
@@ -11,6 +11,7 @@ use futures::stream::Stream;
 use nix::dir::Dir;
 use nix::fcntl::OFlag;
 use nix::sys::stat::Mode;
+use tokio::sync::Notify;
 
 use proxmox_async::blocking::TokioWriterAdapter;
 use proxmox_io::StdChannelWriter;
@@ -30,7 +31,13 @@ pub struct PxarBackupStream {
     rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>,
     pub suggested_boundaries: Option<std::sync::mpsc::Receiver<u64>>,
     handle: Option<AbortHandle>,
-    error: Arc<Mutex<Option<Error>>>,
+    archiver_state: Arc<Mutex<ArchiverState>>,
+    archiver_finished_notification: Arc<Notify>,
+}
+
+struct ArchiverState {
+    finished: bool,
+    error: Option<Error>,
 }
 
 impl Drop for PxarBackupStream {
@@ -78,10 +85,16 @@ impl PxarBackupStream {
                 (pxar::PxarVariant::Unified(writer), None, None, None)
             };
 
-        let error = Arc::new(Mutex::new(None));
-        let error2 = Arc::clone(&error);
+        let archiver_state = ArchiverState {
+            finished: false,
+            error: None,
+        };
+        let archiver_state = Arc::new(Mutex::new(archiver_state));
+        let archiver_state2 = Arc::clone(&archiver_state);
+        let pxar_backup_stream_notifier = Arc::new(Notify::new());
+        let archiver_finished_notification = pxar_backup_stream_notifier.clone();
         let handler = async move {
-            if let Err(err) = crate::pxar::create_archive(
+            let result = crate::pxar::create_archive(
                 dir,
                 PxarWriters::new(
                     writer,
@@ -96,10 +109,19 @@ impl PxarBackupStream {
                 boundaries,
                 suggested_boundaries_tx,
             )
-            .await
-            {
-                let mut error = error2.lock().unwrap();
-                *error = Some(err);
+            .await;
+
+            let mut state = archiver_state2.lock().unwrap();
+            state.finished = true;
+            if let Err(err) = result {
+                state.error = Some(err);
+            }
+            drop(state);
+
+            // Notify upload streams that archiver is finished (with or without error)
+            pxar_backup_stream_notifier.notify_one();
+            if separate_payload_stream {
+                pxar_backup_stream_notifier.notify_one();
             }
         };
 
@@ -111,14 +133,16 @@ impl PxarBackupStream {
             rx: Some(rx),
             suggested_boundaries: None,
             handle: Some(handle.clone()),
-            error: Arc::clone(&error),
+            archiver_state: archiver_state.clone(),
+            archiver_finished_notification: archiver_finished_notification.clone(),
         };
 
         let backup_payload_stream = payload_rx.map(|rx| Self {
             rx: Some(rx),
             suggested_boundaries: suggested_boundaries_rx,
             handle: Some(handle),
-            error,
+            archiver_state,
+            archiver_finished_notification,
         });
 
         Ok((backup_stream, backup_payload_stream))
@@ -143,8 +167,8 @@ impl Stream for PxarBackupStream {
     fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
         {
             // limit lock scope
-            let mut error = self.error.lock().unwrap();
-            if let Some(err) = error.take() {
+            let mut state = self.archiver_state.lock().unwrap();
+            if let Some(err) = state.error.take() {
                 return Poll::Ready(Some(Err(err)));
             }
         }
@@ -152,8 +176,17 @@ impl Stream for PxarBackupStream {
         match proxmox_async::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
             Ok(data) => Poll::Ready(Some(data)),
             Err(_) => {
-                let mut error = self.error.lock().unwrap();
-                if let Some(err) = error.take() {
+                // If the archiver has not signaled that it is finished, wait for it to
+                // finish, to avoid potentially missing errors
+                let finished = { self.archiver_state.lock().unwrap().finished };
+                if !finished {
+                    proxmox_async::runtime::block_on(
+                        self.archiver_finished_notification.notified(),
+                    );
+                }
+
+                let error = { self.archiver_state.lock().unwrap().error.take() };
+                if let Some(err) = error {
                     return Poll::Ready(Some(Err(err)));
                 }
                 Poll::Ready(None) // channel closed, no error
-- 
2.39.5


