From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 9FB2675E01; Thu, 22 Apr 2021 17:35:39 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 406D01EBFA; Thu, 22 Apr 2021 17:35:14 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 572071EB36; Thu, 22 Apr 2021 17:35:05 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 2FFBB4200F; Thu, 22 Apr 2021 17:35:05 +0200 (CEST) From: Stefan Reiter To: pve-devel@lists.proxmox.com, pbs-devel@lists.proxmox.com Date: Thu, 22 Apr 2021 17:34:52 +0200 Message-Id: <20210422153457.12265-9-s.reiter@proxmox.com> X-Mailer: git-send-email 2.20.1 In-Reply-To: <20210422153457.12265-1-s.reiter@proxmox.com> References: <20210422153457.12265-1-s.reiter@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.027 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. 
[anyevent.pm, metacpan.org] Subject: [pve-devel] [PATCH v2 http-server 08/13] support streaming data from fh to client X-BeenThere: pve-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox VE development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Thu, 22 Apr 2021 15:35:40 -0000 Use an explicit AnyEvent::Handle similar to websocket proxying. Needs some special care to make sure we apply backpressure correctly to avoid caching too much data. Note that because of AnyEvent restrictions, specifying a "fh" to point to a file or a packet-based socket may result in unwanted behaviour[0]. [0]: https://metacpan.org/pod/AnyEvent::Handle#DESCRIPTION Signed-off-by: Stefan Reiter --- v2: * move to extra sub "response_stream" * adapt comments PVE/APIServer/AnyEvent.pm | 105 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 101 insertions(+), 4 deletions(-) diff --git a/PVE/APIServer/AnyEvent.pm b/PVE/APIServer/AnyEvent.pm index 60a2a1c..f3db0e3 100644 --- a/PVE/APIServer/AnyEvent.pm +++ b/PVE/APIServer/AnyEvent.pm @@ -188,8 +188,93 @@ sub finish_response { } } +sub response_stream { + my ($self, $reqstate, $stream_fh) = @_; + + # disable timeout, we don't know how big the data is + $reqstate->{hdl}->timeout(0); + + my $buf_size = 4*1024*1024; + + my $on_read; + $on_read = sub { + my ($hdl) = @_; + my $reqhdl = $reqstate->{hdl}; + return if !$reqhdl; + + my $wbuf_len = length($reqhdl->{wbuf}); + my $rbuf_len = length($hdl->{rbuf}); + # TODO: Take into account $reqhdl->{wbuf_max} ? 
Right now + # that's unbounded, so just assume $buf_size + my $to_read = $buf_size - $wbuf_len; + $to_read = $rbuf_len if $rbuf_len < $to_read; + if ($to_read > 0) { + my $data = substr($hdl->{rbuf}, 0, $to_read, ''); + $reqhdl->push_write($data); + $rbuf_len -= $to_read; + } elsif ($hdl->{_eof}) { + # workaround: AnyEvent gives us a fake EPIPE if we don't consume + # any data when called at EOF, so unregister ourselves - data is + # flushed by on_eof anyway + # see: https://sources.debian.org/src/libanyevent-perl/7.170-2/lib/AnyEvent/Handle.pm/#L1329 + $hdl->on_read(); + return; + } + + # apply backpressure so we don't accept any more data into + # buffer if the client isn't downloading fast enough + # note: read_size can double upon read, and we also need to + # account for one more read after start_read, so *4 + if ($rbuf_len + $hdl->{read_size}*4 > $buf_size) { + # stop reading until write buffer is empty + $hdl->on_read(); + my $prev_on_drain = $reqhdl->{on_drain}; + $reqhdl->on_drain(sub { + my ($wrhdl) = @_; + # on_drain called because write buffer is empty, continue reading + $hdl->on_read($on_read); + if ($prev_on_drain) { + $wrhdl->on_drain($prev_on_drain); + $prev_on_drain->($wrhdl); + } + }); + } + }; + + $reqstate->{proxyhdl} = AnyEvent::Handle->new( + fh => $stream_fh, + rbuf_max => $buf_size, + timeout => 0, + on_read => $on_read, + on_eof => sub { + my ($hdl) = @_; + eval { + if (my $reqhdl = $reqstate->{hdl}) { + $self->log_aborted_request($reqstate); + # write out any remaining data + $reqhdl->push_write($hdl->{rbuf}) if length($hdl->{rbuf}) > 0; + $hdl->{rbuf} = ""; + $reqhdl->push_shutdown(); + $self->finish_response($reqstate); + } + }; + if (my $err = $@) { syslog('err', "$err"); } + $on_read = undef; + }, + on_error => sub { + my ($hdl, $fatal, $message) = @_; + eval { + $self->log_aborted_request($reqstate, $message); + $self->client_do_disconnect($reqstate); + }; + if (my $err = $@) { syslog('err', "$err"); } + $on_read = undef; + }, + ); 
+} + sub response { - my ($self, $reqstate, $resp, $mtime, $nocomp, $delay) = @_; + my ($self, $reqstate, $resp, $mtime, $nocomp, $delay, $stream_fh) = @_; #print "$$: send response: " . Dumper($resp); @@ -231,7 +316,7 @@ sub response { $resp->header('Server' => "pve-api-daemon/3.0"); my $content_length; - if ($content) { + if ($content && !$stream_fh) { $content_length = length($content); @@ -258,11 +343,16 @@ sub response { #print "SEND(without content) $res\n" if $self->{debug}; $res .= "\015\012"; - $res .= $content if $content; + $res .= $content if $content && !$stream_fh; $self->log_request($reqstate, $reqstate->{request}); - if ($delay && $delay > 0) { + if ($stream_fh) { + # write headers and preamble... + $reqstate->{hdl}->push_write($res); + # ...then stream data via an AnyEvent::Handle + $self->response_stream($reqstate, $stream_fh); + } elsif ($delay && $delay > 0) { my $w; $w = AnyEvent->timer(after => $delay, cb => sub { undef $w; # delete reference $reqstate->{hdl}->push_write($res); @@ -322,6 +412,13 @@ sub send_file_start { if (ref($download) eq 'HASH') { $fh = $download->{fh}; $mime = $download->{'content-type'}; + + if ($download->{stream}) { + my $header = HTTP::Headers->new(Content_Type => $mime); + my $resp = HTTP::Response->new(200, "OK", $header); + $self->response($reqstate, $resp, undef, 1, 0, $fh); + return; + } } else { my $filename = $download; $fh = IO::File->new($filename, '<') || -- 2.20.1