2

編集 2

RDD を 8 つのパーティションに再分割することで、問題を間接的に解決しました。avro オブジェクトが「Java シリアライズ可能」ではないという障害にぶつかり、avro シリアライゼーションを kryo に委譲するためのスニペットが見つかりました。元の問題はまだ残っています。

編集 1: map 関数のローカル変数参照を削除

io/schema に parquet と avro を使用して、spark で計算負荷の高いジョブを実行するためのドライバーを作成しています。Spark にすべてのコアを使用させることができないようです。私は何を間違っていますか?キーを null に設定したからですか?

Hadoop がファイルを整理する方法について頭を悩ませています。私のファイルにはギガバイトの生データがあるので、デフォルトのブロックとページのサイズで物事が並列化されることを期待する必要があります。

処理のために入力を ETL する関数は次のようになります。

def genForum {
    class MyWriter extends AvroParquetWriter[Topic](new Path("posts.parq"), Topic.getClassSchema) {
      override def write(t: Topic) {
        synchronized {
          super.write(t)
        }
      }
    }

    def makeTopic(x: ForumTopic): Topic = {
      // Ommited to save space
    }

    val writer = new MyWriter

    val q =
      DBCrawler.db.withSession {
        Query(ForumTopics).filter(x => x.crawlState === TopicCrawlState.Done).list()
      }

    val sz = q.size
    val c = new AtomicInteger(0)

    q.par.foreach {
      x =>
        writer.write(makeTopic(x))
        val count = c.incrementAndGet()
        print(f"\r${count.toFloat * 100 / sz}%4.2f%%")
    }
    writer.close()
  }

そして私の変換は次のようになります:

def sparkNLPTransformation() {
    val sc = new SparkContext("local[8]", "forumAddNlp")

    // io configuration
    val job = new Job()
    ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[Topic]])
    ParquetOutputFormat.setWriteSupportClass(job,classOf[AvroWriteSupport])
    AvroParquetOutputFormat.setSchema(job, Topic.getClassSchema)


    // configure annotator
    val props = new Properties()
    props.put("annotators", "tokenize,ssplit,pos,lemma,parse")
    val an = DAnnotator(props)


    // annotator function
    def annotatePosts(ann : DAnnotator, top : Topic) : Topic = {
      val new_p = top.getPosts.map{ x=>
        val at = new Annotation(x.getPostText.toString)
        ann.annotator.annotate(at)
        val t = at.get(classOf[SentencesAnnotation]).map(_.get(classOf[TreeAnnotation])).toList

        val r = SpecificData.get().deepCopy[Post](x.getSchema,x)
        if(t.nonEmpty) r.setTrees(t)
        r
      }
      val new_t = SpecificData.get().deepCopy[Topic](top.getSchema,top)
      new_t.setPosts(new_p)
      new_t
    }

    // transformation
    val ds = sc.newAPIHadoopFile("forum_dataset.parq", classOf[ParquetInputFormat[Topic]], classOf[Void], classOf[Topic], job.getConfiguration)
    val new_ds = ds.map(x=> ( null, annotatePosts(x._2) ) )

    new_ds.saveAsNewAPIHadoopFile("annotated_posts.parq",
      classOf[Void],
      classOf[Topic],
      classOf[ParquetOutputFormat[Topic]],
      job.getConfiguration
    )
  }
4

1 に答える 1

0

データが実際に HDFS の複数のブロックにあることを確認できますか? forum_dataset.parq ファイルの合計ブロック数

于 2014-02-02T17:27:38.397 に答える