all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Dominik Csapak <d.csapak@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup v3 3/7] api2/tape/restore: add optional snapshots to 'restore'
Date: Thu,  6 May 2021 14:20:04 +0200	[thread overview]
Message-ID: <20210506122008.11297-4-d.csapak@proxmox.com> (raw)
In-Reply-To: <20210506122008.11297-1-d.csapak@proxmox.com>

this makes it possible to only restore some snapshots from a tape media-set
instead of the whole set. If the user selects only a small part, this will
probably be faster (and it definitely uses less space on the target
datastores).

the user has to provide a list of snapshots to restore in the form of
'store:type/group/id'
e.g. 'mystore:ct/100/2021-01-01T00:00:00Z'

we achieve this by first restoring the index to a temp dir, retrieving
a list of chunks, and then using the catalog to generate a list of
media/files that we need to (partially) restore.

finally, we copy the snapshots to the correct dir in the datastore,
and clean up the temp dir.

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/api2/tape/restore.rs | 691 ++++++++++++++++++++++++++++++++++-----
 1 file changed, 608 insertions(+), 83 deletions(-)

diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index 9884b379..f5c35e5a 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -1,4 +1,4 @@
-use std::path::Path;
+use std::path::{Path, PathBuf};
 use std::ffi::OsStr;
 use std::collections::{HashMap, HashSet};
 use std::convert::TryFrom;
