标签存档: python

python定时任务

APScheduler

APScheduler(Python化的Cron)使用总结

简介

APScheduler全程为Advanced Python Scheduler,是一款轻量级的Python任务调度框架。它允许你像Cron那样安排定期执行的任务,并且支持Python函数或任意可调用的对象。官方文档:https://apscheduler.readthedocs.io/en/latest/userguide.html#basic-concepts

APScheduler安装

方法一:使用pip安装

$ pip install apscheduler
方法二:如果pip不起作用,可以从pypi上下载最新的源码包(https://pypi.python.org/pypi/APScheduler/)进行安装:

$ python setup.py install
APScheduler组件

triggers(触发器): 触发器中包含调度逻辑,每个作业都由自己的触发器来决定下次运行时间。除了他们自己初始配置意外,触发器完全是无状态的。

job stores(作业存储器):存储被调度的作业,默认的作业存储器只是简单地把作业保存在内存中,其他的作业存储器则是将作业保存在数据库中。当作业被保存到一个持久化的作业存储器中的时候,该作业的数据会被序列化,并在加载时被反序列化。作业存储器不能共享调度器。

executors(执行器):处理作业的运行,他们通常通过在作业中提交指定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。

schedulers(调度器):配置作业存储器和执行器可以在调度器中完成,例如添加、修改和移除作业。根据不同的应用场景可以选用不同的调度器,可选的有BlockingScheduler,BackgroundScheduler,AsyncIOScheduler,GeventScheduler,TornadoScheduler,TwistedScheduler,QtScheduler 7种。

调度器

BlockingScheduler : 当调度器是你应用中唯一要运行的东西时。
BackgroundScheduler : 当你没有运行任何其他框架并希望调度器在你应用的后台执行时使用(充电桩即使用此种方式)。
AsyncIOScheduler : 当你的程序使用了asyncio(一个异步框架)的时候使用。
GeventScheduler : 当你的程序使用了gevent(高性能的Python并发框架)的时候使用。
TornadoScheduler : 当你的程序基于Tornado(一个web框架)的时候使用。
TwistedScheduler : 当你的程序使用了Twisted(一个异步框架)的时候使用
QtScheduler : 如果你的应用是一个Qt应用的时候可以使用。
作业存储器

如果你的应用在每次启动的时候都会重新创建作业,那么使用默认的作业存储器(MemoryJobStore)即可,但是如果你需要在调度器重启或者应用程序奔溃的情况下任然保留作业,你应该根据你的应用环境来选择具体的作业存储器。例如:使用Mongo或者SQLAlchemy JobStore (用于支持大多数RDBMS)

执行器

对执行器的选择取决于你使用上面哪些框架,大多数情况下,使用默认的ThreadPoolExecutor已经能够满足需求。如果你的应用涉及到CPU密集型操作,你可以考虑使用ProcessPoolExecutor来使用更多的CPU核心。你也可以同时使用两者,将ProcessPoolExecutor作为第二执行器。

触发器

当你调度作业的时候,你需要为这个作业选择一个触发器,用来描述这个作业何时被触发,APScheduler有三种内置的触发器类型:

date 一次性指定日期
interval 在某个时间范围内间隔多长时间执行一次
cron 和Linux crontab格式兼容,最为强大
date

最基本的一种调度,作业只会执行一次。它的参数如下:

run_date (datetime|str) – 作业的运行日期或时间
timezone (datetime.tzinfo|str) – 指定时区
举个栗子:

2016-12-12运行一次job_function

sched.add_job(job_function, ‘date’, run_date=date(2016, 12, 12), args=[‘text’])

2016-12-12 12:00:00运行一次job_function

sched.add_job(job_function, ‘date’, run_date=datetime(2016, 12, 12, 12, 0, 0), args=[‘text’])
interval

间隔调度,参数如下:

weeks (int) – 间隔几周
days (int) – 间隔几天
hours (int) – 间隔几小时
minutes (int) – 间隔几分钟
seconds (int) – 间隔多少秒
start_date (datetime|str) – 开始日期
end_date (datetime|str) – 结束日期
timezone (datetime.tzinfo|str) – 时区
举个栗子:

每两个小时调一下job_function

sched.add_job(job_function, ‘interval’, hours=2)
cron

参数如下:

year (int|str) – 年,4位数字
month (int|str) – 月 (范围1-12)
day (int|str) – 日 (范围1-31)
week (int|str) – 周 (范围1-53)
day_of_week (int|str) – 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
hour (int|str) – 时 (范围0-23)
minute (int|str) – 分 (范围0-59)
second (int|str) – 秒 (范围0-59)
start_date (datetime|str) – 最早开始日期(包含)
end_date (datetime|str) – 最晚结束时间(包含)
timezone (datetime.tzinfo|str) – 指定时区
举个栗子:

job_function将会在6,7,8,11,12月的第3个周五的1,2,3点运行

sched.add_job(job_function, ‘cron’, month=’6-8,11-12′, day=’3rd fri’, hour=’0-3′)

截止到2016-12-30 00:00:00,每周一到周五早上五点半运行job_function

sched.add_job(job_function, ‘cron’, day_of_week=’mon-fri’, hour=5, minute=30, end_date=’2016-12-31′)

配置调度程序

在应用程序中使用默认作业存储和默认执行程序运行BackgroundScheduler的例子:

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
# Initialize the rest of the application here, or before the scheduler initialization

这将生成一个名为“default”的MemoryJobStore和名为“default”的ThreadPoolExecutor的BackgroundScheduler,默认最大线程数为10。

如果不满足于当前配置,如希望使用两个执行器有两个作业存储器,并且还想要调整新作业的默认值并设置不同的时区,可按如下配置:

复制代码
from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

# 配置作业存储器
jobstores = {
‘mongo’: MongoDBJobStore(),
‘default’: SQLAlchemyJobStore(url=’sqlite:///jobs.sqlite’)
}

配置执行器,并设置线程数

executors = {
‘default’: ThreadPoolExecutor(20),
‘processpool’: ProcessPoolExecutor(5)
}
job_defaults = {
‘coalesce’: False, # 默认情况下关闭新的作业
‘max_instances’: 3 # 设置调度程序将同时运行的特定作业的最大实例数3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
复制代码

启动调度器

启动调度器只需要调用start()方法,对于除BlockingScheduler以外的调度程序,此调用将立即返回,您可以继续应用程序的初始化过程,可能会将作业添加到调度程序。

对于BlockingScheduler,只需在完成初始化步骤后调用start()

scheduler.start()

添加作业

方法一:调用add_job()方法

最常见的方法,add_job()方法返回一个apscheduler.job.Job实例,您可以稍后使用它来修改或删除该作业。

方法二:使用装饰器scheduled_job()

此方法主要是方便的声明在应用程序运行时不会改变的作业

删除作业

方法一:通过作业ID或别名调用remove_job()删除作业

方法二:通过add_job()返回的job实例调用remove()方法删除作业

举个栗子:

复制代码

实例删除

job = scheduler.add_job(myfunc, ‘interval’, minutes=2)
job.remove()
# id删除
scheduler.add_job(myfunc, ‘interval’, minutes=2, id=’my_job_id’)
scheduler.remove_job(‘my_job_id’)
复制代码

暂停和恢复作业

可以通过Job实例或调度程序本身轻松暂停和恢复作业。 当作业暂停时,下一个运行时间将被清除,直到作业恢复,不会再计算运行时间。 要暂停作业,请使用以下任一方法:

apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()

恢复作业:

apscheduler.job.Job.resume()
apscheduler.schedulers.base.BaseScheduler.resume_job()

获取作业列表

要获得计划作业的机器可处理列表,可以使用get_jobs()方法。 它将返回一个Job实例列表。 如果您只对特定作业存储中包含的作业感兴趣,则将作业存储别名作为第二个参数。

为了方便起见,可以使用print_jobs()方法,它将打印格式化的作业列表,触发器和下次运行时间。

修改作业属性

您可以通过调用apscheduler.job.Job.modify()或modify_job()来修改除id以外的任何作业属性。

job.modify(max_instances=6, name=’Alternate name’)

关闭调度程序

默认情况下,调度程序关闭其作业存储和执行程序,并等待所有当前正在执行的作业完成,wait=False参数可选,代表立即停止,不用等待。

scheduler.shutdown(wait=False)
附:1、定时任务运行脚本小例子:

复制代码
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from app.untils.log_builder import sys_logging

scheduler = BlockingScheduler() # 后台运行

# 设置为每日凌晨00:30:30时执行一次调度程序
@scheduler.scheduled_job(“cron”, day_of_week=’*’, hour=’1′, minute=’30’, second=’30’)
def rebate():
print “schedule execute”
sys_logging.debug(“statistic scheduler execute success” + datetime.datetime.now().strftime(“%Y-%m-%d %H:%M:%S”))

if name == ‘main‘:
try:
scheduler.start()
sys_logging.debug(“statistic scheduler start success”)
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
sys_logging.debug(“statistic scheduler start-up fail”)
复制代码

python3基础10-数据可视化

python3基础9-测试

python3基础8-文件和异常

#coding:utf-8
#write file 覆盖之前的文件
#with open("1.txt","w") as b:
#  b.write("today is a good day")
#write file 在之前的文件后面新加
#with open("1.txt","a") as b:
#  b.write("\ntoday is a ok day")
#读取和写入模式打开,覆盖之前的内容并且第一行留空
with open("1.txt","r+") as b:
    b.write("\ntoday is a r day")

#read file
#默认为只读r
with open("1.txt") as a:
    '''
    c = a.read()
    print(c.rstrip())
    '''
    '''
    for line in a:
        print(line.rstrip())
    '''
    lines = a.readlines()
    for line in lines:
        print(line.rstrip())

#json数据存储
import json
numbers = [2, 3, 5, 7, 11, 13]
filename = 'numbers.json'
with open(filename,'w') as f_obj:
    json.dump(numbers, f_obj)

with open(filename) as f_obj:
    numbers = json.load(f_obj)
    print(numbers)

#异常
try:
    print(5/0)
except ZeroDivisionError:
    print("zero")

try:
    print(5/0)
except ZeroDivisionError:
#什么也不做
    pass

try:
    print(5/2)
except ZeroDivisionError:
    print("zero")
else:
    print("ok")

filename = 'alice.txt'
try:
    with open(filename) as f_obj:
        contents = f_obj.read()
except FileNotFoundError:
    msg = "Sorry, the file " + filename + " does not exist."
    print(msg)

python3基础7-模块

python的模块和函数命名规则推荐用小写字母+下划线

新建一个food.py的模块,里面定义一个make_food函数

#coding:utf-8
#允许汉语注释
#定义函数
def make_food(a="fruit"):
    print(a)

在相同目录下建立making_food.py

导入整个模块

#coding:utf-8
#允许汉语注释
#导入整个模块,引用方法:module_name.function_name()
import food
food.make_food("apple")
food.make_food("rice")

导入特定函数

#from module_name import function_name
from food import make_food
make_food("orange")

使用as 给函数指定别名

from food import make_food as mf
mf('apple')

使用as 给模块指定别名

import food as f
f.make_food('apple')

导入模块中的所有函数(不推荐)

from food import *
make_food('apple')

python3基础6-函数

#coding:utf-8
#允许汉语注释
#定义函数
def greet(a1,a2='t3'):
    print("a1:"+a1+",a2:"+a2)
#位置实参
greet("t1","t2")
#关键字实参
greet(a2='t2',a1='t1')
#使用默认值时,在形参列表中必须先列出没有默认值的形参,再列出有默认值的实参
greet('t6')
#返回值
def he(a,b):
    return a+b
print(he(1,2))
#传递任意数量的实参,如果要让函数接受不同类型的实参,必须在函数定义中将接纳任意数量实参的形参放在最后
def test(*a):
    print(a)
test(1)
test(1,2)
test(1,2,3)

#传递任意数量的键值对
def test2(**user_info):
    for key,value in user_info.items():
        print(key,value)

test2(a="1")
test2(a="1",b="2")
test2(a="1",b="2",c='3')

python3基础5-用户输入和while循环

#coding:utf-8
#允许汉语注释
#输入和while循环
message = input("tell me sth:")
print(message)
#字符串转换成int
height = int(message)
print(height)
while height!=5:
    t=input("guess: ")
    print(t)
    height = int(t)
print("ok")

python3基础4-字典

#coding:utf-8
#允许汉语注释
#添加字典元素
a= {'color': 'green', 'points': 5}
print(a)
a['x']=1
a['y']=2
a['z']=1
print(a)
#删除键—值对
del a['points']
print(a)
#遍历所有的键—值对
for key,value in a.items():
    print(key,value)

for key1,value1 in a.items():
    print(key1,value1)

for key3 in a.keys():
    print(key3)

for v3 in a.values():
    print(str(v3))
print("\n\n")

#删除重复值
print('del multi\n')
for v3 in set(a.values()):
    print(v3)

python3基础3-if操作

#coding:utf-8
#允许汉语注释
#if
test = ['mushrooms', 'mushrooms', 'pineapple']
if test[0]==test[1]:
    print("ok")
else:
    print("no ok")

#检查某个元素是否在list里面
t = 'pineapple' in test
if t==True:
    print("true")

if 'pineapple1' not in test:
    print("true")

#检查list是否为空
if test:
    print("no empty")
else:
    print("empty")

age = 18
if age < 4:#在条件测试的格式设置方面,PEP 8提供的唯一建议是,在诸如== 、>= 和<= 等比较运算符两边各添加一个空格

    print("Your admission cost is $0.")
elif age < 18:
    print("Your admission cost is $5.")
else:
    print("Your admission cost is $10.")


python3基础2-列表操作

列表操作

#coding:utf-8
#允许汉语注释
#for
pt=["haha1","hshs3","dertery"]
for item in pt:
    print(item)
    print(item+"ok")
print("for end")

for value in range(1,6):
    print(value)
#使用range()创建数字列表
numbers=list(range(1,6))
print(numbers)
#使用函数range() 时,还可指定步长。例如,下面的代码打印1~10内的偶数
even_numbers = list(range(2,11,2))
print(even_numbers)

#平方数列表
squares = []
for value in range(1,11):
    square = value**2
    squares.append(square)
print(squares)

#平方数列表简化
squares = []
for value in range(1,11):
    squares.append(value**2)
print(squares)

#平方数列表超级简化,挺奇怪的语法
squares = [value**2 for value in range(1,11)]
print(squares)

print(min(squares))
print(max(squares))
print(sum(squares))

#切片
players = ['charles', 'martina', 'michael', 'florence', 'eli']
print(players[0:3]) #第一个元素和最后一个元素的索引,与函数range() 一样,Python在到达你指定的第二个索引前面的元素后停止
#如果你没有指定第一个索引,Python将自动从列表开头开始
print(players[:4])
#要让切片终止于列表末尾,也可使用类似的语法
print(players[2:])
#打印最后三名队员的名字
print(players[-3:])

#列表拷贝
my_foods = ['pizza', 'falafel', 'carrot cake']
friend_foods = my_foods[:] #new空间
#friend_foods = my_foods #仅仅拷贝了索引
my_foods.append('cannoli')
friend_foods.append('ice cream')
print("My favorite foods are:")
print(my_foods)
print("\nMy friend's favorite foods are:")
print(friend_foods)

#不可变的列表被称为元组
dimensions = (200, 50)#元组看起来犹如列表,但使用圆括号而不是方括号来标识
print(dimensions[0])
print(dimensions[1])
#请访问https://python.org/dev/peps/pep-0008/ ,阅读PEP 8格式设置指南