CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_BACKEND = 'django-db'
CELERY_TIMEZONE = 'Europe/London'
INSTALLED_APPS = [
...
'django_celery_results',
...
]
# Celery Results settings
CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'django-cache'
from django.core.mail import send_mail
from celery import shared_task
@shared_task
def send_email_task(email, subject, message):
send_mail(
subject=subject,
message=message,
from_email='your_email@example.com',
recipient_list=[email],
fail_silently=False,
)
from rest_framework.views import APIView
from rest_framework.response import Response
from celery_app.tasks import send_email_task
class SendEmailView(APIView):
def post(self, request):
email = request.data.get('email')
subject = request.data.get('subject')
message = request.data.get('message')
send_email_task.delay(email, subject, message)
return Response({'message': 'Email sent successfully'})
from django.urls import path
from .views import SendEmailView
urlpatterns = [
path('send-email', SendEmailView.as_view(), name='send-email'),
]