__author__="Olivier Tache " __date__="23 may 2006" __version__=1 from PyTango import * import time WAIT_TIME=0.3 class pyTangoDevice: """ Class to use easily a Tango device ****************************************** Olivier Tache http://www-drecam.cea.fr/scm ------------------------------------------ DRECAM / SCM / LIONS Direction des Sciences de la Matiere Commissariat a l'Energie Atomique CEA SACLAY 91191 Gif / Yvette Cedex ****************************************** pyTangoDevice version 1 Class to use easily a Tango device ex : >>> test=pyTangoDevice("tango/tangotest/1") Now tango can access easily to the device Read an attribute : >>> test.short_scalar 45 Write an attribute : >>> test.short_scalar_w=3 Execute a command : >>> test.Init() Execute a command with a return value : >>> test.DevLong(1) 1 Get the current state : >>> device.State() PyTango.DevState.ON --- for some use we need a function to access to atributes, so each attribute have a set and get command >>> test.get_double_scalar() 221.70272980813175 >>> test.set_short_scalar_w(3) for READ attribute only --- for some use we need to wait that a device is ready (ON) so I add a function waitStateON() to wait the DevState::ON >>> test.waitStateON() will not work because tangotest state is UNKNOWN """ DeviceName="" DeviceProxy="" State="" Status="" myDict={} def __init__(self,name): """ Class to use easily a Tango device name : device server name """ #initialize attributes #print "a new device with ",str(len(myDict))," attributes" self.myDict={} self.DeviceName=name #name=self.DeviceName self.DeviceProxy=DeviceProxy(name) self.State=self.DeviceProxy.state self.Status=self.DeviceProxy.status """scanning attributes""" for attribute in self.DeviceProxy.get_attribute_list(): if not self.__dict__.has_key(attribute): #check if attribute is already defined self.myDict[attribute]="A" # get method self.__dict__["get_"+attribute]=self._DeviceReadAttr(self.DeviceProxy,attribute).execute # set method for write attribute is_writable=self.DeviceProxy.attribute_query(attribute).writable if (is_writable==AttrWriteType.READ_WRITE) or (is_writable==AttrWriteType.WRITE): self.__dict__["set_"+attribute]=self._DeviceWriteAttr(self.DeviceProxy,attribute).execute """scanning methods""" for method in self.DeviceProxy.command_list_query(): MethodName=method.cmd_name if (not self.myDict.has_key(MethodName))and(not self.__dict__.has_key(MethodName)): #check if attribute self.myDict[MethodName]="M" #setattr(self,MethodName,self._DeviceMethod(self.DeviceProxy,MethodName).execute) self.__dict__[MethodName]=self._DeviceMethod(self.DeviceProxy,MethodName).execute #print "add State() and Status() methods" def __getattr__(self,name): """ call for an attribute 'name' ex : >>> print mydevice.name call the function read_attribute('name').value """ if self.myDict.has_key(name): if self.myDict[name]=='A':#is it an attribute ? return self.DeviceProxy.read_attribute(name).value else: return self.__dict__[name] else: return self.__dict__[name] def __setattr__(self,name,arg): """ pass a value to an atribute name ex : >>> mydevice.name=10.2 check if attribute is writable call the function read_attribute and write_attribute """ if self.myDict.has_key(name): if self.myDict[name]=='A': #check if attribute is writable is_writable=self.DeviceProxy.attribute_query(name).writable if (is_writable==AttrWriteType.READ_WRITE) or (is_writable==AttrWriteType.WRITE): attr=self.DeviceProxy.read_attribute(name) attr.value=arg self.DeviceProxy.write_attribute(attr) else: raise AttributeError,name else: self.__dict__[name]=arg else: #print name+"="+arg self.__dict__[name]=arg def __repr__(self): st="-- Tango Device Binding to : "+self.DeviceName +" --\n" st+="* Added attributes : \n" for a in self.myDict: if self.myDict[a]=="A": st+=" "+ a+", " st+="\n* Added methods : \n" for a in self.myDict: if self.myDict[a]=="M": st+= a+"(), " st+="\n* Added methods : States(), Status(), waitStateON() and get_attribute() for each attribute\n" return st def __str__(self): return self.__repr__() def waitStateON(self): """ function to wait the DevState::ON (DevState.ON) of the device server """ while (self.State()<>DevState.ON): time.sleep(WAIT_TIME) return class _DeviceMethod: """ class to bind a method to pyTango command_inout ex : >>> cmd=_DeviceMethod(deviceproxy,"position") >>> cmd.execute(100.0) iused by pyTangoObject """ MethodName="" DeviceProxy="" def __init__(self,DeviceProxy,name): self.DeviceProxy=DeviceProxy self.MethodName=name def execute(self,*args): return self.DeviceProxy.command_inout(self.MethodName,*args) class _DeviceReadAttr: """ class to bind a attribute to a method ex : >>> motor.position -> motor.get_position() used by pyTangoDevice """ MethodName="" DeviceProxy="" def __init__(self,DeviceProxy,name): self.DeviceProxy=DeviceProxy self.MethodName=name def execute(self): return self.DeviceProxy.read_attribute(self.MethodName).value class _DeviceWriteAttr: """ class to bind a attribute to a method ex : >>> motor.position=10.0 -> motor.set_position(10.0) used by pyTangoDevice """ MethodName="" DeviceProxy="" def __init__(self,DeviceProxy,name): self.DeviceProxy=DeviceProxy self.MethodName=name def execute(self,arg): is_writable=self.DeviceProxy.attribute_query(self.MethodName).writable if (is_writable==AttrWriteType.READ_WRITE) or (is_writable==AttrWriteType.WRITE): attr=self.DeviceProxy.read_attribute(self.MethodName) attr.value=arg self.DeviceProxy.write_attribute(attr) else: raise AttributeError,MethodName if __name__=='__main__': """ for testing """ device="tango/tangotest/1" print "Connecting to "+device+" ..." test=pyTangoDevice(device) # Get the current state : print "current state is ", test.State() # reading attribute print "reading short scalar value : ",test.short_scalar # writing attribute test.short_scalar_w=3 #Execute a command : test.Init() # Execute a command with a return value : long_value=raw_input("Enter a value to send to tangotest : ") print "execute DevLong() method, returning ", test.DevLong(long(long_value)) # each attribute have a set and get command print "get double_scalar attribute : ",test.get_double_scalar() test.set_short_scalar_w(3)