public inbox for pve-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Kefu Chai <k.chai@proxmox.com>
To: pve-devel@lists.proxmox.com
Subject: [PATCH http-server 1/1] fix #7483: apiserver: add backpressure to websocket proxy
Date: Sun, 12 Apr 2026 19:12:09 +0800	[thread overview]
Message-ID: <20260412111209.3960421-2-k.chai@proxmox.com> (raw)
In-Reply-To: <20260412111209.3960421-1-k.chai@proxmox.com>

During PDM cross-cluster migration to LVM-thin storage, pveproxy can
be OOM-killed on the destination host when disk writes are slower than
the incoming network transfer rate.

The existing wbuf_max on the backend handle turns out not to help:
AnyEvent::Handle only checks it inside the `if (!$self->{_ww})` guard
in _drain_wbuf. Once the first EAGAIN installs a write watcher, all
subsequent push_write calls return immediately without ever reaching
the check, so wbuf grows without bound.

Instead, follow the same approach response_stream() already takes:
pause reading from the source handle when the backend write buffer
exceeds the limit, and resume via an on_drain callback once it empties.

Signed-off-by: Kefu Chai <k.chai@proxmox.com>
Fixes: https://bugzilla.proxmox.com/show_bug.cgi?id=7483
---
 src/PVE/APIServer/AnyEvent.pm | 45 +++++++++++++++++++++++++++++------
 1 file changed, 38 insertions(+), 7 deletions(-)

diff --git a/src/PVE/APIServer/AnyEvent.pm b/src/PVE/APIServer/AnyEvent.pm
index 915d678..5a4b449 100644
--- a/src/PVE/APIServer/AnyEvent.pm
+++ b/src/PVE/APIServer/AnyEvent.pm
@@ -581,10 +581,11 @@ sub websocket_proxy {
 
             $self->dprint("CONNECTed to '$remhost:$remport'");
 
+            my $wbuf_limit = $max_payload_size * 5;
+
             $reqstate->{proxyhdl} = AnyEvent::Handle->new(
                 fh => $fh,
                 rbuf_max => $max_payload_size,
-                wbuf_max => $max_payload_size * 5,
                 timeout => 5,
                 on_eof => sub {
                     my ($hdl) = @_;
@@ -604,7 +605,30 @@ sub websocket_proxy {
                 },
             );
 
-            my $proxyhdlreader = sub {
+            # Stop reading from $read_hdl until $write_hdl drains its write
+            # buffer, then re-register $on_read_cb. Returns true if
+            # backpressure was applied. We cannot rely on AnyEvent::Handle's
+            # wbuf_max for this because its check in _drain_wbuf is skipped
+            # when a write watcher is already active.
+            my $apply_backpressure = sub {
+                my ($read_hdl, $write_hdl, $on_read_cb, $alive_key) = @_;
+                return if length($write_hdl->{wbuf}) <= $wbuf_limit;
+
+                $read_hdl->on_read();
+                my $prev_on_drain = $write_hdl->{on_drain};
+                $write_hdl->on_drain(sub {
+                    my ($wrhdl) = @_;
+                    $read_hdl->on_read($on_read_cb) if $reqstate->{$alive_key};
+                    if ($prev_on_drain) {
+                        $wrhdl->on_drain($prev_on_drain);
+                        $prev_on_drain->($wrhdl);
+                    }
+                });
+                return 1;
+            };
+
+            my $proxyhdlreader;
+            $proxyhdlreader = sub {
                 my ($hdl) = @_;
 
                 my $len = length($hdl->{rbuf});
@@ -614,10 +638,15 @@ sub websocket_proxy {
 
                 my $string = $encode->(\$data);
 
-                $reqstate->{hdl}->push_write($string) if $reqstate->{hdl};
+                my $clienthdl = $reqstate->{hdl};
+                return if !$clienthdl;
+
+                $clienthdl->push_write($string);
+                $apply_backpressure->($hdl, $clienthdl, $proxyhdlreader, 'proxyhdl');
             };
 
-            my $hdlreader = sub {
+            my $hdlreader;
+            $hdlreader = sub {
                 my ($hdl) = @_;
 
                 while (my $len = length($hdl->{rbuf})) {
@@ -672,7 +701,11 @@ sub websocket_proxy {
                     }
 
                     if ($opcode == 1 || $opcode == 2) {
-                        $reqstate->{proxyhdl}->push_write($payload) if $reqstate->{proxyhdl};
+                        my $proxyhdl = $reqstate->{proxyhdl};
+                        if ($proxyhdl) {
+                            $proxyhdl->push_write($payload);
+                            return if $apply_backpressure->($hdl, $proxyhdl, $hdlreader, 'hdl');
+                        }
                     } elsif ($opcode == 8) {
                         my $statuscode = unpack("n", $payload);
                         $self->dprint("websocket received close. status code: '$statuscode'");
@@ -700,8 +733,6 @@ sub websocket_proxy {
             $reqstate->{proxyhdl}->on_read($proxyhdlreader);
             $reqstate->{hdl}->on_read($hdlreader);
 
-            # todo: use stop_read/start_read if write buffer grows to much
-
             # FIXME: remove protocol in PVE/PMG 8.x
             #
             # for backwards, compatibility,  we have to reply with the websocket
-- 
2.47.3





      reply	other threads:[~2026-04-12 11:12 UTC|newest]

Thread overview: 2+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-04-12 11:12 [PATCH http-server 0/1] fix pveproxy OOM during PDM cross-cluster migration to LVM-thin Kefu Chai
2026-04-12 11:12 ` Kefu Chai [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260412111209.3960421-2-k.chai@proxmox.com \
    --to=k.chai@proxmox.com \
    --cc=pve-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal