From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id ECE787D18A for ; Mon, 8 Nov 2021 14:08:38 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 4F387252B2 for ; Mon, 8 Nov 2021 14:08:08 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 8728F2518B for ; Mon, 8 Nov 2021 14:08:00 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 5F5D1422F3 for ; Mon, 8 Nov 2021 14:08:00 +0100 (CET) From: Dominik Csapak To: pve-devel@lists.proxmox.com Date: Mon, 8 Nov 2021 14:07:54 +0100 Message-Id: <20211108130758.160914-4-d.csapak@proxmox.com> X-Mailer: git-send-email 2.30.2 In-Reply-To: <20211108130758.160914-1-d.csapak@proxmox.com> References: <20211108130758.160914-1-d.csapak@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.235 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pve-devel] [PATCH manager v2 2/6] add PVE/Jobs to handle VZDump jobs X-BeenThere: pve-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox VE development discussion List-Unsubscribe: , 
List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Mon, 08 Nov 2021 13:08:39 -0000 this adds a SectionConfig handling for jobs (only 'vzdump' for now) that represents a job that will be handled by pvescheduler and a basic 'job-state' handling for reading/writing state json files. this has some intersections with pvesrs state handling, but does not use a single state file for all jobs, but separate ones, like we do it in the backup-server. Signed-off-by: Dominik Csapak --- PVE/Jobs.pm | 286 +++++++++++++++++++++++++++++++++++++++++++++ PVE/Jobs/Makefile | 16 +++ PVE/Jobs/Plugin.pm | 61 ++++++++++ PVE/Jobs/VZDump.pm | 54 +++++++++ PVE/Makefile | 3 +- 5 files changed, 419 insertions(+), 1 deletion(-) create mode 100644 PVE/Jobs.pm create mode 100644 PVE/Jobs/Makefile create mode 100644 PVE/Jobs/Plugin.pm create mode 100644 PVE/Jobs/VZDump.pm diff --git a/PVE/Jobs.pm b/PVE/Jobs.pm new file mode 100644 index 00000000..2fe197d2 --- /dev/null +++ b/PVE/Jobs.pm @@ -0,0 +1,286 @@ +package PVE::Jobs; + +use strict; +use warnings; +use JSON; + +use PVE::Cluster qw(cfs_read_file cfs_lock_file); +use PVE::Jobs::Plugin; +use PVE::Jobs::VZDump; +use PVE::Tools; + +PVE::Jobs::VZDump->register(); +PVE::Jobs::Plugin->init(); + +my $state_dir = "/var/lib/pve-manager/jobs"; +my $lock_dir = "/var/lock/pve-manager"; + +my $get_state_file = sub { + my ($jobid, $type) = @_; + return "$state_dir/$type-$jobid.json"; +}; + +my $default_state = { + state => 'created', + time => 0, +}; + +# lockless, since we use file_get_contents, which is atomic +sub read_job_state { + my ($jobid, $type) = @_; + my $path = $get_state_file->($jobid, $type); + return if !
-e $path; + + my $raw = PVE::Tools::file_get_contents($path); + + return $default_state if $raw eq ''; + + # untaint $raw + if ($raw =~ m/^(\{.*\})$/) { + return decode_json($1); + } + + die "invalid json data in '$path'\n"; +} + +sub lock_job_state { + my ($jobid, $type, $sub) = @_; + + my $filename = "$lock_dir/$type-$jobid.lck"; + + my $res = PVE::Tools::lock_file($filename, 10, $sub); + die $@ if $@; + + return $res; +} + +my $get_job_status = sub { + my ($state) = @_; + + if (!defined($state->{upid})) { + return; # not started + } + + my ($task, $filename) = PVE::Tools::upid_decode($state->{upid}, 1); + die "unable to parse worker upid - $state->{upid}\n" if !$task; + die "no such task\n" if ! -f $filename; + + my $pstart = PVE::ProcFSTools::read_proc_starttime($task->{pid}); + if ($pstart && $pstart == $task->{pstart}) { + return; # still running + } + + return PVE::Tools::upid_read_status($state->{upid}); +}; + +# checks if the job is already finished if it was started before and +# updates the statefile accordingly +sub update_job_stopped { + my ($jobid, $type) = @_; + + # first check unlocked to save time, + my $state = read_job_state($jobid, $type); + return if !defined($state) || $state->{state} ne 'started'; # removed or not started + + if (defined($get_job_status->($state))) { + lock_job_state($jobid, $type, sub { + my $state = read_job_state($jobid, $type); + return if !defined($state) || $state->{state} ne 'started'; # removed or not started + + my $status = $get_job_status->($state); + + my $new_state = { + state => 'stopped', + msg => $status, + upid => $state->{upid} + }; + + if ($state->{updated}) { # save updated time stamp + $new_state->{updated} = $state->{updated}; + } + + my $path = $get_state_file->($jobid, $type); + PVE::Tools::file_set_contents($path, encode_json($new_state)); + }); + } +} + +# must be called when the job is first created +sub create_job { + my ($jobid, $type) = @_; + + lock_job_state($jobid, $type, sub { + my $state = 
read_job_state($jobid, $type) // $default_state; + + if ($state->{state} ne 'created') { + die "job state already exists\n"; + } + + $state->{time} = time(); + + my $path = $get_state_file->($jobid, $type); + PVE::Tools::file_set_contents($path, encode_json($state)); + }); +} + +# to be called when the job is removed +sub remove_job { + my ($jobid, $type) = @_; + my $path = $get_state_file->($jobid, $type); + unlink $path; +} + +# checks if the job can be started and sets the state to 'starting' +# returns 1 if the job can be started, 0 otherwise +sub starting_job { + my ($jobid, $type) = @_; + + # first check unlocked to save time + my $state = read_job_state($jobid, $type); + return 0 if !defined($state) || $state->{state} eq 'started'; # removed or already started + + lock_job_state($jobid, $type, sub { + my $state = read_job_state($jobid, $type); + return 0 if !defined($state) || $state->{state} eq 'started'; # removed or already started + + my $new_state = { + state => 'starting', + time => time(), + }; + + my $path = $get_state_file->($jobid, $type); + PVE::Tools::file_set_contents($path, encode_json($new_state)); + }); + return 1; +} + +sub started_job { + my ($jobid, $type, $upid, $err) = @_; + lock_job_state($jobid, $type, sub { + my $state = read_job_state($jobid, $type); + return if !defined($state); # job was removed, do not update + die "unexpected state '$state->{state}'\n" if $state->{state} ne 'starting'; + + my $new_state; + if (defined($err)) { + $new_state = { + state => 'stopped', + msg => $err, + time => time(), + }; + } else { + $new_state = { + state => 'started', + upid => $upid, + }; + } + + my $path = $get_state_file->($jobid, $type); + PVE::Tools::file_set_contents($path, encode_json($new_state)); + }); +} + +# will be called when the job schedule is updated +sub updated_job_schedule { + my ($jobid, $type) = @_; + lock_job_state($jobid, $type, sub { + my $old_state = read_job_state($jobid, $type) // $default_state; + + 
$old_state->{updated} = time(); + + my $path = $get_state_file->($jobid, $type); + PVE::Tools::file_set_contents($path, encode_json($old_state)); + }); +} + +sub get_last_runtime { + my ($jobid, $type) = @_; + + my $state = read_job_state($jobid, $type) // $default_state; + + return $state->{updated} if defined($state->{updated}); + + if (my $upid = $state->{upid}) { + my ($task) = PVE::Tools::upid_decode($upid, 1); + die "unable to parse worker upid\n" if !$task; + return $task->{starttime}; + } + + return $state->{time} // 0; +} + +sub run_jobs { + synchronize_job_states_with_config(); + + my $jobs_cfg = cfs_read_file('jobs.cfg'); + my $nodename = PVE::INotify::nodename(); + + foreach my $id (sort keys %{$jobs_cfg->{ids}}) { + my $cfg = $jobs_cfg->{ids}->{$id}; + my $type = $cfg->{type}; + my $schedule = delete $cfg->{schedule}; + + # only schedule local jobs + next if defined($cfg->{node}) && $cfg->{node} ne $nodename; + + eval { + update_job_stopped($id, $type); + }; + if (my $err = $@) { + warn "could not update job state, skipping - $err\n"; + next; + } + + # only schedule enabled jobs + next if defined($cfg->{enabled}) && !$cfg->{enabled}; + + my $last_run = get_last_runtime($id, $type); + my $calspec = PVE::CalendarEvent::parse_calendar_event($schedule); + my $next_sync = PVE::CalendarEvent::compute_next_event($calspec, $last_run) // 0; + + if (time() >= $next_sync) { + my $plugin = PVE::Jobs::Plugin->lookup($type); + if (starting_job($id, $type)) { + my $upid = eval { $plugin->run($cfg) }; + if (my $err = $@) { + warn $@ if $@; + started_job($id, $type, undef, $err); + } elsif ($upid eq 'OK') { # some jobs return OK immediatly + started_job($id, $type, undef, 'OK'); + } else { + started_job($id, $type, $upid); + } + } + } + } +} + +# creates and removes statefiles for job configs +sub synchronize_job_states_with_config { + cfs_lock_file('jobs.cfg', undef, sub { + my $data = cfs_read_file('jobs.cfg'); + + for my $id (keys $data->{ids}->%*) { + my $job = 
$data->{ids}->{$id}; + my $type = $job->{type}; + my $jobstate = read_job_state($id, $type); + create_job($id, $type) if !defined($jobstate); + } + + PVE::Tools::dir_glob_foreach($state_dir, '(.*?)-(.*).json', sub { + my ($path, $type, $id) = @_; + + if (!defined($data->{ids}->{$id})) { + remove_job($id, $type); + } + }); + }); + die $@ if $@; +} + +sub setup_dirs { + mkdir $state_dir; + mkdir $lock_dir; +} + +1; diff --git a/PVE/Jobs/Makefile b/PVE/Jobs/Makefile new file mode 100644 index 00000000..6023c3ba --- /dev/null +++ b/PVE/Jobs/Makefile @@ -0,0 +1,16 @@ +include ../../defines.mk + +PERLSOURCE = \ + Plugin.pm\ + VZDump.pm + +all: + +.PHONY: clean +clean: + rm -rf *~ + +.PHONY: install +install: ${PERLSOURCE} + install -d ${PERLLIBDIR}/PVE/Jobs + install -m 0644 ${PERLSOURCE} ${PERLLIBDIR}/PVE/Jobs diff --git a/PVE/Jobs/Plugin.pm b/PVE/Jobs/Plugin.pm new file mode 100644 index 00000000..69c31cf2 --- /dev/null +++ b/PVE/Jobs/Plugin.pm @@ -0,0 +1,61 @@ +package PVE::Jobs::Plugin; + +use strict; +use warnings; + +use PVE::Cluster qw(cfs_register_file); + +use base qw(PVE::SectionConfig); + +cfs_register_file('jobs.cfg', + sub { __PACKAGE__->parse_config(@_); }, + sub { __PACKAGE__->write_config(@_); }); + +my $defaultData = { + propertyList => { + type => { description => "Section type." }, + id => { + description => "The ID of the VZDump job.", + type => 'string', + format => 'pve-configid', + }, + enabled => { + description => "Determines if the job is enabled.", + type => 'boolean', + default => 1, + optional => 1, + }, + schedule => { + description => "Backup schedule. 
The format is a subset of `systemd` calendar events.", + type => 'string', format => 'pve-calendar-event', + maxLength => 128, + }, + }, +}; + +sub private { + return $defaultData; +} + +sub parse_config { + my ($class, $filename, $raw) = @_; + + my $cfg = $class->SUPER::parse_config($filename, $raw); + + foreach my $id (sort keys %{$cfg->{ids}}) { + my $data = $cfg->{ids}->{$id}; + + $data->{id} = $id; + $data->{enabled} //= 1; + } + + return $cfg; +} + +sub run { + my ($class, $cfg) = @_; + # implement in subclass + die "not implemented"; +} + +1; diff --git a/PVE/Jobs/VZDump.pm b/PVE/Jobs/VZDump.pm new file mode 100644 index 00000000..043b7ace --- /dev/null +++ b/PVE/Jobs/VZDump.pm @@ -0,0 +1,54 @@ +package PVE::Jobs::VZDump; + +use strict; +use warnings; + +use PVE::INotify; +use PVE::VZDump::Common; +use PVE::API2::VZDump; +use PVE::Cluster; + +use base qw(PVE::Jobs::Plugin); + +sub type { + return 'vzdump'; +} + +my $props = PVE::VZDump::Common::json_config_properties(); + +sub properties { + return $props; +} + +sub options { + my $options = { + enabled => { optional => 1 }, + schedule => {}, + }; + foreach my $opt (keys %$props) { + if ($props->{$opt}->{optional}) { + $options->{$opt} = { optional => 1 }; + } else { + $options->{$opt} = {}; + } + } + + return $options; +} + +sub run { + my ($class, $conf) = @_; + + # remove all non vzdump related options + foreach my $opt (keys %$conf) { + delete $conf->{$opt} if !defined($props->{$opt}); + } + + $conf->{quiet} = 1; # do not write to stdout/stderr + + PVE::Cluster::cfs_update(); # refresh vmlist + + return PVE::API2::VZDump->vzdump($conf); +} + +1; diff --git a/PVE/Makefile b/PVE/Makefile index 0071fab1..48b85d33 100644 --- a/PVE/Makefile +++ b/PVE/Makefile @@ -1,6 +1,6 @@ include ../defines.mk -SUBDIRS=API2 Status CLI Service Ceph +SUBDIRS=API2 Status CLI Service Ceph Jobs PERLSOURCE = \ API2.pm \ @@ -11,6 +11,7 @@ PERLSOURCE = \ CertHelpers.pm \ ExtMetric.pm \ HTTPServer.pm \ + Jobs.pm \ NodeConfig.pm \ 
Report.pm \ VZDump.pm -- 2.30.2