1

3 つのノードに分散システムがあり、データはそれらのノード間で分散されています。たとえば、test.csv3 つのノードすべてに存在するファイルがあり、4 つの列が含まれています。

row   | id,  C1, C2,  C3
----------------------
row1  | A1 , c1 , c2 ,2
row2  | A1 , c1 , c2 ,1 
row3  | A1 , c11, c2 ,1 
row4  | A2 , c1 , c2 ,1 
row5  | A2 , c1 , c2 ,1 
row6  | A2 , c11, c2 ,1 
row7  | A2 , c11, c21,1 
row8  | A3 , c1 , c2 ,1
row9  | A3 , c1 , c2 ,2
row10 | A4 , c1 , c2 ,1

上記の結果セットを集計してみたいと思います。idc1c2、およびc3列ごとにデータセットを集計して、このように出力するにはどうすればよいですか?

row   | id,  C1, C2,  C3
----------------------
row1  | A1 , c1 , c2 ,3
row2  | A1 , c11, c2 ,1 
row3  | A2 , c1 , c2 ,2 
row4  | A2 , c11, c2 ,1 
row5  | A2 , c11, c21,1 
row6  | A3 , c1 , c2 ,3
row7  | A4 , c1 , c2 ,1

私は次のことを試しました:

from array import array 
from datetime import datetime 
import pyspark.sql 
from pyspark.sql import Row, SQLContext, StructField, StringType,  IntegerType

schema = StructType([
    StructField("id", StringType(), False),
    StructField("C1", StringType(), False), 
    StructField("C2", StringType(), False),
    StructField("C3", IntegerType(), False)])
base_rdd = sc.textFile("/home/hduser/spark-1.1.0/Data/test.tsv").map(lambda l: 

l.split(",")

rdd = base_rdd.map(lambda x: Row(id = x[0], C1 = x[1], C2 = x[2], C3 = int(x[3])))
sqlContext = SQLContext(sc)
srdd = sqlContext.inferSchema(rdd)
4

2 に答える 2

1

問題を解決するには、次の手順を実行します。Python の手順がわかりません。以下は Java の手順です。pythonに関連付けていただければ幸いです。

  1. csvファイルを読む

JavaRDD<String> input = sc.textFile(args[0]);

  1. ファイルからペアrddを作成します

    JavaPairRDD<Integer,String> pairMap = input.mapToPair( new PairFunction<String, Integer, String>() { @Override public Tuple2<Integer, String> call(String line) throws Exception { String[] s = line.split(","); String key = s[0]+'#'+s[1]+'#' +s[2];// id,c1,c2 Integer value = Integer.valueOf(s[3]) //c3
    return new Tuple2<Integer,String>(key, value); } });

  2. キーでマップを縮小する

JavaPairRDD<String,Integer> result = pairMap.reduceByKey( new Function2<Integer, Integer, Integer>() {
@Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } });

  1. resultid+'#'+c1+'#'+c2オブジェクトには、キーがあり、値が集計されている期待される結果が含まれていますc3。このマップは、必要に応じてさらに使用できます。キーをトークン化し#て列を取得し、apache-spark-sql を使用してテーブルに挿入できます。

これが役立つことを願っています。

于 2015-06-16T05:20:13.010 に答える