From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 6EF3689CB8 for ; Tue, 18 Oct 2022 11:21:42 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 4F5DA26AF5 for ; Tue, 18 Oct 2022 11:21:12 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Tue, 18 Oct 2022 11:21:10 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 14D3E44A55 for ; Tue, 18 Oct 2022 11:21:10 +0200 (CEST) From: =?UTF-8?q?Fabian=20Gr=C3=BCnbichler?= To: pve-devel@lists.proxmox.com Date: Tue, 18 Oct 2022 11:20:40 +0200 Message-Id: <20221018092040.860121-7-f.gruenbichler@proxmox.com> X-Mailer: git-send-email 2.30.2 In-Reply-To: <20221018092040.860121-1-f.gruenbichler@proxmox.com> References: <20221018092040.860121-1-f.gruenbichler@proxmox.com> MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.142 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [mirror.rs, progress.total] Subject: [pve-devel] [PATCH proxmox-offline-mirror 4/4] mirror: refactor fetch_binary/source_packages X-BeenThere: pve-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox VE development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 18 Oct 2022 09:21:42 -0000 and pull out some of the progress variables into a struct. Signed-off-by: Fabian Grünbichler --- src/mirror.rs | 520 ++++++++++++++++++++++++++++---------------------- 1 file changed, 287 insertions(+), 233 deletions(-) diff --git a/src/mirror.rs b/src/mirror.rs index 39b7f47..faaaa19 100644 --- a/src/mirror.rs +++ b/src/mirror.rs @@ -7,7 +7,7 @@ use std::{ use anyhow::{bail, format_err, Error}; use flate2::bufread::GzDecoder; -use globset::{Glob, GlobSetBuilder}; +use globset::{Glob, GlobSet, GlobSetBuilder}; use nix::libc; use proxmox_http::{client::sync::Client, HttpClient, HttpOptions}; use proxmox_sys::fs::file_get_contents; @@ -145,7 +145,7 @@ fn fetch_repo_file( /// Helper to fetch InRelease (`detached` == false) or Release/Release.gpg (`detached` == true) files from repository. /// /// Verifies the contained/detached signature and stores all fetched files under `prefix`. -/// +/// /// Returns the verified raw release file data, or None if the "fetch" part itself fails. fn fetch_release( config: &ParsedMirrorConfig, @@ -474,6 +474,259 @@ pub fn list_snapshots(config: &MirrorConfig) -> Result, Error> { Ok(list) } +struct MirrorProgress { + warnings: Vec, + dry_run: Progress, + total: Progress, + skip_count: usize, + skip_bytes: usize, +} + +fn convert_to_globset(config: &ParsedMirrorConfig) -> Result, Error> { + Ok(if let Some(skipped_packages) = &config.skip.skip_packages { + let mut globs = GlobSetBuilder::new(); + for glob in skipped_packages { + let glob = Glob::new(glob)?; + globs.add(glob); + } + let globs = globs.build()?; + Some(globs) + } else { + None + }) +} + +fn fetch_binary_packages( + config: &ParsedMirrorConfig, + packages_indices: HashMap<&String, PackagesFile>, + dry_run: bool, + prefix: &Path, + progress: &mut MirrorProgress, +) -> Result<(), Error> { + let skipped_package_globs = convert_to_globset(config)?; + + for (basename, references) in packages_indices { + let total_files = references.files.len(); + if total_files == 0 { + println!("\n{basename} - no files, skipping."); + continue; + } else { + println!("\n{basename} - {total_files} total file(s)"); + } + + let mut fetch_progress = Progress::new(); + let mut skip_count = 0usize; + let mut skip_bytes = 0usize; + for package in references.files { + if let Some(ref sections) = &config.skip.skip_sections { + if sections.iter().any(|section| package.section == *section) { + println!( + "\tskipping {} - {}b (section '{}')", + package.package, package.size, package.section + ); + skip_count += 1; + skip_bytes += package.size; + continue; + } + } + if let Some(skipped_package_globs) = &skipped_package_globs { + let matches = skipped_package_globs.matches(&package.package); + if !matches.is_empty() { + // safety, skipped_package_globs is set based on this + let globs = config.skip.skip_packages.as_ref().unwrap(); + let matches: Vec = matches.iter().map(|i| globs[*i].clone()).collect(); + println!( + "\tskipping {} - {}b (package glob(s): {})", + package.package, + package.size, + matches.join(", ") + ); + skip_count += 1; + skip_bytes += package.size; + continue; + } + } + let url = get_repo_url(&config.repository, &package.file); + + if dry_run { + if config.pool.contains(&package.checksums) { + fetch_progress.update(&FetchResult { + data: vec![], + fetched: 0, + }); + } else { + println!("\t(dry-run) GET missing '{url}' ({}b)", package.size); + fetch_progress.update(&FetchResult { + data: vec![], + fetched: package.size, + }); + } + } else { + let mut full_path = PathBuf::from(prefix); + full_path.push(&package.file); + + match fetch_plain_file( + config, + &url, + &full_path, + package.size, + &package.checksums, + false, + dry_run, + ) { + Ok(res) => fetch_progress.update(&res), + Err(err) if config.ignore_errors => { + let msg = format!( + "{}: failed to fetch package '{}' - {}", + basename, package.file, err, + ); + eprintln!("{msg}"); + progress.warnings.push(msg); + } + Err(err) => return Err(err), + } + } + + if fetch_progress.file_count() % (max(total_files / 100, 1)) == 0 { + println!("\tProgress: {fetch_progress}"); + } + } + println!("\tProgress: {fetch_progress}"); + if dry_run { + progress.dry_run += fetch_progress; + } else { + progress.total += fetch_progress; + } + if skip_count > 0 { + progress.skip_count += skip_count; + progress.skip_bytes += skip_bytes; + println!("Skipped downloading {skip_count} packages totalling {skip_bytes}b"); + } + } + + Ok(()) +} + +fn fetch_source_packages( + config: &ParsedMirrorConfig, + source_packages_indices: HashMap<&String, SourcesFile>, + dry_run: bool, + prefix: &Path, + progress: &mut MirrorProgress, +) -> Result<(), Error> { + let skipped_package_globs = convert_to_globset(config)?; + + for (basename, references) in source_packages_indices { + let total_source_packages = references.source_packages.len(); + if total_source_packages == 0 { + println!("\n{basename} - no files, skipping."); + continue; + } else { + println!("\n{basename} - {total_source_packages} total source package(s)"); + } + + let mut fetch_progress = Progress::new(); + let mut skip_count = 0usize; + let mut skip_bytes = 0usize; + for package in references.source_packages { + if let Some(ref sections) = &config.skip.skip_sections { + if sections + .iter() + .any(|section| package.section.as_ref() == Some(section)) + { + println!( + "\tskipping {} - {}b (section '{}')", + package.package, + package.size(), + package.section.as_ref().unwrap(), + ); + skip_count += 1; + skip_bytes += package.size(); + continue; + } + } + if let Some(skipped_package_globs) = &skipped_package_globs { + let matches = skipped_package_globs.matches(&package.package); + if !matches.is_empty() { + // safety, skipped_package_globs is set based on this + let globs = config.skip.skip_packages.as_ref().unwrap(); + let matches: Vec = matches.iter().map(|i| globs[*i].clone()).collect(); + println!( + "\tskipping {} - {}b (package glob(s): {})", + package.package, + package.size(), + matches.join(", ") + ); + skip_count += 1; + skip_bytes += package.size(); + continue; + } + } + + for file_reference in package.files.values() { + let path = format!("{}/{}", package.directory, file_reference.file); + let url = get_repo_url(&config.repository, &path); + + if dry_run { + if config.pool.contains(&file_reference.checksums) { + fetch_progress.update(&FetchResult { + data: vec![], + fetched: 0, + }); + } else { + println!("\t(dry-run) GET missing '{url}' ({}b)", file_reference.size); + fetch_progress.update(&FetchResult { + data: vec![], + fetched: file_reference.size, + }); + } + } else { + let mut full_path = PathBuf::from(prefix); + full_path.push(&path); + + match fetch_plain_file( + config, + &url, + &full_path, + file_reference.size, + &file_reference.checksums, + false, + dry_run, + ) { + Ok(res) => fetch_progress.update(&res), + Err(err) if config.ignore_errors => { + let msg = format!( + "{}: failed to fetch package '{}' - {}", + basename, file_reference.file, err, + ); + eprintln!("{msg}"); + progress.warnings.push(msg); + } + Err(err) => return Err(err), + } + } + + if fetch_progress.file_count() % (max(total_source_packages / 100, 1)) == 0 { + println!("\tProgress: {fetch_progress}"); + } + } + } + println!("\tProgress: {fetch_progress}"); + if dry_run { + progress.dry_run += fetch_progress; + } else { + progress.total += fetch_progress; + } + if skip_count > 0 { + progress.skip_count += skip_count; + progress.skip_bytes += skip_bytes; + println!("Skipped downloading {skip_count} packages totalling {skip_bytes}b"); + } + } + + Ok(()) +} + /// Create a new snapshot of the remote repository, fetching and storing files as needed. /// /// Operates in three phases: @@ -518,8 +771,13 @@ pub fn create_snapshot( let prefix = format!("{snapshot}.tmp"); let prefix = Path::new(&prefix); - let mut total_progress = Progress::new(); - let mut warnings = Vec::new(); + let mut progress = MirrorProgress { + warnings: Vec::new(), + skip_count: 0, + skip_bytes: 0, + dry_run: Progress::new(), + total: Progress::new(), + }; let parse_release = |res: FetchResult, name: &str| -> Result { println!("Parsing {name}.."); @@ -534,14 +792,14 @@ pub fn create_snapshot( // we want both on-disk for compat reasons, if both are available let release = fetch_release(&config, prefix, true, dry_run)? .map(|res| { - total_progress.update(&res); + progress.total.update(&res); parse_release(res, "Release") }) .transpose()?; let in_release = fetch_release(&config, prefix, false, dry_run)? .map(|res| { - total_progress.update(&res); + progress.total.update(&res); parse_release(res, "InRelease") }) .transpose()?; @@ -671,7 +929,7 @@ pub fn create_snapshot( reference.file_type, reference.path ); eprintln!("{msg}"); - warnings.push(msg); + progress.warnings.push(msg); failed_references.push(reference); continue; } @@ -723,7 +981,7 @@ pub fn create_snapshot( println!("Total dsc size for component: {component_dsc_size}"); packages_size += component_dsc_size; - total_progress += fetch_progress; + progress.total += fetch_progress; } println!("Total deb size: {packages_size}"); if !failed_references.is_empty() { @@ -733,244 +991,40 @@ pub fn create_snapshot( } } - let skipped_package_globs = if let Some(skipped_packages) = &config.skip.skip_packages { - let mut globs = GlobSetBuilder::new(); - for glob in skipped_packages { - let glob = Glob::new(glob)?; - globs.add(glob); - } - let globs = globs.build()?; - Some(globs) - } else { - None - }; - println!("\nFetching packages.."); - let mut dry_run_progress = Progress::new(); - let mut total_skipped_count = 0usize; - let mut total_skipped_bytes = 0usize; - for (basename, references) in packages_indices { - let total_files = references.files.len(); - if total_files == 0 { - println!("\n{basename} - no files, skipping."); - continue; - } else { - println!("\n{basename} - {total_files} total file(s)"); - } - - let mut fetch_progress = Progress::new(); - let mut skipped_count = 0usize; - let mut skipped_bytes = 0usize; - for package in references.files { - if let Some(ref sections) = &config.skip.skip_sections { - if sections.iter().any(|section| package.section == *section) { - println!( - "\tskipping {} - {}b (section '{}')", - package.package, package.size, package.section - ); - skipped_count += 1; - skipped_bytes += package.size; - continue; - } - } - if let Some(skipped_package_globs) = &skipped_package_globs { - let matches = skipped_package_globs.matches(&package.package); - if !matches.is_empty() { - // safety, skipped_package_globs is set based on this - let globs = config.skip.skip_packages.as_ref().unwrap(); - let matches: Vec = matches.iter().map(|i| globs[*i].clone()).collect(); - println!( - "\tskipping {} - {}b (package glob(s): {})", - package.package, - package.size, - matches.join(", ") - ); - skipped_count += 1; - skipped_bytes += package.size; - continue; - } - } - let url = get_repo_url(&config.repository, &package.file); - - if dry_run { - if config.pool.contains(&package.checksums) { - fetch_progress.update(&FetchResult { - data: vec![], - fetched: 0, - }); - } else { - println!("\t(dry-run) GET missing '{url}' ({}b)", package.size); - fetch_progress.update(&FetchResult { - data: vec![], - fetched: package.size, - }); - } - } else { - let mut full_path = PathBuf::from(prefix); - full_path.push(&package.file); - - match fetch_plain_file( - &config, - &url, - &full_path, - package.size, - &package.checksums, - false, - dry_run, - ) { - Ok(res) => fetch_progress.update(&res), - Err(err) if config.ignore_errors => { - let msg = format!( - "{}: failed to fetch package '{}' - {}", - basename, package.file, err, - ); - eprintln!("{msg}"); - warnings.push(msg); - } - Err(err) => return Err(err), - } - } - - if fetch_progress.file_count() % (max(total_files / 100, 1)) == 0 { - println!("\tProgress: {fetch_progress}"); - } - } - println!("\tProgress: {fetch_progress}"); - if dry_run { - dry_run_progress += fetch_progress; - } else { - total_progress += fetch_progress; - } - if skipped_count > 0 { - total_skipped_count += skipped_count; - total_skipped_bytes += skipped_bytes; - println!("Skipped downloading {skipped_count} packages totalling {skipped_bytes}b"); - } - } - for (basename, references) in source_packages_indices { - let total_source_packages = references.source_packages.len(); - if total_source_packages == 0 { - println!("\n{basename} - no files, skipping."); - continue; - } else { - println!("\n{basename} - {total_source_packages} total source package(s)"); - } + fetch_binary_packages(&config, packages_indices, dry_run, prefix, &mut progress)?; - let mut fetch_progress = Progress::new(); - let mut skipped_count = 0usize; - let mut skipped_bytes = 0usize; - for package in references.source_packages { - if let Some(ref sections) = &config.skip.skip_sections { - if sections - .iter() - .any(|section| package.section.as_ref() == Some(section)) - { - println!( - "\tskipping {} - {}b (section '{}')", - package.package, - package.size(), - package.section.as_ref().unwrap(), - ); - skipped_count += 1; - skipped_bytes += package.size(); - continue; - } - } - if let Some(skipped_package_globs) = &skipped_package_globs { - let matches = skipped_package_globs.matches(&package.package); - if !matches.is_empty() { - // safety, skipped_package_globs is set based on this - let globs = config.skip.skip_packages.as_ref().unwrap(); - let matches: Vec = matches.iter().map(|i| globs[*i].clone()).collect(); - println!( - "\tskipping {} - {}b (package glob(s): {})", - package.package, - package.size(), - matches.join(", ") - ); - skipped_count += 1; - skipped_bytes += package.size(); - continue; - } - } - - for file_reference in package.files.values() { - let path = format!("{}/{}", package.directory, file_reference.file); - let url = get_repo_url(&config.repository, &path); - - if dry_run { - if config.pool.contains(&file_reference.checksums) { - fetch_progress.update(&FetchResult { - data: vec![], - fetched: 0, - }); - } else { - println!("\t(dry-run) GET missing '{url}' ({}b)", file_reference.size); - fetch_progress.update(&FetchResult { - data: vec![], - fetched: file_reference.size, - }); - } - } else { - let mut full_path = PathBuf::from(prefix); - full_path.push(&path); - - match fetch_plain_file( - &config, - &url, - &full_path, - file_reference.size, - &file_reference.checksums, - false, - dry_run, - ) { - Ok(res) => fetch_progress.update(&res), - Err(err) if config.ignore_errors => { - let msg = format!( - "{}: failed to fetch package '{}' - {}", - basename, file_reference.file, err, - ); - eprintln!("{msg}"); - warnings.push(msg); - } - Err(err) => return Err(err), - } - } - - if fetch_progress.file_count() % (max(total_source_packages / 100, 1)) == 0 { - println!("\tProgress: {fetch_progress}"); - } - } - } - println!("\tProgress: {fetch_progress}"); - if dry_run { - dry_run_progress += fetch_progress; - } else { - total_progress += fetch_progress; - } - if skipped_count > 0 { - total_skipped_count += skipped_count; - total_skipped_bytes += skipped_bytes; - println!("Skipped downloading {skipped_count} packages totalling {skipped_bytes}b"); - } - } + fetch_source_packages( + &config, + source_packages_indices, + dry_run, + prefix, + &mut progress, + )?; if dry_run { - println!("\nDry-run Stats (indices, downloaded but not persisted):\n{total_progress}"); - println!("\nDry-run stats (packages, new == missing):\n{dry_run_progress}"); + println!( + "\nDry-run Stats (indices, downloaded but not persisted):\n{}", + progress.total + ); + println!( + "\nDry-run stats (packages, new == missing):\n{}", + progress.dry_run + ); } else { - println!("\nStats: {total_progress}"); + println!("\nStats: {}", progress.total); } if total_count > 0 { println!( - "Skipped downloading {total_skipped_count} packages totalling {total_skipped_bytes}b" + "Skipped downloading {} packages totalling {}b", + progress.skip_count, progress.skip_bytes, ); } - if !warnings.is_empty() { + if !progress.warnings.is_empty() { eprintln!("Warnings:"); - for msg in warnings { + for msg in progress.warnings { eprintln!("- {msg}"); } } -- 2.30.2