From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 21B651FF163 for ; Thu, 12 Sep 2024 16:34:29 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 6DFC13483A; Thu, 12 Sep 2024 16:34:30 +0200 (CEST) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Thu, 12 Sep 2024 16:32:52 +0200 Message-Id: <20240912143322.548839-4-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20240912143322.548839-1-c.ebner@proxmox.com> References: <20240912143322.548839-1-c.ebner@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.022 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [PATCH v3 proxmox-backup 03/33] server: sync: move reader trait to common sync module X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" Move the `PullReader` trait and the types implementing it to the common sync module, so this can be reused for the push direction variant for a sync job as well. Adapt the naming to be more generic by renaming the `PullReader` trait to `SyncSourceReader`, `LocalReader` to `LocalSourceReader` and `RemoteReader` to `RemoteSourceReader`. 
Signed-off-by: Christian Ebner --- changes since version 2: - no changes src/server/pull.rs | 167 +++++---------------------------------------- src/server/sync.rs | 152 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 168 insertions(+), 151 deletions(-) diff --git a/src/server/pull.rs b/src/server/pull.rs index 4a97bfaa3..20b5d9af8 100644 --- a/src/server/pull.rs +++ b/src/server/pull.rs @@ -1,8 +1,7 @@ //! Sync datastore by pulling contents from remote server -use std::collections::{HashMap, HashSet}; -use std::io::{Seek, Write}; -use std::path::{Path, PathBuf}; +use std::collections::HashSet; +use std::io::Seek; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::SystemTime; @@ -15,11 +14,11 @@ use serde_json::json; use tracing::{info, warn}; use pbs_api_types::{ - print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter, + print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, GroupFilter, GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ, }; -use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader}; +use pbs_client::{BackupReader, BackupRepository, HttpClient}; use pbs_config::CachedUserInfo; use pbs_datastore::data_blob::DataBlob; use pbs_datastore::dynamic_index::DynamicIndexReader; @@ -29,26 +28,15 @@ use pbs_datastore::manifest::{ ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, }; use pbs_datastore::read_chunk::AsyncReadChunk; -use pbs_datastore::{ - check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress, -}; +use pbs_datastore::{check_backup_owner, DataStore, ListNamespacesRecursive, StoreProgress}; use pbs_tools::sha::sha256; -use super::sync::{RemovedVanishedStats, SyncStats}; +use super::sync::{ + LocalSourceReader, RemoteSourceReader, RemovedVanishedStats, 
SyncSourceReader, SyncStats, +}; use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups}; use crate::tools::parallel_handler::ParallelHandler; -struct RemoteReader { - backup_reader: Arc, - dir: BackupDir, -} - -struct LocalReader { - _dir_lock: Arc>, - path: PathBuf, - datastore: Arc, -} - pub(crate) struct PullTarget { store: Arc, ns: BackupNamespace, @@ -97,7 +85,7 @@ trait PullSource: Send + Sync { &self, ns: &BackupNamespace, dir: &BackupDir, - ) -> Result, Error>; + ) -> Result, Error>; } #[async_trait::async_trait] @@ -230,7 +218,7 @@ impl PullSource for RemoteSource { &self, ns: &BackupNamespace, dir: &BackupDir, - ) -> Result, Error> { + ) -> Result, Error> { let backup_reader = BackupReader::start( &self.client, None, @@ -240,7 +228,7 @@ impl PullSource for RemoteSource { tracing::enabled!(tracing::Level::DEBUG), ) .await?; - Ok(Arc::new(RemoteReader { + Ok(Arc::new(RemoteSourceReader { backup_reader, dir: dir.clone(), })) @@ -305,14 +293,14 @@ impl PullSource for LocalSource { &self, ns: &BackupNamespace, dir: &BackupDir, - ) -> Result, Error> { + ) -> Result, Error> { let dir = self.store.backup_dir(ns.clone(), dir.clone())?; let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared( &dir.full_path(), "snapshot", "locked by another operation", )?; - Ok(Arc::new(LocalReader { + Ok(Arc::new(LocalSourceReader { _dir_lock: Arc::new(Mutex::new(dir_lock)), path: dir.full_path(), datastore: dir.datastore().clone(), @@ -320,129 +308,6 @@ impl PullSource for LocalSource { } } -#[async_trait::async_trait] -/// `PullReader` is a trait that provides an interface for reading data from a source. -/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped. -trait PullReader: Send + Sync { - /// Returns a chunk reader with the specified encryption mode. 
- fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc; - - /// Asynchronously loads a file from the source into a local file. - /// `filename` is the name of the file to load from the source. - /// `into` is the path of the local file to load the source file into. - async fn load_file_into(&self, filename: &str, into: &Path) -> Result, Error>; - - /// Tries to download the client log from the source and save it into a local file. - async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error>; - - fn skip_chunk_sync(&self, target_store_name: &str) -> bool; -} - -#[async_trait::async_trait] -impl PullReader for RemoteReader { - fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc { - Arc::new(RemoteChunkReader::new( - self.backup_reader.clone(), - None, - crypt_mode, - HashMap::new(), - )) - } - - async fn load_file_into(&self, filename: &str, into: &Path) -> Result, Error> { - let mut tmp_file = std::fs::OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .read(true) - .open(into)?; - let download_result = self.backup_reader.download(filename, &mut tmp_file).await; - if let Err(err) = download_result { - match err.downcast_ref::() { - Some(HttpError { code, message }) => match *code { - StatusCode::NOT_FOUND => { - info!( - "skipping snapshot {} - vanished since start of sync", - &self.dir, - ); - return Ok(None); - } - _ => { - bail!("HTTP error {code} - {message}"); - } - }, - None => { - return Err(err); - } - }; - }; - tmp_file.rewind()?; - Ok(DataBlob::load_from_reader(&mut tmp_file).ok()) - } - - async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error> { - let mut tmp_path = to_path.to_owned(); - tmp_path.set_extension("tmp"); - - let tmpfile = std::fs::OpenOptions::new() - .write(true) - .create(true) - .read(true) - .open(&tmp_path)?; - - // Note: be silent if there is no log - only log successful download - if let Ok(()) = self - .backup_reader - .download(CLIENT_LOG_BLOB_NAME, tmpfile) - .await - { - 
if let Err(err) = std::fs::rename(&tmp_path, to_path) { - bail!("Atomic rename file {:?} failed - {}", to_path, err); - } - info!("got backup log file {CLIENT_LOG_BLOB_NAME:?}"); - } - - Ok(()) - } - - fn skip_chunk_sync(&self, _target_store_name: &str) -> bool { - false - } -} - -#[async_trait::async_trait] -impl PullReader for LocalReader { - fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc { - Arc::new(LocalChunkReader::new( - self.datastore.clone(), - None, - crypt_mode, - )) - } - - async fn load_file_into(&self, filename: &str, into: &Path) -> Result, Error> { - let mut tmp_file = std::fs::OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .read(true) - .open(into)?; - let mut from_path = self.path.clone(); - from_path.push(filename); - tmp_file.write_all(std::fs::read(from_path)?.as_slice())?; - tmp_file.rewind()?; - Ok(DataBlob::load_from_reader(&mut tmp_file).ok()) - } - - async fn try_download_client_log(&self, _to_path: &Path) -> Result<(), Error> { - Ok(()) - } - - fn skip_chunk_sync(&self, target_store_name: &str) -> bool { - self.datastore.name() == target_store_name - } -} - /// Parameters for a pull operation. 
pub(crate) struct PullParameters { /// Where data is pulled from @@ -650,7 +515,7 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err /// - if archive is an index, pull referenced chunks /// - Rename tmp file into real path async fn pull_single_archive<'a>( - reader: Arc, + reader: Arc, snapshot: &'a pbs_datastore::BackupDir, archive_info: &'a FileInfo, downloaded_chunks: Arc>>, @@ -733,7 +598,7 @@ async fn pull_single_archive<'a>( /// -- if not, pull it from the remote /// - Download log if not already existing async fn pull_snapshot<'a>( - reader: Arc, + reader: Arc, snapshot: &'a pbs_datastore::BackupDir, downloaded_chunks: Arc>>, ) -> Result { @@ -844,7 +709,7 @@ async fn pull_snapshot<'a>( /// The `reader` is configured to read from the source backup directory, while the /// `snapshot` is pointing to the local datastore and target namespace. async fn pull_snapshot_from<'a>( - reader: Arc, + reader: Arc, snapshot: &'a pbs_datastore::BackupDir, downloaded_chunks: Arc>>, ) -> Result { diff --git a/src/server/sync.rs b/src/server/sync.rs index 5f143ef63..323bc1a27 100644 --- a/src/server/sync.rs +++ b/src/server/sync.rs @@ -1,7 +1,24 @@ //! 
Sync datastore contents from source to target, either in push or pull direction +use std::collections::HashMap; +use std::io::{Seek, Write}; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; use std::time::Duration; +use anyhow::{bail, Error}; +use http::StatusCode; +use tracing::info; + +use proxmox_router::HttpError; + +use pbs_api_types::{BackupDir, CryptMode}; +use pbs_client::{BackupReader, RemoteChunkReader}; +use pbs_datastore::data_blob::DataBlob; +use pbs_datastore::manifest::CLIENT_LOG_BLOB_NAME; +use pbs_datastore::read_chunk::AsyncReadChunk; +use pbs_datastore::{DataStore, LocalChunkReader}; + #[derive(Default)] pub(crate) struct RemovedVanishedStats { pub(crate) groups: usize, @@ -49,3 +66,138 @@ impl SyncStats { } } } + +#[async_trait::async_trait] +/// `SyncReader` is a trait that provides an interface for reading data from a source. +/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, +/// and checking whether chunk sync should be skipped. +pub(crate) trait SyncSourceReader: Send + Sync { + /// Returns a chunk reader with the specified encryption mode. + fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc; + + /// Asynchronously loads a file from the source into a local file. + /// `filename` is the name of the file to load from the source. + /// `into` is the path of the local file to load the source file into. + async fn load_file_into(&self, filename: &str, into: &Path) -> Result, Error>; + + /// Tries to download the client log from the source and save it into a local file. 
+ async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error>; + + fn skip_chunk_sync(&self, target_store_name: &str) -> bool; +} + +pub(crate) struct RemoteSourceReader { + pub(crate) backup_reader: Arc, + pub(crate) dir: BackupDir, +} + +pub(crate) struct LocalSourceReader { + pub(crate) _dir_lock: Arc>, + pub(crate) path: PathBuf, + pub(crate) datastore: Arc, +} + +#[async_trait::async_trait] +impl SyncSourceReader for RemoteSourceReader { + fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc { + Arc::new(RemoteChunkReader::new( + self.backup_reader.clone(), + None, + crypt_mode, + HashMap::new(), + )) + } + + async fn load_file_into(&self, filename: &str, into: &Path) -> Result, Error> { + let mut tmp_file = std::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .read(true) + .open(into)?; + let download_result = self.backup_reader.download(filename, &mut tmp_file).await; + if let Err(err) = download_result { + match err.downcast_ref::() { + Some(HttpError { code, message }) => match *code { + StatusCode::NOT_FOUND => { + info!( + "skipping snapshot {} - vanished since start of sync", + &self.dir + ); + return Ok(None); + } + _ => { + bail!("HTTP error {code} - {message}"); + } + }, + None => { + return Err(err); + } + }; + }; + tmp_file.rewind()?; + Ok(DataBlob::load_from_reader(&mut tmp_file).ok()) + } + + async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error> { + let mut tmp_path = to_path.to_owned(); + tmp_path.set_extension("tmp"); + + let tmpfile = std::fs::OpenOptions::new() + .write(true) + .create(true) + .read(true) + .open(&tmp_path)?; + + // Note: be silent if there is no log - only log successful download + if let Ok(()) = self + .backup_reader + .download(CLIENT_LOG_BLOB_NAME, tmpfile) + .await + { + if let Err(err) = std::fs::rename(&tmp_path, to_path) { + bail!("Atomic rename file {to_path:?} failed - {err}"); + } + info!("got backup log file {CLIENT_LOG_BLOB_NAME:?}"); + } + + 
Ok(()) + } + + fn skip_chunk_sync(&self, _target_store_name: &str) -> bool { + false + } +} + +#[async_trait::async_trait] +impl SyncSourceReader for LocalSourceReader { + fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc { + Arc::new(LocalChunkReader::new( + self.datastore.clone(), + None, + crypt_mode, + )) + } + + async fn load_file_into(&self, filename: &str, into: &Path) -> Result, Error> { + let mut tmp_file = std::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .read(true) + .open(into)?; + let mut from_path = self.path.clone(); + from_path.push(filename); + tmp_file.write_all(std::fs::read(from_path)?.as_slice())?; + tmp_file.rewind()?; + Ok(DataBlob::load_from_reader(&mut tmp_file).ok()) + } + + async fn try_download_client_log(&self, _to_path: &Path) -> Result<(), Error> { + Ok(()) + } + + fn skip_chunk_sync(&self, target_store_name: &str) -> bool { + self.datastore.name() == target_store_name + } +} -- 2.39.2 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel