Hands-On: Build a Multi‑Agent Marketplace with uAgents
Overview
This tutorial walks through building a compact multi-agent marketplace using the uAgents framework. Three agents — Directory, Seller, and Buyer — exchange structured messages and perform asynchronous interactions to simulate discovery, negotiation, and transactions in a decentralized environment.
Install and define message models
Install uAgents and declare the message models that form the contract between agents.
!pip -q install "uagents>=0.11.2"
import asyncio, random
from typing import List, Dict, Optional
from uagents import Agent, Context, Bureau, Model, Protocol
class ServiceAnnounce(Model):
category: str
endpoint: str
class ServiceQuery(Model):
category: str
class ServiceList(Model):
addresses: List[str]
class OfferRequest(Model):
item: str
max_price: int
class Offer(Model):
item: str
price: int
qty: int
class Order(Model):
item: str
qty: int
class Receipt(Model):
item: str
qty: int
total: int
ok: bool
note: Optional[str] = None
These models structure announcements, queries, offers, orders, and receipts so agents can exchange typed messages reliably.
Protocols and agent registration
Define protocols and wire agents into them. The Directory agent handles service registration and lookup; other agents include the appropriate protocols to participate.
registry_proto = Protocol(name="registry", version="1.0")
trade_proto = Protocol(name="trade", version="1.0")
directory = Agent(name="directory", seed="dir-seed-001")
seller = Agent(name="seller", seed="seller-seed-001")
buyer = Agent(name="buyer", seed="buyer-seed-001")
directory.include(registry_proto)
seller.include(trade_proto)
buyer.include(registry_proto)
buyer.include(trade_proto)
@registry_proto.on_message(model=ServiceAnnounce)
async def on_announce(ctx: Context, sender: str, msg: ServiceAnnounce):
reg = await ctx.storage.get("reg") or {}
reg.setdefault(msg.category, set()).add(sender)
await ctx.storage.set("reg", reg)
ctx.logger.info(f"Registered {sender} under '{msg.category}'")
@registry_proto.on_message(model=ServiceQuery)
async def on_query(ctx: Context, sender: str, msg: ServiceQuery):
reg = await ctx.storage.get("reg") or {}
addrs = sorted(list(reg.get(msg.category, set())))
await ctx.send(sender, ServiceList(addresses=addrs))
ctx.logger.info(f"Returned {len(addrs)} providers for '{msg.category}'")
The Directory persists a registry mapping categories to provider addresses and responds to queries with a ServiceList.
Seller: catalog, offers, and order processing
The seller maintains a catalog and implements trade handlers for offer requests and orders. Pricing can vary, and stock is tracked to enforce availability.
CATALOG: Dict[str, Dict[str, int]] = {
"camera": {"price": 120, "qty": 3},
"laptop": {"price": 650, "qty": 2},
"headphones": {"price": 60, "qty": 5},
}
@seller.on_event("startup")
async def seller_start(ctx: Context):
await ctx.send(directory.address, ServiceAnnounce(category="electronics", endpoint=seller.address))
ctx.logger.info("Seller announced to directory")
@trade_proto.on_message(model=OfferRequest)
async def on_offer_request(ctx: Context, sender: str, req: OfferRequest):
item = CATALOG.get(req.item)
if not item:
await ctx.send(sender, Offer(item=req.item, price=0, qty=0))
return
price = max(1, int(item["price"] * (0.9 + 0.2 * random.random())))
if price > req.max_price or item["qty"] <= 0:
await ctx.send(sender, Offer(item=req.item, price=0, qty=0))
return
await ctx.send(sender, Offer(item=req.item, price=price, qty=item["qty"]))
ctx.logger.info(f"Offered {req.item} at {price} with qty {item['qty']}")
@trade_proto.on_message(model=Order)
async def on_order(ctx: Context, sender: str, order: Order):
item = CATALOG.get(order.item)
if not item or item["qty"] < order.qty:
await ctx.send(sender, Receipt(item=order.item, qty=0, total=0, ok=False, note="Not enough stock"))
return
total = item["price"] * order.qty
item["qty"] -= order.qty
await ctx.send(sender, Receipt(item=order.item, qty=order.qty, total=total, ok=True, note="Thanks!"))
The seller announces itself to the directory on startup and responds with either a valid offer or a zeroed offer when conditions aren’t met.
Buyer: discovery, requesting offers, and ordering
The buyer queries the directory to discover sellers, requests an offer within a budget, and places an order if an acceptable offer is received.
@buyer.on_event("startup")
async def buyer_start(ctx: Context):
ctx.logger.info("Buyer querying directory for electronics...")
resp = await ctx.ask(directory.address, ServiceQuery(category="electronics"), expects=ServiceList, timeout=5.0)
sellers = resp.addresses if resp else []
if not sellers:
return
target = sellers[0]
desired = "laptop"
budget = 700
ctx.logger.info(f"Requesting offer for '{desired}' within budget {budget} from {target}")
offer = await ctx.ask(target, OfferRequest(item=desired, max_price=budget), expects=Offer, timeout=5.0)
if not offer or offer.price <= 0:
return
qty = 1 if offer.qty >= 1 else 0
if qty == 0:
return
ctx.logger.info(f"Placing order for {qty} x {offer.item} at {offer.price}")
receipt = await ctx.ask(target, Order(item=offer.item, qty=qty), expects=Receipt, timeout=5.0)
if receipt and receipt.ok:
ctx.logger.info(f"ORDER SUCCESS: {receipt.qty} x {receipt.item} | total={receipt.total}")
The buyer follows a simple logic: discover, request, evaluate, and order. It uses asynchronous ask/response patterns to interact with seller agents.
Periodic discovery and running the simulation
Add a periodic discovery tick and run all agents in a single event loop using a Bureau.
@buyer.on_interval(period=6.0)
async def periodic_discovery(ctx: Context):
seen = await ctx.storage.get("seen") or 0
if seen >= 1:
return
await ctx.storage.set("seen", seen + 1)
ctx.logger.info("Periodic discovery tick -> re-query directory")
resp = await ctx.ask(directory.address, ServiceQuery(category="electronics"), expects=ServiceList, timeout=3.0)
n = len(resp.addresses) if resp else 0
ctx.logger.info(f"Periodic: directory reports {n} seller(s)")
bureau = Bureau()
bureau.add(directory)
bureau.add(seller)
bureau.add(buyer)
async def run_demo(seconds=10):
task = asyncio.create_task(bureau.run_async())
try:
await asyncio.sleep(seconds)
finally:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
print("\n Demo run complete.\n")
try:
loop = asyncio.get_running_loop()
await run_demo(10)
except RuntimeError:
asyncio.run(run_demo(10))
Running the demo shows agents discovering each other, negotiating prices, and completing a transaction via message exchanges. The example illustrates how uAgents supports structured messaging, asynchronous workflows, and local state to orchestrate lightweight multi-agent systems.