Source code for fakepy_mcp

import argparse
import base64
import inspect
import logging
import sys
import types
from typing import (
    Any,
    Callable,
    Dict,
    List,
    Tuple,
    Union,
    get_args,
    get_origin,
)

from fake import FAKER, PROVIDER_REGISTRY, FileSystemStorage
from fastmcp import FastMCP

# Package metadata exposed as module-level dunder attributes.
__title__ = "fake-py-mcp"
__version__ = "0.3"
__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2025 Artur Barseghyan"
__license__ = "MIT"
# Public API of this module: the server instance, the helper functions used
# to build the tools, and the ``main`` entry point.
__all__ = (
    "mcp",
    "get_return_type",
    "get_supported_params",
    "is_supported_type",
    "main",
    "register_fakepy_tools",
    "serialise_result",
    "server_info",
)
# ----------------------------------------------------------------------------
# Logging setup
# ----------------------------------------------------------------------------
# NOTE: this runs at import time, so merely importing the module configures
# the root logger at INFO level (basicConfig() does nothing if the root
# logger already has handlers).
logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)


# ----------------------------------------------------------------------------
# MCP Server instance
# ----------------------------------------------------------------------------
mcp = FastMCP("fake.py MCP Server")
# Flipped to True by register_fakepy_tools() on its first call so that any
# later call returns early instead of registering the tools a second time.
_tools_registered = False

# ----------------------------------------------------------------------------
# Helper: Type mapping for fake.py methods
# ----------------------------------------------------------------------------
# Sorted names found under the "fake.Faker" key of PROVIDER_REGISTRY. Both
# register_fakepy_tools() and server_info() iterate over this list and look
# each name up on FAKER.
PROVIDER_LIST = sorted(PROVIDER_REGISTRY["fake.Faker"])


def get_return_type(method: Callable) -> Any:
    """Pick the return annotation to advertise for a fake.py method.

    The decision is made purely from ``method.__name__``. Only the names
    listed below map to a non-``str`` type; everything else is reported as
    ``str``.

    :param method: Callable whose ``__name__`` identifies the provider.
    :return: ``List[float]``, ``List[str]``, ``float``, ``bool``, ``int`` or
        ``str``.
    """
    method_name = method.__name__

    # Providers returning a list of strings.
    plural_text_names = (
        "company_emails",
        "emails",
        "first_names",
        "free_emails",
        "last_names",
        "names",
        "paragraphs",
        "sentences",
        "slugs",
        "texts",
        "usernames",
        "words",
    )

    # Every name with a non-``str`` return type.
    non_str_types = dict.fromkeys(plural_text_names, List[str])
    non_str_types.update(
        {
            "latitude_longitude": List[float],
            "latitude": float,
            "longitude": float,
            "pybool": bool,
            "pyint": int,
            "year": int,
        }
    )

    # Names missing from the table fall back to ``str``. That covers the
    # file-format providers (``pdf``, ``png``, ... -> base64 text or a file
    # name), the ``*_file`` providers (file path), ``uuid``, ``date`` and
    # ``date_time`` (all serialised to text) and every remaining provider.
    return non_str_types.get(method_name, str)
def serialise_result(name: str, result: Any) -> Any:
    """Convert a provider's return value into a transport-friendly value.

    :param name: Provider name; it selects the conversion rule.
    :param result: Value returned by the provider.
    :return: * file-format providers: base64 text when ``result`` is
          ``bytes``, otherwise ``result`` unchanged;
        * ``uuid``: ``str(result)``;
        * ``date`` / ``date_time``: ``result.isoformat()`` when available,
          else ``str(result)``;
        * ``latitude_longitude``: ``list(result)``;
        * anything else: ``result`` unchanged.
    """
    file_format_names = {
        "bin",
        "bmp",
        "docx",
        "eml",
        "epub",
        "gif",
        "jpg",
        "odt",
        "pdf",
        "png",
        "ppm",
        "rtf",
        "svg",
        "tar",
        "tif",
        "wav",
        "zip",
    }

    if name in file_format_names:
        # bytes are raw content and get base64-encoded; a str is a file path
        # (storage is used) and is handed back untouched.
        if not isinstance(result, bytes):
            return result
        return base64.b64encode(result).decode("ascii")

    if name == "uuid":
        return str(result)

    if name in ("date", "date_time"):
        if hasattr(result, "isoformat"):
            return result.isoformat()
        return str(result)

    if name == "latitude_longitude":
        return list(result)

    return result
