Практическое руководство: мультиагентный маркетплейс на uAgents

Обзор

В этом руководстве показано, как создать компактную мультиагентную торговую систему с помощью фреймворка uAgents. Три агента — Directory, Seller и Buyer — обмениваются типизированными сообщениями и выполняют асинхронные взаимодействия для моделирования поиска, переговоров и транзакций в децентрализованной среде.

Установка и модели сообщений

Установите uAgents и объявите модели сообщений, которые задают контракт взаимодействия между агентами.

!pip -q install "uagents>=0.11.2"
import asyncio, random
from typing import List, Dict, Optional
from uagents import Agent, Context, Bureau, Model, Protocol


class ServiceAnnounce(Model):
   category: str
   endpoint: str


class ServiceQuery(Model):
   category: str


class ServiceList(Model):
   addresses: List[str]


class OfferRequest(Model):
   item: str
   max_price: int


class Offer(Model):
   item: str
   price: int
   qty: int


class Order(Model):
   item: str
   qty: int


class Receipt(Model):
   item: str
   qty: int
   total: int
   ok: bool
   note: Optional[str] = None

Эти модели задают структуру объявлений, запросов, предложений, заказов и квитанций, чтобы агенты могли обмениваться типизированными сообщениями.

Протоколы и регистрация агентов

Определите протоколы и подключите агентов к ним. Агент Directory управляет регистрацией сервисов и поиском; остальные агенты включают соответствующие протоколы.

registry_proto = Protocol(name="registry", version="1.0")
trade_proto = Protocol(name="trade", version="1.0")


directory = Agent(name="directory", seed="dir-seed-001")
seller = Agent(name="seller", seed="seller-seed-001")
buyer = Agent(name="buyer", seed="buyer-seed-001")


directory.include(registry_proto)
seller.include(trade_proto)
buyer.include(registry_proto)
buyer.include(trade_proto)


@registry_proto.on_message(model=ServiceAnnounce)
async def on_announce(ctx: Context, sender: str, msg: ServiceAnnounce):
   reg = await ctx.storage.get("reg") or {}
   reg.setdefault(msg.category, set()).add(sender)
   await ctx.storage.set("reg", reg)
   ctx.logger.info(f"Registered {sender} under '{msg.category}'")


@registry_proto.on_message(model=ServiceQuery)
async def on_query(ctx: Context, sender: str, msg: ServiceQuery):
   reg = await ctx.storage.get("reg") or {}
   addrs = sorted(list(reg.get(msg.category, set())))
   await ctx.send(sender, ServiceList(addresses=addrs))
   ctx.logger.info(f"Returned {len(addrs)} providers for '{msg.category}'")

Directory хранит реестр, отображающий категории в адреса поставщиков, и отвечает на запросы ServiceQuery списком провайдеров.

Seller: каталог, предложения и обработка заказов

Seller поддерживает каталог и реализует обработчики запросов на предложение и заказы. Цена может варьироваться, а запас товара отслеживается для проверки доступности.

CATALOG: Dict[str, Dict[str, int]] = {
   "camera": {"price": 120, "qty": 3},
   "laptop": {"price": 650, "qty": 2},
   "headphones": {"price": 60, "qty": 5},
}


@seller.on_event("startup")
async def seller_start(ctx: Context):
   await ctx.send(directory.address, ServiceAnnounce(category="electronics", endpoint=seller.address))
   ctx.logger.info("Seller announced to directory")


@trade_proto.on_message(model=OfferRequest)
async def on_offer_request(ctx: Context, sender: str, req: OfferRequest):
   item = CATALOG.get(req.item)
   if not item:
       await ctx.send(sender, Offer(item=req.item, price=0, qty=0))
       return
   price = max(1, int(item["price"] * (0.9 + 0.2 * random.random())))
   if price > req.max_price or item["qty"] <= 0:
       await ctx.send(sender, Offer(item=req.item, price=0, qty=0))
       return
   await ctx.send(sender, Offer(item=req.item, price=price, qty=item["qty"]))
   ctx.logger.info(f"Offered {req.item} at {price} with qty {item['qty']}")


@trade_proto.on_message(model=Order)
async def on_order(ctx: Context, sender: str, order: Order):
   item = CATALOG.get(order.item)
   if not item or item["qty"] < order.qty:
       await ctx.send(sender, Receipt(item=order.item, qty=0, total=0, ok=False, note="Not enough stock"))
       return
   total = item["price"] * order.qty
   item["qty"] -= order.qty
   await ctx.send(sender, Receipt(item=order.item, qty=order.qty, total=total, ok=True, note="Thanks!"))

Seller объявляет себя Directory при старте и отвечает либо валидным предложением, либо пустым предложением при несоответствии условий.

Buyer: обнаружение, запросы предложений и оформление заказа

Buyer запрашивает Directory для поиска продавцов, просит предложение в пределах бюджета и оформляет заказ при приемлемом ответе.

@buyer.on_event("startup")
async def buyer_start(ctx: Context):
   ctx.logger.info("Buyer querying directory for electronics...")
   resp = await ctx.ask(directory.address, ServiceQuery(category="electronics"), expects=ServiceList, timeout=5.0)
   sellers = resp.addresses if resp else []
   if not sellers:
       return
   target = sellers[0]
   desired = "laptop"
   budget = 700
   ctx.logger.info(f"Requesting offer for '{desired}' within budget {budget} from {target}")
   offer = await ctx.ask(target, OfferRequest(item=desired, max_price=budget), expects=Offer, timeout=5.0)
   if not offer or offer.price <= 0:
       return
   qty = 1 if offer.qty >= 1 else 0
   if qty == 0:
       return
   ctx.logger.info(f"Placing order for {qty} x {offer.item} at {offer.price}")
   receipt = await ctx.ask(target, Order(item=offer.item, qty=qty), expects=Receipt, timeout=5.0)
   if receipt and receipt.ok:
       ctx.logger.info(f"ORDER SUCCESS: {receipt.qty} x {receipt.item} | total={receipt.total}")

Buyer реализует логику: обнаружение, запрос предложения, оценка и заказ, используя асинхронные паттерны ask/response.

Периодическое обнаружение и запуск симуляции

Добавьте периодический тик обнаружения и запустите всех агентов в едином цикле событий через Bureau.

@buyer.on_interval(period=6.0)
async def periodic_discovery(ctx: Context):
   seen = await ctx.storage.get("seen") or 0
   if seen >= 1:
       return
   await ctx.storage.set("seen", seen + 1)
   ctx.logger.info("Periodic discovery tick -> re-query directory")
   resp = await ctx.ask(directory.address, ServiceQuery(category="electronics"), expects=ServiceList, timeout=3.0)
   n = len(resp.addresses) if resp else 0
   ctx.logger.info(f"Periodic: directory reports {n} seller(s)")


bureau = Bureau()
bureau.add(directory)
bureau.add(seller)
bureau.add(buyer)


async def run_demo(seconds=10):
   task = asyncio.create_task(bureau.run_async())
   try:
       await asyncio.sleep(seconds)
   finally:
       task.cancel()
       try:
           await task
       except asyncio.CancelledError:
           pass
   print("\n Demo run complete.\n")


try:
   loop = asyncio.get_running_loop()
   await run_demo(10)
except RuntimeError:
   asyncio.run(run_demo(10))

Выполнение демонстрации показывает, как агенты обнаруживают друг друга, согласуют цену и завершают транзакцию через обмен сообщениями. Пример иллюстрирует, как uAgents упрощает структурированную передачу сообщений, асинхронные рабочие процессы и управление локальным состоянием для оркестрации лёгких мультиагентных систем.