From: Wolfgang Bumiller <w.bumiller@proxmox.com>
To: Dominik Csapak <d.csapak@proxmox.com>
Cc: pbs-devel@lists.proxmox.com
Subject: Re: [pbs-devel] [PATCH proxmox-backup 6/6] api: admin: datastore: implement streaming content api call
Date: Tue, 7 Oct 2025 14:51:09 +0200
Message-ID: <fe7wgj7jjubyyzxorsefl2xpzgg4iyuem5t4n3elo5dz6papjr@z5wftp2afyyf>
In-Reply-To: <20251003085045.1346864-8-d.csapak@proxmox.com>
On Fri, Oct 03, 2025 at 10:50:39AM +0200, Dominik Csapak wrote:
> this is a new api call that utilizes `async-stream` together with
> `proxmox_router::Stream` to provide a streaming interface to querying
> the datastore content.
>
> This can be done when a client requests this api call with the
> `application/json-seq` Accept header.
>
> In contrast to the existing api calls, this one
> * returns all types of content items (namespaces, groups, snapshots; can
> be filtered with a parameter)
> * iterates over them recursively (with the range that is given with the
> parameter)
>
> The api call returns the data in the following order:
> * first all visible namespaces
> * then for each ns in order
> * each group
> * each snapshot
>
> This is done so that we can have a good way of building a tree view in
> the ui.
>
> Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
> ---
> This should be thoroughly checked for permission checks. I did it to
> the best of my ability, but of course some bug/issue could have crept in.
>
> Interesting side note: in my rather large setup with ~600 groups and ~1000
> snapshots per group, streaming this is faster than using the current
> `snapshot` api (by a lot):
> * `snapshot` api -> ~3 min
> * `content` api with streaming -> ~2:11 min
> * `content` api without streaming -> ~3 min
>
> It seems that collecting such a 'large' api response (~200MiB) is
> expensive. My guesses as to what happens here:
> * frequent (re)allocation of the resulting vec
> * or serde's serialization code
>
> Either way, the cost still seems pretty high for that.
> LMK if I should investigate this further.
>
> Cargo.toml | 2 +
> src/api2/admin/datastore.rs | 201 +++++++++++++++++++++++++++++++-----
> 2 files changed, 176 insertions(+), 27 deletions(-)
>
> diff --git a/Cargo.toml b/Cargo.toml
> index b3f55b4db..21eb293ba 100644
> --- a/Cargo.toml
> +++ b/Cargo.toml
> @@ -114,6 +114,7 @@ pbs-tools = { path = "pbs-tools" }
>
> # regular crates
> anyhow = "1.0"
> +async-stream = "0.3"
> async-trait = "0.1.56"
> apt-pkg-native = "0.3.2"
> bitflags = "2.4"
> @@ -168,6 +169,7 @@ zstd-safe = "7"
> [dependencies]
> anyhow.workspace = true
> async-trait.workspace = true
> +async-stream.workspace = true
> bytes.workspace = true
> cidr.workspace = true
> const_format.workspace = true
> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
> index 2252dcfa4..bf94f6400 100644
> --- a/src/api2/admin/datastore.rs
> +++ b/src/api2/admin/datastore.rs
> @@ -23,7 +23,7 @@ use proxmox_compression::zstd::ZstdEncoder;
> use proxmox_log::LogContext;
> use proxmox_router::{
> http_err, list_subdirs_api_method, ApiHandler, ApiMethod, ApiResponseFuture, Permission,
> - Router, RpcEnvironment, RpcEnvironmentType, SubdirMap,
> + Record, Router, RpcEnvironment, RpcEnvironmentType, SubdirMap,
> };
> use proxmox_rrd_api_types::{RrdMode, RrdTimeframe};
> use proxmox_schema::*;
> @@ -39,15 +39,16 @@ use pxar::EntryKind;
>
> use pbs_api_types::{
> print_ns_and_snapshot, print_store_and_ns, ArchiveType, Authid, BackupArchiveName,
> - BackupContent, BackupGroupDeleteStats, BackupNamespace, BackupType, Counts, CryptMode,
> - DataStoreConfig, DataStoreListItem, DataStoreMountStatus, DataStoreStatus,
> - GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, KeepOptions, MaintenanceMode,
> - MaintenanceType, Operation, PruneJobOptions, SnapshotListItem, SyncJobConfig,
> - BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA,
> - BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA,
> - IGNORE_VERIFIED_BACKUPS_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA, PRIV_DATASTORE_AUDIT,
> - PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ,
> - PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, UPID, UPID_SCHEMA, VERIFICATION_OUTDATED_AFTER_SCHEMA,
> + BackupContent, BackupGroupDeleteStats, BackupNamespace, BackupType, ContentListItem,
> + ContentType, Counts, CryptMode, DataStoreConfig, DataStoreListItem, DataStoreMountStatus,
> + DataStoreStatus, GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, KeepOptions,
> + MaintenanceMode, MaintenanceType, NamespaceListItem, Operation, PruneJobOptions,
> + SnapshotListItem, SyncJobConfig, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA,
> + BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, CATALOG_NAME,
> + CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA, IGNORE_VERIFIED_BACKUPS_SCHEMA, MAX_NAMESPACE_DEPTH,
> + NS_MAX_DEPTH_SCHEMA, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY,
> + PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, UPID,
> + UPID_SCHEMA, VERIFICATION_OUTDATED_AFTER_SCHEMA,
And more reasons why [this][1] *really* needs at least *partial*
stabilization so we can switch to `Item` level imports. 🙄
[1] https://github.com/rust-lang/rustfmt/issues/4991#issuecomment-3369792412
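For reference, the relevant (still nightly-only) rustfmt option would
look like this in rustfmt.toml, once it is stable enough to rely on:

```toml
# Nightly-only as of this writing; see the rustfmt issue linked above.
# "Item" splits each imported item into its own `use` statement, so
# adding/removing one import touches exactly one line in a diff.
imports_granularity = "Item"
```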
> };
> use pbs_client::pxar::{create_tar, create_zip};
> use pbs_config::CachedUserInfo;
> @@ -70,7 +71,10 @@ use proxmox_rest_server::{formatter, worker_is_active, WorkerTask};
>
> use crate::api2::backup::optional_ns_param;
> use crate::api2::node::rrd::create_value_from_rrd;
> -use crate::backup::{check_ns_privs_full, ListAccessibleBackupGroups, VerifyWorker, NS_PRIVS_OK};
> +use crate::backup::{
> + can_access_any_namespace_in_range, check_ns_privs, check_ns_privs_full,
> + ListAccessibleBackupGroups, VerifyWorker, NS_PRIVS_OK,
> +};
> use crate::server::jobstate::{compute_schedule_status, Job, JobState};
> use crate::tools::{backup_info_to_snapshot_list_item, get_all_snapshot_files, read_backup_index};
>
> @@ -396,7 +400,7 @@ pub async fn delete_snapshot(
> }
>
> #[api(
> - serializing: true,
> + stream: true,
> input: {
> properties: {
> store: { schema: DATASTORE_SCHEMA },
> @@ -404,40 +408,137 @@ pub async fn delete_snapshot(
> type: BackupNamespace,
> optional: true,
> },
> - "backup-type": {
> + "max-depth": {
> + schema: NS_MAX_DEPTH_SCHEMA,
> optional: true,
> - type: BackupType,
> },
> - "backup-id": {
> + "content-type": {
> optional: true,
> - schema: BACKUP_ID_SCHEMA,
> + type: ContentType,
> },
> },
> },
> - returns: pbs_api_types::ADMIN_DATASTORE_LIST_SNAPSHOTS_RETURN_TYPE,
> access: {
> permission: &Permission::Anybody,
> description: "Requires on /datastore/{store}[/{namespace}] either DATASTORE_AUDIT for any \
> or DATASTORE_BACKUP and being the owner of the group",
> },
> )]
> -/// List backup snapshots.
> -pub async fn list_snapshots(
> +/// List datastore content, recursively through all namespaces.
> +pub async fn list_content(
> store: String,
> ns: Option<BackupNamespace>,
> - backup_type: Option<BackupType>,
> - backup_id: Option<String>,
> + max_depth: Option<usize>,
> + content_type: Option<ContentType>,
> _param: Value,
> _info: &ApiMethod,
> rpcenv: &mut dyn RpcEnvironment,
> -) -> Result<Vec<SnapshotListItem>, Error> {
> +) -> Result<proxmox_router::Stream, Error> {
> + let (sender, mut receiver) = tokio::sync::mpsc::channel(128);
> +
> let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> + let user_info = CachedUserInfo::new()?;
>
> - tokio::task::spawn_blocking(move || unsafe {
> - list_snapshots_blocking(store, ns, backup_type, backup_id, auth_id)
> - })
> - .await
> - .map_err(|err| format_err!("failed to await blocking task: {err}"))?
> + let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
> + if !can_access_any_namespace_in_range(
> + datastore.clone(),
> + &auth_id,
> + &user_info,
> + ns.clone(),
> + max_depth,
> + ) {
> + proxmox_router::http_bail!(FORBIDDEN, "permission check failed");
> + }
> +
> + let ns = ns.unwrap_or_default();
> +
> + let (list_ns, list_group, list_snapshots) = match content_type {
> + Some(ContentType::Namespace) => (true, false, false),
> + Some(ContentType::Group) => (false, true, false),
> + Some(ContentType::Snapshot) => (false, false, true),
> + None => (true, true, true),
> + };
> +
> + tokio::spawn(async move {
As Thomas already pointed out, you don't need to wrap the
`spawn_blocking` future in what amounts to an `async { fut.await }`.
But also, see the [tokio docs][2] on `JoinHandle`:

    A JoinHandle detaches the associated task when it is dropped

so you can skip the entire line above.

[2]: https://docs.rs/tokio/latest/tokio/task/struct.JoinHandle.html
> + tokio::task::spawn_blocking(move || {
> + if list_ns {
> + for ns in datastore.recursive_iter_backup_ns_ok(ns.clone(), max_depth)? {
> + match check_ns_privs(&store, &ns, &auth_id, NS_PRIVS_OK) {
> + Ok(_) => sender.blocking_send(Record::new(ContentListItem::from(
> + NamespaceListItem { ns, comment: None },
> + )))?,
> + Err(_) => continue,
Note that in the current streaming architecture the records can also be
error values, so you may want to consider passing errors through.
There are various helpers: `Record::error` (for std errors),
`Record::error_msg`, or `Record::error_value` if you need structured
errors.
> + }
> + }
> + }
> +
> + if !list_group && !list_snapshots {
> + return Ok(());
> + }
> +
> + for ns in datastore.recursive_iter_backup_ns_ok(ns, max_depth)? {
> + let list_all = match check_ns_privs_full(
> + &store,
> + &ns,
> + &auth_id,
> + PRIV_DATASTORE_AUDIT,
> + PRIV_DATASTORE_BACKUP,
> + ) {
> + Ok(requires_owner) => !requires_owner,
> + Err(_) => continue,
> + };
> + if list_group {
> + for group in datastore.iter_backup_groups(ns.clone())? {
> + let group = group?;
^ The two `?` above may also be cases where it would be better to send
a `Record::error`?
Unless we want those to cancel the entire rest of the stream?
> + let group = backup_group_to_group_list_item(
> + datastore.clone(),
> + group,
> + &ns,
> + &auth_id,
> + list_all,
> + );
> +
> + if let Some(group) = group {
> + sender.blocking_send(Record::new(ContentListItem::from((
> + ns.clone(),
> + group,
> + ))))?;
> + }
> + }
> + }
> +
> + if !list_snapshots {
> + continue;
> + }
> +
> + for group in datastore.iter_backup_groups(ns.clone())? {
> + let group = group?;
^ Same here
> + let owner = match get_group_owner(&store, &ns, &group) {
> + Some(auth_id) => auth_id,
> + None => continue,
> + };
> + for snapshot in group.iter_snapshots()? {
> + let snapshot = BackupInfo::new(snapshot?)?;
> + let snapshot = backup_info_to_snapshot_list_item(&snapshot, &owner);
> + sender.blocking_send(Record::new(ContentListItem::from((
> + ns.clone(),
> + snapshot,
> + ))))?;
> + }
> + }
> + }
> + Ok::<_, Error>(())
> + })
> + .await??;
> +
> + Ok::<_, Error>(())
> + });
> + Ok(async_stream::try_stream! {
> + while let Some(elem) = receiver.recv().await {
> + yield elem;
> + }
> + }
> + .into())
You should be able to return `ReceiverStream::new(receiver).into()`
instead of this entire manual loop. ;-)
> }
>
> fn get_group_owner(
> @@ -523,6 +624,51 @@ unsafe fn list_snapshots_blocking(
> })
> }
>
> +#[api(
> + serializing: true,
> + input: {
> + properties: {
> + store: { schema: DATASTORE_SCHEMA },
> + ns: {
> + type: BackupNamespace,
> + optional: true,
> + },
> + "backup-type": {
> + optional: true,
> + type: BackupType,
> + },
> + "backup-id": {
> + optional: true,
> + schema: BACKUP_ID_SCHEMA,
> + },
> + },
> + },
> + returns: pbs_api_types::ADMIN_DATASTORE_LIST_SNAPSHOTS_RETURN_TYPE,
> + access: {
> + permission: &Permission::Anybody,
> + description: "Requires on /datastore/{store}[/{namespace}] either DATASTORE_AUDIT for any \
> + or DATASTORE_BACKUP and being the owner of the group",
> + },
> +)]
> +/// List backup snapshots.
> +pub async fn list_snapshots(
> + store: String,
> + ns: Option<BackupNamespace>,
> + backup_type: Option<BackupType>,
> + backup_id: Option<String>,
> + _param: Value,
> + _info: &ApiMethod,
> + rpcenv: &mut dyn RpcEnvironment,
> +) -> Result<Vec<SnapshotListItem>, Error> {
> + let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> +
> + tokio::task::spawn_blocking(move || unsafe {
> + list_snapshots_blocking(store, ns, backup_type, backup_id, auth_id)
> + })
> + .await
> + .map_err(|err| format_err!("failed to await blocking task: {err}"))?
> +}
> +
> async fn get_snapshots_count(
> store: &Arc<DataStore>,
> owner: Option<&Authid>,
> @@ -2773,6 +2919,7 @@ const DATASTORE_INFO_SUBDIRS: SubdirMap = &[
> "change-owner",
> &Router::new().post(&API_METHOD_SET_BACKUP_OWNER),
> ),
> + ("content", &Router::new().get(&API_METHOD_LIST_CONTENT)),
> (
> "download",
> &Router::new().download(&API_METHOD_DOWNLOAD_FILE),
> --
> 2.47.3
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel