public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup 2/3] api: reader: gracefully handle reader client disconnects
Date: Tue,  3 Dec 2024 12:27:55 +0100	[thread overview]
Message-ID: <20241203112756.63872-2-c.ebner@proxmox.com> (raw)
In-Reply-To: <20241203112756.63872-1-c.ebner@proxmox.com>

Currently, if a reader client disconnects after finishing its work,
the connection will be closed by the client without notifying the
server. The future handling the connection on the server side will
then return with a connection error, and as a consequence the reader
worker task will be logged with an error state. This can cause
confusion [0], as this is not an error but normal behaviour.

Instead of failing, provide an API endpoint that lets the client
request a graceful close of the connection. This will trigger a signal
to the future handling the connection, telling it to close gracefully.

Report in the community forum:
[0] https://forum.proxmox.com/threads/158306/

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 Cargo.toml                     |  1 +
 src/api2/reader/environment.rs | 12 +++++-
 src/api2/reader/mod.rs         | 74 ++++++++++++++++++++++++++++++++--
 3 files changed, 83 insertions(+), 4 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index d14f320a6..e3ddb1942 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -185,6 +185,7 @@ num-traits.workspace = true
 once_cell.workspace = true
 openssl.workspace = true
 percent-encoding.workspace = true
+pin-project-lite.workspace = true
 regex.workspace = true
 rustyline.workspace = true
 serde.workspace = true
diff --git a/src/api2/reader/environment.rs b/src/api2/reader/environment.rs
index 3b2f06f43..ac8970865 100644
--- a/src/api2/reader/environment.rs
+++ b/src/api2/reader/environment.rs
@@ -1,7 +1,8 @@
 use std::collections::HashSet;
-use std::sync::{Arc, RwLock};
+use std::sync::{Arc, Mutex, RwLock};
 
 use serde_json::{json, Value};
+use tokio::sync::oneshot;
 
 use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
 
@@ -23,6 +24,7 @@ pub struct ReaderEnvironment {
     pub worker: Arc<WorkerTask>,
     pub datastore: Arc<DataStore>,
     pub backup_dir: BackupDir,
+    connection_shutdown_trigger: Arc<Mutex<Option<oneshot::Sender<()>>>>,
     allowed_chunks: Arc<RwLock<HashSet<[u8; 32]>>>,
 }
 
