public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
* [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid
@ 2021-09-23 10:13 Dietmar Maurer
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 2/6] worker task: allow to configure path and owner/group Dietmar Maurer
                   ` (5 more replies)
  0 siblings, 6 replies; 8+ messages in thread
From: Dietmar Maurer @ 2021-09-23 10:13 UTC (permalink / raw)
  To: pbs-devel

Because we want to move worker_task.rs into the proxmox-rest-server crate.
---
 pbs-api-types/src/upid.rs        | 13 +++++++------
 src/api2/admin/datastore.rs      |  6 +++---
 src/api2/backup/environment.rs   |  2 +-
 src/api2/backup/mod.rs           |  2 +-
 src/api2/config/acme.rs          |  6 +++---
 src/api2/config/datastore.rs     |  4 ++--
 src/api2/node/apt.rs             |  4 ++--
 src/api2/node/certificates.rs    |  6 +++---
 src/api2/node/disks/directory.rs |  4 ++--
 src/api2/node/disks/mod.rs       |  4 ++--
 src/api2/node/disks/zfs.rs       |  4 ++--
 src/api2/node/mod.rs             |  2 +-
 src/api2/node/network.rs         |  2 +-
 src/api2/node/services.rs        |  2 +-
 src/api2/node/tasks.rs           | 15 +++++++++------
 src/api2/pull.rs                 |  4 ++--
 src/api2/reader/mod.rs           |  2 +-
 src/api2/tape/backup.rs          |  4 ++--
 src/api2/tape/drive.rs           |  2 +-
 src/api2/tape/restore.rs         |  2 +-
 src/bin/proxmox-backup-proxy.rs  |  2 +-
 src/server/gc_job.rs             |  2 +-
 src/server/prune_job.rs          |  2 +-
 src/server/verify_job.rs         |  2 +-
 src/server/worker_task.rs        |  8 ++++----
 25 files changed, 55 insertions(+), 51 deletions(-)

diff --git a/pbs-api-types/src/upid.rs b/pbs-api-types/src/upid.rs
index ba23a646..29135bca 100644
--- a/pbs-api-types/src/upid.rs
+++ b/pbs-api-types/src/upid.rs
@@ -8,8 +8,6 @@ use proxmox::api::schema::{ApiStringFormat, ApiType, Schema, StringSchema, Array
 use proxmox::const_regex;
 use proxmox::sys::linux::procfs;
 
-use crate::Authid;
-
 /// Unique Process/Task Identifier
 ///
 /// We use this to uniquely identify worker task. UPIDs have a short
@@ -37,7 +35,7 @@ pub struct UPID {
     /// Worker ID (arbitrary ASCII string)
     pub worker_id: Option<String>,
     /// The authenticated entity who started the task
-    pub auth_id: Authid,
+    pub auth_id: String,
     /// The node name.
     pub node: String,
 }
@@ -71,7 +69,7 @@ impl UPID {
     pub fn new(
         worker_type: &str,
         worker_id: Option<String>,
-        auth_id: Authid,
+        auth_id: String,
     ) -> Result<Self, Error> {
 
         let pid = unsafe { libc::getpid() };
@@ -82,6 +80,10 @@ impl UPID {
             bail!("illegal characters in worker type '{}'", worker_type);
         }
 
+        if auth_id.contains(bad) {
+            bail!("illegal characters in auth_id '{}'", auth_id);
+        }
+
         static WORKER_TASK_NEXT_ID: AtomicUsize = AtomicUsize::new(0);
 
         let task_id = WORKER_TASK_NEXT_ID.fetch_add(1, Ordering::SeqCst);
@@ -184,7 +186,7 @@ pub struct TaskListItem {
     /// Worker ID (arbitrary ASCII string)
     pub worker_id: Option<String>,
     /// The authenticated entity who started the task
-    pub user: Authid,
+    pub user: String,
     /// The task end time (Epoch)
     #[serde(skip_serializing_if="Option::is_none")]
     pub endtime: Option<i64>,
@@ -200,4 +202,3 @@ pub const NODE_TASKS_LIST_TASKS_RETURN_TYPE: ReturnType = ReturnType {
         &TaskListItem::API_SCHEMA,
     ).schema(),
 };
-
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index 0b14dfbf..fbb93f35 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -722,7 +722,7 @@ pub fn verify(
     let upid_str = WorkerTask::new_thread(
         worker_type,
         Some(worker_id),
-        auth_id.clone(),
+        auth_id.to_string(),
         to_stdout,
         move |worker| {
             let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
@@ -862,7 +862,7 @@ pub fn prune(
 
 
     // We use a WorkerTask just to have a task log, but run synchrounously
-    let worker = WorkerTask::new("prune", Some(worker_id), auth_id, true)?;
+    let worker = WorkerTask::new("prune", Some(worker_id), auth_id.to_string(), true)?;
 
     if keep_all {
         worker.log("No prune selection - keeping all files.");
@@ -957,7 +957,7 @@ pub fn prune_datastore(
     let upid_str = WorkerTask::new_thread(
         "prune",
         Some(store.clone()),
-        auth_id.clone(),
+        auth_id.to_string(),
         to_stdout,
         move |worker| crate::server::prune_datastore(
             worker.clone(),
diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index 129ebd2b..306f91ee 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -525,7 +525,7 @@ impl BackupEnvironment {
         WorkerTask::new_thread(
             "verify",
             Some(worker_id),
-            self.auth_id.clone(),
+            self.auth_id.to_string(),
             false,
             move |worker| {
                 worker.log("Automatically verifying newly added snapshot");
diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
index 8f51f314..c14f19a4 100644
--- a/src/api2/backup/mod.rs
+++ b/src/api2/backup/mod.rs
@@ -166,7 +166,7 @@ async move {
     if !is_new { bail!("backup directory already exists."); }
 
 
-    WorkerTask::spawn(worker_type, Some(worker_id), auth_id.clone(), true, move |worker| {
+    WorkerTask::spawn(worker_type, Some(worker_id), auth_id.to_string(), true, move |worker| {
         let mut env = BackupEnvironment::new(
             env_type, auth_id, worker.clone(), datastore, backup_dir);
 
diff --git a/src/api2/config/acme.rs b/src/api2/config/acme.rs
index 593b79a3..564cafae 100644
--- a/src/api2/config/acme.rs
+++ b/src/api2/config/acme.rs
@@ -215,7 +215,7 @@ fn register_account(
     WorkerTask::spawn(
         "acme-register",
         Some(name.to_string()),
-        auth_id,
+        auth_id.to_string(),
         true,
         move |worker| async move {
             let mut client = AcmeClient::new(directory);
@@ -275,7 +275,7 @@ pub fn update_account(
     WorkerTask::spawn(
         "acme-update",
         Some(name.to_string()),
-        auth_id,
+        auth_id.to_string(),
         true,
         move |_worker| async move {
             let data = match contact {
@@ -320,7 +320,7 @@ pub fn deactivate_account(
     WorkerTask::spawn(
         "acme-deactivate",
         Some(name.to_string()),
-        auth_id,
+        auth_id.to_string(),
         true,
         move |worker| async move {
             match AcmeClient::load(&name)
diff --git a/src/api2/config/datastore.rs b/src/api2/config/datastore.rs
index c6036fc3..0e6529f8 100644
--- a/src/api2/config/datastore.rs
+++ b/src/api2/config/datastore.rs
@@ -119,9 +119,9 @@ pub fn create_datastore(
     WorkerTask::new_thread(
         "create-datastore",
         Some(config.name.to_string()),
-        auth_id,
+        auth_id.to_string(),
         to_stdout,
-        move |worker| do_create_datastore(lock, section_config, config, Some(&worker)),
+       move |worker| do_create_datastore(lock, section_config, config, Some(&worker)),
     )
 }
 
diff --git a/src/api2/node/apt.rs b/src/api2/node/apt.rs
index 8f4bc691..f02920c0 100644
--- a/src/api2/node/apt.rs
+++ b/src/api2/node/apt.rs
@@ -14,7 +14,7 @@ use proxmox_apt::repositories::{
 use proxmox_http::ProxyConfig;
 
 use pbs_api_types::{
-    Authid, APTUpdateInfo, NODE_SCHEMA, PROXMOX_CONFIG_DIGEST_SCHEMA, UPID_SCHEMA,
+    APTUpdateInfo, NODE_SCHEMA, PROXMOX_CONFIG_DIGEST_SCHEMA, UPID_SCHEMA,
     PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
 };
 
@@ -154,7 +154,7 @@ pub fn apt_update_database(
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<String, Error> {
 
-    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let auth_id = rpcenv.get_auth_id().unwrap();
     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
 
     let upid_str = WorkerTask::new_thread("aptupdate", None, auth_id, to_stdout, move |worker| {
diff --git a/src/api2/node/certificates.rs b/src/api2/node/certificates.rs
index 82fa028d..7b31861e 100644
--- a/src/api2/node/certificates.rs
+++ b/src/api2/node/certificates.rs
@@ -11,7 +11,7 @@ use proxmox::api::router::SubdirMap;
 use proxmox::api::{api, Permission, Router, RpcEnvironment};
 use proxmox::list_subdirs_api_method;
 
-use pbs_api_types::{Authid, NODE_SCHEMA, PRIV_SYS_MODIFY};
+use pbs_api_types::{NODE_SCHEMA, PRIV_SYS_MODIFY};
 use pbs_buildcfg::configdir;
 use pbs_tools::cert;
 
@@ -530,7 +530,7 @@ fn spawn_certificate_worker(
 
     let (node_config, _digest) = crate::config::node::config()?;
 
-    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let auth_id = rpcenv.get_auth_id().unwrap();
 
     WorkerTask::spawn(name, None, auth_id, true, move |worker| async move {
         if let Some(cert) = order_certificate(worker, &node_config).await? {
@@ -559,7 +559,7 @@ pub fn revoke_acme_cert(rpcenv: &mut dyn RpcEnvironment) -> Result<String, Error
 
     let cert_pem = get_certificate_pem()?;
 
-    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let auth_id = rpcenv.get_auth_id().unwrap();
 
     WorkerTask::spawn(
         "acme-revoke-cert",
diff --git a/src/api2/node/disks/directory.rs b/src/api2/node/disks/directory.rs
index 38809dcf..2f4a738d 100644
--- a/src/api2/node/disks/directory.rs
+++ b/src/api2/node/disks/directory.rs
@@ -7,7 +7,7 @@ use proxmox::api::section_config::SectionConfigData;
 use proxmox::api::router::Router;
 
 use pbs_api_types::{
-    Authid, DataStoreConfig, NODE_SCHEMA, BLOCKDEVICE_NAME_SCHEMA,
+    DataStoreConfig, NODE_SCHEMA, BLOCKDEVICE_NAME_SCHEMA,
     DATASTORE_SCHEMA, UPID_SCHEMA, PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
 };
 
@@ -146,7 +146,7 @@ pub fn create_datastore_disk(
 
     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
 
-    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let auth_id = rpcenv.get_auth_id().unwrap();
 
     let info = get_disk_usage_info(&disk, true)?;
 
diff --git a/src/api2/node/disks/mod.rs b/src/api2/node/disks/mod.rs
index 67f8f63a..b4001a54 100644
--- a/src/api2/node/disks/mod.rs
+++ b/src/api2/node/disks/mod.rs
@@ -7,7 +7,7 @@ use proxmox::{sortable, identity};
 use proxmox::{list_subdirs_api_method};
 
 use pbs_api_types::{
-    Authid, UPID_SCHEMA, NODE_SCHEMA, BLOCKDEVICE_NAME_SCHEMA,
+    UPID_SCHEMA, NODE_SCHEMA, BLOCKDEVICE_NAME_SCHEMA,
     PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
 };
 
@@ -144,7 +144,7 @@ pub fn initialize_disk(
 
     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
 
-    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let auth_id = rpcenv.get_auth_id().unwrap();
 
     let info = get_disk_usage_info(&disk, true)?;
 
diff --git a/src/api2/node/disks/zfs.rs b/src/api2/node/disks/zfs.rs
index 14c2cfec..9fe0dac4 100644
--- a/src/api2/node/disks/zfs.rs
+++ b/src/api2/node/disks/zfs.rs
@@ -8,7 +8,7 @@ use proxmox::api::{
 use proxmox::api::router::Router;
 
 use pbs_api_types::{
-    Authid, ZpoolListItem, ZfsRaidLevel, ZfsCompressionType, DataStoreConfig,
+    ZpoolListItem, ZfsRaidLevel, ZfsCompressionType, DataStoreConfig,
     NODE_SCHEMA, ZPOOL_NAME_SCHEMA, DATASTORE_SCHEMA, DISK_ARRAY_SCHEMA,
     DISK_LIST_SCHEMA, ZFS_ASHIFT_SCHEMA, UPID_SCHEMA,
     PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
@@ -168,7 +168,7 @@ pub fn create_zpool(
 
     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
 
-    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let auth_id = rpcenv.get_auth_id().unwrap();
 
     let add_datastore = add_datastore.unwrap_or(false);
 
diff --git a/src/api2/node/mod.rs b/src/api2/node/mod.rs
index 9b31d595..8e357311 100644
--- a/src/api2/node/mod.rs
+++ b/src/api2/node/mod.rs
@@ -146,7 +146,7 @@ async fn termproxy(cmd: Option<String>, rpcenv: &mut dyn RpcEnvironment) -> Resu
     let upid = WorkerTask::spawn(
         "termproxy",
         None,
-        auth_id,
+        auth_id.to_string(),
         false,
         move |worker| async move {
             // move inside the worker so that it survives and does not close the port
diff --git a/src/api2/node/network.rs b/src/api2/node/network.rs
index 351fd11c..0fde9f2a 100644
--- a/src/api2/node/network.rs
+++ b/src/api2/node/network.rs
@@ -703,7 +703,7 @@ pub async fn reload_network_config(
 
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
 
-    let upid_str = WorkerTask::spawn("srvreload", Some(String::from("networking")), auth_id, true, |_worker| async {
+    let upid_str = WorkerTask::spawn("srvreload", Some(String::from("networking")), auth_id.to_string(), true, |_worker| async {
 
         let _ = std::fs::rename(network::NETWORK_INTERFACES_NEW_FILENAME, network::NETWORK_INTERFACES_FILENAME);
 
diff --git a/src/api2/node/services.rs b/src/api2/node/services.rs
index 25edd1b6..6c757f43 100644
--- a/src/api2/node/services.rs
+++ b/src/api2/node/services.rs
@@ -195,7 +195,7 @@ fn run_service_command(service: &str, cmd: &str, auth_id: Authid) -> Result<Valu
     let upid = WorkerTask::new_thread(
         &workerid,
         Some(service.clone()),
-        auth_id,
+        auth_id.to_string(),
         false,
         move |_worker| {
 
diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index 7066f889..169a3d4d 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -99,8 +99,8 @@ fn check_job_store(upid: &UPID, store: &str) -> bool {
 }
 
 fn check_task_access(auth_id: &Authid, upid: &UPID) -> Result<(), Error> {
-    let task_auth_id = &upid.auth_id;
-    if auth_id == task_auth_id
+    let task_auth_id: Authid = upid.auth_id.parse()?;
+    if auth_id == &task_auth_id
         || (task_auth_id.is_token() && &Authid::from(task_auth_id.user().clone()) == auth_id) {
         // task owner can always read
         Ok(())
@@ -200,6 +200,8 @@ async fn get_task_status(
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
     check_task_access(&auth_id, &upid)?;
 
+    let task_auth_id: Authid = upid.auth_id.parse()?;
+
     let mut result = json!({
         "upid": param["upid"],
         "node": upid.node,
@@ -208,11 +210,11 @@ async fn get_task_status(
         "starttime": upid.starttime,
         "type": upid.worker_type,
         "id": upid.worker_id,
-        "user": upid.auth_id.user(),
+        "user": task_auth_id.user(),
     });
 
-    if upid.auth_id.is_token() {
-        result["tokenid"] = Value::from(upid.auth_id.tokenname().unwrap().as_str());
+    if task_auth_id.is_token() {
+        result["tokenid"] = Value::from(task_auth_id.tokenname().unwrap().as_str());
     }
 
     if crate::server::worker_is_active(&upid).await? {
@@ -344,10 +346,11 @@ fn stop_task(
 
     let upid = extract_upid(&param)?;
 
-    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let auth_id = rpcenv.get_auth_id().unwrap();
 
     if auth_id != upid.auth_id {
         let user_info = CachedUserInfo::new()?;
+        let auth_id: Authid = auth_id.parse()?;
         user_info.check_privs(&auth_id, &["system", "tasks"], PRIV_SYS_MODIFY, false)?;
     }
 
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index 1eb86ea3..e631920f 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -81,7 +81,7 @@ pub fn do_sync_job(
     let upid_str = WorkerTask::spawn(
         &worker_type,
         Some(job_id.clone()),
-        auth_id.clone(),
+        auth_id.to_string(),
         to_stdout,
         move |worker| async move {
 
@@ -183,7 +183,7 @@ async fn pull (
     let (client, src_repo, tgt_store) = get_pull_parameters(&store, &remote, &remote_store).await?;
 
     // fixme: set to_stdout to false?
-    let upid_str = WorkerTask::spawn("sync", Some(store.clone()), auth_id.clone(), true, move |worker| async move {
+    let upid_str = WorkerTask::spawn("sync", Some(store.clone()), auth_id.to_string(), true, move |worker| async move {
 
         worker.log(format!("sync datastore '{}' start", store));
 
diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
index 821e83c4..fada952c 100644
--- a/src/api2/reader/mod.rs
+++ b/src/api2/reader/mod.rs
@@ -143,7 +143,7 @@ fn upgrade_to_backup_reader_protocol(
 
         let worker_id = format!("{}:{}/{}/{:08X}", store, backup_type, backup_id, backup_dir.backup_time());
 
-        WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| async move {
+        WorkerTask::spawn("reader", Some(worker_id), auth_id.to_string(), true, move |worker| async move {
             let _guard = _guard;
 
             let mut env = ReaderEnvironment::new(
diff --git a/src/api2/tape/backup.rs b/src/api2/tape/backup.rs
index 6b533820..fadbfa3d 100644
--- a/src/api2/tape/backup.rs
+++ b/src/api2/tape/backup.rs
@@ -195,7 +195,7 @@ pub fn do_tape_backup_job(
     let upid_str = WorkerTask::new_thread(
         &worker_type,
         Some(job_id.clone()),
-        auth_id.clone(),
+        auth_id.to_string(),
         to_stdout,
         move |worker| {
             job.start(&worker.upid().to_string())?;
@@ -376,7 +376,7 @@ pub fn backup(
     let upid_str = WorkerTask::new_thread(
         "tape-backup",
         Some(job_id),
-        auth_id,
+        auth_id.to_string(),
         to_stdout,
         move |worker| {
             let _drive_lock = drive_lock; // keep lock guard
diff --git a/src/api2/tape/drive.rs b/src/api2/tape/drive.rs
index 58f49f43..10aa6842 100644
--- a/src/api2/tape/drive.rs
+++ b/src/api2/tape/drive.rs
@@ -87,7 +87,7 @@ where
     let (config, _digest) = pbs_config::drive::config()?;
     let lock_guard = lock_tape_device(&config, &drive)?;
 
-    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let auth_id = rpcenv.get_auth_id().unwrap();
     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
 
     WorkerTask::new_thread(worker_type, job_id, auth_id, to_stdout, move |worker| {
diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index 4ab60e8f..7739d1a4 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -275,7 +275,7 @@ pub fn restore(
     let upid_str = WorkerTask::new_thread(
         "tape-restore",
         Some(taskid),
-        auth_id.clone(),
+        auth_id.to_string(),
         to_stdout,
         move |worker| {
             let _drive_lock = drive_lock; // keep lock guard
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 6734525b..518054bf 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -745,7 +745,7 @@ async fn schedule_task_log_rotate() {
     if let Err(err) = WorkerTask::new_thread(
         worker_type,
         None,
-        Authid::root_auth_id().clone(),
+        Authid::root_auth_id().to_string(),
         false,
         move |worker| {
             job.start(&worker.upid().to_string())?;
diff --git a/src/server/gc_job.rs b/src/server/gc_job.rs
index 665b9631..317f4a36 100644
--- a/src/server/gc_job.rs
+++ b/src/server/gc_job.rs
@@ -26,7 +26,7 @@ pub fn do_garbage_collection_job(
     let upid_str = WorkerTask::new_thread(
         &worker_type,
         Some(store.clone()),
-        auth_id.clone(),
+        auth_id.to_string(),
         to_stdout,
         move |worker| {
             job.start(&worker.upid().to_string())?;
diff --git a/src/server/prune_job.rs b/src/server/prune_job.rs
index f298a74c..8d971a1c 100644
--- a/src/server/prune_job.rs
+++ b/src/server/prune_job.rs
@@ -105,7 +105,7 @@ pub fn do_prune_job(
     let upid_str = WorkerTask::new_thread(
         &worker_type,
         Some(job.jobname().to_string()),
-        auth_id.clone(),
+        auth_id.to_string(),
         false,
         move |worker| {
             job.start(&worker.upid().to_string())?;
diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs
index 7f6d73ff..6005b706 100644
--- a/src/server/verify_job.rs
+++ b/src/server/verify_job.rs
@@ -36,7 +36,7 @@ pub fn do_verification_job(
     let upid_str = WorkerTask::new_thread(
         &worker_type,
         Some(job_id.clone()),
-        auth_id.clone(),
+        auth_id.to_string(),
         to_stdout,
         move |worker| {
             job.start(&worker.upid().to_string())?;
diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
index a74b18e1..92ab50d7 100644
--- a/src/server/worker_task.rs
+++ b/src/server/worker_task.rs
@@ -18,7 +18,7 @@ use proxmox::tools::fs::{create_path, replace_file, CreateOptions};
 
 use pbs_buildcfg;
 use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
-use pbs_api_types::{Authid, UPID};
+use pbs_api_types::UPID;
 use pbs_config::{open_backup_lockfile, BackupLockGuard};
 use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
 
@@ -589,7 +589,7 @@ struct WorkerTaskData {
 
 impl WorkerTask {
 
-    pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: Authid, to_stdout: bool) -> Result<Arc<Self>, Error> {
+    pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: String, to_stdout: bool) -> Result<Arc<Self>, Error> {
         let upid = UPID::new(worker_type, worker_id, auth_id)?;
         let task_id = upid.task_id;
 
@@ -640,7 +640,7 @@ impl WorkerTask {
     pub fn spawn<F, T>(
         worker_type: &str,
         worker_id: Option<String>,
-        auth_id: Authid,
+        auth_id: String,
         to_stdout: bool,
         f: F,
     ) -> Result<String, Error>
@@ -662,7 +662,7 @@ impl WorkerTask {
     pub fn new_thread<F>(
         worker_type: &str,
         worker_id: Option<String>,
-        auth_id: Authid,
+        auth_id: String,
         to_stdout: bool,
         f: F,
     ) -> Result<String, Error>
-- 
2.30.2





^ permalink raw reply	[flat|nested] 8+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 2/6] worker task: allow to configure path and owner/group
  2021-09-23 10:13 [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid Dietmar Maurer
@ 2021-09-23 10:13 ` Dietmar Maurer
  2021-09-23 11:36   ` Fabian Grünbichler
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 3/6] use UPID and systemd helpers from proxmox 0.13.4 Dietmar Maurer
                   ` (4 subsequent siblings)
  5 siblings, 1 reply; 8+ messages in thread
From: Dietmar Maurer @ 2021-09-23 10:13 UTC (permalink / raw)
  To: pbs-devel

An application now needs to call init_worker_tasks() before using
worker tasks.

Notable changes:
- need to call init_worker_tasks() before using worker tasks.
- create_task_log_dirs() is called inside init_worker_tasks()
- removed UpidExt trait
- use atomic_open_or_create_file()
- remove pbs_config and pbs_buildcfg dependency
---
 src/api2/node/tasks.rs          |   6 +-
 src/bin/proxmox-backup-api.rs   |   7 +-
 src/bin/proxmox-backup-proxy.rs |   5 +-
 src/server/mod.rs               |   3 -
 src/server/upid.rs              |  18 --
 src/server/worker_task.rs       | 475 +++++++++++++++++++-------------
 6 files changed, 290 insertions(+), 224 deletions(-)
 delete mode 100644 src/server/upid.rs

diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index 169a3d4d..df4673a1 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -16,7 +16,7 @@ use pbs_api_types::{
 };
 
 use crate::api2::pull::check_pull_privs;
-use crate::server::{self, UPIDExt, TaskState, TaskListInfoIterator};
+use crate::server::{self, upid_log_path, upid_read_status, TaskState, TaskListInfoIterator};
 use pbs_config::CachedUserInfo;
 
 // matches respective job execution privileges
@@ -220,7 +220,7 @@ async fn get_task_status(
     if crate::server::worker_is_active(&upid).await? {
         result["status"] = Value::from("running");
     } else {
-        let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
+        let exitstatus = upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
         result["status"] = Value::from("stopped");
         result["exitstatus"] = Value::from(exitstatus.to_string());
     };
@@ -287,7 +287,7 @@ async fn read_task_log(
 
     let mut count: u64 = 0;
 
-    let path = upid.log_path();
+    let path = upid_log_path(&upid)?;
 
     let file = File::open(path)?;
 
diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
index 9ad10260..9901b85d 100644
--- a/src/bin/proxmox-backup-api.rs
+++ b/src/bin/proxmox-backup-api.rs
@@ -54,8 +54,6 @@ async fn run() -> Result<(), Error> {
         bail!("unable to inititialize syslog - {}", err);
     }
 
-    server::create_task_log_dirs()?;
-
     config::create_configdir()?;
 
     config::update_self_signed_cert(false)?;
@@ -102,13 +100,14 @@ async fn run() -> Result<(), Error> {
 
     config.enable_auth_log(
         pbs_buildcfg::API_AUTH_LOG_FN,
-        Some(dir_opts),
-        Some(file_opts),
+        Some(dir_opts.clone()),
+        Some(file_opts.clone()),
         &mut commando_sock,
     )?;
 
 
     let rest_server = RestServer::new(config);
+    proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
 
     // http server future:
     let server = daemon::create_daemon(
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 518054bf..5d8ed189 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -202,12 +202,13 @@ async fn run() -> Result<(), Error> {
 
     config.enable_auth_log(
         pbs_buildcfg::API_AUTH_LOG_FN,
-        Some(dir_opts),
-        Some(file_opts),
+        Some(dir_opts.clone()),
+        Some(file_opts.clone()),
         &mut commando_sock,
     )?;
 
     let rest_server = RestServer::new(config);
+    proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
 
     //openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes
 
diff --git a/src/server/mod.rs b/src/server/mod.rs
index a7dcee67..77320da6 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -46,9 +46,6 @@ pub fn our_ctrl_sock() -> String {
     ctrl_sock_from_pid(*PID)
 }
 
-mod upid;
-pub use upid::*;
-
 mod worker_task;
 pub use worker_task::*;
 
diff --git a/src/server/upid.rs b/src/server/upid.rs
deleted file mode 100644
index 70a3e3fb..00000000
--- a/src/server/upid.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-pub trait UPIDExt: private::Sealed {
-    /// Returns the absolute path to the task log file
-    fn log_path(&self) -> std::path::PathBuf;
-}
-
-mod private {
-    pub trait Sealed {}
-    impl Sealed for  pbs_api_types::UPID {}
-}
-
-impl UPIDExt for  pbs_api_types::UPID {
-    fn log_path(&self) -> std::path::PathBuf {
-        let mut path = std::path::PathBuf::from(super::PROXMOX_BACKUP_TASK_DIR);
-        path.push(format!("{:02X}", self.pstart % 256));
-        path.push(self.to_string());
-        path
-    }
-}
diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
index 92ab50d7..191d8a44 100644
--- a/src/server/worker_task.rs
+++ b/src/server/worker_task.rs
@@ -1,5 +1,6 @@
 use std::collections::{HashMap, VecDeque};
 use std::fs::File;
+use std::path::PathBuf;
 use std::io::{Read, Write, BufRead, BufReader};
 use std::panic::UnwindSafe;
 use std::sync::atomic::{AtomicBool, Ordering};
@@ -11,27 +12,267 @@ use lazy_static::lazy_static;
 use serde_json::{json, Value};
 use serde::{Serialize, Deserialize};
 use tokio::sync::oneshot;
+use nix::fcntl::OFlag;
+use once_cell::sync::OnceCell;
 
 use proxmox::sys::linux::procfs;
 use proxmox::try_block;
-use proxmox::tools::fs::{create_path, replace_file, CreateOptions};
+use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
 
-use pbs_buildcfg;
 use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
 use pbs_api_types::UPID;
-use pbs_config::{open_backup_lockfile, BackupLockGuard};
 use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
 
-use super::UPIDExt;
+struct TaskListLockGuard(File);
 
-macro_rules! taskdir {
-    ($subdir:expr) => (concat!(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks", $subdir))
+struct WorkerTaskSetup {
+    file_opts: CreateOptions,
+    taskdir: PathBuf,
+    task_lock_fn: PathBuf,
+    active_tasks_fn: PathBuf,
+    task_index_fn: PathBuf,
+    task_archive_fn: PathBuf,
+}
+
+static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new();
+
+fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> {
+    WORKER_TASK_SETUP.get()
+        .ok_or_else(|| format_err!("WorkerTask library is not initialized"))
+}
+
+impl WorkerTaskSetup {
+
+    fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self {
+
+        let mut taskdir = basedir.clone();
+        taskdir.push("tasks");
+
+        let mut task_lock_fn = taskdir.clone();
+        task_lock_fn.push(".active.lock");
+
+        let mut active_tasks_fn = taskdir.clone();
+        active_tasks_fn.push("active");
+
+        let mut task_index_fn = taskdir.clone();
+        task_index_fn.push("index");
+
+        let mut task_archive_fn = taskdir.clone();
+        task_archive_fn.push("archive");
+
+        Self {
+            file_opts,
+            taskdir,
+            task_lock_fn,
+            active_tasks_fn,
+            task_index_fn,
+            task_archive_fn,
+        }
+    }
+
+    fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> {
+        let options =  self.file_opts.clone()
+            .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+        let timeout = std::time::Duration::new(10, 0);
+
+        let file = proxmox::tools::fs::open_file_locked(
+            &self.task_lock_fn,
+            timeout,
+            exclusive,
+            options,
+        )?;
+
+        Ok(TaskListLockGuard(file))
+    }
+
+    fn log_path(&self, upid: &UPID) -> std::path::PathBuf {
+        let mut path = self.taskdir.clone();
+        path.push(format!("{:02X}", upid.pstart % 256));
+        path.push(upid.to_string());
+        path
+    }
+
+    // atomically read/update the task list, update status of finished tasks
+    // new_upid is added to the list when specified.
+    fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> {
+
+        let lock = self.lock_task_list_files(true)?;
+
+        // TODO remove with 1.x
+        let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(&self.task_index_fn)?;
+        let had_index_file = !finish_list.is_empty();
+
+        // We use filter_map because one negative case wants to *move* the data into `finish_list`,
+        // clippy doesn't quite catch this!
+        #[allow(clippy::unnecessary_filter_map)]
+        let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(&self.active_tasks_fn)?
+            .into_iter()
+            .filter_map(|info| {
+                if info.state.is_some() {
+                    // this can happen when the active file still includes finished tasks
+                    finish_list.push(info);
+                    return None;
+                }
+
+                if !worker_is_active_local(&info.upid) {
+                    // println!("Detected stopped task '{}'", &info.upid_str);
+                    let now = proxmox::tools::time::epoch_i64();
+                    let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
+                    finish_list.push(TaskListInfo {
+                        upid: info.upid,
+                        upid_str: info.upid_str,
+                        state: Some(status)
+                    });
+                    return None;
+                }
+
+                Some(info)
+            }).collect();
+
+        if let Some(upid) = new_upid {
+            active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
+        }
+
+        let active_raw = render_task_list(&active_list);
+
+        let options =  self.file_opts.clone()
+            .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+        replace_file(
+            &self.active_tasks_fn,
+            active_raw.as_bytes(),
+            options,
+        )?;
+
+        finish_list.sort_unstable_by(|a, b| {
+            match (&a.state, &b.state) {
+                (Some(s1), Some(s2)) => s1.cmp(&s2),
+                (Some(_), None) => std::cmp::Ordering::Less,
+                (None, Some(_)) => std::cmp::Ordering::Greater,
+                _ => a.upid.starttime.cmp(&b.upid.starttime),
+            }
+        });
+
+        if !finish_list.is_empty() {
+            let options =  self.file_opts.clone()
+                .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+            let mut writer = atomic_open_or_create_file(
+                &self.task_archive_fn,
+                OFlag::O_APPEND | OFlag::O_RDWR,
+                &[],
+                options,
+            )?;
+            for info in &finish_list {
+                writer.write_all(render_task_line(&info).as_bytes())?;
+            }
+        }
+
+        // TODO Remove with 1.x
+        // for compatibility, if we had an INDEX file, we do not need it anymore
+        if had_index_file {
+            let _ = nix::unistd::unlink(&self.task_index_fn);
+        }
+
+        drop(lock);
+
+        Ok(())
+    }
+
+    // Create task log directory with correct permissions
+    fn create_task_log_dirs(&self) -> Result<(), Error> {
+
+        try_block!({
+            let dir_opts = self.file_opts.clone()
+                .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
+
+            create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts.clone()))?;
+            // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
+            Ok(())
+        }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))
+    }
+}
+
+/// Initialize the WorkerTask library
+pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> {
+    let setup = WorkerTaskSetup::new(basedir, file_opts);
+    setup.create_task_log_dirs()?;
+    WORKER_TASK_SETUP.set(setup)
+        .map_err(|_| format_err!("init_worker_tasks failed - already initialized"))
+}
+
+/// checks if the Task Archive is bigger than 'size_threshold' bytes, and
+/// rotates it if it is
+pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
+
+    let setup = worker_task_setup()?;
+
+    let _lock = setup.lock_task_list_files(true)?;
+
+    let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress)
+            .ok_or_else(|| format_err!("could not get archive file names"))?;
+
+    logrotate.rotate(size_threshold, None, max_files)
+}
+
+
+/// Path to the worker log file
+pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
+    let setup = worker_task_setup()?;
+    Ok(setup.log_path(upid))
+}
+
+/// Read endtime (time of last log line) and exitstatus from task log file
+/// If there is not a single line with a valid datetime, we assume the
+/// starttime to be the endtime
+pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
+
+    let setup = worker_task_setup()?;
+
+    let mut status = TaskState::Unknown { endtime: upid.starttime };
+
+    let path = setup.log_path(upid);
+
+    let mut file = File::open(path)?;
+
+    /// speedup - only read tail
+    use std::io::Seek;
+    use std::io::SeekFrom;
+    let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
+
+    let mut data = Vec::with_capacity(8192);
+    file.read_to_end(&mut data)?;
+
+    // strip newlines at the end of the task logs
+    while data.last() == Some(&b'\n') {
+        data.pop();
+    }
+
+    let last_line = match data.iter().rposition(|c| *c == b'\n') {
+        Some(start) if data.len() > (start+1) => &data[start+1..],
+        Some(_) => &data, // should not happen, since we removed all trailing newlines
+        None => &data,
+    };
+
+    let last_line = std::str::from_utf8(last_line)
+        .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
+
+    let mut iter = last_line.splitn(2, ": ");
+    if let Some(time_str) = iter.next() {
+        if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
+            // set the endtime even if we cannot parse the state
+            status = TaskState::Unknown { endtime };
+            if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
+                if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
+                    status = state;
+                }
+            }
+        }
+    }
+
+    Ok(status)
 }
-pub const PROXMOX_BACKUP_TASK_DIR: &str = taskdir!("/");
-pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = taskdir!("/.active.lock");
-pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = taskdir!("/active");
-pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = taskdir!("/index");
-pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = taskdir!("/archive");
 
 lazy_static! {
     static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
@@ -152,73 +393,6 @@ fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskStat
     }
 }
 
-/// Create task log directory with correct permissions
-pub fn create_task_log_dirs() -> Result<(), Error> {
-
-    try_block!({
-        let backup_user = pbs_config::backup_user()?;
-        let opts = CreateOptions::new()
-            .owner(backup_user.uid)
-            .group(backup_user.gid);
-
-        create_path(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?;
-        create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?;
-        create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
-        Ok(())
-    }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?;
-
-    Ok(())
-}
-
-/// Read endtime (time of last log line) and exitstatus from task log file
-/// If there is not a single line with at valid datetime, we assume the
-/// starttime to be the endtime
-pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
-
-    let mut status = TaskState::Unknown { endtime: upid.starttime };
-
-    let path = upid.log_path();
-
-    let mut file = File::open(path)?;
-
-    /// speedup - only read tail
-    use std::io::Seek;
-    use std::io::SeekFrom;
-    let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
-
-    let mut data = Vec::with_capacity(8192);
-    file.read_to_end(&mut data)?;
-
-    // strip newlines at the end of the task logs
-    while data.last() == Some(&b'\n') {
-        data.pop();
-    }
-
-    let last_line = match data.iter().rposition(|c| *c == b'\n') {
-        Some(start) if data.len() > (start+1) => &data[start+1..],
-        Some(_) => &data, // should not happen, since we removed all trailing newlines
-        None => &data,
-    };
-
-    let last_line = std::str::from_utf8(last_line)
-        .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
-
-    let mut iter = last_line.splitn(2, ": ");
-    if let Some(time_str) = iter.next() {
-        if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
-            // set the endtime even if we cannot parse the state
-            status = TaskState::Unknown { endtime };
-            if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
-                if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
-                    status = state;
-                }
-            }
-        }
-    }
-
-    Ok(status)
-}
-
 /// Task State
 #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub enum TaskState {
@@ -323,107 +497,6 @@ impl Into<pbs_api_types::TaskListItem> for TaskListInfo {
     }
 }
 
