次のコードで dbus サーバーを作成し、server1.py という名前を付けました。
#!/usr/bin/python2.7
import dbus.service
import dbus.glib
import glib
import gobject
from dbus.mainloop.glib import DBusGMainLoop
class APP_Server(dbus.service.Object):
def __init__(self):
bus = dbus.SessionBus(private = True, mainloop = DBusGMainLoop())
bus_name = dbus.service.BusName('de.test.app1', bus)
dbus.service.Object.__init__(self, bus_name, '/de/test/app1_obj_path')
@dbus.service.method("test.app1.interface",)
def is_ready(self):
return True
def publish_dbus():
loop = glib.MainLoop()
APP_Server()
loop.run()
if __name__ == '__main__':
gobject.threads_init()
dbus.glib.init_threads()
publish_dbus()
そして、次のコードでserver1.pyのdbusサービスにアクセスしたいと思います。これは、dbusサーバーとしても機能するserver2.pyという名前です。
#!/usr/bin/python2.7
import dbus.service
import dbus.glib
import glib
import dbus.mainloop
import gobject
from dbus.mainloop.glib import DBusGMainLoop
from threading import Thread
from time import sleep
class APP_Server(dbus.service.Object):
def __init__(self):
bus = dbus.SessionBus(private = True, mainloop = DBusGMainLoop())
bus_name = dbus.service.BusName('de.test.app3', bus)
dbus.service.Object.__init__(self, bus_name, '/de/test/app3_obj_path')
@dbus.service.method("test.app3.interface",)
def is_ready(self):
return True
def call_dbus():
bus_name = 'de.test.app1'
obj_path = '/de/test/app1_obj_path'
interface_name = 'test.app1.interface'
count = 1
while count < 1000:
proxy_bus = dbus.SessionBus(private = True)
obj = None
try:
obj = proxy_bus.get_object(bus_name, obj_path)
except:
sleep(1)
obj = proxy_bus.get_object(bus_name, obj_path)
ready = obj.get_dbus_method('is_ready', interface_name)
ready(pid_, bin_path)
count += 1
print count
def publish_dbus():
loop = glib.MainLoop()
APP_Server()
loop.run()
if __name__ == '__main__':
gobject.threads_init()
dbus.glib.init_threads()
th1 = Thread(target = publish_dbus)
th1.start()
th2 = Thread(target = call_dbus)
th2.start()
sleep(10000000)
次に、server2.py を実行した後、アプリケーションはスレッド「call_dbus」ですべての dbus 呼び出しを終了せずに終了します。
そして、次のコードを試してみると、server2.py のコードのみが次のように変更されました。
から:
proxy_bus = dbus.SessionBus(private = True)
に:
proxy_bus = dbus.SessionBus(private = True, mainloop = dbus.mainloop.NULL_MAIN_LOOP)
これで、d-bus デバッガーとして使用できるツール「d-feet」を使用してスレッド「callbus」が終了した後、dbus サーバーの準備ができているか、または dbus 接続が確立されているかどうかを確認するために、多くの接続が確立されます。
誰かがそれを正常に機能させるための提案をすることができれば??