Commit 1c40f0ec authored by Simon Sebastian Humpohl's avatar Simon Sebastian Humpohl
Browse files

Update pytabor to current tabor state

parent 5924058f
'''Tabor-Electronics Utilities (for `pyvisa` 1.6 or above).'''
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# =========================================================================
# Copyright (C) 2016 Tabor-Electronics Ltd <http://www.taborelec.com/>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# =========================================================================
'''
pyte16 -- control Tabor-Electronics instruments using `pyvisa` 1.6 or above.
@author: Nadav
@date: 2016-11-23
@license: GPL
@copyright: 2016 Tabor-Electronics Ltd.
@contact: <http://www.taborelec.com/>
'''
from __future__ import print_function
from builtins import input
import sys
import socket
import ctypes
......@@ -10,11 +38,28 @@ import numpy as np
from pyvisa import ResourceManager
import pyvisa.constants as vc
__all__ = ['open_session', 'prompt_msg','make_bin_dat_header', 'get_visa_err_desc', 'write_raw_string',
'write_raw_bin_dat', 'send_cmd', 'build_sine_wave', 'build_triangle_wave', 'build_square_wave',
'download_binary_data', 'download_binary_file', 'download_arbcon_wav_file', 'download_segment_lengths',
'download_sequencer_table', 'download_adv_seq_table', 'download_fast_pattern_table',
'download_linear_pattern_table', 'make_combined_wave', 'PyteException']
__version__ = '1.0.1'
__revision__ = '$Rev: 4238 $'
__docformat__ = 'reStructuredText'
__all__ = [
'open_session',
'send_cmd',
'download_binary_data',
'download_binary_file',
'download_binary_file',
'download_arbcon_wav_file',
'download_segment_lengths',
'download_sequencer_table',
'download_adv_seq_table',
'download_fast_pattern_table',
'download_linear_pattern_table',
'build_sine_wave',
'build_triangle_wave',
'build_square_wave',
'add_markers',
'make_combined_wave',
'PyteException']
class PyteException(Exception):
......@@ -209,7 +254,7 @@ def _select_visa_rsc_name(rsc_manager=None, title=None, interface_name=None):
if len(rsc_descs) != num_rscs:
rsc_descs = ["" for n in range(num_rscs)]
# get resources descriptions:
for n, name in zip(list(range(num_rscs)), rsc_names):
for n, name in zip(range(num_rscs), rsc_names):
vi = None
try:
vi =rsc_manager.open_resource(name)
......@@ -227,7 +272,7 @@ def _select_visa_rsc_name(rsc_manager=None, title=None, interface_name=None):
pass
print("Please choose one of the available devices:")
for n, name, desc in zip(list(range(num_rscs)), rsc_names, rsc_descs):
for n, name, desc in zip(range(num_rscs), rsc_names, rsc_descs):
print(" {0:d}. {1} ({2})".format(n+1, desc, name))
print(" {0:d}. Back to main menu".format(num_rscs+1))
msg = "Please enter your choice [{0:d}:{1:d}]: ".format(1, num_rscs+1)
......@@ -244,10 +289,10 @@ def _select_visa_rsc_name(rsc_manager=None, title=None, interface_name=None):
else:
selected_rsc_name = rsc_names[choice - 1]
break
return selected_rsc_name
def _init_vi_inst(vi, timeout_msec=10000, read_buff_size_bytes=4096, write_buff_size_bytes=4096):
return selected_rsc_name
def _init_vi_inst(vi, timeout_msec=30000, read_buff_size_bytes=4096, write_buff_size_bytes=4096):
'''Initialize the given Instrument VISA Session
:param vi: `pyvisa` instrument.
......@@ -265,12 +310,16 @@ def _init_vi_inst(vi, timeout_msec=10000, read_buff_size_bytes=4096, write_buff_
vi.read_termination = '\n'
vi.write_termination = '\n'
intf_type = vi.get_visa_attribute(vc.VI_ATTR_INTF_TYPE)
if intf_type in (vc.VI_INTF_USB, vc.VI_INTF_GPIB, vc.VI_INTF_TCPIP):
if intf_type in (vc.VI_INTF_USB, vc.VI_INTF_GPIB, vc.VI_INTF_TCPIP, vc.VI_INTF_ASRL):
vi.set_visa_attribute(vc.VI_ATTR_WR_BUF_OPER_MODE, vc.VI_FLUSH_ON_ACCESS)
vi.set_visa_attribute(vc.VI_ATTR_RD_BUF_OPER_MODE, vc.VI_FLUSH_ON_ACCESS)
if intf_type == vc.VI_INTF_TCPIP:
vi.set_visa_attribute(vc.VI_ATTR_TERMCHAR_EN, vc.VI_TRUE) # vc.VI_FALSE
vi.clear()
vi.set_visa_attribute(vc.VI_ATTR_TERMCHAR_EN, vc.VI_TRUE) # vc.VI_FALSE
elif intf_type == vc.VI_INTF_ASRL:
vi.set_visa_attribute(vc.VI_ATTR_ASRL_BAUD, 115200)
vi.set_visa_attribute(vc.VI_ATTR_ASRL_END_OUT, 0)
vi.set_visa_attribute(vc.VI_ATTR_ASRL_END_IN, 2)
vi.clear()
def open_session(resource_name = None, title_msg = None, vi_rsc_mgr = None, extra_init=True):
'''Open VISA Session (optionally prompt for resource name).
......@@ -446,11 +495,19 @@ def send_cmd(vi, cmd_str, paranoia_level=1):
else:
ask_str = ':SYST:ERR?'
syst_err = vi.ask(ask_str)
if not syst_err.startswith('0'):
try:
errnb = int(syst_err.split(',')[0])
except:
errnb = -1
if errnb != 0:
syst_err = syst_err.rstrip()
wrn_msg = 'ERR: "{0}" after CMD: "{1}"'.format(cmd_str, syst_err)
warnings.warn(wrn_msg)
wrn_msg = 'ERR: "{0}" after CMD: "{1}"'.format(syst_err, cmd_str)
_ = vi.ask('*CLS; *OPC?') # clear the error-list
if paranoia_level >= 3:
raise NameError(wrn_msg)
else:
warnings.warn(wrn_msg)
else:
vi.write(cmd_str)
......@@ -462,10 +519,10 @@ def _pre_download_binary_data(vi, bin_dat_size=None):
:returns: the max write-chunk size (in bytes) and the original time-out (in msec)
'''
orig_timeout = vi.timeout
max_chunk_size = 4096
try:
max_chunk_size = vi.write_buff_size if hasattr(vi, 'write_buff_size') else max_chunk_size
max_chunk_size = vi.write_buff_size if hasattr(vi, 'write_buff_size') else 4096
try:
intf_type = vi.get_visa_attribute(vc.VI_ATTR_INTF_TYPE)
if intf_type == vc.VI_INTF_GPIB:
_ = vi.write("*OPC?")
......@@ -562,15 +619,21 @@ def download_binary_data(vi, pref, bin_dat, dat_size, paranoia_level=1):
if not syst_err.startswith('0'):
syst_err = syst_err.rstrip()
wrn_msg = 'ERR: "{0}" after sending binary data (pref="{1}", dat_size={2})'.format(syst_err, pref, dat_size)
warnings.warn(wrn_msg)
_ = vi.ask('*CLS; *OPC?') # clear the error-list
if paranoia_level >= 3:
raise NameError(wrn_msg)
else:
warnings.warn(wrn_msg)
except:
if ret_count >= 0:
ret_count = -1
wrn_msg = 'Error in download_binary_data(pref="{0}", dat_size={1}): \n{2}'.format(pref, dat_size, sys.exc_info())
warnings.warn(wrn_msg)
wrn_msg = 'Error in download_binary_data(pref="{0}", dat_size={1}): \n{2}'.format(pref, dat_size, sys.exc_info())
if paranoia_level >= 3:
raise NameError(wrn_msg)
else:
warnings.warn(wrn_msg)
return ret_count
def download_binary_file(vi, pref, file_path, offset=0, data_size=None, paranoia_level=1):
......@@ -817,15 +880,18 @@ def download_sequencer_table(vi, seq_table, pref=':SEQ:DATA', paranoia_level=1):
>>> pytabor.download_sequencer_table(vi, sequencer_table)
'''
tbl_len = len(seq_table)
try:
tbl_len = len(seq_table)
except:
seq_table = list(seq_table)
tbl_len = len(seq_table)
s = struct.Struct('< L H B x')
s_size = s.size
m = np.empty(s_size * tbl_len, dtype='uint8')
for n in range(tbl_len):
repeats, seg_nb, jump_flag = seq_table[n]
s.pack_into(m, n * s_size, int(repeats), int(seg_nb), int(jump_flag))
s.pack_into(m, n * s_size, np.uint32(repeats), np.uint16(seg_nb), np.uint8(jump_flag))
return download_binary_data(vi, pref, m, m.nbytes, paranoia_level=paranoia_level)
def download_adv_seq_table(vi, adv_seq_table, pref=':ASEQ:DATA', paranoia_level=1):
......@@ -850,15 +916,18 @@ def download_adv_seq_table(vi, adv_seq_table, pref=':ASEQ:DATA', paranoia_level=
>>> # Download it to instrument
>>> pytabor.download_adv_seq_table(vi, adv_sequencer_table)
'''
tbl_len = len(adv_seq_table)
try:
tbl_len = len(adv_seq_table)
except:
adv_seq_table = list(adv_seq_table)
tbl_len = len(adv_seq_table)
s = struct.Struct('< L H B x')
s_size = s.size
m = np.empty(s_size * tbl_len, dtype='uint8')
for n in range(tbl_len):
repeats, seq_nb, jump_flag = adv_seq_table[n]
s.pack_into(m, n * s_size, int(repeats), int(seq_nb), int(jump_flag))
s.pack_into(m, n * s_size, np.uint32(repeats), np.uint16(seq_nb), np.uint8(jump_flag))
return download_binary_data(vi, pref, m, m.nbytes, paranoia_level=paranoia_level)
def download_fast_pattern_table(vi, patt_table, pref=':PATT:COMP:FAST:DATA', paranoia_level=1):
......@@ -901,8 +970,11 @@ def download_fast_pattern_table(vi, patt_table, pref=':PATT:COMP:FAST:DATA', par
>>>
>>> vi.close()
'''
tbl_len = len(patt_table)
try:
tbl_len = len(patt_table)
except:
patt_table = list(patt_table)
tbl_len = len(patt_table)
s = struct.Struct('< f d')
s_size = s.size
m = np.empty(s_size * tbl_len, dtype='uint8')
......@@ -960,8 +1032,11 @@ def download_linear_pattern_table(vi, patt_table, start_level, pref=':PATT:COMP:
>>>
>>> vi.close()
'''
tbl_len = len(patt_table)
try:
tbl_len = len(patt_table)
except:
patt_table = list(patt_table)
tbl_len = len(patt_table)
s = struct.Struct('< f d')
s_size = s.size
m = np.empty(s_size * tbl_len, dtype='uint8')
......@@ -1097,6 +1172,56 @@ def build_square_wave(cycle_len, num_cycles=1, duty_cycle=50.0, phase_degree=0,
return wav
def add_markers(dat_buff, marker_pos, marker_width, marker_bit1, marker_bit2, dat_offs=0, dat_len=None):
"""Add markers bits to the wave-data in the given buffer.
Note that in case of 4-channels devices, the markers bits
are both added to the 1st channel of each channels-pair.
IMPORTANT: This function currently fits only 4-channels devices (WX2184 / WX1284).
:param dat_buff: `numpy` array containing the wave-data (data-type='uint16')
:param marker_pos: the marker start-position within the wave-data (in wave-points)
:param marker_width: the marker width (in wave-points).
:param marker_bit1: the value of 1st marker's bit (zero or one)
:param marker_bit2: the value of 2nd marker's bit (zero or one)
:param dat_offs: the offset of the wave-data within the data-buffer (default: 0).
:param dat_len: the length of the actual wave-data (default: the length of `dat_buff`).
"""
shift_pts = 12
if dat_len is None:
dat_len = len(dat_buff) - dat_offs
if len(dat_buff) > 0 and dat_len > 0 and marker_width > 0:
marker_bits = 0
if marker_bit1:
marker_bits |= 0x4000
if marker_bit2:
marker_bits |= 0x8000
assert(marker_pos % 2 == 0)
assert(marker_width % 2 == 0)
assert(dat_len % 16 == 0 and dat_len >= 16)
seg_pos = (marker_pos + shift_pts) % dat_len
seg_pos = (seg_pos//16)*16 + 8 + (seg_pos%16)//2
while marker_width > 0:
if seg_pos >= dat_len:
seg_pos = 8
buf_index = (dat_offs + seg_pos) % len(dat_buff)
dat_buff[buf_index] &= 0x3fff
dat_buff[buf_index] |= marker_bits
marker_width -= 2
seg_pos += 1
if seg_pos % 16 == 0:
seg_pos += 8
def make_combined_wave(wav1, wav2, dest_array, dest_array_offset=0, add_idle_pts=False, quantum=16):
'''Make 2-channels combined wave from the 2 given waves
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment