1

問題の背景

私は現在、古いディレクトリに表示されるファイルのグループを処理するラクダベースの ETL アプリケーションに取り組んでいます。ファイルは、ファイル名の先頭で決まるグループとしてまとめて処理する必要があります。ファイルは、完了ファイル (「.flag」) がディレクトリに書き込まれた後にのみ処理できます。キャメル ファイル コンポーネントに完了ファイル オプションがあることは知っていますが、それでは完了ファイルと同じ名前のファイルしか取得できません。アプリケーションは継続的に実行し、日付が変わると翌日のディレクトリのポーリングを開始する必要があります。

ディレクトリ構造の例:

 /process-directory
      /03-09-2011
      /03-10-2011
           /GROUPNAME_ID1_staticfilename.xml
           /GROUPNAME_staticfilename2.xml
           /GROUPNAME.flag
           /GROUPNAME2_ID1_staticfilename.xml
           /GROUPNAME2_staticfilename2.xml
           /GROUPNAME2_staticfilename3.xml
           /GROUPNAME2.flag


これまでの試み

処理を開始する次のルート (難読化された名前) があります。

@Override
public void configure() throws Exception 
{
    getContext().addEndpoint("processShare", createProcessShareEndpoint());

    from("processShare")
        .process(new InputFileRouter())
        .choice()
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE1 + "'")
                .to("seda://type1?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE2 + "'")
                .to("seda://type2?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE3 + "'")
                .to("seda://type3?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE4 + "'")
                .to("seda://type4?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE5 + "'")
                .to("seda://type5?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE6 + "'")
                .to("seda://type6?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE7 + "'")
                .to("seda://type7?size=1")
            .otherwise()
                .log(LoggingLevel.FATAL, "Unknown file type encountered during processing! --> ${body}");
}


私の問題は、ファイル エンドポイントの設定方法に関するものです。私は現在、運が悪くてもエンドポイントをプログラムで構成しようとしています。これまでの camel での私の経験では、主に Spring DSL を使用しており、Java DSL は使用していません。

FileEndpoint オブジェクトをインスタンス化しようとするルートをたどりましたが、ルートが構築されるたびに、ファイル プロパティが null であるというエラーが表示されます。これは、エンドポイントではなく FileComponent を作成する必要があるためだと思います。uriを使用してディレクトリ名に動的な日付を指定できないため、uriを使用せずにエンドポイントを作成していません。

private FileEndpoint createProcessShareEndpoint() throws ConfigurationException
    {
        FileEndpoint endpoint = new FileEndpoint();

        //Custom directory "ready to process" implementation.
        endpoint.setProcessStrategy(getContext().getRegistry().lookup(
                "inputFileProcessStrategy", MyFileInputProcessStrategy.class));

        try 
        {
            //Controls the number of files returned per directory poll.
            endpoint.setMaxMessagesPerPoll(Integer.parseInt(
                    PropertiesUtil.getProperty(
                            AdapterConstants.OUTDIR_MAXFILES, "1")));
        } 
        catch (NumberFormatException e) 
        {
            throw new ConfigurationException(String.format(
                    "Property %s is required to be an integer.", 
                    AdapterConstants.OUTDIR_MAXFILES), e);
        }

        Map<String, Object> consumerPropertiesMap = new HashMap<String, Object>();

        //Controls the delay between directory polls.
        consumerPropertiesMap.put("delay", PropertiesUtil.getProperty(
                AdapterConstants.OUTDIR_POLLING_MILLIS));

        //Controls which files are included in directory polls.
        //Regex that matches file extensions (eg. {SOME_FILE}.flag)
        consumerPropertiesMap.put("include", "^.*(." + PropertiesUtil.getProperty(
                AdapterConstants.OUTDIR_FLAGFILE_EXTENSION, "flag") + ")");

        endpoint.setConsumerProperties(consumerPropertiesMap);

        GenericFileConfiguration configuration = new GenericFileConfiguration();

        //Controls the directory to be polled by the endpoint.
        if(CommandLineOptions.getInstance().getInputDirectory() != null)
        {
            configuration.setDirectory(CommandLineOptions.getInstance().getInputDirectory());
        }
        else
        {
            SimpleDateFormat dateFormat = new SimpleDateFormat(PropertiesUtil.getProperty(AdapterConstants.OUTDIR_DATE_FORMAT, "MM-dd-yyyy"));

            configuration.setDirectory(
                    PropertiesUtil.getProperty(AdapterConstants.OUTDIR_ROOT) + "\\" +
                    dateFormat.format(new Date()));
        }

        endpoint.setConfiguration(configuration);

        return endpoint;


質問

  1. この状況で GenericFileProcessingStrategy を実装するのは正しいことですか? もしそうなら、これの例はどこかにありますか?キャメル ファイルの単体テストを調べましたが、飛び出してきたものは何もありませんでした。

  2. エンドポイントの構成で何が間違っていますか? この混乱を解消するための答えは、質問 3 に結びついているように感じます。

  3. ポーリング時および日付変更時に日付付きフォルダーをローリングするようにファイル エンドポイントを構成できますか?

いつも助けてくれてありがとう。

4

2 に答える 2

2

たとえば、file:xxxx?processStrategy=#myProcess のように、processStrategy オプションを使用して、エンドポイント uri からカスタム ProcessStrategy を参照できます。値の前に # を付けて、レジストリから検索する必要があることを示していることに注意してください。したがって、Spring XML では、 <bean id="myProcess" ...> タグを追加するだけです

Java では、CamelContext API からエンドポイントを取得する方がおそらく簡単です。

FileEndpoint file = context.getEndpoint("file:xxx?aaa=123&bbb=456", FileEndpoint.class);

これにより、エンドポイントを事前に構成できます。もちろん、後で FileEndpoint の API を使用して、他の構成を設定することもできます。

于 2011-03-12T13:53:43.847 に答える
0

Java で GenericFileProcessingStrategy を使用する方法は次のとおりです。

@Component
public class CustomGenericFileProcessingStrategy<T> extends GenericFileProcessStrategySupport<T> {
public CustomFileReadyToCopyProcessStrategy() {
}

public boolean begin(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
    super.begin(operations, endpoint, exchange, file);
    ...
}

public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
    super.commit(operations, endpoint, exchange, file);
    ...
}

public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
   super.rollback(operations, endpoint, exchange, file);
   ...
   }
}

次に、ルーティング Builder クラスを作成します。

public class myRoutes() extends RouteBuilder {
    
    private final static CustomGenericFileProcessingStrategy  customGenericFileProcessingStrategy; 

    public myRoutes(CustomGenericFileProcessingStrategy 
 customGenericFileProcessingStrategy) {   
        this.customGenericFileProcessingStrategy  = customGenericFileProcessingStrategy ; }
    
@Override public void configure() throws Exception {
    
    FileEndpoint fileEndPoint= camelContext.getEndpoint("file://mySourceDirectory"); 
    fileEndPoint.setProcessStrategy(myCustomGenericFileProcessingStrategy ); 
    from(fileEndPoint).setBody(...)process(...).toD(...);
    ...
}
于 2020-09-07T13:54:13.233 に答える