0

次のコードで dbus サーバーを作成し、server1.py という名前を付けました。

#!/usr/bin/python2.7
import dbus.service
import dbus.glib
import glib
import gobject
from dbus.mainloop.glib import DBusGMainLoop


class APP_Server(dbus.service.Object):
    def __init__(self):
        bus = dbus.SessionBus(private = True, mainloop = DBusGMainLoop())
        bus_name = dbus.service.BusName('de.test.app1', bus)
        dbus.service.Object.__init__(self, bus_name, '/de/test/app1_obj_path')

    @dbus.service.method("test.app1.interface",)
    def is_ready(self):           
        return True

def publish_dbus():
    loop = glib.MainLoop()
    APP_Server()
    loop.run()

if __name__ == '__main__': 
    gobject.threads_init()
    dbus.glib.init_threads()
    publish_dbus()

そして、次のコードでserver1.pyのdbusサービスにアクセスしたいと思います。これは、dbusサーバーとしても機能するserver2.pyという名前です。

#!/usr/bin/python2.7
import dbus.service
import dbus.glib
import glib
import dbus.mainloop
import gobject
from dbus.mainloop.glib import DBusGMainLoop
from threading import Thread
from time import sleep


class APP_Server(dbus.service.Object):
    def __init__(self):
        bus = dbus.SessionBus(private = True, mainloop = DBusGMainLoop())
        bus_name = dbus.service.BusName('de.test.app3', bus)
        dbus.service.Object.__init__(self, bus_name, '/de/test/app3_obj_path')

    @dbus.service.method("test.app3.interface",)
    def is_ready(self):           
        return True

def call_dbus():
    bus_name = 'de.test.app1'
    obj_path = '/de/test/app1_obj_path'
    interface_name = 'test.app1.interface'
    count = 1
    while count < 1000:
        proxy_bus = dbus.SessionBus(private = True)
        obj = None
        try:
            obj = proxy_bus.get_object(bus_name, obj_path)
        except:
            sleep(1)
            obj = proxy_bus.get_object(bus_name, obj_path)
        ready = obj.get_dbus_method('is_ready', interface_name)
        ready(pid_, bin_path)
        count += 1
        print count 

def publish_dbus():
    loop = glib.MainLoop()
    APP_Server()
    loop.run()

if __name__ == '__main__':
    gobject.threads_init()
    dbus.glib.init_threads()
    th1 = Thread(target = publish_dbus)
    th1.start()
    th2 = Thread(target = call_dbus)
    th2.start()
    sleep(10000000)

次に、server2.py を実行した後、アプリケーションはスレッド「call_dbus」ですべての dbus 呼び出しを終了せずに終了します。

そして、次のコードを試してみると、server2.py のコードのみが次のように変更されました。

から:

 proxy_bus = dbus.SessionBus(private = True)

に:

  proxy_bus = dbus.SessionBus(private = True, mainloop = dbus.mainloop.NULL_MAIN_LOOP)

これで、d-bus デバッガーとして使用できるツール「d-​​feet」を使用してスレッド「callbus」が終了した後、dbus サーバーの準備ができているか、または dbus 接続が確立されているかどうかを確認するために、多くの接続が確立されます。

誰かがそれを正常に機能させるための提案をすることができれば??

4

1 に答える 1