public inbox for pve-devel@lists.proxmox.com
* [PATCH v3 http-server 0/1] fix pveproxy OOM in websocket and spice proxy handlers
@ 2026-04-24 12:11 Kefu Chai
  2026-04-24 12:11 ` [PATCH v3 http-server 1/1] fix #7483: apiserver: add backpressure to " Kefu Chai
  0 siblings, 1 reply; 2+ messages in thread
From: Kefu Chai @ 2026-04-24 12:11 UTC (permalink / raw)
  To: pve-devel

See v2's cover letter [1] for the problem description and the approach.

Changes since v2:

* Extract handle_proxy_eof(); the four on_eof sites were copies of
  each other, differing only in $reader and the peer handle.

* Fix a busy-loop in the on_eof drain loop: v2's unguarded
  `while length($hdl->{rbuf})` spins when the reader's
  `return if !$peer` short-circuits without consuming rbuf. This is
  reachable on a ws client close that sets block_disconnect on the
  backend handle, so a final reply from the backend pins the worker
  at 100% CPU instead of completing teardown. The new loop bails out
  on peer-gone or zero progress.

* Clear on_drain in apply_read_backpressure() after firing instead of
  leaving the wrapper installed when prev_on_drain is undef. No
  functional impact (the re-set of on_read is idempotent), but it
  stops pinning a reader reference for the rest of the connection.

Both of the above are verified with the same synthetic AnyEvent setup
used for v1/v2: reverting just the busy-loop guard reproduces a spin
that trips a 2 s alarm; reverting just the on_drain clear leaves the
wrapper installed after the drain.

On the peer-gone branch the drain loop no-ops and rbuf is released on
handle teardown, same as the pre-v2 behavior (before this series added
on_eof draining, rbuf at on_eof was always discarded). I audited the
users:

* PDM migration's control tunnel (mtunnel) completes each command
  synchronously via write_tunnel, so its teardown carries no protocol
  data; disk data goes over separate NBD-over-ws tunnels set up by
  forward_unix_socket, and a connection drop there surfaces as a clean
  migration abort on the source side rather than silent corruption.
* NoVNC and SPICE display (plus termproxy shell output) lose at most a
  final frame or line, which is cosmetic.
* SPICE USB passthrough is the one case with potential real data loss,
  but it requires an abrupt ws client close mid-transfer, which is
  rare.

[1] https://lore.proxmox.com/pve-devel/20260413125650.2569621-1-k.chai@proxmox.com/

Kefu Chai (1):
  fix #7483: apiserver: add backpressure to proxy handlers

 src/PVE/APIServer/AnyEvent.pm | 178 +++++++++++++++++++++++++---------
 1 file changed, 133 insertions(+), 45 deletions(-)

-- 
2.47.3


* [PATCH v3 http-server 1/1] fix #7483: apiserver: add backpressure to proxy handlers
  2026-04-24 12:11 [PATCH v3 http-server 0/1] fix pveproxy OOM in websocket and spice proxy handlers Kefu Chai
@ 2026-04-24 12:11 ` Kefu Chai
  0 siblings, 0 replies; 2+ messages in thread
From: Kefu Chai @ 2026-04-24 12:11 UTC (permalink / raw)
  To: pve-devel

When pveproxy forwards a WebSocket or SPICE connection to a backend,
incoming data can arrive faster than the backend consumes it. With no
flow control, pveproxy buffers the excess in memory and can be
OOM-killed. This is easy to trigger with PDM cross-cluster migration
to LVM-thin storage because of LVM-thin's zero-on-alloc behaviour:
writing to newly-allocated extents is much slower than steady-state,
so the receiving side cannot keep up with the network transfer rate.
In principle this affects any remote migration, the VM/node console
sessions served through the same proxy path, and SPICE sessions that
carry bulk data such as USB passthrough.

The existing wbuf_max on the backend handle does not help. AnyEvent
only checks it inside the `if (!$self->{_ww})` guard in _drain_wbuf,
so once the first EAGAIN installs a write watcher, subsequent
push_write calls return immediately without ever reaching the check
and wbuf grows without bound. Even if the check did fire it would
raise ENOSPC and tear the connection down rather than apply
backpressure.
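The control flow described above can be reproduced with a toy model of
push_write/_drain_wbuf (an illustration of the described behaviour,
not AnyEvent's actual code): once a write watcher is installed, the
wbuf_max check sits behind the `!$self->{_ww}` guard and is never
reached again, so wbuf grows without bound.

```perl
use strict;
use warnings;

# Toy handle: the wbuf_max check lives inside the `if (!$self->{_ww})`
# branch, mirroring the guard placement described in the commit
# message (this is a simplified model, not the real module).
my $self = { wbuf => '', wbuf_max => 10, _ww => undef };

my $overflow_fired = 0;

sub _drain_wbuf {
    my ($self) = @_;
    if (!$self->{_ww} && length $self->{wbuf}) {
        $overflow_fired = 1 if length $self->{wbuf} > $self->{wbuf_max};
        # pretend the kernel returned EAGAIN: install a write watcher
        # and wait for writability; nothing is written yet.
        $self->{_ww} = 'fake-io-watcher';
    }
}

sub push_write {
    my ($self, $data) = @_;
    $self->{wbuf} .= $data;
    _drain_wbuf($self);
}

push_write($self, 'x' x 8);    # first write: EAGAIN, watcher installed
push_write($self, 'x' x 100) for 1 .. 5;    # wbuf now far past wbuf_max

printf "wbuf=%d bytes, wbuf_max check fired: %s\n",
    length($self->{wbuf}), $overflow_fired ? "yes" : "no";
# prints: wbuf=508 bytes, wbuf_max check fired: no
```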

Add a shared apply_read_backpressure() helper that pauses reads on
the source handle when the destination wbuf exceeds a limit, and
resumes via an on_drain callback once it empties. Use it from:

* response_stream(), replacing the existing inline backpressure block
  (no behaviour change)
* websocket_proxy(), which had no backpressure at all
* handle_spice_proxy_request(), which had only an ineffective wbuf_max
  and a TODO comment for this exact problem
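The pause/resume and on_drain chaining that apply_read_backpressure()
performs can be sketched with plain hashes and coderefs (a
dependency-free toy, not the AnyEvent::Handle API):

```perl
use strict;
use warnings;

# Toy handles: hashes with on_read/on_drain slots and a wbuf string.
my $read_hdl   = { on_read  => undef };
my $write_hdl  = { on_drain => undef, wbuf => 'queued bulk data' };
my $on_read_cb = sub { 'reading resumed' };

# Same shape as the helper: pause reads, install a one-shot on_drain
# wrapper that uninstalls itself before resuming and chaining.
sub apply_read_backpressure {
    my ($read_hdl, $write_hdl, $on_read_cb) = @_;
    $read_hdl->{on_read} = undef;    # stop reading
    my $prev_on_drain = $write_hdl->{on_drain};
    $write_hdl->{on_drain} = sub {
        my ($wrhdl) = @_;
        $wrhdl->{on_drain} = $prev_on_drain;    # uninstall ourselves first
        $read_hdl->{on_read} = $on_read_cb;     # resume reads
        $prev_on_drain->($wrhdl) if $prev_on_drain;
    };
}

apply_read_backpressure($read_hdl, $write_hdl, $on_read_cb);

# ... later the event loop notices wbuf drained and fires on_drain:
$write_hdl->{wbuf} = '';
$write_hdl->{on_drain}->($write_hdl);

print "reads resumed, no wrapper left behind\n";
```

With no prior on_drain handler the wrapper clears the slot entirely,
which is the v3 fix from the cover letter: nothing stays installed
(pinning the reader closure) after backpressure has ended.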

In addition, fix on_eof in websocket_proxy() and
handle_spice_proxy_request() to drain any data still in rbuf through
the reader before tearing down. Without this the tail of a bulk
transfer could be silently dropped if the backend closes while reads
are paused.

A side effect of pausing reads is that any in-band control messages
multiplexed on the same channel are also delayed. For WebSocket this
means ping/pong frames in the paused direction sit behind the queued
bulk data; for SPICE it means whatever protocol-level keepalives the
client uses. In normal operation this is imperceptible, since 640 KB
drains in single-digit milliseconds even on first-time LVM-thin
allocations, well within any realistic ping timeout. Only if the
backend stalls completely does the pause last long enough for the
client to give up, and in that case a single connection times out
gracefully, which is strictly better than the previous behaviour of
OOM-killing pveproxy and taking down every session on the node.

Also drop a stale TODO in response_stream() that asked whether
$reqhdl->{wbuf_max} should be used as the backpressure threshold;
the wbuf_max investigation above answers it: no, wbuf_max and the
backpressure threshold serve different purposes and should stay
independent.

Signed-off-by: Kefu Chai <k.chai@proxmox.com>
---
 src/PVE/APIServer/AnyEvent.pm | 178 +++++++++++++++++++++++++---------
 1 file changed, 133 insertions(+), 45 deletions(-)

diff --git a/src/PVE/APIServer/AnyEvent.pm b/src/PVE/APIServer/AnyEvent.pm
index 915d678..2297085 100644
--- a/src/PVE/APIServer/AnyEvent.pm
+++ b/src/PVE/APIServer/AnyEvent.pm
@@ -206,6 +206,48 @@ sub finish_response {
     }
 }
 
+# pause reads on $read_hdl until $write_hdl's write buffer drains, then
+# re-register $on_read_cb via an on_drain callback. The caller decides
+# when to apply backpressure based on its own threshold and must
+# liveness-check at the top of $on_read_cb. Chains with any existing
+# on_drain handler on $write_hdl.
+sub apply_read_backpressure {
+    my ($read_hdl, $write_hdl, $on_read_cb) = @_;
+
+    $read_hdl->on_read();
+    my $prev_on_drain = $write_hdl->{on_drain};
+    $write_hdl->on_drain(sub {
+        my ($wrhdl) = @_;
+        # restore (or clear, when there was no prior handler) before
+        # resuming reads and invoking the previous handler, so we do
+        # not stay installed after backpressure has ended.
+        $wrhdl->on_drain($prev_on_drain);
+        $read_hdl->on_read($on_read_cb);
+        $prev_on_drain->($wrhdl) if $prev_on_drain;
+    });
+}
+
+# common on_eof body for a proxied connection: drain whatever bytes the
+# backpressure pause may have left in rbuf through $reader, then log
+# and disconnect. The drain loop exits when the peer has gone away
+# (reader would short-circuit), rbuf is empty, or the reader made no
+# progress on the last call (e.g., a partial WebSocket frame it cannot
+# parse yet). The progress check lets us call any reader safely without
+# caring about its specific minimum-bytes-to-consume threshold.
+sub handle_proxy_eof {
+    my ($self, $reqstate, $hdl, $reader, $peer_hdl) = @_;
+    eval {
+        while ($peer_hdl && length($hdl->{rbuf})) {
+            my $before = length($hdl->{rbuf});
+            $reader->($hdl, 1);
+            last if length($hdl->{rbuf}) == $before;
+        }
+        $self->log_aborted_request($reqstate);
+        $self->client_do_disconnect($reqstate);
+    };
+    if (my $err = $@) { syslog('err', $err); }
+}
+
 sub response_stream {
     my ($self, $reqstate, $stream_fh) = @_;
 
@@ -222,8 +264,6 @@ sub response_stream {
 
         my $wbuf_len = length($reqhdl->{wbuf});
         my $rbuf_len = length($hdl->{rbuf});
-        # TODO: Take into account $reqhdl->{wbuf_max} ? Right now
-        # that's unbounded, so just assume $buf_size
         my $to_read = $buf_size - $wbuf_len;
         $to_read = $rbuf_len if $rbuf_len < $to_read;
         if ($to_read > 0) {
@@ -241,20 +281,9 @@ sub response_stream {
 
         # apply backpressure so we don't accept any more data into buffer if the client isn't
         # downloading fast enough. Note: read_size can double upon read, and we also need to account
-        # for one more read after# start_read, so multiply by 4
+        # for one more read after start_read, so multiply by 4
         if ($rbuf_len + $hdl->{read_size} * 4 > $buf_size) {
-            # stop reading until write buffer is empty
-            $hdl->on_read();
-            my $prev_on_drain = $reqhdl->{on_drain};
-            $reqhdl->on_drain(sub {
-                my ($wrhdl) = @_;
-                # on_drain called because write buffer is empty, continue reading
-                $hdl->on_read($on_read);
-                if ($prev_on_drain) {
-                    $wrhdl->on_drain($prev_on_drain);
-                    $prev_on_drain->($wrhdl);
-                }
-            });
+            apply_read_backpressure($hdl, $reqhdl, $on_read);
         }
     };
 
@@ -581,18 +610,26 @@ sub websocket_proxy {
 
             $self->dprint("CONNECTed to '$remhost:$remport'");
 
+            my $wbuf_limit = $max_payload_size * 5;
+
+            # forward-declare so the on_eof closures below can reference
+            # the reader callbacks that are defined further down.
+            my $proxyhdlreader;
+            my $hdlreader;
+
             $reqstate->{proxyhdl} = AnyEvent::Handle->new(
                 fh => $fh,
                 rbuf_max => $max_payload_size,
-                wbuf_max => $max_payload_size * 5,
                 timeout => 5,
+                # drain any frames buffered during the backpressure pause
+                # before tearing down; otherwise the tail of a bulk transfer
+                # can be silently dropped if the backend closes while reads
+                # are paused.
                 on_eof => sub {
                     my ($hdl) = @_;
-                    eval {
-                        $self->log_aborted_request($reqstate);
-                        $self->client_do_disconnect($reqstate);
-                    };
-                    if (my $err = $@) { syslog('err', $err); }
+                    $self->handle_proxy_eof(
+                        $reqstate, $hdl, $proxyhdlreader, $reqstate->{hdl},
+                    );
                 },
                 on_error => sub {
                     my ($hdl, $fatal, $message) = @_;
@@ -604,21 +641,26 @@ sub websocket_proxy {
                 },
             );
 
-            my $proxyhdlreader = sub {
-                my ($hdl) = @_;
+            $proxyhdlreader = sub {
+                my ($hdl, $no_backpressure) = @_;
+
+                my $clienthdl = $reqstate->{hdl};
+                return if !$clienthdl;
 
                 my $len = length($hdl->{rbuf});
                 my $data =
                     substr($hdl->{rbuf}, 0, $len > $max_payload_size ? $max_payload_size : $len,
                         '');
 
-                my $string = $encode->(\$data);
+                $clienthdl->push_write($encode->(\$data));
 
-                $reqstate->{hdl}->push_write($string) if $reqstate->{hdl};
+                if (!$no_backpressure && length($clienthdl->{wbuf}) > $wbuf_limit) {
+                    apply_read_backpressure($hdl, $clienthdl, $proxyhdlreader);
+                }
             };
 
-            my $hdlreader = sub {
-                my ($hdl) = @_;
+            $hdlreader = sub {
+                my ($hdl, $no_backpressure) = @_;
 
                 while (my $len = length($hdl->{rbuf})) {
                     return if $len < 2;
@@ -672,7 +714,17 @@ sub websocket_proxy {
                     }
 
                     if ($opcode == 1 || $opcode == 2) {
-                        $reqstate->{proxyhdl}->push_write($payload) if $reqstate->{proxyhdl};
+                        my $proxyhdl = $reqstate->{proxyhdl};
+                        if ($proxyhdl) {
+                            $proxyhdl->push_write($payload);
+                            if (
+                                !$no_backpressure
+                                && length($proxyhdl->{wbuf}) > $wbuf_limit
+                            ) {
+                                apply_read_backpressure($hdl, $proxyhdl, $hdlreader);
+                                return;
+                            }
+                        }
                     } elsif ($opcode == 8) {
                         my $statuscode = unpack("n", $payload);
                         $self->dprint("websocket received close. status code: '$statuscode'");
@@ -700,7 +752,16 @@ sub websocket_proxy {
             $reqstate->{proxyhdl}->on_read($proxyhdlreader);
             $reqstate->{hdl}->on_read($hdlreader);
 
-            # todo: use stop_read/start_read if write buffer grows to much
+            # override the client handle's on_eof for the websocket
+            # session so frames buffered by the backpressure pause are
+            # drained before tearing down. The handle's default on_eof
+            # (set at accept time) does not know about $hdlreader.
+            $reqstate->{hdl}->on_eof(sub {
+                my ($hdl) = @_;
+                $self->handle_proxy_eof(
+                    $reqstate, $hdl, $hdlreader, $reqstate->{proxyhdl},
+                );
+            });
 
             # FIXME: remove protocol in PVE/PMG 8.x
             #
@@ -1098,7 +1159,8 @@ sub handle_spice_proxy_request {
         }
 
         $reqstate->{hdl}->timeout(0);
-        $reqstate->{hdl}->wbuf_max(64 * 10 * 1024);
+
+        my $wbuf_limit = 64 * 10 * 1024;
 
         my $remhost = $remip ? $remip : "localhost";
         my $remport = $remip ? 3128 : $spiceport;
@@ -1108,18 +1170,23 @@ sub handle_spice_proxy_request {
                 or die "connect to '$remhost:$remport' failed: $!";
 
             $self->dprint("CONNECTed to '$remhost:$remport'");
+
+            # forward-declare so the on_eof closure below can reference
+            # the reader callback that is defined further down.
+            my $proxyhdlreader;
+
             $reqstate->{proxyhdl} = AnyEvent::Handle->new(
                 fh => $fh,
                 rbuf_max => 64 * 1024,
-                wbuf_max => 64 * 10 * 1024,
                 timeout => 5,
+                # drain anything buffered during the backpressure pause
+                # before tearing down, so bulk data (e.g., SPICE USB
+                # passthrough) is not silently dropped.
                 on_eof => sub {
                     my ($hdl) = @_;
-                    eval {
-                        $self->log_aborted_request($reqstate);
-                        $self->client_do_disconnect($reqstate);
-                    };
-                    if (my $err = $@) { syslog('err', $err); }
+                    $self->handle_proxy_eof(
+                        $reqstate, $hdl, $proxyhdlreader, $reqstate->{hdl},
+                    );
                 },
                 on_error => sub {
                     my ($hdl, $fatal, $message) = @_;
@@ -1131,24 +1198,37 @@ sub handle_spice_proxy_request {
                 },
             );
 
-            my $proxyhdlreader = sub {
-                my ($hdl) = @_;
+            $proxyhdlreader = sub {
+                my ($hdl, $no_backpressure) = @_;
+
+                my $clienthdl = $reqstate->{hdl};
+                return if !$clienthdl;
 
                 my $len = length($hdl->{rbuf});
                 my $data = substr($hdl->{rbuf}, 0, $len, '');
 
-                #print "READ1 $len\n";
-                $reqstate->{hdl}->push_write($data) if $reqstate->{hdl};
+                $clienthdl->push_write($data);
+
+                if (!$no_backpressure && length($clienthdl->{wbuf}) > $wbuf_limit) {
+                    apply_read_backpressure($hdl, $clienthdl, $proxyhdlreader);
+                }
             };
 
-            my $hdlreader = sub {
-                my ($hdl) = @_;
+            my $hdlreader;
+            $hdlreader = sub {
+                my ($hdl, $no_backpressure) = @_;
+
+                my $proxyhdl = $reqstate->{proxyhdl};
+                return if !$proxyhdl;
 
                 my $len = length($hdl->{rbuf});
                 my $data = substr($hdl->{rbuf}, 0, $len, '');
 
-                #print "READ0 $len\n";
-                $reqstate->{proxyhdl}->push_write($data) if $reqstate->{proxyhdl};
+                $proxyhdl->push_write($data);
+
+                if (!$no_backpressure && length($proxyhdl->{wbuf}) > $wbuf_limit) {
+                    apply_read_backpressure($hdl, $proxyhdl, $hdlreader);
+                }
             };
 
             my $proto = $reqstate->{proto} ? $reqstate->{proto}->{str} : 'HTTP/1.0';
@@ -1158,7 +1238,15 @@ sub handle_spice_proxy_request {
                 $reqstate->{proxyhdl}->on_read($proxyhdlreader);
                 $reqstate->{hdl}->on_read($hdlreader);
 
-                # todo: use stop_read/start_read if write buffer grows to much
+                # override the client handle's on_eof for the spice
+                # session so bytes buffered by the backpressure pause
+                # are drained before tearing down.
+                $reqstate->{hdl}->on_eof(sub {
+                    my ($hdl) = @_;
+                    $self->handle_proxy_eof(
+                        $reqstate, $hdl, $hdlreader, $reqstate->{proxyhdl},
+                    );
+                });
 
                 # a response must be followed by an empty line
                 my $res = "$proto 200 OK\015\012\015\012";
-- 
2.47.3

