[PATCH http-server 1/1] fix #7483: apiserver: add backpressure to websocket proxy
From: Kefu Chai @ 2026-04-12 11:12 UTC
To: pve-devel

During PDM cross-cluster migration to LVM-thin storage, pveproxy can
be OOM-killed on the destination host when disk writes are slower than
the incoming network transfer rate.

The existing wbuf_max on the backend handle turns out not to help:
AnyEvent::Handle only checks it inside the `if (!$self->{_ww})` guard
in _drain_wbuf. Once the first EAGAIN installs a write watcher, all
subsequent push_write calls return immediately without ever reaching
the check, so wbuf grows without bound.

Instead, follow the same approach response_stream() already takes:
pause reading from the source handle when the backend write buffer
exceeds the limit, and resume via an on_drain callback once it empties.

Signed-off-by: Kefu Chai <k.chai@proxmox.com>
Fixes: https://bugzilla.proxmox.com/show_bug.cgi?id=7483
---
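For illustration only, and not part of the commit: a toy model of the
pause/resume scheme described above, in plain Perl without AnyEvent.
The simulate() helper, its options and the byte rates are all
hypothetical. The sketch only assumes a source that delivers data
faster than the sink can accept it, which is the situation in the bug
report.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Toy model, NOT AnyEvent: on each tick the source offers $opt{in} bytes
# and the sink accepts at most $opt{out} bytes. All names are made up.
sub simulate {
    my (%opt) = @_;
    my ($wbuf, $peak, $paused) = (0, 0, 0);

    for my $tick (1 .. $opt{ticks}) {
        # "on_read": forward incoming data unless reading is paused
        if (!$paused) {
            $wbuf += $opt{in};
            # pause the reader once the write buffer exceeds the limit
            $paused = 1 if $opt{backpressure} && $wbuf > $opt{limit};
        }
        $peak = $wbuf if $wbuf > $peak;

        # slow sink: write out at most $opt{out} bytes per tick
        $wbuf -= $opt{out} < $wbuf ? $opt{out} : $wbuf;

        # "on_drain": resume reading once the buffer is empty
        $paused = 0 if $paused && $wbuf == 0;
    }
    return $peak;
}

my %common = (ticks => 1000, in => 8, out => 2, limit => 40);
my $unbounded = simulate(%common, backpressure => 0);
my $bounded   = simulate(%common, backpressure => 1);
print "peak without backpressure: $unbounded\n";
print "peak with backpressure:    $bounded\n";
```

With these made-up rates the buffer without backpressure grows linearly
with the number of ticks, while the paused variant never exceeds the
limit plus one incoming chunk.
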
src/PVE/APIServer/AnyEvent.pm | 45 +++++++++++++++++++++++++++++------
1 file changed, 38 insertions(+), 7 deletions(-)
diff --git a/src/PVE/APIServer/AnyEvent.pm b/src/PVE/APIServer/AnyEvent.pm
index 915d678..5a4b449 100644
--- a/src/PVE/APIServer/AnyEvent.pm
+++ b/src/PVE/APIServer/AnyEvent.pm
@@ -581,10 +581,11 @@ sub websocket_proxy {
$self->dprint("CONNECTed to '$remhost:$remport'");
+ my $wbuf_limit = $max_payload_size * 5;
+
$reqstate->{proxyhdl} = AnyEvent::Handle->new(
fh => $fh,
rbuf_max => $max_payload_size,
- wbuf_max => $max_payload_size * 5,
timeout => 5,
on_eof => sub {
my ($hdl) = @_;
@@ -604,7 +605,30 @@ sub websocket_proxy {
},
);
- my $proxyhdlreader = sub {
+ # Stop reading from $read_hdl until $write_hdl drains its write
+ # buffer, then re-register $on_read_cb. Returns true if
+ # backpressure was applied. We cannot rely on AnyEvent::Handle's
+ # wbuf_max for this because its check in _drain_wbuf is skipped
+ # when a write watcher is already active.
+ my $apply_backpressure = sub {
+ my ($read_hdl, $write_hdl, $on_read_cb, $alive_key) = @_;
+ return if length($write_hdl->{wbuf}) <= $wbuf_limit;
+
+ $read_hdl->on_read();
+ my $prev_on_drain = $write_hdl->{on_drain};
+ $write_hdl->on_drain(sub {
+ my ($wrhdl) = @_;
+ $read_hdl->on_read($on_read_cb) if $reqstate->{$alive_key};
+ if ($prev_on_drain) {
+ $wrhdl->on_drain($prev_on_drain);
+ $prev_on_drain->($wrhdl);
+ }
+ });
+ return 1;
+ };
+
+ my $proxyhdlreader;
+ $proxyhdlreader = sub {
my ($hdl) = @_;
my $len = length($hdl->{rbuf});
@@ -614,10 +638,15 @@ sub websocket_proxy {
my $string = $encode->(\$data);
- $reqstate->{hdl}->push_write($string) if $reqstate->{hdl};
+ my $clienthdl = $reqstate->{hdl};
+ return if !$clienthdl;
+
+ $clienthdl->push_write($string);
+ $apply_backpressure->($hdl, $clienthdl, $proxyhdlreader, 'proxyhdl');
};
- my $hdlreader = sub {
+ my $hdlreader;
+ $hdlreader = sub {
my ($hdl) = @_;
while (my $len = length($hdl->{rbuf})) {
@@ -672,7 +701,11 @@ sub websocket_proxy {
}
if ($opcode == 1 || $opcode == 2) {
- $reqstate->{proxyhdl}->push_write($payload) if $reqstate->{proxyhdl};
+ my $proxyhdl = $reqstate->{proxyhdl};
+ if ($proxyhdl) {
+ $proxyhdl->push_write($payload);
+ return if $apply_backpressure->($hdl, $proxyhdl, $hdlreader, 'hdl');
+ }
} elsif ($opcode == 8) {
my $statuscode = unpack("n", $payload);
$self->dprint("websocket received close. status code: '$statuscode'");
@@ -700,8 +733,6 @@ sub websocket_proxy {
$reqstate->{proxyhdl}->on_read($proxyhdlreader);
$reqstate->{hdl}->on_read($hdlreader);
- # todo: use stop_read/start_read if write buffer grows to much
-
# FIXME: remove protocol in PVE/PMG 8.x
#
# for backwards, compatibility, we have to reply with the websocket
--
2.47.3