From Documentation to Pipelines: Building a Unified Tool Orchestration Framework
Parsing documentation into a standardized ToolSpec
The first step is to convert free-form tool documentation into a compact, machine-readable specification. This example defines a ToolSpec dataclass and a simple parser that extracts a short description plus input and output parameter hints from text blocks.
import re, json, time, random
from dataclasses import dataclass
from typing import Callable, Dict, Any, List, Tuple
@dataclass
class ToolSpec:
    name: str
    description: str
    inputs: Dict[str, str]
    outputs: Dict[str, str]

def parse_doc_to_spec(name: str, doc: str) -> ToolSpec:
    # The first line of the doc becomes the description; fall back to the tool name.
    desc = doc.strip().splitlines()[0].strip() if doc.strip() else name
    # Keep only lines that look like argument declarations (contain -- or :).
    arg_block = "\n".join([l for l in doc.splitlines() if "--" in l or ":" in l])
    inputs = {}
    for line in arg_block.splitlines():
        m = re.findall(r"(--?\w[\w-]*|\b\w+\b)\s*[:=]?\s*(\w+)?", line)
        for key, typ in m:
            k = key.lstrip("-")
            if k and k not in inputs and k not in ["Returns", "Output", "Outputs"]:
                inputs[k] = (typ or "str")
    if not inputs:
        inputs = {"in": "str"}
    return ToolSpec(name=name, description=desc, inputs=inputs, outputs={"out": "json"})
This parser is intentionally lightweight: it treats the first line of the doc as the description and scans for argument lines containing -- or :. If no inputs are found, it falls back to a single default input named "in". The resulting ToolSpec is a small, consistent representation that downstream systems can use.
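As a quick sanity check, the parser can be run on a hypothetical doc string (the doc text and the expected output below are illustrative, assuming the regex behaves as described):
sample_doc = """Toy trimmer for FASTA reads
--seq_fasta: str --min_len: int Outputs: json"""
spec = parse_doc_to_spec("trimmer", sample_doc)
print(spec.inputs)   # expected: {'seq_fasta': 'str', 'min_len': 'int'}
print(spec.outputs)  # {'out': 'json'}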
Mock bioinformatics tool implementations
To demonstrate how parsed specs become callable tools, the tutorial provides compact Python functions that mimic common bioinformatics tools. Each returns a dictionary with predictable keys so they can be handled homogeneously by the orchestration layer.
def tool_fastqc(seq_fasta: str, min_len: int = 30) -> Dict[str, Any]:
    # Split the FASTA string into records, dropping anything before the first header.
    seqs = [s for s in re.split(r">[^\n]*\n", seq_fasta)[1:]]
    lens = [len(re.sub(r"\s+", "", s)) for s in seqs]
    # Fraction of reads meeting the length threshold (a toy stand-in for a Q30 metric).
    q30 = sum(l >= min_len for l in lens) / max(1, len(lens))
    gc = sum(c in "GCgc" for s in seqs for c in s) / max(1, sum(lens))
    return {"n_seqs": len(lens), "len_mean": sum(lens) / max(1, len(lens)), "pct_q30": q30, "gc": gc}

def tool_bowtie2_like(ref: str, reads: str, mode: str = "end-to-end") -> Dict[str, Any]:
    def revcomp(s):
        t = str.maketrans("ACGTacgt", "TGCAtgca")
        return s.translate(t)[::-1]
    reads_list = [r for r in re.split(r">[^\n]*\n", reads)[1:]]
    ref_seq = "".join(ref.splitlines()[1:])
    hits = []
    for i, r in enumerate(reads_list):
        rseq = "".join(r.split())
        # A read "aligns" if it, or its reverse complement, is an exact substring of the reference.
        aligned = (rseq in ref_seq) or (revcomp(rseq) in ref_seq)
        hits.append({"read_id": i, "aligned": bool(aligned), "pos": ref_seq.find(rseq)})
    return {"n": len(hits), "aligned": sum(h["aligned"] for h in hits), "mode": mode, "hits": hits}

def tool_bcftools_like(ref: str, alt: str, win: int = 15) -> Dict[str, Any]:
    ref_seq = "".join(ref.splitlines()[1:])
    alt_seq = "".join(alt.splitlines()[1:])
    n = min(len(ref_seq), len(alt_seq))
    variants = []
    for i in range(n):
        # Call a variant wherever the two sequences disagree at the same position.
        if ref_seq[i] != alt_seq[i]:
            variants.append({"pos": i, "ref": ref_seq[i], "alt": alt_seq[i]})
    return {"n_sites": n, "n_var": len(variants), "variants": variants[:win]}
FASTQC_DOC = """FastQC-like quality control for FASTA
--seq_fasta: str --min_len: int Outputs: json"""
BOWTIE_DOC = """Bowtie2-like aligner
--ref: str --reads: str --mode: str Outputs: json"""
BCF_DOC = """bcftools-like variant caller
--ref: str --alt: str --win: int Outputs: json"""
These toy tools implement basic behaviors: reading FASTA-like strings, computing summary statistics, searching for read alignments, and identifying variant sites. They are intentionally simple so that the focus remains on orchestration and interface consistency.
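Before wiring these into a registry, each function can be exercised directly. A minimal sketch with a hand-written two-record FASTA string (the values in the comment follow from the arithmetic above):
demo = ">a\nACGTACGTACGT\n>b\nGGGGCCCC\n"
print(tool_fastqc(demo, min_len=10))
# {'n_seqs': 2, 'len_mean': 10.0, 'pct_q30': 0.5, 'gc': 0.7}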
Tool registry and a minimal server API
A small registry stores ToolSpec objects and references to the callables. The server exposes register, list_tools, and call_tool operations, allowing dynamic registration and invocation.
@dataclass
class MCPTool:
    spec: ToolSpec
    fn: Callable[..., Dict[str, Any]]

class MCPServer:
    def __init__(self):
        self.tools: Dict[str, MCPTool] = {}
    def register(self, name: str, doc: str, fn: Callable[..., Dict[str, Any]]):
        spec = parse_doc_to_spec(name, doc)
        self.tools[name] = MCPTool(spec, fn)
    def list_tools(self) -> List[Dict[str, Any]]:
        return [dict(name=t.spec.name, description=t.spec.description,
                     inputs=t.spec.inputs, outputs=t.spec.outputs)
                for t in self.tools.values()]
    def call_tool(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]:
        if name not in self.tools:
            raise KeyError(f"tool {name} not found")
        spec = self.tools[name].spec
        kwargs = {k: args.get(k) for k in spec.inputs.keys()}
        return self.tools[name].fn(**kwargs)

server = MCPServer()
server.register("fastqc", FASTQC_DOC, tool_fastqc)
server.register("bowtie2", BOWTIE_DOC, tool_bowtie2_like)
server.register("bcftools", BCF_DOC, tool_bcftools_like)
Task = Tuple[str, Dict[str, Any]]
PIPELINES = {
    "rnaseq_qc_align_call": [
        ("fastqc", {"seq_fasta": "{reads}", "min_len": 30}),
        ("bowtie2", {"ref": "{ref}", "reads": "{reads}", "mode": "end-to-end"}),
        ("bcftools", {"ref": "{ref}", "alt": "{alt}", "win": 15}),
    ]
}

def compile_pipeline(nl_request: str) -> List[Task]:
    # Keyword routing placeholder: with only one pipeline defined, the match
    # and the fallback resolve to the same plan.
    key = "rnaseq_qc_align_call" if re.search(r"rna|qc|align|variant|call", nl_request, re.I) else "rnaseq_qc_align_call"
    return PIPELINES[key]
The registry uses the parsed ToolSpec to decide which arguments to extract from a provided argument map before calling the underlying function; keys not declared in the spec are silently dropped. Pipelines are represented as sequences of (tool_name, args_template) tuples, where string values may contain {placeholder} fields that are filled from a context dictionary at run time.
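The argument filtering is worth seeing once. A minimal sketch with made-up inline FASTA strings:
out = server.call_tool("bcftools", {"ref": ">r\nACGT\n", "alt": ">a\nAGGT\n", "win": 5, "unused": 42})
print(out)  # {'n_sites': 4, 'n_var': 1, 'variants': [{'pos': 1, 'ref': 'C', 'alt': 'G'}]}
# "unused" never reaches tool_bcftools_like because it is absent from the spec's inputs.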
Generating test data, running pipelines, and benchmarking
The example generates small synthetic FASTA data and supplies it as context to a pipeline runner. A simple benchmarking harness measures runtimes for individual tools and the whole pipeline.
def mk_fasta(header: str, seq: str) -> str:
    return f">{header}\n{seq}\n"

random.seed(0)
REF_SEQ = "".join(random.choice("ACGT") for _ in range(300))
REF = mk_fasta("ref", REF_SEQ)
# r1 and r3 are exact slices of the reference; r2 is a repetitive synthetic read.
READS = mk_fasta("r1", REF_SEQ[50:130]) + mk_fasta("r2", "ACGT" * 15) + mk_fasta("r3", REF_SEQ[180:240])
# ALT replaces the base at position 150 with "T", giving at most one variant site.
ALT = mk_fasta("alt", REF_SEQ[:150] + "T" + REF_SEQ[151:])
def run_pipeline(nl: str, ctx: Dict[str, str]) -> Dict[str, Any]:
    plan = compile_pipeline(nl)
    results = []
    t0 = time.time()
    for name, arg_tpl in plan:
        # Fill string templates like "{reads}" from the context; pass non-strings through.
        args = {k: (v.format(**ctx) if isinstance(v, str) else v) for k, v in arg_tpl.items()}
        out = server.call_tool(name, args)
        results.append({"tool": name, "args": args, "output": out})
    return {"request": nl, "elapsed_s": round(time.time() - t0, 4), "results": results}
def bench_individual() -> List[Dict[str, Any]]:
    cases = [
        ("fastqc", {"seq_fasta": READS, "min_len": 25}),
        ("bowtie2", {"ref": REF, "reads": READS, "mode": "end-to-end"}),
        ("bcftools", {"ref": REF, "alt": ALT, "win": 10}),
    ]
    rows = []
    for name, args in cases:
        t0 = time.time(); ok = True; err = None; out = None
        try:
            out = server.call_tool(name, args)
        except Exception as e:
            ok = False; err = str(e)
        rows.append({"tool": name, "ok": ok, "ms": int((time.time() - t0) * 1000),
                     "out_keys": list(out.keys()) if ok else [], "err": err})
    return rows
def bench_pipeline() -> Dict[str, Any]:
    t0 = time.time()
    res = run_pipeline("Run RNA-seq QC, align, and variant call.", {"ref": REF, "reads": READS, "alt": ALT})
    ok = all(step["output"] for step in res["results"])
    return {"pipeline": "rnaseq_qc_align_call", "ok": ok, "ms": int((time.time() - t0) * 1000), "n_steps": len(res["results"])}
print("== TOOLS =="); print(json.dumps(server.list_tools(), indent=2))
print("n== INDIVIDUAL BENCH =="); print(json.dumps(bench_individual(), indent=2))
print("n== PIPELINE BENCH =="); print(json.dumps(bench_pipeline(), indent=2))
print("n== PIPELINE RUN =="); print(json.dumps(run_pipeline("Run RNA-seq QC, align, and variant call.", {"ref":REF,"reads":READS,"alt":ALT}), indent=2))
Running these snippets demonstrates that tools can be parsed from docs, registered, invoked, composed into pipelines, and benchmarked. The example shows how straightforward it is to standardize tool interfaces and automate multi-step workflows.
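As a final, hypothetical extension (the tool name, doc string, and function below are illustrative additions, not part of the setup above), registering a fourth tool takes nothing more than a doc string and a callable:
TRIM_DOC = """Trimmomatic-like length filter for FASTA
--seq_fasta: str --min_len: int Outputs: json"""

def tool_trim_like(seq_fasta: str, min_len: int = 30) -> Dict[str, Any]:
    # Keep only records whose sequence length meets the threshold.
    seqs = re.split(r">[^\n]*\n", seq_fasta)[1:]
    kept = [s for s in seqs if len(re.sub(r"\s+", "", s)) >= min_len]
    return {"n_in": len(seqs), "n_kept": len(kept)}

server.register("trim", TRIM_DOC, tool_trim_like)
print([t["name"] for t in server.list_tools()])  # [..., 'trim']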