Coverage for src/mlopus/mlflow/api/contract.py: 100%

119 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2025-07-13 14:49 +0000

1from abc import ABC, abstractmethod 

2from pathlib import Path 

3from typing import TypeVar, Callable, Iterator, Tuple, Mapping 

4 

5from mlopus.utils import pydantic, mongo, urls 

6from .common import schema, transfer 

7 

# Generic type variable: the value type produced by a caller-supplied artifact loader.
A = TypeVar("A")  # Any type

# Types used by MLOpus (short aliases for the schema entities referenced in the contract)
E = schema.Experiment
R = schema.Run
M = schema.Model
V = schema.ModelVersion

# Identifier types: each accepts either the entity object or its plain key
ExpIdentifier = E | str  # Exp entity or exp ID
RunIdentifier = R | str  # Run entity or run ID
ModelIdentifier = M | str  # Model entity or model name
ModelVersionIdentifier = V | Tuple[str, str]  # ModelVersion entity or tuple of (name, version) strings

21 

22 

class MlflowApiContract(pydantic.BaseModel, ABC):
    """Declaration of all standard public methods for MLflow API classes.

    Every method in this class is abstract and carries no implementation;
    concrete API classes are expected to override all of them.

    Identifier arguments follow the module-level identifier aliases:

    - ``ExpIdentifier``: experiment entity or experiment ID.
    - ``RunIdentifier``: run entity or run ID.
    - ``ModelIdentifier``: model entity or model name.
    - ``ModelVersionIdentifier``: model version entity or a tuple of
      ``(name, version)`` strings.
    """

    # ------------------------------------------------------------------
    # Cache cleanup
    # ------------------------------------------------------------------

    @abstractmethod
    def clean_all_cache(self):
        """Clean all cached metadata and artifacts."""

    @abstractmethod
    def clean_temp_artifacts(self):
        """Clean temporary artifacts."""

    @abstractmethod
    def clean_cached_run_artifact(self, run: RunIdentifier, path_in_run: str = ""):
        """Clean cached artifacts for specified run."""

    @abstractmethod
    def clean_cached_model_artifact(self, model_version: ModelVersionIdentifier):
        """Clean cached artifacts for specified model version."""

    # ------------------------------------------------------------------
    # Artifact listing, caching and retrieval
    # ------------------------------------------------------------------

    @abstractmethod
    def list_run_artifacts(self, run: RunIdentifier, path_in_run: str = "") -> transfer.LsResult:
        """List run artifacts in repo."""

    @abstractmethod
    def list_model_artifact(self, model_version: ModelVersionIdentifier, path_suffix: str = "") -> transfer.LsResult:
        """List model version artifacts in repo."""

    @abstractmethod
    def cache_run_artifact(self, run: RunIdentifier, path_in_run: str = "") -> Path:
        """Pull run artifacts from MLflow server to local cache."""

    @abstractmethod
    def cache_model_artifact(self, model_version: ModelVersionIdentifier) -> Path:
        """Pull model version artifacts from MLflow server to local cache."""

    @abstractmethod
    def get_run_artifact(self, run: RunIdentifier, path_in_run: str = "") -> Path:
        """Get local path to run artifacts."""

    @abstractmethod
    def get_model_artifact(self, model_version: ModelVersionIdentifier) -> Path:
        """Get local path to model artifacts."""

    @abstractmethod
    def place_run_artifact(
        self,
        run: RunIdentifier,
        target: Path,
        path_in_run: str = "",
        overwrite: bool = False,
        link: bool = True,
    ):
        """Place run artifacts on target path.

        In online mode: Data is synced with the MLflow server.
        In offline mode: No guarantee that the data is current.
        """

    @abstractmethod
    def place_model_artifact(
        self,
        model_version: ModelVersionIdentifier,
        target: Path,
        overwrite: bool = False,
        link: bool = True,
    ):
        """Place model version artifacts on target path.

        In online mode: Data is synced with the MLflow server.
        In offline mode: No guarantee that the data is current.
        """

    @abstractmethod
    def export_run_artifact(
        self,
        run: RunIdentifier,
        target: Path,
        path_in_run: str = "",
    ) -> Path:
        """Export run artifact cache to target path."""

    @abstractmethod
    def export_model_artifact(
        self,
        model_version: ModelVersionIdentifier,
        target: Path,
    ) -> Path:
        """Export model version artifact cache to target path."""

    @abstractmethod
    def load_run_artifact(self, run: RunIdentifier, loader: Callable[[Path], A], path_in_run: str = "") -> A:
        """Load run artifacts.

        :param loader: Callable that receives a path and produces the loaded value.
        """

    @abstractmethod
    def load_model_artifact(self, model_version: ModelVersionIdentifier, loader: Callable[[Path], A]) -> A:
        """Load model artifacts.

        :param loader: Callable that receives a path and produces the loaded value.
        """

    # ------------------------------------------------------------------
    # Artifact publishing
    # ------------------------------------------------------------------

    @abstractmethod
    def log_run_artifact(
        self,
        run: RunIdentifier,
        source: Path | Callable[[Path], None],
        path_in_run: str | None = None,
        keep_the_source: bool | None = None,
        allow_duplication: bool | None = None,
        use_cache: bool | None = None,
    ):
        """Publish artifact file or dir to experiment run."""

    @abstractmethod
    def log_model_version(
        self,
        model: ModelIdentifier,
        run: RunIdentifier,
        source: Path | Callable[[Path], None],
        path_in_run: str | None = None,
        keep_the_source: bool | None = None,
        allow_duplication: bool = False,
        use_cache: bool | None = None,
        version: str | None = None,
        tags: Mapping | None = None,
    ) -> V:
        """Publish artifact file or dir as model version inside the specified experiment run."""

    # ------------------------------------------------------------------
    # URLs
    # ------------------------------------------------------------------

    @abstractmethod
    def get_exp_url(self, exp: ExpIdentifier) -> str:
        """Get Experiment URL."""

    @abstractmethod
    def get_run_url(self, run: RunIdentifier, exp: ExpIdentifier | None = None) -> str:
        """Get Run URL."""

    @abstractmethod
    def get_model_url(self, model: ModelIdentifier) -> str:
        """Get URL to registered model."""

    @abstractmethod
    def get_model_version_url(self, model_version: ModelVersionIdentifier) -> str:
        """Get model version URL."""

    # ------------------------------------------------------------------
    # Entity lookup and search
    # ------------------------------------------------------------------

    @abstractmethod
    def get_exp(self, exp: ExpIdentifier, **cache_opts: bool) -> E:
        """Get Experiment API by ID."""

    @abstractmethod
    def get_run(self, run: RunIdentifier, **cache_opts: bool) -> R:
        """Get Run API by ID."""

    @abstractmethod
    def get_model(self, model: ModelIdentifier, **cache_opts: bool) -> M:
        """Get Model API by name."""

    @abstractmethod
    def get_model_version(self, model_version: ModelVersionIdentifier, **cache_opts: bool) -> V:
        """Get ModelVersion metadata by ID."""

    @abstractmethod
    def find_exps(self, query: mongo.Query | None = None, sorting: mongo.Sorting | None = None) -> Iterator[E]:
        """Search experiments with query in MongoDB query language."""

    @abstractmethod
    def find_runs(self, query: mongo.Query | None = None, sorting: mongo.Sorting | None = None) -> Iterator[R]:
        """Search runs with query in MongoDB query language."""

    @abstractmethod
    def find_models(self, query: mongo.Query | None = None, sorting: mongo.Sorting | None = None) -> Iterator[M]:
        """Search registered models with query in MongoDB query language."""

    @abstractmethod
    def find_model_versions(
        self, query: mongo.Query | None = None, sorting: mongo.Sorting | None = None
    ) -> Iterator[V]:
        """Search model versions with query in MongoDB query language."""

    # Abstract like every other contract method, so that implementers cannot
    # silently inherit an empty stub that returns None instead of an iterator.
    @abstractmethod
    def find_child_runs(self, parent: RunIdentifier) -> Iterator[R]:
        """Find child runs.

        :param parent: Run entity or run ID of the parent run.
        """

    # ------------------------------------------------------------------
    # Metadata caching and export
    # ------------------------------------------------------------------

    @abstractmethod
    def cache_exp_meta(self, exp: ExpIdentifier) -> E:
        """Get latest Experiment metadata and save to local cache."""

    @abstractmethod
    def cache_run_meta(self, run: RunIdentifier) -> R:
        """Get latest Run metadata and save to local cache."""

    @abstractmethod
    def cache_model_meta(self, model: ModelIdentifier) -> M:
        """Get latest Model metadata and save to local cache."""

    @abstractmethod
    def cache_model_version_meta(self, model_version: ModelVersionIdentifier) -> V:
        """Get latest model version metadata and save to local cache."""

    @abstractmethod
    def export_exp_meta(self, exp: ExpIdentifier, target: Path) -> E:
        """Export experiment metadata cache to target."""

    @abstractmethod
    def export_run_meta(self, run: RunIdentifier, target: Path) -> R:
        """Export run metadata cache to target."""

    @abstractmethod
    def export_model_meta(self, model: ModelIdentifier, target: Path) -> M:
        """Export model metadata cache to target."""

    @abstractmethod
    def export_model_version_meta(self, mv: ModelVersionIdentifier, target: Path) -> V:
        """Export model version metadata cache to target."""

    # ------------------------------------------------------------------
    # Entity creation and run lifecycle
    # ------------------------------------------------------------------

    @abstractmethod
    def create_exp(self, name: str, tags: Mapping | None = None) -> E:
        """Create Experiment and return its API."""

    @abstractmethod
    def get_or_create_exp(self, name: str) -> E:
        """Get or create Experiment and return its API."""

    @abstractmethod
    def create_model(self, name: str, tags: Mapping | None = None) -> M:
        """Create registered model and return its API."""

    @abstractmethod
    def create_run(
        self,
        exp: ExpIdentifier,
        name: str | None = None,
        tags: Mapping | None = None,
        repo: str | urls.Url | None = None,
        parent: RunIdentifier | None = None,
    ) -> R:
        """Declare a new experiment run to be used later."""

    @abstractmethod
    def start_run(
        self,
        exp: ExpIdentifier,
        name: str | None = None,
        tags: Mapping | None = None,
        repo: str | urls.Url | None = None,
        parent: RunIdentifier | None = None,
    ) -> R:
        """Start a new experiment run."""

    @abstractmethod
    def resume_run(self, run: RunIdentifier) -> R:
        """Resume a previous experiment run."""

    @abstractmethod
    def end_run(self, run: RunIdentifier, succeeded: bool = True) -> R:
        """End experiment run."""

    # ------------------------------------------------------------------
    # Tags, params and metrics
    # ------------------------------------------------------------------

    @abstractmethod
    def set_tags_on_exp(self, exp: ExpIdentifier, tags: Mapping):
        """Set tags on experiment."""

    @abstractmethod
    def set_tags_on_run(self, run: RunIdentifier, tags: Mapping):
        """Set tags on experiment run."""

    @abstractmethod
    def set_tags_on_model(self, model: ModelIdentifier, tags: Mapping):
        """Set tags on registered model."""

    @abstractmethod
    def set_tags_on_model_version(self, model_version: ModelVersionIdentifier, tags: Mapping):
        """Set tags on model version."""

    @abstractmethod
    def log_params(self, run: RunIdentifier, params: Mapping):
        """Log params to experiment run."""

    @abstractmethod
    def log_metrics(self, run: RunIdentifier, metrics: Mapping):
        """Log metrics to experiment run."""