From: Stefan Reiter <s.reiter@proxmox.com>
To: pve-devel@lists.proxmox.com, pbs-devel@lists.proxmox.com
Subject: [pve-devel] [PATCH v2 http-server 08/13] support streaming data form fh to client
Date: Thu, 22 Apr 2021 17:34:52 +0200 [thread overview]
Message-ID: <20210422153457.12265-9-s.reiter@proxmox.com> (raw)
In-Reply-To: <20210422153457.12265-1-s.reiter@proxmox.com>
Use an explicit AnyEvent::Handle similar to websocket proxying.
Needs some special care to make sure we apply backpressure correctly to
avoid caching too much data. Note that because of AnyEvent restrictions,
specifying a "fh" to point to a file or a packet-based socket may result
in unwanted behaviour[0].
[0]: https://metacpan.org/pod/AnyEvent::Handle#DESCRIPTION
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
v2:
* move to extra sub "response_stream"
* adapt comments
PVE/APIServer/AnyEvent.pm | 105 ++++++++++++++++++++++++++++++++++++--
1 file changed, 101 insertions(+), 4 deletions(-)
diff --git a/PVE/APIServer/AnyEvent.pm b/PVE/APIServer/AnyEvent.pm
index 60a2a1c..f3db0e3 100644
--- a/PVE/APIServer/AnyEvent.pm
+++ b/PVE/APIServer/AnyEvent.pm
@@ -188,8 +188,93 @@ sub finish_response {
}
}
+sub response_stream {
+ my ($self, $reqstate, $stream_fh) = @_;
+
+ # disable timeout, we don't know how big the data is
+ $reqstate->{hdl}->timeout(0);
+
+ my $buf_size = 4*1024*1024;
+
+ my $on_read;
+ $on_read = sub {
+ my ($hdl) = @_;
+ my $reqhdl = $reqstate->{hdl};
+ return if !$reqhdl;
+
+ my $wbuf_len = length($reqhdl->{wbuf});
+ my $rbuf_len = length($hdl->{rbuf});
+ # TODO: Take into account $reqhdl->{wbuf_max} ? Right now
+ # that's unbounded, so just assume $buf_size
+ my $to_read = $buf_size - $wbuf_len;
+ $to_read = $rbuf_len if $rbuf_len < $to_read;
+ if ($to_read > 0) {
+ my $data = substr($hdl->{rbuf}, 0, $to_read, '');
+ $reqhdl->push_write($data);
+ $rbuf_len -= $to_read;
+ } elsif ($hdl->{_eof}) {
+ # workaround: AnyEvent gives us a fake EPIPE if we don't consume
+ # any data when called at EOF, so unregister ourselves - data is
+ # flushed by on_eof anyway
+ # see: https://sources.debian.org/src/libanyevent-perl/7.170-2/lib/AnyEvent/Handle.pm/#L1329
+ $hdl->on_read();
+ return;
+ }
+
+ # apply backpressure so we don't accept any more data into
+ # buffer if the client isn't downloading fast enough
+ # note: read_size can double upon read, and we also need to
+ # account for one more read after start_read, so *4
+ if ($rbuf_len + $hdl->{read_size}*4 > $buf_size) {
+ # stop reading until write buffer is empty
+ $hdl->on_read();
+ my $prev_on_drain = $reqhdl->{on_drain};
+ $reqhdl->on_drain(sub {
+ my ($wrhdl) = @_;
+ # on_drain called because write buffer is empty, continue reading
+ $hdl->on_read($on_read);
+ if ($prev_on_drain) {
+ $wrhdl->on_drain($prev_on_drain);
+ $prev_on_drain->($wrhdl);
+ }
+ });
+ }
+ };
+
+ $reqstate->{proxyhdl} = AnyEvent::Handle->new(
+ fh => $stream_fh,
+ rbuf_max => $buf_size,
+ timeout => 0,
+ on_read => $on_read,
+ on_eof => sub {
+ my ($hdl) = @_;
+ eval {
+ if (my $reqhdl = $reqstate->{hdl}) {
+ $self->log_aborted_request($reqstate);
+ # write out any remaining data
+ $reqhdl->push_write($hdl->{rbuf}) if length($hdl->{rbuf}) > 0;
+ $hdl->{rbuf} = "";
+ $reqhdl->push_shutdown();
+ $self->finish_response($reqstate);
+ }
+ };
+ if (my $err = $@) { syslog('err', "$err"); }
+ $on_read = undef;
+ },
+ on_error => sub {
+ my ($hdl, $fatal, $message) = @_;
+ eval {
+ $self->log_aborted_request($reqstate, $message);
+ $self->client_do_disconnect($reqstate);
+ };
+ if (my $err = $@) { syslog('err', "$err"); }
+ $on_read = undef;
+ },
+ );
+}
+
sub response {
- my ($self, $reqstate, $resp, $mtime, $nocomp, $delay) = @_;
+ my ($self, $reqstate, $resp, $mtime, $nocomp, $delay, $stream_fh) = @_;
#print "$$: send response: " . Dumper($resp);
@@ -231,7 +316,7 @@ sub response {
$resp->header('Server' => "pve-api-daemon/3.0");
my $content_length;
- if ($content) {
+ if ($content && !$stream_fh) {
$content_length = length($content);
@@ -258,11 +343,16 @@ sub response {
#print "SEND(without content) $res\n" if $self->{debug};
$res .= "\015\012";
- $res .= $content if $content;
+ $res .= $content if $content && !$stream_fh;
$self->log_request($reqstate, $reqstate->{request});
- if ($delay && $delay > 0) {
+ if ($stream_fh) {
+ # write headers and preamble...
+ $reqstate->{hdl}->push_write($res);
+ # ...then stream data via an AnyEvent::Handle
+ $self->response_stream($reqstate, $stream_fh);
+ } elsif ($delay && $delay > 0) {
my $w; $w = AnyEvent->timer(after => $delay, cb => sub {
undef $w; # delete reference
$reqstate->{hdl}->push_write($res);
@@ -322,6 +412,13 @@ sub send_file_start {
if (ref($download) eq 'HASH') {
$fh = $download->{fh};
$mime = $download->{'content-type'};
+
+ if ($download->{stream}) {
+ my $header = HTTP::Headers->new(Content_Type => $mime);
+ my $resp = HTTP::Response->new(200, "OK", $header);
+ $self->response($reqstate, $resp, undef, 1, 0, $fh);
+ return;
+ }
} else {
my $filename = $download;
$fh = IO::File->new($filename, '<') ||
--
2.20.1
WARNING: multiple messages have this Message-ID
From: Stefan Reiter <s.reiter@proxmox.com>
To: pve-devel@lists.proxmox.com, pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH v2 http-server 08/13] support streaming data form fh to client
Date: Thu, 22 Apr 2021 17:34:52 +0200 [thread overview]
Message-ID: <20210422153457.12265-9-s.reiter@proxmox.com> (raw)
In-Reply-To: <20210422153457.12265-1-s.reiter@proxmox.com>
Use an explicit AnyEvent::Handle similar to websocket proxying.
Needs some special care to make sure we apply backpressure correctly to
avoid caching too much data. Note that because of AnyEvent restrictions,
specifying a "fh" to point to a file or a packet-based socket may result
in unwanted behaviour[0].
[0]: https://metacpan.org/pod/AnyEvent::Handle#DESCRIPTION
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
v2:
* move to extra sub "response_stream"
* adapt comments
PVE/APIServer/AnyEvent.pm | 105 ++++++++++++++++++++++++++++++++++++--
1 file changed, 101 insertions(+), 4 deletions(-)
diff --git a/PVE/APIServer/AnyEvent.pm b/PVE/APIServer/AnyEvent.pm
index 60a2a1c..f3db0e3 100644
--- a/PVE/APIServer/AnyEvent.pm
+++ b/PVE/APIServer/AnyEvent.pm
@@ -188,8 +188,93 @@ sub finish_response {
}
}
+sub response_stream {
+ my ($self, $reqstate, $stream_fh) = @_;
+
+ # disable timeout, we don't know how big the data is
+ $reqstate->{hdl}->timeout(0);
+
+ my $buf_size = 4*1024*1024;
+
+ my $on_read;
+ $on_read = sub {
+ my ($hdl) = @_;
+ my $reqhdl = $reqstate->{hdl};
+ return if !$reqhdl;
+
+ my $wbuf_len = length($reqhdl->{wbuf});
+ my $rbuf_len = length($hdl->{rbuf});
+ # TODO: Take into account $reqhdl->{wbuf_max} ? Right now
+ # that's unbounded, so just assume $buf_size
+ my $to_read = $buf_size - $wbuf_len;
+ $to_read = $rbuf_len if $rbuf_len < $to_read;
+ if ($to_read > 0) {
+ my $data = substr($hdl->{rbuf}, 0, $to_read, '');
+ $reqhdl->push_write($data);
+ $rbuf_len -= $to_read;
+ } elsif ($hdl->{_eof}) {
+ # workaround: AnyEvent gives us a fake EPIPE if we don't consume
+ # any data when called at EOF, so unregister ourselves - data is
+ # flushed by on_eof anyway
+ # see: https://sources.debian.org/src/libanyevent-perl/7.170-2/lib/AnyEvent/Handle.pm/#L1329
+ $hdl->on_read();
+ return;
+ }
+
+ # apply backpressure so we don't accept any more data into
+ # buffer if the client isn't downloading fast enough
+ # note: read_size can double upon read, and we also need to
+ # account for one more read after start_read, so *4
+ if ($rbuf_len + $hdl->{read_size}*4 > $buf_size) {
+ # stop reading until write buffer is empty
+ $hdl->on_read();
+ my $prev_on_drain = $reqhdl->{on_drain};
+ $reqhdl->on_drain(sub {
+ my ($wrhdl) = @_;
+ # on_drain called because write buffer is empty, continue reading
+ $hdl->on_read($on_read);
+ if ($prev_on_drain) {
+ $wrhdl->on_drain($prev_on_drain);
+ $prev_on_drain->($wrhdl);
+ }
+ });
+ }
+ };
+
+ $reqstate->{proxyhdl} = AnyEvent::Handle->new(
+ fh => $stream_fh,
+ rbuf_max => $buf_size,
+ timeout => 0,
+ on_read => $on_read,
+ on_eof => sub {
+ my ($hdl) = @_;
+ eval {
+ if (my $reqhdl = $reqstate->{hdl}) {
+ $self->log_aborted_request($reqstate);
+ # write out any remaining data
+ $reqhdl->push_write($hdl->{rbuf}) if length($hdl->{rbuf}) > 0;
+ $hdl->{rbuf} = "";
+ $reqhdl->push_shutdown();
+ $self->finish_response($reqstate);
+ }
+ };
+ if (my $err = $@) { syslog('err', "$err"); }
+ $on_read = undef;
+ },
+ on_error => sub {
+ my ($hdl, $fatal, $message) = @_;
+ eval {
+ $self->log_aborted_request($reqstate, $message);
+ $self->client_do_disconnect($reqstate);
+ };
+ if (my $err = $@) { syslog('err', "$err"); }
+ $on_read = undef;
+ },
+ );
+}
+
sub response {
- my ($self, $reqstate, $resp, $mtime, $nocomp, $delay) = @_;
+ my ($self, $reqstate, $resp, $mtime, $nocomp, $delay, $stream_fh) = @_;
#print "$$: send response: " . Dumper($resp);
@@ -231,7 +316,7 @@ sub response {
$resp->header('Server' => "pve-api-daemon/3.0");
my $content_length;
- if ($content) {
+ if ($content && !$stream_fh) {
$content_length = length($content);
@@ -258,11 +343,16 @@ sub response {
#print "SEND(without content) $res\n" if $self->{debug};
$res .= "\015\012";
- $res .= $content if $content;
+ $res .= $content if $content && !$stream_fh;
$self->log_request($reqstate, $reqstate->{request});
- if ($delay && $delay > 0) {
+ if ($stream_fh) {
+ # write headers and preamble...
+ $reqstate->{hdl}->push_write($res);
+ # ...then stream data via an AnyEvent::Handle
+ $self->response_stream($reqstate, $stream_fh);
+ } elsif ($delay && $delay > 0) {
my $w; $w = AnyEvent->timer(after => $delay, cb => sub {
undef $w; # delete reference
$reqstate->{hdl}->push_write($res);
@@ -322,6 +412,13 @@ sub send_file_start {
if (ref($download) eq 'HASH') {
$fh = $download->{fh};
$mime = $download->{'content-type'};
+
+ if ($download->{stream}) {
+ my $header = HTTP::Headers->new(Content_Type => $mime);
+ my $resp = HTTP::Response->new(200, "OK", $header);
+ $self->response($reqstate, $resp, undef, 1, 0, $fh);
+ return;
+ }
} else {
my $filename = $download;
$fh = IO::File->new($filename, '<') ||
--
2.20.1
next prev parent reply other threads:[~2021-04-22 15:35 UTC|newest]
Thread overview: 59+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-04-22 15:34 [pve-devel] [PATCH v2 00/13] Single-file-restore GUI for PBS snapshots Stefan Reiter
2021-04-22 15:34 ` [pbs-devel] " Stefan Reiter
2021-04-22 15:34 ` [pve-devel] [PATCH v2 proxmox-backup 01/13] file-restore: don't force PBS_FINGERPRINT env var Stefan Reiter
2021-04-22 15:34 ` [pbs-devel] " Stefan Reiter
2021-04-22 17:07 ` [pve-devel] applied: " Thomas Lamprecht
2021-04-22 17:07 ` [pbs-devel] applied: " Thomas Lamprecht
2021-04-22 15:34 ` [pve-devel] [PATCH v2 proxmox-backup 02/13] client-tools: add crypto_parameters_keep_fd Stefan Reiter
2021-04-22 15:34 ` [pbs-devel] " Stefan Reiter
2021-04-22 17:07 ` [pve-devel] applied: " Thomas Lamprecht
2021-04-22 17:07 ` [pbs-devel] applied: " Thomas Lamprecht
2021-04-22 15:34 ` [pve-devel] [PATCH v2 proxmox-backup 03/13] file-restore: support encrypted VM backups Stefan Reiter
2021-04-22 15:34 ` [pbs-devel] " Stefan Reiter
2021-04-22 17:07 ` [pve-devel] applied: " Thomas Lamprecht
2021-04-22 17:07 ` [pbs-devel] applied: " Thomas Lamprecht
2021-04-22 15:34 ` [pve-devel] [PATCH v2 common 04/13] PBSClient: adapt error message to include full package names Stefan Reiter
2021-04-22 15:34 ` [pbs-devel] " Stefan Reiter
2021-04-23 12:17 ` [pve-devel] applied: " Thomas Lamprecht
2021-04-23 12:17 ` [pbs-devel] applied: [pve-devel] " Thomas Lamprecht
2021-04-22 15:34 ` [pve-devel] [PATCH v2 common 05/13] PBSClient: add file_restore_list command Stefan Reiter
2021-04-22 15:34 ` [pbs-devel] " Stefan Reiter
2021-04-23 12:17 ` [pve-devel] applied: " Thomas Lamprecht
2021-04-23 12:17 ` [pbs-devel] applied: [pve-devel] " Thomas Lamprecht
2021-04-22 15:34 ` [pve-devel] [PATCH v2 common 06/13] PBSClient: add file_restore_extract function Stefan Reiter
2021-04-22 15:34 ` [pbs-devel] " Stefan Reiter
2021-04-23 12:17 ` [pve-devel] applied: " Thomas Lamprecht
2021-04-23 12:17 ` [pbs-devel] applied: [pve-devel] " Thomas Lamprecht
2021-04-22 15:34 ` [pve-devel] [PATCH v2 common 07/13] PBSClient: use crypt params for file 'list' and 'extract' Stefan Reiter
2021-04-22 15:34 ` [pbs-devel] " Stefan Reiter
2021-04-22 19:14 ` [pve-devel] " Thomas Lamprecht
2021-04-22 19:14 ` [pbs-devel] " Thomas Lamprecht
2021-04-23 12:18 ` [pve-devel] applied: " Thomas Lamprecht
2021-04-23 12:18 ` [pbs-devel] applied: [pve-devel] " Thomas Lamprecht
2021-04-22 15:34 ` Stefan Reiter [this message]
2021-04-22 15:34 ` [pbs-devel] [PATCH v2 http-server 08/13] support streaming data form fh to client Stefan Reiter
2021-04-23 11:56 ` [pve-devel] applied: " Thomas Lamprecht
2021-04-23 11:56 ` [pbs-devel] applied: [pve-devel] " Thomas Lamprecht
2021-04-22 15:34 ` [pve-devel] [PATCH v2 http-server 09/13] allow stream download from path and over pvedaemon-proxy Stefan Reiter
2021-04-22 15:34 ` [pbs-devel] " Stefan Reiter
2021-04-23 11:56 ` [pve-devel] applied: " Thomas Lamprecht
2021-04-23 11:56 ` [pbs-devel] applied: [pve-devel] " Thomas Lamprecht
2021-04-22 15:34 ` [pve-devel] [PATCH v2 storage 10/13] add FileRestore API for PBS Stefan Reiter
2021-04-22 15:34 ` [pbs-devel] " Stefan Reiter
2021-04-23 10:34 ` [pve-devel] [PATCH manager] file-restore: pass in full volume ID Fabian Grünbichler
2021-04-23 10:34 ` [pve-devel] [PATCH storage 1/2] file-restore: return perl-y booleans Fabian Grünbichler
2021-04-23 10:34 ` [pve-devel] [PATCH storage 2/2] file-restore: pass in volume ID or name Fabian Grünbichler
2021-04-22 15:34 ` [pve-devel] [PATCH v2 proxmox-widget-toolkit 11/13] Utils: add errorCallback to monStoreErrors Stefan Reiter
2021-04-22 15:34 ` [pbs-devel] " Stefan Reiter
2021-04-22 18:41 ` [pve-devel] applied: " Thomas Lamprecht
2021-04-22 18:41 ` [pbs-devel] applied: [pve-devel] " Thomas Lamprecht
2021-04-22 15:34 ` [pve-devel] [PATCH v2 proxmox-widget-toolkit 12/13] FileBrowser: support 'virtual'/'v' file type Stefan Reiter
2021-04-22 15:34 ` [pbs-devel] " Stefan Reiter
2021-04-22 18:41 ` [pve-devel] applied: " Thomas Lamprecht
2021-04-22 18:41 ` [pbs-devel] applied: [pve-devel] " Thomas Lamprecht
2021-04-22 15:34 ` [pve-devel] [PATCH v2 proxmox-widget-toolkit 13/13] FileBrowser: show errors in messagebox and allow expand 'all' Stefan Reiter
2021-04-22 15:34 ` [pbs-devel] " Stefan Reiter
2021-04-22 18:41 ` [pve-devel] applied: " Thomas Lamprecht
2021-04-22 18:41 ` [pbs-devel] applied: [pve-devel] " Thomas Lamprecht
2021-04-22 15:47 ` [pve-devel] [PATCH v2 manager 1/2] backupview: add file restore button Stefan Reiter
2021-04-22 15:47 ` [pve-devel] [PATCH v2 manager 2/2] gui: add task name for 'pbs-download' Stefan Reiter
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20210422153457.12265-9-s.reiter@proxmox.com \
--to=s.reiter@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
--cc=pve-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.