Commit f00c225b authored by Felix Wege's avatar Felix Wege

io: removed CHECKED state and fixed sprintf/sscan

parent 27f14b6e
......@@ -82,8 +82,6 @@ int io_init2(struct io *io, const struct format_type *fmt, const char *dt, int f
int io_destroy(struct io *io) __attribute__ ((warn_unused_result));
int io_check(struct io *io);
int io_open(struct io *io, const char *uri);
int io_close(struct io *io);
......
......@@ -64,10 +64,6 @@ public:
if (ret)
throw RuntimeError("Failed to initialze IO");
ret = io_check(&io);
if (ret)
throw RuntimeError("Failed to check IO");
ret = io_open(&io, uri);
if (ret)
throw RuntimeError("Failed to open IO");
......
......@@ -144,7 +144,7 @@ int io_destroy(struct io *io)
{
int ret;
assert(io->state == State::CLOSED || io->state == State::INITIALIZED || io->state == State::CHECKED);
assert(io->state == State::CLOSED || io->state == State::INITIALIZED);
ret = io_type(io)->destroy ? io_type(io)->destroy(io) : 0;
if (ret)
......@@ -163,13 +163,6 @@ int io_destroy(struct io *io)
return 0;
}
int io_check(struct io *io)
{
io->state = State::CHECKED;
return 0;
}
int io_stream_open(struct io *io, const char *uri)
{
int ret;
......@@ -338,7 +331,7 @@ int io_open(struct io *io, const char *uri)
{
int ret;
assert(io->state == State::CHECKED || io->state == State::CLOSED);
assert(io->state == State::INITIALIZED || io->state == State::CLOSED);
ret = io_type(io)->open
? io_type(io)->open(io, uri)
......@@ -505,14 +498,14 @@ FILE * io_stream_input(struct io *io) {
int io_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt)
{
assert(io->state == State::CHECKED || io->state == State::OPENED);
assert(io->state == State::INITIALIZED || io->state == State::OPENED || io->state == State::CLOSED);
return io_type(io)->sscan ? io_type(io)->sscan(io, buf, len, rbytes, smps, cnt) : -1;
}
int io_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt)
{
assert(io->state == State::CHECKED || io->state == State::OPENED);
assert(io->state == State::INITIALIZED || io->state == State::OPENED || io->state == State::CLOSED);
return io_type(io)->sprint ? io_type(io)->sprint(io, buf, len, wbytes, smps, cnt) : -1;
}
......@@ -245,10 +245,6 @@ int amqp_start(struct vnode *n)
if (ret)
return ret;
ret = io_check(&a->io);
if (ret)
return ret;
/* Connect producer */
a->producer = amqp_connect(&a->connection_info, &a->ssl_info);
if (!a->producer)
......
......@@ -125,10 +125,6 @@ int exec_prepare(struct vnode *n)
if (ret)
return ret;
ret = io_check(&e->io);
if (ret)
return ret;
/* Start subprocess */
e->proc = std::make_unique<Popen>(e->command, e->arguments, e->environment, e->working_dir, e->shell);
debug(2, "Started sub-process with pid=%d", e->proc->getPid());
......
......@@ -276,10 +276,6 @@ int file_start(struct vnode *n)
if (ret)
return ret;
ret = io_check(&f->io);
if (ret)
return ret;
ret = io_open(&f->io, f->uri);
if (ret)
return ret;
......
......@@ -299,10 +299,6 @@ int mqtt_check(struct vnode *n)
int ret;
struct mqtt *m = (struct mqtt *) n->_vd;
ret = io_check(&m->io);
if (ret)
return ret;
ret = mosquitto_sub_topic_check(m->subscribe);
if (ret != MOSQ_ERR_SUCCESS)
error("Invalid subscribe topic: '%s' for node %s: %s", m->subscribe, node_name(n), mosquitto_strerror(ret));
......
......@@ -164,10 +164,6 @@ int nanomsg_start(struct vnode *n)
if (ret)
return ret;
ret = io_check(&m->io);
if (ret)
return ret;
ret = m->in.socket = nn_socket(AF_SP, NN_SUB);
if (ret < 0) {
warning("Failed to create nanomsg socket: node=%s, error=%s", node_name(n), nn_strerror(errno));
......
......@@ -340,10 +340,6 @@ int rtp_start(struct vnode *n)
if (ret)
return -1;
ret = io_check(&r->io);
if (ret)
return ret;
/* Initialize AIMD hook */
if (r->aimd.rate_hook_type != RTPHookType::DISABLED) {
#ifdef WITH_HOOKS
......
......@@ -168,10 +168,6 @@ int socket_start(struct vnode *n)
if (ret)
return ret;
ret = io_check(&s->io);
if (ret)
return ret;
/* Create socket */
switch (s->layer) {
case SocketLayer::UDP:
......
......@@ -297,10 +297,6 @@ int test_rtt_start(struct vnode *n)
if (ret)
return ret;
ret = io_check(&t->io);
if (ret)
return ret;
t->task.setRate(c->rate);
t->current = -1;
......
......@@ -94,10 +94,6 @@ static int websocket_connection_init(struct websocket_connection *c)
if (ret)
return ret;
ret = io_check(&c->io);
if (ret)
return ret;
c->buffers.recv = new Buffer(DEFAULT_WEBSOCKET_BUFFER_SIZE);
c->buffers.send = new Buffer(DEFAULT_WEBSOCKET_BUFFER_SIZE);
......
......@@ -327,10 +327,6 @@ int zeromq_start(struct vnode *n)
if (ret)
return ret;
ret = io_check(&z->io);
if (ret)
return ret;
switch (z->pattern) {
#ifdef ZMQ_BUILD_DISH
case zeromq::Pattern::RADIODISH:
......
......@@ -136,10 +136,6 @@ protected:
if (ret)
throw RuntimeError("Failed to initialize IO: {}", dirs[i].name);
ret = io_check(&dirs[i].io);
if (ret)
throw RuntimeError("Failed to validate IO configuration");
ret = io_open(&dirs[i].io, nullptr);
if (ret)
throw RuntimeError("Failed to open IO");
......
......@@ -211,10 +211,6 @@ check: if (optarg == endptr)
if (ret)
throw RuntimeError("Failed to initialize IO");
ret = io_check(&io);
if (ret)
throw RuntimeError("Failed to validate IO configuration");
ret = io_open(&io, nullptr);
if (ret)
throw RuntimeError("Failed to open IO");
......
......@@ -421,10 +421,6 @@ check: if (optarg == endptr)
if (ret)
throw RuntimeError("Failed to initialize IO");
ret = io_check(&io);
if (ret)
throw RuntimeError("Failed to validate IO configuration");
ret = io_open(&io, nullptr);
if (ret)
throw RuntimeError("Failed to open IO");
......
......@@ -265,10 +265,6 @@ check: if (optarg == endptr)
if (ret)
throw RuntimeError("Failed to initialize output");
ret = io_check(&io);
if (ret)
throw RuntimeError("Failed to validate IO configuration");
ret = pool_init(&q, 16, SAMPLE_LENGTH(vlist_length(&n.in.signals)), &memory_heap);
if (ret)
throw RuntimeError("Failed to initialize pool");
......
......@@ -63,10 +63,6 @@ public:
if (ret)
throw RuntimeError("Failed to initialize IO");
ret = io_check(&io);
if (ret)
throw RuntimeError("Failed to validate IO configuration");
ret = io_open(&io, path.c_str());
if (ret)
throw RuntimeError("Failed to open file: {}", path);
......
......@@ -258,9 +258,6 @@ ParameterizedTest(Param *p, io, lowlevel, .init = init_memory)
ret = io_init(&io, f, &signals, (int) SampleFlags::HAS_ALL);
cr_assert_eq(ret, 0);
ret = io_check(&io);
cr_assert_eq(ret, 0);
cnt = io_sprint(&io, buf, sizeof(buf), &wbytes, smps, p->cnt);
cr_assert_eq(cnt, p->cnt, "Written only %d of %d samples for format %s", cnt, p->cnt, format_type_name(f));
......@@ -359,9 +356,6 @@ ParameterizedTest(Param *p, io, highlevel, .init = init_memory)
ret = io_init(&io, f, &signals, (int) SampleFlags::HAS_ALL);
cr_assert_eq(ret, 0);
ret = io_check(&io);
cr_assert_eq(ret, 0);
ret = io_open(&io, fn);
cr_assert_eq(ret, 0);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment