* [pbs-devel] [PATCH proxmox-backup 1/3] pull: rustfmt
@ 2021-01-15 10:48 Fabian Grünbichler
From: Fabian Grünbichler @ 2021-01-15 10:48 UTC
  To: pbs-devel

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
this module should be fairly quiet at the moment, so it's a good opportunity
to get this out of the way ;)
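
for reference: the patch is (as far as I can tell) plain rustfmt output with
default settings, so something like the following should reproduce it on a
checkout of the parent commit:

    # reformats the file in place; `cargo fmt` would touch the whole crate
    rustfmt src/client/pull.rs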

 src/client/pull.rs | 250 ++++++++++++++++++++++++++++++---------------
 1 file changed, 168 insertions(+), 82 deletions(-)
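
note for reviewers: a few hunks are more than pure whitespace changes -
rustfmt's default `reorder_imports = true` also sorts `use` statements within
a contiguous block, as well as the names inside their braces. the first hunk
is a good example (elided here for brevity):

    // before
    use proxmox::api::error::{StatusCode, HttpError};
    use crate::{
        tools::{ParallelHandler, compute_file_csum},
        // ...
    };

    // after: names inside braces are sorted, and the two statements are
    // swapped because 'crate' sorts before 'proxmox'
    use crate::{
        // ...
        tools::{compute_file_csum, ParallelHandler},
    };
    use proxmox::api::error::{HttpError, StatusCode};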

diff --git a/src/client/pull.rs b/src/client/pull.rs
index ed8256fa..f6ef7cde 100644
--- a/src/client/pull.rs
+++ b/src/client/pull.rs
@@ -2,22 +2,21 @@
 
 use anyhow::{bail, format_err, Error};
 use serde_json::json;
+use std::collections::{HashMap, HashSet};
 use std::convert::TryFrom;
-use std::sync::{Arc, Mutex};
-use std::collections::{HashSet, HashMap};
 use std::io::{Seek, SeekFrom};
-use std::time::SystemTime;
 use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::time::SystemTime;
 
-use proxmox::api::error::{StatusCode, HttpError};
 use crate::{
-    tools::{ParallelHandler, compute_file_csum},
-    server::WorkerTask,
-    backup::*,
     api2::types::*,
+    backup::*,
     client::*,
+    server::WorkerTask,
+    tools::{compute_file_csum, ParallelHandler},
 };
-
+use proxmox::api::error::{HttpError, StatusCode};
 
 // fixme: implement filters
 // fixme: delete vanished groups
@@ -28,9 +27,8 @@ async fn pull_index_chunks<I: IndexFile>(
     chunk_reader: RemoteChunkReader,
     target: Arc<DataStore>,
     index: I,
-    downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
-
     use futures::stream::{self, StreamExt, TryStreamExt};
 
     let start_time = SystemTime::now();
@@ -47,18 +45,19 @@ async fn pull_index_chunks<I: IndexFile>(
                     guard.insert(info.digest);
                 }
                 !done
-            })
+            }),
     );
 
     let target2 = target.clone();
     let verify_pool = ParallelHandler::new(
-        "sync chunk writer", 4,
-        move |(chunk, digest, size): (DataBlob, [u8;32], u64)|  {
+        "sync chunk writer",
+        4,
+        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
             // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
             chunk.verify_unencrypted(size as usize, &digest)?;
             target2.insert_chunk(&chunk, &digest)?;
             Ok(())
-       }
+        },
     );
 
     let verify_and_write_channel = verify_pool.channel();
@@ -67,14 +66,15 @@ async fn pull_index_chunks<I: IndexFile>(
 
     stream
         .map(|info| {
-
             let target = Arc::clone(&target);
             let chunk_reader = chunk_reader.clone();
             let bytes = Arc::clone(&bytes);
             let verify_and_write_channel = verify_and_write_channel.clone();
 
             Ok::<_, Error>(async move {
-                let chunk_exists = crate::tools::runtime::block_in_place(|| target.cond_touch_chunk(&info.digest, false))?;
+                let chunk_exists = crate::tools::runtime::block_in_place(|| {
+                    target.cond_touch_chunk(&info.digest, false)
+                })?;
                 if chunk_exists {
                     //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
                     return Ok::<_, Error>(());
@@ -84,12 +84,14 @@ async fn pull_index_chunks<I: IndexFile>(
                 let raw_size = chunk.raw_size() as usize;
 
                 // decode, verify and write in a separate threads to maximize throughput
-                crate::tools::runtime::block_in_place(|| verify_and_write_channel.send((chunk, info.digest, info.size())))?;
+                crate::tools::runtime::block_in_place(|| {
+                    verify_and_write_channel.send((chunk, info.digest, info.size()))
+                })?;
 
                 bytes.fetch_add(raw_size, Ordering::SeqCst);
 
                 Ok(())
-           })
+            })
         })
         .try_buffer_unordered(20)
         .try_for_each(|_res| futures::future::ok(()))
@@ -103,7 +105,11 @@ async fn pull_index_chunks<I: IndexFile>(
 
     let bytes = bytes.load(Ordering::SeqCst);
 
-    worker.log(format!("downloaded {} bytes ({:.2} MiB/s)", bytes, (bytes as f64)/(1024.0*1024.0*elapsed)));
+    worker.log(format!(
+        "downloaded {} bytes ({:.2} MiB/s)",
+        bytes,
+        (bytes as f64) / (1024.0 * 1024.0 * elapsed)
+    ));
 
     Ok(())
 }
@@ -112,7 +118,6 @@ async fn download_manifest(
     reader: &BackupReader,
     filename: &std::path::Path,
 ) -> Result<std::fs::File, Error> {
-
     let mut tmp_manifest_file = std::fs::OpenOptions::new()
         .write(true)
         .create(true)
@@ -120,20 +125,23 @@ async fn download_manifest(
         .read(true)
         .open(&filename)?;
 
-    reader.download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file).await?;
+    reader
+        .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
+        .await?;
 
     tmp_manifest_file.seek(SeekFrom::Start(0))?;
 
     Ok(tmp_manifest_file)
 }
 
-fn verify_archive(
-    info: &FileInfo,
-    csum: &[u8; 32],
-    size: u64,
-) -> Result<(), Error> {
+fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
     if size != info.size {
-        bail!("wrong size for file '{}' ({} != {})", info.filename, info.size, size);
+        bail!(
+            "wrong size for file '{}' ({} != {})",
+            info.filename,
+            info.size,
+            size
+        );
     }
 
     if csum != &info.csum {
@@ -150,9 +158,8 @@ async fn pull_single_archive(
     tgt_store: Arc<DataStore>,
     snapshot: &BackupDir,
     archive_info: &FileInfo,
-    downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
-
     let archive_name = &archive_info.filename;
     let mut path = tgt_store.base_path();
     path.push(snapshot.relative_path());
@@ -172,20 +179,36 @@ async fn pull_single_archive(
 
     match archive_type(archive_name)? {
         ArchiveType::DynamicIndex => {
-            let index = DynamicIndexReader::new(tmpfile)
-                .map_err(|err| format_err!("unable to read dynamic index {:?} - {}", tmp_path, err))?;
+            let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
+                format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
+            })?;
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
 
-            pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?;
+            pull_index_chunks(
+                worker,
+                chunk_reader.clone(),
+                tgt_store.clone(),
+                index,
+                downloaded_chunks,
+            )
+            .await?;
         }
         ArchiveType::FixedIndex => {
-            let index = FixedIndexReader::new(tmpfile)
-                .map_err(|err| format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err))?;
+            let index = FixedIndexReader::new(tmpfile).map_err(|err| {
+                format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
+            })?;
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
 
-            pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?;
+            pull_index_chunks(
+                worker,
+                chunk_reader.clone(),
+                tgt_store.clone(),
+                index,
+                downloaded_chunks,
+            )
+            .await?;
         }
         ArchiveType::Blob => {
             let (csum, size) = compute_file_csum(&mut tmpfile)?;
@@ -205,7 +228,6 @@ async fn try_client_log_download(
     reader: Arc<BackupReader>,
     path: &std::path::Path,
 ) -> Result<(), Error> {
-
     let mut tmp_path = path.to_owned();
     tmp_path.set_extension("tmp");
 
@@ -231,9 +253,8 @@ async fn pull_snapshot(
     reader: Arc<BackupReader>,
     tgt_store: Arc<DataStore>,
     snapshot: &BackupDir,
-    downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
-
     let mut manifest_name = tgt_store.base_path();
     manifest_name.push(snapshot.relative_path());
     manifest_name.push(MANIFEST_BLOB_NAME);
@@ -250,34 +271,45 @@ async fn pull_snapshot(
         Ok(manifest_file) => manifest_file,
         Err(err) => {
             match err.downcast_ref::<HttpError>() {
-                Some(HttpError { code, message }) => {
-                    match *code {
-                        StatusCode::NOT_FOUND => {
-                            worker.log(format!("skipping snapshot {} - vanished since start of sync", snapshot));
-                            return Ok(());
-                        },
-                        _ => {
-                            bail!("HTTP error {} - {}", code, message);
-                        },
+                Some(HttpError { code, message }) => match *code {
+                    StatusCode::NOT_FOUND => {
+                        worker.log(format!(
+                            "skipping snapshot {} - vanished since start of sync",
+                            snapshot
+                        ));
+                        return Ok(());
+                    }
+                    _ => {
+                        bail!("HTTP error {} - {}", code, message);
                     }
                 },
                 None => {
                     return Err(err);
-                },
+                }
             };
-        },
+        }
     };
     let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
 
     if manifest_name.exists() {
         let manifest_blob = proxmox::try_block!({
-            let mut manifest_file = std::fs::File::open(&manifest_name)
-                .map_err(|err| format_err!("unable to open local manifest {:?} - {}", manifest_name, err))?;
+            let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
+                format_err!(
+                    "unable to open local manifest {:?} - {}",
+                    manifest_name,
+                    err
+                )
+            })?;
 
             let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
             Ok(manifest_blob)
-        }).map_err(|err: Error| {
-            format_err!("unable to read local manifest {:?} - {}", manifest_name, err)
+        })
+        .map_err(|err: Error| {
+            format_err!(
+                "unable to read local manifest {:?} - {}",
+                manifest_name,
+                err
+            )
         })?;
 
         if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
@@ -332,7 +364,12 @@ async fn pull_snapshot(
             }
         }
 
-        let mut chunk_reader = RemoteChunkReader::new(reader.clone(), None, item.chunk_crypt_mode(), HashMap::new());
+        let mut chunk_reader = RemoteChunkReader::new(
+            reader.clone(),
+            None,
+            item.chunk_crypt_mode(),
+            HashMap::new(),
+        );
 
         pull_single_archive(
             worker,
@@ -342,7 +379,8 @@ async fn pull_snapshot(
             snapshot,
             &item,
             downloaded_chunks.clone(),
-        ).await?;
+        )
+        .await?;
     }
 
     if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
@@ -364,15 +402,22 @@ pub async fn pull_snapshot_from(
     reader: Arc<BackupReader>,
     tgt_store: Arc<DataStore>,
     snapshot: &BackupDir,
-    downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
-
     let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;
 
     if is_new {
         worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
 
-        if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await {
+        if let Err(err) = pull_snapshot(
+            worker,
+            reader,
+            tgt_store.clone(),
+            &snapshot,
+            downloaded_chunks,
+        )
+        .await
+        {
             if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
                 worker.log(format!("cleanup error - {}", cleanup_err));
             }
@@ -381,8 +426,18 @@ pub async fn pull_snapshot_from(
         worker.log(format!("sync snapshot {:?} done", snapshot.relative_path()));
     } else {
         worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
-        pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await?;
-        worker.log(format!("re-sync snapshot {:?} done", snapshot.relative_path()));
+        pull_snapshot(
+            worker,
+            reader,
+            tgt_store.clone(),
+            &snapshot,
+            downloaded_chunks,
+        )
+        .await?;
+        worker.log(format!(
+            "re-sync snapshot {:?} done",
+            snapshot.relative_path()
+        ));
     }
 
     Ok(())
@@ -397,7 +452,6 @@ pub async fn pull_group(
     delete: bool,
     progress: &mut StoreProgress,
 ) -> Result<(), Error> {
-
     let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());
 
     let args = json!({
@@ -419,7 +473,7 @@ pub async fn pull_group(
     let mut remote_snapshots = std::collections::HashSet::new();
 
     // start with 16384 chunks (up to 65GB)
-    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*64)));
+    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
 
     progress.group_snapshots = list.len() as u64;
 
@@ -428,7 +482,10 @@ pub async fn pull_group(
 
         // in-progress backups can't be synced
         if item.size.is_none() {
-            worker.log(format!("skipping snapshot {} - in-progress backup", snapshot));
+            worker.log(format!(
+                "skipping snapshot {} - in-progress backup",
+                snapshot
+            ));
             continue;
         }
 
@@ -437,7 +494,9 @@ pub async fn pull_group(
         remote_snapshots.insert(backup_time);
 
         if let Some(last_sync_time) = last_sync {
-            if last_sync_time > backup_time { continue; }
+            if last_sync_time > backup_time {
+                continue;
+            }
         }
 
         // get updated auth_info (new tickets)
@@ -447,7 +506,12 @@ pub async fn pull_group(
             .password(Some(auth_info.ticket.clone()))
             .fingerprint(fingerprint.clone());
 
-        let new_client = HttpClient::new(src_repo.host(), src_repo.port(), src_repo.auth_id(), options)?;
+        let new_client = HttpClient::new(
+            src_repo.host(),
+            src_repo.port(),
+            src_repo.auth_id(),
+            options,
+        )?;
 
         let reader = BackupReader::start(
             new_client,
@@ -457,9 +521,17 @@ pub async fn pull_group(
             snapshot.group().backup_id(),
             backup_time,
             true,
-        ).await?;
+        )
+        .await?;
 
-        let result = pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks.clone()).await;
+        let result = pull_snapshot_from(
+            worker,
+            reader,
+            tgt_store.clone(),
+            &snapshot,
+            downloaded_chunks.clone(),
+        )
+        .await;
 
         progress.done_snapshots = pos as u64 + 1;
         worker.log(format!("percentage done: {}", progress));
@@ -471,8 +543,13 @@ pub async fn pull_group(
         let local_list = group.list_backups(&tgt_store.base_path())?;
         for info in local_list {
             let backup_time = info.backup_dir.backup_time();
-            if remote_snapshots.contains(&backup_time) { continue; }
-            worker.log(format!("delete vanished snapshot {:?}", info.backup_dir.relative_path()));
+            if remote_snapshots.contains(&backup_time) {
+                continue;
+            }
+            worker.log(format!(
+                "delete vanished snapshot {:?}",
+                info.backup_dir.relative_path()
+            ));
             tgt_store.remove_backup_dir(&info.backup_dir, false)?;
         }
     }
@@ -488,7 +565,6 @@ pub async fn pull_store(
     delete: bool,
     auth_id: Authid,
 ) -> Result<(), Error> {
-
     // explicit create shared lock to prevent GC on newly created chunks
     let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
 
@@ -528,19 +604,23 @@ pub async fn pull_store(
         let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) {
             Ok(result) => result,
             Err(err) => {
-                worker.log(format!("sync group {}/{} failed - group lock failed: {}",
-                                   item.backup_type, item.backup_id, err));
+                worker.log(format!(
+                    "sync group {}/{} failed - group lock failed: {}",
+                    item.backup_type, item.backup_id, err
+                ));
                 errors = true; // do not stop here, instead continue
                 continue;
             }
         };
 
         // permission check
-        if auth_id != owner { // only the owner is allowed to create additional snapshots
-            worker.log(format!("sync group {}/{} failed - owner check failed ({} != {})",
-                               item.backup_type, item.backup_id, auth_id, owner));
+        if auth_id != owner {
+            // only the owner is allowed to create additional snapshots
+            worker.log(format!(
+                "sync group {}/{} failed - owner check failed ({} != {})",
+                item.backup_type, item.backup_id, auth_id, owner
+            ));
             errors = true; // do not stop here, instead continue
-
         } else if let Err(err) = pull_group(
             worker,
             client,
@@ -549,12 +629,12 @@ pub async fn pull_store(
             &group,
             delete,
             &mut progress,
-        ).await {
+        )
+        .await
+        {
             worker.log(format!(
                 "sync group {}/{} failed - {}",
-                item.backup_type,
-                item.backup_id,
-                err,
+                item.backup_type, item.backup_id, err,
             ));
             errors = true; // do not stop here, instead continue
         }
@@ -564,8 +644,14 @@ pub async fn pull_store(
         let result: Result<(), Error> = proxmox::try_block!({
             let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?;
             for local_group in local_groups {
-                if new_groups.contains(&local_group) { continue; }
-                worker.log(format!("delete vanished group '{}/{}'", local_group.backup_type(), local_group.backup_id()));
+                if new_groups.contains(&local_group) {
+                    continue;
+                }
+                worker.log(format!(
+                    "delete vanished group '{}/{}'",
+                    local_group.backup_type(),
+                    local_group.backup_id()
+                ));
                 if let Err(err) = tgt_store.remove_backup_group(&local_group) {
                     worker.log(err.to_string());
                     errors = true;
-- 
2.20.1

Thread overview: 7+ messages
2021-01-15 10:48 [pbs-devel] [PATCH proxmox-backup 1/3] pull: rustfmt Fabian Grünbichler
2021-01-15 10:48 ` [pbs-devel] [PATCH proxmox-backup 2/3] pull: add error context for initial group list call Fabian Grünbichler
2021-01-18  5:52   ` [pbs-devel] applied: " Thomas Lamprecht
2021-01-15 10:48 ` [pbs-devel] [PATCH proxmox-backup 3/3] pull: only remove owned groups Fabian Grünbichler
2021-01-18  5:57   ` Thomas Lamprecht
2021-01-18  8:35     ` Fabian Grünbichler
2021-01-18  5:52 ` [pbs-devel] applied: [PATCH proxmox-backup 1/3] pull: rustfmt Thomas Lamprecht
