1

私はtwistedにかなり慣れていないので、トライアルテストフレームワークを使用していくつかのユニットテストを作成しようとしています。私のテストは期待どおりに実行され、合格しましたが、何らかの理由でテストの間に試行がぶら下がっています。 次のテストに進むには、各テストの後にCTRL+を押す必要があります。C何かが正しく構成されていないか、テストが完了したことを試行するように指示する必要があるメソッドを呼び出していないと推測しています。

テスト中のクラスは次のとおりです。

from twisted.internet import reactor, defer
import threading
import time


class SomeClass:
   def doSomething(self):
      return self.asyncMethod()

   def asyncMethod(self):
       d = defer.Deferred()
       t = SomeThread(d)
       t.start()
       return d


class SomeThread(threading.Thread):
    def __init__(self, d):
        super(SomeThread, self).__init__()
        self.d = d

    def run(self):
        time.sleep(2) # pretend to do something
        retVal = 123
        self.d.callback(retVal)

ユニットテストクラスは次のとおりです。

from twisted.trial import unittest
import tested


class SomeTest(unittest.TestCase):
    def testOne(self):
        sc = tested.SomeClass()
        d = sc.doSomething()
        return d.addCallback(self.allDone)

    def allDone(self, retVal):
        self.assertEquals(retVal, 123)

    def testTwo(self):
        sc = tested.SomeClass()
        d = sc.doSomething()
        return d.addCallback(self.allDone2)

    def allDone2(self, retVal):
        self.assertEquals(retVal, 123)

コマンドライン出力は次のようになります。

me$ trial test.py 
test
  SomeTest
    testOne ... ^C                                                           [OK]
    testTwo ... ^C                                                           [OK]

-------------------------------------------------------------------------------
Ran 2 tests in 8.499s

PASSED (successes=2)
4

1 に答える 1

1

あなたの問題があなたのスレッドに関係していると思います。Twistedはスレッドセーフではありません。スレッドとのインターフェースが必要な場合は、、、を使用してリアクターに処理させる必要がdeferToThreadありcallInThreadますcallFromThread。Twistedでスレッドセーフにする方法については、こちらをご覧ください。

于 2012-04-15T18:09:00.810 に答える