Monitoring is one of those rare domains where "mostly works" isn't good enough. If your product scheduler goes down for 30 seconds during a rolling Kubernetes update and nobody notices, that's fine. If your monitoring scheduler goes down for 30 seconds and nobody notices — you've just introduced the exact failure mode your users pay you to detect. You're blind.
This post documents the journey from a single-replica, fragile monitoring scheduler to a proper high-availability setup on NATS JetStream and taskiq, running on Kubernetes. It was not a planned sprint. It was a sequence of incidents and quiet bugs, each fix revealing the next problem. Five problems, five fixes — and a nats.py bug that only surfaced by reading the source.
The system in brief
Perimon's monitoring backend has three components: a Web API (FastAPI), a monitoring worker (consumes tasks from NATS JetStream, runs HTTP/SSL checks), and a monitoring scheduler (paces the execution of monitoring tasks). The message infrastructure is NATS JetStream, used both as the task queue (via taskiq-nats) and as the KV store for coordination.
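For orientation, the wiring looks roughly like the sketch below. Constructor parameters, import paths, and names here are assumptions based on typical taskiq-nats usage, not Perimon's actual configuration.

```python
# Orientation sketch only — parameter names beyond `servers` and the import
# paths are assumptions; consult the taskiq-nats docs for the real signatures.
from taskiq import TaskiqScheduler
from taskiq_nats import NATSKeyValueScheduleSource, PullBasedJetStreamBroker

NATS_SERVERS = ["nats://nats:4222"]  # illustrative

broker = PullBasedJetStreamBroker(servers=NATS_SERVERS)             # task queue for the workers
schedule_source = NATSKeyValueScheduleSource(servers=NATS_SERVERS)  # schedules stored in NATS KV
scheduler = TaskiqScheduler(broker=broker, sources=[schedule_source])
```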
The scheduler is the heartbeat of the whole system. If it goes down, checks stop silently — no errors, no logs, just nothing. The workers drain their queue and go quiet. From the outside, Perimon looks healthy. Internally, every monitored website just became invisible.
This creates an asymmetry in acceptable failure modes. A duplicate check is annoying but recoverable. A missed check is a false negative: a website may have gone down and Perimon didn't notice. For monitoring, false negatives are worse than duplicates. That trade-off shapes every design decision below.
Problem 1 — The worker that stopped listening (silently)
Symptom
After a NATS server restart (or a brief network partition), the monitoring workers appeared healthy. K8s readiness probes passed, NATS connections reported as alive, memory and CPU were normal. But no tasks were being consumed. The queue depth climbed. No errors in the logs.
Root cause
taskiq-nats uses a pull-based JetStream consumer: the worker explicitly calls consumer.fetch() to poll for messages. The PullBasedJetStreamBroker.listen() loop from the taskiq-nats library looks roughly like:
```python
while True:
    try:
        messages = await self.consumer.fetch(
            batch=self.pull_consume_batch,
            timeout=self.pull_consume_timeout,
        )
        for msg in messages:
            yield AckableMessage(data=msg.data, ack=msg.ack)
    except NatsTimeoutError:
        continue  # normal empty poll
```
When timeout=None (or a very large value), fetch() blocks indefinitely until messages arrive. After a NATS reconnect, nats.py rebuilds the connection at the transport layer — but the JetStream context (self.js) and the pull consumer (self.consumer) become stale. They were created against the previous connection. fetch() is now waiting on a dead subscription that will never deliver.
The reconnected_cb fires and logs something, but nobody is watching it. The listen loop is stuck in await self.consumer.fetch(...) forever. The worker is alive; it just never exits the fetch() call.
Solution
The solution was to create a ResilientPullBroker class, extending PullBasedJetStreamBroker with a _reinit_event: asyncio.Event and overriding the listen() loop to race two futures on every iteration:
Consumer loop reset
```python
async def listen(self):
    while True:
        fetch_task = None
        reinit_task = None
        try:
            fetch_task = asyncio.ensure_future(
                self.consumer.fetch(
                    batch=self.pull_consume_batch,
                    timeout=self.pull_consume_timeout,
                )
            )
            reinit_task = asyncio.ensure_future(self._reinit_event.wait())
            done, pending = await asyncio.wait(
                {fetch_task, reinit_task},
                return_when=asyncio.FIRST_COMPLETED,
            )
            for t in pending:
                t.cancel()
                try:
                    await t
                except (asyncio.CancelledError, Exception):
                    pass
            if reinit_task in done:
                self._reinit_event.clear()
                await self._reinit_consumer()
                continue
            messages = fetch_task.result()
            for msg in messages:
                yield AckableMessage(data=msg.data, ack=msg.ack)
        except NatsTimeoutError:
            continue
        except Exception as e:
            _logger.warning(
                f"Pull consumer fetch error ({type(e).__name__}: {e})"
                " — reinitializing consumer"
            )
            await self._reinit_consumer()
        finally:
            # On shutdown (CancelledError), cancel orphaned tasks
            for t in [fetch_task, reinit_task]:
                if t is not None and not t.done():
                    t.cancel()
                    try:
                        await t
                    except (asyncio.CancelledError, Exception):
                        pass
```
Every iteration now races fetch() against _reinit_event.wait(). When reconnected_cb fires (wrapped in the constructor), it sets the event. The running iteration cancels the pending fetch(), clears the event, and calls _reinit_consumer():
```python
async def _reinit_consumer(self) -> None:
    _logger.warning("Reconnect detected — reinitializing pull consumer")
    try:
        self.js = self.client.jetstream()
        await self._startup_consumer()
        _logger.info("Pull consumer reinitialized successfully")
    except Exception as e:
        _logger.error(f"Consumer reinitialization failed: {e}")
        os.kill(os.getpid(), signal.SIGTERM)
```
If reinitialization itself fails (e.g. NATS is still down), the process SIGTERMs itself. Kubernetes restarts the pod. The alternative — silently looping in a broken state — is worse.
Two implementation details worth noting: _reinit_event is created in startup() — not __init__() — because asyncio events must bind to a running event loop. And a _shutting_down flag suppresses all NATS lifecycle callbacks during a clean broker.shutdown(), so the closed_cb that normally SIGTERMs the process doesn't fire on intentional shutdown.
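A condensed sketch of that wiring follows. reconnected_cb and closed_cb are standard nats.py connection callbacks; how they reach nats.connect() through the broker's constructor is an assumption about taskiq-nats internals, and the helper names are illustrative:

```python
import asyncio
import os
import signal

from taskiq_nats import PullBasedJetStreamBroker


class ResilientPullBroker(PullBasedJetStreamBroker):
    def __init__(self, *args, **kwargs) -> None:
        self._shutting_down = False
        self._reinit_event: asyncio.Event | None = None

        async def _on_reconnected() -> None:
            # Wakes the listen() race above so the stale consumer gets rebuilt.
            if not self._shutting_down and self._reinit_event is not None:
                self._reinit_event.set()

        async def _on_closed() -> None:
            # Connection permanently closed outside a clean shutdown():
            # die loudly and let Kubernetes restart the pod.
            if not self._shutting_down:
                os.kill(os.getpid(), signal.SIGTERM)

        kwargs["reconnected_cb"] = _on_reconnected
        kwargs["closed_cb"] = _on_closed
        super().__init__(*args, **kwargs)

    async def startup(self) -> None:
        # The event must be created on the running loop, hence startup(), not __init__().
        self._reinit_event = asyncio.Event()
        await super().startup()

    async def shutdown(self) -> None:
        # Suppress the lifecycle callbacks during an intentional shutdown.
        self._shutting_down = True
        await super().shutdown()
```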
Problem 2 — The scheduler fetching an empty schedule
Symptom
Some dispatch cycles fired with no tasks at all — as if the schedule were empty. Not every task, and not every cycle: the issue was intermittent, and most reliably reproducible right after a scheduler pod restart, when the first few cron dispatches would go missing. On the next cycle, everything was fine.
Root cause
taskiq-nats provides NATSKeyValueScheduleSource — a schedule source backed by NATS KV. The scheduler calls get_schedules() on every dispatch cycle. Each call opens a kv.watch() with a sentinel: when the watcher delivers a None entry, it signals "all current keys have been delivered", and the list is considered complete. Every one of those calls is exposed to the same race.
The race lives in nats.py's KV watch implementation:
```
kv.watch() subscribes → gets consumer_info → checks:
    if num_pending == 0 and received == 0 → fire sentinel immediately
```
num_pending is a JetStream server-side counter of messages waiting to be delivered to the consumer. received is a client-side counter of messages already processed. The sentinel logic is: "if the server says there's nothing pending and we haven't received anything yet, there's nothing to read."
The problem: nats.py calls asyncio.sleep(0) (a single event loop yield) between subscribing and calling consumer_info(). In that single yield, the server may have already marked num_pending=0 (all messages sent) while the messages are still sitting in the TCP buffer / _pending_queue on the client side. received=0 is correct — they haven't been processed yet — but num_pending=0 is also technically correct from the server's perspective. The sentinel fires immediately. kv.watch() returns with an empty result. No schedules.
The proper fix in nats.py would be:
```python
buffered = watcher._sub.pending_msgs
if cinfo.num_pending == 0 and received == 0 and buffered == 0:  # add buffered check
    # only fire sentinel if the TCP buffer is also empty
```
This fix has not been merged into the library as of writing (2026-03-24). The nats.py issue is unfiled — this was diagnosed by reading the source, not from any documentation.
Solution
The solution was to create a ResilientNATSScheduleSource class. The workaround is pragmatic: retry up to 3 times with a 10ms sleep between attempts, long enough for the TCP buffer to drain in all realistic scenarios:
```python
class ResilientNATSScheduleSource(NATSKeyValueScheduleSource):
    _resilience_logger = logging.getLogger("Scheduler.Resilience")

    async def get_schedules(self, _retries: int = 3) -> list[ScheduledTask]:
        schedules = await super().get_schedules()
        if schedules:
            return schedules
        if _retries == 0:
            self._resilience_logger.error(
                "get_schedules returned empty after all retries — returning []"
            )
            return []
        self._resilience_logger.warning(
            "get_schedules returned empty — retrying (%d left)", _retries
        )
        await asyncio.sleep(0.01)
        return await self.get_schedules(_retries=_retries - 1)
```
In practice, the first retry (after 10ms) succeeds essentially always. The error log on exhausted retries fires only if the bucket genuinely has no schedules — which should never happen, since monitoring jobs are registered on application startup.
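For context, the schedules themselves land in that bucket when the application boots. A hedged sketch of that registration — the task name and cron expression are illustrative, and the use of taskiq's dynamic add_schedule() API here is an assumption about how Perimon registers its jobs:

```python
from taskiq import ScheduledTask


# Illustrative registration, assumed to run once on application startup.
async def register_monitoring_schedules(source: ResilientNATSScheduleSource) -> None:
    await source.add_schedule(
        ScheduledTask(
            task_name="monitoring:dispatch_checks",  # illustrative task name
            labels={},
            args=[],
            kwargs={},
            cron="* * * * *",  # one dispatch cycle per minute
        )
    )
```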
Problem 3 — Two replicas dispatch the same task twice
Symptom
With the worker resilience sorted, the next goal was scheduler HA. A single-replica scheduler means a crash equals downtime. The fix seems obvious: run two replicas. The problem is equally obvious: two schedulers will both see the same cron task firing, both dispatch it, and users get double results.
The question becomes: how do you run two instances of a task dispatcher while ensuring exactly one dispatches each slot?
Root cause
Two independent processes see the same cron schedule and dispatch independently — there is no coordination layer between replicas.
There are established patterns for this: distributed locks (Zookeeper, etcd), database advisory locks, external cron services. Since Perimon already uses NATS KV for coordination storage, using it for distributed locking as well meant zero new infrastructure dependencies.
Solution
The solution has three layers, each addressing a distinct failure mode discovered while implementing the previous one.
Leader election via NATS KV
The design is a distributed lock over NATS KV, kept alive by a heartbeat that renews a per-message TTL:
- One instance tries to `kv.create(key, instance_id, msg_ttl=5s)` on startup. `kv.create()` is atomic — at most one instance succeeds. The winner is the leader.
- The leader renews the lock every 2 seconds via `kv.update(key, ..., last=last_revision, msg_ttl=5s)`. This is a CAS (compare-and-swap) operation: it only succeeds if `last_revision` matches the current KV sequence, preventing split-brain scenarios where two instances both believe they're the leader.
- The standby watches the key with `kv.watch(key)`. When the key disappears (TTL expiry on crash, or explicit delete on clean shutdown), the standby attempts takeover.
- Failover gap: the leader key TTL is 5 seconds. Maximum time from crash to new leader acquiring the lock is ≤5 seconds (one missed cron slot at worst).
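Tying the protocol together, a condensed skeleton of the LeaderElection class — the bucket name, attribute names, and logger name are illustrative, the constructor signature matches how K8sScheduler uses it later, and the heartbeat, watch, and acquire methods appear in full below:

```python
import asyncio
import logging
import uuid

import nats
from nats.js.kv import KeyValue


class LeaderElection:
    """Condensed skeleton — _heartbeat(), _watch(), and _try_acquire() are shown below."""

    def __init__(self, servers: list[str], key: str) -> None:
        self._servers = servers
        self._key = key
        self._instance_id = uuid.uuid4().hex     # identifies this replica in the leader key
        self._is_leader = False
        self._last_revision: int | None = None
        self._heartbeat_interval = 2.0           # seconds between lock renewals
        self._msg_ttl = 5.0                      # per-message TTL on the leader key
        self._kv: KeyValue | None = None
        self._logger = logging.getLogger("Scheduler.LeaderElection")

    @property
    def is_leader(self) -> bool:
        return self._is_leader

    @property
    def kv(self) -> KeyValue | None:
        return self._kv

    async def startup(self) -> None:
        nc = await nats.connect(servers=self._servers)
        # Bucket name is illustrative; assumes the KV bucket already exists.
        self._kv = await nc.jetstream().key_value("scheduler-coordination")
        await self._try_acquire()                # startup path: clean kv.create()
        asyncio.create_task(self._heartbeat())   # leader renews the lock every 2s
        asyncio.create_task(self._watch())       # standby waits for the key to vanish
```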
The leader's heartbeat
```python
async def _heartbeat(self) -> None:
    while True:
        await asyncio.sleep(self._heartbeat_interval)  # 2s
        if not self._is_leader or self._kv is None:
            continue
        try:
            self._last_revision = await self._kv.update(
                self._key,
                self._instance_id.encode(),
                last=self._last_revision,
                msg_ttl=self._msg_ttl,
            )
        except KeyWrongLastSequenceError:
            self._logger.warning("heartbeat CAS conflict — demoting to standby")
            self._is_leader = False
            self._last_revision = None
        except Exception as exc:
            self._logger.warning(f"heartbeat error: {exc}")
```
The CAS on the heartbeat is intentional. If another instance somehow acquired the key (theoretical split-brain), the leader's kv.update(last=...) will fail with KeyWrongLastSequenceError because the revision no longer matches. The leader demotes itself rather than fighting. This is the safer outcome.
The undocumented NATS TTL marker problem
When a key expires by TTL, NATS doesn't silently remove it — it delivers a limit marker to all watchers, so they can distinguish "key expired" from "no messages yet". This marker has Nats-Marker-Reason: MaxAge in its headers but, crucially, no KV-Operation header (unlike an explicit DELETE or PURGE). In nats.py, this means entry.operation is None.
The initial _watch() implementation checked entry.operation is not None to detect key deletion and trigger takeover — correct for explicit deletes. But on TTL expiry, entry.operation is None. The standby never triggered.
Worse: on TTL expiry, kv.create() fails. You might expect that if the key is gone, kv.create() would succeed. But the limit marker has a JetStream sequence (a revision number). kv.create() internally calls kv.update(last=0) (no previous revision expected). The limit marker's sequence is non-zero, so the CAS fails with BadRequestError.
The correct path for TTL takeover is kv.update(last=entry.revision) — a CAS directly on the limit marker's sequence number. Exactly one standby wins; the rest get KeyWrongLastSequenceError.
```python
async def _watch(self) -> None:
    while True:
        try:
            watcher = await self._kv.watch(self._key)
            async for entry in watcher:
                if entry is None:
                    continue
                # Two distinct deletion signals:
                # - explicit DEL/PURGE: entry.operation is not None
                # - TTL limit marker: entry.operation is None AND entry.value is empty
                key_is_gone = entry.operation is not None or not entry.value
                if key_is_gone and not self._is_leader:
                    self._logger.info("Leader key deleted — attempting takeover")
                    await self._try_acquire(last_revision=entry.revision)
        except Exception as exc:
            self._logger.warning(f"watcher error: {exc} — restarting in 2s")
            await asyncio.sleep(2)
```
And _try_acquire() handles both paths:
```python
async def _try_acquire(self, last_revision: int | None = None) -> None:
    try:
        if last_revision is not None:
            # TTL expiry path: CAS on the limit marker's sequence
            self._last_revision = await self._kv.update(
                self._key,
                self._instance_id.encode(),
                last=last_revision,
                msg_ttl=self._msg_ttl,
            )
        else:
            # Startup path: clean create (no prior key)
            self._last_revision = await self._kv.create(
                self._key, self._instance_id.encode(), msg_ttl=self._msg_ttl
            )
        self._is_leader = True
    except (KeyWrongLastSequenceError, BadRequestError):
        # Another instance won the race
        self._is_leader = False
```
This behavior is not documented in the nats.py README or API docs. It was discovered by reading JetStream internals and observing BadRequestError in the logs when the expected KeyWrongLastSequenceError never appeared.
Slot deduplication: the at-least-once layer
Leader election ensures only one instance dispatches. But leader election has a gap: if the leader crashes after dispatching a task but before the new leader acquires the lock, the new leader will re-dispatch the same slot. This is the "at-least-once" problem.
The solution is slot markers: a short-lived KV key written after each dispatch, identifying the specific (task, time window) pair. The HAScheduleSource wraps any ScheduleSource with two hooks. On pre_send, it gates on leadership first (standby raises ScheduledTaskCancelledError immediately), then checks for an existing slot marker for this task+window (duplicate from a previous leader — also cancelled). On post_send, it writes the marker: kv.create(slot_key, b"1", msg_ttl=...).
The marker is written after dispatch: a crash between dispatch and write causes a rare duplicate, but a pre-dispatch crash would cause a gap — worse for monitoring. The slot key encodes the task and time window; the TTL is calibrated to expire just before the next legitimate fire.
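A condensed sketch of those two hooks — the slot-key format, the 60-second window, and the marker TTL are illustrative, the KV handle is borrowed from the LeaderElection skeleton sketched earlier, and the import paths for the taskiq types are assumptions:

```python
import time

from nats.js.errors import KeyNotFoundError
from taskiq import ScheduledTask, ScheduleSource
from taskiq.exceptions import ScheduledTaskCancelledError


class HAScheduleSource(ScheduleSource):
    """Wraps any ScheduleSource with leader gating plus slot deduplication."""

    def __init__(self, inner: ScheduleSource, leader: LeaderElection) -> None:
        self._inner = inner
        self._leader = leader

    async def get_schedules(self) -> list[ScheduledTask]:
        return await self._inner.get_schedules()

    def _slot_key(self, task: ScheduledTask) -> str:
        # Illustrative: one slot per task per minute-aligned window.
        window = int(time.time()) // 60
        return f"slot.{task.task_name}.{window}"

    async def pre_send(self, task: ScheduledTask) -> None:
        if not self._leader.is_leader:
            raise ScheduledTaskCancelledError  # standby never dispatches
        try:
            await self._leader.kv.get(self._slot_key(task))
        except KeyNotFoundError:
            return  # no marker — this slot hasn't been dispatched yet
        # Marker exists: a previous leader already dispatched this slot.
        raise ScheduledTaskCancelledError

    async def post_send(self, task: ScheduledTask) -> None:
        # Written *after* dispatch: rare duplicates beat rare gaps.
        await self._leader.kv.create(self._slot_key(task), b"1", msg_ttl=50.0)
```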
Problem 4 — SIGTERM kills the leader key cleanup
Symptom
During a rolling Kubernetes update, the outgoing pod is sent SIGTERM. Expected behavior: the leader runs LeaderElection.shutdown(), which explicitly deletes the leader key. The standby sees the key deletion, immediately runs _try_acquire(), and takes over. Failover in milliseconds.
Actual behavior: the outgoing pod died immediately. No shutdown log. No key deletion. The standby waited the full 5 seconds for TTL expiry before taking over.
5 seconds of scheduler downtime on every deployment.
Root cause
asyncio signal handlers are registered via loop.add_signal_handler(). taskiq's scheduler runner registers a handler for SIGINT — which Python maps to KeyboardInterrupt, which the asyncio event loop converts to a CancelledError on the main task, which taskiq catches to run its graceful shutdown sequence.
But SIGTERM is a different signal. The OS default action for SIGTERM is immediate process termination. No Python exception. No asyncio. No shutdown handlers.
In Kubernetes, pods always receive SIGTERM (not SIGINT) during graceful termination. The graceful shutdown path was being bypassed on every single deployment.
Solution
```python
import asyncio
import signal


def _register_sigterm_handler() -> None:
    """Make SIGTERM behave like SIGINT: cancel the main asyncio task so that
    taskiq's CancelledError handler runs the graceful scheduler shutdown."""
    loop = asyncio.get_running_loop()
    main_task = asyncio.current_task()
    loop.add_signal_handler(signal.SIGTERM, main_task.cancel)
```
This routes SIGTERM through the same CancelledError path that SIGINT already uses. The existing taskiq shutdown handler runs identically for both signals. _register_sigterm_handler() is called from LeaderElection.startup().
The impact: on SIGTERM, the standby now acquires leadership within milliseconds instead of ≤5 seconds.
Problem 5 — The rolling update gap
Symptom
Even with clean SIGTERM handling, there was a window during rolling updates where the cluster had zero functioning schedulers. The old pod was already terminated; the new pod was still connecting to NATS, initializing leader election, and establishing its role.
Root cause
Kubernetes has no visibility into whether the new scheduler is actually ready. Without a readiness probe reflecting true initialization state, Kubernetes terminates the old pod as soon as the new one's process starts — before NATS is connected or leader election is complete.
Solution
A K8sScheduler class writes a readiness file only after full initialization:
class K8sScheduler(TaskiqScheduler):
def __init__(self, broker, sources, servers, leader_key):
leader = LeaderElection(servers=servers, key=leader_key)
super().__init__(
broker=broker,
sources=[HAScheduleSource(s, leader) for s in sources]
)
async def startup(self) -> None:
await super().startup()
# super().startup() runs LeaderElection.startup() — fully connected + role established
Path("/tmp/ready").touch()
Kubernetes readiness probe:
```yaml
probe:
  readiness:
    exec:
      command: ["test", "-f", "/tmp/ready"]
    initialDelaySeconds: 5
    periodSeconds: 3
    failureThreshold: 10
```
With this setup, the rolling update sequence is:
- New pod starts, connects to NATS, establishes its leader election role → `/tmp/ready` written
- Kubernetes marks the new pod as Ready
- Only then does Kubernetes begin terminating the old pod (SIGTERM)
- Old pod receives SIGTERM → clean shutdown → leader key deleted
- New pod (already connected, already watching) immediately acquires leadership
No gap.
The scheduler's deployment values: two replicas, a PodDisruptionBudget that keeps at least one alive, and anti-affinity to spread the replicas across nodes.

```yaml
replicaCount: 2  # HA mode
pdb:
  minAvailable: 1
affinity:
  podAntiAffinity:
    preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 100
        podAffinityTerm:
          topologyKey: kubernetes.io/hostname
```
Failover timeline (steady state)
Maximum failover window on crash: 5 seconds. Maximum gap on clean shutdown: 0.
What's next: contributing back
Two of these fixes are generic enough to live in the libraries themselves:
- `ResilientPullBroker` — belongs in `taskiq-nats`. Any production deployment using `PullBasedJetStreamBroker` with NATS replication or node restarts will hit this bug.
- The nats.py TCP buffer race — the one-line fix belongs in `nats.py` itself. Planning to file the issue with a reproducible test case.
The plan is to validate rolling upgrade stability over a few weeks before opening the PRs.
Lessons
1. "Appears healthy" is not "is healthy." The silent stale consumer was the most dangerous bug. Metrics and health probes all passed. The only symptom was a growing queue depth and missing uptime results.
2. Library internals matter at the edges. Both the nats.py TCP buffer race and the TTL limit marker behavior required reading the source. Neither was documented.
3. Graceful shutdown is a system property, not just a code property. LeaderElection.shutdown() was correct from day one. It was useless because the signal that triggers it (SIGTERM) was never wired up.
4. At-least-once is the right trade-off for monitoring. Choosing "post-dispatch slot markers" over "pre-dispatch slot markers" was a deliberate call: rare duplicates over rare gaps.
5. NATS KV can replace a lot of infrastructure. Leader election, distributed locks, slot markers, schedule storage, response caching — all in one NATS KV bucket. No Zookeeper, no etcd.
Glossary
NATS JetStream — The persistence layer of NATS: durable messages, consumers, streams, and a built-in KV store.
Pull consumer — A JetStream consumer where the client explicitly requests batches of messages. taskiq-nats uses pull for backpressure control.
CAS (compare-and-swap) — An atomic operation: "write this value only if the current revision matches what I expect."
TTL (Time To Live) — A per-key expiry. After the TTL elapses, NATS automatically invalidates the key and notifies watchers.
Limit marker — The notification NATS delivers to watchers when a key expires by TTL. It carries a revision number but no KV-Operation header — distinct from an explicit DELETE or PURGE.
Leader election — The process by which one instance among several claims the coordinator role. Here implemented as an atomic kv.create() with TTL heartbeat renewal.
Slot marker — A short-lived KV key written after each task dispatch, encoding the (task, time window) pair. Used to prevent re-dispatch after a failover.
SIGTERM — The Unix signal Kubernetes sends to a pod before terminating it. Distinct from SIGKILL, which is immediate and unblockable.
