-
Notifications
You must be signed in to change notification settings - Fork 1
/
MQTTDataModule.cpp
135 lines (112 loc) · 3.14 KB
/
MQTTDataModule.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/**
* @file MQTTDataModule.cpp
* @author BeeeOn team - Peter Tisovcik <[email protected]>
* @date September, 2016
*/
#include <ctime>
#include <Poco/Dynamic/Var.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Parser.h>
#include "MQTTDataModule.h"
#include "MQTTDataParser.h"
#include "main.h"
#include "utils.h"
using namespace std;
using Poco::AutoPtr;
/**
 * Constructs the module.
 *
 * The aggregator, the module name "MQTTDataModule", the MOD_MQTT_DATA
 * identifier and the message are forwarded to ModuleADT; a copy of the
 * message is additionally kept in m_msg.
 *
 * @param msg message forwarded to ModuleADT and stored in m_msg
 * @param agg aggregator forwarded to ModuleADT
 */
MQTTDataModule::MQTTDataModule(IOTMessage msg, shared_ptr<Aggregator> agg)
	: ModuleADT(agg, "MQTTDataModule", MOD_MQTT_DATA, msg)
	, m_msg(msg)
{
}
/**
 * Thread body of the module.
 *
 * Logs a start-up message and then idles, calling sleep(1) between
 * checks of quit_global_flag. The function returns once the flag is
 * observed to be set.
 */
void MQTTDataModule::threadFunction()
{
	log.information("Starting MQTTDataModule thread...");

	for (;;) {
		if (quit_global_flag)
			break;

		sleep(1);
	}
}
/**
 * Hands a message to the aggregator, tagged with
 * MQTT_DATA_MODULE_FROM_ADAAPP.
 *
 * @param msg text passed unchanged to agg->sendFromMQTTDataModule()
 */
void MQTTDataModule::sendConfigCmd(std::string msg)
{
	auto& aggregator = *agg;
	aggregator.sendFromMQTTDataModule(msg, MQTT_DATA_MODULE_FROM_ADAAPP);
}
/**
 * Parses a textual message with MQTTDataParser and forwards it to the
 * aggregator when it is a valid "data" message.
 *
 * The parsed message is stamped with the current time and with the
 * adapter id kept in m_msg. Invalid messages, and messages whose state
 * is anything other than "data", are only logged as errors.
 *
 * @param msg_text raw message text handed to the parser
 */
void MQTTDataModule::msgToMQTTDataModule(string msg_text)
{
	MQTTDataParser parser;
	IOTMessage parsed = parser.parseMessage(msg_text);

	parsed.time = time(NULL);
	parsed.adapter_id = m_msg.adapter_id;

	if (!parsed.valid) {
		log.error("Invalid msg for MQTTDataModule", __FILE__, __LINE__);
		return;
	}

	if (!(parsed.state == "data")) {
		log.error("Unknown message state for MQTTDataModule", __FILE__, __LINE__);
		return;
	}

	agg->sendData(parsed);
}
/**
 * Dispatches a server command according to cmd.state.
 *
 *  - "listen": sends the pairing message
 *  - "clean":  sends the delete message for cmd.euid
 *  - "set":    sends the set-actuators message built from cmd
 *  - "error":  logs an error, nothing is sent
 *  - anything else: logs an error that includes the unknown state
 *
 * @param cmd command received from the server
 */
void MQTTDataModule::parseCmdFromServer(const Command& cmd)
{
	const auto& state = cmd.state;

	if (state == "listen") {
		log.debug("Send sensor pairing state command.");
		sendConfigCmd(createPairingMessage());
		return;
	}

	if (state == "clean") {
		log.debug("Clean command to delete sensor.");
		sendConfigCmd(createDeleteMessage(cmd.euid));
		return;
	}

	if (state == "set") {
		log.debug("Set actuator value(s) comand");
		sendConfigCmd(createSetActuatorsMessage(cmd));
		return;
	}

	if (state == "error") {
		log.error("Error state in command -> Nothing to send via MQTT",
			__FILE__, __LINE__);
		return;
	}

	log.error("Unknown state of incomming command from server! Command state = "
		+ state, __FILE__, __LINE__);
}
/**
 * Builds the pairing request: a JSON object whose only member is
 * "state" = "listen".
 *
 * @return the object converted to std::string through Poco::Dynamic::Var
 */
string MQTTDataModule::createPairingMessage() const
{
	Poco::JSON::Object request;
	request.set("state", "listen");

	const Poco::Dynamic::Var wrapped(request);
	return wrapped.convert<std::string>();
}
/**
 * Builds the delete request for one device: a JSON object with
 * "device" = {"euid": euid} and "state" = "clean".
 *
 * @param euid identifier stored under device.euid
 * @return the object converted to std::string through Poco::Dynamic::Var
 */
string MQTTDataModule::createDeleteMessage(const euid_t& euid) const
{
	Poco::JSON::Object device;
	device.set("euid", euid);

	Poco::JSON::Object request;
	request.set("device", device);
	request.set("state", "clean");

	const Poco::Dynamic::Var wrapped(request);
	return wrapped.convert<std::string>();
}
/**
 * Builds the "set" request used to change actuator values.
 *
 * The message carries "state" = "set", plus "device_id" and "euid"
 * converted with std::to_string. When cmd.values is non-empty it also
 * carries a "data" array holding one object per entry, with the entry's
 * first/second members converted by to_string and stored under
 * "module_id" and "value". With no values the "data" key is omitted.
 *
 * @param cmd command whose device_id, euid and values are serialised
 * @return the object converted to std::string through Poco::Dynamic::Var
 */
string MQTTDataModule::createSetActuatorsMessage(const Command& cmd) const
{
	Poco::JSON::Object jsonMsg;
	jsonMsg.set("state", "set");
	jsonMsg.set("device_id", std::to_string(cmd.device_id));
	jsonMsg.set("euid", std::to_string(cmd.euid));

	Poco::JSON::Array modules;
	for (const auto& item : cmd.values) {
		Poco::JSON::Object::Ptr entry = new Poco::JSON::Object();
		entry->set("module_id", to_string(item.first));
		entry->set("value", to_string(item.second));
		modules.add(entry);
	}

	// Object::set() stores a copy of the array, so attach it once, after
	// it is complete, instead of re-copying it on every iteration.
	// Skipping the empty case keeps "data" absent when there are no values.
	if (modules.size() > 0)
		jsonMsg.set("data", modules);

	const Poco::Dynamic::Var jsonVar(jsonMsg);
	return jsonVar.convert<std::string>();
}