
Building HA Monitoring on NATS JetStream + taskiq in Kubernetes

From a single-replica fragile scheduler to a proper HA setup: five silent bugs, five fixes — and a nats.py race condition only visible by reading the source.


Monitoring is one of those rare domains where "mostly works" isn't good enough. If your product scheduler goes down for 30 seconds during a rolling Kubernetes update and nobody notices, that's fine. If your monitoring scheduler goes down for 30 seconds and nobody notices — you've just introduced the exact failure mode your users pay you to detect. You're blind.

This post documents the journey from a single-replica, fragile monitoring scheduler to a proper high-availability setup on NATS JetStream and taskiq, running on Kubernetes. It was not a planned sprint. It was a sequence of incidents and quiet bugs, each fix revealing the next problem. Five problems, five fixes — and a nats.py bug that only surfaced by reading the source.


The system in brief

Perimon's monitoring backend has three components: a Web API (FastAPI), a monitoring worker (consumes tasks from NATS JetStream, runs HTTP/SSL checks), and a monitoring scheduler (paces the execution of monitoring tasks). The message infrastructure is NATS JetStream, used both as the task queue (via taskiq-nats) and as the KV store for coordination.

[Architecture diagram: monitored sites are probed over HTTP/SSL by N monitoring worker instances; the Web API (FastAPI) writes schedules to a NATS JetStream KV store; the monitoring scheduler reads them and dispatches probes onto the JetStream check queue.]

The scheduler is the heartbeat of the whole system. If it goes down, checks stop silently — no errors, no logs, just nothing. The workers drain their queue and go quiet. From the outside, Perimon looks healthy. Internally, every monitored website just became invisible.

This creates an asymmetry in acceptable failure modes. A duplicate check is annoying but recoverable. A missed check is a false negative: a website may have gone down and Perimon didn't notice. For monitoring, false negatives are worse than duplicates. That trade-off shapes every design decision below.


Problem 1 — The worker that stopped listening (silently)

Symptom

After a NATS server restart (or a brief network partition), the monitoring workers appeared healthy. K8s readiness probes passed, NATS connections reported as alive, memory and CPU were normal. But no tasks were being consumed. The queue depth climbed. No errors in the logs.

Root cause

taskiq-nats uses a pull-based JetStream consumer: the worker explicitly calls consumer.fetch() to poll for messages. The PullBasedJetStreamBroker.listen() loop from the taskiq-nats library looks roughly like:

while True:
    try:
        messages = await self.consumer.fetch(
            batch=self.pull_consume_batch,
            timeout=self.pull_consume_timeout,
        )
        for msg in messages:
            yield AckableMessage(data=msg.data, ack=msg.ack)
    except NatsTimeoutError:
        continue  # normal empty poll

When timeout=None (or a very large value), fetch() blocks indefinitely until messages arrive. After a NATS reconnect, nats.py rebuilds the connection at the transport layer — but the JetStream context (self.js) and the pull consumer (self.consumer) become stale. They were created against the previous connection. fetch() is now waiting on a dead subscription that will never deliver.

The reconnected_cb fires and logs something, but nobody is watching it. The listen loop is stuck in await self.consumer.fetch(...) forever. The worker is alive; it just never exits the fetch() call.

Solution

The solution was to create a ResilientPullBroker class, extending PullBasedJetStreamBroker with a _reinit_event: asyncio.Event and overriding the listen() loop to race two futures on every iteration:

Consumer loop reset

[Diagram: inside listen(), consumer.fetch() and _reinit_event.wait() race via asyncio.wait(); while the connection is alive, fetch wins and tasks are processed — after a connection reset, the reconnect callback sets the event and the consumer is reinitialized.]
async def listen(self):
    while True:
        fetch_task = None
        reinit_task = None
        try:
            fetch_task = asyncio.ensure_future(
                self.consumer.fetch(
                    batch=self.pull_consume_batch,
                    timeout=self.pull_consume_timeout,
                )
            )
            reinit_task = asyncio.ensure_future(self._reinit_event.wait())

            done, pending = await asyncio.wait(
                {fetch_task, reinit_task},
                return_when=asyncio.FIRST_COMPLETED,
            )
            for t in pending:
                t.cancel()
                try:
                    await t
                except (asyncio.CancelledError, Exception):
                    pass

            if reinit_task in done:
                self._reinit_event.clear()
                await self._reinit_consumer()
                continue

            messages = fetch_task.result()
            for msg in messages:
                yield AckableMessage(data=msg.data, ack=msg.ack)

        except NatsTimeoutError:
            continue
        except Exception as e:
            _logger.warning(
                f"Pull consumer fetch error ({type(e).__name__}: {e})"
                " — reinitializing consumer"
            )
            await self._reinit_consumer()
        finally:
            # On shutdown (CancelledError), cancel orphaned tasks
            for t in [fetch_task, reinit_task]:
                if t is not None and not t.done():
                    t.cancel()
                    try:
                        await t
                    except (asyncio.CancelledError, Exception):
                        pass

Every iteration now races fetch() against _reinit_event.wait(). When reconnected_cb fires (wrapped in the constructor), it sets the event. The running iteration cancels the pending fetch(), clears the event, and calls _reinit_consumer():

async def _reinit_consumer(self) -> None:
    _logger.warning("Reconnect detected — reinitializing pull consumer")
    try:
        self.js = self.client.jetstream()
        await self._startup_consumer()
        _logger.info("Pull consumer reinitialized successfully")
    except Exception as e:
        _logger.error(f"Consumer reinitialization failed: {e}")
        os.kill(os.getpid(), signal.SIGTERM)

If reinitialization itself fails (e.g. NATS is still down), the process SIGTERMs itself. Kubernetes restarts the pod. The alternative — silently looping in a broken state — is worse.

Two implementation details worth noting: _reinit_event is created in startup() — not __init__() — because asyncio events must bind to a running event loop. And a _shutting_down flag suppresses all NATS lifecycle callbacks during a clean broker.shutdown(), so the closed_cb that normally SIGTERMs the process doesn't fire on intentional shutdown.
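The race itself is plain asyncio and can be exercised without NATS. A self-contained sketch of the pattern (all names here are illustrative, not from the codebase), where a never-returning coroutine stands in for consumer.fetch() on a dead subscription:

```python
import asyncio

async def race_once(fetch, reinit_event, reinit):
    """One listen() iteration: race a (possibly stuck) fetch against
    the reconnect event; whichever completes first decides the outcome."""
    fetch_task = asyncio.ensure_future(fetch())
    event_task = asyncio.ensure_future(reinit_event.wait())
    done, pending = await asyncio.wait(
        {fetch_task, event_task}, return_when=asyncio.FIRST_COMPLETED
    )
    for t in pending:
        t.cancel()
    if event_task in done:
        reinit_event.clear()
        await reinit()          # rebuilding the JetStream context goes here
        return "reinitialized"
    return fetch_task.result()

async def main():
    reinit_event = asyncio.Event()  # must be created inside a running loop

    async def stuck_fetch():
        await asyncio.sleep(3600)   # fetch() waiting on a dead subscription

    async def reinit():
        pass                        # no-op for the demo

    # Simulate the NATS reconnect callback firing 10 ms later.
    asyncio.get_running_loop().call_later(0.01, reinit_event.set)
    return await race_once(stuck_fetch, reinit_event, reinit)

print(asyncio.run(main()))  # → reinitialized
```

Without the event in the race, this loop would sit in stuck_fetch() forever — which is exactly the production failure mode.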


Problem 2 — The scheduler fetching an empty schedule

Symptom

Some dispatch cycles would fire with no tasks — as if the schedule were empty. Not all tasks missed, not every cycle: the issue was intermittent and most reliably reproducible right after a scheduler pod restart, when the first few cron dispatches would be missing. On the next cycle, everything was fine.

Root cause

taskiq-nats provides NATSKeyValueScheduleSource — a schedule source backed by NATS KV. The scheduler calls get_schedules() on every dispatch cycle. Each call opens a kv.watch() with a sentinel: when the watcher delivers a None entry, it signals "all current keys have been delivered", and the list is considered complete. The race condition is present on every one of those calls.

The problem is a race condition in nats.py's KV watch implementation:

kv.watch() subscribes → gets consumer_info → checks:
  if num_pending == 0 and received == 0 → fire sentinel immediately

num_pending is a JetStream server-side counter of messages waiting to be delivered to the consumer. received is a client-side counter of messages already processed. The sentinel logic is: "if the server says there's nothing pending and we haven't received anything yet, there's nothing to read."

The problem: nats.py calls asyncio.sleep(0) (a single event loop yield) between subscribing and calling consumer_info(). In that single yield, the server may have already marked num_pending=0 (all messages sent) while the messages are still sitting in the TCP buffer / _pending_queue on the client side. received=0 is correct — they haven't been processed yet — but num_pending=0 is also technically correct from the server's perspective. The sentinel fires immediately. kv.watch() returns with an empty result. No schedules.

The proper fix in nats.py would be:

buffered = watcher._sub.pending_msgs
if cinfo.num_pending == 0 and received == 0 and buffered == 0:  # add buffered check
    # only fire sentinel if TCP buffer is also empty

This fix has not been merged into the library as of writing (2026-03-24). The nats.py issue is unfiled — this was diagnosed by reading the source, not from any documentation.

Solution

The solution was to create a ResilientNATSScheduleSource class. The workaround is pragmatic: retry up to 3 times with a 10ms sleep between attempts, long enough for the TCP buffer to drain in all realistic scenarios:

class ResilientNATSScheduleSource(NATSKeyValueScheduleSource):
    _resilience_logger = logging.getLogger("Scheduler.Resilience")

    async def get_schedules(self, _retries: int = 3) -> list[ScheduledTask]:
        schedules = await super().get_schedules()
        if schedules:
            return schedules

        if _retries == 0:
            self._resilience_logger.error(
                "get_schedules returned empty after all retries — returning []"
            )
            return []

        self._resilience_logger.warning(
            "get_schedules returned empty — retrying (%d left)", _retries
        )
        await asyncio.sleep(0.01)
        return await self.get_schedules(_retries=_retries - 1)

In practice, the first retry (after 10ms) succeeds essentially always. The error log on exhausted retries serves as an indicator if the bucket genuinely has no schedules — which should never happen since monitoring jobs are registered on application startup.
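The retry shape can be tested without a NATS server. A toy equivalent (all names hypothetical), with a fake source that returns empty exactly once, as the race does in practice:

```python
import asyncio

async def get_with_retries(fetch, retries=3, delay=0.01):
    """Retry an async fetch that may spuriously return an empty list."""
    for attempt in range(retries + 1):
        result = await fetch()
        if result or attempt == retries:
            return result
        await asyncio.sleep(delay)  # give the client-side buffer time to drain

calls = {"n": 0}

async def flaky_source():
    calls["n"] += 1
    # The first call loses the num_pending race and comes back empty.
    return [] if calls["n"] == 1 else ["check-homepage", "check-ssl"]

print(asyncio.run(get_with_retries(flaky_source)))  # → ['check-homepage', 'check-ssl']
```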


Problem 3 — Two replicas dispatch the same task twice

Symptom

With the worker resilience sorted, the next goal was scheduler HA. A single-replica scheduler means a crash equals downtime. The fix seems obvious: run two replicas. The problem is equally obvious: two schedulers will both see the same cron task firing, both dispatch it, and users get double results.

The question becomes: how do you run two instances of a task dispatcher while ensuring exactly one dispatches each slot?

Root cause

Two independent processes see the same cron schedule and dispatch independently — there is no coordination layer between replicas.

There are established patterns for this: distributed locks (Zookeeper, etcd), database advisory locks, external cron services. Since Perimon already uses NATS KV for coordination storage, using it for distributed locking as well meant zero new infrastructure dependencies.

Solution

The solution has three layers, each addressing a distinct failure mode discovered while implementing the previous one.

Leader election via NATS KV

The design is a distributed lock over NATS KV with per-message TTL heartbeat:

  • One instance tries to kv.create(key, instance_id, msg_ttl=5s) on startup.
  • kv.create() is atomic — at most one instance succeeds. The winner is the leader.
  • The leader renews the lock every 2 seconds via kv.update(key, ..., last=last_revision, msg_ttl=5s). This is a CAS (compare-and-swap) operation: it only succeeds if last_revision matches the current KV sequence, preventing split-brain scenarios where two instances both believe they're the leader.
  • The standby watches the key with kv.watch(key). When the key disappears (TTL expiry on crash, or explicit delete on clean shutdown), the standby attempts takeover.
  • Failover gap: the leader key TTL is 5 seconds. Maximum time from crash to new leader acquiring the lock is ≤5 seconds (one missed cron slot at worst).
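The create/CAS-update contract that makes this safe can be modeled in a few lines of plain Python. A toy stand-in for the KV bucket (TinyKV and its exceptions are illustrative, and TTL expiry is not modeled; in nats.py the failures surface as errors such as KeyWrongLastSequenceError):

```python
class TinyKV:
    """In-memory stand-in for a revisioned KV bucket with CAS writes."""
    def __init__(self):
        self.data = {}  # key -> (revision, value)
        self.rev = 0    # bucket-wide sequence, like a JetStream stream sequence

    def create(self, key, value):
        if key in self.data:
            raise KeyError("key exists")  # models kv.create() losing the race
        self.rev += 1
        self.data[key] = (self.rev, value)
        return self.rev

    def update(self, key, value, last):
        cur_rev, _ = self.data.get(key, (0, None))
        if cur_rev != last:
            raise ValueError("wrong last sequence")  # ~ KeyWrongLastSequenceError
        self.rev += 1
        self.data[key] = (self.rev, value)
        return self.rev

kv = TinyKV()
rev_a = kv.create("leader", "pod-a")              # pod A wins the election
try:
    kv.create("leader", "pod-b")                  # pod B hits the atomicity guarantee
except KeyError:
    print("pod-b: standby")
rev_a = kv.update("leader", "pod-a", last=rev_a)  # heartbeat renewal (CAS)
```

At most one create() succeeds, and every renewal proves the leader still holds the latest revision — the two properties the real implementation relies on.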

Leader election

[Sequence diagram: on startup, Pod A and Pod B both try to become leader; the winner refreshes the leader KV key every 2s, the loser watches the key as standby.]

The standby's role

[Sequence diagram: on SIGTERM, the leader stops its heartbeat loop and deletes the leader KV key; the standby sees the deletion and, if no other leader wins the race, takes over.]
async def _heartbeat(self) -> None:
    while True:
        await asyncio.sleep(self._heartbeat_interval)  # 2s
        if not self._is_leader or self._kv is None:
            continue
        try:
            self._last_revision = await self._kv.update(
                self._key,
                self._instance_id.encode(),
                last=self._last_revision,
                msg_ttl=self._msg_ttl,
            )
        except KeyWrongLastSequenceError:
            self._logger.warning("heartbeat CAS conflict — demoting to standby")
            self._is_leader = False
            self._last_revision = None
        except Exception as exc:
            self._logger.warning(f"heartbeat error: {exc}")

The CAS on the heartbeat is intentional. If another instance somehow acquired the key (theoretical split-brain), the leader's kv.update(last=...) will fail with KeyWrongLastSequenceError because the revision no longer matches. The leader demotes itself rather than fighting. This is the safer outcome.

The undocumented NATS TTL marker problem

When a key expires by TTL, NATS doesn't silently remove it — it delivers a limit marker to all watchers, so they can distinguish "key expired" from "no messages yet". This marker has Nats-Marker-Reason: MaxAge in its headers but, crucially, no KV-Operation header (unlike an explicit DELETE or PURGE). In nats.py, this means entry.operation is None.

The initial _watch() implementation checked entry.operation is not None to detect key deletion and trigger takeover — correct for explicit deletes. But on TTL expiry, entry.operation is None. The standby never triggered.

Worse: on TTL expiry, kv.create() fails. You might expect that if the key is gone, kv.create() would succeed. But the limit marker has a JetStream sequence (a revision number). kv.create() internally calls kv.update(last=0) (no previous revision expected). The limit marker's sequence is non-zero, so the CAS fails with BadRequestError.

The correct path for TTL takeover is kv.update(last=entry.revision) — a CAS directly on the limit marker's sequence number. Exactly one standby wins; the rest get KeyWrongLastSequenceError.

async def _watch(self) -> None:
    while True:
        try:
            watcher = await self._kv.watch(self._key)
            async for entry in watcher:
                if entry is None:
                    continue

                # Two distinct deletion signals:
                # - explicit DEL/PURGE: entry.operation is not None
                # - TTL limit marker: entry.operation is None AND entry.value is empty
                key_is_gone = entry.operation is not None or not entry.value
                if key_is_gone and not self._is_leader:
                    self._logger.info("Leader key deleted — attempting takeover")
                    await self._try_acquire(last_revision=entry.revision)
        except Exception as exc:
            self._logger.warning(f"watcher error: {exc} — restarting in 2s")
            await asyncio.sleep(2)

And _try_acquire() handles both paths:

async def _try_acquire(self, last_revision: int | None = None) -> None:
    try:
        if last_revision is not None:
            # TTL expiry path: CAS on the limit marker's sequence
            self._last_revision = await self._kv.update(
                self._key,
                self._instance_id.encode(),
                last=last_revision,
                msg_ttl=self._msg_ttl,
            )
        else:
            # Startup path: clean create (no prior key)
            self._last_revision = await self._kv.create(
                self._key, self._instance_id.encode(), msg_ttl=self._msg_ttl
            )
        self._is_leader = True
    except (KeyWrongLastSequenceError, BadRequestError):
        # Another instance won the race
        self._is_leader = False

This behavior is not documented in the nats.py README or API docs. It was discovered by reading JetStream internals and observing BadRequestError in the logs when the expected KeyWrongLastSequenceError never appeared.
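Reduced to revision arithmetic, the two acquisition paths look like this (the revision number is made up; cas_update is a schematic stand-in for kv.update(last=...), not the real client):

```python
LIMIT_MARKER_REVISION = 42  # the TTL marker still occupies a JetStream sequence

def cas_update(expected, current):
    """A CAS write succeeds only if the expected revision matches the current one."""
    return expected == current

# Startup path: kv.create() behaves like kv.update(last=0) — it assumes no
# prior revision, but the limit marker's sequence is non-zero, so it fails.
print(cas_update(expected=0, current=LIMIT_MARKER_REVISION))  # → False

# TTL takeover path: CAS directly on the limit marker's revision succeeds.
print(cas_update(expected=LIMIT_MARKER_REVISION, current=LIMIT_MARKER_REVISION))  # → True
```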

Slot deduplication: the at-least-once layer

Leader election ensures only one instance dispatches. But leader election has a gap: if the leader crashes after dispatching a task but before the new leader acquires the lock, the new leader will re-dispatch the same slot. This is the "at-least-once" problem.

The solution is slot markers: a short-lived KV key written after each dispatch, identifying the specific (task, time window) pair. The HAScheduleSource wraps any ScheduleSource with two hooks. On pre_send, it gates on leadership first (standby raises ScheduledTaskCancelledError immediately), then checks for an existing slot marker for this task+window (duplicate from a previous leader — also cancelled). On post_send, it writes the marker: kv.create(slot_key, b"1", msg_ttl=...).
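A slot marker key only has to be deterministic per (task, time window), so that the old and new leader compute the same key for the same slot. One way to derive it, assuming fixed-period scheduling (the key format is hypothetical, not Perimon's actual format):

```python
def slot_key(task_name: str, fired_at: float, period_s: int) -> str:
    """Bucket the fire time into its period window so any leader —
    old or new — computes the same key for the same slot."""
    window = int(fired_at) // period_s
    return f"slot.{task_name}.{window}"

# Two leaders dispatching the same 60s slot a few seconds apart agree on the key:
print(slot_key("check-homepage", 1_700_000_012, 60))
print(slot_key("check-homepage", 1_700_000_015, 60))  # same window → same key
```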

[Flowchart: for each task — not the leader? skip; slot marker exists? skip; otherwise dispatch the task, then write the slot marker.]

The marker is deliberately written after dispatch: a crash between dispatch and marker write causes a rare duplicate, whereas writing the marker before dispatch would risk a gap (marker written, task never sent) — worse for monitoring. The slot key encodes the task and time window; the TTL is calibrated to expire just before the next legitimate fire.


Problem 4 — SIGTERM kills the leader key cleanup

Symptom

During a rolling Kubernetes update, the outgoing pod is sent SIGTERM. Expected behavior: the leader runs LeaderElection.shutdown(), which explicitly deletes the leader key. The standby sees the key deletion, immediately runs _try_acquire(), and takes over. Failover in milliseconds.

Actual behavior: the old pod was killed immediately. No shutdown log. No key deletion. The standby waited the full 5 seconds for TTL expiry before taking over.

5 seconds of scheduler downtime on every deployment.

Root cause

asyncio signal handlers are registered via loop.add_signal_handler(). taskiq's scheduler runner registers a handler for SIGINT — which Python maps to KeyboardInterrupt, which the asyncio event loop converts to a CancelledError on the main task, which taskiq catches to run its graceful shutdown sequence.

But SIGTERM is a different signal. The OS default action for SIGTERM is immediate process termination. No Python exception. No asyncio. No shutdown handlers.

In Kubernetes, pods always receive SIGTERM (not SIGINT) during graceful termination. The graceful shutdown path was being bypassed on every single deployment.

[Comparison: without the fix, SIGTERM lets the OS kill the process, the leader key is orphaned, and the standby waits up to 5s for TTL expiry; with the fix, SIGTERM cancels the main task, the CancelledError path runs kv.delete(key), and the standby acquires leadership in under a second.]

Solution

def _register_sigterm_handler() -> None:
    """Make SIGTERM behave like SIGINT: cancel the main asyncio task so that
    taskiq's CancelledError handler runs the graceful scheduler shutdown."""
    loop = asyncio.get_running_loop()
    main_task = asyncio.current_task()
    loop.add_signal_handler(signal.SIGTERM, main_task.cancel)

This routes SIGTERM through the same CancelledError path that SIGINT already uses. The existing taskiq shutdown handler runs identically for both signals. _register_sigterm_handler() is called from LeaderElection.startup().

The impact: on SIGTERM, the standby now acquires leadership within milliseconds instead of ≤5 seconds.
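The whole mechanism fits in a runnable, POSIX-only sketch (sigterm_demo is illustrative; the return value marks where taskiq's shutdown sequence, including the leader-key delete, would run):

```python
import asyncio
import os
import signal

async def sigterm_demo():
    loop = asyncio.get_running_loop()
    main_task = asyncio.current_task()
    # Route SIGTERM through the same cancellation path SIGINT already uses.
    loop.add_signal_handler(signal.SIGTERM, main_task.cancel)

    os.kill(os.getpid(), signal.SIGTERM)  # simulate Kubernetes terminating the pod
    try:
        await asyncio.sleep(3600)         # stands in for the scheduler's run loop
    except asyncio.CancelledError:
        return "graceful shutdown ran"    # leader key delete would happen here

print(asyncio.run(sigterm_demo()))  # → graceful shutdown ran
```

Without the add_signal_handler line, the same os.kill terminates the process before the except branch can ever run.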


Problem 5 — The rolling update gap

Symptom

Even with clean SIGTERM handling, there was a window during rolling updates where the cluster had zero functioning schedulers. The old pod was already terminated; the new pod was still connecting to NATS, initializing leader election, and establishing its role.

Root cause

Kubernetes has no visibility into whether the new scheduler is actually ready. Without a readiness probe reflecting true initialization state, Kubernetes terminates the old pod as soon as the new one's process starts — before NATS is connected or leader election is complete.

Solution

A K8sScheduler class writes a readiness file only after full initialization:

class K8sScheduler(TaskiqScheduler):
    def __init__(self, broker, sources, servers, leader_key):
        leader = LeaderElection(servers=servers, key=leader_key)
        super().__init__(
            broker=broker,
            sources=[HAScheduleSource(s, leader) for s in sources]
        )

    async def startup(self) -> None:
        await super().startup()
        # super().startup() runs LeaderElection.startup() — fully connected + role established
        Path("/tmp/ready").touch()

Kubernetes readiness probe:

probe:
  readiness:
    exec:
      command: ["test", "-f", "/tmp/ready"]
    initialDelaySeconds: 5
    periodSeconds: 3
    failureThreshold: 10

With this setup, the rolling update sequence is:

  1. New pod starts, connects to NATS, establishes leader election role → /tmp/ready written
  2. Kubernetes marks new pod as Ready
  3. Only then does Kubernetes begin terminating the old pod (SIGTERM)
  4. Old pod receives SIGTERM → clean shutdown → leader key deleted
  5. New pod (already connected, already watching) immediately acquires leadership

No gap.

replicaCount: 2  # HA mode

pdb:
  minAvailable: 1

affinity:
  podAntiAffinity:
    preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 100
        podAffinityTerm:
          topologyKey: kubernetes.io/hostname

Failover timeline (steady state)

[Sequence diagram: leader A heartbeats via CAS every 2s; it crashes at t=X; the TTL expires at t=X+5s and a limit marker reaches standby B, which CAS-updates on the marker's revision, acquires leadership, checks slot markers, and dispatches at roughly t=X+6s.]

Maximum failover window on crash: 5 seconds. Maximum gap on clean shutdown: 0.


What's next: contributing back

Two of these fixes are generic enough to live in the libraries themselves:

  1. ResilientPullBroker — belongs in taskiq-nats. Any production deployment using PullBasedJetStreamBroker with NATS replication or node restarts will hit this bug.
  2. The nats.py TCP buffer race — the one-line fix belongs in nats.py itself. Planning to file the issue with a reproducible test case.

The plan is to validate rolling upgrade stability over a few weeks before opening the PRs.


Lessons

1. "Appears healthy" is not "is healthy." The silent stale consumer was the most dangerous bug. Metrics and health probes all passed. The only symptom was a growing queue depth and missing uptime results.

2. Library internals matter at the edges. Both the nats.py TCP buffer race and the TTL limit marker behavior required reading the source. Neither was documented.

3. Graceful shutdown is a system property, not just a code property. LeaderElection.shutdown() was correct from day one. It was useless because the signal that triggers it (SIGTERM) was never wired up.

4. At-least-once is the right trade-off for monitoring. Choosing "post-dispatch slot markers" over "pre-dispatch slot markers" was a deliberate call: rare duplicates over rare gaps.

5. NATS KV can replace a lot of infrastructure. Leader election, distributed locks, slot markers, schedule storage, response caching — all in one NATS KV bucket. No Zookeeper, no etcd.


Glossary

NATS JetStream — The persistence layer of NATS: durable messages, consumers, streams, and a built-in KV store.

Pull consumer — A JetStream consumer where the client explicitly requests batches of messages. taskiq-nats uses pull for backpressure control.

CAS (compare-and-swap) — An atomic operation: "write this value only if the current revision matches what I expect."

TTL (Time To Live) — A per-key expiry. After the TTL elapses, NATS automatically invalidates the key and notifies watchers.

Limit marker — The notification NATS delivers to watchers when a key expires by TTL. It carries a revision number but no KV-Operation header — distinct from an explicit DELETE or PURGE.

Leader election — The process by which one instance among several claims the coordinator role. Here implemented as an atomic kv.create() with TTL heartbeat renewal.

Slot marker — A short-lived KV key written after each task dispatch, encoding the (task, time window) pair. Used to prevent re-dispatch after a failover.

SIGTERM — The Unix signal Kubernetes sends to a pod before terminating it. Distinct from SIGKILL, which is immediate and unblockable.