From: Wolfgang Bumiller <w.bumiller@proxmox.com>
To: Dominik Csapak <d.csapak@proxmox.com>
Cc: pbs-devel@lists.proxmox.com
Subject: Re: [pbs-devel] [PATCH proxmox-backup 6/6] api: admin: datastore: implement streaming content api call
Date: Tue, 7 Oct 2025 14:51:09 +0200 [thread overview]
Message-ID: <fe7wgj7jjubyyzxorsefl2xpzgg4iyuem5t4n3elo5dz6papjr@z5wftp2afyyf> (raw)
In-Reply-To: <20251003085045.1346864-8-d.csapak@proxmox.com>
On Fri, Oct 03, 2025 at 10:50:39AM +0200, Dominik Csapak wrote:
> this is a new api call that utilizes `async-stream` together with
> `proxmox_router::Stream` to provide a streaming interface to querying
> the datastore content.
>
> This can be done when a client requests this api call with the
> `application/json-seq` Accept header.
>
> In contrast to the existing api calls, this one
> * returns all types of content items (namespaces, groups, snapshots; can
> be filtered with a parameter)
> * iterates over them recursively (down to the depth given via the
>   `max-depth` parameter)
>
> The api call returns the data in the following order:
> * first all visible namespaces
> * then for each ns in order
> * each group
> * each snapshot
>
> This is done so that we can have a good way of building a tree view in
> the ui.
>
> Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
> ---
> The permission checks here should be thoroughly reviewed. I did them to
> the best of my ability, but of course some bug/issue could have crept in.
>
> Interesting side note: in my rather large setup with ~600 groups and ~1000
> snapshots per group, streaming this is faster than using the current
> `snapshot` api (by a lot):
> * `snapshot` api -> ~3 min
> * `content` api with streaming -> ~2:11 min
> * `content` api without streaming -> ~3 min
>
> It seems that collecting such a 'large' api response (~200MiB) is
> expensive. My guesses as to what happens here:
> * frequent (re)allocation of the resulting vec
> * or serde's serializing code
>
> but the cost still seems pretty high for that.
> LMK if I should investigate this further.
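The vec-reallocation guess is easy to sanity-check in isolation with a
std-only sketch (illustrative only, not a profile of this api call; the
exact count depends on std's growth policy):

```rust
fn main() {
    // Count how often Vec reallocates while pushing 1M elements.
    let mut v: Vec<u64> = Vec::new();
    let mut reallocs = 0u32;
    let mut cap = v.capacity();
    for i in 0..1_000_000u64 {
        v.push(i);
        if v.capacity() != cap {
            reallocs += 1;
            cap = v.capacity();
        }
    }
    // Growth is geometric, so only a handful of reallocations happen, but
    // each one copies the whole buffer, which adds up for a
    // multi-hundred-MiB response body.
    println!("{} elements, {} reallocations", v.len(), reallocs);
}
```

If reallocation does turn out to matter, `Vec::with_capacity` (or
`reserve` with an estimated item count) would avoid the repeated copies.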
>
> Cargo.toml | 2 +
> src/api2/admin/datastore.rs | 201 +++++++++++++++++++++++++++++++-----
> 2 files changed, 176 insertions(+), 27 deletions(-)
>
> diff --git a/Cargo.toml b/Cargo.toml
> index b3f55b4db..21eb293ba 100644
> --- a/Cargo.toml
> +++ b/Cargo.toml
> @@ -114,6 +114,7 @@ pbs-tools = { path = "pbs-tools" }
>
> # regular crates
> anyhow = "1.0"
> +async-stream = "0.3"
> async-trait = "0.1.56"
> apt-pkg-native = "0.3.2"
> bitflags = "2.4"
> @@ -168,6 +169,7 @@ zstd-safe = "7"
> [dependencies]
> anyhow.workspace = true
> async-trait.workspace = true
> +async-stream.workspace = true
> bytes.workspace = true
> cidr.workspace = true
> const_format.workspace = true
> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
> index 2252dcfa4..bf94f6400 100644
> --- a/src/api2/admin/datastore.rs
> +++ b/src/api2/admin/datastore.rs
> @@ -23,7 +23,7 @@ use proxmox_compression::zstd::ZstdEncoder;
> use proxmox_log::LogContext;
> use proxmox_router::{
> http_err, list_subdirs_api_method, ApiHandler, ApiMethod, ApiResponseFuture, Permission,
> - Router, RpcEnvironment, RpcEnvironmentType, SubdirMap,
> + Record, Router, RpcEnvironment, RpcEnvironmentType, SubdirMap,
> };
> use proxmox_rrd_api_types::{RrdMode, RrdTimeframe};
> use proxmox_schema::*;
> @@ -39,15 +39,16 @@ use pxar::EntryKind;
>
> use pbs_api_types::{
> print_ns_and_snapshot, print_store_and_ns, ArchiveType, Authid, BackupArchiveName,
> - BackupContent, BackupGroupDeleteStats, BackupNamespace, BackupType, Counts, CryptMode,
> - DataStoreConfig, DataStoreListItem, DataStoreMountStatus, DataStoreStatus,
> - GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, KeepOptions, MaintenanceMode,
> - MaintenanceType, Operation, PruneJobOptions, SnapshotListItem, SyncJobConfig,
> - BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA,
> - BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA,
> - IGNORE_VERIFIED_BACKUPS_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA, PRIV_DATASTORE_AUDIT,
> - PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ,
> - PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, UPID, UPID_SCHEMA, VERIFICATION_OUTDATED_AFTER_SCHEMA,
> + BackupContent, BackupGroupDeleteStats, BackupNamespace, BackupType, ContentListItem,
> + ContentType, Counts, CryptMode, DataStoreConfig, DataStoreListItem, DataStoreMountStatus,
> + DataStoreStatus, GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, KeepOptions,
> + MaintenanceMode, MaintenanceType, NamespaceListItem, Operation, PruneJobOptions,
> + SnapshotListItem, SyncJobConfig, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA,
> + BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, CATALOG_NAME,
> + CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA, IGNORE_VERIFIED_BACKUPS_SCHEMA, MAX_NAMESPACE_DEPTH,
> + NS_MAX_DEPTH_SCHEMA, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY,
> + PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, UPID,
> + UPID_SCHEMA, VERIFICATION_OUTDATED_AFTER_SCHEMA,
Yet another reason why [this][1] *really* needs at least *partial*
stabilization, so we can switch to `Item`-level imports. 🙄
[1] https://github.com/rust-lang/rustfmt/issues/4991#issuecomment-3369792412
> };
> use pbs_client::pxar::{create_tar, create_zip};
> use pbs_config::CachedUserInfo;
> @@ -70,7 +71,10 @@ use proxmox_rest_server::{formatter, worker_is_active, WorkerTask};
>
> use crate::api2::backup::optional_ns_param;
> use crate::api2::node::rrd::create_value_from_rrd;
> -use crate::backup::{check_ns_privs_full, ListAccessibleBackupGroups, VerifyWorker, NS_PRIVS_OK};
> +use crate::backup::{
> + can_access_any_namespace_in_range, check_ns_privs, check_ns_privs_full,
> + ListAccessibleBackupGroups, VerifyWorker, NS_PRIVS_OK,
> +};
> use crate::server::jobstate::{compute_schedule_status, Job, JobState};
> use crate::tools::{backup_info_to_snapshot_list_item, get_all_snapshot_files, read_backup_index};
>
> @@ -396,7 +400,7 @@ pub async fn delete_snapshot(
> }
>
> #[api(
> - serializing: true,
> + stream: true,
> input: {
> properties: {
> store: { schema: DATASTORE_SCHEMA },
> @@ -404,40 +408,137 @@ pub async fn delete_snapshot(
> type: BackupNamespace,
> optional: true,
> },
> - "backup-type": {
> + "max-depth": {
> + schema: NS_MAX_DEPTH_SCHEMA,
> optional: true,
> - type: BackupType,
> },
> - "backup-id": {
> + "content-type": {
> optional: true,
> - schema: BACKUP_ID_SCHEMA,
> + type: ContentType,
> },
> },
> },
> - returns: pbs_api_types::ADMIN_DATASTORE_LIST_SNAPSHOTS_RETURN_TYPE,
> access: {
> permission: &Permission::Anybody,
> description: "Requires on /datastore/{store}[/{namespace}] either DATASTORE_AUDIT for any \
> or DATASTORE_BACKUP and being the owner of the group",
> },
> )]
> -/// List backup snapshots.
> -pub async fn list_snapshots(
> +/// List datastore content, recursively through all namespaces.
> +pub async fn list_content(
> store: String,
> ns: Option<BackupNamespace>,
> - backup_type: Option<BackupType>,
> - backup_id: Option<String>,
> + max_depth: Option<usize>,
> + content_type: Option<ContentType>,
> _param: Value,
> _info: &ApiMethod,
> rpcenv: &mut dyn RpcEnvironment,
> -) -> Result<Vec<SnapshotListItem>, Error> {
> +) -> Result<proxmox_router::Stream, Error> {
> + let (sender, mut receiver) = tokio::sync::mpsc::channel(128);
> +
> let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> + let user_info = CachedUserInfo::new()?;
>
> - tokio::task::spawn_blocking(move || unsafe {
> - list_snapshots_blocking(store, ns, backup_type, backup_id, auth_id)
> - })
> - .await
> - .map_err(|err| format_err!("failed to await blocking task: {err}"))?
> + let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
> + if !can_access_any_namespace_in_range(
> + datastore.clone(),
> + &auth_id,
> + &user_info,
> + ns.clone(),
> + max_depth,
> + ) {
> + proxmox_router::http_bail!(FORBIDDEN, "permission check failed");
> + }
> +
> + let ns = ns.unwrap_or_default();
> +
> + let (list_ns, list_group, list_snapshots) = match content_type {
> + Some(ContentType::Namespace) => (true, false, false),
> + Some(ContentType::Group) => (false, true, false),
> + Some(ContentType::Snapshot) => (false, false, true),
> + None => (true, true, true),
> + };
> +
> + tokio::spawn(async move {
As Thomas already pointed out, you don't need to wrap the
`spawn_blocking` future in what amounts to an `async { fut.await }`.
But also, see the [tokio docs][2] on `JoinHandle`:

    A JoinHandle detaches the associated task when it is dropped

so you can skip the `tokio::spawn` line above entirely.
[2]: https://docs.rs/tokio/latest/tokio/task/struct.JoinHandle.html
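The detach-on-drop behaviour is easy to demonstrate with std threads,
whose `JoinHandle` has the same semantics (a standalone sketch, unrelated
to the patch's actual types):

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    // Dropping the JoinHandle detaches the thread: the closure keeps
    // running and feeding the channel, just like a dropped tokio task.
    drop(thread::spawn(move || {
        for i in 0..3 {
            tx.send(i).unwrap();
        }
    }));
    // rx.iter() yields values until the sender side is dropped.
    let got: Vec<i32> = rx.iter().collect();
    println!("{got:?}"); // prints "[0, 1, 2]"
}
```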
> + tokio::task::spawn_blocking(move || {
> + if list_ns {
> + for ns in datastore.recursive_iter_backup_ns_ok(ns.clone(), max_depth)? {
> + match check_ns_privs(&store, &ns, &auth_id, NS_PRIVS_OK) {
> + Ok(_) => sender.blocking_send(Record::new(ContentListItem::from(
> + NamespaceListItem { ns, comment: None },
> + )))?,
> + Err(_) => continue,
Note that in the current streaming architecture the records can also be
error values, so you may want to consider passing errors through.
There are various helpers: `Record::error` (for std errors),
`Record::error_msg`, or `Record::error_value` if you need structured
errors.
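As a rough, std-only illustration of the errors-as-records idea (all
names and messages here are made up; the real API would use the `Record`
helpers above):

```rust
fn main() {
    // Model each streamed element as a Result, so a failed permission
    // check becomes an error *record* that travels with the stream
    // instead of being silently skipped by a `continue`.
    let namespaces = ["ns/a", "ns/forbidden", "ns/c"];
    let records: Vec<Result<String, String>> = namespaces
        .iter()
        .map(|ns| {
            if ns.contains("forbidden") {
                Err(format!("permission check failed for {ns}"))
            } else {
                Ok((*ns).to_string())
            }
        })
        .collect();
    let errors = records.iter().filter(|r| r.is_err()).count();
    println!("{} records, {} errors", records.len(), errors);
    // prints "3 records, 1 errors"
}
```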
> + }
> + }
> + }
> +
> + if !list_group && !list_snapshots {
> + return Ok(());
> + }
> +
> + for ns in datastore.recursive_iter_backup_ns_ok(ns, max_depth)? {
> + let list_all = match check_ns_privs_full(
> + &store,
> + &ns,
> + &auth_id,
> + PRIV_DATASTORE_AUDIT,
> + PRIV_DATASTORE_BACKUP,
> + ) {
> + Ok(requires_owner) => !requires_owner,
> + Err(_) => continue,
> + };
> + if list_group {
> + for group in datastore.iter_backup_groups(ns.clone())? {
> + let group = group?;
^ The above 2 `?` may also be cases where it would be better to send a
`Record::error`?
Unless we want those to cancel the entire rest of the stream?
> + let group = backup_group_to_group_list_item(
> + datastore.clone(),
> + group,
> + &ns,
> + &auth_id,
> + list_all,
> + );
> +
> + if let Some(group) = group {
> + sender.blocking_send(Record::new(ContentListItem::from((
> + ns.clone(),
> + group,
> + ))))?;
> + }
> + }
> + }
> +
> + if !list_snapshots {
> + continue;
> + }
> +
> + for group in datastore.iter_backup_groups(ns.clone())? {
> + let group = group?;
^ Same here
> + let owner = match get_group_owner(&store, &ns, &group) {
> + Some(auth_id) => auth_id,
> + None => continue,
> + };
> + for snapshot in group.iter_snapshots()? {
> + let snapshot = BackupInfo::new(snapshot?)?;
> + let snapshot = backup_info_to_snapshot_list_item(&snapshot, &owner);
> + sender.blocking_send(Record::new(ContentListItem::from((
> + ns.clone(),
> + snapshot,
> + ))))?;
> + }
> + }
> + }
> + Ok::<_, Error>(())
> + })
> + .await??;
> +
> + Ok::<_, Error>(())
> + });
> + Ok(async_stream::try_stream! {
> + while let Some(elem) = receiver.recv().await {
> + yield elem;
> + }
> + }
> + .into())
You should be able to return `ReceiverStream::new(receiver).into()`
(from `tokio_stream::wrappers`) instead of this entire manual loop. ;-)
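The shape of that suggestion, sketched with std's sync channel (an
analogy only; the real code would wrap the tokio receiver in
`tokio_stream::wrappers::ReceiverStream`):

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for i in 1..=3 {
            tx.send(i).unwrap();
        }
        // tx is dropped here, which ends the iteration below
    });
    // Instead of a manual `while let ... recv()` loop, hand the receiver
    // to a ready-made adapter; rx.into_iter() is the sync analogue of
    // wrapping an async receiver in ReceiverStream.
    let collected: Vec<i32> = rx.into_iter().collect();
    println!("{collected:?}"); // prints "[1, 2, 3]"
}
```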
> }
>
> fn get_group_owner(
> @@ -523,6 +624,51 @@ unsafe fn list_snapshots_blocking(
> })
> }
>
> +#[api(
> + serializing: true,
> + input: {
> + properties: {
> + store: { schema: DATASTORE_SCHEMA },
> + ns: {
> + type: BackupNamespace,
> + optional: true,
> + },
> + "backup-type": {
> + optional: true,
> + type: BackupType,
> + },
> + "backup-id": {
> + optional: true,
> + schema: BACKUP_ID_SCHEMA,
> + },
> + },
> + },
> + returns: pbs_api_types::ADMIN_DATASTORE_LIST_SNAPSHOTS_RETURN_TYPE,
> + access: {
> + permission: &Permission::Anybody,
> + description: "Requires on /datastore/{store}[/{namespace}] either DATASTORE_AUDIT for any \
> + or DATASTORE_BACKUP and being the owner of the group",
> + },
> +)]
> +/// List backup snapshots.
> +pub async fn list_snapshots(
> + store: String,
> + ns: Option<BackupNamespace>,
> + backup_type: Option<BackupType>,
> + backup_id: Option<String>,
> + _param: Value,
> + _info: &ApiMethod,
> + rpcenv: &mut dyn RpcEnvironment,
> +) -> Result<Vec<SnapshotListItem>, Error> {
> + let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> +
> + tokio::task::spawn_blocking(move || unsafe {
> + list_snapshots_blocking(store, ns, backup_type, backup_id, auth_id)
> + })
> + .await
> + .map_err(|err| format_err!("failed to await blocking task: {err}"))?
> +}
> +
> async fn get_snapshots_count(
> store: &Arc<DataStore>,
> owner: Option<&Authid>,
> @@ -2773,6 +2919,7 @@ const DATASTORE_INFO_SUBDIRS: SubdirMap = &[
> "change-owner",
> &Router::new().post(&API_METHOD_SET_BACKUP_OWNER),
> ),
> + ("content", &Router::new().get(&API_METHOD_LIST_CONTENT)),
> (
> "download",
> &Router::new().download(&API_METHOD_DOWNLOAD_FILE),
> --
> 2.47.3
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
Thread overview: 21+ messages
2025-10-03 8:50 [pbs-devel] [PATCH proxmox{, -backup} 0/7] introduce " Dominik Csapak
2025-10-03 8:50 ` [pbs-devel] [PATCH proxmox 1/1] pbs-api-types: add api types for " Dominik Csapak
2025-10-07 8:59 ` Wolfgang Bumiller
2025-10-08 6:41 ` Dominik Csapak
2025-10-03 8:50 ` [pbs-devel] [PATCH proxmox-backup 1/6] backup: hierarchy: add new can_access_any_namespace_in_range helper Dominik Csapak
2025-10-03 9:52 ` Thomas Lamprecht
2025-10-03 10:10 ` Dominik Csapak
2025-10-03 10:21 ` Thomas Lamprecht
2025-10-03 8:50 ` [pbs-devel] [PATCH proxmox-backup 2/6] backup: hierarchy: reuse 'NS_PRIVS_OK' for namespace helper Dominik Csapak
2025-10-03 8:50 ` [pbs-devel] [PATCH proxmox-backup 3/6] api: admin: datastore: refactor BackupGroup to GroupListItem conversion Dominik Csapak
2025-10-03 8:50 ` [pbs-devel] [PATCH proxmox-backup 4/6] api: admin: datastore: factor out 'get_group_owner' Dominik Csapak
2025-10-03 8:50 ` [pbs-devel] [PATCH proxmox-backup 5/6] api: admin: datastore: optimize `groups` api call Dominik Csapak
2025-10-03 10:18 ` Thomas Lamprecht
2025-10-03 10:51 ` Dominik Csapak
2025-10-03 12:37 ` Thomas Lamprecht
2025-10-03 8:50 ` [pbs-devel] [PATCH proxmox-backup 6/6] api: admin: datastore: implement streaming content " Dominik Csapak
2025-10-03 11:55 ` Thomas Lamprecht
2025-10-07 12:51 ` Wolfgang Bumiller [this message]
2025-10-07 14:22 ` Thomas Lamprecht
2025-10-07 14:31 ` Wolfgang Bumiller
2025-10-07 15:05 ` Thomas Lamprecht