웹 애플리케이션을 개발하다 보면 이메일 발송, 대용량 파일 처리, 외부 API 호출과 같이 시간이 오래 걸리는 작업들을 처리해야 하는 상황이 자주 발생합니다. 이러한 작업들을 동기적으로 처리하게 되면 사용자는 작업이 완료될 때까지 기다려야 하며, 이는 사용자 경험을 저하시키는 요인이 됩니다.
지난 3년간 다양한 규모의 Django 프로젝트를 진행하면서 비동기 작업 처리의 중요성을 절실히 체감했고, 그 과정에서 Celery를 활용한 효율적인 비동기 작업 처리 방법을 터득하게 되었습니다. 이번 글에서는 제가 실무에서 직접 경험하고 적용한 Django와 Celery를 활용한 비동기 작업 처리 방법을 공유하고자 합니다.
Django와 Celery: 최적의 비동기 작업 처리 조합
Django는 강력한 웹 프레임워크이지만, 기본적으로 동기적인 방식으로 작동합니다. 따라서 시간이 오래 걸리는 작업이 있을 경우, 해당 작업이 완료될 때까지 다른 요청을 처리하지 못하는 문제가 발생합니다. 이를 해결하기 위해 Celery라는 분산 태스크 큐 시스템을 활용할 수 있습니다.
Celery는 메시지 브로커를 통해 작업을 큐에 넣고, 워커 프로세스가 이 작업을 비동기적으로 처리하는 방식으로 동작합니다. 이를 통해 시간이 오래 걸리는 작업을 백그라운드에서 처리하고, 웹 서버는 계속해서 다른 요청을 처리할 수 있게 됩니다.
프로젝트 환경 구성하기
Django와 Celery를 활용한 비동기 작업 처리 시스템을 구축하기 위해 필요한 기본 환경을 설정해 보겠습니다. 먼저 필요한 패키지들을 설치합니다.
pip install django celery redis
이 예제에서는 메시지 브로커로 Redis를 사용하겠습니다. Redis는 메모리 기반의 데이터 저장소로, 메시지 브로커 역할을 효율적으로 수행할 수 있습니다.
Django 프로젝트 설정
Django 프로젝트에 Celery를 통합하기 위해 필요한 설정을 진행하겠습니다. 먼저 프로젝트 내에 celery.py 파일을 생성합니다.
# myproject/celery.py
import os
from celery import Celery
# Django 설정 모듈을 기본값으로 설정
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
# 문자열로 등록된 이유는 Celery Worker가 Windows에서도 작동하게 하기 위함
app.config_from_object('django.conf:settings', namespace='CELERY')
# 등록된 앱에서 task 자동 탐색
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
다음으로, Django 프로젝트의 __init__.py 파일에 Celery 앱을 import하여 Django가 시작될 때 Celery 앱도 함께 로드되도록 설정합니다.
# myproject/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)
마지막으로, Django 설정 파일(settings.py)에 Celery 관련 설정을 추가합니다.
# myproject/settings.py
# Celery 설정
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Seoul'
실무에서 자주 발생하는 비동기 처리 시나리오
이론적인 내용은 여기까지 하고, 이제 실무에서 자주 발생하는 비동기 처리 시나리오와 그 해결 방법에 대해 알아보겠습니다.
1. 대량 이메일 발송
웹 애플리케이션에서 대량의 이메일을 발송해야 하는 경우가 많습니다. 예를 들어, 뉴스레터 발송이나 사용자 알림 등이 이에 해당합니다. 이메일 발송 작업을 동기적으로 처리하면 모든 이메일이 발송될 때까지 사용자는 기다려야 하며, 이는 사용자 경험을 저하시킵니다.
Celery를 활용하면 이메일 발송 작업을 비동기적으로 처리할 수 있습니다. 먼저, 앱 내에 tasks.py 파일을 생성하고 이메일 발송 작업을 정의합니다.
# myapp/tasks.py
from celery import shared_task
from django.core.mail import send_mail
@shared_task
def send_email_task(subject, message, from_email, recipient_list):
send_mail(
subject,
message,
from_email,
recipient_list,
fail_silently=False,
)
return f"이메일이 성공적으로 발송되었습니다: {recipient_list}"
이제 뷰에서 이 작업을 호출하여 비동기적으로 이메일을 발송할 수 있습니다.
# myapp/views.py
from django.shortcuts import render
from django.http import JsonResponse
from .tasks import send_email_task
def newsletter_send_view(request):
if request.method == 'POST':
subject = request.POST.get('subject')
message = request.POST.get('message')
from_email = 'noreply@example.com'
# 구독자 목록 가져오기 (예시)
subscribers = ['user1@example.com', 'user2@example.com', ...]
# 비동기 작업 실행
for email in subscribers:
send_email_task.delay(subject, message, from_email, [email])
return JsonResponse({'status': 'success', 'message': '뉴스레터 발송이 시작되었습니다.'})
return render(request, 'newsletter_form.html')
여기서 delay()
메서드는 Celery에게 작업을 비동기적으로 실행하도록 지시합니다. 이 방식을 사용하면 뉴스레터 발송 요청을 받자마자 사용자에게 응답을 보내고, 실제 이메일 발송은 백그라운드에서 진행됩니다.
2. 대용량 파일 처리
대용량 파일을 처리해야 하는 경우도 비동기 처리가 필요한 대표적인 시나리오입니다. 예를 들어, 사용자가 업로드한 CSV 파일을 파싱하여 데이터베이스에 저장하는 작업이나, 이미지 리사이징 작업 등이 이에 해당합니다.
다음은 CSV 파일을 파싱하여 데이터베이스에 저장하는 비동기 작업의 예시입니다.
# myapp/tasks.py
import csv
from celery import shared_task
from .models import Product
@shared_task
def process_csv_file(file_path):
with open(file_path, 'r') as file:
reader = csv.DictReader(file)
for row in reader:
Product.objects.create(
name=row['name'],
price=float(row['price']),
description=row['description']
)
return f"{file_path} 파일 처리가 완료되었습니다."
이 작업을 뷰에서 호출하는 방법은 다음과 같습니다.
# myapp/views.py
import os
from django.shortcuts import render
from django.http import JsonResponse
from django.conf import settings
from .tasks import process_csv_file
def upload_csv_view(request):
if request.method == 'POST' and request.FILES.get('csv_file'):
csv_file = request.FILES['csv_file']
# 파일 저장
file_path = os.path.join(settings.MEDIA_ROOT, csv_file.name)
with open(file_path, 'wb+') as destination:
for chunk in csv_file.chunks():
destination.write(chunk)
# 비동기 작업 실행
process_csv_file.delay(file_path)
return JsonResponse({'status': 'success', 'message': '파일 처리가 시작되었습니다.'})
return render(request, 'upload_form.html')
3. 외부 API 호출
외부 API를 호출하는 작업도 비동기 처리가 필요한 경우가 많습니다. 특히 여러 API를 순차적으로 호출해야 하거나, API 응답 시간이 느린 경우에는 더욱 그렇습니다.
다음은 외부 결제 API를 호출하고 결과를 처리하는 비동기 작업의 예시입니다.
# myapp/tasks.py
import requests
from celery import shared_task
from .models import Payment
@shared_task
def process_payment(payment_id, amount, user_id):
# 결제 정보 가져오기
payment = Payment.objects.get(id=payment_id)
# 외부 API 호출
try:
response = requests.post(
'https://api.payment-gateway.com/v1/charge',
json={
'amount': amount,
'currency': 'KRW',
'user_id': user_id,
'description': f'Payment #{payment_id}'
},
headers={'Authorization': 'Bearer your-api-key'}
)
response_data = response.json()
# 결제 결과 업데이트
payment.transaction_id = response_data.get('transaction_id')
payment.status = response_data.get('status')
payment.save()
return f"결제 {payment_id}가 성공적으로 처리되었습니다."
except Exception as e:
# 오류 처리
payment.status = 'failed'
payment.error_message = str(e)
payment.save()
return f"결제 {payment_id} 처리 중 오류가 발생했습니다: {str(e)}"
이 작업을 뷰에서 호출하는 방법은 다음과 같습니다.
# myapp/views.py
from django.shortcuts import render
from django.http import JsonResponse
from .models import Payment
from .tasks import process_payment
def payment_view(request):
if request.method == 'POST':
amount = float(request.POST.get('amount'))
user_id = request.user.id
# 결제 정보 생성
payment = Payment.objects.create(
user_id=user_id,
amount=amount,
status='pending'
)
# 비동기 작업 실행
process_payment.delay(payment.id, amount, user_id)
return JsonResponse({
'status': 'success',
'message': '결제가 진행 중입니다.',
'payment_id': payment.id
})
return render(request, 'payment_form.html')
Celery 워커 실행 및 모니터링
Celery 작업을 실행하기 위해서는 Celery 워커를 실행해야 합니다. 워커는 큐에서 작업을 가져와 실행하는 역할을 합니다.
celery -A myproject worker -l info
이 명령어를 실행하면 Celery 워커가 시작되고, 큐에 들어오는 작업을 처리하기 시작합니다.
또한, Celery 작업을 모니터링하기 위해 Flower라는 도구를 사용할 수 있습니다. Flower는 웹 기반의 Celery 모니터링 도구로, 작업의 상태를 실시간으로 확인할 수 있습니다.
pip install flower
celery -A myproject flower --port=5555
이 명령어를 실행하면 http://localhost:5555에서 Flower 대시보드에 접속할 수 있습니다.
실무에서 겪은 문제와 해결 방법
지금까지 Django와 Celery를 활용한 비동기 작업 처리 방법에 대해 알아보았습니다. 이제 실무에서 자주 겪는 문제와 그 해결 방법에 대해 공유하겠습니다.
1. 작업 실패 처리
비동기 작업이 실패할 경우, 이를 적절히 처리하는 것이 중요합니다. Celery에서는 작업 실패 시 자동으로 재시도하도록 설정할 수 있습니다.
# myapp/tasks.py
from celery import shared_task
from celery.exceptions import MaxRetriesExceededError
@shared_task(bind=True, max_retries=3, retry_backoff=True)
def risky_task(self, arg1, arg2):
try:
# 위험한 작업 수행
result = some_risky_operation(arg1, arg2)
return result
except Exception as exc:
try:
# 작업 재시도
self.retry(exc=exc, countdown=60) # 1분 후 재시도
except MaxRetriesExceededError:
# 최대 재시도 횟수 초과
# 실패 처리 로직
return f"작업이 실패했습니다: {str(exc)}"
여기서 bind=True
는 작업 인스턴스 자체를 첫 번째 인수로 받도록 설정하는 옵션이며, max_retries=3
은 최대 재시도 횟수를, retry_backoff=True
는 지수적으로 증가하는 재시도 간격을 설정합니다.
2. 작업 진행 상황 추적
오래 걸리는 작업의 경우, 사용자에게 작업 진행 상황을 알려주는 것이 좋습니다. Celery에서는 update_state
메서드를 사용하여 작업 진행 상황을 업데이트할 수 있습니다.
# myapp/tasks.py
from celery import shared_task
@shared_task(bind=True)
def long_running_task(self, total_items):
for i in range(total_items):
# 작업 수행
process_item(i)
# 진행 상황 업데이트
progress = (i + 1) / total_items * 100
self.update_state(state='PROGRESS', meta={'progress': progress})
return {'status': 'COMPLETE'}
사용자에게 진행 상황을 보여주기 위해, 작업 ID를 사용하여 작업 상태를 확인하는 엔드포인트를 만들 수 있습니다.
# myapp/views.py
from django.http import JsonResponse
from celery.result import AsyncResult
def get_task_status(request, task_id):
task = AsyncResult(task_id)
if task.state == 'PENDING':
response = {
'state': task.state,
'status': '작업이 대기 중입니다.'
}
elif task.state == 'PROGRESS':
response = {
'state': task.state,
'status': '작업이 진행 중입니다.',
'progress': task.info.get('progress', 0)
}
elif task.state == 'SUCCESS':
response = {
'state': task.state,
'status': '작업이 완료되었습니다.',
'result': task.result
}
else:
response = {
'state': task.state,
'status': '작업이 실패했습니다.',
'error': str(task.result)
}
return JsonResponse(response)
3. 정기적인 작업 예약
일부 작업은 정기적으로 실행해야 할 필요가 있습니다. 예를 들어, 매일 밤 데이터 백업이나 주간 리포트 생성 등이 이에 해당합니다. Celery Beat는 이러한 정기적인 작업을 예약하는 기능을 제공합니다.
먼저, Django 설정 파일에 Celery Beat 설정을 추가합니다.
# myproject/settings.py
# Celery Beat 설정
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'daily-backup': {
'task': 'myapp.tasks.daily_backup',
'schedule': crontab(hour=2, minute=0), # 매일 오전 2시에 실행
},
'weekly-report': {
'task': 'myapp.tasks.generate_weekly_report',
'schedule': crontab(day_of_week=1, hour=6, minute=0), # 매주 월요일 오전 6시에 실행
},
}
그리고 해당 작업을 정의합니다.
# myapp/tasks.py
from celery import shared_task
@shared_task
def daily_backup():
# 백업 로직
return "일일 백업이 완료되었습니다."
@shared_task
def generate_weekly_report():
# 주간 리포트 생성 로직
return "주간 리포트가 생성되었습니다."
Celery Beat를 실행하려면 다음 명령어를 사용합니다.
celery -A myproject beat
마치며
지금까지 Django와 Celery를 활용한 비동기 작업 처리 방법에 대해 알아보았습니다. 비동기 작업 처리는 웹 애플리케이션의 성능과 사용자 경험을 크게 향상시킬 수 있는 중요한 기술입니다.
실무에서는 이메일 발송, 대용량 파일 처리, 외부 API 호출 등 다양한 시나리오에서 비동기 작업 처리가 필요하며, Django와 Celery를 활용하면 이러한 작업을 효율적으로 처리할 수 있습니다.
또한, 작업 실패 처리, 작업 진행 상황 추적, 정기적인 작업 예약 등의 고급 기능을 활용하면 더욱 안정적이고 유연한 비동기 작업 처리 시스템을 구축할 수 있습니다.
이 글이 Django와 Celery를 활용한 비동기 작업 처리 시스템을 구축하는 데 도움이 되기를 바랍니다. 더 자세한 내용은 Celery 공식 문서를 참고하시기 바랍니다.
Comments
Post a Comment