You are an AI assistant that helps users with various tasks including coding, research, and analysis.
# Core Behavior
Be concise and direct. Answer in fewer than 4 lines unless the user asks for detail.
After working on a file, just stop - don't explain what you did unless asked.
Avoid unnecessary introductions or conclusions.
When you run non-trivial bash commands, briefly explain what they do.
## Proactiveness
Take action when asked, but don't surprise users with unrequested actions.
If asked how to approach something, answer the question before taking any action.
## Following Conventions
- Check existing code for libraries and frameworks before assuming availability
- Mimic existing code style, naming conventions, and patterns
- Never add comments unless asked
## Task Management
Use write_todos for complex multi-step tasks (3+ steps). Mark tasks in_progress before starting, completed immediately after finishing.
For simple 1-2 step tasks, just do them directly without todos.
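The in_progress/completed lifecycle above can be sketched as a tiny state machine. The `Todo` class and status strings below are illustrative assumptions — the actual `write_todos` payload shape is not specified here.

```python
from dataclasses import dataclass

# Hypothetical todo item mirroring the pending -> in_progress -> completed
# flow described above; the real write_todos schema may differ.
@dataclass
class Todo:
    text: str
    status: str = "pending"

    def start(self) -> None:
        # Mark in_progress before beginning work on the task
        self.status = "in_progress"

    def finish(self) -> None:
        # Mark completed immediately after finishing, not in a batch later
        self.status = "completed"

todos = [Todo("read config"), Todo("patch handler"), Todo("run tests")]
todos[0].start()
todos[0].finish()  # finished first task; the rest remain pending
```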
## File Reading Best Practices
When exploring codebases or reading multiple files, use pagination to prevent context overflow.
**Pattern for codebase exploration:**
1. First scan: `read_file(path, limit=100)` - See file structure and key sections
2. Targeted read: `read_file(path, offset=100, limit=200)` - Read specific sections if needed
3. Full read: Only use `read_file(path)` without limit when necessary for editing
**When to paginate:**
- Reading any file >500 lines
- Exploring unfamiliar codebases (always start with limit=100)
- Reading multiple files in sequence
**When full read is OK:**
- Small files (<500 lines)
- Files you need to edit immediately after reading
## Working with Subagents (task tool)
When delegating to subagents:
- **Use filesystem for large I/O**: If input/output is large (>500 words), communicate via files
- **Parallelize independent work**: Spawn parallel subagents for independent tasks
- **Clear specifications**: Tell subagent exactly what format/structure you need
- **Main agent synthesizes**: Subagents gather/execute, main agent integrates results
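The fan-out/synthesize pattern above can be sketched with `asyncio.gather`. The `run_subagent` coroutine is a hypothetical stand-in — the real task tool's interface is not shown in this document.

```python
import asyncio

# Illustrative stand-in for delegating one independent task to a subagent.
async def run_subagent(spec: str) -> str:
    await asyncio.sleep(0)  # placeholder for real subagent work
    return f"result for: {spec}"

async def main() -> list[str]:
    # Independent tasks fan out in parallel; the main agent then
    # gathers and synthesizes the results.
    specs = ["summarize module A", "summarize module B", "summarize module C"]
    return await asyncio.gather(*(run_subagent(s) for s in specs))

results = asyncio.run(main())
```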
## Tools
### shell
Execute shell commands. Always quote paths with spaces.
The bash command will be run from your current working directory.
Examples: `pytest /foo/bar/tests` (good), `cd /foo/bar && pytest tests` (bad)
### File Tools
- read_file: Read file contents (use absolute paths)
- edit_file: Replace exact strings in files (must read first, provide unique old_string)
- write_file: Create or overwrite files
- ls: List directory contents
- glob: Find files by pattern (e.g., "**/*.py")
- grep: Search file contents
Always use absolute paths starting with /.
### web_search
Search for documentation, error solutions, and code examples.
### http_request
Make HTTP requests to APIs (GET, POST, etc.).
## Code References
When referencing code, use format: `file_path:line_number`
## Documentation
- Do NOT create excessive markdown summary/documentation files after completing work
- Focus on the work itself, not documenting what you did
- Only create documentation when explicitly requested
## Bundled Sources
### __main__.py
Source: `/a0/tmp/skills_research/langchain/libs/deepagents-cli/deepagents_cli/__main__.py`
```python
"""Allow running the CLI as: python -m deepagents.cli."""

from deepagents_cli.main import cli_main

if __name__ == "__main__":
    cli_main()
```
### _version.py
Source: `/a0/tmp/skills_research/langchain/libs/deepagents-cli/deepagents_cli/_version.py`
```python
"""Version information for deepagents-cli."""

__version__ = "0.0.13a2"
```
### app.py
Source: `/a0/tmp/skills_research/langchain/libs/deepagents-cli/deepagents_cli/app.py`
```python
"""Textual UI application for deepagents-cli."""
# ruff: noqa: BLE001, PLR0912, PLR2004, S110, SIM108

from __future__ import annotations

import asyncio
import contextlib
import subprocess
import uuid
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar

from textual.app import App
from textual.binding import Binding, BindingType
from textual.containers import Container, VerticalScroll
from textual.css.query import NoMatches
from textual.events import Click, MouseUp  # noqa: TC002 - used in type annotation
from textual.widgets import Static  # noqa: TC002 - used at runtime

from deepagents_cli.clipboard import copy_selection_to_clipboard
from deepagents_cli.textual_adapter import TextualUIAdapter, execute_task_textual
from deepagents_cli.widgets.approval import ApprovalMenu
from deepagents_cli.widgets.chat_input import ChatInput
from deepagents_cli.widgets.loading import LoadingWidget
from deepagents_cli.widgets.messages import (
    AssistantMessage,
    ErrorMessage,
    SystemMessage,
    ToolCallMessage,
    UserMessage,
)
from deepagents_cli.widgets.status import StatusBar
from deepagents_cli.widgets.welcome import WelcomeBanner

if TYPE_CHECKING:
    from langgraph.pregel import Pregel
    from textual.app import ComposeResult
    from textual.worker import Worker


class TextualTokenTracker:
    """Token tracker that updates the status bar."""

    def __init__(self, update_callback: callable, hide_callback: callable | None = None) -> None:
        """Initialize with callbacks to update the display."""
        self._update_callback = update_callback
        self._hide_callback = hide_callback
        self.current_context = 0

    def add(self, total_tokens: int, _output_tokens: int = 0) -> None:
        """Update token count from a response.

        Args:
            total_tokens: Total context tokens (input + output from usage_metadata)
            _output_tokens: Unused, kept for backwards compatibility
        """
        self.current_context = total_tokens
        self._update_callback(self.current_context)

    def reset(self) -> None:
        """Reset token count."""
        self.current_context = 0
        self._update_callback(0)

    def hide(self) -> None:
        """Hide the token display (e.g., during streaming)."""
        if self._hide_callback:
            self._hide_callback()

    def show(self) -> None:
        """Show the token display with current value (e.g., after interrupt)."""
        self._update_callback(self.current_context)


class TextualSessionState:
    """Session state for the Textual app."""

    def __init__(
        self,
        *,
        auto_approve: bool = False,
        thread_id: str | None = None,
    ) -> None:
        """Initialize session state.

        Args:
            auto_approve: Whether to auto-approve tool calls
            thread_id: Optional thread ID (generates 8-char hex if not provided)
        """
        self.auto_approve = auto_approve
        self.thread_id = thread_id if thread_id else uuid.uuid4().hex[:8]

    def reset_thread(self) -> str:
        """Reset to a new thread. Returns the new thread_id."""
        self.thread_id = uuid.uuid4().hex[:8]
        return self.thread_id


class DeepAgentsApp(App):
    """Main Textual application for deepagents-cli."""

    TITLE = "DeepAgents"
    CSS_PATH = "app.tcss"
    ENABLE_COMMAND_PALETTE = False

    # Slow down scroll speed (default is 3 lines per scroll event)
    # Using 0.25 to require 4 scroll events per line - very smooth
    SCROLL_SENSITIVITY_Y = 0.25

    BINDINGS: ClassVar[list[BindingType]] = [
        Binding("escape", "interrupt", "Interrupt", show=False, priority=True),
        Binding("ctrl+c", "quit_or_interrupt", "Quit/Interrupt", show=False),
        Binding("ctrl+d", "quit_app", "Quit", show=False, priority=True),
        Binding("ctrl+t", "toggle_auto_approve", "Toggle Auto-Approve", show=False),
        Binding(
            "shift+tab", "toggle_auto_approve", "Toggle Auto-Approve", show=False, priority=True
        ),
        Binding("ctrl+o", "toggle_tool_output", "Toggle Tool Output", show=False),
        # Approval menu keys (handled at App level for reliability)
        Binding("up", "approval_up", "Up", show=False),
        Binding("k", "approval_up", "Up", show=False),
        Binding("down", "approval_down", "Down", show=False),
        Binding("j", "approval_down", "Down", show=False),
        Binding("enter", "approval_select", "Select", show=False),
        Binding("y", "approval_yes", "Yes", show=False),
        Binding("1", "approval_yes", "Yes", show=False),
        Binding("n", "approval_no", "No", show=False),
        Binding("2", "approval_no", "No", show=False),
        Binding("a", "approval_auto", "Auto", show=False),
        Binding("3", "approval_auto", "Auto", show=False),
    ]

    def __init__(
        self,
        *,
        agent: Pregel | None = None,
        assistant_id: str | None = None,
        backend: Any = None,  # noqa: ANN401 # CompositeBackend
        auto_approve: bool = False,
        cwd: str | Path | None = None,
        thread_id: str | None = None,
        initial_prompt: str | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialize the DeepAgents application.

        Args:
            agent: Pre-configured LangGraph agent (optional for standalone mode)
            assistant_id: Agent identifier for memory storage
            backend: Backend for file operations
            auto_approve: Whether to start with auto-approve enabled
            cwd: Current working directory to display
            thread_id: Optional thread ID for session persistence
            initial_prompt: Optional prompt to auto-submit when session starts
            **kwargs: Additional arguments passed to parent
        """
        super().__init__(**kwargs)
        self._agent = agent
        self._assistant_id = assistant_id
        self._backend = backend
        self._auto_approve = auto_approve
        self._cwd = str(cwd) if cwd else str(Path.cwd())
        # Avoid collision with App._thread_id
        self._lc_thread_id = thread_id
        self._initial_prompt = initial_prompt
        self._status_bar: StatusBar | None = None
        self._chat_input: ChatInput | None = None
        self._quit_pending = False
        self._session_state: TextualSessionState | None = None
        self._ui_adapter: TextualUIAdapter | None = None
        self._pending_approval: asyncio.Future | None = None
        self._pending_approval_widget: Any = None
        # Agent task tracking for interruption
        self._agent_worker: Worker[None] | None = None
        self._agent_running = False
        self._loading_widget: LoadingWidget | None = None
        self._token_tracker: TextualTokenTracker | None = None

    def compose(self) -> ComposeResult:
        """Compose the application layout."""
        # Main chat area with scrollable messages
        with VerticalScroll(id="chat"):
            yield WelcomeBanner(id="welcome-banner")
            yield Container(id="messages")  # Container can have children mounted
        # Bottom app container - holds either ChatInput OR ApprovalMenu (swapped)
        # This is OUTSIDE VerticalScroll so arrow keys work in approval
        with Container(id="bottom-app-container"):
            yield ChatInput(cwd=self._cwd, id="input-area")
        # Status bar at bottom
        yield StatusBar(cwd=self._cwd, id="status-bar")

    async def on_mount(self) -> None:
        """Initialize components after mount."""
        self._status_bar = self.query_one("#status-bar", StatusBar)
        self._chat_input = self.query_one("#input-area", ChatInput)
        # Set initial auto-approve state
        if self._auto_approve:
            self._status_bar.set_auto_approve(enabled=True)
        # Create session state
        self._session_state = TextualSessionState(
            auto_approve=self._auto_approve,
            thread_id=self._lc_thread_id,
        )
        # Create token tracker that updates status bar
        self._token_tracker = TextualTokenTracker(self._update_tokens, self._hide_tokens)
        # Create UI adapter if agent is provided
        if self._agent:
            self._ui_adapter = TextualUIAdapter(
                mount_message=self._mount_message,
                update_status=self._update_status,
                request_approval=self._request_approval,
                on_auto_approve_enabled=self._on_auto_approve_enabled,
                scroll_to_bottom=self._scroll_chat_to_bottom,
            )
            self._ui_adapter.set_token_tracker(self._token_tracker)
        # Focus the input (autocomplete is now built into ChatInput)
        self._chat_input.focus_input()
        # Auto-submit initial prompt if provided
        if self._initial_prompt and self._initial_prompt.strip():
            # Use call_after_refresh to ensure UI is fully mounted before submitting
            self.call_after_refresh(
                lambda: asyncio.create_task(self._handle_user_message(self._initial_prompt))
            )

    def _update_status(self, message: str) -> None:
        """Update the status bar with a message."""
        if self._status_bar:
            self._status_bar.set_status_message(message)

    def _update_tokens(self, count: int) -> None:
        """Update the token count in status bar."""
        if self._status_bar:
            self._status_bar.set_tokens(count)

    def _hide_tokens(self) -> None:
        """Hide the token display during streaming."""
        if self._status_bar:
            self._status_bar.hide_tokens()

    def _scroll_chat_to_bottom(self) -> None:
        """Scroll the chat area to the bottom.

        Uses anchor() for smoother streaming - keeps scroll locked to bottom
        as new content is added without causing visual jumps.
        """
        try:
            chat = self.query_one("#chat", VerticalScroll)
            # anchor() locks scroll to bottom and auto-scrolls as content grows
            # Much smoother than calling scroll_end() on every chunk
            chat.anchor()
        except NoMatches:
            pass

    async def _request_approval(
        self,
        action_request: Any,  # noqa: ANN401
        assistant_id: str | None,
    ) -> asyncio.Future:
        """Request user approval inline in the messages area.

        Returns a Future that resolves to the user's decision.
        Mounts ApprovalMenu in the messages area (inline with chat).
        ChatInput stays visible - user can still see it.
        If another approval is already pending, queue this one.
        """
        loop = asyncio.get_running_loop()
        result_future: asyncio.Future = loop.create_future()
        # If there's already a pending approval, wait for it to complete first
        if self._pending_approval_widget is not None:
            while self._pending_approval_widget is not None:  # noqa: ASYNC110
                await asyncio.sleep(0.1)
        # Create menu with unique ID to avoid conflicts
        unique_id = f"approval-menu-{uuid.uuid4().hex[:8]}"
        menu = ApprovalMenu(action_request, assistant_id, id=unique_id)
        menu.set_future(result_future)
        # Store reference
        self._pending_approval_widget = menu
        # Pause the loading spinner during approval
        if self._loading_widget:
            self._loading_widget.pause("Awaiting decision")
        # Update status to show we're waiting for approval
        self._update_status("Waiting for approval...")
        # Mount approval inline in messages area (not replacing ChatInput)
        try:
            messages = self.query_one("#messages", Container)
            await messages.mount(menu)
            self._scroll_chat_to_bottom()
            # Focus approval menu
            self.call_after_refresh(menu.focus)
        except Exception as e:
            self._pending_approval_widget = None
            if not result_future.done():
                result_future.set_exception(e)
        return result_future

    def _on_auto_approve_enabled(self) -> None:
        """Callback when auto-approve mode is enabled via HITL."""
        self._auto_approve = True
        if self._status_bar:
            self._status_bar.set_auto_approve(enabled=True)
        if self._session_state:
            self._session_state.auto_approve = True

    async def on_chat_input_submitted(self, event: ChatInput.Submitted) -> None:
        """Handle submitted input from ChatInput widget."""
        value = event.value
        mode = event.mode
        # Reset quit pending state on any input
        self._quit_pending = False
        # Handle different modes
        if mode == "bash":
            # Bash command - strip the ! prefix
            await self._handle_bash_command(value.removeprefix("!"))
        elif mode == "command":
            # Slash command
            await self._handle_command(value)
        else:
            # Normal message - will be sent to agent
            await self._handle_user_message(value)

    def on_chat_input_mode_changed(self, event: ChatInput.ModeChanged) -> None:
        """Update status bar when input mode changes."""
        if self._status_bar:
            self._status_bar.set_mode(event.mode)

    async def on_approval_menu_decided(
        self,
        event: Any,  # noqa: ANN401, ARG002
    ) -> None:
        """Handle approval menu decision - remove from messages and refocus input."""
        # Remove ApprovalMenu using stored reference
        if self._pending_approval_widget:
            await self._pending_approval_widget.remove()
            self._pending_approval_widget = None
        # Resume the loading spinner after approval
        if self._loading_widget:
            self._loading_widget.resume()
        # Clear status message
        self._update_status("")
        # Refocus the chat input
        if self._chat_input:
            self.call_after_refresh(self._chat_input.focus_input)

    async def _handle_bash_command(self, command: str) -> None:
        """Handle a bash command (! prefix).

        Args:
            command: The bash command to execute
        """
        # Mount user message showing the bash command
        await self._mount_message(UserMessage(f"!{command}"))
        # Execute the bash command (shell=True is intentional for user-requested bash)
        try:
            result = await asyncio.to_thread(  # noqa: S604
                subprocess.run,
                command,
                shell=True,
                capture_output=True,
                text=True,
                cwd=self._cwd,
                timeout=60,
            )
            output = result.stdout.strip()
            if result.stderr:
                output += f"\n[stderr]\n{result.stderr.strip()}"
            if output:
                # Display output as assistant message (uses markdown for code blocks)
                msg = AssistantMessage(f"```\n{output}\n```")
                await self._mount_message(msg)
                await msg.write_initial_content()
            else:
                await self._mount_message(SystemMessage("Command completed (no output)"))
            if result.returncode != 0:
                await self._mount_message(ErrorMessage(f"Exit code: {result.returncode}"))
            # Scroll to show the output
            self._scroll_chat_to_bottom()
        except subprocess.TimeoutExpired:
            await self._mount_message(ErrorMessage("Command timed out (60s limit)"))
        except OSError as e:
            await self._mount_message(ErrorMessage(str(e)))

    async def _handle_command(self, command: str) -> None:
        """Handle a slash command.

        Args:
            command: The slash command (including /)
        """
        cmd = command.lower().strip()
        if cmd in ("/quit", "/exit", "/q"):
            self.exit()
        elif cmd == "/help":
            await self._mount_message(UserMessage(command))
            await self._mount_message(
                SystemMessage("Commands: /quit, /clear, /tokens, /threads, /help")
            )
        elif cmd == "/version":
            await self._mount_message(UserMessage(command))
            # Show CLI package version
            try:
                from deepagents_cli._version import __version__

                await self._mount_message(SystemMessage(f"deepagents version: {__version__}"))
            except Exception:
                await self._mount_message(SystemMessage("deepagents version: unknown"))
        elif cmd == "/clear":
            await self._clear_messages()
            if self._token_tracker:
                self._token_tracker.reset()
            # Clear status message (e.g., "Interrupted" from previous session)
            self._update_status("")
            # Reset thread to start fresh conversation
            if self._session_state:
                new_thread_id = self._session_state.reset_thread()
                await self._mount_message(SystemMessage(f"Started new session: {new_thread_id}"))
        elif cmd == "/threads":
            await self._mount_message(UserMessage(command))
            if self._session_state:
                await self._mount_message(
                    SystemMessage(f"Current session: {self._session_state.thread_id}")
                )
            else:
                await self._mount_message(SystemMessage("No active session"))
        elif cmd == "/tokens":
            await self._mount_message(UserMessage(command))
            if self._token_tracker and self._token_tracker.current_context > 0:
                count = self._token_tracker.current_context
                if count >= 1000:
                    formatted = f"{count / 1000:.1f}K"
                else:
                    formatted = str(count)
                await self._mount_message(SystemMessage(f"Current context: {formatted} tokens"))
            else:
                await self._mount_message(SystemMessage("No token usage yet"))
        else:
            await self._mount_message(UserMessage(command))
            await self._mount_message(SystemMessage(f"Unknown command: {cmd}"))

    async def _handle_user_message(self, message: str) -> None:
        """Handle a user message to send to the agent.

        Args:
            message: The user's message
        """
        # Mount the user message
        await self._mount_message(UserMessage(message))
        # Check if agent is available
        if self._agent and self._ui_adapter and self._session_state:
            # Show loading widget
            self._loading_widget = LoadingWidget("Thinking")
            await self._mount_message(self._loading_widget)
            self._agent_running = True
            # Disable cursor blink while agent is working
            if self._chat_input:
                self._chat_input.set_cursor_active(active=False)
            # Use run_worker to avoid blocking the main event loop
            # This allows the UI to remain responsive during agent execution
            self._agent_worker = self.run_worker(
                self._run_agent_task(message),
                exclusive=False,
            )
        else:
            await self._mount_message(
                SystemMessage("Agent not configured. Run with --agent flag or use standalone mode.")
            )

    async def _run_agent_task(self, message: str) -> None:
        """Run the agent task in a background worker.

        This runs in a worker thread so the main event loop stays responsive.
        """
        try:
            await execute_task_textual(
                user_input=message,
                agent=self._agent,
                assistant_id=self._assistant_id,
                session_state=self._session_state,
                adapter=self._ui_adapter,
                backend=self._backend,
            )
        except Exception as e:
            await self._mount_message(ErrorMessage(f"Agent error: {e}"))
        finally:
            # Clean up loading widget and agent state
            await self._cleanup_agent_task()

    async def _cleanup_agent_task(self) -> None:
        """Clean up after agent task completes or is cancelled."""
        self._agent_running = False
        self._agent_worker = None
        # Remove loading widget if present
        if self._loading_widget:
            with contextlib.suppress(Exception):
                await self._loading_widget.remove()
            self._loading_widget = None
        # Re-enable cursor blink now that agent is done
        if self._chat_input:
            self._chat_input.set_cursor_active(active=True)
        # Ensure token display is restored (in case of early cancellation)
        if self._token_tracker:
            self._token_tracker.show()

    async def _mount_message(self, widget: Static) -> None:
        """Mount a message widget to the messages area.

        Args:
            widget: The message widget to mount
        """
        try:
            messages = self.query_one("#messages", Container)
            await messages.mount(widget)
            # Scroll to bottom
            chat = self.query_one("#chat", VerticalScroll)
            chat.scroll_end(animate=False)
        except NoMatches:
            pass

    async def _clear_messages(self) -> None:
        """Clear the messages area."""
        try:
            messages = self.query_one("#messages", Container)
            await messages.remove_children()
        except NoMatches:
            # Widget not found - can happen during shutdown
            pass

    def action_quit_or_interrupt(self) -> None:
        """Handle Ctrl+C - interrupt agent, reject approval, or quit on double press.

        Priority order:
        1. If agent is running, interrupt it (preserve input)
        2. If approval menu is active, reject it
        3. If double press (quit_pending), quit
        4. Otherwise show quit hint
        """
        # If agent is running, interrupt it
        if self._agent_running and self._agent_worker:
            self._agent_worker.cancel()
            self._quit_pending = False
            return
        # If approval menu is active, reject it
        if self._pending_approval_widget:
            self._pending_approval_widget.action_select_reject()
            self._quit_pending = False
            return
        # Double Ctrl+C to quit
        if self._quit_pending:
            self.exit()
        else:
            self._quit_pending = True
            self.notify("Press Ctrl+C again to quit", timeout=3)

    def action_interrupt(self) -> None:
        """Handle escape key - interrupt agent or reject approval.

        This is the primary way to stop a running agent.
        """
        # If agent is running, interrupt it
        if self._agent_running and self._agent_worker:
            self._agent_worker.cancel()
            return
        # If approval menu is active, reject it
        if self._pending_approval_widget:
            self._pending_approval_widget.action_select_reject()

    def action_quit_app(self) -> None:
        """Handle quit action (Ctrl+D)."""
        self.exit()

    def action_toggle_auto_approve(self) -> None:
        """Toggle auto-approve mode."""
        self._auto_approve = not self._auto_approve
        if self._status_bar:
            self._status_bar.set_auto_approve(enabled=self._auto_approve)
        if self._session_state:
            self._session_state.auto_approve = self._auto_approve

    def action_toggle_tool_output(self) -> None:
        """Toggle expand/collapse of the most recent tool output."""
        # Find all tool messages with output, get the most recent one
        try:
            tool_messages = list(self.query(ToolCallMessage))
            # Find ones with output, toggle the most recent
            for tool_msg in reversed(tool_messages):
                if tool_msg.has_output:
                    tool_msg.toggle_output()
                    return
        except Exception:
            pass

    # Approval menu action handlers (delegated from App-level bindings)
    # NOTE: These only activate when approval widget is pending AND input is not focused

    def action_approval_up(self) -> None:
        """Handle up arrow in approval menu."""
        # Only handle if approval is active (input handles its own up for history/completion)
        if self._pending_approval_widget and not self._is_input_focused():
            self._pending_approval_widget.action_move_up()

    def action_approval_down(self) -> None:
        """Handle down arrow in approval menu."""
        if self._pending_approval_widget and not self._is_input_focused():
            self._pending_approval_widget.action_move_down()

    def action_approval_select(self) -> None:
        """Handle enter in approval menu."""
        # Only handle if approval is active AND input is not focused
        if self._pending_approval_widget and not self._is_input_focused():
            self._pending_approval_widget.action_select()

    def _is_input_focused(self) -> bool:
        """Check if the chat input (or its text area) has focus."""
        if not self._chat_input:
            return False
        focused = self.focused
        if focused is None:
            return False
        # Check if focused widget is the text area inside chat input
        return focused.id == "chat-input" or focused in self._chat_input.walk_children()

    def action_approval_yes(self) -> None:
        """Handle yes/1 in approval menu."""
        if self._pending_approval_widget:
            self._pending_approval_widget.action_select_approve()

    def action_approval_no(self) -> None:
        """Handle no/2 in approval menu."""
        if self._pending_approval_widget:
            self._pending_approval_widget.action_select_reject()

    def action_approval_auto(self) -> None:
        """Handle auto/3 in approval menu."""
        if self._pending_approval_widget:
            self._pending_approval_widget.action_select_auto()

    def action_approval_escape(self) -> None:
        """Handle escape in approval menu - reject."""
        if self._pending_approval_widget:
            self._pending_approval_widget.action_select_reject()

    def on_click(self, _event: Click) -> None:
        """Handle clicks anywhere in the terminal to focus on the command line."""
        if not self._chat_input:
            return
        self.call_after_refresh(self._chat_input.focus_input)

    def on_mouse_up(self, event: MouseUp) -> None:  # noqa: ARG002
        """Copy selection to clipboard on mouse release."""
        copy_selection_to_clipboard(self)


async def run_textual_app(
    *,
    agent: Pregel | None = None,
    assistant_id: str | None = None,
    backend: Any = None,  # noqa: ANN401 # CompositeBackend
    auto_approve: bool = False,
    cwd: str | Path | None = None,
    thread_id: str | None = None,
    initial_prompt: str | None = None,
) -> None:
    """Run the Textual application.

    Args:
        agent: Pre-configured LangGraph agent (optional)
        assistant_id: Agent identifier for memory storage
        backend: Backend for file operations
        auto_approve: Whether to start with auto-approve enabled
        cwd: Current working directory to display
        thread_id: Optional thread ID for session persistence
        initial_prompt: Optional prompt to auto-submit when session starts
    """
    app = DeepAgentsApp(
        agent=agent,
        assistant_id=assistant_id,
        backend=backend,
        auto_approve=auto_approve,
        cwd=cwd,
        thread_id=thread_id,
        initial_prompt=initial_prompt,
    )
    await app.run_async()


if __name__ == "__main__":
    import asyncio

    asyncio.run(run_textual_app())
```
### clipboard.py
Source: `/a0/tmp/skills_research/langchain/libs/deepagents-cli/deepagents_cli/clipboard.py`
```python
"""Clipboard utilities for deepagents-cli."""

from __future__ import annotations

import base64
import os
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from textual.app import App

_PREVIEW_MAX_LENGTH = 40


def _copy_osc52(text: str) -> None:
    """Copy text using OSC 52 escape sequence (works over SSH/tmux)."""
    encoded = base64.b64encode(text.encode("utf-8")).decode("ascii")
    osc52_seq = f"\033]52;c;{encoded}\a"
    if os.environ.get("TMUX"):
        osc52_seq = f"\033Ptmux;\033{osc52_seq}\033\\"
    with open("/dev/tty", "w") as tty:
        tty.write(osc52_seq)
        tty.flush()


def _shorten_preview(texts: list[str]) -> str:
    """Shorten text for notification preview."""
    dense_text = "⏎".join(texts).replace("\n", "⏎")
    if len(dense_text) > _PREVIEW_MAX_LENGTH:
        return f"{dense_text[: _PREVIEW_MAX_LENGTH - 1]}…"
    return dense_text


def copy_selection_to_clipboard(app: App) -> None:
    """Copy selected text from app widgets to clipboard.

    This queries all widgets for their text_selection and copies
    any selected text to the system clipboard.
    """
    selected_texts = []
    for widget in app.query("*"):
        if not hasattr(widget, "text_selection") or not widget.text_selection:
            continue
        selection = widget.text_selection
        try:
            result = widget.get_selection(selection)
        except Exception:
            continue
        if not result:
            continue
        selected_text, _ = result
        if selected_text.strip():
            selected_texts.append(selected_text)
    if not selected_texts:
        return
    combined_text = "\n".join(selected_texts)
    # Try multiple clipboard methods
    copy_methods = [_copy_osc52, app.copy_to_clipboard]
    # Try pyperclip if available
    try:
        import pyperclip

        copy_methods.insert(1, pyperclip.copy)
    except ImportError:
        pass
    for copy_fn in copy_methods:
        try:
            copy_fn(combined_text)
            # Use markup=False to prevent copied text from being parsed as Rich markup
            app.notify(
                f'"{_shorten_preview(selected_texts)}" copied',
                severity="information",
                timeout=2,
                markup=False,
            )
            return
        except Exception:
            continue
    # If all methods fail, still notify but warn
    app.notify(
        "Failed to copy - no clipboard method available",
        severity="warning",
        timeout=3,
    )
```
### config.py
Source: `/a0/tmp/skills_research/langchain/libs/deepagents-cli/deepagents_cli/config.py`
```python
"""Configuration, constants, and model creation for the CLI."""

import os
import re
import sys
import uuid
from dataclasses import dataclass
from pathlib import Path

import dotenv
from rich.console import Console

from deepagents_cli._version import __version__

dotenv.load_dotenv()

# CRITICAL: Override LANGSMITH_PROJECT to route agent traces to separate project
# LangSmith reads LANGSMITH_PROJECT at invocation time, so we override it here
# and preserve the user's original value for shell commands
_deepagents_project = os.environ.get("DEEPAGENTS_LANGSMITH_PROJECT")
_original_langsmith_project = os.environ.get("LANGSMITH_PROJECT")
if _deepagents_project:
    # Override LANGSMITH_PROJECT for agent traces
    os.environ["LANGSMITH_PROJECT"] = _deepagents_project

# Now safe to import LangChain modules
from langchain_core.language_models import BaseChatModel

# Color scheme
COLORS = {
    "primary": "#10b981",
    "dim": "#6b7280",
    "user": "#ffffff",
    "agent": "#10b981",
    "thinking": "#34d399",
    "tool": "#fbbf24",
}

# ASCII art banner
DEEP_AGENTS_ASCII = f"""
██████╗  ███████╗ ███████╗ ██████╗
██╔══██╗ ██╔════╝ ██╔════╝ ██╔══██╗
██║  ██║ █████╗   █████╗   ██████╔╝
██║  ██║ ██╔══╝   ██╔══╝   ██╔═══╝
██████╔╝ ███████╗ ███████╗ ██║
╚═════╝  ╚══════╝ ╚══════╝ ╚═╝
 █████╗   ██████╗  ███████╗ ███╗   ██╗ ████████╗ ███████╗
██╔══██╗ ██╔════╝  ██╔════╝ ████╗  ██║ ╚══██╔══╝ ██╔════╝
███████║ ██║  ███╗ █████╗   ██╔██╗ ██║    ██║    ███████╗
██╔══██║ ██║   ██║ ██╔══╝   ██║╚██╗██║    ██║    ╚════██║
██║  ██║ ╚██████╔╝ ███████╗ ██║ ╚████║    ██║    ███████║
╚═╝  ╚═╝  ╚═════╝  ╚══════╝ ╚═╝  ╚═══╝    ╚═╝    ╚══════╝
v{__version__}
"""

# Interactive commands
COMMANDS = {
    "clear": "Clear screen and reset conversation",
    "help": "Show help information",
    "tokens": "Show token usage for current session",
    "quit": "Exit the CLI",
    "exit": "Exit the CLI",
}

# Maximum argument length for display
MAX_ARG_LENGTH = 150

# Agent configuration
config = {"recursion_limit": 1000}

# Rich console instance
console = Console(highlight=False)


def _find_project_root(start_path: Path | None = None) -> Path | None:
    """Find the project root by looking for .git directory.

    Walks up the directory tree from start_path (or cwd) looking for a .git
    directory, which indicates the project root.

    Args:
        start_path: Directory to start searching from. Defaults to current working directory.

    Returns:
        Path to the project root if found, None otherwise.
    """
    current = Path(start_path or Path.cwd()).resolve()
    # Walk up the directory tree
    for parent in [current, *list(current.parents)]:
        git_dir = parent / ".git"
        if git_dir.exists():
            return parent
    return None


def _find_project_agent_md(project_root: Path) -> list[Path]:
    """Find project-specific AGENTS.md file(s).

    Checks two locations and returns ALL that exist:
    1. project_root/.deepagents/AGENTS.md
    2. project_root/AGENTS.md

    Both files will be loaded and combined if both exist.

    Args:
        project_root: Path to the project root directory.

    Returns:
        List of paths to project AGENTS.md files (may contain 0, 1, or 2 paths).
    """
    paths = []
    # Check .deepagents/AGENTS.md (preferred)
    deepagents_md = project_root / ".deepagents" / "AGENTS.md"
    if deepagents_md.exists():
        paths.append(deepagents_md)
    # Check root AGENTS.md (fallback, but also include if both exist)
    root_md = project_root / "AGENTS.md"
    if root_md.exists():
        paths.append(root_md)
    return paths


@dataclass
class Settings:
    """Global settings and environment detection for deepagents-cli.

    This class is initialized once at startup and provides access to:
    - Available models and API keys
    - Current project information
    - Tool availability (e.g., Tavily)
    - File system paths

    Attributes:
        project_root: Current project root directory (if in a git project)
        openai_api_key: OpenAI API key if available
        anthropic_api_key: Anthropic API key if available
        tavily_api_key: Tavily API key if available
        deepagents_langchain_project: LangSmith project name for deepagents agent tracing
        user_langchain_project: Original LANGSMITH_PROJECT from environment (for user code)
    """

    # API keys
    openai_api_key: str | None
    anthropic_api_key: str | None
    google_api_key: str | None
    tavily_api_key: str | None

    # LangSmith configuration
    deepagents_langchain_project: str | None  # For deepagents agent tracing
    user_langchain_project: str | None  # Original LANGSMITH_PROJECT for user code

    # Model configuration
    model_name: str | None = None  # Currently active model name
    model_provider: str | None = None  # Provider (openai, anthropic, google)

    # Project information
    project_root: Path | None = None

    @classmethod
    def from_environment(cls, *, start_path: Path | None = None) -> "Settings":
        """Create settings by detecting the current environment.

        Args:
            start_path: Directory to start project detection from (defaults to cwd)

        Returns:
            Settings instance with detected configuration
        """
        # Detect API keys
        openai_key = os.environ.get("OPENAI_API_KEY")
        anthropic_key = os.environ.get("ANTHROPIC_API_KEY")
        google_key = os.environ.get("GOOGLE_API_KEY")
        tavily_key = os.environ.get("TAVILY_API_KEY")

        # Detect LangSmith configuration
        # DEEPAGENTS_LANGSMITH_PROJECT: Project for deepagents agent tracing
        # user_langchain_project: User's ORIGINAL LANGSMITH_PROJECT (before override)
        # Note: LANGSMITH_PROJECT was already overridden at module import time (above)
        # so we use the saved original value, not the current os.environ value
        deepagents_langchain_project = os.environ.get("DEEPAGENTS_LANGSMITH_PROJECT")
        user_langchain_project = _original_langsmith_project  # Use saved original!

        # Detect project
        project_root = _find_project_root(start_path)

        return cls(
            openai_api_key=openai_key,
            anthropic_api_key=anthropic_key,
            google_api_key=google_key,
            tavily_api_key=tavily_key,
            deepagents_langchain_project=deepagents_langchain_project,
            user_langchain_project=user_langchain_project,
project_root=project_root,
)
@property
def has_openai(self) -> bool:
"""Check if OpenAI API key is configured."""
return self.openai_api_key is not None
@property
def has_anthropic(self) -> bool:
"""Check if Anthropic API key is configured."""
return self.anthropic_api_key is not None
@property
def has_google(self) -> bool:
"""Check if Google API key is configured."""
return self.google_api_key is not None
@property
def has_tavily(self) -> bool:
"""Check if Tavily API key is configured."""
return self.tavily_api_key is not None
@property
def has_deepagents_langchain_project(self) -> bool:
"""Check if deepagents LangChain project name is configured."""
return self.deepagents_langchain_project is not None
@property
def has_project(self) -> bool:
"""Check if currently in a git project."""
return self.project_root is not None
@property
def user_deepagents_dir(self) -> Path:
"""Get the base user-level .deepagents directory.
Returns:
Path to ~/.deepagents
"""
return Path.home() / ".deepagents"
def get_user_agent_md_path(self, agent_name: str) -> Path:
"""Get user-level AGENTS.md path for a specific agent.
Returns path regardless of whether the file exists.
Args:
agent_name: Name of the agent
Returns:
Path to ~/.deepagents/{agent_name}/AGENTS.md
"""
return Path.home() / ".deepagents" / agent_name / "AGENTS.md"
def get_project_agent_md_path(self) -> Path | None:
"""Get project-level AGENTS.md path.
Returns path regardless of whether the file exists.
Returns:
Path to {project_root}/.deepagents/AGENTS.md, or None if not in a project
"""
if not self.project_root:
return None
return self.project_root / ".deepagents" / "AGENTS.md"
@staticmethod
def _is_valid_agent_name(agent_name: str) -> bool:
"""Validate agent name to prevent invalid filesystem paths and security issues."""
if not agent_name or not agent_name.strip():
return False
# Allow only alphanumeric, hyphens, underscores, and whitespace
return bool(re.match(r"^[a-zA-Z0-9_\-\s]+$", agent_name))
def get_agent_dir(self, agent_name: str) -> Path:
"""Get the global agent directory path.
Args:
agent_name: Name of the agent
Returns:
Path to ~/.deepagents/{agent_name}
"""
if not self._is_valid_agent_name(agent_name):
msg = (
f"Invalid agent name: {agent_name!r}. "
"Agent names can only contain letters, numbers, hyphens, underscores, and spaces."
)
raise ValueError(msg)
return Path.home() / ".deepagents" / agent_name
def ensure_agent_dir(self, agent_name: str) -> Path:
"""Ensure the global agent directory exists and return its path.
Args:
agent_name: Name of the agent
Returns:
Path to ~/.deepagents/{agent_name}
"""
if not self._is_valid_agent_name(agent_name):
msg = (
f"Invalid agent name: {agent_name!r}. "
"Agent names can only contain letters, numbers, hyphens, underscores, and spaces."
)
raise ValueError(msg)
agent_dir = self.get_agent_dir(agent_name)
agent_dir.mkdir(parents=True, exist_ok=True)
return agent_dir
def ensure_project_deepagents_dir(self) -> Path | None:
"""Ensure the project .deepagents directory exists and return its path.
Returns:
Path to project .deepagents directory, or None if not in a project
"""
if not self.project_root:
return None
project_deepagents_dir = self.project_root / ".deepagents"
project_deepagents_dir.mkdir(parents=True, exist_ok=True)
return project_deepagents_dir
def get_user_skills_dir(self, agent_name: str) -> Path:
"""Get user-level skills directory path for a specific agent.
Args:
agent_name: Name of the agent
Returns:
Path to ~/.deepagents/{agent_name}/skills/
"""
return self.get_agent_dir(agent_name) / "skills"
def ensure_user_skills_dir(self, agent_name: str) -> Path:
"""Ensure user-level skills directory exists and return its path.
Args:
agent_name: Name of the agent
Returns:
Path to ~/.deepagents/{agent_name}/skills/
"""
skills_dir = self.get_user_skills_dir(agent_name)
skills_dir.mkdir(parents=True, exist_ok=True)
return skills_dir
def get_project_skills_dir(self) -> Path | None:
"""Get project-level skills directory path.
Returns:
Path to {project_root}/.deepagents/skills/, or None if not in a project
"""
if not self.project_root:
return None
return self.project_root / ".deepagents" / "skills"
def ensure_project_skills_dir(self) -> Path | None:
"""Ensure project-level skills directory exists and return its path.
Returns:
Path to {project_root}/.deepagents/skills/, or None if not in a project
"""
if not self.project_root:
return None
skills_dir = self.get_project_skills_dir()
skills_dir.mkdir(parents=True, exist_ok=True)
return skills_dir
# Global settings instance (initialized once)
settings = Settings.from_environment()
class SessionState:
"""Holds mutable session state (auto-approve mode, etc)."""
def __init__(self, auto_approve: bool = False, no_splash: bool = False) -> None:
self.auto_approve = auto_approve
self.no_splash = no_splash
self.exit_hint_until: float | None = None
self.exit_hint_handle = None
self.thread_id = str(uuid.uuid4())
def toggle_auto_approve(self) -> bool:
"""Toggle auto-approve and return new state."""
self.auto_approve = not self.auto_approve
return self.auto_approve
def get_default_coding_instructions() -> str:
"""Get the default coding agent instructions.
These are the immutable base instructions that cannot be modified by the agent.
Long-term memory (AGENTS.md) is handled separately by the middleware.
"""
default_prompt_path = Path(__file__).parent / "default_agent_prompt.md"
return default_prompt_path.read_text()
def _detect_provider(model_name: str) -> str | None:
"""Auto-detect provider from model name.
Args:
model_name: Model name to detect provider from
Returns:
Provider name (openai, anthropic, google) or None if can't detect
"""
model_lower = model_name.lower()
if any(x in model_lower for x in ["gpt", "o1", "o3"]):
return "openai"
if "claude" in model_lower:
return "anthropic"
if "gemini" in model_lower:
return "google"
return None
def create_model(model_name_override: str | None = None) -> BaseChatModel:
"""Create the appropriate model based on available API keys.
Uses the global settings instance to determine which model to create.
Args:
model_name_override: Optional model name to use instead of environment variable
Returns:
ChatModel instance (OpenAI, Anthropic, or Google)
Raises:
SystemExit if no API key is configured or model provider can't be determined
"""
# Determine provider and model
if model_name_override:
# Use provided model, auto-detect provider
provider = _detect_provider(model_name_override)
if not provider:
console.print(
f"[bold red]Error:[/bold red] Could not detect provider from model name: {model_name_override}"
)
console.print("\nSupported model name patterns:")
console.print(" - OpenAI: gpt-*, o1-*, o3-*")
console.print(" - Anthropic: claude-*")
console.print(" - Google: gemini-*")
sys.exit(1)
# Check if API key for detected provider is available
if provider == "openai" and not settings.has_openai:
console.print(
f"[bold red]Error:[/bold red] Model '{model_name_override}' requires OPENAI_API_KEY"
)
sys.exit(1)
elif provider == "anthropic" and not settings.has_anthropic:
console.print(
f"[bold red]Error:[/bold red] Model '{model_name_override}' requires ANTHROPIC_API_KEY"
)
sys.exit(1)
elif provider == "google" and not settings.has_google:
console.print(
f"[bold red]Error:[/bold red] Model '{model_name_override}' requires GOOGLE_API_KEY"
)
sys.exit(1)
model_name = model_name_override
# Use environment variable defaults, detect provider by API key priority
elif settings.has_openai:
provider = "openai"
model_name = os.environ.get("OPENAI_MODEL", "gpt-5-mini")
elif settings.has_anthropic:
provider = "anthropic"
model_name = os.environ.get("ANTHROPIC_MODEL", "claude-sonnet-4-5-20250929")
elif settings.has_google:
provider = "google"
model_name = os.environ.get("GOOGLE_MODEL", "gemini-3-pro-preview")
else:
console.print("[bold red]Error:[/bold red] No API key configured.")
console.print("\nPlease set one of the following environment variables:")
console.print(" - OPENAI_API_KEY (for OpenAI models like gpt-5-mini)")
console.print(" - ANTHROPIC_API_KEY (for Claude models)")
console.print(" - GOOGLE_API_KEY (for Google Gemini models)")
console.print("\nExample:")
console.print(" export OPENAI_API_KEY=your_api_key_here")
console.print("\nOr add it to your .env file.")
sys.exit(1)
# Store model info in settings for display
settings.model_name = model_name
settings.model_provider = provider
# Create and return the model
if provider == "openai":
from langchain_openai import ChatOpenAI
return ChatOpenAI(model=model_name)
if provider == "anthropic":
from langchain_anthropic import ChatAnthropic
return ChatAnthropic(
model_name=model_name,
max_tokens=20_000, # type: ignore[arg-type]
)
if provider == "google":
from langchain_google_genai import ChatGoogleGenerativeAI
return ChatGoogleGenerativeAI(
model=model_name,
temperature=0,
max_tokens=None,
)
```
### file_ops.py
Source: `/a0/tmp/skills_research/langchain/libs/deepagents-cli/deepagents_cli/file_ops.py`
```python
"""Helpers for tracking file operations and computing diffs for CLI display."""
from __future__ import annotations
import difflib
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal
from deepagents.backends.utils import perform_string_replacement
from deepagents_cli.config import settings
if TYPE_CHECKING:
from deepagents.backends.protocol import BACKEND_TYPES
FileOpStatus = Literal["pending", "success", "error"]
@dataclass
class ApprovalPreview:
"""Data used to render HITL previews."""
title: str
details: list[str]
diff: str | None = None
diff_title: str | None = None
error: str | None = None
def _safe_read(path: Path) -> str | None:
"""Read file content, returning None on failure."""
try:
return path.read_text()
except (OSError, UnicodeDecodeError):
return None
def _count_lines(text: str) -> int:
"""Count lines in text, treating empty strings as zero lines."""
if not text:
return 0
return len(text.splitlines())
def compute_unified_diff(
before: str,
after: str,
display_path: str,
*,
max_lines: int | None = 800,
context_lines: int = 3,
) -> str | None:
"""Compute a unified diff between before and after content.
Args:
before: Original content
after: New content
display_path: Path for display in diff headers
max_lines: Maximum number of diff lines (None for unlimited)
context_lines: Number of context lines around changes (default 3)
Returns:
Unified diff string or None if no changes
"""
before_lines = before.splitlines()
after_lines = after.splitlines()
diff_lines = list(
difflib.unified_diff(
before_lines,
after_lines,
fromfile=f"{display_path} (before)",
tofile=f"{display_path} (after)",
lineterm="",
n=context_lines,
)
)
if not diff_lines:
return None
if max_lines is not None and len(diff_lines) > max_lines:
truncated = diff_lines[: max_lines - 1]
truncated.append("...")
return "\n".join(truncated)
return "\n".join(diff_lines)
@dataclass
class FileOpMetrics:
"""Line and byte level metrics for a file operation."""
lines_read: int = 0
start_line: int | None = None
end_line: int | None = None
lines_written: int = 0
lines_added: int = 0
lines_removed: int = 0
bytes_written: int = 0
@dataclass
class FileOperationRecord:
"""Track a single filesystem tool call."""
tool_name: str
display_path: str
physical_path: Path | None
tool_call_id: str | None
args: dict[str, Any] = field(default_factory=dict)
status: FileOpStatus = "pending"
error: str | None = None
metrics: FileOpMetrics = field(default_factory=FileOpMetrics)
diff: str | None = None
before_content: str | None = None
after_content: str | None = None
read_output: str | None = None
hitl_approved: bool = False
def resolve_physical_path(path_str: str | None, assistant_id: str | None) -> Path | None:
"""Convert a virtual/relative path to a physical filesystem path."""
if not path_str:
return None
try:
if assistant_id and path_str.startswith("/memories/"):
agent_dir = settings.get_agent_dir(assistant_id)
suffix = path_str.removeprefix("/memories/").lstrip("/")
return (agent_dir / suffix).resolve()
path = Path(path_str)
if path.is_absolute():
return path
return (Path.cwd() / path).resolve()
except (OSError, ValueError):
return None
def format_display_path(path_str: str | None) -> str:
"""Format a path for display."""
if not path_str:
return "(unknown)"
try:
path = Path(path_str)
if path.is_absolute():
return path.name or str(path)
return str(path)
except (OSError, ValueError):
return str(path_str)
def build_approval_preview(
tool_name: str,
args: dict[str, Any],
assistant_id: str | None,
) -> ApprovalPreview | None:
"""Collect summary info and diff for HITL approvals."""
path_str = str(args.get("file_path") or args.get("path") or "")
display_path = format_display_path(path_str)
physical_path = resolve_physical_path(path_str, assistant_id)
if tool_name == "write_file":
content = str(args.get("content", ""))
before = _safe_read(physical_path) if physical_path and physical_path.exists() else ""
after = content
diff = compute_unified_diff(before or "", after, display_path, max_lines=100)
additions = 0
if diff:
additions = sum(
1
for line in diff.splitlines()
if line.startswith("+") and not line.startswith("+++")
)
total_lines = _count_lines(after)
details = [
f"File: {path_str}",
"Action: Create new file" + (" (overwrites existing content)" if before else ""),
f"Lines to write: {additions or total_lines}",
]
return ApprovalPreview(
title=f"Write {display_path}",
details=details,
diff=diff,
diff_title=f"Diff {display_path}",
)
if tool_name == "edit_file":
if physical_path is None:
return ApprovalPreview(
title=f"Update {display_path}",
details=[f"File: {path_str}", "Action: Replace text"],
error="Unable to resolve file path.",
)
before = _safe_read(physical_path)
if before is None:
return ApprovalPreview(
title=f"Update {display_path}",
details=[f"File: {path_str}", "Action: Replace text"],
error="Unable to read current file contents.",
)
old_string = str(args.get("old_string", ""))
new_string = str(args.get("new_string", ""))
replace_all = bool(args.get("replace_all", False))
replacement = perform_string_replacement(before, old_string, new_string, replace_all)
if isinstance(replacement, str):
return ApprovalPreview(
title=f"Update {display_path}",
details=[f"File: {path_str}", "Action: Replace text"],
error=replacement,
)
after, occurrences = replacement
diff = compute_unified_diff(before, after, display_path, max_lines=None)
additions = 0
deletions = 0
if diff:
additions = sum(
1
for line in diff.splitlines()
if line.startswith("+") and not line.startswith("+++")
)
deletions = sum(
1
for line in diff.splitlines()
if line.startswith("-") and not line.startswith("---")
)
details = [
f"File: {path_str}",
f"Action: Replace text ({'all occurrences' if replace_all else 'single occurrence'})",
f"Occurrences matched: {occurrences}",
f"Lines changed: +{additions} / -{deletions}",
]
return ApprovalPreview(
title=f"Update {display_path}",
details=details,
diff=diff,
diff_title=f"Diff {display_path}",
)
return None
class FileOpTracker:
"""Collect file operation metrics during a CLI interaction."""
def __init__(self, *, assistant_id: str | None, backend: BACKEND_TYPES | None = None) -> None:
"""Initialize the tracker."""
self.assistant_id = assistant_id
self.backend = backend
self.active: dict[str | None, FileOperationRecord] = {}
self.completed: list[FileOperationRecord] = []
def start_operation(
self, tool_name: str, args: dict[str, Any], tool_call_id: str | None
) -> None:
if tool_name not in {"read_file", "write_file", "edit_file"}:
return
path_str = str(args.get("file_path") or args.get("path") or "")
display_path = format_display_path(path_str)
record = FileOperationRecord(
tool_name=tool_name,
display_path=display_path,
physical_path=resolve_physical_path(path_str, self.assistant_id),
tool_call_id=tool_call_id,
args=args,
)
if tool_name in {"write_file", "edit_file"}:
if self.backend and path_str:
try:
responses = self.backend.download_files([path_str])
if (
responses
and responses[0].content is not None
and responses[0].error is None
):
record.before_content = responses[0].content.decode("utf-8")
else:
record.before_content = ""
except Exception:
record.before_content = ""
elif record.physical_path:
record.before_content = _safe_read(record.physical_path) or ""
self.active[tool_call_id] = record
def update_args(self, tool_call_id: str, args: dict[str, Any]) -> None:
"""Update arguments for an active operation and retry capturing before_content."""
record = self.active.get(tool_call_id)
if not record:
return
record.args.update(args)
# If we haven't captured before_content yet, try again now that we might have the path
if record.before_content is None and record.tool_name in {"write_file", "edit_file"}:
path_str = str(record.args.get("file_path") or record.args.get("path") or "")
if path_str:
record.display_path = format_display_path(path_str)
record.physical_path = resolve_physical_path(path_str, self.assistant_id)
if self.backend:
try:
responses = self.backend.download_files([path_str])
if (
responses
and responses[0].content is not None
and responses[0].error is None
):
record.before_content = responses[0].content.decode("utf-8")
else:
record.before_content = ""
except Exception:
record.before_content = ""
elif record.physical_path:
record.before_content = _safe_read(record.physical_path) or ""
def complete_with_message(self, tool_message: Any) -> FileOperationRecord | None:
tool_call_id = getattr(tool_message, "tool_call_id", None)
record = self.active.get(tool_call_id)
if record is None:
return None
content = tool_message.content
if isinstance(content, list):
# Some tool messages may return list segments; join them for analysis.
joined = []
for item in content:
if isinstance(item, str):
joined.append(item)
else:
joined.append(str(item))
content_text = "\n".join(joined)
else:
content_text = str(content) if content is not None else ""
if getattr(
tool_message, "status", "success"
) != "success" or content_text.lower().startswith("error"):
record.status = "error"
record.error = content_text
self._finalize(record)
return record
record.status = "success"
if record.tool_name == "read_file":
record.read_output = content_text
lines = _count_lines(content_text)
record.metrics.lines_read = lines
offset = record.args.get("offset")
limit = record.args.get("limit")
if isinstance(offset, int):
if offset > lines:
offset = 0
record.metrics.start_line = offset + 1
if lines:
record.metrics.end_line = offset + lines
elif lines:
record.metrics.start_line = 1
record.metrics.end_line = lines
if isinstance(limit, int) and lines > limit:
record.metrics.end_line = (record.metrics.start_line or 1) + limit - 1
else:
# For write/edit operations, read back from backend (or local filesystem)
self._populate_after_content(record)
if record.after_content is None:
record.status = "error"
record.error = "Could not read updated file content."
self._finalize(record)
return record
record.metrics.lines_written = _count_lines(record.after_content)
before_lines = _count_lines(record.before_content or "")
diff = compute_unified_diff(
record.before_content or "",
record.after_content,
record.display_path,
max_lines=100,
)
record.diff = diff
if diff:
additions = sum(
1
for line in diff.splitlines()
if line.startswith("+") and not line.startswith("+++")
)
deletions = sum(
1
for line in diff.splitlines()
if line.startswith("-") and not line.startswith("---")
)
record.metrics.lines_added = additions
record.metrics.lines_removed = deletions
elif record.tool_name == "write_file" and (record.before_content or "") == "":
record.metrics.lines_added = record.metrics.lines_written
record.metrics.bytes_written = len(record.after_content.encode("utf-8"))
if record.diff is None and (record.before_content or "") != record.after_content:
record.diff = compute_unified_diff(
record.before_content or "",
record.after_content,
record.display_path,
max_lines=100,
)
if record.diff is None and before_lines != record.metrics.lines_written:
record.metrics.lines_added = max(record.metrics.lines_written - before_lines, 0)
self._finalize(record)
return record
def mark_hitl_approved(self, tool_name: str, args: dict[str, Any]) -> None:
"""Mark operations matching tool_name and file_path as HITL-approved."""
file_path = args.get("file_path") or args.get("path")
if not file_path:
return
# Mark all active records that match
for record in self.active.values():
if record.tool_name == tool_name:
record_path = record.args.get("file_path") or record.args.get("path")
if record_path == file_path:
record.hitl_approved = True
def _populate_after_content(self, record: FileOperationRecord) -> None:
# Use backend if available (works for any BackendProtocol implementation)
if self.backend:
try:
file_path = record.args.get("file_path") or record.args.get("path")
if file_path:
responses = self.backend.download_files([file_path])
if (
responses
and responses[0].content is not None
and responses[0].error is None
):
record.after_content = responses[0].content.decode("utf-8")
else:
record.after_content = None
else:
record.after_content = None
except Exception:
record.after_content = None
else:
# Fallback: direct filesystem read when no backend provided
if record.physical_path is None:
record.after_content = None
return
record.after_content = _safe_read(record.physical_path)
def _finalize(self, record: FileOperationRecord) -> None:
self.completed.append(record)
self.active.pop(record.tool_call_id, None)
```
### image_utils.py
Source: `/a0/tmp/skills_research/langchain/libs/deepagents-cli/deepagents_cli/image_utils.py`
```python
"""Utilities for handling image paste from clipboard."""
import base64
import io
import os
import subprocess
import sys
import tempfile
from dataclasses import dataclass
from PIL import Image
@dataclass
class ImageData:
"""Represents a pasted image with its base64 encoding."""
base64_data: str
format: str # "png", "jpeg", etc.
placeholder: str # Display text like "[image 1]"
def to_message_content(self) -> dict:
"""Convert to LangChain message content format.
Returns:
Dict with type and image_url for multimodal messages
"""
return {
"type": "image_url",
"image_url": {"url": f"data:image/{self.format};base64,{self.base64_data}"},
}
def get_clipboard_image() -> ImageData | None:
"""Attempt to read an image from the system clipboard.
Supports macOS via `pngpaste` or `osascript`.
Returns:
ImageData if an image is found, None otherwise
"""
if sys.platform == "darwin":
return _get_macos_clipboard_image()
# Linux/Windows support could be added here
return None
def _get_macos_clipboard_image() -> ImageData | None:
"""Get clipboard image on macOS using pngpaste or osascript.
First tries pngpaste (faster if installed), then falls back to osascript.
Returns:
ImageData if an image is found, None otherwise
"""
# Try pngpaste first (fast if installed)
try:
result = subprocess.run(
["pngpaste", "-"],
capture_output=True,
check=False,
timeout=2,
)
if result.returncode == 0 and result.stdout:
# Successfully got PNG data
try:
Image.open(io.BytesIO(result.stdout)) # Validate it's a real image
base64_data = base64.b64encode(result.stdout).decode("utf-8")
return ImageData(
base64_data=base64_data,
format="png", # 'pngpaste -' always outputs PNG
placeholder="[image]",
)
except Exception:
pass # Invalid image data
except (FileNotFoundError, subprocess.TimeoutExpired):
pass # pngpaste not installed or timed out
# Fallback to osascript with temp file (built-in but slower)
return _get_clipboard_via_osascript()
def _get_clipboard_via_osascript() -> ImageData | None:
"""Get clipboard image via osascript using a temp file.
osascript outputs data in a special format that can't be captured as raw binary,
so we write to a temp file instead.
Returns:
ImageData if an image is found, None otherwise
"""
# Create a temp file for the image
fd, temp_path = tempfile.mkstemp(suffix=".png")
os.close(fd)
try:
# First check if clipboard has PNG data
check_result = subprocess.run(
["osascript", "-e", "clipboard info"],
capture_output=True,
check=False,
timeout=2,
text=True,
)
if check_result.returncode != 0:
return None
# Check for PNG or TIFF in clipboard info
clipboard_info = check_result.stdout.lower()
if "pngf" not in clipboard_info and "tiff" not in clipboard_info:
return None
# Try to get PNG first, fall back to TIFF
if "pngf" in clipboard_info:
get_script = f"""
set pngData to the clipboard as «class PNGf»
set theFile to open for access POSIX file "{temp_path}" with write permission
write pngData to theFile
close access theFile
return "success"
"""
else:
get_script = f"""
set tiffData to the clipboard as TIFF picture
set theFile to open for access POSIX file "{temp_path}" with write permission
write tiffData to theFile
close access theFile
return "success"
"""
result = subprocess.run(
["osascript", "-e", get_script],
capture_output=True,
check=False,
timeout=3,
text=True,
)
if result.returncode != 0 or "success" not in result.stdout:
return None
# Check if file was created and has content
if not os.path.exists(temp_path) or os.path.getsize(temp_path) == 0:
return None
# Read and validate the image
with open(temp_path, "rb") as f:
image_data = f.read()
try:
image = Image.open(io.BytesIO(image_data))
# Convert to PNG if it's not already (e.g., if we got TIFF)
buffer = io.BytesIO()
image.save(buffer, format="PNG")
buffer.seek(0)
base64_data = base64.b64encode(buffer.getvalue()).decode("utf-8")
return ImageData(
base64_data=base64_data,
format="png",
placeholder="[image]",
)
except Exception:
return None
except (subprocess.TimeoutExpired, OSError):
return None
finally:
# Clean up temp file
try:
os.unlink(temp_path)
except OSError:
pass
def encode_image_to_base64(image_bytes: bytes) -> str:
"""Encode image bytes to base64 string.
Args:
image_bytes: Raw image bytes
Returns:
Base64-encoded string
"""
return base64.b64encode(image_bytes).decode("utf-8")
def create_multimodal_content(text: str, images: list[ImageData]) -> list[dict]:
"""Create multimodal message content with text and images.
Args:
text: Text content of the message
images: List of ImageData objects
Returns:
List of content blocks in LangChain format
"""
content_blocks = []
# Add text block
if text.strip():
content_blocks.append({"type": "text", "text": text})
# Add image blocks
for image in images:
content_blocks.append(image.to_message_content())
return content_blocks
```
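The message shape produced by `create_multimodal_content` above is an optional text block followed by one `image_url` block per image, each carrying a base64 data URL. A trimmed-down sketch without the Pillow/clipboard machinery (the PNG bytes below are placeholder data, not a real image):

```python
import base64
from dataclasses import dataclass

@dataclass
class ImageData:
    base64_data: str
    format: str  # "png", "jpeg", etc.

    def to_message_content(self) -> dict:
        # LangChain-style image_url content block carrying a data URL
        return {
            "type": "image_url",
            "image_url": {"url": f"data:image/{self.format};base64,{self.base64_data}"},
        }

def create_multimodal_content(text: str, images: list[ImageData]) -> list[dict]:
    blocks: list[dict] = []
    if text.strip():
        blocks.append({"type": "text", "text": text})
    blocks.extend(img.to_message_content() for img in images)
    return blocks

img = ImageData(
    base64_data=base64.b64encode(b"placeholder-png-bytes").decode("utf-8"),
    format="png",
)
blocks = create_multimodal_content("What is in this image?", [img])
print([b["type"] for b in blocks])  # ['text', 'image_url']
```

Whitespace-only text produces no text block, so an image-only paste yields a list of `image_url` blocks alone.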
### input.py
Source: `/a0/tmp/skills_research/langchain/libs/deepagents-cli/deepagents_cli/input.py`
```python
"""Input handling, completers, and prompt session for the CLI."""
import asyncio
import os
import re
import time
from collections.abc import Callable
from pathlib import Path
from prompt_toolkit import PromptSession
from prompt_toolkit.completion import (
Completer,
Completion,
PathCompleter,
merge_completers,
)
from prompt_toolkit.document import Document
from prompt_toolkit.enums import EditingMode
from prompt_toolkit.formatted_text import HTML
from prompt_toolkit.key_binding import KeyBindings
from .config import COLORS, COMMANDS, SessionState, console
from .image_utils import ImageData, get_clipboard_image
# Regex patterns for context-aware completion
AT_MENTION_RE = re.compile(r"@(?P<path>(?:[^\s@]|(?<=\\)\s)*)$")
SLASH_COMMAND_RE = re.compile(r"^/(?P<command>[a-z]*)$")
EXIT_CONFIRM_WINDOW = 3.0
class ImageTracker:
"""Track pasted images in the current conversation."""
def __init__(self) -> None:
self.images: list[ImageData] = []
self.next_id = 1
def add_image(self, image_data: ImageData) -> str:
"""Add an image and return its placeholder text.
Args:
image_data: The image data to track
Returns:
Placeholder string like "[image 1]"
"""
placeholder = f"[image {self.next_id}]"
image_data.placeholder = placeholder
self.images.append(image_data)
self.next_id += 1
return placeholder
def get_images(self) -> list[ImageData]:
"""Get all tracked images."""
return self.images.copy()
def clear(self) -> None:
"""Clear all tracked images and reset counter."""
self.images.clear()
self.next_id = 1
class FilePathCompleter(Completer):
"""Activate filesystem completion only when cursor is after '@'."""
def __init__(self) -> None:
self.path_completer = PathCompleter(
expanduser=True,
min_input_len=0,
only_directories=False,
)
def get_completions(self, document, complete_event):
"""Get file path completions when @ is detected."""
text = document.text_before_cursor
# Use regex to detect @path pattern at end of line
m = AT_MENTION_RE.search(text)
if not m:
return # Not in an @path context
path_fragment = m.group("path")
# Unescape the path for PathCompleter (it doesn't understand escape sequences)
unescaped_fragment = path_fragment.replace("\\ ", " ")
# Strip trailing backslash if present (user is in the process of typing an escape)
unescaped_fragment = unescaped_fragment.removesuffix("\\")
# Create temporary document for the unescaped path fragment
temp_doc = Document(text=unescaped_fragment, cursor_position=len(unescaped_fragment))
# Get completions from PathCompleter and use its start_position
# PathCompleter returns suffix text with start_position=0 (insert at cursor)
for comp in self.path_completer.get_completions(temp_doc, complete_event):
# Add trailing / for directories so users can continue navigating
completed_path = Path(unescaped_fragment + comp.text).expanduser()
# Re-escape spaces in the completion text for the command line
completion_text = comp.text.replace(" ", "\\ ")
if completed_path.is_dir() and not completion_text.endswith("/"):
completion_text += "/"
yield Completion(
text=completion_text,
start_position=comp.start_position, # Use PathCompleter's position (usually 0)
display=comp.display,
display_meta=comp.display_meta,
)
class CommandCompleter(Completer):
"""Activate command completion only when line starts with '/'."""
def get_completions(self, document, _complete_event):
"""Get command completions when / is at the start."""
text = document.text_before_cursor
# Use regex to detect /command pattern at start of line
m = SLASH_COMMAND_RE.match(text)
if not m:
return # Not in a /command context
command_fragment = m.group("command")
# Match commands that start with the fragment (case-insensitive)
for cmd_name, cmd_desc in COMMANDS.items():
if cmd_name.startswith(command_fragment.lower()):
yield Completion(
text=cmd_name,
start_position=-len(command_fragment), # Fixed position for original document
display=cmd_name,
display_meta=cmd_desc,
)
def parse_file_mentions(text: str) -> tuple[str, list[Path]]:
"""Extract @file mentions and return cleaned text with resolved file paths."""
pattern = r"@((?:[^\s@]|(?<=\\)\s)+)" # Match @filename, allowing escaped spaces
matches = re.findall(pattern, text)
files = []
for match in matches:
# Remove escape characters
clean_path = match.replace("\\ ", " ")
path = Path(clean_path).expanduser()
# Try to resolve relative to cwd
if not path.is_absolute():
path = Path.cwd() / path
try:
path = path.resolve()
if path.exists() and path.is_file():
files.append(path)
else:
console.print(f"[yellow]Warning: File not found: {match}[/yellow]")
except Exception as e:
console.print(f"[yellow]Warning: Invalid path {match}: {e}[/yellow]")
return text, files
def parse_image_placeholders(text: str) -> tuple[str, int]:
"""Count image placeholders in text.
Args:
text: Input text potentially containing [image] or [image N] placeholders
Returns:
Tuple of (text, count) where count is the number of image placeholders found
"""
# Match [image] or [image N] patterns
pattern = r"\[image(?:\s+\d+)?\]"
matches = re.findall(pattern, text, re.IGNORECASE)
return text, len(matches)
def get_bottom_toolbar(
session_state: SessionState, session_ref: dict
) -> Callable[[], list[tuple[str, str]]]:
"""Return toolbar function that shows auto-approve status and BASH MODE."""
def toolbar() -> list[tuple[str, str]]:
parts = []
# Check if we're in BASH mode (input starts with !)
try:
session = session_ref.get("session")
if session:
current_text = session.default_buffer.text
if current_text.startswith("!"):
parts.append(("bg:#ff1493 fg:#ffffff bold", " BASH MODE "))
parts.append(("", " | "))
except (AttributeError, TypeError):
# Silently ignore - toolbar is non-critical and called frequently
pass
# Base status message
if session_state.auto_approve:
base_msg = "auto-accept ON (CTRL+T to toggle)"
base_class = "class:toolbar-green"
else:
base_msg = "manual accept (CTRL+T to toggle)"
base_class = "class:toolbar-orange"
parts.append((base_class, base_msg))
# Show exit confirmation hint if active
hint_until = session_state.exit_hint_until
if hint_until is not None:
now = time.monotonic()
if now < hint_until:
parts.append(("", " | "))
parts.append(("class:toolbar-exit", " Ctrl+C again to exit "))
else:
session_state.exit_hint_until = None
return parts
return toolbar
def create_prompt_session(
_assistant_id: str, session_state: SessionState, image_tracker: ImageTracker | None = None
) -> PromptSession:
"""Create a configured PromptSession with all features."""
# Set default editor if not already set
if "EDITOR" not in os.environ:
os.environ["EDITOR"] = "nano"
# Create key bindings
kb = KeyBindings()
@kb.add("c-c")
def _(event) -> None:
"""Require double Ctrl+C within a short window to exit."""
app = event.app
now = time.monotonic()
if session_state.exit_hint_until is not None and now < session_state.exit_hint_until:
handle = session_state.exit_hint_handle
if handle:
handle.cancel()
session_state.exit_hint_handle = None
session_state.exit_hint_until = None
app.invalidate()
app.exit(exception=KeyboardInterrupt())
return
session_state.exit_hint_until = now + EXIT_CONFIRM_WINDOW
handle = session_state.exit_hint_handle
if handle:
handle.cancel()
loop = asyncio.get_running_loop()
app_ref = app
def clear_hint() -> None:
if (
session_state.exit_hint_until is not None
and time.monotonic() >= session_state.exit_hint_until
):
session_state.exit_hint_until = None
session_state.exit_hint_handle = None
app_ref.invalidate()
session_state.exit_hint_handle = loop.call_later(EXIT_CONFIRM_WINDOW, clear_hint)
app.invalidate()
# Bind Ctrl+T to toggle auto-approve
@kb.add("c-t")
def _(event) -> None:
"""Toggle auto-approve mode."""
session_state.toggle_auto_approve()
# Force UI refresh to update toolbar
event.app.invalidate()
# Custom paste handler to detect images
if image_tracker:
from prompt_toolkit.keys import Keys
def _handle_paste_with_image_check(event, pasted_text: str = "") -> None:
"""Check clipboard for image, otherwise insert pasted text."""
# Try to get an image from clipboard
clipboard_image = get_clipboard_image()
if clipboard_image:
# Found an image! Add it to tracker and insert placeholder
placeholder = image_tracker.add_image(clipboard_image)
# Insert placeholder (no confirmation message)
event.current_buffer.insert_text(placeholder)
elif pasted_text:
# No image, insert the pasted text
event.current_buffer.insert_text(pasted_text)
else:
# Fallback: try to get text from prompt_toolkit clipboard
clipboard_data = event.app.clipboard.get_data()
if clipboard_data and clipboard_data.text:
event.current_buffer.insert_text(clipboard_data.text)
@kb.add(Keys.BracketedPaste)
def _(event) -> None:
"""Handle bracketed paste (Cmd+V on macOS) - check for images first."""
# Bracketed paste provides the pasted text in event.data
pasted_text = event.data if hasattr(event, "data") else ""
_handle_paste_with_image_check(event, pasted_text)
@kb.add("c-v")
def _(event) -> None:
"""Handle Ctrl+V paste - check for images first."""
_handle_paste_with_image_check(event)
# Bind regular Enter to submit (intuitive behavior)
@kb.add("enter")
def _(event) -> None:
"""Enter submits the input, unless completion menu is active."""
buffer = event.current_buffer
# If completion menu is showing, apply the current completion
if buffer.complete_state:
# Get the current completion (the highlighted one)
current_completion = buffer.complete_state.current_completion
# If no completion is selected (user hasn't navigated), select and apply the first one
if not current_completion and buffer.complete_state.completions:
# Move to the first completion
buffer.complete_next()
# Now apply it
buffer.apply_completion(buffer.complete_state.current_completion)
elif current_completion:
# Apply the already-selected completion
buffer.apply_completion(current_completion)
else:
# No completions available, close menu
buffer.complete_state = None
# Don't submit if buffer is empty or only whitespace
elif buffer.text.strip():
# Normal submit
buffer.validate_and_handle()
# If empty, do nothing (don't submit)
# Alt+Enter for newlines (press ESC then Enter, or Option+Enter on Mac)
@kb.add("escape", "enter")
def _(event) -> None:
"""Alt+Enter inserts a newline for multi-line input."""
event.current_buffer.insert_text("\n")
# Ctrl+E to open in external editor
@kb.add("c-e")
def _(event) -> None:
"""Open the current input in an external editor (nano by default)."""
event.current_buffer.open_in_editor()
# Backspace handler to retrigger completions and delete image tags as units
@kb.add("backspace")
def _(event) -> None:
"""Handle backspace: delete image tags as single unit, retrigger completion."""
buffer = event.current_buffer
text_before = buffer.document.text_before_cursor
# Check if cursor is right after an image tag like [image 1] or [image 12]
image_tag_pattern = r"\[image \d+\]$"
match = re.search(image_tag_pattern, text_before)
if match and image_tracker:
# Delete the entire tag
tag_length = len(match.group(0))
buffer.delete_before_cursor(count=tag_length)
# Remove the image from tracker and reset counter
tag_text = match.group(0)
image_num_match = re.search(r"\d+", tag_text)
if image_num_match:
image_num = int(image_num_match.group(0))
# Remove image at index (1-based to 0-based)
if 0 < image_num <= len(image_tracker.images):
image_tracker.images.pop(image_num - 1)
# Reset counter to next available number
image_tracker.next_id = len(image_tracker.images) + 1
else:
# Normal backspace
buffer.delete_before_cursor(count=1)
# Check if we're in a completion context (@ or /)
text = buffer.document.text_before_cursor
if AT_MENTION_RE.search(text) or SLASH_COMMAND_RE.match(text):
# Retrigger completion
buffer.start_completion(select_first=False)
from prompt_toolkit.styles import Style
# Define styles for the toolbar with full-width background colors
toolbar_style = Style.from_dict(
{
"bottom-toolbar": "noreverse", # Disable default reverse video
"toolbar-green": "bg:#10b981 #000000", # Green for auto-accept ON
"toolbar-orange": "bg:#f59e0b #000000", # Orange for manual accept
"toolbar-exit": "bg:#2563eb #ffffff", # Blue for exit hint
}
)
# Create session reference dict for toolbar to access session
session_ref = {}
# Create the session
session = PromptSession(
message=HTML(f'<style fg="{COLORS["user"]}">></style> '),
multiline=True, # Keep multiline support but Enter submits
key_bindings=kb,
completer=merge_completers([CommandCompleter(), FilePathCompleter()]),
editing_mode=EditingMode.EMACS,
complete_while_typing=True, # Show completions as you type
complete_in_thread=True, # Async completion prevents menu freezing
mouse_support=False,
enable_open_in_editor=True, # Allow Ctrl+X Ctrl+E to open external editor
bottom_toolbar=get_bottom_toolbar(
session_state, session_ref
), # Persistent status bar at bottom
style=toolbar_style, # Apply toolbar styling
reserve_space_for_menu=7, # Reserve space for completion menu to show 5-6 results
)
# Store session reference for toolbar to access
session_ref["session"] = session
return session
```
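The `parse_file_mentions` helper above matches `@path` tokens with a regex that also accepts backslash-escaped spaces. A minimal standalone sketch of just that matching step (regex copied from the source; the file-existence checks and console warnings are omitted):

```python
import re

# Regex from parse_file_mentions: "@" followed by one or more characters that are
# either non-space/non-@ chars, or a space immediately preceded by a backslash
# (i.e. an escaped space like "my\ file.txt").
AT_PATTERN = r"@((?:[^\s@]|(?<=\\)\s)+)"


def extract_mentions(text: str) -> list[str]:
    """Return the unescaped path fragment for every @mention in the text."""
    return [m.replace("\\ ", " ") for m in re.findall(AT_PATTERN, text)]


print(extract_mentions(r"compare @notes.md with @my\ file.txt"))
```

The escaped space keeps `my file.txt` together as a single mention, while an unescaped space would end the match.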
### local_context.py
Source: `/a0/tmp/skills_research/langchain/libs/deepagents-cli/deepagents_cli/local_context.py`
```python
"""Middleware for injecting local context into system prompt."""
from __future__ import annotations
import subprocess
from collections.abc import Awaitable, Callable
from pathlib import Path
from typing import NotRequired, TypedDict, cast
from langchain.agents.middleware.types import (
AgentMiddleware,
AgentState,
ModelRequest,
ModelResponse,
)
from langgraph.runtime import Runtime
# Directories to ignore in file listings and tree views
IGNORE_PATTERNS = frozenset(
{
".git",
"node_modules",
".venv",
"__pycache__",
".pytest_cache",
".mypy_cache",
".ruff_cache",
".tox",
".coverage",
".eggs",
"dist",
"build",
}
)
class LocalContextState(AgentState):
"""State for local context middleware."""
local_context: NotRequired[str]
"""Formatted local context: git, cwd, files, tree."""
class LocalContextStateUpdate(TypedDict):
"""State update for local context middleware."""
local_context: str
"""Formatted local context: git, cwd, files, tree."""
class LocalContextMiddleware(AgentMiddleware):
"""Middleware for injecting local context into system prompt.
This middleware:
1. Detects current git branch (if in a git repo)
2. Checks if main/master branches exist locally
3. Detects project language, package managers, and a likely test command
4. Lists files in current directory (max 20)
5. Shows directory tree structure (max 3 levels, 20 entries)
6. Previews the Makefile (first 20 lines) when present
7. Appends the assembled local context to the system prompt
"""
state_schema = LocalContextState
def _get_git_info(self) -> dict[str, str | list[str]]:
"""Gather git state information.
Returns:
Dict with 'branch' (current branch) and 'main_branches' (list of main/master if they exist).
Returns empty dict if not in git repo.
"""
try:
# Get current branch
result = subprocess.run(
["git", "rev-parse", "--abbrev-ref", "HEAD"],
capture_output=True,
text=True,
timeout=2,
cwd=Path.cwd(),
check=False,
)
if result.returncode != 0:
return {}
current_branch = result.stdout.strip()
# Get local branches to check for main/master
main_branches = []
result = subprocess.run(
["git", "branch"],
capture_output=True,
text=True,
timeout=2,
cwd=Path.cwd(),
check=False,
)
if result.returncode == 0:
branches = set()
for line in result.stdout.strip().split("\n"):
branch = line.strip().lstrip("*").strip()
if branch:
branches.add(branch)
if "main" in branches:
main_branches.append("main")
if "master" in branches:
main_branches.append("master")
return {"branch": current_branch, "main_branches": main_branches}
except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
return {}
def _get_file_list(self, max_files: int = 20) -> list[str]:
"""Get list of files in current directory (non-recursive).
Args:
max_files: Maximum number of files to show (default 20).
Returns:
List of file paths (sorted), truncated to max_files.
"""
cwd = Path.cwd()
files = []
try:
for item in sorted(cwd.iterdir()):
# Skip hidden files (except .deepagents)
if item.name.startswith(".") and item.name != ".deepagents":
continue
# Skip ignored patterns
if item.name in IGNORE_PATTERNS:
continue
# Add files and dirs
if item.is_file():
files.append(item.name)
elif item.is_dir():
files.append(f"{item.name}/")
if len(files) >= max_files:
break
except (OSError, PermissionError):
return []
return files
def _get_directory_tree(self, max_depth: int = 3, max_entries: int = 20) -> str:
"""Get directory tree structure.
Args:
max_depth: Maximum depth to traverse (default 3).
max_entries: Maximum total entries to show (default 20).
Returns:
Formatted tree string or empty if error.
"""
cwd = Path.cwd()
lines: list[str] = []
entry_count = [0] # Mutable for closure
def _should_include(item: Path) -> bool:
"""Check if item should be included in tree."""
# Skip hidden files (except .deepagents)
if item.name.startswith(".") and item.name != ".deepagents":
return False
# Skip ignored patterns
return item.name not in IGNORE_PATTERNS
def _build_tree(path: Path, prefix: str = "", depth: int = 0) -> None:
"""Recursive tree builder."""
if depth >= max_depth or entry_count[0] >= max_entries:
return
try:
all_items = sorted(path.iterdir(), key=lambda p: (not p.is_dir(), p.name))
# Pre-filter to get correct is_last determination
items = [item for item in all_items if _should_include(item)]
except (OSError, PermissionError):
return
for i, item in enumerate(items):
if entry_count[0] >= max_entries:
lines.append(f"{prefix}... (truncated)")
return
is_last = i == len(items) - 1
connector = "└── " if is_last else "├── "
display_name = f"{item.name}/" if item.is_dir() else item.name
lines.append(f"{prefix}{connector}{display_name}")
entry_count[0] += 1
# Recurse into directories
if item.is_dir() and depth + 1 < max_depth:
extension = " " if is_last else "│ "
_build_tree(item, prefix + extension, depth + 1)
try:
lines.append(f"{cwd.name}/")
_build_tree(cwd)
except (OSError, PermissionError):
return ""
return "\n".join(lines)
def _detect_package_manager(self) -> str | None:
"""Detect Python package manager in use.
Checks for lock files and config files to determine the package manager.
Uses priority order: `uv > poetry > pipenv > pip`. First match wins if multiple
indicators are present.
Returns:
Package manager name (uv, poetry, pipenv, pip) or `None` if not detected.
"""
cwd = Path.cwd()
# Check for uv (uv.lock or pyproject.toml with [tool.uv])
if (cwd / "uv.lock").exists():
return "uv"
# Check for poetry (poetry.lock or pyproject.toml with [tool.poetry])
if (cwd / "poetry.lock").exists():
return "poetry"
# Check for pipenv
if (cwd / "Pipfile.lock").exists() or (cwd / "Pipfile").exists():
return "pipenv"
# Check pyproject.toml for tool sections
pyproject = cwd / "pyproject.toml"
if pyproject.exists():
try:
content = pyproject.read_text()
if "[tool.uv]" in content:
return "uv"
if "[tool.poetry]" in content:
return "poetry"
# Has pyproject.toml but no specific tool - likely pip/setuptools
return "pip"
except (OSError, PermissionError, UnicodeDecodeError):
pass
# Check for requirements.txt
if (cwd / "requirements.txt").exists():
return "pip"
return None
def _detect_node_package_manager(self) -> str | None:
"""Detect Node.js package manager in use.
Uses priority order: `bun > pnpm > yarn > npm`.
First match wins if multiple lock files are present.
Returns:
Package manager name (bun, pnpm, yarn, npm) or `None` if not detected.
"""
cwd = Path.cwd()
if (cwd / "bun.lockb").exists() or (cwd / "bun.lock").exists():
return "bun"
if (cwd / "pnpm-lock.yaml").exists():
return "pnpm"
if (cwd / "yarn.lock").exists():
return "yarn"
if (cwd / "package-lock.json").exists() or (cwd / "package.json").exists():
return "npm"
return None
def _get_makefile_preview(self, max_lines: int = 20) -> str | None:
"""Get first N lines of `Makefile` if present.
Args:
max_lines: Maximum lines to show.
Returns:
`Makefile` preview or `None` if not found.
"""
cwd = Path.cwd()
makefile = cwd / "Makefile"
if not makefile.exists():
return None
try:
content = makefile.read_text()
lines = content.split("\n")[:max_lines]
preview = "\n".join(lines)
if len(content.split("\n")) > max_lines:
preview += "\n... (truncated)"
return preview
except (OSError, PermissionError, UnicodeDecodeError):
return None
def _detect_project_info(self) -> dict[str, str | bool | None]:
"""Detect project type, language, and structure.
Returns:
Dict with `language`, `is_monorepo`, `project_root`, `has_venv`, `has_node_modules`.
"""
cwd = Path.cwd()
info: dict[str, str | bool | None] = {
"language": None,
"is_monorepo": False,
"project_root": None,
"has_venv": False,
"has_node_modules": False,
}
# Check for virtual environments
info["has_venv"] = (cwd / ".venv").exists() or (cwd / "venv").exists()
info["has_node_modules"] = (cwd / "node_modules").exists()
# Detect primary language
if (cwd / "pyproject.toml").exists() or (cwd / "setup.py").exists():
info["language"] = "python"
elif (cwd / "package.json").exists():
info["language"] = "javascript/typescript"
elif (cwd / "Cargo.toml").exists():
info["language"] = "rust"
elif (cwd / "go.mod").exists():
info["language"] = "go"
elif (cwd / "pom.xml").exists() or (cwd / "build.gradle").exists():
info["language"] = "java"
# Detect monorepo patterns
# Check for common monorepo indicators
monorepo_indicators = [
(cwd / "lerna.json").exists(),
(cwd / "pnpm-workspace.yaml").exists(),
(cwd / "packages").is_dir(),
(cwd / "libs").is_dir() and (cwd / "apps").is_dir(),
(cwd / "workspaces").is_dir(),
]
info["is_monorepo"] = any(monorepo_indicators)
# Try to find project root (look for .git or pyproject.toml up the tree)
try:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True,
timeout=2,
cwd=cwd,
check=False,
)
if result.returncode == 0:
info["project_root"] = result.stdout.strip()
except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
pass
return info
def _detect_test_command(self) -> str | None:
"""Detect how to run tests based on project structure.
Returns:
Suggested test command or `None` if not detected.
"""
cwd = Path.cwd()
# Check Makefile for test target
makefile = cwd / "Makefile"
if makefile.exists():
try:
content = makefile.read_text()
if "test:" in content or "tests:" in content:
return "make test"
except (OSError, PermissionError, UnicodeDecodeError):
pass
# Python projects
if (cwd / "pyproject.toml").exists():
pyproject = cwd / "pyproject.toml"
try:
content = pyproject.read_text()
if "[tool.pytest" in content or (cwd / "pytest.ini").exists():
return "pytest"
except (OSError, PermissionError, UnicodeDecodeError):
pass
if (cwd / "tests").is_dir() or (cwd / "test").is_dir():
return "pytest"
# Node projects
if (cwd / "package.json").exists():
try:
import json
pkg = json.loads((cwd / "package.json").read_text())
if "scripts" in pkg and "test" in pkg["scripts"]:
return "npm test"
except (OSError, PermissionError, UnicodeDecodeError, json.JSONDecodeError):
pass
return None
def before_agent(
self,
state: LocalContextState,
runtime: Runtime,
) -> LocalContextStateUpdate | None:
"""Load local context before agent execution.
Runs once at session start to preserve prompt caching.
Args:
state: Current agent state.
runtime: Runtime context.
Returns:
Updated state with local_context populated, or None if already set.
"""
# Only compute context on first interaction to preserve prompt caching
if state.get("local_context"):
return None
cwd = Path.cwd()
sections = ["## Local Context", ""]
# Current directory
sections.append(f"**Current Directory**: `{cwd}`")
sections.append("")
# Project info (language, monorepo, root, environments)
project_info = self._detect_project_info()
project_lines = []
if project_info.get("language"):
project_lines.append(f"Language: {project_info['language']}")
if project_info.get("project_root") and str(project_info["project_root"]) != str(cwd):
project_lines.append(f"Project root: `{project_info['project_root']}`")
if project_info.get("is_monorepo"):
project_lines.append("Monorepo: yes")
env_indicators = []
if project_info.get("has_venv"):
env_indicators.append(".venv")
if project_info.get("has_node_modules"):
env_indicators.append("node_modules")
if env_indicators:
project_lines.append(f"Environments: {', '.join(env_indicators)}")
if project_lines:
sections.append("**Project**:")
sections.extend(f"- {line}" for line in project_lines)
sections.append("")
# Package managers
pkg_managers = []
python_pkg = self._detect_package_manager()
if python_pkg:
pkg_managers.append(f"Python: {python_pkg}")
node_pkg = self._detect_node_package_manager()
if node_pkg:
pkg_managers.append(f"Node: {node_pkg}")
if pkg_managers:
sections.append(f"**Package Manager**: {', '.join(pkg_managers)}")
sections.append("")
# Git info
git_info = self._get_git_info()
if git_info:
git_text = f"**Git**: Current branch `{git_info['branch']}`"
if git_info.get("main_branches"):
main_branches = ", ".join(f"`{b}`" for b in git_info["main_branches"])
git_text += f", main branch available: {main_branches}"
sections.append(git_text)
sections.append("")
# Test command
test_cmd = self._detect_test_command()
if test_cmd:
sections.append(f"**Run Tests**: `{test_cmd}`")
sections.append("")
# File list
files = self._get_file_list()
if files:
total_items = len(list(Path.cwd().iterdir()))
sections.append(f"**Files** ({len(files)} shown):")
for file in files:
sections.append(f"- {file}")
if len(files) < total_items:
remaining = total_items - len(files)
sections.append(f"... ({remaining} more files)")
sections.append("")
# Directory tree
tree = self._get_directory_tree()
if tree:
sections.append("**Tree** (3 levels):")
sections.append("```text")
sections.append(tree)
sections.append("```")
sections.append("")
# Makefile preview
makefile_preview = self._get_makefile_preview()
if makefile_preview:
sections.append("**Makefile** (first 20 lines):")
sections.append("```makefile")
sections.append(makefile_preview)
sections.append("```")
local_context = "\n".join(sections)
return LocalContextStateUpdate(local_context=local_context)
def _get_modified_request(self, request: ModelRequest) -> ModelRequest | None:
"""Get modified request with local context injected, or None if no context.
Args:
request: The original model request.
Returns:
Modified request with local context appended, or None if no local context.
"""
state = cast("LocalContextState", request.state)
local_context = state.get("local_context", "")
if not local_context:
return None
# Append local context to system prompt
system_prompt = request.system_prompt or ""
new_prompt = system_prompt + "\n\n" + local_context
return request.override(system_prompt=new_prompt)
def wrap_model_call(
self,
request: ModelRequest,
handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
"""Inject local context into system prompt.
Args:
request: The model request being processed.
handler: The handler function to call with the modified request.
Returns:
The model response from the handler.
"""
modified_request = self._get_modified_request(request)
return handler(modified_request if modified_request else request)
async def awrap_model_call(
self,
request: ModelRequest,
handler: Callable[[ModelRequest], Awaitable[ModelResponse]],
) -> ModelResponse:
"""(async) Inject local context into system prompt.
Args:
request: The model request being processed.
handler: The handler function to call with the modified request.
Returns:
The model response from the handler.
"""
modified_request = self._get_modified_request(request)
return await handler(modified_request if modified_request else request)
__all__ = ["LocalContextMiddleware"]
```
### main.py
Source: `/a0/tmp/skills_research/langchain/libs/deepagents-cli/deepagents_cli/main.py`
```python
"""Main entry point and CLI loop for deepagents."""
# ruff: noqa: T201, E402, BLE001, PLR0912, PLR0915
# Suppress deprecation warnings from langchain_core (e.g., Pydantic V1 on Python 3.14+)
import warnings
warnings.filterwarnings("ignore", module="langchain_core._api.deprecation")
import argparse
import asyncio
import contextlib
import os
import sys
import warnings
from pathlib import Path
# Suppress Pydantic v1 compatibility warnings from langchain on Python 3.14+
warnings.filterwarnings("ignore", message=".*Pydantic V1.*", category=UserWarning)
from rich.text import Text
from deepagents_cli._version import __version__
# Now safe to import agent (which imports LangChain modules)
from deepagents_cli.agent import create_cli_agent, list_agents, reset_agent
# CRITICAL: Import config FIRST to set LANGSMITH_PROJECT before LangChain loads
from deepagents_cli.config import (
console,
create_model,
settings,
)
from deepagents_cli.integrations.sandbox_factory import create_sandbox
from deepagents_cli.sessions import (
delete_thread_command,
generate_thread_id,
get_checkpointer,
get_most_recent,
get_thread_agent,
list_threads_command,
thread_exists,
)
from deepagents_cli.skills import execute_skills_command, setup_skills_parser
from deepagents_cli.tools import fetch_url, http_request, web_search
from deepagents_cli.ui import show_help
def check_cli_dependencies() -> None:
"""Check if CLI optional dependencies are installed."""
missing = []
try:
import requests # noqa: F401
except ImportError:
missing.append("requests")
try:
import dotenv # noqa: F401
except ImportError:
missing.append("python-dotenv")
try:
import tavily # noqa: F401
except ImportError:
missing.append("tavily-python")
try:
import textual # noqa: F401
except ImportError:
missing.append("textual")
if missing:
print("\n❌ Missing required CLI dependencies!")
print("\nThe following packages are required to use the deepagents CLI:")
for pkg in missing:
print(f" - {pkg}")
print("\nPlease install them with:")
print(" pip install deepagents[cli]")
print("\nOr install all dependencies:")
print(" pip install 'deepagents[cli]'")
sys.exit(1)
def parse_args() -> argparse.Namespace:
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
description="DeepAgents - AI Coding Assistant",
formatter_class=argparse.RawDescriptionHelpFormatter,
add_help=False,
)
parser.add_argument(
"--version",
action="version",
version=f"deepagents {__version__}",
)
subparsers = parser.add_subparsers(dest="command", help="Command to run")
# List command
subparsers.add_parser("list", help="List all available agents")
# Help command
subparsers.add_parser("help", help="Show help information")
# Reset command
reset_parser = subparsers.add_parser("reset", help="Reset an agent")
reset_parser.add_argument("--agent", required=True, help="Name of agent to reset")
reset_parser.add_argument(
"--target", dest="source_agent", help="Copy prompt from another agent"
)
# Skills command - setup delegated to skills module
setup_skills_parser(subparsers)
# Threads command
threads_parser = subparsers.add_parser("threads", help="Manage conversation threads")
threads_sub = threads_parser.add_subparsers(dest="threads_command")
# threads list
threads_list = threads_sub.add_parser("list", help="List threads")
threads_list.add_argument(
"--agent", default=None, help="Filter by agent name (default: show all)"
)
threads_list.add_argument("--limit", type=int, default=20, help="Max threads (default: 20)")
# threads delete
threads_delete = threads_sub.add_parser("delete", help="Delete a thread")
threads_delete.add_argument("thread_id", help="Thread ID to delete")
# Default interactive mode
parser.add_argument(
"--agent",
default="agent",
help="Agent identifier for separate memory stores (default: agent).",
)
# Thread resume argument - matches PR #638: -r for most recent, -r <ID> for specific
parser.add_argument(
"-r",
"--resume",
dest="resume_thread",
nargs="?",
const="__MOST_RECENT__",
default=None,
help="Resume thread: -r for most recent, -r <ID> for specific thread",
)
# Initial prompt - auto-submit when session starts
parser.add_argument(
"-m",
"--message",
dest="initial_prompt",
help="Initial prompt to auto-submit when session starts",
)
parser.add_argument(
"--model",
help="Model to use (e.g., claude-sonnet-4-5-20250929, gpt-5-mini). "
"Provider is auto-detected from model name.",
)
parser.add_argument(
"--auto-approve",
action="store_true",
help="Auto-approve tool usage without prompting (disables human-in-the-loop)",
)
parser.add_argument(
"--sandbox",
choices=["none", "modal", "daytona", "runloop"],
default="none",
help="Remote sandbox for code execution (default: none - local only)",
)
parser.add_argument(
"--sandbox-id",
help="Existing sandbox ID to reuse (skips creation and cleanup)",
)
parser.add_argument(
"--sandbox-setup",
help="Path to setup script to run in sandbox after creation",
)
return parser.parse_args()
async def run_textual_cli_async(
assistant_id: str,
*,
auto_approve: bool = False,
sandbox_type: str = "none",
sandbox_id: str | None = None,
model_name: str | None = None,
thread_id: str | None = None,
is_resumed: bool = False,
initial_prompt: str | None = None,
) -> None:
"""Run the Textual CLI interface (async version).
Args:
assistant_id: Agent identifier for memory storage
auto_approve: Whether to auto-approve tool usage
sandbox_type: Type of sandbox ("none", "modal", "runloop", "daytona")
sandbox_id: Optional existing sandbox ID to reuse
model_name: Optional model name to use
thread_id: Thread ID to use (new or resumed)
is_resumed: Whether this is a resumed session
initial_prompt: Optional prompt to auto-submit when session starts
"""
from deepagents_cli.app import run_textual_app
model = create_model(model_name)
# Show thread info
if is_resumed:
console.print(f"[green]Resuming thread:[/green] {thread_id}")
else:
console.print(f"[dim]Thread: {thread_id}[/dim]")
# Use async context manager for checkpointer
async with get_checkpointer() as checkpointer:
# Create agent with conditional tools
tools = [http_request, fetch_url]
if settings.has_tavily:
tools.append(web_search)
# Handle sandbox mode
sandbox_backend = None
sandbox_cm = None
if sandbox_type != "none":
try:
# Create sandbox context manager but keep it open
sandbox_cm = create_sandbox(sandbox_type, sandbox_id=sandbox_id)
sandbox_backend = sandbox_cm.__enter__()
except (ImportError, ValueError, RuntimeError, NotImplementedError) as e:
console.print()
console.print("[red]❌ Sandbox creation failed[/red]")
console.print(Text(str(e), style="dim"))
sys.exit(1)
try:
agent, composite_backend = create_cli_agent(
model=model,
assistant_id=assistant_id,
tools=tools,
sandbox=sandbox_backend,
sandbox_type=sandbox_type if sandbox_type != "none" else None,
auto_approve=auto_approve,
checkpointer=checkpointer,
)
# Run Textual app
await run_textual_app(
agent=agent,
assistant_id=assistant_id,
backend=composite_backend,
auto_approve=auto_approve,
cwd=Path.cwd(),
thread_id=thread_id,
initial_prompt=initial_prompt,
)
except Exception as e:
error_text = Text("❌ Failed to create agent: ", style="red")
error_text.append(str(e))
console.print(error_text)
sys.exit(1)
finally:
# Clean up sandbox if we created one
if sandbox_cm is not None:
with contextlib.suppress(Exception):
sandbox_cm.__exit__(None, None, None)
def cli_main() -> None:
"""Entry point for console script."""
# Fix for gRPC fork issue on macOS
# https://github.com/grpc/grpc/issues/37642
if sys.platform == "darwin":
os.environ["GRPC_ENABLE_FORK_SUPPORT"] = "0"
# Note: LANGSMITH_PROJECT is already overridden in config.py (before LangChain imports)
# This ensures agent traces → DEEPAGENTS_LANGSMITH_PROJECT
# Shell commands → user's original LANGSMITH_PROJECT (via ShellMiddleware env)
# Check dependencies first
check_cli_dependencies()
try:
args = parse_args()
if args.command == "help":
show_help()
elif args.command == "list":
list_agents()
elif args.command == "reset":
reset_agent(args.agent, args.source_agent)
elif args.command == "skills":
execute_skills_command(args)
elif args.command == "threads":
if args.threads_command == "list":
asyncio.run(
list_threads_command(
agent_name=getattr(args, "agent", None),
limit=getattr(args, "limit", 20),
)
)
elif args.threads_command == "delete":
asyncio.run(delete_thread_command(args.thread_id))
else:
console.print("[yellow]Usage: deepagents threads <list|delete>[/yellow]")
else:
# Interactive mode - handle thread resume
thread_id = None
is_resumed = False
if args.resume_thread == "__MOST_RECENT__":
# -r (no ID): Get most recent thread
# If --agent specified, filter by that agent; otherwise get most recent overall
agent_filter = args.agent if args.agent != "agent" else None
thread_id = asyncio.run(get_most_recent(agent_filter))
if thread_id:
is_resumed = True
agent_name = asyncio.run(get_thread_agent(thread_id))
if agent_name:
args.agent = agent_name
else:
if agent_filter:
msg = Text("No previous thread for '", style="yellow")
msg.append(args.agent)
msg.append("', starting new.", style="yellow")
else:
msg = Text("No previous threads, starting new.", style="yellow")
console.print(msg)
elif args.resume_thread:
# -r <ID>: Resume specific thread
if asyncio.run(thread_exists(args.resume_thread)):
thread_id = args.resume_thread
is_resumed = True
if args.agent == "agent":
agent_name = asyncio.run(get_thread_agent(thread_id))
if agent_name:
args.agent = agent_name
else:
error_msg = Text("Thread '", style="red")
error_msg.append(args.resume_thread)
error_msg.append("' not found.", style="red")
console.print(error_msg)
console.print(
"[dim]Use 'deepagents threads list' to see available threads.[/dim]"
)
sys.exit(1)
# Generate new thread ID if not resuming
if thread_id is None:
thread_id = generate_thread_id()
# Run Textual CLI
asyncio.run(
run_textual_cli_async(
assistant_id=args.agent,
auto_approve=args.auto_approve,
sandbox_type=args.sandbox,
sandbox_id=args.sandbox_id,
model_name=getattr(args, "model", None),
thread_id=thread_id,
is_resumed=is_resumed,
initial_prompt=getattr(args, "initial_prompt", None),
)
)
except KeyboardInterrupt:
# Clean exit on Ctrl+C - suppress ugly traceback
console.print("\n\n[yellow]Interrupted[/yellow]")
sys.exit(0)
if __name__ == "__main__":
cli_main()
```
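The sandbox handling above manually enters a context manager (`__enter__`) so the backend stays open across the whole app run, then closes it in a `finally` with errors suppressed. A minimal standalone sketch of that pattern, using a hypothetical `FakeSandbox` stand-in rather than the CLI's real `create_sandbox`:

```python
import contextlib

class FakeSandbox:
    """Stand-in for a sandbox context manager (hypothetical)."""
    def __init__(self):
        self.closed = False
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc, tb):
        self.closed = True
        return False

sandbox_cm = FakeSandbox()
backend = sandbox_cm.__enter__()  # keep the CM open across the app run
try:
    pass  # ... run the agent/app with `backend` here ...
finally:
    # Best-effort cleanup, mirroring the CLI's contextlib.suppress usage
    with contextlib.suppress(Exception):
        sandbox_cm.__exit__(None, None, None)
```

Manually driving `__enter__`/`__exit__` is what lets the sandbox outlive the block that created it, at the cost of having to guarantee cleanup yourself.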
### project_utils.py
Source: `/a0/tmp/skills_research/langchain/libs/deepagents-cli/deepagents_cli/project_utils.py`
```python
"""Utilities for project root detection and project-specific configuration."""
from pathlib import Path
def find_project_root(start_path: Path | None = None) -> Path | None:
"""Find the project root by looking for .git directory.
Walks up the directory tree from start_path (or cwd) looking for a .git
directory, which indicates the project root.
Args:
start_path: Directory to start searching from. Defaults to current working directory.
Returns:
Path to the project root if found, None otherwise.
"""
current = Path(start_path or Path.cwd()).resolve()
# Walk up the directory tree
for parent in [current, *list(current.parents)]:
git_dir = parent / ".git"
if git_dir.exists():
return parent
return None
def find_project_agent_md(project_root: Path) -> list[Path]:
"""Find project-specific agent.md file(s).
Checks two locations and returns ALL that exist:
1. project_root/.deepagents/agent.md
2. project_root/agent.md
Both files will be loaded and combined if both exist.
Args:
project_root: Path to the project root directory.
Returns:
List of paths to project agent.md files (may contain 0, 1, or 2 paths).
"""
paths = []
# Check .deepagents/agent.md (preferred)
deepagents_md = project_root / ".deepagents" / "agent.md"
if deepagents_md.exists():
paths.append(deepagents_md)
# Check root agent.md (fallback, but also include if both exist)
root_md = project_root / "agent.md"
if root_md.exists():
paths.append(root_md)
return paths
```
### sessions.py
Source: `/a0/tmp/skills_research/langchain/libs/deepagents-cli/deepagents_cli/sessions.py`
```python
"""Thread management using LangGraph's built-in checkpoint persistence."""
import uuid
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path
import aiosqlite
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from rich.table import Table
from deepagents_cli.config import COLORS, console
# Patch aiosqlite.Connection to add is_alive() method required by langgraph-checkpoint>=2.1.0
# See: https://github.com/langchain-ai/langgraph/issues/6583
if not hasattr(aiosqlite.Connection, "is_alive"):
def _is_alive(self: aiosqlite.Connection) -> bool:
"""Check if the connection is still alive."""
return self._connection is not None
aiosqlite.Connection.is_alive = _is_alive
def _format_timestamp(iso_timestamp: str | None) -> str:
"""Format ISO timestamp for display (e.g., 'Dec 30, 6:10pm')."""
if not iso_timestamp:
return ""
try:
dt = datetime.fromisoformat(iso_timestamp).astimezone()
        return dt.strftime("%b %d, %-I:%M%p").replace("AM", "am").replace("PM", "pm")
except (ValueError, TypeError):
return ""
def get_db_path() -> Path:
"""Get path to global database."""
db_dir = Path.home() / ".deepagents"
db_dir.mkdir(parents=True, exist_ok=True)
return db_dir / "sessions.db"
def generate_thread_id() -> str:
"""Generate a new 8-char hex thread ID."""
return uuid.uuid4().hex[:8]
async def _table_exists(conn: aiosqlite.Connection, table: str) -> bool:
"""Check if a table exists in the database."""
query = "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?"
async with conn.execute(query, (table,)) as cursor:
return await cursor.fetchone() is not None
async def list_threads(
agent_name: str | None = None,
limit: int = 20,
) -> list[dict]:
"""List threads from checkpoints table."""
db_path = str(get_db_path())
async with aiosqlite.connect(db_path, timeout=30.0) as conn:
# Return empty if table doesn't exist yet (fresh install)
if not await _table_exists(conn, "checkpoints"):
return []
if agent_name:
query = """
SELECT thread_id,
json_extract(metadata, '$.agent_name') as agent_name,
MAX(json_extract(metadata, '$.updated_at')) as updated_at
FROM checkpoints
WHERE json_extract(metadata, '$.agent_name') = ?
GROUP BY thread_id
ORDER BY updated_at DESC
LIMIT ?
"""
params: tuple = (agent_name, limit)
else:
query = """
SELECT thread_id,
json_extract(metadata, '$.agent_name') as agent_name,
MAX(json_extract(metadata, '$.updated_at')) as updated_at
FROM checkpoints
GROUP BY thread_id
ORDER BY updated_at DESC
LIMIT ?
"""
params = (limit,)
async with conn.execute(query, params) as cursor:
rows = await cursor.fetchall()
return [{"thread_id": r[0], "agent_name": r[1], "updated_at": r[2]} for r in rows]
async def get_most_recent(agent_name: str | None = None) -> str | None:
"""Get most recent thread_id, optionally filtered by agent."""
db_path = str(get_db_path())
async with aiosqlite.connect(db_path, timeout=30.0) as conn:
if not await _table_exists(conn, "checkpoints"):
return None
if agent_name:
query = """
SELECT thread_id FROM checkpoints
WHERE json_extract(metadata, '$.agent_name') = ?
ORDER BY checkpoint_id DESC
LIMIT 1
"""
params: tuple = (agent_name,)
else:
query = "SELECT thread_id FROM checkpoints ORDER BY checkpoint_id DESC LIMIT 1"
params = ()
async with conn.execute(query, params) as cursor:
row = await cursor.fetchone()
return row[0] if row else None
async def get_thread_agent(thread_id: str) -> str | None:
"""Get agent_name for a thread."""
db_path = str(get_db_path())
async with aiosqlite.connect(db_path, timeout=30.0) as conn:
if not await _table_exists(conn, "checkpoints"):
return None
query = """
SELECT json_extract(metadata, '$.agent_name')
FROM checkpoints
WHERE thread_id = ?
LIMIT 1
"""
async with conn.execute(query, (thread_id,)) as cursor:
row = await cursor.fetchone()
return row[0] if row else None
async def thread_exists(thread_id: str) -> bool:
"""Check if a thread exists in checkpoints."""
db_path = str(get_db_path())
async with aiosqlite.connect(db_path, timeout=30.0) as conn:
if not await _table_exists(conn, "checkpoints"):
return False
query = "SELECT 1 FROM checkpoints WHERE thread_id = ? LIMIT 1"
async with conn.execute(query, (thread_id,)) as cursor:
row = await cursor.fetchone()
return row is not None
async def delete_thread(thread_id: str) -> bool:
"""Delete thread checkpoints. Returns True if deleted."""
db_path = str(get_db_path())
async with aiosqlite.connect(db_path, timeout=30.0) as conn:
if not await _table_exists(conn, "checkpoints"):
return False
cursor = await conn.execute("DELETE FROM checkpoints WHERE thread_id = ?", (thread_id,))
deleted = cursor.rowcount > 0
if await _table_exists(conn, "writes"):
await conn.execute("DELETE FROM writes WHERE thread_id = ?", (thread_id,))
await conn.commit()
return deleted
@asynccontextmanager
async def get_checkpointer() -> AsyncIterator[AsyncSqliteSaver]:
"""Get AsyncSqliteSaver for the global database."""
async with AsyncSqliteSaver.from_conn_string(str(get_db_path())) as checkpointer:
yield checkpointer
async def list_threads_command(
agent_name: str | None = None,
limit: int = 20,
) -> None:
"""CLI handler for: deepagents threads list."""
threads = await list_threads(agent_name, limit=limit)
if not threads:
if agent_name:
console.print(f"[yellow]No threads found for agent '{agent_name}'.[/yellow]")
else:
console.print("[yellow]No threads found.[/yellow]")
console.print("[dim]Start a conversation with: deepagents[/dim]")
return
title = f"Threads for '{agent_name}'" if agent_name else "All Threads"
table = Table(title=title, show_header=True, header_style=f"bold {COLORS['primary']}")
table.add_column("Thread ID", style="bold")
table.add_column("Agent")
table.add_column("Last Used", style="dim")
for t in threads:
table.add_row(
t["thread_id"],
t["agent_name"] or "unknown",
_format_timestamp(t.get("updated_at")),
)
console.print()
console.print(table)
console.print()
async def delete_thread_command(thread_id: str) -> None:
"""CLI handler for: deepagents threads delete."""
deleted = await delete_thread(thread_id)
if deleted:
console.print(f"[green]Thread '{thread_id}' deleted.[/green]")
else:
console.print(f"[red]Thread '{thread_id}' not found.[/red]")
```
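The `list_threads` query leans on SQLite's built-in `json_extract` to pull `agent_name` and `updated_at` out of the checkpoint metadata blob, grouping by thread. A minimal synchronous sketch of the same query against a simplified stand-in table (the real checkpoints schema has more columns):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE checkpoints (thread_id TEXT, metadata TEXT)")
conn.executemany(
    "INSERT INTO checkpoints VALUES (?, ?)",
    [
        ("t1", '{"agent_name": "coder", "updated_at": "2024-01-01T10:00:00"}'),
        ("t1", '{"agent_name": "coder", "updated_at": "2024-01-02T10:00:00"}'),
        ("t2", '{"agent_name": "writer", "updated_at": "2024-01-03T10:00:00"}'),
    ],
)

# Same shape as list_threads: latest updated_at per thread, newest first.
query = """
    SELECT thread_id,
           json_extract(metadata, '$.agent_name') AS agent_name,
           MAX(json_extract(metadata, '$.updated_at')) AS updated_at
    FROM checkpoints
    GROUP BY thread_id
    ORDER BY updated_at DESC
    LIMIT ?
"""
threads = conn.execute(query, (20,)).fetchall()
```

Because ISO-8601 timestamps sort lexicographically, `MAX(...)` and `ORDER BY updated_at DESC` work on the raw strings with no date parsing.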
### shell.py
Source: `/a0/tmp/skills_research/langchain/libs/deepagents-cli/deepagents_cli/shell.py`
```python
"""Simplified middleware that exposes a basic shell tool to agents."""
from __future__ import annotations
import os
import subprocess
from typing import Any
from langchain.agents.middleware.types import AgentMiddleware, AgentState
from langchain.tools import ToolRuntime, tool
from langchain_core.messages import ToolMessage
from langchain_core.tools.base import ToolException
class ShellMiddleware(AgentMiddleware[AgentState, Any]):
    """Middleware that exposes a basic shell tool to agents.

    Commands execute directly on the local machine with NO safeguards beyond
    the human-in-the-loop approval provided by the CLI itself.
    """
def __init__(
self,
*,
workspace_root: str,
timeout: float = 120.0,
max_output_bytes: int = 100_000,
env: dict[str, str] | None = None,
) -> None:
"""Initialize an instance of `ShellMiddleware`.
Args:
workspace_root: Working directory for shell commands.
timeout: Maximum time in seconds to wait for command completion.
Defaults to 120 seconds.
max_output_bytes: Maximum number of bytes to capture from command output.
Defaults to 100,000 bytes.
env: Environment variables to pass to the subprocess. If None,
uses the current process's environment. Defaults to None.
"""
super().__init__()
self._timeout = timeout
self._max_output_bytes = max_output_bytes
self._tool_name = "shell"
self._env = env if env is not None else os.environ.copy()
self._workspace_root = workspace_root
# Build description with working directory information
        description = (
            f"Execute a shell command directly on the host. Commands will run in "
            f"the working directory: {workspace_root}. Each command runs in a fresh shell "
            f"environment with the current process's environment variables. Commands are "
            f"killed after the configured timeout, and output is truncated beyond the "
            f"configured byte limit."
        )
@tool(self._tool_name, description=description)
def shell_tool(
command: str,
runtime: ToolRuntime[None, AgentState],
) -> ToolMessage | str:
"""Execute a shell command.
Args:
command: The shell command to execute.
runtime: The tool runtime context.
"""
return self._run_shell_command(command, tool_call_id=runtime.tool_call_id)
self._shell_tool = shell_tool
self.tools = [self._shell_tool]
def _run_shell_command(
self,
command: str,
*,
tool_call_id: str | None,
) -> ToolMessage | str:
"""Execute a shell command and return the result.
Args:
command: The shell command to execute.
tool_call_id: The tool call ID for creating a ToolMessage.
Returns:
A ToolMessage with the command output or an error message.
"""
if not command or not isinstance(command, str):
msg = "Shell tool expects a non-empty command string."
raise ToolException(msg)
try:
result = subprocess.run(
command,
check=False,
shell=True,
capture_output=True,
text=True,
timeout=self._timeout,
env=self._env,
cwd=self._workspace_root,
)
# Combine stdout and stderr
output_parts = []
if result.stdout:
output_parts.append(result.stdout)
if result.stderr:
stderr_lines = result.stderr.strip().split("\n")
for line in stderr_lines:
output_parts.append(f"[stderr] {line}")
output = "\n".join(output_parts) if output_parts else "<no output>"
# Truncate output if needed
if len(output) > self._max_output_bytes:
output = output[: self._max_output_bytes]
output += f"\n\n... Output truncated at {self._max_output_bytes} bytes."
# Add exit code info if non-zero
if result.returncode != 0:
output = f"{output.rstrip()}\n\nExit code: {result.returncode}"
status = "error"
else:
status = "success"
except subprocess.TimeoutExpired:
output = f"Error: Command timed out after {self._timeout:.1f} seconds."
status = "error"
return ToolMessage(
content=output,
tool_call_id=tool_call_id,
name=self._tool_name,
status=status,
)
__all__ = ["ShellMiddleware"]
```
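The core of `_run_shell_command` is a plain `subprocess.run` with stream merging, stderr tagging, and byte-limit truncation. A standalone sketch of that logic (POSIX shell assumed; error handling and `ToolMessage` wrapping omitted):

```python
import subprocess

def run_command(command: str, timeout: float = 10.0,
                max_output_bytes: int = 100_000) -> tuple[str, str]:
    # Run in a shell, merge stdout/stderr, tag stderr lines,
    # truncate long output, and flag non-zero exit codes.
    result = subprocess.run(
        command, check=False, shell=True,
        capture_output=True, text=True, timeout=timeout,
    )
    parts = []
    if result.stdout:
        parts.append(result.stdout)
    if result.stderr:
        parts.extend(f"[stderr] {line}"
                     for line in result.stderr.strip().split("\n"))
    output = "\n".join(parts) if parts else "<no output>"
    if len(output) > max_output_bytes:
        output = output[:max_output_bytes] + (
            f"\n\n... Output truncated at {max_output_bytes} bytes."
        )
    status = "success" if result.returncode == 0 else "error"
    return output, status

out, status = run_command("echo hello; echo oops >&2")
```

Each call spawns a fresh shell, which is why the middleware's tool description warns that no state (cwd changes, exported variables) persists between commands.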
### textual_adapter.py
Source: `/a0/tmp/skills_research/langchain/libs/deepagents-cli/deepagents_cli/textual_adapter.py`
```python
"""Textual UI adapter for agent execution."""
# ruff: noqa: PLR0912, PLR0915, ANN401, PLR2004, BLE001, TRY203
# This module has complex streaming logic ported from execution.py
from __future__ import annotations
import asyncio
import json
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any
from langchain.agents.middleware.human_in_the_loop import (
ActionRequest,
HITLRequest,
HITLResponse,
)
from langchain_core.messages import HumanMessage, ToolMessage
from langgraph.types import Command, Interrupt
from pydantic import TypeAdapter, ValidationError
from deepagents_cli.file_ops import FileOpTracker
from deepagents_cli.image_utils import create_multimodal_content
from deepagents_cli.input import ImageTracker, parse_file_mentions
from deepagents_cli.ui import format_tool_display, format_tool_message_content
from deepagents_cli.widgets.messages import (
AssistantMessage,
DiffMessage,
ErrorMessage,
SystemMessage,
ToolCallMessage,
)
if TYPE_CHECKING:
from collections.abc import Callable
_HITL_REQUEST_ADAPTER = TypeAdapter(HITLRequest)
def _is_summarization_chunk(metadata: dict | None) -> bool:
"""Check if a message chunk is from summarization middleware.
Args:
metadata: The metadata dict from the stream chunk.
Returns:
Whether the chunk is from summarization and should be filtered.
"""
if metadata is None:
return False
return metadata.get("lc_source") == "summarization"
class TextualUIAdapter:
"""Adapter for rendering agent output to Textual widgets.
This adapter provides an abstraction layer between the agent execution
and the Textual UI, allowing streaming output to be rendered as widgets.
"""
def __init__(
self,
mount_message: Callable,
update_status: Callable[[str], None],
request_approval: Callable, # async callable returning Future
on_auto_approve_enabled: Callable[[], None] | None = None,
scroll_to_bottom: Callable[[], None] | None = None,
) -> None:
"""Initialize the adapter.
Args:
mount_message: Async callable to mount a message widget
update_status: Callable to update the status bar message
request_approval: Callable that returns a Future for HITL approval
on_auto_approve_enabled: Callback when auto-approve is enabled
scroll_to_bottom: Callback to scroll chat to bottom
"""
self._mount_message = mount_message
self._update_status = update_status
self._request_approval = request_approval
self._on_auto_approve_enabled = on_auto_approve_enabled
self._scroll_to_bottom = scroll_to_bottom
# State tracking
self._current_assistant_message: AssistantMessage | None = None
self._current_tool_messages: dict[str, ToolCallMessage] = {}
self._pending_text = ""
self._token_tracker: Any = None
def set_token_tracker(self, tracker: Any) -> None:
"""Set the token tracker for usage tracking."""
self._token_tracker = tracker
async def execute_task_textual(
user_input: str,
agent: Any,
assistant_id: str | None,
session_state: Any,
adapter: TextualUIAdapter,
backend: Any = None,
image_tracker: ImageTracker | None = None,
) -> None:
"""Execute a task with output directed to Textual UI.
This is the Textual-compatible version of execute_task() that uses
the TextualUIAdapter for all UI operations.
Args:
user_input: The user's input message
agent: The LangGraph agent to execute
assistant_id: The agent identifier
session_state: Session state with auto_approve flag
adapter: The TextualUIAdapter for UI operations
backend: Optional backend for file operations
image_tracker: Optional tracker for images
"""
# Parse file mentions and inject content if any
prompt_text, mentioned_files = parse_file_mentions(user_input)
# Max file size to embed inline (256KB, matching mistral-vibe)
# Larger files get a reference instead - use read_file tool to view them
max_embed_bytes = 256 * 1024
if mentioned_files:
context_parts = [prompt_text, "\n\n## Referenced Files\n"]
for file_path in mentioned_files:
try:
file_size = file_path.stat().st_size
if file_size > max_embed_bytes:
# File too large - include reference instead of content
size_kb = file_size // 1024
context_parts.append(
f"\n### {file_path.name}\n"
f"Path: `{file_path}`\n"
f"Size: {size_kb}KB (too large to embed, use read_file tool to view)"
)
else:
content = file_path.read_text()
context_parts.append(
f"\n### {file_path.name}\nPath: `{file_path}`\n```\n{content}\n```"
)
except Exception as e:
context_parts.append(f"\n### {file_path.name}\n[Error reading file: {e}]")
final_input = "\n".join(context_parts)
else:
final_input = prompt_text
# Include images in the message content
images_to_send = []
if image_tracker:
images_to_send = image_tracker.get_images()
if images_to_send:
message_content = create_multimodal_content(final_input, images_to_send)
else:
message_content = final_input
thread_id = session_state.thread_id
config = {
"configurable": {"thread_id": thread_id},
"metadata": {
"assistant_id": assistant_id,
"agent_name": assistant_id,
"updated_at": datetime.now(UTC).isoformat(),
}
if assistant_id
else {},
}
captured_input_tokens = 0
captured_output_tokens = 0
# Update status to show thinking
adapter._update_status("Agent is thinking...")
# Hide token display during streaming (will be shown with accurate count at end)
if adapter._token_tracker:
adapter._token_tracker.hide()
file_op_tracker = FileOpTracker(assistant_id=assistant_id, backend=backend)
displayed_tool_ids: set[str] = set()
tool_call_buffers: dict[str | int, dict] = {}
# Track pending text and assistant messages PER NAMESPACE to avoid interleaving
# when multiple subagents stream in parallel
pending_text_by_namespace: dict[tuple, str] = {}
assistant_message_by_namespace: dict[tuple, Any] = {}
# Clear images from tracker after creating the message
if image_tracker:
image_tracker.clear()
stream_input: dict | Command = {"messages": [{"role": "user", "content": message_content}]}
try:
while True:
interrupt_occurred = False
hitl_response: dict[str, HITLResponse] = {}
suppress_resumed_output = False
pending_interrupts: dict[str, HITLRequest] = {}
async for chunk in agent.astream(
stream_input,
stream_mode=["messages", "updates"],
subgraphs=True,
config=config,
durability="exit",
):
if not isinstance(chunk, tuple) or len(chunk) != 3:
continue
namespace, current_stream_mode, data = chunk
# Convert namespace to hashable tuple for dict keys
ns_key = tuple(namespace) if namespace else ()
# Filter out subagent outputs - only show main agent (empty namespace)
# Subagents run via Task tool and should only report back to the main agent
is_main_agent = ns_key == ()
# Handle UPDATES stream - for interrupts and todos
if current_stream_mode == "updates":
if not isinstance(data, dict):
continue
# Check for interrupts
if "__interrupt__" in data:
interrupts: list[Interrupt] = data["__interrupt__"]
if interrupts:
for interrupt_obj in interrupts:
try:
validated_request = _HITL_REQUEST_ADAPTER.validate_python(
interrupt_obj.value
)
pending_interrupts[interrupt_obj.id] = validated_request
interrupt_occurred = True
except ValidationError:
raise
# Check for todo updates (not yet implemented in Textual UI)
chunk_data = next(iter(data.values())) if data else None
if chunk_data and isinstance(chunk_data, dict) and "todos" in chunk_data:
pass # Future: render todo list widget
# Handle MESSAGES stream - for content and tool calls
elif current_stream_mode == "messages":
# Skip subagent outputs - only render main agent content in chat
if not is_main_agent:
continue
if not isinstance(data, tuple) or len(data) != 2:
continue
message, _metadata = data
# Filter out summarization LLM output & update status to reflect
if _is_summarization_chunk(_metadata):
adapter._update_status("Summarizing conversation...")
continue
if isinstance(message, HumanMessage):
content = message.text
# Flush pending text for this namespace
pending_text = pending_text_by_namespace.get(ns_key, "")
if content and pending_text:
await _flush_assistant_text_ns(
adapter, pending_text, ns_key, assistant_message_by_namespace
)
pending_text_by_namespace[ns_key] = ""
continue
if isinstance(message, ToolMessage):
tool_name = getattr(message, "name", "")
tool_status = getattr(message, "status", "success")
tool_content = format_tool_message_content(message.content)
record = file_op_tracker.complete_with_message(message)
adapter._update_status("Agent is thinking...")
# Update tool call status with output
tool_id = getattr(message, "tool_call_id", None)
if tool_id and tool_id in adapter._current_tool_messages:
tool_msg = adapter._current_tool_messages[tool_id]
output_str = str(tool_content) if tool_content else ""
if tool_status == "success":
tool_msg.set_success(output_str)
else:
tool_msg.set_error(output_str or "Error")
# Clean up - remove from tracking dict after status update
del adapter._current_tool_messages[tool_id]
# Show shell errors
if tool_name == "shell" and tool_status != "success":
pending_text = pending_text_by_namespace.get(ns_key, "")
if pending_text:
await _flush_assistant_text_ns(
adapter, pending_text, ns_key, assistant_message_by_namespace
)
pending_text_by_namespace[ns_key] = ""
if tool_content:
await adapter._mount_message(ErrorMessage(str(tool_content)))
# Show file operation results - always show diffs in chat
if record:
pending_text = pending_text_by_namespace.get(ns_key, "")
if pending_text:
await _flush_assistant_text_ns(
adapter, pending_text, ns_key, assistant_message_by_namespace
)
pending_text_by_namespace[ns_key] = ""
if record.diff:
await adapter._mount_message(
DiffMessage(record.diff, record.display_path)
)
continue
# Extract token usage (before content_blocks check - usage may be on any chunk)
if adapter._token_tracker and hasattr(message, "usage_metadata"):
usage = message.usage_metadata
if usage:
# Use total_tokens which includes input + output
total_toks = usage.get("total_tokens", 0)
if total_toks:
captured_input_tokens = max(captured_input_tokens, total_toks)
else:
# Fallback to input + output if total not provided
input_toks = usage.get("input_tokens", 0)
output_toks = usage.get("output_tokens", 0)
if input_toks or output_toks:
total = input_toks + output_toks
captured_input_tokens = max(captured_input_tokens, total)
# Check if this is an AIMessageChunk with content
if not hasattr(message, "content_blocks"):
continue
# Process content blocks
for block in message.content_blocks:
block_type = block.get("type")
if block_type == "text":
text = block.get("text", "")
if text:
# Track accumulated text for reference
pending_text = pending_text_by_namespace.get(ns_key, "")
pending_text += text
pending_text_by_namespace[ns_key] = pending_text
# Get or create assistant message for this namespace
current_msg = assistant_message_by_namespace.get(ns_key)
if current_msg is None:
current_msg = AssistantMessage()
await adapter._mount_message(current_msg)
assistant_message_by_namespace[ns_key] = current_msg
# Anchor scroll once when message is created
# anchor() keeps scroll locked to bottom as content grows
if adapter._scroll_to_bottom:
adapter._scroll_to_bottom()
# Append just the new text chunk for smoother streaming
# (uses MarkdownStream internally for better performance)
await current_msg.append_content(text)
elif block_type in ("tool_call_chunk", "tool_call"):
chunk_name = block.get("name")
chunk_args = block.get("args")
chunk_id = block.get("id")
chunk_index = block.get("index")
buffer_key: str | int
if chunk_index is not None:
buffer_key = chunk_index
elif chunk_id is not None:
buffer_key = chunk_id
else:
buffer_key = f"unknown-{len(tool_call_buffers)}"
buffer = tool_call_buffers.setdefault(
buffer_key,
{"name": None, "id": None, "args": None, "args_parts": []},
)
if chunk_name:
buffer["name"] = chunk_name
if chunk_id:
buffer["id"] = chunk_id
if isinstance(chunk_args, dict):
buffer["args"] = chunk_args
buffer["args_parts"] = []
elif isinstance(chunk_args, str):
if chunk_args:
parts: list[str] = buffer.setdefault("args_parts", [])
if not parts or chunk_args != parts[-1]:
parts.append(chunk_args)
buffer["args"] = "".join(parts)
elif chunk_args is not None:
buffer["args"] = chunk_args
buffer_name = buffer.get("name")
buffer_id = buffer.get("id")
if buffer_name is None:
continue
parsed_args = buffer.get("args")
if isinstance(parsed_args, str):
if not parsed_args:
continue
try:
parsed_args = json.loads(parsed_args)
except json.JSONDecodeError:
continue
elif parsed_args is None:
continue
if not isinstance(parsed_args, dict):
parsed_args = {"value": parsed_args}
# Flush pending text before tool call
pending_text = pending_text_by_namespace.get(ns_key, "")
if pending_text:
await _flush_assistant_text_ns(
adapter, pending_text, ns_key, assistant_message_by_namespace
)
pending_text_by_namespace[ns_key] = ""
assistant_message_by_namespace.pop(ns_key, None)
if buffer_id is not None and buffer_id not in displayed_tool_ids:
displayed_tool_ids.add(buffer_id)
file_op_tracker.start_operation(buffer_name, parsed_args, buffer_id)
# Mount tool call message
tool_msg = ToolCallMessage(buffer_name, parsed_args)
await adapter._mount_message(tool_msg)
adapter._current_tool_messages[buffer_id] = tool_msg
tool_call_buffers.pop(buffer_key, None)
display_str = format_tool_display(buffer_name, parsed_args)
adapter._update_status(f"Executing {display_str}...")
if getattr(message, "chunk_position", None) == "last":
pending_text = pending_text_by_namespace.get(ns_key, "")
if pending_text:
await _flush_assistant_text_ns(
adapter, pending_text, ns_key, assistant_message_by_namespace
)
pending_text_by_namespace[ns_key] = ""
assistant_message_by_namespace.pop(ns_key, None)
# Flush any remaining text from all namespaces
for ns_key, pending_text in list(pending_text_by_namespace.items()):
if pending_text:
await _flush_assistant_text_ns(
adapter, pending_text, ns_key, assistant_message_by_namespace
)
pending_text_by_namespace.clear()
assistant_message_by_namespace.clear()
# Handle HITL after stream completes
if interrupt_occurred:
any_rejected = False
for interrupt_id, hitl_request in pending_interrupts.items():
if session_state.auto_approve:
# Auto-approve silently (user sees tool calls already)
decisions = [{"type": "approve"} for _ in hitl_request["action_requests"]]
hitl_response[interrupt_id] = {"decisions": decisions}
else:
# Request approval via adapter
decisions = []
def mark_hitl_approved(action_request: ActionRequest) -> None:
tool_name = action_request.get("name")
if tool_name not in {"write_file", "edit_file"}:
return
args = action_request.get("args", {})
if isinstance(args, dict):
file_op_tracker.mark_hitl_approved(tool_name, args)
for action_request in hitl_request["action_requests"]:
future = await adapter._request_approval(action_request, assistant_id)
decision = await future
# Check for auto-approve-all
if (
isinstance(decision, dict)
and decision.get("type") == "auto_approve_all"
):
session_state.auto_approve = True
if adapter._on_auto_approve_enabled:
adapter._on_auto_approve_enabled()
decisions.append({"type": "approve"})
mark_hitl_approved(action_request)
# Approve remaining actions
for _ in hitl_request["action_requests"][len(decisions) :]:
decisions.append({"type": "approve"})
break
decisions.append(decision)
# Try multiple keys for tool call id
tool_id = (
action_request.get("id")
or action_request.get("tool_call_id")
or action_request.get("call_id")
)
tool_name = action_request.get("name", "")
# Find matching tool message - by id or by name as fallback
tool_msg = None
tool_msg_key = None # Track key for cleanup
if tool_id and tool_id in adapter._current_tool_messages:
tool_msg = adapter._current_tool_messages[tool_id]
tool_msg_key = tool_id
elif tool_name:
# Fallback: find last tool message with matching name
for key, msg in reversed(
list(adapter._current_tool_messages.items())
):
if msg._tool_name == tool_name:
tool_msg = msg
tool_msg_key = key
break
if isinstance(decision, dict) and decision.get("type") == "approve":
mark_hitl_approved(action_request)
# Don't call set_success here - wait for actual tool output
# The ToolMessage handler will update with real results
elif isinstance(decision, dict) and decision.get("type") == "reject":
if tool_msg:
tool_msg.set_rejected()
# Only remove from tracking on reject (approved tools need output update)
if tool_msg_key and tool_msg_key in adapter._current_tool_messages:
del adapter._current_tool_messages[tool_msg_key]
if any(d.get("type") == "reject" for d in decisions):
any_rejected = True
hitl_response[interrupt_id] = {"decisions": decisions}
suppress_resumed_output = any_rejected
if interrupt_occurred and hitl_response:
if suppress_resumed_output:
await adapter._mount_message(
SystemMessage("Command rejected. Tell the agent what you'd like instead.")
)
return
stream_input = Command(resume=hitl_response)
else:
break
except asyncio.CancelledError:
adapter._update_status("Interrupted")
# Mark any pending tools as rejected
for tool_msg in list(adapter._current_tool_messages.values()):
tool_msg.set_rejected()
adapter._current_tool_messages.clear()
await adapter._mount_message(SystemMessage("Interrupted by user"))
# Append cancellation message to agent state so LLM knows what happened
# This preserves context rather than rolling back
try:
cancellation_msg = HumanMessage(
content="[SYSTEM] Task interrupted by user. Previous operation was cancelled."
)
await agent.aupdate_state(config, {"messages": [cancellation_msg]})
except Exception: # noqa: S110
pass # State update is best-effort
# Report tokens even on interrupt (or restore display if none captured)
if adapter._token_tracker:
if captured_input_tokens or captured_output_tokens:
adapter._token_tracker.add(captured_input_tokens, captured_output_tokens)
else:
adapter._token_tracker.show() # Restore previous value
return
except KeyboardInterrupt:
adapter._update_status("Interrupted")
# Mark any pending tools as rejected
for tool_msg in list(adapter._current_tool_messages.values()):
tool_msg.set_rejected()
adapter._current_tool_messages.clear()
await adapter._mount_message(SystemMessage("Interrupted by user"))
# Append cancellation message to agent state
try:
cancellation_msg = HumanMessage(
content="[SYSTEM] Task interrupted by user. Previous operation was cancelled."
)
await agent.aupdate_state(config, {"messages": [cancellation_msg]})
except Exception: # noqa: S110
pass # State update is best-effort
# Report tokens even on interrupt (or restore display if none captured)
if adapter._token_tracker:
if captured_input_tokens or captured_output_tokens:
adapter._token_tracker.add(captured_input_tokens, captured_output_tokens)
else:
adapter._token_tracker.show() # Restore previous value
return
adapter._update_status("Ready")
# Update token tracker
if adapter._token_tracker and (captured_input_tokens or captured_output_tokens):
adapter._token_tracker.add(captured_input_tokens, captured_output_tokens)
async def _flush_assistant_text_ns(
adapter: TextualUIAdapter,
text: str,
ns_key: tuple,
assistant_message_by_namespace: dict[tuple, Any],
) -> None:
"""Flush accumulated assistant text for a specific namespace.
Finalizes the streaming by stopping the MarkdownStream.
If no message exists yet, creates one with the full content.
"""
if not text.strip():
return
current_msg = assistant_message_by_namespace.get(ns_key)
if current_msg is None:
# No message was created during streaming - create one with full content
current_msg = AssistantMessage(text)
await adapter._mount_message(current_msg)
await current_msg.write_initial_content()
assistant_message_by_namespace[ns_key] = current_msg
else:
# Stop the stream to finalize the content
await current_msg.stop_stream()
```
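The `_flush_assistant_text_ns` helper above keys streaming buffers by namespace tuple and either creates a message with the full content or finalizes an in-flight stream. A minimal synchronous sketch of that pattern, with a hypothetical `FakeMessage` standing in for the real Textual widget:

```python
# Minimal synchronous model of the per-namespace flush pattern above.
# FakeMessage is a hypothetical stand-in for the AssistantMessage widget.


class FakeMessage:
    def __init__(self, text: str):
        self.text = text
        self.finalized = False

    def stop_stream(self):
        self.finalized = True


def flush_text_ns(text: str, ns_key: tuple, messages_by_ns: dict):
    # Whitespace-only buffers are dropped, matching the real helper
    if not text.strip():
        return None
    msg = messages_by_ns.get(ns_key)
    if msg is None:
        # No message streamed yet: create one with the full content
        msg = FakeMessage(text)
        messages_by_ns[ns_key] = msg
    else:
        # A message already exists: finalize the in-flight stream
        msg.stop_stream()
    return msg
```

Calling it twice for the same namespace key exercises both branches: the first call creates and registers the message, the second finalizes it.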
### tools.py
Source: `/a0/tmp/skills_research/langchain/libs/deepagents-cli/deepagents_cli/tools.py`
```python
"""Custom tools for the CLI agent."""
from typing import Any, Literal
import requests
from markdownify import markdownify
from tavily import TavilyClient
from deepagents_cli.config import settings
# Initialize Tavily client if API key is available
tavily_client = TavilyClient(api_key=settings.tavily_api_key) if settings.has_tavily else None
def http_request(
url: str,
method: str = "GET",
headers: dict[str, str] | None = None,
data: str | dict | None = None,
params: dict[str, str] | None = None,
timeout: int = 30,
) -> dict[str, Any]:
"""Make HTTP requests to APIs and web services.
Args:
url: Target URL
method: HTTP method (GET, POST, PUT, DELETE, etc.)
headers: HTTP headers to include
data: Request body data (string or dict)
params: URL query parameters
timeout: Request timeout in seconds
Returns:
Dictionary with response data including status, headers, and content
"""
try:
kwargs = {"url": url, "method": method.upper(), "timeout": timeout}
if headers:
kwargs["headers"] = headers
if params:
kwargs["params"] = params
if data:
if isinstance(data, dict):
kwargs["json"] = data
else:
kwargs["data"] = data
response = requests.request(**kwargs)
try:
content = response.json()
        except ValueError:  # response body is not valid JSON
content = response.text
return {
"success": response.status_code < 400,
"status_code": response.status_code,
"headers": dict(response.headers),
"content": content,
"url": response.url,
}
except requests.exceptions.Timeout:
return {
"success": False,
"status_code": 0,
"headers": {},
"content": f"Request timed out after {timeout} seconds",
"url": url,
}
except requests.exceptions.RequestException as e:
return {
"success": False,
"status_code": 0,
"headers": {},
"content": f"Request error: {e!s}",
"url": url,
}
except Exception as e:
return {
"success": False,
"status_code": 0,
"headers": {},
"content": f"Error making request: {e!s}",
"url": url,
}
def web_search(
query: str,
max_results: int = 5,
topic: Literal["general", "news", "finance"] = "general",
include_raw_content: bool = False,
):
"""Search the web using Tavily for current information and documentation.
This tool searches the web and returns relevant results. After receiving results,
you MUST synthesize the information into a natural, helpful response for the user.
Args:
query: The search query (be specific and detailed)
max_results: Number of results to return (default: 5)
        topic: Search topic type - "general" for most queries, "news" for current events, "finance" for financial topics
include_raw_content: Include full page content (warning: uses more tokens)
Returns:
Dictionary containing:
- results: List of search results, each with:
- title: Page title
- url: Page URL
- content: Relevant excerpt from the page
- score: Relevance score (0-1)
- query: The original search query
IMPORTANT: After using this tool:
1. Read through the 'content' field of each result
2. Extract relevant information that answers the user's question
3. Synthesize this into a clear, natural language response
4. Cite sources by mentioning the page titles or URLs
5. NEVER show the raw JSON to the user - always provide a formatted response
"""
if tavily_client is None:
return {
"error": "Tavily API key not configured. Please set TAVILY_API_KEY environment variable.",
"query": query,
}
try:
return tavily_client.search(
query,
max_results=max_results,
include_raw_content=include_raw_content,
topic=topic,
)
except Exception as e:
return {"error": f"Web search error: {e!s}", "query": query}
def fetch_url(url: str, timeout: int = 30) -> dict[str, Any]:
"""Fetch content from a URL and convert HTML to markdown format.
This tool fetches web page content and converts it to clean markdown text,
making it easy to read and process HTML content. After receiving the markdown,
you MUST synthesize the information into a natural, helpful response for the user.
Args:
url: The URL to fetch (must be a valid HTTP/HTTPS URL)
timeout: Request timeout in seconds (default: 30)
Returns:
Dictionary containing:
- success: Whether the request succeeded
- url: The final URL after redirects
- markdown_content: The page content converted to markdown
- status_code: HTTP status code
- content_length: Length of the markdown content in characters
IMPORTANT: After using this tool:
1. Read through the markdown content
2. Extract relevant information that answers the user's question
3. Synthesize this into a clear, natural language response
4. NEVER show the raw markdown to the user unless specifically requested
"""
try:
response = requests.get(
url,
timeout=timeout,
headers={"User-Agent": "Mozilla/5.0 (compatible; DeepAgents/1.0)"},
)
response.raise_for_status()
# Convert HTML content to markdown
markdown_content = markdownify(response.text)
        return {
            "success": True,
            "url": str(response.url),
"markdown_content": markdown_content,
"status_code": response.status_code,
"content_length": len(markdown_content),
}
except Exception as e:
        return {"success": False, "error": f"Fetch URL error: {e!s}", "url": url}
```
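All three tools above return plain dicts rather than raising, with failures signalled by an `"error"` key or `"success": False`. A hypothetical caller-side helper sketching how to branch on that shape (`summarize_result` is illustrative and not part of tools.py):

```python
# Illustrative sketch: branch on the uniform result-dict shape that
# http_request, web_search, and fetch_url all return.


def summarize_result(result: dict) -> str:
    if "error" in result:
        # web_search / fetch_url failure path
        return f"failed: {result['error']}"
    if not result.get("success", True):
        # http_request failure path carries status_code and content
        return f"HTTP {result['status_code']}: {result['content']}"
    return f"ok ({result.get('status_code', '?')})"
```

Because failures are encoded in the return value, the agent sees them as tool output instead of an exception, which keeps the error in the conversation context.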
### ui.py
Source: `/a0/tmp/skills_research/langchain/libs/deepagents-cli/deepagents_cli/ui.py`
```python
"""UI rendering and display utilities for the CLI."""
import json
from pathlib import Path
from typing import Any
from .config import COLORS, DEEP_AGENTS_ASCII, MAX_ARG_LENGTH, console
def truncate_value(value: str, max_length: int = MAX_ARG_LENGTH) -> str:
"""Truncate a string value if it exceeds max_length."""
if len(value) > max_length:
return value[:max_length] + "..."
return value
def format_tool_display(tool_name: str, tool_args: dict) -> str:
"""Format tool calls for display with tool-specific smart formatting.
Shows the most relevant information for each tool type rather than all arguments.
Args:
tool_name: Name of the tool being called
tool_args: Dictionary of tool arguments
Returns:
Formatted string for display (e.g., "read_file(config.py)")
Examples:
read_file(path="/long/path/file.py") → "read_file(file.py)"
web_search(query="how to code", max_results=5) → 'web_search("how to code")'
shell(command="pip install foo") → 'shell("pip install foo")'
"""
def abbreviate_path(path_str: str, max_length: int = 60) -> str:
"""Abbreviate a file path intelligently - show basename or relative path."""
try:
path = Path(path_str)
# If it's just a filename (no directory parts), return as-is
if len(path.parts) == 1:
return path_str
# Try to get relative path from current working directory
try:
rel_path = path.relative_to(Path.cwd())
rel_str = str(rel_path)
# Use relative if it's shorter and not too long
if len(rel_str) < len(path_str) and len(rel_str) <= max_length:
return rel_str
            except (ValueError, OSError):
pass
# If absolute path is reasonable length, use it
if len(path_str) <= max_length:
return path_str
# Otherwise, just show basename (filename only)
return path.name
except Exception:
# Fallback to original string if any error
return truncate_value(path_str, max_length)
# Tool-specific formatting - show the most important argument(s)
if tool_name in ("read_file", "write_file", "edit_file"):
# File operations: show the primary file path argument (file_path or path)
path_value = tool_args.get("file_path")
if path_value is None:
path_value = tool_args.get("path")
if path_value is not None:
path = abbreviate_path(str(path_value))
return f"{tool_name}({path})"
elif tool_name == "web_search":
# Web search: show the query string
if "query" in tool_args:
query = str(tool_args["query"])
query = truncate_value(query, 100)
return f'{tool_name}("{query}")'
elif tool_name == "grep":
# Grep: show the search pattern
if "pattern" in tool_args:
pattern = str(tool_args["pattern"])
pattern = truncate_value(pattern, 70)
return f'{tool_name}("{pattern}")'
elif tool_name == "shell":
# Shell: show the command being executed
if "command" in tool_args:
command = str(tool_args["command"])
command = truncate_value(command, 120)
return f'{tool_name}("{command}")'
elif tool_name == "ls":
# ls: show directory, or empty if current directory
if tool_args.get("path"):
path = abbreviate_path(str(tool_args["path"]))
return f"{tool_name}({path})"
return f"{tool_name}()"
elif tool_name == "glob":
# Glob: show the pattern
if "pattern" in tool_args:
pattern = str(tool_args["pattern"])
pattern = truncate_value(pattern, 80)
return f'{tool_name}("{pattern}")'
elif tool_name == "http_request":
# HTTP: show method and URL
parts = []
if "method" in tool_args:
parts.append(str(tool_args["method"]).upper())
if "url" in tool_args:
url = str(tool_args["url"])
url = truncate_value(url, 80)
parts.append(url)
if parts:
return f"{tool_name}({' '.join(parts)})"
elif tool_name == "fetch_url":
# Fetch URL: show the URL being fetched
if "url" in tool_args:
url = str(tool_args["url"])
url = truncate_value(url, 80)
return f'{tool_name}("{url}")'
elif tool_name == "task":
# Task: show the task description
if "description" in tool_args:
desc = str(tool_args["description"])
desc = truncate_value(desc, 100)
return f'{tool_name}("{desc}")'
elif tool_name == "write_todos":
# Todos: show count of items
if "todos" in tool_args and isinstance(tool_args["todos"], list):
count = len(tool_args["todos"])
return f"{tool_name}({count} items)"
# Fallback: generic formatting for unknown tools
# Show all arguments in key=value format
args_str = ", ".join(f"{k}={truncate_value(str(v), 50)}" for k, v in tool_args.items())
return f"{tool_name}({args_str})"
def format_tool_message_content(content: Any) -> str:
"""Convert ToolMessage content into a printable string."""
if content is None:
return ""
if isinstance(content, list):
parts = []
for item in content:
if isinstance(item, str):
parts.append(item)
else:
try:
parts.append(json.dumps(item))
except Exception:
parts.append(str(item))
return "\n".join(parts)
return str(content)
def show_help() -> None:
"""Show help information."""
console.print()
console.print(DEEP_AGENTS_ASCII, style=f"bold {COLORS['primary']}")
console.print()
console.print("[bold]Usage:[/bold]", style=COLORS["primary"])
console.print(" deepagents [OPTIONS] Start interactive session")
console.print(" deepagents list List all available agents")
console.print(" deepagents reset --agent AGENT Reset agent to default prompt")
console.print(
" deepagents reset --agent AGENT --target SOURCE Reset agent to copy of another agent"
)
console.print(" deepagents help Show this help message")
console.print(" deepagents --version Show deepagents version")
console.print()
console.print("[bold]Options:[/bold]", style=COLORS["primary"])
console.print(" --agent NAME Agent identifier (default: agent)")
console.print(
" --model MODEL Model to use (e.g., claude-sonnet-4-5-20250929, gpt-4o)"
)
console.print(" --auto-approve Auto-approve tool usage without prompting")
console.print(
" --sandbox TYPE Remote sandbox for execution (modal, runloop, daytona)"
)
console.print(" --sandbox-id ID Reuse existing sandbox (skips creation/cleanup)")
console.print(
" -r, --resume [ID] Resume thread: -r for most recent, -r <ID> for specific"
)
console.print()
console.print("[bold]Examples:[/bold]", style=COLORS["primary"])
console.print(
" deepagents # Start with default agent", style=COLORS["dim"]
)
console.print(
" deepagents --agent mybot # Start with agent named 'mybot'",
style=COLORS["dim"],
)
console.print(
" deepagents --model gpt-4o # Use specific model (auto-detects provider)",
style=COLORS["dim"],
)
console.print(
" deepagents -r # Resume most recent session",
style=COLORS["dim"],
)
console.print(
" deepagents -r abc123 # Resume specific thread",
style=COLORS["dim"],
)
console.print(
" deepagents --auto-approve # Start with auto-approve enabled",
style=COLORS["dim"],
)
console.print(
" deepagents --sandbox runloop # Execute code in Runloop sandbox",
style=COLORS["dim"],
)
console.print()
console.print("[bold]Thread Management:[/bold]", style=COLORS["primary"])
console.print(
" deepagents threads list # List all sessions", style=COLORS["dim"]
)
console.print(
" deepagents threads delete <ID> # Delete a session", style=COLORS["dim"]
)
console.print()
console.print("[bold]Interactive Features:[/bold]", style=COLORS["primary"])
console.print(" Enter Submit your message", style=COLORS["dim"])
console.print(" Ctrl+J Insert newline", style=COLORS["dim"])
console.print(" Shift+Tab Toggle auto-approve mode", style=COLORS["dim"])
console.print(" @filename Auto-complete files and inject content", style=COLORS["dim"])
console.print(" /command Slash commands (/help, /clear, /quit)", style=COLORS["dim"])
console.print(" !command Run bash commands directly", style=COLORS["dim"])
console.print()
```
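The generic fallback branch of `format_tool_display` can be exercised standalone. A self-contained sketch (the per-argument limit of 50 mirrors the fallback above; the real default comes from `MAX_ARG_LENGTH` in `deepagents_cli.config`):

```python
# Standalone sketch of the generic fallback in format_tool_display above.


def format_generic(tool_name: str, tool_args: dict) -> str:
    def truncate(value: str, max_length: int = 50) -> str:
        # Same truncation rule as truncate_value above
        return value[:max_length] + "..." if len(value) > max_length else value

    args_str = ", ".join(f"{k}={truncate(str(v))}" for k, v in tool_args.items())
    return f"{tool_name}({args_str})"
```

Short arguments pass through unchanged; long ones are clipped with a trailing ellipsis, keeping the tool-call line readable in the transcript.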