emdbg.serial.protocol
# Copyright (c) 2023, Auterion AG
# SPDX-License-Identifier: BSD-3-Clause

from __future__ import annotations
import codecs
import re
import time
import logging
import threading
from contextlib import contextmanager
from pathlib import Path
from serial import Serial
from serial.threaded import ReaderThread, Protocol
from .utils import find_serial_port, ansi_escape
from ..utils import add_datetime as add_dt

_LOGGER = logging.getLogger("serial:nsh")


class _CmdReader(Protocol):
    """
    Reader protocol that decodes incoming serial bytes into a text buffer and
    forwards every decoded chunk to an optional callback.
    """
    def __init__(self, device: Serial):
        """
        :param device: the serial device whose buffers are reset on `clear()`.
        """
        super().__init__()
        self.stream = ""
        self.device = device
        self._data_received = lambda _: None
        # `data_received()` runs on the reader thread while `read_packets()`
        # and `clear_input()` are called from the user's thread: guard the
        # read-modify-write cycles on `stream` so no chunk gets lost.
        self._lock = threading.Lock()
        # Incremental decoding keeps multi-byte UTF-8 sequences intact when
        # they are split across two reads.
        self._decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

    def data_received(self, data: bytes):
        """Decode `data`, forward it to the callback and append it to the stream."""
        with self._lock:
            new_stream = self._decoder.decode(data)
        self._data_received(new_stream)
        with self._lock:
            self.stream += new_stream

    def read_packets(self, separator: str) -> list[str]:
        """
        Split the buffered stream at `separator`.

        :return: all complete packets, or an empty list if `separator` has not
                 been received yet. The incomplete remainder stays buffered.
        """
        with self._lock:
            if separator not in self.stream:
                return []
            *packets, self.stream = self.stream.split(separator)
        return packets

    def clear_input(self):
        """Discard the device input buffer and the buffered stream."""
        self.device.reset_input_buffer()
        with self._lock:
            self.stream = ""
            self._decoder.reset()

    def clear_output(self):
        """Discard the device output buffer."""
        self.device.reset_output_buffer()

    def clear(self):
        """Discard the output buffer, then the input buffer and stream."""
        self.clear_output()
        self.clear_input()


class CommandPrompt:
    """
    Manages a command prompt, in particular, receiving data in the background
    and logging it out to the DEBUG logger.
    Several convenience methods allow you to send a command and receive its
    response, or wait for a certain pattern to arrive in the stream.
    """
    _TIMEOUT = 3

    def __init__(self, reader_thread: ReaderThread, protocol: _CmdReader,
                 prompt: str | None = None, newline: str | None = None):
        """
        Use the `cmd` or `nsh` context manager to build this class correctly.

        :param reader_thread: The background reader thread.
        :param protocol: The command prompt protocol.
        :param prompt: Optional prefix of the command prompt (default empty string).
        :param newline: The newline characters used in the prompt (default `\\r\\n`).
        """
        self._serial = protocol
        self._reader_thread = reader_thread
        self._print_data = ""
        self._prompt = "" if prompt is None else prompt
        self._newline = "\r\n" if newline is None else newline
        self.filter_ansi_escapes = True
        """Filter ANSI escape codes from the output."""
        self._logfile = None
        # The reader thread may already be delivering data: only hook up the
        # callback once every attribute used by `_print()` exists.
        self._serial._data_received = self._print
        self.clear()

    def _write_line(self, line):
        """Send `line` followed by a line feed, encoded as UTF-8."""
        self._reader_thread.write((line + "\n").encode("utf-8"))

    def _print(self, data: str):
        """Log every completed line to the DEBUG logger and the log file."""
        self._print_data += data
        if self._newline not in self._print_data:
            return
        # The trailing, still incomplete line is kept for the next call.
        *lines, self._print_data = self._print_data.split(self._newline)
        for line in self._filter(lines):
            _LOGGER.debug(line)
            if self._logfile is not None:
                self._logfile.write(line + "\n")

    def _filter(self, lines):
        """Strip ANSI escape codes from `lines` if filtering is enabled."""
        if self.filter_ansi_escapes:
            lines = list(map(ansi_escape, lines))
        return lines

    def _read_packets(self, separator: str, timeout: float = _TIMEOUT) -> list[str]:
        """
        Poll the protocol every 100ms for packets terminated by `separator`.

        :return: the packets, or an empty list if none arrived within `timeout`.
        """
        # A monotonic clock is immune to wall clock adjustments.
        start = time.monotonic()
        while True:
            if packets := self._serial.read_packets(separator):
                return packets
            if time.monotonic() - start > timeout:
                return []
            time.sleep(0.1)

    def _join(self, lines: list[str]) -> str | None:
        """Join `lines` with the newline, or return `None` for no lines."""
        return self._newline.join(lines) if lines else None

    def clear(self):
        """Clear the receive and transmit buffers."""
        self._serial.clear()
        self._print_data = ""

    def log_to_file(self, filename: Path | str | None,
                    add_datetime: bool = False) -> Path | None:
        """
        Log the received data to `filename` or close the log file.

        :param filename: file to log into. If `None`, the file is closed and
                         logging is disabled.
        :param add_datetime: appends the date and time to the filename.
                             See `emdbg.utils.add_datetime`

        :return: the actual filename with date and time (if selected), or
                 `None` if logging was disabled.
        """
        if self._logfile is not None:
            # Detach first, so a failing `open()` below never leaves a closed
            # file behind for `_print()` to write into.
            logfile, self._logfile = self._logfile, None
            logfile.close()

        if filename is None:
            return None

        filename = add_dt(filename) if add_datetime else Path(filename)
        # The stream is decoded as UTF-8 and may contain U+FFFD replacement
        # characters, which not every locale encoding can represent.
        self._logfile = filename.open("wt", encoding="utf-8")
        return filename

    def read_lines(self, timeout: float = _TIMEOUT) -> str | None:
        """
        Return any lines received within `timeout`.
        Note that any ANSI escape codes (for color or cursor position) are
        filtered out.

        :param timeout: seconds to wait until new lines arrive.

        :return: received lines or None on timeout
        """
        lines = self._filter(self._read_packets(self._newline, timeout))
        return self._join(lines)

    def wait_for(self, pattern: str, timeout: float = _TIMEOUT) -> str | None:
        """
        Waits for a regex pattern to appear in a line in the stream.
        This function reads any new received lines and searches for `pattern` in
        every line. If the line matches, all lines are returned.
        Note that any ANSI escape codes (for color or cursor position) are
        filtered out.

        :param pattern: regex pattern to search for via `re.search`. To match
                        line beginnings and ends you must use `^pattern$`.
        :param timeout: seconds to wait until new lines arrive.

        :return: received lines until matched pattern or None on timeout
        """
        regex = re.compile(pattern)
        received: list[str] = []
        deadline = time.monotonic() + timeout
        while True:
            # Read before checking the deadline so that data which arrived
            # during the last sleep is still considered.
            batch = self._filter(self._read_packets(self._newline, 0))
            if batch:
                received.extend(batch)
                # Search each line on its own (so `^`/`$` anchor to a line)
                # and the joined batch (for patterns spanning several lines).
                if (any(map(regex.search, batch)) or
                        regex.search(self._newline.join(batch))):
                    return self._join(received)
            if time.monotonic() > deadline:
                break
            time.sleep(0.1)
        _LOGGER.warning(f"Waiting for '{pattern}' timed out after {timeout:.1f}s!")
        return None

    def wait_for_prompt(self, timeout: float = _TIMEOUT) -> str | None:
        """
        Waits for the prompt to arrive in the stream.
        Note that any ANSI escape codes (for color or cursor position) are
        filtered out.

        :param timeout: seconds to wait until the prompt arrives.

        :return: all lines until the prompt arrives or None on timeout.
        """
        separator = self._newline + self._prompt
        if packets := self._read_packets(separator, timeout):
            # Re-insert the full separator between packets, otherwise the
            # line break in front of every intermediate prompt is lost.
            text = self._prompt + separator.join(packets)
            return self._join(self._filter(text.split(self._newline)))
        _LOGGER.warning(f"Waiting for '{self._prompt}' prompt timed out after {timeout:.1f}s!")
        return None

    def command(self, command: str, timeout: float | None = _TIMEOUT) -> str | None:
        """
        Send a command and return all lines until the next prompt.
        If the command is asynchronous, you need to poll for new lines separately.
        Note that any ANSI escape codes (for color or cursor position) are
        filtered out.

        :param command: command string to send to the command prompt.
        :param timeout: seconds to wait until the prompt arrives. If `None`,
                        the command is sent without waiting for the prompt.

        :return: all lines from the command issue until the next prompt arrives,
                 or None on timeout or if `timeout` is `None`.
        """
        self._serial.clear()
        self._write_line(command)
        if timeout is None:
            return None
        return self.wait_for_prompt(timeout)

    def command_nowait(self, command: str):
        """
        Send a command to the command prompt without waiting for a response.

        :param command: command string to send to the command prompt.
        """
        self.command(command, None)

    def reboot(self, timeout: int = 15) -> str | None:
        """
        Send the reboot command and wait for the reboot to be completed.

        :param timeout: seconds to wait until the prompt arrives.

        :return: all lines from reboot until the next prompt.
        """
        return self.command("reboot", timeout)

    def is_alive(self, timeout: float = _TIMEOUT, attempts: int = 4) -> bool:
        """
        Check if the command prompt is responding to newline inputs with a prompt.
        The total wait is at most `timeout`: every attempt waits for
        `timeout / attempts` seconds.

        :param timeout: total seconds to wait for a prompt across all attempts.
        :param attempts: number of times to send a newline and wait.
        :return: `True` if the command prompt responds, `False` otherwise
        """
        self._serial.clear()
        if attempts <= 0:
            # Nothing to try; also avoids dividing by zero below.
            return False
        per_attempt = timeout / attempts
        for _ in range(attempts):
            self._write_line("")
            if self.wait_for_prompt(per_attempt) is not None:
                return True
        return False


# -----------------------------------------------------------------------------
@contextmanager
def cmd(serial_or_port: str, baudrate: int = 115200,
        prompt: str | None = None, newline: str | None = None):
    """
    Opens a serial port with the `serial` number or filepath and closes it again.

    :param serial_or_port: the serial number or the filepath of the port to
                           connect to.
    :param baudrate: the baudrate to use.
    :param prompt: Optional prefix of the command prompt.
    :param newline: The newline characters used in the prompt.

    :raises `SerialException`: if serial port is not found.

    :return: yields an initialized `CommandPrompt` object.
    """
    command_prompt = None
    device = None
    # A value containing a slash is a device path, anything else (including an
    # empty value) is resolved via `find_serial_port`.
    # NOTE(review): assumes `find_serial_port` accepts a falsy identifier - confirm.
    if serial_or_port and "/" in serial_or_port:
        tty_device = serial_or_port
    else:
        tty_device = find_serial_port(serial_or_port).device
    try:
        _LOGGER.info(f"Starting on port '{serial_or_port}'..."
                     if serial_or_port else "Starting...")
        device = Serial(tty_device, baudrate=baudrate)
        reader_thread = ReaderThread(device, lambda: _CmdReader(device))
        with reader_thread as reader:
            command_prompt = CommandPrompt(reader_thread, reader, prompt, newline)
            yield command_prompt
    finally:
        if command_prompt is not None:
            command_prompt.log_to_file(None)
        if device is not None:
            # Leaving the reader thread context closes the port, but not if
            # the thread failed to start. Closing twice is harmless.
            device.close()
        _LOGGER.debug("Stopping.")


@contextmanager
def nsh(serial_or_port: str, baudrate: int = 57600):
    """
    Same as `cmd()` but with a `nsh> ` prompt for use with PX4.
    """
    with cmd(serial_or_port, baudrate, "nsh> ") as command_prompt:
        yield command_prompt


# -----------------------------------------------------------------------------
# We need to monkey patch the ReaderThread.run() function to prevent a
# "device not ready" error to abort the reader thread.
def _patched_run(self):
    """
    Reader loop that keeps running on `SerialException`s whose message contains
    "readiness" and otherwise forwards received data to the protocol.
    """
    from serial import SerialException
    self.serial.timeout = 0.1
    self.protocol = self.protocol_factory()
    try:
        self.protocol.connection_made(self)
    except Exception as e:
        self.alive = False
        self.protocol.connection_lost(e)
        self._connection_made.set()
        return
    error = None
    self._connection_made.set()
    while self.alive and self.serial.is_open:
        try:
            data = self.serial.read(self.serial.in_waiting or 1)
        except SerialException as e:
            if self.alive and "readiness" in str(e):
                # Ignore the error and retry the read.
                continue
            error = e
            break
        else:
            if data:
                try:
                    self.protocol.data_received(data)
                except Exception as e:
                    error = e
                    break
    self.alive = False
    self.protocol.connection_lost(error)
    self.protocol = None

ReaderThread.run = _patched_run
class CommandPrompt:
    """
    Manages a command prompt, in particular, receiving data in the background
    and logging it out to the DEBUG logger.
    Several convenience methods allow you to send a command and receive its
    response, or wait for a certain pattern to arrive in the stream.
    """
    _TIMEOUT = 3

    def __init__(self, reader_thread: ReaderThread, protocol: _CmdReader,
                 prompt: str | None = None, newline: str | None = None):
        """
        Use the `cmd` or `nsh` context manager to build this class correctly.

        :param reader_thread: The background reader thread.
        :param protocol: The command prompt protocol.
        :param prompt: Optional prefix of the command prompt (default empty string).
        :param newline: The newline characters used in the prompt (default `\\r\\n`).
        """
        self._serial = protocol
        self._reader_thread = reader_thread
        self._print_data = ""
        self._prompt = "" if prompt is None else prompt
        self._newline = "\r\n" if newline is None else newline
        self.filter_ansi_escapes = True
        """Filter ANSI escape codes from the output."""
        self._logfile = None
        # The reader thread may already be delivering data: only hook up the
        # callback once every attribute used by `_print()` exists.
        self._serial._data_received = self._print
        self.clear()

    def _write_line(self, line):
        """Send `line` followed by a line feed, encoded as UTF-8."""
        self._reader_thread.write((line + "\n").encode("utf-8"))

    def _print(self, data: str):
        """Log every completed line to the DEBUG logger and the log file."""
        self._print_data += data
        if self._newline not in self._print_data:
            return
        # The trailing, still incomplete line is kept for the next call.
        *lines, self._print_data = self._print_data.split(self._newline)
        for line in self._filter(lines):
            _LOGGER.debug(line)
            if self._logfile is not None:
                self._logfile.write(line + "\n")

    def _filter(self, lines):
        """Strip ANSI escape codes from `lines` if filtering is enabled."""
        if self.filter_ansi_escapes:
            lines = list(map(ansi_escape, lines))
        return lines

    def _read_packets(self, separator: str, timeout: float = _TIMEOUT) -> list[str]:
        """
        Poll the protocol every 100ms for packets terminated by `separator`.

        :return: the packets, or an empty list if none arrived within `timeout`.
        """
        # A monotonic clock is immune to wall clock adjustments.
        start = time.monotonic()
        while True:
            if packets := self._serial.read_packets(separator):
                return packets
            if time.monotonic() - start > timeout:
                return []
            time.sleep(0.1)

    def _join(self, lines: list[str]) -> str | None:
        """Join `lines` with the newline, or return `None` for no lines."""
        return self._newline.join(lines) if lines else None

    def clear(self):
        """Clear the receive and transmit buffers."""
        self._serial.clear()
        self._print_data = ""

    def log_to_file(self, filename: Path | str | None,
                    add_datetime: bool = False) -> Path | None:
        """
        Log the received data to `filename` or close the log file.

        :param filename: file to log into. If `None`, the file is closed and
                         logging is disabled.
        :param add_datetime: appends the date and time to the filename.
                             See `emdbg.utils.add_datetime`

        :return: the actual filename with date and time (if selected), or
                 `None` if logging was disabled.
        """
        if self._logfile is not None:
            # Detach first, so a failing `open()` below never leaves a closed
            # file behind for `_print()` to write into.
            logfile, self._logfile = self._logfile, None
            logfile.close()

        if filename is None:
            return None

        filename = add_dt(filename) if add_datetime else Path(filename)
        # The stream is decoded as UTF-8 and may contain U+FFFD replacement
        # characters, which not every locale encoding can represent.
        self._logfile = filename.open("wt", encoding="utf-8")
        return filename

    def read_lines(self, timeout: float = _TIMEOUT) -> str | None:
        """
        Return any lines received within `timeout`.
        Note that any ANSI escape codes (for color or cursor position) are
        filtered out.

        :param timeout: seconds to wait until new lines arrive.

        :return: received lines or None on timeout
        """
        lines = self._filter(self._read_packets(self._newline, timeout))
        return self._join(lines)

    def wait_for(self, pattern: str, timeout: float = _TIMEOUT) -> str | None:
        """
        Waits for a regex pattern to appear in a line in the stream.
        This function reads any new received lines and searches for `pattern` in
        every line. If the line matches, all lines are returned.
        Note that any ANSI escape codes (for color or cursor position) are
        filtered out.

        :param pattern: regex pattern to search for via `re.search`. To match
                        line beginnings and ends you must use `^pattern$`.
        :param timeout: seconds to wait until new lines arrive.

        :return: received lines until matched pattern or None on timeout
        """
        regex = re.compile(pattern)
        received: list[str] = []
        deadline = time.monotonic() + timeout
        while True:
            # Read before checking the deadline so that data which arrived
            # during the last sleep is still considered.
            batch = self._filter(self._read_packets(self._newline, 0))
            if batch:
                received.extend(batch)
                # Search each line on its own (so `^`/`$` anchor to a line)
                # and the joined batch (for patterns spanning several lines).
                if (any(map(regex.search, batch)) or
                        regex.search(self._newline.join(batch))):
                    return self._join(received)
            if time.monotonic() > deadline:
                break
            time.sleep(0.1)
        _LOGGER.warning(f"Waiting for '{pattern}' timed out after {timeout:.1f}s!")
        return None

    def wait_for_prompt(self, timeout: float = _TIMEOUT) -> str | None:
        """
        Waits for the prompt to arrive in the stream.
        Note that any ANSI escape codes (for color or cursor position) are
        filtered out.

        :param timeout: seconds to wait until the prompt arrives.

        :return: all lines until the prompt arrives or None on timeout.
        """
        separator = self._newline + self._prompt
        if packets := self._read_packets(separator, timeout):
            # Re-insert the full separator between packets, otherwise the
            # line break in front of every intermediate prompt is lost.
            text = self._prompt + separator.join(packets)
            return self._join(self._filter(text.split(self._newline)))
        _LOGGER.warning(f"Waiting for '{self._prompt}' prompt timed out after {timeout:.1f}s!")
        return None

    def command(self, command: str, timeout: float | None = _TIMEOUT) -> str | None:
        """
        Send a command and return all lines until the next prompt.
        If the command is asynchronous, you need to poll for new lines separately.
        Note that any ANSI escape codes (for color or cursor position) are
        filtered out.

        :param command: command string to send to the command prompt.
        :param timeout: seconds to wait until the prompt arrives. If `None`,
                        the command is sent without waiting for the prompt.

        :return: all lines from the command issue until the next prompt arrives,
                 or None on timeout or if `timeout` is `None`.
        """
        self._serial.clear()
        self._write_line(command)
        if timeout is None:
            return None
        return self.wait_for_prompt(timeout)

    def command_nowait(self, command: str):
        """
        Send a command to the command prompt without waiting for a response.

        :param command: command string to send to the command prompt.
        """
        self.command(command, None)

    def reboot(self, timeout: int = 15) -> str | None:
        """
        Send the reboot command and wait for the reboot to be completed.

        :param timeout: seconds to wait until the prompt arrives.

        :return: all lines from reboot until the next prompt.
        """
        return self.command("reboot", timeout)

    def is_alive(self, timeout: float = _TIMEOUT, attempts: int = 4) -> bool:
        """
        Check if the command prompt is responding to newline inputs with a prompt.
        The total wait is at most `timeout`: every attempt waits for
        `timeout / attempts` seconds.

        :param timeout: total seconds to wait for a prompt across all attempts.
        :param attempts: number of times to send a newline and wait.
        :return: `True` if the command prompt responds, `False` otherwise
        """
        self._serial.clear()
        if attempts <= 0:
            # Nothing to try; also avoids dividing by zero below.
            return False
        per_attempt = timeout / attempts
        for _ in range(attempts):
            self._write_line("")
            if self.wait_for_prompt(per_attempt) is not None:
                return True
        return False
Manages a command prompt, in particular, receiving data in the background and logging it out to the DEBUG logger. Several convenience methods allow you to send a command and receive its response, or wait for a certain pattern to arrive in the stream.
58 def __init__(self, reader_thread: ReaderThread, protocol: _CmdReader, 59 prompt: str = None, newline: str = None): 60 """ 61 Use the `nsh` context manager to build this class correctly. 62 63 :param reader_thread: The background reader thread. 64 :param protocol: The command prompt protocol. 65 :param prompt: Optional prefix of the command prompt (default empty string). 66 :param newline: The newline characters used in the prompt (default `\\r\\n`). 67 """ 68 self._serial = protocol 69 self._reader_thread = reader_thread 70 self._serial._data_received = self._print 71 self._print_data = "" 72 self._prompt = "" if prompt is None else prompt 73 self._newline = "\r\n" if newline is None else newline 74 self.filter_ansi_escapes = True 75 """Filter ANSI escape codes from the output.""" 76 self._logfile = None 77 self.clear()
Use the nsh
context manager to build this class correctly.
Parameters
- reader_thread: The background reader thread.
- protocol: The command prompt protocol.
- prompt: Optional prefix of the command prompt (default empty string).
- newline: The newline characters used in the prompt (default
\r\n
).
109 def clear(self): 110 """Clear the receive and transmit buffers.""" 111 self._serial.clear() 112 self._print_data = ""
Clear the receive and transmit buffers.
114 def log_to_file(self, filename: Path | str, add_datetime: bool = False) -> Path: 115 """ 116 Log the received data to `filename` or close the log file. 117 118 :param filename: file to log into. If `None`, the file is closed and 119 logging is disabled. 120 :param add_datetime: appends the date and time to the filename. 121 See `emdbg.utils.add_datetime` 122 123 :return: the actual filename with date and time (if selected). 124 """ 125 if self._logfile is not None: 126 self._logfile.close() 127 128 if filename is None: 129 self._logfile = None 130 return None 131 132 filename = add_dt(filename) if add_datetime else Path(filename) 133 self._logfile = filename.open("wt") 134 return filename
Log the received data to filename
or close the log file.
Parameters
- filename: file to log into. If
None
, the file is closed and logging is disabled.
- add_datetime: appends the date and time to the filename.
See
emdbg.utils.add_datetime
Returns
the actual filename with date and time (if selected).
136 def read_lines(self, timeout: float = _TIMEOUT) -> str | None: 137 """ 138 Return any lines received within `timeout`. 139 Note that any ANSI escape codes (for color or cursor position) are 140 filtered out. 141 142 :param timeout: seconds to wait until new lines arrive. 143 144 :return: received lines or None on timeout 145 """ 146 lines = self._filter(self._read_packets(self._newline, timeout)) 147 return self._join(lines)
Return any lines received within timeout
.
Note that any ANSI escape codes (for color or cursor position) are
filtered out.
Parameters
- timeout: seconds to wait until new lines arrive.
Returns
received lines or None on timeout
149 def wait_for(self, pattern: str, timeout: float = _TIMEOUT) -> str | None: 150 """ 151 Waits for a regex pattern to appear in a line in the stream. 152 This function reads any new received lines and searches for `pattern` in 153 every line. If the line matches, all lines are returned. 154 Note that any ANSI escape codes (for color or cursor position) are 155 filtered out. 156 157 :param pattern: regex pattern to search for via `re.search`. To match 158 line beginnings and ends you must use `^pattern$`. 159 :param timeout: seconds to wait until new lines arrive. 160 161 :return: received lines until matched pattern or None on timeout 162 """ 163 lines = "" 164 start = time.time() 165 while True: 166 if time.time() - start > timeout: 167 break 168 if (new_lines := self.read_lines(0)) is not None: 169 lines += new_lines 170 if re.search(pattern, new_lines): 171 return lines 172 time.sleep(0.1) 173 _LOGGER.warning(f"Waiting for '{pattern}' timed out after {timeout:.1f}s!") 174 return None
Waits for a regex pattern to appear in a line in the stream.
This function reads any new received lines and searches for pattern
in
every line. If the line matches, all lines are returned.
Note that any ANSI escape codes (for color or cursor position) are
filtered out.
Parameters
- pattern: regex pattern to search for via
re.search
. To match line beginnings and ends you must use ^pattern$
.
- timeout: seconds to wait until new lines arrive.
Returns
received lines until matched pattern or None on timeout
176 def wait_for_prompt(self, timeout: float = _TIMEOUT) -> list[str]: 177 """ 178 Waits to the prompt to arrive in the stream. 179 Note that any ANSI escape codes (for color or cursor position) are 180 filtered out. 181 182 :param timeout: seconds to wait until the prompt arrives. 183 184 :return: all lines until the prompt arrives. 185 """ 186 if prompts := self._read_packets(self._newline + self._prompt, timeout): 187 prompt = self._prompt + self._prompt.join(prompts) 188 return self._join(self._filter(prompt.split(self._newline))) 189 _LOGGER.warning(f"Waiting for '{self._prompt}' prompt timed out after {timeout:.1f}s!") 190 return None
Waits for the prompt to arrive in the stream. Note that any ANSI escape codes (for color or cursor position) are filtered out.
Parameters
- timeout: seconds to wait until the prompt arrives.
Returns
all lines until the prompt arrives.
192 def command(self, command: str, timeout: float = _TIMEOUT) -> str | None: 193 """ 194 Send a command and return all lines until the next prompt. 195 If the command is asynchronous, you need to poll for new lines separately. 196 Note that any ANSI escape codes (for color or cursor position) are 197 filtered out. 198 199 :param command: command string to send to the command prompt. 200 :param timeout: seconds to wait until the prompt arrives. 201 202 :return: all lines from the command issue until the next prompt arrives. 203 """ 204 self._serial.clear() 205 self._write_line(command) 206 if timeout is not None: 207 return self.wait_for_prompt(timeout)
Send a command and return all lines until the next prompt. If the command is asynchronous, you need to poll for new lines separately. Note that any ANSI escape codes (for color or cursor position) are filtered out.
Parameters
- command: command string to send to the command prompt.
- timeout: seconds to wait until the prompt arrives.
Returns
all lines from the command issue until the next prompt arrives.
209 def command_nowait(self, command: str): 210 """ 211 Send a command to the command prompt without waiting for a response. 212 213 :param command: command string to send to the command prompt. 214 """ 215 self.command(command, None)
Send a command to the command prompt without waiting for a response.
Parameters
- command: command string to send to the command prompt.
217 def reboot(self, timeout: int = 15) -> str | None: 218 """ 219 Send the reboot command and wait for the reboot to be completed. 220 221 :param timeout: seconds to wait until the prompt arrives. 222 223 :return: all lines from reboot until the next prompt. 224 """ 225 return self.command("reboot", timeout)
Send the reboot command and wait for the reboot to be completed.
Parameters
- timeout: seconds to wait until the prompt arrives.
Returns
all lines from reboot until the next prompt.
227 def is_alive(self, timeout: float = _TIMEOUT, attempts: int = 4) -> bool: 228 """ 229 Check if the command prompt is responding to newline inputs with a prompt. 230 The total timeout is `attempts * timeout`! 231 232 :param timeout: seconds to wait until the prompt arrives. 233 :param attempts: number of times to send a newline and wait. 234 :return: `True` is command prompt responds, `False` otherwise 235 """ 236 self._serial.clear() 237 attempt = 0 238 timeout = timeout / attempts 239 while attempt < attempts: 240 self._write_line("") 241 if self.wait_for_prompt(timeout) is not None: 242 return True 243 attempt += 1 244 return False
Check if the command prompt is responding to newline inputs with a prompt.
The total wait is at most timeout: every attempt waits for timeout / attempts seconds.
Parameters
- timeout: seconds to wait until the prompt arrives.
- attempts: number of times to send a newline and wait.
Returns
True
if the command prompt responds, False
otherwise
@contextmanager
def cmd(serial_or_port: str, baudrate: int = 115200,
        prompt: str | None = None, newline: str | None = None):
    """
    Opens a serial port with the `serial` number or filepath and closes it again.

    :param serial_or_port: the serial number or the filepath of the port to
                           connect to.
    :param baudrate: the baudrate to use.
    :param prompt: Optional prefix of the command prompt.
    :param newline: The newline characters used in the prompt.

    :raises `SerialException`: if serial port is not found.

    :return: yields an initialized `CommandPrompt` object.
    """
    # Named differently from the function to avoid shadowing `cmd` itself.
    command_prompt = None
    device = None
    # A value containing a slash is a device path, anything else (including an
    # empty value) is resolved via `find_serial_port`.
    # NOTE(review): assumes `find_serial_port` accepts a falsy identifier - confirm.
    if serial_or_port and "/" in serial_or_port:
        tty_device = serial_or_port
    else:
        tty_device = find_serial_port(serial_or_port).device
    try:
        _LOGGER.info(f"Starting on port '{serial_or_port}'..."
                     if serial_or_port else "Starting...")
        device = Serial(tty_device, baudrate=baudrate)
        reader_thread = ReaderThread(device, lambda: _CmdReader(device))
        with reader_thread as reader:
            command_prompt = CommandPrompt(reader_thread, reader, prompt, newline)
            yield command_prompt
    finally:
        if command_prompt is not None:
            # Close the log file, if one was opened.
            command_prompt.log_to_file(None)
        if device is not None:
            # Leaving the reader thread context closes the port, but not if
            # the thread failed to start. Closing twice is harmless.
            device.close()
        _LOGGER.debug("Stopping.")
Opens a serial port with the serial
number or filepath and closes it again.
Parameters
- serial_or_port: the serial number or the filepath of the port to connect to.
- baudrate: the baudrate to use.
- prompt: Optional prefix of the command prompt.
- newline: The newline characters used in the prompt.
Raises
SerialException
: if serial port is not found.
Returns
yields an initialized
CommandPrompt
object.
@contextmanager
def nsh(serial_or_port: str, baudrate: int = 57600):
    """
    Same as `cmd()` but with a `nsh> ` prompt for use with PX4.

    :param serial_or_port: the serial number or the filepath of the port to
                           connect to.
    :param baudrate: the baudrate to use.

    :return: yields the `CommandPrompt` object created by `cmd()`.
    """
    with cmd(serial_or_port, baudrate, prompt="nsh> ") as command_prompt:
        yield command_prompt
Same as cmd()
but with a nsh>
prompt for use with PX4.