From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <pbs-devel-bounces@lists.proxmox.com>
Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68])
	by lore.proxmox.com (Postfix) with ESMTPS id 115C91FF170
	for <inbox@lore.proxmox.com>; Thu, 29 May 2025 16:32:40 +0200 (CEST)
Received: from firstgate.proxmox.com (localhost [127.0.0.1])
	by firstgate.proxmox.com (Proxmox) with ESMTP id 71E4815E46;
	Thu, 29 May 2025 16:32:43 +0200 (CEST)
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Thu, 29 May 2025 16:32:05 +0200
Message-Id: <20250529143207.694497-41-c.ebner@proxmox.com>
X-Mailer: git-send-email 2.39.5
In-Reply-To: <20250529143207.694497-1-c.ebner@proxmox.com>
References: <20250529143207.694497-1-c.ebner@proxmox.com>
MIME-Version: 1.0
X-SPAM-LEVEL: Spam detection results:  0
 AWL 0.034 Adjusted score from AWL reputation of From: address
 BAYES_00                 -1.9 Bayes spam probability is 0 to 1%
 DMARC_MISSING             0.1 Missing DMARC policy
 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment
 SPF_HELO_NONE           0.001 SPF: HELO does not publish an SPF Record
 SPF_PASS               -0.001 SPF: sender matches SPF record
Subject: [pbs-devel] [RFC v2 proxmox-backup 40/42] api: backup: use local
 datastore cache on S3 backend chunk upload
X-BeenThere: pbs-devel@lists.proxmox.com
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: Proxmox Backup Server development discussion
 <pbs-devel.lists.proxmox.com>
List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=unsubscribe>
List-Archive: <http://lists.proxmox.com/pipermail/pbs-devel/>
List-Post: <mailto:pbs-devel@lists.proxmox.com>
List-Help: <mailto:pbs-devel-request@lists.proxmox.com?subject=help>
List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=subscribe>
Reply-To: Proxmox Backup Server development discussion
 <pbs-devel@lists.proxmox.com>
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: 7bit
Errors-To: pbs-devel-bounces@lists.proxmox.com
Sender: "pbs-devel" <pbs-devel-bounces@lists.proxmox.com>

Take advantage of the local datastore cache to avoid re-uploading
already known chunks. This not only helps to improve backup/upload
speeds, but also avoids additional costs by reducing the number of
requests and the amount of payload data transferred to the S3 object
store API.

If the cache is present, check whether it already contains the chunk,
skipping the upload altogether if it does. Otherwise, load the chunk
data into memory, upload it to the S3 object store API and insert it
into the local datastore cache.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 src/api2/backup/upload_chunk.rs | 46 ++++++++++++++++++++++++++++++---
 src/server/pull.rs              |  4 +++
 2 files changed, 46 insertions(+), 4 deletions(-)

diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 838eec1fa..7a80fd0eb 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -247,10 +247,48 @@ async fn upload_to_backend(
             UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await
         }
         DatastoreBackend::S3(s3_client) => {
-            let is_duplicate = match s3_client.put_object(digest.into(), req_body).await? {
-                PutObjectResponse::PreconditionFailed => true,
-                PutObjectResponse::NeedsRetry => bail!("concurrent operation, reupload required"),
-                PutObjectResponse::Success(_content) => false,
+            // Load chunk data into memory, as it needs to be written twice: to the S3
+            // object store and to the local cache store. Further, the request body has to
+            // be consumed even if the chunk insert can be skipped because it is cached.
+            let data = req_body
+                .map_err(Error::from)
+                .try_fold(Vec::new(), |mut acc, chunk| {
+                    acc.extend_from_slice(&chunk);
+                    future::ok::<_, Error>(acc)
+                })
+                .await?;
+
+            if encoded_size != data.len() as u32 {
+                bail!(
+                    "got blob with unexpected length ({encoded_size} != {})",
+                    data.len()
+                );
+            }
+
+            if env.datastore.cache_contains(&digest) {
+                return Ok((digest, size, encoded_size, true));
+            }
+
+            let datastore = env.datastore.clone();
+            let upload_body = hyper::Body::from(data.clone());
+            let upload = s3_client.put_object(digest.into(), upload_body);
+            let cache_insert = tokio::task::spawn_blocking(move || {
+                let chunk = DataBlob::from_raw(data)?;
+                datastore.cache_insert(&digest, &chunk)
+            });
+            let is_duplicate = match futures::join!(upload, cache_insert) {
+                (Ok(upload_response), Ok(Ok(()))) => match upload_response {
+                    PutObjectResponse::PreconditionFailed => true,
+                    PutObjectResponse::NeedsRetry => {
+                        bail!("concurrent operation, reupload required")
+                    }
+                    PutObjectResponse::Success(_content) => false,
+                },
+                (Ok(_), Ok(Err(err))) => return Err(err.context("chunk cache insert failed")),
+                (Ok(_), Err(err)) => {
+                    return Err(Error::from(err).context("chunk cache insert task failed"))
+                }
+                (Err(err), _) => return Err(err.context("chunk upload failed")),
             };
             Ok((digest, size, encoded_size, is_duplicate))
         }
diff --git a/src/server/pull.rs b/src/server/pull.rs
index f36efd7c8..85d3154eb 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -173,6 +173,10 @@ async fn pull_index_chunks<I: IndexFile>(
                     target2.insert_chunk(&chunk, &digest)?;
                 }
                 DatastoreBackend::S3(s3_client) => {
+                    if target2.cache_contains(&digest) {
+                        return Ok(());
+                    }
+                    target2.cache_insert(&digest, &chunk)?;
                     let data = chunk.raw_data().to_vec();
                     let upload_body = hyper::Body::from(data);
                     proxmox_async::runtime::block_on(
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel