1

ストレージ方法としてカーボンとセレスを使用して、グラファイトの監視を行っています。不良データの修正に問題があります。(さまざまな問題により)ファイルが重複してしまったようです。つまり、Carbon / Ceres はデータを timestamp@interval.slice として保存するため、時間範囲が重複する 2 つ以上のファイルを持つことができます。

オーバーラップには次の 2 種類があります。

File A:  +------------+        orig file
File B:      +-----+           subset
File C:          +---------+   overlap

これは、利用可能な既存のツール (ceres-maintenance デフラグおよびロールアップ) がこれらの重複に対処できないため、問題を引き起こしています。代わりに、ディレクトリをスキップして先に進みます。これは明らかに問題です。

4

1 に答える 1

0

次のように、この問題を修正するスクリプトを作成しました。

  1. サブセットの場合は、サブセット ファイルを削除するだけです。

  2. オーバーラップの場合、次のファイルが開始するポイントで orig ファイルに対してファイル システム「truncate」を使用します。オーバーラップ ファイルの先頭を切り取って適切に名前を変更することは可能ですが、これには危険が伴うことをお勧めします。

これは、次の 2 つの方法で実行できることがわかりました。

  1. ディレクトリをたどり、ファイルを繰り返し処理し、修正しながらファイルのサブセットを見つけて削除します。

  2. 先に進む前に、dirs をウォークし、dirs 内のすべての問題を修正します。dir ウォークは非常に時間がかかるため、これははるかに高速なアプローチです。

コード:

#!/usr/bin/env python2.6
################################################################################

import io
import os
import time
import sys
import string
import logging
import unittest
import datetime
import random
import zmq
import json
import socket
import traceback
import signal
import select
import simplejson
import cPickle as pickle
import re
import shutil
import collections
from pymongo import Connection
from optparse import OptionParser
from pprint import pprint, pformat

################################################################################

class SliceFile(object):
    def __init__(self, fname):
        self.name       = fname
        basename        = fname.split('/')[-1]
        fnArray         = basename.split('@')
        self.timeStart  = int(fnArray[0])
        self.freq       = int(fnArray[1].split('.')[0])
        self.size       = None
        self.numPoints  = None
        self.timeEnd    = None
        self.deleted    = False

    def __repr__(self):
        out = "Name: %s, tstart=%s tEnd=%s, freq=%s, size=%s, npoints=%s." % (
            self.name, self.timeStart, self.timeEnd, self.freq, self.size, self.numPoints)
        return out

    def setVars(self):
        self.size       = os.path.getsize(self.name)
        self.numPoints  = int(self.size / 8)
        self.timeEnd    = self.timeStart + (self.numPoints * self.freq)

################################################################################

class CeresOverlapFixup(object):

    def __del__(self):
        import datetime
        self.writeLog("Ending at %s" % (str(datetime.datetime.today())))
        self.LOGFILE.flush()
        self.LOGFILE.close()

    def __init__(self):
        self.verbose            = False
        self.debug              = False
        self.LOGFILE            = open("ceresOverlapFixup.log", "a")
        self.badFilesList       = set()
        self.truncated          = 0
        self.subsets            = 0
        self.dirsExamined       = 0            
        self.lastStatusTime     = 0

    def getOptionParser(self):
        return OptionParser()

    def getOptions(self):
        parser = self.getOptionParser()
        parser.add_option("-d", "--debug",      action="store_true",                 dest="debug",   default=False, help="debug mode for this program, writes debug messages to logfile." )
        parser.add_option("-v", "--verbose",    action="store_true",                 dest="verbose", default=False, help="verbose mode for this program, prints a lot to stdout." )
        parser.add_option("-b", "--basedir",    action="store",      type="string",  dest="basedir", default=None,  help="base directory location to start converting." )
        (options, args)     = parser.parse_args()
        self.debug          = options.debug
        self.verbose        = options.verbose
        self.basedir        = options.basedir
        assert self.basedir, "must provide base directory."

    # Examples:
    # ./updateOperations/1346805360@60.slice
    # ./updateOperations/1349556660@60.slice
    # ./updateOperations/1346798040@60.slice

    def getFileData(self, inFilename):
        ret = SliceFile(inFilename)
        ret.setVars()
        return ret

    def removeFile(self, inFilename):
        os.remove(inFilename)
        #self.writeLog("removing file: %s" % (inFilename))
        self.subsets += 1

    def truncateFile(self, fname, newSize):
        if self.verbose:
            self.writeLog("Truncating file, name=%s, newsize=%s" % (pformat(fname), pformat(newSize)))
        IFD = None
        try:
            IFD = os.open(fname, os.O_RDWR|os.O_CREAT)
            os.ftruncate(IFD, newSize)
            os.close(IFD)
            self.truncated += 1
        except:
            self.writeLog("Exception during truncate: %s" % (traceback.format_exc()))
        try:
            os.close(IFD)
        except:
            pass
        return

    def printStatus(self):
        now = self.getNowTime()
        if ((now - self.lastStatusTime) > 10):
            self.writeLog("Status: time=%d, Walked %s dirs, subsetFilesRemoved=%s, truncated %s files." % (now, self.dirsExamined, self.subsets, self.truncated))
            self.lastStatusTime = now

    def fixupThisDir(self, inPath, inFiles):

        # self.writeLog("Fixing files in dir: %s" % (inPath))
        if not '.ceres-node' in inFiles:
            # self.writeLog("--> Not a slice directory, skipping.")
            return

        self.dirsExamined += 1            

        sortedFiles = sorted(inFiles)
        sortedFiles = [x for x in sortedFiles if ((x != '.ceres-node') and (x.count('@') > 0)) ]
        lastFile    = None
        fileObjList = []
        for thisFile in sortedFiles:
            wholeFilename = os.path.join(inPath, thisFile)
            try:
                curFile = self.getFileData(wholeFilename)
                fileObjList.append(curFile)
            except:
                self.badFilesList.add(wholeFilename)
                self.writeLog("ERROR: file %s, %s" % (wholeFilename, traceback.format_exc()))

        # name is timeStart, really.
        fileObjList = sorted(fileObjList, key=lambda thisObj: thisObj.name)

        while fileObjList:

            self.printStatus()

            changes = False
            firstFile = fileObjList[0]
            removedFiles = []
            for curFile in fileObjList[1:]:
                if (curFile.timeEnd <= firstFile.timeEnd):
                    # have subset file. elim.
                    self.removeFile(curFile.name)
                    removedFiles.append(curFile.name)
                    self.subsets += 1
                    changes = True
                    if self.verbose:
                        self.writeLog("Subset file situation.  First=%s, overlap=%s" % (firstFile, curFile))
            fileObjList = [x for x in fileObjList if x.name not in removedFiles]
            if (len(fileObjList) < 2):
                break
            secondFile = fileObjList[1]

            # LT is right.  FirstFile's timeEnd is always the first open time after first is done.
            # so, first starts@100, len=2, end=102, positions used=100,101. second start@102 == OK.
            if (secondFile.timeStart < firstFile.timeEnd):
                # truncate first file.
                # file_A (last):    +---------+
                # file_B (curr):         +----------+ 
                # solve by truncating previous file at startpoint of current file.
                newLenFile_A_seconds = int(secondFile.timeStart - firstFile.timeStart)
                newFile_A_datapoints = int(newLenFile_A_seconds / firstFile.freq)
                newFile_A_bytes      = int(newFile_A_datapoints) * 8
                if (not newFile_A_bytes):
                    fileObjList = fileObjList[1:]
                    continue
                assert newFile_A_bytes, "Must have size.  newLenFile_A_seconds=%s, newFile_A_datapoints=%s, newFile_A_bytes=%s." % (newLenFile_A_seconds, newFile_A_datapoints, newFile_A_bytes)
                self.truncateFile(firstFile.name, newFile_A_bytes)
                if self.verbose:
                    self.writeLog("Truncate situation.  First=%s, overlap=%s" % (firstFile, secondFile))
                self.truncated += 1
                fileObjList = fileObjList[1:]
                changes = True

            if not changes:
                fileObjList = fileObjList[1:]


    def getNowTime(self):
        return time.time()


    def walkDirStructure(self):

        startTime           = self.getNowTime()
        self.lastStatusTime = startTime
        updateStatsDict     = {}
        self.okayFiles      = 0
        emptyFiles          = 0 

        for (thisPath, theseDirs, theseFiles) in os.walk(self.basedir):
            self.printStatus()
            self.fixupThisDir(thisPath, theseFiles)
            self.dirsExamined += 1

        endTime = time.time()
        # time.sleep(11)
        self.printStatus()
        self.writeLog( "now = %s, started at %s, elapsed time = %s seconds." % (startTime, endTime, endTime - startTime))
        self.writeLog( "Done.")


    def writeLog(self, instring):
        print instring
        print >> self.LOGFILE, instring
        self.LOGFILE.flush()

    def main(self):
        self.getOptions()
        self.walkDirStructure()
于 2012-11-12T18:53:58.940 に答える