From: Dietmar Maurer <dietmar@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox 2/7] RateLimitedStream: implement poll_write_vectored
Date: Tue, 9 Nov 2021 07:52:40 +0100 [thread overview]
Message-ID: <20211109065253.980304-4-dietmar@proxmox.com> (raw)
In-Reply-To: <20211109065253.980304-1-dietmar@proxmox.com>
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
.../src/client/rate_limited_stream.rs | 92 +++++++++++++------
1 file changed, 62 insertions(+), 30 deletions(-)
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
index a11b59e..0cc0ebb 100644
--- a/proxmox-http/src/client/rate_limited_stream.rs
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -2,6 +2,7 @@ use std::pin::Pin;
use std::marker::Unpin;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
+use std::io::IoSlice;
use futures::Future;
use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
@@ -22,8 +23,6 @@ pub struct RateLimitedStream<S> {
impl <S> RateLimitedStream<S> {
- const MIN_DELAY: Duration = Duration::from_millis(20);
-
/// Creates a new instance with reads and writes limited to the same `rate`.
pub fn new(stream: S, rate: u64, bucket_size: u64) -> Self {
let now = Instant::now();
@@ -48,6 +47,33 @@ impl <S> RateLimitedStream<S> {
}
}
+fn register_traffic(
+ limiter: &Mutex<RateLimiter>,
+ count: usize,
+) -> Option<Pin<Box<Sleep>>>{
+
+ const MIN_DELAY: Duration = Duration::from_millis(10);
+
+ let now = Instant::now();
+ let delay = limiter.lock().unwrap()
+ .register_traffic(now, count as u64);
+ if delay >= MIN_DELAY {
+ let sleep = tokio::time::sleep(delay);
+ Some(Box::pin(sleep))
+ } else {
+ None
+ }
+}
+
+fn delay_is_ready(delay: &mut Option<Pin<Box<Sleep>>>, ctx: &mut Context<'_>) -> bool {
+ match delay {
+ Some(ref mut future) => {
+ future.as_mut().poll(ctx).is_ready()
+ }
+ None => true,
+ }
+}
+
impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
fn poll_write(
@@ -57,12 +83,7 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
) -> Poll<Result<usize, std::io::Error>> {
let this = self.get_mut();
- let is_ready = match this.write_delay {
- Some(ref mut future) => {
- future.as_mut().poll(ctx).is_ready()
- }
- None => true,
- };
+ let is_ready = delay_is_ready(&mut this.write_delay, ctx);
if !is_ready { return Poll::Pending; }
@@ -70,15 +91,37 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
let result = Pin::new(&mut this.stream).poll_write(ctx, buf);
- if let Some(ref write_limiter) = this.write_limiter {
- if let Poll::Ready(Ok(count)) = &result {
- let now = Instant::now();
- let delay = write_limiter.lock().unwrap()
- .register_traffic(now, *count as u64);
- if delay >= Self::MIN_DELAY {
- let sleep = tokio::time::sleep(delay);
- this.write_delay = Some(Box::pin(sleep));
- }
+ if let Some(ref limiter) = this.write_limiter {
+ if let Poll::Ready(Ok(count)) = result {
+ this.write_delay = register_traffic(limiter, count);
+ }
+ }
+
+ result
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ self.stream.is_write_vectored()
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>]
+ ) -> Poll<Result<usize, std::io::Error>> {
+ let this = self.get_mut();
+
+ let is_ready = delay_is_ready(&mut this.write_delay, ctx);
+
+ if !is_ready { return Poll::Pending; }
+
+ this.write_delay = None;
+
+ let result = Pin::new(&mut this.stream).poll_write_vectored(ctx, bufs);
+
+ if let Some(ref limiter) = this.write_limiter {
+ if let Poll::Ready(Ok(count)) = result {
+ this.write_delay = register_traffic(limiter, count);
}
}
@@ -111,12 +154,7 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
) -> Poll<Result<(), std::io::Error>> {
let this = self.get_mut();
- let is_ready = match this.read_delay {
- Some(ref mut future) => {
- future.as_mut().poll(ctx).is_ready()
- }
- None => true,
- };
+ let is_ready = delay_is_ready(&mut this.read_delay, ctx);
if !is_ready { return Poll::Pending; }
@@ -128,13 +166,7 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
if let Some(ref read_limiter) = this.read_limiter {
if let Poll::Ready(Ok(())) = &result {
let count = buf.filled().len() - filled_len;
- let now = Instant::now();
- let delay = read_limiter.lock().unwrap()
- .register_traffic(now, count as u64);
- if delay >= Self::MIN_DELAY {
- let sleep = tokio::time::sleep(delay);
- this.read_delay = Some(Box::pin(sleep));
- }
+ this.read_delay = register_traffic(read_limiter, count);
}
}
--
2.30.2
next prev parent reply other threads:[~2021-11-09 6:53 UTC|newest]
Thread overview: 17+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-11-09 6:52 [pbs-devel] [PATCH proxmox/proxmox-backup] Rate Limiter Implementation Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 1/7] Implement a rate limiting stream (AsyncRead, AsyncWrite) Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 1/9] pbs-client: add option to use the new RateLimiter Dietmar Maurer
2021-11-09 6:52 ` Dietmar Maurer [this message]
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 2/9] proxmox-backup-client: add rate/burst parameter to backup CLI Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 3/7] HttpsConnector: use RateLimitedStream Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 3/9] implement Servive for RateLimitedStream Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 4/9] New DailyDuration type with nom parser Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 4/7] RateLimitedStream: allow periodic limiter updates Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 5/9] DailyDuration: implement time_match() Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 5/7] RateLimiter: avoid panic in time computations Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 6/9] Add traffic control configuration config with API Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 6/7] RateLimitedStream: implement peer_addr Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox 7/7] RateLimiter: add update_rate method Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 7/9] traffic_control: use Memcom to track. config versions Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 8/9] implement a traffic control cache for fast rate control limiter lockups Dietmar Maurer
2021-11-09 6:52 ` [pbs-devel] [PATCH proxmox-backup 9/9] proxmox-backup-proxy: implement traffic control Dietmar Maurer
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20211109065253.980304-4-dietmar@proxmox.com \
--to=dietmar@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.