import argparse
import base64
import inspect
import logging
import sys
from typing import (
Any,
Callable,
Dict,
List,
Tuple,
Union,
get_args,
get_origin,
)
from fake import FAKER, PROVIDER_REGISTRY, FileSystemStorage
from fastmcp import FastMCP
__title__ = "fake-py-mcp"
__version__ = "0.3"
__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2025 Artur Barseghyan"
__license__ = "MIT"
__all__ = (
"mcp",
"get_return_type",
"get_supported_params",
"is_supported_type",
"main",
"register_fakepy_tools",
"serialise_result",
"server_info",
)
# ----------------------------------------------------------------------------
# Logging setup
# ----------------------------------------------------------------------------
logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)
# ----------------------------------------------------------------------------
# MCP Server instance
# ----------------------------------------------------------------------------
mcp = FastMCP("fake.py MCP Server")
_tools_registered = False
# ----------------------------------------------------------------------------
# Helper: Type mapping for fake.py methods
# ----------------------------------------------------------------------------
PROVIDER_LIST = sorted(PROVIDER_REGISTRY["fake.Faker"])
[docs]def get_return_type(method: Callable) -> Any:
"""Infer return type for MCP schema from method docstring or name."""
name = method.__name__
if name in {
"bmp",
"docx",
"eml",
"epub",
"gif",
"jpg",
"odt",
"pdf",
"png",
"ppm",
"rtf",
"svg",
"tar",
"tif",
"wav",
"zip",
}:
return str # base64-encoded or filename
if name.endswith("_file"):
return str # file path
if name in {"latitude_longitude"}:
return List[float]
if name in {
"company_emails",
"emails",
"first_names",
"free_emails",
"last_names",
"names",
"paragraphs",
"sentences",
"slugs",
"texts",
"usernames",
"words",
}:
return List[str]
if name in {"uuid"}:
return str
if name in {"date"}:
return str
if name in {"date_time"}:
return str
if name in {"latitude", "longitude"}:
return float
if name in {"pybool"}:
return bool
if name in {"pyint", "year"}:
return int
return str
[docs]def serialise_result(name: str, result: Any) -> Any:
"""Serialise result for MCP transport."""
if name in {
"bin",
"bmp",
"docx",
"eml",
"epub",
"gif",
"jpg",
"odt",
"pdf",
"png",
"ppm",
"rtf",
"svg",
"tar",
"tif",
"wav",
"zip",
}:
# If result is bytes, it's raw content -> base64
# If result is str, it's a filepath (storage is used) -> return path
if isinstance(result, bytes):
return base64.b64encode(result).decode("ascii")
return result
if name == "uuid":
return str(result)
if name == "date":
return (
result.isoformat() if hasattr(result, "isoformat") else str(result)
)
if name == "date_time":
return (
result.isoformat() if hasattr(result, "isoformat") else str(result)
)
if name == "latitude_longitude":
return list(result)
return result
# ----------------------------------------------------------------------------
# Parameter support helpers
# ----------------------------------------------------------------------------
# Include complex container types
_SUPPORTED_BASES = {
bool,
dict,
Dict,
float,
int,
list,
List,
str,
tuple,
Tuple,
}
[docs]def is_supported_type(typ) -> bool:
"""Return True if typ is supported type, container, or Optional thereof."""
# Allow Any (often used for flexible dicts/lists)
if typ is Any:
return True
# Check direct base types (e.g. int, List)
if typ in _SUPPORTED_BASES:
return True
origin = get_origin(typ)
# Handle Generics (e.g. List[str], Dict[str, Any])
if origin in _SUPPORTED_BASES:
args = get_args(typ)
# Recursively check inner types.
# If no args (e.g. plain List), it's supported.
if not args:
return True
return all(is_supported_type(arg) for arg in args)
# Handle Union / Optional (e.g. Union[str, int], Optional[List[str]])
if origin is Union:
args = get_args(typ)
# Check all non-None arguments
non_none_args = [arg for arg in args if arg is not type(None)]
if not non_none_args:
return True
return all(is_supported_type(arg) for arg in non_none_args)
return False
[docs]def get_supported_params(sig):
"""Return a list of (name, param) for supported parameters."""
supported = []
for name, param in sig.parameters.items():
# Exclude *args, **kwargs, and any named 'options'
if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
continue
if name.lower() in {"options", "storage"}:
# We explicitly skip 'storage' here because we inject it manually
# if the server is configured with a storage root.
continue
if param.annotation is inspect.Parameter.empty:
continue # skip untyped
if not is_supported_type(param.annotation):
continue
supported.append((name, param))
return supported
# ----------------------------------------------------------------------------
# Dynamic tool registration (closure-safe)
# ----------------------------------------------------------------------------
def _create_tool_wrapper(
method, attr, return_type, doc, params, annotations, storage_backend=None
):
"""Factory to create a tool function with arguments."""
# Check if the underlying method accepts 'storage'
method_sig = inspect.signature(method)
accepts_storage = "storage" in method_sig.parameters
# Build the function with the correct signature using closure
def tool_fn(*args, **kwargs):
# Map args to parameter names
call_kwargs = {}
for i, (name, param) in enumerate(params):
if name in kwargs:
call_kwargs[name] = kwargs[name]
elif i < len(args):
call_kwargs[name] = args[i]
elif param.default is not inspect.Parameter.empty:
call_kwargs[name] = param.default
else:
raise TypeError(f"Missing required argument: {name}")
# Inject storage if available and method supports it
if storage_backend and accepts_storage:
call_kwargs["storage"] = storage_backend
try:
result = method(**call_kwargs)
return serialise_result(attr, result)
except Exception as err:
LOGGER.error(f"Error in {attr}(): {err}")
raise RuntimeError(f"fake.py error in {attr}(): {err}") from err
# Set function metadata
tool_fn.__name__ = attr
tool_fn.__doc__ = doc
tool_fn.__annotations__ = {
**annotations,
"return": return_type,
}
# Set signature to match the original method
tool_fn.__signature__ = inspect.Signature(
parameters=[param for _, param in params],
return_annotation=return_type
)
return tool_fn
def _create_simple_wrapper(
method, attr, return_type, doc, storage_backend=None
):
"""Factory to create a tool function without arguments."""
# Check if the underlying method accepts 'storage'
method_sig = inspect.signature(method)
accepts_storage = "storage" in method_sig.parameters
def tool_fn():
call_kwargs = {}
# Inject storage if available and method supports it
if storage_backend and accepts_storage:
call_kwargs["storage"] = storage_backend
try:
result = method(**call_kwargs)
return serialise_result(attr, result)
except Exception as err:
LOGGER.error(f"Error in {attr}(): {err}")
raise RuntimeError(f"fake.py error in {attr}(): {err}") from err
tool_fn.__name__ = attr
tool_fn.__doc__ = doc
tool_fn.__annotations__ = {"return": return_type}
return tool_fn
# ----------------------------------------------------------------------------
# Example: Server info tool
# ----------------------------------------------------------------------------
@mcp.tool()
def server_info() -> Dict[str, Any]:
"""
Get information about this MCP server and available fake.py tools.
"""
return {
"server": "fake.py MCP Server",
"tools": sorted([
attr for attr in PROVIDER_LIST
if not attr.startswith("_") and callable(getattr(FAKER, attr))
]),
"docs": "https://github.com/barseghyanartur/fake.py"
}
[docs]def main() -> None:
parser = argparse.ArgumentParser(
description="fake.py MCP Server — run in stdio or http mode."
)
parser.add_argument(
"mode",
nargs="?",
choices=["stdio", "http", "sse"],
default="stdio",
help="Transport mode: 'stdio' (default) or 'http'",
)
parser.add_argument(
"--host",
default="0.0.0.0",
help="Host for HTTP mode (default: 0.0.0.0)",
)
parser.add_argument(
"--port",
type=int,
default=8005,
help="Port for HTTP mode (default: 8005)",
)
parser.add_argument(
"--storage-root",
type=str,
default=None,
help="Path to storage root for file generation",
)
args = parser.parse_args()
# Initialise storage if root is provided
storage_backend = None
if args.storage_root:
LOGGER.info(f"Initialising storage with root: {args.storage_root}")
storage_backend = FileSystemStorage(root_path=args.storage_root)
# Register tools with the configured storage backend
register_fakepy_tools(storage_backend=storage_backend)
if args.mode == "http":
LOGGER.info(
f"Starting MCP server in HTTP mode on {args.host}:{args.port}"
)
mcp.run(transport="http", host=args.host, port=args.port)
elif args.mode == "sse":
LOGGER.info(
f"Starting MCP server in SSE mode on {args.host}:{args.port}"
)
mcp.run(transport="sse", host=args.host, port=args.port)
else:
LOGGER.info("Starting MCP server in STDIO mode")
mcp.run()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
LOGGER.info("Server stopped by user")
sys.exit(0)