tools.py 9.46 KB
Newer Older
Jiahang Chen's avatar
Jiahang Chen committed
1
"""This module provides a collection of convenience functions."""
C. Albrecht's avatar
WIP    
C. Albrecht committed
2
3

import json
Jiahang Chen's avatar
Jiahang Chen committed
4
import os
Jiahang Chen's avatar
Jiahang Chen committed
5
import xml.etree.ElementTree as ET
Jiahang Chen's avatar
Jiahang Chen committed
6
7
8
9
10
import sqlite3
import time
import jsonschema
from jsonschema import validate
from ml.app_logger import APP_LOGGER
C. Albrecht's avatar
WIP    
C. Albrecht committed
11
12

# URL of the identity provider.
IDENTITY_PROVIDER_URL = "https://idp.s3i.vswf.dev/"

# Python keywords and builtin type names that must not be used verbatim as
# variable names; check_var_conflict() prefixes them with an underscore.
BUILT_IN = ["from", "str", "dict", "list", "bool", "float", "int", "tuple",
            "def", "return", "for", "if", "class", "try", "except"]
C. Albrecht's avatar
WIP    
C. Albrecht committed
15
16
17
18
19
20
21
22
23
24
25
26
27
28

class BColors:
    """ANSI escape sequences used to colour and style console log output.

    Prefix a message with one of these constants and append ``ENDC`` to
    switch the terminal back to its default rendering.
    """

    # Colours
    HEADER = "\033[95m"
    OKBLUE = "\033[94m"
    OKGREEN = "\033[92m"
    WARNING = "\033[93m"
    FAIL = "\033[91m"

    # Reset sequence: ends any colour/style started by the other constants
    ENDC = "\033[0m"

    # Text styles
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"


Jiahang Chen's avatar
Jiahang Chen committed
29
def make_sub_thing(name, roles, features=None):
    """
    Creates a dictionary representing a thing in its current state
    as a subordinate thing. This representation should be used for
    subordinate things in s3i repository entries.

    :param name: name of the sub thing
    :type name: str
    :param roles: fml40 roles of the sub thing (stored unchanged)
    :type roles: str
    :param features: fml40 features of the sub thing; a new empty list is
        used when omitted
    :type features: list

    :returns: JSON of the sub thing
    :rtype: dict

    """
    # NOTE(review): make_thing_config documents ``roles`` as a list while
    # this function documents it as str - confirm the intended type.

    # Build a fresh list per call: a literal ``[]`` default would be shared
    # by every call, so mutating one result would leak into the next one.
    if features is None:
        features = []

    return {
        "class": "ml40::Thing",
        "name": name,
        "roles": roles,
        "features": features,
    }


Jiahang Chen's avatar
Jiahang Chen committed
56
def make_feature_config(class_name, identifier="", name="", subFeatures=""):
    """
    Creates a dictionary representing a ml/fml40 feature.

    Only ``class`` is always present; ``identifier``, ``name`` and
    ``subFeatures`` are added only when a truthy value is passed.

    :param class_name: class name of the fml40 feature
    :type class_name: str
    :param identifier: local id of the fml40 feature
    :type identifier: str
    :param name: name of the fml40 feature
    :type name: str
    :param subFeatures: sub features
    :type subFeatures: dict

    :returns: JSON of the fml40 feature
    :rtype: dict

    """
    config_json = {"class": class_name}

    optional_entries = (
        ("identifier", identifier),
        ("name", name),
        ("subFeatures", subFeatures),
    )
    for key, value in optional_entries:
        # Empty/falsy values are treated as "not given" and left out.
        if value:
            config_json[key] = value

    return config_json


