From mboxrd@z Thu Jan  1 00:00:00 1970
Date: Thu, 24 Aug 2023 15:09:58 +0200
From: Wolfgang Bumiller
To: Hannes Laimer
Cc: pbs-devel@lists.proxmox.com
Message-ID: <2komt526xjwevu4k7iicjesvomtdkontq7nbeqbpdld5bra2xk@6bbw5iimpnt4>
References: <20230808121344.199500-1-h.laimer@proxmox.com>
 <20230808121344.199500-6-h.laimer@proxmox.com>
In-Reply-To: <20230808121344.199500-6-h.laimer@proxmox.com>
MIME-Version: 1.0
Content-Type: text/plain; charset=us-ascii
Content-Disposition: inline
Subject: Re: [pbs-devel] [PATCH proxmox-backup v3 5/6] pull: refactor pulling from a datastore

On Tue, Aug 08, 2023 at 02:13:43PM +0200, Hannes Laimer wrote:
> ... making the pull logic independent from the actual source
> using two traits.
>
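The split into `PullSource` and `PullReader` below means the sync logic
proper only ever deals with trait objects. To sketch the intended shape
(illustration only -- `pull_ns_contents` and the loop structure are made
up here, not part of the patch):

    async fn pull_ns_contents(
        source: &dyn PullSource,   // RemoteSource now, a local source later
        worker: &WorkerTask,
        ns: &BackupNamespace,
        owner: &Authid,
    ) -> Result<(), Error> {
        for group in source.list_groups(ns, owner).await? {
            for dir in source.list_backup_dirs(ns, &group, worker).await? {
                // all data access for one snapshot goes through a PullReader
                let _reader: Arc<dyn PullReader> = source.reader(ns, &dir).await?;
            }
        }
        Ok(())
    }

With that in mind, comments inline below.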
> Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
> ---
>  Cargo.toml                      |   2 +
>  pbs-datastore/src/read_chunk.rs |   2 +-
>  src/api2/config/remote.rs       |  14 +-
>  src/api2/pull.rs                |  31 +-
>  src/server/pull.rs              | 943 +++++++++++++++++++-------------
>  5 files changed, 570 insertions(+), 422 deletions(-)
>
> diff --git a/src/server/pull.rs b/src/server/pull.rs
> index e55452d1..e1a27a8c 100644
> --- a/src/server/pull.rs
> +++ b/src/server/pull.rs
> @@ -1,28 +1,26 @@
>  //! Sync datastore from remote server
>
>  use std::collections::{HashMap, HashSet};
> -use std::io::{Seek, SeekFrom};
> +use std::io::Seek;
> +use std::path::Path;
>  use std::sync::atomic::{AtomicUsize, Ordering};
>  use std::sync::{Arc, Mutex};
>  use std::time::SystemTime;
>
>  use anyhow::{bail, format_err, Error};
>  use http::StatusCode;
> -use pbs_config::CachedUserInfo;
> -use serde_json::json;
> -
> +use proxmox_rest_server::WorkerTask;
>  use proxmox_router::HttpError;
> -use proxmox_sys::task_log;
> +use proxmox_sys::{task_log, task_warn};
> +use serde_json::json;
>
>  use pbs_api_types::{
> -    print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem,
> -    Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
> +    print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
> +    GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
>      PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
>  };
> -
> -use pbs_client::{
> -    BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
> -};
> +use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
> +use pbs_config::CachedUserInfo;
>  use pbs_datastore::data_blob::DataBlob;
>  use pbs_datastore::dynamic_index::DynamicIndexReader;
>  use pbs_datastore::fixed_index::FixedIndexReader;
> @@ -30,25 +28,327 @@ use pbs_datastore::index::IndexFile;
>  use pbs_datastore::manifest::{
>      archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
>  };
> +use pbs_datastore::read_chunk::AsyncReadChunk;
>  use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
>  use pbs_tools::sha::sha256;
> -use proxmox_rest_server::WorkerTask;
>
>  use crate::backup::{check_ns_modification_privs, check_ns_privs};
>  use crate::tools::parallel_handler::ParallelHandler;
>
> -/// Parameters for a pull operation.
> -pub(crate) struct PullParameters {
> -    /// Remote that is pulled from
> -    remote: Remote,
> -    /// Full specification of remote datastore
> -    source: BackupRepository,
> -    /// Local store that is pulled into
> +struct RemoteReader {
> +    backup_reader: Arc<BackupReader>,
> +    dir: BackupDir,
> +}
> +
> +pub(crate) struct PullTarget {
>      store: Arc<DataStore>,
> -    /// Remote namespace
> -    remote_ns: BackupNamespace,
> -    /// Local namespace (anchor)
>      ns: BackupNamespace,
> +}
> +
> +pub(crate) struct RemoteSource {
> +    repo: BackupRepository,
> +    ns: BackupNamespace,
> +    client: HttpClient,
> +}
> +
> +#[async_trait::async_trait]
> +/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
> +/// The trait includes methods for listing namespaces, groups, and backup directories,
> +/// as well as retrieving a reader for reading data from the source
> +trait PullSource: Send + Sync {
> +    /// Lists namespaces from the source.
> +    async fn list_namespaces(
> +        &self,
> +        max_depth: &mut Option<usize>,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupNamespace>, Error>;
> +
> +    /// Lists groups within a specific namespace from the source.
> +    async fn list_groups(
> +        &self,
> +        namespace: &BackupNamespace,
> +        owner: &Authid,
> +    ) -> Result<Vec<BackupGroup>, Error>;
> +
> +    /// Lists backup directories for a specific group within a specific namespace from the source.
> +    async fn list_backup_dirs(
> +        &self,
> +        namespace: &BackupNamespace,
> +        group: &BackupGroup,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupDir>, Error>;
> +    fn get_ns(&self) -> BackupNamespace;
> +    fn print_store_and_ns(&self) -> String;
> +
> +    /// Returns a reader for reading data from a specific backup directory.
> +    async fn reader(
> +        &self,
> +        ns: &BackupNamespace,
> +        dir: &BackupDir,
> +    ) -> Result<Arc<dyn PullReader>, Error>;
> +}
> +
> +#[async_trait::async_trait]
> +impl PullSource for RemoteSource {
> +    async fn list_namespaces(
> +        &self,
> +        max_depth: &mut Option<usize>,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupNamespace>, Error> {
> +        if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
> +            vec![self.ns.clone()];

This (still) does nothing, as mentioned in v2 ;-)
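Presumably the intent here is an early return, i.e. something like:

    if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
        return Ok(vec![self.ns.clone()]);
    }

As written, the Vec is built and dropped on the spot, and the function
falls through to querying the remote anyway.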
> +        }
> +
> +        let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
> +        let mut data = json!({});
> +        if let Some(max_depth) = max_depth {
> +            data["max-depth"] = json!(max_depth);
> +        }
> +
> +        if !self.ns.is_root() {
> +            data["parent"] = json!(self.ns);
> +        }
> +        self.client.login().await?;
> +
> +        let mut result = match self.client.get(&path, Some(data)).await {
> +            Ok(res) => res,
> +            Err(err) => match err.downcast_ref::<HttpError>() {
> +                Some(HttpError { code, message }) => match code {
> +                    &StatusCode::NOT_FOUND => {
> +                        if self.ns.is_root() && max_depth.is_none() {
> +                            task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> +                            task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
> +                            max_depth.replace(0);
> +                        } else {
> +                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> +                        }
> +
> +                        return Ok(vec![self.ns.clone()]);
> +                    }
> +                    _ => {
> +                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
> +                    }
> +                },
> +                None => {
> +                    bail!("Querying namespaces failed - {err}");
> +                }
> +            },
> +        };
> +
> +        let list: Vec<BackupNamespace> =
> +            serde_json::from_value::<Vec<NamespaceListItem>>(result["data"].take())?
> +                .iter()
> +                .map(|list_item| list_item.ns.clone())

If you're already modifying this, use

    .into_iter()
    .map(|list_item| list_item.ns)

since we don't really need to clone() here

> +                .collect();
> +
> +        Ok(list)
> +    }
> +
> +    async fn list_groups(
> +        &self,
> +        namespace: &BackupNamespace,
> +        _owner: &Authid,
> +    ) -> Result<Vec<BackupGroup>, Error> {
> +        let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());
> +
> +        let args = if !namespace.is_root() {
> +            Some(json!({ "ns": namespace.clone() }))
> +        } else {
> +            None
> +        };
> +
> +        self.client.login().await?;
> +        let mut result =
> +            self.client.get(&path, args).await.map_err(|err| {
> +                format_err!("Failed to retrieve backup groups from remote - {}", err)
> +            })?;
> +
> +        Ok(
> +            serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
> +                .map_err(Error::from)?
> +                .into_iter()
> +                .map(|item| item.backup)
> +                .collect::<Vec<BackupGroup>>(),
> +        )
> +    }
> +
> +    async fn list_backup_dirs(
> +        &self,
> +        _namespace: &BackupNamespace,
> +        group: &BackupGroup,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupDir>, Error> {
> +        let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());
> +
> +        let mut args = json!({
> +            "backup-type": group.ty,
> +            "backup-id": group.id,
> +        });
> +
> +        if !self.ns.is_root() {
> +            args["ns"] = serde_json::to_value(&self.ns)?;
> +        }
> +
> +        self.client.login().await?;
> +
> +        let mut result = self.client.get(&path, Some(args)).await?;
> +        let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
> +        Ok(snapshot_list
> +            .into_iter()
> +            .filter_map(|item: SnapshotListItem| {
> +                let snapshot = item.backup;
> +                // in-progress backups can't be synced
> +                if item.size.is_none() {
> +                    task_log!(
> +                        worker,
> +                        "skipping snapshot {} - in-progress backup",
> +                        snapshot
> +                    );
> +                    return None;
> +                }
> +
> +                Some(snapshot)
> +            })
> +            .collect::<Vec<BackupDir>>())
> +    }
> +
> +    fn get_ns(&self) -> BackupNamespace {
> +        self.ns.clone()
> +    }
> +
> +    fn print_store_and_ns(&self) -> String {
> +        print_store_and_ns(self.repo.store(), &self.ns)
> +    }
> +
> +    async fn reader(
> +        &self,
> +        ns: &BackupNamespace,
> +        dir: &BackupDir,
> +    ) -> Result<Arc<dyn PullReader>, Error> {
> +        let backup_reader =
> +            BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
> +        Ok(Arc::new(RemoteReader {
> +            backup_reader,
> +            dir: dir.clone(),
> +        }))
> +    }
> +}
> +
> +#[async_trait::async_trait]
> +/// `PullReader` is a trait that provides an interface for reading data from a source.
> +/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.
> +trait PullReader: Send + Sync {
> +    /// Returns a chunk reader with the specified encryption mode.
> +    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
> +
> +    /// Asynchronously loads a file from the source into a local file.
> +    /// `filename` is the name of the file to load from the source.
> +    /// `into` is the path of the local file to load the source file into.
> +    async fn load_file_into(
> +        &self,
> +        filename: &str,
> +        into: &Path,
> +        worker: &WorkerTask,
> +    ) -> Result<Option<DataBlob>, Error>;
> +
> +    /// Tries to download the client log from the source and save it into a local file.
> +    async fn try_download_client_log(
> +        &self,
> +        to_path: &Path,
> +        worker: &WorkerTask,
> +    ) -> Result<(), Error>;
> +
> +    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
> +}
> +
> +#[async_trait::async_trait]
> +impl PullReader for RemoteReader {
> +    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
> +        Arc::new(RemoteChunkReader::new(
> +            self.backup_reader.clone(),
> +            None,
> +            crypt_mode,
> +            HashMap::new(),
> +        ))
> +    }
> +
> +    async fn load_file_into(
> +        &self,
> +        filename: &str,
> +        into: &Path,
> +        worker: &WorkerTask,
> +    ) -> Result<Option<DataBlob>, Error> {
> +        let mut tmp_file = std::fs::OpenOptions::new()
> +            .write(true)
> +            .create(true)
> +            .truncate(true)
> +            .read(true)
> +            .open(into)?;
> +        let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
> +        if let Err(err) = download_result {
> +            match err.downcast_ref::<HttpError>() {
> +                Some(HttpError { code, message }) => match *code {
> +                    StatusCode::NOT_FOUND => {
> +                        task_log!(
> +                            worker,
> +                            "skipping snapshot {} - vanished since start of sync",
> +                            &self.dir,
> +                        );
> +                        return Ok(None);
> +                    }
> +                    _ => {
> +                        bail!("HTTP error {code} - {message}");
> +                    }
> +                },
> +                None => {
> +                    return Err(err);
> +                }
> +            };
> +        };
> +        tmp_file.rewind()?;
> +        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
> +    }
> +
> +    async fn try_download_client_log(
> +        &self,
> +        to_path: &Path,
> +        worker: &WorkerTask,
> +    ) -> Result<(), Error> {
> +        let mut tmp_path = to_path.to_owned();
> +        tmp_path.set_extension("tmp");
> +
> +        let tmpfile = std::fs::OpenOptions::new()
> +            .write(true)
> +            .create(true)
> +            .read(true)
> +            .open(&tmp_path)?;
> +
> +        // Note: be silent if there is no log - only log successful download
> +        if let Ok(()) = self
> +            .backup_reader
> +            .download(CLIENT_LOG_BLOB_NAME, tmpfile)
> +            .await
> +        {
> +            if let Err(err) = std::fs::rename(&tmp_path, to_path) {
> +                bail!("Atomic rename file {:?} failed - {}", to_path, err);
> +            }
> +            task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
> +        }
> +
> +        Ok(())
> +    }
> +
> +    fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
> +        false
> +    }
> +}
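Side note, not a blocker: I assume this hook only becomes useful with
the local source in a later patch, where it would presumably compare
store names so that a sync within the same datastore can skip copying
chunks, along the lines of (hypothetical local impl, the `datastore`
field is made up here):

    fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
        self.datastore.name() == target_store_name
    }

For the remote reader a constant `false` is of course correct.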
> +
> +/// Parameters for a pull operation.
> +pub(crate) struct PullParameters {
> +    /// Where data is pulled from
> +    source: Arc<dyn PullSource>,
> +    /// Where data should be pulled into
> +    target: PullTarget,
>      /// Owner of synced groups (needs to match local owner of pre-existing groups)
>      owner: Authid,
>      /// Whether to remove groups which exist locally, but not on the remote end
> @@ -57,22 +357,16 @@ pub(crate) struct PullParameters {
>      max_depth: Option<usize>,
>      /// Filters for reducing the pull scope
>      group_filter: Option<Vec<GroupFilter>>,
> -    /// Rate limits for all transfers from `remote`
> -    limit: RateLimitConfig,
>      /// How many snapshots should be transferred at most (taking the newest N snapshots)
>      transfer_last: Option<usize>,
>  }
>
>  impl PullParameters {
>      /// Creates a new instance of `PullParameters`.
> -    ///
> -    /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
> -    /// [BackupRepository] with `remote_store`.
> -    #[allow(clippy::too_many_arguments)]
>      pub(crate) fn new(
>          store: &str,
>          ns: BackupNamespace,
> -        remote: &str,
> +        remote: Option<&str>,
>          remote_store: &str,
>          remote_ns: BackupNamespace,
>          owner: Authid,
> @@ -82,49 +376,56 @@ impl PullParameters {
>          limit: RateLimitConfig,
>          transfer_last: Option<usize>,
>      ) -> Result<Self, Error> {
> -        let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
> -
>          if let Some(max_depth) = max_depth {
>              ns.check_max_depth(max_depth)?;
>              remote_ns.check_max_depth(max_depth)?;
> -        }
> -
> -        let (remote_config, _digest) = pbs_config::remote::config()?;
> -        let remote: Remote = remote_config.lookup("remote", remote)?;
> -
> +        };
>          let remove_vanished = remove_vanished.unwrap_or(false);
>
> -        let source = BackupRepository::new(
> -            Some(remote.config.auth_id.clone()),
> -            Some(remote.config.host.clone()),
> -            remote.config.port,
> -            remote_store.to_string(),
> -        );
> +        let source: Arc<dyn PullSource> = if let Some(remote) = remote {
> +            let (remote_config, _digest) = pbs_config::remote::config()?;
> +            let remote: Remote = remote_config.lookup("remote", remote)?;
>
> -        Ok(Self {
> -            remote,
> -            remote_ns,
> +            let repo = BackupRepository::new(
> +                Some(remote.config.auth_id.clone()),
> +                Some(remote.config.host.clone()),
> +                remote.config.port,
> +                remote_store.to_string(),
> +            );
> +            let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
> +            Arc::new(RemoteSource {
> +                repo,
> +                ns: remote_ns,
> +                client,
> +            })
> +        } else {
> +            bail!("local sync not implemented yet")
> +        };
> +        let target = PullTarget {
> +            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
>              ns,
> +        };
> +
> +        Ok(Self {
>              source,
> -            store,
> +            target,
>              owner,
>              remove_vanished,
>              max_depth,
>              group_filter,
> -            limit,
>              transfer_last,
>          })
>      }
>
> -    /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
> -    pub async fn client(&self) -> Result<HttpClient, Error> {
> -        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
> +    pub(crate) fn get_target_ns(&self) -> Result<BackupNamespace, Error> {
> +        let source_ns = self.source.get_ns();
> +        source_ns.map_prefix(&source_ns, &self.target.ns)

^ This code is still weird, again, as already mentioned in v2
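Unless I'm misreading `map_prefix`, stripping `source_ns`'s own prefix
from itself and then prepending `self.target.ns` just reproduces
`self.target.ns`, i.e. this is a roundabout way of writing:

    pub(crate) fn get_target_ns(&self) -> Result<BackupNamespace, Error> {
        Ok(self.target.ns.clone())
    }

If that is the intended semantics, please spell it out directly (or
document why the prefix mapping is needed).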
>      }
>  }
>
>  async fn pull_index_chunks<I: IndexFile>(
>      worker: &WorkerTask,
> -    chunk_reader: RemoteChunkReader,
> +    chunk_reader: Arc<dyn AsyncReadChunk>,
>      target: Arc<DataStore>,
>      index: I,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> @@ -215,26 +516,6 @@ async fn pull_index_chunks<I: IndexFile>(
>      Ok(())
>  }
>
> -async fn download_manifest(
> -    reader: &BackupReader,
> -    filename: &std::path::Path,
> -) -> Result<std::fs::File, Error> {
> -    let mut tmp_manifest_file = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .truncate(true)
> -        .read(true)
> -        .open(filename)?;
> -
> -    reader
> -        .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
> -        .await?;
> -
> -    tmp_manifest_file.seek(SeekFrom::Start(0))?;
> -
> -    Ok(tmp_manifest_file)
> -}
> -
>  fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
>      if size != info.size {
>          bail!(
> @@ -255,17 +536,16 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
>  /// Pulls a single file referenced by a manifest.
>  ///
>  /// Pulling an archive consists of the following steps:
> -/// - Create tmp file for archive
> -/// - Download archive file into tmp file
> -/// - Verify tmp file checksum
> +/// - Load archive file into tmp file
> +/// -- Load file into tmp file
> +/// -- Verify tmp file checksum
>  /// - if archive is an index, pull referenced chunks
>  /// - Rename tmp file into real path
> -async fn pull_single_archive(
> -    worker: &WorkerTask,
> -    reader: &BackupReader,
> -    chunk_reader: &mut RemoteChunkReader,
> -    snapshot: &pbs_datastore::BackupDir,
> -    archive_info: &FileInfo,
> +async fn pull_single_archive<'a>(
> +    worker: &'a WorkerTask,
> +    reader: Arc<dyn PullReader>,
> +    snapshot: &'a pbs_datastore::BackupDir,
> +    archive_info: &'a FileInfo,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>  ) -> Result<(), Error> {
>      let archive_name = &archive_info.filename;
> @@ -277,13 +557,11 @@
>
>      task_log!(worker, "sync archive {}", archive_name);
>
> -    let mut tmpfile = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .read(true)
> -        .open(&tmp_path)?;
> +    reader
> +        .load_file_into(archive_name, &tmp_path, worker)
> +        .await?;
>
> -    reader.download(archive_name, &mut tmpfile).await?;
> +    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
>
>      match archive_type(archive_name)? {
>          ArchiveType::DynamicIndex => {
> @@ -293,14 +571,18 @@
>              let (csum, size) = index.compute_csum();
>              verify_archive(archive_info, &csum, size)?;
>
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            if reader.skip_chunk_sync(snapshot.datastore().name()) {
> +                task_log!(worker, "skipping chunk sync for same datatsore");

The t<->s typo is still there, as mentioned in v2.

> +            } else {
> +                pull_index_chunks(
> +                    worker,
> +                    reader.chunk_reader(archive_info.crypt_mode),
> +                    snapshot.datastore().clone(),
> +                    index,
> +                    downloaded_chunks,
> +                )
> +                .await?;
> +            }
>          }
>          ArchiveType::FixedIndex => {
>              let index = FixedIndexReader::new(tmpfile).map_err(|err| {
> @@ -309,17 +591,21 @@
>              let (csum, size) = index.compute_csum();
>              verify_archive(archive_info, &csum, size)?;
>
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            if reader.skip_chunk_sync(snapshot.datastore().name()) {
> +                task_log!(worker, "skipping chunk sync for same datatsore");

The t<->s typo is still there, as mentioned in v2.

> +            } else {
> +                pull_index_chunks(
> +                    worker,
> +                    reader.chunk_reader(archive_info.crypt_mode),
> +                    snapshot.datastore().clone(),
> +                    index,
> +                    downloaded_chunks,
> +                )
> +                .await?;
> +            }
>          }
>          ArchiveType::Blob => {
> -            tmpfile.seek(SeekFrom::Start(0))?;
> +            tmpfile.rewind()?;
>              let (csum, size) = sha256(&mut tmpfile)?;
>              verify_archive(archive_info, &csum, size)?;
>          }