all lists on lists.proxmox.com
 help / color / mirror / Atom feed
* [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid
@ 2021-09-23 10:13 Dietmar Maurer
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 2/6] worker task: allow to configure path and owner/group Dietmar Maurer
                   ` (5 more replies)
  0 siblings, 6 replies; 9+ messages in thread
From: Dietmar Maurer @ 2021-09-23 10:13 UTC (permalink / raw)
  To: pbs-devel

Because we want to move worker_task.rs into proxmox-rest-server crate.
---
 pbs-api-types/src/upid.rs        | 13 +++++++------
 src/api2/admin/datastore.rs      |  6 +++---
 src/api2/backup/environment.rs   |  2 +-
 src/api2/backup/mod.rs           |  2 +-
 src/api2/config/acme.rs          |  6 +++---
 src/api2/config/datastore.rs     |  4 ++--
 src/api2/node/apt.rs             |  4 ++--
 src/api2/node/certificates.rs    |  6 +++---
 src/api2/node/disks/directory.rs |  4 ++--
 src/api2/node/disks/mod.rs       |  4 ++--
 src/api2/node/disks/zfs.rs       |  4 ++--
 src/api2/node/mod.rs             |  2 +-
 src/api2/node/network.rs         |  2 +-
 src/api2/node/services.rs        |  2 +-
 src/api2/node/tasks.rs           | 15 +++++++++------
 src/api2/pull.rs                 |  4 ++--
 src/api2/reader/mod.rs           |  2 +-
 src/api2/tape/backup.rs          |  4 ++--
 src/api2/tape/drive.rs           |  2 +-
 src/api2/tape/restore.rs         |  2 +-
 src/bin/proxmox-backup-proxy.rs  |  2 +-
 src/server/gc_job.rs             |  2 +-
 src/server/prune_job.rs          |  2 +-
 src/server/verify_job.rs         |  2 +-
 src/server/worker_task.rs        |  8 ++++----
 25 files changed, 55 insertions(+), 51 deletions(-)

diff --git a/pbs-api-types/src/upid.rs b/pbs-api-types/src/upid.rs
index ba23a646..29135bca 100644
--- a/pbs-api-types/src/upid.rs
+++ b/pbs-api-types/src/upid.rs
@@ -8,8 +8,6 @@ use proxmox::api::schema::{ApiStringFormat, ApiType, Schema, StringSchema, Array
 use proxmox::const_regex;
 use proxmox::sys::linux::procfs;
 
-use crate::Authid;
-
 /// Unique Process/Task Identifier
 ///
 /// We use this to uniquely identify worker task. UPIDs have a short
@@ -37,7 +35,7 @@ pub struct UPID {
     /// Worker ID (arbitrary ASCII string)
     pub worker_id: Option<String>,
     /// The authenticated entity who started the task
-    pub auth_id: Authid,
+    pub auth_id: String,
     /// The node name.
     pub node: String,
 }
@@ -71,7 +69,7 @@ impl UPID {
     pub fn new(
         worker_type: &str,
         worker_id: Option<String>,
-        auth_id: Authid,
+        auth_id: String,
     ) -> Result<Self, Error> {
 
         let pid = unsafe { libc::getpid() };
@@ -82,6 +80,10 @@ impl UPID {
             bail!("illegal characters in worker type '{}'", worker_type);
         }
 
+        if auth_id.contains(bad) {
+            bail!("illegal characters in auth_id '{}'", auth_id);
+        }
+
         static WORKER_TASK_NEXT_ID: AtomicUsize = AtomicUsize::new(0);
 
         let task_id = WORKER_TASK_NEXT_ID.fetch_add(1, Ordering::SeqCst);
@@ -184,7 +186,7 @@ pub struct TaskListItem {
     /// Worker ID (arbitrary ASCII string)
     pub worker_id: Option<String>,
     /// The authenticated entity who started the task
-    pub user: Authid,
+    pub user: String,
     /// The task end time (Epoch)
     #[serde(skip_serializing_if="Option::is_none")]
     pub endtime: Option<i64>,
@@ -200,4 +202,3 @@ pub const NODE_TASKS_LIST_TASKS_RETURN_TYPE: ReturnType = ReturnType {
         &TaskListItem::API_SCHEMA,
     ).schema(),
 };
-
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index 0b14dfbf..fbb93f35 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -722,7 +722,7 @@ pub fn verify(
     let upid_str = WorkerTask::new_thread(
         worker_type,
         Some(worker_id),
-        auth_id.clone(),
+        auth_id.to_string(),
         to_stdout,
         move |worker| {
             let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
@@ -862,7 +862,7 @@ pub fn prune(
 
 
     // We use a WorkerTask just to have a task log, but run synchrounously
-    let worker = WorkerTask::new("prune", Some(worker_id), auth_id, true)?;
+    let worker = WorkerTask::new("prune", Some(worker_id), auth_id.to_string(), true)?;
 
     if keep_all {
         worker.log("No prune selection - keeping all files.");
@@ -957,7 +957,7 @@ pub fn prune_datastore(
     let upid_str = WorkerTask::new_thread(
         "prune",
         Some(store.clone()),
-        auth_id.clone(),
+        auth_id.to_string(),
         to_stdout,
         move |worker| crate::server::prune_datastore(
             worker.clone(),
diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index 129ebd2b..306f91ee 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -525,7 +525,7 @@ impl BackupEnvironment {
         WorkerTask::new_thread(
             "verify",
             Some(worker_id),
-            self.auth_id.clone(),
+            self.auth_id.to_string(),
             false,
             move |worker| {
                 worker.log("Automatically verifying newly added snapshot");
diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
index 8f51f314..c14f19a4 100644
--- a/src/api2/backup/mod.rs
+++ b/src/api2/backup/mod.rs
@@ -166,7 +166,7 @@ async move {
     if !is_new { bail!("backup directory already exists."); }
 
 
-    WorkerTask::spawn(worker_type, Some(worker_id), auth_id.clone(), true, move |worker| {
+    WorkerTask::spawn(worker_type, Some(worker_id), auth_id.to_string(), true, move |worker| {
         let mut env = BackupEnvironment::new(
             env_type, auth_id, worker.clone(), datastore, backup_dir);
 
diff --git a/src/api2/config/acme.rs b/src/api2/config/acme.rs
index 593b79a3..564cafae 100644
--- a/src/api2/config/acme.rs
+++ b/src/api2/config/acme.rs
@@ -215,7 +215,7 @@ fn register_account(
     WorkerTask::spawn(
         "acme-register",
         Some(name.to_string()),
-        auth_id,
+        auth_id.to_string(),
         true,
         move |worker| async move {
             let mut client = AcmeClient::new(directory);
@@ -275,7 +275,7 @@ pub fn update_account(
     WorkerTask::spawn(
         "acme-update",
         Some(name.to_string()),
-        auth_id,
+        auth_id.to_string(),
         true,
         move |_worker| async move {
             let data = match contact {
@@ -320,7 +320,7 @@ pub fn deactivate_account(
     WorkerTask::spawn(
         "acme-deactivate",
         Some(name.to_string()),
-        auth_id,
+        auth_id.to_string(),
         true,
         move |worker| async move {
             match AcmeClient::load(&name)
diff --git a/src/api2/config/datastore.rs b/src/api2/config/datastore.rs
index c6036fc3..0e6529f8 100644
--- a/src/api2/config/datastore.rs
+++ b/src/api2/config/datastore.rs
@@ -119,9 +119,9 @@ pub fn create_datastore(
     WorkerTask::new_thread(
         "create-datastore",
         Some(config.name.to_string()),
-        auth_id,
+        auth_id.to_string(),
         to_stdout,
-        move |worker| do_create_datastore(lock, section_config, config, Some(&worker)),
+       move |worker| do_create_datastore(lock, section_config, config, Some(&worker)),
     )
 }
 
diff --git a/src/api2/node/apt.rs b/src/api2/node/apt.rs
index 8f4bc691..f02920c0 100644
--- a/src/api2/node/apt.rs
+++ b/src/api2/node/apt.rs
@@ -14,7 +14,7 @@ use proxmox_apt::repositories::{
 use proxmox_http::ProxyConfig;
 
 use pbs_api_types::{
-    Authid, APTUpdateInfo, NODE_SCHEMA, PROXMOX_CONFIG_DIGEST_SCHEMA, UPID_SCHEMA,
+    APTUpdateInfo, NODE_SCHEMA, PROXMOX_CONFIG_DIGEST_SCHEMA, UPID_SCHEMA,
     PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
 };
 
@@ -154,7 +154,7 @@ pub fn apt_update_database(
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<String, Error> {
 
-    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let auth_id = rpcenv.get_auth_id().unwrap();
     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
 
     let upid_str = WorkerTask::new_thread("aptupdate", None, auth_id, to_stdout, move |worker| {
diff --git a/src/api2/node/certificates.rs b/src/api2/node/certificates.rs
index 82fa028d..7b31861e 100644
--- a/src/api2/node/certificates.rs
+++ b/src/api2/node/certificates.rs
@@ -11,7 +11,7 @@ use proxmox::api::router::SubdirMap;
 use proxmox::api::{api, Permission, Router, RpcEnvironment};
 use proxmox::list_subdirs_api_method;
 
-use pbs_api_types::{Authid, NODE_SCHEMA, PRIV_SYS_MODIFY};
+use pbs_api_types::{NODE_SCHEMA, PRIV_SYS_MODIFY};
 use pbs_buildcfg::configdir;
 use pbs_tools::cert;
 
@@ -530,7 +530,7 @@ fn spawn_certificate_worker(
 
     let (node_config, _digest) = crate::config::node::config()?;
 
-    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let auth_id = rpcenv.get_auth_id().unwrap();
 
     WorkerTask::spawn(name, None, auth_id, true, move |worker| async move {
         if let Some(cert) = order_certificate(worker, &node_config).await? {
@@ -559,7 +559,7 @@ pub fn revoke_acme_cert(rpcenv: &mut dyn RpcEnvironment) -> Result<String, Error
 
     let cert_pem = get_certificate_pem()?;
 
-    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let auth_id = rpcenv.get_auth_id().unwrap();
 
     WorkerTask::spawn(
         "acme-revoke-cert",
diff --git a/src/api2/node/disks/directory.rs b/src/api2/node/disks/directory.rs
index 38809dcf..2f4a738d 100644
--- a/src/api2/node/disks/directory.rs
+++ b/src/api2/node/disks/directory.rs
@@ -7,7 +7,7 @@ use proxmox::api::section_config::SectionConfigData;
 use proxmox::api::router::Router;
 
 use pbs_api_types::{
-    Authid, DataStoreConfig, NODE_SCHEMA, BLOCKDEVICE_NAME_SCHEMA,
+    DataStoreConfig, NODE_SCHEMA, BLOCKDEVICE_NAME_SCHEMA,
     DATASTORE_SCHEMA, UPID_SCHEMA, PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
 };
 
@@ -146,7 +146,7 @@ pub fn create_datastore_disk(
 
     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
 
-    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let auth_id = rpcenv.get_auth_id().unwrap();
 
     let info = get_disk_usage_info(&disk, true)?;
 
diff --git a/src/api2/node/disks/mod.rs b/src/api2/node/disks/mod.rs
index 67f8f63a..b4001a54 100644
--- a/src/api2/node/disks/mod.rs
+++ b/src/api2/node/disks/mod.rs
@@ -7,7 +7,7 @@ use proxmox::{sortable, identity};
 use proxmox::{list_subdirs_api_method};
 
 use pbs_api_types::{
-    Authid, UPID_SCHEMA, NODE_SCHEMA, BLOCKDEVICE_NAME_SCHEMA,
+    UPID_SCHEMA, NODE_SCHEMA, BLOCKDEVICE_NAME_SCHEMA,
     PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
 };
 
@@ -144,7 +144,7 @@ pub fn initialize_disk(
 
     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
 
-    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let auth_id = rpcenv.get_auth_id().unwrap();
 
     let info = get_disk_usage_info(&disk, true)?;
 
diff --git a/src/api2/node/disks/zfs.rs b/src/api2/node/disks/zfs.rs
index 14c2cfec..9fe0dac4 100644
--- a/src/api2/node/disks/zfs.rs
+++ b/src/api2/node/disks/zfs.rs
@@ -8,7 +8,7 @@ use proxmox::api::{
 use proxmox::api::router::Router;
 
 use pbs_api_types::{
-    Authid, ZpoolListItem, ZfsRaidLevel, ZfsCompressionType, DataStoreConfig,
+    ZpoolListItem, ZfsRaidLevel, ZfsCompressionType, DataStoreConfig,
     NODE_SCHEMA, ZPOOL_NAME_SCHEMA, DATASTORE_SCHEMA, DISK_ARRAY_SCHEMA,
     DISK_LIST_SCHEMA, ZFS_ASHIFT_SCHEMA, UPID_SCHEMA,
     PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
@@ -168,7 +168,7 @@ pub fn create_zpool(
 
     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
 
-    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let auth_id = rpcenv.get_auth_id().unwrap();
 
     let add_datastore = add_datastore.unwrap_or(false);
 
diff --git a/src/api2/node/mod.rs b/src/api2/node/mod.rs
index 9b31d595..8e357311 100644
--- a/src/api2/node/mod.rs
+++ b/src/api2/node/mod.rs
@@ -146,7 +146,7 @@ async fn termproxy(cmd: Option<String>, rpcenv: &mut dyn RpcEnvironment) -> Resu
     let upid = WorkerTask::spawn(
         "termproxy",
         None,
-        auth_id,
+        auth_id.to_string(),
         false,
         move |worker| async move {
             // move inside the worker so that it survives and does not close the port
diff --git a/src/api2/node/network.rs b/src/api2/node/network.rs
index 351fd11c..0fde9f2a 100644
--- a/src/api2/node/network.rs
+++ b/src/api2/node/network.rs
@@ -703,7 +703,7 @@ pub async fn reload_network_config(
 
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
 
-    let upid_str = WorkerTask::spawn("srvreload", Some(String::from("networking")), auth_id, true, |_worker| async {
+    let upid_str = WorkerTask::spawn("srvreload", Some(String::from("networking")), auth_id.to_string(), true, |_worker| async {
 
         let _ = std::fs::rename(network::NETWORK_INTERFACES_NEW_FILENAME, network::NETWORK_INTERFACES_FILENAME);
 
diff --git a/src/api2/node/services.rs b/src/api2/node/services.rs
index 25edd1b6..6c757f43 100644
--- a/src/api2/node/services.rs
+++ b/src/api2/node/services.rs
@@ -195,7 +195,7 @@ fn run_service_command(service: &str, cmd: &str, auth_id: Authid) -> Result<Valu
     let upid = WorkerTask::new_thread(
         &workerid,
         Some(service.clone()),
-        auth_id,
+        auth_id.to_string(),
         false,
         move |_worker| {
 
diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index 7066f889..169a3d4d 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -99,8 +99,8 @@ fn check_job_store(upid: &UPID, store: &str) -> bool {
 }
 
 fn check_task_access(auth_id: &Authid, upid: &UPID) -> Result<(), Error> {
-    let task_auth_id = &upid.auth_id;
-    if auth_id == task_auth_id
+    let task_auth_id: Authid = upid.auth_id.parse()?;
+    if auth_id == &task_auth_id
         || (task_auth_id.is_token() && &Authid::from(task_auth_id.user().clone()) == auth_id) {
         // task owner can always read
         Ok(())
@@ -200,6 +200,8 @@ async fn get_task_status(
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
     check_task_access(&auth_id, &upid)?;
 
+    let task_auth_id: Authid = upid.auth_id.parse()?;
+
     let mut result = json!({
         "upid": param["upid"],
         "node": upid.node,
@@ -208,11 +210,11 @@ async fn get_task_status(
         "starttime": upid.starttime,
         "type": upid.worker_type,
         "id": upid.worker_id,
-        "user": upid.auth_id.user(),
+        "user": task_auth_id.user(),
     });
 
-    if upid.auth_id.is_token() {
-        result["tokenid"] = Value::from(upid.auth_id.tokenname().unwrap().as_str());
+    if task_auth_id.is_token() {
+        result["tokenid"] = Value::from(task_auth_id.tokenname().unwrap().as_str());
     }
 
     if crate::server::worker_is_active(&upid).await? {
@@ -344,10 +346,11 @@ fn stop_task(
 
     let upid = extract_upid(&param)?;
 
-    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let auth_id = rpcenv.get_auth_id().unwrap();
 
     if auth_id != upid.auth_id {
         let user_info = CachedUserInfo::new()?;
+        let auth_id: Authid = auth_id.parse()?;
         user_info.check_privs(&auth_id, &["system", "tasks"], PRIV_SYS_MODIFY, false)?;
     }
 
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index 1eb86ea3..e631920f 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -81,7 +81,7 @@ pub fn do_sync_job(
     let upid_str = WorkerTask::spawn(
         &worker_type,
         Some(job_id.clone()),
-        auth_id.clone(),
+        auth_id.to_string(),
         to_stdout,
         move |worker| async move {
 
@@ -183,7 +183,7 @@ async fn pull (
     let (client, src_repo, tgt_store) = get_pull_parameters(&store, &remote, &remote_store).await?;
 
     // fixme: set to_stdout to false?
-    let upid_str = WorkerTask::spawn("sync", Some(store.clone()), auth_id.clone(), true, move |worker| async move {
+    let upid_str = WorkerTask::spawn("sync", Some(store.clone()), auth_id.to_string(), true, move |worker| async move {
 
         worker.log(format!("sync datastore '{}' start", store));
 
diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
index 821e83c4..fada952c 100644
--- a/src/api2/reader/mod.rs
+++ b/src/api2/reader/mod.rs
@@ -143,7 +143,7 @@ fn upgrade_to_backup_reader_protocol(
 
         let worker_id = format!("{}:{}/{}/{:08X}", store, backup_type, backup_id, backup_dir.backup_time());
 
-        WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| async move {
+        WorkerTask::spawn("reader", Some(worker_id), auth_id.to_string(), true, move |worker| async move {
             let _guard = _guard;
 
             let mut env = ReaderEnvironment::new(
diff --git a/src/api2/tape/backup.rs b/src/api2/tape/backup.rs
index 6b533820..fadbfa3d 100644
--- a/src/api2/tape/backup.rs
+++ b/src/api2/tape/backup.rs
@@ -195,7 +195,7 @@ pub fn do_tape_backup_job(
     let upid_str = WorkerTask::new_thread(
         &worker_type,
         Some(job_id.clone()),
-        auth_id.clone(),
+        auth_id.to_string(),
         to_stdout,
         move |worker| {
             job.start(&worker.upid().to_string())?;
@@ -376,7 +376,7 @@ pub fn backup(
     let upid_str = WorkerTask::new_thread(
         "tape-backup",
         Some(job_id),
-        auth_id,
+        auth_id.to_string(),
         to_stdout,
         move |worker| {
             let _drive_lock = drive_lock; // keep lock guard
diff --git a/src/api2/tape/drive.rs b/src/api2/tape/drive.rs
index 58f49f43..10aa6842 100644
--- a/src/api2/tape/drive.rs
+++ b/src/api2/tape/drive.rs
@@ -87,7 +87,7 @@ where
     let (config, _digest) = pbs_config::drive::config()?;
     let lock_guard = lock_tape_device(&config, &drive)?;
 
-    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let auth_id = rpcenv.get_auth_id().unwrap();
     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
 
     WorkerTask::new_thread(worker_type, job_id, auth_id, to_stdout, move |worker| {
diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index 4ab60e8f..7739d1a4 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -275,7 +275,7 @@ pub fn restore(
     let upid_str = WorkerTask::new_thread(
         "tape-restore",
         Some(taskid),
-        auth_id.clone(),
+        auth_id.to_string(),
         to_stdout,
         move |worker| {
             let _drive_lock = drive_lock; // keep lock guard
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 6734525b..518054bf 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -745,7 +745,7 @@ async fn schedule_task_log_rotate() {
     if let Err(err) = WorkerTask::new_thread(
         worker_type,
         None,
-        Authid::root_auth_id().clone(),
+        Authid::root_auth_id().to_string(),
         false,
         move |worker| {
             job.start(&worker.upid().to_string())?;
diff --git a/src/server/gc_job.rs b/src/server/gc_job.rs
index 665b9631..317f4a36 100644
--- a/src/server/gc_job.rs
+++ b/src/server/gc_job.rs
@@ -26,7 +26,7 @@ pub fn do_garbage_collection_job(
     let upid_str = WorkerTask::new_thread(
         &worker_type,
         Some(store.clone()),
-        auth_id.clone(),
+        auth_id.to_string(),
         to_stdout,
         move |worker| {
             job.start(&worker.upid().to_string())?;
diff --git a/src/server/prune_job.rs b/src/server/prune_job.rs
index f298a74c..8d971a1c 100644
--- a/src/server/prune_job.rs
+++ b/src/server/prune_job.rs
@@ -105,7 +105,7 @@ pub fn do_prune_job(
     let upid_str = WorkerTask::new_thread(
         &worker_type,
         Some(job.jobname().to_string()),
-        auth_id.clone(),
+        auth_id.to_string(),
         false,
         move |worker| {
             job.start(&worker.upid().to_string())?;
diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs
index 7f6d73ff..6005b706 100644
--- a/src/server/verify_job.rs
+++ b/src/server/verify_job.rs
@@ -36,7 +36,7 @@ pub fn do_verification_job(
     let upid_str = WorkerTask::new_thread(
         &worker_type,
         Some(job_id.clone()),
-        auth_id.clone(),
+        auth_id.to_string(),
         to_stdout,
         move |worker| {
             job.start(&worker.upid().to_string())?;
diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
index a74b18e1..92ab50d7 100644
--- a/src/server/worker_task.rs
+++ b/src/server/worker_task.rs
@@ -18,7 +18,7 @@ use proxmox::tools::fs::{create_path, replace_file, CreateOptions};
 
 use pbs_buildcfg;
 use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
-use pbs_api_types::{Authid, UPID};
+use pbs_api_types::UPID;
 use pbs_config::{open_backup_lockfile, BackupLockGuard};
 use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
 
@@ -589,7 +589,7 @@ struct WorkerTaskData {
 
 impl WorkerTask {
 
-    pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: Authid, to_stdout: bool) -> Result<Arc<Self>, Error> {
+    pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: String, to_stdout: bool) -> Result<Arc<Self>, Error> {
         let upid = UPID::new(worker_type, worker_id, auth_id)?;
         let task_id = upid.task_id;
 
@@ -640,7 +640,7 @@ impl WorkerTask {
     pub fn spawn<F, T>(
         worker_type: &str,
         worker_id: Option<String>,
-        auth_id: Authid,
+        auth_id: String,
         to_stdout: bool,
         f: F,
     ) -> Result<String, Error>
@@ -662,7 +662,7 @@ impl WorkerTask {
     pub fn new_thread<F>(
         worker_type: &str,
         worker_id: Option<String>,
-        auth_id: Authid,
+        auth_id: String,
         to_stdout: bool,
         f: F,
     ) -> Result<String, Error>
-- 
2.30.2





^ permalink raw reply	[flat|nested] 9+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 2/6] worker task: allow to configure path and owner/group
  2021-09-23 10:13 [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid Dietmar Maurer
@ 2021-09-23 10:13 ` Dietmar Maurer
  2021-09-23 11:36   ` Fabian Grünbichler
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 3/6] use UPID and systemd helpers from proxmox 0.13.4 Dietmar Maurer
                   ` (4 subsequent siblings)
  5 siblings, 1 reply; 9+ messages in thread
From: Dietmar Maurer @ 2021-09-23 10:13 UTC (permalink / raw)
  To: pbs-devel

And application now needs to call init_worker_tasks() before using
worker tasks.

Notable changes:
- need to call  init_worker_tasks() before using worker tasks.
- create_task_log_dirs() ís called inside init_worker_tasks()
- removed UpidExt trait
- use atomic_open_or_create_file()
- remove pbs_config and pbs_buildcfg dependency
---
 src/api2/node/tasks.rs          |   6 +-
 src/bin/proxmox-backup-api.rs   |   7 +-
 src/bin/proxmox-backup-proxy.rs |   5 +-
 src/server/mod.rs               |   3 -
 src/server/upid.rs              |  18 --
 src/server/worker_task.rs       | 475 +++++++++++++++++++-------------
 6 files changed, 290 insertions(+), 224 deletions(-)
 delete mode 100644 src/server/upid.rs

diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index 169a3d4d..df4673a1 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -16,7 +16,7 @@ use pbs_api_types::{
 };
 
 use crate::api2::pull::check_pull_privs;
-use crate::server::{self, UPIDExt, TaskState, TaskListInfoIterator};
+use crate::server::{self, upid_log_path, upid_read_status, TaskState, TaskListInfoIterator};
 use pbs_config::CachedUserInfo;
 
 // matches respective job execution privileges
@@ -220,7 +220,7 @@ async fn get_task_status(
     if crate::server::worker_is_active(&upid).await? {
         result["status"] = Value::from("running");
     } else {
-        let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
+        let exitstatus = upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
         result["status"] = Value::from("stopped");
         result["exitstatus"] = Value::from(exitstatus.to_string());
     };
@@ -287,7 +287,7 @@ async fn read_task_log(
 
     let mut count: u64 = 0;
 
-    let path = upid.log_path();
+    let path = upid_log_path(&upid)?;
 
     let file = File::open(path)?;
 
diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
index 9ad10260..9901b85d 100644
--- a/src/bin/proxmox-backup-api.rs
+++ b/src/bin/proxmox-backup-api.rs
@@ -54,8 +54,6 @@ async fn run() -> Result<(), Error> {
         bail!("unable to inititialize syslog - {}", err);
     }
 
-    server::create_task_log_dirs()?;
-
     config::create_configdir()?;
 
     config::update_self_signed_cert(false)?;
@@ -102,13 +100,14 @@ async fn run() -> Result<(), Error> {
 
     config.enable_auth_log(
         pbs_buildcfg::API_AUTH_LOG_FN,
-        Some(dir_opts),
-        Some(file_opts),
+        Some(dir_opts.clone()),
+        Some(file_opts.clone()),
         &mut commando_sock,
     )?;
 
 
     let rest_server = RestServer::new(config);
+    proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
 
     // http server future:
     let server = daemon::create_daemon(
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 518054bf..5d8ed189 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -202,12 +202,13 @@ async fn run() -> Result<(), Error> {
 
     config.enable_auth_log(
         pbs_buildcfg::API_AUTH_LOG_FN,
-        Some(dir_opts),
-        Some(file_opts),
+        Some(dir_opts.clone()),
+        Some(file_opts.clone()),
         &mut commando_sock,
     )?;
 
     let rest_server = RestServer::new(config);
+    proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
 
     //openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes
 
diff --git a/src/server/mod.rs b/src/server/mod.rs
index a7dcee67..77320da6 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -46,9 +46,6 @@ pub fn our_ctrl_sock() -> String {
     ctrl_sock_from_pid(*PID)
 }
 
-mod upid;
-pub use upid::*;
-
 mod worker_task;
 pub use worker_task::*;
 
diff --git a/src/server/upid.rs b/src/server/upid.rs
deleted file mode 100644
index 70a3e3fb..00000000
--- a/src/server/upid.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-pub trait UPIDExt: private::Sealed {
-    /// Returns the absolute path to the task log file
-    fn log_path(&self) -> std::path::PathBuf;
-}
-
-mod private {
-    pub trait Sealed {}
-    impl Sealed for  pbs_api_types::UPID {}
-}
-
-impl UPIDExt for  pbs_api_types::UPID {
-    fn log_path(&self) -> std::path::PathBuf {
-        let mut path = std::path::PathBuf::from(super::PROXMOX_BACKUP_TASK_DIR);
-        path.push(format!("{:02X}", self.pstart % 256));
-        path.push(self.to_string());
-        path
-    }
-}
diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
index 92ab50d7..191d8a44 100644
--- a/src/server/worker_task.rs
+++ b/src/server/worker_task.rs
@@ -1,5 +1,6 @@
 use std::collections::{HashMap, VecDeque};
 use std::fs::File;
+use std::path::PathBuf;
 use std::io::{Read, Write, BufRead, BufReader};
 use std::panic::UnwindSafe;
 use std::sync::atomic::{AtomicBool, Ordering};
@@ -11,27 +12,267 @@ use lazy_static::lazy_static;
 use serde_json::{json, Value};
 use serde::{Serialize, Deserialize};
 use tokio::sync::oneshot;
+use nix::fcntl::OFlag;
+use once_cell::sync::OnceCell;
 
 use proxmox::sys::linux::procfs;
 use proxmox::try_block;
-use proxmox::tools::fs::{create_path, replace_file, CreateOptions};
+use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
 
-use pbs_buildcfg;
 use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
 use pbs_api_types::UPID;
-use pbs_config::{open_backup_lockfile, BackupLockGuard};
 use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
 
-use super::UPIDExt;
+struct TaskListLockGuard(File);
 
-macro_rules! taskdir {
-    ($subdir:expr) => (concat!(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks", $subdir))
+struct WorkerTaskSetup {
+    file_opts: CreateOptions,
+    taskdir: PathBuf,
+    task_lock_fn: PathBuf,
+    active_tasks_fn: PathBuf,
+    task_index_fn: PathBuf,
+    task_archive_fn: PathBuf,
+}
+
+static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new();
+
+fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> {
+    WORKER_TASK_SETUP.get()
+        .ok_or_else(|| format_err!("WorkerTask library is not initialized"))
+}
+
+impl WorkerTaskSetup {
+
+    fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self {
+
+        let mut taskdir = basedir.clone();
+        taskdir.push("tasks");
+
+        let mut task_lock_fn = taskdir.clone();
+        task_lock_fn.push(".active.lock");
+
+        let mut active_tasks_fn = taskdir.clone();
+        active_tasks_fn.push("active");
+
+        let mut task_index_fn = taskdir.clone();
+        task_index_fn.push("index");
+
+        let mut task_archive_fn = taskdir.clone();
+        task_archive_fn.push("archive");
+
+        Self {
+            file_opts,
+            taskdir,
+            task_lock_fn,
+            active_tasks_fn,
+            task_index_fn,
+            task_archive_fn,
+        }
+    }
+
+    fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> {
+        let options =  self.file_opts.clone()
+            .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+        let timeout = std::time::Duration::new(10, 0);
+
+        let file = proxmox::tools::fs::open_file_locked(
+            &self.task_lock_fn,
+            timeout,
+            exclusive,
+            options,
+        )?;
+
+        Ok(TaskListLockGuard(file))
+    }
+
+    fn log_path(&self, upid: &UPID) -> std::path::PathBuf {
+        let mut path = self.taskdir.clone();
+        path.push(format!("{:02X}", upid.pstart % 256));
+        path.push(upid.to_string());
+        path
+    }
+
+    // atomically read/update the task list, update status of finished tasks
+    // new_upid is added to the list when specified.
+    fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> {
+
+        let lock = self.lock_task_list_files(true)?;
+
+        // TODO remove with 1.x
+        let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(&self.task_index_fn)?;
+        let had_index_file = !finish_list.is_empty();
+
+        // We use filter_map because one negative case wants to *move* the data into `finish_list`,
+        // clippy doesn't quite catch this!
+        #[allow(clippy::unnecessary_filter_map)]
+        let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(&self.active_tasks_fn)?
+            .into_iter()
+            .filter_map(|info| {
+                if info.state.is_some() {
+                    // this can happen when the active file still includes finished tasks
+                    finish_list.push(info);
+                    return None;
+                }
+
+                if !worker_is_active_local(&info.upid) {
+                    // println!("Detected stopped task '{}'", &info.upid_str);
+                    let now = proxmox::tools::time::epoch_i64();
+                    let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
+                    finish_list.push(TaskListInfo {
+                        upid: info.upid,
+                        upid_str: info.upid_str,
+                        state: Some(status)
+                    });
+                    return None;
+                }
+
+                Some(info)
+            }).collect();
+
+        if let Some(upid) = new_upid {
+            active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
+        }
+
+        let active_raw = render_task_list(&active_list);
+
+        let options =  self.file_opts.clone()
+            .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+        replace_file(
+            &self.active_tasks_fn,
+            active_raw.as_bytes(),
+            options,
+        )?;
+
+        finish_list.sort_unstable_by(|a, b| {
+            match (&a.state, &b.state) {
+                (Some(s1), Some(s2)) => s1.cmp(&s2),
+                (Some(_), None) => std::cmp::Ordering::Less,
+                (None, Some(_)) => std::cmp::Ordering::Greater,
+                _ => a.upid.starttime.cmp(&b.upid.starttime),
+            }
+        });
+
+        if !finish_list.is_empty() {
+            let options =  self.file_opts.clone()
+                .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+            let mut writer = atomic_open_or_create_file(
+                &self.task_archive_fn,
+                OFlag::O_APPEND | OFlag::O_RDWR,
+                &[],
+                options,
+            )?;
+            for info in &finish_list {
+                writer.write_all(render_task_line(&info).as_bytes())?;
+            }
+        }
+
+        // TODO Remove with 1.x
+        // for compatibility, if we had an INDEX file, we do not need it anymore
+        if had_index_file {
+            let _ = nix::unistd::unlink(&self.task_index_fn);
+        }
+
+        drop(lock);
+
+        Ok(())
+    }
+
+    // Create task log directory with correct permissions
+    fn create_task_log_dirs(&self) -> Result<(), Error> {
+
+        try_block!({
+            let dir_opts = self.file_opts.clone()
+                .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
+
+            create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts.clone()))?;
+            // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
+            Ok(())
+        }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))
+    }
+}
+
+/// Initialize the WorkerTask library
+pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> {
+    let setup = WorkerTaskSetup::new(basedir, file_opts);
+    setup.create_task_log_dirs()?;
+    WORKER_TASK_SETUP.set(setup)
+        .map_err(|_| format_err!("init_worker_tasks failed - already initialized"))
+}
+
+/// checks if the Task Archive is bigger that 'size_threshold' bytes, and
+/// rotates it if it is
+pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
+
+    let setup = worker_task_setup()?;
+
+    let _lock = setup.lock_task_list_files(true)?;
+
+    let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress)
+            .ok_or_else(|| format_err!("could not get archive file names"))?;
+
+    logrotate.rotate(size_threshold, None, max_files)
+}
+
+
+/// Path to the worker log file
+pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
+    let setup = worker_task_setup()?;
+    Ok(setup.log_path(upid))
+}
+
+/// Read endtime (time of last log line) and exitstatus from task log file
+/// If there is not a single line with at valid datetime, we assume the
+/// starttime to be the endtime
+pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
+
+    let setup = worker_task_setup()?;
+
+    let mut status = TaskState::Unknown { endtime: upid.starttime };
+
+    let path = setup.log_path(upid);
+
+    let mut file = File::open(path)?;
+
+    /// speedup - only read tail
+    use std::io::Seek;
+    use std::io::SeekFrom;
+    let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
+
+    let mut data = Vec::with_capacity(8192);
+    file.read_to_end(&mut data)?;
+
+    // strip newlines at the end of the task logs
+    while data.last() == Some(&b'\n') {
+        data.pop();
+    }
+
+    let last_line = match data.iter().rposition(|c| *c == b'\n') {
+        Some(start) if data.len() > (start+1) => &data[start+1..],
+        Some(_) => &data, // should not happen, since we removed all trailing newlines
+        None => &data,
+    };
+
+    let last_line = std::str::from_utf8(last_line)
+        .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
+
+    let mut iter = last_line.splitn(2, ": ");
+    if let Some(time_str) = iter.next() {
+        if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
+            // set the endtime even if we cannot parse the state
+            status = TaskState::Unknown { endtime };
+            if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
+                if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
+                    status = state;
+                }
+            }
+        }
+    }
+
+    Ok(status)
 }
-pub const PROXMOX_BACKUP_TASK_DIR: &str = taskdir!("/");
-pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = taskdir!("/.active.lock");
-pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = taskdir!("/active");
-pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = taskdir!("/index");
-pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = taskdir!("/archive");
 
 lazy_static! {
     static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
@@ -152,73 +393,6 @@ fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskStat
     }
 }
 
-/// Create task log directory with correct permissions
-pub fn create_task_log_dirs() -> Result<(), Error> {
-
-    try_block!({
-        let backup_user = pbs_config::backup_user()?;
-        let opts = CreateOptions::new()
-            .owner(backup_user.uid)
-            .group(backup_user.gid);
-
-        create_path(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?;
-        create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?;
-        create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
-        Ok(())
-    }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?;
-
-    Ok(())
-}
-
-/// Read endtime (time of last log line) and exitstatus from task log file
-/// If there is not a single line with at valid datetime, we assume the
-/// starttime to be the endtime
-pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
-
-    let mut status = TaskState::Unknown { endtime: upid.starttime };
-
-    let path = upid.log_path();
-
-    let mut file = File::open(path)?;
-
-    /// speedup - only read tail
-    use std::io::Seek;
-    use std::io::SeekFrom;
-    let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
-
-    let mut data = Vec::with_capacity(8192);
-    file.read_to_end(&mut data)?;
-
-    // strip newlines at the end of the task logs
-    while data.last() == Some(&b'\n') {
-        data.pop();
-    }
-
-    let last_line = match data.iter().rposition(|c| *c == b'\n') {
-        Some(start) if data.len() > (start+1) => &data[start+1..],
-        Some(_) => &data, // should not happen, since we removed all trailing newlines
-        None => &data,
-    };
-
-    let last_line = std::str::from_utf8(last_line)
-        .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
-
-    let mut iter = last_line.splitn(2, ": ");
-    if let Some(time_str) = iter.next() {
-        if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
-            // set the endtime even if we cannot parse the state
-            status = TaskState::Unknown { endtime };
-            if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
-                if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
-                    status = state;
-                }
-            }
-        }
-    }
-
-    Ok(status)
-}
-
 /// Task State
 #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub enum TaskState {
@@ -323,107 +497,6 @@ impl Into<pbs_api_types::TaskListItem> for TaskListInfo {
     }
 }
 
