-
Notifications
You must be signed in to change notification settings - Fork 0
/
tigo.py
299 lines (241 loc) · 12.2 KB
/
tigo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
#!/usr/bin/python3
import re
import argparse
import requests
import time
import sys
import json
import logging
from datetime import datetime, timedelta
from bs4 import BeautifulSoup
import paho.mqtt.client as mqtt
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
# Set up argument parser
parser = argparse.ArgumentParser(description="Tigo CCA Data Publisher")
parser.add_argument('--device-name-prefix', default='Tigo Optimizer', help='Prefix for device name in Home Assistant')
parser.add_argument('--device-model', default='TS4-A-O', help='Model name for the device in Home Assistant')
parser.add_argument('--mqtt-broker', default='192.168.1.250', help='MQTT broker address')
parser.add_argument('--mqtt-port', type=int, default=1883, help='MQTT broker port')
parser.add_argument('--mqtt-user', default='', help='MQTT username')
parser.add_argument('--mqtt-pass', default='', help='MQTT password')
parser.add_argument('--tigo-router', default='10.11.1.211', help='Tigo router IP address')
parser.add_argument('--poll-interval', type=int, default=10, help='Time in seconds between each poll/publish cycle')
parser.add_argument('--topic-base', default='homeassistant/sensor/energy/tigo', help='Base MQTT topic for Home Assistant')
parser.add_argument('--log-file', default=None, help='Path to log file') # New log-file argument
parser.add_argument('--timeout', type=int, default=5, help='Timeout in seconds for requests') # New timeout argument
parser.add_argument('-debug', action='store_true', help='Enable debug mode') # Add debug flag
args = parser.parse_args()
# Set up logging
log_level = logging.DEBUG if args.debug else logging.INFO
log_format = "%(asctime)s - %(levelname)s - %(message)s"
# Create logger
logger = logging.getLogger()
logger.setLevel(log_level)
# Create handlers
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(log_level)
console_handler.setFormatter(logging.Formatter(log_format))
logger.addHandler(console_handler)
if args.log_file:
file_handler = logging.FileHandler(args.log_file)
file_handler.setLevel(log_level)
file_handler.setFormatter(logging.Formatter(log_format))
logger.addHandler(file_handler)
# Assign values from arguments
tigo_router = args.tigo_router
mqtt_broker = args.mqtt_broker
mqtt_port = args.mqtt_port
mqtt_user = args.mqtt_user
mqtt_pass = args.mqtt_pass
poll_interval = args.poll_interval
topic_base = args.topic_base
timeout = args.timeout # Assign the timeout from the arguments
client_id = "tigo_energy_client" # Unique client ID for MQTT connection
url = 'http://' + tigo_router + '/cgi-bin/mmdstatus'
# Create session and set basic auth
session = requests.Session()
session.auth = ('Tigo', '$olar')
# Set up retries and keep-alive to prevent dropped connections
retry_strategy = Retry(
total=3, # Number of retries before giving up
status_forcelist=[429, 500, 502, 503, 504], # Retry on these HTTP status codes
method_whitelist=["HEAD", "GET", "OPTIONS"], # Retry on specific HTTP methods
backoff_factor=0.3 # Exponential backoff factor (0.3, 0.6, 1.2, etc.)
)
adapter = HTTPAdapter(max_retries=retry_strategy)
# Mount adapter to the session
session.mount("http://", adapter)
session.mount("https://", adapter)
# Optional: TCP keep-alive
session.keep_alive = True
mqttc = mqtt.Client(client_id=client_id, clean_session=False) # Set client_id and disable clean session
mqttc.username_pw_set(mqtt_user, mqtt_pass)
# Enable detailed MQTT logging
mqttc.on_log = lambda client, userdata, level, buf: logger.debug(f"MQTT Log: {buf}")
logger.debug(f"Connecting to MQTT broker at {mqtt_broker}:{mqtt_port}...")
try:
mqttc.connect(mqtt_broker, mqtt_port, keepalive=120) # Set keepalive to 120 seconds
mqttc.loop_start()
logger.info("MQTT connection established.")
except Exception as e:
logger.error(f"Failed to connect to MQTT broker: {e}")
sys.exit(1)
def publish_discovery_message(sensor_id, unique_id, name, state_topic, unit_of_measurement, device_class, device):
sanitized_sensor_id = re.sub(r'[^a-zA-Z0-9-_]', '_', sensor_id)
discovery_topic = f"homeassistant/sensor/{sanitized_sensor_id}/config"
discovery_payload = {
"name": name,
"unique_id": unique_id,
"state_topic": state_topic,
"unit_of_measurement": unit_of_measurement,
"device_class": device_class,
"device": device
}
mqttc.publish(discovery_topic, json.dumps(discovery_payload))
logger.debug(f"Published discovery message for {name} to {discovery_topic}: {json.dumps(discovery_payload)}")
def poll_tigo():
try:
response = session.get(url, timeout=timeout) # Use the timeout argument here
if response.status_code == 200:
html = response.content
soup = BeautifulSoup(html, 'html.parser')
table = soup.find("table", {"class": "list_tb"})
if table is None:
logger.debug("Data table not found in the HTML response.")
return None
rows = table.find_all('tr')
d_ = {}
d_['headline'] = ['Label', 'Barcode', 'MAC', 'Voltage_Vin', 'Voltage_Vin_%', 'Voltage_Vout', 'Voltage_Vout_%', 'Current_A', 'Power_W', 'Power_%', 'Temp_C', 'RSSI', 'BRSSI', 'Slot', 'VMPE', 'VMPE', 'Sync/Evt', 'Mode', 'Bypass', 'Event', 'Raw', 'Extra_Raw', 'Details_Raw']
for row in rows:
line = row.find_all('td')
line = [e_.text.strip() for e_ in line]
logger.debug(f"Parsed line data: {line}")
if len(line) > 10:
bc = line[0] + '___' + line[1]
d_[bc] = {}
for i in range(len(line)):
line[i] = line[i].replace('%', '')
line[i] = line[i].replace('\xa0', '_')
line[i] = line[i].replace('/', '-') # No longer replacing 'on' and 'off'
try:
if '.' in line[i]:
d_[bc][d_['headline'][i]] = float(line[i])
else:
d_[bc][d_['headline'][i]] = int(line[i])
except Exception as e:
d_[bc][d_['headline'][i]] = line[i]
# Ensure VMPE is correctly captured
if len(line) >= 15: # Check if we have enough columns for VMPE
d_[bc]['VMPE'] = int(line[14]) if line[14].isdigit() else line[14]
d_.pop('headline', None)
logger.debug(f"Data parsed from Tigo (after removing 'headline'): {d_}")
return d_
else:
logger.debug(f"Failed to connect to Tigo router, status code: {response.status_code}")
return None
except Exception as e:
logger.error(f"Error polling Tigo: {e}")
return None
def publish_mqtt(d_):
if not isinstance(d_, dict):
logger.debug(f"Data is not in the expected format (dictionary). Actual type: {type(d_)}")
return
for panel_id, panel_data in d_.items():
sanitized_panel_id = panel_id.replace('___', '_').replace('-', '_') # Ensure clean panel ID
device_name = f"{args.device_name_prefix} {panel_data['Label']}-{panel_data['Barcode']}" # Create readable device name
sensor_id_base = f"energy_tigo_{sanitized_panel_id}"
device = {
"identifiers": [sensor_id_base],
"name": device_name,
"manufacturer": "Tigo",
"model": args.device_model
}
for metric, value in panel_data.items():
# Sensor ID for MQTT topic (removes special characters)
sanitized_metric = metric.replace('%', '').replace(' ', '_').replace('-', '_')
sensor_id = f"{sensor_id_base}_{sanitized_metric}"
unique_id = sensor_id # Unique ID for Home Assistant
# Only the metric name in the entity name
name = metric
state_topic = f"{topic_base}/{sanitized_panel_id}/{sanitized_metric}"
# Default values
unit_of_measurement = None
device_class = None
# Handle numeric and specific sensor data
if "Power_W" in metric:
unit_of_measurement = "W"
device_class = "power"
elif "Current_A" in metric:
unit_of_measurement = "A"
device_class = "current"
elif "Voltage" in metric:
unit_of_measurement = "V"
device_class = "voltage"
elif "Temp_C" in metric:
unit_of_measurement = "°C"
device_class = "temperature"
elif "RSSI" in metric or "BRSSI" in metric:
unit_of_measurement = "dBm"
device_class = "signal_strength"
elif metric in ["Bypass", "Event", "Raw", "Extra_Raw", "Details_Raw", "Mode", "Sync/Evt"]:
device_class = "enum"
elif metric in ["Label", "Barcode", "MAC", "Slot"]:
# Fields like Label, Barcode, MAC, and Slot shouldn't have units or classes
unit_of_measurement = None
device_class = None
else:
logger.debug(f"No unit of measurement or device class for metric: {metric}")
# Handle non-numeric values
if isinstance(value, str) and value == 'n-a':
logger.debug(f"Non-numeric value for {metric}: {value}. Replacing with None.")
value = None # Skip non-numeric values
# Publish the discovery message for Home Assistant
publish_discovery_message(
sensor_id=sensor_id,
unique_id=unique_id,
name=name, # Just the metric name
state_topic=state_topic,
unit_of_measurement=unit_of_measurement,
device_class=device_class if device_class else None,
device=device
)
logger.debug(f"Publishing to {state_topic}: {value}")
# Publish the actual data to the MQTT broker
if mqttc.is_connected():
if value is not None:
msg_info = mqttc.publish(state_topic, value)
msg_info.wait_for_publish()
if msg_info.rc != mqtt.MQTT_ERR_SUCCESS:
logger.debug(f"Failed to publish to {state_topic}. Return code: {msg_info.rc}")
else:
logger.debug(f"Skipping publishing None value to {state_topic}.")
else:
logger.error("MQTT client is not connected. Skipping publish.")
return
next_poll_time = datetime.now()
while True:
current_time = datetime.now()
if current_time >= next_poll_time:
logger.debug('Triggering data poll and publish...')
# Measure the start time of polling and publishing
start_time = datetime.now()
# Poll and publish the data
d_ = poll_tigo()
if d_:
publish_mqtt(d_)
else:
logger.debug("No data to publish.")
# Measure the time taken for polling and publishing
end_time = datetime.now()
processing_time = (end_time - start_time).total_seconds()
# Adjust the next poll time based on the original cycle duration
if processing_time >= poll_interval:
logger.warning(f"Processing time ({processing_time}s) exceeded poll interval. Scheduling next poll immediately.")
next_poll_time = datetime.now() # Start immediately if processing exceeds interval
else:
# Set the next poll time by adding the poll_interval, subtracting the processing time
next_poll_time += timedelta(seconds=(poll_interval - processing_time))
# Calculate dynamic sleep time based on remaining time until next poll
sleep_time = max(0.5, (next_poll_time - datetime.now()).total_seconds())
time.sleep(sleep_time)