all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Hannes Laimer <h.laimer@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox 1/2] sys: add wrapper for POSIX semaphores
Date: Thu,  4 Sep 2025 16:37:30 +0200	[thread overview]
Message-ID: <20250904143735.125857-2-h.laimer@proxmox.com> (raw)
In-Reply-To: <20250904143735.125857-1-h.laimer@proxmox.com>

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
 proxmox-sys/Cargo.toml       |   1 +
 proxmox-sys/src/lib.rs       |   2 +
 proxmox-sys/src/semaphore.rs | 164 +++++++++++++++++++++++++++++++++++
 3 files changed, 167 insertions(+)
 create mode 100644 proxmox-sys/src/semaphore.rs

diff --git a/proxmox-sys/Cargo.toml b/proxmox-sys/Cargo.toml
index 64cd7cc6..500009b1 100644
--- a/proxmox-sys/Cargo.toml
+++ b/proxmox-sys/Cargo.toml
@@ -31,3 +31,4 @@ logrotate = ["dep:zstd"]
 acl = []
 crypt = ["dep:openssl"]
 timer = []
+semaphore = []
diff --git a/proxmox-sys/src/lib.rs b/proxmox-sys/src/lib.rs
index d8213438..be7471a7 100644
--- a/proxmox-sys/src/lib.rs
+++ b/proxmox-sys/src/lib.rs
@@ -18,6 +18,8 @@ pub mod logrotate;
 pub mod macros;
 pub mod mmap;
 pub mod process_locker;
+#[cfg(feature = "semaphore")]
+pub mod semaphore;
 pub mod systemd;
 
 /// Returns the hosts node name (UTS node name)