Jiahang Chen's avatar
Jiahang Chen committed
86
def make_thing_config(thing_id, name, roles, features=None, config_path=""):
    """
    Creates a configuration file (JSON) for a fml40 thing.

    The file is written to ``<config_path>/<name>.json``; the directory
    must already exist.

    :param thing_id: identifier of the thing (also used as policy id)
    :type thing_id: str
    :param name: name of the thing
    :type name: str
    :param roles: fml40 roles of the thing
    :type roles: list
    :param features: fml40 features of the thing; a new empty list is used
        when omitted
    :type features: list
    :param config_path: path where the configuration file should be located
    :type config_path: str

    :returns: name of the configuration file of the thing
    :rtype: str

    """
    # Build a fresh list per call instead of sharing one mutable default.
    if features is None:
        features = []

    if not config_path:
        # NOTE(review): "__file__" is a string literal here, so the default
        # is the relative directory "__file__/configs", not a location
        # derived from this module's path - confirm the intended default.
        config_path = os.path.join("__file__", "configs")

    config_file = {
        "thingId": thing_id,
        "policyId": thing_id,
        "attributes": {
            "class": "ml40::Thing",
            "name": name,
            "roles": roles,
            "features": features,
        },
    }

    file_name = "{}.json".format(name)
    with open(os.path.join(config_path, file_name), "wb") as file:
        file.write(json.dumps(config_file).encode("utf-8"))
    return file_name
Jiahang Chen's avatar
Jiahang Chen committed
123
124


Jiahang Chen's avatar
Jiahang Chen committed
125
def load_config(config_filepath):
    """
    Loads a JSON object from a json formatted file found at config_filepath.

    :param config_filepath: Path to json formatted file.
    :type config_filepath: str

    :returns: the decoded JSON content of the file
    :rtype: dict

    """
    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's default text encoding.
    with open(config_filepath, encoding="utf-8") as config_file:
        return json.load(config_file)


Jiahang Chen's avatar
Jiahang Chen committed
138
def find_broker_endpoint(dir_obj, thing_id):
    """
    Finds the S3I-B endpoint of a thing.

    :param dir_obj: S3I Directory Object; must provide
        ``queryThingIDBased(thing_id)``
    :type dir_obj: object
    :param thing_id: identifier of the searched thing
    :type thing_id: str

    :returns: the first entry of the thing's ``allEndpoints`` attribute that
        contains ``"s3ib"``, or None if there is no such entry
    :rtype: str or None

    """
    thing_json = dir_obj.queryThingIDBased(thing_id)
    # A missing or empty "allEndpoints" attribute means nothing to search.
    all_endpoints = thing_json["attributes"].get("allEndpoints") or []
    return next((ep for ep in all_endpoints if "s3ib" in ep), None)
C. Albrecht's avatar
WIP    
C. Albrecht committed
153
154


Jiahang Chen's avatar
Jiahang Chen committed
155
def remove_namespace(input_str):
    """
    Removes the namespace prefixes ``mml40::``, ``fml40::`` and ``ml40::``.

    Every occurrence in the string is removed, not only a leading one.

    :param input_str: input with namespace
    :type input_str: str
    :returns: output without namespace
    :rtype: str

    """
    # Order matters: "ml40::" is the tail of the two longer prefixes, so it
    # has to be removed last or it would leave a stray "m"/"f" behind.
    for prefix in ("mml40::", "fml40::", "ml40::"):
        input_str = input_str.replace(prefix, "")
    return input_str
166
167
168
169
170
171
172
173
174
175
176
177


def check_var_conflict(var):
    """
    Checks whether the input collides with a name listed in BUILT_IN.

    :param var: python variable name
    :type var: str
    :returns: the name prefixed with an underscore if it is listed in
        BUILT_IN, otherwise the unchanged name
    :rtype: str

    """
    if var in BUILT_IN:
        return "_{}".format(var)
    return var


