all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Dominik Csapak <d.csapak@proxmox.com>
To: pve-devel@lists.proxmox.com
Subject: [pve-devel] [PATCH manager v2 2/6] add PVE/Jobs to handle VZDump jobs
Date: Mon,  8 Nov 2021 14:07:54 +0100	[thread overview]
Message-ID: <20211108130758.160914-4-d.csapak@proxmox.com> (raw)
In-Reply-To: <20211108130758.160914-1-d.csapak@proxmox.com>

this adds a SectionConfig handling for jobs (only 'vzdump' for now) that
represents a job that will be handled by pvescheduler and a basic
'job-state' handling for reading/writing state json files

this has some intersections with the state handling of pvesr, but instead
of a single state file for all jobs, it uses separate ones, like we
do in the backup-server.

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 PVE/Jobs.pm        | 286 +++++++++++++++++++++++++++++++++++++++++++++
 PVE/Jobs/Makefile  |  16 +++
 PVE/Jobs/Plugin.pm |  61 ++++++++++
 PVE/Jobs/VZDump.pm |  54 +++++++++
 PVE/Makefile       |   3 +-
 5 files changed, 419 insertions(+), 1 deletion(-)
 create mode 100644 PVE/Jobs.pm
 create mode 100644 PVE/Jobs/Makefile
 create mode 100644 PVE/Jobs/Plugin.pm
 create mode 100644 PVE/Jobs/VZDump.pm

