From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH v2 proxmox-backup 03/31] server: sync: move reader trait to common sync module
Date: Thu, 1 Aug 2024 09:43:35 +0200 [thread overview]
Message-ID: <20240801074403.36229-4-c.ebner@proxmox.com> (raw)
In-Reply-To: <20240801074403.36229-1-c.ebner@proxmox.com>
Move the `PullReader` trait and the types implementing it to the
common sync module, so they can be reused for the push direction
variant of a sync job as well.
Adapt the naming to be more generic by renaming the `PullReader` trait to
`SyncSourceReader`, `LocalReader` to `LocalSourceReader` and
`RemoteReader` to `RemoteSourceReader`.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 1:
- no changes
src/server/pull.rs | 167 +++++----------------------------------------
src/server/sync.rs | 152 +++++++++++++++++++++++++++++++++++++++++
2 files changed, 168 insertions(+), 151 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 24422ef41..5efe2d5f7 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,8 +1,7 @@
//! Sync datastore by pulling contents from remote server
-use std::collections::{HashMap, HashSet};
-use std::io::{Seek, Write};
-use std::path::{Path, PathBuf};
+use std::collections::HashSet;
+use std::io::Seek;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
@@ -15,11 +14,11 @@ use serde_json::json;
use tracing::{info, warn};
use pbs_api_types::{
- print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
+ print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, GroupFilter,
GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
};
-use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
+use pbs_client::{BackupReader, BackupRepository, HttpClient};
use pbs_config::CachedUserInfo;
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
@@ -29,26 +28,15 @@ use pbs_datastore::manifest::{
ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
use pbs_datastore::read_chunk::AsyncReadChunk;
-use pbs_datastore::{
- check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
-};
+use pbs_datastore::{check_backup_owner, DataStore, ListNamespacesRecursive, StoreProgress};
use pbs_tools::sha::sha256;
-use super::sync::{RemovedVanishedStats, SyncStats};
+use super::sync::{
+ LocalSourceReader, RemoteSourceReader, RemovedVanishedStats, SyncSourceReader, SyncStats,
+};
use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
use crate::tools::parallel_handler::ParallelHandler;
-struct RemoteReader {
- backup_reader: Arc<BackupReader>,
- dir: BackupDir,
-}
-
-struct LocalReader {
- _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
- path: PathBuf,
- datastore: Arc<DataStore>,
-}
-
pub(crate) struct PullTarget {
store: Arc<DataStore>,
ns: BackupNamespace,
@@ -97,7 +85,7 @@ trait PullSource: Send + Sync {
&self,
ns: &BackupNamespace,
dir: &BackupDir,
- ) -> Result<Arc<dyn PullReader>, Error>;
+ ) -> Result<Arc<dyn SyncSourceReader>, Error>;
}
#[async_trait::async_trait]
@@ -230,10 +218,10 @@ impl PullSource for RemoteSource {
&self,
ns: &BackupNamespace,
dir: &BackupDir,
- ) -> Result<Arc<dyn PullReader>, Error> {
+ ) -> Result<Arc<dyn SyncSourceReader>, Error> {
let backup_reader =
BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
- Ok(Arc::new(RemoteReader {
+ Ok(Arc::new(RemoteSourceReader {
backup_reader,
dir: dir.clone(),
}))
@@ -298,14 +286,14 @@ impl PullSource for LocalSource {
&self,
ns: &BackupNamespace,
dir: &BackupDir,
- ) -> Result<Arc<dyn PullReader>, Error> {
+ ) -> Result<Arc<dyn SyncSourceReader>, Error> {
let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
&dir.full_path(),
"snapshot",
"locked by another operation",
)?;
- Ok(Arc::new(LocalReader {
+ Ok(Arc::new(LocalSourceReader {
_dir_lock: Arc::new(Mutex::new(dir_lock)),
path: dir.full_path(),
datastore: dir.datastore().clone(),
@@ -313,129 +301,6 @@ impl PullSource for LocalSource {
}
}
-#[async_trait::async_trait]
-/// `PullReader` is a trait that provides an interface for reading data from a source.
-/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.
-trait PullReader: Send + Sync {
- /// Returns a chunk reader with the specified encryption mode.
- fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
-
- /// Asynchronously loads a file from the source into a local file.
- /// `filename` is the name of the file to load from the source.
- /// `into` is the path of the local file to load the source file into.
- async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error>;
-
- /// Tries to download the client log from the source and save it into a local file.
- async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error>;
-
- fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
-}
-
-#[async_trait::async_trait]
-impl PullReader for RemoteReader {
- fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
- Arc::new(RemoteChunkReader::new(
- self.backup_reader.clone(),
- None,
- crypt_mode,
- HashMap::new(),
- ))
- }
-
- async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
- let mut tmp_file = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .truncate(true)
- .read(true)
- .open(into)?;
- let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
- if let Err(err) = download_result {
- match err.downcast_ref::<HttpError>() {
- Some(HttpError { code, message }) => match *code {
- StatusCode::NOT_FOUND => {
- info!(
- "skipping snapshot {} - vanished since start of sync",
- &self.dir,
- );
- return Ok(None);
- }
- _ => {
- bail!("HTTP error {code} - {message}");
- }
- },
- None => {
- return Err(err);
- }
- };
- };
- tmp_file.rewind()?;
- Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
- }
-
- async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error> {
- let mut tmp_path = to_path.to_owned();
- tmp_path.set_extension("tmp");
-
- let tmpfile = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .read(true)
- .open(&tmp_path)?;
-
- // Note: be silent if there is no log - only log successful download
- if let Ok(()) = self
- .backup_reader
- .download(CLIENT_LOG_BLOB_NAME, tmpfile)
- .await
- {
- if let Err(err) = std::fs::rename(&tmp_path, to_path) {
- bail!("Atomic rename file {:?} failed - {}", to_path, err);
- }
- info!("got backup log file {CLIENT_LOG_BLOB_NAME:?}");
- }
-
- Ok(())
- }
-
- fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
- false
- }
-}
-
-#[async_trait::async_trait]
-impl PullReader for LocalReader {
- fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
- Arc::new(LocalChunkReader::new(
- self.datastore.clone(),
- None,
- crypt_mode,
- ))
- }
-
- async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
- let mut tmp_file = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .truncate(true)
- .read(true)
- .open(into)?;
- let mut from_path = self.path.clone();
- from_path.push(filename);
- tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
- tmp_file.rewind()?;
- Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
- }
-
- async fn try_download_client_log(&self, _to_path: &Path) -> Result<(), Error> {
- Ok(())
- }
-
- fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
- self.datastore.name() == target_store_name
- }
-}
-
/// Parameters for a pull operation.
pub(crate) struct PullParameters {
/// Where data is pulled from
@@ -643,7 +508,7 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
/// - if archive is an index, pull referenced chunks
/// - Rename tmp file into real path
async fn pull_single_archive<'a>(
- reader: Arc<dyn PullReader + 'a>,
+ reader: Arc<dyn SyncSourceReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
archive_info: &'a FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
@@ -726,7 +591,7 @@ async fn pull_single_archive<'a>(
/// -- if not, pull it from the remote
/// - Download log if not already existing
async fn pull_snapshot<'a>(
- reader: Arc<dyn PullReader + 'a>,
+ reader: Arc<dyn SyncSourceReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<SyncStats, Error> {
@@ -837,7 +702,7 @@ async fn pull_snapshot<'a>(
/// The `reader` is configured to read from the source backup directory, while the
/// `snapshot` is pointing to the local datastore and target namespace.
async fn pull_snapshot_from<'a>(
- reader: Arc<dyn PullReader + 'a>,
+ reader: Arc<dyn SyncSourceReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<SyncStats, Error> {
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 5f143ef63..323bc1a27 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -1,7 +1,24 @@
//! Sync datastore contents from source to target, either in push or pull direction
+use std::collections::HashMap;
+use std::io::{Seek, Write};
+use std::path::{Path, PathBuf};
+use std::sync::{Arc, Mutex};
use std::time::Duration;
+use anyhow::{bail, Error};
+use http::StatusCode;
+use tracing::info;
+
+use proxmox_router::HttpError;
+
+use pbs_api_types::{BackupDir, CryptMode};
+use pbs_client::{BackupReader, RemoteChunkReader};
+use pbs_datastore::data_blob::DataBlob;
+use pbs_datastore::manifest::CLIENT_LOG_BLOB_NAME;
+use pbs_datastore::read_chunk::AsyncReadChunk;
+use pbs_datastore::{DataStore, LocalChunkReader};
+
#[derive(Default)]
pub(crate) struct RemovedVanishedStats {
pub(crate) groups: usize,
@@ -49,3 +66,138 @@ impl SyncStats {
}
}
}
+
+#[async_trait::async_trait]
+/// `SyncSourceReader` is a trait that provides an interface for reading data from a source.
+/// The trait includes methods for getting a chunk reader, loading a file, downloading client log,
+/// and checking whether chunk sync should be skipped.
+pub(crate) trait SyncSourceReader: Send + Sync {
+ /// Returns a chunk reader with the specified encryption mode.
+ fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
+
+ /// Asynchronously loads a file from the source into a local file.
+ /// `filename` is the name of the file to load from the source.
+ /// `into` is the path of the local file to load the source file into.
+ async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error>;
+
+ /// Tries to download the client log from the source and save it into a local file.
+ async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error>;
+
+ fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
+}
+
+pub(crate) struct RemoteSourceReader {
+ pub(crate) backup_reader: Arc<BackupReader>,
+ pub(crate) dir: BackupDir,
+}
+
+pub(crate) struct LocalSourceReader {
+ pub(crate) _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
+ pub(crate) path: PathBuf,
+ pub(crate) datastore: Arc<DataStore>,
+}
+
+#[async_trait::async_trait]
+impl SyncSourceReader for RemoteSourceReader {
+ fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
+ Arc::new(RemoteChunkReader::new(
+ self.backup_reader.clone(),
+ None,
+ crypt_mode,
+ HashMap::new(),
+ ))
+ }
+
+ async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
+ let mut tmp_file = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .read(true)
+ .open(into)?;
+ let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
+ if let Err(err) = download_result {
+ match err.downcast_ref::<HttpError>() {
+ Some(HttpError { code, message }) => match *code {
+ StatusCode::NOT_FOUND => {
+ info!(
+ "skipping snapshot {} - vanished since start of sync",
+ &self.dir
+ );
+ return Ok(None);
+ }
+ _ => {
+ bail!("HTTP error {code} - {message}");
+ }
+ },
+ None => {
+ return Err(err);
+ }
+ };
+ };
+ tmp_file.rewind()?;
+ Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+ }
+
+ async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error> {
+ let mut tmp_path = to_path.to_owned();
+ tmp_path.set_extension("tmp");
+
+ let tmpfile = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .read(true)
+ .open(&tmp_path)?;
+
+ // Note: be silent if there is no log - only log successful download
+ if let Ok(()) = self
+ .backup_reader
+ .download(CLIENT_LOG_BLOB_NAME, tmpfile)
+ .await
+ {
+ if let Err(err) = std::fs::rename(&tmp_path, to_path) {
+ bail!("Atomic rename file {to_path:?} failed - {err}");
+ }
+ info!("got backup log file {CLIENT_LOG_BLOB_NAME:?}");
+ }
+
+ Ok(())
+ }
+
+ fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
+ false
+ }
+}
+
+#[async_trait::async_trait]
+impl SyncSourceReader for LocalSourceReader {
+ fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
+ Arc::new(LocalChunkReader::new(
+ self.datastore.clone(),
+ None,
+ crypt_mode,
+ ))
+ }
+
+ async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
+ let mut tmp_file = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .read(true)
+ .open(into)?;
+ let mut from_path = self.path.clone();
+ from_path.push(filename);
+ tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
+ tmp_file.rewind()?;
+ Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+ }
+
+ async fn try_download_client_log(&self, _to_path: &Path) -> Result<(), Error> {
+ Ok(())
+ }
+
+ fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
+ self.datastore.name() == target_store_name
+ }
+}
--
2.39.2
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
next prev parent reply other threads:[~2024-08-01 7:45 UTC|newest]
Thread overview: 46+ messages / expand[flat|nested] mbox.gz Atom feed top
2024-08-01 7:43 [pbs-devel] [PATCH v2 proxmox-backup 00/31] fix #3044: push datastore to remote target Christian Ebner
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 01/31] api: datastore: add missing whitespace in description Christian Ebner
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 02/31] server: sync: move sync related stats to common module Christian Ebner
2024-08-01 7:43 ` Christian Ebner [this message]
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 04/31] server: sync: move source to common sync module Christian Ebner
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 05/31] client: backup writer: bundle upload stats counters Christian Ebner
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 06/31] client: backup writer: factor out merged chunk stream upload Christian Ebner
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 07/31] client: backup writer: add chunk count and duration stats Christian Ebner
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 08/31] client: backup writer: allow push uploading index and chunks Christian Ebner
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 09/31] api: backup: add no-timestamp-check flag to backup endpoint Christian Ebner
2024-08-07 10:33 ` Fabian Grünbichler
2024-08-07 10:48 ` Christian Ebner
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 10/31] server: sync: move skip info/reason to common sync module Christian Ebner
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 11/31] server: sync: make skip reason message more genenric Christian Ebner
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 12/31] server: sync: factor out namespace depth check into sync module Christian Ebner
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 13/31] api types: define remote permissions and roles for push sync Christian Ebner
2024-08-07 10:45 ` Fabian Grünbichler
2024-08-07 11:21 ` Fabian Grünbichler
2024-09-05 11:17 ` Christian Ebner
2024-09-05 12:12 ` Fabian Grünbichler
2024-09-05 12:26 ` Christian Ebner
2024-09-05 12:42 ` Fabian Grünbichler
2024-09-05 13:27 ` Christian Ebner
2024-09-05 14:06 ` Fabian Grünbichler
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 14/31] fix #3044: server: implement push support for sync operations Christian Ebner
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 15/31] config: jobs: add `sync-push` config type for push sync jobs Christian Ebner
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 16/31] api: push: implement endpoint for sync in push direction Christian Ebner
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 17/31] api: sync: move sync job invocation to common module Christian Ebner
2024-08-07 10:51 ` Fabian Grünbichler
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 18/31] api: sync jobs: expose optional `sync-direction` parameter Christian Ebner
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 19/31] bin: manager: add datastore push cli command Christian Ebner
2024-08-07 10:55 ` Fabian Grünbichler
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 20/31] ui: group filter: allow to set namespace for local datastore Christian Ebner
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 21/31] ui: sync edit: source group filters based on sync direction Christian Ebner
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 22/31] ui: add view with separate grids for pull and push sync jobs Christian Ebner
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 23/31] ui: sync job: adapt edit window to be used for pull and push Christian Ebner
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 24/31] ui: sync: pass sync-direction to allow removing push jobs Christian Ebner
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 25/31] ui: sync view: do not use data model proxy for store Christian Ebner
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 26/31] ui: sync view: set sync direction when invoking run task via api Christian Ebner
2024-08-01 7:43 ` [pbs-devel] [PATCH v2 proxmox-backup 27/31] datastore: move `BackupGroupDeleteStats` to api types Christian Ebner
2024-08-07 11:00 ` Fabian Grünbichler
2024-08-01 7:44 ` [pbs-devel] [PATCH v2 proxmox-backup 28/31] api types: implement api type for `BackupGroupDeleteStats` Christian Ebner
2024-08-01 7:44 ` [pbs-devel] [PATCH v2 proxmox-backup 29/31] datastore: increment deleted group counter when removing group Christian Ebner
2024-08-01 7:44 ` [pbs-devel] [PATCH v2 proxmox-backup 30/31] api: datastore/namespace: return backup groups delete stats on remove Christian Ebner
2024-08-01 7:44 ` [pbs-devel] [PATCH v2 proxmox-backup 31/31] server: sync job: use delete stats provided by the api Christian Ebner
2024-09-12 14:38 ` [pbs-devel] [PATCH v2 proxmox-backup 00/31] fix #3044: push datastore to remote target Christian Ebner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20240801074403.36229-4-c.ebner@proxmox.com \
--to=c.ebner@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.