public inbox for pve-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Stefan Reiter <s.reiter@proxmox.com>
To: pve-devel@lists.proxmox.com, pbs-devel@lists.proxmox.com
Subject: [pve-devel] [PATCH v2 proxmox-backup 6/9] backup: remove AsyncIndexReader
Date: Mon,  7 Jun 2021 17:35:29 +0200	[thread overview]
Message-ID: <20210607153532.2522267-7-s.reiter@proxmox.com> (raw)
In-Reply-To: <20210607153532.2522267-1-s.reiter@proxmox.com>

AsyncIndexReader is superseded by CachedChunkReader, which needs less code and is faster.

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
 src/backup.rs                    |   3 -
 src/backup/async_index_reader.rs | 215 -------------------------------
 2 files changed, 218 deletions(-)
 delete mode 100644 src/backup/async_index_reader.rs

diff --git a/src/backup.rs b/src/backup.rs
index 5e1147b4..7bf29a5a 100644
--- a/src/backup.rs
+++ b/src/backup.rs
@@ -257,8 +257,5 @@ pub use verify::*;
 mod catalog_shell;
 pub use catalog_shell::*;
 
-mod async_index_reader;
-pub use async_index_reader::*;
-
 mod cached_chunk_reader;
 pub use cached_chunk_reader::*;
