私はこの解決策を思いついた。(まだテストされていません)Web上での多くのバウンスを介して。
Private Function ObserveUDP() As IObservable(Of bytes())
Dim f = Function(observer)
Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
Dim client = New UdpClient(endpoint)
Dim obs = observable.*emphasized text*Generate(Of Task(Of UdpReceiveResult), UdpReceiveResult) _
( Nothing _
, Function(task As Task(Of UdpReceiveResult)) task Is Nothing Or Not task.IsCompleted() _
, Function(task As Task(Of UdpReceiveResult)) client.ReceiveAsync() _
, Function(task As Task(Of UdpReceiveResult)) task.Result)
Dim observable = obs.Select(Function(r) r.Buffer)
dim handle = observable.Subscribe(observer)
Dim df = Sub()
client.Close()
handle.Dispose()
End Sub
Return Disposable.Create(df)
End Function
Return observable.Create(f)
End Function
私の要件は、サブスクリプションがドロップされたときにUDPクライアントが閉じていることを確認することでした。上記のコードは近いと確信していますが、正しくないと思います。任意の入力をいただければ幸いです。
* 編集 *
実際、上記の例は完全に間違っており、多数のタスクオブジェクトを同期的に作成するだけで、それらを待つことはありません。少し試行錯誤した後、何度も呼び出される待望の展開のための次のジェネリック関数を思いつきました。コメントはありますか?
''' initializer - a function that initializes and returns the state object
''' generator - a function that asynchronously using await generates each value
''' finalizer - a function for cleaning up the state object when the sequence is unsubscribed
Private Function ObservableAsyncSeq(Of T, I)( _
initializer As Func(Of I), _
generator As Func(Of I, Task(Of T)), _
finalizer As Action(Of I)) As IObservable(Of T)
Dim q = Function(observer As IObserver(Of T))
Dim go = True
Try
Dim r = Async Sub()
Dim ii As I = initializer()
While go
Dim result = Await generator(ii)
observer.OnNext(result)
End While
finalizer(ii)
observer.OnCompleted()
End Sub
Task.Run(r)
Catch ex As Exception
observer.OnError(ex)
End Try
' Disposable for stopping the sequence as per
' the observable contract
Return Sub() go = False
End Function
Return Observable.Create(q)
End Function
そしてUDPでの使用例
Private Function ObserveMeasurementPoints2() As IObservable(Of ProcessedDate)
Dim initializer = Function()
Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
Return New UdpClient(endpoint)
End Function
Dim finalizer = Function(client As UdpClient)
client.Close()
End Function
Dim generator = Function(client As UdpClient) As Task(Of UdpReceiveResult)
Return client.ReceiveAsync()
End Function
Return ObservableAsyncSeq(initializer, generator, finalizer).Select(Function(r) ProcessBytes(r.Buffer))
End Function