* [PATCH v4 http-server 0/1] fix pveproxy OOM in websocket and spice proxy handlers
@ 2026-06-17 12:29 Kefu Chai
From: Kefu Chai @ 2026-06-17 12:29 UTC (permalink / raw)
  To: pve-devel

see v2's cover letter [1] for the problem description and the approach.

Changes since v3:

* fix a reference-cycle leak in apply_read_backpressure(): it now reads
  the resume callback off the source handle instead of taking it as an
  argument, so the proxy readers no longer pass themselves in. a reader
  that references its own variable forms a self-referential closure, a
  reference cycle that Perl's refcounting never frees, so it pinned
  $reqstate on every disconnect. this also lets response_stream() drop
  the manual `$on_read = undef` it kept only to break that same cycle.

* split the drain and the teardown in handle_proxy_eof() into separate
  eval blocks. the websocket reader dies on a malformed trailing frame
  left in rbuf, and v3's single eval let that die skip
  client_do_disconnect() and leak the connection.

* deduplicate the on_error handlers into a shared handle_proxy_error();
  the three proxy on_error bodies were copy-pasted from each other.
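A minimal Perl sketch of the cycle from the first item. It assumes a hypothetical Tracker class standing in for $reqstate, whose DESTROY sets a flag when Perl frees it.

- A closure that names its own variable keeps itself alive, along with everything it captures.
- Storing the callback in a handle-like hash and reading it back from there does not create that cycle.

```perl
use strict;
use warnings;

# Hypothetical stand-in for $reqstate: flips a flag when Perl frees it.
package Tracker;
sub new     { my ($class, $freed) = @_; return bless { freed => $freed }, $class; }
sub DESTROY { my ($self) = @_; ${ $self->{freed} } = 1; }

package main;

# v3-style reader: the closure references its own variable, so the
# closure holds itself (and $state) and refcounting never frees them.
sub cyclic_reader_frees_state {
    my $freed = 0;
    {
        my $state = Tracker->new(\$freed);
        my $on_read;
        $on_read = sub { my $keep = $state; return $on_read; };
    }
    return $freed;
}

# v4-style reader: the callback lives in a handle-like hash and the
# pause/resume path reads it back from there, so nothing is self-referential.
sub handle_reader_frees_state {
    my $freed = 0;
    {
        my $state = Tracker->new(\$freed);
        my $hdl = {};
        $hdl->{on_read} = sub { my $keep = $state; return; };
        my $cb = $hdl->{on_read};    # "pause": remember the current reader
        $hdl->{on_read} = undef;
        $hdl->{on_read} = $cb;       # "resume": re-register it
    }
    return $freed;
}

print "self-referential reader freed state: ", cyclic_reader_frees_state(), "\n";
print "handle-stored reader freed state: ",    handle_reader_frees_state(), "\n";
```

The first line prints 0 and the second prints 1. The leaked Tracker is only reclaimed at global destruction.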

checked locally with an out-of-tree synthetic AnyEvent setup (not part
of this series): reverting the eval split leaks a connection on a reader
die, and reverting the reference-cycle fix leaves $reqstate uncollected
after disconnect.

[1] https://lore.proxmox.com/pve-devel/20260413125650.2569621-1-k.chai@proxmox.com/

Kefu Chai (1):
  fix #7483: apiserver: add backpressure to proxy handlers

 src/PVE/APIServer/AnyEvent.pm | 197 +++++++++++++++++++++++-----------
 1 file changed, 135 insertions(+), 62 deletions(-)

-- 
2.47.3


* [PATCH v4 http-server 1/1] fix #7483: apiserver: add backpressure to proxy handlers
@ 2026-06-17 12:29 ` Kefu Chai
From: Kefu Chai @ 2026-06-17 12:29 UTC (permalink / raw)
  To: pve-devel

When pveproxy forwards a WebSocket or SPICE connection, data can arrive
faster than the backend consumes it. With no flow control, pveproxy buffers
the excess and can get OOM-killed. PDM cross-cluster migration to
LVM-thin triggers this easily: zeroing newly-allocated extents on first
write makes the receiving side fall behind the transfer rate. Any remote
migration, the VM/node consoles on the same proxy path, and SPICE
sessions carrying bulk data (USB passthrough) can hit it too.

The backend handle's wbuf_max doesn't help: AnyEvent only checks it in
the `if (!$self->{_ww})` guard in _drain_wbuf, so once the first EAGAIN
installs a write watcher, later push_write calls skip the check and wbuf
grows without bound. Even when the check does fire, it raises ENOSPC and
drops the connection rather than slowing the reader.
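A simplified Perl model of the guard described above. This is an assumption for illustration, not AnyEvent's actual _drain_wbuf code; the hash keys only mirror the names used in the text. The model has the following properties:

- The limit is checked only on the path that installs the write watcher.
- Once the first EAGAIN has installed `_ww`, every later write appends to wbuf unchecked.

```perl
use strict;
use warnings;

# Model only: append to wbuf; check wbuf_max only when no watcher exists.
sub model_push_write {
    my ($h, $data) = @_;
    $h->{wbuf} .= $data;
    return if $h->{_ww};    # watcher already installed: limit never checked
    # pretend the kernel accepted nothing (EAGAIN), so a watcher is installed
    $h->{_ww} = 1;
    $h->{error} = 'ENOSPC' if length($h->{wbuf}) > $h->{wbuf_max};
}

my $h = { wbuf => '', wbuf_max => 1024, _ww => 0 };
model_push_write($h, 'x' x 512) for 1 .. 100;
printf "wbuf=%d error=%s\n", length($h->{wbuf}), $h->{error} // 'none';
```

The model prints `wbuf=51200 error=none`: fifty times the configured limit with no error raised.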

Add a shared apply_read_backpressure() helper that pauses reads on the
source when the destination wbuf grows too large, and resumes them from
an on_drain callback. It reads the resume callback off the source handle
instead of taking it as an argument: a reader passed back to itself is a
self-referential closure, a reference cycle that Perl's refcounting never
frees, so it pinned $reqstate on every disconnect. This also lets
response_stream() drop the manual `$on_read = undef` it kept only to
break that cycle. The helper is used from response_stream() (replacing
the inline block, no behaviour change), websocket_proxy() (which had no
backpressure before), and handle_spice_proxy_request() (which had only
the ineffective wbuf_max and a TODO).
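A self-contained Perl model of the pause/resume cycle. The helper body follows the patch's apply_read_backpressure(). MockHandle is a hypothetical stand-in for AnyEvent::Handle and differs from it in these ways:

- It only stores the on_read/on_drain callbacks.
- It offers a manual flush() that empties wbuf and fires on_drain.
- It does not reproduce AnyEvent firing on_drain immediately when the callback is set on an empty buffer.

```perl
use strict;
use warnings;

# Hypothetical stand-in for AnyEvent::Handle: only what the helper touches.
package MockHandle;
sub new      { my ($class) = @_; return bless { wbuf => '' }, $class; }
sub on_read  { my ($self, $cb) = @_; $self->{on_read}  = $cb; }
sub on_drain { my ($self, $cb) = @_; $self->{on_drain} = $cb; }
# Simulate the kernel accepting the whole wbuf, then fire on_drain.
sub flush {
    my ($self) = @_;
    $self->{wbuf} = '';
    $self->{on_drain}->($self) if $self->{on_drain};
}

package main;

# Same logic as apply_read_backpressure() in the patch.
sub apply_read_backpressure {
    my ($read_hdl, $write_hdl) = @_;
    my $on_read_cb = $read_hdl->{on_read};
    return if !$on_read_cb;                 # already paused: no-op
    $read_hdl->on_read();                   # pause reads
    my $prev_on_drain = $write_hdl->{on_drain};
    $write_hdl->on_drain(sub {
        my ($wrhdl) = @_;
        $wrhdl->on_drain($prev_on_drain);   # uninstall ourselves
        $read_hdl->on_read($on_read_cb);    # resume with the saved reader
        $prev_on_drain->($wrhdl) if $prev_on_drain;
    });
}

my ($src, $dst) = (MockHandle->new, MockHandle->new);
my $prev_calls = 0;
my $prev_drain = sub { $prev_calls++ };
$src->on_read(sub { });
$dst->on_drain($prev_drain);

apply_read_backpressure($src, $dst);
print defined $src->{on_read} ? "reading\n" : "paused\n";
apply_read_backpressure($src, $dst);        # second call is a no-op
$dst->flush;
print defined $src->{on_read} ? "reading\n" : "paused\n";
print "previous on_drain calls: $prev_calls\n";
```

The model prints the following, and the original on_drain handler is back in place afterwards:

- `paused`
- `reading`
- `previous on_drain calls: 1`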

In on_eof, drain what the pause left in rbuf through the reader before
tearing down, instead of dropping it. This is best-effort:
client_do_disconnect() half-closes the socket without flushing the
peer's wbuf, so only bytes push_write() already handed to the kernel are
sure to land. It still beats dropping the tail every time; a hard
guarantee would need block_disconnect on the client handle and teardown
via on_drain, left out here.
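A Perl sketch of the drain loop handle_proxy_eof() uses. It assumes a hypothetical framed reader that consumes at most one complete 4-byte frame per call and leaves a trailing partial frame in rbuf. The loop forwards every complete frame. It then stops on the first call that makes no progress, instead of spinning on the leftover bytes.

```perl
use strict;
use warnings;

# Hypothetical framed reader: one complete 4-byte frame per call.
my @forwarded;
my $reader = sub {
    my ($hdl, $no_backpressure) = @_;
    return if length($hdl->{rbuf}) < 4;    # partial frame: make no progress
    push @forwarded, substr($hdl->{rbuf}, 0, 4, '');
};

# Same loop shape as handle_proxy_eof(): stop on empty rbuf or no progress.
sub drain_rbuf {
    my ($hdl, $reader, $peer_hdl) = @_;
    while ($peer_hdl && length($hdl->{rbuf})) {
        my $before = length($hdl->{rbuf});
        $reader->($hdl, 1);                # 1 = skip backpressure while draining
        last if length($hdl->{rbuf}) == $before;
    }
}

my $hdl = { rbuf => 'AAAABBBBCC' };
drain_rbuf($hdl, $reader, {});             # {} stands in for a live peer handle
print "forwarded: @forwarded\n";
print "left in rbuf: '$hdl->{rbuf}'\n";
```

This prints `forwarded: AAAA BBBB` and leaves `'CC'` in rbuf.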

handle_proxy_eof() drains and tears down in separate eval blocks: the
websocket reader dies on a malformed trailing frame, and a shared eval
would let that skip client_do_disconnect() and leak the connection. The
teardown half is the log-and-disconnect the on_error handlers all
duplicated, now shared as handle_proxy_error().
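A minimal Perl sketch of why the split matters. The reader and teardown are hypothetical stand-ins: the reader always dies as on a malformed trailing frame, and the teardown just records that it ran. The two shapes behave differently:

- With one eval around both steps (the v3 shape), the die skips the teardown.
- With separate evals (the v4 shape), the teardown still runs.

```perl
use strict;
use warnings;

my $disconnected;
my $reader   = sub { die "malformed trailing frame\n" };
my $teardown = sub { $disconnected = 1 };

# v3 shape: a single eval around drain and teardown.
$disconnected = 0;
eval { $reader->(); $teardown->(); };
warn "drain error: $@" if $@;
my $single_eval = $disconnected;

# v4 shape: drain in its own eval, teardown afterwards in its own eval.
$disconnected = 0;
eval { $reader->(); };
warn "drain error: $@" if $@;
eval { $teardown->(); };
my $separate_evals = $disconnected;

print "single eval disconnected: $single_eval\n";
print "separate evals disconnected: $separate_evals\n";
```

The single-eval shape prints 0 (the connection leaks), while the split shape prints 1.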

Pausing reads also delays in-band control messages (WebSocket ping/pong,
SPICE keepalives) behind the queued data, but 640 KB drains in a few
milliseconds even on first-write LVM-thin, well under any timeout. Only
a fully stalled backend stretches the pause long enough to matter, and
then a single connection times out instead of pveproxy OOM-killing every
session on the node.
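Back-of-the-envelope Perl arithmetic for the drain-time argument. It uses the SPICE threshold from the patch (64 * 10 * 1024 bytes, i.e. 640 KiB). The write rates are illustrative assumptions, not measurements from the PDM migration case:

- At an assumed 100 MiB/s, a full buffer empties in 6.25 ms.
- Even at 10 MiB/s, it takes 62.5 ms.

```perl
use strict;
use warnings;

# Milliseconds needed to write $bytes at $mib_per_s MiB per second.
sub drain_ms {
    my ($bytes, $mib_per_s) = @_;
    return $bytes / ($mib_per_s * 1024 * 1024) * 1000;
}

my $wbuf_limit = 64 * 10 * 1024;     # SPICE threshold in the patch
for my $rate (10, 100, 1024) {       # assumed rates, not measurements
    printf "%4d MiB/s -> %.2f ms\n", $rate, drain_ms($wbuf_limit, $rate);
}
```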

Also drop a stale TODO in response_stream() about using wbuf_max as the
threshold: per the above, it and the backpressure threshold serve
different purposes and stay separate.

Signed-off-by: Kefu Chai <k.chai@proxmox.com>
---
 src/PVE/APIServer/AnyEvent.pm | 197 +++++++++++++++++++++++-----------
 1 file changed, 135 insertions(+), 62 deletions(-)

diff --git a/src/PVE/APIServer/AnyEvent.pm b/src/PVE/APIServer/AnyEvent.pm
index 915d678..93f2c23 100644
--- a/src/PVE/APIServer/AnyEvent.pm
+++ b/src/PVE/APIServer/AnyEvent.pm
@@ -206,6 +206,58 @@ sub finish_response {
     }
 }
 
+# Pause reads on $read_hdl until $write_hdl's wbuf drains, then resume by
+# re-registering the on_read that was active at pause time. It is read off
+# the handle, not passed in, so a reader need not reference its own closure
+# (a self-referential closure is an uncollectable cycle that pins $reqstate).
+# Caller decides the threshold and must liveness-check in its on_read. No-op
+# if reads are already paused. Chains any existing on_drain on $write_hdl.
+sub apply_read_backpressure {
+    my ($read_hdl, $write_hdl) = @_;
+
+    my $on_read_cb = $read_hdl->{on_read};
+    return if !$on_read_cb;
+
+    $read_hdl->on_read();
+    my $prev_on_drain = $write_hdl->{on_drain};
+    $write_hdl->on_drain(sub {
+        my ($wrhdl) = @_;
+        # uninstall ourselves (restoring any prior handler) before resuming.
+        $wrhdl->on_drain($prev_on_drain);
+        $read_hdl->on_read($on_read_cb);
+        $prev_on_drain->($wrhdl) if $prev_on_drain;
+    });
+}
+
+# shared on_error body: log the aborted request and disconnect.
+sub handle_proxy_error {
+    my ($self, $reqstate, $message) = @_;
+    eval {
+        $self->log_aborted_request($reqstate, $message);
+        $self->client_do_disconnect($reqstate);
+    };
+    if (my $err = $@) { syslog('err', $err); }
+}
+
+# shared on_eof body: drain bytes the backpressure pause left in rbuf
+# through $reader, then disconnect. The loop stops on no progress, so it is
+# safe with any reader (peer gone, empty rbuf, or an unparsable partial
+# frame all leave rbuf unchanged).
+sub handle_proxy_eof {
+    my ($self, $reqstate, $hdl, $reader, $peer_hdl) = @_;
+    # drain and teardown in separate evals: a reader can die (e.g. on a
+    # malformed trailing frame), which must not skip the disconnect.
+    eval {
+        while ($peer_hdl && length($hdl->{rbuf})) {
+            my $before = length($hdl->{rbuf});
+            $reader->($hdl, 1);
+            last if length($hdl->{rbuf}) == $before;
+        }
+    };
+    if (my $err = $@) { syslog('err', $err); }
+    $self->handle_proxy_error($reqstate);
+}
+
 sub response_stream {
     my ($self, $reqstate, $stream_fh) = @_;
 
@@ -214,16 +266,13 @@ sub response_stream {
 
     my $buf_size = 4 * 1024 * 1024;
 
-    my $on_read;
-    $on_read = sub {
+    my $on_read = sub {
         my ($hdl) = @_;
         my $reqhdl = $reqstate->{hdl};
         return if !$reqhdl;
 
         my $wbuf_len = length($reqhdl->{wbuf});
         my $rbuf_len = length($hdl->{rbuf});
-        # TODO: Take into account $reqhdl->{wbuf_max} ? Right now
-        # that's unbounded, so just assume $buf_size
         my $to_read = $buf_size - $wbuf_len;
         $to_read = $rbuf_len if $rbuf_len < $to_read;
         if ($to_read > 0) {
@@ -241,20 +290,9 @@ sub response_stream {
 
         # apply backpressure so we don't accept any more data into buffer if the client isn't
         # downloading fast enough. Note: read_size can double upon read, and we also need to account
-        # for one more read after# start_read, so multiply by 4
+        # for one more read after start_read, so multiply by 4
         if ($rbuf_len + $hdl->{read_size} * 4 > $buf_size) {
-            # stop reading until write buffer is empty
-            $hdl->on_read();
-            my $prev_on_drain = $reqhdl->{on_drain};
-            $reqhdl->on_drain(sub {
-                my ($wrhdl) = @_;
-                # on_drain called because write buffer is empty, continue reading
-                $hdl->on_read($on_read);
-                if ($prev_on_drain) {
-                    $wrhdl->on_drain($prev_on_drain);
-                    $prev_on_drain->($wrhdl);
-                }
-            });
+            apply_read_backpressure($hdl, $reqhdl);
         }
     };
 
@@ -276,16 +314,10 @@ sub response_stream {
                 }
             };
             if (my $err = $@) { syslog('err', "$err"); }
-            $on_read = undef;
         },
         on_error => sub {
             my ($hdl, $fatal, $message) = @_;
-            eval {
-                $self->log_aborted_request($reqstate, $message);
-                $self->client_do_disconnect($reqstate);
-            };
-            if (my $err = $@) { syslog('err', "$err"); }
-            $on_read = undef;
+            $self->handle_proxy_error($reqstate, $message);
         },
     );
 }
@@ -581,44 +613,49 @@ sub websocket_proxy {
 
             $self->dprint("CONNECTed to '$remhost:$remport'");
 
+            my $wbuf_limit = $max_payload_size * 5;
+
+            # forward-declared for the on_eof closure below.
+            my $proxyhdlreader;
+
             $reqstate->{proxyhdl} = AnyEvent::Handle->new(
                 fh => $fh,
                 rbuf_max => $max_payload_size,
-                wbuf_max => $max_payload_size * 5,
                 timeout => 5,
+                # drain frames buffered during a backpressure pause before
+                # tearing down, else the tail of a bulk transfer is dropped.
                 on_eof => sub {
                     my ($hdl) = @_;
-                    eval {
-                        $self->log_aborted_request($reqstate);
-                        $self->client_do_disconnect($reqstate);
-                    };
-                    if (my $err = $@) { syslog('err', $err); }
+                    $self->handle_proxy_eof(
+                        $reqstate, $hdl, $proxyhdlreader, $reqstate->{hdl},
+                    );
                 },
                 on_error => sub {
                     my ($hdl, $fatal, $message) = @_;
-                    eval {
-                        $self->log_aborted_request($reqstate, $message);
-                        $self->client_do_disconnect($reqstate);
-                    };
-                    if (my $err = $@) { syslog('err', "$err"); }
+                    $self->handle_proxy_error($reqstate, $message);
                 },
             );
 
-            my $proxyhdlreader = sub {
-                my ($hdl) = @_;
+            $proxyhdlreader = sub {
+                my ($hdl, $no_backpressure) = @_;
+
+                my $clienthdl = $reqstate->{hdl};
+                return if !$clienthdl;
 
                 my $len = length($hdl->{rbuf});
                 my $data =
                     substr($hdl->{rbuf}, 0, $len > $max_payload_size ? $max_payload_size : $len,
                         '');
 
-                my $string = $encode->(\$data);
+                $clienthdl->push_write($encode->(\$data));
 
-                $reqstate->{hdl}->push_write($string) if $reqstate->{hdl};
+                if (!$no_backpressure && length($clienthdl->{wbuf}) > $wbuf_limit) {
+                    apply_read_backpressure($hdl, $clienthdl);
+                }
             };
 
             my $hdlreader = sub {
-                my ($hdl) = @_;
+                my ($hdl, $no_backpressure) = @_;
 
                 while (my $len = length($hdl->{rbuf})) {
                     return if $len < 2;
@@ -672,7 +709,17 @@ sub websocket_proxy {
                     }
 
                     if ($opcode == 1 || $opcode == 2) {
-                        $reqstate->{proxyhdl}->push_write($payload) if $reqstate->{proxyhdl};
+                        my $proxyhdl = $reqstate->{proxyhdl};
+                        if ($proxyhdl) {
+                            $proxyhdl->push_write($payload);
+                            if (
+                                !$no_backpressure
+                                && length($proxyhdl->{wbuf}) > $wbuf_limit
+                            ) {
+                                apply_read_backpressure($hdl, $proxyhdl);
+                                return;
+                            }
+                        }
                     } elsif ($opcode == 8) {
                         my $statuscode = unpack("n", $payload);
                         $self->dprint("websocket received close. status code: '$statuscode'");
@@ -700,7 +747,14 @@ sub websocket_proxy {
             $reqstate->{proxyhdl}->on_read($proxyhdlreader);
             $reqstate->{hdl}->on_read($hdlreader);
 
-            # todo: use stop_read/start_read if write buffer grows to much
+            # override the accept-time on_eof so a pause's buffered
+            # frames are drained through $hdlreader before teardown.
+            $reqstate->{hdl}->on_eof(sub {
+                my ($hdl) = @_;
+                $self->handle_proxy_eof(
+                    $reqstate, $hdl, $hdlreader, $reqstate->{proxyhdl},
+                );
+            });
 
             # FIXME: remove protocol in PVE/PMG 8.x
             #
@@ -1098,7 +1152,8 @@ sub handle_spice_proxy_request {
         }
 
         $reqstate->{hdl}->timeout(0);
-        $reqstate->{hdl}->wbuf_max(64 * 10 * 1024);
+
+        my $wbuf_limit = 64 * 10 * 1024;
 
         my $remhost = $remip ? $remip : "localhost";
         my $remport = $remip ? 3128 : $spiceport;
@@ -1108,47 +1163,58 @@ sub handle_spice_proxy_request {
                 or die "connect to '$remhost:$remport' failed: $!";
 
             $self->dprint("CONNECTed to '$remhost:$remport'");
+
+            # forward-declared for the on_eof closure below.
+            my $proxyhdlreader;
+
             $reqstate->{proxyhdl} = AnyEvent::Handle->new(
                 fh => $fh,
                 rbuf_max => 64 * 1024,
-                wbuf_max => 64 * 10 * 1024,
                 timeout => 5,
+                # drain bytes buffered during a backpressure pause before tearing
+                # down, else bulk data (e.g. USB passthrough) is dropped.
                 on_eof => sub {
                     my ($hdl) = @_;
-                    eval {
-                        $self->log_aborted_request($reqstate);
-                        $self->client_do_disconnect($reqstate);
-                    };
-                    if (my $err = $@) { syslog('err', $err); }
+                    $self->handle_proxy_eof(
+                        $reqstate, $hdl, $proxyhdlreader, $reqstate->{hdl},
+                    );
                 },
                 on_error => sub {
                     my ($hdl, $fatal, $message) = @_;
-                    eval {
-                        $self->log_aborted_request($reqstate, $message);
-                        $self->client_do_disconnect($reqstate);
-                    };
-                    if (my $err = $@) { syslog('err', "$err"); }
+                    $self->handle_proxy_error($reqstate, $message);
                 },
             );
 
-            my $proxyhdlreader = sub {
-                my ($hdl) = @_;
+            $proxyhdlreader = sub {
+                my ($hdl, $no_backpressure) = @_;
+
+                my $clienthdl = $reqstate->{hdl};
+                return if !$clienthdl;
 
                 my $len = length($hdl->{rbuf});
                 my $data = substr($hdl->{rbuf}, 0, $len, '');
 
-                #print "READ1 $len\n";
-                $reqstate->{hdl}->push_write($data) if $reqstate->{hdl};
+                $clienthdl->push_write($data);
+
+                if (!$no_backpressure && length($clienthdl->{wbuf}) > $wbuf_limit) {
+                    apply_read_backpressure($hdl, $clienthdl);
+                }
             };
 
             my $hdlreader = sub {
-                my ($hdl) = @_;
+                my ($hdl, $no_backpressure) = @_;
+
+                my $proxyhdl = $reqstate->{proxyhdl};
+                return if !$proxyhdl;
 
                 my $len = length($hdl->{rbuf});
                 my $data = substr($hdl->{rbuf}, 0, $len, '');
 
-                #print "READ0 $len\n";
-                $reqstate->{proxyhdl}->push_write($data) if $reqstate->{proxyhdl};
+                $proxyhdl->push_write($data);
+
+                if (!$no_backpressure && length($proxyhdl->{wbuf}) > $wbuf_limit) {
+                    apply_read_backpressure($hdl, $proxyhdl);
+                }
             };
 
             my $proto = $reqstate->{proto} ? $reqstate->{proto}->{str} : 'HTTP/1.0';
@@ -1158,7 +1224,14 @@ sub handle_spice_proxy_request {
                 $reqstate->{proxyhdl}->on_read($proxyhdlreader);
                 $reqstate->{hdl}->on_read($hdlreader);
 
-                # todo: use stop_read/start_read if write buffer grows to much
+                # override the accept-time on_eof so a pause's buffered
+                # bytes are drained through $hdlreader before teardown.
+                $reqstate->{hdl}->on_eof(sub {
+                    my ($hdl) = @_;
+                    $self->handle_proxy_eof(
+                        $reqstate, $hdl, $hdlreader, $reqstate->{proxyhdl},
+                    );
+                });
 
                 # a response must be followed by an empty line
                 my $res = "$proto 200 OK\015\012\015\012";
-- 
2.47.3



Service provided by Proxmox Server Solutions GmbH | Privacy | Legal