# ----------------------------------------------------------------------------
# Parameter support helpers
# ----------------------------------------------------------------------------
# Include complex container types
_SUPPORTED_BASES = {
    bool,
    dict,
    Dict,
    float,
    int,
    list,
    List,
    str,
    tuple,
    Tuple,
}

# Origins that ``get_origin`` reports for union annotations: ``typing.Union``
# for ``Union[...]`` / ``Optional[...]`` and, where the interpreter provides
# it (Python 3.10+), ``types.UnionType`` for the ``X | Y`` syntax.
_UNION_ORIGINS = tuple(
    union_origin
    for union_origin in (Union, getattr(types, "UnionType", None))
    if union_origin is not None
)


def is_supported_type(typ) -> bool:
    """Return True if typ is supported type, container, or Optional thereof.

    Supported are ``Any``, the members of ``_SUPPORTED_BASES``, generics
    built on them whose arguments are all supported (``List[str]``,
    ``Dict[str, Any]``, ``Tuple[int, ...]``) and unions whose non-``None``
    members are all supported, written either as ``Union[...]`` /
    ``Optional[...]`` or with the ``X | Y`` syntax.

    :param typ: Annotation object to inspect.
    :return: ``True`` when the annotation is supported, ``False`` otherwise.
    """
    # Allow Any (often used for flexible dicts/lists)
    if typ is Any:
        return True

    # Check direct base types (e.g. int, List)
    if typ in _SUPPORTED_BASES:
        return True

    origin = get_origin(typ)

    # Handle Generics (e.g. List[str], Dict[str, Any]). A generic without
    # arguments yields an empty iterable, for which ``all`` is True.
    if origin in _SUPPORTED_BASES:
        return all(
            is_supported_type(arg)
            for arg in get_args(typ)
            # ``Tuple[int, ...]`` lists ``Ellipsis`` among its arguments; it
            # marks a variable-length tuple and is not a type to validate.
            if arg is not Ellipsis
        )

    # Handle Union / Optional (e.g. Union[str, int], Optional[List[str]],
    # ``int | None``): every member other than NoneType must be supported.
    if origin in _UNION_ORIGINS:
        return all(
            is_supported_type(arg)
            for arg in get_args(typ)
            if arg is not type(None)
        )

    return False
def get_supported_params(sig):
    """Collect the parameters of *sig* that can be exposed on an MCP tool.

    A parameter is left out when any of the following holds:

    * it is variadic (``*args`` / ``**kwargs``);
    * its name, compared case-insensitively, is ``options`` or ``storage``;
    * it carries no annotation;
    * its annotation is rejected by ``is_supported_type``.

    :param sig: ``inspect.Signature`` of the provider method.
    :return: List of ``(name, inspect.Parameter)`` pairs in signature order.
    """
    variadic_kinds = (
        inspect.Parameter.VAR_POSITIONAL,
        inspect.Parameter.VAR_KEYWORD,
    )
    # 'storage' is reserved because it is injected manually when the server
    # is configured with a storage root.
    reserved_names = {"options", "storage"}

    def _is_exposed(param_name, spec):
        return (
            spec.kind not in variadic_kinds
            and param_name.lower() not in reserved_names
            and spec.annotation is not inspect.Parameter.empty
            and is_supported_type(spec.annotation)
        )

    return [
        (param_name, spec)
        for param_name, spec in sig.parameters.items()
        if _is_exposed(param_name, spec)
    ]
# ----------------------------------------------------------------------------
# Dynamic tool registration (closure-safe)
# ----------------------------------------------------------------------------
def _create_tool_wrapper(
    method, attr, return_type, doc, params, annotations, storage_backend=None
):
    """Build the tool callable for a provider method that takes arguments.

    :param method: Provider callable to invoke.
    :param attr: Provider name; used as the tool's ``__name__``, as the key
        for ``serialise_result`` and in log/error messages.
    :param return_type: Return annotation to advertise.
    :param doc: Docstring assigned to the tool.
    :param params: ``(name, inspect.Parameter)`` pairs exposed by the tool.
    :param annotations: Mapping of parameter name to annotation.
    :param storage_backend: Optional storage object, passed as ``storage``
        when the method has a parameter of that name.
    :return: Function whose name, docstring, annotations and signature
        describe the exposed parameters.
    """
    # Decide once whether the provider can receive a 'storage' keyword.
    takes_storage = "storage" in inspect.signature(method).parameters

    def tool_fn(*args, **kwargs):
        # Resolve each exposed parameter: keyword first, then positional,
        # then its declared default.
        bound = {}
        for position, (param_name, spec) in enumerate(params):
            if param_name in kwargs:
                bound[param_name] = kwargs[param_name]
                continue
            if position < len(args):
                bound[param_name] = args[position]
                continue
            if spec.default is inspect.Parameter.empty:
                raise TypeError(f"Missing required argument: {param_name}")
            bound[param_name] = spec.default

        # Hand over the storage backend only when one is configured and the
        # provider accepts it.
        if storage_backend and takes_storage:
            bound["storage"] = storage_backend

        try:
            return serialise_result(attr, method(**bound))
        except Exception as err:
            LOGGER.error(f"Error in {attr}(): {err}")
            raise RuntimeError(f"fake.py error in {attr}(): {err}") from err

    # Make the wrapper look like the provider it fronts.
    hints = dict(annotations)
    hints["return"] = return_type
    tool_fn.__name__ = attr
    tool_fn.__doc__ = doc
    tool_fn.__annotations__ = hints
    tool_fn.__signature__ = inspect.Signature(
        parameters=[spec for _, spec in params],
        return_annotation=return_type,
    )
    return tool_fn


def _create_simple_wrapper(
    method, attr, return_type, doc, storage_backend=None
):
    """Build the tool callable for a provider method exposed without arguments.

    :param method: Provider callable to invoke.
    :param attr: Provider name; used as the tool's ``__name__``, as the key
        for ``serialise_result`` and in log/error messages.
    :param return_type: Return annotation to advertise.
    :param doc: Docstring assigned to the tool.
    :param storage_backend: Optional storage object, passed as ``storage``
        when the method has a parameter of that name.
    :return: Zero-argument function carrying the name, docstring and return
        annotation given above.
    """
    # Decide once whether the provider can receive a 'storage' keyword.
    takes_storage = "storage" in inspect.signature(method).parameters

    def tool_fn():
        bound = {}
        # Hand over the storage backend only when one is configured and the
        # provider accepts it.
        if storage_backend and takes_storage:
            bound["storage"] = storage_backend

        try:
            return serialise_result(attr, method(**bound))
        except Exception as err:
            LOGGER.error(f"Error in {attr}(): {err}")
            raise RuntimeError(f"fake.py error in {attr}(): {err}") from err

    tool_fn.__name__ = attr
    tool_fn.__doc__ = doc
    tool_fn.__annotations__ = {"return": return_type}
    return tool_fn
def register_fakepy_tools(storage_backend=None):
    """Register every public, callable FAKER attribute as an MCP tool.

    Only the first call does any work; later calls return immediately
    because the module-level ``_tools_registered`` flag is already set.

    :param storage_backend: Optional storage object forwarded to the tool
        wrappers.
    """
    global _tools_registered
    if _tools_registered:
        return
    _tools_registered = True

    for attr in PROVIDER_LIST:
        if attr.startswith("_"):
            LOGGER.info(f"Skipping {attr}. Private methods not supported.")
            continue

        method = getattr(FAKER, attr)
        if not callable(method):
            LOGGER.info(f"Skipping {attr}. Not a callable.")
            continue

        params = get_supported_params(inspect.signature(method))
        return_type = get_return_type(method)
        doc = inspect.getdoc(method) or f"Fake.py: {attr}()"

        if not params:
            # Nothing to expose: build the zero-argument variant.
            tool_fn = _create_simple_wrapper(
                method, attr, return_type, doc, storage_backend
            )
        else:
            # Expose the supported parameters together with their annotations.
            tool_fn = _create_tool_wrapper(
                method,
                attr,
                return_type,
                doc,
                params,
                {name: param.annotation for name, param in params},
                storage_backend,
            )

        mcp.tool(name=attr, description=doc)(tool_fn)
# ----------------------------------------------------------------------------
# Example: Server info tool
# ----------------------------------------------------------------------------
@mcp.tool()
def server_info() -> Dict[str, Any]:
    """
    Get information about this MCP server and available fake.py tools.
    """
    # Same filter as the tool registration: public names that are callable.
    tool_names = [
        name
        for name in PROVIDER_LIST
        if callable(getattr(FAKER, name)) or False
    ] if False else [
        name
        for name in PROVIDER_LIST
        if not name.startswith("_") and callable(getattr(FAKER, name))
    ]
    tool_names.sort()

    info: Dict[str, Any] = {}
    info["server"] = "fake.py MCP Server"
    info["tools"] = tool_names
    info["docs"] = "https://github.com/barseghyanartur/fake.py"
    return info
def main() -> None:
    """Parse the command line, register the tools and run the MCP server.

    Command-line arguments:

    * ``mode`` (optional positional): ``stdio`` (default), ``http`` or
      ``sse``.
    * ``--host`` / ``--port``: bind address for the ``http`` and ``sse``
      modes (defaults: ``0.0.0.0`` and ``8005``).
    * ``--storage-root``: when given, a ``FileSystemStorage`` rooted at that
      path is created and handed to the registered tools.
    """
    parser = argparse.ArgumentParser(
        description="fake.py MCP Server — run in stdio, http or sse mode."
    )
    parser.add_argument(
        "mode",
        nargs="?",
        choices=["stdio", "http", "sse"],
        default="stdio",
        help="Transport mode: 'stdio' (default), 'http' or 'sse'",
    )
    # NOTE(review): 0.0.0.0 listens on every network interface; the default
    # is kept unchanged for backward compatibility.
    parser.add_argument(
        "--host",
        default="0.0.0.0",
        help="Host for HTTP/SSE mode (default: 0.0.0.0)",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=8005,
        help="Port for HTTP/SSE mode (default: 8005)",
    )
    parser.add_argument(
        "--storage-root",
        type=str,
        default=None,
        help="Path to storage root for file generation",
    )
    args = parser.parse_args()

    # Initialise storage if root is provided
    storage_backend = None
    if args.storage_root:
        LOGGER.info(f"Initialising storage with root: {args.storage_root}")
        storage_backend = FileSystemStorage(root_path=args.storage_root)

    # Register tools with the configured storage backend
    register_fakepy_tools(storage_backend=storage_backend)

    if args.mode in ("http", "sse"):
        # Both network transports take the same host/port arguments and the
        # transport name equals the chosen mode.
        LOGGER.info(
            f"Starting MCP server in {args.mode.upper()} mode "
            f"on {args.host}:{args.port}"
        )
        mcp.run(transport=args.mode, host=args.host, port=args.port)
    else:
        LOGGER.info("Starting MCP server in STDIO mode")
        mcp.run()
# Script entry point: start the server when the module is executed directly.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl+C is treated as a normal shutdown: log it and exit with
        # status 0 rather than letting the traceback propagate.
        LOGGER.info("Server stopped by user")
        sys.exit(0)