Commit 061ecc98 authored by Simon Sebastian Humpohl's avatar Simon Sebastian Humpohl
Browse files

Add a csv logger

Add a hardware subpackage with leak tester reading
parent 3ac79667
......@@ -46,8 +46,11 @@ This module contains a everything from `itertools`, `more_itertools` and custom
Here you find decorators, functions and classes that help you implement caching like `file_cache` and `lru_cache`. This is helpful if you need to call computationally expensive functions with the same arguments repeatedly.
## qutil.io
User input related functions like `query_yes_no`.
User input related functions like `query_yes_no` or a `CsvLogger` interface (for reading use pandas.read_csv).
## qutil.parallel
Functions and classes related to parallel execution i.e. multi-threading, multi-processing and asyncio.
There is a class for periodic callbacks from another thread `ThreadedPeriodicCallback`.
## qutil.hardware
This package contains little scripts to talk to various hardware devices. For example reading the leak tester via serial interface.
"""This module contains the SmartTest class which you can use to communicate with the leak tester."""
import serial
import logging
import argparse
import time
from datetime import datetime
from qutil.io import CsvLogger
__all__ = ["SmartTest"]
logger = logging.getLogger('smart_test')
class CheckSumError(ValueError):
pass
def calc_checksum(msg):
return b'%03i' % (sum(msg)%256)
def to_valid_message(msg: bytes):
return msg + calc_checksum(msg) + b'\r'
def parse_data(data: bytes, fmt: str):
if fmt in ('boolean_old', 0):
assert data in (b'0'*6, b'1'*6)
return bool(int(data))
if fmt in ('u_integer', 1):
assert len(data) == 6
return int(data)
if fmt in ('u_real', 2):
assert len(data) == 6
return int(data) / 100
if fmt in ('string', 4):
assert len(data) == 6
return data.decode('ascii')
if fmt in ('boolean_new', 6):
assert len(data) == 1
return bool(int(data))
if fmt in ('u_short_int', 7):
assert len(data) == 3
return int(data)
if fmt in ('u_expo_new', 10):
assert len(data) == 6
mantissa = data[:4].decode()
exponent = str(int(data[4:]) - 20 - 3)
# use parsing logic for most exact result
return float('%sE%s' % (mantissa, exponent))
if fmt in ('string16', 11):
assert len(data) == 16
return data.decode()
raise KeyError('unknown format argument for parsing', data, fmt)
def encode_data(value, fmt: str) -> bytes:
if fmt in ('boolean_old', 0):
assert value in (True, False, 0, 1)
return 6*b'1' if value else 6*b'0'
if fmt in ('u_integer', 1):
assert value < 1000000 and value >= 0
return b'%06u' % value
if fmt in ('u_real', 2):
assert value >= 0 and value < 10000
return (b'%07.2f' % value).replace(b'.', b'')
if fmt in ('string', 4):
assert len(value) == 4
return value.encode('ascii')
if fmt in ('boolean_new', 6):
assert value in (True, False, 0, 1)
return b'1' if value else b'0'
if fmt in ('u_short_int', 7):
assert value >= 0 and value < 1000
return b'%03u' % value
if fmt in ('u_expo_new', 10):
assert value >= 0
str_repr = (b'%.3E' % value)
mantissa, exponent = str_repr.replace(b'.', b'').split(b'E')
exponent = int(exponent) + 20
assert exponent >= 0 and exponent < 100
return mantissa + b'%02u' % exponent
if fmt in ('string16', 11):
assert len(value) == 16
return value.encode('ascii')
raise KeyError('unknown format argument for parsing', value, fmt)
def parse_msg(msg: bytes, fmt: str):
addr = msg[:3]
logger.debug('parsing message from %r', addr)
action = msg[3:5]
param_num = int(msg[5:8])
data_len = int(msg[8:10])
data = msg[10:10+data_len]
expected_check_sum = calc_checksum(msg[:10+data_len])
check_sum = msg[10+data_len:10+data_len+3]
if expected_check_sum != check_sum:
raise CheckSumError(msg)
return action, param_num, parse_data(data, fmt)
class SmartTest:
"""Talk to PfeifferVacuum leak tester via serial interface (USB serial adapter).
Currently only get_leakrate_mbarls is implemented as a method but you can easily extend this to other get methods using
the parameter number and the value format from the manual. For setting something you need to implement set method with the action b'10'
"""
def __init__(self, device: str, address: int):
assert address < 1000
self.device = device
self.address = b'%03i' % address
def query(self, param_number: int, fmt):
param_b = b'%03i' % param_number
msg = self.address + b'00' + param_b + b'02' + b'=?'
query = to_valid_message(msg)
with serial.Serial(self.device) as ser:
ser.write(query)
answer = ser.read_until(b'\r')
action, param_num, value = parse_msg(answer, fmt)
logger.info('received: %r', (action, param_num, value))
assert action == b'10'
assert param_num == param_number, "%u != %u" % (param_num, param_number)
return value
def get_leakrate_mbarls(self) -> float:
return self.query(670, 'u_expo_new')
def periodically_log_leak_rate(filename, period, device, address):
leak_detector = SmartTest(device, address)
csv_logger = CsvLogger(filename, ['Time', 'Leakrate(mbar x l/s)'])
while True:
leakrate = leak_detector.get_leakrate_mbarls()
current_time = datetime.now()
csv_logger.write('%s' % current_time, '%.4e' % leakrate)
print('Current leakrate %.4e pausing for %f sec.' % (leakrate, period), end='\r')
time.sleep(period)
default_prefix = '%Y_%m_%d_%H_%M_%S_'
parser = argparse.ArgumentParser(description='Periodically log the leak rate to the given file as tab-separated values.',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--filename', default='smart_test_fast.log', help='Filename of log file. By default prefixed by a date/time string.')
parser.add_argument('--period', default=1., type=float, help='Wait time between read-outs')
parser.add_argument('--device', default='/dev/ttyUSB0', help='Device path')
parser.add_argument('--address', default=1, type=int, help='See SmartTester settings')
parser.add_argument('--no-date-prefix', action='store_true', help='Omit the default %s prefix' % default_prefix.replace('%', '%%'))
def main():
args = vars(parser.parse_args())
filename = args.pop('filename')
if not args.pop('no_date_prefix'):
filename = datetime.now().strftime(default_prefix) + filename
periodically_log_leak_rate(filename=filename, **args)
if __name__ == '__main__':
main()
import sys
import csv
import os.path
def query_yes_no(question, default="yes"):
"""Ask a yes/no question via input() and return their answer.
......@@ -30,4 +32,31 @@ def query_yes_no(question, default="yes"):
return valid[choice]
else:
sys.stdout.write("Please respond with 'yes' or 'no' "
"(or 'y' or 'n').\n")
\ No newline at end of file
"(or 'y' or 'n').\n")
class CsvLogger:
"""Logger that only open file for writing. Use pandas.read_csv to read csv files"""
def __init__(self, filename: str, fieldnames: list, dialect='excel-tab', append=False):
self.dialect = csv.get_dialect(dialect)
self.filename = os.path.abspath(filename)
self.fieldnames = fieldnames
if os.path.exists(filename):
if not append:
raise FileExistsError
# validate file has the correct header
with open(filename, 'r') as file:
reader = csv.DictReader(file, dialect=dialect)
if reader.fieldnames != fieldnames:
raise RuntimeError("Existing file has differing fieldnames", reader.fieldnames, fieldnames)
else:
with open(self.filename, 'x') as file:
csv.DictWriter(file, fieldnames=self.fieldnames, dialect=self.dialect).writeheader()
def write(self, *args):
with open(self.filename, 'a+') as file:
csv.DictWriter(file, fieldnames=self.fieldnames, dialect=self.dialect).writerow(dict(zip(self.fieldnames, args)))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment