Skip to content

Commit

Permalink
Faster detection for Mosaic X5
Browse files Browse the repository at this point in the history
  • Loading branch information
Stefal committed Dec 21, 2024
1 parent c4cb52a commit 8507b38
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 31 deletions.
4 changes: 2 additions & 2 deletions tools/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -514,8 +514,8 @@ configure_gnss(){
#remove SBAS Rtcm message (1107) as it is disabled in the F9P configuration.
sudo -u "${RTKBASE_USER}" sed -i -r '/^rtcm_/s/1107(\([0-9]+\))?,//' "${rtkbase_path}"/settings.conf && \
return $?

elif [[ $(python3 "${rtkbase_path}"/tools/sept_tool.py --port /dev/ttyGNSS_CTRL --baudrate ${com_port_settings%%:*} --command get_model --retry 5) =~ 'mosaic-X5' ]]
elif [[ $(python3 "${rtkbase_path}"/tools/sept_tool.py --port /dev/ttyGNSS_CTRL --baudrate ${com_port_settings%%:*} --command get_model --retry 2) =~ 'mosaic-X5' ]]
then
#get mosaic-X5 firmware release
firmware="$(python3 "${rtkbase_path}"/tools/sept_tool.py --port /dev/ttyGNSS_CTRL --baudrate ${com_port_settings%%:*} --command get_firmware --retry 5)" || firmware='?'
Expand Down
57 changes: 29 additions & 28 deletions tools/septentrio/septentrio_cmd.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
#! /usr/bin/env python3
from . serial_comm import SerialComm
from enum import Enum
from logging import getLogger
import logging
import xml.etree.ElementTree as ET
import time
#Code inspired by https://github.com/jonamat/sim-modem


logging.basicConfig(format='%(levelname)s: %(message)s')
log = logging.getLogger(__name__)
log.setLevel('ERROR')

class SeptGnss:
"""Class for sending command to Septentrio Gnss receivers"""

Expand All @@ -24,27 +28,30 @@ def __init__(
cmd_delay=cmd_delay,
connection_descriptor='USB2>',
)
self.debug = debug
if debug:
log.setLevel('DEBUG')
self.connect()

def connect(self) -> None:
self.comm.send('exeEchoMessage, COM1, "A:HELLO", none')
read = self.comm.read_raw(1000)
try:
check_hello = b"A:HELLO" in read
res_descriptor = read.decode().split()[-1]
log.debug("Connecting")
self.comm.device_serial.reset_input_buffer()
self.comm.send('exeEchoMessage, COM1, "A:HELLO", none')
read = self.comm.read_until_line("A:HELLO")
read = self.comm.device_serial.readline() #line we don't need
read = self.comm.read_raw(5) #reading next 5 chars to get connection descriptor
res_descriptor = read.decode()
check_descriptor = 'COM' in res_descriptor or 'USB' in res_descriptor or 'IP1' in res_descriptor
if check_hello and check_descriptor:
self.comm.connection_descriptor = read.decode().split()[-1]
if check_descriptor:
self.comm.connection_descriptor = res_descriptor
else:
raise Exception
if self.debug:
print("GNSS receiver connected, debug mode enabled")
print("Connection descriptor: {}".format(self.comm.connection_descriptor))
except Exception:
log.debug("GNSS receiver connected, debug mode enabled")
log.debug("Connection descriptor: {}".format(self.comm.connection_descriptor))
except Exception as e:
print("GNSS receiver did not respond correctly")
if self.debug:
print(read)
log.debug(read)
log.debug("Exception: ", e)
self.close()

def close(self) -> None:
Expand Down Expand Up @@ -82,7 +89,7 @@ def __parse_rcv_info(self, pseudo_xml, element_tag, info) -> str:
e = ET.ElementTree(ET.fromstring(''.join(pseudo_xml)))
res_info = None
for element in e.iter():
#print(element.tag, element.attrib, element.text)
log.debug(element.tag, element.attrib, element.text)
res_info = element.get(info) if element_tag in element.tag and element.get(info) else res_info
return res_info

Expand All @@ -98,12 +105,10 @@ def set_factory_default(self) -> None:
Reset receiver settings to factory defaults and restart it
Connection will be closed
'''
if self.debug:
print("Sending: 'exeResetReceiver, Soft, Config'")
log.debug("Sending: 'exeResetReceiver, Soft, Config'")
self.comm.send('exeResetReceiver, Soft, Config')
read = self.comm.read_until('STOP>')
if self.debug:
print("Receiving: {}".format(read))
log.debug("Receiving: {}".format(read))
if read[-1] != 'STOP>' or read[0].startswith('$R?'):
raise Exception("Command failed!\nSent: 'exeResetReceiver, Soft, Config'\nReceived: {}".format(read))
self.close()
Expand Down Expand Up @@ -132,23 +137,19 @@ def set_config_permanent(self) -> None:
# ----------------------------------- OTHERS --------------------------------- #

def send_read_lines(self, cmd, *args) -> list:
if self.debug:
print("Sending: {}{}{}".format(cmd, ', ' if args else '', ', '.join(args)))
log.debug("Sending: {}{}{}".format(cmd, ', ' if args else '', ', '.join(args)))
self.comm.send("{}{}{}".format(cmd, ', ' if args else '', ', '.join(args)))
read = self.comm.read_lines()
if self.debug:
print("Receiving: {}".format(read))
log.debug("Receiving: {}".format(read))
if read[-1] != self.comm.connection_descriptor or read[0].startswith('$R?'):
raise Exception("Command failed!\nSent: {}\nReceived: {}".format((cmd + ', ' + ', '.join(args)), read))
return read

def send_read_until(self, cmd, *args) -> list:
if self.debug:
print("Sending: {}{}{}".format(cmd, ', ' if args else '', ', '.join(args)))
log.debug("Sending: {}{}{}".format(cmd, ', ' if args else '', ', '.join(args)))
self.comm.send("{}{}{}".format(cmd, ', ' if args else '', ', '.join(args)))
read = self.comm.read_until()
if self.debug:
print("Receiving: {}".format(read))
log.debug("Receiving: {}".format(read))
if read[-1] != self.comm.connection_descriptor or read[0].startswith('$R?'):
raise Exception("Command failed!\nSent: {}\nReceived: {}".format((cmd + ', ' + ', '.join(args)), read))
return read
9 changes: 8 additions & 1 deletion tools/septentrio/serial_comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,14 @@ def read_until(self, expect = None) -> list:
read = read.decode(self.byte_encoding).strip().splitlines()
read = [ val for val in read if val != '']
return read


def read_until_line(self, expected='\r\n', eol='\r\n') -> str:
read_start = self.device_serial.read_until(expected = expected.encode())
read_start = read_start.decode(self.byte_encoding, errors='ignore').strip().splitlines()[-1]
if expected in read_start:
read_end = self.device_serial.readline().decode(self.byte_encoding, errors='ignore')
return read_start + read_end

def read_raw(self, size: int):
return self.device_serial.read(size)

Expand Down

0 comments on commit 8507b38

Please sign in to comment.