From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 296D57B16C for ; Tue, 11 May 2021 15:54:08 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 276C922CAE for ; Tue, 11 May 2021 15:54:08 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 81AA722C93 for ; Tue, 11 May 2021 15:54:05 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 5D03942D85 for ; Tue, 11 May 2021 15:54:05 +0200 (CEST) From: Wolfgang Bumiller To: pbs-devel@lists.proxmox.com Date: Tue, 11 May 2021 15:53:59 +0200 Message-Id: <20210511135400.32406-7-w.bumiller@proxmox.com> X-Mailer: git-send-email 2.20.1 In-Reply-To: <20210511135400.32406-1-w.bumiller@proxmox.com> References: <20210511135400.32406-1-w.bumiller@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.018 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [proxmox-backup-proxy.rs] Subject: [pbs-devel] [PATCH backup 6/7] refactor send_command X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 11 May 2021 13:54:08 -0000 - refactor the combinators, - make it take a `&T: Serialize` instead of a Value, and allow sending the raw string via `send_raw_command`. Signed-off-by: Wolfgang Bumiller --- src/bin/proxmox-backup-proxy.rs | 8 +--- src/server/command_socket.rs | 71 ++++++++++++++++++--------------- src/server/worker_task.rs | 4 +- 3 files changed, 42 insertions(+), 41 deletions(-) diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs index 793ba67d..fc773459 100644 --- a/src/bin/proxmox-backup-proxy.rs +++ b/src/bin/proxmox-backup-proxy.rs @@ -750,15 +750,11 @@ async fn command_reopen_logfiles() -> Result<(), Error> { // only care about the most recent daemon instance for each, proxy & api, as other older ones // should not respond to new requests anyway, but only finish their current one and then exit. let sock = server::our_ctrl_sock(); - let f1 = server::send_command(sock, serde_json::json!({ - "command": "api-access-log-reopen", - })); + let f1 = server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n"); let pid = server::read_pid(buildcfg::PROXMOX_BACKUP_API_PID_FN)?; let sock = server::ctrl_sock_from_pid(pid); - let f2 = server::send_command(sock, serde_json::json!({ - "command": "api-access-log-reopen", - })); + let f2 = server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n"); match futures::join!(f1, f2) { (Err(e1), Err(e2)) => Err(format_err!("reopen commands failed, proxy: {}; api: {}", e1, e2)), diff --git a/src/server/command_socket.rs b/src/server/command_socket.rs index 89c77585..af41dd16 100644 --- a/src/server/command_socket.rs +++ b/src/server/command_socket.rs @@ -2,11 +2,12 @@ use anyhow::{bail, format_err, Error}; use std::collections::HashMap; use std::os::unix::io::AsRawFd; -use std::path::PathBuf; +use std::path::{PathBuf, Path}; use std::sync::Arc; use futures::*; use tokio::net::UnixListener; +use serde::Serialize; use serde_json::Value; use nix::sys::socket; @@ -102,43 +103,47 @@ where } -pub async fn send_command

( - path: P, - params: Value -) -> Result - where P: Into, +pub async fn send_command(path: P, params: &T) -> Result +where + P: AsRef, + T: ?Sized + Serialize, { - let path: PathBuf = path.into(); + let mut command_string = serde_json::to_string(params)?; + command_string.push('\n'); + send_raw_command(path.as_ref(), &command_string).await +} - tokio::net::UnixStream::connect(path) - .map_err(move |err| format_err!("control socket connect failed - {}", err)) - .and_then(move |mut conn| { +pub async fn send_raw_command

(path: P, command_string: &str) -> Result +where + P: AsRef, +{ + use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; - let mut command_string = params.to_string(); - command_string.push('\n'); + let mut conn = tokio::net::UnixStream::connect(path) + .map_err(move |err| format_err!("control socket connect failed - {}", err)) + .await?; - async move { - use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; + conn.write_all(command_string.as_bytes()).await?; + if !command_string.as_bytes().ends_with(b"\n") { + conn.write_all(b"\n").await?; + } - conn.write_all(command_string.as_bytes()).await?; - AsyncWriteExt::shutdown(&mut conn).await?; - let mut rx = tokio::io::BufReader::new(conn); - let mut data = String::new(); - if rx.read_line(&mut data).await? == 0 { - bail!("no response"); - } - if let Some(res) = data.strip_prefix("OK: ") { - match res.parse::() { - Ok(v) => Ok(v), - Err(err) => bail!("unable to parse json response - {}", err), - } - } else if let Some(err) = data.strip_prefix("ERROR: ") { - bail!("{}", err); - } else { - bail!("unable to parse response: {}", data); - } - } - }).await + AsyncWriteExt::shutdown(&mut conn).await?; + let mut rx = tokio::io::BufReader::new(conn); + let mut data = String::new(); + if rx.read_line(&mut data).await? == 0 { + bail!("no response"); + } + if let Some(res) = data.strip_prefix("OK: ") { + match res.parse::() { + Ok(v) => Ok(v), + Err(err) => bail!("unable to parse json response - {}", err), + } + } else if let Some(err) = data.strip_prefix("ERROR: ") { + bail!("{}", err); + } else { + bail!("unable to parse response: {}", data); + } } /// A callback for a specific commando socket. diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs index 6c5456c9..84019fef 100644 --- a/src/server/worker_task.rs +++ b/src/server/worker_task.rs @@ -59,7 +59,7 @@ pub async fn worker_is_active(upid: &UPID) -> Result { "upid": upid.to_string(), }, }); - let status = super::send_command(sock, cmd).await?; + let status = super::send_command(sock, &cmd).await?; if let Some(active) = status.as_bool() { Ok(active) @@ -133,7 +133,7 @@ pub async fn abort_worker(upid: UPID) -> Result<(), Error> { "upid": upid.to_string(), }, }); - super::send_command(sock, cmd).map_ok(|_| ()).await + super::send_command(sock, &cmd).map_ok(|_| ()).await } fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option), Error> { -- 2.20.1