@@ -40,6 +40,7 @@ use crate::{
         UPID_SCHEMA,
         Authid,
         Userid,
+        TAPE_RESTORE_SNAPSHOT_SCHEMA,
     },
     config::{
         self,
@@ -51,9 +52,14 @@ use crate::{
         },
     },
     backup::{
+        ArchiveType,
+        archive_type,
+        IndexFile,
         MANIFEST_BLOB_NAME,
         CryptMode,
         DataStore,
+        DynamicIndexReader,
+        FixedIndexReader,
         BackupDir,
         DataBlob,
         BackupManifest,
@@ -69,6 +75,7 @@ use crate::{
         MediaId,
         MediaSet,
         MediaCatalog,
+        MediaSetCatalog,
         Inventory,
         lock_media_set,
         file_formats::{
@@ -95,6 +102,8 @@ use crate::{
     },
 };
 
+const RESTORE_TMP_DIR: &str = "/var/tmp/proxmox-backup";
+
 pub struct DataStoreMap {
     map: HashMap<String, Arc<DataStore>>,
     default: Option<Arc<DataStore>>,
@@ -183,6 +192,7 @@ fn check_datastore_privs(
 
 pub const ROUTER: Router = Router::new().post(&API_METHOD_RESTORE);
 
+
 #[api(
    input: {
         properties: {
@@ -200,6 +210,14 @@ pub const ROUTER: Router = Router::new().post(&API_METHOD_RESTORE);
                 type: Userid,
                 optional: true,
             },
+            "snapshots": {
+                description: "List of snapshots.",
+                type: Array,
+                optional: true,
+                items: {
+                    schema: TAPE_RESTORE_SNAPSHOT_SCHEMA,
+                },
+            },
             owner: {
                 type: Authid,
                 optional: true,
@@ -222,6 +240,7 @@ pub fn restore(
     drive: String,
     media_set: String,
     notify_user: Option<Userid>,
+    snapshots: Option<Vec<String>>,
     owner: Option<Authid>,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
@@ -266,14 +285,20 @@ pub fn restore(
 
     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
 
-    let taskid = used_datastores
-        .iter()
-        .map(|s| s.to_string())
-        .collect::<Vec<String>>()
-        .join(", ");
+    let (worker_type, task_id) = if snapshots.is_some() {
+        ("tape-restore-single", None)
+    } else {
+        let task_id = used_datastores
+            .iter()
+            .map(|s| s.to_string())
+            .collect::<Vec<String>>()
+            .join(", ");
+        ("tape-restore", Some(task_id))
+    };
+
     let upid_str = WorkerTask::new_thread(
-        "tape-restore",
-        Some(taskid),
+        worker_type,
+        task_id,
         auth_id.clone(),
         to_stdout,
         move |worker| {
@@ -281,100 +306,608 @@ pub fn restore(
 
             set_tape_device_state(&drive, &worker.upid().to_string())?;
 
-            let members = inventory.compute_media_set_members(&media_set_uuid)?;
+            let restore_owner = owner.as_ref().unwrap_or(&auth_id);
 
-            let media_list = members.media_list();
+            let email = notify_user
+                .as_ref()
+                .and_then(|userid| lookup_user_email(userid))
+                .or_else(|| lookup_user_email(&auth_id.clone().into()));
+
+            let res = if let Some(snapshots) = snapshots {
+                restore_single_worker(
+                    worker.clone(),
+                    snapshots,
+                    inventory,
+                    media_set_uuid,
+                    drive_config,
+                    &drive,
+                    store_map,
+                    restore_owner,
+                    email,
+                )
+            } else {
+                task_log!(worker, "Restore mediaset '{}'", media_set);
+                task_log!(worker, "Pool: {}", pool);
+                let res = restore_worker(
+                    worker.clone(),
+                    inventory,
+                    media_set_uuid,
+                    drive_config,
+                    &drive,
+                    store_map,
+                    restore_owner,
+                    email,
+                );
+                task_log!(worker, "Restore mediaset '{}' done", media_set);
+                res
+            };
 
-            let mut media_id_list = Vec::new();
+            if let Err(err) = set_tape_device_state(&drive, "") {
+                task_log!(
+                    worker,
+                    "could not unset drive state for {}: {}",
+                    drive,
+                    err
+                );
+            }
 
-            let mut encryption_key_fingerprint = None;
+            res
+        }
+    )?;
 
-            for (seq_nr, media_uuid) in media_list.iter().enumerate() {
-                match media_uuid {
-                    None => {
-                        bail!("media set {} is incomplete (missing member {}).", media_set_uuid, seq_nr);
-                    }
-                    Some(media_uuid) => {
-                        let media_id = inventory.lookup_media(media_uuid).unwrap();
-                        if let Some(ref set) = media_id.media_set_label { // always true here
-                            if encryption_key_fingerprint.is_none() && set.encryption_key_fingerprint.is_some() {
-                                encryption_key_fingerprint = set.encryption_key_fingerprint.clone();
-                            }
-                        }
-                        media_id_list.push(media_id);
+    Ok(upid_str.into())
+}
+
+fn restore_worker(
+    worker: Arc<WorkerTask>,
+    inventory: Inventory,
+    media_set_uuid: Uuid,
+    drive_config: SectionConfigData,
+    drive_name: &str,
+    store_map: DataStoreMap,
+    restore_owner: &Authid,
+    email: Option<String>,
+) -> Result<(), Error> {
+    let members = inventory.compute_media_set_members(&media_set_uuid)?;
+
+    let media_list = members.media_list();
+
+    let mut media_id_list = Vec::new();
+
+    let mut encryption_key_fingerprint = None;
+
+    for (seq_nr, media_uuid) in media_list.iter().enumerate() {
+        match media_uuid {
+            None => {
+                bail!("media set {} is incomplete (missing member {}).", media_set_uuid, seq_nr);
+            }
+            Some(media_uuid) => {
+                let media_id = inventory.lookup_media(media_uuid).unwrap();
+                if let Some(ref set) = media_id.media_set_label { // always true here
+                    if encryption_key_fingerprint.is_none() && set.encryption_key_fingerprint.is_some() {
+                        encryption_key_fingerprint = set.encryption_key_fingerprint.clone();
                     }
                 }
+                media_id_list.push(media_id);
+            }
+        }
+    }
+
+    if let Some(fingerprint) = encryption_key_fingerprint {
+        task_log!(worker, "Encryption key fingerprint: {}", fingerprint);
+    }
+
+    task_log!(
+        worker,
+        "Datastore(s): {}",
+        store_map
+        .used_datastores()
+        .into_iter()
+        .map(String::from)
+        .collect::<Vec<String>>()
+        .join(", "),
+    );
+
+    task_log!(worker, "Drive: {}", drive_name);
+    task_log!(
+        worker,
+        "Required media list: {}",
+        media_id_list.iter()
+        .map(|media_id| media_id.label.label_text.as_str())
+        .collect::<Vec<&str>>()
+        .join(";")
+    );
+
+    let mut datastore_locks = Vec::new();
+    for store_name in store_map.used_datastores() {
+        // explicit create shared lock to prevent GC on newly created chunks
+        if let Some(store) = store_map.get_datastore(store_name) {
+            let shared_store_lock = store.try_shared_chunk_store_lock()?;
+            datastore_locks.push(shared_store_lock);
+        }
+    }
+
+    let mut checked_chunks_map = HashMap::new();
+
+    for media_id in media_id_list.iter() {
+        request_and_restore_media(
+            worker.clone(),
+            media_id,
+            &drive_config,
+            drive_name,
+            &store_map,
+            &mut checked_chunks_map,
+            restore_owner,
+            &email,
+        )?;
+    }
+
+    Ok(())
+}
+
+fn restore_single_worker(
+    worker: Arc<WorkerTask>,
+    snapshots: Vec<String>,
+    inventory: Inventory,
+    media_set_uuid: Uuid,
+    drive_config: SectionConfigData,
+    drive_name: &str,
+    store_map: DataStoreMap,
+    restore_owner: &Authid,
+    email: Option<String>,
+) -> Result<(), Error> {
+    let base_path: PathBuf = format!("{}/{}", RESTORE_TMP_DIR, media_set_uuid).into();
+    std::fs::create_dir_all(&base_path)?;
+
+    let catalog = get_media_set_catalog(&inventory, &media_set_uuid)?;
+
+    let mut datastore_locks = Vec::new();
+    let mut snapshot_file_list: HashMap<Uuid, Vec<u64>> = HashMap::new();
+    let mut snapshot_locks = HashMap::new();
+
+    let res = proxmox::try_block!({
+        // assemble snapshot files/locks
+        for i in 0..snapshots.len() {
+            let store_snapshot = &snapshots[i];
+            let mut split = snapshots[i].splitn(2, ':');
+            let source_datastore = split
+                .next()
+                .ok_or_else(|| format_err!("invalid snapshot: {}", store_snapshot))?;
+            let snapshot = split
+                .next()
+                .ok_or_else(|| format_err!("invalid snapshot:{}", store_snapshot))?;
+            let backup_dir: BackupDir = snapshot.parse()?;
+
+            let datastore = store_map.get_datastore(source_datastore).ok_or_else(|| {
+                format_err!(
+                    "could not find mapping for source datastore: {}",
+                    source_datastore
+                )
+            })?;
+
+            let (owner, _group_lock) =
+                datastore.create_locked_backup_group(backup_dir.group(), &restore_owner)?;
+            if restore_owner != &owner {
+                // only the owner is allowed to create additional snapshots
+                bail!(
+                    "restore '{}' failed - owner check failed ({} != {})",
+                    snapshot,
+                    restore_owner,
+                    owner
+                );
             }
 
-            task_log!(worker, "Restore mediaset '{}'", media_set);
-            if let Some(fingerprint) = encryption_key_fingerprint {
-                task_log!(worker, "Encryption key fingerprint: {}", fingerprint);
+            let (media_id, file_num) = if let Some((media_uuid, nr)) =
+                catalog.lookup_snapshot(&source_datastore, &snapshot)
+            {
+                let media_id = inventory.lookup_media(media_uuid).unwrap();
+                (media_id, nr)
+            } else {
+                task_warn!(
+                    worker,
+                    "did not find snapshot '{}' in media set {}",
+                    snapshot,
+                    media_set_uuid
+                );
+                continue;
+            };
+
+            let (_rel_path, is_new, snap_lock) = datastore.create_locked_backup_dir(&backup_dir)?;
+
+            if !is_new {
+                task_log!(
+                    worker,
+                    "found snapshot {} on target datastore, skipping...",
+                    snapshot
+                );
+                continue;
             }
-            task_log!(worker, "Pool: {}", pool);
-            task_log!(
-                worker,
-                "Datastore(s): {}",
-                store_map
-                    .used_datastores()
-                    .into_iter()
-                    .map(String::from)
-                    .collect::<Vec<String>>()
-                    .join(", "),
-            );
 
-            task_log!(worker, "Drive: {}", drive);
+            snapshot_locks.insert(store_snapshot.to_string(), snap_lock);
+
+            let shared_store_lock = datastore.try_shared_chunk_store_lock()?;
+            datastore_locks.push(shared_store_lock);
+
+            let file_list = snapshot_file_list
+                .entry(media_id.label.uuid.clone())
+                .or_insert_with(Vec::new);
+            file_list.push(file_num);
+
             task_log!(
                 worker,
-                "Required media list: {}",
-                media_id_list.iter()
-                    .map(|media_id| media_id.label.label_text.as_str())
-                    .collect::<Vec<&str>>()
-                    .join(";")
+                "found snapshot {} on {}: file {}",
+                snapshot,
+                media_id.label.label_text,
+                file_num
             );
+        }
+
+        if snapshot_file_list.is_empty() {
+            task_log!(worker, "nothing to restore, skipping remaining phases...");
+            return Ok(());
+        }
 
-            let mut datastore_locks = Vec::new();
-            for store_name in store_map.used_datastores() {
-                // explicit create shared lock to prevent GC on newly created chunks
-                if let Some(store) = store_map.get_datastore(store_name) {
-                    let shared_store_lock = store.try_shared_chunk_store_lock()?;
-                    datastore_locks.push(shared_store_lock);
+        task_log!(worker, "Phase 1: temporarily restore snapshots to temp dir");
+        let mut chunks_list: HashMap<String, HashSet<[u8; 32]>> = HashMap::new();
+        for (media_uuid, file_list) in snapshot_file_list.iter_mut() {
+            let media_id = inventory.lookup_media(media_uuid).unwrap();
+            let (drive, info) = request_and_load_media(
+                &worker,
+                &drive_config,
+                &drive_name,
+                &media_id.label,
+                &email,
+            )?;
+            file_list.sort_unstable();
+            restore_snapshots_to_tmpdir(
+                worker.clone(),
+                &base_path,
+                file_list,
+                drive,
+                &info,
+                &media_set_uuid,
+                &mut chunks_list,
+            )?;
+        }
+
+        let mut media_list: HashMap<Uuid, HashMap<u64, HashSet<[u8; 32]>>> = HashMap::new();
+
+        for (source_datastore, chunks) in chunks_list.into_iter() {
+            let datastore = store_map.get_datastore(&source_datastore).ok_or_else(|| {
+                format_err!(
+                    "could not find mapping for source datastore: {}",
+                    source_datastore
+                )
+            })?;
+            for digest in chunks.into_iter() {
+                // we only want to restore chunks that we do not have yet
+                if !datastore.cond_touch_chunk(&digest, false)? {
+                    if let Some((uuid, nr)) = catalog.lookup_chunk(&source_datastore, &digest) {
+                        let file = media_list.entry(uuid.clone()).or_insert_with(HashMap::new);
+                        let chunks = file.entry(nr).or_insert_with(HashSet::new);
+                        chunks.insert(digest);
+                    }
                 }
             }
+        }
 
-            let mut checked_chunks_map = HashMap::new();
+        // we do not need it anymore, saves memory
+        drop(catalog);
 
-            for media_id in media_id_list.iter() {
-                request_and_restore_media(
-                    worker.clone(),
-                    media_id,
-                    &drive_config,
-                    &drive,
-                    &store_map,
-                    &mut checked_chunks_map,
-                    &auth_id,
-                    &notify_user,
-                    &owner,
-                )?;
+        if !media_list.is_empty() {
+            task_log!(worker, "Phase 2: restore chunks to datastores");
+        } else {
+            task_log!(worker, "all chunks exist already, skipping phase 2...");
+        }
+
+        for (media_uuid, file_list) in media_list.iter_mut() {
+            let media_id = inventory.lookup_media(media_uuid).unwrap();
+            let (mut drive, _info) = request_and_load_media(
+                &worker,
+                &drive_config,
+                &drive_name,
+                &media_id.label,
+                &email,
+            )?;
+            let mut files: Vec<u64> = file_list.keys().map(|v| *v).collect();
+            files.sort();
+            restore_chunk_file_list(worker.clone(), &mut drive, &files[..], &store_map, file_list)?;
+        }
+
+        task_log!(
+            worker,
+            "Phase 3: copy snapshots from temp dir to datastores"
+        );
+        for (store_snapshot, _lock) in snapshot_locks.into_iter() {
+            let mut split = store_snapshot.splitn(2, ':');
+            let source_datastore = split
+                .next()
+                .ok_or_else(|| format_err!("invalid snapshot: {}", store_snapshot))?;
+            let snapshot = split
+                .next()
+                .ok_or_else(|| format_err!("invalid snapshot:{}", store_snapshot))?;
+            let backup_dir: BackupDir = snapshot.parse()?;
+
+            let datastore = store_map
+                .get_datastore(&source_datastore)
+                .ok_or_else(|| format_err!("unexpected source datastore: {}", source_datastore))?;
+
+            let mut tmp_path = base_path.clone();
+            tmp_path.push(&source_datastore);
+            tmp_path.push(snapshot);
+
+            let path = datastore.snapshot_path(&backup_dir);
+
+            for entry in std::fs::read_dir(tmp_path)? {
+                let entry = entry?;
+                let mut new_path = path.clone();
+                new_path.push(entry.file_name());
+                std::fs::copy(entry.path(), new_path)?;
             }
+            task_log!(worker, "Restore snapshot '{}' done", snapshot);
+        }
+        Ok(())
+    });
+
+    match std::fs::remove_dir_all(&base_path) {
+        Ok(()) => {}
+        Err(err) => task_warn!(worker, "error cleaning up: {}", err),
+    }
 
-            drop(datastore_locks);
+    res
+}
 
-            task_log!(worker, "Restore mediaset '{}' done", media_set);
+fn get_media_set_catalog(
+    inventory: &Inventory,
+    media_set_uuid: &Uuid,
+) -> Result<MediaSetCatalog, Error> {
+    let status_path = Path::new(TAPE_STATUS_DIR);
+
+    let members = inventory.compute_media_set_members(media_set_uuid)?;
+    let media_list = members.media_list();
+    let mut catalog = MediaSetCatalog::new();
+
+    for (seq_nr, media_uuid) in media_list.iter().enumerate() {
+        match media_uuid {
+            None => {
+                bail!(
+                    "media set {} is incomplete (missing member {}).",
+                    media_set_uuid,
+                    seq_nr
+                );
+            }
+            Some(media_uuid) => {
+                let media_id = inventory.lookup_media(media_uuid).unwrap();
+                let media_catalog = MediaCatalog::open(status_path, &media_id, false, false)?;
+                catalog.append_catalog(media_catalog)?;
+            }
+        }
+    }
+
+    Ok(catalog)
+}
+
+fn restore_snapshots_to_tmpdir(
+    worker: Arc<WorkerTask>,
+    path: &PathBuf,
+    file_list: &[u64],
+    mut drive: Box<dyn TapeDriver>,
+    media_id: &MediaId,
+    media_set_uuid: &Uuid,
+    chunks_list: &mut HashMap<String, HashSet<[u8; 32]>>,
+) -> Result<(), Error> {
+    match media_id.media_set_label {
+        None => {
+            bail!(
+                "missing media set label on media {} ({})",
+                media_id.label.label_text,
+                media_id.label.uuid
+            );
+        }
+        Some(ref set) => {
+            if set.uuid != *media_set_uuid {
+                bail!(
+                    "wrong media set label on media {} ({} != {})",
+                    media_id.label.label_text,
+                    media_id.label.uuid,
+                    media_set_uuid
+                );
+            }
+            let encrypt_fingerprint = set.encryption_key_fingerprint.clone().map(|fp| {
+                task_log!(worker, "Encryption key fingerprint: {}", fp);
+                (fp, set.uuid.clone())
+            });
+
+            drive.set_encryption(encrypt_fingerprint)?;
+        }
+    }
+
+    for file_num in file_list {
+        drive.move_to_file(*file_num)?;
+        let mut reader = drive.read_next_file()?;
+
+        let header: MediaContentHeader = unsafe { reader.read_le_value()? };
+        if header.magic != PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0 {
+            bail!("missing MediaContentHeader");
+        }
+
+        match header.content_magic {
+            PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1 => {
+                let header_data = reader.read_exact_allocated(header.size as usize)?;
+
+                let archive_header: SnapshotArchiveHeader = serde_json::from_slice(&header_data)
+                    .map_err(|err| {
+                        format_err!("unable to parse snapshot archive header - {}", err)
+                    })?;
+
+                let source_datastore = archive_header.store;
+                let snapshot = archive_header.snapshot;
 
-            if let Err(err) = set_tape_device_state(&drive, "") {
                 task_log!(
                     worker,
-                    "could not unset drive state for {}: {}",
-                    drive,
-                    err
+                    "File {}: snapshot archive {}:{}",
+                    file_num,
+                    source_datastore,
+                    snapshot
+                );
+
+                let mut decoder = pxar::decoder::sync::Decoder::from_std(reader)?;
+
+                let mut tmp_path = path.clone();
+                tmp_path.push(&source_datastore);
+                tmp_path.push(snapshot);
+                std::fs::create_dir_all(&tmp_path)?;
+
+                let chunks = chunks_list
+                    .entry(source_datastore)
+                    .or_insert_with(HashSet::new);
+                let manifest = try_restore_snapshot_archive(worker.clone(), &mut decoder, &tmp_path)?;
+                for item in manifest.files() {
+                    let mut archive_path = tmp_path.to_owned();
+                    archive_path.push(&item.filename);
+
+                    let index: Box<dyn IndexFile> = match archive_type(&item.filename)? {
+                        ArchiveType::DynamicIndex => {
+                            Box::new(DynamicIndexReader::open(&archive_path)?)
+                        }
+                        ArchiveType::FixedIndex => {
+                            Box::new(FixedIndexReader::open(&archive_path)?)
+                        }
+                        ArchiveType::Blob => continue,
+                    };
+                    for i in 0..index.index_count() {
+                        if let Some(digest) = index.index_digest(i) {
+                            chunks.insert(*digest);
+                        }
+                    }
+                }
+            }
+            _ => bail!("unexpected file type"),
+        }
+    }
+
+    Ok(())
+}
+
+fn restore_chunk_file_list(
+    worker: Arc<WorkerTask>,
+    drive: &mut Box<dyn TapeDriver>,
+    file_list: &[u64],
+    store_map: &DataStoreMap,
+    chunk_list: &mut HashMap<u64, HashSet<[u8; 32]>>,
+) -> Result<(), Error> {
+    for nr in file_list {
+        let current_file_number = drive.current_file_number()?;
+        if current_file_number != *nr {
+            drive.move_to_file(*nr)?;
+        }
+        let mut reader = drive.read_next_file()?;
+        let header: MediaContentHeader = unsafe { reader.read_le_value()? };
+        if header.magic != PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0 {
+            bail!("missing MediaContentHeader");
+        }
+
+        match header.content_magic {
+            PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1 => {
+                let header_data = reader.read_exact_allocated(header.size as usize)?;
+
+                let archive_header: ChunkArchiveHeader = serde_json::from_slice(&header_data)
+                    .map_err(|err| format_err!("unable to parse chunk archive header - {}", err))?;
+
+                let source_datastore = archive_header.store;
+
+                task_log!(
+                    worker,
+                    "File {}: chunk archive for datastore '{}'",
+                    nr,
+                    source_datastore
                 );
+
+                let datastore = store_map.get_datastore(&source_datastore).ok_or_else(|| {
+                    format_err!("unexpected chunk archive for store: {}", source_datastore)
+                })?;
+
+                let chunks = chunk_list
+                    .get_mut(nr)
+                    .ok_or_else(|| format_err!("undexpected file nr: {}", nr))?;
+
+                let count = restore_partial_chunk_archive(worker.clone(), reader, datastore.clone(), chunks)?;
+                task_log!(worker, "restored {} chunks", count);
             }
+            _ => bail!("unexpected content magic {:?}", header.content_magic),
+        }
+    }
 
+    Ok(())
+}
+
+fn restore_partial_chunk_archive<'a>(
+    worker: Arc<WorkerTask>,
+    reader: Box<dyn 'a + TapeRead>,
+    datastore: Arc<DataStore>,
+    chunk_list: &mut HashSet<[u8; 32]>,
+) -> Result<usize, Error> {
+    let mut decoder = ChunkArchiveDecoder::new(reader);
+
+    let mut count = 0;
+
+    let start_time = std::time::SystemTime::now();
+    let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
+    let bytes2 = bytes.clone();
+
+    let writer_pool = ParallelHandler::new(
+        "tape restore chunk writer",
+        4,
+        move |(chunk, digest): (DataBlob, [u8; 32])| {
+            if !datastore.cond_touch_chunk(&digest, false)? {
+                bytes2.fetch_add(chunk.raw_size(), std::sync::atomic::Ordering::SeqCst);
+                chunk.verify_crc()?;
+                if chunk.crypt_mode()? == CryptMode::None {
+                    chunk.decode(None, Some(&digest))?; // verify digest
+                }
+
+                datastore.insert_chunk(&chunk, &digest)?;
+            }
             Ok(())
+        },
+    );
+
+    let verify_and_write_channel = writer_pool.channel();
+
+    loop {
+        let (digest, blob) = match decoder.next_chunk()? {
+            Some((digest, blob)) => (digest, blob),
+            None => break,
+        };
+
+        worker.check_abort()?;
+
+        if chunk_list.remove(&digest) {
+            verify_and_write_channel.send((blob, digest.clone()))?;
+            count += 1;
         }
-    )?;
 
-    Ok(upid_str.into())
+        if chunk_list.is_empty() {
+            break;
+        }
+    }
+
+    drop(verify_and_write_channel);
+
+    writer_pool.complete()?;
+
+    let elapsed = start_time.elapsed()?.as_secs_f64();
+
+    let bytes = bytes.load(std::sync::atomic::Ordering::SeqCst);
+
+    task_log!(
+        worker,
+        "restored {} bytes ({:.2} MB/s)",
+        bytes,
+        (bytes as f64) / (1_000_000.0 * elapsed)
+    );
+
+    Ok(count)
 }
 
 /// Request and restore complete media without using existing catalog (create catalog instead)
@@ -385,21 +918,15 @@ pub fn request_and_restore_media(
     drive_name: &str,
     store_map: &DataStoreMap,
     checked_chunks_map: &mut HashMap<String, HashSet<[u8;32]>>,
-    authid: &Authid,
-    notify_user: &Option<Userid>,
-    owner: &Option<Authid>,
+    restore_owner: &Authid,
+    email: &Option<String>,
 ) -> Result<(), Error> {
     let media_set_uuid = match media_id.media_set_label {
         None => bail!("restore_media: no media set - internal error"),
         Some(ref set) => &set.uuid,
     };
 
-    let email = notify_user
-        .as_ref()
-        .and_then(|userid| lookup_user_email(userid))
-        .or_else(|| lookup_user_email(&authid.clone().into()));
-
-    let (mut drive, info) = request_and_load_media(&worker, &drive_config, &drive_name, &media_id.label, &email)?;
+    let (mut drive, info) = request_and_load_media(&worker, &drive_config, &drive_name, &media_id.label, email)?;
 
     match info.media_set_label {
         None => {
@@ -419,8 +946,6 @@ pub fn request_and_restore_media(
         }
     }
 
-    let restore_owner = owner.as_ref().unwrap_or(authid);
-
     restore_media(
         worker,
         &mut drive,
-- 
2.20.1





  parent reply	other threads:[~2021-05-06 12:20 UTC|newest]

Thread overview: 10+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-05-06 12:20 [pbs-devel] [PATCH proxmox-backup v3 0/7] tape: single snapshot restore Dominik Csapak
2021-05-06 12:20 ` [pbs-devel] [PATCH proxmox-backup v3 1/7] api2/tape/restore: return backup manifest in try_restore_snapshot_archive Dominik Csapak
2021-05-07 10:49   ` [pbs-devel] applied: " Dietmar Maurer
2021-05-06 12:20 ` [pbs-devel] [PATCH proxmox-backup v3 2/7] api2/types: add TAPE_RESTORE_SNAPSHOT_SCHEMA Dominik Csapak
2021-05-07 10:51   ` [pbs-devel] applied: " Dietmar Maurer
2021-05-06 12:20 ` Dominik Csapak [this message]
2021-05-06 12:20 ` [pbs-devel] [PATCH proxmox-backup v3 4/7] tape/inventory: add completion helper for tape snapshots Dominik Csapak
2021-05-06 12:20 ` [pbs-devel] [PATCH proxmox-backup v3 5/7] bin/proxmox-tape: add optional snapshots to restore command Dominik Csapak
2021-05-06 12:20 ` [pbs-devel] [PATCH proxmox-backup v3 6/7] ui: tape: add single snapshot restore Dominik Csapak
2021-05-06 12:20 ` [pbs-devel] [PATCH proxmox-backup v3 7/7] docs/api-viewer: improve rendering of array format Dominik Csapak

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210506122008.11297-4-d.csapak@proxmox.com \
    --to=d.csapak@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal