pg_cron优化案例--terminate pg_cron launcher可自动拉起
场景
在PostgreSQL中我们可以使用pg_cron来实现数据库定时任务
我有一个select 1的定时任务,每分钟触发一次
testdb=# select * from cron.job ;jobid | schedule | command | nodename | nodeport | database | username | active | jobname
-------+-------------+----------+-----------+----------+----------+----------+--------+---------------2 | */1 * * * * | select 1 | localhost | 1142 | testdb | admin | t | manual active
(1 row)
testdb=#
从执行记录来看从某个时刻开始不执行了
testdb=# select * from cron.job_run_details where jobid='2';jobid | runid | job_pid | database | username | command | status | return_message | start_time | end_time
-------+-------+---------+----------+----------+----------+-----------+----------------+-------------------------------+-------------------------------2 | 3 | 29616 | testdb | admin | select 1 | succeeded | 1 row | 2023-02-08 22:37:00.014232+08 | 2023-02-08 22:37:00.015855+082 | 4 | 29772 | testdb | admin | select 1 | succeeded | 1 row | 2023-02-08 22:38:00.010803+08 | 2023-02-08 22:38:00.012029+082 | 5 | 29995 | testdb | admin | select 1 | succeeded | 1 row | 2023-02-08 22:39:00.013508+08 | 2023-02-08 22:39:00.015362+08
(3 rows)testdb=# select now();now
-----------------------------2023-02-13 11:11:10.7302+08
(1 row)testdb=#
从日志来看是pg_cron launcher进程shutdown了,pg_cron launcher是job的调度进程,当它停止了,任务也就不调度了。
2023-02-08 22:17:04.788 CST,,,25712,,63e3aee0.6470,1,,2023-02-08 22:17:04 CST,2/0,0,LOG,00000,"pg_cron scheduler started",,,,,,,,,"","pg_cron launcher",,0
2023-02-08 22:37:00.008 CST,,,25712,,63e3aee0.6470,2,,2023-02-08 22:17:04 CST,2/0,0,LOG,00000,"cron job 2 starting: select 1",,,,,,,,,"","pg_cron launcher",,0
2023-02-08 22:37:00.017 CST,,,25712,,63e3aee0.6470,3,,2023-02-08 22:17:04 CST,2/0,0,LOG,00000,"cron job 2 completed: 1 row",,,,,,,,,"","pg_cron launcher",,0
2023-02-08 22:38:00.006 CST,,,25712,,63e3aee0.6470,4,,2023-02-08 22:17:04 CST,2/0,0,LOG,00000,"cron job 2 starting: select 1",,,,,,,,,"","pg_cron launcher",,0
2023-02-08 22:38:00.013 CST,,,25712,,63e3aee0.6470,5,,2023-02-08 22:17:04 CST,2/0,0,LOG,00000,"cron job 2 completed: 1 row",,,,,,,,,"","pg_cron launcher",,0
2023-02-08 22:39:00.006 CST,,,25712,,63e3aee0.6470,6,,2023-02-08 22:17:04 CST,2/0,0,LOG,00000,"cron job 2 starting: select 1",,,,,,,,,"","pg_cron launcher",,0
2023-02-08 22:39:00.017 CST,,,25712,,63e3aee0.6470,7,,2023-02-08 22:17:04 CST,2/0,0,LOG,00000,"cron job 2 completed: 1 row",,,,,,,,,"","pg_cron launcher",,0
2023-02-08 22:39:54.618 CST,,,25712,,63e3aee0.6470,8,,2023-02-08 22:17:04 CST,2/0,0,LOG,00000,"pg_cron scheduler shutting down",,,,,,,,,"","pg_cron launcher",,0
总的来说就是实例未停止的情况下,pg_cron launcher shutdown导致job未调度。
分析
从代码来看当pg_cron launcher 收到SIGTERM后退出时会打印"pg_cron scheduler shutting down"这条日志。
/** PgCronLauncherMain is the main entry-point for the background worker* that performs tasks.*/
void
PgCronLauncherMain(Datum arg)
{MemoryContext CronLoopContext = NULL;struct rlimit limit;/* Establish signal handlers before unblocking signals. */pqsignal(SIGHUP, pg_cron_sighup);pqsignal(SIGINT, SIG_IGN);pqsignal(SIGTERM, pg_cron_sigterm);/* We're now ready to receive signals */BackgroundWorkerUnblockSignals();/* 省略部分代码行 *//* 当未接收到SIGTERM时一直在while循环中 */while (!got_sigterm){/* 省略部分代码行 */ }/* 那么当接收到SIGTERM时,打印日志并exit(0)退出 */ereport(LOG, (errmsg("pg_cron scheduler shutting down")));proc_exit(0);
}
由此得知,在实例shutdown或者使用select pg_terminate_backend() 终止pg_cron launcher这两种场景下会打印对应的日志,看起来我这个实例的pg_cron launcher就是被 pg_terminate_backend()函数终止了。
相比其他bgworker比如logical replication launcher,当进程被pg_terminate_backend() 终止后,postmaster会检测到并且再次拉起该进程。为什么pg_cron launcher被SIGTERM终止后,没有被再次拉起呢?
这里其实就在于对进程退出的处理不同。
可以看到PgCronLauncherMain中当接收到SIGTERM时,打印日志后proc_exit(0)退出。
而logical replication launcher这里的实现,SIGTERM注册的处理函数是die,当接收到SIGTERM信号后除了setlatch wakeup进程,还会将Interrupt的全局flag置为ture 进入CHECK_FOR_INTERRUPTS()中执行对应的报错逻辑,最终进程会走FATAL报错退出,可以看到errfinish中对于FATAL错误的处理就是调用proc_exit(1)退出进程。
注册信号处理函数
/** Main loop for the apply launcher process.*/
void
ApplyLauncherMain(Datum main_arg)
{/* 省略部分代码行 *//* Establish signal handlers. */pqsignal(SIGHUP, SignalHandlerForConfigReload);/* 注册SIGTERM处理函数为die */pqsignal(SIGTERM, die);BackgroundWorkerUnblockSignals();/* 省略部分代码行 *//* Enter main loop */for (;;){/* 省略部分代码行 *//* Wait for more work. */rc = WaitLatch(MyLatch,WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,wait_time,WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);if (rc & WL_LATCH_SET){ /* 当进程wakeup,则检测是否发生INTERRUPT */ResetLatch(MyLatch);CHECK_FOR_INTERRUPTS();}if (ConfigReloadPending){ConfigReloadPending = false;ProcessConfigFile(PGC_SIGHUP);}}/* Not reachable */
}
ProcessInterrupts中对bgworker的处理
/** ProcessInterrupts: out-of-line portion of CHECK_FOR_INTERRUPTS() macro** If an interrupt condition is pending, and it's safe to service it,* then clear the flag and accept the interrupt. Called only when* InterruptPending is true.** Note: if INTERRUPTS_CAN_BE_PROCESSED() is true, then ProcessInterrupts* is guaranteed to clear the InterruptPending flag before returning.* (This is not the same as guaranteeing that it's still clear when we* return; another interrupt could have arrived. But we promise that* any pre-existing one will have been serviced.)*/
void
ProcessInterrupts(void)
{/* OK to accept any interrupts now? */if (InterruptHoldoffCount != 0 || CritSectionCount != 0)return;InterruptPending = false;if (ProcDiePending){/* 省略部分代码行 */else if (IsBackgroundWorker)ereport(FATAL,(errcode(ERRCODE_ADMIN_SHUTDOWN),errmsg("terminating background worker \"%s\" due to administrator command",MyBgworkerEntry->bgw_type)));/* 省略部分代码行 */}/* 省略部分代码行 */
}
errfinish中对于FATAL错误的处理
/** errfinish --- end an error-reporting cycle** Produce the appropriate error report(s) and pop the error stack.** If elevel, as passed to errstart(), is ERROR or worse, control does not* return to the caller. See elog.h for the error level definitions.*/
void
errfinish(const char *filename, int lineno, const char *funcname)
{/* 省略部分代码行 *//** Perform error recovery action as specified by elevel.*/if (elevel == FATAL){/** For a FATAL error, we let proc_exit clean up and exit.** If we just reported a startup failure, the client will disconnect* on receiving it, so don't send any more to the client.*/if (PG_exception_stack == NULL && whereToSendOutput == DestRemote)whereToSendOutput = DestNone;/** fflush here is just to improve the odds that we get to see the* error message, in case things are so hosed that proc_exit crashes.* Any other code you might be tempted to add here should probably be* in an on_proc_exit or on_shmem_exit callback instead.*/fflush(stdout);fflush(stderr);/** Let the statistics collector know. Only mark the session as* terminated by fatal error if there is no other known cause.*/if (pgStatSessionEndCause == DISCONNECT_NORMAL)pgStatSessionEndCause = DISCONNECT_FATAL;/** Do normal process-exit cleanup, then return exit code 1 to indicate* FATAL termination. The postmaster may or may not consider this* worthy of panic, depending on which subprocess returns it.*/proc_exit(1);}/* 省略部分代码行 */
}
在C语言中exit(0)表示的是程序正常退出,exit(1)则为异常退出。
当子进程退出时,会向父进程Postmaster发送SIGCHLD信号,postmaster注册了这个信号的信号处理函数reaper,通过waitpid去回收子进程,并做一些处理。
那么在Postmaster进程的serverLoop主循环里会检测子进程状态判断是否需要拉起子进程,以bgworker这种为例,在maybe_start_bgworkers里获取BackgroundWorkerList读取对应bgworker信息,
如果对应的bgworker是正常退出的,那么则不在这个列表中,因此不会拉起。
当bgworker是异常退出,对应信息会保留在BackgroundWorkerList里,但是当前的pid为0,因此就会将其拉起。
maybe_start_bgworkers的处理逻辑
/** If the time is right, start background worker(s).** As a side effect, the bgworker control variables are set or reset* depending on whether more workers may need to be started.** We limit the number of workers started per call, to avoid consuming the* postmaster's attention for too long when many such requests are pending.* As long as StartWorkerNeeded is true, ServerLoop will not block and will* call this function again after dealing with any other issues.*/
static void
maybe_start_bgworkers(void)
{
#define MAX_BGWORKERS_TO_LAUNCH 100int num_launched = 0;TimestampTz now = 0;slist_mutable_iter iter;/** During crash recovery, we have no need to be called until the state* transition out of recovery.*/if (FatalError){StartWorkerNeeded = false;HaveCrashedWorker = false;return;}/* Don't need to be called again unless we find a reason for it below */StartWorkerNeeded = false;HaveCrashedWorker = false;/* 这里对BackgroundWorkerList进行遍历,看是否有bgworker需要start */slist_foreach_modify(iter, &BackgroundWorkerList){RegisteredBgWorker *rw;rw = slist_container(RegisteredBgWorker, rw_lnode, iter.cur);/* 跳过pid非0的进程,这些bgworker已启动。*//* ignore if already running */if (rw->rw_pid != 0)continue;/* 省略部分代码行 *//* 这里拉起这些pid为0的bgworker */if (!do_start_bgworker(rw)){StartWorkerNeeded = true;return;}/* 省略部分代码行 */
}
按照这里的逻辑在PgCronLauncherMain中当接收到SIGTERM使用proc_exit(1)退出那就可以再次拉起了。
方案
给插件加入一个参数,打开参数后,当pg_terminate_backend() pg_cron launcher后就会自动被拉起
DefineCustomBoolVariable("cron.enable_autostart_launcher",gettext_noop("Allow postmaster to pull pg_cron launcher process when it is terminated by SIGTERM."),NULL,&EnableAutostartLauncher,false,PGC_POSTMASTER,GUC_SUPERUSER_ONLY,NULL, NULL, NULL);
对于进程退出的处理,打开参数时exit(1)
void
PgCronLauncherMain(Datum arg)
{MemoryContext CronLoopContext = NULL;struct rlimit limit;/* Establish signal handlers before unblocking signals. */pqsignal(SIGHUP, pg_cron_sighup);pqsignal(SIGINT, SIG_IGN);pqsignal(SIGTERM, pg_cron_sigterm);/* We're now ready to receive signals */BackgroundWorkerUnblockSignals();/* 省略部分代码行 *//* 当未接收到SIGTERM时一直在while循环中 */while (!got_sigterm){/* 省略部分代码行 */ }ereport(LOG, (errmsg("pg_cron scheduler shutting down")));/* Modify by Nickyong at 2023-02-13 PM *//* 如果cron.enable_autostart_launcher ='on' ,则proc_exit(1),否则 proc_exit(0) */if (EnableAutostartLauncher)proc_exit(1);elseproc_exit(0);/* End at 2023-02-13 PM */
}
验证
参数展示
testdb=# select * from pg_settings where name='cron.enable_autostart_launcher';
-[ RECORD 1 ]---+------------------------------------------------------------------------------------
name | cron.enable_autostart_launcher
setting | off
unit |
category | Customized Options
short_desc | Allow postmaster to pull pg_cron launcher process when it is terminated by SIGTERM.
extra_desc |
context | postmaster
vartype | bool
source | configuration file
min_val |
max_val |
enumvals |
boot_val | off
reset_val | off
sourcefile | /data/pg14-2debug/master/postgresql.auto.conf
sourceline | 3
pending_restart | f
默认关闭
testdb=# show cron.enable_autostart_launcher;
-[ RECORD 1 ]------------------+----
cron.enable_autostart_launcher | off
terminate pg_cron launcher后没自动拉起
testdb=# select * from pg_stat_activity where backend_type like '%pg_cron launcher%' ;
-[ RECORD 1 ]----+-------------------------------------------------------------------------------------------------
datid | 24589
datname | testdb
pid | 23893
leader_pid |
usesysid | 10
usename | postgres
application_name | pg_cron scheduler
client_addr |
client_hostname |
client_port |
backend_start | 2023-02-13 19:22:16.062689+08
xact_start |
query_start | 2023-02-13 19:23:10.023643+08
state_change | 2023-02-13 19:23:10.025066+08
wait_event_type | Extension
wait_event | Extension
state | idle
backend_xid |
backend_xmin |
query_id |
query | update cron.job_run_details set status = $1, return_message = $2, end_time = $3 where runid = $4
backend_type | pg_cron launchertestdb=# select pg_terminate_backend(23893);
-[ RECORD 1 ]--------+--
pg_terminate_backend | ttestdb=# select * from pg_stat_activity where backend_type like '%pg_cron launcher%' ;
(0 rows)testdb=#
打开参数
testdb=# show cron.enable_autostart_launcher;
-[ RECORD 1 ]------------------+---
cron.enable_autostart_launcher | on
terminate pg_cron launcher后自动拉起
testdb=# select * from pg_stat_activity where backend_type like '%pg_cron launcher%' ;
-[ RECORD 1 ]----+-------------------------------------------------------------------------------------------------
datid | 24589
datname | testdb
pid | 24125
leader_pid |
usesysid | 10
usename | postgres
application_name | pg_cron scheduler
client_addr |
client_hostname |
client_port |
backend_start | 2023-02-13 19:23:59.601739+08
xact_start |
query_start | 2023-02-13 19:24:10.019018+08
state_change | 2023-02-13 19:24:10.020397+08
wait_event_type | Extension
wait_event | Extension
state | idle
backend_xid |
backend_xmin |
query_id |
query | update cron.job_run_details set status = $1, return_message = $2, end_time = $3 where runid = $4
backend_type | pg_cron launchertestdb=# select pg_terminate_backend(24125);
-[ RECORD 1 ]--------+--
pg_terminate_backend | ttestdb=# select * from pg_stat_activity where backend_type like '%pg_cron launcher%' ;
-[ RECORD 1 ]----+------------------------------------------------------------------------------------------------------------------------------
datid | 24589
datname | testdb
pid | 24329
leader_pid |
usesysid | 10
usename | postgres
application_name | pg_cron scheduler
client_addr |
client_hostname |
client_port |
backend_start | 2023-02-13 19:24:54.976542+08
xact_start |
query_start | 2023-02-13 19:24:54.978153+08
state_change | 2023-02-13 19:24:54.981451+08
wait_event_type | Extension
wait_event | Extension
state | idle
backend_xid |
backend_xmin |
query_id |
query | update cron.job_run_details set status = 'failed', return_message = 'server restarted' where status in ('starting','running')
backend_type | pg_cron launchertestdb=#
小结
pg_cron的作者并没有说明这样设计的原因,我猜测是预留了一个可以强制停止所有job的入口。
如果job对于业务来说比较重要,希望被终止后可以自动拉起,以免job不调度造成一些损失,个人感觉可以做成参数来控制的方式。默认关闭,打开参数当pg_terminate_backend() 后可以自动拉起,虽然重启实例也能再次拉起pg_cron launcher,但并不是任何时候都可以重启实例的。