Python には JavaScript に似た機能がありますsetInterval()
か?
私はを頂きたい:
def set_interval(func, interval):
...
func
それは毎回interval
ユニットを呼び出します。
Python には JavaScript に似た機能がありますsetInterval()
か?
私はを頂きたい:
def set_interval(func, interval):
...
func
それは毎回interval
ユニットを呼び出します。
素敵でシンプルにしてください。
import threading
def setInterval(func,time):
e = threading.Event()
while not e.wait(time):
func()
def foo():
print "hello"
# using
setInterval(foo,5)
# output:
hello
hello
.
.
.
EDIT : このコードはノンブロッキングです
import threading
class ThreadJob(threading.Thread):
def __init__(self,callback,event,interval):
'''runs the callback function after interval seconds
:param callback: callback function to invoke
:param event: external event for controlling the update operation
:param interval: time in seconds after which are required to fire the callback
:type callback: function
:type interval: int
'''
self.callback = callback
self.event = event
self.interval = interval
super(ThreadJob,self).__init__()
def run(self):
while not self.event.wait(self.interval):
self.callback()
event = threading.Event()
def foo():
print "hello"
k = ThreadJob(foo,event,2)
k.start()
print "It is non-blocking"
Nailxxの回答を少し変更すると、回答が得られました。
from threading import Timer
def hello():
print "hello, world"
Timer(30.0, hello).start()
Timer(30.0, hello).start() # after 30 seconds, "hello, world" will be printed
このsched
モジュールは、一般的な Python コードにこれらの機能を提供します。ただし、そのドキュメントが示唆するように、コードがマルチスレッド化されている場合は、threading.Timer
代わりにクラスを使用する方が理にかなっています。
これがあなたが求めているものだと思います:
#timertest.py
import sched, time
def dostuff():
print "stuff is being done!"
s.enter(3, 1, dostuff, ())
s = sched.scheduler(time.time, time.sleep)
s.enter(3, 1, dostuff, ())
s.run()
繰り返しメソッドの最後に別のエントリをスケジューラに追加すると、そのまま続行されます。
上記の解決策では、プログラムがシャットダウンされる状況が発生した場合、正常にシャットダウンされるという保証はありません。常にソフトキルを介してプログラムをシャットダウンすることをお勧めしますが、それらのほとんどには停止する機能もありませんでした。素敵な記事を見つけましたこれらの問題の両方を解決する Sankalp によって書かれた媒体 ( Python で定期的なタスクを実行する) については、添付のリンクを参照して、より深い洞察を得てください。以下のサンプルでは、signal という名前のライブラリを使用して、キルがソフト キルかハード キルかを追跡します。
import threading, time, signal
from datetime import timedelta
WAIT_TIME_SECONDS = 1
class ProgramKilled(Exception):
pass
def foo():
print time.ctime()
def signal_handler(signum, frame):
raise ProgramKilled
class Job(threading.Thread):
def __init__(self, interval, execute, *args, **kwargs):
threading.Thread.__init__(self)
self.daemon = False
self.stopped = threading.Event()
self.interval = interval
self.execute = execute
self.args = args
self.kwargs = kwargs
def stop(self):
self.stopped.set()
self.join()
def run(self):
while not self.stopped.wait(self.interval.total_seconds()):
self.execute(*self.args, **self.kwargs)
if __name__ == "__main__":
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
job = Job(interval=timedelta(seconds=WAIT_TIME_SECONDS), execute=foo)
job.start()
while True:
try:
time.sleep(1)
except ProgramKilled:
print "Program killed: running cleanup code"
job.stop()
break
#output
#Tue Oct 16 17:47:51 2018
#Tue Oct 16 17:47:52 2018
#Tue Oct 16 17:47:53 2018
#^CProgram killed: running cleanup code
間隔をキャンセルできるようにする必要があったため、上記の方法ではうまくいきませんでした。関数をクラスに変えて、次のように思いつきました。
class setInterval():
def __init__(self, func, sec):
def func_wrapper():
self.t = threading.Timer(sec, func_wrapper)
self.t.start()
func()
self.t = threading.Timer(sec, func_wrapper)
self.t.start()
def cancel(self):
self.t.cancel()
最近、私はあなたと同じ問題を抱えています。そして、私はこれらの解決策を見つけます:
1. ライブラリを使用できます: threading.Time (これには上記の紹介があります)
2.ライブラリを使用できます:sched(これにも上記の紹介があります)
3. ライブラリを使用できます: Advanced Python Scheduler (推奨)
setInterval は複数のスレッドで実行する必要があり、ループを実行しているときにタスクをフリーズしないでください。
マルチスレッド機能をサポートするRUNTIMEパッケージは次のとおりです。
短くて簡単です。関数を直接入力する場合、Python にはラムダが必要ですが、ラムダはコマンド ブロックをサポートしていないため、setInterval に入れる前に関数の内容を定義する必要があります。
### DEMO PYTHON MULTITHREAD ASYNCHRONOUS LOOP ###
import time;
import threading;
import random;
def delay(ms):time.sleep(ms/1000); # Controil while speed
def setTimeout(R,delayMS):
t=threading.Timer(delayMS/1000,R)
t.start();
return t;
def delayF(R,delayMS):
t=threading.Timer(delayMS/1000,R)
t.start();
return t;
class THREAD:
def __init__(this):
this.R_onRun=None;
this.thread=None;
def run(this):
this.thread=threading.Thread(target=this.R_onRun);
this.thread.start();
def isRun(this): return this.thread.isAlive();
class setInterval :
def __init__(this,R_onRun,msInterval) :
this.ms=msInterval;
this.R_onRun=R_onRun;
this.kStop=False;
this.thread=THREAD();
this.thread.R_onRun=this.Clock;
this.thread.run();
def Clock(this) :
while not this.kStop :
this.R_onRun();
delay(this.ms);
def pause(this) :
this.kStop=True;
def stop(this) :
this.kStop=True;
def resume(this) :
if (this.kStop) :
this.kStop=False;
this.thread.run();
def clearInterval(Timer): Timer.stop();
# EXAMPLE
def p():print(random.random());
tm=setInterval(p,20);
tm2=setInterval(lambda:print("AAAAA"),20);
delayF(tm.pause,1000);
delayF(tm.resume,2000);
delayF(lambda:clearInterval(tm),3000);
ファイル .py に保存して実行します。乱数と文字列 "AAAAA" の両方が表示されます。印刷番号スレッドは、1 秒後に印刷を一時停止し、再び 1 秒間印刷を再開してから停止しますが、印刷文字列は印刷テキストを破損しないように維持します。
グラフィック アニメーションにOpenCV を使用し、アニメーション速度を上げるために setInterval を使用する場合、waitKey を適用するメイン スレッドが 1 つ必要です。
def p:... # Your drawing task
setInterval(p,1); # Subthread1 running draw
setInterval(p,1); # Subthread2 running draw
setInterval(p,1); # Subthread3 running draw
while True: cv2.waitKey(10); # Main thread which waitKey have effect
Python では動作が異なりsleep()
ます。現在のスレッドをブロックするか、新しいスレッドを開始する必要があります。http://docs.python.org/library/threading.htmlを参照してください
from threading import Timer
def hello():
print "hello, world"
t = Timer(30.0, hello)
t.start() # after 30 seconds, "hello, world" will be printed