1

IObservable<Tweet>以下に示すように、LinqToTwitter で観察可能なコレクションを作成しました。問題は、最初のオブザーバブルを破棄して新しいオブザーバブルをサブスクライブするときに、この実装に同時実行の問題があることです。

最初のオブザーバブルを正しく処分するにはどうすればよいですか?

(以下のサンプルは完全であり、そのまま動作するはずです。参照パッケージと Twitter 資格情報を追加するだけです。)

この問題が発生する例を次に示します。

using System;
using System.Reactive.Linq;

namespace Twitter.Cli
{
    class Program
    {
        public static void Main(string[] args)
        {
            var twitter = new TwitterApi.Twitter();

            var search1 = twitter.AllTweetsAbout("windows")
                .Sample(TimeSpan.FromSeconds(1));

            var search2 = twitter.AllTweetsAbout("android")
                .Sample(TimeSpan.FromSeconds(1));

            var sub = search1.Subscribe(
                x =>
                    Console.WriteLine("TOPIC = {0} - CONTAINS STRING: {1}", x.Topic, x.Text.ToLower().Contains(x.Topic.ToLower()) ? "YES" : "NO"));

            Console.ReadLine();

            sub.Dispose();

            /* 
            * If you stop the processing here for a while so that the StartAsync method can be executed 
            * within the closure everything works fine because disposed is set to true 
            * before the second observable is created 
            */
            //Console.ReadLine(); 

            search2.Subscribe(
                x =>
                    Console.WriteLine("TOPIC = {0} - CONTAINS STRING: {1}", x.Topic, x.Text.ToLower().Contains(x.Topic.ToLower()) ? "YES" : "NO"));

            Console.ReadLine();
        }
    }
}

StartAsync2 番目のオブザーバブルが作成される前に、最初のオブザーバブルの作成のクロージャーのメソッドが実行された場合はdisposed、に設定されtrue、すべて問題ありません。

ただし、最初のクロージャーの次の実行の前に 2 番目のオブザーバブルが作成された場合、StartAsync disposed再び false に設定され、s.CloseStream();呼び出されることはありません。

オブザーバブルの作成は次のとおりです。

using System;
using System.Configuration;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using LinqToTwitter;

namespace TwitterApi
{
    public class Twitter
    {
        private readonly SingleUserAuthorizer _auth = new SingleUserAuthorizer
        {
            CredentialStore = new InMemoryCredentialStore
            {
                ConsumerKey = ConfigurationManager.AppSettings["consumerKey"],
                ConsumerSecret = ConfigurationManager.AppSettings["consumerSecret"],
                OAuthToken = ConfigurationManager.AppSettings["authtoken"],
                OAuthTokenSecret = ConfigurationManager.AppSettings["authtokensecret"],
            }
        };

        private readonly TwitterContext _twitterCtx;

        public Twitter()
        {
            if (String.IsNullOrWhiteSpace(_auth.CredentialStore.ConsumerKey)
                || String.IsNullOrWhiteSpace(_auth.CredentialStore.ConsumerSecret)
                || String.IsNullOrWhiteSpace(_auth.CredentialStore.OAuthToken)
                || String.IsNullOrWhiteSpace(_auth.CredentialStore.OAuthTokenSecret))
                throw new Exception("User Credentials are not set. Please update your App.config file.");

            _twitterCtx = new TwitterContext(_auth);
        }

        public IObservable<Tweet> AllTweetsAbout(string topic)
        {
            return Observable.Create<Tweet>(o =>
            {
                var query = from s in _twitterCtx.Streaming
                            where s.Type == StreamingType.Filter &&
                                    s.Track == topic
                            select s;

                var disposed = false;

                query.StartAsync(async s =>
                {
                    if (disposed)
                        s.CloseStream();
                    else
                    {
                        Tweet t;
                        if (Tweet.TryParse(s.Content, topic, out t))
                        {
                            o.OnNext(t);
                        }
                    }
                });

                return Disposable.Create(() => disposed = true);
            });
        }
    }
}

そして最後にTweetクラス:

using System;
using Newtonsoft.Json.Linq;

namespace TwitterApi
{
    public class Tweet
    {
        public string User { get; private set; }
        public string Text { get; private set; }
        public string Topic { get; private set; }

        public static bool TryParse(string json, string topic, out Tweet tweet)
        {
            try
            {
                dynamic parsed = JObject.Parse(json);
                tweet = new Tweet
                {
                    User = parsed.user.screen_name,
                    Text = parsed.text,
                    Topic = topic,
                };
                return true;
            }
            catch (Exception)
            {
                tweet = null;
                return false;
            }
        }

        private Tweet()
        {

        }
    }
}
4

0 に答える 0