Commit 713e875f authored by Steffen Vogel's avatar Steffen Vogel 🎅🏼

Merge branch 'node-zeromq-bind' into 'master'

Rework of zeromq node

See merge request !58
parents d2af9fed 4ce10cbd
......@@ -45,7 +45,7 @@ logging = {
# One of: "warn", "info", "error", "off", "info"
file = "/var/log/villas-node.log" # File for logs
file = "/tmp/villas-node.log" # File for logs
syslog = true # Log to syslogd
}
......
......@@ -47,11 +47,10 @@ struct sample;
struct zeromq {
int ipv6;
struct format_type *format;
struct io io;
struct {
struct Curve {
int enabled;
struct {
char public_key[41];
......@@ -66,18 +65,13 @@ struct zeromq {
#endif
} pattern;
struct {
struct Dir {
void *socket; /**< ZeroMQ socket. */
void *mon_socket;
char *endpoint;
char *filter;
} in;
struct {
void *socket; /**< ZeroMQ socket. */
struct vlist endpoints;
char *filter;
} out;
int bind, pending;
} in, out;
};
/** @see node_type::print */
......
......@@ -133,7 +133,7 @@ void Server::stop()
ret = close(sd);
if (ret)
throw SystemError("Failed to close API socket");;
throw SystemError("Failed to close API socket");
state = State::STOPPED;
}
......
......@@ -155,6 +155,7 @@ int protobuf_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, s
struct sample *smp = smps[i];
Villas__Node__Sample *pb_smp = pb_msg->samples[i];
smp->flags = 0;
smp->signals = io->signals;
if (pb_smp->type != VILLAS__NODE__SAMPLE__TYPE__DATA) {
......
......@@ -45,20 +45,14 @@ protected:
StatsHook *parent;
public:
StatsWriteHook(struct path *p, struct node *n, int fl, int prio, bool en = true) :
Hook(p, n, fl, prio, en)
StatsWriteHook(StatsHook *pa, struct path *p, struct node *n, int fl, int prio, bool en = true) :
Hook(p, n, fl, prio, en),
parent(pa)
{
state = State::CHECKED;
}
virtual Hook::Reason process(sample *smp)
{
timespec now = time_now();
node->stats->update(Stats::Metric::AGE, time_delta(&smp->ts.received, &now));
return Reason::OK;
}
virtual Hook::Reason process(sample *smp);
};
class StatsReadHook : public Hook {
......@@ -66,9 +60,12 @@ class StatsReadHook : public Hook {
protected:
sample *last;
StatsHook *parent;
public:
StatsReadHook(struct path *p, struct node *n, int fl, int prio, bool en = true) :
Hook(p, n, fl, prio, en)
StatsReadHook(StatsHook *pa, struct path *p, struct node *n, int fl, int prio, bool en = true) :
Hook(p, n, fl, prio, en),
parent(pa)
{
state = State::CHECKED;
}
......@@ -92,38 +89,14 @@ public:
state = State::STOPPED;
}
virtual Hook::Reason process(sample *smp)
{
if (last) {
if (smp->flags & last->flags & (int) SampleFlags::HAS_TS_RECEIVED)
node->stats->update(Stats::Metric::GAP_RECEIVED, time_delta(&last->ts.received, &smp->ts.received));
if (smp->flags & last->flags & (int) SampleFlags::HAS_TS_ORIGIN)
node->stats->update(Stats::Metric::GAP_SAMPLE, time_delta(&last->ts.origin, &smp->ts.origin));
if ((smp->flags & (int) SampleFlags::HAS_TS_ORIGIN) && (smp->flags & (int) SampleFlags::HAS_TS_RECEIVED))
node->stats->update(Stats::Metric::OWD, time_delta(&smp->ts.origin, &smp->ts.received));
if (smp->flags & last->flags & (int) SampleFlags::HAS_SEQUENCE) {
int dist = smp->sequence - (int32_t) last->sequence;
if (dist != 1)
node->stats->update(Stats::Metric::SMPS_REORDERED, dist);
}
}
sample_incref(smp);
if (last)
sample_decref(last);
last = smp;
return Reason::OK;
}
virtual Hook::Reason process(sample *smp);
};
class StatsHook : public Hook {
friend StatsReadHook;
friend StatsWriteHook;
protected:
StatsReadHook *readHook;
StatsWriteHook *writeHook;
......@@ -150,11 +123,13 @@ public:
uri()
{
/* Add child hooks */
readHook = new StatsReadHook(p, n, fl, prio, en);
writeHook = new StatsWriteHook(p, n, fl, prio, en);
readHook = new StatsReadHook(this, p, n, fl, prio, en);
writeHook = new StatsWriteHook(this, p, n, fl, prio, en);
vlist_push(&node->in.hooks, (void *) readHook);
vlist_push(&node->out.hooks, (void *) writeHook);
if (node) {
vlist_push(&node->in.hooks, (void *) readHook);
vlist_push(&node->out.hooks, (void *) writeHook);
}
}
virtual void start()
......@@ -189,6 +164,15 @@ public:
stats->reset();
}
virtual Hook::Reason process(sample *smp)
{
// Only call readHook if it hasnt been added to the node's hook list
if (!node)
return readHook->process(smp);
return Hook::Reason::OK;
}
virtual void periodic()
{
assert(state == State::STARTED);
......@@ -239,12 +223,51 @@ public:
/* Register statistic object to path.
*
* This allows the path code to update statistics. */
node->stats = stats;
if (node)
node->stats = stats;
state = State::PREPARED;
}
};
Hook::Reason StatsWriteHook::process(sample *smp)
{
timespec now = time_now();
parent->stats->update(Stats::Metric::AGE, time_delta(&smp->ts.received, &now));
return Reason::OK;
}
Hook::Reason StatsReadHook::process(sample *smp)
{
if (last) {
if (smp->flags & last->flags & (int) SampleFlags::HAS_TS_RECEIVED)
parent->stats->update(Stats::Metric::GAP_RECEIVED, time_delta(&last->ts.received, &smp->ts.received));
if (smp->flags & last->flags & (int) SampleFlags::HAS_TS_ORIGIN)
parent->stats->update(Stats::Metric::GAP_SAMPLE, time_delta(&last->ts.origin, &smp->ts.origin));
if ((smp->flags & (int) SampleFlags::HAS_TS_ORIGIN) && (smp->flags & (int) SampleFlags::HAS_TS_RECEIVED))
parent->stats->update(Stats::Metric::OWD, time_delta(&smp->ts.origin, &smp->ts.received));
if (smp->flags & last->flags & (int) SampleFlags::HAS_SEQUENCE) {
int dist = smp->sequence - (int32_t) last->sequence;
if (dist != 1)
parent->stats->update(Stats::Metric::SMPS_REORDERED, dist);
}
}
sample_incref(smp);
if (last)
sample_decref(last);
last = smp;
return Reason::OK;
}
/* Register hook */
static HookPlugin<StatsHook> p(
"stats",
......
This diff is collapsed.
......@@ -461,7 +461,7 @@ check: if (optarg == endptr)
throw RuntimeError("Failed to start node {}: reason={}", node_name(node), ret);
PipeReceiveDirection recv_dir(node, &io, enable_recv, limit_recv);
PipeSendDirection send_dir(node, &io, enable_recv, limit_recv);
PipeSendDirection send_dir(node, &io, enable_recv, limit_send);
recv_dir.startThread();
send_dir.startThread();
......
......@@ -28,7 +28,7 @@ SCRIPTPATH=$(dirname $SCRIPT)
SRCDIR=${SRCDIR:-$(realpath ${SCRIPTPATH}/..)}
BUILDDIR=${BUILDDIR:-${SRCDIR}/build}
TAG=${TAG:-develop}
TAG=${TAG:-latest}
IMAGE="villas/node-dev:${TAG}"
docker run \
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment