1

以前の質問で説明した、DataWeave がRFC 4180に準拠していない CSV ファイルを読み取らないという問題の回避策を見つけようとしています。org.mule.api.lifecycle.Callable私が持っていた (ファイルまたは HTTP 応答からのものである可能性があります)を読み取り、処理できない行を削除する実装を実装する Java クラスを作成することにしInputStreamました (ここでの実装があまりにも多くを引き出していることはわかっています)。 )、読み取り可能な行を新しいストリームにパイプします。残念ながら、DataWeave から 0 バイトの出力が返され、例外が発生します。

編集:明確にするために、この質問は他の質問とは独立して存在する可能性がありますが、以前にこのようなことをする方法を知りました. たまたま、別の問題の回避策としてこの問題を解決しようとしています。

ここに私のCSVファイルがあります:

Column A,Column B,Column C,Column D
A,B,C,D
A,BB,CCCC,DDDDDDDD
A,BBB,CCCCCCCCC,DDDDDDDDDDDDDDDDDDDDDDDDDDD
A,Something Weird",C,D
A,B,Something Else" Weird,D,
A,",S,o,m,e,t,h,i,n,g, ,N,o,r,m,a,l,",C,D   
A,B,C,D
A,B,C,
A,B,,D
A,B,,
A,,C,D
A,,C,
A,,,D
A,,,
,B,C,D
,B,C,
,B,,D
,B,,
,,C,D
,,C,
,,,D
,,,

これが私のフローです:

<flow name="nonrfcFlow">
    <file:inbound-endpoint path="C:\tmp\" moveToPattern="nonrfc-read.csv" moveToDirectory="C:\tmp\" connector-ref="File-Configuration" responseTimeout="10000" mimeType="application/csv" metadata:id="a344bc19-5643-4bfb-b8c2-2994d7997c75" doc:name="File">
        <file:filename-regex-filter pattern="nonrfc\.csv" caseSensitive="true"/>
    </file:inbound-endpoint>
    <flow-ref name="removequotedlines" doc:name="removequotedlines"/>
    <dw:transform-message doc:name="Transform Message" metadata:id="5b01fcc4-2a1c-42fb-9cab-2defed9a1161">
        <dw:set-payload><![CDATA[%dw 1.0
%input payload application/csv
%output application/json
---
payload
]]></dw:set-payload>
    </dw:transform-message>
    <file:outbound-endpoint path="C:\tmp" outputPattern="nonrfc-output.json" connector-ref="OutputFileConfiguration" responseTimeout="10000" doc:name="File"/>
</flow>

<sub-flow name="removequotedlines">
    <component class="com.stackoverflow.removeLinesWithQuotes" doc:name="Remove Lines with Quotes"/>
</sub-flow>

Java ファイルは次のとおりです。

package com.stackoverflow;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

import org.apache.log4j.Logger;
import org.jfree.data.io.CSV;
import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;
import org.mule.transformer.types.ListDataType;

public class removeLinesWithQuotes implements Callable {

    @Override
    public Object onCall(MuleEventContext eventContext) throws Exception {
        Logger logger = Logger.getLogger(removeLinesWithQuotes.class);
        InputStream is = (InputStream) eventContext.getMessage().getPayload();
        BufferedReader br = new BufferedReader(new InputStreamReader(is));
        PipedOutputStream pos = new PipedOutputStream();
        PipedInputStream pis = new PipedInputStream();
        pis.connect(pos);
        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(pos));
        String line;
        while ((line = br.readLine()) != null) {
            if (!line.contains("\"")) {
                bw.write(line);
                logger.debug(line);
                bw.write("\r\n");
                bw.flush();
            }
        }
        is.close();

        //eventContext.getMessage().setPayload(pis); // doesn't work
        //eventContext.getMessage().setPayload(pis, new ListDataType<CSV>(CSV.class, "application/csv")); // doesn't work
        return pis; // doesn't work
    }
}

this answerによると、InputStreamaを返すことができるはずPipedInputStreamです。フロー参照または DataWeave のいずれかをコメント アウトすると、出力が得られます。それぞれ、無意味な JSON (RFC に従っていない場合に DataWeave CSV パーサーが奇妙な動作をするため) または行が削除された CSV が得られます。つまり、Java クラスまたは DataWeave コンポーネントは独立して機能しますが、一緒には機能しません。

Java クラスがフローから省略された場合の出力は次のとおりです。

[
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "BB",
    "Column C": "CCCC",
    "Column D": "DDDDDDDD"
  },
  {
    "Column A": "A",
    "Column B": "BBB",
    "Column C": "CCCCCCCCC",
    "Column D": "DDDDDDDDDDDDDDDDDDDDDDDDDDD"
  },
  {
    "Column A": "A",
    "Column B": ",C,D\r\nA,B,Something Else",
    "Column C": "D",
    "Column D": ""
  },
  {
    "Column A": "A",
    "Column B": ",S,o,m,e,t,h,i,n,g, ,N,o,r,m,a,l,",
    "Column C": "C",
    "Column D": "D   "
  },
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "C",
    "Column D": ""
  },
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "",
    "Column D": ""
  },
  {
    "Column A": "A",
    "Column B": "",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "",
    "Column C": "C",
    "Column D": ""
  },
  {
    "Column A": "A",
    "Column B": "",
    "Column C": "",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "",
    "Column C": "",
    "Column D": ""
  },
  {
    "Column A": "",
    "Column B": "B",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "",
    "Column B": "B",
    "Column C": "C",
    "Column D": ""
  },
  {
    "Column A": "",
    "Column B": "B",
    "Column C": "",
    "Column D": "D"
  },
  {
    "Column A": "",
    "Column B": "B",
    "Column C": "",
    "Column D": ""
  },
  {
    "Column A": "",
    "Column B": "",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "",
    "Column B": "",
    "Column C": "C",
    "Column D": ""
  },
  {
    "Column A": "",
    "Column B": "",
    "Column C": "",
    "Column D": "D"
  },
  {
    "Column A": "",
    "Column B": "",
    "Column C": "",
    "Column D": ""
  }
]

DataWeave を省略した場合の出力は次のとおりです。

Column A,Column B,Column C,Column D
A,B,C,D
A,BB,CCCC,DDDDDDDD
A,BBB,CCCCCCCCC,DDDDDDDDDDDDDDDDDDDDDDDDDDD
A,B,C,D
A,B,C,
A,B,,D
A,B,,
A,,C,D
A,,C,
A,,,D
A,,,
,B,C,D
,B,C,
,B,,D
,B,,
,,C,D
,,C,
,,,D
,,,

例外は次のとおりです。

INFO  2015-12-03 17:01:54,716 [[asdf].OutputFileConfiguration.dispatcher.01] org.mule.transport.file.FileConnector: Writing file to: C:\tmp\nonrfc-output.json
ERROR 2015-12-03 17:02:56,817 [[asdf].OutputFileConfiguration.dispatcher.01] org.mule.exception.DefaultMessagingExceptionStrategy: 
********************************************************************************
Message               : Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=file:///C:/tmp/, connector=FileConnector
{
  name=OutputFileConfiguration
  lifecycle=start
  this=6f3e19b3
  numberOfConcurrentTransactedReceivers=4
  createMultipleTransactedReceivers=true
  connected=true
  supportedProtocols=[file]
  serviceOverrides=<none>
}
,  name='endpoint..C.tmp', mep=ONE_WAY, properties={outputPattern=nonrfc-output.json}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: WeaveMessageProcessor$WeaveOutputHandler
Type                  : org.mule.api.transport.DispatchException
Code                  : MULE_ERROR--2
JavaDoc               : http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transport/DispatchException.html
Payload               : com.mulesoft.weave.mule.WeaveMessageProcessor$WeaveOutputHandler@3e274324
********************************************************************************
Exception stack is:
1. Pipe broken (java.io.IOException)
  java.io.PipedInputStream:-1 (null)
2. Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=file:///C:/tmp/, connector=FileConnector
{
  name=OutputFileConfiguration
  lifecycle=start
  this=6f3e19b3
  numberOfConcurrentTransactedReceivers=4
  createMultipleTransactedReceivers=true
  connected=true
  supportedProtocols=[file]
  serviceOverrides=<none>
}
,  name='endpoint..C.tmp', mep=ONE_WAY, properties={outputPattern=nonrfc-output.json}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: WeaveMessageProcessor$WeaveOutputHandler (org.mule.api.transport.DispatchException)
  org.mule.transport.AbstractMessageDispatcher:117 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transport/DispatchException.html)
********************************************************************************
Root Exception stack trace:
java.io.IOException: Pipe broken
    at java.io.PipedInputStream.read(Unknown Source)
    at java.io.PipedInputStream.read(Unknown Source)
    at com.mulesoft.weave.reader.DefaultSeekableStream.readUntil(SeekableStream.scala:146)
    at com.mulesoft.weave.reader.DefaultSeekableStream.delegate$lzycompute(SeekableStream.scala:153)
    at com.mulesoft.weave.reader.DefaultSeekableStream.delegate(SeekableStream.scala:151)
    at com.mulesoft.weave.reader.DefaultSeekableStream.seek(SeekableStream.scala:189)
    at com.mulesoft.weave.reader.UTF8StreamSourceReader.seek(StreamSourceReader.scala:121)
    at com.mulesoft.weave.reader.csv.parser.CSVParser.parse(CSVParser.scala:93)
    at com.mulesoft.weave.reader.csv.parser.CSVParser.elementAt(CSVParser.scala:54)
    at com.mulesoft.weave.reader.csv.parser.CSVParser.contains(CSVParser.scala:38)
    at com.mulesoft.weave.reader.csv.CSVRecordsValue$$anon$1.hasNext(CSVReader.scala:52)
    at scala.collection.Iterator$class.toStream(Iterator.scala:1188)
    at com.mulesoft.weave.reader.csv.CSVRecordsValue$$anon$1.toStream(CSVReader.scala:50)
    at com.mulesoft.weave.writer.json.JsonWriter.writeArray(JsonWriter.scala:156)
    at com.mulesoft.weave.writer.json.JsonWriter.writeValue(JsonWriter.scala:137)
    at com.mulesoft.weave.model.values.Value$class.write(Value.scala:31)
    at com.mulesoft.weave.reader.csv.CSVRecordsValue.write(CSVReader.scala:47)
    at com.mulesoft.weave.model.values.wrappers.DelegateValue$class.write(DelegateValue.scala:29)
    at com.mulesoft.weave.engine.ast.variables.VariableReferenceNode.write(VariableReferenceNode.scala:9)
    at com.mulesoft.weave.engine.Engine.internalExecute(Engine.scala:89)
    at com.mulesoft.weave.engine.Engine.execute(Engine.scala:54)
    at com.mulesoft.weave.engine.Engine.execute(Engine.scala:169)
    at com.mulesoft.weave.mule.WeaveMessageProcessor$WeaveOutputHandler.write(WeaveMessageProcessor.scala:159)
    at org.mule.transport.file.FileMessageDispatcher.doDispatch(FileMessageDispatcher.java:75)
    at org.mule.transport.AbstractMessageDispatcher.process(AbstractMessageDispatcher.java:107)
    at org.mule.transport.AbstractConnector$DispatcherMessageProcessor.process(AbstractConnector.java:2686)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:107)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.processor.BlockingProcessorExecutor.executeNext(BlockingProcessorExecutor.java:94)
    at org.mule.processor.BlockingProcessorExecutor.execute(BlockingProcessorExecutor.java:56)
    at org.mule.interceptor.AbstractEnvelopeInterceptor.processBlocking(AbstractEnvelopeInterceptor.java:58)
    at org.mule.processor.AbstractRequestResponseMessageProcessor.process(AbstractRequestResponseMessageProcessor.java:47)
    at org.mule.processor.AsyncInterceptingMessageProcessor.processNextTimed(AsyncInterceptingMessageProcessor.java:123)
    at org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker$1.process(AsyncInterceptingMessageProcessor.java:208)
    at org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker$1.process(AsyncInterceptingMessageProcessor.java:201)
    at org.mule.execution.ExecuteCallbackInterceptor.execute(ExecuteCallbackInterceptor.java:16)
    at org.mule.execution.CommitTransactionInterceptor.execute(CommitTransactionInterceptor.java:35)
    at org.mule.execution.CommitTransactionInterceptor.execute(CommitTransactionInterceptor.java:22)
    at org.mule.execution.HandleExceptionInterceptor.execute(HandleExceptionInterceptor.java:30)
    at org.mule.execution.HandleExceptionInterceptor.execute(HandleExceptionInterceptor.java:14)
    at org.mule.execution.BeginAndResolveTransactionInterceptor.execute(BeginAndResolveTransactionInterceptor.java:67)
    at org.mule.execution.ResolvePreviousTransactionInterceptor.execute(ResolvePreviousTransactionInterceptor.java:44)
    at org.mule.execution.SuspendXaTransactionInterceptor.execute(SuspendXaTransactionInterceptor.java:50)
    at org.mule.execution.ValidateTransactionalStateInterceptor.execute(ValidateTransactionalStateInterceptor.java:40)
    at org.mule.execution.IsolateCurrentTransactionInterceptor.execute(IsolateCurrentTransactionInterceptor.java:41)
    at org.mule.execution.ExternalTransactionInterceptor.execute(ExternalTransactionInterceptor.java:48)
    at org.mule.execution.RethrowExceptionInterceptor.execute(RethrowExceptionInterceptor.java:28)
    at org.mule.execution.RethrowExceptionInterceptor.execute(RethrowExceptionInterceptor.java:13)
    at org.mule.execution.TransactionalErrorHandlingExecutionTemplate.execute(TransactionalErrorHandlingExecutionTemplate.java:110)
    at org.mule.execution.TransactionalErrorHandlingExecutionTemplate.execute(TransactionalErrorHandlingExecutionTemplate.java:30)
    at ...
********************************************************************************
4

0 に答える 0