From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <s.reiter@proxmox.com>
Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68])
 (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits)
 key-exchange X25519 server-signature RSA-PSS (2048 bits))
 (No client certificate requested)
 by lists.proxmox.com (Postfix) with ESMTPS id 17D8965390
 for <pbs-devel@lists.proxmox.com>; Wed, 22 Jul 2020 15:57:08 +0200 (CEST)
Received: from firstgate.proxmox.com (localhost [127.0.0.1])
 by firstgate.proxmox.com (Proxmox) with ESMTP id 085B8210BE
 for <pbs-devel@lists.proxmox.com>; Wed, 22 Jul 2020 15:56:38 +0200 (CEST)
Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com
 [212.186.127.180])
 (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits)
 key-exchange X25519 server-signature RSA-PSS (2048 bits))
 (No client certificate requested)
 by firstgate.proxmox.com (Proxmox) with ESMTPS id 3212421096
 for <pbs-devel@lists.proxmox.com>; Wed, 22 Jul 2020 15:56:35 +0200 (CEST)
Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1])
 by proxmox-new.maurer-it.com (Proxmox) with ESMTP id E945D431A8
 for <pbs-devel@lists.proxmox.com>; Wed, 22 Jul 2020 15:56:34 +0200 (CEST)
From: Stefan Reiter <s.reiter@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Wed, 22 Jul 2020 15:56:22 +0200
Message-Id: <20200722135625.23653-3-s.reiter@proxmox.com>
X-Mailer: git-send-email 2.20.1
In-Reply-To: <20200722135625.23653-1-s.reiter@proxmox.com>
References: <20200722135625.23653-1-s.reiter@proxmox.com>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
X-SPAM-LEVEL: Spam detection results:  0
 AWL 0.016 Adjusted score from AWL reputation of From: address
 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment
 RCVD_IN_DNSWL_MED        -2.3 Sender listed at https://www.dnswl.org/,
 medium trust
 SPF_HELO_NONE           0.001 SPF: HELO does not publish an SPF Record
 SPF_PASS               -0.001 SPF: sender matches SPF record
 URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See
 http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more
 information. [index.rs, this.store]
Subject: [pbs-devel] [PATCH v2 backup 2/5] implement AsyncSeek for
 AsyncIndexReader
X-BeenThere: pbs-devel@lists.proxmox.com
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: Proxmox Backup Server development discussion
 <pbs-devel.lists.proxmox.com>
List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=unsubscribe>
List-Archive: <http://lists.proxmox.com/pipermail/pbs-devel/>
List-Post: <mailto:pbs-devel@lists.proxmox.com>
List-Help: <mailto:pbs-devel-request@lists.proxmox.com?subject=help>
List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=subscribe>
X-List-Received-Date: Wed, 22 Jul 2020 13:57:08 -0000

Requires updating the AsyncRead implementation to cope with byte-wise
seeks to intra-chunk positions.

Uses chunk_from_offset to locate positions within chunks, but skips that
lookup for sequential reads so that performance does not regress.

AsyncSeek stores the requested target in the temporary seek_to_pos, so
that an invalid seek does not change the current position; the error
for such a seek is then returned from poll_complete.

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
 src/backup/async_index_reader.rs | 116 +++++++++++++++++++++++++------
 src/backup/index.rs              |   1 +
 2 files changed, 97 insertions(+), 20 deletions(-)

diff --git a/src/backup/async_index_reader.rs b/src/backup/async_index_reader.rs
index 0911375e..98372aa1 100644
--- a/src/backup/async_index_reader.rs
+++ b/src/backup/async_index_reader.rs
@@ -1,30 +1,35 @@
 use std::future::Future;
 use std::task::{Poll, Context};
 use std::pin::Pin;
+use std::io::SeekFrom;
 
 use anyhow::Error;
 use futures::future::FutureExt;
 use futures::ready;
-use tokio::io::AsyncRead;
+use tokio::io::{AsyncRead, AsyncSeek};
 
 use proxmox::sys::error::io_err_other;
 use proxmox::io_format_err;
 
 use super::IndexFile;
 use super::read_chunk::AsyncReadChunk;
+use super::index::ChunkReadInfo;
 
 enum AsyncIndexReaderState<S> {
     NoData,
     WaitForData(Pin<Box<dyn Future<Output = Result<(S, Vec<u8>), Error>> + Send + 'static>>),
-    HaveData(usize),
+    HaveData,
 }
 
 pub struct AsyncIndexReader<S, I: IndexFile> {
     store: Option<S>,
     index: I,
     read_buffer: Vec<u8>,
+    current_chunk_offset: u64,
     current_chunk_idx: usize,
-    current_chunk_digest: [u8; 32],
+    current_chunk_info: Option<ChunkReadInfo>,
+    position: u64,
+    seek_to_pos: i64,
     state: AsyncIndexReaderState<S>,
 }
 
@@ -37,8 +42,11 @@ impl<S: AsyncReadChunk, I: IndexFile> AsyncIndexReader<S, I> {
             store: Some(store),
             index,
             read_buffer: Vec::with_capacity(1024 * 1024),
+            current_chunk_offset: 0,
             current_chunk_idx: 0,
-            current_chunk_digest: [0u8; 32],
+            current_chunk_info: None,
+            position: 0,
+            seek_to_pos: 0,
             state: AsyncIndexReaderState::NoData,
         }
     }
