all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Dietmar Maurer <dietmar@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox 2/3] RateLimitedStream: implement poll_write_vectored
Date: Wed,  3 Nov 2021 13:42:46 +0100	[thread overview]
Message-ID: <20211103124247.1727711-2-dietmar@proxmox.com> (raw)
In-Reply-To: <20211103124247.1727711-1-dietmar@proxmox.com>

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
 .../src/client/rate_limited_stream.rs         | 92 +++++++++++++------
 1 file changed, 62 insertions(+), 30 deletions(-)

diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
index 434f923..8b4123f 100644
--- a/proxmox-http/src/client/rate_limited_stream.rs
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -2,6 +2,7 @@ use std::pin::Pin;
 use std::marker::Unpin;
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant};
+use std::io::IoSlice;
 
 use futures::Future;
 use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
@@ -22,8 +23,6 @@ pub struct RateLimitedStream<S> {
 
 impl <S> RateLimitedStream<S> {
 
-    const MIN_DELAY: Duration = Duration::from_millis(20);
-
     /// Creates a new instance with reads and writes limited to the same `rate`.
     pub fn new(stream: S, rate: u64) -> Self {
         let now = Instant::now();
@@ -48,6 +47,33 @@ impl <S> RateLimitedStream<S> {
     }
 }
 
+fn register_traffic(
+    limiter: &Mutex<RateLimiter>,
+    count: usize,
+) -> Option<Pin<Box<Sleep>>>{
+
+    const MIN_DELAY: Duration = Duration::from_millis(10);
+
+    let now = Instant::now();
+    let delay = limiter.lock().unwrap()
+        .register_traffic(now, count as u64);
+    if delay >= MIN_DELAY {
+        let sleep = tokio::time::sleep(delay);
+        Some(Box::pin(sleep))
+    } else {
+        None
+    }
+}
+
+fn delay_is_ready(delay: &mut Option<Pin<Box<Sleep>>>, ctx: &mut Context<'_>) -> bool {
+    match delay {
+        Some(ref mut future) => {
+            future.as_mut().poll(ctx).is_ready()
+        }
+        None => true,
+    }
+}
+
 impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
 
     fn poll_write(
@@ -57,12 +83,7 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
     ) -> Poll<Result<usize, std::io::Error>> {
         let this = self.get_mut();
 
-        let is_ready = match this.write_delay {
-            Some(ref mut future) => {
-                future.as_mut().poll(ctx).is_ready()
-            }
-            None => true,
-        };
+        let is_ready = delay_is_ready(&mut this.write_delay, ctx);
 
         if !is_ready { return Poll::Pending; }
 
@@ -70,15 +91,37 @@ impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
 
         let result = Pin::new(&mut this.stream).poll_write(ctx, buf);
 
-        if let Some(ref write_limiter) = this.write_limiter {
-            if let Poll::Ready(Ok(count)) = &result {
-                let now = Instant::now();
-                let delay = write_limiter.lock().unwrap()
-                    .register_traffic(now, *count as u64);
-                if delay >= Self::MIN_DELAY {
-                    let sleep = tokio::time::sleep(delay);
-                    this.write_delay = Some(Box::pin(sleep));
-                }
+        if let Some(ref limiter) = this.write_limiter {
+            if let Poll::Ready(Ok(count)) = result {
+                this.write_delay = register_traffic(limiter, count);
+            }
+        }
+
+        result
+    }
+
+    fn is_write_vectored(&self) -> bool {
+        self.stream.is_write_vectored()
+    }
+
+    fn poll_write_vectored(
+        self: Pin<&mut Self>,
+        ctx: &mut Context<'_>,
+        bufs: &[IoSlice<'_>]
+    ) -> Poll<Result<usize, std::io::Error>> {
+        let this = self.get_mut();
+
+        let is_ready = delay_is_ready(&mut this.write_delay, ctx);
+
+        if !is_ready { return Poll::Pending; }
+
+        this.write_delay = None;
+
+        let result = Pin::new(&mut this.stream).poll_write_vectored(ctx, bufs);
+
+        if let Some(ref limiter) = this.write_limiter {
+            if let Poll::Ready(Ok(count)) = result {
+                this.write_delay = register_traffic(limiter, count);
             }
         }
 
@@ -111,12 +154,7 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
     ) -> Poll<Result<(), std::io::Error>> {
         let this = self.get_mut();
 
-        let is_ready = match this.read_delay {
-            Some(ref mut future) => {
-                future.as_mut().poll(ctx).is_ready()
-            }
-            None => true,
-        };
+        let is_ready = delay_is_ready(&mut this.read_delay, ctx);
 
         if !is_ready { return Poll::Pending; }
 
@@ -128,13 +166,7 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
         if let Some(ref read_limiter) = this.read_limiter {
             if let Poll::Ready(Ok(())) = &result {
                 let count = buf.filled().len() - filled_len;
-                let now = Instant::now();
-                let delay = read_limiter.lock().unwrap()
-                    .register_traffic(now, count as u64);
-                if delay >= Self::MIN_DELAY {
-                    let sleep = tokio::time::sleep(delay);
-                    this.read_delay = Some(Box::pin(sleep));
-                }
+                this.read_delay = register_traffic(read_limiter, count);
             }
         }
 
-- 
2.30.2





  reply	other threads:[~2021-11-03 12:42 UTC|newest]

Thread overview: 3+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-11-03 12:42 [pbs-devel] [PATCH proxmox 1/3] Implement a rate limiting stream (AsyncRead, AsyncWrite) Dietmar Maurer
2021-11-03 12:42 ` Dietmar Maurer [this message]
2021-11-03 12:42 ` [pbs-devel] [PATCH proxmox 3/3] HttpsConnector: use RateLimitedStream Dietmar Maurer

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20211103124247.1727711-2-dietmar@proxmox.com \
    --to=dietmar@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link.

Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes;
see the mirroring instructions for how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal