Source code for scanlib.txm_pv

# -*- coding: utf-8 -*-

"""Classes for interactions between the TXM class and the real TXM.

TxmPV
  A descriptor for the process variables used by the microscopes.

"""

__author__ = 'Mark Wolf'
__copyright__ = 'Copyright (c) 2017, UChicago Argonne, LLC.'
__docformat__ = 'restructuredtext en'
__platform__ = 'Unix'
__version__ = '1.6'
__all__ = ['TxmPV', 'permit_required']

import logging
import warnings

from epics import PV as EpicsPV

from . import exceptions_

log = logging.getLogger(__name__)


[docs]def permit_required(real_func, return_value=None): """Decorates a method so it can only open with a permit. This method decorator ensures that the decorated method can only be called on an object that has a shutter permit. If it doesn't, then an exceptions is raised. Parameters ---------- real_func The function or method to decorate. """ def wrapped_func(obj, *args, **kwargs): # Inner function that checks the status of permit if obj.has_permit: ret = real_func(obj, *args, **kwargs) else: msg = "Shutter permit not granted." warnings.warn(msg, RuntimeWarning) ret = None return ret return wrapped_func
[docs]class TxmPV(object): """A descriptor representing a process variable in the EPICS system. This allows accessing process variables as if they were object attributes. If the descriptor owner (ie. TXM) is not attached, this descriptor performs like a regular attribute. Optionally, this can also be done for objects that have no shutter permit. Attributes ---------- put_complete : bool If False, there is a pending put operation. Parameters ---------- pv_name : str The name of the process variable to connect to as defined in the EPICS system. dtype : optional If given, the values returned by `PV.get` will be typecast. Example: ``dtype=int`` will return ``int(PV[pv].get())``. permit_required : bool, optional If truthy, data will only be sent if the owner `has_permit` attribute is true. Reading of process variable is still enabled either way. wait : bool, optional If truthy, setting this descriptor will block until the operation succeeds. as_string : bool, optional If truthy, the string representation of the process variable will be given, otherwise the raw bytes will be returned for character array variables. """ _epicsPV = None put_complete = True def __init__(self, pv_name, dtype=None, permit_required=False, wait=True, as_string=False): # Set default values self._namestring = pv_name self.dtype = dtype self.permit_required = permit_required self.wait = wait self.as_string = as_string
[docs] def epics_PV(self, txm): # Only create a PV if one doesn't exist or the IOC prefix has changed is_cached = (self._epicsPV is not None) if not is_cached: pv_name = self.pv_name(txm) self._epicsPV = EpicsPV(pv_name) return self._epicsPV
[docs] def pv_name(self, txm): """Do string formatting on the pv_name and return the result.""" return self._namestring.format(ioc_prefix=txm.ioc_prefix)
def __get__(self, txm, type=None): # Ask the PV for an updated value if possible pv_name = self.pv_name(txm) result = txm.pv_get(pv_name, as_string=self.as_string) # Convert to correct datatype if given if self.dtype is not None: try: result = self.dtype(result) except TypeError: msg = "Could not cast {} to type {}".format(result, self.dtype) warnings.warn(msg, RuntimeWarning) log.warn(msg) return result def __set__(self, txm, val): pv_name = self.pv_name(txm) log.debug("Setting PV value %s: %s", pv_name, val) self.txm = txm # Check that the TXM has shutter permit if required for this PV if (not self.permit_required) or txm.has_permit: # Set the PV, but only if the TXM has the required permits) was_successful = txm.pv_put(pv_name=pv_name, value=val, wait=self.wait) # Check that setting the new value was successful if not was_successful: msg = "Error waiting on response from PV {}".format(str(self)) log.error(msg) raise exceptions_.PVError(msg) else: # There's a valid TXM but we can't operate this PV msg = "PV {pv} was not set because TXM doesn't have beamline permit." msg = msg.format(pv=pv_name) warnings.warn(msg, RuntimeWarning) log.warning(msg) def __set_name__(self, txm, name): self.name = name def __str__(self): return getattr(self, 'name', self._namestring) def __repr__(self): return "<TxmPV: {}>".format(self._namestring)