From: Stefan Reiter <s.reiter@proxmox.com>
To: pve-devel@lists.proxmox.com, pbs-devel@lists.proxmox.com
Subject: [pve-devel] [PATCH proxmox-backup 6/9] backup: remove AsyncIndexReader
Date: Wed, 2 Jun 2021 16:38:30 +0200 [thread overview]
Message-ID: <20210602143833.4423-7-s.reiter@proxmox.com> (raw)
In-Reply-To: <20210602143833.4423-1-s.reiter@proxmox.com>
Superseded by CachedChunkReader, which does the same job with less code and better performance.
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/backup.rs | 3 -
src/backup/async_index_reader.rs | 215 -------------------------------
2 files changed, 218 deletions(-)
delete mode 100644 src/backup/async_index_reader.rs
diff --git a/src/backup.rs b/src/backup.rs
index 5e1147b4..7bf29a5a 100644
--- a/src/backup.rs
+++ b/src/backup.rs
@@ -257,8 +257,5 @@ pub use verify::*;
mod catalog_shell;
pub use catalog_shell::*;
-mod async_index_reader;
-pub use async_index_reader::*;
-
mod cached_chunk_reader;
pub use cached_chunk_reader::*;
diff --git a/src/backup/async_index_reader.rs b/src/backup/async_index_reader.rs
deleted file mode 100644
index 20a37e7e..00000000
--- a/src/backup/async_index_reader.rs
+++ /dev/null
@@ -1,215 +0,0 @@
-use std::future::Future;
-use std::task::{Poll, Context};
-use std::pin::Pin;
-use std::io::SeekFrom;
-
-use anyhow::Error;
-use futures::future::FutureExt;
-use futures::ready;
-use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
-
-use proxmox::sys::error::io_err_other;
-use proxmox::io_format_err;
-
-use super::IndexFile;
-use super::read_chunk::AsyncReadChunk;
-use super::index::ChunkReadInfo;
-
-type ReadFuture<S> = dyn Future<Output = Result<(S, Vec<u8>), Error>> + Send + 'static;
-
-// FIXME: This enum may not be required?
-// - Put the `WaitForData` case directly into a `read_future: Option<>`
-// - make the read loop as follows:
-// * if read_buffer is not empty:
-// use it
-// * else if read_future is there:
-// poll it
-// if read: move data to read_buffer
-// * else
-// create read future
-#[allow(clippy::enum_variant_names)]
-enum AsyncIndexReaderState<S> {
- NoData,
- WaitForData(Pin<Box<ReadFuture<S>>>),
- HaveData,
-}
-
-pub struct AsyncIndexReader<S, I: IndexFile> {
- store: Option<S>,
- index: I,
- read_buffer: Vec<u8>,
- current_chunk_offset: u64,
- current_chunk_idx: usize,
- current_chunk_info: Option<ChunkReadInfo>,
- position: u64,
- seek_to_pos: i64,
- state: AsyncIndexReaderState<S>,
-}
-
-// ok because the only public interfaces operates on &mut Self
-unsafe impl<S: Sync, I: IndexFile + Sync> Sync for AsyncIndexReader<S, I> {}
-
-impl<S: AsyncReadChunk, I: IndexFile> AsyncIndexReader<S, I> {
- pub fn new(index: I, store: S) -> Self {
- Self {
- store: Some(store),
- index,
- read_buffer: Vec::with_capacity(1024 * 1024),
- current_chunk_offset: 0,
- current_chunk_idx: 0,
- current_chunk_info: None,
- position: 0,
- seek_to_pos: 0,
- state: AsyncIndexReaderState::NoData,
- }
- }
-}
-
-impl<S, I> AsyncRead for AsyncIndexReader<S, I>
-where
- S: AsyncReadChunk + Unpin + Sync + 'static,
- I: IndexFile + Unpin,
-{
- fn poll_read(
- self: Pin<&mut Self>,
- cx: &mut Context,
- buf: &mut ReadBuf,
- ) -> Poll<tokio::io::Result<()>> {
- let this = Pin::get_mut(self);
- loop {
- match &mut this.state {
- AsyncIndexReaderState::NoData => {
- let (idx, offset) = if this.current_chunk_info.is_some() &&
- this.position == this.current_chunk_info.as_ref().unwrap().range.end
- {
- // optimization for sequential chunk read
- let next_idx = this.current_chunk_idx + 1;
- (next_idx, 0)
- } else {
- match this.index.chunk_from_offset(this.position) {
- Some(res) => res,
- None => return Poll::Ready(Ok(()))
- }
- };
-
- if idx >= this.index.index_count() {
- return Poll::Ready(Ok(()));
- }
-
- let info = this
- .index
- .chunk_info(idx)
- .ok_or_else(|| io_format_err!("could not get digest"))?;
-
- this.current_chunk_offset = offset;
- this.current_chunk_idx = idx;
- let old_info = this.current_chunk_info.replace(info.clone());
-
- if let Some(old_info) = old_info {
- if old_info.digest == info.digest {
- // hit, chunk is currently in cache
- this.state = AsyncIndexReaderState::HaveData;
- continue;
- }
- }
-
- // miss, need to download new chunk
- let store = match this.store.take() {
- Some(store) => store,
- None => {
- return Poll::Ready(Err(io_format_err!("could not find store")));
- }
- };
-
- let future = async move {
- store.read_chunk(&info.digest)
- .await
- .map(move |x| (store, x))
- };
-
- this.state = AsyncIndexReaderState::WaitForData(future.boxed());
- }
- AsyncIndexReaderState::WaitForData(ref mut future) => {
- match ready!(future.as_mut().poll(cx)) {
- Ok((store, chunk_data)) => {
- this.read_buffer = chunk_data;
- this.state = AsyncIndexReaderState::HaveData;
- this.store = Some(store);
- }
- Err(err) => {
- return Poll::Ready(Err(io_err_other(err)));
- }
- };
- }
- AsyncIndexReaderState::HaveData => {
- let offset = this.current_chunk_offset as usize;
- let len = this.read_buffer.len();
- let n = if len - offset < buf.remaining() {
- len - offset
- } else {
- buf.remaining()
- };
-
- buf.put_slice(&this.read_buffer[offset..(offset + n)]);
- this.position += n as u64;
-
- if offset + n == len {
- this.state = AsyncIndexReaderState::NoData;
- } else {
- this.current_chunk_offset += n as u64;
- this.state = AsyncIndexReaderState::HaveData;
- }
-
- return Poll::Ready(Ok(()));
- }
- }
- }
- }
-}
-
-impl<S, I> AsyncSeek for AsyncIndexReader<S, I>
-where
- S: AsyncReadChunk + Unpin + Sync + 'static,
- I: IndexFile + Unpin,
-{
- fn start_seek(
- self: Pin<&mut Self>,
- pos: SeekFrom,
- ) -> tokio::io::Result<()> {
- let this = Pin::get_mut(self);
- this.seek_to_pos = match pos {
- SeekFrom::Start(offset) => {
- offset as i64
- },
- SeekFrom::End(offset) => {
- this.index.index_bytes() as i64 + offset
- },
- SeekFrom::Current(offset) => {
- this.position as i64 + offset
- }
- };
- Ok(())
- }
-
- fn poll_complete(
- self: Pin<&mut Self>,
- _cx: &mut Context<'_>,
- ) -> Poll<tokio::io::Result<u64>> {
- let this = Pin::get_mut(self);
-
- let index_bytes = this.index.index_bytes();
- if this.seek_to_pos < 0 {
- return Poll::Ready(Err(io_format_err!("cannot seek to negative values")));
- } else if this.seek_to_pos > index_bytes as i64 {
- this.position = index_bytes;
- } else {
- this.position = this.seek_to_pos as u64;
- }
-
- // even if seeking within one chunk, we need to go to NoData to
- // recalculate the current_chunk_offset (data is cached anyway)
- this.state = AsyncIndexReaderState::NoData;
-
- Poll::Ready(Ok(this.position))
- }
-}
--
2.30.2
WARNING: multiple messages have this Message-ID
From: Stefan Reiter <s.reiter@proxmox.com>
To: pve-devel@lists.proxmox.com, pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup 6/9] backup: remove AsyncIndexReader
Date: Wed, 2 Jun 2021 16:38:30 +0200 [thread overview]
Message-ID: <20210602143833.4423-7-s.reiter@proxmox.com> (raw)
In-Reply-To: <20210602143833.4423-1-s.reiter@proxmox.com>
Superseded by CachedChunkReader, which does the same job with less code and better performance.
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/backup.rs | 3 -
src/backup/async_index_reader.rs | 215 -------------------------------
2 files changed, 218 deletions(-)
delete mode 100644 src/backup/async_index_reader.rs
diff --git a/src/backup.rs b/src/backup.rs
index 5e1147b4..7bf29a5a 100644
--- a/src/backup.rs
+++ b/src/backup.rs
@@ -257,8 +257,5 @@ pub use verify::*;
mod catalog_shell;
pub use catalog_shell::*;
-mod async_index_reader;
-pub use async_index_reader::*;
-
mod cached_chunk_reader;
pub use cached_chunk_reader::*;
diff --git a/src/backup/async_index_reader.rs b/src/backup/async_index_reader.rs
deleted file mode 100644
index 20a37e7e..00000000
--- a/src/backup/async_index_reader.rs
+++ /dev/null
@@ -1,215 +0,0 @@
-use std::future::Future;
-use std::task::{Poll, Context};
-use std::pin::Pin;
-use std::io::SeekFrom;
-
-use anyhow::Error;
-use futures::future::FutureExt;
-use futures::ready;
-use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
-
-use proxmox::sys::error::io_err_other;
-use proxmox::io_format_err;
-
-use super::IndexFile;
-use super::read_chunk::AsyncReadChunk;
-use super::index::ChunkReadInfo;
-
-type ReadFuture<S> = dyn Future<Output = Result<(S, Vec<u8>), Error>> + Send + 'static;
-
-// FIXME: This enum may not be required?
-// - Put the `WaitForData` case directly into a `read_future: Option<>`
-// - make the read loop as follows:
-// * if read_buffer is not empty:
-// use it
-// * else if read_future is there:
-// poll it
-// if read: move data to read_buffer
-// * else
-// create read future
-#[allow(clippy::enum_variant_names)]
-enum AsyncIndexReaderState<S> {
- NoData,
- WaitForData(Pin<Box<ReadFuture<S>>>),
- HaveData,
-}
-
-pub struct AsyncIndexReader<S, I: IndexFile> {
- store: Option<S>,
- index: I,
- read_buffer: Vec<u8>,
- current_chunk_offset: u64,
- current_chunk_idx: usize,
- current_chunk_info: Option<ChunkReadInfo>,
- position: u64,
- seek_to_pos: i64,
- state: AsyncIndexReaderState<S>,
-}
-
-// ok because the only public interfaces operates on &mut Self
-unsafe impl<S: Sync, I: IndexFile + Sync> Sync for AsyncIndexReader<S, I> {}
-
-impl<S: AsyncReadChunk, I: IndexFile> AsyncIndexReader<S, I> {
- pub fn new(index: I, store: S) -> Self {
- Self {
- store: Some(store),
- index,
- read_buffer: Vec::with_capacity(1024 * 1024),
- current_chunk_offset: 0,
- current_chunk_idx: 0,
- current_chunk_info: None,
- position: 0,
- seek_to_pos: 0,
- state: AsyncIndexReaderState::NoData,
- }
- }
-}
-
-impl<S, I> AsyncRead for AsyncIndexReader<S, I>
-where
- S: AsyncReadChunk + Unpin + Sync + 'static,
- I: IndexFile + Unpin,
-{
- fn poll_read(
- self: Pin<&mut Self>,
- cx: &mut Context,
- buf: &mut ReadBuf,
- ) -> Poll<tokio::io::Result<()>> {
- let this = Pin::get_mut(self);
- loop {
- match &mut this.state {
- AsyncIndexReaderState::NoData => {
- let (idx, offset) = if this.current_chunk_info.is_some() &&
- this.position == this.current_chunk_info.as_ref().unwrap().range.end
- {
- // optimization for sequential chunk read
- let next_idx = this.current_chunk_idx + 1;
- (next_idx, 0)
- } else {
- match this.index.chunk_from_offset(this.position) {
- Some(res) => res,
- None => return Poll::Ready(Ok(()))
- }
- };
-
- if idx >= this.index.index_count() {
- return Poll::Ready(Ok(()));
- }
-
- let info = this
- .index
- .chunk_info(idx)
- .ok_or_else(|| io_format_err!("could not get digest"))?;
-
- this.current_chunk_offset = offset;
- this.current_chunk_idx = idx;
- let old_info = this.current_chunk_info.replace(info.clone());
-
- if let Some(old_info) = old_info {
- if old_info.digest == info.digest {
- // hit, chunk is currently in cache
- this.state = AsyncIndexReaderState::HaveData;
- continue;
- }
- }
-
- // miss, need to download new chunk
- let store = match this.store.take() {
- Some(store) => store,
- None => {
- return Poll::Ready(Err(io_format_err!("could not find store")));
- }
- };
-
- let future = async move {
- store.read_chunk(&info.digest)
- .await
- .map(move |x| (store, x))
- };
-
- this.state = AsyncIndexReaderState::WaitForData(future.boxed());
- }
- AsyncIndexReaderState::WaitForData(ref mut future) => {
- match ready!(future.as_mut().poll(cx)) {
- Ok((store, chunk_data)) => {
- this.read_buffer = chunk_data;
- this.state = AsyncIndexReaderState::HaveData;
- this.store = Some(store);
- }
- Err(err) => {
- return Poll::Ready(Err(io_err_other(err)));
- }
- };
- }
- AsyncIndexReaderState::HaveData => {
- let offset = this.current_chunk_offset as usize;
- let len = this.read_buffer.len();
- let n = if len - offset < buf.remaining() {
- len - offset
- } else {
- buf.remaining()
- };
-
- buf.put_slice(&this.read_buffer[offset..(offset + n)]);
- this.position += n as u64;
-
- if offset + n == len {
- this.state = AsyncIndexReaderState::NoData;
- } else {
- this.current_chunk_offset += n as u64;
- this.state = AsyncIndexReaderState::HaveData;
- }
-
- return Poll::Ready(Ok(()));
- }
- }
- }
- }
-}
-
-impl<S, I> AsyncSeek for AsyncIndexReader<S, I>
-where
- S: AsyncReadChunk + Unpin + Sync + 'static,
- I: IndexFile + Unpin,
-{
- fn start_seek(
- self: Pin<&mut Self>,
- pos: SeekFrom,
- ) -> tokio::io::Result<()> {
- let this = Pin::get_mut(self);
- this.seek_to_pos = match pos {
- SeekFrom::Start(offset) => {
- offset as i64
- },
- SeekFrom::End(offset) => {
- this.index.index_bytes() as i64 + offset
- },
- SeekFrom::Current(offset) => {
- this.position as i64 + offset
- }
- };
- Ok(())
- }
-
- fn poll_complete(
- self: Pin<&mut Self>,
- _cx: &mut Context<'_>,
- ) -> Poll<tokio::io::Result<u64>> {
- let this = Pin::get_mut(self);
-
- let index_bytes = this.index.index_bytes();
- if this.seek_to_pos < 0 {
- return Poll::Ready(Err(io_format_err!("cannot seek to negative values")));
- } else if this.seek_to_pos > index_bytes as i64 {
- this.position = index_bytes;
- } else {
- this.position = this.seek_to_pos as u64;
- }
-
- // even if seeking within one chunk, we need to go to NoData to
- // recalculate the current_chunk_offset (data is cached anyway)
- this.state = AsyncIndexReaderState::NoData;
-
- Poll::Ready(Ok(this.position))
- }
-}
--
2.30.2
next prev parent reply other threads:[~2021-06-02 14:38 UTC|newest]
Thread overview: 28+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-06-02 14:38 [pve-devel] [PATCH 0/9] Improve live-restore speed and replace AsyncIndexReader Stefan Reiter
2021-06-02 14:38 ` [pbs-devel] " Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 1/9] tools/BroadcastFuture: add testcase for better understanding Stefan Reiter
2021-06-02 14:38 ` [pbs-devel] " Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 2/9] tools: add AsyncLruCache as a wrapper around sync LruCache Stefan Reiter
2021-06-02 14:38 ` [pbs-devel] " Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 3/9] backup: add CachedChunkReader utilizing AsyncLruCache Stefan Reiter
2021-06-02 14:38 ` [pbs-devel] " Stefan Reiter
2021-06-04 12:22 ` [pve-devel] " Wolfgang Bumiller
2021-06-04 12:22 ` Wolfgang Bumiller
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 4/9] backup: add AsyncRead/Seek to CachedChunkReader Stefan Reiter
2021-06-02 14:38 ` [pbs-devel] " Stefan Reiter
2021-06-04 12:30 ` [pve-devel] " Wolfgang Bumiller
2021-06-04 12:30 ` Wolfgang Bumiller
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 5/9] replace AsyncIndexReader with SeekableCachedChunkReader Stefan Reiter
2021-06-02 14:38 ` [pbs-devel] " Stefan Reiter
2021-06-02 14:38 ` Stefan Reiter [this message]
2021-06-02 14:38 ` [pbs-devel] [PATCH proxmox-backup 6/9] backup: remove AsyncIndexReader Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 7/9] tools/lru_cache: make minimum capacity 1 Stefan Reiter
2021-06-02 14:38 ` [pbs-devel] " Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup-qemu 8/9] add shared_cache module Stefan Reiter
2021-06-02 14:38 ` [pbs-devel] " Stefan Reiter
2021-06-04 12:16 ` [pve-devel] " Wolfgang Bumiller
2021-06-04 12:16 ` Wolfgang Bumiller
2021-06-07 8:03 ` [pve-devel] " Stefan Reiter
2021-06-07 8:03 ` Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup-qemu 9/9] access: use CachedChunkReader Stefan Reiter
2021-06-02 14:38 ` [pbs-devel] " Stefan Reiter
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20210602143833.4423-7-s.reiter@proxmox.com \
--to=s.reiter@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
--cc=pve-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.