From: Stefan Reiter <s.reiter@proxmox.com>
To: pve-devel@lists.proxmox.com
Subject: [pve-devel] [PATCH http-server 07/10] support streaming data form fh to client
Date: Wed, 21 Apr 2021 13:15:36 +0200 [thread overview]
Message-ID: <20210421111539.29261-8-s.reiter@proxmox.com> (raw)
In-Reply-To: <20210421111539.29261-1-s.reiter@proxmox.com>
Use an explicit AnyEvent::Handle similar to websocket proxying.
Needs some special care to make sure we apply backpressure correctly to
avoid caching too much data. Note that because of AnyEvent restrictions,
specifying a "fh" to point to a file or a packet-based socket may result
in unwanted behaviour[0].
[0]: https://metacpan.org/pod/AnyEvent::Handle#DESCRIPTION
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
PVE/APIServer/AnyEvent.pm | 97 +++++++++++++++++++++++++++++++++++++--
1 file changed, 93 insertions(+), 4 deletions(-)
diff --git a/PVE/APIServer/AnyEvent.pm b/PVE/APIServer/AnyEvent.pm
index 60a2a1c..643ae88 100644
--- a/PVE/APIServer/AnyEvent.pm
+++ b/PVE/APIServer/AnyEvent.pm
@@ -189,7 +189,7 @@ sub finish_response {
}
sub response {
- my ($self, $reqstate, $resp, $mtime, $nocomp, $delay) = @_;
+ my ($self, $reqstate, $resp, $mtime, $nocomp, $delay, $stream_fh) = @_;
#print "$$: send response: " . Dumper($resp);
@@ -231,7 +231,7 @@ sub response {
$resp->header('Server' => "pve-api-daemon/3.0");
my $content_length;
- if ($content) {
+ if ($content && !$stream_fh) {
$content_length = length($content);
@@ -258,11 +258,93 @@ sub response {
#print "SEND(without content) $res\n" if $self->{debug};
$res .= "\015\012";
- $res .= $content if $content;
+ $res .= $content if $content && !$stream_fh;
$self->log_request($reqstate, $reqstate->{request});
- if ($delay && $delay > 0) {
+ if ($stream_fh) {
+ # write headers and preamble
+ $reqstate->{hdl}->push_write($res);
+
+ # then attach an AnyEvent::Handle to pass through the data
+ my $buf_size = 4*1024*1024;
+
+ my $on_read;
+ $on_read = sub {
+ my ($hdl) = @_;
+ my $reqhdl = $reqstate->{hdl};
+ return if !$reqhdl;
+
+ my $wbuf_len = length($reqhdl->{wbuf});
+ my $rbuf_len = length($hdl->{rbuf});
+ # TODO: Take into account $reqhdl->{wbuf_max} ? Right now
+ # that's unbounded, so just assume $buf_size
+ my $to_read = $buf_size - $wbuf_len;
+ $to_read = $rbuf_len if $rbuf_len < $to_read;
+ if ($to_read > 0) {
+ my $data = substr($hdl->{rbuf}, 0, $to_read, '');
+ $reqhdl->push_write($data);
+ $rbuf_len -= $to_read;
+ } elsif ($hdl->{_eof}) {
+ # workaround: AnyEvent gives us a fake EPIPE if we don't consume
+ # any data when called at EOF, so unregister ourselves - data is
+ # flushed by on_eof anyway
+ # see: https://sources.debian.org/src/libanyevent-perl/7.170-2/lib/AnyEvent/Handle.pm/#L1329
+ $hdl->on_read();
+ return;
+ }
+
+ # apply backpressure so we don't accept any more data into
+ # buffer if the client isn't downloading fast enough
+ # note: read_size can double upon read, and we also need to
+ # account for one more read after start_read, so *4
+ if ($rbuf_len + $hdl->{read_size}*4 > $buf_size) {
+ # stop reading
+ $hdl->on_read();
+ my $prev_on_drain = $reqhdl->{on_drain};
+ $reqhdl->on_drain(sub {
+ my ($wrhdl) = @_;
+ # write buffer is empty, continue reading
+ $hdl->on_read($on_read);
+ if ($prev_on_drain) {
+ $wrhdl->on_drain($prev_on_drain);
+ $prev_on_drain->($wrhdl);
+ }
+ });
+ }
+ };
+
+ $reqstate->{proxyhdl} = AnyEvent::Handle->new(
+ fh => $stream_fh,
+ rbuf_max => $buf_size,
+ timeout => 0,
+ on_read => $on_read,
+ on_eof => sub {
+ my ($hdl) = @_;
+ eval {
+ if (my $reqhdl = $reqstate->{hdl}) {
+ $self->log_aborted_request($reqstate);
+ # write out any remaining data
+ $reqhdl->push_write($hdl->{rbuf}) if length($hdl->{rbuf}) > 0;
+ $hdl->{rbuf} = "";
+ $reqhdl->push_shutdown();
+ $self->finish_response($reqstate);
+ }
+ };
+ if (my $err = $@) { syslog('err', "$err"); }
+ $on_read = undef;
+ },
+ on_error => sub {
+ my ($hdl, $fatal, $message) = @_;
+ eval {
+ $self->log_aborted_request($reqstate, $message);
+ $self->client_do_disconnect($reqstate);
+ };
+ if (my $err = $@) { syslog('err', "$err"); }
+ $on_read = undef;
+ },
+ );
+ } elsif ($delay && $delay > 0) {
my $w; $w = AnyEvent->timer(after => $delay, cb => sub {
undef $w; # delete reference
$reqstate->{hdl}->push_write($res);
@@ -322,6 +404,13 @@ sub send_file_start {
if (ref($download) eq 'HASH') {
$fh = $download->{fh};
$mime = $download->{'content-type'};
+
+ if ($download->{stream}) {
+ my $header = HTTP::Headers->new(Content_Type => $mime);
+ my $resp = HTTP::Response->new(200, "OK", $header);
+ $self->response($reqstate, $resp, undef, 1, 0, $fh);
+ return;
+ }
} else {
my $filename = $download;
$fh = IO::File->new($filename, '<') ||
--
2.20.1
next prev parent reply other threads:[~2021-04-21 11:24 UTC|newest]
Thread overview: 26+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-04-21 11:15 [pve-devel] [PATCH 00/10] Single-file-restore GUI for PBS snapshots Stefan Reiter
2021-04-21 11:15 ` [pve-devel] [PATCH RESEND common 01/10] JSONSchema: don't cycle-check 'download' responses Stefan Reiter
2021-04-21 15:37 ` [pve-devel] applied: " Thomas Lamprecht
2021-04-21 11:15 ` [pve-devel] [PATCH common 02/10] PBSClient: allow running other binaries Stefan Reiter
2021-04-21 14:29 ` Thomas Lamprecht
2021-04-21 14:38 ` Stefan Reiter
2021-04-21 14:50 ` Thomas Lamprecht
2021-04-21 15:37 ` [pve-devel] applied: " Thomas Lamprecht
2021-04-21 11:15 ` [pve-devel] [PATCH common 03/10] PBSClient: add file_restore_list command Stefan Reiter
[not found] ` <<20210421111539.29261-4-s.reiter@proxmox.com>
2021-04-21 13:19 ` Fabian Grünbichler
2021-04-21 11:15 ` [pve-devel] [PATCH common 04/10] PBSClient: allow different command execution callback Stefan Reiter
[not found] ` <<20210421111539.29261-5-s.reiter@proxmox.com>
2021-04-21 13:19 ` Fabian Grünbichler
2021-04-21 13:39 ` Stefan Reiter
2021-04-21 11:15 ` [pve-devel] [PATCH common 05/10] PBSClient: add file_restore_extract function Stefan Reiter
2021-04-21 11:15 ` [pve-devel] [PATCH RESEND http-server 06/10] allow 'download' to be passed from API handler Stefan Reiter
2021-04-21 15:43 ` [pve-devel] applied: " Thomas Lamprecht
2021-04-21 11:15 ` Stefan Reiter [this message]
[not found] ` <<20210421111539.29261-8-s.reiter@proxmox.com>
2021-04-21 13:25 ` [pve-devel] [PATCH http-server 07/10] support streaming data form fh to client Fabian Grünbichler
2021-04-21 11:15 ` [pve-devel] [PATCH http-server 08/10] allow stream download from path and over pvedaemon-proxy Stefan Reiter
2021-04-21 11:15 ` [pve-devel] [PATCH storage 09/10] add FileRestore API for PBS Stefan Reiter
[not found] ` <<20210421111539.29261-10-s.reiter@proxmox.com>
2021-04-21 13:26 ` Fabian Grünbichler
2021-04-21 13:38 ` Stefan Reiter
2021-04-22 6:19 ` Fabian Grünbichler
2021-04-21 11:15 ` [pve-devel] [PATCH manager 10/10] backupview: add file restore button Stefan Reiter
2021-04-22 10:33 ` [pve-devel] [PATCH 00/10] Single-file-restore GUI for PBS snapshots Dominic Jäger
2021-04-22 12:12 ` Stefan Reiter
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20210421111539.29261-8-s.reiter@proxmox.com \
--to=s.reiter@proxmox.com \
--cc=pve-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox