Django中使用Celery的方法步驟
(一)、概述
Celery是一個簡單、靈活和可靠的基于多任務的分布式系統,為運營提供用于維護此系統的工具。專注于實時處理的任務隊列,同時也支持任務的調度。執行單元為任務(task),利用多線程這些任務可以被并發的在單個或多個職程(worker)上運行。
Celery通過消息機制通信,通常通過中間人(broker)來分配和調節客戶端與職程服務器(worker)之間的通信。客戶端發送一條消息,中間人把消息分配給一個職程,最后由職程來負責執行此任務。
Celery可以有多個職程和中間人,這樣提高了高可用性和橫向的擴展能力
Celery由python語言開發,但是該協議可以用任何語言拉力實現,例如:Django中的Celery、node中的node-celery和php中的celery-php
(二)、Django中使用Celery的流程與配置
導入Celery:pip3 install Celery
在 與項目同名的目錄下 創建celery.py文件,特別注意:項目同名的目錄下
復制內容到該文件
修改兩處內容
os.environ.setdefault(’DJANGO_SETTINGS_MODULE’, ’proj.settings’)中的proj改為項目名 app = Celery(’pro’)中的pro改為項目名import osfrom celery import Celery# set the default Django settings module for the ’celery’ program.os.environ.setdefault(’DJANGO_SETTINGS_MODULE’, ’proj.settings’)app = Celery(’pro’)# Using a string here means the worker doesn’t have to serialize# the configuration object to child processes.# - namespace=’CELERY’ means all celery-related configuration keys# should have a `CELERY_` prefix.app.config_from_object(’django.conf:settings’, namespace=’CELERY’)# Load task modules from all registered Django app configs.app.autodiscover_tasks()@app.task(bind=True)def debug_task(self): print(f’Request: {self.request!r}’)
在 與項目同名的目錄下 的__init__.py文件中添加內容
# This will make sure the app is always imported when# Django starts so that shared_task will use this app.from .celery import app as celery_app__all__ = (’celery_app’,)
在settings.py文件中添加配置
CELERY_BROKER_URL:中間人url,可以配置redis或者RabbitMQ CELERY_RESULT_BACKEND:返回結果的存儲地址 CELERY_ACCEPT_CONTENT:接收內容的格式,分為兩種:json和msgpack。msgpack比json格式的數據體積更小,傳輸速度更快。 CELERY_TASK_SERIALIZER:任務載荷的序列化方式-->json CELERY_TIMEZONE CELERY_TASK_TRACK_STARTED:是否開啟任務跟蹤 CELERY_TASK_TIME_LIMIT:任務超時限制# Celery配置CELERY_BROKER_URL = env('CELERY_BROKER_URL')CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND')CELERY_ACCEPT_CONTENT = ['json', 'msgpack']CELERY_TASK_SERIALIZER = 'json'CELERY_TIMEZONE = 'Asia/Shanghai'CELERY_TASK_TRACK_STARTED = TrueCELERY_TASK_TIME_LIMIT = 30 * 60
在app下創建tasks.py文件,創建發送消息功能,任務方法必須添加裝飾器:@shared_task
from rest_framework.response import Responsefrom rest_framework.generics import GenericAPIViewfrom time import sleepfrom celery import shared_taskclass TestView3(GenericAPIView): @classmethod @shared_task def sleep(self, duration): sleep(duration) return Response('成功', status=200)
創建視圖和路由
### views.pyfrom .tasks import TestView3class TestView1(GenericAPIView): def get(self, request): TestView3.sleep(10) return Response('celery實驗成功')test_view_1 = TestView1.as_view()### urls.pyfrom django.urls import pathfrom .views import ( test_view_1)urlpatterns = [ path(’celery/’, test_view_1, name='test1')]
安裝redis并啟動
啟動django項目
使用Celery命令啟動Celery服務,命令:celery -A 項目名 worker -l info,如果如下所示則為啟動成功.
celery@AppledeMacBook-Air.local v5.0.3 (singularity)Darwin-20.1.0-x86_64-i386-64bit 2020-12-05 20:52:17[config].> app: drf_email_project:0x7f84a0c4ad68.> transport: redis://127.0.0.1:6379/1%20.> results: redis://127.0.0.1:6379/2.> concurrency: 4 (prefork).> task events: OFF (enable -E to monitor tasks in this worker)[queues].> celery exchange=celery(direct) key=celery[tasks] . drf_email_project.celery.debug_task . users.tasks.sleep[2020-12-05 20:52:18,166: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1%20[2020-12-05 20:52:18,179: INFO/MainProcess] mingle: searching for neighbors[2020-12-05 20:52:19,212: INFO/MainProcess] mingle: all alone[2020-12-05 20:52:19,248: WARNING/MainProcess] /Users/apple/drf-email/lib/python3.7/site-packages/celery/fixups/django.py:204: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments! leak, never use this setting in production environments!’’’)[2020-12-05 20:52:19,249: INFO/MainProces
到此這篇關于Django中使用Celery的方法步驟的文章就介紹到這了,更多相關Django使用Celery的方法步驟內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!
相關文章:
