emdbg.serial.protocol

  1# Copyright (c) 2023, Auterion AG
  2# SPDX-License-Identifier: BSD-3-Clause
  3
  4from __future__ import annotations
  5import re
  6import time
  7import logging
  8from contextlib import contextmanager
  9from pathlib import Path
 10from serial import Serial
 11from serial.threaded import ReaderThread, Protocol
 12from .utils import find_serial_port, ansi_escape
 13from ..utils import add_datetime as add_dt
 14
 15_LOGGER = logging.getLogger("serial:nsh")
 16
 17
 18class _CmdReader(Protocol):
 19    def __init__(self, device: Serial):
 20        super().__init__()
 21        self.stream = ""
 22        self.device = device
 23        self._data_received = lambda _: None
 24
 25    def data_received(self, data: bytes):
 26        new_stream = data.decode("utf-8", errors="replace")
 27        self._data_received(new_stream)
 28        self.stream += new_stream
 29
 30    def read_packets(self, separator: str) -> list[str]:
 31        if separator not in self.stream:
 32            return []
 33        *packets, self.stream = self.stream.split(separator)
 34        return packets
 35
 36    def clear_input(self):
 37        self.device.reset_input_buffer()
 38        self.stream = ""
 39
 40    def clear_output(self):
 41        self.device.reset_output_buffer()
 42
 43    def clear(self):
 44        self.clear_output()
 45        self.clear_input()
 46
 47
 48class CommandPrompt:
 49    """
 50    Manages a command prompt, in particular, receiving data in the background
 51    and logging it out to the INFO logger.
 52    Several convenience methods allow you to send a command and receive its
 53    response, or wait for a certain pattern to arrive in the stream.
 54    """
 55    _TIMEOUT = 3
 56
 57    def __init__(self, reader_thread: ReaderThread, protocol: _CmdReader,
 58                 prompt: str = None, newline: str = None):
 59        """
 60        Use the `nsh` context manager to build this class correctly.
 61
 62        :param reader_thread: The background reader thread.
 63        :param protocol: The command prompt protocol.
 64        :param prompt: Optional prefix of the command prompt (default empty string).
 65        :param newline: The newline characters used in the prompt (default `\\r\\n`).
 66        """
 67        self._serial = protocol
 68        self._reader_thread = reader_thread
 69        self._serial._data_received = self._print
 70        self._print_data = ""
 71        self._prompt = "" if prompt is None else prompt
 72        self._newline = "\r\n" if newline is None else newline
 73        self.filter_ansi_escapes = True
 74        """Filter ANSI escape codes from the output."""
 75        self._logfile = None
 76        self.clear()
 77
 78    def _write_line(self, line):
 79        self._reader_thread.write((line + "\n").encode("utf-8"))
 80
 81    def _print(self, data: str):
 82        self._print_data += data
 83        if self._newline in self._print_data:
 84            *lines, self._print_data = self._print_data.split(self._newline)
 85            for line in self._filter(lines):
 86                _LOGGER.debug(line)
 87                if self._logfile is not None:
 88                    self._logfile.write(line + "\n")
 89
 90    def _filter(self, lines):
 91        if self.filter_ansi_escapes:
 92            lines = list(map(ansi_escape, lines))
 93        return lines
 94
 95    def _read_packets(self, separator: str, timeout: float = _TIMEOUT) -> list[str]:
 96        start = time.time()
 97        while True:
 98            if packets := self._serial.read_packets(separator):
 99                return packets
100            if time.time() - start > timeout:
101                break
102            time.sleep(0.1)
103        return []
104
105    def _join(self, lines: list[str]) -> str | None:
106        return self._newline.join(lines) if lines else None
107
108    def clear(self):
109        """Clear the receive and transmit buffers."""
110        self._serial.clear()
111        self._print_data = ""
112
113    def log_to_file(self, filename: Path | str, add_datetime: bool = False) -> Path:
114        """
115        Log the received data to `filename` or close the log file.
116
117        :param filename: file to log into. If `None`, the file is closed and
118                         logging is disabled.
119        :param add_datetime: appends the date and time to the filename.
120                             See `emdbg.utils.add_datetime`
121
122        :return: the actual filename with date and time (if selected).
123        """
124        if self._logfile is not None:
125            self._logfile.close()
126
127        if filename is None:
128            self._logfile = None
129            return None
130
131        filename = add_dt(filename) if add_datetime else Path(filename)
132        self._logfile = filename.open("wt")
133        return filename
134
135    def read_lines(self, timeout: float = _TIMEOUT) -> str | None:
136        """
137        Return any lines received within `timeout`.
138        Note that any ANSI escape codes (for color or cursor position) are
139        filtered out.
140
141        :param timeout: seconds to wait until new lines arrive.
142
143        :return: received lines or None on timeout
144        """
145        lines = self._filter(self._read_packets(self._newline, timeout))
146        return self._join(lines)
147
148    def wait_for(self, pattern: str, timeout: float = _TIMEOUT) -> str | None:
149        """
150        Waits for a regex pattern to appear in a line in the stream.
151        This function reads any new received lines and searches for `pattern` in
152        every line. If the line matches, all lines are returned.
153        Note that any ANSI escape codes (for color or cursor position) are
154        filtered out.
155
156        :param pattern: regex pattern to search for via `re.search`. To match
157                        line beginnings and ends you must use `^pattern$`.
158        :param timeout: seconds to wait until new lines arrive.
159
160        :return: received lines until matched pattern or None on timeout
161        """
162        lines = ""
163        start = time.time()
164        while True:
165            if time.time() - start > timeout:
166                break
167            if (new_lines := self.read_lines(0)) is not None:
168                lines += new_lines
169                if re.search(pattern, new_lines):
170                    return lines
171            time.sleep(0.1)
172        _LOGGER.warning(f"Waiting for '{pattern}' timed out after {timeout:.1f}s!")
173        return None
174
175    def wait_for_prompt(self, timeout: float = _TIMEOUT) -> list[str]:
176        """
177        Waits to the prompt to arrive in the stream.
178        Note that any ANSI escape codes (for color or cursor position) are
179        filtered out.
180
181        :param timeout: seconds to wait until the prompt arrives.
182
183        :return: all lines until the prompt arrives.
184        """
185        if prompts := self._read_packets(self._newline + self._prompt, timeout):
186            prompt = self._prompt + self._prompt.join(prompts)
187            return self._join(self._filter(prompt.split(self._newline)))
188        _LOGGER.warning(f"Waiting for '{self._prompt}' prompt timed out after {timeout:.1f}s!")
189        return None
190
191    def command(self, command: str, timeout: float = _TIMEOUT) -> str | None:
192        """
193        Send a command and return all lines until the next prompt.
194        If the command is asynchronous, you need to poll for new lines separately.
195        Note that any ANSI escape codes (for color or cursor position) are
196        filtered out.
197
198        :param command: command string to send to the command prompt.
199        :param timeout: seconds to wait until the prompt arrives.
200
201        :return: all lines from the command issue until the next prompt arrives.
202        """
203        self._serial.clear()
204        self._write_line(command)
205        if timeout is not None:
206            return self.wait_for_prompt(timeout)
207
208    def command_nowait(self, command: str):
209        """
210        Send a command to the command prompt without waiting for a response.
211
212        :param command: command string to send to the command prompt.
213        """
214        self.command(command, None)
215
216    def reboot(self, timeout: int = 15) -> str | None:
217        """
218        Send the reboot command and wait for the reboot to be completed.
219
220        :param timeout: seconds to wait until the prompt arrives.
221
222        :return: all lines from reboot until the next prompt.
223        """
224        return self.command("reboot", timeout)
225
226    def is_alive(self, timeout: float = _TIMEOUT, attempts: int = 4) -> bool:
227        """
228        Check if the command prompt is responding to newline inputs with a prompt.
229        The total timeout is `attempts * timeout`!
230
231        :param timeout: seconds to wait until the prompt arrives.
232        :param attempts: number of times to send a newline and wait.
233        :return: `True` is command prompt responds, `False` otherwise
234        """
235        self._serial.clear()
236        attempt = 0
237        timeout = timeout / attempts
238        while attempt < attempts:
239            self._write_line("")
240            if self.wait_for_prompt(timeout) is not None:
241                return True
242            attempt += 1
243        return False
244
245
246# -----------------------------------------------------------------------------
@contextmanager
def cmd(serial_or_port: str, baudrate: int = 115200, prompt: str = None, newline: str = None):
    """
    Context manager that connects a `CommandPrompt` to a serial port.

    :param serial_or_port: the serial number or the filepath of the port to
                           connect to. A value containing a `/` is used as
                           the filepath as-is.
    :param baudrate: the baudrate to use.
    :param prompt: Optional prefix of the command prompt.
    :param newline: The newline characters used in the prompt.

    :raises `SerialException`: if serial port is not found.

    :return: yields an initialized `CommandPrompt` object.
    """
    session = None
    is_path = "/" in serial_or_port
    port_path = serial_or_port if is_path else find_serial_port(serial_or_port).device
    try:
        if serial_or_port:
            _LOGGER.info(f"Starting on port '{serial_or_port}'...")
        else:
            _LOGGER.info("Starting...")
        device = Serial(port_path, baudrate=baudrate)
        reader_thread = ReaderThread(device, lambda: _CmdReader(device))
        with reader_thread as reader:
            session = CommandPrompt(reader_thread, reader, prompt, newline)
            yield session
    finally:
        # Close the log file, if one was opened through the prompt.
        if session is not None:
            session.log_to_file(None)
        _LOGGER.debug("Stopping.")
278
279
@contextmanager
def nsh(serial_or_port: str, baudrate: int = 57600):
    """
    Like `cmd()`, but preconfigured with the `nsh> ` prompt for use with PX4
    and a default baudrate of 57600.
    """
    with cmd(serial_or_port, baudrate, "nsh> ") as shell:
        yield shell
287
288
289# -----------------------------------------------------------------------------
290# We need to monkey patch the ReaderThread.run() function to prevent a
291# "device not ready" error to abort the reader thread.
def _patched_run(self):
    """
    Replacement for `ReaderThread.run()`.

    Reads from the serial port and passes the data to the protocol until the
    thread is stopped, the port is closed, or an error occurs. A
    `SerialException` whose message contains "readiness" is ignored and the
    read is retried, instead of ending the thread.
    """
    from serial import SerialException
    # Short read timeout, so the loop condition is re-checked regularly.
    self.serial.timeout = 0.1
    self.protocol = self.protocol_factory()
    try:
        self.protocol.connection_made(self)
    except Exception as e:
        # The protocol rejected the connection: report the error to it and
        # set the event before giving up.
        self.alive = False
        self.protocol.connection_lost(e)
        self._connection_made.set()
        return
    error = None
    self._connection_made.set()
    while self.alive and self.serial.is_open:
        try:
            # Read everything that is already waiting, but at least one byte.
            data = self.serial.read(self.serial.in_waiting or 1)
        except SerialException as e:
            if self.alive and "readiness" in str(e):
                # This is the error the patch exists for: retry the read.
                continue
            error = e
            break
        else:
            if data:
                try:
                    self.protocol.data_received(data)
                except Exception as e:
                    # An error inside the protocol also ends the thread.
                    error = e
                    break
    self.alive = False
    # `error` is None if the loop ended without an exception.
    self.protocol.connection_lost(error)
    self.protocol = None


# Install the patched method on the class.
ReaderThread.run = _patched_run
class CommandPrompt:
 49class CommandPrompt:
 50    """
 51    Manages a command prompt, in particular, receiving data in the background
 52    and logging it out to the INFO logger.
 53    Several convenience methods allow you to send a command and receive its
 54    response, or wait for a certain pattern to arrive in the stream.
 55    """
 56    _TIMEOUT = 3
 57
 58    def __init__(self, reader_thread: ReaderThread, protocol: _CmdReader,
 59                 prompt: str = None, newline: str = None):
 60        """
 61        Use the `nsh` context manager to build this class correctly.
 62
 63        :param reader_thread: The background reader thread.
 64        :param protocol: The command prompt protocol.
 65        :param prompt: Optional prefix of the command prompt (default empty string).
 66        :param newline: The newline characters used in the prompt (default `\\r\\n`).
 67        """
 68        self._serial = protocol
 69        self._reader_thread = reader_thread
 70        self._serial._data_received = self._print
 71        self._print_data = ""
 72        self._prompt = "" if prompt is None else prompt
 73        self._newline = "\r\n" if newline is None else newline
 74        self.filter_ansi_escapes = True
 75        """Filter ANSI escape codes from the output."""
 76        self._logfile = None
 77        self.clear()
 78
 79    def _write_line(self, line):
 80        self._reader_thread.write((line + "\n").encode("utf-8"))
 81
 82    def _print(self, data: str):
 83        self._print_data += data
 84        if self._newline in self._print_data:
 85            *lines, self._print_data = self._print_data.split(self._newline)
 86            for line in self._filter(lines):
 87                _LOGGER.debug(line)
 88                if self._logfile is not None:
 89                    self._logfile.write(line + "\n")
 90
 91    def _filter(self, lines):
 92        if self.filter_ansi_escapes:
 93            lines = list(map(ansi_escape, lines))
 94        return lines
 95
 96    def _read_packets(self, separator: str, timeout: float = _TIMEOUT) -> list[str]:
 97        start = time.time()
 98        while True:
 99            if packets := self._serial.read_packets(separator):
100                return packets
101            if time.time() - start > timeout:
102                break
103            time.sleep(0.1)
104        return []
105
106    def _join(self, lines: list[str]) -> str | None:
107        return self._newline.join(lines) if lines else None
108
109    def clear(self):
110        """Clear the receive and transmit buffers."""
111        self._serial.clear()
112        self._print_data = ""
113
114    def log_to_file(self, filename: Path | str, add_datetime: bool = False) -> Path:
115        """
116        Log the received data to `filename` or close the log file.
117
118        :param filename: file to log into. If `None`, the file is closed and
119                         logging is disabled.
120        :param add_datetime: appends the date and time to the filename.
121                             See `emdbg.utils.add_datetime`
122
123        :return: the actual filename with date and time (if selected).
124        """
125        if self._logfile is not None:
126            self._logfile.close()
127
128        if filename is None:
129            self._logfile = None
130            return None
131
132        filename = add_dt(filename) if add_datetime else Path(filename)
133        self._logfile = filename.open("wt")
134        return filename
135
136    def read_lines(self, timeout: float = _TIMEOUT) -> str | None:
137        """
138        Return any lines received within `timeout`.
139        Note that any ANSI escape codes (for color or cursor position) are
140        filtered out.
141
142        :param timeout: seconds to wait until new lines arrive.
143
144        :return: received lines or None on timeout
145        """
146        lines = self._filter(self._read_packets(self._newline, timeout))
147        return self._join(lines)
148
149    def wait_for(self, pattern: str, timeout: float = _TIMEOUT) -> str | None:
150        """
151        Waits for a regex pattern to appear in a line in the stream.
152        This function reads any new received lines and searches for `pattern` in
153        every line. If the line matches, all lines are returned.
154        Note that any ANSI escape codes (for color or cursor position) are
155        filtered out.
156
157        :param pattern: regex pattern to search for via `re.search`. To match
158                        line beginnings and ends you must use `^pattern$`.
159        :param timeout: seconds to wait until new lines arrive.
160
161        :return: received lines until matched pattern or None on timeout
162        """
163        lines = ""
164        start = time.time()
165        while True:
166            if time.time() - start > timeout:
167                break
168            if (new_lines := self.read_lines(0)) is not None:
169                lines += new_lines
170                if re.search(pattern, new_lines):
171                    return lines
172            time.sleep(0.1)
173        _LOGGER.warning(f"Waiting for '{pattern}' timed out after {timeout:.1f}s!")
174        return None
175
176    def wait_for_prompt(self, timeout: float = _TIMEOUT) -> list[str]:
177        """
178        Waits to the prompt to arrive in the stream.
179        Note that any ANSI escape codes (for color or cursor position) are
180        filtered out.
181
182        :param timeout: seconds to wait until the prompt arrives.
183
184        :return: all lines until the prompt arrives.
185        """
186        if prompts := self._read_packets(self._newline + self._prompt, timeout):
187            prompt = self._prompt + self._prompt.join(prompts)
188            return self._join(self._filter(prompt.split(self._newline)))
189        _LOGGER.warning(f"Waiting for '{self._prompt}' prompt timed out after {timeout:.1f}s!")
190        return None
191
192    def command(self, command: str, timeout: float = _TIMEOUT) -> str | None:
193        """
194        Send a command and return all lines until the next prompt.
195        If the command is asynchronous, you need to poll for new lines separately.
196        Note that any ANSI escape codes (for color or cursor position) are
197        filtered out.
198
199        :param command: command string to send to the command prompt.
200        :param timeout: seconds to wait until the prompt arrives.
201
202        :return: all lines from the command issue until the next prompt arrives.
203        """
204        self._serial.clear()
205        self._write_line(command)
206        if timeout is not None:
207            return self.wait_for_prompt(timeout)
208
209    def command_nowait(self, command: str):
210        """
211        Send a command to the command prompt without waiting for a response.
212
213        :param command: command string to send to the command prompt.
214        """
215        self.command(command, None)
216
217    def reboot(self, timeout: int = 15) -> str | None:
218        """
219        Send the reboot command and wait for the reboot to be completed.
220
221        :param timeout: seconds to wait until the prompt arrives.
222
223        :return: all lines from reboot until the next prompt.
224        """
225        return self.command("reboot", timeout)
226
227    def is_alive(self, timeout: float = _TIMEOUT, attempts: int = 4) -> bool:
228        """
229        Check if the command prompt is responding to newline inputs with a prompt.
230        The total timeout is `attempts * timeout`!
231
232        :param timeout: seconds to wait until the prompt arrives.
233        :param attempts: number of times to send a newline and wait.
234        :return: `True` is command prompt responds, `False` otherwise
235        """
236        self._serial.clear()
237        attempt = 0
238        timeout = timeout / attempts
239        while attempt < attempts:
240            self._write_line("")
241            if self.wait_for_prompt(timeout) is not None:
242                return True
243            attempt += 1
244        return False

Manages a command prompt, in particular, receiving data in the background and logging it out to the DEBUG logger. Several convenience methods allow you to send a command and receive its response, or wait for a certain pattern to arrive in the stream.

CommandPrompt( reader_thread: serial.threaded.ReaderThread, protocol: emdbg.serial.protocol._CmdReader, prompt: str = None, newline: str = None)
58    def __init__(self, reader_thread: ReaderThread, protocol: _CmdReader,
59                 prompt: str = None, newline: str = None):
60        """
61        Use the `nsh` context manager to build this class correctly.
62
63        :param reader_thread: The background reader thread.
64        :param protocol: The command prompt protocol.
65        :param prompt: Optional prefix of the command prompt (default empty string).
66        :param newline: The newline characters used in the prompt (default `\\r\\n`).
67        """
68        self._serial = protocol
69        self._reader_thread = reader_thread
70        self._serial._data_received = self._print
71        self._print_data = ""
72        self._prompt = "" if prompt is None else prompt
73        self._newline = "\r\n" if newline is None else newline
74        self.filter_ansi_escapes = True
75        """Filter ANSI escape codes from the output."""
76        self._logfile = None
77        self.clear()

Use the nsh context manager to build this class correctly.

Parameters
  • reader_thread: The background reader thread.
  • protocol: The command prompt protocol.
  • prompt: Optional prefix of the command prompt (default empty string).
  • newline: The newline characters used in the prompt (default \r\n).
filter_ansi_escapes

Filter ANSI escape codes from the output.

def clear(self):
109    def clear(self):
110        """Clear the receive and transmit buffers."""
111        self._serial.clear()
112        self._print_data = ""

Clear the receive and transmit buffers.

def log_to_file( self, filename: pathlib.Path | str, add_datetime: bool = False) -> pathlib.Path:
114    def log_to_file(self, filename: Path | str, add_datetime: bool = False) -> Path:
115        """
116        Log the received data to `filename` or close the log file.
117
118        :param filename: file to log into. If `None`, the file is closed and
119                         logging is disabled.
120        :param add_datetime: appends the date and time to the filename.
121                             See `emdbg.utils.add_datetime`
122
123        :return: the actual filename with date and time (if selected).
124        """
125        if self._logfile is not None:
126            self._logfile.close()
127
128        if filename is None:
129            self._logfile = None
130            return None
131
132        filename = add_dt(filename) if add_datetime else Path(filename)
133        self._logfile = filename.open("wt")
134        return filename

Log the received data to filename or close the log file.

Parameters
  • filename: file to log into. If None, the file is closed and logging is disabled.
  • add_datetime: appends the date and time to the filename. See emdbg.utils.add_datetime
Returns

the actual filename with date and time (if selected).

def read_lines(self, timeout: float = 3) -> str | None:
136    def read_lines(self, timeout: float = _TIMEOUT) -> str | None:
137        """
138        Return any lines received within `timeout`.
139        Note that any ANSI escape codes (for color or cursor position) are
140        filtered out.
141
142        :param timeout: seconds to wait until new lines arrive.
143
144        :return: received lines or None on timeout
145        """
146        lines = self._filter(self._read_packets(self._newline, timeout))
147        return self._join(lines)

Return any lines received within timeout. Note that any ANSI escape codes (for color or cursor position) are filtered out.

Parameters
  • timeout: seconds to wait until new lines arrive.
Returns

received lines or None on timeout

def wait_for(self, pattern: str, timeout: float = 3) -> str | None:
149    def wait_for(self, pattern: str, timeout: float = _TIMEOUT) -> str | None:
150        """
151        Waits for a regex pattern to appear in a line in the stream.
152        This function reads any new received lines and searches for `pattern` in
153        every line. If the line matches, all lines are returned.
154        Note that any ANSI escape codes (for color or cursor position) are
155        filtered out.
156
157        :param pattern: regex pattern to search for via `re.search`. To match
158                        line beginnings and ends you must use `^pattern$`.
159        :param timeout: seconds to wait until new lines arrive.
160
161        :return: received lines until matched pattern or None on timeout
162        """
163        lines = ""
164        start = time.time()
165        while True:
166            if time.time() - start > timeout:
167                break
168            if (new_lines := self.read_lines(0)) is not None:
169                lines += new_lines
170                if re.search(pattern, new_lines):
171                    return lines
172            time.sleep(0.1)
173        _LOGGER.warning(f"Waiting for '{pattern}' timed out after {timeout:.1f}s!")
174        return None

Waits for a regex pattern to appear in a line in the stream. This function reads any new received lines and searches for pattern in every line. If the line matches, all lines are returned. Note that any ANSI escape codes (for color or cursor position) are filtered out.

Parameters
  • pattern: regex pattern to search for via re.search. To match line beginnings and ends you must use ^pattern$.
  • timeout: seconds to wait until new lines arrive.
Returns

received lines until matched pattern or None on timeout

def wait_for_prompt(self, timeout: float = 3) -> str | None:
176    def wait_for_prompt(self, timeout: float = _TIMEOUT) -> list[str]:
177        """
178        Waits to the prompt to arrive in the stream.
179        Note that any ANSI escape codes (for color or cursor position) are
180        filtered out.
181
182        :param timeout: seconds to wait until the prompt arrives.
183
184        :return: all lines until the prompt arrives.
185        """
186        if prompts := self._read_packets(self._newline + self._prompt, timeout):
187            prompt = self._prompt + self._prompt.join(prompts)
188            return self._join(self._filter(prompt.split(self._newline)))
189        _LOGGER.warning(f"Waiting for '{self._prompt}' prompt timed out after {timeout:.1f}s!")
190        return None

Waits for the prompt to arrive in the stream. Note that any ANSI escape codes (for color or cursor position) are filtered out.

Parameters
  • timeout: seconds to wait until the prompt arrives.
Returns

all lines until the prompt arrives.

def command(self, command: str, timeout: float = 3) -> str | None:
192    def command(self, command: str, timeout: float = _TIMEOUT) -> str | None:
193        """
194        Send a command and return all lines until the next prompt.
195        If the command is asynchronous, you need to poll for new lines separately.
196        Note that any ANSI escape codes (for color or cursor position) are
197        filtered out.
198
199        :param command: command string to send to the command prompt.
200        :param timeout: seconds to wait until the prompt arrives.
201
202        :return: all lines from the command issue until the next prompt arrives.
203        """
204        self._serial.clear()
205        self._write_line(command)
206        if timeout is not None:
207            return self.wait_for_prompt(timeout)

Send a command and return all lines until the next prompt. If the command is asynchronous, you need to poll for new lines separately. Note that any ANSI escape codes (for color or cursor position) are filtered out.

Parameters
  • command: command string to send to the command prompt.
  • timeout: seconds to wait until the prompt arrives.
Returns

all lines from the command issue until the next prompt arrives.

def command_nowait(self, command: str):
209    def command_nowait(self, command: str):
210        """
211        Send a command to the command prompt without waiting for a response.
212
213        :param command: command string to send to the command prompt.
214        """
215        self.command(command, None)

Send a command to the command prompt without waiting for a response.

Parameters
  • command: command string to send to the command prompt.
def reboot(self, timeout: int = 15) -> str | None:
217    def reboot(self, timeout: int = 15) -> str | None:
218        """
219        Send the reboot command and wait for the reboot to be completed.
220
221        :param timeout: seconds to wait until the prompt arrives.
222
223        :return: all lines from reboot until the next prompt.
224        """
225        return self.command("reboot", timeout)

Send the reboot command and wait for the reboot to be completed.

Parameters
  • timeout: seconds to wait until the prompt arrives.
Returns

all lines from reboot until the next prompt.

def is_alive(self, timeout: float = 3, attempts: int = 4) -> bool:
227    def is_alive(self, timeout: float = _TIMEOUT, attempts: int = 4) -> bool:
228        """
229        Check if the command prompt is responding to newline inputs with a prompt.
230        The total timeout is `attempts * timeout`!
231
232        :param timeout: seconds to wait until the prompt arrives.
233        :param attempts: number of times to send a newline and wait.
234        :return: `True` is command prompt responds, `False` otherwise
235        """
236        self._serial.clear()
237        attempt = 0
238        timeout = timeout / attempts
239        while attempt < attempts:
240            self._write_line("")
241            if self.wait_for_prompt(timeout) is not None:
242                return True
243            attempt += 1
244        return False

Check if the command prompt is responding to newline inputs with a prompt. The total timeout is timeout: each attempt waits timeout / attempts.

Parameters
  • timeout: seconds to wait until the prompt arrives.
  • attempts: number of times to send a newline and wait.
Returns

True if the command prompt responds, False otherwise

@contextmanager
def cmd( serial_or_port: str, baudrate: int = 115200, prompt: str = None, newline: str = None):
@contextmanager
def cmd(serial_or_port: str, baudrate: int = 115200, prompt: str = None, newline: str = None):
    """
    Context manager that connects a `CommandPrompt` to a serial port.

    :param serial_or_port: the serial number or the filepath of the port to
                           connect to. A value containing a `/` is used as
                           the filepath as-is.
    :param baudrate: the baudrate to use.
    :param prompt: Optional prefix of the command prompt.
    :param newline: The newline characters used in the prompt.

    :raises `SerialException`: if serial port is not found.

    :return: yields an initialized `CommandPrompt` object.
    """
    session = None
    is_path = "/" in serial_or_port
    port_path = serial_or_port if is_path else find_serial_port(serial_or_port).device
    try:
        if serial_or_port:
            _LOGGER.info(f"Starting on port '{serial_or_port}'...")
        else:
            _LOGGER.info("Starting...")
        device = Serial(port_path, baudrate=baudrate)
        reader_thread = ReaderThread(device, lambda: _CmdReader(device))
        with reader_thread as reader:
            session = CommandPrompt(reader_thread, reader, prompt, newline)
            yield session
    finally:
        # Close the log file, if one was opened through the prompt.
        if session is not None:
            session.log_to_file(None)
        _LOGGER.debug("Stopping.")

Opens a serial port with the serial number or filepath and closes it again.

Parameters
  • serial_or_port: the serial number or the filepath of the port to connect to.
  • baudrate: the baudrate to use.
  • prompt: Optional prefix of the command prompt.
  • newline: The newline characters used in the prompt.
Raises
  • SerialException: if serial port is not found.
Returns

yields an initialized CommandPrompt object.

@contextmanager
def nsh(serial_or_port: str, baudrate: int = 57600):
@contextmanager
def nsh(serial_or_port: str, baudrate: int = 57600):
    """
    Like `cmd()`, but preconfigured with the `nsh> ` prompt for use with PX4
    and a default baudrate of 57600.
    """
    with cmd(serial_or_port, baudrate, "nsh> ") as shell:
        yield shell

Same as cmd() but with a nsh> prompt for use with PX4.