class XML:
    """
    Thin wrapper around xml.etree.ElementTree for documents whose default
    namespace is ``urn:skogforsk:stanford2010``.
    """

    def __init__(self, path=None, xml_str=None):
        """
        Parses an XML document from a file and/or from a string.

        :param path: file to parse; sets both ``tree`` and ``root``
        :type path: str
        :param xml_str: XML text to parse; sets ``root`` (and overrides the
            root obtained from ``path`` when both are given)
        :type xml_str: str

        """
        self.et = ET
        self._namespace = {
            'xsi': 'http://www.w3.org/2001/XMLSchema-instance',
            'schemaLocation': 'urn:skogforsk:stanford2010 HarvestedProduction_V3p0.xsd',
            '': 'urn:skogforsk:stanford2010'
        }

        # register_namespace() changes ElementTree's module-wide prefix
        # table, i.e. it affects every later serialisation in this process.
        for prefix, uri in self._namespace.items():
            self.et.register_namespace(prefix, uri)

        # Always define both attributes so that an instance created without
        # any input fails with a clear "None" instead of a missing attribute.
        self.tree = None
        self.root = None

        if path is not None:
            self.tree = self.et.parse(path)
            self.root = self.tree.getroot()

        if xml_str is not None:
            self.root = self.et.fromstring(xml_str)

    def find_nodes(self, path):
        """
        Finds all elements below the root that match a '/'-separated path.

        Every path segment without a ':' is qualified with the default
        namespace. The navigation steps ``.`` and ``..`` and the empty
        segment produced by ``//`` are passed through unchanged.

        :param path: element path relative to the root, e.g. ``"a/b"``
        :type path: str
        :returns: matching elements in document order
        :rtype: list

        """
        default_ns = "{" + self._namespace.get('') + "}"
        segments = path.split('/')
        last_index = len(segments) - 1

        qualified = []
        for index, segment in enumerate(segments):
            is_step = segment in (".", "..")
            # An empty segment strictly inside the path comes from "//".
            is_descendant_gap = segment == "" and 0 < index < last_index
            if not (is_step or is_descendant_gap) and ":" not in segment:
                segment = default_ns + segment
            qualified.append(segment)

        return self.root.findall("/".join(qualified), self._namespace)

    def to_string(self, root):
        """
        Serialises an element (including its subtree) to an XML string.

        :param root: element to serialise
        :type root: xml.etree.ElementTree.Element
        :returns: XML text
        :rtype: str

        """
        return self.et.tostring(root, "unicode", "xml")


