all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Stefan Reiter <s.reiter@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH v2 backup 2/5] implement AsyncSeek for AsyncIndexReader
Date: Wed, 22 Jul 2020 15:56:22 +0200	[thread overview]
Message-ID: <20200722135625.23653-3-s.reiter@proxmox.com> (raw)
In-Reply-To: <20200722135625.23653-1-s.reiter@proxmox.com>

Requires updating the AsyncRead implementation to cope with byte-wise
seeks to intra-chunk positions.

Uses chunk_from_offset to get locations within chunks, but tries to
avoid it for sequential reads so that their performance does not regress.

AsyncSeek stores the seek target in the temporary seek_to_pos, so the
current position stays unchanged when an invalid seek is given and
poll_complete has to return an error.

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
 src/backup/async_index_reader.rs | 116 +++++++++++++++++++++++++------
 src/backup/index.rs              |   1 +
 2 files changed, 97 insertions(+), 20 deletions(-)

diff --git a/src/backup/async_index_reader.rs b/src/backup/async_index_reader.rs
index 0911375e..98372aa1 100644
--- a/src/backup/async_index_reader.rs
+++ b/src/backup/async_index_reader.rs
@@ -1,30 +1,35 @@
 use std::future::Future;
 use std::task::{Poll, Context};
 use std::pin::Pin;
+use std::io::SeekFrom;
 
 use anyhow::Error;
 use futures::future::FutureExt;
 use futures::ready;
-use tokio::io::AsyncRead;
+use tokio::io::{AsyncRead, AsyncSeek};
 
 use proxmox::sys::error::io_err_other;
 use proxmox::io_format_err;
 
 use super::IndexFile;
 use super::read_chunk::AsyncReadChunk;
+use super::index::ChunkReadInfo;
 
 enum AsyncIndexReaderState<S> {
     NoData,
     WaitForData(Pin<Box<dyn Future<Output = Result<(S, Vec<u8>), Error>> + Send + 'static>>),
-    HaveData(usize),
+    HaveData, // read_buffer holds the current chunk; read from current_chunk_offset
 }
 
 pub struct AsyncIndexReader<S, I: IndexFile> {
     store: Option<S>,
     index: I,
     read_buffer: Vec<u8>,
+    current_chunk_offset: u64, // next read offset within read_buffer
     current_chunk_idx: usize,
-    current_chunk_digest: [u8; 32],
+    current_chunk_info: Option<ChunkReadInfo>, // chunk whose data read_buffer holds or awaits
+    position: u64, // absolute read position within the index data
+    seek_to_pos: i64, // target computed by start_seek, validated by poll_complete
     state: AsyncIndexReaderState<S>,
 }
 
@@ -37,8 +42,11 @@ impl<S: AsyncReadChunk, I: IndexFile> AsyncIndexReader<S, I> {
             store: Some(store),
             index,
             read_buffer: Vec::with_capacity(1024 * 1024),
+            current_chunk_offset: 0,
             current_chunk_idx: 0,
-            current_chunk_digest: [0u8; 32],
+            current_chunk_info: None,
+            position: 0,
+            seek_to_pos: 0,
             state: AsyncIndexReaderState::NoData,
         }
     }
