定时器
前言:
Python 提供了几种实现定时器功能的方式,可以用于在特定时间后执行函数或定期执行任务。
threading.Timer
这是 Python 标准库中提供的简单定时器,适用于在指定时间后执行一次函数,属于 threading 模块。
1 | from threading import Timer |
如果需要取消定时器,可以使用 cancel() 方法:
1 | timer = threading.Timer(5.0, hello) |
特点:
- 在单独的线程中运行
- 只执行一次
- 可以调用
cancel()
取消尚未执行的定时器
sched
模块
sched
模块提供了一个通用的调度器类,可以用于更复杂的定时任务。
1 | import sched |
特点:
- 更精确的时间控制
- 可以安排多个事件
- 可以设置优先级
time.sleep
简单定时
对于简单的定时需求,可以直接使用 time.sleep
1 | import time |
第三方库
schedule
schedule 是一个更高级的定时任务库,提供了更人性化的 API:
1 | # 需要先安装:pip install schedule |
APScheduler
-
简介
APScheduler (Advanced Python Scheduler) 是一个强大的 Python 定时任务库,用于在指定时间执行作业或定期运行作业。它比标准库中的定时器功能更强大、更灵活。
-
核心概念
-
主要组件
- 调度器 (Scheduler):负责管理作业的调度和执行
- 作业存储 (Job Store):存储作业信息
- 执行器 (Executor):负责执行作业
- 触发器 (Trigger):决定作业何时运行
-
调度器类型
- BlockingScheduler:阻塞式调度器,适合独立脚本
- BackgroundScheduler:后台调度器,适合非阻塞应用
- AsyncIOScheduler:适用于 asyncio 应用
- GeventScheduler:适用于 gevent 应用
- TornadoScheduler:适用于 Tornado 应用
- TwistedScheduler:适用于 Twisted 应用
-
-
基本使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17# 需要先安装:pip install apscheduler
from apscheduler.schedulers.blocking import BlockingScheduler
def my_job():
print("APScheduler 定时任务执行!")
# 创建调度器
scheduler = BlockingScheduler()
# 添加任务:每5秒执行一次
scheduler.add_job(my_job, "interval", seconds=5)
# 启动调度器(这会阻塞主线程)
scheduler.start() -
触发器 (Triggers)
-
日期触发器 (date)
在特定时间点运行一次
1
2
3
4
5
6
7
8from datetime import datetime
scheduler.add_job(
my_job,
'date',
run_date=datetime(2023, 12, 25, 12, 0, 0),
args=['Christmas']
) -
间隔触发器 (interval)
以固定时间间隔运行
1
2
3
4
5
6
7
8scheduler.add_job(
my_job,
'interval',
hours=2, # 每2小时
minutes=30, # 30分钟
start_date='2023-01-01 09:00:00',
end_date='2023-12-31 23:59:59'
) -
日历间隔触发器 (cron)
类似 Unix cron 的调度方式
1
2
3
4
5
6
7scheduler.add_job(
my_job,
'cron',
day_of_week='mon-fri', # 周一到周五
hour=9, # 9点
minute=30 # 30分
)
-
-
作业存储 (Job Stores)
-
内置存储类型
- MemoryJobStore (默认):内存存储,重启后丢失
- SQLAlchemyJobStore:使用 SQL 数据库存储
- MongoDBJobStore:使用 MongoDB 存储
- RedisJobStore:使用 Redis 存储
-
配置示例
1
2
3
4
5
6from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
scheduler = BlockingScheduler(jobstores=jobstores)
-
-
执行器 (Executors)
-
内置执行器
- ThreadPoolExecutor (默认):线程池执行器
- ProcessPoolExecutor:进程池执行器
-
配置示例
1
2
3
4
5
6
7from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
scheduler = BackgroundScheduler(executors=executors)
-
-
高级功能
-
作业管理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15# 添加作业
job = scheduler.add_job(my_job, 'interval', minutes=2)
# 暂停作业
job.pause()
# 恢复作业
job.resume()
# 移除作业
job.remove()
# 修改作业
job.modify(max_instances=3)
job.reschedule('interval', minutes=5) -
事件监听
1
2
3
4
5
6
7def my_listener(event):
if event.exception:
print("Job crashed :(")
else:
print("Job executed :)")
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) -
作业修饰器
1
2
3
4
5
6
7
8
9
10
11
12
13from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
def job1():
print("Job 1 executed")
def job2():
print("Job 2 executed")
scheduler.start()
-
-
实际应用示例
-
Web 应用集成 (Flask)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19from flask import Flask
from apscheduler.schedulers.background import BackgroundScheduler
app = Flask(__name__)
scheduler = BackgroundScheduler()
def scheduled_task():
with app.app_context():
print("Running scheduled task...")
scheduler.add_job(scheduled_task, 'interval', minutes=1)
scheduler.start()
def home():
return "Hello, World!"
if __name__ == "__main__":
app.run() -
Django 集成
1
2
3
4
5
6
7
8
9
10
11
12# 在 apps.py 中
from django.apps import AppConfig
from apscheduler.schedulers.background import BackgroundScheduler
class MyAppConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'myapp'
def ready(self):
scheduler = BackgroundScheduler()
scheduler.add_job(my_task, 'interval', minutes=30)
scheduler.start()
-
-
最佳实践
- 选择合适的调度器:根据应用类型选择 Blocking/Background/AsyncIO 等调度器
- 使用持久化存储:生产环境应使用数据库存储作业
- 处理异常:为作业添加适当的异常处理
- 避免长时间运行作业:长时间作业会阻塞其他作业执行
- 考虑时区问题:明确设置时区
timezone='Asia/Shanghai'
- 资源管理:合理配置线程/进程池大小
-
常见问题
-
作业不执行
- 检查调度器是否已启动
- 检查作业是否被暂停
- 检查触发器配置是否正确
-
作业重复执行
- 设置
max_instances
限制并发实例数 - 使用
coalesce=True
合并多次触发
- 设置
-
时区问题
1
2
3from pytz import timezone
scheduler = BlockingScheduler(timezone=timezone('Asia/Shanghai'))
-
异步定时器 (Python 3.7+)
对于异步编程,可以使用 asyncio 提供的定时器功能:
1 | import asyncio |
或者使用 call_later 方法:
1 | import asyncio |
各种定时器的适用场景
- threading.Timer:适合简单的单次延迟执行任务
- sched:适合需要精确控制执行时间和优先级的场景
- schedule:适合编写易于阅读和维护的定时任务代码
- asyncio:适合异步编程环境中的定时任务
- APScheduler:适合复杂的定时任务需求,如 cron 表达式、持久化等
注意事项
- 定时器不是完全精确的,特别是在系统负载高时可能会有延迟
- 使用线程定时器时要注意线程安全问题
- 长时间运行的定时任务应考虑使用专门的作业调度系统
- 在 Web 应用中,通常使用 Celery 等分布式任务队列代替定时器