emdbg.debug.px4.task

  1# Copyright (c) 2023, Auterion AG
  2# SPDX-License-Identifier: BSD-3-Clause
  3
  4from __future__ import annotations
  5from functools import cached_property
  6from . import utils
  7from .system_load import system_load
  8from .device import Device
  9from .base import Base
 10from dataclasses import dataclass
 11from collections import defaultdict
 12import rich.box
 13from rich.text import Text
 14from rich.table import Table
 15
 16# The mapping from register name to position on the thread stack
 17_XCP_REGS_MAP: dict[str, int] = {
 18    "msp":     0,
 19    "basepri": 1,
 20    "r4":      2,
 21    "r5":      3,
 22    "r6":      4,
 23    "r7":      5,
 24    "r8":      6,
 25    "r9":      7,
 26    "r10":     8,
 27    "r11":     9,
 28    "s16":     11 + 0,
 29    "s17":     11 + 1,
 30    "s18":     11 + 2,
 31    "s19":     11 + 3,
 32    "s20":     11 + 4,
 33    "s21":     11 + 5,
 34    "s22":     11 + 6,
 35    "s23":     11 + 7,
 36    "s24":     11 + 8,
 37    "s25":     11 + 9,
 38    "s26":     11 + 10,
 39    "s27":     11 + 11,
 40    "s28":     11 + 12,
 41    "s29":     11 + 13,
 42    "s30":     11 + 14,
 43    "s31":     11 + 15,
 44    "r0":      27 + 0,
 45    "r1":      27 + 1,
 46    "r2":      27 + 2,
 47    "r3":      27 + 3,
 48    "r12":     27 + 4,
 49    "r14":     27 + 5,
 50    "r15":     27 + 6,
 51    "xpsr":    27 + 7,
 52    "s0":      27 + 8,
 53    "s1":      27 + 9,
 54    "s2":      27 + 10,
 55    "s3":      27 + 11,
 56    "s4":      27 + 12,
 57    "s5":      27 + 13,
 58    "s6":      27 + 14,
 59    "s7":      27 + 15,
 60    "s8":      27 + 16,
 61    "s9":      27 + 17,
 62    "s10":     27 + 18,
 63    "s11":     27 + 19,
 64    "s12":     27 + 20,
 65    "s13":     27 + 21,
 66    "s14":     27 + 22,
 67    "s15":     27 + 23,
 68    "fpscr":   27 + 24,
 69}
 70
 71class Task(Base):
 72    """
 73    NuttX task
 74    """
 75    _STACK_COLOR = 0xdeadbeef
 76    _FILE_DESCRIPTORS_PER_BLOCK = 6 # TODO query from ELF
 77
 78    @dataclass
 79    class Load:
 80        total: int
 81        """Total task runtime in µs"""
 82        interval: int
 83        """Interval task runtime in µs since last sample"""
 84        delta: int
 85        """Task runtime in µs within the interval"""
 86
 87        @property
 88        def relative(self) -> float:
 89            """Relative runtime within the interval"""
 90            return self.delta / self.interval if self.interval else 0
 91
 92    def __init__(self, gdb, tcb_ptr: "gdb.Value"):
 93        super().__init__(gdb)
 94        self._tcb = tcb_ptr
 95        self._system_load = system_load(self._gdb)
 96        self.pid = self._tcb["pid"]
 97        self.init_priority = int(self._tcb["init_priority"])
 98        self.stack_limit = int(self._tcb["adj_stack_size"])
 99        self.stack_ptr = self._tcb["stack_base_ptr"]
100        self._is_running_switched = None
101
102    @cached_property
103    def name(self) -> str:
104        """Name of the task"""
105        try:
106            return self._tcb["name"].string()
107        except:
108            return "?"
109
110    @cached_property
111    def sched_priority(self) -> int:
112        """The scheduled priority of the task"""
113        return int(self._tcb["sched_priority"])
114
115    @cached_property
116    def _statenames(self):
117        return self._gdb.types.make_enum_dict(self._gdb.lookup_type("enum tstate_e"))
118
119    @cached_property
120    def state(self) -> str:
121        """Task state name"""
122        for name, value in self._statenames.items():
123            if value == self._tcb["task_state"]:
124                return name
125        return "UNKNOWN"
126
127    @cached_property
128    def short_state(self) -> str:
129        """The short name of the task state"""
130        mapping = {
131            "TASK_PENDING": "PEND", "TASK_READYTORUN": "READY", "TASK_RUNNING": "RUN",
132            "WAIT_SEM": "w:sem", "WAIT_SIG": "w:sig"}
133        return mapping.get(self.state.replace("TSTATE_", ""), "???")
134
135    def _is_state_in(self, *states: list[str]) -> bool:
136        states = {int(self._statenames[s]) for s in states}
137        if int(self._tcb["task_state"]) in states:
138            return True
139        return False
140
141    @cached_property
142    def is_waiting(self) -> bool:
143        """The task is waiting for a semaphore or signal"""
144        if self._is_state_in("TSTATE_WAIT_SEM", "TSTATE_WAIT_SIG"):
145            return True
146
147    @cached_property
148    def is_runnable(self) -> bool:
149        """The task is pending, ready to run, or running"""
150        return self._is_state_in("TSTATE_TASK_PENDING",
151                                 "TSTATE_TASK_READYTORUN",
152                                 "TSTATE_TASK_RUNNING")
153
154    @cached_property
155    def stack_used(self) -> int:
156        """The amount of stack used by the thread in bytes"""
157        if not self.stack_ptr: return 0
158        stack_u32p = self.stack_ptr.cast(self._gdb.lookup_type("unsigned int").pointer())
159        # Stack grows from top to bottom, we do a binary search for the
160        # 0xdeadbeef value from the bottom upwards
161        watermark = utils.binary_search_last(stack_u32p, self._STACK_COLOR, hi=self.stack_limit // 4) + 1
162        # validate the binary search (does not seem necessary)
163        # for ii in range(0, watermark):
164        #     if stack_u32p[ii] != self._STACK_COLOR:
165        #         print(f"{self.name}: Correcting stack size from {watermark * 4} to {ii * 4}!")
166        #         return ii * 4
167        return self.stack_limit - watermark * 4
168
169    @cached_property
170    def waiting_for(self) -> str:
171        """
172        What the task is waiting for. If its a semaphore, return an object,
173        otherwise a string.
174        """
175        if self._is_state_in("TSTATE_WAIT_SEM"):
176            from .semaphore import Semaphore
177            sem = self._tcb['waitsem']
178            ostr = f"{int(sem):#08x} "
179            if descr := self.description_at(sem): ostr += f"<{descr}> "
180            ostr += Semaphore(self._gdb, sem).to_string()
181            return ostr
182        if self._is_state_in("TSTATE_WAIT_SIG"):
183            return "signal"
184        return ""
185
186    @cached_property
187    def files(self) -> list["gdb.Value"]:
188        """The list of inode pointers the task holds"""
189        filelist = self._tcb["group"]["tg_filelist"]
190        rows = filelist["fl_rows"]
191        files = filelist["fl_files"]
192        result = []
193        for ri in range(rows):
194            for ci in range(self._FILE_DESCRIPTORS_PER_BLOCK):
195                file = files[ri][ci]
196                if inode := file["f_inode"]:
197                    result.append(file)
198        return result
199
200    @cached_property
201    def location(self) -> "gdb.Block":
202        """The block the task is currently executing"""
203        if self.is_current_task:
204            pc = self.read_register("pc")
205        else:
206            pc = self._tcb["xcp"]["regs"][32]
207        block = self.block(pc)
208        while block and not block.function:
209            block = block.superblock
210        return block.function if block else int(pc)
211
212    def switch_to(self) -> bool:
213        """Switch to this task by writing the register file"""
214        if self.is_current_task:
215            return False
216        regs = {name: self._tcb["xcp"]["regs"][offset]
217                for name, offset in _XCP_REGS_MAP.items()}
218        regs = self.fix_nuttx_sp(regs)
219        self.write_registers(regs)
220        self._is_running_switched = True
221        return True
222
223    def switch_from(self) -> dict[str, int] | None:
224        """Switch to this task by writing the register file"""
225        if not self.is_current_task:
226            return None
227        self._is_running_switched = False
228        if self.short_state == "RUN":
229            self._is_running_switched = None
230        return self.registers
231
232    @cached_property
233    def is_current_task(self) -> bool:
234        """If the task is currently running"""
235        if self._is_running_switched is not None:
236            return self._is_running_switched
237        return self.short_state == "RUN"
238
239    @cached_property
240    def load(self) -> Load:
241        """The task load based on the system load monitor"""
242        if self._system_load is None: return self.Load(0, 0, 0)
243        _, interval, sl = self._system_load.sample
244        total, delta = sl.get(int(self._tcb), (0,0))
245        return self.Load(total, interval, delta)
246
247    def __repr__(self) -> str:
248        return f"Task({self.name}, {self.pid})"
249
250    def __str__(self) -> str:
251        ostr = self.__repr__() + f": state={self.state}, "
252        ostr += f"prio={self.sched_priority}({self.init_priority}), "
253        ostr += f"stack={self.stack_used}/{self.stack_limit}"
254        if waiting := self.waiting_for:
255            ostr += f", waiting={waiting}"
256        return ostr
257
258
259def all_tasks(gdb) -> list[Task]:
260    """Return a list of all tasks"""
261    type_tcb_s = gdb.lookup_type("struct tcb_s").pointer()
262    def _tasks(name):
263        tcbs = []
264        if (task_list := gdb.lookup_global_symbol(name)) is None:
265            return []
266        task_list = task_list.value()
267        current_task = task_list["head"]
268        if current_task:
269            while True:
270                tcbs.append(Task(gdb, current_task.cast(type_tcb_s)))
271                if current_task == task_list["tail"]:
272                    break
273                next_task = current_task["flink"]
274                # if next_task["blink"] == current_task:
275                #   LOGGER.error(f"Task linkage is broken in {tasks}!")
276                #   break
277                current_task = next_task
278        return tcbs
279
280    tcbs  = _tasks("g_pendingtasks")
281    tcbs += _tasks("g_readytorun")
282    tcbs += _tasks("g_waitingforsemaphore")
283    tcbs += _tasks("g_waitingforsignal")
284    tcbs += _tasks("g_inactivetasks")
285    return tcbs
286
287
288_RESTORE_REGISTERS = None
289def task_switch(gdb, pid: int) -> bool:
290    """
291    Switch to another task.
292    On initial switch the current register file is saved and can be restored
293    by passing a PID <0.
294
295    :param pid: the PID of the task to switch to, or <0 to restore initial task.
296    :return: Success of switching operation.
297    """
298    global _RESTORE_REGISTERS
299    # Restore registers to original task
300    if pid < 0:
301        if _RESTORE_REGISTERS is not None:
302            Base(gdb).write_registers(_RESTORE_REGISTERS)
303        _RESTORE_REGISTERS = None
304        return True
305
306    # Otherwise find the new pointer
307    tcbs = all_tasks(gdb)
308    if (next_task := next((t for t in tcbs if int(t.pid) == pid), None)) is not None:
309        # Find the currently executing task and save their registers
310        if (current_task := next((t for t in tcbs if t.is_current_task), None)) is not None:
311            regs = current_task.switch_from()
312        else:
313            regs = Base(gdb).registers
314        # We only care about the first register set for restoration
315        if next_task.switch_to():
316            if _RESTORE_REGISTERS is None:
317                _RESTORE_REGISTERS = regs
318            print(f"Switched to task '{next_task.name}' ({pid}).")
319            return True
320
321        print("Task already loaded!")
322        return False
323
324    print(f"Unknown task PID '{pid}'!")
325    return False
326
327
328def all_tasks_as_table(gdb, sort_key: str = None, with_stack_usage: bool = True,
329                       with_file_names: bool = True, with_waiting: bool = True) \
330                                    -> tuple[Table, str] | tuple[None, None]:
331    """
332    Return a table of running tasks similar to the NSH top command.
333
334    :param sort_key: optional lambda function to sort the table rows.
335    :param with_stack_usage: compute and show the task stack usage.
336    :param with_file_names: show what files the task has open.
337    :param with_waiting: show what the task is waiting for.
338
339    :return: The task table and additional output. If no tasks are found, return `None`.
340    """
341    table = Table(box=rich.box.MINIMAL_DOUBLE_HEAD)
342    table.add_column("struct tcb_s*", justify="right", no_wrap=True)
343    table.add_column("pid", justify="right", no_wrap=True)
344    table.add_column("Task Name")
345    table.add_column("Location", no_wrap=True)
346    table.add_column("CPU(ms)", justify="right")
347    table.add_column("CPU(%)", justify="right")
348    table.add_column("Stack\nUsage", justify="right")
349    table.add_column("Stack\nAvail", justify="right")
350    table.add_column("Prio", justify="right")
351    table.add_column("Base", justify="right")
352    if with_file_names: table.add_column("Open File Names")
353    else: table.add_column("FDs", justify="right")
354    table.add_column("State")
355    if with_waiting: table.add_column("Waiting For")
356
357    tasks = all_tasks(gdb)
358    if not tasks: return None, None
359    interval_us = tasks[0].load.interval
360    if not interval_us:
361        start, *_ = tasks[0]._system_load.sample
362        total_interval_us = (Device(gdb).uptime - start) or 1
363    total_user_us, total_idle_us = 0, 0
364    rows = []
365    for task in tasks:
366        # Remember the CPU loads for idle task and add it for the other tasks
367        if task.pid == 0: total_idle_us = task.load.delta if interval_us else task.load.total
368        else: total_user_us += task.load.delta if interval_us else task.load.total
369
370        # Find the file names or just the number of file descriptors
371        if with_file_names:
372            file_names = [task.read_string(f["f_inode"]["i_name"]) for f in task.files]
373            file_description = ", ".join(sorted(list(set(file_names))))
374        else:
375            file_description = len(task.files)
376
377        # Add all the values per row
378        relative = task.load.relative if interval_us else task.load.total / total_interval_us
379        stack_overflow = with_stack_usage and task.stack_used >= (task.stack_limit - max(8, task.stack_limit * 0.1))
380        row = [hex(task._tcb), task.pid, task.name,
381               hex(task.location) if isinstance(task.location, int) else task.location.name,
382               task.load.total//1000, f"{(relative * 100):.1f}",
383               Text.assemble((str(task.stack_used) if with_stack_usage else "", "bold red" if stack_overflow else "")),
384               Text.assemble((str(task.stack_limit), "bold" if stack_overflow else "")),
385               Text.assemble((str(task.sched_priority), "bold red" if task.sched_priority > task.init_priority else "")),
386               task.init_priority, file_description, task.short_state]
387        if with_waiting:
388            row.append(task.waiting_for)
389        rows.append(row)
390
391    # Sort the rows by PID by default and format the table
392    for row in sorted(rows, key=lambda l: l[1] if sort_key is None else sort_key):
393        table.add_row(*[r if isinstance(r, Text) else str(r) for r in row],
394                      style="bold blue" if row[11] == "RUN" else None)
395
396    # Add the task information
397    running = sum(1 for t in tasks if t.is_runnable)
398    sleeping = sum(1 for t in tasks if t.is_waiting)
399    output = f"Processes: {len(tasks)} total, {running} running, {sleeping} sleeping\n"
400
401    # Add CPU utilization and guard against division by zero
402    if not interval_us: interval_us = total_interval_us
403    user = 100 * total_user_us / interval_us;
404    idle = 100 * total_idle_us / interval_us;
405    sched = 100 * (interval_us - total_idle_us - total_user_us) / interval_us;
406    output += f"CPU usage: {user:.1f}% tasks, {sched:.1f}% sched, {idle:.1f}% idle\n"
407
408    # Uptime finally
409    output += f"Uptime: {Device(gdb).uptime/1e6:.2f}s total, {interval_us/1e6:.2f}s interval\n"
410    return table, output
411
412
413def all_files_as_table(gdb, sort_key: str = None) -> Table | None:
414    """
415    Return a table of open files owned by tasks.
416
417    :param sort_key: optional lambda function to sort the table rows.
418
419    :return: The file table or `None` if no tasks exist.
420    """
421    tasks = all_tasks(gdb)
422    if not tasks: return None
423
424    files = {}
425    file_tasks = defaultdict(set)
426    for task in tasks:
427        # Find the file names or just the number of file descriptors
428        for f in task.files:
429            files[int(f["f_inode"])] = f["f_inode"]
430            file_tasks[int(f["f_inode"])].add(task)
431    # Format the rows
432    rows = []
433    for addr, inode in files.items():
434        rows.append((hex(addr),
435                     hex(inode["i_private"]) if inode["i_private"] else "",
436                     task.read_string(inode["i_name"]),
437                     ", ".join(sorted(t.name for t in file_tasks[addr]))))
438    # sort and add rows
439    table = Table(box=rich.box.MINIMAL_DOUBLE_HEAD)
440    table.add_column("struct inode*", justify="right", no_wrap=True)
441    table.add_column("i_private*", justify="right", no_wrap=True)
442    table.add_column("Name")
443    table.add_column("Tasks")
444    for row in sorted(rows, key=lambda l: l[2] if sort_key is None else sort_key):
445        table.add_row(*row)
446    return table
class Task(emdbg.debug.px4.base.Base):
 72class Task(Base):
 73    """
 74    NuttX task
 75    """
 76    _STACK_COLOR = 0xdeadbeef
 77    _FILE_DESCRIPTORS_PER_BLOCK = 6 # TODO query from ELF
 78
 79    @dataclass
 80    class Load:
 81        total: int
 82        """Total task runtime in µs"""
 83        interval: int
 84        """Interval task runtime in µs since last sample"""
 85        delta: int
 86        """Task runtime in µs within the interval"""
 87
 88        @property
 89        def relative(self) -> float:
 90            """Relative runtime within the interval"""
 91            return self.delta / self.interval if self.interval else 0
 92
 93    def __init__(self, gdb, tcb_ptr: "gdb.Value"):
 94        super().__init__(gdb)
 95        self._tcb = tcb_ptr
 96        self._system_load = system_load(self._gdb)
 97        self.pid = self._tcb["pid"]
 98        self.init_priority = int(self._tcb["init_priority"])
 99        self.stack_limit = int(self._tcb["adj_stack_size"])
100        self.stack_ptr = self._tcb["stack_base_ptr"]
101        self._is_running_switched = None
102
103    @cached_property
104    def name(self) -> str:
105        """Name of the task"""
106        try:
107            return self._tcb["name"].string()
108        except:
109            return "?"
110
111    @cached_property
112    def sched_priority(self) -> int:
113        """The scheduled priority of the task"""
114        return int(self._tcb["sched_priority"])
115
116    @cached_property
117    def _statenames(self):
118        return self._gdb.types.make_enum_dict(self._gdb.lookup_type("enum tstate_e"))
119
120    @cached_property
121    def state(self) -> str:
122        """Task state name"""
123        for name, value in self._statenames.items():
124            if value == self._tcb["task_state"]:
125                return name
126        return "UNKNOWN"
127
128    @cached_property
129    def short_state(self) -> str:
130        """The short name of the task state"""
131        mapping = {
132            "TASK_PENDING": "PEND", "TASK_READYTORUN": "READY", "TASK_RUNNING": "RUN",
133            "WAIT_SEM": "w:sem", "WAIT_SIG": "w:sig"}
134        return mapping.get(self.state.replace("TSTATE_", ""), "???")
135
136    def _is_state_in(self, *states: list[str]) -> bool:
137        states = {int(self._statenames[s]) for s in states}
138        if int(self._tcb["task_state"]) in states:
139            return True
140        return False
141
142    @cached_property
143    def is_waiting(self) -> bool:
144        """The task is waiting for a semaphore or signal"""
145        if self._is_state_in("TSTATE_WAIT_SEM", "TSTATE_WAIT_SIG"):
146            return True
147
148    @cached_property
149    def is_runnable(self) -> bool:
150        """The task is pending, ready to run, or running"""
151        return self._is_state_in("TSTATE_TASK_PENDING",
152                                 "TSTATE_TASK_READYTORUN",
153                                 "TSTATE_TASK_RUNNING")
154
155    @cached_property
156    def stack_used(self) -> int:
157        """The amount of stack used by the thread in bytes"""
158        if not self.stack_ptr: return 0
159        stack_u32p = self.stack_ptr.cast(self._gdb.lookup_type("unsigned int").pointer())
160        # Stack grows from top to bottom, we do a binary search for the
161        # 0xdeadbeef value from the bottom upwards
162        watermark = utils.binary_search_last(stack_u32p, self._STACK_COLOR, hi=self.stack_limit // 4) + 1
163        # validate the binary search (does not seem necessary)
164        # for ii in range(0, watermark):
165        #     if stack_u32p[ii] != self._STACK_COLOR:
166        #         print(f"{self.name}: Correcting stack size from {watermark * 4} to {ii * 4}!")
167        #         return ii * 4
168        return self.stack_limit - watermark * 4
169
170    @cached_property
171    def waiting_for(self) -> str:
172        """
173        What the task is waiting for. If its a semaphore, return an object,
174        otherwise a string.
175        """
176        if self._is_state_in("TSTATE_WAIT_SEM"):
177            from .semaphore import Semaphore
178            sem = self._tcb['waitsem']
179            ostr = f"{int(sem):#08x} "
180            if descr := self.description_at(sem): ostr += f"<{descr}> "
181            ostr += Semaphore(self._gdb, sem).to_string()
182            return ostr
183        if self._is_state_in("TSTATE_WAIT_SIG"):
184            return "signal"
185        return ""
186
187    @cached_property
188    def files(self) -> list["gdb.Value"]:
189        """The list of inode pointers the task holds"""
190        filelist = self._tcb["group"]["tg_filelist"]
191        rows = filelist["fl_rows"]
192        files = filelist["fl_files"]
193        result = []
194        for ri in range(rows):
195            for ci in range(self._FILE_DESCRIPTORS_PER_BLOCK):
196                file = files[ri][ci]
197                if inode := file["f_inode"]:
198                    result.append(file)
199        return result
200
201    @cached_property
202    def location(self) -> "gdb.Block":
203        """The block the task is currently executing"""
204        if self.is_current_task:
205            pc = self.read_register("pc")
206        else:
207            pc = self._tcb["xcp"]["regs"][32]
208        block = self.block(pc)
209        while block and not block.function:
210            block = block.superblock
211        return block.function if block else int(pc)
212
213    def switch_to(self) -> bool:
214        """Switch to this task by writing the register file"""
215        if self.is_current_task:
216            return False
217        regs = {name: self._tcb["xcp"]["regs"][offset]
218                for name, offset in _XCP_REGS_MAP.items()}
219        regs = self.fix_nuttx_sp(regs)
220        self.write_registers(regs)
221        self._is_running_switched = True
222        return True
223
224    def switch_from(self) -> dict[str, int] | None:
225        """Switch to this task by writing the register file"""
226        if not self.is_current_task:
227            return None
228        self._is_running_switched = False
229        if self.short_state == "RUN":
230            self._is_running_switched = None
231        return self.registers
232
233    @cached_property
234    def is_current_task(self) -> bool:
235        """If the task is currently running"""
236        if self._is_running_switched is not None:
237            return self._is_running_switched
238        return self.short_state == "RUN"
239
240    @cached_property
241    def load(self) -> Load:
242        """The task load based on the system load monitor"""
243        if self._system_load is None: return self.Load(0, 0, 0)
244        _, interval, sl = self._system_load.sample
245        total, delta = sl.get(int(self._tcb), (0,0))
246        return self.Load(total, interval, delta)
247
248    def __repr__(self) -> str:
249        return f"Task({self.name}, {self.pid})"
250
251    def __str__(self) -> str:
252        ostr = self.__repr__() + f": state={self.state}, "
253        ostr += f"prio={self.sched_priority}({self.init_priority}), "
254        ostr += f"stack={self.stack_used}/{self.stack_limit}"
255        if waiting := self.waiting_for:
256            ostr += f", waiting={waiting}"
257        return ostr

NuttX task

Task(gdb, tcb_ptr: "'gdb.Value'")
 93    def __init__(self, gdb, tcb_ptr: "gdb.Value"):
 94        super().__init__(gdb)
 95        self._tcb = tcb_ptr
 96        self._system_load = system_load(self._gdb)
 97        self.pid = self._tcb["pid"]
 98        self.init_priority = int(self._tcb["init_priority"])
 99        self.stack_limit = int(self._tcb["adj_stack_size"])
100        self.stack_ptr = self._tcb["stack_base_ptr"]
101        self._is_running_switched = None
pid
init_priority
stack_limit
stack_ptr
name: str
103    @cached_property
104    def name(self) -> str:
105        """Name of the task"""
106        try:
107            return self._tcb["name"].string()
108        except:
109            return "?"

Name of the task

sched_priority: int
111    @cached_property
112    def sched_priority(self) -> int:
113        """The scheduled priority of the task"""
114        return int(self._tcb["sched_priority"])

The scheduled priority of the task

state: str
120    @cached_property
121    def state(self) -> str:
122        """Task state name"""
123        for name, value in self._statenames.items():
124            if value == self._tcb["task_state"]:
125                return name
126        return "UNKNOWN"

Task state name

short_state: str
128    @cached_property
129    def short_state(self) -> str:
130        """The short name of the task state"""
131        mapping = {
132            "TASK_PENDING": "PEND", "TASK_READYTORUN": "READY", "TASK_RUNNING": "RUN",
133            "WAIT_SEM": "w:sem", "WAIT_SIG": "w:sig"}
134        return mapping.get(self.state.replace("TSTATE_", ""), "???")

The short name of the task state

is_waiting: bool
142    @cached_property
143    def is_waiting(self) -> bool:
144        """The task is waiting for a semaphore or signal"""
145        if self._is_state_in("TSTATE_WAIT_SEM", "TSTATE_WAIT_SIG"):
146            return True

The task is waiting for a semaphore or signal

is_runnable: bool
148    @cached_property
149    def is_runnable(self) -> bool:
150        """The task is pending, ready to run, or running"""
151        return self._is_state_in("TSTATE_TASK_PENDING",
152                                 "TSTATE_TASK_READYTORUN",
153                                 "TSTATE_TASK_RUNNING")

The task is pending, ready to run, or running

stack_used: int
155    @cached_property
156    def stack_used(self) -> int:
157        """The amount of stack used by the thread in bytes"""
158        if not self.stack_ptr: return 0
159        stack_u32p = self.stack_ptr.cast(self._gdb.lookup_type("unsigned int").pointer())
160        # Stack grows from top to bottom, we do a binary search for the
161        # 0xdeadbeef value from the bottom upwards
162        watermark = utils.binary_search_last(stack_u32p, self._STACK_COLOR, hi=self.stack_limit // 4) + 1
163        # validate the binary search (does not seem necessary)
164        # for ii in range(0, watermark):
165        #     if stack_u32p[ii] != self._STACK_COLOR:
166        #         print(f"{self.name}: Correcting stack size from {watermark * 4} to {ii * 4}!")
167        #         return ii * 4
168        return self.stack_limit - watermark * 4

The amount of stack used by the thread in bytes

waiting_for: str
170    @cached_property
171    def waiting_for(self) -> str:
172        """
173        What the task is waiting for. If its a semaphore, return an object,
174        otherwise a string.
175        """
176        if self._is_state_in("TSTATE_WAIT_SEM"):
177            from .semaphore import Semaphore
178            sem = self._tcb['waitsem']
179            ostr = f"{int(sem):#08x} "
180            if descr := self.description_at(sem): ostr += f"<{descr}> "
181            ostr += Semaphore(self._gdb, sem).to_string()
182            return ostr
183        if self._is_state_in("TSTATE_WAIT_SIG"):
184            return "signal"
185        return ""

What the task is waiting for. If its a semaphore, return an object, otherwise a string.

files: "list['gdb.Value']"
187    @cached_property
188    def files(self) -> list["gdb.Value"]:
189        """The list of inode pointers the task holds"""
190        filelist = self._tcb["group"]["tg_filelist"]
191        rows = filelist["fl_rows"]
192        files = filelist["fl_files"]
193        result = []
194        for ri in range(rows):
195            for ci in range(self._FILE_DESCRIPTORS_PER_BLOCK):
196                file = files[ri][ci]
197                if inode := file["f_inode"]:
198                    result.append(file)
199        return result

The list of inode pointers the task holds

location: "'gdb.Block'"
201    @cached_property
202    def location(self) -> "gdb.Block":
203        """The block the task is currently executing"""
204        if self.is_current_task:
205            pc = self.read_register("pc")
206        else:
207            pc = self._tcb["xcp"]["regs"][32]
208        block = self.block(pc)
209        while block and not block.function:
210            block = block.superblock
211        return block.function if block else int(pc)

The block the task is currently executing

def switch_to(self) -> bool:
213    def switch_to(self) -> bool:
214        """Switch to this task by writing the register file"""
215        if self.is_current_task:
216            return False
217        regs = {name: self._tcb["xcp"]["regs"][offset]
218                for name, offset in _XCP_REGS_MAP.items()}
219        regs = self.fix_nuttx_sp(regs)
220        self.write_registers(regs)
221        self._is_running_switched = True
222        return True

Switch to this task by writing the register file

def switch_from(self) -> dict[str, int] | None:
224    def switch_from(self) -> dict[str, int] | None:
225        """Switch to this task by writing the register file"""
226        if not self.is_current_task:
227            return None
228        self._is_running_switched = False
229        if self.short_state == "RUN":
230            self._is_running_switched = None
231        return self.registers

Switch to this task by writing the register file

is_current_task: bool
233    @cached_property
234    def is_current_task(self) -> bool:
235        """If the task is currently running"""
236        if self._is_running_switched is not None:
237            return self._is_running_switched
238        return self.short_state == "RUN"

If the task is currently running

load: Task.Load
240    @cached_property
241    def load(self) -> Load:
242        """The task load based on the system load monitor"""
243        if self._system_load is None: return self.Load(0, 0, 0)
244        _, interval, sl = self._system_load.sample
245        total, delta = sl.get(int(self._tcb), (0,0))
246        return self.Load(total, interval, delta)

The task load based on the system load monitor

@dataclass
class Task.Load:
79    @dataclass
80    class Load:
81        total: int
82        """Total task runtime in µs"""
83        interval: int
84        """Interval task runtime in µs since last sample"""
85        delta: int
86        """Task runtime in µs within the interval"""
87
88        @property
89        def relative(self) -> float:
90            """Relative runtime within the interval"""
91            return self.delta / self.interval if self.interval else 0
Task.Load(total: int, interval: int, delta: int)
total: int

Total task runtime in µs

interval: int

Interval task runtime in µs since last sample

delta: int

Task runtime in µs within the interval

relative: float
88        @property
89        def relative(self) -> float:
90            """Relative runtime within the interval"""
91            return self.delta / self.interval if self.interval else 0

Relative runtime within the interval

def all_tasks(gdb) -> list[Task]:
260def all_tasks(gdb) -> list[Task]:
261    """Return a list of all tasks"""
262    type_tcb_s = gdb.lookup_type("struct tcb_s").pointer()
263    def _tasks(name):
264        tcbs = []
265        if (task_list := gdb.lookup_global_symbol(name)) is None:
266            return []
267        task_list = task_list.value()
268        current_task = task_list["head"]
269        if current_task:
270            while True:
271                tcbs.append(Task(gdb, current_task.cast(type_tcb_s)))
272                if current_task == task_list["tail"]:
273                    break
274                next_task = current_task["flink"]
275                # if next_task["blink"] == current_task:
276                #   LOGGER.error(f"Task linkage is broken in {tasks}!")
277                #   break
278                current_task = next_task
279        return tcbs
280
281    tcbs  = _tasks("g_pendingtasks")
282    tcbs += _tasks("g_readytorun")
283    tcbs += _tasks("g_waitingforsemaphore")
284    tcbs += _tasks("g_waitingforsignal")
285    tcbs += _tasks("g_inactivetasks")
286    return tcbs

Return a list of all tasks

def task_switch(gdb, pid: int) -> bool:
290def task_switch(gdb, pid: int) -> bool:
291    """
292    Switch to another task.
293    On initial switch the current register file is saved and can be restored
294    by passing a PID <0.
295
296    :param pid: the PID of the task to switch to, or <0 to restore initial task.
297    :return: Success of switching operation.
298    """
299    global _RESTORE_REGISTERS
300    # Restore registers to original task
301    if pid < 0:
302        if _RESTORE_REGISTERS is not None:
303            Base(gdb).write_registers(_RESTORE_REGISTERS)
304        _RESTORE_REGISTERS = None
305        return True
306
307    # Otherwise find the new pointer
308    tcbs = all_tasks(gdb)
309    if (next_task := next((t for t in tcbs if int(t.pid) == pid), None)) is not None:
310        # Find the currently executing task and save their registers
311        if (current_task := next((t for t in tcbs if t.is_current_task), None)) is not None:
312            regs = current_task.switch_from()
313        else:
314            regs = Base(gdb).registers
315        # We only care about the first register set for restoration
316        if next_task.switch_to():
317            if _RESTORE_REGISTERS is None:
318                _RESTORE_REGISTERS = regs
319            print(f"Switched to task '{next_task.name}' ({pid}).")
320            return True
321
322        print("Task already loaded!")
323        return False
324
325    print(f"Unknown task PID '{pid}'!")
326    return False

Switch to another task. On initial switch the current register file is saved and can be restored by passing a PID <0.

Parameters
  • pid: the PID of the task to switch to, or <0 to restore initial task.
Returns

Success of switching operation.

def all_tasks_as_table( gdb, sort_key: str = None, with_stack_usage: bool = True, with_file_names: bool = True, with_waiting: bool = True) -> tuple[rich.table.Table, str] | tuple[None, None]:
329def all_tasks_as_table(gdb, sort_key: str = None, with_stack_usage: bool = True,
330                       with_file_names: bool = True, with_waiting: bool = True) \
331                                    -> tuple[Table, str] | tuple[None, None]:
332    """
333    Return a table of running tasks similar to the NSH top command.
334
335    :param sort_key: optional lambda function to sort the table rows.
336    :param with_stack_usage: compute and show the task stack usage.
337    :param with_file_names: show what files the task has open.
338    :param with_waiting: show what the task is waiting for.
339
340    :return: The task table and additional output. If no tasks are found, return `None`.
341    """
342    table = Table(box=rich.box.MINIMAL_DOUBLE_HEAD)
343    table.add_column("struct tcb_s*", justify="right", no_wrap=True)
344    table.add_column("pid", justify="right", no_wrap=True)
345    table.add_column("Task Name")
346    table.add_column("Location", no_wrap=True)
347    table.add_column("CPU(ms)", justify="right")
348    table.add_column("CPU(%)", justify="right")
349    table.add_column("Stack\nUsage", justify="right")
350    table.add_column("Stack\nAvail", justify="right")
351    table.add_column("Prio", justify="right")
352    table.add_column("Base", justify="right")
353    if with_file_names: table.add_column("Open File Names")
354    else: table.add_column("FDs", justify="right")
355    table.add_column("State")
356    if with_waiting: table.add_column("Waiting For")
357
358    tasks = all_tasks(gdb)
359    if not tasks: return None, None
360    interval_us = tasks[0].load.interval
361    if not interval_us:
362        start, *_ = tasks[0]._system_load.sample
363        total_interval_us = (Device(gdb).uptime - start) or 1
364    total_user_us, total_idle_us = 0, 0
365    rows = []
366    for task in tasks:
367        # Remember the CPU loads for idle task and add it for the other tasks
368        if task.pid == 0: total_idle_us = task.load.delta if interval_us else task.load.total
369        else: total_user_us += task.load.delta if interval_us else task.load.total
370
371        # Find the file names or just the number of file descriptors
372        if with_file_names:
373            file_names = [task.read_string(f["f_inode"]["i_name"]) for f in task.files]
374            file_description = ", ".join(sorted(list(set(file_names))))
375        else:
376            file_description = len(task.files)
377
378        # Add all the values per row
379        relative = task.load.relative if interval_us else task.load.total / total_interval_us
380        stack_overflow = with_stack_usage and task.stack_used >= (task.stack_limit - max(8, task.stack_limit * 0.1))
381        row = [hex(task._tcb), task.pid, task.name,
382               hex(task.location) if isinstance(task.location, int) else task.location.name,
383               task.load.total//1000, f"{(relative * 100):.1f}",
384               Text.assemble((str(task.stack_used) if with_stack_usage else "", "bold red" if stack_overflow else "")),
385               Text.assemble((str(task.stack_limit), "bold" if stack_overflow else "")),
386               Text.assemble((str(task.sched_priority), "bold red" if task.sched_priority > task.init_priority else "")),
387               task.init_priority, file_description, task.short_state]
388        if with_waiting:
389            row.append(task.waiting_for)
390        rows.append(row)
391
392    # Sort the rows by PID by default and format the table
393    for row in sorted(rows, key=lambda l: l[1] if sort_key is None else sort_key):
394        table.add_row(*[r if isinstance(r, Text) else str(r) for r in row],
395                      style="bold blue" if row[11] == "RUN" else None)
396
397    # Add the task information
398    running = sum(1 for t in tasks if t.is_runnable)
399    sleeping = sum(1 for t in tasks if t.is_waiting)
400    output = f"Processes: {len(tasks)} total, {running} running, {sleeping} sleeping\n"
401
402    # Add CPU utilization and guard against division by zero
403    if not interval_us: interval_us = total_interval_us
404    user = 100 * total_user_us / interval_us;
405    idle = 100 * total_idle_us / interval_us;
406    sched = 100 * (interval_us - total_idle_us - total_user_us) / interval_us;
407    output += f"CPU usage: {user:.1f}% tasks, {sched:.1f}% sched, {idle:.1f}% idle\n"
408
409    # Uptime finally
410    output += f"Uptime: {Device(gdb).uptime/1e6:.2f}s total, {interval_us/1e6:.2f}s interval\n"
411    return table, output

Return a table of running tasks similar to the NSH top command.

Parameters
  • sort_key: optional lambda function to sort the table rows.
  • with_stack_usage: compute and show the task stack usage.
  • with_file_names: show what files the task has open.
  • with_waiting: show what the task is waiting for.
Returns

The task table and additional output. If no tasks are found, return None.

def all_files_as_table(gdb, sort_key: str = None) -> rich.table.Table | None:
414def all_files_as_table(gdb, sort_key: str = None) -> Table | None:
415    """
416    Return a table of open files owned by tasks.
417
418    :param sort_key: optional lambda function to sort the table rows.
419
420    :return: The file table or `None` if no tasks exist.
421    """
422    tasks = all_tasks(gdb)
423    if not tasks: return None
424
425    files = {}
426    file_tasks = defaultdict(set)
427    for task in tasks:
428        # Find the file names or just the number of file descriptors
429        for f in task.files:
430            files[int(f["f_inode"])] = f["f_inode"]
431            file_tasks[int(f["f_inode"])].add(task)
432    # Format the rows
433    rows = []
434    for addr, inode in files.items():
435        rows.append((hex(addr),
436                     hex(inode["i_private"]) if inode["i_private"] else "",
437                     task.read_string(inode["i_name"]),
438                     ", ".join(sorted(t.name for t in file_tasks[addr]))))
439    # sort and add rows
440    table = Table(box=rich.box.MINIMAL_DOUBLE_HEAD)
441    table.add_column("struct inode*", justify="right", no_wrap=True)
442    table.add_column("i_private*", justify="right", no_wrap=True)
443    table.add_column("Name")
444    table.add_column("Tasks")
445    for row in sorted(rows, key=lambda l: l[2] if sort_key is None else sort_key):
446        table.add_row(*row)
447    return table

Return a table of open files owned by tasks.

Parameters
  • sort_key: optional lambda function to sort the table rows.
Returns

The file table or None if no tasks exist.