Coverage for src/mlopus/mlflow/api/common/serde.py: 100%
16 statements
« prev ^ index » next coverage.py v7.6.1, created at 2025-07-13 14:49 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2025-07-13 14:49 +0000
1from pathlib import Path
2from typing import Type, TypeVar
4from mlopus.utils import pydantic, json_utils
6T = TypeVar("T", bound=pydantic.BaseModel)
9class EntitySerializer(pydantic.BaseModel):
10 """Base (de)serializer for metadata entities."""
12 @classmethod
13 def read(cls, type_: Type[T], data: str) -> T:
14 return type_.parse_obj(json_utils.loads(data))
16 @classmethod
17 def dump(cls, data: T) -> str:
18 return json_utils.dumps(data.dict())
20 def load(self, type_: Type[T], path: Path) -> T:
21 return self.read(type_, data=path.read_text())
23 def save(self, data: T, path: Path):
24 path.parent.mkdir(exist_ok=True, parents=True)
25 path.write_text(self.dump(data))