0

いくつかのテーブルに SQL Server CDC をセットアップしています。CDC が開始されると、cdc テーブルにデータが入力されます。これらの変更を処理し、変更が発生するたびに MQ メッセージを生成して、外部メッセージ キューに送信したいと考えています。

このデータを処理する最善の方法は何ですか。私は sqdata のようないくつかの製品を見てきましたが、それを行うためのより良い方法があるかどうかを考えていました. Service Broker を使用して CDC を確認しましたが、これにより、外部アプリケーションにのみ送信されるメッセージが生成されます。

もう 1 つの問題は、CDC の変更によってメッセージが生成され、変更データを削除してほしいというメッセージが表示されるため、この処理サービスをスケーリングする場合は、既に処理されたデータを処理しないようにすることです。

4

2 に答える 2

1

使用される CDC は、LSN を使用して、ストリームで処理したものを見つけることを前提としています。何らかの方法で処理した間隔を追跡する必要があります (データベース内のテーブルに貼り付けるのが好きです)。テーブルは次のようになります。

create table dbo.CDCProcessing (
   ID int identity not null,
   CaptureInstance sysname not null,
   FarEndpoint binary(10),
   IsProcessed bit
);

create unique index [OnlyOneOpenRange] 
on dbo.CDCProcessing (CaptureInstance)
where IsProcessed = 0;

処理ループは次のようになります (キャプチャ インスタンスごと)。

  1. CDCProcessing テーブルで、中断された可能性のある処理間隔 (つまり、" where IsProcessed = 0"
  2. 上記の手順で間隔が見つからない場合は、挿入します。FarEndpoint の値には、 の値を使用しますsys.fn_cdc_max_lsn()。見つかった場合は、自分で挿入したかのように使用してください。
  3. 状況に応じてcdc.fn_cdc_get_all_changes_<capture_instance>またはを使用して間隔を処理します。cdc.fn_cdc_get_net_changes_<capture_instance>いずれにせよ、from_lsn の値が必要になります。dbo.CDCProcessing テーブルにこの CaptureInstance の行がある場合は、FarEndpoint 値が最大の行を取得して呼び出しますsys.fn_cdc_increment_lsn。行がない場合はsys.fn_cdc_get_min_lsn、このキャプチャ インスタンスを呼び出します。
  4. すべての行を正常に処理したら、IsProcessed 列を未処理間隔の 1 に更新します。

上記は、間隔の途中で中断した場合に何が起こるかについては、かなりばかげています。つまり、一部の CDC レコードを複数回処理することになる可能性があります。これが重要な場合は、プロセスを修正して、ダウンストリーム システムによって処理された最後のメッセージを考慮し、それに応じて CDCProcessing テーブルを更新できます。しかし、それは読者への演習として残されています。

パージに関する他の問題については、それがどのように機能するかではありません。CDC をセットアップするとき、ローリング間隔を維持するジョブを作成する必要があります (デフォルトでは 3 日分に設定されていると思います)。ジョブは定期的に実行され、CDC データを保持間隔にトリムします。したがって、そのジョブが実行されると仮定すると、心配する必要はありません。

于 2015-02-23T18:24:13.440 に答える
0

最近、仮想 cdc インスタンスを使用するソリューションを作成しました (これにより、単一の実際のキャプチャ インスタンスを使用して、無制限の数の別個の個別のインスタンスが単一のターゲット テーブルに対して動作できます。これは、克服する必要がある問題の 1 つでした。

私の解決策は、永続化されたデータ テーブルを使用して CT テーブルからデータを受け取り、そこからデータを処理することでした。30 秒ごとに実行されるジョブと、インスタンス名と最後の LSN を格納するテーブルを使用してこれをセットアップすることで、同じクライアント データベースまたは別の共通データベースの永続ストレージにデータをハイブすることができます。 (同じ SQL インスタンスのオンまたはオフ)。

これにより、いつでもクリーンアップを指定して、ローカルに保存されるデータの量を減らすことができます。

処理の面では、インスタンスの作成時に作成される cdc 関数を使用する必要があります。[ $update_mask] 列を使用することで、実際の列の変更を特定し、[ $start_lsn]/__$seqval、タイムスタンプ (収集されたLSN から)。

update_mask 列とタイムスタンプを解読するには、次のコードを使用できます。

SELECT 
[__$start_lsn] ,
[__$end_lsn] ,
[__$seqval] ,
[__$operation] ,
[__$update_mask] ,
sys.fn_cdc_map_lsn_to_time(__$start_lsn) [RowTimestamp],
reverse(stuff(reverse(
       ( SELECT    CC.column_name + ','
    FROM      cdc.captured_columns CC
             INNER JOIN cdc.change_tables CT ON CC.[object_id] = CT.[object_id]
    WHERE     capture_instance = 'MyCaptureInstanceName'
         AND sys.fn_cdc_is_bit_set(CC.column_ordinal, __$update_mask) = 1
FOR
    XML PATH('')
))
       ,1,1,'')) [ChangedColumns],
SYSDATETIME() [CopyTs] 
FROM cdc.MyCaptureInstanceName_CT
WHERE [__$start_lsn] > (SELECT ISNULL(MAX(MaxLSN),0) FROM MyLogTable WHERE CaptureInstance = 'MyCaptureInstanceName')
于 2016-09-16T20:51:58.047 に答える