emdbg.debug.remote.rpyc
1# Copyright (c) 2015, Gallopsled et al. 2# Copyright (c) 2023, Auterion AG 3# SPDX-License-Identifier: MIT 4# Adapted from https://github.com/Gallopsled/pwntools 5 6from __future__ import annotations 7from threading import Event 8from contextlib import contextmanager 9import time 10from .gdb import Interface 11import logging 12LOGGER = logging.getLogger(__name__) 13from ...logger import VERBOSITY 14 15__all__ = ["Gdb"] 16 17class Breakpoint: 18 """Mirror of `gdb.Breakpoint` class. 19 20 See https://sourceware.org/gdb/onlinedocs/gdb/Breakpoints-In-Python.html 21 for more information. 22 """ 23 def __init__(self, conn, *args, **kwargs): 24 """Do not create instances of this class directly. 25 26 Use `Gdb.Breakpoint` instead. 27 """ 28 # Creates a real breakpoint and connects it with this mirror 29 self.conn = conn 30 self.server_breakpoint = conn.root.set_breakpoint( 31 self, hasattr(self, 'stop'), *args, **kwargs) 32 33 def __getattr__(self, item): 34 """Return attributes of the real breakpoint.""" 35 if item in ( 36 '____id_pack__', 37 '__name__', 38 '____conn__', 39 'stop', 40 ): 41 # Ignore RPyC netref attributes. 42 # Also, if stop() is not defined, hasattr() call in our 43 # __init__() will bring us here. Don't contact the 44 # server in this case either. 45 raise AttributeError() 46 return getattr(self.server_breakpoint, item) 47 48 def exposed_stop(self): 49 # Handle stop() call from the server. 50 return self.stop() 51 52 53class FinishBreakpoint: 54 """Mirror of `gdb.FinishBreakpoint` class. 55 56 See https://sourceware.org/gdb/onlinedocs/gdb/Finish-Breakpoints-in-Python.html 57 for more information. 58 """ 59 60 def __init__(self, conn, *args, **kwargs): 61 """Do not create instances of this class directly. 62 63 Use `Gdb.FinishBreakpoint` instead. 
64 """ 65 # Creates a real finish breakpoint and connects it with this mirror 66 self.conn = conn 67 self.server_breakpoint = conn.root.set_finish_breakpoint( 68 self, hasattr(self, 'stop'), hasattr(self, 'out_of_scope'), 69 *args, **kwargs) 70 71 def __getattr__(self, item): 72 """Return attributes of the real breakpoint.""" 73 if item in ( 74 '____id_pack__', 75 '__name__', 76 '____conn__', 77 'stop', 78 'out_of_scope', 79 ): 80 # Ignore RPyC netref attributes. 81 # Also, if stop() or out_of_scope() are not defined, hasattr() call 82 # in our __init__() will bring us here. Don't contact the 83 # server in this case either. 84 raise AttributeError() 85 return getattr(self.server_breakpoint, item) 86 87 def exposed_stop(self): 88 # Handle stop() call from the server. 89 return self.stop() 90 91 def exposed_out_of_scope(self): 92 # Handle out_of_scope() call from the server. 93 return self.out_of_scope() 94 95 96class Gdb(Interface): 97 """Mirror of `gdb` module. 98 This class uses a RPyC to communicate with the GDB subprocess and exchange 99 IPC messages about the state of the system. You can therefore access 100 everything that the GDB Python API can in this process and it gets 101 synchronized automatically. However, keep in mind that access may be 102 significantly slower than when using the Python API in GDB directly. 103 104 See the [GDB Python API documentation](https://sourceware.org/gdb/onlinedocs/gdb/Basic-Python.html). 105 """ 106 107 def __init__(self, conn, backend, process): 108 """Do not create instances of this class directly. 109 110 Use `emdbg.debug.gdb.call_rpyc()` instead. 
111 """ 112 super().__init__(backend) 113 self.conn = conn 114 self.process = process 115 self.type = "rpyc" 116 117 class _Breakpoint(Breakpoint): 118 def __init__(self, *args, **kwargs): 119 super().__init__(conn, *args, **kwargs) 120 class _FinishBreakpoint(FinishBreakpoint): 121 def __init__(self, *args, **kwargs): 122 super().__init__(conn, *args, **kwargs) 123 124 self.Breakpoint = _Breakpoint 125 """ 126 Mirror of [`gdb.Breakpoint` class](https://sourceware.org/gdb/onlinedocs/gdb/Breakpoints-In-Python.html). 127 """ 128 self.FinishBreakpoint = _FinishBreakpoint 129 """ 130 Mirror of [`gdb.FinishBreakpoint` class](https://sourceware.org/gdb/onlinedocs/gdb/Finish-Breakpoints-in-Python.html). 131 """ 132 self.stopped = Event() 133 134 def stop_handler(event): 135 self.stopped.set() 136 137 self.events.stop.connect(stop_handler) 138 139 def __getattr__(self, item): 140 return getattr(self.conn.root.gdb, item) 141 142 def wait(self): 143 """Wait until the program stops.""" 144 self.stopped.wait() 145 self.stopped.clear() 146 147 def interrupt_and_wait(self) -> bool: 148 if not self.interrupted: 149 self.execute("interrupt") 150 self.wait() 151 self.interrupted = True 152 return True 153 return False 154 155 def continue_nowait(self): 156 """Continue the program. Do not wait until it stops again.""" 157 if self.interrupted: 158 self.execute("continue&") 159 self.interrupted = False 160 time.sleep(0.2) 161 162 def continue_and_wait(self): 163 """Continue the program and wait until it stops again.""" 164 if self.interrupted: 165 self.execute("continue&") 166 self.interrupted = False 167 self.wait() 168 169 def quit(self): 170 self.interrupted = False 171 self.conn.root.quit() 172 173 def execute(self, cmd, timeout=1, to_string=False) -> str | None: 174 if VERBOSITY >= 3: LOGGER.debug(f"(gdb) {cmd}") 175 return self.conn.root.gdb.execute(cmd, to_string=to_string)
class Gdb(Interface):
    """Mirror of `gdb` module.

    This class uses RPyC to communicate with the GDB subprocess and exchange
    IPC messages about the state of the system. You can therefore access
    everything that the GDB Python API can in this process and it gets
    synchronized automatically. However, keep in mind that access may be
    significantly slower than when using the Python API in GDB directly.

    See the [GDB Python API documentation](https://sourceware.org/gdb/onlinedocs/gdb/Basic-Python.html).
    """

    def __init__(self, conn, backend, process):
        """Do not create instances of this class directly.

        Use `emdbg.debug.gdb.call_rpyc()` instead.
        """
        super().__init__(backend)
        self.conn = conn
        self.process = process
        self.type = "rpyc"

        # The mirror classes are bound to this connection through the closure
        # over `conn`, so callers only pass the breakpoint arguments.
        class _Breakpoint(Breakpoint):
            def __init__(self, *args, **kwargs):
                super().__init__(conn, *args, **kwargs)
        class _FinishBreakpoint(FinishBreakpoint):
            def __init__(self, *args, **kwargs):
                super().__init__(conn, *args, **kwargs)

        self.Breakpoint = _Breakpoint
        """
        Mirror of [`gdb.Breakpoint` class](https://sourceware.org/gdb/onlinedocs/gdb/Breakpoints-In-Python.html).
        """
        self.FinishBreakpoint = _FinishBreakpoint
        """
        Mirror of [`gdb.FinishBreakpoint` class](https://sourceware.org/gdb/onlinedocs/gdb/Finish-Breakpoints-in-Python.html).
        """
        self.stopped = Event()

        def stop_handler(event):
            self.stopped.set()

        # `self.events` is not a local attribute; it resolves through
        # __getattr__() to the remote `gdb.events`.
        self.events.stop.connect(stop_handler)

    def __getattr__(self, item):
        """Forward unknown attributes to the remote `gdb` module.

        Only called when normal attribute lookup fails.

        :raises AttributeError: if `conn` itself has not been assigned yet.
        """
        if item == "conn":
            # `conn` is missing from the instance dict; looking it up again
            # via self.conn would recurse until RecursionError.
            raise AttributeError(item)
        return getattr(self.conn.root.gdb, item)

    def wait(self):
        """Wait until the program stops."""
        self.stopped.wait()
        self.stopped.clear()

    def interrupt_and_wait(self) -> bool:
        """Interrupt the program and wait until it stops.

        :return: `True` if GDB has been interrupted, `False` if it was already
                 interrupted.
        """
        if not self.interrupted:
            self.execute("interrupt")
            self.wait()
            self.interrupted = True
            return True
        return False

    def continue_nowait(self):
        """Continue the program. Do not wait until it stops again."""
        if self.interrupted:
            self.execute("continue&")
            self.interrupted = False
            time.sleep(0.2)

    def continue_and_wait(self):
        """Continue the program and wait until it stops again."""
        if self.interrupted:
            self.execute("continue&")
            self.interrupted = False
        self.wait()

    def quit(self):
        """Clear the interrupted flag and call `quit()` on the RPyC root service."""
        self.interrupted = False
        self.conn.root.quit()

    def execute(self, cmd, timeout=1, to_string=False) -> str | None:
        """Run a command through the remote `gdb.execute()`.

        :param cmd: The command string to pass to `gdb.execute()`.
        :param timeout: Accepted but not used by this implementation; it is
                        not forwarded to the remote call.
        :param to_string: Forwarded to `gdb.execute()` as `to_string`.
        :return: Whatever the remote `gdb.execute()` call returns.
        """
        if VERBOSITY >= 3:
            LOGGER.debug(f"(gdb) {cmd}")
        return self.conn.root.gdb.execute(cmd, to_string=to_string)
Mirror of gdb
module.
This class uses RPyC to communicate with the GDB subprocess and exchange
IPC messages about the state of the system. You can therefore access
everything that the GDB Python API can in this process and it gets
synchronized automatically. However, keep in mind that access may be
significantly slower than when using the Python API in GDB directly.
See the GDB Python API documentation.
def __init__(self, conn, backend, process):
    """Do not create instances of this class directly.

    Use `emdbg.debug.gdb.call_rpyc()` instead.
    """
    super().__init__(backend)
    self.conn = conn
    self.process = process
    self.type = "rpyc"

    # Breakpoint mirrors pre-bound to this connection via the closure over
    # `conn`; callers supply only the breakpoint arguments.
    class _Breakpoint(Breakpoint):
        def __init__(self, *bp_args, **bp_kwargs):
            super().__init__(conn, *bp_args, **bp_kwargs)

    class _FinishBreakpoint(FinishBreakpoint):
        def __init__(self, *bp_args, **bp_kwargs):
            super().__init__(conn, *bp_args, **bp_kwargs)

    self.Breakpoint = _Breakpoint
    """
    Mirror of [`gdb.Breakpoint` class](https://sourceware.org/gdb/onlinedocs/gdb/Breakpoints-In-Python.html).
    """
    self.FinishBreakpoint = _FinishBreakpoint
    """
    Mirror of [`gdb.FinishBreakpoint` class](https://sourceware.org/gdb/onlinedocs/gdb/Finish-Breakpoints-in-Python.html).
    """

    # Set by the stop callback below; consumed and re-armed by wait().
    self.stopped = Event()

    def stop_handler(_event):
        self.stopped.set()

    stop_events = self.events.stop
    stop_events.connect(stop_handler)
Do not create instances of this class directly.
Use emdbg.debug.gdb.call_rpyc()
instead.
def wait(self):
    """Block until the stop event is set, then clear it again."""
    stop_event = self.stopped
    stop_event.wait()
    # Re-arm the event so a later wait() blocks until the event is set again.
    stop_event.clear()
Wait until the program stops.
def interrupt_and_wait(self) -> bool:
    """Interrupt the program and wait until it stops.

    :return: `True` if GDB has been interrupted, `False` if it was already
             interrupted.
    """
    if self.interrupted:
        # Already interrupted: nothing to send, nothing to wait for.
        return False

    self.execute("interrupt")
    self.wait()
    self.interrupted = True
    return True
Interrupt the program and wait until it stops.
Returns
True if GDB has been interrupted, False if it was already interrupted.
def continue_nowait(self):
    """Continue the program. Do not wait until it stops again."""
    if not self.interrupted:
        # The interrupted flag is not set, so no command is sent.
        return

    self.execute("continue&")
    self.interrupted = False
    # Fixed short pause after issuing the background continue.
    time.sleep(0.2)
Continue the program. Do not wait until it stops again.
def continue_and_wait(self):
    """Continue the program and wait until it stops again."""
    was_interrupted = self.interrupted
    if was_interrupted:
        self.execute("continue&")
        self.interrupted = False
    # The wait happens whether or not a continue command was sent.
    self.wait()
Continue the program and wait until it stops again.
def execute(self, cmd, timeout=1, to_string=False) -> str | None:
    """Run a command through the remote `gdb.execute()`.

    :param cmd: The command string to pass to `gdb.execute()`.
    :param timeout: Accepted but not used by this implementation; it is not
                    forwarded to the remote call.
    :param to_string: Forwarded to `gdb.execute()` as `to_string`.
    :return: Whatever the remote `gdb.execute()` call returns.
    """
    if VERBOSITY >= 3:
        LOGGER.debug(f"(gdb) {cmd}")
    remote_gdb = self.conn.root.gdb
    return remote_gdb.execute(cmd, to_string=to_string)
Executes a command on the GDB command prompt and waits for a response.
If to_string
is set, it will return the response as a string.
Parameters
- cmd: The command string to send to the GDB prompt.
- timeout: How long to wait for a response in seconds. Note: this RPyC implementation accepts the argument but does not forward it to GDB.
- to_string: Capture the response as a string and return it.