IObservable<Tweet>
以下に示すように、LinqToTwitter で観察可能なコレクションを作成しました。問題は、最初のオブザーバブルを破棄して新しいオブザーバブルをサブスクライブするときに、この実装に同時実行の問題があることです。
最初のオブザーバブルを正しく処分するにはどうすればよいですか?
(以下のサンプルは完全であり、そのまま動作するはずです。参照パッケージと Twitter 資格情報を追加するだけです。)
この問題が発生する例を次に示します。
using System;
using System.Reactive.Linq;
namespace Twitter.Cli
{
class Program
{
public static void Main(string[] args)
{
var twitter = new TwitterApi.Twitter();
var search1 = twitter.AllTweetsAbout("windows")
.Sample(TimeSpan.FromSeconds(1));
var search2 = twitter.AllTweetsAbout("android")
.Sample(TimeSpan.FromSeconds(1));
var sub = search1.Subscribe(
x =>
Console.WriteLine("TOPIC = {0} - CONTAINS STRING: {1}", x.Topic, x.Text.ToLower().Contains(x.Topic.ToLower()) ? "YES" : "NO"));
Console.ReadLine();
sub.Dispose();
/*
* If you stop the processing here for a while so that the StartAsync method can be executed
* within the closure everything works fine because disposed is set to true
* before the second observable is created
*/
//Console.ReadLine();
search2.Subscribe(
x =>
Console.WriteLine("TOPIC = {0} - CONTAINS STRING: {1}", x.Topic, x.Text.ToLower().Contains(x.Topic.ToLower()) ? "YES" : "NO"));
Console.ReadLine();
}
}
}
StartAsync
2 番目のオブザーバブルが作成される前に、最初のオブザーバブルの作成のクロージャーのメソッドが実行された場合はdisposed
、に設定されtrue
、すべて問題ありません。
ただし、最初のクロージャーの次の実行の前に 2 番目のオブザーバブルが作成された場合、StartAsync
disposed
再び false に設定され、s.CloseStream();
呼び出されることはありません。
オブザーバブルの作成は次のとおりです。
using System;
using System.Configuration;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using LinqToTwitter;
namespace TwitterApi
{
public class Twitter
{
private readonly SingleUserAuthorizer _auth = new SingleUserAuthorizer
{
CredentialStore = new InMemoryCredentialStore
{
ConsumerKey = ConfigurationManager.AppSettings["consumerKey"],
ConsumerSecret = ConfigurationManager.AppSettings["consumerSecret"],
OAuthToken = ConfigurationManager.AppSettings["authtoken"],
OAuthTokenSecret = ConfigurationManager.AppSettings["authtokensecret"],
}
};
private readonly TwitterContext _twitterCtx;
public Twitter()
{
if (String.IsNullOrWhiteSpace(_auth.CredentialStore.ConsumerKey)
|| String.IsNullOrWhiteSpace(_auth.CredentialStore.ConsumerSecret)
|| String.IsNullOrWhiteSpace(_auth.CredentialStore.OAuthToken)
|| String.IsNullOrWhiteSpace(_auth.CredentialStore.OAuthTokenSecret))
throw new Exception("User Credentials are not set. Please update your App.config file.");
_twitterCtx = new TwitterContext(_auth);
}
public IObservable<Tweet> AllTweetsAbout(string topic)
{
return Observable.Create<Tweet>(o =>
{
var query = from s in _twitterCtx.Streaming
where s.Type == StreamingType.Filter &&
s.Track == topic
select s;
var disposed = false;
query.StartAsync(async s =>
{
if (disposed)
s.CloseStream();
else
{
Tweet t;
if (Tweet.TryParse(s.Content, topic, out t))
{
o.OnNext(t);
}
}
});
return Disposable.Create(() => disposed = true);
});
}
}
}
そして最後にTweet
クラス:
using System;
using Newtonsoft.Json.Linq;
namespace TwitterApi
{
public class Tweet
{
public string User { get; private set; }
public string Text { get; private set; }
public string Topic { get; private set; }
public static bool TryParse(string json, string topic, out Tweet tweet)
{
try
{
dynamic parsed = JObject.Parse(json);
tweet = new Tweet
{
User = parsed.user.screen_name,
Text = parsed.text,
Topic = topic,
};
return true;
}
catch (Exception)
{
tweet = null;
return false;
}
}
private Tweet()
{
}
}
}