0

pymqi を使用して IBM MQ にメッセージを送信しようとしているときに問題が発生しました。私が取り組んでいるプロジェクト

  • DB2 からの読み取り
  • のデータを準備しますWorkerProcess
  • WorkerProcessそのデータに基づいて何らかの決定を行い、MQ にメッセージを送信します

WorkerProcessMQ のメッセージをコミットしようとすると、次のエラーが表示されます。

Stacktrace:
    worker_process.py:
        self.mq.commit()
    mq_service.py:
        self.qmgr.commit()
    pymqi/__init__.py line 1689, in commit
        raise MQMIError (rv[0], rv[1])
pymqi.MQMIError: MQI Error. Comp: 2, Reason 2012: FAILED: MQRC_ENVIRONMENT_ERROR

コードとスタック トレースは手で入力されているため、タイプミスが含まれている可能性があります。

以下のコードは、私がやっていることの疑似コードです。

どんな助けやアドバイスも大歓迎です。

main.py:

c_processors = []
for i in range(num_of_proccessors):
    p = WorkerProcess()
    p.start()
    c_processors.append(p)

for p in c_processors:
    p.join()

worker_process.py

class WorkerProcess(Process):
    def __init__(self):
        Process.__init__(self)
        self.mq = MQService()

    def run(self):
        self.mq.send_message('test')
        self.mq.commit()
        self.mq.close_connection()

mq_service.py

class MQService():
    
    def __init__(self):
        self.connect()
        self.pmd = pymqi.MD()
        self.pmd.Format = pymqi.CMQC.MQFMT_STRING
        self.pmo = pymqi.PMO(Options=pymqi.CMQC.MQPMO_FAIL_IF_QUIESCING)
        self.pmo.Options |= pymqi.CMQC.MQGMO_SYNCPOINT

    def connect(self):
        self._connect_to_qmgr(manager,chanel,host,port,user,password) #these arguments are retrieved from config
        self._q = pymqi.Queue(self.qmgr, queue_name) 

    def _connect_to_qmgr(self,manager,chanel,host,port,user,password):
        self.qmgr = pymqi.QueueManager(None)
        _cd = pymqi.CD()
        _cd.ChannelName = channel.encode('utf-8')
        _cd.ConnectionName = f"{host} ({port})".encode('utf-8')
        _cd.ChannelType = pymqi.CMQC.MQCHT_CLNTCONN
        _cd.TransportType = pymqi.CMQC.MQXPT_TCP
        _connect_options = pymqi.CMQC.MQCNO_HANDLE_SHARE_BLOCK

        _qmgr.connect_with_options(manager, cd=_cd, opts=_connect_options, user=user, password=password)

    def send_message(self, message):
        self._q.put(message, self.pmd, self.pmo)

    def commit(self):
        self.qmgr.commit()
    
    def rollback(self):
        self.qmgr.backout()

    def close_connection(self):
        self.qmgr.disconnect()

編集: 追加情報:

  • IBM MQ クライアント バージョン 9.1.0.1 を実行しています。

  • AMQERR0*.LOGファイルにエラーはありません。

  • LD_LIBRARY_PATH が設定されている

  • このエラーは、コードのリファクタリング中に表示されました。

以下は、機能しているコードです(リファクタリング前)

関数シグネチャの一部の引数は、簡潔さと読みやすさのために(args ) に置き換えられます*

main.py:


def connect_to_mq():
    return MQService(args*)  # these arguments are read from Config file


def process_chunk(args*): 
    _mq = connect_to_mq()
    _mq.send_message('test')
    _mq.commit()
    _mq.close_connection()

c_processors = []
for i in range(num_of_proccessors):
    p = Process(target=process_chunk, args=(args*))
    p.start()
    c_processors.append(p)

for p in c_processors:
    p.join()

mq_service.py

class MQService():
    
    def __init__(self, args*):
        self.pmd = pymqi.MD()
        self.pmd.Format = pymqi.CMQC.MQFMT_STRING
        self.pmo = pymqi.PMO(Options=pymqi.CMQC.MQPMO_FAIL_IF_QUIESCING)
        self.pmo.Options |= pymqi.CMQC.MQGMO_SYNCPOINT
        self.connect_to_qmgr(args*)
        self.connect_to_queue(args*)


    def _connect_to_qmgr(self,manager,chanel,host,port,user,password):
        self.qmgr = pymqi.connect(manager, 
                                  chanel, 
                                  "%s (%s)" % (host, port), 
                                  user=user, 
                                  password=password)
   
    def connect_to_queue(q_name):
        self._q = pymqi.Queue(self.qmgr, q_name)

 
    def send_message(self, message):
        self._q.put(message, self.pmd, self.pmo)

    def commit(self):
        self.qmgr.commit()
    
    def rollback(self):
        self.qmgr.backout()

    def close_connection(self):
        self.qmgr.disconnect()
4

0 に答える 0