-fn lock_task_list_files(exclusive: bool) -> Result<BackupLockGuard, Error> {
-    open_backup_lockfile(PROXMOX_BACKUP_TASK_LOCK_FN, None, exclusive)
-}
-
-/// checks if the Task Archive is bigger that 'size_threshold' bytes, and
-/// rotates it if it is
-pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
-    let _lock = lock_task_list_files(true)?;
-
-    let mut logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, compress)
-        .ok_or_else(|| format_err!("could not get archive file names"))?;
-
-    logrotate.rotate(size_threshold, None, max_files)
-}
-
-// atomically read/update the task list, update status of finished tasks
-// new_upid is added to the list when specified.
-fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {
-
-    let backup_user = pbs_config::backup_user()?;
-
-    let lock = lock_task_list_files(true)?;
-
-    // TODO remove with 1.x
-    let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?;
-    let had_index_file = !finish_list.is_empty();
-
-    // We use filter_map because one negative case wants to *move* the data into `finish_list`,
-    // clippy doesn't quite catch this!
-    #[allow(clippy::unnecessary_filter_map)]
-    let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?
-        .into_iter()
-        .filter_map(|info| {
-            if info.state.is_some() {
-                // this can happen when the active file still includes finished tasks
-                finish_list.push(info);
-                return None;
-            }
-
-            if !worker_is_active_local(&info.upid) {
-                // println!("Detected stopped task '{}'", &info.upid_str);
-                let now = proxmox::tools::time::epoch_i64();
-                let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
-                finish_list.push(TaskListInfo {
-                    upid: info.upid,
-                    upid_str: info.upid_str,
-                    state: Some(status)
-                });
-                return None;
-            }
-
-            Some(info)
-        }).collect();
-
-    if let Some(upid) = new_upid {
-        active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
-    }
-
-    let active_raw = render_task_list(&active_list);
-
-    replace_file(
-        PROXMOX_BACKUP_ACTIVE_TASK_FN,
-        active_raw.as_bytes(),
-        CreateOptions::new()
-            .owner(backup_user.uid)
-            .group(backup_user.gid),
-    )?;
-
-    finish_list.sort_unstable_by(|a, b| {
-        match (&a.state, &b.state) {
-            (Some(s1), Some(s2)) => s1.cmp(&s2),
-            (Some(_), None) => std::cmp::Ordering::Less,
-            (None, Some(_)) => std::cmp::Ordering::Greater,
-            _ => a.upid.starttime.cmp(&b.upid.starttime),
-        }
-    });
-
-    if !finish_list.is_empty() {
-        match std::fs::OpenOptions::new().append(true).create(true).open(PROXMOX_BACKUP_ARCHIVE_TASK_FN) {
-            Ok(mut writer) => {
-                for info in &finish_list {
-                    writer.write_all(render_task_line(&info).as_bytes())?;
-                }
-            },
-            Err(err) => bail!("could not write task archive - {}", err),
-        }
-
-        nix::unistd::chown(PROXMOX_BACKUP_ARCHIVE_TASK_FN, Some(backup_user.uid), Some(backup_user.gid))?;
-    }
-
-    // TODO Remove with 1.x
-    // for compatibility, if we had an INDEX file, we do not need it anymore
-    if had_index_file {
-        let _ = nix::unistd::unlink(PROXMOX_BACKUP_INDEX_TASK_FN);
-    }
-
-    drop(lock);
-
-    Ok(())
-}
-
 fn render_task_line(info: &TaskListInfo) -> String {
     let mut raw = String::new();
     if let Some(status) = &info.state {
@@ -486,27 +559,30 @@ pub struct TaskListInfoIterator {
     list: VecDeque<TaskListInfo>,
     end: bool,
     archive: Option<LogRotateFiles>,
-    lock: Option<BackupLockGuard>,
+    lock: Option<TaskListLockGuard>,
 }
 
 impl TaskListInfoIterator {
     pub fn new(active_only: bool) -> Result<Self, Error> {
+
+        let setup = worker_task_setup()?;
+
         let (read_lock, active_list) = {
-            let lock = lock_task_list_files(false)?;
-            let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
+            let lock = setup.lock_task_list_files(false)?;
+            let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
 
             let needs_update = active_list
                 .iter()
                 .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));
 
             // TODO remove with 1.x
-            let index_exists = std::path::Path::new(PROXMOX_BACKUP_INDEX_TASK_FN).is_file();
+            let index_exists = setup.task_index_fn.is_file();
 
             if needs_update || index_exists {
                 drop(lock);
-                update_active_workers(None)?;
-                let lock = lock_task_list_files(false)?;
-                let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
+                setup.update_active_workers(None)?;
+                let lock = setup.lock_task_list_files(false)?;
+                let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
                 (lock, active_list)
             } else {
                 (lock, active_list)
@@ -516,7 +592,7 @@ impl TaskListInfoIterator {
         let archive = if active_only {
             None
         } else {
-            let logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, true)
+            let logrotate = LogRotate::new(&setup.task_archive_fn, true)
                 .ok_or_else(|| format_err!("could not get archive file names"))?;
             Some(logrotate.files())
         };
@@ -568,6 +644,7 @@ impl Iterator for TaskListInfoIterator {
 /// persistently to files. Task should poll the `abort_requested`
 /// flag, and stop execution when requested.
 pub struct WorkerTask {
+    setup: &'static WorkerTaskSetup,
     upid: UPID,
     data: Mutex<WorkerTaskData>,
     abort_requested: AtomicBool,
@@ -589,17 +666,26 @@ struct WorkerTaskData {
 
 impl WorkerTask {
 
-    pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: String, to_stdout: bool) -> Result<Arc<Self>, Error> {
+    pub fn new(
+        worker_type: &str,
+        worker_id: Option<String>,
+        auth_id: String,
+        to_stdout: bool,
+    ) -> Result<Arc<Self>, Error> {
+
+        let setup = worker_task_setup()?;
+
         let upid = UPID::new(worker_type, worker_id, auth_id)?;
         let task_id = upid.task_id;
 
-        let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR);
+        let mut path = setup.taskdir.clone();
 
         path.push(format!("{:02X}", upid.pstart & 255));
 
-        let backup_user = pbs_config::backup_user()?;
+        let dir_opts = setup.file_opts.clone()
+            .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
 
-        create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?;
+        create_path(&path, None, Some(dir_opts))?;
 
         path.push(upid.to_string());
 
@@ -608,12 +694,13 @@ impl WorkerTask {
             exclusive: true,
             prefix_time: true,
             read: true,
+            file_opts: setup.file_opts.clone(),
             ..Default::default()
         };
         let logger = FileLogger::new(&path, logger_options)?;
-        nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?;
 
         let worker = Arc::new(Self {
+            setup,
             upid: upid.clone(),
             abort_requested: AtomicBool::new(false),
             data: Mutex::new(WorkerTaskData {
@@ -631,7 +718,7 @@ impl WorkerTask {
             proxmox_rest_server::set_worker_count(hash.len());
         }
 
-        update_active_workers(Some(&upid))?;
+        setup.update_active_workers(Some(&upid))?;
 
         Ok(worker)
     }
@@ -714,7 +801,7 @@ impl WorkerTask {
         self.log(state.result_text());
 
         WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
-        let _ = update_active_workers(None);
+        let _ = self.setup.update_active_workers(None);
         proxmox_rest_server::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
     }
 
-- 
2.30.2





^ permalink raw reply	[flat|nested] 8+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 3/6] use UPID and systemd helpers from proxmox 0.13.4
  2021-09-23 10:13 [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid Dietmar Maurer
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 2/6] worker task: allow to configure path and owner/group Dietmar Maurer
@ 2021-09-23 10:13 ` Dietmar Maurer
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 4/6] move worker_task.rs into proxmox-rest-server crate Dietmar Maurer
                   ` (3 subsequent siblings)
  5 siblings, 0 replies; 8+ messages in thread
From: Dietmar Maurer @ 2021-09-23 10:13 UTC (permalink / raw)
  To: pbs-devel

---
 Cargo.toml                                    |   2 +-
 pbs-api-types/Cargo.toml                      |   2 +-
 pbs-api-types/src/lib.rs                      |  59 ++++-
 pbs-api-types/src/upid.rs                     | 204 ------------------
 pbs-client/Cargo.toml                         |   2 +-
 pbs-config/Cargo.toml                         |   2 +-
 pbs-datastore/Cargo.toml                      |   2 +-
 pbs-fuse-loop/Cargo.toml                      |   2 +-
 pbs-tape/Cargo.toml                           |   2 +-
 pbs-tools/Cargo.toml                          |   2 +-
 proxmox-backup-client/Cargo.toml              |   3 +-
 proxmox-backup-client/src/mount.rs            |   8 +-
 proxmox-file-restore/Cargo.toml               |   3 +-
 proxmox-file-restore/src/block_driver_qemu.rs |   8 +-
 proxmox-rest-server/Cargo.toml                |   2 +-
 proxmox-restore-daemon/Cargo.toml             |   2 +-
 proxmox-systemd/Cargo.toml                    |   2 +-
 proxmox-systemd/src/unit.rs                   |  85 +-------
 pxar-bin/Cargo.toml                           |   2 +-
 src/api2/node/disks/directory.rs              |   4 +-
 src/api2/node/disks/zfs.rs                    |   2 +-
 src/server/worker_task.rs                     |   2 +-
 src/tape/drive/mod.rs                         |   2 +-
 23 files changed, 86 insertions(+), 318 deletions(-)
 delete mode 100644 pbs-api-types/src/upid.rs

diff --git a/Cargo.toml b/Cargo.toml
index 99c56f04..88e8fbd1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -94,7 +94,7 @@ zstd = { version = "0.6", features = [ "bindgen" ] }
 pathpatterns = "0.1.2"
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
-proxmox = { version = "0.13.3", features = [ "sortable-macro", "api-macro", "cli", "router", "tfa" ] }
+proxmox = { version = "0.13.4", features = [ "sortable-macro", "api-macro", "cli", "router", "tfa" ] }
 proxmox-acme-rs = "0.2.1"
 proxmox-apt = "0.7.0"
 proxmox-http = { version = "0.4.0", features = [ "client", "http-helpers", "websocket" ] }
diff --git a/pbs-api-types/Cargo.toml b/pbs-api-types/Cargo.toml
index a64d7f0a..878d6417 100644
--- a/pbs-api-types/Cargo.toml
+++ b/pbs-api-types/Cargo.toml
@@ -14,7 +14,7 @@ openssl = "0.10"
 regex = "1.2"
 serde = { version = "1.0", features = ["derive"] }
 
-proxmox = { version = "0.13.3", default-features = false, features = [ "api-macro" ] }
+proxmox = { version = "0.13.4", default-features = false, features = [ "api-macro" ] }
 
 proxmox-systemd = { path = "../proxmox-systemd" }
 pbs-tools = { path = "../pbs-tools" }
diff --git a/pbs-api-types/src/lib.rs b/pbs-api-types/src/lib.rs
index 6b0246f5..f7521b02 100644
--- a/pbs-api-types/src/lib.rs
+++ b/pbs-api-types/src/lib.rs
@@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
 use anyhow::bail;
 
 use proxmox::api::api;
-use proxmox::api::schema::{ApiStringFormat, ArraySchema, Schema, StringSchema};
+use proxmox::api::schema::{ApiStringFormat, ApiType, ArraySchema, Schema, StringSchema, ReturnType};
 use proxmox::const_regex;
 use proxmox::{IPRE, IPRE_BRACKET, IPV4OCTET, IPV4RE, IPV6H16, IPV6LS32, IPV6RE};
 
@@ -60,8 +60,7 @@ pub use userid::{PROXMOX_GROUP_ID_SCHEMA, PROXMOX_TOKEN_ID_SCHEMA, PROXMOX_TOKEN
 mod user;
 pub use user::*;
 
-pub mod upid;
-pub use upid::*;
+pub use proxmox::api::upid::*;
 
 mod crypto;
 pub use crypto::{CryptMode, Fingerprint};
@@ -397,3 +396,57 @@ pub enum NodePowerCommand {
     /// Shutdown the server
     Shutdown,
 }
+
+
+#[api()]
+#[derive(Eq, PartialEq, Debug, Serialize, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum TaskStateType {
+    /// Ok
+    OK,
+    /// Warning
+    Warning,
+    /// Error
+    Error,
+    /// Unknown
+    Unknown,
+}
+
+#[api(
+    properties: {
+        upid: { schema: UPID::API_SCHEMA },
+    },
+)]
+#[derive(Serialize, Deserialize)]
+/// Task properties.
+pub struct TaskListItem {
+    pub upid: String,
+    /// The node name where the task is running on.
+    pub node: String,
+    /// The Unix PID
+    pub pid: i64,
+    /// The task start time (Epoch)
+    pub pstart: u64,
+    /// The task start time (Epoch)
+    pub starttime: i64,
+    /// Worker type (arbitrary ASCII string)
+    pub worker_type: String,
+    /// Worker ID (arbitrary ASCII string)
+    pub worker_id: Option<String>,
+    /// The authenticated entity who started the task
+    pub user: String,
+    /// The task end time (Epoch)
+    #[serde(skip_serializing_if="Option::is_none")]
+    pub endtime: Option<i64>,
+    /// Task end status
+    #[serde(skip_serializing_if="Option::is_none")]
+    pub status: Option<String>,
+}
+
+pub const NODE_TASKS_LIST_TASKS_RETURN_TYPE: ReturnType = ReturnType {
+    optional: false,
+    schema: &ArraySchema::new(
+        "A list of tasks.",
+        &TaskListItem::API_SCHEMA,
+    ).schema(),
+};
diff --git a/pbs-api-types/src/upid.rs b/pbs-api-types/src/upid.rs
deleted file mode 100644
index 29135bca..00000000
--- a/pbs-api-types/src/upid.rs
+++ /dev/null
@@ -1,204 +0,0 @@
-use std::sync::atomic::{AtomicUsize, Ordering};
-
-use anyhow::{bail, Error};
-use serde::{Deserialize, Serialize};
-
-use proxmox::api::api;
-use proxmox::api::schema::{ApiStringFormat, ApiType, Schema, StringSchema, ArraySchema, ReturnType};
-use proxmox::const_regex;
-use proxmox::sys::linux::procfs;
-
-/// Unique Process/Task Identifier
-///
-/// We use this to uniquely identify worker task. UPIDs have a short
-/// string repesentaion, which gives additional information about the
-/// type of the task. for example:
-/// ```text
-/// UPID:{node}:{pid}:{pstart}:{task_id}:{starttime}:{worker_type}:{worker_id}:{userid}:
-/// UPID:elsa:00004F37:0039E469:00000000:5CA78B83:garbage_collection::root@pam:
-/// ```
-/// Please note that we use tokio, so a single thread can run multiple
-/// tasks.
-// #[api] - manually implemented API type
-#[derive(Debug, Clone)]
-pub struct UPID {
-    /// The Unix PID
-    pub pid: libc::pid_t,
-    /// The Unix process start time from `/proc/pid/stat`
-    pub pstart: u64,
-    /// The task start time (Epoch)
-    pub starttime: i64,
-    /// The task ID (inside the process/thread)
-    pub task_id: usize,
-    /// Worker type (arbitrary ASCII string)
-    pub worker_type: String,
-    /// Worker ID (arbitrary ASCII string)
-    pub worker_id: Option<String>,
-    /// The authenticated entity who started the task
-    pub auth_id: String,
-    /// The node name.
-    pub node: String,
-}
-
-proxmox::forward_serialize_to_display!(UPID);
-proxmox::forward_deserialize_to_from_str!(UPID);
-
-const_regex! {
-    pub PROXMOX_UPID_REGEX = concat!(
-        r"^UPID:(?P<node>[a-zA-Z0-9]([a-zA-Z0-9\-]*[a-zA-Z0-9])?):(?P<pid>[0-9A-Fa-f]{8}):",
-        r"(?P<pstart>[0-9A-Fa-f]{8,9}):(?P<task_id>[0-9A-Fa-f]{8,16}):(?P<starttime>[0-9A-Fa-f]{8}):",
-        r"(?P<wtype>[^:\s]+):(?P<wid>[^:\s]*):(?P<authid>[^:\s]+):$"
-    );
-}
-
-pub const PROXMOX_UPID_FORMAT: ApiStringFormat =
-    ApiStringFormat::Pattern(&PROXMOX_UPID_REGEX);
-
-pub const UPID_SCHEMA: Schema = StringSchema::new("Unique Process/Task Identifier")
-    .min_length("UPID:N:12345678:12345678:12345678:::".len())
-    .max_length(128) // arbitrary
-    .format(&PROXMOX_UPID_FORMAT)
-    .schema();
-
-impl ApiType for UPID {
-    const API_SCHEMA: Schema = UPID_SCHEMA;
-}
-
-impl UPID {
-    /// Create a new UPID
-    pub fn new(
-        worker_type: &str,
-        worker_id: Option<String>,
-        auth_id: String,
-    ) -> Result<Self, Error> {
-
-        let pid = unsafe { libc::getpid() };
-
-        let bad: &[_] = &['/', ':', ' '];
-
-        if worker_type.contains(bad) {
-            bail!("illegal characters in worker type '{}'", worker_type);
-        }
-
-        if auth_id.contains(bad) {
-            bail!("illegal characters in auth_id '{}'", auth_id);
-        }
-
-        static WORKER_TASK_NEXT_ID: AtomicUsize = AtomicUsize::new(0);
-
-        let task_id = WORKER_TASK_NEXT_ID.fetch_add(1, Ordering::SeqCst);
-
-        Ok(UPID {
-            pid,
-            pstart: procfs::PidStat::read_from_pid(nix::unistd::Pid::from_raw(pid))?.starttime,
-            starttime: proxmox::tools::time::epoch_i64(),
-            task_id,
-            worker_type: worker_type.to_owned(),
-            worker_id,
-            auth_id,
-            node: proxmox::tools::nodename().to_owned(),
-        })
-    }
-}
-
-
-impl std::str::FromStr for UPID {
-    type Err = Error;
-
-    fn from_str(s: &str) -> Result<Self, Self::Err> {
-        if let Some(cap) = PROXMOX_UPID_REGEX.captures(s) {
-
-            let worker_id = if cap["wid"].is_empty() {
-                None
-            } else {
-                let wid = proxmox_systemd::unescape_unit(&cap["wid"])?;
-                Some(wid)
-            };
-
-            Ok(UPID {
-                pid: i32::from_str_radix(&cap["pid"], 16).unwrap(),
-                pstart: u64::from_str_radix(&cap["pstart"], 16).unwrap(),
-                starttime: i64::from_str_radix(&cap["starttime"], 16).unwrap(),
-                task_id: usize::from_str_radix(&cap["task_id"], 16).unwrap(),
-                worker_type: cap["wtype"].to_string(),
-                worker_id,
-                auth_id: cap["authid"].parse()?,
-                node: cap["node"].to_string(),
-            })
-        } else {
-            bail!("unable to parse UPID '{}'", s);
-        }
-
-    }
-}
-
-impl std::fmt::Display for UPID {
-
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-
-        let wid = if let Some(ref id) = self.worker_id {
-            proxmox_systemd::escape_unit(id, false)
-        } else {
-            String::new()
-        };
-
-        // Note: pstart can be > 32bit if uptime > 497 days, so this can result in
-        // more that 8 characters for pstart
-
-        write!(f, "UPID:{}:{:08X}:{:08X}:{:08X}:{:08X}:{}:{}:{}:",
-               self.node, self.pid, self.pstart, self.task_id, self.starttime, self.worker_type, wid, self.auth_id)
-    }
-}
-
-#[api()]
-#[derive(Eq, PartialEq, Debug, Serialize, Deserialize)]
-#[serde(rename_all = "lowercase")]
-pub enum TaskStateType {
-    /// Ok
-    OK,
-    /// Warning
-    Warning,
-    /// Error
-    Error,
-    /// Unknown
-    Unknown,
-}
-
-#[api(
-    properties: {
-        upid: { schema: UPID::API_SCHEMA },
-    },
-)]
-#[derive(Serialize, Deserialize)]
-/// Task properties.
-pub struct TaskListItem {
-    pub upid: String,
-    /// The node name where the task is running on.
-    pub node: String,
-    /// The Unix PID
-    pub pid: i64,
-    /// The task start time (Epoch)
-    pub pstart: u64,
-    /// The task start time (Epoch)
-    pub starttime: i64,
-    /// Worker type (arbitrary ASCII string)
-    pub worker_type: String,
-    /// Worker ID (arbitrary ASCII string)
-    pub worker_id: Option<String>,
-    /// The authenticated entity who started the task
-    pub user: String,
-    /// The task end time (Epoch)
-    #[serde(skip_serializing_if="Option::is_none")]
-    pub endtime: Option<i64>,
-    /// Task end status
-    #[serde(skip_serializing_if="Option::is_none")]
-    pub status: Option<String>,
-}
-
-pub const NODE_TASKS_LIST_TASKS_RETURN_TYPE: ReturnType = ReturnType {
-    optional: false,
-    schema: &ArraySchema::new(
-        "A list of tasks.",
-        &TaskListItem::API_SCHEMA,
-    ).schema(),
-};
diff --git a/pbs-client/Cargo.toml b/pbs-client/Cargo.toml
index 965282bc..dc5d8928 100644
--- a/pbs-client/Cargo.toml
+++ b/pbs-client/Cargo.toml
@@ -28,7 +28,7 @@ tower-service = "0.3.0"
 xdg = "2.2"
 
 pathpatterns = "0.1.2"
-proxmox = { version = "0.13.3", default-features = false, features = [ "cli" ] }
+proxmox = { version = "0.13.4", default-features = false, features = [ "cli" ] }
 proxmox-fuse = "0.1.1"
 proxmox-http = { version = "0.4.0", features = [ "client", "http-helpers", "websocket" ] }
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
diff --git a/pbs-config/Cargo.toml b/pbs-config/Cargo.toml
index 48830045..3249a8e9 100644
--- a/pbs-config/Cargo.toml
+++ b/pbs-config/Cargo.toml
@@ -16,7 +16,7 @@ nix = "0.19.1"
 regex = "1.2"
 once_cell = "1.3.1"
 
-proxmox = { version = "0.13.3", default-features = false, features = [ "cli" ] }
+proxmox = { version = "0.13.4", default-features = false, features = [ "cli" ] }
 
 pbs-api-types = { path = "../pbs-api-types" }
 pbs-buildcfg = { path = "../pbs-buildcfg" }
diff --git a/pbs-datastore/Cargo.toml b/pbs-datastore/Cargo.toml
index 5b3c7fab..578b8689 100644
--- a/pbs-datastore/Cargo.toml
+++ b/pbs-datastore/Cargo.toml
@@ -23,7 +23,7 @@ zstd = { version = "0.6", features = [ "bindgen" ] }
 pathpatterns = "0.1.2"
 pxar = "0.10.1"
 
-proxmox = { version = "0.13.3", default-features = false, features = [ "api-macro" ] }
+proxmox = { version = "0.13.4", default-features = false, features = [ "api-macro" ] }
 
 pbs-api-types = { path = "../pbs-api-types" }
 pbs-tools = { path = "../pbs-tools" }
diff --git a/pbs-fuse-loop/Cargo.toml b/pbs-fuse-loop/Cargo.toml
index c3220be7..aa61d006 100644
--- a/pbs-fuse-loop/Cargo.toml
+++ b/pbs-fuse-loop/Cargo.toml
@@ -14,7 +14,7 @@ nix = "0.19.1"
 regex = "1.2"
 tokio = { version = "1.6", features = [] }
 
-proxmox = "0.13.3"
+proxmox = "0.13.4"
 proxmox-fuse = "0.1.1"
 
 pbs-tools = { path = "../pbs-tools" }
diff --git a/pbs-tape/Cargo.toml b/pbs-tape/Cargo.toml
index 4ffae21e..3dbeb17c 100644
--- a/pbs-tape/Cargo.toml
+++ b/pbs-tape/Cargo.toml
@@ -18,7 +18,7 @@ bitflags = "1.2.1"
 regex = "1.2"
 udev = ">= 0.3, <0.5"
 
-proxmox = { version = "0.13.3", default-features = false, features = [] }
+proxmox = { version = "0.13.4", default-features = false, features = [] }
 
 pbs-api-types = { path = "../pbs-api-types" }
 pbs-tools = { path = "../pbs-tools" }
diff --git a/pbs-tools/Cargo.toml b/pbs-tools/Cargo.toml
index 88f6f54c..f20a315e 100644
--- a/pbs-tools/Cargo.toml
+++ b/pbs-tools/Cargo.toml
@@ -30,7 +30,7 @@ url = "2.1"
 walkdir = "2"
 zstd = { version = "0.6", features = [ "bindgen" ] }
 
-proxmox = { version = "0.13.3", default-features = false, features = [ "tokio" ] }
+proxmox = { version = "0.13.4", default-features = false, features = [ "tokio" ] }
 
 pbs-buildcfg = { path = "../pbs-buildcfg" }
 pbs-runtime = { path = "../pbs-runtime" }
diff --git a/proxmox-backup-client/Cargo.toml b/proxmox-backup-client/Cargo.toml
index d3c35534..42a4d09f 100644
--- a/proxmox-backup-client/Cargo.toml
+++ b/proxmox-backup-client/Cargo.toml
@@ -22,7 +22,7 @@ zstd = { version = "0.6", features = [ "bindgen" ] }
 pathpatterns = "0.1.2"
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
-proxmox = { version = "0.13.3", features = [ "sortable-macro", "api-macro", "cli", "router" ] }
+proxmox = { version = "0.13.4", features = [ "sortable-macro", "api-macro", "cli", "router" ] }
 
 pbs-api-types = { path = "../pbs-api-types" }
 pbs-buildcfg = { path = "../pbs-buildcfg" }
@@ -31,5 +31,4 @@ pbs-client = { path = "../pbs-client" }
 pbs-datastore = { path = "../pbs-datastore" }
 pbs-fuse-loop = { path = "../pbs-fuse-loop" }
 pbs-runtime = { path = "../pbs-runtime" }
-proxmox-systemd = { path = "../proxmox-systemd" }
 pbs-tools = { path = "../pbs-tools" }
diff --git a/proxmox-backup-client/src/mount.rs b/proxmox-backup-client/src/mount.rs
index 9ac1d9c2..7c977864 100644
--- a/proxmox-backup-client/src/mount.rs
+++ b/proxmox-backup-client/src/mount.rs
@@ -118,7 +118,7 @@ fn complete_mapping_names<S: BuildHasher>(_arg: &str, _param: &HashMap<String, S
     match pbs_fuse_loop::find_all_mappings() {
         Ok(mappings) => mappings
             .filter_map(|(name, _)| {
-                proxmox_systemd::unescape_unit(&name).ok()
+                proxmox::tools::systemd::unescape_unit(&name).ok()
             }).collect(),
         Err(_) => Vec::new()
     }
@@ -279,7 +279,7 @@ async fn mount_do(param: Value, pipe: Option<Fd>) -> Result<Value, Error> {
         let reader = CachedChunkReader::new(chunk_reader, index, 8).seekable();
 
         let name = &format!("{}:{}/{}", repo.to_string(), path, archive_name);
-        let name_escaped = proxmox_systemd::escape_unit(name, false);
+        let name_escaped = proxmox::tools::systemd::escape_unit(name, false);
 
         let mut session = pbs_fuse_loop::FuseLoopSession::map_loop(size, reader, &name_escaped, options).await?;
         let loopdev = session.loopdev_path.clone();
@@ -341,7 +341,7 @@ fn unmap(
             pbs_fuse_loop::cleanup_unused_run_files(None);
             let mut any = false;
             for (backing, loopdev) in pbs_fuse_loop::find_all_mappings()? {
-                let name = proxmox_systemd::unescape_unit(&backing)?;
+                let name = proxmox::tools::systemd::unescape_unit(&backing)?;
                 println!("{}:\t{}", loopdev.unwrap_or_else(|| "(unmapped)".to_string()), name);
                 any = true;
             }
@@ -360,7 +360,7 @@ fn unmap(
     if name.starts_with("/dev/loop") {
         pbs_fuse_loop::unmap_loopdev(name)?;
     } else {
-        let name = proxmox_systemd::escape_unit(&name, false);
+        let name = proxmox::tools::systemd::escape_unit(&name, false);
         pbs_fuse_loop::unmap_name(name)?;
     }
 
diff --git a/proxmox-file-restore/Cargo.toml b/proxmox-file-restore/Cargo.toml
index 1e13fb46..899fc984 100644
--- a/proxmox-file-restore/Cargo.toml
+++ b/proxmox-file-restore/Cargo.toml
@@ -16,7 +16,7 @@ tokio = { version = "1.6", features = [ "io-std", "rt", "rt-multi-thread", "time
 
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
-proxmox = { version = "0.13.3", features = [ "api-macro", "cli" ] }
+proxmox = { version = "0.13.4", features = [ "api-macro", "cli" ] }
 
 pbs-api-types = { path = "../pbs-api-types" }
 pbs-buildcfg = { path = "../pbs-buildcfg" }
@@ -24,5 +24,4 @@ pbs-config = { path = "../pbs-config" }
 pbs-client = { path = "../pbs-client" }
 pbs-datastore = { path = "../pbs-datastore" }
 pbs-runtime = { path = "../pbs-runtime" }
-proxmox-systemd = { path = "../proxmox-systemd" }
 pbs-tools = { path = "../pbs-tools" }
diff --git a/proxmox-file-restore/src/block_driver_qemu.rs b/proxmox-file-restore/src/block_driver_qemu.rs
index 2f73e669..b6eaf83a 100644
--- a/proxmox-file-restore/src/block_driver_qemu.rs
+++ b/proxmox-file-restore/src/block_driver_qemu.rs
@@ -80,7 +80,7 @@ impl VMStateMap {
 
 fn make_name(repo: &BackupRepository, snap: &BackupDir) -> String {
     let full = format!("qemu_{}/{}", repo, snap);
-    proxmox_systemd::escape_unit(&full, false)
+    proxmox::tools::systemd::escape_unit(&full, false)
 }
 
 /// remove non-responsive VMs from given map, returns 'true' if map was modified
@@ -257,7 +257,7 @@ impl BlockRestoreDriver for QemuBlockDriver {
                 let resp = client
                     .get("api2/json/status", Some(json!({"keep-timeout": true})))
                     .await;
-                let name = proxmox_systemd::unescape_unit(n)
+                let name = proxmox::tools::systemd::unescape_unit(n)
                     .unwrap_or_else(|_| "<invalid name>".to_owned());
                 let mut extra = json!({"pid": s.pid, "cid": s.cid});
 
@@ -295,7 +295,7 @@ impl BlockRestoreDriver for QemuBlockDriver {
 
     fn stop(&self, id: String) -> Async<Result<(), Error>> {
         async move {
-            let name = proxmox_systemd::escape_unit(&id, false);
+            let name = proxmox::tools::systemd::escape_unit(&id, false);
             let mut map = VMStateMap::load()?;
             let map_mod = cleanup_map(&mut map.map).await;
             match map.map.get(&name) {
@@ -325,7 +325,7 @@ impl BlockRestoreDriver for QemuBlockDriver {
         match VMStateMap::load_read_only() {
             Ok(state) => state
                 .iter()
-                .filter_map(|(name, _)| proxmox_systemd::unescape_unit(&name).ok())
+                .filter_map(|(name, _)| proxmox::tools::systemd::unescape_unit(&name).ok())
                 .collect(),
             Err(_) => Vec::new(),
         }
diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
index 2f740e67..b02c20db 100644
--- a/proxmox-rest-server/Cargo.toml
+++ b/proxmox-rest-server/Cargo.toml
@@ -24,7 +24,7 @@ tokio-openssl = "0.6.1"
 tower-service = "0.3.0"
 url = "2.1"
 
-proxmox = { version = "0.13.3", features = [ "router"] }
+proxmox = { version = "0.13.4", features = [ "router"] }
 
 # fixme: remove this dependency (pbs_tools::broadcast_future)
 pbs-tools = { path = "../pbs-tools" }
diff --git a/proxmox-restore-daemon/Cargo.toml b/proxmox-restore-daemon/Cargo.toml
index c525dc99..871af5f9 100644
--- a/proxmox-restore-daemon/Cargo.toml
+++ b/proxmox-restore-daemon/Cargo.toml
@@ -26,7 +26,7 @@ tokio-util = { version = "0.6", features = [ "codec", "io" ] }
 pathpatterns = "0.1.2"
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
-proxmox = { version = "0.13.3", features = [ "router", "sortable-macro" ] }
+proxmox = { version = "0.13.4", features = [ "router", "sortable-macro" ] }
 
 pbs-api-types = { path = "../pbs-api-types" }
 pbs-runtime = { path = "../pbs-runtime" }
diff --git a/proxmox-systemd/Cargo.toml b/proxmox-systemd/Cargo.toml
index c6caa7ec..017a281c 100644
--- a/proxmox-systemd/Cargo.toml
+++ b/proxmox-systemd/Cargo.toml
@@ -11,6 +11,6 @@ bitflags = "1.2.1"
 lazy_static = "1.4"
 nom = "5.1"
 
-proxmox = { version = "0.13.3", default-features = false }
+proxmox = { version = "0.13.4", default-features = false }
 
 #pbs-tools = { path = "../pbs-tools" }
diff --git a/proxmox-systemd/src/unit.rs b/proxmox-systemd/src/unit.rs
index af3db1a6..15be61fd 100644
--- a/proxmox-systemd/src/unit.rs
+++ b/proxmox-systemd/src/unit.rs
@@ -34,88 +34,6 @@ fn run_command(mut command: Command) -> Result<(), Error> {
     Ok(())
 }
 
-/// Escape strings for usage in systemd unit names
-pub fn escape_unit(mut unit: &str, is_path: bool) -> String {
-    if is_path {
-        unit = unit.trim_matches('/');
-        if unit.is_empty() {
-            return String::from("-");
-        }
-    }
-
-    let unit = unit.as_bytes();
-
-    let mut escaped = String::new();
-
-    for (i, c) in unit.iter().enumerate() {
-        if *c == b'/' {
-            escaped.push('-');
-            continue;
-        }
-        if (i == 0 && *c == b'.')
-            || !(*c == b'_'
-                || *c == b'.'
-                || (*c >= b'0' && *c <= b'9')
-                || (*c >= b'A' && *c <= b'Z')
-                || (*c >= b'a' && *c <= b'z'))
-        {
-            escaped.push_str(&format!("\\x{:0x}", c));
-        } else {
-            escaped.push(*c as char);
-        }
-    }
-    escaped
-}
-
-fn parse_hex_digit(d: u8) -> Result<u8, Error> {
-    if d >= b'0' && d <= b'9' {
-        return Ok(d - b'0');
-    }
-    if d >= b'A' && d <= b'F' {
-        return Ok(d - b'A' + 10);
-    }
-    if d >= b'a' && d <= b'f' {
-        return Ok(d - b'a' + 10);
-    }
-    bail!("got invalid hex digit");
-}
-
-/// Unescape strings used in systemd unit names
-pub fn unescape_unit(text: &str) -> Result<String, Error> {
-    let mut i = text.as_bytes();
-
-    let mut data: Vec<u8> = Vec::new();
-
-    loop {
-        if i.is_empty() {
-            break;
-        }
-        let next = i[0];
-        if next == b'\\' {
-            if i.len() < 4 {
-                bail!("short input");
-            }
-            if i[1] != b'x' {
-                bail!("unkwnown escape sequence");
-            }
-            let h1 = parse_hex_digit(i[2])?;
-            let h0 = parse_hex_digit(i[3])?;
-            data.push(h1 << 4 | h0);
-            i = &i[4..]
-        } else if next == b'-' {
-            data.push(b'/');
-            i = &i[1..]
-        } else {
-            data.push(next);
-            i = &i[1..]
-        }
-    }
-
-    let text = String::from_utf8(data)?;
-
-    Ok(text)
-}
-
 pub fn reload_daemon() -> Result<(), Error> {
     let mut command = std::process::Command::new("systemctl");
     command.arg("daemon-reload");
@@ -178,6 +96,9 @@ pub fn reload_unit(unit: &str) -> Result<(), Error> {
 #[test]
 fn test_escape_unit() -> Result<(), Error> {
     fn test_escape(i: &str, expected: &str, is_path: bool) {
+
+        use proxmox::tools::systemd::{escape_unit, unescape_unit};
+
         let escaped = escape_unit(i, is_path);
         assert_eq!(escaped, expected);
         let unescaped = unescape_unit(&escaped).unwrap();
diff --git a/pxar-bin/Cargo.toml b/pxar-bin/Cargo.toml
index e322e654..6121f7bc 100644
--- a/pxar-bin/Cargo.toml
+++ b/pxar-bin/Cargo.toml
@@ -16,7 +16,7 @@ serde_json = "1.0"
 tokio = { version = "1.6", features = [ "rt", "rt-multi-thread" ] }
 
 pathpatterns = "0.1.2"
-proxmox = { version = "0.13.3", default-features = false, features = [] }
+proxmox = { version = "0.13.4", default-features = false, features = [] }
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
 pbs-client = { path = "../pbs-client" }
diff --git a/src/api2/node/disks/directory.rs b/src/api2/node/disks/directory.rs
index 2f4a738d..49127586 100644
--- a/src/api2/node/disks/directory.rs
+++ b/src/api2/node/disks/directory.rs
@@ -242,7 +242,7 @@ pub fn delete_datastore_disk(name: String) -> Result<(), Error> {
     }
 
     // disable systemd mount-unit
-    let mut mount_unit_name = proxmox_systemd::escape_unit(&path, true);
+    let mut mount_unit_name = proxmox::tools::systemd::escape_unit(&path, true);
     mount_unit_name.push_str(".mount");
     proxmox_systemd::disable_unit(&mount_unit_name)?;
 
@@ -281,7 +281,7 @@ fn create_datastore_mount_unit(
     what: &str,
 ) -> Result<String, Error> {
 
-    let mut mount_unit_name = proxmox_systemd::escape_unit(&mount_point, true);
+    let mut mount_unit_name = proxmox::tools::systemd::escape_unit(&mount_point, true);
     mount_unit_name.push_str(".mount");
 
     let mount_unit_path = format!("/etc/systemd/system/{}", mount_unit_name);
diff --git a/src/api2/node/disks/zfs.rs b/src/api2/node/disks/zfs.rs
index 9fe0dac4..8a6cb708 100644
--- a/src/api2/node/disks/zfs.rs
+++ b/src/api2/node/disks/zfs.rs
@@ -271,7 +271,7 @@ pub fn create_zpool(
             worker.log(output);
 
             if std::path::Path::new("/lib/systemd/system/zfs-import@.service").exists() {
-                let import_unit = format!("zfs-import@{}.service", proxmox_systemd::escape_unit(&name, false));
+                let import_unit = format!("zfs-import@{}.service", proxmox::tools::systemd::escape_unit(&name, false));
                 proxmox_systemd::enable_unit(&import_unit)?;
             }
 
diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
index 191d8a44..94ffbeb0 100644
--- a/src/server/worker_task.rs
+++ b/src/server/worker_task.rs
@@ -18,9 +18,9 @@ use once_cell::sync::OnceCell;
 use proxmox::sys::linux::procfs;
 use proxmox::try_block;
 use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
+use proxmox::api::upid::UPID;
 
 use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
-use pbs_api_types::UPID;
 use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
 
 struct TaskListLockGuard(File);
diff --git a/src/tape/drive/mod.rs b/src/tape/drive/mod.rs
index f477acc7..ef5ffdbf 100644
--- a/src/tape/drive/mod.rs
+++ b/src/tape/drive/mod.rs
@@ -606,7 +606,7 @@ pub struct DeviceLockGuard(std::fs::File);
 // Uses systemd escape_unit to compute a file name from `device_path`, the try
 // to lock `/var/lock/<name>`.
 fn open_device_lock(device_path: &str) -> Result<std::fs::File, Error> {
-    let lock_name = proxmox_systemd::escape_unit(device_path, true);
+    let lock_name = proxmox::tools::systemd::escape_unit(device_path, true);
 
     let mut path = std::path::PathBuf::from(crate::tape::DRIVE_LOCK_DIR);
     path.push(lock_name);
-- 
2.30.2





^ permalink raw reply	[flat|nested] 8+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 4/6] move worker_task.rs into proxmox-rest-server crate
  2021-09-23 10:13 [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid Dietmar Maurer
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 2/6] worker task: allow to configure path and owner/group Dietmar Maurer
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 3/6] use UPID and systemd helpers from proxmox 0.13.4 Dietmar Maurer
@ 2021-09-23 10:13 ` Dietmar Maurer
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 5/6] proxmox-daily-update: setup worker and command socket Dietmar Maurer
                   ` (2 subsequent siblings)
  5 siblings, 0 replies; 8+ messages in thread
From: Dietmar Maurer @ 2021-09-23 10:13 UTC (permalink / raw)
  To: pbs-devel

Also moved pbs-datastore/src/task.rs to pbs-tools, which now depends on 'log'.
---
 pbs-datastore/src/chunk_store.rs              |  5 +-
 pbs-datastore/src/lib.rs                      |  1 -
 pbs-tools/Cargo.toml                          |  1 +
 pbs-tools/src/lib.rs                          |  1 +
 {pbs-datastore => pbs-tools}/src/task.rs      |  0
 proxmox-rest-server/Cargo.toml                |  1 +
 proxmox-rest-server/src/lib.rs                | 40 ++++++++++++++++
 .../src}/worker_task.rs                       | 40 ++++------------
 src/acme/plugin.rs                            |  2 +-
 src/api2/admin/datastore.rs                   |  4 +-
 src/api2/backup/environment.rs                |  4 +-
 src/api2/backup/mod.rs                        |  3 +-
 src/api2/config/acme.rs                       |  2 +-
 src/api2/config/datastore.rs                  |  5 +-
 src/api2/node/apt.rs                          |  2 +-
 src/api2/node/certificates.rs                 |  2 +-
 src/api2/node/disks/directory.rs              |  2 +-
 src/api2/node/disks/mod.rs                    |  2 +-
 src/api2/node/disks/zfs.rs                    |  2 +-
 src/api2/node/mod.rs                          |  2 +-
 src/api2/node/network.rs                      |  2 +-
 src/api2/node/services.rs                     |  2 +-
 src/api2/node/tasks.rs                        | 32 ++++++++++---
 src/api2/pull.rs                              |  3 +-
 src/api2/reader/environment.rs                |  2 +-
 src/api2/reader/mod.rs                        |  2 +-
 src/api2/tape/backup.rs                       |  6 +--
 src/api2/tape/drive.rs                        |  4 +-
 src/api2/tape/restore.rs                      | 10 ++--
 src/backup/datastore.rs                       |  3 +-
 src/backup/verify.rs                          |  4 +-
 src/bin/proxmox-backup-api.rs                 | 17 +++----
 src/bin/proxmox-backup-manager.rs             |  3 +-
 src/bin/proxmox-backup-proxy.rs               | 24 +++++-----
 src/bin/proxmox-daily-update.rs               |  2 +-
 src/bin/proxmox_backup_debug/api.rs           |  4 +-
 src/server/gc_job.rs                          |  2 +-
 src/server/h2service.rs                       |  4 +-
 src/server/jobstate.rs                        |  6 +--
 src/server/mod.rs                             | 48 ++-----------------
 src/server/prune_job.rs                       |  6 +--
 src/server/pull.rs                            |  5 +-
 src/server/verify_job.rs                      |  4 +-
 src/tape/drive/mod.rs                         |  9 ++--
 src/tape/pool_writer/mod.rs                   |  4 +-
 45 files changed, 159 insertions(+), 170 deletions(-)
 rename {pbs-datastore => pbs-tools}/src/task.rs (100%)
 rename {src/server => proxmox-rest-server/src}/worker_task.rs (95%)

diff --git a/pbs-datastore/src/chunk_store.rs b/pbs-datastore/src/chunk_store.rs
index 361cc9a2..5c50e4fc 100644
--- a/pbs-datastore/src/chunk_store.rs
+++ b/pbs-datastore/src/chunk_store.rs
@@ -9,10 +9,9 @@ use proxmox::tools::fs::{CreateOptions, create_path, create_dir};
 
 use pbs_api_types::GarbageCollectionStatus;
 use pbs_tools::process_locker::{self, ProcessLocker};
+use pbs_tools::{task_log, task::TaskState};
 
 use crate::DataBlob;
-use crate::task_log;
-use crate::task::TaskState;
 
 /// File system based chunk store
 pub struct ChunkStore {
@@ -306,7 +305,7 @@ impl ChunkStore {
         for (entry, percentage, bad) in self.get_chunk_iterator()? {
             if last_percentage != percentage {
                 last_percentage = percentage;
-                crate::task_log!(
+                task_log!(
                     worker,
                     "processed {}% ({} chunks)",
                     percentage,
diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs
index cfe39921..5a09666b 100644
--- a/pbs-datastore/src/lib.rs
+++ b/pbs-datastore/src/lib.rs
@@ -179,7 +179,6 @@ pub mod paperkey;
 pub mod prune;
 pub mod read_chunk;
 pub mod store_progress;
-pub mod task;
 
 pub mod dynamic_index;
 pub mod fixed_index;
diff --git a/pbs-tools/Cargo.toml b/pbs-tools/Cargo.toml
index f20a315e..d37ef865 100644
--- a/pbs-tools/Cargo.toml
+++ b/pbs-tools/Cargo.toml
@@ -17,6 +17,7 @@ foreign-types = "0.3"
 futures = "0.3"
 lazy_static = "1.4"
 libc = "0.2"
+log = "0.4"
 nix = "0.19.1"
 nom = "5.1"
 openssl = "0.10"
diff --git a/pbs-tools/src/lib.rs b/pbs-tools/src/lib.rs
index 000591c3..6c2f0ff5 100644
--- a/pbs-tools/src/lib.rs
+++ b/pbs-tools/src/lib.rs
@@ -24,6 +24,7 @@ pub mod str;
 pub mod stream;
 pub mod sync;
 pub mod sys;
+pub mod task;
 pub mod ticket;
 pub mod tokio;
 pub mod xattr;
diff --git a/pbs-datastore/src/task.rs b/pbs-tools/src/task.rs
similarity index 100%
rename from pbs-datastore/src/task.rs
rename to pbs-tools/src/task.rs
diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
index b02c20db..afaf40e1 100644
--- a/proxmox-rest-server/Cargo.toml
+++ b/proxmox-rest-server/Cargo.toml
@@ -15,6 +15,7 @@ lazy_static = "1.4"
 libc = "0.2"
 log = "0.4"
 nix = "0.19.1"
+once_cell = "1.3.1"
 percent-encoding = "2.1"
 regex = "1.2"
 serde = { version = "1.0", features = [] }
diff --git a/proxmox-rest-server/src/lib.rs b/proxmox-rest-server/src/lib.rs
index 2f29f9cd..9acdb3fd 100644
--- a/proxmox-rest-server/src/lib.rs
+++ b/proxmox-rest-server/src/lib.rs
@@ -1,9 +1,12 @@
 use std::os::unix::io::RawFd;
 
 use anyhow::{bail, format_err, Error};
+use nix::unistd::Pid;
 
 use proxmox::tools::fd::Fd;
+use proxmox::sys::linux::procfs::PidStat;
 use proxmox::api::UserInformation;
+use proxmox::tools::fs::CreateOptions;
 
 mod compression;
 pub use compression::*;
@@ -29,6 +32,9 @@ pub use api_config::ApiConfig;
 mod rest;
 pub use rest::{RestServer, handle_api_request};
 
+mod worker_task;
+pub use worker_task::*;
+
 pub enum AuthError {
     Generic(Error),
     NoData,
@@ -48,6 +54,40 @@ pub trait ApiAuth {
     ) -> Result<(String, Box<dyn UserInformation + Sync + Send>), AuthError>;
 }
 
+lazy_static::lazy_static!{
+    static ref PID: i32 = unsafe { libc::getpid() };
+    static ref PSTART: u64 = PidStat::read_from_pid(Pid::from_raw(*PID)).unwrap().starttime;
+}
+
+pub fn pid() -> i32 {
+    *PID
+}
+
+pub fn pstart() -> u64 {
+    *PSTART
+}
+
+pub fn write_pid(pid_fn: &str) -> Result<(), Error> {
+    let pid_str = format!("{}\n", *PID);
+    proxmox::tools::fs::replace_file(pid_fn, pid_str.as_bytes(), CreateOptions::new())
+}
+
+pub fn read_pid(pid_fn: &str) -> Result<i32, Error> {
+    let pid = proxmox::tools::fs::file_get_contents(pid_fn)?;
+    let pid = std::str::from_utf8(&pid)?.trim();
+    pid.parse().map_err(|err| format_err!("could not parse pid - {}", err))
+}
+
+pub fn ctrl_sock_from_pid(pid: i32) -> String {
+    // Note: The control socket always uses @/run/proxmox-backup/ as prefix
+    // for historic reasons.
+    format!("\0{}/control-{}.sock", "/run/proxmox-backup", pid)
+}
+
+pub fn our_ctrl_sock() -> String {
+    ctrl_sock_from_pid(*PID)
+}
+
 static mut SHUTDOWN_REQUESTED: bool = false;
 
 pub fn request_shutdown() {
diff --git a/src/server/worker_task.rs b/proxmox-rest-server/src/worker_task.rs
similarity index 95%
rename from src/server/worker_task.rs
rename to proxmox-rest-server/src/worker_task.rs
index 94ffbeb0..b6ed6862 100644
--- a/src/server/worker_task.rs
+++ b/proxmox-rest-server/src/worker_task.rs
@@ -21,7 +21,8 @@ use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file,
 use proxmox::api::upid::UPID;
 
 use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
-use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
+
+use crate::{CommandoSocket, FileLogger, FileLogOptions};
 
 struct TaskListLockGuard(File);
 
@@ -280,7 +281,7 @@ lazy_static! {
 
 /// checks if the task UPID refers to a worker from this process
 fn is_local_worker(upid: &UPID) -> bool {
-    upid.pid == crate::server::pid() && upid.pstart == crate::server::pstart()
+    upid.pid == crate::pid() && upid.pstart == crate::pstart()
 }
 
 /// Test if the task is still running
@@ -293,14 +294,14 @@ pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
         return Ok(false);
     }
 
-    let sock = crate::server::ctrl_sock_from_pid(upid.pid);
+    let sock = crate::ctrl_sock_from_pid(upid.pid);
     let cmd = json!({
         "command": "worker-task-status",
         "args": {
             "upid": upid.to_string(),
         },
     });
-    let status = proxmox_rest_server::send_command(sock, &cmd).await?;
+    let status = crate::send_command(sock, &cmd).await?;
 
     if let Some(active) = status.as_bool() {
         Ok(active)
@@ -366,14 +367,14 @@ pub fn abort_worker_async(upid: UPID) {
 
 pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
 
-    let sock = crate::server::ctrl_sock_from_pid(upid.pid);
+    let sock = crate::ctrl_sock_from_pid(upid.pid);
     let cmd = json!({
         "command": "worker-task-abort",
         "args": {
             "upid": upid.to_string(),
         },
     });
-    proxmox_rest_server::send_command(sock, &cmd).map_ok(|_| ()).await
+    crate::send_command(sock, &cmd).map_ok(|_| ()).await
 }
 
 fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
@@ -476,27 +477,6 @@ pub struct TaskListInfo {
     pub state: Option<TaskState>, // endtime, status
 }
 
-impl Into<pbs_api_types::TaskListItem> for TaskListInfo {
-    fn into(self) -> pbs_api_types::TaskListItem {
-        let (endtime, status) = self
-            .state
-            .map_or_else(|| (None, None), |a| (Some(a.endtime()), Some(a.to_string())));
-
-        pbs_api_types::TaskListItem {
-            upid: self.upid_str,
-            node: "localhost".to_string(),
-            pid: self.upid.pid as i64,
-            pstart: self.upid.pstart,
-            starttime: self.upid.starttime,
-            worker_type: self.upid.worker_type,
-            worker_id: self.upid.worker_id,
-            user: self.upid.auth_id,
-            endtime,
-            status,
-        }
-    }
-}
-
 fn render_task_line(info: &TaskListInfo) -> String {
     let mut raw = String::new();
     if let Some(status) = &info.state {
@@ -715,7 +695,7 @@ impl WorkerTask {
         {
             let mut hash = WORKER_TASK_LIST.lock().unwrap();
             hash.insert(task_id, worker.clone());
-            proxmox_rest_server::set_worker_count(hash.len());
+            crate::set_worker_count(hash.len());
         }
 
         setup.update_active_workers(Some(&upid))?;
@@ -802,7 +782,7 @@ impl WorkerTask {
 
         WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
         let _ = self.setup.update_active_workers(None);
-        proxmox_rest_server::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
+        crate::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
     }
 
     /// Log a message.
@@ -879,7 +859,7 @@ impl WorkerTask {
     }
 }
 
-impl pbs_datastore::task::TaskState for WorkerTask {
+impl pbs_tools::task::TaskState for WorkerTask {
     fn check_abort(&self) -> Result<(), Error> {
         self.fail_on_abort()
     }
diff --git a/src/acme/plugin.rs b/src/acme/plugin.rs
index 7593aaa4..cb7de082 100644
--- a/src/acme/plugin.rs
+++ b/src/acme/plugin.rs
@@ -13,7 +13,7 @@ use proxmox_acme_rs::{Authorization, Challenge};
 
 use crate::acme::AcmeClient;
 use crate::api2::types::AcmeDomain;
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 use crate::config::acme::plugin::{DnsPlugin, PluginData};
 
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index fbb93f35..154a2e84 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -54,7 +54,7 @@ use pbs_tools::blocking::WrappedReaderStream;
 use pbs_tools::stream::{AsyncReaderStream, AsyncChannelWriter};
 use pbs_tools::json::{required_integer_param, required_string_param};
 use pbs_config::CachedUserInfo;
-use proxmox_rest_server::formatter;
+use proxmox_rest_server::{WorkerTask, formatter};
 
 use crate::api2::node::rrd::create_value_from_rrd;
 use crate::backup::{
@@ -62,7 +62,7 @@ use crate::backup::{
     DataStore, LocalChunkReader,
 };
 
-use crate::server::{jobstate::Job, WorkerTask};
+use crate::server::jobstate::Job;
 
 
 const GROUP_NOTES_FILE_NAME: &str = "notes";
diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index 306f91ee..8112737e 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -15,10 +15,10 @@ use pbs_datastore::backup_info::{BackupDir, BackupInfo};
 use pbs_datastore::dynamic_index::DynamicIndexWriter;
 use pbs_datastore::fixed_index::FixedIndexWriter;
 use pbs_api_types::Authid;
-use proxmox_rest_server::formatter::*;
+use proxmox_rest_server::{WorkerTask, formatter::*};
 
 use crate::backup::{verify_backup_dir_with_lock, DataStore};
-use crate::server::WorkerTask;
+
 use hyper::{Body, Response};
 
 #[derive(Copy, Clone, Serialize)]
diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
index c14f19a4..4333be17 100644
--- a/src/api2/backup/mod.rs
+++ b/src/api2/backup/mod.rs
@@ -23,8 +23,9 @@ use pbs_datastore::PROXMOX_BACKUP_PROTOCOL_ID_V1;
 use pbs_datastore::backup_info::{BackupDir, BackupGroup, BackupInfo};
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{archive_type, ArchiveType};
+use proxmox_rest_server::WorkerTask;
 
-use crate::server::{WorkerTask, H2Service};
+use crate::server::H2Service;
 use crate::backup::DataStore;
 use pbs_config::CachedUserInfo;
 
diff --git a/src/api2/config/acme.rs b/src/api2/config/acme.rs
index 564cafae..8593acac 100644
--- a/src/api2/config/acme.rs
+++ b/src/api2/config/acme.rs
@@ -24,7 +24,7 @@ use crate::api2::types::{AcmeAccountName, AcmeChallengeSchema, KnownAcmeDirector
 use crate::config::acme::plugin::{
     self, DnsPlugin, DnsPluginCore, DnsPluginCoreUpdater, PLUGIN_ID_SCHEMA,
 };
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 pub(crate) const ROUTER: Router = Router::new()
     .get(&list_subdirs_api_method!(SUBDIRS))
diff --git a/src/api2/config/datastore.rs b/src/api2/config/datastore.rs
index 0e6529f8..0f9234ca 100644
--- a/src/api2/config/datastore.rs
+++ b/src/api2/config/datastore.rs
@@ -9,7 +9,6 @@ use proxmox::api::section_config::SectionConfigData;
 use proxmox::api::schema::{ApiType, parse_property_string};
 
 use pbs_datastore::chunk_store::ChunkStore;
-use pbs_datastore::task::TaskState;
 use pbs_config::BackupLockGuard;
 use pbs_api_types::{
     Authid, DatastoreNotify,
@@ -17,6 +16,7 @@ use pbs_api_types::{
     PRIV_DATASTORE_ALLOCATE, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_MODIFY,
     DataStoreConfig, DataStoreConfigUpdater,
 };
+use pbs_tools::task::TaskState;
 
 use crate::api2::config::sync::delete_sync_job;
 use crate::api2::config::verify::delete_verification_job;
@@ -26,8 +26,9 @@ use crate::api2::admin::{
     verify::list_verification_jobs,
 };
 use pbs_config::CachedUserInfo;
+use proxmox_rest_server::WorkerTask;
 
-use crate::server::{jobstate, WorkerTask};
+use crate::server::jobstate;
 
 #[api(
     input: {
diff --git a/src/api2/node/apt.rs b/src/api2/node/apt.rs
index f02920c0..4fd81592 100644
--- a/src/api2/node/apt.rs
+++ b/src/api2/node/apt.rs
@@ -19,7 +19,7 @@ use pbs_api_types::{
 };
 
 use crate::config::node;
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 use crate::tools::{
     apt,
     pbs_simple_http,
diff --git a/src/api2/node/certificates.rs b/src/api2/node/certificates.rs
index 7b31861e..80733fe9 100644
--- a/src/api2/node/certificates.rs
+++ b/src/api2/node/certificates.rs
@@ -18,7 +18,7 @@ use pbs_tools::cert;
 use crate::acme::AcmeClient;
 use crate::api2::types::AcmeDomain;
 use crate::config::node::NodeConfig;
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 pub const ROUTER: Router = Router::new()
     .get(&list_subdirs_api_method!(SUBDIRS))
diff --git a/src/api2/node/disks/directory.rs b/src/api2/node/disks/directory.rs
index 49127586..91369643 100644
--- a/src/api2/node/disks/directory.rs
+++ b/src/api2/node/disks/directory.rs
@@ -17,7 +17,7 @@ use crate::tools::disks::{
 };
 use crate::tools::systemd::{self, types::*};
 
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 const BASE_MOUNT_DIR: &str = "/mnt/datastore/";
 
diff --git a/src/api2/node/disks/mod.rs b/src/api2/node/disks/mod.rs
index b4001a54..f08c340b 100644
--- a/src/api2/node/disks/mod.rs
+++ b/src/api2/node/disks/mod.rs
@@ -15,7 +15,7 @@ use crate::tools::disks::{
     DiskUsageInfo, DiskUsageType, DiskManage, SmartData,
     get_disks, get_smart_data, get_disk_usage_info, inititialize_gpt_disk,
 };
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 pub mod directory;
 pub mod zfs;
diff --git a/src/api2/node/disks/zfs.rs b/src/api2/node/disks/zfs.rs
index 8a6cb708..efea9b70 100644
--- a/src/api2/node/disks/zfs.rs
+++ b/src/api2/node/disks/zfs.rs
@@ -19,7 +19,7 @@ use crate::tools::disks::{
     DiskUsageType,
 };
 
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 
 #[api(
diff --git a/src/api2/node/mod.rs b/src/api2/node/mod.rs
index 8e357311..7a5bb64e 100644
--- a/src/api2/node/mod.rs
+++ b/src/api2/node/mod.rs
@@ -24,7 +24,7 @@ use pbs_api_types::{Authid, NODE_SCHEMA, PRIV_SYS_CONSOLE};
 use pbs_tools::auth::private_auth_key;
 use pbs_tools::ticket::{self, Empty, Ticket};
 
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 use crate::tools;
 
 pub mod apt;
diff --git a/src/api2/node/network.rs b/src/api2/node/network.rs
index 0fde9f2a..d496b5f8 100644
--- a/src/api2/node/network.rs
+++ b/src/api2/node/network.rs
@@ -13,7 +13,7 @@ use pbs_api_types::{
 };
 use pbs_config::network::{self, NetworkConfig};
 
-use crate::server::{WorkerTask};
+use proxmox_rest_server::WorkerTask;
 
 fn split_interface_list(list: &str) -> Result<Vec<String>, Error> {
     let value = parse_property_string(&list, &NETWORK_INTERFACE_ARRAY_SCHEMA)?;
diff --git a/src/api2/node/services.rs b/src/api2/node/services.rs
index 6c757f43..8df0fb24 100644
--- a/src/api2/node/services.rs
+++ b/src/api2/node/services.rs
@@ -9,7 +9,7 @@ use proxmox::api::router::SubdirMap;
 
 use pbs_api_types::{Authid, NODE_SCHEMA, SERVICE_ID_SCHEMA, PRIV_SYS_AUDIT, PRIV_SYS_MODIFY};
 
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 static SERVICE_NAME_LIST: [&str; 7] = [
     "proxmox-backup",
diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index df4673a1..0d2b456c 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -16,7 +16,8 @@ use pbs_api_types::{
 };
 
 use crate::api2::pull::check_pull_privs;
-use crate::server::{self, upid_log_path, upid_read_status, TaskState, TaskListInfoIterator};
+
+use proxmox_rest_server::{upid_log_path, upid_read_status, TaskState, TaskListInfoIterator};
 use pbs_config::CachedUserInfo;
 
 // matches respective job execution privileges
@@ -125,6 +126,25 @@ pub fn tasktype(state: &TaskState) -> TaskStateType {
     }
 }
 
+fn into_task_list_item(info: proxmox_rest_server::TaskListInfo) -> pbs_api_types::TaskListItem {
+    let (endtime, status) = info
+        .state
+        .map_or_else(|| (None, None), |a| (Some(a.endtime()), Some(a.to_string())));
+
+    pbs_api_types::TaskListItem {
+        upid: info.upid_str,
+        node: "localhost".to_string(),
+        pid: info.upid.pid as i64,
+        pstart: info.upid.pstart,
+        starttime: info.upid.starttime,
+        worker_type: info.upid.worker_type,
+        worker_id: info.upid.worker_id,
+        user: info.upid.auth_id,
+        endtime,
+        status,
+    }
+}
+
 #[api(
     input: {
         properties: {
@@ -217,7 +237,7 @@ async fn get_task_status(
         result["tokenid"] = Value::from(task_auth_id.tokenname().unwrap().as_str());
     }
 
-    if crate::server::worker_is_active(&upid).await? {
+    if proxmox_rest_server::worker_is_active(&upid).await? {
         result["status"] = Value::from("running");
     } else {
         let exitstatus = upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
@@ -314,7 +334,7 @@ async fn read_task_log(
     rpcenv["total"] = Value::from(count);
 
     if test_status {
-        let active = crate::server::worker_is_active(&upid).await?;
+        let active = proxmox_rest_server::worker_is_active(&upid).await?;
         rpcenv["active"] = Value::from(active);
     }
 
@@ -354,7 +374,7 @@ fn stop_task(
         user_info.check_privs(&auth_id, &["system", "tasks"], PRIV_SYS_MODIFY, false)?;
     }
 
-    server::abort_worker_async(upid);
+    proxmox_rest_server::abort_worker_async(upid);
 
     Ok(Value::Null)
 }
@@ -502,7 +522,7 @@ pub fn list_tasks(
 
         match (&info.state, &statusfilter) {
             (Some(_), _) if running => continue,
-            (Some(crate::server::TaskState::OK { .. }), _) if errors => continue,
+            (Some(TaskState::OK { .. }), _) if errors => continue,
             (Some(state), Some(filters)) => {
                 if !filters.contains(&tasktype(state)) {
                     continue;
@@ -517,7 +537,7 @@ pub fn list_tasks(
             continue;
         }
 
-        result.push(info.into());
+        result.push(into_task_list_item(info));
 
         if result.len() >= limit {
             break;
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index e631920f..0240098d 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -13,8 +13,9 @@ use pbs_api_types::{
     DATASTORE_SCHEMA, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
     PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ,
 };
+use proxmox_rest_server::WorkerTask;
 
-use crate::server::{WorkerTask, jobstate::Job, pull::pull_store};
+use crate::server::{jobstate::Job, pull::pull_store};
 use crate::backup::DataStore;
 use pbs_config::CachedUserInfo;
 
diff --git a/src/api2/reader/environment.rs b/src/api2/reader/environment.rs
index f7d79072..c85ec069 100644
--- a/src/api2/reader/environment.rs
+++ b/src/api2/reader/environment.rs
@@ -10,7 +10,7 @@ use pbs_api_types::Authid;
 use proxmox_rest_server::formatter::*;
 
 use crate::backup::DataStore;
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 //use proxmox::tools;
 
diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
index fada952c..c663e9ae 100644
--- a/src/api2/reader/mod.rs
+++ b/src/api2/reader/mod.rs
@@ -39,12 +39,12 @@ use pbs_datastore::backup_info::BackupDir;
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{archive_type, ArchiveType};
 use pbs_config::CachedUserInfo;
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
     api2::helpers,
     backup::DataStore,
     server::{
-        WorkerTask,
         H2Service,
     },
 };
diff --git a/src/api2/tape/backup.rs b/src/api2/tape/backup.rs
index fadbfa3d..5effa99d 100644
--- a/src/api2/tape/backup.rs
+++ b/src/api2/tape/backup.rs
@@ -20,10 +20,11 @@ use pbs_api_types::{
     UPID_SCHEMA, JOB_ID_SCHEMA, PRIV_DATASTORE_READ, PRIV_TAPE_AUDIT, PRIV_TAPE_WRITE,
 };
 
-use pbs_datastore::{task_log, task_warn, StoreProgress};
+use pbs_datastore::StoreProgress;
 use pbs_datastore::backup_info::{BackupDir, BackupInfo};
-use pbs_datastore::task::TaskState;
+use pbs_tools::{task_log, task_warn, task::TaskState};
 use pbs_config::CachedUserInfo;
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
     server::{
@@ -36,7 +37,6 @@ use crate::{
         },
     },
     backup::{DataStore, SnapshotReader},
-    server::WorkerTask,
     tape::{
         TAPE_STATUS_DIR,
         Inventory,
diff --git a/src/api2/tape/drive.rs b/src/api2/tape/drive.rs
index 10aa6842..8227f659 100644
--- a/src/api2/tape/drive.rs
+++ b/src/api2/tape/drive.rs
@@ -28,7 +28,6 @@ use pbs_api_types::{
     LtoDriveAndMediaStatus, Lp17VolumeStatistics,
 };
  
-use pbs_datastore::task_log;
 use pbs_api_types::{PRIV_TAPE_AUDIT, PRIV_TAPE_READ, PRIV_TAPE_WRITE};
 use pbs_config::CachedUserInfo;
 use pbs_tape::{
@@ -36,13 +35,14 @@ use pbs_tape::{
     sg_tape::tape_alert_flags_critical,
     linux_list_drives::{lto_tape_device_list, lookup_device_identification, open_lto_tape_device},
 };
+use pbs_tools::task_log;
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
     api2::tape::restore::{
         fast_catalog_restore,
         restore_media,
     },
-    server::WorkerTask,
     tape::{
         TAPE_STATUS_DIR,
         Inventory,
diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index 7739d1a4..045d8d6c 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -34,26 +34,24 @@ use pbs_api_types::{
     UPID_SCHEMA, TAPE_RESTORE_SNAPSHOT_SCHEMA,
     PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_TAPE_READ,
 };
-use pbs_datastore::{task_log, task_warn, DataBlob};
+use pbs_datastore::DataBlob;
 use pbs_datastore::backup_info::BackupDir;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::fixed_index::FixedIndexReader;
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{archive_type, ArchiveType, BackupManifest, MANIFEST_BLOB_NAME};
-use pbs_datastore::task::TaskState;
 use pbs_config::CachedUserInfo;
 use pbs_tape::{
     TapeRead, BlockReadError, MediaContentHeader,
     PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0,
 };
+use pbs_tools::{task_log, task_warn, task::TaskState};
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
     tools::ParallelHandler,
     backup::DataStore,
-    server::{
-        lookup_user_email,
-        WorkerTask,
-    },
+    server::lookup_user_email,
     tape::{
         TAPE_STATUS_DIR,
         MediaId,
diff --git a/src/backup/datastore.rs b/src/backup/datastore.rs
index df8d46b6..fcef2d39 100644
--- a/src/backup/datastore.rs
+++ b/src/backup/datastore.rs
@@ -12,7 +12,6 @@ use lazy_static::lazy_static;
 use proxmox::tools::fs::{replace_file, file_read_optional_string, CreateOptions};
 
 use pbs_api_types::{UPID, DataStoreConfig, Authid, GarbageCollectionStatus};
-use pbs_datastore::{task_log, task_warn};
 use pbs_datastore::DataBlob;
 use pbs_datastore::backup_info::{BackupGroup, BackupDir};
 use pbs_datastore::chunk_store::ChunkStore;
@@ -24,10 +23,10 @@ use pbs_datastore::manifest::{
     ArchiveType, BackupManifest,
     archive_type,
 };
-use pbs_datastore::task::TaskState;
 use pbs_tools::format::HumanByte;
 use pbs_tools::fs::{lock_dir_noblock, DirLockGuard};
 use pbs_tools::process_locker::ProcessLockSharedGuard;
+use pbs_tools::{task_log, task_warn, task::TaskState};
 use pbs_config::{open_backup_lockfile, BackupLockGuard};
 use proxmox_rest_server::fail_on_shutdown;
 
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index b8d2b2f3..051d6918 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -7,12 +7,12 @@ use std::time::Instant;
 use anyhow::{bail, format_err, Error};
 
 use pbs_api_types::{Authid, CryptMode, VerifyState, UPID, SnapshotVerifyState};
-use pbs_datastore::{task_log, DataBlob, StoreProgress};
+use pbs_datastore::{DataBlob, StoreProgress};
 use pbs_datastore::backup_info::{BackupGroup, BackupDir, BackupInfo};
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{archive_type, ArchiveType, BackupManifest, FileInfo};
-use pbs_datastore::task::TaskState;
 use pbs_tools::fs::lock_dir_noblock_shared;
+use pbs_tools::{task_log, task::TaskState};
 
 use crate::{
     backup::DataStore,
diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
index 9901b85d..86650de6 100644
--- a/src/bin/proxmox-backup-api.rs
+++ b/src/bin/proxmox-backup-api.rs
@@ -10,14 +10,9 @@ use proxmox::api::RpcEnvironmentType;
 use proxmox::tools::fs::CreateOptions;
 
 use pbs_tools::auth::private_auth_key;
-use proxmox_rest_server::{ApiConfig, RestServer};
-
-use proxmox_backup::server::{
-    self,
-    auth::default_api_auth,
-};
-use proxmox_rest_server::daemon;
+use proxmox_rest_server::{daemon, ApiConfig, RestServer};
 
+use proxmox_backup::server::auth::default_api_auth;
 use proxmox_backup::auth_helpers::*;
 use proxmox_backup::config;
 
@@ -86,7 +81,7 @@ async fn run() -> Result<(), Error> {
     )?;
 
     let backup_user = pbs_config::backup_user()?;
-    let mut commando_sock = proxmox_rest_server::CommandoSocket::new(crate::server::our_ctrl_sock(), backup_user.gid);
+    let mut commando_sock = proxmox_rest_server::CommandoSocket::new(proxmox_rest_server::our_ctrl_sock(), backup_user.gid);
 
     let dir_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
     let file_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
@@ -107,7 +102,7 @@ async fn run() -> Result<(), Error> {
 
 
     let rest_server = RestServer::new(config);
-    proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
+    proxmox_rest_server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
 
     // http server future:
     let server = daemon::create_daemon(
@@ -130,11 +125,11 @@ async fn run() -> Result<(), Error> {
         "proxmox-backup.service",
     );
 
-    server::write_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
+    proxmox_rest_server::write_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
     daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
 
     let init_result: Result<(), Error> = try_block!({
-        server::register_task_control_commands(&mut commando_sock)?;
+        proxmox_rest_server::register_task_control_commands(&mut commando_sock)?;
         commando_sock.spawn()?;
         proxmox_rest_server::server_state_init()?;
         Ok(())
diff --git a/src/bin/proxmox-backup-manager.rs b/src/bin/proxmox-backup-manager.rs
index 689f44db..b9e4e2ff 100644
--- a/src/bin/proxmox-backup-manager.rs
+++ b/src/bin/proxmox-backup-manager.rs
@@ -14,9 +14,10 @@ use pbs_api_types::{
     IGNORE_VERIFIED_BACKUPS_SCHEMA, VERIFICATION_OUTDATED_AFTER_SCHEMA,
 };
 
+use proxmox_rest_server::wait_for_local_worker;
+
 use proxmox_backup::config;
 use proxmox_backup::api2;
-use proxmox_backup::server::wait_for_local_worker;
 
 mod proxmox_backup_manager;
 use proxmox_backup_manager::*;
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 5d8ed189..ec4da15b 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -19,18 +19,16 @@ use proxmox::api::RpcEnvironmentType;
 use proxmox::sys::linux::socket::set_tcp_keepalive;
 use proxmox::tools::fs::CreateOptions;
 
-use proxmox_rest_server::{ApiConfig, RestServer};
+use proxmox_rest_server::{rotate_task_log_archive, ApiConfig, RestServer, WorkerTask};
 
 use proxmox_backup::{
     backup::DataStore,
     server::{
         auth::default_api_auth,
-        WorkerTask,
         jobstate::{
             self,
             Job,
         },
-        rotate_task_log_archive,
     },
 };
 
@@ -188,7 +186,7 @@ async fn run() -> Result<(), Error> {
     config.register_template("console", "/usr/share/pve-xtermjs/index.html.hbs")?;
 
     let backup_user = pbs_config::backup_user()?;
-    let mut commando_sock = proxmox_rest_server::CommandoSocket::new(crate::server::our_ctrl_sock(), backup_user.gid);
+    let mut commando_sock = proxmox_rest_server::CommandoSocket::new(proxmox_rest_server::our_ctrl_sock(), backup_user.gid);
 
     let dir_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
     let file_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
@@ -208,7 +206,7 @@ async fn run() -> Result<(), Error> {
     )?;
 
     let rest_server = RestServer::new(config);
-    proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
+    proxmox_rest_server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
 
     //openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes
 
@@ -266,11 +264,11 @@ async fn run() -> Result<(), Error> {
         "proxmox-backup-proxy.service",
     );
 
-    server::write_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
+    proxmox_rest_server::write_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
     daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
 
     let init_result: Result<(), Error> = try_block!({
-        server::register_task_control_commands(&mut commando_sock)?;
+        proxmox_rest_server::register_task_control_commands(&mut commando_sock)?;
         commando_sock.spawn()?;
         proxmox_rest_server::server_state_init()?;
         Ok(())
@@ -806,11 +804,11 @@ async fn schedule_task_log_rotate() {
 async fn command_reopen_access_logfiles() -> Result<(), Error> {
     // only care about the most recent daemon instance for each, proxy & api, as other older ones
     // should not respond to new requests anyway, but only finish their current one and then exit.
-    let sock = crate::server::our_ctrl_sock();
+    let sock = proxmox_rest_server::our_ctrl_sock();
     let f1 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
 
-    let pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
-    let sock = crate::server::ctrl_sock_from_pid(pid);
+    let pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
+    let sock = proxmox_rest_server::ctrl_sock_from_pid(pid);
     let f2 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
 
     match futures::join!(f1, f2) {
@@ -824,11 +822,11 @@ async fn command_reopen_access_logfiles() -> Result<(), Error> {
 async fn command_reopen_auth_logfiles() -> Result<(), Error> {
     // only care about the most recent daemon instance for each, proxy & api, as other older ones
     // should not respond to new requests anyway, but only finish their current one and then exit.
-    let sock = crate::server::our_ctrl_sock();
+    let sock = proxmox_rest_server::our_ctrl_sock();
     let f1 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-auth-log-reopen\"}\n");
 
-    let pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
-    let sock = crate::server::ctrl_sock_from_pid(pid);
+    let pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
+    let sock = proxmox_rest_server::ctrl_sock_from_pid(pid);
     let f2 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-auth-log-reopen\"}\n");
 
     match futures::join!(f1, f2) {
diff --git a/src/bin/proxmox-daily-update.rs b/src/bin/proxmox-daily-update.rs
index c1580b97..09a768b1 100644
--- a/src/bin/proxmox-daily-update.rs
+++ b/src/bin/proxmox-daily-update.rs
@@ -11,7 +11,7 @@ async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> {
     let sleep_duration = core::time::Duration::new(0, 100_000_000);
 
     loop {
-        if !proxmox_backup::server::worker_is_active_local(&upid) {
+        if !proxmox_rest_server::worker_is_active_local(&upid) {
             break;
         }
         tokio::time::sleep(sleep_duration).await;
diff --git a/src/bin/proxmox_backup_debug/api.rs b/src/bin/proxmox_backup_debug/api.rs
index bebe9ddc..003f6677 100644
--- a/src/bin/proxmox_backup_debug/api.rs
+++ b/src/bin/proxmox_backup_debug/api.rs
@@ -235,12 +235,12 @@ async fn handle_worker(upid_str: &str) -> Result<(), Error> {
     let abort_future = async move {
         while signal_stream.recv().await.is_some() {
             println!("got shutdown request (SIGINT)");
-            proxmox_backup::server::abort_local_worker(upid.clone());
+            proxmox_rest_server::abort_local_worker(upid.clone());
         }
         Ok::<_, Error>(())
     };
 
-    let result_future = proxmox_backup::server::wait_for_local_worker(upid_str);
+    let result_future = proxmox_rest_server::wait_for_local_worker(upid_str);
 
     futures::select! {
         result = result_future.fuse() => result?,
diff --git a/src/server/gc_job.rs b/src/server/gc_job.rs
index 317f4a36..608b5831 100644
--- a/src/server/gc_job.rs
+++ b/src/server/gc_job.rs
@@ -2,9 +2,9 @@ use std::sync::Arc;
 use anyhow::Error;
 
 use pbs_api_types::Authid;
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
-    server::WorkerTask,
     server::jobstate::Job,
     backup::DataStore,
 };
diff --git a/src/server/h2service.rs b/src/server/h2service.rs
index 41d628be..0b51a710 100644
--- a/src/server/h2service.rs
+++ b/src/server/h2service.rs
@@ -11,11 +11,9 @@ use hyper::{Body, Request, Response, StatusCode};
 use proxmox::api::{ApiResponseFuture, HttpError, Router, RpcEnvironment};
 use proxmox::http_err;
 
-use proxmox_rest_server::normalize_uri_path;
+use proxmox_rest_server::{normalize_uri_path, WorkerTask};
 use proxmox_rest_server::formatter::*;
 
-use crate::server::WorkerTask;
-
 /// Hyper Service implementation to handle stateful H2 connections.
 ///
 /// We use this kind of service to handle backup protocol
diff --git a/src/server/jobstate.rs b/src/server/jobstate.rs
index 74224f33..ed71ec71 100644
--- a/src/server/jobstate.rs
+++ b/src/server/jobstate.rs
@@ -50,11 +50,7 @@ use proxmox_systemd::time::{compute_next_event, parse_calendar_event};
 use pbs_config::{open_backup_lockfile, BackupLockGuard};
 use pbs_api_types::{UPID, JobScheduleStatus};
 
-use crate::server::{
-    TaskState,
-    upid_read_status,
-    worker_is_active_local,
-};
+use proxmox_rest_server::{upid_read_status, worker_is_active_local, TaskState};
 
 #[derive(Serialize, Deserialize)]
 #[serde(rename_all = "kebab-case")]
diff --git a/src/server/mod.rs b/src/server/mod.rs
index 77320da6..96d57bd4 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -4,51 +4,13 @@
 //! services. We want async IO, so this is built on top of
 //! tokio/hyper.
 
-use anyhow::{format_err, Error};
-use lazy_static::lazy_static;
-use nix::unistd::Pid;
+use anyhow::Error;
 use serde_json::Value;
 
-use proxmox::sys::linux::procfs::PidStat;
 use proxmox::tools::fs::{create_path, CreateOptions};
 
 use pbs_buildcfg;
 
-lazy_static! {
-    static ref PID: i32 = unsafe { libc::getpid() };
-    static ref PSTART: u64 = PidStat::read_from_pid(Pid::from_raw(*PID)).unwrap().starttime;
-}
-
-pub fn pid() -> i32 {
-    *PID
-}
-
-pub fn pstart() -> u64 {
-    *PSTART
-}
-
-pub fn write_pid(pid_fn: &str) -> Result<(), Error> {
-    let pid_str = format!("{}\n", *PID);
-    proxmox::tools::fs::replace_file(pid_fn, pid_str.as_bytes(), CreateOptions::new())
-}
-
-pub fn read_pid(pid_fn: &str) -> Result<i32, Error> {
-    let pid = proxmox::tools::fs::file_get_contents(pid_fn)?;
-    let pid = std::str::from_utf8(&pid)?.trim();
-    pid.parse().map_err(|err| format_err!("could not parse pid - {}", err))
-}
-
-pub fn ctrl_sock_from_pid(pid: i32) -> String {
-    format!("\0{}/control-{}.sock", pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, pid)
-}
-
-pub fn our_ctrl_sock() -> String {
-    ctrl_sock_from_pid(*PID)
-}
-
-mod worker_task;
-pub use worker_task::*;
-
 mod h2service;
 pub use h2service::*;
 
@@ -76,16 +38,16 @@ pub mod auth;
 pub mod pull;
 
 pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
-    let proxy_pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
-    let sock = crate::server::ctrl_sock_from_pid(proxy_pid);
+    let proxy_pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
+    let sock = proxmox_rest_server::ctrl_sock_from_pid(proxy_pid);
     let _: Value = proxmox_rest_server::send_raw_command(sock, "{\"command\":\"reload-certificate\"}\n")
         .await?;
     Ok(())
 }
 
 pub(crate) async fn notify_datastore_removed() -> Result<(), Error> {
-    let proxy_pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
-    let sock = crate::server::ctrl_sock_from_pid(proxy_pid);
+    let proxy_pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
+    let sock = proxmox_rest_server::ctrl_sock_from_pid(proxy_pid);
     let _: Value = proxmox_rest_server::send_raw_command(sock, "{\"command\":\"datastore-removed\"}\n")
         .await?;
     Ok(())
diff --git a/src/server/prune_job.rs b/src/server/prune_job.rs
index 8d971a1c..53740187 100644
--- a/src/server/prune_job.rs
+++ b/src/server/prune_job.rs
@@ -2,17 +2,17 @@ use std::sync::Arc;
 
 use anyhow::Error;
 
-use pbs_datastore::{task_log, task_warn};
 use pbs_datastore::backup_info::BackupInfo;
 use pbs_datastore::prune::compute_prune_info;
 use pbs_api_types::{Authid, PRIV_DATASTORE_MODIFY, PruneOptions};
 use pbs_config::CachedUserInfo;
+use pbs_tools::{task_log, task_warn};
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
     backup::DataStore,
     server::jobstate::Job,
-    server::WorkerTask,
-};
+ };
 
 pub fn prune_datastore(
     worker: Arc<WorkerTask>,
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 5214a218..f913ac8a 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -13,7 +13,7 @@ use serde_json::json;
 use proxmox::api::error::{HttpError, StatusCode};
 
 use pbs_api_types::{Authid, SnapshotListItem, GroupListItem};
-use pbs_datastore::{task_log, BackupInfo, BackupDir, BackupGroup, StoreProgress};
+use pbs_datastore::{BackupInfo, BackupDir, BackupGroup, StoreProgress};
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::fixed_index::FixedIndexReader;
@@ -22,11 +22,12 @@ use pbs_datastore::manifest::{
     CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, ArchiveType, BackupManifest, FileInfo, archive_type
 };
 use pbs_tools::sha::sha256;
+use pbs_tools::task_log;
 use pbs_client::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader};
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
     backup::DataStore,
-    server::WorkerTask,
     tools::ParallelHandler,
 };
 
diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs
index 6005b706..62fa6fa8 100644
--- a/src/server/verify_job.rs
+++ b/src/server/verify_job.rs
@@ -1,10 +1,10 @@
 use anyhow::{format_err, Error};
 
-use pbs_datastore::task_log;
+use pbs_tools::task_log;
 use pbs_api_types::{Authid, VerificationJobConfig};
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
-    server::WorkerTask,
     server::jobstate::Job,
     backup::{
         DataStore,
diff --git a/src/tape/drive/mod.rs b/src/tape/drive/mod.rs
index ef5ffdbf..e8e60d19 100644
--- a/src/tape/drive/mod.rs
+++ b/src/tape/drive/mod.rs
@@ -30,19 +30,16 @@ use proxmox::{
 
 use pbs_api_types::{VirtualTapeDrive, LtoTapeDrive, Fingerprint};
 use pbs_config::key_config::KeyConfig;
-use pbs_datastore::task::TaskState;
-use pbs_datastore::task_log;
+use pbs_tools::{task_log, task::TaskState};
 
 use pbs_tape::{
     TapeWrite, TapeRead, BlockReadError, MediaContentHeader,
     sg_tape::TapeAlertFlags,
 };
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
-    server::{
-        send_load_media_email,
-        WorkerTask,
-    },
+    server::send_load_media_email,
     tape::{
         MediaId,
         drive::{
diff --git a/src/tape/pool_writer/mod.rs b/src/tape/pool_writer/mod.rs
index 8042de9e..2984173f 100644
--- a/src/tape/pool_writer/mod.rs
+++ b/src/tape/pool_writer/mod.rs
@@ -13,16 +13,16 @@ use anyhow::{bail, Error};
 
 use proxmox::tools::Uuid;
 
-use pbs_datastore::task_log;
+use pbs_tools::task_log;
 use pbs_config::tape_encryption_keys::load_key_configs;
 use pbs_tape::{
     TapeWrite,
     sg_tape::tape_alert_flags_critical,
 };
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
     backup::{DataStore, SnapshotReader},
-    server::WorkerTask,
     tape::{
         TAPE_STATUS_DIR,
         MAX_CHUNK_ARCHIVE_SIZE,
-- 
2.30.2





^ permalink raw reply	[flat|nested] 8+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 5/6] proxmox-daily-update: setup worker and command socket
  2021-09-23 10:13 [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid Dietmar Maurer
                   ` (2 preceding siblings ...)
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 4/6] move worker_task.rs into proxmox-rest-server crate Dietmar Maurer
@ 2021-09-23 10:13 ` Dietmar Maurer
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 6/6] proxmox-backup-manager: " Dietmar Maurer
  2021-09-23 13:20 ` [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid Fabian Grünbichler
  5 siblings, 0 replies; 8+ messages in thread
From: Dietmar Maurer @ 2021-09-23 10:13 UTC (permalink / raw)
  To: pbs-devel

---
 src/bin/proxmox-daily-update.rs | 21 +++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/src/bin/proxmox-daily-update.rs b/src/bin/proxmox-daily-update.rs
index 09a768b1..2a04a559 100644
--- a/src/bin/proxmox-daily-update.rs
+++ b/src/bin/proxmox-daily-update.rs
@@ -1,7 +1,8 @@
 use anyhow::Error;
-use serde_json::{json, Value};
+use serde_json::json;
 
 use proxmox::api::{cli::*, RpcEnvironment, ApiHandler};
+use proxmox::tools::fs::CreateOptions;
 
 use proxmox_backup::api2;
 use proxmox_backup::tools::subscription;
@@ -22,7 +23,7 @@ async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> {
 /// Daily update
 async fn do_update(
     rpcenv: &mut dyn RpcEnvironment,
-) -> Result<Value, Error> {
+) -> Result<(), Error> {
     let param = json!({});
 
     let method = &api2::node::subscription::API_METHOD_CHECK_SUBSCRIPTION;
@@ -59,7 +60,7 @@ async fn do_update(
 
     // TODO: cleanup tasks like in PVE?
 
-    Ok(Value::Null)
+    Ok(())
 }
 
 async fn check_acme_certificates(rpcenv: &mut dyn RpcEnvironment) -> Result<(), Error> {
@@ -85,13 +86,25 @@ async fn check_acme_certificates(rpcenv: &mut dyn RpcEnvironment) -> Result<(),
     Ok(())
 }
 
+async fn run(rpcenv: &mut dyn RpcEnvironment) -> Result<(), Error> {
+    let backup_user = pbs_config::backup_user()?;
+    let file_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
+    proxmox_rest_server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
+
+    let mut commando_sock = proxmox_rest_server::CommandoSocket::new(proxmox_rest_server::our_ctrl_sock(), backup_user.gid);
+    proxmox_rest_server::register_task_control_commands(&mut commando_sock)?;
+    commando_sock.spawn()?;
+
+    do_update(rpcenv).await
+}
+
 fn main() {
     proxmox_backup::tools::setup_safe_path_env();
 
     let mut rpcenv = CliEnvironment::new();
     rpcenv.set_auth_id(Some(String::from("root@pam")));
 
-    if let Err(err) = pbs_runtime::main(do_update(&mut rpcenv)) {
+    if let Err(err) = pbs_runtime::main(run(&mut rpcenv)) {
         eprintln!("error during update: {}", err);
         std::process::exit(1);
     }
-- 
2.30.2





^ permalink raw reply	[flat|nested] 8+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 6/6] proxmox-backup-manager: setup worker and command socket
  2021-09-23 10:13 [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid Dietmar Maurer
                   ` (3 preceding siblings ...)
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 5/6] proxmox-daily-update: setup worker and command socket Dietmar Maurer
@ 2021-09-23 10:13 ` Dietmar Maurer
  2021-09-23 13:20 ` [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid Fabian Grünbichler
  5 siblings, 0 replies; 8+ messages in thread
From: Dietmar Maurer @ 2021-09-23 10:13 UTC (permalink / raw)
  To: pbs-devel

---
 src/bin/proxmox-backup-manager.rs | 22 ++++++++++++++++++----
 1 file changed, 18 insertions(+), 4 deletions(-)

diff --git a/src/bin/proxmox-backup-manager.rs b/src/bin/proxmox-backup-manager.rs
index b9e4e2ff..a03e1db8 100644
--- a/src/bin/proxmox-backup-manager.rs
+++ b/src/bin/proxmox-backup-manager.rs
@@ -5,6 +5,7 @@ use anyhow::{format_err, Error};
 use serde_json::{json, Value};
 
 use proxmox::api::{api, cli::*, RpcEnvironment};
+use proxmox::tools::fs::CreateOptions;
 
 use pbs_client::{connect_to_localhost, display_task_log, view_task_result};
 use pbs_tools::percent_encoding::percent_encode_component;
@@ -359,9 +360,7 @@ async fn get_versions(verbose: bool, param: Value) -> Result<Value, Error> {
     Ok(Value::Null)
 }
 
-fn main() {
-
-    proxmox_backup::tools::setup_safe_path_env();
+async fn run() -> Result<(), Error> {
 
     let cmd_def = CliCommandMap::new()
         .insert("acl", acl_commands())
@@ -401,12 +400,27 @@ fn main() {
             CliCommand::new(&API_METHOD_GET_VERSIONS)
         );
 
+    let backup_user = pbs_config::backup_user()?;
+    let file_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
+    proxmox_rest_server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
 
+    let mut commando_sock = proxmox_rest_server::CommandoSocket::new(proxmox_rest_server::our_ctrl_sock(), backup_user.gid);
+    proxmox_rest_server::register_task_control_commands(&mut commando_sock)?;
+    commando_sock.spawn()?;
 
     let mut rpcenv = CliEnvironment::new();
     rpcenv.set_auth_id(Some(String::from("root@pam")));
 
-   pbs_runtime::main(run_async_cli_command(cmd_def, rpcenv));
+    run_async_cli_command(cmd_def, rpcenv).await; // this call exit(-1) on error
+
+    Ok(())
+}
+
+fn main() -> Result<(), Error> {
+
+    proxmox_backup::tools::setup_safe_path_env();
+
+    pbs_runtime::main(run())
 }
 
 // shell completion helper
-- 
2.30.2





^ permalink raw reply	[flat|nested] 8+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup 2/6] worker task: allow to configure path and owner/group
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 2/6] worker task: allow to configure path and owner/group Dietmar Maurer
@ 2021-09-23 11:36   ` Fabian Grünbichler
  0 siblings, 0 replies; 8+ messages in thread
From: Fabian Grünbichler @ 2021-09-23 11:36 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion

On September 23, 2021 12:13 pm, Dietmar Maurer wrote:
> An application now needs to call init_worker_tasks() before using
> worker tasks.
> 
> Notable changes:
> - need to call init_worker_tasks() before using worker tasks.
> - create_task_log_dirs() is called inside init_worker_tasks()
> - removed UpidExt trait
> - use atomic_open_or_create_file()
> - remove pbs_config and pbs_buildcfg dependency
> ---
>  src/api2/node/tasks.rs          |   6 +-
>  src/bin/proxmox-backup-api.rs   |   7 +-
>  src/bin/proxmox-backup-proxy.rs |   5 +-
>  src/server/mod.rs               |   3 -
>  src/server/upid.rs              |  18 --
>  src/server/worker_task.rs       | 475 +++++++++++++++++++-------------
>  6 files changed, 290 insertions(+), 224 deletions(-)
>  delete mode 100644 src/server/upid.rs
> 
> diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
> index 169a3d4d..df4673a1 100644
> --- a/src/api2/node/tasks.rs
> +++ b/src/api2/node/tasks.rs
> @@ -16,7 +16,7 @@ use pbs_api_types::{
>  };
>  
>  use crate::api2::pull::check_pull_privs;
> -use crate::server::{self, UPIDExt, TaskState, TaskListInfoIterator};
> +use crate::server::{self, upid_log_path, upid_read_status, TaskState, TaskListInfoIterator};
>  use pbs_config::CachedUserInfo;
>  
>  // matches respective job execution privileges
> @@ -220,7 +220,7 @@ async fn get_task_status(
>      if crate::server::worker_is_active(&upid).await? {
>          result["status"] = Value::from("running");
>      } else {
> -        let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
> +        let exitstatus = upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
>          result["status"] = Value::from("stopped");
>          result["exitstatus"] = Value::from(exitstatus.to_string());
>      };
> @@ -287,7 +287,7 @@ async fn read_task_log(
>  
>      let mut count: u64 = 0;
>  
> -    let path = upid.log_path();
> +    let path = upid_log_path(&upid)?;
>  
>      let file = File::open(path)?;
>  
> diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
> index 9ad10260..9901b85d 100644
> --- a/src/bin/proxmox-backup-api.rs
> +++ b/src/bin/proxmox-backup-api.rs
> @@ -54,8 +54,6 @@ async fn run() -> Result<(), Error> {
>          bail!("unable to inititialize syslog - {}", err);
>      }
>  
> -    server::create_task_log_dirs()?;
> -
>      config::create_configdir()?;
>  
>      config::update_self_signed_cert(false)?;
> @@ -102,13 +100,14 @@ async fn run() -> Result<(), Error> {
>  
>      config.enable_auth_log(
>          pbs_buildcfg::API_AUTH_LOG_FN,
> -        Some(dir_opts),
> -        Some(file_opts),
> +        Some(dir_opts.clone()),
> +        Some(file_opts.clone()),
>          &mut commando_sock,
>      )?;
>  
>  
>      let rest_server = RestServer::new(config);
> +    proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
>  
>      // http server future:
>      let server = daemon::create_daemon(
> diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
> index 518054bf..5d8ed189 100644
> --- a/src/bin/proxmox-backup-proxy.rs
> +++ b/src/bin/proxmox-backup-proxy.rs
> @@ -202,12 +202,13 @@ async fn run() -> Result<(), Error> {
>  
>      config.enable_auth_log(
>          pbs_buildcfg::API_AUTH_LOG_FN,
> -        Some(dir_opts),
> -        Some(file_opts),
> +        Some(dir_opts.clone()),
> +        Some(file_opts.clone()),
>          &mut commando_sock,
>      )?;
>  
>      let rest_server = RestServer::new(config);
> +    proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
>  
>      //openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes
>  
> diff --git a/src/server/mod.rs b/src/server/mod.rs
> index a7dcee67..77320da6 100644
> --- a/src/server/mod.rs
> +++ b/src/server/mod.rs
> @@ -46,9 +46,6 @@ pub fn our_ctrl_sock() -> String {
>      ctrl_sock_from_pid(*PID)
>  }
>  
> -mod upid;
> -pub use upid::*;
> -
>  mod worker_task;
>  pub use worker_task::*;
>  
> diff --git a/src/server/upid.rs b/src/server/upid.rs
> deleted file mode 100644
> index 70a3e3fb..00000000
> --- a/src/server/upid.rs
> +++ /dev/null
> @@ -1,18 +0,0 @@
> -pub trait UPIDExt: private::Sealed {
> -    /// Returns the absolute path to the task log file
> -    fn log_path(&self) -> std::path::PathBuf;
> -}
> -
> -mod private {
> -    pub trait Sealed {}
> -    impl Sealed for  pbs_api_types::UPID {}
> -}
> -
> -impl UPIDExt for  pbs_api_types::UPID {
> -    fn log_path(&self) -> std::path::PathBuf {
> -        let mut path = std::path::PathBuf::from(super::PROXMOX_BACKUP_TASK_DIR);
> -        path.push(format!("{:02X}", self.pstart % 256));
> -        path.push(self.to_string());
> -        path
> -    }
> -}
> diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
> index 92ab50d7..191d8a44 100644
> --- a/src/server/worker_task.rs
> +++ b/src/server/worker_task.rs
> @@ -1,5 +1,6 @@
>  use std::collections::{HashMap, VecDeque};
>  use std::fs::File;
> +use std::path::PathBuf;
>  use std::io::{Read, Write, BufRead, BufReader};
>  use std::panic::UnwindSafe;
>  use std::sync::atomic::{AtomicBool, Ordering};
> @@ -11,27 +12,267 @@ use lazy_static::lazy_static;
>  use serde_json::{json, Value};
>  use serde::{Serialize, Deserialize};
>  use tokio::sync::oneshot;
> +use nix::fcntl::OFlag;
> +use once_cell::sync::OnceCell;
>  
>  use proxmox::sys::linux::procfs;
>  use proxmox::try_block;
> -use proxmox::tools::fs::{create_path, replace_file, CreateOptions};
> +use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
>  
> -use pbs_buildcfg;
>  use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
>  use pbs_api_types::UPID;
> -use pbs_config::{open_backup_lockfile, BackupLockGuard};
>  use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
>  
> -use super::UPIDExt;
> +struct TaskListLockGuard(File);
>  
> -macro_rules! taskdir {
> -    ($subdir:expr) => (concat!(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks", $subdir))
> +struct WorkerTaskSetup {
> +    file_opts: CreateOptions,
> +    taskdir: PathBuf,
> +    task_lock_fn: PathBuf,
> +    active_tasks_fn: PathBuf,
> +    task_index_fn: PathBuf,
> +    task_archive_fn: PathBuf,
> +}
> +
> +static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new();
> +
> +fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> {
> +    WORKER_TASK_SETUP.get()
> +        .ok_or_else(|| format_err!("WorkerTask library is not initialized"))
> +}
> +
> +impl WorkerTaskSetup {
> +
> +    fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self {
> +
> +        let mut taskdir = basedir.clone();
> +        taskdir.push("tasks");
> +
> +        let mut task_lock_fn = taskdir.clone();
> +        task_lock_fn.push(".active.lock");
> +
> +        let mut active_tasks_fn = taskdir.clone();
> +        active_tasks_fn.push("active");
> +
> +        let mut task_index_fn = taskdir.clone();
> +        task_index_fn.push("index");
> +
> +        let mut task_archive_fn = taskdir.clone();
> +        task_archive_fn.push("archive");
> +
> +        Self {
> +            file_opts,
> +            taskdir,
> +            task_lock_fn,
> +            active_tasks_fn,
> +            task_index_fn,
> +            task_archive_fn,
> +        }
> +    }
> +
> +    fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> {

since we touch/move this (and thus have to touch all call sites), we 
could take this opportunity to move the locked operations including 
access to the active_tasks_fn, task_index_fn and task_archive_fn struct 
members to the lock guard (and maybe split it into 
exclusive/non-exclusive guards?) to make misuse impossible? AFAICT all 
the current access is done while holding the lock.

can of course also be done as follow-up in some generic fashion since 
this is a recurring problem, just struck me while reading through the 
rest of the changes.

> +        let options =  self.file_opts.clone()
> +            .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
> +
> +        let timeout = std::time::Duration::new(10, 0);
> +
> +        let file = proxmox::tools::fs::open_file_locked(
> +            &self.task_lock_fn,
> +            timeout,
> +            exclusive,
> +            options,
> +        )?;
> +
> +        Ok(TaskListLockGuard(file))
> +    }
> +
> +    fn log_path(&self, upid: &UPID) -> std::path::PathBuf {
> +        let mut path = self.taskdir.clone();
> +        path.push(format!("{:02X}", upid.pstart % 256));
> +        path.push(upid.to_string());
> +        path
> +    }
> +
> +    // atomically read/update the task list, update status of finished tasks
> +    // new_upid is added to the list when specified.
> +    fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> {
> +
> +        let lock = self.lock_task_list_files(true)?;
> +
> +        // TODO remove with 1.x
> +        let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(&self.task_index_fn)?;
> +        let had_index_file = !finish_list.is_empty();
> +
> +        // We use filter_map because one negative case wants to *move* the data into `finish_list`,
> +        // clippy doesn't quite catch this!
> +        #[allow(clippy::unnecessary_filter_map)]
> +        let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(&self.active_tasks_fn)?
> +            .into_iter()
> +            .filter_map(|info| {
> +                if info.state.is_some() {
> +                    // this can happen when the active file still includes finished tasks
> +                    finish_list.push(info);
> +                    return None;
> +                }
> +
> +                if !worker_is_active_local(&info.upid) {
> +                    // println!("Detected stopped task '{}'", &info.upid_str);
> +                    let now = proxmox::tools::time::epoch_i64();
> +                    let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
> +                    finish_list.push(TaskListInfo {
> +                        upid: info.upid,
> +                        upid_str: info.upid_str,
> +                        state: Some(status)
> +                    });
> +                    return None;
> +                }
> +
> +                Some(info)
> +            }).collect();
> +
> +        if let Some(upid) = new_upid {
> +            active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
> +        }
> +
> +        let active_raw = render_task_list(&active_list);
> +
> +        let options =  self.file_opts.clone()
> +            .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
> +
> +        replace_file(
> +            &self.active_tasks_fn,
> +            active_raw.as_bytes(),
> +            options,
> +        )?;
> +
> +        finish_list.sort_unstable_by(|a, b| {
> +            match (&a.state, &b.state) {
> +                (Some(s1), Some(s2)) => s1.cmp(&s2),
> +                (Some(_), None) => std::cmp::Ordering::Less,
> +                (None, Some(_)) => std::cmp::Ordering::Greater,
> +                _ => a.upid.starttime.cmp(&b.upid.starttime),
> +            }
> +        });
> +
> +        if !finish_list.is_empty() {
> +            let options =  self.file_opts.clone()
> +                .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
> +
> +            let mut writer = atomic_open_or_create_file(
> +                &self.task_archive_fn,
> +                OFlag::O_APPEND | OFlag::O_RDWR,
> +                &[],
> +                options,
> +            )?;
> +            for info in &finish_list {
> +                writer.write_all(render_task_line(&info).as_bytes())?;
> +            }
> +        }
> +
> +        // TODO Remove with 1.x
> +        // for compatibility, if we had an INDEX file, we do not need it anymore
> +        if had_index_file {
> +            let _ = nix::unistd::unlink(&self.task_index_fn);
> +        }
> +
> +        drop(lock);
> +
> +        Ok(())
> +    }
> +
> +    // Create task log directory with correct permissions
> +    fn create_task_log_dirs(&self) -> Result<(), Error> {
> +
> +        try_block!({
> +            let dir_opts = self.file_opts.clone()
> +                .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
> +
> +            create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts.clone()))?;
> +            // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
> +            Ok(())
> +        }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))
> +    }
> +}
> +
> +/// Initialize the WorkerTask library
> +pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> {
> +    let setup = WorkerTaskSetup::new(basedir, file_opts);
> +    setup.create_task_log_dirs()?;
> +    WORKER_TASK_SETUP.set(setup)
> +        .map_err(|_| format_err!("init_worker_tasks failed - already initialized"))
> +}
> +
> +/// checks if the Task Archive is bigger that 'size_threshold' bytes, and
> +/// rotates it if it is
> +pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
> +
> +    let setup = worker_task_setup()?;
> +
> +    let _lock = setup.lock_task_list_files(true)?;
> +
> +    let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress)
> +            .ok_or_else(|| format_err!("could not get archive file names"))?;
> +
> +    logrotate.rotate(size_threshold, None, max_files)
> +}
> +
> +
> +/// Path to the worker log file
> +pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
> +    let setup = worker_task_setup()?;
> +    Ok(setup.log_path(upid))
> +}
> +
> +/// Read endtime (time of last log line) and exitstatus from task log file
> +/// If there is not a single line with at valid datetime, we assume the
> +/// starttime to be the endtime
> +pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
> +
> +    let setup = worker_task_setup()?;
> +
> +    let mut status = TaskState::Unknown { endtime: upid.starttime };
> +
> +    let path = setup.log_path(upid);
> +
> +    let mut file = File::open(path)?;
> +
> +    /// speedup - only read tail
> +    use std::io::Seek;
> +    use std::io::SeekFrom;
> +    let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
> +
> +    let mut data = Vec::with_capacity(8192);
> +    file.read_to_end(&mut data)?;
> +
> +    // strip newlines at the end of the task logs
> +    while data.last() == Some(&b'\n') {
> +        data.pop();
> +    }
> +
> +    let last_line = match data.iter().rposition(|c| *c == b'\n') {
> +        Some(start) if data.len() > (start+1) => &data[start+1..],
> +        Some(_) => &data, // should not happen, since we removed all trailing newlines
> +        None => &data,
> +    };
> +
> +    let last_line = std::str::from_utf8(last_line)
> +        .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
> +
> +    let mut iter = last_line.splitn(2, ": ");
> +    if let Some(time_str) = iter.next() {
> +        if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
> +            // set the endtime even if we cannot parse the state
> +            status = TaskState::Unknown { endtime };
> +            if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
> +                if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
> +                    status = state;
> +                }
> +            }
> +        }
> +    }
> +
> +    Ok(status)
>  }
> -pub const PROXMOX_BACKUP_TASK_DIR: &str = taskdir!("/");
> -pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = taskdir!("/.active.lock");
> -pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = taskdir!("/active");
> -pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = taskdir!("/index");
> -pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = taskdir!("/archive");
>  
>  lazy_static! {
>      static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
> @@ -152,73 +393,6 @@ fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskStat
>      }
>  }
>  
> -/// Create task log directory with correct permissions
> -pub fn create_task_log_dirs() -> Result<(), Error> {
> -
> -    try_block!({
> -        let backup_user = pbs_config::backup_user()?;
> -        let opts = CreateOptions::new()
> -            .owner(backup_user.uid)
> -            .group(backup_user.gid);
> -
> -        create_path(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?;
> -        create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?;
> -        create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
> -        Ok(())
> -    }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?;
> -
> -    Ok(())
> -}
> -
> -/// Read endtime (time of last log line) and exitstatus from task log file
> -/// If there is not a single line with at valid datetime, we assume the
> -/// starttime to be the endtime
> -pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
> -
> -    let mut status = TaskState::Unknown { endtime: upid.starttime };
> -
> -    let path = upid.log_path();
> -
> -    let mut file = File::open(path)?;
> -
> -    /// speedup - only read tail
> -    use std::io::Seek;
> -    use std::io::SeekFrom;
> -    let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
> -
> -    let mut data = Vec::with_capacity(8192);
> -    file.read_to_end(&mut data)?;
> -
> -    // strip newlines at the end of the task logs
> -    while data.last() == Some(&b'\n') {
> -        data.pop();
> -    }
> -
> -    let last_line = match data.iter().rposition(|c| *c == b'\n') {
> -        Some(start) if data.len() > (start+1) => &data[start+1..],
> -        Some(_) => &data, // should not happen, since we removed all trailing newlines
> -        None => &data,
> -    };
> -
> -    let last_line = std::str::from_utf8(last_line)
> -        .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
> -
> -    let mut iter = last_line.splitn(2, ": ");
> -    if let Some(time_str) = iter.next() {
> -        if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
> -            // set the endtime even if we cannot parse the state
> -            status = TaskState::Unknown { endtime };
> -            if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
> -                if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
> -                    status = state;
> -                }
> -            }
> -        }
> -    }
> -
> -    Ok(status)
> -}
> -
>  /// Task State
>  #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
>  pub enum TaskState {
> @@ -323,107 +497,6 @@ impl Into<pbs_api_types::TaskListItem> for TaskListInfo {
>      }
>  }
>  
> -fn lock_task_list_files(exclusive: bool) -> Result<BackupLockGuard, Error> {
> -    open_backup_lockfile(PROXMOX_BACKUP_TASK_LOCK_FN, None, exclusive)
> -}
> -
> -/// checks if the Task Archive is bigger that 'size_threshold' bytes, and
> -/// rotates it if it is
> -pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
> -    let _lock = lock_task_list_files(true)?;
> -
> -    let mut logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, compress)
> -        .ok_or_else(|| format_err!("could not get archive file names"))?;
> -
> -    logrotate.rotate(size_threshold, None, max_files)
> -}
> -
> -// atomically read/update the task list, update status of finished tasks
> -// new_upid is added to the list when specified.
> -fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {
> -
> -    let backup_user = pbs_config::backup_user()?;
> -
> -    let lock = lock_task_list_files(true)?;
> -
> -    // TODO remove with 1.x
> -    let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?;
> -    let had_index_file = !finish_list.is_empty();
> -
> -    // We use filter_map because one negative case wants to *move* the data into `finish_list`,
> -    // clippy doesn't quite catch this!
> -    #[allow(clippy::unnecessary_filter_map)]
> -    let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?
> -        .into_iter()
> -        .filter_map(|info| {
> -            if info.state.is_some() {
> -                // this can happen when the active file still includes finished tasks
> -                finish_list.push(info);
> -                return None;
> -            }
> -
> -            if !worker_is_active_local(&info.upid) {
> -                // println!("Detected stopped task '{}'", &info.upid_str);
> -                let now = proxmox::tools::time::epoch_i64();
> -                let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
> -                finish_list.push(TaskListInfo {
> -                    upid: info.upid,
> -                    upid_str: info.upid_str,
> -                    state: Some(status)
> -                });
> -                return None;
> -            }
> -
> -            Some(info)
> -        }).collect();
> -
> -    if let Some(upid) = new_upid {
> -        active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
> -    }
> -
> -    let active_raw = render_task_list(&active_list);
> -
> -    replace_file(
> -        PROXMOX_BACKUP_ACTIVE_TASK_FN,
> -        active_raw.as_bytes(),
> -        CreateOptions::new()
> -            .owner(backup_user.uid)
> -            .group(backup_user.gid),
> -    )?;
> -
> -    finish_list.sort_unstable_by(|a, b| {
> -        match (&a.state, &b.state) {
> -            (Some(s1), Some(s2)) => s1.cmp(&s2),
> -            (Some(_), None) => std::cmp::Ordering::Less,
> -            (None, Some(_)) => std::cmp::Ordering::Greater,
> -            _ => a.upid.starttime.cmp(&b.upid.starttime),
> -        }
> -    });
> -
> -    if !finish_list.is_empty() {
> -        match std::fs::OpenOptions::new().append(true).create(true).open(PROXMOX_BACKUP_ARCHIVE_TASK_FN) {
> -            Ok(mut writer) => {
> -                for info in &finish_list {
> -                    writer.write_all(render_task_line(&info).as_bytes())?;
> -                }
> -            },
> -            Err(err) => bail!("could not write task archive - {}", err),
> -        }
> -
> -        nix::unistd::chown(PROXMOX_BACKUP_ARCHIVE_TASK_FN, Some(backup_user.uid), Some(backup_user.gid))?;
> -    }
> -
> -    // TODO Remove with 1.x
> -    // for compatibility, if we had an INDEX file, we do not need it anymore
> -    if had_index_file {
> -        let _ = nix::unistd::unlink(PROXMOX_BACKUP_INDEX_TASK_FN);
> -    }
> -
> -    drop(lock);
> -
> -    Ok(())
> -}
> -
>  fn render_task_line(info: &TaskListInfo) -> String {
>      let mut raw = String::new();
>      if let Some(status) = &info.state {
> @@ -486,27 +559,30 @@ pub struct TaskListInfoIterator {
>      list: VecDeque<TaskListInfo>,
>      end: bool,
>      archive: Option<LogRotateFiles>,
> -    lock: Option<BackupLockGuard>,
> +    lock: Option<TaskListLockGuard>,
>  }
>  
>  impl TaskListInfoIterator {
>      pub fn new(active_only: bool) -> Result<Self, Error> {
> +
> +        let setup = worker_task_setup()?;
> +
>          let (read_lock, active_list) = {
> -            let lock = lock_task_list_files(false)?;
> -            let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
> +            let lock = setup.lock_task_list_files(false)?;
> +            let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
>  
>              let needs_update = active_list
>                  .iter()
>                  .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));
>  
>              // TODO remove with 1.x
> -            let index_exists = std::path::Path::new(PROXMOX_BACKUP_INDEX_TASK_FN).is_file();
> +            let index_exists = setup.task_index_fn.is_file();
>  
>              if needs_update || index_exists {
>                  drop(lock);
> -                update_active_workers(None)?;
> -                let lock = lock_task_list_files(false)?;
> -                let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
> +                setup.update_active_workers(None)?;
> +                let lock = setup.lock_task_list_files(false)?;
> +                let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
>                  (lock, active_list)
>              } else {
>                  (lock, active_list)
> @@ -516,7 +592,7 @@ impl TaskListInfoIterator {
>          let archive = if active_only {
>              None
>          } else {
> -            let logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, true)
> +            let logrotate = LogRotate::new(&setup.task_archive_fn, true)
>                  .ok_or_else(|| format_err!("could not get archive file names"))?;
>              Some(logrotate.files())
>          };
> @@ -568,6 +644,7 @@ impl Iterator for TaskListInfoIterator {
>  /// persistently to files. Task should poll the `abort_requested`
>  /// flag, and stop execution when requested.
>  pub struct WorkerTask {
> +    setup: &'static WorkerTaskSetup,
>      upid: UPID,
>      data: Mutex<WorkerTaskData>,
>      abort_requested: AtomicBool,
> @@ -589,17 +666,26 @@ struct WorkerTaskData {
>  
>  impl WorkerTask {
>  
> -    pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: String, to_stdout: bool) -> Result<Arc<Self>, Error> {
> +    pub fn new(
> +        worker_type: &str,
> +        worker_id: Option<String>,
> +        auth_id: String,
> +        to_stdout: bool,
> +    ) -> Result<Arc<Self>, Error> {
> +
> +        let setup = worker_task_setup()?;
> +
>          let upid = UPID::new(worker_type, worker_id, auth_id)?;
>          let task_id = upid.task_id;
>  
> -        let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR);
> +        let mut path = setup.taskdir.clone();
>  
>          path.push(format!("{:02X}", upid.pstart & 255));
>  
> -        let backup_user = pbs_config::backup_user()?;
> +        let dir_opts = setup.file_opts.clone()
> +            .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
>  
> -        create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?;
> +        create_path(&path, None, Some(dir_opts))?;
>  
>          path.push(upid.to_string());
>  
> @@ -608,12 +694,13 @@ impl WorkerTask {
>              exclusive: true,
>              prefix_time: true,
>              read: true,
> +            file_opts: setup.file_opts.clone(),
>              ..Default::default()
>          };
>          let logger = FileLogger::new(&path, logger_options)?;
> -        nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?;
>  
>          let worker = Arc::new(Self {
> +            setup,
>              upid: upid.clone(),
>              abort_requested: AtomicBool::new(false),
>              data: Mutex::new(WorkerTaskData {
> @@ -631,7 +718,7 @@ impl WorkerTask {
>              proxmox_rest_server::set_worker_count(hash.len());
>          }
>  
> -        update_active_workers(Some(&upid))?;
> +        setup.update_active_workers(Some(&upid))?;
>  
>          Ok(worker)
>      }
> @@ -714,7 +801,7 @@ impl WorkerTask {
>          self.log(state.result_text());
>  
>          WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
> -        let _ = update_active_workers(None);
> +        let _ = self.setup.update_active_workers(None);
>          proxmox_rest_server::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
>      }
>  
> -- 
> 2.30.2
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel@lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 




^ permalink raw reply	[flat|nested] 8+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid
  2021-09-23 10:13 [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid Dietmar Maurer
                   ` (4 preceding siblings ...)
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 6/6] proxmox-backup-manager: " Dietmar Maurer
@ 2021-09-23 13:20 ` Fabian Grünbichler
  5 siblings, 0 replies; 8+ messages in thread
From: Fabian Grünbichler @ 2021-09-23 13:20 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion

On September 23, 2021 12:13 pm, Dietmar Maurer wrote:
> Because we want to move worker_task.rs into proxmox-rest-server crate.

as discussed off-list, we could use a 

struct AuthidStr(String)

or a trait defined in proxmox instead of a regular String, to avoid 
accidentally passing in other, non-authid strings. all Authid 
implementations would then need to implement Into<AuthidStr> (or the 
trait), and the calls below to to_string() would become into() (or, 
with the trait, remain as they were).

this can also be done as a follow-up with a proxmox bump, if we want to 
discuss it more in depth but still go ahead with the split/move now.

> ---
>  pbs-api-types/src/upid.rs        | 13 +++++++------
>  src/api2/admin/datastore.rs      |  6 +++---
>  src/api2/backup/environment.rs   |  2 +-
>  src/api2/backup/mod.rs           |  2 +-
>  src/api2/config/acme.rs          |  6 +++---
>  src/api2/config/datastore.rs     |  4 ++--
>  src/api2/node/apt.rs             |  4 ++--
>  src/api2/node/certificates.rs    |  6 +++---
>  src/api2/node/disks/directory.rs |  4 ++--
>  src/api2/node/disks/mod.rs       |  4 ++--
>  src/api2/node/disks/zfs.rs       |  4 ++--
>  src/api2/node/mod.rs             |  2 +-
>  src/api2/node/network.rs         |  2 +-
>  src/api2/node/services.rs        |  2 +-
>  src/api2/node/tasks.rs           | 15 +++++++++------
>  src/api2/pull.rs                 |  4 ++--
>  src/api2/reader/mod.rs           |  2 +-
>  src/api2/tape/backup.rs          |  4 ++--
>  src/api2/tape/drive.rs           |  2 +-
>  src/api2/tape/restore.rs         |  2 +-
>  src/bin/proxmox-backup-proxy.rs  |  2 +-
>  src/server/gc_job.rs             |  2 +-
>  src/server/prune_job.rs          |  2 +-
>  src/server/verify_job.rs         |  2 +-
>  src/server/worker_task.rs        |  8 ++++----
>  25 files changed, 55 insertions(+), 51 deletions(-)
> 
> diff --git a/pbs-api-types/src/upid.rs b/pbs-api-types/src/upid.rs
> index ba23a646..29135bca 100644
> --- a/pbs-api-types/src/upid.rs
> +++ b/pbs-api-types/src/upid.rs
> @@ -8,8 +8,6 @@ use proxmox::api::schema::{ApiStringFormat, ApiType, Schema, StringSchema, Array
>  use proxmox::const_regex;
>  use proxmox::sys::linux::procfs;
>  
> -use crate::Authid;
> -
>  /// Unique Process/Task Identifier
>  ///
>  /// We use this to uniquely identify worker task. UPIDs have a short
> @@ -37,7 +35,7 @@ pub struct UPID {
>      /// Worker ID (arbitrary ASCII string)
>      pub worker_id: Option<String>,
>      /// The authenticated entity who started the task
> -    pub auth_id: Authid,
> +    pub auth_id: String,
>      /// The node name.
>      pub node: String,
>  }
> @@ -71,7 +69,7 @@ impl UPID {
>      pub fn new(
>          worker_type: &str,
>          worker_id: Option<String>,
> -        auth_id: Authid,
> +        auth_id: String,
>      ) -> Result<Self, Error> {
>  
>          let pid = unsafe { libc::getpid() };
> @@ -82,6 +80,10 @@ impl UPID {
>              bail!("illegal characters in worker type '{}'", worker_type);
>          }
>  
> +        if auth_id.contains(bad) {
> +            bail!("illegal characters in auth_id '{}'", auth_id);
> +        }
> +
>          static WORKER_TASK_NEXT_ID: AtomicUsize = AtomicUsize::new(0);
>  
>          let task_id = WORKER_TASK_NEXT_ID.fetch_add(1, Ordering::SeqCst);
> @@ -184,7 +186,7 @@ pub struct TaskListItem {
>      /// Worker ID (arbitrary ASCII string)
>      pub worker_id: Option<String>,
>      /// The authenticated entity who started the task
> -    pub user: Authid,
> +    pub user: String,
>      /// The task end time (Epoch)
>      #[serde(skip_serializing_if="Option::is_none")]
>      pub endtime: Option<i64>,
> @@ -200,4 +202,3 @@ pub const NODE_TASKS_LIST_TASKS_RETURN_TYPE: ReturnType = ReturnType {
>          &TaskListItem::API_SCHEMA,
>      ).schema(),
>  };
> -
> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
> index 0b14dfbf..fbb93f35 100644
> --- a/src/api2/admin/datastore.rs
> +++ b/src/api2/admin/datastore.rs
> @@ -722,7 +722,7 @@ pub fn verify(
>      let upid_str = WorkerTask::new_thread(
>          worker_type,
>          Some(worker_id),
> -        auth_id.clone(),
> +        auth_id.to_string(),
>          to_stdout,
>          move |worker| {
>              let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
> @@ -862,7 +862,7 @@ pub fn prune(
>  
>  
>      // We use a WorkerTask just to have a task log, but run synchrounously
> -    let worker = WorkerTask::new("prune", Some(worker_id), auth_id, true)?;
> +    let worker = WorkerTask::new("prune", Some(worker_id), auth_id.to_string(), true)?;
>  
>      if keep_all {
>          worker.log("No prune selection - keeping all files.");
> @@ -957,7 +957,7 @@ pub fn prune_datastore(
>      let upid_str = WorkerTask::new_thread(
>          "prune",
>          Some(store.clone()),
> -        auth_id.clone(),
> +        auth_id.to_string(),
>          to_stdout,
>          move |worker| crate::server::prune_datastore(
>              worker.clone(),
> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
> index 129ebd2b..306f91ee 100644
> --- a/src/api2/backup/environment.rs
> +++ b/src/api2/backup/environment.rs
> @@ -525,7 +525,7 @@ impl BackupEnvironment {
>          WorkerTask::new_thread(
>              "verify",
>              Some(worker_id),
> -            self.auth_id.clone(),
> +            self.auth_id.to_string(),
>              false,
>              move |worker| {
>                  worker.log("Automatically verifying newly added snapshot");
> diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
> index 8f51f314..c14f19a4 100644
> --- a/src/api2/backup/mod.rs
> +++ b/src/api2/backup/mod.rs
> @@ -166,7 +166,7 @@ async move {
>      if !is_new { bail!("backup directory already exists."); }
>  
>  
> -    WorkerTask::spawn(worker_type, Some(worker_id), auth_id.clone(), true, move |worker| {
> +    WorkerTask::spawn(worker_type, Some(worker_id), auth_id.to_string(), true, move |worker| {
>          let mut env = BackupEnvironment::new(
>              env_type, auth_id, worker.clone(), datastore, backup_dir);
>  
> diff --git a/src/api2/config/acme.rs b/src/api2/config/acme.rs
> index 593b79a3..564cafae 100644
> --- a/src/api2/config/acme.rs
> +++ b/src/api2/config/acme.rs
> @@ -215,7 +215,7 @@ fn register_account(
>      WorkerTask::spawn(
>          "acme-register",
>          Some(name.to_string()),
> -        auth_id,
> +        auth_id.to_string(),
>          true,
>          move |worker| async move {
>              let mut client = AcmeClient::new(directory);
> @@ -275,7 +275,7 @@ pub fn update_account(
>      WorkerTask::spawn(
>          "acme-update",
>          Some(name.to_string()),
> -        auth_id,
> +        auth_id.to_string(),
>          true,
>          move |_worker| async move {
>              let data = match contact {
> @@ -320,7 +320,7 @@ pub fn deactivate_account(
>      WorkerTask::spawn(
>          "acme-deactivate",
>          Some(name.to_string()),
> -        auth_id,
> +        auth_id.to_string(),
>          true,
>          move |worker| async move {
>              match AcmeClient::load(&name)
> diff --git a/src/api2/config/datastore.rs b/src/api2/config/datastore.rs
> index c6036fc3..0e6529f8 100644
> --- a/src/api2/config/datastore.rs
> +++ b/src/api2/config/datastore.rs
> @@ -119,9 +119,9 @@ pub fn create_datastore(
>      WorkerTask::new_thread(
>          "create-datastore",
>          Some(config.name.to_string()),
> -        auth_id,
> +        auth_id.to_string(),
>          to_stdout,
> -        move |worker| do_create_datastore(lock, section_config, config, Some(&worker)),
> +       move |worker| do_create_datastore(lock, section_config, config, Some(&worker)),
>      )
>  }
>  
> diff --git a/src/api2/node/apt.rs b/src/api2/node/apt.rs
> index 8f4bc691..f02920c0 100644
> --- a/src/api2/node/apt.rs
> +++ b/src/api2/node/apt.rs
> @@ -14,7 +14,7 @@ use proxmox_apt::repositories::{
>  use proxmox_http::ProxyConfig;
>  
>  use pbs_api_types::{
> -    Authid, APTUpdateInfo, NODE_SCHEMA, PROXMOX_CONFIG_DIGEST_SCHEMA, UPID_SCHEMA,
> +    APTUpdateInfo, NODE_SCHEMA, PROXMOX_CONFIG_DIGEST_SCHEMA, UPID_SCHEMA,
>      PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
>  };
>  
> @@ -154,7 +154,7 @@ pub fn apt_update_database(
>      rpcenv: &mut dyn RpcEnvironment,
>  ) -> Result<String, Error> {
>  
> -    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> +    let auth_id = rpcenv.get_auth_id().unwrap();
>      let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
>  
>      let upid_str = WorkerTask::new_thread("aptupdate", None, auth_id, to_stdout, move |worker| {
> diff --git a/src/api2/node/certificates.rs b/src/api2/node/certificates.rs
> index 82fa028d..7b31861e 100644
> --- a/src/api2/node/certificates.rs
> +++ b/src/api2/node/certificates.rs
> @@ -11,7 +11,7 @@ use proxmox::api::router::SubdirMap;
>  use proxmox::api::{api, Permission, Router, RpcEnvironment};
>  use proxmox::list_subdirs_api_method;
>  
> -use pbs_api_types::{Authid, NODE_SCHEMA, PRIV_SYS_MODIFY};
> +use pbs_api_types::{NODE_SCHEMA, PRIV_SYS_MODIFY};
>  use pbs_buildcfg::configdir;
>  use pbs_tools::cert;
>  
> @@ -530,7 +530,7 @@ fn spawn_certificate_worker(
>  
>      let (node_config, _digest) = crate::config::node::config()?;
>  
> -    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> +    let auth_id = rpcenv.get_auth_id().unwrap();
>  
>      WorkerTask::spawn(name, None, auth_id, true, move |worker| async move {
>          if let Some(cert) = order_certificate(worker, &node_config).await? {
> @@ -559,7 +559,7 @@ pub fn revoke_acme_cert(rpcenv: &mut dyn RpcEnvironment) -> Result<String, Error
>  
>      let cert_pem = get_certificate_pem()?;
>  
> -    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> +    let auth_id = rpcenv.get_auth_id().unwrap();
>  
>      WorkerTask::spawn(
>          "acme-revoke-cert",
> diff --git a/src/api2/node/disks/directory.rs b/src/api2/node/disks/directory.rs
> index 38809dcf..2f4a738d 100644
> --- a/src/api2/node/disks/directory.rs
> +++ b/src/api2/node/disks/directory.rs
> @@ -7,7 +7,7 @@ use proxmox::api::section_config::SectionConfigData;
>  use proxmox::api::router::Router;
>  
>  use pbs_api_types::{
> -    Authid, DataStoreConfig, NODE_SCHEMA, BLOCKDEVICE_NAME_SCHEMA,
> +    DataStoreConfig, NODE_SCHEMA, BLOCKDEVICE_NAME_SCHEMA,
>      DATASTORE_SCHEMA, UPID_SCHEMA, PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
>  };
>  
> @@ -146,7 +146,7 @@ pub fn create_datastore_disk(
>  
>      let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
>  
> -    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> +    let auth_id = rpcenv.get_auth_id().unwrap();
>  
>      let info = get_disk_usage_info(&disk, true)?;
>  
> diff --git a/src/api2/node/disks/mod.rs b/src/api2/node/disks/mod.rs
> index 67f8f63a..b4001a54 100644
> --- a/src/api2/node/disks/mod.rs
> +++ b/src/api2/node/disks/mod.rs
> @@ -7,7 +7,7 @@ use proxmox::{sortable, identity};
>  use proxmox::{list_subdirs_api_method};
>  
>  use pbs_api_types::{
> -    Authid, UPID_SCHEMA, NODE_SCHEMA, BLOCKDEVICE_NAME_SCHEMA,
> +    UPID_SCHEMA, NODE_SCHEMA, BLOCKDEVICE_NAME_SCHEMA,
>      PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
>  };
>  
> @@ -144,7 +144,7 @@ pub fn initialize_disk(
>  
>      let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
>  
> -    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> +    let auth_id = rpcenv.get_auth_id().unwrap();
>  
>      let info = get_disk_usage_info(&disk, true)?;
>  
> diff --git a/src/api2/node/disks/zfs.rs b/src/api2/node/disks/zfs.rs
> index 14c2cfec..9fe0dac4 100644
> --- a/src/api2/node/disks/zfs.rs
> +++ b/src/api2/node/disks/zfs.rs
> @@ -8,7 +8,7 @@ use proxmox::api::{
>  use proxmox::api::router::Router;
>  
>  use pbs_api_types::{
> -    Authid, ZpoolListItem, ZfsRaidLevel, ZfsCompressionType, DataStoreConfig,
> +    ZpoolListItem, ZfsRaidLevel, ZfsCompressionType, DataStoreConfig,
>      NODE_SCHEMA, ZPOOL_NAME_SCHEMA, DATASTORE_SCHEMA, DISK_ARRAY_SCHEMA,
>      DISK_LIST_SCHEMA, ZFS_ASHIFT_SCHEMA, UPID_SCHEMA,
>      PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
> @@ -168,7 +168,7 @@ pub fn create_zpool(
>  
>      let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
>  
> -    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> +    let auth_id = rpcenv.get_auth_id().unwrap();
>  
>      let add_datastore = add_datastore.unwrap_or(false);
>  
> diff --git a/src/api2/node/mod.rs b/src/api2/node/mod.rs
> index 9b31d595..8e357311 100644
> --- a/src/api2/node/mod.rs
> +++ b/src/api2/node/mod.rs
> @@ -146,7 +146,7 @@ async fn termproxy(cmd: Option<String>, rpcenv: &mut dyn RpcEnvironment) -> Resu
>      let upid = WorkerTask::spawn(
>          "termproxy",
>          None,
> -        auth_id,
> +        auth_id.to_string(),
>          false,
>          move |worker| async move {
>              // move inside the worker so that it survives and does not close the port
> diff --git a/src/api2/node/network.rs b/src/api2/node/network.rs
> index 351fd11c..0fde9f2a 100644
> --- a/src/api2/node/network.rs
> +++ b/src/api2/node/network.rs
> @@ -703,7 +703,7 @@ pub async fn reload_network_config(
>  
>      let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
>  
> -    let upid_str = WorkerTask::spawn("srvreload", Some(String::from("networking")), auth_id, true, |_worker| async {
> +    let upid_str = WorkerTask::spawn("srvreload", Some(String::from("networking")), auth_id.to_string(), true, |_worker| async {
>  
>          let _ = std::fs::rename(network::NETWORK_INTERFACES_NEW_FILENAME, network::NETWORK_INTERFACES_FILENAME);
>  
> diff --git a/src/api2/node/services.rs b/src/api2/node/services.rs
> index 25edd1b6..6c757f43 100644
> --- a/src/api2/node/services.rs
> +++ b/src/api2/node/services.rs
> @@ -195,7 +195,7 @@ fn run_service_command(service: &str, cmd: &str, auth_id: Authid) -> Result<Valu
>      let upid = WorkerTask::new_thread(
>          &workerid,
>          Some(service.clone()),
> -        auth_id,
> +        auth_id.to_string(),
>          false,
>          move |_worker| {
>  
> diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
> index 7066f889..169a3d4d 100644
> --- a/src/api2/node/tasks.rs
> +++ b/src/api2/node/tasks.rs
> @@ -99,8 +99,8 @@ fn check_job_store(upid: &UPID, store: &str) -> bool {
>  }
>  
>  fn check_task_access(auth_id: &Authid, upid: &UPID) -> Result<(), Error> {
> -    let task_auth_id = &upid.auth_id;
> -    if auth_id == task_auth_id
> +    let task_auth_id: Authid = upid.auth_id.parse()?;
> +    if auth_id == &task_auth_id
>          || (task_auth_id.is_token() && &Authid::from(task_auth_id.user().clone()) == auth_id) {
>          // task owner can always read
>          Ok(())
> @@ -200,6 +200,8 @@ async fn get_task_status(
>      let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
>      check_task_access(&auth_id, &upid)?;
>  
> +    let task_auth_id: Authid = upid.auth_id.parse()?;
> +
>      let mut result = json!({
>          "upid": param["upid"],
>          "node": upid.node,
> @@ -208,11 +210,11 @@ async fn get_task_status(
>          "starttime": upid.starttime,
>          "type": upid.worker_type,
>          "id": upid.worker_id,
> -        "user": upid.auth_id.user(),
> +        "user": task_auth_id.user(),
>      });
>  
> -    if upid.auth_id.is_token() {
> -        result["tokenid"] = Value::from(upid.auth_id.tokenname().unwrap().as_str());
> +    if task_auth_id.is_token() {
> +        result["tokenid"] = Value::from(task_auth_id.tokenname().unwrap().as_str());
>      }
>  
>      if crate::server::worker_is_active(&upid).await? {
> @@ -344,10 +346,11 @@ fn stop_task(
>  
>      let upid = extract_upid(&param)?;
>  
> -    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> +    let auth_id = rpcenv.get_auth_id().unwrap();
>  
>      if auth_id != upid.auth_id {
>          let user_info = CachedUserInfo::new()?;
> +        let auth_id: Authid = auth_id.parse()?;
>          user_info.check_privs(&auth_id, &["system", "tasks"], PRIV_SYS_MODIFY, false)?;
>      }
>  
> diff --git a/src/api2/pull.rs b/src/api2/pull.rs
> index 1eb86ea3..e631920f 100644
> --- a/src/api2/pull.rs
> +++ b/src/api2/pull.rs
> @@ -81,7 +81,7 @@ pub fn do_sync_job(
>      let upid_str = WorkerTask::spawn(
>          &worker_type,
>          Some(job_id.clone()),
> -        auth_id.clone(),
> +        auth_id.to_string(),
>          to_stdout,
>          move |worker| async move {
>  
> @@ -183,7 +183,7 @@ async fn pull (
>      let (client, src_repo, tgt_store) = get_pull_parameters(&store, &remote, &remote_store).await?;
>  
>      // fixme: set to_stdout to false?
> -    let upid_str = WorkerTask::spawn("sync", Some(store.clone()), auth_id.clone(), true, move |worker| async move {
> +    let upid_str = WorkerTask::spawn("sync", Some(store.clone()), auth_id.to_string(), true, move |worker| async move {
>  
>          worker.log(format!("sync datastore '{}' start", store));
>  
> diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
> index 821e83c4..fada952c 100644
> --- a/src/api2/reader/mod.rs
> +++ b/src/api2/reader/mod.rs
> @@ -143,7 +143,7 @@ fn upgrade_to_backup_reader_protocol(
>  
>          let worker_id = format!("{}:{}/{}/{:08X}", store, backup_type, backup_id, backup_dir.backup_time());
>  
> -        WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| async move {
> +        WorkerTask::spawn("reader", Some(worker_id), auth_id.to_string(), true, move |worker| async move {
>              let _guard = _guard;
>  
>              let mut env = ReaderEnvironment::new(
> diff --git a/src/api2/tape/backup.rs b/src/api2/tape/backup.rs
> index 6b533820..fadbfa3d 100644
> --- a/src/api2/tape/backup.rs
> +++ b/src/api2/tape/backup.rs
> @@ -195,7 +195,7 @@ pub fn do_tape_backup_job(
>      let upid_str = WorkerTask::new_thread(
>          &worker_type,
>          Some(job_id.clone()),
> -        auth_id.clone(),
> +        auth_id.to_string(),
>          to_stdout,
>          move |worker| {
>              job.start(&worker.upid().to_string())?;
> @@ -376,7 +376,7 @@ pub fn backup(
>      let upid_str = WorkerTask::new_thread(
>          "tape-backup",
>          Some(job_id),
> -        auth_id,
> +        auth_id.to_string(),
>          to_stdout,
>          move |worker| {
>              let _drive_lock = drive_lock; // keep lock guard
> diff --git a/src/api2/tape/drive.rs b/src/api2/tape/drive.rs
> index 58f49f43..10aa6842 100644
> --- a/src/api2/tape/drive.rs
> +++ b/src/api2/tape/drive.rs
> @@ -87,7 +87,7 @@ where
>      let (config, _digest) = pbs_config::drive::config()?;
>      let lock_guard = lock_tape_device(&config, &drive)?;
>  
> -    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> +    let auth_id = rpcenv.get_auth_id().unwrap();
>      let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
>  
>      WorkerTask::new_thread(worker_type, job_id, auth_id, to_stdout, move |worker| {
> diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
> index 4ab60e8f..7739d1a4 100644
> --- a/src/api2/tape/restore.rs
> +++ b/src/api2/tape/restore.rs
> @@ -275,7 +275,7 @@ pub fn restore(
>      let upid_str = WorkerTask::new_thread(
>          "tape-restore",
>          Some(taskid),
> -        auth_id.clone(),
> +        auth_id.to_string(),
>          to_stdout,
>          move |worker| {
>              let _drive_lock = drive_lock; // keep lock guard
> diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
> index 6734525b..518054bf 100644
> --- a/src/bin/proxmox-backup-proxy.rs
> +++ b/src/bin/proxmox-backup-proxy.rs
> @@ -745,7 +745,7 @@ async fn schedule_task_log_rotate() {
>      if let Err(err) = WorkerTask::new_thread(
>          worker_type,
>          None,
> -        Authid::root_auth_id().clone(),
> +        Authid::root_auth_id().to_string(),
>          false,
>          move |worker| {
>              job.start(&worker.upid().to_string())?;
> diff --git a/src/server/gc_job.rs b/src/server/gc_job.rs
> index 665b9631..317f4a36 100644
> --- a/src/server/gc_job.rs
> +++ b/src/server/gc_job.rs
> @@ -26,7 +26,7 @@ pub fn do_garbage_collection_job(
>      let upid_str = WorkerTask::new_thread(
>          &worker_type,
>          Some(store.clone()),
> -        auth_id.clone(),
> +        auth_id.to_string(),
>          to_stdout,
>          move |worker| {
>              job.start(&worker.upid().to_string())?;
> diff --git a/src/server/prune_job.rs b/src/server/prune_job.rs
> index f298a74c..8d971a1c 100644
> --- a/src/server/prune_job.rs
> +++ b/src/server/prune_job.rs
> @@ -105,7 +105,7 @@ pub fn do_prune_job(
>      let upid_str = WorkerTask::new_thread(
>          &worker_type,
>          Some(job.jobname().to_string()),
> -        auth_id.clone(),
> +        auth_id.to_string(),
>          false,
>          move |worker| {
>              job.start(&worker.upid().to_string())?;
> diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs
> index 7f6d73ff..6005b706 100644
> --- a/src/server/verify_job.rs
> +++ b/src/server/verify_job.rs
> @@ -36,7 +36,7 @@ pub fn do_verification_job(
>      let upid_str = WorkerTask::new_thread(
>          &worker_type,
>          Some(job_id.clone()),
> -        auth_id.clone(),
> +        auth_id.to_string(),
>          to_stdout,
>          move |worker| {
>              job.start(&worker.upid().to_string())?;
> diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
> index a74b18e1..92ab50d7 100644
> --- a/src/server/worker_task.rs
> +++ b/src/server/worker_task.rs
> @@ -18,7 +18,7 @@ use proxmox::tools::fs::{create_path, replace_file, CreateOptions};
>  
>  use pbs_buildcfg;
>  use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
> -use pbs_api_types::{Authid, UPID};
> +use pbs_api_types::UPID;
>  use pbs_config::{open_backup_lockfile, BackupLockGuard};
>  use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
>  
> @@ -589,7 +589,7 @@ struct WorkerTaskData {
>  
>  impl WorkerTask {
>  
> -    pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: Authid, to_stdout: bool) -> Result<Arc<Self>, Error> {
> +    pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: String, to_stdout: bool) -> Result<Arc<Self>, Error> {
>          let upid = UPID::new(worker_type, worker_id, auth_id)?;
>          let task_id = upid.task_id;
>  
> @@ -640,7 +640,7 @@ impl WorkerTask {
>      pub fn spawn<F, T>(
>          worker_type: &str,
>          worker_id: Option<String>,
> -        auth_id: Authid,
> +        auth_id: String,
>          to_stdout: bool,
>          f: F,
>      ) -> Result<String, Error>
> @@ -662,7 +662,7 @@ impl WorkerTask {
>      pub fn new_thread<F>(
>          worker_type: &str,
>          worker_id: Option<String>,
> -        auth_id: Authid,
> +        auth_id: String,
>          to_stdout: bool,
>          f: F,
>      ) -> Result<String, Error>
> -- 
> 2.30.2
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel@lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 
> 
> 




^ permalink raw reply	[flat|nested] 8+ messages in thread

end of thread, other threads:[~2021-09-23 13:21 UTC | newest]

Thread overview: 8+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-09-23 10:13 [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid Dietmar Maurer
2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 2/6] worker task: allow to configure path and owner/group Dietmar Maurer
2021-09-23 11:36   ` Fabian Grünbichler
2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 3/6] use UPID and systemd helpers from proxmox 0.13.4 Dietmar Maurer
2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 4/6] move worker_task.rs into proxmox-rest-server crate Dietmar Maurer
2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 5/6] proxmox-daily-update: setup worker and command socket Dietmar Maurer
2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 6/6] proxmox-backup-manager: " Dietmar Maurer
2021-09-23 13:20 ` [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid Fabian Grünbichler

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal