@shinyaz

Bedrock AgentCore RuntimeのStateful MCPサーバーでElicitation・Sampling・Progress Notificationsを動かす

目次

はじめに

2026年3月10日、AWSはAmazon Bedrock AgentCore RuntimeにStateful MCPサーバー機能を追加した。AgentCore Runtime上のMCPサーバーはこれまでステートレスのみサポートしており、ツール呼び出しごとにコンテキストがリセットされていた。Stateful MCPではMcp-Session-Idヘッダーで専用microVMにセッションが紐付き、複数リクエストにまたがるインタラクションが可能になる。

追加された機能は3つ。Elicitation(サーバーからクライアントへの入力要求)、Sampling(サーバーからクライアントへのLLMテキスト生成要求)、Progress Notifications(長時間処理の進行状況通知)だ。本記事では、FastMCPでStateful MCPサーバーを構築し、agentcore deployでデプロイした上でこれらの機能を実際に動作確認した結果を共有する。公式ドキュメントはStateful MCP server featuresを参照。

Stateful MCPの3つの新機能

従来のMCPサーバーは「クライアントがツールを呼び、サーバーが結果を返す」一方通行だった。Stateful MCPでは通信が双方向になり、サーバー側がクライアントに対して能動的にアクションを要求できる。

機能方向用途
Elicitationサーバー → クライアントユーザーの好みや追加情報を対話的に収集
Samplingサーバー → クライアントLLMによるテキスト生成をクライアント側で実行させる
Progress Notificationsサーバー → クライアントフライト検索やデータ処理など長時間操作の進捗報告

これらはすべてセッション維持が前提だ。stateless_http=Falsestreamable-httpトランスポートの組み合わせで有効化される。

検証環境の構築

検証にはFastMCP 3.1.1で旅行プランニング用MCPサーバーを構築した。3つの新機能すべてを1つのツール(plan_trip)で組み合わせて使えるようにしている。

前提条件:

  • AWS CLIセットアップ済み(bedrock-agentcore:*iam:*s3:*の操作権限)
  • Python 3.10+
  • agentcore CLI(uv tool install bedrock-agentcore-starter-toolkitでインストール)

MCPサーバーコード

travel_server.py
from fastmcp import FastMCP, Context
import json
 
mcp = FastMCP("Travel Planner")
 
DESTINATIONS = {
    "tokyo": {"city": "Tokyo", "highlights": ["Shibuya Crossing", "Senso-ji Temple", "Tsukiji Market"]},
    "paris": {"city": "Paris", "highlights": ["Eiffel Tower", "Louvre Museum", "Seine River Cruise"]},
}
 
@mcp.resource("travel://destinations")
def list_destinations() -> str:
    return json.dumps(DESTINATIONS, indent=2)
 
@mcp.tool()
async def plan_trip(ctx: Context) -> str:
    """Elicitation + Sampling + Progress Notificationsを組み合わせた旅行プランニング。"""
    total_steps = 5
 
    # Elicitation: サーバーからクライアントに入力を要求
    await ctx.report_progress(progress=0, total=total_steps)
    dest_result = await ctx.elicit(
        message="Where would you like to go?\nOptions: Paris, Tokyo",
        response_type=str,
    )
    if dest_result.action != "accept":
        return "Cancelled."
    destination = dest_result.data
    await ctx.report_progress(progress=1, total=total_steps)
 
    days_result = await ctx.elicit(
        message=f"How many days will you spend in {destination}?", response_type=int,
    )
    days = days_result.data
    await ctx.report_progress(progress=3, total=total_steps)
 
    # Sampling: サーバーからクライアントにLLM生成を要求
    response = await ctx.sample(
        messages=f"Give 3 tips for a trip to {destination} ({days} days).",
        max_tokens=200,
    )
    ai_tips = response.text
    await ctx.report_progress(progress=5, total=total_steps)
 
    return json.dumps({
        "destination": destination, "days": days,
        "ai_tips": ai_tips, "status": "planned",
    }, indent=2)
 
if __name__ == "__main__":
    mcp.run(transport="streamable-http", host="0.0.0.0", port=8000, stateless_http=False)

ポイントはstateless_http=Falseだ。公式ドキュメントではこの設定がCRITICALと明記されており、設定しないとElicitationやSamplingのコールバックがサーバーからクライアントに到達しない。

上記は核心部分を抜粋した簡略版だ。検証で使用した完全版サーバーには、Progress Notifications検証用のsearch_flights、Sampling専用のquick_recommend、パラメータ付きリソース、プロンプトテンプレートも含まれている。

完全なサーバーコード(travel_server.py)
travel_server.py
import asyncio
import json
 
from fastmcp import FastMCP, Context
 
mcp = FastMCP(
    "Travel Planner",
    instructions="A stateful travel planning MCP server that demonstrates "
    "elicitation, sampling, and progress notifications.",
)
 
DESTINATIONS = {
    "paris": {
        "city": "Paris",
        "country": "France",
        "highlights": ["Eiffel Tower", "Louvre Museum", "Seine River Cruise"],
        "best_season": "Spring",
    },
    "tokyo": {
        "city": "Tokyo",
        "country": "Japan",
        "highlights": ["Shibuya Crossing", "Senso-ji Temple", "Tsukiji Market"],
        "best_season": "Autumn",
    },
    "new york": {
        "city": "New York",
        "country": "USA",
        "highlights": ["Central Park", "Statue of Liberty", "Broadway"],
        "best_season": "Fall",
    },
    "bali": {
        "city": "Bali",
        "country": "Indonesia",
        "highlights": ["Ubud Rice Terraces", "Tanah Lot Temple", "Seminyak Beach"],
        "best_season": "Dry Season (Apr-Oct)",
    },
}
 
 
@mcp.resource("travel://destinations")
def list_destinations() -> str:
    return json.dumps(DESTINATIONS, indent=2)
 
 
@mcp.resource("travel://destination/{city}")
def get_destination(city: str) -> str:
    dest = DESTINATIONS.get(city.lower())
    if dest:
        return json.dumps(dest, indent=2)
    return json.dumps({"error": f"Destination '{city}' not found"})
 
 
@mcp.prompt()
def packing_list(destination: str, days: int, trip_type: str) -> str:
    return (
        f"Create a detailed {days}-day packing list for a {trip_type} trip "
        f"to {destination}. Include weather-appropriate clothing, essentials, "
        f"and destination-specific items."
    )
 
 
@mcp.prompt()
def local_phrases(destination: str) -> str:
    return (
        f"Teach me 10 essential local phrases for visiting {destination}. "
        f"Include greetings, asking for directions, ordering food, "
        f"and emergency phrases with pronunciation guides."
    )
 
 
@mcp.tool()
async def plan_trip(ctx: Context) -> str:
    """Plan a complete trip using elicitation, sampling, and progress notifications."""
    total_steps = 5
    await ctx.report_progress(progress=0, total=total_steps)
    dest_result = await ctx.elicit(
        message="Where would you like to go?\nOptions: Paris, Tokyo, New York, Bali",
        response_type=str,
    )
    if dest_result.action != "accept":
        return "Trip planning cancelled."
    destination = dest_result.data
    await ctx.report_progress(progress=1, total=total_steps)
 
    days_result = await ctx.elicit(
        message=f"How many days will you spend in {destination}?",
        response_type=int,
    )
    if days_result.action != "accept":
        return "Trip planning cancelled."
    days = days_result.data
    await ctx.report_progress(progress=2, total=total_steps)
 
    type_result = await ctx.elicit(
        message="What type of trip?\nOptions: leisure, business, adventure",
        response_type=str,
    )
    if type_result.action != "accept":
        return "Trip planning cancelled."
    trip_type = type_result.data
    await ctx.report_progress(progress=3, total=total_steps)
 
    response = await ctx.sample(
        messages=f"Give 3 brief tips for a {trip_type} trip to {destination} "
        f"lasting {days} days. Be concise.",
        max_tokens=200,
    )
    ai_tips = response.text
    await ctx.report_progress(progress=5, total=total_steps)
 
    dest_info = DESTINATIONS.get(destination.lower(), {})
    highlights = dest_info.get("highlights", ["No specific highlights available"])
    return json.dumps({
        "destination": destination,
        "days": days,
        "trip_type": trip_type,
        "highlights": highlights,
        "ai_tips": ai_tips,
        "status": "planned",
    }, indent=2)
 
 
@mcp.tool()
async def quick_recommend(ctx: Context) -> str:
    """Get a quick destination recommendation using sampling only."""
    response = await ctx.sample(
        messages="Recommend one travel destination from: Paris, Tokyo, New York, Bali. "
        "Give a one-sentence reason why.",
        max_tokens=100,
    )
    return f"Recommendation: {response.text}"
 
 
@mcp.tool()
async def search_flights(ctx: Context, origin: str, destination: str) -> str:
    """Simulate a flight search with progress notifications."""
    total = 4
    stages = [
        "Searching airlines...",
        "Comparing prices...",
        "Checking availability...",
        "Finalizing results...",
    ]
    for i, stage in enumerate(stages):
        await ctx.report_progress(progress=i + 1, total=total)
        await ctx.info(stage)
        await asyncio.sleep(0.3)
    return json.dumps({
        "origin": origin,
        "destination": destination,
        "flights": [
            {"airline": "AirExample", "price": "$450", "duration": "8h 30m"},
            {"airline": "SkyDemo", "price": "$520", "duration": "7h 15m"},
        ],
    }, indent=2)
 
 
if __name__ == "__main__":
    mcp.run(transport="streamable-http", host="0.0.0.0", port=8000, stateless_http=False)

デプロイ

依存パッケージのrequirements.txtを用意する。

requirements.txt
fastmcp>=2.10.0
mcp

agentcore CLIのDirect Code Deploy機能を使えば、Dockerfileなしでデプロイできる。

ローカルでの動作確認手順

AgentCoreにデプロイする前に、ローカルで動作確認できる。

ターミナル
# 仮想環境の作成と依存インストール
uv venv && source .venv/bin/activate
uv pip install "fastmcp>=2.10.0" mcp
 
# サーバー起動(別ターミナルで実行)
python travel_server.py
# → http://0.0.0.0:8000/mcp で起動

MCP Inspectorを使えばブラウザからツールやリソースの動作を確認できる。プログラムでテストする場合は後述のクライアントコードを参照。

ターミナル
# 設定(MCP protocol指定 + Direct Code Deploy)
agentcore configure \
  -e travel_server.py -p MCP -n stateful_mcp_demo \
  -dt direct_code_deploy -rf requirements.txt -r us-west-2 -ni
 
# デプロイ(IAMロール・S3バケットは自動作成)
agentcore deploy
出力結果
✅ Deployment completed successfully
Agent ARN: arn:aws:bedrock-agentcore:us-west-2:381492023699:runtime/stateful_mcp_demo-ZgZZ0pEA9n
Deployment Type: Direct Code Deploy

IAMロール作成、依存パッケージのLinux ARM64向けクロスコンパイル、S3アップロード、ランタイム作成、エンドポイント作成まですべて自動化される。デプロイ所要時間は約3分(大半はメモリリソースの初期化待ち)だった。

テストクライアント

テストクライアントはMCP Python SDKのstreamablehttp_clientを使い、Elicitation・Samplingのコールバックを登録する。リモート接続ではSigV4認証が必要だ。

Python
from mcp.client.streamable_http import streamablehttp_client
from mcp.client.session import ClientSession
 
async with streamablehttp_client(url, httpx_client_factory=sigv4_factory) as (
    read_stream, write_stream, get_session_id
):
    async with ClientSession(
        read_stream, write_stream,
        elicitation_callback=elicit_handler,
        sampling_callback=sampling_handler,
    ) as session:
        await session.initialize()
        session_id = get_session_id()  # → "d28be10a-298b-4bfe-a16a-c810f95269c8"

リモート接続のエンドポイントURLはARNをURLエンコードして以下の形式にする。

エンドポイントURL
https://bedrock-agentcore.{REGION}.amazonaws.com/runtimes/{ENCODED_ARN}/invocations?qualifier=DEFAULT

Stateful MCPの核心はコールバックの実装にある。Elicitationではサーバーからの質問にどう応答するか、Samplingではどのように(どのLLMで)テキストを生成するかをクライアント側で制御する。

コールバック実装とSigV4認証(テストクライアント完全版)
test_client.py
import asyncio
import json
import urllib.parse
 
import boto3
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import Credentials
import httpx
 
from mcp.client.streamable_http import streamablehttp_client
from mcp.client.session import ClientSession
from mcp.types import CreateMessageResult, ElicitResult, TextContent
 
AGENT_RUNTIME_ARN = "arn:aws:bedrock-agentcore:us-west-2:ACCOUNT_ID:runtime/YOUR_RUNTIME_ID"
REGION = "us-west-2"
 
 
async def elicit_callback(context, params):
    """Elicitationコールバック: サーバーからの質問に応答する。"""
    msg = params.message if hasattr(params, "message") else str(params)
    print(f"  [ELICITATION] Server asks: {msg}")
    # 実運用ではここでユーザーに入力を求める
    response = input("  Your answer: ").strip()
    return ElicitResult(action="accept", content={"value": response})
 
 
async def sampling_callback(context, params):
    """Samplingコールバック: サーバーからのLLM生成要求に応答する。"""
    msg_text = ""
    if hasattr(params, "messages"):
        for m in params.messages:
            if hasattr(m.content, "text"):
                msg_text = m.content.text
                break
    print(f"  [SAMPLING] Server requests: {msg_text[:100]}...")
    # 実運用ではここでLLM APIを呼び出す
    ai_response = "1. Research local customs. 2. Pack light. 3. Book in advance."
    return CreateMessageResult(
        role="assistant",
        content=TextContent(type="text", text=ai_response),
        model="your-model-id",
        stopReason="endTurn",
    )
 
 
class SigV4HttpxAuth(httpx.Auth):
    """AgentCore Runtime接続用のSigV4認証。"""
    def __init__(self, region, service="bedrock-agentcore"):
        self.region = region
        self.service = service
        session = boto3.Session()
        creds = session.get_credentials().get_frozen_credentials()
        self.credentials = Credentials(creds.access_key, creds.secret_key, creds.token)
 
    def auth_flow(self, request):
        aws_request = AWSRequest(
            method=request.method, url=str(request.url),
            headers=dict(request.headers), data=request.content,
        )
        SigV4Auth(self.credentials, self.service, self.region).add_auth(aws_request)
        for key, value in aws_request.headers.items():
            request.headers[key] = value
        yield request
 
 
def create_sigv4_httpx_client(**kwargs):
    kwargs.pop("auth", None)  # SDK由来のauthを除去
    return httpx.AsyncClient(auth=SigV4HttpxAuth(REGION), **kwargs)
 
 
async def main():
    encoded_arn = urllib.parse.quote(AGENT_RUNTIME_ARN, safe="")
    url = f"https://bedrock-agentcore.{REGION}.amazonaws.com/runtimes/{encoded_arn}/invocations?qualifier=DEFAULT"
 
    async with streamablehttp_client(
        url, httpx_client_factory=create_sigv4_httpx_client,
        timeout=120, sse_read_timeout=120, terminate_on_close=False,
    ) as (read_stream, write_stream, get_session_id):
        async with ClientSession(
            read_stream, write_stream,
            elicitation_callback=elicit_callback,
            sampling_callback=sampling_callback,
        ) as session:
            await session.initialize()
            print(f"Session ID: {get_session_id()}")
 
            # ツール一覧
            tools = await session.list_tools()
            for t in tools.tools:
                print(f"  {t.name}: {t.description}")
 
            # ツール実行(Elicitation + Sampling + Progress)
            result = await session.call_tool("plan_trip", {})
            for c in result.content:
                if hasattr(c, "text"):
                    print(c.text)
 
if __name__ == "__main__":
    asyncio.run(main())

ローカルテストの場合はcreate_sigv4_httpx_clientを外し、URLをhttp://localhost:8000/mcpに変えるだけでよい。

検証結果

ローカル(localhost:8000)とリモート(AgentCore Runtime)の両方で、Elicitation・Sampling・Progress Notifications・Resources・Prompts・セッション管理の各機能を検証した。

1. Elicitation — サーバー主導のユーザー入力取得

plan_tripツール内でctx.elicit()が呼ばれると、サーバーからクライアントにJSON-RPCリクエストが送られ、クライアント側のelicitation_callbackが発火する。

出力結果
[ELICITATION] Server asks: Where would you like to go?
Options: Paris, Tokyo
[ELICITATION] Auto-responding: Tokyo
 
[ELICITATION] Server asks: How many days will you spend in Tokyo?
[ELICITATION] Auto-responding: 3

従来のMCPではツール実行中にサーバーがクライアントに問いかけることは不可能だった。Stateful MCPではMcp-Session-Idで同一セッションが維持されるため、ツール実行を一時停止してユーザー入力を待ち、再開できる。今回の検証ではresponse_typestrintを指定し、いずれも正常に動作することを確認した。MCP仕様上はこの型指定によるバリデーションもサポートされている。

2. Sampling — サーバーからクライアントへのLLM生成要求

ctx.sample()はサーバー側がクライアントに「このプロンプトでLLMを呼んでテキストを生成して」と依頼するメカニズムだ。

出力結果
[SAMPLING] Server requests: Give 3 tips for a trip to Tokyo (3 days)...
[SAMPLING] Auto-responding with preset text

設計上の重要なポイントは、LLM呼び出しがサーバー側ではなくクライアント側で行われることだ。MCPサーバーはLLMのAPIキーやモデル選択に関与しない。クライアント(AIエージェントホスト)が自身のLLMを使って応答を生成し、結果をサーバーに返す。これにより、MCPサーバーはモデル非依存を保ちつつ、AI生成コンテンツを活用したワークフローを構築できる。

3. Progress Notifications — 長時間処理の進行状況通知

フルサーバーに実装したsearch_flightsツールでは、フライト検索のシミュレーションとして4段階の進捗通知を送信する。

出力結果(ローカル)
  Found 2 flights
    AirExample: $450 (8h 30m)
    SkyDemo: $520 (7h 15m)
  PASS: Flight search with progress completed

ctx.report_progress(progress=1, total=4)のようにステップ数と合計を指定する。クライアント側ではprogress_handlerコールバックで受信し、プログレスバーや状況表示に利用できる。MCP仕様上、通知はfire-and-forget(応答不要)であり、検証でもサーバー側の処理がブロックされずに進行することを確認した。

4. Resources / Prompts — 既存MCPプリミティブとの共存

Stateful MCPでもResourcesとPromptsは従来通り動作する。

出力結果
--- List Resources ---
  travel://destinations: list_destinations
  PASS: 1 resources found
 
--- Read Parameterized Resource ---
  Tokyo highlights: ['Shibuya Crossing', 'Senso-ji Temple', 'Tsukiji Market']
  PASS: Parameterized resource read successfully
 
--- List Prompts ---
  packing_list: Generate a packing list prompt for a trip.
  local_phrases: Generate a prompt for learning local phrases.
  PASS: 2 prompts found

travel://destination/{city}のようなパラメータ付きリソースも正常に動作した。Stateful機能を追加しても既存のMCPプリミティブに影響はない。

5. セッション管理 — Mcp-Session-Idの維持

上記すべての検証(ツール一覧取得、リソース読み取り、Elicitation、Sampling、Progress Notifications)を通じて、セッションIDが一貫して維持されることを確認した。

出力結果
--- Session ID Persistence ---
  Initial: d28be10a-298b-4bfe-a16a-c810f95269c8
  Final:   d28be10a-298b-4bfe-a16a-c810f95269c8
  PASS: Session ID maintained across all requests

ローカル(FastMCP自身が管理)でもリモート(AgentCore Runtimeが管理)でも、初期化時に発行されたセッションIDが全リクエストを通じて同一であることが確認できた。

ローカルとリモートの挙動比較

検証項目ローカルリモート(AgentCore)
ElicitationPASSPASS
SamplingPASSPASS
Progress NotificationsPASSPASS
ResourcesPASSPASS
PromptsPASSPASS
Session ID維持PASSPASS
認証方式不要SigV4
Session ID形式a5190ac38f63428b...(hex)d28be10a-298b-4bfe-...(UUID)

ローカルのSession IDはFastMCPが生成するhex文字列、AgentCore Runtimeが返すSession IDはUUID形式と違いがあるが、機能面での差異はなかった。

実装上の落とし穴

エンドポイントURLのARNエンコード

AgentCore RuntimeのMCPエンドポイントURLは、ARN全体をURLエンコードして/invocations?qualifier=DEFAULTに接続する必要がある。

正しいURL
https://bedrock-agentcore.us-west-2.amazonaws.com/runtimes/arn%3Aaws%3Abedrock-agentcore%3Aus-west-2%3A381492023699%3Aruntime%2Fstateful_mcp_demo-ZgZZ0pEA9n/invocations?qualifier=DEFAULT

ランタイムIDだけを渡す/runtimes/{runtimeId}/mcp形式では404が返る。ドキュメント上のサンプルコードではこの点が抽象化されているため、直接HTTPリクエストを構築する場合は注意が必要だ。

SigV4署名とhttpx_client_factory

MCP Python SDKのstreamablehttp_clientはhttpxを使用しており、SigV4認証をフックするにはhttpx_client_factoryパラメータでカスタムクライアントを注入する。SDKがauthパラメータを渡してくるため、factory内でkwargs.pop("auth", None)して自前のSigV4 Authに差し替える必要がある。

stateless_http=Falseの明示

FastMCPのデフォルトはstateless_http=Trueだ。公式ドキュメントによると、この場合streamable-httpトランスポートでもセッション管理が無効になり、ElicitationやSamplingのコールバックが到達しない。Stateful機能を使う場合は明示的にFalseを指定する。

まとめ

  • Stateful MCPはサーバーからクライアントへの双方向通信を実現する — Elicitation(入力要求)、Sampling(LLM生成要求)、Progress Notifications(進捗通知)の3つにより、MCPサーバーがツール実行中にクライアントと対話できるようになった。従来の「呼んだら結果が返る」一方通行から大きな進化だ。
  • SamplingはLLM非依存の設計を維持する鍵 — LLM呼び出しをクライアント側に委ねることで、MCPサーバーは特定のモデルやAPIキーに依存せずAI生成コンテンツを活用できる。MCPの哲学であるツールとモデルの分離が貫かれている。
  • AgentCore Runtime上でもローカルと同一の挙動が確認できた — Elicitation、Sampling、Progress Notificationsのすべてがローカル・リモート両方で一貫して動作した。ただしエンドポイントURLのARNエンコードとSigV4認証のフックは手動セットアップが必要だ。
  • stateless_http=Falseの1行が全機能のゲートになっている — この設定を忘れるとセッション管理が無効になり、Stateful機能がサイレントに動作しない。FastMCPのデフォルトがstatefulではない点は要注意だ。

クリーンアップ

ターミナル
cd /tmp/agentcore-stateful-mcp
agentcore destroy

agentcore destroyでランタイム、エンドポイント、IAMロール、S3バケット、メモリリソースをまとめて削除できる。

共有する

田原 慎也

田原 慎也

ソリューションアーキテクト @ AWS

AWS ソリューションアーキテクトとして金融業界のお客様を中心に技術支援を行っています。クラウドアーキテクチャや AI/ML に関する学びをこのサイトで発信しています。このサイトの内容は個人の見解であり、所属企業の公式な意見や見解を代表するものではありません。

関連記事