* [PATCH v4 http-server 0/1] fix pveproxy OOM in websocket and spice proxy handlers
@ 2026-06-17 12:29 Kefu Chai
From: Kefu Chai @ 2026-06-17 12:29 UTC
To: pve-devel
see v2's cover letter [1] for the problem description and the approach.
Changes since v3:
* fix a reference-cycle leak in apply_read_backpressure(): it now reads
the resume callback off the source handle instead of taking it as an
argument, so the proxy readers no longer pass themselves in. a reader
that references its own variable is a self-referential closure, an
uncollectable cycle that pinned $reqstate on every disconnect. this
also lets response_stream() drop the manual `$on_read = undef` it kept
only to break that same cycle.
* split the drain and the teardown in handle_proxy_eof() into separate
eval blocks. the websocket reader dies on a malformed trailing frame
left in rbuf, and v3's single eval let that die skip
client_do_disconnect() and leak the connection.
* deduplicate the on_error handlers into a shared handle_proxy_error();
the three proxy on_error bodies were copies of each other.
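the cycle in the first item can be reproduced with core Perl, without
AnyEvent. a minimal sketch: Tracker is a hypothetical stand-in for
$reqstate that flags when Perl frees it, and the two readers only mimic
the v3 and v4 shapes, not the real proxy readers.

```perl
use strict;
use warnings;

# Hypothetical stand-in for $reqstate: sets a flag when it is freed.
package Tracker;
sub new { my ($class, $flag) = @_; return bless { flag => $flag }, $class }
sub DESTROY { my ($self) = @_; ${ $self->{flag} } = 1; return }

package main;

# v3 shape: the reader names its own variable (as when it passes itself
# to a helper). The closure captures $reader and $reader holds the
# closure, so neither refcount drops to zero and $reqstate stays pinned.
my $cyclic_freed = 0;
{
    my $reqstate = Tracker->new(\$cyclic_freed);
    my $reader;
    $reader = sub {
        my ($hdl) = @_;
        return ($reqstate, $reader);
    };
}

# v4 shape: the reader never names itself, so it is freed with its
# scope and releases $reqstate along with it.
my $plain_freed = 0;
{
    my $reqstate = Tracker->new(\$plain_freed);
    my $reader = sub { my ($hdl) = @_; return $reqstate };
}

printf "cyclic freed: %d, plain freed: %d\n", $cyclic_freed, $plain_freed;
```

in the patch the second form works because apply_read_backpressure()
fetches the reader from $read_hdl->{on_read} instead of receiving it.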
checked locally with an out-of-tree synthetic AnyEvent setup (not part
of this series): reverting the eval split leaks a connection on a reader
die, and reverting the reference-cycle fix leaves $reqstate uncollected
after disconnect.
[1] https://lore.proxmox.com/pve-devel/20260413125650.2569621-1-k.chai@proxmox.com/
Kefu Chai (1):
fix #7483: apiserver: add backpressure to proxy handlers
src/PVE/APIServer/AnyEvent.pm | 197 +++++++++++++++++++++++-----------
1 file changed, 135 insertions(+), 62 deletions(-)
--
2.47.3
* [PATCH v4 http-server 1/1] fix #7483: apiserver: add backpressure to proxy handlers
From: Kefu Chai @ 2026-06-17 12:29 UTC
To: pve-devel
When pveproxy forwards a WebSocket or SPICE connection, data can arrive
faster than the other end can take it. With no flow control, pveproxy
buffers the excess in memory and can get OOM-killed. PDM cross-cluster
migration to
LVM-thin triggers this easily: zeroing newly-allocated extents on first
write makes the receiving side fall behind the transfer rate. Any remote
migration, the VM/node consoles on the same proxy path, and SPICE
sessions carrying bulk data (USB passthrough) can hit it too.
The backend handle's wbuf_max doesn't help: AnyEvent only checks it in
the `if (!$self->{_ww})` guard in _drain_wbuf, so once the first EAGAIN
installs a write watcher, later push_write calls skip the check and wbuf
grows without bound. And even where it does fire, it raises ENOSPC and
drops the connection instead of slowing the reader.
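A simplified model of that path, as a sketch: it is not
AnyEvent::Handle's code, only the ordering described above (immediate
write, write watcher installed on EAGAIN, limit check inside the
no-watcher branch). push_write_model, the ww flag and kernel_room are
hypothetical names.

```perl
use strict;
use warnings;

# Model state: ww marks an installed write watcher; wbuf_max is the limit.
my %hdl = (wbuf => '', wbuf_max => 1024, ww => 0, error => undef);
my $kernel_room = 512;    # assumed bytes the socket accepts before EAGAIN

sub push_write_model {
    my ($h, $data) = @_;
    $h->{wbuf} .= $data;
    return if $h->{ww};    # watcher installed: the limit is never checked

    # no watcher yet: try an immediate write, the kernel takes what fits
    my $len = length $h->{wbuf};
    my $n = $len < $kernel_room ? $len : $kernel_room;
    substr($h->{wbuf}, 0, $n, '');
    $kernel_room -= $n;

    $h->{ww} = 1 if length $h->{wbuf};    # EAGAIN: poll for writability
    $h->{error} = 'ENOSPC' if length($h->{wbuf}) > $h->{wbuf_max};
    return;
}

# Stalled backend: nothing ever drains while the reader keeps pushing.
push_write_model(\%hdl, 'x' x 256) for 1 .. 100;
printf "wbuf=%d bytes, limit=%d, error=%s\n",
    length $hdl{wbuf}, $hdl{wbuf_max}, $hdl{error} // 'none';
```

The buffer ends up far above wbuf_max without any error, because every
push after the first EAGAIN returns before reaching the check.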
Add a shared apply_read_backpressure() helper that pauses reads on the
source handle and resumes them from an on_drain callback once the
destination's wbuf has drained; each caller decides when the wbuf is
too large. The helper reads the resume callback off the source handle
instead of taking it as an argument: a reader that passes itself in has
to reference its own variable, which makes it a self-referential
closure, an uncollectable cycle that pinned $reqstate on every
disconnect. This also lets response_stream() drop the manual
`$on_read = undef` it kept only to break that cycle. The helper is used
by response_stream() (replacing its inline block, no behaviour change),
websocket_proxy() (which had no backpressure before) and
handle_spice_proxy_request() (which had only the ineffective wbuf_max
and a TODO).
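The pause/resume cycle can be exercised without an event loop. A minimal
sketch, assuming a hypothetical MockHdl that models only the
on_read/on_drain accessors and buffers of AnyEvent::Handle (the real
accessors can also fire the callback immediately, which is omitted);
apply_read_backpressure() is taken from the patch, and the 8-byte
threshold is arbitrary.

```perl
use strict;
use warnings;

# Hypothetical stand-in for AnyEvent::Handle: callback slots and buffers.
package MockHdl;

sub new {
    my ($class, %args) = @_;
    return bless { wbuf => '', rbuf => '', %args }, $class;
}
sub on_read    { my ($self, $cb) = @_; $self->{on_read}  = $cb; return }
sub on_drain   { my ($self, $cb) = @_; $self->{on_drain} = $cb; return }
sub push_write { my ($self, $data) = @_; $self->{wbuf} .= $data; return }

package main;

# apply_read_backpressure() as in the patch (comments shortened).
sub apply_read_backpressure {
    my ($read_hdl, $write_hdl) = @_;

    my $on_read_cb = $read_hdl->{on_read};
    return if !$on_read_cb;    # already paused

    $read_hdl->on_read();      # pause: drop the reader
    my $prev_on_drain = $write_hdl->{on_drain};
    $write_hdl->on_drain(sub {
        my ($wrhdl) = @_;
        $wrhdl->on_drain($prev_on_drain);    # uninstall ourselves
        $read_hdl->on_read($on_read_cb);     # resume
        $prev_on_drain->($wrhdl) if $prev_on_drain;
    });
}

my $limit = 8;    # arbitrary demo threshold
my $src = MockHdl->new(rbuf => 'abcdefghijklmnop');
my $dst = MockHdl->new;

my $prev_calls = 0;
my $prev_cb = sub { $prev_calls++ };    # a pre-existing on_drain to chain
$dst->on_drain($prev_cb);

# The reader never names itself; the helper reads it back off $src.
$src->on_read(sub {
    my ($hdl) = @_;
    $dst->push_write(substr($hdl->{rbuf}, 0, 4, ''));
    apply_read_backpressure($hdl, $dst) if length($dst->{wbuf}) > $limit;
});
my $reader = $src->{on_read};

# Crude event loop: keep calling on_read while one is installed.
my $pump = sub {
    $src->{on_read}->($src) while $src->{on_read} && length($src->{rbuf});
};

$pump->();
my $paused = !defined $src->{on_read};    # paused with 4 bytes left

# The destination flushes its wbuf; AnyEvent would now call on_drain.
$dst->{wbuf} = '';
$dst->{on_drain}->($dst);
$pump->();    # the resumed reader forwards the remaining bytes
```

After the drain, the original reader and the pre-existing on_drain are
both back in place, and the chained handler ran exactly once.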
In on_eof, pass whatever the pause left in rbuf through the reader
before tearing down, instead of dropping it. This is best-effort:
client_do_disconnect() half-closes the socket without flushing the
peer's wbuf, so only bytes that push_write() already handed to the
kernel are sure to arrive. That is still better than always dropping
the tail; a hard guarantee would need block_disconnect on the client
handle and teardown via on_drain, which this patch leaves out.
handle_proxy_eof() drains and tears down in separate eval blocks: the
websocket reader dies on a malformed trailing frame, and a shared eval
would let that skip client_do_disconnect() and leak the connection. The
teardown half is the log-and-disconnect the on_error handlers all
duplicated, now shared as handle_proxy_error().
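A sketch of why the split matters, using hypothetical stand-ins:
$die_reader plays a reader dying on a malformed trailing frame and
$disconnect plays client_do_disconnect(). Only the eval structure
follows the patch.

```perl
use strict;
use warnings;

my $disconnects = 0;
my @logged;
my $die_reader = sub { die "malformed trailing frame\n" };
my $disconnect = sub { $disconnects++ };

# v3 shape: drain and teardown share one eval.
sub eof_shared_eval {
    my ($reader, $teardown) = @_;
    eval {
        $reader->();
        $teardown->();
    };
    push @logged, $@ if $@;
}

# v4 shape: each half gets its own eval.
sub eof_split_eval {
    my ($reader, $teardown) = @_;
    eval { $reader->() };
    push @logged, $@ if $@;
    eval { $teardown->() };
    push @logged, $@ if $@;
}

eof_shared_eval($die_reader, $disconnect);
my $after_shared = $disconnects;    # 0: the die skipped the teardown
eof_split_eval($die_reader, $disconnect);
my $after_split = $disconnects;     # 1: teardown ran despite the die
```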
Pausing reads also delays in-band control messages (WebSocket ping/pong,
SPICE keepalives) behind the queued data, but 640 KiB drains in a few
milliseconds even on first-write LVM-thin, well under any timeout. Only
a fully stalled backend stretches the pause long enough to matter, and
then a single connection times out instead of pveproxy being OOM-killed
and taking every session on the node down with it.
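The drain-time claim is simple arithmetic. A back-of-envelope sketch:
the throughputs are assumptions, not measurements, and 64 * 10 * 1024
is the SPICE threshold from the patch (the WebSocket one is
$max_payload_size * 5).

```perl
use strict;
use warnings;

my $wbuf_limit = 64 * 10 * 1024;    # SPICE threshold: 655360 bytes
my $timeout_s  = 5;                 # timeout set on the proxy handles

# Milliseconds to flush $bytes at an assumed rate in MiB/s.
sub drain_ms {
    my ($bytes, $mib_per_s) = @_;
    return $bytes * 1000 / ($mib_per_s * 1024 * 1024);
}

for my $rate (50, 100, 1000) {
    printf "%5d MiB/s: %.2f ms (timeout %d s)\n",
        $rate, drain_ms($wbuf_limit, $rate), $timeout_s;
}
```

Even at an assumed 50 MiB/s the backlog clears in 12.5 ms, far below
the 5-second timeout on the proxy handles.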
Also drop a stale TODO in response_stream() about using wbuf_max as the
threshold: per the above, it and the backpressure threshold serve
different purposes and stay separate.
Signed-off-by: Kefu Chai <k.chai@proxmox.com>
---
src/PVE/APIServer/AnyEvent.pm | 197 +++++++++++++++++++++++-----------
1 file changed, 135 insertions(+), 62 deletions(-)
diff --git a/src/PVE/APIServer/AnyEvent.pm b/src/PVE/APIServer/AnyEvent.pm
index 915d678..93f2c23 100644
--- a/src/PVE/APIServer/AnyEvent.pm
+++ b/src/PVE/APIServer/AnyEvent.pm
@@ -206,6 +206,58 @@ sub finish_response {
}
}
+# Pause reads on $read_hdl until $write_hdl's wbuf drains, then resume by
+# re-registering the on_read that was active at pause time. It is read off
+# the handle, not passed in, so a reader need not reference its own closure
+# (a self-referential closure is an uncollectable cycle that pins $reqstate).
+# Caller decides the threshold and must liveness-check in its on_read. No-op
+# if reads are already paused. Chains any existing on_drain on $write_hdl.
+sub apply_read_backpressure {
+ my ($read_hdl, $write_hdl) = @_;
+
+ my $on_read_cb = $read_hdl->{on_read};
+ return if !$on_read_cb;
+
+ $read_hdl->on_read();
+ my $prev_on_drain = $write_hdl->{on_drain};
+ $write_hdl->on_drain(sub {
+ my ($wrhdl) = @_;
+ # uninstall ourselves (restoring any prior handler) before resuming.
+ $wrhdl->on_drain($prev_on_drain);
+ $read_hdl->on_read($on_read_cb);
+ $prev_on_drain->($wrhdl) if $prev_on_drain;
+ });
+}
+
+# shared on_error body: log the aborted request and disconnect.
+sub handle_proxy_error {
+ my ($self, $reqstate, $message) = @_;
+ eval {
+ $self->log_aborted_request($reqstate, $message);
+ $self->client_do_disconnect($reqstate);
+ };
+ if (my $err = $@) { syslog('err', $err); }
+}
+
+# shared on_eof body: drain bytes the backpressure pause left in rbuf
+# through $reader, then disconnect. The loop stops on no progress, so it is
+# safe with any reader (peer gone, empty rbuf, or an unparsable partial
+# frame all leave rbuf unchanged).
+sub handle_proxy_eof {
+ my ($self, $reqstate, $hdl, $reader, $peer_hdl) = @_;
+ # drain and teardown in separate evals: a reader can die (e.g. on a
+ # malformed trailing frame), which must not skip the disconnect.
+ eval {
+ while ($peer_hdl && length($hdl->{rbuf})) {
+ my $before = length($hdl->{rbuf});
+ $reader->($hdl, 1);
+ last if length($hdl->{rbuf}) == $before;
+ }
+ };
+ if (my $err = $@) { syslog('err', $err); }
+ $self->handle_proxy_error($reqstate);
+}
+
sub response_stream {
my ($self, $reqstate, $stream_fh) = @_;
@@ -214,16 +266,13 @@ sub response_stream {
my $buf_size = 4 * 1024 * 1024;
- my $on_read;
- $on_read = sub {
+ my $on_read = sub {
my ($hdl) = @_;
my $reqhdl = $reqstate->{hdl};
return if !$reqhdl;
my $wbuf_len = length($reqhdl->{wbuf});
my $rbuf_len = length($hdl->{rbuf});
- # TODO: Take into account $reqhdl->{wbuf_max} ? Right now
- # that's unbounded, so just assume $buf_size
my $to_read = $buf_size - $wbuf_len;
$to_read = $rbuf_len if $rbuf_len < $to_read;
if ($to_read > 0) {
@@ -241,20 +290,9 @@ sub response_stream {
# apply backpressure so we don't accept any more data into buffer if the client isn't
# downloading fast enough. Note: read_size can double upon read, and we also need to account
- # for one more read after# start_read, so multiply by 4
+ # for one more read after start_read, so multiply by 4
if ($rbuf_len + $hdl->{read_size} * 4 > $buf_size) {
- # stop reading until write buffer is empty
- $hdl->on_read();
- my $prev_on_drain = $reqhdl->{on_drain};
- $reqhdl->on_drain(sub {
- my ($wrhdl) = @_;
- # on_drain called because write buffer is empty, continue reading
- $hdl->on_read($on_read);
- if ($prev_on_drain) {
- $wrhdl->on_drain($prev_on_drain);
- $prev_on_drain->($wrhdl);
- }
- });
+ apply_read_backpressure($hdl, $reqhdl);
}
};
@@ -276,16 +314,10 @@ sub response_stream {
}
};
if (my $err = $@) { syslog('err', "$err"); }
- $on_read = undef;
},
on_error => sub {
my ($hdl, $fatal, $message) = @_;
- eval {
- $self->log_aborted_request($reqstate, $message);
- $self->client_do_disconnect($reqstate);
- };
- if (my $err = $@) { syslog('err', "$err"); }
- $on_read = undef;
+ $self->handle_proxy_error($reqstate, $message);
},
);
}
@@ -581,44 +613,49 @@ sub websocket_proxy {
$self->dprint("CONNECTed to '$remhost:$remport'");
+ my $wbuf_limit = $max_payload_size * 5;
+
+ # forward-declared for the on_eof closure below.
+ my $proxyhdlreader;
+
$reqstate->{proxyhdl} = AnyEvent::Handle->new(
fh => $fh,
rbuf_max => $max_payload_size,
- wbuf_max => $max_payload_size * 5,
timeout => 5,
+ # drain frames buffered during a backpressure pause before
+ # tearing down, else the tail of a bulk transfer is dropped.
on_eof => sub {
my ($hdl) = @_;
- eval {
- $self->log_aborted_request($reqstate);
- $self->client_do_disconnect($reqstate);
- };
- if (my $err = $@) { syslog('err', $err); }
+ $self->handle_proxy_eof(
+ $reqstate, $hdl, $proxyhdlreader, $reqstate->{hdl},
+ );
},
on_error => sub {
my ($hdl, $fatal, $message) = @_;
- eval {
- $self->log_aborted_request($reqstate, $message);
- $self->client_do_disconnect($reqstate);
- };
- if (my $err = $@) { syslog('err', "$err"); }
+ $self->handle_proxy_error($reqstate, $message);
},
);
- my $proxyhdlreader = sub {
- my ($hdl) = @_;
+ $proxyhdlreader = sub {
+ my ($hdl, $no_backpressure) = @_;
+
+ my $clienthdl = $reqstate->{hdl};
+ return if !$clienthdl;
my $len = length($hdl->{rbuf});
my $data =
substr($hdl->{rbuf}, 0, $len > $max_payload_size ? $max_payload_size : $len,
'');
- my $string = $encode->(\$data);
+ $clienthdl->push_write($encode->(\$data));
- $reqstate->{hdl}->push_write($string) if $reqstate->{hdl};
+ if (!$no_backpressure && length($clienthdl->{wbuf}) > $wbuf_limit) {
+ apply_read_backpressure($hdl, $clienthdl);
+ }
};
my $hdlreader = sub {
- my ($hdl) = @_;
+ my ($hdl, $no_backpressure) = @_;
while (my $len = length($hdl->{rbuf})) {
return if $len < 2;
@@ -672,7 +709,17 @@ sub websocket_proxy {
}
if ($opcode == 1 || $opcode == 2) {
- $reqstate->{proxyhdl}->push_write($payload) if $reqstate->{proxyhdl};
+ my $proxyhdl = $reqstate->{proxyhdl};
+ if ($proxyhdl) {
+ $proxyhdl->push_write($payload);
+ if (
+ !$no_backpressure
+ && length($proxyhdl->{wbuf}) > $wbuf_limit
+ ) {
+ apply_read_backpressure($hdl, $proxyhdl);
+ return;
+ }
+ }
} elsif ($opcode == 8) {
my $statuscode = unpack("n", $payload);
$self->dprint("websocket received close. status code: '$statuscode'");
@@ -700,7 +747,14 @@ sub websocket_proxy {
$reqstate->{proxyhdl}->on_read($proxyhdlreader);
$reqstate->{hdl}->on_read($hdlreader);
- # todo: use stop_read/start_read if write buffer grows to much
+ # override the accept-time on_eof so a pause's buffered
+ # frames are drained through $hdlreader before teardown.
+ $reqstate->{hdl}->on_eof(sub {
+ my ($hdl) = @_;
+ $self->handle_proxy_eof(
+ $reqstate, $hdl, $hdlreader, $reqstate->{proxyhdl},
+ );
+ });
# FIXME: remove protocol in PVE/PMG 8.x
#
@@ -1098,7 +1152,8 @@ sub handle_spice_proxy_request {
}
$reqstate->{hdl}->timeout(0);
- $reqstate->{hdl}->wbuf_max(64 * 10 * 1024);
+
+ my $wbuf_limit = 64 * 10 * 1024;
my $remhost = $remip ? $remip : "localhost";
my $remport = $remip ? 3128 : $spiceport;
@@ -1108,47 +1163,58 @@ sub handle_spice_proxy_request {
or die "connect to '$remhost:$remport' failed: $!";
$self->dprint("CONNECTed to '$remhost:$remport'");
+
+ # forward-declared for the on_eof closure below.
+ my $proxyhdlreader;
+
$reqstate->{proxyhdl} = AnyEvent::Handle->new(
fh => $fh,
rbuf_max => 64 * 1024,
- wbuf_max => 64 * 10 * 1024,
timeout => 5,
+ # drain bytes buffered during a backpressure pause before tearing
+ # down, else bulk data (e.g. USB passthrough) is dropped.
on_eof => sub {
my ($hdl) = @_;
- eval {
- $self->log_aborted_request($reqstate);
- $self->client_do_disconnect($reqstate);
- };
- if (my $err = $@) { syslog('err', $err); }
+ $self->handle_proxy_eof(
+ $reqstate, $hdl, $proxyhdlreader, $reqstate->{hdl},
+ );
},
on_error => sub {
my ($hdl, $fatal, $message) = @_;
- eval {
- $self->log_aborted_request($reqstate, $message);
- $self->client_do_disconnect($reqstate);
- };
- if (my $err = $@) { syslog('err', "$err"); }
+ $self->handle_proxy_error($reqstate, $message);
},
);
- my $proxyhdlreader = sub {
- my ($hdl) = @_;
+ $proxyhdlreader = sub {
+ my ($hdl, $no_backpressure) = @_;
+
+ my $clienthdl = $reqstate->{hdl};
+ return if !$clienthdl;
my $len = length($hdl->{rbuf});
my $data = substr($hdl->{rbuf}, 0, $len, '');
- #print "READ1 $len\n";
- $reqstate->{hdl}->push_write($data) if $reqstate->{hdl};
+ $clienthdl->push_write($data);
+
+ if (!$no_backpressure && length($clienthdl->{wbuf}) > $wbuf_limit) {
+ apply_read_backpressure($hdl, $clienthdl);
+ }
};
my $hdlreader = sub {
- my ($hdl) = @_;
+ my ($hdl, $no_backpressure) = @_;
+
+ my $proxyhdl = $reqstate->{proxyhdl};
+ return if !$proxyhdl;
my $len = length($hdl->{rbuf});
my $data = substr($hdl->{rbuf}, 0, $len, '');
- #print "READ0 $len\n";
- $reqstate->{proxyhdl}->push_write($data) if $reqstate->{proxyhdl};
+ $proxyhdl->push_write($data);
+
+ if (!$no_backpressure && length($proxyhdl->{wbuf}) > $wbuf_limit) {
+ apply_read_backpressure($hdl, $proxyhdl);
+ }
};
my $proto = $reqstate->{proto} ? $reqstate->{proto}->{str} : 'HTTP/1.0';
@@ -1158,7 +1224,14 @@ sub handle_spice_proxy_request {
$reqstate->{proxyhdl}->on_read($proxyhdlreader);
$reqstate->{hdl}->on_read($hdlreader);
- # todo: use stop_read/start_read if write buffer grows to much
+ # override the accept-time on_eof so a pause's buffered
+ # bytes are drained through $hdlreader before teardown.
+ $reqstate->{hdl}->on_eof(sub {
+ my ($hdl) = @_;
+ $self->handle_proxy_eof(
+ $reqstate, $hdl, $hdlreader, $reqstate->{proxyhdl},
+ );
+ });
# a response must be followed by an empty line
my $res = "$proto 200 OK\015\012\015\012";
--
2.47.3