From: "Fabian Grünbichler" <f.gruenbichler@proxmox.com>
To: pve-devel@lists.proxmox.com
Subject: [pve-devel] [PATCH proxmox-offline-mirror 4/4] mirror: refactor fetch_binary/source_packages
Date: Tue, 18 Oct 2022 11:20:40 +0200 [thread overview]
Message-ID: <20221018092040.860121-7-f.gruenbichler@proxmox.com> (raw)
In-Reply-To: <20221018092040.860121-1-f.gruenbichler@proxmox.com>
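Move the per-index package download loops out of create_snapshot into the new fetch_binary_packages and fetch_source_packages helpers, and pull out some of the progress variables into a struct (MirrorProgress).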
Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
src/mirror.rs | 520 ++++++++++++++++++++++++++++----------------------
1 file changed, 287 insertions(+), 233 deletions(-)
diff --git a/src/mirror.rs b/src/mirror.rs
index 39b7f47..faaaa19 100644
--- a/src/mirror.rs
+++ b/src/mirror.rs
@@ -7,7 +7,7 @@ use std::{
use anyhow::{bail, format_err, Error};
use flate2::bufread::GzDecoder;
-use globset::{Glob, GlobSetBuilder};
+use globset::{Glob, GlobSet, GlobSetBuilder};
use nix::libc;
use proxmox_http::{client::sync::Client, HttpClient, HttpOptions};
use proxmox_sys::fs::file_get_contents;
@@ -145,7 +145,7 @@ fn fetch_repo_file(
/// Helper to fetch InRelease (`detached` == false) or Release/Release.gpg (`detached` == true) files from repository.
///
/// Verifies the contained/detached signature and stores all fetched files under `prefix`.
-///
+///
/// Returns the verified raw release file data, or None if the "fetch" part itself fails.
fn fetch_release(
config: &ParsedMirrorConfig,
@@ -474,6 +474,259 @@ pub fn list_snapshots(config: &MirrorConfig) -> Result<Vec<Snapshot>, Error> {
Ok(list)
}
+struct MirrorProgress {
+ warnings: Vec<String>,
+ dry_run: Progress,
+ total: Progress,
+ skip_count: usize,
+ skip_bytes: usize,
+}
+
+fn convert_to_globset(config: &ParsedMirrorConfig) -> Result<Option<GlobSet>, Error> {
+ Ok(if let Some(skipped_packages) = &config.skip.skip_packages {
+ let mut globs = GlobSetBuilder::new();
+ for glob in skipped_packages {
+ let glob = Glob::new(glob)?;
+ globs.add(glob);
+ }
+ let globs = globs.build()?;
+ Some(globs)
+ } else {
+ None
+ })
+}
+
+fn fetch_binary_packages(
+ config: &ParsedMirrorConfig,
+ packages_indices: HashMap<&String, PackagesFile>,
+ dry_run: bool,
+ prefix: &Path,
+ progress: &mut MirrorProgress,
+) -> Result<(), Error> {
+ let skipped_package_globs = convert_to_globset(config)?;
+
+ for (basename, references) in packages_indices {
+ let total_files = references.files.len();
+ if total_files == 0 {
+ println!("\n{basename} - no files, skipping.");
+ continue;
+ } else {
+ println!("\n{basename} - {total_files} total file(s)");
+ }
+
+ let mut fetch_progress = Progress::new();
+ let mut skip_count = 0usize;
+ let mut skip_bytes = 0usize;
+ for package in references.files {
+ if let Some(ref sections) = &config.skip.skip_sections {
+ if sections.iter().any(|section| package.section == *section) {
+ println!(
+ "\tskipping {} - {}b (section '{}')",
+ package.package, package.size, package.section
+ );
+ skip_count += 1;
+ skip_bytes += package.size;
+ continue;
+ }
+ }
+ if let Some(skipped_package_globs) = &skipped_package_globs {
+ let matches = skipped_package_globs.matches(&package.package);
+ if !matches.is_empty() {
+ // safety, skipped_package_globs is set based on this
+ let globs = config.skip.skip_packages.as_ref().unwrap();
+ let matches: Vec<String> = matches.iter().map(|i| globs[*i].clone()).collect();
+ println!(
+ "\tskipping {} - {}b (package glob(s): {})",
+ package.package,
+ package.size,
+ matches.join(", ")
+ );
+ skip_count += 1;
+ skip_bytes += package.size;
+ continue;
+ }
+ }
+ let url = get_repo_url(&config.repository, &package.file);
+
+ if dry_run {
+ if config.pool.contains(&package.checksums) {
+ fetch_progress.update(&FetchResult {
+ data: vec![],
+ fetched: 0,
+ });
+ } else {
+ println!("\t(dry-run) GET missing '{url}' ({}b)", package.size);
+ fetch_progress.update(&FetchResult {
+ data: vec![],
+ fetched: package.size,
+ });
+ }
+ } else {
+ let mut full_path = PathBuf::from(prefix);
+ full_path.push(&package.file);
+
+ match fetch_plain_file(
+ config,
+ &url,
+ &full_path,
+ package.size,
+ &package.checksums,
+ false,
+ dry_run,
+ ) {
+ Ok(res) => fetch_progress.update(&res),
+ Err(err) if config.ignore_errors => {
+ let msg = format!(
+ "{}: failed to fetch package '{}' - {}",
+ basename, package.file, err,
+ );
+ eprintln!("{msg}");
+ progress.warnings.push(msg);
+ }
+ Err(err) => return Err(err),
+ }
+ }
+
+ if fetch_progress.file_count() % (max(total_files / 100, 1)) == 0 {
+ println!("\tProgress: {fetch_progress}");
+ }
+ }
+ println!("\tProgress: {fetch_progress}");
+ if dry_run {
+ progress.dry_run += fetch_progress;
+ } else {
+ progress.total += fetch_progress;
+ }
+ if skip_count > 0 {
+ progress.skip_count += skip_count;
+ progress.skip_bytes += skip_bytes;
+ println!("Skipped downloading {skip_count} packages totalling {skip_bytes}b");
+ }
+ }
+
+ Ok(())
+}
+
+fn fetch_source_packages(
+ config: &ParsedMirrorConfig,
+ source_packages_indices: HashMap<&String, SourcesFile>,
+ dry_run: bool,
+ prefix: &Path,
+ progress: &mut MirrorProgress,
+) -> Result<(), Error> {
+ let skipped_package_globs = convert_to_globset(config)?;
+
+ for (basename, references) in source_packages_indices {
+ let total_source_packages = references.source_packages.len();
+ if total_source_packages == 0 {
+ println!("\n{basename} - no files, skipping.");
+ continue;
+ } else {
+ println!("\n{basename} - {total_source_packages} total source package(s)");
+ }
+
+ let mut fetch_progress = Progress::new();
+ let mut skip_count = 0usize;
+ let mut skip_bytes = 0usize;
+ for package in references.source_packages {
+ if let Some(ref sections) = &config.skip.skip_sections {
+ if sections
+ .iter()
+ .any(|section| package.section.as_ref() == Some(section))
+ {
+ println!(
+ "\tskipping {} - {}b (section '{}')",
+ package.package,
+ package.size(),
+ package.section.as_ref().unwrap(),
+ );
+ skip_count += 1;
+ skip_bytes += package.size();
+ continue;
+ }
+ }
+ if let Some(skipped_package_globs) = &skipped_package_globs {
+ let matches = skipped_package_globs.matches(&package.package);
+ if !matches.is_empty() {
+ // safety, skipped_package_globs is set based on this
+ let globs = config.skip.skip_packages.as_ref().unwrap();
+ let matches: Vec<String> = matches.iter().map(|i| globs[*i].clone()).collect();
+ println!(
+ "\tskipping {} - {}b (package glob(s): {})",
+ package.package,
+ package.size(),
+ matches.join(", ")
+ );
+ skip_count += 1;
+ skip_bytes += package.size();
+ continue;
+ }
+ }
+
+ for file_reference in package.files.values() {
+ let path = format!("{}/{}", package.directory, file_reference.file);
+ let url = get_repo_url(&config.repository, &path);
+
+ if dry_run {
+ if config.pool.contains(&file_reference.checksums) {
+ fetch_progress.update(&FetchResult {
+ data: vec![],
+ fetched: 0,
+ });
+ } else {
+ println!("\t(dry-run) GET missing '{url}' ({}b)", file_reference.size);
+ fetch_progress.update(&FetchResult {
+ data: vec![],
+ fetched: file_reference.size,
+ });
+ }
+ } else {
+ let mut full_path = PathBuf::from(prefix);
+ full_path.push(&path);
+
+ match fetch_plain_file(
+ config,
+ &url,
+ &full_path,
+ file_reference.size,
+ &file_reference.checksums,
+ false,
+ dry_run,
+ ) {
+ Ok(res) => fetch_progress.update(&res),
+ Err(err) if config.ignore_errors => {
+ let msg = format!(
+ "{}: failed to fetch package '{}' - {}",
+ basename, file_reference.file, err,
+ );
+ eprintln!("{msg}");
+ progress.warnings.push(msg);
+ }
+ Err(err) => return Err(err),
+ }
+ }
+
+ if fetch_progress.file_count() % (max(total_source_packages / 100, 1)) == 0 {
+ println!("\tProgress: {fetch_progress}");
+ }
+ }
+ }
+ println!("\tProgress: {fetch_progress}");
+ if dry_run {
+ progress.dry_run += fetch_progress;
+ } else {
+ progress.total += fetch_progress;
+ }
+ if skip_count > 0 {
+ progress.skip_count += skip_count;
+ progress.skip_bytes += skip_bytes;
+ println!("Skipped downloading {skip_count} packages totalling {skip_bytes}b");
+ }
+ }
+
+ Ok(())
+}
+
/// Create a new snapshot of the remote repository, fetching and storing files as needed.
///
/// Operates in three phases:
@@ -518,8 +771,13 @@ pub fn create_snapshot(
let prefix = format!("{snapshot}.tmp");
let prefix = Path::new(&prefix);
- let mut total_progress = Progress::new();
- let mut warnings = Vec::new();
+ let mut progress = MirrorProgress {
+ warnings: Vec::new(),
+ skip_count: 0,
+ skip_bytes: 0,
+ dry_run: Progress::new(),
+ total: Progress::new(),
+ };
let parse_release = |res: FetchResult, name: &str| -> Result<ReleaseFile, Error> {
println!("Parsing {name}..");
@@ -534,14 +792,14 @@ pub fn create_snapshot(
// we want both on-disk for compat reasons, if both are available
let release = fetch_release(&config, prefix, true, dry_run)?
.map(|res| {
- total_progress.update(&res);
+ progress.total.update(&res);
parse_release(res, "Release")
})
.transpose()?;
let in_release = fetch_release(&config, prefix, false, dry_run)?
.map(|res| {
- total_progress.update(&res);
+ progress.total.update(&res);
parse_release(res, "InRelease")
})
.transpose()?;
@@ -671,7 +929,7 @@ pub fn create_snapshot(
reference.file_type, reference.path
);
eprintln!("{msg}");
- warnings.push(msg);
+ progress.warnings.push(msg);
failed_references.push(reference);
continue;
}
@@ -723,7 +981,7 @@ pub fn create_snapshot(
println!("Total dsc size for component: {component_dsc_size}");
packages_size += component_dsc_size;
- total_progress += fetch_progress;
+ progress.total += fetch_progress;
}
println!("Total deb size: {packages_size}");
if !failed_references.is_empty() {
@@ -733,244 +991,40 @@ pub fn create_snapshot(
}
}
- let skipped_package_globs = if let Some(skipped_packages) = &config.skip.skip_packages {
- let mut globs = GlobSetBuilder::new();
- for glob in skipped_packages {
- let glob = Glob::new(glob)?;
- globs.add(glob);
- }
- let globs = globs.build()?;
- Some(globs)
- } else {
- None
- };
-
println!("\nFetching packages..");
- let mut dry_run_progress = Progress::new();
- let mut total_skipped_count = 0usize;
- let mut total_skipped_bytes = 0usize;
- for (basename, references) in packages_indices {
- let total_files = references.files.len();
- if total_files == 0 {
- println!("\n{basename} - no files, skipping.");
- continue;
- } else {
- println!("\n{basename} - {total_files} total file(s)");
- }
-
- let mut fetch_progress = Progress::new();
- let mut skipped_count = 0usize;
- let mut skipped_bytes = 0usize;
- for package in references.files {
- if let Some(ref sections) = &config.skip.skip_sections {
- if sections.iter().any(|section| package.section == *section) {
- println!(
- "\tskipping {} - {}b (section '{}')",
- package.package, package.size, package.section
- );
- skipped_count += 1;
- skipped_bytes += package.size;
- continue;
- }
- }
- if let Some(skipped_package_globs) = &skipped_package_globs {
- let matches = skipped_package_globs.matches(&package.package);
- if !matches.is_empty() {
- // safety, skipped_package_globs is set based on this
- let globs = config.skip.skip_packages.as_ref().unwrap();
- let matches: Vec<String> = matches.iter().map(|i| globs[*i].clone()).collect();
- println!(
- "\tskipping {} - {}b (package glob(s): {})",
- package.package,
- package.size,
- matches.join(", ")
- );
- skipped_count += 1;
- skipped_bytes += package.size;
- continue;
- }
- }
- let url = get_repo_url(&config.repository, &package.file);
-
- if dry_run {
- if config.pool.contains(&package.checksums) {
- fetch_progress.update(&FetchResult {
- data: vec![],
- fetched: 0,
- });
- } else {
- println!("\t(dry-run) GET missing '{url}' ({}b)", package.size);
- fetch_progress.update(&FetchResult {
- data: vec![],
- fetched: package.size,
- });
- }
- } else {
- let mut full_path = PathBuf::from(prefix);
- full_path.push(&package.file);
-
- match fetch_plain_file(
- &config,
- &url,
- &full_path,
- package.size,
- &package.checksums,
- false,
- dry_run,
- ) {
- Ok(res) => fetch_progress.update(&res),
- Err(err) if config.ignore_errors => {
- let msg = format!(
- "{}: failed to fetch package '{}' - {}",
- basename, package.file, err,
- );
- eprintln!("{msg}");
- warnings.push(msg);
- }
- Err(err) => return Err(err),
- }
- }
-
- if fetch_progress.file_count() % (max(total_files / 100, 1)) == 0 {
- println!("\tProgress: {fetch_progress}");
- }
- }
- println!("\tProgress: {fetch_progress}");
- if dry_run {
- dry_run_progress += fetch_progress;
- } else {
- total_progress += fetch_progress;
- }
- if skipped_count > 0 {
- total_skipped_count += skipped_count;
- total_skipped_bytes += skipped_bytes;
- println!("Skipped downloading {skipped_count} packages totalling {skipped_bytes}b");
- }
- }
- for (basename, references) in source_packages_indices {
- let total_source_packages = references.source_packages.len();
- if total_source_packages == 0 {
- println!("\n{basename} - no files, skipping.");
- continue;
- } else {
- println!("\n{basename} - {total_source_packages} total source package(s)");
- }
+ fetch_binary_packages(&config, packages_indices, dry_run, prefix, &mut progress)?;
- let mut fetch_progress = Progress::new();
- let mut skipped_count = 0usize;
- let mut skipped_bytes = 0usize;
- for package in references.source_packages {
- if let Some(ref sections) = &config.skip.skip_sections {
- if sections
- .iter()
- .any(|section| package.section.as_ref() == Some(section))
- {
- println!(
- "\tskipping {} - {}b (section '{}')",
- package.package,
- package.size(),
- package.section.as_ref().unwrap(),
- );
- skipped_count += 1;
- skipped_bytes += package.size();
- continue;
- }
- }
- if let Some(skipped_package_globs) = &skipped_package_globs {
- let matches = skipped_package_globs.matches(&package.package);
- if !matches.is_empty() {
- // safety, skipped_package_globs is set based on this
- let globs = config.skip.skip_packages.as_ref().unwrap();
- let matches: Vec<String> = matches.iter().map(|i| globs[*i].clone()).collect();
- println!(
- "\tskipping {} - {}b (package glob(s): {})",
- package.package,
- package.size(),
- matches.join(", ")
- );
- skipped_count += 1;
- skipped_bytes += package.size();
- continue;
- }
- }
-
- for file_reference in package.files.values() {
- let path = format!("{}/{}", package.directory, file_reference.file);
- let url = get_repo_url(&config.repository, &path);
-
- if dry_run {
- if config.pool.contains(&file_reference.checksums) {
- fetch_progress.update(&FetchResult {
- data: vec![],
- fetched: 0,
- });
- } else {
- println!("\t(dry-run) GET missing '{url}' ({}b)", file_reference.size);
- fetch_progress.update(&FetchResult {
- data: vec![],
- fetched: file_reference.size,
- });
- }
- } else {
- let mut full_path = PathBuf::from(prefix);
- full_path.push(&path);
-
- match fetch_plain_file(
- &config,
- &url,
- &full_path,
- file_reference.size,
- &file_reference.checksums,
- false,
- dry_run,
- ) {
- Ok(res) => fetch_progress.update(&res),
- Err(err) if config.ignore_errors => {
- let msg = format!(
- "{}: failed to fetch package '{}' - {}",
- basename, file_reference.file, err,
- );
- eprintln!("{msg}");
- warnings.push(msg);
- }
- Err(err) => return Err(err),
- }
- }
-
- if fetch_progress.file_count() % (max(total_source_packages / 100, 1)) == 0 {
- println!("\tProgress: {fetch_progress}");
- }
- }
- }
- println!("\tProgress: {fetch_progress}");
- if dry_run {
- dry_run_progress += fetch_progress;
- } else {
- total_progress += fetch_progress;
- }
- if skipped_count > 0 {
- total_skipped_count += skipped_count;
- total_skipped_bytes += skipped_bytes;
- println!("Skipped downloading {skipped_count} packages totalling {skipped_bytes}b");
- }
- }
+ fetch_source_packages(
+ &config,
+ source_packages_indices,
+ dry_run,
+ prefix,
+ &mut progress,
+ )?;
if dry_run {
- println!("\nDry-run Stats (indices, downloaded but not persisted):\n{total_progress}");
- println!("\nDry-run stats (packages, new == missing):\n{dry_run_progress}");
+ println!(
+ "\nDry-run Stats (indices, downloaded but not persisted):\n{}",
+ progress.total
+ );
+ println!(
+ "\nDry-run stats (packages, new == missing):\n{}",
+ progress.dry_run
+ );
} else {
- println!("\nStats: {total_progress}");
+ println!("\nStats: {}", progress.total);
}
if total_count > 0 {
println!(
- "Skipped downloading {total_skipped_count} packages totalling {total_skipped_bytes}b"
+ "Skipped downloading {} packages totalling {}b",
+ progress.skip_count, progress.skip_bytes,
);
}
- if !warnings.is_empty() {
+ if !progress.warnings.is_empty() {
eprintln!("Warnings:");
- for msg in warnings {
+ for msg in progress.warnings {
eprintln!("- {msg}");
}
}
--
2.30.2
next prev parent reply other threads:[~2022-10-18 9:21 UTC|newest]
Thread overview: 8+ messages / expand[flat|nested] mbox.gz Atom feed top
2022-10-18 9:20 [pve-devel] [PATCH-SERIES 0/6] proxmox-offline-mirror filtering & deb-src support Fabian Grünbichler
2022-10-18 9:20 ` [pve-devel] [PATCH proxmox-apt 1/2] packages file: add section field Fabian Grünbichler
2022-10-18 9:20 ` [pve-devel] [PATCH proxmox-apt 2/2] deb822: source index support Fabian Grünbichler
2022-10-18 9:20 ` [pve-devel] [PATCH proxmox-offline-mirror 1/4] mirror: add exclusion of packages/sections Fabian Grünbichler
2022-10-18 9:20 ` [pve-devel] [PATCH proxmox-offline-mirror 2/4] mirror: implement source packages mirroring Fabian Grünbichler
2022-10-18 9:20 ` [pve-devel] [PATCH proxmox-offline-mirror 3/4] fix #4264: only require either Release or InRelease Fabian Grünbichler
2022-10-18 9:20 ` Fabian Grünbichler [this message]
2022-10-20 12:49 ` [pve-devel] applied-series: [PATCH-SERIES 0/6] proxmox-offline-mirror filtering & deb-src support Thomas Lamprecht
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20221018092040.860121-7-f.gruenbichler@proxmox.com \
--to=f.gruenbichler@proxmox.com \
--cc=pve-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.