4

Mule Studio 3.4.0 Community Edition を使用しています。File Endpoint で受信した大きな CSV ファイルを解析する方法について大きな問題があります。シナリオは、3 つの CSV ファイルがあり、ファイルのコンテンツをデータベースに入れることです。しかし、巨大なファイル (約 144MB) を読み込もうとすると、「OutOfMemory」例外が発生します。大きなCSVを小さなサイズのCSVに分割/分割する解決策として考えました(この解決策が最適かどうかはわかりません)o例外をスローせずにCSVを処理する方法を見つけようとしました。

<file:connector name="File" autoDelete="true" streaming="true" validateConnections="true" doc:name="File"/>

<flow name="CsvToFile" doc:name="CsvToFile">
        <file:inbound-endpoint path="src/main/resources/inbox" moveToDirectory="src/main/resources/processed"  responseTimeout="10000" doc:name="CSV" connector-ref="File">
            <file:filename-wildcard-filter pattern="*.csv" caseSensitive="true"/>
        </file:inbound-endpoint>
        <component class="it.aizoon.grpBuyer.AddMessageProperty" doc:name="Add Message Property"/>
        <choice doc:name="Choice">
            <when expression="INVOCATION:nome_file=azienda" evaluator="header">
                <jdbc-ee:csv-to-maps-transformer delimiter="," mappingFile="src/main/resources/companies-csv-format.xml" ignoreFirstRecord="true" doc:name="CSV2Azienda"/>
                <jdbc-ee:outbound-endpoint exchange-pattern="one-way" queryKey="InsertAziende" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database Azienda">
                    <jdbc-ee:query key="InsertAziende" value="INSERT INTO aw006_azienda VALUES (#[map-payload:AW006_ID], #[map-payload:AW006_ID_CLIENTE], #[map-payload:AW006_RAGIONE_SOCIALE])"/>
                </jdbc-ee:outbound-endpoint>
            </when>
            <when expression="INVOCATION:nome_file=servizi" evaluator="header">
                <jdbc-ee:csv-to-maps-transformer delimiter="," mappingFile="src/main/resources/services-csv-format.xml" ignoreFirstRecord="true" doc:name="CSV2Servizi"/>
                <jdbc-ee:outbound-endpoint exchange-pattern="one-way" queryKey="InsertServizi" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database Servizi">
                    <jdbc-ee:query key="InsertServizi" value="INSERT INTO ctrl_aemd_unb_servizi VALUES (#[map-payload:CTRL_ID_TIPO_OPERAZIONE], #[map-payload:CTRL_DESCRIZIONE], #[map-payload:CTRL_COD_SERVIZIO])"/>
                </jdbc-ee:outbound-endpoint>
            </when>
            <when expression="INVOCATION:nome_file=richiesta" evaluator="header">
                <jdbc-ee:csv-to-maps-transformer delimiter="," mappingFile="src/main/resources/requests-csv-format.xml" ignoreFirstRecord="true" doc:name="CSV2Richiesta"/>
                <jdbc-ee:outbound-endpoint exchange-pattern="one-way" queryKey="InsertRichieste" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database Richiesta">
                    <jdbc-ee:query key="InsertRichieste" value="INSERT INTO ctrl_aemd_unb_richiesta VALUES (#[map-payload:CTRL_ID_CONTROLLER], #[map-payload:CTRL_NUM_RICH_VENDITORE], #[map-payload:CTRL_VENDITORE], #[map-payload:CTRL_CANALE_VENDITORE], #[map-payload:CTRL_CODICE_SERVIZIO], #[map-payload:CTRL_STATO_AVANZ_SERVIZIO], #[map-payload:CTRL_DATA_INSERIMENTO])"/>
                </jdbc-ee:outbound-endpoint>
            </when>
        </choice>   
    </flow>

この問題を解決する方法がわかりません。あらゆる種類の助けを事前にありがとう

4

2 に答える 2

3

SteveS が言ったように、 はcsv-to-maps-transformerファイルを処理する前にファイル全体をメモリにロードしようとするかもしれません。あなたがしようとすることができるのは、csvファイルを小さな部分に分割し、それらの部分をVM個別に処理するために送信することです. まず、この最初のステップを達成するためのコンポーネントを作成します。

public class CSVReader implements Callable{
    @Override
    public Object onCall(MuleEventContext eventContext) throws Exception {

        InputStream fileStream = (InputStream) eventContext.getMessage().getPayload();
        DataInputStream ds = new DataInputStream(fileStream);
        BufferedReader br = new BufferedReader(new InputStreamReader(ds));

        MuleClient muleClient = eventContext.getMuleContext().getClient();

        String line;
        while ((line = br.readLine()) != null) {
            muleClient.dispatch("vm://in", line, null);
        }

        fileStream.close();
        return null;
    }
}

次に、メイン フローを 2 つに分割します

<file:connector name="File" 
    workDirectory="yourWorkDirPath" autoDelete="false" streaming="true"/>

<flow name="CsvToFile" doc:name="Split and dispatch">
    <file:inbound-endpoint path="inboxPath"
        moveToDirectory="processedPath" pollingFrequency="60000"
        doc:name="CSV" connector-ref="File">
        <file:filename-wildcard-filter pattern="*.csv"
            caseSensitive="true" />
    </file:inbound-endpoint>
    <component class="it.aizoon.grpBuyer.AddMessageProperty" doc:name="Add Message Property" />
    <component class="com.dgonza.CSVReader" doc:name="Split the file and dispatch every line to VM" />
</flow>

<flow name="storeInDatabase" doc:name="receive lines and store in database">
    <vm:inbound-endpoint exchange-pattern="one-way"
        path="in" doc:name="VM" />
    <Choice>
        .
        .
        Your JDBC Stuff
        .
        .
    <Choice />
</flow>

file-connectorストリーミングを有効にするには、現在の構成を維持してください。このソリューションを使用すると、最初にファイル全体をメモリにロードする必要なく、csv データを処理できます。HTH

于 2013-05-06T23:20:37.493 に答える
1

csv-to-maps-transformer は、ファイル全体を強制的にメモリに格納しようとしていると思います。あなたは 1 つの大きなファイルを扱っているので、個人的には、それを処理するための Java クラスを作成する傾向があります。File エンドポイントは、ファイル ストリームをカスタム トランスフォーマーに渡します。その後、JDBC 接続を確立し、ファイル全体をロードすることなく、一度に 1 行ずつ情報を取得できます。OpenCSVを使用して CSV を解析しました。したがって、Java クラスには次のようなものが含まれます。

protected Object doTransform(Object src, String enc) throws TransformerException {  

    try {
        //Make a JDBC connection here

        //Now read and parse the CSV

        FileReader csvFileData = (FileReader) src;


        BufferedReader br = new BufferedReader(csvFileData);
        CSVReader reader = new CSVReader(br);

        //Read the CSV file and add the row to the appropriate List(s)
        String[] nextLine;
        while ((nextLine = reader.readNext()) != null) {
            //Push your data into the database through your JDBC connection
        }
        //Close connection.

               }catch (Exception e){
    }
于 2013-05-06T18:06:23.853 に答える