0

私はpythonが初めてです。

私は試していHbase thrift client using thriftます。の最新バージョンで動作するように変更しただけですthriftが、コードを実行すると終了するだけで、スレッドは開始されません。

これがコードです。

import json, traceback, sys, datetime, time, logging, threading, random
import logging.handlers

import thrift
sys.path.append('gen-py')


from thrift.transport.TSocket import TSocket
from thrift.transport.TTransport import TBufferedTransport
from thrift.protocol import TBinaryProtocol
from hbase import THBaseService



gWritenItems = 0
gStartT = 0
gEndT = 0

recordsPerBatch = 300 #reports per client per day
columns = 3

#config
concurrent = 10
records = 60000#6000000 #6 million
bytesPerRecord = 1024



mylock = threading.RLock()
class writeThread(threading.Thread):
    def __init__(self, threadname, RecordsThreadwillwrite):
        threading.Thread.__init__(self, name = threadname)
        bytesPerColumn = int(bytesPerRecord/columns) - 11 #suppose 3 columns

        self.columnvalue = "value_" + "x"*bytesPerColumn + "_endv"
        self.tbwBatch = int (RecordsThreadwillwrite / recordsPerBatch)

        self.transport = TBufferedTransport(TSocket('pnq-adongrevm1', 5151), 40960)
        self.transport.open()
        protocol = TBinaryProtocol.TBinaryProtocol(self.transport)

        self.client = THBaseService.Client(protocol)
        self.table = "example"

    def run(self):
        print "+%s start" % (self.getName())
        global gEndT
        global gWritenItems           

        threadWritenItem = 0
        for loopidx in xrange(0, self.tbwBatch):            
            self.write_hbase() #write                                           
            threadWritenItem += recordsPerBatch   

        mylock.acquire()
        gEndT = time.time()  
        gWritenItems += threadWritenItem
        print "%s done, %s seconds past, %d reocrds saved" % (self.getName(), gEndT-gStartT, gWritenItems)
        mylock.release()
        self.transport.close()                   

    def write_hbase(self): #write 50 rowkyes, and  3 column families in each rowkey
        print self.getName(), "Start write"
        batchmutations = []
        for i in xrange(0, recordsPerBatch): # write to db, 300 items together
            mutations = []
            rowkey = "RK_%s_%s" % (random.random(), time.time())       
            for ii in xrange(0, columns):
                mutations.append(THBaseService.TPut(row=rowkey, columnValues=[TColumnValue(family="f1", qualifier="%s"%ii, value=self.columnvalue)]))
        self.client.putMultiple(self.table,mutations)        



itemsPerThread = int(records / concurrent)
for threadid in xrange(0, concurrent):    
    gStartT = time.time()
    t = writeThread("Thread_%s" % threadid, itemsPerThread)
    t.start();
print "%d thread created, each thread will write %d records" % (concurrent, itemsPerThread)

メッセージが届くだけ10 thread created, each thread will write 6000 records

4

1 に答える 1

2

うん、これは、スレッドがジョブを完了するのを待っていないためです。そのため、メイン スレッドはそのまま終了します。これを試して:

itemsPerThread = int(records / concurrent)
threads = []
for threadid in xrange(0, concurrent):    
    gStartT = time.time()
    t = writeThread("Thread_%s" % threadid, itemsPerThread)
    t.start();
    threads.append(t)

# wait until all finish the job
for t in threads:
    t.join()

編集ハ、スレッドをデーモンとしてマークしなかったので、私はここにいるとは思いません。参加しなくても動作するはずです。しかし、このコードを見てください:

class CustomThread(threading.Thread):
    def run(self):
        print "test"

for x in xrange(0, 10):
    t = CustomThread()
    t.start()

print "test"何があっても必ず一線に届きます。print "+%s start" % (self.getName())したがって、コードでは、何があっても常に到達する必要があります。うまくいきませんか?:)

そうでない場合は、次の 2 つの可能性しかありません。

  1. メソッドにブロック操作または例外があります__init__。しかし、それでは最終版に到達しません。
  2. concurrentvariable には0何らかの理由があります (これは最終的な出力と一致しません)。
于 2013-03-29T14:20:58.117 に答える