@@ -58,23 +66,41 @@ where
         loop {
             match &mut this.state {
                 AsyncIndexReaderState::NoData => {
-                    if this.current_chunk_idx >= this.index.index_count() {
+                    let (idx, offset) = if this.current_chunk_info.is_some() &&
+                        this.position == this.current_chunk_info.as_ref().unwrap().range.end
+                    {
+                        // optimization for sequential chunk read
+                        let next_idx = this.current_chunk_idx + 1;
+                        (next_idx, 0)
+                    } else {
+                        match this.index.chunk_from_offset(this.position) {
+                            Some(res) => res,
+                            None => return Poll::Ready(Ok(0))
+                        }
+                    };
+
+                    if idx >= this.index.index_count() {
                         return Poll::Ready(Ok(0));
                     }
 
-                    let digest = this
+                    let info = this
                         .index
-                        .index_digest(this.current_chunk_idx)
-                        .ok_or(io_format_err!("could not get digest"))?
-                        .clone();
+                        .chunk_info(idx)
+                        .ok_or(io_format_err!("could not get digest"))?;
 
-                    if digest == this.current_chunk_digest {
-                        this.state = AsyncIndexReaderState::HaveData(0);
-                        continue;
+                    this.current_chunk_offset = offset;
+                    this.current_chunk_idx = idx;
+                    let old_info = this.current_chunk_info.replace(info.clone());
+
+                    if let Some(old_info) = old_info {
+                        if old_info.digest == info.digest {
+                            // hit, chunk is currently in cache
+                            this.state = AsyncIndexReaderState::HaveData;
+                            continue;
+                        }
                     }
 
-                    this.current_chunk_digest = digest;
-
+                    // miss, need to download new chunk
                     let store = match this.store.take() {
                         Some(store) => store,
                         None => {
@@ -83,7 +109,7 @@ where
                     };
 
                     let future = async move {
-                        store.read_chunk(&digest)
+                        store.read_chunk(&info.digest)
                             .await
                             .map(move |x| (store, x))
                     };
@@ -95,7 +121,7 @@ where
                         Ok((store, mut chunk_data)) => {
                             this.read_buffer.clear();
                             this.read_buffer.append(&mut chunk_data);
-                            this.state = AsyncIndexReaderState::HaveData(0);
+                            this.state = AsyncIndexReaderState::HaveData;
                             this.store = Some(store);
                         }
                         Err(err) => {
@@ -103,8 +129,8 @@ where
                         }
                     };
                 }
-                AsyncIndexReaderState::HaveData(offset) => {
-                    let offset = *offset;
+                AsyncIndexReaderState::HaveData => {
+                    let offset = this.current_chunk_offset as usize;
                     let len = this.read_buffer.len();
                     let n = if len - offset < buf.len() {
                         len - offset
@@ -113,11 +139,13 @@ where
                     };
 
                     buf[0..n].copy_from_slice(&this.read_buffer[offset..(offset + n)]);
+                    this.position += n as u64;
+
                     if offset + n == len {
                         this.state = AsyncIndexReaderState::NoData;
-                        this.current_chunk_idx += 1;
                     } else {
-                        this.state = AsyncIndexReaderState::HaveData(offset + n);
+                        this.current_chunk_offset += n as u64;
+                        this.state = AsyncIndexReaderState::HaveData;
                     }
 
                     return Poll::Ready(Ok(n));
@@ -126,3 +154,51 @@ where
         }
     }
 }
+
+impl<S, I> AsyncSeek for AsyncIndexReader<S, I>
+where
+    S: AsyncReadChunk + Unpin + Sync + 'static,
+    I: IndexFile + Unpin,
+{
+    fn start_seek(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+        pos: SeekFrom,
+    ) -> Poll<tokio::io::Result<()>> {
+        let this = Pin::get_mut(self);
+        this.seek_to_pos = match pos {
+            SeekFrom::Start(offset) => {
+                offset as i64
+            },
+            SeekFrom::End(offset) => {
+                this.index.index_bytes() as i64 + offset
+            },
+            SeekFrom::Current(offset) => {
+                this.position as i64 + offset
+            }
+        };
+        Poll::Ready(Ok(()))
+    }
+
+    fn poll_complete(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<tokio::io::Result<u64>> {
+        let this = Pin::get_mut(self);
+
+        let index_bytes = this.index.index_bytes();
+        if this.seek_to_pos < 0 {
+            return Poll::Ready(Err(io_format_err!("cannot seek to negative values")));
+        } else if this.seek_to_pos > index_bytes as i64 {
+            this.position = index_bytes;
+        } else {
+            this.position = this.seek_to_pos as u64;
+        }
+
+        // even if seeking within one chunk, we need to go to NoData to
+        // recalculate the current_chunk_offset (data is cached anyway)
+        this.state = AsyncIndexReaderState::NoData;
+
+        Poll::Ready(Ok(this.position))
+    }
+}
diff --git a/src/backup/index.rs b/src/backup/index.rs
index 2eab8524..c6bab56e 100644
--- a/src/backup/index.rs
+++ b/src/backup/index.rs
@@ -1,6 +1,7 @@
 use std::collections::HashMap;
 use std::ops::Range;
 
+#[derive(Clone)]
 pub struct ChunkReadInfo {
     pub range: Range<u64>,
     pub digest: [u8; 32],
-- 
2.20.1