From: Wolfgang Bumiller <w.bumiller@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH backup 6/7] refactor send_command
Date: Tue, 11 May 2021 15:53:59 +0200 [thread overview]
Message-ID: <20210511135400.32406-7-w.bumiller@proxmox.com> (raw)
In-Reply-To: <20210511135400.32406-1-w.bumiller@proxmox.com>
- refactor the combinators,
- make it take a `&T: Serialize` instead of a Value, and
allow sending the raw string via `send_raw_command`.
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
---
src/bin/proxmox-backup-proxy.rs | 8 +---
src/server/command_socket.rs | 71 ++++++++++++++++++---------------
src/server/worker_task.rs | 4 +-
3 files changed, 42 insertions(+), 41 deletions(-)
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 793ba67d..fc773459 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -750,15 +750,11 @@ async fn command_reopen_logfiles() -> Result<(), Error> {
// only care about the most recent daemon instance for each, proxy & api, as other older ones
// should not respond to new requests anyway, but only finish their current one and then exit.
let sock = server::our_ctrl_sock();
- let f1 = server::send_command(sock, serde_json::json!({
- "command": "api-access-log-reopen",
- }));
+ let f1 = server::send_raw_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
let pid = server::read_pid(buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
let sock = server::ctrl_sock_from_pid(pid);
- let f2 = server::send_command(sock, serde_json::json!({
- "command": "api-access-log-reopen",
- }));
+ let f2 = server::send_raw_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
match futures::join!(f1, f2) {
(Err(e1), Err(e2)) => Err(format_err!("reopen commands failed, proxy: {}; api: {}", e1, e2)),
diff --git a/src/server/command_socket.rs b/src/server/command_socket.rs
index 89c77585..af41dd16 100644
--- a/src/server/command_socket.rs
+++ b/src/server/command_socket.rs
@@ -2,11 +2,12 @@ use anyhow::{bail, format_err, Error};
use std::collections::HashMap;
use std::os::unix::io::AsRawFd;
-use std::path::PathBuf;
+use std::path::{PathBuf, Path};
use std::sync::Arc;
use futures::*;
use tokio::net::UnixListener;
+use serde::Serialize;
use serde_json::Value;
use nix::sys::socket;
@@ -102,43 +103,47 @@ where
}
-pub async fn send_command<P>(
- path: P,
- params: Value
-) -> Result<Value, Error>
- where P: Into<PathBuf>,
+pub async fn send_command<P, T>(path: P, params: &T) -> Result<Value, Error>
+where
+ P: AsRef<Path>,
+ T: ?Sized + Serialize,
{
- let path: PathBuf = path.into();
+ let mut command_string = serde_json::to_string(params)?;
+ command_string.push('\n');
+ send_raw_command(path.as_ref(), &command_string).await
+}
- tokio::net::UnixStream::connect(path)
- .map_err(move |err| format_err!("control socket connect failed - {}", err))
- .and_then(move |mut conn| {
+pub async fn send_raw_command<P>(path: P, command_string: &str) -> Result<Value, Error>
+where
+ P: AsRef<Path>,
+{
+ use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
- let mut command_string = params.to_string();
- command_string.push('\n');
+ let mut conn = tokio::net::UnixStream::connect(path)
+ .map_err(move |err| format_err!("control socket connect failed - {}", err))
+ .await?;
- async move {
- use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
+ conn.write_all(command_string.as_bytes()).await?;
+ if !command_string.as_bytes().ends_with(b"\n") {
+ conn.write_all(b"\n").await?;
+ }
- conn.write_all(command_string.as_bytes()).await?;
- AsyncWriteExt::shutdown(&mut conn).await?;
- let mut rx = tokio::io::BufReader::new(conn);
- let mut data = String::new();
- if rx.read_line(&mut data).await? == 0 {
- bail!("no response");
- }
- if let Some(res) = data.strip_prefix("OK: ") {
- match res.parse::<Value>() {
- Ok(v) => Ok(v),
- Err(err) => bail!("unable to parse json response - {}", err),
- }
- } else if let Some(err) = data.strip_prefix("ERROR: ") {
- bail!("{}", err);
- } else {
- bail!("unable to parse response: {}", data);
- }
- }
- }).await
+ AsyncWriteExt::shutdown(&mut conn).await?;
+ let mut rx = tokio::io::BufReader::new(conn);
+ let mut data = String::new();
+ if rx.read_line(&mut data).await? == 0 {
+ bail!("no response");
+ }
+ if let Some(res) = data.strip_prefix("OK: ") {
+ match res.parse::<Value>() {
+ Ok(v) => Ok(v),
+ Err(err) => bail!("unable to parse json response - {}", err),
+ }
+ } else if let Some(err) = data.strip_prefix("ERROR: ") {
+ bail!("{}", err);
+ } else {
+ bail!("unable to parse response: {}", data);
+ }
}
/// A callback for a specific commando socket.
diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
index 6c5456c9..84019fef 100644
--- a/src/server/worker_task.rs
+++ b/src/server/worker_task.rs
@@ -59,7 +59,7 @@ pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
"upid": upid.to_string(),
},
});
- let status = super::send_command(sock, cmd).await?;
+ let status = super::send_command(sock, &cmd).await?;
if let Some(active) = status.as_bool() {
Ok(active)
@@ -133,7 +133,7 @@ pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
"upid": upid.to_string(),
},
});
- super::send_command(sock, cmd).map_ok(|_| ()).await
+ super::send_command(sock, &cmd).map_ok(|_| ()).await
}
fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
--
2.20.1
next prev parent reply other threads:[~2021-05-11 13:54 UTC|newest]
Thread overview: 9+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-05-11 13:53 [pbs-devel] [PATCH backup 0/7] hot-reload proxy certificates Wolfgang Bumiller
2021-05-11 13:53 ` [pbs-devel] [PATCH backup 1/7] proxy: factor out accept_connection Wolfgang Bumiller
2021-05-11 13:53 ` [pbs-devel] [PATCH backup 2/7] proxy: "continue on error" for the accept call, too Wolfgang Bumiller
2021-05-11 13:53 ` [pbs-devel] [PATCH backup 3/7] proxy: Arc usage cleanup Wolfgang Bumiller
2021-05-11 13:53 ` [pbs-devel] [PATCH backup 4/7] proxy: factor out tls acceptor creation Wolfgang Bumiller
2021-05-11 13:53 ` [pbs-devel] [PATCH backup 5/7] proxy: implement 'reload-certificate' command Wolfgang Bumiller
2021-05-11 13:53 ` Wolfgang Bumiller [this message]
2021-05-11 13:54 ` [pbs-devel] [PATCH backup 7/7] hot-reload proxy certificate when updating via the API Wolfgang Bumiller
2021-05-11 16:11 ` [pbs-devel] applied-series: [PATCH backup 0/7] hot-reload proxy certificates Thomas Lamprecht
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20210511135400.32406-7-w.bumiller@proxmox.com \
--to=w.bumiller@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.