public inbox for pve-devel@lists.proxmox.com
* [PATCH v4 http-server 0/1] fix pveproxy OOM in websocket and spice proxy handlers
@ 2026-06-17 12:29 Kefu Chai
From: Kefu Chai @ 2026-06-17 12:29 UTC
  To: pve-devel

see v2's cover letter [1] for the problem description and the approach.

Changes since v3:

* fix a reference-cycle leak in apply_read_backpressure(): it now reads
  the resume callback off the source handle instead of taking it as an
  argument, so the proxy readers no longer pass themselves in. a reader
  that references its own variable forms a self-referential closure, a
  cycle that Perl's reference counting never frees, and it pinned
  $reqstate on every disconnect. this also lets response_stream() drop
  the manual `$on_read = undef` it kept only to break that same cycle.

* split the drain and the teardown in handle_proxy_eof() into separate
  eval blocks. the websocket reader can die on a malformed trailing frame
  left in rbuf, and with v3's single eval such a die skipped
  client_do_disconnect() and leaked the connection.

* deduplicate the on_error handlers into a shared handle_proxy_error();
  the three proxy on_error bodies were copies of each other.
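
for illustration only (not part of this series), the cycle from the first
bullet can be reproduced in plain Perl. Tracker and the two
make_*_reader subs are hypothetical; Tracker stands in for $reqstate and
counts its own destruction. the v3-style reader mentions its own variable
and keeps Tracker alive, the v4-style one does not.

```perl
use strict;
use warnings;

# Hypothetical stand-in for $reqstate: bumps a counter when destroyed.
package Tracker;
sub new { my ($class, $counter) = @_; return bless { counter => $counter }, $class }
sub DESTROY { my ($self) = @_; ${ $self->{counter} }++ }

package main;

my $freed_cyclic = 0;
my $freed_plain  = 0;

# v3 style: the closure captures $reader and $reader holds the closure,
# a cycle that reference counting never frees, so $state leaks with it.
sub make_cyclic_reader {
    my $state = Tracker->new(\$freed_cyclic);
    my $reader;
    $reader = sub { my $s = $state; return $reader };
    return;
}

# v4 style: the reader never mentions itself and is freed with its scope.
sub make_plain_reader {
    my $state = Tracker->new(\$freed_plain);
    my $reader = sub { my $s = $state; return };
    return;
}

make_cyclic_reader();
make_plain_reader();

print "cyclic reader freed its state: $freed_cyclic\n";    # 0 (leaked)
print "plain reader freed its state:  $freed_plain\n";     # 1
```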

checked locally with an out-of-tree synthetic AnyEvent setup (not part
of this series): reverting the eval split leaks a connection when a
reader dies, and reverting the reference-cycle fix leaves $reqstate
uncollected after disconnect.

[1] https://lore.proxmox.com/pve-devel/20260413125650.2569621-1-k.chai@proxmox.com/

Kefu Chai (1):
  fix #7483: apiserver: add backpressure to proxy handlers

 src/PVE/APIServer/AnyEvent.pm | 197 +++++++++++++++++++++++-----------
 1 file changed, 135 insertions(+), 62 deletions(-)

-- 
2.47.3

* [PATCH v4 http-server 1/1] fix #7483: apiserver: add backpressure to proxy handlers
@ 2026-06-17 12:29 Kefu Chai
From: Kefu Chai @ 2026-06-17 12:29 UTC
  To: pve-devel

When pveproxy forwards a WebSocket or SPICE connection, data can arrive
faster than the backend takes it. With no flow control pveproxy buffers
the excess and can get OOM-killed. PDM cross-cluster migration to
LVM-thin triggers this easily: zeroing newly-allocated extents on first
write makes the receiving side fall behind the transfer rate. Any remote
migration, the VM/node consoles on the same proxy path, and SPICE
sessions carrying bulk data (USB passthrough) can hit it too.

The backend handle's wbuf_max doesn't help: AnyEvent only checks it in
the `if (!$self->{_ww})` guard in _drain_wbuf, so once the first EAGAIN
installs a write watcher, later push_write calls skip the check and wbuf
grows without bound. And even when the check does fire, it raises ENOSPC
and drops the connection instead of slowing the reader.
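
A toy model of that write path, for illustration only: ToyHandle is
hypothetical and is NOT AnyEvent::Handle's source. It only mimics the
shape described above: the wbuf_max check sits in the "no write watcher
yet" branch, and the simulated kernel accepts a fixed number of bytes
before returning EAGAIN. Once the watcher flag is set, every later push
appends without any size check.

```perl
use strict;
use warnings;

# Hypothetical toy, not AnyEvent::Handle: models only where wbuf_max is checked.
package ToyHandle;

sub new {
    my ($class, %args) = @_;
    return bless {
        wbuf        => '',
        wbuf_max    => $args{wbuf_max},
        kernel_room => $args{kernel_room},    # bytes accepted before EAGAIN
        _ww         => 0,                     # "write watcher installed" flag
        error       => undef,
    }, $class;
}

sub push_write {
    my ($self, $data) = @_;
    $self->{wbuf} .= $data;
    $self->_drain_wbuf;
}

sub _drain_wbuf {
    my ($self) = @_;
    return if $self->{_ww};    # watcher pending: the size check is never reached

    # immediate write attempt: the simulated kernel takes what still fits
    my $n = length($self->{wbuf});
    $n = $self->{kernel_room} if $n > $self->{kernel_room};
    substr($self->{wbuf}, 0, $n, '');
    $self->{kernel_room} -= $n;

    if (length($self->{wbuf}) > $self->{wbuf_max}) {
        $self->{error} = 'ENOSPC';    # would drop the connection
        return;
    }
    $self->{_ww} = 1 if length($self->{wbuf});    # EAGAIN: wait for writability
}

package main;

my $hdl = ToyHandle->new(wbuf_max => 1024, kernel_room => 4096);
$hdl->push_write('x' x 512) for 1 .. 100;    # the peer never reads

printf "wbuf: %d bytes (wbuf_max 1024), error: %s\n",
    length($hdl->{wbuf}), $hdl->{error} // 'none';
```

The first eight pushes fit in the simulated kernel buffer, the ninth sets
the watcher flag, and the remaining 92 pushes pile up in wbuf unchecked.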

Add a shared apply_read_backpressure() helper that callers invoke once
the destination wbuf grows past their threshold: it pauses reads on the
source and resumes them from an on_drain callback. It reads the resume
callback off the source handle instead of taking it as an argument: a
reader that passes itself in must reference its own variable, which
makes it a self-referential closure, an uncollectable cycle that pinned
$reqstate on every disconnect. This also lets response_stream() drop the
manual `$on_read = undef` it kept only to break that cycle. The helper is
used by response_stream() (replacing the inline block, no behaviour
change), websocket_proxy() (which had no backpressure before), and
handle_spice_proxy_request() (which had only the ineffective wbuf_max
and a TODO).
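
A minimal sketch exercising the helper's pause/resume contract outside
AnyEvent. MockHandle is hypothetical and models only the on_read and
on_drain accessors plus a drain() that empties wbuf and fires on_drain;
the helper body is the same logic as apply_read_backpressure() in the
diff below. It checks that a second call while paused is a no-op and that
a prior on_drain is restored and called exactly once.

```perl
use strict;
use warnings;

# Hypothetical stand-in for AnyEvent::Handle (only what the helper touches).
package MockHandle;
sub new      { return bless { wbuf => '', on_read => undef, on_drain => undef }, shift }
sub on_read  { my ($self, $cb) = @_; $self->{on_read}  = $cb }
sub on_drain { my ($self, $cb) = @_; $self->{on_drain} = $cb }
sub drain {
    my ($self) = @_;
    $self->{wbuf} = '';
    $self->{on_drain}->($self) if $self->{on_drain};
}

package main;

# Same logic as apply_read_backpressure() in the patch.
sub apply_read_backpressure {
    my ($read_hdl, $write_hdl) = @_;

    my $on_read_cb = $read_hdl->{on_read};
    return if !$on_read_cb;    # reads already paused

    $read_hdl->on_read();
    my $prev_on_drain = $write_hdl->{on_drain};
    $write_hdl->on_drain(sub {
        my ($wrhdl) = @_;
        $wrhdl->on_drain($prev_on_drain);    # uninstall ourselves first
        $read_hdl->on_read($on_read_cb);
        $prev_on_drain->($wrhdl) if $prev_on_drain;
    });
}

my $src = MockHandle->new;
my $dst = MockHandle->new;

my $reader     = sub { };    # never mentions itself, so no cycle
my $prev_calls = 0;
my $prev_drain = sub { $prev_calls++ };

$src->on_read($reader);
$dst->on_drain($prev_drain);

apply_read_backpressure($src, $dst);
my $paused = !defined $src->{on_read};
apply_read_backpressure($src, $dst);    # no-op while paused

$dst->drain;
my $resumed  = defined $src->{on_read} && $src->{on_read} == $reader;
my $restored = $dst->{on_drain} == $prev_drain;

print "paused: $paused, resumed: $resumed, ",
    "prior on_drain restored: $restored, called $prev_calls time(s)\n";
```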

In on_eof, drain what the pause left in rbuf through the reader before
tearing down, instead of dropping it. This is best-effort:
client_do_disconnect() half-closes the socket without flushing the
peer's wbuf, so only bytes push_write() already handed to the kernel are
sure to land. That is still better than the old behaviour of always
dropping the tail; a hard guarantee would need block_disconnect on the
client handle and teardown via on_drain, which this patch leaves out.
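
A sketch of the drain loop's termination rule, assuming a hypothetical
framing (a 1-byte length prefix, not WebSocket) so it runs stand-alone;
the peer liveness check from handle_proxy_eof() is left out. The reader
consumes only whole frames, so a truncated trailing frame leaves rbuf
unchanged and the loop stops instead of spinning.

```perl
use strict;
use warnings;

# Hypothetical reader: forwards one complete length-prefixed frame per call
# and leaves a partial frame in rbuf untouched.
my @forwarded;
my $reader = sub {
    my ($hdl) = @_;
    return if length($hdl->{rbuf}) < 1;
    my $len = ord(substr($hdl->{rbuf}, 0, 1));
    return if length($hdl->{rbuf}) < 1 + $len;    # incomplete: no progress
    push @forwarded, substr($hdl->{rbuf}, 1, $len);
    substr($hdl->{rbuf}, 0, 1 + $len, '');
};

# Same loop shape as the drain in handle_proxy_eof(): call the reader until
# rbuf is empty or a call makes no progress.
sub drain_rbuf {
    my ($hdl, $reader) = @_;
    while (length($hdl->{rbuf})) {
        my $before = length($hdl->{rbuf});
        $reader->($hdl, 1);    # 1 = no_backpressure: never pause while draining
        last if length($hdl->{rbuf}) == $before;
    }
}

# two whole frames ("ab", "cde") followed by one that claims 9 bytes but has 2
my $hdl = { rbuf => "\x02ab\x03cde\x09xy" };
drain_rbuf($hdl, $reader);

print "forwarded: @forwarded; left in rbuf: ", length($hdl->{rbuf}), " bytes\n";
```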

handle_proxy_eof() drains and tears down in separate eval blocks: the
websocket reader can die on a malformed trailing frame, and with a shared
eval that die would skip client_do_disconnect() and leak the connection. The
teardown half is the log-and-disconnect the on_error handlers all
duplicated, now shared as handle_proxy_error().
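
A stand-alone sketch of why the split matters. The reader and
disconnect() here are hypothetical stand-ins for the websocket reader and
client_do_disconnect(): with one shared eval the die unwinds past the
teardown, while separate evals let the teardown run regardless.

```perl
use strict;
use warnings;

my @log;

# Hypothetical reader that dies, as the websocket reader can on a
# malformed trailing frame.
my $reader = sub { die "malformed frame\n" };

sub disconnect { push @log, "disconnect($_[0])" }

sub log_err {
    my ($err) = @_;
    chomp $err;
    push @log, "error: $err";
}

# v3 shape: drain and teardown share one eval, so the die skips disconnect.
sub eof_shared_eval {
    eval {
        $reader->();
        disconnect('v3');
    };
    log_err($@) if $@;
}

# v4 shape: separate evals, so the teardown runs even if the drain died.
sub eof_split_eval {
    eval { $reader->() };
    log_err($@) if $@;
    eval { disconnect('v4') };
    log_err($@) if $@;
}

eof_shared_eval();
eof_split_eval();
print "$_\n" for @log;
```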

Pausing reads also delays in-band control messages (WebSocket ping/pong,
SPICE keepalives) behind the queued data, but 640 KiB drains in a few
milliseconds even on first-write LVM-thin, well under any timeout. Only
a fully stalled backend stretches the pause long enough to matter, and
then a single connection times out instead of pveproxy getting
OOM-killed and taking every session on the node down with it.
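
The pause length is just threshold / backend write rate. A quick
calculation for the SPICE threshold from the patch (64 * 10 * 1024 bytes);
the rates are illustrative assumptions, not measurements, and the
websocket threshold ($max_payload_size * 5) is not computed here because
$max_payload_size is not part of this diff.

```perl
use strict;
use warnings;

# SPICE backpressure threshold from the patch, in bytes (640 KiB).
my $limit = 64 * 10 * 1024;

# Milliseconds needed to drain $bytes at $bytes_per_sec.
sub drain_ms {
    my ($bytes, $bytes_per_sec) = @_;
    return 1000 * $bytes / $bytes_per_sec;
}

# assumed backend write rates in MiB/s, for illustration only
for my $mib_per_sec (50, 100, 1000) {
    printf "%5d MiB/s -> %.2f ms\n",
        $mib_per_sec, drain_ms($limit, $mib_per_sec * 1024 * 1024);
}
```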

Also drop a stale TODO in response_stream() about using wbuf_max as the
threshold: per the above, it and the backpressure threshold serve
different purposes and stay separate.

Signed-off-by: Kefu Chai <k.chai@proxmox.com>
---
 src/PVE/APIServer/AnyEvent.pm | 197 +++++++++++++++++++++++-----------
 1 file changed, 135 insertions(+), 62 deletions(-)

diff --git a/src/PVE/APIServer/AnyEvent.pm b/src/PVE/APIServer/AnyEvent.pm
index 915d678..93f2c23 100644
--- a/src/PVE/APIServer/AnyEvent.pm
+++ b/src/PVE/APIServer/AnyEvent.pm
@@ -206,6 +206,58 @@ sub finish_response {
     }
 }
 
+# Pause reads on $read_hdl until $write_hdl's wbuf drains, then resume by
+# re-registering the on_read that was active at pause time. It is read off
+# the handle, not passed in, so a reader need not reference its own closure
+# (a self-referential closure is an uncollectable cycle that pins $reqstate).
+# Caller decides the threshold and must liveness-check in its on_read. No-op
+# if reads are already paused. Chains any existing on_drain on $write_hdl.
+sub apply_read_backpressure {
+    my ($read_hdl, $write_hdl) = @_;
+
+    my $on_read_cb = $read_hdl->{on_read};
+    return if !$on_read_cb;
+
+    $read_hdl->on_read();
+    my $prev_on_drain = $write_hdl->{on_drain};
+    $write_hdl->on_drain(sub {
+        my ($wrhdl) = @_;
+        # uninstall ourselves (restoring any prior handler) before resuming.
+        $wrhdl->on_drain($prev_on_drain);
+        $read_hdl->on_read($on_read_cb);
+        $prev_on_drain->($wrhdl) if $prev_on_drain;
+    });
+}
+
+# shared on_error body: log the aborted request and disconnect.
+sub handle_proxy_error {
+    my ($self, $reqstate, $message) = @_;
+    eval {
+        $self->log_aborted_request($reqstate, $message);
+        $self->client_do_disconnect($reqstate);
+    };
+    if (my $err = $@) { syslog('err', $err); }
+}
+
+# shared on_eof body: drain bytes the backpressure pause left in rbuf
+# through $reader, then disconnect. The loop stops on no progress, so it is
+# safe with any reader (peer gone, empty rbuf, or an unparsable partial
+# frame all leave rbuf unchanged).
+sub handle_proxy_eof {
+    my ($self, $reqstate, $hdl, $reader, $peer_hdl) = @_;
+    # drain and teardown in separate evals: a reader can die (e.g. on a
+    # malformed trailing frame), which must not skip the disconnect.
+    eval {
+        while ($peer_hdl && length($hdl->{rbuf})) {
+            my $before = length($hdl->{rbuf});
+            $reader->($hdl, 1);
+            last if length($hdl->{rbuf}) == $before;
+        }
+    };
+    if (my $err = $@) { syslog('err', $err); }
+    $self->handle_proxy_error($reqstate);
+}
+
 sub response_stream {
     my ($self, $reqstate, $stream_fh) = @_;
 
@@ -214,16 +266,13 @@ sub response_stream {
 
     my $buf_size = 4 * 1024 * 1024;
 
-    my $on_read;
-    $on_read = sub {
+    my $on_read = sub {
         my ($hdl) = @_;
         my $reqhdl = $reqstate->{hdl};
         return if !$reqhdl;
 
         my $wbuf_len = length($reqhdl->{wbuf});
         my $rbuf_len = length($hdl->{rbuf});
-        # TODO: Take into account $reqhdl->{wbuf_max} ? Right now
-        # that's unbounded, so just assume $buf_size
         my $to_read = $buf_size - $wbuf_len;
         $to_read = $rbuf_len if $rbuf_len < $to_read;
         if ($to_read > 0) {
@@ -241,20 +290,9 @@ sub response_stream {
 
         # apply backpressure so we don't accept any more data into buffer if the client isn't
         # downloading fast enough. Note: read_size can double upon read, and we also need to account
-        # for one more read after# start_read, so multiply by 4
+        # for one more read after start_read, so multiply by 4
         if ($rbuf_len + $hdl->{read_size} * 4 > $buf_size) {
-            # stop reading until write buffer is empty
-            $hdl->on_read();
-            my $prev_on_drain = $reqhdl->{on_drain};
-            $reqhdl->on_drain(sub {
-                my ($wrhdl) = @_;
-                # on_drain called because write buffer is empty, continue reading
-                $hdl->on_read($on_read);
-                if ($prev_on_drain) {
-                    $wrhdl->on_drain($prev_on_drain);
-                    $prev_on_drain->($wrhdl);
-                }
-            });
+            apply_read_backpressure($hdl, $reqhdl);
         }
     };
 
@@ -276,16 +314,10 @@ sub response_stream {
                 }
             };
             if (my $err = $@) { syslog('err', "$err"); }
-            $on_read = undef;
         },
         on_error => sub {
             my ($hdl, $fatal, $message) = @_;
-            eval {
-                $self->log_aborted_request($reqstate, $message);
-                $self->client_do_disconnect($reqstate);
-            };
-            if (my $err = $@) { syslog('err', "$err"); }
-            $on_read = undef;
+            $self->handle_proxy_error($reqstate, $message);
         },
     );
 }
@@ -581,44 +613,49 @@ sub websocket_proxy {
 
             $self->dprint("CONNECTed to '$remhost:$remport'");
 
+            my $wbuf_limit = $max_payload_size * 5;
+
+            # forward-declared for the on_eof closure below.
+            my $proxyhdlreader;
+
             $reqstate->{proxyhdl} = AnyEvent::Handle->new(
                 fh => $fh,
                 rbuf_max => $max_payload_size,
-                wbuf_max => $max_payload_size * 5,
                 timeout => 5,
+                # drain frames buffered during a backpressure pause before
+                # tearing down, else the tail of a bulk transfer is dropped.
                 on_eof => sub {
                     my ($hdl) = @_;
-                    eval {
-                        $self->log_aborted_request($reqstate);
-                        $self->client_do_disconnect($reqstate);
-                    };
-                    if (my $err = $@) { syslog('err', $err); }
+                    $self->handle_proxy_eof(
+                        $reqstate, $hdl, $proxyhdlreader, $reqstate->{hdl},
+                    );
                 },
                 on_error => sub {
                     my ($hdl, $fatal, $message) = @_;
-                    eval {
-                        $self->log_aborted_request($reqstate, $message);
-                        $self->client_do_disconnect($reqstate);
-                    };
-                    if (my $err = $@) { syslog('err', "$err"); }
+                    $self->handle_proxy_error($reqstate, $message);
                 },
             );
 
-            my $proxyhdlreader = sub {
-                my ($hdl) = @_;
+            $proxyhdlreader = sub {
+                my ($hdl, $no_backpressure) = @_;
+
+                my $clienthdl = $reqstate->{hdl};
+                return if !$clienthdl;
 
                 my $len = length($hdl->{rbuf});
                 my $data =
                     substr($hdl->{rbuf}, 0, $len > $max_payload_size ? $max_payload_size : $len,
                         '');
 
-                my $string = $encode->(\$data);
+                $clienthdl->push_write($encode->(\$data));
 
-                $reqstate->{hdl}->push_write($string) if $reqstate->{hdl};
+                if (!$no_backpressure && length($clienthdl->{wbuf}) > $wbuf_limit) {
+                    apply_read_backpressure($hdl, $clienthdl);
+                }
             };
 
             my $hdlreader = sub {
-                my ($hdl) = @_;
+                my ($hdl, $no_backpressure) = @_;
 
                 while (my $len = length($hdl->{rbuf})) {
                     return if $len < 2;
@@ -672,7 +709,17 @@ sub websocket_proxy {
                     }
 
                     if ($opcode == 1 || $opcode == 2) {
-                        $reqstate->{proxyhdl}->push_write($payload) if $reqstate->{proxyhdl};
+                        my $proxyhdl = $reqstate->{proxyhdl};
+                        if ($proxyhdl) {
+                            $proxyhdl->push_write($payload);
+                            if (
+                                !$no_backpressure
+                                && length($proxyhdl->{wbuf}) > $wbuf_limit
+                            ) {
+                                apply_read_backpressure($hdl, $proxyhdl);
+                                return;
+                            }
+                        }
                     } elsif ($opcode == 8) {
                         my $statuscode = unpack("n", $payload);
                         $self->dprint("websocket received close. status code: '$statuscode'");
@@ -700,7 +747,14 @@ sub websocket_proxy {
             $reqstate->{proxyhdl}->on_read($proxyhdlreader);
             $reqstate->{hdl}->on_read($hdlreader);
 
-            # todo: use stop_read/start_read if write buffer grows to much
+            # override the accept-time on_eof so a pause's buffered
+            # frames are drained through $hdlreader before teardown.
+            $reqstate->{hdl}->on_eof(sub {
+                my ($hdl) = @_;
+                $self->handle_proxy_eof(
+                    $reqstate, $hdl, $hdlreader, $reqstate->{proxyhdl},
+                );
+            });
 
             # FIXME: remove protocol in PVE/PMG 8.x
             #
@@ -1098,7 +1152,8 @@ sub handle_spice_proxy_request {
         }
 
         $reqstate->{hdl}->timeout(0);
-        $reqstate->{hdl}->wbuf_max(64 * 10 * 1024);
+
+        my $wbuf_limit = 64 * 10 * 1024;
 
         my $remhost = $remip ? $remip : "localhost";
         my $remport = $remip ? 3128 : $spiceport;
@@ -1108,47 +1163,58 @@ sub handle_spice_proxy_request {
                 or die "connect to '$remhost:$remport' failed: $!";
 
             $self->dprint("CONNECTed to '$remhost:$remport'");
+
+            # forward-declared for the on_eof closure below.
+            my $proxyhdlreader;
+
             $reqstate->{proxyhdl} = AnyEvent::Handle->new(
                 fh => $fh,
                 rbuf_max => 64 * 1024,
-                wbuf_max => 64 * 10 * 1024,
                 timeout => 5,
+                # drain bytes buffered during a backpressure pause before tearing
+                # down, else bulk data (e.g. USB passthrough) is dropped.
                 on_eof => sub {
                     my ($hdl) = @_;
-                    eval {
-                        $self->log_aborted_request($reqstate);
-                        $self->client_do_disconnect($reqstate);
-                    };
-                    if (my $err = $@) { syslog('err', $err); }
+                    $self->handle_proxy_eof(
+                        $reqstate, $hdl, $proxyhdlreader, $reqstate->{hdl},
+                    );
                 },
                 on_error => sub {
                     my ($hdl, $fatal, $message) = @_;
-                    eval {
-                        $self->log_aborted_request($reqstate, $message);
-                        $self->client_do_disconnect($reqstate);
-                    };
-                    if (my $err = $@) { syslog('err', "$err"); }
+                    $self->handle_proxy_error($reqstate, $message);
                 },
             );
 
-            my $proxyhdlreader = sub {
-                my ($hdl) = @_;
+            $proxyhdlreader = sub {
+                my ($hdl, $no_backpressure) = @_;
+
+                my $clienthdl = $reqstate->{hdl};
+                return if !$clienthdl;
 
                 my $len = length($hdl->{rbuf});
                 my $data = substr($hdl->{rbuf}, 0, $len, '');
 
-                #print "READ1 $len\n";
-                $reqstate->{hdl}->push_write($data) if $reqstate->{hdl};
+                $clienthdl->push_write($data);
+
+                if (!$no_backpressure && length($clienthdl->{wbuf}) > $wbuf_limit) {
+                    apply_read_backpressure($hdl, $clienthdl);
+                }
             };
 
             my $hdlreader = sub {
-                my ($hdl) = @_;
+                my ($hdl, $no_backpressure) = @_;
+
+                my $proxyhdl = $reqstate->{proxyhdl};
+                return if !$proxyhdl;
 
                 my $len = length($hdl->{rbuf});
                 my $data = substr($hdl->{rbuf}, 0, $len, '');
 
-                #print "READ0 $len\n";
-                $reqstate->{proxyhdl}->push_write($data) if $reqstate->{proxyhdl};
+                $proxyhdl->push_write($data);
+
+                if (!$no_backpressure && length($proxyhdl->{wbuf}) > $wbuf_limit) {
+                    apply_read_backpressure($hdl, $proxyhdl);
+                }
             };
 
             my $proto = $reqstate->{proto} ? $reqstate->{proto}->{str} : 'HTTP/1.0';
@@ -1158,7 +1224,14 @@ sub handle_spice_proxy_request {
                 $reqstate->{proxyhdl}->on_read($proxyhdlreader);
                 $reqstate->{hdl}->on_read($hdlreader);
 
-                # todo: use stop_read/start_read if write buffer grows to much
+                # override the accept-time on_eof so a pause's buffered
+                # bytes are drained through $hdlreader before teardown.
+                $reqstate->{hdl}->on_eof(sub {
+                    my ($hdl) = @_;
+                    $self->handle_proxy_eof(
+                        $reqstate, $hdl, $hdlreader, $reqstate->{proxyhdl},
+                    );
+                });
 
                 # a response must be followed by an empty line
                 my $res = "$proto 200 OK\015\012\015\012";
-- 
2.47.3