public inbox for pbs-devel@lists.proxmox.com
From: Stefan Reiter <s.reiter@proxmox.com>
To: pve-devel@lists.proxmox.com, pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH v2 http-server 08/13] support streaming data from fh to client
Date: Thu, 22 Apr 2021 17:34:52 +0200
Message-ID: <20210422153457.12265-9-s.reiter@proxmox.com>
In-Reply-To: <20210422153457.12265-1-s.reiter@proxmox.com>

Use an explicit AnyEvent::Handle similar to websocket proxying.

This needs some care to apply backpressure correctly, so that we do not
buffer too much data when the client is not downloading fast enough.
Note that because of AnyEvent restrictions, pointing "fh" at a file or
a packet-based socket may result in unwanted behaviour[0].

[0]: https://metacpan.org/pod/AnyEvent::Handle#DESCRIPTION
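
As a rough illustration of the restriction described in [0], a caller could
check the handle type before asking for streaming. This is only a sketch:
fh_is_streamable is a hypothetical helper that is not part of this patch, and
it relies on nothing but core Perl file tests and the Socket module.

```perl
use strict;
use warnings;
use Socket qw(SOL_SOCKET SO_TYPE SOCK_STREAM);

# Hypothetical helper (not part of this patch): returns 1 if $fh is a pipe
# or a stream socket, 0 for regular files, packet-based sockets and
# anything else.
sub fh_is_streamable {
    my ($fh) = @_;

    return 1 if -p $fh;      # pipe or FIFO
    return 0 if !-S $fh;     # neither pipe nor socket, e.g. a regular file

    # sockets: only stream sockets deliver a plain byte stream
    my $packed = getsockopt($fh, SOL_SOCKET, SO_TYPE);
    return 0 if !defined($packed);
    return unpack("i", $packed) == SOCK_STREAM ? 1 : 0;
}

pipe(my $rd, my $wr) or die "pipe failed: $!";
print "pipe read end streamable: ", fh_is_streamable($rd), "\n";
```

Whether such a check belongs in the server or in the API handler that opens
the handle is left open here; the patch itself does not perform one.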

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---

v2:
* move to extra sub "response_stream"
* adapt comments
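
For review purposes, the per-callback arithmetic of response_stream can be
modelled without AnyEvent. The sketch below uses a hypothetical helper,
plan_transfer, that is not part of the patch. It mirrors how many bytes one
on_read call moves from the read buffer into the client write buffer, and
whether reading is paused until on_drain fires. The sample sizes are made up
for illustration.

```perl
use strict;
use warnings;

# Hypothetical model of one on_read invocation (not part of the patch).
# Returns (bytes to move into the write buffer, 1 if reading should pause).
sub plan_transfer {
    my ($buf_size, $wbuf_len, $rbuf_len, $read_size) = @_;

    # never let the write buffer grow beyond $buf_size
    my $to_read = $buf_size - $wbuf_len;
    $to_read = $rbuf_len if $rbuf_len < $to_read;
    $to_read = 0 if $to_read < 0;

    # bytes still sitting in the read buffer after the move
    my $remaining = $rbuf_len - $to_read;

    # read_size can double upon read, plus one more read may already be
    # pending, hence the factor of 4 (same reasoning as in the patch)
    my $pause = ($remaining + $read_size * 4 > $buf_size) ? 1 : 0;

    return ($to_read, $pause);
}

my $buf_size = 4 * 1024 * 1024;
for my $case (
    [0,         8192,    2048],   # idle client buffer, small read
    [$buf_size, 3932160, 131072], # client buffer full, read buffer filling up
) {
    my ($n, $pause) = plan_transfer($buf_size, @$case);
    print "wbuf=$case->[0] rbuf=$case->[1] read_size=$case->[2]: ",
        "move $n bytes, pause=$pause\n";
}
```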

 PVE/APIServer/AnyEvent.pm | 105 ++++++++++++++++++++++++++++++++++++--
 1 file changed, 101 insertions(+), 4 deletions(-)

diff --git a/PVE/APIServer/AnyEvent.pm b/PVE/APIServer/AnyEvent.pm
index 60a2a1c..f3db0e3 100644
--- a/PVE/APIServer/AnyEvent.pm
+++ b/PVE/APIServer/AnyEvent.pm
@@ -188,8 +188,93 @@ sub finish_response {
     }
 }
 
+sub response_stream {
+    my ($self, $reqstate, $stream_fh) = @_;
+
+    # disable timeout, we don't know how big the data is
+    $reqstate->{hdl}->timeout(0);
+
+    my $buf_size = 4*1024*1024;
+
+    my $on_read;
+    $on_read = sub {
+	my ($hdl) = @_;
+	my $reqhdl = $reqstate->{hdl};
+	return if !$reqhdl;
+
+	my $wbuf_len = length($reqhdl->{wbuf});
+	my $rbuf_len = length($hdl->{rbuf});
+	# TODO: Take into account $reqhdl->{wbuf_max} ? Right now
+	# that's unbounded, so just assume $buf_size
+	my $to_read = $buf_size - $wbuf_len;
+	$to_read = $rbuf_len if $rbuf_len < $to_read;
+	if ($to_read > 0) {
+	    my $data = substr($hdl->{rbuf}, 0, $to_read, '');
+	    $reqhdl->push_write($data);
+	    $rbuf_len -= $to_read;
+	} elsif ($hdl->{_eof}) {
+	    # workaround: AnyEvent gives us a fake EPIPE if we don't consume
+	    # any data when called at EOF, so unregister ourselves - data is
+	    # flushed by on_eof anyway
+	    # see: https://sources.debian.org/src/libanyevent-perl/7.170-2/lib/AnyEvent/Handle.pm/#L1329
+	    $hdl->on_read();
+	    return;
+	}
+
+	# apply backpressure so we don't accept any more data into
+	# buffer if the client isn't downloading fast enough
+	# note: read_size can double upon read, and we also need to
+	# account for one more read after start_read, so *4
+	if ($rbuf_len + $hdl->{read_size}*4 > $buf_size) {
+	    # stop reading until write buffer is empty
+	    $hdl->on_read();
+	    my $prev_on_drain = $reqhdl->{on_drain};
+	    $reqhdl->on_drain(sub {
+		my ($wrhdl) = @_;
+		# on_drain called because write buffer is empty, continue reading
+		$hdl->on_read($on_read);
+		if ($prev_on_drain) {
+		    $wrhdl->on_drain($prev_on_drain);
+		    $prev_on_drain->($wrhdl);
+		}
+	    });
+	}
+    };
+
+    $reqstate->{proxyhdl} = AnyEvent::Handle->new(
+	fh => $stream_fh,
+	rbuf_max => $buf_size,
+	timeout => 0,
+	on_read => $on_read,
+	on_eof => sub {
+	    my ($hdl) = @_;
+	    eval {
+		if (my $reqhdl = $reqstate->{hdl}) {
+		    $self->log_aborted_request($reqstate);
+		    # write out any remaining data
+		    $reqhdl->push_write($hdl->{rbuf}) if length($hdl->{rbuf}) > 0;
+		    $hdl->{rbuf} = "";
+		    $reqhdl->push_shutdown();
+		    $self->finish_response($reqstate);
+		}
+	    };
+	    if (my $err = $@) { syslog('err', "$err"); }
+	    $on_read = undef;
+	},
+	on_error => sub {
+	    my ($hdl, $fatal, $message) = @_;
+	    eval {
+		$self->log_aborted_request($reqstate, $message);
+		$self->client_do_disconnect($reqstate);
+	    };
+	    if (my $err = $@) { syslog('err', "$err"); }
+	    $on_read = undef;
+	},
+    );
+}
+
 sub response {
-    my ($self, $reqstate, $resp, $mtime, $nocomp, $delay) = @_;
+    my ($self, $reqstate, $resp, $mtime, $nocomp, $delay, $stream_fh) = @_;
 
     #print "$$: send response: " . Dumper($resp);
 
@@ -231,7 +316,7 @@ sub response {
     $resp->header('Server' => "pve-api-daemon/3.0");
 
     my $content_length;
-    if ($content) {
+    if ($content && !$stream_fh) {
 
 	$content_length = length($content);
 
@@ -258,11 +343,16 @@ sub response {
     #print "SEND(without content) $res\n" if $self->{debug};
 
     $res .= "\015\012";
-    $res .= $content if $content;
+    $res .= $content if $content && !$stream_fh;
 
     $self->log_request($reqstate, $reqstate->{request});
 
-    if ($delay && $delay > 0) {
+    if ($stream_fh) {
+	# write headers and preamble...
+	$reqstate->{hdl}->push_write($res);
+	# ...then stream data via an AnyEvent::Handle
+	$self->response_stream($reqstate, $stream_fh);
+    } elsif ($delay && $delay > 0) {
 	my $w; $w = AnyEvent->timer(after => $delay, cb => sub {
 	    undef $w; # delete reference
 	    $reqstate->{hdl}->push_write($res);
@@ -322,6 +412,13 @@ sub send_file_start {
 	    if (ref($download) eq 'HASH') {
 		$fh = $download->{fh};
 		$mime = $download->{'content-type'};
+
+		if ($download->{stream}) {
+		    my $header = HTTP::Headers->new(Content_Type => $mime);
+		    my $resp = HTTP::Response->new(200, "OK", $header);
+		    $self->response($reqstate, $resp, undef, 1, 0, $fh);
+		    return;
+		}
 	    } else {
 		my $filename = $download;
 		$fh = IO::File->new($filename, '<') ||
-- 
2.20.1






Thread overview: 27+ messages
2021-04-22 15:34 [pbs-devel] [PATCH v2 00/13] Single-file-restore GUI for PBS snapshots Stefan Reiter
2021-04-22 15:34 ` [pbs-devel] [PATCH v2 proxmox-backup 01/13] file-restore: don't force PBS_FINGERPRINT env var Stefan Reiter
2021-04-22 17:07   ` [pbs-devel] applied: " Thomas Lamprecht
2021-04-22 15:34 ` [pbs-devel] [PATCH v2 proxmox-backup 02/13] client-tools: add crypto_parameters_keep_fd Stefan Reiter
2021-04-22 17:07   ` [pbs-devel] applied: " Thomas Lamprecht
2021-04-22 15:34 ` [pbs-devel] [PATCH v2 proxmox-backup 03/13] file-restore: support encrypted VM backups Stefan Reiter
2021-04-22 17:07   ` [pbs-devel] applied: " Thomas Lamprecht
2021-04-22 15:34 ` [pbs-devel] [PATCH v2 common 04/13] PBSClient: adapt error message to include full package names Stefan Reiter
2021-04-23 12:17   ` [pbs-devel] applied: [pve-devel] " Thomas Lamprecht
2021-04-22 15:34 ` [pbs-devel] [PATCH v2 common 05/13] PBSClient: add file_restore_list command Stefan Reiter
2021-04-23 12:17   ` [pbs-devel] applied: [pve-devel] " Thomas Lamprecht
2021-04-22 15:34 ` [pbs-devel] [PATCH v2 common 06/13] PBSClient: add file_restore_extract function Stefan Reiter
2021-04-23 12:17   ` [pbs-devel] applied: [pve-devel] " Thomas Lamprecht
2021-04-22 15:34 ` [pbs-devel] [PATCH v2 common 07/13] PBSClient: use crypt params for file 'list' and 'extract' Stefan Reiter
2021-04-22 19:14   ` [pbs-devel] [pve-devel] " Thomas Lamprecht
2021-04-23 12:18   ` [pbs-devel] applied: " Thomas Lamprecht
2021-04-22 15:34 ` Stefan Reiter [this message]
2021-04-23 11:56   ` [pbs-devel] applied: [pve-devel] [PATCH v2 http-server 08/13] support streaming data from fh to client Thomas Lamprecht
2021-04-22 15:34 ` [pbs-devel] [PATCH v2 http-server 09/13] allow stream download from path and over pvedaemon-proxy Stefan Reiter
2021-04-23 11:56   ` [pbs-devel] applied: [pve-devel] " Thomas Lamprecht
2021-04-22 15:34 ` [pbs-devel] [PATCH v2 storage 10/13] add FileRestore API for PBS Stefan Reiter
2021-04-22 15:34 ` [pbs-devel] [PATCH v2 proxmox-widget-toolkit 11/13] Utils: add errorCallback to monStoreErrors Stefan Reiter
2021-04-22 18:41   ` [pbs-devel] applied: [pve-devel] " Thomas Lamprecht
2021-04-22 15:34 ` [pbs-devel] [PATCH v2 proxmox-widget-toolkit 12/13] FileBrowser: support 'virtual'/'v' file type Stefan Reiter
2021-04-22 18:41   ` [pbs-devel] applied: [pve-devel] " Thomas Lamprecht
2021-04-22 15:34 ` [pbs-devel] [PATCH v2 proxmox-widget-toolkit 13/13] FileBrowser: show errors in messagebox and allow expand 'all' Stefan Reiter
2021-04-22 18:41   ` [pbs-devel] applied: [pve-devel] " Thomas Lamprecht
