Suponga que necesita procesar una gran cantidad (no, no esa ... GRANDE) de registros en PostgreSQL para calcular algunos agregados. En el artículo anterior , analizamos varias opciones sobre cómo se puede organizar esto, y en este artículo veremos cómo no bloquear a nadie especialmente , incluido el "flujo entrante" de datos.
Por ejemplo, puede ser recalcular saldos y mantener ventas consolidadas de mercancías con sus envíos constantes, o sumar saldos y rotaciones para cuentas contables, con cambios masivos en las transacciones, u otra cosa ... En cualquier sistema de gestión habrá una diapositiva , y VLSI tampoco es una excepción.
Pero todas estas situaciones tienen un punto en común: el número de cambios es mucho mayor que el número de agregados objetivo. Por ejemplo: miles de mercancías, cada una con decenas de miles de envíos por día.
En otras consideraciones, nos basaremos en este modelo de "Amazon con mercancías".
Objetivo: ventas diarias consolidadas
Queremos tener los agregados de ventas desglosados por producto / día / cantidad .
En concreto, en este caso, realizaremos agregados "directamente en la base de datos" para poder recibirlos de forma rápida y holística para varios informes.
, , , - ClickHouse, . , , , , , , , ...
- ( ), 2PC- , - .
, - , , - , . , "" , .
?.. ...
, , "" - - , - - , "" , "" .
- "", - "--".
""
"". , - , .
"" . INSERT, UPDATE DELETE
, "" - INSERT
. , PostgreSQL , - unique-.
, " " , - . , - , "" flow-.
"" ""
flow- , / "" "- - ".
, , . , --, , .
, , , "" - … fail, :
DELETE FROM flow WHERE (it, dt) = (1, '2018-07-29') RETURNING *;
, - " ". , 1K/, 10K/.
:
SET statement_timeout = 1000;
, ! , , - flow- , . …
, "" , . " ", .
, flow , , "" .
DECLARE curs CURSOR FOR SELECT ctid, * FROM flow WHERE (it, dt) = (1, '2018-07-29') FOR UPDATE;
-- , ,
FETCH %d FROM curs;
DELETE FROM flow WHERE ctid = ANY(...);
flow ctid - "" .
SAVEPOINT
, "". %d FETCH? - , - ... ?
PostgreSQL " " SAVEPOINT/ROLLBACK TO, "" .
:
, ( - , ).
.
/ - COMMIT' .
, - COMMIT'.
, - , "" ( , ).
, !
BEGIN;
DECLARE curs CURSOR FOR SELECT ctid, * FROM flow WHERE (it, dt) = (1, '2018-07-29') FOR UPDATE;
FETCH 1 FROM curs;
DELETE FROM flow WHERE ctid = ANY(...);
-- processing
INSERT INTO agg ...
SAVEPOINT _1;
FETCH 2 FROM curs;
DELETE FROM flow WHERE ctid = ANY(...);
-- processing
INSERT INTO agg ...
SAVEPOINT _2;
FETCH 4 FROM curs;
DELETE FROM flow WHERE ctid = ANY(...);
-- processing...
INSERT INTO agg ...
-- oops! timeout exception!
ROLLBACK TO _2;
CLOSE curs;
COMMIT;
. , , !