📝 Introduction : Ce guide détaillé vous explique comment créer, configurer et exécuter plusieurs instances de Celery Beat (Instance A et Instance B) dans un même projet Django. Nous verrons comment les synchroniser via Redis/RabbitMQ et éviter les conflits d'exécution.
# requirements.txt Django==4.2.7 celery==5.3.4 django-celery-beat==2.5.0 redis==5.0.1 kombu==5.3.4 pika==1.3.2 python-decouple==3.8 psycopg2-binary==2.9.9 supervisor==4.2.5
# Installation des dépendances pip install -r requirements.txt# Installation Redis sudo apt update sudo apt install redis-server sudo systemctl start redis-server sudo systemctl enable redis-server# Installation RabbitMQ sudo apt install rabbitmq-server sudo systemctl start rabbitmq-server sudo systemctl enable rabbitmq-server sudo rabbitmq-plugins enable rabbitmq_management
my_django_project/ ├── manage.py ├── requirements.txt ├── .env ├── my_project/ │ ├── __init__.py │ ├── settings/ │ │ ├── __init__.py │ │ ├── base.py │ │ ├── redis.py │ │ ├── rabbitmq.py │ │ └── production.py │ ├── urls.py │ └── wsgi.py ├── apps/ │ ├── __init__.py │ ├── celery_app/ │ │ ├── __init__.py │ │ ├── celery.py │ │ ├── models.py │ │ ├── tasks.py │ │ ├── admin.py │ │ ├── redis_sync.py │ │ ├── rabbitmq_sync.py │ │ └── management/ │ │ └── commands/ │ │ ├── __init__.py │ │ ├── start_beat_instance.py │ │ ├── manage_instances.py │ │ └── test_sync.py │ └── notifications/ │ ├── models.py │ └── tasks.py ├── static/ ├── media/ ├── logs/ ├── scripts/ │ ├── start_instance_a.sh │ ├── start_instance_b.sh │ └── monitor_instances.sh └── docker-compose.yml
# apps/celery_app/models.py from django.db import models from django.utils import timezone import uuid import jsonclass BeatInstance(models.Model): """Modèle pour gérer les instances Beat""" INSTANCE_TYPES = [ ('instance_a', 'Instance A - Haute Priorité'), ('instance_b', 'Instance B - Basse Priorité'), ] STATUS_CHOICES = [ ('active', 'Active'), ('inactive', 'Inactive'), ('error', 'Erreur'), ] id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) name = models.CharField(max_length=100, unique=True) instance_type = models.CharField(max_length=20, choices=INSTANCE_TYPES) queue_name = models.CharField(max_length=50) status = models.CharField(max_length=10, choices=STATUS_CHOICES, default='active') broker_type = models.CharField(max_length=10, choices=[('redis', 'Redis'), ('rabbitmq', 'RabbitMQ')], default='redis') last_heartbeat = models.DateTimeField(default=timezone.now) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) config_data = models.JSONField(default=dict, blank=True) class Meta: verbose_name = "Instance Beat" verbose_name_plural = "Instances Beat" ordering = ['-last_heartbeat'] def __str__(self): return f"{self.name} ({self.instance_type}) - {self.broker_type}" def is_online(self): """Vérifie si l'instance est en ligne""" time_diff = timezone.now() - self.last_heartbeat return time_diff.total_seconds() < 300 # 5 minutes def get_config(self): """Retourne la configuration de l'instance""" return self.config_data def set_config(self, config_dict): """Définit la configuration de l'instance""" self.config_data = config_dict self.save(update_fields=['config_data'])class BeatTaskLock(models.Model): """Modèle pour gérer les verrous des tâches""" task_name = models.CharField(max_length=255) instance = models.ForeignKey(BeatInstance, on_delete=models.CASCADE) lock_key = models.CharField(max_length=100, unique=True) broker_type = models.CharField(max_length=10, choices=[('redis', 'Redis'), ('rabbitmq', 'RabbitMQ')]) acquired_at = models.DateTimeField(auto_now_add=True) expires_at = models.DateTimeField() is_active = models.BooleanField(default=True) class Meta: verbose_name = "Verrou de Tâche Beat" verbose_name_plural = "Verrous de Tâches Beat" indexes = [ models.Index(fields=['lock_key', 'expires_at']), models.Index(fields=['instance', 'is_active']), models.Index(fields=['broker_type']), ] def __str__(self): return f"{self.task_name} - {self.instance.name} ({self.broker_type})" def is_expired(self): """Vérifie si le verrou est expiré""" return timezone.now() > self.expires_atclass BeatTaskExecution(models.Model): """Modèle pour tracer l'exécution des tâches""" task_name = models.CharField(max_length=255) instance = models.ForeignKey(BeatInstance, on_delete=models.CASCADE) started_at = models.DateTimeField(auto_now_add=True) completed_at = models.DateTimeField(null=True, blank=True) status = models.CharField( max_length=20, choices=[ ('running', 'En cours'), ('completed', 'Terminé'), ('failed', 'Échoué'), ('skipped', 'Ignoré'), ], default='running' ) result = models.TextField(blank=True) error_message = models.TextField(blank=True) execution_time = models.FloatField(null=True, blank=True) broker_type = models.CharField(max_length=10, choices=[('redis', 'Redis'), ('rabbitmq', 'RabbitMQ')]) class Meta: verbose_name = "Exécution de Tâche Beat" verbose_name_plural = "Exécutions de Tâches Beat" ordering = ['-started_at'] def __str__(self): return f"{self.task_name} - {self.instance.name} ({self.broker_type})" def mark_completed(self, result=None): """Marque la tâche comme terminée""" self.status = 'completed' self.completed_at = timezone.now() if result: self.result = str(result) if self.started_at: execution_time = (self.completed_at - self.started_at).total_seconds() self.execution_time = execution_time self.save() def mark_failed(self, error_message=None): """Marque la tâche comme échouée""" self.status = 'failed' self.completed_at = timezone.now() if error_message: self.error_message = error_message self.save()
# my_project/settings/redis.py from decouple import config import os# Configuration Redis REDIS_HOST = config('REDIS_HOST', default='localhost') REDIS_PORT = config('REDIS_PORT', default=6379) REDIS_PASSWORD = config('REDIS_PASSWORD', default='') REDIS_DB = config('REDIS_DB', default=0)# URLs Redis REDIS_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}" if REDIS_PASSWORD: REDIS_URL = f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"# Configuration Celery avec Redis CELERY_BROKER_URL = REDIS_URL CELERY_RESULT_BACKEND = REDIS_URL# Configuration Redis pour les instances Beat REDIS_BEAT_CONFIG = { 'instance_a': { 'database': 1, # DB séparée pour Instance A 'key_prefix': 'beat_a:', 'lock_timeout': 300, # 5 minutes 'heartbeat_interval': 30, # 30 secondes }, 'instance_b': { 'database': 2, # DB séparée pour Instance B 'key_prefix': 'beat_b:', 'lock_timeout': 600, # 10 minutes 'heartbeat_interval': 60, # 1 minute }, }# Configuration des queues Redis CELERY_TASK_ROUTES = { 'apps.celery_app.tasks.health_check': {'queue': 'instance_a_queue'}, 'apps.celery_app.tasks.system_monitor': {'queue': 'instance_a_queue'}, 'apps.celery_app.tasks.critical_alerts': {'queue': 'instance_a_queue'}, 'apps.celery_app.tasks.daily_cleanup': {'queue': 'instance_b_queue'}, 'apps.celery_app.tasks.weekly_reports': {'queue': 'instance_b_queue'}, 'apps.celery_app.tasks.data_export': {'queue': 'instance_b_queue'}, }# Configuration des queues from kombu import Queue CELERY_TASK_QUEUES = ( Queue('instance_a_queue', routing_key='instance_a'), Queue('instance_b_queue', routing_key='instance_b'), Queue('default', routing_key='default'), )# Configuration Beat avec Redis CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler' CELERY_BEAT_SYNC_EVERY = 1# Configuration Redis pour la persistance REDIS_BEAT_SCHEDULE = { 'instance_a': { 'health-check-instance-a': { 'task': 'apps.celery_app.tasks.health_check', 'schedule': 30.0, 'options': {'queue': 'instance_a_queue'} }, 'system-monitor-instance-a': { 'task': 'apps.celery_app.tasks.system_monitor', 'schedule': 60.0, 'options': {'queue': 'instance_a_queue'} }, 'critical-alerts-instance-a': { 'task': 'apps.celery_app.tasks.critical_alerts', 'schedule': 120.0, 'options': {'queue': 'instance_a_queue'} }, }, 'instance_b': { 'daily-cleanup-instance-b': { 'task': 'apps.celery_app.tasks.daily_cleanup', 'schedule': crontab(hour=2, minute=0), 'options': {'queue': 'instance_b_queue'} }, 'weekly-reports-instance-b': { 'task': 'apps.celery_app.tasks.weekly_reports', 'schedule': crontab(hour=6, minute=0, day_of_week=1), 'options': {'queue': 'instance_b_queue'} }, 'data-export-instance-b': { 'task': 'apps.celery_app.tasks.data_export', 'schedule': crontab(hour=4, minute=0, day_of_week=0), 'options': {'queue': 'instance_b_queue'} }, }, }
# apps/celery_app/redis_sync.py import redis import json import time from django.conf import settings from django.utils import timezone from .models import BeatInstance, BeatTaskLock import logginglogger = logging.getLogger(__name__)class RedisBeatSyncService: """Service de synchronisation Beat avec Redis""" def __init__(self, instance_name, instance_type): self.instance_name = instance_name self.instance_type = instance_type self.config = settings.REDIS_BEAT_CONFIG.get(instance_type, {}) # Configuration Redis spécifique à l'instance db = self.config.get('database', 0) self.redis_url = f"{settings.REDIS_URL.split('/')[0]}/{db}" self.redis_client = redis.from_url(self.redis_url) self.key_prefix = self.config.get('key_prefix', 'beat:') self.lock_timeout = self.config.get('lock_timeout', 300) self.heartbeat_interval = self.config.get('heartbeat_interval', 60) def register_instance(self): """Enregistre l'instance Beat dans Redis""" instance_data = { 'name': self.instance_name, 'type': self.instance_type, 'broker': 'redis', 'registered_at': timezone.now().isoformat(), 'last_heartbeat': timezone.now().isoformat(), 'status': 'active', 'config': self.config } # Sauvegarder dans Redis avec expiration redis_key = f"{self.key_prefix}instance" self.redis_client.setex( redis_key, self.lock_timeout, json.dumps(instance_data) ) # Ajouter à la liste des instances actives self.redis_client.sadd(f"{self.key_prefix}active_instances", self.instance_name) # Créer ou mettre à jour dans Django instance, created = BeatInstance.objects.get_or_create( name=self.instance_name, defaults={ 'instance_type': self.instance_type, 'queue_name': f"{self.instance_type}_queue", 'broker_type': 'redis', 'config_data': self.config } ) if not created: instance.last_heartbeat = timezone.now() instance.broker_type = 'redis' instance.save() logger.info(f"Instance Redis {self.instance_name} enregistrée") def update_heartbeat(self): """Met à jour le heartbeat de l'instance""" redis_key = f"{self.key_prefix}instance" instance_data = self.redis_client.get(redis_key) if instance_data: data = json.loads(instance_data) data['last_heartbeat'] = timezone.now().isoformat() # Renouveler avec expiration self.redis_client.setex(redis_key, self.lock_timeout, json.dumps(data)) # Mettre à jour dans Django try: instance = BeatInstance.objects.get(name=self.instance_name) instance.last_heartbeat = timezone.now() instance.save(update_fields=['last_heartbeat']) except BeatInstance.DoesNotExist: self.register_instance() def acquire_task_lock(self, task_name, timeout=None): """Acquiert un verrou pour une tâche""" if timeout is None: timeout = self.lock_timeout lock_key = f"{self.key_prefix}lock:{task_name}" lock_value = json.dumps({ 'instance': self.instance_name, 'type': self.instance_type, 'broker': 'redis', 'acquired_at': timezone.now().isoformat(), 'timeout': timeout }) # Essayer d'acquérir le verrou avec SET NX EX result = self.redis_client.set( lock_key, lock_value, nx=True, # Ne définir que si la clé n'existe pas ex=timeout # Expiration en secondes ) if result: # Enregistrer le verrou dans Django try: instance = BeatInstance.objects.get(name=self.instance_name) BeatTaskLock.objects.create( task_name=task_name, instance=instance, lock_key=lock_key, broker_type='redis', expires_at=timezone.now() + timezone.timedelta(seconds=timeout) ) except BeatInstance.DoesNotExist: pass logger.info(f"Verrou Redis acquis par {self.instance_name} pour {task_name}") return True else: logger.info(f"Verrou Redis déjà tenu pour {task_name}") return False def release_task_lock(self, task_name): """Libère un verrou pour une tâche""" lock_key = f"{self.key_prefix}lock:{task_name}" # Script Lua pour libérer seulement si c'est notre verrou lua_script = """ if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end """ lock_value = json.dumps({ 'instance': self.instance_name, 'type': self.instance_type, 'broker': 'redis', 'acquired_at': timezone.now().isoformat(), 'timeout': self.lock_timeout }) result = self.redis_client.eval(lua_script, 1, lock_key, lock_value) if result: # Supprimer le verrou de Django BeatTaskLock.objects.filter( lock_key=lock_key, instance__name=self.instance_name ).delete() logger.info(f"Verrou Redis libéré par {self.instance_name} pour {task_name}") return True else: logger.warning(f"Impossible de libérer le verrou Redis {task_name}") return False def get_active_instances(self): """Récupère la liste des instances actives""" instances = [] active_instance_names = self.redis_client.smembers(f"{self.key_prefix}active_instances") for instance_name in active_instance_names: instance_key = f"{self.key_prefix}instance" instance_data = self.redis_client.get(instance_key) if instance_data: instances.append(json.loads(instance_data)) return instances def get_task_locks(self): """Récupère tous les verrous de tâches actifs""" locks = [] lock_pattern = f"{self.key_prefix}lock:*" for lock_key in self.redis_client.scan_iter(match=lock_pattern): lock_data = self.redis_client.get(lock_key) if lock_data: lock_info = json.loads(lock_data) lock_info['lock_key'] = lock_key.decode('utf-8') locks.append(lock_info) return locks def cleanup_expired_locks(self): """Nettoie les verrous expirés""" lock_pattern = f"{self.key_prefix}lock:*" cleaned_count = 0 for lock_key in self.redis_client.scan_iter(match=lock_pattern): ttl = self.redis_client.ttl(lock_key) if ttl == -1: # Pas d'expiration définie self.redis_client.delete(lock_key) cleaned_count += 1 elif ttl == -2: # Clé expirée cleaned_count += 1 logger.info(f"Nettoyage Redis: {cleaned_count} verrous expirés supprimés") return cleaned_count def get_instance_stats(self): """Récupère les statistiques de l'instance""" stats = { 'instance_name': self.instance_name, 'instance_type': self.instance_type, 'broker': 'redis', 'active_locks': len(self.get_task_locks()), 'config': self.config, 'redis_info': self.redis_client.info(), } return stats
# apps/celery_app/redis_sync.py import redis import json import time from django.conf import settings from django.utils import timezone from .models import BeatInstance, BeatTaskLock import logginglogger = logging.getLogger(__name__)class RedisBeatSyncService: """Service de synchronisation Beat avec Redis""" def __init__(self, instance_name, instance_type): self.instance_name = instance_name self.instance_type = instance_type self.config = settings.REDIS_BEAT_CONFIG.get(instance_type, {}) # Configuration Redis spécifique à l'instance db = self.config.get('database', 0) self.redis_url = f"{settings.REDIS_URL.split('/')[0]}/{db}" self.redis_client = redis.from_url(self.redis_url) self.key_prefix = self.config.get('key_prefix', 'beat:') self.lock_timeout = self.config.get('lock_timeout', 300) self.heartbeat_interval = self.config.get('heartbeat_interval', 60) def register_instance(self): """Enregistre l'instance Beat dans Redis""" instance_data = { 'name': self.instance_name, 'type': self.instance_type, 'broker': 'redis', 'registered_at': timezone.now().isoformat(), 'last_heartbeat': timezone.now().isoformat(), 'status': 'active', 'config': self.config } # Sauvegarder dans Redis avec expiration redis_key = f"{self.key_prefix}instance" self.redis_client.setex( redis_key, self.lock_timeout, json.dumps(instance_data) ) # Ajouter à la liste des instances actives self.redis_client.sadd(f"{self.key_prefix}active_instances", self.instance_name) # Créer ou mettre à jour dans Django instance, created = BeatInstance.objects.get_or_create( name=self.instance_name, defaults={ 'instance_type': self.instance_type, 'queue_name': f"{self.instance_type}_queue", 'broker_type': 'redis', 'config_data': self.config } ) if not created: instance.last_heartbeat = timezone.now() instance.broker_type = 'redis' instance.save() logger.info(f"Instance Redis {self.instance_name} enregistrée") def update_heartbeat(self): """Met à jour le heartbeat de l'instance""" redis_key = f"{self.key_prefix}instance" instance_data = self.redis_client.get(redis_key) if instance_data: data = json.loads(instance_data) data['last_heartbeat'] = timezone.now().isoformat() # Renouveler avec expiration self.redis_client.setex(redis_key, self.lock_timeout, json.dumps(data)) # Mettre à jour dans Django try: instance = BeatInstance.objects.get(name=self.instance_name) instance.last_heartbeat = timezone.now() instance.save(update_fields=['last_heartbeat']) except BeatInstance.DoesNotExist: self.register_instance() def acquire_task_lock(self, task_name, timeout=None): """Acquiert un verrou pour une tâche""" if timeout is
Découvrez nos actualités, conseils et tutoriels pour rester à jour
Oublier ou devoir réinitialiser le mot d...
_set est associé à une relation i...
Concept vraiment simple aujourd'hui les gars! U...
Découvre Capacitor, le dernier-né...
Inscription en cours...
Restez informé de nos dernières actualités et recevez nos meilleurs conseils !
Chargement du formulaire de devis...