From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [RFC proxmox-backup 07/24] server: sync: move reader trait to common sync module
Date: Mon, 15 Jul 2024 12:15:45 +0200 [thread overview]
Message-ID: <20240715101602.274244-8-c.ebner@proxmox.com> (raw)
In-Reply-To: <20240715101602.274244-1-c.ebner@proxmox.com>
Move the `PullReader` trait and the types implementing it to the
common sync module, so this can be reused for the push direction
variant for a sync job as well.
Adapt the naming to be more generic by renaming the `PullReader` trait to
`SyncSourceReader`, `LocalReader` to `LocalSourceReader` and
`RemoteReader` to `RemoteSourceReader`.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/pull.rs | 167 +++++----------------------------------------
src/server/sync.rs | 152 +++++++++++++++++++++++++++++++++++++++++
2 files changed, 168 insertions(+), 151 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 24422ef41..5efe2d5f7 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,8 +1,7 @@
//! Sync datastore by pulling contents from remote server
-use std::collections::{HashMap, HashSet};
-use std::io::{Seek, Write};
-use std::path::{Path, PathBuf};
+use std::collections::HashSet;
+use std::io::Seek;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
@@ -15,11 +14,11 @@ use serde_json::json;
use tracing::{info, warn};
use pbs_api_types::{
- print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
+ print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, GroupFilter,
GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
};
-use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
+use pbs_client::{BackupReader, BackupRepository, HttpClient};
use pbs_config::CachedUserInfo;
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
@@ -29,26 +28,15 @@ use pbs_datastore::manifest::{
ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
use pbs_datastore::read_chunk::AsyncReadChunk;
-use pbs_datastore::{
- check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
-};
+use pbs_datastore::{check_backup_owner, DataStore, ListNamespacesRecursive, StoreProgress};
use pbs_tools::sha::sha256;
-use super::sync::{RemovedVanishedStats, SyncStats};
+use super::sync::{
+ LocalSourceReader, RemoteSourceReader, RemovedVanishedStats, SyncSourceReader, SyncStats,
+};
use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
use crate::tools::parallel_handler::ParallelHandler;
-struct RemoteReader {
- backup_reader: Arc<BackupReader>,
- dir: BackupDir,
-}
-
-struct LocalReader {
- _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
- path: PathBuf,
- datastore: Arc<DataStore>,
-}
-
pub(crate) struct PullTarget {
store: Arc<DataStore>,
ns: BackupNamespace,
@@ -97,7 +85,7 @@ trait PullSource: Send + Sync {
&self,
ns: &BackupNamespace,
dir: &BackupDir,
- ) -> Result<Arc<dyn PullReader>, Error>;
+ ) -> Result<Arc<dyn SyncSourceReader>, Error>;
}
#[async_trait::async_trait]
@@ -230,10 +218,10 @@ impl PullSource for RemoteSource {
&self,
ns: &BackupNamespace,
dir: &BackupDir,
- ) -> Result<Arc<dyn PullReader>, Error> {
+ ) -> Result<Arc<dyn SyncSourceReader>, Error> {
let backup_reader =
BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
- Ok(Arc::new(RemoteReader {
+ Ok(Arc::new(RemoteSourceReader {
backup_reader,
dir: dir.clone(),
}))
@@ -298,14 +286,14 @@ impl PullSource for LocalSource {
&self,
ns: &BackupNamespace,
dir: &BackupDir,
- ) -> Result<Arc<dyn PullReader>, Error> {
+ ) -> Result<Arc<dyn SyncSourceReader>, Error> {
let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
&dir.full_path(),
"snapshot",
"locked by another operation",
)?;
- Ok(Arc::new(LocalReader {
+ Ok(Arc::new(LocalSourceReader {
_dir_lock: Arc::new(Mutex::new(dir_lock)),
path: dir.full_path(),
datastore: dir.datastore().clone(),
@@ -313,129 +301,6 @@ impl PullSource for LocalSource {
}
}
-#[async_trait::async_trait]
-/// `PullReader` is a trait that provides an interface for reading data from a source.
-/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.
-trait PullReader: Send + Sync {
- /// Returns a chunk reader with the specified encryption mode.
- fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
-
- /// Asynchronously loads a file from the source into a local file.
- /// `filename` is the name of the file to load from the source.
- /// `into` is the path of the local file to load the source file into.
- async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error>;
-
- /// Tries to download the client log from the source and save it into a local file.
- async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error>;
-
- fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
-}
-
-#[async_trait::async_trait]
-impl PullReader for RemoteReader {
- fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
- Arc::new(RemoteChunkReader::new(
- self.backup_reader.clone(),
- None,
- crypt_mode,
- HashMap::new(),
- ))
- }
-
- async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
- let mut tmp_file = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .truncate(true)
- .read(true)
- .open(into)?;
- let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
- if let Err(err) = download_result {
- match err.downcast_ref::<HttpError>() {
- Some(HttpError { code, message }) => match *code {
- StatusCode::NOT_FOUND => {
- info!(
- "skipping snapshot {} - vanished since start of sync",
- &self.dir,
- );
- return Ok(None);
- }
- _ => {
- bail!("HTTP error {code} - {message}");
- }
- },
- None => {
- return Err(err);
- }
- };
- };
- tmp_file.rewind()?;
- Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
- }
-
- async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error> {
- let mut tmp_path = to_path.to_owned();
- tmp_path.set_extension("tmp");
-
- let tmpfile = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .read(true)
- .open(&tmp_path)?;
-
- // Note: be silent if there is no log - only log successful download
- if let Ok(()) = self
- .backup_reader
- .download(CLIENT_LOG_BLOB_NAME, tmpfile)
- .await
- {
- if let Err(err) = std::fs::rename(&tmp_path, to_path) {
- bail!("Atomic rename file {:?} failed - {}", to_path, err);
- }
- info!("got backup log file {CLIENT_LOG_BLOB_NAME:?}");
- }
-
- Ok(())
- }
-
- fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
- false
- }
-}
-
-#[async_trait::async_trait]
-impl PullReader for LocalReader {
- fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
- Arc::new(LocalChunkReader::new(
- self.datastore.clone(),
- None,
- crypt_mode,
- ))
- }
-
- async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
- let mut tmp_file = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .truncate(true)
- .read(true)
- .open(into)?;
- let mut from_path = self.path.clone();
- from_path.push(filename);
- tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
- tmp_file.rewind()?;
- Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
- }
-
- async fn try_download_client_log(&self, _to_path: &Path) -> Result<(), Error> {
- Ok(())
- }
-
- fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
- self.datastore.name() == target_store_name
- }
-}
-
/// Parameters for a pull operation.
pub(crate) struct PullParameters {
/// Where data is pulled from
@@ -643,7 +508,7 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
/// - if archive is an index, pull referenced chunks
/// - Rename tmp file into real path
async fn pull_single_archive<'a>(
- reader: Arc<dyn PullReader + 'a>,
+ reader: Arc<dyn SyncSourceReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
archive_info: &'a FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
@@ -726,7 +591,7 @@ async fn pull_single_archive<'a>(
/// -- if not, pull it from the remote
/// - Download log if not already existing
async fn pull_snapshot<'a>(
- reader: Arc<dyn PullReader + 'a>,
+ reader: Arc<dyn SyncSourceReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<SyncStats, Error> {
@@ -837,7 +702,7 @@ async fn pull_snapshot<'a>(
/// The `reader` is configured to read from the source backup directory, while the
/// `snapshot` is pointing to the local datastore and target namespace.
async fn pull_snapshot_from<'a>(
- reader: Arc<dyn PullReader + 'a>,
+ reader: Arc<dyn SyncSourceReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<SyncStats, Error> {
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 5f143ef63..323bc1a27 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -1,7 +1,24 @@
//! Sync datastore contents from source to target, either in push or pull direction
+use std::collections::HashMap;
+use std::io::{Seek, Write};
+use std::path::{Path, PathBuf};
+use std::sync::{Arc, Mutex};
use std::time::Duration;
+use anyhow::{bail, Error};
+use http::StatusCode;
+use tracing::info;
+
+use proxmox_router::HttpError;
+
+use pbs_api_types::{BackupDir, CryptMode};
+use pbs_client::{BackupReader, RemoteChunkReader};
+use pbs_datastore::data_blob::DataBlob;
+use pbs_datastore::manifest::CLIENT_LOG_BLOB_NAME;
+use pbs_datastore::read_chunk::AsyncReadChunk;
+use pbs_datastore::{DataStore, LocalChunkReader};
+
#[derive(Default)]
pub(crate) struct RemovedVanishedStats {
pub(crate) groups: usize,
@@ -49,3 +66,138 @@ impl SyncStats {
}
}
}
+
+#[async_trait::async_trait]
+/// `SyncSourceReader` is a trait that provides an interface for reading data from a source.
+/// The trait includes methods for getting a chunk reader, loading a file, downloading client log,
+/// and checking whether chunk sync should be skipped.
+pub(crate) trait SyncSourceReader: Send + Sync {
+ /// Returns a chunk reader with the specified encryption mode.
+ fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
+
+ /// Asynchronously loads a file from the source into a local file.
+ /// `filename` is the name of the file to load from the source.
+ /// `into` is the path of the local file to load the source file into.
+ async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error>;
+
+ /// Tries to download the client log from the source and save it into a local file.
+ async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error>;
+
+ fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
+}
+
+pub(crate) struct RemoteSourceReader {
+ pub(crate) backup_reader: Arc<BackupReader>,
+ pub(crate) dir: BackupDir,
+}
+
+pub(crate) struct LocalSourceReader {
+ pub(crate) _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
+ pub(crate) path: PathBuf,
+ pub(crate) datastore: Arc<DataStore>,
+}
+
+#[async_trait::async_trait]
+impl SyncSourceReader for RemoteSourceReader {
+ fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
+ Arc::new(RemoteChunkReader::new(
+ self.backup_reader.clone(),
+ None,
+ crypt_mode,
+ HashMap::new(),
+ ))
+ }
+
+ async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
+ let mut tmp_file = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .read(true)
+ .open(into)?;
+ let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
+ if let Err(err) = download_result {
+ match err.downcast_ref::<HttpError>() {
+ Some(HttpError { code, message }) => match *code {
+ StatusCode::NOT_FOUND => {
+ info!(
+ "skipping snapshot {} - vanished since start of sync",
+ &self.dir
+ );
+ return Ok(None);
+ }
+ _ => {
+ bail!("HTTP error {code} - {message}");
+ }
+ },
+ None => {
+ return Err(err);
+ }
+ };
+ };
+ tmp_file.rewind()?;
+ Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+ }
+
+ async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error> {
+ let mut tmp_path = to_path.to_owned();
+ tmp_path.set_extension("tmp");
+
+ let tmpfile = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .read(true)
+ .open(&tmp_path)?;
+
+ // Note: be silent if there is no log - only log successful download
+ if let Ok(()) = self
+ .backup_reader
+ .download(CLIENT_LOG_BLOB_NAME, tmpfile)
+ .await
+ {
+ if let Err(err) = std::fs::rename(&tmp_path, to_path) {
+ bail!("Atomic rename file {to_path:?} failed - {err}");
+ }
+ info!("got backup log file {CLIENT_LOG_BLOB_NAME:?}");
+ }
+
+ Ok(())
+ }
+
+ fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
+ false
+ }
+}
+
+#[async_trait::async_trait]
+impl SyncSourceReader for LocalSourceReader {
+ fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
+ Arc::new(LocalChunkReader::new(
+ self.datastore.clone(),
+ None,
+ crypt_mode,
+ ))
+ }
+
+ async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
+ let mut tmp_file = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .read(true)
+ .open(into)?;
+ let mut from_path = self.path.clone();
+ from_path.push(filename);
+ tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
+ tmp_file.rewind()?;
+ Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+ }
+
+ async fn try_download_client_log(&self, _to_path: &Path) -> Result<(), Error> {
+ Ok(())
+ }
+
+ fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
+ self.datastore.name() == target_store_name
+ }
+}
--
2.39.2
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
next prev parent reply other threads:[~2024-07-15 10:16 UTC|newest]
Thread overview: 36+ messages / expand[flat|nested] mbox.gz Atom feed top
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 01/24] datastore: data blob: fix typos in comments Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 02/24] server: pull: be more specific in module comment Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 03/24] server: pull: silence clippy to many arguments warning Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 04/24] www: sync edit: indetation style fix Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 05/24] server: pull: fix sync info message for root namespace Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 06/24] server: sync: move sync related stats to common module Christian Ebner
2024-07-15 10:15 ` Christian Ebner [this message]
2024-07-16 9:53 ` [pbs-devel] [RFC proxmox-backup 07/24] server: sync: move reader trait to common sync module Gabriel Goller
2024-07-23 7:32 ` Christian Ebner
2024-07-30 8:38 ` Gabriel Goller
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 08/24] server: sync: move source " Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 09/24] client: backup writer: bundle upload stats counters Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 10/24] client: backup writer: factor out merged chunk stream upload Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 11/24] client: backup writer: add chunk count and duration stats Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 12/24] client: backup writer: allow push uploading index and chunks Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 13/24] api: backup: add ignore-previous flag to backup endpoint Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 14/24] server: sync: move skip info/reason to common sync module Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 15/24] server: sync: make skip reason message more genenric Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 16/24] server: sync: factor out namespace depth check into sync module Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 17/24] api types: define remote permissions and roles for push sync Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 18/24] fix #3044: server: implement push support for sync operations Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 19/24] api: config: extend sync job config by sync direction Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 20/24] api: push: implement endpoint for sync in push direction Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 21/24] api: sync: move sync job invocation to common module Christian Ebner
2024-07-15 10:16 ` [pbs-devel] [RFC proxmox-backup 22/24] bin: manager: add datastore push cli command Christian Ebner
2024-07-15 10:16 ` [pbs-devel] [RFC proxmox-backup 23/24] form: group filter: allow to set namespace for local datastore Christian Ebner
2024-07-15 10:16 ` [pbs-devel] [RFC proxmox-backup 24/24] www: sync edit: allow to set sync direction for sync jobs Christian Ebner
2024-07-16 14:09 ` [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Gabriel Goller
2024-07-16 14:28 ` Christian Ebner
2024-07-16 14:51 ` Gabriel Goller
2024-07-16 14:54 ` Christian Ebner
2024-07-23 14:00 ` Christian Ebner
2024-07-17 15:48 ` Thomas Lamprecht
2024-07-18 7:36 ` Christian Ebner
2024-07-30 10:42 ` Christian Ebner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20240715101602.274244-8-c.ebner@proxmox.com \
--to=c.ebner@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.