2

わかりました。このクラスは、Googleコード検索でグーグルで検索した他のスピナークラスに基づいて作成しました。

意図したとおりに機能していますが、KeyboardInterruptとSystemExitの例外を処理するためのより良い方法を探しています。より良いアプローチはありますか?

これが私のコードです:

#!/usr/bin/env python
import itertools
import sys
import threading

class Spinner(threading.Thread):
    '''Represent a random work indicator, handled in a separate thread'''

    # Spinner glyphs
    glyphs = ('|', '/', '-', '\\', '|', '/', '-')
    # Output string format
    output_format = '%-78s%-2s'
    # Message to output while spin
    spin_message = ''
    # Message to output when done
    done_message = ''
    # Time between spins
    spin_delay = 0.1

    def __init__(self, *args, **kwargs):
        '''Spinner constructor'''
        threading.Thread.__init__(self, *args, **kwargs)
        self.daemon = True
        self.__started = False
        self.__stopped = False
        self.__glyphs = itertools.cycle(iter(self.glyphs))

    def __call__(self, func, *args, **kwargs):
        '''Convenient way to run a routine with a spinner'''
        self.init()
        skipped = False

        try:
            return func(*args, **kwargs)
        except (KeyboardInterrupt, SystemExit):
            skipped = True
        finally:
            self.stop(skipped)

    def init(self):
        '''Shows a spinner'''
        self.__started = True
        self.start()

    def run(self):
        '''Spins the spinner while do some task'''
        while not self.__stopped:
            self.spin()

    def spin(self):
        '''Spins the spinner'''
        if not self.__started:
            raise NotStarted('You must call init() first before using spin()')

        if sys.stdin.isatty():
            sys.stdout.write('\r')
            sys.stdout.write(self.output_format % (self.spin_message,
                                                   self.__glyphs.next()))
            sys.stdout.flush()
            time.sleep(self.spin_delay)

    def stop(self, skipped=None):
        '''Stops the spinner'''
        if not self.__started:
            raise NotStarted('You must call init() first before using stop()')

        self.__stopped = True
        self.__started = False

        if sys.stdin.isatty() and not skipped:
            sys.stdout.write('\b%s%s\n' % ('\b' * len(self.done_message),
                                           self.done_message))
            sys.stdout.flush()

class NotStarted(Exception):
    '''Spinner not started exception'''
    pass

if __name__ == '__main__':
    import time

    # Normal example
    spinner1 = Spinner()
    spinner1.spin_message = 'Scanning...'
    spinner1.done_message = 'DONE'
    spinner1.init()
    skipped = False

    try:
        time.sleep(5)
    except (KeyboardInterrupt, SystemExit):
        skipped = True
    finally:
        spinner1.stop(skipped)

    # Callable example
    spinner2 = Spinner()
    spinner2.spin_message = 'Scanning...'
    spinner2.done_message = 'DONE'
    spinner2(time.sleep, 5)

前もって感謝します。

4

1 に答える 1

2

SystemExitによって発生するため、キャッチについて心配する必要はありませんsys.exit()。プログラムが終了する直前に、いくつかのリソースをクリーンアップするためにそれをキャッチすることをお勧めします。

キャッチするもう1つの方法KeyboardInterruptは、キャッチする信号ハンドラーを登録することSIGINTです。ただし、例を使用try..exceptする方が理にかなっているため、正しい方向に進んでいます。

いくつかのマイナーな提案:

  • おそらく、__call__メソッドの名前をに変更しstartて、ジョブを開始していることをより明確にします。
  • startコンストラクターではなく、メソッド内に新しいスレッドをアタッチすることで、Spinnerクラスを再利用可能にすることもできます。
  • また、ユーザーが現在のスピナージョブでCTRL-Cを押すとどうなるかを検討してください。次のジョブを開始できますか、それともアプリを終了する必要がありますか?
  • また、実行しようとしているタスクに関連付けるためspin_messageの最初の引数を作成することもできます。start

たとえば、誰かがSpinnerを使用する方法は次のとおりです。

dbproc = MyDatabaseProc()
spinner = Spinner()
spinner.done_message = 'OK'
try:
    spinner.start("Dropping the database", dbproc.drop, "mydb")
    spinner.start("Re-creating the database", dbproc.create, "mydb")
    spinner.start("Inserting data into tables", dbproc.populate)
    ...
except (KeyboardInterrupt, SystemExit):
    # stop the currently executing job
    spinner.stop()
    # do some cleanup if needed..
    dbproc.cleanup()
于 2011-03-06T00:44:15.883 に答える