From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 46EFC95D83 for ; Wed, 28 Feb 2024 15:02:52 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 41B36CF13 for ; Wed, 28 Feb 2024 15:02:51 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Wed, 28 Feb 2024 15:02:47 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 56256479E4 for ; Wed, 28 Feb 2024 15:02:47 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Wed, 28 Feb 2024 15:01:55 +0100 Message-Id: <20240228140226.1251979-6-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20240228140226.1251979-1-c.ebner@proxmox.com> References: <20240228140226.1251979-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.048 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record T_SCC_BODY_TEXT_LINE -0.01 - Subject: [pbs-devel] [RFC pxar 05/36] accessor: add optional payload input stream X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: 
Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Wed, 28 Feb 2024 14:02:52 -0000 Allow reading regular file payloads from a split pxar archive, where the payload stream was redirected to a separate archive on creation. Signed-off-by: Christian Ebner --- src/accessor/aio.rs | 7 ++++ src/accessor/mod.rs | 85 +++++++++++++++++++++++++++++++++++++++------ 2 files changed, 82 insertions(+), 10 deletions(-) diff --git a/src/accessor/aio.rs b/src/accessor/aio.rs index 98d7755..db6c5e4 100644 --- a/src/accessor/aio.rs +++ b/src/accessor/aio.rs @@ -91,6 +91,13 @@ impl Accessor { }) } + /// Take the file payloads from the provided input stream rather than the regular pxar stream + pub fn redirect_payload_input(self, payload_input: T) -> Self { + Self { + inner: self.inner.redirect_payload_input(payload_input), + } + } + /// Open a directory handle to the root of the pxar archive. pub async fn open_root_ref(&self) -> io::Result> { Ok(Directory::new(self.inner.open_root_ref().await?)) diff --git a/src/accessor/mod.rs b/src/accessor/mod.rs index ed99c85..6b3dfd2 100644 --- a/src/accessor/mod.rs +++ b/src/accessor/mod.rs @@ -182,6 +182,7 @@ pub(crate) struct AccessorImpl { input: T, size: u64, caches: Arc, + payload_input: Option, } impl AccessorImpl { @@ -194,9 +195,15 @@ impl AccessorImpl { input, size, caches: Arc::new(Caches::default()), + payload_input: None, }) } + pub fn redirect_payload_input(mut self, payload_input: T) -> Self { + self.payload_input = Some(payload_input); + self + } + pub fn size(&self) -> u64 { self.size } @@ -207,6 +214,9 @@ impl AccessorImpl { self.size, "/".into(), Arc::clone(&self.caches), + self.payload_input + .as_ref() + .map(|input| input as &dyn ReadAt), ) .await } @@ -227,8 +237,21 @@ async fn get_decoder( input: T, entry_range: Range, path: PathBuf, + payload_input: Option, ) -> io::Result>> { - 
DecoderImpl::new_full(SeqReadAtAdapter::new(input, entry_range), path, true).await + let mut decoder = DecoderImpl::new_full( + SeqReadAtAdapter::new(input, entry_range.clone()), + path, + true, + ) + .await?; + + if let Some(payload_input) = payload_input { + // Payload stream is just passed along, the range can therefore be zero + decoder = decoder.redirect_payload_input(SeqReadAtAdapter::new(payload_input, 0..0)); + } + + Ok(decoder) } // NOTE: This performs the Decoder::read_next_item() behavior! Keep in mind when changing! @@ -236,6 +259,7 @@ async fn get_decoder_at_filename( input: T, entry_range: Range, path: PathBuf, + payload_input: Option, ) -> io::Result<(DecoderImpl>, u64)> { // Read the header, it should be a FILENAME, then skip over it and its length: let header: format::Header = read_entry_at(&input, entry_range.start).await?; @@ -251,7 +275,7 @@ async fn get_decoder_at_filename( } Ok(( - get_decoder(input, entry_offset..entry_range.end, path).await?, + get_decoder(input, entry_offset..entry_range.end, path, payload_input).await?, entry_offset, )) } @@ -263,6 +287,7 @@ impl AccessorImpl { self.size, "/".into(), Arc::clone(&self.caches), + self.payload_input.clone(), ) .await } @@ -274,6 +299,7 @@ impl AccessorImpl { offset, "/".into(), Arc::clone(&self.caches), + self.payload_input.clone(), ) .await } @@ -287,23 +313,30 @@ impl AccessorImpl { self.input.clone(), entry_range_info.entry_range.clone(), PathBuf::new(), + self.payload_input.clone(), ) .await?; let entry = decoder .next() .await .ok_or_else(|| io_format_err!("unexpected EOF while decoding file entry"))??; + Ok(FileEntryImpl { input: self.input.clone(), entry, entry_range_info: entry_range_info.clone(), caches: Arc::clone(&self.caches), + payload_input: self.payload_input.clone(), }) } /// Allow opening arbitrary contents from a specific range. 
pub unsafe fn open_contents_at_range(&self, range: Range) -> FileContentsImpl { - FileContentsImpl::new(self.input.clone(), range) + if let Some(payload_input) = &self.payload_input { + FileContentsImpl::new(payload_input.clone(), range) + } else { + FileContentsImpl::new(self.input.clone(), range) + } } /// Following a hardlink breaks a couple of conventions we otherwise have, particularly we will @@ -326,9 +359,13 @@ impl AccessorImpl { let link_offset = entry_file_offset - link_offset; - let (mut decoder, entry_offset) = - get_decoder_at_filename(self.input.clone(), link_offset..self.size, PathBuf::new()) - .await?; + let (mut decoder, entry_offset) = get_decoder_at_filename( + self.input.clone(), + link_offset..self.size, + PathBuf::new(), + self.payload_input.clone(), + ) + .await?; let entry = decoder .next() @@ -354,6 +391,7 @@ impl AccessorImpl { entry_range: entry_offset..entry_end, }, caches: Arc::clone(&self.caches), + payload_input: self.payload_input.clone(), }) } _ => io_bail!("hardlink does not point to a regular file"), @@ -370,6 +408,7 @@ pub(crate) struct DirectoryImpl { table: Arc<[GoodbyeItem]>, path: PathBuf, caches: Arc, + payload_input: Option, } impl DirectoryImpl { @@ -379,6 +418,7 @@ impl DirectoryImpl { end_offset: u64, path: PathBuf, caches: Arc, + payload_input: Option, ) -> io::Result> { let tail = Self::read_tail_entry(&input, end_offset).await?; @@ -408,6 +448,7 @@ impl DirectoryImpl { table: table.as_ref().map_or_else(|| Arc::new([]), Arc::clone), path, caches, + payload_input, }; // sanity check: @@ -503,6 +544,7 @@ impl DirectoryImpl { None => self.path.clone(), Some(file) => self.path.join(file), }, + self.payload_input.clone(), ) .await } @@ -534,6 +576,7 @@ impl DirectoryImpl { entry_range: self.entry_range(), }, caches: Arc::clone(&self.caches), + payload_input: self.payload_input.clone(), }) } @@ -686,6 +729,7 @@ pub(crate) struct FileEntryImpl { entry: Entry, entry_range_info: EntryRangeInfo, caches: Arc, + payload_input: 
Option, } impl FileEntryImpl { @@ -699,6 +743,7 @@ impl FileEntryImpl { self.entry_range_info.entry_range.end, self.entry.path.clone(), Arc::clone(&self.caches), + self.payload_input.clone(), ) .await } @@ -711,16 +756,35 @@ impl FileEntryImpl { } EntryKind::File { size, - offset: Some(offset), - .. - } => Ok(Some(offset..(offset + size))), + offset, + payload_offset, + } => { + // Payload offset will be some for PXAR_PAYLOAD_REF's + // It should win over the regular offset, since the actual payloads + // are stored in the separated payload input stream + if let Some(payload_offset) = payload_offset { + return Ok(Some(payload_offset..(payload_offset + size))); + } + + if let Some(offset) = offset { + return Ok(Some(offset..(offset + size))); + } + + Ok(None) + } _ => Ok(None), } } pub async fn contents(&self) -> io::Result> { match self.content_range()? { - Some(range) => Ok(FileContentsImpl::new(self.input.clone(), range)), + Some(range) => { + if let Some(ref payload_input) = self.payload_input { + Ok(FileContentsImpl::new(payload_input.clone(), range)) + } else { + Ok(FileContentsImpl::new(self.input.clone(), range)) + } + } None => io_bail!("not a file"), } } @@ -810,6 +874,7 @@ impl<'a, T: Clone + ReadAt> DirEntryImpl<'a, T> { entry, entry_range_info: self.entry_range_info.clone(), caches: Arc::clone(&self.caches), + payload_input: self.dir.payload_input.clone(), }) } -- 2.39.2