* Re: [pbs-devel] [RFC PATCH proxmox 1/4] proxmox-async: add SenderWriter helper
@ 2022-02-18 7:36 Dietmar Maurer
2022-02-18 15:27 ` Wolfgang Bumiller
0 siblings, 1 reply; 3+ messages in thread
From: Dietmar Maurer @ 2022-02-18 7:36 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Dominik Csapak
Can this block the executor? If so, should we use block_in_place?
Or does self.sender.blocking_send() already take care of that?
> On 02/17/2022 10:40 AM Dominik Csapak <d.csapak@proxmox.com> wrote:
>
>
> this wraps around a tokio Sender for Vec<u8>, but implements a blocking
> write. We can use that as an adapter for something that only takes a
> writer, and can read from it asynchronously
* Re: [pbs-devel] [RFC PATCH proxmox 1/4] proxmox-async: add SenderWriter helper
2022-02-18 7:36 [pbs-devel] [RFC PATCH proxmox 1/4] proxmox-async: add SenderWriter helper Dietmar Maurer
@ 2022-02-18 15:27 ` Wolfgang Bumiller
0 siblings, 0 replies; 3+ messages in thread
From: Wolfgang Bumiller @ 2022-02-18 15:27 UTC (permalink / raw)
To: Dietmar Maurer
Cc: Proxmox Backup Server development discussion, Dominik Csapak
On Fri, Feb 18, 2022 at 08:36:05AM +0100, Dietmar Maurer wrote:
> Can this block the executor? If so, should we use block_in_place?
> Or does self.sender.blocking_send() already take care of that?
The sender in this case is a thread. The tokio side is the receiver.
The type implements `Write`, which is blocking by definition, so adding a
`block_in_place` inside it is never correct.
If mixing of blocking/async code is really a concern (which I don't
think it is), I'd rather not expose `from_sender` (or under a different
name as an `unsafe fn`) and instead only have a function creating the
*tuple* of `(SenderWriter, Receiver)` with appropriate documentation
about the sync/async nature of the two sides.
But I don't think this is truly warranted.
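
A minimal sketch of the tuple-constructor idea described above, not code from the series. It assumes std's `sync_channel` as a dependency-free stand-in for the tokio mpsc channel and plain `Vec<u8>` items instead of `Result<Vec<u8>, Error>`. The names `ChunkWriter`, `channel` and `demo` are hypothetical.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Blocking writer half. It can only be obtained through `channel()`,
/// so callers never pass in a sender that lives on the wrong side.
pub struct ChunkWriter {
    sender: SyncSender<Vec<u8>>,
}

/// Create the blocking writer together with its receiving half.
/// (Assumption: in the real patch the receiver would be the async tokio side.)
pub fn channel(bound: usize) -> (ChunkWriter, Receiver<Vec<u8>>) {
    let (sender, receiver) = sync_channel(bound);
    (ChunkWriter { sender }, receiver)
}

impl Write for ChunkWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Blocks while the channel is full; fails once the receiver is gone.
        self.sender.send(buf.to_vec()).map_err(|err| {
            io::Error::new(
                io::ErrorKind::UnexpectedEof,
                format!("could not send: {}", err),
            )
        })?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Run a blocking producer on its own thread and collect what arrives.
pub fn demo() -> Vec<u8> {
    let (mut writer, receiver) = channel(4);
    // The writer moves into the thread and is dropped when the thread ends,
    // which closes the channel and ends the receive loop below.
    let producer = thread::spawn(move || writer.write_all(b"hello world"));

    let mut out = Vec::new();
    for chunk in receiver {
        out.extend_from_slice(&chunk);
    }
    producer.join().expect("producer panicked").expect("write failed");
    out
}
```

With this shape the documentation about which half is blocking and which is async can live on the single constructor.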
* [pbs-devel] [RFC PATCH proxmox/proxmox-backup] implement streaming serialization for api calls
@ 2022-02-17 9:40 Dominik Csapak
2022-02-17 9:40 ` [pbs-devel] [RFC PATCH proxmox 1/4] proxmox-async: add SenderWriter helper Dominik Csapak
0 siblings, 1 reply; 3+ messages in thread
From: Dominik Csapak @ 2022-02-17 9:40 UTC (permalink / raw)
To: pbs-devel
This series aims to improve memory behaviour for API calls where the
result is large. We currently convert that result into a serde `Value`
which, depending on the actual structure of the data, can use much more
memory than the original Rust structure (e.g. I saw a factor of 5 to 10 for
the backup snapshot list). In addition, this seems to trigger bad
behaviour in the memory allocator (this again...).
By streaming the serialization, we don't need any in-memory copy of the
result, and we could probably even return an Arc<> or Rc<> so that we may
not have to copy the data at all (we would have to implement Serialize
on that ourselves, or enable the 'rc' feature of serde to use that).
The only downside is that we need a thread per API call in which
we serialize into our tokio mpsc channel, since serde serialization
has no async functionality.
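
The thread-per-call approach can be sketched roughly as follows. This is an illustration under stated assumptions, not the code from the series: std's `sync_channel` stands in for the tokio mpsc channel, the consumer is a plain blocking loop rather than an async task, and the hand-written `serialize_ids` is a hypothetical stand-in for a serde serializer. `ChannelWriter` and `stream_ids` are likewise made-up names.

```rust
use std::io::{self, BufWriter, Write};
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread;

/// Forwards every write as one chunk over a bounded channel.
struct ChannelWriter(SyncSender<Vec<u8>>);

impl Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0
            .send(buf.to_vec())
            .map_err(|e| io::Error::new(io::ErrorKind::UnexpectedEof, e.to_string()))?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Hypothetical stand-in for a streaming serializer: emits a JSON array
/// element by element instead of building the whole document first.
fn serialize_ids<W: Write>(ids: &[u32], mut out: W) -> io::Result<()> {
    out.write_all(b"[")?;
    for (i, id) in ids.iter().enumerate() {
        if i > 0 {
            out.write_all(b",")?;
        }
        write!(out, "{}", id)?;
    }
    out.write_all(b"]")?;
    out.flush()
}

/// Serialize on a dedicated thread; return the assembled body and the
/// number of chunks that passed through the channel.
pub fn stream_ids(ids: Vec<u32>) -> (String, usize) {
    let (tx, rx) = sync_channel::<Vec<u8>>(2);
    let worker = thread::spawn(move || {
        // Buffer the tiny writes so each channel message is a larger chunk.
        let writer = BufWriter::with_capacity(16, ChannelWriter(tx));
        serialize_ids(&ids, writer)
    });

    let mut body = Vec::new();
    let mut chunks = 0;
    // The loop ends once the worker has dropped its writer.
    for chunk in rx {
        chunks += 1;
        body.extend_from_slice(&chunk);
    }
    worker.join().expect("worker panicked").expect("serialization failed");
    (String::from_utf8(body).expect("valid UTF-8"), chunks)
}
```

Because the channel is bounded, the serializing thread blocks whenever the consumer falls behind, so only a few small chunks are in flight at any time. The sketch collects the body only to keep the example checkable; a real consumer would forward each chunk instead.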
Sending as RFC because:
* not sure about the naming of things
* could probably use improved tests
* not really documented (I looked, but found no obvious place
where to add that to the api macro docs?)
proxmox:
Dominik Csapak (4):
proxmox-async: add SenderWriter helper
proxmox-router: add SerializableReturn Trait
proxmox-router: add new ApiHandler variants for streaming
serialization
proxmox-api-macro: add 'streaming' option
proxmox-api-macro/src/api/method.rs | 127 ++++++++++++++------
proxmox-api-macro/tests/api1.rs | 16 +++
proxmox-async/src/blocking/mod.rs | 3 +
proxmox-async/src/blocking/sender_writer.rs | 47 ++++++++
proxmox-router/Cargo.toml | 2 +
proxmox-router/src/cli/command.rs | 45 +++++++
proxmox-router/src/lib.rs | 2 +
proxmox-router/src/router.rs | 78 ++++++++++++
proxmox-router/src/serializable_return.rs | 62 ++++++++++
9 files changed, 343 insertions(+), 39 deletions(-)
create mode 100644 proxmox-async/src/blocking/sender_writer.rs
create mode 100644 proxmox-router/src/serializable_return.rs
proxmox-backup:
Dominik Csapak (3):
proxmox-rest-server: OutputFormatter: add new format_data_streaming
method
adapt to the new ApiHandler variants
api: admin/datastore: enable streaming for some api calls
proxmox-rest-server/Cargo.toml | 1 +
proxmox-rest-server/src/formatter.rs | 52 +++++++++++++++++++++++++++-
proxmox-rest-server/src/rest.rs | 10 ++++++
src/api2/admin/datastore.rs | 1 +
src/api2/node/tasks.rs | 1 +
src/bin/proxmox_backup_debug/api.rs | 8 +++++
6 files changed, 72 insertions(+), 1 deletion(-)
--
2.30.2
* [pbs-devel] [RFC PATCH proxmox 1/4] proxmox-async: add SenderWriter helper
2022-02-17 9:40 [pbs-devel] [RFC PATCH proxmox/proxmox-backup] implement streaming serialization for api calls Dominik Csapak
@ 2022-02-17 9:40 ` Dominik Csapak
0 siblings, 0 replies; 3+ messages in thread
From: Dominik Csapak @ 2022-02-17 9:40 UTC (permalink / raw)
To: pbs-devel
this wraps around a tokio Sender for Vec<u8>, but implements a blocking
write. We can use that as an adapter for something that only takes a
writer, and can read from it asynchronously
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox-async/src/blocking/mod.rs | 3 ++
proxmox-async/src/blocking/sender_writer.rs | 47 +++++++++++++++++++++
2 files changed, 50 insertions(+)
create mode 100644 proxmox-async/src/blocking/sender_writer.rs
diff --git a/proxmox-async/src/blocking/mod.rs b/proxmox-async/src/blocking/mod.rs
index 28247b3..06f821a 100644
--- a/proxmox-async/src/blocking/mod.rs
+++ b/proxmox-async/src/blocking/mod.rs
@@ -9,3 +9,6 @@ pub use tokio_writer_adapter::TokioWriterAdapter;
mod wrapped_reader_stream;
pub use wrapped_reader_stream::WrappedReaderStream;
+
+mod sender_writer;
+pub use sender_writer::SenderWriter;
diff --git a/proxmox-async/src/blocking/sender_writer.rs b/proxmox-async/src/blocking/sender_writer.rs
new file mode 100644
index 0000000..62682e5
--- /dev/null
+++ b/proxmox-async/src/blocking/sender_writer.rs
@@ -0,0 +1,47 @@
+use std::io;
+
+use anyhow::Error;
+use tokio::sync::mpsc::Sender;
+
+/// Wrapper struct around [`tokio::sync::mpsc::Sender`] for `Result<Vec<u8>, Error>` that implements [`std::io::Write`]
+pub struct SenderWriter {
+    sender: Sender<Result<Vec<u8>, Error>>,
+}
+
+impl SenderWriter {
+    pub fn from_sender(sender: tokio::sync::mpsc::Sender<Result<Vec<u8>, Error>>) -> Self {
+        Self { sender }
+    }
+
+    fn write_impl(&mut self, buf: &[u8]) -> io::Result<usize> {
+        if let Err(err) = self.sender.blocking_send(Ok(buf.to_vec())) {
+            return Err(io::Error::new(
+                io::ErrorKind::UnexpectedEof,
+                format!("could not send: {}", err),
+            ));
+        }
+
+        Ok(buf.len())
+    }
+
+    fn flush_impl(&mut self) -> io::Result<()> {
+        Ok(())
+    }
+}
+
+impl io::Write for SenderWriter {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.write_impl(buf)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.flush_impl()
+    }
+}
+
+impl Drop for SenderWriter {
+    fn drop(&mut self) {
+        // ignore errors
+        let _ = self.flush_impl();
+    }
+}
--
2.30.2
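
The error path in `write_impl` can be illustrated with a small sketch: once the receiving side is gone, a write fails and the failure is reported as `io::ErrorKind::UnexpectedEof`. As an assumption for the sake of a dependency-free example, std's `sync_channel` replaces the tokio channel, and `ClosedAwareWriter` and `write_after_receiver_dropped` are hypothetical names.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, SyncSender};

/// Stand-in writer that mirrors the error mapping used in the patch.
struct ClosedAwareWriter {
    sender: SyncSender<Vec<u8>>,
}

impl Write for ClosedAwareWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if let Err(err) = self.sender.send(buf.to_vec()) {
            // A closed channel is surfaced as an I/O error to the writer's user.
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                format!("could not send: {}", err),
            ));
        }
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Drop the receiver first, then try to write and report the error kind.
pub fn write_after_receiver_dropped() -> io::ErrorKind {
    let (sender, receiver) = sync_channel::<Vec<u8>>(1);
    drop(receiver);
    let mut writer = ClosedAwareWriter { sender };
    writer
        .write(b"data")
        .expect_err("send must fail without a receiver")
        .kind()
}
```

A serializer writing into such a writer stops at the first failed write, so the producing thread ends promptly when the consumer goes away.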