|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""" |
|
|
HTTP server wrapper for Environment instances. |
|
|
|
|
|
This module provides utilities to wrap any Environment subclass and expose it |
|
|
over HTTP endpoints that HTTPEnvClient can consume. |
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import os |
|
|
from dataclasses import asdict |
|
|
from typing import Any, Dict, Type |
|
|
|
|
|
from .interfaces import Environment |
|
|
from .types import Action, Observation |
|
|
from fastapi import Body, FastAPI |
|
|
|
|
|
class HTTPEnvServer: |
|
|
""" |
|
|
HTTP server wrapper for Environment instances. |
|
|
|
|
|
This class wraps an Environment and exposes its reset(), step(), and state |
|
|
methods as HTTP endpoints compatible with HTTPEnvClient. |
|
|
|
|
|
The server expects: |
|
|
- Action deserialization: Converts JSON dict to Action subclass |
|
|
- Observation serialization: Converts Observation subclass to JSON dict |
|
|
|
|
|
Example: |
|
|
>>> from core.env_server import HTTPEnvServer |
|
|
>>> from envs.coding_env.server import CodeExecutionEnvironment |
|
|
>>> |
|
|
>>> env = CodeExecutionEnvironment() |
|
|
>>> server = HTTPEnvServer(env) |
|
|
>>> |
|
|
>>> # Register routes with FastAPI |
|
|
>>> from fastapi import FastAPI |
|
|
>>> app = FastAPI() |
|
|
>>> server.register_routes(app) |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
env: Environment, |
|
|
action_cls: Type[Action], |
|
|
observation_cls: Type[Observation], |
|
|
): |
|
|
""" |
|
|
Initialize HTTP server wrapper. |
|
|
|
|
|
Args: |
|
|
env: The Environment instance to wrap |
|
|
action_cls: The Action subclass this environment expects |
|
|
observation_cls: The Observation subclass this environment returns |
|
|
""" |
|
|
self.env = env |
|
|
self.action_cls = action_cls |
|
|
self.observation_cls = observation_cls |
|
|
|
|
|
def register_routes(self, app: Any) -> None: |
|
|
""" |
|
|
Register HTTP routes on a FastAPI application. |
|
|
|
|
|
Args: |
|
|
app: FastAPI application instance |
|
|
""" |
|
|
|
|
|
if not isinstance(app, FastAPI): |
|
|
raise TypeError("app must be a FastAPI instance") |
|
|
|
|
|
@app.post("/reset") |
|
|
async def reset(request: Dict[str, Any] = Body(default={})) -> Dict[str, Any]: |
|
|
"""Reset endpoint - returns initial observation.""" |
|
|
|
|
|
observation = self.env.reset() |
|
|
return self._serialize_observation(observation) |
|
|
|
|
|
@app.post("/step") |
|
|
async def step(request: Dict[str, Any]) -> Dict[str, Any]: |
|
|
"""Step endpoint - executes action and returns observation.""" |
|
|
action_data = request.get("action", {}) |
|
|
|
|
|
|
|
|
|
|
|
action = self._deserialize_action(action_data) |
|
|
|
|
|
|
|
|
observation = self.env.step(action) |
|
|
|
|
|
|
|
|
return self._serialize_observation(observation) |
|
|
|
|
|
@app.get("/state") |
|
|
async def get_state() -> Dict[str, Any]: |
|
|
"""State endpoint - returns current environment state.""" |
|
|
state = self.env.state |
|
|
return asdict(state) |
|
|
|
|
|
@app.get("/health") |
|
|
async def health() -> Dict[str, str]: |
|
|
"""Health check endpoint.""" |
|
|
return {"status": "healthy"} |
|
|
|
|
|
|
|
|
def _deserialize_action(self, action_data: Dict[str, Any]) -> Action: |
|
|
""" |
|
|
Convert JSON dict to Action instance. |
|
|
|
|
|
Args: |
|
|
action_data: Dictionary containing action data |
|
|
|
|
|
Returns: |
|
|
Action instance |
|
|
|
|
|
Note: |
|
|
This is a simple implementation. Subclasses may need to override |
|
|
for more complex deserialization logic. |
|
|
""" |
|
|
|
|
|
metadata = action_data.pop("metadata", {}) |
|
|
action = self.action_cls(**action_data) |
|
|
action.metadata = metadata |
|
|
return action |
|
|
|
|
|
def _serialize_observation(self, observation: Observation) -> Dict[str, Any]: |
|
|
""" |
|
|
Convert Observation instance to JSON-compatible dict. |
|
|
|
|
|
Args: |
|
|
observation: Observation instance |
|
|
|
|
|
Returns: |
|
|
Dictionary compatible with HTTPEnvClient._parse_result() |
|
|
|
|
|
The format matches what HTTPEnvClient expects: |
|
|
{ |
|
|
"observation": {...}, # Observation fields |
|
|
"reward": float | None, |
|
|
"done": bool, |
|
|
} |
|
|
""" |
|
|
obs_dict = asdict(observation) |
|
|
|
|
|
|
|
|
reward = obs_dict.pop("reward", None) |
|
|
done = obs_dict.pop("done", False) |
|
|
obs_dict.pop("metadata", None) |
|
|
|
|
|
|
|
|
return { |
|
|
"observation": obs_dict, |
|
|
"reward": reward, |
|
|
"done": done, |
|
|
} |
|
|
|
|
|
def create_app( |
|
|
env: Environment, |
|
|
action_cls: Type[Action], |
|
|
observation_cls: Type[Observation], |
|
|
env_name: Optional[str] = None, |
|
|
) -> Any: |
|
|
""" |
|
|
Create a FastAPI application with or without web interface. |
|
|
|
|
|
This function creates a FastAPI app with the web interface enabled by default, |
|
|
including README integration for better user experience. |
|
|
|
|
|
Args: |
|
|
env: The Environment instance to serve |
|
|
action_cls: The Action subclass this environment expects |
|
|
observation_cls: The Observation subclass this environment returns |
|
|
env_name: Optional environment name for README loading |
|
|
|
|
|
Returns: |
|
|
FastAPI application instance with or without web interface and README integration |
|
|
""" |
|
|
|
|
|
|
|
|
enable_web = ( |
|
|
os.getenv("ENABLE_WEB_INTERFACE", "false").lower() in ("true", "1", "yes") |
|
|
) |
|
|
|
|
|
if enable_web: |
|
|
|
|
|
from .web_interface import create_web_interface_app |
|
|
return create_web_interface_app(env, action_cls, observation_cls, env_name) |
|
|
else: |
|
|
|
|
|
return create_fastapi_app(env, action_cls, observation_cls) |
|
|
|
|
|
|
|
|
def create_fastapi_app( |
|
|
env: Environment, |
|
|
action_cls: Type[Action], |
|
|
observation_cls: Type[Observation], |
|
|
) -> Any: |
|
|
""" |
|
|
Create a FastAPI application with routes for the given environment. |
|
|
|
|
|
Args: |
|
|
env: The Environment instance to serve |
|
|
action_cls: The Action subclass this environment expects |
|
|
observation_cls: The Observation subclass this environment returns |
|
|
|
|
|
Returns: |
|
|
FastAPI application instance with routes registered |
|
|
|
|
|
Example: |
|
|
>>> from envs.coding_env.server import CodeExecutionEnvironment |
|
|
>>> from envs.coding_env.models import CodeAction, CodeObservation |
|
|
>>> |
|
|
>>> env = CodeExecutionEnvironment() |
|
|
>>> app = create_fastapi_app(env, CodeAction, CodeObservation) |
|
|
>>> |
|
|
>>> # Run with: uvicorn module:app --host 0.0.0.0 --port 8000 |
|
|
""" |
|
|
try: |
|
|
from fastapi import FastAPI |
|
|
except ImportError: |
|
|
raise ImportError( |
|
|
"FastAPI is required. Install with: pip install fastapi uvicorn" |
|
|
) |
|
|
|
|
|
app = FastAPI(title="Environment HTTP Server") |
|
|
server = HTTPEnvServer(env, action_cls, observation_cls) |
|
|
server.register_routes(app) |
|
|
return app |
|
|
|