1

HAR アーカイブを作成し、このアーカイブからデータを読み取る必要がある MR ジョブを実行する Oozie ワークフローを作成しました。1. アーカイブが作成されます。 2. ジョブが実行されると、マッパーは分散キャッシュ内のアーカイブを認識します。3.??? このアーカイブを読むにはどうすればよいですか? このアーカイブから行ごとにデータを読み取るための API は何ですか (私の har は複数の改行で区切られたテキスト ファイルのバッチです)。注意: DistirubtedCache に保存されている通常のファイル (HAR アーカイブではない) を操作すると、完全に機能します。HAR からデータを読み取ろうとしているときに問題が発生しました。

コード スニペットを次に示します。

    InputStream inputStream;
    String cachedDatafileName = System.getProperty(DIST_CACHE_FILE_NAME);
    LOG.info(String.format("Looking for[%s]=[%s] in DistributedCache",DIST_CACHE_FILE_NAME, cachedDatafileName));

    URI[] uris = DistributedCache.getCacheArchives(getContext().getConfiguration());
    URI uriToCachedDatafile = null;
    for(URI uri : uris){
        if(uri.toString().endsWith(cachedDatafileName)){
            uriToCachedDatafile = uri;
            break;
        }
    }
    if(uriToCachedDatafile == null){
        throw new RuntimeConfigurationException(String.format("Looking for[%s]=[%s] in DistributedCache failed. There is no such file",
                DIST_CACHE_FILE_NAME, cachedDatafileName));
    }

    Path pathToFile = new Path(uriToCachedDatafile);
    LOG.info(String.format("[%s] has been found. Uri is: [%s]. The path is:[%s]",cachedDatafileName, uriToCachedDatafile, pathToFile));

    FileSystem fileSystem =  pathToFile.getFileSystem(getContext().getConfiguration());
    HarFileSystem harFileSystem = new HarFileSystem(fileSystem);
    inputStream = harFileSystem.open(pathToFile); //NULL POINTER EXCEPTION IS HERE!
    return inputStream;
4

1 に答える 1

0
protected InputStream getInputStreamToDistCacheFile() throws IOException{
        InputStream inputStream;
        String cachedDatafileName = System.getProperty(DIST_CACHE_FILE_NAME);
        LOG.info(String.format("Looking for[%s]=[%s] in DistributedCache",DIST_CACHE_FILE_NAME, cachedDatafileName));

        URI[] uris = DistributedCache.getCacheArchives(getContext().getConfiguration());
        URI uriToCachedDatafile = null;
        for(URI uri : uris){
            if(uri.toString().endsWith(cachedDatafileName)){
                uriToCachedDatafile = uri;
                break;
            }
        }
        if(uriToCachedDatafile == null){
            throw new RuntimeConfigurationException(String.format("Looking for[%s]=[%s] in DistributedCache failed. There is no such file",
                    DIST_CACHE_FILE_NAME, cachedDatafileName));
        }

        //Path pathToFile = new Path(uriToCachedDatafile +"/stf/db_bts_stf.txt");
        Path pathToFile = new Path("har:///"+"home/ssa/devel/megalabs/kyc-solution/kyc-mrjob/target/test-classes/GSMCellSubscriberHomeIntersectionJobDescriptionClusterMRTest/in/gsm_cell_location_stf.har" +"/stf/db_bts_stf.txt");
        //Path pathToFile = new Path(("har://home/ssa/devel/megalabs/kyc-solution/kyc-mrjob/target/test-classes/GSMCellSubscriberHomeIntersectionJobDescriptionClusterMRTest/in/gsm_cell_location_stf.har"));

        LOG.info(String.format("[%s] has been found. Uri is: [%s]. The path is:[%s]",cachedDatafileName, uriToCachedDatafile, pathToFile));
        FileSystem harFileSystem = pathToFile.getFileSystem(context.getConfiguration());
        FSDataInputStream fin = harFileSystem.open(pathToFile);
        LOG.info("fin: " + fin);
//        FileSystem fileSystem =  pathToFile.getFileSystem(getContext().getConfiguration());
//        HarFileSystem harFileSystem = new HarFileSystem(fileSystem);
//        harFileSystem.exists(new Path("har://home/ssa/devel/mycompany/my-solution/my-mrjob/target/test-classes/HomeJobDescriptionClusterMRTest/in/locations.har"));
//        LOG.info("harFileSystem.exists(pathToFile):"+ harFileSystem.exists(pathToFile));
//        harFileSystem.initialize(uriToCachedDatafile, context.getConfiguration());



        FileStatus[] statuses = harFileSystem.listStatus(new Path("har:///"+"har://home/ssa/devel/mycompany/my-solution/my-mrjob/target/test-classes/HomeJobDescriptionClusterMRTest/in/locations.har"));
        for(FileStatus fileStatus : statuses){
            LOG.info("fileStatus isDir"+fileStatus.isDirectory() +" len:" + fileStatus.getLen());
        }

//        String tmpPathToFile = "har:///"+pathToFile.toString(); //+"/stf/db_bts_stf.txt";
//        Path tmpPath = new Path(tmpPathToFile);
//        LOG.info("KILL ME PATH TO FILE IN ARCHIVE: " +tmpPath);
//        inputStream = harFileSystem.open(tmpPath);
//        return inputStream;
        return fin;
    }

ご覧のとおり、ひどいです。アーカイブ内に保存されているインデックス ファイルを手動で読み取り、インデックス ファイルのメタデータを使用してパスを再構築しました。アーカイブに保存されているファイルの正確な名前がわかっている場合 (私の例のように)、パスを手動で作成できます。

それは便利ではありません.Zip->zipEntryのようなものを期待していました.アーカイブの構造を知らなくても、アーカイブのエントリを反復処理できます.

于 2013-03-13T06:40:50.020 に答える