public inbox for pbs-devel@lists.proxmox.com
* [pbs-devel] [RFC PATCH proxmox-backup 0/2] Introduce experimental `AsyncExtractor<T>`
@ 2023-08-28 14:42 Max Carrara
  2023-08-28 14:42 ` [pbs-devel] [RFC PATCH proxmox-backup 1/2] pbs-client: pxar: Add prototype implementation of `AsyncExtractor<T>` Max Carrara
                   ` (2 more replies)
  0 siblings, 3 replies; 4+ messages in thread
From: Max Carrara @ 2023-08-28 14:42 UTC (permalink / raw)
  To: pbs-devel

This RFC proposes an asynchronous implementation of
`pbs_client::pxar::extract::{Extractor, ExtractorIter}`.

This `AsyncExtractor<T>` has been rebuilt from the ground up while
preserving the core extraction logic. Its purpose is to provide
fully concurrent extraction of pxar files or streams. It achieves
this by offloading every synchronous / blocking call to a separate
worker thread with an internal queue; the queued extraction tasks
are executed sequentially to keep the behaviour predictable.
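
To illustrate the worker-queue approach, here is a minimal std-only
sketch: a dedicated thread drains a channel of boxed closures, so queued
tasks run strictly one after another. (`SyncWorker` and its methods are
illustrative names, not the actual types from this series.)

```rust
use std::sync::mpsc;
use std::thread;

/// A boxed unit of blocking work sent to the worker thread.
type Task = Box<dyn FnOnce() + Send + 'static>;

/// Illustrative stand-in for a worker with an internal queue; the real
/// `SyncTaskWorker` in this RFC is more elaborate.
struct SyncWorker {
    queue: mpsc::Sender<Task>,
    handle: thread::JoinHandle<()>,
}

impl SyncWorker {
    fn spawn() -> Self {
        let (queue, rx) = mpsc::channel::<Task>();
        // The receiver loop drains one task at a time, so tasks
        // submitted to the queue are executed strictly sequentially.
        let handle = thread::spawn(move || {
            for task in rx {
                task();
            }
        });
        Self { queue, handle }
    }

    fn submit(&self, task: Task) {
        self.queue.send(task).expect("worker thread has exited");
    }

    fn join(self) -> thread::Result<()> {
        // Dropping the sender closes the queue, ending the worker loop.
        let Self { queue, handle } = self;
        drop(queue);
        handle.join()
    }
}
```

An async caller would additionally wrap `submit()` so results can be
awaited; the sketch only shows the sequential queue itself.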

The async extractor is intentionally put into a separate module
(located at `pbs_client::pxar::aio`), as a complete refactor of every
existing extraction-related piece of code is beyond the scope (and
intention) of this RFC.

Its public API is nowhere near final, but serves its purpose for the
time being. Async counterparts of the other functions found within
`pbs_client::pxar::extract` are not yet implemented.
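
For reference, consuming the prototype's API boils down to repeatedly
calling `next()` until it yields `None`. The stand-in below mirrors that
contract synchronously - `Some(Ok(()))` per extracted entry,
`Some(Err(_))` on failure, `None` once done - whereas the real extractor
is driven the same way, just with `.await`:

```rust
/// Synchronous stand-in mirroring the `next()` contract of
/// `AsyncExtractor<T>`; a real call site would instead look like
/// `while let Some(res) = extractor.next().await { res?; }`.
struct MockExtractor {
    remaining: u32,
    fail_at: Option<u32>,
}

impl MockExtractor {
    fn next(&mut self) -> Option<Result<(), String>> {
        if self.remaining == 0 {
            return None; // archive fully extracted
        }
        self.remaining -= 1;
        if self.fail_at == Some(self.remaining) {
            return Some(Err("extraction failed".into()));
        }
        Some(Ok(()))
    }
}

/// Drain the extractor, counting successfully extracted entries.
fn drive(extractor: &mut MockExtractor) -> Result<u32, String> {
    let mut extracted = 0;
    while let Some(result) = extractor.next() {
        result?;
        extracted += 1;
    }
    Ok(extracted)
}
```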

Questions this RFC intends to resolve:
  1. In which situations would the `AsyncExtractor<T>` make sense?
     In which wouldn't it?
  2. Should the sync variant be kept around, sharing a `common`
     implementation with its async variant? If yes, why?
  3. Are there any features that the `AsyncExtractor<T>` lacks?

Though of lesser priority, the following questions should also be addressed:
  4. Which parts of the `AsyncExtractor<T>` are inadequate and could
     use improvement?
  5. Which traits (if any) should the `AsyncExtractor<T>` implement
     (e.g. `tokio_stream::Stream`)?

Furthermore, because async code in Rust requires a runtime, the
`AsyncExtractor<T>` currently suffers from the runtime's overhead.
This difference in performance can be seen when comparing the async
version of `pxar` (see patch 2) with its current sync counterpart.
In my opinion, this points towards a common implementation that could
back both the sync and the async variant, but I am curious what
others have to say.

Let me know what you think! :-)

Max Carrara (2):
  pbs-client: pxar: Add prototype implementation of `AsyncExtractor<T>`
  pxar-bin: Use async instead of sync extractor

 Cargo.toml                                   |   1 +
 pbs-client/Cargo.toml                        |   1 +
 pbs-client/src/pxar/aio/dir_stack.rs         | 543 +++++++++++++++++++
 pbs-client/src/pxar/aio/extract/extractor.rs | 446 +++++++++++++++
 pbs-client/src/pxar/aio/extract/mod.rs       | 220 ++++++++
 pbs-client/src/pxar/aio/extract/raw.rs       | 503 +++++++++++++++++
 pbs-client/src/pxar/aio/metadata.rs          | 412 ++++++++++++++
 pbs-client/src/pxar/aio/mod.rs               |  11 +
 pbs-client/src/pxar/aio/worker.rs            | 167 ++++++
 pbs-client/src/pxar/mod.rs                   |   1 +
 pxar-bin/src/main.rs                         |  91 ++--
 11 files changed, 2352 insertions(+), 44 deletions(-)
 create mode 100644 pbs-client/src/pxar/aio/dir_stack.rs
 create mode 100644 pbs-client/src/pxar/aio/extract/extractor.rs
 create mode 100644 pbs-client/src/pxar/aio/extract/mod.rs
 create mode 100644 pbs-client/src/pxar/aio/extract/raw.rs
 create mode 100644 pbs-client/src/pxar/aio/metadata.rs
 create mode 100644 pbs-client/src/pxar/aio/mod.rs
 create mode 100644 pbs-client/src/pxar/aio/worker.rs

--
2.39.2






* [pbs-devel] [RFC PATCH proxmox-backup 1/2] pbs-client: pxar: Add prototype implementation of `AsyncExtractor<T>`
  2023-08-28 14:42 [pbs-devel] [RFC PATCH proxmox-backup 0/2] Introduce experimental `AsyncExtractor<T>` Max Carrara
@ 2023-08-28 14:42 ` Max Carrara
  2023-08-28 14:42 ` [pbs-devel] [RFC PATCH proxmox-backup 2/2] pxar-bin: Use async instead of sync extractor Max Carrara
  2023-12-15 16:33 ` [pbs-devel] [RFC PATCH proxmox-backup 0/2] Introduce experimental `AsyncExtractor<T>` Max Carrara
  2 siblings, 0 replies; 4+ messages in thread
From: Max Carrara @ 2023-08-28 14:42 UTC (permalink / raw)
  To: pbs-devel

This preliminary commit adds a fully functioning prototype
implementation of `AsyncExtractor<T>`, which is - unsurprisingly -
a complete `async` implementation of the original `Extractor` (and its
accompanying `ExtractorIter`).

In order to limit the scope of this RFC and focus on its intent,
the `pbs_client::pxar::aio` module is introduced, which contains all
relevant changes (instead of refactoring half of the crate in a dozen
or two commits).

The design of the new extractor is split into three main parts:

  1. the higher-level `AsyncExtractor<T>` which exposes the public
     extraction API
  2. the lower-level `RawAsyncExtractor` which serves as the
     "workhorse" of its implementors, including `AsyncExtractor<T>`
  3. the `SyncTaskWorker` which receives blocking / synchronous tasks
     via a queue and runs them sequentially on a dedicated thread, in
     parallel with the async runtime - this allows the
     `RawAsyncExtractor` to remain fully cooperative and non-blocking
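
One way the raw extractor can stay non-blocking while delegating to such
a worker is to pair each queued closure with a per-task result channel;
the async side then awaits the receiver instead of blocking on the
syscall itself. A minimal std-only sketch of that hand-off (the actual
`SyncTaskWorker` API in this series differs):

```rust
use std::sync::mpsc;

/// A boxed unit of blocking work, as queued on the worker thread.
type Task = Box<dyn FnOnce() + Send + 'static>;

/// Queue `job` on the worker and hand back a receiver for its result.
/// In real async code this would be an awaitable oneshot channel
/// rather than a blocking `mpsc::Receiver`.
fn submit_with_result<R: Send + 'static>(
    queue: &mpsc::Sender<Task>,
    job: impl FnOnce() -> R + Send + 'static,
) -> mpsc::Receiver<R> {
    let (result_tx, result_rx) = mpsc::channel();
    let task: Task = Box::new(move || {
        // Ignore the send error: the caller may have dropped the receiver.
        let _ = result_tx.send(job());
    });
    queue.send(task).expect("worker thread has exited");
    result_rx
}
```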

Furthermore, by moving everything that can be considered an
*extraction option* into the `PxarExtractOptions` `struct`, the
overall control flow and usage of `AsyncExtractor<T>` and
`RawAsyncExtractor` are much simpler to follow.
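
Consolidating the extraction knobs into one struct keeps the extractor
constructors small. A hypothetical shape for illustration only - of the
fields below, only `default_match` is actually visible in the patch:

```rust
/// Illustrative options struct; the actual `PxarExtractOptions` in the
/// patch bundles the real extraction knobs.
#[derive(Clone)]
struct ExtractOptions {
    /// Whether entries are extracted by default (i.e. match by default).
    default_match: bool,
    /// Tolerate directories that already exist on disk.
    allow_existing_dirs: bool,
    /// Overwrite existing files instead of failing.
    overwrite: bool,
}

impl Default for ExtractOptions {
    fn default() -> Self {
        Self {
            default_match: true,
            allow_existing_dirs: false,
            overwrite: false,
        }
    }
}
```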

The `ErrorHandler` type alias is made more flexible by requiring
an `Fn` instead of an `FnMut`; the callback is no longer generic but
instead becomes its own threadsafe alias named `Callback`.
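
The practical difference: an `Fn` handler can be shared (e.g. behind an
`Arc`) and invoked from several threads, which `FnMut` would forbid
without extra locking. A std-only sketch of such thread-safe aliases
(the concrete signatures in the patch differ):

```rust
use std::sync::Arc;

/// Thread-safe, shareable handler: decides whether an error is fatal.
type ErrorHandler = Arc<dyn Fn(String) -> Result<(), String> + Send + Sync>;

/// Non-generic, thread-safe callback alias.
type Callback = Arc<dyn Fn(&str) + Send + Sync>;

fn make_lenient_handler() -> ErrorHandler {
    // `Fn` (not `FnMut`): the closure captures nothing mutable, so
    // clones of the Arc can run concurrently on the worker thread
    // and on the async side.
    Arc::new(|msg: String| -> Result<(), String> {
        eprintln!("ignoring error: {msg}");
        Ok(())
    })
}
```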

The following modules are adapted in a similar fashion, but remain
identical in their function:

  `pbs_client::pxar::dir_stack` --> `pbs_client::pxar::aio::dir_stack`

  * The original `PxarDirStack` is split into two parts; it now uses
    the new `DirStack<T>` in order to be somewhat more testable
  * Synchronization mechanisms and guarantees are put in place
    with regard to the new extractor implementation, as the
    `PxarDirStack` must now be shared between multiple threads or
    tasks

  `pbs_client::pxar::metadata`  --> `pbs_client::pxar::aio::metadata`

  * Function signatures now take an `&Arc<pxar::Entry>` in order to
    make entries easier to pass between threads.
    (There was no measurable difference between `Arc<pxar::Entry>`
    vs `&Arc<pxar::Entry>` vs `&pxar::Entry`)
  * The old `ErrorHandler` is replaced with the new version in
    `pbs_client::pxar::aio`
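
Taking `&Arc<T>` lets call sites keep a single allocation and clone the
handle only where a task actually crosses a thread boundary. A std-only
illustration, using `String` as a stand-in for `pxar::Entry`:

```rust
use std::sync::Arc;
use std::thread;

/// Functions borrow the Arc; no refcount bump unless a clone is needed.
fn entry_name_len(entry: &Arc<String>) -> usize {
    entry.len()
}

/// Clone the Arc only at the point where the entry crosses a thread
/// boundary; the clone is a cheap refcount increment.
fn apply_on_worker(entry: &Arc<String>) -> thread::JoinHandle<usize> {
    let entry = Arc::clone(entry);
    thread::spawn(move || entry.len())
}
```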

Signed-off-by: Max Carrara <m.carrara@proxmox.com>
---
 Cargo.toml                                   |   1 +
 pbs-client/Cargo.toml                        |   1 +
 pbs-client/src/pxar/aio/dir_stack.rs         | 543 +++++++++++++++++++
 pbs-client/src/pxar/aio/extract/extractor.rs | 446 +++++++++++++++
 pbs-client/src/pxar/aio/extract/mod.rs       | 220 ++++++++
 pbs-client/src/pxar/aio/extract/raw.rs       | 503 +++++++++++++++++
 pbs-client/src/pxar/aio/metadata.rs          | 412 ++++++++++++++
 pbs-client/src/pxar/aio/mod.rs               |  11 +
 pbs-client/src/pxar/aio/worker.rs            | 167 ++++++
 pbs-client/src/pxar/mod.rs                   |   1 +
 10 files changed, 2305 insertions(+)
 create mode 100644 pbs-client/src/pxar/aio/dir_stack.rs
 create mode 100644 pbs-client/src/pxar/aio/extract/extractor.rs
 create mode 100644 pbs-client/src/pxar/aio/extract/mod.rs
 create mode 100644 pbs-client/src/pxar/aio/extract/raw.rs
 create mode 100644 pbs-client/src/pxar/aio/metadata.rs
 create mode 100644 pbs-client/src/pxar/aio/mod.rs
 create mode 100644 pbs-client/src/pxar/aio/worker.rs

diff --git a/Cargo.toml b/Cargo.toml
index c7773f0e..ba6d874a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -122,6 +122,7 @@ hyper = { version = "0.14", features = [ "full" ] }
 lazy_static = "1.4"
 libc = "0.2"
 log = "0.4.17"
+memchr = "2.5"
 nix = "0.26.1"
 nom = "7"
 num-traits = "0.2"
diff --git a/pbs-client/Cargo.toml b/pbs-client/Cargo.toml
index ed7d651d..047efb41 100644
--- a/pbs-client/Cargo.toml
+++ b/pbs-client/Cargo.toml
@@ -17,6 +17,7 @@ hyper.workspace = true
 lazy_static.workspace = true
 libc.workspace = true
 log.workspace = true
+memchr.workspace = true
 nix.workspace = true
 openssl.workspace = true
 percent-encoding.workspace = true
diff --git a/pbs-client/src/pxar/aio/dir_stack.rs b/pbs-client/src/pxar/aio/dir_stack.rs
new file mode 100644
index 00000000..62cf9ee5
--- /dev/null
+++ b/pbs-client/src/pxar/aio/dir_stack.rs
@@ -0,0 +1,543 @@
+#![allow(unused)]
+
+use std::ffi::OsStr;
+use std::fmt::Debug;
+use std::os::fd::{AsRawFd, BorrowedFd, RawFd};
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+
+use anyhow::{bail, format_err, Context, Error};
+use nix::dir::Dir;
+use nix::fcntl::OFlag;
+use nix::sys::stat::{mkdirat, Mode};
+
+use proxmox_sys::error::SysError;
+
+// NOTE: This is essentially crate::pxar::tools::assert_single_path_component
+// but kept separate here for the time being
+pub fn assert_path_has_single_normal_component<P: AsRef<Path>>(path: P) -> Result<(), Error> {
+    let path = path.as_ref();
+
+    if !path.is_relative() {
+        bail!("path is absolute: {:?}", path)
+    }
+
+    let mut components = path.components();
+
+    if !matches!(components.next(), Some(std::path::Component::Normal(_))) {
+        bail!("invalid component in path: {:?}", path)
+    }
+
+    if components.next().is_some() {
+        bail!("path has multiple components: {:?}", path)
+    }
+
+    Ok(())
+}
+
+/// A stack which stores directory path components and associates each directory
+/// with some data.
+pub struct DirStack<T> {
+    path: PathBuf,
+    data_stack: Vec<T>,
+}
+
+impl<T> Default for DirStack<T> {
+    #[inline]
+    fn default() -> Self {
+        Self {
+            path: PathBuf::new(),
+            data_stack: vec![],
+        }
+    }
+}
+
+impl<T: Clone> Clone for DirStack<T> {
+    #[inline]
+    fn clone(&self) -> Self {
+        Self {
+            path: self.path.clone(),
+            data_stack: self.data_stack.clone(),
+        }
+    }
+}
+
+impl<T: Debug> Debug for DirStack<T> {
+    #[inline]
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("DirStack")
+            .field("path", &self.path)
+            .field("data_stack", &self.data_stack)
+            .finish()
+    }
+}
+
+impl<T> DirStack<T> {
+    /// Returns a [`Path`] slice of the entire path that's on the stack.
+    ///
+    /// The [dangling root](DirStack::with_dangling_root) will be included if it exists.
+    #[inline]
+    pub fn as_path(&self) -> &Path {
+        self.path.as_path()
+    }
+
+    /// Removes all data from the stack.
+    ///
+    /// The [dangling root](DirStack::with_dangling_root) will be preserved if it exists.
+    #[inline]
+    pub fn clear(&mut self) {
+        while self.pop().is_some() {}
+    }
+
+    /// Returns the [dangling root](DirStack::with_dangling_root) or an empty
+    /// path if it doesn't exist.
+    #[inline]
+    pub fn dangling_root(&self) -> &Path {
+        if self.data_stack.is_empty() {
+            self.path.as_ref()
+        } else {
+            let mut iter = self.path.iter();
+            iter.nth_back(self.data_stack.len() - 1);
+            iter.as_path()
+        }
+    }
+
+    /// Returns the first path component and its associated data.
+    ///
+    /// If the stack has a [dangling root](DirStack::with_dangling_root), it
+    /// will be ignored. Meaning, only the first *added* path component will
+    /// be returned.
+    #[inline]
+    pub fn first(&self) -> Option<(&Path, &T)> {
+        self.data_stack.first().and_then(|data| {
+            self.path
+                .iter()
+                .nth_back(self.data_stack.len() - 1)
+                .map(|component| (component.as_ref(), data))
+        })
+    }
+
+    /// Checks whether the stack is empty.
+    ///
+    /// The stack is considered empty if there are no path components with
+    /// associated data, even if it has a [dangling root](DirStack::with_dangling_root).
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.data_stack.is_empty()
+    }
+
+    /// Returns the last path component and its associated data.
+    #[inline]
+    pub fn last(&self) -> Option<(&Path, &T)> {
+        self.data_stack.last().and_then(|data| {
+            self.path
+                .file_name()
+                .map(|file_name| (file_name.as_ref(), data))
+        })
+    }
+
+    /// Returns the number of elements on the stack.
+    ///
+    /// Note that the root path (the very first path) may consist of multiple components,
+    /// but is still counted as one element.
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.data_stack.len()
+    }
+
+    /// Creates a new, empty [`DirStack`].
+    #[inline]
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Returns the **full path** before [`DirStack::last()`] and the data associated
+    /// with the component it leads to, **if both exist.**
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// use std::path::Path;
+    ///
+    /// # use tools::DirStack;
+    ///
+    /// let mut dir_stack: DirStack<u8> = DirStack::new();
+    ///
+    /// dir_stack.push("foo", 1);
+    /// dir_stack.push("bar", 2);
+    /// dir_stack.push("baz", 3);
+    ///
+    /// let expected = Path::new("foo/bar");
+    ///
+    /// assert_eq!(dir_stack.parent(), Some((expected, &2)));
+    ///
+    /// ```
+    #[inline]
+    pub fn parent(&self) -> Option<(&Path, &T)> {
+        if self.is_empty() {
+            None
+        } else {
+            self.path.parent().and_then(|parent| {
+                if self.data_stack.len() > 1 {
+                    Some((parent, &self.data_stack[self.data_stack.len() - 2]))
+                } else {
+                    None
+                }
+            })
+        }
+    }
+    /// Removes the last path component from the stack and returns its associated
+    /// data, or [`None`] if the stack is empty.
+    #[inline]
+    pub fn pop(&mut self) -> Option<T> {
+        self.data_stack.pop().map(|data| {
+            self.path.pop();
+            data
+        })
+    }
+
+    /// Pushes a path and its associated data onto the stack.
+    ///
+    /// The path must consist of a single normal component.
+    /// See [`Component::Normal`][std::path::Component::Normal].
+    #[inline]
+    pub fn push<P: AsRef<Path>>(&mut self, path: P, data: T) -> Result<(), Error> {
+        assert_path_has_single_normal_component(&path)?;
+
+        self.path.push(path.as_ref());
+        self.data_stack.push(data);
+
+        Ok(())
+    }
+
+    /// Clears the stack, replacing all elements with the given path and that path's
+    /// associated data.
+    ///
+    /// The [dangling root](DirStack::with_dangling_root) will be preserved if it exists.
+    ///
+    /// Contrary to [`DirStack::push()`], this can be any path and doesn't have to consist of
+    /// only a single path component. This is useful if you want to e.g. initialize the stack
+    /// with your current working directory or similar.
+    #[inline]
+    pub fn replace<P: AsRef<Path>>(&mut self, path: P, data: T) {
+        self.clear();
+        self.path.push(path);
+        self.data_stack.push(data);
+    }
+
+    /// Creates a new [`DirStack`] from a path without any associated data.
+    /// This stack has a **dangling root** which means that even though it has a
+    /// path, it is still considered to be empty, as it lacks any data.
+    ///
+    /// This is useful if you want to have a base directory that can never
+    /// be left through [`DirStack::pop()`], such as your current working directory,
+    /// for example.
+    #[inline]
+    pub fn with_dangling_root<P: AsRef<Path>>(path: P) -> Self {
+        Self {
+            path: path.as_ref().to_owned(),
+            data_stack: vec![],
+        }
+    }
+}
+
+impl<T, Idx> std::ops::Index<Idx> for DirStack<T>
+where
+    Idx: std::slice::SliceIndex<[T], Output = T>,
+{
+    type Output = T;
+
+    #[inline]
+    fn index(&self, index: Idx) -> &Self::Output {
+        &self.data_stack[index]
+    }
+}
+
+impl<T, Idx> std::ops::IndexMut<Idx> for DirStack<T>
+where
+    Idx: std::slice::SliceIndex<[T], Output = T>,
+{
+    #[inline]
+    fn index_mut(&mut self, index: Idx) -> &mut Self::Output {
+        &mut self.data_stack[index]
+    }
+}
+
+/// Helper struct that associates a [`Entry`][e] with a potentially opened directory [`Dir`].
+///
+/// It's the developer's responsibility that the contained [`Entry`][e]'s *kind*
+/// is actually a [`Directory`][d], and that the opened [`Dir`] corresponds to the
+/// provided [`Entry`][e].
+///
+/// [e]: pxar::Entry
+/// [d]: pxar::EntryKind::Directory
+#[derive(Debug)]
+pub(super) struct PxarDir {
+    entry: Arc<pxar::Entry>,
+    dir: Option<Dir>,
+}
+
+impl PxarDir {
+    /// Creates a new [`PxarDir`] from an [`Arc<pxar::Entry>`][pxar::Entry].
+    #[inline]
+    pub fn new(entry: Arc<pxar::Entry>) -> Self {
+        Self { entry, dir: None }
+    }
+
+    /// Creates a new [`PxarDir`] from an [`Arc<pxar::Entry>`][pxar::Entry]
+    /// which is associated with a [`Dir`].
+    #[inline]
+    pub fn with_dir(entry: Arc<pxar::Entry>, dir: Dir) -> Self {
+        Self {
+            entry,
+            dir: Some(dir),
+        }
+    }
+
+    /// Return the file name of the inner [`Entry`][pxar::Entry].
+    #[inline]
+    pub fn file_name(&self) -> &OsStr {
+        self.entry.file_name()
+    }
+
+    /// Return a [`BorrowedFd`] to the inner [`Dir`] if available.
+    #[inline]
+    pub fn try_as_borrowed_fd(&self) -> Option<BorrowedFd> {
+        // FIXME: Once `nix` adds `AsFd` support use `.as_fd()` instead.
+        self.dir
+            .as_ref()
+            .map(|dir| unsafe { BorrowedFd::borrow_raw(dir.as_raw_fd()) })
+    }
+
+    /// Moves the inner [`Entry`][pxar::Entry] and [`Option<Dir>`][dir] out of the [`PxarDir`].
+    ///
+    /// [dir]: nix::dir::Dir
+    #[inline]
+    pub fn into_inner(self) -> (Arc<pxar::Entry>, Option<Dir>) {
+        (self.entry, self.dir)
+    }
+
+    #[inline]
+    fn create_at(&mut self, parent: RawFd, allow_existing: bool) -> Result<BorrowedFd, Error> {
+        const PERMS: Mode = Mode::from_bits_truncate(0o700);
+
+        match mkdirat(parent, self.file_name(), PERMS) {
+            Ok(()) => (),
+            Err(error) => {
+                if !(allow_existing && error.already_exists()) {
+                    return Err(error).context("failed to create directory");
+                }
+            }
+        }
+
+        self.open_at(parent)
+    }
+
+    #[inline]
+    fn open_at(&mut self, parent: RawFd) -> Result<BorrowedFd, Error> {
+        Dir::openat(parent, self.file_name(), OFlag::O_DIRECTORY, Mode::empty())
+            .context("failed to open directory")
+            .map(|dir| {
+                // FIXME: Once `nix` adds `AsFd` support use `.as_fd()` instead.
+                let fd = unsafe { BorrowedFd::borrow_raw(dir.as_raw_fd()) };
+
+                self.dir = Some(dir);
+
+                fd
+            })
+    }
+}
+
+/// The [`PxarDirStack`] is used to keep track of traversed and created [`PxarDir`s][PxarDir].
+#[derive(Debug)]
+pub(super) struct PxarDirStack {
+    inner: DirStack<PxarDir>,
+    len_created: usize,
+}
+
+/// This struct may safely be `Sync` as it is only used in the context of the
+/// [`RawAsyncExtractor`][super::RawAsyncExtractor], which never accesses an
+/// underlying [`Dir`] concurrently or in parallel.
+unsafe impl Sync for PxarDirStack {}
+
+impl PxarDirStack {
+    #[inline]
+    pub fn new() -> Self {
+        Self {
+            inner: DirStack::new(),
+            len_created: 0,
+        }
+    }
+
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.inner.len()
+    }
+
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.inner.is_empty()
+    }
+
+    #[inline]
+    pub fn as_path(&self) -> &Path {
+        self.inner.as_path()
+    }
+
+    #[inline]
+    pub fn push<P: AsRef<Path>>(&mut self, path: P, dir: PxarDir) -> Result<(), Error> {
+        let path = path.as_ref();
+        self.inner.push(path, dir).map(|_| {
+            if self.inner[self.inner.len() - 1].dir.is_some() {
+                self.len_created += 1;
+            }
+        })
+    }
+
+    #[inline]
+    pub fn pop(&mut self) -> Option<PxarDir> {
+        self.inner.pop().map(|dir| {
+            self.len_created = self.len_created.min(self.len());
+            dir
+        })
+    }
+
+    #[inline]
+    pub fn root_dir_fd(&self) -> Result<BorrowedFd, Error> {
+        self.inner
+            .first()
+            .ok_or_else(|| format_err!("stack underrun"))
+            .map(|(_, pxar_dir)| {
+                pxar_dir
+                    .try_as_borrowed_fd()
+                    .context("lost track of directory file descriptors")
+            })?
+    }
+
+    /// Return the last [`Dir`]'s file descriptor as a [`BorrowedFd`]
+    /// or create it if it's not available.
+    #[inline]
+    pub fn last_dir_fd(&mut self, allow_existing: bool) -> Result<BorrowedFd, Error> {
+        if self.is_empty() {
+            bail!("no directory entries on the stack")
+        }
+
+        if self.len_created == 0 {
+            bail!("no created file descriptors on the stack")
+        }
+
+        let mut fd = self.inner[self.len_created - 1]
+            .try_as_borrowed_fd()
+            .context("lost track of directory file descriptors")?
+            .as_raw_fd();
+
+        while self.len_created < self.len() {
+            fd = self.inner[self.len_created]
+                .create_at(fd, allow_existing)
+                .map(|borrowed_fd| borrowed_fd.as_raw_fd())?;
+            self.len_created += 1;
+        }
+
+        self.inner[self.len_created - 1]
+            .try_as_borrowed_fd()
+            .context("lost track of directory file descriptors")
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::ffi::OsStr;
+    use std::path::Path;
+
+    use super::DirStack;
+
+    // helper extension trait to make asserts more concise
+    trait PathHelper {
+        fn as_path(&self) -> &Path;
+    }
+
+    impl<T> PathHelper for T
+    where
+        T: AsRef<OsStr>,
+    {
+        fn as_path(&self) -> &Path {
+            self.as_ref().as_ref()
+        }
+    }
+
+    #[test]
+    fn test_dir_stack_base() {
+        let mut dir_stack: DirStack<u8> = DirStack::new();
+
+        assert!(
+            dir_stack.push("foo", 1).is_ok(),
+            "couldn't push onto stack!"
+        );
+
+        dir_stack.push("bar", 2).unwrap();
+        dir_stack.push("baz", 3).unwrap();
+
+        assert_eq!(dir_stack.as_path(), "foo/bar/baz".as_path());
+
+        assert_eq!(dir_stack.first(), Some(("foo".as_path(), &1)));
+        assert_eq!(dir_stack.last(), Some(("baz".as_path(), &3)));
+        assert_eq!(dir_stack.parent(), Some(("foo/bar".as_path(), &2)));
+
+        assert_eq!(dir_stack.pop(), Some(3));
+
+        assert_eq!(dir_stack.last(), Some(("bar".as_path(), &2)));
+        assert_eq!(dir_stack.parent(), Some(("foo".as_path(), &1)));
+
+        assert_eq!(dir_stack.pop(), Some(2));
+
+        assert_eq!(dir_stack.first(), Some(("foo".as_path(), &1)));
+        assert_eq!(dir_stack.last(), Some(("foo".as_path(), &1)));
+        assert!(dir_stack.parent().is_none());
+
+        assert_eq!(dir_stack.pop(), Some(1));
+
+        assert!(dir_stack.first().is_none());
+        assert!(dir_stack.last().is_none());
+        assert!(dir_stack.parent().is_none());
+
+        assert_eq!(dir_stack.as_path(), "".as_path());
+
+        dir_stack.push("foo", 1).unwrap();
+        dir_stack.clear();
+
+        assert_eq!(dir_stack.as_path(), "".as_path());
+    }
+
+    #[test]
+    fn test_dir_stack_dangling_root() {
+        let mut dir_stack: DirStack<u8> = DirStack::with_dangling_root("foo/bar/baz");
+
+        assert!(dir_stack.is_empty());
+
+        assert!(dir_stack.push("qux", 1).is_ok());
+
+        let expected = Some(("qux".as_path(), &1u8));
+
+        assert_eq!(dir_stack.first(), expected);
+        assert_eq!(dir_stack.last(), expected);
+        assert!(dir_stack.parent().is_none());
+
+        assert_eq!(dir_stack.as_path(), "foo/bar/baz/qux".as_path());
+        assert_eq!(dir_stack.dangling_root(), "foo/bar/baz".as_path());
+        assert!(!dir_stack.is_empty());
+
+        assert_eq!(dir_stack.pop(), Some(1));
+
+        assert!(dir_stack.first().is_none());
+        assert!(dir_stack.last().is_none());
+        assert!(dir_stack.parent().is_none());
+
+        assert_eq!(dir_stack.as_path(), "foo/bar/baz".as_path());
+        assert_eq!(dir_stack.dangling_root(), "foo/bar/baz".as_path());
+        assert!(dir_stack.is_empty());
+
+        let empty_dir_stack: DirStack<u8> = DirStack::new();
+        assert!(empty_dir_stack.is_empty());
+        assert_eq!(empty_dir_stack.dangling_root(), "".as_path())
+    }
+}
diff --git a/pbs-client/src/pxar/aio/extract/extractor.rs b/pbs-client/src/pxar/aio/extract/extractor.rs
new file mode 100644
index 00000000..944ec157
--- /dev/null
+++ b/pbs-client/src/pxar/aio/extract/extractor.rs
@@ -0,0 +1,446 @@
+use std::os::unix::prelude::OsStrExt;
+use std::path::Path;
+use std::sync::Arc;
+
+use anyhow::{bail, format_err, Context, Error, Result};
+use memchr;
+use pathpatterns::{MatchList, MatchType};
+
+use pxar::accessor::aio::Accessor;
+use pxar::accessor::SeqReadAtAdapter;
+use pxar::decoder::aio::Decoder;
+use pxar::decoder::SeqRead;
+
+use super::raw::RawAsyncExtractor;
+use super::PxarExtractOptions;
+
+use crate::pxar::Flags;
+
+/// Helper enum that represents the state of the root [`Entry`][pxar::Entry]
+/// in the context of the [`AsyncExtractor<T>`].
+///
+/// For example: Because the [`Accessor<T>`] skips the archive's root directory once a
+/// [`Decoder<T>`] is instantiated from it, the root entry has to be decoded
+/// and provided beforehand.
+#[derive(Debug)]
+pub(crate) enum RootEntryState {
+    None,
+    Decoded { entry: Arc<pxar::Entry> },
+    Extracted,
+}
+
+pub struct AsyncExtractor<T> {
+    decoder: Box<Decoder<T>>,
+    inner: Box<RawAsyncExtractor>,
+
+    options: Arc<PxarExtractOptions>,
+
+    root_entry_state: RootEntryState,
+    end_reached: bool,
+
+    match_stack: Vec<bool>,
+    current_match: bool,
+}
+
+type FileReader = pxar::accessor::sync::FileRefReader<Arc<std::fs::File>>;
+
+impl AsyncExtractor<SeqReadAtAdapter<FileReader>> {
+    /// Create a new extractor from an existing pxar archive file.
+    ///
+    /// This is the preferred way to extract an archive that has been saved to
+    /// local storage, taking advantage of the fact that the archive is available on a drive.
+    pub async fn with_file<P>(src_path: P, options: PxarExtractOptions) -> Result<Self, Error>
+    where
+        P: AsRef<Path>,
+    {
+        let src_path = src_path.as_ref();
+
+        let file = std::fs::File::open(src_path)
+            .with_context(|| format!("failed to open file: {:#?}", src_path))?;
+
+        // allows us to toss the accessor away once we've gotten our decoder from it
+        let file = Arc::new(file);
+
+        let accessor = Accessor::from_file_ref(file)
+            .await
+            .context("failed to instantiate pxar accessor")?;
+
+        // root entry needs to be pre-decoded because `decode_full()` skips it
+        let root_dir = accessor.open_root().await?;
+        let root_entry = root_dir.lookup_self().await?.entry().clone();
+
+        let decoder = root_dir
+            .decode_full()
+            .await
+            .context("failed to instantiate decoder from accessor")?;
+
+        Ok(Self::with_root_entry(decoder, options, root_entry))
+    }
+}
+
+impl<T: SeqRead> AsyncExtractor<T> {
+    /// Create a new extractor from an existing [`Decoder`].
+    /// It is assumed that no entries were decoded prior to the creation of the extractor.
+    pub fn new(decoder: Decoder<T>, options: PxarExtractOptions) -> Self {
+        let root_entry_state = RootEntryState::None;
+        Self::new_impl(decoder, options, root_entry_state)
+    }
+
+    /// Create a new extractor from an existing [`Decoder`] and a pre-decoded root
+    /// [`Entry`][pxar::Entry]. This is useful if you need to handle the extraction
+    /// of the archive's root in a special manner.
+    pub fn with_root_entry(
+        decoder: Decoder<T>,
+        options: PxarExtractOptions,
+        root_entry: pxar::Entry,
+    ) -> Self {
+        let root_entry_state = RootEntryState::Decoded {
+            entry: Arc::new(root_entry),
+        };
+
+        Self::new_impl(decoder, options, root_entry_state)
+    }
+
+    /// Create a new extractor from an arbitrary input that implements [`SeqRead`].
+    pub async fn with_input(input: T, options: PxarExtractOptions) -> Result<Self, Error> {
+        let decoder = Decoder::new(input)
+            .await
+            .context("failed to instantiate decoder")?;
+
+        let root_entry_state = RootEntryState::None;
+
+        Ok(Self::new_impl(decoder, options, root_entry_state))
+    }
+
+    fn new_impl(
+        mut decoder: Decoder<T>,
+        options: PxarExtractOptions,
+        root_entry_state: RootEntryState,
+    ) -> Self {
+        decoder.enable_goodbye_entries(true);
+
+        let current_match = options.default_match;
+
+        Self {
+            decoder: Box::new(decoder),
+            inner: Box::new(RawAsyncExtractor::new()),
+            options: Arc::new(options),
+            root_entry_state,
+            end_reached: false,
+            match_stack: vec![],
+            current_match,
+        }
+    }
+}
+
+impl<T: SeqRead> AsyncExtractor<T> {
+    /// Decode and extract the next [`Entry`][pxar::Entry].
+    ///
+    /// Upon each call, this function will continue to extract entries until
+    /// either an error is encountered or the archive has been fully extracted.
+    #[inline]
+    pub async fn next(&mut self) -> Option<Result<(), Error>> {
+        if self.end_reached {
+            return None;
+        }
+
+        if !matches!(self.root_entry_state, RootEntryState::Extracted) {
+            return Some(self.handle_root_entry().await);
+        }
+
+        let entry = match self.decode_next().await {
+            Some(Ok(entry)) => entry,
+            // other cases are already handled in `decode_next()` so we can just return here
+            rval => {
+                return rval.map(|result| result.map(drop));
+            }
+        };
+
+        let mut result = match self.extract_next(entry).await {
+            Err(error) => self.exec_error_handler(error).await,
+            res => res,
+        };
+
+        if result.is_err() {
+            if let Err(stop_error) = self.stop_extraction() {
+                result = result
+                    .context(stop_error)
+                    .context("encountered another error during extractor shutdown");
+            }
+        }
+
+        Some(result)
+    }
+
+    /// Helper method that starts the extraction process by extracting the root
+    /// directory of the archive to the given destination.
+    #[inline]
+    async fn start_extraction(&mut self, entry: Arc<pxar::Entry>) -> Result<(), Error> {
+        if matches!(self.root_entry_state, RootEntryState::Extracted) {
+            bail!("root entry of archive was already extracted")
+        }
+
+        self.inner.start_worker_thread(&self.options)?;
+
+        self.inner
+            .extract_root_dir(entry, &self.options)
+            .await
+            .map(|_| {
+                self.root_entry_state = RootEntryState::Extracted;
+            })
+    }
+
+    /// Helper method that stops the extraction process by joining the worker thread.
+    #[inline]
+    fn stop_extraction(&mut self) -> Result<(), Error> {
+        self.end_reached = true;
+        proxmox_async::runtime::block_in_place(|| self.inner.join_worker_thread())
+    }
+
+    /// Decodes the next [`Entry`][pxar::Entry] and wraps it in an [`Arc`] on success.
+    ///
+    /// If an error occurs while decoding an entry, the extractor is subsequently stopped.
+    #[inline(always)] // hot path
+    async fn decode_next(&mut self) -> Option<Result<Arc<pxar::Entry>, Error>> {
+        let entry = match self.decoder.next().await {
+            None => {
+                return if let Err(error) = self.stop_extraction() {
+                    Some(Err(error))
+                } else {
+                    None
+                };
+            }
+            Some(Err(error)) => {
+                let result = Err(error).context("error reading pxar archive");
+
+                let result = if let Err(stop_error) = self.stop_extraction() {
+                    result
+                        .context(stop_error)
+                        .context("encountered another error during extractor shutdown")
+                } else {
+                    result
+                };
+
+                return Some(result);
+            }
+            Some(Ok(entry)) => entry,
+        };
+
+        Some(Ok(Arc::new(entry)))
+    }
+
+    /// Helper method that extracts the archive's root entry depending on the
+    /// [`RootEntryState`].
+    ///
+    /// Once the root pxar entry has been decoded or was provided, the extraction process
+    /// is started via [`RawAsyncExtractor::start_extraction()`].
+    async fn handle_root_entry(&mut self) -> Result<(), Error> {
+        let decode_result = match self.root_entry_state {
+            RootEntryState::None => match self.decode_next().await {
+                Some(result) => result.context("error while decoding root entry"),
+                None => Err(format_err!("no root entry found - archive is empty")),
+            },
+            RootEntryState::Decoded { ref entry } => Ok(Arc::clone(entry)),
+            RootEntryState::Extracted => Err(format_err!("root entry was already extracted")),
+        };
+
+        let entry = decode_result.map_err(|error| {
+            let _ = self.stop_extraction();
+            error
+        })?;
+
+        self.start_extraction(entry).await.map_err(|error| {
+            let _ = self.stop_extraction();
+            error
+        })
+    }
+
+    /// Extract an [`Entry`][pxar::Entry] depending on its [`EntryKind`][pxar::EntryKind].
+    ///
+    /// This method checks each entry's filename and matches it against the given
+    /// [`MatchList`]. Once an entry is safe to extract and matches, the
+    /// [`Callback`][super::Callback] is executed and the underlying [`RawAsyncExtractor`]
+    /// is used to perform the actual extraction operation.
+    #[inline(always)] // hot path
+    async fn extract_next(&mut self, entry: Arc<pxar::Entry>) -> Result<(), Error> {
+        Self::check_entry_filename(&entry)?;
+
+        let (match_type, did_match) = self.match_entry(&entry);
+
+        use pxar::EntryKind::*;
+
+        match (did_match, entry.kind()) {
+            (_, Directory) => {
+                self.exec_callback(&entry).await;
+
+                let do_create = self.current_match && match_type != Some(MatchType::Exclude);
+
+                let res = self
+                    .inner
+                    .enter_directory(entry, &self.options, do_create)
+                    .await;
+
+                if res.is_ok() {
+                    // We're starting a new directory, push our old matching
+                    // state and replace it with our new one:
+                    self.match_stack.push(self.current_match);
+                    self.current_match = did_match;
+                }
+
+                res
+            }
+            (_, GoodbyeTable) => {
+                self.exec_callback(&entry).await;
+                let res = self.inner.leave_directory(&self.options).await;
+
+                if res.is_ok() {
+                    // We left a directory, also get back our previous matching state. This is in sync
+                    // with `dir_stack` so this should never be empty except for the final goodbye
+                    // table, in which case we get back to the default of `true`.
+                    self.current_match = self.match_stack.pop().unwrap_or(true);
+                }
+
+                res
+            }
+            (true, Symlink(_)) => {
+                self.exec_callback(&entry).await;
+
+                self.inner.extract_symlink(entry, &self.options).await
+            }
+            (true, Hardlink(_)) => {
+                self.exec_callback(&entry).await;
+
+                self.inner.extract_hardlink(entry, &self.options).await
+            }
+            (true, Device(dev)) => {
+                self.exec_callback(&entry).await;
+
+                if self.has_feature_flags(Flags::WITH_DEVICE_NODES) {
+                    let dev = dev.to_owned();
+                    self.inner.extract_device(entry, &self.options, dev).await
+                } else {
+                    Ok(())
+                }
+            }
+            (true, Fifo) => {
+                self.exec_callback(&entry).await;
+
+                if self.has_feature_flags(Flags::WITH_FIFOS) {
+                    self.inner.extract_fifo(entry, &self.options, 0).await
+                } else {
+                    Ok(())
+                }
+            }
+            (true, Socket) => {
+                self.exec_callback(&entry).await;
+
+                if self.has_feature_flags(Flags::WITH_SOCKETS) {
+                    self.inner.extract_socket(entry, &self.options, 0).await
+                } else {
+                    Ok(())
+                }
+            }
+            (true, File { size, .. }) => {
+                self.exec_callback(&entry).await;
+
+                if let Some(ref mut contents) = self.decoder.contents() {
+                    let size = size.to_owned();
+                    self.inner
+                        .extract_file(entry, &self.options, contents, size)
+                        .await
+                } else {
+                    Err(format_err!(
+                        "found regular file entry without contents in archive"
+                    ))
+                }
+            }
+            (false, _) => Ok(()),
+        }
+    }
+
+    /// Checks whether the [`Entry`'s][e] filename is valid.
+    ///
+    /// The filename is valid if and only if:
+    /// * it doesn't contain slashes `/`
+    /// * it doesn't contain null bytes `\0`
+    ///
+    /// [e]: pxar::Entry
+    fn check_entry_filename(entry: &pxar::Entry) -> Result<(), Error> {
+        let file_name_bytes = entry.file_name().as_bytes();
+
+        if let Some(pos) = memchr::memchr(b'/', file_name_bytes) {
+            bail!(
+                "archive entry filename contains slash at position {pos}, \
+                which is invalid and a security concern"
+            )
+        }
+
+        if let Some(pos) = memchr::memchr(0, file_name_bytes) {
+            bail!("archive entry filename contains NULL byte at position {pos}")
+        }
+
+        Ok(())
+    }
+
+    /// Helper method that checks whether the [`Entry`'s][pxar::Entry] path and file mode
+    /// match the provided [`MatchList`] in [`PxarExtractOptions`].
+    ///
+    /// Whether the entry actually matched is also returned together with the
+    /// [`Option<MatchType>`][MatchType]. If the latter is [`None`], the
+    /// **current match** is used as a fallback.
+    #[inline]
+    pub(crate) fn match_entry(&self, entry: &pxar::Entry) -> (Option<MatchType>, bool) {
+        let path_bytes = entry.path().as_os_str().as_bytes();
+        let file_mode = entry.metadata().file_type() as u32;
+
+        // NOTE: Matching against a large match list can block for quite some time,
+        // so this could be offloaded via spawn_blocking() beyond a certain list size
+        // ... but that depends on how a "large" `match_list` is actually defined
+
+        // We can `unwrap()` safely here because we get a `Result<_, Infallible>`
+        let match_type = self
+            .options
+            .match_list
+            .matches(path_bytes, Some(file_mode))
+            .unwrap();
+
+        let did_match = match match_type {
+            Some(MatchType::Include) => true,
+            Some(MatchType::Exclude) => false,
+            None => self.current_match,
+        };
+
+        (match_type, did_match)
+    }
+
+    #[inline]
+    fn has_feature_flags(&self, feature_flags: Flags) -> bool {
+        self.options.feature_flags.contains(feature_flags)
+    }
+
+    /// Helper method to spawn and await a task that executes the extractor's
+    /// [`ErrorHandler`][super::ErrorHandler].
+    #[inline]
+    async fn exec_error_handler(&self, error: Error) -> Result<(), Error> {
+        tokio::task::spawn_blocking({
+            let error_handler = Arc::clone(&self.options.error_handler);
+            move || error_handler(error)
+        })
+        .await
+        .context("failed to execute error handler")?
+    }
+
+    /// Helper method to spawn and await a task that executes the extractor's
+    /// [`Callback`][super::Callback].
+    #[inline]
+    async fn exec_callback(&self, entry: &Arc<pxar::Entry>) {
+        if let Some(ref callback) = self.options.callback {
+            tokio::task::spawn({
+                let callback = Arc::clone(callback);
+                let entry = Arc::clone(entry);
+                async move { callback(entry).await }
+            });
+        }
+    }
+}
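
As a side note, the directory-matching state handled in `extract_next()` above (push the inherited state on `Directory`, pop it back on `GoodbyeTable`) can be modeled in isolation. A self-contained sketch for illustration (all names hypothetical, not part of this patch):

```rust
enum MatchType {
    Include,
    Exclude,
}

/// Hypothetical model of the matching state used during extraction.
struct MatchState {
    current_match: bool,
    match_stack: Vec<bool>,
}

impl MatchState {
    fn new(default_match: bool) -> Self {
        Self {
            current_match: default_match,
            match_stack: Vec::new(),
        }
    }

    /// Mirrors `match_entry()`: explicit matches override, `None` inherits
    /// the current matching state.
    fn did_match(&self, match_type: Option<MatchType>) -> bool {
        match match_type {
            Some(MatchType::Include) => true,
            Some(MatchType::Exclude) => false,
            None => self.current_match,
        }
    }

    /// On entering a directory, push the old state and adopt the new one.
    fn enter_directory(&mut self, did_match: bool) {
        self.match_stack.push(self.current_match);
        self.current_match = did_match;
    }

    /// On a goodbye table, restore the previous state; the final goodbye
    /// table falls back to the default of `true`.
    fn leave_directory(&mut self) {
        self.current_match = self.match_stack.pop().unwrap_or(true);
    }
}

fn main() {
    let mut state = MatchState::new(true);

    // Entering an excluded directory: children inherit "no match" ...
    state.enter_directory(state.did_match(Some(MatchType::Exclude)));
    assert!(!state.did_match(None));

    // ... unless a child matches an include pattern explicitly.
    assert!(state.did_match(Some(MatchType::Include)));

    // Leaving the directory restores the outer matching state.
    state.leave_directory();
    assert!(state.did_match(None));
}
```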
diff --git a/pbs-client/src/pxar/aio/extract/mod.rs b/pbs-client/src/pxar/aio/extract/mod.rs
new file mode 100644
index 00000000..82d9a596
--- /dev/null
+++ b/pbs-client/src/pxar/aio/extract/mod.rs
@@ -0,0 +1,220 @@
+use std::borrow::Cow;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+
+use futures::future::BoxFuture;
+use lazy_static::lazy_static;
+
+use pathpatterns::MatchEntry;
+
+use crate::pxar::Flags;
+
+pub(crate) mod extractor;
+pub use extractor::AsyncExtractor;
+
+pub(crate) mod raw;
+pub use raw::RawAsyncExtractor;
+
+// NOTE: These will be put into a separate module and are only used here
+//       to avoid (too much) code duplication
+use crate::pxar::extract::OverwriteFlags;
+use crate::pxar::extract::PxarExtractContext;
+
+pub type ErrorHandler =
+    Box<dyn Fn(anyhow::Error) -> Result<(), anyhow::Error> + Send + Sync + 'static>;
+
+pub type Callback = Box<dyn Fn(Arc<pxar::Entry>) -> BoxFuture<'static, ()> + Send + Sync + 'static>;
+
+lazy_static! {
+    static ref DEFAULT_ERROR_HANDLER: Arc<ErrorHandler> = Arc::new(Box::new(Err));
+}
+
+/// Options for extracting a pxar archive.
+pub struct PxarExtractOptions {
+    /// The destination directory to which the archive should be extracted.
+    pub destination: PathBuf,
+
+    /// The flags that control what's extracted from the archive.
+    pub feature_flags: Flags,
+
+    /// The flags that control what kind of file system entries may or may not
+    /// be overwritten.
+    pub overwrite_flags: OverwriteFlags,
+
+    /// Whether to allow already existing directories or not.
+    pub allow_existing_dirs: bool,
+
+    /// The initial matching case for extracted entries.
+    pub default_match: bool,
+
+    /// A list of match entries. Each [`MatchEntry`] lets you control which files
+    /// should or shouldn't be extracted.
+    pub match_list: Vec<MatchEntry>,
+
+    /// A boxed closure that's used to handle errors throughout the entire
+    /// extraction process.
+    pub error_handler: Arc<ErrorHandler>,
+
+    /// An optional future that is called whenever a [`pxar::Entry`] matches
+    /// and is about to be extracted.
+    ///
+    /// **Note that this future is spawned as a task and not awaited until completion.**
+    pub callback: Option<Arc<Callback>>,
+}
+
+impl PxarExtractOptions {
+    pub fn builder<D>(destination: D) -> PxarExtractOptionsBuilder
+    where
+        D: Into<Cow<'static, Path>>,
+    {
+        PxarExtractOptionsBuilder::new(destination)
+    }
+}
+
+impl std::fmt::Debug for PxarExtractOptions {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let error_handler_msg = if Arc::ptr_eq(&self.error_handler, &DEFAULT_ERROR_HANDLER) {
+            &"default (no-op)"
+        } else {
+            &"custom"
+        };
+
+        f.debug_struct("PxarExtractOptions")
+            .field("destination", &self.destination)
+            .field("feature_flags", &self.feature_flags)
+            .field("overwrite_flags", &self.overwrite_flags)
+            .field("allow_existing_dirs", &self.allow_existing_dirs)
+            .field("default_match", &self.default_match)
+            .field("match_list", &self.match_list)
+            .field("error_handler", error_handler_msg)
+            .field("callback", &self.callback.is_some())
+            .finish()
+    }
+}
+
+/// This builder is used to configure the behaviour of the [`AsyncExtractor`][extr].
+///
+/// See [`PxarExtractOptions`] for a complete description of all extraction options.
+///
+/// [extr]: extractor::AsyncExtractor
+pub struct PxarExtractOptionsBuilder {
+    destination: Cow<'static, Path>,
+
+    feature_flags: Flags,
+    overwrite_flags: OverwriteFlags,
+    allow_existing_dirs: bool,
+
+    default_match: bool,
+    match_list: Vec<MatchEntry>,
+
+    error_handler: Option<Arc<ErrorHandler>>,
+    callback: Option<Arc<Callback>>,
+}
+
+impl std::fmt::Debug for PxarExtractOptionsBuilder {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("PxarExtractOptionsBuilder")
+            .field("destination", &self.destination)
+            .field("feature_flags", &self.feature_flags)
+            .field("overwrite_flags", &self.overwrite_flags)
+            .field("allow_existing_dirs", &self.allow_existing_dirs)
+            .field("default_match", &self.default_match)
+            .field("match_list", &self.match_list)
+            .field("error_handler", &self.error_handler.is_some())
+            .field("callback", &self.callback.is_some())
+            .finish()
+    }
+}
+
+impl PxarExtractOptionsBuilder {
+    pub fn new<D>(destination: D) -> Self
+    where
+        D: Into<Cow<'static, Path>>,
+    {
+        Self {
+            destination: destination.into(),
+            feature_flags: Flags::default(),
+            overwrite_flags: OverwriteFlags::empty(),
+            allow_existing_dirs: false,
+            default_match: true, // entries are considered as matching by default
+            match_list: Default::default(),
+            error_handler: None,
+            callback: None,
+        }
+    }
+
+    pub fn build(&mut self) -> PxarExtractOptions {
+        let error_handler = self
+            .error_handler
+            .as_ref()
+            .unwrap_or(&DEFAULT_ERROR_HANDLER);
+
+        PxarExtractOptions {
+            destination: self.destination.to_path_buf(),
+            feature_flags: self.feature_flags,
+            overwrite_flags: self.overwrite_flags,
+            allow_existing_dirs: self.allow_existing_dirs,
+            default_match: self.default_match,
+            match_list: self.match_list.clone(),
+            error_handler: Arc::clone(error_handler),
+            callback: self.callback.as_ref().map(Arc::clone),
+        }
+    }
+}
+
+impl PxarExtractOptionsBuilder {
+    pub fn feature_flags(&mut self, flags: Flags) -> &mut Self {
+        self.feature_flags = flags;
+        self
+    }
+
+    pub fn overwrite_flags(&mut self, flags: OverwriteFlags) -> &mut Self {
+        self.overwrite_flags = flags;
+        self
+    }
+
+    pub fn allow_existing_dirs(&mut self, value: bool) -> &mut Self {
+        self.allow_existing_dirs = value;
+        self
+    }
+
+    pub fn default_match(&mut self, value: bool) -> &mut Self {
+        self.default_match = value;
+        self
+    }
+
+    pub fn push_match_entry(&mut self, match_entry: MatchEntry) -> &mut Self {
+        self.match_list.push(match_entry);
+        self
+    }
+
+    pub fn push_match_list<T: ToOwned<Owned = Vec<MatchEntry>>>(
+        &mut self,
+        match_list: T,
+    ) -> &mut Self {
+        // clippy flags the `to_owned()` call as a `redundant_clone` when
+        // `T` is already an owned `Vec<MatchEntry>`
+        #[allow(clippy::redundant_clone)]
+        self.match_list.extend(match_list.to_owned());
+        self
+    }
+
+    pub fn error_handler(&mut self, error_handler: ErrorHandler) -> &mut Self {
+        let error_handler = Arc::new(error_handler);
+        self.error_handler_ref(error_handler)
+    }
+
+    pub fn error_handler_ref(&mut self, error_handler: Arc<ErrorHandler>) -> &mut Self {
+        self.error_handler.replace(error_handler);
+        self
+    }
+
+    pub fn callback(&mut self, callback: Callback) -> &mut Self {
+        let callback = Arc::new(callback);
+        self.callback_ref(callback)
+    }
+
+    pub fn callback_ref(&mut self, callback: Arc<Callback>) -> &mut Self {
+        self.callback.replace(callback);
+        self
+    }
+}
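
For illustration, the `Arc`-sharing approach behind `error_handler_ref()` / `build()` above (store handlers behind `Arc` so repeated `build()` calls share one allocation) can be sketched in a self-contained form. All names below are hypothetical stand-ins, with `i32` in place of `anyhow::Error`:

```rust
use std::sync::Arc;

/// Hypothetical stand-in for `ErrorHandler`: a boxed closure behind an `Arc`.
type Handler = Box<dyn Fn(i32) -> Result<(), i32> + Send + Sync + 'static>;

struct Options {
    handler: Arc<Handler>,
}

#[derive(Default)]
struct Builder {
    handler: Option<Arc<Handler>>,
}

impl Builder {
    /// Mirrors `error_handler()`: wrap an owned handler and share it.
    fn handler(&mut self, handler: Handler) -> &mut Self {
        self.handler_ref(Arc::new(handler))
    }

    /// Mirrors `error_handler_ref()`: store an already shared handler as-is.
    fn handler_ref(&mut self, handler: Arc<Handler>) -> &mut Self {
        self.handler.replace(handler);
        self
    }

    /// `build()` takes `&mut self`, so the builder stays usable and every
    /// produced `Options` shares the same handler allocation. The default
    /// handler (`Err`) simply propagates every error.
    fn build(&mut self) -> Options {
        let default: Arc<Handler> = Arc::new(Box::new(Err));
        Options {
            handler: self.handler.as_ref().map(Arc::clone).unwrap_or(default),
        }
    }
}

fn main() {
    let mut builder = Builder::default();
    builder.handler(Box::new(|_| Ok(())));

    let a = builder.build();
    let b = builder.build();

    // Both option sets share a single handler allocation.
    assert!(Arc::ptr_eq(&a.handler, &b.handler));
    assert!((a.handler)(7).is_ok());
}
```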
diff --git a/pbs-client/src/pxar/aio/extract/raw.rs b/pbs-client/src/pxar/aio/extract/raw.rs
new file mode 100644
index 00000000..d6e20981
--- /dev/null
+++ b/pbs-client/src/pxar/aio/extract/raw.rs
@@ -0,0 +1,503 @@
+use std::os::fd::{AsRawFd, FromRawFd, RawFd};
+use std::path::Path;
+use std::sync::Arc;
+
+use anyhow::{bail, Context, Error, Result};
+use nix::sys::stat::SFlag;
+use nix::{dir::Dir, fcntl::OFlag, sys::stat::Mode};
+
+use pxar::format::Device;
+use tokio::io::AsyncRead;
+use tokio::sync::RwLock;
+
+use proxmox_sys::fs::CreateOptions;
+
+use super::OverwriteFlags;
+use super::PxarExtractContext;
+use super::PxarExtractOptions;
+
+use crate::pxar::aio::dir_stack::{PxarDir, PxarDirStack};
+use crate::pxar::aio::metadata;
+use crate::pxar::aio::worker::SyncTaskWorker;
+
+/// The [`RawAsyncExtractor`] is used to extract individual [`pxar::Entries`][pxar::Entry].
+/// Its purpose is to provide a common underlying mechanism with which an archive is
+/// extracted.
+///
+/// The tracking of directories is based on a stack. [Entering][enter] and [leaving][leave]
+/// directories simply correspond to [`push()`][push] and [`pop()`][pop]. All remaining
+/// operations do not modify the raw extractor's stack otherwise.
+///
+/// In order to make extraction fully concurrent, an internal worker thread is used
+/// to which synchronous tasks are sent. This worker thread has to be started manually.
+///
+/// No state is tracked otherwise, which means that it's your own responsibility
+/// to [extract the archive's root][extr_root], as well as [start][w_start] or [stop][w_stop]
+/// the [`RawAsyncExtractor`]'s internal worker thread.
+///
+/// [enter]: Self::enter_directory()
+/// [leave]: Self::leave_directory()
+/// [push]: std::path::PathBuf::push()
+/// [pop]: std::path::PathBuf::pop()
+/// [extr_root]: Self::extract_root_dir
+/// [w_start]: Self::start_worker_thread
+/// [w_stop]: Self::join_worker_thread
+pub struct RawAsyncExtractor {
+    /// A stack of directories used to aid in traversing and extracting the pxar archive.
+    dir_stack: Arc<RwLock<PxarDirStack>>,
+
+    /// Worker thread for synchronous tasks.
+    worker: Option<SyncTaskWorker>,
+}
+
+impl Default for RawAsyncExtractor {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl RawAsyncExtractor {
+    #[inline]
+    pub fn new() -> Self {
+        let dir_stack = Arc::new(RwLock::new(PxarDirStack::new()));
+
+        Self {
+            dir_stack,
+            worker: None,
+        }
+    }
+
+    /// Extract the root directory of a pxar archive to the given [`destination`][dest].
+    ///
+    /// [dest]: PxarExtractOptions::destination
+    pub async fn extract_root_dir(
+        &mut self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+    ) -> Result<(), Error> {
+        let path = options.destination.clone();
+
+        let root_pxar_dir =
+            tokio::task::spawn_blocking(move || Self::do_extract_root_dir(path, entry))
+                .await
+                .context("failed to execute task to create extraction directory")??;
+
+        let dir_name = options
+            .destination
+            .file_name()
+            .context("extraction destination directory has no name")?;
+
+        self.dir_stack.write().await.push(dir_name, root_pxar_dir)
+    }
+
+    #[inline]
+    fn do_extract_root_dir<P: AsRef<Path>>(
+        path: P,
+        entry: Arc<pxar::Entry>,
+    ) -> Result<PxarDir, Error> {
+        const CREATE_OPTS: CreateOptions =
+            CreateOptions::new().perm(Mode::from_bits_truncate(0o700));
+
+        if !entry.is_dir() {
+            bail!("pxar archive does not start with a directory entry!");
+        }
+
+        let path = path.as_ref();
+
+        proxmox_sys::fs::create_path(path, None, Some(CREATE_OPTS))
+            .with_context(|| format!("error creating extraction directory {path:?}"))?;
+
+        let dir = Dir::open(path, OFlag::O_DIRECTORY | OFlag::O_CLOEXEC, Mode::empty())
+            .with_context(|| format!("unable to open extraction directory {path:?}"))?;
+
+        Ok(PxarDir::with_dir(entry, dir))
+    }
+
+    #[inline]
+    pub fn start_worker_thread(&mut self, options: &PxarExtractOptions) -> Result<(), Error> {
+        // When encountering lots of symlinks or hardlinks, the queue may become quite large
+        const WORKER_QUEUE_SIZE: usize = 5000;
+
+        if self.worker.is_some() {
+            bail!("worker thread already started")
+        }
+
+        let error_handler = Arc::clone(&options.error_handler);
+
+        self.worker.replace(SyncTaskWorker::with_error_handler(
+            WORKER_QUEUE_SIZE,
+            error_handler,
+        ));
+
+        Ok(())
+    }
+
+    #[inline]
+    pub fn join_worker_thread(&mut self) -> Result<(), Error> {
+        if let Some(mut worker) = self.worker.take() {
+            worker.join()
+        } else {
+            bail!("worker thread already joined")
+        }
+    }
+
+    /// Convenience wrapper to send a task to the [`SyncTaskWorker`].
+    #[inline]
+    async fn send_to_worker<F>(&self, task: F) -> Result<(), Error>
+    where
+        F: FnOnce() -> Result<(), Error> + Send + 'static,
+    {
+        if let Some(worker) = self.worker.as_ref() {
+            worker.send(task).await
+        } else {
+            bail!("failed to send task to worker - worker already finished")
+        }
+    }
+}
+
+// Extraction operations for each kind of pxar entry.
+impl RawAsyncExtractor {
+    #[inline]
+    fn parent_fd(dir_stack: &RwLock<PxarDirStack>, allow_existing: bool) -> Result<RawFd, Error> {
+        dir_stack
+            .blocking_write()
+            .last_dir_fd(allow_existing)
+            .map(|fd| fd.as_raw_fd())
+    }
+
+    pub async fn enter_directory(
+        &mut self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+        do_create: bool,
+    ) -> Result<(), Error> {
+        let dir_stack = Arc::clone(&self.dir_stack);
+        let allow_existing = options.allow_existing_dirs;
+
+        let task = move || {
+            {
+                let pxar_dir = PxarDir::new(Arc::clone(&entry));
+                let mut locked_dir_stack = dir_stack.blocking_write();
+
+                locked_dir_stack.push(entry.file_name(), pxar_dir)?;
+
+                if do_create {
+                    locked_dir_stack.last_dir_fd(allow_existing).map(drop)?;
+                }
+
+                Ok::<(), Error>(())
+            }
+            .context(PxarExtractContext::EnterDirectory)
+        };
+
+        self.send_to_worker(task).await
+    }
+
+    pub async fn leave_directory(&mut self, options: &PxarExtractOptions) -> Result<(), Error> {
+        let dir_stack = Arc::clone(&self.dir_stack);
+        let error_handler = Arc::clone(&options.error_handler);
+        let flags = options.feature_flags;
+
+        let task = move || {
+            {
+                let pxar_dir = dir_stack
+                    .blocking_write()
+                    .pop()
+                    .context("unexpected end of directory stack")?;
+
+                let fd = pxar_dir.try_as_borrowed_fd().map(|fd| fd.as_raw_fd());
+                let (entry, dir) = pxar_dir.into_inner();
+
+                if let Some(fd) = fd {
+                    metadata::apply(flags, &entry, fd, &error_handler)?;
+                }
+
+                // dropping `dir` closes it - we want that to happen after metadata was applied
+                drop(dir);
+                Ok::<(), Error>(())
+            }
+            .context(PxarExtractContext::LeaveDirectory)
+        };
+
+        self.send_to_worker(task).await
+    }
+
+    pub async fn extract_symlink(
+        &self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+    ) -> Result<(), Error> {
+        use pxar::EntryKind::Symlink;
+
+        let dir_stack = Arc::clone(&self.dir_stack);
+        let error_handler = Arc::clone(&options.error_handler);
+
+        let allow_existing = options.allow_existing_dirs;
+        let feature_flags = options.feature_flags;
+
+        let do_overwrite = options.overwrite_flags.contains(OverwriteFlags::SYMLINK);
+
+        let task = move || {
+            {
+                let link = match entry.kind() {
+                    Symlink(link) => link,
+                    _ => bail!("received invalid entry kind while trying to extract symlink"),
+                };
+
+                let parent_fd = Self::parent_fd(&dir_stack, allow_existing)?;
+
+                let result =
+                    nix::unistd::symlinkat(link.as_os_str(), Some(parent_fd), entry.file_name());
+
+                match result {
+                    Ok(()) => {}
+                    Err(nix::errno::Errno::EEXIST) if do_overwrite => {
+                        // Never unlink directories
+                        let flag = nix::unistd::UnlinkatFlags::NoRemoveDir;
+                        nix::unistd::unlinkat(Some(parent_fd), entry.file_name(), flag)?;
+
+                        nix::unistd::symlinkat(
+                            link.as_os_str(),
+                            Some(parent_fd),
+                            entry.file_name(),
+                        )?;
+                    }
+                    Err(error) => return Err(error.into()),
+                }
+
+                metadata::apply_at(feature_flags, &entry, parent_fd, &error_handler)
+            }
+            .context(PxarExtractContext::ExtractSymlink)
+        };
+
+        self.send_to_worker(task)
+            .await
+            .context(PxarExtractContext::ExtractSymlink)
+    }
+
+    pub async fn extract_hardlink(
+        &self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+    ) -> Result<(), Error> {
+        use pxar::EntryKind::Hardlink;
+
+        let dir_stack = Arc::clone(&self.dir_stack);
+
+        let allow_existing = options.allow_existing_dirs;
+        let do_overwrite = options.overwrite_flags.contains(OverwriteFlags::HARDLINK);
+
+        let task = move || {
+            {
+                let link = match entry.kind() {
+                    Hardlink(link) => {
+                        if !AsRef::<Path>::as_ref(link.as_os_str()).is_relative() {
+                            bail!("received absolute path while trying to extract hardlink")
+                        } else {
+                            link.as_os_str().to_owned()
+                        }
+                    }
+                    _ => bail!("received invalid entry kind while trying to extract hardlink"),
+                };
+
+                let root_fd = dir_stack.blocking_read().root_dir_fd()?.as_raw_fd();
+                let parent_fd = Self::parent_fd(&dir_stack, allow_existing)?;
+
+                let make_link = || {
+                    nix::unistd::linkat(
+                        Some(root_fd),
+                        link.as_os_str(),
+                        Some(parent_fd),
+                        entry.file_name(),
+                        nix::unistd::LinkatFlags::NoSymlinkFollow,
+                    )
+                };
+
+                match make_link() {
+                    Err(nix::Error::EEXIST) if do_overwrite => {
+                        let flag = nix::unistd::UnlinkatFlags::NoRemoveDir;
+                        nix::unistd::unlinkat(Some(parent_fd), entry.file_name(), flag)?;
+                        make_link()
+                    }
+                    result => result,
+                }
+            }
+            .context(PxarExtractContext::ExtractHardlink)
+        };
+
+        self.send_to_worker(task)
+            .await
+            .context(PxarExtractContext::ExtractHardlink)
+    }
+
+    pub async fn extract_device(
+        &self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+        device: Device,
+    ) -> Result<(), Error> {
+        self.extract_special(entry, options, device.to_dev_t())
+            .await
+            .context(PxarExtractContext::ExtractDevice)
+    }
+
+    pub async fn extract_fifo(
+        &self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+        device: nix::sys::stat::dev_t,
+    ) -> Result<(), Error> {
+        self.extract_special(entry, options, device)
+            .await
+            .context(PxarExtractContext::ExtractFifo)
+    }
+
+    pub async fn extract_socket(
+        &self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+        device: nix::sys::stat::dev_t,
+    ) -> Result<(), Error> {
+        self.extract_special(entry, options, device)
+            .await
+            .context(PxarExtractContext::ExtractSocket)
+    }
+
+    #[inline(always)]
+    async fn extract_special(
+        &self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+        device: nix::sys::stat::dev_t,
+    ) -> Result<(), Error> {
+        let dir_stack = Arc::clone(&self.dir_stack);
+        let allow_existing = options.allow_existing_dirs;
+
+        let feature_flags = options.feature_flags;
+        let error_handler = Arc::clone(&options.error_handler);
+
+        let task = move || {
+            let mode = metadata::perms_from_metadata(entry.metadata())?;
+
+            let parent_fd = Self::parent_fd(&dir_stack, allow_existing)?;
+
+            nix::sys::stat::mknodat(parent_fd, entry.file_name(), SFlag::empty(), mode, device)
+                .context("failed to create special device")?;
+
+            metadata::apply_at(feature_flags, &entry, parent_fd, &error_handler)
+                .context("failed to apply metadata to special device")
+        };
+
+        self.send_to_worker(task).await
+    }
+
+    pub async fn extract_file<C>(
+        &mut self,
+        entry: Arc<pxar::Entry>,
+        options: &PxarExtractOptions,
+        contents: C,
+        size: u64,
+    ) -> Result<(), Error>
+    where
+        C: AsyncRead + Unpin,
+    {
+        let (sender, receiver) = tokio::sync::oneshot::channel();
+
+        let file_creation_task = {
+            let mut oflags = OFlag::O_CREAT | OFlag::O_WRONLY | OFlag::O_CLOEXEC;
+            if options.overwrite_flags.contains(OverwriteFlags::FILE) {
+                oflags |= OFlag::O_TRUNC;
+            } else {
+                oflags |= OFlag::O_EXCL;
+            }
+
+            let entry = Arc::clone(&entry);
+            let dir_stack = Arc::clone(&self.dir_stack);
+            let error_handler = Arc::clone(&options.error_handler);
+
+            let allow_existing = options.allow_existing_dirs;
+            let feature_flags = options.feature_flags;
+
+            move || {
+                {
+                    let parent_fd = Self::parent_fd(&dir_stack, allow_existing)?;
+
+                    let raw_fd = nix::fcntl::openat(
+                        parent_fd,
+                        entry.file_name(),
+                        oflags,
+                        Mode::from_bits(0o700).unwrap(),
+                    )
+                    .with_context(|| "failed to create file".to_string())?;
+
+                    metadata::apply_initial_flags(feature_flags, &entry, raw_fd, &error_handler)?;
+
+                    let file = unsafe { tokio::fs::File::from_raw_fd(raw_fd) };
+
+                    match sender.send(file) {
+                        Ok(()) => Ok::<(), Error>(()),
+                        Err(_) => bail!("failed to send file descriptor to origin future"),
+                    }
+                }
+                .context(PxarExtractContext::ExtractFile)
+            }
+        };
+
+        // The task is queued for the worker ...
+        self.send_to_worker(file_creation_task)
+            .await
+            .context(PxarExtractContext::ExtractFile)?;
+
+        // ... and its response is then awaited, so other things
+        // may be done in the meantime
+        let mut file = receiver.await.context(
+            "failed to receive raw file descriptor from worker thread - did the thread die?",
+        )?;
+
+        let mut contents = tokio::io::BufReader::new(contents);
+        let copy_result = proxmox_io::sparse_copy_async(&mut contents, &mut file)
+            .await
+            .context(PxarExtractContext::ExtractFile)?;
+
+        if size != copy_result.written {
+            bail!(
+                "extracted {} bytes of a file of {} bytes",
+                copy_result.written,
+                size
+            );
+        }
+
+        let task = {
+            let raw_fd = file.as_raw_fd();
+            let feature_flags = options.feature_flags;
+            let error_handler = Arc::clone(&options.error_handler);
+
+            move || {
+                {
+                    if copy_result.seeked_last {
+                        while match nix::unistd::ftruncate(raw_fd, size as i64) {
+                            Ok(_) => false,
+                            Err(errno) if errno == nix::errno::Errno::EINTR => true,
+                            Err(err) => return Err(err).context("error setting file size"),
+                        } {}
+                    }
+
+                    metadata::apply(feature_flags, &entry, raw_fd, &error_handler)
+                        .with_context(|| "failed to apply metadata to file".to_string())?;
+
+                    // 'file' closes itself when dropped, so we move and drop it here explicitly
+                    // after the remaining work on it was done
+                    drop(file);
+
+                    Ok::<(), Error>(())
+                }
+                .context(PxarExtractContext::ExtractFile)
+            }
+        };
+
+        self.send_to_worker(task)
+            .await
+            .context(PxarExtractContext::ExtractFile)
+    }
+}
diff --git a/pbs-client/src/pxar/aio/metadata.rs b/pbs-client/src/pxar/aio/metadata.rs
new file mode 100644
index 00000000..06927498
--- /dev/null
+++ b/pbs-client/src/pxar/aio/metadata.rs
@@ -0,0 +1,412 @@
+use std::ffi::{CStr, CString};
+use std::os::fd::{AsRawFd, RawFd};
+use std::sync::Arc;
+
+use anyhow::{bail, format_err, Context, Error};
+use nix::errno::Errno;
+use nix::fcntl::OFlag;
+use nix::sys::stat::{Mode, UtimensatFlags};
+
+use proxmox_sys::c_result;
+use proxmox_sys::error::SysError;
+use proxmox_sys::fs::{self, acl, xattr};
+use pxar::Metadata;
+
+use super::ErrorHandler;
+use crate::pxar::Flags;
+
+// NOTE: The functions here are equivalent to crate::pxar::metadata and have
+// only been adapted to take a pxar::Entry directly.
+
+//
+// utility functions
+//
+
+fn allow_notsupp<E: SysError>(err: E) -> Result<(), E> {
+    if err.is_errno(Errno::EOPNOTSUPP) {
+        Ok(())
+    } else {
+        Err(err)
+    }
+}
+
+fn allow_notsupp_remember<E: SysError>(err: E, not_supp: &mut bool) -> Result<(), E> {
+    if err.is_errno(Errno::EOPNOTSUPP) {
+        *not_supp = true;
+        Ok(())
+    } else {
+        Err(err)
+    }
+}
+
+fn timestamp_to_update_timespec(mtime: &pxar::format::StatxTimestamp) -> [libc::timespec; 2] {
+    // restore mtime
+    const UTIME_OMIT: i64 = (1 << 30) - 2;
+
+    [
+        libc::timespec {
+            tv_sec: 0,
+            tv_nsec: UTIME_OMIT,
+        },
+        libc::timespec {
+            tv_sec: mtime.secs,
+            tv_nsec: mtime.nanos as _,
+        },
+    ]
+}
+
+/// Get the file permissions as `nix::Mode`
+pub fn perms_from_metadata(meta: &Metadata) -> Result<Mode, Error> {
+    let mode = meta.stat.get_permission_bits();
+
+    u32::try_from(mode)
+        .context("couldn't narrow permission bits")
+        .and_then(|mode| {
+            Mode::from_bits(mode)
+                .with_context(|| format!("mode contains illegal bits: 0x{:x} (0o{:o})", mode, mode))
+        })
+}
+
+//
+// metadata application:
+//
+
+pub fn apply_initial_flags(
+    feature_flags: Flags,
+    entry: &Arc<pxar::Entry>,
+    fd: RawFd,
+    error_handler: &Arc<ErrorHandler>,
+) -> Result<(), Error> {
+    let entry_flags = Flags::from_bits_truncate(entry.metadata().stat.flags);
+
+    apply_chattr(
+        fd,
+        entry_flags.to_initial_chattr(),
+        feature_flags.to_initial_chattr(),
+    )
+    .or_else(&**error_handler)
+}
+
+pub fn apply_at(
+    feature_flags: Flags,
+    entry: &Arc<pxar::Entry>,
+    parent: RawFd,
+    error_handler: &Arc<ErrorHandler>,
+) -> Result<(), Error> {
+    let fd = proxmox_sys::fd::openat(
+        &parent,
+        entry.file_name(),
+        OFlag::O_PATH | OFlag::O_CLOEXEC | OFlag::O_NOFOLLOW,
+        Mode::empty(),
+    )
+    .context("failed to open file for applying metadata")?;
+
+    apply(feature_flags, entry, fd.as_raw_fd(), error_handler)
+}
+
+pub fn apply(
+    feature_flags: Flags,
+    entry: &Arc<pxar::Entry>,
+    fd: RawFd,
+    error_handler: &Arc<ErrorHandler>,
+) -> Result<(), Error> {
+    let metadata = entry.metadata();
+    let c_proc_path = CString::new(format!("/proc/self/fd/{}", fd)).unwrap();
+
+    apply_ownership(feature_flags, &c_proc_path, metadata).or_else(&**error_handler)?;
+
+    let mut skip_xattrs = false;
+    apply_xattrs(feature_flags, &c_proc_path, metadata, &mut skip_xattrs)
+        .or_else(&**error_handler)?;
+
+    add_fcaps(feature_flags, &c_proc_path, metadata, &mut skip_xattrs).or_else(&**error_handler)?;
+
+    apply_acls(feature_flags, &c_proc_path, metadata).or_else(&**error_handler)?;
+
+    apply_quota_project_id(feature_flags, fd, metadata).or_else(&**error_handler)?;
+
+    // Finally mode and time. We may lose access with mode, but changing the
+    // mode also affects times.
+    if !metadata.is_symlink() && feature_flags.contains(Flags::WITH_PERMISSIONS) {
+        let mode = perms_from_metadata(metadata)?;
+        nix::sys::stat::fchmod(fd, mode)
+            .or_else(allow_notsupp)
+            .context("failed to change file mode")
+            .or_else(&**error_handler)?;
+    }
+
+    let [atime, mtime] = timestamp_to_update_timespec(&metadata.stat.mtime);
+    let res = nix::sys::stat::utimensat(
+        None,
+        c_proc_path.as_ref(),
+        &atime.into(),
+        &mtime.into(),
+        UtimensatFlags::FollowSymlink,
+    );
+
+    match res {
+        Ok(_) => (),
+        Err(ref err) if err.is_errno(Errno::EOPNOTSUPP) => (),
+        Err(err) => {
+            let err = format_err!(err).context("failed to restore mtime attribute");
+            error_handler(err)?;
+        }
+    }
+
+    if metadata.stat.flags != 0 {
+        apply_flags(feature_flags, fd, metadata.stat.flags).or_else(&**error_handler)?;
+    }
+
+    Ok(())
+}
+
+fn apply_ownership(
+    feature_flags: Flags,
+    c_proc_path: &CStr,
+    metadata: &Metadata,
+) -> Result<(), Error> {
+    if !feature_flags.contains(Flags::WITH_OWNER) {
+        return Ok(());
+    }
+
+    // UID and GID first, as this fails if we lose access anyway.
+    nix::unistd::chown(
+        c_proc_path,
+        Some(metadata.stat.uid.into()),
+        Some(metadata.stat.gid.into()),
+    )
+    .or_else(allow_notsupp)
+    .context("failed to apply ownership")
+}
+
+fn add_fcaps(
+    feature_flags: Flags,
+    c_proc_path: &CStr,
+    metadata: &Metadata,
+    skip_xattrs: &mut bool,
+) -> Result<(), Error> {
+    if *skip_xattrs || !feature_flags.contains(Flags::WITH_FCAPS) {
+        return Ok(());
+    }
+    let fcaps = match metadata.fcaps.as_ref() {
+        Some(fcaps) => fcaps,
+        None => return Ok(()),
+    };
+
+    c_result!(unsafe {
+        libc::setxattr(
+            c_proc_path.as_ptr(),
+            xattr::xattr_name_fcaps().as_ptr(),
+            fcaps.data.as_ptr() as *const libc::c_void,
+            fcaps.data.len(),
+            0,
+        )
+    })
+    .map(drop)
+    .or_else(|err| allow_notsupp_remember(err, skip_xattrs))
+    .context("failed to apply file capabilities")
+}
+
+fn apply_xattrs(
+    feature_flags: Flags,
+    c_proc_path: &CStr,
+    metadata: &Metadata,
+    skip_xattrs: &mut bool,
+) -> Result<(), Error> {
+    if *skip_xattrs || !feature_flags.contains(Flags::WITH_XATTRS) {
+        return Ok(());
+    }
+
+    for xattr in &metadata.xattrs {
+        if *skip_xattrs {
+            return Ok(());
+        }
+
+        if !xattr::is_valid_xattr_name(xattr.name()) {
+            log::info!("skipping invalid xattr named {:?}", xattr.name());
+            continue;
+        }
+
+        c_result!(unsafe {
+            libc::setxattr(
+                c_proc_path.as_ptr(),
+                xattr.name().as_ptr() as *const libc::c_char,
+                xattr.value().as_ptr() as *const libc::c_void,
+                xattr.value().len(),
+                0,
+            )
+        })
+        .map(drop)
+        .or_else(|err| allow_notsupp_remember(err, &mut *skip_xattrs))
+        .context("failed to apply extended attributes")?;
+    }
+
+    Ok(())
+}
+
+fn apply_acls(feature_flags: Flags, c_proc_path: &CStr, metadata: &Metadata) -> Result<(), Error> {
+    if !feature_flags.contains(Flags::WITH_ACL) || metadata.acl.is_empty() {
+        return Ok(());
+    }
+
+    let mut acl = acl::ACL::init(5)?;
+
+    // acl type access:
+    acl.add_entry_full(
+        acl::ACL_USER_OBJ,
+        None,
+        acl::mode_user_to_acl_permissions(metadata.stat.mode),
+    )?;
+
+    acl.add_entry_full(
+        acl::ACL_OTHER,
+        None,
+        acl::mode_other_to_acl_permissions(metadata.stat.mode),
+    )?;
+
+    match metadata.acl.group_obj.as_ref() {
+        Some(group_obj) => {
+            acl.add_entry_full(
+                acl::ACL_MASK,
+                None,
+                acl::mode_group_to_acl_permissions(metadata.stat.mode),
+            )?;
+            acl.add_entry_full(acl::ACL_GROUP_OBJ, None, group_obj.permissions.0)?;
+        }
+        None => {
+            let mode = acl::mode_group_to_acl_permissions(metadata.stat.mode);
+
+            acl.add_entry_full(acl::ACL_GROUP_OBJ, None, mode)?;
+
+            if !metadata.acl.users.is_empty() || !metadata.acl.groups.is_empty() {
+                log::warn!("Warning: Missing GROUP_OBJ entry in ACL, resetting to value of MASK");
+                acl.add_entry_full(acl::ACL_MASK, None, mode)?;
+            }
+        }
+    }
+
+    for user in &metadata.acl.users {
+        acl.add_entry_full(acl::ACL_USER, Some(user.uid), user.permissions.0)?;
+    }
+
+    for group in &metadata.acl.groups {
+        acl.add_entry_full(acl::ACL_GROUP, Some(group.gid), group.permissions.0)?;
+    }
+
+    if !acl.is_valid() {
+        bail!("Error while restoring ACL - ACL invalid");
+    }
+
+    acl.set_file(c_proc_path, acl::ACL_TYPE_ACCESS)?;
+    drop(acl);
+
+    // acl type default:
+    if let Some(default) = metadata.acl.default.as_ref() {
+        let mut acl = acl::ACL::init(5)?;
+
+        acl.add_entry_full(acl::ACL_USER_OBJ, None, default.user_obj_permissions.0)?;
+
+        acl.add_entry_full(acl::ACL_GROUP_OBJ, None, default.group_obj_permissions.0)?;
+
+        acl.add_entry_full(acl::ACL_OTHER, None, default.other_permissions.0)?;
+
+        if default.mask_permissions != pxar::format::acl::Permissions::NO_MASK {
+            acl.add_entry_full(acl::ACL_MASK, None, default.mask_permissions.0)?;
+        }
+
+        for user in &metadata.acl.default_users {
+            acl.add_entry_full(acl::ACL_USER, Some(user.uid), user.permissions.0)?;
+        }
+
+        for group in &metadata.acl.default_groups {
+            acl.add_entry_full(acl::ACL_GROUP, Some(group.gid), group.permissions.0)?;
+        }
+
+        if !acl.is_valid() {
+            bail!("Error while restoring ACL - ACL invalid");
+        }
+
+        acl.set_file(c_proc_path, acl::ACL_TYPE_DEFAULT)?;
+    }
+
+    Ok(())
+}
+
+fn apply_quota_project_id(
+    feature_flags: Flags,
+    fd: RawFd,
+    metadata: &Metadata,
+) -> Result<(), Error> {
+    if !feature_flags.contains(Flags::WITH_QUOTA_PROJID) {
+        return Ok(());
+    }
+
+    let projid = match metadata.quota_project_id {
+        Some(projid) => projid,
+        None => return Ok(()),
+    };
+
+    let mut fsxattr = fs::FSXAttr::default();
+    unsafe {
+        fs::fs_ioc_fsgetxattr(fd, &mut fsxattr)
+            .context("error while getting fsxattr to restore quota project id")?;
+
+        fsxattr.fsx_projid = projid.projid as u32;
+
+        fs::fs_ioc_fssetxattr(fd, &fsxattr)
+            .context("error while setting fsxattr to restore quota project id")?;
+    }
+
+    Ok(())
+}
+
+fn errno_is_unsupported(errno: Errno) -> bool {
+    matches!(
+        errno,
+        Errno::ENOTTY | Errno::ENOSYS | Errno::EBADF | Errno::EOPNOTSUPP | Errno::EINVAL
+    )
+}
+
+fn apply_chattr(fd: RawFd, chattr: libc::c_long, mask: libc::c_long) -> Result<(), Error> {
+    if chattr == 0 {
+        return Ok(());
+    }
+
+    let mut fattr: libc::c_long = 0;
+    match unsafe { fs::read_attr_fd(fd, &mut fattr) } {
+        Ok(_) => (),
+        Err(errno) if errno_is_unsupported(errno) => {
+            return Ok(());
+        }
+        Err(err) => return Err(err).context("failed to read file attributes"),
+    }
+
+    let attr = (chattr & mask) | (fattr & !mask);
+
+    if attr == fattr {
+        return Ok(());
+    }
+
+    match unsafe { fs::write_attr_fd(fd, &attr) } {
+        Ok(_) => Ok(()),
+        Err(errno) if errno_is_unsupported(errno) => Ok(()),
+        Err(err) => Err(err).context("failed to set file attributes"),
+    }
+}
+
+fn apply_flags(feature_flags: Flags, fd: RawFd, entry_flags: u64) -> Result<(), Error> {
+    let entry_flags = Flags::from_bits_truncate(entry_flags);
+
+    apply_chattr(fd, entry_flags.to_chattr(), feature_flags.to_chattr())?;
+
+    let fatattr = (feature_flags & entry_flags).to_fat_attr();
+    if fatattr != 0 {
+        match unsafe { fs::write_fat_attr_fd(fd, &fatattr) } {
+            Ok(_) => (),
+            Err(errno) if errno_is_unsupported(errno) => (),
+            Err(err) => return Err(err).context("failed to set file FAT attributes"),
+        }
+    }
+
+    Ok(())
+}
diff --git a/pbs-client/src/pxar/aio/mod.rs b/pbs-client/src/pxar/aio/mod.rs
new file mode 100644
index 00000000..dfa8d84a
--- /dev/null
+++ b/pbs-client/src/pxar/aio/mod.rs
@@ -0,0 +1,11 @@
+mod dir_stack;
+
+pub(crate) mod extract;
+pub use extract::{
+    AsyncExtractor, Callback, ErrorHandler, PxarExtractOptions, PxarExtractOptionsBuilder,
+    RawAsyncExtractor,
+};
+
+mod metadata;
+
+mod worker;
diff --git a/pbs-client/src/pxar/aio/worker.rs b/pbs-client/src/pxar/aio/worker.rs
new file mode 100644
index 00000000..791e3129
--- /dev/null
+++ b/pbs-client/src/pxar/aio/worker.rs
@@ -0,0 +1,167 @@
+#![allow(unused)]
+use std::sync::{Arc, Mutex};
+use std::thread::{self, JoinHandle};
+
+use anyhow::{bail, Error};
+use tokio::sync::mpsc;
+
+use super::ErrorHandler;
+
+type WorkerTask = Box<dyn FnOnce() -> Result<(), Error> + Send>;
+
+/// [`SyncTaskWorker`] is a wrapper around a [`Thread`][std::thread::Thread]
+/// that runs synchronous tasks taken from an internal queue.
+///
+/// A task is anything that implements [`FnOnce() -> Result<(), Error>`]
+/// and can be sent asynchronously to the worker via [`send()`].
+///
+/// As soon as a task returns an [`Error`], the worker will drop its channel,
+/// discarding any remaining tasks. Errors therefore either ought to be handled
+/// inside tasks themselves, or with an [explicitly provided][eh] [`ErrorHandler`].
+///
+/// [eh]: Self::with_error_handler
+pub(crate) struct SyncTaskWorker {
+    sender: Option<mpsc::Sender<WorkerTask>>,
+    join_handle: Option<Box<JoinHandle<()>>>,
+    task_error: Arc<Mutex<Option<Error>>>,
+}
+
+impl SyncTaskWorker {
+    pub fn new(queue_size: usize) -> Self {
+        Self::new_inner(queue_size, Arc::new(Box::new(Err)))
+    }
+
+    pub fn with_error_handler(queue_size: usize, error_handler: Arc<ErrorHandler>) -> Self {
+        Self::new_inner(queue_size, error_handler)
+    }
+
+    fn new_inner(queue_size: usize, error_handler: Arc<ErrorHandler>) -> Self {
+        let (sender, receiver) = mpsc::channel(queue_size);
+
+        let task_error = Arc::new(Mutex::new(None));
+
+        let worker_loop = {
+            let mut receiver = receiver;
+            let last_error = Arc::clone(&task_error);
+            let error_handler = Arc::clone(&error_handler);
+
+            move || loop {
+                let function: WorkerTask = if let Some(function) = receiver.blocking_recv() {
+                    function
+                } else {
+                    return;
+                };
+
+                let result = (function)().or_else(|error| error_handler(error));
+
+                match result {
+                    Ok(()) => (),
+                    Err(error) => {
+                        let mut guard = last_error.lock().unwrap();
+                        if guard.is_none() {
+                            let error = error
+                                .context("extractor worker thread encountered unexpected error");
+                            *guard = Some(error);
+                        }
+
+                        drop(receiver);
+                        return;
+                    }
+                }
+            }
+        };
+
+        let join_handle = thread::spawn(worker_loop);
+
+        Self {
+            sender: Some(sender),
+            join_handle: Some(Box::new(join_handle)),
+            task_error,
+        }
+    }
+
+    #[inline]
+    fn check_for_error(&self) -> Result<(), Error> {
+        let mut guard = self.task_error.lock().unwrap();
+        if let Some(error) = guard.take() {
+            Err(error)
+        } else {
+            Ok(())
+        }
+    }
+
+    /// Send a task to the worker to execute.
+    ///
+    /// In case any of the worker thread's previous tasks failed, its error
+    /// will be returned. Subsequent calls will not return the same error.
+    ///
+    /// Otherwise, this will fail if the worker's channel is already closed or
+    /// if its internal sender has already been dropped.
+    #[inline]
+    pub async fn send<F>(&self, task: F) -> Result<(), Error>
+    where
+        F: FnOnce() -> Result<(), Error> + Send + 'static,
+    {
+        self.check_for_error()?;
+
+        if let Some(ref sender) = self.sender {
+            match sender.send(Box::new(task)).await {
+                Ok(()) => Ok(()),
+                Err(_) => {
+                    bail!("failed to send task input to worker - channel closed");
+                }
+            }
+        } else {
+            bail!("failed to send task input to worker - sender already dropped");
+        }
+    }
+
+    /// Get the current available capacity of the worker thread's queue.
+    #[inline]
+    pub fn queue_capacity(&self) -> Option<usize> {
+        self.sender.as_ref().map(|s| s.capacity())
+    }
+
+    /// Get the maximum capacity of the worker thread's queue.
+    #[inline]
+    pub fn max_queue_capacity(&self) -> Option<usize> {
+        self.sender.as_ref().map(|s| s.max_capacity())
+    }
+
+    /// Run all outstanding tasks until completion or until a task returns an error.
+    ///
+    /// The first encountered error will be returned, if it exists.
+    ///
+    /// Subsequent calls to [`join()`] will not have any effect.
+    #[inline]
+    pub fn join(&mut self) -> Result<(), Error> {
+        drop(self.sender.take());
+
+        let join_res = self
+            .join_handle
+            .take()
+            .map(|handle| handle.join())
+            .unwrap_or(Ok(()));
+
+        // Can't use `anyhow::Context` on `JoinError`, grr
+        match join_res {
+            Ok(()) => (),
+            Err(_) => bail!("extractor worker thread panicked"),
+        };
+
+        self.check_for_error()?;
+
+        Ok(())
+    }
+}
+
+// [`Drop`] is implemented to ensure the thread is joined appropriately when
+// the worker is dropped.
+//
+// Note however, that this will consume `last_error` (if any), so it's always
+// better to check for errors manually.
+impl Drop for SyncTaskWorker {
+    fn drop(&mut self) {
+        let _ = self.join();
+    }
+}
diff --git a/pbs-client/src/pxar/mod.rs b/pbs-client/src/pxar/mod.rs
index 14674b9b..9060bc0e 100644
--- a/pbs-client/src/pxar/mod.rs
+++ b/pbs-client/src/pxar/mod.rs
@@ -47,6 +47,7 @@
 //! (user, group, acl, ...) because this is already defined by the
 //! linked `ENTRY`.

+pub mod aio;
 pub(crate) mod create;
 pub(crate) mod dir_stack;
 pub(crate) mod extract;
--
2.39.2
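The `SyncTaskWorker` in this patch offloads blocking syscalls onto a dedicated
thread and runs queued tasks strictly in order, stopping at the first error.
Below is a std-only sketch of that queue-and-join pattern for illustration —
it uses synchronous `std::sync::mpsc` channels instead of tokio's async ones,
and all names (`Worker`, `Task`, `run`) are hypothetical, not part of the patch:

```rust
use std::sync::mpsc;
use std::thread;

type Task = Box<dyn FnOnce() -> Result<(), String> + Send>;

struct Worker {
    sender: Option<mpsc::SyncSender<Task>>,
    handle: Option<thread::JoinHandle<Result<(), String>>>,
}

impl Worker {
    fn new(queue_size: usize) -> Self {
        let (sender, receiver) = mpsc::sync_channel::<Task>(queue_size);
        // Tasks run sequentially; the loop ends once every sender is dropped.
        let handle = thread::spawn(move || {
            for task in receiver {
                // Stop at the first error; dropping the receiver discards
                // any tasks still queued behind it.
                task()?;
            }
            Ok(())
        });
        Self {
            sender: Some(sender),
            handle: Some(handle),
        }
    }

    fn send<F>(&self, task: F) -> Result<(), String>
    where
        F: FnOnce() -> Result<(), String> + Send + 'static,
    {
        self.sender
            .as_ref()
            .ok_or("sender already dropped")?
            .send(Box::new(task))
            .map_err(|_| "channel closed".to_string())
    }

    fn join(&mut self) -> Result<(), String> {
        // Closing the channel lets the worker loop terminate.
        drop(self.sender.take());
        match self.handle.take() {
            Some(handle) => handle.join().map_err(|_| "worker panicked".to_string())?,
            None => Ok(()),
        }
    }
}

fn run() -> i32 {
    let mut worker = Worker::new(4);
    let (tx, rx) = mpsc::channel();
    // A "blocking" task hands its result back over a side channel, much like
    // `extract_file` hands the opened file back via a oneshot channel.
    worker
        .send(move || tx.send(41 + 1).map_err(|e| e.to_string()))
        .unwrap();
    worker.join().unwrap();
    rx.recv().unwrap()
}

fn main() {
    println!("{}", run()); // 42
}
```

The real implementation replaces the side channel with `tokio::sync::oneshot`
so the submitting future can await the result without blocking the executor.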






* [pbs-devel] [RFC PATCH proxmox-backup 2/2] pxar-bin: Use async instead of sync extractor
  2023-08-28 14:42 [pbs-devel] [RFC PATCH proxmox-backup 0/2] Introduce experimental `AsyncExtractor<T>` Max Carrara
  2023-08-28 14:42 ` [pbs-devel] [RFC PATCH proxmox-backup 1/2] pbs-client: pxar: Add prototype implementation of `AsyncExtractor<T>` Max Carrara
@ 2023-08-28 14:42 ` Max Carrara
  2023-12-15 16:33 ` [pbs-devel] [RFC PATCH proxmox-backup 0/2] Introduce experimental `AsyncExtractor<T>` Max Carrara
  2 siblings, 0 replies; 4+ messages in thread
From: Max Carrara @ 2023-08-28 14:42 UTC (permalink / raw)
  To: pbs-devel

This commit serves as an example of how the new `AsyncExtractor<T>`
may be used. The extraction options are created using the new
builder pattern as well.

In this case, the async version can almost be swapped directly in
place of the sync version.

Signed-off-by: Max Carrara <m.carrara@proxmox.com>
---
 pxar-bin/src/main.rs | 91 +++++++++++++++++++++++---------------------
 1 file changed, 47 insertions(+), 44 deletions(-)

diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs
index bc044035..6dc7d375 100644
--- a/pxar-bin/src/main.rs
+++ b/pxar-bin/src/main.rs
@@ -12,30 +12,12 @@ use futures::select;
 use tokio::signal::unix::{signal, SignalKind};

 use pathpatterns::{MatchEntry, MatchType, PatternFlag};
-use pbs_client::pxar::{
-    format_single_line_entry, Flags, OverwriteFlags, PxarExtractOptions, ENCODER_MAX_ENTRIES,
-};
+use pbs_client::pxar::aio;
+use pbs_client::pxar::{format_single_line_entry, Flags, OverwriteFlags, ENCODER_MAX_ENTRIES};

 use proxmox_router::cli::*;
 use proxmox_schema::api;

-fn extract_archive_from_reader<R: std::io::Read>(
-    reader: &mut R,
-    target: &str,
-    feature_flags: Flags,
-    options: PxarExtractOptions,
-) -> Result<(), Error> {
-    pbs_client::pxar::extract_archive(
-        pxar::decoder::Decoder::from_std(reader)?,
-        Path::new(target),
-        feature_flags,
-        |path| {
-            log::debug!("{:?}", path);
-        },
-        options,
-    )
-}
-
 #[api(
     input: {
         properties: {
@@ -124,7 +106,7 @@ fn extract_archive_from_reader<R: std::io::Read>(
 )]
 /// Extract an archive.
 #[allow(clippy::too_many_arguments)]
-fn extract_archive(
+async fn extract_archive(
     archive: String,
     pattern: Option<Vec<String>>,
     target: Option<String>,
@@ -193,38 +175,59 @@ fn extract_archive(

     let extract_match_default = match_list.is_empty();

+    // Use the new options builder for convenience
+    let mut options_builder = aio::PxarExtractOptions::builder(PathBuf::from(target));
+
+    options_builder
+        .feature_flags(feature_flags)
+        .overwrite_flags(overwrite_flags)
+        .allow_existing_dirs(allow_existing_dirs)
+        .default_match(extract_match_default)
+        .push_match_list(match_list);
+
+    // The builder makes it easier to conditionally configure the extractor
     let was_ok = Arc::new(AtomicBool::new(true));
-    let on_error = if strict {
-        // by default errors are propagated up
-        None
-    } else {
+    if !strict {
         let was_ok = Arc::clone(&was_ok);
-        // otherwise we want to log them but not act on them
-        Some(Box::new(move |err| {
+        // log errors but don't act upon them
+
+        let error_handler = Box::new(move |error| {
             was_ok.store(false, Ordering::Release);
-            log::error!("error: {}", err);
+            log::error!("error: {}", error);
             Ok(())
-        })
-            as Box<dyn FnMut(Error) -> Result<(), Error> + Send>)
-    };
+        });

-    let options = PxarExtractOptions {
-        match_list: &match_list,
-        allow_existing_dirs,
-        overwrite_flags,
-        extract_match_default,
-        on_error,
-    };
+        options_builder.error_handler(error_handler);
+    }
+
+    let options = options_builder.build();

     if archive == "-" {
-        let stdin = std::io::stdin();
-        let mut reader = stdin.lock();
-        extract_archive_from_reader(&mut reader, target, feature_flags, options)?;
+        let stdin = tokio::io::stdin();
+        let decoder =
+            pxar::decoder::aio::Decoder::from_tokio(tokio::io::BufReader::new(stdin)).await?;
+        let mut extractor = aio::AsyncExtractor::new(decoder, options);
+
+        while let Some(result) = extractor.next().await {
+            if let Err(error) = result {
+                bail!(
+                    "encountered unexpected error during extraction:\n{:?}",
+                    error
+                )
+            }
+        }
     } else {
         log::debug!("PXAR extract: {}", archive);
-        let file = std::fs::File::open(archive)?;
-        let mut reader = std::io::BufReader::new(file);
-        extract_archive_from_reader(&mut reader, target, feature_flags, options)?;
+
+        let mut extractor = aio::AsyncExtractor::with_file(archive, options).await?;
+        while let Some(result) = extractor.next().await {
+            if let Err(error) = result {
+                bail!(
+                    "encountered unexpected error during extraction:\n{:?}",
+                    error
+                )
+            }
+        }
     }

     if !was_ok.load(Ordering::Acquire) {
--
2.39.2
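The non-strict branch in this patch records failures in an atomic flag while
returning `Ok(())` so extraction continues past recoverable errors. A minimal
std-only sketch of that lenient error-handler pattern follows — the
`ErrorHandler` alias and all function names are hypothetical stand-ins for the
patch's actual types:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical stand-in for the patch's boxed error-handler type.
type ErrorHandler = Box<dyn FnMut(String) -> Result<(), String> + Send>;

fn lenient_handler(was_ok: Arc<AtomicBool>) -> ErrorHandler {
    Box::new(move |error| {
        // Remember that something failed and log it, but report success
        // so the extractor keeps going instead of aborting.
        was_ok.store(false, Ordering::Release);
        eprintln!("error: {error}");
        Ok(())
    })
}

fn run() -> bool {
    let was_ok = Arc::new(AtomicBool::new(true));
    let mut handler = lenient_handler(Arc::clone(&was_ok));

    // Simulate a recoverable per-entry failure during extraction:
    handler("failed to apply xattrs".to_string()).unwrap();

    // The flag records that at least one error occurred.
    was_ok.load(Ordering::Acquire)
}

fn main() {
    println!("{}", run()); // false
}
```

After the extraction loop finishes, the caller inspects the flag (as
`extract_archive` does with `was_ok.load(Ordering::Acquire)`) to decide
whether to report overall failure.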






* Re: [pbs-devel] [RFC PATCH proxmox-backup 0/2] Introduce experimental `AsyncExtractor<T>`
  2023-08-28 14:42 [pbs-devel] [RFC PATCH proxmox-backup 0/2] Introduce experimental `AsyncExtractor<T>` Max Carrara
  2023-08-28 14:42 ` [pbs-devel] [RFC PATCH proxmox-backup 1/2] pbs-client: pxar: Add prototype implementation of `AsyncExtractor<T>` Max Carrara
  2023-08-28 14:42 ` [pbs-devel] [RFC PATCH proxmox-backup 2/2] pxar-bin: Use async instead of sync extractor Max Carrara
@ 2023-12-15 16:33 ` Max Carrara
  2 siblings, 0 replies; 4+ messages in thread
From: Max Carrara @ 2023-12-15 16:33 UTC (permalink / raw)
  To: pbs-devel

On 8/28/23 16:42, Max Carrara wrote:
> This RFC proposes an asynchronous implementation of
> `pbs_client::pxar::extract::{Extractor, ExtractorIter}`.

ping - still applies on master





