MCP のコア・アーキテクチャのコンポーネントとコンセプトをカバーし、MCP がクライアント、サーバと LLM に接続する方法を理解します。MCP は、LLM アプリケーションと統合の間のシームレスな通信を可能にする、柔軟で拡張可能なアーキテクチャに基づいて構築されています。
Model Context Protocol (MCP) : コンセプト : コア・アーキテクチャ / 実装サンプル
作成 : クラスキャット・セールスインフォメーション
作成日時 : 05/22/2025
* 本記事は github modelcontextprotocol の以下のページを独自に翻訳した上でまとめ直し、補足説明を加えています :
* サンプルコードの動作確認はしておりますが、必要な場合には適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。
◆ お問合せ : 下記までお願いします。
- クラスキャット セールス・インフォメーション
- sales-info@classcat.com
- ClassCatJP
Model Context Protocol (MCP) : コンセプト : コア・アーキテクチャ / 実装サンプル
MCP がクライアント、サーバと LLM に接続する方法を理解します。
Model Context Protocol (MCP) は、LLM アプリケーションと統合の間のシームレスな通信を可能にする、柔軟で拡張可能なアーキテクチャに基づいて構築されています。このドキュメントはコア・アーキテクチャのコンポーネントとコンセプトを扱っています。
概要
MCP はクライアント・サーバ・アーキテクチャに従っています、そこでは :
- ホスト は、接続を初期化する (Claude Desktop や IDE のような) LLM アプリケーションです。
- クライアント は、ホストアプリケーション内で、サーバとの 1:1 接続を維持します。
- サーバ はクライアントにコンテキスト、ツールとプロンプトを提供します。
コア・コンポーネント
プロトコル層
プロトコル層は、メッセージ・フレーミング、リクエスト/レスポンス・リンキングと高レベル通信パターンを処理します。
TypeScript
class Protocol<Request, Notification, Result> {
// Handle incoming requests
setRequestHandler<T>(schema: T, handler: (request: T, extra: RequestHandlerExtra) => Promise<Result>): void
// Handle incoming notifications
setNotificationHandler<T>(schema: T, handler: (notification: T) => Promise<void>): void
// Send requests and await responses
request<T>(request: Request, schema: T, options?: RequestOptions): Promise<T>
// Send one-way notifications
notification(notification: Notification): Promise<void>
}
Python
class Session(BaseSession[RequestT, NotificationT, ResultT]):
async def send_request(
self,
request: RequestT,
result_type: type[Result]
) -> Result:
"""Send request and wait for response. Raises McpError if response contains error."""
# Request handling implementation
async def send_notification(
self,
notification: NotificationT
) -> None:
"""Send one-way notification that doesn't expect response."""
# Notification handling implementation
async def _received_request(
self,
responder: RequestResponder[ReceiveRequestT, ResultT]
) -> None:
"""Handle incoming request from other side."""
# Request handling implementation
async def _received_notification(
self,
notification: ReceiveNotificationT
) -> None:
"""Handle incoming notification from other side."""
# Notification handling implementation
主要クラスは以下があります :
- Protocol
- Client
- Server
トランスポート層
トランスポート層はクライアントとサーバ間の実際の通信を処理します。MCP は複数のトランスポート・メカニズムをサポートします :
- Stdio トランスポート
- 通信のために標準入出力を使用します
- ローカルプロセスに理想的
- HTTP with SSE トランスポート
- サーバ-to-クライアント・メッセージのためには Server-Sent Events (SSE) を使用します
- クライアント-to-サーバ・メッセージのためには HTTP POST を使用
すべてのトランスポートは JSON-RPC 2.0 を使用してメッセージを交換します。Model Context Protocol メッセージ形式についての詳細は、仕様 をご覧ください。
メッセージ・タイプ
MCP は以下の主要なメッセージタイプを持ちます :
- Request は相手側からのレスポンスを想定します :
interface Request { method: string; params?: { ... }; }
- Result はリクエストへの成功したレスポンスです :
interface Result { [key: string]: unknown; }
- Error はリクエストが失敗したことを示します :
interface Error { code: number; message: string; data?: unknown; }
- Notification はレスポンスを期待しない一方通行のメッセージです :
interface Notification { method: string; params?: { ... }; }
コネクション・ライフサイクル
1. 初期化 (Initialization)
- クライアントはプロトコル・バージョンと機能とともに initialize リクエストを送信します。
- サーバはプロトコル・バーションと機能とともにレスポンスします。
- クライアントは確認応答 (ACK, acknowledgment) として initialized notification を送信します。
- 通常のメッセージ交換が開始されます。
2. メッセージ交換
初期化後、以下のパターンがサポートされます :
- Request-Response: クライアントまたはサーバがリクエストを送信し、他方が応答します。
- Notification: どちらかが一方向メッセージを送信します。
3. 終了 (Termination)
どちらかがコネクションを終了 (terminarte) できます :
- close() 経由でクリーン・シャットダウン
- トランスポート切断
- エラー状態 (conditions)
エラー処理
MCP は以下の標準エラーコードを定義しています :
enum ErrorCode {
// Standard JSON-RPC error codes
ParseError = -32700,
InvalidRequest = -32600,
MethodNotFound = -32601,
InvalidParams = -32602,
InternalError = -32603
}
SDK とアプリケーションは -32000 より大きい独自のエラーコードを定義できます。
エラーは以下を通して伝播されます :
- リクエストへのエラー応答
- トランスポート上のエラーイベント
- プロトコルレベルのエラーハンドラ
実装サンプル
MCP サーバの実装の基本的なサンプルが以下にあります :
TypeScript
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
const server = new Server({
name: "example-server",
version: "1.0.0"
}, {
capabilities: {
resources: {}
}
});
// Handle requests
server.setRequestHandler(ListResourcesRequestSchema, async () => {
return {
resources: [
{
uri: "example://resource",
name: "Example Resource"
}
]
};
});
// Connect transport
const transport = new StdioServerTransport();
await server.connect(transport);
Python
import asyncio
import mcp.types as types
from mcp.server import Server
from mcp.server.stdio import stdio_server
app = Server("example-server")
@app.list_resources()
async def list_resources() -> list[types.Resource]:
return [
types.Resource(
uri="example://resource",
name="Example Resource"
)
]
async def main():
async with stdio_server() as streams:
await app.run(
streams[0],
streams[1],
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
以上