-fn lock_task_list_files(exclusive: bool) -> Result<BackupLockGuard, Error> {
-    open_backup_lockfile(PROXMOX_BACKUP_TASK_LOCK_FN, None, exclusive)
-}
-
-/// checks if the Task Archive is bigger that 'size_threshold' bytes, and
-/// rotates it if it is
-pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
-    let _lock = lock_task_list_files(true)?;
-
-    let mut logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, compress)
-        .ok_or_else(|| format_err!("could not get archive file names"))?;
-
-    logrotate.rotate(size_threshold, None, max_files)
-}
-
-// atomically read/update the task list, update status of finished tasks
-// new_upid is added to the list when specified.
-fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {
-
-    let backup_user = pbs_config::backup_user()?;
-
-    let lock = lock_task_list_files(true)?;
-
-    // TODO remove with 1.x
-    let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?;
-    let had_index_file = !finish_list.is_empty();
-
-    // We use filter_map because one negative case wants to *move* the data into `finish_list`,
-    // clippy doesn't quite catch this!
-    #[allow(clippy::unnecessary_filter_map)]
-    let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?
-        .into_iter()
-        .filter_map(|info| {
-            if info.state.is_some() {
-                // this can happen when the active file still includes finished tasks
-                finish_list.push(info);
-                return None;
-            }
-
-            if !worker_is_active_local(&info.upid) {
-                // println!("Detected stopped task '{}'", &info.upid_str);
-                let now = proxmox::tools::time::epoch_i64();
-                let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
-                finish_list.push(TaskListInfo {
-                    upid: info.upid,
-                    upid_str: info.upid_str,
-                    state: Some(status)
-                });
-                return None;
-            }
-
-            Some(info)
-        }).collect();
-
-    if let Some(upid) = new_upid {
-        active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
-    }
-
-    let active_raw = render_task_list(&active_list);
-
-    replace_file(
-        PROXMOX_BACKUP_ACTIVE_TASK_FN,
-        active_raw.as_bytes(),
-        CreateOptions::new()
-            .owner(backup_user.uid)
-            .group(backup_user.gid),
-    )?;
-
-    finish_list.sort_unstable_by(|a, b| {
-        match (&a.state, &b.state) {
-            (Some(s1), Some(s2)) => s1.cmp(&s2),
-            (Some(_), None) => std::cmp::Ordering::Less,
-            (None, Some(_)) => std::cmp::Ordering::Greater,
-            _ => a.upid.starttime.cmp(&b.upid.starttime),
-        }
-    });
-
-    if !finish_list.is_empty() {
-        match std::fs::OpenOptions::new().append(true).create(true).open(PROXMOX_BACKUP_ARCHIVE_TASK_FN) {
-            Ok(mut writer) => {
-                for info in &finish_list {
-                    writer.write_all(render_task_line(&info).as_bytes())?;
-                }
-            },
-            Err(err) => bail!("could not write task archive - {}", err),
-        }
-
-        nix::unistd::chown(PROXMOX_BACKUP_ARCHIVE_TASK_FN, Some(backup_user.uid), Some(backup_user.gid))?;
-    }
-
-    // TODO Remove with 1.x
-    // for compatibility, if we had an INDEX file, we do not need it anymore
-    if had_index_file {
-        let _ = nix::unistd::unlink(PROXMOX_BACKUP_INDEX_TASK_FN);
-    }
-
-    drop(lock);
-
-    Ok(())
-}
-
 fn render_task_line(info: &TaskListInfo) -> String {
     let mut raw = String::new();
     if let Some(status) = &info.state {
@@ -486,27 +559,30 @@ pub struct TaskListInfoIterator {
     list: VecDeque<TaskListInfo>,
     end: bool,
     archive: Option<LogRotateFiles>,
-    lock: Option<BackupLockGuard>,
+    lock: Option<TaskListLockGuard>,
 }
 
 impl TaskListInfoIterator {
     pub fn new(active_only: bool) -> Result<Self, Error> {
+
+        let setup = worker_task_setup()?;
+
         let (read_lock, active_list) = {
-            let lock = lock_task_list_files(false)?;
-            let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
+            let lock = setup.lock_task_list_files(false)?;
+            let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
 
             let needs_update = active_list
                 .iter()
                 .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));
 
             // TODO remove with 1.x
-            let index_exists = std::path::Path::new(PROXMOX_BACKUP_INDEX_TASK_FN).is_file();
+            let index_exists = setup.task_index_fn.is_file();
 
             if needs_update || index_exists {
                 drop(lock);
-                update_active_workers(None)?;
-                let lock = lock_task_list_files(false)?;
-                let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
+                setup.update_active_workers(None)?;
+                let lock = setup.lock_task_list_files(false)?;
+                let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
                 (lock, active_list)
             } else {
                 (lock, active_list)
@@ -516,7 +592,7 @@ impl TaskListInfoIterator {
         let archive = if active_only {
             None
         } else {
-            let logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, true)
+            let logrotate = LogRotate::new(&setup.task_archive_fn, true)
                 .ok_or_else(|| format_err!("could not get archive file names"))?;
             Some(logrotate.files())
         };
@@ -568,6 +644,7 @@ impl Iterator for TaskListInfoIterator {
 /// persistently to files. Task should poll the `abort_requested`
 /// flag, and stop execution when requested.
 pub struct WorkerTask {
+    setup: &'static WorkerTaskSetup,
     upid: UPID,
     data: Mutex<WorkerTaskData>,
     abort_requested: AtomicBool,
@@ -589,17 +666,26 @@ struct WorkerTaskData {
 
 impl WorkerTask {
 
-    pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: String, to_stdout: bool) -> Result<Arc<Self>, Error> {
+    pub fn new(
+        worker_type: &str,
+        worker_id: Option<String>,
+        auth_id: String,
+        to_stdout: bool,
+    ) -> Result<Arc<Self>, Error> {
+
+        let setup = worker_task_setup()?;
+
         let upid = UPID::new(worker_type, worker_id, auth_id)?;
         let task_id = upid.task_id;
 
-        let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR);
+        let mut path = setup.taskdir.clone();
 
         path.push(format!("{:02X}", upid.pstart & 255));
 
-        let backup_user = pbs_config::backup_user()?;
+        let dir_opts = setup.file_opts.clone()
+            .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
 
-        create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?;
+        create_path(&path, None, Some(dir_opts))?;
 
         path.push(upid.to_string());
 
@@ -608,12 +694,13 @@ impl WorkerTask {
             exclusive: true,
             prefix_time: true,
             read: true,
+            file_opts: setup.file_opts.clone(),
             ..Default::default()
         };
         let logger = FileLogger::new(&path, logger_options)?;
-        nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?;
 
         let worker = Arc::new(Self {
+            setup,
             upid: upid.clone(),
             abort_requested: AtomicBool::new(false),
             data: Mutex::new(WorkerTaskData {
@@ -631,7 +718,7 @@ impl WorkerTask {
             proxmox_rest_server::set_worker_count(hash.len());
         }
 
-        update_active_workers(Some(&upid))?;
+        setup.update_active_workers(Some(&upid))?;
 
         Ok(worker)
     }
@@ -714,7 +801,7 @@ impl WorkerTask {
         self.log(state.result_text());
 
         WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
-        let _ = update_active_workers(None);
+        let _ = self.setup.update_active_workers(None);
         proxmox_rest_server::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
     }
 
-- 
2.30.2





^ permalink raw reply	[flat|nested] 9+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 3/6] use UPID and systemd helpers from proxmox 0.13.4
  2021-09-23 10:13 [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid Dietmar Maurer
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 2/6] worker task: allow to configure path and owner/group Dietmar Maurer
@ 2021-09-23 10:13 ` Dietmar Maurer
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 4/6] move worker_task.rs into proxmox-rest-server crate Dietmar Maurer
                   ` (3 subsequent siblings)
  5 siblings, 0 replies; 9+ messages in thread
From: Dietmar Maurer @ 2021-09-23 10:13 UTC (permalink / raw)
  To: pbs-devel

---
 Cargo.toml                                    |   2 +-
 pbs-api-types/Cargo.toml                      |   2 +-
 pbs-api-types/src/lib.rs                      |  59 ++++-
 pbs-api-types/src/upid.rs                     | 204 ------------------
 pbs-client/Cargo.toml                         |   2 +-
 pbs-config/Cargo.toml                         |   2 +-
 pbs-datastore/Cargo.toml                      |   2 +-
 pbs-fuse-loop/Cargo.toml                      |   2 +-
 pbs-tape/Cargo.toml                           |   2 +-
 pbs-tools/Cargo.toml                          |   2 +-
 proxmox-backup-client/Cargo.toml              |   3 +-
 proxmox-backup-client/src/mount.rs            |   8 +-
 proxmox-file-restore/Cargo.toml               |   3 +-
 proxmox-file-restore/src/block_driver_qemu.rs |   8 +-
 proxmox-rest-server/Cargo.toml                |   2 +-
 proxmox-restore-daemon/Cargo.toml             |   2 +-
 proxmox-systemd/Cargo.toml                    |   2 +-
 proxmox-systemd/src/unit.rs                   |  85 +-------
 pxar-bin/Cargo.toml                           |   2 +-
 src/api2/node/disks/directory.rs              |   4 +-
 src/api2/node/disks/zfs.rs                    |   2 +-
 src/server/worker_task.rs                     |   2 +-
 src/tape/drive/mod.rs                         |   2 +-
 23 files changed, 86 insertions(+), 318 deletions(-)
 delete mode 100644 pbs-api-types/src/upid.rs

diff --git a/Cargo.toml b/Cargo.toml
index 99c56f04..88e8fbd1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -94,7 +94,7 @@ zstd = { version = "0.6", features = [ "bindgen" ] }
 pathpatterns = "0.1.2"
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
-proxmox = { version = "0.13.3", features = [ "sortable-macro", "api-macro", "cli", "router", "tfa" ] }
+proxmox = { version = "0.13.4", features = [ "sortable-macro", "api-macro", "cli", "router", "tfa" ] }
 proxmox-acme-rs = "0.2.1"
 proxmox-apt = "0.7.0"
 proxmox-http = { version = "0.4.0", features = [ "client", "http-helpers", "websocket" ] }
diff --git a/pbs-api-types/Cargo.toml b/pbs-api-types/Cargo.toml
index a64d7f0a..878d6417 100644
--- a/pbs-api-types/Cargo.toml
+++ b/pbs-api-types/Cargo.toml
@@ -14,7 +14,7 @@ openssl = "0.10"
 regex = "1.2"
 serde = { version = "1.0", features = ["derive"] }
 
-proxmox = { version = "0.13.3", default-features = false, features = [ "api-macro" ] }
+proxmox = { version = "0.13.4", default-features = false, features = [ "api-macro" ] }
 
 proxmox-systemd = { path = "../proxmox-systemd" }
 pbs-tools = { path = "../pbs-tools" }
diff --git a/pbs-api-types/src/lib.rs b/pbs-api-types/src/lib.rs
index 6b0246f5..f7521b02 100644
--- a/pbs-api-types/src/lib.rs
+++ b/pbs-api-types/src/lib.rs
@@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
 use anyhow::bail;
 
 use proxmox::api::api;
-use proxmox::api::schema::{ApiStringFormat, ArraySchema, Schema, StringSchema};
+use proxmox::api::schema::{ApiStringFormat, ApiType, ArraySchema, Schema, StringSchema, ReturnType};
 use proxmox::const_regex;
 use proxmox::{IPRE, IPRE_BRACKET, IPV4OCTET, IPV4RE, IPV6H16, IPV6LS32, IPV6RE};
 
@@ -60,8 +60,7 @@ pub use userid::{PROXMOX_GROUP_ID_SCHEMA, PROXMOX_TOKEN_ID_SCHEMA, PROXMOX_TOKEN
 mod user;
 pub use user::*;
 
-pub mod upid;
-pub use upid::*;
+pub use proxmox::api::upid::*;
 
 mod crypto;
 pub use crypto::{CryptMode, Fingerprint};
@@ -397,3 +396,57 @@ pub enum NodePowerCommand {
     /// Shutdown the server
     Shutdown,
 }
+
+
+#[api()]
+#[derive(Eq, PartialEq, Debug, Serialize, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum TaskStateType {
+    /// Ok
+    OK,
+    /// Warning
+    Warning,
+    /// Error
+    Error,
+    /// Unknown
+    Unknown,
+}
+
+#[api(
+    properties: {
+        upid: { schema: UPID::API_SCHEMA },
+    },
+)]
+#[derive(Serialize, Deserialize)]
+/// Task properties.
+pub struct TaskListItem {
+    pub upid: String,
+    /// The node name where the task is running on.
+    pub node: String,
+    /// The Unix PID
+    pub pid: i64,
+    /// The task start time (Epoch)
+    pub pstart: u64,
+    /// The task start time (Epoch)
+    pub starttime: i64,
+    /// Worker type (arbitrary ASCII string)
+    pub worker_type: String,
+    /// Worker ID (arbitrary ASCII string)
+    pub worker_id: Option<String>,
+    /// The authenticated entity who started the task
+    pub user: String,
+    /// The task end time (Epoch)
+    #[serde(skip_serializing_if="Option::is_none")]
+    pub endtime: Option<i64>,
+    /// Task end status
+    #[serde(skip_serializing_if="Option::is_none")]
+    pub status: Option<String>,
+}
+
+pub const NODE_TASKS_LIST_TASKS_RETURN_TYPE: ReturnType = ReturnType {
+    optional: false,
+    schema: &ArraySchema::new(
+        "A list of tasks.",
+        &TaskListItem::API_SCHEMA,
+    ).schema(),
+};
diff --git a/pbs-api-types/src/upid.rs b/pbs-api-types/src/upid.rs
deleted file mode 100644
index 29135bca..00000000
--- a/pbs-api-types/src/upid.rs
+++ /dev/null
@@ -1,204 +0,0 @@
-use std::sync::atomic::{AtomicUsize, Ordering};
-
-use anyhow::{bail, Error};
-use serde::{Deserialize, Serialize};
-
-use proxmox::api::api;
-use proxmox::api::schema::{ApiStringFormat, ApiType, Schema, StringSchema, ArraySchema, ReturnType};
-use proxmox::const_regex;
-use proxmox::sys::linux::procfs;
-
-/// Unique Process/Task Identifier
-///
-/// We use this to uniquely identify worker task. UPIDs have a short
-/// string repesentaion, which gives additional information about the
-/// type of the task. for example:
-/// ```text
-/// UPID:{node}:{pid}:{pstart}:{task_id}:{starttime}:{worker_type}:{worker_id}:{userid}:
-/// UPID:elsa:00004F37:0039E469:00000000:5CA78B83:garbage_collection::root@pam:
-/// ```
-/// Please note that we use tokio, so a single thread can run multiple
-/// tasks.
-// #[api] - manually implemented API type
-#[derive(Debug, Clone)]
-pub struct UPID {
-    /// The Unix PID
-    pub pid: libc::pid_t,
-    /// The Unix process start time from `/proc/pid/stat`
-    pub pstart: u64,
-    /// The task start time (Epoch)
-    pub starttime: i64,
-    /// The task ID (inside the process/thread)
-    pub task_id: usize,
-    /// Worker type (arbitrary ASCII string)
-    pub worker_type: String,
-    /// Worker ID (arbitrary ASCII string)
-    pub worker_id: Option<String>,
-    /// The authenticated entity who started the task
-    pub auth_id: String,
-    /// The node name.
-    pub node: String,
-}
-
-proxmox::forward_serialize_to_display!(UPID);
-proxmox::forward_deserialize_to_from_str!(UPID);
-
-const_regex! {
-    pub PROXMOX_UPID_REGEX = concat!(
-        r"^UPID:(?P<node>[a-zA-Z0-9]([a-zA-Z0-9\-]*[a-zA-Z0-9])?):(?P<pid>[0-9A-Fa-f]{8}):",
-        r"(?P<pstart>[0-9A-Fa-f]{8,9}):(?P<task_id>[0-9A-Fa-f]{8,16}):(?P<starttime>[0-9A-Fa-f]{8}):",
-        r"(?P<wtype>[^:\s]+):(?P<wid>[^:\s]*):(?P<authid>[^:\s]+):$"
-    );
-}
-
-pub const PROXMOX_UPID_FORMAT: ApiStringFormat =
-    ApiStringFormat::Pattern(&PROXMOX_UPID_REGEX);
-
-pub const UPID_SCHEMA: Schema = StringSchema::new("Unique Process/Task Identifier")
-    .min_length("UPID:N:12345678:12345678:12345678:::".len())
-    .max_length(128) // arbitrary
-    .format(&PROXMOX_UPID_FORMAT)
-    .schema();
-
-impl ApiType for UPID {
-    const API_SCHEMA: Schema = UPID_SCHEMA;
-}
-
-impl UPID {
-    /// Create a new UPID
-    pub fn new(
-        worker_type: &str,
-        worker_id: Option<String>,
-        auth_id: String,
-    ) -> Result<Self, Error> {
-
-        let pid = unsafe { libc::getpid() };
-
-        let bad: &[_] = &['/', ':', ' '];
-
-        if worker_type.contains(bad) {
-            bail!("illegal characters in worker type '{}'", worker_type);
-        }
-
-        if auth_id.contains(bad) {
-            bail!("illegal characters in auth_id '{}'", auth_id);
-        }
-
-        static WORKER_TASK_NEXT_ID: AtomicUsize = AtomicUsize::new(0);
-
-        let task_id = WORKER_TASK_NEXT_ID.fetch_add(1, Ordering::SeqCst);
-
-        Ok(UPID {
-            pid,
-            pstart: procfs::PidStat::read_from_pid(nix::unistd::Pid::from_raw(pid))?.starttime,
-            starttime: proxmox::tools::time::epoch_i64(),
-            task_id,
-            worker_type: worker_type.to_owned(),
-            worker_id,
-            auth_id,
-            node: proxmox::tools::nodename().to_owned(),
-        })
-    }
-}
-
-
-impl std::str::FromStr for UPID {
-    type Err = Error;
-
-    fn from_str(s: &str) -> Result<Self, Self::Err> {
-        if let Some(cap) = PROXMOX_UPID_REGEX.captures(s) {
-
-            let worker_id = if cap["wid"].is_empty() {
-                None
-            } else {
-                let wid = proxmox_systemd::unescape_unit(&cap["wid"])?;
-                Some(wid)
-            };
-
-            Ok(UPID {
-                pid: i32::from_str_radix(&cap["pid"], 16).unwrap(),
-                pstart: u64::from_str_radix(&cap["pstart"], 16).unwrap(),
-                starttime: i64::from_str_radix(&cap["starttime"], 16).unwrap(),
-                task_id: usize::from_str_radix(&cap["task_id"], 16).unwrap(),
-                worker_type: cap["wtype"].to_string(),
-                worker_id,
-                auth_id: cap["authid"].parse()?,
-                node: cap["node"].to_string(),
-            })
-        } else {
-            bail!("unable to parse UPID '{}'", s);
-        }
-
-    }
-}
-
-impl std::fmt::Display for UPID {
-
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-
-        let wid = if let Some(ref id) = self.worker_id {
-            proxmox_systemd::escape_unit(id, false)
-        } else {
-            String::new()
-        };
-
-        // Note: pstart can be > 32bit if uptime > 497 days, so this can result in
-        // more that 8 characters for pstart
-
-        write!(f, "UPID:{}:{:08X}:{:08X}:{:08X}:{:08X}:{}:{}:{}:",
-               self.node, self.pid, self.pstart, self.task_id, self.starttime, self.worker_type, wid, self.auth_id)
-    }
-}
-
-#[api()]
-#[derive(Eq, PartialEq, Debug, Serialize, Deserialize)]
-#[serde(rename_all = "lowercase")]
-pub enum TaskStateType {
-    /// Ok
-    OK,
-    /// Warning
-    Warning,
-    /// Error
-    Error,
-    /// Unknown
-    Unknown,
-}
-
-#[api(
-    properties: {
-        upid: { schema: UPID::API_SCHEMA },
-    },
-)]
-#[derive(Serialize, Deserialize)]
-/// Task properties.
-pub struct TaskListItem {
-    pub upid: String,
-    /// The node name where the task is running on.
-    pub node: String,
-    /// The Unix PID
-    pub pid: i64,
-    /// The task start time (Epoch)
-    pub pstart: u64,
-    /// The task start time (Epoch)
-    pub starttime: i64,
-    /// Worker type (arbitrary ASCII string)
-    pub worker_type: String,
-    /// Worker ID (arbitrary ASCII string)
-    pub worker_id: Option<String>,
-    /// The authenticated entity who started the task
-    pub user: String,
-    /// The task end time (Epoch)
-    #[serde(skip_serializing_if="Option::is_none")]
-    pub endtime: Option<i64>,
-    /// Task end status
-    #[serde(skip_serializing_if="Option::is_none")]
-    pub status: Option<String>,
-}
-
-pub const NODE_TASKS_LIST_TASKS_RETURN_TYPE: ReturnType = ReturnType {
-    optional: false,
-    schema: &ArraySchema::new(
-        "A list of tasks.",
-        &TaskListItem::API_SCHEMA,
-    ).schema(),
-};
diff --git a/pbs-client/Cargo.toml b/pbs-client/Cargo.toml
index 965282bc..dc5d8928 100644
--- a/pbs-client/Cargo.toml
+++ b/pbs-client/Cargo.toml
@@ -28,7 +28,7 @@ tower-service = "0.3.0"
 xdg = "2.2"
 
 pathpatterns = "0.1.2"
-proxmox = { version = "0.13.3", default-features = false, features = [ "cli" ] }
+proxmox = { version = "0.13.4", default-features = false, features = [ "cli" ] }
 proxmox-fuse = "0.1.1"
 proxmox-http = { version = "0.4.0", features = [ "client", "http-helpers", "websocket" ] }
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
diff --git a/pbs-config/Cargo.toml b/pbs-config/Cargo.toml
index 48830045..3249a8e9 100644
--- a/pbs-config/Cargo.toml
+++ b/pbs-config/Cargo.toml
@@ -16,7 +16,7 @@ nix = "0.19.1"
 regex = "1.2"
 once_cell = "1.3.1"
 
-proxmox = { version = "0.13.3", default-features = false, features = [ "cli" ] }
+proxmox = { version = "0.13.4", default-features = false, features = [ "cli" ] }
 
 pbs-api-types = { path = "../pbs-api-types" }
 pbs-buildcfg = { path = "../pbs-buildcfg" }
diff --git a/pbs-datastore/Cargo.toml b/pbs-datastore/Cargo.toml
index 5b3c7fab..578b8689 100644
--- a/pbs-datastore/Cargo.toml
+++ b/pbs-datastore/Cargo.toml
@@ -23,7 +23,7 @@ zstd = { version = "0.6", features = [ "bindgen" ] }
 pathpatterns = "0.1.2"
 pxar = "0.10.1"
 
-proxmox = { version = "0.13.3", default-features = false, features = [ "api-macro" ] }
+proxmox = { version = "0.13.4", default-features = false, features = [ "api-macro" ] }
 
 pbs-api-types = { path = "../pbs-api-types" }
 pbs-tools = { path = "../pbs-tools" }
diff --git a/pbs-fuse-loop/Cargo.toml b/pbs-fuse-loop/Cargo.toml
index c3220be7..aa61d006 100644
--- a/pbs-fuse-loop/Cargo.toml
+++ b/pbs-fuse-loop/Cargo.toml
@@ -14,7 +14,7 @@ nix = "0.19.1"
 regex = "1.2"
 tokio = { version = "1.6", features = [] }
 
-proxmox = "0.13.3"
+proxmox = "0.13.4"
 proxmox-fuse = "0.1.1"
 
 pbs-tools = { path = "../pbs-tools" }
diff --git a/pbs-tape/Cargo.toml b/pbs-tape/Cargo.toml
index 4ffae21e..3dbeb17c 100644
--- a/pbs-tape/Cargo.toml
+++ b/pbs-tape/Cargo.toml
@@ -18,7 +18,7 @@ bitflags = "1.2.1"
 regex = "1.2"
 udev = ">= 0.3, <0.5"
 
-proxmox = { version = "0.13.3", default-features = false, features = [] }
+proxmox = { version = "0.13.4", default-features = false, features = [] }
 
 pbs-api-types = { path = "../pbs-api-types" }
 pbs-tools = { path = "../pbs-tools" }
diff --git a/pbs-tools/Cargo.toml b/pbs-tools/Cargo.toml
index 88f6f54c..f20a315e 100644
--- a/pbs-tools/Cargo.toml
+++ b/pbs-tools/Cargo.toml
@@ -30,7 +30,7 @@ url = "2.1"
 walkdir = "2"
 zstd = { version = "0.6", features = [ "bindgen" ] }
 
-proxmox = { version = "0.13.3", default-features = false, features = [ "tokio" ] }
+proxmox = { version = "0.13.4", default-features = false, features = [ "tokio" ] }
 
 pbs-buildcfg = { path = "../pbs-buildcfg" }
 pbs-runtime = { path = "../pbs-runtime" }
diff --git a/proxmox-backup-client/Cargo.toml b/proxmox-backup-client/Cargo.toml
index d3c35534..42a4d09f 100644
--- a/proxmox-backup-client/Cargo.toml
+++ b/proxmox-backup-client/Cargo.toml
@@ -22,7 +22,7 @@ zstd = { version = "0.6", features = [ "bindgen" ] }
 pathpatterns = "0.1.2"
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
-proxmox = { version = "0.13.3", features = [ "sortable-macro", "api-macro", "cli", "router" ] }
+proxmox = { version = "0.13.4", features = [ "sortable-macro", "api-macro", "cli", "router" ] }
 
 pbs-api-types = { path = "../pbs-api-types" }
 pbs-buildcfg = { path = "../pbs-buildcfg" }
@@ -31,5 +31,4 @@ pbs-client = { path = "../pbs-client" }
 pbs-datastore = { path = "../pbs-datastore" }
 pbs-fuse-loop = { path = "../pbs-fuse-loop" }
 pbs-runtime = { path = "../pbs-runtime" }
-proxmox-systemd = { path = "../proxmox-systemd" }
 pbs-tools = { path = "../pbs-tools" }
diff --git a/proxmox-backup-client/src/mount.rs b/proxmox-backup-client/src/mount.rs
index 9ac1d9c2..7c977864 100644
--- a/proxmox-backup-client/src/mount.rs
+++ b/proxmox-backup-client/src/mount.rs
@@ -118,7 +118,7 @@ fn complete_mapping_names<S: BuildHasher>(_arg: &str, _param: &HashMap<String, S
     match pbs_fuse_loop::find_all_mappings() {
         Ok(mappings) => mappings
             .filter_map(|(name, _)| {
-                proxmox_systemd::unescape_unit(&name).ok()
+                proxmox::tools::systemd::unescape_unit(&name).ok()
             }).collect(),
         Err(_) => Vec::new()
     }
@@ -279,7 +279,7 @@ async fn mount_do(param: Value, pipe: Option<Fd>) -> Result<Value, Error> {
         let reader = CachedChunkReader::new(chunk_reader, index, 8).seekable();
 
         let name = &format!("{}:{}/{}", repo.to_string(), path, archive_name);
-        let name_escaped = proxmox_systemd::escape_unit(name, false);
+        let name_escaped = proxmox::tools::systemd::escape_unit(name, false);
 
         let mut session = pbs_fuse_loop::FuseLoopSession::map_loop(size, reader, &name_escaped, options).await?;
         let loopdev = session.loopdev_path.clone();
@@ -341,7 +341,7 @@ fn unmap(
             pbs_fuse_loop::cleanup_unused_run_files(None);
             let mut any = false;
             for (backing, loopdev) in pbs_fuse_loop::find_all_mappings()? {
-                let name = proxmox_systemd::unescape_unit(&backing)?;
+                let name = proxmox::tools::systemd::unescape_unit(&backing)?;
                 println!("{}:\t{}", loopdev.unwrap_or_else(|| "(unmapped)".to_string()), name);
                 any = true;
             }
@@ -360,7 +360,7 @@ fn unmap(
     if name.starts_with("/dev/loop") {
         pbs_fuse_loop::unmap_loopdev(name)?;
     } else {
-        let name = proxmox_systemd::escape_unit(&name, false);
+        let name = proxmox::tools::systemd::escape_unit(&name, false);
         pbs_fuse_loop::unmap_name(name)?;
     }
 
diff --git a/proxmox-file-restore/Cargo.toml b/proxmox-file-restore/Cargo.toml
index 1e13fb46..899fc984 100644
--- a/proxmox-file-restore/Cargo.toml
+++ b/proxmox-file-restore/Cargo.toml
@@ -16,7 +16,7 @@ tokio = { version = "1.6", features = [ "io-std", "rt", "rt-multi-thread", "time
 
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
-proxmox = { version = "0.13.3", features = [ "api-macro", "cli" ] }
+proxmox = { version = "0.13.4", features = [ "api-macro", "cli" ] }
 
 pbs-api-types = { path = "../pbs-api-types" }
 pbs-buildcfg = { path = "../pbs-buildcfg" }
@@ -24,5 +24,4 @@ pbs-config = { path = "../pbs-config" }
 pbs-client = { path = "../pbs-client" }
 pbs-datastore = { path = "../pbs-datastore" }
 pbs-runtime = { path = "../pbs-runtime" }
-proxmox-systemd = { path = "../proxmox-systemd" }
 pbs-tools = { path = "../pbs-tools" }
diff --git a/proxmox-file-restore/src/block_driver_qemu.rs b/proxmox-file-restore/src/block_driver_qemu.rs
index 2f73e669..b6eaf83a 100644
--- a/proxmox-file-restore/src/block_driver_qemu.rs
+++ b/proxmox-file-restore/src/block_driver_qemu.rs
@@ -80,7 +80,7 @@ impl VMStateMap {
 
 fn make_name(repo: &BackupRepository, snap: &BackupDir) -> String {
     let full = format!("qemu_{}/{}", repo, snap);
-    proxmox_systemd::escape_unit(&full, false)
+    proxmox::tools::systemd::escape_unit(&full, false)
 }
 
 /// remove non-responsive VMs from given map, returns 'true' if map was modified
@@ -257,7 +257,7 @@ impl BlockRestoreDriver for QemuBlockDriver {
                 let resp = client
                     .get("api2/json/status", Some(json!({"keep-timeout": true})))
                     .await;
-                let name = proxmox_systemd::unescape_unit(n)
+                let name = proxmox::tools::systemd::unescape_unit(n)
                     .unwrap_or_else(|_| "<invalid name>".to_owned());
                 let mut extra = json!({"pid": s.pid, "cid": s.cid});
 
@@ -295,7 +295,7 @@ impl BlockRestoreDriver for QemuBlockDriver {
 
     fn stop(&self, id: String) -> Async<Result<(), Error>> {
         async move {
-            let name = proxmox_systemd::escape_unit(&id, false);
+            let name = proxmox::tools::systemd::escape_unit(&id, false);
             let mut map = VMStateMap::load()?;
             let map_mod = cleanup_map(&mut map.map).await;
             match map.map.get(&name) {
@@ -325,7 +325,7 @@ impl BlockRestoreDriver for QemuBlockDriver {
         match VMStateMap::load_read_only() {
             Ok(state) => state
                 .iter()
-                .filter_map(|(name, _)| proxmox_systemd::unescape_unit(&name).ok())
+                .filter_map(|(name, _)| proxmox::tools::systemd::unescape_unit(&name).ok())
                 .collect(),
             Err(_) => Vec::new(),
         }
diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
index 2f740e67..b02c20db 100644
--- a/proxmox-rest-server/Cargo.toml
+++ b/proxmox-rest-server/Cargo.toml
@@ -24,7 +24,7 @@ tokio-openssl = "0.6.1"
 tower-service = "0.3.0"
 url = "2.1"
 
-proxmox = { version = "0.13.3", features = [ "router"] }
+proxmox = { version = "0.13.4", features = [ "router"] }
 
 # fixme: remove this dependency (pbs_tools::broadcast_future)
 pbs-tools = { path = "../pbs-tools" }
diff --git a/proxmox-restore-daemon/Cargo.toml b/proxmox-restore-daemon/Cargo.toml
index c525dc99..871af5f9 100644
--- a/proxmox-restore-daemon/Cargo.toml
+++ b/proxmox-restore-daemon/Cargo.toml
@@ -26,7 +26,7 @@ tokio-util = { version = "0.6", features = [ "codec", "io" ] }
 pathpatterns = "0.1.2"
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
-proxmox = { version = "0.13.3", features = [ "router", "sortable-macro" ] }
+proxmox = { version = "0.13.4", features = [ "router", "sortable-macro" ] }
 
 pbs-api-types = { path = "../pbs-api-types" }
 pbs-runtime = { path = "../pbs-runtime" }
diff --git a/proxmox-systemd/Cargo.toml b/proxmox-systemd/Cargo.toml
index c6caa7ec..017a281c 100644
--- a/proxmox-systemd/Cargo.toml
+++ b/proxmox-systemd/Cargo.toml
@@ -11,6 +11,6 @@ bitflags = "1.2.1"
 lazy_static = "1.4"
 nom = "5.1"
 
-proxmox = { version = "0.13.3", default-features = false }
+proxmox = { version = "0.13.4", default-features = false }
 
 #pbs-tools = { path = "../pbs-tools" }
diff --git a/proxmox-systemd/src/unit.rs b/proxmox-systemd/src/unit.rs
index af3db1a6..15be61fd 100644
--- a/proxmox-systemd/src/unit.rs
+++ b/proxmox-systemd/src/unit.rs
@@ -34,88 +34,6 @@ fn run_command(mut command: Command) -> Result<(), Error> {
     Ok(())
 }
 
-/// Escape strings for usage in systemd unit names
-pub fn escape_unit(mut unit: &str, is_path: bool) -> String {
-    if is_path {
-        unit = unit.trim_matches('/');
-        if unit.is_empty() {
-            return String::from("-");
-        }
-    }
-
-    let unit = unit.as_bytes();
-
-    let mut escaped = String::new();
-
-    for (i, c) in unit.iter().enumerate() {
-        if *c == b'/' {
-            escaped.push('-');
-            continue;
-        }
-        if (i == 0 && *c == b'.')
-            || !(*c == b'_'
-                || *c == b'.'
-                || (*c >= b'0' && *c <= b'9')
-                || (*c >= b'A' && *c <= b'Z')
-                || (*c >= b'a' && *c <= b'z'))
-        {
-            escaped.push_str(&format!("\\x{:0x}", c));
-        } else {
-            escaped.push(*c as char);
-        }
-    }
-    escaped
-}
-
-fn parse_hex_digit(d: u8) -> Result<u8, Error> {
-    if d >= b'0' && d <= b'9' {
-        return Ok(d - b'0');
-    }
-    if d >= b'A' && d <= b'F' {
-        return Ok(d - b'A' + 10);
-    }
-    if d >= b'a' && d <= b'f' {
-        return Ok(d - b'a' + 10);
-    }
-    bail!("got invalid hex digit");
-}
-
-/// Unescape strings used in systemd unit names
-pub fn unescape_unit(text: &str) -> Result<String, Error> {
-    let mut i = text.as_bytes();
-
-    let mut data: Vec<u8> = Vec::new();
-
-    loop {
-        if i.is_empty() {
-            break;
-        }
-        let next = i[0];
-        if next == b'\\' {
-            if i.len() < 4 {
-                bail!("short input");
-            }
-            if i[1] != b'x' {
-                bail!("unkwnown escape sequence");
-            }
-            let h1 = parse_hex_digit(i[2])?;
-            let h0 = parse_hex_digit(i[3])?;
-            data.push(h1 << 4 | h0);
-            i = &i[4..]
-        } else if next == b'-' {
-            data.push(b'/');
-            i = &i[1..]
-        } else {
-            data.push(next);
-            i = &i[1..]
-        }
-    }
-
-    let text = String::from_utf8(data)?;
-
-    Ok(text)
-}
-
 pub fn reload_daemon() -> Result<(), Error> {
     let mut command = std::process::Command::new("systemctl");
     command.arg("daemon-reload");
@@ -178,6 +96,9 @@ pub fn reload_unit(unit: &str) -> Result<(), Error> {
 #[test]
 fn test_escape_unit() -> Result<(), Error> {
     fn test_escape(i: &str, expected: &str, is_path: bool) {
+
+        use proxmox::tools::systemd::{escape_unit, unescape_unit};
+
         let escaped = escape_unit(i, is_path);
         assert_eq!(escaped, expected);
         let unescaped = unescape_unit(&escaped).unwrap();
diff --git a/pxar-bin/Cargo.toml b/pxar-bin/Cargo.toml
index e322e654..6121f7bc 100644
--- a/pxar-bin/Cargo.toml
+++ b/pxar-bin/Cargo.toml
@@ -16,7 +16,7 @@ serde_json = "1.0"
 tokio = { version = "1.6", features = [ "rt", "rt-multi-thread" ] }
 
 pathpatterns = "0.1.2"
-proxmox = { version = "0.13.3", default-features = false, features = [] }
+proxmox = { version = "0.13.4", default-features = false, features = [] }
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
 pbs-client = { path = "../pbs-client" }
diff --git a/src/api2/node/disks/directory.rs b/src/api2/node/disks/directory.rs
index 2f4a738d..49127586 100644
--- a/src/api2/node/disks/directory.rs
+++ b/src/api2/node/disks/directory.rs
@@ -242,7 +242,7 @@ pub fn delete_datastore_disk(name: String) -> Result<(), Error> {
     }
 
     // disable systemd mount-unit
-    let mut mount_unit_name = proxmox_systemd::escape_unit(&path, true);
+    let mut mount_unit_name = proxmox::tools::systemd::escape_unit(&path, true);
     mount_unit_name.push_str(".mount");
     proxmox_systemd::disable_unit(&mount_unit_name)?;
 
@@ -281,7 +281,7 @@ fn create_datastore_mount_unit(
     what: &str,
 ) -> Result<String, Error> {
 
-    let mut mount_unit_name = proxmox_systemd::escape_unit(&mount_point, true);
+    let mut mount_unit_name = proxmox::tools::systemd::escape_unit(&mount_point, true);
     mount_unit_name.push_str(".mount");
 
     let mount_unit_path = format!("/etc/systemd/system/{}", mount_unit_name);
diff --git a/src/api2/node/disks/zfs.rs b/src/api2/node/disks/zfs.rs
index 9fe0dac4..8a6cb708 100644
--- a/src/api2/node/disks/zfs.rs
+++ b/src/api2/node/disks/zfs.rs
@@ -271,7 +271,7 @@ pub fn create_zpool(
             worker.log(output);
 
             if std::path::Path::new("/lib/systemd/system/zfs-import@.service").exists() {
-                let import_unit = format!("zfs-import@{}.service", proxmox_systemd::escape_unit(&name, false));
+                let import_unit = format!("zfs-import@{}.service", proxmox::tools::systemd::escape_unit(&name, false));
                 proxmox_systemd::enable_unit(&import_unit)?;
             }
 
diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
index 191d8a44..94ffbeb0 100644
--- a/src/server/worker_task.rs
+++ b/src/server/worker_task.rs
@@ -18,9 +18,9 @@ use once_cell::sync::OnceCell;
 use proxmox::sys::linux::procfs;
 use proxmox::try_block;
 use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
+use proxmox::api::upid::UPID;
 
 use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
-use pbs_api_types::UPID;
 use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
 
 struct TaskListLockGuard(File);
diff --git a/src/tape/drive/mod.rs b/src/tape/drive/mod.rs
index f477acc7..ef5ffdbf 100644
--- a/src/tape/drive/mod.rs
+++ b/src/tape/drive/mod.rs
@@ -606,7 +606,7 @@ pub struct DeviceLockGuard(std::fs::File);
 // Uses systemd escape_unit to compute a file name from `device_path`, the try
 // to lock `/var/lock/<name>`.
 fn open_device_lock(device_path: &str) -> Result<std::fs::File, Error> {
-    let lock_name = proxmox_systemd::escape_unit(device_path, true);
+    let lock_name = proxmox::tools::systemd::escape_unit(device_path, true);
 
     let mut path = std::path::PathBuf::from(crate::tape::DRIVE_LOCK_DIR);
     path.push(lock_name);
-- 
2.30.2





^ permalink raw reply	[flat|nested] 9+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 4/6] move worker_task.rs into proxmox-rest-server crate
  2021-09-23 10:13 [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid Dietmar Maurer
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 2/6] worker task: allow to configure path and owner/group Dietmar Maurer
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 3/6] use UPID and systemd helpers from proxmox 0.13.4 Dietmar Maurer
@ 2021-09-23 10:13 ` Dietmar Maurer
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 5/6] proxmox-daily-update: setup worker and command socket Dietmar Maurer
                   ` (2 subsequent siblings)
  5 siblings, 0 replies; 9+ messages in thread
From: Dietmar Maurer @ 2021-09-23 10:13 UTC (permalink / raw)
  To: pbs-devel

Also moved pbs-datastore/src/task.rs to pbs-tools, which now depends on 'log'.
---
 pbs-datastore/src/chunk_store.rs              |  5 +-
 pbs-datastore/src/lib.rs                      |  1 -
 pbs-tools/Cargo.toml                          |  1 +
 pbs-tools/src/lib.rs                          |  1 +
 {pbs-datastore => pbs-tools}/src/task.rs      |  0
 proxmox-rest-server/Cargo.toml                |  1 +
 proxmox-rest-server/src/lib.rs                | 40 ++++++++++++++++
 .../src}/worker_task.rs                       | 40 ++++------------
 src/acme/plugin.rs                            |  2 +-
 src/api2/admin/datastore.rs                   |  4 +-
 src/api2/backup/environment.rs                |  4 +-
 src/api2/backup/mod.rs                        |  3 +-
 src/api2/config/acme.rs                       |  2 +-
 src/api2/config/datastore.rs                  |  5 +-
 src/api2/node/apt.rs                          |  2 +-
 src/api2/node/certificates.rs                 |  2 +-
 src/api2/node/disks/directory.rs              |  2 +-
 src/api2/node/disks/mod.rs                    |  2 +-
 src/api2/node/disks/zfs.rs                    |  2 +-
 src/api2/node/mod.rs                          |  2 +-
 src/api2/node/network.rs                      |  2 +-
 src/api2/node/services.rs                     |  2 +-
 src/api2/node/tasks.rs                        | 32 ++++++++++---
 src/api2/pull.rs                              |  3 +-
 src/api2/reader/environment.rs                |  2 +-
 src/api2/reader/mod.rs                        |  2 +-
 src/api2/tape/backup.rs                       |  6 +--
 src/api2/tape/drive.rs                        |  4 +-
 src/api2/tape/restore.rs                      | 10 ++--
 src/backup/datastore.rs                       |  3 +-
 src/backup/verify.rs                          |  4 +-
 src/bin/proxmox-backup-api.rs                 | 17 +++----
 src/bin/proxmox-backup-manager.rs             |  3 +-
 src/bin/proxmox-backup-proxy.rs               | 24 +++++-----
 src/bin/proxmox-daily-update.rs               |  2 +-
 src/bin/proxmox_backup_debug/api.rs           |  4 +-
 src/server/gc_job.rs                          |  2 +-
 src/server/h2service.rs                       |  4 +-
 src/server/jobstate.rs                        |  6 +--
 src/server/mod.rs                             | 48 ++-----------------
 src/server/prune_job.rs                       |  6 +--
 src/server/pull.rs                            |  5 +-
 src/server/verify_job.rs                      |  4 +-
 src/tape/drive/mod.rs                         |  9 ++--
 src/tape/pool_writer/mod.rs                   |  4 +-
 45 files changed, 159 insertions(+), 170 deletions(-)
 rename {pbs-datastore => pbs-tools}/src/task.rs (100%)
 rename {src/server => proxmox-rest-server/src}/worker_task.rs (95%)

diff --git a/pbs-datastore/src/chunk_store.rs b/pbs-datastore/src/chunk_store.rs
index 361cc9a2..5c50e4fc 100644
--- a/pbs-datastore/src/chunk_store.rs
+++ b/pbs-datastore/src/chunk_store.rs
@@ -9,10 +9,9 @@ use proxmox::tools::fs::{CreateOptions, create_path, create_dir};
 
 use pbs_api_types::GarbageCollectionStatus;
 use pbs_tools::process_locker::{self, ProcessLocker};
+use pbs_tools::{task_log, task::TaskState};
 
 use crate::DataBlob;
-use crate::task_log;
-use crate::task::TaskState;
 
 /// File system based chunk store
 pub struct ChunkStore {
@@ -306,7 +305,7 @@ impl ChunkStore {
         for (entry, percentage, bad) in self.get_chunk_iterator()? {
             if last_percentage != percentage {
                 last_percentage = percentage;
-                crate::task_log!(
+                task_log!(
                     worker,
                     "processed {}% ({} chunks)",
                     percentage,
diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs
index cfe39921..5a09666b 100644
--- a/pbs-datastore/src/lib.rs
+++ b/pbs-datastore/src/lib.rs
@@ -179,7 +179,6 @@ pub mod paperkey;
 pub mod prune;
 pub mod read_chunk;
 pub mod store_progress;
-pub mod task;
 
 pub mod dynamic_index;
 pub mod fixed_index;
diff --git a/pbs-tools/Cargo.toml b/pbs-tools/Cargo.toml
index f20a315e..d37ef865 100644
--- a/pbs-tools/Cargo.toml
+++ b/pbs-tools/Cargo.toml
@@ -17,6 +17,7 @@ foreign-types = "0.3"
 futures = "0.3"
 lazy_static = "1.4"
 libc = "0.2"
+log = "0.4"
 nix = "0.19.1"
 nom = "5.1"
 openssl = "0.10"
diff --git a/pbs-tools/src/lib.rs b/pbs-tools/src/lib.rs
index 000591c3..6c2f0ff5 100644
--- a/pbs-tools/src/lib.rs
+++ b/pbs-tools/src/lib.rs
@@ -24,6 +24,7 @@ pub mod str;
 pub mod stream;
 pub mod sync;
 pub mod sys;
+pub mod task;
 pub mod ticket;
 pub mod tokio;
 pub mod xattr;
diff --git a/pbs-datastore/src/task.rs b/pbs-tools/src/task.rs
similarity index 100%
rename from pbs-datastore/src/task.rs
rename to pbs-tools/src/task.rs
diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
index b02c20db..afaf40e1 100644
--- a/proxmox-rest-server/Cargo.toml
+++ b/proxmox-rest-server/Cargo.toml
@@ -15,6 +15,7 @@ lazy_static = "1.4"
 libc = "0.2"
 log = "0.4"
 nix = "0.19.1"
+once_cell = "1.3.1"
 percent-encoding = "2.1"
 regex = "1.2"
 serde = { version = "1.0", features = [] }
diff --git a/proxmox-rest-server/src/lib.rs b/proxmox-rest-server/src/lib.rs
index 2f29f9cd..9acdb3fd 100644
--- a/proxmox-rest-server/src/lib.rs
+++ b/proxmox-rest-server/src/lib.rs
@@ -1,9 +1,12 @@
 use std::os::unix::io::RawFd;
 
 use anyhow::{bail, format_err, Error};
+use nix::unistd::Pid;
 
 use proxmox::tools::fd::Fd;
+use proxmox::sys::linux::procfs::PidStat;
 use proxmox::api::UserInformation;
+use proxmox::tools::fs::CreateOptions;
 
 mod compression;
 pub use compression::*;
@@ -29,6 +32,9 @@ pub use api_config::ApiConfig;
 mod rest;
 pub use rest::{RestServer, handle_api_request};
 
+mod worker_task;
+pub use worker_task::*;
+
 pub enum AuthError {
     Generic(Error),
     NoData,
@@ -48,6 +54,40 @@ pub trait ApiAuth {
     ) -> Result<(String, Box<dyn UserInformation + Sync + Send>), AuthError>;
 }
 
+lazy_static::lazy_static!{
+    static ref PID: i32 = unsafe { libc::getpid() };
+    static ref PSTART: u64 = PidStat::read_from_pid(Pid::from_raw(*PID)).unwrap().starttime;
+}
+
+pub fn pid() -> i32 {
+    *PID
+}
+
+pub fn pstart() -> u64 {
+    *PSTART
+}
+
+pub fn write_pid(pid_fn: &str) -> Result<(), Error> {
+    let pid_str = format!("{}\n", *PID);
+    proxmox::tools::fs::replace_file(pid_fn, pid_str.as_bytes(), CreateOptions::new())
+}
+
+pub fn read_pid(pid_fn: &str) -> Result<i32, Error> {
+    let pid = proxmox::tools::fs::file_get_contents(pid_fn)?;
+    let pid = std::str::from_utf8(&pid)?.trim();
+    pid.parse().map_err(|err| format_err!("could not parse pid - {}", err))
+}
+
+pub fn ctrl_sock_from_pid(pid: i32) -> String {
+    // Note: The control socket always uses @/run/proxmox-backup/ as prefix
+    // for historc reason.
+    format!("\0{}/control-{}.sock", "/run/proxmox-backup", pid)
+}
+
+pub fn our_ctrl_sock() -> String {
+    ctrl_sock_from_pid(*PID)
+}
+
 static mut SHUTDOWN_REQUESTED: bool = false;
 
 pub fn request_shutdown() {
diff --git a/src/server/worker_task.rs b/proxmox-rest-server/src/worker_task.rs
similarity index 95%
rename from src/server/worker_task.rs
rename to proxmox-rest-server/src/worker_task.rs
index 94ffbeb0..b6ed6862 100644
--- a/src/server/worker_task.rs
+++ b/proxmox-rest-server/src/worker_task.rs
@@ -21,7 +21,8 @@ use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file,
 use proxmox::api::upid::UPID;
 
 use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
-use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
+
+use crate::{CommandoSocket, FileLogger, FileLogOptions};
 
 struct TaskListLockGuard(File);
 
@@ -280,7 +281,7 @@ lazy_static! {
 
 /// checks if the task UPID refers to a worker from this process
 fn is_local_worker(upid: &UPID) -> bool {
-    upid.pid == crate::server::pid() && upid.pstart == crate::server::pstart()
+    upid.pid == crate::pid() && upid.pstart == crate::pstart()
 }
 
 /// Test if the task is still running
@@ -293,14 +294,14 @@ pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
         return Ok(false);
     }
 
-    let sock = crate::server::ctrl_sock_from_pid(upid.pid);
+    let sock = crate::ctrl_sock_from_pid(upid.pid);
     let cmd = json!({
         "command": "worker-task-status",
         "args": {
             "upid": upid.to_string(),
         },
     });
-    let status = proxmox_rest_server::send_command(sock, &cmd).await?;
+    let status = crate::send_command(sock, &cmd).await?;
 
     if let Some(active) = status.as_bool() {
         Ok(active)
@@ -366,14 +367,14 @@ pub fn abort_worker_async(upid: UPID) {
 
 pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
 
-    let sock = crate::server::ctrl_sock_from_pid(upid.pid);
+    let sock = crate::ctrl_sock_from_pid(upid.pid);
     let cmd = json!({
         "command": "worker-task-abort",
         "args": {
             "upid": upid.to_string(),
         },
     });
-    proxmox_rest_server::send_command(sock, &cmd).map_ok(|_| ()).await
+    crate::send_command(sock, &cmd).map_ok(|_| ()).await
 }
 
 fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
@@ -476,27 +477,6 @@ pub struct TaskListInfo {
     pub state: Option<TaskState>, // endtime, status
 }
 
-impl Into<pbs_api_types::TaskListItem> for TaskListInfo {
-    fn into(self) -> pbs_api_types::TaskListItem {
-        let (endtime, status) = self
-            .state
-            .map_or_else(|| (None, None), |a| (Some(a.endtime()), Some(a.to_string())));
-
-        pbs_api_types::TaskListItem {
-            upid: self.upid_str,
-            node: "localhost".to_string(),
-            pid: self.upid.pid as i64,
-            pstart: self.upid.pstart,
-            starttime: self.upid.starttime,
-            worker_type: self.upid.worker_type,
-            worker_id: self.upid.worker_id,
-            user: self.upid.auth_id,
-            endtime,
-            status,
-        }
-    }
-}
-
 fn render_task_line(info: &TaskListInfo) -> String {
     let mut raw = String::new();
     if let Some(status) = &info.state {
@@ -715,7 +695,7 @@ impl WorkerTask {
         {
             let mut hash = WORKER_TASK_LIST.lock().unwrap();
             hash.insert(task_id, worker.clone());
-            proxmox_rest_server::set_worker_count(hash.len());
+            crate::set_worker_count(hash.len());
         }
 
         setup.update_active_workers(Some(&upid))?;
@@ -802,7 +782,7 @@ impl WorkerTask {
 
         WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
         let _ = self.setup.update_active_workers(None);
-        proxmox_rest_server::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
+        crate::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
     }
 
     /// Log a message.
@@ -879,7 +859,7 @@ impl WorkerTask {
     }
 }
 
-impl pbs_datastore::task::TaskState for WorkerTask {
+impl pbs_tools::task::TaskState for WorkerTask {
     fn check_abort(&self) -> Result<(), Error> {
         self.fail_on_abort()
     }
diff --git a/src/acme/plugin.rs b/src/acme/plugin.rs
index 7593aaa4..cb7de082 100644
--- a/src/acme/plugin.rs
+++ b/src/acme/plugin.rs
@@ -13,7 +13,7 @@ use proxmox_acme_rs::{Authorization, Challenge};
 
 use crate::acme::AcmeClient;
 use crate::api2::types::AcmeDomain;
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 use crate::config::acme::plugin::{DnsPlugin, PluginData};
 
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index fbb93f35..154a2e84 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -54,7 +54,7 @@ use pbs_tools::blocking::WrappedReaderStream;
 use pbs_tools::stream::{AsyncReaderStream, AsyncChannelWriter};
 use pbs_tools::json::{required_integer_param, required_string_param};
 use pbs_config::CachedUserInfo;
-use proxmox_rest_server::formatter;
+use proxmox_rest_server::{WorkerTask, formatter};
 
 use crate::api2::node::rrd::create_value_from_rrd;
 use crate::backup::{
@@ -62,7 +62,7 @@ use crate::backup::{
     DataStore, LocalChunkReader,
 };
 
-use crate::server::{jobstate::Job, WorkerTask};
+use crate::server::jobstate::Job;
 
 
 const GROUP_NOTES_FILE_NAME: &str = "notes";
diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index 306f91ee..8112737e 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -15,10 +15,10 @@ use pbs_datastore::backup_info::{BackupDir, BackupInfo};
 use pbs_datastore::dynamic_index::DynamicIndexWriter;
 use pbs_datastore::fixed_index::FixedIndexWriter;
 use pbs_api_types::Authid;
-use proxmox_rest_server::formatter::*;
+use proxmox_rest_server::{WorkerTask, formatter::*};
 
 use crate::backup::{verify_backup_dir_with_lock, DataStore};
-use crate::server::WorkerTask;
+
 use hyper::{Body, Response};
 
 #[derive(Copy, Clone, Serialize)]
diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
index c14f19a4..4333be17 100644
--- a/src/api2/backup/mod.rs
+++ b/src/api2/backup/mod.rs
@@ -23,8 +23,9 @@ use pbs_datastore::PROXMOX_BACKUP_PROTOCOL_ID_V1;
 use pbs_datastore::backup_info::{BackupDir, BackupGroup, BackupInfo};
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{archive_type, ArchiveType};
+use proxmox_rest_server::WorkerTask;
 
-use crate::server::{WorkerTask, H2Service};
+use crate::server::H2Service;
 use crate::backup::DataStore;
 use pbs_config::CachedUserInfo;
 
diff --git a/src/api2/config/acme.rs b/src/api2/config/acme.rs
index 564cafae..8593acac 100644
--- a/src/api2/config/acme.rs
+++ b/src/api2/config/acme.rs
@@ -24,7 +24,7 @@ use crate::api2::types::{AcmeAccountName, AcmeChallengeSchema, KnownAcmeDirector
 use crate::config::acme::plugin::{
     self, DnsPlugin, DnsPluginCore, DnsPluginCoreUpdater, PLUGIN_ID_SCHEMA,
 };
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 pub(crate) const ROUTER: Router = Router::new()
     .get(&list_subdirs_api_method!(SUBDIRS))
diff --git a/src/api2/config/datastore.rs b/src/api2/config/datastore.rs
index 0e6529f8..0f9234ca 100644
--- a/src/api2/config/datastore.rs
+++ b/src/api2/config/datastore.rs
@@ -9,7 +9,6 @@ use proxmox::api::section_config::SectionConfigData;
 use proxmox::api::schema::{ApiType, parse_property_string};
 
 use pbs_datastore::chunk_store::ChunkStore;
-use pbs_datastore::task::TaskState;
 use pbs_config::BackupLockGuard;
 use pbs_api_types::{
     Authid, DatastoreNotify,
@@ -17,6 +16,7 @@ use pbs_api_types::{
     PRIV_DATASTORE_ALLOCATE, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_MODIFY,
     DataStoreConfig, DataStoreConfigUpdater,
 };
+use pbs_tools::task::TaskState;
 
 use crate::api2::config::sync::delete_sync_job;
 use crate::api2::config::verify::delete_verification_job;
@@ -26,8 +26,9 @@ use crate::api2::admin::{
     verify::list_verification_jobs,
 };
 use pbs_config::CachedUserInfo;
+use proxmox_rest_server::WorkerTask;
 
-use crate::server::{jobstate, WorkerTask};
+use crate::server::jobstate;
 
 #[api(
     input: {
diff --git a/src/api2/node/apt.rs b/src/api2/node/apt.rs
index f02920c0..4fd81592 100644
--- a/src/api2/node/apt.rs
+++ b/src/api2/node/apt.rs
@@ -19,7 +19,7 @@ use pbs_api_types::{
 };
 
 use crate::config::node;
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 use crate::tools::{
     apt,
     pbs_simple_http,
diff --git a/src/api2/node/certificates.rs b/src/api2/node/certificates.rs
index 7b31861e..80733fe9 100644
--- a/src/api2/node/certificates.rs
+++ b/src/api2/node/certificates.rs
@@ -18,7 +18,7 @@ use pbs_tools::cert;
 use crate::acme::AcmeClient;
 use crate::api2::types::AcmeDomain;
 use crate::config::node::NodeConfig;
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 pub const ROUTER: Router = Router::new()
     .get(&list_subdirs_api_method!(SUBDIRS))
diff --git a/src/api2/node/disks/directory.rs b/src/api2/node/disks/directory.rs
index 49127586..91369643 100644
--- a/src/api2/node/disks/directory.rs
+++ b/src/api2/node/disks/directory.rs
@@ -17,7 +17,7 @@ use crate::tools::disks::{
 };
 use crate::tools::systemd::{self, types::*};
 
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 const BASE_MOUNT_DIR: &str = "/mnt/datastore/";
 
diff --git a/src/api2/node/disks/mod.rs b/src/api2/node/disks/mod.rs
index b4001a54..f08c340b 100644
--- a/src/api2/node/disks/mod.rs
+++ b/src/api2/node/disks/mod.rs
@@ -15,7 +15,7 @@ use crate::tools::disks::{
     DiskUsageInfo, DiskUsageType, DiskManage, SmartData,
     get_disks, get_smart_data, get_disk_usage_info, inititialize_gpt_disk,
 };
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 pub mod directory;
 pub mod zfs;
diff --git a/src/api2/node/disks/zfs.rs b/src/api2/node/disks/zfs.rs
index 8a6cb708..efea9b70 100644
--- a/src/api2/node/disks/zfs.rs
+++ b/src/api2/node/disks/zfs.rs
@@ -19,7 +19,7 @@ use crate::tools::disks::{
     DiskUsageType,
 };
 
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 
 #[api(
diff --git a/src/api2/node/mod.rs b/src/api2/node/mod.rs
index 8e357311..7a5bb64e 100644
--- a/src/api2/node/mod.rs
+++ b/src/api2/node/mod.rs
@@ -24,7 +24,7 @@ use pbs_api_types::{Authid, NODE_SCHEMA, PRIV_SYS_CONSOLE};
 use pbs_tools::auth::private_auth_key;
 use pbs_tools::ticket::{self, Empty, Ticket};
 
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 use crate::tools;
 
 pub mod apt;
diff --git a/src/api2/node/network.rs b/src/api2/node/network.rs
index 0fde9f2a..d496b5f8 100644
--- a/src/api2/node/network.rs
+++ b/src/api2/node/network.rs
@@ -13,7 +13,7 @@ use pbs_api_types::{
 };
 use pbs_config::network::{self, NetworkConfig};
 
-use crate::server::{WorkerTask};
+use proxmox_rest_server::WorkerTask;
 
 fn split_interface_list(list: &str) -> Result<Vec<String>, Error> {
     let value = parse_property_string(&list, &NETWORK_INTERFACE_ARRAY_SCHEMA)?;
diff --git a/src/api2/node/services.rs b/src/api2/node/services.rs
index 6c757f43..8df0fb24 100644
--- a/src/api2/node/services.rs
+++ b/src/api2/node/services.rs
@@ -9,7 +9,7 @@ use proxmox::api::router::SubdirMap;
 
 use pbs_api_types::{Authid, NODE_SCHEMA, SERVICE_ID_SCHEMA, PRIV_SYS_AUDIT, PRIV_SYS_MODIFY};
 
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 static SERVICE_NAME_LIST: [&str; 7] = [
     "proxmox-backup",
diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index df4673a1..0d2b456c 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -16,7 +16,8 @@ use pbs_api_types::{
 };
 
 use crate::api2::pull::check_pull_privs;
-use crate::server::{self, upid_log_path, upid_read_status, TaskState, TaskListInfoIterator};
+
+use proxmox_rest_server::{upid_log_path, upid_read_status, TaskState, TaskListInfoIterator};
 use pbs_config::CachedUserInfo;
 
 // matches respective job execution privileges
@@ -125,6 +126,25 @@ pub fn tasktype(state: &TaskState) -> TaskStateType {
     }
 }
 
+fn into_task_list_item(info: proxmox_rest_server::TaskListInfo) -> pbs_api_types::TaskListItem {
+    let (endtime, status) = info
+        .state
+        .map_or_else(|| (None, None), |a| (Some(a.endtime()), Some(a.to_string())));
+
+    pbs_api_types::TaskListItem {
+        upid: info.upid_str,
+        node: "localhost".to_string(),
+        pid: info.upid.pid as i64,
+        pstart: info.upid.pstart,
+        starttime: info.upid.starttime,
+        worker_type: info.upid.worker_type,
+        worker_id: info.upid.worker_id,
+        user: info.upid.auth_id,
+        endtime,
+        status,
+    }
+}
+
 #[api(
     input: {
         properties: {
@@ -217,7 +237,7 @@ async fn get_task_status(
         result["tokenid"] = Value::from(task_auth_id.tokenname().unwrap().as_str());
     }
 
-    if crate::server::worker_is_active(&upid).await? {
+    if proxmox_rest_server::worker_is_active(&upid).await? {
         result["status"] = Value::from("running");
     } else {
         let exitstatus = upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
@@ -314,7 +334,7 @@ async fn read_task_log(
     rpcenv["total"] = Value::from(count);
 
     if test_status {
-        let active = crate::server::worker_is_active(&upid).await?;
+        let active = proxmox_rest_server::worker_is_active(&upid).await?;
         rpcenv["active"] = Value::from(active);
     }
 
@@ -354,7 +374,7 @@ fn stop_task(
         user_info.check_privs(&auth_id, &["system", "tasks"], PRIV_SYS_MODIFY, false)?;
     }
 
-    server::abort_worker_async(upid);
+    proxmox_rest_server::abort_worker_async(upid);
 
     Ok(Value::Null)
 }
@@ -502,7 +522,7 @@ pub fn list_tasks(
 
         match (&info.state, &statusfilter) {
             (Some(_), _) if running => continue,
-            (Some(crate::server::TaskState::OK { .. }), _) if errors => continue,
+            (Some(TaskState::OK { .. }), _) if errors => continue,
             (Some(state), Some(filters)) => {
                 if !filters.contains(&tasktype(state)) {
                     continue;
@@ -517,7 +537,7 @@ pub fn list_tasks(
             continue;
         }
 
-        result.push(info.into());
+        result.push(into_task_list_item(info));
 
         if result.len() >= limit {
             break;
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index e631920f..0240098d 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -13,8 +13,9 @@ use pbs_api_types::{
     DATASTORE_SCHEMA, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
     PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ,
 };
+use proxmox_rest_server::WorkerTask;
 
-use crate::server::{WorkerTask, jobstate::Job, pull::pull_store};
+use crate::server::{jobstate::Job, pull::pull_store};
 use crate::backup::DataStore;
 use pbs_config::CachedUserInfo;
 
diff --git a/src/api2/reader/environment.rs b/src/api2/reader/environment.rs
index f7d79072..c85ec069 100644
--- a/src/api2/reader/environment.rs
+++ b/src/api2/reader/environment.rs
@@ -10,7 +10,7 @@ use pbs_api_types::Authid;
 use proxmox_rest_server::formatter::*;
 
 use crate::backup::DataStore;
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 //use proxmox::tools;
 
diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
index fada952c..c663e9ae 100644
--- a/src/api2/reader/mod.rs
+++ b/src/api2/reader/mod.rs
@@ -39,12 +39,12 @@ use pbs_datastore::backup_info::BackupDir;
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{archive_type, ArchiveType};
 use pbs_config::CachedUserInfo;
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
     api2::helpers,
     backup::DataStore,
     server::{
-        WorkerTask,
         H2Service,
     },
 };
diff --git a/src/api2/tape/backup.rs b/src/api2/tape/backup.rs
index fadbfa3d..5effa99d 100644
--- a/src/api2/tape/backup.rs
+++ b/src/api2/tape/backup.rs
@@ -20,10 +20,11 @@ use pbs_api_types::{
     UPID_SCHEMA, JOB_ID_SCHEMA, PRIV_DATASTORE_READ, PRIV_TAPE_AUDIT, PRIV_TAPE_WRITE,
 };
 
-use pbs_datastore::{task_log, task_warn, StoreProgress};
+use pbs_datastore::StoreProgress;
 use pbs_datastore::backup_info::{BackupDir, BackupInfo};
-use pbs_datastore::task::TaskState;
+use pbs_tools::{task_log, task_warn, task::TaskState};
 use pbs_config::CachedUserInfo;
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
     server::{
@@ -36,7 +37,6 @@ use crate::{
         },
     },
     backup::{DataStore, SnapshotReader},
-    server::WorkerTask,
     tape::{
         TAPE_STATUS_DIR,
         Inventory,
diff --git a/src/api2/tape/drive.rs b/src/api2/tape/drive.rs
index 10aa6842..8227f659 100644
--- a/src/api2/tape/drive.rs
+++ b/src/api2/tape/drive.rs
@@ -28,7 +28,6 @@ use pbs_api_types::{
     LtoDriveAndMediaStatus, Lp17VolumeStatistics,
 };
  
-use pbs_datastore::task_log;
 use pbs_api_types::{PRIV_TAPE_AUDIT, PRIV_TAPE_READ, PRIV_TAPE_WRITE};
 use pbs_config::CachedUserInfo;
 use pbs_tape::{
@@ -36,13 +35,14 @@ use pbs_tape::{
     sg_tape::tape_alert_flags_critical,
     linux_list_drives::{lto_tape_device_list, lookup_device_identification, open_lto_tape_device},
 };
+use pbs_tools::task_log;
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
     api2::tape::restore::{
         fast_catalog_restore,
         restore_media,
     },
-    server::WorkerTask,
     tape::{
         TAPE_STATUS_DIR,
         Inventory,
diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index 7739d1a4..045d8d6c 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -34,26 +34,24 @@ use pbs_api_types::{
     UPID_SCHEMA, TAPE_RESTORE_SNAPSHOT_SCHEMA,
     PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_TAPE_READ,
 };
-use pbs_datastore::{task_log, task_warn, DataBlob};
+use pbs_datastore::DataBlob;
 use pbs_datastore::backup_info::BackupDir;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::fixed_index::FixedIndexReader;
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{archive_type, ArchiveType, BackupManifest, MANIFEST_BLOB_NAME};
-use pbs_datastore::task::TaskState;
 use pbs_config::CachedUserInfo;
 use pbs_tape::{
     TapeRead, BlockReadError, MediaContentHeader,
     PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0,
 };
+use pbs_tools::{task_log, task_warn, task::TaskState};
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
     tools::ParallelHandler,
     backup::DataStore,
-    server::{
-        lookup_user_email,
-        WorkerTask,
-    },
+    server::lookup_user_email,
     tape::{
         TAPE_STATUS_DIR,
         MediaId,
diff --git a/src/backup/datastore.rs b/src/backup/datastore.rs
index df8d46b6..fcef2d39 100644
--- a/src/backup/datastore.rs
+++ b/src/backup/datastore.rs
@@ -12,7 +12,6 @@ use lazy_static::lazy_static;
 use proxmox::tools::fs::{replace_file, file_read_optional_string, CreateOptions};
 
 use pbs_api_types::{UPID, DataStoreConfig, Authid, GarbageCollectionStatus};
-use pbs_datastore::{task_log, task_warn};
 use pbs_datastore::DataBlob;
 use pbs_datastore::backup_info::{BackupGroup, BackupDir};
 use pbs_datastore::chunk_store::ChunkStore;
@@ -24,10 +23,10 @@ use pbs_datastore::manifest::{
     ArchiveType, BackupManifest,
     archive_type,
 };
-use pbs_datastore::task::TaskState;
 use pbs_tools::format::HumanByte;
 use pbs_tools::fs::{lock_dir_noblock, DirLockGuard};
 use pbs_tools::process_locker::ProcessLockSharedGuard;
+use pbs_tools::{task_log, task_warn, task::TaskState};
 use pbs_config::{open_backup_lockfile, BackupLockGuard};
 use proxmox_rest_server::fail_on_shutdown;
 
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index b8d2b2f3..051d6918 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -7,12 +7,12 @@ use std::time::Instant;
 use anyhow::{bail, format_err, Error};
 
 use pbs_api_types::{Authid, CryptMode, VerifyState, UPID, SnapshotVerifyState};
-use pbs_datastore::{task_log, DataBlob, StoreProgress};
+use pbs_datastore::{DataBlob, StoreProgress};
 use pbs_datastore::backup_info::{BackupGroup, BackupDir, BackupInfo};
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{archive_type, ArchiveType, BackupManifest, FileInfo};
-use pbs_datastore::task::TaskState;
 use pbs_tools::fs::lock_dir_noblock_shared;
+use pbs_tools::{task_log, task::TaskState};
 
 use crate::{
     backup::DataStore,
diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
index 9901b85d..86650de6 100644
--- a/src/bin/proxmox-backup-api.rs
+++ b/src/bin/proxmox-backup-api.rs
@@ -10,14 +10,9 @@ use proxmox::api::RpcEnvironmentType;
 use proxmox::tools::fs::CreateOptions;
 
 use pbs_tools::auth::private_auth_key;
-use proxmox_rest_server::{ApiConfig, RestServer};
-
-use proxmox_backup::server::{
-    self,
-    auth::default_api_auth,
-};
-use proxmox_rest_server::daemon;
+use proxmox_rest_server::{daemon, ApiConfig, RestServer};
 
+use proxmox_backup::server::auth::default_api_auth;
 use proxmox_backup::auth_helpers::*;
 use proxmox_backup::config;
 
@@ -86,7 +81,7 @@ async fn run() -> Result<(), Error> {
     )?;
 
     let backup_user = pbs_config::backup_user()?;
-    let mut commando_sock = proxmox_rest_server::CommandoSocket::new(crate::server::our_ctrl_sock(), backup_user.gid);
+    let mut commando_sock = proxmox_rest_server::CommandoSocket::new(proxmox_rest_server::our_ctrl_sock(), backup_user.gid);
 
     let dir_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
     let file_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
@@ -107,7 +102,7 @@ async fn run() -> Result<(), Error> {
 
 
     let rest_server = RestServer::new(config);
-    proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
+    proxmox_rest_server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
 
     // http server future:
     let server = daemon::create_daemon(
@@ -130,11 +125,11 @@ async fn run() -> Result<(), Error> {
         "proxmox-backup.service",
     );
 
-    server::write_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
+    proxmox_rest_server::write_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
     daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
 
     let init_result: Result<(), Error> = try_block!({
-        server::register_task_control_commands(&mut commando_sock)?;
+        proxmox_rest_server::register_task_control_commands(&mut commando_sock)?;
         commando_sock.spawn()?;
         proxmox_rest_server::server_state_init()?;
         Ok(())
diff --git a/src/bin/proxmox-backup-manager.rs b/src/bin/proxmox-backup-manager.rs
index 689f44db..b9e4e2ff 100644
--- a/src/bin/proxmox-backup-manager.rs
+++ b/src/bin/proxmox-backup-manager.rs
@@ -14,9 +14,10 @@ use pbs_api_types::{
     IGNORE_VERIFIED_BACKUPS_SCHEMA, VERIFICATION_OUTDATED_AFTER_SCHEMA,
 };
 
+use proxmox_rest_server::wait_for_local_worker;
+
 use proxmox_backup::config;
 use proxmox_backup::api2;
-use proxmox_backup::server::wait_for_local_worker;
 
 mod proxmox_backup_manager;
 use proxmox_backup_manager::*;
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 5d8ed189..ec4da15b 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -19,18 +19,16 @@ use proxmox::api::RpcEnvironmentType;
 use proxmox::sys::linux::socket::set_tcp_keepalive;
 use proxmox::tools::fs::CreateOptions;
 
-use proxmox_rest_server::{ApiConfig, RestServer};
+use proxmox_rest_server::{rotate_task_log_archive, ApiConfig, RestServer, WorkerTask};
 
 use proxmox_backup::{
     backup::DataStore,
     server::{
         auth::default_api_auth,
-        WorkerTask,
         jobstate::{
             self,
             Job,
         },
-        rotate_task_log_archive,
     },
 };
 
@@ -188,7 +186,7 @@ async fn run() -> Result<(), Error> {
     config.register_template("console", "/usr/share/pve-xtermjs/index.html.hbs")?;
 
     let backup_user = pbs_config::backup_user()?;
-    let mut commando_sock = proxmox_rest_server::CommandoSocket::new(crate::server::our_ctrl_sock(), backup_user.gid);
+    let mut commando_sock = proxmox_rest_server::CommandoSocket::new(proxmox_rest_server::our_ctrl_sock(), backup_user.gid);
 
     let dir_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
     let file_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
@@ -208,7 +206,7 @@ async fn run() -> Result<(), Error> {
     )?;
 
     let rest_server = RestServer::new(config);
-    proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
+    proxmox_rest_server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
 
     //openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes
 
@@ -266,11 +264,11 @@ async fn run() -> Result<(), Error> {
         "proxmox-backup-proxy.service",
     );
 
-    server::write_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
+    proxmox_rest_server::write_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
     daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
 
     let init_result: Result<(), Error> = try_block!({
-        server::register_task_control_commands(&mut commando_sock)?;
+        proxmox_rest_server::register_task_control_commands(&mut commando_sock)?;
         commando_sock.spawn()?;
         proxmox_rest_server::server_state_init()?;
         Ok(())
@@ -806,11 +804,11 @@ async fn schedule_task_log_rotate() {
 async fn command_reopen_access_logfiles() -> Result<(), Error> {
     // only care about the most recent daemon instance for each, proxy & api, as other older ones
     // should not respond to new requests anyway, but only finish their current one and then exit.
-    let sock = crate::server::our_ctrl_sock();
+    let sock = proxmox_rest_server::our_ctrl_sock();
     let f1 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
 
-    let pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
-    let sock = crate::server::ctrl_sock_from_pid(pid);
+    let pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
+    let sock = proxmox_rest_server::ctrl_sock_from_pid(pid);
     let f2 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
 
     match futures::join!(f1, f2) {
@@ -824,11 +822,11 @@ async fn command_reopen_access_logfiles() -> Result<(), Error> {
 async fn command_reopen_auth_logfiles() -> Result<(), Error> {
     // only care about the most recent daemon instance for each, proxy & api, as other older ones
     // should not respond to new requests anyway, but only finish their current one and then exit.
-    let sock = crate::server::our_ctrl_sock();
+    let sock = proxmox_rest_server::our_ctrl_sock();
     let f1 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-auth-log-reopen\"}\n");
 
-    let pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
-    let sock = crate::server::ctrl_sock_from_pid(pid);
+    let pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
+    let sock = proxmox_rest_server::ctrl_sock_from_pid(pid);
     let f2 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-auth-log-reopen\"}\n");
 
     match futures::join!(f1, f2) {
diff --git a/src/bin/proxmox-daily-update.rs b/src/bin/proxmox-daily-update.rs
index c1580b97..09a768b1 100644
--- a/src/bin/proxmox-daily-update.rs
+++ b/src/bin/proxmox-daily-update.rs
@@ -11,7 +11,7 @@ async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> {
     let sleep_duration = core::time::Duration::new(0, 100_000_000);
 
     loop {
-        if !proxmox_backup::server::worker_is_active_local(&upid) {
+        if !proxmox_rest_server::worker_is_active_local(&upid) {
             break;
         }
         tokio::time::sleep(sleep_duration).await;
diff --git a/src/bin/proxmox_backup_debug/api.rs b/src/bin/proxmox_backup_debug/api.rs
index bebe9ddc..003f6677 100644
--- a/src/bin/proxmox_backup_debug/api.rs
+++ b/src/bin/proxmox_backup_debug/api.rs
@@ -235,12 +235,12 @@ async fn handle_worker(upid_str: &str) -> Result<(), Error> {
     let abort_future = async move {
         while signal_stream.recv().await.is_some() {
             println!("got shutdown request (SIGINT)");
-            proxmox_backup::server::abort_local_worker(upid.clone());
+            proxmox_rest_server::abort_local_worker(upid.clone());
         }
         Ok::<_, Error>(())
     };
 
-    let result_future = proxmox_backup::server::wait_for_local_worker(upid_str);
+    let result_future = proxmox_rest_server::wait_for_local_worker(upid_str);
 
     futures::select! {
         result = result_future.fuse() => result?,
diff --git a/src/server/gc_job.rs b/src/server/gc_job.rs
index 317f4a36..608b5831 100644
--- a/src/server/gc_job.rs
+++ b/src/server/gc_job.rs
@@ -2,9 +2,9 @@ use std::sync::Arc;
 use anyhow::Error;
 
 use pbs_api_types::Authid;
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
-    server::WorkerTask,
     server::jobstate::Job,
     backup::DataStore,
 };
diff --git a/src/server/h2service.rs b/src/server/h2service.rs
index 41d628be..0b51a710 100644
--- a/src/server/h2service.rs
+++ b/src/server/h2service.rs
@@ -11,11 +11,9 @@ use hyper::{Body, Request, Response, StatusCode};
 use proxmox::api::{ApiResponseFuture, HttpError, Router, RpcEnvironment};
 use proxmox::http_err;
 
-use proxmox_rest_server::normalize_uri_path;
+use proxmox_rest_server::{normalize_uri_path, WorkerTask};
 use proxmox_rest_server::formatter::*;
 
-use crate::server::WorkerTask;
-
 /// Hyper Service implementation to handle stateful H2 connections.
 ///
 /// We use this kind of service to handle backup protocol
diff --git a/src/server/jobstate.rs b/src/server/jobstate.rs
index 74224f33..ed71ec71 100644
--- a/src/server/jobstate.rs
+++ b/src/server/jobstate.rs
@@ -50,11 +50,7 @@ use proxmox_systemd::time::{compute_next_event, parse_calendar_event};
 use pbs_config::{open_backup_lockfile, BackupLockGuard};
 use pbs_api_types::{UPID, JobScheduleStatus};
 
-use crate::server::{
-    TaskState,
-    upid_read_status,
-    worker_is_active_local,
-};
+use proxmox_rest_server::{upid_read_status, worker_is_active_local, TaskState};
 
 #[derive(Serialize, Deserialize)]
 #[serde(rename_all = "kebab-case")]
diff --git a/src/server/mod.rs b/src/server/mod.rs
index 77320da6..96d57bd4 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -4,51 +4,13 @@
 //! services. We want async IO, so this is built on top of
 //! tokio/hyper.
 
-use anyhow::{format_err, Error};
-use lazy_static::lazy_static;
-use nix::unistd::Pid;
+use anyhow::Error;
 use serde_json::Value;
 
-use proxmox::sys::linux::procfs::PidStat;
 use proxmox::tools::fs::{create_path, CreateOptions};
 
 use pbs_buildcfg;
 
-lazy_static! {
-    static ref PID: i32 = unsafe { libc::getpid() };
-    static ref PSTART: u64 = PidStat::read_from_pid(Pid::from_raw(*PID)).unwrap().starttime;
-}
-
-pub fn pid() -> i32 {
-    *PID
-}
-
-pub fn pstart() -> u64 {
-    *PSTART
-}
-
-pub fn write_pid(pid_fn: &str) -> Result<(), Error> {
-    let pid_str = format!("{}\n", *PID);
-    proxmox::tools::fs::replace_file(pid_fn, pid_str.as_bytes(), CreateOptions::new())
-}
-
-pub fn read_pid(pid_fn: &str) -> Result<i32, Error> {
-    let pid = proxmox::tools::fs::file_get_contents(pid_fn)?;
-    let pid = std::str::from_utf8(&pid)?.trim();
-    pid.parse().map_err(|err| format_err!("could not parse pid - {}", err))
-}
-
-pub fn ctrl_sock_from_pid(pid: i32) -> String {
-    format!("\0{}/control-{}.sock", pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, pid)
-}
-
-pub fn our_ctrl_sock() -> String {
-    ctrl_sock_from_pid(*PID)
-}
-
-mod worker_task;
-pub use worker_task::*;
-
 mod h2service;
 pub use h2service::*;
 
@@ -76,16 +38,16 @@ pub mod auth;
 pub mod pull;
 
 pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
-    let proxy_pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
-    let sock = crate::server::ctrl_sock_from_pid(proxy_pid);
+    let proxy_pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
+    let sock = proxmox_rest_server::ctrl_sock_from_pid(proxy_pid);
     let _: Value = proxmox_rest_server::send_raw_command(sock, "{\"command\":\"reload-certificate\"}\n")
         .await?;
     Ok(())
 }
 
 pub(crate) async fn notify_datastore_removed() -> Result<(), Error> {
-    let proxy_pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
-    let sock = crate::server::ctrl_sock_from_pid(proxy_pid);
+    let proxy_pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
+    let sock = proxmox_rest_server::ctrl_sock_from_pid(proxy_pid);
     let _: Value = proxmox_rest_server::send_raw_command(sock, "{\"command\":\"datastore-removed\"}\n")
         .await?;
     Ok(())
diff --git a/src/server/prune_job.rs b/src/server/prune_job.rs
index 8d971a1c..53740187 100644
--- a/src/server/prune_job.rs
+++ b/src/server/prune_job.rs
@@ -2,17 +2,17 @@ use std::sync::Arc;
 
 use anyhow::Error;
 
-use pbs_datastore::{task_log, task_warn};
 use pbs_datastore::backup_info::BackupInfo;
 use pbs_datastore::prune::compute_prune_info;
 use pbs_api_types::{Authid, PRIV_DATASTORE_MODIFY, PruneOptions};
 use pbs_config::CachedUserInfo;
+use pbs_tools::{task_log, task_warn};
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
     backup::DataStore,
     server::jobstate::Job,
-    server::WorkerTask,
-};
+ };
 
 pub fn prune_datastore(
     worker: Arc<WorkerTask>,
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 5214a218..f913ac8a 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -13,7 +13,7 @@ use serde_json::json;
 use proxmox::api::error::{HttpError, StatusCode};
 
 use pbs_api_types::{Authid, SnapshotListItem, GroupListItem};
-use pbs_datastore::{task_log, BackupInfo, BackupDir, BackupGroup, StoreProgress};
+use pbs_datastore::{BackupInfo, BackupDir, BackupGroup, StoreProgress};
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::fixed_index::FixedIndexReader;
@@ -22,11 +22,12 @@ use pbs_datastore::manifest::{
     CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, ArchiveType, BackupManifest, FileInfo, archive_type
 };
 use pbs_tools::sha::sha256;
+use pbs_tools::task_log;
 use pbs_client::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader};
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
     backup::DataStore,
-    server::WorkerTask,
     tools::ParallelHandler,
 };
 
diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs
index 6005b706..62fa6fa8 100644
--- a/src/server/verify_job.rs
+++ b/src/server/verify_job.rs
@@ -1,10 +1,10 @@
 use anyhow::{format_err, Error};
 
-use pbs_datastore::task_log;
+use pbs_tools::task_log;
 use pbs_api_types::{Authid, VerificationJobConfig};
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
-    server::WorkerTask,
     server::jobstate::Job,
     backup::{
         DataStore,
diff --git a/src/tape/drive/mod.rs b/src/tape/drive/mod.rs
index ef5ffdbf..e8e60d19 100644
--- a/src/tape/drive/mod.rs
+++ b/src/tape/drive/mod.rs
@@ -30,19 +30,16 @@ use proxmox::{
 
 use pbs_api_types::{VirtualTapeDrive, LtoTapeDrive, Fingerprint};
 use pbs_config::key_config::KeyConfig;
-use pbs_datastore::task::TaskState;
-use pbs_datastore::task_log;
+use pbs_tools::{task_log, task::TaskState};
 
 use pbs_tape::{
     TapeWrite, TapeRead, BlockReadError, MediaContentHeader,
     sg_tape::TapeAlertFlags,
 };
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
-    server::{
-        send_load_media_email,
-        WorkerTask,
-    },
+    server::send_load_media_email,
     tape::{
         MediaId,
         drive::{
diff --git a/src/tape/pool_writer/mod.rs b/src/tape/pool_writer/mod.rs
index 8042de9e..2984173f 100644
--- a/src/tape/pool_writer/mod.rs
+++ b/src/tape/pool_writer/mod.rs
@@ -13,16 +13,16 @@ use anyhow::{bail, Error};
 
 use proxmox::tools::Uuid;
 
-use pbs_datastore::task_log;
+use pbs_tools::task_log;
 use pbs_config::tape_encryption_keys::load_key_configs;
 use pbs_tape::{
     TapeWrite,
     sg_tape::tape_alert_flags_critical,
 };
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
     backup::{DataStore, SnapshotReader},
-    server::WorkerTask,
     tape::{
         TAPE_STATUS_DIR,
         MAX_CHUNK_ARCHIVE_SIZE,
-- 
2.30.2





^ permalink raw reply	[flat|nested] 9+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 5/6] proxmox-daily-update: setup worker and command socket
  2021-09-23 10:13 [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid Dietmar Maurer
                   ` (2 preceding siblings ...)
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 4/6] move worker_task.rs into proxmox-rest-server crate Dietmar Maurer
@ 2021-09-23 10:13 ` Dietmar Maurer
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 6/6] proxmox-backup-manager: " Dietmar Maurer
  2021-09-23 13:20 ` [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid Fabian Grünbichler
  5 siblings, 0 replies; 9+ messages in thread
From: Dietmar Maurer @ 2021-09-23 10:13 UTC (permalink / raw)
  To: pbs-devel

---
 src/bin/proxmox-daily-update.rs | 21 +++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/src/bin/proxmox-daily-update.rs b/src/bin/proxmox-daily-update.rs
index 09a768b1..2a04a559 100644
--- a/src/bin/proxmox-daily-update.rs
+++ b/src/bin/proxmox-daily-update.rs
@@ -1,7 +1,8 @@
 use anyhow::Error;
-use serde_json::{json, Value};
+use serde_json::json;
 
 use proxmox::api::{cli::*, RpcEnvironment, ApiHandler};
+use proxmox::tools::fs::CreateOptions;
 
 use proxmox_backup::api2;
 use proxmox_backup::tools::subscription;
@@ -22,7 +23,7 @@ async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> {
 /// Daily update
 async fn do_update(
     rpcenv: &mut dyn RpcEnvironment,
-) -> Result<Value, Error> {
+) -> Result<(), Error> {
     let param = json!({});
 
     let method = &api2::node::subscription::API_METHOD_CHECK_SUBSCRIPTION;
@@ -59,7 +60,7 @@ async fn do_update(
 
     // TODO: cleanup tasks like in PVE?
 
-    Ok(Value::Null)
+    Ok(())
 }
 
 async fn check_acme_certificates(rpcenv: &mut dyn RpcEnvironment) -> Result<(), Error> {
@@ -85,13 +86,25 @@ async fn check_acme_certificates(rpcenv: &mut dyn RpcEnvironment) -> Result<(),
     Ok(())
 }
 
+async fn run(rpcenv: &mut dyn RpcEnvironment) -> Result<(), Error> {
+    let backup_user = pbs_config::backup_user()?;
+    let file_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
+    proxmox_rest_server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
+
+    let mut commando_sock = proxmox_rest_server::CommandoSocket::new(proxmox_rest_server::our_ctrl_sock(), backup_user.gid);
+    proxmox_rest_server::register_task_control_commands(&mut commando_sock)?;
+    commando_sock.spawn()?;
+
+    do_update(rpcenv).await
+}
+
 fn main() {
     proxmox_backup::tools::setup_safe_path_env();
 
     let mut rpcenv = CliEnvironment::new();
     rpcenv.set_auth_id(Some(String::from("root@pam")));
 
-    if let Err(err) = pbs_runtime::main(do_update(&mut rpcenv)) {
+    if let Err(err) = pbs_runtime::main(run(&mut rpcenv)) {
         eprintln!("error during update: {}", err);
         std::process::exit(1);
     }
-- 
2.30.2





^ permalink raw reply	[flat|nested] 9+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 6/6] proxmox-backup-manager: setup worker and command socket
  2021-09-23 10:13 [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid Dietmar Maurer
                   ` (3 preceding siblings ...)
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 5/6] proxmox-daily-update: setup worker and command socket Dietmar Maurer
@ 2021-09-23 10:13 ` Dietmar Maurer
  2021-09-23 13:20 ` [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid Fabian Grünbichler
  5 siblings, 0 replies; 9+ messages in thread
From: Dietmar Maurer @ 2021-09-23 10:13 UTC (permalink / raw)
  To: pbs-devel

---
 src/bin/proxmox-backup-manager.rs | 22 ++++++++++++++++++----
 1 file changed, 18 insertions(+), 4 deletions(-)

diff --git a/src/bin/proxmox-backup-manager.rs b/src/bin/proxmox-backup-manager.rs
index b9e4e2ff..a03e1db8 100644
--- a/src/bin/proxmox-backup-manager.rs
+++ b/src/bin/proxmox-backup-manager.rs
@@ -5,6 +5,7 @@ use anyhow::{format_err, Error};
 use serde_json::{json, Value};
 
 use proxmox::api::{api, cli::*, RpcEnvironment};
+use proxmox::tools::fs::CreateOptions;
 
 use pbs_client::{connect_to_localhost, display_task_log, view_task_result};
 use pbs_tools::percent_encoding::percent_encode_component;
@@ -359,9 +360,7 @@ async fn get_versions(verbose: bool, param: Value) -> Result<Value, Error> {
     Ok(Value::Null)
 }
 
-fn main() {
-
-    proxmox_backup::tools::setup_safe_path_env();
+async fn run() -> Result<(), Error> {
 
     let cmd_def = CliCommandMap::new()
         .insert("acl", acl_commands())
@@ -401,12 +400,27 @@ fn main() {
             CliCommand::new(&API_METHOD_GET_VERSIONS)
         );
 
+    let backup_user = pbs_config::backup_user()?;
+    let file_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
+    proxmox_rest_server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
 
+    let mut commando_sock = proxmox_rest_server::CommandoSocket::new(proxmox_rest_server::our_ctrl_sock(), backup_user.gid);
+    proxmox_rest_server::register_task_control_commands(&mut commando_sock)?;
+    commando_sock.spawn()?;
 
     let mut rpcenv = CliEnvironment::new();
     rpcenv.set_auth_id(Some(String::from("root@pam")));
 
-   pbs_runtime::main(run_async_cli_command(cmd_def, rpcenv));
+    run_async_cli_command(cmd_def, rpcenv).await; // this call exit(-1) on error
+
+    Ok(())
+}
+
+fn main() -> Result<(), Error> {
+
+    proxmox_backup::tools::setup_safe_path_env();
+
+    pbs_runtime::main(run())
 }
 
 // shell completion helper
-- 
2.30.2





^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup 2/6] worker task: allow to configure path and owner/group
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 2/6] worker task: allow to configure path and owner/group Dietmar Maurer
@ 2021-09-23 11:36   ` Fabian Grünbichler
  0 siblings, 0 replies; 9+ messages in thread
From: Fabian Grünbichler @ 2021-09-23 11:36 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion

On September 23, 2021 12:13 pm, Dietmar Maurer wrote:
> And application now needs to call init_worker_tasks() before using
> worker tasks.
> 
> Notable changes:
> - need to call  init_worker_tasks() before using worker tasks.
> - create_task_log_dirs() ís called inside init_worker_tasks()
> - removed UpidExt trait
> - use atomic_open_or_create_file()
> - remove pbs_config and pbs_buildcfg dependency
> ---
>  src/api2/node/tasks.rs          |   6 +-
>  src/bin/proxmox-backup-api.rs   |   7 +-
>  src/bin/proxmox-backup-proxy.rs |   5 +-
>  src/server/mod.rs               |   3 -
>  src/server/upid.rs              |  18 --
>  src/server/worker_task.rs       | 475 +++++++++++++++++++-------------
>  6 files changed, 290 insertions(+), 224 deletions(-)
>  delete mode 100644 src/server/upid.rs
> 
> diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
> index 169a3d4d..df4673a1 100644
> --- a/src/api2/node/tasks.rs
> +++ b/src/api2/node/tasks.rs
> @@ -16,7 +16,7 @@ use pbs_api_types::{
>  };
>  
>  use crate::api2::pull::check_pull_privs;
> -use crate::server::{self, UPIDExt, TaskState, TaskListInfoIterator};
> +use crate::server::{self, upid_log_path, upid_read_status, TaskState, TaskListInfoIterator};
>  use pbs_config::CachedUserInfo;
>  
>  // matches respective job execution privileges
> @@ -220,7 +220,7 @@ async fn get_task_status(
>      if crate::server::worker_is_active(&upid).await? {
>          result["status"] = Value::from("running");
>      } else {
> -        let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
> +        let exitstatus = upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
>          result["status"] = Value::from("stopped");
>          result["exitstatus"] = Value::from(exitstatus.to_string());
>      };
> @@ -287,7 +287,7 @@ async fn read_task_log(
>  
>      let mut count: u64 = 0;
>  
> -    let path = upid.log_path();
> +    let path = upid_log_path(&upid)?;
>  
>      let file = File::open(path)?;
>  
> diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
> index 9ad10260..9901b85d 100644
> --- a/src/bin/proxmox-backup-api.rs
> +++ b/src/bin/proxmox-backup-api.rs
> @@ -54,8 +54,6 @@ async fn run() -> Result<(), Error> {
>          bail!("unable to inititialize syslog - {}", err);
>      }
>  
> -    server::create_task_log_dirs()?;
> -
>      config::create_configdir()?;
>  
>      config::update_self_signed_cert(false)?;
> @@ -102,13 +100,14 @@ async fn run() -> Result<(), Error> {
>  
>      config.enable_auth_log(
>          pbs_buildcfg::API_AUTH_LOG_FN,
> -        Some(dir_opts),
> -        Some(file_opts),
> +        Some(dir_opts.clone()),
> +        Some(file_opts.clone()),
>          &mut commando_sock,
>      )?;
>  
>  
>      let rest_server = RestServer::new(config);
> +    proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
>  
>      // http server future:
>      let server = daemon::create_daemon(
> diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
> index 518054bf..5d8ed189 100644
> --- a/src/bin/proxmox-backup-proxy.rs
> +++ b/src/bin/proxmox-backup-proxy.rs
> @@ -202,12 +202,13 @@ async fn run() -> Result<(), Error> {
>  
>      config.enable_auth_log(
>          pbs_buildcfg::API_AUTH_LOG_FN,
> -        Some(dir_opts),
> -        Some(file_opts),
> +        Some(dir_opts.clone()),
> +        Some(file_opts.clone()),
>          &mut commando_sock,
>      )?;
>  
>      let rest_server = RestServer::new(config);
> +    proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
>  
>      //openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes
>  
> diff --git a/src/server/mod.rs b/src/server/mod.rs
> index a7dcee67..77320da6 100644
> --- a/src/server/mod.rs
> +++ b/src/server/mod.rs
> @@ -46,9 +46,6 @@ pub fn our_ctrl_sock() -> String {
>      ctrl_sock_from_pid(*PID)
>  }
>  
> -mod upid;
> -pub use upid::*;
> -
>  mod worker_task;
>  pub use worker_task::*;
>  
> diff --git a/src/server/upid.rs b/src/server/upid.rs
> deleted file mode 100644
> index 70a3e3fb..00000000
> --- a/src/server/upid.rs
> +++ /dev/null
> @@ -1,18 +0,0 @@
> -pub trait UPIDExt: private::Sealed {
> -    /// Returns the absolute path to the task log file
> -    fn log_path(&self) -> std::path::PathBuf;
> -}
> -
> -mod private {
> -    pub trait Sealed {}
> -    impl Sealed for  pbs_api_types::UPID {}
> -}
> -
> -impl UPIDExt for  pbs_api_types::UPID {
> -    fn log_path(&self) -> std::path::PathBuf {
> -        let mut path = std::path::PathBuf::from(super::PROXMOX_BACKUP_TASK_DIR);
> -        path.push(format!("{:02X}", self.pstart % 256));
> -        path.push(self.to_string());
> -        path
> -    }
> -}
> diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
> index 92ab50d7..191d8a44 100644
> --- a/src/server/worker_task.rs
> +++ b/src/server/worker_task.rs
> @@ -1,5 +1,6 @@
>  use std::collections::{HashMap, VecDeque};
>  use std::fs::File;
> +use std::path::PathBuf;
>  use std::io::{Read, Write, BufRead, BufReader};
>  use std::panic::UnwindSafe;
>  use std::sync::atomic::{AtomicBool, Ordering};
> @@ -11,27 +12,267 @@ use lazy_static::lazy_static;
>  use serde_json::{json, Value};
>  use serde::{Serialize, Deserialize};
>  use tokio::sync::oneshot;
> +use nix::fcntl::OFlag;
> +use once_cell::sync::OnceCell;
>  
>  use proxmox::sys::linux::procfs;
>  use proxmox::try_block;
> -use proxmox::tools::fs::{create_path, replace_file, CreateOptions};
> +use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
>  
> -use pbs_buildcfg;
>  use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
>  use pbs_api_types::UPID;
> -use pbs_config::{open_backup_lockfile, BackupLockGuard};
>  use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
>  
> -use super::UPIDExt;
> +struct TaskListLockGuard(File);
>  
> -macro_rules! taskdir {
> -    ($subdir:expr) => (concat!(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks", $subdir))
> +struct WorkerTaskSetup {
> +    file_opts: CreateOptions,
> +    taskdir: PathBuf,
> +    task_lock_fn: PathBuf,
> +    active_tasks_fn: PathBuf,
> +    task_index_fn: PathBuf,
> +    task_archive_fn: PathBuf,
> +}
> +
> +static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new();
> +
> +fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> {
> +    WORKER_TASK_SETUP.get()
> +        .ok_or_else(|| format_err!("WorkerTask library is not initialized"))
> +}
> +
> +impl WorkerTaskSetup {
> +
> +    fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self {
> +
> +        let mut taskdir = basedir.clone();
> +        taskdir.push("tasks");
> +
> +        let mut task_lock_fn = taskdir.clone();
> +        task_lock_fn.push(".active.lock");
> +
> +        let mut active_tasks_fn = taskdir.clone();
> +        active_tasks_fn.push("active");
> +
> +        let mut task_index_fn = taskdir.clone();
> +        task_index_fn.push("index");
> +
> +        let mut task_archive_fn = taskdir.clone();
> +        task_archive_fn.push("archive");
> +
> +        Self {
> +            file_opts,
> +            taskdir,
> +            task_lock_fn,
> +            active_tasks_fn,
> +            task_index_fn,
> +            task_archive_fn,
> +        }
> +    }
> +
> +    fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> {

since we touch/move this (and thus have to touch all call sites), we 
could take this opportunity to move the locked operations including 
access to the active_task_fn, task_index_fn and task_archive_fn struct 
members to the lock guard (and maybe split it into 
exclusive/non-exclusive guards?) to make misuse impossible? AFAICT all 
the current access is done while holding the lock.

can of course also be done as follow-up in some generic fashion since 
this is a recurring problem, just struck me while reading through the 
rest of the changes.

> +        let options =  self.file_opts.clone()
> +            .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
> +
> +        let timeout = std::time::Duration::new(10, 0);
> +
> +        let file = proxmox::tools::fs::open_file_locked(
> +            &self.task_lock_fn,
> +            timeout,
> +            exclusive,
> +            options,
> +        )?;
> +
> +        Ok(TaskListLockGuard(file))
> +    }
> +
> +    fn log_path(&self, upid: &UPID) -> std::path::PathBuf {
> +        let mut path = self.taskdir.clone();
> +        path.push(format!("{:02X}", upid.pstart % 256));
> +        path.push(upid.to_string());
> +        path
> +    }
> +
> +    // atomically read/update the task list, update status of finished tasks
> +    // new_upid is added to the list when specified.
> +    fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> {
> +
> +        let lock = self.lock_task_list_files(true)?;
> +
> +        // TODO remove with 1.x
> +        let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(&self.task_index_fn)?;
> +        let had_index_file = !finish_list.is_empty();
> +
> +        // We use filter_map because one negative case wants to *move* the data into `finish_list`,
> +        // clippy doesn't quite catch this!
> +        #[allow(clippy::unnecessary_filter_map)]
> +        let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(&self.active_tasks_fn)?
> +            .into_iter()
> +            .filter_map(|info| {
> +                if info.state.is_some() {
> +                    // this can happen when the active file still includes finished tasks
> +                    finish_list.push(info);
> +                    return None;
> +                }
> +
> +                if !worker_is_active_local(&info.upid) {
> +                    // println!("Detected stopped task '{}'", &info.upid_str);
> +                    let now = proxmox::tools::time::epoch_i64();
> +                    let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
> +                    finish_list.push(TaskListInfo {
> +                        upid: info.upid,
> +                        upid_str: info.upid_str,
> +                        state: Some(status)
> +                    });
> +                    return None;
> +                }
> +
> +                Some(info)
> +            }).collect();
> +
> +        if let Some(upid) = new_upid {
> +            active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
> +        }
> +
> +        let active_raw = render_task_list(&active_list);
> +
> +        let options =  self.file_opts.clone()
> +            .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
> +
> +        replace_file(
> +            &self.active_tasks_fn,
> +            active_raw.as_bytes(),
> +            options,
> +        )?;
> +
> +        finish_list.sort_unstable_by(|a, b| {
> +            match (&a.state, &b.state) {
> +                (Some(s1), Some(s2)) => s1.cmp(&s2),
> +                (Some(_), None) => std::cmp::Ordering::Less,
> +                (None, Some(_)) => std::cmp::Ordering::Greater,
> +                _ => a.upid.starttime.cmp(&b.upid.starttime),
> +            }
> +        });
> +
> +        if !finish_list.is_empty() {
> +            let options =  self.file_opts.clone()
> +                .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
> +
> +            let mut writer = atomic_open_or_create_file(
> +                &self.task_archive_fn,
> +                OFlag::O_APPEND | OFlag::O_RDWR,
> +                &[],
> +                options,
> +            )?;
> +            for info in &finish_list {
> +                writer.write_all(render_task_line(&info).as_bytes())?;
> +            }
> +        }
> +
> +        // TODO Remove with 1.x
> +        // for compatibility, if we had an INDEX file, we do not need it anymore
> +        if had_index_file {
> +            let _ = nix::unistd::unlink(&self.task_index_fn);
> +        }
> +
> +        drop(lock);
> +
> +        Ok(())
> +    }
> +
> +    // Create task log directory with correct permissions
> +    fn create_task_log_dirs(&self) -> Result<(), Error> {
> +
> +        try_block!({
> +            let dir_opts = self.file_opts.clone()
> +                .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
> +
> +            create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts.clone()))?;
> +            // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
> +            Ok(())
> +        }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))
> +    }
> +}
> +
> +/// Initialize the WorkerTask library
> +pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> {
> +    let setup = WorkerTaskSetup::new(basedir, file_opts);
> +    setup.create_task_log_dirs()?;
> +    WORKER_TASK_SETUP.set(setup)
> +        .map_err(|_| format_err!("init_worker_tasks failed - already initialized"))
> +}
> +
> +/// checks if the Task Archive is bigger that 'size_threshold' bytes, and
> +/// rotates it if it is
> +pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
> +
> +    let setup = worker_task_setup()?;
> +
> +    let _lock = setup.lock_task_list_files(true)?;
> +
> +    let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress)
> +            .ok_or_else(|| format_err!("could not get archive file names"))?;
> +
> +    logrotate.rotate(size_threshold, None, max_files)
> +}
> +
> +
> +/// Path to the worker log file
> +pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
> +    let setup = worker_task_setup()?;
> +    Ok(setup.log_path(upid))
> +}
> +
> +/// Read endtime (time of last log line) and exitstatus from task log file
> +/// If there is not a single line with at valid datetime, we assume the
> +/// starttime to be the endtime
> +pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
> +
> +    let setup = worker_task_setup()?;
> +
> +    let mut status = TaskState::Unknown { endtime: upid.starttime };
> +
> +    let path = setup.log_path(upid);
> +
> +    let mut file = File::open(path)?;
> +
> +    /// speedup - only read tail
> +    use std::io::Seek;
> +    use std::io::SeekFrom;
> +    let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
> +
> +    let mut data = Vec::with_capacity(8192);
> +    file.read_to_end(&mut data)?;
> +
> +    // strip newlines at the end of the task logs
> +    while data.last() == Some(&b'\n') {
> +        data.pop();
> +    }
> +
> +    let last_line = match data.iter().rposition(|c| *c == b'\n') {
> +        Some(start) if data.len() > (start+1) => &data[start+1..],
> +        Some(_) => &data, // should not happen, since we removed all trailing newlines
> +        None => &data,
> +    };
> +
> +    let last_line = std::str::from_utf8(last_line)
> +        .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
> +
> +    let mut iter = last_line.splitn(2, ": ");
> +    if let Some(time_str) = iter.next() {
> +        if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
> +            // set the endtime even if we cannot parse the state
> +            status = TaskState::Unknown { endtime };
> +            if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
> +                if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
> +                    status = state;
> +                }
> +            }
> +        }
> +    }
> +
> +    Ok(status)
>  }
> -pub const PROXMOX_BACKUP_TASK_DIR: &str = taskdir!("/");
> -pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = taskdir!("/.active.lock");
> -pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = taskdir!("/active");
> -pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = taskdir!("/index");
> -pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = taskdir!("/archive");
>  
>  lazy_static! {
>      static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
> @@ -152,73 +393,6 @@ fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskStat
>      }
>  }
>  
> -/// Create task log directory with correct permissions
> -pub fn create_task_log_dirs() -> Result<(), Error> {
> -
> -    try_block!({
> -        let backup_user = pbs_config::backup_user()?;
> -        let opts = CreateOptions::new()
> -            .owner(backup_user.uid)
> -            .group(backup_user.gid);
> -
> -        create_path(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?;
> -        create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?;
> -        create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
> -        Ok(())
> -    }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?;
> -
> -    Ok(())
> -}
> -
> -/// Read endtime (time of last log line) and exitstatus from task log file
> -/// If there is not a single line with at valid datetime, we assume the
> -/// starttime to be the endtime
> -pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
> -
> -    let mut status = TaskState::Unknown { endtime: upid.starttime };
> -
> -    let path = upid.log_path();
> -
> -    let mut file = File::open(path)?;
> -
> -    /// speedup - only read tail
> -    use std::io::Seek;
> -    use std::io::SeekFrom;
> -    let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
> -
> -    let mut data = Vec::with_capacity(8192);
> -    file.read_to_end(&mut data)?;
> -
> -    // strip newlines at the end of the task logs
> -    while data.last() == Some(&b'\n') {
> -        data.pop();
> -    }
> -
> -    let last_line = match data.iter().rposition(|c| *c == b'\n') {
> -        Some(start) if data.len() > (start+1) => &data[start+1..],
> -        Some(_) => &data, // should not happen, since we removed all trailing newlines
> -        None => &data,
> -    };
> -
> -    let last_line = std::str::from_utf8(last_line)
> -        .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
> -
> -    let mut iter = last_line.splitn(2, ": ");
> -    if let Some(time_str) = iter.next() {
> -        if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
> -            // set the endtime even if we cannot parse the state
> -            status = TaskState::Unknown { endtime };
> -            if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
> -                if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
> -                    status = state;
> -                }
> -            }
> -        }
> -    }
> -
> -    Ok(status)
> -}
> -
>  /// Task State
>  #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
>  pub enum TaskState {
> @@ -323,107 +497,6 @@ impl Into<pbs_api_types::TaskListItem> for TaskListInfo {
>      }
>  }
>  
> -fn lock_task_list_files(exclusive: bool) -> Result<BackupLockGuard, Error> {
> -    open_backup_lockfile(PROXMOX_BACKUP_TASK_LOCK_FN, None, exclusive)
> -}
> -
> -/// checks if the Task Archive is bigger that 'size_threshold' bytes, and
> -/// rotates it if it is
> -pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
> -    let _lock = lock_task_list_files(true)?;
> -
> -    let mut logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, compress)
> -        .ok_or_else(|| format_err!("could not get archive file names"))?;
> -
> -    logrotate.rotate(size_threshold, None, max_files)
> -}
> -
> -// atomically read/update the task list, update status of finished tasks
> -// new_upid is added to the list when specified.
> -fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {
> -
> -    let backup_user = pbs_config::backup_user()?;
> -
> -    let lock = lock_task_list_files(true)?;
> -
> -    // TODO remove with 1.x
> -    let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?;
> -    let had_index_file = !finish_list.is_empty();
> -
> -    // We use filter_map because one negative case wants to *move* the data into `finish_list`,
> -    // clippy doesn't quite catch this!
> -    #[allow(clippy::unnecessary_filter_map)]
> -    let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?
> -        .into_iter()
> -        .filter_map(|info| {
> -            if info.state.is_some() {
> -                // this can happen when the active file still includes finished tasks
> -                finish_list.push(info);
> -                return None;
> -            }
> -
> -            if !worker_is_active_local(&info.upid) {
> -                // println!("Detected stopped task '{}'", &info.upid_str);
> -                let now = proxmox::tools::time::epoch_i64();
> -                let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
> -                finish_list.push(TaskListInfo {
> -                    upid: info.upid,
> -                    upid_str: info.upid_str,
> -                    state: Some(status)
> -                });
> -                return None;
> -            }
> -
> -            Some(info)
> -        }).collect();
> -
> -    if let Some(upid) = new_upid {
> -        active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
> -    }
> -
> -    let active_raw = render_task_list(&active_list);
> -
> -    replace_file(
> -        PROXMOX_BACKUP_ACTIVE_TASK_FN,
> -        active_raw.as_bytes(),
> -        CreateOptions::new()
> -            .owner(backup_user.uid)
> -            .group(backup_user.gid),
> -    )?;
> -
> -    finish_list.sort_unstable_by(|a, b| {
> -        match (&a.state, &b.state) {
> -            (Some(s1), Some(s2)) => s1.cmp(&s2),
> -            (Some(_), None) => std::cmp::Ordering::Less,
> -            (None, Some(_)) => std::cmp::Ordering::Greater,
> -            _ => a.upid.starttime.cmp(&b.upid.starttime),
> -        }
> -    });
> -
> -    if !finish_list.is_empty() {
> -        match std::fs::OpenOptions::new().append(true).create(true).open(PROXMOX_BACKUP_ARCHIVE_TASK_FN) {
> -            Ok(mut writer) => {
> -                for info in &finish_list {
> -                    writer.write_all(render_task_line(&info).as_bytes())?;
> -                }
> -            },
> -            Err(err) => bail!("could not write task archive - {}", err),
> -        }
> -
> -        nix::unistd::chown(PROXMOX_BACKUP_ARCHIVE_TASK_FN, Some(backup_user.uid), Some(backup_user.gid))?;
> -    }
> -
> -    // TODO Remove with 1.x
> -    // for compatibility, if we had an INDEX file, we do not need it anymore
> -    if had_index_file {
> -        let _ = nix::unistd::unlink(PROXMOX_BACKUP_INDEX_TASK_FN);
> -    }
> -
> -    drop(lock);
> -
> -    Ok(())
> -}
> -
>  fn render_task_line(info: &TaskListInfo) -> String {
>      let mut raw = String::new();
>      if let Some(status) = &info.state {
> @@ -486,27 +559,30 @@ pub struct TaskListInfoIterator {
>      list: VecDeque<TaskListInfo>,
>      end: bool,
>      archive: Option<LogRotateFiles>,
> -    lock: Option<BackupLockGuard>,
> +    lock: Option<TaskListLockGuard>,
>  }
>  
>  impl TaskListInfoIterator {
>      pub fn new(active_only: bool) -> Result<Self, Error> {
> +
> +        let setup = worker_task_setup()?;
> +
>          let (read_lock, active_list) = {
> -            let lock = lock_task_list_files(false)?;
> -            let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
> +            let lock = setup.lock_task_list_files(false)?;
> +            let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
>  
>              let needs_update = active_list
>                  .iter()
>                  .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));
>  
>              // TODO remove with 1.x
> -            let index_exists = std::path::Path::new(PROXMOX_BACKUP_INDEX_TASK_FN).is_file();
> +            let index_exists = setup.task_index_fn.is_file();
>  
>              if needs_update || index_exists {
>                  drop(lock);
> -                update_active_workers(None)?;
> -                let lock = lock_task_list_files(false)?;
> -                let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
> +                setup.update_active_workers(None)?;
> +                let lock = setup.lock_task_list_files(false)?;
> +                let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
>                  (lock, active_list)
>              } else {
>                  (lock, active_list)
> @@ -516,7 +592,7 @@ impl TaskListInfoIterator {
>          let archive = if active_only {
>              None
>          } else {
> -            let logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, true)
> +            let logrotate = LogRotate::new(&setup.task_archive_fn, true)
>                  .ok_or_else(|| format_err!("could not get archive file names"))?;
>              Some(logrotate.files())
>          };
> @@ -568,6 +644,7 @@ impl Iterator for TaskListInfoIterator {
>  /// persistently to files. Task should poll the `abort_requested`
>  /// flag, and stop execution when requested.
>  pub struct WorkerTask {
> +    setup: &'static WorkerTaskSetup,
>      upid: UPID,
>      data: Mutex<WorkerTaskData>,
>      abort_requested: AtomicBool,
> @@ -589,17 +666,26 @@ struct WorkerTaskData {
>  
>  impl WorkerTask {
>  
> -    pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: String, to_stdout: bool) -> Result<Arc<Self>, Error> {
> +    pub fn new(
> +        worker_type: &str,
> +        worker_id: Option<String>,
> +        auth_id: String,
> +        to_stdout: bool,
> +    ) -> Result<Arc<Self>, Error> {
> +
> +        let setup = worker_task_setup()?;
> +
>          let upid = UPID::new(worker_type, worker_id, auth_id)?;
>          let task_id = upid.task_id;
>  
> -        let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR);
> +        let mut path = setup.taskdir.clone();
>  
>          path.push(format!("{:02X}", upid.pstart & 255));
>  
> -        let backup_user = pbs_config::backup_user()?;
> +        let dir_opts = setup.file_opts.clone()
> +            .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
>  
> -        create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?;
> +        create_path(&path, None, Some(dir_opts))?;
>  
>          path.push(upid.to_string());
>  
> @@ -608,12 +694,13 @@ impl WorkerTask {
>              exclusive: true,
>              prefix_time: true,
>              read: true,
> +            file_opts: setup.file_opts.clone(),
>              ..Default::default()
>          };
>          let logger = FileLogger::new(&path, logger_options)?;
> -        nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?;
>  
>          let worker = Arc::new(Self {
> +            setup,
>              upid: upid.clone(),
>              abort_requested: AtomicBool::new(false),
>              data: Mutex::new(WorkerTaskData {
> @@ -631,7 +718,7 @@ impl WorkerTask {
>              proxmox_rest_server::set_worker_count(hash.len());
>          }
>  
> -        update_active_workers(Some(&upid))?;
> +        setup.update_active_workers(Some(&upid))?;
>  
>          Ok(worker)
>      }
> @@ -714,7 +801,7 @@ impl WorkerTask {
>          self.log(state.result_text());
>  
>          WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
> -        let _ = update_active_workers(None);
> +        let _ = self.setup.update_active_workers(None);
>          proxmox_rest_server::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
>      }
>  
> -- 
> 2.30.2
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel@lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 




^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid
  2021-09-23 10:13 [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid Dietmar Maurer
                   ` (4 preceding siblings ...)
  2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 6/6] proxmox-backup-manager: " Dietmar Maurer
@ 2021-09-23 13:20 ` Fabian Grünbichler
  5 siblings, 0 replies; 9+ messages in thread
From: Fabian Grünbichler @ 2021-09-23 13:20 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion

On September 23, 2021 12:13 pm, Dietmar Maurer wrote:
> Because we want to move worker_task.rs into proxmox-rest-server crate.

as discussed off-list, we could use a 

struct AuthidStr(String)

or a trait defined in proxmox instead of a regular String, to avoid 
accidentally passing in other, non authid strings. all Authid 
implementations would then need to implement Into<AuthidStr> (/the 
trait), and the calls below to to_string() would become into() (remain 
as they were).

can also be done as follow-up with a proxmox bump if we want to discuss 
it more in-depth but still go ahead with the split/move now.

> ---
>  pbs-api-types/src/upid.rs        | 13 +++++++------
>  src/api2/admin/datastore.rs      |  6 +++---
>  src/api2/backup/environment.rs   |  2 +-
>  src/api2/backup/mod.rs           |  2 +-
>  src/api2/config/acme.rs          |  6 +++---
>  src/api2/config/datastore.rs     |  4 ++--
>  src/api2/node/apt.rs             |  4 ++--
>  src/api2/node/certificates.rs    |  6 +++---
>  src/api2/node/disks/directory.rs |  4 ++--
>  src/api2/node/disks/mod.rs       |  4 ++--
>  src/api2/node/disks/zfs.rs       |  4 ++--
>  src/api2/node/mod.rs             |  2 +-
>  src/api2/node/network.rs         |  2 +-
>  src/api2/node/services.rs        |  2 +-
>  src/api2/node/tasks.rs           | 15 +++++++++------
>  src/api2/pull.rs                 |  4 ++--
>  src/api2/reader/mod.rs           |  2 +-
>  src/api2/tape/backup.rs          |  4 ++--
>  src/api2/tape/drive.rs           |  2 +-
>  src/api2/tape/restore.rs         |  2 +-
>  src/bin/proxmox-backup-proxy.rs  |  2 +-
>  src/server/gc_job.rs             |  2 +-
>  src/server/prune_job.rs          |  2 +-
>  src/server/verify_job.rs         |  2 +-
>  src/server/worker_task.rs        |  8 ++++----
>  25 files changed, 55 insertions(+), 51 deletions(-)
> 
> diff --git a/pbs-api-types/src/upid.rs b/pbs-api-types/src/upid.rs
> index ba23a646..29135bca 100644
> --- a/pbs-api-types/src/upid.rs
> +++ b/pbs-api-types/src/upid.rs
> @@ -8,8 +8,6 @@ use proxmox::api::schema::{ApiStringFormat, ApiType, Schema, StringSchema, Array
>  use proxmox::const_regex;
>  use proxmox::sys::linux::procfs;
>  
> -use crate::Authid;
> -
>  /// Unique Process/Task Identifier
>  ///
>  /// We use this to uniquely identify worker task. UPIDs have a short
> @@ -37,7 +35,7 @@ pub struct UPID {
>      /// Worker ID (arbitrary ASCII string)
>      pub worker_id: Option<String>,
>      /// The authenticated entity who started the task
> -    pub auth_id: Authid,
> +    pub auth_id: String,
>      /// The node name.
>      pub node: String,
>  }
> @@ -71,7 +69,7 @@ impl UPID {
>      pub fn new(
>          worker_type: &str,
>          worker_id: Option<String>,
> -        auth_id: Authid,
> +        auth_id: String,
>      ) -> Result<Self, Error> {
>  
>          let pid = unsafe { libc::getpid() };
> @@ -82,6 +80,10 @@ impl UPID {
>              bail!("illegal characters in worker type '{}'", worker_type);
>          }
>  
> +        if auth_id.contains(bad) {
> +            bail!("illegal characters in auth_id '{}'", auth_id);
> +        }
> +
>          static WORKER_TASK_NEXT_ID: AtomicUsize = AtomicUsize::new(0);
>  
>          let task_id = WORKER_TASK_NEXT_ID.fetch_add(1, Ordering::SeqCst);
> @@ -184,7 +186,7 @@ pub struct TaskListItem {
>      /// Worker ID (arbitrary ASCII string)
>      pub worker_id: Option<String>,
>      /// The authenticated entity who started the task
> -    pub user: Authid,
> +    pub user: String,
>      /// The task end time (Epoch)
>      #[serde(skip_serializing_if="Option::is_none")]
>      pub endtime: Option<i64>,
> @@ -200,4 +202,3 @@ pub const NODE_TASKS_LIST_TASKS_RETURN_TYPE: ReturnType = ReturnType {
>          &TaskListItem::API_SCHEMA,
>      ).schema(),
>  };
> -
> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
> index 0b14dfbf..fbb93f35 100644
> --- a/src/api2/admin/datastore.rs
> +++ b/src/api2/admin/datastore.rs
> @@ -722,7 +722,7 @@ pub fn verify(
>      let upid_str = WorkerTask::new_thread(
>          worker_type,
>          Some(worker_id),
> -        auth_id.clone(),
> +        auth_id.to_string(),
>          to_stdout,
>          move |worker| {
>              let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
> @@ -862,7 +862,7 @@ pub fn prune(
>  
>  
>      // We use a WorkerTask just to have a task log, but run synchrounously
> -    let worker = WorkerTask::new("prune", Some(worker_id), auth_id, true)?;
> +    let worker = WorkerTask::new("prune", Some(worker_id), auth_id.to_string(), true)?;
>  
>      if keep_all {
>          worker.log("No prune selection - keeping all files.");
> @@ -957,7 +957,7 @@ pub fn prune_datastore(
>      let upid_str = WorkerTask::new_thread(
>          "prune",
>          Some(store.clone()),
> -        auth_id.clone(),
> +        auth_id.to_string(),
>          to_stdout,
>          move |worker| crate::server::prune_datastore(
>              worker.clone(),
> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
> index 129ebd2b..306f91ee 100644
> --- a/src/api2/backup/environment.rs
> +++ b/src/api2/backup/environment.rs
> @@ -525,7 +525,7 @@ impl BackupEnvironment {
>          WorkerTask::new_thread(
>              "verify",
>              Some(worker_id),
> -            self.auth_id.clone(),
> +            self.auth_id.to_string(),
>              false,
>              move |worker| {
>                  worker.log("Automatically verifying newly added snapshot");
> diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
> index 8f51f314..c14f19a4 100644
> --- a/src/api2/backup/mod.rs
> +++ b/src/api2/backup/mod.rs
> @@ -166,7 +166,7 @@ async move {
>      if !is_new { bail!("backup directory already exists."); }
>  
>  
> -    WorkerTask::spawn(worker_type, Some(worker_id), auth_id.clone(), true, move |worker| {
> +    WorkerTask::spawn(worker_type, Some(worker_id), auth_id.to_string(), true, move |worker| {
>          let mut env = BackupEnvironment::new(
>              env_type, auth_id, worker.clone(), datastore, backup_dir);
>  
> diff --git a/src/api2/config/acme.rs b/src/api2/config/acme.rs
> index 593b79a3..564cafae 100644
> --- a/src/api2/config/acme.rs
> +++ b/src/api2/config/acme.rs
> @@ -215,7 +215,7 @@ fn register_account(
>      WorkerTask::spawn(
>          "acme-register",
>          Some(name.to_string()),
> -        auth_id,
> +        auth_id.to_string(),
>          true,
>          move |worker| async move {
>              let mut client = AcmeClient::new(directory);
> @@ -275,7 +275,7 @@ pub fn update_account(
>      WorkerTask::spawn(
>          "acme-update",
>          Some(name.to_string()),
> -        auth_id,
> +        auth_id.to_string(),
>          true,
>          move |_worker| async move {
>              let data = match contact {
> @@ -320,7 +320,7 @@ pub fn deactivate_account(
>      WorkerTask::spawn(
>          "acme-deactivate",
>          Some(name.to_string()),
> -        auth_id,
> +        auth_id.to_string(),
>          true,
>          move |worker| async move {
>              match AcmeClient::load(&name)
> diff --git a/src/api2/config/datastore.rs b/src/api2/config/datastore.rs
> index c6036fc3..0e6529f8 100644
> --- a/src/api2/config/datastore.rs
> +++ b/src/api2/config/datastore.rs
> @@ -119,9 +119,9 @@ pub fn create_datastore(
>      WorkerTask::new_thread(
>          "create-datastore",
>          Some(config.name.to_string()),
> -        auth_id,
> +        auth_id.to_string(),
>          to_stdout,
> -        move |worker| do_create_datastore(lock, section_config, config, Some(&worker)),
> +       move |worker| do_create_datastore(lock, section_config, config, Some(&worker)),
>      )
>  }
>  
> diff --git a/src/api2/node/apt.rs b/src/api2/node/apt.rs
> index 8f4bc691..f02920c0 100644
> --- a/src/api2/node/apt.rs
> +++ b/src/api2/node/apt.rs
> @@ -14,7 +14,7 @@ use proxmox_apt::repositories::{
>  use proxmox_http::ProxyConfig;
>  
>  use pbs_api_types::{
> -    Authid, APTUpdateInfo, NODE_SCHEMA, PROXMOX_CONFIG_DIGEST_SCHEMA, UPID_SCHEMA,
> +    APTUpdateInfo, NODE_SCHEMA, PROXMOX_CONFIG_DIGEST_SCHEMA, UPID_SCHEMA,
>      PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
>  };
>  
> @@ -154,7 +154,7 @@ pub fn apt_update_database(
>      rpcenv: &mut dyn RpcEnvironment,
>  ) -> Result<String, Error> {
>  
> -    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> +    let auth_id = rpcenv.get_auth_id().unwrap();
>      let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
>  
>      let upid_str = WorkerTask::new_thread("aptupdate", None, auth_id, to_stdout, move |worker| {
> diff --git a/src/api2/node/certificates.rs b/src/api2/node/certificates.rs
> index 82fa028d..7b31861e 100644
> --- a/src/api2/node/certificates.rs
> +++ b/src/api2/node/certificates.rs
> @@ -11,7 +11,7 @@ use proxmox::api::router::SubdirMap;
>  use proxmox::api::{api, Permission, Router, RpcEnvironment};
>  use proxmox::list_subdirs_api_method;
>  
> -use pbs_api_types::{Authid, NODE_SCHEMA, PRIV_SYS_MODIFY};
> +use pbs_api_types::{NODE_SCHEMA, PRIV_SYS_MODIFY};
>  use pbs_buildcfg::configdir;
>  use pbs_tools::cert;
>  
> @@ -530,7 +530,7 @@ fn spawn_certificate_worker(
>  
>      let (node_config, _digest) = crate::config::node::config()?;
>  
> -    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> +    let auth_id = rpcenv.get_auth_id().unwrap();
>  
>      WorkerTask::spawn(name, None, auth_id, true, move |worker| async move {
>          if let Some(cert) = order_certificate(worker, &node_config).await? {
> @@ -559,7 +559,7 @@ pub fn revoke_acme_cert(rpcenv: &mut dyn RpcEnvironment) -> Result<String, Error
>  
>      let cert_pem = get_certificate_pem()?;
>  
> -    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> +    let auth_id = rpcenv.get_auth_id().unwrap();
>  
>      WorkerTask::spawn(
>          "acme-revoke-cert",
> diff --git a/src/api2/node/disks/directory.rs b/src/api2/node/disks/directory.rs
> index 38809dcf..2f4a738d 100644
> --- a/src/api2/node/disks/directory.rs
> +++ b/src/api2/node/disks/directory.rs
> @@ -7,7 +7,7 @@ use proxmox::api::section_config::SectionConfigData;
>  use proxmox::api::router::Router;
>  
>  use pbs_api_types::{
> -    Authid, DataStoreConfig, NODE_SCHEMA, BLOCKDEVICE_NAME_SCHEMA,
> +    DataStoreConfig, NODE_SCHEMA, BLOCKDEVICE_NAME_SCHEMA,
>      DATASTORE_SCHEMA, UPID_SCHEMA, PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
>  };
>  
> @@ -146,7 +146,7 @@ pub fn create_datastore_disk(
>  
>      let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
>  
> -    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> +    let auth_id = rpcenv.get_auth_id().unwrap();
>  
>      let info = get_disk_usage_info(&disk, true)?;
>  
> diff --git a/src/api2/node/disks/mod.rs b/src/api2/node/disks/mod.rs
> index 67f8f63a..b4001a54 100644
> --- a/src/api2/node/disks/mod.rs
> +++ b/src/api2/node/disks/mod.rs
> @@ -7,7 +7,7 @@ use proxmox::{sortable, identity};
>  use proxmox::{list_subdirs_api_method};
>  
>  use pbs_api_types::{
> -    Authid, UPID_SCHEMA, NODE_SCHEMA, BLOCKDEVICE_NAME_SCHEMA,
> +    UPID_SCHEMA, NODE_SCHEMA, BLOCKDEVICE_NAME_SCHEMA,
>      PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
>  };
>  
> @@ -144,7 +144,7 @@ pub fn initialize_disk(
>  
>      let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
>  
> -    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> +    let auth_id = rpcenv.get_auth_id().unwrap();
>  
>      let info = get_disk_usage_info(&disk, true)?;
>  
> diff --git a/src/api2/node/disks/zfs.rs b/src/api2/node/disks/zfs.rs
> index 14c2cfec..9fe0dac4 100644
> --- a/src/api2/node/disks/zfs.rs
> +++ b/src/api2/node/disks/zfs.rs
> @@ -8,7 +8,7 @@ use proxmox::api::{
>  use proxmox::api::router::Router;
>  
>  use pbs_api_types::{
> -    Authid, ZpoolListItem, ZfsRaidLevel, ZfsCompressionType, DataStoreConfig,
> +    ZpoolListItem, ZfsRaidLevel, ZfsCompressionType, DataStoreConfig,
>      NODE_SCHEMA, ZPOOL_NAME_SCHEMA, DATASTORE_SCHEMA, DISK_ARRAY_SCHEMA,
>      DISK_LIST_SCHEMA, ZFS_ASHIFT_SCHEMA, UPID_SCHEMA,
>      PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
> @@ -168,7 +168,7 @@ pub fn create_zpool(
>  
>      let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
>  
> -    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> +    let auth_id = rpcenv.get_auth_id().unwrap();
>  
>      let add_datastore = add_datastore.unwrap_or(false);
>  
> diff --git a/src/api2/node/mod.rs b/src/api2/node/mod.rs
> index 9b31d595..8e357311 100644
> --- a/src/api2/node/mod.rs
> +++ b/src/api2/node/mod.rs
> @@ -146,7 +146,7 @@ async fn termproxy(cmd: Option<String>, rpcenv: &mut dyn RpcEnvironment) -> Resu
>      let upid = WorkerTask::spawn(
>          "termproxy",
>          None,
> -        auth_id,
> +        auth_id.to_string(),
>          false,
>          move |worker| async move {
>              // move inside the worker so that it survives and does not close the port
> diff --git a/src/api2/node/network.rs b/src/api2/node/network.rs
> index 351fd11c..0fde9f2a 100644
> --- a/src/api2/node/network.rs
> +++ b/src/api2/node/network.rs
> @@ -703,7 +703,7 @@ pub async fn reload_network_config(
>  
>      let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
>  
> -    let upid_str = WorkerTask::spawn("srvreload", Some(String::from("networking")), auth_id, true, |_worker| async {
> +    let upid_str = WorkerTask::spawn("srvreload", Some(String::from("networking")), auth_id.to_string(), true, |_worker| async {
>  
>          let _ = std::fs::rename(network::NETWORK_INTERFACES_NEW_FILENAME, network::NETWORK_INTERFACES_FILENAME);
>  
> diff --git a/src/api2/node/services.rs b/src/api2/node/services.rs
> index 25edd1b6..6c757f43 100644
> --- a/src/api2/node/services.rs
> +++ b/src/api2/node/services.rs
> @@ -195,7 +195,7 @@ fn run_service_command(service: &str, cmd: &str, auth_id: Authid) -> Result<Valu
>      let upid = WorkerTask::new_thread(
>          &workerid,
>          Some(service.clone()),
> -        auth_id,
> +        auth_id.to_string(),
>          false,
>          move |_worker| {
>  
> diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
> index 7066f889..169a3d4d 100644
> --- a/src/api2/node/tasks.rs
> +++ b/src/api2/node/tasks.rs
> @@ -99,8 +99,8 @@ fn check_job_store(upid: &UPID, store: &str) -> bool {
>  }
>  
>  fn check_task_access(auth_id: &Authid, upid: &UPID) -> Result<(), Error> {
> -    let task_auth_id = &upid.auth_id;
> -    if auth_id == task_auth_id
> +    let task_auth_id: Authid = upid.auth_id.parse()?;
> +    if auth_id == &task_auth_id
>          || (task_auth_id.is_token() && &Authid::from(task_auth_id.user().clone()) == auth_id) {
>          // task owner can always read
>          Ok(())
> @@ -200,6 +200,8 @@ async fn get_task_status(
>      let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
>      check_task_access(&auth_id, &upid)?;
>  
> +    let task_auth_id: Authid = upid.auth_id.parse()?;
> +
>      let mut result = json!({
>          "upid": param["upid"],
>          "node": upid.node,
> @@ -208,11 +210,11 @@ async fn get_task_status(
>          "starttime": upid.starttime,
>          "type": upid.worker_type,
>          "id": upid.worker_id,
> -        "user": upid.auth_id.user(),
> +        "user": task_auth_id.user(),
>      });
>  
> -    if upid.auth_id.is_token() {
> -        result["tokenid"] = Value::from(upid.auth_id.tokenname().unwrap().as_str());
> +    if task_auth_id.is_token() {
> +        result["tokenid"] = Value::from(task_auth_id.tokenname().unwrap().as_str());
>      }
>  
>      if crate::server::worker_is_active(&upid).await? {
> @@ -344,10 +346,11 @@ fn stop_task(
>  
>      let upid = extract_upid(&param)?;
>  
> -    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> +    let auth_id = rpcenv.get_auth_id().unwrap();
>  
>      if auth_id != upid.auth_id {
>          let user_info = CachedUserInfo::new()?;
> +        let auth_id: Authid = auth_id.parse()?;
>          user_info.check_privs(&auth_id, &["system", "tasks"], PRIV_SYS_MODIFY, false)?;
>      }
>  
> diff --git a/src/api2/pull.rs b/src/api2/pull.rs
> index 1eb86ea3..e631920f 100644
> --- a/src/api2/pull.rs
> +++ b/src/api2/pull.rs
> @@ -81,7 +81,7 @@ pub fn do_sync_job(
>      let upid_str = WorkerTask::spawn(
>          &worker_type,
>          Some(job_id.clone()),
> -        auth_id.clone(),
> +        auth_id.to_string(),
>          to_stdout,
>          move |worker| async move {
>  
> @@ -183,7 +183,7 @@ async fn pull (
>      let (client, src_repo, tgt_store) = get_pull_parameters(&store, &remote, &remote_store).await?;
>  
>      // fixme: set to_stdout to false?
> -    let upid_str = WorkerTask::spawn("sync", Some(store.clone()), auth_id.clone(), true, move |worker| async move {
> +    let upid_str = WorkerTask::spawn("sync", Some(store.clone()), auth_id.to_string(), true, move |worker| async move {
>  
>          worker.log(format!("sync datastore '{}' start", store));
>  
> diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
> index 821e83c4..fada952c 100644
> --- a/src/api2/reader/mod.rs
> +++ b/src/api2/reader/mod.rs
> @@ -143,7 +143,7 @@ fn upgrade_to_backup_reader_protocol(
>  
>          let worker_id = format!("{}:{}/{}/{:08X}", store, backup_type, backup_id, backup_dir.backup_time());
>  
> -        WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| async move {
> +        WorkerTask::spawn("reader", Some(worker_id), auth_id.to_string(), true, move |worker| async move {
>              let _guard = _guard;
>  
>              let mut env = ReaderEnvironment::new(
> diff --git a/src/api2/tape/backup.rs b/src/api2/tape/backup.rs
> index 6b533820..fadbfa3d 100644
> --- a/src/api2/tape/backup.rs
> +++ b/src/api2/tape/backup.rs
> @@ -195,7 +195,7 @@ pub fn do_tape_backup_job(
>      let upid_str = WorkerTask::new_thread(
>          &worker_type,
>          Some(job_id.clone()),
> -        auth_id.clone(),
> +        auth_id.to_string(),
>          to_stdout,
>          move |worker| {
>              job.start(&worker.upid().to_string())?;
> @@ -376,7 +376,7 @@ pub fn backup(
>      let upid_str = WorkerTask::new_thread(
>          "tape-backup",
>          Some(job_id),
> -        auth_id,
> +        auth_id.to_string(),
>          to_stdout,
>          move |worker| {
>              let _drive_lock = drive_lock; // keep lock guard
> diff --git a/src/api2/tape/drive.rs b/src/api2/tape/drive.rs
> index 58f49f43..10aa6842 100644
> --- a/src/api2/tape/drive.rs
> +++ b/src/api2/tape/drive.rs
> @@ -87,7 +87,7 @@ where
>      let (config, _digest) = pbs_config::drive::config()?;
>      let lock_guard = lock_tape_device(&config, &drive)?;
>  
> -    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> +    let auth_id = rpcenv.get_auth_id().unwrap();
>      let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
>  
>      WorkerTask::new_thread(worker_type, job_id, auth_id, to_stdout, move |worker| {
> diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
> index 4ab60e8f..7739d1a4 100644
> --- a/src/api2/tape/restore.rs
> +++ b/src/api2/tape/restore.rs
> @@ -275,7 +275,7 @@ pub fn restore(
>      let upid_str = WorkerTask::new_thread(
>          "tape-restore",
>          Some(taskid),
> -        auth_id.clone(),
> +        auth_id.to_string(),
>          to_stdout,
>          move |worker| {
>              let _drive_lock = drive_lock; // keep lock guard
> diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
> index 6734525b..518054bf 100644
> --- a/src/bin/proxmox-backup-proxy.rs
> +++ b/src/bin/proxmox-backup-proxy.rs
> @@ -745,7 +745,7 @@ async fn schedule_task_log_rotate() {
>      if let Err(err) = WorkerTask::new_thread(
>          worker_type,
>          None,
> -        Authid::root_auth_id().clone(),
> +        Authid::root_auth_id().to_string(),
>          false,
>          move |worker| {
>              job.start(&worker.upid().to_string())?;
> diff --git a/src/server/gc_job.rs b/src/server/gc_job.rs
> index 665b9631..317f4a36 100644
> --- a/src/server/gc_job.rs
> +++ b/src/server/gc_job.rs
> @@ -26,7 +26,7 @@ pub fn do_garbage_collection_job(
>      let upid_str = WorkerTask::new_thread(
>          &worker_type,
>          Some(store.clone()),
> -        auth_id.clone(),
> +        auth_id.to_string(),
>          to_stdout,
>          move |worker| {
>              job.start(&worker.upid().to_string())?;
> diff --git a/src/server/prune_job.rs b/src/server/prune_job.rs
> index f298a74c..8d971a1c 100644
> --- a/src/server/prune_job.rs
> +++ b/src/server/prune_job.rs
> @@ -105,7 +105,7 @@ pub fn do_prune_job(
>      let upid_str = WorkerTask::new_thread(
>          &worker_type,
>          Some(job.jobname().to_string()),
> -        auth_id.clone(),
> +        auth_id.to_string(),
>          false,
>          move |worker| {
>              job.start(&worker.upid().to_string())?;
> diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs
> index 7f6d73ff..6005b706 100644
> --- a/src/server/verify_job.rs
> +++ b/src/server/verify_job.rs
> @@ -36,7 +36,7 @@ pub fn do_verification_job(
>      let upid_str = WorkerTask::new_thread(
>          &worker_type,
>          Some(job_id.clone()),
> -        auth_id.clone(),
> +        auth_id.to_string(),
>          to_stdout,
>          move |worker| {
>              job.start(&worker.upid().to_string())?;
> diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
> index a74b18e1..92ab50d7 100644
> --- a/src/server/worker_task.rs
> +++ b/src/server/worker_task.rs
> @@ -18,7 +18,7 @@ use proxmox::tools::fs::{create_path, replace_file, CreateOptions};
>  
>  use pbs_buildcfg;
>  use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
> -use pbs_api_types::{Authid, UPID};
> +use pbs_api_types::UPID;
>  use pbs_config::{open_backup_lockfile, BackupLockGuard};
>  use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
>  
> @@ -589,7 +589,7 @@ struct WorkerTaskData {
>  
>  impl WorkerTask {
>  
> -    pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: Authid, to_stdout: bool) -> Result<Arc<Self>, Error> {
> +    pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: String, to_stdout: bool) -> Result<Arc<Self>, Error> {
>          let upid = UPID::new(worker_type, worker_id, auth_id)?;
>          let task_id = upid.task_id;
>  
> @@ -640,7 +640,7 @@ impl WorkerTask {
>      pub fn spawn<F, T>(
>          worker_type: &str,
>          worker_id: Option<String>,
> -        auth_id: Authid,
> +        auth_id: String,
>          to_stdout: bool,
>          f: F,
>      ) -> Result<String, Error>
> @@ -662,7 +662,7 @@ impl WorkerTask {
>      pub fn new_thread<F>(
>          worker_type: &str,
>          worker_id: Option<String>,
> -        auth_id: Authid,
> +        auth_id: String,
>          to_stdout: bool,
>          f: F,
>      ) -> Result<String, Error>
> -- 
> 2.30.2
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel@lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 
> 
> 




^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid
@ 2021-09-24  9:03 Dietmar Maurer
  0 siblings, 0 replies; 9+ messages in thread
From: Dietmar Maurer @ 2021-09-24  9:03 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Fabian Grünbichler

> On 09/23/2021 3:20 PM Fabian Grünbichler <f.gruenbichler@proxmox.com> wrote:
> 
>  
> On September 23, 2021 12:13 pm, Dietmar Maurer wrote:
> > Because we want to move worker_task.rs into proxmox-rest-server crate.
> 
> as discussed off-list, we could use a 
> 
> struct AuthidStr(String)
> 
> or a trait defined in proxmox instead of a regular String, to avoid 
> accidentally passing in other, non authid strings. all Authid 
> implementations would then need to implement Into<AuthidStr> (/the 
> trait), and the calls below to to_string() would become into() (remain 
> as they were).
> 
> can also be done as follow-up with a proxmox bump if we want to discuss 
> it more in-depth but still go ahead with the split/move now.

I am still not sure whats the best way to implement this, so I committed
what we have so far...




^ permalink raw reply	[flat|nested] 9+ messages in thread

end of thread, other threads:[~2021-09-24  9:04 UTC | newest]

Thread overview: 9+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-09-23 10:13 [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid Dietmar Maurer
2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 2/6] worker task: allow to configure path and owner/group Dietmar Maurer
2021-09-23 11:36   ` Fabian Grünbichler
2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 3/6] use UPID and systemd helpers from proxmox 0.13.4 Dietmar Maurer
2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 4/6] move worker_task.rs into proxmox-rest-server crate Dietmar Maurer
2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 5/6] proxmox-daily-update: setup worker and command socket Dietmar Maurer
2021-09-23 10:13 ` [pbs-devel] [PATCH proxmox-backup 6/6] proxmox-backup-manager: " Dietmar Maurer
2021-09-23 13:20 ` [pbs-devel] [PATCH proxmox-backup 1/6] src/server/worker_task.rs: Avoid using pbs-api-type::Authid Fabian Grünbichler
2021-09-24  9:03 Dietmar Maurer

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal