-
Notifications
You must be signed in to change notification settings - Fork 460
Updated MCP server service with web wrapper #2298
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…sued to query CISO Assistant This is an updated MCP service wrapper that can be used to connect the Claude Desktop to CISO Assistant and query content to create meaningful reports or analyses. It includes the following 12 components that can be run. get_risk_scenarios get_applied_controls get_audits_progress get_evidence get_evidence_by_id get_evidence_with_ids review_evidence_by_id review_all_evidence get_policies get_policies_with_ids review_policy_by_id review_all_policies
WalkthroughThree new files are introduced to support a FastAPI-based MCP server for the CISO Assistant. These include detailed setup documentation, a requirements file listing necessary Python dependencies, and a comprehensive FastAPI application ( Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant ClaudeDesktop
participant FastAPI_Server
participant CISO_API
User->>ClaudeDesktop: Configure MCP server connection
ClaudeDesktop->>FastAPI_Server: Connect via /sse or /messages
FastAPI_Server->>ClaudeDesktop: Send connection notification, keepalive pings
ClaudeDesktop->>FastAPI_Server: POST method request (e.g., tools/call)
FastAPI_Server->>CISO_API: HTTP GET/POST (with token)
CISO_API-->>FastAPI_Server: API response
FastAPI_Server-->>ClaudeDesktop: Tool result (markdown, review, etc.)
ClaudeDesktop->>User: Display results
Poem
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 10
🧹 Nitpick comments (5)
cli/fastapi_requirements.txt (1)
2-3
: Consider removing redundant HTTP client libraryBoth
httpx
(async) andrequests
(sync) are included. Since the FastAPI app uses async methods, consider standardizing onhttpx
and removingrequests
.cli/fastapi.md (2)
31-31
: Fix hard tab characterReplace the hard tab with spaces for consistent formatting.
Apply this diff:
- "http://xyzhost:3443/sse", - "--allow-http" + "http://xyzhost:3443/sse", + "--allow-http"
37-37
: Format bare URL as codeWrap the URL in backticks for better formatting.
Apply this diff:
-Where http://xyzhost:3443 is the hostname of your CISO Assistant server and the Port defined in your .mcp.env file. +Where `http://xyzhost:3443` is the hostname of your CISO Assistant server and the Port defined in your `.mcp.env` file.cli/fastapi_wrapper.py (2)
1-16
: Remove unused importThe
requests
module is imported but never used in the code. Onlyhttpx
is used for HTTP requests.Apply this diff:
import httpx -import requests import os
1491-1494
: Consider making the bind address configurableBinding to
0.0.0.0
exposes the service on all network interfaces. This might not be desirable in all deployment scenarios.Make the host configurable:
if __name__ == "__main__": port = int(os.getenv('PORT', 8000)) + host = os.getenv('HOST', '0.0.0.0') logger.info(f"Starting server on port {port}") - uvicorn.run(app, host="0.0.0.0", port=port) + uvicorn.run(app, host=host, port=port)For production deployments, consider:
- Running behind a reverse proxy (nginx, traefik)
- Enabling HTTPS at the proxy level
- Implementing proper logging and monitoring
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
cli/fastapi.md
(1 hunks)cli/fastapi_requirements.txt
(1 hunks)cli/fastapi_wrapper.py
(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
cli/fastapi_wrapper.py (2)
backend/core/views.py (1)
attachment
(4086-4111)backend/ebios_rm/models.py (1)
name
(854-859)
🪛 markdownlint-cli2 (0.17.2)
cli/fastapi.md
31-31: Hard tabs
Column: 1
(MD010, no-hard-tabs)
37-37: Bare URL used
(MD034, no-bare-urls)
🔇 Additional comments (2)
cli/fastapi.md (2)
30-32
: Security consideration: Review the use of --allow-http flagThe
--allow-http
flag disables SSL/TLS verification. This should only be used in development environments. For production, use HTTPS with proper certificates.Consider adding a security warning in the documentation about this flag.
9-9
: Missing reference documentationThe instructions reference "mcp.md" for token generation instructions, but this file is not provided in the PR. Ensure this documentation exists or update the reference.
fastapi | ||
httpx | ||
requests | ||
python-dotenv | ||
pydantic | ||
uvicorn |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Pin package versions for security and reproducibility
Installing packages without version constraints can lead to:
- Security vulnerabilities from unvetted updates
- Compatibility issues between team members
- Non-reproducible builds
Consider pinning all packages to specific versions.
Apply this diff to add version constraints:
-fastapi
-httpx
-requests
-python-dotenv
-pydantic
-uvicorn
+fastapi==0.104.1
+httpx==0.25.2
+requests==2.31.0
+python-dotenv==1.0.0
+pydantic==2.5.2
+uvicorn==0.24.0
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In cli/fastapi_requirements.txt at lines 1 to 6, the packages are listed without
version constraints, which can cause security and compatibility issues. Update
each package entry to specify exact version numbers by researching the latest
stable versions or the versions currently used in the project. This will ensure
reproducible builds and reduce risks from unvetted updates.
"code": -32603, | ||
"message": f"Internal error: {str(e)}" | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sanitize error messages to prevent information disclosure
Error messages include full exception details which could reveal sensitive system information.
Log detailed errors but return generic messages to clients:
"error": {
"code": -32603,
- "message": f"Internal error: {str(e)}"
+ "message": "Internal server error"
}
Also applies to: 1467-1469
🤖 Prompt for AI Agents
In cli/fastapi_wrapper.py at lines 1339 to 1341 and similarly at lines 1467 to
1469, the error messages returned to clients include full exception details,
risking sensitive information disclosure. Modify the code to log the detailed
exception internally for debugging but return a generic error message like
"Internal server error" to the client instead of the full exception string.
async with httpx.AsyncClient(verify=VERIFY_CERTIFICATE) as client: | ||
response = await client.get(url, headers=headers) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add timeout configuration for HTTP requests
HTTP requests without timeouts can hang indefinitely, causing resource exhaustion.
Add timeout configuration:
-async with httpx.AsyncClient(verify=VERIFY_CERTIFICATE) as client:
+async with httpx.AsyncClient(verify=VERIFY_CERTIFICATE, timeout=30.0) as client:
Also applies to: 250-251, 308-309, 366-367
🤖 Prompt for AI Agents
In cli/fastapi_wrapper.py at lines 192-193, 250-251, 308-309, and 366-367, the
httpx.AsyncClient calls lack timeout settings, which can cause requests to hang
indefinitely. Modify each AsyncClient instantiation to include a timeout
parameter with an appropriate value to ensure requests fail gracefully if they
take too long.
async def review_policy_by_id(self, policy_id: str): | ||
"""Review a specific policy by ID including policy document if available""" | ||
headers = {"Authorization": f"Token {TOKEN}"} | ||
url = f"{API_URL}/policies/{policy_id}/" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add UUID validation for ID parameters
Methods accepting evidence_id
and policy_id
parameters should validate they are valid UUIDs to prevent injection attacks.
Add validation at the beginning of these methods:
import uuid
# In each method accepting an ID:
try:
uuid.UUID(evidence_id) # or policy_id
except ValueError:
return {
"content": [
{
"type": "text",
"text": f"Invalid ID format. Please provide a valid UUID."
}
]
}
Also applies to: 691-695, 749-753
🤖 Prompt for AI Agents
In cli/fastapi_wrapper.py around lines 360 to 364, add validation for the
policy_id parameter to ensure it is a valid UUID before proceeding. Import the
uuid module and wrap the UUID conversion in a try-except block; if a ValueError
is raised, return a response indicating the ID format is invalid. Apply the same
validation pattern to methods handling evidence_id and policy_id at lines
691-695 and 749-753 as well.
# Get raw JSON to handle different ID types | ||
body = await request.body() | ||
message_data = json.loads(body.decode('utf-8')) | ||
|
||
logger.info(f"Received message: {message_data}") | ||
|
||
method = message_data.get('method') | ||
message_id = message_data.get('id') | ||
params = message_data.get('params', {}) | ||
|
||
if method == 'initialize': | ||
# Return server capabilities | ||
response = { | ||
"jsonrpc": "2.0", | ||
"id": message_id, | ||
"result": { | ||
"protocolVersion": "2024-11-05", | ||
"capabilities": { | ||
"tools": { | ||
"listChanged": True | ||
} | ||
}, | ||
"serverInfo": { | ||
"name": "ciso-assistant", | ||
"version": "1.0.0" | ||
} | ||
} | ||
} | ||
logger.info(f"Sending initialize response: {response}") | ||
return JSONResponse(content=response) | ||
|
||
elif method == 'tools/list': | ||
# Return available tools | ||
response = { | ||
"jsonrpc": "2.0", | ||
"id": message_id, | ||
"result": { | ||
"tools": mcp_wrapper.tools | ||
} | ||
} | ||
logger.info(f"Sending tools list: {response}") | ||
return JSONResponse(content=response) | ||
|
||
elif method == 'tools/call': | ||
# Execute tool | ||
tool_name = params.get('name') | ||
tool_arguments = params.get('arguments', {}) | ||
logger.info(f"Executing tool: {tool_name} with arguments: {tool_arguments}") | ||
|
||
if hasattr(mcp_wrapper, tool_name): | ||
# Execute the corresponding method | ||
method_func = getattr(mcp_wrapper, tool_name) | ||
|
||
# Check if the method requires arguments | ||
if tool_arguments: | ||
result = await method_func(**tool_arguments) | ||
else: | ||
result = await method_func() | ||
|
||
response = { | ||
"jsonrpc": "2.0", | ||
"id": message_id, | ||
"result": result | ||
} | ||
logger.info(f"Tool execution result: {response}") | ||
return JSONResponse(content=response) | ||
else: | ||
response = { | ||
"jsonrpc": "2.0", | ||
"id": message_id, | ||
"error": { | ||
"code": -32601, | ||
"message": f"Unknown tool: {tool_name}" | ||
} | ||
} | ||
return JSONResponse(content=response) | ||
|
||
else: | ||
response = { | ||
"jsonrpc": "2.0", | ||
"id": message_id, | ||
"error": { | ||
"code": -32601, | ||
"message": f"Unknown method: {method}" | ||
} | ||
} | ||
return JSONResponse(content=response) | ||
|
||
except Exception as e: | ||
logger.error(f"Error handling message: {str(e)}") | ||
response = { | ||
"jsonrpc": "2.0", | ||
"id": message_data.get("id") if 'message_data' in locals() else None, | ||
"error": { | ||
"code": -32603, | ||
"message": f"Internal error: {str(e)}" | ||
} | ||
} | ||
return JSONResponse(content=response, status_code=500) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add authentication to protect endpoints
The API endpoints have no authentication mechanism, allowing anyone to access CISO Assistant data.
Consider implementing:
- API key authentication for the MCP endpoints
- Token validation before processing requests
- Rate limiting to prevent abuse
Example middleware:
from fastapi import Security, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
async def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
if credentials.credentials != os.getenv("MCP_API_KEY"):
raise HTTPException(status_code=403, detail="Invalid authentication")
return credentials.credentials
🤖 Prompt for AI Agents
In cli/fastapi_wrapper.py around lines 1241 to 1471, the API endpoints lack
authentication, exposing sensitive data. Implement API key authentication by
adding a security dependency using FastAPI's HTTPBearer scheme. Create a token
verification function that checks the provided token against an environment
variable (e.g., MCP_API_KEY) and raises an HTTP 403 error if invalid. Apply this
verification as a dependency to the MCP endpoints (/sse POST and /messages POST)
to ensure only authorized requests are processed. Optionally, consider adding
rate limiting middleware separately to prevent abuse.
# Get raw JSON to handle different ID types | ||
body = await request.body() | ||
message_data = json.loads(body.decode('utf-8')) | ||
|
||
logger.info(f"Received SSE POST message: {message_data}") | ||
|
||
method = message_data.get('method') | ||
message_id = message_data.get('id') | ||
params = message_data.get('params', {}) | ||
|
||
if method == 'initialize': | ||
# Return server capabilities | ||
response = { | ||
"jsonrpc": "2.0", | ||
"id": message_id, | ||
"result": { | ||
"protocolVersion": "2024-11-05", | ||
"capabilities": { | ||
"tools": { | ||
"listChanged": True | ||
} | ||
}, | ||
"serverInfo": { | ||
"name": "ciso-assistant", | ||
"version": "1.0.0" | ||
} | ||
} | ||
} | ||
logger.info(f"Sending initialize response: {response}") | ||
return JSONResponse(content=response) | ||
|
||
elif method == 'tools/list': | ||
# Return available tools | ||
response = { | ||
"jsonrpc": "2.0", | ||
"id": message_id, | ||
"result": { | ||
"tools": mcp_wrapper.tools | ||
} | ||
} | ||
logger.info(f"Sending tools list: {response}") | ||
return JSONResponse(content=response) | ||
|
||
elif method == 'tools/call': | ||
# Execute tool | ||
tool_name = params.get('name') | ||
tool_arguments = params.get('arguments', {}) | ||
logger.info(f"Executing tool: {tool_name} with arguments: {tool_arguments}") | ||
|
||
if hasattr(mcp_wrapper, tool_name): | ||
# Execute the corresponding method | ||
method_func = getattr(mcp_wrapper, tool_name) | ||
|
||
# Check if the method requires arguments | ||
if tool_arguments: | ||
result = await method_func(**tool_arguments) | ||
else: | ||
result = await method_func() | ||
|
||
response = { | ||
"jsonrpc": "2.0", | ||
"id": message_id, | ||
"result": result | ||
} | ||
logger.info(f"Tool execution result: {response}") | ||
return JSONResponse(content=response) | ||
else: | ||
response = { | ||
"jsonrpc": "2.0", | ||
"id": message_id, | ||
"error": { | ||
"code": -32601, | ||
"message": f"Unknown tool: {tool_name}" | ||
} | ||
} | ||
return JSONResponse(content=response) | ||
|
||
else: | ||
response = { | ||
"jsonrpc": "2.0", | ||
"id": message_id, | ||
"error": { | ||
"code": -32601, | ||
"message": f"Unknown method: {method}" | ||
} | ||
} | ||
return JSONResponse(content=response) | ||
|
||
except Exception as e: | ||
logger.error(f"Error handling SSE POST message: {str(e)}") | ||
response = { | ||
"jsonrpc": "2.0", | ||
"id": message_data.get("id") if 'message_data' in locals() else None, | ||
"error": { | ||
"code": -32603, | ||
"message": f"Internal error: {str(e)}" | ||
} | ||
} | ||
return JSONResponse(content=response, status_code=500) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Eliminate code duplication between endpoints
The /sse
POST and /messages
POST endpoints have identical logic. This violates DRY principles.
Extract the common logic into a shared method:
async def handle_mcp_message(message_data: dict):
"""Common handler for MCP protocol messages"""
method = message_data.get('method')
message_id = message_data.get('id')
params = message_data.get('params', {})
# ... rest of the logic ...
@app.post("/sse")
async def sse_post_endpoint(request: Request):
body = await request.body()
message_data = json.loads(body.decode('utf-8'))
return await handle_mcp_message(message_data)
@app.post("/messages")
async def handle_message(request: Request):
body = await request.body()
message_data = json.loads(body.decode('utf-8'))
return await handle_mcp_message(message_data)
Also applies to: 1368-1471
🤖 Prompt for AI Agents
In cli/fastapi_wrapper.py around lines 1241 to 1344 and also 1368 to 1471, the
POST endpoints for /sse and /messages contain duplicated logic for handling MCP
protocol messages. To fix this, extract the shared message processing code into
a new async function, e.g., handle_mcp_message(message_data: dict), that
implements the core logic based on the method and params. Then refactor both
endpoint handlers to parse the request body into message_data and call this
shared function, returning its result. This eliminates duplication and
centralizes the MCP message handling logic.
app.add_middleware( | ||
CORSMiddleware, | ||
allow_origins=["*"], | ||
allow_credentials=True, | ||
allow_methods=["GET", "POST", "OPTIONS"], | ||
allow_headers=["*"], | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Restrict CORS origins for production security
Allowing all origins (*
) in CORS configuration is a security risk in production environments.
Consider restricting to specific allowed origins:
app.add_middleware(
CORSMiddleware,
- allow_origins=["*"],
+ allow_origins=os.getenv("ALLOWED_ORIGINS", "http://localhost:3000").split(","),
allow_credentials=True,
allow_methods=["GET", "POST", "OPTIONS"],
allow_headers=["*"],
)
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In cli/fastapi_wrapper.py around lines 28 to 34, the CORS middleware is
currently configured to allow all origins with allow_origins=["*"], which is
insecure for production. Update allow_origins to a list of specific, trusted
domain URLs that should be permitted to access the API, replacing the wildcard.
This restricts cross-origin requests to only those origins you explicitly trust.
content_type = doc_response.headers.get('content-type', '').lower() | ||
|
||
if 'text' in content_type or 'json' in content_type: | ||
return doc_response.text[:2000] # Limit to first 2000 chars |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add content size limit for text preview
Fetching unlimited text content could cause memory issues.
Limit the response size:
if 'text' in content_type or 'json' in content_type:
- return doc_response.text[:2000] # Limit to first 2000 chars
+ # Read only first 10KB to prevent memory issues
+ content = doc_response.content[:10240].decode('utf-8', errors='ignore')
+ return content[:2000] # Limit to first 2000 chars
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In cli/fastapi_wrapper.py at line 1036, the code returns the full text content
from doc_response.text which can lead to memory issues if the content is very
large. Modify the code to limit the returned text to the first 2000 characters
to prevent excessive memory usage. This can be done by slicing the text string
to doc_response.text[:2000] before returning it.
# Handle relative URLs | ||
if file_url.startswith('/'): | ||
file_url = f"{API_URL.rstrip('/')}{file_url}" | ||
elif not file_url.startswith('http'): | ||
file_url = f"{API_URL.rstrip('/')}/{file_url}" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Sanitize URL construction to prevent path traversal
The URL construction logic could be vulnerable to path traversal attacks if the attachment URLs contain malicious paths.
Use proper URL joining:
from urllib.parse import urljoin
# Replace the URL construction logic:
if file_url.startswith('/'):
file_url = urljoin(API_URL, file_url)
elif not file_url.startswith('http'):
file_url = urljoin(API_URL + '/', file_url)
🤖 Prompt for AI Agents
In cli/fastapi_wrapper.py around lines 1025 to 1030, the current URL
construction uses string concatenation which can lead to path traversal
vulnerabilities. Replace the manual string concatenation with the urljoin
function from urllib.parse to safely join the base API_URL and the file_url.
Import urljoin at the top and update the if-else block to use urljoin for both
cases to ensure proper URL sanitization.
API_URL = os.getenv("API_URL", "") | ||
TOKEN = os.getenv("TOKEN", "") | ||
VERIFY_CERTIFICATE = os.getenv("VERIFY_CERTIFICATE", "true").lower() in ( | ||
"true", "1", "yes", "on" | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add URL validation and security warnings
The API_URL is used without validation, and SSL verification can be disabled, both presenting security risks.
Add URL validation and logging:
# Configuration
API_URL = os.getenv("API_URL", "")
TOKEN = os.getenv("TOKEN", "")
VERIFY_CERTIFICATE = os.getenv("VERIFY_CERTIFICATE", "true").lower() in (
"true", "1", "yes", "on"
)
+
+# Validate API URL
+if not API_URL:
+ logger.error("API_URL environment variable is not set")
+elif not API_URL.startswith(("http://", "https://")):
+ logger.error("API_URL must start with http:// or https://")
+
+if not VERIFY_CERTIFICATE:
+ logger.warning("SSL certificate verification is disabled. This is insecure for production use.")
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
API_URL = os.getenv("API_URL", "") | |
TOKEN = os.getenv("TOKEN", "") | |
VERIFY_CERTIFICATE = os.getenv("VERIFY_CERTIFICATE", "true").lower() in ( | |
"true", "1", "yes", "on" | |
) | |
# Configuration | |
API_URL = os.getenv("API_URL", "") | |
TOKEN = os.getenv("TOKEN", "") | |
VERIFY_CERTIFICATE = os.getenv("VERIFY_CERTIFICATE", "true").lower() in ( | |
"true", "1", "yes", "on" | |
) | |
# Validate API URL | |
if not API_URL: | |
logger.error("API_URL environment variable is not set") | |
elif not API_URL.startswith(("http://", "https://")): | |
logger.error("API_URL must start with http:// or https://") | |
if not VERIFY_CERTIFICATE: | |
logger.warning("SSL certificate verification is disabled. This is insecure for production use.") |
🤖 Prompt for AI Agents
In cli/fastapi_wrapper.py around lines 37 to 42, the API_URL environment
variable is used without validation and SSL verification can be disabled, which
poses security risks. Add code to validate that API_URL is a well-formed URL
before use, and if VERIFY_CERTIFICATE is set to disable SSL verification, log a
security warning to alert users about the potential risk. This ensures safer
configuration handling and better visibility of insecure settings.
This is an updated MCP server service using Fastapi to allow connectivity to CISO Assistant from compatible MCP applications such as Claude Desktop.
This builds upon the original MCP implementation originally added, adding additional functionality that can be used to generate report or analyses.
Summary by CodeRabbit
New Features
Documentation
Chores