Commit 7998bb07 authored by Mike Grüne's avatar Mike Grüne

Fix Publisher Subscriber

parent 20e51258
Pipeline #162017 passed with stages
in 3 minutes and 3 seconds
......@@ -4,14 +4,14 @@
<@m.mwIdent/>Adapter_${model.getEscapedCompName()}::<@m.mwIdent/>Adapter_${model.getEscapedCompName()}()
{
}
void <@m.mwIdent/>Adapter_${model.getEscapedCompName()}::init(${model.getEscapedCompName()} *comp)
{
// Initialize component
this->component = comp;
// Initialize connect options
connect_options connOpts;
connOpts.set_keep_alive_interval(20);
......@@ -19,41 +19,41 @@ void <@m.mwIdent/>Adapter_${model.getEscapedCompName()}::init(${model.getEscaped
// Intitialize callbacks, subscribers and publishers
<#list model.getIncomingPorts() as sub>
_sub_${sub.getName()} = new client(SERVER_ADDRESS, ${sub.getName()});
_callback_${sub.getName()} = new Callback(*_sub_${sub.getName()}, comp);
_sub_${sub.getName()} = new client(SERVER_ADDRESS, ${sub.getName()});
_callback_${sub.getName()} = new Callback(*_sub_${sub.getName()}, comp, ${sub.getName()});
</#list>
<#list model.getOutgoingPorts() as pub>
_pub_${pub.getName()} = new client(SERVER_ADDRESS, ${pub.getName()});
_pub_${pub.getName()} = new client(SERVER_ADDRESS, ${pub.getName()});
</#list>
// Connect subscribers, publishers and subscribe to the topics
try {
<#list model.getIncomingPorts() as sub>
_sub_${sub.getName()}->set_callback(*_callback_${sub.getName()});
_sub_${sub.getName()}->set_callback(*_callback_${sub.getName()});
_sub_${sub.getName()}->connect(connOpts);
_sub_${sub.getName()}->subscribe("${model.getTopic(sub)}", 1)
_sub_${sub.getName()}->subscribe("${model.getTopic(sub)}", 1);
</#list>
<#list model.getOutgoingPorts() as pub>
_pub_${pub.getName()}->connect(connOpts);
_pub_${pub.getName()}->connect(connOpts);
</#list>
} catch (const mqtt::exception& exc) {
cerr << exc.what() << endl;
}
}
<#list model.getOutgoingPorts() as pub>
void <@m.mwIdent/>Adapter_${model.getEscapedCompName()}::publish_echo_${pub.getName()}()
{
string value = to_string(component->mqttOut);
string value = to_string(component->${pub.getName()});
auto pubmsg = make_message("${model.getTopic(pub)}", value);
try {
_pub_${pub.getName()}->publish(pubmsg);
}
}
catch (const exception& exc) {
cerr << exc.to_string() << endl;
}
......
......@@ -2,15 +2,16 @@
#include "Callback.hpp"
Callback::Callback(client& cli, ${model.getEscapedCompName()}* comp) : callback(), cli_(cli)
Callback::Callback(client& cli, ${model.getEscapedCompName()}* comp, EMAPortInstanceSymbol port) : callback(), cli_(cli)
{
comp_ = comp;
port_ = port;
}
// Callback for when connected to broker
void Callback::connected(const string& cause)
{
cout << "Connected" <<endl;
cout << "Connected" <<endl;
}
// Callback for when the connection is lost.
......@@ -27,5 +28,5 @@ void Callback::message_arrived(const_message_ptr msg)
cout << "Message received "<< msg->get_topic() << ": " << msg->get_payload_str() << endl;
string::size_type sz;
double value = std::stod (msg->get_payload_str(),&sz);
comp_->mqttIn = value;
comp_->port_ = value;
}
......@@ -11,18 +11,20 @@ using namespace <@m.smallIdent/>;
class Callback : public virtual callback
{
client& cli_;
public:
Callback(client& cli, ${model.getEscapedCompName()}* comp);
Callback(client& cli, ${model.getEscapedCompName()}* comp, EMAPortInstanceSymbol port);
void connected(const string& cause);
void connection_lost(const string& cause);
void message_arrived(const_message_ptr msg);
private:
${model.getEscapedCompName()}* comp_;
EMAPortInstanceSymbol port_;
};
#endif /* Callback_hpp */
\ No newline at end of file
#endif /* Callback_hpp */
#include "Callback.hpp"
Callback::Callback(client& cli, tests_a_compA* comp) : callback(), cli_(cli)
Callback::Callback(client& cli, tests_a_compA* comp, EMAPortInstanceSymbol port) : callback(), cli_(cli)
{
comp_ = comp;
port_ = port;
}
// Callback for when connected to broker
void Callback::connected(const string& cause)
{
cout << "Connected" <<endl;
cout << "Connected" <<endl;
}
// Callback for when the connection is lost.
......@@ -26,5 +27,5 @@ void Callback::message_arrived(const_message_ptr msg)
cout << "Message received "<< msg->get_topic() << ": " << msg->get_payload_str() << endl;
string::size_type sz;
double value = std::stod (msg->get_payload_str(),&sz);
comp_->mqttIn = value;
comp_->port_ = value;
}
......@@ -10,18 +10,20 @@ using namespace mqtt;
class Callback : public virtual callback
{
client& cli_;
public:
Callback(client& cli, tests_a_compA* comp);
Callback(client& cli, tests_a_compA* comp, EMAPortInstanceSymbol port);
void connected(const string& cause);
void connection_lost(const string& cause);
void message_arrived(const_message_ptr msg);
private:
tests_a_compA* comp_;
EMAPortInstanceSymbol port_;
};
#endif /* Callback_hpp */
\ No newline at end of file
#endif /* Callback_hpp */
......@@ -3,47 +3,47 @@
MqttAdapter_tests_a_compA::MqttAdapter_tests_a_compA()
{
}
void MqttAdapter_tests_a_compA::init(tests_a_compA *comp)
{
// Initialize component
this->component = comp;
// Initialize connect options
connect_options connOpts;
connOpts.set_keep_alive_interval(20);
connOpts.set_clean_session(true);
// Intitialize callbacks, subscribers and publishers
_sub_portA = new client(SERVER_ADDRESS, portA);
_callback_portA = new Callback(*_sub_portA, comp);
_pub_portC = new client(SERVER_ADDRESS, portC);
_sub_portA = new client(SERVER_ADDRESS, portA);
_callback_portA = new Callback(*_sub_portA, comp, portA);
_pub_portC = new client(SERVER_ADDRESS, portC);
// Connect subscribers, publishers and subscribe to the topics
try {
_sub_portA->set_callback(*_callback_portA);
_sub_portA->set_callback(*_callback_portA);
_sub_portA->connect(connOpts);
_sub_portA->subscribe("/clock", 1)
_pub_portC->connect(connOpts);
_sub_portA->subscribe("/clock", 1);
_pub_portC->connect(connOpts);
} catch (const mqtt::exception& exc) {
cerr << exc.what() << endl;
}
}
void MqttAdapter_tests_a_compA::publish_echo_portC()
{
string value = to_string(component->mqttOut);
string value = to_string(component->portC);
auto pubmsg = make_message("/clock", value);
try {
_pub_portC->publish(pubmsg);
}
}
catch (const exception& exc) {
cerr << exc.to_string() << endl;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment