emdbg.patch.operation
1# Copyright (c) 2023, Auterion AG 2# SPDX-License-Identifier: BSD-3-Clause 3 4from __future__ import annotations 5import os 6import shutil 7import subprocess 8import hashlib 9from pathlib import Path 10from contextlib import contextmanager 11import logging 12LOGGER = logging.getLogger("patch:ops") 13 14def _repopath(path: str = None) -> Path: 15 return Path(__file__).parents[1] / (path or "") 16 17class OperationError(Exception): 18 """Base exception for patching-related errors.""" 19 pass 20 21# ----------------------------------------------------------------------------- 22class CopyOperation: 23 """ 24 Copies a file from the store to the repository and removes it again. 25 26 In case the destination location already exists, the file is stored in a 27 cache, then overwritten. On restore, the cached file is copied back 28 Note that you can set `src` to `None` if you want to remove a file inside 29 the repository. 30 """ 31 PATCH_STORE = _repopath(".patch_store") 32 33 def __init__(self, source: Path, destination: Path): 34 """ 35 :param source: File to copy into the repository. If set to `None`, the 36 destination file is simply removed. 37 :param destination: File to add or overwrite. In case, the file exists, 38 it is copied to the `.patch_store` folder from where 39 it is copied back when restoring. 40 """ 41 self._src = None 42 self._dst = Path(destination) 43 self._hash = str(self._dst.absolute()) 44 if source is not None: 45 self._src = Path(source) 46 self._hash += str(self._src.absolute()) 47 self._hash = hashlib.md5(self._hash.encode("utf-8")).hexdigest() 48 self._store = self.PATCH_STORE / str(self._hash) 49 self._dstrestore = self._store / self._dst.name 50 51 def test_do(self) -> bool: 52 return True 53 54 def test_undo(self) -> bool: 55 return True 56 57 def do(self) -> bool: 58 """ 59 Copies the `source` file to the `destination` location. 60 If the destination exists, it is copied to the `.patch_store`, before 61 getting overwritten by the source. 62 63 :return: `True` if applied successfully. 64 """ 65 if self._dst.exists(): 66 LOGGER.debug(f"Caching {self._dst} -> {self._dstrestore}") 67 self._store.mkdir(parents=True, exist_ok=True) 68 shutil.copy2(self._dst, self._dstrestore) 69 if self._src is not None: 70 LOGGER.debug(f"Copying {self._src} -> {self._dst}") 71 self._dst.parent.mkdir(parents=True, exist_ok=True) 72 shutil.copy2(self._src, self._dst) 73 return True 74 75 def undo(self) -> bool: 76 """ 77 Removes the source file and copies the original destination file if it 78 is found in the `.patch_store` folder. 79 80 :return: `True` if restored successfully. 81 """ 82 LOGGER.debug(f"Removing {self._dst}") 83 self._dst.unlink(missing_ok=True) 84 if self._dstrestore.exists(): 85 LOGGER.debug(f"Uncaching {self._dstrestore} -> {self._dst}") 86 shutil.copy2(self._dstrestore, self._dst) 87 self._dstrestore.unlink() 88 return True 89 90 def __repr__(self) -> str: 91 if self._src is None: 92 return f"Copy(Store <- {self._dst.name})" 93 if self._src.name == self._dst.name: 94 return f"Copy({self._dst.name})" 95 return f"Copy({self._src.name} -> {self._dst.name})" 96 97 98class PatchOperation: 99 """ 100 Applies a patch in unified diff format to a directory. 101 You can generate the patch from a git commit: `git format-patch -1 <sha>`. 102 However, there are many more methods that generate unified diffs. 103 """ 104 def __init__(self, root: Path, patch: Path): 105 """ 106 :param root: directory to relocate the relative paths inside the patch to. 107 :param patch: path to patch in unified diff format. 108 109 :raises ValueError: if patch does not exists, or it cannot be parsed. 110 """ 111 self._root = Path(root).absolute().resolve() 112 self._patch = Path(patch).absolute().resolve() 113 self._cmd = "patch --strip=1 --forward --ignore-whitespace --reject-file=- " \ 114 f"--posix --directory={self._root} --input={self._patch} --silent" 115 self._cmd_rev = self._cmd + " --reverse" 116 self._cmd_check = " --dry-run" 117 118 def test_do(self) -> bool: 119 cmd_check = self._cmd + self._cmd_check 120 if subprocess.run(cmd_check, shell=True).returncode: 121 LOGGER.debug(cmd_check) 122 return False 123 return True 124 125 def test_undo(self) -> bool: 126 cmd_check = self._cmd_rev + self._cmd_check 127 if subprocess.run(cmd_check, shell=True).returncode: 128 LOGGER.debug(cmd_check) 129 return False 130 return True 131 132 def do(self) -> bool: 133 """ 134 Applies the patch to the directory. 135 136 :return: `True` if applied successfully. 137 """ 138 LOGGER.debug(f"Applying patch {self._patch}") 139 if not self.test_do(): return False 140 LOGGER.debug(self._cmd) 141 return not subprocess.run(self._cmd, shell=True).returncode 142 143 def undo(self) -> bool: 144 """ 145 Applies the reversed patch to the directory. 146 147 :return: `True` if restored successfully. 148 """ 149 LOGGER.debug(f"Reverting patch {self._patch}") 150 if not self.test_undo(): return False 151 LOGGER.debug(self._cmd_rev) 152 return not subprocess.run(self._cmd_rev, shell=True).returncode 153 154 def __repr__(self) -> str: 155 return f"Patch({self._patch.name})" 156 157 158# ----------------------------------------------------------------------------- 159class PatchManager: 160 """ 161 Stores a number of named operations and applies them all. 162 """ 163 def __init__(self, name: str, ops: list[CopyOperation|PatchOperation]): 164 """ 165 :param name: Short, human-readable name of patch. 166 :param ops: list of operations. 167 """ 168 self.name = name 169 # make sure that PatchOperations happen first, since they can fail 170 self._ops = sorted(ops, key=lambda o: isinstance(o, CopyOperation)) 171 172 def do(self): 173 """ 174 Runs all operations to apply the patch. All operations are attempted 175 even if any of them fail. 176 177 :raises `OperationError`: if any of the operations failed. 178 """ 179 LOGGER.info(f"Applying '{self.name}'") 180 for op in self._ops: 181 if not op.test_do(): 182 raise OperationError(f"Patching failed: {op}") 183 for op in self._ops: 184 if not op.do(): 185 raise OperationError(f"Patching failed: {op}") 186 187 def undo(self): 188 """ 189 Runs all operations to apply the patch. All operations are attempted 190 even if any of them fail. 191 192 :raises `OperationError`: if any of the operations failed. 193 """ 194 LOGGER.info(f"Restoring '{self.name}'") 195 for op in self._ops: 196 if not op.test_undo(): 197 raise OperationError(f"Reverting failed: {op}") 198 for op in self._ops: 199 if not op.undo(): 200 raise OperationError(f"Reverting failed: {op}") 201 202 @contextmanager 203 def apply(self): 204 """ 205 Context manager version to apply and restore the patch within a scope. 206 207 ```py 208 # patch repository 209 with patch.apply(): 210 # run tests here 211 # restore repository 212 ``` 213 """ 214 try: 215 self.do() 216 yield 217 finally: 218 self.undo()
Base exception for patching-related errors.
23class CopyOperation: 24 """ 25 Copies a file from the store to the repository and removes it again. 26 27 In case the destination location already exists, the file is stored in a 28 cache, then overwritten. On restore, the cached file is copied back 29 Note that you can set `src` to `None` if you want to remove a file inside 30 the repository. 31 """ 32 PATCH_STORE = _repopath(".patch_store") 33 34 def __init__(self, source: Path, destination: Path): 35 """ 36 :param source: File to copy into the repository. If set to `None`, the 37 destination file is simply removed. 38 :param destination: File to add or overwrite. In case, the file exists, 39 it is copied to the `.patch_store` folder from where 40 it is copied back when restoring. 41 """ 42 self._src = None 43 self._dst = Path(destination) 44 self._hash = str(self._dst.absolute()) 45 if source is not None: 46 self._src = Path(source) 47 self._hash += str(self._src.absolute()) 48 self._hash = hashlib.md5(self._hash.encode("utf-8")).hexdigest() 49 self._store = self.PATCH_STORE / str(self._hash) 50 self._dstrestore = self._store / self._dst.name 51 52 def test_do(self) -> bool: 53 return True 54 55 def test_undo(self) -> bool: 56 return True 57 58 def do(self) -> bool: 59 """ 60 Copies the `source` file to the `destination` location. 61 If the destination exists, it is copied to the `.patch_store`, before 62 getting overwritten by the source. 63 64 :return: `True` if applied successfully. 65 """ 66 if self._dst.exists(): 67 LOGGER.debug(f"Caching {self._dst} -> {self._dstrestore}") 68 self._store.mkdir(parents=True, exist_ok=True) 69 shutil.copy2(self._dst, self._dstrestore) 70 if self._src is not None: 71 LOGGER.debug(f"Copying {self._src} -> {self._dst}") 72 self._dst.parent.mkdir(parents=True, exist_ok=True) 73 shutil.copy2(self._src, self._dst) 74 return True 75 76 def undo(self) -> bool: 77 """ 78 Removes the source file and copies the original destination file if it 79 is found in the `.patch_store` folder. 80 81 :return: `True` if restored successfully. 82 """ 83 LOGGER.debug(f"Removing {self._dst}") 84 self._dst.unlink(missing_ok=True) 85 if self._dstrestore.exists(): 86 LOGGER.debug(f"Uncaching {self._dstrestore} -> {self._dst}") 87 shutil.copy2(self._dstrestore, self._dst) 88 self._dstrestore.unlink() 89 return True 90 91 def __repr__(self) -> str: 92 if self._src is None: 93 return f"Copy(Store <- {self._dst.name})" 94 if self._src.name == self._dst.name: 95 return f"Copy({self._dst.name})" 96 return f"Copy({self._src.name} -> {self._dst.name})"
Copies a file from the store to the repository and removes it again.
In case the destination location already exists, the file is stored in a
cache, then overwritten. On restore, the cached file is copied back
Note that you can set src
to None
if you want to remove a file inside
the repository.
34 def __init__(self, source: Path, destination: Path): 35 """ 36 :param source: File to copy into the repository. If set to `None`, the 37 destination file is simply removed. 38 :param destination: File to add or overwrite. In case, the file exists, 39 it is copied to the `.patch_store` folder from where 40 it is copied back when restoring. 41 """ 42 self._src = None 43 self._dst = Path(destination) 44 self._hash = str(self._dst.absolute()) 45 if source is not None: 46 self._src = Path(source) 47 self._hash += str(self._src.absolute()) 48 self._hash = hashlib.md5(self._hash.encode("utf-8")).hexdigest() 49 self._store = self.PATCH_STORE / str(self._hash) 50 self._dstrestore = self._store / self._dst.name
Parameters
- source: File to copy into the repository. If set to
None
, the destination file is simply removed. - destination: File to add or overwrite. In case, the file exists,
it is copied to the
.patch_store
folder from where it is copied back when restoring.
58 def do(self) -> bool: 59 """ 60 Copies the `source` file to the `destination` location. 61 If the destination exists, it is copied to the `.patch_store`, before 62 getting overwritten by the source. 63 64 :return: `True` if applied successfully. 65 """ 66 if self._dst.exists(): 67 LOGGER.debug(f"Caching {self._dst} -> {self._dstrestore}") 68 self._store.mkdir(parents=True, exist_ok=True) 69 shutil.copy2(self._dst, self._dstrestore) 70 if self._src is not None: 71 LOGGER.debug(f"Copying {self._src} -> {self._dst}") 72 self._dst.parent.mkdir(parents=True, exist_ok=True) 73 shutil.copy2(self._src, self._dst) 74 return True
Copies the source
file to the destination
location.
If the destination exists, it is copied to the .patch_store
, before
getting overwritten by the source.
Returns
True
if applied successfully.
76 def undo(self) -> bool: 77 """ 78 Removes the source file and copies the original destination file if it 79 is found in the `.patch_store` folder. 80 81 :return: `True` if restored successfully. 82 """ 83 LOGGER.debug(f"Removing {self._dst}") 84 self._dst.unlink(missing_ok=True) 85 if self._dstrestore.exists(): 86 LOGGER.debug(f"Uncaching {self._dstrestore} -> {self._dst}") 87 shutil.copy2(self._dstrestore, self._dst) 88 self._dstrestore.unlink() 89 return True
Removes the source file and copies the original destination file if it
is found in the .patch_store
folder.
Returns
True
if restored successfully.
99class PatchOperation: 100 """ 101 Applies a patch in unified diff format to a directory. 102 You can generate the patch from a git commit: `git format-patch -1 <sha>`. 103 However, there are many more methods that generate unified diffs. 104 """ 105 def __init__(self, root: Path, patch: Path): 106 """ 107 :param root: directory to relocate the relative paths inside the patch to. 108 :param patch: path to patch in unified diff format. 109 110 :raises ValueError: if patch does not exists, or it cannot be parsed. 111 """ 112 self._root = Path(root).absolute().resolve() 113 self._patch = Path(patch).absolute().resolve() 114 self._cmd = "patch --strip=1 --forward --ignore-whitespace --reject-file=- " \ 115 f"--posix --directory={self._root} --input={self._patch} --silent" 116 self._cmd_rev = self._cmd + " --reverse" 117 self._cmd_check = " --dry-run" 118 119 def test_do(self) -> bool: 120 cmd_check = self._cmd + self._cmd_check 121 if subprocess.run(cmd_check, shell=True).returncode: 122 LOGGER.debug(cmd_check) 123 return False 124 return True 125 126 def test_undo(self) -> bool: 127 cmd_check = self._cmd_rev + self._cmd_check 128 if subprocess.run(cmd_check, shell=True).returncode: 129 LOGGER.debug(cmd_check) 130 return False 131 return True 132 133 def do(self) -> bool: 134 """ 135 Applies the patch to the directory. 136 137 :return: `True` if applied successfully. 138 """ 139 LOGGER.debug(f"Applying patch {self._patch}") 140 if not self.test_do(): return False 141 LOGGER.debug(self._cmd) 142 return not subprocess.run(self._cmd, shell=True).returncode 143 144 def undo(self) -> bool: 145 """ 146 Applies the reversed patch to the directory. 147 148 :return: `True` if restored successfully. 149 """ 150 LOGGER.debug(f"Reverting patch {self._patch}") 151 if not self.test_undo(): return False 152 LOGGER.debug(self._cmd_rev) 153 return not subprocess.run(self._cmd_rev, shell=True).returncode 154 155 def __repr__(self) -> str: 156 return f"Patch({self._patch.name})"
Applies a patch in unified diff format to a directory.
You can generate the patch from a git commit: git format-patch -1 <sha>
.
However, there are many more methods that generate unified diffs.
105 def __init__(self, root: Path, patch: Path): 106 """ 107 :param root: directory to relocate the relative paths inside the patch to. 108 :param patch: path to patch in unified diff format. 109 110 :raises ValueError: if patch does not exists, or it cannot be parsed. 111 """ 112 self._root = Path(root).absolute().resolve() 113 self._patch = Path(patch).absolute().resolve() 114 self._cmd = "patch --strip=1 --forward --ignore-whitespace --reject-file=- " \ 115 f"--posix --directory={self._root} --input={self._patch} --silent" 116 self._cmd_rev = self._cmd + " --reverse" 117 self._cmd_check = " --dry-run"
Parameters
- root: directory to relocate the relative paths inside the patch to.
- patch: path to patch in unified diff format.
Raises
- ValueError: if patch does not exists, or it cannot be parsed.
133 def do(self) -> bool: 134 """ 135 Applies the patch to the directory. 136 137 :return: `True` if applied successfully. 138 """ 139 LOGGER.debug(f"Applying patch {self._patch}") 140 if not self.test_do(): return False 141 LOGGER.debug(self._cmd) 142 return not subprocess.run(self._cmd, shell=True).returncode
Applies the patch to the directory.
Returns
True
if applied successfully.
144 def undo(self) -> bool: 145 """ 146 Applies the reversed patch to the directory. 147 148 :return: `True` if restored successfully. 149 """ 150 LOGGER.debug(f"Reverting patch {self._patch}") 151 if not self.test_undo(): return False 152 LOGGER.debug(self._cmd_rev) 153 return not subprocess.run(self._cmd_rev, shell=True).returncode
Applies the reversed patch to the directory.
Returns
True
if restored successfully.
160class PatchManager: 161 """ 162 Stores a number of named operations and applies them all. 163 """ 164 def __init__(self, name: str, ops: list[CopyOperation|PatchOperation]): 165 """ 166 :param name: Short, human-readable name of patch. 167 :param ops: list of operations. 168 """ 169 self.name = name 170 # make sure that PatchOperations happen first, since they can fail 171 self._ops = sorted(ops, key=lambda o: isinstance(o, CopyOperation)) 172 173 def do(self): 174 """ 175 Runs all operations to apply the patch. All operations are attempted 176 even if any of them fail. 177 178 :raises `OperationError`: if any of the operations failed. 179 """ 180 LOGGER.info(f"Applying '{self.name}'") 181 for op in self._ops: 182 if not op.test_do(): 183 raise OperationError(f"Patching failed: {op}") 184 for op in self._ops: 185 if not op.do(): 186 raise OperationError(f"Patching failed: {op}") 187 188 def undo(self): 189 """ 190 Runs all operations to apply the patch. All operations are attempted 191 even if any of them fail. 192 193 :raises `OperationError`: if any of the operations failed. 194 """ 195 LOGGER.info(f"Restoring '{self.name}'") 196 for op in self._ops: 197 if not op.test_undo(): 198 raise OperationError(f"Reverting failed: {op}") 199 for op in self._ops: 200 if not op.undo(): 201 raise OperationError(f"Reverting failed: {op}") 202 203 @contextmanager 204 def apply(self): 205 """ 206 Context manager version to apply and restore the patch within a scope. 207 208 ```py 209 # patch repository 210 with patch.apply(): 211 # run tests here 212 # restore repository 213 ``` 214 """ 215 try: 216 self.do() 217 yield 218 finally: 219 self.undo()
Stores a number of named operations and applies them all.
164 def __init__(self, name: str, ops: list[CopyOperation|PatchOperation]): 165 """ 166 :param name: Short, human-readable name of patch. 167 :param ops: list of operations. 168 """ 169 self.name = name 170 # make sure that PatchOperations happen first, since they can fail 171 self._ops = sorted(ops, key=lambda o: isinstance(o, CopyOperation))
Parameters
- name: Short, human-readable name of patch.
- ops: list of operations.
173 def do(self): 174 """ 175 Runs all operations to apply the patch. All operations are attempted 176 even if any of them fail. 177 178 :raises `OperationError`: if any of the operations failed. 179 """ 180 LOGGER.info(f"Applying '{self.name}'") 181 for op in self._ops: 182 if not op.test_do(): 183 raise OperationError(f"Patching failed: {op}") 184 for op in self._ops: 185 if not op.do(): 186 raise OperationError(f"Patching failed: {op}")
Runs all operations to apply the patch. All operations are attempted even if any of them fail.
Raises
OperationError
: if any of the operations failed.
188 def undo(self): 189 """ 190 Runs all operations to apply the patch. All operations are attempted 191 even if any of them fail. 192 193 :raises `OperationError`: if any of the operations failed. 194 """ 195 LOGGER.info(f"Restoring '{self.name}'") 196 for op in self._ops: 197 if not op.test_undo(): 198 raise OperationError(f"Reverting failed: {op}") 199 for op in self._ops: 200 if not op.undo(): 201 raise OperationError(f"Reverting failed: {op}")
Runs all operations to apply the patch. All operations are attempted even if any of them fail.
Raises
OperationError
: if any of the operations failed.
203 @contextmanager 204 def apply(self): 205 """ 206 Context manager version to apply and restore the patch within a scope. 207 208 ```py 209 # patch repository 210 with patch.apply(): 211 # run tests here 212 # restore repository 213 ``` 214 """ 215 try: 216 self.do() 217 yield 218 finally: 219 self.undo()
Context manager version to apply and restore the patch within a scope.
# patch repository
with patch.apply():
# run tests here
# restore repository