From: Kefu Chai
To: pve-devel@lists.proxmox.com
Subject: [PATCH v4 http-server 1/1] fix #7483: apiserver: add backpressure to proxy handlers
Date: Wed, 17 Jun 2026 20:29:05 +0800
Message-ID: <20260617122905.3822836-2-k.chai@proxmox.com>
X-Mailer: git-send-email 2.47.3
In-Reply-To: <20260617122905.3822836-1-k.chai@proxmox.com>
References: <20260617122905.3822836-1-k.chai@proxmox.com>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
List-Id: Proxmox VE development discussion

When pveproxy forwards a WebSocket or SPICE connection, data can arrive
faster than the backend takes it.
With no flow control, pveproxy buffers the excess and can get OOM-killed.
PDM cross-cluster migration to LVM-thin triggers this easily: zeroing
newly-allocated extents on first write makes the receiving side fall
behind the transfer rate. Any remote migration, the VM/node consoles on
the same proxy path, and SPICE sessions carrying bulk data (USB
passthrough) can hit it too.

The backend handle's wbuf_max doesn't help: AnyEvent only checks it in
the `if (!$self->{_ww})` guard in _drain_wbuf, so once the first EAGAIN
installs a write watcher, later push_write calls skip the check and wbuf
grows without bound. And even if it did fire, it would raise ENOSPC and
drop the connection rather than slow the reader.

Add a shared apply_read_backpressure() helper that pauses reads on the
source when the destination wbuf grows too large, and resumes them from
an on_drain callback. It reads the resume callback off the source handle
instead of taking it as an argument: a reader passed back to itself is a
self-referential closure, an uncollectable cycle that pinned $reqstate on
every disconnect. This also lets response_stream() drop the manual
`$on_read = undef` it kept only to break that cycle.

The helper is used from response_stream() (replacing the inline block,
no behaviour change), websocket_proxy() (which had no backpressure
before), and handle_spice_proxy_request() (which had only the
ineffective wbuf_max and a TODO).

In on_eof, drain what the pause left in rbuf through the reader before
tearing down, instead of dropping it. This is best-effort:
client_do_disconnect() half-closes the socket without flushing the
peer's wbuf, so only bytes push_write() already handed to the kernel are
sure to land. It still beats dropping the tail every time; a hard
guarantee would need block_disconnect on the client handle and teardown
via on_drain, left out here.

handle_proxy_eof() drains and tears down in separate eval blocks: the
websocket reader dies on a malformed trailing frame, and a shared eval
would let that skip client_do_disconnect() and leak the connection. The
teardown half is the log-and-disconnect that the on_error handlers all
duplicated, now shared as handle_proxy_error().

Pausing reads also delays in-band control messages (WebSocket ping/pong,
SPICE keepalives) behind the queued data, but 640 KB drains in a few
milliseconds even on first-write LVM-thin, well under any timeout. Only
a fully stalled backend stretches the pause long enough to matter, and
then a single connection times out instead of pveproxy being OOM-killed
and taking every session on the node with it.

Also drop a stale TODO in response_stream() about using wbuf_max as the
threshold: per the above, it and the backpressure threshold serve
different purposes and stay separate.

Signed-off-by: Kefu Chai
---
 src/PVE/APIServer/AnyEvent.pm | 197 +++++++++++++++++++++++-----------
 1 file changed, 135 insertions(+), 62 deletions(-)

diff --git a/src/PVE/APIServer/AnyEvent.pm b/src/PVE/APIServer/AnyEvent.pm
index 915d678..93f2c23 100644
--- a/src/PVE/APIServer/AnyEvent.pm
+++ b/src/PVE/APIServer/AnyEvent.pm
@@ -206,6 +206,58 @@ sub finish_response {
     }
 }

+# Pause reads on $read_hdl until $write_hdl's wbuf drains, then resume by
+# re-registering the on_read that was active at pause time. It is read off
+# the handle, not passed in, so a reader need not reference its own closure
+# (a self-referential closure is an uncollectable cycle that pins $reqstate).
+# Caller decides the threshold and must liveness-check in its on_read. No-op
+# if reads are already paused. Chains any existing on_drain on $write_hdl.
+sub apply_read_backpressure {
+    my ($read_hdl, $write_hdl) = @_;
+
+    my $on_read_cb = $read_hdl->{on_read};
+    return if !$on_read_cb;
+
+    $read_hdl->on_read();
+    my $prev_on_drain = $write_hdl->{on_drain};
+    $write_hdl->on_drain(sub {
+        my ($wrhdl) = @_;
+        # uninstall ourselves (restoring any prior handler) before resuming.
+        $wrhdl->on_drain($prev_on_drain);
+        $read_hdl->on_read($on_read_cb);
+        $prev_on_drain->($wrhdl) if $prev_on_drain;
+    });
+}
+
+# shared on_error body: log the aborted request and disconnect.
+sub handle_proxy_error {
+    my ($self, $reqstate, $message) = @_;
+    eval {
+        $self->log_aborted_request($reqstate, $message);
+        $self->client_do_disconnect($reqstate);
+    };
+    if (my $err = $@) { syslog('err', $err); }
+}
+
+# shared on_eof body: drain bytes the backpressure pause left in rbuf
+# through $reader, then disconnect. The loop stops on no progress, so it is
+# safe with any reader (peer gone, empty rbuf, or an unparsable partial
+# frame all leave rbuf unchanged).
+sub handle_proxy_eof {
+    my ($self, $reqstate, $hdl, $reader, $peer_hdl) = @_;
+    # drain and teardown in separate evals: a reader can die (e.g. on a
+    # malformed trailing frame), which must not skip the disconnect.
+    eval {
+        while ($peer_hdl && length($hdl->{rbuf})) {
+            my $before = length($hdl->{rbuf});
+            $reader->($hdl, 1);
+            last if length($hdl->{rbuf}) == $before;
+        }
+    };
+    if (my $err = $@) { syslog('err', $err); }
+    $self->handle_proxy_error($reqstate);
+}
+
 sub response_stream {
     my ($self, $reqstate, $stream_fh) = @_;

@@ -214,16 +266,13 @@ sub response_stream {

     my $buf_size = 4 * 1024 * 1024;

-    my $on_read;
-    $on_read = sub {
+    my $on_read = sub {
         my ($hdl) = @_;
         my $reqhdl = $reqstate->{hdl};
         return if !$reqhdl;

         my $wbuf_len = length($reqhdl->{wbuf});
         my $rbuf_len = length($hdl->{rbuf});
-        # TODO: Take into account $reqhdl->{wbuf_max} ? Right now
-        # that's unbounded, so just assume $buf_size
         my $to_read = $buf_size - $wbuf_len;
         $to_read = $rbuf_len if $rbuf_len < $to_read;
         if ($to_read > 0) {
@@ -241,20 +290,9 @@ sub response_stream {

         # apply backpressure so we don't accept any more data into buffer if the client isn't
         # downloading fast enough. Note: read_size can double upon read, and we also need to account
-        # for one more read after# start_read, so multiply by 4
+        # for one more read after start_read, so multiply by 4
         if ($rbuf_len + $hdl->{read_size} * 4 > $buf_size) {
-            # stop reading until write buffer is empty
-            $hdl->on_read();
-            my $prev_on_drain = $reqhdl->{on_drain};
-            $reqhdl->on_drain(sub {
-                my ($wrhdl) = @_;
-                # on_drain called because write buffer is empty, continue reading
-                $hdl->on_read($on_read);
-                if ($prev_on_drain) {
-                    $wrhdl->on_drain($prev_on_drain);
-                    $prev_on_drain->($wrhdl);
-                }
-            });
+            apply_read_backpressure($hdl, $reqhdl);
         }
     };

@@ -276,16 +314,10 @@ sub response_stream {
                 }
             };
             if (my $err = $@) { syslog('err', "$err"); }
-            $on_read = undef;
         },
         on_error => sub {
             my ($hdl, $fatal, $message) = @_;
-            eval {
-                $self->log_aborted_request($reqstate, $message);
-                $self->client_do_disconnect($reqstate);
-            };
-            if (my $err = $@) { syslog('err', "$err"); }
-            $on_read = undef;
+            $self->handle_proxy_error($reqstate, $message);
         },
     );
 }
@@ -581,44 +613,49 @@ sub websocket_proxy {

             $self->dprint("CONNECTed to '$remhost:$remport'");

+            my $wbuf_limit = $max_payload_size * 5;
+
+            # forward-declared for the on_eof closure below.
+            my $proxyhdlreader;
+
             $reqstate->{proxyhdl} = AnyEvent::Handle->new(
                 fh => $fh,
                 rbuf_max => $max_payload_size,
-                wbuf_max => $max_payload_size * 5,
                 timeout => 5,
+                # drain frames buffered during a backpressure pause before
+                # tearing down, else the tail of a bulk transfer is dropped.
                 on_eof => sub {
                     my ($hdl) = @_;
-                    eval {
-                        $self->log_aborted_request($reqstate);
-                        $self->client_do_disconnect($reqstate);
-                    };
-                    if (my $err = $@) { syslog('err', $err); }
+                    $self->handle_proxy_eof(
+                        $reqstate, $hdl, $proxyhdlreader, $reqstate->{hdl},
+                    );
                 },
                 on_error => sub {
                     my ($hdl, $fatal, $message) = @_;
-                    eval {
-                        $self->log_aborted_request($reqstate, $message);
-                        $self->client_do_disconnect($reqstate);
-                    };
-                    if (my $err = $@) { syslog('err', "$err"); }
+                    $self->handle_proxy_error($reqstate, $message);
                 },
             );

-            my $proxyhdlreader = sub {
-                my ($hdl) = @_;
+            $proxyhdlreader = sub {
+                my ($hdl, $no_backpressure) = @_;
+
+                my $clienthdl = $reqstate->{hdl};
+                return if !$clienthdl;

                 my $len = length($hdl->{rbuf});
                 my $data =
                     substr($hdl->{rbuf}, 0, $len > $max_payload_size ? $max_payload_size : $len,
                         '');

-                my $string = $encode->(\$data);
+                $clienthdl->push_write($encode->(\$data));

-                $reqstate->{hdl}->push_write($string) if $reqstate->{hdl};
+                if (!$no_backpressure && length($clienthdl->{wbuf}) > $wbuf_limit) {
+                    apply_read_backpressure($hdl, $clienthdl);
+                }
             };

             my $hdlreader = sub {
-                my ($hdl) = @_;
+                my ($hdl, $no_backpressure) = @_;

                 while (my $len = length($hdl->{rbuf})) {
                     return if $len < 2;
@@ -672,7 +709,17 @@ sub websocket_proxy {
                     }

                     if ($opcode == 1 || $opcode == 2) {
-                        $reqstate->{proxyhdl}->push_write($payload) if $reqstate->{proxyhdl};
+                        my $proxyhdl = $reqstate->{proxyhdl};
+                        if ($proxyhdl) {
+                            $proxyhdl->push_write($payload);
+                            if (
+                                !$no_backpressure
+                                && length($proxyhdl->{wbuf}) > $wbuf_limit
+                            ) {
+                                apply_read_backpressure($hdl, $proxyhdl);
+                                return;
+                            }
+                        }
                     } elsif ($opcode == 8) {
                         my $statuscode = unpack("n", $payload);
                         $self->dprint("websocket received close. status code: '$statuscode'");
@@ -700,7 +747,14 @@ sub websocket_proxy {
             $reqstate->{proxyhdl}->on_read($proxyhdlreader);
             $reqstate->{hdl}->on_read($hdlreader);

-            # todo: use stop_read/start_read if write buffer grows to much
+            # override the accept-time on_eof so a pause's buffered
+            # frames are drained through $hdlreader before teardown.
+            $reqstate->{hdl}->on_eof(sub {
+                my ($hdl) = @_;
+                $self->handle_proxy_eof(
+                    $reqstate, $hdl, $hdlreader, $reqstate->{proxyhdl},
+                );
+            });

             # FIXME: remove protocol in PVE/PMG 8.x
             #
@@ -1098,7 +1152,8 @@ sub handle_spice_proxy_request {
         }

         $reqstate->{hdl}->timeout(0);
-        $reqstate->{hdl}->wbuf_max(64 * 10 * 1024);
+
+        my $wbuf_limit = 64 * 10 * 1024;

         my $remhost = $remip ? $remip : "localhost";
         my $remport = $remip ? 3128 : $spiceport;
@@ -1108,47 +1163,58 @@ sub handle_spice_proxy_request {
                 or die "connect to '$remhost:$remport' failed: $!";

             $self->dprint("CONNECTed to '$remhost:$remport'");
+
+            # forward-declared for the on_eof closure below.
+            my $proxyhdlreader;
+
             $reqstate->{proxyhdl} = AnyEvent::Handle->new(
                 fh => $fh,
                 rbuf_max => 64 * 1024,
-                wbuf_max => 64 * 10 * 1024,
                 timeout => 5,
+                # drain bytes buffered during a backpressure pause before tearing
+                # down, else bulk data (e.g. USB passthrough) is dropped.
                 on_eof => sub {
                     my ($hdl) = @_;
-                    eval {
-                        $self->log_aborted_request($reqstate);
-                        $self->client_do_disconnect($reqstate);
-                    };
-                    if (my $err = $@) { syslog('err', $err); }
+                    $self->handle_proxy_eof(
+                        $reqstate, $hdl, $proxyhdlreader, $reqstate->{hdl},
+                    );
                 },
                 on_error => sub {
                     my ($hdl, $fatal, $message) = @_;
-                    eval {
-                        $self->log_aborted_request($reqstate, $message);
-                        $self->client_do_disconnect($reqstate);
-                    };
-                    if (my $err = $@) { syslog('err', "$err"); }
+                    $self->handle_proxy_error($reqstate, $message);
                 },
             );

-            my $proxyhdlreader = sub {
-                my ($hdl) = @_;
+            $proxyhdlreader = sub {
+                my ($hdl, $no_backpressure) = @_;
+
+                my $clienthdl = $reqstate->{hdl};
+                return if !$clienthdl;

                 my $len = length($hdl->{rbuf});
                 my $data = substr($hdl->{rbuf}, 0, $len, '');

-                #print "READ1 $len\n";
-                $reqstate->{hdl}->push_write($data) if $reqstate->{hdl};
+                $clienthdl->push_write($data);
+
+                if (!$no_backpressure && length($clienthdl->{wbuf}) > $wbuf_limit) {
+                    apply_read_backpressure($hdl, $clienthdl);
+                }
             };

             my $hdlreader = sub {
-                my ($hdl) = @_;
+                my ($hdl, $no_backpressure) = @_;
+
+                my $proxyhdl = $reqstate->{proxyhdl};
+                return if !$proxyhdl;

                 my $len = length($hdl->{rbuf});
                 my $data = substr($hdl->{rbuf}, 0, $len, '');

-                #print "READ0 $len\n";
-                $reqstate->{proxyhdl}->push_write($data) if $reqstate->{proxyhdl};
+                $proxyhdl->push_write($data);
+
+                if (!$no_backpressure && length($proxyhdl->{wbuf}) > $wbuf_limit) {
+                    apply_read_backpressure($hdl, $proxyhdl);
+                }
             };

             my $proto = $reqstate->{proto} ? $reqstate->{proto}->{str} : 'HTTP/1.0';
@@ -1158,7 +1224,14 @@ sub handle_spice_proxy_request {
             $reqstate->{proxyhdl}->on_read($proxyhdlreader);
             $reqstate->{hdl}->on_read($hdlreader);

-            # todo: use stop_read/start_read if write buffer grows to much
+            # override the accept-time on_eof so a pause's buffered
+            # bytes are drained through $hdlreader before teardown.
+            $reqstate->{hdl}->on_eof(sub {
+                my ($hdl) = @_;
+                $self->handle_proxy_eof(
+                    $reqstate, $hdl, $hdlreader, $reqstate->{proxyhdl},
+                );
+            });

             # a response must be followed by an empty line
             my $res = "$proto 200 OK\015\012\015\012";
-- 
2.47.3