From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 48F4C631F7 for ; Tue, 9 Feb 2021 13:04:34 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 3F8EB2CAA7 for ; Tue, 9 Feb 2021 13:04:04 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [212.186.127.180]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 019532CA90 for ; Tue, 9 Feb 2021 13:04:00 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id C3A0E418B1 for ; Tue, 9 Feb 2021 13:03:59 +0100 (CET) From: Stefan Reiter To: pbs-devel@lists.proxmox.com Date: Tue, 9 Feb 2021 13:03:47 +0100 Message-Id: <20210209120348.8359-2-s.reiter@proxmox.com> X-Mailer: git-send-email 2.20.1 In-Reply-To: <20210209120348.8359-1-s.reiter@proxmox.com> References: <20210209120348.8359-1-s.reiter@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL -0.036 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_DNSWL_MED -2.3 Sender listed at https://www.dnswl.org/, medium trust SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. 
[aio.rs, sync.rs, mod.rs] Subject: [pbs-devel] [PATCH pxar 1/2] make aio::Encoder actually behave with async X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 09 Feb 2021 12:04:34 -0000 To really use the encoder with async/await, it needs to support SeqWrite implementations that are Send. This requires changing a whole bunch of '&mut dyn SeqWrite' trait objects to take a 'T: SeqWrite' generic parameter directly instead. Most of this is quite straightforward, though it incurs a lot of churn (FileImpl needs a generic parameter now, for example). The trickiest part is returning a new Encoder instance in create_directory, as the trait object trick with SeqWrite::as_trait_object doesn't work if SeqWrite is implemented for generic '&mut S'. Instead, work with the generic object directly, and express the owned/borrowed state in the Encoder (to avoid nested borrowing) as an enum EncoderOutput. Add to the aio test to ensure the Encoder is now actually usable. Signed-off-by: Stefan Reiter --- src/encoder/aio.rs | 48 ++++++++--------- src/encoder/mod.rs | 128 +++++++++++++++++++++++--------------------- src/encoder/sync.rs | 28 ++++------ 3 files changed, 101 insertions(+), 103 deletions(-) diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs index f4b96b3..6b90ce5 100644 --- a/src/encoder/aio.rs +++ b/src/encoder/aio.rs @@ -13,12 +13,12 @@ use crate::Metadata; /// /// This is the `async` version of the `pxar` encoder. 
#[repr(transparent)] -pub struct Encoder<'a, T: SeqWrite + 'a> { +pub struct Encoder<'a, T: SeqWrite + 'a + Send> { inner: encoder::EncoderImpl<'a, T>, } #[cfg(feature = "tokio-io")] -impl<'a, T: tokio::io::AsyncWrite + 'a> Encoder<'a, TokioWriter> { +impl<'a, T: tokio::io::AsyncWrite + 'a + Send> Encoder<'a, TokioWriter> { /// Encode a `pxar` archive into a `tokio::io::AsyncWrite` output. #[inline] pub async fn from_tokio( @@ -44,11 +44,11 @@ impl<'a> Encoder<'a, TokioWriter> { } } -impl<'a, T: SeqWrite + 'a> Encoder<'a, T> { +impl<'a, T: SeqWrite + 'a + Send> Encoder<'a, T> { /// Create an asynchronous encoder for an output implementing our internal write interface. pub async fn new(output: T, metadata: &Metadata) -> io::Result> { Ok(Self { - inner: encoder::EncoderImpl::new(output, metadata).await?, + inner: encoder::EncoderImpl::new(output.into(), metadata).await?, }) } @@ -60,7 +60,7 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> { metadata: &Metadata, file_name: P, file_size: u64, - ) -> io::Result> + ) -> io::Result> where 'a: 'b, { @@ -94,14 +94,11 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> { /// Create a new subdirectory. Note that the subdirectory has to be finished by calling the /// `finish()` method, otherwise the entire archive will be in an error state. - pub async fn create_directory<'b, P: AsRef>( - &'b mut self, + pub async fn create_directory>( + &mut self, file_name: P, metadata: &Metadata, - ) -> io::Result> - where - 'a: 'b, - { + ) -> io::Result> { Ok(Encoder { inner: self .inner @@ -111,15 +108,10 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> { } /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`. - pub async fn finish(self) -> io::Result { + pub async fn finish(self) -> io::Result<()> { self.inner.finish().await } - /// Cancel this directory and get back the contained writer. - pub fn into_writer(self) -> T { - self.inner.into_writer() - } - /// Add a symbolic link to the archive. 
pub async fn add_symlink, PT: AsRef>( &mut self, @@ -176,11 +168,11 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> { } #[repr(transparent)] -pub struct File<'a> { - inner: encoder::FileImpl<'a>, +pub struct File<'a, S: SeqWrite> { + inner: encoder::FileImpl<'a, S>, } -impl<'a> File<'a> { +impl<'a, S: SeqWrite> File<'a, S> { /// Get the file offset to be able to reference it with `add_hardlink`. pub fn file_offset(&self) -> LinkOffset { self.inner.file_offset() @@ -198,7 +190,7 @@ impl<'a> File<'a> { } #[cfg(feature = "tokio-io")] -impl<'a> tokio::io::AsyncWrite for File<'a> { +impl<'a, S: SeqWrite> tokio::io::AsyncWrite for File<'a, S> { fn poll_write(self: Pin<&mut Self>, cx: &mut Context, data: &[u8]) -> Poll> { unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.poll_write(cx, data) } @@ -294,10 +286,16 @@ mod test { let mut encoder = Encoder::new(DummyOutput, &Metadata::dir_builder(0o700).build()) .await .unwrap(); - encoder - .create_directory("baba", &Metadata::dir_builder(0o700).build()) - .await - .unwrap(); + { + let mut dir = encoder + .create_directory("baba", &Metadata::dir_builder(0o700).build()) + .await + .unwrap(); + dir.create_file(&Metadata::file_builder(0o755).build(), "abab", 1024) + .await + .unwrap(); + } + encoder.finish().await.unwrap(); }; fn test_send(_: T) {} diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs index 6ce35e0..428a5c5 100644 --- a/src/encoder/mod.rs +++ b/src/encoder/mod.rs @@ -48,20 +48,13 @@ pub trait SeqWrite { ) -> Poll>; fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll>; - - /// To avoid recursively borrowing each time we nest into a subdirectory we add this helper. - /// Otherwise starting a subdirectory will get a trait object pointing to `T`, nesting another - /// subdirectory in that would have a trait object pointing to the trait object, and so on. 
- fn as_trait_object(&mut self) -> &mut dyn SeqWrite - where - Self: Sized, - { - self as &mut dyn SeqWrite - } } /// Allow using trait objects for generics taking a `SeqWrite`. -impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) { +impl SeqWrite for &mut S +where + S: SeqWrite + ?Sized, +{ fn poll_seq_write( self: Pin<&mut Self>, cx: &mut Context, @@ -76,13 +69,6 @@ impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) { fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) } } - - fn as_trait_object(&mut self) -> &mut dyn SeqWrite - where - Self: Sized, - { - &mut **self - } } /// awaitable verison of `poll_seq_write`. @@ -230,12 +216,38 @@ impl EncoderState { } } +pub(crate) enum EncoderOutput<'a, T> { + Owned(T), + Borrowed(&'a mut T), +} + +impl<'a, T> std::convert::AsMut for EncoderOutput<'a, T> { + fn as_mut(&mut self) -> &mut T { + match self { + EncoderOutput::Owned(ref mut o) => o, + EncoderOutput::Borrowed(b) => b, + } + } +} + +impl<'a, T> std::convert::From for EncoderOutput<'a, T> { + fn from(t: T) -> Self { + EncoderOutput::Owned(t) + } +} + +impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> { + fn from(t: &'a mut T) -> Self { + EncoderOutput::Borrowed(t) + } +} + /// The encoder state machine implementation for a directory. /// /// We use `async fn` to implement the encoder state machine so that we can easily plug in both /// synchronous or `async` I/O objects in as output. 
pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> { - output: Option, + output: EncoderOutput<'a, T>, state: EncoderState, parent: Option<&'a mut EncoderState>, finished: bool, @@ -261,12 +273,12 @@ impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> { } impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { - pub async fn new(output: T, metadata: &Metadata) -> io::Result> { + pub(crate) async fn new(output: EncoderOutput<'a, T>, metadata: &Metadata) -> io::Result> { if !metadata.is_dir() { io_bail!("directory metadata must contain the directory mode flag"); } let mut this = Self { - output: Some(output), + output, state: EncoderState::default(), parent: None, finished: false, @@ -292,7 +304,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { metadata: &Metadata, file_name: &Path, file_size: u64, - ) -> io::Result> + ) -> io::Result> where 'a: 'b, { @@ -305,7 +317,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { metadata: &Metadata, file_name: &[u8], file_size: u64, - ) -> io::Result> + ) -> io::Result> where 'a: 'b, { @@ -318,7 +330,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { header.check_header_size()?; seq_write_struct( - self.output.as_mut().unwrap(), + self.output.as_mut(), header, &mut self.state.write_position, ) @@ -329,7 +341,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { let meta_size = payload_data_offset - file_offset; Ok(FileImpl { - output: self.output.as_mut().unwrap(), + output: self.output.as_mut(), goodbye_item: GoodbyeItem { hash: format::hash_filename(file_name), offset: file_offset, @@ -471,7 +483,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { self.start_file_do(metadata, file_name).await?; if let Some((htype, entry_data)) = entry_htype_data { seq_write_pxar_entry( - self.output.as_mut().unwrap(), + self.output.as_mut(), htype, entry_data, &mut self.state.write_position, @@ -495,14 +507,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { self.state.write_position } - pub async fn create_directory<'b>( - &'b mut self, + 
pub async fn create_directory( + &mut self, file_name: &Path, metadata: &Metadata, - ) -> io::Result> - where - 'a: 'b, - { + ) -> io::Result> { self.check()?; if !metadata.is_dir() { @@ -523,8 +532,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { // the child will write to OUR state now: let write_position = self.position(); + let file_copy_buffer = Arc::clone(&self.file_copy_buffer); + Ok(EncoderImpl { - output: self.output.as_mut().map(SeqWrite::as_trait_object), + // always forward as Borrowed(), to avoid stacking references on nested calls + output: self.output.as_mut().into(), state: EncoderState { entry_offset, files_offset, @@ -535,7 +547,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { }, parent: Some(&mut self.state), finished: false, - file_copy_buffer: Arc::clone(&self.file_copy_buffer), + file_copy_buffer, }) } @@ -553,7 +565,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> { seq_write_pxar_struct_entry( - self.output.as_mut().unwrap(), + self.output.as_mut(), format::PXAR_ENTRY, metadata.stat.clone(), &mut self.state.write_position, @@ -579,7 +591,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> { seq_write_pxar_entry( - self.output.as_mut().unwrap(), + self.output.as_mut(), format::PXAR_XATTR, &xattr.data, &mut self.state.write_position, @@ -590,7 +602,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> { for acl in &acl.users { seq_write_pxar_struct_entry( - self.output.as_mut().unwrap(), + self.output.as_mut(), format::PXAR_ACL_USER, acl.clone(), &mut self.state.write_position, @@ -600,7 +612,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { for acl in &acl.groups { seq_write_pxar_struct_entry( - self.output.as_mut().unwrap(), + self.output.as_mut(), format::PXAR_ACL_GROUP, acl.clone(), &mut 
self.state.write_position, @@ -610,7 +622,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { if let Some(acl) = &acl.group_obj { seq_write_pxar_struct_entry( - self.output.as_mut().unwrap(), + self.output.as_mut(), format::PXAR_ACL_GROUP_OBJ, acl.clone(), &mut self.state.write_position, @@ -620,7 +632,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { if let Some(acl) = &acl.default { seq_write_pxar_struct_entry( - self.output.as_mut().unwrap(), + self.output.as_mut(), format::PXAR_ACL_DEFAULT, acl.clone(), &mut self.state.write_position, @@ -630,7 +642,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { for acl in &acl.default_users { seq_write_pxar_struct_entry( - self.output.as_mut().unwrap(), + self.output.as_mut(), format::PXAR_ACL_DEFAULT_USER, acl.clone(), &mut self.state.write_position, @@ -640,7 +652,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { for acl in &acl.default_groups { seq_write_pxar_struct_entry( - self.output.as_mut().unwrap(), + self.output.as_mut(), format::PXAR_ACL_DEFAULT_GROUP, acl.clone(), &mut self.state.write_position, @@ -653,7 +665,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> { seq_write_pxar_entry( - self.output.as_mut().unwrap(), + self.output.as_mut(), format::PXAR_FCAPS, &fcaps.data, &mut self.state.write_position, @@ -666,7 +678,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { quota_project_id: &format::QuotaProjectId, ) -> io::Result<()> { seq_write_pxar_struct_entry( - self.output.as_mut().unwrap(), + self.output.as_mut(), format::PXAR_QUOTA_PROJID, *quota_project_id, &mut self.state.write_position, @@ -677,7 +689,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> { crate::util::validate_filename(file_name)?; seq_write_pxar_entry_zero( - self.output.as_mut().unwrap(), + self.output.as_mut(), format::PXAR_FILENAME, file_name, &mut 
self.state.write_position, @@ -685,10 +697,10 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { .await } - pub async fn finish(mut self) -> io::Result { + pub async fn finish(mut self) -> io::Result<()> { let tail_bytes = self.finish_goodbye_table().await?; seq_write_pxar_entry( - self.output.as_mut().unwrap(), + self.output.as_mut(), format::PXAR_GOODBYE, &tail_bytes, &mut self.state.write_position, @@ -713,11 +725,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { }); } self.finished = true; - Ok(self.output.take().unwrap()) - } - - pub fn into_writer(mut self) -> T { - self.output.take().unwrap() + Ok(()) } async fn finish_goodbye_table(&mut self) -> io::Result> { @@ -764,8 +772,8 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { } /// Writer for a file object in a directory. -pub struct FileImpl<'a> { - output: &'a mut dyn SeqWrite, +pub struct FileImpl<'a, S: SeqWrite> { + output: &'a mut S, /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it /// directly instead of on Drop of FileImpl? @@ -780,7 +788,7 @@ pub struct FileImpl<'a> { parent: &'a mut EncoderState, } -impl<'a> Drop for FileImpl<'a> { +impl<'a, S: SeqWrite> Drop for FileImpl<'a, S> { fn drop(&mut self) { if self.remaining_size != 0 { self.parent.add_error(EncodeError::IncompleteFile); @@ -790,7 +798,7 @@ impl<'a> Drop for FileImpl<'a> { } } -impl<'a> FileImpl<'a> { +impl<'a, S: SeqWrite> FileImpl<'a, S> { /// Get the file offset to be able to reference it with `add_hardlink`. 
pub fn file_offset(&self) -> LinkOffset { LinkOffset(self.goodbye_item.offset) @@ -828,7 +836,7 @@ impl<'a> FileImpl<'a> { #[cfg(feature = "tokio-io")] pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { unsafe { - self.map_unchecked_mut(|this| &mut this.output) + self.map_unchecked_mut(|this| this.output) .poll_flush(cx) } } @@ -840,7 +848,7 @@ impl<'a> FileImpl<'a> { #[cfg(feature = "tokio-io")] pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { unsafe { - self.map_unchecked_mut(|this| &mut this.output) + self.map_unchecked_mut(|this| this.output) .poll_flush(cx) } } @@ -864,14 +872,14 @@ impl<'a> FileImpl<'a> { /// Completely write file data for the current file entry in a pxar archive. pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> { self.check_remaining(data.len())?; - seq_write_all(&mut self.output, data, &mut self.parent.write_position).await?; + seq_write_all(self.output, data, &mut self.parent.write_position).await?; self.remaining_size -= data.len() as u64; Ok(()) } } #[cfg(feature = "tokio-io")] -impl<'a> tokio::io::AsyncWrite for FileImpl<'a> { +impl<'a, S: SeqWrite> tokio::io::AsyncWrite for FileImpl<'a, S> { fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { FileImpl::poll_write(self, cx, buf) } diff --git a/src/encoder/sync.rs b/src/encoder/sync.rs index 9ace8bf..859714d 100644 --- a/src/encoder/sync.rs +++ b/src/encoder/sync.rs @@ -52,7 +52,7 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> { /// not allowed to use the `Waker`, as this will cause a `panic!`. 
pub fn new(output: T, metadata: &Metadata) -> io::Result { Ok(Self { - inner: poll_result_once(encoder::EncoderImpl::new(output, metadata))?, + inner: poll_result_once(encoder::EncoderImpl::new(output.into(), metadata))?, }) } @@ -64,7 +64,7 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> { metadata: &Metadata, file_name: P, file_size: u64, - ) -> io::Result> + ) -> io::Result> where 'a: 'b, { @@ -95,29 +95,21 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> { /// Create a new subdirectory. Note that the subdirectory has to be finished by calling the /// `finish()` method, otherwise the entire archive will be in an error state. - pub fn create_directory<'b, P: AsRef>( - &'b mut self, + pub fn create_directory>( + &mut self, file_name: P, metadata: &Metadata, - ) -> io::Result> - where - 'a: 'b, - { + ) -> io::Result> { Ok(Encoder { inner: poll_result_once(self.inner.create_directory(file_name.as_ref(), metadata))?, }) } /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`. - pub fn finish(self) -> io::Result { + pub fn finish(self) -> io::Result<()> { poll_result_once(self.inner.finish()) } - /// Cancel this directory and get back the contained writer. - pub fn into_writer(self) -> T { - self.inner.into_writer() - } - /// Add a symbolic link to the archive. pub fn add_symlink, PT: AsRef>( &mut self, @@ -174,18 +166,18 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> { } #[repr(transparent)] -pub struct File<'a> { - inner: encoder::FileImpl<'a>, +pub struct File<'a, S: SeqWrite> { + inner: encoder::FileImpl<'a, S>, } -impl<'a> File<'a> { +impl<'a, S: SeqWrite> File<'a, S> { /// Get the file offset to be able to reference it with `add_hardlink`. pub fn file_offset(&self) -> LinkOffset { self.inner.file_offset() } } -impl<'a> io::Write for File<'a> { +impl<'a, S: SeqWrite> io::Write for File<'a, S> { fn write(&mut self, data: &[u8]) -> io::Result { poll_result_once(self.inner.write(data)) } -- 2.20.1