定时器

前言:

Python 提供了几种实现定时器功能的方式,可以用于在特定时间后执行函数或定期执行任务。

threading.Timer

这是 Python 标准库中提供的简单定时器,适用于在指定时间后执行一次函数,属于 threading 模块。

1
2
3
4
5
6
7
8
from threading import Timer

def hello():
print("Hello, World!")

# 5秒后执行hello函数
t = Timer(5.0, hello)
t.start() # 启动定时器

如果需要取消定时器,可以使用 cancel() 方法:

1
2
3
4
timer = threading.Timer(5.0, hello)
timer.start()
# 在 5 秒内可以取消定时器
timer.cancel()

特点:

  • 在单独的线程中运行
  • 只执行一次
  • 可以调用 cancel() 取消尚未执行的定时器

sched 模块

sched 模块提供了一个通用的调度器类,可以用于更复杂的定时任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import sched
import time

def print_time(parameter='default'):
print("当前时间:", time.time(), parameter)

# 创建调度器
s = sched.scheduler(time.time, time.sleep)
# 安排事件:延迟5秒,优先级1,执行print_time函数,传递参数
s.enter(5, 1, print_time, kwargs={'parameter': 'first'})
s.enter(10, 1, print_time, kwargs={'parameter': 'second'})
# 执行所有安排的事件
s.run()

特点:

  • 更精确的时间控制
  • 可以安排多个事件
  • 可以设置优先级

time.sleep 简单定时

对于简单的定时需求,可以直接使用 time.sleep

1
2
3
4
5
import time

while True:
print("This prints once a minute")
time.sleep(60) # 暂停60秒

第三方库

schedule

schedule 是一个更高级的定时任务库,提供了更人性化的 API:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 需要先安装:pip install schedule
import schedule
import time


def job():
print("执行定时任务...")


# 每隔10秒执行一次
schedule.every(10).seconds.do(job)

# 每隔1分钟执行一次
schedule.every().minute.do(job)

# 每隔一小时执行一次
schedule.every().hour.do(job)

# 每天10:30执行
schedule.every().day.at("10:30").do(job)

while True:
schedule.run_pending()
time.sleep(1)

APScheduler

  • 简介

    APScheduler (Advanced Python Scheduler) 是一个强大的 Python 定时任务库,用于在指定时间执行作业或定期运行作业。它比标准库中的定时器功能更强大、更灵活。

  • 核心概念

    • 主要组件
      • 调度器 (Scheduler):负责管理作业的调度和执行
      • 作业存储 (Job Store):存储作业信息
      • 执行器 (Executor):负责执行作业
      • 触发器 (Trigger):决定作业何时运行
    • 调度器类型
      • BlockingScheduler:阻塞式调度器,适合独立脚本
      • BackgroundScheduler:后台调度器,适合非阻塞应用
      • AsyncIOScheduler:适用于 asyncio 应用
      • GeventScheduler:适用于 gevent 应用
      • TornadoScheduler:适用于 Tornado 应用
      • TwistedScheduler:适用于 Twisted 应用
  • 基本使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    # 需要先安装:pip install apscheduler
    from apscheduler.schedulers.blocking import BlockingScheduler


    def my_job():
    print("APScheduler 定时任务执行!")


    # 创建调度器
    scheduler = BlockingScheduler()

    # 添加任务:每5秒执行一次
    scheduler.add_job(my_job, "interval", seconds=5)

    # 启动调度器(这会阻塞主线程)
    scheduler.start()

  • 触发器 (Triggers)

    • 日期触发器 (date)

      在特定时间点运行一次

      1
      2
      3
      4
      5
      6
      7
      8
      from datetime import datetime

      scheduler.add_job(
      my_job,
      'date',
      run_date=datetime(2023, 12, 25, 12, 0, 0),
      args=['Christmas']
      )
    • 间隔触发器 (interval)

      以固定时间间隔运行

      1
      2
      3
      4
      5
      6
      7
      8
      scheduler.add_job(
      my_job,
      'interval',
      hours=2, # 每2小时
      minutes=30, # 30分钟
      start_date='2023-01-01 09:00:00',
      end_date='2023-12-31 23:59:59'
      )
    • 日历间隔触发器 (cron)

      类似 Unix cron 的调度方式

      1
      2
      3
      4
      5
      6
      7
      scheduler.add_job(
      my_job,
      'cron',
      day_of_week='mon-fri', # 周一到周五
      hour=9, # 9点
      minute=30 # 30分
      )
  • 作业存储 (Job Stores)

    • 内置存储类型
      • MemoryJobStore (默认):内存存储,重启后丢失
      • SQLAlchemyJobStore:使用 SQL 数据库存储
      • MongoDBJobStore:使用 MongoDB 存储
      • RedisJobStore:使用 Redis 存储
    • 配置示例
      1
      2
      3
      4
      5
      6
      from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

      jobstores = {
      'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
      }
      scheduler = BlockingScheduler(jobstores=jobstores)
  • 执行器 (Executors)

    • 内置执行器
      • ThreadPoolExecutor (默认):线程池执行器
      • ProcessPoolExecutor:进程池执行器
    • 配置示例
      1
      2
      3
      4
      5
      6
      7
      from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

      executors = {
      'default': ThreadPoolExecutor(20),
      'processpool': ProcessPoolExecutor(5)
      }
      scheduler = BackgroundScheduler(executors=executors)
  • 高级功能

    • 作业管理
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      # 添加作业
      job = scheduler.add_job(my_job, 'interval', minutes=2)

      # 暂停作业
      job.pause()

      # 恢复作业
      job.resume()

      # 移除作业
      job.remove()

      # 修改作业
      job.modify(max_instances=3)
      job.reschedule('interval', minutes=5)
    • 事件监听
      1
      2
      3
      4
      5
      6
      7
      def my_listener(event):
      if event.exception:
      print("Job crashed :(")
      else:
      print("Job executed :)")

      scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
    • 作业修饰器
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      from apscheduler.schedulers.background import BackgroundScheduler

      scheduler = BackgroundScheduler()

      @scheduler.scheduled_job('interval', seconds=30)
      def job1():
      print("Job 1 executed")

      @scheduler.scheduled_job('cron', hour=1)
      def job2():
      print("Job 2 executed")

      scheduler.start()
  • 实际应用示例

    • Web 应用集成 (Flask)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      from flask import Flask
      from apscheduler.schedulers.background import BackgroundScheduler

      app = Flask(__name__)
      scheduler = BackgroundScheduler()

      def scheduled_task():
      with app.app_context():
      print("Running scheduled task...")

      scheduler.add_job(scheduled_task, 'interval', minutes=1)
      scheduler.start()

      @app.route("/")
      def home():
      return "Hello, World!"

      if __name__ == "__main__":
      app.run()
    • Django 集成
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      # 在 apps.py 中
      from django.apps import AppConfig
      from apscheduler.schedulers.background import BackgroundScheduler

      class MyAppConfig(AppConfig):
      default_auto_field = 'django.db.models.BigAutoField'
      name = 'myapp'

      def ready(self):
      scheduler = BackgroundScheduler()
      scheduler.add_job(my_task, 'interval', minutes=30)
      scheduler.start()
  • 最佳实践

    • 选择合适的调度器:根据应用类型选择 Blocking/Background/AsyncIO 等调度器
    • 使用持久化存储:生产环境应使用数据库存储作业
    • 处理异常:为作业添加适当的异常处理
    • 避免长时间运行作业:长时间作业会阻塞其他作业执行
    • 考虑时区问题:明确设置时区 timezone='Asia/Shanghai'
    • 资源管理:合理配置线程/进程池大小
  • 常见问题

    • 作业不执行
      • 检查调度器是否已启动
      • 检查作业是否被暂停
      • 检查触发器配置是否正确
    • 作业重复执行
      • 设置 max_instances 限制并发实例数
      • 使用 coalesce=True 合并多次触发
    • 时区问题
      1
      2
      3
      from pytz import timezone

      scheduler = BlockingScheduler(timezone=timezone('Asia/Shanghai'))

异步定时器 (Python 3.7+)

对于异步编程,可以使用 asyncio 提供的定时器功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio


async def display_time():
print("当前时间:", asyncio.get_event_loop().time())


async def main():
# 创建一个周期性任务,每2秒执行一次
while True:
await display_time()
await asyncio.sleep(2)


# 运行异步程序
asyncio.run(main())

或者使用 call_later 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio


def callback(n):
print(f"回调函数被调用,参数: {n}")


async def main():
loop = asyncio.get_running_loop()

# 5秒后调用callback函数
loop.call_later(5, callback, "参数1")

# 等待足够时间以观察效果
await asyncio.sleep(10)


asyncio.run(main())

各种定时器的适用场景

  1. threading.Timer:适合简单的单次延迟执行任务
  2. sched:适合需要精确控制执行时间和优先级的场景
  3. schedule:适合编写易于阅读和维护的定时任务代码
  4. asyncio:适合异步编程环境中的定时任务
  5. APScheduler:适合复杂的定时任务需求,如 cron 表达式、持久化等

注意事项

  1. 定时器不是完全精确的,特别是在系统负载高时可能会有延迟
  2. 使用线程定时器时要注意线程安全问题
  3. 长时间运行的定时任务应考虑使用专门的作业调度系统
  4. 在 Web 应用中,通常使用 Celery 等分布式任务队列代替定时器
本文结束 感谢您的阅读
正在加载今日诗词....