From: "Hannes Laimer"
To: "Proxmox Backup Server development discussion"
Cc: "pbs-devel"
Date: Mon, 21 Jul 2025 16:16:20 +0200
Subject: Re: [pbs-devel] [PATCH proxmox-backup v9 36/46] api/datastore: implement refresh endpoint for stores with s3 backend
In-Reply-To: <20250719125035.9926-40-c.ebner@proxmox.com>
References: <20250719125035.9926-1-c.ebner@proxmox.com> <20250719125035.9926-40-c.ebner@proxmox.com>

On Sat Jul 19, 2025 at 2:50 PM CEST, Christian Ebner wrote:
> Allows to easily refresh the contents of the local cache store for
> datastores backed by an S3 object store.
>
> In order to guarantee that no read or write operations are ongoing,
> the store is first set into the maintenance mode `S3Refresh`. Objects
> are then fetched into a temporary directory to avoid losing contents
> and consistency in case of an error. Once all objects have been
> fetched, clears out existing contents and moves the newly fetched
> contents in place.
>
> Signed-off-by: Christian Ebner
> ---
> changes since version 8:
> - refactor s3 refresh into more compact methods
> - drop unnecessary drop(_lock)
> - run the blocking maintenance mode setting in the previously missing
>   tokio::task::spawn_blocking context
>
>  pbs-datastore/src/datastore.rs | 175 +++++++++++++++++++++++++++++++++
>  src/api2/admin/datastore.rs    |  34 +++++++
>  2 files changed, 209 insertions(+)
>
> diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
> index a524d7b32..b2af05eac 100644
> --- a/pbs-datastore/src/datastore.rs
> +++ b/pbs-datastore/src/datastore.rs
> @@ -10,6 +10,7 @@ use anyhow::{bail, format_err, Context, Error};
>  use http_body_util::BodyExt;
>  use nix::unistd::{unlinkat, UnlinkatFlags};
>  use pbs_tools::lru_cache::LruCache;
> +use tokio::io::AsyncWriteExt;
>  use tracing::{info, warn};
>
>  use proxmox_human_byte::HumanByte;
> @@ -2200,4 +2201,178 @@ impl DataStore {
>      pub fn old_locking(&self) -> bool {
>          *OLD_LOCKING
>      }
> +
> +    /// Set the datastore's maintenance mode to `S3Refresh`, fetch from S3 object store, clear and
> +    /// replace the local cache store contents. Once finished, disable the maintenance mode again.
> +    /// Returns with error for other datastore backends without setting the maintenance mode.
> +    pub async fn s3_refresh(self: &Arc<Self>) -> Result<(), Error> {
> +        match self.backend()? {
> +            DatastoreBackend::Filesystem => bail!("store '{}' not backed by S3", self.name()),
> +            DatastoreBackend::S3(s3_client) => {
> +                let self_clone = Arc::clone(self);
> +                tokio::task::spawn_blocking(move || {
> +                    self_clone.maintenance_mode(Some(MaintenanceMode {
> +                        ty: MaintenanceType::S3Refresh,
> +                        message: None,
> +                    }))
> +                })
> +                .await?
> +                .context("failed to set maintenance mode")?;

I think we should hold the config lock, so it can't be changed while we
refresh, no?
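Something like this is what I had in mind -- untested, and it assumes a
hypothetical maintenance_mode_locked() variant of the helper below that
reuses an already-acquired config lock guard instead of taking the lock
itself:

    let self_clone = Arc::clone(self);
    let guard = tokio::task::spawn_blocking(move || {
        // take the datastore config lock once and keep it alive for the
        // whole refresh, so nobody can flip the maintenance mode under us
        let guard = pbs_config::datastore::lock_config()?;
        self_clone.maintenance_mode_locked(
            &guard,
            Some(MaintenanceMode {
                ty: MaintenanceType::S3Refresh,
                message: None,
            }),
        )?;
        Ok::<_, Error>(guard)
    })
    .await??;

    // ... fetch_tmp_contents() + move_tmp_contents_in_place() as below ...

    let self_clone = Arc::clone(self);
    tokio::task::spawn_blocking(move || {
        // still holding the same guard while clearing the mode again
        self_clone.maintenance_mode_locked(&guard, None)
    })
    .await?
    .context("failed to clear maintenance mode")?;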
> +
> +                let tmp_base = proxmox_sys::fs::make_tmp_dir(&self.base_path(), None)
> +                    .with_context(|| {
> +                        format!(
> +                            "failed to create temporary content folder in {:?}",
> +                            self.base_path()
> +                        )
> +                    })?;
> +
> +                self.fetch_tmp_contents(&tmp_base, &s3_client).await?;
> +                self.move_tmp_contents_in_place(&tmp_base).await?;
> +
> +                let self_clone = Arc::clone(self);
> +                tokio::task::spawn_blocking(move || self_clone.maintenance_mode(None))
> +                    .await?
> +                    .context("failed to clear maintenance mode")?;
> +            }
> +        }
> +        Ok(())
> +    }
> +
> +    // Set or clear the datastore's maintenance mode by locking and updating the datastore config
> +    fn maintenance_mode(&self, maintenance_mode: Option<MaintenanceMode>) -> Result<(), Error> {
> +        let _lock = pbs_config::datastore::lock_config()?;
> +        let (mut section_config, _digest) = pbs_config::datastore::config()?;
> +        let mut datastore: DataStoreConfig = section_config.lookup("datastore", self.name())?;
> +        datastore.set_maintenance_mode(maintenance_mode)?;
> +        section_config.set_data(self.name(), "datastore", &datastore)?;
> +        pbs_config::datastore::save_config(&section_config)?;
> +        Ok(())
> +    }
> +
> +    // Fetch the contents (metadata, no chunks) of the datastore from the S3 object store to the
> +    // provided temporary directory
> +    async fn fetch_tmp_contents(&self, tmp_base: &Path, s3_client: &S3Client) -> Result<(), Error> {
> +        let backup_user = pbs_config::backup_user().context("failed to get backup user")?;
> +        let mode = nix::sys::stat::Mode::from_bits_truncate(0o0644);
> +        let file_create_options = CreateOptions::new()
> +            .perm(mode)
> +            .owner(backup_user.uid)
> +            .group(backup_user.gid);
> +        let mode = nix::sys::stat::Mode::from_bits_truncate(0o0755);
> +        let dir_create_options = CreateOptions::new()
> +            .perm(mode)
> +            .owner(backup_user.uid)
> +            .group(backup_user.gid);
> +
> +        let list_prefix = S3PathPrefix::Some(S3_CONTENT_PREFIX.to_string());
> +        let store_prefix = format!("{}/{S3_CONTENT_PREFIX}/", self.name());
> +        let mut next_continuation_token: Option<String> = None;
> +        loop {
> +            let list_objects_result = s3_client
> +                .list_objects_v2(&list_prefix, next_continuation_token.as_deref())
> +                .await
> +                .context("failed to list objects")?;
> +
> +            let objects_to_fetch: Vec<S3ObjectKey> = list_objects_result
> +                .contents
> +                .into_iter()
> +                .map(|item| item.key)
> +                .collect();
> +
> +            for object_key in objects_to_fetch {
> +                let object_path = format!("{object_key}");
> +                let object_path = object_path.strip_prefix(&store_prefix).with_context(|| {
> +                    format!("failed to strip store context prefix {store_prefix} for {object_key}")
> +                })?;
> +                if object_path.ends_with(NAMESPACE_MARKER_FILENAME) {
> +                    continue;
> +                }
> +
> +                info!("Fetching object {object_path}");
> +
> +                let file_path = tmp_base.join(object_path);
> +                if let Some(parent) = file_path.parent() {
> +                    proxmox_sys::fs::create_path(
> +                        parent,
> +                        Some(dir_create_options),
> +                        Some(dir_create_options),
> +                    )?;
> +                }
> +
> +                let mut target_file = tokio::fs::OpenOptions::new()
> +                    .write(true)
> +                    .create(true)
> +                    .truncate(true)
> +                    .read(true)
> +                    .open(&file_path)
> +                    .await
> +                    .with_context(|| format!("failed to create target file {file_path:?}"))?;
> +
> +                if let Some(response) = s3_client
> +                    .get_object(object_key)
> +                    .await
> +                    .with_context(|| format!("failed to fetch object {object_path}"))?
> +                {
> +                    let data = response
> +                        .content
> +                        .collect()
> +                        .await
> +                        .context("failed to collect object contents")?;
> +                    target_file
> +                        .write_all(&data.to_bytes())
> +                        .await
> +                        .context("failed to write to target file")?;
> +                    file_create_options
> +                        .apply_to(&mut target_file, &file_path)
> +                        .context("failed to set target file create options")?;
> +                    target_file
> +                        .flush()
> +                        .await
> +                        .context("failed to flush target file")?;
> +                } else {
> +                    bail!("failed to download {object_path}, not found");
> +                }
> +            }
> +
> +            if list_objects_result.is_truncated {
> +                next_continuation_token = list_objects_result
> +                    .next_continuation_token
> +                    .as_ref()
> +                    .cloned();
> +                continue;
> +            }
> +            break;
> +        }
> +        Ok(())
> +    }
> +
> +    // Move the temporary contents fetched from the S3 object store in place, replacing the
> +    // current contents of the local cache store
> +    async fn move_tmp_contents_in_place(&self, tmp_base: &PathBuf) -> Result<(), Error> {
> +        for ty in ["vm", "ct", "host", "ns"] {
> +            let store_base_clone = self.base_path().clone();
> +            let tmp_base_clone = tmp_base.clone();
> +            tokio::task::spawn_blocking(move || {
> +                let type_dir = store_base_clone.join(ty);
> +                if let Err(err) = std::fs::remove_dir_all(&type_dir) {
> +                    if err.kind() != io::ErrorKind::NotFound {
> +                        return Err(err).with_context(|| {
> +                            format!("failed to remove old contents in {type_dir:?}")
> +                        });
> +                    }
> +                }
> +                let tmp_type_dir = tmp_base_clone.join(ty);
> +                if let Err(err) = std::fs::rename(&tmp_type_dir, &type_dir) {
> +                    if err.kind() != io::ErrorKind::NotFound {
> +                        return Err(err)
> +                            .with_context(|| format!("failed to rename {tmp_type_dir:?}"));
> +                    }
> +                }
> +                Ok::<(), Error>(())
> +            })
> +            .await?
> +            .with_context(|| format!("failed to refresh {:?}", self.base_path()))?;
> +        }
> +
> +        std::fs::remove_dir_all(&tmp_base)
> +            .with_context(|| format!("failed to cleanup temporary content in {tmp_base:?}"))?;
> +
> +        Ok(())
> +    }
>  }
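A related thought on s3_refresh() above: if fetch_tmp_contents() or
move_tmp_contents_in_place() fails, we return early and the store is left
in S3Refresh maintenance mode, with the temporary directory still around.
Maybe clear the mode on the error path as well, roughly like this
(untested):

    let result = async {
        self.fetch_tmp_contents(&tmp_base, &s3_client).await?;
        self.move_tmp_contents_in_place(&tmp_base).await
    }
    .await;

    // clear the maintenance mode even if the refresh itself failed
    let self_clone = Arc::clone(self);
    tokio::task::spawn_blocking(move || self_clone.maintenance_mode(None))
        .await?
        .context("failed to clear maintenance mode")?;

    result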
> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
> index 87a8641bd..23b216bef 100644
> --- a/src/api2/admin/datastore.rs
> +++ b/src/api2/admin/datastore.rs
> @@ -2707,6 +2707,39 @@ pub async fn unmount(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<
>      Ok(json!(upid))
>  }
>
> +#[api(
> +    protected: true,
> +    input: {
> +        properties: {
> +            store: {
> +                schema: DATASTORE_SCHEMA,
> +            },
> +        }
> +    },
> +    returns: {
> +        schema: UPID_SCHEMA,
> +    },
> +    access: {
> +        permission: &Permission::Privilege(&["datastore", "{store}"], PRIV_DATASTORE_MODIFY, false),
> +    },
> +)]
> +/// Refresh datastore contents from S3 to local cache store.
> +pub async fn s3_refresh(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Error> {
> +    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Lookup))?;
> +    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> +    let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
> +
> +    let upid = WorkerTask::spawn(
> +        "s3-refresh",
> +        Some(store),
> +        auth_id.to_string(),
> +        to_stdout,
> +        move |_worker| async move { datastore.s3_refresh().await },
> +    )?;
> +
> +    Ok(json!(upid))
> +}
> +
>  #[sortable]
>  const DATASTORE_INFO_SUBDIRS: SubdirMap = &[
>      (
> @@ -2773,6 +2806,7 @@ const DATASTORE_INFO_SUBDIRS: SubdirMap = &[
>          &Router::new().download(&API_METHOD_PXAR_FILE_DOWNLOAD),
>      ),
>      ("rrd", &Router::new().get(&API_METHOD_GET_RRD_STATS)),
> +    ("s3-refresh", &Router::new().put(&API_METHOD_S3_REFRESH)),
>      (
>          "snapshots",
>          &Router::new()
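One note for the record, since it is easy to miss from the diff context
alone: the new handler hangs off the per-datastore admin directory, so it
should be reachable as PUT /api2/json/admin/datastore/{store}/s3-refresh,
gated by PRIV_DATASTORE_MODIFY on /datastore/{store}, and it returns the
UPID of the spawned "s3-refresh" worker task.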