Source code for aps_32id.txm

# -*- coding: utf-8 -*-

"""Defines TXM classes for controlling the Transmission X-ray
Microscope at Advanced Photon Source beamline 32-ID-C.

TXM
  A nano-CT transmission X-ray microscope.
MicroCT
  Similar to the nano-CT but for micro-CT.

"""

import time
import math
import logging
import warnings
from contextlib import contextmanager
from collections import namedtuple

import h5py
import tqdm
from epics import PV as EpicsPV, get_pv

from scanlib import TxmPV, permit_required, exceptions_

__author__ = 'Mark Wolf'
__copyright__ = 'Copyright (c) 2017, UChicago Argonne, LLC.'
__docformat__ = 'restructuredtext en'
__platform__ = 'Unix'
__version__ = '1.6'
__all__ = ['NanoTXM',
           'MicroTXM']

DEFAULT_TIMEOUT = 20 # PV timeout in seconds

log = logging.getLogger(__name__)


class PVPromise():
    is_complete = False
    result = None

    def __init__(self, pv_name=""):
        self.pv_name = pv_name
    
    def complete(self, pvname=""):
        self.is_complete = True


############################
# Main TXM Class definition
############################
[docs]class NanoTXM(object): """A class representing the Transmission X-ray Microscope at sector 32-ID-C. Attributes ---------- is_attached : bool Is this computer able to communicate with the instrument. If False, communication methods will be simulated. has_permit : bool Is the instrument authorized to open shutters and change the X-ray source. Could be false for any number of reasons, most likely the beamline is set for hutch B to operate. ioc_prefix : str, optional The prefix to use for the camera's I/O controller when conneting certain PV's. PV descriptor's can then use "{ioc_prefix}" in their PV nam and have it format automatically. use_shutter_A : bool, optional Whether shutter A should be used when getting light. use_shutter_B : bool, optional Whether shutter B should be used when getting light. zp_diameter : float, optional The diameter (in nanometers) of the zone-plate currently installed in the instrument. drn : float, optional The width of the zoneplate's outermost diffraction zone. """ zp_diameter = 180 drn = 60 gap_offset = 0.17 # Added to undulator gap setting pv_queue = None hdf_writer_ready = False tiff_writer_ready = False pg_external_trigger = True shutters_are_open = False E_RANGE = (6.4, 30) # How far can the X-ray energy be changed (in keV) POLL_INTERVAL = 0.01 # How often to check PV's in seconds. # Commonly used flags for PVs SHUTTER_OPEN = 0 SHUTTER_CLOSED = 1 RECURSIVE_FILTER_TYPE = "RecursiveAve" CAPTURE_ENABLED = 1 CAPTURE_DISABLED = 0 FRAME_DATA = 0 FRAME_DARK = 1 FRAME_WHITE = 2 DETECTOR_IDLE = 0 DETECTOR_ACQUIRE = 1 HDF_IDLE = 0 HDF_WRITING = 1 # Process variables # ----------------- # # Detector PV's Cam1_ImageMode = TxmPV('{ioc_prefix}cam1:ImageMode') Cam1_ArrayCallbacks = TxmPV('{ioc_prefix}cam1:ArrayCallbacks') Cam1_AcquirePeriod = TxmPV('{ioc_prefix}cam1:AcquirePeriod') Cam1_FrameRate_on_off = TxmPV('{ioc_prefix}cam1:FrameRateOnOff') Cam1_FrameRate_val = TxmPV('{ioc_prefix}cam1:FrameRateValAbs') Cam1_TriggerMode = TxmPV('{ioc_prefix}cam1:TriggerMode') Cam1_SoftwareTrigger = TxmPV('{ioc_prefix}cam1:SoftwareTrigger') Cam1_AcquireTime = TxmPV('{ioc_prefix}cam1:AcquireTime') Cam1_FrameRateOnOff = TxmPV('{ioc_prefix}cam1:FrameRateOnOff') Cam1_FrameType = TxmPV('{ioc_prefix}cam1:FrameType') Cam1_NumImages = TxmPV('{ioc_prefix}cam1:NumImages') Cam1_Acquire = TxmPV('{ioc_prefix}cam1:Acquire', wait=False) Cam1_Display = TxmPV('{ioc_prefix}image1:EnableCallbacks') Cam1_Status = TxmPV('{ioc_prefix}cam1:DetectorState_RBV', as_string=True) # HDF5 writer PV's HDF1_AutoSave = TxmPV('{ioc_prefix}HDF1:AutoSave') HDF1_DeleteDriverFile = TxmPV('{ioc_prefix}HDF1:DeleteDriverFile') HDF1_EnableCallbacks = TxmPV('{ioc_prefix}HDF1:EnableCallbacks') HDF1_BlockingCallbacks = TxmPV('{ioc_prefix}HDF1:BlockingCallbacks') HDF1_FileWriteMode = TxmPV('{ioc_prefix}HDF1:FileWriteMode') HDF1_NumCapture = TxmPV('{ioc_prefix}HDF1:NumCapture') HDF1_Capture = TxmPV('{ioc_prefix}HDF1:Capture', wait=False) HDF1_Capture_RBV = TxmPV('{ioc_prefix}HDF1:Capture_RBV') HDF1_FileName = TxmPV('{ioc_prefix}HDF1:FileName', dtype=str, as_string=True) HDF1_FullFileName_RBV = TxmPV('{ioc_prefix}HDF1:FullFileName_RBV', dtype=str, as_string=True) HDF1_FileTemplate = TxmPV('{ioc_prefix}HDF1:FileTemplate') HDF1_ArrayPort = TxmPV('{ioc_prefix}HDF1:NDArrayPort') HDF1_NextFile = TxmPV('{ioc_prefix}HDF1:FileNumber') # Tiff writer PV's TIFF1_AutoSave = TxmPV('{ioc_prefix}TIFF1:AutoSave') TIFF1_DeleteDriverFile = TxmPV('{ioc_prefix}TIFF1:DeleteDriverFile') TIFF1_EnableCallbacks = TxmPV('{ioc_prefix}TIFF1:EnableCallbacks') TIFF1_BlockingCallbacks = TxmPV('{ioc_prefix}TIFF1:BlockingCallbacks') TIFF1_FileWriteMode = TxmPV('{ioc_prefix}TIFF1:FileWriteMode') TIFF1_NumCapture = TxmPV('{ioc_prefix}TIFF1:NumCapture') TIFF1_Capture = TxmPV('{ioc_prefix}TIFF1:Capture') TIFF1_Capture_RBV = TxmPV('{ioc_prefix}TIFF1:Capture_RBV') TIFF1_FileName = TxmPV('{ioc_prefix}TIFF1:FileName') TIFF1_FullFileName_RBV = TxmPV('{ioc_prefix}TIFF1:FullFileName_RBV') TIFF1_FileTemplate = TxmPV('{ioc_prefix}TIFF1:FileTemplate') TIFF1_ArrayPort = TxmPV('{ioc_prefix}TIFF1:NDArrayPort') # Motor PV's Motor_SampleX = TxmPV('32idcTXM:nf:c0:m1.VAL', dtype=float) Motor_SampleY = TxmPV('32idcTXM:mxv:c1:m1.VAL', dtype=float) # Professional Instrument air bearing rotary stage Motor_SampleRot = TxmPV('32idcTXM:ens:c1:m1.VAL', dtype=float) # Smaract XZ TXM set Motor_Sample_Top_X = TxmPV('32idcTXM:mcs:c3:m7.VAL', dtype=float) Motor_Sample_Top_Z = TxmPV('32idcTXM:mcs:c3:m8.VAL', dtype=float) # # Mosaic scanning axes # Motor_X_Tile = TxmPV('32idc01:m33.VAL') # Motor_Y_Tile = TxmPV('32idc02:m15.VAL') # Zone plate: zone_plate_x = TxmPV('32idcTXM:mcs:c2:m2.VAL') zone_plate_y = TxmPV('32idc01:m110.VAL') zone_plate_z = TxmPV('32idcTXM:mcs:c2:m3.VAL') # MST2 = vertical axis # pv.Smaract_mode.put(':MST3,100,500,100') Smaract_mode = TxmPV('32idcTXM:mcsAsyn1.AOUT') zone_plate_2_x = TxmPV('32idcTXM:mcs:c0:m3.VAL') zone_plate_2_y = TxmPV('32idcTXM:mcs:c0:m1.VAL') zone_plate_2_z = TxmPV('32idcTXM:mcs:c0:m2.VAL') # CCD motors: CCD_Motor = TxmPV('32idcTXM:mxv:c1:m6.VAL', float) # Shutter PV's ShutterA_Open = TxmPV('32idb:rshtrA:Open', permit_required=True) ShutterA_Close = TxmPV('32idb:rshtrA:Close', permit_required=True) ShutterA_Move_Status = TxmPV('PB:32ID:STA_A_FES_CLSD_PL') ShutterB_Open = TxmPV('32idb:fbShutter:Open.PROC', permit_required=True) ShutterB_Close = TxmPV('32idb:fbShutter:Close.PROC', permit_required=True) ShutterB_Move_Status = TxmPV('PB:32ID:STA_B_SBS_CLSD_PL') ExternalShutter_Trigger = TxmPV('32idcTXM:shutCam:go', permit_required=True) # State 0 = Close, 1 = Open Fast_Shutter_Uniblitz = TxmPV('32idcTXM:uniblitz:control', permit_required=True) # Fly scan PV's for nano-ct TXM using Profession Instrument air-bearing stage Fly_ScanDelta = TxmPV('32idcTXM:PSOFly3:scanDelta') Fly_StartPos = TxmPV('32idcTXM:PSOFly3:startPos') Fly_EndPos = TxmPV('32idcTXM:PSOFly3:endPos') Fly_SlewSpeed = TxmPV('32idcTXM:PSOFly3:slewSpeed') Fly_Taxi = TxmPV('32idcTXM:PSOFly3:taxi') Fly_Run = TxmPV('32idcTXM:PSOFly3:fly') Fly_ScanControl = TxmPV('32idcTXM:PSOFly3:scanControl') Fly_Calc_Projections = TxmPV('32idcTXM:PSOFly3:numTriggers') Theta_Array = TxmPV('32idcTXM:PSOFly3:motorPos.AVAL') Fly_Set_Encoder_Pos = TxmPV('32idcTXM:eFly:EncoderPos') # Theta controls Reset_Theta = TxmPV('32idcTXM:SG_RdCntr:reset.PROC') Proc_Theta = TxmPV('32idcTXM:SG_RdCntr:cVals.PROC') Theta_Array = TxmPV('32idcTXM:eFly:motorPos.AVAL') Theta_Cnt = TxmPV('32idcTXM:SG_RdCntr:aSub.VALB') # Misc PV's Image1_Callbacks = TxmPV('{ioc_prefix}image1:EnableCallbacks') ExternShutterExposure = TxmPV('32idcTXM:shutCam:tExpose') SetSoftGlueForStep = TxmPV('32idcTXM:SG3:MUX2-1_SEL_Signal') # ClearTheta = TxmPV('32idcTXM:recPV:PV1_clear') ExternShutterDelay = TxmPV('32idcTXM:shutCam:tDly') Interferometer = TxmPV('32idcTXM:SG2:UpDnCntr-1_COUNTS_s') Interferometer_Update = TxmPV('32idcTXM:SG2:UpDnCntr-1_COUNTS_SCAN.PROC') Interferometer_Reset = TxmPV('32idcTXM:SG_RdCntr:reset.PROC') Interferometer_Cnt = TxmPV('32idcTXM:SG_RdCntr:aSub.VALB') Interferometer_Arr = TxmPV('32idcTXM:SG_RdCntr:cVals.AA') Interferometer_Proc_Arr = TxmPV('32idcTXM:SG_RdCntr:cVals.PROC') Interferometer_Val = TxmPV('32idcTXM:userAve4.VAL') Interferometer_Mode = TxmPV('32idcTXM:userAve4_mode.VAL') Interferometer_Acquire = TxmPV('32idcTXM:userAve4_acquire.PROC') # Proc1 PV's Proc1_Callbacks = TxmPV('{ioc_prefix}Proc1:EnableCallbacks') Proc1_ArrayPort = TxmPV('{ioc_prefix}Proc1:NDArrayPort') Proc1_Filter_Enable = TxmPV('{ioc_prefix}Proc1:EnableFilter') Proc1_Filter_Type = TxmPV('{ioc_prefix}Proc1:FilterType') Proc1_Num_Filter = TxmPV('{ioc_prefix}Proc1:NumFilter') Proc1_Reset_Filter = TxmPV('{ioc_prefix}Proc1:ResetFilter') Proc1_AutoReset_Filter = TxmPV('{ioc_prefix}Proc1:AutoResetFilter') Proc1_Filter_Callbacks = TxmPV('{ioc_prefix}Proc1:FilterCallbacks') # Energy PV's DCMmvt = TxmPV('32ida:KohzuModeBO.VAL', permit_required=True) GAPputEnergy = TxmPV('32id:ID32us_energy', permit_required=True, wait=False) EnergyWait = TxmPV('ID32us:Busy') DCMputEnergy = TxmPV('32ida:BraggEAO.VAL', dtype=float, permit_required=True) #interlaced Interlaced_PROC = TxmPV('32idcTXM:iFly:interlaceFlySub.PROC') Interlaced_Theta_Arr = TxmPV('32idcTXM:iFly:interlaceFlySub.VALC') Interlaced_Num_Cycles = TxmPV('32idcTXM:iFly:interlaceFlySub.C') Interlaced_Num_Cycles_RBV = TxmPV('32idcTXM:iFly:interlaceFlySub.VALH') Interlaced_Images_Per_Cycle = TxmPV('32idcTXM:iFly:interlaceFlySub.A') Interlaced_Images_Per_Cycle_RBV = TxmPV('32idcTXM:iFly:interlaceFlySub.VALF') Interlaced_Num_Sub_Cycles = TxmPV('32idcTXM:iFly:interlaceFlySub.B') Interlaced_Num_Sub_Cycles_RBV = TxmPV('32idcTXM:iFly:interlaceFlySub.VALG') def __init__(self, has_permit=False, ioc_prefix="32idcPG3:", use_shutter_A=False, use_shutter_B=True): self.has_permit = has_permit self.ioc_prefix = ioc_prefix self.use_shutter_A = use_shutter_A self.use_shutter_B = use_shutter_B
[docs] def pv_get(self, pv_name, *args, **kwargs): """Retrieve the current process variable value. Parameters ---------- *args, **kwargs Extra arguments that get passed to :py:meth:``epics.PV.get`` """ epics_pv = EpicsPV(pv_name) return epics_pv.get(*args, **kwargs)
[docs] def pv_put(self, pv_name, value, wait, *args, **kwargs): """Set the current process variable value. When ``wait=True``, this method becomes closely linked with the concept of deferred PVs. Normally, this method will block until the PV has been set. When inside a :py:meth:``TXM.wait_pvs`` context, this method adds a promise to the queue so the :py:meth:``TXM.wait_pvs`` manager can handle the blocking. When ``wait=False``, this method returns immediately once the value has been sent and does not alter the PV queue. Parameters ---------- wait : bool, optional If true, the method will keep track of when PV has been set. *args, **kwargs Extra arguments that get passed to :py:meth:``epics.PV.get`` """ if self.pv_queue is not None: # Non-blocking, deferred PV waiting promise = PVPromise(pv_name=pv_name) ret = self._pv_put(pv_name, value, wait=False, callback=promise.complete) self.pv_queue.append(promise) else: # Blocking PV waiting ret = self._pv_put(pv_name, value, wait=wait, *args, **kwargs) return ret
def _complete_promise(self, promise): print(promise) def _pv_put(self, pv_name, value, wait, *args, **kwargs): """Retrieves the epics PV and calls its ``put`` method.""" print(pv_name, value, wait) epics_pv = EpicsPV(pv_name) return epics_pv.put(value, wait=wait, *args, **kwargs) @contextmanager def wait_pvs(self, block=True): """Context manager that allows for setting multiple PVs asynchronously. This manager creates an empty queue for PV objects. If blocking, upon exiting the context it waits for all the PV's to finished before moving on. If non-blocking, this basically turns off blocking feature on any TxmPVs that have ``wait=True`` (so use with caution). Arguments --------- block : bool, optional If True, this function will wait for all PVs to finish before continuing. """ # Save old queue to resore it later on old_queue = self.pv_queue # Prepare a queue for holding PV promises self.pv_queue = [] # Return execution to the inner block yield self.pv_queue # Wait for all the PVs to be finished num_promises = len(self.pv_queue) while block and not all([pv.is_complete for pv in self.pv_queue]): time.sleep(0.5) print([(pv.pv_name, pv.is_complete) for pv in self.pv_queue]) log.debug("Completed %d queued PV's", num_promises) # Restore the old PV queue self.pv_queue = old_queue
[docs] def wait_pv(self, pv_name, target_val, timeout=DEFAULT_TIMEOUT): """Wait for a process variable to reach given value. This function polls the process variable (PV) and blocks until the PV reaches the target value or the max timeout, whichever comes first. This function immediately returns True if self.is_attached is False. Parameters ---------- pv : str The process variable to be monitored, as defined by the pyepics system. target_val The value the PV should acquire before returning. timeout : int, optional How long to wait, in seconds, before giving up. Negative values cause the function to wait forever. Returns ------- val : bool True if value was set properly, False if the timeout expired before the target value was reached. """ log_msg = "called wait_pv({name}, {val}, timeout={timeout})" log.debug(log_msg.format(name=pv_name, val=target_val, timeout=timeout)) # Delay for pv to change time.sleep(self.POLL_INTERVAL) startTime = time.time() # Enter into infinite loop polling the PV status while(True): real_PV = self.__class__.__dict__[pv_name] pv_val = real_PV.__get__(self) if (pv_val != target_val): if timeout > -1: curTime = time.time() diffTime = curTime - startTime if diffTime >= timeout: msg = "Timed out '{}' ({}) after {}s" msg = msg.format(pv_name, target_val, timeout) raise exceptions_.TimeoutError(msg) time.sleep(.01) else: log.debug("Ended wait_pv()") return True
[docs] def sample_position(self): """Retrieve the x, y, z and theta positions of the sample stage. Returns ------- position : 4-tuple (x, y, z, θ) tuple that is suitable for giving to :py:meth:`move_sample`. """ Position = namedtuple('Position', ['x', 'y', 'z', 'theta']) position = Position(self.Motor_Sample_Top_X, self.Motor_SampleY, self.Motor_Sample_Top_Z, self.Motor_SampleRot) return position
[docs] def move_sample(self, x=None, y=None, z=None, theta=None): """Move the sample to the given (x, y, z) position. Parameters ---------- x, y, z : float, optional The new position to move the sample to. theta : float, optional Rotation axis angle to set to. """ log.debug('Moving sample to (%s, %s, %s)', x, y, z) if theta is not None: self.Motor_SampleRot = theta if x is not None: self.Motor_Sample_Top_X = float(x) if y is not None: self.Motor_SampleY = float(y) if z is not None: self.Motor_Sample_Top_Z = float(z) # Log actual x, y, z, θ values msg = "Sample moved to (x={x:.2f}, y={y:.2f}, z={z:.2f}, θ={theta:.2f}°)" try: msg = msg.format( x=self.Motor_Sample_Top_X, y=self.Motor_SampleY, z=self.Motor_Sample_Top_Z, theta=self.Motor_SampleRot) except ValueError: # Sometimes incomplete values come back as "None" msg = "Sample moved to (x={x}, y={y}, z={z}, θ={theta}°)" msg = msg.format( x=self.Motor_Sample_Top_X, y=self.Motor_SampleY, z=self.Motor_Sample_Top_Z, theta=self.Motor_SampleRot) log.debug(msg)
[docs] def energy(self): """Get the current beam energy. Returns ------- energy : float Current X-ray energy in keV """ energy = self.DCMputEnergy return energy
@permit_required def move_energy(self, energy, constant_mag=True, correct_backlash=True): """Change the energy of the X-ray source and optics. The undulator gap, monochromator, zone-plate and (optionally) detector will be moved. Parameters ---------- energy : float The new energy (in kEV) for the X-ray source. constant_mag : bool, optional If truthy, the detector will also be moved to correct for the change in focal length. correct_backlash : bool, optional If enabled, this method will correct for slop in the gap motors. Only needed for large changes (eg >0.01 keV) """ # Helper function for converting energy to wavelength kev_to_nm = lambda kev: 1240. / (kev * 1000.) # Check that the energy given is valid for this instrument in_range = self.E_RANGE[0] <= energy <= self.E_RANGE[1] if not in_range: msg = "Energy {energy} keV not in range {lower} - {upper} keV" msg = msg.format(energy=energy, lower=self.E_RANGE[0], upper=self.E_RANGE[1]) raise exceptions_.EnergyError(msg) # Get the current values old_energy = self.energy() old_CCD = self.CCD_Motor old_wavelength = kev_to_nm(old_energy) old_ZP_focal = self.zp_diameter * self.drn / (1000.0 * old_wavelength) inner = math.sqrt(old_CCD**2 - 4.0 * old_CCD * old_ZP_focal) old_D = (old_CCD + inner) / 2.0 # Calculate target values new_wavelength = kev_to_nm(energy) new_ZP_focal = self.zp_diameter * self.drn / (1000.0 * new_wavelength) # Prepare the instrument for moving energy old_DCM_mode = self.DCMmvt self.DCMmvt = 1 # Move the detector and objective optics if constant_mag: # Calculate target values mag = (old_D - old_ZP_focal) / old_ZP_focal dist_ZP_ccd = mag * new_ZP_focal + new_ZP_focal ZP_WD = dist_ZP_ccd * new_ZP_focal / (dist_ZP_ccd - new_ZP_focal) new_CCD_position = ZP_WD + dist_ZP_ccd # Log new values log.debug("Constant magnification: %.2f", mag) log.debug("New CCD z-position: %f", new_CCD_position) # Execute motor movement self.CCD_Motor = new_CCD_position else: # Varying magnification new_D = (old_CCD + math.sqrt(old_CCD * old_CCD - 4.0 * old_CCD * new_ZP_focal) ) / 2.0 ZP_WD = new_D * new_ZP_focal / (new_D - new_ZP_focal) new_mag = (old_D - old_ZP_focal) / old_ZP_focal log.debug("New magnification: %.2f", new_mag) # Move the zoneplate log.debug("New zoneplate z-position: %.5f", ZP_WD) self.zone_plate_z = ZP_WD # Move the upstream source/optics log.debug("New DCM Energy and Gap Energy: %f", energy) self.DCMputEnergy = energy if correct_backlash: # Come up from below to correct for motor slop log.debug("Correcting backlash") self.GAPputEnergy = energy # self.wait_pv('EnergyWait', 0) time.sleep(1) self.GAPputEnergy = energy + self.gap_offset time.sleep(1) self.DCMmvt = old_DCM_mode #self.wait_pv('EnergyWait', 0) log.debug("Changed energy to %.4f keV (%.4f nm).", energy, new_wavelength) @permit_required def open_shutters(self): """Open the shutters to allow light in. The specific shutter(s) that opens depends on the values of ``self.use_shutter_A`` and ``self.use_shutter_B``. """ starttime = time.time() if self.use_shutter_A: log.debug("Opening shutter A") self.ShutterA_Open = 1 self.wait_pv('ShutterA_Move_Status', self.SHUTTER_OPEN) if self.use_shutter_B: log.debug("Opening shutter B") self.ShutterB_Open = 1 self.wait_pv('ShutterB_Move_Status', self.SHUTTER_OPEN) # Set status flags if self.use_shutter_A or self.use_shutter_B: self.shutters_are_open = True else: self.shutters_are_open = False # Log results info if self.use_shutter_A and self.use_shutter_B: which_shutters = "shutters A and B" elif self.use_shutter_A: which_shutters = "shutter A" elif self.use_shutter_B: which_shutters = "shutter B" else: which_shutters = "no shutters" if self.use_shutter_A or self.use_shutter_B or not self.is_attached: duration = time.time() - starttime log.debug("Opened %s in %.2f sec", which_shutters, duration) else: warnings.warn("Neither shutter A nor B enabled.") @permit_required def close_shutters(self): """Close the shutters to stop light in. The specific shutter(s) that closes depends on the values of ``self.use_shutter_A`` and ``self.use_shutter_B``. """ starttime = time.time() if self.use_shutter_A: log.debug("Closing shutter A") self.ShutterA_Close = 1 self.wait_pv('ShutteA_Move_Status', self.SHUTTER_CLOSED) if self.use_shutter_B: log.debug("Closing shutter B") self.ShutterB_Close = 1 self.wait_pv('ShutterB_Move_Status', self.SHUTTER_CLOSED) # Set status flags self.shutters_are_open = False # Log results info if self.use_shutter_A and self.use_shutter_B: which_shutters = "shutters A and B" elif self.use_shutter_A: which_shutters = "shutter A" elif self.use_shutter_B: which_shutters = "shutter B" else: which_shutters = "no shutters" if self.use_shutter_A or self.use_shutter_B or not self.is_attached: duration = time.time() - starttime log.info("Closed %s in %.2f sec", which_shutters, duration) else: warnings.warn("Neither shutter A nor B enabled.") @property def hdf_filename(self): return self.HDF1_FullFileName_RBV
[docs] def hdf_file(self, timeout=10, *args, **kwargs): # Wait for the HDF writer to be done using the HDF file self.wait_pv('HDF1_Capture_RBV', self.HDF_IDLE, timeout=timeout) return h5py.File(self.hdf_filename, *args, **kwargs)
@property def exposure_time(self): """Exposure time for the CCD in seconds.""" current_exposure = max(self.Cam1_AcquireTime, self.Cam1AcquirePeriod) return self.current_exposure @exposure_time.setter def exposure_time(self, val): self.Cam1_AcquireTime = val self.Cam1_AcquirePeriod = val
[docs] def stop_scan(self): log.debug("stop_scan called") self.TIFF1_AutoSave = 'No' self.TIFF1_Capture = 0 self.HDF1_Capture = 0 self.wait_pv('HDF1_Capture', 0) self.reset_ccd() self.reset_ccd() # Open the fast shutter (FOR SUJI) self.Fast_Shutter_Uniblitz = 1
[docs] def setup_detector(self, exposure=0.5, live_display=True): log.debug("%s live display.", "Enabled" if live_display else "Disabled") # Capture a dummy frame to that the HDF5 plugin will work self.Cam1_ImageMode = "Single" self.Cam1_TriggerMode = "Internal" self.exposure_time = 0.01 self.Cam1_Acquire = self.DETECTOR_ACQUIRE self.wait_pv('Cam1_Acquire', self.DETECTOR_IDLE) # Now set the real settings for the detector self.Cam1_Display = live_display self.Cam1_ArrayCallbacks = 'Enable' self.SetSoftGlueForStep = '0' self.Cam1_FrameRateOnOff = False self.Cam1_TriggerMode = 'Overlapped' self.exposure_time = exposure # Prepare external shutter if necessary external_shutter = False if external_shutter: global_PVs['ExternShutterExposure'].put(float(variableDict['ExposureTime'])) global_PVs['ExternShutterDelay'].put(float(variableDict['Ext_ShutterOpenDelay'])) global_PVs['SetSoftGlueForStep'].put('1') log.debug("Finished setting up detector.")
[docs] def setup_hdf_writer(self, num_projections=1, write_mode="Stream", num_recursive_images=1): """Prepare the HDF file writer to accept data. Parameters ---------- num_projections : int Total number of projections to collect at one time. write_mode : str, optional What mode to use for the HDF writer. Gets passed to a PV. num_recursive_images : int, optional How many images to use in the recursive filter. If 1 (default), recursive filtering will be disabled. """ log.debug('setup_hdf_writer() called') if num_recursive_images > 1: # Enable recursive filter self.Proc1_Callbacks = 'Enable' self.Proc1_Filter_Enable = 'Disable' self.HDF1_ArrayPort = 'PROC1' self.Proc1_Filter_Type = self.RECURSIVE_FILTER_TYPE self.Proc1_Num_Filter = num_recursive_images self.Proc1_Reset_Filter = 1 self.Proc1_AutoReset_Filter = 'Yes' self.Proc1_Filter_Callbacks = 'Array N only' self.Proc1_Filter_Enable = 'Enable' else: # No recursive filter, just 1 image self.Proc1_Filter_Enable = 'Disable' self.HDF1_ArrayPort = self.Proc1_ArrayPort # Count total number of projections needed self.HDF1_NumCapture = num_projections self.HDF1_FileWriteMode = write_mode self.HDF1_Capture = self.CAPTURE_ENABLED self.wait_pv('HDF1_Capture', self.CAPTURE_ENABLED) # Clean up and set some status variables log.debug("Finished setting up HDF writer for %s.", self.HDF1_FullFileName_RBV) self.hdf_writer_ready = True
[docs] def setup_tiff_writer(self, filename, num_projections=1, write_mode="Stream", num_recursive_images=1): """Prepare the TIFF file writer to accept data. Parameters ---------- filename : str The name of the HDF file to save data to. num_projections : int Total number of projections to collect at one time. write_mode : str, optional What mode to use for the HDF writer. Gets passed to a PV. num_recursive_images : int, optional How many images to use in the recursive filter. If 1 (default), recursive filtering will be disabled. """ log.warning("setup_tiff_writer() not tested") log.debug('setup_tiff_writer() called') if num_recursive_images > 1: # Recursive filter enabled self.Proc1_Callbacks = 'Enable' self.Proc1_Filter_Enable = 'Disable' self.TIFF1_ArrayPort = 'PROC1' self.Proc1_Filter_Type = self.RECURSIVE_FILTER_TYPE self.Proc1_Num_Filter = num_recursive_images self.Proc1_Reset_Filter = 1 self.Proc1_AutoReset_Filter = 'Yes' self.Proc1_Filter_Callbacks = 'Array N only' self.TIFF1_AutoSave = 'Yes' self.TIFF1_DeleteDriverFile = 'No' self.TIFF1_EnableCallbacks = 'Enable' self.TIFF1_BlockingCallbacks = 'No' self.TIFF1_NumCapture = num_projections self.TIFF1_FileWriteMode = write_mode self.TIFF1_FileName = filename self.TIFF1_Capture = self.CAPTURE_ENABLED # ?? Is this wait_pv really necessary? self.wait_pv('TIFF1_Capture', self.CAPTURE_ENABLED) log.debug("Finished setting up TIFF writer for %s.", filename)
def _trigger_projections(self, num_projections=1): """Trigger the detector to capture one (or more) projections. This method should only be used after setup_detector() and setup_hdf_writer() have been called. The value for num_projections given here should be less than or equal to the number given to each of the setup methods. Parameters ========== num_projections : int, optional How many projections to trigger. """ suffix = 's' if num_projections > 1 else '' log.debug("Triggering %d projection%s", num_projections, suffix) self.Cam1_ImageMode = "Single" self.Cam1_NumImages = 1 for i in range(num_projections): self.Cam1_Acquire = self.DETECTOR_ACQUIRE self.wait_pv('Cam1_Acquire', self.DETECTOR_ACQUIRE, 5) # Wait for the camera to be ready while self.Cam1_Acquire != self.DETECTOR_IDLE: time.sleep(0.01) self.Cam1_SoftwareTrigger = 1 self.wait_pv('Cam1_Acquire', self.DETECTOR_IDLE, 5)
[docs] def capture_projections(self, num_projections=1): """Trigger the capturing of projection images from the detector. Parameters ---------- num_projections : int, optional How many projections to acquire. """ # Raise a warning if the shutters are closed if not self.shutters_are_open: msg = "Collecting projections with shutters closed." warnings.warn(msg, RuntimeWarning) # Set frame collection data self.Cam1_FrameType = self.FRAME_DATA # Collect the data ret = self._trigger_projections(num_projections=num_projections) return ret
[docs] def capture_white_field(self, num_projections=1): """Trigger the capturing of projection images from the detector with the shutters open and no sample present. Parameters ---------- num_projections : int, optional How many projections to acquire. exposure : float, optional Exposure time for each frame in seconds. """ # Raise a warning if the shutters are closed. if not self.shutters_are_open: msg = "Collecting white field with shutters closed." warnings.warn(msg, RuntimeWarning) log.warning(msg) self.Cam1_FrameType = self.FRAME_WHITE # Collect the data ret = self._trigger_projections(num_projections=num_projections) return ret
[docs] def capture_dark_field(self, num_projections=1): """Trigger the capturing of projection images from the detector with the shutters closed. The shutter should be closed before calling this method. Parameters ---------- num_projections : int, optional How many projections to acquire. exposure : float, optional Exposure time for each frame in seconds. """ # Raise a warning if the shutters are open. if self.shutters_are_open: msg = "Collecting dark field with shutters open." warnings.warn(msg, RuntimeWarning) log.warning(msg) self.Cam1_FrameType = self.FRAME_DARK # Collect the data ret = self._trigger_projections(num_projections=num_projections) return ret
[docs] def capture_tomogram(self, angles, num_projections=1, stabilize_sleep=10): """Collect data frames over a range of angles. Parameters ========== angles : np.ndarray An array of angles (in degrees) to use for collecting projections. num_projections : int, optional Number of projections to average at each angle. stablize_sleep : int, optional How long (in milliseconds) to wait after moving the rotation stage. """ log.warning("capture_tomogram() not tested") log.debug('called tomo_scan()') # Prepare the instrument for data collection self.Cam1_FrameType = self.FRAME_DATA self.Cam1_NumImages = 1 if num_projections > 1: old_filter = self.Proc1_Filter_Enable self.Proc1_Filter_Enable = 'Enable' # Cycle through each angle and collect data for sample_rot in tqdm.tqdm(angles, desc="Capturing tomogram", unit='rot'): self.move_sample(theta=sample_rot) log.debug('Stabilize Sleep: %d ms', stabilize_sleep) time.sleep(stabilize_sleep / 1000) # Trigger the camera self._trigger_projections(num_projections=num_projections) # Restore previous filter enabled state if num_projections > 1: self.Proc1_Filter_Enable = old_filter
[docs] def epics_PV(self, pv_name): """Retrieve the epics process variable (PV) object for the given attribute name. Parameters ========== pv_name : str The name of the PV object. Should match the attribute on this TXM() object. """ return self.__class__.__dict__[pv_name].epics_PV(txm=self)
@contextmanager def run_scan(self): """A context manager for executing long-running scripts. At the end of the context, the CCD gets reset and several motor positions get restored. """ # Save the initial values init_position = self.sample_position() init_E = self.energy() # Return to the inner code block yield # Stop TIFF and HDF collection self.TIFF1_AutoSave = 'No' self.TIFF1_Capture = 0 self.HDF1_Capture = 0 self.wait_pv('HDF1_Capture', 0) # Restore the saved initial motor positions self.move_sample(*init_position) self.move_energy(init_E) # Reset the CCD so it's in continuous mode self.reset_ccd() # Open the fast shutter #### FOR SUJI self.Fast_Shutter_Uniblitz = 1 self.wait_pv('HDF1_Capture', 1)
[docs] def reset_ccd(self): log.debug("Resetting CCD") # Sequence Internal / Overlapped / internal because of CCD bug!! self.Cam1_TriggerMode = 'Internal' self.Cam1_TriggerMode = 'Overlapped' self.Cam1_TriggerMode = 'Internal' # Other PV settings self.Proc1_Filter_Callbacks = 'Every array' self.Cam1_ImageMode = 'Continuous' self.Cam1_Display = 1 self.Cam1_Acquire = self.DETECTOR_ACQUIRE self.wait_pv('Cam1_Acquire', self.DETECTOR_ACQUIRE, timeout=2)
[docs]class MicroTXM(NanoTXM): """TXM operating with the front micro-CT stage.""" # Flyscan PV's Fly_ScanDelta = TxmPV('32idcTXM:eFly:scanDelta') Fly_StartPos = TxmPV('32idcTXM:eFly:startPos') Fly_EndPos = TxmPV('32idcTXM:eFly:endPos') Fly_SlewSpeed = TxmPV('32idcTXM:eFly:slewSpeed') Fly_Taxi = TxmPV('32idcTXM:eFly:taxi') Fly_Run = TxmPV('32idcTXM:eFly:fly') Fly_ScanControl = TxmPV('32idcTXM:eFly:scanControl') Fly_Calc_Projections = TxmPV('32idcTXM:eFly:calcNumTriggers') Fly_Set_Encoder_Pos = TxmPV('32idcTXM:eFly:EncoderPos') Theta_Array = TxmPV('32idcTXM:eFly:motorPos.AVAL') # Motor PVs Motor_SampleX = TxmPV('32idc01:m33.VAL') Motor_SampleY = TxmPV('32idc02:m15.VAL') # for the micro-CT system Motor_SampleRot = TxmPV('32idcTXM:hydra:c0:m1.VAL') # PI Micos air bearing rotary stage Motor_SampleZ = TxmPV('32idcTXM:mcs:c1:m1.VAL') Motor_Sample_Top_X = TxmPV('32idcTXM:mcs:c1:m2.VAL') # Smaract XZ micro-CT set Motor_Sample_Top_Z = TxmPV('32idcTXM:mcs:c1:m1.VAL') # Smaract XZ micro-CT set Motor_X_Tile = TxmPV('32idc01:m33.VAL') Motor_Y_Tile = TxmPV('32idc02:m15.VAL')