Commit 14a35f4b authored by Georg Vinogradov's avatar Georg Vinogradov

Add dynamic pub/sub generation

parent d665c72f
Pipeline #159882 passed with stages
in 3 minutes and 19 seconds
......@@ -24,6 +24,8 @@ public class GeneratorMqtt
// Create and fill model
MqttAdapterModel model = new MqttAdapterModel(component.getFullName());
model.addPorts(component.getPortInstanceList());
//Generate files and write to project
contents.add(MqttTemplates.generateMqttAdapterH(model));
files.add(new File("./target/generated-sources/MqttAdapter_"+model.getEscapedCompName()+".h"));
......
......@@ -6,6 +6,7 @@ import de.monticore.lang.embeddedmontiarc.tagging.middleware.mqtt.MqttConnection
import de.monticore.lang.embeddedmontiarc.tagging.middleware.mqtt.MqttConnectionSymbol.MqttConnectionKind;
import java.util.*;
import java.util.stream.Collectors;
// Used to fill .ftl files
......@@ -13,6 +14,8 @@ public class MqttAdapterModel {
private String compName;
private List<String> ports = new ArrayList<>();
private List<EMAPortInstanceSymbol> incoming = new ArrayList<>();
private List<EMAPortInstanceSymbol> outgoing = new ArrayList<>();
public MqttAdapterModel(String compName)
{
......@@ -32,6 +35,37 @@ public class MqttAdapterModel {
.replace(']', '_');
}
public List<EMAPortInstanceSymbol> getIncomingPorts()
{
return incoming;
}
public List<EMAPortInstanceSymbol> getOutgoingPorts()
{
return outgoing;
}
public void addPorts(Collection<EMAPortInstanceSymbol> ports)
{
incoming.addAll(ports);
incoming = incoming.stream().filter(fc -> fc.isMqttPort()).filter(fc -> fc.isIncoming()).collect(Collectors.toList());
outgoing.addAll(ports);
outgoing = outgoing.stream().filter(fc -> fc.isMqttPort()).filter(fc -> fc.isOutgoing()).collect(Collectors.toList());
}
public String getTopic(EMAPortInstanceSymbol port)
{
Optional<MiddlewareSymbol> symbol = port.getMiddlewareSymbol();
if(symbol.isPresent() && symbol.get().isKindOf(MqttConnectionKind.INSTANCE))
{
MqttConnectionSymbol sym = (MqttConnectionSymbol) symbol.get();
String topicName = sym.getTopicName().isPresent()?sym.getTopicName().get():"unknown";
return topicName;
}
return "";
}
// Parse through component to find information about its ports
public void addPortsDesc(Collection<EMAPortInstanceSymbol> ports)
{
......
......@@ -17,17 +17,26 @@ void <@m.mwIdent/>Adapter_${model.getEscapedCompName()}::init(${model.getEscaped
connOpts.set_keep_alive_interval(20);
connOpts.set_clean_session(true);
// Intitialize callback, subscriber and publisher
_clockSubscriber = new client(SERVER_ADDRESS, SUB_ID);
_callback = new Callback(*_clockSubscriber, comp);
_echoPublisher = new client(SERVER_ADDRESS, PUB_ID);
// Intitialize callbacks, subscribers and publishers
<#list model.getIncomingPorts() as sub>
_sub_${sub.getName()} = new client(SERVER_ADDRESS, ${sub.getName()});
_callback_${sub.getName()} = new Callback(*_sub_${sub.getName()}, comp);
</#list>
// Connect subscriber, publisher and subscribe to the topic
<#list model.getOutgoingPorts() as pub>
_pub_${pub.getName()} = new client(SERVER_ADDRESS, ${pub.getName()});
</#list>
// Connect subscribers, publishers and subscribe to the topics
try {
_clockSubscriber->set_callback(*_callback);
_clockSubscriber->connect(connOpts);
_echoPublisher->connect(connOpts);
_clockSubscriber->subscribe(TOPIC, 1);
<#list model.getIncomingPorts() as sub>
_sub_${sub.getName()}->set_callback(*_callback_${sub.getName()});
_sub_${sub.getName()}->connect(connOpts);
_sub_${sub.getName()}->subscribe("${model.getTopic(sub)}", 1)
</#list>
<#list model.getOutgoingPorts() as pub>
_pub_${pub.getName()}->connect(connOpts);
</#list>
} catch (const mqtt::exception& exc) {
cerr << exc.what() << endl;
......@@ -35,21 +44,26 @@ void <@m.mwIdent/>Adapter_${model.getEscapedCompName()}::init(${model.getEscaped
}
void <@m.mwIdent/>Adapter_${model.getEscapedCompName()}::publish_echoPublisher()
<#list model.getOutgoingPorts() as pub>
void <@m.mwIdent/>Adapter_${model.getEscapedCompName()}::publish_echo_${pub.getName()}()
{
string value = to_string(component->rosOut);
auto pubmsg = make_message(TOPIC, value);
try {
_echoPublisher->publish(pubmsg);
} catch (const exception& exc) {
cerr << exc.to_string() << endl;
}
string value = to_string(component->mqttOut);
auto pubmsg = make_message("${model.getTopic(pub)}", value);
try {
_pub_${pub.getName()}->publish(pubmsg);
}
catch (const exception& exc) {
cerr << exc.to_string() << endl;
}
}
</#list>
void <@m.mwIdent/>Adapter_${model.getEscapedCompName()}::tick()
{
publish_echoPublisher();
<#list model.getOutgoingPorts() as pub>
publish_echo_${pub.getName()}();
</#list>
}
......@@ -16,7 +16,10 @@ public:
void init(${model.getEscapedCompName()}* comp);
void publish_echoPublisher();
<#list model.getOutgoingPorts() as pub>
void publish_echo_${pub.getName()}();
</#list>
void tick();
......@@ -28,8 +31,13 @@ private:
${model.getEscapedCompName()}* component = nullptr;
// Callbacks, Subscribers/Publishers
Callback* _callback = nullptr;
client* _clockSubscriber = nullptr;
client* _echoPublisher = nullptr;
// Callbacks, subscribers
<#list model.getIncomingPorts() as sub>
Callback* _callback_${sub.getName()} = nullptr;
client* _sub_${sub.getName()} = nullptr;
</#list>
// Publishers
<#list model.getOutgoingPorts() as pub>
client* _pub_${pub.getName()} = nullptr;
</#list>
};
......@@ -27,5 +27,5 @@ void Callback::message_arrived(const_message_ptr msg)
cout << "Message received "<< msg->get_topic() << ": " << msg->get_payload_str() << endl;
string::size_type sz;
double value = std::stod (msg->get_payload_str(),&sz);
comp_->rosIn = value;
comp_->mqttIn = value;
}
......@@ -32,6 +32,7 @@ public class AdapterGenerationTest extends AbstractSymtabTest {
// Connect component's ports to topics
componentInstanceSymbol.getPortInstance("portA").orElse(null).setMiddlewareSymbol(new MqttConnectionSymbol("/clock"));
componentInstanceSymbol.getPortInstance("portC").orElse(null).setMiddlewareSymbol(new MqttConnectionSymbol("/clock"));
List<File> files = generatorMqtt.generateMqttAdapter(componentInstanceSymbol);
......
......@@ -26,5 +26,5 @@ void Callback::message_arrived(const_message_ptr msg)
cout << "Message received "<< msg->get_topic() << ": " << msg->get_payload_str() << endl;
string::size_type sz;
double value = std::stod (msg->get_payload_str(),&sz);
comp_->rosIn = value;
comp_->mqttIn = value;
}
......@@ -16,17 +16,18 @@ void MqttAdapter_tests_a_compA::init(tests_a_compA *comp)
connOpts.set_keep_alive_interval(20);
connOpts.set_clean_session(true);
// Intitialize callback, subscriber and publisher
_clockSubscriber = new client(SERVER_ADDRESS, SUB_ID);
_callback = new Callback(*_clockSubscriber, comp);
_echoPublisher = new client(SERVER_ADDRESS, PUB_ID);
// Intitialize callbacks, subscribers and publishers
_sub_portA = new client(SERVER_ADDRESS, portA);
_callback_portA = new Callback(*_sub_portA, comp);
// Connect subscriber, publisher and subscribe to the topic
_pub_portC = new client(SERVER_ADDRESS, portC);
// Connect subscribers, publishers and subscribe to the topics
try {
_clockSubscriber->set_callback(*_callback);
_clockSubscriber->connect(connOpts);
_echoPublisher->connect(connOpts);
_clockSubscriber->subscribe(TOPIC, 1);
_sub_portA->set_callback(*_callback_portA);
_sub_portA->connect(connOpts);
_sub_portA->subscribe("/clock", 1)
_pub_portC->connect(connOpts);
} catch (const mqtt::exception& exc) {
cerr << exc.what() << endl;
......@@ -34,21 +35,22 @@ void MqttAdapter_tests_a_compA::init(tests_a_compA *comp)
}
void MqttAdapter_tests_a_compA::publish_echoPublisher()
void MqttAdapter_tests_a_compA::publish_echo_portC()
{
string value = to_string(component->rosOut);
auto pubmsg = make_message(TOPIC, value);
try {
_echoPublisher->publish(pubmsg);
} catch (const exception& exc) {
cerr << exc.to_string() << endl;
}
string value = to_string(component->mqttOut);
auto pubmsg = make_message("/clock", value);
try {
_pub_portC->publish(pubmsg);
}
catch (const exception& exc) {
cerr << exc.to_string() << endl;
}
}
void MqttAdapter_tests_a_compA::tick()
{
publish_echoPublisher();
publish_echo_portC();
}
......@@ -16,7 +16,8 @@ public:
void init(tests_a_compA* comp);
void publish_echoPublisher();
void publish_echo_portC();
void tick();
......@@ -28,8 +29,9 @@ private:
tests_a_compA* component = nullptr;
// Callbacks, Subscribers/Publishers
Callback* _callback = nullptr;
client* _clockSubscriber = nullptr;
client* _echoPublisher = nullptr;
// Callbacks, subscribers
Callback* _callback_portA = nullptr;
client* _sub_portA = nullptr;
// Publishers
client* _pub_portC = nullptr;
};
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment