Agentic Map-Reduce
Overview
Agentic map-reduce runs a tool-using agent over a body of work in parallel and reduces the per-item results into a single answer. You give it a corpus (documents, files, a DataFrame, or a large text) and a task in natural language; LOTUS plans how to split the work, runs one agent per shard in parallel (each with tools, including a sandboxed Python REPL), and aggregates the results.
It is the right tool when a task is decomposable over breadth — analyzing every file in a codebase, processing a large batch of documents, or computing over many records — and where the per-item work benefits from an agent that can call tools (compute, parse, look things up) rather than a single LLM call.
Motivation
A plain semantic operator applies one model call per row. Some tasks need more: the model
must do something per item (run code, compute an exact value, parse a file) and then the
results must be combined. Agentic map-reduce gives each item a full tool-calling agent and
then reduces the findings, while keeping the interface as simple as a single task.
How it works
The pipeline has four stages:
Plan — from your
task, a planner derives the per-shardmapinstruction, thereduceinstruction, and how to shard and parallelize the corpus. You can override any of these.Shard — the corpus is split into bounded batches (one or more units per shard).
Map — one agent per shard runs in parallel. Each agent sees only its shard and can call the available tools (e.g. the Python REPL) in a loop until it produces a finding.
Reduce — the per-shard findings are aggregated into one result. The reducer has the same tools, so numeric or otherwise deterministic aggregation is computed rather than done by hand.
Tool usage is handled transparently: the available tools are described to the agents in a
system-generated prompt, so your task never has to mention them.
Basic Example
import lotus
from lotus.models import LM
from lotus.tools import PythonREPLTool
lotus.settings.configure(lm=LM(model="gpt-4o-mini"))
reports = [
"Q1 travel: flights 420.50, hotel 610.00, meals 133.25.",
"Q1 software: licenses 1200.00, cloud 348.75, monitoring 99.00.",
"Q1 office: desks 890.00, chairs 445.50, supplies 76.20.",
]
corpus = lotus.Corpus.from_documents(reports)
result = corpus.agentic_map_reduce(
task=(
"Each document is an expense report with line items. Compute the exact total "
"for the report. Then summarize the grand total and highest-spending category."
),
tools=[PythonREPLTool()],
)
print(result.output) # the reduced answer
print(result.findings) # per-shard results
print(result.plan) # the derived plan (map/reduce/sharding)
print(result.usage) # token usage
The Corpus
A Corpus normalizes many input forms into shardable units:
lotus.Corpus.from_documents(["doc one", "doc two"]) # in-memory documents
lotus.Corpus.from_files("repo/**/*.py") # files / globs (a codebase)
lotus.Corpus.from_dataframe(df, content_cols=["text"])# tabular rows
lotus.Corpus.from_text(big_string, chunk_chars=4000) # one large document, chunked
Specifying the work
You normally provide only a task; the planner derives the map and reduce steps. For
full control, override them — the planner fills in whatever you leave out:
corpus.agentic_map_reduce(
task="Find every use of the deprecated API foo() and rank by risk.",
map="Report each call to foo() in this shard with file:line and a risk note.",
reduce="Merge and prioritize the per-shard findings into one report.",
)
Tools
Each agent has access to the tools you pass. A sandboxed Python REPL is provided:
from lotus.tools import PythonREPLTool
# Local subprocess sandbox by default (no extra infra); pass a Docker sandbox for
# stronger isolation.
repl = PythonREPLTool()
You can define your own tools with a decorator or a subclass:
from lotus.tools import tool, Tool
from pydantic import BaseModel, Field
@tool(description="Add two integers and return the sum.")
def add(a: int, b: int) -> str:
return str(a + b)
class FileReadArgs(BaseModel):
filename: str = Field(..., description="Name of the file to read.")
class FileReadTool(Tool):
name = "file_read"
description = "Read a file from the sandbox filesystem."
args_schema = FileReadArgs
def run(self, filename: str) -> str:
...
corpus.agentic_map_reduce(task="...", tools=[repl, add, FileReadTool()])
Result
agentic_map_reduce returns a Result with:
output: the reduced final answer.findings: the list of per-shard results (before reduction).plan: thePlanused —map_instruction,reduce_instruction,segmentation,shard_size,parallelism.usage: aggregated token usage.
Parameters
task: The natural-language objective. The only required input.tools: Tools available to each agent (e.g.[PythonREPLTool()]).map/reduce: Optional overrides for the derived instructions.plan:"auto"(default) to let the planner decide, or an explicitPlan.max_parallelism: Cap on concurrent agents (default"auto").max_steps: Tool-calling steps allowed per agent (default6).lm: Override the configured language model for this call.
Note
Agentic map-reduce is designed for read/compute-only fan-out. It is best for tasks that decompose over breadth; a single deep-judgment task is better served by a single operator call.