3

私はこの解決策を思いついた。(まだテストされていません)Web上での多くのバウンスを介して。

Private Function ObserveUDP() As IObservable(Of bytes())


    Dim f = Function(observer)
                Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
                Dim client = New UdpClient(endpoint)

                Dim obs = observable.*emphasized text*Generate(Of Task(Of UdpReceiveResult), UdpReceiveResult) _
                      ( Nothing _
                      , Function(task As Task(Of UdpReceiveResult)) task Is Nothing Or Not task.IsCompleted() _
                      , Function(task As Task(Of UdpReceiveResult)) client.ReceiveAsync() _
                      , Function(task As Task(Of UdpReceiveResult)) task.Result)

                Dim observable = obs.Select(Function(r) r.Buffer)

                dim handle = observable.Subscribe(observer)

                Dim df = Sub() 
                    client.Close()
                    handle.Dispose()
                End Sub

                Return Disposable.Create(df)

    End Function

    Return observable.Create(f)

End Function

私の要件は、サブスクリプションがドロップされたときにUDPクライアントが閉じていることを確認することでした。上記のコードは近いと確信していますが、正しくないと思います。任意の入力をいただければ幸いです。

* 編集 *

実際、上記の例は完全に間違っており、多数のタスクオブジェクトを同期的に作成するだけで、それらを待つことはありません。少し試行錯誤した後、何度も呼び出される待望の展開のための次のジェネリック関数を思いつきました。コメントはありますか?

''' initializer - a function that initializes and returns the state object
''' generator   - a function that asynchronously using await generates each value
''' finalizer   - a function for cleaning up the state object when the sequence is unsubscribed

Private Function ObservableAsyncSeq(Of T, I)( _
    initializer As Func(Of I), _
    generator As Func(Of I, Task(Of T)), _
    finalizer As Action(Of I))  As IObservable(Of T)

    Dim q = Function(observer As IObserver(Of T))
                Dim go = True
                Try
                    Dim r = Async Sub()
                                Dim ii As I = initializer()
                                While go
                                    Dim result = Await generator(ii)
                                    observer.OnNext(result)
                                End While
                                finalizer(ii)
                                observer.OnCompleted()
                            End Sub
                    Task.Run(r)
                Catch ex As Exception
                    observer.OnError(ex)
                End Try

                ' Disposable for stopping the sequence as per
                ' the observable contract
                Return Sub() go = False

            End Function

    Return Observable.Create(q)
End Function

そしてUDPでの使用例

Private Function ObserveMeasurementPoints2() As IObservable(Of ProcessedDate)
    Dim initializer = Function()
                          Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
                          Return New UdpClient(endpoint)
                      End Function

    Dim finalizer = Function(client As UdpClient)
                        client.Close()
                    End Function

    Dim generator = Function(client As UdpClient) As Task(Of UdpReceiveResult)
                        Return client.ReceiveAsync()
                    End Function

    Return ObservableAsyncSeq(initializer, generator, finalizer).Select(Function(r) ProcessBytes(r.Buffer))

End Function
4

3 に答える 3

5

Enigmativity で述べたように使用するかObservable.Using、単に戻り引数としてObservable.Createを受け入れる通常のメソッドを使用することができIDisposableます - これは安全な廃棄に十分です。

イテレータまたは非同期の使用はまったく問題ありません。これを行うためのよりRxっぽい方法をリストしました:

Public Shared Function UdpStream(Of T)(endpoint As IPEndPoint, processor As Func(Of Byte(), T)) As IObservable(Of T)
    Return Observable.Using(Of T, UdpClient)(
        Function() New UdpClient(endpoint),
        Function(udpClient) _
            Observable.Defer(Function() udpClient.ReceiveAsync().ToObservable()) _
            .Repeat() _
            .Select(Function(result) processor(result.Buffer))
    )
End Function

従来の方法:

Public Shared Function UdpStream(Of T)(endpoint As IPEndPoint, processor As Func(Of Byte(), T)) As IObservable(Of T)
    Return Observable.Using(
        Function() New UdpClient(endpoint),
        Function(udpClient) Observable.Defer( _
        Observable.FromAsyncPattern(
            AddressOf udpClient.BeginReceive,
            Function(iar)
                Dim remoteEp = TryCast(iar.AsyncState, IPEndPoint)
                Return udpClient.EndReceive(iar, remoteEp)
            End Function)
        ).Repeat() _
         .Select(processor)
    )
End Function

テスト:

Shared Sub Main()
    Using UdpStream(New IPEndPoint(IPAddress.Loopback, 13200),
                    Function(bytes) String.Join(",", bytes)
                    ).Subscribe(AddressOf Console.WriteLine)
        Console.ReadLine()
    End Using

    Console.WriteLine("Done")
    Console.ReadKey()
End Sub
于 2012-09-28T20:29:34.550 に答える
2

見てみましょうObservable.Using- 特に、破棄可能なリソースを使用してその値を生成し、完了するとリソースを自動的に破棄するオブザーバブルを作成するために使用されます。

の&メソッドの実装UdpClientは同一であるため、 を呼び出す場合は呼び出す必要がないことがわかります。CloseDisposeCloseDispose

リフレクターから:

void IDisposable.Dispose()
{
    this.Dispose(true);
}

public void Close()
{
    this.Dispose(true);
}

の署名は次のUsingとおりです。

Public Shared Function Using(Of TSource, TResource As IDisposable)(
    ByVal resourceFactory As Func(Of TResource),
    ByVal observableFactory As Func(Of TResource, IObservable(Of TSource)))
        As IObservable(Of TSource)
于 2012-09-28T00:19:55.780 に答える
1

以前に UDPClient を使用したことはありませんが、Tasks (Cardinality = 1) を使用してデータのストリーム (Cardinality = many) を受信しようとしているようです。これは、クエリで繰り返しを平手打ちしたことで回避できるようです。これは、クエリがこれを行うことを意味します

  1. UDP クライアントを作成する
  2. データのリクエストを呼び出す
  3. 取得した最初のデータを受け取る
  4. シーケンスにデータをプッシュします
  5. シーケンスを閉じる
  6. UDPClient を破棄する
  7. UDPClient を作成します (手順 1 に戻る)
  8. データのリクエストを呼び出す
  9. 取得した最初のデータを受け取る
  10. ....接続を破棄するまで。

バイトストリームを引き込むことで、ソケット/ネットワーク接続を読み取ることができるはずだと思います。私のブログ投稿でこれを行う方法を示します。

http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#CreatingYourOwnIterator

この方法では、1 つの接続を開いたままにし、受信時にバイトをプッシュするだけです。

簡単なグーグルを持っていると、UDPClient 実装の信頼性について懸念があることもわかりました。 http://www.codeproject.com/Articles/1938/Issues-with-UdpClient-Receive

HTH

リー

using System;
using System.IO;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

namespace MyLib
{
    public static class ObservableExtensions
    {
        //TODO: Could potentially upgrade to using tasks/Await-LC
        public static IObservable<byte> ToObservable(
            this Stream source,
            int buffersize,
            IScheduler scheduler)
        {
            var bytes = Observable.Create<byte>(o =>
            {
                var initialState = new StreamReaderState(source, buffersize);
                var currentStateSubscription = new SerialDisposable();
                Action<StreamReaderState, Action<StreamReaderState>> iterator =
                (state, self) =>
                    currentStateSubscription.Disposable = state.ReadNext()
                        .Subscribe(
                            bytesRead =>
                            {
                                for (int i = 0; i < bytesRead; i++)
                                {
                                    o.OnNext(state.Buffer[i]);
                                }
                                if (bytesRead > 0)
                                    self(state);
                                else
                                    o.OnCompleted();
                            },
                            o.OnError);
                var scheduledWork = scheduler.Schedule(initialState, iterator);
                return new CompositeDisposable(currentStateSubscription, scheduledWork);
            });
            return Observable.Using(() => source, _ => bytes);
        }

        private sealed class StreamReaderState
        {
            private readonly int _bufferSize;
            private readonly Func<byte[], int, int, IObservable<int>> _factory;
            public StreamReaderState(Stream source, int bufferSize)
            {
                _bufferSize = bufferSize;
                _factory = Observable.FromAsyncPattern<byte[], int, int, int>(
                source.BeginRead,
                source.EndRead);
                Buffer = new byte[bufferSize];
            }
            public IObservable<int> ReadNext()
            {
                return _factory(Buffer, 0, _bufferSize);
            }
            public byte[] Buffer { get; set; }
        }
    }
}
于 2012-10-03T09:27:39.073 に答える