emdbg.debug.px4.system_load
1# Copyright (c) 2023, Auterion AG 2# SPDX-License-Identifier: BSD-3-Clause 3 4from __future__ import annotations 5from functools import cached_property 6from . import utils 7from .base import Base 8from .device import Device 9import logging 10LOGGER = logging.getLogger(__name__) 11 12 13class SystemLoad(Base): 14 """ 15 PX4 system load monitor using the `cpuload.cpp` module inside PX4. 16 Do not use this class directly, instead access the cpu load information via 17 the `emdbg.debug.px4.task.all_tasks_as_table()` function. 18 """ 19 def __init__(self, gdb): 20 super().__init__(gdb) 21 self._device = Device(gdb) 22 self._system_load = self.lookup_global_symbol_ptr("system_load") 23 self._monitor = self.lookup_static_symbol_ptr("cpuload_monitor_all_count") 24 self._psl = None 25 self.restart() 26 27 def _load(self): 28 sl = self._system_load.dereference() 29 tasks = {"hrt": self._device.uptime, "start": int(sl["start_time"]), "tasks": {}} 30 for task in utils.gdb_iter(sl["tasks"]): 31 if not task["valid"]: continue 32 tcb = int(task["tcb"]) 33 total = int(task["total_runtime"]) 34 tasks["tasks"][tcb] = total 35 return tasks 36 37 def restart(self): 38 """ 39 Enables the cpuload monitor in PX4 and loads the first reference value. 40 """ 41 if self._system_load is None: return 42 if self._monitor.dereference()["_value"] == 0: 43 self._gdb.execute("set cpuload_monitor_all_count = 1") 44 self._gdb.execute(f"set system_load.start_time = {self._device.uptime}") 45 for ii in range(1, self._system_load["tasks"].type.range()[1]): 46 self._gdb.execute(f"set system_load.tasks[{ii}].total_runtime = 0") 47 self._gdb.execute(f"set system_load.tasks[{ii}].curr_start_time = 0") 48 self._psl = self._load() 49 50 51 def stop(self): 52 """Disables the cpuload monitor""" 53 if self._monitor is None: return 54 self._gdb.execute("set cpuload_monitor_all_count = 0") 55 56 @cached_property 57 def sample(self) -> tuple[int, int, dict[int, tuple[int, int]]]: 58 """ 59 Samples the cpuload monitor and computes the difference to the last 60 sample. 61 :return: a tuple of (start time since enabled [µs], interval from last 62 sample [µs], tasks dict[tcb [ptr] -> (total runtime [µs], difference 63 from last sample [µs])]). 64 """ 65 if self._system_load is None or not self._system_load["initialized"]: 66 return (0, 0, {}) 67 68 sl = self._load() 69 # compute the delta running times 70 sample = {} 71 for tcb, total in sl["tasks"].items(): 72 ptotal = self._psl["tasks"].get(tcb, total) 73 sample[tcb] = (total, total - ptotal) 74 interval = self._device.uptime - self._psl["hrt"] 75 # remember the new system load as previous load 76 self._psl = sl 77 return (sl["start"], interval, sample) 78 79 80_SYSTEM_LOAD = None 81def system_load(gdb): 82 """ 83 :return: The SystemLoad singleton object. 84 """ 85 global _SYSTEM_LOAD 86 if _SYSTEM_LOAD is None: 87 _SYSTEM_LOAD = SystemLoad(gdb) 88 return _SYSTEM_LOAD 89 90 91def restart_system_load_monitor(gdb): 92 """ 93 Starts the system load monitor if not started, else resets the sample 94 interval to right now. 95 """ 96 system_load(gdb).sample 97 system_load(gdb).restart()
LOGGER =
<Logger emdbg.debug.px4.system_load (WARNING)>
14class SystemLoad(Base): 15 """ 16 PX4 system load monitor using the `cpuload.cpp` module inside PX4. 17 Do not use this class directly, instead access the cpu load information via 18 the `emdbg.debug.px4.task.all_tasks_as_table()` function. 19 """ 20 def __init__(self, gdb): 21 super().__init__(gdb) 22 self._device = Device(gdb) 23 self._system_load = self.lookup_global_symbol_ptr("system_load") 24 self._monitor = self.lookup_static_symbol_ptr("cpuload_monitor_all_count") 25 self._psl = None 26 self.restart() 27 28 def _load(self): 29 sl = self._system_load.dereference() 30 tasks = {"hrt": self._device.uptime, "start": int(sl["start_time"]), "tasks": {}} 31 for task in utils.gdb_iter(sl["tasks"]): 32 if not task["valid"]: continue 33 tcb = int(task["tcb"]) 34 total = int(task["total_runtime"]) 35 tasks["tasks"][tcb] = total 36 return tasks 37 38 def restart(self): 39 """ 40 Enables the cpuload monitor in PX4 and loads the first reference value. 41 """ 42 if self._system_load is None: return 43 if self._monitor.dereference()["_value"] == 0: 44 self._gdb.execute("set cpuload_monitor_all_count = 1") 45 self._gdb.execute(f"set system_load.start_time = {self._device.uptime}") 46 for ii in range(1, self._system_load["tasks"].type.range()[1]): 47 self._gdb.execute(f"set system_load.tasks[{ii}].total_runtime = 0") 48 self._gdb.execute(f"set system_load.tasks[{ii}].curr_start_time = 0") 49 self._psl = self._load() 50 51 52 def stop(self): 53 """Disables the cpuload monitor""" 54 if self._monitor is None: return 55 self._gdb.execute("set cpuload_monitor_all_count = 0") 56 57 @cached_property 58 def sample(self) -> tuple[int, int, dict[int, tuple[int, int]]]: 59 """ 60 Samples the cpuload monitor and computes the difference to the last 61 sample. 62 :return: a tuple of (start time since enabled [µs], interval from last 63 sample [µs], tasks dict[tcb [ptr] -> (total runtime [µs], difference 64 from last sample [µs])]). 65 """ 66 if self._system_load is None or not self._system_load["initialized"]: 67 return (0, 0, {}) 68 69 sl = self._load() 70 # compute the delta running times 71 sample = {} 72 for tcb, total in sl["tasks"].items(): 73 ptotal = self._psl["tasks"].get(tcb, total) 74 sample[tcb] = (total, total - ptotal) 75 interval = self._device.uptime - self._psl["hrt"] 76 # remember the new system load as previous load 77 self._psl = sl 78 return (sl["start"], interval, sample)
PX4 system load monitor using the cpuload.cpp
module inside PX4.
Do not use this class directly, instead access the cpu load information via
the emdbg.debug.px4.task.all_tasks_as_table()
function.
def
restart(self):
38 def restart(self): 39 """ 40 Enables the cpuload monitor in PX4 and loads the first reference value. 41 """ 42 if self._system_load is None: return 43 if self._monitor.dereference()["_value"] == 0: 44 self._gdb.execute("set cpuload_monitor_all_count = 1") 45 self._gdb.execute(f"set system_load.start_time = {self._device.uptime}") 46 for ii in range(1, self._system_load["tasks"].type.range()[1]): 47 self._gdb.execute(f"set system_load.tasks[{ii}].total_runtime = 0") 48 self._gdb.execute(f"set system_load.tasks[{ii}].curr_start_time = 0") 49 self._psl = self._load()
Enables the cpuload monitor in PX4 and loads the first reference value.
def
stop(self):
52 def stop(self): 53 """Disables the cpuload monitor""" 54 if self._monitor is None: return 55 self._gdb.execute("set cpuload_monitor_all_count = 0")
Disables the cpuload monitor
sample: tuple[int, int, dict[int, tuple[int, int]]]
57 @cached_property 58 def sample(self) -> tuple[int, int, dict[int, tuple[int, int]]]: 59 """ 60 Samples the cpuload monitor and computes the difference to the last 61 sample. 62 :return: a tuple of (start time since enabled [µs], interval from last 63 sample [µs], tasks dict[tcb [ptr] -> (total runtime [µs], difference 64 from last sample [µs])]). 65 """ 66 if self._system_load is None or not self._system_load["initialized"]: 67 return (0, 0, {}) 68 69 sl = self._load() 70 # compute the delta running times 71 sample = {} 72 for tcb, total in sl["tasks"].items(): 73 ptotal = self._psl["tasks"].get(tcb, total) 74 sample[tcb] = (total, total - ptotal) 75 interval = self._device.uptime - self._psl["hrt"] 76 # remember the new system load as previous load 77 self._psl = sl 78 return (sl["start"], interval, sample)
Samples the cpuload monitor and computes the difference to the last sample.
Returns
a tuple of (start time since enabled [µs], interval from last sample [µs], tasks dict[tcb [ptr] -> (total runtime [µs], difference from last sample [µs])]).
Inherited Members
- emdbg.debug.px4.base.Base
- register_names
- registers
- read_register
- write_register
- write_registers
- fix_nuttx_sp
- lookup_static_symbol_in_function
- lookup_static_symbol_ptr
- lookup_global_symbol_ptr
- lookup_static_symbol_in_function_ptr
- value_ptr
- addr_ptr
- read_memory
- write_memory
- read_uint
- read_int
- read_string
- symtab_line
- block
- description_at
- integer_type
- uint32
- int32
def
system_load(gdb):
82def system_load(gdb): 83 """ 84 :return: The SystemLoad singleton object. 85 """ 86 global _SYSTEM_LOAD 87 if _SYSTEM_LOAD is None: 88 _SYSTEM_LOAD = SystemLoad(gdb) 89 return _SYSTEM_LOAD
Returns
The SystemLoad singleton object.
def
restart_system_load_monitor(gdb):
92def restart_system_load_monitor(gdb): 93 """ 94 Starts the system load monitor if not started, else resets the sample 95 interval to right now. 96 """ 97 system_load(gdb).sample 98 system_load(gdb).restart()
Starts the system load monitor if not started, else resets the sample interval to right now.