* [pbs-devel] [PATCH 0/3] Fix PBS blockdriver for non-VM settings
From: Stefan Reiter @ 2020-07-20 15:02 UTC
To: pbs-devel

When using the PBS blockdriver with qemu-nbd (for example), it can happen that
enough read requests are issued to saturate the tokio thread pool. Not an issue
in general, but as Wolfgang and I painstakingly discovered a while back, it does
break block_on, which is used in BufferedFixedReader. This means that reading
larger amounts of data would hang the QEMU process [0].

Fix this by making the entire BufferedFixedReader API async, thus not requiring
a block_on.

Additionally I discovered a separate bug (fixed by patch 3), wherein read
requests that were not aligned to the chunk size would return bogus data. This
too only seems to happen in non-VM connections (e.g. nbd, etc...).

[0] ...and since the NBD kernel driver appears to be horribly broken, this often
also crashes most of the system, but that's a different story. If you ever get
in this situation, 'nbd-client -d /dev/nbdX' works (sometimes) to force
disconnect the device ('qemu-nbd -d' intelligently issues a read before
disconnecting, thus hanging before getting anything done...)

backup: Stefan Reiter (1):
  make BufferedFixedReader async

 src/backup/fixed_index.rs | 145 ++++++++++----------------------------
 1 file changed, 37 insertions(+), 108 deletions(-)

backup-qemu: Stefan Reiter (1):
  use new async BufferedFixedReader API

 src/restore.rs | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)

qemu: Stefan Reiter (1):
  PVE: PBS: iterate read_image_at until all data is available

 block/pbs.c | 31 ++++++++++++++++++++++---------
 1 file changed, 22 insertions(+), 9 deletions(-)

--
2.20.1

* [pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async
From: Stefan Reiter @ 2020-07-20 15:02 UTC
To: pbs-devel

It's currently only used in proxmox-backup-qemu, so no users in this
package need to be changed.

This also allows simplifying the interface to just what is needed in
proxmox-backup-qemu, i.e. by making it async we need to remove the
std::io::Read trait, and that allows to get rid of the seeking
workaround.

Cache locking is done with a reader-writer lock, though multiple
requests for one non-cached chunk are still possible (cannot hold a lock
during async HTTP request) - though in testing, performance did not
regress.

The "sequential read" optimization is removed, since it didn't serve any
purpose AFAICT. chunk_end was not used anywhere else (gave a warning).

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---

I'm unsure about the approach here, happy to hear feedback from anyone more
familiar with Rust. I couldn't find a way to combine the "read" trait impl with
async, so I removed it entirely. Since proxmox-backup-qemu seems to be the only
user I don't think this is an issue, but I don't know if this interface was put
here deliberately, even if unused elsewhere.

 src/backup/fixed_index.rs | 145 ++++++++++----------------------------
 1 file changed, 37 insertions(+), 108 deletions(-)

diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
index 73d0dad0..3f5fde2a 100644
--- a/src/backup/fixed_index.rs
+++ b/src/backup/fixed_index.rs
@@ -1,5 +1,6 @@
 use anyhow::{bail, format_err, Error};
 use std::io::{Seek, SeekFrom};
+use std::cmp::min;
 
 use super::chunk_stat::*;
 use super::chunk_store::*;
@@ -11,7 +12,7 @@ use std::fs::File;
 use std::io::Write;
 use std::os::unix::io::AsRawFd;
 use std::path::{Path, PathBuf};
-use std::sync::Arc;
+use std::sync::{Arc,RwLock};
 
 use super::read_chunk::*;
 use super::ChunkInfo;
@@ -146,20 +147,6 @@ impl FixedIndexReader {
         Ok(())
     }
 
-    #[inline]
-    fn chunk_end(&self, pos: usize) -> u64 {
-        if pos >= self.index_length {
-            panic!("chunk index out of range");
-        }
-
-        let end = ((pos + 1) * self.chunk_size) as u64;
-        if end > self.size {
-            self.size
-        } else {
-            end
-        }
-    }
-
     pub fn print_info(&self) {
         println!("Size: {}", self.size);
         println!("ChunkSize: {}", self.chunk_size);
@@ -466,27 +453,29 @@ impl FixedIndexWriter {
     }
 }
 
+struct ChunkBuffer {
+    data: Vec<u8>,
+    chunk_idx: usize,
+}
+
 pub struct BufferedFixedReader<S> {
     store: S,
     index: FixedIndexReader,
     archive_size: u64,
-    read_buffer: Vec<u8>,
-    buffered_chunk_idx: usize,
-    buffered_chunk_start: u64,
-    read_offset: u64,
+    buffer: RwLock<ChunkBuffer>,
 }
 
-impl<S: ReadChunk> BufferedFixedReader<S> {
+impl<S: AsyncReadChunk> BufferedFixedReader<S> {
     pub fn new(index: FixedIndexReader, store: S) -> Self {
         let archive_size = index.size;
         Self {
             store,
             index,
             archive_size,
-            read_buffer: Vec::with_capacity(1024 * 1024),
-            buffered_chunk_idx: 0,
-            buffered_chunk_start: 0,
-            read_offset: 0,
+            buffer: RwLock::new(ChunkBuffer {
+                data: Vec::with_capacity(1024 * 1024 * 4),
+                chunk_idx: 0,
+            }),
         }
     }
 
@@ -494,7 +483,7 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
         self.archive_size
     }
 
-    fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
+    async fn buffer_chunk(&self, idx: usize) -> Result<Vec<u8>, Error> {
         let index = &self.index;
         let info = match index.chunk_info(idx) {
             Some(info) => info,
@@ -503,104 +492,44 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
 
         // fixme: avoid copy
 
-        let data = self.store.read_chunk(&info.digest)?;
+        let data = self.store.read_chunk(&info.digest).await?;
         let size = info.range.end - info.range.start;
         if size != data.len() as u64 {
             bail!("read chunk with wrong size ({} != {}", size, data.len());
         }
 
-        self.read_buffer.clear();
-        self.read_buffer.extend_from_slice(&data);
+        let mut buffer = self.buffer.write().unwrap();
+        buffer.data.clear();
+        buffer.data.extend_from_slice(&data);
+        buffer.chunk_idx = idx;
 
-        self.buffered_chunk_idx = idx;
-
-        self.buffered_chunk_start = info.range.start as u64;
-        Ok(())
+        Ok(data)
     }
-}
 
-impl<S: ReadChunk> crate::tools::BufferedRead for BufferedFixedReader<S> {
-    fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
+    pub async fn async_buffered_read(&self, buf: &mut [u8], size: usize, offset: u64) -> Result<u64, Error> {
         if offset == self.archive_size {
-            return Ok(&self.read_buffer[0..0]);
+            return Ok(0);
         }
 
-        let buffer_len = self.read_buffer.len();
-        let index = &self.index;
+        let idx = (offset / self.index.chunk_size as u64) as usize;
+
+        let mut copy_to_buf = move |from: &Vec<u8>| -> u64 {
+            let buffer_offset = (offset % self.index.chunk_size as u64) as usize;
+            let data_len = min(from.len() - buffer_offset, size);
+            unsafe {
+                std::ptr::copy_nonoverlapping(from.as_ptr().add(buffer_offset), buf.as_mut_ptr(), data_len);
+            }
+            data_len as _
+        };
 
-        // optimization for sequential read
-        if buffer_len > 0
-            && ((self.buffered_chunk_idx + 1) < index.index_length)
-            && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
         {
-            let next_idx = self.buffered_chunk_idx + 1;
-            let next_end = index.chunk_end(next_idx);
-            if offset < next_end {
-                self.buffer_chunk(next_idx)?;
-                let buffer_offset = (offset - self.buffered_chunk_start) as usize;
-                return Ok(&self.read_buffer[buffer_offset..]);
+            let buffer = self.buffer.read().unwrap();
+            if buffer.data.len() != 0 && buffer.chunk_idx == idx {
+                return Ok(copy_to_buf(&buffer.data));
             }
         }
 
-        if (buffer_len == 0)
-            || (offset < self.buffered_chunk_start)
-            || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
-        {
-            let idx = (offset / index.chunk_size as u64) as usize;
-            self.buffer_chunk(idx)?;
-        }
-
-        let buffer_offset = (offset - self.buffered_chunk_start) as usize;
-        Ok(&self.read_buffer[buffer_offset..])
-    }
-}
-
-impl<S: ReadChunk> std::io::Read for BufferedFixedReader<S> {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
-        use crate::tools::BufferedRead;
-        use std::io::{Error, ErrorKind};
-
-        let data = match self.buffered_read(self.read_offset) {
-            Ok(v) => v,
-            Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
-        };
-
-        let n = if data.len() > buf.len() {
-            buf.len()
-        } else {
-            data.len()
-        };
-
-        unsafe {
-            std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n);
-        }
-
-        self.read_offset += n as u64;
-
-        Ok(n)
-    }
-}
-
-impl<S: ReadChunk> Seek for BufferedFixedReader<S> {
-    fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
-        let new_offset = match pos {
-            SeekFrom::Start(start_offset) => start_offset as i64,
-            SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset,
-            SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
-        };
-
-        use std::io::{Error, ErrorKind};
-        if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
-            return Err(Error::new(
-                ErrorKind::Other,
-                format!(
-                    "seek is out of range {} ([0..{}])",
-                    new_offset, self.archive_size
-                ),
-            ));
-        }
-        self.read_offset = new_offset as u64;
-
-        Ok(self.read_offset)
+        let new_data = self.buffer_chunk(idx).await?;
+        Ok(copy_to_buf(&new_data))
     }
 }
--
2.20.1

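A minimal sketch of the caching pattern described in the commit message of
patch 1/3: check the cached chunk under a read lock, release it, fetch the
chunk, then take the write lock only to store the result. This is not the
patch's code. `CachedReader` and its synchronous `fetch` closure are
hypothetical stand-ins for BufferedFixedReader and the async chunk store, chosen
so the example runs with the standard library alone.

```rust
use std::sync::RwLock;

/// One cached chunk plus the index it belongs to (same shape as `ChunkBuffer` in the patch).
struct ChunkBuffer {
    data: Vec<u8>,
    chunk_idx: usize,
}

/// Hypothetical reader: `fetch` stands in for the remote (async) chunk store.
struct CachedReader<F: Fn(usize) -> Vec<u8>> {
    fetch: F,
    buffer: RwLock<ChunkBuffer>,
}

impl<F: Fn(usize) -> Vec<u8>> CachedReader<F> {
    fn new(fetch: F) -> Self {
        Self {
            fetch,
            buffer: RwLock::new(ChunkBuffer { data: Vec::new(), chunk_idx: 0 }),
        }
    }

    /// Return chunk `idx`, served from the single-entry cache when possible.
    fn chunk(&self, idx: usize) -> Vec<u8> {
        // Fast path: shared lock only, released at the end of this block.
        {
            let cached = self.buffer.read().unwrap();
            if !cached.data.is_empty() && cached.chunk_idx == idx {
                return cached.data.clone();
            }
        }

        // Slow path: no lock is held while fetching.
        let data = (self.fetch)(idx);

        // Exclusive lock only for the short time needed to replace the cache entry.
        let mut cached = self.buffer.write().unwrap();
        cached.data.clear();
        cached.data.extend_from_slice(&data);
        cached.chunk_idx = idx;
        data
    }
}
```

Because no lock is held during the fetch, two callers asking for the same
uncached chunk can both fetch it. That is the trade-off the commit message
mentions, accepted there because the lock cannot be held across the HTTP
request.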
* Re: [pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async
From: Dominik Csapak @ 2020-07-21 9:37 UTC
To: pbs-devel

On 7/20/20 5:02 PM, Stefan Reiter wrote:
> It's currently only used in proxmox-backup-qemu, so no users in this
> package need to be changed.
>
> This also allows simplifying the interface to just what is needed in
> proxmox-backup-qemu, i.e. by making it async we need to remove the
> std::io::Read trait, and that allows to get rid of the seeking
> workaround.
>
> Cache locking is done with a reader-writer lock, though multiple
> requests for one non-cached chunk are still possible (cannot hold a lock
> during async HTTP request) - though in testing, performance did not
> regress.
>
> The "sequential read" optimization is removed, since it didn't serve any
> purpose AFAICT. chunk_end was not used anywhere else (gave a warning).
>
> Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
> ---
>
> I'm unsure about the approach here, happy to hear feedback from anyone more
> familiar with Rust. I couldn't find a way to combine the "read" trait impl with
> async, so I removed it entirely. Since proxmox-backup-qemu seems to be the only
> user I don't think this is an issue, but I don't know if this interface was put
> here deliberately, even if unused elsewhere.

high level comment:

maybe it would be nicer to implement AsyncSeek[0] for our existing
AsyncIndexReader (which also buffers a single chunk, but works
for fixed + dynamic index). then we could simply keep the
behaviour we have in proxmox-backup-qemu
(just add '.await')

for implementing this, we would have to extend the Index trait
to provide a function fn get_chunk_from_offset(offset) -> (chunk_idx, offset_in_chunk)

since a 'seek' would then be 'instant' (at least there is nothing we could await),
we would just have a 'pseudo-async' interface to implement

it would then 'magically' work also for a dynamic index

alternatively, we could also implement the 'async_buffered_read' simply
for our AsyncIndexReader also and still drop this here entirely

more comments inline

0: https://docs.rs/tokio/0.2.9/tokio/io/trait.AsyncSeek.html

>
>  src/backup/fixed_index.rs | 145 ++++++++++----------------------------
>  1 file changed, 37 insertions(+), 108 deletions(-)
>
> diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
> index 73d0dad0..3f5fde2a 100644
> --- a/src/backup/fixed_index.rs
> +++ b/src/backup/fixed_index.rs
> @@ -1,5 +1,6 @@
>  use anyhow::{bail, format_err, Error};
>  use std::io::{Seek, SeekFrom};
> +use std::cmp::min;

no need to use this
see comment further below

>
>  use super::chunk_stat::*;
>  use super::chunk_store::*;
> @@ -11,7 +12,7 @@ use std::fs::File;
>  use std::io::Write;
>  use std::os::unix::io::AsRawFd;
>  use std::path::{Path, PathBuf};
> -use std::sync::Arc;
> +use std::sync::{Arc,RwLock};
>
>  use super::read_chunk::*;
>  use super::ChunkInfo;
> @@ -146,20 +147,6 @@ impl FixedIndexReader {
>          Ok(())
>      }
>
> -    #[inline]
> -    fn chunk_end(&self, pos: usize) -> u64 {
> -        if pos >= self.index_length {
> -            panic!("chunk index out of range");
> -        }
> -
> -        let end = ((pos + 1) * self.chunk_size) as u64;
> -        if end > self.size {
> -            self.size
> -        } else {
> -            end
> -        }
> -    }
> -
>      pub fn print_info(&self) {
>          println!("Size: {}", self.size);
>          println!("ChunkSize: {}", self.chunk_size);
> @@ -466,27 +453,29 @@ impl FixedIndexWriter {
>      }
>  }
>
> +struct ChunkBuffer {
> +    data: Vec<u8>,
> +    chunk_idx: usize,
> +}
> +
>  pub struct BufferedFixedReader<S> {
>      store: S,
>      index: FixedIndexReader,
>      archive_size: u64,
> -    read_buffer: Vec<u8>,
> -    buffered_chunk_idx: usize,
> -    buffered_chunk_start: u64,
> -    read_offset: u64,
> +    buffer: RwLock<ChunkBuffer>,
>  }
>
> -impl<S: ReadChunk> BufferedFixedReader<S> {
> +impl<S: AsyncReadChunk> BufferedFixedReader<S> {
>      pub fn new(index: FixedIndexReader, store: S) -> Self {
>          let archive_size = index.size;
>          Self {
>              store,
>              index,
>              archive_size,
> -            read_buffer: Vec::with_capacity(1024 * 1024),
> -            buffered_chunk_idx: 0,
> -            buffered_chunk_start: 0,
> -            read_offset: 0,
> +            buffer: RwLock::new(ChunkBuffer {
> +                data: Vec::with_capacity(1024 * 1024 * 4),
> +                chunk_idx: 0,
> +            }),
>          }
>      }
>
> @@ -494,7 +483,7 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
>          self.archive_size
>      }
>
> -    fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
> +    async fn buffer_chunk(&self, idx: usize) -> Result<Vec<u8>, Error> {
>          let index = &self.index;
>          let info = match index.chunk_info(idx) {
>              Some(info) => info,
> @@ -503,104 +492,44 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
>
>          // fixme: avoid copy
>
> -        let data = self.store.read_chunk(&info.digest)?;
> +        let data = self.store.read_chunk(&info.digest).await?;
>          let size = info.range.end - info.range.start;
>          if size != data.len() as u64 {
>              bail!("read chunk with wrong size ({} != {}", size, data.len());
>          }
>
> -        self.read_buffer.clear();
> -        self.read_buffer.extend_from_slice(&data);
> +        let mut buffer = self.buffer.write().unwrap();
> +        buffer.data.clear();
> +        buffer.data.extend_from_slice(&data);
> +        buffer.chunk_idx = idx;
>
> -        self.buffered_chunk_idx = idx;
> -
> -        self.buffered_chunk_start = info.range.start as u64;
> -        Ok(())
> +        Ok(data)
>      }
> -}
>
> -impl<S: ReadChunk> crate::tools::BufferedRead for BufferedFixedReader<S> {
> -    fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
> +    pub async fn async_buffered_read(&self, buf: &mut [u8], size: usize, offset: u64) -> Result<u64, Error> {
>          if offset == self.archive_size {
> -            return Ok(&self.read_buffer[0..0]);
> +            return Ok(0);
>          }
>
> -        let buffer_len = self.read_buffer.len();
> -        let index = &self.index;
> +        let idx = (offset / self.index.chunk_size as u64) as usize;
> +
> +        let mut copy_to_buf = move |from: &Vec<u8>| -> u64 {
> +            let buffer_offset = (offset % self.index.chunk_size as u64) as usize;
> +            let data_len = min(from.len() - buffer_offset, size);

instead of using std::cmp::min, you can simply use min on a number:

let data_len = size.min(from.len() - buffer_offset);

> +            unsafe {
> +                std::ptr::copy_nonoverlapping(from.as_ptr().add(buffer_offset), buf.as_mut_ptr(), data_len);
> +            }

here you can avoid unsafe code by writing:

buf[0..data_len].copy_from_slice(&from[buffer_offset..buffer_offset+data_len]);

(does internally exactly the same, but checks the lengths and panics in error case
instead of corrupting memory/segfaulting)

> +            data_len as _
> +        };
>
> -        // optimization for sequential read
> -        if buffer_len > 0
> -            && ((self.buffered_chunk_idx + 1) < index.index_length)
> -            && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
>          {
> -            let next_idx = self.buffered_chunk_idx + 1;
> -            let next_end = index.chunk_end(next_idx);
> -            if offset < next_end {
> -                self.buffer_chunk(next_idx)?;
> -                let buffer_offset = (offset - self.buffered_chunk_start) as usize;
> -                return Ok(&self.read_buffer[buffer_offset..]);
> +            let buffer = self.buffer.read().unwrap();
> +            if buffer.data.len() != 0 && buffer.chunk_idx == idx {
> +                return Ok(copy_to_buf(&buffer.data));
>              }
>          }
>
> -        if (buffer_len == 0)
> -            || (offset < self.buffered_chunk_start)
> -            || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
> -        {
> -            let idx = (offset / index.chunk_size as u64) as usize;
> -            self.buffer_chunk(idx)?;
> -        }
> -
> -        let buffer_offset = (offset - self.buffered_chunk_start) as usize;
> -        Ok(&self.read_buffer[buffer_offset..])
> -    }
> -}
> -
> -impl<S: ReadChunk> std::io::Read for BufferedFixedReader<S> {
> -    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
> -        use crate::tools::BufferedRead;
> -        use std::io::{Error, ErrorKind};
> -
> -        let data = match self.buffered_read(self.read_offset) {
> -            Ok(v) => v,
> -            Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
> -        };
> -
> -        let n = if data.len() > buf.len() {
> -            buf.len()
> -        } else {
> -            data.len()
> -        };
> -
> -        unsafe {
> -            std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n);
> -        }
> -
> -        self.read_offset += n as u64;
> -
> -        Ok(n)
> -    }
> -}
> -
> -impl<S: ReadChunk> Seek for BufferedFixedReader<S> {
> -    fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
> -        let new_offset = match pos {
> -            SeekFrom::Start(start_offset) => start_offset as i64,
> -            SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset,
> -            SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
> -        };
> -
> -        use std::io::{Error, ErrorKind};
> -        if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
> -            return Err(Error::new(
> -                ErrorKind::Other,
> -                format!(
> -                    "seek is out of range {} ([0..{}])",
> -                    new_offset, self.archive_size
> -                ),
> -            ));
> -        }
> -        self.read_offset = new_offset as u64;
> -
> -        Ok(self.read_offset)
> +        let new_data = self.buffer_chunk(idx).await?;
> +        Ok(copy_to_buf(&new_data))
>      }
>  }
>

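The two review suggestions above (an offset-to-chunk helper and a bounds-checked
copy) can be sketched roughly as follows. This is an illustration only, assuming
a fixed index where every chunk has the same size. The free functions
`chunk_from_offset` and `copy_from_chunk` are hypothetical names, not part of
proxmox-backup. Note that `copy_from_slice` takes a slice reference, so the
source range has to be borrowed with `&`.

```rust
/// Split an absolute image offset into (chunk index, offset inside that chunk).
/// Only valid for a fixed index, where all chunks share one size.
fn chunk_from_offset(offset: u64, chunk_size: u64) -> (usize, usize) {
    ((offset / chunk_size) as usize, (offset % chunk_size) as usize)
}

/// Copy as much of `chunk`, starting at `offset_in_chunk`, as fits into `buf`.
/// Returns the number of bytes copied, which may be less than `buf.len()`.
fn copy_from_chunk(chunk: &[u8], offset_in_chunk: usize, buf: &mut [u8]) -> usize {
    // saturating_sub avoids an underflow if the offset lies past the chunk end.
    let available = chunk.len().saturating_sub(offset_in_chunk);
    let len = buf.len().min(available);
    if len == 0 {
        return 0;
    }
    // Safe replacement for ptr::copy_nonoverlapping: both ranges are bounds-checked
    // and a length mismatch panics instead of corrupting memory.
    buf[..len].copy_from_slice(&chunk[offset_in_chunk..offset_in_chunk + len]);
    len
}
```

A read that starts near the end of a chunk returns fewer bytes than requested,
so a caller has to be prepared for short reads.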
* [pbs-devel] [PATCH backup-qemu 2/3] use new async BufferedFixedReader API
From: Stefan Reiter @ 2020-07-20 15:02 UTC
To: pbs-devel

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---

Requires dependency bump on proxmox-backup, and while at it maybe the proxmox
crate to 0.2 as well? (no issues in my testing, I've been building with 0.2.0
for a while now, since it's the one in the repos).

 src/restore.rs | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)

diff --git a/src/restore.rs b/src/restore.rs
index 3e37066..9995743 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -1,13 +1,12 @@
 use std::sync::{Arc, Mutex};
 use std::collections::HashMap;
-use std::io::{Read, Seek, SeekFrom};
 use std::convert::TryInto;
 
 use anyhow::{format_err, bail, Error};
 use once_cell::sync::OnceCell;
 use tokio::runtime::Runtime;
 
-use proxmox_backup::tools::runtime::{get_runtime_with_builder, block_in_place};
+use proxmox_backup::tools::runtime::get_runtime_with_builder;
 use proxmox_backup::backup::*;
 use proxmox_backup::client::{HttpClient, HttpClientOptions, BackupReader, RemoteChunkReader};
 
@@ -16,7 +15,7 @@ use crate::registry::Registry;
 use crate::capi_types::DataPointer;
 
 struct ImageAccessInfo {
-    reader: Arc<Mutex<BufferedFixedReader<RemoteChunkReader>>>,
+    reader: Arc<BufferedFixedReader<RemoteChunkReader>>,
     _archive_name: String,
     archive_size: u64,
 }
@@ -236,7 +235,7 @@ impl RestoreTask {
         let info = ImageAccessInfo {
             archive_size,
             _archive_name: archive_name, /// useful to debug
-            reader: Arc::new(Mutex::new(reader)),
+            reader: Arc::new(reader),
         };
 
         (*self.image_registry.lock().unwrap()).register(info)
@@ -260,12 +259,9 @@ impl RestoreTask {
             bail!("read index {} out of bounds {}", offset, image_size);
         }
 
-        let bytes = block_in_place(|| {
-            let mut reader = reader.lock().unwrap();
-            reader.seek(SeekFrom::Start(offset))?;
-            let buf: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(data.0 as *mut u8, size as usize)};
-            reader.read(buf)
-        })?;
+        let size = size as usize;
+        let buf: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(data.0 as *mut u8, size)};
+        let bytes = reader.async_buffered_read(buf, size, offset).await?;
 
         Ok(bytes.try_into()?)
     }
--
2.20.1

* [pbs-devel] [PATCH qemu 3/3] PVE: PBS: iterate read_image_at until all data is available
From: Stefan Reiter @ 2020-07-20 15:02 UTC
To: pbs-devel

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---

Sent as separate patch for review, but can also be squashed into the PBS bdrv
patch easily.

 block/pbs.c | 31 ++++++++++++++++++++++---------
 1 file changed, 22 insertions(+), 9 deletions(-)

diff --git a/block/pbs.c b/block/pbs.c
index 1481a2bfd1..0083406ad9 100644
--- a/block/pbs.c
+++ b/block/pbs.c
@@ -198,7 +198,7 @@ static coroutine_fn int pbs_co_preadv(BlockDriverState *bs,
                                       QEMUIOVector *qiov, int flags)
 {
     BDRVPBSState *s = bs->opaque;
-    int ret;
+    int ret, cur = 0;
     char *pbs_error = NULL;
     uint8_t *buf = malloc(bytes);
 
@@ -207,18 +207,31 @@ static coroutine_fn int pbs_co_preadv(BlockDriverState *bs,
         .ctx = qemu_get_current_aio_context(),
     };
 
-    proxmox_restore_read_image_at_async(s->conn, s->aid, buf, offset, bytes,
-                                        read_callback, (void *) &rcb, &ret, &pbs_error);
+    /* we need to retry until we have either read all requested data or hit the
+     * end of the file - QEMU does not re-call this function in case the
+     * returned value is not equal to 'bytes', it just assumes there is no data
+     * to be read, which breaks unaligned reads */
+    while (cur < bytes) {
+        proxmox_restore_read_image_at_async(s->conn, s->aid, buf + cur, offset + cur, bytes - cur,
+                                            read_callback, (void *) &rcb, &ret, &pbs_error);
+        qemu_coroutine_yield();
 
-    qemu_coroutine_yield();
+        if (ret < 0) {
+            fprintf(stderr, "error during PBS read: %s\n", pbs_error ? pbs_error : "unknown error");
+            if (pbs_error) proxmox_backup_free_error(pbs_error);
+            free(buf);
+            return -EIO;
+        }
 
-    if (ret < 0) {
-        fprintf(stderr, "error during PBS read: %s\n", pbs_error ? pbs_error : "unknown error");
-        if (pbs_error) proxmox_backup_free_error(pbs_error);
-        return -EIO;
+        // EOF
+        if (ret == 0) {
+            break;
+        }
+
+        cur += ret;
     }
 
-    qemu_iovec_from_buf(qiov, 0, buf, bytes);
+    qemu_iovec_from_buf(qiov, 0, buf, cur);
     free(buf);
 
     return ret;
--
2.20.1

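The retry loop in patch 3/3 follows a common "read until full or EOF" pattern.
A rough sketch of that pattern, written in Rust rather than the C of the patch
and with no QEMU or PBS calls: `read_full` and the `read_at` callback are
hypothetical names, where `read_at` may return fewer bytes than requested (for
example when a read stops at a chunk boundary) and returns 0 at end of file.

```rust
/// Keep calling `read_at` until `buf` is full or the source reports EOF (0 bytes).
/// Returns the total number of bytes placed into `buf`.
fn read_full<F>(mut read_at: F, buf: &mut [u8], offset: u64) -> Result<usize, String>
where
    F: FnMut(&mut [u8], u64) -> Result<usize, String>,
{
    let mut cur = 0;
    while cur < buf.len() {
        // Ask only for the part that is still missing, at the advanced offset.
        let n = read_at(&mut buf[cur..], offset + cur as u64)?;
        if n == 0 {
            // EOF: return what was read so far instead of retrying forever.
            break;
        }
        cur += n;
    }
    Ok(cur)
}
```

Errors from the callback abort the loop immediately through `?`, which
corresponds to the early `return -EIO` in the patch.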