From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 287BD6779D for ; Tue, 12 Jan 2021 14:59:18 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 1F77C26FE3 for ; Tue, 12 Jan 2021 14:58:48 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [212.186.127.180]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 59E2226FC4 for ; Tue, 12 Jan 2021 14:58:46 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 26E2145F58 for ; Tue, 12 Jan 2021 14:58:46 +0100 (CET) From: =?UTF-8?q?Fabian=20Gr=C3=BCnbichler?= To: pbs-devel@lists.proxmox.com Date: Tue, 12 Jan 2021 14:58:25 +0100 Message-Id: <20210112135830.2798301-16-f.gruenbichler@proxmox.com> X-Mailer: git-send-email 2.20.1 In-Reply-To: <20210112135830.2798301-1-f.gruenbichler@proxmox.com> References: <20210112135830.2798301-1-f.gruenbichler@proxmox.com> MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.026 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_DNSWL_MED -2.3 Sender listed at https://www.dnswl.org/, medium trust SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [h2server.rs, h2client.rs, h2s-server.rs, h2s-client.rs] Subject: [pbs-devel] [PATCH proxmox-backup 11/12] examples: unify h2 examples X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 12 Jan 2021 13:59:18 -0000 update them to the new tokio-openssl API and remove socket buffer size setting - it was removed from the TcpStream API, and is now only available via TcpSocket (which can in turn be converted to a TcpListener), but this is not needed for this example. Signed-off-by: Fabian Grünbichler --- examples/h2client.rs | 37 +++++++++++------------ examples/h2s-client.rs | 67 +++++++++++++++++++----------------------- examples/h2s-server.rs | 42 +++++++++++++------------- examples/h2server.rs | 56 +++++++++++++++++++---------------- 4 files changed, 100 insertions(+), 102 deletions(-) diff --git a/examples/h2client.rs b/examples/h2client.rs index 87a6e326..8551af87 100644 --- a/examples/h2client.rs +++ b/examples/h2client.rs @@ -2,7 +2,7 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use anyhow::{Error}; +use anyhow::Error; use futures::future::TryFutureExt; use futures::stream::Stream; use tokio::net::TcpStream; @@ -38,11 +38,11 @@ impl Future for Process { this.body.flow_control().release_capacity(chunk.len())?; this.bytes += chunk.len(); // println!("GOT FRAME {}", chunk.len()); - }, + } Some(Err(err)) => return Poll::Ready(Err(Error::from(err))), None => { this.trailers = true; - }, + } } } } @@ -52,7 +52,6 @@ impl Future for Process { fn send_request( mut client: h2::client::SendRequest, ) -> impl Future> { - println!("sending request"); let request = http::Request::builder() @@ -62,11 +61,11 @@ fn send_request( let (response, _stream) = client.send_request(request, true).unwrap(); - response - .map_err(Error::from) - .and_then(|response| { - Process { body: response.into_body(), trailers: false, bytes: 0 } - }) + response.map_err(Error::from).and_then(|response| Process { + body: response.into_body(), + trailers: false, + bytes: 0, + }) } fn main() -> Result<(), Error> { @@ -74,16 +73,15 @@ fn main() -> Result<(), Error> { } async fn run() -> Result<(), Error> { - let start = std::time::SystemTime::now(); - let conn = TcpStream::connect(std::net::SocketAddr::from(([127,0,0,1], 8008))) - .await?; + let conn = TcpStream::connect(std::net::SocketAddr::from(([127, 0, 0, 1], 8008))).await?; + conn.set_nodelay(true).unwrap(); let (client, h2) = h2::client::Builder::new() - .initial_connection_window_size(1024*1024*1024) - .initial_window_size(1024*1024*1024) - .max_frame_size(4*1024*1024) + .initial_connection_window_size(1024 * 1024 * 1024) + .initial_window_size(1024 * 1024 * 1024) + .max_frame_size(4 * 1024 * 1024) .handshake(conn) .await?; @@ -99,10 +97,13 @@ async fn run() -> Result<(), Error> { } let elapsed = start.elapsed().unwrap(); - let elapsed = (elapsed.as_secs() as f64) + - (elapsed.subsec_millis() as f64)/1000.0; + let elapsed = (elapsed.as_secs() as f64) + (elapsed.subsec_millis() as f64) / 1000.0; - println!("Downloaded {} bytes, {} MB/s", bytes, (bytes as f64)/(elapsed*1024.0*1024.0)); + println!( + "Downloaded {} bytes, {} MB/s", + bytes, + (bytes as f64) / (elapsed * 1024.0 * 1024.0) + ); Ok(()) } diff --git a/examples/h2s-client.rs b/examples/h2s-client.rs index 0d6beaef..a3b0d22c 100644 --- a/examples/h2s-client.rs +++ b/examples/h2s-client.rs @@ -5,6 +5,7 @@ use std::task::{Context, Poll}; use anyhow::{format_err, Error}; use futures::future::TryFutureExt; use futures::stream::Stream; +use tokio::net::TcpStream; // Simple H2 client to test H2 download speed using h2s-server.rs @@ -37,11 +38,11 @@ impl Future for Process { this.body.flow_control().release_capacity(chunk.len())?; this.bytes += chunk.len(); // println!("GOT FRAME {}", chunk.len()); - }, + } Some(Err(err)) => return Poll::Ready(Err(Error::from(err))), None => { this.trailers = true; - }, + } } } } @@ -60,11 +61,11 @@ fn send_request( let (response, _stream) = client.send_request(request, true).unwrap(); - response - .map_err(Error::from) - .and_then(|response| { - Process { body: response.into_body(), trailers: false, bytes: 0 } - }) + response.map_err(Error::from).and_then(|response| Process { + body: response.into_body(), + trailers: false, + bytes: 0, + }) } fn main() -> Result<(), Error> { @@ -74,57 +75,51 @@ fn main() -> Result<(), Error> { async fn run() -> Result<(), Error> { let start = std::time::SystemTime::now(); - let conn = - tokio::net::TcpStream::connect(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?; - + let conn = TcpStream::connect(std::net::SocketAddr::from(([127, 0, 0, 1], 8008))).await?; conn.set_nodelay(true).unwrap(); - conn.set_recv_buffer_size(1024*1024).unwrap(); use openssl::ssl::{SslConnector, SslMethod}; let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls()).unwrap(); ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::NONE); - let conn = - tokio_openssl::connect( - ssl_connector_builder.build().configure()?, - "localhost", - conn, - ) + let ssl = ssl_connector_builder + .build() + .configure()? + .into_ssl("localhost")?; + + let conn = tokio_openssl::SslStream::new(ssl, conn)?; + let mut conn = Box::pin(conn); + conn.as_mut() + .connect() .await .map_err(|err| format_err!("connect failed - {}", err))?; let (client, h2) = h2::client::Builder::new() - .initial_connection_window_size(1024*1024*1024) - .initial_window_size(1024*1024*1024) - .max_frame_size(4*1024*1024) + .initial_connection_window_size(1024 * 1024 * 1024) + .initial_window_size(1024 * 1024 * 1024) + .max_frame_size(4 * 1024 * 1024) .handshake(conn) .await?; - // Spawn a task to run the conn... tokio::spawn(async move { - if let Err(e) = h2.await { - println!("GOT ERR={:?}", e); + if let Err(err) = h2.await { + println!("GOT ERR={:?}", err); } }); let mut bytes = 0; - for _ in 0..100 { - match send_request(client.clone()).await { - Ok(b) => { - bytes += b; - } - Err(e) => { - println!("ERROR {}", e); - return Ok(()); - } - } + for _ in 0..2000 { + bytes += send_request(client.clone()).await?; } let elapsed = start.elapsed().unwrap(); - let elapsed = (elapsed.as_secs() as f64) + - (elapsed.subsec_millis() as f64)/1000.0; + let elapsed = (elapsed.as_secs() as f64) + (elapsed.subsec_millis() as f64) / 1000.0; - println!("Downloaded {} bytes, {} MB/s", bytes, (bytes as f64)/(elapsed*1024.0*1024.0)); + println!( + "Downloaded {} bytes, {} MB/s", + bytes, + (bytes as f64) / (elapsed * 1024.0 * 1024.0) + ); Ok(()) } diff --git a/examples/h2s-server.rs b/examples/h2s-server.rs index 8481b8ad..4357fe45 100644 --- a/examples/h2s-server.rs +++ b/examples/h2s-server.rs @@ -2,14 +2,12 @@ use std::sync::Arc; use anyhow::{format_err, Error}; use futures::*; -use hyper::{Request, Response, Body}; -use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype}; +use hyper::{Body, Request, Response}; +use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; use tokio::net::{TcpListener, TcpStream}; use proxmox_backup::configdir; -// Simple H2 server to test H2 speed with h2s-client.rs - fn main() -> Result<(), Error> { proxmox_backup::tools::runtime::main(run()) } @@ -19,38 +17,38 @@ async fn run() -> Result<(), Error> { let cert_path = configdir!("/proxy.pem"); let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); - acceptor.set_private_key_file(key_path, SslFiletype::PEM) + acceptor + .set_private_key_file(key_path, SslFiletype::PEM) .map_err(|err| format_err!("unable to read proxy key {} - {}", key_path, err))?; - acceptor.set_certificate_chain_file(cert_path) + acceptor + .set_certificate_chain_file(cert_path) .map_err(|err| format_err!("unable to read proxy cert {} - {}", cert_path, err))?; acceptor.check_private_key().unwrap(); let acceptor = Arc::new(acceptor.build()); - let mut listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?; + let listener = TcpListener::bind(std::net::SocketAddr::from(([127, 0, 0, 1], 8008))).await?; println!("listening on {:?}", listener.local_addr()); loop { let (socket, _addr) = listener.accept().await?; - tokio::spawn(handle_connection(socket, Arc::clone(&acceptor)) - .map(|res| { - if let Err(err) = res { - eprintln!("Error: {}", err); - } - })); + tokio::spawn(handle_connection(socket, Arc::clone(&acceptor)).map(|res| { + if let Err(err) = res { + eprintln!("Error: {}", err); + } + })); } } -async fn handle_connection( - socket: TcpStream, - acceptor: Arc, -) -> Result<(), Error> { +async fn handle_connection(socket: TcpStream, acceptor: Arc) -> Result<(), Error> { socket.set_nodelay(true).unwrap(); - socket.set_send_buffer_size(1024*1024).unwrap(); - socket.set_recv_buffer_size(1024*1024).unwrap(); - let socket = tokio_openssl::accept(acceptor.as_ref(), socket).await?; + let ssl = openssl::ssl::Ssl::new(acceptor.context())?; + let stream = tokio_openssl::SslStream::new(ssl, socket)?; + let mut stream = Box::pin(stream); + + stream.as_mut().accept().await?; let mut http = hyper::server::conn::Http::new(); http.http2_only(true); @@ -61,7 +59,7 @@ async fn handle_connection( let service = hyper::service::service_fn(|_req: Request| { println!("Got request"); - let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A,A...] + let buffer = vec![65u8; 4 * 1024 * 1024]; // nonsense [A,A,A,A...] let body = Body::from(buffer); let response = Response::builder() @@ -72,7 +70,7 @@ async fn handle_connection( future::ok::<_, Error>(response) }); - http.serve_connection(socket, service) + http.serve_connection(stream, service) .map_err(Error::from) .await?; diff --git a/examples/h2server.rs b/examples/h2server.rs index 1669347f..1b06557c 100644 --- a/examples/h2server.rs +++ b/examples/h2server.rs @@ -1,51 +1,55 @@ -use anyhow::{Error}; +use anyhow::Error; use futures::*; +use hyper::{Body, Request, Response}; -// Simple H2 server to test H2 speed with h2client.rs - -use tokio::net::TcpListener; -use tokio::io::{AsyncRead, AsyncWrite}; - -use proxmox_backup::client::pipe_to_stream::PipeToSendStream; +use tokio::net::{TcpListener, TcpStream}; fn main() -> Result<(), Error> { proxmox_backup::tools::runtime::main(run()) } async fn run() -> Result<(), Error> { - let mut listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?; + let listener = TcpListener::bind(std::net::SocketAddr::from(([127, 0, 0, 1], 8008))).await?; println!("listening on {:?}", listener.local_addr()); loop { let (socket, _addr) = listener.accept().await?; - tokio::spawn(handle_connection(socket) - .map(|res| { - if let Err(err) = res { - eprintln!("Error: {}", err); - } - })); + tokio::spawn(handle_connection(socket).map(|res| { + if let Err(err) = res { + eprintln!("Error: {}", err); + } + })); } } -async fn handle_connection(socket: T) -> Result<(), Error> { - let mut conn = h2::server::handshake(socket).await?; +async fn handle_connection(socket: TcpStream) -> Result<(), Error> { + socket.set_nodelay(true).unwrap(); - println!("H2 connection bound"); + let mut http = hyper::server::conn::Http::new(); + http.http2_only(true); + // increase window size: todo - find optiomal size + let max_window_size = (1 << 31) - 2; + http.http2_initial_stream_window_size(max_window_size); + http.http2_initial_connection_window_size(max_window_size); - while let Some((request, mut respond)) = conn.try_next().await? { - println!("GOT request: {:?}", request); + let service = hyper::service::service_fn(|_req: Request| { + println!("Got request"); + let buffer = vec![65u8; 4 * 1024 * 1024]; // nonsense [A,A,A,A...] + let body = Body::from(buffer); - let response = http::Response::builder() + let response = Response::builder() .status(http::StatusCode::OK) - .body(()) + .header(http::header::CONTENT_TYPE, "application/octet-stream") + .body(body) .unwrap(); + future::ok::<_, Error>(response) + }); - let send = respond.send_response(response, false).unwrap(); - let data = vec![65u8; 1024*1024]; - PipeToSendStream::new(bytes::Bytes::from(data), send).await?; - println!("DATA SENT"); - } + http.serve_connection(socket, service) + .map_err(Error::from) + .await?; + println!("H2 connection CLOSE !"); Ok(()) } -- 2.20.1