PostgreSQL中StartLogStreamer分析

本篇內(nèi)容主要講解“PostgreSQL中StartLogStreamer分析”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“PostgreSQL中StartLogStreamer分析”吧!

我們提供的服務(wù)有:網(wǎng)站設(shè)計、成都網(wǎng)站建設(shè)、微信公眾號開發(fā)、網(wǎng)站優(yōu)化、網(wǎng)站認(rèn)證、臨安ssl等。為數(shù)千家企事業(yè)單位解決了網(wǎng)站和推廣的問題。提供周到的售前咨詢和貼心的售后服務(wù),是有科學(xué)管理、有技術(shù)的臨安網(wǎng)站制作公司

本節(jié)簡單介紹了PostgreSQL的備份工具pg_basebackup源碼中實際執(zhí)行備份邏輯的BaseBackup中對WAL數(shù)據(jù)進(jìn)行備份的實現(xiàn)函數(shù)StartLogStreamer.

一、數(shù)據(jù)結(jié)構(gòu)

logstreamer_param
WAL data streamer參數(shù).

typedef struct
{
     ////后臺連接
    PGconn     *bgconn;
    //開始位置
    XLogRecPtr  startptr;
    //目錄或者tar文件,依賴于使用的模式
    char        xlog[MAXPGPATH];    /* directory or tarfile depending on mode */
    //系統(tǒng)標(biāo)識符
    char       *sysidentifier;
    //時間線
    int         timeline;
} logstreamer_param;

StreamCtl
接收xlog流數(shù)據(jù)時的全局參數(shù)

/*
 * Global parameters when receiving xlog stream. For details about the individual fields,
 * see the function comment for ReceiveXlogStream().
 * 接收xlog流數(shù)據(jù)時的全局參數(shù).
 * 每個域字段的詳細(xì)解釋,參見ReceiveXlogStream()函數(shù)注釋.
 */
typedef struct StreamCtl
{
    //streaming的開始位置
    XLogRecPtr  startpos;       /* Start position for streaming */
    //時間線
    TimeLineID  timeline;       /* Timeline to stream data from */
    //系統(tǒng)標(biāo)識符
    char       *sysidentifier;  /* Validate this system identifier and
                                 * timeline */
    //standby超時信息
    int         standby_message_timeout;    /* Send status messages this often */
    //是否同步(寫入時是否馬上Flush WAL data)
    bool        synchronous;    /* Flush immediately WAL data on write */
    //在已歸檔的數(shù)據(jù)中標(biāo)記segment為已完成
    bool        mark_done;      /* Mark segment as done in generated archive */
    //刷新到磁盤上以確保數(shù)據(jù)的一致性狀態(tài)(是否已刷新到磁盤上)
    bool        do_sync;        /* Flush to disk to ensure consistent state of
                                 * data */
    //在返回T時停止streaming
    stream_stop_callback stream_stop;   /* Stop streaming when returns true */
    //如有效,監(jiān)測該socket中的輸入并檢查stream_stop()的返回
    pgsocket    stop_socket;    /* if valid, watch for input on this socket
                                 * and check stream_stop() when there is any */
    //如何寫WAL
    WalWriteMethod *walmethod;  /* How to write the WAL */
    //附加到部分接受文件的后綴
    char       *partial_suffix; /* Suffix appended to partially received files */
    //使用的replication slot,如無則為NULL
    char       *replication_slot;   /* Replication slot to use, or NULL */
} StreamCtl;

二、源碼解讀

StartLogStreamer
StartLogStreamer用于在備份時初始化后臺進(jìn)程用于接收WAL.接收進(jìn)程將創(chuàng)建自己的數(shù)據(jù)庫連接以并行的方式對文件進(jìn)行streaming復(fù)制.

/*
 * Initiate background process for receiving xlog during the backup.
 * The background stream will use its own database connection so we can
 * stream the logfile in parallel with the backups.
 * 在備份時初始化后臺進(jìn)程用于接收WAL.
 * 后臺stream進(jìn)程將用自己的數(shù)據(jù)庫連接以使以并行的方式stream文件.
 */
static void
StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
{
    //參數(shù)
    logstreamer_param *param;
    uint32      hi,
                lo;//高位/低位
    char        statusdir[MAXPGPATH];
    param = pg_malloc0(sizeof(logstreamer_param));
    param->timeline = timeline;
    param->sysidentifier = sysidentifier;
    /* Convert the starting position */
    //轉(zhuǎn)換開始位置(高低位轉(zhuǎn)換)
    if (sscanf(startpos, "%X/%X", &hi, &lo) != 2)
    {
        fprintf(stderr,
                _("%s: could not parse write-ahead log location \"%s\"\n"),
                progname, startpos);
        exit(1);
    }
    //開始位置,轉(zhuǎn)換為64bit的地址
    param->startptr = ((uint64) hi) << 32 | lo;
    /* Round off to even segment position */
    //按segment取整
    param->startptr -= XLogSegmentOffset(param->startptr, WalSegSz);
#ifndef WIN32
    //WIN32使用的代碼
    /* Create our background pipe */
    if (pipe(bgpipe) < 0)
    {
        fprintf(stderr,
                _("%s: could not create pipe for background process: %s\n"),
                progname, strerror(errno));
        exit(1);
    }
#endif
    /* Get a second connection */
    //獲取第二個連接
    param->bgconn = GetConnection();
    if (!param->bgconn)
        /* Error message already written in GetConnection() */
        exit(1);
    /* In post-10 cluster, pg_xlog has been renamed to pg_wal */
    //在PG 10,pg_xlog已命名為pg_wal
    snprintf(param->xlog, sizeof(param->xlog), "%s/%s",
             basedir,
             PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
             "pg_xlog" : "pg_wal");
    /* Temporary replication slots are only supported in 10 and newer */
    //臨時復(fù)制slots只在PG10+支持
    if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_TEMP_SLOTS)
        temp_replication_slot = false;
    /*
     * Create replication slot if requested
     * 如要求,則創(chuàng)建復(fù)制slot
     */
    //static char *replication_slot = NULL;
    //static bool temp_replication_slot = true;
    if (temp_replication_slot && !replication_slot)
        //創(chuàng)建replication slot
        replication_slot = psprintf("pg_basebackup_%d", (int) PQbackendPID(param->bgconn));
    if (temp_replication_slot || create_slot)
    {
        //創(chuàng)建replication slot
        if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL,
                                   temp_replication_slot, true, true, false))
            exit(1);
        if (verbose)
        {
            //顯示診斷信息
            if (temp_replication_slot)
                fprintf(stderr, _("%s: created temporary replication slot \"%s\"\n"),
                        progname, replication_slot);
            else
                fprintf(stderr, _("%s: created replication slot \"%s\"\n"),
                        progname, replication_slot);
        }
    }
    if (format == 'p')
    {
        /*
         * Create pg_wal/archive_status or pg_xlog/archive_status (and thus
         * pg_wal or pg_xlog) depending on the target server so we can write
         * to basedir/pg_wal or basedir/pg_xlog as the directory entry in the
         * tar file may arrive later.
         * 基于目標(biāo)服務(wù)器創(chuàng)建pg_wal/archive_status或pg_xlog/archive_status,
         * 這樣可以寫入到basedir/pg_wal 貨 basedir/pg_xlog,可作為后續(xù)訪問的tar文件目錄條目
         */
        snprintf(statusdir, sizeof(statusdir), "%s/%s/archive_status",
                 basedir,
                 PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
                 "pg_xlog" : "pg_wal");
        if (pg_mkdir_p(statusdir, pg_dir_create_mode) != 0 && errno != EEXIST)
        {
            fprintf(stderr,
                    _("%s: could not create directory \"%s\": %s\n"),
                    progname, statusdir, strerror(errno));
            exit(1);
        }
    }
    /*
     * Start a child process and tell it to start streaming. On Unix, this is
     * a fork(). On Windows, we create a thread.
     * 啟動子進(jìn)程開始streaming.
     * 在UNIX平臺,是一個fork進(jìn)程,在Windows平臺,創(chuàng)建線程.
     */
#ifndef WIN32
    //UNIX:fork進(jìn)程
    bgchild = fork();
    if (bgchild == 0)
    {
        //這是子進(jìn)程,返回0
        /* in child process */
        //啟動新進(jìn)程
        exit(LogStreamerMain(param));
    }
    else if (bgchild < 0)
    {
        fprintf(stderr, _("%s: could not create background process: %s\n"),
                progname, strerror(errno));
        exit(1);
    }
    /*
     * Else we are in the parent process and all is well.
     * 在父進(jìn)程中,返回的bgchild是子進(jìn)程PID.
     */
    atexit(kill_bgchild_atexit);
#else                           /* WIN32 */
    //WIN32:創(chuàng)建線程
    bgchild = _beginthreadex(NULL, 0, (void *) LogStreamerMain, param, 0, NULL);
    if (bgchild == 0)
    {
        fprintf(stderr, _("%s: could not create background thread: %s\n"),
                progname, strerror(errno));
        exit(1);
    }
#endif
}

LogStreamerMain
WAL流復(fù)制主函數(shù),用于fork后的子進(jìn)程調(diào)用

static int
LogStreamerMain(logstreamer_param *param)
{
    StreamCtl   stream;//接收xlog流數(shù)據(jù)時的全局參數(shù)
    in_log_streamer = true;
    //初始化StreamCtl結(jié)構(gòu)體
    MemSet(&stream, 0, sizeof(stream));
    stream.startpos = param->startptr;
    stream.timeline = param->timeline;
    stream.sysidentifier = param->sysidentifier;
    stream.stream_stop = reached_end_position;
#ifndef WIN32
    stream.stop_socket = bgpipe[0];
#else
    stream.stop_socket = PGINVALID_SOCKET;
#endif
    stream.standby_message_timeout = standby_message_timeout;
    stream.synchronous = false;
    stream.do_sync = do_sync;
    stream.mark_done = true;
    stream.partial_suffix = NULL;
    stream.replication_slot = replication_slot;
    if (format == 'p')
        stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
    else
        stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);
    //接收數(shù)據(jù)
    if (!ReceiveXlogStream(param->bgconn, &stream))
        /*
         * Any errors will already have been reported in the function process,
         * but we need to tell the parent that we didn't shutdown in a nice
         * way.
         * 在函數(shù)執(zhí)行過程中出現(xiàn)的錯誤已通過警告的方式發(fā)出,
         * 但仍需要告知父進(jìn)程不能優(yōu)雅的關(guān)閉本進(jìn)程.
         */
        return 1;
    if (!stream.walmethod->finish())
    {
        fprintf(stderr,
                _("%s: could not finish writing WAL files: %s\n"),
                progname, strerror(errno));
        return 1;
    }
    //結(jié)束連接
    PQfinish(param->bgconn);
    //普通文件格式
    if (format == 'p')
        FreeWalDirectoryMethod();
    else
        FreeWalTarMethod();
    //是否內(nèi)存
    pg_free(stream.walmethod);
    return 0;
}

三、跟蹤分析

備份命令

pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v

啟動gdb跟蹤

[xdb@localhost ~]$ gdb pg_basebackup
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-110.el7
Copyright (C) 2013 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law.  Type "show copying"
and "show warranty" for details.
This GDB was configured as "x86_64-redhat-linux-gnu".
For bug reporting instructions, please see:
<http://www.gnu.org/software/gdb/bugs/>...
Reading symbols from /appdb/atlasdb/pg11.2/bin/pg_basebackup...done.
(gdb) b StartLogStreamer
Breakpoint 1 at 0x403e6b: file pg_basebackup.c, line 555.
(gdb) set args -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
(gdb) r
Starting program: /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
Password: 
pg_basebackup: initiating base backup, waiting for checkpoint to complete
pg_basebackup: checkpoint completed
pg_basebackup: write-ahead log start point: 0/57000060 on timeline 16
pg_basebackup: starting background WAL receiver
Breakpoint 1, StartLogStreamer (startpos=0x7fffffffdf60 "0/57000060", timeline=16, 
    sysidentifier=0x61f1a0 "6666964067616600474") at pg_basebackup.c:555
555     param = pg_malloc0(sizeof(logstreamer_param));
(gdb)

輸入?yún)?shù)
startpos=0x7fffffffdf60 “0/57000060”,
timeline=16,
sysidentifier=0x61f1a0 “6666964067616600474”
構(gòu)造參數(shù)

(gdb) n
556     param->timeline = timeline;
(gdb) 
557     param->sysidentifier = sysidentifier;
(gdb) 
560     if (sscanf(startpos, "%X/%X", &hi, &lo) != 2)
(gdb) 
567     param->startptr = ((uint64) hi) << 32 | lo;
(gdb) p hi
$1 = 0
(gdb) p lo
$2 = 1459617888
(gdb) n
569     param->startptr -= XLogSegmentOffset(param->startptr, WalSegSz);
(gdb) n
573     if (pipe(bgpipe) < 0)
(gdb) p *param
$3 = {bgconn = 0x0, startptr = 1459617792, xlog = '\000' <repeats 1023 times>, 
  sysidentifier = 0x61f1a0 "6666964067616600474", timeline = 16}
(gdb)

建立連接,創(chuàng)建replication slot

(gdb) n
583     param->bgconn = GetConnection();
(gdb) 
584     if (!param->bgconn)
(gdb) 
591              PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
(gdb) 
589     snprintf(param->xlog, sizeof(param->xlog), "%s/%s",
(gdb) 
595     if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_TEMP_SLOTS)
(gdb) 
601     if (temp_replication_slot && !replication_slot)
(gdb) 
602         replication_slot = psprintf("pg_basebackup_%d", (int) PQbackendPID(param->bgconn));
(gdb) 
603     if (temp_replication_slot || create_slot)
(gdb) 
605         if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL,
(gdb) 
609         if (verbose)
(gdb) 
611             if (temp_replication_slot)
(gdb) 
612                 fprintf(stderr, _("%s: created temporary replication slot \"%s\"\n"),
(gdb) 
pg_basebackup: created temporary replication slot "pg_basebackup_59378"
620     if (format == 'p')
(gdb) 
(gdb) n
630                  PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
(gdb) 
628         snprintf(statusdir, sizeof(statusdir), "%s/%s/archive_status",

創(chuàng)建備份目錄

(gdb) 
633         if (pg_mkdir_p(statusdir, pg_dir_create_mode) != 0 && errno != EEXIST)
(gdb) p *param
$4 = {bgconn = 0x62a280, startptr = 1459617792, xlog = "/data/backup/pg_wal", '\000' <repeats 1004 times>, 
  sysidentifier = 0x61f1a0 "6666964067616600474", timeline = 16}
(gdb) n
647     bgchild = fork();
(gdb) 
#############
[xdb@localhost backup]$ ls
pg_wal

fork進(jìn)程,父進(jìn)程返回子進(jìn)程的PID

(gdb) n
647     bgchild = fork();
(gdb) n
Detaching after fork from child process 43001.
648     if (bgchild == 0)
(gdb) p bgchild
$5 = 43001
(gdb)

子進(jìn)程(PID=43001)

[xdb@localhost backup]$ ps -ef|grep 43001
xdb      43001 42820  1 11:54 pts/1    00:00:01 /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
[xdb@localhost backup]$ ps -ef|grep 192.168.26.25
xdb      42820 42756  0 11:48 pts/1    00:00:00 /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
xdb      43001 42820  0 11:54 pts/1    00:00:01 /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v

完成調(diào)用

(gdb) n
653     else if (bgchild < 0)
(gdb) 
672 }
(gdb) 
BaseBackup () at pg_basebackup.c:1937
1937        for (i = 0; i < PQntuples(res); i++)
(gdb)

pg_wal目錄中的數(shù)據(jù)

[xdb@localhost backup]$ ls -l ./pg_wal/
total 16388
-rw-------. 1 xdb xdb 16777216 Mar 18 11:54 000000100000000000000057
-rw-------. 1 xdb xdb      217 Mar 18 11:54 00000010.history
drwx------. 2 xdb xdb       35 Mar 18 11:54 archive_status
[xdb@localhost backup]$

到此,相信大家對“PostgreSQL中StartLogStreamer分析”有了更深的了解,不妨來實際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

分享名稱:PostgreSQL中StartLogStreamer分析
分享URL:http://muchs.cn/article0/gpheio.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站設(shè)計、App開發(fā)、軟件開發(fā)云服務(wù)器、網(wǎng)站導(dǎo)航企業(yè)網(wǎng)站制作

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

h5響應(yīng)式網(wǎng)站建設(shè)