AutoGen のエージェントはメッセージに反応し、メッセージを送信しそして発行することができて、メッセージはエージェントが相互に通信できる唯一の手段です。
AutoGen Core : フレームワークガイド : メッセージと通信
作成 : クラスキャット・セールスインフォメーション
作成日時 : 04/14/2025
* 本記事は github microsoft/autogen の以下のページを独自に翻訳した上でまとめ直し、補足説明を加えています :
* サンプルコードの動作確認はしておりますが、必要な場合には適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。
◆ お問合せ : 下記までお願いします。
- クラスキャット セールス・インフォメーション
- sales-info@classcat.com
- ClassCatJP
AutoGen Core : フレームワークガイド : メッセージと通信
AutoGen のエージェントはメッセージに反応し、メッセージを送信しそして発行することができて、メッセージはエージェントが相互に通信できる唯一の手段です。
メッセージ
メッセージはシリアライズ可能なオブジェクトで、以下を使用して定義できます :
- Pydantic の pydantic.BaseModel のサブクラス、または
- dataclass
例えば :
from dataclasses import dataclass
@dataclass
class TextMessage:
content: str
source: str
@dataclass
class ImageMessage:
url: str
source: str
Note : メッセージは純粋にデータで、ロジックを含むべきではありません。
メッセージ・ハンドラ
エージェントがメッセージを受信する際、ランタイムはエージェントのメッセージハンドラ (on_message()) を呼び出します、これはエージェントのメッセージ処理ロジックを実装する必要があります。このメッセージがエージェントにより処理できない場合、エージェントは CantHandleException を上げる必要があります。
基底クラス BaseAgent はメッセージ処理ロジックを提供せず、on_message() メソッドの直接的な実装は高度なユースケース以外では勧められません。
開発者は RoutedAgent 基底クラスの実装から始めるべきです、これは組み込みメッセージ・ルーティング機能を提供しています。
タイプによるメッセージのルーティング
RoutedAgent 基底クラスは message_handler() デコレータにより、メッセージタイプをメッセージハンドラに関連付けるメカニズムを提供していますので、開発者は on_message() メソッドを実装する必要がありません。
例えば、次の type-routed エージェントは異なるメッセージハンドラを使用して TextMessage と ImageMessage に応答します :
from autogen_core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler
class MyAgent(RoutedAgent):
@message_handler
async def on_text_message(self, message: TextMessage, ctx: MessageContext) -> None:
print(f"Hello, {message.source}, you said {message.content}!")
@message_handler
async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:
print(f"Hello, {message.source}, you sent me {message.url}!")
エージェント・ランタイムを作成してエージェントタイプを登録します (Agent and Agent Runtime 参照) :
runtime = SingleThreadedAgentRuntime()
await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My Agent"))
AgentType(type='my_agent')
このエージェントを TextMessage と ImageMessage でテストします。
runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="Hello, World!", source="User"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="User"), agent_id)
await runtime.stop_when_idle()
Hello, User, you said Hello, World!! Hello, User, you sent me https://example.com/image.jpg!
ランタイムは、最初のメッセージを配信するとき、エージェント ID AgentId(“my_agent”, “default”) で MyAgent のインスタンスを自動的に作成します。
同じタイプのメッセージのルーティング
あるシナリオでは、同じタイプのメッセージを異なるハンドラにルーティングすることが有用です。例えば、異なる送信者エージェントからのメッセージは異なる方法で処理されるべきでしょう。message_handler() デコレータの match パラメータを使用できます。
match パラメータは同じメッセージタイプ用のハンドラを特定のメッセージに関連付けます – それはメッセージタイプ・ルーティングの補助的な (secondary) ものです。それは引数としてメッセージと MessageContext を取る callable を受け取り、メッセージが decorated ハンドラにより処理されるべきかを示すブーリアンを返します。callable はハンドラのアルファベット順に確認されます。
次は、match パラメータを使用して送信者エージェントに基づいてメッセージをルーティングするエージェントの例です :
class RoutedBySenderAgent(RoutedAgent):
@message_handler(match=lambda msg, ctx: msg.source.startswith("user1")) # type: ignore
async def on_user1_message(self, message: TextMessage, ctx: MessageContext) -> None:
print(f"Hello from user 1 handler, {message.source}, you said {message.content}!")
@message_handler(match=lambda msg, ctx: msg.source.startswith("user2")) # type: ignore
async def on_user2_message(self, message: TextMessage, ctx: MessageContext) -> None:
print(f"Hello from user 2 handler, {message.source}, you said {message.content}!")
@message_handler(match=lambda msg, ctx: msg.source.startswith("user2")) # type: ignore
async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:
print(f"Hello, {message.source}, you sent me {message.url}!")
上述のエージェントは送信者エージェントを決定するためにメッセージに source フィールドを使用します。エージェント ID が利用可能であれば、送信者エージェントを決めるために MessageContext の sender フィールドを使用することもできます。
異なる source 値を持つメッセージでエージェントをテストしましょう :
runtime = SingleThreadedAgentRuntime()
await RoutedBySenderAgent.register(runtime, "my_agent", lambda: RoutedBySenderAgent("Routed by sender agent"))
runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="Hello, World!", source="user1-test"), agent_id)
await runtime.send_message(TextMessage(content="Hello, World!", source="user2-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user1-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user2-test"), agent_id)
await runtime.stop_when_idle()
Hello from user 1 handler, user1-test, you said Hello, World!! Hello from user 2 handler, user2-test, you said Hello, World!! Hello, user2-test, you sent me https://example.com/image.jpg!
上記の例で、最初の ImageMessage は処理されません、メッセージの source フィールドがハンドラの一致条件に一致しないからです。
ダイレクト・メッセージング
AutoGen core には 2 つのタイプの通信があります :
- ダイレクト・メッセージング (Direct Messaging) : ダイレクト・メッセージを別のエージェントに送信します。
- ブロードキャスト : メッセージをトピックに発行します。
まず、ダイレクト・メッセージングを見てみましょう。ダイレクト・メッセージを別のエージェントに送信するためには、メッセージハンドラ内では autogen_core.BaseAgent.send_message() メソッドを使用し、ランタイムからは autogen_core.AgentRuntime.send_message() メソッドを使用します。
これらのメソッドの呼び出しの待機 (awaiting) は受信するエージェントのメッセージハンドラの戻り値が返されます。受信するエージェントのハンドラが None を返す場合には、None が返されます。
Note : 送信者が待っている間に呼び出されたエージェントが例外をあげた場合、例外は送信者に伝播し戻されます。
リクエスト / レスポンス
ダイレクト・メッセージングは、送信者が受信者からのレスポンスを想定する、リクエスト/レスポンスのシナリオに対して使用できます。受信者は、メッセージハンドラからの値を返すことでメッセージに応答できます。これをエージェント間の関数呼び出しとして考えることができます。
例えば、次のエージェントを考えましょう :
from dataclasses import dataclass
from autogen_core import MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler
@dataclass
class Message:
content: str
class InnerAgent(RoutedAgent):
@message_handler
async def on_my_message(self, message: Message, ctx: MessageContext) -> Message:
return Message(content=f"Hello from inner, {message.content}")
class OuterAgent(RoutedAgent):
def __init__(self, description: str, inner_agent_type: str):
super().__init__(description)
self.inner_agent_id = AgentId(inner_agent_type, self.id.key)
@message_handler
async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
print(f"Received message: {message.content}")
# Send a direct message to the inner agent and receives a response.
response = await self.send_message(Message(f"Hello from outer, {message.content}"), self.inner_agent_id)
print(f"Received inner response: {response.content}")
OuterAgent はメッセージを受信すると、InnerAgent にダイレクトメッセージを送信してレスポンスでメッセージを受信します。
Message を OuterAgent に送信することでこれらのエージェントをテストできます。
runtime = SingleThreadedAgentRuntime()
await InnerAgent.register(runtime, "inner_agent", lambda: InnerAgent("InnerAgent"))
await OuterAgent.register(runtime, "outer_agent", lambda: OuterAgent("OuterAgent", "inner_agent"))
runtime.start()
outer_agent_id = AgentId("outer_agent", "default")
await runtime.send_message(Message(content="Hello, World!"), outer_agent_id)
await runtime.stop_when_idle()
Received message: Hello, World! Received inner response: Hello from inner, Hello from outer, Hello, World!
両方の出力は OuterAgent のメッセージハンドラにより生成されますが、2 番目の出力は InnerAgent からのレスポンスに基づいています。
一般的に言えば、ダイレクト・メッセージングは、送信者と受信者が密接にカップリングされているシナリオに適しています – それらは一緒に作成され、送信者は受信者の特定のインスタンスにリンクされています。例えば、エージェントはダイレクト・メッセージを ToolAgent のインスタンスに送信することでツール呼び出しを実行し、そのレスポンスを使用して行動・観察 (action-observation) ループを形成します。
ブロードキャスト
ブロードキャストはトピックとサブスクリプションを備えた publish/subscribe モデルです。基本概念を学習するには Topic and Subscription を読んでください。
ダイレクト・メッセージングとブロードキャストの間の主な違いは、ブロードキャストはリクエスト/レスポンスのシナリオには使用できないことです。エージェントがメッセージを発行する場合、それは一方通行で、受信するエージェントのハンドラが値を返す場合でさえ、他のエージェントからのレスポンスを受け取ることはできません。
Note : 発行されたメッセージにレスポンスが与えられた場合、それは破棄されます。
Note : エージェントがサブスクライブしたエージェント用にメッセージタイプを発行する場合、発行されたメッセージを受け取りません。これは無限ループを防ぐためです。
トピックへのサブスクライブと発行
型ベース・サブスクリプションは特定のトピックタイプのトピックに発行されたメッセージを特定のエージェントタイプのエージェントにマップします。RoutedAgent をサブクラス化したエージェントに特定のトピックタイプのトピックをサブスクライブさせるには、type_subscription() クラス・デコレータを使用できます。
次の例は、type_subscription() デコレータを使用して “default” トピックタイプのトピックをサブスクライブし、受信したメッセージを出力する ReceiverAgent クラスを示します。
from autogen_core import RoutedAgent, message_handler, type_subscription
@type_subscription(topic_type="default")
class ReceivingAgent(RoutedAgent):
@message_handler
async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
print(f"Received a message: {message.content}")
エージェントのハンドラからメッセージを発行するには、publish_message() メソッドを使用して TopicId を指定します。この呼び出しは、ランタイムがすべてのサブスクライバーへのメッセージの配信をスケジュール可能になるまで待機する必要がありますが、それは常に None を返します。発行されたメッセージ処理中にエージェントが例外を上げた場合、これはログ記録されますが、発行したエージェントに伝播し戻されません。
次の例は、メッセージを受信したときにメッセージをトピックに発行する BroadcastingAgent を示しています。
from autogen_core import TopicId
class BroadcastingAgent(RoutedAgent):
@message_handler
async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
await self.publish_message(
Message("Publishing a message from broadcasting agent!"),
topic_id=TopicId(type="default", source=self.id.key),
)
BroadcastingAgent はメッセージを、タイプ “default” で、ソースはエージェント・インスタンスのエージェントキーに割り当てられたトピックに発行します。
サブスクリプションは、エージェントタイプの登録の一部としてか、別の API メソッドを通して、エージェントランタイムにより登録されます。受信するエージェントを type_subscription() デコレータで、ブロードキャストするエージェントをデコレータなしに TypeSubscription を登録する方法を示します。
from autogen_core import TypeSubscription
runtime = SingleThreadedAgentRuntime()
# Option 1: with type_subscription decorator
# The type_subscription class decorator automatically adds a TypeSubscription to
# the runtime when the agent is registered.
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))
# Option 2: with TypeSubscription
await BroadcastingAgent.register(runtime, "broadcasting_agent", lambda: BroadcastingAgent("Broadcasting Agent"))
await runtime.add_subscription(TypeSubscription(topic_type="default", agent_type="broadcasting_agent"))
# Start the runtime and publish a message.
runtime.start()
await runtime.publish_message(
Message("Hello, World! From the runtime!"), topic_id=TopicId(type="default", source="default")
)
await runtime.stop_when_idle()
Received a message: Hello, World! From the runtime! Received a message: Publishing a message from broadcasting agent!
上記の例で示されるように、エージェントインスタンスを作成する必要なく、ランタイムの publish_message() メソッドを通してトピックに直接発行することもできます。
出力から、受信側エージェントにより 2 つのメッセージが受け取られたことがわかります : 1 つはランタイム経由で発行され、他方はブロードキャストするエージェントにより発行されました。
デフォルト・トピックとサブスクリプション
上記の例では、TopicId と TypeSubscription を使用してトピックとサブスクリプションをそれぞれ指定しました。これは多くのシナリオについて適切な方法です。けれども、発行のスコープが単一である場合、つまり、すべてのエージェントがすべてのブロードキャストされるメッセージを発行しサブスクライブする場合、便利なクラス DefaultTopicId と default_subscription() を使用してコードを単純化できます。
DefaultTopicId はトピックを作成するためのもので、トピックタイプのデフォルト値として “default” を使用し、トピックソースのデフォルト値として発行するエージェントのキーを使用します。default_subscription() はデフォルト・トピックをサブスクライブするタイプ・サブスクリプションを作成するためのものです。DefaultTopicId と default_subscription() を使用して BroadcastingAgent を単純化できます。
from autogen_core import DefaultTopicId, default_subscription
@default_subscription
class BroadcastingAgentDefaultTopic(RoutedAgent):
@message_handler
async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
# Publish a message to all agents in the same namespace.
await self.publish_message(
Message("Publishing a message from broadcasting agent!"),
topic_id=DefaultTopicId(),
)
ランタイムが register() を呼び出してエージェントタイプを登録すると、それは TypeSubscription を作成します、そのトピックタイプはデフォルト値として “default” を使用し、エージェントタイプは同じコンテキストに登録されているのと同じエージェントタイプを使用します。
runtime = SingleThreadedAgentRuntime()
await BroadcastingAgentDefaultTopic.register(
runtime, "broadcasting_agent", lambda: BroadcastingAgentDefaultTopic("Broadcasting Agent")
)
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))
runtime.start()
await runtime.publish_message(Message("Hello, World! From the runtime!"), topic_id=DefaultTopicId())
await runtime.stop_when_idle()
Received a message: Hello, World! From the runtime! Received a message: Publishing a message from broadcasting agent!
以上