Skip to content

API Reference

This page documents the HTTP API and provides auto-generated reference documentation for the main Python modules in the stuperml package.

HTTP API (stuperml.api)

The FastAPI application is defined in the stuperml.api module. It exposes endpoints for health checking and batch prediction.

Endpoints

GET /

  • Description: Simple health check endpoint.
  • Response: JSON object with the following keys:
      • message: HTTP status phrase (e.g. "OK").
      • status-code: numeric HTTP status code (e.g. 200).

Example:

curl http://127.0.0.1:8000/

POST /predict

  • Description: Run batch inference on a list of input rows.
  • Request body (PredictionRequest):
      • rows: list of objects, where each object is a mapping from feature name to value (bool, int, float, or str). Must contain at least one row.
  • Response body (PredictionResponse):
      • predictions: list of floating-point predictions, one per input row.

The request features must be compatible with the fitted preprocessor (i.e. same feature names and reasonable data types). If the data cannot be transformed, the endpoint responds with a 400 BAD REQUEST containing an error message.

On application startup, the API:

  1. Loads the preprocessor from data/preprocessor.joblib (path defined in configs.data_config).
  2. Infers the model input size either from preprocessor.get_feature_names_out() or, as a fallback, from data/feature_names.json.
  3. Instantiates SimpleMLP with the inferred input size.
  4. Loads weights from models/model.pth.

If any of the artifacts are missing, a FileNotFoundError or RuntimeError is raised during startup.

Python API (mkdocstrings)

The sections below are auto-generated from the docstrings in the stuperml package using mkdocstrings. They provide detailed reference information about public classes, functions, and modules.

Data

stuperml.data

MyDataset

Bases: Dataset

Source code in src/stuperml/data.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
class MyDataset(Dataset):
    """Torch ``Dataset`` backed by preprocessed feature/target tensors.

    On construction, tries to load ``X_<split>.pt`` / ``y_<split>.pt`` from
    ``cfg.data_folder`` if they exist and were produced with the same
    configuration; otherwise the tensors stay ``None`` until
    :meth:`preprocess` (or :meth:`load_data`) regenerates them.
    """

    def __init__(
        self,
        split: str = "train",
        cfg: DataConfig = data_config,
    ) -> None:
        """Initialize the dataset for one split.

        Args:
            split: One of ``"train"``, ``"val"``, ``"test"`` (case-insensitive).
            cfg: Data configuration (paths, split sizes, data source).

        Raises:
            ValueError: If ``split`` is not a supported value.
        """
        logger.debug(f"Initializing MyDataset with split='{split}' and cfg={cfg}")
        self.cfg = cfg
        self.split = split.lower()

        # Stay None until preprocessed artifacts are found or generated.
        self.X: Optional[torch.Tensor] = None
        self.y: Optional[torch.Tensor] = None

        if self.split not in {"train", "val", "test"}:
            logger.error(f"Invalid split '{self.split}' provided.")
            raise ValueError("split must be one of: 'train', 'val', 'test'")
        logger.info(f"MyDataset initialized for split '{self.split}'.")

        x_path = self.cfg.data_folder / f"X_{self.split}.pt"
        y_path = self.cfg.data_folder / f"y_{self.split}.pt"
        # Only reuse cached tensors when the on-disk config snapshot matches
        # the current config; otherwise the artifacts are considered stale.
        if x_path.exists() and y_path.exists() and self._config_matches(self.cfg):
            logger.debug(f"Loading preprocessed tensors from {x_path} and {y_path}.")
            self.X = torch.load(x_path)
            self.y = torch.load(y_path)
            logger.info(f"Loaded {self.X.size(0)} samples for split '{self.split}'.")
        else:
            logger.warning(f"Preprocessed data not found at {x_path} and {y_path}. Call preprocess() first.")

    def __len__(self) -> int:
        """Return the number of samples in this split.

        Raises:
            RuntimeError: If preprocessed tensors have not been loaded.
        """
        if self.X is None:
            raise RuntimeError("Dataset not initialized with preprocessed tensors.")
        return int(self.X.shape[0])

    def __getitem__(self, index: int) -> tuple[torch.Tensor, torch.Tensor]:
        """Return the ``(features, target)`` pair at ``index``.

        Raises:
            RuntimeError: If preprocessed tensors have not been loaded.
        """
        if self.X is None or self.y is None:
            raise RuntimeError("Dataset not initialized with preprocessed tensors.")
        return self.X[index], self.y[index]

    def preprocess(self) -> None:
        """Download the raw CSV, fit the preprocessor, and persist artifacts.

        Writes the split tensors (``X_*.pt`` / ``y_*.pt``), the feature-name
        list, the fitted preprocessor, and a config snapshot to
        ``cfg.data_folder``.

        Raises:
            KeyError: If the configured target column is missing from the CSV.
        """
        logger.debug("Starting data preprocessing")
        self.cfg.data_folder.mkdir(parents=True, exist_ok=True)
        logger.info(f"Data folder created/verified: {self.cfg.data_folder}")

        if self.cfg.gcs_uri:
            logger.debug("Using GCS data source")
            # Point the Google SDK at the key file only when it exists and no
            # credentials are already configured in the environment.
            if (
                self.cfg.gcs_service_account_key
                and "GOOGLE_APPLICATION_CREDENTIALS" not in os.environ
                and os.path.exists(self.cfg.gcs_service_account_key)
            ):
                os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.cfg.gcs_service_account_key
            elif self.cfg.gcs_service_account_key and not os.path.exists(self.cfg.gcs_service_account_key):
                # Use the module logger (not print) so this notice lands in
                # the same log stream as every other diagnostic in this class.
                logger.warning(
                    f"Notice: Service account key '{self.cfg.gcs_service_account_key}' not found. "
                    "Assuming we are running in Cloud Run/Environment with auto-auth."
                )

            gcs_uri = self.cfg.gcs_uri
            if self.cfg.gcs_data:
                gcs_uri = f"{gcs_uri.rstrip('/')}/{self.cfg.gcs_data}"
            try:
                csv_path = _download_csv_from_gcs(gcs_uri, self.cfg.data_folder)
                logger.info(f"Downloaded CSV from GCS: {gcs_uri}")
            except Exception as e:
                logger.error(f"Failed to download CSV from GCS: {e}")
                raise
        else:
            logger.debug("Using Kaggle data source")
            csv_path = _download_csv("ankushnarwade/ai-impact-on-student-performance")
            logger.info(f"Downloaded CSV from Kaggle dataset: {csv_path}")
        df = pd.read_csv(csv_path)
        logger.debug(f"CSV loaded into DataFrame with shape {df.shape}")

        if self.cfg.target_col not in df.columns:
            logger.error(f"Target column '{self.cfg.target_col}' not found. Available columns: {list(df.columns)}")
            raise KeyError(f"Target column '{self.cfg.target_col}' not found. Columns: {list(df.columns)}")

        dropped = self.cfg.dropped_columns
        train_size = float(self.cfg.train_size)
        val_size = float(self.cfg.val_size)
        test_size = float(self.cfg.test_size)
        seed = int(self.cfg.seed)

        _validate_splits(train_size, val_size, test_size)
        y_np = df[self.cfg.target_col].to_numpy()
        # errors="ignore" tolerates configured drop columns absent from CSV.
        X_df = df.drop(columns=[self.cfg.target_col, *dropped], errors="ignore")

        pre = _build_preprocessor()
        X_np = pre.fit_transform(X_df)

        # Feature names are best-effort: not every transformer implements
        # get_feature_names_out(); an empty list is the documented fallback.
        try:
            feat_names = pre.get_feature_names_out().tolist()
        except Exception:
            feat_names = []

        (X_train, y_train), (X_val, y_val), (X_test, y_test) = _split_data(
            X_np, y_np, train_size, val_size, test_size, seed
        )

        torch.save(_to_tensor(X_train), self.cfg.data_folder / "X_train.pt")
        torch.save(_to_tensor(X_val), self.cfg.data_folder / "X_val.pt")
        torch.save(_to_tensor(X_test), self.cfg.data_folder / "X_test.pt")

        torch.save(_to_tensor(y_train), self.cfg.data_folder / "y_train.pt")
        torch.save(_to_tensor(y_val), self.cfg.data_folder / "y_val.pt")
        torch.save(_to_tensor(y_test), self.cfg.data_folder / "y_test.pt")

        (self.cfg.data_folder / "feature_names.json").write_text(json.dumps(feat_names))
        joblib.dump(pre, self.cfg.data_folder / "preprocessor.joblib")
        # Snapshot the config so later runs can detect stale artifacts.
        self._write_config(self.cfg)
        logger.info("Preprocessing complete - data splits and preprocessor saved.")

    def _serialize_config(self, cfg: DataConfig) -> dict[str, object]:
        """Return a JSON-serializable snapshot of config for equality checks."""
        config = asdict(cfg)
        # Path objects and unordered column lists are normalized so that
        # snapshots compare equal across runs.
        config["data_folder"] = str(cfg.data_folder)
        config["dropped_columns"] = sorted(cfg.dropped_columns)
        return config

    def _write_config(self, cfg: DataConfig) -> None:
        """Persist data configuration used to generate artifacts."""
        config_path = cfg.data_folder / "data_config.json"
        # sort_keys makes the file byte-stable for identical configs.
        config_path.write_text(json.dumps(self._serialize_config(cfg), sort_keys=True))

    def _config_matches(self, cfg: DataConfig) -> bool:
        """Check whether on-disk configuration matches the current config."""
        config_path = cfg.data_folder / "data_config.json"
        if not config_path.exists():
            return False
        stored = json.loads(config_path.read_text())
        current = self._serialize_config(cfg)
        return stored == current

    def _ensure_preprocessed(self) -> None:
        """Ensure preprocessing artifacts exist and match the current config."""
        required = [
            self.cfg.data_folder / "X_train.pt",
            self.cfg.data_folder / "X_val.pt",
            self.cfg.data_folder / "X_test.pt",
            self.cfg.data_folder / "y_train.pt",
            self.cfg.data_folder / "y_val.pt",
            self.cfg.data_folder / "y_test.pt",
            self.cfg.data_folder / "feature_names.json",
            self.cfg.data_folder / "preprocessor.joblib",
        ]
        if not all(path.exists() for path in required) or not self._config_matches(self.cfg):
            logger.info("Preprocessing artifacts missing or config changed; regenerating.")
            # Drop any stale in-memory tensors before regenerating.
            self.X = None
            self.y = None
            self.preprocess()

    def load_data(self) -> tuple[TensorDataset, TensorDataset, TensorDataset]:
        """Return ``(train, val, test)`` TensorDatasets, preprocessing if needed."""
        self._ensure_preprocessed()
        data_dir: Path = self.cfg.data_folder

        train_features = torch.load(data_dir / "X_train.pt")
        train_target = torch.load(data_dir / "y_train.pt")

        val_features = torch.load(data_dir / "X_val.pt")
        val_target = torch.load(data_dir / "y_val.pt")

        test_features = torch.load(data_dir / "X_test.pt")
        test_target = torch.load(data_dir / "y_test.pt")

        train_set = TensorDataset(train_features, train_target)
        val_set = TensorDataset(val_features, val_target)
        test_set = TensorDataset(test_features, test_target)

        return train_set, val_set, test_set

Models

stuperml.model

Training

stuperml.train

train

train(
    lr: float = 0.001,
    batch_size: int = 32,
    epochs: int = 30,
    verbose: bool = False,
) -> None

Train the model and persist artifacts.

Source code in src/stuperml/train.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def train(lr: float = 1e-3, batch_size: int = 32, epochs: int = 30, verbose: bool = False) -> None:
    """Train the model and persist artifacts.

    Trains a SimpleMLP regressor with MSE loss and Adam, saves a
    timestamped weight file under MODEL_DIR, optionally uploads it to GCS,
    and writes a train/val loss curve figure.

    Args:
        lr: Learning rate for the Adam optimizer.
        batch_size: Mini-batch size for the train and validation loaders.
        epochs: Number of full passes over the training data.
        verbose: If True, also print the training loss every 10 batches.
    """
    print("Training day and night")

    # UTC timestamp uniquely identifies this run's artifacts.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    print(f"Run ID: {timestamp}")

    print(f"{lr=}, {batch_size=}, {epochs=}")

    # Test split is intentionally unused here; evaluation happens elsewhere.
    train_set, val_set, _ = MyDataset(cfg=data_config).load_data()
    n_features = train_set.tensors[0].shape[1]

    model = SimpleMLP(input_size=n_features).to(DEVICE)
    train_dataloader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True)
    val_dataloader = torch.utils.data.DataLoader(val_set, batch_size=batch_size)

    loss_fn = torch.nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    statistics = {"train_loss": [], "val_loss": []}

    for epoch in range(epochs):
        model.train()
        epoch_train_loss = 0.0
        for index, (features, target) in enumerate(train_dataloader):
            features, target = features.to(DEVICE), target.to(DEVICE)
            # MSELoss needs the target shaped like the model output: (batch, 1) floats.
            target = target.view(-1, 1).float()

            optimizer.zero_grad()
            y_pred = model(features)
            loss = loss_fn(y_pred, target)
            loss.backward()
            optimizer.step()

            epoch_train_loss += loss.item()
            if verbose and index % 10 == 0:
                print(f"Epoch {epoch}, iter {index}, \t train_loss: {loss.item():.5f}")

        model.eval()
        epoch_val_loss = 0.0
        with torch.no_grad():
            for features, target in val_dataloader:
                features, target = features.to(DEVICE), target.to(DEVICE)
                target = target.view(-1, 1).float()
                y_pred = model(features)
                v_loss = loss_fn(y_pred, target)
                epoch_val_loss += v_loss.item()

        # Per-epoch averages over batches (not samples).
        avg_train = epoch_train_loss / len(train_dataloader)
        avg_val = epoch_val_loss / len(val_dataloader)
        statistics["train_loss"].append(avg_train)
        statistics["val_loss"].append(avg_val)

        print(f"Epoch {epoch} \t Summary: Train Loss: {avg_train:.5f}, \t Val Loss: {avg_val:.5f}")

    print("Training complete")

    MODEL_DIR.mkdir(parents=True, exist_ok=True)
    # NOTE(review): weights are saved as model_<timestamp>.pth, but the page
    # above documents the serving API loading models/model.pth - confirm how
    # the "latest" model is selected/renamed for serving.
    model_path = MODEL_DIR / f"model_{timestamp}.pth"
    torch.save(model.state_dict(), model_path)
    print("Saved locally.")

    # AIP_MODEL_DIR is set by managed training environments; otherwise fall
    # back to the URI from the data config.
    gcs_models_uri = os.getenv("AIP_MODEL_DIR") or data_config.gcs_models_uri
    if gcs_models_uri:
        if (
            data_config.gcs_service_account_key
            and "GOOGLE_APPLICATION_CREDENTIALS" not in os.environ
            and os.path.exists(data_config.gcs_service_account_key)
        ):
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = data_config.gcs_service_account_key
        _upload_model_artifacts(model_path, gcs_models_uri, timestamp)

    plt.figure(figsize=(10, 5))
    plt.plot(statistics["train_loss"], label="Train Loss")
    plt.plot(statistics["val_loss"], label="Val Loss")
    plt.title("Training and Validation Loss")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.legend()
    fig_path = "src/stuperml/figures/training_validation_epoch_error.png"
    # Create the figures directory if missing so savefig cannot fail on a
    # fresh checkout.
    os.makedirs(os.path.dirname(fig_path), exist_ok=True)
    plt.savefig(fig_path)
    # Close the figure so repeated train() calls don't accumulate open figures.
    plt.close()

Evaluation

stuperml.evaluate

HTTP API

stuperml.api

PredictionRequest

Bases: BaseModel

Request payload for batch prediction.

Source code in src/stuperml/api.py
42
43
44
45
class PredictionRequest(BaseModel):
    """Request payload for batch prediction.

    Each row is a mapping from feature name to a scalar value; the feature
    set must be compatible with the fitted preprocessor.
    """

    # min_length=1: at least one row is required for a prediction call.
    rows: list[dict[str, bool | int | float | str]] = Field(..., min_length=1)

PredictionResponse

Bases: BaseModel

Response payload for batch prediction.

Source code in src/stuperml/api.py
48
49
50
51
class PredictionResponse(BaseModel):
    """Response payload for batch prediction.

    ``predictions`` holds one floating-point model output per input row,
    in the same order as the request rows.
    """

    predictions: list[float]

lifespan async

lifespan(_: FastAPI)

Load and release model artifacts for app lifecycle.

Source code in src/stuperml/api.py
25
26
27
28
29
30
31
32
33
34
@asynccontextmanager
async def lifespan(_: FastAPI):
    """Acquire model artifacts at startup and release them at shutdown."""
    global _model, _preprocessor
    # Populate the module-level serving state before the app starts handling
    # requests.
    _model, _preprocessor = _load_model()
    try:
        # Hand control to the running application.
        yield
    finally:
        # Drop both references on shutdown so the artifacts can be reclaimed.
        _model, _preprocessor = None, None

predict

predict(request: PredictionRequest) -> PredictionResponse

Run batch inference on input rows.

Source code in src/stuperml/api.py
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@app.post("/predict", response_model=PredictionResponse)
def predict(request: PredictionRequest) -> PredictionResponse:
    """Run batch inference on input rows."""
    # Artifacts are loaded during app startup; without them we cannot serve.
    if _model is None or _preprocessor is None:
        raise HTTPException(status_code=HTTPStatus.SERVICE_UNAVAILABLE, detail="Model not loaded")

    frame = pd.DataFrame(request.rows)
    try:
        matrix = _preprocessor.transform(frame)
    except Exception as exc:
        # Incompatible features (wrong names/types) surface as a client error.
        raise HTTPException(status_code=HTTPStatus.BAD_REQUEST, detail=str(exc)) from exc

    inputs = torch.as_tensor(matrix, dtype=torch.float32)
    with torch.no_grad():
        raw_outputs = _model(inputs).squeeze(1).tolist()

    return PredictionResponse(predictions=[float(v) for v in raw_outputs])

root

root() -> dict[str, Any]

Health check.

Source code in src/stuperml/api.py
77
78
79
80
@app.get("/")
def root() -> dict[str, Any]:
    """Health check."""
    ok = HTTPStatus.OK
    # Mirrors the documented schema: {"message": "OK", "status-code": 200}.
    return {"message": ok.phrase, "status-code": ok}