diff --git a/aai_cli/agent_cascade/_runtime.py b/aai_cli/agent_cascade/_runtime.py index 2947d0c5..5c23ac0f 100644 --- a/aai_cli/agent_cascade/_runtime.py +++ b/aai_cli/agent_cascade/_runtime.py @@ -17,6 +17,7 @@ import concurrent.futures.thread as cf_thread import contextlib import threading +import time from abc import abstractmethod from collections.abc import Callable from dataclasses import dataclass @@ -120,3 +121,27 @@ def spawn_thread(target: Callable[[], None]) -> Worker: thread = threading.Thread(target=target, daemon=True) # pragma: no mutate thread.start() return thread + + +def final_tail(buffer: str, held: list[str], *, used_tool: bool) -> str: + """End-of-stream remainder to flush: joined post-tool narration, else the live pre-tool buffer.""" + return "".join(held) if used_tool else buffer + + +def approval_deadline(pause: brain.ApprovalPause) -> float | None: + """The reply deadline across a write-approval pause: ``None`` (clock suspended) while the user + is deciding on a gated write — a slow y/n keypress must not trip the reply timeout — and a fresh + finite deadline once answered.""" + return None if pause.active else time.monotonic() + REPLY_TIMEOUT_SECONDS + + +def is_final_turn(event: object, *, format_turns: bool) -> bool: + """True for an end-of-turn that's the cue to generate a reply. + + With formatting on, wait for the *formatted* turn (better text for the LLM); with it off the + server never sets ``turn_is_formatted``, so a bare end-of-turn is the cue — otherwise + ``--no-format-turns`` would make the agent never reply. 
+ """ + if not bool(getattr(event, "end_of_turn", False)): + return False + return bool(getattr(event, "turn_is_formatted", False)) or not format_turns diff --git a/aai_cli/agent_cascade/engine.py b/aai_cli/agent_cascade/engine.py index 2bae77af..a048d062 100644 --- a/aai_cli/agent_cascade/engine.py +++ b/aai_cli/agent_cascade/engine.py @@ -39,12 +39,21 @@ from aai_cli.agent_cascade._runtime import ( Worker as _Worker, ) +from aai_cli.agent_cascade._runtime import ( + approval_deadline as _approval_deadline, +) from aai_cli.agent_cascade._runtime import ( detach_executor_threads_since as _detach_executor_threads_since, ) from aai_cli.agent_cascade._runtime import ( executor_threads as _executor_threads, ) +from aai_cli.agent_cascade._runtime import ( + final_tail as _final_tail, +) +from aai_cli.agent_cascade._runtime import ( + is_final_turn as _is_final_turn, +) from aai_cli.agent_cascade._runtime import ( new_history as _new_history, ) @@ -100,10 +109,12 @@ class CascadeSession: init=False, # pragma: no mutate ) # Rotates the per-tool spoken fillers across turns (fillers[_filler_index % len]) so the same - # tool doesn't repeat one phrase. The rotation test pins the exact phrase sequence, so a shifted - # default or mutated increment is caught; the field's `init=` is equivalent (never constructed - # positionally), like the sibling fields, hence the pragma. + # tool doesn't repeat one phrase. The rotation test pins the exact phrase sequence; the field's + # `init=` is equivalent (never constructed positionally), like the siblings, hence the pragma. _filler_index: int = field(default=0, init=False) # pragma: no mutate + # reply_started fired this turn (separate from _speaking) so an empty reply still brackets it. + # Reset per turn in _generate_reply (the killable line); init= is equivalent, hence the pragma. 
+ _reply_started: bool = field(default=False, init=False) # pragma: no mutate def greet(self) -> None: """Speak the opening greeting (if any) and seed it into the history so the @@ -149,18 +160,16 @@ def _silence(self, *, audible_only: bool) -> bool: """Cancel an in-flight reply — signal the worker and flush queued audio — and report whether anything was cancelled. - The audible cases are always cancelled: the greeting (enqueued with no worker), a reply - in its speak-and-enqueue phase (``_speaking``), and the *tail* of a reply whose worker - has finished enqueuing but whose audio is still draining (``pending() > 0``). - - ``audible_only`` decides whether the *thinking* phase counts too. A spoken barge-in - passes ``False`` to cancel even a reply still being generated — the user has moved on, - so it must not speak once it lands. A UI interrupt passes ``True`` to leave thinking - alone: there's no audio to cut and the blocking graph call can't observe the stop flag, - so cancelling would be a no-op — and crucially, returning False there lets the TUI's - Ctrl-C fall through to *quit* rather than be swallowed (you could otherwise never - Ctrl-C while the agent thinks). Setting the stop flag is harmless when nothing runs (the - next ``_start_reply`` clears it). + The audible cases are always cancelled: the greeting (enqueued with no worker), a reply in + its speak-and-enqueue phase (``_speaking``), and the *tail* of a reply whose worker finished + enqueuing but whose audio is still draining (``pending() > 0``). + + ``audible_only`` decides whether the *thinking* phase counts too. A spoken barge-in passes + ``False`` to cancel even a reply still being generated. A UI interrupt passes ``True`` to + leave thinking alone (no audio to cut, the blocking graph call can't see the stop flag) — + and returning False there lets the TUI's Ctrl-C fall through to *quit* rather than be + swallowed. 
Setting the stop flag is harmless when nothing runs (next ``_start_reply`` clears + it). """ in_flight = self._speaking.is_set() or self.player.pending() > 0 if not audible_only: @@ -179,13 +188,12 @@ def _barge_in(self) -> None: def interrupt_reply(self) -> bool: """Silence a *speaking* reply without waiting for it; True if one was audible. - The UI-thread-safe counterpart to a spoken barge-in: the live TUI's Escape/Ctrl-C - calls this to silence the agent mid-reply (or mid-greeting) without the user having to - talk over it. Flushing the queued audio stops speech at once; a reply worker then sees - the stop flag, unwinds on its own, and emits ``reply_done`` so the front-end returns to - listening (the STT loop keeps running, so the next spoken turn is handled normally). - It deliberately does *not* join the worker — a join from the UI thread would deadlock - against the worker's own ``call_from_thread`` render hops. + The UI-thread-safe counterpart to a spoken barge-in: the live TUI's Escape/Ctrl-C calls + this to silence the agent mid-reply (or mid-greeting) without the user talking over it. + Flushing the queued audio stops speech at once; a reply worker then sees the stop flag, + unwinds, and emits ``reply_done`` so the front-end returns to listening (the STT loop keeps + running). It deliberately does *not* join the worker — a join from the UI thread would + deadlock against the worker's own ``call_from_thread`` render hops. It reports False (and does nothing) while the reply is merely *thinking*, so the TUI's Ctrl-C falls through to quit instead of being swallowed by a no-op interrupt. 
@@ -206,6 +214,7 @@ def _start_reply(self) -> None: def _generate_reply(self) -> None: """Stream the LLM reply, speak each clause as it lands, and record what was spoken (so a barge-in still leaves the history alternating).""" + self._reply_started = False messages: list[ChatCompletionMessageParam] = [ {"role": "system", "content": self.config.system_prompt}, *self.history, @@ -219,16 +228,26 @@ def produce() -> None: producer = threading.Thread(target=produce, daemon=True) # pragma: no mutate producer.start() spoken: list[str] = [] - tail = self._consume(events, before, spoken) - # On a clean finish ``tail`` is the unspoken remainder to flush as one last clause; on - # any cut (barge-in, TTS/leg failure, timeout) it is None and nothing more is spoken. - if tail is not None and tail.strip(): - self._speak([tail.strip()], spoken) - # Always record what was spoken — even after a mid-turn leg failure — so the history - # stays alternating and the next turn has the partial answer as context. - self._record_spoken(spoken) - self._speaking.clear() - self.renderer.reply_done(interrupted=self._stop.is_set()) + try: + tail = self._consume(events, before, spoken) + # On a clean finish ``tail`` is the unspoken remainder to flush as one last clause; on a + # cut (barge-in, TTS/leg failure, timeout) it is None and nothing more is spoken. + if tail is not None and tail.strip(): + self._speak([tail.strip()], spoken) + finally: + # Always finalize the turn: clearing the gate keeps a failure between an ApprovalPause's + # active/inactive events from wedging every later turn (and history stays alternating). 
+ self._set_awaiting_approval(active=False) + self._record_spoken(spoken) + self._speaking.clear() + self._emit_reply_started() # bracket reply_done even for an empty (silent) reply + self.renderer.reply_done(interrupted=self._stop.is_set()) + + def _emit_reply_started(self) -> None: + """Fire ``reply_started`` once per turn (idempotent), so every ``reply_done`` is bracketed.""" + if not self._reply_started: + self._reply_started = True + self.renderer.reply_started() def _set_awaiting_approval(self, *, active: bool) -> None: """Arm/disarm the voice-approval gate: while armed, ``on_turn`` routes the next final @@ -246,6 +265,7 @@ def _consume( failure, or a leg failure/timeout — which also surfaces the error).""" deadline: float | None = time.monotonic() + _REPLY_TIMEOUT_SECONDS buffer = "" + held: list[str] = [] # post-tool narration, joined once at end (O(n) vs per-delta concat) spoke_filler = False # only the FIRST tool call of a turn says a spoken filler used_tool = False # once a tool ran, hold text unspoken so only the final answer is read while True: @@ -257,7 +277,7 @@ def _consume( self._surface_error(item.error, started=self._speaking.is_set()) return None if isinstance(item, _Done): - return buffer + return _final_tail(buffer, held, used_tool=used_tool) if isinstance(item, brain.ApprovalPause): deadline = _approval_deadline(item) self._set_awaiting_approval(active=item.active) @@ -267,30 +287,33 @@ def _consume( return None spoke_filler = True used_tool = True - buffer = "" # drop any unspoken preamble — the answer comes after the tool + buffer = "" # drop unspoken preamble + inter-tool narration; the answer follows + held.clear() continue if self._stop.is_set(): return None - # item is a streamed SpeechDelta (every other case returned or continued above). - tail = self._speak_delta(item, buffer, spoken, used_tool=used_tool) + # item is a streamed SpeechDelta (every other case returned/continued above). 
+ tail = self._speak_delta(item, buffer, held, spoken, used_tool=used_tool) if tail is None: return None buffer = tail def _speak_delta( - self, item: brain.SpeechDelta, buffer: str, spoken: list[str], *, used_tool: bool + self, + item: brain.SpeechDelta, + buffer: str, + held: list[str], + spoken: list[str], + *, + used_tool: bool, ) -> str | None: - """Fold one streamed delta into the running buffer and speak any completed clauses. - - Before any tool call, clauses stream out as they land (low-latency speech). *After* a tool - call (``used_tool``) the deep agent tends to narrate verbose planning between tool calls; - that text is held in the buffer unspoken and discarded at the next tool call, so only the - final answer — whatever remains buffered when the stream finishes — is ever read aloud. - - Marks the reply as speaking on the first spoken delta (so a UI interrupt can cut it). - Returns the new buffer, or ``None`` if a TTS failure cut the turn (the caller aborts).""" + """Fold one streamed delta into the reply, speaking any completed clauses. Pre-tool, clauses + stream out as they land and the buffer is returned. *After* a tool call (``used_tool``) the + narration is appended to ``held`` unspoken (cleared per tool call, joined at end of stream + — only the final answer is read). Returns the new buffer, or ``None`` on a TTS failure.""" if used_tool: - return buffer + item.text + held.append(item.text) + return buffer self._mark_speaking() buffer += item.text chunks, buffer = pop_clauses(buffer, min_chars=_MIN_CLAUSE_CHARS) @@ -312,7 +335,7 @@ def _mark_speaking(self) -> None: filler. 
Sets ``_speaking`` (so a UI interrupt can cut it) and fires ``reply_started`` once.""" if not self._speaking.is_set(): self._speaking.set() - self.renderer.reply_started() + self._emit_reply_started() def _speak_filler(self, fillers: tuple[str, ...]) -> bool: """Say a short spoken filler ("Let me check") for the first tool call of a turn, so a @@ -408,7 +431,7 @@ def _surface_error(self, exc: CLIError, *, started: bool) -> None: spoken text already explains the turn). The caller still finalizes the turn.""" self._record_error(exc) if not started: - self.renderer.reply_started() + self._emit_reply_started() self.renderer.agent_transcript(f"(error: {exc.message})", interrupted=False) def _record_error(self, exc: CLIError) -> None: @@ -424,25 +447,6 @@ def shutdown(self) -> None: self._join_reply() -def _approval_deadline(pause: brain.ApprovalPause) -> float | None: - """The reply deadline across a write-approval pause: ``None`` (clock suspended) while the - user is deciding on a gated write — a slow y/n keypress must not trip the reply timeout — and - a fresh finite deadline once answered.""" - return None if pause.active else time.monotonic() + _REPLY_TIMEOUT_SECONDS - - -def _is_final_turn(event: object, *, format_turns: bool) -> bool: - """True for an end-of-turn that's the cue to generate a reply. - - With formatting on, wait for the *formatted* turn (better text for the LLM); - with it off the server never sets ``turn_is_formatted``, so a bare end-of-turn - is the cue — otherwise ``--no-format-turns`` would make the agent never reply. 
- """ - if not bool(getattr(event, "end_of_turn", False)): - return False - return bool(getattr(event, "turn_is_formatted", False)) or not format_turns - - def run_cascade( *, renderer: Renderer, diff --git a/aai_cli/agent_cascade/risk.py b/aai_cli/agent_cascade/risk.py index 84edf0ec..e89d2d05 100644 --- a/aai_cli/agent_cascade/risk.py +++ b/aai_cli/agent_cascade/risk.py @@ -2,8 +2,9 @@ The approval modal already shows *what* a tool will do; for the genuinely dangerous calls it also shows *why to look twice* — a one-line warning, the way deepagents-code badges suspicious -shell commands and URLs. Purely advisory (the real SSRF guard lives in ``fetch_tool``); this -only nudges the human reviewing a manual approval. Pure functions so they unit-test cleanly. +shell commands and URLs. Advisory: it nudges the human reviewing a manual approval. The real +SSRF guard is :func:`url_is_internal`, which ``webpage_tool`` consults to refuse an internal +fetch outright. Pure functions so they unit-test cleanly. """ from __future__ import annotations @@ -11,9 +12,10 @@ import re from collections.abc import Mapping -# The fetch tool's name, inlined here — its defining module lived in the removed -# `assembly code` agent. Risk scoring is purely advisory. -FETCH_TOOL_NAME = "fetch_url" +# The live agent's read-a-URL tool name (``webpage_tool.READ_URL_TOOL_NAME``), inlined to avoid a +# circular import (``webpage_tool`` consults this module for the SSRF check). Risk scoring is +# advisory; the enforced SSRF refusal lives in :func:`url_is_internal`. +URL_TOOL_NAME = "read_url" # Shell fragments that can destroy data, escalate privileges, or pipe a remote script straight # into a shell — the classic "are you sure?" cases. Word-ish boundaries avoid matching inside @@ -53,17 +55,26 @@ def _url_warning(url: str) -> str | None: return None +def url_is_internal(url: str) -> bool: + """True when ``url`` is SSRF-relevant — a local/internal address or a ``file://`` target. 
+ + The live ``read_url`` tool refuses these outright (the enforced network-fetch guard, since an + agent-chosen URL can be steered to cloud metadata / internal services by web content it read). + """ + return _url_warning(url) is not None + + def risk_warning(name: str, args: Mapping[str, object]) -> str | None: """A one-line caution for a risky tool call, or ``None`` when nothing stands out. - Flags destructive/privileged shell commands (``execute``) and fetches aimed at local or + Flags destructive/privileged shell commands (``execute``) and URL reads aimed at local or ``file://`` targets; everything else returns ``None``. """ if name == "execute": command = args.get("command") if isinstance(command, str): return _shell_warning(command) - elif name == FETCH_TOOL_NAME: + elif name == URL_TOOL_NAME: url = args.get("url") if isinstance(url, str): return _url_warning(url) diff --git a/aai_cli/agent_cascade/sandbox.py b/aai_cli/agent_cascade/sandbox.py index 6cf132df..3c232d54 100644 --- a/aai_cli/agent_cascade/sandbox.py +++ b/aai_cli/agent_cascade/sandbox.py @@ -13,6 +13,7 @@ from __future__ import annotations import platform +import re import shutil import subprocess import tempfile @@ -33,6 +34,37 @@ CWD_WRITE_DENY: tuple[str, ...] = (".git/hooks",) # Shell rc files denied for writes (only inside the write region when cwd == $HOME). SHELL_RC: tuple[str, ...] = (".bashrc", ".zshrc", ".profile", ".bash_profile") +# Which read-denied secret names are directories (the rest are plain files). bwrap masks a +# directory with an empty tmpfs but a file with a /dev/null bind — a tmpfs mountpoint must be a +# directory and a /dev/null bind must be a file — so the two kinds need different masks. +_SECRET_DIRS: frozenset[str] = frozenset({".ssh", ".aws", ".gnupg", ".claude"}) + + +def _sbpl_str(path: str) -> str: + """Escape a path for embedding in an SBPL string literal (``"…"``): backslash and quote. 
+ + A launch directory can contain either, and an unescaped ``"`` would terminate the literal + early, producing a profile ``sandbox-exec`` rejects (so every ``execute`` would then fail). + """ + return path.replace("\\", "\\\\").replace('"', '\\"') + + +def _sbpl_regex(path: str) -> str: + """Escape a path for use as a literal inside an SBPL ``#"…"`` regex literal. + + ``re.escape`` neutralizes regex metacharacters (``(``/``)``/``+``/``[``…) a path may contain — + without it ``/Users/me/My (Project)`` emits an invalid regex that breaks the whole profile — + and the lone ``"`` escape keeps the surrounding string literal intact. + """ + return re.escape(path).replace('"', '\\"') + + +def _mask_secret(target: str, name: str) -> list[str]: + """bwrap args to hide a secret ``target``: an empty tmpfs over a directory secret, a + ``/dev/null`` bind over a file secret (the two kinds need different bwrap directives).""" + if name in _SECRET_DIRS: + return ["--tmpfs", target] + return ["--ro-bind", "/dev/null", target] def render_seatbelt_profile( @@ -47,6 +79,9 @@ def render_seatbelt_profile( ) -> str: """Render an Apple Seatbelt (SBPL) profile: default-allow reads, deny secrets, writes only in cwd + tmp, no network. Last-match-wins, so the denies override the broad allows.""" + # Escape the interpolated paths once: a launch dir / tmp / home with regex or quote + # metacharacters would otherwise emit a profile sandbox-exec can't parse. + c, t, h = _sbpl_str(cwd), _sbpl_str(tmp), _sbpl_str(home) lines = [ "(version 1)", "(deny default)", @@ -54,15 +89,15 @@ def render_seatbelt_profile( "(allow process-fork)", "(allow file-read*)", ] - lines.extend(f'(deny file-read* (subpath "{home}/{name}"))' for name in home_secrets) + lines.extend(f'(deny file-read* (subpath "{h}/{name}"))' for name in home_secrets) # .env and .env.* under cwd, denied via regex; .claude/ via subpath. 
- lines.append(f'(deny file-read* (regex #"^{cwd}/\\.env($|\\.)"))') + lines.append(f'(deny file-read* (regex #"^{_sbpl_regex(cwd)}/\\.env($|\\.)"))') lines.extend( - f'(deny file-read* (subpath "{cwd}/{name}"))' for name in cwd_read_deny if name != ".env" + f'(deny file-read* (subpath "{c}/{name}"))' for name in cwd_read_deny if name != ".env" ) - lines.append(f'(allow file-write* (subpath "{cwd}") (subpath "{tmp}"))') - lines.extend(f'(deny file-write* (subpath "{cwd}/{name}"))' for name in cwd_write_deny) - lines.extend(f'(deny file-write* (subpath "{home}/{name}"))' for name in shell_rc) + lines.append(f'(allow file-write* (subpath "{c}") (subpath "{t}"))') + lines.extend(f'(deny file-write* (subpath "{c}/{name}"))' for name in cwd_write_deny) + lines.extend(f'(deny file-write* (subpath "{h}/{name}"))' for name in shell_rc) return "\n".join(lines) + "\n" @@ -94,12 +129,13 @@ def build_bwrap_argv( tmp, tmp, ] - # Mask credential stores under $HOME (tmpfs hides their contents). + # Mask credential stores under $HOME and the project-local secrets (best-effort; coarser than + # Seatbelt). Each path is masked by kind — an empty tmpfs over a directory, a /dev/null bind + # over a file — since bwrap can't tmpfs a file mountpoint or bind /dev/null onto a directory. for name in home_secrets: - argv += ["--tmpfs", f"{home}/{name}"] - # Project-local secrets: mask each path (best-effort; coarser than Seatbelt). + argv += _mask_secret(f"{home}/{name}", name) for name in cwd_read_deny: - argv += ["--ro-bind", "/dev/null", f"{cwd}/{name}"] + argv += _mask_secret(f"{cwd}/{name}", name) # Block writes to persistence paths inside cwd by re-binding them read-only. for name in cwd_write_deny: argv += ["--ro-bind", f"{cwd}/{name}", f"{cwd}/{name}"] @@ -148,6 +184,31 @@ def __init__(self, output: str, returncode: int | None) -> None: Runner = Callable[[list[str], str, int], CompletedProcessLike] +# The only environment variables the sandboxed command inherits. 
The OS sandbox blocks the +# network and read-denies credential *files*, but secrets such as ASSEMBLYAI_API_KEY ride in the +# *environment* too — an unrestricted env would hand the agent-run command every key to print into +# output the model reads or write into a cwd file. So the child env is a minimal non-secret +# allowlist, never a copy of the parent environment. +_ENV_ALLOWLIST: tuple[str, ...] = ( + "PATH", + "HOME", + "USER", + "SHELL", + "LANG", + "LC_ALL", + "LC_CTYPE", + "TERM", + "TMPDIR", + "TZ", +) + + +def _sandbox_env() -> dict[str, str]: + """A minimal child environment for the sandboxed command: only the non-secret basics in + :data:`_ENV_ALLOWLIST`, so no inherited API key or token leaks to agent-run code.""" + parent = child_env() + return {name: parent[name] for name in _ENV_ALLOWLIST if name in parent} + def default_runner(argv: list[str], cwd: str, timeout: int) -> CompletedProcessLike: """Run ``argv`` with combined output, in ``cwd``, time-bounded, with a minimal child env. @@ -162,7 +223,7 @@ def default_runner(argv: list[str], cwd: str, timeout: int) -> CompletedProcessL stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, - env=child_env(), + env=_sandbox_env(), check=False, ) except subprocess.TimeoutExpired as exc: diff --git a/aai_cli/agent_cascade/spoken_approval.py b/aai_cli/agent_cascade/spoken_approval.py index fb57d908..c8b6f635 100644 --- a/aai_cli/agent_cascade/spoken_approval.py +++ b/aai_cli/agent_cascade/spoken_approval.py @@ -38,6 +38,13 @@ def interpret_spoken_approval(transcript: str) -> bool: return bool(_AFFIRMATIVE.search(text)) +# Running code is never voice-approvable: an STT-misheard affirmative must not be able to run an +# arbitrary command. The advisory ``risk`` regex only catches a handful of obviously-destructive +# shells, so the enforcement boundary keys off the tool *class* — every ``execute`` demands a +# deliberate keypress, regardless of how benign its command text looks. 
+_KEYPRESS_ONLY_TOOLS: tuple[str, ...] = ("execute",) + + def spoken_decision( name: str, args: Mapping[str, object], @@ -48,10 +55,11 @@ def spoken_decision( """How a spoken transcript should resolve an open approval: True approve, False reject, or None *ignore the voice* (the destructive tier — require the keyboard). - Destructive tier (``risk.risk_warning`` fires, e.g. ``rm -rf``/``sudo``) → None, so an STT - mishearing can never green-light it; the keypress is the only channel. Otherwise the grammar + Keypress-only tier → None, so an STT mishearing can never green-light it; the keypress is the + only channel. That tier is any ``execute`` (running code is the highest-stakes mutation) plus + anything the advisory ``risk.risk_warning`` flags (``rm -rf``/``sudo``…). Otherwise the grammar decides: an unambiguous affirmative approves, everything else rejects (fail-safe). """ - if warn(name, args) is not None: + if name in _KEYPRESS_ONLY_TOOLS or warn(name, args) is not None: return None return interpret_spoken_approval(transcript) diff --git a/aai_cli/agent_cascade/text.py b/aai_cli/agent_cascade/text.py index e336fc2a..762008f1 100644 --- a/aai_cli/agent_cascade/text.py +++ b/aai_cli/agent_cascade/text.py @@ -17,20 +17,25 @@ def _is_boundary(text: str, index: int) -> bool: - """True when the char at ``index`` ends a clause: a terminator/separator that is the - last char or is followed by whitespace (so a '.' inside "$3.50" never splits).""" - return index + 1 == len(text) or text[index + 1].isspace() + """True when the char at ``index`` ends a clause: it is *followed by whitespace*. + + End-of-buffer is deliberately NOT a boundary: :func:`pop_clauses` is fed partial streamed + chunks, so a terminator that merely sits at the current end may be mid-token ("$3." before + "50" streams in) — flushing it would split a number. The remainder is re-buffered and the + caller flushes the final tail at end-of-stream, so nothing is lost. 
+ """ + return index + 1 < len(text) and text[index + 1].isspace() def pop_clauses(buffer: str, *, min_chars: int) -> tuple[list[str], str]: """Pull complete speakable clauses off the front of ``buffer`` for incremental TTS. - A hard terminator (``.``/``!``/``?``) followed by whitespace (or end-of-buffer) always - ends a clause; a soft separator (``,``/``;``/``:``) ends one only when the clause built - since the last boundary is at least ``min_chars`` long, so a tiny fragment ("Yes,") - isn't synthesized on its own. Returns the flushed clauses (each stripped, never blank) - and the still-incomplete remainder to keep buffering. The caller flushes the final tail - at end-of-stream. + A hard terminator (``.``/``!``/``?``) followed by whitespace always ends a clause; a soft + separator (``,``/``;``/``:``) ends one only when the clause built since the last boundary is + at least ``min_chars`` long, so a tiny fragment ("Yes,") isn't synthesized on its own. A + terminator at the very end of ``buffer`` is held (it may be mid-token under streaming). + Returns the flushed clauses (each stripped, never blank) and the still-incomplete remainder + to keep buffering. The caller flushes the final tail at end-of-stream. 
""" clauses: list[str] = [] start = 0 diff --git a/aai_cli/agent_cascade/weather_tool.py b/aai_cli/agent_cascade/weather_tool.py index 2d30d99c..dc3335f1 100644 --- a/aai_cli/agent_cascade/weather_tool.py +++ b/aai_cli/agent_cascade/weather_tool.py @@ -85,7 +85,7 @@ def _c_to_f(celsius: float) -> int: def _get_json(url: str) -> object: """GET ``url`` and return its parsed JSON body (the default network seam).""" - import httpx + import httpx2 as httpx response = httpx.get(url, timeout=_TIMEOUT) response.raise_for_status() diff --git a/aai_cli/agent_cascade/webpage_tool.py b/aai_cli/agent_cascade/webpage_tool.py index 3ec54f15..4e76ab9f 100644 --- a/aai_cli/agent_cascade/webpage_tool.py +++ b/aai_cli/agent_cascade/webpage_tool.py @@ -17,6 +17,7 @@ from collections.abc import Callable from typing import TYPE_CHECKING +from aai_cli.agent_cascade.risk import url_is_internal from aai_cli.core.errors import UsageError if TYPE_CHECKING: @@ -62,6 +63,11 @@ def build_read_url_tool(read: Reader = _read) -> BaseTool: def read_url(url: str) -> str: """Read a web page or PDF by URL and return its text. Use to read an article, document, or page you have the URL for (e.g. from a web-search result).""" + if url_is_internal(url): + # SSRF guard: an agent-chosen URL (often surfaced by web content it just read) must + # never reach a local/internal address or a local file, where it could read cloud + # metadata or internal services and speak the contents back. + return "I can't open that address." 
try: return _format(read(url)) except UsageError: diff --git a/tests/test_agent_cascade_engine.py b/tests/test_agent_cascade_engine.py index 2ae6d01a..82c8807c 100644 --- a/tests/test_agent_cascade_engine.py +++ b/tests/test_agent_cascade_engine.py @@ -71,6 +71,26 @@ def test_on_turn_final_renders_and_replies(): assert ("reply_done", False) in renderer.calls +def test_empty_reply_still_brackets_reply_done_with_reply_started(): + # A reply that produces no clause and no tool call must still emit reply_started before + # reply_done, so a stream consumer never sees an unmatched reply_done and the TUI resets. + session, renderer, _player = make_session(stream_reply=lambda m: []) + session._generate_reply() + assert renderer.calls.count(("reply_started",)) == 1 + started = renderer.calls.index(("reply_started",)) + done = renderer.calls.index(("reply_done", False)) + assert started < done + + +def test_reply_started_fires_exactly_once_per_turn(): + # The idempotent _emit_reply_started must not double-fire: a multi-clause spoken reply emits + # reply_started once, and the per-turn reset means a second turn emits it again (once). + session, renderer, _player = make_session(stream_reply=_deltas("One. ", "Two. ")) + session._generate_reply() + session._generate_reply() + assert renderer.calls.count(("reply_started",)) == 2 + + def test_reply_forwards_tool_calls_to_the_renderer(): def stream(messages): yield ToolNotice("Searching the web", ("Searching now.",)) diff --git a/tests/test_agent_cascade_files.py b/tests/test_agent_cascade_files.py index c4c11ab4..e80b2a8d 100644 --- a/tests/test_agent_cascade_files.py +++ b/tests/test_agent_cascade_files.py @@ -113,6 +113,24 @@ def spy(evts, deadline, before): assert seen[2] is not None # restored after ApprovalPause(active=False) +def test_turn_aborting_mid_approval_clears_the_awaiting_gate(): + # If the reply leg fails after ApprovalPause(active=True) but before the matching active=False + # (e.g. 
the approver/graph raises), the turn must still disarm the voice-approval gate — else + # on_turn would route every later transcript to the dead approval and wedge the session. + from aai_cli.core.errors import CLIError + + def stream(messages): + yield ApprovalPause(active=True) + raise CLIError("graph blew up during approval", error_type="agent_brain_error") + + session, renderer, _player = make_session(stream_reply=stream) + session._generate_reply() + + assert not session._awaiting_approval.is_set() # gate disarmed despite the mid-approval failure + assert ("reply_done", True) in renderer.calls or ("reply_done", False) in renderer.calls + assert session.error is not None # the failure was still surfaced + + def test_approval_deadline_suspends_then_restores_into_the_future(): # active=True suspends the clock (None); active=False restores a deadline in the FUTURE — # asserting it's ahead of now (not merely non-None) pins the + so the timeout actually fires. diff --git a/tests/test_agent_cascade_sandbox.py b/tests/test_agent_cascade_sandbox.py index 300013b3..15e1cc6a 100644 --- a/tests/test_agent_cascade_sandbox.py +++ b/tests/test_agent_cascade_sandbox.py @@ -65,6 +65,21 @@ def test_bwrap_argv_masks_home_secrets_and_git_hooks(): assert "/work/proj/.git/hooks" in joined # write blocked via ro-bind +def test_bwrap_masks_directories_with_tmpfs_and_files_with_dev_null(): + # bwrap can't tmpfs a file mountpoint nor bind /dev/null onto a directory, so each secret is + # masked by kind: directory secrets get --tmpfs, file secrets get --ro-bind /dev/null. + argv = sandbox.build_bwrap_argv("/work/proj", "/tmp", "echo hi", "/home/u") + # .claude and .ssh are directories -> tmpfs (the old code wrongly bound /dev/null over .claude). + assert _adjacent(argv, "--tmpfs", "/work/proj/.claude") + assert _adjacent(argv, "--tmpfs", "/home/u/.ssh") + # .env and .netrc/.npmrc are files -> /dev/null bind (the old code wrongly tmpfs'd the files). 
+ assert _has_pair(argv, "--ro-bind", "/dev/null", "/work/proj/.env") + assert _has_pair(argv, "--ro-bind", "/dev/null", "/home/u/.netrc") + # And never the wrong directive for either kind. + assert not _has_pair(argv, "--ro-bind", "/dev/null", "/work/proj/.claude") + assert not _adjacent(argv, "--tmpfs", "/home/u/.netrc") + + def _has_pair(argv, flag, a, b): return any( argv[i] == flag and argv[i + 1] == a and argv[i + 2] == b for i in range(len(argv) - 2) @@ -87,6 +102,20 @@ def test_renderers_cover_the_same_denylists(): assert "/work/proj/.git/hooks" in bwrap +def test_seatbelt_profile_escapes_regex_metacharacters_in_cwd(): + # A launch dir with regex metacharacters must not corrupt the .env-deny regex literal: + # parens are escaped so sandbox-exec gets a valid profile (else every execute would fail). + profile = sandbox.render_seatbelt_profile("/work/My (Proj)", "/tmp", "/home/u") + assert r"\(Proj\)/\.env" in profile # parens neutralized inside the regex + assert "My (Proj)/\\.env" not in profile # never the raw, unescaped paren form + + +def test_seatbelt_profile_escapes_quotes_in_subpath(): + # A double-quote in a path would terminate the SBPL string literal early; it must be escaped. + profile = sandbox.render_seatbelt_profile('/work/a"b', "/tmp", "/home/u") + assert '/work/a\\"b' in profile + + def test_detect_capability_seatbelt_on_macos_with_binary(): cap = sandbox.detect_capability( system=lambda: "Darwin", which=lambda _n: "/usr/bin/sandbox-exec" @@ -141,6 +170,43 @@ def fake_run(argv: list[str], **kwargs: object) -> _Proc: assert captured["stderr"] == subprocess.STDOUT +def test_sandbox_env_allowlists_basics_and_drops_secrets(monkeypatch): + # The sandboxed command must not inherit secrets via the environment, even though the OS + # sandbox blocks the network and credential files. 
+ monkeypatch.setenv("PATH", "/usr/bin") + monkeypatch.setenv("ASSEMBLYAI_API_KEY", "sk-secret") + monkeypatch.setenv("FIRECRAWL_API_KEY", "fc-secret") + monkeypatch.setenv("SOME_TOKEN", "tok") + env = sandbox._sandbox_env() + assert env["PATH"] == "/usr/bin" # a non-secret basic is kept + assert "ASSEMBLYAI_API_KEY" not in env + assert "FIRECRAWL_API_KEY" not in env + assert "SOME_TOKEN" not in env + + +def test_default_runner_passes_scrubbed_env(monkeypatch): + import subprocess + + monkeypatch.setenv("PATH", "/usr/bin") + monkeypatch.setenv("ASSEMBLYAI_API_KEY", "sk-secret") + captured: dict[str, object] = {} + + class _Proc: + stdout = "out" + returncode = 0 + + def fake_run(argv: list[str], **kwargs: object) -> _Proc: + captured.update(kwargs) + return _Proc() + + monkeypatch.setattr(subprocess, "run", fake_run) + sandbox.default_runner(["echo", "hi"], "/work", 5) + env = captured["env"] + assert isinstance(env, dict) + assert "ASSEMBLYAI_API_KEY" not in env # the secret never reaches the sandboxed command + assert env.get("PATH") == "/usr/bin" + + def test_default_runner_handles_none_stdout(monkeypatch): import subprocess diff --git a/tests/test_agent_cascade_spoken_approval.py b/tests/test_agent_cascade_spoken_approval.py index d7a57d52..336d5a2c 100644 --- a/tests/test_agent_cascade_spoken_approval.py +++ b/tests/test_agent_cascade_spoken_approval.py @@ -64,7 +64,14 @@ def test_spoken_decision_destructive_ignores_voice(): assert spoken_decision("execute", {"command": "sudo make install"}, "yes, run it") is None -def test_spoken_decision_benign_execute_honors_voice(): - # A benign command (no risk warning) does take the spoken decision. 
- assert spoken_decision("execute", {"command": "pytest -q"}, "go ahead") is True - assert spoken_decision("execute", {"command": "pytest -q"}, "no") is False +def test_spoken_decision_execute_is_always_keypress_only(): + # Running code is never voice-approvable, even a benign command with an explicit affirmative: + # a misheard "go ahead" must not run arbitrary code, so execute always returns None. + assert spoken_decision("execute", {"command": "pytest -q"}, "go ahead") is None + assert spoken_decision("execute", {"command": "ls -la"}, "approve") is None + + +def test_spoken_decision_benign_file_write_honors_voice(): + # A non-execute write (sandbox-contained, git-recoverable) still takes the spoken decision. + assert spoken_decision("write_file", {"file_path": "n.txt"}, "go ahead") is True + assert spoken_decision("edit_file", {"file_path": "n.txt"}, "no") is False diff --git a/tests/test_agent_cascade_text.py b/tests/test_agent_cascade_text.py index a7164469..170e87fc 100644 --- a/tests/test_agent_cascade_text.py +++ b/tests/test_agent_cascade_text.py @@ -95,10 +95,24 @@ def test_pop_clauses_returns_nothing_for_an_unterminated_buffer(): def test_pop_clauses_strips_whitespace_from_each_flushed_clause(): - chunks, _remainder = pop_clauses(" Hi there. Next.", min_chars=1) + # Trailing space after the last "." so it's a confirmed boundary (a terminator at end-of-buffer + # is held, not flushed — see test_pop_clauses_holds_terminator_at_end_of_buffer). + chunks, _remainder = pop_clauses(" Hi there. Next. ", min_chars=1) assert chunks == ["Hi there.", "Next."] +def test_pop_clauses_holds_terminator_at_end_of_buffer(): + # A terminator sitting at the current end of a streamed chunk may be mid-token ("$3." before + # "50" arrives), so it is held in the remainder rather than split into its own clause. + chunks, remainder = pop_clauses("It costs $3.", min_chars=1) + assert chunks == [] + assert remainder == "It costs $3." 
+ # Once the rest streams in and a real boundary follows, the number is spoken whole. + chunks, remainder = pop_clauses("It costs $3.50 today. ", min_chars=1) + assert chunks == ["It costs $3.50 today."] + assert remainder == " " + + @pytest.mark.parametrize("min_chars", [1, 25]) def test_pop_clauses_flushes_hard_terminator_regardless_of_min_chars(min_chars): # min_chars only gates SOFT separators; a sentence terminator always flushes. diff --git a/tests/test_agent_cascade_weather.py b/tests/test_agent_cascade_weather.py index 796e4cb9..4ff88156 100644 --- a/tests/test_agent_cascade_weather.py +++ b/tests/test_agent_cascade_weather.py @@ -187,9 +187,9 @@ def test_wmo_descriptions_table_is_exact(): def test_get_json_fetches_and_parses_via_httpx(monkeypatch): # Exercises the default network seam (httpx GET -> raise_for_status -> json), mocking - # httpx so no socket opens. Asserts the URL/timeout passthrough and that the response is + # httpx2 so no socket opens. Asserts the URL/timeout passthrough and that the response is # status-checked, so the mutation gate can't drop any of those lines silently. - import httpx + import httpx2 as httpx calls: dict[str, object] = {} diff --git a/tests/test_agent_cascade_webpage.py b/tests/test_agent_cascade_webpage.py index 92c014fa..059fc46a 100644 --- a/tests/test_agent_cascade_webpage.py +++ b/tests/test_agent_cascade_webpage.py @@ -85,3 +85,18 @@ def read(url: str) -> Article: tool = webpage_tool.build_read_url_tool(read=read) assert tool.invoke({"url": "https://example.com"}) == ("I couldn't read that page right now.") + + +def test_read_url_refuses_internal_addresses_without_fetching(): + # SSRF guard: an agent-chosen internal/file URL is refused outright and the fetch seam is + # never reached (so it can't read cloud metadata or local files and speak them back). 
+ fetched: list[str] = [] + + def read(url: str) -> Article: + fetched.append(url) + return _article(text="secret") + + tool = webpage_tool.build_read_url_tool(read=read) + for url in ("http://169.254.169.254/latest/meta-data/", "file:///etc/passwd"): + assert tool.invoke({"url": url}) == "I can't open that address." + assert fetched == [] # the killer assertion: the fetch never ran diff --git a/tests/test_live_risk.py b/tests/test_live_risk.py index 4b4d348b..ea107b73 100644 --- a/tests/test_live_risk.py +++ b/tests/test_live_risk.py @@ -4,7 +4,7 @@ import pytest -from aai_cli.agent_cascade.risk import risk_warning +from aai_cli.agent_cascade.risk import risk_warning, url_is_internal @pytest.mark.parametrize( @@ -30,17 +30,25 @@ def test_risk_warning_passes_benign_shell() -> None: def test_risk_warning_flags_local_and_file_urls() -> None: - assert "local file" in (risk_warning("fetch_url", {"url": "file:///etc/passwd"}) or "") - assert "local/internal" in (risk_warning("fetch_url", {"url": "http://localhost:8080/x"}) or "") - assert "local/internal" in (risk_warning("fetch_url", {"url": "http://169.254.169.254/"}) or "") - assert "local/internal" in (risk_warning("fetch_url", {"url": "http://192.168.1.1/"}) or "") + assert "local file" in (risk_warning("read_url", {"url": "file:///etc/passwd"}) or "") + assert "local/internal" in (risk_warning("read_url", {"url": "http://localhost:8080/x"}) or "") + assert "local/internal" in (risk_warning("read_url", {"url": "http://169.254.169.254/"}) or "") + assert "local/internal" in (risk_warning("read_url", {"url": "http://192.168.1.1/"}) or "") def test_risk_warning_passes_public_url() -> None: - assert risk_warning("fetch_url", {"url": "https://example.com/docs"}) is None + assert risk_warning("read_url", {"url": "https://example.com/docs"}) is None def test_risk_warning_none_for_other_tools_and_non_string_args() -> None: assert risk_warning("write_file", {"file_path": "rm -rf /"}) is None # path, not a command assert 
risk_warning("execute", {"command": ["rm", "-rf"]}) is None # non-string is ignored - assert risk_warning("fetch_url", {"url": 123}) is None + assert risk_warning("read_url", {"url": 123}) is None + + +def test_url_is_internal_matches_the_warning_tier() -> None: + # The enforced SSRF predicate is True exactly for the local/file URLs the warning flags. + assert url_is_internal("file:///etc/passwd") is True + assert url_is_internal("http://169.254.169.254/latest/meta-data/") is True + assert url_is_internal("http://localhost:8080/x") is True + assert url_is_internal("https://example.com/docs") is False