all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: "Fabian Grünbichler" <f.gruenbichler@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup 11/12] examples: unify h2 examples
Date: Tue, 12 Jan 2021 14:58:25 +0100	[thread overview]
Message-ID: <20210112135830.2798301-16-f.gruenbichler@proxmox.com> (raw)
In-Reply-To: <20210112135830.2798301-1-f.gruenbichler@proxmox.com>

update them to the new tokio-openssl API and remove socket buffer size
setting - it was removed from the TcpStream API, and is now only
available via TcpSocket (which can in turn be converted to a
TcpListener), but this is not needed for this example.

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 examples/h2client.rs   | 37 +++++++++++------------
 examples/h2s-client.rs | 67 +++++++++++++++++++-----------------------
 examples/h2s-server.rs | 42 +++++++++++++-------------
 examples/h2server.rs   | 56 +++++++++++++++++++----------------
 4 files changed, 100 insertions(+), 102 deletions(-)

diff --git a/examples/h2client.rs b/examples/h2client.rs
index 87a6e326..8551af87 100644
--- a/examples/h2client.rs
+++ b/examples/h2client.rs
@@ -2,7 +2,7 @@ use std::future::Future;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 
-use anyhow::{Error};
+use anyhow::Error;
 use futures::future::TryFutureExt;
 use futures::stream::Stream;
 use tokio::net::TcpStream;
@@ -38,11 +38,11 @@ impl Future for Process {
                         this.body.flow_control().release_capacity(chunk.len())?;
                         this.bytes += chunk.len();
                         // println!("GOT FRAME {}", chunk.len());
-                    },
+                    }
                     Some(Err(err)) => return Poll::Ready(Err(Error::from(err))),
                     None => {
                         this.trailers = true;
-                    },
+                    }
                 }
             }
         }
@@ -52,7 +52,6 @@ impl Future for Process {
 fn send_request(
     mut client: h2::client::SendRequest<bytes::Bytes>,
 ) -> impl Future<Output = Result<usize, Error>> {
-
     println!("sending request");
 
     let request = http::Request::builder()
@@ -62,11 +61,11 @@ fn send_request(
 
     let (response, _stream) = client.send_request(request, true).unwrap();
 
-    response
-        .map_err(Error::from)
-        .and_then(|response| {
-            Process { body: response.into_body(), trailers: false, bytes: 0 }
-        })
+    response.map_err(Error::from).and_then(|response| Process {
+        body: response.into_body(),
+        trailers: false,
+        bytes: 0,
+    })
 }
 
 fn main() -> Result<(), Error> {
@@ -74,16 +73,15 @@ fn main() -> Result<(), Error> {
 }
 
 async fn run() -> Result<(), Error> {
-
     let start = std::time::SystemTime::now();
 
-    let conn = TcpStream::connect(std::net::SocketAddr::from(([127,0,0,1], 8008)))
-        .await?;
+    let conn = TcpStream::connect(std::net::SocketAddr::from(([127, 0, 0, 1], 8008))).await?;
+    conn.set_nodelay(true).unwrap();
 
     let (client, h2) = h2::client::Builder::new()
-        .initial_connection_window_size(1024*1024*1024)
-        .initial_window_size(1024*1024*1024)
-        .max_frame_size(4*1024*1024)
+        .initial_connection_window_size(1024 * 1024 * 1024)
+        .initial_window_size(1024 * 1024 * 1024)
+        .max_frame_size(4 * 1024 * 1024)
         .handshake(conn)
         .await?;
 
@@ -99,10 +97,13 @@ async fn run() -> Result<(), Error> {
     }
 
     let elapsed = start.elapsed().unwrap();
-    let elapsed = (elapsed.as_secs() as f64) +
-        (elapsed.subsec_millis() as f64)/1000.0;
+    let elapsed = (elapsed.as_secs() as f64) + (elapsed.subsec_millis() as f64) / 1000.0;
 
-    println!("Downloaded {} bytes, {} MB/s", bytes, (bytes as f64)/(elapsed*1024.0*1024.0));
+    println!(
+        "Downloaded {} bytes, {} MB/s",
+        bytes,
+        (bytes as f64) / (elapsed * 1024.0 * 1024.0)
+    );
 
     Ok(())
 }
diff --git a/examples/h2s-client.rs b/examples/h2s-client.rs
index 0d6beaef..a3b0d22c 100644
--- a/examples/h2s-client.rs
+++ b/examples/h2s-client.rs
@@ -5,6 +5,7 @@ use std::task::{Context, Poll};
 use anyhow::{format_err, Error};
 use futures::future::TryFutureExt;
 use futures::stream::Stream;
+use tokio::net::TcpStream;
 
 // Simple H2 client to test H2 download speed using h2s-server.rs
 
@@ -37,11 +38,11 @@ impl Future for Process {
                         this.body.flow_control().release_capacity(chunk.len())?;
                         this.bytes += chunk.len();
                         // println!("GOT FRAME {}", chunk.len());
-                    },
+                    }
                     Some(Err(err)) => return Poll::Ready(Err(Error::from(err))),
                     None => {
                         this.trailers = true;
-                    },
+                    }
                 }
             }
         }
