Skip to content

Commit

Permalink
bytestring nodeid base64
Browse files Browse the repository at this point in the history
  • Loading branch information
schroeder- authored and oroulet committed Oct 11, 2023
1 parent cac2d1c commit ab6d30a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
12 changes: 10 additions & 2 deletions asyncua/ua/uatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
implement ua datatypes
"""

import binascii
import sys
from typing import Optional, Any, Union, Generic, List
import collections
Expand All @@ -12,6 +13,7 @@
import itertools
from datetime import datetime, timedelta, timezone
from dataclasses import dataclass, field
from base64 import b64encode, b64decode

# hack to support python < 3.8
if sys.version_info.minor < 10:
Expand Down Expand Up @@ -548,9 +550,15 @@ def _from_string(string):
elif k == "b":
ntype = NodeIdType.ByteString
if v[0:2] == '0x':
# our custom handling, normaly all bytestring nodes are base64
identifier = bytes.fromhex(v[2:])
else:
identifier = v.encode()
try:
# this should be the default case base64
identifier = b64decode(v.encode("ascii"))
except (binascii.Error, ValueError):
# fallback for backwards compatiblity just use the bytestring
identifier = v.encode('ascii')
elif k == "srv":
srv = int(v.strip())
elif k == "nsu":
Expand Down Expand Up @@ -579,7 +587,7 @@ def to_string(self):
ntype = "g"
elif self.NodeIdType == NodeIdType.ByteString:
ntype = "b"
identifier = '0x' + identifier.hex()
identifier = b64encode(identifier).decode('ascii')
string.append(f"{ntype}={identifier}")
return ";".join(string)

Expand Down
4 changes: 3 additions & 1 deletion tests/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import tempfile
from datetime import datetime, timedelta
from pathlib import Path

from base64 import b64encode
import pytest

from asyncua import Node, ua, uamethod
Expand Down Expand Up @@ -182,6 +182,8 @@ async def test_node_bytestring(opc):
)
node = opc.opc.get_node("ns=2;b=VarByteString")
assert node == var
node = opc.opc.get_node(f"ns=2;b={b64encode(b'VarByteString').decode()}")
assert node == var
node = opc.opc.get_node(f"ns=2;b=0x{b'VarByteString'.hex()}")
assert node == var

Expand Down

0 comments on commit ab6d30a

Please sign in to comment.