all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Wolfgang Bumiller <w.bumiller@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH backup 6/7] refactor send_command
Date: Tue, 11 May 2021 15:53:59 +0200	[thread overview]
Message-ID: <20210511135400.32406-7-w.bumiller@proxmox.com> (raw)
In-Reply-To: <20210511135400.32406-1-w.bumiller@proxmox.com>

- refactor the future combinators into plain async/await code,
- make `send_command` take a `&T` where `T: Serialize` instead of a
  `Value`, and allow sending a raw command string via `send_raw_command`.

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
---
 src/bin/proxmox-backup-proxy.rs |  8 +---
 src/server/command_socket.rs    | 71 ++++++++++++++++++---------------
 src/server/worker_task.rs       |  4 +-
 3 files changed, 42 insertions(+), 41 deletions(-)

diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 793ba67d..fc773459 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -750,15 +750,11 @@ async fn command_reopen_logfiles() -> Result<(), Error> {
     // only care about the most recent daemon instance for each, proxy & api, as other older ones
     // should not respond to new requests anyway, but only finish their current one and then exit.
     let sock = server::our_ctrl_sock();
-    let f1 = server::send_command(sock, serde_json::json!({
-        "command": "api-access-log-reopen",
-    }));
+    let f1 = server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
 
     let pid = server::read_pid(buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
     let sock = server::ctrl_sock_from_pid(pid);
-    let f2 = server::send_command(sock, serde_json::json!({
-        "command": "api-access-log-reopen",
-    }));
+    let f2 = server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
 
     match futures::join!(f1, f2) {
         (Err(e1), Err(e2)) => Err(format_err!("reopen commands failed, proxy: {}; api: {}", e1, e2)),
diff --git a/src/server/command_socket.rs b/src/server/command_socket.rs
index 89c77585..af41dd16 100644
--- a/src/server/command_socket.rs
+++ b/src/server/command_socket.rs
@@ -2,11 +2,12 @@ use anyhow::{bail, format_err, Error};
 
 use std::collections::HashMap;
 use std::os::unix::io::AsRawFd;
-use std::path::PathBuf;
+use std::path::{PathBuf, Path};
 use std::sync::Arc;
 
 use futures::*;
 use tokio::net::UnixListener;
+use serde::Serialize;
 use serde_json::Value;
 use nix::sys::socket;
 
@@ -102,43 +103,47 @@ where
 }
 
 
-pub async fn send_command<P>(
-    path: P,
-    params: Value
-) -> Result<Value, Error>
-    where P: Into<PathBuf>,
+pub async fn send_command<P, T>(path: P, params: &T) -> Result<Value, Error>
+where
+    P: AsRef<Path>,
+    T: ?Sized + Serialize,
 {
-    let path: PathBuf = path.into();
+    let mut command_string = serde_json::to_string(params)?;
+    command_string.push('\n');
+    send_raw_command(path.as_ref(), &command_string).await
+}
 
-    tokio::net::UnixStream::connect(path)
-        .map_err(move |err| format_err!("control socket connect failed - {}", err))
-        .and_then(move |mut conn| {
+pub async fn send_raw_command<P>(path: P, command_string: &str) -> Result<Value, Error>
+where
+    P: AsRef<Path>,
+{
+    use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
 
-            let mut command_string = params.to_string();
-            command_string.push('\n');
+    let mut conn = tokio::net::UnixStream::connect(path)
+        .map_err(move |err| format_err!("control socket connect failed - {}", err))
+        .await?;
 
-            async move {
-                use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
+    conn.write_all(command_string.as_bytes()).await?;
+    if !command_string.as_bytes().ends_with(b"\n") {
+        conn.write_all(b"\n").await?;
+    }
 
-                conn.write_all(command_string.as_bytes()).await?;
-                AsyncWriteExt::shutdown(&mut conn).await?;
-                let mut rx = tokio::io::BufReader::new(conn);
-                let mut data = String::new();
-                if rx.read_line(&mut data).await? == 0 {
-                    bail!("no response");
-                }
-                if let Some(res) = data.strip_prefix("OK: ") {
-                    match res.parse::<Value>() {
-                        Ok(v) => Ok(v),
-                        Err(err) => bail!("unable to parse json response - {}", err),
-                    }
-                } else if let Some(err) = data.strip_prefix("ERROR: ") {
-                    bail!("{}", err);
-                } else {
-                    bail!("unable to parse response: {}", data);
-                }
-            }
-        }).await
+    AsyncWriteExt::shutdown(&mut conn).await?;
+    let mut rx = tokio::io::BufReader::new(conn);
+    let mut data = String::new();
+    if rx.read_line(&mut data).await? == 0 {
+        bail!("no response");
+    }
+    if let Some(res) = data.strip_prefix("OK: ") {
+        match res.parse::<Value>() {
+            Ok(v) => Ok(v),
+            Err(err) => bail!("unable to parse json response - {}", err),
+        }
+    } else if let Some(err) = data.strip_prefix("ERROR: ") {
+        bail!("{}", err);
+    } else {
+        bail!("unable to parse response: {}", data);
+    }
 }
 
 /// A callback for a specific commando socket.
diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
index 6c5456c9..84019fef 100644
--- a/src/server/worker_task.rs
+++ b/src/server/worker_task.rs
@@ -59,7 +59,7 @@ pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
             "upid": upid.to_string(),
         },
     });
-    let status = super::send_command(sock, cmd).await?;
+    let status = super::send_command(sock, &cmd).await?;
 
     if let Some(active) = status.as_bool() {
         Ok(active)
@@ -133,7 +133,7 @@ pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
             "upid": upid.to_string(),
         },
     });
-    super::send_command(sock, cmd).map_ok(|_| ()).await
+    super::send_command(sock, &cmd).map_ok(|_| ()).await
 }
 
 fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
-- 
2.20.1





  parent reply	other threads:[~2021-05-11 13:54 UTC|newest]

Thread overview: 9+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-05-11 13:53 [pbs-devel] [PATCH backup 0/7] hot-reload proxy certificates Wolfgang Bumiller
2021-05-11 13:53 ` [pbs-devel] [PATCH backup 1/7] proxy: factor out accept_connection Wolfgang Bumiller
2021-05-11 13:53 ` [pbs-devel] [PATCH backup 2/7] proxy: "continue on error" for the accept call, too Wolfgang Bumiller
2021-05-11 13:53 ` [pbs-devel] [PATCH backup 3/7] proxy: Arc usage cleanup Wolfgang Bumiller
2021-05-11 13:53 ` [pbs-devel] [PATCH backup 4/7] proxy: factor out tls acceptor creation Wolfgang Bumiller
2021-05-11 13:53 ` [pbs-devel] [PATCH backup 5/7] proxy: implement 'reload-certificate' command Wolfgang Bumiller
2021-05-11 13:53 ` Wolfgang Bumiller [this message]
2021-05-11 13:54 ` [pbs-devel] [PATCH backup 7/7] hot-reload proxy certificate when updating via the API Wolfgang Bumiller
2021-05-11 16:11 ` [pbs-devel] applied-series: [PATCH backup 0/7] hot-reload proxy certificates Thomas Lamprecht

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210511135400.32406-7-w.bumiller@proxmox.com \
    --to=w.bumiller@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal