public inbox for pdm-devel@lists.proxmox.com
From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [pdm-devel] [PATCH datacenter-manager 1/2] server: add jobstate module from PBS
Date: Thu, 27 Nov 2025 16:37:07 +0100
Message-ID: <20251127153708.393210-2-l.wagner@proxmox.com>
In-Reply-To: <20251127153708.393210-1-l.wagner@proxmox.com>

This is a general-purpose helper module for keeping track of periodic
tasks. So far we have not really needed it in PDM (although it could be
useful for remote update fetching), but this changes with the task log
rotation that is added in the next patch.
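The lifecycle the module tracks can be sketched as a small state machine. This is a simplified, std-only model, not the module itself: as an assumption for illustration, the UPID is reduced to its start time, `TaskState` to an end time, and no lock file is involved. The transition rules follow `Job::start`, `Job::finish`, `last_run_time` and the reference time used by `compute_schedule_status` in the patch below.

```rust
/// Simplified stand-in for `JobState` (assumption: UPID and TaskState
/// are reduced to plain timestamps).
#[derive(Debug, Clone, PartialEq)]
enum JobState {
    /// Created at `time`, never started.
    Created { time: i64 },
    /// Running; `starttime` stands in for the start time encoded in the UPID.
    Started { starttime: i64 },
    /// Done; `updated` is set when the schedule changed afterwards.
    Finished { starttime: i64, endtime: i64, updated: Option<i64> },
}

/// Simplified stand-in for `Job` (no lock is held here).
struct Job {
    state: JobState,
}

impl Job {
    fn new(now: i64) -> Self {
        Job { state: JobState::Created { time: now } }
    }

    /// Like `Job::start`: refuses to start a job that is already running.
    fn start(&mut self, starttime: i64) -> Result<(), String> {
        if let JobState::Started { .. } = self.state {
            return Err("cannot start job that is started!".to_string());
        }
        self.state = JobState::Started { starttime };
        Ok(())
    }

    /// Like `Job::finish`: a job that was never started cannot finish.
    fn finish(&mut self, endtime: i64) -> Result<(), String> {
        let starttime = match self.state {
            JobState::Created { .. } => return Err("cannot finish when not started".to_string()),
            JobState::Started { starttime } | JobState::Finished { starttime, .. } => starttime,
        };
        self.state = JobState::Finished { starttime, endtime, updated: None };
        Ok(())
    }
}

/// Like `last_run_time`: an explicit timestamp wins, otherwise the task start time.
fn last_run_time(state: &JobState) -> i64 {
    match *state {
        JobState::Created { time } => time,
        JobState::Finished { updated: Some(time), .. } => time,
        JobState::Started { starttime } | JobState::Finished { starttime, updated: None, .. } => starttime,
    }
}

/// Like the `last` value in `compute_schedule_status`: for finished jobs,
/// the next run is computed from `updated` if set, else from the end time.
fn schedule_reference_time(state: &JobState) -> i64 {
    match *state {
        JobState::Created { time } => time,
        JobState::Started { starttime } => starttime,
        JobState::Finished { endtime, updated, .. } => updated.unwrap_or(endtime),
    }
}
```

For example, a job created at t=100, started at t=120 and finished at t=130 reports 120 from `last_run_time`, while its next scheduled run is computed relative to 130.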

The module was copied as-is and then adapted where needed to make it
work with PDM; some types and paths were changed. In the future it
should be broken out into a shared crate (possibly a new micro crate if
no better place comes to mind), but given the time constraints a copy is
good enough for now. The module will probably remain largely unchanged
in the near future, so the risk of the two implementations diverging
should be small.
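Since the paths were adapted for PDM, the following sketch shows how the module derives a job's state file and lock file names from its type and name, following `get_path` and `get_lock` in the patch. The base directory is a parameter here (the module itself uses `JOB_STATE_BASEDIR`); the helper names `state_file_path` and `lock_file_path` and the example job names are hypothetical.

```rust
use std::path::{Path, PathBuf};

/// Hypothetical helper mirroring `get_path`: "<base>/<jobtype>-<jobname>.json".
fn state_file_path(base_dir: &Path, jobtype: &str, jobname: &str) -> PathBuf {
    base_dir.join(format!("{jobtype}-{jobname}.json"))
}

/// Hypothetical helper mirroring the path handling in `get_lock`:
/// the lock file sits next to the state file, with ".json" swapped for ".lck".
fn lock_file_path(state_file: &Path) -> PathBuf {
    let mut path = state_file.to_path_buf();
    path.set_extension("lck");
    path
}
```

Because `Path::set_extension` only replaces the last extension, a job name containing a dot, such as `a.b` for a job of type `sync`, still maps to `sync-a.b.json` and `sync-a.b.lck`.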

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/jobstate.rs | 299 +++++++++++++++++++++++++++++++++++++++++
 server/src/lib.rs      |   1 +
 2 files changed, 300 insertions(+)
 create mode 100644 server/src/jobstate.rs

diff --git a/server/src/jobstate.rs b/server/src/jobstate.rs
new file mode 100644
index 00000000..c8e0d766
--- /dev/null
+++ b/server/src/jobstate.rs
@@ -0,0 +1,299 @@
+// FIXME: This was copied from PBS - move this to a shared crate and then make both
+// use the same implementation. Maybe use this opportunity to
+//  - not use hard-coded paths
+//  - add tests
+//  - audit the public API in general and improve if needed
+
+use std::path::{Path, PathBuf};
+
+use anyhow::{bail, format_err, Error};
+use nix::sys::stat::Mode;
+use serde::{Deserialize, Serialize};
+
+use proxmox_product_config::ApiLockGuard;
+use proxmox_rest_server::{upid_read_status, worker_is_active_local, TaskState};
+use proxmox_sys::fs::{create_path, file_read_optional_string, replace_file};
+use proxmox_time::CalendarEvent;
+
+use pbs_api_types::JobScheduleStatus;
+
+use pdm_api_types::UPID;
+
+#[derive(Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
+/// Represents the State of a specific Job
+pub enum JobState {
+    /// A job was created at 'time', but never started/finished
+    Created { time: i64 },
+    /// The Job was last started in 'upid'
+    Started { upid: String },
+    /// The Job was last started in 'upid', which finished with 'state', and was last updated at 'updated'
+    Finished {
+        upid: String,
+        state: TaskState,
+        updated: Option<i64>,
+    },
+}
+
+/// Represents a Job and holds the correct lock
+pub struct Job {
+    jobtype: String,
+    jobname: String,
+    /// The State of the job
+    pub state: JobState,
+    _lock: ApiLockGuard,
+}
+
+const JOB_STATE_BASEDIR: &str = concat!(pdm_buildcfg::PDM_STATE_DIR_M!(), "/jobstates");
+
+/// Create the job state directory with the correct permissions
+pub fn create_jobstate_dir() -> Result<(), Error> {
+    let mode = Mode::from_bits_truncate(0o0750);
+    let opts = proxmox_product_config::default_create_options().perm(mode);
+
+    create_path(JOB_STATE_BASEDIR, Some(opts), Some(opts))
+        .map_err(|err: Error| format_err!("unable to create job state dir - {err}"))?;
+
+    Ok(())
+}
+
+fn get_path(jobtype: &str, jobname: &str) -> PathBuf {
+    let mut path = PathBuf::from(JOB_STATE_BASEDIR);
+    path.push(format!("{jobtype}-{jobname}.json"));
+    path
+}
+
+fn get_lock<P>(path: P) -> Result<ApiLockGuard, Error>
+where
+    P: AsRef<Path>,
+{
+    let mut path = path.as_ref().to_path_buf();
+    path.set_extension("lck");
+    proxmox_product_config::open_api_lockfile(path, None, true)
+}
+
+/// Removes the state file and lock file of a job; useful when a job is deleted
+pub fn remove_state_file(jobtype: &str, jobname: &str) -> Result<(), Error> {
+    let mut path = get_path(jobtype, jobname);
+    let _lock = get_lock(&path)?;
+    if let Err(err) = std::fs::remove_file(&path) {
+        if err.kind() != std::io::ErrorKind::NotFound {
+            bail!("cannot remove statefile for {jobtype} - {jobname}: {err}");
+        }
+    }
+    path.set_extension("lck");
+    if let Err(err) = std::fs::remove_file(&path) {
+        if err.kind() != std::io::ErrorKind::NotFound {
+            bail!("cannot remove lockfile for {jobtype} - {jobname}: {err}");
+        }
+    }
+    Ok(())
+}
+
+/// Creates the state file with the state 'Created',
+/// overwriting it if it already exists.
+pub fn create_state_file(jobtype: &str, jobname: &str) -> Result<(), Error> {
+    let mut job = Job::new(jobtype, jobname)?;
+    job.write_state()
+}
+
+/// Tries to update the state file with the current time;
+/// does nothing if the job is currently running.
+/// Intended for use when the schedule changes.
+pub fn update_job_last_run_time(jobtype: &str, jobname: &str) -> Result<(), Error> {
+    let mut job = match Job::new(jobtype, jobname) {
+        Ok(job) => job,
+        Err(_) => return Ok(()), // was locked (running), so do not update
+    };
+    let time = proxmox_time::epoch_i64();
+
+    job.state = match JobState::load(jobtype, jobname)? {
+        JobState::Created { .. } => JobState::Created { time },
+        JobState::Started { .. } => return Ok(()), // currently running (without lock?)
+        JobState::Finished {
+            upid,
+            state,
+            updated: _,
+        } => JobState::Finished {
+            upid,
+            state,
+            updated: Some(time),
+        },
+    };
+    job.write_state()
+}
+
+/// Returns the last run time of a job by reading the state file.
+/// Note that this is not locked.
+pub fn last_run_time(jobtype: &str, jobname: &str) -> Result<i64, Error> {
+    match JobState::load(jobtype, jobname)? {
+        JobState::Created { time } => Ok(time),
+        JobState::Finished {
+            updated: Some(time),
+            ..
+        } => Ok(time),
+        JobState::Started { upid }
+        | JobState::Finished {
+            upid,
+            state: _,
+            updated: None,
+        } => {
+            let upid: UPID = upid
+                .parse()
+                .map_err(|err| format_err!("could not parse upid from state: {err}"))?;
+            Ok(upid.starttime)
+        }
+    }
+}
+
+impl JobState {
+    /// Loads and deserializes the job state from type and name.
+    /// When the loaded state indicates a started UPID,
+    /// we check whether it has already stopped and
+    /// return the correct state.
+    ///
+    /// This does not update the state in the file.
+    pub fn load(jobtype: &str, jobname: &str) -> Result<Self, Error> {
+        if let Some(state) = file_read_optional_string(get_path(jobtype, jobname))? {
+            match serde_json::from_str(&state)? {
+                JobState::Started { upid } => {
+                    let parsed: UPID = upid
+                        .parse()
+                        .map_err(|err| format_err!("error parsing upid: {err}"))?;
+
+                    if !worker_is_active_local(&parsed) {
+                        let state = upid_read_status(&parsed).unwrap_or(TaskState::Unknown {
+                            endtime: parsed.starttime,
+                        });
+
+                        Ok(JobState::Finished {
+                            upid,
+                            state,
+                            updated: None,
+                        })
+                    } else {
+                        Ok(JobState::Started { upid })
+                    }
+                }
+                other => Ok(other),
+            }
+        } else {
+            Ok(JobState::Created {
+                time: proxmox_time::epoch_i64() - 30,
+            })
+        }
+    }
+}
+
+impl Job {
+    /// Creates a new instance of a job with the correct lock held
+    /// (it will be held until the job is dropped again).
+    ///
+    /// This does not load the state from the file; to do that,
+    /// 'JobState::load' must be called.
+    pub fn new(jobtype: &str, jobname: &str) -> Result<Self, Error> {
+        let path = get_path(jobtype, jobname);
+
+        let _lock = get_lock(path)?;
+
+        Ok(Self {
+            jobtype: jobtype.to_string(),
+            jobname: jobname.to_string(),
+            state: JobState::Created {
+                time: proxmox_time::epoch_i64(),
+            },
+            _lock,
+        })
+    }
+
+    /// Starts the job and updates the state file accordingly.
+    /// Fails if the job was already started.
+    pub fn start(&mut self, upid: &str) -> Result<(), Error> {
+        if let JobState::Started { .. } = self.state {
+            bail!("cannot start job that is started!");
+        }
+
+        self.state = JobState::Started {
+            upid: upid.to_string(),
+        };
+
+        self.write_state()
+    }
+
+    /// Finishes the job and updates the state file with the given task state.
+    /// Fails if the job was not yet started.
+    pub fn finish(&mut self, state: TaskState) -> Result<(), Error> {
+        let upid = match &self.state {
+            JobState::Created { .. } => bail!("cannot finish when not started"),
+            JobState::Started { upid } => upid,
+            JobState::Finished { upid, .. } => upid,
+        }
+        .to_string();
+
+        self.state = JobState::Finished {
+            upid,
+            state,
+            updated: None,
+        };
+
+        self.write_state()
+    }
+
+    pub fn jobtype(&self) -> &str {
+        &self.jobtype
+    }
+
+    pub fn jobname(&self) -> &str {
+        &self.jobname
+    }
+
+    fn write_state(&mut self) -> Result<(), Error> {
+        let serialized = serde_json::to_string(&self.state)?;
+        let path = get_path(&self.jobtype, &self.jobname);
+        let options = proxmox_product_config::default_create_options();
+
+        replace_file(path, serialized.as_bytes(), options, false)
+    }
+}
+
+pub fn compute_schedule_status(
+    job_state: &JobState,
+    schedule: Option<&str>,
+) -> Result<JobScheduleStatus, Error> {
+    let (upid, endtime, state, last) = match job_state {
+        JobState::Created { time } => (None, None, None, *time),
+        JobState::Started { upid } => {
+            let parsed_upid: UPID = upid.parse()?;
+            (Some(upid), None, None, parsed_upid.starttime)
+        }
+        JobState::Finished {
+            upid,
+            state,
+            updated,
+        } => {
+            let last = updated.unwrap_or_else(|| state.endtime());
+            (
+                Some(upid),
+                Some(state.endtime()),
+                Some(state.to_string()),
+                last,
+            )
+        }
+    };
+
+    let mut status = JobScheduleStatus {
+        last_run_upid: upid.map(String::from),
+        last_run_state: state,
+        last_run_endtime: endtime,
+        ..Default::default()
+    };
+
+    if let Some(schedule) = schedule {
+        if let Ok(event) = schedule.parse::<CalendarEvent>() {
+            // ignore errors
+            status.next_run = event.compute_next_event(last).unwrap_or(None);
+        }
+    }
+
+    Ok(status)
+}
diff --git a/server/src/lib.rs b/server/src/lib.rs
index 0f25aa71..bd8660a7 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -5,6 +5,7 @@ pub mod api;
 pub mod auth;
 pub mod context;
 pub mod env;
+pub mod jobstate;
 pub mod metric_collection;
 pub mod parallel_fetcher;
 pub mod remote_cache;
-- 
2.47.3



_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel


Thread overview: 3+ messages
2025-11-27 15:37 [pdm-devel] [PATCH datacenter-manager 0/2] access/auth/task-log rotation Lukas Wagner
2025-11-27 15:37 ` Lukas Wagner [this message]
2025-11-27 15:37 ` [pdm-devel] [PATCH datacenter-manager 2/2] server: add task log, auth log and access log rotation Lukas Wagner
