Coverage for src/mlopus/mlflow/api/contract.py: 100%
119 statements
« prev ^ index » next coverage.py v7.6.1, created at 2025-07-13 14:49 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2025-07-13 14:49 +0000
1from abc import ABC, abstractmethod
2from pathlib import Path
3from typing import TypeVar, Callable, Iterator, Tuple, Mapping
5from mlopus.utils import pydantic, mongo, urls
6from .common import schema, transfer
8A = TypeVar("A") # Any type
10# Types used by MLOpus
11E = schema.Experiment
12R = schema.Run
13M = schema.Model
14V = schema.ModelVersion
16# Identifier types
17ExpIdentifier = E | str # Exp entity or exp ID
18RunIdentifier = R | str # Run entity or run ID
19ModelIdentifier = M | str # Model entity or model name
20ModelVersionIdentifier = V | Tuple[str, str] # ModelVersion entity or tuple of (name, version) string
23class MlflowApiContract(pydantic.BaseModel, ABC):
24 """Declaration of all standard public methods for MLflow API classes."""
26 @abstractmethod
27 def clean_all_cache(self):
28 """Clean all cached metadata and artifacts."""
30 @abstractmethod
31 def clean_temp_artifacts(self):
32 """Clean temporary artifacts."""
34 @abstractmethod
35 def clean_cached_run_artifact(self, run: RunIdentifier, path_in_run: str = ""):
36 """Clean cached artifacts for specified run."""
38 @abstractmethod
39 def clean_cached_model_artifact(self, model_version: ModelVersionIdentifier):
40 """Clean cached artifacts for specified model version."""
42 @abstractmethod
43 def list_run_artifacts(self, run: RunIdentifier, path_in_run: str = "") -> transfer.LsResult:
44 """List run artifacts in repo."""
46 @abstractmethod
47 def list_model_artifact(self, model_version: ModelVersionIdentifier, path_suffix: str = "") -> transfer.LsResult:
48 """List model version artifacts in repo."""
50 @abstractmethod
51 def cache_run_artifact(self, run: RunIdentifier, path_in_run: str = "") -> Path:
52 """Pull run artifacts from MLflow server to local cache."""
54 @abstractmethod
55 def cache_model_artifact(self, model_version: ModelVersionIdentifier) -> Path:
56 """Pull model version artifacts from MLflow server to local cache."""
58 @abstractmethod
59 def get_run_artifact(self, run: RunIdentifier, path_in_run: str = "") -> Path:
60 """Get local path to run artifacts."""
62 @abstractmethod
63 def get_model_artifact(self, model_version: ModelVersionIdentifier) -> Path:
64 """Get local path to model artifacts."""
66 @abstractmethod
67 def place_run_artifact(
68 self,
69 run: RunIdentifier,
70 target: Path,
71 path_in_run: str = "",
72 overwrite: bool = False,
73 link: bool = True,
74 ):
75 """Place run artifacts on target path.
77 On online mode: Data is synced with the MLflow server.
78 On offline mode: No guarantee that the data is current.
79 """
81 @abstractmethod
82 def place_model_artifact(
83 self,
84 model_version: ModelVersionIdentifier,
85 target: Path,
86 overwrite: bool = False,
87 link: bool = True,
88 ):
89 """Place model version artifacts on target path.
91 On online mode: Data is synced with the MLflow server.
92 On offline mode: No guarantee that the data is current.
93 """
95 @abstractmethod
96 def export_run_artifact(
97 self,
98 run: RunIdentifier,
99 target: Path,
100 path_in_run: str = "",
101 ) -> Path:
102 """Export run artifact cache to target path."""
104 @abstractmethod
105 def export_model_artifact(
106 self,
107 model_version: ModelVersionIdentifier,
108 target: Path,
109 ) -> Path:
110 """Export model version artifact cache to target path."""
112 @abstractmethod
113 def load_run_artifact(self, run: RunIdentifier, loader: Callable[[Path], A], path_in_run: str = "") -> A:
114 """Load run artifacts."""
116 @abstractmethod
117 def load_model_artifact(self, model_version: ModelVersionIdentifier, loader: Callable[[Path], A]) -> A:
118 """Load model artifacts."""
120 @abstractmethod
121 def log_run_artifact(
122 self,
123 run: RunIdentifier,
124 source: Path | Callable[[Path], None],
125 path_in_run: str | None = None,
126 keep_the_source: bool | None = None,
127 allow_duplication: bool | None = None,
128 use_cache: bool | None = None,
129 ):
130 """Publish artifact file or dir to experiment run."""
132 @abstractmethod
133 def log_model_version(
134 self,
135 model: ModelIdentifier,
136 run: RunIdentifier,
137 source: Path | Callable[[Path], None],
138 path_in_run: str | None = None,
139 keep_the_source: bool | None = None,
140 allow_duplication: bool = False,
141 use_cache: bool | None = None,
142 version: str | None = None,
143 tags: Mapping | None = None,
144 ) -> V:
145 """Publish artifact file or dir as model version inside the specified experiment run."""
147 @abstractmethod
148 def get_exp_url(self, exp: ExpIdentifier) -> str:
149 """Get Experiment URL."""
151 @abstractmethod
152 def get_run_url(self, run: RunIdentifier, exp: ExpIdentifier | None = None) -> str:
153 """Get Run URL."""
155 @abstractmethod
156 def get_model_url(self, model: ModelIdentifier) -> str:
157 """Get URL to registered model."""
159 @abstractmethod
160 def get_model_version_url(self, model_version: ModelVersionIdentifier) -> str:
161 """Get model version URL."""
163 @abstractmethod
164 def get_exp(self, exp: ExpIdentifier, **cache_opts: bool) -> E:
165 """Get Experiment API by ID."""
167 @abstractmethod
168 def get_run(self, run: RunIdentifier, **cache_opts: bool) -> R:
169 """Get Run API by ID."""
171 @abstractmethod
172 def get_model(self, model: ModelIdentifier, **cache_opts: bool) -> M:
173 """Get Experiment API by ID."""
175 @abstractmethod
176 def get_model_version(self, model_version: ModelVersionIdentifier, **cache_opts: bool) -> V:
177 """Get ModelVersion metadata by ID."""
179 @abstractmethod
180 def find_exps(self, query: mongo.Query | None = None, sorting: mongo.Sorting | None = None) -> Iterator[E]:
181 """Search experiments with query in MongoDB query language."""
183 @abstractmethod
184 def find_runs(self, query: mongo.Query | None = None, sorting: mongo.Sorting | None = None) -> Iterator[R]:
185 """Search runs with query in MongoDB query language."""
187 @abstractmethod
188 def find_models(self, query: mongo.Query | None = None, sorting: mongo.Sorting | None = None) -> Iterator[M]:
189 """Search registered models with query in MongoDB query language."""
191 @abstractmethod
192 def find_model_versions(
193 self, query: mongo.Query | None = None, sorting: mongo.Sorting | None = None
194 ) -> Iterator[V]:
195 """Search runs with query in MongoDB query language."""
197 @pydantic.validate_arguments
198 def find_child_runs(self, parent: RunIdentifier) -> Iterator[R]:
199 """Find child runs."""
201 @abstractmethod
202 def cache_exp_meta(self, exp: ExpIdentifier) -> E:
203 """Get latest Experiment metadata and save to local cache."""
205 @abstractmethod
206 def cache_run_meta(self, run: RunIdentifier) -> R:
207 """Get latest Run metadata and save to local cache."""
209 @abstractmethod
210 def cache_model_meta(self, model: ModelIdentifier) -> M:
211 """Get latest Model metadata and save to local cache."""
213 @abstractmethod
214 def cache_model_version_meta(self, model_version: ModelVersionIdentifier) -> V:
215 """Get latest model version metadata and save to local cache."""
217 @abstractmethod
218 def export_exp_meta(self, exp: ExpIdentifier, target: Path) -> E:
219 """Export experiment metadata cache to target."""
221 @abstractmethod
222 def export_run_meta(self, run: RunIdentifier, target: Path) -> R:
223 """Export run metadata cache to target."""
225 @abstractmethod
226 def export_model_meta(self, model: ModelIdentifier, target: Path) -> M:
227 """Export model metadata cache to target."""
229 @abstractmethod
230 def export_model_version_meta(self, mv: ModelVersionIdentifier, target: Path) -> V:
231 """Export model version metadata cache to target."""
233 @abstractmethod
234 def create_exp(self, name: str, tags: Mapping | None = None) -> E:
235 """Create Experiment and return its API."""
237 @abstractmethod
238 def get_or_create_exp(self, name: str) -> E:
239 """Get or create Experiment and return its API."""
241 @abstractmethod
242 def create_model(self, name: str, tags: Mapping | None = None) -> M:
243 """Create registered model and return its API."""
245 @abstractmethod
246 def create_run(
247 self,
248 exp: ExpIdentifier,
249 name: str | None = None,
250 tags: Mapping | None = None,
251 repo: str | urls.Url | None = None,
252 parent: RunIdentifier | None = None,
253 ) -> R:
254 """Declare a new experiment run to be used later."""
256 @abstractmethod
257 def start_run(
258 self,
259 exp: ExpIdentifier,
260 name: str | None = None,
261 tags: Mapping | None = None,
262 repo: str | urls.Url | None = None,
263 parent: RunIdentifier | None = None,
264 ) -> R:
265 """Start a new experiment run."""
267 @abstractmethod
268 def resume_run(self, run: RunIdentifier) -> R:
269 """Resume a previous experiment run."""
271 @abstractmethod
272 def end_run(self, run: RunIdentifier, succeeded: bool = True) -> R:
273 """End experiment run."""
275 @abstractmethod
276 def set_tags_on_exp(self, exp: ExpIdentifier, tags: Mapping):
277 """Set tags on experiment."""
279 @abstractmethod
280 def set_tags_on_run(self, run: RunIdentifier, tags: Mapping):
281 """Set tags on experiment run."""
283 @abstractmethod
284 def set_tags_on_model(self, model: ModelIdentifier, tags: Mapping):
285 """Set tags on registered model."""
287 @abstractmethod
288 def set_tags_on_model_version(self, model_version: ModelVersionIdentifier, tags: Mapping):
289 """Set tags on model version."""
291 @abstractmethod
292 def log_params(self, run: RunIdentifier, params: Mapping):
293 """Log params to experiment run."""
295 @abstractmethod
296 def log_metrics(self, run: RunIdentifier, metrics: Mapping):
297 """Log metrics to experiment run."""