Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions aai_cli/agent_cascade/_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import concurrent.futures.thread as cf_thread
import contextlib
import threading
import time
from abc import abstractmethod
from collections.abc import Callable
from dataclasses import dataclass
Expand Down Expand Up @@ -120,3 +121,27 @@ def spawn_thread(target: Callable[[], None]) -> Worker:
thread = threading.Thread(target=target, daemon=True) # pragma: no mutate
thread.start()
return thread


def final_tail(buffer: str, held: list[str], *, used_tool: bool) -> str:
"""End-of-stream remainder to flush: joined post-tool narration, else the live pre-tool buffer."""
return "".join(held) if used_tool else buffer


def approval_deadline(pause: brain.ApprovalPause) -> float | None:
"""The reply deadline across a write-approval pause: ``None`` (clock suspended) while the user
is deciding on a gated write — a slow y/n keypress must not trip the reply timeout — and a fresh
finite deadline once answered."""
return None if pause.active else time.monotonic() + REPLY_TIMEOUT_SECONDS


def is_final_turn(event: object, *, format_turns: bool) -> bool:
"""True for an end-of-turn that's the cue to generate a reply.

With formatting on, wait for the *formatted* turn (better text for the LLM); with it off the
server never sets ``turn_is_formatted``, so a bare end-of-turn is the cue — otherwise
``--no-format-turns`` would make the agent never reply.
"""
if not bool(getattr(event, "end_of_turn", False)):
return False
return bool(getattr(event, "turn_is_formatted", False)) or not format_turns
140 changes: 72 additions & 68 deletions aai_cli/agent_cascade/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,21 @@
from aai_cli.agent_cascade._runtime import (
Worker as _Worker,
)
from aai_cli.agent_cascade._runtime import (
approval_deadline as _approval_deadline,
)
from aai_cli.agent_cascade._runtime import (
detach_executor_threads_since as _detach_executor_threads_since,
)
from aai_cli.agent_cascade._runtime import (
executor_threads as _executor_threads,
)
from aai_cli.agent_cascade._runtime import (
final_tail as _final_tail,
)
from aai_cli.agent_cascade._runtime import (
is_final_turn as _is_final_turn,
)
from aai_cli.agent_cascade._runtime import (
new_history as _new_history,
)
Expand Down Expand Up @@ -100,10 +109,12 @@ class CascadeSession:
init=False, # pragma: no mutate
)
# Rotates the per-tool spoken fillers across turns (fillers[_filler_index % len]) so the same
# tool doesn't repeat one phrase. The rotation test pins the exact phrase sequence, so a shifted
# default or mutated increment is caught; the field's `init=` is equivalent (never constructed
# positionally), like the sibling fields, hence the pragma.
# tool doesn't repeat one phrase. The rotation test pins the exact phrase sequence; the field's
# `init=` is equivalent (never constructed positionally), like the siblings, hence the pragma.
_filler_index: int = field(default=0, init=False) # pragma: no mutate
# reply_started fired this turn (separate from _speaking) so an empty reply still brackets it.
# Reset per turn in _generate_reply (the killable line); init= is equivalent, hence the pragma.
_reply_started: bool = field(default=False, init=False) # pragma: no mutate

def greet(self) -> None:
"""Speak the opening greeting (if any) and seed it into the history so the
Expand Down Expand Up @@ -149,18 +160,16 @@ def _silence(self, *, audible_only: bool) -> bool:
"""Cancel an in-flight reply — signal the worker and flush queued audio — and report
whether anything was cancelled.

The audible cases are always cancelled: the greeting (enqueued with no worker), a reply
in its speak-and-enqueue phase (``_speaking``), and the *tail* of a reply whose worker
has finished enqueuing but whose audio is still draining (``pending() > 0``).

``audible_only`` decides whether the *thinking* phase counts too. A spoken barge-in
passes ``False`` to cancel even a reply still being generated — the user has moved on,
so it must not speak once it lands. A UI interrupt passes ``True`` to leave thinking
alone: there's no audio to cut and the blocking graph call can't observe the stop flag,
so cancelling would be a no-op — and crucially, returning False there lets the TUI's
Ctrl-C fall through to *quit* rather than be swallowed (you could otherwise never
Ctrl-C while the agent thinks). Setting the stop flag is harmless when nothing runs (the
next ``_start_reply`` clears it).
The audible cases are always cancelled: the greeting (enqueued with no worker), a reply in
its speak-and-enqueue phase (``_speaking``), and the *tail* of a reply whose worker finished
enqueuing but whose audio is still draining (``pending() > 0``).

``audible_only`` decides whether the *thinking* phase counts too. A spoken barge-in passes
``False`` to cancel even a reply still being generated. A UI interrupt passes ``True`` to
leave thinking alone (no audio to cut, the blocking graph call can't see the stop flag) —
and returning False there lets the TUI's Ctrl-C fall through to *quit* rather than be
swallowed. Setting the stop flag is harmless when nothing runs (next ``_start_reply`` clears
it).
"""
in_flight = self._speaking.is_set() or self.player.pending() > 0
if not audible_only:
Expand All @@ -179,13 +188,12 @@ def _barge_in(self) -> None:
def interrupt_reply(self) -> bool:
"""Silence a *speaking* reply without waiting for it; True if one was audible.

The UI-thread-safe counterpart to a spoken barge-in: the live TUI's Escape/Ctrl-C
calls this to silence the agent mid-reply (or mid-greeting) without the user having to
talk over it. Flushing the queued audio stops speech at once; a reply worker then sees
the stop flag, unwinds on its own, and emits ``reply_done`` so the front-end returns to
listening (the STT loop keeps running, so the next spoken turn is handled normally).
It deliberately does *not* join the worker — a join from the UI thread would deadlock
against the worker's own ``call_from_thread`` render hops.
The UI-thread-safe counterpart to a spoken barge-in: the live TUI's Escape/Ctrl-C calls
this to silence the agent mid-reply (or mid-greeting) without the user talking over it.
Flushing the queued audio stops speech at once; a reply worker then sees the stop flag,
unwinds, and emits ``reply_done`` so the front-end returns to listening (the STT loop keeps
running). It deliberately does *not* join the worker — a join from the UI thread would
deadlock against the worker's own ``call_from_thread`` render hops.

It reports False (and does nothing) while the reply is merely *thinking*, so the TUI's
Ctrl-C falls through to quit instead of being swallowed by a no-op interrupt.
Expand All @@ -206,6 +214,7 @@ def _start_reply(self) -> None:
def _generate_reply(self) -> None:
"""Stream the LLM reply, speak each clause as it lands, and record what was spoken
(so a barge-in still leaves the history alternating)."""
self._reply_started = False
messages: list[ChatCompletionMessageParam] = [
{"role": "system", "content": self.config.system_prompt},
*self.history,
Expand All @@ -219,16 +228,26 @@ def produce() -> None:
producer = threading.Thread(target=produce, daemon=True) # pragma: no mutate
producer.start()
spoken: list[str] = []
tail = self._consume(events, before, spoken)
# On a clean finish ``tail`` is the unspoken remainder to flush as one last clause; on
# any cut (barge-in, TTS/leg failure, timeout) it is None and nothing more is spoken.
if tail is not None and tail.strip():
self._speak([tail.strip()], spoken)
# Always record what was spoken — even after a mid-turn leg failure — so the history
# stays alternating and the next turn has the partial answer as context.
self._record_spoken(spoken)
self._speaking.clear()
self.renderer.reply_done(interrupted=self._stop.is_set())
try:
tail = self._consume(events, before, spoken)
# On a clean finish ``tail`` is the unspoken remainder to flush as one last clause; on a
# cut (barge-in, TTS/leg failure, timeout) it is None and nothing more is spoken.
if tail is not None and tail.strip():
self._speak([tail.strip()], spoken)
finally:
# Always finalize the turn: clearing the gate keeps a failure between an ApprovalPause's
# active/inactive events from wedging every later turn (and history stays alternating).
self._set_awaiting_approval(active=False)
self._record_spoken(spoken)
self._speaking.clear()
self._emit_reply_started() # bracket reply_done even for an empty (silent) reply
self.renderer.reply_done(interrupted=self._stop.is_set())

def _emit_reply_started(self) -> None:
"""Fire ``reply_started`` once per turn (idempotent), so every ``reply_done`` is bracketed."""
if not self._reply_started:
self._reply_started = True
self.renderer.reply_started()

def _set_awaiting_approval(self, *, active: bool) -> None:
"""Arm/disarm the voice-approval gate: while armed, ``on_turn`` routes the next final
Expand All @@ -246,6 +265,7 @@ def _consume(
failure, or a leg failure/timeout — which also surfaces the error)."""
deadline: float | None = time.monotonic() + _REPLY_TIMEOUT_SECONDS
buffer = ""
held: list[str] = [] # post-tool narration, joined once at end (O(n) vs per-delta concat)
spoke_filler = False # only the FIRST tool call of a turn says a spoken filler
used_tool = False # once a tool ran, hold text unspoken so only the final answer is read
while True:
Expand All @@ -257,7 +277,7 @@ def _consume(
self._surface_error(item.error, started=self._speaking.is_set())
return None
if isinstance(item, _Done):
return buffer
return _final_tail(buffer, held, used_tool=used_tool)
if isinstance(item, brain.ApprovalPause):
deadline = _approval_deadline(item)
self._set_awaiting_approval(active=item.active)
Expand All @@ -267,30 +287,33 @@ def _consume(
return None
spoke_filler = True
used_tool = True
buffer = "" # drop any unspoken preamble — the answer comes after the tool
buffer = "" # drop unspoken preamble + inter-tool narration; the answer follows
held.clear()
continue
if self._stop.is_set():
return None
# item is a streamed SpeechDelta (every other case returned or continued above).
tail = self._speak_delta(item, buffer, spoken, used_tool=used_tool)
# item is a streamed SpeechDelta (every other case returned/continued above).
tail = self._speak_delta(item, buffer, held, spoken, used_tool=used_tool)
if tail is None:
return None
buffer = tail

def _speak_delta(
self, item: brain.SpeechDelta, buffer: str, spoken: list[str], *, used_tool: bool
self,
item: brain.SpeechDelta,
buffer: str,
held: list[str],
spoken: list[str],
*,
used_tool: bool,
) -> str | None:
"""Fold one streamed delta into the running buffer and speak any completed clauses.

Before any tool call, clauses stream out as they land (low-latency speech). *After* a tool
call (``used_tool``) the deep agent tends to narrate verbose planning between tool calls;
that text is held in the buffer unspoken and discarded at the next tool call, so only the
final answer — whatever remains buffered when the stream finishes — is ever read aloud.

Marks the reply as speaking on the first spoken delta (so a UI interrupt can cut it).
Returns the new buffer, or ``None`` if a TTS failure cut the turn (the caller aborts)."""
"""Fold one streamed delta into the reply, speaking any completed clauses. Pre-tool, clauses
stream out as they land and the buffer is returned. *After* a tool call (``used_tool``) the
verbose planning is appended to ``held`` unspoken (joined once at end of stream, so only the
final answer is read — O(n)). Returns the new buffer, or ``None`` on a TTS failure."""
if used_tool:
return buffer + item.text
held.append(item.text)
return buffer
self._mark_speaking()
buffer += item.text
chunks, buffer = pop_clauses(buffer, min_chars=_MIN_CLAUSE_CHARS)
Expand All @@ -312,7 +335,7 @@ def _mark_speaking(self) -> None:
filler. Sets ``_speaking`` (so a UI interrupt can cut it) and fires ``reply_started`` once."""
if not self._speaking.is_set():
self._speaking.set()
self.renderer.reply_started()
self._emit_reply_started()

def _speak_filler(self, fillers: tuple[str, ...]) -> bool:
"""Say a short spoken filler ("Let me check") for the first tool call of a turn, so a
Expand Down Expand Up @@ -408,7 +431,7 @@ def _surface_error(self, exc: CLIError, *, started: bool) -> None:
spoken text already explains the turn). The caller still finalizes the turn."""
self._record_error(exc)
if not started:
self.renderer.reply_started()
self._emit_reply_started()
self.renderer.agent_transcript(f"(error: {exc.message})", interrupted=False)

def _record_error(self, exc: CLIError) -> None:
Expand All @@ -424,25 +447,6 @@ def shutdown(self) -> None:
self._join_reply()


def _approval_deadline(pause: brain.ApprovalPause) -> float | None:
"""The reply deadline across a write-approval pause: ``None`` (clock suspended) while the
user is deciding on a gated write — a slow y/n keypress must not trip the reply timeout — and
a fresh finite deadline once answered."""
return None if pause.active else time.monotonic() + _REPLY_TIMEOUT_SECONDS


def _is_final_turn(event: object, *, format_turns: bool) -> bool:
"""True for an end-of-turn that's the cue to generate a reply.

With formatting on, wait for the *formatted* turn (better text for the LLM);
with it off the server never sets ``turn_is_formatted``, so a bare end-of-turn
is the cue — otherwise ``--no-format-turns`` would make the agent never reply.
"""
if not bool(getattr(event, "end_of_turn", False)):
return False
return bool(getattr(event, "turn_is_formatted", False)) or not format_turns


def run_cascade(
*,
renderer: Renderer,
Expand Down
25 changes: 18 additions & 7 deletions aai_cli/agent_cascade/risk.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@

The approval modal already shows *what* a tool will do; for the genuinely dangerous calls it
also shows *why to look twice* — a one-line warning, the way deepagents-code badges suspicious
shell commands and URLs. Purely advisory (the real SSRF guard lives in ``fetch_tool``); this
only nudges the human reviewing a manual approval. Pure functions so they unit-test cleanly.
shell commands and URLs. Advisory: it nudges the human reviewing a manual approval. The real
SSRF guard is :func:`url_is_internal`, which ``webpage_tool`` consults to refuse an internal
fetch outright. Pure functions so they unit-test cleanly.
"""

from __future__ import annotations

import re
from collections.abc import Mapping

# The fetch tool's name, inlined here — its defining module lived in the removed
# `assembly code` agent. Risk scoring is purely advisory.
FETCH_TOOL_NAME = "fetch_url"
# The live agent's read-a-URL tool name (``webpage_tool.READ_URL_TOOL_NAME``), inlined to avoid a
# circular import (``webpage_tool`` consults this module for the SSRF check). Risk scoring is
# advisory; the enforced SSRF refusal lives in :func:`url_is_internal`.
URL_TOOL_NAME = "read_url"

# Shell fragments that can destroy data, escalate privileges, or pipe a remote script straight
# into a shell — the classic "are you sure?" cases. Word-ish boundaries avoid matching inside
Expand Down Expand Up @@ -53,17 +55,26 @@ def _url_warning(url: str) -> str | None:
return None


def url_is_internal(url: str) -> bool:
"""True when ``url`` is SSRF-relevant — a local/internal address or a ``file://`` target.

The live ``read_url`` tool refuses these outright (the enforced network-fetch guard, since an
agent-chosen URL can be steered to cloud metadata / internal services by web content it read).
"""
return _url_warning(url) is not None


def risk_warning(name: str, args: Mapping[str, object]) -> str | None:
"""A one-line caution for a risky tool call, or ``None`` when nothing stands out.

Flags destructive/privileged shell commands (``execute``) and fetches aimed at local or
Flags destructive/privileged shell commands (``execute``) and URL reads aimed at local or
``file://`` targets; everything else returns ``None``.
"""
if name == "execute":
command = args.get("command")
if isinstance(command, str):
return _shell_warning(command)
elif name == FETCH_TOOL_NAME:
elif name == URL_TOOL_NAME:
url = args.get("url")
if isinstance(url, str):
return _url_warning(url)
Expand Down
Loading
Loading