From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id D88416317F for ; Tue, 9 Feb 2021 13:04:32 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id CCEE12CAA3 for ; Tue, 9 Feb 2021 13:04:02 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [212.186.127.180]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 0B0BF2CA91 for ; Tue, 9 Feb 2021 13:04:00 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id C867F429FE for ; Tue, 9 Feb 2021 13:03:59 +0100 (CET) From: Stefan Reiter To: pbs-devel@lists.proxmox.com Date: Tue, 9 Feb 2021 13:03:48 +0100 Message-Id: <20210209120348.8359-3-s.reiter@proxmox.com> X-Mailer: git-send-email 2.20.1 In-Reply-To: <20210209120348.8359-1-s.reiter@proxmox.com> References: <20210209120348.8359-1-s.reiter@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL -0.036 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_DNSWL_MED -2.3 Sender listed at https://www.dnswl.org/, medium trust SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [pxar.rs, catar.rs, create.rs] Subject: [pbs-devel] [PATCH proxmox-backup 2/2] asyncify pxar create_archive X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 09 Feb 2021 12:04:32 -0000 ...to take advantage of the aio::Encoder from the pxar create. Rather straightforward conversion, but does require getting rid of references in the Archiver struct, and thus has to be given the Mutex for the catalog directly. The callback is boxed. archive_dir_contents can call itself recursively, and thus needs to return a boxed future. Users are adjusted, namely PxarBackupStream is converted to use an Abortable future instead of a thread so it supports async in its handler function, and the pxar bin create_archive is converted to an async API function. One test case is made to just use 'block_on'. Signed-off-by: Stefan Reiter --- Requires updated pxar crate. Long patch, but a lot of changes are just -call(); +call().await; or using the catalog mutex. Probably looks better with -w src/bin/pxar.rs | 6 +- src/client/pxar_backup_stream.rs | 65 +++++----- src/pxar/create.rs | 207 ++++++++++++++++--------------- tests/catar.rs | 5 +- 4 files changed, 143 insertions(+), 140 deletions(-) diff --git a/src/bin/pxar.rs b/src/bin/pxar.rs index 814b3346..d830c570 100644 --- a/src/bin/pxar.rs +++ b/src/bin/pxar.rs @@ -295,7 +295,7 @@ fn extract_archive( )] /// Create a new .pxar archive. #[allow(clippy::too_many_arguments)] -fn create_archive( +async fn create_archive( archive: String, source: String, verbose: bool, @@ -376,7 +376,7 @@ fn create_archive( dir, writer, feature_flags, - |path| { + move |path| { if verbose { println!("{:?}", path); } @@ -384,7 +384,7 @@ fn create_archive( }, None, options, - )?; + ).await?; Ok(()) } diff --git a/src/client/pxar_backup_stream.rs b/src/client/pxar_backup_stream.rs index 5fb28fd5..b57061a3 100644 --- a/src/client/pxar_backup_stream.rs +++ b/src/client/pxar_backup_stream.rs @@ -4,10 +4,10 @@ use std::path::Path; use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; -use std::thread; use anyhow::{format_err, Error}; use futures::stream::Stream; +use futures::future::{Abortable, AbortHandle}; use nix::dir::Dir; use nix::fcntl::OFlag; use nix::sys::stat::Mode; @@ -21,14 +21,14 @@ use crate::backup::CatalogWriter; /// consumer. pub struct PxarBackupStream { rx: Option, Error>>>, - child: Option>, + handle: Option, error: Arc>>, } impl Drop for PxarBackupStream { fn drop(&mut self) { self.rx = None; - self.child.take().unwrap().join().unwrap(); + self.handle.take().unwrap().abort(); } } @@ -43,42 +43,41 @@ impl PxarBackupStream { let buffer_size = 256 * 1024; let error = Arc::new(Mutex::new(None)); - let child = std::thread::Builder::new() - .name("PxarBackupStream".to_string()) - .spawn({ - let error = Arc::clone(&error); - move || { - let mut catalog_guard = catalog.lock().unwrap(); - let writer = std::io::BufWriter::with_capacity( - buffer_size, - crate::tools::StdChannelWriter::new(tx), - ); + let error2 = Arc::clone(&error); + let handler = async move { + let writer = std::io::BufWriter::with_capacity( + buffer_size, + crate::tools::StdChannelWriter::new(tx), + ); - let verbose = options.verbose; + let verbose = options.verbose; - let writer = pxar::encoder::sync::StandardWriter::new(writer); - if let Err(err) = crate::pxar::create_archive( - dir, - writer, - crate::pxar::Flags::DEFAULT, - |path| { - if verbose { - println!("{:?}", path); - } - Ok(()) - }, - Some(&mut *catalog_guard), - options, - ) { - let mut error = error.lock().unwrap(); - *error = Some(err.to_string()); + let writer = pxar::encoder::sync::StandardWriter::new(writer); + if let Err(err) = crate::pxar::create_archive( + dir, + writer, + crate::pxar::Flags::DEFAULT, + move |path| { + if verbose { + println!("{:?}", path); } - } - })?; + Ok(()) + }, + Some(catalog), + options, + ).await { + let mut error = error2.lock().unwrap(); + *error = Some(err.to_string()); + } + }; + + let (handle, registration) = AbortHandle::new_pair(); + let future = Abortable::new(handler, registration); + tokio::spawn(future); Ok(Self { rx: Some(rx), - child: Some(child), + handle: Some(handle), error, }) } diff --git a/src/pxar/create.rs b/src/pxar/create.rs index 36de87da..6950b396 100644 --- a/src/pxar/create.rs +++ b/src/pxar/create.rs @@ -5,16 +5,19 @@ use std::io::{self, Read, Write}; use std::os::unix::ffi::OsStrExt; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; use anyhow::{bail, format_err, Error}; use nix::dir::Dir; use nix::errno::Errno; use nix::fcntl::OFlag; use nix::sys::stat::{FileStat, Mode}; +use futures::future::BoxFuture; +use futures::FutureExt; use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag}; use pxar::Metadata; -use pxar::encoder::LinkOffset; +use pxar::encoder::{SeqWrite, LinkOffset}; use proxmox::c_str; use proxmox::sys::error::SysError; @@ -129,13 +132,13 @@ impl std::io::Write for ErrorReporter { } } -struct Archiver<'a, 'b> { +struct Archiver { feature_flags: Flags, fs_feature_flags: Flags, fs_magic: i64, patterns: Vec, - callback: &'a mut dyn FnMut(&Path) -> Result<(), Error>, - catalog: Option<&'b mut dyn BackupCatalogWriter>, + callback: Box Result<(), Error> + Send>, + catalog: Option>>, path: PathBuf, entry_counter: usize, entry_limit: usize, @@ -147,19 +150,19 @@ struct Archiver<'a, 'b> { file_copy_buffer: Vec, } -type Encoder<'a, 'b> = pxar::encoder::Encoder<'a, &'b mut dyn pxar::encoder::SeqWrite>; +type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>; -pub fn create_archive( +pub async fn create_archive( source_dir: Dir, mut writer: T, feature_flags: Flags, - mut callback: F, - catalog: Option<&mut dyn BackupCatalogWriter>, + callback: F, + catalog: Option>>, options: PxarCreateOptions, ) -> Result<(), Error> where - T: pxar::encoder::SeqWrite, - F: FnMut(&Path) -> Result<(), Error>, + T: SeqWrite + Send, + F: FnMut(&Path) -> Result<(), Error> + Send + 'static, { let fs_magic = detect_fs_type(source_dir.as_raw_fd())?; if is_virtual_file_system(fs_magic) { @@ -182,8 +185,7 @@ where set.insert(stat.st_dev); } - let writer = &mut writer as &mut dyn pxar::encoder::SeqWrite; - let mut encoder = Encoder::new(writer, &metadata)?; + let mut encoder = Encoder::new(&mut writer, &metadata).await?; let mut patterns = options.patterns; @@ -199,7 +201,7 @@ where feature_flags, fs_feature_flags, fs_magic, - callback: &mut callback, + callback: Box::new(callback), patterns, catalog, path: PathBuf::new(), @@ -213,8 +215,8 @@ where file_copy_buffer: vec::undefined(4 * 1024 * 1024), }; - archiver.archive_dir_contents(&mut encoder, source_dir, true)?; - encoder.finish()?; + archiver.archive_dir_contents(&mut encoder, source_dir, true).await?; + encoder.finish().await?; Ok(()) } @@ -224,7 +226,7 @@ struct FileListEntry { stat: FileStat, } -impl<'a, 'b> Archiver<'a, 'b> { +impl Archiver { /// Get the currently effective feature flags. (Requested flags masked by the file system /// feature flags). fn flags(&self) -> Flags { @@ -239,49 +241,51 @@ impl<'a, 'b> Archiver<'a, 'b> { } } - fn archive_dir_contents( - &mut self, - encoder: &mut Encoder, + fn archive_dir_contents<'a, 'b, T: SeqWrite + Send>( + &'a mut self, + encoder: &'a mut Encoder<'b, T>, mut dir: Dir, is_root: bool, - ) -> Result<(), Error> { - let entry_counter = self.entry_counter; + ) -> BoxFuture<'a, Result<(), Error>> { + async move { + let entry_counter = self.entry_counter; - let old_patterns_count = self.patterns.len(); - self.read_pxar_excludes(dir.as_raw_fd())?; + let old_patterns_count = self.patterns.len(); + self.read_pxar_excludes(dir.as_raw_fd())?; - let mut file_list = self.generate_directory_file_list(&mut dir, is_root)?; + let mut file_list = self.generate_directory_file_list(&mut dir, is_root)?; - if is_root && old_patterns_count > 0 { - file_list.push(FileListEntry { - name: CString::new(".pxarexclude-cli").unwrap(), - path: PathBuf::new(), - stat: unsafe { std::mem::zeroed() }, - }); - } - - let dir_fd = dir.as_raw_fd(); - - let old_path = std::mem::take(&mut self.path); - - for file_entry in file_list { - let file_name = file_entry.name.to_bytes(); - - if is_root && file_name == b".pxarexclude-cli" { - self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count)?; - continue; + if is_root && old_patterns_count > 0 { + file_list.push(FileListEntry { + name: CString::new(".pxarexclude-cli").unwrap(), + path: PathBuf::new(), + stat: unsafe { std::mem::zeroed() }, + }); } - (self.callback)(&file_entry.path)?; - self.path = file_entry.path; - self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat) - .map_err(|err| self.wrap_err(err))?; - } - self.path = old_path; - self.entry_counter = entry_counter; - self.patterns.truncate(old_patterns_count); + let dir_fd = dir.as_raw_fd(); - Ok(()) + let old_path = std::mem::take(&mut self.path); + + for file_entry in file_list { + let file_name = file_entry.name.to_bytes(); + + if is_root && file_name == b".pxarexclude-cli" { + self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count).await?; + continue; + } + + (self.callback)(&file_entry.path)?; + self.path = file_entry.path; + self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat).await + .map_err(|err| self.wrap_err(err))?; + } + self.path = old_path; + self.entry_counter = entry_counter; + self.patterns.truncate(old_patterns_count); + + Ok(()) + }.boxed() } /// openat() wrapper which allows but logs `EACCES` and turns `ENOENT` into `None`. @@ -396,23 +400,22 @@ impl<'a, 'b> Archiver<'a, 'b> { Ok(()) } - fn encode_pxarexclude_cli( + async fn encode_pxarexclude_cli( &mut self, - encoder: &mut Encoder, + encoder: &mut Encoder<'_, T>, file_name: &CStr, patterns_count: usize, ) -> Result<(), Error> { let content = generate_pxar_excludes_cli(&self.patterns[..patterns_count]); - - if let Some(ref mut catalog) = self.catalog { - catalog.add_file(file_name, content.len() as u64, 0)?; + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().add_file(file_name, content.len() as u64, 0)?; } let mut metadata = Metadata::default(); metadata.stat.mode = pxar::format::mode::IFREG | 0o600; - let mut file = encoder.create_file(&metadata, ".pxarexclude-cli", content.len() as u64)?; - file.write_all(&content)?; + let mut file = encoder.create_file(&metadata, ".pxarexclude-cli", content.len() as u64).await?; + file.write_all(&content).await?; Ok(()) } @@ -502,9 +505,9 @@ impl<'a, 'b> Archiver<'a, 'b> { Ok(()) } - fn add_entry( + async fn add_entry( &mut self, - encoder: &mut Encoder, + encoder: &mut Encoder<'_, T>, parent: RawFd, c_file_name: &CStr, stat: &FileStat, @@ -550,23 +553,23 @@ impl<'a, 'b> Archiver<'a, 'b> { if stat.st_nlink > 1 { if let Some((path, offset)) = self.hardlinks.get(&link_info) { - if let Some(ref mut catalog) = self.catalog { - catalog.add_hardlink(c_file_name)?; + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().add_hardlink(c_file_name)?; } - encoder.add_hardlink(file_name, path, *offset)?; + encoder.add_hardlink(file_name, path, *offset).await?; return Ok(()); } } let file_size = stat.st_size as u64; - if let Some(ref mut catalog) = self.catalog { - catalog.add_file(c_file_name, file_size, stat.st_mtime)?; + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().add_file(c_file_name, file_size, stat.st_mtime)?; } let offset: LinkOffset = - self.add_regular_file(encoder, fd, file_name, &metadata, file_size)?; + self.add_regular_file(encoder, fd, file_name, &metadata, file_size).await?; if stat.st_nlink > 1 { self.hardlinks.insert(link_info, (self.path.clone(), offset)); @@ -577,49 +580,49 @@ impl<'a, 'b> Archiver<'a, 'b> { mode::IFDIR => { let dir = Dir::from_fd(fd.into_raw_fd())?; - if let Some(ref mut catalog) = self.catalog { - catalog.start_directory(c_file_name)?; + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().start_directory(c_file_name)?; } - let result = self.add_directory(encoder, dir, c_file_name, &metadata, stat); - if let Some(ref mut catalog) = self.catalog { - catalog.end_directory()?; + let result = self.add_directory(encoder, dir, c_file_name, &metadata, stat).await; + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().end_directory()?; } result } mode::IFSOCK => { - if let Some(ref mut catalog) = self.catalog { - catalog.add_socket(c_file_name)?; + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().add_socket(c_file_name)?; } - Ok(encoder.add_socket(&metadata, file_name)?) + Ok(encoder.add_socket(&metadata, file_name).await?) } mode::IFIFO => { - if let Some(ref mut catalog) = self.catalog { - catalog.add_fifo(c_file_name)?; + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().add_fifo(c_file_name)?; } - Ok(encoder.add_fifo(&metadata, file_name)?) + Ok(encoder.add_fifo(&metadata, file_name).await?) } mode::IFLNK => { - if let Some(ref mut catalog) = self.catalog { - catalog.add_symlink(c_file_name)?; + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().add_symlink(c_file_name)?; } - self.add_symlink(encoder, fd, file_name, &metadata) + self.add_symlink(encoder, fd, file_name, &metadata).await } mode::IFBLK => { - if let Some(ref mut catalog) = self.catalog { - catalog.add_block_device(c_file_name)?; + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().add_block_device(c_file_name)?; } - self.add_device(encoder, file_name, &metadata, &stat) + self.add_device(encoder, file_name, &metadata, &stat).await } mode::IFCHR => { - if let Some(ref mut catalog) = self.catalog { - catalog.add_char_device(c_file_name)?; + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().add_char_device(c_file_name)?; } - self.add_device(encoder, file_name, &metadata, &stat) + self.add_device(encoder, file_name, &metadata, &stat).await } other => bail!( "encountered unknown file type: 0x{:x} (0o{:o})", @@ -629,9 +632,9 @@ impl<'a, 'b> Archiver<'a, 'b> { } } - fn add_directory( + async fn add_directory( &mut self, - encoder: &mut Encoder, + encoder: &mut Encoder<'_, T>, dir: Dir, dir_name: &CStr, metadata: &Metadata, @@ -639,7 +642,7 @@ impl<'a, 'b> Archiver<'a, 'b> { ) -> Result<(), Error> { let dir_name = OsStr::from_bytes(dir_name.to_bytes()); - let mut encoder = encoder.create_directory(dir_name, &metadata)?; + let mut encoder = encoder.create_directory(dir_name, &metadata).await?; let old_fs_magic = self.fs_magic; let old_fs_feature_flags = self.fs_feature_flags; @@ -662,20 +665,20 @@ impl<'a, 'b> Archiver<'a, 'b> { writeln!(self.logger, "skipping mount point: {:?}", self.path)?; Ok(()) } else { - self.archive_dir_contents(&mut encoder, dir, false) + self.archive_dir_contents(&mut encoder, dir, false).await }; self.fs_magic = old_fs_magic; self.fs_feature_flags = old_fs_feature_flags; self.current_st_dev = old_st_dev; - encoder.finish()?; + encoder.finish().await?; result } - fn add_regular_file( + async fn add_regular_file( &mut self, - encoder: &mut Encoder, + encoder: &mut Encoder<'_, T>, fd: Fd, file_name: &Path, metadata: &Metadata, @@ -683,7 +686,7 @@ impl<'a, 'b> Archiver<'a, 'b> { ) -> Result { let mut file = unsafe { std::fs::File::from_raw_fd(fd.into_raw_fd()) }; let mut remaining = file_size; - let mut out = encoder.create_file(metadata, file_name, file_size)?; + let mut out = encoder.create_file(metadata, file_name, file_size).await?; while remaining != 0 { let mut got = match file.read(&mut self.file_copy_buffer[..]) { Ok(0) => break, @@ -695,7 +698,7 @@ impl<'a, 'b> Archiver<'a, 'b> { self.report_file_grew_while_reading()?; got = remaining as usize; } - out.write_all(&self.file_copy_buffer[..got])?; + out.write_all(&self.file_copy_buffer[..got]).await?; remaining -= got as u64; } if remaining > 0 { @@ -704,7 +707,7 @@ impl<'a, 'b> Archiver<'a, 'b> { vec::clear(&mut self.file_copy_buffer[..to_zero]); while remaining != 0 { let fill = remaining.min(self.file_copy_buffer.len() as u64) as usize; - out.write_all(&self.file_copy_buffer[..fill])?; + out.write_all(&self.file_copy_buffer[..fill]).await?; remaining -= fill as u64; } } @@ -712,21 +715,21 @@ impl<'a, 'b> Archiver<'a, 'b> { Ok(out.file_offset()) } - fn add_symlink( + async fn add_symlink( &mut self, - encoder: &mut Encoder, + encoder: &mut Encoder<'_, T>, fd: Fd, file_name: &Path, metadata: &Metadata, ) -> Result<(), Error> { let dest = nix::fcntl::readlinkat(fd.as_raw_fd(), &b""[..])?; - encoder.add_symlink(metadata, file_name, dest)?; + encoder.add_symlink(metadata, file_name, dest).await?; Ok(()) } - fn add_device( + async fn add_device( &mut self, - encoder: &mut Encoder, + encoder: &mut Encoder<'_, T>, file_name: &Path, metadata: &Metadata, stat: &FileStat, @@ -735,7 +738,7 @@ impl<'a, 'b> Archiver<'a, 'b> { metadata, file_name, pxar::format::Device::from_dev_t(stat.st_rdev), - )?) + ).await?) } } diff --git a/tests/catar.rs b/tests/catar.rs index 2d9dea71..550600c6 100644 --- a/tests/catar.rs +++ b/tests/catar.rs @@ -30,14 +30,15 @@ fn run_test(dir_name: &str) -> Result<(), Error> { ..PxarCreateOptions::default() }; - create_archive( + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(create_archive( dir, writer, Flags::DEFAULT, |_| Ok(()), None, options, - )?; + ))?; Command::new("cmp") .arg("--verbose") -- 2.20.1