@@ -60,11 +61,11 @@ fn send_request(
 
     let (response, _stream) = client.send_request(request, true).unwrap();
 
-    response
-        .map_err(Error::from)
-        .and_then(|response| {
-            Process { body: response.into_body(), trailers: false, bytes: 0 }
-        })
+    response.map_err(Error::from).and_then(|response| Process {
+        body: response.into_body(),
+        trailers: false,
+        bytes: 0,
+    })
 }
 
 fn main() -> Result<(), Error> {
@@ -74,57 +75,51 @@ fn main() -> Result<(), Error> {
 async fn run() -> Result<(), Error> {
     let start = std::time::SystemTime::now();
 
-    let conn =
-        tokio::net::TcpStream::connect(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?;
-
+    let conn = TcpStream::connect(std::net::SocketAddr::from(([127, 0, 0, 1], 8008))).await?;
     conn.set_nodelay(true).unwrap();
-    conn.set_recv_buffer_size(1024*1024).unwrap();
 
     use openssl::ssl::{SslConnector, SslMethod};
 
     let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls()).unwrap();
     ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::NONE);
-    let conn =
-        tokio_openssl::connect(
-            ssl_connector_builder.build().configure()?,
-            "localhost",
-            conn,
-        )
+    let ssl = ssl_connector_builder
+        .build()
+        .configure()?
+        .into_ssl("localhost")?;
+
+    let conn = tokio_openssl::SslStream::new(ssl, conn)?;
+    let mut conn = Box::pin(conn);
+    conn.as_mut()
+        .connect()
         .await
         .map_err(|err| format_err!("connect failed - {}", err))?;
 
     let (client, h2) = h2::client::Builder::new()
-        .initial_connection_window_size(1024*1024*1024)
-        .initial_window_size(1024*1024*1024)
-        .max_frame_size(4*1024*1024)
+        .initial_connection_window_size(1024 * 1024 * 1024)
+        .initial_window_size(1024 * 1024 * 1024)
+        .max_frame_size(4 * 1024 * 1024)
         .handshake(conn)
         .await?;
 
-    // Spawn a task to run the conn...
     tokio::spawn(async move {
-        if let Err(e) = h2.await {
-            println!("GOT ERR={:?}", e);
+        if let Err(err) = h2.await {
+            println!("GOT ERR={:?}", err);
         }
     });
 
     let mut bytes = 0;
-    for _ in 0..100 {
-        match send_request(client.clone()).await {
-            Ok(b) => {
-                bytes += b;
-            }
-            Err(e) => {
-                println!("ERROR {}", e);
-                return Ok(());
-            }
-        }
+    for _ in 0..2000 {
+        bytes += send_request(client.clone()).await?;
     }
 
     let elapsed = start.elapsed().unwrap();
-    let elapsed = (elapsed.as_secs() as f64) +
-        (elapsed.subsec_millis() as f64)/1000.0;
+    let elapsed = (elapsed.as_secs() as f64) + (elapsed.subsec_millis() as f64) / 1000.0;
 
-    println!("Downloaded {} bytes, {} MB/s", bytes, (bytes as f64)/(elapsed*1024.0*1024.0));
+    println!(
+        "Downloaded {} bytes, {} MB/s",
+        bytes,
+        (bytes as f64) / (elapsed * 1024.0 * 1024.0)
+    );
 
     Ok(())
 }
diff --git a/examples/h2s-server.rs b/examples/h2s-server.rs
index 8481b8ad..4357fe45 100644
--- a/examples/h2s-server.rs
+++ b/examples/h2s-server.rs
@@ -2,14 +2,12 @@ use std::sync::Arc;
 
 use anyhow::{format_err, Error};
 use futures::*;
-use hyper::{Request, Response, Body};
-use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype};
+use hyper::{Body, Request, Response};
+use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
 use tokio::net::{TcpListener, TcpStream};
 
 use proxmox_backup::configdir;
 
-// Simple H2 server to test H2 speed with h2s-client.rs
-
 fn main() -> Result<(), Error> {
     proxmox_backup::tools::runtime::main(run())
 }
@@ -19,38 +17,38 @@ async fn run() -> Result<(), Error> {
     let cert_path = configdir!("/proxy.pem");
 
     let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
-    acceptor.set_private_key_file(key_path, SslFiletype::PEM)
+    acceptor
+        .set_private_key_file(key_path, SslFiletype::PEM)
         .map_err(|err| format_err!("unable to read proxy key {} - {}", key_path, err))?;
-    acceptor.set_certificate_chain_file(cert_path)
+    acceptor
+        .set_certificate_chain_file(cert_path)
         .map_err(|err| format_err!("unable to read proxy cert {} - {}", cert_path, err))?;
     acceptor.check_private_key().unwrap();
 
     let acceptor = Arc::new(acceptor.build());
 
-    let mut listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?;
+    let listener = TcpListener::bind(std::net::SocketAddr::from(([127, 0, 0, 1], 8008))).await?;
 
     println!("listening on {:?}", listener.local_addr());
 
     loop {
         let (socket, _addr) = listener.accept().await?;
-        tokio::spawn(handle_connection(socket, Arc::clone(&acceptor))
-            .map(|res| {
-                if let Err(err) = res {
-                    eprintln!("Error: {}", err);
-                }
-            }));
+        tokio::spawn(handle_connection(socket, Arc::clone(&acceptor)).map(|res| {
+            if let Err(err) = res {
+                eprintln!("Error: {}", err);
+            }
+        }));
     }
 }
 
-async fn handle_connection(
-    socket: TcpStream,
-    acceptor: Arc<SslAcceptor>,
-) -> Result<(), Error> {
+async fn handle_connection(socket: TcpStream, acceptor: Arc<SslAcceptor>) -> Result<(), Error> {
     socket.set_nodelay(true).unwrap();
-    socket.set_send_buffer_size(1024*1024).unwrap();
-    socket.set_recv_buffer_size(1024*1024).unwrap();
 
-    let socket = tokio_openssl::accept(acceptor.as_ref(), socket).await?;
+    let ssl = openssl::ssl::Ssl::new(acceptor.context())?;
+    let stream = tokio_openssl::SslStream::new(ssl, socket)?;
+    let mut stream = Box::pin(stream);
+
+    stream.as_mut().accept().await?;
 
     let mut http = hyper::server::conn::Http::new();
     http.http2_only(true);
@@ -61,7 +59,7 @@ async fn handle_connection(
 
     let service = hyper::service::service_fn(|_req: Request<Body>| {
         println!("Got request");
-        let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A,A...]
+        let buffer = vec![65u8; 4 * 1024 * 1024]; // nonsense [A,A,A,A...]
         let body = Body::from(buffer);
 
         let response = Response::builder()
@@ -72,7 +70,7 @@ async fn handle_connection(
         future::ok::<_, Error>(response)
     });
 
-    http.serve_connection(socket, service)
+    http.serve_connection(stream, service)
         .map_err(Error::from)
         .await?;
 
diff --git a/examples/h2server.rs b/examples/h2server.rs
index 1669347f..1b06557c 100644
--- a/examples/h2server.rs
+++ b/examples/h2server.rs
@@ -1,51 +1,55 @@
-use anyhow::{Error};
+use anyhow::Error;
 use futures::*;
+use hyper::{Body, Request, Response};
 
-// Simple H2 server to test H2 speed with h2client.rs
-
-use tokio::net::TcpListener;
-use tokio::io::{AsyncRead, AsyncWrite};
-
-use proxmox_backup::client::pipe_to_stream::PipeToSendStream;
+use tokio::net::{TcpListener, TcpStream};
 
 fn main() -> Result<(), Error> {
     proxmox_backup::tools::runtime::main(run())
 }
 
 async fn run() -> Result<(), Error> {
-    let mut listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?;
+    let listener = TcpListener::bind(std::net::SocketAddr::from(([127, 0, 0, 1], 8008))).await?;
 
     println!("listening on {:?}", listener.local_addr());
 
     loop {
         let (socket, _addr) = listener.accept().await?;
-        tokio::spawn(handle_connection(socket)
-            .map(|res| {
-                if let Err(err) = res {
-                    eprintln!("Error: {}", err);
-                }
-            }));
+        tokio::spawn(handle_connection(socket).map(|res| {
+            if let Err(err) = res {
+                eprintln!("Error: {}", err);
+            }
+        }));
     }
 }
 
-async fn handle_connection<T: AsyncRead + AsyncWrite + Unpin>(socket: T) -> Result<(), Error> {
-    let mut conn = h2::server::handshake(socket).await?;
+async fn handle_connection(socket: TcpStream) -> Result<(), Error> {
+    socket.set_nodelay(true).unwrap();
 
-    println!("H2 connection bound");
+    let mut http = hyper::server::conn::Http::new();
+    http.http2_only(true);
+    // increase window size: todo - find optiomal size
+    let max_window_size = (1 << 31) - 2;
+    http.http2_initial_stream_window_size(max_window_size);
+    http.http2_initial_connection_window_size(max_window_size);
 
-    while let Some((request, mut respond)) = conn.try_next().await? {
-        println!("GOT request: {:?}", request);
+    let service = hyper::service::service_fn(|_req: Request<Body>| {
+        println!("Got request");
+        let buffer = vec![65u8; 4 * 1024 * 1024]; // nonsense [A,A,A,A...]
+        let body = Body::from(buffer);
 
-        let response = http::Response::builder()
+        let response = Response::builder()
             .status(http::StatusCode::OK)
-            .body(())
+            .header(http::header::CONTENT_TYPE, "application/octet-stream")
+            .body(body)
             .unwrap();
+        future::ok::<_, Error>(response)
+    });
 
-        let send = respond.send_response(response, false).unwrap();
-        let data = vec![65u8; 1024*1024];
-        PipeToSendStream::new(bytes::Bytes::from(data), send).await?;
-        println!("DATA SENT");
-    }
+    http.serve_connection(socket, service)
+        .map_err(Error::from)
+        .await?;
 
+    println!("H2 connection CLOSE !");
     Ok(())
 }
-- 
2.20.1





  parent reply	other threads:[~2021-01-12 13:59 UTC|newest]

Thread overview: 29+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-01-12 13:58 [pbs-devel] [PATCH-SERIES 0/20] update to tokio 1.0 and friends Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox 1/4] Cargo.toml: update to tokio 1.0 Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox 2/4] update to rustyline 7 Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox 3/4] update to tokio 1.0 Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox 4/4] tokio 1.0: drop TimeoutFutureExt Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 01/12] update to tokio 1.0 Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 02/12] tokio 1.0: delay -> sleep Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 03/12] proxmox XXX: use tokio::time::timeout directly Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 04/12] tokio 1.0: AsyncRead/Seek with ReadBuf Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 05/12] tokio: adapt to 1.0 runtime changes Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 06/12] tokio: adapt to 1.0 process:Child changes Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 07/12] tokio 1.0: use ReceiverStream from tokio-stream Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 08/12] tokio 1.0: update to new tokio-openssl interface Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 09/12] tokio 1.0: update to new Signal interface Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 10/12] hyper: use new hyper::upgrade Fabian Grünbichler
2021-01-12 13:58 ` Fabian Grünbichler [this message]
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 12/12] cleanup: remove unnecessary 'mut' and '.clone()' Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-fuse] update to tokio 1.0 Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH pxar 1/3] " Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [RFC pxar 2/3] clippy: use matches! instead of match Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [RFC pxar 3/3] remove futures-io feature Fabian Grünbichler
2021-01-12 14:42   ` Wolfgang Bumiller
2021-01-12 14:52 ` [pbs-devel] [PATCH-SERIES 0/20] update to tokio 1.0 and friends Wolfgang Bumiller
2021-01-14 13:39   ` [pbs-devel] [PATCH proxmox 1/3] fix u2f example Fabian Grünbichler
2021-01-14 13:39     ` [pbs-devel] [PATCH proxmox-backup] proxmox XXX: adapt to moved ParameterSchema Fabian Grünbichler
2021-01-14 13:39     ` [pbs-devel] [PATCH proxmox 2/3] move ParameterSchema from router to schema Fabian Grünbichler
2021-01-14 13:39     ` [pbs-devel] [PATCH proxmox 3/3] build: add autopkgtest target Fabian Grünbichler
2021-01-14 13:41   ` [pbs-devel] [PATCH pxar 1/2] fix example Fabian Grünbichler
2021-01-14 13:41     ` [pbs-devel] [PATCH pxar 2/2] build: fix --no-default-features Fabian Grünbichler

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210112135830.2798301-16-f.gruenbichler@proxmox.com \
    --to=f.gruenbichler@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal