問題タブ [databricks]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - spark: schema change -- 存在する場合、列のデータフレームを変換およびフィルター処理します。そうでない場合はしないでください
私が扱っているデータにはスキーマの変更がありました。古いデータと新しいデータを組み合わせた結果のdataFrameの場合、変換およびフィルタリングしたい列は、古いデータには存在しませんでした。「null」は入力されません。可能な限り、その列を変換してフィルター処理したいと考えています。そのような列のない以前のデータについては、すべての行を保持します。
java.lang.NullPointerException
問題は、以前のデータに「ip」列がないため、次のコードの結果が になることです。
上記の「firstIp」関数は、配列から最初の IP アドレスを取得する単純な udf です。によって定義されval firstIp = udf[String, String](_.split(",")(0))
ます。スキーマごとにデータを 2 つの部分 ("ip" 列があるものとないもの) に分割したくありません... しかし、データをそのように分割しなくても目的を達成できますか?
scala - Sparkデータフレームでネストされた構造体を分解する
私は Databricks の例に取り組んでいます。データフレームのスキーマは次のようになります。
この例では、従業員列を 4 つの追加列に分解する方法を示しています。
部門列で同様のことを行うにはどうすればよいですか (つまり、「id」と「name」というデータフレームに 2 つの列を追加します)。メソッドはまったく同じではなく、以下を使用して新しいデータ フレームを作成する方法しかわかりません。
私が試してみると:
警告とエラーが表示されます:
scala - ライブラリは intellij 内の sbt で解決されませんが、コマンドラインから解決およびコンパイルされます
以下の sbt ファイルは、コマンド ラインで正常に動作する intelliJ Idea 内から spark-xml databricks パッケージを解決しません。
sbt は、ローカルにインストールされた sbt を指すバンドルとその他の両方に設定されていましたが、どちらの方法でも機能しませんでした。
以下のパッケージは解決され、コマンドラインから完全に機能します
apache-spark - スパークは単一のワーカーで大きなファイルを処理しますか
Apache Spark で大きなファイルを処理する場合、たとえば、.executor
sc.textFile("somefile.xml")
間で並列処理するためにファイルを分割しますか?それとも、単一のexecutor で単一のチャンクとして処理しますか?データフレームを使用する場合、Databricks
implicit XMLContext
からこのような大規模なデータセット処理用に事前に構築された最適化はありますか?
scala - Spark RDD をテキストファイルとして S3 バケットに書き込む
Spark RDD を gzip されたテキスト ファイル (または複数のテキスト ファイル) として S3 バケットに保存しようとしています。S3 バケットは dbfs にマウントされます。次を使用してファイルを保存しようとしています。
しかし、これを試してみると、エラーが発生し続けます:
ただし、S3 バケットにいくつかのファイルが書き込まれています。ここrddDataset.repartition(1).saveAsTextFile("/mnt/mymount/myfolder/")
でアドバイスされているように、も使用してみましたが、これは同じエラーで終了しました。
これはこの質問に似ているように見えるので、RDD の null 値が原因でエラーが発生している可能性がありますか? しかし、val newRDD = rddDataset.map(line => line).filter(x => x!= null).filter(x => x!=" ").filter(x => x!="")
この RDD を保存しようとすると、同じエラーが発生します。
さらに、rddDataset.count()
同様のエラーがスローされます。データフレームから rddDataset を作成しています。これにより、すべての行が正常に表示されます。ただし、java.lang.NullPointerException
元のデータフレームを RDD に変換すると、次のように再現できます。
以下のスタック トレースの 1 つを提供しました。
また、実行後にステージの情報タブを開くとrddDataset.repartition(200).saveAsTextFile(/mnt/mymount/myfolder)
、エラーの詳細が表示されます。
scala - Apache Spark で DateType の null 値を処理する
Apache Spark で日付列の null 値を処理できません。nullを空の文字列とnull値に置き換えてみました。
私が得ているエラーは
java.text.ParseException: Unparseable date: ""
私はcsv-spark、databricks:spark-redshift_2.11 2.0.1を使用しています
コード
var originalDataFrame = sqlContext.load(
"com.databricks.spark.csv",
schema = sourceSchema,
Map("path" -> filePath,
"header" -> "false",
"codec"->"org.apache.hadoop.io.compress.GzipCodec",
"delimiter"->"|",
"dateFormat" -> dateFormat,
"nullValue"->""
))
前もって感謝します。
apache-spark - Spark com.databricks.spark.csv は、node-snappy を使用して snappy 圧縮ファイルを読み込むことができません
S3 には、snappy 圧縮アルゴリズム (node-snappy
パッケージを使用) を使用して圧縮された csv ファイルがいくつかあります。Spark を使用してこれらのファイルを処理しcom.databricks.spark.csv
たいのですが、常に無効なファイル入力エラーが発生します。
コード:
エラーメッセージ:
16/09/24 21:57:25 WARN TaskSetManager: ステージ 0.0 でタスク 0.0 が失われました (TID 0、ip-10-0-32-5.ec2.internal): java.lang.InternalError: データを解凍できませんでした。入力が無効です。org.apache.hadoop.io.compress.snappy.SnappyDecompressor.decompressBytesDirect(ネイティブメソッド)で org.apache.hadoop.io.compress.snappy.SnappyDecompressor.decompress(SnappyDecompressor.java:239)で org.apache.hadoop. io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:88) at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85) at java.io.InputStream.read(InputStream.java:101) org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180) で org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216) で org.apache.hadoop.util.LineReader.readLine (LineReader.java: