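I'm trying to find a workaround for the problem I described in a previous question: DataWeave will not read CSV files that do not conform to RFC 4180. I decided to write a Java class implementing org.mule.api.lifecycle.Callable that reads the InputStream I have (which may come from a file or from an HTTP response), drops the lines that cannot be processed (I know the implementation here throws out far too much), and pipes the readable lines into a new stream. Unfortunately, DataWeave then produces 0 bytes of output and an exception is thrown.
Edit: To clarify, this question can stand independently of the other one; I have looked into how to do something like this before. It just so happens that I'm attempting it now as a workaround for a different problem.
Here is my CSV file: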
Column A,Column B,Column C,Column D
A,B,C,D
A,BB,CCCC,DDDDDDDD
A,BBB,CCCCCCCCC,DDDDDDDDDDDDDDDDDDDDDDDDDDD
A,Something Weird",C,D
A,B,Something Else" Weird,D,
A,",S,o,m,e,t,h,i,n,g, ,N,o,r,m,a,l,",C,D
A,B,C,D
A,B,C,
A,B,,D
A,B,,
A,,C,D
A,,C,
A,,,D
A,,,
,B,C,D
,B,C,
,B,,D
,B,,
,,C,D
,,C,
,,,D
,,,
Here is my flow:
<flow name="nonrfcFlow">
    <file:inbound-endpoint path="C:\tmp\" moveToPattern="nonrfc-read.csv" moveToDirectory="C:\tmp\" connector-ref="File-Configuration" responseTimeout="10000" mimeType="application/csv" metadata:id="a344bc19-5643-4bfb-b8c2-2994d7997c75" doc:name="File">
        <file:filename-regex-filter pattern="nonrfc\.csv" caseSensitive="true"/>
    </file:inbound-endpoint>
    <flow-ref name="removequotedlines" doc:name="removequotedlines"/>
    <dw:transform-message doc:name="Transform Message" metadata:id="5b01fcc4-2a1c-42fb-9cab-2defed9a1161">
        <dw:set-payload><![CDATA[%dw 1.0
%input payload application/csv
%output application/json
---
payload
]]></dw:set-payload>
    </dw:transform-message>
    <file:outbound-endpoint path="C:\tmp" outputPattern="nonrfc-output.json" connector-ref="OutputFileConfiguration" responseTimeout="10000" doc:name="File"/>
</flow>
<sub-flow name="removequotedlines">
    <component class="com.stackoverflow.removeLinesWithQuotes" doc:name="Remove Lines with Quotes"/>
</sub-flow>
Here is the Java file:
package com.stackoverflow;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import org.apache.log4j.Logger;
import org.jfree.data.io.CSV;
import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;
import org.mule.transformer.types.ListDataType;

public class removeLinesWithQuotes implements Callable {

    @Override
    public Object onCall(MuleEventContext eventContext) throws Exception {
        Logger logger = Logger.getLogger(removeLinesWithQuotes.class);
        InputStream is = (InputStream) eventContext.getMessage().getPayload();
        BufferedReader br = new BufferedReader(new InputStreamReader(is));
        PipedOutputStream pos = new PipedOutputStream();
        PipedInputStream pis = new PipedInputStream();
        pis.connect(pos);
        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(pos));
        String line;
        while ((line = br.readLine()) != null) {
            if (!line.contains("\"")) {
                bw.write(line);
                logger.debug(line);
                bw.write("\r\n");
                bw.flush();
            }
        }
        is.close();
        //eventContext.getMessage().setPayload(pis); // doesn't work
        //eventContext.getMessage().setPayload(pis, new ListDataType<CSV>(CSV.class, "application/csv")); // doesn't work
        return pis; // doesn't work
    }
}
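According to this answer, I should be able to return an InputStream such as a PipedInputStream. If I comment out either the flow reference or the DataWeave component, I do get output: respectively, nonsensical JSON (because the DataWeave CSV parser behaves strangely when the input does not follow the RFC) or a CSV with the offending lines removed. In other words, the Java class and the DataWeave component each work on their own, but not together.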
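The filtering rule itself (keep a line only if it contains no double quote) can be exercised without Mule. The following is a minimal standalone sketch, not the component from the flow: `QuotedLineFilter` and its methods are hypothetical names, and it assumes the whole input fits in memory, so it collects the kept lines in a byte array instead of using piped streams. It also assumes UTF-8 input, which the original class does not specify.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of Mule or of the flow above.
class QuotedLineFilter {

    // Reads the whole stream and returns the bytes of every line that
    // contains no double quote, each terminated with CRLF.
    static byte[] keepUnquotedLines(InputStream in) {
        ByteArrayOutputStream kept = new ByteArrayOutputStream();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (!line.contains("\"")) {
                    kept.write((line + "\r\n").getBytes(StandardCharsets.UTF_8));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return kept.toByteArray();
    }

    // Same filtering, exposed as a fully buffered InputStream
    // (assumption: the input is small enough to hold in memory).
    static InputStream dropQuotedLines(InputStream in) {
        return new ByteArrayInputStream(keepUnquotedLines(in));
    }

    public static void main(String[] args) {
        String csv = "Column A,Column B\r\nA,B\r\nA,Something Weird\"\r\n,B\r\n";
        byte[] filtered = keepUnquotedLines(
                new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)));
        System.out.print(new String(filtered, StandardCharsets.UTF_8));
    }
}
```

Buffering everything is a deliberate simplification: it avoids any dependency on which thread reads the result, at the cost of holding the filtered file in memory.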
Here is the output when the Java class is omitted from the flow:
[
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "BB",
    "Column C": "CCCC",
    "Column D": "DDDDDDDD"
  },
  {
    "Column A": "A",
    "Column B": "BBB",
    "Column C": "CCCCCCCCC",
    "Column D": "DDDDDDDDDDDDDDDDDDDDDDDDDDD"
  },
  {
    "Column A": "A",
    "Column B": ",C,D\r\nA,B,Something Else",
    "Column C": "D",
    "Column D": ""
  },
  {
    "Column A": "A",
    "Column B": ",S,o,m,e,t,h,i,n,g, ,N,o,r,m,a,l,",
    "Column C": "C",
    "Column D": "D "
  },
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "C",
    "Column D": ""
  },
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "B",
    "Column C": "",
    "Column D": ""
  },
  {
    "Column A": "A",
    "Column B": "",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "",
    "Column C": "C",
    "Column D": ""
  },
  {
    "Column A": "A",
    "Column B": "",
    "Column C": "",
    "Column D": "D"
  },
  {
    "Column A": "A",
    "Column B": "",
    "Column C": "",
    "Column D": ""
  },
  {
    "Column A": "",
    "Column B": "B",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "",
    "Column B": "B",
    "Column C": "C",
    "Column D": ""
  },
  {
    "Column A": "",
    "Column B": "B",
    "Column C": "",
    "Column D": "D"
  },
  {
    "Column A": "",
    "Column B": "B",
    "Column C": "",
    "Column D": ""
  },
  {
    "Column A": "",
    "Column B": "",
    "Column C": "C",
    "Column D": "D"
  },
  {
    "Column A": "",
    "Column B": "",
    "Column C": "C",
    "Column D": ""
  },
  {
    "Column A": "",
    "Column B": "",
    "Column C": "",
    "Column D": "D"
  },
  {
    "Column A": "",
    "Column B": "",
    "Column C": "",
    "Column D": ""
  }
]
Here is the output when DataWeave is omitted:
Column A,Column B,Column C,Column D
A,B,C,D
A,BB,CCCC,DDDDDDDD
A,BBB,CCCCCCCCC,DDDDDDDDDDDDDDDDDDDDDDDDDDD
A,B,C,D
A,B,C,
A,B,,D
A,B,,
A,,C,D
A,,C,
A,,,D
A,,,
,B,C,D
,B,C,
,B,,D
,B,,
,,C,D
,,C,
,,,D
,,,
Here is the exception:
INFO 2015-12-03 17:01:54,716 [[asdf].OutputFileConfiguration.dispatcher.01] org.mule.transport.file.FileConnector: Writing file to: C:\tmp\nonrfc-output.json
ERROR 2015-12-03 17:02:56,817 [[asdf].OutputFileConfiguration.dispatcher.01] org.mule.exception.DefaultMessagingExceptionStrategy:
********************************************************************************
Message : Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=file:///C:/tmp/, connector=FileConnector
{
name=OutputFileConfiguration
lifecycle=start
this=6f3e19b3
numberOfConcurrentTransactedReceivers=4
createMultipleTransactedReceivers=true
connected=true
supportedProtocols=[file]
serviceOverrides=<none>
}
, name='endpoint..C.tmp', mep=ONE_WAY, properties={outputPattern=nonrfc-output.json}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: WeaveMessageProcessor$WeaveOutputHandler
Type : org.mule.api.transport.DispatchException
Code : MULE_ERROR--2
JavaDoc : http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transport/DispatchException.html
Payload : com.mulesoft.weave.mule.WeaveMessageProcessor$WeaveOutputHandler@3e274324
********************************************************************************
Exception stack is:
1. Pipe broken (java.io.IOException)
java.io.PipedInputStream:-1 (null)
2. Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=file:///C:/tmp/, connector=FileConnector
{
name=OutputFileConfiguration
lifecycle=start
this=6f3e19b3
numberOfConcurrentTransactedReceivers=4
createMultipleTransactedReceivers=true
connected=true
supportedProtocols=[file]
serviceOverrides=<none>
}
, name='endpoint..C.tmp', mep=ONE_WAY, properties={outputPattern=nonrfc-output.json}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: WeaveMessageProcessor$WeaveOutputHandler (org.mule.api.transport.DispatchException)
org.mule.transport.AbstractMessageDispatcher:117 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transport/DispatchException.html)
********************************************************************************
Root Exception stack trace:
java.io.IOException: Pipe broken
at java.io.PipedInputStream.read(Unknown Source)
at java.io.PipedInputStream.read(Unknown Source)
at com.mulesoft.weave.reader.DefaultSeekableStream.readUntil(SeekableStream.scala:146)
at com.mulesoft.weave.reader.DefaultSeekableStream.delegate$lzycompute(SeekableStream.scala:153)
at com.mulesoft.weave.reader.DefaultSeekableStream.delegate(SeekableStream.scala:151)
at com.mulesoft.weave.reader.DefaultSeekableStream.seek(SeekableStream.scala:189)
at com.mulesoft.weave.reader.UTF8StreamSourceReader.seek(StreamSourceReader.scala:121)
at com.mulesoft.weave.reader.csv.parser.CSVParser.parse(CSVParser.scala:93)
at com.mulesoft.weave.reader.csv.parser.CSVParser.elementAt(CSVParser.scala:54)
at com.mulesoft.weave.reader.csv.parser.CSVParser.contains(CSVParser.scala:38)
at com.mulesoft.weave.reader.csv.CSVRecordsValue$$anon$1.hasNext(CSVReader.scala:52)
at scala.collection.Iterator$class.toStream(Iterator.scala:1188)
at com.mulesoft.weave.reader.csv.CSVRecordsValue$$anon$1.toStream(CSVReader.scala:50)
at com.mulesoft.weave.writer.json.JsonWriter.writeArray(JsonWriter.scala:156)
at com.mulesoft.weave.writer.json.JsonWriter.writeValue(JsonWriter.scala:137)
at com.mulesoft.weave.model.values.Value$class.write(Value.scala:31)
at com.mulesoft.weave.reader.csv.CSVRecordsValue.write(CSVReader.scala:47)
at com.mulesoft.weave.model.values.wrappers.DelegateValue$class.write(DelegateValue.scala:29)
at com.mulesoft.weave.engine.ast.variables.VariableReferenceNode.write(VariableReferenceNode.scala:9)
at com.mulesoft.weave.engine.Engine.internalExecute(Engine.scala:89)
at com.mulesoft.weave.engine.Engine.execute(Engine.scala:54)
at com.mulesoft.weave.engine.Engine.execute(Engine.scala:169)
at com.mulesoft.weave.mule.WeaveMessageProcessor$WeaveOutputHandler.write(WeaveMessageProcessor.scala:159)
at org.mule.transport.file.FileMessageDispatcher.doDispatch(FileMessageDispatcher.java:75)
at org.mule.transport.AbstractMessageDispatcher.process(AbstractMessageDispatcher.java:107)
at org.mule.transport.AbstractConnector$DispatcherMessageProcessor.process(AbstractConnector.java:2686)
at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:107)
at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
at org.mule.processor.BlockingProcessorExecutor.executeNext(BlockingProcessorExecutor.java:94)
at org.mule.processor.BlockingProcessorExecutor.execute(BlockingProcessorExecutor.java:56)
at org.mule.interceptor.AbstractEnvelopeInterceptor.processBlocking(AbstractEnvelopeInterceptor.java:58)
at org.mule.processor.AbstractRequestResponseMessageProcessor.process(AbstractRequestResponseMessageProcessor.java:47)
at org.mule.processor.AsyncInterceptingMessageProcessor.processNextTimed(AsyncInterceptingMessageProcessor.java:123)
at org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker$1.process(AsyncInterceptingMessageProcessor.java:208)
at org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker$1.process(AsyncInterceptingMessageProcessor.java:201)
at org.mule.execution.ExecuteCallbackInterceptor.execute(ExecuteCallbackInterceptor.java:16)
at org.mule.execution.CommitTransactionInterceptor.execute(CommitTransactionInterceptor.java:35)
at org.mule.execution.CommitTransactionInterceptor.execute(CommitTransactionInterceptor.java:22)
at org.mule.execution.HandleExceptionInterceptor.execute(HandleExceptionInterceptor.java:30)
at org.mule.execution.HandleExceptionInterceptor.execute(HandleExceptionInterceptor.java:14)
at org.mule.execution.BeginAndResolveTransactionInterceptor.execute(BeginAndResolveTransactionInterceptor.java:67)
at org.mule.execution.ResolvePreviousTransactionInterceptor.execute(ResolvePreviousTransactionInterceptor.java:44)
at org.mule.execution.SuspendXaTransactionInterceptor.execute(SuspendXaTransactionInterceptor.java:50)
at org.mule.execution.ValidateTransactionalStateInterceptor.execute(ValidateTransactionalStateInterceptor.java:40)
at org.mule.execution.IsolateCurrentTransactionInterceptor.execute(IsolateCurrentTransactionInterceptor.java:41)
at org.mule.execution.ExternalTransactionInterceptor.execute(ExternalTransactionInterceptor.java:48)
at org.mule.execution.RethrowExceptionInterceptor.execute(RethrowExceptionInterceptor.java:28)
at org.mule.execution.RethrowExceptionInterceptor.execute(RethrowExceptionInterceptor.java:13)
at org.mule.execution.TransactionalErrorHandlingExecutionTemplate.execute(TransactionalErrorHandlingExecutionTemplate.java:110)
at org.mule.execution.TransactionalErrorHandlingExecutionTemplate.execute(TransactionalErrorHandlingExecutionTemplate.java:30)
at ...
********************************************************************************
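The `PipedInputStream` Javadoc describes a pipe as broken when the thread that was supplying bytes to the connected `PipedOutputStream` is no longer alive, and it advises against using both ends from a single thread because that can deadlock. In the trace above the read happens on the `OutputFileConfiguration` dispatcher thread, which is not necessarily the thread that ran the Java component. Whether that explains the failure in this flow is not confirmed here. As a minimal sketch under that assumption (`PipeDemo`, `pipe`, and `readAll` are hypothetical names, and Java 8 or later is assumed), the pattern the Javadoc describes looks like this outside Mule: a dedicated thread writes and then closes the write end, while the caller reads.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;

// Hypothetical standalone demo; none of these names come from Mule or DataWeave.
class PipeDemo {

    // Starts a writer thread that pushes `data` into a pipe and then closes
    // the write end; returns the read end for the caller to consume.
    static PipedInputStream pipe(final byte[] data) {
        final PipedOutputStream out = new PipedOutputStream();
        final PipedInputStream in;
        try {
            in = new PipedInputStream(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Thread writer = new Thread(() -> {
            // try-with-resources closes the write end when writing is done,
            // so the reader gets -1 (end of stream) instead of waiting on a
            // writer thread that has already exited.
            try (OutputStream o = out) {
                o.write(data);
            } catch (IOException e) {
                // The read end was closed or its thread died; stop writing.
            }
        });
        writer.start();
        return in;
    }

    // Drains the read end on the calling thread until end of stream.
    static byte[] readAll(PipedInputStream in) {
        ByteArrayOutputStream collected = new ByteArrayOutputStream();
        byte[] buffer = new byte[256];
        try {
            int n;
            while ((n = in.read(buffer)) != -1) {
                collected.write(buffer, 0, n);
            }
            in.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return collected.toByteArray();
    }

    public static void main(String[] args) {
        byte[] data = new byte[5000]; // larger than the pipe's default buffer
        System.out.println(readAll(pipe(data)).length);
    }
}
```

The payload in `main` is deliberately larger than the pipe's default 1024-byte buffer, so the writer has to block until the reader catches up; that only works because the two ends run on different threads.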