diff --git a/proxmox-sys/src/semaphore.rs b/proxmox-sys/src/semaphore.rs
new file mode 100644
index 00000000..61774607
--- /dev/null
+++ b/proxmox-sys/src/semaphore.rs
@@ -0,0 +1,164 @@
+use std::ffi::CString;
+use std::time::{Duration, SystemTime};
+
+use nix::errno::Errno;
+use nix::libc;
+
+use crate::c_try;
+
+/// POSIX named semaphore wrapper using `sem_open(3)`, `sem_wait(3)`, `sem_post(3)`,
+/// `sem_close(3)`, and `sem_unlink(3)`.
+///
+/// Notes:
+/// - On Linux, named semaphores reside in `/dev/shm` as `sem.<name>`.
+/// - Names start with a leading slash (for example `"/mysem"`). If
+///   omitted, we prepend one automatically.
+/// - The `mode` passed to [`Self::create`] is filtered by the creating process `umask`.
+/// - [`Self::close`] only drops this process' handle; use [`Self::unlink`] to remove the name.
+/// - The object is freed once the last open handle is closed and it has been unlinked.
+/// - Dropping a `PosixSemaphore` closes its handle, ignoring any error from `sem_close`.
+/// - Errors are returned as [`std::io::Error`] mapped from `errno`; callers can
+///   use [`crate::error::SysError`].
+#[derive(Debug)]
+pub struct PosixSemaphore {
+    /// Raw handle returned by a successful `sem_open(3)` call.
+    sem: *mut libc::sem_t,
+}
+
+// The raw `*mut libc::sem_t` field makes the type neither `Send` nor `Sync` by default,
+// so both are asserted manually here.
+// NOTE(review): this presumably relies on the POSIX `sem_*` functions being safe to call
+// concurrently on the same semaphore handle from several threads - confirm.
+unsafe impl Send for PosixSemaphore {}
+unsafe impl Sync for PosixSemaphore {}
+
+impl PosixSemaphore {
+    /// Convert `name` into the C string handed to the `sem_*` functions.
+    ///
+    /// A leading `/` is prepended when missing. Fails if `name` contains an interior NUL byte.
+    fn c_name(name: &str) -> std::io::Result<CString> {
+        let cname = if name.starts_with('/') {
+            CString::new(name)
+        } else {
+            CString::new(format!("/{name}"))
+        };
+        cname.map_err(std::io::Error::other)
+    }
+
+    /// Open an existing named semaphore.
+    ///
+    /// Equivalent to `sem_open(name, 0)`. Fails with `ENOENT` (as
+    /// `io::ErrorKind::NotFound`) if the name does not exist.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if `name` contains a NUL byte or if `sem_open` fails.
+    pub fn open(name: &str) -> std::io::Result<Self> {
+        let cname = Self::c_name(name)?;
+        // SAFETY: `cname` is a valid NUL-terminated string that outlives the call.
+        let sem = unsafe { libc::sem_open(cname.as_ptr(), 0) };
+        if sem == libc::SEM_FAILED {
+            return Err(std::io::Error::last_os_error());
+        }
+        Ok(Self { sem })
+    }
+
+    /// Create a named semaphore if missing, with `mode` and initial `value`.
+    ///
+    /// Equivalent to `sem_open(name, O_CREAT, mode, value)`. If the semaphore
+    /// already exists, POSIX opens it and ignores `value`.
+    /// The effective permissions are subject to the current process' `umask`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if `name` contains a NUL byte or if `sem_open` fails.
+    pub fn create(name: &str, mode: u32, value: u32) -> std::io::Result<Self> {
+        let cname = Self::c_name(name)?;
+        // SAFETY: `cname` is a valid NUL-terminated string that outlives the call; `mode` and
+        // `value` are the two variadic arguments `sem_open` reads when `O_CREAT` is set.
+        let sem = unsafe { libc::sem_open(cname.as_ptr(), libc::O_CREAT, mode, value) };
+        if sem == libc::SEM_FAILED {
+            return Err(std::io::Error::last_os_error());
+        }
+        Ok(Self { sem })
+    }
+
+    /// Decrement the semaphore, blocking until a permit is available.
+    ///
+    /// Equivalent to `sem_wait(3)`. May return `Interrupted` on `EINTR`.
+    pub fn wait(&self) -> std::io::Result<()> {
+        // SAFETY: `self.sem` was returned by a successful `sem_open` and is still open.
+        c_try!(unsafe { libc::sem_wait(self.sem) });
+        Ok(())
+    }
+
+    /// Try to decrement the semaphore without blocking (`sem_trywait`).
+    /// Returns `Ok(true)` if acquired, `Ok(false)` on `EAGAIN`.
+    pub fn try_wait(&self) -> std::io::Result<bool> {
+        // SAFETY: `self.sem` was returned by a successful `sem_open` and is still open.
+        if unsafe { libc::sem_trywait(self.sem) } == 0 {
+            return Ok(true);
+        }
+        match Errno::last() {
+            Errno::EAGAIN => Ok(false),
+            err => Err(std::io::Error::from_raw_os_error(err as i32)),
+        }
+    }
+
+    /// Decrement the semaphore or time out after `timeout` (`sem_timedwait`).
+    /// Returns `Ok(true)` if acquired, `Ok(false)` on `ETIMEDOUT`.
+    ///
+    /// The relative `timeout` is converted into an absolute deadline based on the
+    /// current system time.
+    ///
+    /// # Errors
+    ///
+    /// Returns `InvalidInput` if the deadline cannot be represented, otherwise any
+    /// error reported by `sem_timedwait` other than `ETIMEDOUT` (for example `EINTR`).
+    pub fn timed_wait(&self, timeout: Duration) -> std::io::Result<bool> {
+        let unrepresentable = || {
+            std::io::Error::new(
+                std::io::ErrorKind::InvalidInput,
+                "semaphore timeout is too large",
+            )
+        };
+
+        // `SystemTime + Duration` panics on overflow, so use the checked variant.
+        let deadline = SystemTime::now()
+            .checked_add(timeout)
+            .ok_or_else(unrepresentable)?;
+        let since_epoch = deadline
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .unwrap_or(Duration::ZERO);
+        let ts = libc::timespec {
+            // The field types differ between targets; let them be inferred instead of
+            // hard-coding `i64`.
+            tv_sec: since_epoch
+                .as_secs()
+                .try_into()
+                .map_err(|_| unrepresentable())?,
+            // Always below 1_000_000_000, so this cast cannot truncate.
+            tv_nsec: since_epoch.subsec_nanos() as _,
+        };
+
+        // SAFETY: `self.sem` was returned by a successful `sem_open` and is still open;
+        // `ts` is a valid `timespec` that lives across the call.
+        if unsafe { libc::sem_timedwait(self.sem, &ts) } == 0 {
+            return Ok(true);
+        }
+        match Errno::last() {
+            Errno::ETIMEDOUT => Ok(false),
+            err => Err(std::io::Error::from_raw_os_error(err as i32)),
+        }
+    }
+
+    /// Increment the semaphore (`sem_post`).
+    /// Can fail with `EOVERFLOW` if exceeding `SEM_VALUE_MAX`.
+    pub fn post(&self) -> std::io::Result<()> {
+        // SAFETY: `self.sem` was returned by a successful `sem_open` and is still open.
+        c_try!(unsafe { libc::sem_post(self.sem) });
+        Ok(())
+    }
+
+    /// Close this process' handle (`sem_close`) and report any error.
+    /// Use [`Self::unlink`] to remove the name from the namespace.
+    ///
+    /// Consumes the semaphore: the handle is invalid after `sem_close`, so it must not be
+    /// used again and `Drop` must not close it a second time.
+    pub fn close(self) -> std::io::Result<()> {
+        let sem = self.sem;
+        // Skip `Drop`, which would otherwise call `sem_close` again on the same handle.
+        std::mem::forget(self);
+        // SAFETY: `sem` was returned by a successful `sem_open`; ownership was just taken
+        // from `self`, so nothing else closes or uses this handle afterwards.
+        c_try!(unsafe { libc::sem_close(sem) });
+        Ok(())
+    }
+
+    /// Unlink the named semaphore (`sem_unlink`). Removes the name so new opens fail.
+    /// Existing handles remain valid until closed.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if `name` contains a NUL byte or if `sem_unlink` fails.
+    pub fn unlink(name: &str) -> std::io::Result<()> {
+        let cname = Self::c_name(name)?;
+        // SAFETY: `cname` is a valid NUL-terminated string that outlives the call.
+        c_try!(unsafe { libc::sem_unlink(cname.as_ptr()) });
+        Ok(())
+    }
+}
+
+impl Drop for PosixSemaphore {
+    /// Closes the underlying handle with `sem_close`; the return code is discarded.
+    fn drop(&mut self) {
+        // NOTE(review): assumes `self.sem` is still an open handle at this point - confirm
+        // that no other code path has already closed it.
+        let _ = unsafe { libc::sem_close(self.sem) };
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::PosixSemaphore;
+
+    /// Exercises create, unlink, try_wait, post and close on a semaphore named after the
+    /// current process id.
+    #[test]
+    fn basic_ops() -> std::io::Result<()> {
+        let pid = std::process::id();
+        let sem_name = format!("test-semaphore-{}", pid);
+
+        // Best-effort removal of a leftover semaphore with the same name; the result is
+        // deliberately ignored.
+        let _ = PosixSemaphore::unlink(&sem_name);
+
+        let handle = PosixSemaphore::create(&sem_name, 0o600, 1)?;
+        // Remove the name right away so the semaphore is cleaned up once all handles close.
+        PosixSemaphore::unlink(&sem_name)?;
+
+        // Created with value 1: the first attempt succeeds, the second one does not.
+        let first = handle.try_wait()?;
+        assert!(first);
+        let second = handle.try_wait()?;
+        assert!(!second);
+
+        // After posting once, one more acquisition succeeds.
+        handle.post()?;
+        let after_post = handle.try_wait()?;
+        assert!(after_post);
+
+        handle.close()?;
+        Ok(())
+    }
+}
-- 
2.47.2



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


  reply	other threads:[~2025-09-04 14:37 UTC|newest]

Thread overview: 7+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2025-09-04 14:37 [pbs-devel] [RFC proxmox{, -backup} 0/6] add support for configuring max Hannes Laimer
2025-09-04 14:37 ` Hannes Laimer [this message]
2025-09-04 14:37 ` [pbs-devel] [PATCH proxmox 2/2] pbs-api-types: add concurrency_limit to DataStoreConfig Hannes Laimer
2025-09-04 14:37 ` [pbs-devel] [PATCH proxmox-backup 1/4] api: config: update/delete concurrency_limit on datastore Hannes Laimer
2025-09-04 14:37 ` [pbs-devel] [PATCH proxmox-backup 2/4] Cargo.toml: add 'semaphore' feature to proxmox-sys dep Hannes Laimer
2025-09-04 14:37 ` [pbs-devel] [PATCH proxmox-backup 3/4] bin: proxy: initialize concurrency semaphores for datastores Hannes Laimer
2025-09-04 14:37 ` [pbs-devel] [PATCH proxmox-backup 4/4] api: backup: wait for semaphore if one exists Hannes Laimer

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20250904143735.125857-2-h.laimer@proxmox.com \
    --to=h.laimer@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal