Task Manager#
BaseTaskManager#
BaseTaskManager is the main integration point in EAA. It owns:
the model object
the registered tools
long-term memory
SQLite persistence
graph compilation and execution
active transcript state
Construction options that matter most in practice are:
llm_configmemory_configtoolsskill_dirssession_db_pathuse_webuiuse_coding_toolsrun_codes_in_sandboxprune_checkpoints
Built-in graphs#
Chat graph#
build_chat_graph() creates the reusable conversation graph. It:
accepts bootstrap input or waits for user input
calls the model
executes any tool calls
returns to user input when the assistant produces a plain response
This is what backs run_conversation() and
run_conversation_from_checkpoint().
Feedback-loop graph#
build_feedback_loop_graph() creates the iterative tool-driving graph. It:
starts from an
initial_promptexpects tool calls on each model turn
optionally reprompts the model if the turn is invalid
executes tool calls and injects follow-up messages
supports human gates via
NEED HUMANand termination viaTERMINATE
This is what backs run_feedback_loop() and
run_feedback_loop_from_checkpoint().
Customization#
Custom graph#
build_task_graph() is the hook for a task-manager-specific LangGraph
workflow. The base class returns None; if you want a custom graph, override
it and implement run() or run_from_checkpoint() around it.
from langgraph.graph import END, START, StateGraph
from eaa_core.task_manager.base import BaseTaskManager
from eaa_core.task_manager.state import TaskManagerState
class CustomState(TaskManagerState):
pass
class CustomTaskManager(BaseTaskManager):
def build_task_graph(self, checkpointer=None):
builder = StateGraph(CustomState)
builder.add_node("call_model", self.node_factory.call_model, input_schema=CustomState)
builder.add_edge(START, "call_model")
builder.add_edge("call_model", END)
return builder.compile(checkpointer=checkpointer)
def run(self):
initial_state = CustomState(
messages=list(self.context),
full_history=list(self.full_history),
)
graph = self.task_graph
graph_kwargs = {}
if self.session_db_path is not None:
graph, checkpoint_config, _ = self.get_checkpointed_graph(
"task_graph",
load_state=False,
)
self.task_graph = graph
graph_kwargs["config"] = checkpoint_config
final_state = graph.invoke(initial_state, **graph_kwargs)
self.set_active_state(CustomState.model_validate(final_state), "task_graph")
- Status note:
The repository currently does not ship built-in task-manager subclasses that define their own
task_graph. The custom graph hook is present for extension work.
Custom workflow without a graph#
Several analytical task managers in the repository do not rely on a custom LangGraph workflow. Instead, they orchestrate the experiment directly in Python and keep the shared transcript/WebUI updated through task-manager helpers.
The relevant helpers are:
record_system_message()for narrative progress updatesupdate_message_history()for explicit transcript mutationsadd_webui_message_to_db()for display-only WebUI messagesrun_conversation()if control should fall back to free-form chat after the analytical workflow finishes
This pattern is used heavily in
AnalyticalScanningMicroscopeFocusingTaskManager: the code performs explicit
steps, emits status messages as it goes, and can still enter chat mode at the
end.
Checkpointing and resume#
Checkpointing uses the same SQLite database referenced by session_db_path by
default. The run_*_from_checkpoint() methods also accept a
checkpoint_db_path override when you need to resume from a different SQLite
file. The checkpoint database can hold:
LangGraph checkpoints
explicit WebUI display messages
queued WebUI input
WebUI status flags
The base task manager exposes three resume paths:
run_conversation_from_checkpoint()run_feedback_loop_from_checkpoint()run_from_checkpoint()for managers that implementtask_graph
Important details:
prune_checkpoints=Truekeeps only the newest checkpoint per graph threadchat, feedback loop, and task graph each get their own checkpoint thread id
the WebUI can read the transcript from explicit WebUI messages or, if needed, directly from the latest checkpointed state