From: Dominik Csapak <d.csapak@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Tue,  4 May 2021 12:21:47 +0200
Message-Id: <20210504102147.15648-1-d.csapak@proxmox.com>
X-Mailer: git-send-email 2.20.1
Subject: [pbs-devel] [PATCH proxmox-backup] tape/restore: optimize chunk
 restore behaviour

by checking the 'checked_chunks' set before trying to write to disk
and by doing the existence check in the parallel handler. This way,
we do not have to check the existence of a chunk multiple times
(if multiple source datastores get restored to the same target
datastore), and we also do not have to wait on the stat before
reading the next chunk.
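
As a minimal standalone sketch of that gating (hypothetical helper
name, not the function in this patch): HashSet::insert returns true
only for a digest we have not forwarded yet, so each chunk reaches
the writer pool at most once.

    use std::collections::HashSet;

    fn send_unique(
        digest: [u8; 32],
        checked_chunks: &mut HashSet<[u8; 32]>,
        send: impl Fn([u8; 32]),
    ) {
        // insert() returns false for already-seen digests,
        // so duplicates are never sent to the writer pool
        if checked_chunks.insert(digest) {
            send(digest);
        }
    }

    fn main() {
        let mut checked = HashSet::new();
        let digest = [0u8; 32];
        send_unique(digest, &mut checked, |d| println!("sent {:02x?}", &d[..4]));
        send_unique(digest, &mut checked, |d| println!("sent {:02x?}", &d[..4])); // skipped
    }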

We have to change the &WorkerTask to an Arc<WorkerTask> though,
otherwise we cannot log to the worker from the parallel handler.
For the same reason, the byte counter moves into an Arc<AtomicU64>,
since the accounting now happens in the writer threads.
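
To illustrate why (minimal standalone sketch; 'Logger' is a stand-in
for WorkerTask, not a real type from the codebase): the parallel
handler moves its closure onto worker threads, so the closure must
own whatever it captures, and an Arc lets every thread share one
handle without borrowing.

    use std::sync::Arc;
    use std::thread;

    struct Logger;
    impl Logger {
        fn log(&self, msg: &str) {
            println!("{}", msg);
        }
    }

    fn main() {
        let worker = Arc::new(Logger);
        let worker2 = worker.clone();
        // the spawned closure owns its own Arc clone ...
        let handle = thread::spawn(move || {
            worker2.log("logging from a worker thread");
        });
        // ... while the original handle stays usable here
        worker.log("logging from the main thread");
        handle.join().unwrap();
    }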

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/api2/tape/drive.rs   |  2 +-
 src/api2/tape/restore.rs | 72 ++++++++++++++++++++++------------------
 2 files changed, 41 insertions(+), 33 deletions(-)

diff --git a/src/api2/tape/drive.rs b/src/api2/tape/drive.rs
index c97de61a..9cf36b37 100644
--- a/src/api2/tape/drive.rs
+++ b/src/api2/tape/drive.rs
@@ -1336,7 +1336,7 @@ pub fn catalog_media(
             drive.read_label()?; // skip over labels - we already read them above
 
             let mut checked_chunks = HashMap::new();
-            restore_media(&worker, &mut drive, &media_id, None, &mut checked_chunks, verbose)?;
+            restore_media(worker, &mut drive, &media_id, None, &mut checked_chunks, verbose)?;
 
             Ok(())
         },
diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index 2614c68a..1dd6ba11 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -334,7 +334,7 @@ pub fn restore(
 
             for media_id in media_id_list.iter() {
                 request_and_restore_media(
-                    &worker,
+                    worker.clone(),
                     media_id,
                     &drive_config,
                     &drive,
@@ -368,7 +368,7 @@ pub fn restore(
 
 /// Request and restore complete media without using existing catalog (create catalog instead)
 pub fn request_and_restore_media(
-    worker: &WorkerTask,
+    worker: Arc<WorkerTask>,
     media_id: &MediaId,
     drive_config: &SectionConfigData,
     drive_name: &str,
@@ -388,7 +388,7 @@ pub fn request_and_restore_media(
         .and_then(|userid| lookup_user_email(userid))
         .or_else(|| lookup_user_email(&authid.clone().into()));
 
-    let (mut drive, info) = request_and_load_media(worker, &drive_config, &drive_name, &media_id.label, &email)?;
+    let (mut drive, info) = request_and_load_media(&worker, &drive_config, &drive_name, &media_id.label, &email)?;
 
     match info.media_set_label {
         None => {
@@ -424,7 +424,7 @@ pub fn request_and_restore_media(
 ///
 /// Only create the catalog if target is None.
 pub fn restore_media(
-    worker: &WorkerTask,
+    worker: Arc<WorkerTask>,
     drive: &mut Box<dyn TapeDriver>,
     media_id: &MediaId,
     target: Option<(&DataStoreMap, &Authid)>,
@@ -452,7 +452,7 @@ pub fn restore_media(
             Ok(reader) => reader,
         };
 
-        restore_archive(worker, reader, current_file_number, target, &mut catalog, checked_chunks_map, verbose)?;
+        restore_archive(worker.clone(), reader, current_file_number, target, &mut catalog, checked_chunks_map, verbose)?;
     }
 
     MediaCatalog::finish_temporary_database(status_path, &media_id.label.uuid, true)?;
@@ -461,7 +461,7 @@ pub fn restore_media(
 }
 
 fn restore_archive<'a>(
-    worker: &WorkerTask,
+    worker: Arc<WorkerTask>,
     mut reader: Box<dyn 'a + TapeRead>,
     current_file_number: u64,
     target: Option<(&DataStoreMap, &Authid)>,
@@ -520,7 +520,7 @@ fn restore_archive<'a>(
                     if is_new {
                         task_log!(worker, "restore snapshot {}", backup_dir);
 
-                        match restore_snapshot_archive(worker, reader, &path, &datastore, checked_chunks) {
+                        match restore_snapshot_archive(worker.clone(), reader, &path, &datastore, checked_chunks) {
                             Err(err) => {
                                 std::fs::remove_dir_all(&path)?;
                                 bail!("restore snapshot {} failed - {}", backup_dir, err);
@@ -574,9 +574,9 @@ fn restore_archive<'a>(
                     .or_insert(HashSet::new());
 
                 let chunks = if let Some(datastore) = datastore {
-                    restore_chunk_archive(worker, reader, datastore, checked_chunks, verbose)?
+                    restore_chunk_archive(worker.clone(), reader, datastore, checked_chunks, verbose)?
                 } else {
-                    scan_chunk_archive(worker, reader, verbose)?
+                    scan_chunk_archive(worker.clone(), reader, verbose)?
                 };
 
                 if let Some(chunks) = chunks {
@@ -619,7 +619,7 @@ fn restore_archive<'a>(
 
 // Read chunk archive without restoring data - just record contained chunks
 fn scan_chunk_archive<'a>(
-    worker: &WorkerTask,
+    worker: Arc<WorkerTask>,
     reader: Box<dyn 'a + TapeRead>,
     verbose: bool,
 ) -> Result<Option<Vec<[u8;32]>>, Error> {
@@ -642,7 +642,7 @@ fn scan_chunk_archive<'a>(
 
                 // check if this is an aborted stream without end marker
                 if let Ok(false) = reader.has_end_marker() {
-                    worker.log("missing stream end marker".to_string());
+                    task_log!(worker, "missing stream end marker");
                     return Ok(None);
                 }
 
@@ -664,7 +664,7 @@ fn scan_chunk_archive<'a>(
 }
 
 fn restore_chunk_archive<'a>(
-    worker: &WorkerTask,
+    worker: Arc<WorkerTask>,
     reader: Box<dyn 'a + TapeRead>,
     datastore: Arc<DataStore>,
     checked_chunks: &mut HashSet<[u8;32]>,
@@ -676,25 +676,38 @@ fn restore_chunk_archive<'a>(
     let mut decoder = ChunkArchiveDecoder::new(reader);
 
     let datastore2 = datastore.clone();
+    let start_time = std::time::SystemTime::now();
+    let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
+    let bytes2 = bytes.clone();
+
+    let worker2 = worker.clone();
+
     let writer_pool = ParallelHandler::new(
         "tape restore chunk writer",
         4,
         move |(chunk, digest): (DataBlob, [u8; 32])| {
-            // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
-            chunk.verify_crc()?;
-            if chunk.crypt_mode()? == CryptMode::None {
-                chunk.decode(None, Some(&digest))?; // verify digest
-            }
+            let chunk_exists = datastore2.cond_touch_chunk(&digest, false)?;
+            if !chunk_exists {
+                if verbose {
+                    task_log!(worker2, "Insert chunk: {}", proxmox::tools::digest_to_hex(&digest));
+                }
+                bytes2.fetch_add(chunk.raw_size(), std::sync::atomic::Ordering::SeqCst);
+                // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
+                chunk.verify_crc()?;
+                if chunk.crypt_mode()? == CryptMode::None {
+                    chunk.decode(None, Some(&digest))?; // verify digest
+                }
 
-            datastore2.insert_chunk(&chunk, &digest)?;
+                datastore2.insert_chunk(&chunk, &digest)?;
+            } else if verbose {
+                task_log!(worker2, "Found existing chunk: {}", proxmox::tools::digest_to_hex(&digest));
+            }
             Ok(())
         },
     );
 
     let verify_and_write_channel = writer_pool.channel();
 
-    let start_time = std::time::SystemTime::now();
-    let mut bytes = 0;
 
     loop {
         let (digest, blob) = match decoder.next_chunk() {
@@ -710,7 +723,7 @@ fn restore_chunk_archive<'a>(
 
                 // check if this is an aborted stream without end marker
                 if let Ok(false) = reader.has_end_marker() {
-                    worker.log("missing stream end marker".to_string());
+                    task_log!(worker, "missing stream end marker");
                     return Ok(None);
                 }
 
@@ -721,17 +734,10 @@ fn restore_chunk_archive<'a>(
 
         worker.check_abort()?;
 
-        let chunk_exists = datastore.cond_touch_chunk(&digest, false)?;
-        if !chunk_exists {
-             if verbose {
-                task_log!(worker, "Insert chunk: {}", proxmox::tools::digest_to_hex(&digest));
-            }
-            bytes += blob.raw_size();
+        if !checked_chunks.contains(&digest) {
             verify_and_write_channel.send((blob, digest.clone()))?;
-         } else if verbose {
-            task_log!(worker, "Found existing chunk: {}", proxmox::tools::digest_to_hex(&digest));
+            checked_chunks.insert(digest.clone());
         }
-        checked_chunks.insert(digest.clone());
         chunks.push(digest);
     }
 
@@ -741,6 +747,8 @@ fn restore_chunk_archive<'a>(
 
     let elapsed = start_time.elapsed()?.as_secs_f64();
 
+    let bytes = bytes.load(std::sync::atomic::Ordering::SeqCst);
+
     task_log!(
         worker,
         "restored {} bytes ({:.2} MB/s)",
@@ -752,7 +760,7 @@ fn restore_chunk_archive<'a>(
 }
 
 fn restore_snapshot_archive<'a>(
-    worker: &WorkerTask,
+    worker: Arc<WorkerTask>,
     reader: Box<dyn 'a + TapeRead>,
     snapshot_path: &Path,
     datastore: &DataStore,
@@ -782,7 +790,7 @@ fn restore_snapshot_archive<'a>(
 }
 
 fn try_restore_snapshot_archive<R: pxar::decoder::SeqRead>(
-    worker: &WorkerTask,
+    worker: Arc<WorkerTask>,
     decoder: &mut pxar::decoder::sync::Decoder<R>,
     snapshot_path: &Path,
     _datastore: &DataStore,
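
For reference, the byte accounting in restore_chunk_archive() above
moves from a local counter into an Arc<AtomicU64> shared with the
writer pool. A minimal standalone sketch of that pattern, with
thread::spawn standing in for the ParallelHandler:

    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;
    use std::thread;

    fn main() {
        // one shared counter, cloned into each writer thread
        let bytes = Arc::new(AtomicU64::new(0));

        let handles: Vec<_> = (0..4)
            .map(|_| {
                let bytes = bytes.clone();
                thread::spawn(move || {
                    // each worker adds the size of the chunk it wrote
                    bytes.fetch_add(1024, Ordering::SeqCst);
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        // read the total once all writers are done
        println!("restored {} bytes", bytes.load(Ordering::SeqCst));
    }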
-- 
2.20.1