0

私はtokioStreamを使用してクレート マルチキューのベンチマークを実行し、反復可能な s を作成することで、パブリッシャー/サブスクライバーのラインに沿って何かを実装しようとしています。効率性については確信が持てません (項目をフィルター処理する数十または数百のリスナーが必要になる可能性があり、1 つの発行者が 1 ミリ秒あたり約 10 メッセージを発行することになります)。それ。しかし、今のところ、tokio::timer::Intervalまったく起動していないように見える奇妙なバグに遭遇しています。

完全なコードは以下のとおりです。

#![feature(test)]

extern crate futures;
extern crate multiqueue;
extern crate test;
extern crate tokio;

#[cfg(test)]
mod tests {
    use super::*;
    use futures::future::lazy;
    use futures::sync::mpsc::{channel, Receiver, Sender};
    use futures::{Async, Poll, Stream};
    use futures::{Future, Sink};
    use test::Bencher;
    use tokio::timer::Interval;

    #[bench]
    fn bench_many(b: &mut Bencher) {
        tokio::run(lazy(|| {
            let (tx, rx) = multiqueue::mpmc_fut_queue(1000);
            tokio::spawn(
                Interval::new_interval(std::time::Duration::from_micros(100))
                    .take(100)
                    .map(|_| 100)
                    .map_err(|e| {
                        eprintln!("Got interval error = {:?}", e);
                    })
                    .fold(tx, |tx, num| {
                        println!("Sending {}", num);
                        tx.send(num).map_err(|e| println!("send err = {:?}", e))
                    })
                    .map(|_| ()),
            );

            for i in 0..3 {
                println!("Starting");
                let rx = rx.clone();
                tokio::spawn(rx.for_each(move |num| {
                    println!("{} Got a num! {}", i, num);
                    Ok(())
                }));
            }

            Ok(())
        }));
    }
}

で実行していcargo benchます。futuresバージョン"0.1"tokioバージョン"0.1"multiqueueバージョン"0.3"

テスト全体が「[0-2] Got a num! 100」と「Sending 100」という多くのメッセージで完了することもありますが、途中でハングすることもあります (いくつかの「Sending」と「Got a」メッセージの後) または3 つの「Starting」メッセージだけでハングします。

これは で同時に実行できるタスクの数に問題があるのでtokioはないかと思いますが、生成している両方のタイプのタスクが発生するため、これがなぜ私が実行する制限になるのかよくわかりません頻繁にエグゼキュータに時間をかけます。

これをより信頼できるものにするにはどうすればよいですか?

4

0 に答える 0