From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 814F56A692 for ; Tue, 16 Feb 2021 18:08:21 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 769E519231 for ; Tue, 16 Feb 2021 18:07:51 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [212.186.127.180]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 8EF2418E64 for ; Tue, 16 Feb 2021 18:07:33 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 578F0461D4 for ; Tue, 16 Feb 2021 18:07:33 +0100 (CET) From: Stefan Reiter To: pbs-devel@lists.proxmox.com Date: Tue, 16 Feb 2021 18:06:56 +0100 Message-Id: <20210216170710.31767-9-s.reiter@proxmox.com> X-Mailer: git-send-email 2.20.1 In-Reply-To: <20210216170710.31767-1-s.reiter@proxmox.com> References: <20210216170710.31767-1-s.reiter@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL -0.028 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_DNSWL_MED -2.3 Sender listed at https://www.dnswl.org/, medium trust SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. 
[mod.rs, extract.rs] Subject: [pbs-devel] [PATCH proxmox-backup 08/22] pxar/extract: add sequential variants to create_zip, extract_sub_dir X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 16 Feb 2021 17:08:21 -0000 For streaming pxar files directly from a restore source and extracting them on the fly, we cannot create an Accessor, and instead have to live with a sequential Decoder. This supports only the aio::Decoder variant, since the functions are async anyway. The original functionality remains in place, the new functions are labelled with a _seq suffix. The recursive functions actually doing the work are changed to take an EitherEntry enum variant that can contain either an Accessor (recursive operation) or a Decoder (linear operation). If the _seq variants are given a decoder where the current position points to a file, they will only extract/encode this file; if it's a directory, they will extract until they leave the directory they started in. 
Signed-off-by: Stefan Reiter --- src/pxar/extract.rs | 388 ++++++++++++++++++++++++++++++++------------ src/pxar/mod.rs | 5 +- 2 files changed, 292 insertions(+), 101 deletions(-) diff --git a/src/pxar/extract.rs b/src/pxar/extract.rs index b673b4b8..66a5ed59 100644 --- a/src/pxar/extract.rs +++ b/src/pxar/extract.rs @@ -17,8 +17,9 @@ use nix::sys::stat::Mode; use pathpatterns::{MatchEntry, MatchList, MatchType}; use pxar::format::Device; -use pxar::Metadata; +use pxar::{Entry, Metadata, EntryKind}; use pxar::accessor::aio::{Accessor, FileContents, FileEntry}; +use pxar::decoder::aio::Decoder; use proxmox::c_result; use proxmox::tools::fs::{create_path, CreateOptions}; @@ -90,8 +91,6 @@ where let mut err_path_stack = vec![OsString::from("/")]; let mut current_match = options.extract_match_default; while let Some(entry) = decoder.next() { - use pxar::EntryKind; - let entry = entry.map_err(|err| format_err!("error reading pxar archive: {}", err))?; let file_name_os = entry.file_name(); @@ -471,9 +470,23 @@ impl Extractor { } } +enum EitherEntry< + 'a, + S: pxar::decoder::SeqRead + Unpin + Send + 'static, + T: Clone + pxar::accessor::ReadAt + Unpin + Send + Sync + 'static, +> { + Entry(Entry, &'a mut Decoder), + FileEntry(FileEntry, &'a mut Accessor), +} + +// These types are never constructed, but we need some concrete type fulfilling S and T from +// EitherEntry so rust is happy with its use in async fns +type BogusSeqRead = pxar::decoder::sync::StandardReader; +type BogusReadAt = pxar::accessor::sync::FileRefReader>; + pub async fn create_zip( output: W, - decoder: Accessor, + mut decoder: Accessor, path: P, verbose: bool, ) -> Result<(), Error> @@ -484,96 +497,174 @@ where { let root = decoder.open_root().await?; let file = root - .lookup(&path).await? - .ok_or(format_err!("error opening '{:?}'", path.as_ref()))?; + .lookup(&path) + .await? 
+ .ok_or_else(|| format_err!("error opening '{:?}'", path.as_ref()))?; let mut prefix = PathBuf::new(); let mut components = file.entry().path().components(); - components.next_back(); // discar last + components.next_back(); // discard last for comp in components { prefix.push(comp); } let mut zipencoder = ZipEncoder::new(output); - let mut decoder = decoder; - recurse_files_zip(&mut zipencoder, &mut decoder, &prefix, file, verbose) + let entry: EitherEntry = EitherEntry::FileEntry(file, &mut decoder); + add_entry_to_zip(&mut zipencoder, entry, &prefix, verbose) .await .map_err(|err| { eprintln!("error during creating of zip: {}", err); err })?; - zipencoder - .finish() - .await - .map_err(|err| { - eprintln!("error during finishing of zip: {}", err); - err - }) + zipencoder.finish().await.map_err(|err| { + eprintln!("error during finishing of zip: {}", err); + err + }) } -fn recurse_files_zip<'a, T, W>( +pub async fn create_zip_seq( + output: W, + mut decoder: Decoder, + verbose: bool, +) -> Result<(), Error> +where + S: pxar::decoder::SeqRead + Unpin + Send + 'static, + W: tokio::io::AsyncWrite + Unpin + Send + 'static, +{ + decoder.enable_goodbye_entries(true); + let root = match decoder.peek().await { + Some(Ok(root)) => root, + Some(Err(err)) => bail!("error getting root entry from pxar: {}", err), + None => bail!("cannot extract empty archive"), + }; + + let mut prefix = PathBuf::new(); + let mut components = root.path().components(); + components.next_back(); // discard last + for comp in components { + prefix.push(comp); + } + + let mut zipencoder = ZipEncoder::new(output); + + let root_is_file = matches!(root.kind(), EntryKind::File { .. 
}); + let mut dir_level = 0; + + while let Some(file) = decoder.next().await { + match file { + Ok(file) => { + match file.kind() { + EntryKind::Directory => dir_level += 1, + EntryKind::GoodbyeTable => { + dir_level -= 1; + // only extract until we leave the directory we started in + if dir_level == 0 { + break; + } + } + _ => {} + } + + let entry: EitherEntry = EitherEntry::Entry(file, &mut decoder); + add_entry_to_zip(&mut zipencoder, entry, &prefix, verbose) + .await + .map_err(|err| { + eprintln!("error during creating of zip: {}", err); + err + })?; + + if root_is_file { + break; + } + } + Err(err) => bail!("error in decoder: {}", err), + } + } + + zipencoder.finish().await.map_err(|err| { + eprintln!("error during finishing of zip: {}", err); + err + }) +} + +fn add_entry_to_zip<'a, S, T, W>( zip: &'a mut ZipEncoder, - decoder: &'a mut Accessor, + file: EitherEntry<'a, S, T>, prefix: &'a Path, - file: FileEntry, verbose: bool, ) -> Pin> + Send + 'a>> where + S: pxar::decoder::SeqRead + Unpin + Send + 'static, T: Clone + pxar::accessor::ReadAt + Unpin + Send + Sync + 'static, W: tokio::io::AsyncWrite + Unpin + Send + 'static, { - use pxar::EntryKind; Box::pin(async move { - let metadata = file.entry().metadata(); - let path = file.entry().path().strip_prefix(&prefix)?.to_path_buf(); + let (metadata, path, kind) = match file { + EitherEntry::Entry(ref e, _) => (e.metadata(), e.path(), e.kind()), + EitherEntry::FileEntry(ref fe, _) => (fe.metadata(), fe.path(), fe.kind()), + }; - match file.kind() { + if verbose && !matches!(kind, EntryKind::GoodbyeTable) { + eprintln!("adding '{}' to zip", path.display()); + } + + match kind { EntryKind::File { .. 
} => { - if verbose { - eprintln!("adding '{}' to zip", path.display()); - } let entry = ZipEntry::new( path, metadata.stat.mtime.secs, metadata.stat.mode as u16, true, ); - zip.add_entry(entry, Some(file.contents().await?)) - .await - .map_err(|err| format_err!("could not send file entry: {}", err))?; + let contents = match file { + EitherEntry::Entry(_, dec) => Box::new(match dec.contents() { + Some(con) => con, + None => bail!("file without contents found"), + }) + as Box, + EitherEntry::FileEntry(ref fe, _) => Box::new( + fe.contents() + .await + .map_err(|err| format_err!("file with bad contents found: {}", err))?, + ) + as Box, + }; + zip.add_entry(entry, Some(contents)) + .await + .map_err(|err| format_err!("could not send file entry: {}", err))?; } EntryKind::Hardlink(_) => { - let realfile = decoder.follow_hardlink(&file).await?; - if verbose { - eprintln!("adding '{}' to zip", path.display()); + // we can't extract hardlinks in sequential extraction + if let EitherEntry::FileEntry(ref fe, ref accessor) = file { + let realfile = accessor.follow_hardlink(&fe).await?; + let entry = ZipEntry::new( + path, + metadata.stat.mtime.secs, + metadata.stat.mode as u16, + true, + ); + zip.add_entry(entry, Some(realfile.contents().await?)) + .await + .map_err(|err| format_err!("could not send file entry: {}", err))?; } - let entry = ZipEntry::new( - path, - metadata.stat.mtime.secs, - metadata.stat.mode as u16, - true, - ); - zip.add_entry(entry, Some(realfile.contents().await?)) - .await - .map_err(|err| format_err!("could not send file entry: {}", err))?; } EntryKind::Directory => { - let dir = file.enter_directory().await?; - let mut readdir = dir.read_dir(); - if verbose { - eprintln!("adding '{}' to zip", path.display()); - } let entry = ZipEntry::new( path, metadata.stat.mtime.secs, metadata.stat.mode as u16, false, ); - zip.add_entry::>(entry, None).await?; - while let Some(entry) = readdir.next().await { - let entry = entry?.decode_entry().await?; - 
recurse_files_zip(zip, decoder, prefix, entry, verbose).await?; + if let EitherEntry::FileEntry(fe, a) = file { + let dir = fe.enter_directory().await?; + let mut readdir = dir.read_dir(); + zip.add_entry::>(entry, None).await?; + while let Some(entry) = readdir.next().await { + let entry = entry?.decode_entry().await?; + let entry: EitherEntry = EitherEntry::FileEntry(entry, a); + add_entry_to_zip(zip, entry, prefix, verbose).await?; + } } } _ => {} // ignore all else @@ -583,6 +674,43 @@ where }) } +fn get_extractor(destination: DEST, metadata: Metadata) -> Result +where + DEST: AsRef +{ + create_path( + &destination, + None, + Some(CreateOptions::new().perm(Mode::from_bits_truncate(0o700))), + ) + .map_err(|err| { + format_err!( + "error creating directory {:?}: {}", + destination.as_ref(), + err + ) + })?; + + let dir = Dir::open( + destination.as_ref(), + OFlag::O_DIRECTORY | OFlag::O_CLOEXEC, + Mode::empty(), + ) + .map_err(|err| { + format_err!( + "unable to open target directory {:?}: {}", + destination.as_ref(), + err, + ) + })?; + + Ok(Extractor::new( + dir, + metadata, + false, + Flags::DEFAULT, + )) +} pub async fn extract_sub_dir( destination: DEST, @@ -597,47 +725,86 @@ where { let root = decoder.open_root().await?; - create_path( - &destination, - None, - Some(CreateOptions::new().perm(Mode::from_bits_truncate(0o700))), - ) - .map_err(|err| format_err!("error creating directory {:?}: {}", destination.as_ref(), err))?; - - let dir = Dir::open( - destination.as_ref(), - OFlag::O_DIRECTORY | OFlag::O_CLOEXEC, - Mode::empty(), - ) - .map_err(|err| format_err!("unable to open target directory {:?}: {}", destination.as_ref(), err,))?; - - let mut extractor = Extractor::new( - dir, + let mut extractor = get_extractor( + destination, root.lookup_self().await?.entry().metadata().clone(), - false, - Flags::DEFAULT, - ); + )?; let file = root - .lookup(&path).await? - .ok_or(format_err!("error opening '{:?}'", path.as_ref()))?; + .lookup(&path) + .await? 
+ .ok_or_else(|| format_err!("error opening '{:?}'", path.as_ref()))?; - recurse_files_extractor(&mut extractor, &mut decoder, file, verbose).await + let entry: EitherEntry = EitherEntry::FileEntry(file, &mut decoder); + do_extract_sub_dir(&mut extractor, entry, verbose).await } -fn recurse_files_extractor<'a, T>( +pub async fn extract_sub_dir_seq( + destination: DEST, + mut decoder: Decoder, + verbose: bool, +) -> Result<(), Error> +where + S: pxar::decoder::SeqRead + Unpin + Send + 'static, + DEST: AsRef, +{ + decoder.enable_goodbye_entries(true); + let root = match decoder.peek().await { + Some(Ok(root)) => root, + Some(Err(err)) => bail!("error getting root entry from pxar: {}", err), + None => bail!("cannot extract empty archive"), + }; + + let mut extractor = get_extractor(destination, root.metadata().clone())?; + let root_is_file = matches!(root.kind(), EntryKind::File { .. }); + let mut dir_level = 0; + + while let Some(file) = decoder.next().await { + match file { + Ok(file) => { + match file.kind() { + EntryKind::Directory => dir_level += 1, + EntryKind::GoodbyeTable => { + dir_level -= 1; + // only extract until we leave the directory we started in + if dir_level == 0 { + break; + } + }, + _ => {} + } + + let path = file.path().to_owned(); + let entry: EitherEntry = EitherEntry::Entry(file, &mut decoder); + if let Err(err) = do_extract_sub_dir(&mut extractor, entry, verbose).await { + eprintln!("error extracting {}: {}", path.display(), err); + } + + if root_is_file { + break; + } + } + Err(err) => bail!("error in decoder: {}", err), + } + } + + Ok(()) +} + +fn do_extract_sub_dir<'a, S, T>( extractor: &'a mut Extractor, - decoder: &'a mut Accessor, - file: FileEntry, + file: EitherEntry<'a, S, T>, verbose: bool, ) -> Pin> + Send + 'a>> where + S: pxar::decoder::SeqRead + Unpin + Send, T: Clone + pxar::accessor::ReadAt + Unpin + Send + Sync + 'static, { - use pxar::EntryKind; Box::pin(async move { - let metadata = file.entry().metadata(); - let 
file_name_os = file.file_name(); + let (metadata, file_name_os, path, kind) = match file { + EitherEntry::Entry(ref e, _) => (e.metadata(), e.file_name(), e.path(), e.kind()), + EitherEntry::FileEntry(ref fe, _) => (fe.metadata(), fe.file_name(), fe.path(), fe.kind()), + }; // safety check: a file entry in an archive must never contain slashes: if file_name_os.as_bytes().contains(&b'/') { @@ -647,28 +814,32 @@ where let file_name = CString::new(file_name_os.as_bytes()) .map_err(|_| format_err!("encountered file name with null-bytes"))?; - if verbose { - eprintln!("extracting: {}", file.path().display()); + if verbose && !matches!(kind, EntryKind::GoodbyeTable) { + eprintln!("extracting: {}", path.display()); } - match file.kind() { + match kind { EntryKind::Directory => { extractor .enter_directory(file_name_os.to_owned(), metadata.clone(), true) .map_err(|err| format_err!("error at entry {:?}: {}", file_name_os, err))?; - let dir = file.enter_directory().await?; - let mut readdir = dir.read_dir(); - while let Some(entry) = readdir.next().await { - let entry = entry?.decode_entry().await?; - let filename = entry.path().to_path_buf(); + // for EitherEntry::Entry we detect directory end with GoodbyeTable + if let EitherEntry::FileEntry(file, a) = file { + let dir = file.enter_directory().await?; + let mut readdir = dir.read_dir(); + while let Some(entry) = readdir.next().await { + let entry = entry?.decode_entry().await?; + let filename = entry.path().to_path_buf(); - // log errors and continue - if let Err(err) = recurse_files_extractor(extractor, decoder, entry, verbose).await { - eprintln!("error extracting {:?}: {}", filename.display(), err); + // log errors and continue + let entry: EitherEntry = EitherEntry::FileEntry(entry, a); + if let Err(err) = do_extract_sub_dir(extractor, entry, verbose).await { + eprintln!("error extracting {}: {}", filename.display(), err); + } } + extractor.leave_directory()?; } - extractor.leave_directory()?; } 
EntryKind::Symlink(link) => { extractor.extract_symlink(&file_name, metadata, link.as_ref())?; @@ -691,17 +862,34 @@ where extractor.extract_special(&file_name, metadata, 0)?; } } - EntryKind::File { size, .. } => extractor.async_extract_file( - &file_name, - metadata, - *size, - &mut file.contents().await.map_err(|_| { - format_err!("found regular file entry without contents in archive") - })?, - ).await?, - EntryKind::GoodbyeTable => {}, // ignore + EntryKind::File { size, .. } => { + extractor + .async_extract_file( + &file_name, + metadata, + *size, + &mut match file { + EitherEntry::Entry(_, dec) => Box::new(match dec.contents() { + Some(con) => con, + None => bail!("file without contents found"), + }) + as Box, + EitherEntry::FileEntry(ref fe, _) => { + Box::new(fe.contents().await.map_err(|err| { + format_err!("file with bad contents found: {}", err) + })?) + as Box + } + }, + ) + .await? + } + EntryKind::GoodbyeTable => { + if let EitherEntry::Entry(_, _) = file { + extractor.leave_directory()?; + } + } } Ok(()) }) } - diff --git a/src/pxar/mod.rs b/src/pxar/mod.rs index d1302962..d5c42942 100644 --- a/src/pxar/mod.rs +++ b/src/pxar/mod.rs @@ -59,7 +59,10 @@ mod flags; pub use flags::Flags; pub use create::{create_archive, PxarCreateOptions}; -pub use extract::{create_zip, extract_archive, extract_sub_dir, ErrorHandler, PxarExtractOptions}; +pub use extract::{ + create_zip, create_zip_seq, extract_archive, extract_sub_dir, extract_sub_dir_seq, + ErrorHandler, PxarExtractOptions, +}; /// The format requires to build sorted directory lookup tables in /// memory, so we restrict the number of allowed entries to limit -- 2.20.1