public inbox for pbs-devel@lists.proxmox.com
* [pbs-devel] [PATCH 0/2] use async pxar encoder
@ 2021-02-09 12:03 Stefan Reiter
  2021-02-09 12:03 ` [pbs-devel] [PATCH pxar 1/2] make aio::Encoder actually behave with async Stefan Reiter
  2021-02-09 12:03 ` [pbs-devel] [PATCH proxmox-backup 2/2] asyncify pxar create_archive Stefan Reiter
  0 siblings, 2 replies; 6+ messages in thread
From: Stefan Reiter @ 2021-02-09 12:03 UTC
  To: pbs-devel

No change in functionality is intended for these two patches, but they will be
needed for my upcoming file-restore series (or rather, they avoid repeating the
sync-but-in-a-thread approach that PxarBackupStream currently uses).

This changes the pxar archive creation code to fully embrace async/await and the
aio::Encoder from the pxar crate. It requires some changes in both crates to
ensure 'SeqWrite + Send' is properly supported.

Testing and benchmarking showed no real difference in behaviour, except for a
slight slowdown when backing up a folder with many small files: 17s without vs.
19s with the patches, tested on my local QEMU repo (11.2 GiB of git indices and
data/code files at the moment). This may be attributable to the introduced Mutex
locking or to async/await overhead, or it may just be a fluke on my machine.

Folders with fewer, but bigger files are unaffected.

Certainly learned a lot of Rust from this :) Lifetimes are still hard to follow
IMHO, especially with async... Feedback is appreciated of course, if there's
anything that can be done better.


pxar: Stefan Reiter (1):
  make aio::Encoder actually behave with async

 src/encoder/aio.rs  |  48 ++++++++---------
 src/encoder/mod.rs  | 128 +++++++++++++++++++++++---------------------
 src/encoder/sync.rs |  28 ++++------
 3 files changed, 101 insertions(+), 103 deletions(-)

proxmox-backup: Stefan Reiter (1):
  asyncify pxar create_archive

 src/bin/pxar.rs                  |   6 +-
 src/client/pxar_backup_stream.rs |  65 +++++-----
 src/pxar/create.rs               | 207 ++++++++++++++++---------------
 tests/catar.rs                   |   5 +-
 4 files changed, 143 insertions(+), 140 deletions(-)

-- 
2.20.1





* [pbs-devel] [PATCH pxar 1/2] make aio::Encoder actually behave with async
  2021-02-09 12:03 [pbs-devel] [PATCH 0/2] use async pxar encoder Stefan Reiter
@ 2021-02-09 12:03 ` Stefan Reiter
  2021-02-09 14:20   ` Wolfgang Bumiller
  2021-02-17  8:53   ` [pbs-devel] applied: " Wolfgang Bumiller
  2021-02-09 12:03 ` [pbs-devel] [PATCH proxmox-backup 2/2] asyncify pxar create_archive Stefan Reiter
  1 sibling, 2 replies; 6+ messages in thread
From: Stefan Reiter @ 2021-02-09 12:03 UTC
  To: pbs-devel

To really use the encoder with async/await, it needs to support
SeqWrite implementations that are Send. This requires changing a whole
bunch of '&mut dyn SeqWrite' trait objects to take a 'T: SeqWrite'
generic parameter directly instead. Most of this is quite
straightforward, though it incurs a lot of churn (FileImpl now needs a
generic parameter, for example).
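
A minimal sketch of why the generic parameter matters for `Send`. It assumes a hypothetical, synchronous `SeqWriteLike` trait as a stand-in for pxar's poll-based `SeqWrite`; `DynEncoder` and `GenericEncoder` are likewise illustrative names, not pxar types. A struct holding `&mut dyn SeqWriteLike` is never `Send`, because the trait object carries no `Send` bound, while a struct generic over `T` is `Send` whenever `T` is:

```rust
use std::io;

/// Hypothetical, simplified stand-in for pxar's poll-based `SeqWrite` trait.
trait SeqWriteLike {
    fn seq_write(&mut self, buf: &[u8]) -> io::Result<usize>;
}

impl SeqWriteLike for Vec<u8> {
    fn seq_write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.extend_from_slice(buf);
        Ok(buf.len())
    }
}

/// Old style: a borrowed trait object. `dyn SeqWriteLike` has no `Send` bound,
/// so this type is never `Send`, regardless of the writer behind it.
#[allow(dead_code)]
struct DynEncoder<'a> {
    output: &'a mut dyn SeqWriteLike,
}

/// New style: the writer type is a generic parameter, so the compiler derives
/// `Send` for the encoder from `T` itself.
struct GenericEncoder<'a, T: SeqWriteLike> {
    output: &'a mut T,
}

impl<'a, T: SeqWriteLike> GenericEncoder<'a, T> {
    fn write_entry(&mut self, data: &[u8]) -> io::Result<usize> {
        self.output.seq_write(data)
    }
}

/// Compile-time check, in the spirit of the `test_send` helper in the aio tests.
fn assert_send<T: Send>(_: &T) {}

/// Moves a `GenericEncoder` into another thread; this only compiles because it is `Send`.
fn encode_in_thread() -> Vec<u8> {
    let mut out = Vec::new();
    std::thread::scope(|s| {
        let mut encoder = GenericEncoder { output: &mut out };
        assert_send(&encoder);
        // `assert_send(&DynEncoder { output: &mut other })` would fail to compile.
        s.spawn(move || {
            encoder.write_entry(b"pxar").unwrap();
        });
    });
    out
}
```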

The trickiest part is returning a new Encoder instance in
create_directory, as the trait object trick with
SeqWrite::as_trait_object doesn't work if SeqWrite is implemented for
generic '&mut S'.

Instead, work with the generic object directly, and express the
owned/borrowed state in the Encoder (to avoid nested borrowing) as an
enum EncoderOutput.
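
A self-contained sketch of the owned/borrowed idea. It mirrors the `EncoderOutput` enum from the diff below, but uses a hypothetical `DirWriter` over `std::io::Write` instead of pxar's `EncoderImpl` and `SeqWrite`, and `into_inner` is purely illustrative. The root owns its writer; every subdirectory re-borrows the innermost `T` and wraps it as `Borrowed`, so a nested writer keeps the type `DirWriter<'_, T>` instead of growing into `DirWriter<'_, &mut &mut T>`:

```rust
use std::io::{self, Write};

/// Either owns the writer (root directory) or borrows it from a parent.
enum Output<'a, T> {
    Owned(T),
    Borrowed(&'a mut T),
}

impl<'a, T> AsMut<T> for Output<'a, T> {
    fn as_mut(&mut self) -> &mut T {
        match self {
            Output::Owned(o) => o,
            Output::Borrowed(b) => b,
        }
    }
}

impl<'a, T> From<T> for Output<'a, T> {
    fn from(t: T) -> Self {
        Output::Owned(t)
    }
}

impl<'a, T> From<&'a mut T> for Output<'a, T> {
    fn from(t: &'a mut T) -> Self {
        Output::Borrowed(t)
    }
}

/// Hypothetical directory writer that indents entries by nesting depth.
struct DirWriter<'a, T: Write> {
    output: Output<'a, T>,
    depth: usize,
}

impl<'a, T: Write> DirWriter<'a, T> {
    fn new(output: T) -> Self {
        DirWriter { output: output.into(), depth: 0 }
    }

    /// The child borrows the innermost `T` directly, so its type parameter
    /// stays `T` no matter how deeply directories are nested.
    fn create_directory(&mut self) -> DirWriter<'_, T> {
        DirWriter { output: self.output.as_mut().into(), depth: self.depth + 1 }
    }

    fn write_entry(&mut self, name: &str) -> io::Result<()> {
        let indent = "  ".repeat(self.depth);
        writeln!(self.output.as_mut(), "{}{}", indent, name)
    }

    /// Returns the writer if this is the owning (root) instance.
    fn into_inner(self) -> Option<T> {
        match self.output {
            Output::Owned(t) => Some(t),
            Output::Borrowed(_) => None,
        }
    }
}

fn build_listing() -> String {
    let mut root = DirWriter::new(Vec::new());
    root.write_entry("root").unwrap();
    {
        let mut sub = root.create_directory();
        sub.write_entry("sub").unwrap();
        // Same type as `root`, just with a shorter lifetime: no stacked references.
        let mut nested: DirWriter<'_, Vec<u8>> = sub.create_directory();
        nested.write_entry("file").unwrap();
    }
    String::from_utf8(root.into_inner().unwrap()).unwrap()
}
```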

Extend the aio test to ensure the Encoder is now actually usable.

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
 src/encoder/aio.rs  |  48 ++++++++---------
 src/encoder/mod.rs  | 128 +++++++++++++++++++++++---------------------
 src/encoder/sync.rs |  28 ++++------
 3 files changed, 101 insertions(+), 103 deletions(-)

diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs
index f4b96b3..6b90ce5 100644
--- a/src/encoder/aio.rs
+++ b/src/encoder/aio.rs
@@ -13,12 +13,12 @@ use crate::Metadata;
 ///
 /// This is the `async` version of the `pxar` encoder.
 #[repr(transparent)]
-pub struct Encoder<'a, T: SeqWrite + 'a> {
+pub struct Encoder<'a, T: SeqWrite + 'a + Send> {
     inner: encoder::EncoderImpl<'a, T>,
 }
 
 #[cfg(feature = "tokio-io")]
-impl<'a, T: tokio::io::AsyncWrite + 'a> Encoder<'a, TokioWriter<T>> {
+impl<'a, T: tokio::io::AsyncWrite + 'a + Send> Encoder<'a, TokioWriter<T>> {
     /// Encode a `pxar` archive into a `tokio::io::AsyncWrite` output.
     #[inline]
     pub async fn from_tokio(
@@ -44,11 +44,11 @@ impl<'a> Encoder<'a, TokioWriter<tokio::fs::File>> {
     }
 }
 
-impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
+impl<'a, T: SeqWrite + 'a + Send> Encoder<'a, T> {
     /// Create an asynchronous encoder for an output implementing our internal write interface.
     pub async fn new(output: T, metadata: &Metadata) -> io::Result<Encoder<'a, T>> {
         Ok(Self {
-            inner: encoder::EncoderImpl::new(output, metadata).await?,
+            inner: encoder::EncoderImpl::new(output.into(), metadata).await?,
         })
     }
 
@@ -60,7 +60,7 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
         metadata: &Metadata,
         file_name: P,
         file_size: u64,
-    ) -> io::Result<File<'b>>
+    ) -> io::Result<File<'b, T>>
     where
         'a: 'b,
     {
@@ -94,14 +94,11 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
 
     /// Create a new subdirectory. Note that the subdirectory has to be finished by calling the
     /// `finish()` method, otherwise the entire archive will be in an error state.
-    pub async fn create_directory<'b, P: AsRef<Path>>(
-        &'b mut self,
+    pub async fn create_directory<P: AsRef<Path>>(
+        &mut self,
         file_name: P,
         metadata: &Metadata,
-    ) -> io::Result<Encoder<'b, &'b mut dyn SeqWrite>>
-    where
-        'a: 'b,
-    {
+    ) -> io::Result<Encoder<'_, T>> {
         Ok(Encoder {
             inner: self
                 .inner
@@ -111,15 +108,10 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
     }
 
     /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
-    pub async fn finish(self) -> io::Result<T> {
+    pub async fn finish(self) -> io::Result<()> {
         self.inner.finish().await
     }
 
-    /// Cancel this directory and get back the contained writer.
-    pub fn into_writer(self) -> T {
-        self.inner.into_writer()
-    }
-
     /// Add a symbolic link to the archive.
     pub async fn add_symlink<PF: AsRef<Path>, PT: AsRef<Path>>(
         &mut self,
@@ -176,11 +168,11 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
 }
 
 #[repr(transparent)]
-pub struct File<'a> {
-    inner: encoder::FileImpl<'a>,
+pub struct File<'a, S: SeqWrite> {
+    inner: encoder::FileImpl<'a, S>,
 }
 
-impl<'a> File<'a> {
+impl<'a, S: SeqWrite> File<'a, S> {
     /// Get the file offset to be able to reference it with `add_hardlink`.
     pub fn file_offset(&self) -> LinkOffset {
         self.inner.file_offset()
@@ -198,7 +190,7 @@ impl<'a> File<'a> {
 }
 
 #[cfg(feature = "tokio-io")]
-impl<'a> tokio::io::AsyncWrite for File<'a> {
+impl<'a, S: SeqWrite> tokio::io::AsyncWrite for File<'a, S> {
     fn poll_write(self: Pin<&mut Self>, cx: &mut Context, data: &[u8]) -> Poll<io::Result<usize>> {
         unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.poll_write(cx, data)
     }
@@ -294,10 +286,16 @@ mod test {
             let mut encoder = Encoder::new(DummyOutput, &Metadata::dir_builder(0o700).build())
                 .await
                 .unwrap();
-            encoder
-                .create_directory("baba", &Metadata::dir_builder(0o700).build())
-                .await
-                .unwrap();
+            {
+                let mut dir = encoder
+                    .create_directory("baba", &Metadata::dir_builder(0o700).build())
+                    .await
+                    .unwrap();
+                dir.create_file(&Metadata::file_builder(0o755).build(), "abab", 1024)
+                    .await
+                    .unwrap();
+            }
+            encoder.finish().await.unwrap();
         };
 
         fn test_send<T: Send>(_: T) {}
diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs
index 6ce35e0..428a5c5 100644
--- a/src/encoder/mod.rs
+++ b/src/encoder/mod.rs
@@ -48,20 +48,13 @@ pub trait SeqWrite {
     ) -> Poll<io::Result<usize>>;
 
     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
-
-    /// To avoid recursively borrowing each time we nest into a subdirectory we add this helper.
-    /// Otherwise starting a subdirectory will get a trait object pointing to `T`, nesting another
-    /// subdirectory in that would have a trait object pointing to the trait object, and so on.
-    fn as_trait_object(&mut self) -> &mut dyn SeqWrite
-    where
-        Self: Sized,
-    {
-        self as &mut dyn SeqWrite
-    }
 }
 
 /// Allow using trait objects for generics taking a `SeqWrite`.
-impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) {
+impl<S> SeqWrite for &mut S
+where
+    S: SeqWrite + ?Sized,
+{
     fn poll_seq_write(
         self: Pin<&mut Self>,
         cx: &mut Context,
@@ -76,13 +69,6 @@ impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) {
     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
         unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
     }
-
-    fn as_trait_object(&mut self) -> &mut dyn SeqWrite
-    where
-        Self: Sized,
-    {
-        &mut **self
-    }
 }
 
 /// awaitable verison of `poll_seq_write`.
@@ -230,12 +216,38 @@ impl EncoderState {
     }
 }
 
+pub(crate) enum EncoderOutput<'a, T> {
+    Owned(T),
+    Borrowed(&'a mut T),
+}
+
+impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
+    fn as_mut(&mut self) -> &mut T {
+        match self {
+            EncoderOutput::Owned(ref mut o) => o,
+            EncoderOutput::Borrowed(b) => b,
+        }
+    }
+}
+
+impl<'a, T> std::convert::From<T> for EncoderOutput<'a, T> {
+    fn from(t: T) -> Self {
+        EncoderOutput::Owned(t)
+    }
+}
+
+impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
+    fn from(t: &'a mut T) -> Self {
+        EncoderOutput::Borrowed(t)
+    }
+}
+
 /// The encoder state machine implementation for a directory.
 ///
 /// We use `async fn` to implement the encoder state machine so that we can easily plug in both
 /// synchronous or `async` I/O objects in as output.
 pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
-    output: Option<T>,
+    output: EncoderOutput<'a, T>,
     state: EncoderState,
     parent: Option<&'a mut EncoderState>,
     finished: bool,
@@ -261,12 +273,12 @@ impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
 }
 
 impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
-    pub async fn new(output: T, metadata: &Metadata) -> io::Result<EncoderImpl<'a, T>> {
+    pub(crate) async fn new(output: EncoderOutput<'a, T>, metadata: &Metadata) -> io::Result<EncoderImpl<'a, T>> {
         if !metadata.is_dir() {
             io_bail!("directory metadata must contain the directory mode flag");
         }
         let mut this = Self {
-            output: Some(output),
+            output,
             state: EncoderState::default(),
             parent: None,
             finished: false,
@@ -292,7 +304,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         metadata: &Metadata,
         file_name: &Path,
         file_size: u64,
-    ) -> io::Result<FileImpl<'b>>
+    ) -> io::Result<FileImpl<'b, T>>
     where
         'a: 'b,
     {
@@ -305,7 +317,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         metadata: &Metadata,
         file_name: &[u8],
         file_size: u64,
-    ) -> io::Result<FileImpl<'b>>
+    ) -> io::Result<FileImpl<'b, T>>
     where
         'a: 'b,
     {
@@ -318,7 +330,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         header.check_header_size()?;
 
         seq_write_struct(
-            self.output.as_mut().unwrap(),
+            self.output.as_mut(),
             header,
             &mut self.state.write_position,
         )
@@ -329,7 +341,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         let meta_size = payload_data_offset - file_offset;
 
         Ok(FileImpl {
-            output: self.output.as_mut().unwrap(),
+            output: self.output.as_mut(),
             goodbye_item: GoodbyeItem {
                 hash: format::hash_filename(file_name),
                 offset: file_offset,
@@ -471,7 +483,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         self.start_file_do(metadata, file_name).await?;
         if let Some((htype, entry_data)) = entry_htype_data {
             seq_write_pxar_entry(
-                self.output.as_mut().unwrap(),
+                self.output.as_mut(),
                 htype,
                 entry_data,
                 &mut self.state.write_position,
@@ -495,14 +507,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         self.state.write_position
     }
 
-    pub async fn create_directory<'b>(
-        &'b mut self,
+    pub async fn create_directory(
+        &mut self,
         file_name: &Path,
         metadata: &Metadata,
-    ) -> io::Result<EncoderImpl<'b, &'b mut dyn SeqWrite>>
-    where
-        'a: 'b,
-    {
+    ) -> io::Result<EncoderImpl<'_, T>> {
         self.check()?;
 
         if !metadata.is_dir() {
@@ -523,8 +532,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         // the child will write to OUR state now:
         let write_position = self.position();
 
+        let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
+
         Ok(EncoderImpl {
-            output: self.output.as_mut().map(SeqWrite::as_trait_object),
+            // always forward as Borrowed(), to avoid stacking references on nested calls
+            output: self.output.as_mut().into(),
             state: EncoderState {
                 entry_offset,
                 files_offset,
@@ -535,7 +547,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
             },
             parent: Some(&mut self.state),
             finished: false,
-            file_copy_buffer: Arc::clone(&self.file_copy_buffer),
+            file_copy_buffer,
         })
     }
 
@@ -553,7 +565,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
     async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
         seq_write_pxar_struct_entry(
-            self.output.as_mut().unwrap(),
+            self.output.as_mut(),
             format::PXAR_ENTRY,
             metadata.stat.clone(),
             &mut self.state.write_position,
@@ -579,7 +591,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
     async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
         seq_write_pxar_entry(
-            self.output.as_mut().unwrap(),
+            self.output.as_mut(),
             format::PXAR_XATTR,
             &xattr.data,
             &mut self.state.write_position,
@@ -590,7 +602,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
         for acl in &acl.users {
             seq_write_pxar_struct_entry(
-                self.output.as_mut().unwrap(),
+                self.output.as_mut(),
                 format::PXAR_ACL_USER,
                 acl.clone(),
                 &mut self.state.write_position,
@@ -600,7 +612,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
         for acl in &acl.groups {
             seq_write_pxar_struct_entry(
-                self.output.as_mut().unwrap(),
+                self.output.as_mut(),
                 format::PXAR_ACL_GROUP,
                 acl.clone(),
                 &mut self.state.write_position,
@@ -610,7 +622,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
         if let Some(acl) = &acl.group_obj {
             seq_write_pxar_struct_entry(
-                self.output.as_mut().unwrap(),
+                self.output.as_mut(),
                 format::PXAR_ACL_GROUP_OBJ,
                 acl.clone(),
                 &mut self.state.write_position,
@@ -620,7 +632,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
         if let Some(acl) = &acl.default {
             seq_write_pxar_struct_entry(
-                self.output.as_mut().unwrap(),
+                self.output.as_mut(),
                 format::PXAR_ACL_DEFAULT,
                 acl.clone(),
                 &mut self.state.write_position,
@@ -630,7 +642,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
         for acl in &acl.default_users {
             seq_write_pxar_struct_entry(
-                self.output.as_mut().unwrap(),
+                self.output.as_mut(),
                 format::PXAR_ACL_DEFAULT_USER,
                 acl.clone(),
                 &mut self.state.write_position,
@@ -640,7 +652,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
         for acl in &acl.default_groups {
             seq_write_pxar_struct_entry(
-                self.output.as_mut().unwrap(),
+                self.output.as_mut(),
                 format::PXAR_ACL_DEFAULT_GROUP,
                 acl.clone(),
                 &mut self.state.write_position,
@@ -653,7 +665,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
     async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
         seq_write_pxar_entry(
-            self.output.as_mut().unwrap(),
+            self.output.as_mut(),
             format::PXAR_FCAPS,
             &fcaps.data,
             &mut self.state.write_position,
@@ -666,7 +678,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         quota_project_id: &format::QuotaProjectId,
     ) -> io::Result<()> {
         seq_write_pxar_struct_entry(
-            self.output.as_mut().unwrap(),
+            self.output.as_mut(),
             format::PXAR_QUOTA_PROJID,
             *quota_project_id,
             &mut self.state.write_position,
@@ -677,7 +689,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
         crate::util::validate_filename(file_name)?;
         seq_write_pxar_entry_zero(
-            self.output.as_mut().unwrap(),
+            self.output.as_mut(),
             format::PXAR_FILENAME,
             file_name,
             &mut self.state.write_position,
@@ -685,10 +697,10 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         .await
     }
 
-    pub async fn finish(mut self) -> io::Result<T> {
+    pub async fn finish(mut self) -> io::Result<()> {
         let tail_bytes = self.finish_goodbye_table().await?;
         seq_write_pxar_entry(
-            self.output.as_mut().unwrap(),
+            self.output.as_mut(),
             format::PXAR_GOODBYE,
             &tail_bytes,
             &mut self.state.write_position,
@@ -713,11 +725,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
             });
         }
         self.finished = true;
-        Ok(self.output.take().unwrap())
-    }
-
-    pub fn into_writer(mut self) -> T {
-        self.output.take().unwrap()
+        Ok(())
     }
 
     async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
@@ -764,8 +772,8 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 }
 
 /// Writer for a file object in a directory.
-pub struct FileImpl<'a> {
-    output: &'a mut dyn SeqWrite,
+pub struct FileImpl<'a, S: SeqWrite> {
+    output: &'a mut S,
 
     /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
     /// directly instead of on Drop of FileImpl?
@@ -780,7 +788,7 @@ pub struct FileImpl<'a> {
     parent: &'a mut EncoderState,
 }
 
-impl<'a> Drop for FileImpl<'a> {
+impl<'a, S: SeqWrite> Drop for FileImpl<'a, S> {
     fn drop(&mut self) {
         if self.remaining_size != 0 {
             self.parent.add_error(EncodeError::IncompleteFile);
@@ -790,7 +798,7 @@ impl<'a> Drop for FileImpl<'a> {
     }
 }
 
-impl<'a> FileImpl<'a> {
+impl<'a, S: SeqWrite> FileImpl<'a, S> {
     /// Get the file offset to be able to reference it with `add_hardlink`.
     pub fn file_offset(&self) -> LinkOffset {
         LinkOffset(self.goodbye_item.offset)
@@ -828,7 +836,7 @@ impl<'a> FileImpl<'a> {
     #[cfg(feature = "tokio-io")]
     pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
         unsafe {
-            self.map_unchecked_mut(|this| &mut this.output)
+            self.map_unchecked_mut(|this| this.output)
                 .poll_flush(cx)
         }
     }
@@ -840,7 +848,7 @@ impl<'a> FileImpl<'a> {
     #[cfg(feature = "tokio-io")]
     pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
         unsafe {
-            self.map_unchecked_mut(|this| &mut this.output)
+            self.map_unchecked_mut(|this| this.output)
                 .poll_flush(cx)
         }
     }
@@ -864,14 +872,14 @@ impl<'a> FileImpl<'a> {
     /// Completely write file data for the current file entry in a pxar archive.
     pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
         self.check_remaining(data.len())?;
-        seq_write_all(&mut self.output, data, &mut self.parent.write_position).await?;
+        seq_write_all(self.output, data, &mut self.parent.write_position).await?;
         self.remaining_size -= data.len() as u64;
         Ok(())
     }
 }
 
 #[cfg(feature = "tokio-io")]
-impl<'a> tokio::io::AsyncWrite for FileImpl<'a> {
+impl<'a, S: SeqWrite> tokio::io::AsyncWrite for FileImpl<'a, S> {
     fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
         FileImpl::poll_write(self, cx, buf)
     }
diff --git a/src/encoder/sync.rs b/src/encoder/sync.rs
index 9ace8bf..859714d 100644
--- a/src/encoder/sync.rs
+++ b/src/encoder/sync.rs
@@ -52,7 +52,7 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
     /// not allowed to use the `Waker`, as this will cause a `panic!`.
     pub fn new(output: T, metadata: &Metadata) -> io::Result<Self> {
         Ok(Self {
-            inner: poll_result_once(encoder::EncoderImpl::new(output, metadata))?,
+            inner: poll_result_once(encoder::EncoderImpl::new(output.into(), metadata))?,
         })
     }
 
@@ -64,7 +64,7 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
         metadata: &Metadata,
         file_name: P,
         file_size: u64,
-    ) -> io::Result<File<'b>>
+    ) -> io::Result<File<'b, T>>
     where
         'a: 'b,
     {
@@ -95,29 +95,21 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
 
     /// Create a new subdirectory. Note that the subdirectory has to be finished by calling the
     /// `finish()` method, otherwise the entire archive will be in an error state.
-    pub fn create_directory<'b, P: AsRef<Path>>(
-        &'b mut self,
+    pub fn create_directory<P: AsRef<Path>>(
+        &mut self,
         file_name: P,
         metadata: &Metadata,
-    ) -> io::Result<Encoder<'b, &'b mut dyn SeqWrite>>
-    where
-        'a: 'b,
-    {
+    ) -> io::Result<Encoder<'_, T>> {
         Ok(Encoder {
             inner: poll_result_once(self.inner.create_directory(file_name.as_ref(), metadata))?,
         })
     }
 
     /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
-    pub fn finish(self) -> io::Result<T> {
+    pub fn finish(self) -> io::Result<()> {
         poll_result_once(self.inner.finish())
     }
 
-    /// Cancel this directory and get back the contained writer.
-    pub fn into_writer(self) -> T {
-        self.inner.into_writer()
-    }
-
     /// Add a symbolic link to the archive.
     pub fn add_symlink<PF: AsRef<Path>, PT: AsRef<Path>>(
         &mut self,
@@ -174,18 +166,18 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
 }
 
 #[repr(transparent)]
-pub struct File<'a> {
-    inner: encoder::FileImpl<'a>,
+pub struct File<'a, S: SeqWrite> {
+    inner: encoder::FileImpl<'a, S>,
 }
 
-impl<'a> File<'a> {
+impl<'a, S: SeqWrite> File<'a, S> {
     /// Get the file offset to be able to reference it with `add_hardlink`.
     pub fn file_offset(&self) -> LinkOffset {
         self.inner.file_offset()
     }
 }
 
-impl<'a> io::Write for File<'a> {
+impl<'a, S: SeqWrite> io::Write for File<'a, S> {
     fn write(&mut self, data: &[u8]) -> io::Result<usize> {
         poll_result_once(self.inner.write(data))
     }
-- 
2.20.1






* [pbs-devel] [PATCH proxmox-backup 2/2] asyncify pxar create_archive
  2021-02-09 12:03 [pbs-devel] [PATCH 0/2] use async pxar encoder Stefan Reiter
  2021-02-09 12:03 ` [pbs-devel] [PATCH pxar 1/2] make aio::Encoder actually behave with async Stefan Reiter
@ 2021-02-09 12:03 ` Stefan Reiter
  2021-02-17  9:02   ` [pbs-devel] applied: " Wolfgang Bumiller
  1 sibling, 1 reply; 6+ messages in thread
From: Stefan Reiter @ 2021-02-09 12:03 UTC
  To: pbs-devel

...to take advantage of the aio::Encoder from the pxar crate.

This is a rather straightforward conversion, but it does require getting
rid of references in the Archiver struct, so Archiver now has to be given
the Mutex for the catalog directly. The callback is boxed.
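
A minimal sketch of the reference-free shape of `Archiver`. It assumes a hypothetical `CatalogWriter` trait in place of `BackupCatalogWriter` and uses `String` errors instead of `anyhow::Error`; `VecCatalog` and `run` exist only for illustration. Because the struct holds only a boxed `Send` callback and an `Arc<Mutex<...>>`, it is `Send + 'static`, which is what moving it into a spawned task requires (a plain thread is used here to keep the example std-only):

```rust
use std::path::Path;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for proxmox-backup's `BackupCatalogWriter`.
trait CatalogWriter {
    fn add_file(&mut self, name: &str);
}

#[derive(Default)]
struct VecCatalog {
    entries: Vec<String>,
}

impl CatalogWriter for VecCatalog {
    fn add_file(&mut self, name: &str) {
        self.entries.push(name.to_string());
    }
}

/// No lifetime parameters: the callback is boxed and the catalog is shared
/// through `Arc<Mutex<..>>`, so the whole struct is `Send + 'static`.
struct Archiver {
    callback: Box<dyn FnMut(&Path) -> Result<(), String> + Send>,
    catalog: Option<Arc<Mutex<dyn CatalogWriter + Send>>>,
}

impl Archiver {
    fn archive_file(&mut self, path: &Path) -> Result<(), String> {
        (self.callback)(path)?;
        if let Some(catalog) = &self.catalog {
            let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
            // The lock is held only for this single catalog update.
            catalog.lock().unwrap().add_file(name);
        }
        Ok(())
    }
}

fn run() -> (Vec<String>, usize) {
    let catalog = Arc::new(Mutex::new(VecCatalog::default()));
    // Unsizing coercion to the trait-object form stored in `Archiver`.
    let shared: Arc<Mutex<dyn CatalogWriter + Send>> = catalog.clone();

    let seen = Arc::new(Mutex::new(0usize));
    let seen_in_callback = Arc::clone(&seen);

    let mut archiver = Archiver {
        callback: Box::new(move |_path: &Path| -> Result<(), String> {
            *seen_in_callback.lock().unwrap() += 1;
            Ok(())
        }),
        catalog: Some(shared),
    };

    // `thread::spawn` requires `Send + 'static`; this compiles only because
    // `Archiver` holds no borrowed references.
    std::thread::spawn(move || {
        for p in ["/data/a.txt", "/data/b.txt"] {
            archiver.archive_file(Path::new(p)).unwrap();
        }
    })
    .join()
    .unwrap();

    let entries = catalog.lock().unwrap().entries.clone();
    let calls = *seen.lock().unwrap();
    (entries, calls)
}
```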

archive_dir_contents can call itself recursively, and thus needs to
return a boxed future.
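
A std-only sketch of why the recursion needs boxing: a recursive async call needs indirection through a heap allocation, since otherwise the future type would contain itself. The function therefore returns `Pin<Box<dyn Future + Send>>`, the same shape as `futures::future::BoxFuture`, which the patch imports. The `Dir` tree, `walk` and the tiny `block_on` driver below are hypothetical and only exist to make the example runnable:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Same shape as `futures::future::BoxFuture`, defined locally to stay std-only.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Hypothetical directory tree standing in for the real file system walk.
struct Dir {
    name: String,
    children: Vec<Dir>,
}

/// Recursively lists a tree. A plain `async fn` awaiting itself directly would
/// not compile (its future type would contain itself), so the body is boxed.
fn walk<'a>(dir: &'a Dir, depth: usize, out: &'a mut Vec<String>) -> BoxFuture<'a, usize> {
    Box::pin(async move {
        out.push(format!("{}{}", "  ".repeat(depth), dir.name));
        let mut count = 1;
        for child in &dir.children {
            count += walk(child, depth + 1, out).await;
        }
        count
    })
}

unsafe fn noop_clone(_: *const ()) -> RawWaker {
    noop_raw_waker()
}
unsafe fn noop(_: *const ()) {}
static VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);

fn noop_raw_waker() -> RawWaker {
    RawWaker::new(std::ptr::null(), &VTABLE)
}

/// Minimal busy-polling driver; adequate here because `walk` never returns `Pending`.
fn block_on<F: Future>(fut: F) -> F::Output {
    // SAFETY: the vtable functions ignore the data pointer and do nothing.
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

fn demo() -> (usize, Vec<String>) {
    let hosts = Dir { name: "hosts".into(), children: vec![] };
    let etc = Dir { name: "etc".into(), children: vec![hosts] };
    let readme = Dir { name: "README".into(), children: vec![] };
    let tree = Dir { name: "root".into(), children: vec![etc, readme] };

    let mut lines = Vec::new();
    let count = block_on(walk(&tree, 0, &mut lines));
    (count, lines)
}
```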

Users are adjusted: PxarBackupStream is converted to use an Abortable
future instead of a thread, so that its handler function can use async,
and the pxar binary's create_archive is converted to an async API
function. One test case simply uses 'block_on'.
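
A std-only sketch of the abort-on-drop pattern the new PxarBackupStream follows. `abortable`, `AbortHandle`, `Abortable` and `Aborted` below are simplified, hypothetical stand-ins for `futures::future::{Abortable, AbortHandle}`: unlike the real ones, aborting here only sets a flag and does not wake the task, so the abort is noticed on the next poll. `BackupJob` is likewise illustrative. In the patch the wrapped future is handed to `tokio::spawn`; here it is polled by hand to show that dropping the owner aborts the future instead of joining a thread:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

#[derive(Debug, PartialEq)]
struct Aborted;

#[derive(Clone)]
struct AbortHandle {
    flag: Arc<AtomicBool>,
}

impl AbortHandle {
    fn abort(&self) {
        self.flag.store(true, Ordering::SeqCst);
    }
}

/// Wraps a future and resolves to `Err(Aborted)` once the handle was used.
struct Abortable<F> {
    inner: Pin<Box<F>>,
    flag: Arc<AtomicBool>,
}

fn abortable<F: Future>(fut: F) -> (Abortable<F>, AbortHandle) {
    let flag = Arc::new(AtomicBool::new(false));
    let handle = AbortHandle { flag: Arc::clone(&flag) };
    (Abortable { inner: Box::pin(fut), flag }, handle)
}

impl<F: Future> Future for Abortable<F> {
    type Output = Result<F::Output, Aborted>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.flag.load(Ordering::SeqCst) {
            return Poll::Ready(Err(Aborted));
        }
        self.inner.as_mut().poll(cx).map(Ok)
    }
}

/// Hypothetical owner of a background job: dropping it aborts the job,
/// much like `PxarBackupStream`'s `Drop` impl calls `abort()`.
struct BackupJob {
    handle: Option<AbortHandle>,
}

impl Drop for BackupJob {
    fn drop(&mut self) {
        if let Some(handle) = self.handle.take() {
            handle.abort();
        }
    }
}

unsafe fn noop_clone(_: *const ()) -> RawWaker {
    noop_raw_waker()
}
unsafe fn noop(_: *const ()) {}
static VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);

fn noop_raw_waker() -> RawWaker {
    RawWaker::new(std::ptr::null(), &VTABLE)
}

/// Polls a never-finishing job once, drops its owner, then polls again.
fn demo() -> (bool, Poll<Result<(), Aborted>>) {
    // SAFETY: the vtable functions ignore the data pointer and do nothing.
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);

    let (mut job_future, handle) = abortable(std::future::pending::<()>());
    let job = BackupJob { handle: Some(handle) };

    let pending_before = Pin::new(&mut job_future).poll(&mut cx).is_pending();
    drop(job); // aborts the background future
    let after = Pin::new(&mut job_future).poll(&mut cx);
    (pending_before, after)
}
```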

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---

Requires an updated pxar crate.

Long patch, but a lot of changes are just
-call();
+call().await;
or using the catalog mutex.

Probably looks better with -w

 src/bin/pxar.rs                  |   6 +-
 src/client/pxar_backup_stream.rs |  65 +++++-----
 src/pxar/create.rs               | 207 ++++++++++++++++---------------
 tests/catar.rs                   |   5 +-
 4 files changed, 143 insertions(+), 140 deletions(-)

diff --git a/src/bin/pxar.rs b/src/bin/pxar.rs
index 814b3346..d830c570 100644
--- a/src/bin/pxar.rs
+++ b/src/bin/pxar.rs
@@ -295,7 +295,7 @@ fn extract_archive(
 )]
 /// Create a new .pxar archive.
 #[allow(clippy::too_many_arguments)]
-fn create_archive(
+async fn create_archive(
     archive: String,
     source: String,
     verbose: bool,
@@ -376,7 +376,7 @@ fn create_archive(
         dir,
         writer,
         feature_flags,
-        |path| {
+        move |path| {
             if verbose {
                 println!("{:?}", path);
             }
@@ -384,7 +384,7 @@ fn create_archive(
         },
         None,
         options,
-    )?;
+    ).await?;
 
     Ok(())
 }
diff --git a/src/client/pxar_backup_stream.rs b/src/client/pxar_backup_stream.rs
index 5fb28fd5..b57061a3 100644
--- a/src/client/pxar_backup_stream.rs
+++ b/src/client/pxar_backup_stream.rs
@@ -4,10 +4,10 @@ use std::path::Path;
 use std::pin::Pin;
 use std::sync::{Arc, Mutex};
 use std::task::{Context, Poll};
-use std::thread;
 
 use anyhow::{format_err, Error};
 use futures::stream::Stream;
+use futures::future::{Abortable, AbortHandle};
 use nix::dir::Dir;
 use nix::fcntl::OFlag;
 use nix::sys::stat::Mode;
@@ -21,14 +21,14 @@ use crate::backup::CatalogWriter;
 /// consumer.
 pub struct PxarBackupStream {
     rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>,
-    child: Option<thread::JoinHandle<()>>,
+    handle: Option<AbortHandle>,
     error: Arc<Mutex<Option<String>>>,
 }
 
 impl Drop for PxarBackupStream {
     fn drop(&mut self) {
         self.rx = None;
-        self.child.take().unwrap().join().unwrap();
+        self.handle.take().unwrap().abort();
     }
 }
 
@@ -43,42 +43,41 @@ impl PxarBackupStream {
         let buffer_size = 256 * 1024;
 
         let error = Arc::new(Mutex::new(None));
-        let child = std::thread::Builder::new()
-            .name("PxarBackupStream".to_string())
-            .spawn({
-                let error = Arc::clone(&error);
-                move || {
-                    let mut catalog_guard = catalog.lock().unwrap();
-                    let writer = std::io::BufWriter::with_capacity(
-                        buffer_size,
-                        crate::tools::StdChannelWriter::new(tx),
-                    );
+        let error2 = Arc::clone(&error);
+        let handler = async move {
+            let writer = std::io::BufWriter::with_capacity(
+                buffer_size,
+                crate::tools::StdChannelWriter::new(tx),
+            );
 
-                    let verbose = options.verbose;
+            let verbose = options.verbose;
 
-                    let writer = pxar::encoder::sync::StandardWriter::new(writer);
-                    if let Err(err) = crate::pxar::create_archive(
-                        dir,
-                        writer,
-                        crate::pxar::Flags::DEFAULT,
-                        |path| {
-                            if verbose {
-                                println!("{:?}", path);
-                            }
-                            Ok(())
-                        },
-                        Some(&mut *catalog_guard),
-                        options,
-                    ) {
-                        let mut error = error.lock().unwrap();
-                        *error = Some(err.to_string());
+            let writer = pxar::encoder::sync::StandardWriter::new(writer);
+            if let Err(err) = crate::pxar::create_archive(
+                dir,
+                writer,
+                crate::pxar::Flags::DEFAULT,
+                move |path| {
+                    if verbose {
+                        println!("{:?}", path);
                     }
-                }
-            })?;
+                    Ok(())
+                },
+                Some(catalog),
+                options,
+            ).await {
+                let mut error = error2.lock().unwrap();
+                *error = Some(err.to_string());
+            }
+        };
+
+        let (handle, registration) = AbortHandle::new_pair();
+        let future = Abortable::new(handler, registration);
+        tokio::spawn(future);
 
         Ok(Self {
             rx: Some(rx),
-            child: Some(child),
+            handle: Some(handle),
             error,
         })
     }
diff --git a/src/pxar/create.rs b/src/pxar/create.rs
index 36de87da..6950b396 100644
--- a/src/pxar/create.rs
+++ b/src/pxar/create.rs
@@ -5,16 +5,19 @@ use std::io::{self, Read, Write};
 use std::os::unix::ffi::OsStrExt;
 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
 use std::path::{Path, PathBuf};
+use std::sync::{Arc, Mutex};
 
 use anyhow::{bail, format_err, Error};
 use nix::dir::Dir;
 use nix::errno::Errno;
 use nix::fcntl::OFlag;
 use nix::sys::stat::{FileStat, Mode};
+use futures::future::BoxFuture;
+use futures::FutureExt;
 
 use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag};
 use pxar::Metadata;
-use pxar::encoder::LinkOffset;
+use pxar::encoder::{SeqWrite, LinkOffset};
 
 use proxmox::c_str;
 use proxmox::sys::error::SysError;
@@ -129,13 +132,13 @@ impl std::io::Write for ErrorReporter {
     }
 }
 
-struct Archiver<'a, 'b> {
+struct Archiver {
     feature_flags: Flags,
     fs_feature_flags: Flags,
     fs_magic: i64,
     patterns: Vec<MatchEntry>,
-    callback: &'a mut dyn FnMut(&Path) -> Result<(), Error>,
-    catalog: Option<&'b mut dyn BackupCatalogWriter>,
+    callback: Box<dyn FnMut(&Path) -> Result<(), Error> + Send>,
+    catalog: Option<Arc<Mutex<dyn BackupCatalogWriter + Send>>>,
     path: PathBuf,
     entry_counter: usize,
     entry_limit: usize,
@@ -147,19 +150,19 @@ struct Archiver<'a, 'b> {
     file_copy_buffer: Vec<u8>,
 }
 
-type Encoder<'a, 'b> = pxar::encoder::Encoder<'a, &'b mut dyn pxar::encoder::SeqWrite>;
+type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
 
-pub fn create_archive<T, F>(
+pub async fn create_archive<T, F>(
     source_dir: Dir,
     mut writer: T,
     feature_flags: Flags,
-    mut callback: F,
-    catalog: Option<&mut dyn BackupCatalogWriter>,
+    callback: F,
+    catalog: Option<Arc<Mutex<dyn BackupCatalogWriter + Send>>>,
     options: PxarCreateOptions,
 ) -> Result<(), Error>
 where
-    T: pxar::encoder::SeqWrite,
-    F: FnMut(&Path) -> Result<(), Error>,
+    T: SeqWrite + Send,
+    F: FnMut(&Path) -> Result<(), Error> + Send + 'static,
 {
     let fs_magic = detect_fs_type(source_dir.as_raw_fd())?;
     if is_virtual_file_system(fs_magic) {
@@ -182,8 +185,7 @@ where
         set.insert(stat.st_dev);
     }
 
-    let writer = &mut writer as &mut dyn pxar::encoder::SeqWrite;
-    let mut encoder = Encoder::new(writer, &metadata)?;
+    let mut encoder = Encoder::new(&mut writer, &metadata).await?;
 
     let mut patterns = options.patterns;
 
@@ -199,7 +201,7 @@ where
         feature_flags,
         fs_feature_flags,
         fs_magic,
-        callback: &mut callback,
+        callback: Box::new(callback),
         patterns,
         catalog,
         path: PathBuf::new(),
@@ -213,8 +215,8 @@ where
         file_copy_buffer: vec::undefined(4 * 1024 * 1024),
     };
 
-    archiver.archive_dir_contents(&mut encoder, source_dir, true)?;
-    encoder.finish()?;
+    archiver.archive_dir_contents(&mut encoder, source_dir, true).await?;
+    encoder.finish().await?;
     Ok(())
 }
 
@@ -224,7 +226,7 @@ struct FileListEntry {
     stat: FileStat,
 }
 
-impl<'a, 'b> Archiver<'a, 'b> {
+impl Archiver {
     /// Get the currently effective feature flags. (Requested flags masked by the file system
     /// feature flags).
     fn flags(&self) -> Flags {
@@ -239,49 +241,51 @@ impl<'a, 'b> Archiver<'a, 'b> {
         }
     }
 
-    fn archive_dir_contents(
-        &mut self,
-        encoder: &mut Encoder,
+    fn archive_dir_contents<'a, 'b, T: SeqWrite + Send>(
+        &'a mut self,
+        encoder: &'a mut Encoder<'b, T>,
         mut dir: Dir,
         is_root: bool,
-    ) -> Result<(), Error> {
-        let entry_counter = self.entry_counter;
+    ) -> BoxFuture<'a, Result<(), Error>> {
+        async move {
+            let entry_counter = self.entry_counter;
 
-        let old_patterns_count = self.patterns.len();
-        self.read_pxar_excludes(dir.as_raw_fd())?;
+            let old_patterns_count = self.patterns.len();
+            self.read_pxar_excludes(dir.as_raw_fd())?;
 
-        let mut file_list = self.generate_directory_file_list(&mut dir, is_root)?;
+            let mut file_list = self.generate_directory_file_list(&mut dir, is_root)?;
 
-        if is_root && old_patterns_count > 0 {
-            file_list.push(FileListEntry {
-                name: CString::new(".pxarexclude-cli").unwrap(),
-                path: PathBuf::new(),
-                stat: unsafe { std::mem::zeroed() },
-            });
-        }
-
-        let dir_fd = dir.as_raw_fd();
-
-        let old_path = std::mem::take(&mut self.path);
-
-        for file_entry in file_list {
-            let file_name = file_entry.name.to_bytes();
-
-            if is_root && file_name == b".pxarexclude-cli" {
-                self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count)?;
-                continue;
+            if is_root && old_patterns_count > 0 {
+                file_list.push(FileListEntry {
+                    name: CString::new(".pxarexclude-cli").unwrap(),
+                    path: PathBuf::new(),
+                    stat: unsafe { std::mem::zeroed() },
+                });
             }
 
-            (self.callback)(&file_entry.path)?;
-            self.path = file_entry.path;
-            self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat)
-                .map_err(|err| self.wrap_err(err))?;
-        }
-        self.path = old_path;
-        self.entry_counter = entry_counter;
-        self.patterns.truncate(old_patterns_count);
+            let dir_fd = dir.as_raw_fd();
 
-        Ok(())
+            let old_path = std::mem::take(&mut self.path);
+
+            for file_entry in file_list {
+                let file_name = file_entry.name.to_bytes();
+
+                if is_root && file_name == b".pxarexclude-cli" {
+                    self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count).await?;
+                    continue;
+                }
+
+                (self.callback)(&file_entry.path)?;
+                self.path = file_entry.path;
+                self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat).await
+                    .map_err(|err| self.wrap_err(err))?;
+            }
+            self.path = old_path;
+            self.entry_counter = entry_counter;
+            self.patterns.truncate(old_patterns_count);
+
+            Ok(())
+        }.boxed()
     }
 
     /// openat() wrapper which allows but logs `EACCES` and turns `ENOENT` into `None`.
@@ -396,23 +400,22 @@ impl<'a, 'b> Archiver<'a, 'b> {
         Ok(())
     }
 
-    fn encode_pxarexclude_cli(
+    async fn encode_pxarexclude_cli<T: SeqWrite + Send>(
         &mut self,
-        encoder: &mut Encoder,
+        encoder: &mut Encoder<'_, T>,
         file_name: &CStr,
         patterns_count: usize,
     ) -> Result<(), Error> {
         let content = generate_pxar_excludes_cli(&self.patterns[..patterns_count]);
-
-        if let Some(ref mut catalog) = self.catalog {
-            catalog.add_file(file_name, content.len() as u64, 0)?;
+        if let Some(ref catalog) = self.catalog {
+            catalog.lock().unwrap().add_file(file_name, content.len() as u64, 0)?;
         }
 
         let mut metadata = Metadata::default();
         metadata.stat.mode = pxar::format::mode::IFREG | 0o600;
 
-        let mut file = encoder.create_file(&metadata, ".pxarexclude-cli", content.len() as u64)?;
-        file.write_all(&content)?;
+        let mut file = encoder.create_file(&metadata, ".pxarexclude-cli", content.len() as u64).await?;
+        file.write_all(&content).await?;
 
         Ok(())
     }
@@ -502,9 +505,9 @@ impl<'a, 'b> Archiver<'a, 'b> {
         Ok(())
     }
 
-    fn add_entry(
+    async fn add_entry<T: SeqWrite + Send>(
         &mut self,
-        encoder: &mut Encoder,
+        encoder: &mut Encoder<'_, T>,
         parent: RawFd,
         c_file_name: &CStr,
         stat: &FileStat,
@@ -550,23 +553,23 @@ impl<'a, 'b> Archiver<'a, 'b> {
 
                 if stat.st_nlink > 1 {
                     if let Some((path, offset)) = self.hardlinks.get(&link_info) {
-                        if let Some(ref mut catalog) = self.catalog {
-                            catalog.add_hardlink(c_file_name)?;
+                        if let Some(ref catalog) = self.catalog {
+                            catalog.lock().unwrap().add_hardlink(c_file_name)?;
                         }
 
-                        encoder.add_hardlink(file_name, path, *offset)?;
+                        encoder.add_hardlink(file_name, path, *offset).await?;
 
                         return Ok(());
                     }
                 }
 
                 let file_size = stat.st_size as u64;
-                if let Some(ref mut catalog) = self.catalog {
-                    catalog.add_file(c_file_name, file_size, stat.st_mtime)?;
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().add_file(c_file_name, file_size, stat.st_mtime)?;
                 }
 
                 let offset: LinkOffset =
-                    self.add_regular_file(encoder, fd, file_name, &metadata, file_size)?;
+                    self.add_regular_file(encoder, fd, file_name, &metadata, file_size).await?;
 
                 if stat.st_nlink > 1 {
                     self.hardlinks.insert(link_info, (self.path.clone(), offset));
@@ -577,49 +580,49 @@ impl<'a, 'b> Archiver<'a, 'b> {
             mode::IFDIR => {
                 let dir = Dir::from_fd(fd.into_raw_fd())?;
 
-                if let Some(ref mut catalog) = self.catalog {
-                    catalog.start_directory(c_file_name)?;
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().start_directory(c_file_name)?;
                 }
-                let result = self.add_directory(encoder, dir, c_file_name, &metadata, stat);
-                if let Some(ref mut catalog) = self.catalog {
-                    catalog.end_directory()?;
+                let result = self.add_directory(encoder, dir, c_file_name, &metadata, stat).await;
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().end_directory()?;
                 }
                 result
             }
             mode::IFSOCK => {
-                if let Some(ref mut catalog) = self.catalog {
-                    catalog.add_socket(c_file_name)?;
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().add_socket(c_file_name)?;
                 }
 
-                Ok(encoder.add_socket(&metadata, file_name)?)
+                Ok(encoder.add_socket(&metadata, file_name).await?)
             }
             mode::IFIFO => {
-                if let Some(ref mut catalog) = self.catalog {
-                    catalog.add_fifo(c_file_name)?;
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().add_fifo(c_file_name)?;
                 }
 
-                Ok(encoder.add_fifo(&metadata, file_name)?)
+                Ok(encoder.add_fifo(&metadata, file_name).await?)
             }
             mode::IFLNK => {
-                if let Some(ref mut catalog) = self.catalog {
-                    catalog.add_symlink(c_file_name)?;
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().add_symlink(c_file_name)?;
                 }
 
-                self.add_symlink(encoder, fd, file_name, &metadata)
+                self.add_symlink(encoder, fd, file_name, &metadata).await
             }
             mode::IFBLK => {
-                if let Some(ref mut catalog) = self.catalog {
-                    catalog.add_block_device(c_file_name)?;
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().add_block_device(c_file_name)?;
                 }
 
-                self.add_device(encoder, file_name, &metadata, &stat)
+                self.add_device(encoder, file_name, &metadata, &stat).await
             }
             mode::IFCHR => {
-                if let Some(ref mut catalog) = self.catalog {
-                    catalog.add_char_device(c_file_name)?;
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().add_char_device(c_file_name)?;
                 }
 
-                self.add_device(encoder, file_name, &metadata, &stat)
+                self.add_device(encoder, file_name, &metadata, &stat).await
             }
             other => bail!(
                 "encountered unknown file type: 0x{:x} (0o{:o})",
@@ -629,9 +632,9 @@ impl<'a, 'b> Archiver<'a, 'b> {
         }
     }
 
-    fn add_directory(
+    async fn add_directory<T: SeqWrite + Send>(
         &mut self,
-        encoder: &mut Encoder,
+        encoder: &mut Encoder<'_, T>,
         dir: Dir,
         dir_name: &CStr,
         metadata: &Metadata,
@@ -639,7 +642,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
     ) -> Result<(), Error> {
         let dir_name = OsStr::from_bytes(dir_name.to_bytes());
 
-        let mut encoder = encoder.create_directory(dir_name, &metadata)?;
+        let mut encoder = encoder.create_directory(dir_name, &metadata).await?;
 
         let old_fs_magic = self.fs_magic;
         let old_fs_feature_flags = self.fs_feature_flags;
@@ -662,20 +665,20 @@ impl<'a, 'b> Archiver<'a, 'b> {
             writeln!(self.logger, "skipping mount point: {:?}", self.path)?;
             Ok(())
         } else {
-            self.archive_dir_contents(&mut encoder, dir, false)
+            self.archive_dir_contents(&mut encoder, dir, false).await
         };
 
         self.fs_magic = old_fs_magic;
         self.fs_feature_flags = old_fs_feature_flags;
         self.current_st_dev = old_st_dev;
 
-        encoder.finish()?;
+        encoder.finish().await?;
         result
     }
 
-    fn add_regular_file(
+    async fn add_regular_file<T: SeqWrite + Send>(
         &mut self,
-        encoder: &mut Encoder,
+        encoder: &mut Encoder<'_, T>,
         fd: Fd,
         file_name: &Path,
         metadata: &Metadata,
@@ -683,7 +686,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
     ) -> Result<LinkOffset, Error> {
         let mut file = unsafe { std::fs::File::from_raw_fd(fd.into_raw_fd()) };
         let mut remaining = file_size;
-        let mut out = encoder.create_file(metadata, file_name, file_size)?;
+        let mut out = encoder.create_file(metadata, file_name, file_size).await?;
         while remaining != 0 {
             let mut got = match file.read(&mut self.file_copy_buffer[..]) {
                 Ok(0) => break,
@@ -695,7 +698,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
                 self.report_file_grew_while_reading()?;
                 got = remaining as usize;
             }
-            out.write_all(&self.file_copy_buffer[..got])?;
+            out.write_all(&self.file_copy_buffer[..got]).await?;
             remaining -= got as u64;
         }
         if remaining > 0 {
@@ -704,7 +707,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
             vec::clear(&mut self.file_copy_buffer[..to_zero]);
             while remaining != 0 {
                 let fill = remaining.min(self.file_copy_buffer.len() as u64) as usize;
-                out.write_all(&self.file_copy_buffer[..fill])?;
+                out.write_all(&self.file_copy_buffer[..fill]).await?;
                 remaining -= fill as u64;
             }
         }
@@ -712,21 +715,21 @@ impl<'a, 'b> Archiver<'a, 'b> {
         Ok(out.file_offset())
     }
 
-    fn add_symlink(
+    async fn add_symlink<T: SeqWrite + Send>(
         &mut self,
-        encoder: &mut Encoder,
+        encoder: &mut Encoder<'_, T>,
         fd: Fd,
         file_name: &Path,
         metadata: &Metadata,
     ) -> Result<(), Error> {
         let dest = nix::fcntl::readlinkat(fd.as_raw_fd(), &b""[..])?;
-        encoder.add_symlink(metadata, file_name, dest)?;
+        encoder.add_symlink(metadata, file_name, dest).await?;
         Ok(())
     }
 
-    fn add_device(
+    async fn add_device<T: SeqWrite + Send>(
         &mut self,
-        encoder: &mut Encoder,
+        encoder: &mut Encoder<'_, T>,
         file_name: &Path,
         metadata: &Metadata,
         stat: &FileStat,
@@ -735,7 +738,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
             metadata,
             file_name,
             pxar::format::Device::from_dev_t(stat.st_rdev),
-        )?)
+        ).await?)
     }
 }
 
diff --git a/tests/catar.rs b/tests/catar.rs
index 2d9dea71..550600c6 100644
--- a/tests/catar.rs
+++ b/tests/catar.rs
@@ -30,14 +30,15 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
         ..PxarCreateOptions::default()
     };
 
-    create_archive(
+    let rt = tokio::runtime::Runtime::new().unwrap();
+    rt.block_on(create_archive(
         dir,
         writer,
         Flags::DEFAULT,
         |_| Ok(()),
         None,
         options,
-    )?;
+    ))?;
 
     Command::new("cmp")
         .arg("--verbose")
-- 
2.20.1
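The asyncify patch above changes two `Archiver` fields. The borrowed `&'a mut dyn FnMut` callback becomes an owned `Box<dyn FnMut(&Path) -> Result<(), Error> + Send>`. The `&'b mut dyn BackupCatalogWriter` catalog becomes an `Option<Arc<Mutex<dyn BackupCatalogWriter + Send>>>`. As a result, the struct loses its lifetime parameters and can live inside a `Send` future.

Below is a minimal std-only sketch of that pattern. It rests on a few assumptions:
- `CatalogWriter`, `VecCatalog`, `Archiver` and `run` are hypothetical, simplified stand-ins, not the proxmox-backup types.
- `String` replaces `anyhow::Error`.
- A plain thread replaces a tokio task to show that the struct is `Send`.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical, cut-down stand-in for `BackupCatalogWriter`.
trait CatalogWriter {
    fn add_file(&mut self, name: &str, size: u64);
}

#[derive(Default)]
struct VecCatalog {
    entries: Vec<(String, u64)>,
}

impl CatalogWriter for VecCatalog {
    fn add_file(&mut self, name: &str, size: u64) {
        self.entries.push((name.to_string(), size));
    }
}

/// Hypothetical archiver state: owned `'static` fields instead of borrowed
/// `&mut` ones, so the struct itself is `Send` and has no lifetime parameters.
struct Archiver {
    callback: Box<dyn FnMut(&str) -> Result<(), String> + Send>,
    catalog: Option<Arc<Mutex<dyn CatalogWriter + Send>>>,
}

impl Archiver {
    fn add_file(&mut self, name: &str, size: u64) -> Result<(), String> {
        (self.callback)(name)?;
        if let Some(ref catalog) = self.catalog {
            // The guard is a temporary and is dropped at the end of this statement.
            catalog.lock().unwrap().add_file(name, size);
        }
        Ok(())
    }
}

fn run() -> Vec<(String, u64)> {
    let concrete = Arc::new(Mutex::new(VecCatalog::default()));
    // Unsizing coercion: Arc<Mutex<VecCatalog>> -> Arc<Mutex<dyn CatalogWriter + Send>>.
    let shared: Arc<Mutex<dyn CatalogWriter + Send>> = concrete.clone();
    let mut archiver = Archiver {
        callback: Box::new(|_name: &str| -> Result<(), String> { Ok(()) }),
        catalog: Some(shared),
    };
    // Archiver is Send, so it can be moved to another thread.
    thread::spawn(move || {
        archiver.add_file("a.txt", 3).unwrap();
        archiver.add_file("b.txt", 5).unwrap();
    })
    .join()
    .unwrap();
    // Bind to a local so the MutexGuard is dropped before `concrete`.
    let entries = concrete.lock().unwrap().entries.clone();
    entries
}
```

In the hunks shown, each catalog call locks only for a single method invocation (`catalog.lock().unwrap().add_file(...)`), and none of those statements contains an `.await`. That matters with `std::sync::Mutex`: a guard held across an `.await` would make the surrounding future non-`Send`.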





^ permalink raw reply	[flat|nested] 6+ messages in thread
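In the same patch, `archive_dir_contents` is not an `async fn`. Instead it returns `BoxFuture<'a, Result<(), Error>>`, built with `async move { ... }.boxed()`. The reason is recursion: it calls itself through `add_entry` and `add_directory`. An `async fn` whose future (indirectly) contains itself would have an infinitely sized type, so one link in the cycle has to be boxed.

Below is a std-only sketch of the same idea, with these substitutions:
- A hypothetical in-memory `Node` tree replaces real directories.
- A locally defined `BoxFuture` alias has the same shape as `futures::future::BoxFuture`.
- A toy `block_on` replaces the tokio runtime used in tests/catar.rs.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Same shape as `futures::future::BoxFuture<'a, T>`.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Hypothetical in-memory stand-in for a directory tree.
struct Node {
    name: String,
    children: Vec<Node>,
}

/// Recursively records the path of every node. The recursive call goes
/// through a boxed future, so the future type of `walk` stays finite.
fn walk<'a>(node: &'a Node, prefix: String, out: &'a mut Vec<String>) -> BoxFuture<'a, ()> {
    Box::pin(async move {
        let path = format!("{}/{}", prefix, node.name);
        out.push(path.clone());
        for child in &node.children {
            walk(child, path.clone(), out).await;
        }
    })
}

/// Waker that does nothing; enough for futures that never return `Pending`.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls in a loop until the future is ready. Only suitable
/// for futures that do no real I/O, unlike a tokio runtime.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

The boxed future also explains the `'a` on both `&'a mut self` and the encoder in the patch's signature: the returned future borrows both for as long as it lives.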

* Re: [pbs-devel] [PATCH pxar 1/2] make aio::Encoder actually behave with async
  2021-02-09 12:03 ` [pbs-devel] [PATCH pxar 1/2] make aio::Encoder actually behave with async Stefan Reiter
@ 2021-02-09 14:20   ` Wolfgang Bumiller
  2021-02-17  8:53   ` [pbs-devel] applied: " Wolfgang Bumiller
  1 sibling, 0 replies; 6+ messages in thread
From: Wolfgang Bumiller @ 2021-02-09 14:20 UTC (permalink / raw)
  To: Stefan Reiter; +Cc: pbs-devel

I'll have an in-depth look another time, just a quick note inline:

On Tue, Feb 09, 2021 at 01:03:47PM +0100, Stefan Reiter wrote:
> To really use the encoder with async/await, it needs to support
> SeqWrite implementations that are Send. This requires changing a whole
> bunch of '&mut dyn SeqWrite' trait objects to instead take a 'T:
> SeqWrite' generic parameter directly. Most of this is quite
> straightforward, though it incurs a lot of churn (FileImpl needs a generic
> parameter now for example).
> 
> The trickiest part is returning a new Encoder instance in
> create_directory, as the trait object trick with
> SeqWrite::as_trait_object doesn't work if SeqWrite is implemented for
> generic '&mut S'.
> 
> Instead, work with the generic object directly, and express the
> owned/borrowed state in the Encoder (to avoid nested borrowing) as an
> enum EncoderOutput.
> 
> Add to the aio test to ensure the Encoder is now actually useable.
> 
> Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
> ---
>  src/encoder/aio.rs  |  48 ++++++++---------
>  src/encoder/mod.rs  | 128 +++++++++++++++++++++++---------------------
>  src/encoder/sync.rs |  28 ++++------
>  3 files changed, 101 insertions(+), 103 deletions(-)
> 
> diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs
> index f4b96b3..6b90ce5 100644
> --- a/src/encoder/aio.rs
> +++ b/src/encoder/aio.rs
> @@ -13,12 +13,12 @@ use crate::Metadata;
>  ///
>  /// This is the `async` version of the `pxar` encoder.
>  #[repr(transparent)]
> -pub struct Encoder<'a, T: SeqWrite + 'a> {
> +pub struct Encoder<'a, T: SeqWrite + 'a + Send> {

^ This requirement should not strictly be necessary for Encoder to
become Send. Send will propagate anyway. (You would only need this if
you wanted to pass it as an `&dyn SeqWrite + Send` later on.)

Plus, I do not want to force this to be part of the API. It's very much
possible to want to be able to reference non-Send local data in a
single-threaded executor.
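The point above rests on `Send` being an auto trait. By default, a struct is `Send` exactly when all of its fields are, so `Encoder<'a, T>` is already `Send` whenever `T` is. Adding `T: Send` to the definition only rules out callers with non-`Send` writers.

The illustration below uses a hypothetical `Wrapper<T>` (not a pxar type). Its compile-time check is similar to the `fn test_send<T: Send>(_: T) {}` helper in the pxar aio test.

```rust
use std::rc::Rc;

/// Hypothetical stand-in for an encoder; note there is no `T: Send` bound.
struct Wrapper<T> {
    inner: T,
}

impl<T> Wrapper<T> {
    fn new(inner: T) -> Self {
        Wrapper { inner }
    }

    fn get(&self) -> &T {
        &self.inner
    }
}

/// Compiles only if `T: Send`; used purely as a compile-time check.
fn test_send<T: Send>(_: &T) {}

fn send_case() -> u32 {
    let w = Wrapper::new(42u32);
    test_send(&w); // Wrapper<u32> is Send because u32 is Send
    std::thread::spawn(move || *w.get()).join().unwrap()
}

fn local_case() -> usize {
    // Rc is !Send, so Wrapper<Rc<_>> is !Send, but it still works on the
    // current thread. A `T: Send` bound on Wrapper would reject this.
    let w = Wrapper::new(Rc::new(String::from("local")));
    w.get().len()
}
```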




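The commit message quoted above says the encoder now stores its writer as `enum EncoderOutput { Owned(T), Borrowed(&'a mut T) }` instead of reborrowing through `&mut dyn SeqWrite`. Because `create_directory` always forwards `self.output.as_mut().into()`, each child borrows the root `T` directly. The type therefore stays `EncoderImpl<'_, T>` at every depth instead of stacking `&mut &mut ... T`.

Below is a reduced sketch of that pattern. `Output` and `Level` are hypothetical types, and `std::fmt::Write` stands in for `SeqWrite`.

```rust
use std::fmt::Write;

/// Owned writer at the root, a borrow of the root writer in every child.
enum Output<'a, T> {
    Owned(T),
    Borrowed(&'a mut T),
}

impl<'a, T> AsMut<T> for Output<'a, T> {
    fn as_mut(&mut self) -> &mut T {
        match self {
            Output::Owned(o) => o,
            Output::Borrowed(b) => &mut **b,
        }
    }
}

impl<'a, T> From<&'a mut T> for Output<'a, T> {
    fn from(t: &'a mut T) -> Self {
        Output::Borrowed(t)
    }
}

/// Hypothetical directory level that writes indented entry names.
struct Level<'a, T> {
    out: Output<'a, T>,
    depth: usize,
}

impl<'a, T: Write> Level<'a, T> {
    fn new(out: T) -> Self {
        Level { out: Output::Owned(out), depth: 0 }
    }

    fn entry(&mut self, name: &str) -> std::fmt::Result {
        writeln!(self.out.as_mut(), "{}{}", "  ".repeat(self.depth), name)
    }

    /// Re-borrow the innermost `T`, never the `Output` wrapper itself, so a
    /// child of a child is still `Level<'_, T>`.
    fn child(&mut self) -> Level<'_, T> {
        Level { out: self.out.as_mut().into(), depth: self.depth + 1 }
    }

    fn into_inner(self) -> Option<T> {
        match self.out {
            Output::Owned(t) => Some(t),
            Output::Borrowed(_) => None,
        }
    }
}
```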

* [pbs-devel] applied: [PATCH pxar 1/2] make aio::Encoder actually behave with async
  2021-02-09 12:03 ` [pbs-devel] [PATCH pxar 1/2] make aio::Encoder actually behave with async Stefan Reiter
  2021-02-09 14:20   ` Wolfgang Bumiller
@ 2021-02-17  8:53   ` Wolfgang Bumiller
  1 sibling, 0 replies; 6+ messages in thread
From: Wolfgang Bumiller @ 2021-02-17  8:53 UTC (permalink / raw)
  To: Stefan Reiter; +Cc: pbs-devel

Applied but reverted the `+ Send` requirements as they are not
necessary.

On Tue, Feb 09, 2021 at 01:03:47PM +0100, Stefan Reiter wrote:
> To really use the encoder with async/await, it needs to support
> SeqWrite implementations that are Send. This requires changing a whole
> bunch of '&mut dyn SeqWrite' trait objects to instead take a 'T:
> SeqWrite' generic parameter directly. Most of this is quite
> straightforward, though it incurs a lot of churn (FileImpl needs a generic
> parameter now for example).
> 
> The trickiest part is returning a new Encoder instance in
> create_directory, as the trait object trick with
> SeqWrite::as_trait_object doesn't work if SeqWrite is implemented for
> generic '&mut S'.
> 
> Instead, work with the generic object directly, and express the
> owned/borrowed state in the Encoder (to avoid nested borrowing) as an
> enum EncoderOutput.
> 
> Add to the aio test to ensure the Encoder is now actually useable.
> 
> Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
> ---
>  src/encoder/aio.rs  |  48 ++++++++---------
>  src/encoder/mod.rs  | 128 +++++++++++++++++++++++---------------------
>  src/encoder/sync.rs |  28 ++++------
>  3 files changed, 101 insertions(+), 103 deletions(-)
> 
> diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs
> index f4b96b3..6b90ce5 100644
> --- a/src/encoder/aio.rs
> +++ b/src/encoder/aio.rs
> @@ -13,12 +13,12 @@ use crate::Metadata;
>  ///
>  /// This is the `async` version of the `pxar` encoder.
>  #[repr(transparent)]
> -pub struct Encoder<'a, T: SeqWrite + 'a> {
> +pub struct Encoder<'a, T: SeqWrite + 'a + Send> {
>      inner: encoder::EncoderImpl<'a, T>,
>  }
>  
>  #[cfg(feature = "tokio-io")]
> -impl<'a, T: tokio::io::AsyncWrite + 'a> Encoder<'a, TokioWriter<T>> {
> +impl<'a, T: tokio::io::AsyncWrite + 'a + Send> Encoder<'a, TokioWriter<T>> {
>      /// Encode a `pxar` archive into a `tokio::io::AsyncWrite` output.
>      #[inline]
>      pub async fn from_tokio(
> @@ -44,11 +44,11 @@ impl<'a> Encoder<'a, TokioWriter<tokio::fs::File>> {
>      }
>  }
>  
> -impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
> +impl<'a, T: SeqWrite + 'a + Send> Encoder<'a, T> {
>      /// Create an asynchronous encoder for an output implementing our internal write interface.
>      pub async fn new(output: T, metadata: &Metadata) -> io::Result<Encoder<'a, T>> {
>          Ok(Self {
> -            inner: encoder::EncoderImpl::new(output, metadata).await?,
> +            inner: encoder::EncoderImpl::new(output.into(), metadata).await?,
>          })
>      }
>  
> @@ -60,7 +60,7 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
>          metadata: &Metadata,
>          file_name: P,
>          file_size: u64,
> -    ) -> io::Result<File<'b>>
> +    ) -> io::Result<File<'b, T>>
>      where
>          'a: 'b,
>      {
> @@ -94,14 +94,11 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
>  
>      /// Create a new subdirectory. Note that the subdirectory has to be finished by calling the
>      /// `finish()` method, otherwise the entire archive will be in an error state.
> -    pub async fn create_directory<'b, P: AsRef<Path>>(
> -        &'b mut self,
> +    pub async fn create_directory<P: AsRef<Path>>(
> +        &mut self,
>          file_name: P,
>          metadata: &Metadata,
> -    ) -> io::Result<Encoder<'b, &'b mut dyn SeqWrite>>
> -    where
> -        'a: 'b,
> -    {
> +    ) -> io::Result<Encoder<'_, T>> {
>          Ok(Encoder {
>              inner: self
>                  .inner
> @@ -111,15 +108,10 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
>      }
>  
>      /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
> -    pub async fn finish(self) -> io::Result<T> {
> +    pub async fn finish(self) -> io::Result<()> {
>          self.inner.finish().await
>      }
>  
> -    /// Cancel this directory and get back the contained writer.
> -    pub fn into_writer(self) -> T {
> -        self.inner.into_writer()
> -    }
> -
>      /// Add a symbolic link to the archive.
>      pub async fn add_symlink<PF: AsRef<Path>, PT: AsRef<Path>>(
>          &mut self,
> @@ -176,11 +168,11 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
>  }
>  
>  #[repr(transparent)]
> -pub struct File<'a> {
> -    inner: encoder::FileImpl<'a>,
> +pub struct File<'a, S: SeqWrite> {
> +    inner: encoder::FileImpl<'a, S>,
>  }
>  
> -impl<'a> File<'a> {
> +impl<'a, S: SeqWrite> File<'a, S> {
>      /// Get the file offset to be able to reference it with `add_hardlink`.
>      pub fn file_offset(&self) -> LinkOffset {
>          self.inner.file_offset()
> @@ -198,7 +190,7 @@ impl<'a> File<'a> {
>  }
>  
>  #[cfg(feature = "tokio-io")]
> -impl<'a> tokio::io::AsyncWrite for File<'a> {
> +impl<'a, S: SeqWrite> tokio::io::AsyncWrite for File<'a, S> {
>      fn poll_write(self: Pin<&mut Self>, cx: &mut Context, data: &[u8]) -> Poll<io::Result<usize>> {
>          unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.poll_write(cx, data)
>      }
> @@ -294,10 +286,16 @@ mod test {
>              let mut encoder = Encoder::new(DummyOutput, &Metadata::dir_builder(0o700).build())
>                  .await
>                  .unwrap();
> -            encoder
> -                .create_directory("baba", &Metadata::dir_builder(0o700).build())
> -                .await
> -                .unwrap();
> +            {
> +                let mut dir = encoder
> +                    .create_directory("baba", &Metadata::dir_builder(0o700).build())
> +                    .await
> +                    .unwrap();
> +                dir.create_file(&Metadata::file_builder(0o755).build(), "abab", 1024)
> +                    .await
> +                    .unwrap();
> +            }
> +            encoder.finish().await.unwrap();
>          };
>  
>          fn test_send<T: Send>(_: T) {}
> diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs
> index 6ce35e0..428a5c5 100644
> --- a/src/encoder/mod.rs
> +++ b/src/encoder/mod.rs
> @@ -48,20 +48,13 @@ pub trait SeqWrite {
>      ) -> Poll<io::Result<usize>>;
>  
>      fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
> -
> -    /// To avoid recursively borrowing each time we nest into a subdirectory we add this helper.
> -    /// Otherwise starting a subdirectory will get a trait object pointing to `T`, nesting another
> -    /// subdirectory in that would have a trait object pointing to the trait object, and so on.
> -    fn as_trait_object(&mut self) -> &mut dyn SeqWrite
> -    where
> -        Self: Sized,
> -    {
> -        self as &mut dyn SeqWrite
> -    }
>  }
>  
>  /// Allow using trait objects for generics taking a `SeqWrite`.
> -impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) {
> +impl<S> SeqWrite for &mut S
> +where
> +    S: SeqWrite + ?Sized,
> +{
>      fn poll_seq_write(
>          self: Pin<&mut Self>,
>          cx: &mut Context,
> @@ -76,13 +69,6 @@ impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) {
>      fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
>          unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
>      }
> -
> -    fn as_trait_object(&mut self) -> &mut dyn SeqWrite
> -    where
> -        Self: Sized,
> -    {
> -        &mut **self
> -    }
>  }
>  
>  /// awaitable verison of `poll_seq_write`.
> @@ -230,12 +216,38 @@ impl EncoderState {
>      }
>  }
>  
> +pub(crate) enum EncoderOutput<'a, T> {
> +    Owned(T),
> +    Borrowed(&'a mut T),
> +}
> +
> +impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
> +    fn as_mut(&mut self) -> &mut T {
> +        match self {
> +            EncoderOutput::Owned(ref mut o) => o,
> +            EncoderOutput::Borrowed(b) => b,
> +        }
> +    }
> +}
> +
> +impl<'a, T> std::convert::From<T> for EncoderOutput<'a, T> {
> +    fn from(t: T) -> Self {
> +        EncoderOutput::Owned(t)
> +    }
> +}
> +
> +impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
> +    fn from(t: &'a mut T) -> Self {
> +        EncoderOutput::Borrowed(t)
> +    }
> +}
> +
>  /// The encoder state machine implementation for a directory.
>  ///
>  /// We use `async fn` to implement the encoder state machine so that we can easily plug in both
>  /// synchronous or `async` I/O objects in as output.
>  pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
> -    output: Option<T>,
> +    output: EncoderOutput<'a, T>,
>      state: EncoderState,
>      parent: Option<&'a mut EncoderState>,
>      finished: bool,
> @@ -261,12 +273,12 @@ impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
>  }
>  
>  impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
> -    pub async fn new(output: T, metadata: &Metadata) -> io::Result<EncoderImpl<'a, T>> {
> +    pub(crate) async fn new(output: EncoderOutput<'a, T>, metadata: &Metadata) -> io::Result<EncoderImpl<'a, T>> {
>          if !metadata.is_dir() {
>              io_bail!("directory metadata must contain the directory mode flag");
>          }
>          let mut this = Self {
> -            output: Some(output),
> +            output,
>              state: EncoderState::default(),
>              parent: None,
>              finished: false,
> @@ -292,7 +304,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>          metadata: &Metadata,
>          file_name: &Path,
>          file_size: u64,
> -    ) -> io::Result<FileImpl<'b>>
> +    ) -> io::Result<FileImpl<'b, T>>
>      where
>          'a: 'b,
>      {
> @@ -305,7 +317,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>          metadata: &Metadata,
>          file_name: &[u8],
>          file_size: u64,
> -    ) -> io::Result<FileImpl<'b>>
> +    ) -> io::Result<FileImpl<'b, T>>
>      where
>          'a: 'b,
>      {
> @@ -318,7 +330,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>          header.check_header_size()?;
>  
>          seq_write_struct(
> -            self.output.as_mut().unwrap(),
> +            self.output.as_mut(),
>              header,
>              &mut self.state.write_position,
>          )
> @@ -329,7 +341,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>          let meta_size = payload_data_offset - file_offset;
>  
>          Ok(FileImpl {
> -            output: self.output.as_mut().unwrap(),
> +            output: self.output.as_mut(),
>              goodbye_item: GoodbyeItem {
>                  hash: format::hash_filename(file_name),
>                  offset: file_offset,
> @@ -471,7 +483,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>          self.start_file_do(metadata, file_name).await?;
>          if let Some((htype, entry_data)) = entry_htype_data {
>              seq_write_pxar_entry(
> -                self.output.as_mut().unwrap(),
> +                self.output.as_mut(),
>                  htype,
>                  entry_data,
>                  &mut self.state.write_position,
> @@ -495,14 +507,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>          self.state.write_position
>      }
>  
> -    pub async fn create_directory<'b>(
> -        &'b mut self,
> +    pub async fn create_directory(
> +        &mut self,
>          file_name: &Path,
>          metadata: &Metadata,
> -    ) -> io::Result<EncoderImpl<'b, &'b mut dyn SeqWrite>>
> -    where
> -        'a: 'b,
> -    {
> +    ) -> io::Result<EncoderImpl<'_, T>> {
>          self.check()?;
>  
>          if !metadata.is_dir() {
> @@ -523,8 +532,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>          // the child will write to OUR state now:
>          let write_position = self.position();
>  
> +        let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
> +
>          Ok(EncoderImpl {
> -            output: self.output.as_mut().map(SeqWrite::as_trait_object),
> +            // always forward as Borrowed(), to avoid stacking references on nested calls
> +            output: self.output.as_mut().into(),
>              state: EncoderState {
>                  entry_offset,
>                  files_offset,
> @@ -535,7 +547,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>              },
>              parent: Some(&mut self.state),
>              finished: false,
> -            file_copy_buffer: Arc::clone(&self.file_copy_buffer),
> +            file_copy_buffer,
>          })
>      }
>  
> @@ -553,7 +565,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>  
>      async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
>          seq_write_pxar_struct_entry(
> -            self.output.as_mut().unwrap(),
> +            self.output.as_mut(),
>              format::PXAR_ENTRY,
>              metadata.stat.clone(),
>              &mut self.state.write_position,
> @@ -579,7 +591,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>  
>      async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
>          seq_write_pxar_entry(
> -            self.output.as_mut().unwrap(),
> +            self.output.as_mut(),
>              format::PXAR_XATTR,
>              &xattr.data,
>              &mut self.state.write_position,
> @@ -590,7 +602,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>      async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
>          for acl in &acl.users {
>              seq_write_pxar_struct_entry(
> -                self.output.as_mut().unwrap(),
> +                self.output.as_mut(),
>                  format::PXAR_ACL_USER,
>                  acl.clone(),
>                  &mut self.state.write_position,
> @@ -600,7 +612,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>  
>          for acl in &acl.groups {
>              seq_write_pxar_struct_entry(
> -                self.output.as_mut().unwrap(),
> +                self.output.as_mut(),
>                  format::PXAR_ACL_GROUP,
>                  acl.clone(),
>                  &mut self.state.write_position,
> @@ -610,7 +622,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>  
>          if let Some(acl) = &acl.group_obj {
>              seq_write_pxar_struct_entry(
> -                self.output.as_mut().unwrap(),
> +                self.output.as_mut(),
>                  format::PXAR_ACL_GROUP_OBJ,
>                  acl.clone(),
>                  &mut self.state.write_position,
> @@ -620,7 +632,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>  
>          if let Some(acl) = &acl.default {
>              seq_write_pxar_struct_entry(
> -                self.output.as_mut().unwrap(),
> +                self.output.as_mut(),
>                  format::PXAR_ACL_DEFAULT,
>                  acl.clone(),
>                  &mut self.state.write_position,
> @@ -630,7 +642,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>  
>          for acl in &acl.default_users {
>              seq_write_pxar_struct_entry(
> -                self.output.as_mut().unwrap(),
> +                self.output.as_mut(),
>                  format::PXAR_ACL_DEFAULT_USER,
>                  acl.clone(),
>                  &mut self.state.write_position,
> @@ -640,7 +652,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>  
>          for acl in &acl.default_groups {
>              seq_write_pxar_struct_entry(
> -                self.output.as_mut().unwrap(),
> +                self.output.as_mut(),
>                  format::PXAR_ACL_DEFAULT_GROUP,
>                  acl.clone(),
>                  &mut self.state.write_position,
> @@ -653,7 +665,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>  
>      async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
>          seq_write_pxar_entry(
> -            self.output.as_mut().unwrap(),
> +            self.output.as_mut(),
>              format::PXAR_FCAPS,
>              &fcaps.data,
>              &mut self.state.write_position,
> @@ -666,7 +678,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>          quota_project_id: &format::QuotaProjectId,
>      ) -> io::Result<()> {
>          seq_write_pxar_struct_entry(
> -            self.output.as_mut().unwrap(),
> +            self.output.as_mut(),
>              format::PXAR_QUOTA_PROJID,
>              *quota_project_id,
>              &mut self.state.write_position,
> @@ -677,7 +689,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>      async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
>          crate::util::validate_filename(file_name)?;
>          seq_write_pxar_entry_zero(
> -            self.output.as_mut().unwrap(),
> +            self.output.as_mut(),
>              format::PXAR_FILENAME,
>              file_name,
>              &mut self.state.write_position,
> @@ -685,10 +697,10 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>          .await
>      }
>  
> -    pub async fn finish(mut self) -> io::Result<T> {
> +    pub async fn finish(mut self) -> io::Result<()> {
>          let tail_bytes = self.finish_goodbye_table().await?;
>          seq_write_pxar_entry(
> -            self.output.as_mut().unwrap(),
> +            self.output.as_mut(),
>              format::PXAR_GOODBYE,
>              &tail_bytes,
>              &mut self.state.write_position,
> @@ -713,11 +725,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>              });
>          }
>          self.finished = true;
> -        Ok(self.output.take().unwrap())
> -    }
> -
> -    pub fn into_writer(mut self) -> T {
> -        self.output.take().unwrap()
> +        Ok(())
>      }
>  
>      async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
> @@ -764,8 +772,8 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
>  }
>  
>  /// Writer for a file object in a directory.
> -pub struct FileImpl<'a> {
> -    output: &'a mut dyn SeqWrite,
> +pub struct FileImpl<'a, S: SeqWrite> {
> +    output: &'a mut S,
>  
>      /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
>      /// directly instead of on Drop of FileImpl?
> @@ -780,7 +788,7 @@ pub struct FileImpl<'a> {
>      parent: &'a mut EncoderState,
>  }
>  
> -impl<'a> Drop for FileImpl<'a> {
> +impl<'a, S: SeqWrite> Drop for FileImpl<'a, S> {
>      fn drop(&mut self) {
>          if self.remaining_size != 0 {
>              self.parent.add_error(EncodeError::IncompleteFile);
> @@ -790,7 +798,7 @@ impl<'a> Drop for FileImpl<'a> {
>      }
>  }
>  
> -impl<'a> FileImpl<'a> {
> +impl<'a, S: SeqWrite> FileImpl<'a, S> {
>      /// Get the file offset to be able to reference it with `add_hardlink`.
>      pub fn file_offset(&self) -> LinkOffset {
>          LinkOffset(self.goodbye_item.offset)
> @@ -828,7 +836,7 @@ impl<'a> FileImpl<'a> {
>      #[cfg(feature = "tokio-io")]
>      pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
>          unsafe {
> -            self.map_unchecked_mut(|this| &mut this.output)
> +            self.map_unchecked_mut(|this| this.output)
>                  .poll_flush(cx)
>          }
>      }
> @@ -840,7 +848,7 @@ impl<'a> FileImpl<'a> {
>      #[cfg(feature = "tokio-io")]
>      pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
>          unsafe {
> -            self.map_unchecked_mut(|this| &mut this.output)
> +            self.map_unchecked_mut(|this| this.output)
>                  .poll_flush(cx)
>          }
>      }
> @@ -864,14 +872,14 @@ impl<'a> FileImpl<'a> {
>      /// Completely write file data for the current file entry in a pxar archive.
>      pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
>          self.check_remaining(data.len())?;
> -        seq_write_all(&mut self.output, data, &mut self.parent.write_position).await?;
> +        seq_write_all(self.output, data, &mut self.parent.write_position).await?;
>          self.remaining_size -= data.len() as u64;
>          Ok(())
>      }
>  }
>  
>  #[cfg(feature = "tokio-io")]
> -impl<'a> tokio::io::AsyncWrite for FileImpl<'a> {
> +impl<'a, S: SeqWrite> tokio::io::AsyncWrite for FileImpl<'a, S> {
>      fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
>          FileImpl::poll_write(self, cx, buf)
>      }
> diff --git a/src/encoder/sync.rs b/src/encoder/sync.rs
> index 9ace8bf..859714d 100644
> --- a/src/encoder/sync.rs
> +++ b/src/encoder/sync.rs
> @@ -52,7 +52,7 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
>      /// not allowed to use the `Waker`, as this will cause a `panic!`.
>      pub fn new(output: T, metadata: &Metadata) -> io::Result<Self> {
>          Ok(Self {
> -            inner: poll_result_once(encoder::EncoderImpl::new(output, metadata))?,
> +            inner: poll_result_once(encoder::EncoderImpl::new(output.into(), metadata))?,
>          })
>      }
>  
> @@ -64,7 +64,7 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
>          metadata: &Metadata,
>          file_name: P,
>          file_size: u64,
> -    ) -> io::Result<File<'b>>
> +    ) -> io::Result<File<'b, T>>
>      where
>          'a: 'b,
>      {
> @@ -95,29 +95,21 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
>  
>      /// Create a new subdirectory. Note that the subdirectory has to be finished by calling the
>      /// `finish()` method, otherwise the entire archive will be in an error state.
> -    pub fn create_directory<'b, P: AsRef<Path>>(
> -        &'b mut self,
> +    pub fn create_directory<P: AsRef<Path>>(
> +        &mut self,
>          file_name: P,
>          metadata: &Metadata,
> -    ) -> io::Result<Encoder<'b, &'b mut dyn SeqWrite>>
> -    where
> -        'a: 'b,
> -    {
> +    ) -> io::Result<Encoder<'_, T>> {
>          Ok(Encoder {
>              inner: poll_result_once(self.inner.create_directory(file_name.as_ref(), metadata))?,
>          })
>      }
>  
>      /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
> -    pub fn finish(self) -> io::Result<T> {
> +    pub fn finish(self) -> io::Result<()> {
>          poll_result_once(self.inner.finish())
>      }
>  
> -    /// Cancel this directory and get back the contained writer.
> -    pub fn into_writer(self) -> T {
> -        self.inner.into_writer()
> -    }
> -
>      /// Add a symbolic link to the archive.
>      pub fn add_symlink<PF: AsRef<Path>, PT: AsRef<Path>>(
>          &mut self,
> @@ -174,18 +166,18 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
>  }
>  
>  #[repr(transparent)]
> -pub struct File<'a> {
> -    inner: encoder::FileImpl<'a>,
> +pub struct File<'a, S: SeqWrite> {
> +    inner: encoder::FileImpl<'a, S>,
>  }
>  
> -impl<'a> File<'a> {
> +impl<'a, S: SeqWrite> File<'a, S> {
>      /// Get the file offset to be able to reference it with `add_hardlink`.
>      pub fn file_offset(&self) -> LinkOffset {
>          self.inner.file_offset()
>      }
>  }
>  
> -impl<'a> io::Write for File<'a> {
> +impl<'a, S: SeqWrite> io::Write for File<'a, S> {
>      fn write(&mut self, data: &[u8]) -> io::Result<usize> {
>          poll_result_once(self.inner.write(data))
>      }
> -- 
> 2.20.1
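The `output: self.output.as_mut().into()` hunk above forwards the writer to a nested directory encoder as `Borrowed(...)`. Below is a minimal, std-only sketch of that owned-or-borrowed pattern. `Sink`, `Output`, `Enc` and `child()` are hypothetical stand-ins for pxar's `SeqWrite`, its output wrapper and `create_directory()`; they are not the crate's actual code. Because `as_mut()` always returns a plain `&mut T`, each nested encoder keeps the same type parameter `T` instead of stacking `&mut &mut T`. The sketch also suggests why `finish()` now returns `()` and `into_writer()` is gone: a borrowed writer cannot be handed back by value.

```rust
/// Hypothetical stand-in for pxar's `SeqWrite` trait.
trait Sink {
    fn put(&mut self, data: &[u8]);
}

impl Sink for Vec<u8> {
    fn put(&mut self, data: &[u8]) {
        self.extend_from_slice(data);
    }
}

/// Owns the writer (top-level encoder) or borrows it (nested encoders).
enum Output<'a, T> {
    Owned(T),
    Borrowed(&'a mut T),
}

impl<'a, T> Output<'a, T> {
    /// Always yields a plain `&mut T`, whichever variant is held.
    fn as_mut(&mut self) -> &mut T {
        match self {
            Output::Owned(t) => t,
            Output::Borrowed(t) => &mut **t,
        }
    }
}

impl<'a, T> From<T> for Output<'a, T> {
    fn from(t: T) -> Self {
        Output::Owned(t)
    }
}

impl<'a, T> From<&'a mut T> for Output<'a, T> {
    fn from(t: &'a mut T) -> Self {
        Output::Borrowed(t)
    }
}

/// Toy encoder; `child()` plays the role of `create_directory()`.
struct Enc<'a, T: Sink> {
    output: Output<'a, T>,
}

impl<'a, T: Sink> Enc<'a, T> {
    fn new(output: T) -> Self {
        Enc { output: output.into() }
    }

    fn write(&mut self, data: &[u8]) {
        self.output.as_mut().put(data);
    }

    /// Reborrows our writer as `Borrowed(&mut T)`: the child is still an
    /// `Enc<'_, T>`, so nesting never yields `&mut &mut ... T`.
    fn child(&mut self) -> Enc<'_, T> {
        Enc { output: self.output.as_mut().into() }
    }
}

/// Writes through three nesting levels into one shared buffer.
fn demo() -> Vec<u8> {
    let mut root = Enc::new(Vec::<u8>::new());
    root.write(b"root;");
    {
        let mut dir = root.child();
        dir.write(b"dir;");
        let mut sub = dir.child();
        sub.write(b"sub;");
    }
    root.write(b"end");
    root.output.as_mut().clone()
}
```

Making the writer a generic parameter, as the patch does for `FileImpl<'a, S>`, also lets auto traits such as `Send` follow the concrete writer type. By contrast, a `&mut dyn SeqWrite` is only `Send` if the trait object type itself carries `+ Send`, or if `SeqWrite` requires `Send`.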





* [pbs-devel] applied: [PATCH proxmox-backup 2/2] asyncify pxar create_archive
  2021-02-09 12:03 ` [pbs-devel] [PATCH proxmox-backup 2/2] asyncify pxar create_archive Stefan Reiter
@ 2021-02-17  9:02   ` Wolfgang Bumiller
  0 siblings, 0 replies; 6+ messages in thread
From: Wolfgang Bumiller @ 2021-02-17  9:02 UTC (permalink / raw)
  To: Stefan Reiter; +Cc: pbs-devel

applied

On Tue, Feb 09, 2021 at 01:03:48PM +0100, Stefan Reiter wrote:
> ...to take advantage of the aio::Encoder from the pxar crate.
> 
> Rather straightforward conversion, but it does require getting rid of
> references in the Archiver struct, which thus has to be given the Mutex
> for the catalog directly. The callback is boxed.
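Removing the borrowed references lets the archiving future own everything it touches. That is what allows it to be `Send + 'static` and be passed to `tokio::spawn` in `PxarBackupStream`. The std-only sketch below isolates the two changes. It assumes a hypothetical `CatalogSink` trait and `MemCatalog` type in place of `BackupCatalogWriter`, and `String` errors in place of `anyhow::Error`. The catalog is shared as `Arc<Mutex<dyn CatalogSink + Send>>` and locked once per call. The callback is boxed as `Box<dyn FnMut(&Path) -> Result<(), String> + Send>`.

```rust
use std::path::Path;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `BackupCatalogWriter`.
trait CatalogSink {
    fn add_file(&mut self, name: &str, size: u64) -> Result<(), String>;
}

/// Hypothetical in-memory catalog used only for this sketch.
#[derive(Default)]
struct MemCatalog {
    entries: Vec<(String, u64)>,
}

impl CatalogSink for MemCatalog {
    fn add_file(&mut self, name: &str, size: u64) -> Result<(), String> {
        self.entries.push((name.to_string(), size));
        Ok(())
    }
}

/// No lifetime parameters: every field is owned or reference-counted.
struct MiniArchiver {
    callback: Box<dyn FnMut(&Path) -> Result<(), String> + Send>,
    catalog: Option<Arc<Mutex<dyn CatalogSink + Send>>>,
}

impl MiniArchiver {
    fn add_file(&mut self, path: &Path, size: u64) -> Result<(), String> {
        (self.callback)(path)?;
        if let Some(ref catalog) = self.catalog {
            // The lock is held only for this single catalog call.
            catalog.lock().unwrap().add_file(&path.to_string_lossy(), size)?;
        }
        Ok(())
    }
}

/// Compiles only if `T` could be moved into a spawned task or thread.
fn assert_send_static<T: Send + 'static>(_: &T) {}

/// Returns the catalog contents and how often the callback ran.
fn demo() -> (Vec<(String, u64)>, usize) {
    let catalog = Arc::new(Mutex::new(MemCatalog::default()));
    // Unsizing coercion to the trait-object form the struct stores.
    let shared: Arc<Mutex<dyn CatalogSink + Send>> = catalog.clone();

    let calls = Arc::new(Mutex::new(0usize));
    let calls_in_cb = Arc::clone(&calls);

    let mut archiver = MiniArchiver {
        // `move` makes the closure own its captures, as `'static` requires.
        callback: Box::new(move |_path: &Path| -> Result<(), String> {
            *calls_in_cb.lock().unwrap() += 1;
            Ok(())
        }),
        catalog: Some(shared),
    };
    assert_send_static(&archiver);

    archiver.add_file(Path::new("a.txt"), 3).unwrap();
    archiver.add_file(Path::new("b.bin"), 7).unwrap();

    let entries = catalog.lock().unwrap().entries.clone();
    let count = *calls.lock().unwrap();
    (entries, count)
}
```

The `move` closure mirrors the `move |path|` changes in `src/bin/pxar.rs` and `PxarBackupStream`. Because the boxed callback must be `'static`, it cannot borrow locals such as `verbose`.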
> 
> archive_dir_contents can call itself recursively, and thus needs to
> return a boxed future.
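A future that awaited itself directly would have to contain its own type and would therefore have infinite size. For that reason, the recursion in `archive_dir_contents` goes through a heap-allocated `BoxFuture` built with `async move { ... }.boxed()`. The std-only sketch below shows the same shape on a hypothetical `Node` tree. It spells out the boxed type as `Pin<Box<dyn Future<Output = T> + Send + 'a>>`, which is how `futures::future::BoxFuture` is defined, and it uses `Box::pin` instead of `FutureExt::boxed`. The small `block_on` helper is a sketch-only assumption that is adequate only because these futures are always ready. Real code runs on an executor such as tokio, as the patched `tests/catar.rs` does with `Runtime::block_on`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Same shape as `futures::future::BoxFuture<'a, T>`.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Hypothetical directory tree standing in for the real `Dir` walk.
struct Node {
    name: &'static str,
    children: Vec<Node>,
}

/// Recursive walk recording names depth-first, indented by depth.
fn visit<'a>(node: &'a Node, out: &'a mut Vec<String>, depth: usize) -> BoxFuture<'a, ()> {
    Box::pin(async move {
        out.push(format!("{}{}", "  ".repeat(depth), node.name));
        for child in &node.children {
            // The callee returns an already-boxed future, so this future's
            // size does not depend on the recursion depth.
            visit(child, out, depth + 1).await;
        }
    })
}

unsafe fn noop_clone(_: *const ()) -> RawWaker {
    noop_raw_waker()
}

unsafe fn noop(_: *const ()) {}

static NOOP_VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);

fn noop_raw_waker() -> RawWaker {
    RawWaker::new(std::ptr::null(), &NOOP_VTABLE)
}

/// Minimal executor: fine for futures that are always ready, but it would
/// busy-loop on `Poll::Pending` instead of parking like a real runtime.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

fn demo() -> Vec<String> {
    let tree = Node {
        name: "root",
        children: vec![
            Node { name: "a", children: vec![Node { name: "a1", children: vec![] }] },
            Node { name: "b", children: vec![] },
        ],
    };
    let mut out = Vec::new();
    block_on(visit(&tree, &mut out, 0));
    out
}
```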
> 
> Callers are adjusted: PxarBackupStream is converted to use an
> Abortable future instead of a thread, so its handler function can be
> async, and the pxar binary's create_archive is converted to an async API
> function. One test case simply uses 'block_on'.
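`PxarBackupStream` now runs the archiver as a spawned task wrapped in `futures::future::Abortable` and keeps the matching `AbortHandle`. As a result, `Drop` cancels the task instead of joining a thread. The std-only sketch below illustrates the mechanism with a hypothetical `AbortFlag`/`MiniAbortable` pair rather than the `futures` types. The wrapper checks a shared flag before every poll and resolves to `Err(())` once the flag is set; the real `Abortable` resolves to `Err(Aborted)` instead. Unlike the real implementation, this sketch does not wake the task when `abort()` is called, so the abort only takes effect on the next poll.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Hypothetical stand-in for `futures::future::AbortHandle`.
#[derive(Clone)]
struct AbortFlag(Arc<AtomicBool>);

impl AbortFlag {
    fn abort(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for `futures::future::Abortable`.
struct MiniAbortable<F> {
    inner: Pin<Box<F>>,
    aborted: Arc<AtomicBool>,
}

fn abortable<F: Future>(fut: F) -> (MiniAbortable<F>, AbortFlag) {
    let flag = Arc::new(AtomicBool::new(false));
    let task = MiniAbortable { inner: Box::pin(fut), aborted: Arc::clone(&flag) };
    (task, AbortFlag(flag))
}

impl<F: Future> Future for MiniAbortable<F> {
    type Output = Result<F::Output, ()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Checked before every poll: once aborted, the inner future is not polled again.
        if self.aborted.load(Ordering::SeqCst) {
            return Poll::Ready(Err(()));
        }
        self.inner.as_mut().poll(cx).map(Ok)
    }
}

/// Test future: `Pending` on the first poll, `Ready(42)` on the second.
struct ReadyOnSecondPoll(bool);

impl Future for ReadyOnSecondPoll {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.0 {
            return Poll::Ready(42);
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

unsafe fn noop_clone(_: *const ()) -> RawWaker {
    noop_raw_waker()
}

unsafe fn noop(_: *const ()) {}

static NOOP_VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);

fn noop_raw_waker() -> RawWaker {
    RawWaker::new(std::ptr::null(), &NOOP_VTABLE)
}

fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}

/// (result when left alone, result when aborted after the first poll)
fn demo() -> (Result<u32, ()>, Result<u32, ()>) {
    let (mut task, _handle) = abortable(ReadyOnSecondPoll(false));
    assert!(poll_once(&mut task).is_pending());
    let finished = match poll_once(&mut task) {
        Poll::Ready(r) => r,
        Poll::Pending => unreachable!(),
    };

    // Abort between polls, as `PxarBackupStream::drop` does via its handle.
    let (mut task, handle) = abortable(ReadyOnSecondPoll(false));
    assert!(poll_once(&mut task).is_pending());
    handle.abort();
    let aborted = match poll_once(&mut task) {
        Poll::Ready(r) => r,
        Poll::Pending => unreachable!(),
    };

    (finished, aborted)
}
```

Cancellation of this kind is cooperative. An abort only stops the task at its next poll, so any synchronous work that is already running inside a poll completes first.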
> 
> Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
> ---
> 
> Requires updated pxar crate.
> 
> Long patch, but a lot of changes are just
> -call();
> +call().await;
> or using the catalog mutex.
> 
> Probably looks better with -w
> 
>  src/bin/pxar.rs                  |   6 +-
>  src/client/pxar_backup_stream.rs |  65 +++++-----
>  src/pxar/create.rs               | 207 ++++++++++++++++---------------
>  tests/catar.rs                   |   5 +-
>  4 files changed, 143 insertions(+), 140 deletions(-)
> 
> diff --git a/src/bin/pxar.rs b/src/bin/pxar.rs
> index 814b3346..d830c570 100644
> --- a/src/bin/pxar.rs
> +++ b/src/bin/pxar.rs
> @@ -295,7 +295,7 @@ fn extract_archive(
>  )]
>  /// Create a new .pxar archive.
>  #[allow(clippy::too_many_arguments)]
> -fn create_archive(
> +async fn create_archive(
>      archive: String,
>      source: String,
>      verbose: bool,
> @@ -376,7 +376,7 @@ fn create_archive(
>          dir,
>          writer,
>          feature_flags,
> -        |path| {
> +        move |path| {
>              if verbose {
>                  println!("{:?}", path);
>              }
> @@ -384,7 +384,7 @@ fn create_archive(
>          },
>          None,
>          options,
> -    )?;
> +    ).await?;
>  
>      Ok(())
>  }
> diff --git a/src/client/pxar_backup_stream.rs b/src/client/pxar_backup_stream.rs
> index 5fb28fd5..b57061a3 100644
> --- a/src/client/pxar_backup_stream.rs
> +++ b/src/client/pxar_backup_stream.rs
> @@ -4,10 +4,10 @@ use std::path::Path;
>  use std::pin::Pin;
>  use std::sync::{Arc, Mutex};
>  use std::task::{Context, Poll};
> -use std::thread;
>  
>  use anyhow::{format_err, Error};
>  use futures::stream::Stream;
> +use futures::future::{Abortable, AbortHandle};
>  use nix::dir::Dir;
>  use nix::fcntl::OFlag;
>  use nix::sys::stat::Mode;
> @@ -21,14 +21,14 @@ use crate::backup::CatalogWriter;
>  /// consumer.
>  pub struct PxarBackupStream {
>      rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>,
> -    child: Option<thread::JoinHandle<()>>,
> +    handle: Option<AbortHandle>,
>      error: Arc<Mutex<Option<String>>>,
>  }
>  
>  impl Drop for PxarBackupStream {
>      fn drop(&mut self) {
>          self.rx = None;
> -        self.child.take().unwrap().join().unwrap();
> +        self.handle.take().unwrap().abort();
>      }
>  }
>  
> @@ -43,42 +43,41 @@ impl PxarBackupStream {
>          let buffer_size = 256 * 1024;
>  
>          let error = Arc::new(Mutex::new(None));
> -        let child = std::thread::Builder::new()
> -            .name("PxarBackupStream".to_string())
> -            .spawn({
> -                let error = Arc::clone(&error);
> -                move || {
> -                    let mut catalog_guard = catalog.lock().unwrap();
> -                    let writer = std::io::BufWriter::with_capacity(
> -                        buffer_size,
> -                        crate::tools::StdChannelWriter::new(tx),
> -                    );
> +        let error2 = Arc::clone(&error);
> +        let handler = async move {
> +            let writer = std::io::BufWriter::with_capacity(
> +                buffer_size,
> +                crate::tools::StdChannelWriter::new(tx),
> +            );
>  
> -                    let verbose = options.verbose;
> +            let verbose = options.verbose;
>  
> -                    let writer = pxar::encoder::sync::StandardWriter::new(writer);
> -                    if let Err(err) = crate::pxar::create_archive(
> -                        dir,
> -                        writer,
> -                        crate::pxar::Flags::DEFAULT,
> -                        |path| {
> -                            if verbose {
> -                                println!("{:?}", path);
> -                            }
> -                            Ok(())
> -                        },
> -                        Some(&mut *catalog_guard),
> -                        options,
> -                    ) {
> -                        let mut error = error.lock().unwrap();
> -                        *error = Some(err.to_string());
> +            let writer = pxar::encoder::sync::StandardWriter::new(writer);
> +            if let Err(err) = crate::pxar::create_archive(
> +                dir,
> +                writer,
> +                crate::pxar::Flags::DEFAULT,
> +                move |path| {
> +                    if verbose {
> +                        println!("{:?}", path);
>                      }
> -                }
> -            })?;
> +                    Ok(())
> +                },
> +                Some(catalog),
> +                options,
> +            ).await {
> +                let mut error = error2.lock().unwrap();
> +                *error = Some(err.to_string());
> +            }
> +        };
> +
> +        let (handle, registration) = AbortHandle::new_pair();
> +        let future = Abortable::new(handler, registration);
> +        tokio::spawn(future);
>  
>          Ok(Self {
>              rx: Some(rx),
> -            child: Some(child),
> +            handle: Some(handle),
>              error,
>          })
>      }
> diff --git a/src/pxar/create.rs b/src/pxar/create.rs
> index 36de87da..6950b396 100644
> --- a/src/pxar/create.rs
> +++ b/src/pxar/create.rs
> @@ -5,16 +5,19 @@ use std::io::{self, Read, Write};
>  use std::os::unix::ffi::OsStrExt;
>  use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
>  use std::path::{Path, PathBuf};
> +use std::sync::{Arc, Mutex};
>  
>  use anyhow::{bail, format_err, Error};
>  use nix::dir::Dir;
>  use nix::errno::Errno;
>  use nix::fcntl::OFlag;
>  use nix::sys::stat::{FileStat, Mode};
> +use futures::future::BoxFuture;
> +use futures::FutureExt;
>  
>  use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag};
>  use pxar::Metadata;
> -use pxar::encoder::LinkOffset;
> +use pxar::encoder::{SeqWrite, LinkOffset};
>  
>  use proxmox::c_str;
>  use proxmox::sys::error::SysError;
> @@ -129,13 +132,13 @@ impl std::io::Write for ErrorReporter {
>      }
>  }
>  
> -struct Archiver<'a, 'b> {
> +struct Archiver {
>      feature_flags: Flags,
>      fs_feature_flags: Flags,
>      fs_magic: i64,
>      patterns: Vec<MatchEntry>,
> -    callback: &'a mut dyn FnMut(&Path) -> Result<(), Error>,
> -    catalog: Option<&'b mut dyn BackupCatalogWriter>,
> +    callback: Box<dyn FnMut(&Path) -> Result<(), Error> + Send>,
> +    catalog: Option<Arc<Mutex<dyn BackupCatalogWriter + Send>>>,
>      path: PathBuf,
>      entry_counter: usize,
>      entry_limit: usize,
> @@ -147,19 +150,19 @@ struct Archiver<'a, 'b> {
>      file_copy_buffer: Vec<u8>,
>  }
>  
> -type Encoder<'a, 'b> = pxar::encoder::Encoder<'a, &'b mut dyn pxar::encoder::SeqWrite>;
> +type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
>  
> -pub fn create_archive<T, F>(
> +pub async fn create_archive<T, F>(
>      source_dir: Dir,
>      mut writer: T,
>      feature_flags: Flags,
> -    mut callback: F,
> -    catalog: Option<&mut dyn BackupCatalogWriter>,
> +    callback: F,
> +    catalog: Option<Arc<Mutex<dyn BackupCatalogWriter + Send>>>,
>      options: PxarCreateOptions,
>  ) -> Result<(), Error>
>  where
> -    T: pxar::encoder::SeqWrite,
> -    F: FnMut(&Path) -> Result<(), Error>,
> +    T: SeqWrite + Send,
> +    F: FnMut(&Path) -> Result<(), Error> + Send + 'static,
>  {
>      let fs_magic = detect_fs_type(source_dir.as_raw_fd())?;
>      if is_virtual_file_system(fs_magic) {
> @@ -182,8 +185,7 @@ where
>          set.insert(stat.st_dev);
>      }
>  
> -    let writer = &mut writer as &mut dyn pxar::encoder::SeqWrite;
> -    let mut encoder = Encoder::new(writer, &metadata)?;
> +    let mut encoder = Encoder::new(&mut writer, &metadata).await?;
>  
>      let mut patterns = options.patterns;
>  
> @@ -199,7 +201,7 @@ where
>          feature_flags,
>          fs_feature_flags,
>          fs_magic,
> -        callback: &mut callback,
> +        callback: Box::new(callback),
>          patterns,
>          catalog,
>          path: PathBuf::new(),
> @@ -213,8 +215,8 @@ where
>          file_copy_buffer: vec::undefined(4 * 1024 * 1024),
>      };
>  
> -    archiver.archive_dir_contents(&mut encoder, source_dir, true)?;
> -    encoder.finish()?;
> +    archiver.archive_dir_contents(&mut encoder, source_dir, true).await?;
> +    encoder.finish().await?;
>      Ok(())
>  }
>  
> @@ -224,7 +226,7 @@ struct FileListEntry {
>      stat: FileStat,
>  }
>  
> -impl<'a, 'b> Archiver<'a, 'b> {
> +impl Archiver {
>      /// Get the currently effective feature flags. (Requested flags masked by the file system
>      /// feature flags).
>      fn flags(&self) -> Flags {
> @@ -239,49 +241,51 @@ impl<'a, 'b> Archiver<'a, 'b> {
>          }
>      }
>  
> -    fn archive_dir_contents(
> -        &mut self,
> -        encoder: &mut Encoder,
> +    fn archive_dir_contents<'a, 'b, T: SeqWrite + Send>(
> +        &'a mut self,
> +        encoder: &'a mut Encoder<'b, T>,
>          mut dir: Dir,
>          is_root: bool,
> -    ) -> Result<(), Error> {
> -        let entry_counter = self.entry_counter;
> +    ) -> BoxFuture<'a, Result<(), Error>> {
> +        async move {
> +            let entry_counter = self.entry_counter;
>  
> -        let old_patterns_count = self.patterns.len();
> -        self.read_pxar_excludes(dir.as_raw_fd())?;
> +            let old_patterns_count = self.patterns.len();
> +            self.read_pxar_excludes(dir.as_raw_fd())?;
>  
> -        let mut file_list = self.generate_directory_file_list(&mut dir, is_root)?;
> +            let mut file_list = self.generate_directory_file_list(&mut dir, is_root)?;
>  
> -        if is_root && old_patterns_count > 0 {
> -            file_list.push(FileListEntry {
> -                name: CString::new(".pxarexclude-cli").unwrap(),
> -                path: PathBuf::new(),
> -                stat: unsafe { std::mem::zeroed() },
> -            });
> -        }
> -
> -        let dir_fd = dir.as_raw_fd();
> -
> -        let old_path = std::mem::take(&mut self.path);
> -
> -        for file_entry in file_list {
> -            let file_name = file_entry.name.to_bytes();
> -
> -            if is_root && file_name == b".pxarexclude-cli" {
> -                self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count)?;
> -                continue;
> +            if is_root && old_patterns_count > 0 {
> +                file_list.push(FileListEntry {
> +                    name: CString::new(".pxarexclude-cli").unwrap(),
> +                    path: PathBuf::new(),
> +                    stat: unsafe { std::mem::zeroed() },
> +                });
>              }
>  
> -            (self.callback)(&file_entry.path)?;
> -            self.path = file_entry.path;
> -            self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat)
> -                .map_err(|err| self.wrap_err(err))?;
> -        }
> -        self.path = old_path;
> -        self.entry_counter = entry_counter;
> -        self.patterns.truncate(old_patterns_count);
> +            let dir_fd = dir.as_raw_fd();
>  
> -        Ok(())
> +            let old_path = std::mem::take(&mut self.path);
> +
> +            for file_entry in file_list {
> +                let file_name = file_entry.name.to_bytes();
> +
> +                if is_root && file_name == b".pxarexclude-cli" {
> +                    self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count).await?;
> +                    continue;
> +                }
> +
> +                (self.callback)(&file_entry.path)?;
> +                self.path = file_entry.path;
> +                self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat).await
> +                    .map_err(|err| self.wrap_err(err))?;
> +            }
> +            self.path = old_path;
> +            self.entry_counter = entry_counter;
> +            self.patterns.truncate(old_patterns_count);
> +
> +            Ok(())
> +        }.boxed()
>      }
>  
>      /// openat() wrapper which allows but logs `EACCES` and turns `ENOENT` into `None`.
> @@ -396,23 +400,22 @@ impl<'a, 'b> Archiver<'a, 'b> {
>          Ok(())
>      }
>  
> -    fn encode_pxarexclude_cli(
> +    async fn encode_pxarexclude_cli<T: SeqWrite + Send>(
>          &mut self,
> -        encoder: &mut Encoder,
> +        encoder: &mut Encoder<'_, T>,
>          file_name: &CStr,
>          patterns_count: usize,
>      ) -> Result<(), Error> {
>          let content = generate_pxar_excludes_cli(&self.patterns[..patterns_count]);
> -
> -        if let Some(ref mut catalog) = self.catalog {
> -            catalog.add_file(file_name, content.len() as u64, 0)?;
> +        if let Some(ref catalog) = self.catalog {
> +            catalog.lock().unwrap().add_file(file_name, content.len() as u64, 0)?;
>          }
>  
>          let mut metadata = Metadata::default();
>          metadata.stat.mode = pxar::format::mode::IFREG | 0o600;
>  
> -        let mut file = encoder.create_file(&metadata, ".pxarexclude-cli", content.len() as u64)?;
> -        file.write_all(&content)?;
> +        let mut file = encoder.create_file(&metadata, ".pxarexclude-cli", content.len() as u64).await?;
> +        file.write_all(&content).await?;
>  
>          Ok(())
>      }
> @@ -502,9 +505,9 @@ impl<'a, 'b> Archiver<'a, 'b> {
>          Ok(())
>      }
>  
> -    fn add_entry(
> +    async fn add_entry<T: SeqWrite + Send>(
>          &mut self,
> -        encoder: &mut Encoder,
> +        encoder: &mut Encoder<'_, T>,
>          parent: RawFd,
>          c_file_name: &CStr,
>          stat: &FileStat,
> @@ -550,23 +553,23 @@ impl<'a, 'b> Archiver<'a, 'b> {
>  
>                  if stat.st_nlink > 1 {
>                      if let Some((path, offset)) = self.hardlinks.get(&link_info) {
> -                        if let Some(ref mut catalog) = self.catalog {
> -                            catalog.add_hardlink(c_file_name)?;
> +                        if let Some(ref catalog) = self.catalog {
> +                            catalog.lock().unwrap().add_hardlink(c_file_name)?;
>                          }
>  
> -                        encoder.add_hardlink(file_name, path, *offset)?;
> +                        encoder.add_hardlink(file_name, path, *offset).await?;
>  
>                          return Ok(());
>                      }
>                  }
>  
>                  let file_size = stat.st_size as u64;
> -                if let Some(ref mut catalog) = self.catalog {
> -                    catalog.add_file(c_file_name, file_size, stat.st_mtime)?;
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().add_file(c_file_name, file_size, stat.st_mtime)?;
>                  }
>  
>                  let offset: LinkOffset =
> -                    self.add_regular_file(encoder, fd, file_name, &metadata, file_size)?;
> +                    self.add_regular_file(encoder, fd, file_name, &metadata, file_size).await?;
>  
>                  if stat.st_nlink > 1 {
>                      self.hardlinks.insert(link_info, (self.path.clone(), offset));
> @@ -577,49 +580,49 @@ impl<'a, 'b> Archiver<'a, 'b> {
>              mode::IFDIR => {
>                  let dir = Dir::from_fd(fd.into_raw_fd())?;
>  
> -                if let Some(ref mut catalog) = self.catalog {
> -                    catalog.start_directory(c_file_name)?;
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().start_directory(c_file_name)?;
>                  }
> -                let result = self.add_directory(encoder, dir, c_file_name, &metadata, stat);
> -                if let Some(ref mut catalog) = self.catalog {
> -                    catalog.end_directory()?;
> +                let result = self.add_directory(encoder, dir, c_file_name, &metadata, stat).await;
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().end_directory()?;
>                  }
>                  result
>              }
>              mode::IFSOCK => {
> -                if let Some(ref mut catalog) = self.catalog {
> -                    catalog.add_socket(c_file_name)?;
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().add_socket(c_file_name)?;
>                  }
>  
> -                Ok(encoder.add_socket(&metadata, file_name)?)
> +                Ok(encoder.add_socket(&metadata, file_name).await?)
>              }
>              mode::IFIFO => {
> -                if let Some(ref mut catalog) = self.catalog {
> -                    catalog.add_fifo(c_file_name)?;
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().add_fifo(c_file_name)?;
>                  }
>  
> -                Ok(encoder.add_fifo(&metadata, file_name)?)
> +                Ok(encoder.add_fifo(&metadata, file_name).await?)
>              }
>              mode::IFLNK => {
> -                if let Some(ref mut catalog) = self.catalog {
> -                    catalog.add_symlink(c_file_name)?;
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().add_symlink(c_file_name)?;
>                  }
>  
> -                self.add_symlink(encoder, fd, file_name, &metadata)
> +                self.add_symlink(encoder, fd, file_name, &metadata).await
>              }
>              mode::IFBLK => {
> -                if let Some(ref mut catalog) = self.catalog {
> -                    catalog.add_block_device(c_file_name)?;
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().add_block_device(c_file_name)?;
>                  }
>  
> -                self.add_device(encoder, file_name, &metadata, &stat)
> +                self.add_device(encoder, file_name, &metadata, &stat).await
>              }
>              mode::IFCHR => {
> -                if let Some(ref mut catalog) = self.catalog {
> -                    catalog.add_char_device(c_file_name)?;
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().add_char_device(c_file_name)?;
>                  }
>  
> -                self.add_device(encoder, file_name, &metadata, &stat)
> +                self.add_device(encoder, file_name, &metadata, &stat).await
>              }
>              other => bail!(
>                  "encountered unknown file type: 0x{:x} (0o{:o})",
> @@ -629,9 +632,9 @@ impl<'a, 'b> Archiver<'a, 'b> {
>          }
>      }
>  
> -    fn add_directory(
> +    async fn add_directory<T: SeqWrite + Send>(
>          &mut self,
> -        encoder: &mut Encoder,
> +        encoder: &mut Encoder<'_, T>,
>          dir: Dir,
>          dir_name: &CStr,
>          metadata: &Metadata,
> @@ -639,7 +642,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
>      ) -> Result<(), Error> {
>          let dir_name = OsStr::from_bytes(dir_name.to_bytes());
>  
> -        let mut encoder = encoder.create_directory(dir_name, &metadata)?;
> +        let mut encoder = encoder.create_directory(dir_name, &metadata).await?;
>  
>          let old_fs_magic = self.fs_magic;
>          let old_fs_feature_flags = self.fs_feature_flags;
> @@ -662,20 +665,20 @@ impl<'a, 'b> Archiver<'a, 'b> {
>              writeln!(self.logger, "skipping mount point: {:?}", self.path)?;
>              Ok(())
>          } else {
> -            self.archive_dir_contents(&mut encoder, dir, false)
> +            self.archive_dir_contents(&mut encoder, dir, false).await
>          };
>  
>          self.fs_magic = old_fs_magic;
>          self.fs_feature_flags = old_fs_feature_flags;
>          self.current_st_dev = old_st_dev;
>  
> -        encoder.finish()?;
> +        encoder.finish().await?;
>          result
>      }
>  
> -    fn add_regular_file(
> +    async fn add_regular_file<T: SeqWrite + Send>(
>          &mut self,
> -        encoder: &mut Encoder,
> +        encoder: &mut Encoder<'_, T>,
>          fd: Fd,
>          file_name: &Path,
>          metadata: &Metadata,
> @@ -683,7 +686,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
>      ) -> Result<LinkOffset, Error> {
>          let mut file = unsafe { std::fs::File::from_raw_fd(fd.into_raw_fd()) };
>          let mut remaining = file_size;
> -        let mut out = encoder.create_file(metadata, file_name, file_size)?;
> +        let mut out = encoder.create_file(metadata, file_name, file_size).await?;
>          while remaining != 0 {
>              let mut got = match file.read(&mut self.file_copy_buffer[..]) {
>                  Ok(0) => break,
> @@ -695,7 +698,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
>                  self.report_file_grew_while_reading()?;
>                  got = remaining as usize;
>              }
> -            out.write_all(&self.file_copy_buffer[..got])?;
> +            out.write_all(&self.file_copy_buffer[..got]).await?;
>              remaining -= got as u64;
>          }
>          if remaining > 0 {
> @@ -704,7 +707,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
>              vec::clear(&mut self.file_copy_buffer[..to_zero]);
>              while remaining != 0 {
>                  let fill = remaining.min(self.file_copy_buffer.len() as u64) as usize;
> -                out.write_all(&self.file_copy_buffer[..fill])?;
> +                out.write_all(&self.file_copy_buffer[..fill]).await?;
>                  remaining -= fill as u64;
>              }
>          }
> @@ -712,21 +715,21 @@ impl<'a, 'b> Archiver<'a, 'b> {
>          Ok(out.file_offset())
>      }
>  
> -    fn add_symlink(
> +    async fn add_symlink<T: SeqWrite + Send>(
>          &mut self,
> -        encoder: &mut Encoder,
> +        encoder: &mut Encoder<'_, T>,
>          fd: Fd,
>          file_name: &Path,
>          metadata: &Metadata,
>      ) -> Result<(), Error> {
>          let dest = nix::fcntl::readlinkat(fd.as_raw_fd(), &b""[..])?;
> -        encoder.add_symlink(metadata, file_name, dest)?;
> +        encoder.add_symlink(metadata, file_name, dest).await?;
>          Ok(())
>      }
>  
> -    fn add_device(
> +    async fn add_device<T: SeqWrite + Send>(
>          &mut self,
> -        encoder: &mut Encoder,
> +        encoder: &mut Encoder<'_, T>,
>          file_name: &Path,
>          metadata: &Metadata,
>          stat: &FileStat,
> @@ -735,7 +738,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
>              metadata,
>              file_name,
>              pxar::format::Device::from_dev_t(stat.st_rdev),
> -        )?)
> +        ).await?)
>      }
>  }
>  
> diff --git a/tests/catar.rs b/tests/catar.rs
> index 2d9dea71..550600c6 100644
> --- a/tests/catar.rs
> +++ b/tests/catar.rs
> @@ -30,14 +30,15 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
>          ..PxarCreateOptions::default()
>      };
>  
> -    create_archive(
> +    let rt = tokio::runtime::Runtime::new().unwrap();
> +    rt.block_on(create_archive(
>          dir,
>          writer,
>          Flags::DEFAULT,
>          |_| Ok(()),
>          None,
>          options,
> -    )?;
> +    ))?;
>  
>      Command::new("cmp")
>          .arg("--verbose")
> -- 
> 2.20.1
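The `add_regular_file` hunk keeps the existing copy loop and only adds `.await` to the writes. That loop writes exactly `file_size` bytes. It clamps a read that would overshoot when the file grew while being read, and it zero-pads the remainder when the file shrank, so the payload matches the size already passed to `create_file`. The sketch below restates that loop over plain `std::io::Read`/`Write` so it can run outside pxar. It is synchronous for simplicity. `copy_exact` and `CopyReport` are hypothetical names, and the handling of non-zero reads and errors is an assumption, since those match arms are elided in the hunk.

```rust
use std::io::{self, Read, Write};

/// Outcome of a copy (hypothetical type, for illustration only).
#[derive(Debug, PartialEq)]
struct CopyReport {
    grew: bool,  // a read returned more data than `file_size` still allowed
    padded: u64, // zero bytes appended because the source ended early
}

/// Write exactly `file_size` bytes from `src` to `out`, reusing `buf`.
/// Hypothetical helper mirroring the loop in `add_regular_file`.
fn copy_exact<R: Read, W: Write>(
    src: &mut R,
    out: &mut W,
    file_size: u64,
    buf: &mut [u8],
) -> io::Result<CopyReport> {
    // An empty buffer would make the padding loop below spin forever.
    assert!(!buf.is_empty(), "copy buffer must not be empty");
    let mut remaining = file_size;
    let mut grew = false;
    while remaining != 0 {
        let mut got = match src.read(buf) {
            Ok(0) => break, // EOF before `file_size` bytes: the file shrank
            Ok(n) => n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        if got as u64 > remaining {
            // More data than announced: truncate to the recorded size.
            grew = true;
            got = remaining as usize;
        }
        out.write_all(&buf[..got])?;
        remaining -= got as u64;
    }
    let padded = remaining;
    if remaining > 0 {
        // Fewer bytes than announced: zero-fill up to the recorded size.
        let to_zero = remaining.min(buf.len() as u64) as usize;
        buf[..to_zero].fill(0);
        while remaining != 0 {
            let fill = remaining.min(buf.len() as u64) as usize;
            out.write_all(&buf[..fill])?;
            remaining -= fill as u64;
        }
    }
    Ok(CopyReport { grew, padded })
}
```

Like the original, this only detects growth when a single read overshoots the remaining byte count. A file that grows exactly at a read boundary is simply truncated without being flagged.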




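The `tests/catar.rs` hunk drives the now-async `create_archive` from a synchronous test by passing its future to tokio's `Runtime::block_on`. The reason every converted call site in the patch gains `.await` is that an async fn's body runs only when its future is polled. As an illustration of that mechanism, and explicitly not tokio's implementation, here is a minimal std-only `block_on` that polls one future and parks the thread while it is pending. `block_on`, `YieldOnce` and `add_entry` are hypothetical names chosen for this sketch.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy single-future executor (a stand-in for `Runtime::block_on`).
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Sleep until a waker calls `unpark`; spurious wakeups just re-poll.
            Poll::Pending => thread::park(),
        }
    }
}

/// Future that returns `Pending` once (after waking itself), then `Ready`.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Hypothetical async step in the style of the patched `add_symlink`:
/// its side effect happens only when the returned future is polled.
async fn add_entry(log: &mut Vec<String>, name: &str) -> Result<(), String> {
    YieldOnce(false).await;
    log.push(format!("entry: {}", name));
    Ok(())
}
```

Creating `add_entry(&mut log, "x")` and dropping it without polling leaves `log` untouched. Passing the same call to `block_on` (or `.await`ing it inside another async fn) runs it to completion.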

end of thread, other threads:[~2021-02-17  9:03 UTC | newest]

Thread overview: 6+ messages
2021-02-09 12:03 [pbs-devel] [PATCH 0/2] use async pxar encoder Stefan Reiter
2021-02-09 12:03 ` [pbs-devel] [PATCH pxar 1/2] make aio::Encoder actually behave with async Stefan Reiter
2021-02-09 14:20   ` Wolfgang Bumiller
2021-02-17  8:53   ` [pbs-devel] applied: " Wolfgang Bumiller
2021-02-09 12:03 ` [pbs-devel] [PATCH proxmox-backup 2/2] asyncify pxar create_archive Stefan Reiter
2021-02-17  9:02   ` [pbs-devel] applied: " Wolfgang Bumiller

Service provided by Proxmox Server Solutions GmbH