From: "Fabian Grünbichler" <f.gruenbichler@proxmox.com>
To: pve-devel@lists.proxmox.com
Subject: [pve-devel] [PATCH qemu-server 6/7] migrate: add remote migration handling
Date: Tue, 13 Apr 2021 14:16:39 +0200
Message-ID: <20210413121640.3602975-22-f.gruenbichler@proxmox.com>
In-Reply-To: <20210413121640.3602975-1-f.gruenbichler@proxmox.com>

remote migration uses a websocket connection to a task worker running on
the target node, instead of SSH commands, to control the migration. this
websocket tunnel is started earlier than the SSH tunnel would be, and allows
adding UNIX-socket forwarding over additional websocket connections on
demand.
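
for illustration, commands and replies travel over this tunnel as single
JSON lines on the helper's stdin/stdout (see write_tunnel() and
$forward_unix_socket below); a minimal, self-contained sketch of that
framing, with all concrete values (node names, VMIDs, socket paths) made up:

    use strict;
    use warnings;
    use JSON qw(encode_json decode_json);

    # control message handled by the local proxmox-websocket-tunnel helper:
    # forward a local UNIX socket to a socket path on the remote node
    my $forward = encode_json({
        control => JSON::true,
        cmd     => 'forward',
        data    => {
            unix   => '/run/qemu-server/100.storage',
            url    => 'https://target:8006/api2/json/nodes/target/qemu/200/mtunnelwebsocket'
                      . '?socket=/run/qemu-server/200.storage&',
            ticket => { path => '/run/qemu-server/200.storage' },
        },
    });

    # regular command, forwarded as-is to the remote mtunnel worker
    my $version = encode_json({ cmd => 'version' });
    print "$forward\n$version\n";

    # replies (tunnel version >= 2) carry at least a 'success' flag, and
    # optionally a 'msg' with error details
    my $reply = decode_json('{"success":true}');
    die "tunnel command failed\n" if !$reply->{success};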

the main differences to regular intra-cluster migration are:
- source VM config and disks are not removed upon migration
- shared storages are treated like local storages, since we can't
assume they are shared across clusters
- NBD-migrated disks are explicitly pre-allocated on the target node via a
tunnel command before starting the target VM instance
- in addition to storages, network bridges and the VMID itself are mapped
via user-defined mappings

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---

Notes:
    requires
    - mtunnel endpoints on remote node
    - proxmox-websocket-tunnel on local node
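
    a rough, self-contained sketch of the per-disk export pipeline implemented
    in sync_disks() below (command and field names as used in this patch;
    volume IDs, storage names and the bandwidth limit are made-up placeholders):

        use strict;
        use warnings;
        use JSON qw(encode_json);

        # 1. ask the remote mtunnel worker to prepare an import via the
        #    'disk-import' tunnel command
        my $disk_import = encode_json({
            cmd              => 'disk-import',
            volname          => 'vm-200-disk-0.qcow2',
            format           => 'qcow2',
            storage          => 'target-storage',
            'with-snapshots' => 0,
            'allow-rename'   => 1,
            'export-formats' => 'qcow2+size',
        });
        print "$disk_import\n";

        # 2. after forwarding a local UNIX socket to the socket returned by the
        #    remote side, pipe 'pvesm export' (optionally rate-limited through
        #    cstream) into it, then poll 'query-disk-import' until the new
        #    volid is reported
        my @export = ('pvesm', 'export', 'local:100/vm-100-disk-0.qcow2',
            'qcow2+size', '-', '-with-snapshots', 0);
        my @limit  = ('/usr/bin/cstream', '-t', 10 * 1024 * 1024); # bps
        print join(' ', @export), " | ", join(' ', @limit), "\n";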

 PVE/QemuMigrate.pm | 592 +++++++++++++++++++++++++++++++++++++++------
 PVE/QemuServer.pm  |   8 +-
 2 files changed, 521 insertions(+), 79 deletions(-)

diff --git a/PVE/QemuMigrate.pm b/PVE/QemuMigrate.pm
index 5d44c51..e1053d0 100644
--- a/PVE/QemuMigrate.pm
+++ b/PVE/QemuMigrate.pm
@@ -7,7 +7,13 @@ use IO::File;
 use IPC::Open2;
 use POSIX qw( WNOHANG );
 use Time::HiRes qw( usleep );
+use JSON qw(encode_json decode_json);
+use IO::Socket::UNIX;
+use Socket qw(SOCK_STREAM);
+use Storable qw(dclone);
+use URI::Escape;
 
+use PVE::APIClient::LWP;
 use PVE::Cluster;
 use PVE::INotify;
 use PVE::RPCEnvironment;
@@ -83,7 +89,7 @@ sub finish_command_pipe {
 	}
     }
 
-    $self->log('info', "ssh tunnel still running - terminating now with SIGTERM\n");
+    $self->log('info', "tunnel still running - terminating now with SIGTERM\n");
     kill(15, $cpid);
 
     # wait again
@@ -92,11 +98,11 @@ sub finish_command_pipe {
 	sleep(1);
     }
 
-    $self->log('info', "ssh tunnel still running - terminating now with SIGKILL\n");
+    $self->log('info', "tunnel still running - terminating now with SIGKILL\n");
     kill 9, $cpid;
     sleep 1;
 
-    $self->log('err', "ssh tunnel child process (PID $cpid) couldn't be collected\n")
+    $self->log('err', "tunnel child process (PID $cpid) couldn't be collected\n")
 	if !&$collect_child_process();
 }
 
@@ -113,18 +119,28 @@ sub read_tunnel {
     };
     die "reading from tunnel failed: $@\n" if $@;
 
-    chomp $output;
+    chomp $output if defined($output);
 
     return $output;
 }
 
 sub write_tunnel {
-    my ($self, $tunnel, $timeout, $command) = @_;
+    my ($self, $tunnel, $timeout, $command, $params) = @_;
 
     $timeout = 60 if !defined($timeout);
 
     my $writer = $tunnel->{writer};
 
+    if ($tunnel->{version} && $tunnel->{version} >= 2) {
+	my $object = defined($params) ? dclone($params) : {};
+	$object->{cmd} = $command;
+
+	$command = eval { JSON::encode_json($object) };
+
+	die "failed to encode command as JSON - $@\n"
+	    if $@;
+    }
+
     eval {
 	PVE::Tools::run_with_timeout($timeout, sub {
 	    print $writer "$command\n";
@@ -134,13 +150,29 @@ sub write_tunnel {
     die "writing to tunnel failed: $@\n" if $@;
 
     if ($tunnel->{version} && $tunnel->{version} >= 1) {
-	my $res = eval { $self->read_tunnel($tunnel, 10); };
+	my $res = eval { $self->read_tunnel($tunnel, $timeout); };
 	die "no reply to command '$command': $@\n" if $@;
 
-	if ($res eq 'OK') {
-	    return;
+	if ($tunnel->{version} == 1) {
+	    if ($res eq 'OK') {
+		return;
+	    } else {
+		die "tunnel replied '$res' to command '$command'\n";
+	    }
 	} else {
-	    die "tunnel replied '$res' to command '$command'\n";
+	    my $parsed = eval { JSON::decode_json($res) };
+	    die "failed to decode tunnel reply '$res' (command '$command') - $@\n"
+		if $@;
+
+	    if (!$parsed->{success}) {
+		if (defined($parsed->{msg})) {
+		    die "error - tunnel command '$command' failed - $parsed->{msg}\n";
+		} else {
+		    die "error - tunnel command '$command' failed\n";
+		}
+	    }
+
+	    return $parsed;
 	}
     }
 }
@@ -183,10 +215,149 @@ sub fork_tunnel {
     return $tunnel;
 }
 
+my $forward_unix_socket = sub {
+    my ($self, $local, $remote) = @_;
+
+    my $params = dclone($self->{tunnel}->{params});
+    $params->{unix} = $local;
+    $params->{url} = $params->{url} ."socket=$remote&";
+    $params->{ticket} = { path => $remote };
+
+    my $cmd = encode_json({
+	control => JSON::true,
+	cmd => 'forward',
+	data => $params,
+    });
+
+    my $writer = $self->{tunnel}->{writer};
+    eval {
+	unlink $local;
+	PVE::Tools::run_with_timeout(15, sub {
+	    print $writer "$cmd\n";
+	    $writer->flush();
+	});
+    };
+    die "failed to write forwarding command - $@\n" if $@;
+
+    $self->read_tunnel($self->{tunnel});
+
+    $self->log('info', "Forwarded local unix socket '$local' to remote '$remote' via websocket tunnel");
+};
+
+sub fork_websocket_tunnel {
+    my ($self, $storages) = @_;
+
+    my $remote = $self->{opts}->{remote};
+    my $conn = $remote->{conn};
+
+    my $websocket_url = "https://$conn->{host}:$conn->{port}/api2/json/nodes/$self->{node}/qemu/$remote->{vmid}/mtunnelwebsocket";
+
+    my $params = {
+	url => $websocket_url,
+    };
+
+    if (my $apitoken = $conn->{apitoken}) {
+	$params->{headers} = [["Authorization", "$apitoken"]];
+    } else {
+	die "can't connect to remote host without credentials\n";
+    }
+
+    if (my $fps = $conn->{cached_fingerprints}) {
+	$params->{fingerprint} = (keys %$fps)[0];
+    }
+
+    my $api_client = PVE::APIClient::LWP->new(%$conn);
+    my $storage_list = join(',', keys %$storages);
+    my $res = $api_client->post("/nodes/$self->{node}/qemu/$remote->{vmid}/mtunnel", { storages => $storage_list });
+    $self->log('info', "remote: started migration tunnel worker '$res->{upid}'");
+    $params->{url} .= "?ticket=".uri_escape($res->{ticket});
+    $params->{url} .= "&socket=$res->{socket}";
+
+    my $reader = IO::Pipe->new();
+    my $writer = IO::Pipe->new();
+
+    my $cpid = fork();
+    if ($cpid) {
+	$writer->writer();
+	$reader->reader();
+	my $tunnel = { writer => $writer, reader => $reader, pid => $cpid };
+
+	eval {
+	    my $writer = $tunnel->{writer};
+	    my $cmd = encode_json({
+		control => JSON::true,
+		cmd => 'connect',
+		data => $params,
+	    });
+
+	    eval {
+		PVE::Tools::run_with_timeout(15, sub {
+		    print {$writer} "$cmd\n";
+		    $writer->flush();
+		});
+	    };
+	    die "failed to write tunnel connect command - $@\n" if $@;
+	};
+	die "failed to connect via WS: $@\n" if $@;
+
+	my $err;
+        eval {
+	    my $writer = $tunnel->{writer};
+	    my $cmd = encode_json({
+		cmd => 'version',
+	    });
+
+	    eval {
+		PVE::Tools::run_with_timeout(15, sub {
+		    print {$writer} "$cmd\n";
+		    $writer->flush();
+		});
+	    };
+	    $err = "failed to write tunnel version command - $@\n" if $@;
+	    my $ver = $self->read_tunnel($tunnel, 10);
+	    $ver = JSON::decode_json($ver);
+	    $ver = $ver->{tunnel};
+
+	    if ($ver =~ /^(\d+)$/) {
+		$tunnel->{version} = $1;
+		$self->log('info', "tunnel info: $ver\n");
+	    } else {
+		$err = "received invalid tunnel version string '$ver'\n" if !$err;
+	    }
+	};
+	$err = $@ if !$err;
+
+	if ($err) {
+	    $self->finish_command_pipe($tunnel);
+	    die "can't open migration tunnel - $err";
+	}
+
+	$params->{url} = "$websocket_url?";
+	$tunnel->{params} = $params; # for forwarding
+
+	return $tunnel;
+    } else {
+	eval {
+	    $writer->reader();
+	    $reader->writer();
+	    PVE::Tools::run_command(
+		['proxmox-websocket-tunnel'],
+		input => "<&".fileno($writer),
+		output => ">&".fileno($reader),
+		errfunc => sub { my $line = shift; print "tunnel: $line\n"; },
+	    );
+	};
+	warn "CMD websocket tunnel died: $@\n" if $@;
+	exit 0;
+    }
+}
+
 sub finish_tunnel {
-    my ($self, $tunnel) = @_;
+    my ($self, $tunnel, $cleanup) = @_;
 
-    eval { $self->write_tunnel($tunnel, 30, 'quit'); };
+    $cleanup = $cleanup ? 1 : 0;
+
+    eval { $self->write_tunnel($tunnel, 30, 'quit', { cleanup => $cleanup }); };
     my $err = $@;
 
     $self->finish_command_pipe($tunnel, 30);
@@ -337,14 +508,19 @@ sub prepare {
 
     my $vollist = PVE::QemuServer::get_vm_volumes($conf);
 
+    my $storages = {};
     foreach my $volid (@$vollist) {
 	my ($sid, $volname) = PVE::Storage::parse_volume_id($volid, 1);
 
-	# check if storage is available on both nodes
 	my $targetsid = PVE::QemuServer::map_id($self->{opts}->{storagemap}, $sid);
+	$storages->{$targetsid} = 1;
 
+	# check if storage is available on source node
 	my $scfg = PVE::Storage::storage_check_node($self->{storecfg}, $sid);
-	PVE::Storage::storage_check_node($self->{storecfg}, $targetsid, $self->{node});
+
+	# check if storage is available on target node
+	PVE::Storage::storage_check_node($self->{storecfg}, $targetsid, $self->{node})
+	    if !$self->{opts}->{remote};
 
 	if ($scfg->{shared}) {
 	    # PVE::Storage::activate_storage checks this for non-shared storages
@@ -354,10 +530,16 @@ sub prepare {
 	}
     }
 
-    # test ssh connection
-    my $cmd = [ @{$self->{rem_ssh}}, '/bin/true' ];
-    eval { $self->cmd_quiet($cmd); };
-    die "Can't connect to destination address using public key\n" if $@;
+    if ($self->{opts}->{remote}) {
+	# test & establish websocket connection
+	$self->{tunnel} = $self->fork_websocket_tunnel($storages);
+	print "websocket tunnel started\n";
+    } else {
+	# test ssh connection
+	my $cmd = [ @{$self->{rem_ssh}}, '/bin/true' ];
+	eval { $self->cmd_quiet($cmd); };
+	die "Can't connect to destination address using public key\n" if $@;
+    }
 
     return $running;
 }
@@ -395,7 +577,7 @@ sub sync_disks {
 	my @sids = PVE::Storage::storage_ids($storecfg);
 	foreach my $storeid (@sids) {
 	    my $scfg = PVE::Storage::storage_config($storecfg, $storeid);
-	    next if $scfg->{shared};
+	    next if $scfg->{shared} && !$self->{opts}->{remote};
 	    next if !PVE::Storage::storage_check_enabled($storecfg, $storeid, undef, 1);
 
 	    # get list from PVE::Storage (for unused volumes)
@@ -403,15 +585,17 @@ sub sync_disks {
 
 	    next if @{$dl->{$storeid}} == 0;
 
-	    my $targetsid = PVE::QemuServer::map_id($self->{opts}->{storagemap}, $storeid);
-	    # check if storage is available on target node
-	    PVE::Storage::storage_check_node($storecfg, $targetsid, $self->{node});
+	    if (!$self->{opts}->{remote}) {
+		my $targetsid = PVE::QemuServer::map_id($self->{opts}->{storagemap}, $storeid);
+		# check if storage is available on target node
+		PVE::Storage::storage_check_node($storecfg, $targetsid, $self->{node});
 
-	    # grandfather in existing mismatches
-	    if ($targetsid ne $storeid) {
-		my $target_scfg = PVE::Storage::storage_config($storecfg, $targetsid);
-		die "content type 'images' is not available on storage '$targetsid'\n"
-		    if !$target_scfg->{content}->{images};
+		# grandfather in existing mismatches
+		if ($targetsid ne $storeid) {
+		    my $target_scfg = PVE::Storage::storage_config($storecfg, $targetsid);
+		    die "content type 'images' is not available on storage '$targetsid'\n"
+			if !$target_scfg->{content}->{images};
+		}
 	    }
 
 	    PVE::Storage::foreach_volid($dl, sub {
@@ -456,12 +640,16 @@ sub sync_disks {
 
 	    my ($sid, $volname) = PVE::Storage::parse_volume_id($volid);
 
-	    my $targetsid = PVE::QemuServer::map_id($self->{opts}->{storagemap}, $sid);
-	    # check if storage is available on both nodes
+	    # check if storage is available on source node
 	    my $scfg = PVE::Storage::storage_check_node($storecfg, $sid);
-	    PVE::Storage::storage_check_node($storecfg, $targetsid, $self->{node});
 
-	    return if $scfg->{shared};
+	    # check target storage on target node if intra-cluster migration
+	    if (!$self->{opts}->{remote}) {
+		my $targetsid = PVE::QemuServer::map_id($self->{opts}->{storagemap}, $sid);
+		PVE::Storage::storage_check_node($storecfg, $targetsid, $self->{node});
+
+		return if $scfg->{shared};
+	    }
 
 	    $local_volumes->{$volid}->{ref} = $attr->{referenced_in_config} ? 'config' : 'snapshot';
 	    $local_volumes->{$volid}->{ref} = 'storage' if $attr->{is_unused};
@@ -543,6 +731,9 @@ sub sync_disks {
 
 	    my $migratable = $scfg->{type} =~ /^(?:dir|zfspool|lvmthin|lvm)$/;
 
+	    # TODO: what is this even here for?
+	    $migratable = 1 if $self->{opts}->{remote};
+
 	    die "can't migrate '$volid' - storage type '$scfg->{type}' not supported\n"
 		if !$migratable;
 
@@ -553,6 +744,9 @@ sub sync_disks {
 	}
 
 	if ($self->{replication_jobcfg}) {
+	    die "can't migrate VM with replicated volumes to remote cluster/node\n"
+		if $self->{opts}->{remote};
+
 	    if ($self->{running}) {
 
 		my $version = PVE::QemuServer::kvm_user_version();
@@ -615,6 +809,7 @@ sub sync_disks {
 
 	$self->log('info', "copying local disk images") if scalar(%$local_volumes);
 
+	my $forwarded = 0;
 	foreach my $volid (sort keys %$local_volumes) {
 	    my ($sid, $volname) = PVE::Storage::parse_volume_id($volid);
 	    my $targetsid = PVE::QemuServer::map_id($self->{opts}->{storagemap}, $sid);
@@ -628,27 +823,121 @@ sub sync_disks {
 		next;
 	    } else {
 		next if $self->{replicated_volumes}->{$volid};
+
 		push @{$self->{volumes}}, $volid;
+		my $new_volid;
+		
 		my $opts = $self->{opts};
-		# use 'migrate' limit for transfer to other node
-		my $bwlimit = PVE::Storage::get_bandwidth_limit('migration', [$targetsid, $sid], $opts->{bwlimit});
-		# JSONSchema and get_bandwidth_limit use kbps - storage_migrate bps
-		$bwlimit = $bwlimit * 1024 if defined($bwlimit);
-
-		my $storage_migrate_opts = {
-		    'ratelimit_bps' => $bwlimit,
-		    'insecure' => $opts->{migration_type} eq 'insecure',
-		    'with_snapshots' => $local_volumes->{$volid}->{snapshots},
-		    'allow_rename' => !$local_volumes->{$volid}->{is_vmstate},
-		};
-
-		my $logfunc = sub { $self->log('info', $_[0]); };
-		my $new_volid = eval {
-		    PVE::Storage::storage_migrate($storecfg, $volid, $self->{ssh_info},
-						  $targetsid, $storage_migrate_opts, $logfunc);
-		};
-		if (my $err = $@) {
-		    die "storage migration for '$volid' to storage '$targetsid' failed - $err\n";
+		if (my $remote = $opts->{remote}) {
+		    my $remote_vmid = $remote->{vmid};
+		    my (undef, $name, undef, undef, undef, undef, $format) = PVE::Storage::parse_volname($storecfg, $volid);
+		    my $scfg = PVE::Storage::storage_config($storecfg, $sid);
+		    PVE::Storage::activate_volumes($storecfg, [$volid]);
+
+		    # use 'migrate' limit for transfer to other node
+		    # TODO: get limit from remote cluster as well?
+		    my $bwlimit = PVE::Storage::get_bandwidth_limit('migration', [$sid], $opts->{bwlimit});
+		    # JSONSchema and get_bandwidth_limit use kbps - storage_migrate bps
+		    $bwlimit = $bwlimit * 1024 if defined($bwlimit);
+
+		    my $with_snapshots = $local_volumes->{$volid}->{snapshots} ? 1 : 0;
+		    my $snapshot;
+		    if ($scfg->{type} eq 'zfspool') {
+			$snapshot = '__migration__';
+			$with_snapshots = 1;
+			PVE::Storage::volume_snapshot($storecfg, $volid, $snapshot);
+		    }
+
+		    if ($self->{vmid} != $remote_vmid) {
+			$name =~ s/-$self->{vmid}-/-$remote_vmid-/g;
+			$name =~ s/^$self->{vmid}\//$remote_vmid\//;
+		    }
+
+		    my @export_formats = PVE::Storage::volume_export_formats($storecfg, $volid, undef, undef, $with_snapshots);
+
+		    my $storage_migrate_opts = {
+			format => $format,
+			storage => $targetsid,
+			'with-snapshots' => $with_snapshots,
+			'allow-rename' => !$local_volumes->{$volid}->{is_vmstate},
+			'export-formats' => join(',', @export_formats),
+			volname => $name,
+		    };
+		    my $res = $self->write_tunnel($self->{tunnel}, 600, 'disk-import', $storage_migrate_opts);
+		    my $local = "/run/qemu-server/$self->{vmid}.storage";
+		    if (!$forwarded) {
+			$forward_unix_socket->($self, $local, $res->{socket});
+			$forwarded = 1;
+		    }
+		    my $socket = IO::Socket::UNIX->new(Peer => $local, Type => SOCK_STREAM())
+			or die "failed to connect to websocket tunnel at $local\n";
+		    # we won't be reading from the socket
+		    shutdown($socket, 0);
+		    my $send = ['pvesm', 'export', $volid, $res->{format}, '-', '-with-snapshots', $with_snapshots];
+		    push @$send, '-snapshot', $snapshot if $snapshot;
+
+		    my @cstream;
+		    if (defined($bwlimit)) {
+			@cstream = ([ '/usr/bin/cstream', '-t', $bwlimit ]);
+			$self->log('info', "using a bandwidth limit of $bwlimit bps for transferring '$volid'");
+		    }
+
+		    eval {
+			PVE::Tools::run_command(
+			    [$send, @cstream],
+			    output => '>&'.fileno($socket),
+			    errfunc => sub { my $line = shift; $self->log('warn', $line); },
+			);
+		    };
+		    my $send_error = $@;
+
+		    # don't close the connection entirely otherwise the
+		    # receiving end might not get all buffered data (and
+		    # fails with 'connection reset by peer')
+		    shutdown($socket, 1);
+
+		    # wait for the remote process to finish
+		    while ($res = $self->write_tunnel($self->{tunnel}, 10, 'query-disk-import')) {
+			if ($res->{status} eq 'pending') {
+			    $self->log('info', "waiting for disk import to finish..\n");
+			    sleep(1)
+			} elsif ($res->{status} eq 'complete') {
+			    $new_volid = $res->{volid};
+			    last;
+			} else {
+			    die "unknown query-disk-import result: $res->{status}\n";
+			}
+		    }
+
+		    # now close the socket
+		    close($socket);
+		    die $send_error if $send_error;
+		} else {
+		    # use 'migrate' limit for transfer to other node
+		    my $bwlimit = PVE::Storage::get_bandwidth_limit('migration', [$sid], $opts->{bwlimit});
+		    # JSONSchema and get_bandwidth_limit use kbps - storage_migrate bps
+		    $bwlimit = $bwlimit * 1024 if defined($bwlimit);
+		    my $storage_migrate_opts = {
+			'ratelimit_bps' => $bwlimit,
+			'insecure' => $opts->{migration_type} eq 'insecure',
+			'with_snapshots' => $local_volumes->{$volid}->{snapshots},
+			'allow_rename' => !$local_volumes->{$volid}->{is_vmstate},
+		    };
+
+		    my $logfunc = sub { $self->log('info', $_[0]); };
+		    $new_volid = eval {
+			PVE::Storage::storage_migrate(
+			    $storecfg,
+			    $volid,
+			    $self->{ssh_info},
+			    $targetsid,
+			    $storage_migrate_opts,
+			    $logfunc,
+			);
+		    };
+		    if (my $err = $@) {
+			die "storage migration for '$volid' to storage '$targetsid' failed - $err\n";
+		    }
 		}
 
 		$self->{volume_map}->{$volid} = $new_volid;
@@ -667,6 +956,12 @@ sub sync_disks {
 sub cleanup_remotedisks {
     my ($self) = @_;
 
+    if ($self->{opts}->{remote}) {
+	$self->finish_tunnel($self->{tunnel}, 1);
+	delete $self->{tunnel};
+	return;
+    }
+
     foreach my $target_drive (keys %{$self->{target_drive}}) {
 	my $drivestr = $self->{target_drive}->{$target_drive}->{drivestr};
 	next if !defined($drivestr);
@@ -714,8 +1009,71 @@ sub phase1 {
     # sync_disks fixes disk sizes to match their actual size, write changes so
     # target allocates correct volumes
     PVE::QemuConfig->write_config($vmid, $conf);
+
+    $self->phase1_remote($vmid) if $self->{opts}->{remote};
 };
 
+sub phase1_remote {
+    my ($self, $vmid) = @_;
+
+    my $remote_conf = PVE::QemuConfig->load_config($vmid);
+    PVE::QemuConfig->update_volume_ids($remote_conf, $self->{volume_map});
+
+    my $storage_map = $self->{opts}->{storagemap};
+    $self->{nbd} = {};
+    PVE::QemuConfig->foreach_volume($remote_conf, sub {
+	my ($ds, $drive) = @_;
+
+	# TODO eject CDROM?
+	return if PVE::QemuServer::drive_is_cdrom($drive);
+
+	my $volid = $drive->{file};
+	return if !$volid;
+
+	return if !grep { $_ eq $volid} @{$self->{online_local_volumes}};
+
+	my ($storeid, $volname) = PVE::Storage::parse_volume_id($volid);
+	my $scfg = PVE::Storage::storage_config($self->{storecfg}, $storeid);
+	my $source_format = PVE::QemuServer::qemu_img_format($scfg, $volname);
+
+	# set by target cluster
+	my $oldvolid = delete $drive->{file};
+	delete $drive->{format};
+
+	my $targetsid = PVE::QemuServer::map_id($storage_map, $storeid);
+
+	my $params = {
+	    format => $source_format,
+	    storage => $targetsid,
+	    drive => $drive,
+	};
+
+	$self->log('info', "Allocating volume for drive '$ds' on remote storage '$targetsid'..");
+	my $res = $self->write_tunnel($self->{tunnel}, 600, 'disk', $params);
+
+	$self->log('info', "mapped: $oldvolid => $res->{volid}");
+	$remote_conf->{$ds} = $res->{drivestr};
+	$self->{nbd}->{$ds} = $res;
+    });
+
+    # TODO: check bridge availability earlier?
+    my $bridgemap = $self->{opts}->{bridgemap};
+    foreach my $opt (keys %$remote_conf) {
+	next if $opt !~ m/^net\d+$/;
+
+	next if !$remote_conf->{$opt};
+	my $d = PVE::QemuServer::parse_net($remote_conf->{$opt});
+	next if !$d || !$d->{bridge};
+
+	my $target_bridge = PVE::QemuServer::map_id($bridgemap, $d->{bridge});
+	$self->log('info', "mapped: $opt from $d->{bridge} to $target_bridge");
+	$d->{bridge} = $target_bridge;
+	$remote_conf->{$opt} = PVE::QemuServer::print_net($d);
+    }
+
+    $self->write_tunnel($self->{tunnel}, 10, 'config', { conf => $remote_conf });
+}
+
 sub phase1_cleanup {
     my ($self, $vmid, $err) = @_;
 
@@ -863,6 +1221,28 @@ sub phase2_start_local_cluster {
     return ($tunnel_info, $spice_port);
 }
 
+sub phase2_start_remote_cluster {
+    my ($self, $vmid, $params) = @_;
+
+    die "insecure migration to remote cluster not implemented\n"
+	if $params->{migrate_opts}->{type} ne 'websocket';
+
+    my $remote_vmid = $self->{opts}->{remote}->{vmid};
+
+    my $res = $self->write_tunnel($self->{tunnel}, 10, "start", $params);
+
+    foreach my $drive (keys %{$res->{drives}}) {
+	$self->{target_drive}->{$drive}->{drivestr} = $res->{drives}->{$drive}->{drivestr};
+	my $nbd_uri = $res->{drives}->{$drive}->{nbd_uri};
+	die "unexpected NBD uri for '$drive': $nbd_uri\n"
+	    if $nbd_uri !~ s!/run/qemu-server/$remote_vmid\_!/run/qemu-server/$vmid\_!;
+
+	$self->{target_drive}->{$drive}->{nbd_uri} = $nbd_uri;
+    }
+
+    return ($res->{migrate}, $res->{spice_port});
+}
+
 sub phase2 {
     my ($self, $vmid) = @_;
 
@@ -898,10 +1278,39 @@ sub phase2 {
 	},
     };
 
-    my ($tunnel_info, $spice_port) = $self->phase2_start_local_cluster($vmid, $params);
+    my ($tunnel_info, $spice_port);
+
+    if (my $remote = $self->{opts}->{remote}) {
+	my $remote_vmid = $remote->{vmid};
+	$params->{migrate_opts}->{remote_node} = $self->{node};
+	($tunnel_info, $spice_port) = $self->phase2_start_remote_cluster($vmid, $params);
+	die "only UNIX sockets are supported for remote migration\n"
+	    if $tunnel_info->{proto} ne 'unix';
+
+	my $forwarded = {};
+	my $remote_socket = $tunnel_info->{addr};
+	my $local_socket = $remote_socket;
+	$local_socket =~ s/$remote_vmid/$vmid/g;
+	$tunnel_info->{addr} = $local_socket;
+
+	$self->log('info', "forwarding migration socket '$local_socket' => '$remote_socket'");
+	$forward_unix_socket->($self, $local_socket, $remote_socket);
+	$forwarded->{$local_socket} = 1;
+
+	foreach my $remote_socket (@{$tunnel_info->{unix_sockets}}) {
+	    my $local_socket = $remote_socket;
+	    $local_socket =~ s/$remote_vmid/$vmid/g;
+	    next if $forwarded->{$local_socket};
+	    $self->log('info', "forwarding migration socket '$local_socket' => '$remote_socket'");
+	    $forward_unix_socket->($self, $local_socket, $remote_socket);
+	    $forwarded->{$local_socket} = 1;
+	}
+    } else {
+	($tunnel_info, $spice_port) = $self->phase2_start_local_cluster($vmid, $params);
 
-    $self->log('info', "start remote tunnel");
-    $self->start_remote_tunnel($tunnel_info);
+	$self->log('info', "start remote tunnel");
+	$self->start_remote_tunnel($tunnel_info);
+    }
 
     my $migrate_uri = "$tunnel_info->{proto}:$tunnel_info->{addr}";
     $migrate_uri .= ":$tunnel_info->{port}"
@@ -923,13 +1332,15 @@ sub phase2 {
 	    my $nbd_uri = $target->{nbd_uri};
 
 	    my $source_drive = PVE::QemuServer::parse_drive($drive, $conf->{$drive});
-	    my $target_drive = PVE::QemuServer::parse_drive($drive, $target->{drivestr});
-
 	    my $source_volid = $source_drive->{file};
-	    my $target_volid = $target_drive->{file};
-
 	    my $source_sid = PVE::Storage::Plugin::parse_volume_id($source_volid);
-	    my $target_sid = PVE::Storage::Plugin::parse_volume_id($target_volid);
+
+	    my ($target_volid, $target_sid);
+	    if (!$self->{opts}->{remote}) {
+		my $target_drive = PVE::QemuServer::parse_drive($drive, $target->{drivestr});
+		$target_volid = $target_drive->{file};
+		$target_sid = PVE::Storage::Plugin::parse_volume_id($target_volid);
+	    }
 
 	    my $bwlimit = PVE::Storage::get_bandwidth_limit('migration', [$source_sid, $target_sid], $opt_bwlimit);
 	    my $bitmap = $target->{bitmap};
@@ -937,8 +1348,10 @@ sub phase2 {
 	    $self->log('info', "$drive: start migration to $nbd_uri");
 	    PVE::QemuServer::qemu_drive_mirror($vmid, $drive, $nbd_uri, $vmid, undef, $self->{storage_migration_jobs}, 'skip', undef, $bwlimit, $bitmap);
 
-	    $self->{volume_map}->{$source_volid} = $target_volid;
-	    $self->log('info', "volume '$source_volid' is '$target_volid' on the target\n");
+	    if ($target_volid) {
+		$self->{volume_map}->{$source_volid} = $target_volid;
+		$self->log('info', "volume '$source_volid' is '$target_volid' on the target\n");
+	    }
 	}
     }
 
@@ -995,7 +1408,7 @@ sub phase2 {
     };
     $self->log('info', "migrate-set-parameters error: $@") if $@;
 
-    if (PVE::QemuServer::vga_conf_has_spice($conf->{vga})) {
+    if (PVE::QemuServer::vga_conf_has_spice($conf->{vga}) && !$self->{opts}->{remote}) {
 	my $rpcenv = PVE::RPCEnvironment::get();
 	my $authuser = $rpcenv->get_user();
 
@@ -1159,11 +1572,15 @@ sub phase2_cleanup {
 
     my $nodename = PVE::INotify::nodename();
 
-    my $cmd = [@{$self->{rem_ssh}}, 'qm', 'stop', $vmid, '--skiplock', '--migratedfrom', $nodename];
-    eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub {}) };
-    if (my $err = $@) {
-        $self->log('err', $err);
-        $self->{errors} = 1;
+    if ($self->{tunnel} && $self->{tunnel}->{version} >= 2) {
+	$self->write_tunnel($self->{tunnel}, 10, 'stop');
+    } else {
+	my $cmd = [@{$self->{rem_ssh}}, 'qm', 'stop', $vmid, '--skiplock', '--migratedfrom', $nodename];
+	eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub {}) };
+	if (my $err = $@) {
+	    $self->log('err', $err);
+	    $self->{errors} = 1;
+	}
     }
 
     # cleanup after stopping, otherwise disks might be in-use by target VM!
@@ -1188,6 +1605,8 @@ sub phase3 {
     my $volids = $self->{volumes};
     return if $self->{phase2errors};
 
+    return if $self->{opts}->{remote};
+
     # destroy local copies
     foreach my $volid (@$volids) {
 	eval { PVE::Storage::vdisk_free($self->{storecfg}, $volid); };
@@ -1220,7 +1639,7 @@ sub phase3_cleanup {
 	}
     }
 
-    if ($self->{volume_map}) {
+    if ($self->{volume_map} && !$self->{opts}->{remote}) {
 	my $target_drives = $self->{target_drive};
 
 	# FIXME: for NBD storage migration we now only update the volid, and
@@ -1237,26 +1656,35 @@ sub phase3_cleanup {
 
     # transfer replication state before move config
     $self->transfer_replication_state() if $self->{is_replicated};
-    PVE::QemuConfig->move_config_to_node($vmid, $self->{node});
+    if ($self->{opts}->{remote}) {
+	# TODO decide whether to remove or lock&keep here
+    } else {
+	PVE::QemuConfig->move_config_to_node($vmid, $self->{node});
+    }
     $self->switch_replication_job_target() if $self->{is_replicated};
 
     if ($self->{livemigration}) {
 	if ($self->{stopnbd}) {
 	    $self->log('info', "stopping NBD storage migration server on target.");
 	    # stop nbd server on remote vm - requirement for resume since 2.9
-	    my $cmd = [@{$self->{rem_ssh}}, 'qm', 'nbdstop', $vmid];
+	    if ($tunnel && $tunnel->{version} && $tunnel->{version} >= 2) {
+		$self->write_tunnel($tunnel, 30, 'nbdstop');
+	    } else {
+		my $cmd = [@{$self->{rem_ssh}}, 'qm', 'nbdstop', $vmid];
 
-	    eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub {}) };
-	    if (my $err = $@) {
-		$self->log('err', $err);
-		$self->{errors} = 1;
+		eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub {}) };
+		if (my $err = $@) {
+		    $self->log('err', $err);
+		    $self->{errors} = 1;
+		}
 	    }
 	}
 
 	# config moved and nbd server stopped - now we can resume vm on target
 	if ($tunnel && $tunnel->{version} && $tunnel->{version} >= 1) {
+	    my $cmd = $tunnel->{version} == 1 ? "resume $vmid" : "resume";
 	    eval {
-		$self->write_tunnel($tunnel, 30, "resume $vmid");
+		$self->write_tunnel($tunnel, 30, $cmd);
 	    };
 	    if (my $err = $@) {
 		$self->log('err', $err);
@@ -1276,18 +1704,21 @@ sub phase3_cleanup {
 	}
 
 	if ($self->{storage_migration} && PVE::QemuServer::parse_guest_agent($conf)->{fstrim_cloned_disks} && $self->{running}) {
+	    # TODO mtunnel command
 	    my $cmd = [@{$self->{rem_ssh}}, 'qm', 'guest', 'cmd', $vmid, 'fstrim'];
 	    eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub {}) };
 	}
     }
 
     # close tunnel on successful migration, on error phase2_cleanup closed it
-    if ($tunnel) {
+    if ($tunnel && $tunnel->{version} == 1) {
 	eval { finish_tunnel($self, $tunnel);  };
 	if (my $err = $@) {
 	    $self->log('err', $err);
 	    $self->{errors} = 1;
 	}
+	$tunnel = undef;
+	delete $self->{tunnel};
     }
 
     eval {
@@ -1321,7 +1752,8 @@ sub phase3_cleanup {
 	$self->{errors} = 1;
     }
 
-    if($self->{storage_migration}) {
+    # TODO decide whether we want a "keep source VM" option for remote migration
+    if ($self->{storage_migration} && !$self->{opts}->{remote}) {
 	# destroy local copies
 	my $volids = $self->{online_local_volumes};
 
@@ -1340,8 +1772,14 @@ sub phase3_cleanup {
     }
 
     # clear migrate lock
-    my $cmd = [ @{$self->{rem_ssh}}, 'qm', 'unlock', $vmid ];
-    $self->cmd_logerr($cmd, errmsg => "failed to clear migrate lock");
+    if ($tunnel && $tunnel->{version} >= 2) {
+	$self->write_tunnel($tunnel, 10, "unlock");
+
+	$self->finish_tunnel($tunnel);
+    } else {
+	my $cmd = [ @{$self->{rem_ssh}}, 'qm', 'unlock', $vmid ];
+	$self->cmd_logerr($cmd, errmsg => "failed to clear migrate lock");
+    }
 }
 
 sub final_cleanup {
diff --git a/PVE/QemuServer.pm b/PVE/QemuServer.pm
index a131fc8..a713258 100644
--- a/PVE/QemuServer.pm
+++ b/PVE/QemuServer.pm
@@ -5094,7 +5094,11 @@ sub vm_start_nolock {
     my $defaults = load_defaults();
 
     # set environment variable useful inside network script
-    $ENV{PVE_MIGRATED_FROM} = $migratedfrom if $migratedfrom;
+    if ($migrate_opts->{remote_node}) {
+	$ENV{PVE_MIGRATED_FROM} = $migrate_opts->{remote_node};
+    } elsif ($migratedfrom) {
+	$ENV{PVE_MIGRATED_FROM} = $migratedfrom;
+    }
 
     PVE::GuestHelpers::exec_hookscript($conf, $vmid, 'pre-start', 1);
 
@@ -5310,7 +5314,7 @@ sub vm_start_nolock {
 
 	my $migrate_storage_uri;
 	# nbd_protocol_version > 0 for unix socket support
-	if ($nbd_protocol_version > 0 && $migration_type eq 'secure') {
+	if ($nbd_protocol_version > 0 && ($migration_type eq 'secure' || $migration_type eq 'websocket')) {
 	    my $socket_path = "/run/qemu-server/$vmid\_nbd.migrate";
 	    mon_cmd($vmid, "nbd-server-start", addr => { type => 'unix', data => { path => $socket_path } } );
 	    $migrate_storage_uri = "nbd:unix:$socket_path";
-- 
2.20.1





Thread overview: 40+ messages
2021-04-13 12:16 [pve-devel] [RFC qemu-server++ 0/22] remote migration Fabian Grünbichler
2021-04-13 12:16 ` [pve-devel] [PATCH proxmox 1/2] websocket: make field public Fabian Grünbichler
2021-04-13 12:16 ` [pve-devel] [PATCH proxmox 2/2] websocket: adapt for client connection Fabian Grünbichler
2021-04-13 12:16 ` [pve-devel] [PATCH proxmox-websocket-tunnel 1/2] initial commit Fabian Grünbichler
2021-04-13 12:16 ` [pve-devel] [PATCH proxmox-websocket-tunnel 2/2] add tunnel implementation Fabian Grünbichler
2021-04-13 12:16 ` [pve-devel] [PATCH access-control 1/2] tickets: add tunnel ticket Fabian Grünbichler
2021-04-13 12:16 ` [pve-devel] [PATCH access-control 2/2] ticket: normalize path for verification Fabian Grünbichler
2021-04-13 12:16 ` [pve-devel] [PATCH cluster 1/4] remote.cfg: add new config file Fabian Grünbichler
2021-04-13 12:16 ` [pve-devel] [PATCH cluster 2/4] add get_remote_info Fabian Grünbichler
2021-04-18 17:07   ` Thomas Lamprecht
2021-04-19  7:48     ` Fabian Grünbichler
2021-04-13 12:16 ` [pve-devel] [PATCH cluster 3/4] remote: add option/completion Fabian Grünbichler
2021-04-13 12:16 ` [pve-devel] [PATCH cluster 4/4] get_remote_info: also return FP if available Fabian Grünbichler
2021-04-13 12:16 ` [pve-devel] [PATCH common 1/2] schema: pull out abstract 'id-pair' verifier Fabian Grünbichler
2021-04-16 10:24   ` [pve-devel] applied: " Thomas Lamprecht
2021-04-19  8:43     ` [pve-devel] [PATCH common] fixup: remove double braces Stefan Reiter
2021-04-19  9:56       ` [pve-devel] applied: " Thomas Lamprecht
2021-04-13 12:16 ` [pve-devel] [PATCH common 2/2] schema: add pve-bridge-id option/format/pair Fabian Grünbichler
2021-04-16  9:53   ` Thomas Lamprecht
2021-04-16 10:10     ` Fabian Grünbichler
2021-04-13 12:16 ` [pve-devel] [PATCH guest-common] migrate: handle migration_network with remote migration Fabian Grünbichler
2021-04-13 12:16 ` [pve-devel] [PATCH manager] API: add node address(es) API endpoint Fabian Grünbichler
2021-04-16 10:17   ` Thomas Lamprecht
2021-04-16 11:37     ` Fabian Grünbichler
2021-04-13 12:16 ` [pve-devel] [PATCH storage] import: allow import from UNIX socket Fabian Grünbichler
2021-04-16 10:24   ` [pve-devel] applied: " Thomas Lamprecht
2021-04-13 12:16 ` [pve-devel] [PATCH qemu-server 1/7] migrate: factor out storage checks Fabian Grünbichler
2021-04-13 12:16 ` [pve-devel] [PATCH qemu-server 2/7] refactor map_storage to map_id Fabian Grünbichler
2021-04-13 12:16 ` [pve-devel] [PATCH qemu-server 3/7] schema: use pve-bridge-id Fabian Grünbichler
2021-04-13 12:16 ` [pve-devel] [PATCH qemu-server 4/7] mtunnel: add API endpoints Fabian Grünbichler
2021-04-13 12:16 ` [pve-devel] [PATCH qemu-server 5/7] migrate: refactor remote VM/tunnel start Fabian Grünbichler
2021-04-13 12:16 ` Fabian Grünbichler [this message]
2021-04-13 12:16 ` [pve-devel] [PATCH qemu-server 7/7] api: add remote migrate endpoint Fabian Grünbichler
2021-04-15 14:04 ` [pve-devel] [RFC qemu-server++ 0/22] remote migration alexandre derumier
2021-04-15 14:32   ` Fabian Grünbichler
2021-04-15 14:36     ` Thomas Lamprecht
2021-04-15 16:38     ` Moula BADJI
2021-05-05  6:02       ` aderumier
2021-05-05  9:22         ` Dominik Csapak
2021-04-16  7:36     ` alexandre derumier