diff --git a/PVE/Jobs.pm b/PVE/Jobs.pm
new file mode 100644
index 00000000..2fe197d2
--- /dev/null
+++ b/PVE/Jobs.pm
@@ -0,0 +1,286 @@
+package PVE::Jobs;
+
+use strict;
+use warnings;
+use JSON;
+
+use PVE::Cluster qw(cfs_read_file cfs_lock_file);
+use PVE::Jobs::Plugin;
+use PVE::Jobs::VZDump;
+use PVE::Tools;
+
+# register the vzdump job plugin, then call init() on the plugin base class
+PVE::Jobs::VZDump->register();
+PVE::Jobs::Plugin->init();
+
+# directory containing the state files of the jobs (one JSON file per job)
+my $state_dir = "/var/lib/pve-manager/jobs";
+# directory containing the lock files of the jobs (one file per job)
+my $lock_dir = "/var/lock/pve-manager";
+
+# returns the path of the state file for the given job id and job type
+my $get_state_file = sub {
+    my ($jobid, $type) = @_;
+    return "$state_dir/$type-$jobid.json";
+};
+
+# state that is used if a job has no state file or an empty one
+my $default_state = {
+    state => 'created',
+    time => 0,
+};
+
+# Reads the state of the given job from its state file.
+#
+# Returns nothing if no state file exists, a fresh copy of the default state
+# if the file is empty, and the decoded JSON data otherwise. Dies if the
+# content does not look like a JSON object.
+#
+# lockless, since we use file_get_contents, which is atomic
+sub read_job_state {
+    my ($jobid, $type) = @_;
+    my $path = $get_state_file->($jobid, $type);
+    return if ! -e $path;
+
+    my $raw = PVE::Tools::file_get_contents($path);
+
+    # hand out a copy, callers modify the returned hash before writing it back
+    # and must not change the shared default state by doing so
+    return { %$default_state } if $raw eq '';
+
+    # untaint $raw
+    if ($raw =~ m/^(\{.*\})$/) {
+	return decode_json($1);
+    }
+
+    die "invalid json data in '$path'\n";
+}
+
+# calls $sub via PVE::Tools::lock_file on the lock file of the given job,
+# passing a timeout value of 10
+#
+# re-throws an error left in $@ by lock_file, returns its result otherwise
+sub lock_job_state {
+    my ($jobid, $type, $sub) = @_;
+
+    my $lockfile = "$lock_dir/$type-$jobid.lck";
+    my $result = PVE::Tools::lock_file($lockfile, 10, $sub);
+
+    if ($@) {
+	die $@;
+    }
+
+    return $result;
+}
+
+# returns the status read for the upid stored in the given job state, or
+# nothing if the state has no upid or the worker process is still running
+#
+# dies if the upid cannot be parsed or the task file does not exist
+my $get_job_status = sub {
+    my ($state) = @_;
+
+    return if !defined($state->{upid}); # not started
+
+    my ($task, $filename) = PVE::Tools::upid_decode($state->{upid}, 1);
+    if (!$task) {
+	die "unable to parse worker upid - $state->{upid}\n";
+    }
+    if (! -f $filename) {
+	die "no such task\n";
+    }
+
+    # a process with the pid and the start time from the upid is our worker
+    my $pstart = PVE::ProcFSTools::read_proc_starttime($task->{pid});
+    my $running = $pstart && $pstart == $task->{pstart};
+    return if $running; # still running
+
+    return PVE::Tools::upid_read_status($state->{upid});
+};
+
+# checks if the job is already finished if it was started before and
+# updates the statefile accordingly
+#
+# Does nothing if the job has no state file or is not in state 'started'.
+# Errors raised by the status lookup or by lock_job_state are not caught here.
+sub update_job_stopped {
+    my ($jobid, $type) = @_;
+
+    # first check unlocked to save time
+    my $state = read_job_state($jobid, $type);
+    return if !defined($state) || $state->{state} ne 'started'; # removed or not started
+
+    # a defined status means that the worker is not running anymore
+    if (defined($get_job_status->($state))) {
+	lock_job_state($jobid, $type, sub {
+	    # read again, the state may have changed until we got the lock
+	    my $state = read_job_state($jobid, $type);
+	    return if !defined($state) || $state->{state} ne 'started'; # removed or not started
+
+	    my $status = $get_job_status->($state);
+
+	    # keep the upid, it is used by get_last_runtime for the start time
+	    my $new_state = {
+		state => 'stopped',
+		msg => $status,
+		upid => $state->{upid}
+	    };
+
+	    if ($state->{updated}) { # save updated time stamp
+		$new_state->{updated} = $state->{updated};
+	    }
+
+	    my $path = $get_state_file->($jobid, $type);
+	    PVE::Tools::file_set_contents($path, encode_json($new_state));
+	});
+    }
+}
+
+# must be called when the job is first created
+#
+# Writes the initial state file (state 'created', time set to the current
+# time) while holding the job lock. Dies if a state other than 'created'
+# already exists for the job.
+sub create_job {
+    my ($jobid, $type) = @_;
+
+    lock_job_state($jobid, $type, sub {
+	# use a copy of the default, setting the time below would else change
+	# the default state for all other jobs too
+	my $state = read_job_state($jobid, $type) // { %$default_state };
+
+	if ($state->{state} ne 'created') {
+	    die "job state already exists\n";
+	}
+
+	$state->{time} = time();
+
+	my $path = $get_state_file->($jobid, $type);
+	PVE::Tools::file_set_contents($path, encode_json($state));
+    });
+}
+
+# to be called when the job is removed, unlinks the state file of the job
+sub remove_job {
+    my ($jobid, $type) = @_;
+
+    return unlink($get_state_file->($jobid, $type));
+}
+
+# checks if the job can be started and sets the state to 'starting'
+# returns 1 if the job can be started, 0 otherwise
+#
+# The job cannot be started if it has no state file (it was removed) or if it
+# is in state 'started'. The decision is made while holding the job lock, so
+# the result of the locked check is what gets returned.
+sub starting_job {
+    my ($jobid, $type) = @_;
+
+    # first check unlocked to save time
+    my $state = read_job_state($jobid, $type);
+    return 0 if !defined($state) || $state->{state} eq 'started'; # removed or already started
+
+    my $can_start = lock_job_state($jobid, $type, sub {
+	my $state = read_job_state($jobid, $type);
+	return 0 if !defined($state) || $state->{state} eq 'started'; # removed or already started
+
+	my $new_state = {
+	    state => 'starting',
+	    time => time(),
+	};
+
+	my $path = $get_state_file->($jobid, $type);
+	PVE::Tools::file_set_contents($path, encode_json($new_state));
+
+	return 1;
+    });
+
+    # a 'return' in the locked sub only leaves that sub, so pass on its result
+    return $can_start ? 1 : 0;
+}
+
+# records the result of starting a job that is in state 'starting'
+#
+# with $err set, the state becomes 'stopped' with $err as message, otherwise
+# it becomes 'started' with the given $upid
+# does nothing if the state file is gone, dies on any other current state
+sub started_job {
+    my ($jobid, $type, $upid, $err) = @_;
+
+    my $write_result = sub {
+	my $state = read_job_state($jobid, $type);
+	return if !defined($state); # job was removed, do not update
+
+	if ($state->{state} ne 'starting') {
+	    die "unexpected state '$state->{state}'\n";
+	}
+
+	my $new_state = defined($err)
+	    ? { state => 'stopped', msg => $err, time => time() }
+	    : { state => 'started', upid => $upid };
+
+	PVE::Tools::file_set_contents(
+	    $get_state_file->($jobid, $type),
+	    encode_json($new_state),
+	);
+    };
+
+    lock_job_state($jobid, $type, $write_result);
+}
+
+# will be called when the job schedule is updated
+#
+# Stores the current time as 'updated' in the state of the job while holding
+# the job lock. If the job has no state file yet, one is written based on the
+# default state.
+sub updated_job_schedule {
+    my ($jobid, $type) = @_;
+    lock_job_state($jobid, $type, sub {
+	# use a copy of the default, else 'updated' would stay set in the
+	# default state and show up for every other job without state file
+	my $old_state = read_job_state($jobid, $type) // { %$default_state };
+
+	$old_state->{updated} = time();
+
+	my $path = $get_state_file->($jobid, $type);
+	PVE::Tools::file_set_contents($path, encode_json($old_state));
+    });
+}
+
+# returns the time stamp used as base for computing the next run of the job
+#
+# in order of preference: the 'updated' time stamp, the start time of the
+# task from the stored upid, the 'time' of the state, 0
+sub get_last_runtime {
+    my ($jobid, $type) = @_;
+
+    my $state = read_job_state($jobid, $type) // $default_state;
+
+    if (defined(my $updated = $state->{updated})) {
+	return $updated;
+    }
+
+    my $upid = $state->{upid};
+    if (!$upid) {
+	return $state->{time} // 0;
+    }
+
+    my ($task) = PVE::Tools::upid_decode($upid, 1);
+    die "unable to parse worker upid\n" if !$task;
+
+    return $task->{starttime};
+}
+
+# runs all jobs from 'jobs.cfg' that are due on this node
+#
+# First synchronizes the state files with the config. Then, for every job
+# that is not bound to another node, the state of a finished job is updated
+# and, if the job is enabled and its next scheduled event (computed from the
+# last run time) is reached, it is started via the plugin of its type.
+sub run_jobs {
+    synchronize_job_states_with_config();
+
+    my $jobs_cfg = cfs_read_file('jobs.cfg');
+    my $nodename = PVE::INotify::nodename();
+
+    foreach my $id (sort keys %{$jobs_cfg->{ids}}) {
+	my $cfg = $jobs_cfg->{ids}->{$id};
+	my $type = $cfg->{type};
+	my $schedule = delete $cfg->{schedule};
+
+	# only schedule local jobs
+	next if defined($cfg->{node}) && $cfg->{node} ne $nodename;
+
+	eval {
+	    update_job_stopped($id, $type);
+	};
+	if (my $err = $@) {
+	    warn "could not update job state, skipping - $err\n";
+	    next;
+	}
+
+	# only schedule enabled jobs
+	next if defined($cfg->{enabled}) && !$cfg->{enabled};
+
+	my $last_run = get_last_runtime($id, $type);
+	my $calspec = PVE::CalendarEvent::parse_calendar_event($schedule);
+	my $next_sync = PVE::CalendarEvent::compute_next_event($calspec, $last_run) // 0;
+
+	next if time() < $next_sync; # not due yet
+
+	my $plugin = PVE::Jobs::Plugin->lookup($type);
+	next if !starting_job($id, $type);
+
+	my $upid = eval { $plugin->run($cfg) };
+	if (my $err = $@) {
+	    warn $err;
+	    started_job($id, $type, undef, $err);
+	} elsif (!defined($upid)) {
+	    # a 'started' state without upid could never be detected as
+	    # finished again, so record this as failed start instead
+	    started_job($id, $type, undef, "job did not return a task UPID\n");
+	} elsif ($upid eq 'OK') { # some jobs return OK immediately
+	    started_job($id, $type, undef, 'OK');
+	} else {
+	    started_job($id, $type, $upid);
+	}
+    }
+}
+
+# creates and removes statefiles for job configs
+#
+# While holding the 'jobs.cfg' lock, a state file is created for every
+# configured job that has none, and state files whose job id is not in the
+# config anymore are removed. Dies if cfs_lock_file left an error in $@.
+sub synchronize_job_states_with_config {
+    cfs_lock_file('jobs.cfg', undef, sub {
+	my $data = cfs_read_file('jobs.cfg');
+
+	for my $id (keys $data->{ids}->%*) {
+	    my $job = $data->{ids}->{$id};
+	    my $type = $job->{type};
+	    my $jobstate = read_job_state($id, $type);
+	    create_job($id, $type) if !defined($jobstate);
+	}
+
+	# the dot is escaped, so that only a literal '.json' suffix matches
+	PVE::Tools::dir_glob_foreach($state_dir, '(.*?)-(.*)\.json', sub {
+	    my ($path, $type, $id) = @_;
+
+	    if (!defined($data->{ids}->{$id})) {
+		remove_job($id, $type);
+	    }
+	});
+    });
+    die $@ if $@;
+}
+
+# creates the directories for the state and the lock files
+# the results of the mkdir calls are not checked here
+sub setup_dirs {
+    mkdir $state_dir;
+    mkdir $lock_dir;
+}
+
+1;
diff --git a/PVE/Jobs/Makefile b/PVE/Jobs/Makefile
new file mode 100644
index 00000000..6023c3ba
--- /dev/null
+++ b/PVE/Jobs/Makefile
@@ -0,0 +1,16 @@
+include ../../defines.mk
+
+# perl modules of this directory that get installed
+PERLSOURCE =   \
+	Plugin.pm\
+	VZDump.pm
+
+# nothing to build
+all:
+
+# removes editor backup files
+.PHONY: clean
+clean:
+	rm -rf *~
+
+# installs the modules into the PVE/Jobs directory below PERLLIBDIR
+.PHONY: install
+install: ${PERLSOURCE}
+	install -d ${PERLLIBDIR}/PVE/Jobs
+	install -m 0644 ${PERLSOURCE} ${PERLLIBDIR}/PVE/Jobs
diff --git a/PVE/Jobs/Plugin.pm b/PVE/Jobs/Plugin.pm
new file mode 100644
index 00000000..69c31cf2
--- /dev/null
+++ b/PVE/Jobs/Plugin.pm
@@ -0,0 +1,61 @@
+package PVE::Jobs::Plugin;
+
+use strict;
+use warnings;
+
+use PVE::Cluster qw(cfs_register_file);
+
+use base qw(PVE::SectionConfig);
+
+# register 'jobs.cfg' with the parser and writer of this class
+cfs_register_file('jobs.cfg',
+		  sub { __PACKAGE__->parse_config(@_); },
+		  sub { __PACKAGE__->write_config(@_); });
+
+# properties declared by the base plugin: type, id, enabled and schedule
+my $defaultData = {
+    propertyList => {
+	type => { description => "Section type." },
+	id => {
+	    description => "The ID of the VZDump job.",
+	    type => 'string',
+	    format => 'pve-configid',
+	},
+	enabled => {
+	    description => "Determines if the job is enabled.",
+	    type => 'boolean',
+	    default => 1,
+	    optional => 1,
+	},
+	schedule => {
+	    description => "Backup schedule. The format is a subset of `systemd` calendar events.",
+	    type => 'string', format => 'pve-calendar-event',
+	    maxLength => 128,
+	},
+    },
+};
+
+# returns the property data declared above
+sub private {
+    return $defaultData;
+}
+
+# parses the config with the parser of the parent class, then sets the 'id'
+# property of every entry to its section id and defaults 'enabled' to 1
+sub parse_config {
+    my ($class, $filename, $raw) = @_;
+
+    my $cfg = $class->SUPER::parse_config($filename, $raw);
+
+    for my $id (sort keys %{$cfg->{ids}}) {
+	my $job = $cfg->{ids}->{$id};
+
+	$job->{id} = $id;
+	$job->{enabled} = 1 if !defined($job->{enabled});
+    }
+
+    return $cfg;
+}
+
+# runs the job described by $cfg
+# the base class only dies here, the actual job types have to override it
+sub run {
+    my ($class, $cfg) = @_;
+    # implement in subclass
+    die "not implemented";
+}
+
+1;
diff --git a/PVE/Jobs/VZDump.pm b/PVE/Jobs/VZDump.pm
new file mode 100644
index 00000000..043b7ace
--- /dev/null
+++ b/PVE/Jobs/VZDump.pm
@@ -0,0 +1,54 @@
+package PVE::Jobs::VZDump;
+
+use strict;
+use warnings;
+
+use PVE::INotify;
+use PVE::VZDump::Common;
+use PVE::API2::VZDump;
+use PVE::Cluster;
+
+use base qw(PVE::Jobs::Plugin);
+
+# the section type of this plugin
+sub type {
+    return 'vzdump';
+}
+
+# the vzdump properties, fetched once when the module is loaded
+my $props = PVE::VZDump::Common::json_config_properties();
+
+# returns the vzdump properties as properties of this plugin
+sub properties {
+    return $props;
+}
+
+# returns the options of this plugin: 'enabled' (optional), 'schedule' and
+# all vzdump properties, which are optional if the property itself is
+sub options {
+    my $options = {
+	enabled => { optional => 1 },
+	schedule => {},
+    };
+
+    for my $opt (keys %$props) {
+	$options->{$opt} = $props->{$opt}->{optional} ? { optional => 1 } : {};
+    }
+
+    return $options;
+}
+
+# runs the vzdump job described by $conf and returns the result of the
+# vzdump API call
+#
+# Only the vzdump related options of $conf are passed on, with 'quiet' set.
+# The options are collected in a new hash, $conf itself is not modified.
+sub run {
+    my ($class, $conf) = @_;
+
+    # leave out all non vzdump related options
+    my $param = {};
+    foreach my $opt (keys %$conf) {
+	$param->{$opt} = $conf->{$opt} if defined($props->{$opt});
+    }
+
+    $param->{quiet} = 1; # do not write to stdout/stderr
+
+    PVE::Cluster::cfs_update(); # refresh vmlist
+
+    return PVE::API2::VZDump->vzdump($param);
+}
+
+1;
diff --git a/PVE/Makefile b/PVE/Makefile
index 0071fab1..48b85d33 100644
--- a/PVE/Makefile
+++ b/PVE/Makefile
@@ -1,6 +1,6 @@
 include ../defines.mk
 
