spectrumdevice
A high-level, object-oriented Python library for controlling Spectrum Instrumentation devices.
spectrumdevice
can connect to individual cards or
StarHubs (e.g. the
NetBox). spectrumdevice
provides the following classes
for controlling devices:
Hardware Classes
Name | Purpose |
---|---|
SpectrumDigitiserCard |
Controlling individual digitiser cards |
SpectrumDigitiserStarHub |
Controlling digitiser cards aggregated with a StarHub |
SpectrumDigitiserAnalogChannel |
Controlling analog channels of a digitiser |
SpectrumDigitiserIOLine |
Controlling multipurpose IO lines of a digitiser |
SpectrumAWGCard |
Controlling individual AWG cards |
SpectrumAWGStarHub |
(not yet implemented) |
SpectrumAWGAnalogChannel |
Controlling analog channels of an AWG |
SpectrumAWGIOLine |
Controlling multipurpose IO lines of an AWG |
PulseGenerator |
Controlling pulse generators belonging to IO lines |
Mock Classes
spectrumdevice
also includes mock classes for testing software without drivers installed or hardware connected:
Name | Purpose |
---|---|
MockSpectrumDigitiserCard |
Mocking individual digitiser cards |
MockSpectrumDigitiserStarHub |
Mocking digitiser cards aggregated with a StarHub |
MockSpectrumAWGCard |
Mocking individual AWG cards |
MockSpectrumAWGStarHub |
Mocking AWG cards aggregated with a StarHub |
Abstract Classes
The following abstract classes provide common implementations for methods whose functionality is identical across different Spectrum devices. They cannot be instantiated themselves, but are included here as they contain documentation for methods inherited by the concrete (and therefore instantiatable) classes above.
Name | Purpose |
---|---|
AbstractSpectrumDevice |
Implements methods common to all devices |
AbstractSpectrumCard |
Implements methods common to all card |
AbstractSpectrumStarHub |
Implements methods common to all hubs |
AbstractSpectrumChannel |
Implements methods common to all channels |
AbstractSpectrumDigitiser |
Implements methods common to all digitisers |
AbstractSpectrumAWG |
Implements methods common to all AWGs |
Settings
The submodule spectrumdevice.settings
provides Enums and Dataclasses wrapping the register values provided by the
Spectrum API, to be used for configuring hardware and interpreting responses received from hardware.
Resources
Reference Documentation
View Source
""" A high-level, object-oriented Python library for controlling Spectrum Instrumentation devices. `spectrumdevice` can connect to individual cards or [StarHubs](https://spectrum-instrumentation.com/en/m4i-star-hub) (e.g. the [NetBox](https://spectrum-instrumentation.com/en/digitizernetbox)). `spectrumdevice` provides the following classes for controlling devices: ### Hardware Classes | Name | Purpose | |----------------------------------|---------------------------------------------------------| | `SpectrumDigitiserCard` | Controlling individual digitiser cards | | `SpectrumDigitiserStarHub` | Controlling digitiser cards aggregated with a StarHub | | `SpectrumDigitiserAnalogChannel` | Controlling analog channels of a digitiser | | `SpectrumDigitiserIOLine` | Controlling multipurpose IO lines of a digitiser | | `SpectrumAWGCard` | Controlling individual AWG cards | | `SpectrumAWGStarHub` | (not yet implemented) | | `SpectrumAWGAnalogChannel` | Controlling analog channels of an AWG | | `SpectrumAWGIOLine` | Controlling multipurpose IO lines of an AWG | | `PulseGenerator` | Controlling pulse generators belonging to IO lines | ### Mock Classes `spectrumdevice` also includes mock classes for testing software without drivers installed or hardware connected: | Name | Purpose | |--------------------------------|-----------------------------------------------------| | `MockSpectrumDigitiserCard` | Mocking individual digitiser cards | | `MockSpectrumDigitiserStarHub` | Mocking digitiser cards aggregated with a StarHub | | `MockSpectrumAWGCard` | Mocking individual AWG cards | | `MockSpectrumAWGStarHub` | Mocking AWG cards aggregated with a StarHub | ### Abstract Classes The following abstract classes provide common implementations for methods whose functionality is identical across different Spectrum devices. They cannot be instantiated themselves, but are included here as they contain documentation for methods inherited by the concrete (and therefore instantiatable) classes above. | Name | Purpose | |--------------------------------|-----------------------------------------------------| | `AbstractSpectrumDevice` | Implements methods common to all devices | | `AbstractSpectrumCard` | Implements methods common to all card | | `AbstractSpectrumStarHub` | Implements methods common to all hubs | | `AbstractSpectrumChannel` | Implements methods common to all channels | | `AbstractSpectrumDigitiser` | Implements methods common to all digitisers | | `AbstractSpectrumAWG` | Implements methods common to all AWGs | ### Settings The submodule `spectrumdevice.settings` provides Enums and Dataclasses wrapping the register values provided by the Spectrum API, to be used for configuring hardware and interpreting responses received from hardware. ### Resources * [Source on GitHub](https://github.com/KCL-BMEIS/spectrumdevice) * [README including quickstart](https://github.com/KCL-BMEIS/spectrumdevice/blob/main/README.md) * [Examples](https://github.com/KCL-BMEIS/spectrumdevice/tree/main/example_scripts) * [PyPi](https://pypi.org/project/spectrumdevice/) * [API reference documentation](https://kcl-bmeis.github.io/spectrumdevice/) ### Reference Documentation """ # Christian Baker, King's College London # Copyright (c) 2021 School of Biomedical Engineering & Imaging Sciences, King's College London # Licensed under the MIT. You may obtain a copy at https://opensource.org/licenses/MIT. from .measurement import Measurement from .devices.digitiser.digitiser_card import SpectrumDigitiserCard from .devices.digitiser.digitiser_channel import SpectrumDigitiserAnalogChannel, SpectrumDigitiserIOLine from .devices.digitiser.digitiser_star_hub import SpectrumDigitiserStarHub from .devices.awg.awg_card import SpectrumAWGCard from .devices.awg.awg_channel import SpectrumAWGAnalogChannel, SpectrumAWGIOLine from .devices.mocks import MockSpectrumDigitiserCard, MockSpectrumDigitiserStarHub, MockSpectrumAWGCard from .devices.abstract_device import ( AbstractSpectrumDevice, AbstractSpectrumCard, AbstractSpectrumChannel, AbstractSpectrumStarHub, ) from .devices.digitiser.abstract_spectrum_digitiser import AbstractSpectrumDigitiser from .features.pulse_generator.pulse_generator import PulseGenerator __all__ = [ "SpectrumDigitiserAnalogChannel", "SpectrumDigitiserIOLine", "SpectrumDigitiserCard", "SpectrumDigitiserStarHub", "MockSpectrumDigitiserCard", "MockSpectrumDigitiserStarHub", "AbstractSpectrumDigitiser", "AbstractSpectrumStarHub", "AbstractSpectrumCard", "AbstractSpectrumDevice", "AbstractSpectrumChannel", "settings", "features", "Measurement", "SpectrumAWGCard", "MockSpectrumAWGCard", "SpectrumAWGAnalogChannel", "SpectrumAWGIOLine", "PulseGenerator", ] from . import _version __version__ = _version.get_versions()["version"] # type: ignore
View Source
class SpectrumDigitiserAnalogChannel(AbstractSpectrumAnalogChannel, SpectrumDigitiserAnalogChannelInterface): """Class for controlling an individual channel of a spectrum digitiser. Channels are constructed automatically when a `SpectrumDigitiserCard` or `SpectrumDigitiserStarHub` is instantiated, and can then be accessed via the `.channels` property.""" def __init__(self, channel_number: int, parent_device: SpectrumDigitiserInterface) -> None: if parent_device.type != CardType.SPCM_TYPE_AI: raise SpectrumCardIsNotADigitiser(parent_device.type) # pass unused args up the inheritance hierarchy super().__init__(channel_number=channel_number, parent_device=parent_device) self._full_scale_value = self._parent_device.read_spectrum_device_register(SPC_MIINST_MAXADCVALUE) # used frequently so store locally instead of reading from device each time: self._vertical_range_mv = self.vertical_range_in_mv self._vertical_offset_in_percent = self.vertical_offset_in_percent def _get_settings_as_dict(self) -> dict: return { SpectrumDigitiserAnalogChannel.input_path.__name__: self.input_path, SpectrumDigitiserAnalogChannel.input_coupling.__name__: self.input_coupling, SpectrumDigitiserAnalogChannel.input_impedance.__name__: self.input_impedance, SpectrumDigitiserAnalogChannel.vertical_range_in_mv.__name__: self.vertical_range_in_mv, SpectrumDigitiserAnalogChannel.vertical_offset_in_percent.__name__: self.vertical_offset_in_percent, } def _set_settings_from_dict(self, settings: dict) -> None: self.set_input_path(settings[SpectrumDigitiserAnalogChannel.input_path.__name__]) self.set_input_coupling(settings[SpectrumDigitiserAnalogChannel.input_coupling.__name__]) self.set_input_impedance(settings[SpectrumDigitiserAnalogChannel.input_impedance.__name__]) self.set_vertical_range_in_mv(settings[SpectrumDigitiserAnalogChannel.vertical_range_in_mv.__name__]) self.set_vertical_offset_in_percent( settings[SpectrumDigitiserAnalogChannel.vertical_offset_in_percent.__name__] ) def convert_raw_waveform_to_voltage_waveform(self, raw_waveform: ndarray) -> ndarray: vertical_offset_mv = 0.01 * float(self._vertical_range_mv * self._vertical_offset_in_percent) return 1e-3 * ( float(self._vertical_range_mv) * raw_waveform / float(self._full_scale_value) + vertical_offset_mv ) @property def vertical_range_in_mv(self) -> int: """The currently set input range of the channel in mV. Returns: vertical_range (int): The currently set vertical range in mV. """ self._vertical_range_mv = self._parent_device.read_spectrum_device_register( VERTICAL_RANGE_COMMANDS[self._number] ) return self._vertical_range_mv def set_vertical_range_in_mv(self, vertical_range: int) -> None: """Set the input range of the channel in mV. See Spectrum documentation for valid values. Args: vertical_range (int): The desired vertical range in mV. """ self._parent_device.write_to_spectrum_device_register(VERTICAL_RANGE_COMMANDS[self._number], vertical_range) self._vertical_range_mv = vertical_range @property def vertical_offset_in_percent(self) -> int: """The currently set input offset of the channel in percent of the vertical range. Returns: offset (int): The currently set vertical offset in percent. """ self._vertical_offset_in_percent = self._parent_device.read_spectrum_device_register( VERTICAL_OFFSET_COMMANDS[self._number] ) return self._vertical_offset_in_percent def set_vertical_offset_in_percent(self, offset: int) -> None: """Set the input offset of the channel in percent of the vertical range. See spectrum documentation for valid values. Args: offset (int): The desired vertical offset in percent. """ self._parent_device.write_to_spectrum_device_register(VERTICAL_OFFSET_COMMANDS[self._number], offset) self._vertical_offset_in_percent = offset @property def input_impedance(self) -> InputImpedance: """The current input impedance setting of the channel (50 Ohm or 1 MOhm)""" impedance_binary_value = self._parent_device.read_spectrum_device_register( INPUT_IMPEDANCE_COMMANDS[self._number] ) return InputImpedance(impedance_binary_value) def set_input_impedance(self, input_impedance: InputImpedance) -> None: self._parent_device.write_to_spectrum_device_register( INPUT_IMPEDANCE_COMMANDS[self._number], input_impedance.value ) @property def input_coupling(self) -> InputCoupling: """The coupling (AC or DC) setting of the channel. Only available on some hardware.""" coupling_binary_value = self._parent_device.read_spectrum_device_register(INPUT_COUPLING_COMMANDS[self._number]) return InputCoupling(coupling_binary_value) def set_input_coupling(self, input_coupling: InputCoupling) -> None: self._parent_device.write_to_spectrum_device_register( INPUT_COUPLING_COMMANDS[self._number], input_coupling.value ) @property def input_path(self) -> InputPath: """The input path setting of the channel. Only available on some hardware.""" path_binary_value = self._parent_device.read_spectrum_device_register(INPUT_PATH_COMMANDS[self._number]) return InputPath(path_binary_value) def set_input_path(self, input_path: InputPath) -> None: self._parent_device.write_to_spectrum_device_register(INPUT_PATH_COMMANDS[self._number], input_path.value)
Class for controlling an individual channel of a spectrum digitiser. Channels are constructed automatically when
a SpectrumDigitiserCard
or SpectrumDigitiserStarHub
is instantiated, and can then be accessed via the
.channels
property.
View Source
def __init__(self, channel_number: int, parent_device: SpectrumDigitiserInterface) -> None: if parent_device.type != CardType.SPCM_TYPE_AI: raise SpectrumCardIsNotADigitiser(parent_device.type) # pass unused args up the inheritance hierarchy super().__init__(channel_number=channel_number, parent_device=parent_device) self._full_scale_value = self._parent_device.read_spectrum_device_register(SPC_MIINST_MAXADCVALUE) # used frequently so store locally instead of reading from device each time: self._vertical_range_mv = self.vertical_range_in_mv self._vertical_offset_in_percent = self.vertical_offset_in_percent
View Source
def convert_raw_waveform_to_voltage_waveform(self, raw_waveform: ndarray) -> ndarray: vertical_offset_mv = 0.01 * float(self._vertical_range_mv * self._vertical_offset_in_percent) return 1e-3 * ( float(self._vertical_range_mv) * raw_waveform / float(self._full_scale_value) + vertical_offset_mv )
The currently set input range of the channel in mV.
Returns
vertical_range (int): The currently set vertical range in mV.
View Source
def set_vertical_range_in_mv(self, vertical_range: int) -> None: """Set the input range of the channel in mV. See Spectrum documentation for valid values. Args: vertical_range (int): The desired vertical range in mV. """ self._parent_device.write_to_spectrum_device_register(VERTICAL_RANGE_COMMANDS[self._number], vertical_range) self._vertical_range_mv = vertical_range
Set the input range of the channel in mV. See Spectrum documentation for valid values.
Args
- vertical_range (int): The desired vertical range in mV.
The currently set input offset of the channel in percent of the vertical range.
Returns
offset (int): The currently set vertical offset in percent.
View Source
def set_vertical_offset_in_percent(self, offset: int) -> None: """Set the input offset of the channel in percent of the vertical range. See spectrum documentation for valid values. Args: offset (int): The desired vertical offset in percent. """ self._parent_device.write_to_spectrum_device_register(VERTICAL_OFFSET_COMMANDS[self._number], offset) self._vertical_offset_in_percent = offset
Set the input offset of the channel in percent of the vertical range. See spectrum documentation for valid values.
Args
- offset (int): The desired vertical offset in percent.
The current input impedance setting of the channel (50 Ohm or 1 MOhm)
View Source
def set_input_impedance(self, input_impedance: InputImpedance) -> None: self._parent_device.write_to_spectrum_device_register( INPUT_IMPEDANCE_COMMANDS[self._number], input_impedance.value )
The coupling (AC or DC) setting of the channel. Only available on some hardware.
View Source
def set_input_coupling(self, input_coupling: InputCoupling) -> None: self._parent_device.write_to_spectrum_device_register( INPUT_COUPLING_COMMANDS[self._number], input_coupling.value )
The input path setting of the channel. Only available on some hardware.
View Source
def set_input_path(self, input_path: InputPath) -> None: self._parent_device.write_to_spectrum_device_register(INPUT_PATH_COMMANDS[self._number], input_path.value)
Inherited Members
- spectrumdevice.devices.abstract_device.channel_interfaces.SpectrumAnalogChannelInterface
- copy_settings_from_other_channel
View Source
class SpectrumDigitiserIOLine(AbstractSpectrumIOLine, SpectrumDigitiserIOLineInterface): """Class for controlling multipurpose IO lines of a digitiser, e.g. X0, X1, X2 and X3.""" def __init__(self, parent_device: AbstractSpectrumCard, **kwargs: Any) -> None: if parent_device.type != CardType.SPCM_TYPE_AI: raise SpectrumCardIsNotADigitiser(parent_device.type) super().__init__(parent_device=parent_device, **kwargs) # pass unused args up the inheritance hierarchy def _get_io_line_mode_settings_mask(self, mode: IOLineMode) -> int: return 0 # no settings required for DigOut
Class for controlling multipurpose IO lines of a digitiser, e.g. X0, X1, X2 and X3.
View Source
def __init__(self, parent_device: AbstractSpectrumCard, **kwargs: Any) -> None: if parent_device.type != CardType.SPCM_TYPE_AI: raise SpectrumCardIsNotADigitiser(parent_device.type) super().__init__(parent_device=parent_device, **kwargs) # pass unused args up the inheritance hierarchy
Inherited Members
- spectrumdevice.devices.abstract_device.abstract_spectrum_io_line.AbstractSpectrumIOLine
- mode
- set_mode
- pulse_generator
View Source
class SpectrumDigitiserCard( AbstractSpectrumCard[SpectrumDigitiserAnalogChannelInterface, SpectrumDigitiserIOLineInterface], AbstractSpectrumDigitiser, ): """Class for controlling individual Spectrum digitiser cards.""" def __init__(self, device_number: int, ip_address: Optional[str] = None) -> None: """ Args: device_number (int): Index of the card to control. If only one card is present, set to 0. ip_address (Optional[str]): If connecting to a networked card, provide the IP address here as a string. """ # pass unused args up the inheritance hierarchy super().__init__(device_number=device_number, ip_address=ip_address) if self.type != CardType.SPCM_TYPE_AI: raise SpectrumCardIsNotADigitiser(self.type) self._acquisition_mode = self.acquisition_mode self._timestamper: Optional[Timestamper] = None self._batch_size = 1 def _init_analog_channels(self) -> Sequence[SpectrumDigitiserAnalogChannelInterface]: num_modules = self.read_spectrum_device_register(SPC_MIINST_MODULES) num_channels_per_module = self.read_spectrum_device_register(SPC_MIINST_CHPERMODULE) total_channels = num_modules * num_channels_per_module return tuple( [SpectrumDigitiserAnalogChannel(channel_number=n, parent_device=self) for n in range(total_channels)] ) def _init_io_lines(self) -> Sequence[SpectrumDigitiserIOLineInterface]: if (self.model_number.value & TYP_SERIESMASK) == TYP_M2PEXPSERIES: return tuple([SpectrumDigitiserIOLine(channel_number=n, parent_device=self) for n in range(4)]) else: raise NotImplementedError("Don't know how many IO lines other types of card have. Only M2P series.") def enable_timestamping(self) -> None: self._timestamper = Timestamper(self, self._handle) def wait_for_acquisition_to_complete(self) -> None: """Blocks until the current acquisition has finished, or the timeout is reached. In Standard Single mode (SPC_REC_STD_SINGLE), this should be called after `start()`. Once the call to `wait_for_acquisition_to_complete()` returns, the newly acquired samples are in the on_device buffer and ready for transfer to the `TransferBuffer` using `start_transfer()`. In FIFO mode (SPC_REC_FIFO_MULTI), the card will continue to acquire samples until `stop()` is called, so `wait_for_acquisition_to_complete()` should not be used. """ self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_WAITREADY) def get_waveforms(self) -> List[List[NDArray[float_]]]: """Get a list of the most recently transferred waveforms, in channel order. This method copies and reshapes the samples in the `TransferBuffer` into a list of lists of 1D NumPy arrays (waveforms) and returns the list. In Standard Single mode (SPC_REC_STD_SINGLE), `get_waveforms()` should be called after `wait_for_transfer_to_complete()` has returned. In FIFO mode (SPC_REC_FIFO_MULTI), while the card is continuously acquiring samples and transferring them to the `TransferBuffer`, this method should be called in a loop . The method will block until each new transfer is received, so the loop will run at the same rate as the acquisition (in SPC_REC_FIFO_MULTI mode, for example, this would the rate at which your trigger source was running). Returns: waveforms (List[List[NDArray[float_]]]): A list of lists of 1D numpy arrays, one inner list per acquisition and one array per enabled channel, in channel order. To average the acquisitions: `np.array(waveforms).mean(axis=0)` """ if self._transfer_buffer is None: raise SpectrumNoTransferBufferDefined("Cannot find a samples transfer buffer") num_read_bytes = 0 num_samples_per_frame = self.acquisition_length_in_samples * len(self.enabled_analog_channel_nums) num_expected_bytes_per_frame = num_samples_per_frame * self._transfer_buffer.data_array.itemsize raw_samples = zeros(num_samples_per_frame * self._batch_size, dtype=self._transfer_buffer.data_array.dtype) if self.acquisition_mode in (AcquisitionMode.SPC_REC_STD_SINGLE, AcquisitionMode.SPC_REC_STD_AVERAGE): raw_samples = self._transfer_buffer.copy_contents() elif self.acquisition_mode in (AcquisitionMode.SPC_REC_FIFO_MULTI, AcquisitionMode.SPC_REC_FIFO_AVERAGE): self.wait_for_transfer_chunk_to_complete() while num_read_bytes < (num_expected_bytes_per_frame * self._batch_size): num_available_bytes = self.read_spectrum_device_register(SPC_DATA_AVAIL_USER_LEN) position_of_available_bytes = self.read_spectrum_device_register(SPC_DATA_AVAIL_USER_POS) # Don't allow reading over the end of the transfer buffer if ( position_of_available_bytes + num_available_bytes ) > self._transfer_buffer.data_array_length_in_bytes: num_available_bytes = self._transfer_buffer.data_array_length_in_bytes - position_of_available_bytes # Don't allow reading over the end of the current acquisition: if (num_read_bytes + num_available_bytes) > (num_expected_bytes_per_frame * self._batch_size): num_available_bytes = (num_expected_bytes_per_frame * self._batch_size) - num_read_bytes num_available_samples = num_available_bytes // self._transfer_buffer.data_array.itemsize num_read_samples = num_read_bytes // self._transfer_buffer.data_array.itemsize raw_samples[ num_read_samples : num_read_samples + num_available_samples ] = self._transfer_buffer.read_chunk(position_of_available_bytes, num_available_bytes) self.write_to_spectrum_device_register(SPC_DATA_AVAIL_CARD_LEN, num_available_bytes) num_read_bytes += num_available_bytes waveforms_in_columns = raw_samples.reshape( (self._batch_size, self.acquisition_length_in_samples, len(self.enabled_analog_channel_nums)) ) repeat_acquisitions = [] for n in range(self._batch_size): repeat_acquisitions.append( [ cast( SpectrumDigitiserAnalogChannel, self.analog_channels[ch_num] ).convert_raw_waveform_to_voltage_waveform(squeeze(waveform)) for ch_num, waveform in zip(self.enabled_analog_channel_nums, waveforms_in_columns[n, :, :].T) ] ) return repeat_acquisitions def get_timestamp(self) -> Optional[datetime.datetime]: """Get timestamp for the last acquisition""" if self._timestamper is not None: return self._timestamper.get_timestamp() else: return None @property def acquisition_length_in_samples(self) -> int: """The current recording length (per channel) in samples. Returns: length_in_samples (int): The current recording length ('acquisition length') in samples.""" return self.read_spectrum_device_register(SPC_MEMSIZE) def set_acquisition_length_in_samples(self, length_in_samples: int) -> None: """Change the recording length (per channel). In FIFO mode, it will be quantised according to the step size allowed by the connected card type. Args: length_in_samples (int): The desired recording length ('acquisition length'), in samples. """ length_in_samples = self._coerce_num_samples_if_fifo(length_in_samples) self.write_to_spectrum_device_register(SPC_SEGMENTSIZE, length_in_samples) self.write_to_spectrum_device_register(SPC_MEMSIZE, length_in_samples) @property def post_trigger_length_in_samples(self) -> int: """The number of samples of the recording that will contain data received after the trigger event. Returns: length_in_samples (int): The currently set post trigger length in samples. """ return self.read_spectrum_device_register(SPC_POSTTRIGGER) def set_post_trigger_length_in_samples(self, length_in_samples: int) -> None: """Change the number of samples of the recording that will contain data received after the trigger event. In FIFO mode, this will be quantised according to the minimum step size allowed by the connected card. Args: length_in_samples (int): The desired post trigger length in samples.""" length_in_samples = self._coerce_num_samples_if_fifo(length_in_samples) if self.acquisition_mode == AcquisitionMode.SPC_REC_FIFO_MULTI: if (self.acquisition_length_in_samples - length_in_samples) < get_memsize_step_size(self._model_number): logger.warning( "FIFO mode: coercing post trigger length to maximum allowed value (step-size samples less than " "the acquisition length)." ) length_in_samples = self.acquisition_length_in_samples - get_memsize_step_size(self._model_number) self.write_to_spectrum_device_register(SPC_POSTTRIGGER, length_in_samples) def _coerce_num_samples_if_fifo(self, value: int) -> int: if self.acquisition_mode == AcquisitionMode.SPC_REC_FIFO_MULTI: if value != mod(value, get_memsize_step_size(self._model_number)): logger.warning( f"FIFO mode: coercing length to nearest {get_memsize_step_size(self._model_number)}" f" samples" ) value = int(value - mod(value, get_memsize_step_size(self._model_number))) return value @property def number_of_averages(self) -> int: return self.read_spectrum_device_register(SPC_AVERAGES) def set_number_of_averages(self, num_averages: int) -> None: if num_averages > 0: self.write_to_spectrum_device_register(SPC_AVERAGES, num_averages) else: raise ValueError("Number of averages must be greater than 0.") @property def acquisition_mode(self) -> AcquisitionMode: """The currently enabled card mode. Will raise an exception if the current mode is not supported by `spectrumdevice`. Returns: mode (`AcquisitionMode`): The currently enabled card acquisition mode.""" return AcquisitionMode(self.read_spectrum_device_register(SPC_CARDMODE)) def set_acquisition_mode(self, mode: AcquisitionMode) -> None: """Change the currently enabled card mode. See `AcquisitionMode` and the Spectrum documentation for the available modes. Args: mode (`AcquisitionMode`): The desired acquisition mode.""" self.write_to_spectrum_device_register(SPC_CARDMODE, mode.value) @property def batch_size(self) -> int: return self._batch_size def set_batch_size(self, batch_size: int) -> None: self._batch_size = batch_size def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None: """Create or provide a `TransferBuffer` object for receiving acquired samples from the device. If no buffer is provided, and no buffer has previously been defined, then one will be created: in FIFO mode, with a notify size of 10 pages or the size of the acquisition, whichever is smaller; in Standard Single mode, one with the correct length and no notify size. A separate buffer for transferring Timestamps will also be created using the Timestamper class. Args: buffer (Optional[List[`TransferBuffer`]]): A length-1 list containing a pre-constructed `TransferBuffer` set up for card-to-PC transfer of samples ("data"). The size of the buffer should be chosen according to the current number of active channels, the acquisition length and the number of acquisitions which you intend to download at a time using get_waveforms(). """ self._set_or_update_transfer_buffer_attribute(buffer) if self._transfer_buffer is not None: set_transfer_buffer(self._handle, self._transfer_buffer) def _set_or_update_transfer_buffer_attribute(self, buffer: Optional[Sequence[TransferBuffer]]) -> None: if buffer: self._transfer_buffer = buffer[0] if self._transfer_buffer.direction != BufferDirection.SPCM_DIR_CARDTOPC: raise ValueError("Digitisers need a transfer buffer with direction BufferDirection.SPCM_DIR_CARDTOPC") if self._transfer_buffer.type != BufferType.SPCM_BUF_DATA: raise ValueError("Digitisers need a transfer buffer with type BufferDirection.SPCM_BUF_DATA") elif self._transfer_buffer is None: if self.acquisition_mode in (AcquisitionMode.SPC_REC_FIFO_MULTI, AcquisitionMode.SPC_REC_FIFO_AVERAGE): samples_per_batch = ( self.acquisition_length_in_samples * len(self.enabled_analog_channel_nums) * self._batch_size ) pages_per_batch = samples_per_batch * self.bytes_per_sample / PAGE_SIZE_IN_BYTES if pages_per_batch < DEFAULT_NOTIFY_SIZE_IN_PAGES: notify_size = pages_per_batch else: notify_size = DEFAULT_NOTIFY_SIZE_IN_PAGES # Make transfer buffer big enough to hold all samples in the batch self._transfer_buffer = create_samples_acquisition_transfer_buffer( size_in_samples=samples_per_batch, notify_size_in_pages=notify_size, bytes_per_sample=self.bytes_per_sample, ) elif self.acquisition_mode in (AcquisitionMode.SPC_REC_STD_SINGLE, AcquisitionMode.SPC_REC_STD_AVERAGE): self._transfer_buffer = create_samples_acquisition_transfer_buffer( size_in_samples=self.acquisition_length_in_samples * len(self.enabled_analog_channel_nums), notify_size_in_pages=0, bytes_per_sample=self.bytes_per_sample, ) else: raise ValueError("AcquisitionMode not recognised") def __str__(self) -> str: return f"Card {self._visa_string}"
Class for controlling individual Spectrum digitiser cards.
View Source
def __init__(self, device_number: int, ip_address: Optional[str] = None) -> None: """ Args: device_number (int): Index of the card to control. If only one card is present, set to 0. ip_address (Optional[str]): If connecting to a networked card, provide the IP address here as a string. """ # pass unused args up the inheritance hierarchy super().__init__(device_number=device_number, ip_address=ip_address) if self.type != CardType.SPCM_TYPE_AI: raise SpectrumCardIsNotADigitiser(self.type) self._acquisition_mode = self.acquisition_mode self._timestamper: Optional[Timestamper] = None self._batch_size = 1
Args
- device_number (int): Index of the card to control. If only one card is present, set to 0.
- ip_address (Optional[str]): If connecting to a networked card, provide the IP address here as a string.
View Source
def enable_timestamping(self) -> None: self._timestamper = Timestamper(self, self._handle)
View Source
def wait_for_acquisition_to_complete(self) -> None: """Blocks until the current acquisition has finished, or the timeout is reached. In Standard Single mode (SPC_REC_STD_SINGLE), this should be called after `start()`. Once the call to `wait_for_acquisition_to_complete()` returns, the newly acquired samples are in the on_device buffer and ready for transfer to the `TransferBuffer` using `start_transfer()`. In FIFO mode (SPC_REC_FIFO_MULTI), the card will continue to acquire samples until `stop()` is called, so `wait_for_acquisition_to_complete()` should not be used. """ self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_WAITREADY)
Blocks until the current acquisition has finished, or the timeout is reached.
In Standard Single mode (SPC_REC_STD_SINGLE), this should be called after start()
. Once the call
to wait_for_acquisition_to_complete()
returns, the newly acquired samples are in the on_device buffer and
ready for transfer to the TransferBuffer
using start_transfer()
.
In FIFO mode (SPC_REC_FIFO_MULTI), the card will continue to acquire samples until
stop()
is called, so wait_for_acquisition_to_complete()
should not be used.
View Source
def get_waveforms(self) -> List[List[NDArray[float_]]]: """Get a list of the most recently transferred waveforms, in channel order. This method copies and reshapes the samples in the `TransferBuffer` into a list of lists of 1D NumPy arrays (waveforms) and returns the list. In Standard Single mode (SPC_REC_STD_SINGLE), `get_waveforms()` should be called after `wait_for_transfer_to_complete()` has returned. In FIFO mode (SPC_REC_FIFO_MULTI), while the card is continuously acquiring samples and transferring them to the `TransferBuffer`, this method should be called in a loop . The method will block until each new transfer is received, so the loop will run at the same rate as the acquisition (in SPC_REC_FIFO_MULTI mode, for example, this would the rate at which your trigger source was running). Returns: waveforms (List[List[NDArray[float_]]]): A list of lists of 1D numpy arrays, one inner list per acquisition and one array per enabled channel, in channel order. To average the acquisitions: `np.array(waveforms).mean(axis=0)` """ if self._transfer_buffer is None: raise SpectrumNoTransferBufferDefined("Cannot find a samples transfer buffer") num_read_bytes = 0 num_samples_per_frame = self.acquisition_length_in_samples * len(self.enabled_analog_channel_nums) num_expected_bytes_per_frame = num_samples_per_frame * self._transfer_buffer.data_array.itemsize raw_samples = zeros(num_samples_per_frame * self._batch_size, dtype=self._transfer_buffer.data_array.dtype) if self.acquisition_mode in (AcquisitionMode.SPC_REC_STD_SINGLE, AcquisitionMode.SPC_REC_STD_AVERAGE): raw_samples = self._transfer_buffer.copy_contents() elif self.acquisition_mode in (AcquisitionMode.SPC_REC_FIFO_MULTI, AcquisitionMode.SPC_REC_FIFO_AVERAGE): self.wait_for_transfer_chunk_to_complete() while num_read_bytes < (num_expected_bytes_per_frame * self._batch_size): num_available_bytes = self.read_spectrum_device_register(SPC_DATA_AVAIL_USER_LEN) position_of_available_bytes = self.read_spectrum_device_register(SPC_DATA_AVAIL_USER_POS) # Don't allow reading over the end of the transfer buffer if ( position_of_available_bytes + num_available_bytes ) > self._transfer_buffer.data_array_length_in_bytes: num_available_bytes = self._transfer_buffer.data_array_length_in_bytes - position_of_available_bytes # Don't allow reading over the end of the current acquisition: if (num_read_bytes + num_available_bytes) > (num_expected_bytes_per_frame * self._batch_size): num_available_bytes = (num_expected_bytes_per_frame * self._batch_size) - num_read_bytes num_available_samples = num_available_bytes // self._transfer_buffer.data_array.itemsize num_read_samples = num_read_bytes // self._transfer_buffer.data_array.itemsize raw_samples[ num_read_samples : num_read_samples + num_available_samples ] = self._transfer_buffer.read_chunk(position_of_available_bytes, num_available_bytes) self.write_to_spectrum_device_register(SPC_DATA_AVAIL_CARD_LEN, num_available_bytes) num_read_bytes += num_available_bytes waveforms_in_columns = raw_samples.reshape( (self._batch_size, self.acquisition_length_in_samples, len(self.enabled_analog_channel_nums)) ) repeat_acquisitions = [] for n in range(self._batch_size): repeat_acquisitions.append( [ cast( SpectrumDigitiserAnalogChannel, self.analog_channels[ch_num] ).convert_raw_waveform_to_voltage_waveform(squeeze(waveform)) for ch_num, waveform in zip(self.enabled_analog_channel_nums, waveforms_in_columns[n, :, :].T) ] ) return repeat_acquisitions
Get a list of the most recently transferred waveforms, in channel order.
This method copies and reshapes the samples in the TransferBuffer
into a list of lists of 1D NumPy arrays
(waveforms) and returns the list.
In Standard Single mode (SPC_REC_STD_SINGLE), get_waveforms()
should be called after
wait_for_transfer_to_complete()
has returned.
In FIFO mode (SPC_REC_FIFO_MULTI), while the card is continuously acquiring samples and transferring them to the
TransferBuffer
, this method should be called in a loop . The method will block until each new transfer is
received, so the loop will run at the same rate as the acquisition (in SPC_REC_FIFO_MULTI mode, for example,
this would the rate at which your trigger source was running).
Returns
waveforms (List[List[NDArray[float_]]]): A list of lists of 1D numpy arrays, one inner list per acquisition and one array per enabled channel, in channel order. To average the acquisitions:
np.array(waveforms).mean(axis=0)
View Source
def get_timestamp(self) -> Optional[datetime.datetime]: """Get timestamp for the last acquisition""" if self._timestamper is not None: return self._timestamper.get_timestamp() else: return None
Get timestamp for the last acquisition
The current recording length (per channel) in samples.
Returns
length_in_samples (int): The current recording length ('acquisition length') in samples.
View Source
def set_acquisition_length_in_samples(self, length_in_samples: int) -> None: """Change the recording length (per channel). In FIFO mode, it will be quantised according to the step size allowed by the connected card type. Args: length_in_samples (int): The desired recording length ('acquisition length'), in samples. """ length_in_samples = self._coerce_num_samples_if_fifo(length_in_samples) self.write_to_spectrum_device_register(SPC_SEGMENTSIZE, length_in_samples) self.write_to_spectrum_device_register(SPC_MEMSIZE, length_in_samples)
Change the recording length (per channel). In FIFO mode, it will be quantised according to the step size allowed by the connected card type.
Args
- length_in_samples (int): The desired recording length ('acquisition length'), in samples.
The number of samples of the recording that will contain data received after the trigger event.
Returns
length_in_samples (int): The currently set post trigger length in samples.
View Source
def set_post_trigger_length_in_samples(self, length_in_samples: int) -> None: """Change the number of samples of the recording that will contain data received after the trigger event. In FIFO mode, this will be quantised according to the minimum step size allowed by the connected card. Args: length_in_samples (int): The desired post trigger length in samples.""" length_in_samples = self._coerce_num_samples_if_fifo(length_in_samples) if self.acquisition_mode == AcquisitionMode.SPC_REC_FIFO_MULTI: if (self.acquisition_length_in_samples - length_in_samples) < get_memsize_step_size(self._model_number): logger.warning( "FIFO mode: coercing post trigger length to maximum allowed value (step-size samples less than " "the acquisition length)." ) length_in_samples = self.acquisition_length_in_samples - get_memsize_step_size(self._model_number) self.write_to_spectrum_device_register(SPC_POSTTRIGGER, length_in_samples)
Change the number of samples of the recording that will contain data received after the trigger event. In FIFO mode, this will be quantised according to the minimum step size allowed by the connected card.
Args
- length_in_samples (int): The desired post trigger length in samples.
View Source
def set_number_of_averages(self, num_averages: int) -> None: if num_averages > 0: self.write_to_spectrum_device_register(SPC_AVERAGES, num_averages) else: raise ValueError("Number of averages must be greater than 0.")
The currently enabled card mode. Will raise an exception if the current mode is not supported by
spectrumdevice
.
Returns
mode (
AcquisitionMode
): The currently enabled card acquisition mode.
View Source
def set_acquisition_mode(self, mode: AcquisitionMode) -> None: """Change the currently enabled card mode. See `AcquisitionMode` and the Spectrum documentation for the available modes. Args: mode (`AcquisitionMode`): The desired acquisition mode.""" self.write_to_spectrum_device_register(SPC_CARDMODE, mode.value)
Change the currently enabled card mode. See AcquisitionMode
and the Spectrum documentation
for the available modes.
Args
- mode (
AcquisitionMode
): The desired acquisition mode.
View Source
def set_batch_size(self, batch_size: int) -> None: self._batch_size = batch_size
View Source
def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None: """Create or provide a `TransferBuffer` object for receiving acquired samples from the device. If no buffer is provided, and no buffer has previously been defined, then one will be created: in FIFO mode, with a notify size of 10 pages or the size of the acquisition, whichever is smaller; in Standard Single mode, one with the correct length and no notify size. A separate buffer for transferring Timestamps will also be created using the Timestamper class. Args: buffer (Optional[List[`TransferBuffer`]]): A length-1 list containing a pre-constructed `TransferBuffer` set up for card-to-PC transfer of samples ("data"). The size of the buffer should be chosen according to the current number of active channels, the acquisition length and the number of acquisitions which you intend to download at a time using get_waveforms(). """ self._set_or_update_transfer_buffer_attribute(buffer) if self._transfer_buffer is not None: set_transfer_buffer(self._handle, self._transfer_buffer)
Create or provide a TransferBuffer
object for receiving acquired samples from the device.
If no buffer is provided, and no buffer has previously been defined, then one will be created: in FIFO mode, with a notify size of 10 pages or the size of the acquisition, whichever is smaller; in Standard Single mode, one with the correct length and no notify size. A separate buffer for transferring Timestamps will also be created using the Timestamper class.
Args
- buffer (Optional[List[
TransferBuffer
]]): A length-1 list containing a pre-constructedTransferBuffer
set up for card-to-PC transfer of samples ("data"). The size of the buffer should be chosen according to the current number of active channels, the acquisition length and the number of acquisitions which you intend to download at a time using get_waveforms().
Inherited Members
- AbstractSpectrumCard
- model_number
- reconnect
- status
- start_transfer
- stop_transfer
- wait_for_transfer_chunk_to_complete
- transfer_buffers
- disconnect
- connected
- analog_channels
- io_lines
- enabled_analog_channel_nums
- set_enabled_analog_channels
- trigger_sources
- set_trigger_sources
- external_trigger_mode
- set_external_trigger_mode
- external_trigger_level_in_mv
- set_external_trigger_level_in_mv
- external_trigger_pulse_width_in_samples
- set_external_trigger_pulse_width_in_samples
- apply_channel_enabling
- timeout_in_ms
- set_timeout_in_ms
- clock_mode
- set_clock_mode
- available_io_modes
- feature_list
- sample_rate_in_hz
- set_sample_rate_in_hz
- type
- force_trigger
- bytes_per_sample
View Source
class SpectrumDigitiserStarHub( AbstractSpectrumStarHub[ SpectrumDigitiserCard, SpectrumDigitiserAnalogChannelInterface, SpectrumDigitiserIOLineInterface ], AbstractSpectrumDigitiser, ): """Composite class of `SpectrumDigitiserCard` for controlling a StarHub digitiser device, for example the Spectrum NetBox. StarHub digitiser devices are composites of more than one Spectrum digitiser card. Acquisition from the child cards of a StarHub is synchronised, aggregating the channels of all child cards. This class enables the control of a StarHub device as if it were a single Spectrum card.""" def __init__(self, device_number: int, child_cards: tuple[SpectrumDigitiserCard, ...], master_card_index: int): """ Args: device_number (int): The index of the StarHub to connect to. If only one StarHub is present, set to 0. child_cards (Sequence[`SpectrumDigitiserCard`]): A list of `SpectrumCard` objects defining the child cards located within the StarHub, correctly constructed with their IP addresses and/or device numbers. master_card_index (int): The position within child_cards where the master card (the card which controls the clock) is located. """ super().__init__(device_number=device_number, child_cards=child_cards, master_card_index=master_card_index) self._acquisition_mode = self.acquisition_mode def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None: """Create or provide `CardToPCDataTransferBuffer` objects for receiving acquired samples from the child cards. If no buffers are provided, they will be created with the correct size and a board_memory_offset_bytes of 0. See `SpectrumDigitiserCard.define_transfer_buffer()` for more information Args: buffer (Optional[`CardToPCDataTransferBuffer`]): A list containing pre-constructed `CardToPCDataTransferBuffer` objects, one for each child card. The size of the buffers should be chosen according to the current number of active channels in each card and the acquisition length. """ if buffer: for card, buff in zip(self._child_cards, buffer): card.define_transfer_buffer([buff]) else: for card in self._child_cards: card.define_transfer_buffer() def wait_for_acquisition_to_complete(self) -> None: """Wait for each card to finish its acquisition. See `SpectrumDigitiserCard.wait_for_acquisition_to_complete()` for more information.""" for card in self._child_cards: card.wait_for_acquisition_to_complete() def get_waveforms(self) -> List[List[NDArray[float_]]]: """Get a list of the most recently transferred waveforms. This method gets the waveforms from each child card and joins them into a new list, ordered by channel number. See `SpectrumDigitiserCard.get_waveforms()` for more information. Args: num_acquisitions (int): For FIFO mode: the number of acquisitions (i.e. trigger events) to wait for and copy. Acquiring in batches (num_acquisitions > 1) can improve performance. Returns: waveforms (List[List[NDArray[float_]]]): A list lists of 1D numpy arrays, one inner list per acquisition, and one array per enabled channel, in channel order. """ card_ids_and_waveform_sets: Dict[str, list[list[NDArray[float_]]]] = {} def _get_waveforms(digitiser_card: SpectrumDigitiserCard) -> None: this_cards_waveforms = digitiser_card.get_waveforms() card_ids_and_waveform_sets[str(digitiser_card)] = this_cards_waveforms threads = [Thread(target=_get_waveforms, args=(card,)) for card in self._child_cards] for thread in threads: thread.start() for thread in threads: thread.join() waveform_sets_all_cards_ordered = [] for n in range(self.batch_size): waveforms_in_this_batch = [] for card in self._child_cards: waveforms_in_this_batch += card_ids_and_waveform_sets[str(card)][n] waveform_sets_all_cards_ordered.append(waveforms_in_this_batch) return waveform_sets_all_cards_ordered def get_timestamp(self) -> Optional[datetime.datetime]: """Get timestamp for the last acquisition""" return self._triggering_card.get_timestamp() def enable_timestamping(self) -> None: self._triggering_card.enable_timestamping() @property def acquisition_length_in_samples(self) -> int: """The currently set recording length, which should be the same for all child cards. If different recording lengths are set, an exception is raised. See `SpectrumDigitiserCard.acquisition_length_in_samples` for more information. Returns: length_in_samples: The currently set acquisition length in samples.""" lengths = [] for d in self._child_cards: lengths.append(d.acquisition_length_in_samples) return check_settings_constant_across_devices(lengths, __name__) def set_acquisition_length_in_samples(self, length_in_samples: int) -> None: """Set a new recording length for all child cards. See `SpectrumDigitiserCard.set_acquisition_length_in_samples()` for more information. Args: length_in_samples (int): The desired acquisition length in samples.""" for d in self._child_cards: d.set_acquisition_length_in_samples(length_in_samples) @property def post_trigger_length_in_samples(self) -> int: """The number of samples recorded after a trigger is received. This should be consistent across all child cards. If different values are found across the child cards, an exception is raised. See `SpectrumDigitiserCard.post_trigger_length_in_samples` for more information. Returns: length_in_samples (int): The current post trigger length in samples. """ lengths = [] for d in self._child_cards: lengths.append(d.post_trigger_length_in_samples) return check_settings_constant_across_devices(lengths, __name__) def set_post_trigger_length_in_samples(self, length_in_samples: int) -> None: """Set a new post trigger length for all child cards. See `SpectrumDigitiserCard.set_post_trigger_length_in_samples()` for more information. Args: length_in_samples (int): The desired post trigger length in samples. """ for d in self._child_cards: d.set_post_trigger_length_in_samples(length_in_samples) @property def acquisition_mode(self) -> AcquisitionMode: """The acquisition mode, which should be the same for all child cards. If it's not, an exception is raised. See `SpectrumDigitiserCard.acquisition_mode` for more information. Returns: mode (`AcquisitionMode`): The currently enabled acquisition mode. """ modes = [] for d in self._child_cards: modes.append(d.acquisition_mode) return AcquisitionMode(check_settings_constant_across_devices([m.value for m in modes], __name__)) def set_acquisition_mode(self, mode: AcquisitionMode) -> None: """Change the acquisition mode for all child cards. See `SpectrumDigitiserCard.set_acquisition_mode()` for more information. Args: mode (`AcquisitionMode`): The desired acquisition mode.""" for d in self._child_cards: d.set_acquisition_mode(mode) @property def batch_size(self) -> int: batch_sizes = [] for d in self._child_cards: batch_sizes.append(d.batch_size) return check_settings_constant_across_devices(batch_sizes, __name__) def set_batch_size(self, batch_size: int) -> None: for d in self._child_cards: d.set_batch_size(batch_size) def force_trigger(self) -> None: for d in self._child_cards: d.force_trigger() @property def type(self) -> CardType: return self._child_cards[0].type @property def model_number(self) -> ModelNumber: return self._child_cards[0].model_number @property def analog_channels(self) -> Sequence[SpectrumDigitiserAnalogChannelInterface]: """A tuple containing of all the channels of the child cards of the hub. See `AbstractSpectrumCard.channels` for more information. Returns: channels (Sequence[`SpectrumDigitiserAnalogChannelInterface`]): A tuple of `SpectrumDigitiserAnalogChannelInterface` objects. """ return super().analog_channels
Composite class of SpectrumDigitiserCard
for controlling a StarHub digitiser device, for example the Spectrum
NetBox. StarHub digitiser devices are composites of more than one Spectrum digitiser card. Acquisition from the
child cards of a StarHub is synchronised, aggregating the channels of all child cards. This class enables the
control of a StarHub device as if it were a single Spectrum card.
View Source
def __init__(self, device_number: int, child_cards: tuple[SpectrumDigitiserCard, ...], master_card_index: int): """ Args: device_number (int): The index of the StarHub to connect to. If only one StarHub is present, set to 0. child_cards (Sequence[`SpectrumDigitiserCard`]): A list of `SpectrumCard` objects defining the child cards located within the StarHub, correctly constructed with their IP addresses and/or device numbers. master_card_index (int): The position within child_cards where the master card (the card which controls the clock) is located. """ super().__init__(device_number=device_number, child_cards=child_cards, master_card_index=master_card_index) self._acquisition_mode = self.acquisition_mode
Args
- device_number (int): The index of the StarHub to connect to. If only one StarHub is present, set to 0.
- child_cards (Sequence[
SpectrumDigitiserCard
]): A list ofSpectrumCard
objects defining the child cards located within the StarHub, correctly constructed with their IP addresses and/or device numbers. - master_card_index (int): The position within child_cards where the master card (the card which controls the clock) is located.
View Source
def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None: """Create or provide `CardToPCDataTransferBuffer` objects for receiving acquired samples from the child cards. If no buffers are provided, they will be created with the correct size and a board_memory_offset_bytes of 0. See `SpectrumDigitiserCard.define_transfer_buffer()` for more information Args: buffer (Optional[`CardToPCDataTransferBuffer`]): A list containing pre-constructed `CardToPCDataTransferBuffer` objects, one for each child card. The size of the buffers should be chosen according to the current number of active channels in each card and the acquisition length. """ if buffer: for card, buff in zip(self._child_cards, buffer): card.define_transfer_buffer([buff]) else: for card in self._child_cards: card.define_transfer_buffer()
Create or provide CardToPCDataTransferBuffer
objects for receiving acquired samples from the child cards.
If no buffers are provided, they will be created with the correct size and a board_memory_offset_bytes of 0. See
SpectrumDigitiserCard.define_transfer_buffer()
for more information
Args
- buffer (Optional[
CardToPCDataTransferBuffer
]): A list containing pre-constructed CardToPCDataTransferBuffer
objects, one for each child card. The size of the buffers should be chosen- according to the current number of active channels in each card and the acquisition length.
View Source
def wait_for_acquisition_to_complete(self) -> None: """Wait for each card to finish its acquisition. See `SpectrumDigitiserCard.wait_for_acquisition_to_complete()` for more information.""" for card in self._child_cards: card.wait_for_acquisition_to_complete()
Wait for each card to finish its acquisition. See SpectrumDigitiserCard.wait_for_acquisition_to_complete()
for more information.
View Source
def get_waveforms(self) -> List[List[NDArray[float_]]]: """Get a list of the most recently transferred waveforms. This method gets the waveforms from each child card and joins them into a new list, ordered by channel number. See `SpectrumDigitiserCard.get_waveforms()` for more information. Args: num_acquisitions (int): For FIFO mode: the number of acquisitions (i.e. trigger events) to wait for and copy. Acquiring in batches (num_acquisitions > 1) can improve performance. Returns: waveforms (List[List[NDArray[float_]]]): A list lists of 1D numpy arrays, one inner list per acquisition, and one array per enabled channel, in channel order. """ card_ids_and_waveform_sets: Dict[str, list[list[NDArray[float_]]]] = {} def _get_waveforms(digitiser_card: SpectrumDigitiserCard) -> None: this_cards_waveforms = digitiser_card.get_waveforms() card_ids_and_waveform_sets[str(digitiser_card)] = this_cards_waveforms threads = [Thread(target=_get_waveforms, args=(card,)) for card in self._child_cards] for thread in threads: thread.start() for thread in threads: thread.join() waveform_sets_all_cards_ordered = [] for n in range(self.batch_size): waveforms_in_this_batch = [] for card in self._child_cards: waveforms_in_this_batch += card_ids_and_waveform_sets[str(card)][n] waveform_sets_all_cards_ordered.append(waveforms_in_this_batch) return waveform_sets_all_cards_ordered
Get a list of the most recently transferred waveforms.
This method gets the waveforms from each child card and joins them into a new list, ordered by channel number.
See SpectrumDigitiserCard.get_waveforms()
for more information.
Args
- num_acquisitions (int): For FIFO mode: the number of acquisitions (i.e. trigger events) to wait for and
- copy. Acquiring in batches (num_acquisitions > 1) can improve performance.
Returns
waveforms (List[List[NDArray[float_]]]): A list lists of 1D numpy arrays, one inner list per acquisition, and one array per enabled channel, in channel order.
View Source
def get_timestamp(self) -> Optional[datetime.datetime]: """Get timestamp for the last acquisition""" return self._triggering_card.get_timestamp()
Get timestamp for the last acquisition
View Source
def enable_timestamping(self) -> None: self._triggering_card.enable_timestamping()
The currently set recording length, which should be the same for all child cards. If different recording
lengths are set, an exception is raised. See SpectrumDigitiserCard.acquisition_length_in_samples
for more
information.
Returns
length_in_samples: The currently set acquisition length in samples.
View Source
def set_acquisition_length_in_samples(self, length_in_samples: int) -> None: """Set a new recording length for all child cards. See `SpectrumDigitiserCard.set_acquisition_length_in_samples()` for more information. Args: length_in_samples (int): The desired acquisition length in samples.""" for d in self._child_cards: d.set_acquisition_length_in_samples(length_in_samples)
Set a new recording length for all child cards. See SpectrumDigitiserCard.set_acquisition_length_in_samples()
for more information.
Args
- length_in_samples (int): The desired acquisition length in samples.
The number of samples recorded after a trigger is received. This should be consistent across all child
cards. If different values are found across the child cards, an exception is raised. See
SpectrumDigitiserCard.post_trigger_length_in_samples
for more information.
Returns
length_in_samples (int): The current post trigger length in samples.
View Source
def set_post_trigger_length_in_samples(self, length_in_samples: int) -> None: """Set a new post trigger length for all child cards. See `SpectrumDigitiserCard.set_post_trigger_length_in_samples()` for more information. Args: length_in_samples (int): The desired post trigger length in samples. """ for d in self._child_cards: d.set_post_trigger_length_in_samples(length_in_samples)
Set a new post trigger length for all child cards. See SpectrumDigitiserCard.set_post_trigger_length_in_samples()
for more information.
Args
- length_in_samples (int): The desired post trigger length in samples.
The acquisition mode, which should be the same for all child cards. If it's not, an exception is raised.
See SpectrumDigitiserCard.acquisition_mode
for more information.
Returns
mode (
AcquisitionMode
): The currently enabled acquisition mode.
View Source
def set_acquisition_mode(self, mode: AcquisitionMode) -> None: """Change the acquisition mode for all child cards. See `SpectrumDigitiserCard.set_acquisition_mode()` for more information. Args: mode (`AcquisitionMode`): The desired acquisition mode.""" for d in self._child_cards: d.set_acquisition_mode(mode)
Change the acquisition mode for all child cards. See SpectrumDigitiserCard.set_acquisition_mode()
for more
information.
Args
- mode (
AcquisitionMode
): The desired acquisition mode.
View Source
def set_batch_size(self, batch_size: int) -> None: for d in self._child_cards: d.set_batch_size(batch_size)
View Source
def force_trigger(self) -> None: for d in self._child_cards: d.force_trigger()
A tuple containing of all the channels of the child cards of the hub. See AbstractSpectrumCard.channels
for
more information.
Returns
channels (Sequence[
SpectrumDigitiserAnalogChannelInterface
]): A tuple ofSpectrumDigitiserAnalogChannelInterface
objects.
Inherited Members
- AbstractSpectrumStarHub
- disconnect
- reconnect
- status
- start_transfer
- stop_transfer
- wait_for_transfer_chunk_to_complete
- connected
- set_triggering_card
- clock_mode
- set_clock_mode
- sample_rate_in_hz
- set_sample_rate_in_hz
- trigger_sources
- set_trigger_sources
- external_trigger_mode
- set_external_trigger_mode
- external_trigger_level_in_mv
- set_external_trigger_level_in_mv
- external_trigger_pulse_width_in_samples
- set_external_trigger_pulse_width_in_samples
- apply_channel_enabling
- enabled_analog_channel_nums
- set_enabled_analog_channels
- transfer_buffers
- io_lines
- timeout_in_ms
- set_timeout_in_ms
- feature_list
- available_io_modes
- bytes_per_sample
View Source
class MockSpectrumDigitiserCard(MockAbstractSpectrumDigitiser, MockAbstractSpectrumCard, SpectrumDigitiserCard): """A mock spectrum card, for testing software written to use the `SpectrumDigitiserCard` class. This class overrides methods of `SpectrumDigitiserCard` that communicate with hardware with mocked implementations, allowing software to be tested without Spectrum hardware connected or drivers installed, e.g. during CI. It overrides methods to use to set up a mock 'on-device buffer' attribute into which a mock waveform source will write samples. It also uses a MockTimestamper to generated timestamps for mock waveforms. """ def __init__( self, device_number: int, model: ModelNumber, mock_source_frame_rate_hz: float, num_modules: int, num_channels_per_module: int, card_features: Optional[list[CardFeature]] = None, advanced_card_features: Optional[list[AdvancedCardFeature]] = None, ): """ Args: device_number (int): The index of the mock device to create. Used to create a name for the device which is used internally. model (ModelNumber): The model of card to mock. Affects the allowed acquisition and post-trigger lengths. mock_source_frame_rate_hz (float): Rate at which waveforms will be generated by the mock source providing data to the mock spectrum card. num_modules (int): The number of internal modules to assign the mock card. Default 2. On real hardware, this is read from the device so does not need to be set. See the Spectrum documentation to work out how many modules your hardware has. num_channels_per_module (int): The number of channels per module. Default 4 (so 8 channels in total). On real hardware, this is read from the device so does not need to be set. card_features (list[CardFeature]): List of available features of the mock device advanced_card_features (list[AdvancedCardFeature]): List of available advanced features of the mock device """ super().__init__( device_number=device_number, model=model, mock_source_frame_rate_hz=mock_source_frame_rate_hz, num_modules=num_modules, num_channels_per_module=num_channels_per_module, card_type=CardType.SPCM_TYPE_AI, card_features=card_features if card_features is not None else [], advanced_card_features=advanced_card_features if advanced_card_features is not None else [], ) self._connect(self._visa_string) self._acquisition_mode = self.acquisition_mode self._previous_transfer_chunk_count = 0 self._param_dict[TRANSFER_CHUNK_COUNTER] = 0 def enable_timestamping(self) -> None: self._timestamper: MockTimestamper = MockTimestamper(self, self._handle) def set_acquisition_mode(self, mode: AcquisitionMode) -> None: """Mock timestamper needs to be recreated if the acquisition mode is changed.""" super().set_acquisition_mode(mode) self._timestamper = MockTimestamper(self, self._handle) def set_sample_rate_in_hz(self, rate: int) -> None: """Mock timestamper needs to be recreated if the sample rate is changed.""" super().set_sample_rate_in_hz(rate) self._timestamper = MockTimestamper(self, self._handle) def set_acquisition_length_in_samples(self, length_in_samples: int) -> None: """Set length of mock recording (per channel). In FIFO mode, this will be quantised to the nearest 8 samples. See `SpectrumDigitiserCard` for more information. This method is overridden here only so that the internal attributes related to the mock on-device buffer can be set. Args: length_in_samples (int): Number of samples in each generated mock waveform """ super().set_acquisition_length_in_samples(length_in_samples) def set_enabled_analog_channels(self, channels_nums: List[int]) -> None: """Set the channels to enable for the mock acquisition. See `SpectrumDigitiserCard` for more information. This method is overridden here only so that the internal attributes related to the mock on-device buffer can be set. Args: channels_nums (List[int]): List of mock channel indices to enable, e.g. [0, 1, 2]. """ if len(list(filter(lambda x: 0 <= x < len(self.analog_channels), channels_nums))) == len(channels_nums): super().set_enabled_analog_channels(channels_nums) else: raise SpectrumSettingsMismatchError("Not enough channels in mock device configuration.") def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None: """Create or provide a `TransferBuffer` object for receiving acquired samples from the device. See SpectrumDigitiserCard.define_transfer_buffer(). This mock implementation is identical apart from that it does not write to any hardware device.""" self._set_or_update_transfer_buffer_attribute(buffer) def start_transfer(self) -> None: """See `SpectrumDigitiserCard.start_transfer()`.""" pass def stop_transfer(self) -> None: """See `SpectrumDigitiserCard.stop_transfer()`.""" pass def wait_for_transfer_chunk_to_complete(self) -> None: """See `SpectrumDigitiserCard.wait_for_transfer_chunk_to_complete()`. This mock implementation blocks until a new mock transfer has been completed by waiting for a change to TRANSFER_CHUNK_COUNTER.""" if self._transfer_buffer: t0 = perf_counter() t_elapsed = 0.0 while ( self._previous_transfer_chunk_count == self._param_dict[TRANSFER_CHUNK_COUNTER] ) and t_elapsed < MOCK_TRANSFER_TIMEOUT_IN_S: sleep(0.1) t_elapsed = perf_counter() - t0 self._previous_transfer_chunk_count = self._param_dict[TRANSFER_CHUNK_COUNTER] else: raise SpectrumNoTransferBufferDefined("No transfer in progress.") def wait_for_acquisition_to_complete(self) -> None: """See `SpectrumDigitiserCard.wait_for_acquisition_to_complete()`. This mock implementation blocks until a mock acquisition has been completed (i.e. the acquisition thread has shut down) or the request has timed out according to the `self.timeout_ms attribute`.""" if self._acquisition_thread is not None: self._acquisition_thread.join(timeout=1e-3 * self.timeout_in_ms) if self._acquisition_thread.is_alive(): logger.warning("A timeout occurred while waiting for mock acquisition to complete.") else: logger.warning("No acquisition in progress. Wait for acquisition to complete has no effect")
A mock spectrum card, for testing software written to use the SpectrumDigitiserCard
class.
This class overrides methods of SpectrumDigitiserCard
that communicate with hardware with mocked implementations,
allowing software to be tested without Spectrum hardware connected or drivers installed, e.g. during CI. It overrides
methods to use to set up a mock 'on-device buffer' attribute into which a mock waveform source will write
samples. It also uses a MockTimestamper to generated timestamps for mock waveforms.
View Source
def __init__( self, device_number: int, model: ModelNumber, mock_source_frame_rate_hz: float, num_modules: int, num_channels_per_module: int, card_features: Optional[list[CardFeature]] = None, advanced_card_features: Optional[list[AdvancedCardFeature]] = None, ): """ Args: device_number (int): The index of the mock device to create. Used to create a name for the device which is used internally. model (ModelNumber): The model of card to mock. Affects the allowed acquisition and post-trigger lengths. mock_source_frame_rate_hz (float): Rate at which waveforms will be generated by the mock source providing data to the mock spectrum card. num_modules (int): The number of internal modules to assign the mock card. Default 2. On real hardware, this is read from the device so does not need to be set. See the Spectrum documentation to work out how many modules your hardware has. num_channels_per_module (int): The number of channels per module. Default 4 (so 8 channels in total). On real hardware, this is read from the device so does not need to be set. card_features (list[CardFeature]): List of available features of the mock device advanced_card_features (list[AdvancedCardFeature]): List of available advanced features of the mock device """ super().__init__( device_number=device_number, model=model, mock_source_frame_rate_hz=mock_source_frame_rate_hz, num_modules=num_modules, num_channels_per_module=num_channels_per_module, card_type=CardType.SPCM_TYPE_AI, card_features=card_features if card_features is not None else [], advanced_card_features=advanced_card_features if advanced_card_features is not None else [], ) self._connect(self._visa_string) self._acquisition_mode = self.acquisition_mode self._previous_transfer_chunk_count = 0 self._param_dict[TRANSFER_CHUNK_COUNTER] = 0
Args
- device_number (int): The index of the mock device to create. Used to create a name for the device which is used internally.
- model (ModelNumber): The model of card to mock. Affects the allowed acquisition and post-trigger lengths.
- mock_source_frame_rate_hz (float): Rate at which waveforms will be generated by the mock source providing data to the mock spectrum card.
- num_modules (int): The number of internal modules to assign the mock card. Default 2. On real hardware, this is read from the device so does not need to be set. See the Spectrum documentation to work out how many modules your hardware has.
- num_channels_per_module (int): The number of channels per module. Default 4 (so 8 channels in total). On real hardware, this is read from the device so does not need to be set.
- card_features (list[CardFeature]): List of available features of the mock device
- advanced_card_features (list[AdvancedCardFeature]): List of available advanced features of the mock device
View Source
def enable_timestamping(self) -> None: self._timestamper: MockTimestamper = MockTimestamper(self, self._handle)
View Source
def set_acquisition_mode(self, mode: AcquisitionMode) -> None: """Mock timestamper needs to be recreated if the acquisition mode is changed.""" super().set_acquisition_mode(mode) self._timestamper = MockTimestamper(self, self._handle)
Mock timestamper needs to be recreated if the acquisition mode is changed.
View Source
def set_sample_rate_in_hz(self, rate: int) -> None: """Mock timestamper needs to be recreated if the sample rate is changed.""" super().set_sample_rate_in_hz(rate) self._timestamper = MockTimestamper(self, self._handle)
Mock timestamper needs to be recreated if the sample rate is changed.
View Source
def set_acquisition_length_in_samples(self, length_in_samples: int) -> None: """Set length of mock recording (per channel). In FIFO mode, this will be quantised to the nearest 8 samples. See `SpectrumDigitiserCard` for more information. This method is overridden here only so that the internal attributes related to the mock on-device buffer can be set. Args: length_in_samples (int): Number of samples in each generated mock waveform """ super().set_acquisition_length_in_samples(length_in_samples)
Set length of mock recording (per channel). In FIFO mode, this will be quantised to the nearest 8 samples.
See SpectrumDigitiserCard
for more information. This method is overridden here only so that the internal
attributes related to the mock on-device buffer can be set.
Args
- length_in_samples (int): Number of samples in each generated mock waveform
View Source
def set_enabled_analog_channels(self, channels_nums: List[int]) -> None: """Set the channels to enable for the mock acquisition. See `SpectrumDigitiserCard` for more information. This method is overridden here only so that the internal attributes related to the mock on-device buffer can be set. Args: channels_nums (List[int]): List of mock channel indices to enable, e.g. [0, 1, 2]. """ if len(list(filter(lambda x: 0 <= x < len(self.analog_channels), channels_nums))) == len(channels_nums): super().set_enabled_analog_channels(channels_nums) else: raise SpectrumSettingsMismatchError("Not enough channels in mock device configuration.")
Set the channels to enable for the mock acquisition. See SpectrumDigitiserCard
for more information. This
method is overridden here only so that the internal attributes related to the mock on-device buffer
can be set.
Args
- channels_nums (List[int]): List of mock channel indices to enable, e.g. [0, 1, 2].
View Source
def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None: """Create or provide a `TransferBuffer` object for receiving acquired samples from the device. See SpectrumDigitiserCard.define_transfer_buffer(). This mock implementation is identical apart from that it does not write to any hardware device.""" self._set_or_update_transfer_buffer_attribute(buffer)
Create or provide a TransferBuffer
object for receiving acquired samples from the device.
See SpectrumDigitiserCard.define_transfer_buffer(). This mock implementation is identical apart from that it does not write to any hardware device.
View Source
def start_transfer(self) -> None: """See `SpectrumDigitiserCard.start_transfer()`.""" pass
View Source
def stop_transfer(self) -> None: """See `SpectrumDigitiserCard.stop_transfer()`.""" pass
View Source
def wait_for_transfer_chunk_to_complete(self) -> None: """See `SpectrumDigitiserCard.wait_for_transfer_chunk_to_complete()`. This mock implementation blocks until a new mock transfer has been completed by waiting for a change to TRANSFER_CHUNK_COUNTER.""" if self._transfer_buffer: t0 = perf_counter() t_elapsed = 0.0 while ( self._previous_transfer_chunk_count == self._param_dict[TRANSFER_CHUNK_COUNTER] ) and t_elapsed < MOCK_TRANSFER_TIMEOUT_IN_S: sleep(0.1) t_elapsed = perf_counter() - t0 self._previous_transfer_chunk_count = self._param_dict[TRANSFER_CHUNK_COUNTER] else: raise SpectrumNoTransferBufferDefined("No transfer in progress.")
See SpectrumDigitiserCard.wait_for_transfer_chunk_to_complete()
. This mock implementation blocks until a
new mock transfer has been completed by waiting for a change to TRANSFER_CHUNK_COUNTER.
View Source
def wait_for_acquisition_to_complete(self) -> None: """See `SpectrumDigitiserCard.wait_for_acquisition_to_complete()`. This mock implementation blocks until a mock acquisition has been completed (i.e. the acquisition thread has shut down) or the request has timed out according to the `self.timeout_ms attribute`.""" if self._acquisition_thread is not None: self._acquisition_thread.join(timeout=1e-3 * self.timeout_in_ms) if self._acquisition_thread.is_alive(): logger.warning("A timeout occurred while waiting for mock acquisition to complete.") else: logger.warning("No acquisition in progress. Wait for acquisition to complete has no effect")
See SpectrumDigitiserCard.wait_for_acquisition_to_complete()
. This mock implementation blocks until a mock
acquisition has been completed (i.e. the acquisition thread has shut down) or the request has timed out
according to the self.timeout_ms attribute
.
Inherited Members
- spectrumdevice.devices.mocks.mock_abstract_devices.MockAbstractSpectrumDigitiser
- start
- stop
- spectrumdevice.devices.mocks.mock_abstract_devices.MockAbstractSpectrumDevice
- write_to_spectrum_device_register
- read_spectrum_device_register
- SpectrumDigitiserCard
- get_waveforms
- get_timestamp
- acquisition_length_in_samples
- post_trigger_length_in_samples
- set_post_trigger_length_in_samples
- number_of_averages
- set_number_of_averages
- acquisition_mode
- batch_size
- set_batch_size
- AbstractSpectrumCard
- model_number
- reconnect
- status
- transfer_buffers
- disconnect
- connected
- analog_channels
- io_lines
- enabled_analog_channel_nums
- trigger_sources
- set_trigger_sources
- external_trigger_mode
- set_external_trigger_mode
- external_trigger_level_in_mv
- set_external_trigger_level_in_mv
- external_trigger_pulse_width_in_samples
- set_external_trigger_pulse_width_in_samples
- apply_channel_enabling
- timeout_in_ms
- set_timeout_in_ms
- clock_mode
- set_clock_mode
- available_io_modes
- feature_list
- sample_rate_in_hz
- type
- force_trigger
- bytes_per_sample
View Source
class MockSpectrumDigitiserStarHub(MockAbstractSpectrumStarHub, SpectrumDigitiserStarHub): """A mock spectrum StarHub, for testing software written to use the `SpectrumStarHub` class. Overrides methods of `SpectrumStarHub` and `AbstractSpectrumDigitiser` that communicate with hardware with mocked implementations allowing software to be tested without Spectrum hardware connected or drivers installed, e.g. during CI.""" def __init__(self, **kwargs: Any): """ Args: child_cards (Sequence[`MockSpectrumDigitiserCard`]): A list of `MockSpectrumDigitiserCard` objects defining the properties of the child cards located within the mock hub. master_card_index (int): The position within child_cards where the master card (the card which controls the clock) is located. """ super().__init__(param_dict=None, **kwargs) self._visa_string = "/mock" + self._visa_string self._connect(self._visa_string) self._acquisition_mode = self.acquisition_mode def start(self) -> None: """Start a mock acquisition See `AbstractSpectrumDevice.start()`. With a hardware device, StarHub's only need to be sent a single instruction to start acquisition, which they automatically relay to their child cards - hence why `start` is implemented in `AbstractSpectrumDevice` (base class to both `SpectrumDigitiserCard` and `SpectrumStarHub`) rather than in `SpectrumStarHub`. In this mock `implementation`, each card's acquisition is started individually. """ for card in self._child_cards: card.start() def stop(self) -> None: """Stop a mock acquisition See `AbstractSpectrumDevice.stop_acquisition`. With a hardware device, StarHub's only need to be sent a single instruction to stop acquisition, which they automatically relay to their child cards - hence why `stop_acquisition()` is implemented in `AbstractSpectrumDevice` (base class to both `SpectrumDigitiserCard` and `SpectrumStarHub`) rather than in `SpectrumStarHub`. In this mock implementation, each card's acquisition is stopped individually. """ for card in self._child_cards: card.stop()
A mock spectrum StarHub, for testing software written to use the SpectrumStarHub
class.
Overrides methods of SpectrumStarHub
and AbstractSpectrumDigitiser
that communicate with hardware with mocked
implementations allowing software to be tested without Spectrum hardware connected or drivers installed, e.g. during
CI.
View Source
def __init__(self, **kwargs: Any): """ Args: child_cards (Sequence[`MockSpectrumDigitiserCard`]): A list of `MockSpectrumDigitiserCard` objects defining the properties of the child cards located within the mock hub. master_card_index (int): The position within child_cards where the master card (the card which controls the clock) is located. """ super().__init__(param_dict=None, **kwargs) self._visa_string = "/mock" + self._visa_string self._connect(self._visa_string) self._acquisition_mode = self.acquisition_mode
Args
- child_cards (Sequence[
MockSpectrumDigitiserCard
]): A list ofMockSpectrumDigitiserCard
objects defining the properties of the child cards located within the mock hub. - master_card_index (int): The position within child_cards where the master card (the card which controls the clock) is located.
View Source
def start(self) -> None: """Start a mock acquisition See `AbstractSpectrumDevice.start()`. With a hardware device, StarHub's only need to be sent a single instruction to start acquisition, which they automatically relay to their child cards - hence why `start` is implemented in `AbstractSpectrumDevice` (base class to both `SpectrumDigitiserCard` and `SpectrumStarHub`) rather than in `SpectrumStarHub`. In this mock `implementation`, each card's acquisition is started individually. """ for card in self._child_cards: card.start()
Start a mock acquisition
See AbstractSpectrumDevice.start()
. With a hardware device, StarHub's only need to be sent a single
instruction to start acquisition, which they automatically relay to their child cards - hence why
start
is implemented in AbstractSpectrumDevice
(base class to both SpectrumDigitiserCard
and
SpectrumStarHub
) rather than in SpectrumStarHub
. In this mock implementation
, each card's acquisition is
started individually.
View Source
def stop(self) -> None: """Stop a mock acquisition See `AbstractSpectrumDevice.stop_acquisition`. With a hardware device, StarHub's only need to be sent a single instruction to stop acquisition, which they automatically relay to their child cards - hence why `stop_acquisition()` is implemented in `AbstractSpectrumDevice` (base class to both `SpectrumDigitiserCard` and `SpectrumStarHub`) rather than in `SpectrumStarHub`. In this mock implementation, each card's acquisition is stopped individually. """ for card in self._child_cards: card.stop()
Stop a mock acquisition
See AbstractSpectrumDevice.stop_acquisition
. With a hardware device, StarHub's only need to be sent a single
instruction to stop acquisition, which they automatically relay to their child cards - hence why
stop_acquisition()
is implemented in AbstractSpectrumDevice
(base class to both SpectrumDigitiserCard
and
SpectrumStarHub
) rather than in SpectrumStarHub
. In this mock implementation, each card's acquisition is
stopped individually.
Inherited Members
- spectrumdevice.devices.mocks.mock_abstract_devices.MockAbstractSpectrumDevice
- write_to_spectrum_device_register
- read_spectrum_device_register
- SpectrumDigitiserStarHub
- define_transfer_buffer
- wait_for_acquisition_to_complete
- get_waveforms
- get_timestamp
- enable_timestamping
- acquisition_length_in_samples
- set_acquisition_length_in_samples
- post_trigger_length_in_samples
- set_post_trigger_length_in_samples
- acquisition_mode
- set_acquisition_mode
- batch_size
- set_batch_size
- force_trigger
- type
- model_number
- analog_channels
- AbstractSpectrumStarHub
- disconnect
- reconnect
- status
- start_transfer
- stop_transfer
- wait_for_transfer_chunk_to_complete
- connected
- set_triggering_card
- clock_mode
- set_clock_mode
- sample_rate_in_hz
- set_sample_rate_in_hz
- trigger_sources
- set_trigger_sources
- external_trigger_mode
- set_external_trigger_mode
- external_trigger_level_in_mv
- set_external_trigger_level_in_mv
- external_trigger_pulse_width_in_samples
- set_external_trigger_pulse_width_in_samples
- apply_channel_enabling
- enabled_analog_channel_nums
- set_enabled_analog_channels
- transfer_buffers
- io_lines
- timeout_in_ms
- set_timeout_in_ms
- feature_list
- available_io_modes
- bytes_per_sample
View Source
class AbstractSpectrumDigitiser( AbstractSpectrumDevice[SpectrumDigitiserAnalogChannelInterface, SpectrumDigitiserIOLineInterface], SpectrumDigitiserInterface, ABC, ): """Abstract superclass which implements methods common to all Spectrum digitiser devices. Instances of this class cannot be constructed directly. Instead, construct instances of the concrete classes (`SpectrumDigitiserCard`, `SpectrumDigitiserStarHub` or their mock equivalents) which inherit the methods defined here. Note that the mock devices override several of the methods defined here.""" def configure_acquisition(self, settings: AcquisitionSettings) -> None: """Apply all the settings contained in an `AcquisitionSettings` dataclass to the device. Args: settings (`AcquisitionSettings`): An `AcquisitionSettings` dataclass containing the setting values to apply. """ if settings.batch_size > 1 and settings.acquisition_mode == AcquisitionMode.SPC_REC_STD_SINGLE: raise ValueError("In standard single mode, only 1 acquisition can be downloaded at a time.") self._acquisition_mode = settings.acquisition_mode self.set_batch_size(settings.batch_size) self.set_acquisition_mode(settings.acquisition_mode) self.set_sample_rate_in_hz(settings.sample_rate_in_hz) self.set_acquisition_length_in_samples(settings.acquisition_length_in_samples) self.set_post_trigger_length_in_samples( settings.acquisition_length_in_samples - settings.pre_trigger_length_in_samples ) self.set_timeout_in_ms(settings.timeout_in_ms) self.set_enabled_analog_channels(settings.enabled_channels) # Apply channel dependent settings for channel_num, v_range, v_offset, impedance in zip( self.enabled_analog_channel_nums, settings.vertical_ranges_in_mv, settings.vertical_offsets_in_percent, settings.input_impedances, ): channel = self.analog_channels[channel_num] channel.set_vertical_range_in_mv(v_range) channel.set_vertical_offset_in_percent(v_offset) channel.set_input_impedance(impedance) # Only some hardware has software programmable input coupling, so coupling can be None if settings.input_couplings is not None: for channel, coupling in zip(self.analog_channels, settings.input_couplings): channel.set_input_coupling(coupling) # Only some hardware has software programmable input paths, so it can be None if settings.input_paths is not None: for channel, path in zip(self.analog_channels, settings.input_paths): channel.set_input_path(path) # Write the configuration to the card self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP) if settings.timestamping_enabled: self.enable_timestamping() def execute_standard_single_acquisition(self) -> Measurement: """Carry out a single measurement in standard single mode and return the acquired waveforms. This method automatically carries out a standard single mode acquisition, including handling the creation of a `TransferBuffer` and the retrieval of the acquired waveforms. After being called, it will wait until a trigger event is received before carrying out the acquisition and then transferring and returning the acquired waveforms. The device must be configured in SPC_REC_STD_SINGLE acquisition mode. Returns: measurement (Measurement): A Measurement object. The `.waveforms` attribute of `measurement` will be a list of 1D NumPy arrays, each array containing the waveform data received on one channel, in channel order. The Waveform object also has a timestamp attribute, which (if timestamping was enabled in acquisition settings) contains the time at which the acquisition was triggered. """ if self._acquisition_mode != AcquisitionMode.SPC_REC_STD_SINGLE: raise SpectrumWrongAcquisitionMode( "Set the acquisition mode to SPC_REC_STD_SINGLE using " "configure_acquisition() or set_acquisition_mode() before executing " "a standard single mode acquisition." ) self.start() self.wait_for_acquisition_to_complete() self.define_transfer_buffer() self.start_transfer() self.wait_for_transfer_chunk_to_complete() waveforms = self.get_waveforms()[0] self.stop() # Only strictly required for Mock devices. Should not affect hardware. return Measurement(waveforms=waveforms, timestamp=self.get_timestamp()) def execute_finite_fifo_acquisition(self, num_measurements: int) -> List[Measurement]: """Carry out a finite number of FIFO mode measurements and then stop the acquisitions. This method automatically carries out a defined number of measurement in Multi FIFO mode, including handling the creation of a `TransferBuffer`, streaming the acquired waveforms to the PC, terminating the acquisition and returning the acquired waveforms. After being called, it will wait for the requested number of triggers to be received, generating the correct number of measurements. It retrieves each measurement's waveforms from the `TransferBuffer` as they arrive. Once the requested number of measurements have been received, the acquisition is terminated and the waveforms are returned. The device must be configured in SPC_REC_FIFO_MULTI or SPC_REC_FIFO_AVERAGE acquisition mode. Args: num_measurements (int): The number of measurements to carry out. Returns: measurements (List[Measurement]): A list of Measurement objects with length `num_measurements`. Each Measurement object has a `waveforms` attribute containing a list of 1D NumPy arrays. Each array is a waveform acquired from one channel. The arrays are in channel order. The Waveform objects also have a timestamp attribute, which (if timestamping was enabled in acquisition settings) contains the time at which the acquisition was triggered. """ if (num_measurements % self.batch_size) != 0: raise ValueError( "Number of measurements in a finite FIFO acquisition must be a multiple of the " " batch size configured using AbstractSpectrumDigitiser.configure_acquisition()." ) self.execute_continuous_fifo_acquisition() measurements = [] for _ in range(num_measurements // self.batch_size): measurements += [ Measurement(waveforms=frame, timestamp=self.get_timestamp()) for frame in self.get_waveforms() ] self.stop() return measurements def execute_continuous_fifo_acquisition(self) -> None: """Start a continuous FIFO mode acquisition. This method automatically starts acquiring and streaming samples in FIFO mode, including handling the creation of a `TransferBuffer` and streaming the acquired waveforms to the PC. It will return almost instantaneously. The acquired waveforms must then be read out of the transfer buffer in a loop using the `get_waveforms()` method. Waveforms must be read at least as fast as they are being acquired. The FIFO acquisition and streaming will continue until `stop_acquisition()` is called. The device must be configured in SPC_REC_FIFO_MULTI or SPC_REC_FIFO_AVERAGE acquisition mode.""" if self._acquisition_mode not in (AcquisitionMode.SPC_REC_FIFO_MULTI, AcquisitionMode.SPC_REC_FIFO_AVERAGE): raise SpectrumWrongAcquisitionMode( "Set the acquisition mode to SPC_REC_FIFO_MULTI or SPC_REC_FIFO_AVERAGE using " "configure_acquisition() or set_acquisition_mode() before executing " "a fifo mode acquisition." ) self.define_transfer_buffer() self.start() self.start_transfer()
Abstract superclass which implements methods common to all Spectrum digitiser devices. Instances of this class
cannot be constructed directly. Instead, construct instances of the concrete classes (SpectrumDigitiserCard
,
SpectrumDigitiserStarHub
or their mock equivalents) which inherit the methods defined here. Note that
the mock devices override several of the methods defined here.
View Source
def configure_acquisition(self, settings: AcquisitionSettings) -> None: """Apply all the settings contained in an `AcquisitionSettings` dataclass to the device. Args: settings (`AcquisitionSettings`): An `AcquisitionSettings` dataclass containing the setting values to apply. """ if settings.batch_size > 1 and settings.acquisition_mode == AcquisitionMode.SPC_REC_STD_SINGLE: raise ValueError("In standard single mode, only 1 acquisition can be downloaded at a time.") self._acquisition_mode = settings.acquisition_mode self.set_batch_size(settings.batch_size) self.set_acquisition_mode(settings.acquisition_mode) self.set_sample_rate_in_hz(settings.sample_rate_in_hz) self.set_acquisition_length_in_samples(settings.acquisition_length_in_samples) self.set_post_trigger_length_in_samples( settings.acquisition_length_in_samples - settings.pre_trigger_length_in_samples ) self.set_timeout_in_ms(settings.timeout_in_ms) self.set_enabled_analog_channels(settings.enabled_channels) # Apply channel dependent settings for channel_num, v_range, v_offset, impedance in zip( self.enabled_analog_channel_nums, settings.vertical_ranges_in_mv, settings.vertical_offsets_in_percent, settings.input_impedances, ): channel = self.analog_channels[channel_num] channel.set_vertical_range_in_mv(v_range) channel.set_vertical_offset_in_percent(v_offset) channel.set_input_impedance(impedance) # Only some hardware has software programmable input coupling, so coupling can be None if settings.input_couplings is not None: for channel, coupling in zip(self.analog_channels, settings.input_couplings): channel.set_input_coupling(coupling) # Only some hardware has software programmable input paths, so it can be None if settings.input_paths is not None: for channel, path in zip(self.analog_channels, settings.input_paths): channel.set_input_path(path) # Write the configuration to the card self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP) if settings.timestamping_enabled: self.enable_timestamping()
Apply all the settings contained in an AcquisitionSettings
dataclass to the device.
Args
- settings (
AcquisitionSettings
): AnAcquisitionSettings
dataclass containing the setting values to apply.
View Source
def execute_standard_single_acquisition(self) -> Measurement: """Carry out a single measurement in standard single mode and return the acquired waveforms. This method automatically carries out a standard single mode acquisition, including handling the creation of a `TransferBuffer` and the retrieval of the acquired waveforms. After being called, it will wait until a trigger event is received before carrying out the acquisition and then transferring and returning the acquired waveforms. The device must be configured in SPC_REC_STD_SINGLE acquisition mode. Returns: measurement (Measurement): A Measurement object. The `.waveforms` attribute of `measurement` will be a list of 1D NumPy arrays, each array containing the waveform data received on one channel, in channel order. The Waveform object also has a timestamp attribute, which (if timestamping was enabled in acquisition settings) contains the time at which the acquisition was triggered. """ if self._acquisition_mode != AcquisitionMode.SPC_REC_STD_SINGLE: raise SpectrumWrongAcquisitionMode( "Set the acquisition mode to SPC_REC_STD_SINGLE using " "configure_acquisition() or set_acquisition_mode() before executing " "a standard single mode acquisition." ) self.start() self.wait_for_acquisition_to_complete() self.define_transfer_buffer() self.start_transfer() self.wait_for_transfer_chunk_to_complete() waveforms = self.get_waveforms()[0] self.stop() # Only strictly required for Mock devices. Should not affect hardware. return Measurement(waveforms=waveforms, timestamp=self.get_timestamp())
Carry out a single measurement in standard single mode and return the acquired waveforms.
This method automatically carries out a standard single mode acquisition, including handling the creation
of a TransferBuffer
and the retrieval of the acquired waveforms. After being called, it will wait until a
trigger event is received before carrying out the acquisition and then transferring and returning the acquired
waveforms. The device must be configured in SPC_REC_STD_SINGLE acquisition mode.
Returns
measurement (Measurement): A Measurement object. The
.waveforms
attribute ofmeasurement
will be a list of 1D NumPy arrays, each array containing the waveform data received on one channel, in channel order. The Waveform object also has a timestamp attribute, which (if timestamping was enabled in acquisition settings) contains the time at which the acquisition was triggered.
View Source
def execute_finite_fifo_acquisition(self, num_measurements: int) -> List[Measurement]: """Carry out a finite number of FIFO mode measurements and then stop the acquisitions. This method automatically carries out a defined number of measurement in Multi FIFO mode, including handling the creation of a `TransferBuffer`, streaming the acquired waveforms to the PC, terminating the acquisition and returning the acquired waveforms. After being called, it will wait for the requested number of triggers to be received, generating the correct number of measurements. It retrieves each measurement's waveforms from the `TransferBuffer` as they arrive. Once the requested number of measurements have been received, the acquisition is terminated and the waveforms are returned. The device must be configured in SPC_REC_FIFO_MULTI or SPC_REC_FIFO_AVERAGE acquisition mode. Args: num_measurements (int): The number of measurements to carry out. Returns: measurements (List[Measurement]): A list of Measurement objects with length `num_measurements`. Each Measurement object has a `waveforms` attribute containing a list of 1D NumPy arrays. Each array is a waveform acquired from one channel. The arrays are in channel order. The Waveform objects also have a timestamp attribute, which (if timestamping was enabled in acquisition settings) contains the time at which the acquisition was triggered. """ if (num_measurements % self.batch_size) != 0: raise ValueError( "Number of measurements in a finite FIFO acquisition must be a multiple of the " " batch size configured using AbstractSpectrumDigitiser.configure_acquisition()." ) self.execute_continuous_fifo_acquisition() measurements = [] for _ in range(num_measurements // self.batch_size): measurements += [ Measurement(waveforms=frame, timestamp=self.get_timestamp()) for frame in self.get_waveforms() ] self.stop() return measurements
Carry out a finite number of FIFO mode measurements and then stop the acquisitions.
This method automatically carries out a defined number of measurement in Multi FIFO mode, including handling the
creation of a TransferBuffer
, streaming the acquired waveforms to the PC, terminating the acquisition and
returning the acquired waveforms. After being called, it will wait for the requested number of triggers to be
received, generating the correct number of measurements. It retrieves each measurement's waveforms from the
TransferBuffer
as they arrive. Once the requested number of measurements have been received, the acquisition
is terminated and the waveforms are returned. The device must be configured in SPC_REC_FIFO_MULTI or
SPC_REC_FIFO_AVERAGE acquisition mode.
Args
- num_measurements (int): The number of measurements to carry out.
Returns
measurements (List[Measurement]): A list of Measurement objects with length
num_measurements
. Each Measurement object has awaveforms
attribute containing a list of 1D NumPy arrays. Each array is a waveform acquired from one channel. The arrays are in channel order. The Waveform objects also have a timestamp attribute, which (if timestamping was enabled in acquisition settings) contains the time at which the acquisition was triggered.
View Source
def execute_continuous_fifo_acquisition(self) -> None: """Start a continuous FIFO mode acquisition. This method automatically starts acquiring and streaming samples in FIFO mode, including handling the creation of a `TransferBuffer` and streaming the acquired waveforms to the PC. It will return almost instantaneously. The acquired waveforms must then be read out of the transfer buffer in a loop using the `get_waveforms()` method. Waveforms must be read at least as fast as they are being acquired. The FIFO acquisition and streaming will continue until `stop_acquisition()` is called. The device must be configured in SPC_REC_FIFO_MULTI or SPC_REC_FIFO_AVERAGE acquisition mode.""" if self._acquisition_mode not in (AcquisitionMode.SPC_REC_FIFO_MULTI, AcquisitionMode.SPC_REC_FIFO_AVERAGE): raise SpectrumWrongAcquisitionMode( "Set the acquisition mode to SPC_REC_FIFO_MULTI or SPC_REC_FIFO_AVERAGE using " "configure_acquisition() or set_acquisition_mode() before executing " "a fifo mode acquisition." ) self.define_transfer_buffer() self.start() self.start_transfer()
Start a continuous FIFO mode acquisition.
This method automatically starts acquiring and streaming samples in FIFO mode, including handling the
creation of a TransferBuffer
and streaming the acquired waveforms to the PC. It will return almost
instantaneously. The acquired waveforms must then be read out of the transfer buffer in a loop using the
get_waveforms()
method. Waveforms must be read at least as fast as they are being acquired.
The FIFO acquisition and streaming will continue until stop_acquisition()
is called. The device
must be configured in SPC_REC_FIFO_MULTI or SPC_REC_FIFO_AVERAGE acquisition mode.
Inherited Members
- AbstractSpectrumDevice
- reset
- start
- stop
- configure_trigger
- configure_channel_pairing
- write_to_spectrum_device_register
- read_spectrum_device_register
- spectrumdevice.devices.digitiser.digitiser_interface.SpectrumDigitiserInterface
- wait_for_acquisition_to_complete
- get_waveforms
- get_timestamp
- enable_timestamping
- acquisition_length_in_samples
- set_acquisition_length_in_samples
- post_trigger_length_in_samples
- set_post_trigger_length_in_samples
- acquisition_mode
- set_acquisition_mode
- batch_size
- set_batch_size
- spectrumdevice.devices.abstract_device.device_interface.SpectrumDeviceInterface
- connected
- reconnect
- status
- start_transfer
- stop_transfer
- wait_for_transfer_chunk_to_complete
- disconnect
- transfer_buffers
- define_transfer_buffer
- analog_channels
- io_lines
- enabled_analog_channel_nums
- set_enabled_analog_channels
- trigger_sources
- set_trigger_sources
- external_trigger_mode
- set_external_trigger_mode
- external_trigger_level_in_mv
- set_external_trigger_level_in_mv
- external_trigger_pulse_width_in_samples
- set_external_trigger_pulse_width_in_samples
- apply_channel_enabling
- clock_mode
- set_clock_mode
- sample_rate_in_hz
- set_sample_rate_in_hz
- available_io_modes
- feature_list
- timeout_in_ms
- set_timeout_in_ms
- force_trigger
- bytes_per_sample
- type
- model_number
View Source
class AbstractSpectrumStarHub( AbstractSpectrumDevice, Generic[CardType, AnalogChannelInterfaceType, IOLineInterfaceType], ABC ): """Composite abstract class of `AbstractSpectrumCard` implementing methods common to all StarHubs. StarHubs are composites of more than one Spectrum card. Acquisition and generation from the child cards of a StarHub is synchronised, aggregating the channels of all child cards.""" def __init__(self, device_number: int, child_cards: Sequence[CardType], master_card_index: int, **kwargs: Any): """ Args: device_number (int): The index of the StarHub to connect to. If only one StarHub is present, set to 0. child_cards (Sequence[`SpectrumDeviceInterface`]): A list of objects representing the child cards located within the StarHub, correctly constructed with their IP addresses and/or device numbers. master_card_index (int): The position within child_cards where the master card (the card which controls the clock) is located. """ self._child_cards: Sequence[CardType] = child_cards self._master_card = child_cards[master_card_index] self._triggering_card = child_cards[master_card_index] child_card_logical_indices = (2**n for n in range(len(self._child_cards))) self._visa_string = f"sync{device_number}" self._connect(self._visa_string) all_cards_binary_mask = reduce(or_, child_card_logical_indices) self.write_to_spectrum_device_register(SPC_SYNC_ENABLEMASK, all_cards_binary_mask) def disconnect(self) -> None: """Disconnects from each child card and terminates connection to the hub itself.""" if self._connected: destroy_handle(self._handle) for card in self._child_cards: card.disconnect() self._connected = False def reconnect(self) -> None: """Reconnects to the hub after a `disconnect()`, and reconnects to each child card.""" self._connect(self._visa_string) for card in self._child_cards: card.reconnect() @property def status(self) -> DEVICE_STATUS_TYPE: """The statuses of each child card, in a list. See `SpectrumDigitiserCard.status` for more information. Returns: statuses (List[List[`CardStatus`]]): A list of lists of `CardStatus` (each card has a list of statuses). """ return DEVICE_STATUS_TYPE([card.status[0] for card in self._child_cards]) def start_transfer(self) -> None: """Start the transfer of data between the on-device buffer of each child card and its `TransferBuffer`. See `AbstractSpectrumCard.start_transfer()` for more information.""" for card in self._child_cards: card.start_transfer() def stop_transfer(self) -> None: """Stop the transfer of data between each card and its `TransferBuffer`. See `AbstractSpectrumCard.stop_transfer()` for more information.""" for card in self._child_cards: card.stop_transfer() def wait_for_transfer_chunk_to_complete(self) -> None: """Wait for all cards to stop transferring data to/from their `TransferBuffers`. See `AbstractSpectrumCard.wait_for_transfer_to_complete()` for more information.""" for card in self._child_cards: card.wait_for_transfer_chunk_to_complete() @property def connected(self) -> bool: """True if the StarHub is connected, False if not.""" return self._connected def set_triggering_card(self, card_index: int) -> None: """Change the index of the child card responsible for receiving a trigger. During construction, this is set equal to the index of the master card but in some situations it may be necessary to change it. Args: card_index (int): The index of the StarHub's triggering card within the list of child cards provided on __init__(). """ self._triggering_card = self._child_cards[card_index] @property def clock_mode(self) -> ClockMode: """The clock mode currently configured on the master card. Returns: mode (`ClockMode`): The currently configured clock mode.""" return self._master_card.clock_mode def set_clock_mode(self, mode: ClockMode) -> None: """Change the clock mode configured on the master card. Args: mode (`ClockMode`): The desired clock mode.""" self._master_card.set_clock_mode(mode) @property def sample_rate_in_hz(self) -> int: """The sample rate configured on the master card. Returns: rate (int): The current sample rate of the master card in Hz. """ return self._master_card.sample_rate_in_hz def set_sample_rate_in_hz(self, rate: int) -> None: """Change the sample rate of the child cards (including the master card). Args: rate (int): The desired sample rate of the child cards in Hz. """ for card in self._child_cards: card.set_sample_rate_in_hz(rate) @property def trigger_sources(self) -> List[TriggerSource]: """The trigger sources configured on the triggering card, which by default is the master card. See `AbstractSpectrumCard.trigger_sources()` for more information. Returns: sources (List[`TriggerSource`]): A list of the currently enabled trigger sources.""" return self._triggering_card.trigger_sources def set_trigger_sources(self, sources: List[TriggerSource]) -> None: """Change the trigger sources configured on the triggering card, which by default is the master card. See `AbstractSpectrumCard.trigger_sources()` for more information. Args: sources (List[`TriggerSource`]): The trigger sources to enable, in a list.""" self._triggering_card.set_trigger_sources(sources) for card in self._child_cards: if card is not self._triggering_card: card.set_trigger_sources([TriggerSource.SPC_TMASK_NONE]) @property def external_trigger_mode(self) -> ExternalTriggerMode: """The trigger mode configured on the triggering card, which by default is the master card. See `AbstractSpectrumCard.external_trigger_mode()` for more information. Returns: mode (`ExternalTriggerMode`): The currently set external trigger mode. """ return self._triggering_card.external_trigger_mode def set_external_trigger_mode(self, mode: ExternalTriggerMode) -> None: """Change the trigger mode configured on the triggering card, which by default is the master card. See `AbstractSpectrumCard.set_external_trigger_mode()` for more information. Args: mode (`ExternalTriggerMode`): The desired external trigger mode.""" self._triggering_card.set_external_trigger_mode(mode) @property def external_trigger_level_in_mv(self) -> int: """The external trigger level configured on the triggering card, which by default is the master card. See `AbstractSpectrumCard.external_trigger_level_mv()` for more information. Returns: level (int): The external trigger level in mV. """ return self._triggering_card.external_trigger_level_in_mv def set_external_trigger_level_in_mv(self, level: int) -> None: """Change the external trigger level configured on the triggering card, which by default is the master card. See `AbstractSpectrumCard.set_external_trigger_level_mv()` for more information. Args: level (int): The desired external trigger level in mV. """ self._triggering_card.set_external_trigger_level_in_mv(level) @property def external_trigger_pulse_width_in_samples(self) -> int: """The trigger pulse width (samples) configured on the triggering card, which by default is the master card. See `AbstractSpectrumCard.external_trigger_pulse_width_in_samples()` for more information. Returns: width (int): The current trigger pulse width in samples. """ return self._triggering_card.external_trigger_pulse_width_in_samples def set_external_trigger_pulse_width_in_samples(self, width: int) -> None: """Change the trigger pulse width (samples) configured on the triggering card, which by default is the master card. See `AbstractSpectrumCard.set_external_trigger_pulse_width_in_samples()` for more information. Args: width (int): The desired trigger pulse width in samples. """ self._triggering_card.set_external_trigger_pulse_width_in_samples(width) def apply_channel_enabling(self) -> None: """Apply the enabled channels chosen using `set_enable_channels()`. This happens automatically and does not usually need to be called.""" for d in self._child_cards: d.apply_channel_enabling() @property def enabled_analog_channel_nums(self) -> List[int]: """The currently enabled channel indices, indexed over the whole hub (from 0 to N-1, where N is the total number of channels available to the hub). Returns: channel_nums (List[int]): The currently enabled channel indices. """ enabled_channels = [] n_channels_in_previous_card = 0 for card in self._child_cards: enabled_channels += [ channel_num + n_channels_in_previous_card for channel_num in card.enabled_analog_channel_nums ] n_channels_in_previous_card = len(card.analog_channels) return enabled_channels def set_enabled_analog_channels(self, channels_nums: List[int]) -> None: """Change the currently enabled channel indices, indexed over the whole hub (from 0 to N-1, where N is the total number of channels available to the hub). Returns: channel_nums (List[int]): The indices to enable. """ channels_nums.sort() channels_to_enable_all_cards = channels_nums for child_card in self._child_cards: n_channels_in_card = len(child_card.analog_channels) channels_to_enable_this_card = list(set(arange(n_channels_in_card)) & set(channels_to_enable_all_cards)) num_channels_to_enable_this_card = len(channels_to_enable_this_card) child_card.set_enabled_analog_channels(channels_to_enable_this_card) channels_to_enable_all_cards = [ num - n_channels_in_card for num in channels_nums[num_channels_to_enable_this_card:] ] @property def transfer_buffers(self) -> List[TransferBuffer]: """The `TransferBuffer`s of all the child cards of the hub. See `AbstractSpectrumCard.transfer_buffers` for more information. Returns: buffers (List[`TransferBuffer`]): A list of the transfer buffers for each child card.""" return [card.transfer_buffers[0] for card in self._child_cards] @property def analog_channels(self) -> Sequence[AnalogChannelInterfaceType]: """A tuple containing of all the channels of the child cards of the hub. See `AbstractSpectrumCard.channels` for more information. Returns: channels (Sequence[`SpectrumChannelInterface`]): A tuple of `SpectrumDigitiserChannel` objects. """ channels: List[AnalogChannelInterfaceType] = [] for device in self._child_cards: channels += device.analog_channels return tuple(channels) @property def io_lines(self) -> Sequence[IOLineInterfaceType]: """A tuple containing of all the Multipurpose IO Lines of the child cards of the hub. Returns: channels (Sequence[`SpectrumIOLineInterface`]): A tuple of `SpectrumIOLineInterface` objects. """ io_lines: List[IOLineInterfaceType] = [] for device in self._child_cards: io_lines += device.io_lines return tuple(io_lines) # todo: this is probably wrong. I don't think both cards in a netbox have IO lines @property def timeout_in_ms(self) -> int: """The time for which the card will wait for a trigger to be received after a device has started before returning an error. This should be the same for all child cards. If it's not, an exception is raised. Returns: timeout_ms (int): The currently set timeout in ms. """ timeouts = [] for d in self._child_cards: timeouts.append(d.timeout_in_ms) return check_settings_constant_across_devices(timeouts, __name__) def set_timeout_in_ms(self, timeout_ms: int) -> None: """Change the timeout value for all child cards. Args: timeout_ms (int): The desired timeout setting in seconds.""" for d in self._child_cards: d.set_timeout_in_ms(timeout_ms) @property def feature_list(self) -> List[Tuple[List[CardFeature], List[AdvancedCardFeature]]]: """Get a list of the features of the child cards. See `CardFeature`, `AdvancedCardFeature` and the Spectrum documentation for more information. Returns: features (List[Tuple[List[`CardFeature`], List[`AdvancedCardFeature`]]]): A list of tuples, one per child card. Each tuple contains a list of features and a list of advanced features for that card. """ return [card.feature_list[0] for card in self._child_cards] @property def available_io_modes(self) -> AvailableIOModes: """For each multipurpose IO line on the master card, read the available modes. See `IOLineMode` and the Spectrum Documentation for all possible available modes and their meanings. Returns: modes (AvailableIOModes): An `AvailableIOModes` dataclass containing the modes available for each IO line. """ return self._master_card.available_io_modes @property def bytes_per_sample(self) -> int: bytes_per_sample_each_card = [] for d in self._child_cards: bytes_per_sample_each_card.append(d.bytes_per_sample) return check_settings_constant_across_devices(bytes_per_sample_each_card, __name__) def __str__(self) -> str: return f"StarHub {self._visa_string}"
Composite abstract class of AbstractSpectrumCard
implementing methods common to all StarHubs. StarHubs are
composites of more than one Spectrum card. Acquisition and generation from the child cards of a StarHub
is synchronised, aggregating the channels of all child cards.
View Source
def __init__(self, device_number: int, child_cards: Sequence[CardType], master_card_index: int, **kwargs: Any): """ Args: device_number (int): The index of the StarHub to connect to. If only one StarHub is present, set to 0. child_cards (Sequence[`SpectrumDeviceInterface`]): A list of objects representing the child cards located within the StarHub, correctly constructed with their IP addresses and/or device numbers. master_card_index (int): The position within child_cards where the master card (the card which controls the clock) is located. """ self._child_cards: Sequence[CardType] = child_cards self._master_card = child_cards[master_card_index] self._triggering_card = child_cards[master_card_index] child_card_logical_indices = (2**n for n in range(len(self._child_cards))) self._visa_string = f"sync{device_number}" self._connect(self._visa_string) all_cards_binary_mask = reduce(or_, child_card_logical_indices) self.write_to_spectrum_device_register(SPC_SYNC_ENABLEMASK, all_cards_binary_mask)
Args
- device_number (int): The index of the StarHub to connect to. If only one StarHub is present, set to 0.
- child_cards (Sequence[
SpectrumDeviceInterface
]): A list of objects representing the child cards located within the StarHub, correctly constructed with their IP addresses and/or device numbers. - master_card_index (int): The position within child_cards where the master card (the card which controls the clock) is located.
View Source
def disconnect(self) -> None: """Disconnects from each child card and terminates connection to the hub itself.""" if self._connected: destroy_handle(self._handle) for card in self._child_cards: card.disconnect() self._connected = False
Disconnects from each child card and terminates connection to the hub itself.
View Source
def reconnect(self) -> None: """Reconnects to the hub after a `disconnect()`, and reconnects to each child card.""" self._connect(self._visa_string) for card in self._child_cards: card.reconnect()
Reconnects to the hub after a disconnect()
, and reconnects to each child card.
The statuses of each child card, in a list. See SpectrumDigitiserCard.status
for more information.
Returns
statuses (List[List[
CardStatus
]]): A list of lists ofCardStatus
(each card has a list of statuses).
View Source
def start_transfer(self) -> None: """Start the transfer of data between the on-device buffer of each child card and its `TransferBuffer`. See `AbstractSpectrumCard.start_transfer()` for more information.""" for card in self._child_cards: card.start_transfer()
Start the transfer of data between the on-device buffer of each child card and its TransferBuffer
. See
AbstractSpectrumCard.start_transfer()
for more information.
View Source
def stop_transfer(self) -> None: """Stop the transfer of data between each card and its `TransferBuffer`. See `AbstractSpectrumCard.stop_transfer()` for more information.""" for card in self._child_cards: card.stop_transfer()
Stop the transfer of data between each card and its TransferBuffer
. See
AbstractSpectrumCard.stop_transfer()
for more information.
View Source
def wait_for_transfer_chunk_to_complete(self) -> None: """Wait for all cards to stop transferring data to/from their `TransferBuffers`. See `AbstractSpectrumCard.wait_for_transfer_to_complete()` for more information.""" for card in self._child_cards: card.wait_for_transfer_chunk_to_complete()
Wait for all cards to stop transferring data to/from their TransferBuffers
. See
AbstractSpectrumCard.wait_for_transfer_to_complete()
for more information.
True if the StarHub is connected, False if not.
View Source
def set_triggering_card(self, card_index: int) -> None: """Change the index of the child card responsible for receiving a trigger. During construction, this is set equal to the index of the master card but in some situations it may be necessary to change it. Args: card_index (int): The index of the StarHub's triggering card within the list of child cards provided on __init__(). """ self._triggering_card = self._child_cards[card_index]
Change the index of the child card responsible for receiving a trigger. During construction, this is set equal to the index of the master card but in some situations it may be necessary to change it.
Args
- card_index (int): The index of the StarHub's triggering card within the list of child cards provided on __init__().
The clock mode currently configured on the master card.
Returns
mode (
ClockMode
): The currently configured clock mode.
View Source
def set_clock_mode(self, mode: ClockMode) -> None: """Change the clock mode configured on the master card. Args: mode (`ClockMode`): The desired clock mode.""" self._master_card.set_clock_mode(mode)
Change the clock mode configured on the master card.
Args
- mode (
ClockMode
): The desired clock mode.
The sample rate configured on the master card.
Returns
rate (int): The current sample rate of the master card in Hz.
View Source
def set_sample_rate_in_hz(self, rate: int) -> None: """Change the sample rate of the child cards (including the master card). Args: rate (int): The desired sample rate of the child cards in Hz. """ for card in self._child_cards: card.set_sample_rate_in_hz(rate)
Change the sample rate of the child cards (including the master card).
Args
- rate (int): The desired sample rate of the child cards in Hz.
The trigger sources configured on the triggering card, which by default is the master card. See
AbstractSpectrumCard.trigger_sources()
for more information.
Returns
sources (List[
TriggerSource
]): A list of the currently enabled trigger sources.
View Source
def set_trigger_sources(self, sources: List[TriggerSource]) -> None: """Change the trigger sources configured on the triggering card, which by default is the master card. See `AbstractSpectrumCard.trigger_sources()` for more information. Args: sources (List[`TriggerSource`]): The trigger sources to enable, in a list.""" self._triggering_card.set_trigger_sources(sources) for card in self._child_cards: if card is not self._triggering_card: card.set_trigger_sources([TriggerSource.SPC_TMASK_NONE])
Change the trigger sources configured on the triggering card, which by default is the master card. See
AbstractSpectrumCard.trigger_sources()
for more information.
Args
- sources (List[
TriggerSource
]): The trigger sources to enable, in a list.
The trigger mode configured on the triggering card, which by default is the master card. See
AbstractSpectrumCard.external_trigger_mode()
for more information.
Returns
mode (
ExternalTriggerMode
): The currently set external trigger mode.
View Source
def set_external_trigger_mode(self, mode: ExternalTriggerMode) -> None: """Change the trigger mode configured on the triggering card, which by default is the master card. See `AbstractSpectrumCard.set_external_trigger_mode()` for more information. Args: mode (`ExternalTriggerMode`): The desired external trigger mode.""" self._triggering_card.set_external_trigger_mode(mode)
Change the trigger mode configured on the triggering card, which by default is the master card. See
AbstractSpectrumCard.set_external_trigger_mode()
for more information.
Args
- mode (
ExternalTriggerMode
): The desired external trigger mode.
The external trigger level configured on the triggering card, which by default is the master card. See
AbstractSpectrumCard.external_trigger_level_mv()
for more information.
Returns
level (int): The external trigger level in mV.
View Source
def set_external_trigger_level_in_mv(self, level: int) -> None: """Change the external trigger level configured on the triggering card, which by default is the master card. See `AbstractSpectrumCard.set_external_trigger_level_mv()` for more information. Args: level (int): The desired external trigger level in mV. """ self._triggering_card.set_external_trigger_level_in_mv(level)
Change the external trigger level configured on the triggering card, which by default is the master card.
See AbstractSpectrumCard.set_external_trigger_level_mv()
for more information.
Args
- level (int): The desired external trigger level in mV.
The trigger pulse width (samples) configured on the triggering card, which by default is the master card.
See AbstractSpectrumCard.external_trigger_pulse_width_in_samples()
for more information.
Returns
width (int): The current trigger pulse width in samples.
View Source
def set_external_trigger_pulse_width_in_samples(self, width: int) -> None: """Change the trigger pulse width (samples) configured on the triggering card, which by default is the master card. See `AbstractSpectrumCard.set_external_trigger_pulse_width_in_samples()` for more information. Args: width (int): The desired trigger pulse width in samples. """ self._triggering_card.set_external_trigger_pulse_width_in_samples(width)
Change the trigger pulse width (samples) configured on the triggering card, which by default is the master
card. See AbstractSpectrumCard.set_external_trigger_pulse_width_in_samples()
for more information.
Args
- width (int): The desired trigger pulse width in samples.
View Source
def apply_channel_enabling(self) -> None: """Apply the enabled channels chosen using `set_enable_channels()`. This happens automatically and does not usually need to be called.""" for d in self._child_cards: d.apply_channel_enabling()
Apply the enabled channels chosen using set_enable_channels()
. This happens automatically and does not
usually need to be called.
The currently enabled channel indices, indexed over the whole hub (from 0 to N-1, where N is the total number of channels available to the hub).
Returns
channel_nums (List[int]): The currently enabled channel indices.
View Source
def set_enabled_analog_channels(self, channels_nums: List[int]) -> None: """Change the currently enabled channel indices, indexed over the whole hub (from 0 to N-1, where N is the total number of channels available to the hub). Returns: channel_nums (List[int]): The indices to enable. """ channels_nums.sort() channels_to_enable_all_cards = channels_nums for child_card in self._child_cards: n_channels_in_card = len(child_card.analog_channels) channels_to_enable_this_card = list(set(arange(n_channels_in_card)) & set(channels_to_enable_all_cards)) num_channels_to_enable_this_card = len(channels_to_enable_this_card) child_card.set_enabled_analog_channels(channels_to_enable_this_card) channels_to_enable_all_cards = [ num - n_channels_in_card for num in channels_nums[num_channels_to_enable_this_card:] ]
Change the currently enabled channel indices, indexed over the whole hub (from 0 to N-1, where N is the total number of channels available to the hub).
Returns
channel_nums (List[int]): The indices to enable.
The TransferBuffer
s of all the child cards of the hub. See AbstractSpectrumCard.transfer_buffers
for more
information.
Returns
buffers (List[
TransferBuffer
]): A list of the transfer buffers for each child card.
A tuple containing of all the channels of the child cards of the hub. See AbstractSpectrumCard.channels
for
more information.
Returns
channels (Sequence[
SpectrumChannelInterface
]): A tuple ofSpectrumDigitiserChannel
objects.
A tuple containing of all the Multipurpose IO Lines of the child cards of the hub.
Returns
channels (Sequence[
SpectrumIOLineInterface
]): A tuple ofSpectrumIOLineInterface
objects.
The time for which the card will wait for a trigger to be received after a device has started before returning an error. This should be the same for all child cards. If it's not, an exception is raised.
Returns
timeout_ms (int): The currently set timeout in ms.
View Source
def set_timeout_in_ms(self, timeout_ms: int) -> None: """Change the timeout value for all child cards. Args: timeout_ms (int): The desired timeout setting in seconds.""" for d in self._child_cards: d.set_timeout_in_ms(timeout_ms)
Change the timeout value for all child cards.
Args
- timeout_ms (int): The desired timeout setting in seconds.
Get a list of the features of the child cards. See CardFeature
, AdvancedCardFeature
and the Spectrum
documentation for more information.
Returns
features (List[Tuple[List[
CardFeature
], List[AdvancedCardFeature
]]]): A list of tuples, one per child card. Each tuple contains a list of features and a list of advanced features for that card.
For each multipurpose IO line on the master card, read the available modes. See IOLineMode
and the Spectrum
Documentation for all possible available modes and their meanings.
Returns
modes (AvailableIOModes): An
AvailableIOModes
dataclass containing the modes available for each IO line.
Inherited Members
- AbstractSpectrumDevice
- reset
- start
- stop
- configure_trigger
- configure_channel_pairing
- write_to_spectrum_device_register
- read_spectrum_device_register
- spectrumdevice.devices.abstract_device.device_interface.SpectrumDeviceInterface
- define_transfer_buffer
- force_trigger
- type
- model_number
View Source
class AbstractSpectrumCard(AbstractSpectrumDevice[AnalogChannelInterfaceType, IOLineInterfaceType], ABC): """Abstract superclass implementing methods common to all individual "card" devices (as opposed to "hub" devices).""" def __init__(self, device_number: int, ip_address: Optional[str] = None, **kwargs: Any): """ Args: device_number (int): Index of the card to control. If only one card is present, set to 0. ip_address (Optional[str]): If connecting to a networked card, provide the IP address here as a string. """ super().__init__() # required for proper MRO resolution if ip_address is not None: self._visa_string = _create_visa_string_from_ip(ip_address, device_number) else: self._visa_string = f"/dev/spcm{device_number}" self._connect(self._visa_string) self._model_number = ModelNumber(self.read_spectrum_device_register(SPC_PCITYP)) self._trigger_sources: List[TriggerSource] = [] self._analog_channels = self._init_analog_channels() self._io_lines = self._init_io_lines() self._enabled_analog_channels: List[int] = [0] self._transfer_buffer: Optional[TransferBuffer] = None self.apply_channel_enabling() @property def model_number(self) -> ModelNumber: return self._model_number def reconnect(self) -> None: """Reconnect to the card after disconnect() has been called.""" self._connect(self._visa_string) @property def status(self) -> DEVICE_STATUS_TYPE: """Read the current status of the card. Returns: Statuses (`List[List[StatusCode]]`): A length-1 list containing a list of `StatusCode` Enums describing the current acquisition status of the card. See `StatusCode` (and the Spectrum documentation) for the list off possible acquisition statuses. """ return [decode_status(self.read_spectrum_device_register(SPC_M2STATUS))] def start_transfer(self) -> None: """Transfer between the on-device buffer and the `TransferBuffer`. Requires that a `TransferBuffer` has been defined (see `define_transfer_buffer()`). For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), `start_transfer()` should be called after each acquisition has completed to transfer the acquired waveforms from the device to the `TransferBuffer`. For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), `start_transfer()` should be called immediately after `start()` has been called, so that the waveform data can be continuously streamed into the transfer buffer as it is acquired. # todo: docstring for AWG transfers """ self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_DATA_STARTDMA) def stop_transfer(self) -> None: """Stop the transfer of data between the on-device buffer and the `TransferBuffer`. Transfer is usually stopped automatically when an acquisition or stream of acquisitions completes, so this method is rarely required. It may invalidate transferred samples. For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), transfer will automatically stop once all acquired samples have been transferred, so `stop_transfer()` should not be used. Instead, call `wait_for_transfer_to_complete()` after `start_transfer()`. For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), samples are transferred continuously during acquisition, and transfer will automatically stop when `stop()` is called as there will be no more samples to transfer, so `stop_transfer()` should not be used. # todo: docstring for AWG """ self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_DATA_STOPDMA) def wait_for_transfer_chunk_to_complete(self) -> None: """Blocks until the currently active transfer between the on-device buffer and the TransferBuffer is complete. This will be when there at least TransferBuffer.notify_size_in_pages pages available in the buffer. For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), use after starting a transfer. Once the method returns, all acquired waveforms have been transferred from the on-device buffer to the `TransferBuffer` and can be read using the `get_waveforms()` method. For digitisers in FIFO mode (SPC_REC_FIFO_MULTI) this method is internally used by get_waveforms(). # todo: update the above docstring to take into account cases where notify size < data lemgth # todo: docstring for AWG """ self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_DATA_WAITDMA) @property def transfer_buffers(self) -> List[TransferBuffer]: """Return the `TransferBuffer` configured for transferring data between the card and the software. Returns: buffer (List[`TransferBuffer`]): A length-1 list containing the `TransferBuffer` object. Any data within the `TransferBuffer` can be accessed using its own interface, but the samples are stored as a 1D array, with the samples of each channel interleaved as per the Spectrum user manual. For digitisers, it is more convenient to read waveform data using the `get_waveforms()` method. """ if self._transfer_buffer is not None: return [self._transfer_buffer] else: raise SpectrumNoTransferBufferDefined("Cannot find TransferBuffer.") def disconnect(self) -> None: """Terminate the connection to the card.""" if self.connected: destroy_handle(self._handle) self._connected = False @property def connected(self) -> bool: """Returns True if a card is currently connected, False if not.""" return self._connected def __eq__(self, other: object) -> bool: if isinstance(other, self.__class__): return self._handle == other._handle else: raise NotImplementedError(f"Cannot compare {self.__class__} with {other.__class__}") @property def analog_channels(self) -> Sequence[AnalogChannelInterfaceType]: """A tuple containing the channels that belong to the card. Properties of the individual channels can be set by calling the methods of the returned objects directly. See `SpectrumDigitiserChannel` and `SpectrumAWGChannel` for more information. Returns: channels (Sequence[`SpectrumChannelInterface`]): A tuple of objects conforming to the `SpectrumChannelInterface` interface. """ return self._analog_channels @property def io_lines(self) -> Sequence[IOLineInterfaceType]: """A tuple containing the Multipurpose IO Lines that belong to the card. Properties of the individual channels can be set by calling the methods of the returned objects directly. Returns: channels (Sequence[`SpectrumIOLineInterface`]): A tuple of objects conforming to the `SpectrumIOLineInterface` interface. """ return self._io_lines @property def enabled_analog_channel_nums(self) -> List[int]: """The indices of the currently enabled channels. Returns: enabled_channels (List[int]): The indices of the currently enabled channels. """ return self._enabled_analog_channels def set_enabled_analog_channels(self, channels_nums: List[int]) -> None: """Change which channels are enabled. Args: channels_nums (List[int]): The integer channel indices to enable. """ if len(channels_nums) in [1, 2, 4, 8]: self._enabled_analog_channels = channels_nums self.apply_channel_enabling() else: raise SpectrumInvalidNumberOfEnabledChannels(f"{len(channels_nums)} cannot be enabled at once.") @property def trigger_sources(self) -> List[TriggerSource]: """The currently enabled trigger sources Returns: sources (List[`TriggerSource`]): A list of TriggerSources. """ or_of_sources = self.read_spectrum_device_register(SPC_TRIG_ORMASK) self._trigger_sources = decode_trigger_sources(or_of_sources) return self._trigger_sources def set_trigger_sources(self, sources: List[TriggerSource]) -> None: """Change the enabled trigger sources. Args: sources (List[`TriggerSource`]): The TriggerSources to enable. """ self._trigger_sources = sources or_of_sources = reduce(or_, [s.value for s in sources]) self.write_to_spectrum_device_register(SPC_TRIG_ORMASK, or_of_sources) self.write_to_spectrum_device_register(SPC_TRIG_ANDMASK, 0) @property def external_trigger_mode(self) -> ExternalTriggerMode: """The currently enabled external trigger mode. An external trigger source must be enabled. Returns: sources (`ExternalTriggerMode`): The currently enabled `ExternalTriggerMode`.""" if len(self._active_external_triggers) == 0: raise SpectrumExternalTriggerNotEnabled("Cannot get external trigger mode.") else: first_trig_source = self._active_external_triggers[0] try: return ExternalTriggerMode( self.read_spectrum_device_register(EXTERNAL_TRIGGER_MODE_COMMANDS[first_trig_source.value]) ) except KeyError: raise SpectrumTriggerOperationNotImplemented(f"Cannot get trigger mode of {first_trig_source.name}.") def set_external_trigger_mode(self, mode: ExternalTriggerMode) -> None: """Change the currently enabled trigger mode. An external trigger source must be enabled. Args: mode (`ExternalTriggerMode`): The `ExternalTriggerMode` to apply. """ if len(self._active_external_triggers) == 0: raise SpectrumExternalTriggerNotEnabled("Cannot set external trigger mode.") else: for trigger_source in self._active_external_triggers: try: self.write_to_spectrum_device_register( EXTERNAL_TRIGGER_MODE_COMMANDS[trigger_source.value], mode.value ) except KeyError: raise SpectrumTriggerOperationNotImplemented(f"Cannot set trigger mode of {trigger_source.name}.") @property def _active_external_triggers(self) -> List[TriggerSource]: return [ TriggerSource(val) for val in list( set(EXTERNAL_TRIGGER_MODE_COMMANDS.keys()) & set([source.value for source in self._trigger_sources]) ) ] @property def external_trigger_level_in_mv(self) -> int: """The signal level (mV) needed to trigger an event using an external trigger source. An external trigger source must be enabled. Returns: level (int): The currently set trigger level in mV. """ if len(self._active_external_triggers) == 0: raise SpectrumExternalTriggerNotEnabled("Cannot get external trigger level.") else: first_trig_source = self._active_external_triggers[0] try: return self.read_spectrum_device_register(EXTERNAL_TRIGGER_LEVEL_COMMANDS[first_trig_source.value]) except KeyError: raise SpectrumTriggerOperationNotImplemented(f"Cannot get trigger level of {first_trig_source.name}.") def set_external_trigger_level_in_mv(self, level: int) -> None: """Change the signal level (mV) needed to trigger an event using an external trigger source. An external trigger source must be enabled. Args: level (int): The trigger level to set in mV. """ if len(self._active_external_triggers) == 0: raise SpectrumExternalTriggerNotEnabled("Cannot set external trigger level.") else: for trigger_source in self._active_external_triggers: try: self.write_to_spectrum_device_register(EXTERNAL_TRIGGER_LEVEL_COMMANDS[trigger_source.value], level) except KeyError: raise SpectrumTriggerOperationNotImplemented(f"Cannot set trigger level of {trigger_source.name}.") @property def external_trigger_pulse_width_in_samples(self) -> int: """The pulse width (in samples) needed to trigger an event using an external trigger source, if SPC_TM_PW_SMALLER or SPC_TM_PW_GREATER `ExternalTriggerMode` is selected. An external trigger source must be enabled. Returns: width (int): The current trigger pulse width in samples. """ if len(self._active_external_triggers) == 0: raise SpectrumExternalTriggerNotEnabled("Cannot get external trigger pulse width.") else: first_trig_source = self._active_external_triggers[0] try: return self.read_spectrum_device_register( EXTERNAL_TRIGGER_PULSE_WIDTH_COMMANDS[first_trig_source.value] ) except KeyError: raise SpectrumTriggerOperationNotImplemented(f"Cannot get pulse width of {first_trig_source.name}.") def set_external_trigger_pulse_width_in_samples(self, width: int) -> None: """Change the pulse width (samples) needed to trigger an event using an external trigger source if SPC_TM_PW_SMALLER or SPC_TM_PW_GREATER `ExternalTriggerMode` is selected. An external trigger source must be enabled. Args: width (int): The trigger pulse width to set, in samples.""" if len(self._active_external_triggers) == 0: raise SpectrumExternalTriggerNotEnabled("Cannot set external trigger pulse width.") else: for trigger_source in self._active_external_triggers: try: self.write_to_spectrum_device_register( EXTERNAL_TRIGGER_PULSE_WIDTH_COMMANDS[trigger_source.value], width ) except KeyError: raise SpectrumTriggerOperationNotImplemented(f"Cannot set pulse width of {trigger_source.name}.") def apply_channel_enabling(self) -> None: """Apply the enabled channels chosen using set_enable_channels(). This happens automatically and does not usually need to be called.""" enabled_channel_spectrum_values = [self.analog_channels[i].name.value for i in self._enabled_analog_channels] if len(enabled_channel_spectrum_values) in [1, 2, 4, 8]: bitwise_or_of_enabled_channels = reduce(or_, enabled_channel_spectrum_values) self.write_to_spectrum_device_register(SPC_CHENABLE, bitwise_or_of_enabled_channels) else: raise SpectrumInvalidNumberOfEnabledChannels( f"Cannot enable {len(enabled_channel_spectrum_values)} " f"channels on one card." ) @abstractmethod def _init_analog_channels(self) -> Sequence[AnalogChannelInterfaceType]: raise NotImplementedError() @abstractmethod def _init_io_lines(self) -> Sequence[IOLineInterfaceType]: raise NotImplementedError() @property def timeout_in_ms(self) -> int: """The time for which the card will wait for a trigger to be received after the device has been started before returning an error. Returns: timeout_in_ms (in)t: The currently set timeout in ms. """ return self.read_spectrum_device_register(SPC_TIMEOUT) def set_timeout_in_ms(self, timeout_in_ms: int) -> None: """Change the time for which the card will wait for a trigger to tbe received after the device has started before returning an error. Args: timeout_in_ms (int): The desired timeout in ms. """ self.write_to_spectrum_device_register(SPC_TIMEOUT, timeout_in_ms) @property def clock_mode(self) -> ClockMode: """The currently enabled clock mode. Returns: mode (`ClockMode`): The currently set clock mode. """ return ClockMode(self.read_spectrum_device_register(SPC_CLOCKMODE)) def set_clock_mode(self, mode: ClockMode) -> None: """Change the clock mode. See `ClockMode` and the Spectrum documentation for available modes. Args: mode (`ClockMode`): The desired clock mode. """ self.write_to_spectrum_device_register(SPC_CLOCKMODE, mode.value) @property def available_io_modes(self) -> AvailableIOModes: """For each multipurpose IO line on the card, read the available modes. See IOLineMode and the Spectrum Documentation for all possible available modes and their meanings. Returns: modes (`AvailableIOModes`): An `AvailableIOModes` dataclass containing the modes for each IO line.""" return AvailableIOModes( X0=decode_available_io_modes(self.read_spectrum_device_register(SPCM_X0_AVAILMODES)), X1=decode_available_io_modes(self.read_spectrum_device_register(SPCM_X1_AVAILMODES)), X2=decode_available_io_modes(self.read_spectrum_device_register(SPCM_X2_AVAILMODES)), X3=decode_available_io_modes(self.read_spectrum_device_register(SPCM_X3_AVAILMODES)), ) @property def feature_list(self) -> List[Tuple[List[CardFeature], List[AdvancedCardFeature]]]: """Get a list of the features of the card. See `CardFeature`, `AdvancedCardFeature` and the Spectrum documentation for more information. Returns: features (List[Tuple[List[`CardFeature`], List[`AdvancedCardFeature`]]]): A tuple of two lists - of features and advanced features respectively - wrapped in a list. """ normal_features = decode_card_features(self.read_spectrum_device_register(SPC_PCIFEATURES)) advanced_features = decode_advanced_card_features(self.read_spectrum_device_register(SPC_PCIEXTFEATURES)) return [(normal_features, advanced_features)] @property def sample_rate_in_hz(self) -> int: """The rate at which samples will be acquired or generated, in Hz. Returns: rate (int): The currently set sample rate in Hz. """ return self.read_spectrum_device_register(SPC_SAMPLERATE, SpectrumRegisterLength.SIXTY_FOUR) def set_sample_rate_in_hz(self, rate: int) -> None: """Change the rate at which samples will be acquired or generated, in Hz. Args: rate (int): The desired sample rate in Hz. """ self.write_to_spectrum_device_register(SPC_SAMPLERATE, rate, SpectrumRegisterLength.SIXTY_FOUR) def __str__(self) -> str: return f"Card {self._visa_string} (model {self.model_number.name})." @property def type(self) -> CardType: return CardType(self.read_spectrum_device_register(SPC_FNCTYPE)) def force_trigger(self) -> None: """Force a trigger event to occur""" self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_FORCETRIGGER) @property def bytes_per_sample(self) -> int: return self.read_spectrum_device_register(SPC_MIINST_BYTESPERSAMPLE)
Abstract superclass implementing methods common to all individual "card" devices (as opposed to "hub" devices).
View Source
def __init__(self, device_number: int, ip_address: Optional[str] = None, **kwargs: Any): """ Args: device_number (int): Index of the card to control. If only one card is present, set to 0. ip_address (Optional[str]): If connecting to a networked card, provide the IP address here as a string. """ super().__init__() # required for proper MRO resolution if ip_address is not None: self._visa_string = _create_visa_string_from_ip(ip_address, device_number) else: self._visa_string = f"/dev/spcm{device_number}" self._connect(self._visa_string) self._model_number = ModelNumber(self.read_spectrum_device_register(SPC_PCITYP)) self._trigger_sources: List[TriggerSource] = [] self._analog_channels = self._init_analog_channels() self._io_lines = self._init_io_lines() self._enabled_analog_channels: List[int] = [0] self._transfer_buffer: Optional[TransferBuffer] = None self.apply_channel_enabling()
Args
- device_number (int): Index of the card to control. If only one card is present, set to 0.
- ip_address (Optional[str]): If connecting to a networked card, provide the IP address here as a string.
View Source
def reconnect(self) -> None: """Reconnect to the card after disconnect() has been called.""" self._connect(self._visa_string)
Reconnect to the card after disconnect() has been called.
Read the current status of the card.
Returns
Statuses (
List[List[StatusCode]]
): A length-1 list containing a list ofStatusCode
Enums describing the current acquisition status of the card. SeeStatusCode
(and the Spectrum documentation) for the list off possible acquisition statuses.
View Source
def start_transfer(self) -> None: """Transfer between the on-device buffer and the `TransferBuffer`. Requires that a `TransferBuffer` has been defined (see `define_transfer_buffer()`). For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), `start_transfer()` should be called after each acquisition has completed to transfer the acquired waveforms from the device to the `TransferBuffer`. For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), `start_transfer()` should be called immediately after `start()` has been called, so that the waveform data can be continuously streamed into the transfer buffer as it is acquired. # todo: docstring for AWG transfers """ self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_DATA_STARTDMA)
Transfer between the on-device buffer and the TransferBuffer
.
Requires that a TransferBuffer
has been defined (see define_transfer_buffer()
).
For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), start_transfer()
should be called after each
acquisition has completed to transfer the acquired waveforms from the device to the TransferBuffer
.
For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), start_transfer()
should be called immediately after
start()
has been called, so that the waveform data can be continuously streamed into the transfer buffer as it
is acquired.
todo: docstring for AWG transfers
View Source
def stop_transfer(self) -> None: """Stop the transfer of data between the on-device buffer and the `TransferBuffer`. Transfer is usually stopped automatically when an acquisition or stream of acquisitions completes, so this method is rarely required. It may invalidate transferred samples. For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), transfer will automatically stop once all acquired samples have been transferred, so `stop_transfer()` should not be used. Instead, call `wait_for_transfer_to_complete()` after `start_transfer()`. For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), samples are transferred continuously during acquisition, and transfer will automatically stop when `stop()` is called as there will be no more samples to transfer, so `stop_transfer()` should not be used. # todo: docstring for AWG """ self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_DATA_STOPDMA)
Stop the transfer of data between the on-device buffer and the TransferBuffer
.
Transfer is usually stopped automatically when an acquisition or stream of acquisitions completes, so this method is rarely required. It may invalidate transferred samples.
For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), transfer will automatically stop once all acquired
samples have been transferred, so stop_transfer()
should not be used. Instead, call
wait_for_transfer_to_complete()
after start_transfer()
.
For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), samples are transferred continuously during acquisition,
and transfer will automatically stop when stop()
is called as there will be no more
samples to transfer, so stop_transfer()
should not be used.
todo: docstring for AWG
View Source
def wait_for_transfer_chunk_to_complete(self) -> None: """Blocks until the currently active transfer between the on-device buffer and the TransferBuffer is complete. This will be when there at least TransferBuffer.notify_size_in_pages pages available in the buffer. For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), use after starting a transfer. Once the method returns, all acquired waveforms have been transferred from the on-device buffer to the `TransferBuffer` and can be read using the `get_waveforms()` method. For digitisers in FIFO mode (SPC_REC_FIFO_MULTI) this method is internally used by get_waveforms(). # todo: update the above docstring to take into account cases where notify size < data lemgth # todo: docstring for AWG """ self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_DATA_WAITDMA)
Blocks until the currently active transfer between the on-device buffer and the TransferBuffer is complete. This will be when there at least TransferBuffer.notify_size_in_pages pages available in the buffer.
For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), use after starting a transfer. Once the method
returns, all acquired waveforms have been transferred from the on-device buffer to the TransferBuffer
and can
be read using the get_waveforms()
method.
For digitisers in FIFO mode (SPC_REC_FIFO_MULTI) this method is internally used by get_waveforms().
todo: update the above docstring to take into account cases where notify size < data lemgth
todo: docstring for AWG
Return the TransferBuffer
configured for transferring data between the card and the software.
Returns
buffer (List[
TransferBuffer
]): A length-1 list containing theTransferBuffer
object. Any data within theTransferBuffer
can be accessed using its own interface, but the samples are stored as a 1D array, with the samples of each channel interleaved as per the Spectrum user manual. For digitisers, it is more convenient to read waveform data using theget_waveforms()
method.
View Source
def disconnect(self) -> None: """Terminate the connection to the card.""" if self.connected: destroy_handle(self._handle) self._connected = False
Terminate the connection to the card.
Returns True if a card is currently connected, False if not.
A tuple containing the channels that belong to the card.
Properties of the individual channels can be set by calling the methods of the
returned objects directly. See SpectrumDigitiserChannel
and SpectrumAWGChannel
for more information.
Returns
channels (Sequence[
SpectrumChannelInterface
]): A tuple of objects conforming to theSpectrumChannelInterface
interface.
A tuple containing the Multipurpose IO Lines that belong to the card.
Properties of the individual channels can be set by calling the methods of the returned objects directly.
Returns
channels (Sequence[
SpectrumIOLineInterface
]): A tuple of objects conforming to theSpectrumIOLineInterface
interface.
The indices of the currently enabled channels.
Returns
enabled_channels (List[int]): The indices of the currently enabled channels.
View Source
def set_enabled_analog_channels(self, channels_nums: List[int]) -> None: """Change which channels are enabled. Args: channels_nums (List[int]): The integer channel indices to enable. """ if len(channels_nums) in [1, 2, 4, 8]: self._enabled_analog_channels = channels_nums self.apply_channel_enabling() else: raise SpectrumInvalidNumberOfEnabledChannels(f"{len(channels_nums)} cannot be enabled at once.")
Change which channels are enabled.
Args
- channels_nums (List[int]): The integer channel indices to enable.
The currently enabled trigger sources
Returns
sources (List[
TriggerSource
]): A list of TriggerSources.
View Source
def set_trigger_sources(self, sources: List[TriggerSource]) -> None: """Change the enabled trigger sources. Args: sources (List[`TriggerSource`]): The TriggerSources to enable. """ self._trigger_sources = sources or_of_sources = reduce(or_, [s.value for s in sources]) self.write_to_spectrum_device_register(SPC_TRIG_ORMASK, or_of_sources) self.write_to_spectrum_device_register(SPC_TRIG_ANDMASK, 0)
Change the enabled trigger sources.
Args
- sources (List[
TriggerSource
]): The TriggerSources to enable.
The currently enabled external trigger mode. An external trigger source must be enabled.
Returns
sources (
ExternalTriggerMode
): The currently enabledExternalTriggerMode
.
View Source
def set_external_trigger_mode(self, mode: ExternalTriggerMode) -> None: """Change the currently enabled trigger mode. An external trigger source must be enabled. Args: mode (`ExternalTriggerMode`): The `ExternalTriggerMode` to apply. """ if len(self._active_external_triggers) == 0: raise SpectrumExternalTriggerNotEnabled("Cannot set external trigger mode.") else: for trigger_source in self._active_external_triggers: try: self.write_to_spectrum_device_register( EXTERNAL_TRIGGER_MODE_COMMANDS[trigger_source.value], mode.value ) except KeyError: raise SpectrumTriggerOperationNotImplemented(f"Cannot set trigger mode of {trigger_source.name}.")
Change the currently enabled trigger mode. An external trigger source must be enabled.
Args
- mode (
ExternalTriggerMode
): TheExternalTriggerMode
to apply.
The signal level (mV) needed to trigger an event using an external trigger source. An external trigger source must be enabled.
Returns
level (int): The currently set trigger level in mV.
View Source
def set_external_trigger_level_in_mv(self, level: int) -> None: """Change the signal level (mV) needed to trigger an event using an external trigger source. An external trigger source must be enabled. Args: level (int): The trigger level to set in mV. """ if len(self._active_external_triggers) == 0: raise SpectrumExternalTriggerNotEnabled("Cannot set external trigger level.") else: for trigger_source in self._active_external_triggers: try: self.write_to_spectrum_device_register(EXTERNAL_TRIGGER_LEVEL_COMMANDS[trigger_source.value], level) except KeyError: raise SpectrumTriggerOperationNotImplemented(f"Cannot set trigger level of {trigger_source.name}.")
Change the signal level (mV) needed to trigger an event using an external trigger source. An external trigger source must be enabled.
Args
- level (int): The trigger level to set in mV.
The pulse width (in samples) needed to trigger an event using an external trigger source, if
SPC_TM_PW_SMALLER or SPC_TM_PW_GREATER ExternalTriggerMode
is selected. An external trigger source must be
enabled.
Returns
width (int): The current trigger pulse width in samples.
View Source
def set_external_trigger_pulse_width_in_samples(self, width: int) -> None: """Change the pulse width (samples) needed to trigger an event using an external trigger source if SPC_TM_PW_SMALLER or SPC_TM_PW_GREATER `ExternalTriggerMode` is selected. An external trigger source must be enabled. Args: width (int): The trigger pulse width to set, in samples.""" if len(self._active_external_triggers) == 0: raise SpectrumExternalTriggerNotEnabled("Cannot set external trigger pulse width.") else: for trigger_source in self._active_external_triggers: try: self.write_to_spectrum_device_register( EXTERNAL_TRIGGER_PULSE_WIDTH_COMMANDS[trigger_source.value], width ) except KeyError: raise SpectrumTriggerOperationNotImplemented(f"Cannot set pulse width of {trigger_source.name}.")
Change the pulse width (samples) needed to trigger an event using an external trigger source if
SPC_TM_PW_SMALLER or SPC_TM_PW_GREATER ExternalTriggerMode
is selected. An external trigger source must be
enabled.
Args
- width (int): The trigger pulse width to set, in samples.
View Source
def apply_channel_enabling(self) -> None: """Apply the enabled channels chosen using set_enable_channels(). This happens automatically and does not usually need to be called.""" enabled_channel_spectrum_values = [self.analog_channels[i].name.value for i in self._enabled_analog_channels] if len(enabled_channel_spectrum_values) in [1, 2, 4, 8]: bitwise_or_of_enabled_channels = reduce(or_, enabled_channel_spectrum_values) self.write_to_spectrum_device_register(SPC_CHENABLE, bitwise_or_of_enabled_channels) else: raise SpectrumInvalidNumberOfEnabledChannels( f"Cannot enable {len(enabled_channel_spectrum_values)} " f"channels on one card." )
Apply the enabled channels chosen using set_enable_channels(). This happens automatically and does not usually need to be called.
The time for which the card will wait for a trigger to be received after the device has been started before returning an error.
Returns
timeout_in_ms (in)t: The currently set timeout in ms.
View Source
def set_timeout_in_ms(self, timeout_in_ms: int) -> None: """Change the time for which the card will wait for a trigger to tbe received after the device has started before returning an error. Args: timeout_in_ms (int): The desired timeout in ms. """ self.write_to_spectrum_device_register(SPC_TIMEOUT, timeout_in_ms)
Change the time for which the card will wait for a trigger to tbe received after the device has started before returning an error.
Args
- timeout_in_ms (int): The desired timeout in ms.
The currently enabled clock mode.
Returns
mode (
ClockMode
): The currently set clock mode.
View Source
def set_clock_mode(self, mode: ClockMode) -> None: """Change the clock mode. See `ClockMode` and the Spectrum documentation for available modes. Args: mode (`ClockMode`): The desired clock mode. """ self.write_to_spectrum_device_register(SPC_CLOCKMODE, mode.value)
Change the clock mode. See ClockMode
and the Spectrum documentation for available modes.
Args
- mode (
ClockMode
): The desired clock mode.
For each multipurpose IO line on the card, read the available modes. See IOLineMode and the Spectrum Documentation for all possible available modes and their meanings.
Returns
modes (
AvailableIOModes
): AnAvailableIOModes
dataclass containing the modes for each IO line.
Get a list of the features of the card. See CardFeature
, AdvancedCardFeature
and the Spectrum
documentation for more information.
Returns
features (List[Tuple[List[
CardFeature
], List[AdvancedCardFeature
]]]): A tuple of two lists - of features and advanced features respectively - wrapped in a list.
The rate at which samples will be acquired or generated, in Hz.
Returns
rate (int): The currently set sample rate in Hz.
View Source
def set_sample_rate_in_hz(self, rate: int) -> None: """Change the rate at which samples will be acquired or generated, in Hz. Args: rate (int): The desired sample rate in Hz. """ self.write_to_spectrum_device_register(SPC_SAMPLERATE, rate, SpectrumRegisterLength.SIXTY_FOUR)
Change the rate at which samples will be acquired or generated, in Hz.
Args
- rate (int): The desired sample rate in Hz.
View Source
def force_trigger(self) -> None: """Force a trigger event to occur""" self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_FORCETRIGGER)
Force a trigger event to occur
Inherited Members
- AbstractSpectrumDevice
- reset
- start
- stop
- configure_trigger
- configure_channel_pairing
- write_to_spectrum_device_register
- read_spectrum_device_register
- spectrumdevice.devices.abstract_device.device_interface.SpectrumDeviceInterface
- define_transfer_buffer
View Source
class AbstractSpectrumDevice(SpectrumDeviceInterface[AnalogChannelInterfaceType, IOLineInterfaceType], ABC): """Abstract superclass which implements methods common to all Spectrum devices. Instances of this class cannot be constructed directly. Instead, construct instances of the concrete classes listed in spectrumdevice/__init__.py, which inherit the methods defined here. Note that the concrete mock devices override several of the methods defined here.""" def _connect(self, visa_string: str) -> None: self._handle = spectrum_handle_factory(visa_string) self._connected = True def reset(self) -> None: """Perform a software and hardware reset. All settings are set to hardware default values. The data in the board’s on-board memory will be no longer valid. Any output signals (including triggers and clocks) will be disabled.""" self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_RESET) def start(self) -> None: """Start the device. For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), this will need to be called once for each acquisition. In-between calls, waveforms must be manually transferred from the device to a `TransferBuffer` using `start_transfer()`. The `TransferBuffer` need not be defined until after `start` is called. For digitisers in Multi FIFO mode (SPC_REC_FIFO_MULTI), it needs to be called only once, immediately followed by a call to `start_transfer()`. Frames will then be continuously streamed to the `TransferBuffer`, which must have already been defined. # todo: docstring for different AWG modes """ self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_START | M2CMD_CARD_ENABLETRIGGER) def stop(self) -> None: """Stop the device. For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), this stops the continuous acquisition of waveform data that occurs after calling `start()`. Does not need to be called in Standard Single mode (SPC_REC_STD_SINGLE). # todo: docstring for AWG """ self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_STOP) def configure_trigger(self, settings: TriggerSettings) -> None: """Apply all the trigger settings contained in a `TriggerSettings` dataclass to the device. Args: settings (`TriggerSettings`): A `TriggerSettings` dataclass containing the setting values to apply.""" self.set_trigger_sources(settings.trigger_sources) if len(set(self.trigger_sources) & set(EXTERNAL_TRIGGER_SOURCES)) > 0: if settings.external_trigger_mode is not None: self.set_external_trigger_mode(settings.external_trigger_mode) if settings.external_trigger_level_in_mv is not None: self.set_external_trigger_level_in_mv(settings.external_trigger_level_in_mv) if settings.external_trigger_pulse_width_in_samples is not None: self.set_external_trigger_pulse_width_in_samples(settings.external_trigger_pulse_width_in_samples) # Write the configuration to the card self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP) def configure_channel_pairing(self, channel_pair: ChannelPair, mode: ChannelPairingMode) -> None: """Configures a pair of consecutive channels to operate either independently, in differential mode or in double mode. If enabling differential or double mode, then the odd-numbered channel will be automatically configured to be identical to the even-numbered channel, and the odd-numbered channel will be disabled as is required by the Spectrum API. Args: channel_pair (ChannelPair): The pair of channels to configure mode (ChannelPairingMode): The mode to enable: SINGLE, DOUBLE, or DIFFERENTIAL """ doubling_enabled = int(mode == ChannelPairingMode.DOUBLE) differential_mode_enabled = int(mode == ChannelPairingMode.DIFFERENTIAL) if doubling_enabled and channel_pair in (channel_pair.CHANNEL_4_AND_5, channel_pair.CHANNEL_6_AND_7): raise ValueError("Doubling can only be enabled for channel pairs CHANNEL_0_AND_1 or CHANNEL_2_AND_3.") if doubling_enabled or differential_mode_enabled: self._mirror_even_channel_settings_on_odd_channel(channel_pair) self._disable_odd_channel(channel_pair) self.write_to_spectrum_device_register( DIFFERENTIAL_CHANNEL_PAIR_COMMANDS[channel_pair], differential_mode_enabled ) self.write_to_spectrum_device_register(DOUBLING_CHANNEL_PAIR_COMMANDS[channel_pair], doubling_enabled) def _disable_odd_channel(self, channel_pair: ChannelPair) -> None: try: enabled_channels = copy(self.enabled_analog_channel_nums) enabled_channels.remove(channel_pair.value + 1) self.set_enabled_analog_channels(enabled_channels) except ValueError: pass # odd numbered channel was not enable, so no need to disable it. def _mirror_even_channel_settings_on_odd_channel(self, channel_pair: ChannelPair) -> None: self.analog_channels[channel_pair.value + 1].copy_settings_from_other_channel( self.analog_channels[channel_pair.value] ) def write_to_spectrum_device_register( self, spectrum_register: int, value: int, length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO, ) -> None: """Set the value of a register on the Spectrum device. This method is used internally by `AbstractSpectrumDigitiser` and its subclasses to configure a hardware device, but can also be used to set the value of registers that are not implemented in `AbstractSpectrumDigitiser` and its subclasses. Args: spectrum_register (int): Identifier of the register to set. This should be a global constant imported from regs.py in the spectrum_gmbh package. value (int): Value to write to the register. This should be a global constant imported from regs.py in the spectrum_gmbh package. length (`SpectrumRegisterLength`): A `SpectrumRegisterLength` object specifying the length of the register to set, in bits. """ if not SPECTRUM_DRIVERS_FOUND: raise SpectrumDriversNotFound( "Cannot communicate with hardware. For testing on a system without drivers or connected hardware, use" " MockSpectrumDigitiserCard instead." ) if self.connected: if length == SpectrumRegisterLength.THIRTY_TWO: set_spectrum_i32_api_param(self._handle, spectrum_register, value) elif length == SpectrumRegisterLength.SIXTY_FOUR: set_spectrum_i64_api_param(self._handle, spectrum_register, value) else: raise ValueError("Spectrum integer length not recognised.") else: raise SpectrumDeviceNotConnected("The device has been disconnected.") def read_spectrum_device_register( self, spectrum_register: int, length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO, ) -> int: """Get the value of a register on the Spectrum digitiser. This method is used internally by `AbstractSpectrumDigitiser` and its subclasses to read the configuration of a hardware device, but can be also used to get the value of registers that are not implemented in `AbstractSpectrumDigitiser` and its subclasses. Args: spectrum_register (int): Identifier of the register to set. This should be a global constant imported from spectrum_gmbh.py_header.regs. length (`SpectrumRegisterLength`): A `SpectrumRegisterLength` object specifying the length of the register to set, in bits. Returns: value (int): Value of the register. This can be matched to a global constant imported from spectrum_gmbh.py_header.regs, usually using one of the Enums defined in the settings module. """ if not SPECTRUM_DRIVERS_FOUND: raise SpectrumDriversNotFound( "Cannot communicate with hardware. For testing on a system without drivers or connected hardware, use" " a mock device instead (e.g. MockSpectrumDigitiserCard or MockSpectrumStarHub)." ) if self.connected: if length == SpectrumRegisterLength.THIRTY_TWO: return get_spectrum_i32_api_param(self._handle, spectrum_register) elif length == SpectrumRegisterLength.SIXTY_FOUR: return get_spectrum_i64_api_param(self._handle, spectrum_register) else: raise ValueError("Spectrum integer length not recognised.") else: raise SpectrumDeviceNotConnected("The device has been disconnected.") def __repr__(self) -> str: return str(self)
Abstract superclass which implements methods common to all Spectrum devices. Instances of this class cannot be constructed directly. Instead, construct instances of the concrete classes listed in spectrumdevice/__init__.py, which inherit the methods defined here. Note that the concrete mock devices override several of the methods defined here.
View Source
def reset(self) -> None: """Perform a software and hardware reset. All settings are set to hardware default values. The data in the board’s on-board memory will be no longer valid. Any output signals (including triggers and clocks) will be disabled.""" self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_RESET)
Perform a software and hardware reset.
All settings are set to hardware default values. The data in the board’s on-board memory will be no longer valid. Any output signals (including triggers and clocks) will be disabled.
View Source
def start(self) -> None: """Start the device. For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), this will need to be called once for each acquisition. In-between calls, waveforms must be manually transferred from the device to a `TransferBuffer` using `start_transfer()`. The `TransferBuffer` need not be defined until after `start` is called. For digitisers in Multi FIFO mode (SPC_REC_FIFO_MULTI), it needs to be called only once, immediately followed by a call to `start_transfer()`. Frames will then be continuously streamed to the `TransferBuffer`, which must have already been defined. # todo: docstring for different AWG modes """ self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_START | M2CMD_CARD_ENABLETRIGGER)
Start the device.
For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), this will need to be called once for each
acquisition. In-between calls, waveforms must be manually transferred from the device to a TransferBuffer
using start_transfer()
. The TransferBuffer
need not be defined until after start
is called.
For digitisers in Multi FIFO mode (SPC_REC_FIFO_MULTI), it needs to be called only once, immediately followed by
a call to start_transfer()
. Frames will then be continuously streamed to the TransferBuffer
, which must have
already been defined.
todo: docstring for different AWG modes
View Source
def stop(self) -> None: """Stop the device. For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), this stops the continuous acquisition of waveform data that occurs after calling `start()`. Does not need to be called in Standard Single mode (SPC_REC_STD_SINGLE). # todo: docstring for AWG """ self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_STOP)
Stop the device.
For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), this stops the continuous acquisition of waveform data that
occurs after calling start()
. Does not need to be called in Standard Single mode (SPC_REC_STD_SINGLE).
todo: docstring for AWG
View Source
def configure_trigger(self, settings: TriggerSettings) -> None: """Apply all the trigger settings contained in a `TriggerSettings` dataclass to the device. Args: settings (`TriggerSettings`): A `TriggerSettings` dataclass containing the setting values to apply.""" self.set_trigger_sources(settings.trigger_sources) if len(set(self.trigger_sources) & set(EXTERNAL_TRIGGER_SOURCES)) > 0: if settings.external_trigger_mode is not None: self.set_external_trigger_mode(settings.external_trigger_mode) if settings.external_trigger_level_in_mv is not None: self.set_external_trigger_level_in_mv(settings.external_trigger_level_in_mv) if settings.external_trigger_pulse_width_in_samples is not None: self.set_external_trigger_pulse_width_in_samples(settings.external_trigger_pulse_width_in_samples) # Write the configuration to the card self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP)
Apply all the trigger settings contained in a TriggerSettings
dataclass to the device.
Args
- settings (
TriggerSettings
): ATriggerSettings
dataclass containing the setting values to apply.
View Source
def configure_channel_pairing(self, channel_pair: ChannelPair, mode: ChannelPairingMode) -> None: """Configures a pair of consecutive channels to operate either independently, in differential mode or in double mode. If enabling differential or double mode, then the odd-numbered channel will be automatically configured to be identical to the even-numbered channel, and the odd-numbered channel will be disabled as is required by the Spectrum API. Args: channel_pair (ChannelPair): The pair of channels to configure mode (ChannelPairingMode): The mode to enable: SINGLE, DOUBLE, or DIFFERENTIAL """ doubling_enabled = int(mode == ChannelPairingMode.DOUBLE) differential_mode_enabled = int(mode == ChannelPairingMode.DIFFERENTIAL) if doubling_enabled and channel_pair in (channel_pair.CHANNEL_4_AND_5, channel_pair.CHANNEL_6_AND_7): raise ValueError("Doubling can only be enabled for channel pairs CHANNEL_0_AND_1 or CHANNEL_2_AND_3.") if doubling_enabled or differential_mode_enabled: self._mirror_even_channel_settings_on_odd_channel(channel_pair) self._disable_odd_channel(channel_pair) self.write_to_spectrum_device_register( DIFFERENTIAL_CHANNEL_PAIR_COMMANDS[channel_pair], differential_mode_enabled ) self.write_to_spectrum_device_register(DOUBLING_CHANNEL_PAIR_COMMANDS[channel_pair], doubling_enabled)
Configures a pair of consecutive channels to operate either independently, in differential mode or in double mode. If enabling differential or double mode, then the odd-numbered channel will be automatically configured to be identical to the even-numbered channel, and the odd-numbered channel will be disabled as is required by the Spectrum API.
Args
- channel_pair (ChannelPair): The pair of channels to configure
- mode (ChannelPairingMode): The mode to enable: SINGLE, DOUBLE, or DIFFERENTIAL
View Source
def write_to_spectrum_device_register( self, spectrum_register: int, value: int, length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO, ) -> None: """Set the value of a register on the Spectrum device. This method is used internally by `AbstractSpectrumDigitiser` and its subclasses to configure a hardware device, but can also be used to set the value of registers that are not implemented in `AbstractSpectrumDigitiser` and its subclasses. Args: spectrum_register (int): Identifier of the register to set. This should be a global constant imported from regs.py in the spectrum_gmbh package. value (int): Value to write to the register. This should be a global constant imported from regs.py in the spectrum_gmbh package. length (`SpectrumRegisterLength`): A `SpectrumRegisterLength` object specifying the length of the register to set, in bits. """ if not SPECTRUM_DRIVERS_FOUND: raise SpectrumDriversNotFound( "Cannot communicate with hardware. For testing on a system without drivers or connected hardware, use" " MockSpectrumDigitiserCard instead." ) if self.connected: if length == SpectrumRegisterLength.THIRTY_TWO: set_spectrum_i32_api_param(self._handle, spectrum_register, value) elif length == SpectrumRegisterLength.SIXTY_FOUR: set_spectrum_i64_api_param(self._handle, spectrum_register, value) else: raise ValueError("Spectrum integer length not recognised.") else: raise SpectrumDeviceNotConnected("The device has been disconnected.")
Set the value of a register on the Spectrum device.
This method is used internally by AbstractSpectrumDigitiser
and its subclasses to configure a hardware
device, but can also be used to set the value of registers that are not implemented in
AbstractSpectrumDigitiser
and its subclasses.
Args
- spectrum_register (int): Identifier of the register to set. This should be a global constant imported from regs.py in the spectrum_gmbh package.
- value (int): Value to write to the register. This should be a global constant imported from regs.py in the spectrum_gmbh package.
- length (
SpectrumRegisterLength
): ASpectrumRegisterLength
object specifying the length of the register to set, in bits.
View Source
def read_spectrum_device_register( self, spectrum_register: int, length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO, ) -> int: """Get the value of a register on the Spectrum digitiser. This method is used internally by `AbstractSpectrumDigitiser` and its subclasses to read the configuration of a hardware device, but can be also used to get the value of registers that are not implemented in `AbstractSpectrumDigitiser` and its subclasses. Args: spectrum_register (int): Identifier of the register to set. This should be a global constant imported from spectrum_gmbh.py_header.regs. length (`SpectrumRegisterLength`): A `SpectrumRegisterLength` object specifying the length of the register to set, in bits. Returns: value (int): Value of the register. This can be matched to a global constant imported from spectrum_gmbh.py_header.regs, usually using one of the Enums defined in the settings module. """ if not SPECTRUM_DRIVERS_FOUND: raise SpectrumDriversNotFound( "Cannot communicate with hardware. For testing on a system without drivers or connected hardware, use" " a mock device instead (e.g. MockSpectrumDigitiserCard or MockSpectrumStarHub)." ) if self.connected: if length == SpectrumRegisterLength.THIRTY_TWO: return get_spectrum_i32_api_param(self._handle, spectrum_register) elif length == SpectrumRegisterLength.SIXTY_FOUR: return get_spectrum_i64_api_param(self._handle, spectrum_register) else: raise ValueError("Spectrum integer length not recognised.") else: raise SpectrumDeviceNotConnected("The device has been disconnected.")
Get the value of a register on the Spectrum digitiser.
This method is used internally by AbstractSpectrumDigitiser
and its subclasses to read the configuration of a
hardware device, but can be also used to get the value of registers that are not implemented in
AbstractSpectrumDigitiser
and its subclasses.
Args
- spectrum_register (int): Identifier of the register to set. This should be a global constant imported from spectrum_gmbh.py_header.regs.
- length (
SpectrumRegisterLength
): ASpectrumRegisterLength
object specifying the length of the register to set, in bits.
Returns
value (int): Value of the register. This can be matched to a global constant imported from spectrum_gmbh.py_header.regs, usually using one of the Enums defined in the settings module.
Inherited Members
- spectrumdevice.devices.abstract_device.device_interface.SpectrumDeviceInterface
- connected
- reconnect
- status
- start_transfer
- stop_transfer
- wait_for_transfer_chunk_to_complete
- disconnect
- transfer_buffers
- define_transfer_buffer
- analog_channels
- io_lines
- enabled_analog_channel_nums
- set_enabled_analog_channels
- trigger_sources
- set_trigger_sources
- external_trigger_mode
- set_external_trigger_mode
- external_trigger_level_in_mv
- set_external_trigger_level_in_mv
- external_trigger_pulse_width_in_samples
- set_external_trigger_pulse_width_in_samples
- apply_channel_enabling
- clock_mode
- set_clock_mode
- sample_rate_in_hz
- set_sample_rate_in_hz
- available_io_modes
- feature_list
- timeout_in_ms
- set_timeout_in_ms
- force_trigger
- bytes_per_sample
- type
- model_number
View Source
class AbstractSpectrumChannel(SpectrumChannelInterface, Generic[ChannelNameType]): """Partially implemented abstract superclass contain code common for controlling an individual channel or IO Line of all spectrum devices.""" def __init__(self, channel_number: int, parent_device: SpectrumDeviceInterface, **kwargs: Any) -> None: super().__init__(**kwargs) self._name = self._make_name(channel_number) self._parent_device = parent_device self._enabled = True @property @abstractmethod def _name_prefix(self) -> str: raise NotImplementedError @abstractmethod def _make_name(self, channel_number: int) -> ChannelNameType: raise NotImplementedError @property def name(self) -> ChannelNameType: """The identifier assigned by the spectrum driver, formatted as an Enum by the settings package. Returns: name (`SpectrumChannelName`): The name of the channel, as assigned by the driver.""" return self._name @property def _number(self) -> int: return int(self.name.name.split(self._name_prefix)[-1]) def write_to_parent_device_register( self, spectrum_register: int, value: int, length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO, ) -> None: self._parent_device.write_to_spectrum_device_register(spectrum_register, value, length) def read_parent_device_register( self, spectrum_register: int, length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO, ) -> int: return self._parent_device.read_spectrum_device_register(spectrum_register, length) def __eq__(self, other: object) -> bool: if isinstance(other, AbstractSpectrumChannel): return (self.name == other.name) and (self._parent_device == other._parent_device) else: raise NotImplementedError() def __str__(self) -> str: return f"{self._name.name} of {self._parent_device}" def __repr__(self) -> str: return str(self)
Partially implemented abstract superclass contain code common for controlling an individual channel or IO Line of all spectrum devices.
The identifier assigned by the spectrum driver, formatted as an Enum by the settings package.
Returns
name (
SpectrumChannelName
): The name of the channel, as assigned by the driver.
View Source
def write_to_parent_device_register( self, spectrum_register: int, value: int, length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO, ) -> None: self._parent_device.write_to_spectrum_device_register(spectrum_register, value, length)
View Source
def read_parent_device_register( self, spectrum_register: int, length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO, ) -> int: return self._parent_device.read_spectrum_device_register(spectrum_register, length)
View Source
"""Provides Enums and Dataclasses wrapping the register values provided by the Spectrum API, to be used for configuring hardware and interpreting responses received from hardware.""" # Christian Baker, King's College London # Copyright (c) 2021 School of Biomedical Engineering & Imaging Sciences, King's College London # Licensed under the MIT. You may obtain a copy at https://opensource.org/licenses/MIT. from dataclasses import dataclass from enum import Enum from typing import List, Optional from numpy import int16 from numpy.typing import NDArray from spectrumdevice.settings.card_dependent_properties import ModelNumber from spectrumdevice.settings.card_features import CardFeature, AdvancedCardFeature from spectrumdevice.settings.channel import ( InputImpedance, InputCoupling, InputPath, OutputChannelFilter, OutputChannelStopLevelMode, ) from spectrumdevice.settings.device_modes import AcquisitionMode, ClockMode, GenerationMode from spectrumdevice.settings.io_lines import IOLineMode, AvailableIOModes from spectrumdevice.settings.transfer_buffer import ( TransferBuffer, ) from spectrumdevice.settings.triggering import TriggerSource, ExternalTriggerMode from spectrumdevice.settings.status import CARD_STATUS_TYPE, DEVICE_STATUS_TYPE, StatusCode from spectrumdevice.settings.pulse_generator import ( PulseGeneratorTriggerSettings, PulseGeneratorTriggerMode, PulseGeneratorTriggerDetectionMode, PulseGeneratorMultiplexer1TriggerSource, PulseGeneratorMultiplexer2TriggerSource, PulseGeneratorOutputSettings, ) __all__ = [ "AcquisitionSettings", "TriggerSettings", "AcquisitionMode", "ClockMode", "CardFeature", "AdvancedCardFeature", "IOLineMode", "AvailableIOModes", "TransferBuffer", "TriggerSource", "ExternalTriggerMode", "CARD_STATUS_TYPE", "DEVICE_STATUS_TYPE", "StatusCode", "SpectrumRegisterLength", "ModelNumber", "GenerationSettings", "OutputChannelFilter", "OutputChannelStopLevelMode", "GenerationMode", "PulseGeneratorTriggerSettings", "PulseGeneratorTriggerMode", "PulseGeneratorTriggerDetectionMode", "PulseGeneratorMultiplexer1TriggerSource", "PulseGeneratorMultiplexer2TriggerSource", "PulseGeneratorOutputSettings", ] @dataclass class TriggerSettings: """A dataclass collecting all settings related to triggering generation and acquisition. See Spectrum documentation. Note that pulse generators have their own trigger options.""" trigger_sources: List[TriggerSource] """The trigger sources to enable""" external_trigger_mode: Optional[ExternalTriggerMode] = None """The external trigger mode (if an external trigger is enabled).""" external_trigger_level_in_mv: Optional[int] = None """The level an external signal must reach to cause a trigger event (if an external trigger is enabled).""" external_trigger_pulse_width_in_samples: Optional[int] = None """The required width of an external trigger pulse (if an external trigger is enabled).""" @dataclass class AcquisitionSettings: """A dataclass collecting all settings required to configure an acquisition. See Spectrum documentation.""" acquisition_mode: AcquisitionMode """Standard Single mode, Multi FIF mode or an averaging mode.""" sample_rate_in_hz: int """Acquisition rate in samples per second.""" acquisition_length_in_samples: int """The length of the recording in samples per channel.""" pre_trigger_length_in_samples: int """The number of samples of the recording that will have been acquired before the trigger event.""" timeout_in_ms: int """How long to wait for a trigger event before timing out.""" enabled_channels: List[int] """The channel indices to enable for the acquisition.""" vertical_ranges_in_mv: List[int] """The voltage range to apply to each enabled channel in mW.""" vertical_offsets_in_percent: List[int] """The DC offset to apply to each enabled channel as percentages of their vertical ranges.""" input_impedances: List[InputImpedance] """The input impedance settings to apply to each channel""" timestamping_enabled: bool """If True, Measurements will include the time at which the acquisition was triggered. Increases latency by ~10 ms. """ batch_size: int = 1 """The number of acquisitions to transfer to the PC before the resulting waveforms are returned by SpectrumDigitiserCard.get_waveforms().""" number_of_averages: int = 1 """If an averaging AcquisitionMode is selected, this defines the number of averages.""" input_couplings: Optional[List[InputCoupling]] = None """The coupling (AC or DC) to apply to each channel. Only available on some hardware, so default is None.""" input_paths: Optional[List[InputPath]] = None """The input path (HF or Buffered) to apply to each channel. Only available on some hardware, so default is None.""" @dataclass class GenerationSettings: """A dataclass collecting all settings required to configure signal generation. See Spectrum documentation.""" generation_mode: GenerationMode """SPC_REP_STD_SINGLE , SPC_REP_STD_SINGLERESTART""" waveform: NDArray[int16] """The waveform to generate.""" sample_rate_in_hz: int """Generation rate in samples per second.""" num_loops: int """In SPC_REP_STD_SINGLE mode: the number of times to repeat the waveform after a trigger is received. In SPC_REP_STD_SINGLERESTART: The number of times to wait for a trigger and generate waveform once.""" enabled_channels: list[int] """List of analog channel indices to enable for signal generation""" signal_amplitudes_in_mv: list[int] """The amplitude of each enabled channel.""" dc_offsets_in_mv: list[int] """The dc offset of each enabled channel.""" output_filters: list[OutputChannelFilter] """The output filter setting for each enabled channel.""" stop_level_modes: list[OutputChannelStopLevelMode] """The behavior of each enabled channel after the waveform ends.""" custom_stop_levels: Optional[list[Optional[int]]] = None """The stop level each channel will use it stop level mode is set to custom.""" class SpectrumRegisterLength(Enum): """Enum defining the possible lengths of a spectrum register.""" THIRTY_TWO = 0 """32 bit register""" SIXTY_FOUR = 1 """64 bit register""" def __repr__(self) -> str: return self.name
Provides Enums and Dataclasses wrapping the register values provided by the Spectrum API, to be used for configuring hardware and interpreting responses received from hardware.
View Source
"""Provides Enums and Dataclasses wrapping the register values provided by the Spectrum API, to be used for configuring hardware and interpreting responses received from hardware.""" # Christian Baker, King's College London # Copyright (c) 2024 School of Biomedical Engineering & Imaging Sciences, King's College London # Licensed under the MIT. You may obtain a copy at https://opensource.org/licenses/MIT.
Provides Enums and Dataclasses wrapping the register values provided by the Spectrum API, to be used for configuring hardware and interpreting responses received from hardware.
View Source
@dataclass class Measurement: """Measurement is a dataclass for storing a set of waveforms generated by a single acquisition, with a timestamp.""" waveforms: List[NDArray[float_]] """Contains the acquired waveforms as a list of 1D NumPy arrays""" timestamp: Optional[datetime] """The time at which the acquisition was triggered, as a datetime.datetime object"""
Measurement is a dataclass for storing a set of waveforms generated by a single acquisition, with a timestamp.
Contains the acquired waveforms as a list of 1D NumPy arrays
The time at which the acquisition was triggered, as a datetime.datetime object
View Source
class SpectrumAWGCard( AbstractSpectrumCard[SpectrumAWGAnalogChannelInterface, SpectrumAWGIOLineInterface], AbstractSpectrumAWG ): """Class for controlling individual Spectrum AWG cards.""" def _init_analog_channels(self) -> Sequence[SpectrumAWGAnalogChannelInterface]: num_modules = self.read_spectrum_device_register(SPC_MIINST_MODULES) num_channels_per_module = self.read_spectrum_device_register(SPC_MIINST_CHPERMODULE) total_channels = num_modules * num_channels_per_module return tuple([SpectrumAWGAnalogChannel(channel_number=n, parent_device=self) for n in range(total_channels)]) def _init_io_lines(self) -> Sequence[SpectrumAWGIOLineInterface]: if (self.model_number.value & TYP_SERIESMASK) == TYP_M2PEXPSERIES: return tuple([SpectrumAWGIOLine(channel_number=n, parent_device=self) for n in range(4)]) else: raise NotImplementedError("Don't know how many IO lines other types of card have. Only M2P series.") def transfer_waveform(self, waveform: NDArray[int16]) -> None: """ "Write an arbitrary waveform to the card's on-board memory. Args: waveform (NDArray[int16]): A numpy array of signed 16-bit integers representing the samples of the waveform to transfer. The amplitude and offset of the generated signals can be set per-channel (see SpectrumAWGAnalogChannel), so the waveform provided here should be scaled to the full range of int16 (i.e. -32767 to 32767). Must be at least 16 samples long. If the waveform length is not a multiple of 8 samples, the waveform will be zero-padded so its length is the next multiple of 8. """ if len(waveform) < 16: raise ValueError("Waveform must be at least 16 samples long") step_size = get_memsize_step_size(self._model_number) remainder = len(waveform) % step_size if remainder > 0: logger.warning( "Length of waveform transmitted to AWG is not a multiple of 8 samples. Waveform in card memory will be " "zero-padded to the next multiple of 8." ) coerced_mem_size = len(waveform) if remainder == 0 else len(waveform) + (step_size - remainder) buffer = transfer_buffer_factory( buffer_type=BufferType.SPCM_BUF_DATA, direction=BufferDirection.SPCM_DIR_PCTOCARD, size_in_samples=coerced_mem_size, bytes_per_sample=self.bytes_per_sample, ) buffer.data_array[:] = concatenate([waveform, zeros(coerced_mem_size - len(waveform), dtype=int16)]) self.define_transfer_buffer((buffer,)) self.write_to_spectrum_device_register(SPC_MEMSIZE, coerced_mem_size) self.start_transfer() self.wait_for_transfer_chunk_to_complete() def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None: """Provide a `TransferBuffer` object for transferring samples to the card. This is called internally when transfer_waveform is used to send a single waveform to the card. Args: buffer (Optional[List[`TransferBuffer`]]): A length-1 list containing a pre-constructed `TransferBuffer` The buffer should have buffer_type=BufferType.SPCM_BUF_DATA and BufferDirection.SPCM_DIR_PCTOCARD. The size of the buffer should be chosen according to the length of the data to transfer. """ if buffer is None: raise ValueError( "You must provide a preconfigured buffer for transferring samples to an AWG because the" "buffer size cannot be inferred." ) self._transfer_buffer = buffer[0] set_transfer_buffer(self._handle, self._transfer_buffer)
Class for controlling individual Spectrum AWG cards.
View Source
def transfer_waveform(self, waveform: NDArray[int16]) -> None: """ "Write an arbitrary waveform to the card's on-board memory. Args: waveform (NDArray[int16]): A numpy array of signed 16-bit integers representing the samples of the waveform to transfer. The amplitude and offset of the generated signals can be set per-channel (see SpectrumAWGAnalogChannel), so the waveform provided here should be scaled to the full range of int16 (i.e. -32767 to 32767). Must be at least 16 samples long. If the waveform length is not a multiple of 8 samples, the waveform will be zero-padded so its length is the next multiple of 8. """ if len(waveform) < 16: raise ValueError("Waveform must be at least 16 samples long") step_size = get_memsize_step_size(self._model_number) remainder = len(waveform) % step_size if remainder > 0: logger.warning( "Length of waveform transmitted to AWG is not a multiple of 8 samples. Waveform in card memory will be " "zero-padded to the next multiple of 8." ) coerced_mem_size = len(waveform) if remainder == 0 else len(waveform) + (step_size - remainder) buffer = transfer_buffer_factory( buffer_type=BufferType.SPCM_BUF_DATA, direction=BufferDirection.SPCM_DIR_PCTOCARD, size_in_samples=coerced_mem_size, bytes_per_sample=self.bytes_per_sample, ) buffer.data_array[:] = concatenate([waveform, zeros(coerced_mem_size - len(waveform), dtype=int16)]) self.define_transfer_buffer((buffer,)) self.write_to_spectrum_device_register(SPC_MEMSIZE, coerced_mem_size) self.start_transfer() self.wait_for_transfer_chunk_to_complete()
"Write an arbitrary waveform to the card's on-board memory.
Args
- waveform (NDArray[int16]): A numpy array of signed 16-bit integers representing the samples of the waveform to transfer. The amplitude and offset of the generated signals can be set per-channel (see SpectrumAWGAnalogChannel), so the waveform provided here should be scaled to the full range of int16 (i.e. -32767 to 32767). Must be at least 16 samples long. If the waveform length is not a multiple of 8 samples, the waveform will be zero-padded so its length is the next multiple of 8.
View Source
def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None: """Provide a `TransferBuffer` object for transferring samples to the card. This is called internally when transfer_waveform is used to send a single waveform to the card. Args: buffer (Optional[List[`TransferBuffer`]]): A length-1 list containing a pre-constructed `TransferBuffer` The buffer should have buffer_type=BufferType.SPCM_BUF_DATA and BufferDirection.SPCM_DIR_PCTOCARD. The size of the buffer should be chosen according to the length of the data to transfer. """ if buffer is None: raise ValueError( "You must provide a preconfigured buffer for transferring samples to an AWG because the" "buffer size cannot be inferred." ) self._transfer_buffer = buffer[0] set_transfer_buffer(self._handle, self._transfer_buffer)
Provide a TransferBuffer
object for transferring samples to the card. This is called internally when
transfer_waveform is used to send a single waveform to the card.
Args
- buffer (Optional[List[
TransferBuffer
]]): A length-1 list containing a pre-constructedTransferBuffer
The buffer should have buffer_type=BufferType.SPCM_BUF_DATA and BufferDirection.SPCM_DIR_PCTOCARD. The size of the buffer should be chosen according to the length of the data to transfer.
Inherited Members
- AbstractSpectrumCard
- AbstractSpectrumCard
- model_number
- reconnect
- status
- start_transfer
- stop_transfer
- wait_for_transfer_chunk_to_complete
- transfer_buffers
- disconnect
- connected
- analog_channels
- io_lines
- enabled_analog_channel_nums
- set_enabled_analog_channels
- trigger_sources
- set_trigger_sources
- external_trigger_mode
- set_external_trigger_mode
- external_trigger_level_in_mv
- set_external_trigger_level_in_mv
- external_trigger_pulse_width_in_samples
- set_external_trigger_pulse_width_in_samples
- apply_channel_enabling
- timeout_in_ms
- set_timeout_in_ms
- clock_mode
- set_clock_mode
- available_io_modes
- feature_list
- sample_rate_in_hz
- set_sample_rate_in_hz
- type
- force_trigger
- bytes_per_sample
- spectrumdevice.devices.awg.abstract_spectrum_awg.AbstractSpectrumAWG
- configure_generation
- generation_mode
- set_generation_mode
- num_loops
- set_num_loops
View Source
class MockSpectrumAWGCard(MockAbstractSpectrumAWG, MockAbstractSpectrumCard, SpectrumAWGCard): """A mock AWG card.""" def __init__( self, device_number: int, model: ModelNumber, num_modules: int, num_channels_per_module: int, card_features: Optional[list[CardFeature]] = None, advanced_card_features: Optional[list[AdvancedCardFeature]] = None, ) -> None: """ Args: device_number (int): The index of the mock device to create. Used to create a name for the device which is used internally. model (ModelNumber): The model of card to mock. num_modules (int): The number of internal modules to assign the mock card. Default 2. On real hardware, this is read from the device so does not need to be set. See the Spectrum documentation to work out how many modules your hardware has. num_channels_per_module (int): The number of channels per module. Default 4 (so 8 channels in total). On real hardware, this is read from the device so does not need to be set. card_features (list[CardFeature]): List of available features of the mock device advanced_card_features (list[AdvancedCardFeature]): List of available advanced features of the mock device """ super().__init__( card_type=CardType.SPCM_TYPE_AO, device_number=device_number, model=model, num_modules=num_modules, num_channels_per_module=num_channels_per_module, card_features=card_features if card_features is not None else [], advanced_card_features=advanced_card_features if advanced_card_features is not None else [], ) self._connect(self._visa_string) def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None: """Create or provide a `TransferBuffer` object for transferring samples from the device. See SpectrumAWGCard.define_transfer_buffer(). This mock implementation is identical apart from that it does not write to any hardware device.""" if buffer is None: raise ValueError( "You must provide a preconfigured buffer for transferring samples to an AWG because the" "buffer size cannot be inferred." ) self._transfer_buffer = buffer[0]
A mock AWG card.
View Source
def __init__( self, device_number: int, model: ModelNumber, num_modules: int, num_channels_per_module: int, card_features: Optional[list[CardFeature]] = None, advanced_card_features: Optional[list[AdvancedCardFeature]] = None, ) -> None: """ Args: device_number (int): The index of the mock device to create. Used to create a name for the device which is used internally. model (ModelNumber): The model of card to mock. num_modules (int): The number of internal modules to assign the mock card. Default 2. On real hardware, this is read from the device so does not need to be set. See the Spectrum documentation to work out how many modules your hardware has. num_channels_per_module (int): The number of channels per module. Default 4 (so 8 channels in total). On real hardware, this is read from the device so does not need to be set. card_features (list[CardFeature]): List of available features of the mock device advanced_card_features (list[AdvancedCardFeature]): List of available advanced features of the mock device """ super().__init__( card_type=CardType.SPCM_TYPE_AO, device_number=device_number, model=model, num_modules=num_modules, num_channels_per_module=num_channels_per_module, card_features=card_features if card_features is not None else [], advanced_card_features=advanced_card_features if advanced_card_features is not None else [], ) self._connect(self._visa_string)
Args
- device_number (int): The index of the mock device to create. Used to create a name for the device which is used internally.
- model (ModelNumber): The model of card to mock.
- num_modules (int): The number of internal modules to assign the mock card. Default 2. On real hardware, this is read from the device so does not need to be set. See the Spectrum documentation to work out how many modules your hardware has.
- num_channels_per_module (int): The number of channels per module. Default 4 (so 8 channels in total). On real hardware, this is read from the device so does not need to be set.
- card_features (list[CardFeature]): List of available features of the mock device
- advanced_card_features (list[AdvancedCardFeature]): List of available advanced features of the mock device
View Source
def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None: """Create or provide a `TransferBuffer` object for transferring samples from the device. See SpectrumAWGCard.define_transfer_buffer(). This mock implementation is identical apart from that it does not write to any hardware device.""" if buffer is None: raise ValueError( "You must provide a preconfigured buffer for transferring samples to an AWG because the" "buffer size cannot be inferred." ) self._transfer_buffer = buffer[0]
Create or provide a TransferBuffer
object for transferring samples from the device.
See SpectrumAWGCard.define_transfer_buffer(). This mock implementation is identical apart from that it does not write to any hardware device.
Inherited Members
- spectrumdevice.devices.mocks.mock_abstract_devices.MockAbstractSpectrumDevice
- write_to_spectrum_device_register
- read_spectrum_device_register
- AbstractSpectrumCard
- model_number
- reconnect
- status
- start_transfer
- stop_transfer
- wait_for_transfer_chunk_to_complete
- transfer_buffers
- disconnect
- connected
- analog_channels
- io_lines
- enabled_analog_channel_nums
- set_enabled_analog_channels
- trigger_sources
- set_trigger_sources
- external_trigger_mode
- set_external_trigger_mode
- external_trigger_level_in_mv
- set_external_trigger_level_in_mv
- external_trigger_pulse_width_in_samples
- set_external_trigger_pulse_width_in_samples
- apply_channel_enabling
- timeout_in_ms
- set_timeout_in_ms
- clock_mode
- set_clock_mode
- available_io_modes
- feature_list
- sample_rate_in_hz
- set_sample_rate_in_hz
- type
- force_trigger
- bytes_per_sample
- spectrumdevice.devices.awg.abstract_spectrum_awg.AbstractSpectrumAWG
- configure_generation
- generation_mode
- set_generation_mode
- num_loops
- set_num_loops
View Source
class SpectrumAWGAnalogChannel(AbstractSpectrumAnalogChannel, SpectrumAWGAnalogChannelInterface): """Class for controlling analog channels of an AWG.""" def __init__(self, parent_device: SpectrumAWGInterface, **kwargs: Any) -> None: if parent_device.type != CardType.SPCM_TYPE_AO: raise SpectrumCardIsNotAnAWG(parent_device.type) super().__init__(parent_device=parent_device, **kwargs) # pass unused args up the inheritance hierarchy def _get_settings_as_dict(self) -> dict: return { SpectrumAWGAnalogChannel.signal_amplitude_in_mv.__name__: self.signal_amplitude_in_mv, SpectrumAWGAnalogChannel.dc_offset_in_mv.__name__: self.dc_offset_in_mv, SpectrumAWGAnalogChannel.output_filter.__name__: self.output_filter, SpectrumAWGAnalogChannel.stop_level_mode.__name__: self.stop_level_mode, SpectrumAWGAnalogChannel.stop_level_custom_value.__name__: self.stop_level_custom_value, } def _set_settings_from_dict(self, settings: dict) -> None: self.set_signal_amplitude_in_mv(settings[SpectrumAWGAnalogChannel.signal_amplitude_in_mv.__name__]) self.set_dc_offset_in_mv(settings[SpectrumAWGAnalogChannel.dc_offset_in_mv.__name__]) self.set_output_filter(settings[SpectrumAWGAnalogChannel.output_filter.__name__]) self.set_stop_level_mode(settings[SpectrumAWGAnalogChannel.stop_level_mode.__name__]) self.set_stop_level_custom_value(settings[SpectrumAWGAnalogChannel.stop_level_custom_value.__name__]) @property def is_switched_on(self) -> bool: """Returns "True" if the output channel is switched on, or "False" if it is muted.""" return bool(self._parent_device.read_spectrum_device_register(OUTPUT_CHANNEL_ENABLED_COMMANDS[self._number])) def set_is_switched_on(self, is_enabled: bool) -> None: """Switches the output channel on ("True") or off ("False").""" self._parent_device.write_to_spectrum_device_register( OUTPUT_CHANNEL_ENABLED_COMMANDS[self._number], int(is_enabled) ) @property def dc_offset_in_mv(self) -> int: """The current output signal DC offset in mV. Returns: dc_offset (int): The currently set output signal DC offset in mV. """ return self._parent_device.read_spectrum_device_register(OUTPUT_DC_OFFSET_COMMANDS[self._number]) def set_dc_offset_in_mv(self, dc_offset: int) -> None: if dc_offset > OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]: raise ValueError( f"Max allowed signal DC offset for card {self._parent_device.model_number} is " f"{OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]} mV, " f"so {dc_offset} mV is too high." ) self._parent_device.write_to_spectrum_device_register(OUTPUT_DC_OFFSET_COMMANDS[self._number], dc_offset) @property def signal_amplitude_in_mv(self) -> int: """The current output signal amplitude in mV. Returns: amplitude (int): The currently set output signal amplitude in mV. """ return self._parent_device.read_spectrum_device_register(OUTPUT_AMPLITUDE_COMMANDS[self._number]) def set_signal_amplitude_in_mv(self, amplitude: int) -> None: if amplitude > OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]: raise ValueError( f"Max allowed signal amplitude for card {self._parent_device.model_number} is " f"{OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]} mV, " f"so {amplitude} mV is too high." ) self._parent_device.write_to_spectrum_device_register(OUTPUT_AMPLITUDE_COMMANDS[self._number], amplitude) @property def output_filter(self) -> OutputChannelFilter: """The current output filter setting. Returns: output_filter (OutputChannelFilter): The currently set output filter. """ return OutputChannelFilter( self._parent_device.read_spectrum_device_register(OUTPUT_FILTER_COMMANDS[self._number]) ) def set_output_filter(self, output_filter: OutputChannelFilter) -> None: self._parent_device.write_to_spectrum_device_register(OUTPUT_FILTER_COMMANDS[self._number], output_filter.value) @property def stop_level_mode(self) -> OutputChannelStopLevelMode: """Sets the behavior of the channel when the output is stopped or playback finished.""" return OutputChannelStopLevelMode( self._parent_device.read_spectrum_device_register(OUTPUT_STOP_LEVEL_MODE_COMMANDS[self._number]) ) def set_stop_level_mode(self, mode: OutputChannelStopLevelMode) -> None: self._parent_device.write_to_spectrum_device_register(OUTPUT_STOP_LEVEL_MODE_COMMANDS[self._number], mode.value) @property def stop_level_custom_value(self) -> int16: """Sets the level to which the output will be set when the output is stopped or playback finished and stop_level_mode is set to `OutputChannelStopLevelMode.SPCM_STOPLVL_CUSTOM`.""" return int16( self._parent_device.read_spectrum_device_register(OUTPUT_STOP_LEVEL_CUSTOM_VALUE_COMMANDS[self._number]) ) def set_stop_level_custom_value(self, value: int16) -> None: self._parent_device.write_to_spectrum_device_register( OUTPUT_STOP_LEVEL_CUSTOM_VALUE_COMMANDS[self._number], int(value) )
Class for controlling analog channels of an AWG.
View Source
def __init__(self, parent_device: SpectrumAWGInterface, **kwargs: Any) -> None: if parent_device.type != CardType.SPCM_TYPE_AO: raise SpectrumCardIsNotAnAWG(parent_device.type) super().__init__(parent_device=parent_device, **kwargs) # pass unused args up the inheritance hierarchy
Returns "True" if the output channel is switched on, or "False" if it is muted.
View Source
def set_is_switched_on(self, is_enabled: bool) -> None: """Switches the output channel on ("True") or off ("False").""" self._parent_device.write_to_spectrum_device_register( OUTPUT_CHANNEL_ENABLED_COMMANDS[self._number], int(is_enabled) )
Switches the output channel on ("True") or off ("False").
The current output signal DC offset in mV.
Returns
dc_offset (int): The currently set output signal DC offset in mV.
View Source
def set_dc_offset_in_mv(self, dc_offset: int) -> None: if dc_offset > OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]: raise ValueError( f"Max allowed signal DC offset for card {self._parent_device.model_number} is " f"{OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]} mV, " f"so {dc_offset} mV is too high." ) self._parent_device.write_to_spectrum_device_register(OUTPUT_DC_OFFSET_COMMANDS[self._number], dc_offset)
The current output signal amplitude in mV.
Returns
amplitude (int): The currently set output signal amplitude in mV.
View Source
def set_signal_amplitude_in_mv(self, amplitude: int) -> None: if amplitude > OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]: raise ValueError( f"Max allowed signal amplitude for card {self._parent_device.model_number} is " f"{OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]} mV, " f"so {amplitude} mV is too high." ) self._parent_device.write_to_spectrum_device_register(OUTPUT_AMPLITUDE_COMMANDS[self._number], amplitude)
The current output filter setting.
Returns
output_filter (OutputChannelFilter): The currently set output filter.
View Source
def set_output_filter(self, output_filter: OutputChannelFilter) -> None: self._parent_device.write_to_spectrum_device_register(OUTPUT_FILTER_COMMANDS[self._number], output_filter.value)
Sets the behavior of the channel when the output is stopped or playback finished.
View Source
def set_stop_level_mode(self, mode: OutputChannelStopLevelMode) -> None: self._parent_device.write_to_spectrum_device_register(OUTPUT_STOP_LEVEL_MODE_COMMANDS[self._number], mode.value)
Sets the level to which the output will be set when the output is stopped or playback finished and
stop_level_mode is set to OutputChannelStopLevelMode.SPCM_STOPLVL_CUSTOM
.
View Source
def set_stop_level_custom_value(self, value: int16) -> None: self._parent_device.write_to_spectrum_device_register( OUTPUT_STOP_LEVEL_CUSTOM_VALUE_COMMANDS[self._number], int(value) )
Inherited Members
- spectrumdevice.devices.abstract_device.channel_interfaces.SpectrumAnalogChannelInterface
- copy_settings_from_other_channel
View Source
class SpectrumAWGIOLine(AbstractSpectrumIOLine, SpectrumAWGIOLineInterface): """Class for controlling Multipurpose IO lines of an AWG (e.g. X0, X1, X2 and X3)""" def __init__(self, parent_device: AbstractSpectrumCard, **kwargs: Any) -> None: if parent_device.type != CardType.SPCM_TYPE_AO: raise SpectrumCardIsNotAnAWG(parent_device.type) super().__init__(parent_device=parent_device, **kwargs) # pass unused args up the inheritance hierarchy self._dig_out_settings = DigOutIOLineModeSettings( source_channel=DigOutSourceChannel.SPCM_XMODE_DIGOUTSRC_CH0, source_bit=DigOutSourceBit.SPCM_XMODE_DIGOUTSRC_BIT15, ) @property def dig_out_settings(self) -> DigOutIOLineModeSettings: return self._dig_out_settings def set_dig_out_settings(self, dig_out_settings: DigOutIOLineModeSettings) -> None: self._dig_out_settings = dig_out_settings def _get_io_line_mode_settings_mask(self, mode: IOLineMode) -> int: if mode == IOLineMode.SPCM_XMODE_DIGOUT: return self._dig_out_settings.source_channel.value | self._dig_out_settings.source_bit.value else: return 0
Class for controlling Multipurpose IO lines of an AWG (e.g. X0, X1, X2 and X3)
View Source
def __init__(self, parent_device: AbstractSpectrumCard, **kwargs: Any) -> None: if parent_device.type != CardType.SPCM_TYPE_AO: raise SpectrumCardIsNotAnAWG(parent_device.type) super().__init__(parent_device=parent_device, **kwargs) # pass unused args up the inheritance hierarchy self._dig_out_settings = DigOutIOLineModeSettings( source_channel=DigOutSourceChannel.SPCM_XMODE_DIGOUTSRC_CH0, source_bit=DigOutSourceBit.SPCM_XMODE_DIGOUTSRC_BIT15, )
View Source
def set_dig_out_settings(self, dig_out_settings: DigOutIOLineModeSettings) -> None: self._dig_out_settings = dig_out_settings
Inherited Members
- spectrumdevice.devices.abstract_device.abstract_spectrum_io_line.AbstractSpectrumIOLine
- mode
- set_mode
- pulse_generator
View Source
class PulseGenerator(PulseGeneratorInterface): """Class for controlling pulse generators associated with IO lines (requires firmware option be enabled).""" def __init__(self, parent: SpectrumIOLineInterface): self._parent_io_line = parent # last char of IO line name is IO line chanel number, which is used to set pulse generator number self._number = int(parent.name.name[-1]) available_advanced_features = decode_advanced_card_features( self.read_parent_device_register(SPC_PCIEXTFEATURES) ) if AdvancedCardFeature.SPCM_FEAT_EXTFW_PULSEGEN not in available_advanced_features: raise SpectrumFeatureNotSupportedByCard( call_description=self.__str__() + ".__init__()", message="Pulse generator firmware option not installed on device.", ) self._multiplexer_1 = PulseGeneratorMultiplexer1(parent=self) self._multiplexer_2 = PulseGeneratorMultiplexer2(parent=self) def configure_output( self, settings: PulseGeneratorOutputSettings, coerce: bool = True ) -> PulseGeneratorOutputSettings: """Configure all pulse generator output settings at once. By default, all values are coerced to the nearest values allowed by the hardware, and the coerced values are returned.""" self.set_output_inversion(settings.output_inversion) coerced_settings = PulseGeneratorOutputSettings( period_in_seconds=self.set_period_in_seconds(settings.period_in_seconds, coerce=coerce), duty_cycle=self.set_duty_cycle(settings.duty_cycle, coerce=coerce), num_pulses=self.set_num_pulses(settings.num_pulses, coerce=coerce), delay_in_seconds=self.set_delay_in_seconds(settings.delay_in_seconds, coerce=coerce), output_inversion=settings.output_inversion, ) self.write_to_parent_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP) return coerced_settings def configure_trigger(self, settings: PulseGeneratorTriggerSettings) -> None: """Configure all pulse generator trigger settings at once.""" self.set_trigger_mode(settings.trigger_mode) self.set_trigger_detection_mode(settings.trigger_detection_mode) self.multiplexer_1.set_trigger_source(settings.multiplexer_1_source) self.multiplexer_2.set_trigger_source(settings.multiplexer_2_source) self.multiplexer_1.set_output_inversion(settings.multiplexer_1_output_inversion) self.multiplexer_2.set_output_inversion(settings.multiplexer_2_output_inversion) self.write_to_parent_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP) def force_trigger(self) -> None: """Generates a pulse when the pulse generator trigger source (mux 2) is set to 'software'.""" if ( self._multiplexer_2.trigger_source != PulseGeneratorMultiplexer2TriggerSource.SPCM_PULSEGEN_MUX2_SRC_SOFTWARE ): raise SpectrumIOError("Force trigger can only be used if trigger source (mux 2) is set to 'software'") self.write_to_parent_device_register(SPC_XIO_PULSEGEN_COMMAND, SPCM_PULSEGEN_CMD_FORCE) @property def number(self) -> int: """The index of the pulse generator. Corresponds to the index of the IO line to which it belongs.""" return self._number @property def multiplexer_1(self) -> PulseGeneratorMultiplexer1: """Change the trigger source of this multiplexer to control when it is possible to trigger the pulse generator.""" return self._multiplexer_1 @property def multiplexer_2(self) -> PulseGeneratorMultiplexer2: """Change the trigger source of this multiplexer to control how the pulse generator is triggered.""" return self._multiplexer_2 def read_parent_device_register( self, spectrum_register: int, length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO ) -> int: return self._parent_io_line.read_parent_device_register(spectrum_register, length) def write_to_parent_device_register( self, spectrum_register: int, value: int, length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO, ) -> None: self._parent_io_line.write_to_parent_device_register(spectrum_register, value, length) def _convert_clock_cycles_to_seconds(self, clock_cycles: int) -> float: return clock_cycles * self.clock_period_in_seconds def _convert_seconds_to_clock_cycles(self, seconds: float) -> float: # round to nearest milli-cycle to avoid floating point precision problems return round(seconds * self.clock_rate_in_hz * 1e3) / 1e3 def _get_enabled_pulse_generator_ids(self) -> list[int]: return decode_enabled_pulse_gens(self.read_parent_device_register(SPC_XIO_PULSEGEN_ENABLE)) @property def clock_rate_in_hz(self) -> int: """The current pulse generator clock rate. Affected by the sample rate of the parent card, and the number of channels enabled. Effects the precision with which pulse timings can be set, and their min and max values.""" return self.read_parent_device_register(SPC_XIO_PULSEGEN_CLOCK) @property def clock_period_in_seconds(self) -> float: """The reciprocal of the clock rate, in seconds.""" return 1 / self.clock_rate_in_hz @property def enabled(self) -> bool: """True if the pulse generator is currently enabled.""" return PULSE_GEN_ENABLE_COMMANDS[self._number] in self._get_enabled_pulse_generator_ids() def enable(self) -> None: """Enable the pulse generator. Note that the mode of the parent IO Line must also be set to IOLineMOdO.SPCM_XMODE_PULSEGEN.""" current_register_value = self.read_parent_device_register(SPC_XIO_PULSEGEN_ENABLE) new_register_value = toggle_bitmap_value(current_register_value, PULSE_GEN_ENABLE_COMMANDS[self._number], True) self.write_to_parent_device_register(SPC_XIO_PULSEGEN_ENABLE, new_register_value) def disable(self) -> None: """Disable the pulse generator.""" current_register_value = self.read_parent_device_register(SPC_XIO_PULSEGEN_ENABLE) new_register_value = toggle_bitmap_value(current_register_value, PULSE_GEN_ENABLE_COMMANDS[self._number], False) self.write_to_parent_device_register(SPC_XIO_PULSEGEN_ENABLE, new_register_value) @property def output_inversion(self) -> bool: currently_enabled_config_options = decode_pulse_gen_config( self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number]) ) return SPCM_PULSEGEN_CONFIG_INVERT in currently_enabled_config_options def set_output_inversion(self, inverted: bool) -> None: current_register_value = self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number]) new_register_value = toggle_bitmap_value(current_register_value, SPCM_PULSEGEN_CONFIG_INVERT, inverted) self.write_to_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number], new_register_value) @property def trigger_detection_mode(self) -> PulseGeneratorTriggerDetectionMode: """How the pulse generator trigger circuit responds to a trigger signal, .e.g rising edge...""" currently_enabled_config_options = decode_pulse_gen_config( self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number]) ) if PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH.value in currently_enabled_config_options: return PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH else: return PulseGeneratorTriggerDetectionMode.RISING_EDGE def set_trigger_detection_mode(self, mode: PulseGeneratorTriggerDetectionMode) -> None: """e.g. rising edge, high-voltage...""" current_register_value = self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number]) high_voltage_mode_value = PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH.value new_register_value = toggle_bitmap_value( current_register_value, high_voltage_mode_value, mode == PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH, ) self.write_to_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number], new_register_value) @property def trigger_mode(self) -> PulseGeneratorTriggerMode: """Gated, triggered or single-shot. See PulseGeneratorTriggerMode for more information.""" return PulseGeneratorTriggerMode( self.read_parent_device_register(PULSE_GEN_TRIGGER_MODE_COMMANDS[self._number]) ) def set_trigger_mode(self, mode: PulseGeneratorTriggerMode) -> None: """Gated, triggered or single-shot. See PulseGeneratorTriggerMode for more information.""" self.write_to_parent_device_register(PULSE_GEN_TRIGGER_MODE_COMMANDS[self._number], mode.value) @property def min_allowed_period_in_seconds(self) -> float: """Minimum allowed pulse period in seconds, given the current clock rate.""" reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLEN_MIN) reg_val = 0 if reg_val < 0 else reg_val return self._convert_clock_cycles_to_seconds(reg_val) @property def max_allowed_period_in_seconds(self) -> float: """Maximum allowed pulse period in seconds, given the current clock rate.""" reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLEN_MAX) reg_val = iinfo(int16).max if reg_val < 0 else reg_val return self._convert_clock_cycles_to_seconds(reg_val) @property def _allowed_period_step_size_in_clock_cycles(self) -> int: return self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLEN_STEP) @property def allowed_period_step_size_in_seconds(self) -> float: """Resolution with which the pulse period can be set, given the current clock rate.""" return self._convert_clock_cycles_to_seconds(self._allowed_period_step_size_in_clock_cycles) @property def period_in_seconds(self) -> float: """The pulse length in seconds, including both the high-voltage and low-voltage sections.""" return self._convert_clock_cycles_to_seconds( self.read_parent_device_register(PULSE_GEN_PULSE_PERIOD_COMMANDS[self._number]) ) def set_period_in_seconds(self, period: float, coerce: bool = False) -> float: """Set the time between the start of each generated pulse in seconds. If coerce is True, the requested value will be coerced according to min_allowed_period_in_seconds, max_allowed_period_in_seconds and allowed_period_step_size_in_seconds and the coerced value is returned. Otherwise, when an invalid value is requested a SpectrumInvalidParameterValue will be raised. The allowed values are affected by the number of active channels and the sample rate.""" period_in_clock_cycles = self._convert_seconds_to_clock_cycles(period) coerced_period = _coerce_fractional_value_to_allowed_integer( period_in_clock_cycles, int(self._convert_seconds_to_clock_cycles(self.min_allowed_period_in_seconds)), int(self._convert_seconds_to_clock_cycles(self.max_allowed_period_in_seconds)), self._allowed_period_step_size_in_clock_cycles, ) if not coerce and coerced_period != period_in_clock_cycles: raise SpectrumInvalidParameterValue( "pulse generator period", period, self.min_allowed_period_in_seconds, self.max_allowed_period_in_seconds, self.allowed_period_step_size_in_seconds, ) self.write_to_parent_device_register(PULSE_GEN_PULSE_PERIOD_COMMANDS[self._number], int(coerced_period)) return self._convert_clock_cycles_to_seconds(coerced_period) @property def min_allowed_high_voltage_duration_in_seconds(self) -> float: """Minimum allowed duration of the high-voltage part of the pulse in seconds, given the current clock rate.""" reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILHIGH_MIN) reg_val = 0 if reg_val < 0 else reg_val return self._convert_clock_cycles_to_seconds(reg_val) @property def max_allowed_high_voltage_duration_in_seconds(self) -> float: """Maximum allowed duration of the high-voltage part of the pulse in seconds, given the current clock rate.""" reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILHIGH_MAX) reg_val = iinfo(int16).max if reg_val < 0 else reg_val return self._convert_clock_cycles_to_seconds(reg_val) @property def _allowed_high_voltage_duration_step_size_in_clock_cycles(self) -> int: return self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILHIGH_STEP) @property def allowed_high_voltage_duration_step_size_in_seconds(self) -> float: """Resolution with which the high-voltage duration can be set, in seconds, given the current clock rate.""" return self._convert_clock_cycles_to_seconds(self._allowed_high_voltage_duration_step_size_in_clock_cycles) @property def duration_of_high_voltage_in_seconds(self) -> float: """The length of the high-voltage part of a pulse, in seconds. Equal to the pulse duration * duty cycle.""" return self._convert_clock_cycles_to_seconds( self.read_parent_device_register(PULSE_GEN_HIGH_DURATION_COMMANDS[self._number]) ) @property def duration_of_low_voltage_in_seconds(self) -> float: """The length of the low-voltage part of a pulse, in seconds. Equal to the pulse duration * (1 - duty cycle).""" return self.period_in_seconds - self.duration_of_high_voltage_in_seconds @property def duty_cycle(self) -> float: """The ratio between the high-voltage and low-voltage parts of the pulse.""" return self.duration_of_high_voltage_in_seconds / self.period_in_seconds def set_duty_cycle(self, duty_cycle: float, coerce: bool = False) -> float: """Set the duty cycle. If coerce is True, the requested value will be coerced to be within allowed range and use allowed step size and then the coerced value wll be returned. Otherwise, when an invalid value is requested an SpectrumInvalidParameterValue will be raised. The allowed values are affected by the number of active channels and the sample rate. """ requested_high_v_duration_in_clock_cycles = self._convert_seconds_to_clock_cycles( self.period_in_seconds * duty_cycle ) clipped_duration = _coerce_fractional_value_to_allowed_integer( requested_high_v_duration_in_clock_cycles, int(self._convert_seconds_to_clock_cycles(self.min_allowed_high_voltage_duration_in_seconds)), int(self._convert_seconds_to_clock_cycles(self.max_allowed_high_voltage_duration_in_seconds)), self._allowed_high_voltage_duration_step_size_in_clock_cycles, ) if not coerce and clipped_duration != requested_high_v_duration_in_clock_cycles: raise SpectrumInvalidParameterValue( "high-voltage duration", self.period_in_seconds * duty_cycle, self.min_allowed_high_voltage_duration_in_seconds, self.max_allowed_high_voltage_duration_in_seconds, self.allowed_high_voltage_duration_step_size_in_seconds, ) self.write_to_parent_device_register(PULSE_GEN_HIGH_DURATION_COMMANDS[self._number], clipped_duration) return self._convert_clock_cycles_to_seconds(clipped_duration) / self.period_in_seconds @property def min_allowed_pulses(self) -> int: """Minimum allowed number of pulses to transmit.""" return self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLOOPS_MIN) @property def max_allowed_pulses(self) -> int: """Maximum allowed number of pulses to transmit.""" reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLOOPS_MAX) # my card has this register set to -2, which I assume means no limit (can't work it out from the docs) return reg_val if reg_val > 0 else iinfo(int16).max @property def allowed_num_pulses_step_size(self) -> int: """Resolution with which the number of pulses to transmit can be set.""" return self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLOOPS_STEP) @property def num_pulses(self) -> int: """The number of pulses to generate on receipt of a trigger. If 0, pulses will be generated continuously.""" return self.read_parent_device_register(PULSE_GEN_NUM_REPEATS_COMMANDS[self._number]) def set_num_pulses(self, num_pulses: int, coerce: bool = False) -> int: """Set the number of pulses to generate on receipt of a trigger. If 0 or negative, pulses will be generated continuously. If coerce if True, the requested number of pulses will be coerced according to min_allowed_pulses, max_allowed_pulses and allowed_num_pulses_step_size and the coerced value is returned. Otherwise, a SpectrumInvalidParameterValue exception is raised if an invalid number of pulses is requested.""" num_pulses = max(0, num_pulses) # make negative value 0 to enable continuous pulse generation coerced_num_pulses = _coerce_fractional_value_to_allowed_integer( float(num_pulses), self.min_allowed_pulses, self.max_allowed_pulses, self.allowed_num_pulses_step_size ) if not coerce and coerced_num_pulses != num_pulses: raise SpectrumInvalidParameterValue( "number of pulses", num_pulses, self.min_allowed_pulses, self.max_allowed_pulses, self.allowed_num_pulses_step_size, ) self.write_to_parent_device_register(PULSE_GEN_NUM_REPEATS_COMMANDS[self._number], coerced_num_pulses) return coerced_num_pulses @property def min_allowed_delay_in_seconds(self) -> float: """Minimum allowed delay between the trigger event and pulse generation, in seconds, given the current clock rate.""" reg_value = self.read_parent_device_register(602007) # SPC_XIO_PULSEGEN_AVAILDELAY_MIN not in regs.py reg_value = 0 if reg_value == -1 else reg_value return self._convert_clock_cycles_to_seconds(reg_value) @property def max_allowed_delay_in_seconds(self) -> float: """Maximum allowed delay between the trigger event and pulse generation, in seconds, given the current clock rate.""" reg_value = self.read_parent_device_register(602008) # SPC_XIO_PULSEGEN_AVAILDELAY_MAX not in regs.py reg_value = iinfo(int16).max if reg_value == -1 else reg_value return self._convert_clock_cycles_to_seconds(reg_value) @property def allowed_delay_step_size_in_seconds(self) -> float: """resolution with which the delay between the trigger event and pulse generation can be set, in seconds, given the current clock rate.""" return self._convert_clock_cycles_to_seconds( self.read_parent_device_register(602009) # SPC_XIO_PULSEGEN_AVAILDELAY_STEP not in regs.py ) @property def delay_in_seconds(self) -> float: """The delay between the trigger and the first pulse transmission""" return self._convert_clock_cycles_to_seconds( self.read_parent_device_register(PULSE_GEN_DELAY_COMMANDS[self._number]) ) def set_delay_in_seconds(self, delay_in_seconds: float, coerce: bool = False) -> float: """Set the delay between the trigger and the first pulse transmission. If coerce=True, the requested value is coerced according to min_allowed_delay_in_seconds, max_allowed_delay_in_seconds and allowed_delay_step_size_in_seconds, and then the coerced value is returned. Otherwise, an ValueError is raised if the requested value is invalid.""" requested_delay_in_clock_cycles = self._convert_seconds_to_clock_cycles(delay_in_seconds) clipped_delay_in_clock_cycles = _coerce_fractional_value_to_allowed_integer( requested_delay_in_clock_cycles, int(self._convert_seconds_to_clock_cycles(self.min_allowed_delay_in_seconds)), int(self._convert_seconds_to_clock_cycles(self.max_allowed_delay_in_seconds)), int(self._convert_seconds_to_clock_cycles(self.allowed_delay_step_size_in_seconds)), ) if not coerce and clipped_delay_in_clock_cycles != requested_delay_in_clock_cycles: raise SpectrumInvalidParameterValue( "delay in seconds", requested_delay_in_clock_cycles, self.min_allowed_delay_in_seconds, self.max_allowed_delay_in_seconds, self.allowed_delay_step_size_in_seconds, ) self.write_to_parent_device_register(PULSE_GEN_DELAY_COMMANDS[self._number], clipped_delay_in_clock_cycles) return self._convert_clock_cycles_to_seconds(clipped_delay_in_clock_cycles) def __str__(self) -> str: return f"Pulse generator {self._number} of {self._parent_io_line}."
Class for controlling pulse generators associated with IO lines (requires firmware option be enabled).
View Source
def __init__(self, parent: SpectrumIOLineInterface): self._parent_io_line = parent # last char of IO line name is IO line chanel number, which is used to set pulse generator number self._number = int(parent.name.name[-1]) available_advanced_features = decode_advanced_card_features( self.read_parent_device_register(SPC_PCIEXTFEATURES) ) if AdvancedCardFeature.SPCM_FEAT_EXTFW_PULSEGEN not in available_advanced_features: raise SpectrumFeatureNotSupportedByCard( call_description=self.__str__() + ".__init__()", message="Pulse generator firmware option not installed on device.", ) self._multiplexer_1 = PulseGeneratorMultiplexer1(parent=self) self._multiplexer_2 = PulseGeneratorMultiplexer2(parent=self)
View Source
def configure_output( self, settings: PulseGeneratorOutputSettings, coerce: bool = True ) -> PulseGeneratorOutputSettings: """Configure all pulse generator output settings at once. By default, all values are coerced to the nearest values allowed by the hardware, and the coerced values are returned.""" self.set_output_inversion(settings.output_inversion) coerced_settings = PulseGeneratorOutputSettings( period_in_seconds=self.set_period_in_seconds(settings.period_in_seconds, coerce=coerce), duty_cycle=self.set_duty_cycle(settings.duty_cycle, coerce=coerce), num_pulses=self.set_num_pulses(settings.num_pulses, coerce=coerce), delay_in_seconds=self.set_delay_in_seconds(settings.delay_in_seconds, coerce=coerce), output_inversion=settings.output_inversion, ) self.write_to_parent_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP) return coerced_settings
Configure all pulse generator output settings at once. By default, all values are coerced to the nearest values allowed by the hardware, and the coerced values are returned.
View Source
def configure_trigger(self, settings: PulseGeneratorTriggerSettings) -> None: """Configure all pulse generator trigger settings at once.""" self.set_trigger_mode(settings.trigger_mode) self.set_trigger_detection_mode(settings.trigger_detection_mode) self.multiplexer_1.set_trigger_source(settings.multiplexer_1_source) self.multiplexer_2.set_trigger_source(settings.multiplexer_2_source) self.multiplexer_1.set_output_inversion(settings.multiplexer_1_output_inversion) self.multiplexer_2.set_output_inversion(settings.multiplexer_2_output_inversion) self.write_to_parent_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP)
Configure all pulse generator trigger settings at once.
View Source
def force_trigger(self) -> None: """Generates a pulse when the pulse generator trigger source (mux 2) is set to 'software'.""" if ( self._multiplexer_2.trigger_source != PulseGeneratorMultiplexer2TriggerSource.SPCM_PULSEGEN_MUX2_SRC_SOFTWARE ): raise SpectrumIOError("Force trigger can only be used if trigger source (mux 2) is set to 'software'") self.write_to_parent_device_register(SPC_XIO_PULSEGEN_COMMAND, SPCM_PULSEGEN_CMD_FORCE)
Generates a pulse when the pulse generator trigger source (mux 2) is set to 'software'.
The index of the pulse generator. Corresponds to the index of the IO line to which it belongs.
Change the trigger source of this multiplexer to control when it is possible to trigger the pulse generator.
Change the trigger source of this multiplexer to control how the pulse generator is triggered.
View Source
def read_parent_device_register( self, spectrum_register: int, length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO ) -> int: return self._parent_io_line.read_parent_device_register(spectrum_register, length)
View Source
def write_to_parent_device_register( self, spectrum_register: int, value: int, length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO, ) -> None: self._parent_io_line.write_to_parent_device_register(spectrum_register, value, length)
The current pulse generator clock rate. Affected by the sample rate of the parent card, and the number of channels enabled. Effects the precision with which pulse timings can be set, and their min and max values.
The reciprocal of the clock rate, in seconds.
True if the pulse generator is currently enabled.
View Source
def enable(self) -> None: """Enable the pulse generator. Note that the mode of the parent IO Line must also be set to IOLineMOdO.SPCM_XMODE_PULSEGEN.""" current_register_value = self.read_parent_device_register(SPC_XIO_PULSEGEN_ENABLE) new_register_value = toggle_bitmap_value(current_register_value, PULSE_GEN_ENABLE_COMMANDS[self._number], True) self.write_to_parent_device_register(SPC_XIO_PULSEGEN_ENABLE, new_register_value)
Enable the pulse generator. Note that the mode of the parent IO Line must also be set to IOLineMOdO.SPCM_XMODE_PULSEGEN.
View Source
def disable(self) -> None: """Disable the pulse generator.""" current_register_value = self.read_parent_device_register(SPC_XIO_PULSEGEN_ENABLE) new_register_value = toggle_bitmap_value(current_register_value, PULSE_GEN_ENABLE_COMMANDS[self._number], False) self.write_to_parent_device_register(SPC_XIO_PULSEGEN_ENABLE, new_register_value)
Disable the pulse generator.
View Source
def set_output_inversion(self, inverted: bool) -> None: current_register_value = self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number]) new_register_value = toggle_bitmap_value(current_register_value, SPCM_PULSEGEN_CONFIG_INVERT, inverted) self.write_to_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number], new_register_value)
How the pulse generator trigger circuit responds to a trigger signal, .e.g rising edge...
View Source
def set_trigger_detection_mode(self, mode: PulseGeneratorTriggerDetectionMode) -> None: """e.g. rising edge, high-voltage...""" current_register_value = self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number]) high_voltage_mode_value = PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH.value new_register_value = toggle_bitmap_value( current_register_value, high_voltage_mode_value, mode == PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH, ) self.write_to_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number], new_register_value)
e.g. rising edge, high-voltage...
Gated, triggered or single-shot. See PulseGeneratorTriggerMode for more information.
View Source
def set_trigger_mode(self, mode: PulseGeneratorTriggerMode) -> None: """Gated, triggered or single-shot. See PulseGeneratorTriggerMode for more information.""" self.write_to_parent_device_register(PULSE_GEN_TRIGGER_MODE_COMMANDS[self._number], mode.value)
Gated, triggered or single-shot. See PulseGeneratorTriggerMode for more information.
Minimum allowed pulse period in seconds, given the current clock rate.
Maximum allowed pulse period in seconds, given the current clock rate.
Resolution with which the pulse period can be set, given the current clock rate.
The pulse length in seconds, including both the high-voltage and low-voltage sections.
View Source
def set_period_in_seconds(self, period: float, coerce: bool = False) -> float: """Set the time between the start of each generated pulse in seconds. If coerce is True, the requested value will be coerced according to min_allowed_period_in_seconds, max_allowed_period_in_seconds and allowed_period_step_size_in_seconds and the coerced value is returned. Otherwise, when an invalid value is requested a SpectrumInvalidParameterValue will be raised. The allowed values are affected by the number of active channels and the sample rate.""" period_in_clock_cycles = self._convert_seconds_to_clock_cycles(period) coerced_period = _coerce_fractional_value_to_allowed_integer( period_in_clock_cycles, int(self._convert_seconds_to_clock_cycles(self.min_allowed_period_in_seconds)), int(self._convert_seconds_to_clock_cycles(self.max_allowed_period_in_seconds)), self._allowed_period_step_size_in_clock_cycles, ) if not coerce and coerced_period != period_in_clock_cycles: raise SpectrumInvalidParameterValue( "pulse generator period", period, self.min_allowed_period_in_seconds, self.max_allowed_period_in_seconds, self.allowed_period_step_size_in_seconds, ) self.write_to_parent_device_register(PULSE_GEN_PULSE_PERIOD_COMMANDS[self._number], int(coerced_period)) return self._convert_clock_cycles_to_seconds(coerced_period)
Set the time between the start of each generated pulse in seconds. If coerce is True, the requested value will be coerced according to min_allowed_period_in_seconds, max_allowed_period_in_seconds and allowed_period_step_size_in_seconds and the coerced value is returned. Otherwise, when an invalid value is requested a SpectrumInvalidParameterValue will be raised. The allowed values are affected by the number of active channels and the sample rate.
Minimum allowed duration of the high-voltage part of the pulse in seconds, given the current clock rate.
Maximum allowed duration of the high-voltage part of the pulse in seconds, given the current clock rate.
Resolution with which the high-voltage duration can be set, in seconds, given the current clock rate.
The length of the high-voltage part of a pulse, in seconds. Equal to the pulse duration * duty cycle.
The length of the low-voltage part of a pulse, in seconds. Equal to the pulse duration * (1 - duty cycle).
The ratio between the high-voltage and low-voltage parts of the pulse.
View Source
def set_duty_cycle(self, duty_cycle: float, coerce: bool = False) -> float: """Set the duty cycle. If coerce is True, the requested value will be coerced to be within allowed range and use allowed step size and then the coerced value wll be returned. Otherwise, when an invalid value is requested an SpectrumInvalidParameterValue will be raised. The allowed values are affected by the number of active channels and the sample rate. """ requested_high_v_duration_in_clock_cycles = self._convert_seconds_to_clock_cycles( self.period_in_seconds * duty_cycle ) clipped_duration = _coerce_fractional_value_to_allowed_integer( requested_high_v_duration_in_clock_cycles, int(self._convert_seconds_to_clock_cycles(self.min_allowed_high_voltage_duration_in_seconds)), int(self._convert_seconds_to_clock_cycles(self.max_allowed_high_voltage_duration_in_seconds)), self._allowed_high_voltage_duration_step_size_in_clock_cycles, ) if not coerce and clipped_duration != requested_high_v_duration_in_clock_cycles: raise SpectrumInvalidParameterValue( "high-voltage duration", self.period_in_seconds * duty_cycle, self.min_allowed_high_voltage_duration_in_seconds, self.max_allowed_high_voltage_duration_in_seconds, self.allowed_high_voltage_duration_step_size_in_seconds, ) self.write_to_parent_device_register(PULSE_GEN_HIGH_DURATION_COMMANDS[self._number], clipped_duration) return self._convert_clock_cycles_to_seconds(clipped_duration) / self.period_in_seconds
Set the duty cycle. If coerce is True, the requested value will be coerced to be within allowed range and use allowed step size and then the coerced value wll be returned. Otherwise, when an invalid value is requested an SpectrumInvalidParameterValue will be raised. The allowed values are affected by the number of active channels and the sample rate.
Minimum allowed number of pulses to transmit.
Maximum allowed number of pulses to transmit.
Resolution with which the number of pulses to transmit can be set.
The number of pulses to generate on receipt of a trigger. If 0, pulses will be generated continuously.
View Source
def set_num_pulses(self, num_pulses: int, coerce: bool = False) -> int: """Set the number of pulses to generate on receipt of a trigger. If 0 or negative, pulses will be generated continuously. If coerce if True, the requested number of pulses will be coerced according to min_allowed_pulses, max_allowed_pulses and allowed_num_pulses_step_size and the coerced value is returned. Otherwise, a SpectrumInvalidParameterValue exception is raised if an invalid number of pulses is requested.""" num_pulses = max(0, num_pulses) # make negative value 0 to enable continuous pulse generation coerced_num_pulses = _coerce_fractional_value_to_allowed_integer( float(num_pulses), self.min_allowed_pulses, self.max_allowed_pulses, self.allowed_num_pulses_step_size ) if not coerce and coerced_num_pulses != num_pulses: raise SpectrumInvalidParameterValue( "number of pulses", num_pulses, self.min_allowed_pulses, self.max_allowed_pulses, self.allowed_num_pulses_step_size, ) self.write_to_parent_device_register(PULSE_GEN_NUM_REPEATS_COMMANDS[self._number], coerced_num_pulses) return coerced_num_pulses
Set the number of pulses to generate on receipt of a trigger. If 0 or negative, pulses will be generated continuously. If coerce if True, the requested number of pulses will be coerced according to min_allowed_pulses, max_allowed_pulses and allowed_num_pulses_step_size and the coerced value is returned. Otherwise, a SpectrumInvalidParameterValue exception is raised if an invalid number of pulses is requested.
Minimum allowed delay between the trigger event and pulse generation, in seconds, given the current clock rate.
Maximum allowed delay between the trigger event and pulse generation, in seconds, given the current clock rate.
resolution with which the delay between the trigger event and pulse generation can be set, in seconds, given the current clock rate.
The delay between the trigger and the first pulse transmission
View Source
def set_delay_in_seconds(self, delay_in_seconds: float, coerce: bool = False) -> float: """Set the delay between the trigger and the first pulse transmission. If coerce=True, the requested value is coerced according to min_allowed_delay_in_seconds, max_allowed_delay_in_seconds and allowed_delay_step_size_in_seconds, and then the coerced value is returned. Otherwise, an ValueError is raised if the requested value is invalid.""" requested_delay_in_clock_cycles = self._convert_seconds_to_clock_cycles(delay_in_seconds) clipped_delay_in_clock_cycles = _coerce_fractional_value_to_allowed_integer( requested_delay_in_clock_cycles, int(self._convert_seconds_to_clock_cycles(self.min_allowed_delay_in_seconds)), int(self._convert_seconds_to_clock_cycles(self.max_allowed_delay_in_seconds)), int(self._convert_seconds_to_clock_cycles(self.allowed_delay_step_size_in_seconds)), ) if not coerce and clipped_delay_in_clock_cycles != requested_delay_in_clock_cycles: raise SpectrumInvalidParameterValue( "delay in seconds", requested_delay_in_clock_cycles, self.min_allowed_delay_in_seconds, self.max_allowed_delay_in_seconds, self.allowed_delay_step_size_in_seconds, ) self.write_to_parent_device_register(PULSE_GEN_DELAY_COMMANDS[self._number], clipped_delay_in_clock_cycles) return self._convert_clock_cycles_to_seconds(clipped_delay_in_clock_cycles)
Set the delay between the trigger and the first pulse transmission. If coerce=True, the requested value is coerced according to min_allowed_delay_in_seconds, max_allowed_delay_in_seconds and allowed_delay_step_size_in_seconds, and then the coerced value is returned. Otherwise, an ValueError is raised if the requested value is invalid.