Retour aux articles
Tech·· 24 min read

Haute disponibilité sur NATS JetStream + taskiq dans Kubernetes

D'un scheduler fragile en replica unique à une architecture HA : cinq bugs silencieux, cinq corrections — et une race condition dans nats.py invisible sans lire le source.

Haute disponibilité sur NATS JetStream + taskiq dans Kubernetes

Le monitoring est l'un de ces rares domaines où « ça marche la plupart du temps » ne suffit pas. Si le scheduler de votre produit tombe 30 secondes pendant un rolling update Kubernetes et que personne ne le remarque, c'est tolérable. Si votre scheduler de monitoring tombe 30 secondes et que personne ne le remarque — vous venez d'introduire exactement le mode de panne que vos utilisateurs vous paient pour détecter. Vous êtes aveugle.

Cet article retrace le chemin parcouru, d'un scheduler de monitoring fragile en replica unique jusqu'à une architecture haute disponibilité sur NATS JetStream et taskiq, le tout tournant sur Kubernetes. Ce n'était pas un sprint planifié. C'était une succession d'incidents et de bugs silencieux, chaque fix révèle le problème suivant. Cinq problèmes, cinq corrections — et un bug dans nats.py qui n'a resurgi qu'en lisant le code source.


Le système en bref

Le backend de monitoring de Perimon comprend trois composants : une Web API (FastAPI), un monitoring worker (consomme les tâches depuis NATS JetStream, exécute les checks HTTP/SSL) et un monitoring scheduler (cadence l'exécution des tâches de monitoring). L'infrastructure de messagerie est NATS JetStream, utilisé à la fois comme queue de tâches (via taskiq-nats) et comme KV store pour la coordination.

perimonSites surveillesHTTP / SSLWeb APIFastAPINATSMonitoring SchedulerWorkers de monitoring N instance(s)KV StoreplanningsJetStreamfile de checks ecrit les plannings Récupére le planning Envoie les taches probes
click to expand

Le scheduler est le cœur battant du système. S'il tombe, les checks s'arrêtent silencieusement — pas d'erreurs, pas de logs, juste rien. Les workers vident leur queue et se taisent. Depuis l'extérieur, Perimon a l'air sain. En interne, chaque site surveillé vient de devenir invisible.

Cela crée une asymétrie dans les modes de défaillance acceptables. Un check dupliqué est gênant mais récupérable. Un check manqué est un faux négatif : un site peut être tombé et Perimon ne l'a pas vu. Pour le monitoring, les faux négatifs sont pires que les doublons. Ce compromis structure toutes les décisions de conception ci-dessous.


Problème 1 — Le worker qui s'arrête d'écouter (silencieusement)

Symptôme

Après un redémarrage du serveur NATS (ou une brève partition réseau), les workers de monitoring semblaient sains. Les readiness probes K8s passaient, les connexions NATS apparaissaient actives, mémoire et CPU normaux. Mais aucune tâche n'était consommée. La profondeur de file augmentait. Aucune erreur dans les logs.

Cause

taskiq-nats utilise un consumer JetStream pull-based : le worker appelle explicitement consumer.fetch() pour récupérer des messages. La boucle listen() de PullBasedJetStreamBroker, la classe de la librairie, ressemble à ceci :

while True:
    try:
        messages = await self.consumer.fetch(
            batch=self.pull_consume_batch,
            timeout=self.pull_consume_timeout,
        )
        for msg in messages:
            yield AckableMessage(data=msg.data, ack=msg.ack)
    except NatsTimeoutError:
        continue  # poll vide, normal

Quand timeout=None (ou une valeur très grande), fetch() bloque indéfiniment jusqu'à ce que des messages arrivent. Après un reconnect NATS, nats.py reconstruit la connexion au niveau transport — mais le contexte JetStream (self.js) et le pull consumer (self.consumer) deviennent obsolètes. Ils avaient été créés pour la connexion précédente. fetch() attend maintenant sur une subscription morte qui ne livrera jamais rien.

Le reconnected_cb se déclenche et loggue quelque chose, mais personne ne l'observe. La boucle listen est bloquée dans await self.consumer.fetch(...) pour toujours. Le worker est vivant ; il ne sort simplement jamais du fetch().

Solution

La solution fut de créer une classe ResilientPullBroker qui étend PullBasedJetStreamBroker avec un _reinit_event: asyncio.Event et réécrit la boucle listen() pour racer deux futures à chaque itération :

Reset de la boucle du consumer

workerSites surveillesHTTP / SSLlisten()consumer.fetch()_reinit_event.wait()(Event trigger par reconnection callback)asyncio.wait()Au premier des deux events completéEffectue les tachesreinit consumer Connection alive Connection reset consumer.fetch() attendsur une connection morte relance reçois les taches
click to expand
async def listen(self):
    while True:
        fetch_task = None
        reinit_task = None
        try:
            fetch_task = asyncio.ensure_future(
                self.consumer.fetch(
                    batch=self.pull_consume_batch,
                    timeout=self.pull_consume_timeout,
                )
            )
            reinit_task = asyncio.ensure_future(self._reinit_event.wait())

            done, pending = await asyncio.wait(
                {fetch_task, reinit_task},
                return_when=asyncio.FIRST_COMPLETED,
            )
            for t in pending:
                t.cancel()
                try:
                    await t
                except (asyncio.CancelledError, Exception):
                    pass

            if reinit_task in done:
                self._reinit_event.clear()
                await self._reinit_consumer()
                continue

            messages = fetch_task.result()
            for msg in messages:
                yield AckableMessage(data=msg.data, ack=msg.ack)

        except NatsTimeoutError:
            continue
        except Exception as e:
            _logger.warning(
                f"Pull consumer fetch error ({type(e).__name__}: {e})"
                " — reinitializing consumer"
            )
            await self._reinit_consumer()
        finally:
            # À l'arrêt (CancelledError), annuler les tâches orphelines
            for t in [fetch_task, reinit_task]:
                if t is not None and not t.done():
                    t.cancel()
                    try:
                        await t
                    except (asyncio.CancelledError, Exception):
                        pass

À chaque itération, fetch() et _reinit_event.wait() s'exécutent désormais en parallèle. Quand reconnected_cb se déclenche (enveloppé dans le constructeur), il active l'event. L'itération en cours annule le fetch() en attente, réinitialise l'event, et appelle _reinit_consumer() :

async def _reinit_consumer(self) -> None:
    _logger.warning("Reconnect détecté — réinitialisation du pull consumer")
    try:
        self.js = self.client.jetstream()
        await self._startup_consumer()
        _logger.info("Pull consumer réinitialisé avec succès")
    except Exception as e:
        _logger.error(f"Échec de la réinitialisation du consumer : {e}")
        os.kill(os.getpid(), signal.SIGTERM)

Si la réinitialisation échoue (NATS encore indisponible par exemple), le processus s'envoie lui-même un SIGTERM. Kubernetes redémarre le pod. L'alternative — boucler silencieusement dans un état cassé — est pire.

Le bloc except Exception dans listen() appelle aussi _reinit_consumer() directement. Cela couvre le cas où fetch() lève une exception autre que NatsTimeoutError avant que reconnected_cb ait eu le temps de se déclencher — une erreur mi-fetch que l'event ne capturerait pas.

Deux détails d'implémentation à noter : _reinit_event est créé dans startup() — pas dans __init__() — car les events asyncio doivent être liés à une boucle événementielle active. Et un flag _shutting_down supprime tous les callbacks du cycle de vie NATS lors d'un broker.shutdown() propre, pour que le closed_cb qui envoie normalement un SIGTERM ne se déclenche pas à l'arrêt intentionnel.


Problème 2 — Le scheduler qui récupère un planning vide

Symptôme

Certains cycles de dispatch se déclenchaient sans aucune tâche — comme si le planning était vide. Pas toutes les tâches manquantes, pas à chaque cycle : le problème était intermittent et se reproduisait le plus fiablement juste après un redémarrage de pod scheduler, quand les premiers dispatches cron manquaient. Au cycle suivant, tout fonctionnait.

Cause

taskiq-nats fournit NATSKeyValueScheduleSource — une source de planning stockée dans NATS KV. Le scheduler appelle get_schedules() à chaque cycle de dispatch. Chaque appel ouvre un kv.watch() avec un sentinel : quand le watcher délivre une entrée None, cela signale « toutes les clés courantes ont été livrées », et la liste est considérée complète. La race condition est présente à chacun de ces appels.

Le problème est une race condition dans l'implémentation de kv.watch() de nats.py. Voici la logique pertinente (simplifiée depuis le code source) :

kv.watch() subscribe → récupère consumer_info → vérifie :
  si num_pending == 0 et received == 0 → déclenche le sentinel immédiatement

num_pending est un compteur côté serveur JetStream des messages en attente de livraison au consumer. received est un compteur côté client des messages déjà traités. La logique du sentinel : « si le serveur dit qu'il n'y a rien en attente et qu'on n'a rien reçu, il n'y a rien à lire. »

Le problème : nats.py appelle asyncio.sleep(0) (un seul yield de boucle événementielle) entre la souscription et l'appel à consumer_info(). Dans ce seul yield, le serveur peut avoir déjà marqué num_pending=0 (tous les messages envoyés) pendant que les messages sont encore dans le buffer TCP / _pending_queue côté client. received=0 est correct — ils n'ont pas encore été traités — mais num_pending=0 est aussi techniquement correct du point de vue du serveur. Le sentinel se déclenche immédiatement. kv.watch() retourne un résultat vide. Zéro planning.

Le fix correct dans nats.py serait :

buffered = watcher._sub.pending_msgs
if cinfo.num_pending == 0 and received == 0 and buffered == 0:  # ajouter buffered
    # déclencher le sentinel seulement si le buffer TCP est aussi vide

Ce fix n'a pas été intégré à la librairie à ce jour (2026-03-24). Le bug nats.py n'a pas été signalé — il a été diagnostiqué en lisant le code source, pas depuis une documentation quelconque.

Solution

La solution fut de créer une classe ResilientNATSScheduleSource. Le contournement est pragmatique : réessayer jusqu'à 3 fois avec un sleep de 10ms entre les tentatives. 10ms est suffisant pour que le buffer TCP se vide dans tous les scénarios réalistes :

class ResilientNATSScheduleSource(NATSKeyValueScheduleSource):
    _resilience_logger = logging.getLogger("Scheduler.Resilience")

    async def get_schedules(self, _retries: int = 3) -> list[ScheduledTask]:
        schedules = await super().get_schedules()
        if schedules:
            return schedules

        if _retries == 0:
            self._resilience_logger.error(
                "get_schedules returned empty after all retries — returning []"
            )
            return []

        self._resilience_logger.warning(
            "get_schedules returned empty — retrying (%d left)", _retries
        )
        await asyncio.sleep(0.01)
        return await self.get_schedules(_retries=_retries - 1)

En pratique, la première relance (après 10ms) réussit dans la quasi-totalité des cas. Le log error en cas d'épuisement des tentatives sert d'indicateur si le bucket n'a réellement aucun planning — ce qui ne devrait jamais arriver réellement puisque les jobs de monitoring sont enregistrés au démarrage de l'application.


Problème 3 — Deux replicas dispatchent la même tâche deux fois

Symptôme

Une fois la résilience du worker réglée, l'objectif suivant était d'assurer la haute disponibilité du scheduler. Un scheduler en replica unique, c'est crash = downtime. Le correctif semble évident : lancer deux replicas. Le problème est tout aussi évident : deux schedulers verront la même tâche cron se déclencher, l'enverront tous les deux au broker, et les utilisateurs se retrouvent avec des résultats en double.

La question devient : comment faire tourner deux instances d'un dispatcher de tâches en garantissant qu'une seule envoie les tâches à chaque slot ?

Cause

Deux processus indépendants voient le même planning cron et dispatchent indépendamment — il n'existe aucune couche de coordination entre les replicas.

Il existe des patterns établis pour ça : verrous distribués (Zookeeper, etcd), advisory locks de base de données, services de cron externes. Perimon utilisant déjà NATS KV pour le stockage de coordination, l'étendre au verrouillage distribué ne rajoutait aucune dépendance d'infrastructure.

Solution

La solution comporte trois couches, chacune adressant un mode de défaillance découvert lors de l'implémentation de la précédente.

Leader election via NATS KV

Le design est un verrou distribué sur NATS KV avec heartbeat TTL par message :

  • Une instance tente kv.create(key, instance_id, msg_ttl=5s) au démarrage.
  • kv.create() est atomique — au plus une instance réussit. Le gagnant est le leader.
  • Le leader renouvelle le verrou toutes les 2 secondes via kv.update(key, ..., last=last_revision, msg_ttl=5s). C'est une opération CAS (compare-and-swap) : elle réussit seulement si last_revision correspond à la séquence KV courante, empêchant les scénarios split-brain où deux instances se croient toutes les deux leaders.
  • Le standby surveille la clé avec kv.watch(key). Quand la clé disparaît (expiration TTL sur crash, ou delete explicite à l'arrêt propre), le standby tente de prendre la main.
  • Fenêtre de failover : le TTL de la clé leader est 5 secondes. Temps maximum entre le crash et l'acquisition du verrou par le nouveau leader : ≤5 secondes (au pire un slot cron manqué).

Election du leader

Au démmarageNats JetStreamscheduler-haExecution normalPod APod BPod APod BLeader ElectionLeader Electionleaderheartbeatstandbywatch Essaie de devenir leader Essaie de devenir leader 1s gagne perd Refresh le kv leader Observe le kv leader
click to expand

Le role du standby

SIGTERM signalExecution normalNats JetStreamscheduler-haLeaderStandbyheartbeatshutdownwatchLeader ElectionStandbyNouveau Leader 1s stop la boucle Si plus de leader gagne perd Observe le kv leader supprime le kv leader kv leader supprimé
click to expand
async def _heartbeat(self) -> None:
    while True:
        await asyncio.sleep(self._heartbeat_interval)  # 2s
        if not self._is_leader or self._kv is None:
            continue
        try:
            self._last_revision = await self._kv.update(
                self._key,
                self._instance_id.encode(),
                last=self._last_revision,
                msg_ttl=self._msg_ttl,
            )
        except KeyWrongLastSequenceError:
            self._logger.warning("heartbeat CAS conflict — passage en standby")
            self._is_leader = False
            self._last_revision = None
        except Exception as exc:
            self._logger.warning(f"heartbeat error: {exc}")

Le CAS sur le heartbeat est intentionnel. Si une autre instance a acquis la clé (split-brain théorique), le kv.update(last=...) du leader échouera avec KeyWrongLastSequenceError parce que la révision ne correspond plus. Le leader passe lui-même en standby plutôt que de se battre. C'est l'issue la plus sûre.

Le comportement non documenté des TTL markers NATS

C'est ici que ça devient subtil — et non documenté.

Quand une clé expire par TTL, NATS ne la supprime pas silencieusement — il délivre un limit marker à tous les watchers, pour qu'ils puissent distinguer « clé expirée » de « pas encore de messages ». Ce marker a Nats-Marker-Reason: MaxAge dans ses headers, mais pas de header KV-Operation (contrairement à un DELETE ou PURGE explicite). Dans nats.py, cela signifie entry.operation is None.

L'implémentation initiale de _watch() vérifiait entry.operation is not None pour détecter la suppression de clé et déclencher le takeover — correct pour les deletes explicites. Mais lors d'une expiration TTL, entry.operation est None. Le standby ne se déclenchait jamais.

Pire : lors d'une expiration TTL, kv.create() échoue. On pourrait s'attendre que si la clé n'existe plus, kv.create() réussisse. Mais le limit marker a un numéro de séquence JetStream (un numéro de révision). kv.create() appelle en interne kv.update(last=0) (aucune révision précédente attendue). Le numéro de séquence du limit marker est non-zéro, donc le CAS échoue avec BadRequestError.

Le chemin correct pour le takeover sur expiration TTL est kv.update(last=entry.revision) — un CAS directement sur le numéro de séquence du limit marker. Exactement un standby gagne ; les autres obtiennent KeyWrongLastSequenceError.

async def _watch(self) -> None:
    while True:
        try:
            watcher = await self._kv.watch(self._key)
            async for entry in watcher:
                if entry is None:
                    continue

                # Deux signaux de suppression distincts :
                # - DEL/PURGE explicite : entry.operation is not None
                # - TTL limit marker : entry.operation is None ET entry.value est vide
                key_is_gone = entry.operation is not None or not entry.value
                if key_is_gone and not self._is_leader:
                    self._logger.info("Clé leader supprimée — tentative d'acquisition du verrou")
                    await self._try_acquire(last_revision=entry.revision)
        except Exception as exc:
            self._logger.warning(f"watcher error: {exc} — redémarrage dans 2s")
            await asyncio.sleep(2)

Et _try_acquire() gère les deux chemins :

async def _try_acquire(self, last_revision: int | None = None) -> None:
    try:
        if last_revision is not None:
            # Chemin expiration TTL : CAS sur le numéro de séquence du limit marker
            self._last_revision = await self._kv.update(
                self._key,
                self._instance_id.encode(),
                last=last_revision,
                msg_ttl=self._msg_ttl,
            )
        else:
            # Chemin démarrage : create propre (pas de clé préexistante)
            self._last_revision = await self._kv.create(
                self._key, self._instance_id.encode(), msg_ttl=self._msg_ttl
            )
        self._is_leader = True
    except (KeyWrongLastSequenceError, BadRequestError):
        # Une autre instance a gagné la race
        self._is_leader = False

Ce comportement n'est pas documenté dans le README de nats.py ni dans l'API doc. Il a été découvert en lisant les internals de JetStream et en observant BadRequestError dans les logs alors que KeyWrongLastSequenceError n'apparaissait jamais.

Déduplication des slots : la couche at-least-once

La leader election garantit qu'une seule instance envoie les tâches. Mais elle a une faille : si le leader crashe après avoir envoyé une tâche mais avant que le nouveau leader acquière le verrou, ce dernier enverra à nouveau la même tâche. C'est le problème « at-least-once ».

La solution repose sur des slot markers : une clé KV de courte durée écrite après chaque envoi, identifiant la paire (tâche, fenêtre temporelle) spécifique. HAScheduleSource encapsule n'importe quelle ScheduleSource avec deux hooks. Sur pre_send, il vérifie d'abord le leadership (le standby lève ScheduledTaskCancelledError immédiatement), puis cherche un slot marker existant pour cette tâche+fenêtre (doublon d'un leader précédent — aussi annulé). Sur post_send, il écrit le marker : kv.create(slot_key, b"1", msg_ttl=...).

schedulerChaque tâcheleader ?slot markerexiste ?skipEnvoie la tacheEcrit le slot marker non oui oui non
click to expand

Le marker est écrit après l'envoi : un crash entre l'envoi et l'écriture cause un doublon rare, mais un crash avant causerait un gap — pire pour le monitoring. La clé de slot encode la tâche et la fenêtre temporelle ; le TTL est calibré pour expirer juste avant le prochain déclenchement. Le verrou leader et tous les slot markers vivent dans un seul bucket (perimon-scheduler-ha), chacun avec son propre msg_ttl indépendant.


Problème 4 — SIGTERM tue le nettoyage de la clé leader

Avec la couche HA en place, il restait un mode de défaillance subtil caché dans le chemin d'arrêt.

Symptôme

Lors d'un rolling update Kubernetes (le cycle de déploiement normal), le pod défaillant recevait un SIGTERM. Comportement attendu : le leader exécute LeaderElection.shutdown(), qui supprime explicitement la clé leader. Le standby voit la suppression de clé, exécute immédiatement _try_acquire(), et prend la main. Failover en millisecondes.

Comportement réel : le pod défaillant était tué immédiatement (« terminated » dans kubectl logs). Aucun log d'arrêt. Aucune suppression de clé. Le standby attendait les 5 secondes complètes d'expiration TTL avant de prendre la main.

5 secondes de downtime du scheduler à chaque déploiement. Petit mais réel.

Cause

Les gestionnaires de signaux asyncio sont enregistrés via loop.add_signal_handler(). Le scheduler runner de taskiq enregistre un handler pour SIGINT — que Python mappe sur KeyboardInterrupt, que la boucle événementielle asyncio convertit en CancelledError sur la tâche principale, que taskiq intercepte pour exécuter sa séquence d'arrêt gracieux. Cette séquence appelle scheduler.shutdown()source.shutdown()leader.shutdown()kv.delete(key).

Mais SIGTERM est un signal différent. L'action par défaut de l'OS pour SIGTERM est la terminaison immédiate du processus. Pas d'exception Python. Pas d'asyncio. Pas de handlers d'arrêt. Le processus est simplement supprimé.

$ kill -s SIGINT $PID
# logs: "Shutting down scheduler."
# logs: "LeaderElection shutdown — leader key deleted"

$ kill -s SIGTERM $PID
# logs: "terminated"
# rien d'autre

Dans Kubernetes, les pods reçoivent toujours SIGTERM (pas SIGINT) lors de la terminaison gracieuse. Le chemin d'arrêt gracieux était contourné à chaque déploiement.

Sans le fixAvec le fixSIGTERMOS tue le processclé leader orphelinestandby attend ≤ 5sSIGTERMmain_task.cancel()CancelledErrorkv.delete(key)standby acquiert < 1s
click to expand

Solution

La solution fut d'ajouter une fonction _register_sigterm_handler() :

def _register_sigterm_handler() -> None:
    """Faire se comporter SIGTERM comme SIGINT : annuler la tâche asyncio principale
    pour que le handler CancelledError de taskiq exécute l'arrêt gracieux du scheduler."""
    loop = asyncio.get_running_loop()
    main_task = asyncio.current_task()
    loop.add_signal_handler(signal.SIGTERM, main_task.cancel)

C'est une ligne unique qui route SIGTERM par le même chemin CancelledError que SIGINT utilise déjà. Le handler d'arrêt existant de taskiq s'exécute identiquement pour les deux signaux. _register_sigterm_handler() est appelé depuis LeaderElection.startup(), donc il est câblé automatiquement quand la couche HA s'initialise — aucun changement nécessaire dans le runner taskiq ou tout autre appelant.

L'impact : sur SIGTERM, le standby acquiert maintenant le leadership en millisecondes (suppression explicite via kv.delete()) plutôt qu'en ≤5 secondes (expiration TTL). Pour un rolling update avec deux replicas, le cycle de déploiement est passé de « 5 secondes de gap scheduler par déploiement » à « failover sub-seconde sans gap ».

C'est le type de bug facile à rater. Le chemin d'arrêt gracieux semble correct à la review de code. La méthode LeaderElection.shutdown() existe, est appelée depuis la source, est testée en local (avec Ctrl+C = SIGINT). Elle ne s'exécute juste jamais dans l'environnement où ça compte.


Problème 5 — La fenêtre de gap lors du rolling update

Symptôme

Même avec une gestion propre de SIGTERM, il restait une fenêtre lors des rolling updates où le cluster n'avait aucun scheduler fonctionnel. Le vieux pod était déjà terminé ; le nouveau pod était encore en train de se connecter à NATS, d'initialiser la leader election, et d'établir son rôle. Pendant cette fenêtre, le dispatch s'arrêtait simplement.

Cause

Kubernetes n'a aucune visibilité sur le fait que le nouveau scheduler est réellement prêt. Sans readiness probe reflétant l'état d'initialisation réel, Kubernetes termine le vieux pod dès que le processus du nouveau démarre — avant que NATS soit connecté ou que la leader election soit terminée. Pour un scheduler de monitoring, même quelques secondes de downtime à chaque déploiement laissent des trous dans les données d'uptime à chaque mise à jour.

Solution

La solution fut de créer une classe K8sScheduler, un wrapper léger autour de TaskiqScheduler qui écrit un fichier de readiness seulement après initialisation complète :

class K8sScheduler(TaskiqScheduler):
    def __init__(self, broker, sources, servers, leader_key):
        leader = LeaderElection(servers=servers, key=leader_key)
        super().__init__(
            broker=broker,
            sources=[HAScheduleSource(s, leader) for s in sources]
        )

    async def startup(self) -> None:
        await super().startup()
        # super().startup() exécute source.startup() pour toutes les sources,
        # qui exécute LeaderElection.startup() — pleinement connecté + rôle établi
        Path("/tmp/ready").touch()

La readiness probe dans le manifest Kubernetes Deployment :

probe:
  readiness:
    exec:
      command: ["test", "-f", "/tmp/ready"]
    initialDelaySeconds: 5
    periodSeconds: 3
    failureThreshold: 10

Avec ce setup, la séquence de rolling update est :

  1. Le nouveau pod démarre, se connecte à NATS, établit son rôle dans la leader election → /tmp/ready écrit
  2. Kubernetes marque le nouveau pod comme Ready
  3. Seulement alors Kubernetes commence à terminer le vieux pod (SIGTERM)
  4. Le vieux pod reçoit SIGTERM → arrêt propre → clé leader supprimée
  5. Le nouveau pod (déjà connecté, déjà en train de surveiller) acquiert immédiatement le leadership

Zéro gap. Le transfert de leadership se produit proprement, séquentiellement, sans que les deux pods soient jamais simultanément absents du cluster.

Le PodDisruptionBudget (minAvailable: 1) impose qu'au moins un replica de scheduler soit vivant lors de toute perturbation :

replicaCount: 2  # mode HA

pdb:
  minAvailable: 1

affinity:
  podAntiAffinity:
    preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 100
        podAffinityTerm:
          topologyKey: kubernetes.io/hostname

La règle d'anti-affinité est preferred (souple), pas required (stricte). Avec un cluster à 3 nœuds, cela signifie que les deux replicas sont généralement sur des nœuds différents (survivant à la défaillance d'un nœud) mais ne bloqueront pas le scheduling si le cluster est sous pression de ressources.


Timeline de failover (état stable)

Avec tout ce qui précède en place :

Leader (scheduler A)NATS KVStandby (scheduler B) heartbeat CAS (t=0 → t=2s) crash (t=X) TTL expire (t=X+5s) limit marker kv.update CAS (last=revision) acquiert le leadership vérifie slot markers dispatche les tâches suivantes (t=X+6s)
click to expand

Fenêtre de failover maximum sur crash : 5 secondes (expiration TTL), après quoi le nouveau leader reprend le dispatch en vérifiant les slot markers pour éviter les doublons. Gap maximum sur arrêt propre : 0 (suppression explicite de la clé, takeover sub-seconde).


Et la suite : contribuer aux librairies

Deux de ces corrections sont assez génériques pour vivre dans les librairies elles-mêmes plutôt que dans le code de Perimon :

  1. ResilientPullBroker — appartient à taskiq-nats. Tout déploiement de production utilisant PullBasedJetStreamBroker avec réplication NATS ou redémarrages de nœuds rencontrera ce bug.
  2. La race condition du buffer TCP nats.py — le fix d'une ligne (buffered = watcher._sub.pending_msgs ajouté à la condition du sentinel) appartient à nats.py lui-même. Plan : ouvrir une issue avec un test de reproduction.

L'objectif est de valider la stabilité des rolling upgrades sur quelques semaines d'exploitation avant d'ouvrir les PRs. Le NATS reconnect handling tourne depuis le 2026-03-17 sans incident ; la stack HA complète (leader election + slot dedup + fix SIGTERM) a été déployée le 2026-03-24.


Enseignements

1. « Semble sain » n'est pas « est sain ». Le consumer obsolète et silencieux était le bug le plus dangereux de la liste. Métriques et health probes — tout passait. Le seul symptôme était une queue depth grandissante et des résultats d'uptime manquants — et seulement si on regardait.

2. Les internals des librairies comptent aux frontières. Tant la race condition du buffer TCP nats.py que le comportement des TTL limit markers ont nécessité de lire le code source. Rien n'était documenté. Au niveau infrastructure, « c'est une dépendance tierce » cesse d'être une excuse quand votre logique métier dépend de ses cas limites.

3. L'arrêt gracieux est une propriété du système, pas seulement du code. LeaderElection.shutdown() était correct dès le premier jour. Il était inutile parce que le signal qui le déclenche (SIGTERM) n'avait jamais été câblé. Le chemin d'arrêt doit être validé dans l'environnement de déploiement, pas juste dans les tests locaux.

4. At-least-once est le bon compromis pour le monitoring. Choisir « slot markers post-dispatch » plutôt que « slot markers pré-dispatch » était un choix délibéré : doublons rares plutôt que gaps rares. Tout système de monitoring qui peut rater silencieusement des événements est cassé par design.

5. NATS KV peut remplacer beaucoup d'infrastructure. Leader election, verrous distribués, slot markers, stockage de planning, cache de réponses — tout dans un seul bucket NATS KV. Pas de Zookeeper, pas d'etcd. Pour un petit cluster, cette simplicité vaut beaucoup.


Glossaire

NATS JetStream — La couche de persistance de NATS : messages durables, consumers, streams, et un KV store intégré. Distinct du NATS core (pub/sub sans garantie de livraison).

Pull consumer — Un consumer JetStream où le client demande explicitement des lots de messages. Opposé du push, où le serveur les pousse automatiquement. taskiq-nats utilise le pull pour contrôler la pression de traitement.

KV store — Stockage clé-valeur. NATS JetStream en expose un nativement, utilisé ici à la fois pour le verrou leader et les slot markers.

CAS (compare-and-swap) — Opération atomique : « écrire cette valeur seulement si la révision courante correspond à celle attendue. » Empêche deux instances d'écrire simultanément sans détecter le conflit.

TTL (Time To Live) — Durée de vie d'une clé. Passé ce délai, NATS invalide automatiquement la clé et notifie les watchers.

Limit marker — La notification que NATS délivre aux watchers quand une clé expire par TTL. Elle porte un numéro de révision mais pas de header KV-Operation — distincte d'un DELETE ou PURGE explicite.

Leader election — Le processus par lequel une instance parmi plusieurs revendique le rôle de coordinatrice. Implémenté ici via un kv.create() atomique avec renouvellement TTL par heartbeat.

Slot marker — Clé KV de courte durée écrite après chaque envoi de tâche, encodant la paire (tâche, fenêtre temporelle). Sert à éviter un re-dispatch après un failover.

SIGTERM — Le signal Unix envoyé par Kubernetes à un pod avant de le terminer, laissant au processus une fenêtre pour se nettoyer proprement. Distinct de SIGKILL, qui est immédiat et non interceptable.