Humble Trader

Sunday, October 08, 2006

Serialised Bulk Operations

This is a demonstration of how to serialise bulk operations.

Problem:

A bulk operation takes data from a table and inserts into or updates another, i.e. bulk insert or bulk update. The source table can, itself, receive updates or inserts from other processes so, in order to know what has been processed and what has not, a flag column is added to the source. This flag takes the values 'Y' (processed) and 'N' (not processed). The bulk operation identifies rows set to 'N', processes them and then flips the flag to 'Y' to indicate that it is done.
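A source table carrying such a flag might be declared as below. This is a minimal sketch: the table name source_table matches the placeholder used in the stub further down, while the payload column and the constraint names are assumptions made up for illustration. Defaulting the flag to 'N' means rows added by other processes are picked up by the next bulk run without those processes having to set it.

```sql
-- Hypothetical source table: source_table, payload and the constraint
-- names are illustrative assumptions, not taken from a real schema.
CREATE TABLE source_table
(
  id      NUMBER        CONSTRAINT source_table_pk PRIMARY KEY
 ,payload VARCHAR2(100)
 ,flag    VARCHAR2(1)   DEFAULT 'N' NOT NULL
                        CONSTRAINT source_table_flag_ck
                        CHECK (flag IN ('Y', 'N'))
)
/

-- A new row inserted without a flag defaults to 'N' (not processed).
INSERT INTO source_table (id, payload) VALUES (1, 'new row')
/

COMMIT
/
```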

It is possible that, while the bulk process is running, other processes change or add data to the source. If a second session starts the bulk process while one is already running, both runs can pick up the same 'N' rows and process them twice, breaking transactional integrity.

Also, the source can contain a lot of data so performance needs to be considered.

Solution:

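This solution uses SELECT ... FOR UPDATE to ensure transactional integrity, and BULK COLLECT ... LIMIT and FORALL for performance. FOR UPDATE locks the 'N' rows when the cursor is opened, so a second session opening the same cursor waits until the first session commits and then picks up only the rows that are still flagged 'N'.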
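If a second run should give up instead of queueing behind the first, Oracle's FOR UPDATE NOWAIT makes the OPEN fail at once with ORA-00054. The sketch below swaps that in for the plain FOR UPDATE; source_table is the same placeholder as in the stub, the exception name e_rows_locked is made up for illustration, and the actual bulk processing is elided. The trade-off is that NOWAIT also fails when some other process merely holds a lock on one of the 'N' rows, so a run may be refused even when no other bulk run is active.

```sql
SET SERVEROUTPUT ON

DECLARE
  -- Hypothetical name for ORA-00054: resource busy and acquire with
  -- NOWAIT specified.
  e_rows_locked EXCEPTION;
  PRAGMA EXCEPTION_INIT(e_rows_locked, -54);

  -- Same cursor as the stub, but NOWAIT: do not queue for the locks.
  CURSOR r_cursor IS
    SELECT id
    FROM   source_table
    WHERE  flag = 'N'
    FOR UPDATE NOWAIT;
BEGIN
  -- The locks are taken here, so ORA-00054 is raised by the OPEN if
  -- another session already holds any of them.
  OPEN r_cursor;

  -- ... BULK COLLECT ... LIMIT / FORALL processing as in the stub ...

  COMMIT;

  CLOSE r_cursor;
EXCEPTION
  WHEN e_rows_locked THEN
    dbms_output.put_line('Rows are locked by another session; exiting.');
END;
/
```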

Example Procedure:

  • This is presented as an anonymous block but can be implemented in a package procedure.
  • Create an INDEX BY table type based on the primary key of the source table.
  • Instantiate an INDEX BY table based on the type.
  • Create a local variable to hold the LIMIT (i.e. the number of rows processed in each run of the loop). This would be provisionally set to 1000 and later changed in the light of any tuning information.
  • Create a cursor to fetch the primary keys of the source into the INDEX BY table where the flag is 'N' using FOR UPDATE. This locks the rows to be processed.
  • Open the cursor. This applies the lock.
  • Start the cursor loop. The loop is required because each FETCH with a LIMIT clause returns at most LIMIT rows.
    • Fetch the cursor BULK COLLECT into the INDEX BY table with the LIMIT clause.
    • *** Do the bulk process *** - At this point the primary bulk process is executed. The target table is inserted into or updated based on the set of primary keys in the INDEX BY table.
    • Update the source flags. Use FORALL to update the flags based on the set of primary keys in the INDEX BY table.
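    • Exit the loop when the cursor has no more rows (%NOTFOUND). This test comes after the processing because the final fetch can return fewer than LIMIT rows and still set %NOTFOUND.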
  • End the cursor loop.
  • Commit work.
  • Close the cursor.
Here is a stub (it will not run as-is: source_table and target_table are placeholders):

-- Anonymous block:
DECLARE
  -- Create an INDEX BY table type based on the primary key of the source
  -- table.
  TYPE x_type IS TABLE OF source_table.id%TYPE INDEX BY BINARY_INTEGER;

  -- Instantiate an INDEX BY table based on the type.
  x_table x_type;

  -- Create a local variable to hold the LIMIT (i.e. the number of rows
  -- processed in each run of the loop).
  l_limit NUMBER := 1000;

  -- Create a local to count any errors.
  l_errors NUMBER;

  -- Name ORA-24381, which FORALL ... SAVE EXCEPTIONS raises when at least
  -- one row in the batch has failed.
  bulk_errors EXCEPTION;
  PRAGMA EXCEPTION_INIT(bulk_errors, -24381);

  -- Create a cursor to fetch the primary keys of the source into the
  -- INDEX BY table where the flag is 'N' using FOR UPDATE. This locks
  -- the rows to be processed.
  CURSOR r_cursor IS
    SELECT id
    FROM   source_table
    WHERE  flag = 'N'
    FOR UPDATE;

BEGIN
  -- Open the cursor. This applies the lock.
  OPEN r_cursor;

  -- Start the cursor loop. The loop is required because each FETCH with
  -- a LIMIT clause returns at most l_limit rows.
  LOOP
    -- Fetch the cursor BULK COLLECT into the INDEX BY table with the
    -- LIMIT clause. BULK COLLECT fills the table densely from index 1.
    FETCH r_cursor BULK COLLECT INTO x_table LIMIT l_limit;

    -- *** Do the bulk process ***
    -- At this point the primary bulk process is executed. The target
    -- table is inserted into or updated based on the set of primary keys
    -- in the INDEX BY table. The SAVE EXCEPTIONS clause lets the FORALL
    -- carry on past failing rows and keeps every error. The range
    -- 1 .. COUNT is simply empty when the last fetch returns no rows.
    BEGIN
      FORALL l_i IN 1 .. x_table.COUNT SAVE EXCEPTIONS
        INSERT INTO target_table
        SELECT *
        FROM   source_table
        WHERE  id = x_table(l_i);
    EXCEPTION
      -- Bulk exception handler.
      WHEN bulk_errors THEN
        -- Count the errors.
        l_errors := SQL%BULK_EXCEPTIONS.COUNT;

        -- Report each failure, then remove its key so that its flag stays
        -- 'N' and the row is picked up again by the next run.
        FOR l_j IN 1 .. l_errors
        LOOP
          dbms_output.put_line('Error(id=' ||
            x_table(SQL%BULK_EXCEPTIONS(l_j).ERROR_INDEX) || ') = ' ||
            SQLERRM(-SQL%BULK_EXCEPTIONS(l_j).ERROR_CODE));

          x_table.DELETE(SQL%BULK_EXCEPTIONS(l_j).ERROR_INDEX);
        END LOOP;
    END;

    -- Update the source flags. Use FORALL to update the flags based on
    -- the set of primary keys left in the INDEX BY table. INDICES OF
    -- skips the gaps left by any deleted (failed) keys.
    FORALL l_i IN INDICES OF x_table
      UPDATE source_table
      SET    flag = 'Y'
      WHERE  id = x_table(l_i);

    -- Exit the loop when the cursor has no more rows to fetch.
    EXIT WHEN r_cursor%NOTFOUND;

  -- End of the cursor loop.
  END LOOP;

  -- Commit work. Successful rows are saved; failed rows keep flag 'N'.
  COMMIT;

  -- Close the cursor.
  CLOSE r_cursor;

EXCEPTION
  -- Any other error: undo the whole run (releasing the locks) and re-raise.
  WHEN others THEN
    ROLLBACK;
    RAISE;
END;
/
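Why the EXIT WHEN ...%NOTFOUND test sits at the bottom of the loop can be seen with a self-contained block that needs no tables: seven rows are generated from DUAL and fetched with LIMIT 3. The cursor name r_rows and the row count are arbitrary choices for the demonstration.

```sql
SET SERVEROUTPUT ON

DECLARE
  TYPE n_type IS TABLE OF NUMBER INDEX BY BINARY_INTEGER;

  n_tab   n_type;
  l_fetch NUMBER := 0;

  -- Seven rows generated from DUAL, so no table is needed.
  CURSOR r_rows IS
    SELECT LEVEL AS n
    FROM   dual
    CONNECT BY LEVEL <= 7;
BEGIN
  OPEN r_rows;

  LOOP
    FETCH r_rows BULK COLLECT INTO n_tab LIMIT 3;
    l_fetch := l_fetch + 1;

    -- The batch is "processed" (here: reported) before the EXIT test,
    -- because %NOTFOUND is TRUE on the last fetch even though it
    -- still returned a row.
    dbms_output.put_line('Fetch ' || l_fetch || ': ' || n_tab.COUNT ||
      ' row(s), %NOTFOUND = ' ||
      (CASE WHEN r_rows%NOTFOUND THEN 'TRUE' ELSE 'FALSE' END));

    EXIT WHEN r_rows%NOTFOUND;
  END LOOP;

  CLOSE r_rows;
END;
/
```

This prints "Fetch 1: 3 row(s), %NOTFOUND = FALSE", "Fetch 2: 3 row(s), %NOTFOUND = FALSE" and "Fetch 3: 1 row(s), %NOTFOUND = TRUE". Testing %NOTFOUND straight after the FETCH would have skipped the seventh row. When the row count is an exact multiple of the LIMIT, the final fetch returns no rows at all, which is why a FORALL bounded by 1 .. COUNT is a safe way to process each batch.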


Testing:

There are two test scripts, to be run in two different sessions; together they simulate two processes working on the same table at the same time. The first script opens its FOR UPDATE cursor and then pauses for 5 seconds (dbms_lock.sleep(5)), which gives enough time to switch sessions and start the second script. Running the scripts needs EXECUTE on DBMS_LOCK and access to V$SESSION:
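To see the serialisation happening, a third session can look for blocked sessions while test_script_b.sql is waiting on the locks held by test_script_a.sql. This is a minimal sketch assuming Oracle 10g or later, where V$SESSION has the BLOCKING_SESSION and EVENT columns; the waiting session typically shows the event 'enq: TX - row lock contention'.

```sql
-- Run from a third session while test_script_b.sql is waiting.
-- Each row is a session that is blocked, together with the SID of the
-- session holding the lock it needs.
SELECT sid
      ,blocking_session
      ,event
      ,seconds_in_wait
FROM   v$session
WHERE  blocking_session IS NOT NULL
/
```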

test_script_a.sql

DROP TABLE test
/

CREATE table test
(
id NUMBER
,flag VARCHAR2(1) NOT NULL
,sid NUMBER
,fetch_loop NUMBER
)
/

INSERT INTO test
(
id
,flag
,sid
,fetch_loop
)
VALUES
(
1
,'N'
,NULL
,NULL
)
/

INSERT INTO test
(
id
,flag
,sid
,fetch_loop
)
VALUES
(
2
,'N'
,NULL
,NULL
)
/

INSERT INTO test
(
id
,flag
,sid
,fetch_loop
)
VALUES
(
3
,'N'
,NULL
,NULL
)
/

INSERT INTO test
(
id
,flag
,sid
,fetch_loop
)
VALUES
(
4
,'N'
,NULL
,NULL
)
/

INSERT INTO test
(
id
,flag
,sid
,fetch_loop
)
VALUES
(
5
,'N'
,NULL
,NULL
)
/

INSERT INTO test
(
id
,flag
,sid
,fetch_loop
)
VALUES
(
6
,'N'
,NULL
,NULL
)
/

INSERT INTO test
(
id
,flag
,sid
,fetch_loop
)
VALUES
(
7
,'Y'
,NULL
,NULL
)
/

INSERT INTO test
(
id
,flag
,sid
,fetch_loop
)
VALUES
(
8
,'Y'
,NULL
,NULL
)
/

COMMIT
/

SELECT *
FROM test
ORDER BY id
/

PROMPT Run test_script_b.sql in another session now.

DECLARE
TYPE id_type IS TABLE OF test.id%TYPE INDEX BY BINARY_INTEGER;

l_sid NUMBER;
l_fetch_loop NUMBER := 0;
l_limit NUMBER := 3;

id_tab id_type;

CURSOR r_ids IS
SELECT id
FROM test
WHERE flag = 'N'
FOR UPDATE;

BEGIN
SELECT sid
INTO l_sid
FROM v$session
WHERE audsid = SYS_CONTEXT('userenv', 'sessionid');

OPEN r_ids;

dbms_lock.sleep(5);

LOOP
l_fetch_loop := l_fetch_loop + 1;

FETCH r_ids BULK COLLECT INTO id_tab LIMIT l_limit;

FORALL l_i IN 1 .. id_tab.COUNT
UPDATE test
SET flag = 'Y'
,sid = l_sid
,fetch_loop = l_fetch_loop
WHERE id = id_tab(l_i);

EXIT WHEN r_ids%NOTFOUND;
END LOOP;

COMMIT;

CLOSE r_ids;
END;
/




test_script_b.sql

SET SERVEROUTPUT ON SIZE 1000000

PROMPT -
PROMPT - Initialising...

UPDATE test
SET flag = 'N'
WHERE id IN (7, 8)
/

PROMPT - Done
PROMPT -

PROMPT - Current state of table:

SELECT *
FROM test
ORDER BY id
/

DECLARE
TYPE id_type IS TABLE OF test.id%TYPE INDEX BY BINARY_INTEGER;

l_sid NUMBER;
l_fetch_loop NUMBER := 0;
l_count_other_fl_1 NUMBER;
l_count_other_fl_2 NUMBER;
l_count_this_fl_1 NUMBER;
l_passed BOOLEAN := TRUE;
l_errors NUMBER;
l_limit NUMBER := 3;

id_tab id_type;

CURSOR r_ids IS
SELECT id
FROM test
WHERE flag = 'N'
FOR UPDATE;

CURSOR r_err IS
SELECT id
FROM test
WHERE flag = 'Y'
FOR UPDATE;

BEGIN
dbms_output.put_line('+- Test 1 start:');
dbms_output.put_line('| Serialised transaction:');
dbms_output.put_line('|');
dbms_output.put_line('| Expected result: 6 rows updated to ''Y'' by ' ||
'other process, 3 by fetch loop');
dbms_output.put_line('| 1 and 3 by fetch loop 2.');
dbms_output.put_line('| 2 rows updated to ''Y'' by ' ||
'this process by fetch loop 1.');
dbms_output.put_line('|');

SELECT sid
INTO l_sid
FROM v$session
WHERE audsid = SYS_CONTEXT('userenv', 'sessionid');

OPEN r_ids;

LOOP
l_fetch_loop := l_fetch_loop + 1;

FETCH r_ids BULK COLLECT INTO id_tab LIMIT l_limit;

FORALL l_i IN 1 .. id_tab.COUNT
UPDATE test
SET flag = 'Y'
,sid = l_sid
,fetch_loop = l_fetch_loop
WHERE id = id_tab(l_i);

EXIT WHEN r_ids%NOTFOUND;
END LOOP;

COMMIT;

CLOSE r_ids;

SELECT COUNT(*)
INTO l_count_other_fl_1
FROM test
WHERE flag = 'Y'
AND sid != l_sid
AND fetch_loop = 1;

dbms_output.put_line('| Result: Updated to ''Y'' by other process by ' ||
'fetch loop 1: ' || l_count_other_fl_1);

SELECT COUNT(*)
INTO l_count_other_fl_2
FROM test
WHERE flag = 'Y'
AND sid != l_sid
AND fetch_loop = 2;

dbms_output.put_line('| Updated to ''Y'' by other process by ' ||
'fetch loop 2: ' || l_count_other_fl_2);

SELECT COUNT(*)
INTO l_count_this_fl_1
FROM test
WHERE flag = 'Y'
AND sid = l_sid
AND fetch_loop = 1;

dbms_output.put_line('| Updated to ''Y'' by this process by ' ||
'fetch loop 1: ' || l_count_this_fl_1);
dbms_output.put_line('|');

dbms_output.put_line('| +----------+');

IF l_count_other_fl_1 = 3
AND
l_count_other_fl_2 = 3
AND
l_count_this_fl_1 = 2
THEN
dbms_output.put_line('| | PASSED |');
ELSE
dbms_output.put_line('| | FAILED |');
l_passed := FALSE;
END IF;

dbms_output.put_line('| +----------+');
dbms_output.put_line('|');
dbms_output.put_line('+- Test 1 end.');

dbms_output.put_line('-');

dbms_output.put_line('+- Test 2 start:');
dbms_output.put_line('| Error handling:');
dbms_output.put_line('|');
dbms_output.put_line('| Expected result: 6 failed updates (ids 2 and 6 are deleted first).');
dbms_output.put_line('|');

OPEN r_err;

l_fetch_loop := 0;

DELETE FROM test
WHERE id IN (2, 6);

BEGIN
LOOP
l_fetch_loop := l_fetch_loop + 1;

FETCH r_err BULK COLLECT INTO id_tab;

FORALL l_i IN 1 .. id_tab.COUNT SAVE EXCEPTIONS
UPDATE test
SET flag = NULL
,sid = l_sid
,fetch_loop = l_fetch_loop
WHERE id = id_tab(l_i);

EXIT WHEN r_err%NOTFOUND;
END LOOP;

COMMIT;
EXCEPTION
WHEN others THEN
COMMIT;

l_errors := SQL%BULK_EXCEPTIONS.COUNT;

dbms_output.put_line('| Result: Failures: ' || l_errors);
dbms_output.put_line('|');

FOR l_i IN 1 .. l_errors
LOOP
dbms_output.put_line('| Error(id=' ||
id_tab(SQL%BULK_EXCEPTIONS(l_i).ERROR_INDEX) || ') = ' ||
SQLERRM(-SQL%BULK_EXCEPTIONS(l_i).ERROR_CODE));
END LOOP;

dbms_output.put_line('|');
END;

CLOSE r_err;

dbms_output.put_line('| +----------+');

IF l_errors = 6
THEN
dbms_output.put_line('| | PASSED |');
ELSE
dbms_output.put_line('| | FAILED |');
l_passed := FALSE;
END IF;

dbms_output.put_line('| +----------+');
dbms_output.put_line('|');
dbms_output.put_line('+- Test 2 end.');

dbms_output.put_line('-');

dbms_output.put_line('- Overall result:');
dbms_output.put_line('-');
dbms_output.put_line('- +----------+');
IF l_passed = TRUE
THEN
dbms_output.put_line('- | PASSED |');
ELSE
dbms_output.put_line('- | FAILED |');
END IF;
dbms_output.put_line('- +----------+');

dbms_output.put_line('-');
END;
/

PROMPT - Final state of table:

SELECT *
FROM test
ORDER BY id
/

DROP TABLE test
/


PL/SQL Developer

This post contains links to other posts that make up a library of PL/SQL scripts and techniques:

Sunday, October 01, 2006

Country Dimension - Process M_O2W_COUNTRIES Specification

  • Package: m_o2w_countries
  • Description: Container for all procedures and functions relating to mapping data for the Countries Dimension from ODS to WHS.
    • Procedure: map (Overload 1)
      • Description: Wrapper to do mapping with housekeeping.
      • Parameters: None.
      • Action:
        • Run initialise mapping with parameters p_mapping = 'M_O2W_COUNTRIES', run_no = local variable to capture the run number.
        • Run map (overload 2) with parameter p_run_no = the captured run number.
        • Run finalise mapping with parameter run_no = the captured run number.
    • Procedure: map (Overload 2)
      • Description: Map ODS.COUNTRIES to WHS.D_COUNTRIES.
      • Parameters:
        • p_run_no:
          • Datatype: NUMBER
          • Direction: IN
          • Description: Run number of this map run.
      • Action:
        • Insert into WHS.D_COUNTRIES any country in ODS.COUNTRIES that is not already there.
        • Update any rows in WHS.D_COUNTRIES that differ in detail from ODS.COUNTRIES.
        • Commit work.
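The two Action steps of map (overload 2), inserting missing countries and updating changed ones, can be combined in a single MERGE statement. This is a sketch under stated assumptions, not the package's actual code: it is written as a standalone procedure with the hypothetical name m_o2w_countries_map_sketch, it assumes both tables have the columns used in m_o2w_countries_test.sql, and it assumes NAME uniquely identifies a country (MERGE raises ORA-30926 if the source has duplicate names). The UPDATE ... WHERE clause of MERGE needs Oracle 10g or later.

```sql
-- Hypothetical standalone version of m_o2w_countries.map (overload 2).
-- Assumed: ods.countries and whs.d_countries share these columns and
-- NAME identifies a country.
CREATE OR REPLACE PROCEDURE m_o2w_countries_map_sketch
(
  p_run_no IN NUMBER  -- unused here; kept to match the specification
)
IS
BEGIN
  MERGE INTO whs.d_countries d
  USING ods.countries o
  ON (d.name = o.name)
  WHEN MATCHED THEN
    UPDATE
    SET    d.iso2_code  = o.iso2_code
          ,d.iso3_code  = o.iso3_code
          ,d.un3_number = o.un3_number
          ,d.start_date = o.start_date
          ,d.end_date   = o.end_date
    -- Only touch rows whose detail differs. DECODE treats two NULLs as
    -- equal, so NULL columns are compared safely.
    WHERE  DECODE(d.iso2_code,  o.iso2_code,  0, 1) = 1
       OR  DECODE(d.iso3_code,  o.iso3_code,  0, 1) = 1
       OR  DECODE(d.un3_number, o.un3_number, 0, 1) = 1
       OR  DECODE(d.start_date, o.start_date, 0, 1) = 1
       OR  DECODE(d.end_date,   o.end_date,   0, 1) = 1
  WHEN NOT MATCHED THEN
    INSERT (name, iso2_code, iso3_code, un3_number, start_date, end_date)
    VALUES (o.name, o.iso2_code, o.iso3_code, o.un3_number,
            o.start_date, o.end_date);

  COMMIT;
END;
/
```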

Country Dimension - Process M_V2O_COUNTRIES Specification

  • Package: m_v2o_countries
  • Description: Container for all procedures and functions relating to mapping data for the Countries Dimension from VAL to ODS.
    • Procedure: map (Overload 1)
      • Description: Wrapper to do mapping with housekeeping.
      • Action:
        • Run initialise mapping with parameters p_mapping = 'M_V2O_COUNTRIES', run_no = local variable to capture the run number.
        • Run map (overload 2) with parameter p_run_no = the captured run number.
        • Run finalise mapping with parameter run_no = the captured run number.
    • Procedure: map (Overload 2)
      • Description: Map VAL.V_COUNTRIES to ODS.COUNTRIES.
      • Parameters:
        • p_run_no:
          • Datatype: NUMBER
          • Direction: IN
          • Description: The run number for this run.
      • Action:
        • Insert into ODS.COUNTRIES any country in VAL.V_COUNTRIES that is not already there.
        • Commit work.
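The single Action step of m_v2o_countries.map (overload 2) is an anti-join insert: copy across only the countries that ODS does not hold yet. As above, this is a sketch under assumptions rather than the package's real code: the procedure name m_v2o_countries_map_sketch is hypothetical, the column list is borrowed from the test script, and NAME is assumed to identify a country.

```sql
-- Hypothetical standalone version of m_v2o_countries.map (overload 2).
-- Assumed: val.v_countries and ods.countries share these columns and
-- NAME identifies a country.
CREATE OR REPLACE PROCEDURE m_v2o_countries_map_sketch
(
  p_run_no IN NUMBER  -- unused here; kept to match the specification
)
IS
BEGIN
  -- Anti-join: insert only the countries not already in ODS.
  INSERT INTO ods.countries
  (
    name
   ,iso2_code
   ,iso3_code
   ,un3_number
   ,start_date
   ,end_date
  )
  SELECT v.name
        ,v.iso2_code
        ,v.iso3_code
        ,v.un3_number
        ,v.start_date
        ,v.end_date
  FROM   val.v_countries v
  WHERE  NOT EXISTS
         (
           SELECT NULL
           FROM   ods.countries o
           WHERE  o.name = v.name
         );

  COMMIT;
END;
/
```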

Country Dimension - m_o2w_countries_test.sql

SET SERVEROUTPUT ON SIZE 1000000;

DECLARE
l_run_no NUMBER;
l_mapping_histories_count NUMBER;
l_countries_count NUMBER;
l_d_countries_count NUMBER;
l_init_d_countries_count NUMBER;
l_final_d_countries_count NUMBER;
l_iso2_code VARCHAR2(2);
l_iso3_code VARCHAR2(3);
l_un3_number VARCHAR2(3);
l_start_date DATE;
l_end_date DATE;
l_row_data VARCHAR2(4000);
l_passed BOOLEAN := TRUE;
BEGIN
-- Initialisation code.
DELETE FROM d_country;

COMMIT;

dbms_output.put_line('-');
dbms_output.put_line('- Prepare a COUNTRIES dataset by running');
dbms_output.put_line('- get_html.fetch_html, get_html.parse_html, ' ||
'm_s2s_countries.map,');
dbms_output.put_line('- m_s2v_countries.map and m_v2o_countries.map.');
dbms_output.put_line('-');

get_html.fetch_html('COUNTRIES');

dbms_output.put_line('- Fetch countries data done.');

get_html.parse_html;

dbms_output.put_line('- Parse countries data done.');

m_s2s_countries.map;

dbms_output.put_line('- S_COUNTRIES prepared.');

m_s2v_countries.map;

dbms_output.put_line('- V_COUNTRIES prepared.');

m_v2o_countries.map;

dbms_output.put_line('- O_COUNTRIES prepared.');

SELECT COUNT(*)
INTO l_countries_count
FROM o_countries;

dbms_output.put_line('- O_COUNTRIES row count = ' ||
l_countries_count);
dbms_output.put_line('-');

dbms_output.put_line('+- PROCEDURE m_o2w_countries.map test 1 start:');
dbms_output.put_line('| * Load fresh data to WHS.D_COUNTRY *');
dbms_output.put_line('|');

dbms_output.put_line
('| Test 1: Pass : NO PARAMETERS');
dbms_output.put_line('|');
dbms_output.put_line
('| Expected Result: Inserts a row into MAPPING_HISTORIES');
dbms_output.put_line
('| Expected Result: Inserts 240 rows into D_COUNTRY');
dbms_output.put_line('|');

m_o2w_countries.map;

SELECT MAX(run_no)
INTO l_run_no
FROM mapping_histories
WHERE mapping_library = 'M_O2W_COUNTRIES';

SELECT COUNT(*)
INTO l_mapping_histories_count
FROM mapping_histories
WHERE run_no = l_run_no;

dbms_output.put_line('| Result: MAPPING_HISTORIES row count = ' ||
l_mapping_histories_count);
dbms_output.put_line('| MAPPING_HISTORIES data :');

SELECT '| [' || run_no || ', ' || mapping_library || ', ' ||
TO_CHAR(start_tsp, 'DD-MON-YYYY HH24:MI:SS') ||
', ' || NVL(TO_CHAR(end_tsp, 'DD-MON-YYYY HH24:MI:SS'), '*NULL*') ||
']'
INTO l_row_data
FROM mapping_histories
WHERE run_no = l_run_no;

dbms_output.put_line(l_row_data);

SELECT COUNT(*)
INTO l_d_countries_count
FROM d_country;

dbms_output.put_line('| D_COUNTRY row count = ' ||
l_d_countries_count);
dbms_output.put_line('|');

dbms_output.put_line('| +----------+');
IF l_mapping_histories_count = 1
AND
l_d_countries_count = 240
THEN
dbms_output.put_line('| | PASSED |');
ELSE
dbms_output.put_line('| | FAILED |');
l_passed := FALSE;
END IF;
dbms_output.put_line('| +----------+');

dbms_output.put_line('|');
dbms_output.put_line('+- PROCEDURE m_o2w_countries.map test 1 end:');
dbms_output.put_line('-');

DELETE FROM d_country
WHERE iso2_code = 'GB';

UPDATE d_country
SET iso2_code = 'XX'
WHERE iso2_code = 'AF';

UPDATE d_country
SET iso3_code = 'XXX'
WHERE iso2_code = 'AL';

UPDATE d_country
SET un3_number = 'XXX'
WHERE iso2_code = 'DZ';

UPDATE d_country
SET start_date = DATE '2006-02-01'
WHERE iso2_code = 'AS';

UPDATE d_country
SET end_date = DATE '2006-02-01'
WHERE iso2_code = 'AD';

COMMIT;

dbms_output.put_line('+- PROCEDURE m_o2w_countries.map test 2 start:');
dbms_output.put_line('| * Load and update data to WHS.D_COUNTRY *');
dbms_output.put_line('|');

dbms_output.put_line
('| Test 2: Pass : NO PARAMETERS');
dbms_output.put_line('|');
dbms_output.put_line
('| Expected Result: Inserts a row into MAPPING_HISTORIES');
dbms_output.put_line
('| Expected Result: Inserts 1 row into D_COUNTRY');
dbms_output.put_line
('| Expected Result: Updates 5 rows in D_COUNTRY');
dbms_output.put_line('|');

SELECT COUNT(*)
INTO l_init_d_countries_count
FROM d_country;

dbms_output.put_line('| Initial D_COUNTRY row count = ' ||
l_init_d_countries_count);
dbms_output.put_line('| Rows to update: = ');

FOR r_row IN
(
SELECT name
,iso2_code
,iso3_code
,un3_number
,start_date
,end_date
FROM d_country
WHERE iso2_code IN ('XX', 'AL', 'DZ', 'AS', 'AD')
)
LOOP
dbms_output.put_line('| [' || r_row.name || ', ' || r_row.iso2_code ||
', ' || r_row.iso3_code || ', ' || r_row.un3_number || ', ' ||
r_row.start_date || ', ' || r_row.end_date || ']');
END LOOP;

dbms_output.put_line('|');

m_o2w_countries.map;

SELECT MAX(run_no)
INTO l_run_no
FROM mapping_histories
WHERE mapping_library = 'M_O2W_COUNTRIES';

SELECT COUNT(*)
INTO l_mapping_histories_count
FROM mapping_histories
WHERE run_no = l_run_no;

dbms_output.put_line('| Result: MAPPING_HISTORIES row count = ' ||
l_mapping_histories_count);
dbms_output.put_line('| MAPPING_HISTORIES data :');

SELECT '| [' || run_no || ', ' || mapping_library || ', ' ||
TO_CHAR(start_tsp, 'DD-MON-YYYY HH24:MI:SS') ||
', ' || NVL(TO_CHAR(end_tsp, 'DD-MON-YYYY HH24:MI:SS'), '*NULL*') ||
']'
INTO l_row_data
FROM mapping_histories
WHERE run_no = l_run_no;

dbms_output.put_line(l_row_data);

SELECT COUNT(*)
INTO l_final_d_countries_count
FROM d_country;

dbms_output.put_line('| Final D_COUNTRY row count = ' ||
l_final_d_countries_count);
dbms_output.put_line('| Rows updated: = ');

FOR r_row IN
(
SELECT name
,iso2_code
,iso3_code
,un3_number
,start_date
,end_date
FROM d_country
WHERE iso2_code IN ('AF', 'AL', 'DZ', 'AS', 'AD')
)
LOOP
dbms_output.put_line('| [' || r_row.name || ', ' || r_row.iso2_code ||
', ' || r_row.iso3_code || ', ' || r_row.un3_number || ', ' ||
r_row.start_date || ', ' || r_row.end_date || ']');

IF r_row.iso2_code = 'AF'
THEN
l_iso2_code := r_row.iso2_code;
END IF;

IF r_row.iso2_code = 'AL'
THEN
l_iso3_code := r_row.iso3_code;
END IF;

IF r_row.iso2_code = 'DZ'
THEN
l_un3_number := r_row.un3_number;
END IF;

IF r_row.iso2_code = 'AS'
THEN
l_start_date := r_row.start_date;
END IF;

IF r_row.iso2_code = 'AD'
THEN
l_end_date := r_row.end_date;
END IF;
END LOOP;

dbms_output.put_line('|');

dbms_output.put_line('| +----------+');
IF l_mapping_histories_count = 1
AND
l_final_d_countries_count - l_init_d_countries_count = 1
AND
l_iso2_code = 'AF'
AND
l_iso3_code = 'ALB'
AND
l_un3_number = '012'
AND
l_start_date = DATE '1000-01-01'
AND
l_end_date = DATE '3000-12-31'
THEN
dbms_output.put_line('| | PASSED |');
ELSE
dbms_output.put_line('| | FAILED |');
l_passed := FALSE;
END IF;
dbms_output.put_line('| +----------+');

dbms_output.put_line('|');
dbms_output.put_line('+- PROCEDURE m_o2w_countries.map test 2 end:');
dbms_output.put_line('-');

dbms_output.put_line('-');
dbms_output.put_line('- Overall result:');
dbms_output.put_line('-');
dbms_output.put_line('- +----------+');
IF l_passed = TRUE
THEN
dbms_output.put_line('- | PASSED |');
ELSE
dbms_output.put_line('- | FAILED |');
END IF;
dbms_output.put_line('- +----------+');
dbms_output.put_line('-');

END;
/