PostgreSQL - Implementing a reliable queue


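I am trying to implement a reliable queue with multiple writers and multiple readers using a Postgres database. How can I avoid missing rows when a queue reader scans the table and in-progress transactions commit after the read?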

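We have a reader that selects rows in batches using a "checkpoint" time: each batch fetches the rows after the last timestamp of the previous batch. We are missing rows. (Reason: the timestamp value is based on the time the INSERT happens (00.00.00). Under heavy load, if the transaction takes longer, the row only becomes visible, say, 10 seconds later (00.00.10). The reader will miss this row (row1) if it reads during those 10 seconds and finds a row whose INSERT time (00.00.05) is later than row1's. The complete description of the problem is similar to what is written in this blog: http://blog.thefourthparty.com/stopping-time-in-postgresql/)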
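The race described above can be reproduced by hand. The following is a minimal sketch, assuming a hypothetical `events` table and three separate connections run in the order shown; the timestamps are written as explicit literals (an assumption, to mirror the 00.00.00 / 00.00.05 / 00.00.10 timeline) so the outcome does not depend on real timing.

```sql
-- Hypothetical table, for illustration only.
CREATE TABLE events (
    id         bigserial PRIMARY KEY,
    created_at timestamptz NOT NULL,
    payload    text
);

-- Session A (slow writer): stamps row1 at 00:00:00 but keeps its transaction open.
BEGIN;
INSERT INTO events (created_at, payload)
VALUES ('2024-01-01 00:00:00+00', 'row1');

-- Session B (fast writer): stamps row2 at 00:00:05 and commits immediately (autocommit).
INSERT INTO events (created_at, payload)
VALUES ('2024-01-01 00:00:05+00', 'row2');

-- Reader: the previous checkpoint is before 00:00:00.
-- Only row2 is visible, because row1 is still uncommitted.
SELECT id, created_at, payload
FROM   events
WHERE  created_at > '2023-12-31 23:59:59+00'
ORDER  BY created_at;
-- The reader now advances its checkpoint to 00:00:05.

-- Session A: commits at about 00:00:10. row1 becomes visible,
-- but its created_at (00:00:00) is already behind the checkpoint.
COMMIT;

-- Reader, next batch: returns no rows, so row1 is never delivered.
SELECT id, created_at, payload
FROM   events
WHERE  created_at > '2024-01-01 00:00:05+00'
ORDER  BY created_at;
```

The same effect occurs with any value assigned at INSERT time (including a serial id), because the order in which rows become visible follows commit order, not insert order.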

Related prior question for context: Postgres LISTEN/NOTIFY - low latency, realtime?

Update: I have updated the question from a single reader to multiple readers. The order in which the readers read matters.

postgresql concurrency queue
2 Answers
7
votes

Considering multiple readers, it is necessary to keep track of which records each reader has already received.

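Also, order is stated to be a requirement for delivering records to a reader. So if a later transaction commits before an earlier one, we have to "stop" and resume sending records only once the earlier one has committed, to preserve the order of records sent to the reader.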
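The "stop at the first gap" rule can be tried in isolation on made-up data. The following is a minimal sketch, not part of the answer's implementation: the `visible` CTE and the checkpoint value 3 are assumptions for illustration. Ids 4, 5, 7 and 8 are committed, 6 is still in flight, and only the contiguous run right after the checkpoint is returned.

```sql
-- Hypothetical data: the reader's checkpoint is 3; ids 4, 5, 7, 8 are visible,
-- id 6 belongs to a transaction that has not committed yet.
WITH visible(cod) AS (
    VALUES (4), (5), (7), (8)
)
SELECT cod
FROM (
    SELECT cod,
           -- For a gap-free run starting at checkpoint + 1,
           -- cod - row_number() stays equal to the checkpoint.
           cod - row_number() OVER (ORDER BY cod) AS run_start
    FROM   visible
    WHERE  cod > 3
) AS s
WHERE  run_start = 3
ORDER  BY cod;
-- returns 4 and 5; 7 and 8 are held back until 6 shows up
```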

That said, here is the implementation:

-- let's create our queue table
drop table if exists queue_records cascade;
create table if not exists queue_records 
(
  cod serial primary key,
  date_posted timestamp default clock_timestamp(),
  message text
);


-- let's create a table to save "checkpoints" per reader_id
drop table if exists queue_reader_checkpoint cascade;
create table if not exists queue_reader_checkpoint 
(
  reader_id text primary key,
  last_checkpoint numeric
);



CREATE OR REPLACE FUNCTION get_queue_records(pREADER_ID text)
RETURNS SETOF queue_records AS
$BODY$
DECLARE
    vLAST_CHECKPOINT    numeric;
    vCHECKPOINT_EXISTS  integer;
    vRECORD         queue_records%rowtype;
BEGIN

    -- let's get the last record sent to the reader 
    SELECT  last_checkpoint
    INTO    vLAST_CHECKPOINT
    FROM    queue_reader_checkpoint
    WHERE   reader_id = pREADER_ID;

    -- if vLAST_CHECKPOINT is null (this is the very first call for this reader_id),
    -- set it to the last cod in the queue, so that the reader gets records from now on.
    if (vLAST_CHECKPOINT is null) then
        -- set the flag indicating the reader does not have any checkpoint recorded
        vCHECKPOINT_EXISTS = 0;
        -- get the very last committed record; fall back to 0 on an empty queue so that
        -- a NULL checkpoint is never stored (the next call would then retry the INSERT
        -- and fail on the primary key)
        SELECT  COALESCE(MAX(cod), 0)
        INTO    vLAST_CHECKPOINT
        FROM    queue_records;
    else
        -- set the flag indicating the reader already has a checkpoint recorded
        vCHECKPOINT_EXISTS = 1; 
    end if;

    -- now let's get the records from the queue one-by-one 
    FOR vRECORD IN 
            SELECT  *
            FROM    queue_records
            WHERE   COD > vLAST_CHECKPOINT 
            ORDER   BY COD
    LOOP

        -- if the next record equals (vLAST_CHECKPOINT+1), the record is in the expected order
        if (vRECORD.COD = (vLAST_CHECKPOINT+1)) then

            -- let's save the last record read
            vLAST_CHECKPOINT = vRECORD.COD;

            -- and return it
            RETURN NEXT vRECORD;

        -- but if it is not, then it is out of order
        else
            -- the reason is that some transaction has not committed yet, while a later transaction already has.
            -- so we must stop sending records to the reader; by the next call, the pending transaction will probably have committed.
            exit;
        end if;
    END LOOP;


    -- now we have to persist the last record read to be retrieved on next call
    if (vCHECKPOINT_EXISTS = 0) then
        INSERT INTO queue_reader_checkpoint (reader_id, last_checkpoint) values (pREADER_ID, vLAST_CHECKPOINT);
    else        
        UPDATE queue_reader_checkpoint SET last_checkpoint = vLAST_CHECKPOINT where reader_id = pREADER_ID;
    end if; 
end;
$BODY$ LANGUAGE plpgsql VOLATILE;
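A short usage sketch may help to see how the function behaves. It assumes the two tables above were just created (so the serial starts fresh) and a single session; the reader names `reader_a` and `reader_b` and the messages are made up for illustration.

```sql
-- Seed one message, then register the reader.
INSERT INTO queue_records (message) VALUES ('first');

-- First call for reader_a: returns no rows and stores the current tail as its checkpoint.
SELECT cod, message FROM get_queue_records('reader_a');

-- New messages arrive.
INSERT INTO queue_records (message) VALUES ('second'), ('third');

-- Returns 'second' and 'third' in cod order and advances reader_a's checkpoint.
SELECT cod, message FROM get_queue_records('reader_a');

-- A second reader keeps its own checkpoint; its first call also starts at the tail.
SELECT cod, message FROM get_queue_records('reader_b');

-- One row per reader, each with its own last_checkpoint.
SELECT reader_id, last_checkpoint FROM queue_reader_checkpoint;

-- Caveat: a serial value consumed by a rolled-back transaction is never reused.
BEGIN;
INSERT INTO queue_records (message) VALUES ('lost');
ROLLBACK;
INSERT INTO queue_records (message) VALUES ('after rollback');

-- Returns no rows: the expected next cod was burned by the rollback,
-- so the function sees a gap that will never be filled.
SELECT cod, message FROM get_queue_records('reader_a');
```

The last call shows a limitation worth planning for: because sequences are not transactional, a failed insert leaves a permanent gap and the reader stalls there. A production version would likely need a rule for skipping gaps that are provably abandoned, which this sketch does not attempt.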

0
votes

Another option is to use https://github.com/tembo-io/pgmq (requires admin access to install a PostgreSQL extension)
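For orientation, a minimal sketch of what working with pgmq looks like, based on the calls documented in the project's README. The queue name `demo_queue`, the payload, and the message id passed to `pgmq.delete` are placeholders, and the exact function set may differ between pgmq versions.

```sql
-- Requires sufficient privileges to install the extension.
CREATE EXTENSION pgmq;

-- Create a queue (name is a placeholder).
SELECT pgmq.create('demo_queue');

-- Enqueue a JSON message; returns the new message id.
SELECT * FROM pgmq.send('demo_queue', '{"event": "row1"}');

-- Read up to 1 message and hide it from other consumers for 30 seconds.
SELECT * FROM pgmq.read('demo_queue', 30, 1);

-- Delete the message once processed (assuming the read returned msg_id 1).
SELECT pgmq.delete('demo_queue', 1);
```

Note that pgmq uses a visibility-timeout model in which each message is handed to one consumer at a time. That differs from the per-reader checkpoints above, so whether it fits the "every reader sees every row in order" requirement should be checked against the project's documentation.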
