問題の背景
私は現在、古いディレクトリに表示されるファイルのグループを処理するラクダベースの ETL アプリケーションに取り組んでいます。ファイルは、ファイル名の先頭で決まるグループとしてまとめて処理する必要があります。ファイルは、完了ファイル (「.flag」) がディレクトリに書き込まれた後にのみ処理できます。キャメル ファイル コンポーネントに完了ファイル オプションがあることは知っていますが、それでは完了ファイルと同じ名前のファイルしか取得できません。アプリケーションは継続的に実行し、日付が変わると翌日のディレクトリのポーリングを開始する必要があります。
ディレクトリ構造の例:
/process-directory
/03-09-2011
/03-10-2011
/GROUPNAME_ID1_staticfilename.xml
/GROUPNAME_staticfilename2.xml
/GROUPNAME.flag
/GROUPNAME2_ID1_staticfilename.xml
/GROUPNAME2_staticfilename2.xml
/GROUPNAME2_staticfilename3.xml
/GROUPNAME2.flag
これまでの試み
処理を開始する次のルート (難読化された名前) があります。
@Override
public void configure() throws Exception
{
getContext().addEndpoint("processShare", createProcessShareEndpoint());
from("processShare")
.process(new InputFileRouter())
.choice()
.when()
.simple("${header.processorName} == '" + InputFileType.TYPE1 + "'")
.to("seda://type1?size=1")
.when()
.simple("${header.processorName} == '" + InputFileType.TYPE2 + "'")
.to("seda://type2?size=1")
.when()
.simple("${header.processorName} == '" + InputFileType.TYPE3 + "'")
.to("seda://type3?size=1")
.when()
.simple("${header.processorName} == '" + InputFileType.TYPE4 + "'")
.to("seda://type4?size=1")
.when()
.simple("${header.processorName} == '" + InputFileType.TYPE5 + "'")
.to("seda://type5?size=1")
.when()
.simple("${header.processorName} == '" + InputFileType.TYPE6 + "'")
.to("seda://type6?size=1")
.when()
.simple("${header.processorName} == '" + InputFileType.TYPE7 + "'")
.to("seda://type7?size=1")
.otherwise()
.log(LoggingLevel.FATAL, "Unknown file type encountered during processing! --> ${body}");
}
私の問題は、ファイル エンドポイントの設定方法に関するものです。私は現在、運が悪くてもエンドポイントをプログラムで構成しようとしています。これまでの camel での私の経験では、主に Spring DSL を使用しており、Java DSL は使用していません。
FileEndpoint オブジェクトをインスタンス化しようとするルートをたどりましたが、ルートが構築されるたびに、ファイル プロパティが null であるというエラーが表示されます。これは、エンドポイントではなく FileComponent を作成する必要があるためだと思います。uriを使用してディレクトリ名に動的な日付を指定できないため、uriを使用せずにエンドポイントを作成していません。
private FileEndpoint createProcessShareEndpoint() throws ConfigurationException
{
FileEndpoint endpoint = new FileEndpoint();
//Custom directory "ready to process" implementation.
endpoint.setProcessStrategy(getContext().getRegistry().lookup(
"inputFileProcessStrategy", MyFileInputProcessStrategy.class));
try
{
//Controls the number of files returned per directory poll.
endpoint.setMaxMessagesPerPoll(Integer.parseInt(
PropertiesUtil.getProperty(
AdapterConstants.OUTDIR_MAXFILES, "1")));
}
catch (NumberFormatException e)
{
throw new ConfigurationException(String.format(
"Property %s is required to be an integer.",
AdapterConstants.OUTDIR_MAXFILES), e);
}
Map<String, Object> consumerPropertiesMap = new HashMap<String, Object>();
//Controls the delay between directory polls.
consumerPropertiesMap.put("delay", PropertiesUtil.getProperty(
AdapterConstants.OUTDIR_POLLING_MILLIS));
//Controls which files are included in directory polls.
//Regex that matches file extensions (eg. {SOME_FILE}.flag)
consumerPropertiesMap.put("include", "^.*(." + PropertiesUtil.getProperty(
AdapterConstants.OUTDIR_FLAGFILE_EXTENSION, "flag") + ")");
endpoint.setConsumerProperties(consumerPropertiesMap);
GenericFileConfiguration configuration = new GenericFileConfiguration();
//Controls the directory to be polled by the endpoint.
if(CommandLineOptions.getInstance().getInputDirectory() != null)
{
configuration.setDirectory(CommandLineOptions.getInstance().getInputDirectory());
}
else
{
SimpleDateFormat dateFormat = new SimpleDateFormat(PropertiesUtil.getProperty(AdapterConstants.OUTDIR_DATE_FORMAT, "MM-dd-yyyy"));
configuration.setDirectory(
PropertiesUtil.getProperty(AdapterConstants.OUTDIR_ROOT) + "\\" +
dateFormat.format(new Date()));
}
endpoint.setConfiguration(configuration);
return endpoint;
質問
この状況で GenericFileProcessingStrategy を実装するのは正しいことですか? もしそうなら、これの例はどこかにありますか?キャメル ファイルの単体テストを調べましたが、飛び出してきたものは何もありませんでした。
エンドポイントの構成で何が間違っていますか? この混乱を解消するための答えは、質問 3 に結びついているように感じます。
ポーリング時および日付変更時に日付付きフォルダーをローリングするようにファイル エンドポイントを構成できますか?
いつも助けてくれてありがとう。