public inbox for pdm-devel@lists.proxmox.com
* [pdm-devel] [RFC proxmox-datacenter-manager 1/5] privileged-api: create /var/cache/proxmox-datacenter-manager/ on startup
@ 2024-12-20 14:24 Lukas Wagner
  2024-12-20 14:24 ` [pdm-devel] [RFC proxmox-datacenter-manager 2/5] pdm-api-types: derive Clone for TaskListItem Lukas Wagner
                   ` (4 more replies)
  0 siblings, 5 replies; 6+ messages in thread
From: Lukas Wagner @ 2024-12-20 14:24 UTC (permalink / raw)
  To: pdm-devel

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/bin/proxmox-datacenter-privileged-api.rs | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/server/src/bin/proxmox-datacenter-privileged-api.rs b/server/src/bin/proxmox-datacenter-privileged-api.rs
index d4e8817..f8e137f 100644
--- a/server/src/bin/proxmox-datacenter-privileged-api.rs
+++ b/server/src/bin/proxmox-datacenter-privileged-api.rs
@@ -80,6 +80,13 @@ fn create_directories() -> Result<(), Error> {
         0o755,
     )?;
 
+    pdm_config::setup::mkdir_perms(
+        pdm_buildcfg::PDM_CACHE_DIR,
+        api_user.uid,
+        api_user.gid,
+        0o755,
+    )?;
+
     pdm_config::setup::mkdir_perms(
         concat!(pdm_buildcfg::PDM_LOG_DIR_M!(), "/api"),
         api_user.uid,
-- 
2.39.5



_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel



* [pdm-devel] [RFC proxmox-datacenter-manager 2/5] pdm-api-types: derive Clone for TaskListItem
  2024-12-20 14:24 [pdm-devel] [RFC proxmox-datacenter-manager 1/5] privileged-api: create /var/cache/proxmox-datacenter-manager/ on startup Lukas Wagner
@ 2024-12-20 14:24 ` Lukas Wagner
  2024-12-20 14:24 ` [pdm-devel] [RFC proxmox-datacenter-manager 3/5] pdm-api-types: derive Eq, PartialEq and Hash for RemoteUpid Lukas Wagner
                   ` (3 subsequent siblings)
  4 siblings, 0 replies; 6+ messages in thread
From: Lukas Wagner @ 2024-12-20 14:24 UTC (permalink / raw)
  To: pdm-devel

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 lib/pdm-api-types/src/lib.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/pdm-api-types/src/lib.rs b/lib/pdm-api-types/src/lib.rs
index 05a0fa7..4e6fe0a 100644
--- a/lib/pdm-api-types/src/lib.rs
+++ b/lib/pdm-api-types/src/lib.rs
@@ -237,7 +237,7 @@ pub enum TaskStateType {
         upid: { schema: UPID::API_SCHEMA },
     },
 )]
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 /// Task properties.
 pub struct TaskListItem {
     pub upid: String,
-- 
2.39.5




* [pdm-devel] [RFC proxmox-datacenter-manager 3/5] pdm-api-types: derive Eq, PartialEq and Hash for RemoteUpid
  2024-12-20 14:24 [pdm-devel] [RFC proxmox-datacenter-manager 1/5] privileged-api: create /var/cache/proxmox-datacenter-manager/ on startup Lukas Wagner
  2024-12-20 14:24 ` [pdm-devel] [RFC proxmox-datacenter-manager 2/5] pdm-api-types: derive Clone for TaskListItem Lukas Wagner
@ 2024-12-20 14:24 ` Lukas Wagner
  2024-12-20 14:24 ` [pdm-devel] [RFC proxmox-datacenter-manager 4/5] api: add caching 'remote-tasks' API endpoint Lukas Wagner
                   ` (2 subsequent siblings)
  4 siblings, 0 replies; 6+ messages in thread
From: Lukas Wagner @ 2024-12-20 14:24 UTC (permalink / raw)
  To: pdm-devel

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 lib/pdm-api-types/src/lib.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/pdm-api-types/src/lib.rs b/lib/pdm-api-types/src/lib.rs
index 4e6fe0a..bf312cc 100644
--- a/lib/pdm-api-types/src/lib.rs
+++ b/lib/pdm-api-types/src/lib.rs
@@ -339,7 +339,7 @@ pub const REMOTE_UPID_SCHEMA: Schema = StringSchema::new("A remote UPID")
     .min_length("C!UPID:N:12345678:12345678:12345678:::".len())
     .schema();
 
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, Eq, PartialEq, Hash)]
 pub struct RemoteUpid {
     remote: String,
     /// This is usually a pve upid, but may also be a pbs upid, they have distinct formats.
-- 
2.39.5




* [pdm-devel] [RFC proxmox-datacenter-manager 4/5] api: add caching 'remote-tasks' API endpoint
  2024-12-20 14:24 [pdm-devel] [RFC proxmox-datacenter-manager 1/5] privileged-api: create /var/cache/proxmox-datacenter-manager/ on startup Lukas Wagner
  2024-12-20 14:24 ` [pdm-devel] [RFC proxmox-datacenter-manager 2/5] pdm-api-types: derive Clone for TaskListItem Lukas Wagner
  2024-12-20 14:24 ` [pdm-devel] [RFC proxmox-datacenter-manager 3/5] pdm-api-types: derive Eq, PartialEq and Hash for RemoteUpid Lukas Wagner
@ 2024-12-20 14:24 ` Lukas Wagner
  2024-12-20 14:24 ` [pdm-devel] [RFC proxmox-datacenter-manager 5/5] server: pve api: track new tasks created by PDM Lukas Wagner
  2025-01-13 11:28 ` [pdm-devel] applied: [RFC proxmox-datacenter-manager 1/5] privileged-api: create /var/cache/proxmox-datacenter-manager/ on startup Dietmar Maurer
  4 siblings, 0 replies; 6+ messages in thread
From: Lukas Wagner @ 2024-12-20 14:24 UTC (permalink / raw)
  To: pdm-devel

This commit adds a new endpoint that allows us to retrieve a list of all
tasks from all remotes. To avoid repeated polling of remotes, the
results are cached, with a default max_age of 5 minutes (could be
potentially much longer, just a value I picked without thinking too
much about it to avoid bikeshedding myself.)

This commit also includes code from Dominik's previous RFC for task
tracking. Since the task tracking part is closely interwoven with the
caching part, I opted to include these here.

When PDM starts a task on a remote, the tracked task will be regularly
polled by a separate worker to track status changes. Tracked tasks will be
folded into the (cached) task API response introduced in this commit.

  [the task tracking part:]
Originally-by: Dominik Csapak <d.csapak@proxmox.com>
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
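
The max_age check described in the commit message can be illustrated
with a minimal, std-only sketch. This is not the code from the patch:
`Entry`, `fresh_tasks` and `sample_cache` are hypothetical names, and
tasks are reduced to plain strings. Only the comparison
`timestamp + max_age < now` mirrors what `TaskCache::get_tasks` does.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified cache entry: fetch time plus task UPIDs as strings.
struct Entry {
    timestamp: i64,
    tasks: Vec<String>,
}

/// Return the cached tasks for `remote` if they are at most `max_age`
/// seconds old, otherwise `None` (the caller would then refetch).
fn fresh_tasks<'a>(
    cache: &'a HashMap<String, Entry>,
    remote: &str,
    now: i64,
    max_age: i64,
) -> Option<&'a [String]> {
    let entry = cache.get(remote)?;
    if entry.timestamp + max_age < now {
        return None; // entry is too old
    }
    Some(entry.tasks.as_slice())
}

/// Build a one-entry cache for the made-up remote "pve1" (illustration only).
fn sample_cache(timestamp: i64) -> HashMap<String, Entry> {
    let mut cache = HashMap::new();
    cache.insert(
        "pve1".to_string(),
        Entry {
            timestamp,
            tasks: vec!["UPID:example".to_string()],
        },
    );
    cache
}
```

With an entry fetched at t=1000 and max_age=300, a request at t=1300
is still served from the cache, while a request at t=1301 is not.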

Notes:
    -- Did not include PBS support for now, but this should not be hard to
    add later
    
    -- The approach I used here invalidates the cache if a tracked remote
    task finishes. The reason for that is that the API
    `nodes/.../tasks/<upid>/status` which is polled as part of the tracking
    process does not give us the `endtime` of the task,
    `endtime` is only included in the API that gives us the full task list
    (`/nodes/.../tasks`).
    Not super elegant, but it should work. We can always improve this
    later.
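
    A rough, std-only sketch of the invalidation idea from the note
    above, under simplifying assumptions: the cache is a plain map from
    remote name to task UPIDs, and finished tasks are (remote, upid)
    string pairs. `invalidate_finished` is a hypothetical name, not the
    function from the patch.

```rust
use std::collections::{HashMap, HashSet};

/// Drop the cached task list of every remote that has at least one
/// finished tracked task, so that the next request refetches the full
/// list (including `endtime`). `finished` is emptied in the process.
fn invalidate_finished(
    cache: &mut HashMap<String, Vec<String>>,
    finished: &mut HashSet<(String, String)>,
) {
    for (remote, _upid) in finished.drain() {
        // Removing a remote twice is harmless, `remove` just returns None.
        cache.remove(&remote);
    }
}
```

    Only the affected remote loses its cache entry. Other remotes are
    still served from the cache until their own max-age expires.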

 server/src/api/mod.rs          |   2 +
 server/src/api/pve/tasks.rs    |   2 +-
 server/src/api/remote_tasks.rs |  39 +++
 server/src/lib.rs              |   1 +
 server/src/task_cache.rs       | 453 +++++++++++++++++++++++++++++++++
 5 files changed, 496 insertions(+), 1 deletion(-)
 create mode 100644 server/src/api/remote_tasks.rs
 create mode 100644 server/src/task_cache.rs

diff --git a/server/src/api/mod.rs b/server/src/api/mod.rs
index 1d918f9..6c4831b 100644
--- a/server/src/api/mod.rs
+++ b/server/src/api/mod.rs
@@ -12,6 +12,7 @@ pub mod config;
 pub mod nodes;
 pub mod pbs;
 pub mod pve;
+pub mod remote_tasks;
 pub mod remotes;
 pub mod resources;
 mod rrd_common;
@@ -26,6 +27,7 @@ const SUBDIRS: SubdirMap = &sorted!([
     ("remotes", &remotes::ROUTER),
     ("resources", &resources::ROUTER),
     ("nodes", &nodes::ROUTER),
+    ("remote-tasks", &remote_tasks::ROUTER),
     ("version", &Router::new().get(&API_METHOD_VERSION)),
 ]);
 
diff --git a/server/src/api/pve/tasks.rs b/server/src/api/pve/tasks.rs
index 2e65836..0371fa0 100644
--- a/server/src/api/pve/tasks.rs
+++ b/server/src/api/pve/tasks.rs
@@ -118,7 +118,7 @@ async fn stop_task(remote: String, upid: RemoteUpid) -> Result<(), Error> {
     returns: { type: pve_api_types::TaskStatus },
 )]
 /// Get the status of a task from a Proxmox VE instance.
-async fn get_task_status(
+pub async fn get_task_status(
     remote: String,
     upid: RemoteUpid,
     wait: bool,
diff --git a/server/src/api/remote_tasks.rs b/server/src/api/remote_tasks.rs
new file mode 100644
index 0000000..d15a5d0
--- /dev/null
+++ b/server/src/api/remote_tasks.rs
@@ -0,0 +1,39 @@
+use anyhow::Error;
+use pdm_api_types::TaskListItem;
+use proxmox_router::{list_subdirs_api_method, Permission, Router, SubdirMap};
+use proxmox_schema::api;
+use proxmox_sortable_macro::sortable;
+
+use crate::task_cache;
+
+pub const ROUTER: Router = Router::new()
+    .get(&list_subdirs_api_method!(SUBDIRS))
+    .subdirs(SUBDIRS);
+
+#[sortable]
+const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIST_TASKS)),]);
+
+#[api(
+    // FIXME:: see list-like API calls in resource routers, we probably want more fine-grained
+    // checks..
+    access: {
+        permission: &Permission::Anybody,
+    },
+    input: {
+        properties: {
+            "max-age": {
+                type: Integer,
+                optional: true,
+                // TODO: sensible default max-age
+                default: 300,
+                description: "Maximum age of cached task data",
+            }
+        },
+    },
+)]
+/// Get the list of tasks for all remotes.
+async fn list_tasks(max_age: i64) -> Result<Vec<TaskListItem>, Error> {
+    let tasks = task_cache::get_tasks(max_age).await?;
+
+    Ok(tasks)
+}
diff --git a/server/src/lib.rs b/server/src/lib.rs
index ae0910d..12dc912 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -7,6 +7,7 @@ pub mod context;
 pub mod env;
 pub mod metric_collection;
 pub mod resource_cache;
+pub mod task_cache;
 pub mod task_utils;
 
 pub mod connection;
diff --git a/server/src/task_cache.rs b/server/src/task_cache.rs
new file mode 100644
index 0000000..dab29fe
--- /dev/null
+++ b/server/src/task_cache.rs
@@ -0,0 +1,453 @@
+use std::{
+    collections::{HashMap, HashSet},
+    fs::File,
+    path::{Path, PathBuf},
+    sync::{LazyLock, RwLock},
+    time::Duration,
+};
+
+use anyhow::Error;
+use pdm_api_types::{
+    remotes::{Remote, RemoteType},
+    RemoteUpid, TaskListItem,
+};
+use proxmox_sys::fs::CreateOptions;
+use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource, PveUpid};
+use serde::{Deserialize, Serialize};
+use tokio::task::JoinHandle;
+
+use crate::{api::pve, task_utils};
+
+/// Get tasks for all remotes
+pub async fn get_tasks(max_age: i64) -> Result<Vec<TaskListItem>, Error> {
+    let (remotes, _) = pdm_config::remotes::config()?;
+
+    let mut all_tasks = Vec::new();
+
+    let cache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json");
+    let mut cache = TaskCache::new(cache_path)?;
+
+    // Force a refresh for all tasks of a remote if a task is finished.
+    // Not super nice, but saves us from persisting finished tasks. Also,
+    // the /nodes/<node>/tasks/<upid>/status endpoint does not return
+    // a task's endtime, which is only returned by
+    // /nodes/<node>/tasks...
+    // Room for improvements in the future.
+    invalidate_cache_for_finished_tasks(&mut cache);
+
+    for (remote_name, remote) in &remotes.sections {
+        let now = proxmox_time::epoch_i64();
+
+        if let Some(tasks) = cache.get_tasks(remote_name.as_str(), now, max_age) {
+            // Data in cache is recent enough and has not been invalidated.
+            all_tasks.extend(tasks);
+        } else {
+            let tasks = fetch_tasks(&remote).await?;
+            cache.set_tasks(remote_name.as_str(), tasks.clone(), now);
+
+            all_tasks.extend(tasks);
+        }
+    }
+
+    let mut returned_tasks = add_running_tasks(all_tasks)?;
+    returned_tasks.sort_by(|a, b| a.starttime.cmp(&b.starttime));
+
+    // We don't need to wait for this task to finish
+    tokio::task::spawn_blocking(move || {
+        if let Err(e) = cache.save() {
+            log::error!("could not save task cache: {e}");
+        }
+    });
+
+    Ok(returned_tasks)
+}
+
+/// Fetch tasks (active and finished) from a remote
+async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
+    let mut tasks = Vec::new();
+
+    match remote.ty {
+        RemoteType::Pve => {
+            let client = pve::connect(remote)?;
+
+            // N+1 requests - we could use /cluster/tasks, but that one
+            // only gives a limited task history
+            for node in client.list_nodes().await? {
+                let mut params = ListTasks::default();
+                // Include running tasks
+                params.source = Some(ListTasksSource::All);
+                // TODO: How much task history do we want? Right now we just hard-code it
+                // to 7 days.
+                params.since = Some(proxmox_time::epoch_i64() - 7 * 24 * 60 * 60);
+
+                let list = client.get_task_list(&node.node, params).await?;
+                let mapped = map_tasks(list, &remote.id)?;
+
+                tasks.extend(mapped);
+            }
+        }
+        RemoteType::Pbs => {
+            // TODO: Add code for PBS
+        }
+    }
+
+    Ok(tasks)
+}
+
+/// Convert a `Vec<ListTasksResponse>` to `Vec<TaskListItem>`
+fn map_tasks(tasks: Vec<ListTasksResponse>, remote: &str) -> Result<Vec<TaskListItem>, Error> {
+    let mut mapped = Vec::new();
+
+    for task in tasks {
+        let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?;
+
+        mapped.push(TaskListItem {
+            upid: remote_upid.to_string(),
+            node: task.node,
+            pid: task.pid,
+            pstart: task.pstart as u64,
+            starttime: task.starttime,
+            worker_type: task.ty,
+            worker_id: Some(task.id),
+            user: task.user,
+            endtime: task.endtime,
+            status: task.status,
+        })
+    }
+
+    Ok(mapped)
+}
+
+/// Drops the cached task list of a remote for all finished tasks.
+///
+/// We use this to force a refresh so that we get the full task
+/// info (including `endtime`) in the next API call.
+fn invalidate_cache_for_finished_tasks(cache: &mut TaskCache) {
+    let mut finished = FINISHED_FOREIGN_TASKS.write().expect("mutex poisoned");
+
+    // If a task is finished, we force a refresh for the remote - otherwise
+    // we don't get the 'endtime' for the task.
+    for task in finished.drain() {
+        cache.invalidate_cache_for_remote(task.remote());
+    }
+}
+
+/// Supplement the list of tasks that we received from the remote with
+/// the tasks that were started by PDM and are currently running.
+fn add_running_tasks(cached_tasks: Vec<TaskListItem>) -> Result<Vec<TaskListItem>, Error> {
+    let mut returned_tasks = Vec::new();
+
+    let mut running_tasks = RUNNING_FOREIGN_TASKS.write().expect("mutex poisoned");
+    for task in cached_tasks {
+        let remote_upid = task.upid.parse()?;
+
+        if running_tasks.contains(&remote_upid) {
+            if task.endtime.is_some() {
+                // Task is finished but we still think it is running ->
+                // Drop it from RUNNING_FOREIGN_TASKS
+                running_tasks.remove(&remote_upid);
+
+                // No need to put it in FINISHED_TASKS, since we already
+                // got its state recently enough (we know the status and endtime)
+            }
+        } else {
+            returned_tasks.push(task);
+        }
+    }
+
+    for task in running_tasks.iter() {
+        let upid: PveUpid = task.upid.parse()?;
+        returned_tasks.push(TaskListItem {
+            upid: task.to_string(),
+            node: upid.node,
+            pid: upid.pid as i64,
+            pstart: upid.pstart,
+            starttime: upid.starttime,
+            worker_type: upid.worker_type,
+            worker_id: upid.worker_id,
+            user: upid.auth_id,
+            endtime: None,
+            status: None,
+        });
+    }
+
+    Ok(returned_tasks)
+}
+
+/// A cache for fetched remote tasks.
+struct TaskCache {
+    /// Cache entries
+    content: TaskCacheContent,
+
+    /// Entries that were added or updated - these will be persisted
+    /// when `save` is called.
+    new_or_updated: TaskCacheContent,
+
+    /// Cache entries were changed/removed.
+    dirty: bool,
+
+    /// File-location at which the cached tasks are stored.
+    cachefile_path: PathBuf,
+}
+
+impl TaskCache {
+    /// Create a new tasks cache instance by loading
+    /// the cache from disk.
+    fn new(cachefile_path: PathBuf) -> Result<Self, Error> {
+        Ok(Self {
+            content: Self::load_content()?,
+            new_or_updated: Default::default(),
+            dirty: false,
+            cachefile_path,
+        })
+    }
+
+    /// Load the task cache contents from disk.
+    fn load_content() -> Result<TaskCacheContent, Error> {
+        let taskcache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json");
+        let content = proxmox_sys::fs::file_read_optional_string(taskcache_path)?;
+
+        let content = if let Some(content) = content {
+            serde_json::from_str(&content)?
+        } else {
+            Default::default()
+        };
+
+        Ok(content)
+    }
+
+    /// Get path for the cache's lockfile.
+    fn lockfile_path(&self) -> PathBuf {
+        let mut path = self.cachefile_path.clone();
+        path.set_extension("lock");
+        path
+    }
+
+    /// Persist the task cache
+    ///
+    /// This method requests an exclusive lock for the task cache lockfile.
+    fn save(&mut self) -> Result<(), Error> {
+        // if we have not updated anything, we don't have to update the cache file
+        if !self.dirty {
+            return Ok(());
+        }
+
+        let _guard = self.lock(Duration::from_secs(5))?;
+
+        // Read content again, in case somebody has changed it in the meanwhile
+        let mut content = Self::load_content()?;
+
+        for (remote_name, entry) in self.new_or_updated.remote_tasks.drain() {
+            if let Some(existing_entry) = content.remote_tasks.get_mut(&remote_name) {
+                // Only update entry if nobody else has updated it in the meanwhile
+                if existing_entry.timestamp < entry.timestamp {
+                    *existing_entry = entry;
+                }
+            } else {
+                content.remote_tasks.insert(remote_name, entry);
+            }
+        }
+
+        let bytes = serde_json::to_vec_pretty(&content)?;
+
+        let api_uid = pdm_config::api_user()?.uid;
+        let api_gid = pdm_config::api_group()?.gid;
+
+        let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
+
+        proxmox_sys::fs::replace_file(&self.cachefile_path, &bytes, file_options, true)?;
+
+        self.dirty = false;
+
+        Ok(())
+    }
+
+    // Update task data for a given remote.
+    fn set_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>, timestamp: i64) {
+        self.dirty = true;
+        self.new_or_updated
+            .remote_tasks
+            .insert(remote.to_string(), TaskCacheEntry { timestamp, tasks });
+    }
+
+    // Get task data for a given remote.
+    fn get_tasks(&self, remote: &str, now: i64, max_age: i64) -> Option<Vec<TaskListItem>> {
+        if let Some(entry) = self.content.remote_tasks.get(remote) {
+            if (entry.timestamp + max_age) < now {
+                return None;
+            }
+
+            Some(entry.tasks.clone())
+        } else if let Some(entry) = self.new_or_updated.remote_tasks.get(remote) {
+            if (entry.timestamp + max_age) < now {
+                return None;
+            }
+            Some(entry.tasks.clone())
+        } else {
+            None
+        }
+    }
+
+    // Invalidate cache for a given remote.
+    fn invalidate_cache_for_remote(&mut self, remote: &str) {
+        self.dirty = true;
+        self.content.remote_tasks.remove(remote);
+    }
+
+    // Lock the cache for modification.
+    //
+    // While the cache is locked, other users can still read the cache
+    // without a lock, since the cache file is replaced atomically
+    // when updating.
+    fn lock(&self, duration: Duration) -> Result<File, Error> {
+        let api_uid = pdm_config::api_user()?.uid;
+        let api_gid = pdm_config::api_group()?.gid;
+
+        let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
+        proxmox_sys::fs::open_file_locked(self.lockfile_path(), duration, true, file_options)
+    }
+}
+
+#[derive(Serialize, Deserialize)]
+/// Per-remote entry in the task cache.
+struct TaskCacheEntry {
+    timestamp: i64,
+    tasks: Vec<TaskListItem>,
+}
+
+#[derive(Default, Serialize, Deserialize)]
+/// Content of the task cache file.
+struct TaskCacheContent {
+    remote_tasks: HashMap<String, TaskCacheEntry>,
+}
+
+/// Interval at which tracked tasks are polled
+const RUNNING_CHECK_INTERVAL_S: u64 = 10;
+
+/// Tasks which were started by PDM and are still running
+static RUNNING_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init);
+/// Tasks which were started by PDM and which have finished
+static FINISHED_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init);
+
+fn init() -> RwLock<HashSet<RemoteUpid>> {
+    RwLock::new(HashSet::new())
+}
+
+/// Insert a remote UPID into the running list
+///
+/// If it is the first entry in the list, a background task is started to track its state
+///
+/// Returns the [`JoinHandle`] if a task was started.
+///
+/// panics on a poisoned mutex
+pub fn track_running_task(task: RemoteUpid) -> Option<JoinHandle<()>> {
+    let mut tasks = RUNNING_FOREIGN_TASKS.write().unwrap();
+
+    // the call inserting the first task in the list needs to start the checking task
+    let need_start_task = tasks.is_empty();
+    tasks.insert(task);
+
+    if !need_start_task {
+        return None;
+    }
+    drop(tasks);
+
+    Some(tokio::spawn(async move {
+        loop {
+            let delay_target = task_utils::next_aligned_instant(RUNNING_CHECK_INTERVAL_S);
+            tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
+
+            let finished_tasks = get_finished_tasks().await;
+
+            // skip iteration if we still have tasks, just not finished ones
+            if finished_tasks.is_empty() && !RUNNING_FOREIGN_TASKS.read().unwrap().is_empty() {
+                continue;
+            }
+
+            let mut finished = FINISHED_FOREIGN_TASKS.write().unwrap();
+            // we either have finished tasks, or the running task list was empty
+            let mut set = RUNNING_FOREIGN_TASKS.write().unwrap();
+
+            for (upid, _status) in finished_tasks {
+                if set.remove(&upid) {
+                    finished.insert(upid);
+                } else {
+                    // someone else removed & persisted the task in the meantime
+                }
+            }
+
+            // if no task remains, end the current task
+            // it will be restarted by the next caller that inserts one
+            if set.is_empty() {
+                return;
+            }
+        }
+    }))
+}
+
+/// Get a list of running foreign tasks
+///
+/// panics on a poisoned mutex
+pub fn get_running_tasks() -> Vec<RemoteUpid> {
+    RUNNING_FOREIGN_TASKS
+        .read()
+        .unwrap()
+        .iter()
+        .cloned()
+        .collect()
+}
+
+/// Checks all current saved UPIDs if they're still running, and if not,
+/// returns their upids + status
+///
+/// panics on a poisoned mutex
+pub async fn get_finished_tasks() -> Vec<(RemoteUpid, String)> {
+    let mut finished = Vec::new();
+    let config = match pdm_config::remotes::config() {
+        Ok((config, _)) => config,
+        Err(err) => {
+            log::error!("could not open remotes config: {err}");
+            return Vec::new();
+        }
+    };
+    for task in get_running_tasks() {
+        match config.get(task.remote()) {
+            Some(remote) => match remote.ty {
+                RemoteType::Pve => {
+                    let status = match crate::api::pve::tasks::get_task_status(
+                        remote.id.clone(),
+                        task.clone(),
+                        false,
+                    )
+                    .await
+                    {
+                        Ok(status) => status,
+                        Err(err) => {
+                            log::error!("could not get status from remote: {err}");
+                            finished.push((task, "could not get status".to_string()));
+                            continue;
+                        }
+                    };
+                    if let Some(status) = status.exitstatus {
+                        finished.push((task, status.to_string()));
+                    }
+                }
+                RemoteType::Pbs => {
+                    let _client = match crate::pbs_client::connect(remote) {
+                        Ok(client) => client,
+                        Err(err) => {
+                            log::error!("could not get status from remote: {err}");
+                            finished.push((task, "could not get status".to_string()));
+                            continue;
+                        }
+                    };
+                    // FIXME implement get task status
+                    finished.push((task, "unknown state".to_string()));
+                }
+            },
+            None => finished.push((task, "unknown remote".to_string())),
+        }
+    }
+
+    finished
+}
-- 
2.39.5




* [pdm-devel] [RFC proxmox-datacenter-manager 5/5] server: pve api: track new tasks created by PDM
  2024-12-20 14:24 [pdm-devel] [RFC proxmox-datacenter-manager 1/5] privileged-api: create /var/cache/proxmox-datacenter-manager/ on startup Lukas Wagner
                   ` (2 preceding siblings ...)
  2024-12-20 14:24 ` [pdm-devel] [RFC proxmox-datacenter-manager 4/5] api: add caching 'remote-tasks' API endpoint Lukas Wagner
@ 2024-12-20 14:24 ` Lukas Wagner
  2025-01-13 11:28 ` [pdm-devel] applied: [RFC proxmox-datacenter-manager 1/5] privileged-api: create /var/cache/proxmox-datacenter-manager/ on startup Dietmar Maurer
  4 siblings, 0 replies; 6+ messages in thread
From: Lukas Wagner @ 2024-12-20 14:24 UTC (permalink / raw)
  To: pdm-devel

When we start a new task from PDM, we add the task's UPID to the set of
tracked tasks. These tasks will be polled regularly to track their
status.

Originally-by: Dominik Csapak <d.csapak@proxmox.com>
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
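
The bookkeeping behind the tracked-task set can be sketched without
tokio as follows. This is a simplified illustration, not the patch's
implementation: `track` and `settle` are hypothetical helpers and UPIDs
are plain strings. It shows the two decisions the tracking code has to
make: whether the caller must start the polling worker (the set was
empty before the insert) and whether the worker may stop (the set is
empty after finished tasks were moved out).

```rust
use std::collections::HashSet;
use std::sync::RwLock;

/// Add `upid` to the tracked set. Returns true if the set was empty
/// before, i.e. the caller is responsible for starting the poller.
fn track(running: &RwLock<HashSet<String>>, upid: &str) -> bool {
    let mut set = running.write().expect("lock poisoned");
    let need_start = set.is_empty();
    set.insert(upid.to_string());
    need_start
}

/// Move the UPIDs in `done` from `running` to `finished`. Returns true
/// if nothing is left to track, so the poller can end.
fn settle(
    running: &RwLock<HashSet<String>>,
    finished: &RwLock<HashSet<String>>,
    done: &[&str],
) -> bool {
    let mut fin = finished.write().expect("lock poisoned");
    let mut run = running.write().expect("lock poisoned");
    for upid in done {
        // Only record tasks that were still tracked; others were
        // already handled elsewhere.
        if run.remove(*upid) {
            fin.insert(upid.to_string());
        }
    }
    run.is_empty()
}
```

Both helpers take their locks in a fixed order and release them on
return, so no lock is held across a poll interval.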
 server/src/api/pve/mod.rs | 33 ++++++++++++++++++++++-----------
 1 file changed, 22 insertions(+), 11 deletions(-)

diff --git a/server/src/api/pve/mod.rs b/server/src/api/pve/mod.rs
index 6b16b43..a232d59 100644
--- a/server/src/api/pve/mod.rs
+++ b/server/src/api/pve/mod.rs
@@ -24,12 +24,13 @@ use pdm_api_types::{
 
 use pve_api_types::client::PveClient;
 use pve_api_types::{
-    ClusterResourceKind, ClusterResourceType, QemuMigratePreconditions, StartQemuMigrationType,
+    ClusterResourceKind, ClusterResourceType, PveUpid, QemuMigratePreconditions,
+    StartQemuMigrationType,
 };
 
 use super::resources::{map_pve_lxc, map_pve_node, map_pve_qemu, map_pve_storage};
 
-use crate::connection;
+use crate::{connection, task_cache};
 
 mod node;
 mod rrddata;
@@ -115,6 +116,13 @@ const QEMU_VM_SUBDIRS: SubdirMap = &sorted!([
 
 const RESOURCES_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_RESOURCES);
 
+// converts a remote + PveUpid into a RemoteUpid and starts tracking it
+fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> {
+    let remote_upid: RemoteUpid = (remote, upid.to_string()).try_into()?;
+    task_cache::track_running_task(remote_upid.clone());
+    Ok(remote_upid)
+}
+
 pub(crate) fn get_remote<'a>(
     config: &'a SectionConfigData<Remote>,
     id: &str,
@@ -507,7 +515,7 @@ pub async fn qemu_start(
         .start_qemu_async(&node, vmid, Default::default())
         .await?;
 
-    (remote, upid.to_string()).try_into()
+    new_remote_upid(remote, upid)
 }
 
 #[api(
@@ -540,7 +548,7 @@ pub async fn qemu_stop(
 
     let upid = pve.stop_qemu_async(&node, vmid, Default::default()).await?;
 
-    (remote, upid.to_string()).try_into()
+    new_remote_upid(remote, upid)
 }
 
 #[api(
@@ -575,7 +583,8 @@ pub async fn qemu_shutdown(
         .shutdown_qemu_async(&node, vmid, Default::default())
         .await?;
 
-    (remote, upid.to_string()).try_into()
+    //(remote, upid.to_string()).try_into()
+    new_remote_upid(remote, upid)
 }
 
 fn check_guest_delete_perms(
@@ -684,7 +693,8 @@ pub async fn qemu_migrate(
         with_local_disks,
     };
     let upid = pve.migrate_qemu(&node, vmid, params).await?;
-    (remote, upid.to_string()).try_into()
+    //(remote, upid.to_string()).try_into()
+    new_remote_upid(remote, upid)
 }
 
 #[api(
@@ -962,7 +972,7 @@ pub async fn lxc_start(
 
     let upid = pve.start_lxc_async(&node, vmid, Default::default()).await?;
 
-    (remote, upid.to_string()).try_into()
+    new_remote_upid(remote, upid)
 }
 
 #[api(
@@ -995,7 +1005,7 @@ pub async fn lxc_stop(
 
     let upid = pve.stop_lxc_async(&node, vmid, Default::default()).await?;
 
-    (remote, upid.to_string()).try_into()
+    new_remote_upid(remote, upid)
 }
 
 #[api(
@@ -1030,7 +1040,7 @@ pub async fn lxc_shutdown(
         .shutdown_lxc_async(&node, vmid, Default::default())
         .await?;
 
-    (remote, upid.to_string()).try_into()
+    new_remote_upid(remote, upid)
 }
 
 #[api(
@@ -1109,7 +1119,8 @@ pub async fn lxc_migrate(
         timeout,
     };
     let upid = pve.migrate_lxc(&node, vmid, params).await?;
-    (remote, upid.to_string()).try_into()
+
+    new_remote_upid(remote, upid)
 }
 
 #[api(
@@ -1255,7 +1266,7 @@ pub async fn lxc_remote_migrate(
     log::info!("migrating vm {vmid} of node {node:?}");
     let upid = source_conn.remote_migrate_lxc(&node, vmid, params).await?;
 
-    (source, upid.to_string()).try_into()
+    new_remote_upid(source, upid)
 }
 
 #[api(
-- 
2.39.5




* [pdm-devel] applied: [RFC proxmox-datacenter-manager 1/5] privileged-api: create /var/cache/proxmox-datacenter-manager/ on startup
  2024-12-20 14:24 [pdm-devel] [RFC proxmox-datacenter-manager 1/5] privileged-api: create /var/cache/proxmox-datacenter-manager/ on startup Lukas Wagner
                   ` (3 preceding siblings ...)
  2024-12-20 14:24 ` [pdm-devel] [RFC proxmox-datacenter-manager 5/5] server: pve api: track new tasks created by PDM Lukas Wagner
@ 2025-01-13 11:28 ` Dietmar Maurer
  4 siblings, 0 replies; 6+ messages in thread
From: Dietmar Maurer @ 2025-01-13 11:28 UTC (permalink / raw)
  To: Proxmox Datacenter Manager development discussion, Lukas Wagner

applied whole series


