Commit 390dab5d authored by Sonja Happ's avatar Sonja Happ
Browse files

WIP: update villasnode, use c++ interface

parent e7ab2cb8
......@@ -22,10 +22,15 @@
#include <list>
#include "villas_interface/villas_message.h"
#ifdef WITH_VILLAS
#include "villas/node.hpp"
#include "villas/pool.hpp"
#endif
//forward declarations
struct vnode;
struct vnode_type;
struct pool;
//class Node;
//struct vnode_type;
//struct pool;
class IO_object;
......@@ -75,12 +80,14 @@ struct nanomsg_data{
/*! \brief Struct that holds general info about the villas node */
struct villas_node_config{
std::string type_name;
std::string format_name;
std::string type_name; // TODO remove (part of configfile)
std::string format_name; // TODO remove (part of configfile)
std::string loglevel;
bool with_node;
double stop_at; // used to synchronize end of simulation in cosimulation
union node_type_config{
std::string configfile;
union node_type_config{ // TODO remove (replaced by configfile)
mqtt_data *mqtt_conf;
nanomsg_data *nanomsg_conf;
} type_config;
......@@ -100,7 +107,7 @@ public:
int start();
int stop();
//methods to init/destroy the villas node
int destroy();
//int destroy();
//methods to control villas node type (used by model ONLY)
int start_node_type();
......@@ -108,12 +115,17 @@ public:
private:
/*VillasNode struct*/
struct vnode *n;
struct vnode_type *type;
/*VillasNode*/
villas::node::Node *n;
//struct vnode_type *type;
std::string type;
//Memory pool
struct pool *p;
villas::node::Pool *p;
// Json config of the node
json_t *conf;
// meta infos
std::vector<Meta_infos> meta;
......
......@@ -31,6 +31,7 @@ if(WITH_VILLAS)
set(WITH_HOOKS OFF CACHE BOOL "disable hooks of libvillas")
set(WITH_WEB OFF CACHE BOOL "disable web of libvillas")
set(WITH_API OFF CACHE BOOL "disable api of libvillas")
set(WITH_GO OFF CACHE BOOL "disable go functions of villasnode")
set(NO_EVENTFD 1 CACHE BOOL "do not use event file descriptors in libvillas")
add_subdirectory(villasnode)
endif()
......
Subproject commit e0242dc544a3138e56cb550e5c9b9d12ae979593
Subproject commit 43965e3584deadf4cf46418b3034e0a94a854052
......@@ -160,10 +160,15 @@ cassandra.iothreads = 10
########################################
# parameters for villas usage
villas.loglevel = debug
villas.configfile = villas-mqtt.json
#villas.configfile = villas_nanomsg_dpsim_cosim.json
# supported node types: mqtt, nanomsg
villas.nodetype = mqtt
villas.format = json
villas.loglevel = debug
# MQTT specific parameters
villas.mqtt.broker = localhost
......
......@@ -48,6 +48,8 @@ Agent_behavior::Agent_behavior(int _id, int _type, std::string _subtype, double&
* \brief Destroy the Agent_behavior::Agent_behavior object
*/
Agent_behavior::~Agent_behavior() {
delete villas_interface;
}
/*! \brief update time (to be called after every time step)
......@@ -161,7 +163,7 @@ int Agent_behavior::destroy_villas_interface() {
IO->log_info("Destroying villas interface");
try {
villas_interface->stop();
villas_interface->destroy();
//villas_interface->destroy(); // Destruction happens in destructor
} catch (std::runtime_error &ex) {
IO->log_info("Caught exception: " + std::string(ex.what()));
return -1;
......
......@@ -26,9 +26,9 @@
#ifdef WITH_VILLAS
#include "villas/nodes/mqtt.hpp"
#include "villas/nodes/nanomsg.hpp"
#include "villas/node.h"
#include "villas/format.hpp"
#include "villas/signal.h"
#include "villas/signal.hpp"
#include "villas/sample.hpp"
#include "villas/log.hpp"
#endif
......@@ -70,50 +70,27 @@ Villas_interface::Villas_interface(villas_node_config *_config, IO_object *IO_ob
}
// TODO extract node type and save as member variable 'type'
//lookup node type
IO->log_info("Villas_interface: Node type lookup for node " + _name + " Type: " + _config->type_name + ".");
type = node_type_lookup(_config->type_name);
/*type = node_type_lookup(_config->type_name);
if(type== nullptr){
IO->log_info("Villas_interface: ERROR: something went wrong in node type lookup");
throw std::runtime_error("Villas_interface: node_type_lookup failed for type " + _config->type_name);
}
}*/
if(with_node) {
int ret; //for return values of villas functions
//set several states of node internal objects to STATE_DESTROYED
n = (struct vnode*) malloc(sizeof(struct vnode));
n->state = State::DESTROYED;
n->in.state = State::DESTROYED;
n->out.state = State::DESTROYED;
n->in.signals.state = State::DESTROYED;
n->out.signals.state = State::DESTROYED;
//Init memory pool for this node (used to get memory for sample in send_message)
p = (struct pool*) malloc(sizeof(struct pool));
p->state = State::DESTROYED;
p->queue.state = State::DESTROYED;
//Allocate 1MB of pool memory for sent and received messages
//200 kiByte = 204800
//1 MiB = 1048576
size_t blocksize=SAMPLE_LENGTH(meta.size()); //sizeof(struct sample) + meta.size()* sizeof(double);
IO->log_info("Villas_interface: Memory pool allocation for " + std::to_string(meta.size())+ " signals and blocksize=" + std::to_string(blocksize));
ret = pool_init(p, 1024, blocksize, &(memory_heap));
if (ret) {
throw std::runtime_error("Villas_interface: pool_init failed for node " + _name +
" and node type " + std::string(type->name) + " with return value " + std::to_string(ret));
}
// generate json config for node (incl. type and name!)
// init VILLASnode based on config from above
IO->log_info("Villas_interface: node_init");
ret = node_init(n, type);
if (ret) {
throw std::runtime_error("Villas_interface: node_init failed for node " + _name +
" and node type " + std::string(type->name) + " with return value " + std::to_string(ret));
}
// TODO parse a json config of the node here, generate json config first
uuid_t supernode_uuid; // TODO set value
//parse configuration of node
/* // TODO this is the node configuration from before, create json from this depending on node type
* //parse configuration of node
IO->log_info("Villas_interface: parsing node configuration");
n->name = strdup(_name.c_str());
......@@ -129,7 +106,7 @@ Villas_interface::Villas_interface(villas_node_config *_config, IO_object *IO_ob
auto *m = (mqtt *) n->_vd;
//set broker
m->host = strdup(_config->type_config.mqtt_conf->broker.c_str()) ;
//m->host = strdup(_config->type_config.mqtt_conf->broker.c_str()) ;
//set publish topic (if any)
if(!_config->type_config.mqtt_conf->publish.empty()){
......@@ -235,29 +212,59 @@ Villas_interface::Villas_interface(villas_node_config *_config, IO_object *IO_ob
vlist_push(&(n->out.signals), sig_out);
}
*
*
* */
// create the node
n = villas::node::NodeFactory::make(conf, supernode_uuid);
if (!n){
IO->log_info("Villas_interface: ERROR: something went wrong creating a node of type " + _config->type_name);
throw std::runtime_error("Villas_interface: NodeFactory::make failed for type " + _config->type_name);
}
/* TODO do we still need this??
// set state to parsed to be able to continue with this node
n->state = State::PARSED;
// set in and out node directions to PARSED
n->in.state = State::PARSED;
n->out.state = State::PARSED;
*/
/*TODO do we still need the memory pool???*/
//Allocate 1MB of pool memory for sent and received messages
//200 kiByte = 204800
//1 MiB = 1048576
size_t blocksize=sizeof(struct villas::node::Sample) + meta.size() * sizeof(double);
IO->log_info("Villas_interface: Memory pool allocation for " + std::to_string(meta.size())+ " signals and blocksize=" + std::to_string(blocksize));
//check node
IO->log_info("Villas_interface: node_check");
ret = node_check(n);
ret = pool_init(p, 1024, blocksize, &(villas::node::memory::heap));
if (ret) {
throw std::runtime_error("Villas_interface: pool_init failed for node " + _name +
" with return value " + std::to_string(ret));
}
// check VILLASnode
IO->log_info("Villas_interface: check node");
ret = n->check();
if (ret) {
throw std::runtime_error("Villas_interface: node_check failed for node " + _name +
" and node type " + std::string(type->name) + " with return value " + std::to_string(ret));
" with return value " + std::to_string(ret));
}
//prepare node
IO->log_info("Villas_interface: node_prepare");
ret = node_prepare(n);
// prepare VILLASnode
IO->log_info("Villas_interface: prepare node");
ret = n->prepare();
if (ret) {
throw std::runtime_error("Villas_interface: node_prepare failed for node " + _name +
" and node type " + std::string(type->name) + " with return value " + std::to_string(ret));
throw std::runtime_error("Villas_interface: node_init failed for node " + _name +
" with return value " + std::to_string(ret));
}
} else{
n = villas::node::NodeFactory::make(type);
IO->log_info("This instance of Villas_interface is used without node");
}
......@@ -280,11 +287,12 @@ Villas_interface::Villas_interface(villas_node_config *_config, IO_object *IO_ob
Villas_interface::~Villas_interface() {
#ifdef WITH_VILLAS
if(with_node) {
delete n; // delete dynamically created VILLASnode
int ret;
ret = pool_destroy(p);
if (ret){
IO->log_info("Villas_interface::Destructor pool_destroy failed for node " + std::string(n->name) +
" and node type " + std::string(type->name) + " with return value " + std::to_string(ret));
IO->log_info("Villas_interface::Destructor pool_destroy failed for node " + std::string(n->getName()) +
" with return value " + std::to_string(ret));
}
}
#endif
......@@ -297,12 +305,12 @@ Villas_interface::~Villas_interface() {
int Villas_interface::start() {
#ifdef WITH_VILLAS
int ret =0; //for return values of villas functions
IO->log_info("Villas_interface::start: starting villas node " + std::string(n->name));
IO->log_info("Villas_interface::start: starting villas node " + std::string(n->getName()));
if(with_node) {
ret = node_start(n);
ret = n->start();
if (ret) {
IO->log_info("Villas_interface::start: node_start failed for node " + std::string(n->name) +
" and node type " + std::string(type->name) + " ; ret="+std::to_string(ret));
IO->log_info("Villas_interface::start: node_start failed for node " + std::string(n->getName()) +
" ; ret="+std::to_string(ret));
}
}
......@@ -324,12 +332,11 @@ int Villas_interface::start() {
int Villas_interface::stop() {
#ifdef WITH_VILLAS
int ret =0; //for return values of villas functions
IO->log_info("Villas_interface::stop: stopping villas node " + std::string(n->name));
IO->log_info("Villas_interface::stop: stopping villas node " + std::string(n->getName()));
if(with_node) {
ret = node_stop(n);
ret = n->stop();
if (ret) {
IO->log_info("Villas_interface::stop: node_stop failed for node " + std::string(n->name) +
" and node type " + std::string(type->name));
IO->log_info("Villas_interface::stop: node_stop failed for node " + std::string(n->getName()));
}
}
else{
......@@ -343,29 +350,6 @@ int Villas_interface::stop() {
#endif
}
/*! \brief Destroy a Villas_interface
*
* */
int Villas_interface::destroy() {
#ifdef WITH_VILLAS
int ret =0; //for return values of villas functions
IO->log_info("Villas_interface::destroy: destroying villas node " + std::string(n->name));
if(with_node) {
ret = node_destroy(n);
if (ret) {
IO->log_info("Villas_interface::destroy: node_destroy failed for node " + std::string(n->name) +
" and node type " + std::string(type->name));
}
}
else{
IO->log_info("Villas_interface::destroy: Cannot destroy() villas node because this instance "
"of Villas_interface is used without node" );
}
return ret;
#else
return -1;
#endif
}
/*! \brief Send a message via a Villas_interface
*
......@@ -377,14 +361,14 @@ void Villas_interface::send_message(Villas_message &msg) {
//int ret = 0;
int allocated;
struct sample * new_sample[n->out.vectorize];
struct villas::node::Sample * new_sample[n->out.vectorize];
allocated = sample_alloc_many(p, new_sample, (int) n->out.vectorize);
//fill sample
IO->log_info("Villas_interface::send_message: sending message of length: " + std::to_string(msg.length));
new_sample[0]->length = msg.length;
new_sample[0]->sequence = number_of_sent_messages;
new_sample[0]->signals = &(n->out.signals);
new_sample[0]->signals = n->getOutputSignals();
for(unsigned int k = 0; k< msg.length; k++){
if(meta[k].type == VILLAS_DATA_TYPE_INT64){
//IO->log_info("Villas_interface::send_message: Filling int " +
......@@ -415,11 +399,12 @@ void Villas_interface::send_message(Villas_message &msg) {
//set flags of sample
new_sample[0]->flags = 0;
new_sample[0]->flags |= (unsigned int) SampleFlags::HAS_DATA;
new_sample[0]->flags |= (unsigned int) villas::node::SampleFlags::HAS_DATA;
// send sample
int release=allocated;
int sent = node_write(n, new_sample, n->out.vectorize);
int sent = n->write(new_sample, n->out.vectorize);
IO->log_info("Villas_interface::send_message: node_write(...) completed, sent " + std::to_string(sent) + " samples");
if (sent < 0) {
IO->log_info("Villas_interface::send_message: Failed to sent sample, reason=" + std::to_string(sent));
......@@ -445,7 +430,7 @@ void Villas_interface::allocate_and_receive(int number_of_messages, std::list<Vi
if(number_of_messages > 0) {
int allocated;
int release_number_of_messages = number_of_messages;
struct sample *new_sample[number_of_messages];
struct villas::node::Sample *new_sample[number_of_messages];
allocated = sample_alloc_many(p, new_sample, number_of_messages);
int release = allocated;
......@@ -453,8 +438,8 @@ void Villas_interface::allocate_and_receive(int number_of_messages, std::list<Vi
// As nanomsg has no built-in queue, it is emulated by reading the socket for multiple times and
// seting the number_of_messages based on node_read()'s actual return values.
if (std::string(type->name) == "nanomsg"){
struct sample *temp_sample[1]; // Temporary sample
if (std::string(type) == "nanomsg"){
struct villas::node::Sample *temp_sample[1]; // Temporary sample
int allocated_temp = sample_alloc_many(p, temp_sample, 1);
int received = 0;
......@@ -462,7 +447,7 @@ void Villas_interface::allocate_and_receive(int number_of_messages, std::list<Vi
for (int i = 0; i < number_of_messages; ++i){
int recv = node_read(n, temp_sample, 1);
int recv = n->read(temp_sample, 1);
if (recv > 0){
sample_copy(new_sample[index], temp_sample[0]);
received++;
......@@ -486,7 +471,7 @@ void Villas_interface::allocate_and_receive(int number_of_messages, std::list<Vi
}
else {
//receive sample
int recv = node_read(n, new_sample, number_of_messages);
int recv = n->read(new_sample, number_of_messages);
IO->log_info("Villas_interface::receive_messages: node_read(...) completed, received "
+ std::to_string(recv) + " samples");
......@@ -553,11 +538,17 @@ void Villas_interface::receive_messages(std::list<Villas_message> &messages) {
//receive all messages which are in incoming queue of villas node
messages.clear();
int number_of_messages;
int number_of_messages = 100; // TODO this parameter should be configurable!
allocate_and_receive(number_of_messages, messages);
// 128kB Memory / 8 Byte per message = 16000 messages
// TODO: Case message queue is no longer accessible for MQTT through VILLASnode C++ interface
// Check which villas node type is used and create corresponding struct
if(std::string(type->name) == "mqtt") {
/*
if(std::string(type) == "mqtt") {
auto * m = (struct mqtt*) n->_vd;
//determine number of messages in the queue
unsigned long queue_tail = m->queue.queue.tail;
unsigned long queue_head = m->queue.queue.head;
......@@ -573,6 +564,7 @@ void Villas_interface::receive_messages(std::list<Villas_message> &messages) {
else if(std::string(type->name) == "nanomsg") {
allocate_and_receive(1000, messages); // 128kB Memory / 8 Byte per message = 16000 messages
}
*/
}
else {
IO->log_info("Villas_interface::receive_messages: Cannot use receive() for villas node"
......@@ -590,19 +582,28 @@ void Villas_interface::receive_messages(std::list<Villas_message> &messages) {
* */
int Villas_interface::start_node_type() {
#ifdef WITH_VILLAS
int ret;
int ret = 0;
//init memory subsystem in each process
ret = memory_init(0);
ret = villas::node::memory::init(0);
if(ret){
IO->log_info("Villas_interface::start_node_type: memory_init failed for type " + std::string(type->name));
IO->log_info("Villas_interface::start_node_type: memory init failed for type " + type);
}
//start node type
IO->log_info("Villas_interface::start_node_type: Starting villas node type " + std::string(type->name));
ret = node_type_start(type, nullptr);
if(ret){
IO->log_info("Villas_interface::start_node_type: node_type_start failed for type " + std::string(type->name));
IO->log_info("Villas_interface: Starting villas node type " + type);
auto *nf = n->getFactory();
if (nf->getState() == State::STARTED) {
IO->log_info("Villas_interface: Villas node type " + type + " already started");
} else {
// nullptr in the following as argument will not work for all node types!
// parameter expects pointer to VILLAS supernode
ret = nf->start(nullptr);
if(ret){
IO->log_info("Villas_interface: Starting node type failed for " + type);
}
}
return ret;
......@@ -617,12 +618,19 @@ int Villas_interface::start_node_type() {
* */
int Villas_interface::stop_node_type() {
#ifdef WITH_VILLAS
int ret;
int ret = 0;
//stop node type
IO->log_info("Villas_interface::stop_node_type: Stopping villas node type " + std::string(type->name));
ret = node_type_stop(type);
IO->log_info("Villas_interface: Stopping villas node type " + type);
auto *nf = n->getFactory();
if (nf->getState() != State::STARTED) {
IO->log_info("Villas_interface: Villas node type " + type + " not started");
}
ret = nf->stop();
if(ret){
IO->log_info("Villas_interface::stop_node_type: node_type_stop failed for type " + std::string(type->name));
IO->log_info("Villas_interface: Stopping node type failed for " + type);
}
return ret;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment