Coverage for src/mlopus/mlflow/api/run.py: 87%

94 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2025-07-13 14:49 +0000

1import typing 

2from pathlib import Path 

3from typing import Callable, TypeVar, Iterator, Mapping 

4 

5from mlopus.utils import dicts, pydantic, mongo, urls 

6from . import entity, contract 

7from .common import schema, decorators, transfer 

8from .mv import ModelVersionApi 

9 

10A = TypeVar("A") # Any type; return type of the `loader` callback accepted by `RunApi.load_artifact`

11 

12M = schema.ModelVersion # Short alias for the model version schema

13 

14ModelIdentifier = contract.ModelIdentifier # Alias; annotates the `model` argument of `RunApi.log_model_version`

15 

16 

class RunApi(schema.Run, entity.EntityApi):
    """Experiment run metadata bundled with a handle to the MLflow API."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Imported inside the method rather than at module level
        # (presumably to avoid a circular import -- TODO confirm).
        from .exp import ExpApi

        # Swap the experiment metadata for its API-enabled counterpart.
        exp_with_api = ExpApi(**self.exp)
        self.exp: ExpApi = exp_with_api

    def using(self, api: contract.MlflowApiContract) -> "RunApi":
        """Bind the given API to this run and to its experiment, then return this run."""
        super().using(api)
        self.exp.using(api)
        return self

    def _get_latest_data(self) -> schema.Run:
        """Fetch the latest data for this run.

        Used for self update after methods with the `require_update` decorator.
        """
        latest = self.api.get_run(self)
        return latest

    def __enter__(self):
        """Enter a `with` block, yielding this run."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit a `with` block, ending the run.

        The run is ended as succeeded only if the block raised no exception.
        The same flag is returned, so a falsy result lets a raised exception propagate.
        """
        succeeded = exc_type is None
        self.end_run(succeeded)
        return succeeded

    @property
    def url(self) -> str:
        """URL of this experiment run."""
        run_url = self.api.get_run_url(self, self.exp)
        return run_url

    @pydantic.validate_arguments
    def clean_cached_artifact(self, path_in_run: str = "") -> "RunApi":
        """Remove the cached artifact of this run.

        :param path_in_run: | See :paramref:`~mlopus.mlflow.BaseMlflowApi.clean_cached_run_artifact.path_in_run`
        :return: This run.
        """
        self.api.clean_cached_run_artifact(self, path_in_run)
        return self

    @pydantic.validate_arguments
    def list_artifacts(self, path_in_run: str = "") -> transfer.LsResult:
        """List the artifacts in the artifact repo of this run.

        :param path_in_run: | See :paramref:`~mlopus.mlflow.BaseMlflowApi.list_run_artifacts.path_in_run`
        """
        listing = self.api.list_run_artifacts(self, path_in_run)
        return listing

    @pydantic.validate_arguments
    def cache_artifact(self, path_in_run: str = "") -> Path:
        """Pull a run artifact from the MLflow server into the local cache.

        :param path_in_run: | See :paramref:`~mlopus.mlflow.BaseMlflowApi.cache_run_artifact.path_in_run`
        """
        cached = self.api.cache_run_artifact(self, path_in_run)
        return cached

    @pydantic.validate_arguments
    def export_artifact(self, target: Path, path_in_run: str = "") -> Path:
        """Export the cache of a run artifact to the target.

        See also:
            - :meth:`mlopus.mlflow.BaseMlflowApi.export_run_artifact`

        :param target: Cache export path.
        :param path_in_run: | See :paramref:`~mlopus.mlflow.BaseMlflowApi.export_run_artifact.path_in_run`
        """
        exported = self.api.export_run_artifact(self, target, path_in_run)
        return exported

    @pydantic.validate_arguments
    def get_artifact(self, path_in_run: str = "") -> Path:
        """Get the local path to a run artifact.

        See also:
            - :meth:`mlopus.mlflow.BaseMlflowApi.get_run_artifact`

        :param path_in_run: | See :paramref:`~mlopus.mlflow.BaseMlflowApi.get_run_artifact.path_in_run`
        """
        local_path = self.api.get_run_artifact(self, path_in_run)
        return local_path

    @pydantic.validate_arguments
    def place_artifact(
        self, target: Path, path_in_run: str = "", overwrite: bool = False, link: bool = True
    ) -> "RunApi":
        """Place a run artifact on the target path.

        See also:
            - :meth:`mlopus.mlflow.BaseMlflowApi.place_run_artifact`

        :param target: Target path.
        :param path_in_run: | See :paramref:`~mlopus.mlflow.BaseMlflowApi.place_run_artifact.path_in_run`
        :param overwrite: | See :paramref:`~mlopus.mlflow.BaseMlflowApi.place_run_artifact.overwrite`
        :param link: | See :paramref:`~mlopus.mlflow.BaseMlflowApi.place_run_artifact.link`
        :return: This run.
        """
        self.api.place_run_artifact(self, target, path_in_run, overwrite, link)
        return self

    @pydantic.validate_arguments
    def load_artifact(self, loader: Callable[[Path], A], path_in_run: str = "") -> A:
        """Load a run artifact through the given loader callback.

        See also:
            - :meth:`mlopus.mlflow.BaseMlflowApi.load_run_artifact`

        :param loader: Loader callback.
        :param path_in_run: | See :paramref:`~mlopus.mlflow.BaseMlflowApi.load_run_artifact.path_in_run`
        """
        loaded = self.api.load_run_artifact(self, loader, path_in_run)
        return loaded

    @pydantic.validate_arguments
    def log_artifact(
        self,
        source: Path | Callable[[Path], None],
        path_in_run: str | None = None,
        keep_the_source: bool | None = None,
        allow_duplication: bool | None = None,
        use_cache: bool | None = None,
    ) -> "RunApi":
        """Publish an artifact file or dir to this experiment run.

        See also:
            - :meth:`mlopus.mlflow.BaseMlflowApi.log_run_artifact`

        :param source: | See :paramref:`~mlopus.mlflow.BaseMlflowApi.log_run_artifact.source`
        :param path_in_run: | See :paramref:`~mlopus.mlflow.BaseMlflowApi.log_run_artifact.path_in_run`
        :param keep_the_source: | See :paramref:`~mlopus.mlflow.BaseMlflowApi.log_run_artifact.keep_the_source`
        :param allow_duplication: | See :paramref:`~mlopus.mlflow.BaseMlflowApi.log_run_artifact.allow_duplication`
        :param use_cache: | See :paramref:`~mlopus.mlflow.BaseMlflowApi.log_run_artifact.use_cache`
        :return: This run.
        """
        self.api.log_run_artifact(
            self,
            source,
            path_in_run,
            keep_the_source,
            allow_duplication,
            use_cache,
        )
        return self

    @pydantic.validate_arguments
    def log_model_version(
        self,
        model: ModelIdentifier,
        source: Path | Callable[[Path], None],
        path_in_run: str | None = None,
        keep_the_source: bool | None = None,
        allow_duplication: bool = False,
        use_cache: bool | None = None,
        version: str | None = None,
        tags: Mapping | None = None,
    ) -> ModelVersionApi:
        """Publish an artifact file or dir as a model version inside this experiment run.

        :param model: | Model name or object.

        :param source: | See :paramref:`~mlopus.mlflow.BaseMlflowApi.log_run_artifact.source`

        :param path_in_run: | See :paramref:`~mlopus.mlflow.BaseMlflowApi.log_model_version.path_in_run`

        :param keep_the_source: | See :paramref:`~mlopus.mlflow.BaseMlflowApi.log_run_artifact.keep_the_source`

        :param allow_duplication: | See :paramref:`~mlopus.mlflow.BaseMlflowApi.log_run_artifact.allow_duplication`

        :param use_cache: | See :paramref:`~mlopus.mlflow.BaseMlflowApi.log_run_artifact.use_cache`

        :param version: | See :paramref:`~mlopus.mlflow.BaseMlflowApi.log_model_version.version`

        :param tags: | Model version tags.
                     | See :class:`schema.ModelVersion.tags`

        :return: New model version metadata with API handle.
        """
        # Arguments are forwarded positionally; note the API takes the model before the run.
        new_version = self.api.log_model_version(
            model,
            self,
            source,
            path_in_run,
            keep_the_source,
            allow_duplication,
            use_cache,
            version,
            tags,
        )
        return typing.cast(ModelVersionApi, new_version)

    @pydantic.validate_arguments
    def find_model_versions(
        self, query: mongo.Query | None = None, sorting: mongo.Sorting | None = None
    ) -> Iterator[ModelVersionApi]:
        """Search the model versions belonging to this run, using MongoDB query language.

        :param query: Query in MongoDB query language.
        :param sorting: Sorting criteria (e.g.: `[("asc_field", 1), ("desc_field", -1)]`).
        """
        search = self.api.find_model_versions
        # Restrict the search to this run by setting the reserved `run.id` key on the query.
        scoped_query = dicts.set_reserved_key(query, key="run.id", val=self.id)
        found = search(scoped_query, sorting)
        return typing.cast(Iterator[ModelVersionApi], found)

    def cache_meta(self) -> "RunApi":
        """Fetch the latest metadata of this run and save it to cache."""
        fresh = self.api.cache_run_meta(self)
        return self._use_values_from(fresh)

    def export_meta(self, target: Path) -> "RunApi":
        """Export the metadata cache of this run.

        :param target: Cache export path.
        """
        fresh = self.api.export_run_meta(self, target)
        return self._use_values_from(fresh)

    @pydantic.validate_arguments
    def create_child(
        self,
        name: str | None = None,
        tags: Mapping | None = None,
        repo: str | urls.Url | None = None,
    ) -> "RunApi":
        """Declare a new child run of this run, to be used later.

        :param name: See :attr:`schema.Run.name`.
        :param tags: See :attr:`schema.Run.tags`.
        :param repo: See :paramref:`~mlopus.mlflow.BaseMlflowApi.create_run.repo`.
        """
        # The child is created in this run's experiment; this run is passed as the last argument.
        child = self.api.create_run(self.exp, name, tags, repo, self)
        return typing.cast(RunApi, child)

    @pydantic.validate_arguments
    def start_child(
        self,
        name: str | None = None,
        tags: Mapping | None = None,
        repo: str | urls.Url | None = None,
    ) -> "RunApi":
        """Start a new child run of this run.

        :param name: See :attr:`schema.Run.name`.
        :param tags: See :attr:`schema.Run.tags`.
        :param repo: See :paramref:`~mlopus.mlflow.BaseMlflowApi.create_run.repo`.
        """
        # The child is started in this run's experiment; this run is passed as the last argument.
        child = self.api.start_run(self.exp, name, tags, repo, self)
        return typing.cast(RunApi, child)

    @property
    def children(self) -> Iterator["RunApi"]:
        """Child runs of this run."""
        found = self.api.find_child_runs(parent=self)
        return typing.cast(Iterator[RunApi], found)

    def resume(self) -> "RunApi":
        """Resume this experiment run."""
        resumed = self.api.resume_run(self)
        return self._use_values_from(resumed)

    def end_run(self, succeeded: bool = True) -> "RunApi":
        """End this experiment run.

        :param succeeded: Flag forwarded to the API's `end_run`.
        """
        ended = self.api.end_run(self, succeeded)
        return self._use_values_from(ended)

    @decorators.require_update
    def set_tags(self, tags: Mapping) -> "RunApi":
        """Set tags on this run.

        :param tags: See :attr:`schema.Run.tags`.
        :return: This run.
        """
        self.api.set_tags_on_run(self, tags)
        return self

    @decorators.require_update
    def log_params(self, params: Mapping) -> "RunApi":
        """Log params to this run.

        :param params: See :attr:`schema.Run.params`.
        :return: This run.
        """
        self.api.log_params(self, params)
        return self

    @decorators.require_update
    def log_metrics(self, metrics: Mapping) -> "RunApi":
        """Log metrics to this experiment run.

        :param metrics: See :attr:`schema.Run.metrics`.
        :return: This run.
        """
        self.api.log_metrics(self, metrics)
        return self

277 """ 

278 self.api.log_metrics(self, metrics) 

279 return self