From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 8F90B1FF144 for ; Mon, 13 Apr 2026 14:56:22 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 517EA23F3D; Mon, 13 Apr 2026 14:57:09 +0200 (CEST) From: Kefu Chai To: pve-devel@lists.proxmox.com Subject: [PATCH v2 http-server] fix #7483: apiserver: add backpressure to proxy handlers Date: Mon, 13 Apr 2026 20:56:50 +0800 Message-ID: <20260413125650.2569621-2-k.chai@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260413125650.2569621-1-k.chai@proxmox.com> References: <20260413125650.2569621-1-k.chai@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1776084949684 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.356 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: PFK66TEU3SLGC6P2IXI7ITXACWKUXABJ X-Message-ID-Hash: PFK66TEU3SLGC6P2IXI7ITXACWKUXABJ X-MailFrom: k.chai@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox VE development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: When pveproxy forwards a WebSocket or SPICE connection to a backend, incoming data can arrive faster than the backend consumes it. With no flow control, pveproxy buffers the excess in memory and can be OOM-killed. This is easy to trigger with PDM cross-cluster migration to LVM-thin storage because of LVM-thin's zero-on-alloc behaviour: writing to newly-allocated extents is much slower than steady-state, so the receiving side cannot keep up with the network transfer rate. In principle this affects any remote migration, the VM/node console sessions served through the same proxy path, and SPICE sessions that carry bulk data such as USB passthrough. The existing wbuf_max on the backend handle does not help. AnyEvent only checks it inside the `if (!$self->{_ww})` guard in _drain_wbuf, so once the first EAGAIN installs a write watcher, subsequent push_write calls return immediately without ever reaching the check and wbuf grows without bound. Even if the check did fire it would raise ENOSPC and tear the connection down rather than apply back pressure. Add a shared apply_read_backpressure() helper that pauses reads on the source handle when the destination wbuf exceeds a limit, and resumes via an on_drain callback once it empties. Use it from: * response_stream(), replacing the existing inline backpressure block (no behaviour change) * websocket_proxy(), which had no backpressure at all * handle_spice_proxy_request(), which had only an ineffective wbuf_max and a TODO comment for this exact problem In addition, fix on_eof in websocket_proxy() and handle_spice_proxy_request() to drain any data still in rbuf through the reader before tearing down. Without this the tail of a bulk transfer could be silently dropped if the backend closes while reads are paused. A side effect of pausing reads is that any in-band control messages multiplexed on the same channel are also delayed. For WebSocket this means ping/pong frames in the paused direction sit behind the queued bulk data; for SPICE it means whatever protocol-level keepalives the client uses. In normal operation this is imperceptible, since 640 KB drains in single-digit milliseconds even on first-time LVM-thin allocations, well within any realistic ping timeout. Only if the backend stalls completely does the pause last long enough for the client to give up, and in that case a single connection times out gracefully, which is strictly better than the previous behaviour of OOM-killing pveproxy and taking down every session on the node. Also drop a stale TODO in response_stream() that asked whether $reqhdl->{wbuf_max} should be used as the backpressure threshold; the wbuf_max investigation above answers it: no, wbuf_max and the backpressure threshold serve different purposes and should stay independent. Signed-off-by: Kefu Chai --- src/PVE/APIServer/AnyEvent.pm | 148 ++++++++++++++++++++++++++-------- 1 file changed, 113 insertions(+), 35 deletions(-) diff --git a/src/PVE/APIServer/AnyEvent.pm b/src/PVE/APIServer/AnyEvent.pm index 915d678..666c3b0 100644 --- a/src/PVE/APIServer/AnyEvent.pm +++ b/src/PVE/APIServer/AnyEvent.pm @@ -206,6 +206,26 @@ sub finish_response { } } +# pause reads on $read_hdl until $write_hdl's write buffer drains, then +# re-register $on_read_cb via an on_drain callback. The caller decides +# when to apply backpressure based on its own threshold and must +# liveness-check at the top of $on_read_cb. Chains with any existing +# on_drain handler on $write_hdl. +sub apply_read_backpressure { + my ($read_hdl, $write_hdl, $on_read_cb) = @_; + + $read_hdl->on_read(); + my $prev_on_drain = $write_hdl->{on_drain}; + $write_hdl->on_drain(sub { + my ($wrhdl) = @_; + $read_hdl->on_read($on_read_cb); + if ($prev_on_drain) { + $wrhdl->on_drain($prev_on_drain); + $prev_on_drain->($wrhdl); + } + }); +} + sub response_stream { my ($self, $reqstate, $stream_fh) = @_; @@ -222,8 +242,6 @@ sub response_stream { my $wbuf_len = length($reqhdl->{wbuf}); my $rbuf_len = length($hdl->{rbuf}); - # TODO: Take into account $reqhdl->{wbuf_max} ? Right now - # that's unbounded, so just assume $buf_size my $to_read = $buf_size - $wbuf_len; $to_read = $rbuf_len if $rbuf_len < $to_read; if ($to_read > 0) { @@ -241,20 +259,9 @@ sub response_stream { # apply backpressure so we don't accept any more data into buffer if the client isn't # downloading fast enough. Note: read_size can double upon read, and we also need to account - # for one more read after# start_read, so multiply by 4 + # for one more read after start_read, so multiply by 4 if ($rbuf_len + $hdl->{read_size} * 4 > $buf_size) { - # stop reading until write buffer is empty - $hdl->on_read(); - my $prev_on_drain = $reqhdl->{on_drain}; - $reqhdl->on_drain(sub { - my ($wrhdl) = @_; - # on_drain called because write buffer is empty, continue reading - $hdl->on_read($on_read); - if ($prev_on_drain) { - $wrhdl->on_drain($prev_on_drain); - $prev_on_drain->($wrhdl); - } - }); + apply_read_backpressure($hdl, $reqhdl, $on_read); } }; @@ -581,14 +588,25 @@ sub websocket_proxy { $self->dprint("CONNECTed to '$remhost:$remport'"); + my $wbuf_limit = $max_payload_size * 5; + + # forward-declare so the on_eof closures below can reference + # the reader callbacks that are defined further down. + my $proxyhdlreader; + my $hdlreader; + $reqstate->{proxyhdl} = AnyEvent::Handle->new( fh => $fh, rbuf_max => $max_payload_size, - wbuf_max => $max_payload_size * 5, timeout => 5, on_eof => sub { my ($hdl) = @_; eval { + # flush any frames buffered by the backpressure + # pause before tearing down; without this, the + # tail of a bulk transfer can be silently dropped + # if the backend closes while reads are paused. + $proxyhdlreader->($hdl, 1) while length($hdl->{rbuf}); $self->log_aborted_request($reqstate); $self->client_do_disconnect($reqstate); }; @@ -604,21 +622,26 @@ sub websocket_proxy { }, ); - my $proxyhdlreader = sub { - my ($hdl) = @_; + $proxyhdlreader = sub { + my ($hdl, $no_backpressure) = @_; + + my $clienthdl = $reqstate->{hdl}; + return if !$clienthdl; my $len = length($hdl->{rbuf}); my $data = substr($hdl->{rbuf}, 0, $len > $max_payload_size ? $max_payload_size : $len, ''); - my $string = $encode->(\$data); + $clienthdl->push_write($encode->(\$data)); - $reqstate->{hdl}->push_write($string) if $reqstate->{hdl}; + if (!$no_backpressure && length($clienthdl->{wbuf}) > $wbuf_limit) { + apply_read_backpressure($hdl, $clienthdl, $proxyhdlreader); + } }; - my $hdlreader = sub { - my ($hdl) = @_; + $hdlreader = sub { + my ($hdl, $no_backpressure) = @_; while (my $len = length($hdl->{rbuf})) { return if $len < 2; @@ -672,7 +695,17 @@ sub websocket_proxy { } if ($opcode == 1 || $opcode == 2) { - $reqstate->{proxyhdl}->push_write($payload) if $reqstate->{proxyhdl}; + my $proxyhdl = $reqstate->{proxyhdl}; + if ($proxyhdl) { + $proxyhdl->push_write($payload); + if ( + !$no_backpressure + && length($proxyhdl->{wbuf}) > $wbuf_limit + ) { + apply_read_backpressure($hdl, $proxyhdl, $hdlreader); + return; + } + } } elsif ($opcode == 8) { my $statuscode = unpack("n", $payload); $self->dprint("websocket received close. status code: '$statuscode'"); @@ -700,7 +733,19 @@ sub websocket_proxy { $reqstate->{proxyhdl}->on_read($proxyhdlreader); $reqstate->{hdl}->on_read($hdlreader); - # todo: use stop_read/start_read if write buffer grows to much + # override the client handle's on_eof for the websocket + # session so frames buffered by the backpressure pause are + # drained before tearing down. The handle's default on_eof + # (set at accept time) does not know about $hdlreader. + $reqstate->{hdl}->on_eof(sub { + my ($hdl) = @_; + eval { + $hdlreader->($hdl, 1) if length($hdl->{rbuf}) >= 2; + $self->log_aborted_request($reqstate); + $self->client_do_disconnect($reqstate); + }; + if (my $err = $@) { syslog('err', $err); } + }); # FIXME: remove protocol in PVE/PMG 8.x # @@ -1098,7 +1143,8 @@ sub handle_spice_proxy_request { } $reqstate->{hdl}->timeout(0); - $reqstate->{hdl}->wbuf_max(64 * 10 * 1024); + + my $wbuf_limit = 64 * 10 * 1024; my $remhost = $remip ? $remip : "localhost"; my $remport = $remip ? 3128 : $spiceport; @@ -1108,14 +1154,22 @@ sub handle_spice_proxy_request { or die "connect to '$remhost:$remport' failed: $!"; $self->dprint("CONNECTed to '$remhost:$remport'"); + + # forward-declare so the on_eof closure below can reference + # the reader callback that is defined further down. + my $proxyhdlreader; + $reqstate->{proxyhdl} = AnyEvent::Handle->new( fh => $fh, rbuf_max => 64 * 1024, - wbuf_max => 64 * 10 * 1024, timeout => 5, on_eof => sub { my ($hdl) = @_; eval { + # flush anything buffered by the backpressure + # pause before tearing down, so bulk data (e.g., + # SPICE USB passthrough) is not silently dropped. + $proxyhdlreader->($hdl, 1) while length($hdl->{rbuf}); $self->log_aborted_request($reqstate); $self->client_do_disconnect($reqstate); }; @@ -1131,24 +1185,37 @@ sub handle_spice_proxy_request { }, ); - my $proxyhdlreader = sub { - my ($hdl) = @_; + $proxyhdlreader = sub { + my ($hdl, $no_backpressure) = @_; + + my $clienthdl = $reqstate->{hdl}; + return if !$clienthdl; my $len = length($hdl->{rbuf}); my $data = substr($hdl->{rbuf}, 0, $len, ''); - #print "READ1 $len\n"; - $reqstate->{hdl}->push_write($data) if $reqstate->{hdl}; + $clienthdl->push_write($data); + + if (!$no_backpressure && length($clienthdl->{wbuf}) > $wbuf_limit) { + apply_read_backpressure($hdl, $clienthdl, $proxyhdlreader); + } }; - my $hdlreader = sub { - my ($hdl) = @_; + my $hdlreader; + $hdlreader = sub { + my ($hdl, $no_backpressure) = @_; + + my $proxyhdl = $reqstate->{proxyhdl}; + return if !$proxyhdl; my $len = length($hdl->{rbuf}); my $data = substr($hdl->{rbuf}, 0, $len, ''); - #print "READ0 $len\n"; - $reqstate->{proxyhdl}->push_write($data) if $reqstate->{proxyhdl}; + $proxyhdl->push_write($data); + + if (!$no_backpressure && length($proxyhdl->{wbuf}) > $wbuf_limit) { + apply_read_backpressure($hdl, $proxyhdl, $hdlreader); + } }; my $proto = $reqstate->{proto} ? $reqstate->{proto}->{str} : 'HTTP/1.0'; @@ -1158,7 +1225,18 @@ sub handle_spice_proxy_request { $reqstate->{proxyhdl}->on_read($proxyhdlreader); $reqstate->{hdl}->on_read($hdlreader); - # todo: use stop_read/start_read if write buffer grows to much + # override the client handle's on_eof for the spice + # session so bytes buffered by the backpressure pause + # are drained before tearing down. + $reqstate->{hdl}->on_eof(sub { + my ($hdl) = @_; + eval { + $hdlreader->($hdl, 1) while length($hdl->{rbuf}); + $self->log_aborted_request($reqstate); + $self->client_do_disconnect($reqstate); + }; + if (my $err = $@) { syslog('err', $err); } + }); # a response must be followed by an empty line my $res = "$proto 200 OK\015\012\015\012"; -- 2.47.3