From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <s.reiter@proxmox.com>
Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68])
 (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits)
 key-exchange X25519 server-signature RSA-PSS (2048 bits))
 (No client certificate requested)
 by lists.proxmox.com (Postfix) with ESMTPS id 4BDF862817
 for <pbs-devel@lists.proxmox.com>; Wed, 30 Sep 2020 16:16:11 +0200 (CEST)
Received: from firstgate.proxmox.com (localhost [127.0.0.1])
 by firstgate.proxmox.com (Proxmox) with ESMTP id 457391A2FF
 for <pbs-devel@lists.proxmox.com>; Wed, 30 Sep 2020 16:16:11 +0200 (CEST)
Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com
 [212.186.127.180])
 (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits)
 key-exchange X25519 server-signature RSA-PSS (2048 bits))
 (No client certificate requested)
 by firstgate.proxmox.com (Proxmox) with ESMTPS id D9D1F1A2D4
 for <pbs-devel@lists.proxmox.com>; Wed, 30 Sep 2020 16:16:08 +0200 (CEST)
Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1])
 by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 9478A45A48
 for <pbs-devel@lists.proxmox.com>; Wed, 30 Sep 2020 16:16:08 +0200 (CEST)
From: Stefan Reiter <s.reiter@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Wed, 30 Sep 2020 16:15:58 +0200
Message-Id: <20200930141601.27233-3-s.reiter@proxmox.com>
X-Mailer: git-send-email 2.20.1
In-Reply-To: <20200930141601.27233-1-s.reiter@proxmox.com>
References: <20200930141601.27233-1-s.reiter@proxmox.com>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
X-SPAM-LEVEL: Spam detection results:  0
 AWL -0.046 Adjusted score from AWL reputation of From: address
 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment
 RCVD_IN_DNSWL_MED        -2.3 Sender listed at https://www.dnswl.org/,
 medium trust
 SPF_HELO_NONE           0.001 SPF: HELO does not publish an SPF Record
 SPF_PASS               -0.001 SPF: sender matches SPF record
 URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See
 http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more
 information. [pull.rs, verify.rs]
Subject: [pbs-devel] [RFC proxmox-backup 2/5] ParallelHandler add unbounded
 mode
X-BeenThere: pbs-devel@lists.proxmox.com
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: Proxmox Backup Server development discussion
 <pbs-devel.lists.proxmox.com>
List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=unsubscribe>
List-Archive: <http://lists.proxmox.com/pipermail/pbs-devel/>
List-Post: <mailto:pbs-devel@lists.proxmox.com>
List-Help: <mailto:pbs-devel-request@lists.proxmox.com?subject=help>
List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=subscribe>
X-List-Received-Date: Wed, 30 Sep 2020 14:16:11 -0000

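Add a 'bounded_mode' parameter to ParallelHandler::new. When it is false,
the input channel is created unbounded instead of being limited to
'threads' entries, which enables non-blocking send. Only use unbounded mode
when the data being sent is small, otherwise the channel buffer might take
a lot of memory. The call sites in verify.rs and pull.rs pass 'true' and
thus keep using the bounded channel.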

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
 src/backup/verify.rs          |  3 ++-
 src/client/pull.rs            |  3 ++-
 src/tools/parallel_handler.rs | 15 ++++++++++++---
 3 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index fd48d907..c24d48e6 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -129,7 +129,8 @@ fn verify_index_chunks(
             }
 
             Ok(())
-        }
+        },
+        true
     );
 
     for pos in 0..index.index_count() {
diff --git a/src/client/pull.rs b/src/client/pull.rs
index d88d64f9..08d92d6e 100644
--- a/src/client/pull.rs
+++ b/src/client/pull.rs
@@ -57,7 +57,8 @@ async fn pull_index_chunks<I: IndexFile>(
             chunk.verify_unencrypted(size as usize, &digest)?;
             target.insert_chunk(&chunk, &digest)?;
             Ok(())
-       }
+       },
+       true
     );
 
     let verify_and_write_channel = verify_pool.channel();
diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs
index f1d9adec..b15fc046 100644
--- a/src/tools/parallel_handler.rs
+++ b/src/tools/parallel_handler.rs
@@ -2,7 +2,7 @@ use std::sync::{Arc, Mutex};
 use std::thread::JoinHandle;
 
 use anyhow::{bail, format_err, Error};
-use crossbeam_channel::{bounded, Sender};
+use crossbeam_channel::{bounded, unbounded, Sender};
 
 /// A handle to send data to the worker thread (implements clone)
 pub struct SendHandle<I> {
@@ -57,11 +57,20 @@ impl<I> Clone for SendHandle<I> {
 impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
     /// Create a new thread pool, each thread processing incoming data
     /// with 'handler_fn'.
-    pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
+    pub fn new<F>(
+        name: &str,
+        threads: usize,
+        handler_fn: F,
+        bounded_mode: bool,
+    ) -> Self
         where F: Fn(I) -> Result<(), Error> + Send + Clone + 'a,
     {
         let mut handles = Vec::new();
-        let (input_tx, input_rx) = bounded::<I>(threads);
+        let (input_tx, input_rx) = if bounded_mode {
+            bounded::<I>(threads)
+        } else {
+            unbounded::<I>()
+        };
 
         let abort = Arc::new(Mutex::new(None));
 
-- 
2.20.1