public inbox for pbs-devel@lists.proxmox.com
From: "Fabian Grünbichler" <f.gruenbichler@proxmox.com>
To: Proxmox Backup Server development discussion
	<pbs-devel@lists.proxmox.com>
Subject: Re: [pbs-devel] [RFC v2 proxmox-backup 33/36] client: pxar: add look-ahead caching
Date: Tue, 12 Mar 2024 15:08:49 +0100	[thread overview]
Message-ID: <1710246197.nrcmjqhopa.astroid@yuna.none> (raw)
In-Reply-To: <20240305092703.126906-34-c.ebner@proxmox.com>

On March 5, 2024 10:27 am, Christian Ebner wrote:
> Implements the methods to cache entries in a look-ahead cache and
> flush the entries to archive, either by re-using and injecting the
> payload chunks from the previous backup snapshot and storing the
> reference to it, or by re-encoding the chunks.

this is a bit terse for the amount of code below ;)

> 
> Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
> ---
> changes since version 1:
> - fix flushing of final chunk before archive finish
> - fix formatting
> - remove unneeded log output
> 
>  pbs-client/src/pxar/create.rs | 293 ++++++++++++++++++++++++++++++++++
>  1 file changed, 293 insertions(+)
> 
> diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
> index 3b221b54..b2ce898f 100644
> --- a/pbs-client/src/pxar/create.rs
> +++ b/pbs-client/src/pxar/create.rs
> @@ -828,6 +828,299 @@ impl Archiver {
>          }
>      }
>  
> +    async fn cache_or_flush_entries<T: SeqWrite + Send>(

this is only called if we have a previous payload

> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,

this must always be set for the call to make any sense, so we can
drop the Option<>.

> +        c_file_name: &CStr,
> +        stat: &FileStat,
> +        fd: OwnedFd,
> +        metadata: &Metadata,
> +    ) -> Result<(), Error> {
> +        let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
> +        let reusable = if let Some(accessor) = accessor {
> +            self.is_reusable_entry(accessor, file_name, stat, metadata)
> +                .await?

then this part here

> +        } else {
> +            None
> +        };
> +
> +        let file_size = stat.st_size as u64;
> +        if let Some(start_offset) = reusable {

can just be inlined here
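i.e. something like this (untested sketch with made-up stand-in types,
just to illustrate the shape once the Option<> is gone and the lookup is
inlined):

```rust
// stand-ins for the real Archiver/Directory/FileStat types, only here
// to show the suggested control flow
struct Accessor;
struct Stat {
    st_size: i64,
}

// pretend is_reusable_entry, always finds a start offset in this toy
fn is_reusable_entry(_accessor: &mut Accessor, _stat: &Stat) -> Option<u64> {
    Some(42)
}

fn cache_or_flush_entries(accessor: &mut Accessor, stat: &Stat) -> Option<(u64, u64)> {
    // no intermediate `let reusable = if let Some(accessor) = ...` needed,
    // the lookup result feeds the `if let` directly
    if let Some(start_offset) = is_reusable_entry(accessor, stat) {
        let file_size = stat.st_size as u64;
        let end_offset = start_offset + file_size;
        return Some((start_offset, end_offset));
    }
    None
}

fn main() {
    let mut accessor = Accessor;
    let stat = Stat { st_size: 8 };
    assert_eq!(cache_or_flush_entries(&mut accessor, &stat), Some((42, 50)));
    println!("reusable range: {:?}", cache_or_flush_entries(&mut accessor, &stat));
}
```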

> +            if let Some(ref ref_payload_index) = self.previous_payload_index {
> +                let end_offset = start_offset + file_size;
> +                let (indices, start_padding, _end_padding) =
> +                    ref_payload_index.indices(start_offset, end_offset)?;

as already noted at the patch introducing `indices`: this is the only call
site, and one of the tuple members is not used.

> +
> +                let boundary = encoder.payload_position()?;
> +                let offset = self.reused_chunks.insert(indices, boundary, start_padding);
> +
> +                if self.cached_payload_size + file_size >= CACHED_PAYLOAD_THRESHOLD {
> +                    self.flush_cached_to_archive(encoder, true, true).await?;
> +

this vvvvvv

> +                    encoder
> +                        .add_payload_ref(metadata, file_name, file_size, offset)
> +                        .await?;
> +
> +                    if let Some(ref catalog) = self.catalog {
> +                        catalog
> +                            .lock()
> +                            .unwrap()
> +                            .add_file(&c_file_name, file_size, stat.st_mtime)?;
> +                    }

    ^^^^^^^

this also happens in self.flush_cached_to_archive -> self.flush_entry_to_archive
since we pass reuse_chunks, so couldn't we just flush above and then continue
with the else branch below to cache this new entry, instead of inline-adding
this one entry and bypassing the cache?
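i.e. roughly (untested toy model of the suggested control flow, all the
real encoder/catalog details elided):

```rust
// toy model: when the threshold would be exceeded, flush first, then fall
// through to the normal caching path instead of encoding this entry inline
struct Cache {
    cached_payload_size: u64,
    cached_entries: Vec<u64>,
}

const CACHED_PAYLOAD_THRESHOLD: u64 = 100;

fn cache_entry(cache: &mut Cache, file_size: u64) {
    if cache.cached_payload_size + file_size >= CACHED_PAYLOAD_THRESHOLD {
        // flush_cached_to_archive(..) stand-in
        cache.cached_entries.clear();
        cache.cached_payload_size = 0;
    }
    // single caching path, no inline add_payload_ref special case
    cache.cached_entries.push(file_size);
    cache.cached_payload_size += file_size;
}

fn main() {
    let mut cache = Cache {
        cached_payload_size: 90,
        cached_entries: vec![10, 80],
    };
    cache_entry(&mut cache, 20); // crosses the threshold: flush, then cache
    assert_eq!(cache.cached_entries, vec![20]);
    assert_eq!(cache.cached_payload_size, 20);
    println!("entries after flush: {:?}", cache.cached_entries);
}
```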

> +                } else {
> +                    self.caching_enabled = true;
> +                    self.cached_payload_size += file_size;
> +                    let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
> +                        fd,
> +                        c_file_name.into(),
> +                        stat.clone(),
> +                        metadata.clone(),
> +                        offset,
> +                    ));
> +                    self.cached_entries.push(cache_entry);
> +                }
> +
> +                return Ok(());
> +            }
> +        }
> +
> +        self.flush_cached_to_archive(encoder, false, true).await?;
> +        self.add_entry_to_archive(encoder, accessor, c_file_name, stat, fd, &metadata)
> +            .await
> +    }
> +
> +    async fn flush_cached_to_archive<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        reuse_chunks: bool,
> +        keep_back_last_chunk: bool,
> +    ) -> Result<(), Error> {
> +        if reuse_chunks {
> +            self.flush_reused_chunks(encoder, keep_back_last_chunk)?;
> +        } else {
> +            self.clear_cached_chunks(encoder)?;
> +        }
> +        let entries = std::mem::take(&mut self.cached_entries);
> +
> +        self.caching_enabled = false;
> +        self.cached_payload_size = 0;
> +
> +        for entry in entries {
> +            match entry {
> +                CacheEntry::RegEntry(data) => {
> +                    self.flush_entry_to_archive(encoder, data, reuse_chunks)
> +                        .await?
> +                }
> +                CacheEntry::PxarExcludeCliEntry(entry, old_patterns_count) => {
> +                    self.encode_pxarexclude_cli(encoder, &entry.name, old_patterns_count)
> +                        .await?;
> +                }
> +                CacheEntry::DirEntry(data) => {
> +                    self.flush_directory_to_archive(encoder, data).await?
> +                }
> +                CacheEntry::DirEnd => {
> +                    let result = encoder.finish().await?;

nit: result is `()`, there's no point in keeping it

> +                    if let Some(ref catalog) = self.catalog {
> +                        catalog.lock().unwrap().end_directory()?;
> +                    }
> +                    result

nor is there in returning it here

> +                }
> +            }

especially since the whole match result is discarded anyway ;)
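so the arm could just propagate the error and drop both the binding and
the trailing expression, along these lines (untested toy, stand-ins for
encoder/catalog):

```rust
// sketch: finish() returns Result<(), _>, so there is nothing worth
// binding or returning from the match arm
fn finish() -> Result<(), String> {
    Ok(())
}

fn dir_end(catalog_present: bool) -> Result<(), String> {
    finish()?;
    if catalog_present {
        // catalog.lock().unwrap().end_directory() stand-in
    }
    Ok(())
}

fn main() {
    assert_eq!(dir_end(true), Ok(()));
    assert_eq!(dir_end(false), Ok(()));
    println!("ok");
}
```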

> +        }
> +
> +        Ok(())
> +    }
> +
> +    fn flush_reused_chunks<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        keep_back_last_chunk: bool,
> +    ) -> Result<(), Error> {
> +        let mut reused_chunks = std::mem::take(&mut self.reused_chunks);
> +
> +        // Do not inject the last reused chunk directly, but keep it as base for further entries
> +        // to reduce chunk duplication. Needs to be flushed even on cache clear!
> +        let last_chunk = if keep_back_last_chunk {
> +            reused_chunks.chunks.pop()
> +        } else {
> +            None
> +        };
> +
> +        let mut injection_boundary = reused_chunks.start_boundary();
> +        for chunks in reused_chunks.chunks.chunks(128) {
> +            let size = chunks.iter().fold(0u64, |sum, chunk| sum + chunk.size());
> +            let inject_chunks = InjectChunks {
> +                boundary: injection_boundary.raw(),
> +                chunks: chunks.to_vec(),
> +                size: size as usize,
> +            };
> +            let mut boundary = self.forced_boundaries.lock().unwrap();
> +            boundary.push_back(inject_chunks);
> +            injection_boundary = injection_boundary.add(size);
> +            encoder.advance(size)?;
> +        }
> +
> +        if let Some(chunk) = last_chunk {
> +            let _offset = self
> +                .reused_chunks
> +                .insert(vec![chunk], injection_boundary, 0);
> +            // Make sure that we flush this chunk even on clear calls
> +            self.reused_chunks.must_flush_first = true;
> +        }
> +
> +        Ok(())
> +    }
> +
> +    fn clear_cached_chunks<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +    ) -> Result<(), Error> {
> +        let reused_chunks = std::mem::take(&mut self.reused_chunks);

this might deserve a comment or a more explicit call ;) took me a while
to follow along..
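for context, `std::mem::take` leaves `Default::default()` behind and
hands back the previous value, which is what makes this line the actual
"clear" despite not looking like one:

```rust
fn main() {
    let mut reused_chunks: Vec<u64> = vec![1, 2, 3];
    // take() swaps in Vec::default() (i.e. empty) and returns the old contents
    let taken = std::mem::take(&mut reused_chunks);
    assert_eq!(taken, vec![1, 2, 3]);
    assert!(reused_chunks.is_empty()); // the field is now cleared
    println!("took {} chunks, {} left", taken.len(), reused_chunks.len());
}
```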
> +
> +        if !reused_chunks.must_flush_first {
> +            return Ok(());
> +        }
> +
> +        // First chunk was kept back to avoid duplication but needs to be injected
> +        let injection_boundary = reused_chunks.start_boundary();
> +        if let Some(chunk) = reused_chunks.chunks.first() {
> +            let size = chunk.size();
> +            let inject_chunks = InjectChunks {
> +                boundary: injection_boundary.raw(),
> +                chunks: vec![chunk.clone()],
> +                size: size as usize,
> +            };
> +            let mut boundary = self.forced_boundaries.lock().unwrap();
> +            boundary.push_back(inject_chunks);
> +            encoder.advance(size)?;
> +        } else {
> +            bail!("missing first chunk");
> +        }
> +
> +        Ok(())
> +    }
> +
> +    async fn flush_directory_to_archive<'a, 'b, T: SeqWrite + Send>(
> +        &'a mut self,
> +        encoder: &'a mut Encoder<'b, T>,
> +        entry_data: CacheEntryData,
> +    ) -> Result<(), Error> {
> +        let CacheEntryData {
> +            c_file_name,
> +            metadata,
> +            ..
> +        } = entry_data;
> +        let dir_name = OsStr::from_bytes(c_file_name.to_bytes());
> +
> +        if let Some(ref catalog) = self.catalog {
> +            catalog.lock().unwrap().start_directory(&c_file_name)?;
> +        }
> +
> +        encoder.create_directory(dir_name, &metadata).await?;
> +
> +        Ok(())
> +    }
> +
> +    async fn flush_entry_to_archive<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        entry_data: CacheEntryData,
> +        reuse_chunks: bool,
> +    ) -> Result<(), Error> {
> +        use pxar::format::mode;
> +
> +        let CacheEntryData {
> +            fd,
> +            c_file_name,
> +            stat,
> +            metadata,
> +            payload_offset,
> +        } = entry_data;
> +        let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();

starting from here this is almost 100% identical to add_entry_to_archive;
could we maybe somehow merge the two?
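e.g. (hypothetical sketch, names made up) both could funnel into one
shared helper that takes the payload offset as an Option:

```rust
// hypothetical shared encode path: Some(offset) means "emit a payload
// ref to reused chunks", None means "re-encode the file contents"
fn encode_regular_file(payload_offset: Option<u64>) -> String {
    match payload_offset {
        Some(offset) => format!("add_payload_ref at {}", offset),
        None => "add_regular_file".to_string(),
    }
}

fn main() {
    // flush_entry_to_archive with reuse_chunks would pass Some(..)
    assert_eq!(encode_regular_file(Some(128)), "add_payload_ref at 128");
    // add_entry_to_archive (and the non-reuse flush) would pass None
    assert_eq!(encode_regular_file(None), "add_regular_file");
    println!("ok");
}
```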

> +
> +        match metadata.file_type() {
> +            mode::IFREG => {
> +                let link_info = HardLinkInfo {
> +                    st_dev: stat.st_dev,
> +                    st_ino: stat.st_ino,
> +                };
> +
> +                if stat.st_nlink > 1 {
> +                    if let Some((path, offset)) = self.hardlinks.get(&link_info) {
> +                        if let Some(ref catalog) = self.catalog {
> +                            catalog.lock().unwrap().add_hardlink(&c_file_name)?;
> +                        }
> +                        encoder.add_hardlink(file_name, path, *offset).await?;
> +                        return Ok(());
> +                    }
> +                }
> +
> +                let file_size = stat.st_size as u64;
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog
> +                        .lock()
> +                        .unwrap()
> +                        .add_file(&c_file_name, file_size, stat.st_mtime)?;
> +                }
> +
> +                if reuse_chunks {
> +                    encoder
> +                        .add_payload_ref(&metadata, file_name, file_size, payload_offset)
> +                        .await?;
> +                } else {
> +                    let offset: LinkOffset = self
> +                        .add_regular_file(encoder, fd, file_name, &metadata, file_size)
> +                        .await?;
> +
> +                    if stat.st_nlink > 1 {
> +                        self.hardlinks
> +                            .insert(link_info, (self.path.clone(), offset));
> +                    }
> +                }
> +            }
> +            mode::IFSOCK => {
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().add_socket(&c_file_name)?;
> +                }
> +                encoder.add_socket(&metadata, file_name).await?;
> +            }
> +            mode::IFIFO => {
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().add_fifo(&c_file_name)?;
> +                }
> +                encoder.add_fifo(&metadata, file_name).await?;
> +            }
> +            mode::IFLNK => {
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().add_symlink(&c_file_name)?;
> +                }
> +                self.add_symlink(encoder, fd, file_name, &metadata).await?;
> +            }
> +            mode::IFBLK => {
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().add_block_device(&c_file_name)?;
> +                }
> +                self.add_device(encoder, file_name, &metadata, &stat)
> +                    .await?;
> +            }
> +            mode::IFCHR => {
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().add_char_device(&c_file_name)?;
> +                }
> +                self.add_device(encoder, file_name, &metadata, &stat)
> +                    .await?;
> +            }
> +            other => bail!(
> +                "encountered unknown file type: 0x{:x} (0o{:o})",
> +                other,
> +                other
> +            ),
> +        }
> +
> +        Ok(())
> +    }
> +
>      async fn add_directory<T: SeqWrite + Send>(
>          &mut self,
>          encoder: &mut Encoder<'_, T>,
> -- 
> 2.39.2
> 