@@ -33,6 +35,7 @@ impl ReaderEnvironment {
         worker: Arc<WorkerTask>,
         datastore: Arc<DataStore>,
         backup_dir: BackupDir,
+        connection_shutdown_trigger: oneshot::Sender<()>,
     ) -> Self {
         Self {
             result_attributes: json!({}),
@@ -43,6 +46,7 @@ impl ReaderEnvironment {
             debug: tracing::enabled!(tracing::Level::DEBUG),
             formatter: JSON_FORMATTER,
             backup_dir,
+            connection_shutdown_trigger: Arc::new(Mutex::new(Some(connection_shutdown_trigger))),
             allowed_chunks: Arc::new(RwLock::new(HashSet::new())),
         }
     }
@@ -69,6 +73,12 @@ impl ReaderEnvironment {
     pub fn check_chunk_access(&self, digest: [u8; 32]) -> bool {
         self.allowed_chunks.read().unwrap().contains(&digest)
     }
+
+    pub fn connection_shutdown(&self) {
+        if let Some(trigger) = self.connection_shutdown_trigger.lock().unwrap().take() {
+            let _ = trigger.send(());
+        }
+    }
 }
 
 impl RpcEnvironment for ReaderEnvironment {
diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
index 50f80de43..973b8f257 100644
--- a/src/api2/reader/mod.rs
+++ b/src/api2/reader/mod.rs
@@ -1,13 +1,20 @@
 //! Backup reader/restore protocol (HTTP2 upgrade)
 
+use std::pin::Pin;
+use std::task::Poll;
+
 use anyhow::{bail, format_err, Error};
 use futures::*;
 use hex::FromHex;
 use hyper::header::{self, HeaderValue, CONNECTION, UPGRADE};
 use hyper::http::request::Parts;
+use hyper::server::conn::Connection;
+use hyper::upgrade::Upgraded;
 use hyper::{Body, Request, Response, StatusCode};
+use pin_project_lite::pin_project;
 use serde::Deserialize;
 use serde_json::Value;
+use tokio::sync::oneshot;
 
 use proxmox_rest_server::{H2Service, WorkerTask};
 use proxmox_router::{
@@ -156,12 +163,15 @@ fn upgrade_to_backup_reader_protocol(
             move |worker| async move {
                 let _guard = _guard;
 
+                let (connection_shutdown_trigger, connection_shutdown_receiver) =
+                    oneshot::channel::<()>();
                 let mut env = ReaderEnvironment::new(
                     env_type,
                     auth_id,
                     worker.clone(),
                     datastore,
                     backup_dir,
+                    connection_shutdown_trigger,
                 );
 
                 env.debug = debug;
@@ -192,9 +202,10 @@ fn upgrade_to_backup_reader_protocol(
                     http.http2_initial_connection_window_size(window_size);
                     http.http2_max_frame_size(4 * 1024 * 1024);
 
-                    http.serve_connection(conn, service)
-                        .map_err(Error::from)
-                        .await
+                    let connection = http.serve_connection(conn, service);
+                    let graceful_shutdown_connection =
+                        GracefulShutdownConnection::new(connection, connection_shutdown_receiver);
+                    graceful_shutdown_connection.await.map_err(Error::from)
                 };
 
                 futures::select! {
@@ -222,12 +233,53 @@ fn upgrade_to_backup_reader_protocol(
     .boxed()
 }
 
+pin_project! {
+    struct GracefulShutdownConnection {
+        #[pin]
+        connection: Connection<Upgraded, H2Service<ReaderEnvironment>, ExecInheritLogContext>,
+        #[pin]
+        shutdown_receiver: oneshot::Receiver<()>,
+        shutdown_state: Option<Result<(), oneshot::error::RecvError>>,
+    }
+}
+
+impl Future for GracefulShutdownConnection {
+    type Output = Result<(), hyper::Error>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+        let mut this = self.project();
+
+        if this.shutdown_state.is_none() {
+            if let Poll::Ready(shutdown) = this.shutdown_receiver.poll(cx) {
+                let _ = this.shutdown_state.insert(shutdown);
+                this.connection.as_mut().graceful_shutdown();
+            }
+        }
+
+        this.connection.poll(cx)
+    }
+}
+
+impl GracefulShutdownConnection {
+    fn new(
+        connection: Connection<Upgraded, H2Service<ReaderEnvironment>, ExecInheritLogContext>,
+        shutdown_receiver: oneshot::Receiver<()>,
+    ) -> Self {
+        Self {
+            connection,
+            shutdown_receiver,
+            shutdown_state: None,
+        }
+    }
+}
+
 const READER_API_SUBDIRS: SubdirMap = &[
     ("chunk", &Router::new().download(&API_METHOD_DOWNLOAD_CHUNK)),
     (
         "download",
         &Router::new().download(&API_METHOD_DOWNLOAD_FILE),
     ),
+    ("finish", &Router::new().post(&API_METHOD_FINISH)),
     ("speedtest", &Router::new().download(&API_METHOD_SPEEDTEST)),
 ];
 
@@ -347,6 +399,22 @@ fn download_chunk(
     .boxed()
 }
 
+#[sortable]
+pub const API_METHOD_FINISH: ApiMethod = ApiMethod::new(
+    &ApiHandler::Sync(&finish),
+    &ObjectSchema::new("Signal the reader instance is finished", &[]),
+);
+
+fn finish(
+    _param: Value,
+    _info: &ApiMethod,
+    rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Value, Error> {
+    let env: &ReaderEnvironment = rpcenv.as_ref();
+    env.connection_shutdown();
+    Ok(Value::Null)
+}
+
 /* this is too slow
 fn download_chunk_old(
     _parts: Parts,
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


  reply	other threads:[~2024-12-03 11:28 UTC|newest]

Thread overview: 5+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2024-12-03 11:27 [pbs-devel] [PATCH proxmox-backup 1/3] client: backup: remove unnecessary clone for backup writer Christian Ebner
2024-12-03 11:27 ` Christian Ebner [this message]
2024-12-03 12:17   ` [pbs-devel] [PATCH proxmox-backup 2/3] api: reader: gracefully handle reader client disconnects Christian Ebner
2024-12-03 11:27 ` [pbs-devel] [PATCH proxmox-backup 3/3] client: reader: signal server before client disconnect Christian Ebner
2024-12-04  8:59 ` [pbs-devel] [PATCH proxmox-backup 1/3] client: backup: remove unnecessary clone for backup writer Christian Ebner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20241203112756.63872-2-c.ebner@proxmox.com \
    --to=c.ebner@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal