MCP の通信メカニズムについて学習します。Model Context Protocol (MCP) のトランスポートはクライアントとサーバ間の通信の基盤を提供します。トランスポートはメッセージの送受信の方法の基礎的なメカニズムを処理します。
Model Context Protocol (MCP) : コンセプト : トランスポート – 標準入出力 & SSE
作成 : クラスキャット・セールスインフォメーション
作成日時 : 05/28/2025
* 本記事は github modelcontextprotocol の以下のページを独自に翻訳した上でまとめ直し、補足説明を加えています :
* サンプルコードの動作確認はしておりますが、必要な場合には適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。
◆ お問合せ : 下記までお願いします。
- クラスキャット セールス・インフォメーション
- sales-info@classcat.com
- ClassCatJP
Model Context Protocol (MCP) : コンセプト : トランスポート – 標準入出力 & SSE
MCP の通信メカニズムについて学習します
Model Context Protocol (MCP) のトランスポートはクライアントとサーバ間の通信の基盤を提供します。トランスポートはメッセージの送受信の方法の基礎的なメカニズムを処理します。
メッセージ形式
MCP はその伝送形式 (wire format) として JSON-RPC 2.0 を使用します。トランスポート層は、送信用に MCP プロトコルメッセージを JSON-RPC 形式に変換し、受信した JSON-RPC メッセージを MCP プロトコルメッセージに再変換する役割を担います。
使用される JSON-RPC メッセージには 3 つの種類があります :
リクエスト
{
jsonrpc: "2.0",
id: number | string,
method: string,
params?: object
}
レスポンス
{
jsonrpc: "2.0",
id: number | string,
result?: object,
error?: {
code: number,
message: string,
data?: unknown
}
}
通知 (Notifications)
{
jsonrpc: "2.0",
method: string,
params?: object
}
組み込みトランスポート・タイプ
MCP は 2 つの標準トランスポート実装を含みます :
標準入出力 (stdio)
stdio トランスポートは標準入出力ストリーム経由の通信を可能にします。これはローカル統合やコマンドライン・ツールに特に有用です。
以下の場合に stdio を使用します :
- コマンドライン・ツールの構築
- ローカル統合の実装
- 単純なプロセス通信が必要な場合
- シェルスクリプトで操作
TypeScript (サーバ)
const server = new Server({
name: "example-server",
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new StdioServerTransport();
await server.connect(transport);
TypeScript (クライアント)
const client = new Client({
name: "example-client",
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new StdioClientTransport({
command: "./server",
args: ["--option", "value"]
});
await client.connect(transport);
Python (サーバ)
app = Server("example-server")
async with stdio_server() as streams:
await app.run(
streams[0],
streams[1],
app.create_initialization_options()
)
Python (クライアント)
params = StdioServerParameters(
command="./server",
args=["--option", "value"]
)
async with stdio_client(params) as streams:
async with ClientSession(streams[0], streams[1]) as session:
await session.initialize()
Server-Sent Events (SSE)
SSE トランスポートは、クライアント-to-サーバ通信において、HTTP POST リクエストを使用してサーバ-to-クライアント・ストリーミングを可能にします。
以下の場合に SSE を使用します :
- サーバ-to-クライアント・ストリーミングだけが必要な場合
- 制限されたネットワークで作業
- 単純なアップデートの実装
セキュリティ警告: DNS リバインディング攻撃
SSE トランスポートは、適切に保護されていない場合、DNS リバインディング攻撃への脆弱性になり得ます。これを防ぐには :
- 受信する SSE 接続の Origin ヘッダを常に検証 し、想定されるソースからのものであることを確認してください。
- ローカルで実装している場合 サーバをすべてのネットワーク・インターフェイス (0.0.0.0) にバインドすることを避けてください – 代わりに (127.0.0.1) にのみバインドします。
- すべての SSE 接続に対して 適切な認証を実装してください。
これらの保護なしでは、攻撃者は DNS リバインディングを使用して、リモート web サイトからローカル MCP サーバを操作できる可能性があります。
TypeScript (サーバ)
import express from "express";
const app = express();
const server = new Server({
name: "example-server",
version: "1.0.0"
}, {
capabilities: {}
});
let transport: SSEServerTransport | null = null;
app.get("/sse", (req, res) => {
transport = new SSEServerTransport("/messages", res);
server.connect(transport);
});
app.post("/messages", (req, res) => {
if (transport) {
transport.handlePostMessage(req, res);
}
});
app.listen(3000);
TypeScript (クライアント)
const client = new Client({
name: "example-client",
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new SSEClientTransport(
new URL("http://localhost:3000/sse")
);
await client.connect(transport);
Python (サーバ)
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route
app = Server("example-server")
sse = SseServerTransport("/messages")
async def handle_sse(scope, receive, send):
async with sse.connect_sse(scope, receive, send) as streams:
await app.run(streams[0], streams[1], app.create_initialization_options())
async def handle_messages(scope, receive, send):
await sse.handle_post_message(scope, receive, send)
starlette_app = Starlette(
routes=[
Route("/sse", endpoint=handle_sse),
Route("/messages", endpoint=handle_messages, methods=["POST"]),
]
)
Python (クライアント)
async with sse_client("http://localhost:8000/sse") as streams:
async with ClientSession(streams[0], streams[1]) as session:
await session.initialize()
カスタム・トランスポート
MCP は特定のニーズに合わせたカスタムトランスポートを実装することを容易にします。任意のトランスポート実装は Transport インターフェイスに準拠する必要があるだけです :
以下のためにカスタムトランスポートを実装できます :
- カスタム・ネットワーク・プロトコル
- 専用通信チャネル
- 既存のシステムとの統合
- パフォーマンス最適化
TypeScript
interface Transport {
// Start processing messages
start(): Promise;
// Send a JSON-RPC message
send(message: JSONRPCMessage): Promise;
// Close the connection
close(): Promise;
// Callbacks
onclose?: () => void;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage) => void;
}
Python
@contextmanager
async def create_transport(
read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception],
write_stream: MemoryObjectSendStream[JSONRPCMessage]
):
"""
Transport interface for MCP.
Args:
read_stream: Stream to read incoming messages from
write_stream: Stream to write outgoing messages to
"""
async with anyio.create_task_group() as tg:
try:
# Start processing messages
tg.start_soon(lambda: process_messages(read_stream))
# Send messages
async with write_stream:
yield write_stream
except Exception as exc:
# Handle errors
raise exc
finally:
# Clean up
tg.cancel_scope.cancel()
await write_stream.aclose()
await read_stream.aclose()
Note : While MCP Servers are often implemented with asyncio, we recommend implementing low-level interfaces like transports with anyio for wider compatibility.
エラー処理
トランスポート実装は様々なエラーのシナリオを処理する必要があります :
- 接続エラー
- メッセージ解析エラー
- プロトコル・エラー
- ネットワーク・タイムアウト
- リソース・クリーンアップ
Example error handling:
TypeScript
class ExampleTransport implements Transport {
async start() {
try {
// Connection logic
} catch (error) {
this.onerror?.(new Error(`Failed to connect: ${error}`));
throw error;
}
}
async send(message: JSONRPCMessage) {
try {
// Sending logic
} catch (error) {
this.onerror?.(new Error(`Failed to send message: ${error}`));
throw error;
}
}
}
Python
@contextmanager
async def example_transport(scope: Scope, receive: Receive, send: Send):
try:
# Create streams for bidirectional communication
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
async def message_handler():
try:
async with read_stream_writer:
# Message handling logic
pass
except Exception as exc:
logger.error(f"Failed to handle message: {exc}")
raise exc
async with anyio.create_task_group() as tg:
tg.start_soon(message_handler)
try:
# Yield streams for communication
yield read_stream, write_stream
except Exception as exc:
logger.error(f"Transport error: {exc}")
raise exc
finally:
tg.cancel_scope.cancel()
await write_stream.aclose()
await read_stream.aclose()
except Exception as exc:
logger.error(f"Failed to initialize transport: {exc}")
raise exc
Note : While MCP Servers are often implemented with asyncio, we recommend implementing low-level interfaces like transports with anyio for wider compatibility.
以上