Streams N-Way ou « Comment créer un configuration multi-source ? »
Streams N-Way est probablement la meilleure architecture que vous puissiez construire… surtout si vous voulez augmenter au maximum vos chances de vous retrouver avec des problèmes tirés par les cheveux ! Imaginez : vous avez plusieurs copies de la même table à plusieurs endroits; chaque changement sur chacune de ces tables est capturé, poussé et appliqué sur les autres copies. Maintenant pensez-y bien :
- Comment créez-vous une nouvelle copie de la table sans tout arrêter ?
- Que ce passe-t-il si une destination de la réplication est indisponible 1 heure ? 1 jour ? 1 semaine ?
- Que se passe-t-il lorsque vous changez la même ligne sur 2 copies différentes à peu près au même moment ?
- Comment répliquez vous les changements sur la structure des tables ?
Dans cet article, vous trouverez l’ensemble des commandes nécessaires pour mettre en oeuvre cette réplication Streams dite « N-Way ». C’est de loin, je pense, l’exemple le plus simple que l’on puisse construire, même s’il a ses limites. Il ne nécessite qu’une seule base de données et vous pourrez le mettre en oeuvre en moins d’une heure. 3 copies de la même table sont synchronisées :

3-Way table replication
A propos, vous ne trouverez aucune réponse aux questions ci-dessus dans cet article. J’aborderai certains de ces points dans mes prochains articles et lors du « Streams 10g et 11g MasterClass » à Colombes le 15 décembre 2009 avec Oracle University. Si vous ne connaissez pas du tout Streams, commencez plutôt par lire l’article précédent intitulé « Oracle Streams : Paramètrer la réplication mono-directionelle d’une table« .
Mais descendons dans le détail ! Dans les sections qui suivent vous allez construire et maintenir 3 copies de la même table nommée T1. Ces copies sont stockées respectivement dans les schémas FRANCE, USA et JAPAN de la même base de données. Pour mettre en place et tester cette configuration, vous devrez réaliser l’ensemble des étapes qui suivent :
- Créer les 3 schémas
- Préparer le dictionnaire multi-version et les copies des tables
- Créer l’administrateur et la file d’attente Oracle Streams
- Créer le process de capture et les règles associées
- Créer les composants « Apply »
- Changer les noms des schémas dans les LCR
- Instancier les tables
- Démarrer les process de capture et d’apply
- Tester la réplication
- Arrêter et supprimer la configuration Streams
Note:
J’ai testé cet article avec une base de données 11.1.0.7 Enterprise Edition sur Linux 32 bits.
Etape 1: Créer les 3 schémas
La seule vrai limite technique de cet exemple de réplication Streams réside dans le fait qu’une table source ne peut être instanciée théoriquement qu’une seule fois par base de données cible. En effet la structure de DBA_APPLY_INSTANTIATED_OBJECTS inclut la base de données source mais pas les objets concernés par l’instantiation. Dans notre exemple, toutes les copies résident dans la même base. Il faudrait donc idéalement pouvoir instancier chaque table 2 fois, c’est à dire une fois par copie. Pour contourner ce problème, nous instancierons toutes les tables avec le même SCN. Cela implique que les données soient consistentes dans toutes les tables au moment de l’instantiaition et donc que l’activité des utilisateurs soit bloquée. Nous n’autoriserons donc la modification des tables et de leurs données qu’après avoir instancié toutes des tables.
Le script ci-dessous crée les 3 schémas, les 3 tables et ajoute 2 lignes dans chacune d’entre-elles de sorte qu’elles soient consistentes :
connect / as sysdba
create user france
identified by france
default tablespace users
temporary tablespace temp;
grant connect,resource to france;
create user usa
identified by usa
default tablespace users
temporary tablespace temp;
grant connect,resource to usa;
create user japan
identified by japan
default tablespace users
temporary tablespace temp;
grant connect,resource to japan;
create table france.t1(
id number primary key,
text varchar2(80));
create table usa.t1(
id number primary key,
text varchar2(80));
create table japan.t1(
id number primary key,
text varchar2(80));
insert into france.t1(id, text)
values (1,'Text 1');
insert into france.t1(id, text)
values (2,'Text 2');
insert into usa.t1
(select * from france.t1);
insert into japan.t1
(select * from france.t1);
commit;
Note:
Assurez-vous que votre base de données est en mode ARCHIVELOG
Etape 2: Préparer le dictionnaire multi-version et les copies des tables
Cette étape permet aux process de capture de maintenir un dictionnaire de données et ainsi de construire les LCR à partir des redologs, même après qu’un objet ait été supprimé du dictionnaire de données original. Utilisez DBMS_CAPTURE_ADM.BUILD comme ci-dessous :
var first_scn number;
set serveroutput on
DECLARE
scn NUMBER;
BEGIN
DBMS_CAPTURE_ADM.BUILD(
first_scn => scn);
DBMS_OUTPUT.PUT_LINE('First SCN Value = ' || scn);
:first_scn := scn;
END;
/
First SCN Value = 2260357
Préparez ensuite les 3 tables pour leur instantiation. Cela permet d’alimenter le MVDD et aussi d’assurer qu’il n’y a pas de changement en cours sur ces tables. De cette manière, toutes les modifications qui feront l’objet d’un « commit » après l’instantiation seront bien capturées, puisque le 1er SCN sera avant cette étape de préparation. En outre, vous noterez que le « supplemental logging » n’est pas positionné au niveau de la base de données. Les commandes ci-dessous ajoutent les groupes de supplemental logging sur les tables. Ces groupes incluent les clés primaires des 3 tables; pour effectuer cette opération, utilisez le script ci-dessous :
exec dbms_capture_adm.prepare_table_instantiation(-
table_name=>'france.t1', supplemental_logging=>'keys');
exec dbms_capture_adm.prepare_table_instantiation(-
table_name=>'usa.t1', supplemental_logging=>'keys');
exec dbms_capture_adm.prepare_table_instantiation(-
table_name=>'japan.t1', supplemental_logging=>'keys');
Etape 3: Créer l’administrateur et la file d’attente Oracle Streams
Il faut ensuite créer une file d’attente Oracle Streams pour servir de lien entre les process de capture et d’apply. Puisque nous sommes dans la même base de données, une seule file d’attente suffit. Pour faciliter l’administration et permettre certaines opérations comme les Tablespace Point In Time Recovery, créez un tablespace et un utilisateur séparés :
connect / as sysdba
CREATE TABLESPACE streams_tbs
DATAFILE '/u01/app/oracle/oradata/BLACK/streams_tbs.dbf' size 25M
AUTOEXTEND ON MAXSIZE 256M;
CREATE USER strmadmin IDENTIFIED BY strmadmin
DEFAULT TABLESPACE streams_tbs
QUOTA UNLIMITED ON streams_tbs;
grant dba to strmadmin;
BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(
queue_table => 'strmadmin.streams_queue_table',
queue_name => 'strmadmin.streams_queue');
END;
/
Etape 4: Créer le process de capture et les règles associées
Pour que la configuration Streams N-Way soit « la plus simple possible », il faut limiter le nombre de composants. Cet exemple n’a qu’une seule base de données; nous allons utiliser un même processus de capture pour les 3 tables. Pour cela :
- Créer un process de capture
- Ajouter une règle d’inclusion pour chaque table:
- Seuls les ordres de manipulation de données (DML) sont inclus
- Les modifications avec des « tags » qui ne sont pas NULL ne sont pas inclus pour qu’une modification générée par un process Streams ne soit pas re-capturée
Le script ci-dessous effectue ces opérations :
connect strmadmin/strmadmin
accept first_scn prompt "Enter the First SCN of the Capture: "
Enter the First SCN of the Capture: 2260357
var first_scn number;
exec :first_scn:=&&first_scn
BEGIN
DBMS_CAPTURE_ADM.CREATE_CAPTURE(
queue_name => 'strmadmin.streams_queue',
capture_name => 'streams_capture',
rule_set_name => NULL,
source_database => 'BLACK',
use_database_link => false,
first_scn => :first_scn,
logfile_assignment => 'implicit');
END;
/
col capture_name format a15
col queue_name format a13
col first_scn format 999999999999
col start_scn format 999999999999
col rule_set_name format a11
select capture_name,
queue_name,
first_scn,
start_scn,
rule_set_name
from dba_capture;
CAPTURE_NAME QUEUE_NAME FIRST_SCN START_SCN RULE_SET_N
--------------- ------------- ------------- ------------- ----------
STREAMS_CAPTURE STREAMS_QUEUE 2260357 2260357
set serveroutput on
DECLARE
type tn is varray(3) of varchar2(100);
ln tn:=tn('france.t1','usa.t1','japan.t1');
BEGIN
for i in ln.first..ln.last loop
dbms_output.put_line('Source is: '||ln(i));
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => ln(i),
streams_type => 'capture',
streams_name => 'streams_capture',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => false,
source_database => 'BLACK',
inclusion_rule => true);
end loop;
END;
/
Source is: france.t1
Source is: usa.t1
Source is: japan.t1
set lines 120
col streams_name format a16
col streams_type format a9
col table_owner format a10
col table_name format a15
col rule_type format a8
col rule_name format a15
select STREAMS_NAME,
STREAMS_TYPE,
TABLE_OWNER,
TABLE_NAME,
RULE_TYPE,
RULE_NAME
from DBA_STREAMS_TABLE_RULES;
STREAMS_NAME STREAMS_T TABLE_OWNE TABLE_NAME RULE_TYP RULE_NAME
---------------- --------- ---------- --------------- -------- ---------
STREAMS_CAPTURE CAPTURE USA T1 DML T118
STREAMS_CAPTURE CAPTURE FRANCE T1 DML T117
STREAMS_CAPTURE CAPTURE JAPAN T1 DML T119
Etape 5: Créer les composants « Apply »
Contrairement à la capture, vous devez créer des process « Apply » séparés pour chacune des tables. En effet, si chaque changement est capturé qu’une seule fois, il est appliqué sur plusieurs tables. Dans le script qui suit, un process d’apply est créé par schéma et avec ce process 2 règles qui lui indique de traiter les évènements capturés sur les tables des autres schémas :
connect strmadmin/strmadmin
set serveroutput on
DECLARE
type t_n is varray(3) of varchar2(100);
l_n t_n:=t_n('france','usa','japan');
BEGIN
for i in l_n.first..l_n.last loop
for j in l_n.first..l_n.last loop
if (l_n(i)!=l_n(j)) then
dbms_output.put_line('APPLY '||l_n(i)||'_apply'||
' applies '||l_n(j)||'.t1');
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => l_n(j)||'.t1',
streams_type => 'apply',
streams_name => l_n(i)||'_apply',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => false,
source_database => 'BLACK',
inclusion_rule => true);
end if;
end loop;
end loop;
END;
/
APPLY france_apply applies usa.t1
APPLY france_apply applies japan.t1
APPLY usa_apply applies france.t1
APPLY usa_apply applies japan.t1
APPLY japan_apply applies france.t1
APPLY japan_apply applies usa.t1
col apply_name format a13
col queue_name format a13
col rule_set_name format a11
col tag format a4
select apply_name,
queue_name,
rule_set_name,
status,
message_delivery_mode,
apply_tag tag
from dba_apply;
APPLY_NAME QUEUE_NAME RULE_SET_NA STATUS MESSAGE_DE TAG
------------- ------------- ----------- -------- ---------- ----
JAPAN_APPLY STREAMS_QUEUE RULESET$_27 ENABLED CAPTURED 00
USA_APPLY STREAMS_QUEUE RULESET$_24 ENABLED CAPTURED 00
FRANCE_APPLY STREAMS_QUEUE RULESET$_21 ENABLED CAPTURED 00
Etape 6: Changer les noms des schémas dans les LCR
Si la base de données est identique pour les 3 tables, les schémas sont différents. Il faut donc qu’un changement effectué sur USA.T1 soit capturé puis appliqué à FRANCE.T1 et JAPAN.T1. Le script ci-dessous ajoute donc les règles de transformation côté apply pour changer les noms des schémas :
connect strmadmin/strmadmin
select rule_owner
, rule_name
, streams_type
, streams_name
, table_name
, table_owner
from dba_streams_table_rules
where streams_type='APPLY';
set serveroutput on
DECLARE
type t_n is varray(3) of varchar2(100);
l_n t_n:=t_n('france','usa','japan');
v_rulename varchar2(30);
BEGIN
for i in l_n.first..l_n.last loop
for j in l_n.first..l_n.last loop
if (l_n(i)!=l_n(j)) then
dbms_output.put_line('APPLY: '||
l_n(i)||'_apply for CAPTURE OF '||
l_n(j)||'.t1');
select rule_name
into v_rulename
from dba_streams_table_rules
where streams_type='APPLY'
and streams_name=upper(l_n(i))||'_APPLY'
and table_name='T1'
and table_owner=upper(l_n(j));
dbms_output.put_line(rpad(' ',10)||'=>'||v_rulename);
dbms_streams_adm.rename_schema(
rule_name => v_rulename,
from_schema_name => l_n(j),
to_schema_name => l_n(i),
step_number => 0,
operation => 'add');
end if;
end loop;
end loop;
END;
/
col rule_name format A6
col from_schema_name format a6
col to_schema_name format a12
select rule_name,
transform_type,
from_schema_name,
to_schema_name,
declarative_type
from dba_streams_transformations;
RULE_N TRANSFORM_TYPE FROM_S TO_SCHEMA_NA DECLARATIVE_T
------ -------------------------- ------ ------------ -------------
T120 DECLARATIVE TRANSFORMATION USA FRANCE RENAME SCHEMA
T123 DECLARATIVE TRANSFORMATION FRANCE USA RENAME SCHEMA
T122 DECLARATIVE TRANSFORMATION JAPAN FRANCE RENAME SCHEMA
T125 DECLARATIVE TRANSFORMATION JAPAN USA RENAME SCHEMA
T126 DECLARATIVE TRANSFORMATION FRANCE JAPAN RENAME SCHEMA
T128 DECLARATIVE TRANSFORMATION USA JAPAN RENAME SCHEMA
Etape 7: Instancier les tables
Pour les raisons expliquées dans l’étape 1 de cet article, les tables sont déjà instanciées et aucune modification n’a été effectuée depuis le début de la configuration. Quoiqu’il en soit, si vous le savez, Streams ne le sait pas encore ! Il faut donc lui indiquer un SCN d’instantiation pour que les process d’apply sachent à partir de quel SCN les modifications doivent être appliquées. Pour cela, il suffit d’utiliser le SCN courant de la base de données :
connect / as sysdba col apply_scn format 999999999999 select dbms_flashback.get_system_change_number apply_scn from dual; APPLY_SCN --------- 2269379 accept instantiation_scn prompt "Enter the Instantiation: " Enter the Instantiation: 2269379
Vous pouvez alors utiliser ce SCN avec la procédure dbms_apply_adm.set_table_instantiation_scn pour chacune des 3 tables sources :
DECLARE
type t_n is varray(3) of varchar2(100);
l_n t_n:=t_n('france','usa','japan');
BEGIN
for i in l_n.first..l_n.last loop
dbms_apply_adm.set_table_instantiation_scn(
source_object_name => l_n(i)||'.t1',
source_database_name => 'BLACK',
instantiation_scn => &&instantiation_scn);
end loop;
end;
/
col SOURCE_DATABASE format a6
col OBJECT format a10
col INSTANTIATION_SCN format 999999999999
SOURCE OBJECT INSTANTIATION_SCN
------ ---------- -----------------
BLACK FRANCE.T1 2269379
BLACK USA.T1 2269379
BLACK JAPAN.T1 2269379
Etape 8: Démarrer les process de capture et d’apply
Tout est fini, vous pouvez offrir l’accès aux tables et démarrer les différents process :
exec dbms_capture_adm.start_capture('streams_capture');
exec dbms_apply_adm.start_apply('usa_apply');
exec dbms_apply_adm.start_apply('france_apply');
exec dbms_apply_adm.start_apply('japan_apply');
Etape 9: Tester la réplication
Maintenant arrive le moment que vous apprécierez le plus de cet article… Vous pourrez tester la configuration et vérifier que la réplication fonctionne comme promis :
insert into france.t1(id, text)
values (3,'Text 3');
commit;
pause
col id format 99
col text format a6
select id,
text
from usa.t1;
ID TEXT
--- ------
1 Text 1
2 Text 2
3 Text 3
select id,
text
from japan.t1;
ID TEXT
--- ------
1 Text 1
2 Text 2
3 Text 3
insert into usa.t1(id, text)
values (4,'Text 4');
insert into japan.t1(id, text)
values (5,'Text 5');
commit;
select id,
text
from france.t1
order by 1;
ID TEXT
-- ------
1 Text 1
2 Text 2
3 Text 3
4 Text 4
5 Text 5
Vous ne pourrez sans doute pas résister à l’envie de constater ce qui arrive lorsque ce qui doit absolument être évité dans une configuration comme celle-ci arrive; Vous pouvez créer un conflit d’unicité :
insert into usa.t1(id, text)
values (6,'Text 6 Usa');
insert into japan.t1(id, text)
values (6,'Text 6 Japan');
commit;
select id,
text
from france.t1
order by 1;
ID TEXT
-- ------
1 Text 1
2 Text 2
3 Text 3
4 Text 4
5 Text 5
col id format 99
col text format a12
select id,
text
from usa.t1
order by 1;
ID TEXT
--- ----------
1 Text 1
2 Text 2
3 Text 3
4 Text 4
5 Text 5
6 Text 6 Usa
select id,
text
from japan.t1
order by 1;
ID TEXT
--- ------------
1 Text 1
2 Text 2
3 Text 3
4 Text 4
5 Text 5
6 Text 6 Japan
col error_message format a60
select APPLY_NAME, ERROR_MESSAGE
from dba_apply;
APPLY_NAME ERROR_MESSAGE
------------- --------------------------------------------------
JAPAN_APPLY ORA-26714: User error encountered while applying
USA_APPLY ORA-26714: User error encountered while applying
FRANCE_APPLY ORA-26714: User error encountered while applying
select APPLY_NAME, ERROR_MESSAGE
from dba_apply_error;
APPLY_NAME ERROR_MESSAGE
------------- ------------------------------------------------------------
FRANCE_APPLY ORA-00001: unique constraint (FRANCE.SYS_C004474) violated
USA_APPLY ORA-00001: unique constraint (USA.SYS_C004475) violated
JAPAN_APPLY ORA-00001: unique constraint (JAPAN.SYS_C004476) violated
Note:
Si vous utilisiez 3 bases de données distinctes, vous obtiendriez un résultat différent que celui-ci et ce même si vous utilisez une transaction distribuée pour effectué le premier changement. Quoiqu’il en soit, vous aurez toujours une erreur !
Etape 10: Arrêter et supprimer la configuration Streams
La dernière étape, comme souvent, consiste à nettoyez la configuration; de cette manière, vous pourrez recommencer autant de fois que vous le voulez :
connect / as sysdba
exec dbms_capture_adm.stop_capture('streams_capture');
exec dbms_apply_adm.stop_apply('usa_apply');
exec dbms_apply_adm.stop_apply('france_apply');
exec dbms_apply_adm.stop_apply('japan_apply');
exec dbms_apply_adm.delete_all_errors('usa_apply')
exec dbms_apply_adm.delete_all_errors('france_apply')
exec dbms_apply_adm.delete_all_errors('japan_apply')
exec dbms_capture_adm.drop_capture('streams_capture');
exec dbms_apply_adm.drop_apply('usa_apply');
exec dbms_apply_adm.drop_apply('france_apply');
exec dbms_apply_adm.drop_apply('japan_apply');
DECLARE
type t_n is varray(3) of varchar2(100);
l_n t_n:=t_n('france','usa','japan');
BEGIN
for i in l_n.first..l_n.last loop
dbms_apply_adm.set_table_instantiation_scn(
source_object_name => l_n(i)||'.t1',
source_database_name => 'BLACK',
instantiation_scn => null);
end loop;
end;
/
exec dbms_streams_adm.remove_queue('strmadmin.streams_queue',true,true);
drop user strmadmin cascade;
drop tablespace streams_tbs
including contents and datafiles;
drop user france cascade;
drop user usa cascade;
drop user japan cascade;




Enfin quelque chose de bien expliqué. de bien commenter avec une approche simplifié et compréhensible.
Dommage que ca ne fonctionne pas jusqu’au bout
J’ai un petit pb .si :Une propagation envoie les événements d’une file d’attente à une autre, que ces files se trouvent dans la même base de données ou dans deux bases différentes he bien je n’ai a ce niveau rien
Si non tous est bon.
Alors si vous avez une piste pour allez jusqu’au bout de la démo
Merci
Il y a une seule file d’attente dans cette demo! Ca me parait un peu injuste de dire que la démo ne « fonctionne pas jusqu’au bout » dans votre contexte qui est visiblement différent de celui exposé ici.
Pour aller plus loin, la doc 11.2 est plutôt bien faites avec un tas d’exemples. Sinon, je vous conseille les articles sur wedostreams.blogspot.com bien écrit également (ce sont les miens!); ça serait bien le diable si vous ne trouvez pas déjà votre bonheur.
Sinon, il faudrait dire un peu plus que : « cet artcle ne fonctionne pas avec 2 files d’attentes ».