From: Kefu Chai <k.chai@proxmox.com>
To: pve-devel@lists.proxmox.com
Subject: [PATCH v4 http-server 1/1] fix #7483: apiserver: add backpressure to proxy handlers
Date: Wed, 17 Jun 2026 20:29:05 +0800
Message-ID: <20260617122905.3822836-2-k.chai@proxmox.com>
In-Reply-To: <20260617122905.3822836-1-k.chai@proxmox.com>

When pveproxy forwards a WebSocket or SPICE connection, data can arrive
faster than the backend takes it. With no flow control, pveproxy buffers
the excess and can get OOM-killed. PDM cross-cluster migration to
LVM-thin triggers this easily: zeroing newly-allocated extents on first
write makes the receiving side fall behind the transfer rate. Any remote
migration, the VM/node consoles on the same proxy path, and SPICE
sessions carrying bulk data (USB passthrough) can hit it too.

The backend handle's wbuf_max doesn't help: AnyEvent::Handle only checks
it in the `if (!$self->{_ww})` guard in _drain_wbuf, so once the first
EAGAIN installs a write watcher, later push_write calls skip the check
and wbuf grows without bound. And even if it did fire, it would raise
ENOSPC and drop the connection rather than slow the reader.
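
The wbuf_max behaviour above can be reproduced with a small pure-Perl
model. This is a simplified sketch, not AnyEvent::Handle's actual code:
model_push_write() and its $accept parameter (bytes the kernel takes on
this attempt) are hypothetical, and only the `if (!$self->{_ww})` guard
described above is modelled.

```perl
use strict;
use warnings;

# Simplified model (NOT AnyEvent::Handle's real code) of push_write() +
# _drain_wbuf(): the wbuf_max check only runs when no write watcher exists.
sub model_push_write {
    my ($h, $data, $accept) = @_;    # $accept: bytes the kernel takes now
    $h->{wbuf} .= $data;

    # once a watcher is installed, the immediate write and the check are skipped
    return if $h->{_ww};

    my $n = length $h->{wbuf};
    $n = $accept if $accept < $n;
    substr($h->{wbuf}, 0, $n, '');      # pretend syswrite took $n bytes

    $h->{_ww} = 1 if length $h->{wbuf}; # EAGAIN: poll for writability
    $h->{error} = 'ENOSPC' if length($h->{wbuf}) > $h->{wbuf_max};
    return;
}

my $h = { wbuf => '', wbuf_max => 64 * 1024 };

# first push: the kernel takes half, the rest stays queued and a watcher is set
model_push_write($h, 'x' x 16384, 8192);

# backend stalled: the watcher never fires, so every later push just appends
model_push_write($h, 'x' x 16384, 0) for 1 .. 100;

printf "wbuf=%d bytes (wbuf_max=%d), error=%s\n",
    length $h->{wbuf}, $h->{wbuf_max}, $h->{error} // 'none';
```

With the backend stalled after the first EAGAIN, wbuf ends up far past
wbuf_max and no error is ever raised.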

Add a shared apply_read_backpressure() helper that pauses reads on the
source when the destination wbuf grows too large, and resumes them from
an on_drain callback. It reads the resume callback off the source handle
instead of taking it as an argument: a reader that has to pass itself to
the helper must close over its own variable, and that self-referential
closure is an uncollectable cycle that pinned $reqstate on every
disconnect. This also lets response_stream() drop the manual
`$on_read = undef` it kept only to break that cycle. The helper is used
from response_stream() (replacing the inline block, no behaviour change),
websocket_proxy() (which had no backpressure before), and
handle_spice_proxy_request() (which had only the ineffective wbuf_max
and a TODO).
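
Why reading the callback off the handle avoids the leak can be shown in
plain Perl. In this toy sketch the Tracker class stands in for $reqstate
and a plain hash stands in for the AnyEvent::Handle; both are
hypothetical. Pattern A is a reader that refers to its own variable,
pattern B is a reader that is only reachable through the handle.

```perl
use strict;
use warnings;

# Toy object that records when it is freed; stands in for $reqstate.
package Tracker {
    sub new {
        my ($class, $flag) = @_;
        return bless { flag => $flag }, $class;
    }
    sub DESTROY { my ($self) = @_; ${ $self->{flag} } = 1; }
}

my $freed_cyclic  = 0;
my $freed_acyclic = 0;

# Pattern A: the reader names its own variable so it can hand itself to a
# pause helper later. The closure captures $reader and $reader holds the
# closure: a reference cycle that Perl's refcounting never frees.
{
    my $state = Tracker->new(\$freed_cyclic);
    my $reader;
    $reader = sub { my $s = $state; return $reader; };
}

# Pattern B: the reader never names itself; a pause helper reads it back
# off the handle (a plain hash here), as apply_read_backpressure() does.
{
    my $state = Tracker->new(\$freed_acyclic);
    my %hdl;
    $hdl{on_read} = sub { my $s = $state; return; };
    my $resume_cb = $hdl{on_read};    # what the helper stashes while paused
}

print "pattern A freed: $freed_cyclic, pattern B freed: $freed_acyclic\n";
```

Pattern A's Tracker survives until global destruction, the same way the
self-referential reader pinned $reqstate; pattern B's is freed as soon as
its scope ends.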

In on_eof, pass whatever the pause left in rbuf through the reader
before tearing down, instead of dropping it. This is best-effort:
client_do_disconnect() half-closes the socket without flushing the
peer's wbuf, so only bytes that push_write() has already handed to the
kernel are sure to land. It still beats always dropping the tail; a hard
guarantee would need block_disconnect on the client handle and teardown
via on_drain, which this patch leaves out.
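
The no-progress rule of the on_eof drain loop can be sketched with a toy
length-prefixed framing (one length byte, then the payload).
frame_reader() and drain_rbuf() are hypothetical stand-ins for a proxy
reader and for the loop in handle_proxy_eof(): complete frames are
delivered, and the loop stops once only a partial frame remains instead
of spinning on it.

```perl
use strict;
use warnings;

# Hypothetical framing: one length byte, then that many payload bytes.
# Like the proxy readers, it consumes at most one frame per call and leaves
# rbuf untouched when only a partial frame is buffered.
sub frame_reader {
    my ($hdl, $no_backpressure) = @_;   # flag unused in this toy
    my $len = length $hdl->{rbuf};
    return if $len < 1;
    my $plen = ord substr($hdl->{rbuf}, 0, 1);
    return if $len < 1 + $plen;         # partial frame: no progress
    my $frame = substr($hdl->{rbuf}, 0, 1 + $plen, '');
    push @{ $hdl->{delivered} }, substr($frame, 1);
    return;
}

# Same loop shape as handle_proxy_eof(): keep calling the reader while the
# peer exists and rbuf is non-empty, stop as soon as a call makes no progress.
sub drain_rbuf {
    my ($hdl, $reader, $peer) = @_;
    while ($peer && length $hdl->{rbuf}) {
        my $before = length $hdl->{rbuf};
        $reader->($hdl, 1);
        last if length($hdl->{rbuf}) == $before;
    }
    return;
}

# two complete frames followed by a truncated one (claims 5 bytes, has 2)
my $hdl = { rbuf => "\x03abc\x02de\x05xy", delivered => [] };
drain_rbuf($hdl, \&frame_reader, 1);

print "delivered: @{ $hdl->{delivered} }; left in rbuf: ",
    length $hdl->{rbuf}, " bytes\n";
```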

handle_proxy_eof() drains and tears down in separate eval blocks: the
websocket reader can die on a malformed trailing frame, and a shared
eval would let that skip client_do_disconnect() and leak the connection.
The teardown half is the log-and-disconnect sequence that every on_error
handler duplicated; it is now shared as handle_proxy_error().
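
The reason for the two eval blocks can be sketched in a few lines. The
subs below are hypothetical stand-ins: teardown() plays the role of
handle_proxy_error(), and the reader dies the way the websocket reader
does on a malformed trailing frame.

```perl
use strict;
use warnings;

my @log;

# Hypothetical stand-ins: teardown() plays handle_proxy_error(), the reader
# dies like the websocket reader on a malformed trailing frame.
sub teardown { push @log, 'disconnect'; return; }
my $bad_reader = sub { die "malformed trailing frame\n" };

# one shared eval: the reader's die jumps past teardown()
sub eof_shared_eval {
    my ($reader) = @_;
    eval { $reader->(); teardown(); };
    push @log, 'error' if $@;
    return;
}

# drain and teardown in separate evals, as handle_proxy_eof() does
sub eof_split_eval {
    my ($reader) = @_;
    eval { $reader->(); };
    push @log, 'error' if $@;
    eval { teardown(); };
    push @log, 'error' if $@;
    return;
}

@log = ();
eof_shared_eval($bad_reader);
my @shared = @log;

@log = ();
eof_split_eval($bad_reader);
my @split_log = @log;

print "shared eval: @shared\nsplit evals: @split_log\n";
```

With a shared eval the log only records the error and the disconnect
never happens; with split evals the error is logged and the connection
is still torn down.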

Pausing reads also delays in-band control messages (WebSocket ping/pong,
SPICE keepalives) behind the queued data, but 640 KB drains in a few
milliseconds even on first-write LVM-thin, well under any timeout. Only
a fully stalled backend stretches the pause long enough to matter, and
then a single connection times out instead of pveproxy being OOM-killed
and taking every session on the node down with it.
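
As a rough check of the drain-time figure, the snippet below divides the
SPICE threshold from this patch (64 * 10 * 1024 bytes) by two backend
write rates. The rates are assumptions for illustration, not
measurements of LVM-thin.

```perl
use strict;
use warnings;

my $wbuf_limit = 64 * 10 * 1024;    # SPICE threshold in this patch: 655360 bytes

# Assumed sustained rates at which the backend accepts data, in bytes/s.
# Illustrative values only, not measurements.
my %rate = ('1 Gbit/s' => 1e9 / 8, '100 MB/s' => 100e6);

my %drain_ms;
for my $label (sort keys %rate) {
    $drain_ms{$label} = 1000 * $wbuf_limit / $rate{$label};
    printf "%-8s -> %.1f ms to drain %d bytes\n",
        $label, $drain_ms{$label}, $wbuf_limit;
}
```

Under these assumed rates this prints about 5.2 ms at 1 Gbit/s and 6.6 ms
at 100 MB/s.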

Also drop a stale TODO in response_stream() about using wbuf_max as the
threshold: per the above, it and the backpressure threshold serve
different purposes and stay separate.

Signed-off-by: Kefu Chai <k.chai@proxmox.com>
---
src/PVE/APIServer/AnyEvent.pm | 197 +++++++++++++++++++++++-----------
1 file changed, 135 insertions(+), 62 deletions(-)
diff --git a/src/PVE/APIServer/AnyEvent.pm b/src/PVE/APIServer/AnyEvent.pm
index 915d678..93f2c23 100644
--- a/src/PVE/APIServer/AnyEvent.pm
+++ b/src/PVE/APIServer/AnyEvent.pm
@@ -206,6 +206,58 @@ sub finish_response {
}
}
+# Pause reads on $read_hdl until $write_hdl's wbuf drains, then resume by
+# re-registering the on_read that was active at pause time. It is read off
+# the handle, not passed in, so a reader need not reference its own closure
+# (a self-referential closure is an uncollectable cycle that pins $reqstate).
+# Caller decides the threshold and must liveness-check in its on_read. No-op
+# if reads are already paused. Chains any existing on_drain on $write_hdl.
+sub apply_read_backpressure {
+ my ($read_hdl, $write_hdl) = @_;
+
+ my $on_read_cb = $read_hdl->{on_read};
+ return if !$on_read_cb;
+
+ $read_hdl->on_read();
+ my $prev_on_drain = $write_hdl->{on_drain};
+ $write_hdl->on_drain(sub {
+ my ($wrhdl) = @_;
+ # uninstall ourselves (restoring any prior handler) before resuming.
+ $wrhdl->on_drain($prev_on_drain);
+ $read_hdl->on_read($on_read_cb);
+ $prev_on_drain->($wrhdl) if $prev_on_drain;
+ });
+}
+
+# shared on_error body: log the aborted request and disconnect.
+sub handle_proxy_error {
+ my ($self, $reqstate, $message) = @_;
+ eval {
+ $self->log_aborted_request($reqstate, $message);
+ $self->client_do_disconnect($reqstate);
+ };
+ if (my $err = $@) { syslog('err', $err); }
+}
+
+# shared on_eof body: drain bytes the backpressure pause left in rbuf
+# through $reader, then disconnect. The loop stops on no progress, so it is
+# safe with any reader (peer gone, empty rbuf, or an unparsable partial
+# frame all leave rbuf unchanged).
+sub handle_proxy_eof {
+ my ($self, $reqstate, $hdl, $reader, $peer_hdl) = @_;
+ # drain and teardown in separate evals: a reader can die (e.g. on a
+ # malformed trailing frame), which must not skip the disconnect.
+ eval {
+ while ($peer_hdl && length($hdl->{rbuf})) {
+ my $before = length($hdl->{rbuf});
+ $reader->($hdl, 1);
+ last if length($hdl->{rbuf}) == $before;
+ }
+ };
+ if (my $err = $@) { syslog('err', $err); }
+ $self->handle_proxy_error($reqstate);
+}
+
sub response_stream {
my ($self, $reqstate, $stream_fh) = @_;
@@ -214,16 +266,13 @@ sub response_stream {
my $buf_size = 4 * 1024 * 1024;
- my $on_read;
- $on_read = sub {
+ my $on_read = sub {
my ($hdl) = @_;
my $reqhdl = $reqstate->{hdl};
return if !$reqhdl;
my $wbuf_len = length($reqhdl->{wbuf});
my $rbuf_len = length($hdl->{rbuf});
- # TODO: Take into account $reqhdl->{wbuf_max} ? Right now
- # that's unbounded, so just assume $buf_size
my $to_read = $buf_size - $wbuf_len;
$to_read = $rbuf_len if $rbuf_len < $to_read;
if ($to_read > 0) {
@@ -241,20 +290,9 @@ sub response_stream {
# apply backpressure so we don't accept any more data into buffer if the client isn't
# downloading fast enough. Note: read_size can double upon read, and we also need to account
- # for one more read after# start_read, so multiply by 4
+ # for one more read after start_read, so multiply by 4
if ($rbuf_len + $hdl->{read_size} * 4 > $buf_size) {
- # stop reading until write buffer is empty
- $hdl->on_read();
- my $prev_on_drain = $reqhdl->{on_drain};
- $reqhdl->on_drain(sub {
- my ($wrhdl) = @_;
- # on_drain called because write buffer is empty, continue reading
- $hdl->on_read($on_read);
- if ($prev_on_drain) {
- $wrhdl->on_drain($prev_on_drain);
- $prev_on_drain->($wrhdl);
- }
- });
+ apply_read_backpressure($hdl, $reqhdl);
}
};
@@ -276,16 +314,10 @@ sub response_stream {
}
};
if (my $err = $@) { syslog('err', "$err"); }
- $on_read = undef;
},
on_error => sub {
my ($hdl, $fatal, $message) = @_;
- eval {
- $self->log_aborted_request($reqstate, $message);
- $self->client_do_disconnect($reqstate);
- };
- if (my $err = $@) { syslog('err', "$err"); }
- $on_read = undef;
+ $self->handle_proxy_error($reqstate, $message);
},
);
}
@@ -581,44 +613,49 @@ sub websocket_proxy {
$self->dprint("CONNECTed to '$remhost:$remport'");
+ my $wbuf_limit = $max_payload_size * 5;
+
+ # forward-declared for the on_eof closure below.
+ my $proxyhdlreader;
+
$reqstate->{proxyhdl} = AnyEvent::Handle->new(
fh => $fh,
rbuf_max => $max_payload_size,
- wbuf_max => $max_payload_size * 5,
timeout => 5,
+ # drain frames buffered during a backpressure pause before
+ # tearing down, else the tail of a bulk transfer is dropped.
on_eof => sub {
my ($hdl) = @_;
- eval {
- $self->log_aborted_request($reqstate);
- $self->client_do_disconnect($reqstate);
- };
- if (my $err = $@) { syslog('err', $err); }
+ $self->handle_proxy_eof(
+ $reqstate, $hdl, $proxyhdlreader, $reqstate->{hdl},
+ );
},
on_error => sub {
my ($hdl, $fatal, $message) = @_;
- eval {
- $self->log_aborted_request($reqstate, $message);
- $self->client_do_disconnect($reqstate);
- };
- if (my $err = $@) { syslog('err', "$err"); }
+ $self->handle_proxy_error($reqstate, $message);
},
);
- my $proxyhdlreader = sub {
- my ($hdl) = @_;
+ $proxyhdlreader = sub {
+ my ($hdl, $no_backpressure) = @_;
+
+ my $clienthdl = $reqstate->{hdl};
+ return if !$clienthdl;
my $len = length($hdl->{rbuf});
my $data =
substr($hdl->{rbuf}, 0, $len > $max_payload_size ? $max_payload_size : $len,
'');
- my $string = $encode->(\$data);
+ $clienthdl->push_write($encode->(\$data));
- $reqstate->{hdl}->push_write($string) if $reqstate->{hdl};
+ if (!$no_backpressure && length($clienthdl->{wbuf}) > $wbuf_limit) {
+ apply_read_backpressure($hdl, $clienthdl);
+ }
};
my $hdlreader = sub {
- my ($hdl) = @_;
+ my ($hdl, $no_backpressure) = @_;
while (my $len = length($hdl->{rbuf})) {
return if $len < 2;
@@ -672,7 +709,17 @@ sub websocket_proxy {
}
if ($opcode == 1 || $opcode == 2) {
- $reqstate->{proxyhdl}->push_write($payload) if $reqstate->{proxyhdl};
+ my $proxyhdl = $reqstate->{proxyhdl};
+ if ($proxyhdl) {
+ $proxyhdl->push_write($payload);
+ if (
+ !$no_backpressure
+ && length($proxyhdl->{wbuf}) > $wbuf_limit
+ ) {
+ apply_read_backpressure($hdl, $proxyhdl);
+ return;
+ }
+ }
} elsif ($opcode == 8) {
my $statuscode = unpack("n", $payload);
$self->dprint("websocket received close. status code: '$statuscode'");
@@ -700,7 +747,14 @@ sub websocket_proxy {
$reqstate->{proxyhdl}->on_read($proxyhdlreader);
$reqstate->{hdl}->on_read($hdlreader);
- # todo: use stop_read/start_read if write buffer grows to much
+ # override the accept-time on_eof so a pause's buffered
+ # frames are drained through $hdlreader before teardown.
+ $reqstate->{hdl}->on_eof(sub {
+ my ($hdl) = @_;
+ $self->handle_proxy_eof(
+ $reqstate, $hdl, $hdlreader, $reqstate->{proxyhdl},
+ );
+ });
# FIXME: remove protocol in PVE/PMG 8.x
#
@@ -1098,7 +1152,8 @@ sub handle_spice_proxy_request {
}
$reqstate->{hdl}->timeout(0);
- $reqstate->{hdl}->wbuf_max(64 * 10 * 1024);
+
+ my $wbuf_limit = 64 * 10 * 1024;
my $remhost = $remip ? $remip : "localhost";
my $remport = $remip ? 3128 : $spiceport;
@@ -1108,47 +1163,58 @@ sub handle_spice_proxy_request {
or die "connect to '$remhost:$remport' failed: $!";
$self->dprint("CONNECTed to '$remhost:$remport'");
+
+ # forward-declared for the on_eof closure below.
+ my $proxyhdlreader;
+
$reqstate->{proxyhdl} = AnyEvent::Handle->new(
fh => $fh,
rbuf_max => 64 * 1024,
- wbuf_max => 64 * 10 * 1024,
timeout => 5,
+ # drain bytes buffered during a backpressure pause before tearing
+ # down, else bulk data (e.g. USB passthrough) is dropped.
on_eof => sub {
my ($hdl) = @_;
- eval {
- $self->log_aborted_request($reqstate);
- $self->client_do_disconnect($reqstate);
- };
- if (my $err = $@) { syslog('err', $err); }
+ $self->handle_proxy_eof(
+ $reqstate, $hdl, $proxyhdlreader, $reqstate->{hdl},
+ );
},
on_error => sub {
my ($hdl, $fatal, $message) = @_;
- eval {
- $self->log_aborted_request($reqstate, $message);
- $self->client_do_disconnect($reqstate);
- };
- if (my $err = $@) { syslog('err', "$err"); }
+ $self->handle_proxy_error($reqstate, $message);
},
);
- my $proxyhdlreader = sub {
- my ($hdl) = @_;
+ $proxyhdlreader = sub {
+ my ($hdl, $no_backpressure) = @_;
+
+ my $clienthdl = $reqstate->{hdl};
+ return if !$clienthdl;
my $len = length($hdl->{rbuf});
my $data = substr($hdl->{rbuf}, 0, $len, '');
- #print "READ1 $len\n";
- $reqstate->{hdl}->push_write($data) if $reqstate->{hdl};
+ $clienthdl->push_write($data);
+
+ if (!$no_backpressure && length($clienthdl->{wbuf}) > $wbuf_limit) {
+ apply_read_backpressure($hdl, $clienthdl);
+ }
};
my $hdlreader = sub {
- my ($hdl) = @_;
+ my ($hdl, $no_backpressure) = @_;
+
+ my $proxyhdl = $reqstate->{proxyhdl};
+ return if !$proxyhdl;
my $len = length($hdl->{rbuf});
my $data = substr($hdl->{rbuf}, 0, $len, '');
- #print "READ0 $len\n";
- $reqstate->{proxyhdl}->push_write($data) if $reqstate->{proxyhdl};
+ $proxyhdl->push_write($data);
+
+ if (!$no_backpressure && length($proxyhdl->{wbuf}) > $wbuf_limit) {
+ apply_read_backpressure($hdl, $proxyhdl);
+ }
};
my $proto = $reqstate->{proto} ? $reqstate->{proto}->{str} : 'HTTP/1.0';
@@ -1158,7 +1224,14 @@ sub handle_spice_proxy_request {
$reqstate->{proxyhdl}->on_read($proxyhdlreader);
$reqstate->{hdl}->on_read($hdlreader);
- # todo: use stop_read/start_read if write buffer grows to much
+ # override the accept-time on_eof so a pause's buffered
+ # bytes are drained through $hdlreader before teardown.
+ $reqstate->{hdl}->on_eof(sub {
+ my ($hdl) = @_;
+ $self->handle_proxy_eof(
+ $reqstate, $hdl, $hdlreader, $reqstate->{proxyhdl},
+ );
+ });
# a response must be followed by an empty line
my $res = "$proto 200 OK\015\012\015\012";
--
2.47.3