-SUBDIRS=API2 Status CLI Service Ceph
+SUBDIRS=API2 Status CLI Service Ceph Jobs
 
 PERLSOURCE = 			\
 	API2.pm			\
@@ -11,6 +11,7 @@ PERLSOURCE = 			\
 	CertHelpers.pm		\
 	ExtMetric.pm		\
 	HTTPServer.pm		\
+	Jobs.pm			\
 	NodeConfig.pm		\
 	Report.pm		\
 	VZDump.pm
-- 
2.30.2





  parent reply	other threads:[~2021-11-08 13:08 UTC|newest]

Thread overview: 16+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-11-08 13:07 [pve-devel] [PATCH cluster/manager v2] add scheduling daemon for pvesr + vzdump (and more) Dominik Csapak
2021-11-08 13:07 ` [pve-devel] [PATCH cluster v2 1/1] add 'jobs.cfg' to observed files Dominik Csapak
2021-11-09 17:18   ` [pve-devel] applied: " Thomas Lamprecht
2021-11-08 13:07 ` [pve-devel] [PATCH manager v2 1/6] replace systemd timer with pvescheduler daemon Dominik Csapak
2021-11-08 13:07 ` Dominik Csapak [this message]
2021-11-08 13:07 ` [pve-devel] [PATCH manager v2 3/6] pvescheduler: run jobs from jobs.cfg Dominik Csapak
2021-11-08 13:07 ` [pve-devel] [PATCH manager v2 4/6] api/backup: refactor string for all days Dominik Csapak
2021-11-08 13:07 ` [pve-devel] [PATCH manager v2 5/6] api/backup: handle new vzdump jobs Dominik Csapak
2021-11-08 13:07 ` [pve-devel] [PATCH manager v2 6/6] ui: dc/backup: show id+schedule instead of dow+starttime Dominik Csapak
2021-11-09 14:17 ` [pve-devel] [PATCH cluster/manager v2] add scheduling daemon for pvesr + vzdump (and more) Dylan Whyte
2021-11-10  9:38   ` Dominik Csapak
2021-11-10 11:05     ` Thomas Lamprecht
2021-11-09 16:55 ` Aaron Lauterer
2021-11-10 20:42 ` [pve-devel] applied-series: " Thomas Lamprecht
2021-11-11 11:35 ` [pve-devel] " Fabian Ebner
2021-11-11 11:46   ` Dominik Csapak

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20211108130758.160914-4-d.csapak@proxmox.com \
    --to=d.csapak@proxmox.com \
    --cc=pve-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal