Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • acs/public/deliverables/platone/platone-blockchain-access-layer
1 result
Show changes
Commits on Source (5)
Showing
with 37727 additions and 334 deletions
......@@ -2,8 +2,9 @@
*node_modules/
*.db/
*-p/
*build/
*app/build/
/dist/
bal.log*
npm-debug.log*
yarn-debug.log*
yarn-error.log*
......@@ -11,6 +12,8 @@ yarn-error.log*
/test/unit/coverage/
/test/e2e/reports/
selenium-debug.log
app/cassandra_schema/cassandra1_data/
app/cassandra_schema/cassandra2_data/
# Editor directories and files
.idea
......
......@@ -3,16 +3,14 @@
MIT
## Deployment
DB Container
Network creation
``` bash
$ docker run -d --name db <your-volume-path>:/data/db -p 27017:27017 mongo:latest
$ docker network create bap-net
```
SCD Container
``` bash
$ cd app/cassandra
$ docker-compose up -d
$ docker run --network bap-net -d --name db -v <your-volume-path>:/data/db -p 27017:27017 mongo:latest
```
Mosquitto Container
......@@ -21,17 +19,11 @@ $ cd app/mosquitto
$ docker-compose up -d
```
Ethereum Nodes
``` bash
$ cd docker-ethereum
$ docker-compose -f docker-compose.yml up -d
```
BAP Container
``` bash
$ cd app #location of DockerFile
$ docker build -t platone-bap:1.0
$ docker run --name bap -p 8082:8082 -e DATABASE_URL=<your-db-url> -e SCD_URL=<your-scd-url> -e MQTT_URL=<your-mqtt-url> -d platone-bap:1.0
$ docker build -t platone-bap:2.0 .
$ docker run --name bap -p 8082:8082 --network bap-net -e DATABASE_URL=db:27017 -e MQTT_HOST=mqtt://mosquitto_container:1883 -d platone-bap:2.0
$ docker exec -it bap bash
$ npm run deploy-local
$ npm run init-producers
......@@ -40,19 +32,11 @@ $ docker restart bap
```
API Gateway Container
Web Dashboard Container
``` bash
$ cd api
$ docker build -t platone-bal-api:1.0
$ docker run -p 3000:3000 -e DATABASE_URL=<your-db-url> -e SCD_URL=<your-scd-url> -d platone-bal-api:1.0
$ cd client #location of DockerFile
$ export API_URL=<bap-url>
$ docker build -t platone-bap-ui:2.0 .
$ docker run -p 80:80 -p 443:443 -d platone-bap-ui:2.0
```
BAP UI Container
``` bash
$ cd client
$ export API_URL=<your-api-url> #URL of API Gateway
$ docker build -t platone-bal-ui:1.0
$ docker run -p 80:80 -p 443:443 -d platone-bal-ui:1.0
```
......@@ -12,5 +12,6 @@ module.exports = {
rules: {
'no-underscore-dangle': 'off',
'linebreak-style': 0,
'max-len': 'off',
},
};
This diff is collapsed.
CREATE KEYSPACE IF NOT EXISTS ddemo_platone WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 3 };
CREATE KEYSPACE IF NOT EXISTS ddemo_platone WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor' : 3};
USE ddemo_platone;
......@@ -14,9 +14,11 @@ CREATE TABLE meterreading (
id uuid,
owner text,
device text,
datetime timestamp,
readings frozen <list<frozen <reading>>>,
timestamp timestamp,
timestamp bigint,
PRIMARY KEY (id)
);
CREATE INDEX meterreading_timestamp_idx ON meterreading ("timestamp");
\ No newline at end of file
CREATE INDEX meterreading_timestamp_idx ON meterreading ("timestamp");
CREATE INDEX meterreading_datetime_idx ON meterreading ("datetime");
\ No newline at end of file
version: '3'
services:
cassandra:
container_name: cassandra
cassandra1:
container_name: cassandra1
image: cassandra:latest
ports:
- "9042:9042"
environment:
- "CASSANDRA_SEEDS=cassandra1,cassandra2"
- "MAX_HEAP_SIZE=256M"
- "HEAP_NEWSIZE=128M"
restart: always
volumes:
- ./cassandra_data:/var/lib/cassandra
- ./cassandra1_data:/var/lib/cassandra
cassandra2:
container_name: cassandra2
image: cassandra:latest
depends_on:
- cassandra1
ports:
- "9043:9042"
environment:
- "CASSANDRA_SEEDS=cassandra1,cassandra2"
- "MAX_HEAP_SIZE=256M"
- "HEAP_NEWSIZE=128M"
restart: always
volumes:
- ./cassandra2_data:/var/lib/cassandra
cassandra-load-ddemo-keyspace:
container_name: ddemo-cassandra-load-keyspace
image: cassandra:latest
depends_on:
- cassandra
- cassandra1
volumes:
- ./ddemo_platone_schema.cql:/ddemo_platone_schema.cql
command: /bin/bash -c "sleep 60 && echo loading cassandra keyspace && cqlsh cassandra -f /ddemo_platone_schema.cql"
command: /bin/bash -c "sleep 60 && echo loading cassandra keyspace && cqlsh -f /ddemo_platone_schema.cql"
deploy:
restart_policy:
condition: on-failure
......@@ -31,14 +47,19 @@ services:
container_name: gdemo-cassandra-load-keyspace
image: cassandra:latest
depends_on:
- cassandra
- cassandra1
volumes:
- ./gdemo_platone_schema.cql:/gdemo_platone_schema.cql
command: /bin/bash -c "sleep 60 && echo loading cassandra keyspace && cqlsh cassandra -f /gdemo_platone_schema.cql"
command: /bin/bash -c "sleep 60 && echo loading cassandra keyspace && cqlsh -f /gdemo_platone_schema.cql"
deploy:
restart_policy:
condition: on-failure
delay: 5s
max_attempts: 3
window: 120s
networks:
default:
external: true
name: bap-net
CREATE KEYSPACE IF NOT EXISTS gdemo_platone WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 3 };
CREATE KEYSPACE IF NOT EXISTS gdemo_platone WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor' : 3};
USE gdemo_platone;
......@@ -59,7 +59,7 @@ CREATE TYPE usagepoint (
"mRID" text
);
CREATE TABLE IF NOT EXISTS gdemo_platone.MeterReading (
CREATE TABLE meterReading (
id uuid,
is_coincident_trigger boolean,
end_device_events uuid,
......@@ -68,5 +68,9 @@ CREATE TABLE IF NOT EXISTS gdemo_platone.MeterReading (
readings frozen<list<Reading>>,
usagepoint frozen<UsagePoint>,
values_interval uuid, /*id_date_time_interval*/
PRIMARY KEY (id)
);
\ No newline at end of file
datetime timestamp,
owner text,
PRIMARY KEY (id),
);
CREATE INDEX meterreading_timestamp_idx ON meterreading ("datetime");
\ No newline at end of file
......@@ -2,12 +2,24 @@ module.exports = {
mqttHost: process.env.MQTT_HOST || 'mqtts://platone.eng.it:8883',
mqttUsernameG: 'gdemo',
mqttPasswordG: 'Gdemo01!',
xmlFilePath: '../examples_files/amr.xml',
xmlFilePath: '../examples_files/new.xml',
mqttUsernameD: 'ddemo',
mqttPasswordD: 'Ddemo01!',
mqttTopicD: 'platone/ddemo',
mqttTopicG: 'platone/gdemo',
mqttUsernameDsotp: 'dsotp',
mqttPasswordDsotp: 'dsotp',
mqttTopicDsotp: 'platone/dsotp',
mqttTopicDsotpGreek: 'platone/dsotp-greek',
mqttUsernameAdmin: 'admin',
mqttPasswordAdmin: 'admin',
mosquittoAdmin: 'platone',
mosquittoPasswordAdmin: 'Platone2022',
mqttPasswordAdminGerman: 'xu7phohhoh9ux3Aituy9vie7sae7aqua',
cassandraContactPointsIP: process.env.CASSANDRA_HOST || '161.27.206.144',
localDataCenterCassandra: 'datacenter1',
keyspaceDdemo: 'ddemo_platone',
queryGetMeterReadingsByOwnerAndTimestamp: 'SELECT * FROM ddemo_platone.meterreading WHERE owner = ? AND timestamp >= ? AND timestamp < ? ALLOW FILTERING',
keyspaceGdemo: 'gdemo_platone',
queryGetMeterReadingsByOwnerAndTimestamp: 'SELECT * FROM meterreading WHERE owner = ? AND datetime >= ? AND datetime < ? ALLOW FILTERING',
queryGetMeterReadingsByOwnerAndTimestampGreek: 'SELECT * FROM gdemo_platone.meterreading WHERE owner = ? AND timestamp >= ? AND timestamp < ? ALLOW FILTERING',
};
......@@ -15,8 +15,9 @@ const web3ProviderFix = (LoggableCounter) => {
};
function init() {
const { host } = truffleConfig.networks.psm;
const { port } = truffleConfig.networks.psm;
const network = process.env.CHAIN_NETWORK || 'psm';
const { host } = truffleConfig.networks[network];
const { port } = truffleConfig.networks[network];
const url = `http://${host}:${port}`;
return new Web3(new Web3.providers.HttpProvider(url));
......
{
"device": "device1",
"timestamp": "2020-05-20T10:27:57.980802+00:00",
"device": "dummy-pmu-c",
"timestamp": "1616398138830086",
"readings": [
{
"component": "BUS1",
"measurand": "voltmagnitude",
"phase": "A",
"data": 77
"data": 231.768
},
{
"component": "BUS2",
"component": "BUS1",
"measurand": "voltangle",
"phase": "A",
"data": 0.798
},
{
"component": "BUS1",
"measurand": "frequency",
"phase": "A",
"data": 50.1752
},
{
"component": "BUS1",
"measurand": "rocof",
"phase": "A",
"data": 1.101
},
{
"component": "BUS1",
"measurand": "voltmagnitude",
"phase": "B",
"data": 230.36
},
{
"component": "BUS1",
"measurand": "voltangle",
"phase": "B",
"data": 0.062
},
{
"component": "BUS1",
"measurand": "frequency",
"phase": "B",
"data": 50.0308
},
{
"component": "BUS1",
"measurand": "rocof",
"phase": "B",
"data": 1.077
},
{
"component": "BUS1",
"measurand": "voltmagnitude",
"phase": "C",
"data": 231.628
},
{
"component": "BUS1",
"measurand": "voltangle",
"phase": "C",
"data": 0.014
},
{
"component": "BUS1",
"measurand": "frequency",
"phase": "C",
"data": 49.8804
},
{
"component": "BUS1",
"measurand": "rocof",
"phase": "C",
"data": 1.093
},
{
"component": "BUS1",
"measurand": "currmagnitude",
"phase": "A",
"data": 20.164
},
{
"component": "BUS1",
"measurand": "currangle",
"phase": "A",
"data": 88
"data": 0.824
},
{
"component": "BUS1",
"measurand": "frequency",
"phase": "A",
"data": 49.8384
},
{
"component": "BUS1",
"measurand": "rocof",
"phase": "A",
"data": 0.828
},
{
"component": "BUS1",
"measurand": "currmagnitude",
"phase": "B",
"data": 19.416
},
{
"component": "BUS1",
"measurand": "currangle",
"phase": "B",
"data": 0.044
},
{
"component": "BUS1",
"measurand": "frequency",
"phase": "B",
"data": 49.8136
},
{
"component": "BUS1",
"measurand": "rocof",
"phase": "B",
"data": 1.367
},
{
"component": "BUS1",
"measurand": "currmagnitude",
"phase": "C",
"data": 19.164
},
{
"component": "BUS1",
"measurand": "currangle",
"phase": "C",
"data": 0.075
},
{
"component": "BUS1",
"measurand": "frequency",
"phase": "C",
"data": 49.9952
},
{
"component": "BUS1",
"measurand": "rocof",
"phase": "C",
"data": 1.236
}
]
}
\ No newline at end of file
<mr:MeterReadings xmlns:mr="http://iec.ch/TC57/2011/MeterReadings#">
<mr:MeterReading>
<mr:Meter>
<mr:Names>
<mr:name>K.D P490</mr:name>
<mr:NameType>
<mr:name>EndpointID P490</mr:name>
<mr:NameTypeAuthority>
<mr:name>HEDNO</mr:name>
</mr:NameTypeAuthority>
</mr:NameType>
</mr:Names>
</mr:Meter>
<mr:Readings>
<mr:timeStamp>2021-03-29T11:10:00+03:00</mr:timeStamp>
<mr:value>20559.15</mr:value>
<mr:ReadingType ref="0.26.0.0.0.1.54.0.0.0.0.0.0.0.224.0.29.0" />
</mr:Readings>
<mr:UsagePoint>
<mr:mRID>_a9d5f01cf72041659dadf52e2f2a9db7</mr:mRID>
</mr:UsagePoint>
</mr:MeterReading>
<mr:MeterReading>
<mr:Meter>
<mr:Names>
<mr:name>36014543</mr:name>
<mr:NameType>
<mr:name>EndpointID P490</mr:name>
<mr:NameTypeAuthority>
<mr:name>HEDNO</mr:name>
</mr:NameTypeAuthority>
</mr:NameType>
</mr:Names>
</mr:Meter>
<mr:Readings>
<mr:timeStamp>2021-03-29T11:10:00+03:00</mr:timeStamp>
<mr:value>11.44</mr:value>
<mr:ReadingType ref="0.26.0.0.1.1.12.0.0.0.0.0.0.0.224.3.72.0" />
</mr:Readings>
<mr:Readings>
<mr:timeStamp>2021-03-29T11:10:00+03:00</mr:timeStamp>
<mr:value>0.8</mr:value>
<mr:ReadingType ref="0.26.0.0.1.1.12.0.0.0.0.0.0.0.224.3.73.0" />
</mr:Readings>
<mr:UsagePoint>
<mr:mRID>9400175</mr:mRID>
</mr:UsagePoint>
</mr:MeterReading>
<mr:MeterReading>
<mr:Meter>
<mr:Names>
<mr:name>30022297</mr:name>
<mr:NameType>
<mr:name>EndpointID P490</mr:name>
<mr:NameTypeAuthority>
<mr:name>HEDNO</mr:name>
</mr:NameTypeAuthority>
</mr:NameType>
</mr:Names>
</mr:Meter>
<mr:Readings>
<mr:timeStamp>2021-03-29T11:10:00+03:00</mr:timeStamp>
<mr:value>6.4</mr:value>
<mr:ReadingType ref="0.26.0.0.1.1.12.0.0.0.0.0.0.0.224.3.72.0" />
</mr:Readings>
<mr:Readings>
<mr:timeStamp>2021-03-29T11:10:00+03:00</mr:timeStamp>
<mr:value>0.16</mr:value>
<mr:ReadingType ref="0.26.0.0.1.1.12.0.0.0.0.0.0.0.224.3.73.0" />
</mr:Readings>
<mr:UsagePoint>
<mr:mRID>9400177</mr:mRID>
</mr:UsagePoint>
</mr:MeterReading>
</mr:MeterReadings>
\ No newline at end of file
/* eslint-disable no-shadow */
/* eslint-disable no-console */
const mqtt = require('mqtt');
const cassandra = require('cassandra-driver');
const { Mapper } = cassandra.mapping;
const { Uuid } = cassandra.types;
const fs = require('fs-extra');
const xml2js = require('xml2js');
const path = require('path');
const MeterReading = require('../models/MeterReadingModel');
const meterReadingLib = require('../libs/meterReading');
const MeterReadingModel = require('../models/dataModels/MeterReadingModel');
const userLib = require('../libs/user');
const { stripPrefix } = xml2js.processors;
const config = require('../config/config');
const logger = require('../logger');
const options = {
username: config.mqttUsernameG,
password: config.mqttPasswordG,
username: config.mqttUsernameAdmin,
password: config.mqttPasswordAdmin,
rejectedUnauthorized: false,
};
const parser = new xml2js.Parser({
......@@ -27,9 +24,6 @@ const parser = new xml2js.Parser({
attrNameProcessors: [stripPrefix],
});
const filePath = path.join(__dirname, config.xmlFilePath);
console.log(filePath);
class MqttHandler {
constructor(host) {
this.mqttClient = null;
......@@ -38,76 +32,68 @@ class MqttHandler {
connect() {
this.mqttClient = mqtt.connect(this.host, options);
const cassandraClient = new cassandra.Client({
contactPoints: ['161.27.206.144'],
localDataCenter: 'datacenter1',
keyspace: 'gdemo_platone',
});
// Mqtt error calback
this.mqttClient.on('error', (err) => {
console.log(err);
console.log('ERRORE MQTT');
logger.error(err);
logger.info('ERRORE MQTT');
this.mqttClient.end();
});
// Connection callback
this.mqttClient.on('connect', () => {
console.log('mqtt client connected');
logger.info('mqtt client connected');
// mqtt subscriptions
});
this.mqttClient.subscribe('platone/gdemo');
this.mqttClient.publish('platone/gdemo', 'platone/gdemo publisher');
const topicG = config.mqttTopicG;
this.mqttClient.subscribe([topicG]);
this.mqttClient.on('message', (topic, message) => {
if (topic === 'platone/gdemo') {
if (topic === topicG) {
const data = message;
console.log(data.toString());
const meterReadings = [];
fs.readFile(filePath, { encoding: 'utf-8' }, (_error, data) => {
parser.parseString(data, async (err, res) => {
if (err) console.log(err);
console.log(res);
const mapper = new Mapper(cassandraClient, {
models: {
MeterReading: {
tables: ['meterreading'],
},
},
parser.parseString(data, async (err, res) => {
if (err) logger.error(err);
const owner = await userLib.getOwnerByTopic(topic, 'producer');
logger.info(`ONWER-> ${owner}`);
logger.info(JSON.stringify(res));
for (let i = 0; i < res.MeterReading.length; i += 1) {
const meterReading = new MeterReadingModel({
Meter: data.Meter,
Readings: data.Readings,
UsagePoint: data.UsagePoint,
});
const MeterReadingMapper = mapper.forModel('MeterReading');
for (let i = 0; i < res.MeterReading.length; i += 1) {
const meterReading = new MeterReading({
Meter: res.MeterReading[i].Meter,
Readings: res.MeterReading[i].Readings,
UsagePoint: res.MeterReading[i].UsagePoint,
});
meterReadings.push(meterReading);
cassandraClient.connect().then(() => {
MeterReadingMapper.insert({
id: Uuid.random(),
meter: meterReading.Meter,
readings: meterReading.Readings,
usagepoint: meterReading.UsagePoint,
});
}).catch((err2) => {
console.error('There was an error when connecting', err2);
return cassandraClient.shutdown().then(() => { throw err2; });
});
}
console.log(JSON.stringify(meterReadings[0]));
console.log(meterReadings[0].Readings);
});
logger.info(` ${topic} METER READING Meter>>> ${JSON.stringify(meterReading.Meter)}`);
logger.info(` ${topic} METER READING Readings>>> ${JSON.stringify(meterReading.Readings)}`);
logger.info(` ${topic} METER READING UsagePoint>>> ${JSON.stringify(meterReading.UsagePoint)}`);
const meterReadingObj = {
owner,
datetime: new Date(),
Meter: meterReading.Meter,
Readings: meterReading.Readings,
UsagePoint: meterReading.UsagePoint,
};
meterReadingLib.save(meterReadingObj);
const newMeterReading = {
Meter: data.Meter,
Readings: data.Readings,
UsagePoint: data.UsagePoint,
};
this.mqttClient.publish(config.mqttTopicDsotpGreek, JSON.stringify(newMeterReading));
}
});
}
});
this.mqttClient.on('close', () => {
console.log('mqtt client disconnected');
logger.info('mqtt client disconnected');
});
}
}
......
const mqtt = require('mqtt');
const cassandra = require('cassandra-driver');
const { Mapper } = cassandra.mapping;
const { Uuid } = cassandra.types;
const xml2js = require('xml2js');
const MeterReading = require('../models/MeterReadingModel');
const userLib = require('../libs/user');
const { stripPrefix } = xml2js.processors;
const config = require('../config/config');
const logger = require('../logger');
const options = {
username: config.mqttUsernameAdmin,
password: config.mqttPasswordAdmin,
rejectedUnauthorized: false,
};
const parser = new xml2js.Parser({
mergeAttrs: true,
explicitRoot: false,
trim: true,
explicitArray: false,
tagNameProcessors: [stripPrefix],
attrNameProcessors: [stripPrefix],
});
class MqttHandler {
constructor(host) {
this.mqttClient = null;
this.host = host;
}
connect() {
this.mqttClient = mqtt.connect(this.host, options);
const cassandraClient = new cassandra.Client({
contactPoints: [config.cassandraContactPointsIP],
localDataCenter: config.localDataCenterCassandra,
keyspace: config.keyspaceGdemo,
});
// Mqtt error calback
this.mqttClient.on('error', (err) => {
logger.error(err);
logger.info('ERRORE MQTT');
this.mqttClient.end();
});
// Connection callback
this.mqttClient.on('connect', () => {
logger.info('mqtt client connected');
// mqtt subscriptions
});
const topicG = config.mqttTopicG;
this.mqttClient.subscribe([topicG]);
this.mqttClient.on('message', (topic, message) => {
if (topic === topicG) {
const data = message;
const meterReadings = [];
parser.parseString(data, async (err, res) => {
if (err) logger.error(err);
const mapper = new Mapper(cassandraClient, {
models: {
MeterReading: {
tables: ['meterreading'],
},
},
});
const MeterReadingMapper = mapper.forModel('MeterReading');
const owner = await userLib.getOwnerByTopic(topic, 'producer');
logger.info(`ONWER-> ${owner}`);
logger.info(JSON.stringify(res));
for (let i = 0; i < res.MeterReading.length; i += 1) {
const meterReading = new MeterReading({
Meter: res.MeterReading[i].Meter,
Readings: res.MeterReading[i].Readings,
UsagePoint: res.MeterReading[i].UsagePoint,
});
meterReadings.push(meterReading);
cassandraClient.connect().then(() => {
MeterReadingMapper.insert({
id: Uuid.random(),
meter: meterReading.Meter,
readings: meterReading.Readings,
usagepoint: meterReading.UsagePoint,
owner,
datetime: new Date(),
});
}).catch((err2) => {
logger.error('There was an error when connecting', err2);
return cassandraClient.shutdown().then(() => { throw err2; });
});
const newMeterReading = new MeterReading({
Meter: res.MeterReading[i].Meter,
Readings: res.MeterReading[i].Readings,
UsagePoint: res.MeterReading[i].UsagePoint,
});
this.mqttClient.publish(config.mqttTopicDsotpGreek, JSON.stringify(newMeterReading));
}
});
}
});
this.mqttClient.on('close', () => {
logger.info('mqtt client disconnected');
});
}
}
module.exports = MqttHandler;
const mqtt = require('mqtt');
const cassandra = require('cassandra-driver');
// const schedule = require('node-schedule');
// const moment = require('moment');
const { Mapper } = cassandra.mapping;
const { Uuid } = cassandra.types;
const producerLib = require('../libs/producer');
const PMUMeterReading = require('../models/PMUMeterReading');
// const ddemo = require('../examples_files/ddemo');
const userLib = require('../libs/user');
const pmuMeterReadingLib = require('../libs/pmuMeterReading');
const PMUMeterReadingModel = require('../models/dataModels/PMUMeterReadingModel');
const config = require('../config/config');
const logger = require('../logger');
const options = {
username: config.mqttUsernameD,
password: config.mqttPasswordD,
username: config.mqttUsernameAdmin,
password: config.mqttPasswordAdmin,
rejectedUnauthorized: false,
};
......@@ -26,12 +20,6 @@ class MqttHandler {
connect() {
this.mqttClient = mqtt.connect(this.host, options);
const cassandraClient = new cassandra.Client({
contactPoints: [config.cassandraContactPointsIP],
localDataCenter: config.localDataCenterCassandra,
keyspace: config.keyspaceDdemo,
});
// Mqtt error calback
this.mqttClient.on('error', (err) => {
logger.error(err);
......@@ -46,57 +34,50 @@ class MqttHandler {
});
const topicD = config.mqttTopicD;
/* FOR TESTING PUBLISH */
/* const j1 = new schedule.Job(() => {
ddemo.timestamp = moment();
this.mqttClient.publish(topicD, JSON.stringify(ddemo));
});
const rule = new schedule.RecurrenceRule();
rule.minute = new schedule.Range(0, 59, 1);
j1.schedule(rule); */
const topicD2 = 'platone/test2';
this.mqttClient.subscribe([topicD]);
this.mqttClient.subscribe([topicD2]);
this.mqttClient.on('message', async (topic, message) => {
const entryMessage = message;
logger.info(`ENTRY MESSAGE from topic>>> ${topic}`);
logger.info(entryMessage.toString());
const jsonData = JSON.parse(entryMessage);
logger.info(` ${topic} DATA>>> ${JSON.stringify(jsonData)}`);
const datePart1 = jsonData.timestamp.split('.')[0];
const dateSeconds = new Date(datePart1).getTime() / 1000;
const datePart2 = jsonData.timestamp.split('.')[1];
const microseconds = datePart2.split('+')[0];
const timestampF = dateSeconds + microseconds;
const mapper = new Mapper(cassandraClient, {
models: {
MeterReading: {
tables: ['meterreading'],
},
},
const meterReading = new PMUMeterReadingModel({
device: jsonData.device,
timestamp: parseInt(timestampF, 10),
readings: jsonData.readings,
});
const MeterReadingMapper = mapper.forModel('MeterReading');
logger.info(` ${topic} MU MEPTER READING from device>>> ${JSON.stringify(meterReading.device)}`);
logger.info(` ${topic} PMU METER READING with timestamp>>> ${JSON.stringify(meterReading.timestamp)}`);
const owner = await userLib.getOwnerByTopic(topic, 'producer');
logger.info(` owner>>> ${JSON.stringify(owner)}`);
const meterReadingObj = {
owner,
datetime: new Date(),
device: meterReading.device,
timestamp: meterReading.timestamp,
readings: meterReading.readings,
};
const meterReading = new PMUMeterReading({
pmuMeterReadingLib.save(meterReadingObj);
// String Timestamp for DSOTP
const newMeterReading = new PMUMeterReadingModel({
device: jsonData.device,
timestamp: jsonData.timestamp,
timestamp: jsonData.timestamp.toString(),
readings: jsonData.readings,
});
const owner = await producerLib.getOwnerByTopic(topic);
cassandraClient.connect().then(() => {
MeterReadingMapper.insert({
id: Uuid.random(),
owner,
device: meterReading.device,
timestamp: meterReading.timestamp,
readings: meterReading.readings,
});
}).catch((err2) => {
logger.error('There was an error when connecting', err2);
return cassandraClient.shutdown().then(() => { throw err2; });
});
this.mqttClient.publish(config.mqttTopicDsotp, JSON.stringify(newMeterReading));
});
this.mqttClient.on('close', () => {
......
const schedule = require('node-schedule');
const logger = require('../logger');
const scdService = require('../services/scd');
const config = require('../config/config');
// const testPublisher = require('../test/mqtt-publisher');
exports.startScheduling = () => {
const j1 = new schedule.Job(() => {
logger.info('Read from SCD for data certification Ddemo');
scdService.certification(config.keyspaceDdemo);
});
const j2 = new schedule.Job(() => {
logger.info('Read from SCD for data certification Gdemo');
scdService.certification(config.keyspaceGdemo);
});
const j3 = new schedule.Job(() => {
logger.info('Read from SCD for data publishing');
scdService.publishing(config.keyspaceDdemo);
});
const j4 = new schedule.Job(() => {
logger.info('Publish Fake data on MQTT');
// testPublisher();
});
const rule = new schedule.RecurrenceRule();
rule.minute = new schedule.Range(0, 59, 1);
const rule2 = new schedule.RecurrenceRule();
rule2.minute = new schedule.Range(0, 59, 1);
j1.schedule(rule);
// j2.schedule(rule);
// j3.schedule(rule2);
// j4.schedule(rule2);
};
......@@ -9,8 +9,8 @@ const Certification = require('../../models/certification');
* @returns {Promise} - Returns a JSON with the specific certifications
*/
module.exports = (id) => new Promise((resolve, reject) => {
Certification.find({ _id: id }, (err, certifications) => {
module.exports = (query) => new Promise((resolve, reject) => {
Certification.find(query, (err, certifications) => {
if (err) {
reject(err);
} else {
......
const MeterReading = require('../../models/MeterReading');
/**
* Get a specific PmuMeterReading by id
* @memberOf module:PmuMeterReading
* @function get
* @param {String} query -
*
* @returns {Promise} - Returns a JSON with the specific PmuMeterReading
*/
module.exports = (query) => new Promise((resolve, reject) => {
MeterReading.find(query, (err, meterReadings) => {
if (err) {
reject(err);
} else {
resolve(meterReadings);
}
});
});
const MeterReading = require('../../models/MeterReading');
/**
* Get all MeterReading
* @memberOf module:MeterReading
* @function getAll
*
* @returns {Promise} - Returns a JSON with all MeterReading
*/
module.exports = () => new Promise((resolve, reject) => {
MeterReading.find({}, (err, meterReadings) => {
if (err) {
reject(err);
} else {
resolve(meterReadings);
}
});
});
const get = require('./get');
const save = require('./save');
const getAll = require('./getAll');
const self = {
get,
save,
getAll,
};
module.exports = self;
const MeterReading = require('../../models/MeterReading');
/**
* Save an MeterReading object.
* @memberOf module:MeterReading
* @function save
* @param {Object} params - MeterReading params
*
* @returns {Promise} - Returns a JSON with the saved MeterReading
*/
module.exports = (params) => {
const meterReadingData = new MeterReading(params);
return new Promise((resolve, reject) => {
MeterReading.findOneAndUpdate({ _id: meterReadingData._id },
meterReadingData,
{ upsert: true, new: false },
(err, meterReading) => {
if (err) {
reject(err);
} else {
resolve(meterReading);
}
});
});
};