Rust SDK – MCP http streamable

[dependencies]
anyhow = "1.0.98"
axum = "0.8.4"
rmcp = { version = "0.3.0", features = ["server", "transport-io", "transport-sse-server", "transport-streamable-http-server"] }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
tokio = { version = "1.46.1", features = ["full"] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
use rmcp::handler::server::router::Router;
use std::sync::Arc;
use rmcp::{
ErrorData as McpError, RoleServer, ServerHandler,
handler::server::{router::tool::ToolRouter, tool::Parameters},
model::*,
schemars,
service::RequestContext,
tool, tool_handler, tool_router,
transport::streamable_http_server::{
StreamableHttpService, session::local::LocalSessionManager,
},
};
use serde_json::json;
use tokio::sync::Mutex;
use tracing_subscriber::{
layer::SubscriberExt,
util::SubscriberInitExt,
{self},
};
const BIND_ADDRESS: &str = "127.0.0.1:7000";
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
pub struct StructRequest {
pub a: i32,
pub b: i32,
}
#[derive(Clone)]
pub struct Counter {
counter: Arc<Mutex<i32>>,
tool_router: ToolRouter<Counter>,
}
#[tool_router]
impl Counter {
#[allow(dead_code)]
pub fn new() -> Self {
Self {
counter: Arc::new(Mutex::new(0)),
tool_router: Self::tool_router(),
}
}
fn _create_resource_text(&self, uri: &str, name: &str) -> Resource {
RawResource::new(uri, name.to_string()).no_annotation()
}
#[tool(description = "Increment the counter by 1")]
async fn increment(&self) -> Result<CallToolResult, McpError> {
let mut counter = self.counter.lock().await;
*counter += 1;
Ok(CallToolResult::success(vec![Content::text(
counter.to_string(),
)]))
}
#[tool(description = "Decrement the counter by 1")]
async fn decrement(&self) -> Result<CallToolResult, McpError> {
let mut counter = self.counter.lock().await;
*counter -= 1;
Ok(CallToolResult::success(vec![Content::text(
counter.to_string(),
)]))
}
#[tool(description = "Get the current counter value")]
async fn get_value(&self) -> Result<CallToolResult, McpError> {
let counter = self.counter.lock().await;
Ok(CallToolResult::success(vec![Content::text(
counter.to_string(),
)]))
}
#[tool(description = "Say hello to the client")]
fn say_hello(&self) -> Result<CallToolResult, McpError> {
Ok(CallToolResult::success(vec![Content::text("hello")]))
}
#[tool(description = "Repeat what you say")]
fn echo(&self, Parameters(object): Parameters<JsonObject>) -> Result<CallToolResult, McpError> {
Ok(CallToolResult::success(vec![Content::text(
serde_json::Value::Object(object).to_string(),
)]))
}
#[tool(description = "Calculate the sum of two numbers")]
fn sum(
&self,
Parameters(StructRequest { a, b }): Parameters<StructRequest>,
) -> Result<CallToolResult, McpError> {
Ok(CallToolResult::success(vec![Content::text(
(a + b).to_string(),
)]))
}
}
#[tool_handler]
impl ServerHandler for Counter {
fn get_info(&self) -> ServerInfo {
ServerInfo {
protocol_version: ProtocolVersion::V_2024_11_05,
capabilities: ServerCapabilities::builder()
.enable_prompts()
.enable_resources()
.enable_tools()
.build(),
server_info: Implementation::from_build_env(),
instructions: Some("This server provides a counter tool that can increment and decrement values. The counter starts at 0 and can be modified using the 'increment' and 'decrement' tools. Use 'get_value' to check the current count.".to_string()),
}
}
async fn list_resources(
&self,
_request: Option<PaginatedRequestParam>,
_: RequestContext<RoleServer>,
) -> Result<ListResourcesResult, McpError> {
Ok(ListResourcesResult {
resources: vec![
self._create_resource_text("str:////Users/to/some/path/", "cwd"),
self._create_resource_text("memo://insights", "memo-name"),
],
next_cursor: None,
})
}
async fn read_resource(
&self,
ReadResourceRequestParam { uri }: ReadResourceRequestParam,
_: RequestContext<RoleServer>,
) -> Result<ReadResourceResult, McpError> {
match uri.as_str() {
"str:////Users/to/some/path/" => {
let cwd = "/Users/to/some/path/";
Ok(ReadResourceResult {
contents: vec![ResourceContents::text(cwd, uri)],
})
}
"memo://insights" => {
let memo = "Business Intelligence Memo\n\nAnalysis has revealed 5 key insights ...";
Ok(ReadResourceResult {
contents: vec![ResourceContents::text(memo, uri)],
})
}
_ => Err(McpError::resource_not_found(
"resource_not_found",
Some(json!({
"uri": uri
})),
)),
}
}
async fn list_prompts(
&self,
_request: Option<PaginatedRequestParam>,
_: RequestContext<RoleServer>,
) -> Result<ListPromptsResult, McpError> {
Ok(ListPromptsResult {
next_cursor: None,
prompts: vec![Prompt::new(
"example_prompt",
Some("This is an example prompt that takes one required argument, message"),
Some(vec![PromptArgument {
name: "message".to_string(),
description: Some("A message to put in the prompt".to_string()),
required: Some(true),
}]),
)],
})
}
async fn get_prompt(
&self,
GetPromptRequestParam { name, arguments }: GetPromptRequestParam,
_: RequestContext<RoleServer>,
) -> Result<GetPromptResult, McpError> {
match name.as_str() {
"example_prompt" => {
let message = arguments
.and_then(|json| json.get("message")?.as_str().map(|s| s.to_string()))
.ok_or_else(|| {
McpError::invalid_params("No message provided to example_prompt", None)
})?;
let prompt =
format!("This is an example prompt with your message here: '{message}'");
Ok(GetPromptResult {
description: None,
messages: vec![PromptMessage {
role: PromptMessageRole::User,
content: PromptMessageContent::text(prompt),
}],
})
}
_ => Err(McpError::invalid_params("prompt not found", None)),
}
}
async fn list_resource_templates(
&self,
_request: Option<PaginatedRequestParam>,
_: RequestContext<RoleServer>,
) -> Result<ListResourceTemplatesResult, McpError> {
Ok(ListResourceTemplatesResult {
next_cursor: None,
resource_templates: Vec::new(),
})
}
async fn initialize(
&self,
_request: InitializeRequestParam,
context: RequestContext<RoleServer>,
) -> Result<InitializeResult, McpError> {
if let Some(http_request_part) = context.extensions.get::<axum::http::request::Parts>() {
let initialize_headers = &http_request_part.headers;
let initialize_uri = &http_request_part.uri;
tracing::info!(?initialize_headers, %initialize_uri, "initialize from http server");
}
Ok(self.get_info())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "debug".to_string().into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
let service = StreamableHttpService::new(
|| Ok(Counter::new()),
LocalSessionManager::default().into(),
Default::default(),
);
let router = axum::Router::new().nest_service("/mcp", service);
let tcp_listener = tokio::net::TcpListener::bind(BIND_ADDRESS).await?;
let _ = axum::serve(tcp_listener, router)
.with_graceful_shutdown(async { tokio::signal::ctrl_c().await.unwrap() })
.await;
Ok(())
}

Validation with schema
- MCP Server → Exposes tools with JSON Schema
- MCP Client → Converts schemas to LLM function calling format
- LLM → Sees function definitions with parameter schemas
- LLM → Calls functions with structured JSON parameters
- MCP → Validates parameters against your schema
The LLM uses this schema information to:
- Understand what parameters to send – It knows
sum
needsa
andb
integers - Structure its function calls properly – It tries to send the right JSON format
- Self-correct – If it gets an error, it can see what went wrong
But LLMs Aren’t Perfect
Despite seeing the schema, LLMs might still:
- Send strings instead of numbers (
"5"
vs5
) - Miss required parameters
- Send extra unexpected fields
- Make typos in parameter names