From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 57CCA1FF140 for ; Fri, 24 Apr 2026 15:36:48 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id D648B1A503; Fri, 24 Apr 2026 15:36:47 +0200 (CEST) From: Dominik Csapak <d.csapak@proxmox.com> To: pbs-devel@lists.proxmox.com Subject: [PATCH proxmox-backup] sync: pull: use LogLineSender in SyncSourceReader Date: Fri, 24 Apr 2026 15:35:44 +0200 Message-ID: <20260424133613.3135956-1-d.csapak@proxmox.com> X-Mailer: git-send-email 2.47.3 MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL -1.250 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy ENA_SUBJ_ODD_CASE 2.6 Subject has odd case KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [sync.rs,pull.rs] Message-ID-Hash: XEQAQ3GZ5KUWIOFC47P3ED537JQJULDV X-Message-ID-Hash: XEQAQ3GZ5KUWIOFC47P3ED537JQJULDV X-MailFrom: d.csapak@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Backup Server development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: otherwise some log messages are not rendered with the correct prefix when using multiple worker threads. 
For example: Snapshot ct/122/2026-04-24T12:56:17Z: got backup log file client.log.blob vs [ct/122]: Snapshot ct/122/2026-04-24T13:00:02Z: got backup log file client.log.blob Signed-off-by: Dominik Csapak <d.csapak@proxmox.com> --- There may be other places here where we don't use the LogLineSender, but I could not find any at first glance. src/server/pull.rs | 10 ++++--- src/server/sync.rs | 67 +++++++++++++++++++++++++++++++++++----------- 2 files changed, 59 insertions(+), 18 deletions(-) diff --git a/src/server/pull.rs b/src/server/pull.rs index 7fa273edb..2dff2a9f8 100644 --- a/src/server/pull.rs +++ b/src/server/pull.rs @@ -450,7 +450,7 @@ async fn pull_single_archive<'a>( .await?; reader - .load_file_into(archive_name, &tmp_path) + .load_file_into(archive_name, &tmp_path, Arc::clone(&log_sender)) .await .with_context(|| archive_prefix.clone())?; @@ -664,7 +664,11 @@ async fn pull_snapshot<'a>( let mut tmp_manifest_name = manifest_name.clone(); tmp_manifest_name.set_extension("tmp"); let Some(tmp_manifest_blob) = reader - .load_file_into(MANIFEST_BLOB_NAME.as_ref(), &tmp_manifest_name) + .load_file_into( + MANIFEST_BLOB_NAME.as_ref(), + &tmp_manifest_name, + Arc::clone(&log_sender), + ) .await .with_context(|| prefix.clone())? else { @@ -676,7 +680,7 @@ async fn pull_snapshot<'a>( let fetch_log = async || { if !client_log_name.exists() { reader - .try_download_client_log(&client_log_name) + .try_download_client_log(&client_log_name, Arc::clone(&log_sender)) .await .with_context(|| prefix.clone())?; if client_log_name.exists() { diff --git a/src/server/sync.rs b/src/server/sync.rs index d25180341..8acbaf673 100644 --- a/src/server/sync.rs +++ b/src/server/sync.rs @@ -101,10 +101,19 @@ pub(crate) trait SyncSourceReader: Send + Sync { /// Asynchronously loads a file from the source into a local file. /// `filename` is the name of the file to load from the source. /// `into` is the path of the local file to load the source file into. 
- async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error>; + async fn load_file_into( + &self, + filename: &str, + into: &Path, + log_sender: Arc<LogLineSender>, + ) -> Result<Option<DataBlob>, Error>; /// Tries to download the client log from the source and save it into a local file. - async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error>; + async fn try_download_client_log( + &self, + to_path: &Path, + log_sender: Arc<LogLineSender>, + ) -> Result<(), Error>; fn skip_chunk_sync(&self, target_store_name: &str) -> bool; } @@ -137,7 +146,12 @@ impl SyncSourceReader for RemoteSourceReader { Ok(Arc::new(chunk_reader)) } - async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> { + async fn load_file_into( + &self, + filename: &str, + into: &Path, + log_sender: Arc<LogLineSender>, + ) -> Result<Option<DataBlob>, Error> { let mut tmp_file = std::fs::OpenOptions::new() .write(true) .create(true) @@ -149,10 +163,15 @@ impl SyncSourceReader for RemoteSourceReader { match err.downcast_ref::<HttpError>() { Some(HttpError { code, message }) => match *code { StatusCode::NOT_FOUND => { - info!( - "Snapshot {}: skipped because vanished since start of sync", - &self.dir - ); + log_sender + .log( + Level::INFO, + format!( + "Snapshot {}: skipped because vanished since start of sync", + &self.dir + ), + ) + .await?; return Ok(None); } _ => { @@ -168,7 +187,11 @@ impl SyncSourceReader for RemoteSourceReader { Ok(DataBlob::load_from_reader(&mut tmp_file).ok()) } - async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error> { + async fn try_download_client_log( + &self, + to_path: &Path, + log_sender: Arc<LogLineSender>, + ) -> Result<(), Error> { let mut tmp_path = to_path.to_owned(); tmp_path.set_extension("tmp"); @@ -189,11 +212,16 @@ impl SyncSourceReader for RemoteSourceReader { if let Err(err) = std::fs::rename(&tmp_path, to_path) { bail!("Atomic rename file {to_path:?} failed - {err}"); } - info!( - "Snapshot {snapshot}: got backup log file {client_log_name}", - snapshot = &self.dir, - 
client_log_name = client_log_name.deref() - ); + log_sender + .log( + Level::INFO, + format!( + "Snapshot {snapshot}: got backup log file {client_log_name}", + snapshot = &self.dir, + client_log_name = client_log_name.deref() + ), + ) + .await?; } Ok(()) @@ -215,7 +243,12 @@ impl SyncSourceReader for LocalSourceReader { Ok(Arc::new(chunk_reader)) } - async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> { + async fn load_file_into( + &self, + filename: &str, + into: &Path, + _log_sender: Arc<LogLineSender>, + ) -> Result<Option<DataBlob>, Error> { let mut tmp_file = std::fs::OpenOptions::new() .write(true) .create(true) @@ -229,7 +262,11 @@ impl SyncSourceReader for LocalSourceReader { Ok(DataBlob::load_from_reader(&mut tmp_file).ok()) } - async fn try_download_client_log(&self, _to_path: &Path) -> Result<(), Error> { + async fn try_download_client_log( + &self, + _to_path: &Path, + _log_sender: Arc<LogLineSender>, + ) -> Result<(), Error> { Ok(()) } -- 2.47.3