0

変換と同期symmetricdsするように構成mysqlするために使用しようとしています。postgres100% の CPU 負荷で初期ロード時の挿入パフォーマンスが非常に低くなっていますpostgrespostgresログを見ると、使用されていることがわかりましたINSERT。何百万ものレコードがあるため、通常の作業には問題ありませんが、初期化には問題ありません。私は代わりPostgresBulkDatabaseWriterに使用するソースコードを見つけましたが、それは良い解決策のように見えます( SQLリクエストは私にとってはかなりうまく機能します)が、それをどのように使用できるかわかりません。COPYINSERTCOPY

だから私の質問:

symmetricds何百万ものレコードで初期ロードを行うにはどうすればよいでしょうか?

PostgresBulkDatabaseWriter初期 (逆初期) ロードを有効にするにはどうすればよいですか?

ありがとう

更新日:

ソース テーブルmysql:

CREATE TABLE `companies` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cid` int(11) NOT NULL,
  `universalName` text NOT NULL,
  `name` text NOT NULL,
  `country` text NOT NULL,
  `city` text NOT NULL,
  `street` text NOT NULL,
  `phone` text NOT NULL,
  `foundedYear` text NOT NULL,
  `employeeCountRange` text NOT NULL,
  `specialties` text NOT NULL,
  `websiteUrl` text NOT NULL,
  `twitterId` text NOT NULL,
  `check` tinyint(4) NOT NULL DEFAULT '0',
  `date` datetime NOT NULL,
  PRIMARY KEY (`id`)
);

CREATE TABLE `search_results` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cid` int(11) NOT NULL,
  `title` text NOT NULL,
  `description` text NOT NULL,
  `link` text NOT NULL,
  `raw` text NOT NULL,
  `date` datetime NOT NULL,
  PRIMARY KEY (`id`)
);

コア テーブルpostgres:

CREATE TABLE res_country (
    id integer NOT NULL,
    create_uid integer,
    create_date timestamp without time zone,
    write_date timestamp without time zone,
    write_uid integer,
    address_format text,
    currency_id integer,
    code character varying(2),
    name character varying(64) NOT NULL
);

INSERT INTO res_country VALUES (1, 1, '2013-11-16 06:53:31.030363', '2013-11-16 06:53:31.030363', 1, '%(street)s
%(street2)s
%(city)s %(state_code)s %(zip)s
%(country_name)s', 1, 'AD', 'Andorra, Principality of');

CREATE TABLE res_partner (
    id integer NOT NULL,
    name character varying(128) NOT NULL,
    lang character varying(64),
    company_id integer,
    create_uid integer,
    create_date timestamp without time zone,
    write_date timestamp without time zone,
    write_uid integer,
    comment text,
    ean13 character varying(13),
    color integer,
    image bytea,
    use_parent_address boolean,
    active boolean,
    street character varying(128),
    supplier boolean,
    city character varying(128),
    user_id integer,
    zip character varying(24),
    title integer,
    function character varying(128),
    country_id integer,
    parent_id integer,
    employee boolean,
    type character varying,
    email character varying(240),
    vat character varying(32),
    website character varying(64),
    fax character varying(64),
    street2 character varying(128),
    phone character varying(64),
    credit_limit double precision,
    date date,
    tz character varying(64),
    customer boolean,
    image_medium bytea,
    mobile character varying(64),
    ref character varying(64),
    image_small bytea,
    birthdate character varying(64),
    is_company boolean,
    state_id integer,
    notification_email_send character varying NOT NULL,
    opt_out boolean,
    signup_type character varying,
    signup_expiration timestamp without time zone,
    signup_token character varying,
    last_reconciliation_date timestamp without time zone,
    debit_limit double precision,
    display_name character varying,
    vat_subjected boolean,
    section_id integer
);


CREATE TABLE my_res_partner_companies (
  id INT8 NOT NULL PRIMARY KEY,
  cid INT8 NOT NULL,
  universalName VARCHAR NOT NULL,
  employeeCountRange VARCHAR NOT NULL,
  specialties VARCHAR NOT NULL,
  twitterId VARCHAR NOT NULL,
  "check" INT4 NOT NULL DEFAULT '0',
  date TIMESTAMP NOT NULL
);

CREATE TABLE my_res_partner_search_result (
  id INT8 NOT NULL PRIMARY KEY,
  link VARCHAR NOT NULL,
  raw VARCHAR NOT NULL,
  date TIMESTAMP NOT NULL
);

ソース プロパティ:

engine.name=source-001

# The class name for the JDBC Driver
db.driver=com.mysql.jdbc.Driver

# The JDBC URL used to connect to the database
db.url=jdbc:mysql://localhost:3307/data?tinyInt1isBit=false

# The user to login as who can create and update tables
db.user=root

# The password for the user to login as
db.password=

# The HTTP URL of the root node to contact for registration
registration.url=http://localhost:8080/sync/core-000
#auto.reload.reverse=true

# Do not change these for running the demo
group.id=source
external.id=001

# This is how often the routing job will be run in milliseconds
job.routing.period.time.ms=5000
# This is how often the push job will be run.
job.push.period.time.ms=10000
# This is how often the pull job will be run.
job.pull.period.time.ms=10000

コア プロパティ:

engine.name=core-000

# The class name for the JDBC Driver
db.driver=org.postgresql.Driver

# The JDBC URL used to connect to the database
db.url=jdbc:postgresql://localhost:5432/data2?stringtype=unspecified

# The user to login as who can create and update tables
db.user=admin

# The password for the user to login as
db.password=admin

registration.url=
sync.url=http://localhost:8080/sync/core-000
auto.reload.reverse=true
datareload.batch.insert.transactional=true

# Do not change these for running the demo
group.id=core
external.id=000

# Don't muddy the waters with purge logging
job.purge.period.time.ms=7200000

# This is how often the routing job will be run in milliseconds
job.routing.period.time.ms=5000
# This is how often the push job will be run.
job.push.period.time.ms=10000
# This is how often the pull job will be run.
job.pull.period.time.ms=10000

symmetricな構成:

-- Nodes
insert into sym_node_group (node_group_id, description)
values ('core', 'Core Storage');
insert into sym_node_group (node_group_id, description)
values ('source', 'Source Storage');

insert into sym_node_group_link (source_node_group_id, target_node_group_id, data_event_action)
values ('source', 'core', 'P');
insert into sym_node_group_link (source_node_group_id, target_node_group_id, data_event_action)
values ('core', 'source', 'W');

insert into sym_node (node_id, node_group_id, external_id, sync_enabled)
values ('000', 'core', '000', 1);
insert into sym_node_security (node_id,node_password,registration_enabled,registration_time,initial_load_enabled,initial_load_time,initial_load_id,initial_load_create_by,rev_initial_load_enabled,rev_initial_load_time,rev_initial_load_id,rev_initial_load_create_by,created_at_node_id)
values ('000','changeme',1,current_timestamp,1,current_timestamp,null,null,0,null,null,null,'000');
insert into sym_node_identity values ('000');

-- Channels
insert into sym_channel
(channel_id, processing_order, max_batch_size, enabled, description)
values('source__acc', 1, 100000, 1, 'accounting synchronisation');

-- Triggers
insert into sym_trigger
(trigger_id,source_table_name,channel_id,last_update_time,create_time)
values('source__companies','companies','source__acc',current_timestamp,current_timestamp);

insert into sym_trigger
(trigger_id,source_table_name,channel_id,last_update_time,create_time)
values('source__search_results','search_results','source__acc',current_timestamp,current_timestamp);

-- Routers
insert into sym_router
(router_id,source_node_group_id,target_node_group_id,router_type,create_time,last_update_time)
values('source_2_core', 'source', 'core', 'default',current_timestamp, current_timestamp);

-- Trigger Router Links
insert into sym_trigger_router
(trigger_id,router_id,initial_load_order, INITIAL_LOAD_BATCH_COUNT,last_update_time,create_time)
values('source__companies','source_2_core', 100, 0, current_timestamp, current_timestamp);

insert into sym_trigger_router
(trigger_id,router_id,initial_load_order, INITIAL_LOAD_BATCH_COUNT,last_update_time,create_time)
values('source__search_results','source_2_core', 200, 0, current_timestamp, current_timestamp);

主な変換:

-- Transform
insert into SYM_TRANSFORM_TABLE
  (transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name, target_table_name, delete_action, column_policy)
values
  ('source__companies__main', 'source', 'core', 'LOAD', 'companies', 'res_partner', 'DEL_ROW', 'SPECIFIED');
--  ('source__companies__main', 'source', 'core', 'EXTRACT', 'companies', 'res_partner', 'DEL_ROW', 'SPECIFIED');

insert into SYM_TRANSFORM_COLUMN
  (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION)
values
  ('source__companies__main', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 1000000;'),
  ('source__companies__main', '*', 'name', 'name', 0, 'copy', NULL),
  ('source__companies__main', '*', 'country', 'country_id', 0, 'const', '1'),
  ('source__companies__main', '*', 'city', 'city', 0, 'copy', NULL),
  ('source__companies__main', '*', 'street', 'street', 0, 'copy', NULL),
  ('source__companies__main', '*', 'phone', 'phone', 0, 'copy', NULL),
  ('source__companies__main', '*', 'websiteUrl', 'website', 0, 'copy', NULL),
  ('source__companies__main', '*', NULL, 'notification_email_send', 0, 'const', '0'),
  ('source__companies__main', '*', NULL, 'is_company', 0, 'const', '1');

insert into SYM_TRANSFORM_TABLE
  (transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name, target_table_name, delete_action, column_policy)
values
  ('source__companies__residue', 'source', 'core', 'LOAD', 'companies', 'my_res_partner_companies', 'DEL_ROW', 'SPECIFIED');
  -- ('source__companies__residue', 'source', 'core', 'EXTRACT', 'companies', 'my_res_partner_companies', 'DEL_ROW', 'SPECIFIED');

insert into SYM_TRANSFORM_COLUMN
  (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION)
values
  ('source__companies__residue', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 1000000;'),
  ('source__companies__residue', '*', 'cid', 'cid', 0, 'copy', NULL),
  ('source__companies__residue', '*', 'universalName', 'universalName', 0, 'copy', NULL),
  ('source__companies__residue', '*', 'employeeCountRange', 'employeeCountRange', 0, 'copy', NULL),
  ('source__companies__residue', '*', 'specialties', 'specialties', 0, 'copy', NULL),
  ('source__companies__residue', '*', 'twitterId', 'twitterId', 0, 'copy', NULL),
  ('source__companies__residue', '*', 'check', 'check', 0, 'copy', NULL),
  ('source__companies__residue', '*', 'date', 'date', 0, 'copy', NULL);


insert into SYM_TRANSFORM_TABLE
  (transform_id, source_node_group_id, target_node_group_id, transform_point,
   source_table_name, target_table_name, delete_action, column_policy)
values
  ('source__search_results__main', 'source', 'core', 'LOAD', 'search_results', 'res_partner', 'DEL_ROW', 'SPECIFIED');
  -- ('source__search_results__main', 'source', 'core', 'EXTRACT', 'search_results', 'res_partner', 'DEL_ROW', 'SPECIFIED');

insert into SYM_TRANSFORM_COLUMN
  (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION)
values
  ('source__search_results__main', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 2000000;'),
  ('source__search_results__main', '*', 'cid', 'parent_id', 0, 'bsh', 'return Integer.parseInt(currentValue) + 1000000;'),
  ('source__search_results__main', '*', 'title', 'name', 0, 'copy', NULL),
  ('source__search_results__main', '*', 'description', 'comment', 0, 'copy', NULL),
  ('source__search_results__main', '*', NULL, 'use_parent_address', 0, 'const', '1'),
  ('source__search_results__main', '*', NULL, 'notification_email_send', 0, 'const', '0'),
  ('source__search_results__main', '*', NULL, 'is_company', 0, 'const', '0');

insert into SYM_TRANSFORM_TABLE
  (transform_id, source_node_group_id, target_node_group_id, transform_point,
   source_table_name, target_table_name, delete_action, column_policy)
values
  ('source__search_results__residue', 'source', 'core', 'LOAD', 'search_results', 'my_res_partner_search_result', 'DEL_ROW', 'SPECIFIED');
  -- ('source__search_results__residue', 'source', 'core', 'EXTRACT', 'search_results', 'my_res_partner_search_result', 'DEL_ROW', 'SPECIFIED');

insert into SYM_TRANSFORM_COLUMN
  (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION)
values
  ('source__search_results__residue', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 2000000;'),
  ('source__search_results__residue', '*', 'link', 'link', 0, 'copy', NULL),
  ('source__search_results__residue', '*', 'raw', 'raw', 0, 'copy', NULL),
  ('source__search_results__residue', '*', 'date', 'date', 0, 'copy', NULL);

単純化された変換:

-- Transform
insert into SYM_TRANSFORM_TABLE
  (transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name, target_table_name, delete_action, column_policy)
values
  ('source__companies__main', 'source', 'core', 'LOAD', 'companies', 'res_partner', 'DEL_ROW', 'SPECIFIED');
--  ('source__companies__main', 'source', 'core', 'EXTRACT', 'companies', 'res_partner', 'DEL_ROW', 'SPECIFIED');

insert into SYM_TRANSFORM_COLUMN
  (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION)
values
  ('source__companies__main', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 1000000;'),
  ('source__companies__main', '*', 'name', 'name', 0, 'copy', NULL),
  ('source__companies__main', '*', 'country', 'country_id', 0, 'const', '1'),
  ('source__companies__main', '*', 'city', 'city', 0, 'copy', NULL),
  ('source__companies__main', '*', 'street', 'street', 0, 'copy', NULL),
  ('source__companies__main', '*', 'phone', 'phone', 0, 'copy', NULL),
  ('source__companies__main', '*', 'websiteUrl', 'website', 0, 'copy', NULL),
  ('source__companies__main', '*', NULL, 'notification_email_send', 0, 'const', '0'),
  ('source__companies__main', '*', NULL, 'is_company', 0, 'const', '1');


insert into SYM_TRANSFORM_TABLE
  (transform_id, source_node_group_id, target_node_group_id, transform_point,
   source_table_name, target_table_name, delete_action, column_policy)
values
  ('source__search_results__main', 'source', 'core', 'LOAD', 'search_results', 'res_partner', 'DEL_ROW', 'SPECIFIED');
  -- ('source__search_results__main', 'source', 'core', 'EXTRACT', 'search_results', 'res_partner', 'DEL_ROW', 'SPECIFIED');

insert into SYM_TRANSFORM_COLUMN
  (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION)
values
  ('source__search_results__main', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 2000000;'),
  ('source__search_results__main', '*', 'cid', 'parent_id', 0, 'bsh', 'return Integer.parseInt(currentValue) + 1000000;'),
  ('source__search_results__main', '*', 'title', 'name', 0, 'copy', NULL),
  ('source__search_results__main', '*', 'description', 'comment', 0, 'copy', NULL),
  ('source__search_results__main', '*', NULL, 'use_parent_address', 0, 'const', '1'),
  ('source__search_results__main', '*', NULL, 'notification_email_send', 0, 'const', '0'),
  ('source__search_results__main', '*', NULL, 'is_company', 0, 'const', '0');

コアセットアップ:

update sym_channel set DATA_LOADER_TYPE = 'postgres_bulk' where channel_id = 'reload';

主な変換 (LOAD および EXTRACT) と単純化された変換 (LOAD および EXTRACT) を使用して、1 つずつレコードをsymmetric挿入するように見えます。COPY

4

2 に答える 2

6

PostgresBulkDataLoaderFactory が答えです。

初期ロードとリロードにバルク ライターを使用するだけの場合は、リロード チャネルをバルク ライターのみを使用するように構成することをお勧めします。

チャネル テーブル (デフォルトでは sym_channel) で、リロード チャネルの data_loader_type 列を「postgres_bulk」に更新します。

ユーザー ガイドでは、DatabaseWriters の実装方法について簡単に説明しています。

于 2014-01-15T13:43:37.343 に答える