None: timeout = self.lock_timeout lock_key = f"{self.key_prefix}lock:{task_name}" lock_value = json.dumps({ 'instance': self.instance_name, 'type': self.instance_type, 'broker': 'redis', 'acquired_at': timezone.now().isoformat(), 'timeout': timeout }) # Essayer d'acquérir le verrou avec SET NX EX result = self.redis_client.set( lock_key, lock_value, nx=True, # Ne définir que si la clé n'existe pas ex=timeout # Expiration en secondes ) if result: # Enregistrer le verrou dans Django try: instance = BeatInstance.objects.get(name=self.instance_name) BeatTaskLock.objects.create( task_name=task_name, instance=instance, lock_key=lock_key, broker_type='redis', expires_at=timezone.now() + timezone.timedelta(seconds=timeout) ) except BeatInstance.DoesNotExist: pass logger.info(f"Verrou Redis acquis par {self.instance_name} pour {task_name}") return True else: logger.info(f"Verrou Redis déjà tenu pour {task_name}") return False def release_task_lock(self, task_name): """Libère un verrou pour une tâche""" lock_key = f"{self.key_prefix}lock:{task_name}" # Script Lua pour libérer seulement si c'est notre verrou lua_script = """ if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end """ lock_value = json.dumps({ 'instance': self.instance_name, 'type': self.instance_type, 'broker': 'redis', 'acquired_at': timezone.now().isoformat(), 'timeout': self.lock_timeout }) result = self.redis_client.eval(lua_script, 1, lock_key, lock_value) if result: # Supprimer le verrou de Django BeatTaskLock.objects.filter( lock_key=lock_key, instance__name=self.instance_name ).delete() logger.info(f"Verrou Redis libéré par {self.instance_name} pour {task_name}") return True else: logger.warning(f"Impossible de libérer le verrou Redis {task_name}") return False def get_active_instances(self): """Récupère la liste des instances actives""" instances = [] active_instance_names = self.redis_client.smembers(f"{self.key_prefix}active_instances") for instance_name in active_instance_names: instance_key = f"{self.key_prefix}instance" instance_data = self.redis_client.get(instance_key) if instance_data: instances.append(json.loads(instance_data)) return instances def get_task_locks(self): """Récupère tous les verrous de tâches actifs""" locks = [] lock_pattern = f"{self.key_prefix}lock:*" for lock_key in self.redis_client.scan_iter(match=lock_pattern): lock_data = self.redis_client.get(lock_key) if lock_data: lock_info = json.loads(lock_data) lock_info['lock_key'] = lock_key.decode('utf-8') locks.append(lock_info) return locks def cleanup_expired_locks(self): """Nettoie les verrous expirés""" lock_pattern = f"{self.key_prefix}lock:*" cleaned_count = 0 for lock_key in self.redis_client.scan_iter(match=lock_pattern): ttl = self.redis_client.ttl(lock_key) if ttl == -1: # Pas d'expiration définie self.redis_client.delete(lock_key) cleaned_count += 1 elif ttl == -2: # Clé expirée cleaned_count += 1 logger.info(f"Nettoyage Redis: {cleaned_count} verrous expirés supprimés") return cleaned_count def get_instance_stats(self): """Récupère les statistiques de l'instance""" stats = { 'instance_name': self.instance_name, 'instance_type': self.instance_type, 'broker': 'redis', 'active_locks': len(self.get_task_locks()), 'config': self.config, 'redis_info': self.redis_client.info(), } return stats
# my_project/settings/rabbitmq.py from decouple import config import os# Configuration RabbitMQ RABBITMQ_HOST = config('RABBITMQ_HOST', default='localhost') RABBITMQ_PORT = config('RABBITMQ_PORT', default=5672) RABBITMQ_USER = config('RABBITMQ_USER', default='guest') RABBITMQ_PASSWORD = config('RABBITMQ_PASSWORD', default='guest') RABBITMQ_VHOST = config('RABBITMQ_VHOST', default='/')# URLs RabbitMQ RABBITMQ_URL = f"amqp://{RABBITMQ_USER}:{RABBITMQ_PASSWORD}@{RABBITMQ_HOST}:{RABBITMQ_PORT}{RABBITMQ_VHOST}"# Configuration Celery avec RabbitMQ CELERY_BROKER_URL = RABBITMQ_URL CELERY_RESULT_BACKEND = 'rpc://'# Configuration RabbitMQ pour les instances Beat RABBITMQ_BEAT_CONFIG = { 'instance_a': { 'exchange': 'beat_a_exchange', 'queue_prefix': 'beat_a', 'routing_key_prefix': 'beat_a', 'lock_timeout': 300, 'heartbeat_interval': 30, 'durable': True, 'auto_delete': False, }, 'instance_b': { 'exchange': 'beat_b_exchange', 'queue_prefix': 'beat_b', 'routing_key_prefix': 'beat_b', 'lock_timeout': 600, 'heartbeat_interval': 60, 'durable': True, 'auto_delete': False, }, }# Configuration des exchanges et queues RabbitMQ from kombu import Exchange, Queue# Exchanges BEAT_A_EXCHANGE = Exchange('beat_a_exchange', type='direct', durable=True) BEAT_B_EXCHANGE = Exchange('beat_b_exchange', type='direct', durable=True)# Queues CELERY_TASK_QUEUES = ( Queue('instance_a_queue', exchange=BEAT_A_EXCHANGE, routing_key='beat_a.tasks'), Queue('instance_b_queue', exchange=BEAT_B_EXCHANGE, routing_key='beat_b.tasks'), Queue('beat_a_locks', exchange=BEAT_A_EXCHANGE, routing_key='beat_a.locks'), Queue('beat_b_locks', exchange=BEAT_B_EXCHANGE, routing_key='beat_b.locks'), Queue('default', routing_key='default'), )# Configuration Beat avec RabbitMQ CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler' CELERY_BEAT_SYNC_EVERY = 1# Configuration des tâches par instance CELERY_TASK_ROUTES = { 'apps.celery_app.tasks.health_check': { 'queue': 'instance_a_queue', 'routing_key': 'beat_a.tasks.health_check' }, 'apps.celery_app.tasks.system_monitor': { 'queue': 'instance_a_queue', 'routing_key': 'beat_a.tasks.system_monitor' }, 'apps.celery_app.tasks.critical_alerts': { 'queue': 'instance_a_queue', 'routing_key': 'beat_a.tasks.critical_alerts' }, 'apps.celery_app.tasks.daily_cleanup': { 'queue': 'instance_b_queue', 'routing_key': 'beat_b.tasks.daily_cleanup' }, 'apps.celery_app.tasks.weekly_reports': { 'queue': 'instance_b_queue', 'routing_key': 'beat_b.tasks.weekly_reports' }, 'apps.celery_app.tasks.data_export': { 'queue': 'instance_b_queue', 'routing_key': 'beat_b.tasks.data_export' }, }# Configuration RabbitMQ pour la persistance RABBITMQ_BEAT_SCHEDULE = { 'instance_a': { 'health-check-instance-a': { 'task': 'apps.celery_app.tasks.health_check', 'schedule': 30.0, 'options': { 'queue': 'instance_a_queue', 'routing_key': 'beat_a.tasks.health_check' } }, 'system-monitor-instance-a': { 'task': 'apps.celery_app.tasks.system_monitor', 'schedule': 60.0, 'options': { 'queue': 'instance_a_queue', 'routing_key': 'beat_a.tasks.system_monitor' } }, 'critical-alerts-instance-a': { 'task': 'apps.celery_app.tasks.critical_alerts', 'schedule': 120.0, 'options': { 'queue': 'instance_a_queue', 'routing_key': 'beat_a.tasks.critical_alerts' } }, }, 'instance_b': { 'daily-cleanup-instance-b': { 'task': 'apps.celery_app.tasks.daily_cleanup', 'schedule': crontab(hour=2, minute=0), 'options': { 'queue': 'instance_b_queue', 'routing_key': 'beat_b.tasks.daily_cleanup' } }, 'weekly-reports-instance-b': { 'task': 'apps.celery_app.tasks.weekly_reports', 'schedule': crontab(hour=6, minute=0, day_of_week=1), 'options': { 'queue': 'instance_b_queue', 'routing_key': 'beat_b.tasks.weekly_reports' } }, 'data-export-instance-b': { 'task': 'apps.celery_app.tasks.data_export', 'schedule': crontab(hour=4, minute=0, day_of_week=0), 'options': { 'queue': 'instance_b_queue', 'routing_key': 'beat_b.tasks.data_export' } }, }, }
# apps/celery_app/rabbitmq_sync.py import pika import json import time from django.conf import settings from django.utils import timezone from .models import BeatInstance, BeatTaskLock import logginglogger = logging.getLogger(__name__)class RabbitMQBeatSyncService: """Service de synchronisation Beat avec RabbitMQ""" def __init__(self, instance_name, instance_type): self.instance_name = instance_name self.instance_type = instance_type self.config = settings.RABBITMQ_BEAT_CONFIG.get(instance_type, {}) # Configuration RabbitMQ self.connection_params = pika.ConnectionParameters( host=settings.RABBITMQ_HOST, port=settings.RABBITMQ_PORT, virtual_host=settings.RABBITMQ_VHOST, credentials=pika.PlainCredentials( settings.RABBITMQ_USER, settings.RABBITMQ_PASSWORD ) ) self.connection = None self.channel = None self.exchange = self.config.get('exchange', f'beat_{instance_type}_exchange') self.queue_prefix = self.config.get('queue_prefix', f'beat_{instance_type}') self.routing_key_prefix = self.config.get('routing_key_prefix', f'beat_{instance_type}') self.lock_timeout = self.config.get('lock_timeout', 300) self.heartbeat_interval = self.config.get('heartbeat_interval', 60) def connect(self): """Établit la connexion RabbitMQ""" try: self.connection = pika.BlockingConnection(self.connection_params) self.channel = self.connection.channel() # Déclarer l'exchange self.channel.exchange_declare( exchange=self.exchange, exchange_type='direct', durable=True ) logger.info(f"Connexion RabbitMQ établie pour {self.instance_name}") return True except Exception as e: logger.error(f"Erreur de connexion RabbitMQ: {e}") return False def disconnect(self): """Ferme la connexion RabbitMQ""" try: if self.channel: self.channel.close() if self.connection: self.connection.close() logger.info(f"Connexion RabbitMQ fermée pour {self.instance_name}") except Exception as e: logger.error(f"Erreur lors de la fermeture RabbitMQ: {e}") def register_instance(self): """Enregistre l'instance Beat dans RabbitMQ""" if not self.connect(): return False try: instance_data = { 'name': self.instance_name, 'type': self.instance_type, 'broker': 'rabbitmq', 'registered_at': timezone.now().isoformat(), 'last_heartbeat': timezone.now().isoformat(), 'status': 'active', 'config': self.config } # Créer une queue pour cette instance instance_queue = f"{self.queue_prefix}.instance" self.channel.queue_declare( queue=instance_queue, durable=True, auto_delete=False ) # Publier les données de l'instance self.channel.basic_publish( exchange=self.exchange, routing_key=f"{self.routing_key_prefix}.instance", body=json.dumps(instance_data), properties=pika.BasicProperties( delivery_mode=2, # Rendre le message persistant expiration=str(self.lock_timeout * 1000) # TTL en millisecondes ) ) # Créer ou mettre à jour dans Django instance, created = BeatInstance.objects.get_or_create( name=self.instance_name, defaults={ 'instance_type': self.instance_type, 'queue_name': f"{self.instance_type}_queue", 'broker_type': 'rabbitmq', 'config_data': self.config } ) if not created: instance.last_heartbeat = timezone.now() instance.broker_type = 'rabbitmq' instance.save() logger.info(f"Instance RabbitMQ {self.instance_name} enregistrée") return True except Exception as e: logger.error(f"Erreur lors de l'enregistrement RabbitMQ: {e}") return False finally: self.disconnect() def update_heartbeat(self): """Met à jour le heartbeat de l'instance""" if not self.connect(): return False try: instance_data = { 'name': self.instance_name, 'type': self.instance_type, 'broker': 'rabbitmq', 'last_heartbeat': timezone.now().isoformat(), 'status': 'active' } # Publier le heartbeat self.channel.basic_publish( exchange=self.exchange, routing_key=f"{self.routing_key_prefix}.heartbeat", body=json.dumps(instance_data), properties=pika.BasicProperties( delivery_mode=2, expiration=str(self.lock_timeout * 1000) ) ) # Mettre à jour dans Django try: instance = BeatInstance.objects.get(name=self.instance_name) instance.last_heartbeat = timezone.now() instance.save(update_fields=['last_heartbeat']) except BeatInstance.DoesNotExist: self.register_instance() return True except Exception as e: logger.error(f"Erreur lors du heartbeat RabbitMQ: {e}") return False finally: self.disconnect() def acquire_task_lock(self, task_name, timeout=None): """Acquiert un verrou pour une tâche via RabbitMQ""" if timeout is None: timeout = self.lock_timeout if not self.connect(): return False try: lock_data = { 'instance': self.instance_name, 'type': self.instance_type, 'broker': 'rabbitmq', 'acquired_at': timezone.now().isoformat(), 'timeout': timeout } # Créer une queue pour le verrou lock_queue = f"{self.queue_prefix}.lock.{task_name}" self.channel.queue_declare( queue=lock_queue, durable=True, auto_delete=False, arguments={'x-message-ttl': timeout * 1000} # TTL en millisecondes ) # Vérifier si le verrou existe déjà try: method = self.channel.queue_declare( queue=lock_queue, passive=True # Vérifier l'existence sans créer ) # Si on arrive ici, la queue existe déjà (verrou tenu) logger.info(f"Verrou RabbitMQ déjà tenu pour {task_name}") return False except pika.exceptions.AMQPChannelError: # La queue n'existe pas, on peut acquérir le verrou pass # Publier le verrou self.channel.basic_publish( exchange='', # Exchange par défaut routing_key=lock_queue, body=json.dumps(lock_data), properties=pika.BasicProperties( delivery_mode=2, expiration=str(timeout * 1000) ) ) # Enregistrer le verrou dans Django try: instance = BeatInstance.objects.get(name=self.instance_name) BeatTaskLock.objects.create( task_name=task_name, instance=instance, lock_key=lock_queue, broker_type='rabbitmq', expires_at=timezone.now() + timezone.timedelta(seconds=timeout) ) except BeatInstance.DoesNotExist: pass logger.info(f"Verrou RabbitMQ acquis par {self.instance_name} pour {task_name}") return True except Exception as e: logger.error(f"Erreur lors de l'acquisition du verrou RabbitMQ: {e}") return False finally: self.disconnect() def release_task_lock(self, task_name): """Libère un verrou pour une tâche""" if not self.connect(): return False try: lock_queue = f"{self.queue_prefix}.lock.{task_name}" # Supprimer la queue (ce qui libère le verrou) self.channel.queue_delete(queue=lock_queue) # Supprimer le verrou de Django BeatTaskLock.objects.filter( lock_key=lock_queue, instance__name=self.instance_name ).delete() logger.info(f"Verrou RabbitMQ libéré par {self.instance_name} pour {task_name}") return True except Exception as e: logger.error(f"Erreur lors de la libération du verrou RabbitMQ: {e}") return False finally: self.disconnect() def get_instance_stats(self): """Récupère les statistiques de l'instance""" stats = { 'instance_name': self.instance_name, 'instance_type': self.instance_type, 'broker': 'rabbitmq', 'config': self.config, 'exchange': self.exchange, 'queue_prefix': self.queue_prefix, } return stats
# apps/celery_app/rabbitmq_sync.py import pika import json import time from django.conf import settings from django.utils import timezone from .models import BeatInstance, BeatTaskLock import logginglogger = logging.getLogger(__name__)class RabbitMQBeatSyncService: """Service de synchronisation Beat avec RabbitMQ""" def __init__(self, instance_name, instance_type): self.instance_name = instance_name self.instance_type = instance_type self.config = settings.RABBITMQ_BEAT_CONFIG.get(instance_type, {}) # Configuration RabbitMQ self.connection_params = pika.ConnectionParameters( host=settings.RABBITMQ_HOST, port=settings.RABBITMQ_PORT, virtual_host=settings.RABBITMQ_VHOST, credentials=pika.PlainCredentials( settings.RABBITMQ_USER, settings.RABBITMQ_PASSWORD ) ) self.connection = None self.channel = None self.exchange = self.config.get('exchange', f'beat_{instance_type}_exchange') self.queue_prefix = self.config.get('queue_prefix', f'beat_{instance_type}') self.routing_key_prefix = self.config.get('routing_key_prefix', f'beat_{instance_type}') self.lock_timeout = self.config.get('lock_timeout', 300) self.heartbeat_interval = self.config.get('heartbeat_interval', 60) def connect(self): """Établit la connexion RabbitMQ""" try: self.connection = pika.BlockingConnection(self.connection_params) self.channel = self.connection.channel() # Déclarer l'exchange self.channel.exchange_declare( exchange=self.exchange, exchange_type='direct', durable=True ) logger.info(f"Connexion RabbitMQ établie pour {self.instance_name}") return True except Exception as e: logger.error(f"Erreur de connexion RabbitMQ: {e}") return False def disconnect(self): """Ferme la connexion RabbitMQ""" try: if self.channel: self.channel.close() if self.connection: self.connection.close() logger.info(f"Connexion RabbitMQ fermée pour {self.instance_name}") except Exception as e: logger.error(f"Erreur lors de la fermeture RabbitMQ: {e}") def register_instance(self): """Enregistre l'instance Beat dans RabbitMQ""" if not self.connect(): return False try: instance_data = { 'name': self.instance_name, 'type': self.instance_type, 'broker': 'rabbitmq', 'registered_at': timezone.now().isoformat(), 'last_heartbeat': timezone.now().isoformat(), 'status': 'active', 'config': self.config } # Créer une queue pour cette instance instance_queue = f"{self.queue_prefix}.instance" self.channel.queue_declare( queue=instance_queue, durable=True, auto_delete=False ) # Publier les données de l'instance self.channel.basic_publish( exchange=self.exchange, routing_key=f"{self.routing_key_prefix}.instance", body=json.dumps(instance_data), properties=pika.BasicProperties( delivery_mode=2, # Rendre le message persistant expiration=str(self.lock_timeout * 1000) # TTL en millisecondes ) ) # Créer ou mettre à jour dans Django instance, created = BeatInstance.objects.get_or_create( name=self.instance_name, defaults={ 'instance_type': self.instance_type, 'queue_name': f"{self.instance_type}_queue", 'broker_type': 'rabbitmq', 'config_data': self.config } ) if not created: instance.last_heartbeat = timezone.now() instance.broker_type = 'rabbitmq' instance.save() logger.info(f"Instance RabbitMQ {self.instance_name} enregistrée") return True except Exception as e: logger.error(f"Erreur lors de l'enregistrement RabbitMQ:
Découvrez nos actualités, conseils et tutoriels pour rester à jour
Dans ce tutoriel, nous allons implémente...
Découvre Capacitor, le dernier-né...
Ce guide est conçu pour les début...
_set est associé à une relation i...
Inscription en cours...
Restez informé de nos dernières actualités et recevez nos meilleurs conseils !
Chargement du formulaire de devis...