public inbox for pbs-devel@lists.proxmox.com
From: Wolfgang Bumiller <w.bumiller@proxmox.com>
To: Dominik Csapak <d.csapak@proxmox.com>
Cc: pbs-devel@lists.proxmox.com
Subject: Re: [pbs-devel] [PATCH proxmox-backup 6/6] api: admin: datastore: implement streaming content api call
Date: Tue, 7 Oct 2025 14:51:09 +0200	[thread overview]
Message-ID: <fe7wgj7jjubyyzxorsefl2xpzgg4iyuem5t4n3elo5dz6papjr@z5wftp2afyyf> (raw)
In-Reply-To: <20251003085045.1346864-8-d.csapak@proxmox.com>

On Fri, Oct 03, 2025 at 10:50:39AM +0200, Dominik Csapak wrote:
> this is a new api call that utilizes `async-stream` together with
> `proxmox_router::Stream` to provide a streaming interface to querying
> the datastore content.
> 
> This can be done when a client requests this api call with the
> `application/json-seq` Accept header.
> 
> In contrast to the existing api calls, this one
> * returns all types of content items (namespaces, groups, snapshots; can
>   be filtered with a parameter)
> * iterates over them recursively (within the range given by the
>   parameter)
> 
> The api call returns the data in the following order:
> * first all visible namespaces
> * then for each ns in order
>   * each group
>   * each snapshot
> 
> This is done so that we can have a good way of building a tree view in
> the ui.
> 
> Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
> ---
> The permission checks should be thoroughly reviewed. I did them to
> the best of my ability, but of course some bug/issue could have crept in.
> 
> Interesting side note: in my rather large setup with ~600 groups and ~1000
> snapshots per group, streaming this is faster than using the current
> `snapshot` api (by a lot):
> * `snapshot` api -> ~3 min
> * `content` api with streaming -> ~2:11 min
> * `content` api without streaming -> ~3 min
> 
> It seems that collecting such a 'large' api response (~200MiB)
> is expensive. My guesses as to what happens here are either:
> * frequent (re)allocation of the resulting vec
> * or serde's serializing code
> 
> but the cost still seems pretty high for that.
> LMK if I should further investigate this.
> 
>  Cargo.toml                  |   2 +
>  src/api2/admin/datastore.rs | 201 +++++++++++++++++++++++++++++++-----
>  2 files changed, 176 insertions(+), 27 deletions(-)
> 
> diff --git a/Cargo.toml b/Cargo.toml
> index b3f55b4db..21eb293ba 100644
> --- a/Cargo.toml
> +++ b/Cargo.toml
> @@ -114,6 +114,7 @@ pbs-tools = { path = "pbs-tools" }
>  
>  # regular crates
>  anyhow = "1.0"
> +async-stream = "0.3"
>  async-trait = "0.1.56"
>  apt-pkg-native = "0.3.2"
>  bitflags = "2.4"
> @@ -168,6 +169,7 @@ zstd-safe = "7"
>  [dependencies]
>  anyhow.workspace = true
>  async-trait.workspace = true
> +async-stream.workspace = true
>  bytes.workspace = true
>  cidr.workspace = true
>  const_format.workspace = true
> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
> index 2252dcfa4..bf94f6400 100644
> --- a/src/api2/admin/datastore.rs
> +++ b/src/api2/admin/datastore.rs
> @@ -23,7 +23,7 @@ use proxmox_compression::zstd::ZstdEncoder;
>  use proxmox_log::LogContext;
>  use proxmox_router::{
>      http_err, list_subdirs_api_method, ApiHandler, ApiMethod, ApiResponseFuture, Permission,
> -    Router, RpcEnvironment, RpcEnvironmentType, SubdirMap,
> +    Record, Router, RpcEnvironment, RpcEnvironmentType, SubdirMap,
>  };
>  use proxmox_rrd_api_types::{RrdMode, RrdTimeframe};
>  use proxmox_schema::*;
> @@ -39,15 +39,16 @@ use pxar::EntryKind;
>  
>  use pbs_api_types::{
>      print_ns_and_snapshot, print_store_and_ns, ArchiveType, Authid, BackupArchiveName,
> -    BackupContent, BackupGroupDeleteStats, BackupNamespace, BackupType, Counts, CryptMode,
> -    DataStoreConfig, DataStoreListItem, DataStoreMountStatus, DataStoreStatus,
> -    GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, KeepOptions, MaintenanceMode,
> -    MaintenanceType, Operation, PruneJobOptions, SnapshotListItem, SyncJobConfig,
> -    BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA,
> -    BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA,
> -    IGNORE_VERIFIED_BACKUPS_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA, PRIV_DATASTORE_AUDIT,
> -    PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ,
> -    PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, UPID, UPID_SCHEMA, VERIFICATION_OUTDATED_AFTER_SCHEMA,
> +    BackupContent, BackupGroupDeleteStats, BackupNamespace, BackupType, ContentListItem,
> +    ContentType, Counts, CryptMode, DataStoreConfig, DataStoreListItem, DataStoreMountStatus,
> +    DataStoreStatus, GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, KeepOptions,
> +    MaintenanceMode, MaintenanceType, NamespaceListItem, Operation, PruneJobOptions,
> +    SnapshotListItem, SyncJobConfig, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA,
> +    BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, CATALOG_NAME,
> +    CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA, IGNORE_VERIFIED_BACKUPS_SCHEMA, MAX_NAMESPACE_DEPTH,
> +    NS_MAX_DEPTH_SCHEMA, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY,
> +    PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, UPID,
> +    UPID_SCHEMA, VERIFICATION_OUTDATED_AFTER_SCHEMA,

And more reasons why [this][1] *really* needs at least *partial*
stabilization so we can switch to `Item` level imports. 🙄

[1] https://github.com/rust-lang/rustfmt/issues/4991#issuecomment-3369792412

>  };
>  use pbs_client::pxar::{create_tar, create_zip};
>  use pbs_config::CachedUserInfo;
> @@ -70,7 +71,10 @@ use proxmox_rest_server::{formatter, worker_is_active, WorkerTask};
>  
>  use crate::api2::backup::optional_ns_param;
>  use crate::api2::node::rrd::create_value_from_rrd;
> -use crate::backup::{check_ns_privs_full, ListAccessibleBackupGroups, VerifyWorker, NS_PRIVS_OK};
> +use crate::backup::{
> +    can_access_any_namespace_in_range, check_ns_privs, check_ns_privs_full,
> +    ListAccessibleBackupGroups, VerifyWorker, NS_PRIVS_OK,
> +};
>  use crate::server::jobstate::{compute_schedule_status, Job, JobState};
>  use crate::tools::{backup_info_to_snapshot_list_item, get_all_snapshot_files, read_backup_index};
>  
> @@ -396,7 +400,7 @@ pub async fn delete_snapshot(
>  }
>  
>  #[api(
> -    serializing: true,
> +    stream: true,
>      input: {
>          properties: {
>              store: { schema: DATASTORE_SCHEMA },
> @@ -404,40 +408,137 @@ pub async fn delete_snapshot(
>                  type: BackupNamespace,
>                  optional: true,
>              },
> -            "backup-type": {
> +            "max-depth": {
> +                schema: NS_MAX_DEPTH_SCHEMA,
>                  optional: true,
> -                type: BackupType,
>              },
> -            "backup-id": {
> +            "content-type": {
>                  optional: true,
> -                schema: BACKUP_ID_SCHEMA,
> +                type: ContentType,
>              },
>          },
>      },
> -    returns: pbs_api_types::ADMIN_DATASTORE_LIST_SNAPSHOTS_RETURN_TYPE,
>      access: {
>          permission: &Permission::Anybody,
>          description: "Requires on /datastore/{store}[/{namespace}] either DATASTORE_AUDIT for any \
>              or DATASTORE_BACKUP and being the owner of the group",
>      },
>  )]
> -/// List backup snapshots.
> -pub async fn list_snapshots(
> +/// List datastore content, recursively through all namespaces.
> +pub async fn list_content(
>      store: String,
>      ns: Option<BackupNamespace>,
> -    backup_type: Option<BackupType>,
> -    backup_id: Option<String>,
> +    max_depth: Option<usize>,
> +    content_type: Option<ContentType>,
>      _param: Value,
>      _info: &ApiMethod,
>      rpcenv: &mut dyn RpcEnvironment,
> -) -> Result<Vec<SnapshotListItem>, Error> {
> +) -> Result<proxmox_router::Stream, Error> {
> +    let (sender, mut receiver) = tokio::sync::mpsc::channel(128);
> +
>      let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> +    let user_info = CachedUserInfo::new()?;
>  
> -    tokio::task::spawn_blocking(move || unsafe {
> -        list_snapshots_blocking(store, ns, backup_type, backup_id, auth_id)
> -    })
> -    .await
> -    .map_err(|err| format_err!("failed to await blocking task: {err}"))?
> +    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
> +    if !can_access_any_namespace_in_range(
> +        datastore.clone(),
> +        &auth_id,
> +        &user_info,
> +        ns.clone(),
> +        max_depth,
> +    ) {
> +        proxmox_router::http_bail!(FORBIDDEN, "permission check failed");
> +    }
> +
> +    let ns = ns.unwrap_or_default();
> +
> +    let (list_ns, list_group, list_snapshots) = match content_type {
> +        Some(ContentType::Namespace) => (true, false, false),
> +        Some(ContentType::Group) => (false, true, false),
> +        Some(ContentType::Snapshot) => (false, false, true),
> +        None => (true, true, true),
> +    };
> +
> +    tokio::spawn(async move {

As Thomas already pointed out, you don't need to wrap the
`spawn_blocking` future in what amounts to an `async { fut.await }`.

But also, see the [tokio docs][2] on `JoinHandle`:

    A JoinHandle detaches the associated task when it is dropped

so you can skip the entire above line.

[2]: https://docs.rs/tokio/latest/tokio/task/struct.JoinHandle.html

> +        tokio::task::spawn_blocking(move || {
> +            if list_ns {
> +                for ns in datastore.recursive_iter_backup_ns_ok(ns.clone(), max_depth)? {
> +                    match check_ns_privs(&store, &ns, &auth_id, NS_PRIVS_OK) {
> +                        Ok(_) => sender.blocking_send(Record::new(ContentListItem::from(
> +                            NamespaceListItem { ns, comment: None },
> +                        )))?,
> +                        Err(_) => continue,

Note that in the current streaming architecture the records can also be
error values, so you may want to consider passing errors through.
There are various helpers: `Record::error` (for std errors),
`Record::error_msg`, or `Record::error_value` if you need structured
errors.

> +                    }
> +                }
> +            }
> +
> +            if !list_group && !list_snapshots {
> +                return Ok(());
> +            }
> +
> +            for ns in datastore.recursive_iter_backup_ns_ok(ns, max_depth)? {
> +                let list_all = match check_ns_privs_full(
> +                    &store,
> +                    &ns,
> +                    &auth_id,
> +                    PRIV_DATASTORE_AUDIT,
> +                    PRIV_DATASTORE_BACKUP,
> +                ) {
> +                    Ok(requires_owner) => !requires_owner,
> +                    Err(_) => continue,
> +                };
> +                if list_group {
> +                    for group in datastore.iter_backup_groups(ns.clone())? {
> +                        let group = group?;

^ The above 2 `?` may also be cases where it would be better to send a
`Record::error`?

Unless we want those to cancel the entire rest of the stream?

> +                        let group = backup_group_to_group_list_item(
> +                            datastore.clone(),
> +                            group,
> +                            &ns,
> +                            &auth_id,
> +                            list_all,
> +                        );
> +
> +                        if let Some(group) = group {
> +                            sender.blocking_send(Record::new(ContentListItem::from((
> +                                ns.clone(),
> +                                group,
> +                            ))))?;
> +                        }
> +                    }
> +                }
> +
> +                if !list_snapshots {
> +                    continue;
> +                }
> +
> +                for group in datastore.iter_backup_groups(ns.clone())? {
> +                    let group = group?;

^ Same here

> +                    let owner = match get_group_owner(&store, &ns, &group) {
> +                        Some(auth_id) => auth_id,
> +                        None => continue,
> +                    };
> +                    for snapshot in group.iter_snapshots()? {
> +                        let snapshot = BackupInfo::new(snapshot?)?;
> +                        let snapshot = backup_info_to_snapshot_list_item(&snapshot, &owner);
> +                        sender.blocking_send(Record::new(ContentListItem::from((
> +                            ns.clone(),
> +                            snapshot,
> +                        ))))?;
> +                    }
> +                }
> +            }
> +            Ok::<_, Error>(())
> +        })
> +        .await??;
> +
> +        Ok::<_, Error>(())
> +    });
> +    Ok(async_stream::try_stream! {
> +        while let Some(elem) = receiver.recv().await {
> +            yield elem;
> +        }
> +    }
> +    .into())

You should be able to return `ReceiverStream::new(receiver).into()` (from
`tokio_stream::wrappers`) instead of this entire manual loop. ;-)

>  }
>  
>  fn get_group_owner(
> @@ -523,6 +624,51 @@ unsafe fn list_snapshots_blocking(
>      })
>  }
>  
> +#[api(
> +    serializing: true,
> +    input: {
> +        properties: {
> +            store: { schema: DATASTORE_SCHEMA },
> +            ns: {
> +                type: BackupNamespace,
> +                optional: true,
> +            },
> +            "backup-type": {
> +                optional: true,
> +                type: BackupType,
> +            },
> +            "backup-id": {
> +                optional: true,
> +                schema: BACKUP_ID_SCHEMA,
> +            },
> +        },
> +    },
> +    returns: pbs_api_types::ADMIN_DATASTORE_LIST_SNAPSHOTS_RETURN_TYPE,
> +    access: {
> +        permission: &Permission::Anybody,
> +        description: "Requires on /datastore/{store}[/{namespace}] either DATASTORE_AUDIT for any \
> +            or DATASTORE_BACKUP and being the owner of the group",
> +    },
> +)]
> +/// List backup snapshots.
> +pub async fn list_snapshots(
> +    store: String,
> +    ns: Option<BackupNamespace>,
> +    backup_type: Option<BackupType>,
> +    backup_id: Option<String>,
> +    _param: Value,
> +    _info: &ApiMethod,
> +    rpcenv: &mut dyn RpcEnvironment,
> +) -> Result<Vec<SnapshotListItem>, Error> {
> +    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> +
> +    tokio::task::spawn_blocking(move || unsafe {
> +        list_snapshots_blocking(store, ns, backup_type, backup_id, auth_id)
> +    })
> +    .await
> +    .map_err(|err| format_err!("failed to await blocking task: {err}"))?
> +}
> +
>  async fn get_snapshots_count(
>      store: &Arc<DataStore>,
>      owner: Option<&Authid>,
> @@ -2773,6 +2919,7 @@ const DATASTORE_INFO_SUBDIRS: SubdirMap = &[
>          "change-owner",
>          &Router::new().post(&API_METHOD_SET_BACKUP_OWNER),
>      ),
> +    ("content", &Router::new().get(&API_METHOD_LIST_CONTENT)),
>      (
>          "download",
>          &Router::new().download(&API_METHOD_DOWNLOAD_FILE),
> -- 
> 2.47.3


_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

Thread overview:
2025-10-03  8:50 [pbs-devel] [PATCH proxmox{, -backup} 0/7] introduce " Dominik Csapak
2025-10-03  8:50 ` [pbs-devel] [PATCH proxmox 1/1] pbs-api-types: add api types for " Dominik Csapak
2025-10-07  8:59   ` Wolfgang Bumiller
2025-10-08  6:41     ` Dominik Csapak
2025-10-03  8:50 ` [pbs-devel] [PATCH proxmox-backup 1/6] backup: hierarchy: add new can_access_any_namespace_in_range helper Dominik Csapak
2025-10-03  9:52   ` Thomas Lamprecht
2025-10-03 10:10     ` Dominik Csapak
2025-10-03 10:21       ` Thomas Lamprecht
2025-10-03  8:50 ` [pbs-devel] [PATCH proxmox-backup 2/6] backup: hierarchy: reuse 'NS_PRIVS_OK' for namespace helper Dominik Csapak
2025-10-03  8:50 ` [pbs-devel] [PATCH proxmox-backup 3/6] api: admin: datastore: refactor BackupGroup to GroupListItem conversion Dominik Csapak
2025-10-03  8:50 ` [pbs-devel] [PATCH proxmox-backup 4/6] api: admin: datastore: factor out 'get_group_owner' Dominik Csapak
2025-10-03  8:50 ` [pbs-devel] [PATCH proxmox-backup 5/6] api: admin: datastore: optimize `groups` api call Dominik Csapak
2025-10-03 10:18   ` Thomas Lamprecht
2025-10-03 10:51     ` Dominik Csapak
2025-10-03 12:37       ` Thomas Lamprecht
2025-10-03  8:50 ` [pbs-devel] [PATCH proxmox-backup 6/6] api: admin: datastore: implement streaming content " Dominik Csapak
2025-10-03 11:55   ` Thomas Lamprecht
2025-10-07 12:51   ` Wolfgang Bumiller [this message]
2025-10-07 14:22     ` Thomas Lamprecht
2025-10-07 14:31       ` Wolfgang Bumiller
2025-10-07 15:05         ` Thomas Lamprecht