@@ -58,23 +66,41 @@ where
         loop {
             match &mut this.state {
                 AsyncIndexReaderState::NoData => {
-                    if this.current_chunk_idx >= this.index.index_count() {
+                    let (idx, offset) = if this.current_chunk_info.is_some() &&
+                        this.position == this.current_chunk_info.as_ref().unwrap().range.end
+                    {
+                        // optimization for sequential chunk read
+                        let next_idx = this.current_chunk_idx + 1;
+                        (next_idx, 0)
+                    } else {
+                        match this.index.chunk_from_offset(this.position) {
+                            Some(res) => res,
+                            None => return Poll::Ready(Ok(0))
+                        }
+                    };
+
+                    if idx >= this.index.index_count() {
                         return Poll::Ready(Ok(0));
                     }
 
-                    let digest = this
+                    let info = this
                         .index
-                        .index_digest(this.current_chunk_idx)
-                        .ok_or(io_format_err!("could not get digest"))?
-                        .clone();
+                        .chunk_info(idx)
+                        .ok_or(io_format_err!("could not get digest"))?;
 
-                    if digest == this.current_chunk_digest {
-                        this.state = AsyncIndexReaderState::HaveData(0);
-                        continue;
+                    this.current_chunk_offset = offset;
+                    this.current_chunk_idx = idx;
+                    let old_info = this.current_chunk_info.replace(info.clone());
+
+                    if let Some(old_info) = old_info {
+                        if old_info.digest == info.digest {
+                            // hit, chunk is currently in cache
+                            this.state = AsyncIndexReaderState::HaveData;
+                            continue;
+                        }
                     }
 
-                    this.current_chunk_digest = digest;
-
+                    // miss, need to download new chunk
                     let store = match this.store.take() {
                         Some(store) => store,
                         None => {
@@ -83,7 +109,7 @@ where
                     };
 
                     let future = async move {
-                        store.read_chunk(&digest)
+                        store.read_chunk(&info.digest)
                             .await
                             .map(move |x| (store, x))
                     };
@@ -95,7 +121,7 @@ where
                         Ok((store, mut chunk_data)) => {
                             this.read_buffer.clear();
                             this.read_buffer.append(&mut chunk_data);
-                            this.state = AsyncIndexReaderState::HaveData(0);
+                            this.state = AsyncIndexReaderState::HaveData;
                             this.store = Some(store);
                         }
                         Err(err) => {
@@ -103,8 +129,8 @@ where
                         }
                     };
                 }
-                AsyncIndexReaderState::HaveData(offset) => {
-                    let offset = *offset;
+                AsyncIndexReaderState::HaveData => {
+                    let offset = this.current_chunk_offset as usize;
                     let len = this.read_buffer.len();
                     let n = if len - offset < buf.len() {
                         len - offset
@@ -113,11 +139,13 @@ where
                     };
 
                     buf[0..n].copy_from_slice(&this.read_buffer[offset..(offset + n)]);
+                    this.position += n as u64;
+
                     if offset + n == len {
                         this.state = AsyncIndexReaderState::NoData;
-                        this.current_chunk_idx += 1;
                     } else {
-                        this.state = AsyncIndexReaderState::HaveData(offset + n);
+                        this.current_chunk_offset += n as u64;
+                        this.state = AsyncIndexReaderState::HaveData;
                     }
 
                     return Poll::Ready(Ok(n));
@@ -126,3 +154,51 @@ where
         }
     }
 }
+
+impl<S, I> AsyncSeek for AsyncIndexReader<S, I>
+where
+    S: AsyncReadChunk + Unpin + Sync + 'static,
+    I: IndexFile + Unpin,
+{
+    fn start_seek(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+        pos: SeekFrom,
+    ) -> Poll<tokio::io::Result<()>> {
+        let this = Pin::get_mut(self);
+        // saturate so huge offsets cannot overflow; poll_complete validates the target
+        this.seek_to_pos = match pos {
+            SeekFrom::Start(offset) => offset.min(i64::MAX as u64) as i64,
+            SeekFrom::End(offset) => (this.index.index_bytes() as i64).saturating_add(offset),
+            SeekFrom::Current(offset) => (this.position as i64).saturating_add(offset),
+        };
+        Poll::Ready(Ok(()))
+    }
+
+    fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<tokio::io::Result<u64>> {
+        let this = Pin::get_mut(self);
+        if this.seek_to_pos < 0 {
+            return Poll::Ready(Err(io_format_err!("cannot seek to negative values")));
+        }
+
+        // A pending download owns `store`, and current_chunk_info already names its chunk:
+        // finish it, since dropping it would lose the store and leave read_buffer stale.
+        if let AsyncIndexReaderState::WaitForData(ref mut future) = this.state {
+            match ready!(future.as_mut().poll(cx)) {
+                Ok((store, chunk_data)) => {
+                    this.read_buffer = chunk_data;
+                    this.store = Some(store);
+                }
+                Err(err) => {
+                    this.state = AsyncIndexReaderState::NoData;
+                    this.current_chunk_info = None; // read_buffer does not hold its data
+                    return Poll::Ready(Err(io_err_other(err)));
+                }
+            }
+        }
+
+        this.position = (this.seek_to_pos as u64).min(this.index.index_bytes());
+        this.state = AsyncIndexReaderState::NoData; // recompute current_chunk_offset (cache kept)
+        Poll::Ready(Ok(this.position))
+    }
+}
diff --git a/src/backup/index.rs b/src/backup/index.rs
index 2eab8524..c6bab56e 100644
--- a/src/backup/index.rs
+++ b/src/backup/index.rs
@@ -1,6 +1,7 @@
 use std::collections::HashMap;
 use std::ops::Range;
 
+#[derive(Clone)]
 pub struct ChunkReadInfo {
     pub range: Range<u64>,
     pub digest: [u8; 32],
-- 
2.20.1





  parent reply	other threads:[~2020-07-22 13:57 UTC|newest]

Thread overview: 10+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-07-22 13:56 [pbs-devel] [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Stefan Reiter
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile Stefan Reiter
2020-07-22 14:16   ` Thomas Lamprecht
2020-07-22 14:24     ` Stefan Reiter
2020-07-22 14:41       ` Thomas Lamprecht
2020-07-22 13:56 ` Stefan Reiter [this message]
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 3/5] remove BufferedFixedReader interface Stefan Reiter
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup-qemu 4/5] use AsyncIndexReader for read_image_at Stefan Reiter
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup-qemu 5/5] read_image_at: iterate until buffer is filled Stefan Reiter
2020-07-23  8:31 ` [pbs-devel] applied-series: [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Thomas Lamprecht

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20200722135625.23653-3-s.reiter@proxmox.com \
    --to=s.reiter@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal