diff --git a/canopen/objectdictionary/__init__.py b/canopen/objectdictionary/__init__.py index c8de5aa4..99502c13 100644 --- a/canopen/objectdictionary/__init__.py +++ b/canopen/objectdictionary/__init__.py @@ -208,11 +208,16 @@ def __init__(self, name: str, index: int): self.name = name #: Storage location of index self.storage_location: Optional[str] = None + #: CiA 306 ObjFlags bitfield + self.obj_flags: int = 0 + #: CiA 306 Denotation string (DCF only) + self.denotation: str = "" self.subindices: dict[int, ODVariable] = {} self.names: dict[str, ODVariable] = {} def __repr__(self) -> str: - return f"<{type(self).__qualname__} {self.name!r} at {pretty_index(self.index)}>" + flags = f" flags=0x{self.obj_flags:X}" if self.obj_flags else "" + return f"<{type(self).__qualname__} {self.name!r} at {pretty_index(self.index)}{flags}>" def __getitem__(self, subindex: Union[int, str]) -> ODVariable: item = self.names.get(subindex) or self.subindices.get(subindex) @@ -269,11 +274,16 @@ def __init__(self, name: str, index: int): self.name = name #: Storage location of index self.storage_location: Optional[str] = None + #: CiA 306 ObjFlags bitfield + self.obj_flags: int = 0 + #: CiA 306 Denotation string (DCF only) + self.denotation: str = "" self.subindices: dict[int, ODVariable] = {} self.names: dict[str, ODVariable] = {} def __repr__(self) -> str: - return f"<{type(self).__qualname__} {self.name!r} at {pretty_index(self.index)}>" + flags = f" flags=0x{self.obj_flags:X}" if self.obj_flags else "" + return f"<{type(self).__qualname__} {self.name!r} at {pretty_index(self.index)}{flags}>" def __getitem__(self, subindex: Union[int, str]) -> ODVariable: var = self.names.get(subindex) or self.subindices.get(subindex) @@ -379,12 +389,17 @@ def __init__(self, name: str, index: int, subindex: int = 0): self.bit_definitions: dict[str, list[int]] = {} #: Storage location of index self.storage_location: Optional[str] = None + #: CiA 306 ObjFlags bitfield + self.obj_flags: int = 0 + #: CiA 306 Denotation string (DCF only) + self.denotation: str = "" #: Can this variable be mapped to a PDO self.pdo_mappable = False def __repr__(self) -> str: subindex = self.subindex if isinstance(self.parent, (ODRecord, ODArray)) else None - return f"<{type(self).__qualname__} {self.qualname!r} at {pretty_index(self.index, subindex)}>" + flags = f" flags=0x{self.obj_flags:X}" if self.obj_flags else "" + return f"<{type(self).__qualname__} {self.qualname!r} at {pretty_index(self.index, subindex)}{flags}>" @property def qualname(self) -> str: diff --git a/canopen/objectdictionary/eds.py b/canopen/objectdictionary/eds.py index d47a3019..add01e88 100644 --- a/canopen/objectdictionary/eds.py +++ b/canopen/objectdictionary/eds.py @@ -3,7 +3,9 @@ import copy import logging import re +import sys from configparser import NoOptionError, NoSectionError, RawConfigParser +from datetime import datetime as dt from typing import TYPE_CHECKING from canopen.objectdictionary import ( @@ -24,7 +26,8 @@ def import_eds(source, node_id): eds = RawConfigParser(inline_comment_prefixes=(';',)) - eds.optionxform = str + eds.optionxform = str # type: ignore[assignment] + fp = None opened_here = False try: if hasattr(source, "read"): @@ -35,7 +38,7 @@ def import_eds(source, node_id): eds.read_file(fp) finally: # Only close object if opened in this fn - if opened_here: + if opened_here and fp is not None: fp.close() od = ObjectDictionary() @@ -139,14 +142,20 @@ def import_eds(source, node_id): arr.add_member(last_subindex) arr.add_member(build_variable(eds, section, node_id, object_type, index, 1)) arr.storage_location = storage_location + arr.obj_flags = _get_obj_flags(eds, section) + arr.denotation = eds.get(section, "Denotation") if eds.has_option(section, "Denotation") else "" od.add_object(arr) elif object_type == objectcodes.ARRAY: arr = ODArray(name, index) arr.storage_location = storage_location + arr.obj_flags = _get_obj_flags(eds, section) + arr.denotation = eds.get(section, "Denotation") if eds.has_option(section, "Denotation") else "" od.add_object(arr) elif object_type == objectcodes.RECORD: record = ODRecord(name, index) record.storage_location = storage_location + record.obj_flags = _get_obj_flags(eds, section) + record.denotation = eds.get(section, "Denotation") if eds.has_option(section, "Denotation") else "" od.add_object(record) continue @@ -171,8 +180,10 @@ def import_eds(source, node_id): index = int(match.group(1), 16) num_of_entries = int(eds.get(section, "NrOfEntries")) entry = od[index] + if not isinstance(entry, (ODRecord, ODArray)): + continue # For CompactSubObj index 1 is were we find the variable - src_var = od[index][1] + src_var = entry[1] for subindex in range(1, num_of_entries + 1): var = copy_variable(eds, section, subindex, src_var) if var is not None: @@ -258,6 +269,15 @@ def _revert_variable(var_type, value): return f"0x{value:02X}" +def _get_obj_flags(eds, section): + if eds.has_option(section, "ObjFlags"): + try: + return int(eds.get(section, "ObjFlags"), 0) + except ValueError: + pass + return 0 + + def build_variable( eds: RawConfigParser, section: str, @@ -321,16 +341,16 @@ def build_variable( pass if eds.has_option(section, "DefaultValue"): try: - var.default_raw = eds.get(section, "DefaultValue") - if '$NODEID' in var.default_raw: + var.default_raw = eds.get(section, "DefaultValue") # type: ignore[attr-defined] + if '$NODEID' in var.default_raw: # type: ignore[attr-defined] var.relative = True - var.default = _convert_variable(node_id, var.data_type, var.default_raw) + var.default = _convert_variable(node_id, var.data_type, var.default_raw) # type: ignore[assignment,attr-defined] except ValueError: pass if eds.has_option(section, "ParameterValue"): try: - var.value_raw = eds.get(section, "ParameterValue") - var.value = _convert_variable(node_id, var.data_type, var.value_raw) + var.value_raw = eds.get(section, "ParameterValue") # type: ignore[attr-defined] + var.value = _convert_variable(node_id, var.data_type, var.value_raw) # type: ignore[assignment,attr-defined] except ValueError: pass # Factor, Description and Unit are not standard according to the CANopen specifications, but @@ -350,6 +370,9 @@ def build_variable( var.unit = eds.get(section, "Unit") except ValueError: pass + var.obj_flags = _get_obj_flags(eds, section) + if eds.has_option(section, "Denotation"): + var.denotation = eds.get(section, "Denotation") return var @@ -425,12 +448,21 @@ def export_variable(var, eds): if getattr(var, 'unit', '') != '': eds.set(section, "Unit", var.unit) + if getattr(var, 'obj_flags', 0) != 0: + eds.set(section, "ObjFlags", f"0x{var.obj_flags:X}") + if device_commisioning and getattr(var, 'denotation', '') != '': + eds.set(section, "Denotation", var.denotation) + def export_record(var, eds): section = f"{var.index:04X}" export_common(var, eds, section) eds.set(section, "SubNumber", f"0x{len(var.subindices):X}") ot = objectcodes.RECORD if isinstance(var, ODRecord) else objectcodes.ARRAY eds.set(section, "ObjectType", f"0x{ot:X}") + if getattr(var, 'obj_flags', 0) != 0: + eds.set(section, "ObjFlags", f"0x{var.obj_flags:X}") + if device_commisioning and getattr(var, 'denotation', '') != '': + eds.set(section, "Denotation", var.denotation) for i in var: export_variable(var[i], eds) @@ -438,10 +470,9 @@ def export_record(var, eds): eds = RawConfigParser() # both disables lowercasing, and allows int keys - eds.optionxform = str + eds.optionxform = str # type: ignore[assignment] - from datetime import datetime as dt - defmtime = dt.utcnow() + defmtime = dt.now() try: # only if eds was loaded by us @@ -450,12 +481,12 @@ def export_record(var, eds): origFileInfo = { # just set some defaults "CreationDate": defmtime.strftime("%m-%d-%Y"), - "CreationTime": defmtime.strftime("%I:%m%p"), + "CreationTime": defmtime.strftime("%I:%M%p"), "EdsVersion": 4.2, } file_info.setdefault("ModificationDate", defmtime.strftime("%m-%d-%Y")) - file_info.setdefault("ModificationTime", defmtime.strftime("%I:%m%p")) + file_info.setdefault("ModificationTime", defmtime.strftime("%I:%M%p")) for k, v in origFileInfo.items(): file_info.setdefault(k, v) @@ -485,34 +516,36 @@ def export_record(var, eds): continue elif isinstance(val, str): eds.set("DeviceInfo", eprop, val) - elif isinstance(val, (int, bool)): - eds.set("DeviceInfo", eprop, int(val)) + elif isinstance(val, bool): + eds.set("DeviceInfo", eprop, str(int(val))) + elif isinstance(val, int): + eds.set("DeviceInfo", eprop, str(val)) # we are also adding out of spec baudrates here. for rate in od.device_information.allowed_baudrates.union( {10e3, 20e3, 50e3, 125e3, 250e3, 500e3, 800e3, 1000e3}): eds.set( "DeviceInfo", f"BaudRate_{int(rate//1000)}", - int(rate in od.device_information.allowed_baudrates)) + str(int(rate in od.device_information.allowed_baudrates))) if device_commisioning and (od.bitrate or od.node_id): eds.add_section("DeviceComissioning") if od.bitrate: - eds.set("DeviceComissioning", "Baudrate", int(od.bitrate / 1000)) + eds.set("DeviceComissioning", "Baudrate", str(od.bitrate // 1000)) if od.node_id: - eds.set("DeviceComissioning", "NodeID", int(od.node_id)) + eds.set("DeviceComissioning", "NodeID", str(od.node_id)) eds.add_section("Comments") i = 0 for line in od.comments.splitlines(): i += 1 eds.set("Comments", f"Line{i}", line) - eds.set("Comments", "Lines", i) + eds.set("Comments", "Lines", str(i)) eds.add_section("DummyUsage") for i in range(1, 8): key = f"Dummy{i:04d}" - eds.set("DummyUsage", key, 1 if (key in od) else 0) + eds.set("DummyUsage", key, str(1 if (key in od) else 0)) def mandatory_indices(x): return x in {0x1000, 0x1001, 0x1018} @@ -533,9 +566,9 @@ def optional_indices(x): def add_list(section, list): eds.add_section(section) - eds.set(section, "SupportedObjects", len(list)) + eds.set(section, "SupportedObjects", str(len(list))) for i in range(0, len(list)): - eds.set(section, (i + 1), f"0x{list[i]:04X}") + eds.set(section, str(i + 1), f"0x{list[i]:04X}") for index in list: export_object(od[index], eds) @@ -544,7 +577,6 @@ def add_list(section, list): add_list("ManufacturerObjects", supported_manufacturer_indices) if not dest: - import sys dest = sys.stdout eds.write(dest, False) diff --git a/test/sample.eds b/test/sample.eds index ad00a12e..ccf94f71 100644 --- a/test/sample.eds +++ b/test/sample.eds @@ -1025,6 +1025,14 @@ DataType=0x0007 AccessType=rw PDOMapping=0 +[3060] +ParameterName=Object with ObjFlags +ObjectType=0x7 +DataType=0x0007 +AccessType=rw +PDOMapping=0 +ObjFlags=0x1 + [3064] ParameterName=Record with DOMAIN sub-object SubNumber=0x2 @@ -1044,3 +1052,24 @@ ObjectType=0x2 DataType=0x0007 AccessType=rw PDOMapping=0 + +[3065] +ParameterName=Record with ObjFlags +ObjectType=0x9 +ObjFlags=0x3 +SubNumber=0x2 + +[3065sub0] +ParameterName=Highest sub-index supported +ObjectType=0x7 +DataType=0x0005 +AccessType=ro +DefaultValue=0x01 +PDOMapping=0 + +[3065sub1] +ParameterName=Value +ObjectType=0x7 +DataType=0x0007 +AccessType=rw +PDOMapping=0 diff --git a/test/test_eds.py b/test/test_eds.py index 7a19ffeb..09bfde8b 100644 --- a/test/test_eds.py +++ b/test/test_eds.py @@ -1,8 +1,12 @@ +import configparser +import contextlib +import io import os import unittest import canopen -from canopen.objectdictionary.eds import _signed_int_from_hex +from canopen import objectdictionary +from canopen.objectdictionary.eds import _get_obj_flags, _signed_int_from_hex from canopen.utils import pretty_index from .util import DATATYPES_EDS, SAMPLE_EDS, tmp_file @@ -70,8 +74,6 @@ def test_load_implicit_nodeid(self): self.assertEqual(od.node_id, 16) def test_load_implicit_nodeid_fallback(self): - import io - # First, remove the NodeID option from DeviceComissioning. with open(SAMPLE_EDS) as f: lines = [L for L in f.readlines() if not L.startswith("NodeID=")] @@ -93,8 +95,6 @@ def test_load_baudrate(self): self.assertEqual(od.bitrate, 500_000) def test_load_baudrate_fallback(self): - import io - # Remove the Baudrate option. with open(SAMPLE_EDS) as f: lines = [L for L in f.readlines() if not L.startswith("Baudrate=")] @@ -105,11 +105,11 @@ def test_load_baudrate_fallback(self): def test_variable(self): var = self.od['Producer heartbeat time'] - self.assertIsInstance(var, canopen.objectdictionary.ODVariable) + self.assertIsInstance(var, objectdictionary.ODVariable) self.assertEqual(var.index, 0x1017) self.assertEqual(var.subindex, 0) self.assertEqual(var.name, 'Producer heartbeat time') - self.assertEqual(var.data_type, canopen.objectdictionary.UNSIGNED16) + self.assertEqual(var.data_type, objectdictionary.UNSIGNED16) self.assertEqual(var.access_type, 'rw') self.assertFalse(var.is_domain) self.assertEqual(var.default, 0) @@ -118,20 +118,21 @@ def test_variable(self): def test_relative_variable(self): var = self.od['Receive PDO 0 Communication Parameter']['COB-ID use by RPDO 1'] self.assertTrue(var.relative) + assert self.od.node_id is not None self.assertEqual(var.default, 512 + self.od.node_id) def test_record(self): record = self.od['Identity object'] - self.assertIsInstance(record, canopen.objectdictionary.ODRecord) + self.assertIsInstance(record, objectdictionary.ODRecord) self.assertEqual(len(record), 4) self.assertEqual(record.index, 0x1018) self.assertEqual(record.name, 'Identity object') var = record['Vendor-ID'] - self.assertIsInstance(var, canopen.objectdictionary.ODVariable) + self.assertIsInstance(var, objectdictionary.ODVariable) self.assertEqual(var.name, 'Vendor-ID') self.assertEqual(var.index, 0x1018) self.assertEqual(var.subindex, 1) - self.assertEqual(var.data_type, canopen.objectdictionary.UNSIGNED32) + self.assertEqual(var.data_type, objectdictionary.UNSIGNED32) self.assertEqual(var.access_type, 'ro') self.assertFalse(var.is_domain) @@ -158,15 +159,15 @@ def test_signed_int_from_hex(self): def test_array_compact_subobj(self): array = self.od[0x1003] - self.assertIsInstance(array, canopen.objectdictionary.ODArray) + self.assertIsInstance(array, objectdictionary.ODArray) self.assertEqual(array.index, 0x1003) self.assertEqual(array.name, 'Pre-defined error field') var = array[5] - self.assertIsInstance(var, canopen.objectdictionary.ODVariable) + self.assertIsInstance(var, objectdictionary.ODVariable) self.assertEqual(var.name, 'Pre-defined error field_5') self.assertEqual(var.index, 0x1003) self.assertEqual(var.subindex, 5) - self.assertEqual(var.data_type, canopen.objectdictionary.UNSIGNED32) + self.assertEqual(var.data_type, objectdictionary.UNSIGNED32) self.assertEqual(var.access_type, 'ro') self.assertFalse(var.is_domain) @@ -194,11 +195,11 @@ def test_sub_index_w_capital_s(self): def test_dummy_variable(self): var = self.od['Dummy0003'] - self.assertIsInstance(var, canopen.objectdictionary.ODVariable) + self.assertIsInstance(var, objectdictionary.ODVariable) self.assertEqual(var.index, 0x0003) self.assertEqual(var.subindex, 0) self.assertEqual(var.name, 'Dummy0003') - self.assertEqual(var.data_type, canopen.objectdictionary.INTEGER16) + self.assertEqual(var.data_type, objectdictionary.INTEGER16) self.assertEqual(var.access_type, 'const') self.assertFalse(var.is_domain) self.assertEqual(len(var), 16) @@ -219,28 +220,27 @@ def test_reading_factor(self): def test_read_domain_object(self): var = self.od[0x3063] - self.assertIsInstance(var, canopen.objectdictionary.ODVariable) + self.assertIsInstance(var, objectdictionary.ODVariable) self.assertEqual(var.index, 0x3063) self.assertEqual(var.subindex, 0) self.assertEqual(var.name, 'DOMAIN object') - self.assertEqual(var.data_type, canopen.objectdictionary.UNSIGNED32) + self.assertEqual(var.data_type, objectdictionary.UNSIGNED32) self.assertEqual(var.access_type, 'rw') self.assertTrue(var.is_domain) def test_read_domain_subobject(self): record = self.od[0x3064] var = record[1] - self.assertIsInstance(var, canopen.objectdictionary.ODVariable) + self.assertIsInstance(var, objectdictionary.ODVariable) self.assertEqual(var.index, 0x3064) self.assertEqual(var.subindex, 1) self.assertEqual(var.name, 'DOMAIN sub-object') - self.assertEqual(var.data_type, canopen.objectdictionary.UNSIGNED32) + self.assertEqual(var.data_type, objectdictionary.UNSIGNED32) self.assertEqual(var.access_type, 'rw') self.assertTrue(var.is_domain) def test_roundtrip_domain_objects(self): # ObjectType==DOMAIN survive an EDS export/import round-trip - import io with io.StringIO() as dest: canopen.export_od(self.od, dest, 'eds') dest.name = 'mock.eds' @@ -271,7 +271,6 @@ def test_export_eds_to_file(self): self.verify_od(dest, doctype) def test_export_eds_to_file_unknown_extension(self): - import io for suffix in ".txt", "": with tmp_file(suffix=suffix) as tmp: dest = tmp.name @@ -294,7 +293,6 @@ def test_export_eds_to_file_unknown_extension(self): self.verify_od(buf, "eds") def test_export_eds_unknown_doctype(self): - import io filelike_object = io.StringIO() self.addCleanup(filelike_object.close) for dest in "filename", None, filelike_object: @@ -307,7 +305,6 @@ def test_export_eds_unknown_doctype(self): os.stat(dest) def test_export_eds_to_filelike_object(self): - import io for doctype in "eds", "dcf": with io.StringIO() as dest: with self.subTest(dest=dest, doctype=doctype): @@ -320,8 +317,6 @@ def test_export_eds_to_filelike_object(self): self.verify_od(dest, doctype) def test_export_eds_to_stdout(self): - import contextlib - import io with contextlib.redirect_stdout(io.StringIO()) as f: ret = canopen.export_od(self.od, None, "eds") self.assertIsNone(ret) @@ -353,7 +348,7 @@ def verify_od(self, source, doctype): self.assertEqual(type(actual_object), type(expected_object)) self.assertEqual(actual_object.name, expected_object.name) - if isinstance(actual_object, canopen.objectdictionary.ODVariable): + if isinstance(actual_object, objectdictionary.ODVariable): expected_vars = [expected_object] actual_vars = [actual_object] else: @@ -396,6 +391,81 @@ def verify_od(self, source, doctype): self.assertEqual(self.od.comments, exported_od.comments) + def test_reading_obj_flags(self): + var = self.od[0x3060] + self.assertIsInstance(var, objectdictionary.ODVariable) + self.assertEqual(var.obj_flags, 0x1) + + def test_reading_obj_flags_default(self): + """Standard objects without ObjFlags must have obj_flags == 0.""" + var = self.od[0x1017] # Producer heartbeat time — no ObjFlags in sample.eds + self.assertEqual(var.obj_flags, 0) + + def test_reading_obj_flags_record(self): + record = self.od[0x3065] + self.assertIsInstance(record, objectdictionary.ODRecord) + self.assertEqual(record.obj_flags, 0x3) + + def test_roundtrip_obj_flags(self): + with io.StringIO() as dest: + canopen.export_od(self.od, dest, 'eds') + dest.name = 'mock.eds' + dest.seek(0) + od2 = canopen.import_od(dest) + self.assertEqual(od2[0x3060].obj_flags, 0x1) + self.assertEqual(od2[0x1017].obj_flags, 0) + + def test_roundtrip_obj_flags_record(self): + with io.StringIO() as dest: + canopen.export_od(self.od, dest, 'eds') + dest.name = 'mock.eds' + dest.seek(0) + od2 = canopen.import_od(dest) + self.assertEqual(od2[0x3065].obj_flags, 0x3) + + def test_invalid_obj_flags_returns_zero(self): + eds = configparser.RawConfigParser() + eds.optionxform = str + eds.add_section("3060") + eds.set("3060", "ObjFlags", "not_a_number") + self.assertEqual(_get_obj_flags(eds, "3060"), 0) + + def test_denotation_roundtrip_dcf(self): + self.od[0x3060].denotation = 'FlaggedObject' + with io.StringIO() as dest: + canopen.export_od(self.od, dest, 'dcf') + dest.name = 'mock.dcf' + dest.seek(0) + od2 = canopen.import_od(dest) + self.assertEqual(od2[0x3060].denotation, 'FlaggedObject') + + def test_denotation_not_exported_in_eds_mode(self): + self.od[0x3060].denotation = 'ShouldNotAppear' + with io.StringIO() as dest: + canopen.export_od(self.od, dest, 'eds') + dest.name = 'mock.eds' + dest.seek(0) + od2 = canopen.import_od(dest) + self.assertEqual(od2[0x3060].denotation, '') + + def test_obj_flags_in_repr(self): + var = self.od[0x3060] + self.assertIn("flags=0x1", repr(var)) + record = self.od[0x3065] + self.assertIn("flags=0x3", repr(record)) + # zero flags must not clutter repr + self.assertNotIn("flags", repr(self.od[0x1017])) + + def test_denotation_record_roundtrip_dcf(self): + """Denotation on ODRecord/ODArray is preserved in DCF round-trip.""" + self.od[0x3065].denotation = 'RecordLabel' + with io.StringIO() as dest: + canopen.export_od(self.od, dest, 'dcf') + dest.name = 'mock.dcf' + dest.seek(0) + od2 = canopen.import_od(dest) + self.assertEqual(od2[0x3065].denotation, 'RecordLabel') + if __name__ == "__main__": unittest.main()