class DataBase:
    """
    Small SQLite helper that stores timestamped records.

    The columns are described by ``conf``, a list of
    ``{"var_name": <column name>, "var_type": <SQLite type>}`` entries.
    Tables created by build_table() consist of ``ID``, ``TIMESTAMP`` and one
    column per configured variable.

    Table and column names are inserted into the SQL text as they are
    (identifiers cannot be bound as parameters), so they must come from a
    trusted configuration. Row values are always bound as parameters.
    """

    # Errors raised by execute() for bad SQL or for values that cannot be
    # bound; these are logged as warnings instead of being propagated.
    _RECOVERABLE_ERRORS = (
        sqlite3.OperationalError,
        sqlite3.InterfaceError,
        sqlite3.ProgrammingError,
    )

    def __init__(self, db, conf):
        """
        :param db: database passed to ``sqlite3.connect``
        :type db: str
        :param conf: column description, see class docstring
        :type conf: list
        """
        self.__db = db
        self.__conf = conf

        self.__conn = None

    def connect(self):
        """Opens the connection; it may be used from several threads."""
        self.__conn = sqlite3.connect(self.__db, check_same_thread=False)
        APP_LOGGER.info("Connect to the database {}".format(self.__db))

    def build_table(self, table):
        """
        Creates ``table`` (if it does not exist yet) from the configuration.

        :param table: name of the table
        :type table: str
        """
        conf_schema = {
            "type": "array",
            "items": {
                "type": "object",
                "required": ["var_name", "var_type"],
                "properties": {
                    "var_name": {"type": "string"},
                    "var_type": {"type": "string"}
                }
            }
        }
        try:
            validate(instance=self.__conf, schema=conf_schema)
        except jsonschema.ValidationError as e:
            # NOTE(review): an invalid configuration is only logged; table
            # creation is still attempted afterwards - confirm this is the
            # intended best-effort behaviour.
            APP_LOGGER.error("config json error: {}".format(e))

        columns = [
            "ID INTEGER PRIMARY KEY AUTOINCREMENT",
            "TIMESTAMP REAL NOT NULL",
        ]
        columns.extend(
            "{} {}".format(var["var_name"], var["var_type"])
            for var in self.__conf
        )

        exec_str = '''CREATE TABLE if not exists {0} ({1});'''.format(
            table, ", ".join(columns))
        cursor = self.__conn.cursor()
        try:
            cursor.execute(exec_str)
            APP_LOGGER.info("Create table {} if not exists".format(table))
        except sqlite3.OperationalError as e:
            APP_LOGGER.warning(e)

    def disconnect(self):
        """Closes the connection; does nothing if connect() was never called."""
        if self.__conn is not None:
            self.__conn.close()

    def update(self, table, value_dict):
        """
        Inserts one row with the current time as ``TIMESTAMP``.

        :param table: name of the table
        :type table: str
        :param value_dict: value for every configured variable, keyed by
            variable name
        :type value_dict: dict
        :raises ValueError: if a key is not a configured variable name
        :raises KeyError: if a configured variable has no value
        """
        var_names = [var["var_name"] for var in self.__conf]
        for key in value_dict:
            if key not in var_names:
                raise ValueError("{} is not defined variable name".format(key))

        timestamp = time.time()
        cursor = self.__conn.cursor()

        columns = ["TIMESTAMP"] + var_names
        # Bind every value as a parameter instead of formatting it into the
        # statement: this is safe against SQL injection, works for strings
        # containing quotes and stores None as NULL.
        params = [timestamp] + [value_dict[name] for name in var_names]
        exec_str = "INSERT INTO {} ({}) VALUES ({})".format(
            table, ",".join(columns), ", ".join("?" for _ in columns))

        try:
            cursor.execute(exec_str, params)
            APP_LOGGER.info("Insert new value to table {}".format(table))
        except self._RECOVERABLE_ERRORS as e:
            APP_LOGGER.warning(e)
        self.__conn.commit()

    def delete(self, table, id=None):
        """
        Deletes one row, or all rows when no id is given.

        :param table: name of the table
        :type table: str
        :param id: value of the ``id`` column of the row to delete;
            None deletes every row of the table
        """
        if id is None:
            exec_str = "DELETE FROM {};".format(table)
            params = ()
            APP_LOGGER.info("delete all elements from table {}".format(table))
        else:
            # The id is bound as a parameter, never formatted into the SQL.
            exec_str = "DELETE FROM {} where id=?".format(table)
            params = (id,)
            APP_LOGGER.info("delete element id {} from table {}".format(id, table))

        cursor = self.__conn.cursor()
        try:
            cursor.execute(exec_str, params)
        except self._RECOVERABLE_ERRORS as e:
            APP_LOGGER.warning(e)

        self.__conn.commit()

    def search(self, table):
        """
        Prints every row of ``table`` to stdout.

        :param table: name of the table
        :type table: str
        """
        exec_str = "SELECT * FROM {};".format(table)
        cursor = self.__conn.cursor().execute(exec_str)
        for row in cursor:
            print("------------------")
            print("ID = {}".format(row[0]))
            print("TIMESTAMP = {}".format(row[1]))
            # Configured variables start after the ID and TIMESTAMP columns.
            for column, var in enumerate(self.__conf, start=2):
                print("{} = {}".format(var["var_name"].upper(), row[column]))
            print("------------------\n\n")

    def execute(self, exec_str):
        """
        Executes a raw SQL statement.

        The statement is executed as given; never build it from untrusted
        input.

        :param exec_str: SQL statement
        :type exec_str: str
        :returns: the cursor of the executed statement, or None if SQLite
            reported an operational error (which is logged)
        """
        try:
            return self.__conn.cursor().execute(exec_str)
        except sqlite3.OperationalError as e:
            APP_LOGGER.warning(e)