Sanic中添加基于Cron表达式的协程定时任务
需求
从页面上触发某一个接口之后,要在sanic里面添加一个计划任务。
例如:从页面上填写一个con表达式,后台根据此cron表达式定时执行任务
'*/1 * * * *' # 这个是每1分钟执行一次的表达式
需要用到的python包有:Sanic,Apscheduler
示例代码
import time
from sanic import Sanic
from sanic.response import json
app = Sanic("App Name")
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
def cron_job():
print('打印当前时间:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
@app.route('/')
async def index(request):
# generate a URL for the endpoint `post_handler`
print("浏览器里打开一下首页")
scheduler = AsyncIOScheduler()
scheduler.add_job(cron_job, CronTrigger.from_crontab('*/1 * * * *'))
scheduler.start()
return json({"hello": "thank you"})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8000)