Skip to content
Snippets Groups Projects
Select Git revision
  • 15490e1fe1468a38aad4c7c1bb54af30ed41a000
  • master default protected
  • updated_emam2middleware
  • patch-1
  • corrected_get_torcs_image
  • develop
  • multi_variant_model
7 results

torcsclient.cpp

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    Agilent_NetworkAnalyzer.py 9.37 KiB
    #!/usr/bin/env python
    
    from VISA_Driver import VISA_Driver
    import numpy as np
    import time
    
    __version__ = "0.1.0"
    
    class Error(Exception):
        """Module-specific exception type; adds nothing to `Exception`."""
    
    class Driver(VISA_Driver):
        """ This class implements the Agilent 5230 PNA driver"""
    
        def performOpen(self, options={}):
            """Open the instrument connection and reset the measurement cache.

            `options` is handed on unchanged to `VISA_Driver.performOpen`.
            """
            # start from an empty parameter -> measurement lookup table
            self.dMeasParam = dict()
            # the generic VISA driver takes care of the actual connection
            VISA_Driver.performOpen(self, options=options)
    
    
        def performSetValue(self, quant, value, sweepRate=0.0, options={}):
            """Set an instrument quantity and return the value that was set.

            The four 'Sxx - Enabled' quantities are handled here by defining or
            deleting traces/measurements on the instrument. 'Wait for new trace'
            is kept locally and sends nothing. Every other quantity is forwarded
            to `VISA_Driver.performSetValue`.

            Parameters:
                quant: quantity to set; only `quant.name` is inspected here.
                value: new value; used as a truth value for 'Enabled' quantities.
                sweepRate: forwarded to the generic VISA driver.
                options: forwarded to the generic VISA driver.

            Returns:
                `value` unchanged for quantities handled in this method,
                otherwise the return value of `VISA_Driver.performSetValue`.
            """
            lParam = ['S11', 'S21', 'S12', 'S22']
            if quant.name in ('S11 - Enabled', 'S21 - Enabled', 'S12 - Enabled',
                              'S22 - Enabled'):
                if self.getModel() in ('E5071C',):
                    # trace-number based handling: redefine all traces at once
                    dParamValue = dict()
                    for param in lParam:
                        dParamValue[param] = self.getValue('%s - Enabled' % param)
                    # the quantity being set takes the new value, not the old one
                    dParamValue[quant.name[:3]] = value
                    # number the enabled parameters consecutively, always in
                    # S11, S21, S12, S22 order
                    self.dMeasParam = dict()
                    for param in lParam:
                        if dParamValue[param]:
                            nParam = len(self.dMeasParam) + 1
                            self.writeAndLog(":CALC:PAR%d:DEF %s" %
                                             (nParam, param))
                            self.dMeasParam[param] = nParam
                    # set number of visible traces
                    self.writeAndLog(":CALC:PAR:COUN %d" % len(self.dMeasParam))
                else:
                    # name-based handling: get updated list of measurements in use
                    self.getActiveMeasurements()
                    param = quant.name[:3]
                    # delete existing measurements of this parameter; pop the
                    # entry so the cache never refers to deleted measurements
                    for name in self.dMeasParam.pop(param, []):
                        self.writeAndLog("CALC:PAR:DEL '%s'" % name)
                    # create new measurement, if enabled is true
                    if value:
                        newName = 'LabC_%s' % param
                        self.writeAndLog("CALC:PAR:EXT '%s','%s'" % (newName, param))
                        # show on screen, one fixed trace number per parameter
                        iTrace = 1 + lParam.index(param)
                        self.writeAndLog("DISP:WIND:TRAC%d:FEED '%s'" % (iTrace, newName))
                        # add to dict with list of measurements
                        self.dMeasParam[param] = [newName]
            elif quant.name in ('Wait for new trace',):
                # local setting only, nothing is sent to the instrument
                pass
            else:
                # run standard VISA case
                value = VISA_Driver.performSetValue(self, quant, value, sweepRate, options)
            return value
    
    
        def performGetValue(self, quant, options={}):
            """Read a quantity and return its value.

            Handling depends on `quant.name`:

            * 'Sxx - Enabled': refreshes the measurement cache and returns
              whether the parameter is currently measured.
            * 'S11', 'S21', 'S12', 'S22': selects the trace, optionally triggers
              a new sweep and waits for it, reads the trace as 32-bit floats
              and returns it as complex data wrapped by `quant.getTraceDict`.
              Returns an empty trace dict if the parameter is not enabled.
            * 'Wait for new trace': returns the locally stored value.
            * anything else: forwarded to `VISA_Driver.performGetValue`.

            Parameters:
                quant: quantity to read.
                options: forwarded to the generic VISA driver.

            Returns:
                The value as described above, or `[]` if `self.isStopped()`
                became true while waiting for a triggered sweep.

            Raises:
                ValueError: if the trace response contains no '#' block header.
            """
            # check type of quantity
            if quant.name in ('S11 - Enabled', 'S21 - Enabled', 'S12 - Enabled',
                              'S22 - Enabled'):
                # update list of channels in use
                self.getActiveMeasurements()
                # enabled means the parameter is among the active measurements
                value = (quant.name[:3] in self.dMeasParam)
            elif quant.name in ('S11', 'S21', 'S12', 'S22'):
                # check if channel is on
                if quant.name not in self.dMeasParam:
                    # get active measurements again, in case they changed
                    self.getActiveMeasurements()
                if quant.name in self.dMeasParam:
                    bNewModel = self.getModel() in ('E5071C',)
                    if bNewModel:
                        # new trace handling, use trace numbers
                        self.writeAndLog("CALC:PAR%d:SEL" % self.dMeasParam[quant.name])
                    else:
                        # old parameter handling, select parameter (use last in list)
                        sName = self.dMeasParam[quant.name][-1]
                        self.writeAndLog("CALC:PAR:SEL '%s'" % sName)
                    # if not in continuous mode, trig from computer
                    bWaitTrace = self.getValue('Wait for new trace')
                    if bWaitTrace:
                        # averaging state and sweep time are only needed for the
                        # triggered acquisition, so only query them here
                        bAverage = self.getValue('Average')
                        duration = (float(self.askAndLog("SENS:SWE:TIME?"))
                                    * (self.getValue('# of averages') if bAverage else 1))
                        self.writeAndLog(':ABOR;:TRIG:SOUR BUS;:INIT:CONT OFF;:INIT:IMM;')
                        if bAverage:
                            self.writeAndLog(':TRIG:AVER ON')
                        self.writeAndLog(':TRIG:SING;')
                        self.writeAndLog('*OPC')
                        # wait some time before first check
                        self.wait(0.03)
                        bDone = False
                        start_time = time.time()

                        while (not bDone) and (not self.isStopped()):
                            # bit 0 of the event status register is set once
                            # the operation started before *OPC has completed
                            stb = int(self.askAndLog('*ESR?'))
                            bDone = (stb & 1) > 0
                            if not bDone:
                                # report progress; skip if the expected duration
                                # is not positive and never report more than 1
                                if duration > 0:
                                    progress = (time.time() - start_time) / duration
                                    self.reportProgress(min(progress, 1.0))
                                self.wait(0.1)
                        # if stopped, restore trigger settings and don't get data
                        if self.isStopped():
                            self.writeAndLog('*CLS;:TRIG:SOUR INT;:TRIG:AVER OFF;:ABOR;')
                            return []
                    # get data as float32, convert to numpy array
                    if bNewModel:
                        # new trace handling, use trace numbers
                        self.write(':FORM:DATA REAL32;:CALC:SEL:DATA:SDAT?', bCheckError=False)
                    else:
                        # old parameter handling
                        self.write(':FORM REAL,32;CALC:DATA? SDATA', bCheckError=False)
                    sData = self.read(ignore_termination=True)
                    if bWaitTrace:
                        # back to internal triggering after the triggered sweep
                        self.writeAndLog(':TRIG:SOUR INT;:TRIG:AVER OFF;:ABOR;')

                    # block header: '#', one digit giving the length of the
                    # byte count, then the byte count itself
                    i0 = sData.find(b'#')
                    if i0 < 0:
                        raise ValueError('No block data header found in trace response')
                    nDig = int(sData[i0+1:i0+2])
                    nByte = int(sData[i0+2:i0+2+nDig])
                    # four bytes per value, two values per point
                    nData = nByte // 4
                    nPts = nData // 2
                    # get data to numpy array (big-endian 32-bit floats)
                    vData = np.frombuffer(sData[(i0+2+nDig):(i0+2+nDig+nByte)],
                                          dtype='>f', count=nData)
                    # data is in I0,Q0,I1,Q1,I2,Q2,.. format, convert to complex
                    mC = vData.reshape((nPts, 2))
                    vComplex = mC[:, 0] + 1j*mC[:, 1]
                    # get start/stop frequencies
                    startFreq = self.readValueFromOther('Start frequency')
                    stopFreq = self.readValueFromOther('Stop frequency')
                    sweepType = self.readValueFromOther('Sweep type')
                    # flag a logarithmic sweep so the x-axis is built accordingly
                    logX = (sweepType == 'Log')
                    value = quant.getTraceDict(vComplex, x0=startFreq, x1=stopFreq,
                                               logX=logX)
                else:
                    # not enabled, return empty array
                    value = quant.getTraceDict([])
            elif quant.name in ('Wait for new trace',):
                # do nothing, return local value
                value = quant.getValue()
            else:
                # for all other cases, call VISA driver
                value = VISA_Driver.performGetValue(self, quant, options)
            return value
            
    
        def getActiveMeasurements(self):
            """Rebuild `self.dMeasParam` from what the instrument reports.

            For model 'E5071C' the dict maps each trace's parameter definition
            to its 1-based trace number. For all other models it maps each
            parameter to the list of measurement names using that parameter.
            """
            if self.getModel() in ('E5071C',):
                # trace-number model: ask for the count, then each definition
                self.dMeasParam = dTrace = {}
                nCount = int(self.askAndLog(":CALC:PAR:COUN?"))
                for iTrace in range(1, nCount + 1):
                    sDef = self.askAndLog(":CALC:PAR%d:DEF?" % iTrace)
                    dTrace[sDef] = iTrace
                return
            # name-based model: the catalog is one quoted, comma-separated
            # string of alternating measurement names and parameters
            sCatalog = self.askAndLog("CALC:PAR:CAT:EXT?")
            # drop the first and last character (the surrounding quotes)
            sInner = sCatalog[1:-1]
            self.dMeasParam = dNames = {}
            lField = sInner.split(',')
            # pair up (name, parameter); an unpaired trailing field is ignored
            for sMeasName, sMeasParam in zip(lField[0::2], lField[1::2]):
                dNames.setdefault(sMeasParam, []).append(sMeasName)
    
    
    
    # Nothing is executed when this module is run directly as a script.
    if __name__ == '__main__':
        pass