Commit 8633228b authored by Leander Schulten's avatar Leander Schulten

Modules: Add MQTT support

Merge remote-tracking branch 'origin/wip_need_sanitizer' into controllerInQThread
parent 62e2ce51
Pipeline #272782 passed with stage
in 14 minutes and 17 seconds
......@@ -63,6 +63,7 @@ SOURCES += \
dmx/programmprototype.cpp \
modules/dmxconsumer.cpp \
modules/ledconsumer.cpp \
modules/mqttimpl.cpp \
scanner.cpp \
settingsfilewrapper.cpp \
slideshow.cpp \
......@@ -136,6 +137,8 @@ HEADERS += \
dmx/dmxchannelfilter.h \
modules/controlpoint.hpp \
modules/ledconsumer.h \
modules/mqtt.hpp \
modules/mqttimpl.h \
modules/scanner.hpp \
scanner.h \
settingsfilewrapper.h \
......@@ -290,3 +293,9 @@ linux{
# needed for the SystemVolume class
LIBS += -lasound
}
# Qt MQTT
INCLUDEPATH += $$PWD/lib/qtmqtt/include
LIBS += -L$$PWD/lib/qtmqtt/lib -lQt5Mqtt
......@@ -1036,6 +1036,7 @@ void CodeEditorHelper::compile(){
stream << "#define HAVE_SPOTIFY" << endl;
stream << "#define HAVE_CONTROL_POINT" << endl;
stream << "#define HAVE_ISCANNER" << endl;
stream << "#define HAVE_MQTT" << endl;
switch (module->getType()) {
case Modules::Module::Filter:
stream << "#define HAVE_FILTER" << endl;
......@@ -1057,7 +1058,7 @@ void CodeEditorHelper::compile(){
stream << "using namespace Modules;" << endl;
stream << "using namespace std;" << endl;
stream << "" << endl;
lineCounter += 15;
lineCounter += 16;
stream << externCode << endl;
lineCounter += externCode.count("\n") + 1;
stream << "" << endl;
......
......@@ -27,4 +27,9 @@ cd RtAudio
./build_rtAudio.sh
cd ..
echo $'\n\nUpdate Qt MQTT'
cd qtmqtt
./getAndUpdate.sh
cd ..
echo "Lib installation complete"
#!/bin/bash
set -e
if [[ "$OSTYPE" == "msys" ]]; then # windows
# on windows we have to install perl first :(
if [[ $(cmd "/c perl -v") == *"is not recognized"* ]]; then
echo "########################################"
echo "################## ERROR ###############"
echo "########################################"
echo "Perl is not installed on your system (in cmd). You have to install Perl!"
echo "Visit http://strawberryperl.com and download and install Perl."
exit 3
fi
fi
# first we have to find the Qt dir
if [ ! -z "${QT_DIR}" ]; then # from env variable
QT="${QT_DIR}"
if [ ! -d "$QT" ]; then
echo "The dir saved in env variable QT_DIR does not exists: '$QT'"
echo "Execute 'export QT_DIR=your/path/to/qt' bevor execute this script to set the QT dir."
exit 2
fi
elif [ -d "/c/Qt" ]; then # windows
QT="/c/Qt"
elif [ -d ~/Qt ]; then # mainly mac and linux
QT=~/Qt
elif [ -d "/opt/Qt" ]; then # some stackoverflow answers recommends that
QT="/opt/Qt"
elif [ ! -z "$GITLAB_CI" ]; then
echo "Use Gitlab CI"
else
if [ ! -z "${QT_DIR}" ]; then
QT="${QT_DIR}"
if [ ! -d "$QT" ]; then
echo "The dir saved in env variable QT_DIR does not exists: '$QT'"
exit 2
fi
else
echo "No Qt installation found."
echo "Execute 'export QT_DIR=your/path/to/qt' bevor execute this script to set the QT dir."
exit 1
fi
fi
if [ ! -z "$GITLAB_CI" ]; then # in gitlab CI
LATEST=$(/usr/src/mxe/usr/x86_64-w64-mingw32.shared.posix/qt5/bin/qmake --version | grep -oh '5\.[0-9]\+.[0-9]\+')
else
LATEST=$(ls "$QT" | grep 5. | tail -1)
if [ -z $LATEST ]; then
echo "No installed Qt Version like 5.xx.x found in '$QT'"
echo "Execute 'export QT_DIR=your/path/to/qt' bevor execute this script to set the QT dir."
exit 1
fi
fi
echo "Use QT Version $LATEST"
GIT_DIR="mqtt"
CUR_DIR=$(pwd)
# add or update git
if [ ! -d "$GIT_DIR" ]; then # if folder "GIT_DIR" does not exists
git clone https://github.com/qt/qtmqtt.git "$GIT_DIR"
cd "$GIT_DIR"
else
cd "$GIT_DIR"
if [[ $(git pull) = "Already up to date." ]]; then
# we can skip copying
echo "Already up to date."
exit 0
fi
fi
# we are in the "$GIT_DIR" now.
# get the latest 5.x branch, but we now use the latest installed qt version
# last=$(git branch -a | grep -oh '5\.[0-9]\+.[0-9]\+' | tail -1)
git checkout "$LATEST"
cd ..
# to build there must be no space in th path.
# copy to different dir to change files
TEMP_DIR=$(mktemp) # creates empty file
rm $TEMP_DIR
mkdir $TEMP_DIR
cp -r $GIT_DIR $TEMP_DIR/$GIT_DIR
cd $TEMP_DIR/$GIT_DIR
# we have to change the includes.
for file in ./src/mqtt/*
do
[ -d "$file" ] && continue
echo "Process $file";
sed "s/<QtMqtt\/\([a-z]\+\)\.h>/\"\1.h\"/" "$file" > "temp";
cp "temp" "$file";
done
# build the project
if [ ! -z "$GITLAB_CI" ]; then # in gitlab CI
/usr/src/mxe/usr/x86_64-w64-mingw32.shared.posix/qt5/bin/qmake
else
BUILD=$(ls "$QT/$LATEST/" | grep _64)
"$QT/$LATEST/$BUILD/bin/qmake"
fi
make -j$(getconf _NPROCESSORS_ONLN)
# go back and copy the relevant files
cd "$CUR_DIR"
[ -d include ] && rm -rf include
cp -r $TEMP_DIR/$GIT_DIR/include include
cp $GIT_DIR/src/mqtt/*.h include/QtMqtt
[ -d lib ] && rm -rf lib
cp -r $TEMP_DIR/$GIT_DIR/lib lib
rm -rf $TEMP_DIR
echo "Installation complete"
\ No newline at end of file
......@@ -9,6 +9,7 @@
//#define HAVE_SPOTIFY
//#define HAVE_CONTROL_POINT
//#define HAVE_ISCANNER
//#define HAVE_MQTT
#ifdef HAVE_PROGRAM
#include "program.hpp"
......@@ -44,6 +45,10 @@
#include <functional>
#endif
#ifdef HAVE_MQTT
#include "mqtt.hpp"
#endif
#include <string>
//disable Warning for char * as return type in extern "C" Block with clang
......@@ -60,7 +65,7 @@
#endif
extern "C" {
enum class MODUL_TYPE{Program, LoopProgram,Filter,Consumer,Audio,Spotify,ControlPoint, IScanner};
enum class MODUL_TYPE { Program, LoopProgram, Filter, Consumer, Audio, Spotify, ControlPoint, IScanner, Mqtt };
#ifdef MODULE_LIBRARY
......@@ -110,9 +115,15 @@ MODULE_EXPORT bool have(MODUL_TYPE t){
#endif
case MODUL_TYPE::IScanner:
#ifdef HAVE_ISCANNER
return true;
return true;
#else
return false;
return false;
#endif
case MODUL_TYPE::Mqtt:
#ifdef HAVE_MQTT
return true;
#else
return false;
#endif
}
return false; // Vielleicht wird das enum in einer weiteren Version erweitert.
......@@ -170,26 +181,31 @@ MODULE_EXPORT void _setControlPoint(Modules::ControlPoint const * s){controlPoin
#endif
#ifdef HAVE_ISCANNER
std::function<Modules::IScanner*(const std::string &)> getScannerByNameCallback;
std::function<Modules::IScanner*(const std::string &)> getScannerByNameOrCreateCallback;
MODULE_EXPORT void _setGetScannerByNameCallback(std::function<Modules::IScanner*(const std::string &)> callback){getScannerByNameCallback = callback;}
MODULE_EXPORT void _setGetScannerByNameOrCreateCallback(std::function<Modules::IScanner*(const std::string &)> callback){getScannerByNameOrCreateCallback = callback;}
std::function<Modules::IScanner *(const std::string &)> getScannerByNameCallback;
std::function<Modules::IScanner *(const std::string &)> getScannerByNameOrCreateCallback;
MODULE_EXPORT void _setGetScannerByNameCallback(std::function<Modules::IScanner *(const std::string &)> callback) { getScannerByNameCallback = callback; }
MODULE_EXPORT void _setGetScannerByNameOrCreateCallback(std::function<Modules::IScanner *(const std::string &)> callback) { getScannerByNameOrCreateCallback = callback; }
namespace Scanner {
Modules::IScanner * getByName(const std::string & name){
if(getScannerByNameCallback){
Modules::IScanner *getByName(const std::string &name) {
if (getScannerByNameCallback) {
return getScannerByNameCallback(name);
}
return nullptr;
}
Modules::IScanner * getByNameOrCreate(const std::string & name){
if(getScannerByNameOrCreateCallback){
Modules::IScanner *getByNameOrCreate(const std::string &name) {
if (getScannerByNameOrCreateCallback) {
return getScannerByNameOrCreateCallback(name);
}
return nullptr;
}
}
} // namespace Scanner
#endif
#ifdef HAVE_MQTT
std::function<Modules::detail::IMqttClientImpl *()> createMqttClientCallback;
MODULE_EXPORT void _setCreateMqttClientCallback(std::function<Modules::detail::IMqttClientImpl *()> callback) { createMqttClientCallback = callback; }
Modules::detail::IMqttClientImpl *_createMqttClientImpl() { return createMqttClientCallback ? createMqttClientCallback() : nullptr; }
#endif
}
#endif // MODULES_GLOBAL_H
......@@ -2,6 +2,7 @@
#include "dmxconsumer.h"
#include "ledconsumer.h"
#include "module.h"
#include "mqttimpl.h"
#include "scanner.h"
#include "scanner.hpp"
#include "settings.h"
......@@ -220,24 +221,34 @@ typedef Modules::Program* (*CreateProgramm)(unsigned int index);
func(&controller_.getControlPoint());
}
}
if(f(MODUL_TYPE::IScanner)){
typedef void (*SetGetScannerByNameCallback)(std::function<IScanner*(const std::string &)>);
typedef void (*SetGetScannerByNameOrCreateCallback)(std::function<IScanner*(const std::string &)>);
if (f(MODUL_TYPE::IScanner)) {
typedef void (*SetGetScannerByNameCallback)(std::function<IScanner *(const std::string &)>);
typedef void (*SetGetScannerByNameOrCreateCallback)(std::function<IScanner *(const std::string &)>);
SetGetScannerByNameCallback getFunc = reinterpret_cast<SetGetScannerByNameCallback>(lib.resolve("_setGetScannerByNameCallback"));
SetGetScannerByNameOrCreateCallback getOrCreateFunc = reinterpret_cast<SetGetScannerByNameOrCreateCallback>(lib.resolve("_setGetScannerByNameOrCreateCallback"));
if(getFunc){
getFunc([](const std::string &name){return Scanner::getByName(name);});
}else{
if (getFunc) {
getFunc([](const std::string &name) { return Scanner::getByName(name); });
} else {
qDebug() << "getFunc is null, abort loading lib";
return;
}
if(getOrCreateFunc){
getOrCreateFunc([](const std::string &name){return Scanner::getByNameOrCreate(name);});
}else{
if (getOrCreateFunc) {
getOrCreateFunc([](const std::string &name) { return Scanner::getByNameOrCreate(name); });
} else {
qDebug() << "getOrCreateFunc is null, abort loading lib";
return;
}
}
if (f(MODUL_TYPE::Mqtt)) {
using SetCreateMqttClientCallback = void (*)(std::function<detail::IMqttClientImpl *()>);
SetCreateMqttClientCallback getFunc = reinterpret_cast<SetCreateMqttClientCallback>(lib.resolve("_setCreateMqttClientCallback"));
if (getFunc) {
getFunc([] { return new MqttImpl; });
} else {
qWarning() << "Modules MqttClient: getFunc is null";
return;
}
}
if(f(MODUL_TYPE::Program)){
loadType(lib,programms,"Program",lastLibraryIdentifier,[&](const auto p){
if(replaceOldModulesInProgramBlocks){
......
#ifndef MQTT_HPP
#define MQTT_HPP
#include <functional>
#include <memory>
#include <string>
namespace Modules {
enum class MqttClientStatus { Disconnected, Connecting, Connected, NotAvailible };
namespace detail {
class IMqttClientImpl {
public:
virtual void setLastWillMessage(const std::string &topic, const std::string &message, bool retain) = 0;
virtual void connect(const std::string &host, int port) = 0;
virtual void publishMessage(const std::string &topic, const std::string &message) = 0;
virtual void publishValue(const std::string &topic, const std::string &message) = 0;
virtual void subscribe(const std::string &topic, std::function<void(std::string)> callback) = 0;
virtual MqttClientStatus status() = 0;
virtual ~IMqttClientImpl() {}
};
template <typename Type>
bool convert(const std::string &s, Type &value) noexcept {
static_assert(std::is_integral_v<Type>, "The value must be an integral value like int, float, long, double, ...");
using T = std::make_signed_t<Type>;
try {
if constexpr (std::is_same_v<T, int> || std::is_same_v<T, bool> || std::is_same_v<T, int8_t> || std::is_same_v<T, int16_t>)
value = std::stoi(s);
else if constexpr (std::is_same_v<T, long>)
value = std::stol(s);
else if constexpr (std::is_same_v<T, long long>)
value = std::stoll(s);
else if constexpr (std::is_same_v<T, float>)
value = std::stof(s);
else if constexpr (std::is_same_v<T, double>)
value = std::stod(s);
else if constexpr (std::is_same_v<T, long double>)
value = std::stod(s);
} catch (std::logic_error &) {
return false;
}
}
} // namespace detail
} // namespace Modules
extern "C" {
Modules::detail::IMqttClientImpl *_createMqttClientImpl();
}
namespace Modules {
class MqttClient {
std::unique_ptr<Modules::detail::IMqttClientImpl> impl;
public:
MqttClient() : impl(_createMqttClientImpl()) {
if (!impl) throw std::runtime_error("MQTT Client is nullptr");
}
MqttClient(const std::string &lastWillTopic, const std::string &lastWillMessage, bool retain) : MqttClient() { impl->setLastWillMessage(lastWillTopic, lastWillMessage, retain); }
template <typename T>
MqttClient(const std::string &lastWillTopic, const T &lastWillValue, bool retain) : MqttClient(lastWillTopic, std::to_string(lastWillValue), retain) {}
MqttClient(const std::string &host, int port = 1883) : MqttClient() { impl->connect(host, port); }
void connect(const std::string &host, int port = 1883) { impl->connect(host, port); }
void publishMessage(const std::string &topic, const std::string &message) { impl->publishMessage(topic, message); }
template <typename T>
void publishMessage(const std::string &topic, const T &message) {
static_assert(std::is_arithmetic_v<T>, "T must be a arithmetic type");
impl->publishMessage(topic, std::to_string(message));
}
void publishValue(const std::string &topic, const std::string &message) { impl->publishValue(topic, message); }
template <typename T>
void publishValue(const std::string &topic, const T &message) {
static_assert(std::is_arithmetic_v<T>, "T must be a arithmetic type");
impl->publishValue(topic, std::to_string(message));
}
void subscribe(const std::string &topic, std::function<void(std::string)> callback) { impl->subscribe(topic, callback); }
template <typename T>
void subscribe(const std::string &topic, std::function<void(T)> callback) {
impl->subscribe(topic, [=](std::string s) {
T value;
if (detail::convert(s, value)) callback(value);
});
}
template <typename T>
void subscribe(const std::string &topic, std::function<void(std::string, bool, T)> callback) {
impl->subscribe(topic, [=](std::string s) {
T value{};
bool success = detail::convert(s, value);
callback(s, success, value);
});
}
MqttClientStatus status() { return impl->status(); }
};
using Mqtt = MqttClient;
} // namespace Modules
#endif // MQTT_HPP
#include "mqttimpl.h"
#include "modelmanager.h"
Modules::MqttImpl::MqttImpl() {
// the objects gets created in the application main thread, but must live in the controller thread.
// So we have to create the object in the controller Thread, but setLastWillMessage and connect can already be called in
// the main thread, so we have to delay the execution of these methonds
ModuleManager::singletone()->controller().runInController([this]() {
client = std::make_unique<QMqttClient>();
QObject::connect(client.get(), &QMqttClient::connected, [this] {
for (const auto &i : queried) {
if (client->state() != QMqttClient::Connected) {
// maybe we lose connection. Then we don't want to crash (subscribe adds than to queried)
return;
}
subscribe(i.first, i.second);
}
queried.clear();
});
if (lastWillDelayed) {
client->setWillTopic(std::get<0>(*lastWillDelayed));
client->setWillMessage(std::get<1>(*lastWillDelayed));
client->setWillRetain(std::get<2>(*lastWillDelayed));
}
if (connectDelayed) {
client->setHostname(std::get<0>(*connectDelayed));
client->setPort(std::get<1>(*connectDelayed));
client->connectToHost();
}
});
}
Modules::MqttImpl::~MqttImpl() {
ModuleManager::singletone()->controller().runInController([c = client.release()] { delete c; });
}
void Modules::MqttImpl::setLastWillMessage(const std::string &topic, const std::string &message, bool retain) {
if (client) {
client->setWillTopic(QString::fromStdString(topic));
client->setWillMessage(QByteArray::fromStdString(message));
client->setWillRetain(retain);
} else {
lastWillDelayed.emplace(QString::fromStdString(topic), QByteArray::fromStdString(message), retain);
}
}
void Modules::MqttImpl::connect(const std::string &host, int port) {
if (client) {
client->setPort(port);
client->setHostname(QString::fromStdString(host));
client->connectToHost();
} else {
connectDelayed.emplace(QString::fromStdString(host), port);
}
}
void Modules::MqttImpl::publishMessage(const std::string &topic, const std::string &message) { publish(topic, message, false); }
void Modules::MqttImpl::publishValue(const std::string &topic, const std::string &message) { publish(topic, message, true); }
void Modules::MqttImpl::subscribe(const std::string &topic, std::function<void(std::string)> callback) {
if (!client || client->state() != QMqttClient::Connected) {
// when not yet connected, wait for a connection and add than
queried.emplace_back(topic, callback);
return;
}
auto funcPointer = callback.target_type().name();
if (callbacks.find(std::make_pair(topic, funcPointer)) != callbacks.end()) {
// prevent double registrations of callbacks for the same topic
return;
}
callbacks.emplace(topic, funcPointer);
auto sub = client->subscribe(QString::fromStdString(topic));
QObject::connect(sub, &QMqttSubscription::messageReceived, [=](QMqttMessage m) { callback(m.payload().toStdString()); });
}
Modules::MqttClientStatus Modules::MqttImpl::status() {
switch (client->state()) {
case QMqttClient::Connected: return MqttClientStatus::Connected;
case QMqttClient::Connecting: return MqttClientStatus::Connecting;
case QMqttClient::Disconnected: return MqttClientStatus::Disconnected;
}
}
void Modules::MqttImpl::publish(const std::string &topic, const std::string &message, bool retain) { client->publish(QString::fromStdString(topic), QByteArray::fromStdString(message), 1, retain); }
#ifndef MQTTIMPL_H
#define MQTTIMPL_H
#include "mqtt.hpp"
#include <QtMqtt/QMqttClient>
#include <atomic>
#include <memory>
#include <optional>
#include <set>
#include <tuple>
namespace Modules {
class MqttImpl : public detail::IMqttClientImpl {
std::unique_ptr<QMqttClient> client;
std::optional<std::tuple<QString, QByteArray, bool>> lastWillDelayed;
std::optional<std::tuple<QString, int>> connectDelayed;
std::set<std::pair<std::string, const char *>> callbacks;
std::vector<std::pair<std::string, std::function<void(std::string)>>> queried;
public:
MqttImpl();
~MqttImpl() override;
void setLastWillMessage(const std::string &topic, const std::string &message, bool retain) override;
void connect(const std::string &host, int port) override;
void publishMessage(const std::string &topic, const std::string &message) override;
void publishValue(const std::string &topic, const std::string &message) override;
void subscribe(const std::string &topic, std::function<void(std::string)> callback) override;
MqttClientStatus status() override;
protected:
void publish(const std::string &topic, const std::string &message, bool retain);
};
} // namespace Modules
#endif // MQTTIMPL_H
#include "programblock.h"
#include <QJsonArray>
#include <stdexcept>
#include "modulemanager.h"
#include "json_storage.h"
#include <QJsonArray>
#include <stdexcept>
namespace Modules {
ProgrammBlockVector ProgramBlockManager::model;
ProgrammBlockVector ProgramBlockManager::model;
namespace detail {
namespace detail {
template<typename PointerType>
class SharedPtrComperator{
public:
......@@ -199,8 +197,9 @@ namespace Modules {
bool isRunning = ModuleManager::singletone()->controller().isProgramRunning(this);
programs.erase(it);
programs.insert(newProgram);
if(isRunning)
newProgram->start();
if (isRunning) {
controller->runInController([=] { newProgram->start(); });
}
for(auto & f : filter){
for(auto &con : f.second.targeds){
if(con.second.targed == original.get()){
......@@ -246,21 +245,22 @@ namespace Modules {
void ProgramBlock::replaceConsumer(std::shared_ptr<Consumer> original, std::shared_ptr<Consumer> newConsumer){
if(std::find_if(std::cbegin(consumer),std::cend(consumer),[&](const auto &c){return c.source == original;}) == std::end(consumer))
return;
if(getStatus() == Running)
original->stop();
transferProperties(original.get(),newConsumer.get());
for(auto & f : consumer){
if(f.source == original){
if (getStatus() == Running) {
controller->runInController([=] { original->stop(); });
}
transferProperties(original.get(), newConsumer.get());
for (auto &f : consumer) {
if (f.source == original) {
f.source = newConsumer;
}
}
if(getStatus() == Running)
newConsumer->start();
emit propertyBaseChanged(original.get(),newConsumer.get());
if (getStatus() == Running) {
controller->runInController([=] { newConsumer->start(); });
}
emit propertyBaseChanged(original.get(), newConsumer.get());
}
void ProgramBlock::restart(Controller * c){
void ProgramBlock::restart(Controller *c) {
stop();
start(c);
}
......@@ -453,4 +453,4 @@ namespace Modules {
}
}
}
}
} // namespace Modules
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment