これを書くきっかけとなったユースケースがあり、多くの人がこの状況に直面したと確信しています。状況は、単一の Talend ジョブを介して MongoDB から Snowflake データベースに複数のコレクションを移行し、コレクションの最上位ノードを Snowflake テーブルの個々のフィールドとして保持することでした。
MongoDBコレクションはスキーマを強制しないため、TalendはMongoDBソースの動的スキーマをサポートしていないことがわかっています。これは、取り込みたい既存/新しいコレクションごとに個別のジョブ/サブジョブを作成する必要があることを意味しますドキュメントの将来の変更に備えてジョブを再設計する必要がありますが、常に機能することを保証する必要があるため、代替ソリューションを検討する必要があります。
これがアプローチです、
ステップ 1: MongoDB コレクションからすべての最上位キーとそのタイプを取得します。$objectToArrrrayで集約を使用して、すべてのトップ キーと値のペアをドキュメント配列に変換し、続いて$addToSetで$unwindと$groupを使用して、コレクション全体で個別のキーと値の型を取得しました。
{
"_id" : "1",
"keys" : [
"field1~string",
"field2~object",
"filed3~date",
"_id~objectId"
]
}
ステップ 2 : Mongo データ型と Snowflake データ型の間の 1 対 1 のマップを作成します。この情報を格納するために、「 dataTypes 」と呼ばれるハッシュ マップを作成しました。または、この情報をテーブルやファイルなどに保存することもできます。
java.util.Map<String,String> dataTypes = new java.util.HashMap<String,String>();
dataTypes.put("string","VARCHAR");
dataTypes.put("int","NUMBER");
dataTypes.put("objectId","VARCHAR");
dataTypes.put("object","VARIANT");
dataTypes.put("date","TIMESTAMP_LTZ");
dataTypes.put("array","VARCHAR");
dataTypes.put("bool","BOOLEAN");
ステップ 3: キーを Snowflake と比較します。まず、テーブルが存在するかどうかスノーフレークINFORMATION_SCHEMAにクエリを実行します。存在しない場合はテーブルを作成し、存在する場合はドキュメント内のフィールドの変更を確認し、追加または変更します。スノーフレーク テーブルのそれらの列。DDL スクリプトは、ステップ 2 で「データ型マッピング」を使用し、ステップ 1 でキーを反復することによって生成されます。
ステップ 4 : mongoexportコマンドを使用して、MongoDB からローカル ファイルシステムにデータをアンロードします。
mongoexport --db <databaseName> --collection <collectionName> --type=csv --fields=<fieldList> --out <filename>
これは、ステップ 1 のキーから準備されます。
ステップ 5: Snowsqlを使用してPUTコマンドを使用して、.csv ファイルをローカル ファイルシステムからスノーフレークのステージング場所にステージングします。
snowsql -d <database> -s <schema> -o exit_on_error=true -o log_level=DEBUG -q 'put <fileName> @<internalStage> OVERWRITE=TRUE';
ステップ 6: ステージング場所からスノーフレーク テーブルにデータをロードする
COPY INTO <tableName> FROM @<internalStage>
[file_format=<fileFormat>] [pattern=<regex_pattern>]
ここでは file_format と pattern の指定はオプションです。1 つのスノーフレーク ステージでコレクションごとに複数のファイルをステージングするため、正規表現を使用しました。
ステップ 7 : コレクションのリストを維持します。リストは、ローカル ファイルシステムまたはデータベース テーブル内のファイルに配置できます。Talend ジョブでは、コレクションのリストを反復処理し、コレクション名、テーブル名をパラメータ化することにより、上記の手順で各コレクションを処理します。 、ジョブ内のファイル名、ステージング名など。