From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 2822A615E5 for ; Wed, 21 Oct 2020 09:29:11 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 1E48D152E1 for ; Wed, 21 Oct 2020 09:29:11 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [212.186.127.180]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 39975152CF for ; Wed, 21 Oct 2020 09:29:10 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 05DD045E99 for ; Wed, 21 Oct 2020 09:29:10 +0200 (CEST) From: Dominik Csapak To: pbs-devel@lists.proxmox.com Date: Wed, 21 Oct 2020 09:29:07 +0200 Message-Id: <20201021072908.10516-2-d.csapak@proxmox.com> X-Mailer: git-send-email 2.20.1 In-Reply-To: <20201021072908.10516-1-d.csapak@proxmox.com> References: <20201021072908.10516-1-d.csapak@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.486 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_DNSWL_MED -2.3 Sender listed at https://www.dnswl.org/, medium trust SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. 
[tools.rs] Subject: [pbs-devel] [PATCH proxmox-backup v3 2/3] tools: add AsyncChannelWriter X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Wed, 21 Oct 2020 07:29:11 -0000 similar to StdChannelWriter, but implements AsyncWrite and sends to a tokio::sync::mpsc::Sender Signed-off-by: Dominik Csapak --- changes from v2: * use ByteBuffer instead of Vec (nicer interface) src/tools.rs | 4 ++ src/tools/async_channel_writer.rs | 106 ++++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+) create mode 100644 src/tools/async_channel_writer.rs diff --git a/src/tools.rs b/src/tools.rs index 5a9f020a..22d6c344 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -44,6 +44,10 @@ pub use parallel_handler::*; mod wrapped_reader_stream; pub use wrapped_reader_stream::*; +mod async_channel_writer; +pub use async_channel_writer::*; + + mod std_channel_writer; pub use std_channel_writer::*; diff --git a/src/tools/async_channel_writer.rs b/src/tools/async_channel_writer.rs new file mode 100644 index 00000000..4bb56ac5 --- /dev/null +++ b/src/tools/async_channel_writer.rs @@ -0,0 +1,106 @@ +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use anyhow::{Error, Result}; +use futures::{future::FutureExt, ready}; +use tokio::io::AsyncWrite; +use tokio::sync::mpsc::Sender; + +use proxmox::io_format_err; +use proxmox::tools::byte_buffer::ByteBuffer; +use proxmox::sys::error::io_err_other; + +/// Wrapper around tokio::sync::mpsc::Sender, which implements Write +pub struct AsyncChannelWriter { + sender: Option<Sender<Result<Vec<u8>, Error>>>, + buf: ByteBuffer, + state: WriterState, +} + +type SendResult = io::Result<Sender<Result<Vec<u8>>>>; + +enum WriterState { + Ready, + Sending(Pin<Box<dyn Future<Output = SendResult> + Send + 'static>>), +} + +impl AsyncChannelWriter { + pub fn new(sender: Sender<Result<Vec<u8>, Error>>, buf_size: usize) -> Self { +
Self { + sender: Some(sender), + buf: ByteBuffer::with_capacity(buf_size), + state: WriterState::Ready, + } + } + + fn poll_write_impl( + &mut self, + cx: &mut Context, + buf: &[u8], + flush: bool, + ) -> Poll<io::Result<usize>> { + loop { + match &mut self.state { + WriterState::Ready => { + if flush { + if self.buf.is_empty() { + return Poll::Ready(Ok(0)); + } + } else { + let free_size = self.buf.free_size(); + if free_size > buf.len() || self.buf.is_empty() { + let count = free_size.min(buf.len()); + self.buf.get_free_mut_slice()[..count].copy_from_slice(&buf[..count]); + self.buf.add_size(count); + return Poll::Ready(Ok(count)); + } + } + + let mut sender = match self.sender.take() { + Some(sender) => sender, + None => return Poll::Ready(Err(io_err_other("no sender"))), + }; + + let data = self.buf.remove_data(self.buf.len()).to_vec(); + let future = async move { + sender + .send(Ok(data)) + .await + .map(move |_| sender) + .map_err(|err| io_format_err!("could not send: {}", err)) + }; + + self.state = WriterState::Sending(future.boxed()); + } + WriterState::Sending(ref mut future) => match ready!(future.as_mut().poll(cx)) { + Ok(sender) => { + self.sender = Some(sender); + self.state = WriterState::Ready; + } + Err(err) => return Poll::Ready(Err(err)), + }, + } + } + } +} + +impl AsyncWrite for AsyncChannelWriter { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> { + let this = self.get_mut(); + this.poll_write_impl(cx, buf, false) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> { + let this = self.get_mut(); + match ready!(this.poll_write_impl(cx, &[], true)) { + Ok(_) => Poll::Ready(Ok(())), + Err(err) => Poll::Ready(Err(err)), + } + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> { + self.poll_flush(cx) + } +} -- 2.20.1