1

私たちの機器で生成された膨大な量の .CSV ファイルをソートしようとしていますが、ソート部分で在庫がありました。すべてのファイルは 30 以上の列で構成され、無限の数の行を持つことができます。実装する必要があるのは、複数の行の複数の列で同時に発生するイベントをチェックする方法です。たとえば、結果のいずれかを確認する必要があります。

  • 列「Test_Res_1」の値は、15 個の連続した睾丸に対して 12 未満でした
  • 列「Test_Res_2」の値は、10 個の連続した睾丸に対して 5 未満でした
  • 列「Test_Div」の値は、連続する 20 個の精巣で 15 未満でした
  • 列「Test_time」の値は、10 個の連続した睾丸に対して 60 未満でした
  • .........いくつかの連続したテストのためのいくつかの他の条件....

次に、1 つまたは複数の条件が満たされた場合、そのファイルの名前を .txt ファイルに書き込みます。このフォーラムのユーザーから提案されたコードを実装しましたが、スクリプトは正常に動作します。しかし、別の条件をチェックするたびにチェックを実行するブロックをコピーするだけです。そのコードを実装し、現在持っている巨大なスクリプトを削減するためのより良い方法があると確信しています。

ファイルの例を次に示します。 ここに画像の説明を入力

そのフォーラムで見つけたいくつかの提案を試しましたが、どれもうまくいきませんでした。それらのいくつかは 1 つの条件で動作しましたが、前述のようにいくつかの条件を確認する必要があります。条件が満たされた場合にファイルを開いて.txtに保存する方法は知っていますが、複数の列と行で複数の条件を確認する方法がわかりません。1 行をチェックするのは簡単ですが、複数の行をチェックするのは大変です。

import os, os.path, zipfile, csv, datetime
import smtplib, os
f = open("test.txt", "w")
flagtotal=[]
path="datafiles/"  # insert the path to the directory of interest
dirList=os.listdir(path)
for filename in dirList:
    if filename.endswith((".csv")):       
        file=os.path.splitext(filename)
        reader = csv.reader(open(filename))
        # I GOT STOCK HERE!!!! Although the code seems to work just fine. I create a completely  new instance for reader every time I want to add new condition. reader.next() # skip header row
    GROUP_SIZE = 5
    THRESHOLD = 0.5
    cond_deque = deque(maxlen=GROUP_SIZE) # *maxlen* requires Python version 2.6+        
    linenum = 0
    while len(cond_deque) < GROUP_SIZE-1:
        try:
            row = reader.next()
            linenum += 1
            col0, col1, col4, col5, col6, col23, col24, col25 = (
                float(row[i]) for i in (0, 1, 4, 5, 6, 23, 24, 25))
            cond_deque.append(col1 < THRESHOLD)
        except StopIteration:
            print 'less that {} rows of data in file'.format(GROUP_SIZE)
            break
    # then process any remaining lines
    for row in reader:
        col0, col1, col4, col5, col6, col23, col24, col25 = (
            float(row[i]) for i in (0, 1, 4, 5, 6, 23, 24, 25))
        linenum += 1
        cond_deque.append(col1 < THRESHOLD)
        if cond_deque.count(True) == GROUP_SIZE:
            str1 = 'Condition 1 in cycles {}-{} had {} consecutive cycles  < {}'.format(
                linenum-GROUP_SIZE+1, linenum, GROUP_SIZE, THRESHOLD)
            #print str1
            flag.append(str1)
            break  # stop looking

    #checking for the second condition
    reader = csv.reader(open('processed_data/'+filename))
    reader.next()        
    GROUP_SIZE = 2
    THRESHOLD = 20
    cond_deque = deque(maxlen=GROUP_SIZE) # *maxlen* requires Python version 2.6+        
    linenum = 0
    while len(cond_deque) < GROUP_SIZE-1:
        try:
            row = reader.next()
            linenum += 1
            col0, col1, col4, col5, col6, col23, col24, col25 = (
                float(row[i]) for i in (0, 1, 4, 5, 6, 23, 24, 25))
            cond_deque.append(col1 < THRESHOLD)
        except StopIteration:
            #print 'less that {} rows of data in file'.format(GROUP_SIZE)
            break
    # then process any remaining lines
    for row in reader:
        col0, col1, col4, col5, col6, col23, col24, col25 = (
            float(row[i]) for i in (0, 1, 4, 5, 6, 23, 24, 25))
        linenum += 1
        cond_deque.append(col5 < THRESHOLD/60)
        if cond_deque.count(True) == GROUP_SIZE:
            str1 = 'Condition 2 {}-{} had {} consecutive cycles  < {} minutes'.format(
                linenum-GROUP_SIZE+1, linenum, GROUP_SIZE, THRESHOLD)
            #print str1
            flag.append(str1)
            break  # stop looking
today = datetime.date.today()
datestring='Date of testing: '+today.strftime('%m/%d/%Y')
if len(flagtotal)>0:
    flagtotal.insert(0,datestring)
    flagtotal.insert(1,'The following files met the criteria.\n--------------------------------------------')
    f.write("\n".join(map(lambda x: str(x), flagtotal)))
f.close()
4

2 に答える 2

1

以下は、あなたが望むことを行う方法を示していると思います。collections.dequeクラスの使用に基づいています。これは基本的に、他の質問に対する私の回答のロジックの一般化されたバージョンです。これは、基準からのすべてのデータと関連する処理をアプリケーション中心のクラスにカプセル化することによって行われました。

その結果、かなりの量のコードが必要になりますが、試みていたアプローチよりもコンパクトで高速になる可能性があります。各ファイルを 1 回だけ読み取り、その 1 回のパスで可変数の条件のいずれかが満たされるかどうかをチェックします。

import csv
from collections import deque
import datetime
from glob import iglob
import os

class Criterion(object):
    """ represents one of the criteria to be checked.
        in_a_row is the number of consecutive rows in which the expression,
        given as a string, evaluated to True """
    def __init__(self, in_a_row, expression):
        self.in_a_row = in_a_row
        self.expression = expression
        self.bytecode = compile(expression, '<string>', 'eval')
        self.deque = deque(maxlen=in_a_row)
    def eval_and_check(self, local_vars):
        """ evaluate expression in context of local variables, append result
            to deque, and return whether 'in_a_row' criterion was satisfied """
        self.deque.append(eval(self.bytecode, globals(), local_vars))
        return self.deque.count(True) == self.in_a_row
    def reset(self):
        self.deque.clear()
    def format_match(self, filename, linenum):
        return 'lines {}-{} in {} had {} consecutive rows with "{}"'.format(
            linenum-self.in_a_row+1, linenum, filename, self.in_a_row,
            self.expression)

criteria = [Criterion(5, 'Test_Res_2 < 40'),
            Criterion(3, '13 <= Test_Res_4 <= 15'), ]
flagtotal = []
datapath = "datafiles"  # directory path to location of csv files

for filename in iglob(os.path.join(datapath, '*.csv')):
    with open(filename) as csvfile:
        reader = csv.reader(csvfile, skipinitialspace=True)
        reader.next() # skip over initial fieldnames row
        for criterion in criteria:  # initialize all before processing file
            criterion.reset()
        condition_satisfied = False
        for linenum, row in enumerate(reader, start=1):
            # define local vars for use in criterion expression evaluation
            (Test_num, Test_Res_1, Test_Res_2, Test_Res_3, Test_Res_4, 
             Test_div, Test_time) = [int(row[0])] + map(float, row[1:])
            for criterion in criteria:
                if criterion.eval_and_check(locals()):
                    #print criterion.format_match(filename, linenum)
                    flagtotal.append(os.path.basename(filename))
                    condition_satisfied = True
                    break  # quit criterion checking for this row
            if condition_satisfied:
                break  # quit processing rows of this csv file

with open('test.txt', 'w') as f:
    f.write('Date of testing: {}\n'.format(
            datetime.date.today().strftime('%m/%d/%Y')) +
            'The following files met the criteria:\n'
            '-------------------------------------\n')
    if flagtotal:
        print('\n'.join(flagtotal))
        f.write('\n'.join(flagtotal) + '\n')
    else:
        print('no files met the criteria')
        f.write('no files met the criteria\n')
于 2013-07-16T10:05:34.623 に答える
0

モジュールはわかりませんがcsv、列の辞書を取得できると仮定しましょう。次に、これを実行して、1 つの列で連続したエントリを見つけることができます。

import itertools

# in column "Test_Res_1" had values less than 12 for for 15 consecutive tests
col = reader["Test_Res_1"] # get the column as a list

# Find a count of the consecutive values < 12
consec_lt_12 = [len(list(cnt)) for val,cnt in itertools.groupby(col, lambda x: x < 12) if val]

# Check if the maximum is >= 15
if (max(consec_lt_12) >= 15):
   # ok! found it

それが機能する場合は、必要な列と値に対してそれを繰り返し、好きなように連鎖させます(たとえば、列Aと列B、または列Aまたは列Bで値を見つける必要があります?等。)

于 2013-07-15T15:05:48.353 に答える