Skip to content
Snippets Groups Projects
Commit 342d7451 authored by Simon Sebastian Humpohl's avatar Simon Sebastian Humpohl
Browse files

Add Tabor provided TEWXAwg class

parent b20ab451
No related branches found
No related tags found
No related merge requests found
...@@ -3,8 +3,8 @@ from distutils.core import setup ...@@ -3,8 +3,8 @@ from distutils.core import setup
setup( setup(
name='pyte', name='pyte',
version='', version='',
packages=['pyte'], packages=['pyte','teawg'],
package_dir={'pyte': 'src'}, package_dir={'pyte': 'pyte', 'teawg': 'teawg'},
url='', url='',
license='', license='',
author='Tabor electronics', author='Tabor electronics',
......
'''Tabor-Electronics AWG-Instrument Controller
'''
# import sys
import socket
import struct
import copy
import numpy as np
import warnings
import pyte
# WX2184 Properties
_wx2184_properties = {
'model_name': 'WX2184', # the model name
'num_parts': 2, # number of instrument parts
'chan_per_part': 2, # number of channels per part
'seg_quantum': 16, # segment-length quantum
'min_seg_len': 192, # minimal segment length
'max_arb_mem': 32E6, # maximal arbitrary-memory (points per channel)
'min_dac_val': 0, # minimal DAC value
'max_dac_val': 2 ** 14 - 1, # maximal DAC value
'max_num_segs': 32E+3, # maximal number of segments
'max_seq_len': 48 * 1024 - 2, # maximal sequencer-table length (# rows)
'min_seq_len': 3, # minimal sequencer-table length (# rows)
'max_num_seq': 1000, # maximal number of sequencer-table
'max_aseq_len': 48 * 1024 - 2, # maximal advanced-sequencer table length
'min_aseq_len': 2, # minimal advanced-sequencer table length
'min_sclk': 75e6, # minimal sampling-rate (samples/seconds)
'max_sclk': 2300e6, # maximal sampling-rate (samples/seconds)
'digital_support': False, # is digital-wave supported?
}
# WX1284 Definitions
_wx1284_properties = {
'model_name': 'WX1284', # the model name
'num_parts': 2, # number of instrument parts
'chan_per_part': 2, # number of channels per part
'seg_quantum': 16, # segment-length quantum
'min_seg_len': 192, # minimal segment length
'max_arb_mem': 32E6, # maximal arbitrary-memory (points per channel)
'min_dac_val': 0, # minimal DAC value
'max_dac_val': 2 ** 14 - 1, # maximal DAC value
'max_num_segs': 32E+3, # maximal number of segments
'max_seq_len': 48 * 1024 - 2, # maximal sequencer-table length (# rows)
'min_seq_len': 3, # minimal sequencer-table length (# rows)
'max_num_seq': 1000, # maximal number of sequencer-table
'max_aseq_len': 48 * 1024 - 2, # maximal advanced-sequencer table length
'min_aseq_len': 2, # minimal advanced-sequencer table length
'min_sclk': 75e6, # minimal sampling-rate (samples/seconds)
'max_sclk': 1250e6, # maximal sampling-rate (samples/seconds)
'digital_support': False, # is digital-wave supported?
}
# dictionary of supported-models' properties
model_properties_dict = {
'WX2184': _wx2184_properties,
'WX2184C': _wx2184_properties,
'WX1284': _wx2184_properties,
'WX1284C': _wx2184_properties,
}
def get_device_properties(idn_str, opt_str):
'''Get the device-properties dictionary according to its *IDN? and *OPT?
:param idn_str: the instrument's answer to '*IDN?' query.
:param opt_str: the instrument's answer to '*OPT?' query.
:returns: dictionary of the device properties.
'''
dev_props = None
idn_parts = idn_str.split(',')
if len(idn_parts) == 4 and idn_parts[1] in model_properties_dict:
model_name = idn_parts[1]
d = model_properties_dict[model_name]
dev_props = copy.deepcopy(d)
dev_props['model_name'] = model_name
if model_name in ('WX2184', 'WX2184C', 'WX1284', 'WX1284C'):
dev_props['max_arb_mem'] = int(opt_str[2:4]) * 1E6
elif opt_str.startswith('1M', 1):
dev_props['max_arb_mem'] = 1E6
elif opt_str.startswith('2M', 1):
dev_props['max_arb_mem'] = 2E6
elif opt_str.startswith('8M', 1):
dev_props['max_arb_mem'] = 8E6
elif opt_str.startswith('16M', 1):
dev_props['max_arb_mem'] = 16E6
elif opt_str.startswith('32M', 1):
dev_props['max_arb_mem'] = 32E6
elif opt_str.startswith('64M', 1):
dev_props['max_arb_mem'] = 64E6
elif opt_str.startswith('512K', 1):
dev_props['max_arb_mem'] = 512E3
if opt_str.endswith('D'):
dev_props['digital_support'] = True
return dev_props
class TEWXAwg(object):
'''Tabor-Electronics WX-Instrument Controller (Without VISA-NI)'''
def __init__(self, instr_addr=None, paranoia_level=1):
''' Initialize this `TEWWAwg` instance.
The given `instr_addr` can be either
- An empty string or None (meaning no address)
- IP-Address in digits and dots format (e.g. '192.168.0.170')
- NI-VISA Resource Name (e.g. 'TCPIP:192.168.0.170::5025::SOCKET')
If it is not None, then communication is opened.
:param instr_addr: the instrument address
:param paranoia_level: the `paranoia_level` (0,1 or 2)
'''
self._model_name = ''
self._visa_inst = None
self._dev_props = None
self._instr_addr = None
self._paranoia_level = int(paranoia_level)
self._set_instr_address(instr_addr)
if self._instr_addr is not None:
self.open()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def open(self, instr_addr=None):
'''Open/Reopen Connection
The given `instr_addr` can be either
- An empty string or None (meaning no address)
- IP-Address in digits and dots format (e.g. '192.168.0.170')
- NI-VISA Resource Name (e.g. 'TCPIP:192.168.0.170::5025::SOCKET')
Note: If `instr_addr` is None, then the previous value is used.
:param instr_addr: the instrument's address
'''
self.close()
self._dev_props = None
if instr_addr is not None:
self._set_instr_address(instr_addr)
if self._instr_addr is not None:
self._visa_inst = pyte.open_session(self._instr_addr, extra_init=True)
if self._visa_inst is not None:
idn = self._visa_inst.ask('*IDN?')
opt = self._visa_inst.ask('*OPT?')
self._dev_props = get_device_properties(idn, opt)
if self._dev_props is None and self._paranoia_level >= 2:
warn_msg = 'unsupported model: {0}.'.format(idn)
warnings.warn(warn_msg)
self._model_name = self.get_dev_property('model_name', '')
def close(self):
'''Close Connection'''
if self._visa_inst is not None:
try:
self._visa_inst.close()
except:
pass
self._visa_inst = None
@property
def visa_inst(self):
'''Get the VISA-Instrument'''
return self._visa_inst
@property
def instr_address(self):
'''Get the instrument address '''
return self._instr_addr
@property
def dev_properties(self):
'''Get dictionary of the device properties '''
return self._dev_props
def get_dev_property(self, property_name, default_value=None):
'''Get the value of the specified device-property
:param property_name: the property name.
:param default_value: default value (if the specified property is missing)
:returns: the specified property, or the default-value if it's not defined.
'''
if self._dev_props is not None and property_name in self._dev_props:
return self._dev_props[property_name]
return default_value
@property
def model_name(self):
'''Get the model name '''
return self._model_name
@property
def paranoia_level(self):
'''Get the (default) paranoia-level
The paranoia-level indicates whether and which additional query
should be sent after sending SCPI-Command to the instrument:
- 0: do not send any additional query
- 1: send '*OPC?' (recommended).
- 2: send ':SYST:ERR?' and validate the response (for debug).
'''
return self._paranoia_level
@paranoia_level.setter
def paranoia_level(self, value):
'''Set the (default) paranoia-level (0, 1 or 2)'''
self._paranoia_level = int(value)
def send_cmd(self, cmd_str, paranoia_level=None):
'''Send the given command to the instrument
:param cmd_str: the command string (SCPI statement).
:param paranoia_level: the paranoia-level (overrides the default one)
'''
if paranoia_level is None:
paranoia_level = self._paranoia_level
pyte.send_cmd(self._visa_inst, cmd_str, paranoia_level)
def send_query(self, query_str):
'''Send the given query to the instrument and read the response
:param query_str: the query string (SCPI statement).
:returns: the instrument's response.
'''
return self._visa_inst.ask(query_str)
def read_resp(self):
'''Read response from the instrument. '''
return self._visa_inst.read()
def send_binary_data(self, pref, bin_dat, paranoia_level=None):
'''Send the given binary-data to the instrument.
:param pref: binary-data header prefix (e.g. ':TRAC:DATA')
:param bin_dat: the binary-data to send (a `numpy.ndarray`)
:param paranoia_level: the paranoia-level (overrides the default one)
'''
if paranoia_level is None:
paranoia_level = self._paranoia_level
pyte.download_binary_data(self._visa_inst, pref, bin_dat, bin_dat.nbytes, paranoia_level)
def send_binary_file(self, file_path, pref, offset=0, data_size=None, paranoia_level=None):
"""Load binary data from the specified file and send it to instrument.
Notes:
1. The caller needs not add the binary-data header (#<data-length>)
2. The header-prefix, `pref`, can be empty string or `None`
:param file_path: the file path.
:param pref: the binary data header prefix (e.g. ':TRACe:DATA').
:param offset: starting-offset in the file (in bytes).
:param data_size: data-size in bytes (`None` means all)
:param paranoia_level: paranoia-level (overrides the default one)
"""
if paranoia_level is None:
paranoia_level = self._paranoia_level
pyte.download_binary_file(self._visa_inst, pref, file_path, offset, data_size, paranoia_level)
def send_arbcon_wav_file(self, file_path, pref=':TRAC:DATA', bits_per_point=14, paranoia_level=None):
"""Load wave data from binary wav file created by the ArbConnection, normalize and it to instrument.
:param file_path: the file path.
:param pref: the header prefix (e.g. ':TRACe:DATA').
:param bits_per_point: number of bits per wave-point.
:param paranoia_level: paranoia-level (overrides the default one)
"""
if paranoia_level is None:
paranoia_level = self._paranoia_level
pyte.download_arbcon_wav_file(self._visa_inst, file_path, pref, bits_per_point, paranoia_level)
def download_segment_lengths(self, seg_len_list, pref=':SEGM:DATA', paranoia_level=None):
'''Download Segments-Lengths Table to Instrument
:param seg_len_list: the list of segments-lengths.
:param pref: the binary-data-header prefix.
:param paranoia_level: the paranoia-level (overrides the default one)
Example:
The fastest way to download multiple segments to the instrument
is to download the wave-data of all the segments, including the
segment-prefixes (16 idle-points) of all segments except the 1st,
into segment 1 (pseudo segment), and afterward download the
appropriate segment-lengths. Using the SEGM:DATA command this will
allow to slice the pseudo segment into the corresponding list of segments.
>>> with TEWXAwg('192.168.0.170') as inst:
>>> # Select segment 1:
>>> inst.send_cmd(':TRACe:SELect 1')
>>>
>>> # Download the wave-data of all segments:
>>> inst.send_binary_data(':TRACe:DATA', wave_data)
>>>
>>> # Download the appropriate segment-lengths list:
>>> seg_lengths = [ 1024, 1024, 384, 4096, 8192 ]
>>> inst.download_segment_lengths(seg_lengths)
'''
if isinstance(seg_len_list, np.ndarray):
if seg_len_list.ndim != 1:
seg_len_list = seg_len_list.flatten()
if seg_len_list.dtype != np.uint32:
seg_len_list = np.asarray(seg_len_list, dtype=np.uint32)
else:
seg_len_list = np.asarray(seg_len_list, dtype=np.uint32)
if seg_len_list.ndim != 1:
seg_len_list = seg_len_list.flatten()
self.send_binary_data(pref, seg_len_list, paranoia_level=paranoia_level)
def download_sequencer_table(self, seq_table, pref=':SEQ:DATA', paranoia_level=None):
'''Download Sequencer-Table to Instrument
The sequencer-table, `seq_table`, is a list of 3-tuples
of the form: (<repeats>, <segment no.>, <jump-flag>)
:param seq_table: the sequencer-table (list of 3-tuples)
:param pref: the binary-data-header prefix.
:param paranoia_level: the paranoia-level (overrides the default one)
Example:
>>> # Create Sequencer-Table:
>>> repeats = [ 1, 1, 100, 4, 1 ]
>>> seg_nb = [ 2, 3, 5, 1, 4 ]
>>> jump = [ 0, 0, 1, 0, 0 ]
>>> sequencer_table = zip(repeats, seg_nb, jump)
>>>
>>> # Select sequencer no. 1, and write its table:
>>> with TEWXAwg('192.168.0.170') as inst:
>>> inst.send_cmd(':SEQ:SELect 1')
>>> inst.download_sequencer_table(sequencer_table)
'''
tbl_len = len(seq_table)
s = struct.Struct('< L H B x')
s_size = s.size
m = np.empty(s_size * tbl_len, dtype='uint8')
for n in range(tbl_len):
repeats, seg_nb, jump_flag = seq_table[n]
s.pack_into(m, n * s_size, int(repeats), int(seg_nb), int(jump_flag))
self.send_binary_data(pref, m, paranoia_level=paranoia_level)
def download_adv_seq_table(self, seq_table, pref=':ASEQ:DATA', paranoia_level=None):
'''Download Advanced-Sequencer-Table to Instrument
The sequencer-table, `seq_table`, is a list of 3-tuples
of the form: (<repeats>, <sequence no.>, <jump-flag>)
:param seq_table: the sequencer-table (list of 3-tuples)
:param pref: the binary-data-header prefix.
:param paranoia_level: the paranoia-level (overrides the default one)
Example:
>>> # Create advanced-sequencer table:
>>> repeats = [ 1, 1, 100, 4, 1 ]
>>> seq_nb = [ 2, 3, 5, 1, 4 ]
>>> jump = [ 0, 0, 1, 0, 0 ]
>>> adv_sequencer_table = zip(repeats, seq_nb, jump)
>>>
>>> # Download it to instrument
>>> with TEWXAwg('192.168.0.170') as inst:
>>> inst.download_adv_seq_table(adv_sequencer_table)
'''
tbl_len = len(seq_table)
s = struct.Struct('< L H B x')
s_size = s.size
m = np.empty(s_size * tbl_len, dtype='uint8')
for n in range(tbl_len):
repeats, seg_nb, jump_flag = seq_table[n]
s.pack_into(m, n * s_size, int(repeats), int(seg_nb), int(jump_flag))
self.send_binary_data(pref, m, paranoia_level=None)
def download_fast_pattern_table(self, patt_table, pref=':PATT:COMP:FAST:DATA', paranoia_level=None):
'''Download Fast (Piecewise-flat) Pulse-Pattern Table to Instrument
The pattern-table, `patt_table`, is a list of 2-tuples
of the form: (<voltage-level (volt)>, <dwell-time (sec)>)
:param patt_table: the pattern-table (list of 2-tuples)
:param pref: the binary-data-header prefix.
:param paranoia_level: the paranoia-level (overrides the default one)
Note:
In order to avoid Settings-Conflict make sure you can find
a valid sampling-rate, `sclk`, such that the length in points
of each dwell-time, `dwell-time*sclk` is integral number, and
the total length in points is divisible by the segment-quantum
(either 16 or 32 depending on the instrument model).
Optionally set the point-time-resolution manually to `1/sclk`.
Example:
>>> inst = TEWXAwg('192.168.0.170')
>>>
>>> # Create fast-pulse pattern table:
>>> volt_levels = [0.0 , 0.1 , 0.5 , 0.1 , -0.1, -0.5, -0.1, -0.05]
>>> dwell_times = [1e-9, 1e-6, 1e-9, 1e-6, 1e-6, 1e-9, 1e-6, 5e-9 ]
>>> pattern_table = zip(volt_levels, dwell_times)
>>>
>>> # Set Function-Mode=Pattern, Pattern-Mode=Composer, Pattern-Type=Fast:
>>> inst.send_cmd(':FUNC:MODE PATT; :PATT:MODE COMP; :PATT:COMP:TRAN:TYPE FAST')
>>>
>>> # Optionally set User-Defined (rather than Auto) point sampling time:
>>> inst.send_cmd(':PATT:COMP:RES:TYPE USER; :PATT:COMP:RES 0.5e-9')
>>>
>>> # Download the pattern-table to instrument:
>>> inst.download_fast_pattern_table(pattern_table)
>>>
>>> inst.close()
'''
tbl_len = len(patt_table)
s = struct.Struct('< f d')
s_size = s.size
m = np.empty(s_size * tbl_len, dtype='uint8')
for n in range(tbl_len):
volt_level, dwel_time = patt_table[n]
volt_level = float(volt_level)
dwel_time = float(dwel_time)
s.pack_into(m, n * s_size, volt_level, dwel_time)
self.send_binary_data(pref, m, paranoia_level=paranoia_level)
def download_linear_pattern_table(self, patt_table, start_level, pref=':PATT:COMP:LIN:DATA', paranoia_level=None):
'''Download Piecewise-Linear Pulse-Pattern to Instrument
The pattern-table, `patt_table`, is a list of 2-tuples
of the form: (<voltage-level (volt)>, <dwell-time (sec)>).
Here the `vlotage-level` is the section's end-level.
The section's start-lavel is the previous-section's end-level.
The argument `start_level` is the first-section's start-level.
:param patt_table: the pattern-table (list of 2-tuples)
:param start_level: the (first-section's) start voltage level.
:param pref: the binary-data-header prefix.
:param paranoia_level: the paranoia-level (overrides the default one)
Note:
In order to avoid Settings-Conflict make sure you can find
a valid sampling-rate, `sclk`, such that the length in points
of each dwell-time, `dwell-time` * `sclk` is integral number, and
the total length in points is divisible by the segment-quantum
(either 16 or 32 depending on the instrument model).
Optionally set the point-time-resolution manually to `1/sclk`.
Example:
>>> inst = TEWXAwg('192.168.0.170')
>>>
>>> # Create fast-pulse pattern table:
>>> start_level = 0.0
>>> volt_levels = [0.1 , 0.1 , 0.5 , 0.1 , -0.1, -0.1, -0.5, -0.1, 0.0 ]
>>> dwel_times = [1e-9, 1e-6, 1e-9, 1e-6, 4e-9, 1e-6, 1e-9, 1e-6, 1e-9 ]
>>> pattern_table = zip(volt_levels, dwel_times)
>>>
>>> # Set Function-Mode=Pattern, Pattern-Mode=Composer, Pattern-Type=Linear:
>>>inst.send_cmd(':FUNC:MODE PATT; :PATT:MODE COMP; :PATT:COMP:TRAN:TYPE LIN')
>>>
>>> # Optionally set User-Defined (rather than Auto) point sampling time:
>>> inst.send_cmd(':PATT:COMP:RES:TYPE USER; :PATT:COMP:RES 0.5e-9')
>>>
>>> # Download the pattern-table to instrument:
>>> inst.download_linear_pattern_table(pattern_table, start_level)
>>>
>>> inst.close()
'''
tbl_len = len(patt_table)
s = struct.Struct('< f d')
s_size = s.size
m = np.empty(s_size * tbl_len, dtype='uint8')
for n in range(tbl_len):
volt_level, dwel_time = patt_table[n]
volt_level = float(volt_level)
dwel_time = float(dwel_time)
s.pack_into(m, n * s_size, volt_level, dwel_time)
if start_level is not None:
start_level = float(start_level)
self.send_cmd(':PATT:COMP:LIN:STARt {0:f}'.format(start_level), paranoia_level=paranoia_level)
self.download_binary_data(pref, m, paranoia_level=paranoia_level)
def build_sine_wave(self, cycle_len, num_cycles=1, phase_degree=0, low_level=0, high_level=2 ** 14 - 1):
'''Build Sine Wave
:param cycle_len: cycle length (in points).
:param num_cycles: number of cycles.
:param phase_degree: starting-phase (in degrees)
:param low_level: the sine low level.
:param high_level: the sine high level.
:returns: `numpy.array` with the wave data (DAC values)
'''
cycle_len = int(cycle_len)
num_cycles = int(num_cycles)
if cycle_len <= 0 or num_cycles <= 0:
return None
dac_min = self.get_dev_property('min_dac_val', 0)
dac_max = self.get_dev_property('max_dac_val', 2 ** 14 - 1)
wav_len = cycle_len * num_cycles
phase = float(phase_degree) * np.pi / 180.0
x = np.linspace(start=phase, stop=phase + 2 * np.pi, num=cycle_len, endpoint=False)
zero_val = (low_level + high_level) / 2.0
amplitude = (high_level - low_level) / 2.0
y = np.sin(x) * amplitude + zero_val
y = np.round(y)
y = np.clip(y, dac_min, dac_max)
y = y.astype(np.uint16)
wav = np.empty(wav_len, dtype=np.uint16)
for n in range(num_cycles):
wav[n * cycle_len: (n + 1) * cycle_len] = y
return wav
def build_triangle_wave(self, cycle_len, num_cycles=1, phase_degree=0, low_level=0, high_level=2 ** 14 - 1):
'''Build Triangle Wave
:param cycle_len: cycle length (in points).
:param num_cycles: number of cycles.
:param phase_degree: starting-phase (in degrees)
:param low_level: the triangle low level.
:param high_level: the triangle high level.
:returns: `numpy.array` with the wave data (DAC values)
'''
cycle_len = int(cycle_len)
num_cycles = int(num_cycles)
if cycle_len <= 0 or num_cycles <= 0:
return None
dac_min = self.get_dev_property('min_dac_val', 0)
dac_max = self.get_dev_property('max_dac_val', 2 ** 14 - 1)
wav_len = cycle_len * num_cycles
phase = float(phase_degree) * np.pi / 180.0
x = np.linspace(start=phase, stop=phase + 2 * np.pi, num=cycle_len, endpoint=False)
zero_val = (low_level + high_level) / 2.0
amplitude = (high_level - low_level) / 2.0
y = np.sin(x)
y = np.arcsin(y) * 2 * amplitude / np.pi + zero_val
y = np.round(y)
y = np.clip(y, dac_min, dac_max)
y = y.astype(np.uint16)
wav = np.empty(wav_len, dtype=np.uint16)
for n in range(num_cycles):
wav[n * cycle_len: (n + 1) * cycle_len] = y
return wav
def build_square_wave(self, cycle_len, num_cycles=1, duty_cycle=50.0, phase_degree=0, low_level=0,
high_level=2 ** 14 - 1):
'''Build Square Wave
:param cycle_len: cycle length (in points).
:param num_cycles: number of cycles.
:param duty_cycle: duty-cycle (between 0% and 100%)
:param phase_degree: starting-phase (in degrees)
:param low_level: the triangle low level.
:param high_level: the triangle high level.
:returns: `numpy.array` with the wave data (DAC values)
'''
cycle_len = int(cycle_len)
num_cycles = int(num_cycles)
if cycle_len <= 0 or num_cycles <= 0:
return None
dac_min = self.get_dev_property('min_dac_val', 0)
dac_max = self.get_dev_property('max_dac_val', 2 ** 14 - 1)
wav_len = cycle_len * num_cycles
duty_cycle = np.clip(duty_cycle, 0.0, 100.0)
low_level = np.clip(low_level, dac_min, dac_max)
high_level = np.clip(high_level, dac_min, dac_max)
phase = float(phase_degree) * np.pi / 180.0
x = np.linspace(start=phase, stop=phase + 2 * np.pi, num=cycle_len, endpoint=False)
x = x <= 2 * np.pi * duty_cycle / 100.0
y = np.full(x.shape, low_level)
y[x] = high_level
y = y.astype(np.uint16)
wav = np.empty(wav_len, dtype=np.uint16)
for n in range(num_cycles):
wav[n * cycle_len: (n + 1) * cycle_len] = y
return wav
@staticmethod
def make_combined_wave(wav1, wav2, dest_array, dest_array_offset=0, add_idle_pts=False, quantum=16):
'''Make 2-channels combined wave from the 2 given waves
The destination-array, `dest_array`, is either a `numpy.array` with `dtype=uint16`, or `None`.
If it is `None` then only the next destination-array's write-offset offset is calculated.
Each of the given waves, `wav1` and `wav2`, is either a `numpy.array` with `dtype=uint16`, or `None`.
If it is `None`, then the corresponding entries of `dest_array` are not changed.
:param wav1: the DAC values of wave 1 (either `numpy.array` with `dtype=uint16`, or `None`).
:param wav2: the DAC values of wave 2 (either `numpy.array` with `dtype=uint16`, or `None`).
:param dest_array: the destination-array (either `numpy.array` with `dtype=uint16`, or `None`).
:param dest_array_offset: the destination-array's write-offset.
:param add_idle_pts: should add idle-points (segment-prefix)?
:param quantum: the combined-wave quantum (usually 16 points)
:returns: the next destination-array's write-offset.
'''
len1, len2 = 0, 0
if wav1 is not None:
len1 = len(wav1)
if wav2 is not None:
len2 = len(wav2)
wav_len = max(len1, len2)
if 0 == wav_len:
return dest_array_offset
if wav_len % quantum != 0:
wav_len = wav_len + (quantum - wav_len % quantum)
tot_len = 2 * wav_len
if add_idle_pts:
tot_len = tot_len + 2 * quantum
if dest_array is None:
return dest_array_offset + tot_len
dest_len = len(dest_array)
if min(quantum, len2) > 0:
rd_offs = 0
wr_offs = dest_array_offset
if add_idle_pts:
wr_offs = wr_offs + 2 * quantum
while rd_offs < len2 and wr_offs < dest_len:
chunk_len = min((quantum, len2 - rd_offs, dest_len - wr_offs))
dest_array[wr_offs: wr_offs + chunk_len] = wav2[rd_offs: rd_offs + chunk_len]
rd_offs = rd_offs + chunk_len
wr_offs = wr_offs + chunk_len + quantum
if add_idle_pts:
rd_offs = 0
wr_offs = dest_array_offset
chunk_len = min(quantum, dest_len - wr_offs)
if chunk_len > 0:
dest_array[wr_offs: wr_offs + chunk_len] = wav2[0]
if min(quantum, len1) > 0:
rd_offs = 0
wr_offs = dest_array_offset + quantum
if add_idle_pts:
wr_offs = wr_offs + 2 * quantum
while rd_offs < len1 and wr_offs < dest_len:
chunk_len = min((quantum, len1 - rd_offs, dest_len - wr_offs))
dest_array[wr_offs: wr_offs + chunk_len] = wav1[rd_offs: rd_offs + chunk_len]
rd_offs = rd_offs + chunk_len
wr_offs = wr_offs + chunk_len + quantum
if add_idle_pts:
rd_offs = 0
wr_offs = dest_array_offset + quantum
chunk_len = min(quantum, dest_len - wr_offs)
if chunk_len > 0:
dest_array[wr_offs: wr_offs + chunk_len] = wav1[0]
return dest_array_offset + tot_len
def _set_instr_address(self, instr_addr):
'''Set the instrument address
The given `instr_addr` can be either
- An empty string or None (meaning no address)
- IP-Address in digits and dots format (e.g. '192.168.0.170')
- NI-VISA Resource Name (e.g. 'TCPIP:192.168.0.170::5025::SOCKET')
:param instr_addr: the instrument address (string)
'''
self._instr_addr = None
if instr_addr is None or len(instr_addr) == 0:
return
# try to parse it as an IP-Address
try:
packed_ip = socket.inet_aton(instr_addr)
ip_str = socket.inet_ntoa(packed_ip)
self._ip_addr = ip_str
self._instr_addr = "TCPIP::{0}::5025::SOCKET".format(ip_str)
return
except:
pass
self._instr_addr = instr_addr
def wx_istrument_example():
'''Example of use
Connect to WX-Instrument, download 3 segments
and define (simple) sequence based on those 3 segments.
'''
print()
ip_addr = input('Please enter the instrument\'s IP-Address (for example: 192.168.0.199): ')
print()
with TEWXAwg(ip_addr, paranoia_level=2) as inst:
idn = inst.send_query('*IDN?')
print()
'connected to {0}\n'.format(idn)
# reset instrument and clear error-list
inst.send_cmd('*CLS; *RST')
# select active channel
inst.send_cmd(':INST:SEL 1')
# set function-mode: 'USER' (arbitrary-wave)
inst.send_cmd('FUNCtion:MODE USER')
# delete previously defined segments (not really necessary after *RST)
inst.send_cmd(':TRACE:DEL:ALL')
# set sampling-rate
inst.send_cmd(':SOUR:FREQ:RAST 1.0e9')
# ---------------------------------------------------------------------
# download sine-wave with cycle-length of 1024 points to segment 1
# ---------------------------------------------------------------------
print()
print()
'downloading sine-wave with cycle-length of 1024 points to segment 1 ...'
seg_nb = 1
cycle_len = 1024
num_cycles = 1
seg_len = cycle_len * num_cycles
wav_dat = inst.build_sine_wave(cycle_len, num_cycles)
# define the length of segment 1
inst.send_cmd(':TRAC:DEF {0:d},{1:d}'.format(seg_nb, seg_len))
# select segment 1 as the active segment
# (the one to which binary-data is downloaded)
inst.send_cmd(':TRAC:SEL {0:d}'.format(seg_nb))
# download the binary wave data:
inst.send_binary_data(pref=':TRAC:DATA', bin_dat=wav_dat)
# ---------------------------------------------------------------------
# download triangle-wave with cycle-length of 1024 points to segment 2
# ---------------------------------------------------------------------
print()
print()
'downloading triangle-wave with cycle-length of 1024 points to segment 2 ...'
seg_nb = 2
cycle_len = 1024
num_cycles = 1
seg_len = cycle_len * num_cycles
wav_dat = inst.build_triangle_wave(cycle_len, num_cycles)
# define the length of segment 1
inst.send_cmd(':TRAC:DEF {0:d},{1:d}'.format(seg_nb, seg_len))
# select segment 1 as the active segment
# (the one to which binary-data is downloaded)
inst.send_cmd(':TRAC:SEL {0:d}'.format(seg_nb))
# download the binary wave data:
inst.send_binary_data(pref=':TRAC:DATA', bin_dat=wav_dat)
# ---------------------------------------------------------------------
# download square-wave with cycle-length of 1024 points to segment 3
# ---------------------------------------------------------------------
print()
print()
'downloading square-wave with cycle-length of 1024 points to segment 3 ...'
seg_nb = 3
cycle_len = 1024
num_cycles = 1
seg_len = cycle_len * num_cycles
wav_dat = inst.build_square_wave(cycle_len, num_cycles)
# define the length of segment 1
inst.send_cmd(':TRAC:DEF {0:d},{1:d}'.format(seg_nb, seg_len))
# select segment 1 as the active segment
# (the one to which binary-data is downloaded)
inst.send_cmd(':TRAC:SEL {0:d}'.format(seg_nb))
# download the binary wave data:
inst.send_binary_data(pref=':TRAC:DATA', bin_dat=wav_dat)
wav_dat = None # let the garbage-collecteor free it
# ---------------------------------------------------------------------
# define sequence based on those three segments:
# ---------------------------------------------------------------------
print()
print()
'define sequencer based on those 3 segments ..'
# create sequencer table:
seg_num = [2, 1, 2, 3, 1]
repeats = [5, 4, 3, 2, 1]
jump = [0, 0, 0, 0, 0]
seq_table = list(zip(repeats, seg_num, jump))
# select sequencer 1 as the active sequencer (the one that being editted)
inst.send_cmd(':SEQ:SELect 1')
# download the sequencer table:
inst.download_sequencer_table(seq_table)
# set function-mode: 'SEQ' (simple sequencer)
inst.send_cmd('FUNCtion:MODE SEQ')
# turn on the active-channel's output:
inst.send_cmd(':OUTP ON')
syst_err = inst.send_query(':SYST:ERR?')
print()
print()
'End of the example - status: {0}'.format(syst_err)
print()
if __name__ == "__main__":
wx_istrument_example()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment