Coverage for src/mlopus/utils/paths.py: 89%

90 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2025-07-13 14:49 +0000

1import contextlib 

2import os 

3import shutil 

4from pathlib import Path 

5from typing import Literal, Iterable, TypeVar 

6 

7T = TypeVar("T") 

8 

9PathLike = Path | str 

10 

11PathOperation = Literal["copy", "move", "link"] 

12 

13 

14class IllegalPath(Exception): 

15 """Generic exception for illegal paths.""" 

16 

17 

18class Mode: 

19 """File permission modes.""" 

20 

21 rwx = 0o770 

22 r_x = 0o550 

23 

24 

25def is_sub_dir(x: PathLike, y: PathLike) -> bool: 

26 """Tell if `x` is subdirectory of `y`.""" 

27 return Path(x).expanduser().resolve().is_relative_to(Path(y).expanduser().resolve()) 

28 

29 

30def is_cwd(path: PathLike) -> bool: 

31 """Tell whether the path points to the current working directory.""" 

32 return Path(path).expanduser().resolve() == Path.cwd() 

33 

34 

35def ensure_is_dir(path: PathLike, force: bool = False) -> Path: 

36 """Ensure that specified path exists and is a directory.""" 

37 if (path := Path(path)).is_file() or is_broken_link(path): 

38 if force: 

39 path.unlink() 

40 raise NotADirectoryError(path) 

41 

42 path.mkdir(exist_ok=True, parents=True) 

43 return path 

44 

45 

46def is_broken_link(path: PathLike): 

47 """Detect broken symlink.""" 

48 return not (path := Path(path)).exists() and path.is_symlink() 

49 

50 

51def ensure_non_existing(path: PathLike, force: bool = False) -> Path: 

52 """Ensure that specified path doesn't exist yet, but its parents do.""" 

53 if (path := Path(path)).is_dir(): 

54 if not force: 

55 raise IsADirectoryError(path) 

56 elif path.is_symlink(): 

57 path.unlink() 

58 else: 

59 shutil.rmtree(path) 

60 elif path.is_file() or is_broken_link(path): 

61 if not force: 

62 raise FileExistsError(path) 

63 path.unlink() 

64 

65 return path 

66 

67 

68def ensure_empty_dir(path: PathLike, force: bool = False) -> Path: 

69 """Ensure that specified path is an empty directory.""" 

70 if (path := Path(path)).is_file() or is_broken_link(path) or (path.is_dir() and os.listdir(path)): 

71 ensure_non_existing(path, force) 

72 path.mkdir(parents=True, exist_ok=True) 

73 return path 

74 

75 

76def ensure_only_parents(path: PathLike, force: bool = False) -> Path: 

77 """Ensure that specified path doesn't exist yet, but its parents do.""" 

78 ensure_non_existing(path := Path(path), force) 

79 path.parent.mkdir(parents=True, exist_ok=True) 

80 return path 

81 

82 

83def is_rel_link(path: PathLike) -> bool: 

84 """Tell if path is relative symbolic link.""" 

85 return (path := Path(path)).is_symlink() and not Path(os.readlink(path)).is_absolute() 

86 

87 

88def iter_links(path: PathLike) -> Iterable[Path]: 

89 """Find and iterate symbolic links.""" 

90 if is_rel_link(path := Path(path)): 

91 yield path 

92 if path.is_dir(): 

93 for dirpath, dirnames, filenames in os.walk(path): 

94 for child in dirnames + filenames: 

95 if (link := Path(dirpath).joinpath(child)).is_symlink(): 

96 yield link 

97 

98 

99def place_path(src: PathLike, tgt: PathLike, mode: PathOperation, overwrite: bool = False, move_abs_links: bool = True): 

100 """Place source file or dir on target using selected operation.""" 

101 src = Path(src) 

102 ensure_only_parents(tgt := Path(tgt), force=overwrite) 

103 

104 if mode == "move": 

105 for link in iter_links(src): 

106 if (rel := is_rel_link(link)) or not move_abs_links: 

107 raise RuntimeError( 

108 f"Cannot move path '{src}' containing {'relative ' if rel else ''}symbolic link: {link}" 

109 ) 

110 src.rename(tgt) 

111 elif mode == "copy": 

112 shutil.copytree(src, tgt, symlinks=True) if src.is_dir() else shutil.copy(src, tgt, follow_symlinks=True) 

113 elif mode == "link": 

114 tgt.symlink_to(src.expanduser().resolve()) 

115 else: 

116 raise NotImplementedError(f"mode='{mode}'") 

117 

118 

119def chmod(path: PathLike, mode: int): 

120 """Apply chmod to file or directory.""" 

121 if (path := Path(path)).is_dir(): 

122 rchmod(path, mode) 

123 else: 

124 path.chmod(mode) 

125 

126 

127def rchmod(path: PathLike, mode: int): 

128 """Apply recursive chmod to directory.""" 

129 if not (path := Path(path)).is_dir(): 

130 raise NotADirectoryError(path) 

131 

132 path.chmod(mode) 

133 

134 for dirpath, dirnames, filenames in os.walk(path): 

135 for child in dirnames + filenames: 

136 Path(dirpath).joinpath(child).chmod(mode) 

137 

138 

139@contextlib.contextmanager 

140def dir_lock(path: Path): 

141 """Directory is unlocked inside this context, then protected against modifications on closure.""" 

142 try: 

143 rchmod(path, Mode.rwx) if path.exists() else None 

144 yield path 

145 finally: 

146 rchmod(path, Mode.r_x) if path.exists() else None 

147 

148 

149def iter_files(path: PathLike) -> Iterable[Path]: 

150 """Recursively iterate files in dir yielding their paths relative to that dir.""" 

151 for subdir, _, files in os.walk(path): 

152 for file in files: 

153 yield Path(subdir) / file