emdbg.debug.px4.system_load

 1# Copyright (c) 2023, Auterion AG
 2# SPDX-License-Identifier: BSD-3-Clause
 3
 4from __future__ import annotations
 5from functools import cached_property
 6from . import utils
 7from .base import Base
 8from .device import Device
 9import logging
10LOGGER = logging.getLogger(__name__)
11
12
class SystemLoad(Base):
    """
    PX4 system load monitor using the `cpuload.cpp` module inside PX4.
    Do not use this class directly, instead access the cpu load information via
    the `emdbg.debug.px4.task.all_tasks_as_table()` function.
    """
    def __init__(self, gdb):
        """
        Looks up the `system_load` and `cpuload_monitor_all_count` symbols and
        takes the first reference sample via `restart()`.

        :param gdb: the GDB Python API object, forwarded to `Base` and `Device`.
        """
        super().__init__(gdb)
        self._device = Device(gdb)
        self._system_load = self.lookup_global_symbol_ptr("system_load")
        self._monitor = self.lookup_static_symbol_ptr("cpuload_monitor_all_count")
        # Previous system load snapshot, used as reference by `sample`
        self._psl = None
        self.restart()

    def _load(self):
        """
        Reads a snapshot of `system_load` from the target.

        :return: a dict with the keys `hrt` (device uptime when the snapshot
                 was taken), `start` (`system_load.start_time`) and `tasks`
                 (dict of tcb pointer -> total runtime of every valid entry).
        """
        sl = self._system_load.dereference()
        tasks = {"hrt": self._device.uptime, "start": int(sl["start_time"]), "tasks": {}}
        for task in utils.gdb_iter(sl["tasks"]):
            # unused slots of the task array carry no meaningful data
            if not task["valid"]: continue
            tcb = int(task["tcb"])
            total = int(task["total_runtime"])
            tasks["tasks"][tcb] = total
        return tasks

    def restart(self):
        """
        Enables the cpuload monitor in PX4 and loads the first reference value.

        If the monitor counter is zero, it is set to one, the start time is set
        to the current device uptime and the runtime counters of all task
        slots except slot 0 are cleared. In every case a new reference
        snapshot is stored. Does nothing if `system_load` was not found.
        """
        if self._system_load is None: return
        # `stop()` treats a missing counter symbol as a no-op, do the same here
        # instead of failing on `None.dereference()`.
        if self._monitor is not None and self._monitor.dereference()["_value"] == 0:
            self._gdb.execute("set cpuload_monitor_all_count = 1")
            self._gdb.execute(f"set system_load.start_time = {self._device.uptime}")
            # gdb.Type.range() returns the *inclusive* (low, high) bounds of the
            # array, so the last valid index is `high` itself.
            _, last = self._system_load["tasks"].type.range()
            for ii in range(1, last + 1):
                self._gdb.execute(f"set system_load.tasks[{ii}].total_runtime = 0")
                self._gdb.execute(f"set system_load.tasks[{ii}].curr_start_time = 0")
        self._psl = self._load()

    def stop(self):
        """Disables the cpuload monitor"""
        if self._monitor is None: return
        self._gdb.execute("set cpuload_monitor_all_count = 0")

    @cached_property
    def sample(self) -> tuple[int, int, dict[int, tuple[int, int]]]:
        """
        Samples the cpuload monitor and computes the difference to the last
        sample.

        NOTE(review): this is a `cached_property`, so the body only runs when
        no cached value exists; cache invalidation is presumably handled by
        `Base` - confirm.

        :return: a tuple of (start time since enabled [µs], interval from last
                 sample [µs], tasks dict[tcb [ptr] -> (total runtime [µs], difference
                 from last sample [µs])]). Returns `(0, 0, {})` if `system_load`
                 was not found or is not initialized.
        """
        if self._system_load is None or not self._system_load["initialized"]:
            return (0, 0, {})

        sl = self._load()
        previous = self._psl
        # compute the delta running times, tasks without a previous entry get
        # a delta of zero
        sample = {}
        for tcb, total in sl["tasks"].items():
            ptotal = previous["tasks"].get(tcb, total)
            sample[tcb] = (total, total - ptotal)
        # Use the timestamp captured together with the runtimes: it is the
        # value stored as the next reference, so consecutive intervals neither
        # overlap nor leave gaps.
        interval = sl["hrt"] - previous["hrt"]
        # remember the new system load as previous load
        self._psl = sl
        return (sl["start"], interval, sample)
78
79
_SYSTEM_LOAD = None
def system_load(gdb):
    """
    Returns the shared `SystemLoad` object, creating it on the first call.

    :param gdb: passed to the `SystemLoad` constructor only when the singleton
                does not exist yet; ignored otherwise.
    :return: The SystemLoad singleton object.
    """
    global _SYSTEM_LOAD
    if _SYSTEM_LOAD is not None:
        return _SYSTEM_LOAD
    # first use: build the singleton and keep it for all later callers
    _SYSTEM_LOAD = SystemLoad(gdb)
    return _SYSTEM_LOAD
89
90
def restart_system_load_monitor(gdb):
    """
    Starts the system load monitor if not started, else resets the sample
    interval to right now.

    :param gdb: forwarded to `system_load()` to obtain the singleton.
    """
    monitor = system_load(gdb)
    # Evaluate the `sample` property for its side effects only, the value is
    # discarded; `restart()` then stores a fresh reference snapshot.
    _ = monitor.sample
    monitor.restart()
LOGGER = <Logger emdbg.debug.px4.system_load (WARNING)>
class SystemLoad(emdbg.debug.px4.base.Base):
class SystemLoad(Base):
    """
    PX4 system load monitor using the `cpuload.cpp` module inside PX4.
    Do not use this class directly, instead access the cpu load information via
    the `emdbg.debug.px4.task.all_tasks_as_table()` function.
    """
    def __init__(self, gdb):
        """
        Looks up the `system_load` and `cpuload_monitor_all_count` symbols and
        takes the first reference sample via `restart()`.

        :param gdb: the GDB Python API object, forwarded to `Base` and `Device`.
        """
        super().__init__(gdb)
        self._device = Device(gdb)
        self._system_load = self.lookup_global_symbol_ptr("system_load")
        self._monitor = self.lookup_static_symbol_ptr("cpuload_monitor_all_count")
        # Previous system load snapshot, used as reference by `sample`
        self._psl = None
        self.restart()

    def _load(self):
        """
        Reads a snapshot of `system_load` from the target.

        :return: a dict with the keys `hrt` (device uptime when the snapshot
                 was taken), `start` (`system_load.start_time`) and `tasks`
                 (dict of tcb pointer -> total runtime of every valid entry).
        """
        sl = self._system_load.dereference()
        tasks = {"hrt": self._device.uptime, "start": int(sl["start_time"]), "tasks": {}}
        for task in utils.gdb_iter(sl["tasks"]):
            # unused slots of the task array carry no meaningful data
            if not task["valid"]: continue
            tcb = int(task["tcb"])
            total = int(task["total_runtime"])
            tasks["tasks"][tcb] = total
        return tasks

    def restart(self):
        """
        Enables the cpuload monitor in PX4 and loads the first reference value.

        If the monitor counter is zero, it is set to one, the start time is set
        to the current device uptime and the runtime counters of all task
        slots except slot 0 are cleared. In every case a new reference
        snapshot is stored. Does nothing if `system_load` was not found.
        """
        if self._system_load is None: return
        # `stop()` treats a missing counter symbol as a no-op, do the same here
        # instead of failing on `None.dereference()`.
        if self._monitor is not None and self._monitor.dereference()["_value"] == 0:
            self._gdb.execute("set cpuload_monitor_all_count = 1")
            self._gdb.execute(f"set system_load.start_time = {self._device.uptime}")
            # gdb.Type.range() returns the *inclusive* (low, high) bounds of the
            # array, so the last valid index is `high` itself.
            _, last = self._system_load["tasks"].type.range()
            for ii in range(1, last + 1):
                self._gdb.execute(f"set system_load.tasks[{ii}].total_runtime = 0")
                self._gdb.execute(f"set system_load.tasks[{ii}].curr_start_time = 0")
        self._psl = self._load()

    def stop(self):
        """Disables the cpuload monitor"""
        if self._monitor is None: return
        self._gdb.execute("set cpuload_monitor_all_count = 0")

    @cached_property
    def sample(self) -> tuple[int, int, dict[int, tuple[int, int]]]:
        """
        Samples the cpuload monitor and computes the difference to the last
        sample.

        NOTE(review): this is a `cached_property`, so the body only runs when
        no cached value exists; cache invalidation is presumably handled by
        `Base` - confirm.

        :return: a tuple of (start time since enabled [µs], interval from last
                 sample [µs], tasks dict[tcb [ptr] -> (total runtime [µs], difference
                 from last sample [µs])]). Returns `(0, 0, {})` if `system_load`
                 was not found or is not initialized.
        """
        if self._system_load is None or not self._system_load["initialized"]:
            return (0, 0, {})

        sl = self._load()
        previous = self._psl
        # compute the delta running times, tasks without a previous entry get
        # a delta of zero
        sample = {}
        for tcb, total in sl["tasks"].items():
            ptotal = previous["tasks"].get(tcb, total)
            sample[tcb] = (total, total - ptotal)
        # Use the timestamp captured together with the runtimes: it is the
        # value stored as the next reference, so consecutive intervals neither
        # overlap nor leave gaps.
        interval = sl["hrt"] - previous["hrt"]
        # remember the new system load as previous load
        self._psl = sl
        return (sl["start"], interval, sample)

PX4 system load monitor using the cpuload.cpp module inside PX4. Do not use this class directly, instead access the cpu load information via the emdbg.debug.px4.task.all_tasks_as_table() function.

SystemLoad(gdb)
    def __init__(self, gdb):
        """
        Looks up the `system_load` and `cpuload_monitor_all_count` symbols and
        takes the first reference sample via `restart()`.

        :param gdb: the GDB Python API object, forwarded to `Base` and `Device`.
        """
        super().__init__(gdb)
        self._device = Device(gdb)
        # NOTE(review): both lookups presumably return None when the symbol is
        # missing, since `restart()` and `stop()` check for None - confirm.
        self._system_load = self.lookup_global_symbol_ptr("system_load")
        self._monitor = self.lookup_static_symbol_ptr("cpuload_monitor_all_count")
        # Previous system load snapshot, set by `restart()`
        self._psl = None
        self.restart()
def restart(self):
38    def restart(self):
39        """
40        Enables the cpuload monitor in PX4 and loads the first reference value.
41        """
42        if self._system_load is None: return
43        if self._monitor.dereference()["_value"] == 0:
44            self._gdb.execute("set cpuload_monitor_all_count = 1")
45            self._gdb.execute(f"set system_load.start_time = {self._device.uptime}")
46            for ii in range(1, self._system_load["tasks"].type.range()[1]):
47                self._gdb.execute(f"set system_load.tasks[{ii}].total_runtime = 0")
48                self._gdb.execute(f"set system_load.tasks[{ii}].curr_start_time = 0")
49        self._psl = self._load()

Enables the cpuload monitor in PX4 and loads the first reference value.

def stop(self):
52    def stop(self):
53        """Disables the cpuload monitor"""
54        if self._monitor is None: return
55        self._gdb.execute("set cpuload_monitor_all_count = 0")

Disables the cpuload monitor

sample: tuple[int, int, dict[int, tuple[int, int]]]
57    @cached_property
58    def sample(self) -> tuple[int, int, dict[int, tuple[int, int]]]:
59        """
60        Samples the cpuload monitor and computes the difference to the last
61        sample.
62        :return: a tuple of (start time since enabled [µs], interval from last
63                 sample [µs], tasks dict[tcb [ptr] -> (total runtime [µs], difference
64                 from last sample [µs])]).
65        """
66        if self._system_load is None or not self._system_load["initialized"]:
67            return (0, 0, {})
68
69        sl = self._load()
70        # compute the delta running times
71        sample = {}
72        for tcb, total in sl["tasks"].items():
73            ptotal = self._psl["tasks"].get(tcb, total)
74            sample[tcb] = (total, total - ptotal)
75        interval = self._device.uptime - self._psl["hrt"]
76        # remember the new system load as previous load
77        self._psl = sl
78        return (sl["start"], interval, sample)

Samples the cpuload monitor and computes the difference to the last sample.

Returns

a tuple of (start time since enabled [µs], interval from last sample [µs], tasks dict[tcb [ptr] -> (total runtime [µs], difference from last sample [µs])]).

def system_load(gdb):
def system_load(gdb):
    """
    Returns the shared `SystemLoad` object, creating it on the first call.

    :param gdb: passed to the `SystemLoad` constructor only when the singleton
                does not exist yet; ignored otherwise.
    :return: The SystemLoad singleton object.
    """
    global _SYSTEM_LOAD
    if _SYSTEM_LOAD is not None:
        return _SYSTEM_LOAD
    # first use: build the singleton and keep it for all later callers
    _SYSTEM_LOAD = SystemLoad(gdb)
    return _SYSTEM_LOAD
Returns

The SystemLoad singleton object.

def restart_system_load_monitor(gdb):
def restart_system_load_monitor(gdb):
    """
    Starts the system load monitor if not started, else resets the sample
    interval to right now.

    :param gdb: forwarded to `system_load()` to obtain the singleton.
    """
    monitor = system_load(gdb)
    # Evaluate the `sample` property for its side effects only, the value is
    # discarded; `restart()` then stores a fresh reference snapshot.
    _ = monitor.sample
    monitor.restart()

Starts the system load monitor if not started, else resets the sample interval to right now.