emdbg.debug.remote.rpyc

  1# Copyright (c) 2015, Gallopsled et al.
  2# Copyright (c) 2023, Auterion AG
  3# SPDX-License-Identifier: MIT
  4# Adapted from https://github.com/Gallopsled/pwntools
  5
  6from __future__ import annotations
  7from threading import Event
  8from contextlib import contextmanager
  9import time
 10from .gdb import Interface
 11import logging
 12LOGGER = logging.getLogger(__name__)
 13from ...logger import VERBOSITY
 14
 15__all__ = ["Gdb"]
 16
 17class Breakpoint:
 18    """Mirror of `gdb.Breakpoint` class.
 19
 20    See https://sourceware.org/gdb/onlinedocs/gdb/Breakpoints-In-Python.html
 21    for more information.
 22    """
 23    def __init__(self, conn, *args, **kwargs):
 24        """Do not create instances of this class directly.
 25
 26        Use `Gdb.Breakpoint` instead.
 27        """
 28        # Creates a real breakpoint and connects it with this mirror
 29        self.conn = conn
 30        self.server_breakpoint = conn.root.set_breakpoint(
 31            self, hasattr(self, 'stop'), *args, **kwargs)
 32
 33    def __getattr__(self, item):
 34        """Return attributes of the real breakpoint."""
 35        if item in (
 36                '____id_pack__',
 37                '__name__',
 38                '____conn__',
 39                'stop',
 40        ):
 41            # Ignore RPyC netref attributes.
 42            # Also, if stop() is not defined, hasattr() call in our
 43            # __init__() will bring us here. Don't contact the
 44            # server in this case either.
 45            raise AttributeError()
 46        return getattr(self.server_breakpoint, item)
 47
 48    def exposed_stop(self):
 49        # Handle stop() call from the server.
 50        return self.stop()
 51
 52
 53class FinishBreakpoint:
 54    """Mirror of `gdb.FinishBreakpoint` class.
 55
 56    See https://sourceware.org/gdb/onlinedocs/gdb/Finish-Breakpoints-in-Python.html
 57    for more information.
 58    """
 59
 60    def __init__(self, conn, *args, **kwargs):
 61        """Do not create instances of this class directly.
 62
 63        Use `Gdb.FinishBreakpoint` instead.
 64        """
 65        # Creates a real finish breakpoint and connects it with this mirror
 66        self.conn = conn
 67        self.server_breakpoint = conn.root.set_finish_breakpoint(
 68            self, hasattr(self, 'stop'), hasattr(self, 'out_of_scope'),
 69            *args, **kwargs)
 70
 71    def __getattr__(self, item):
 72        """Return attributes of the real breakpoint."""
 73        if item in (
 74                '____id_pack__',
 75                '__name__',
 76                '____conn__',
 77                'stop',
 78                'out_of_scope',
 79        ):
 80            # Ignore RPyC netref attributes.
 81            # Also, if stop() or out_of_scope() are not defined, hasattr() call
 82            # in our __init__() will bring us here. Don't contact the
 83            # server in this case either.
 84            raise AttributeError()
 85        return getattr(self.server_breakpoint, item)
 86
 87    def exposed_stop(self):
 88        # Handle stop() call from the server.
 89        return self.stop()
 90
 91    def exposed_out_of_scope(self):
 92        # Handle out_of_scope() call from the server.
 93        return self.out_of_scope()
 94
 95
 96class Gdb(Interface):
 97    """Mirror of `gdb` module.
 98    This class uses a RPyC to communicate with the GDB subprocess and exchange
 99    IPC messages about the state of the system. You can therefore access
100    everything that the GDB Python API can in this process and it gets
101    synchronized automatically. However, keep in mind that access may be
102    significantly slower than when using the Python API in GDB directly.
103
104    See the [GDB Python API documentation](https://sourceware.org/gdb/onlinedocs/gdb/Basic-Python.html).
105    """
106
107    def __init__(self, conn, backend, process):
108        """Do not create instances of this class directly.
109
110        Use `emdbg.debug.gdb.call_rpyc()` instead.
111        """
112        super().__init__(backend)
113        self.conn = conn
114        self.process = process
115        self.type = "rpyc"
116
117        class _Breakpoint(Breakpoint):
118            def __init__(self, *args, **kwargs):
119                super().__init__(conn, *args, **kwargs)
120        class _FinishBreakpoint(FinishBreakpoint):
121            def __init__(self, *args, **kwargs):
122                super().__init__(conn, *args, **kwargs)
123
124        self.Breakpoint = _Breakpoint
125        """
126        Mirror of [`gdb.Breakpoint` class](https://sourceware.org/gdb/onlinedocs/gdb/Breakpoints-In-Python.html).
127        """
128        self.FinishBreakpoint = _FinishBreakpoint
129        """
130        Mirror of [`gdb.FinishBreakpoint` class](https://sourceware.org/gdb/onlinedocs/gdb/Finish-Breakpoints-in-Python.html).
131        """
132        self.stopped = Event()
133
134        def stop_handler(event):
135            self.stopped.set()
136
137        self.events.stop.connect(stop_handler)
138
139    def __getattr__(self, item):
140        return getattr(self.conn.root.gdb, item)
141
142    def wait(self):
143        """Wait until the program stops."""
144        self.stopped.wait()
145        self.stopped.clear()
146
147    def interrupt_and_wait(self) -> bool:
148        if not self.interrupted:
149            self.execute("interrupt")
150            self.wait()
151            self.interrupted = True
152            return True
153        return False
154
155    def continue_nowait(self):
156        """Continue the program. Do not wait until it stops again."""
157        if self.interrupted:
158            self.execute("continue&")
159            self.interrupted = False
160        time.sleep(0.2)
161
162    def continue_and_wait(self):
163        """Continue the program and wait until it stops again."""
164        if self.interrupted:
165            self.execute("continue&")
166            self.interrupted = False
167        self.wait()
168
169    def quit(self):
170        self.interrupted = False
171        self.conn.root.quit()
172
173    def execute(self, cmd, timeout=1, to_string=False) -> str | None:
174        if VERBOSITY >= 3: LOGGER.debug(f"(gdb) {cmd}")
175        return self.conn.root.gdb.execute(cmd, to_string=to_string)
class Gdb(emdbg.debug.remote.gdb.Interface):
 97class Gdb(Interface):
 98    """Mirror of `gdb` module.
 99    This class uses a RPyC to communicate with the GDB subprocess and exchange
100    IPC messages about the state of the system. You can therefore access
101    everything that the GDB Python API can in this process and it gets
102    synchronized automatically. However, keep in mind that access may be
103    significantly slower than when using the Python API in GDB directly.
104
105    See the [GDB Python API documentation](https://sourceware.org/gdb/onlinedocs/gdb/Basic-Python.html).
106    """
107
108    def __init__(self, conn, backend, process):
109        """Do not create instances of this class directly.
110
111        Use `emdbg.debug.gdb.call_rpyc()` instead.
112        """
113        super().__init__(backend)
114        self.conn = conn
115        self.process = process
116        self.type = "rpyc"
117
118        class _Breakpoint(Breakpoint):
119            def __init__(self, *args, **kwargs):
120                super().__init__(conn, *args, **kwargs)
121        class _FinishBreakpoint(FinishBreakpoint):
122            def __init__(self, *args, **kwargs):
123                super().__init__(conn, *args, **kwargs)
124
125        self.Breakpoint = _Breakpoint
126        """
127        Mirror of [`gdb.Breakpoint` class](https://sourceware.org/gdb/onlinedocs/gdb/Breakpoints-In-Python.html).
128        """
129        self.FinishBreakpoint = _FinishBreakpoint
130        """
131        Mirror of [`gdb.FinishBreakpoint` class](https://sourceware.org/gdb/onlinedocs/gdb/Finish-Breakpoints-in-Python.html).
132        """
133        self.stopped = Event()
134
135        def stop_handler(event):
136            self.stopped.set()
137
138        self.events.stop.connect(stop_handler)
139
140    def __getattr__(self, item):
141        return getattr(self.conn.root.gdb, item)
142
143    def wait(self):
144        """Wait until the program stops."""
145        self.stopped.wait()
146        self.stopped.clear()
147
148    def interrupt_and_wait(self) -> bool:
149        if not self.interrupted:
150            self.execute("interrupt")
151            self.wait()
152            self.interrupted = True
153            return True
154        return False
155
156    def continue_nowait(self):
157        """Continue the program. Do not wait until it stops again."""
158        if self.interrupted:
159            self.execute("continue&")
160            self.interrupted = False
161        time.sleep(0.2)
162
163    def continue_and_wait(self):
164        """Continue the program and wait until it stops again."""
165        if self.interrupted:
166            self.execute("continue&")
167            self.interrupted = False
168        self.wait()
169
170    def quit(self):
171        self.interrupted = False
172        self.conn.root.quit()
173
174    def execute(self, cmd, timeout=1, to_string=False) -> str | None:
175        if VERBOSITY >= 3: LOGGER.debug(f"(gdb) {cmd}")
176        return self.conn.root.gdb.execute(cmd, to_string=to_string)

Mirror of gdb module. This class uses a RPyC to communicate with the GDB subprocess and exchange IPC messages about the state of the system. You can therefore access everything that the GDB Python API can in this process and it gets synchronized automatically. However, keep in mind that access may be significantly slower than when using the Python API in GDB directly.

See the GDB Python API documentation.

Gdb(conn, backend, process)
108    def __init__(self, conn, backend, process):
109        """Do not create instances of this class directly.
110
111        Use `emdbg.debug.gdb.call_rpyc()` instead.
112        """
113        super().__init__(backend)
114        self.conn = conn
115        self.process = process
116        self.type = "rpyc"
117
118        class _Breakpoint(Breakpoint):
119            def __init__(self, *args, **kwargs):
120                super().__init__(conn, *args, **kwargs)
121        class _FinishBreakpoint(FinishBreakpoint):
122            def __init__(self, *args, **kwargs):
123                super().__init__(conn, *args, **kwargs)
124
125        self.Breakpoint = _Breakpoint
126        """
127        Mirror of [`gdb.Breakpoint` class](https://sourceware.org/gdb/onlinedocs/gdb/Breakpoints-In-Python.html).
128        """
129        self.FinishBreakpoint = _FinishBreakpoint
130        """
131        Mirror of [`gdb.FinishBreakpoint` class](https://sourceware.org/gdb/onlinedocs/gdb/Finish-Breakpoints-in-Python.html).
132        """
133        self.stopped = Event()
134
135        def stop_handler(event):
136            self.stopped.set()
137
138        self.events.stop.connect(stop_handler)

Do not create instances of this class directly.

Use emdbg.debug.gdb.call_rpyc() instead.

conn
process
type
Breakpoint
FinishBreakpoint
stopped
def wait(self):
143    def wait(self):
144        """Wait until the program stops."""
145        self.stopped.wait()
146        self.stopped.clear()

Wait until the program stops.

def interrupt_and_wait(self) -> bool:
148    def interrupt_and_wait(self) -> bool:
149        if not self.interrupted:
150            self.execute("interrupt")
151            self.wait()
152            self.interrupted = True
153            return True
154        return False

Interrupt the program and wait until it stops.

Returns

True if GDB has been interrupted, False if it was already interrupted.

def continue_nowait(self):
156    def continue_nowait(self):
157        """Continue the program. Do not wait until it stops again."""
158        if self.interrupted:
159            self.execute("continue&")
160            self.interrupted = False
161        time.sleep(0.2)

Continue the program. Do not wait until it stops again.

def continue_and_wait(self):
163    def continue_and_wait(self):
164        """Continue the program and wait until it stops again."""
165        if self.interrupted:
166            self.execute("continue&")
167            self.interrupted = False
168        self.wait()

Continue the program and wait until it stops again.

def quit(self):
170    def quit(self):
171        self.interrupted = False
172        self.conn.root.quit()

Terminate GDB.

def execute(self, cmd, timeout=1, to_string=False) -> str | None:
174    def execute(self, cmd, timeout=1, to_string=False) -> str | None:
175        if VERBOSITY >= 3: LOGGER.debug(f"(gdb) {cmd}")
176        return self.conn.root.gdb.execute(cmd, to_string=to_string)

Executes a command on the GDB command prompt and waits for a response. If to_string is set, it will return the response as a string.

Parameters
  • command: The command string to send to the GDB prompt.
  • timeout: How long to wait for a response in seconds.
  • to_string: Capture the response as a string and return it.