diff --git a/src/backup/async_index_reader.rs b/src/backup/async_index_reader.rs
deleted file mode 100644
index 20a37e7e..00000000
--- a/src/backup/async_index_reader.rs
+++ /dev/null
@@ -1,215 +0,0 @@
-use std::future::Future;
-use std::task::{Poll, Context};
-use std::pin::Pin;
-use std::io::SeekFrom;
-
-use anyhow::Error;
-use futures::future::FutureExt;
-use futures::ready;
-use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
-
-use proxmox::sys::error::io_err_other;
-use proxmox::io_format_err;
-
-use super::IndexFile;
-use super::read_chunk::AsyncReadChunk;
-use super::index::ChunkReadInfo;
-
-type ReadFuture<S> = dyn Future<Output = Result<(S, Vec<u8>), Error>> + Send + 'static;
-
-// FIXME: This enum may not be required?
-// - Put the `WaitForData` case directly into a `read_future: Option<>`
-// - make the read loop as follows:
-//   * if read_buffer is not empty:
-//        use it
-//   * else if read_future is there:
-//        poll it
-//        if read: move data to read_buffer
-//   * else
-//        create read future
-#[allow(clippy::enum_variant_names)]
-enum AsyncIndexReaderState<S> {
-    NoData,
-    WaitForData(Pin<Box<ReadFuture<S>>>),
-    HaveData,
-}
-
-pub struct AsyncIndexReader<S, I: IndexFile> {
-    store: Option<S>,
-    index: I,
-    read_buffer: Vec<u8>,
-    current_chunk_offset: u64,
-    current_chunk_idx: usize,
-    current_chunk_info: Option<ChunkReadInfo>,
-    position: u64,
-    seek_to_pos: i64,
-    state: AsyncIndexReaderState<S>,
-}
-
-// ok because the only public interfaces operates on &mut Self
-unsafe impl<S: Sync, I: IndexFile + Sync> Sync for AsyncIndexReader<S, I> {}
-
-impl<S: AsyncReadChunk, I: IndexFile> AsyncIndexReader<S, I> {
-    pub fn new(index: I, store: S) -> Self {
-        Self {
-            store: Some(store),
-            index,
-            read_buffer: Vec::with_capacity(1024 * 1024),
-            current_chunk_offset: 0,
-            current_chunk_idx: 0,
-            current_chunk_info: None,
-            position: 0,
-            seek_to_pos: 0,
-            state: AsyncIndexReaderState::NoData,
-        }
-    }
-}
-
-impl<S, I> AsyncRead for AsyncIndexReader<S, I>
-where
-    S: AsyncReadChunk + Unpin + Sync + 'static,
-    I: IndexFile + Unpin,
-{
-    fn poll_read(
-        self: Pin<&mut Self>,
-        cx: &mut Context,
-        buf: &mut ReadBuf,
-    ) -> Poll<tokio::io::Result<()>> {
-        let this = Pin::get_mut(self);
-        loop {
-            match &mut this.state {
-                AsyncIndexReaderState::NoData => {
-                    let (idx, offset) = if this.current_chunk_info.is_some() &&
-                        this.position == this.current_chunk_info.as_ref().unwrap().range.end
-                    {
-                        // optimization for sequential chunk read
-                        let next_idx = this.current_chunk_idx + 1;
-                        (next_idx, 0)
-                    } else {
-                        match this.index.chunk_from_offset(this.position) {
-                            Some(res) => res,
-                            None => return Poll::Ready(Ok(()))
-                        }
-                    };
-
-                    if idx >= this.index.index_count() {
-                        return Poll::Ready(Ok(()));
-                    }
-
-                    let info = this
-                        .index
-                        .chunk_info(idx)
-                        .ok_or_else(|| io_format_err!("could not get digest"))?;
-
-                    this.current_chunk_offset = offset;
-                    this.current_chunk_idx = idx;
-                    let old_info = this.current_chunk_info.replace(info.clone());
-
-                    if let Some(old_info) = old_info {
-                        if old_info.digest == info.digest {
-                            // hit, chunk is currently in cache
-                            this.state = AsyncIndexReaderState::HaveData;
-                            continue;
-                        }
-                    }
-
-                    // miss, need to download new chunk
-                    let store = match this.store.take() {
-                        Some(store) => store,
-                        None => {
-                            return Poll::Ready(Err(io_format_err!("could not find store")));
-                        }
-                    };
-
-                    let future = async move {
-                        store.read_chunk(&info.digest)
-                            .await
-                            .map(move |x| (store, x))
-                    };
-
-                    this.state = AsyncIndexReaderState::WaitForData(future.boxed());
-                }
-                AsyncIndexReaderState::WaitForData(ref mut future) => {
-                    match ready!(future.as_mut().poll(cx)) {
-                        Ok((store, chunk_data)) => {
-                            this.read_buffer = chunk_data;
-                            this.state = AsyncIndexReaderState::HaveData;
-                            this.store = Some(store);
-                        }
-                        Err(err) => {
-                            return Poll::Ready(Err(io_err_other(err)));
-                        }
-                    };
-                }
-                AsyncIndexReaderState::HaveData => {
-                    let offset = this.current_chunk_offset as usize;
-                    let len = this.read_buffer.len();
-                    let n = if len - offset < buf.remaining() {
-                        len - offset
-                    } else {
-                        buf.remaining()
-                    };
-
-                    buf.put_slice(&this.read_buffer[offset..(offset + n)]);
-                    this.position += n as u64;
-
-                    if offset + n == len {
-                        this.state = AsyncIndexReaderState::NoData;
-                    } else {
-                        this.current_chunk_offset += n as u64;
-                        this.state = AsyncIndexReaderState::HaveData;
-                    }
-
-                    return Poll::Ready(Ok(()));
-                }
-            }
-        }
-    }
-}
-
-impl<S, I> AsyncSeek for AsyncIndexReader<S, I>
-where
-    S: AsyncReadChunk + Unpin + Sync + 'static,
-    I: IndexFile + Unpin,
-{
-    fn start_seek(
-        self: Pin<&mut Self>,
-        pos: SeekFrom,
-    ) -> tokio::io::Result<()> {
-        let this = Pin::get_mut(self);
-        this.seek_to_pos = match pos {
-            SeekFrom::Start(offset) => {
-                offset as i64
-            },
-            SeekFrom::End(offset) => {
-                this.index.index_bytes() as i64 + offset
-            },
-            SeekFrom::Current(offset) => {
-                this.position as i64 + offset
-            }
-        };
-        Ok(())
-    }
-
-    fn poll_complete(
-        self: Pin<&mut Self>,
-        _cx: &mut Context<'_>,
-    ) -> Poll<tokio::io::Result<u64>> {
-        let this = Pin::get_mut(self);
-
-        let index_bytes = this.index.index_bytes();
-        if this.seek_to_pos < 0 {
-            return Poll::Ready(Err(io_format_err!("cannot seek to negative values")));
-        } else if this.seek_to_pos > index_bytes as i64 {
-            this.position = index_bytes;
-        } else {
-            this.position = this.seek_to_pos as u64;
-        }
-
-        // even if seeking within one chunk, we need to go to NoData to
-        // recalculate the current_chunk_offset (data is cached anyway)
-        this.state = AsyncIndexReaderState::NoData;
-
-        Poll::Ready(Ok(this.position))
-    }
-}
-- 
2.30.2





  parent reply	other threads:[~2021-06-07 15:36 UTC|newest]

Thread overview: 11+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-06-07 15:35 [pve-devel] [PATCH v2 0/9] Improve live-restore speed and replace AsyncIndexReader Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 1/9] tools/BroadcastFuture: add testcase for better understanding Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 2/9] tools: add AsyncLruCache as a wrapper around sync LruCache Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 3/9] backup: add CachedChunkReader utilizing AsyncLruCache Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 4/9] backup: add AsyncRead/Seek to CachedChunkReader Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 5/9] replace AsyncIndexReader with SeekableCachedChunkReader Stefan Reiter
2021-06-07 15:35 ` Stefan Reiter [this message]
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 7/9] tools/lru_cache: make minimum capacity 1 Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup-qemu 8/9] add shared_cache module Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup-qemu 9/9] access: use CachedChunkReader Stefan Reiter
2021-06-08  7:58 ` [pve-devel] applied: [PATCH v2 0/9] Improve live-restore speed and replace AsyncIndexReader Wolfgang Bumiller

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210607153532.2522267-7-s.reiter@proxmox.com \
    --to=s.reiter@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    --cc=pve-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal