public inbox for pve-devel@lists.proxmox.com
* [pve-devel] [RFC PATCH manager] WIP: api: implement node-independent bulk actions
@ 2025-03-18 10:39 Dominik Csapak
  2025-03-18 11:30 ` Stefan Hanreich
  0 siblings, 1 reply; 4+ messages in thread
From: Dominik Csapak @ 2025-03-18 10:39 UTC (permalink / raw)
  To: pve-devel

To achieve this, we start a worker task and use our generic api client
to start the tasks on the relevant nodes. The client always points
to 'localhost' so we let the pveproxy worry about the proxying etc.

We reuse some logic from the startall/stopall/etc. calls, like getting
the ordered guest info list.

Not yet implemented are:
* filters
* failure mode resolution (we could implement this later too)
* token handling (not sure if we need this at all if we check the
  permissions correctly upfront?)
* suspend
* some call specific parameters

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
this is a pre-requisite for having bulk actions on PDM.
Since we normally don't do such things 'cluster-wide' I wanted to
send the patch early to get feedback on my design decisions like:
* using the api client this way
* api naming/location
* handling parallel requests
* etc.

There are alternative methods to achieve similar results:
* use some kind of queuing system on the cluster (e.g. via pmxcfs)
* using the 'startall'/'stopall' calls from pve in PDM
* surely some other thing I didn't think about

We can of course start with this, and change the underlying mechanism
later too.

If we go this route, I could also rewrite the code in rust if wanted,
since there is nothing particularly dependent on perl here
(besides getting the vmlist, but that could stay in perl).
The bulk of the logic is how to start tasks + handle them finishing +
handling filter + concurrency.
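
For illustration, with this patch applied a cluster-wide bulk start could be
triggered like this (hypothetical invocation; endpoint path and parameters
as defined in the schema below):

  pvesh create /cluster/bulk-actions/start --vms 100,101 --max-workers 4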

 PVE/API2/Cluster.pm       |   7 +
 PVE/API2/Cluster/Bulk.pm  | 475 ++++++++++++++++++++++++++++++++++++++
 PVE/API2/Cluster/Makefile |   1 +
 PVE/API2/Nodes.pm         |  24 +-
 4 files changed, 496 insertions(+), 11 deletions(-)
 create mode 100644 PVE/API2/Cluster/Bulk.pm

diff --git a/PVE/API2/Cluster.pm b/PVE/API2/Cluster.pm
index a0e5c11b..478610e6 100644
--- a/PVE/API2/Cluster.pm
+++ b/PVE/API2/Cluster.pm
@@ -25,6 +25,7 @@ use PVE::API2::ACMEAccount;
 use PVE::API2::ACMEPlugin;
 use PVE::API2::Backup;
 use PVE::API2::Cluster::BackupInfo;
+use PVE::API2::Cluster::Bulk;
 use PVE::API2::Cluster::Ceph;
 use PVE::API2::Cluster::Mapping;
 use PVE::API2::Cluster::Jobs;
@@ -103,6 +104,11 @@ __PACKAGE__->register_method ({
     path => 'mapping',
 });
 
+__PACKAGE__->register_method ({
+    subclass => "PVE::API2::Cluster::Bulk",
+    path => 'bulk-actions',
+});
+
 if ($have_sdn) {
     __PACKAGE__->register_method ({
        subclass => "PVE::API2::Network::SDN",
@@ -162,6 +168,7 @@ __PACKAGE__->register_method ({
 	    { name => 'resources' },
 	    { name => 'status' },
 	    { name => 'tasks' },
+	    { name => 'bulk-actions' },
 	];
 
 	if ($have_sdn) {
diff --git a/PVE/API2/Cluster/Bulk.pm b/PVE/API2/Cluster/Bulk.pm
new file mode 100644
index 00000000..05a79155
--- /dev/null
+++ b/PVE/API2/Cluster/Bulk.pm
@@ -0,0 +1,475 @@
+package PVE::API2::Cluster::Bulk;
+
+use strict;
+use warnings;
+
+use PVE::APIClient::LWP;
+use PVE::AccessControl;
+use PVE::Cluster;
+use PVE::Exception qw(raise raise_perm_exc raise_param_exc);
+use PVE::JSONSchema qw(get_standard_option);
+use PVE::RESTHandler;
+use PVE::RPCEnvironment;
+use PVE::Tools qw();
+
+use PVE::API2::Nodes;
+
+use base qw(PVE::RESTHandler);
+
+
+__PACKAGE__->register_method ({
+    name => 'index',
+    path => '',
+    method => 'GET',
+    description => "Bulk action index.",
+    permissions => { user => 'all' },
+    parameters => {
+	additionalProperties => 0,
+	properties => {},
+    },
+    returns => {
+	type => 'array',
+	items => {
+	    type => "object",
+	    properties => {},
+	},
+	links => [ { rel => 'child', href => "{name}" } ],
+    },
+    code => sub {
+	my ($param) = @_;
+
+	return [
+	    { name => 'start' },
+	    { name => 'shutdown' },
+	    { name => 'migrate' },
+	];
+    }});
+
+sub create_client {
+    my ($authuser, $request_timeout) = @_;
+    my ($user, undef) = PVE::AccessControl::split_tokenid($authuser, 1);
+
+    # TODO: How to handle Tokens?
+    my $ticket = PVE::AccessControl::assemble_ticket($user || $authuser);
+    my $csrf_token = PVE::AccessControl::assemble_csrf_prevention_token($user || $authuser);
+
+    my $node = PVE::INotify::nodename();
+    my $fingerprint = PVE::Cluster::get_node_fingerprint($node);
+
+    my $conn_args = {
+	protocol => 'https',
+	host => 'localhost', # always call the api locally, let pveproxy handle the proxying
+	port => 8006,
+	ticket => $ticket,
+	timeout => $request_timeout // 25, # default slightly shorter than the proxy->daemon timeout
+	cached_fingerprints => {
+	    $fingerprint => 1,
+	}
+    };
+
+    my $api_client = PVE::APIClient::LWP->new($conn_args->%*);
+    if (defined($csrf_token)) {
+	$api_client->update_csrftoken($csrf_token);
+    }
+
+    return $api_client;
+}
+
+# takes a vm list in the form of
+# {
+#     0 => {
+#         100 => { .. guest info ..},
+#         101 => { .. guest info ..},
+#     },
+#     1 => {
+#         102 => { .. guest info ..},
+#         103 => { .. guest info ..},
+#     },
+# }
+#
+# max_workers: how many parallel tasks should be started.
+# start_task: a sub that returns either a UPID or 1 (undef means failure)
+# check_task: if start_task returned a upid, will wait for that to finish and
+#    call check_task with the resulting task status
+sub foreach_guest {
+    my ($startlist, $max_workers, $start_task, $check_task) = @_;
+
+    my $rpcenv = PVE::RPCEnvironment::get();
+    my $authuser = $rpcenv->get_user();
+    my $api_client = create_client($authuser);
+
+    my $failed = [];
+    for my $order (sort {$a <=> $b} keys $startlist->%*) {
+	my $vmlist = $startlist->{$order};
+	my $workers = {};
+
+	for my $vmid (sort {$a <=> $b} keys $vmlist->%*) {
+
+	    # wait until at least one slot is free
+	    while (scalar(keys($workers->%*)) >= $max_workers) {
+		for my $upid (keys($workers->%*)) {
+		    my $worker = $workers->{$upid};
+		    my $node = $worker->{guest}->{node};
+
+		    my $task = $api_client->get("/nodes/$node/tasks/$upid/status");
+		    if ($task->{status} ne 'running') {
+			my $is_error = PVE::Tools::upid_status_is_error($task->{exitstatus});
+			push $failed->@*, $worker->{vmid} if $is_error;
+
+			$check_task->($api_client, $worker->{vmid}, $worker->{guest}, $is_error, $task);
+
+			delete $workers->{$upid};
+		    }
+		}
+		sleep(1); # How much?
+	    }
+
+	    my $guest = $vmlist->{$vmid};
+	    my $upid = eval { $start_task->($api_client, $vmid, $guest) };
+	    warn $@ if $@;
+
+	    # success but no task necessary
+	    next if defined($upid) && "$upid" eq "1";
+
+	    if (!defined($upid)) {
+		push $failed->@*, $vmid;
+		next;
+	    }
+
+	    $workers->{$upid} = {
+		vmid => $vmid,
+		guest => $guest,
+	    };
+	}
+
+	# wait until current order is finished
+	for my $upid (keys($workers->%*)) {
+	    my $worker = $workers->{$upid};
+	    my $node = $worker->{guest}->{node};
+
+	    my $task = wait_for_task_finished($api_client, $node, $upid);
+	    my $is_error = PVE::Tools::upid_status_is_error($task->{exitstatus});
+	    push $failed->@*, $worker->{vmid} if $is_error;
+
+	    $check_task->($api_client, $worker->{vmid}, $worker->{guest}, $is_error, $task);
+
+	    delete $workers->{$upid};
+	}
+    }
+
+    return $failed;
+}
+
+sub get_type_text {
+    my ($type) = @_;
+
+    if ($type eq 'lxc') {
+	return 'CT';
+    } elsif ($type eq 'qemu') {
+	return 'VM';
+    } else {
+	die "unknown guest type $type\n";
+    }
+}
+
+sub wait_for_task_finished {
+    my ($client, $node, $upid) = @_;
+
+    while (1) {
+	my $task = $client->get("/nodes/$node/tasks/$upid/status");
+	return $task if $task->{status} ne 'running';
+	sleep(1); # How much time?
+    }
+}
+
+sub check_guest_permissions {
+    my ($rpcenv, $authuser, $vmlist, $priv_list) = @_;
+
+    my @vms = PVE::Tools::split_list($vmlist);
+    if (scalar(@vms) > 0) {
+	$rpcenv->check($authuser, "/vms/$_", $priv_list) for @vms;
+    } elsif (!$rpcenv->check($authuser, "/", $priv_list, 1)) {
+	raise_perm_exc("/, " . join(', ', $priv_list->@*));
+    }
+}
+
+__PACKAGE__->register_method({
+    name => 'start',
+    path => 'start',
+    method => 'POST',
+    description => "Bulk start all guests on the cluster.",
+    permissions => { user => 'all' },
+    protected => 1,
+    parameters => {
+	additionalProperties => 0,
+	properties => {
+	    vms => {
+		description => "Only consider guests from this comma separated list of VMIDs.",
+		type => 'string',  format => 'pve-vmid-list',
+		optional => 1,
+	    },
+	    'max-workers' => {
+		description => "How many parallel tasks at maximum should be started.",
+		optional => 1,
+		default => 1,
+		type => 'integer',
+	    },
+	    # TODO:
+	    # Failure resolution mode (fail, warn, retry?)
+	    # mode-limits (offline only, suspend only, ?)
+	    # filter (tags, name, ?)
+	},
+    },
+    returns => {
+	type => 'string',
+	description => "UPID of the worker",
+    },
+    code => sub {
+	my ($param) = @_;
+
+	my $rpcenv = PVE::RPCEnvironment::get();
+	my $authuser = $rpcenv->get_user();
+
+	check_guest_permissions($rpcenv, $authuser, $param->{vms}, [ 'VM.PowerMgmt' ]);
+
+	my $code = sub {
+	    my $startlist = PVE::API2::Nodes::Nodeinfo::get_start_stop_list(undef, undef, $param->{vms});
+
+	    my $start_task = sub {
+		my ($api_client, $vmid, $guest) = @_;
+		my $node = $guest->{node};
+
+		my $type = $guest->{type};
+		my $type_text = get_type_text($type);
+
+		my $url = "/nodes/$node/$type/$vmid/status/start";
+		print STDERR "Starting $type_text $vmid\n";
+		return $api_client->post($url);
+	    };
+
+	    my $check_task = sub {
+		my ($api_client, $vmid, $guest, $is_error, $task) = @_;
+		my $node = $guest->{node};
+
+		my $default_delay = 0;
+
+		if (!$is_error) {
+		    my $delay = defined($guest->{up}) ? int($guest->{up}) : $default_delay;
+		    if ($delay > 0) {
+			print STDERR "Waiting for $delay seconds (startup delay)\n" if $guest->{up};
+			for (my $i = 0; $i < $delay; $i++) {
+			    sleep(1);
+			}
+		    }
+		} else {
+		    my $type_text = get_type_text($guest->{type});
+		    print STDERR "Starting $type_text $vmid failed: $task->{exitstatus}\n";
+		}
+	    };
+
+	    my $max_workers = $param->{'max-workers'} // 1;
+	    my $failed = foreach_guest($startlist, $max_workers, $start_task, $check_task);
+
+	    if (scalar($failed->@*)) {
+		die "Some guests failed to start: " . join(', ', $failed->@*) . "\n";
+	    }
+	};
+
+	return $rpcenv->fork_worker('bulkstart', undef, $authuser, $code);
+    }});
+
+__PACKAGE__->register_method({
+    name => 'shutdown',
+    path => 'shutdown',
+    method => 'POST',
+    description => "Bulk shutdown all guests on the cluster.",
+    permissions => { user => 'all' },
+    protected => 1,
+    parameters => {
+	additionalProperties => 0,
+	properties => {
+	    vms => {
+		description => "Only consider guests from this comma separated list of VMIDs.",
+		type => 'string',  format => 'pve-vmid-list',
+		optional => 1,
+	    },
+	    timeout => {
+		description => "Default shutdown timeout in seconds if none is configured for the guest.",
+		type => 'integer',
+		default => 180,
+		optional => 1,
+	    },
+	    'max-workers' => {
+		description => "How many parallel tasks at maximum should be started.",
+		optional => 1,
+		default => 1,
+		type => 'integer',
+	    },
+	    # TODO:
+	    # Failure resolution mode (fail, warn, retry?)
+	    # mode-limits (offline only, suspend only, ?)
+	    # filter (tags, name, ?)
+	},
+    },
+    returns => {
+	type => 'string',
+	description => "UPID of the worker",
+    },
+    code => sub {
+	my ($param) = @_;
+
+	my $rpcenv = PVE::RPCEnvironment::get();
+	my $authuser = $rpcenv->get_user();
+
+	check_guest_permissions($rpcenv, $authuser, $param->{vms}, [ 'VM.PowerMgmt' ]);
+
+	my $code = sub {
+	    my $startlist = PVE::API2::Nodes::Nodeinfo::get_start_stop_list(undef, undef, $param->{vms});
+
+	    # reverse order for shutdown
+	    for my $order (keys $startlist->%*) {
+		my $list = delete $startlist->{$order};
+		$order = $order * -1;
+		$startlist->{$order} = $list;
+	    }
+
+	    my $start_task = sub {
+		my ($api_client, $vmid, $guest) = @_;
+		my $node = $guest->{node};
+
+		my $type = $guest->{type};
+		my $type_text = get_type_text($type);
+
+		my $timeout = int($guest->{down} // $param->{timeout} // 180);
+
+		my $params = {
+		    forceStop => 1,
+		    timeout => $timeout,
+		};
+
+		my $url = "/nodes/$node/$type/$vmid/status/shutdown";
+		print STDERR "Shutting down $type_text $vmid (Timeout = $timeout seconds)\n";
+		return $api_client->post($url, $params);
+	    };
+
+	    my $check_task = sub {
+		my ($api_client, $vmid, $guest, $is_error, $task) = @_;
+		my $node = $guest->{node};
+		if ($is_error) {
+		    my $type_text = get_type_text($guest->{type});
+		    print STDERR "Stopping $type_text $vmid failed: $task->{exitstatus}\n";
+		}
+	    };
+
+	    my $max_workers = $param->{'max-workers'} // 1;
+	    my $failed = foreach_guest($startlist, $max_workers, $start_task, $check_task);
+
+	    if (scalar($failed->@*)) {
+		die "Some guests failed to shut down: " . join(', ', $failed->@*) . "\n";
+	    }
+	};
+
+	return $rpcenv->fork_worker('bulkshutdown', undef, $authuser, $code);
+    }});
+
+__PACKAGE__->register_method({
+    name => 'migrate',
+    path => 'migrate',
+    method => 'POST',
+    description => "Bulk migrate all guests on the cluster.",
+    permissions => { user => 'all' },
+    protected => 1,
+    parameters => {
+	additionalProperties => 0,
+	properties => {
+	    vms => {
+		description => "Only consider guests from this comma separated list of VMIDs.",
+		type => 'string',  format => 'pve-vmid-list',
+		optional => 1,
+	    },
+	    'target-node' => get_standard_option('pve-node', { description => "Target node." }),
+	    online => {
+		type => 'boolean',
+		description => "Enable live migration for VMs and restart migration for CTs.",
+		optional => 1,
+	    },
+	    "with-local-disks" => {
+		type => 'boolean',
+		description => "Enable live storage migration for local disk",
+		optional => 1,
+	    },
+	    'max-workers' => {
+		description => "How many parallel tasks at maximum should be started.",
+		optional => 1,
+		default => 1,
+		type => 'integer',
+	    },
+	    # TODO:
+	    # Failure resolution mode (fail, warn, retry?)
+	    # mode-limits (offline only, suspend only, ?)
+	    # filter (tags, name, ?)
+	},
+    },
+    returns => {
+	type => 'string',
+	description => "UPID of the worker",
+    },
+    code => sub {
+	my ($param) = @_;
+
+	my $rpcenv = PVE::RPCEnvironment::get();
+	my $authuser = $rpcenv->get_user();
+
+	check_guest_permissions($rpcenv, $authuser, $param->{vms}, [ 'VM.Migrate' ]);
+
+	my $code = sub {
+	    my $list = PVE::API2::Nodes::Nodeinfo::get_filtered_vmlist(undef, $param->{vms}, 1, 1);
+
+	    my $start_task = sub {
+		my ($api_client, $vmid, $guest) = @_;
+		my $node = $guest->{node};
+
+		my $type = $guest->{type};
+		my $type_text = get_type_text($type);
+
+		if ($node eq $param->{'target-node'}) {
+		    print STDERR "$type_text $vmid already on $node, skipping.\n";
+		    return 1;
+		}
+
+		my $params = {
+		    target => $param->{'target-node'},
+		};
+
+		if ($type eq 'lxc') {
+		    $params->{restart} = $param->{online} if defined($param->{online});
+		} elsif ($type eq 'qemu') {
+		    $params->{online} = $param->{online} if defined($param->{online});
+		    $params->{'with-local-disks'} = $param->{'with-local-disks'} if defined($param->{'with-local-disks'});
+		}
+
+		my $url = "/nodes/$node/$type/$vmid/migrate";
+		print STDERR "Migrating $type_text $vmid\n";
+		return $api_client->post($url, $params);
+	    };
+
+	    my $check_task = sub {
+		my ($api_client, $vmid, $guest, $is_error, $task) = @_;
+		if ($is_error) {
+		    my $type_text = get_type_text($guest->{type});
+		    print STDERR "Migrating $type_text $vmid failed: $task->{exitstatus}\n";
+		}
+	    };
+
+	    my $max_workers = $param->{'max-workers'} // 1;
+	    my $failed = foreach_guest({ '0' => $list }, $max_workers, $start_task, $check_task);
+
+	    if (scalar($failed->@*)) {
+		die "Some guests failed to migrate: " . join(', ', $failed->@*) . "\n";
+	    }
+	};
+
+	return $rpcenv->fork_worker('bulkmigrate', undef, $authuser, $code);
+    }});
+
+1;
diff --git a/PVE/API2/Cluster/Makefile b/PVE/API2/Cluster/Makefile
index b109e5cb..ed02b4be 100644
--- a/PVE/API2/Cluster/Makefile
+++ b/PVE/API2/Cluster/Makefile
@@ -6,6 +6,7 @@ SUBDIRS=Mapping
 # ensure we do not conflict with files shipped by pve-cluster!!
 PERLSOURCE= 			\
 	BackupInfo.pm		\
+	Bulk.pm			\
 	MetricServer.pm		\
 	Mapping.pm		\
 	Notifications.pm		\
diff --git a/PVE/API2/Nodes.pm b/PVE/API2/Nodes.pm
index 9cdf19db..02bc7299 100644
--- a/PVE/API2/Nodes.pm
+++ b/PVE/API2/Nodes.pm
@@ -1829,7 +1829,7 @@ __PACKAGE__->register_method({
 # * vmid whitelist
 # * guest is a template (default: skip)
 # * guest is HA managed (default: skip)
-my $get_filtered_vmlist = sub {
+sub get_filtered_vmlist {
     my ($nodename, $vmfilter, $templates, $ha_managed) = @_;
 
     my $vmlist = PVE::Cluster::get_vmlist();
@@ -1856,13 +1856,14 @@ my $get_filtered_vmlist = sub {
 		die "unknown virtual guest type '$d->{type}'\n";
 	    }
 
-	    my $conf = $class->load_config($vmid);
+	    my $conf = $class->load_config($vmid, $d->{node});
 	    return if !$templates && $class->is_template($conf);
 	    return if !$ha_managed && PVE::HA::Config::vm_is_ha_managed($vmid);
 
 	    $res->{$vmid}->{conf} = $conf;
 	    $res->{$vmid}->{type} = $d->{type};
 	    $res->{$vmid}->{class} = $class;
+	    $res->{$vmid}->{node} = $d->{node};
 	};
 	warn $@ if $@;
     }
@@ -1871,13 +1872,13 @@ my $get_filtered_vmlist = sub {
 };
 
 # return all VMs which should get started/stopped on power up/down
-my $get_start_stop_list = sub {
+sub get_start_stop_list {
     my ($nodename, $autostart, $vmfilter) = @_;
 
     # do not skip HA vms on force or if a specific VMID set is wanted
     my $include_ha_managed = defined($vmfilter) ? 1 : 0;
 
-    my $vmlist = $get_filtered_vmlist->($nodename, $vmfilter, undef, $include_ha_managed);
+    my $vmlist = get_filtered_vmlist($nodename, $vmfilter, undef, $include_ha_managed);
 
     my $resList = {};
     foreach my $vmid (keys %$vmlist) {
@@ -1889,15 +1890,16 @@ my $get_start_stop_list = sub {
 
 	$resList->{$order}->{$vmid} = $startup;
 	$resList->{$order}->{$vmid}->{type} = $vmlist->{$vmid}->{type};
+	$resList->{$order}->{$vmid}->{node} = $vmlist->{$vmid}->{node};
     }
 
     return $resList;
-};
+}
 
 my $remove_locks_on_startup = sub {
     my ($nodename) = @_;
 
-    my $vmlist = &$get_filtered_vmlist($nodename, undef, undef, 1);
+    my $vmlist = get_filtered_vmlist($nodename, undef, undef, 1);
 
     foreach my $vmid (keys %$vmlist) {
 	my $conf = $vmlist->{$vmid}->{conf};
@@ -1983,7 +1985,7 @@ __PACKAGE__->register_method ({
 	    warn $@ if $@;
 
 	    my $autostart = $force ? undef : 1;
-	    my $startList = $get_start_stop_list->($nodename, $autostart, $param->{vms});
+	    my $startList = get_start_stop_list($nodename, $autostart, $param->{vms});
 
 	    # Note: use numeric sorting with <=>
 	    for my $order (sort {$a <=> $b} keys %$startList) {
@@ -2096,7 +2098,7 @@ __PACKAGE__->register_method ({
 		minimum => 0,
 		maximum => 2 * 3600, # mostly arbitrary, but we do not want too high timeouts
 	    },
-	},
+	}
     },
     returns => {
 	type => 'string',
@@ -2123,7 +2125,7 @@ __PACKAGE__->register_method ({
 
 	    $rpcenv->{type} = 'priv'; # to start tasks in background
 
-	    my $stopList = $get_start_stop_list->($nodename, undef, $param->{vms});
+	    my $stopList = get_start_stop_list($nodename, undef, $param->{vms});
 
 	    my $cpuinfo = PVE::ProcFSTools::read_cpuinfo();
 	    my $datacenterconfig = cfs_read_file('datacenter.cfg');
@@ -2246,7 +2248,7 @@ __PACKAGE__->register_method ({
 
 	    $rpcenv->{type} = 'priv'; # to start tasks in background
 
-	    my $toSuspendList = $get_start_stop_list->($nodename, undef, $param->{vms});
+	    my $toSuspendList = get_start_stop_list($nodename, undef, $param->{vms});
 
 	    my $cpuinfo = PVE::ProcFSTools::read_cpuinfo();
 	    my $datacenterconfig = cfs_read_file('datacenter.cfg');
@@ -2434,7 +2436,7 @@ __PACKAGE__->register_method ({
 	my $code = sub {
 	    $rpcenv->{type} = 'priv'; # to start tasks in background
 
-	    my $vmlist = &$get_filtered_vmlist($nodename, $param->{vms}, 1, 1);
+	    my $vmlist = get_filtered_vmlist($nodename, $param->{vms}, 1, 1);
 	    if (!scalar(keys %$vmlist)) {
 		warn "no virtual guests matched, nothing to do..\n";
 		return;
-- 
2.39.5




* Re: [pve-devel] [RFC PATCH manager] WIP: api: implement node-independent bulk actions
  2025-03-18 10:39 [pve-devel] [RFC PATCH manager] WIP: api: implement node-independent bulk actions Dominik Csapak
@ 2025-03-18 11:30 ` Stefan Hanreich
  2025-03-19  9:04   ` Dominik Csapak
  0 siblings, 1 reply; 4+ messages in thread
From: Stefan Hanreich @ 2025-03-18 11:30 UTC (permalink / raw)
  To: Proxmox VE development discussion, Dominik Csapak

This would be really interesting for applying the SDN configuration as
well, where I'm currently calling the existing bulk-call. Really like
the design with the start / check callbacks, that should make this quite
flexible.

On 3/18/25 11:39, Dominik Csapak wrote:
> To achieve this, we start a worker task and use our generic api client
> to start the tasks on the relevant nodes. The client always points
> to 'localhost' so we let the pveproxy worry about the proxying etc.
> 
> We reuse some logic from the startall/stopall/etc. calls, like getting
> the ordered guest info list.
> 
> Not yet implemented are:
> * filters
> * failure mode resolution (we could implement this later too)
> * token handling (not sure if we need this at all if we check the
>   permissions correctly upfront?)
> * suspend
> * some call specific parameters
> 
> Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
> ---
> this is a pre-requisite for having bulk actions on PDM.
> Since we normally don't do such things 'cluster-wide' I wanted to
> send the patch early to get feedback on my design decisions like:
> * using the api client this way
> * api naming/location
> * handling parallel requests
> * etc.
> 
> There are alternative methods to achieve similar results:
> * use some kind of queuing system on the cluster (e.g. via pmxcfs)
> * using the 'startall'/'stopall' calls from pve in PDM
> * surely some other thing I didn't think about
> 
> We can of course start with this, and change the underlying mechanism
> later too.
> 
> If we go this route, I could also rewrite the code in rust if wanted,
> since there is nothing particularly dependent on perl here
> (besides getting the vmlist, but that could stay in perl).
> The bulk of the logic is how to start tasks + handle them finishing +
> handling filter + concurrency.

I'm actually reading the VM list in the firewall via this:
https://git.proxmox.com/?p=proxmox-ve-rs.git;a=blob;f=proxmox-ve-config/src/guest/mod.rs;h=74fd8abc000aec0fa61898840d44ab8a4cd9018b;hb=HEAD#l69

So we could build upon that if we want to implement it in Rust?

I have something similar, *very* basic, implemented for running multiple
tasks across clusters in my SDN patch series - so maybe we could
repurpose that for a possible implementation, even generalize it?

>  PVE/API2/Cluster.pm       |   7 +
>  PVE/API2/Cluster/Bulk.pm  | 475 ++++++++++++++++++++++++++++++++++++++
>  PVE/API2/Cluster/Makefile |   1 +
>  PVE/API2/Nodes.pm         |  24 +-
>  4 files changed, 496 insertions(+), 11 deletions(-)
>  create mode 100644 PVE/API2/Cluster/Bulk.pm
> 
> diff --git a/PVE/API2/Cluster.pm b/PVE/API2/Cluster.pm
> index a0e5c11b..478610e6 100644
> --- a/PVE/API2/Cluster.pm
> +++ b/PVE/API2/Cluster.pm
> @@ -25,6 +25,7 @@ use PVE::API2::ACMEAccount;
>  use PVE::API2::ACMEPlugin;
>  use PVE::API2::Backup;
>  use PVE::API2::Cluster::BackupInfo;
> +use PVE::API2::Cluster::Bulk;
>  use PVE::API2::Cluster::Ceph;
>  use PVE::API2::Cluster::Mapping;
>  use PVE::API2::Cluster::Jobs;
> @@ -103,6 +104,11 @@ __PACKAGE__->register_method ({
>      path => 'mapping',
>  });
>  
> +__PACKAGE__->register_method ({
> +    subclass => "PVE::API2::Cluster::Bulk",
> +    path => 'bulk-actions',
> +});
> +
>  if ($have_sdn) {
>      __PACKAGE__->register_method ({
>         subclass => "PVE::API2::Network::SDN",
> @@ -162,6 +168,7 @@ __PACKAGE__->register_method ({
>  	    { name => 'resources' },
>  	    { name => 'status' },
>  	    { name => 'tasks' },
> +	    { name => 'bulk-actions' },
>  	];
>  
>  	if ($have_sdn) {
> diff --git a/PVE/API2/Cluster/Bulk.pm b/PVE/API2/Cluster/Bulk.pm
> new file mode 100644
> index 00000000..05a79155
> --- /dev/null
> +++ b/PVE/API2/Cluster/Bulk.pm
> @@ -0,0 +1,475 @@
> +package PVE::API2::Cluster::Bulk;

We might wanna think about using sub-paths already, since I can see this
growing quite fast (at least a sub-path for SDN would be interesting). I
don't know how many other potential use-cases there are aside from that.

> +
> +use strict;
> +use warnings;
> +
> +use PVE::APIClient::LWP;
> +use PVE::AccessControl;
> +use PVE::Cluster;
> +use PVE::Exception qw(raise raise_perm_exc raise_param_exc);
> +use PVE::JSONSchema qw(get_standard_option);
> +use PVE::RESTHandler;
> +use PVE::RPCEnvironment;
> +use PVE::Tools qw();
> +
> +use PVE::API2::Nodes;
> +
> +use base qw(PVE::RESTHandler);
> +
> +
> +__PACKAGE__->register_method ({
> +    name => 'index',
> +    path => '',
> +    method => 'GET',
> +    description => "Bulk action index.",
> +    permissions => { user => 'all' },
> +    parameters => {
> +	additionalProperties => 0,
> +	properties => {},
> +    },
> +    returns => {
> +	type => 'array',
> +	items => {
> +	    type => "object",
> +	    properties => {},
> +	},
> +	links => [ { rel => 'child', href => "{name}" } ],
> +    },
> +    code => sub {
> +	my ($param) = @_;
> +
> +	return [
> +	    { name => 'start' },
> +	    { name => 'shutdown' },
> +	    { name => 'migrate' },
> +	];
> +    }});
> +
> +sub create_client {
> +    my ($authuser, $request_timeout) = @_;
> +    my ($user, undef) = PVE::AccessControl::split_tokenid($authuser, 1);
> +
> +    # TODO: How to handle Tokens?
> +    my $ticket = PVE::AccessControl::assemble_ticket($user || $authuser);
> +    my $csrf_token = PVE::AccessControl::assemble_csrf_prevention_token($user || $authuser);
> +
> +    my $node = PVE::INotify::nodename();
> +    my $fingerprint = PVE::Cluster::get_node_fingerprint($node);
> +
> +    my $conn_args = {
> +	protocol => 'https',
> +	host => 'localhost', # always call the api locally, let pveproxy handle the proxying
> +	port => 8006,
> +	ticket => $ticket,
> +	timeout => $request_timeout // 25, # default slightly shorter than the proxy->daemon timeout
> +	cached_fingerprints => {
> +	    $fingerprint => 1,
> +	}
> +    };
> +
> +    my $api_client = PVE::APIClient::LWP->new($conn_args->%*);
> +    if (defined($csrf_token)) {
> +	$api_client->update_csrftoken($csrf_token);
> +    }
> +
> +    return $api_client;
> +}
> +
> +# takes a vm list in the form of
> +# {
> +#     0 => {
> +#         100 => { .. guest info ..},
> +#         101 => { .. guest info ..},
> +#     },
> +#     1 => {
> +#         102 => { .. guest info ..},
> +#         103 => { .. guest info ..},
> +#     },
> +# }
> +#
> +# max_workers: how many parallel tasks should be started.
> +# start_task: a sub that returns either a UPID or 1 (undef means failure)
> +# check_task: if start_task returned a upid, will wait for that to finish and
> +#    call check_task with the resulting task status
> +sub foreach_guest {
> +    my ($startlist, $max_workers, $start_task, $check_task) = @_;
> +
> +    my $rpcenv = PVE::RPCEnvironment::get();
> +    my $authuser = $rpcenv->get_user();
> +    my $api_client = create_client($authuser);
> +
> +    my $failed = [];
> +    for my $order (sort {$a <=> $b} keys $startlist->%*) {
> +	my $vmlist = $startlist->{$order};
> +	my $workers = {};
> +
> +	for my $vmid (sort {$a <=> $b} keys $vmlist->%*) {
> +
> +	    # wait until at least one slot is free
> +	    while (scalar(keys($workers->%*)) >= $max_workers) {
> +		for my $upid (keys($workers->%*)) {
> +		    my $worker = $workers->{$upid};
> +		    my $node = $worker->{guest}->{node};
> +
> +		    my $task = $api_client->get("/nodes/$node/tasks/$upid/status");

what happens if this call fails for any reason (iow status code != 2xx)?
this is fallible afaict from a quick glance at the API client. should we
handle errors here and retry / abort? We should certainly not just die
here I'd say.

> +		    if ($task->{status} ne 'running') {
> +			my $is_error = PVE::Tools::upid_status_is_error($task->{exitstatus});
> +			push $failed->@*, $worker->{vmid} if $is_error;
> +
> +			$check_task->($api_client, $worker->{vmid}, $worker->{guest}, $is_error, $task);
> +
> +			delete $workers->{$upid};
> +		    }
> +		}
> +		sleep(1); # How much?
> +	    }
> +
> +	    my $guest = $vmlist->{$vmid};
> +	    my $upid = eval { $start_task->($api_client, $vmid, $guest) };
> +	    warn $@ if $@;
> +
> +	    # success but no task necessary
> +	    next if defined($upid) && "$upid" eq "1";
> +
> +	    if (!defined($upid)) {
> +		push $failed->@*, $vmid;
> +		next;
> +	    }
> +
> +	    $workers->{$upid} = {
> +		vmid => $vmid,
> +		guest => $guest,
> +	    };
> +	}
> +
> +	# wait until current order is finished
> +	for my $upid (keys($workers->%*)) {
> +	    my $worker = $workers->{$upid};
> +	    my $node = $worker->{guest}->{node};
> +
> +	    my $task = wait_for_task_finished($api_client, $node, $upid);
> +	    my $is_error = PVE::Tools::upid_status_is_error($task->{exitstatus});
> +	    push $failed->@*, $worker->{vmid} if $is_error;
> +
> +	    $check_task->($api_client, $worker->{vmid}, $worker->{guest}, $is_error, $task);
> +
> +	    delete $workers->{$upid};

Maybe extract that into a function, since it seems to be the same code
as above?

Or maybe even a do while would simplify things here? Haven't thought it
through 100%, just an idea:

  do {
   // check for terminated workers and reap them
   // fill empty worker slots with new workers
  }
  while (workers_exist)

Would maybe simplify things and not require the waiting part at the end?

> +	}
> +    }
> +
> +    return $failed;
> +}
> +
> +sub get_type_text {
> +    my ($type) = @_;
> +
> +    if ($type eq 'lxc') {
> +	return 'CT';
> +    } elsif ($type eq 'qemu') {
> +	return 'VM';
> +    } else {
> +	die "unknown guest type $type\n";
> +    }
> +}
> +
> +sub wait_for_task_finished {
> +    my ($client, $node, $upid) = @_;
> +
> +    while (1) {
> +	my $task = $client->get("/nodes/$node/tasks/$upid/status");

also some error handling / retry logic? maybe just at the call site though..

> +	return $task if $task->{status} ne 'running';
> +	sleep(1); # How much time?
> +    }
> +}
> +
> +sub check_guest_permissions {
> +    my ($rpcenv, $authuser, $vmlist, $priv_list) = @_;
> +
> +    my @vms = PVE::Tools::split_list($vmlist);
> +    if (scalar(@vms) > 0) {
> +	$rpcenv->check($authuser, "/vms/$_", $priv_list) for @vms;
> +    } elsif (!$rpcenv->check($authuser, "/", $priv_list, 1)) {
> +	raise_perm_exc("/, " . join(', ', $priv_list->@*));
> +    }
> +}
> +
> +__PACKAGE__->register_method({
> +    name => 'start',
> +    path => 'start',
> +    method => 'POST',
> +    description => "Bulk start all guests on the cluster.",
> +    permissions => { user => 'all' },

Since the permissions check happens inside the API call, it would be
nice to add a description here that states what checks are performed.

> +    protected => 1,
> +    parameters => {
> +	additionalProperties => 0,
> +	properties => {
> +	    vms => {
> +		description => "Only consider guests from this comma separated list of VMIDs.",
> +		type => 'string',  format => 'pve-vmid-list',
> +		optional => 1,
> +	    },
> +	    'max-workers' => {
> +		description => "How many parallel tasks at maximum should be started.",
> +		optional => 1,
> +		default => 1,
> +		type => 'integer',
> +	    },
> +	    # TODO:
> +	    # Failure resolution mode (fail, warn, retry?)

Do you intend to pass this to foreach_guests or should this be handled
by check_task? Depending on this, you can ignore the comments above
w.r.t. fallibility. If check_task should handle it, we should be really
careful with how we handle the API call failures inside and return them
to check_task, so we can make informed decisions there.

> +	    # mode-limits (offline only, suspend only, ?)
> +	    # filter (tags, name, ?)
> +	},
> +    },
> +    returns => {
> +	type => 'string',
> +	description => "UPID of the worker",
> +    },
> +    code => sub {
> +	my ($param) = @_;
> +
> +	my $rpcenv = PVE::RPCEnvironment::get();
> +	my $authuser = $rpcenv->get_user();
> +
> +	check_guest_permissions($rpcenv, $authuser, $param->{vms}, [ 'VM.PowerMgmt' ]);
> +
> +	my $code = sub {
> +	    my $startlist = PVE::API2::Nodes::Nodeinfo::get_start_stop_list(undef, undef, $param->{vms});
> +
> +	    my $start_task = sub {
> +		my ($api_client, $vmid, $guest) = @_;
> +		my $node = $guest->{node};
> +
> +		my $type = $guest->{type};
> +		my $type_text = get_type_text($type);
> +
> +		my $url = "/nodes/$node/$type/$vmid/status/start";
> +		print STDERR "Starting $type_text $vmid\n";
> +		return $api_client->post($url);
> +	    };
> +
> +	    my $check_task = sub {
> +		my ($api_client, $vmid, $guest, $is_error, $task) = @_;
> +		my $node = $guest->{node};
> +
> +		my $default_delay = 0;
> +
> +		if (!$is_error) {
> +		    my $delay = defined($guest->{up}) ? int($guest->{up}) : $default_delay;
> +		    if ($delay > 0) {
> +			print STDERR "Waiting for $delay seconds (startup delay)\n" if $guest->{up};
> +			for (my $i = 0; $i < $delay; $i++) {
> +			    sleep(1);
> +			}
> +		    }
> +		} else {
> +		    my $type_text = get_type_text($guest->{type});
> +		    print STDERR "Starting $type_text $vmid failed: $task->{exitstatus}\n";
> +		}
> +	    };
> +
> +	    my $max_workers = $param->{'max-workers'} // 1;
> +	    my $failed = foreach_guest($startlist, $max_workers, $start_task, $check_task);
> +
> +	    if (scalar($failed->@*)) {
> +		die "Some guests failed to start: " . join(', ', $failed->@*) . "\n";
> +	    }
> +	};
> +
> +	return $rpcenv->fork_worker('bulkstart', undef, $authuser, $code);
> +    }});
> +
> +__PACKAGE__->register_method({
> +    name => 'shutdown',
> +    path => 'shutdown',
> +    method => 'POST',
> +    description => "Bulk shutdown all guests on the cluster.",
> +    permissions => { user => 'all' },

description here as well

> +    protected => 1,
> +    parameters => {
> +	additionalProperties => 0,
> +	properties => {
> +	    vms => {
> +		description => "Only consider guests from this comma separated list of VMIDs.",
> +		type => 'string',  format => 'pve-vmid-list',
> +		optional => 1,
> +	    },
> +	    timeout => {
> +		description => "Default shutdown timeout in seconds if none is configured for the guest.",
> +		type => 'integer',
> +		default => 180,
> +		optional => 1,
> +	    },
> +	    'max-workers' => {
> +		description => "How many parallel tasks at maximum should be started.",
> +		optional => 1,
> +		default => 1,
> +		type => 'integer',
> +	    },
> +	    # TODO:
> +	    # Failure resolution mode (fail, warn, retry?)
> +	    # mode-limits (offline only, suspend only, ?)
> +	    # filter (tags, name, ?)
> +	},
> +    },
> +    returns => {
> +	type => 'string',
> +	description => "UPID of the worker",
> +    },
> +    code => sub {
> +	my ($param) = @_;
> +
> +	my $rpcenv = PVE::RPCEnvironment::get();
> +	my $authuser = $rpcenv->get_user();
> +
> +	check_guest_permissions($rpcenv, $authuser, $param->{vms}, [ 'VM.PowerMgmt' ]);
> +
> +	my $code = sub {
> +	    my $startlist = PVE::API2::Nodes::Nodeinfo::get_start_stop_list(undef, undef, $param->{vms});
> +
> +	    # reverse order for shutdown
> +	    for my $order (keys $startlist->%*) {
> +		my $list = delete $startlist->{$order};
> +		$order = $order * -1;
> +		$startlist->{$order} = $list;
> +	    }
> +
> +	    my $start_task = sub {
> +		my ($api_client, $vmid, $guest) = @_;
> +		my $node = $guest->{node};
> +
> +		my $type = $guest->{type};
> +		my $type_text = get_type_text($type);
> +
> +		my $timeout = int($guest->{down} // $param->{timeout} // 180);
> +
> +		my $params = {
> +		    forceStop => 1,
> +		    timeout => $timeout,
> +		};
> +
> +		my $url = "/nodes/$node/$type/$vmid/status/shutdown";
> +		print STDERR "Shutting down $type_text $vmid (Timeout = $timeout seconds)\n";
> +		return $api_client->post($url, $params);
> +	    };
> +
> +	    my $check_task = sub {
> +		my ($api_client, $vmid, $guest, $is_error, $task) = @_;
> +		my $node = $guest->{node};
> +		if ($is_error) {
> +		    my $type_text = get_type_text($guest->{type});
> +		    print STDERR "Stopping $type_text $vmid failed: $task->{exitstatus}\n";
> +		}
> +	    };
> +
> +	    my $max_workers = $param->{'max-workers'} // 1;
> +	    my $failed = foreach_guest($startlist, $max_workers, $start_task, $check_task);
> +
> +	    if (scalar($failed->@*)) {
> +		die "Some guests failed to shut down: " . join(', ', $failed->@*) . "\n";
> +	    }
> +	};
> +
> +	return $rpcenv->fork_worker('bulkshutdown', undef, $authuser, $code);
> +    }});
> +
> +__PACKAGE__->register_method({
> +    name => 'migrate',
> +    path => 'migrate',
> +    method => 'POST',
> +    description => "Bulk migrate all guests on the cluster.",
> +    permissions => { user => 'all' },

description here as well

> +    protected => 1,
> +    parameters => {
> +	additionalProperties => 0,
> +	properties => {
> +	    vms => {
> +		description => "Only consider guests from this comma separated list of VMIDs.",
> +		type => 'string',  format => 'pve-vmid-list',
> +		optional => 1,
> +	    },
> +	    'target-node' => get_standard_option('pve-node', { description => "Target node." }),
> +	    online => {
> +		type => 'boolean',
> +		description => "Enable live migration for VMs and restart migration for CTs.",
> +		optional => 1,
> +	    },
> +	    "with-local-disks" => {
> +		type => 'boolean',
> +		description => "Enable live storage migration for local disk",
> +		optional => 1,
> +	    },
> +	    'max-workers' => {
> +		description => "How many parallel tasks at maximum should be started.",
> +		optional => 1,
> +		default => 1,
> +		type => 'integer',
> +	    },
> +	    # TODO:
> +	    # Failure resolution mode (fail, warn, retry?)
> +	    # mode-limits (offline only, suspend only, ?)
> +	    # filter (tags, name, ?)
> +	},
> +    },
> +    returns => {
> +	type => 'string',
> +	description => "UPID of the worker",
> +    },
> +    code => sub {
> +	my ($param) = @_;
> +
> +	my $rpcenv = PVE::RPCEnvironment::get();
> +	my $authuser = $rpcenv->get_user();
> +
> +	check_guest_permissions($rpcenv, $authuser, $param->{vms}, [ 'VM.Migrate' ]);
> +
> +	my $code = sub {
> +	    my $list = PVE::API2::Nodes::Nodeinfo::get_filtered_vmlist(undef, $param->{vms}, 1, 1);
> +
> +	    my $start_task = sub {
> +		my ($api_client, $vmid, $guest) = @_;
> +		my $node = $guest->{node};
> +
> +		my $type = $guest->{type};
> +		my $type_text = get_type_text($type);
> +
> +		if ($node eq $param->{'target-node'}) {
> +		    print STDERR "$type_text $vmid already on $node, skipping.\n";
> +		    return 1;
> +		}
> +
> +		my $params = {
> +		    target => $param->{'target-node'},
> +		};
> +
> +		if ($type eq 'lxc') {
> +		    $params->{restart} = $param->{online} if defined($param->{online});
> +		} elsif ($type eq 'qemu') {
> +		    $params->{online} = $param->{online} if defined($param->{online});
> +		    $params->{'with-local-disks'} = $param->{'with-local-disks'} if defined($param->{'with-local-disks'});
> +		}
> +
> +		my $url = "/nodes/$node/$type/$vmid/migrate";
> +		print STDERR "Migrating $type_text $vmid\n";
> +		return $api_client->post($url, $params);
> +	    };
> +
> +	    my $check_task = sub {
> +		my ($api_client, $vmid, $guest, $is_error, $task) = @_;
> +		if ($is_error) {
> +		    my $type_text = get_type_text($guest->{type});
> +		    print STDERR "Migrating $type_text $vmid failed: $task->{exitstatus}\n";
> +		}
> +	    };
> +
> +	    my $max_workers = $param->{'max-workers'} // 1;
> +	    my $failed = foreach_guest({ '0' => $list }, $max_workers, $start_task, $check_task);
> +
> +		die "Some guests failed to migrate: " . join(', ', $failed->@*) . "\n";
> +		die "Some guests failed to migrate " . join(', ', $failed->@*) . "\n";
> +	    }
> +	};
> +
> +	return $rpcenv->fork_worker('bulkmigrate', undef, $authuser, $code);
> +    }});
> +
> +1;
> diff --git a/PVE/API2/Cluster/Makefile b/PVE/API2/Cluster/Makefile
> index b109e5cb..ed02b4be 100644
> --- a/PVE/API2/Cluster/Makefile
> +++ b/PVE/API2/Cluster/Makefile
> @@ -6,6 +6,7 @@ SUBDIRS=Mapping
>  # ensure we do not conflict with files shipped by pve-cluster!!
>  PERLSOURCE= 			\
>  	BackupInfo.pm		\
> +	Bulk.pm			\
>  	MetricServer.pm		\
>  	Mapping.pm		\
>  	Notifications.pm		\
> diff --git a/PVE/API2/Nodes.pm b/PVE/API2/Nodes.pm
> index 9cdf19db..02bc7299 100644
> --- a/PVE/API2/Nodes.pm
> +++ b/PVE/API2/Nodes.pm
> @@ -1829,7 +1829,7 @@ __PACKAGE__->register_method({
>  # * vmid whitelist
>  # * guest is a template (default: skip)
>  # * guest is HA managed (default: skip)
> -my $get_filtered_vmlist = sub {
> +sub get_filtered_vmlist {
>      my ($nodename, $vmfilter, $templates, $ha_managed) = @_;
>  
>      my $vmlist = PVE::Cluster::get_vmlist();
> @@ -1856,13 +1856,14 @@ my $get_filtered_vmlist = sub {
>  		die "unknown virtual guest type '$d->{type}'\n";
>  	    }
>  
> -	    my $conf = $class->load_config($vmid);
> +	    my $conf = $class->load_config($vmid, $d->{node});
>  	    return if !$templates && $class->is_template($conf);
>  	    return if !$ha_managed && PVE::HA::Config::vm_is_ha_managed($vmid);
>  
>  	    $res->{$vmid}->{conf} = $conf;
>  	    $res->{$vmid}->{type} = $d->{type};
>  	    $res->{$vmid}->{class} = $class;
> +	    $res->{$vmid}->{node} = $d->{node};
>  	};
>  	warn $@ if $@;
>      }
> @@ -1871,13 +1872,13 @@ my $get_filtered_vmlist = sub {
>  };
>  
>  # return all VMs which should get started/stopped on power up/down
> -my $get_start_stop_list = sub {
> +sub get_start_stop_list {
>      my ($nodename, $autostart, $vmfilter) = @_;
>  
>      # do not skip HA vms on force or if a specific VMID set is wanted
>      my $include_ha_managed = defined($vmfilter) ? 1 : 0;
>  
> -    my $vmlist = $get_filtered_vmlist->($nodename, $vmfilter, undef, $include_ha_managed);
> +    my $vmlist = get_filtered_vmlist($nodename, $vmfilter, undef, $include_ha_managed);
>  
>      my $resList = {};
>      foreach my $vmid (keys %$vmlist) {
> @@ -1889,15 +1890,16 @@ my $get_start_stop_list = sub {
>  
>  	$resList->{$order}->{$vmid} = $startup;
>  	$resList->{$order}->{$vmid}->{type} = $vmlist->{$vmid}->{type};
> +	$resList->{$order}->{$vmid}->{node} = $vmlist->{$vmid}->{node};
>      }
>  
>      return $resList;
> -};
> +}
>  
>  my $remove_locks_on_startup = sub {
>      my ($nodename) = @_;
>  
> -    my $vmlist = &$get_filtered_vmlist($nodename, undef, undef, 1);
> +    my $vmlist = get_filtered_vmlist($nodename, undef, undef, 1);
>  
>      foreach my $vmid (keys %$vmlist) {
>  	my $conf = $vmlist->{$vmid}->{conf};
> @@ -1983,7 +1985,7 @@ __PACKAGE__->register_method ({
>  	    warn $@ if $@;
>  
>  	    my $autostart = $force ? undef : 1;
> -	    my $startList = $get_start_stop_list->($nodename, $autostart, $param->{vms});
> +	    my $startList = get_start_stop_list($nodename, $autostart, $param->{vms});
>  
>  	    # Note: use numeric sorting with <=>
>  	    for my $order (sort {$a <=> $b} keys %$startList) {
> @@ -2096,7 +2098,7 @@ __PACKAGE__->register_method ({
>  		minimum => 0,
>  		maximum => 2 * 3600, # mostly arbitrary, but we do not want too high timeouts
>  	    },
> -	},
> +	}
>      },
>      returns => {
>  	type => 'string',
> @@ -2123,7 +2125,7 @@ __PACKAGE__->register_method ({
>  
>  	    $rpcenv->{type} = 'priv'; # to start tasks in background
>  
> -	    my $stopList = $get_start_stop_list->($nodename, undef, $param->{vms});
> +	    my $stopList = get_start_stop_list($nodename, undef, $param->{vms});
>  
>  	    my $cpuinfo = PVE::ProcFSTools::read_cpuinfo();
>  	    my $datacenterconfig = cfs_read_file('datacenter.cfg');
> @@ -2246,7 +2248,7 @@ __PACKAGE__->register_method ({
>  
>  	    $rpcenv->{type} = 'priv'; # to start tasks in background
>  
> -	    my $toSuspendList = $get_start_stop_list->($nodename, undef, $param->{vms});
> +	    my $toSuspendList = get_start_stop_list($nodename, undef, $param->{vms});
>  
>  	    my $cpuinfo = PVE::ProcFSTools::read_cpuinfo();
>  	    my $datacenterconfig = cfs_read_file('datacenter.cfg');
> @@ -2434,7 +2436,7 @@ __PACKAGE__->register_method ({
>  	my $code = sub {
>  	    $rpcenv->{type} = 'priv'; # to start tasks in background
>  
> -	    my $vmlist = &$get_filtered_vmlist($nodename, $param->{vms}, 1, 1);
> +	    my $vmlist = get_filtered_vmlist($nodename, $param->{vms}, 1, 1);
>  	    if (!scalar(keys %$vmlist)) {
>  		warn "no virtual guests matched, nothing to do..\n";
>  		return;




* Re: [pve-devel] [RFC PATCH manager] WIP: api: implement node-independent bulk actions
  2025-03-18 11:30 ` Stefan Hanreich
@ 2025-03-19  9:04   ` Dominik Csapak
  2025-03-20  8:44     ` Stefan Hanreich
  0 siblings, 1 reply; 4+ messages in thread
From: Dominik Csapak @ 2025-03-19  9:04 UTC (permalink / raw)
  To: Stefan Hanreich, Proxmox VE development discussion

On 3/18/25 12:30, Stefan Hanreich wrote:
> This would be really interesting for applying the SDN configuration as
> well, where I'm currently calling the existing bulk-call. Really like
> the design with the start / check callbacks, that should make this quite
> flexible.
> 
> On 3/18/25 11:39, Dominik Csapak wrote:
>> To achieve this, we start a worker task and use our generic api client
>> to start the tasks on the relevant nodes. The client always points
>> to 'localhost' so we let the pveproxy worry about the proxying etc.
>>
>> We reuse some logic from the startall/stopall/etc. calls, like getting
>> the ordered guest info list.
>>
>> Not yet implemented are:
>> * filters
>> * failure mode resolution (we could implement this later too)
>> * token handling (not sure if we need this at all if we check the
>>    permissions correctly upfront?)
>> * suspend
>> * some call specific parameters
>>
>> Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
>> ---
>> this is a pre-requisite for having bulk actions on PDM.
>> Since we normally don't do such things 'cluster-wide' I wanted to
>> send the patch early to get feedback on my design decisions like:
>> * using the api client this way
>> * api naming/location
>> * handling parallel requests
>> * etc.
>>
>> There are alternative methods to achieve similar results:
>> * use some kind of queuing system on the cluster (e.g. via pmxcfs)
>> * using the 'startall'/'stopall' calls from pve in PDM
>> * surely some other thing I didn't think about
>>
>> We can of course start with this, and change the underlying mechanism
>> later too.
>>
>> If we go this route, I could also rewrite the code in rust if wanted,
>> since there is nothing particularly dependent on perl here
>> (besides getting the vmlist, but that could stay in perl).
>> The bulk of the logic is how to start tasks + handle them finishing +
>> handling filter + concurrency.
> 
> I'm actually reading the VM list in the firewall via this:
> https://git.proxmox.com/?p=proxmox-ve-rs.git;a=blob;f=proxmox-ve-config/src/guest/mod.rs;h=74fd8abc000aec0fa61898840d44ab8a4cd9018b;hb=HEAD#l69
> 
> So we could build upon that if we want to implement it in Rust?
> 
> I have something similar, *very* basic, implemented for running multiple
> tasks across clusters in my SDN patch series - so maybe we could
> repurpose that for a possible implementation, even generalize it?

Yeah, sounds good if we want to do it this way. For my use case here we need to parse the config of
all guests though, and I'm not sure if we can do that in Rust. Maybe with just a minimal config like
'boot' and such? Or we try to pull out the PVE API types from PDM, since there are parts of the
config already exposed, I think...

> 
>>   PVE/API2/Cluster.pm       |   7 +
>>   PVE/API2/Cluster/Bulk.pm  | 475 ++++++++++++++++++++++++++++++++++++++
>>   PVE/API2/Cluster/Makefile |   1 +
>>   PVE/API2/Nodes.pm         |  24 +-
>>   4 files changed, 496 insertions(+), 11 deletions(-)
>>   create mode 100644 PVE/API2/Cluster/Bulk.pm
>>
>> diff --git a/PVE/API2/Cluster.pm b/PVE/API2/Cluster.pm
>> index a0e5c11b..478610e6 100644
>> --- a/PVE/API2/Cluster.pm
>> +++ b/PVE/API2/Cluster.pm
>> @@ -25,6 +25,7 @@ use PVE::API2::ACMEAccount;
>>   use PVE::API2::ACMEPlugin;
>>   use PVE::API2::Backup;
>>   use PVE::API2::Cluster::BackupInfo;
>> +use PVE::API2::Cluster::Bulk;
>>   use PVE::API2::Cluster::Ceph;
>>   use PVE::API2::Cluster::Mapping;
>>   use PVE::API2::Cluster::Jobs;
>> @@ -103,6 +104,11 @@ __PACKAGE__->register_method ({
>>       path => 'mapping',
>>   });
>>   
>> +__PACKAGE__->register_method ({
>> +    subclass => "PVE::API2::Cluster::Bulk",
>> +    path => 'bulk-actions',
>> +});
>> +
>>   if ($have_sdn) {
>>       __PACKAGE__->register_method ({
>>          subclass => "PVE::API2::Network::SDN",
>> @@ -162,6 +168,7 @@ __PACKAGE__->register_method ({
>>   	    { name => 'resources' },
>>   	    { name => 'status' },
>>   	    { name => 'tasks' },
>> +	    { name => 'bulk-actions' },
>>   	];
>>   
>>   	if ($have_sdn) {
>> diff --git a/PVE/API2/Cluster/Bulk.pm b/PVE/API2/Cluster/Bulk.pm
>> new file mode 100644
>> index 00000000..05a79155
>> --- /dev/null
>> +++ b/PVE/API2/Cluster/Bulk.pm
>> @@ -0,0 +1,475 @@
>> +package PVE::API2::Cluster::Bulk;
> 
> We might wanna think about using sub-paths already, since I can see this
> growing quite fast (at least a sub-path for SDN would be interesting). I
> don't know how many other potential use-cases there are aside from that.
> 

Sure, I would suggest it like this:

/cluster/bulk-actions/guest/{start,shutdown,...} -> PVE::API2::Cluster::Bulk(Actions?)::Guest;
/cluster/bulk-actions/sdn/{...} -> PVE::API2::Cluster::Bulk::SDN;

maybe in the future we can have:
/cluster/bulk-actions/node/{...} -> PVE::API2::Cluster::Bulk::Node;
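
The registration could then follow the existing pattern, just one level
deeper, e.g. (rough sketch; the Guest module name is made up):

  __PACKAGE__->register_method ({
      subclass => "PVE::API2::Cluster::Bulk::Guest",
      path => 'guest',
  });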

>> +
>> +use strict;
>> +use warnings;
>> +
>> +use PVE::APIClient::LWP;
>> +use PVE::AccessControl;
>> +use PVE::Cluster;
>> +use PVE::Exception qw(raise raise_perm_exc raise_param_exc);
>> +use PVE::JSONSchema qw(get_standard_option);
>> +use PVE::RESTHandler;
>> +use PVE::RPCEnvironment;
>> +use PVE::Tools qw();
>> +
>> +use PVE::API2::Nodes;
>> +
>> +use base qw(PVE::RESTHandler);
>> +
>> +
>> +__PACKAGE__->register_method ({
>> +    name => 'index',
>> +    path => '',
>> +    method => 'GET',
>> +    description => "Bulk action index.",
>> +    permissions => { user => 'all' },
>> +    parameters => {
>> +	additionalProperties => 0,
>> +	properties => {},
>> +    },
>> +    returns => {
>> +	type => 'array',
>> +	items => {
>> +	    type => "object",
>> +	    properties => {},
>> +	},
>> +	links => [ { rel => 'child', href => "{name}" } ],
>> +    },
>> +    code => sub {
>> +	my ($param) = @_;
>> +
>> +	return [
>> +	    { name => 'start' },
>> +	    { name => 'shutdown' },
>> +	    { name => 'migrate' },
>> +	];
>> +    }});
>> +
>> +sub create_client {
>> +    my ($authuser, $request_timeout) = @_;
>> +    my ($user, undef) = PVE::AccessControl::split_tokenid($authuser, 1);
>> +
>> +    # TODO: How to handle Tokens?
>> +    my $ticket = PVE::AccessControl::assemble_ticket($user || $authuser);
>> +    my $csrf_token = PVE::AccessControl::assemble_csrf_prevention_token($user || $authuser);
>> +
>> +    my $node = PVE::INotify::nodename();
>> +    my $fingerprint = PVE::Cluster::get_node_fingerprint($node);
>> +
>> +    my $conn_args = {
>> +	protocol => 'https',
>> +	host => 'localhost', # always call the api locally, let pveproxy handle the proxying
>> +	port => 8006,
>> +	ticket => $ticket,
>> +	timeout => $request_timeout // 25, # default slightly shorter than the proxy->daemon timeout
>> +	cached_fingerprints => {
>> +	    $fingerprint => 1,
>> +	}
>> +    };
>> +
>> +    my $api_client = PVE::APIClient::LWP->new($conn_args->%*);
>> +    if (defined($csrf_token)) {
>> +	$api_client->update_csrftoken($csrf_token);
>> +    }
>> +
>> +    return $api_client;
>> +}
>> +
>> +# takes a vm list in the form of
>> +# {
>> +#     0 => {
>> +#         100 => { .. guest info ..},
>> +#         101 => { .. guest info ..},
>> +#     },
>> +#     1 => {
>> +#         102 => { .. guest info ..},
>> +#         103 => { .. guest info ..},
>> +#     },
>> +# }
>> +#
>> +# max_workers: how many parallel tasks should be started.
>> +# start_task: a sub that returns either a UPID or 1 (undef means failure)
>> +# check_task: if start_task returned a upid, will wait for that to finish and
>> +#    call check_task with the resulting task status
>> +sub foreach_guest {
>> +    my ($startlist, $max_workers, $start_task, $check_task) = @_;
>> +
>> +    my $rpcenv = PVE::RPCEnvironment::get();
>> +    my $authuser = $rpcenv->get_user();
>> +    my $api_client = create_client($authuser);
>> +
>> +    my $failed = [];
>> +    for my $order (sort {$a <=> $b} keys $startlist->%*) {
>> +	my $vmlist = $startlist->{$order};
>> +	my $workers = {};
>> +
>> +	for my $vmid (sort {$a <=> $b} keys $vmlist->%*) {
>> +
>> +	    # wait until at least one slot is free
>> +	    while (scalar(keys($workers->%*)) >= $max_workers) {
>> +		for my $upid (keys($workers->%*)) {
>> +		    my $worker = $workers->{$upid};
>> +		    my $node = $worker->{guest}->{node};
>> +
>> +		    my $task = $api_client->get("/nodes/$node/tasks/$upid/status");
> 
> what happens if this call fails for any reason (iow status code != 2xx)?
> this is fallible afaict from a quick glance at the API client. should we
> handle errors here and retry / abort? We should certainly not just die
> here I'd say.

True, I'd probably do an eval {} here and have a failure mode (I'd begin with
'continue, but report an error at the end') that we could extend to
e.g. retry x times, abort the job, ...
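
Something like this (untested sketch of the 'continue and report at the end'
mode, reusing the existing variables from foreach_guest):

  my $task = eval { $api_client->get("/nodes/$node/tasks/$upid/status") };
  if (my $err = $@) {
      # record the failure and reap the slot, but keep going
      warn "querying status of task $upid failed: $err";
      push $failed->@*, $worker->{vmid};
      delete $workers->{$upid};
      next;
  }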


> 
>> +		    if ($task->{status} ne 'running') {
>> +			my $is_error = PVE::Tools::upid_status_is_error($task->{exitstatus});
>> +			push $failed->@*, $worker->{vmid} if $is_error;
>> +
>> +			$check_task->($api_client, $worker->{vmid}, $worker->{guest}, $is_error, $task);
>> +
>> +			delete $workers->{$upid};
>> +		    }
>> +		}
>> +		sleep(1); # How much?
>> +	    }
>> +
>> +	    my $guest = $vmlist->{$vmid};
>> +	    my $upid = eval { $start_task->($api_client, $vmid, $guest) };
>> +	    warn $@ if $@;
>> +
>> +	    # success but no task necessary
>> +	    next if defined($upid) && "$upid" eq "1";
>> +
>> +	    if (!defined($upid)) {
>> +		push $failed->@*, $vmid;
>> +		next;
>> +	    }
>> +
>> +	    $workers->{$upid} = {
>> +		vmid => $vmid,
>> +		guest => $guest,
>> +	    };
>> +	}
>> +
>> +	# wait until current order is finished
>> +	for my $upid (keys($workers->%*)) {
>> +	    my $worker = $workers->{$upid};
>> +	    my $node = $worker->{guest}->{node};
>> +
>> +	    my $task = wait_for_task_finished($api_client, $node, $upid);
>> +	    my $is_error = PVE::Tools::upid_status_is_error($task->{exitstatus});
>> +	    push $failed->@*, $worker->{vmid} if $is_error;
>> +
>> +	    $check_task->($api_client, $worker->{vmid}, $worker->{guest}, $is_error, $task);
>> +
>> +	    delete $workers->{$upid};
> 
> Maybe extract that into a function, since it seems to be the same code
> as above?
> 
> Or maybe even a do while would simplify things here? Haven't thought it
> through 100%, just an idea:
> 
>    do {
>     // check for terminated workers and reap them
>     // fill empty worker slots with new workers
>    }
>    while (workers_exist)
> 
> Would maybe simplify things and not require the waiting part at the end?

it's not so easy sadly since the two blocks are not the same

we have two different mechanisms here

we have worker slots (max_workers) that we want to fill.
while we are going through an order (e.g. for start/shutdown) we don't
want to start with the next order while there are still workers running, so:

while we can still add workers, we loop over the existing ones until one is
finished and queue the next. at the end of the 'order' we have to wait for all
remaining workers before continuing to the next order.
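
in pseudo-perl the structure is roughly this (reap_finished_workers and
start_worker are made-up names, essentially the extraction suggested above):

    for my $order (sort { $a <=> $b } keys $startlist->%*) {
	for my $vmid (sort { $a <=> $b } keys $startlist->{$order}->%*) {
	    # slot mechanism: wait until one of the max_workers slots is free
	    reap_finished_workers($workers, $failed)
		while scalar(keys $workers->%*) >= $max_workers;
	    start_worker($workers, $vmid, $startlist->{$order}->{$vmid});
	}
	# order barrier: all workers of this order must finish before we
	# continue with the next order
	reap_finished_workers($workers, $failed)
	    while scalar(keys $workers->%*) > 0;
    }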

> 
>> +	}
>> +    }
>> +
>> +    return $failed;
>> +}
>> +
>> +sub get_type_text {
>> +    my ($type) = @_;
>> +
>> +    if ($type eq 'lxc') {
>> +	return 'CT';
>> +    } elsif ($type eq 'qemu') {
>> +	return 'VM';
>> +    } else {
>> +	die "unknown guest type $type\n";
>> +    }
>> +}
>> +
>> +sub wait_for_task_finished {
>> +    my ($client, $node, $upid) = @_;
>> +
>> +    while (1) {
>> +	my $task = $client->get("/nodes/$node/tasks/$upid/status");
> 
> also some error handling / retry logic? maybe just at the call site though..

yes, i'd handle errors from this at the call site

> 
>> +	return $task if $task->{status} ne 'running';
>> +	sleep(1); # How much time?
>> +    }
>> +}
>> +
>> +sub check_guest_permissions {
>> +    my ($rpcenv, $authuser, $vmlist, $priv_list) = @_;
>> +
>> +    my @vms = PVE::Tools::split_list($vmlist);
>> +    if (scalar(@vms) > 0) {
>> +	$rpcenv->check($authuser, "/vms/$_", $priv_list) for @vms;
>> +    } elsif (!$rpcenv->check($authuser, "/", $priv_list, 1)) {
>> +	raise_perm_exc("/, " . join(', ', $priv_list->@*));
>> +    }
>> +}
>> +
>> +__PACKAGE__->register_method({
>> +    name => 'start',
>> +    path => 'start',
>> +    method => 'POST',
>> +    description => "Bulk start all guests on the cluster.",
>> +    permissions => { user => 'all' },
> 
> Since the permissions check happens inside the API call, it would be
> nice to add a description here that states what checks are performed.
> 

true, i did not add all descriptions yet.

>> +    protected => 1,
>> +    parameters => {
>> +	additionalProperties => 0,
>> +	properties => {
>> +	    vms => {
>> +		description => "Only consider guests from this comma separated list of VMIDs.",
>> +		type => 'string',  format => 'pve-vmid-list',
>> +		optional => 1,
>> +	    },
>> +	    'max-workers' => {
>> +		description => "How many parallel tasks at maximum should be started.",
>> +		optional => 1,
>> +		default => 1,
>> +		type => 'integer',
>> +	    },
>> +	    # TODO:
>> +	    # Failure resolution mode (fail, warn, retry?)
> 
> Do you intend to pass this to foreach_guests or should this be handled
> by check_task? Depending on this, you can ignore the comments above
> w.r.t. fallibility. If check_task should handle it, we should be really
> careful with how we handle the API call failures inside and return them
> to check_task, so we can make informed decisions there.

yes, my intention was to let foreach_guest handle that. (probably a different name
would be good when it does so many things)
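
e.g. something along these lines could be threaded through (the mode names
and the helper are placeholders, none of this exists yet):

    # react to a failed guest action according to the requested failure mode
    sub handle_task_failure {
	my ($mode, $failed, $vmid) = @_;

	die "guest $vmid failed, aborting\n" if $mode eq 'fail';
	push $failed->@*, $vmid; # 'warn': continue, report at the end
	# a 'retry' mode would re-queue the guest a few times before giving up
    }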

> 
>> +	    # mode-limits (offline only, suspend only, ?)
>> +	    # filter (tags, name, ?)
>> +	},
>> +    },
>> +    returns => {
>> +	type => 'string',
>> +	description => "UPID of the worker",
>> +    },
>> +    code => sub {
>> +	my ($param) = @_;
>> +
>> +	my $rpcenv = PVE::RPCEnvironment::get();
>> +	my $authuser = $rpcenv->get_user();
>> +
>> +	check_guest_permissions($rpcenv, $authuser, $param->{vms}, [ 'VM.PowerMgmt' ]);
>> +
>> +	my $code = sub {
>> +	    my $startlist = PVE::API2::Nodes::Nodeinfo::get_start_stop_list(undef, undef, $param->{vms});
>> +
>> +	    my $start_task = sub {
>> +		my ($api_client, $vmid, $guest) = @_;
>> +		my $node = $guest->{node};
>> +
>> +		my $type = $guest->{type};
>> +		my $type_text = get_type_text($type);
>> +
>> +		my $url = "/nodes/$node/$type/$vmid/status/start";
>> +		print STDERR "Starting $type_text $vmid\n";
>> +		return $api_client->post($url);
>> +	    };
>> +
>> +	    my $check_task = sub {
>> +		my ($api_client, $vmid, $guest, $is_error, $task) = @_;
>> +		my $node = $guest->{node};
>> +
>> +		my $default_delay = 0;
>> +
>> +		if (!$is_error) {
>> +		    my $delay = defined($guest->{up}) ? int($guest->{up}) : $default_delay;
>> +		    if ($delay > 0) {
>> +			print STDERR "Waiting for $delay seconds (startup delay)\n" if $guest->{up};
>> +			for (my $i = 0; $i < $delay; $i++) {
>> +			    sleep(1);
>> +			}
>> +		    }
>> +		} else {
>> +		    my $type_text = get_type_text($guest->{type});
>> +		    print STDERR "Starting $type_text $vmid failed: $task->{exitstatus}\n";
>> +		}
>> +	    };
>> +
>> +	    my $max_workers = $param->{'max-workers'} // 1;
>> +	    my $failed = foreach_guest($startlist, $max_workers, $start_task, $check_task);
>> +
>> +	    if (scalar($failed->@*)) {
>> +		die "Some guests failed to start: " . join(', ', $failed->@*) . "\n";
>> +	    }
>> +	};
>> +
>> +	return $rpcenv->fork_worker('bulkstart', undef, $authuser, $code);
>> +    }});
>> +
>> +__PACKAGE__->register_method({
>> +    name => 'shutdown',
>> +    path => 'shutdown',
>> +    method => 'POST',
>> +    description => "Bulk shutdown all guests on the cluster.",
>> +    permissions => { user => 'all' },
> 
> description here as well
> 
>> +    protected => 1,
>> +    parameters => {
>> +	additionalProperties => 0,
>> +	properties => {
>> +	    vms => {
>> +		description => "Only consider guests from this comma separated list of VMIDs.",
>> +		type => 'string',  format => 'pve-vmid-list',
>> +		optional => 1,
>> +	    },
>> +	    timeout => {
>> +		description => "Default shutdown timeout in seconds if none is configured for the guest.",
>> +		type => 'integer',
>> +		default => 180,
>> +		optional => 1,
>> +	    },
>> +	    'max-workers' => {
>> +		description => "How many parallel tasks at maximum should be started.",
>> +		optional => 1,
>> +		default => 1,
>> +		type => 'integer',
>> +	    },
>> +	    # TODO:
>> +	    # Failure resolution mode (fail, warn, retry?)
>> +	    # mode-limits (offline only, suspend only, ?)
>> +	    # filter (tags, name, ?)
>> +	},
>> +    },
>> +    returns => {
>> +	type => 'string',
>> +	description => "UPID of the worker",
>> +    },
>> +    code => sub {
>> +	my ($param) = @_;
>> +
>> +	my $rpcenv = PVE::RPCEnvironment::get();
>> +	my $authuser = $rpcenv->get_user();
>> +
>> +	check_guest_permissions($rpcenv, $authuser, $param->{vms}, [ 'VM.PowerMgmt' ]);
>> +
>> +	my $code = sub {
>> +	    my $startlist = PVE::API2::Nodes::Nodeinfo::get_start_stop_list(undef, undef, $param->{vms});
>> +
>> +	    # reverse order for shutdown
>> +	    for my $order (keys $startlist->%*) {
>> +		my $list = delete $startlist->{$order};
>> +		$order = $order * -1;
>> +		$startlist->{$order} = $list;
>> +	    }
>> +
>> +	    my $start_task = sub {
>> +		my ($api_client, $vmid, $guest) = @_;
>> +		my $node = $guest->{node};
>> +
>> +		my $type = $guest->{type};
>> +		my $type_text = get_type_text($type);
>> +
>> +		my $timeout = int($guest->{down} // $param->{timeout} // 180);
>> +
>> +		my $params = {
>> +		    forceStop => 1,
>> +		    timeout => $timeout,
>> +		};
>> +
>> +		my $url = "/nodes/$node/$type/$vmid/status/shutdown";
>> +		print STDERR "Shutting down $type_text $vmid (Timeout = $timeout seconds)\n";
>> +		return $api_client->post($url, $params);
>> +	    };
>> +
>> +	    my $check_task = sub {
>> +		my ($api_client, $vmid, $guest, $is_error, $task) = @_;
>> +		my $node = $guest->{node};
>> +		if ($is_error) {
>> +		    my $type_text = get_type_text($guest->{type});
>> +		    print STDERR "Stopping $type_text $vmid failed: $task->{exitstatus}\n";
>> +		}
>> +	    };
>> +
>> +	    my $max_workers = $param->{'max-workers'} // 1;
>> +	    my $failed = foreach_guest($startlist, $max_workers, $start_task, $check_task);
>> +
>> +	    if (scalar($failed->@*)) {
>> +		die "Some guests failed to shut down: " . join(', ', $failed->@*) . "\n";
>> +	    }
>> +	};
>> +
>> +	return $rpcenv->fork_worker('bulkshutdown', undef, $authuser, $code);
>> +    }});
>> +
>> +__PACKAGE__->register_method({
>> +    name => 'migrate',
>> +    path => 'migrate',
>> +    method => 'POST',
>> +    description => "Bulk migrate all guests on the cluster.",
>> +    permissions => { user => 'all' },
> 
> description here as well
> 
>> +    protected => 1,
>> +    parameters => {
>> +	additionalProperties => 0,
>> +	properties => {
>> +	    vms => {
>> +		description => "Only consider guests from this comma separated list of VMIDs.",
>> +		type => 'string',  format => 'pve-vmid-list',
>> +		optional => 1,
>> +	    },
>> +	    'target-node' => get_standard_option('pve-node', { description => "Target node." }),
>> +	    online => {
>> +		type => 'boolean',
>> +		description => "Enable live migration for VMs and restart migration for CTs.",
>> +		optional => 1,
>> +	    },
>> +	    "with-local-disks" => {
>> +		type => 'boolean',
>> +		description => "Enable live storage migration for local disk",
>> +		optional => 1,
>> +	    },
>> +	    'max-workers' => {
>> +		description => "How many parallel tasks at maximum should be started.",
>> +		optional => 1,
>> +		default => 1,
>> +		type => 'integer',
>> +	    },
>> +	    # TODO:
>> +	    # Failure resolution mode (fail, warn, retry?)
>> +	    # mode-limits (offline only, suspend only, ?)
>> +	    # filter (tags, name, ?)
>> +	},
>> +    },
>> +    returns => {
>> +	type => 'string',
>> +	description => "UPID of the worker",
>> +    },
>> +    code => sub {
>> +	my ($param) = @_;
>> +
>> +	my $rpcenv = PVE::RPCEnvironment::get();
>> +	my $authuser = $rpcenv->get_user();
>> +
>> +	check_guest_permissions($rpcenv, $authuser, $param->{vms}, [ 'VM.Migrate' ]);
>> +
>> +	my $code = sub {
>> +	    my $list = PVE::API2::Nodes::Nodeinfo::get_filtered_vmlist(undef, $param->{vms}, 1, 1);
>> +
>> +	    my $start_task = sub {
>> +		my ($api_client, $vmid, $guest) = @_;
>> +		my $node = $guest->{node};
>> +
>> +		my $type = $guest->{type};
>> +		my $type_text = get_type_text($type);
>> +
>> +		if ($node eq $param->{'target-node'}) {
>> +		    print STDERR "$type_text $vmid already on $node, skipping.\n";
>> +		    return 1;
>> +		}
>> +
>> +		my $params = {
>> +		    target => $param->{'target-node'},
>> +		};
>> +
>> +		if ($type eq 'lxc') {
>> +		    $params->{restart} = $param->{online} if defined($param->{online});
>> +		} elsif ($type eq 'qemu') {
>> +		    $params->{online} = $param->{online} if defined($param->{online});
>> +		    $params->{'with-local-disks'} = $param->{'with-local-disks'} if defined($param->{'with-local-disks'});
>> +		}
>> +
>> +		my $url = "/nodes/$node/$type/$vmid/migrate";
>> +		print STDERR "Migrating $type_text $vmid\n";
>> +		return $api_client->post($url, $params);
>> +	    };
>> +
>> +	    my $check_task = sub {
>> +		my ($api_client, $vmid, $guest, $is_error, $task) = @_;
>> +		if ($is_error) {
>> +		    my $type_text = get_type_text($guest->{type});
>> +		    print STDERR "Migrating $type_text $vmid failed: $task->{exitstatus}\n";
>> +		}
>> +	    };
>> +
>> +	    my $max_workers = $param->{'max-workers'} // 1;
>> +	    my $failed = foreach_guest({ '0' => $list }, $max_workers, $start_task, $check_task);
>> +
>> +	    if (scalar($failed->@*)) {
>> +		die "Some guests failed to migrate: " . join(', ', $failed->@*) . "\n";
>> +	    }
>> +	};
>> +
>> +	return $rpcenv->fork_worker('bulkmigrate', undef, $authuser, $code);
>> +    }});
>> +
>> +1;
>> diff --git a/PVE/API2/Cluster/Makefile b/PVE/API2/Cluster/Makefile
>> index b109e5cb..ed02b4be 100644
>> --- a/PVE/API2/Cluster/Makefile
>> +++ b/PVE/API2/Cluster/Makefile
>> @@ -6,6 +6,7 @@ SUBDIRS=Mapping
>>   # ensure we do not conflict with files shipped by pve-cluster!!
>>   PERLSOURCE= 			\
>>   	BackupInfo.pm		\
>> +	Bulk.pm			\
>>   	MetricServer.pm		\
>>   	Mapping.pm		\
>>   	Notifications.pm		\
>> diff --git a/PVE/API2/Nodes.pm b/PVE/API2/Nodes.pm
>> index 9cdf19db..02bc7299 100644
>> --- a/PVE/API2/Nodes.pm
>> +++ b/PVE/API2/Nodes.pm
>> @@ -1829,7 +1829,7 @@ __PACKAGE__->register_method({
>>   # * vmid whitelist
>>   # * guest is a template (default: skip)
>>   # * guest is HA manged (default: skip)
>> -my $get_filtered_vmlist = sub {
>> +sub get_filtered_vmlist {
>>       my ($nodename, $vmfilter, $templates, $ha_managed) = @_;
>>   
>>       my $vmlist = PVE::Cluster::get_vmlist();
>> @@ -1856,13 +1856,14 @@ my $get_filtered_vmlist = sub {
>>   		die "unknown virtual guest type '$d->{type}'\n";
>>   	    }
>>   
>> -	    my $conf = $class->load_config($vmid);
>> +	    my $conf = $class->load_config($vmid, $d->{node});
>>   	    return if !$templates && $class->is_template($conf);
>>   	    return if !$ha_managed && PVE::HA::Config::vm_is_ha_managed($vmid);
>>   
>>   	    $res->{$vmid}->{conf} = $conf;
>>   	    $res->{$vmid}->{type} = $d->{type};
>>   	    $res->{$vmid}->{class} = $class;
>> +	    $res->{$vmid}->{node} = $d->{node};
>>   	};
>>   	warn $@ if $@;
>>       }
>> @@ -1871,13 +1872,13 @@ my $get_filtered_vmlist = sub {
>>   };
>>   
>>   # return all VMs which should get started/stopped on power up/down
>> -my $get_start_stop_list = sub {
>> +sub get_start_stop_list {
>>       my ($nodename, $autostart, $vmfilter) = @_;
>>   
>>       # do not skip HA vms on force or if a specific VMID set is wanted
>>       my $include_ha_managed = defined($vmfilter) ? 1 : 0;
>>   
>> -    my $vmlist = $get_filtered_vmlist->($nodename, $vmfilter, undef, $include_ha_managed);
>> +    my $vmlist = get_filtered_vmlist($nodename, $vmfilter, undef, $include_ha_managed);
>>   
>>       my $resList = {};
>>       foreach my $vmid (keys %$vmlist) {
>> @@ -1889,15 +1890,16 @@ my $get_start_stop_list = sub {
>>   
>>   	$resList->{$order}->{$vmid} = $startup;
>>   	$resList->{$order}->{$vmid}->{type} = $vmlist->{$vmid}->{type};
>> +	$resList->{$order}->{$vmid}->{node} = $vmlist->{$vmid}->{node};
>>       }
>>   
>>       return $resList;
>> -};
>> +}
>>   
>>   my $remove_locks_on_startup = sub {
>>       my ($nodename) = @_;
>>   
>> -    my $vmlist = &$get_filtered_vmlist($nodename, undef, undef, 1);
>> +    my $vmlist = get_filtered_vmlist($nodename, undef, undef, 1);
>>   
>>       foreach my $vmid (keys %$vmlist) {
>>   	my $conf = $vmlist->{$vmid}->{conf};
>> @@ -1983,7 +1985,7 @@ __PACKAGE__->register_method ({
>>   	    warn $@ if $@;
>>   
>>   	    my $autostart = $force ? undef : 1;
>> -	    my $startList = $get_start_stop_list->($nodename, $autostart, $param->{vms});
>> +	    my $startList = get_start_stop_list($nodename, $autostart, $param->{vms});
>>   
>>   	    # Note: use numeric sorting with <=>
>>   	    for my $order (sort {$a <=> $b} keys %$startList) {
>> @@ -2096,7 +2098,7 @@ __PACKAGE__->register_method ({
>>   		minimum => 0,
>>   		maximum => 2 * 3600, # mostly arbitrary, but we do not want to high timeouts
>>   	    },
>> -	},
>> +	}
>>       },
>>       returns => {
>>   	type => 'string',
>> @@ -2123,7 +2125,7 @@ __PACKAGE__->register_method ({
>>   
>>   	    $rpcenv->{type} = 'priv'; # to start tasks in background
>>   
>> -	    my $stopList = $get_start_stop_list->($nodename, undef, $param->{vms});
>> +	    my $stopList = get_start_stop_list($nodename, undef, $param->{vms});
>>   
>>   	    my $cpuinfo = PVE::ProcFSTools::read_cpuinfo();
>>   	    my $datacenterconfig = cfs_read_file('datacenter.cfg');
>> @@ -2246,7 +2248,7 @@ __PACKAGE__->register_method ({
>>   
>>   	    $rpcenv->{type} = 'priv'; # to start tasks in background
>>   
>> -	    my $toSuspendList = $get_start_stop_list->($nodename, undef, $param->{vms});
>> +	    my $toSuspendList = get_start_stop_list($nodename, undef, $param->{vms});
>>   
>>   	    my $cpuinfo = PVE::ProcFSTools::read_cpuinfo();
>>   	    my $datacenterconfig = cfs_read_file('datacenter.cfg');
>> @@ -2434,7 +2436,7 @@ __PACKAGE__->register_method ({
>>   	my $code = sub {
>>   	    $rpcenv->{type} = 'priv'; # to start tasks in background
>>   
>> -	    my $vmlist = &$get_filtered_vmlist($nodename, $param->{vms}, 1, 1);
>> +	    my $vmlist = get_filtered_vmlist($nodename, $param->{vms}, 1, 1);
>>   	    if (!scalar(keys %$vmlist)) {
>>   		warn "no virtual guests matched, nothing to do..\n";
>>   		return;
> 



* Re: [pve-devel] [RFC PATCH manager] WIP: api: implement node-independent bulk actions
  2025-03-19  9:04   ` Dominik Csapak
@ 2025-03-20  8:44     ` Stefan Hanreich
  0 siblings, 0 replies; 4+ messages in thread
From: Stefan Hanreich @ 2025-03-20  8:44 UTC (permalink / raw)
  To: Dominik Csapak, Proxmox VE development discussion


On 3/19/25 10:04, Dominik Csapak wrote:
> On 3/18/25 12:30, Stefan Hanreich wrote:
>>> There are alternative methods to achieve similar results:
>>> * use some kind of queuing system on the cluster (e.g. via pmxcfs)
>>> * using the 'startall'/'stopall' calls from pve in PDM
>>> * surely some other thing I didn't think about
>>>
>>> We can of course start with this, and change the underlying mechanism
>>> later too.
>>>
>>> If we go this route, I could also rewrite the code in rust if wanted,
>>> since there is nothing particularly dependent on perl here
>>> (besides getting the vmlist, but that could stay in perl).
>>> The bulk of the logic is how to start tasks + handle them finishing +
>>> handling filter + concurrency.
>>
>> I'm actually reading the VM list in the firewall via this:
>> https://git.proxmox.com/?p=proxmox-ve-rs.git;a=blob;f=proxmox-ve-config/src/guest/mod.rs;h=74fd8abc000aec0fa61898840d44ab8a4cd9018b;hb=HEAD#l69
>>
>> So we could build upon that if we want to implement it in Rust?
>>
>> I have something similar, *very* basic, implemented for running multiple
>> tasks across clusters in my SDN patch series - so maybe we could
>> repurpose that for a possible implementation, even generalize it?
> 
> Yeah, sounds good if we want to do it this way. For my use case here we
> need to parse the config of all guests though, and I'm not sure if we can
> do that in rust. Maybe with just a minimal config like 'boot' and such?
> Or we try to pull out the pve api types from pdm, since there are parts
> of the config already exposed, i think...

Makes sense to leave it in Perl then, I just thought I'd point it out if
the guest list alone was the dealbreaker.


>>> diff --git a/PVE/API2/Cluster/Bulk.pm b/PVE/API2/Cluster/Bulk.pm
>>> new file mode 100644
>>> index 00000000..05a79155
>>> --- /dev/null
>>> +++ b/PVE/API2/Cluster/Bulk.pm
>>> @@ -0,0 +1,475 @@
>>> +package PVE::API2::Cluster::Bulk;
>>
>> We might wanna think about using sub-paths already, since I can see this
>> growing quite fast (at least a sub-path for SDN would be interesting). I
>> don't know how many other potential use-cases there are aside from that.
>>
> 
> sure, I would suggest it like this:
> 
> /cluster/bulk-actions/guest/{start,shutdown,...} ->
> PVE::API2::Cluster::Bulk(Actions?)::Guest;
> /cluster/bulk-actions/sdn/{...} -> PVE::API2::Cluster::Bulk::SDN;
> 
> maybe in the future we can have:
> /cluster/bulk-actions/node/{...} -> PVE::API2::Cluster::Bulk::Node;
> 

fine with me!
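
i.e. wired up like the other sub-directories, i'd assume (module names taken
from the proposal above, nothing set in stone):

    __PACKAGE__->register_method({
	subclass => "PVE::API2::Cluster::BulkActions::Guest",
	path => 'guest',
    });

    __PACKAGE__->register_method({
	subclass => "PVE::API2::Cluster::BulkActions::SDN",
	path => 'sdn',
    });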


>> Maybe extract that into a function, since it seems to be the same code
>> as above?
>>
>> Or maybe even a do while would simplify things here? Haven't thought it
>> through 100%, just an idea:
>>
>>    do {
>>     // check for terminated workers and reap them
>>     // fill empty worker slots with new workers
>>    }
>>    while (workers_exist)
>>
>> Would maybe simplify things and not require the waiting part at the end?
> 
> it's not so easy sadly since the two blocks are not the same
> 
> we have two different mechanisms here
> 
> we have worker slots (max_workers) that we want to fill.
> while we are going through an order (e.g. for start/shutdown) we don't
> want to start with the next order while there are still workers running, so:
> 
> while we can still add workers, we loop over the existing ones until one is
> finished and queue the next. at the end of the 'order' we have to wait for all
> remaining workers before continuing to the next order.
> 

Yeah, I thought it'd be possible to do that loop for each order - but it
was just a quick thought I scribbled down to possibly avoid duplicating
code. I figured I was probably missing something.


