From: Dominik Csapak <d.csapak@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Mon, 10 May 2021 11:27:56 +0200
Message-Id: <20210510092800.17956-3-d.csapak@proxmox.com>
In-Reply-To: <20210510092800.17956-1-d.csapak@proxmox.com>
References: <20210510092800.17956-1-d.csapak@proxmox.com>
Subject: [pbs-devel] [PATCH proxmox-backup v4 2/6] api2/tape/restore: add optional snapshots to 'restore'

This makes it possible to restore only some snapshots from a tape
media-set instead of the whole set. If the user selects only a small
part, this will probably be faster (and it definitely uses less space
on the target datastores).

The user has to provide a list of snapshots to restore, each in the
form 'store:type/group/id', e.g.
'mystore:ct/100/2021-01-01T00:00:00Z'.

We achieve this by first restoring the snapshot indices to a temp dir,
retrieving the list of referenced chunks, and then, using the catalog,
generating a list of media/files that we need to (partially) restore.

Finally, we copy the snapshots to the correct dir in the datastore and
clean up the temp dir.
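As an illustration (not part of the patch): each entry splits at the
first ':' into the target store and the snapshot path. The snapshot
path itself contains ':' characters in the timestamp, which is why the
parsing below uses splitn(2, ':'). A minimal standalone sketch of that
parsing, with parse_snapshot_spec as a hypothetical helper name:

fn parse_snapshot_spec(spec: &str) -> Result<(&str, &str), String> {
    // split only at the first ':'; the timestamp inside the snapshot
    // path contains further ':' characters that must stay intact
    let mut split = spec.splitn(2, ':');
    let store = split
        .next()
        .ok_or_else(|| format!("invalid snapshot: {}", spec))?;
    let snapshot = split
        .next()
        .ok_or_else(|| format!("invalid snapshot: {}", spec))?;
    Ok((store, snapshot))
}

fn main() {
    let (store, snapshot) =
        parse_snapshot_spec("mystore:ct/100/2021-01-01T00:00:00Z").unwrap();
    assert_eq!(store, "mystore");
    assert_eq!(snapshot, "ct/100/2021-01-01T00:00:00Z");
}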
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/api2/tape/restore.rs | 550 +++++++++++++++++++++++++++++++++++++--
 1 file changed, 527 insertions(+), 23 deletions(-)

diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index d1298434..ede2e82b 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -1,4 +1,4 @@
-use std::path::Path;
+use std::path::{Path, PathBuf};
 use std::ffi::OsStr;
 use std::collections::{HashMap, HashSet};
 use std::convert::TryFrom;
@@ -40,6 +40,7 @@ use crate::{
         UPID_SCHEMA,
         Authid,
         Userid,
+        TAPE_RESTORE_SNAPSHOT_SCHEMA,
     },
     config::{
         self,
@@ -51,9 +52,14 @@ use crate::{
         },
     },
     backup::{
+        ArchiveType,
+        archive_type,
+        IndexFile,
         MANIFEST_BLOB_NAME,
         CryptMode,
         DataStore,
+        DynamicIndexReader,
+        FixedIndexReader,
         BackupDir,
         DataBlob,
         BackupManifest,
@@ -69,6 +75,7 @@ use crate::{
         MediaId,
         MediaSet,
         MediaCatalog,
+        MediaSetCatalog,
         Inventory,
         lock_media_set,
         file_formats::{
@@ -95,6 +102,8 @@
     },
 };
 
+const RESTORE_TMP_DIR: &str = "/var/tmp/proxmox-backup";
+
 pub struct DataStoreMap {
     map: HashMap<String, Arc<DataStore>>,
     default: Option<Arc<DataStore>>,
@@ -200,6 +209,14 @@ pub const ROUTER: Router = Router::new().post(&API_METHOD_RESTORE);
                 type: Userid,
                 optional: true,
             },
+            "snapshots": {
+                description: "List of snapshots.",
+                type: Array,
+                optional: true,
+                items: {
+                    schema: TAPE_RESTORE_SNAPSHOT_SCHEMA,
+                },
+            },
             owner: {
                 type: Authid,
                 optional: true,
@@ -222,6 +239,7 @@ pub fn restore(
     drive: String,
     media_set: String,
     notify_user: Option<Userid>,
+    snapshots: Option<Vec<String>>,
     owner: Option<Authid>,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
@@ -266,14 +284,20 @@ pub fn restore(
 
     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
 
-    let taskid = used_datastores
-        .iter()
-        .map(|s| s.to_string())
-        .collect::<Vec<String>>()
-        .join(", ");
+    let (worker_type, task_id) = if snapshots.is_some() {
+        ("tape-restore-single", None)
+    } else {
+        let task_id = used_datastores
+            .iter()
+            .map(|s| s.to_string())
+            .collect::<Vec<String>>()
+            .join(", ");
+        ("tape-restore", Some(task_id))
+    };
+
     let upid_str = WorkerTask::new_thread(
-        "tape-restore",
-        Some(taskid),
+        worker_type,
+        task_id,
         auth_id.clone(),
         to_stdout,
         move |worker| {
@@ -288,20 +312,34 @@ pub fn restore(
             .and_then(|userid| lookup_user_email(userid))
             .or_else(|| lookup_user_email(&auth_id.clone().into()));
 
-
-            task_log!(worker, "Restore mediaset '{}'", media_set);
-            task_log!(worker, "Pool: {}", pool);
-            let res = restore_worker(
-                worker.clone(),
-                inventory,
-                media_set_uuid,
-                drive_config,
-                &drive,
-                store_map,
-                restore_owner,
-                email
-            );
-            task_log!(worker, "Restore mediaset '{}' done", media_set);
+            let res = if let Some(snapshots) = snapshots {
+                restore_list_worker(
+                    worker.clone(),
+                    snapshots,
+                    inventory,
+                    media_set_uuid,
+                    drive_config,
+                    &drive,
+                    store_map,
+                    restore_owner,
+                    email,
+                )
+            } else {
+                task_log!(worker, "Restore mediaset '{}'", media_set);
+                task_log!(worker, "Pool: {}", pool);
+                let res = restore_full_worker(
+                    worker.clone(),
+                    inventory,
+                    media_set_uuid,
+                    drive_config,
+                    &drive,
+                    store_map,
+                    restore_owner,
+                    email,
+                );
+                task_log!(worker, "Restore mediaset '{}' done", media_set);
+                res
+            };
 
             if let Err(err) = set_tape_device_state(&drive, "") {
                 task_log!(
@@ -319,7 +357,7 @@ pub fn restore(
     Ok(upid_str.into())
 }
 
-fn restore_worker(
+fn restore_full_worker(
     worker: Arc<WorkerTask>,
     inventory: Inventory,
     media_set_uuid: Uuid,
@@ -406,6 +444,472 @@ fn restore_worker(
     Ok(())
 }
 
+fn restore_list_worker(
+    worker: Arc<WorkerTask>,
+    snapshots: Vec<String>,
+    inventory: Inventory,
+    media_set_uuid: Uuid,
+    drive_config: SectionConfigData,
+    drive_name: &str,
+    store_map: DataStoreMap,
+    restore_owner: &Authid,
+    email: Option<String>,
+) -> Result<(), Error> {
+    let base_path: PathBuf = format!("{}/{}", RESTORE_TMP_DIR, media_set_uuid).into();
+    std::fs::create_dir_all(&base_path)?;
+
+    let catalog = get_media_set_catalog(&inventory, &media_set_uuid)?;
+
+    let mut datastore_locks = Vec::new();
+    let mut snapshot_file_list: HashMap<Uuid, Vec<u64>> = HashMap::new();
+    let mut snapshot_locks = HashMap::new();
+
+    let res = proxmox::try_block!({
+        // assemble snapshot files/locks
+        for i in 0..snapshots.len() {
+            let store_snapshot = &snapshots[i];
+            let mut split = snapshots[i].splitn(2, ':');
+            let source_datastore = split
+                .next()
+                .ok_or_else(|| format_err!("invalid snapshot: {}", store_snapshot))?;
+            let snapshot = split
+                .next()
+                .ok_or_else(|| format_err!("invalid snapshot: {}", store_snapshot))?;
+            let backup_dir: BackupDir = snapshot.parse()?;
+
+            let datastore = store_map.get_datastore(source_datastore).ok_or_else(|| {
+                format_err!(
+                    "could not find mapping for source datastore: {}",
+                    source_datastore
+                )
+            })?;
+
+            let (owner, _group_lock) =
+                datastore.create_locked_backup_group(backup_dir.group(), &restore_owner)?;
+            if restore_owner != &owner {
+                // only the owner is allowed to create additional snapshots
+                bail!(
+                    "restore '{}' failed - owner check failed ({} != {})",
+                    snapshot,
+                    restore_owner,
+                    owner
+                );
+            }
+
+            let (media_id, file_num) = if let Some((media_uuid, nr)) =
+                catalog.lookup_snapshot(&source_datastore, &snapshot)
+            {
+                let media_id = inventory.lookup_media(media_uuid).unwrap();
+                (media_id, nr)
+            } else {
+                task_warn!(
+                    worker,
+                    "did not find snapshot '{}' in media set {}",
+                    snapshot,
+                    media_set_uuid
+                );
+                continue;
+            };
+
+            let (_rel_path, is_new, snap_lock) = datastore.create_locked_backup_dir(&backup_dir)?;
+
+            if !is_new {
+                task_log!(
+                    worker,
+                    "found snapshot {} on target datastore, skipping...",
+                    snapshot
+                );
+                continue;
+            }
+
+            snapshot_locks.insert(store_snapshot.to_string(), snap_lock);
+
+            let shared_store_lock = datastore.try_shared_chunk_store_lock()?;
+            datastore_locks.push(shared_store_lock);
+
+            let file_list = snapshot_file_list
+                .entry(media_id.label.uuid.clone())
+                .or_insert_with(Vec::new);
+            file_list.push(file_num);
+
+            task_log!(
+                worker,
+                "found snapshot {} on {}: file {}",
+                snapshot,
+                media_id.label.label_text,
+                file_num
+            );
+        }
+
+        if snapshot_file_list.is_empty() {
+            task_log!(worker, "nothing to restore, skipping remaining phases...");
+            return Ok(());
+        }
+
+        task_log!(worker, "Phase 1: temporarily restore snapshots to temp dir");
+        let mut chunks_list: HashMap<String, HashSet<[u8; 32]>> = HashMap::new();
+        for (media_uuid, file_list) in snapshot_file_list.iter_mut() {
+            let media_id = inventory.lookup_media(media_uuid).unwrap();
+            let (drive, info) = request_and_load_media(
+                &worker,
+                &drive_config,
+                &drive_name,
+                &media_id.label,
+                &email,
+            )?;
+            file_list.sort_unstable();
+            restore_snapshots_to_tmpdir(
+                worker.clone(),
+                &base_path,
+                file_list,
+                drive,
+                &info,
+                &media_set_uuid,
+                &mut chunks_list,
+            )?;
+        }
+
+        let mut media_list: HashMap<Uuid, HashMap<u64, HashSet<[u8; 32]>>> = HashMap::new();
+
+        for (source_datastore, chunks) in chunks_list.into_iter() {
+            let datastore = store_map.get_datastore(&source_datastore).ok_or_else(|| {
+                format_err!(
+                    "could not find mapping for source datastore: {}",
+                    source_datastore
+                )
+            })?;
+            for digest in chunks.into_iter() {
+                // we only want to restore chunks that we do not have yet
+                if !datastore.cond_touch_chunk(&digest, false)? {
+                    if let Some((uuid, nr)) = catalog.lookup_chunk(&source_datastore, &digest) {
+                        let file = media_list.entry(uuid.clone()).or_insert_with(HashMap::new);
+                        let chunks = file.entry(nr).or_insert_with(HashSet::new);
+                        chunks.insert(digest);
+                    }
+                }
+            }
+        }
+
+        // we do not need it anymore, saves memory
+        drop(catalog);
+
+        if !media_list.is_empty() {
+            task_log!(worker, "Phase 2: restore chunks to datastores");
+        } else {
+            task_log!(worker, "all chunks exist already, skipping phase 2...");
+        }
+
+        for (media_uuid, file_list) in media_list.iter_mut() {
+            let media_id = inventory.lookup_media(media_uuid).unwrap();
+            let (mut drive, _info) = request_and_load_media(
+                &worker,
+                &drive_config,
+                &drive_name,
+                &media_id.label,
+                &email,
+            )?;
+            let mut files: Vec<u64> = file_list.keys().map(|v| *v).collect();
+            files.sort();
+            restore_chunk_file_list(worker.clone(), &mut drive, &files[..], &store_map, file_list)?;
+        }
+
+        task_log!(
+            worker,
+            "Phase 3: copy snapshots from temp dir to datastores"
+        );
+        for (store_snapshot, _lock) in snapshot_locks.into_iter() {
+            let mut split = store_snapshot.splitn(2, ':');
+            let source_datastore = split
+                .next()
+                .ok_or_else(|| format_err!("invalid snapshot: {}", store_snapshot))?;
+            let snapshot = split
+                .next()
+                .ok_or_else(|| format_err!("invalid snapshot: {}", store_snapshot))?;
+            let backup_dir: BackupDir = snapshot.parse()?;
+
+            let datastore = store_map
+                .get_datastore(&source_datastore)
+                .ok_or_else(|| format_err!("unexpected source datastore: {}", source_datastore))?;
+
+            let mut tmp_path = base_path.clone();
+            tmp_path.push(&source_datastore);
+            tmp_path.push(snapshot);
+
+            let path = datastore.snapshot_path(&backup_dir);
+
+            for entry in std::fs::read_dir(tmp_path)? {
+                let entry = entry?;
+                let mut new_path = path.clone();
+                new_path.push(entry.file_name());
+                std::fs::copy(entry.path(), new_path)?;
+            }
+            task_log!(worker, "Restore snapshot '{}' done", snapshot);
+        }
+        Ok(())
+    });
+
+    match std::fs::remove_dir_all(&base_path) {
+        Ok(()) => {}
+        Err(err) => task_warn!(worker, "error cleaning up: {}", err),
+    }
+
+    res
+}
+
+fn get_media_set_catalog(
+    inventory: &Inventory,
+    media_set_uuid: &Uuid,
+) -> Result<MediaSetCatalog, Error> {
+    let status_path = Path::new(TAPE_STATUS_DIR);
+
+    let members = inventory.compute_media_set_members(media_set_uuid)?;
+    let media_list = members.media_list();
+    let mut catalog = MediaSetCatalog::new();
+
+    for (seq_nr, media_uuid) in media_list.iter().enumerate() {
+        match media_uuid {
+            None => {
+                bail!(
+                    "media set {} is incomplete (missing member {}).",
+                    media_set_uuid,
+                    seq_nr
+                );
+            }
+            Some(media_uuid) => {
+                let media_id = inventory.lookup_media(media_uuid).unwrap();
+                let media_catalog = MediaCatalog::open(status_path, &media_id, false, false)?;
+                catalog.append_catalog(media_catalog)?;
+            }
+        }
+    }
+
+    Ok(catalog)
+}
+
+fn restore_snapshots_to_tmpdir(
+    worker: Arc<WorkerTask>,
+    path: &PathBuf,
+    file_list: &[u64],
+    mut drive: Box<dyn TapeDriver>,
+    media_id: &MediaId,
+    media_set_uuid: &Uuid,
+    chunks_list: &mut HashMap<String, HashSet<[u8; 32]>>,
+) -> Result<(), Error> {
+    match media_id.media_set_label {
+        None => {
+            bail!(
+                "missing media set label on media {} ({})",
+                media_id.label.label_text,
+                media_id.label.uuid
+            );
+        }
+        Some(ref set) => {
+            if set.uuid != *media_set_uuid {
+                bail!(
+                    "wrong media set label on media {} ({} != {})",
+                    media_id.label.label_text,
+                    media_id.label.uuid,
+                    media_set_uuid
+                );
+            }
+            let encrypt_fingerprint = set.encryption_key_fingerprint.clone().map(|fp| {
+                task_log!(worker, "Encryption key fingerprint: {}", fp);
+                (fp, set.uuid.clone())
+            });
+
+            drive.set_encryption(encrypt_fingerprint)?;
+        }
+    }
+
+    for file_num in file_list {
+        drive.move_to_file(*file_num)?;
+        let mut reader = drive.read_next_file()?;
+
+        let header: MediaContentHeader = unsafe { reader.read_le_value()? };
+        if header.magic != PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0 {
+            bail!("missing MediaContentHeader");
+        }
+
+        match header.content_magic {
+            PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1 => {
+                let header_data = reader.read_exact_allocated(header.size as usize)?;
+
+                let archive_header: SnapshotArchiveHeader = serde_json::from_slice(&header_data)
+                    .map_err(|err| {
+                        format_err!("unable to parse snapshot archive header - {}", err)
+                    })?;
+
+                let source_datastore = archive_header.store;
+                let snapshot = archive_header.snapshot;
+
+                task_log!(
+                    worker,
+                    "File {}: snapshot archive {}:{}",
+                    file_num,
+                    source_datastore,
+                    snapshot
+                );
+
+                let mut decoder = pxar::decoder::sync::Decoder::from_std(reader)?;
+
+                let mut tmp_path = path.clone();
+                tmp_path.push(&source_datastore);
+                tmp_path.push(snapshot);
+                std::fs::create_dir_all(&tmp_path)?;
+
+                let chunks = chunks_list
+                    .entry(source_datastore)
+                    .or_insert_with(HashSet::new);
+                let manifest = try_restore_snapshot_archive(worker.clone(), &mut decoder, &tmp_path)?;
+                for item in manifest.files() {
+                    let mut archive_path = tmp_path.to_owned();
+                    archive_path.push(&item.filename);
+
+                    let index: Box<dyn IndexFile> = match archive_type(&item.filename)? {
+                        ArchiveType::DynamicIndex => {
+                            Box::new(DynamicIndexReader::open(&archive_path)?)
+                        }
+                        ArchiveType::FixedIndex => {
+                            Box::new(FixedIndexReader::open(&archive_path)?)
+                        }
+                        ArchiveType::Blob => continue,
+                    };
+                    for i in 0..index.index_count() {
+                        if let Some(digest) = index.index_digest(i) {
+                            chunks.insert(*digest);
+                        }
+                    }
+                }
+            }
+            _ => bail!("unexpected file type"),
+        }
+    }
+
+    Ok(())
+}
+
+fn restore_chunk_file_list(
+    worker: Arc<WorkerTask>,
+    drive: &mut Box<dyn TapeDriver>,
+    file_list: &[u64],
+    store_map: &DataStoreMap,
+    chunk_list: &mut HashMap<u64, HashSet<[u8; 32]>>,
+) -> Result<(), Error> {
+    for nr in file_list {
+        let current_file_number = drive.current_file_number()?;
+        if current_file_number != *nr {
+            drive.move_to_file(*nr)?;
+        }
+        let mut reader = drive.read_next_file()?;
+        let header: MediaContentHeader = unsafe { reader.read_le_value()? };
+        if header.magic != PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0 {
+            bail!("missing MediaContentHeader");
+        }
+
+        match header.content_magic {
+            PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1 => {
+                let header_data = reader.read_exact_allocated(header.size as usize)?;
+
+                let archive_header: ChunkArchiveHeader = serde_json::from_slice(&header_data)
+                    .map_err(|err| format_err!("unable to parse chunk archive header - {}", err))?;
+
+                let source_datastore = archive_header.store;
+
+                task_log!(
+                    worker,
+                    "File {}: chunk archive for datastore '{}'",
+                    nr,
+                    source_datastore
+                );
+
+                let datastore = store_map.get_datastore(&source_datastore).ok_or_else(|| {
+                    format_err!("unexpected chunk archive for store: {}", source_datastore)
+                })?;
+
+                let chunks = chunk_list
+                    .get_mut(nr)
+                    .ok_or_else(|| format_err!("unexpected file nr: {}", nr))?;
+
+                let count = restore_partial_chunk_archive(worker.clone(), reader, datastore.clone(), chunks)?;
+                task_log!(worker, "restored {} chunks", count);
+            }
+            _ => bail!("unexpected content magic {:?}", header.content_magic),
+        }
+    }
+
+    Ok(())
+}
+
+fn restore_partial_chunk_archive<'a>(
+    worker: Arc<WorkerTask>,
+    reader: Box<dyn 'a + Read>,
+    datastore: Arc<DataStore>,
+    chunk_list: &mut HashSet<[u8; 32]>,
+) -> Result<usize, Error> {
+    let mut decoder = ChunkArchiveDecoder::new(reader);
+
+    let mut count = 0;
+
+    let start_time = std::time::SystemTime::now();
+    let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
+    let bytes2 = bytes.clone();
+
+    let writer_pool = ParallelHandler::new(
+        "tape restore chunk writer",
+        4,
+        move |(chunk, digest): (DataBlob, [u8; 32])| {
+            if !datastore.cond_touch_chunk(&digest, false)? {
+                bytes2.fetch_add(chunk.raw_size(), std::sync::atomic::Ordering::SeqCst);
+                chunk.verify_crc()?;
+                if chunk.crypt_mode()? == CryptMode::None {
+                    chunk.decode(None, Some(&digest))?; // verify digest
+                }
+
+                datastore.insert_chunk(&chunk, &digest)?;
+            }
+            Ok(())
+        },
+    );
+
+    let verify_and_write_channel = writer_pool.channel();
+
+    loop {
+        let (digest, blob) = match decoder.next_chunk()? {
+            Some((digest, blob)) => (digest, blob),
+            None => break,
+        };
+
+        worker.check_abort()?;
+
+        if chunk_list.remove(&digest) {
+            verify_and_write_channel.send((blob, digest.clone()))?;
+            count += 1;
+        }
+
+        if chunk_list.is_empty() {
+            break;
+        }
+    }
+
+    drop(verify_and_write_channel);
+
+    writer_pool.complete()?;
+
+    let elapsed = start_time.elapsed()?.as_secs_f64();
+
+    let bytes = bytes.load(std::sync::atomic::Ordering::SeqCst);
+
+    task_log!(
+        worker,
+        "restored {} bytes ({:.2} MB/s)",
+        bytes,
+        (bytes as f64) / (1_000_000.0 * elapsed)
+    );
+
+    Ok(count)
+}
+
+
 /// Request and restore complete media without using existing catalog (create catalog instead)
 pub fn request_and_restore_media(
     worker: Arc<WorkerTask>,
-- 
2.20.1
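A note on the Phase 2 bookkeeping above: it boils down to grouping the
still-missing chunk digests by media uuid and tape file number, so that
every tape is loaded once and each chunk archive on it is read at most
once. A simplified standalone sketch of that grouping, using stand-in
types rather than the patch's own (the patch keys media by the Uuid
type and fills the map incrementally via catalog.lookup_chunk):

use std::collections::{HashMap, HashSet};

type Digest = [u8; 32];

// Build a per-media, per-file restore plan from (media uuid, file nr,
// digest) triples, e.g. as produced by MediaSetCatalog lookups.
fn group_by_media(
    missing: Vec<(String, u64, Digest)>,
) -> HashMap<String, HashMap<u64, HashSet<Digest>>> {
    let mut plan: HashMap<String, HashMap<u64, HashSet<Digest>>> = HashMap::new();
    for (media_uuid, file_nr, digest) in missing {
        plan.entry(media_uuid)
            .or_insert_with(HashMap::new)
            .entry(file_nr)
            .or_insert_with(HashSet::new)
            .insert(digest);
    }
    plan
}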