From: Max Carrara
To: pbs-devel@lists.proxmox.com
Date: Mon, 28 Aug 2023 16:42:03 +0200
Message-Id: <20230828144204.3591503-2-m.carrara@proxmox.com>
In-Reply-To: <20230828144204.3591503-1-m.carrara@proxmox.com>
References: <20230828144204.3591503-1-m.carrara@proxmox.com>
Subject: [pbs-devel] [RFC PATCH proxmox-backup 1/2] pbs-client: pxar: Add prototype implementation of `AsyncExtractor`

This preliminary commit adds a fully functioning prototype
implementation of `AsyncExtractor`, which is - unsurprisingly - a
complete `async` implementation of the original `Extractor` (and its
accompanying `ExtractorIter`).

In order to limit the scope of this RFC and focus on its intent, the
`pbs_client::pxar::aio` module is introduced, which contains all
relevant changes (instead of refactoring half of the crate in a dozen
or two commits).

The design of the new extractor is split into three main parts:

  1. the higher-level `AsyncExtractor` which exposes the public
     extraction API
  2. the lower-level `RawAsyncExtractor` which serves as the
     "workhorse" of its implementors, including `AsyncExtractor`
  3. the `SyncTaskWorker` which receives blocking / synchronous tasks
     via a queue and runs them in parallel - this allows the
     `RawAsyncExtractor` to remain fully cooperative and non-blocking

Furthermore, by moving everything that can be considered an
*extraction option* into the `PxarExtractOptions` `struct`, the
overall control flow and usage of `AsyncExtractor` and
`RawAsyncExtractor` are much simpler to follow.

The `ErrorHandler` type alias is made more flexible by requiring an
`Fn` instead of an `FnMut`; the callback is no longer generic but
instead becomes its own threadsafe alias named `Callback`.
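
As a rough illustration of the third design point (blocking tasks handed
to a worker through a queue), here is a minimal sketch using only the
standard library. It is not the `SyncTaskWorker` from this patch: the
names `Task`, `Worker` and `run_demo` are hypothetical, errors are plain
`String`s instead of `anyhow::Error`, and a single thread drains the
queue in order.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

/// A boxed blocking task (hypothetical type, not the one used in the patch).
type Task = Box<dyn FnOnce() -> Result<(), String> + Send + 'static>;

/// Hypothetical stand-in for the worker idea: tasks arrive through a
/// channel and are executed in order on one dedicated thread.
struct Worker {
    sender: Option<mpsc::Sender<Task>>,
    handle: Option<thread::JoinHandle<Result<(), String>>>,
}

impl Worker {
    /// Spawn the worker thread and return a handle to its queue.
    fn start() -> Self {
        let (sender, receiver) = mpsc::channel::<Task>();
        let handle = thread::spawn(move || -> Result<(), String> {
            // Runs until every sender is dropped or a task fails.
            for task in receiver {
                task()?;
            }
            Ok(())
        });
        Self {
            sender: Some(sender),
            handle: Some(handle),
        }
    }

    /// Queue a task without waiting for it to run.
    fn send(&self, task: Task) -> Result<(), String> {
        self.sender
            .as_ref()
            .ok_or_else(|| "worker already stopped".to_string())?
            .send(task)
            .map_err(|_| "worker thread is gone".to_string())
    }

    /// Close the queue and wait for the remaining tasks to finish.
    fn join(&mut self) -> Result<(), String> {
        self.sender.take(); // dropping the sender ends the receive loop
        match self.handle.take() {
            Some(handle) => handle
                .join()
                .map_err(|_| "worker thread panicked".to_string())?,
            None => Ok(()),
        }
    }
}

/// Queue three counting tasks and return how many of them were executed.
fn run_demo() -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let mut worker = Worker::start();
    for _ in 0..3 {
        let counter = Arc::clone(&counter);
        worker
            .send(Box::new(move || {
                counter.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }))
            .expect("queueing a task failed");
    }
    worker.join().expect("worker reported an error");
    counter.load(Ordering::SeqCst)
}
```

The caller only queues work and moves on, so an async caller never has
to block on file system calls itself; waiting happens once, in `join()`.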
The following modules are adapted in a similar fashion, but remain
identical in their function:

`pbs_client::pxar::dir_stack` --> `pbs_client::pxar::aio::dir_stack`

  * The original `PxarDirStack` is split in two parts; it now uses
    the new `DirStack` in order to be somewhat more testable
  * Synchronization mechanisms and guarantees are put in place in
    regards to the new extractor implementation, as the
    `PxarDirStack` must now be shared between multiple threads or
    tasks

`pbs_client::pxar::metadata` --> `pbs_client::pxar::aio::metadata`

  * Function signatures now take an `&Arc<pxar::Entry>` in order to
    make it easier to be passed around threads.
    (There was no measurable difference between `Arc<pxar::Entry>`
    vs `&Arc<pxar::Entry>` vs `&pxar::Entry`)
  * The old `ErrorHandler` is replaced with the new version in
    `pbs_client::pxar::aio`

Signed-off-by: Max Carrara
---
 Cargo.toml                                   |   1 +
 pbs-client/Cargo.toml                        |   1 +
 pbs-client/src/pxar/aio/dir_stack.rs         | 543 +++++++++++++++++++
 pbs-client/src/pxar/aio/extract/extractor.rs | 446 +++++++++++++++
 pbs-client/src/pxar/aio/extract/mod.rs       | 220 ++++++++
 pbs-client/src/pxar/aio/extract/raw.rs       | 503 +++++++++++++++++
 pbs-client/src/pxar/aio/metadata.rs          | 412 ++++++++++++++
 pbs-client/src/pxar/aio/mod.rs               |  11 +
 pbs-client/src/pxar/aio/worker.rs            | 167 ++++++
 pbs-client/src/pxar/mod.rs                   |   1 +
 10 files changed, 2305 insertions(+)
 create mode 100644 pbs-client/src/pxar/aio/dir_stack.rs
 create mode 100644 pbs-client/src/pxar/aio/extract/extractor.rs
 create mode 100644 pbs-client/src/pxar/aio/extract/mod.rs
 create mode 100644 pbs-client/src/pxar/aio/extract/raw.rs
 create mode 100644 pbs-client/src/pxar/aio/metadata.rs
 create mode 100644 pbs-client/src/pxar/aio/mod.rs
 create mode 100644 pbs-client/src/pxar/aio/worker.rs

diff --git a/Cargo.toml b/Cargo.toml
index c7773f0e..ba6d874a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -122,6 +122,7 @@ hyper = { version = "0.14", features = [ "full" ] }
 lazy_static = "1.4"
 libc = "0.2"
 log = "0.4.17"
+memchr = "2.5"
 nix = "0.26.1"
 nom = "7"
 num-traits = "0.2"
diff --git a/pbs-client/Cargo.toml b/pbs-client/Cargo.toml
index ed7d651d..047efb41 100644
--- a/pbs-client/Cargo.toml
+++ b/pbs-client/Cargo.toml
@@ -17,6 +17,7 @@ hyper.workspace = true
 lazy_static.workspace = true
 libc.workspace = true
 log.workspace = true
+memchr.workspace = true
 nix.workspace = true
 openssl.workspace = true
 percent-encoding.workspace = true
diff --git a/pbs-client/src/pxar/aio/dir_stack.rs b/pbs-client/src/pxar/aio/dir_stack.rs
new file mode 100644
index 00000000..62cf9ee5
--- /dev/null
+++ b/pbs-client/src/pxar/aio/dir_stack.rs
@@ -0,0 +1,543 @@
+#![allow(unused)]
+
+use std::ffi::OsStr;
+use std::fmt::Debug;
+use std::os::fd::{AsRawFd, BorrowedFd, RawFd};
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+
+use anyhow::{bail, format_err, Context, Error};
+use nix::dir::Dir;
+use nix::fcntl::OFlag;
+use nix::sys::stat::{mkdirat, Mode};
+
+use proxmox_sys::error::SysError;
+
+// NOTE: This is essentially crate::pxar::tools:assert_single_path_component
+// but kept separate here for the time being
+pub fn assert_path_has_single_normal_component<P: AsRef<Path>>(path: P) -> Result<(), Error> {
+    let path = path.as_ref();
+
+    if !path.is_relative() {
+        bail!("path is absolute: {:?}", path)
+    }
+
+    let mut components = path.components();
+
+    if !matches!(components.next(), Some(std::path::Component::Normal(_))) {
+        bail!("invalid component in path: {:?}", path)
+    }
+
+    if components.next().is_some() {
+        bail!("path has multiple components: {:?}", path)
+    }
+
+    Ok(())
+}
+
+/// A stack which stores directory path components and associates each directory
+/// with some data.
+pub struct DirStack { + path: PathBuf, + data_stack: Vec, +} + +impl Default for DirStack { + #[inline] + fn default() -> Self { + Self { + path: PathBuf::new(), + data_stack: vec![], + } + } +} + +impl Clone for DirStack { + #[inline] + fn clone(&self) -> Self { + Self { + path: self.path.clone(), + data_stack: self.data_stack.clone(), + } + } +} + +impl Debug for DirStack { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DirStack") + .field("path", &self.path) + .field("data_stack", &self.data_stack) + .finish() + } +} + +impl DirStack { + /// Returns a [`Path`] slice of the entire path that's on the stack. + /// + /// The [dangling root](DirStack::with_dangling_root) will be included if it exists. + #[inline] + pub fn as_path(&self) -> &Path { + self.path.as_path() + } + + /// Removes all data from the stack. + /// + /// The [dangling root](DirStack::with_dangling_root) will be preserved if it exists. + #[inline] + pub fn clear(&mut self) { + while self.pop().is_some() {} + } + + /// Returns the [dangling root](DirStack::with_dangling_root) or an empty + /// path if it doesn't exist. + #[inline] + pub fn dangling_root(&self) -> &Path { + if self.data_stack.is_empty() { + self.path.as_ref() + } else { + let mut iter = self.path.iter(); + iter.nth_back(self.data_stack.len() - 1); + iter.as_path() + } + } + + /// Returns the first path component and its associated data. + /// + /// If the stack has a [dangling root](DirStack::with_dangling_root), it + /// will be ignored. Meaning, only the first *added* path component will + /// be returned. + #[inline] + pub fn first(&self) -> Option<(&Path, &T)> { + self.data_stack.first().and_then(|data| { + self.path + .iter() + .nth_back(self.data_stack.len() - 1) + .map(|component| (component.as_ref(), data)) + }) + } + + /// Checks whether the stack is empty. 
+ /// + /// The stack is considered empty if there are no path components with + /// associated data, even if it has a [dangling root](DirStack::with_dangling_root). + #[inline] + pub fn is_empty(&self) -> bool { + self.data_stack.is_empty() + } + + /// Returns the last path component and its associated data. + #[inline] + pub fn last(&self) -> Option<(&Path, &T)> { + self.data_stack.last().and_then(|data| { + self.path + .file_name() + .map(|file_name| (file_name.as_ref(), data)) + }) + } + + /// Returns the number of elements on the stack. + /// + /// Note that the root path (the very first path) may consist of multiple components, + /// but is still counted as one element. + #[inline] + pub fn len(&self) -> usize { + self.data_stack.len() + } + + /// Creates a new, empty [`DirStack`]. + #[inline] + pub fn new() -> Self { + Self::default() + } + + /// Returns the **full path** before [`DirStack::last()`] and the data associated + /// with the component it leads to, **if both exist.** + /// + /// # Example + /// + /// ``` + /// use std::path::Path; + /// + /// # use tools::DirStack; + /// + /// let mut dir_stack: DirStack = DirStack::new(); + /// + /// dir_stack.push("foo", 1); + /// dir_stack.push("bar", 2); + /// dir_stack.push("baz", 3); + /// + /// let expected = Path::new("foo/bar"); + /// + /// assert_eq!(dir_stack.parent(), Some((expected, &2))); + /// + /// ``` + #[inline] + pub fn parent(&self) -> Option<(&Path, &T)> { + if self.is_empty() { + None + } else { + self.path.parent().and_then(|parent| { + if self.data_stack.len() > 1 { + Some((parent, &self.data_stack[self.data_stack.len() - 2])) + } else { + None + } + }) + } + } + /// Removes the last path component from the stack and returns its associated + /// data, or [`None`] if the stack is empty. + #[inline] + pub fn pop(&mut self) -> Option { + self.data_stack.pop().map(|data| { + self.path.pop(); + data + }) + } + + /// Pushes a path and its associated data onto the stack. 
+ /// + /// The path must consist of a single normal component. + /// See [`Component::Normal`][std::path::Component::Normal]. + #[inline] + pub fn push>(&mut self, path: P, data: T) -> Result<(), Error> { + assert_path_has_single_normal_component(&path)?; + + self.path.push(path.as_ref()); + self.data_stack.push(data); + + Ok(()) + } + + /// Clears the stack, replacing all elements with the given path and that path's + /// associated data. + /// + /// The [dangling root](DirStack::with_dangling_root) will be preserved if it exists. + /// + /// Contrary to [`DirStack::push()`], this can be any path and doesn't have to consist of + /// only a single path component. This is useful if you want to e.g. initialize the stack + /// with your current working directory or similar. + #[inline] + pub fn replace>(&mut self, path: P, data: T) { + self.clear(); + self.path.push(path); + self.data_stack.push(data); + } + + /// Creates a new [`DirStack`] from a path without any associated data. + /// This stack has a **dangling root** which means that even though it has a + /// path, it is still considered to be empty, as it lacks any data. + /// + /// This is useful if you want to have a base directory that can never + /// be left through [`DirStack::pop()`], such as your current working directory, + /// for example. + #[inline] + pub fn with_dangling_root>(path: P) -> Self { + Self { + path: path.as_ref().to_owned(), + data_stack: vec![], + } + } +} + +impl std::ops::Index for DirStack +where + Idx: std::slice::SliceIndex<[T], Output = T>, +{ + type Output = T; + + #[inline] + fn index(&self, index: Idx) -> &Self::Output { + &self.data_stack[index] + } +} + +impl std::ops::IndexMut for DirStack +where + Idx: std::slice::SliceIndex<[T], Output = T>, +{ + #[inline] + fn index_mut(&mut self, index: Idx) -> &mut Self::Output { + &mut self.data_stack[index] + } +} + +/// Helper struct that associates a [`Entry`][e] with a potentially opened directory [`Dir`]. 
+/// +/// It's the developer's responsibility that the contained [`Entry`][e]'s *kind* +/// is actually a [`Directory`][d], and that the opened [`Dir`] corresponds to the +/// provided [`Entry`][e]. +/// +/// [e]: pxar::Entry +/// [d]: pxar::EntryKind::Directory +#[derive(Debug)] +pub(super) struct PxarDir { + entry: Arc, + dir: Option, +} + +impl PxarDir { + /// Creates a new [`PxarDir`] from an [`Arc`][pxar::Entry]. + #[inline] + pub fn new(entry: Arc) -> Self { + Self { entry, dir: None } + } + + /// Creates a new [`PxarDir`] from an [`Arc`][pxar::Entry] + /// which is associated with a [`Dir`]. + #[inline] + pub fn with_dir(entry: Arc, dir: Dir) -> Self { + Self { + entry, + dir: Some(dir), + } + } + + /// Return the file name of the inner [`Entry`][pxar::Entry]. + #[inline] + pub fn file_name(&self) -> &OsStr { + self.entry.file_name() + } + + /// Return a [`BorrowedFd`] to the inner [`Dir`] if available. + #[inline] + pub fn try_as_borrowed_fd(&self) -> Option { + // FIXME: Once `nix` adds `AsFd` support use `.as_fd()` instead. + self.dir + .as_ref() + .map(|dir| unsafe { BorrowedFd::borrow_raw(dir.as_raw_fd()) }) + } + + /// Moves the inner [`Entry`][pxar::Entry] and [`Option`][dir] out of the [`PxarDir`]. + /// + /// [dir]: nix::dir::Dir + #[inline] + pub fn into_inner(self) -> (Arc, Option) { + (self.entry, self.dir) + } + + #[inline] + fn create_at(&mut self, parent: RawFd, allow_existing: bool) -> Result { + const PERMS: Mode = Mode::from_bits_truncate(0o700); + + match mkdirat(parent, self.file_name(), PERMS) { + Ok(()) => (), + Err(error) => { + if !(allow_existing && error.already_exists()) { + return Err(error).context("directory already exists"); + } + } + } + + self.open_at(parent) + } + + #[inline] + fn open_at(&mut self, parent: RawFd) -> Result { + Dir::openat(parent, self.file_name(), OFlag::O_DIRECTORY, Mode::empty()) + .context("failed to open directory") + .map(|dir| { + // FIXME: Once `nix` adds `AsFd` support use `.as_fd()` instead. 
+ let fd = unsafe { BorrowedFd::borrow_raw(dir.as_raw_fd()) }; + + self.dir = Some(dir); + + fd + }) + } +} + +/// The [`PxarDirStack`] is used to keep track of traversed and created [`PxarDir`s][PxarDir]. +#[derive(Debug)] +pub(super) struct PxarDirStack { + inner: DirStack, + len_created: usize, +} + +/// This struct may safely be `Sync` as it is only used in the context of the +/// [`RawAsyncExtractor`][super::RawAsyncExtractor], which never accesses an +/// underlying [`Dir`] concurrently or in parallel. +unsafe impl Sync for PxarDirStack {} + +impl PxarDirStack { + #[inline] + pub fn new() -> Self { + Self { + inner: DirStack::new(), + len_created: 0, + } + } + + #[inline] + pub fn len(&self) -> usize { + self.inner.len() + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + #[inline] + pub fn as_path(&self) -> &Path { + self.inner.as_path() + } + + #[inline] + pub fn push>(&mut self, path: P, dir: PxarDir) -> Result<(), Error> { + let path = path.as_ref(); + self.inner.push(path, dir).map(|_| { + if self.inner[self.inner.len() - 1].dir.is_some() { + self.len_created += 1; + } + }) + } + + #[inline] + pub fn pop(&mut self) -> Option { + self.inner.pop().map(|dir| { + self.len_created = self.len_created.min(self.len()); + dir + }) + } + + #[inline] + pub fn root_dir_fd(&self) -> Result { + self.inner + .first() + .ok_or_else(|| format_err!("stack underrun")) + .map(|(_, pxar_dir)| { + pxar_dir + .try_as_borrowed_fd() + .context("lost track of directory file descriptors") + })? + } + + /// Return the last [`Dir`]'s file descriptor as a [`BorrowedFd`] + /// or create it if it's not available. 
+ #[inline] + pub fn last_dir_fd(&mut self, allow_existing: bool) -> Result { + if self.is_empty() { + bail!("no directory entries on the stack") + } + + if self.len_created == 0 { + bail!("no created file descriptors on the stack") + } + + let mut fd = self.inner[self.len_created - 1] + .try_as_borrowed_fd() + .context("lost track of directory file descriptors")? + .as_raw_fd(); + + while self.len_created < self.len() { + fd = self.inner[self.len_created] + .create_at(fd, allow_existing) + .map(|borrowed_fd| borrowed_fd.as_raw_fd())?; + self.len_created += 1; + } + + self.inner[self.len_created - 1] + .try_as_borrowed_fd() + .context("lost track of directory file descriptors") + } +} + +#[cfg(test)] +mod test { + use std::ffi::OsStr; + use std::path::Path; + + use super::DirStack; + + // helper extension trait to make asserts more concise + trait PathHelper { + fn as_path(&self) -> &Path; + } + + impl PathHelper for T + where + T: AsRef, + { + fn as_path(&self) -> &Path { + self.as_ref().as_ref() + } + } + + #[test] + fn test_dir_stack_base() { + let mut dir_stack: DirStack = DirStack::new(); + + assert!( + dir_stack.push("foo", 1).is_ok(), + "couldn't push onto stack!" 
+ ); + + dir_stack.push("bar", 2).unwrap(); + dir_stack.push("baz", 3).unwrap(); + + assert_eq!(dir_stack.as_path(), "foo/bar/baz".as_path()); + + assert_eq!(dir_stack.first(), Some(("foo".as_path(), &1))); + assert_eq!(dir_stack.last(), Some(("baz".as_path(), &3))); + assert_eq!(dir_stack.parent(), Some(("foo/bar".as_path(), &2))); + + assert_eq!(dir_stack.pop(), Some(3)); + + assert_eq!(dir_stack.last(), Some(("bar".as_path(), &2))); + assert_eq!(dir_stack.parent(), Some(("foo".as_path(), &1))); + + assert_eq!(dir_stack.pop(), Some(2)); + + assert_eq!(dir_stack.first(), Some(("foo".as_path(), &1))); + assert_eq!(dir_stack.last(), Some(("foo".as_path(), &1))); + assert!(dir_stack.parent().is_none()); + + assert_eq!(dir_stack.pop(), Some(1)); + + assert!(dir_stack.first().is_none()); + assert!(dir_stack.last().is_none()); + assert!(dir_stack.parent().is_none()); + + assert_eq!(dir_stack.as_path(), "".as_path()); + + dir_stack.push("foo", 1).unwrap(); + dir_stack.clear(); + + assert_eq!(dir_stack.as_path(), "".as_path()); + } + + #[test] + fn test_dir_stack_dangling_root() { + let mut dir_stack: DirStack = DirStack::with_dangling_root("foo/bar/baz"); + + assert!(dir_stack.is_empty()); + + assert!(dir_stack.push("qux", 1).is_ok()); + + let expected = Some(("qux".as_path(), &1u8)); + + assert_eq!(dir_stack.first(), expected); + assert_eq!(dir_stack.last(), expected); + assert!(dir_stack.parent().is_none()); + + assert_eq!(dir_stack.as_path(), "foo/bar/baz/qux".as_path()); + assert_eq!(dir_stack.dangling_root(), "foo/bar/baz".as_path()); + assert!(!dir_stack.is_empty()); + + assert_eq!(dir_stack.pop(), Some(1)); + + assert!(dir_stack.first().is_none()); + assert!(dir_stack.last().is_none()); + assert!(dir_stack.parent().is_none()); + + assert_eq!(dir_stack.as_path(), "foo/bar/baz".as_path()); + assert_eq!(dir_stack.dangling_root(), "foo/bar/baz".as_path()); + assert!(dir_stack.is_empty()); + + let empty_dir_stack: DirStack = DirStack::new(); + 
assert!(empty_dir_stack.is_empty()); + assert_eq!(empty_dir_stack.dangling_root(), "".as_path()) + } +} diff --git a/pbs-client/src/pxar/aio/extract/extractor.rs b/pbs-client/src/pxar/aio/extract/extractor.rs new file mode 100644 index 00000000..944ec157 --- /dev/null +++ b/pbs-client/src/pxar/aio/extract/extractor.rs @@ -0,0 +1,446 @@ +use std::os::unix::prelude::OsStrExt; +use std::path::Path; +use std::sync::Arc; + +use anyhow::{bail, format_err, Context, Error, Result}; +use memchr; +use pathpatterns::{MatchList, MatchType}; + +use pxar::accessor::aio::Accessor; +use pxar::accessor::SeqReadAtAdapter; +use pxar::decoder::aio::Decoder; +use pxar::decoder::SeqRead; + +use super::raw::RawAsyncExtractor; +use super::PxarExtractOptions; + +use crate::pxar::Flags; + +/// Helper enum that represents the state of the root [`Entry`][pxar::Entry] +/// in the context of the [`AsyncExtractor`]. +/// +/// For example: Because the [`Accessor`] skips the archive's root directory once a +/// [`Decoder`] is instantiated from it, the root entry has to be decoded +/// and provided beforehand. +#[derive(Debug)] +pub(crate) enum RootEntryState { + None, + Decoded { entry: Arc }, + Extracted, +} + +pub struct AsyncExtractor { + decoder: Box>, + inner: Box, + + options: Arc, + + root_entry_state: RootEntryState, + end_reached: bool, + + match_stack: Vec, + current_match: bool, +} + +type FileReader = pxar::accessor::sync::FileRefReader>; + +impl AsyncExtractor> { + /// Create a new extractor from an existing pxar archive file. + /// + /// This is the preferred way to extract an archive that has been saved to + /// local storage, taking advantage of the fact that the archive is available on a drive. + pub async fn with_file
<P>
(src_path: P, options: PxarExtractOptions) -> Result + where + P: AsRef, + { + let src_path = src_path.as_ref(); + + let file = std::fs::File::open(src_path) + .with_context(|| format!("failed to open file: {:#?}", src_path))?; + + // allows us to toss the accessor away once we've gotten our decoder from it + let file = Arc::new(file); + + let accessor = Accessor::from_file_ref(file) + .await + .context("failed to instantiate pxar accessor")?; + + // root entry needs to be pre-decoded because `decode_full()` skips it + let root_dir = accessor.open_root().await?; + let root_entry = root_dir.lookup_self().await?.entry().clone(); + + let decoder = root_dir + .decode_full() + .await + .context("failed to instantiate decoder from accessor")?; + + Ok(Self::with_root_entry(decoder, options, root_entry)) + } +} + +impl AsyncExtractor { + /// Create a new extractor from an existing [`Decoder`]. + /// It is assumed that no entries were decoded prior to the creation of the extractor. + pub fn new(decoder: Decoder, options: PxarExtractOptions) -> Self { + let root_entry_state = RootEntryState::None; + Self::new_impl(decoder, options, root_entry_state) + } + + /// Create a new extractor from an existing [`Decoder`] and a pre-decoded root + /// [`Entry`][pxar::Entry]. This is useful if you need to handle the extraction + /// of the archive's root in a special manner. + pub fn with_root_entry( + decoder: Decoder, + options: PxarExtractOptions, + root_entry: pxar::Entry, + ) -> Self { + let root_entry_state = RootEntryState::Decoded { + entry: Arc::new(root_entry), + }; + + Self::new_impl(decoder, options, root_entry_state) + } + + /// Create a new extractor from an arbitrary input that implements [`SeqRead`]. 
+ pub async fn with_input(input: T, options: PxarExtractOptions) -> Result { + let decoder = Decoder::new(input) + .await + .context("failed to instantiate decoder")?; + + let root_entry_state = RootEntryState::None; + + Ok(Self::new_impl(decoder, options, root_entry_state)) + } + + fn new_impl( + mut decoder: Decoder, + options: PxarExtractOptions, + root_entry_state: RootEntryState, + ) -> Self { + decoder.enable_goodbye_entries(true); + + let current_match = options.default_match; + + Self { + decoder: Box::new(decoder), + inner: Box::new(RawAsyncExtractor::new()), + options: Arc::new(options), + root_entry_state, + end_reached: false, + match_stack: vec![], + current_match, + } + } +} + +impl AsyncExtractor { + /// Decode and extract the next [`Entry`][pxar::Entry]. + /// + /// Upon each call, this function will continue to extract entries until + /// either an error is encountered or the archive has been fully extracted. + #[inline] + pub async fn next(&mut self) -> Option> { + if self.end_reached { + return None; + } + + if !matches!(self.root_entry_state, RootEntryState::Extracted) { + return Some(self.handle_root_entry().await); + } + + let entry = match self.decode_next().await { + Some(Ok(entry)) => entry, + // other cases are already handled in `decode_next()` so we can just return here + rval => { + return rval.map(|result| result.map(drop)); + } + }; + + let mut result = match self.extract_next(entry).await { + Err(error) => self.exec_error_handler(error).await, + res => res, + }; + + if result.is_err() { + if let Err(stop_error) = self.stop_extraction() { + result = result + .context(stop_error) + .context("encountered another error during extractor shutdown"); + } + } + + Some(result) + } + + /// Helper method that starts the extraction process by extracting the root + /// directory of the archive to the given destination. 
+ #[inline] + async fn start_extraction(&mut self, entry: Arc) -> Result<(), Error> { + if matches!(self.root_entry_state, RootEntryState::Extracted) { + bail!("root entry of archive was already extracted") + } + + self.inner.start_worker_thread(&self.options)?; + + self.inner + .extract_root_dir(entry, &self.options) + .await + .map(|_| { + self.root_entry_state = RootEntryState::Extracted; + }) + } + + /// Helper method that stops the extraction process by + #[inline] + fn stop_extraction(&mut self) -> Result<(), Error> { + self.end_reached = true; + proxmox_async::runtime::block_in_place(|| self.inner.join_worker_thread()) + } + + /// Decodes the next [`Entry`][pxar::Entry] and wraps it in an [`Arc`] on success. + /// + /// If an error occurs while decoding an entry, the extractor is subsequently stopped. + #[inline(always)] // hot path + async fn decode_next(&mut self) -> Option, Error>> { + let entry = match self.decoder.next().await { + None => { + return if let Err(error) = self.stop_extraction() { + Some(Err(error)) + } else { + None + }; + } + Some(Err(error)) => { + let result = Err(error).context("error reading pxar archive"); + + let result = if let Err(stop_error) = self.stop_extraction() { + result + .context(stop_error) + .context("encountered another error during extractor shutdown") + } else { + result + }; + + return Some(result); + } + Some(Ok(entry)) => entry, + }; + + Some(Ok(Arc::new(entry))) + } + + /// Helper method that extracts the archive's root entry depending on the + /// [`RootEntryState`]. + /// + /// If the root pxar entry is decoded or was provided, the extraction process + /// is started via [`RawAsyncExtractor::start_extraction()`]. 
+ async fn handle_root_entry(&mut self) -> Result<(), Error> { + let decode_result = match self.root_entry_state { + RootEntryState::None => match self.decode_next().await { + Some(result) => result.context("error while decoding root entry"), + None => Err(format_err!("no root entry found - archive is empty")), + }, + RootEntryState::Decoded { ref entry } => Ok(Arc::clone(entry)), + RootEntryState::Extracted => Err(format_err!("root entry was already extracted")), + }; + + let entry = decode_result.map_err(|error| { + let _ = self.stop_extraction(); + error + })?; + + self.start_extraction(entry).await.map_err(|error| { + let _ = self.stop_extraction(); + error + }) + } + + /// Extract an [`Entry`][pxar::Entry] depending on its [`EntryKind`][pxar::EntryKind]. + /// + /// + /// This method checks each entry's filename and matches it against the given + /// [`MatchList`]. Once an entry is safe to extract and matches, the + /// [`Callback`][super::Callback] is executed and the underlying [`RawAsyncExtractor`] + /// is used to perform the actual extraction operation. 
+ #[inline(always)] // hot path + async fn extract_next(&mut self, entry: Arc) -> Result<(), Error> { + Self::check_entry_filename(&entry)?; + + let (match_type, did_match) = self.match_entry(&entry); + + use pxar::EntryKind::*; + + match (did_match, entry.kind()) { + (_, Directory) => { + self.exec_callback(&entry).await; + + let do_create = self.current_match && match_type != Some(MatchType::Exclude); + + let res = self + .inner + .enter_directory(entry, &self.options, do_create) + .await; + + if res.is_ok() { + // We're starting a new directory, push our old matching + // state and replace it with our new one: + self.match_stack.push(self.current_match); + self.current_match = did_match; + } + + res + } + (_, GoodbyeTable) => { + self.exec_callback(&entry).await; + let res = self.inner.leave_directory(&self.options).await; + + if res.is_ok() { + // We left a directory, also get back our previous matching state. This is in sync + // with `dir_stack` so this should never be empty except for the final goodbye + // table, in which case we get back to the default of `true`. 
+ self.current_match = self.match_stack.pop().unwrap_or(true); + } + + res + } + (true, Symlink(_)) => { + self.exec_callback(&entry).await; + + self.inner.extract_symlink(entry, &self.options).await + } + (true, Hardlink(_)) => { + self.exec_callback(&entry).await; + + self.inner.extract_hardlink(entry, &self.options).await + } + (true, Device(dev)) => { + self.exec_callback(&entry).await; + + if self.has_feature_flags(Flags::WITH_DEVICE_NODES) { + let dev = dev.to_owned(); + self.inner.extract_device(entry, &self.options, dev).await + } else { + Ok(()) + } + } + (true, Fifo) => { + self.exec_callback(&entry).await; + + if self.has_feature_flags(Flags::WITH_FIFOS) { + self.inner.extract_fifo(entry, &self.options, 0).await + } else { + Ok(()) + } + } + (true, Socket) => { + self.exec_callback(&entry).await; + + if self.has_feature_flags(Flags::WITH_SOCKETS) { + self.inner.extract_socket(entry, &self.options, 0).await + } else { + Ok(()) + } + } + (true, File { size, .. }) => { + self.exec_callback(&entry).await; + + if let Some(ref mut contents) = self.decoder.contents() { + let size = size.to_owned(); + self.inner + .extract_file(entry, &self.options, contents, size) + .await + } else { + Err(format_err!( + "found regular file entry without contents in archive" + )) + } + } + (false, _) => Ok(()), + } + } + + /// Checks whether the [`Entry`'s][e] filename is valid. 
+ /// + /// The filename is valid if and only if: + /// * it doesn't contain slashes `/` + /// * it doesn't contain null bytes `\0` + /// + /// [e]: pxar::Entry + fn check_entry_filename(entry: &pxar::Entry) -> Result<(), Error> { + let file_name_bytes = entry.file_name().as_bytes(); + + if let Some(pos) = memchr::memchr(b'/', file_name_bytes) { + bail!( + "archive entry filename contains slash at position {pos}, \ + which is invalid and a security concern" + ) + } + + if let Some(pos) = memchr::memchr(0, file_name_bytes) { + bail!("archive entry filename contains NULL byte at position {pos}") + } + + Ok(()) + } + + /// Helper method that checks whether the [`Entry`'s][pxar::Entry] path and file mode + /// match the provided [`MatchList`] in [`PxarExtractOptions`]. + /// + /// Whether the entry actually matched is also returned together with the + /// [`Option`][MatchType]. If the latter is [`None`], the + /// **current match** is used as a fallback. + #[inline] + pub(crate) fn match_entry(&self, entry: &pxar::Entry) -> (Option, bool) { + let path_bytes = entry.path().as_os_str().as_bytes(); + let file_mode = entry.metadata().file_type() as u32; + + // NOTE: On large match lists this blocks for quite some time, + // so this could maybe be called in spawn_blocking() above a certain size + // ... 
buuut that depends on how we define what a "large" `match_list` actually is + + // We can `unwrap()` safely here because we get a `Result<_, Infallible>` + let match_type = self + .options + .match_list + .matches(path_bytes, Some(file_mode)) + .unwrap(); + + let did_match = match match_type { + Some(MatchType::Include) => true, + Some(MatchType::Exclude) => false, + None => self.current_match, + }; + + (match_type, did_match) + } + + #[inline] + fn has_feature_flags(&self, feature_flags: Flags) -> bool { + self.options.feature_flags.contains(feature_flags) + } + + /// Helper method to spawn and await a task that executes the extractor's + /// [`ErrorHandler`][super::ErrorHandler]. + #[inline] + async fn exec_error_handler(&self, error: Error) -> Result<(), Error> { + tokio::task::spawn_blocking({ + let error_handler = Arc::clone(&self.options.error_handler); + move || error_handler(error) + }) + .await + .context("failed to execute error handler")? + } + + /// Helper method to spawn and await a task that executes the extractor's + /// [`Callback`][super::Callback]. 
+ #[inline] + async fn exec_callback(&self, entry: &Arc<pxar::Entry>) { + if let Some(ref callback) = self.options.callback { + tokio::task::spawn({ + let callback = Arc::clone(callback); + let entry = Arc::clone(entry); + async move { callback(entry).await } + }); + } + } +} diff --git a/pbs-client/src/pxar/aio/extract/mod.rs b/pbs-client/src/pxar/aio/extract/mod.rs new file mode 100644 index 00000000..82d9a596 --- /dev/null +++ b/pbs-client/src/pxar/aio/extract/mod.rs @@ -0,0 +1,220 @@ +use std::borrow::Cow; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use futures::future::BoxFuture; +use lazy_static::lazy_static; + +use pathpatterns::MatchEntry; + +use crate::pxar::Flags; + +pub(crate) mod extractor; +pub use extractor::AsyncExtractor; + +pub(crate) mod raw; +pub use raw::RawAsyncExtractor; + +// NOTE: These will be put into a separate module and are only used here +// to avoid (too much) code duplication +use crate::pxar::extract::OverwriteFlags; +use crate::pxar::extract::PxarExtractContext; + +pub type ErrorHandler = + Box<dyn Fn(anyhow::Error) -> Result<(), anyhow::Error> + Send + Sync + 'static>; + +pub type Callback = Box<dyn Fn(Arc<pxar::Entry>) -> BoxFuture<'static, ()> + Send + Sync + 'static>; + +lazy_static! { + static ref DEFAULT_ERROR_HANDLER: Arc<ErrorHandler> = Arc::new(Box::new(Err)); +} + +/// Options for extracting a pxar archive. +pub struct PxarExtractOptions { + /// The destination directory to which the archive should be extracted. + pub destination: PathBuf, + + /// The flags that control what's extracted from the archive. + pub feature_flags: Flags, + + /// The flags that control what kind of file system entries may or may not + /// be overwritten. + pub overwrite_flags: OverwriteFlags, + + /// Whether to allow already existing directories or not. + pub allow_existing_dirs: bool, + + /// The initial matching case for extracted entries. + pub default_match: bool, + + /// A list of match entries. Each [`MatchEntry`] lets you control which files + /// should or shouldn't be extracted. 
+ pub match_list: Vec<MatchEntry>, + + /// A boxed closure that's used to handle errors throughout the entire + /// extraction process. + pub error_handler: Arc<ErrorHandler>, + + /// An optional future that is called whenever a [`pxar::Entry`] matches + /// and is about to be extracted. + /// + /// **Note that this future is spawned as a task and not awaited until completion.** + pub callback: Option<Arc<Callback>>, +} + +impl PxarExtractOptions { + pub fn builder<D>(destination: D) -> PxarExtractOptionsBuilder + where + D: Into<Cow<'static, Path>>, + { + PxarExtractOptionsBuilder::new(destination) + } +} + +impl std::fmt::Debug for PxarExtractOptions { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let error_handler_msg = if Arc::ptr_eq(&self.error_handler, &DEFAULT_ERROR_HANDLER) { + &"default (no-op)" + } else { + &"custom" + }; + + f.debug_struct("PxarExtractOptions") + .field("destination", &self.destination) + .field("feature_flags", &self.feature_flags) + .field("overwrite_flags", &self.overwrite_flags) + .field("allow_existing_dirs", &self.allow_existing_dirs) + .field("default_match", &self.default_match) + .field("match_list", &self.match_list) + .field("error_handler", error_handler_msg) + .field("callback", &self.callback.is_some()) + .finish() + } +} + +/// This builder is used to configure the behaviour of the [`AsyncExtractor`][extr]. +/// +/// See [`PxarExtractOptions`] for a complete description of all extraction options. 
+/// +/// [extr]: extractor::AsyncExtractor +pub struct PxarExtractOptionsBuilder { + destination: Cow<'static, Path>, + + feature_flags: Flags, + overwrite_flags: OverwriteFlags, + allow_existing_dirs: bool, + + default_match: bool, + match_list: Vec<MatchEntry>, + + error_handler: Option<Arc<ErrorHandler>>, + callback: Option<Arc<Callback>>, +} + +impl std::fmt::Debug for PxarExtractOptionsBuilder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PxarExtractOptionsBuilder") + .field("destination", &self.destination) + .field("feature_flags", &self.feature_flags) + .field("overwrite_flags", &self.overwrite_flags) + .field("allow_existing_dirs", &self.allow_existing_dirs) + .field("default_match", &self.default_match) + .field("match_list", &self.match_list) + .field("error_handler", &self.error_handler.is_some()) + .field("callback", &self.callback.is_some()) + .finish() + } +} + +impl PxarExtractOptionsBuilder { + pub fn new<D>(destination: D) -> Self + where + D: Into<Cow<'static, Path>>, + { + Self { + destination: destination.into(), + feature_flags: Flags::default(), + overwrite_flags: OverwriteFlags::empty(), + allow_existing_dirs: false, + default_match: true, // entries are considered as matching by default + match_list: Default::default(), + error_handler: None, + callback: None, + } + } + + pub fn build(&mut self) -> PxarExtractOptions { + let error_handler = self + .error_handler + .as_ref() + .unwrap_or(&DEFAULT_ERROR_HANDLER); + + PxarExtractOptions { + destination: self.destination.to_path_buf(), + feature_flags: self.feature_flags, + overwrite_flags: self.overwrite_flags, + allow_existing_dirs: self.allow_existing_dirs, + default_match: self.default_match, + match_list: self.match_list.clone(), + error_handler: Arc::clone(error_handler), + callback: self.callback.as_ref().map(Arc::clone), + } + } +} + +impl PxarExtractOptionsBuilder { + pub fn feature_flags(&mut self, flags: Flags) -> &mut Self { + self.feature_flags = flags; + self + } + + pub fn overwrite_flags(&mut self, 
flags: OverwriteFlags) -> &mut Self { + self.overwrite_flags = flags; + self + } + + pub fn allow_existing_dirs(&mut self, value: bool) -> &mut Self { + self.allow_existing_dirs = value; + self + } + + pub fn default_match(&mut self, value: bool) -> &mut Self { + self.default_match = value; + self + } + + pub fn push_match_entry(&mut self, match_entry: MatchEntry) -> &mut Self { + self.match_list.push(match_entry); + self + } + + pub fn push_match_list>>( + &mut self, + match_list: T, + ) -> &mut Self { + // for some reason, clippy doesn't like this + #[allow(clippy::redundant_clone)] + self.match_list.extend(match_list.to_owned()); + self + } + + pub fn error_handler(&mut self, error_handler: ErrorHandler) -> &mut Self { + let error_handler = Arc::new(error_handler); + self.error_handler_ref(error_handler) + } + + pub fn error_handler_ref(&mut self, error_handler: Arc<ErrorHandler>) -> &mut Self { + self.error_handler.replace(error_handler); + self + } + + pub fn callback(&mut self, callback: Callback) -> &mut Self { + let callback = Arc::new(callback); + self.callback_ref(callback) + } + + pub fn callback_ref(&mut self, callback: Arc<Callback>) -> &mut Self { + self.callback.replace(callback); + self + } +} diff --git a/pbs-client/src/pxar/aio/extract/raw.rs b/pbs-client/src/pxar/aio/extract/raw.rs new file mode 100644 index 00000000..d6e20981 --- /dev/null +++ b/pbs-client/src/pxar/aio/extract/raw.rs @@ -0,0 +1,503 @@ +use std::os::fd::{AsRawFd, FromRawFd, RawFd}; +use std::path::Path; +use std::sync::Arc; + +use anyhow::{bail, Context, Error, Result}; +use nix::sys::stat::SFlag; +use nix::{dir::Dir, fcntl::OFlag, sys::stat::Mode}; + +use pxar::format::Device; +use tokio::io::AsyncRead; +use tokio::sync::RwLock; + +use proxmox_sys::fs::CreateOptions; + +use super::OverwriteFlags; +use super::PxarExtractContext; +use super::PxarExtractOptions; + +use crate::pxar::aio::dir_stack::{PxarDir, PxarDirStack}; +use crate::pxar::aio::metadata; +use crate::pxar::aio::worker::SyncTaskWorker; + 
+/// The [`RawAsyncExtractor`] is used to extract individual [`pxar::Entries`][pxar::Entry]. +/// Its purpose is to provide a common underlying mechanism with which an archive is +/// extracted. +/// +/// The tracking of directories is based on a stack. [Entering][enter] and [leaving][leave] +/// directories simply corresponds to [`push()`][push] and [`pop()`][pop]. All remaining +/// operations do not modify the raw extractor's stack otherwise. +/// +/// In order to make extraction fully concurrent, an internal worker thread is used, +/// to which synchronous tasks are sent. This worker thread has to be started manually. +/// +/// No state is tracked otherwise, which means that it's your own responsibility +/// to [extract the archive's root][extr_root], as well as [start][w_start] or [stop][w_stop] +/// the [`RawAsyncExtractor`]'s internal worker thread. +/// +/// +/// [enter]: Self::enter_directory() +/// [leave]: Self::leave_directory() +/// [push]: std::path::PathBuf::push() +/// [pop]: std::path::PathBuf::pop() +/// [extr_root]: Self::extract_root_dir +/// [w_start]: Self::start_worker_thread +/// [w_stop]: Self::join_worker_thread +pub struct RawAsyncExtractor { + /// A stack of directories used to aid in traversing and extracting the pxar archive. + dir_stack: Arc<RwLock<PxarDirStack>>, + + /// Worker thread for synchronous tasks. + worker: Option<SyncTaskWorker>, +} + +impl Default for RawAsyncExtractor { + fn default() -> Self { + Self::new() + } +} + +impl RawAsyncExtractor { + #[inline] + pub fn new() -> Self { + let dir_stack = Arc::new(RwLock::new(PxarDirStack::new())); + + Self { + dir_stack, + worker: None, + } + } + + /// Extract the root directory of a pxar archive to the given [`destination`][dest]. 
+ /// + /// [dest]: PxarExtractOptions::destination + pub async fn extract_root_dir( + &mut self, + entry: Arc<pxar::Entry>, + options: &PxarExtractOptions, + ) -> Result<(), Error> { + let path = options.destination.clone(); + + let root_pxar_dir = + tokio::task::spawn_blocking(move || Self::do_extract_root_dir(path, entry)) + .await + .context("failed to execute task to create extraction directory")??; + + let dir_name = options + .destination + .file_name() + .context("extraction destination directory has no name")?; + + self.dir_stack.write().await.push(dir_name, root_pxar_dir) + } + + #[inline] + fn do_extract_root_dir<P: AsRef<Path>>( + path: P, + entry: Arc<pxar::Entry>, + ) -> Result<PxarDir, Error> { + const CREATE_OPTS: CreateOptions = + CreateOptions::new().perm(Mode::from_bits_truncate(0o700)); + + if !entry.is_dir() { + bail!("pxar archive does not start with a directory entry!"); + } + + let path = path.as_ref(); + + proxmox_sys::fs::create_path(path, None, Some(CREATE_OPTS)) + .with_context(|| format!("error creating extraction directory {path:?}"))?; + + let dir = Dir::open(path, OFlag::O_DIRECTORY | OFlag::O_CLOEXEC, Mode::empty()) + .with_context(|| format!("unable to open extraction directory {path:?}"))?; + + Ok(PxarDir::with_dir(entry, dir)) + } + + #[inline] + pub fn start_worker_thread(&mut self, options: &PxarExtractOptions) -> Result<(), Error> { + // When encountering lots of symlinks or hardlinks, the queue may become quite large + const WORKER_QUEUE_SIZE: usize = 5000; + + if self.worker.is_some() { + bail!("worker thread already started") + } + + let error_handler = Arc::clone(&options.error_handler); + + self.worker.replace(SyncTaskWorker::with_error_handler( + WORKER_QUEUE_SIZE, + error_handler, + )); + + Ok(()) + } + + #[inline] + pub fn join_worker_thread(&mut self) -> Result<(), Error> { + if let Some(mut worker) = self.worker.take() { + worker.join() + } else { + bail!("worker thread already joined") + } + } + + /// Convenience wrapper to send a task to the [`SyncTaskWorker`]. 
+ #[inline] + async fn send_to_worker<F>(&self, task: F) -> Result<(), Error> + where + F: FnOnce() -> Result<(), Error> + Send + 'static, + { + if let Some(worker) = self.worker.as_ref() { + worker.send(task).await + } else { + bail!("failed to send task to worker - worker already finished") + } + } +} + +// Extraction operations for each kind of pxar entry. +impl RawAsyncExtractor { + #[inline] + fn parent_fd(dir_stack: &RwLock<PxarDirStack>, allow_existing: bool) -> Result<RawFd, Error> { + dir_stack + .blocking_write() + .last_dir_fd(allow_existing) + .map(|fd| fd.as_raw_fd()) + } + + pub async fn enter_directory( + &mut self, + entry: Arc<pxar::Entry>, + options: &PxarExtractOptions, + do_create: bool, + ) -> Result<(), Error> { + let dir_stack = Arc::clone(&self.dir_stack); + let allow_existing = options.allow_existing_dirs; + + let task = move || { + { + let pxar_dir = PxarDir::new(Arc::clone(&entry)); + let mut locked_dir_stack = dir_stack.blocking_write(); + + locked_dir_stack.push(entry.file_name(), pxar_dir)?; + + if do_create { + locked_dir_stack.last_dir_fd(allow_existing).map(drop)?; + } + + Ok::<(), Error>(()) + } + .context(PxarExtractContext::EnterDirectory) + }; + + self.send_to_worker(task).await + } + + pub async fn leave_directory(&mut self, options: &PxarExtractOptions) -> Result<(), Error> { + let dir_stack = Arc::clone(&self.dir_stack); + let error_handler = Arc::clone(&options.error_handler); + let flags = options.feature_flags; + + let task = move || { + { + let pxar_dir = dir_stack + .blocking_write() + .pop() + .context("unexpected end of directory stack")?; + + let fd = pxar_dir.try_as_borrowed_fd().map(|fd| fd.as_raw_fd()); + let (entry, dir) = pxar_dir.into_inner(); + + if let Some(fd) = fd { + metadata::apply(flags, &entry, fd, &error_handler)?; + } + + // dropping `dir` closes it - we want that to happen after metadata was applied + drop(dir); + Ok::<(), Error>(()) + } + .context(PxarExtractContext::LeaveDirectory) + }; + + self.send_to_worker(task).await + } + + pub async fn 
extract_symlink( + &self, + entry: Arc<pxar::Entry>, + options: &PxarExtractOptions, + ) -> Result<(), Error> { + use pxar::EntryKind::Symlink; + + let dir_stack = Arc::clone(&self.dir_stack); + let error_handler = Arc::clone(&options.error_handler); + + let allow_existing = options.allow_existing_dirs; + let feature_flags = options.feature_flags; + + let do_overwrite = options.overwrite_flags.contains(OverwriteFlags::SYMLINK); + + let task = move || { + { + let link = match entry.kind() { + Symlink(link) => link, + _ => bail!("received invalid entry kind while trying to extract symlink"), + }; + + let parent_fd = Self::parent_fd(&dir_stack, allow_existing)?; + + let result = + nix::unistd::symlinkat(link.as_os_str(), Some(parent_fd), entry.file_name()); + + match result { + Ok(()) => {} + Err(nix::errno::Errno::EEXIST) if do_overwrite => { + // Never unlink directories + let flag = nix::unistd::UnlinkatFlags::NoRemoveDir; + nix::unistd::unlinkat(Some(parent_fd), entry.file_name(), flag)?; + + nix::unistd::symlinkat( + link.as_os_str(), + Some(parent_fd), + entry.file_name(), + )?; + } + Err(error) => return Err(error.into()), + } + + metadata::apply_at(feature_flags, &entry, parent_fd, &error_handler) + } + .context(PxarExtractContext::ExtractSymlink) + }; + + self.send_to_worker(task) + .await + .context(PxarExtractContext::ExtractSymlink) + } + + pub async fn extract_hardlink( + &self, + entry: Arc<pxar::Entry>, + options: &PxarExtractOptions, + ) -> Result<(), Error> { + use pxar::EntryKind::Hardlink; + + let dir_stack = Arc::clone(&self.dir_stack); + + let allow_existing = options.allow_existing_dirs; + let do_overwrite = options.overwrite_flags.contains(OverwriteFlags::HARDLINK); + + let task = move || { + { + let link = match entry.kind() { + Hardlink(link) => { + if !AsRef::<Path>::as_ref(link.as_os_str()).is_relative() { + bail!("received absolute path while trying to extract hardlink") + } else { + link.as_os_str().to_owned() + } + } + _ => bail!("received invalid entry kind while 
trying to extract hardlink"), + }; + + let root_fd = dir_stack.blocking_read().root_dir_fd()?.as_raw_fd(); + let parent_fd = Self::parent_fd(&dir_stack, allow_existing)?; + + let make_link = || { + nix::unistd::linkat( + Some(root_fd), + link.as_os_str(), + Some(parent_fd), + entry.file_name(), + nix::unistd::LinkatFlags::NoSymlinkFollow, + ) + }; + + match make_link() { + Err(nix::Error::EEXIST) if do_overwrite => { + let flag = nix::unistd::UnlinkatFlags::NoRemoveDir; + nix::unistd::unlinkat(Some(parent_fd), entry.file_name(), flag)?; + make_link() + } + result => result, + } + } + .context(PxarExtractContext::ExtractHardlink) + }; + + self.send_to_worker(task) + .await + .context(PxarExtractContext::ExtractHardlink) + } + + pub async fn extract_device( + &self, + entry: Arc<pxar::Entry>, + options: &PxarExtractOptions, + device: Device, + ) -> Result<(), Error> { + self.extract_special(entry, options, device.to_dev_t()) + .await + .context(PxarExtractContext::ExtractDevice) + } + + pub async fn extract_fifo( + &self, + entry: Arc<pxar::Entry>, + options: &PxarExtractOptions, + device: nix::sys::stat::dev_t, + ) -> Result<(), Error> { + self.extract_special(entry, options, device) + .await + .context(PxarExtractContext::ExtractFifo) + } + + pub async fn extract_socket( + &self, + entry: Arc<pxar::Entry>, + options: &PxarExtractOptions, + device: nix::sys::stat::dev_t, + ) -> Result<(), Error> { + self.extract_special(entry, options, device) + .await + .context(PxarExtractContext::ExtractSocket) + } + + #[inline(always)] + async fn extract_special( + &self, + entry: Arc<pxar::Entry>, + options: &PxarExtractOptions, + device: nix::sys::stat::dev_t, + ) -> Result<(), Error> { + let dir_stack = Arc::clone(&self.dir_stack); + let allow_existing = options.allow_existing_dirs; + + let feature_flags = options.feature_flags; + let error_handler = Arc::clone(&options.error_handler); + + let task = move || { + let mode = metadata::perms_from_metadata(entry.metadata())?; + + let parent_fd = Self::parent_fd(&dir_stack, 
allow_existing)?; + + nix::sys::stat::mknodat(parent_fd, entry.file_name(), SFlag::empty(), mode, device) + .context("failed to create special device")?; + + metadata::apply_at(feature_flags, &entry, parent_fd, &error_handler) + .context("failed to apply metadata to special device") + }; + + self.send_to_worker(task).await + } + + pub async fn extract_file<C>( + &mut self, + entry: Arc<pxar::Entry>, + options: &PxarExtractOptions, + contents: C, + size: u64, + ) -> Result<(), Error> + where + C: AsyncRead + Unpin, + { + let (sender, receiver) = tokio::sync::oneshot::channel(); + + let file_creation_task = { + let mut oflags = OFlag::O_CREAT | OFlag::O_WRONLY | OFlag::O_CLOEXEC; + if options.overwrite_flags.contains(OverwriteFlags::FILE) { + oflags |= OFlag::O_TRUNC; + } else { + oflags |= OFlag::O_EXCL; + } + + let entry = Arc::clone(&entry); + let dir_stack = Arc::clone(&self.dir_stack); + let error_handler = Arc::clone(&options.error_handler); + + let allow_existing = options.allow_existing_dirs; + let feature_flags = options.feature_flags; + + move || { + { + let parent_fd = Self::parent_fd(&dir_stack, allow_existing)?; + + let raw_fd = nix::fcntl::openat( + parent_fd, + entry.file_name(), + oflags, + Mode::from_bits(0o700).unwrap(), + ) + .with_context(|| "failed to create file".to_string())?; + + metadata::apply_initial_flags(feature_flags, &entry, raw_fd, &error_handler)?; + + let file = unsafe { tokio::fs::File::from_raw_fd(raw_fd) }; + + match sender.send(file) { + Ok(()) => Ok::<(), Error>(()), + Err(_) => bail!("failed to send file descriptor to origin future"), + } + } + .context(PxarExtractContext::ExtractFile) + } + }; + + // The task is queued for the worker ... + self.send_to_worker(file_creation_task) + .await + .context(PxarExtractContext::ExtractFile)?; + + // ... 
and its response is then awaited, so other things + // may be done in the meantime + let mut file = receiver.await.context( + "failed to receive raw file descriptor from worker thread - did the thread die?", + )?; + + let mut contents = tokio::io::BufReader::new(contents); + let copy_result = proxmox_io::sparse_copy_async(&mut contents, &mut file) + .await + .context(PxarExtractContext::ExtractFile)?; + + if size != copy_result.written { + bail!( + "extracted {} bytes of a file of {} bytes", + copy_result.written, + size + ); + } + + let task = { + let raw_fd = file.as_raw_fd(); + let feature_flags = options.feature_flags; + let error_handler = Arc::clone(&options.error_handler); + + move || { + { + if copy_result.seeked_last { + while match nix::unistd::ftruncate(raw_fd, size as i64) { + Ok(_) => false, + Err(errno) if errno == nix::errno::Errno::EINTR => true, + Err(err) => return Err(err).context("error setting file size"), + } {} + } + + metadata::apply(feature_flags, &entry, raw_fd, &error_handler) + .with_context(|| "failed to apply metadata to file".to_string())?; + + // 'file' closes itself when dropped, so we move and drop it here explicitly + // after the remaining work on it was done + drop(file); + + Ok::<(), Error>(()) + } + .context(PxarExtractContext::ExtractFile) + } + }; + + self.send_to_worker(task) + .await + .context(PxarExtractContext::ExtractFile) + } +} diff --git a/pbs-client/src/pxar/aio/metadata.rs b/pbs-client/src/pxar/aio/metadata.rs new file mode 100644 index 00000000..06927498 --- /dev/null +++ b/pbs-client/src/pxar/aio/metadata.rs @@ -0,0 +1,412 @@ +use std::ffi::{CStr, CString}; +use std::os::fd::{AsRawFd, RawFd}; +use std::sync::Arc; + +use anyhow::{bail, format_err, Context, Error}; +use nix::errno::Errno; +use nix::fcntl::OFlag; +use nix::sys::stat::{Mode, UtimensatFlags}; + +use proxmox_sys::c_result; +use proxmox_sys::error::SysError; +use proxmox_sys::fs::{self, acl, xattr}; +use pxar::Metadata; + +use super::ErrorHandler; +use 
crate::pxar::Flags; + +// NOTE: The functions here are equivalent to crate::pxar::metadata and have +// only been adapted to take a pxar::Entry directly. + +// +// utility functions +// + +fn allow_notsupp<E: SysError>(err: E) -> Result<(), E> { + if err.is_errno(Errno::EOPNOTSUPP) { + Ok(()) + } else { + Err(err) + } +} + +fn allow_notsupp_remember<E: SysError>(err: E, not_supp: &mut bool) -> Result<(), E> { + if err.is_errno(Errno::EOPNOTSUPP) { + *not_supp = true; + Ok(()) + } else { + Err(err) + } +} + +fn timestamp_to_update_timespec(mtime: &pxar::format::StatxTimestamp) -> [libc::timespec; 2] { + // restore mtime + const UTIME_OMIT: i64 = (1 << 30) - 2; + + [ + libc::timespec { + tv_sec: 0, + tv_nsec: UTIME_OMIT, + }, + libc::timespec { + tv_sec: mtime.secs, + tv_nsec: mtime.nanos as _, + }, + ] +} + +/// Get the file permissions as `nix::Mode` +pub fn perms_from_metadata(meta: &Metadata) -> Result<Mode, Error> { + let mode = meta.stat.get_permission_bits(); + + u32::try_from(mode) + .context("couldn't narrow permission bits") + .and_then(|mode| { + Mode::from_bits(mode) + .with_context(|| format!("mode contains illegal bits: 0x{:x} (0o{:o})", mode, mode)) + }) +} + +// +// metadata application: +// + +pub fn apply_initial_flags( + feature_flags: Flags, + entry: &Arc<pxar::Entry>, + fd: RawFd, + error_handler: &Arc<ErrorHandler>, +) -> Result<(), Error> { + let entry_flags = Flags::from_bits_truncate(entry.metadata().stat.flags); + + apply_chattr( + fd, + entry_flags.to_initial_chattr(), + feature_flags.to_initial_chattr(), + ) + .or_else(&**error_handler) +} + +pub fn apply_at( + feature_flags: Flags, + entry: &Arc<pxar::Entry>, + parent: RawFd, + error_handler: &Arc<ErrorHandler>, +) -> Result<(), Error> { + let fd = proxmox_sys::fd::openat( + &parent, + entry.file_name(), + OFlag::O_PATH | OFlag::O_CLOEXEC | OFlag::O_NOFOLLOW, + Mode::empty(), + ) + .context("failed to open parent file descriptor")?; + + apply(feature_flags, entry, fd.as_raw_fd(), error_handler) +} + +pub fn apply( + feature_flags: Flags, + entry: &Arc<pxar::Entry>, + fd: RawFd, + 
error_handler: &Arc<ErrorHandler>, +) -> Result<(), Error> { + let metadata = entry.metadata(); + let c_proc_path = CString::new(format!("/proc/self/fd/{}", fd)).unwrap(); + + apply_ownership(feature_flags, &c_proc_path, metadata).or_else(&**error_handler)?; + + let mut skip_xattrs = false; + apply_xattrs(feature_flags, &c_proc_path, metadata, &mut skip_xattrs) + .or_else(&**error_handler)?; + + add_fcaps(feature_flags, &c_proc_path, metadata, &mut skip_xattrs).or_else(&**error_handler)?; + + apply_acls(feature_flags, &c_proc_path, metadata).or_else(&**error_handler)?; + + apply_quota_project_id(feature_flags, fd, metadata).or_else(&**error_handler)?; + + // Finally mode and time. We may lose access with mode, but changing the mode also + // affects times. + if !metadata.is_symlink() && feature_flags.contains(Flags::WITH_PERMISSIONS) { + let mode = perms_from_metadata(metadata)?; + nix::sys::stat::fchmod(fd, mode) + .or_else(allow_notsupp) + .context("failed to change file mode") + .or_else(&**error_handler)?; + } + + let [atime, mtime] = timestamp_to_update_timespec(&metadata.stat.mtime); + let res = nix::sys::stat::utimensat( + None, + c_proc_path.as_ref(), + &atime.into(), + &mtime.into(), + UtimensatFlags::FollowSymlink, + ); + + match res { + Ok(_) => (), + Err(ref err) if err.is_errno(Errno::EOPNOTSUPP) => (), + Err(err) => { + let err = format_err!(err).context("failed to restore mtime attribute"); + error_handler(err)?; + } + } + + if metadata.stat.flags != 0 { + apply_flags(feature_flags, fd, metadata.stat.flags).or_else(&**error_handler)?; + } + + Ok(()) +} + +fn apply_ownership( + feature_flags: Flags, + c_proc_path: &CStr, + metadata: &Metadata, +) -> Result<(), Error> { + if !feature_flags.contains(Flags::WITH_OWNER) { + return Ok(()); + } + + // UID and GID first, as this fails if we lose access anyway. 
+ nix::unistd::chown( + c_proc_path, + Some(metadata.stat.uid.into()), + Some(metadata.stat.gid.into()), + ) + .or_else(allow_notsupp) + .context("failed to apply ownership") +} + +fn add_fcaps( + feature_flags: Flags, + c_proc_path: &CStr, + metadata: &Metadata, + skip_xattrs: &mut bool, +) -> Result<(), Error> { + if *skip_xattrs || !feature_flags.contains(Flags::WITH_FCAPS) { + return Ok(()); + } + let fcaps = match metadata.fcaps.as_ref() { + Some(fcaps) => fcaps, + None => return Ok(()), + }; + + c_result!(unsafe { + libc::setxattr( + c_proc_path.as_ptr(), + xattr::xattr_name_fcaps().as_ptr(), + fcaps.data.as_ptr() as *const libc::c_void, + fcaps.data.len(), + 0, + ) + }) + .map(drop) + .or_else(|err| allow_notsupp_remember(err, skip_xattrs)) + .context("failed to apply file capabilities") +} + +fn apply_xattrs( + feature_flags: Flags, + c_proc_path: &CStr, + metadata: &Metadata, + skip_xattrs: &mut bool, +) -> Result<(), Error> { + if *skip_xattrs || !feature_flags.contains(Flags::WITH_XATTRS) { + return Ok(()); + } + + for xattr in &metadata.xattrs { + if *skip_xattrs { + return Ok(()); + } + + if !xattr::is_valid_xattr_name(xattr.name()) { + log::info!("skipping invalid xattr named {:?}", xattr.name()); + continue; + } + + c_result!(unsafe { + libc::setxattr( + c_proc_path.as_ptr(), + xattr.name().as_ptr() as *const libc::c_char, + xattr.value().as_ptr() as *const libc::c_void, + xattr.value().len(), + 0, + ) + }) + .map(drop) + .or_else(|err| allow_notsupp_remember(err, &mut *skip_xattrs)) + .context("failed to apply extended attributes")?; + } + + Ok(()) +} + +fn apply_acls(feature_flags: Flags, c_proc_path: &CStr, metadata: &Metadata) -> Result<(), Error> { + if !feature_flags.contains(Flags::WITH_ACL) || metadata.acl.is_empty() { + return Ok(()); + } + + let mut acl = acl::ACL::init(5)?; + + // acl type access: + acl.add_entry_full( + acl::ACL_USER_OBJ, + None, + acl::mode_user_to_acl_permissions(metadata.stat.mode), + )?; + + acl.add_entry_full( + 
acl::ACL_OTHER, + None, + acl::mode_other_to_acl_permissions(metadata.stat.mode), + )?; + + match metadata.acl.group_obj.as_ref() { + Some(group_obj) => { + acl.add_entry_full( + acl::ACL_MASK, + None, + acl::mode_group_to_acl_permissions(metadata.stat.mode), + )?; + acl.add_entry_full(acl::ACL_GROUP_OBJ, None, group_obj.permissions.0)?; + } + None => { + let mode = acl::mode_group_to_acl_permissions(metadata.stat.mode); + + acl.add_entry_full(acl::ACL_GROUP_OBJ, None, mode)?; + + if !metadata.acl.users.is_empty() || !metadata.acl.groups.is_empty() { + log::warn!("Warning: Missing GROUP_OBJ entry in ACL, resetting to value of MASK"); + acl.add_entry_full(acl::ACL_MASK, None, mode)?; + } + } + } + + for user in &metadata.acl.users { + acl.add_entry_full(acl::ACL_USER, Some(user.uid), user.permissions.0)?; + } + + for group in &metadata.acl.groups { + acl.add_entry_full(acl::ACL_GROUP, Some(group.gid), group.permissions.0)?; + } + + if !acl.is_valid() { + bail!("Error while restoring ACL - ACL invalid"); + } + + acl.set_file(c_proc_path, acl::ACL_TYPE_ACCESS)?; + drop(acl); + + // acl type default: + if let Some(default) = metadata.acl.default.as_ref() { + let mut acl = acl::ACL::init(5)?; + + acl.add_entry_full(acl::ACL_USER_OBJ, None, default.user_obj_permissions.0)?; + + acl.add_entry_full(acl::ACL_GROUP_OBJ, None, default.group_obj_permissions.0)?; + + acl.add_entry_full(acl::ACL_OTHER, None, default.other_permissions.0)?; + + if default.mask_permissions != pxar::format::acl::Permissions::NO_MASK { + acl.add_entry_full(acl::ACL_MASK, None, default.mask_permissions.0)?; + } + + for user in &metadata.acl.default_users { + acl.add_entry_full(acl::ACL_USER, Some(user.uid), user.permissions.0)?; + } + + for group in &metadata.acl.default_groups { + acl.add_entry_full(acl::ACL_GROUP, Some(group.gid), group.permissions.0)?; + } + + if !acl.is_valid() { + bail!("Error while restoring ACL - ACL invalid"); + } + + acl.set_file(c_proc_path, acl::ACL_TYPE_DEFAULT)?; + } + + 
Ok(()) +} + +fn apply_quota_project_id( + feature_flags: Flags, + fd: RawFd, + metadata: &Metadata, +) -> Result<(), Error> { + if !feature_flags.contains(Flags::WITH_QUOTA_PROJID) { + return Ok(()); + } + + let projid = match metadata.quota_project_id { + Some(projid) => projid, + None => return Ok(()), + }; + + let mut fsxattr = fs::FSXAttr::default(); + unsafe { + fs::fs_ioc_fsgetxattr(fd, &mut fsxattr) + .context("error while getting fsxattr to restore quota project id")?; + + fsxattr.fsx_projid = projid.projid as u32; + + fs::fs_ioc_fssetxattr(fd, &fsxattr) + .context("error while setting fsxattr to restore quota project id")?; + } + + Ok(()) +} + +fn errno_is_unsupported(errno: Errno) -> bool { + matches!( + errno, + Errno::ENOTTY | Errno::ENOSYS | Errno::EBADF | Errno::EOPNOTSUPP | Errno::EINVAL + ) +} + +fn apply_chattr(fd: RawFd, chattr: libc::c_long, mask: libc::c_long) -> Result<(), Error> { + if chattr == 0 { + return Ok(()); + } + + let mut fattr: libc::c_long = 0; + match unsafe { fs::read_attr_fd(fd, &mut fattr) } { + Ok(_) => (), + Err(errno) if errno_is_unsupported(errno) => { + return Ok(()); + } + Err(err) => return Err(err).context("failed to read file attributes"), + } + + let attr = (chattr & mask) | (fattr & !mask); + + if attr == fattr { + return Ok(()); + } + + match unsafe { fs::write_attr_fd(fd, &attr) } { + Ok(_) => Ok(()), + Err(errno) if errno_is_unsupported(errno) => Ok(()), + Err(err) => Err(err).context("failed to set file attributes"), + } +} + +fn apply_flags(feature_flags: Flags, fd: RawFd, entry_flags: u64) -> Result<(), Error> { + let entry_flags = Flags::from_bits_truncate(entry_flags); + + apply_chattr(fd, entry_flags.to_chattr(), feature_flags.to_chattr())?; + + let fatattr = (feature_flags & entry_flags).to_fat_attr(); + if fatattr != 0 { + match unsafe { fs::write_fat_attr_fd(fd, &fatattr) } { + Ok(_) => (), + Err(errno) if errno_is_unsupported(errno) => (), + Err(err) => return Err(err).context("failed to set file FAT 
attributes"), + } + } + + Ok(()) +} diff --git a/pbs-client/src/pxar/aio/mod.rs b/pbs-client/src/pxar/aio/mod.rs new file mode 100644 index 00000000..dfa8d84a --- /dev/null +++ b/pbs-client/src/pxar/aio/mod.rs @@ -0,0 +1,11 @@ +mod dir_stack; + +pub(crate) mod extract; +pub use extract::{ + AsyncExtractor, Callback, ErrorHandler, PxarExtractOptions, PxarExtractOptionsBuilder, + RawAsyncExtractor, +}; + +mod metadata; + +mod worker; diff --git a/pbs-client/src/pxar/aio/worker.rs b/pbs-client/src/pxar/aio/worker.rs new file mode 100644 index 00000000..791e3129 --- /dev/null +++ b/pbs-client/src/pxar/aio/worker.rs @@ -0,0 +1,167 @@ +#![allow(unused)] +use std::sync::{Arc, Mutex}; +use std::thread::{self, JoinHandle}; + +use anyhow::{bail, Error}; +use tokio::sync::mpsc; + +use super::ErrorHandler; + +type WorkerTask = Box<dyn FnOnce() -> Result<(), Error> + Send>; + +/// [`SyncTaskWorker`] is a wrapper around a [`Thread`][std::thread::Thread] +/// with the purpose of running synchronous tasks provided by a queue. +/// +/// A task is anything that implements [`FnOnce() -> Result<(), Error>`] +/// and can be sent asynchronously to the worker via [`send()`]. +/// +/// As soon as a task returns an [`Error`], the worker will drop its channel, +/// discarding any remaining tasks. Errors therefore either ought to be handled +/// inside tasks themselves, or with an [explicitly provided][eh] [`ErrorHandler`]. 
+///
+/// [eh]: Self::with_error_handler
+pub(crate) struct SyncTaskWorker {
+    sender: Option<mpsc::Sender<WorkerTask>>,
+    join_handle: Option<Box<JoinHandle<()>>>,
+    task_error: Arc<Mutex<Option<Error>>>,
+}
+
+impl SyncTaskWorker {
+    pub fn new(queue_size: usize) -> Self {
+        Self::new_inner(queue_size, Arc::new(Box::new(Err)))
+    }
+
+    pub fn with_error_handler(queue_size: usize, error_handler: Arc<ErrorHandler>) -> Self {
+        Self::new_inner(queue_size, error_handler)
+    }
+
+    fn new_inner(queue_size: usize, error_handler: Arc<ErrorHandler>) -> Self {
+        let (sender, receiver) = mpsc::channel(queue_size);
+
+        let task_error = Arc::new(Mutex::new(None));
+
+        let worker_loop = {
+            let mut receiver = receiver;
+            let last_error = Arc::clone(&task_error);
+            let error_handler = Arc::clone(&error_handler);
+
+            move || loop {
+                let function: WorkerTask = if let Some(function) = receiver.blocking_recv() {
+                    function
+                } else {
+                    return;
+                };
+
+                let result = (function)().or_else(|error| error_handler(error));
+
+                match result {
+                    Ok(()) => (),
+                    Err(error) => {
+                        let mut guard = last_error.lock().unwrap();
+                        if guard.is_none() {
+                            let error = error
+                                .context("extractor worker thread encountered unexpected error");
+                            *guard = Some(error);
+                        }
+
+                        drop(receiver);
+                        return;
+                    }
+                }
+            }
+        };
+
+        let join_handle = thread::spawn(worker_loop);
+
+        Self {
+            sender: Some(sender),
+            join_handle: Some(Box::new(join_handle)),
+            task_error,
+        }
+    }
+
+    #[inline]
+    fn check_for_error(&self) -> Result<(), Error> {
+        let mut guard = self.task_error.lock().unwrap();
+        if let Some(error) = guard.take() {
+            Err(error)
+        } else {
+            Ok(())
+        }
+    }
+
+    /// Send a task to the worker to execute.
+    ///
+    /// In case any of the worker thread's previous tasks failed, its error
+    /// will be returned. Subsequent calls will not return the same error.
+    ///
+    /// Otherwise, this will fail if the worker's channel is already closed or
+    /// if its internal sender has already been dropped.
+    #[inline]
+    pub async fn send<F>(&self, task: F) -> Result<(), Error>
+    where
+        F: FnOnce() -> Result<(), Error> + Send + 'static,
+    {
+        self.check_for_error()?;
+
+        if let Some(ref sender) = self.sender {
+            match sender.send(Box::new(task)).await {
+                Ok(()) => Ok(()),
+                Err(_) => {
+                    bail!("failed to send task input to worker - channel closed");
+                }
+            }
+        } else {
+            bail!("failed to send task input to worker - sender already dropped");
+        }
+    }
+
+    /// Get the current available capacity of the worker thread's queue.
+    #[inline]
+    pub fn queue_capacity(&self) -> Option<usize> {
+        self.sender.as_ref().map(|s| s.capacity())
+    }
+
+    /// Get the maximum capacity of the worker thread's queue.
+    #[inline]
+    pub fn max_queue_capacity(&self) -> Option<usize> {
+        self.sender.as_ref().map(|s| s.max_capacity())
+    }
+
+    /// Run all outstanding tasks until completion or until a task returns an error.
+    ///
+    /// The first encountered error will be returned, if it exists.
+    ///
+    /// Subsequent calls to [`join()`] will not have any effect.
+    #[inline]
+    pub fn join(&mut self) -> Result<(), Error> {
+        drop(self.sender.take());
+
+        let join_res = self
+            .join_handle
+            .take()
+            .map(|handle| handle.join())
+            .unwrap_or(Ok(()));
+
+        // Can't use `anyhow::Context` on `JoinError`, grr
+        match join_res {
+            Ok(()) => (),
+            Err(_) => bail!("extractor worker thread panicked"),
+        };
+
+        self.check_for_error()?;
+
+        Ok(())
+    }
+}
+
+// [`Drop`] is implemented to ensure the thread is joined appropriately when
+// the worker is dropped.
+//
+// Note however, that this will consume `last_error` (if any), so it's always
+// better to check for errors manually.
+impl Drop for SyncTaskWorker {
+    fn drop(&mut self) {
+        let _ = self.join();
+    }
+}
diff --git a/pbs-client/src/pxar/mod.rs b/pbs-client/src/pxar/mod.rs
index 14674b9b..9060bc0e 100644
--- a/pbs-client/src/pxar/mod.rs
+++ b/pbs-client/src/pxar/mod.rs
@@ -47,6 +47,7 @@
 //! (user, group, acl, ...) because this is already defined by the
 //! linked `ENTRY`.
 
+pub mod aio;
 pub(crate) mod create;
 pub(crate) mod dir_stack;
 pub(crate) mod extract;
-- 
2.39.2
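
For readers who want to try the worker's error semantics in isolation, here is a minimal std-only sketch of the pattern described in the `SyncTaskWorker` doc comments: a bounded queue feeds one thread, the first failing task stops the loop, and the stored error surfaces on `join()`. It is not the code from this patch. `SketchWorker` and `run_demo` are hypothetical names, `std::sync::mpsc::sync_channel` stands in for `tokio::sync::mpsc`, `String` stands in for `anyhow::Error`, and the `ErrorHandler` hook is left out.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

// Assumption: `String` replaces `anyhow::Error` to keep the sketch std-only.
type Task = Box<dyn FnOnce() -> Result<(), String> + Send>;

/// Hypothetical stand-in for the patch's `SyncTaskWorker`.
struct SketchWorker {
    sender: Option<SyncSender<Task>>,
    join_handle: Option<JoinHandle<()>>,
    task_error: Arc<Mutex<Option<String>>>,
}

impl SketchWorker {
    fn new(queue_size: usize) -> Self {
        let (sender, receiver) = sync_channel::<Task>(queue_size);
        let task_error = Arc::new(Mutex::new(None));
        let last_error = Arc::clone(&task_error);

        let join_handle = thread::spawn(move || {
            // `recv()` fails once every sender is dropped and the queue is empty.
            while let Ok(task) = receiver.recv() {
                if let Err(error) = task() {
                    *last_error.lock().unwrap() = Some(error);
                    // Returning drops the receiver, discarding queued tasks.
                    return;
                }
            }
        });

        Self {
            sender: Some(sender),
            join_handle: Some(join_handle),
            task_error,
        }
    }

    /// Queue a task; reports a previously stored task error first, if any.
    fn send<F>(&self, task: F) -> Result<(), String>
    where
        F: FnOnce() -> Result<(), String> + Send + 'static,
    {
        if let Some(error) = self.task_error.lock().unwrap().take() {
            return Err(error);
        }
        match self.sender {
            Some(ref sender) => sender
                .send(Box::new(task))
                .map_err(|_| "channel closed".to_string()),
            None => Err("sender already dropped".to_string()),
        }
    }

    /// Close the queue, wait for the thread, and return the stored error.
    fn join(&mut self) -> Result<(), String> {
        drop(self.sender.take());
        if let Some(handle) = self.join_handle.take() {
            handle
                .join()
                .map_err(|_| "worker thread panicked".to_string())?;
        }
        let stored = self.task_error.lock().unwrap().take();
        match stored {
            Some(error) => Err(error),
            None => Ok(()),
        }
    }
}

/// Runs three succeeding tasks and one failing task, then joins.
fn run_demo() -> (usize, Result<(), String>) {
    let counter = Arc::new(Mutex::new(0usize));
    let mut worker = SketchWorker::new(4);

    for _ in 0..3 {
        let counter = Arc::clone(&counter);
        worker
            .send(move || {
                *counter.lock().unwrap() += 1;
                Ok(())
            })
            .unwrap();
    }
    worker.send(|| Err("boom".to_string())).unwrap();

    let result = worker.join();
    let count = *counter.lock().unwrap();
    (count, result)
}

fn main() {
    let (count, result) = run_demo();
    println!("tasks completed: {count}, join result: {result:?}");
}
```

In this sketch the failing task is queued last, so all three counting tasks finish before the error is recorded, and the caller only learns about the failure from `join()`. This mirrors why the doc comments recommend checking for errors explicitly rather than relying on `Drop`.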