spectrumdevice
A high-level, object-oriented Python library for controlling Spectrum Instrumentation devices.
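`spectrumdevice` can connect to individual cards or [StarHubs](https://spectrum-instrumentation.com/en/m4i-star-hub) (e.g. the [NetBox](https://spectrum-instrumentation.com/en/digitizernetbox)). `spectrumdevice` provides the following classes for controlling devices: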
Hardware Classes
| Name | Purpose |
|---|---|
| `SpectrumDigitiserCard` | Controlling individual digitiser cards |
| `SpectrumDigitiserStarHub` | Controlling digitiser cards aggregated with a StarHub |
| `SpectrumDigitiserAnalogChannel` | Controlling analog channels of a digitiser |
| `SpectrumDigitiserIOLine` | Controlling multipurpose IO lines of a digitiser |
| `SpectrumAWGCard` | Controlling individual AWG cards |
| `SpectrumAWGStarHub` | (not yet implemented) |
| `SpectrumAWGAnalogChannel` | Controlling analog channels of an AWG |
| `SpectrumAWGIOLine` | Controlling multipurpose IO lines of an AWG |
| `PulseGenerator` | Controlling pulse generators belonging to IO lines |
Mock Classes
`spectrumdevice` also includes mock classes for testing software without drivers installed or hardware connected:

| Name | Purpose |
|---|---|
| `MockSpectrumDigitiserCard` | Mocking individual digitiser cards |
| `MockSpectrumDigitiserStarHub` | Mocking digitiser cards aggregated with a StarHub |
| `MockSpectrumAWGCard` | Mocking individual AWG cards |
| `MockSpectrumAWGStarHub` | Mocking AWG cards aggregated with a StarHub |
Abstract Classes
The following abstract classes provide common implementations for methods whose functionality is identical across different Spectrum devices. They cannot be instantiated themselves, but are included here as they contain documentation for methods inherited by the concrete (and therefore instantiatable) classes above.
| Name | Purpose |
|---|---|
| `AbstractSpectrumDevice` | Implements methods common to all devices |
| `AbstractSpectrumCard` | Implements methods common to all cards |
| `AbstractSpectrumStarHub` | Implements methods common to all hubs |
| `AbstractSpectrumChannel` | Implements methods common to all channels |
| `AbstractSpectrumDigitiser` | Implements methods common to all digitisers |
| `AbstractSpectrumAWG` | Implements methods common to all AWGs |
Settings
The submodule `spectrumdevice.settings` provides Enums and Dataclasses wrapping the register values provided by the Spectrum API, to be used for configuring hardware and interpreting responses received from hardware.
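As a rough illustration of what "wrapping register values" means, the sketch below round-trips an integer register value through an Enum, mirroring the `InputImpedance(impedance_binary_value)` and `input_impedance.value` pattern visible in the channel source further down. The class name `ExampleInputImpedance`, its members and its integer values are hypothetical placeholders, not the real contents of `spectrumdevice.settings` or real Spectrum register values.

```python
from enum import Enum


class ExampleInputImpedance(Enum):
    """Hypothetical enum; the integers are placeholders, not Spectrum register values."""

    ONE_MEGA_OHM = 0
    FIFTY_OHM = 1


def decode_register(raw_value: int) -> ExampleInputImpedance:
    """Interpret a raw integer read from a register as an enum member."""
    # Enum lookup by value raises ValueError for integers with no matching member.
    return ExampleInputImpedance(raw_value)


def encode_setting(setting: ExampleInputImpedance) -> int:
    """Turn an enum member back into the integer that would be written to a register."""
    return setting.value


if __name__ == "__main__":
    setting = decode_register(1)
    print(setting.name, encode_setting(setting))
```

Looking a value up through the Enum means an unexpected register value fails loudly with a `ValueError` instead of propagating as a bare integer.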
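Resources
- [Source on GitHub](https://github.com/KCL-BMEIS/spectrumdevice)
- [README including quickstart](https://github.com/KCL-BMEIS/spectrumdevice/blob/main/README.md)
- [Examples](https://github.com/KCL-BMEIS/spectrumdevice/tree/main/example_scripts)
- [PyPI](https://pypi.org/project/spectrumdevice/)
- [API reference documentation](https://kcl-bmeis.github.io/spectrumdevice/)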
Reference Documentation
```python
"""
A high-level, object-oriented Python library for controlling Spectrum Instrumentation devices.

`spectrumdevice` can connect to individual cards or
[StarHubs](https://spectrum-instrumentation.com/en/m4i-star-hub) (e.g. the
[NetBox](https://spectrum-instrumentation.com/en/digitizernetbox)). `spectrumdevice` provides the following classes
for controlling devices:

### Hardware Classes
| Name | Purpose |
|----------------------------------|---------------------------------------------------------|
| `SpectrumDigitiserCard` | Controlling individual digitiser cards |
| `SpectrumDigitiserStarHub` | Controlling digitiser cards aggregated with a StarHub |
| `SpectrumDigitiserAnalogChannel` | Controlling analog channels of a digitiser |
| `SpectrumDigitiserIOLine` | Controlling multipurpose IO lines of a digitiser |
| `SpectrumAWGCard` | Controlling individual AWG cards |
| `SpectrumAWGStarHub` | (not yet implemented) |
| `SpectrumAWGAnalogChannel` | Controlling analog channels of an AWG |
| `SpectrumAWGIOLine` | Controlling multipurpose IO lines of an AWG |
| `PulseGenerator` | Controlling pulse generators belonging to IO lines |

### Mock Classes
`spectrumdevice` also includes mock classes for testing software without drivers installed or hardware connected:

| Name | Purpose |
|--------------------------------|-----------------------------------------------------|
| `MockSpectrumDigitiserCard` | Mocking individual digitiser cards |
| `MockSpectrumDigitiserStarHub` | Mocking digitiser cards aggregated with a StarHub |
| `MockSpectrumAWGCard` | Mocking individual AWG cards |
| `MockSpectrumAWGStarHub` | Mocking AWG cards aggregated with a StarHub |

### Abstract Classes
The following abstract classes provide common implementations for methods whose functionality is identical across
different Spectrum devices. They cannot be instantiated themselves, but are included here as they contain
documentation for methods inherited by the concrete (and therefore instantiatable) classes above.

| Name | Purpose |
|--------------------------------|-----------------------------------------------------|
| `AbstractSpectrumDevice` | Implements methods common to all devices |
| `AbstractSpectrumCard` | Implements methods common to all cards |
| `AbstractSpectrumStarHub` | Implements methods common to all hubs |
| `AbstractSpectrumChannel` | Implements methods common to all channels |
| `AbstractSpectrumDigitiser` | Implements methods common to all digitisers |
| `AbstractSpectrumAWG` | Implements methods common to all AWGs |

### Settings
The submodule `spectrumdevice.settings` provides Enums and Dataclasses wrapping the register values provided by the
Spectrum API, to be used for configuring hardware and interpreting responses received from hardware.

### Resources
* [Source on GitHub](https://github.com/KCL-BMEIS/spectrumdevice)
* [README including quickstart](https://github.com/KCL-BMEIS/spectrumdevice/blob/main/README.md)
* [Examples](https://github.com/KCL-BMEIS/spectrumdevice/tree/main/example_scripts)
* [PyPi](https://pypi.org/project/spectrumdevice/)
* [API reference documentation](https://kcl-bmeis.github.io/spectrumdevice/)

### Reference Documentation
"""

# Christian Baker, King's College London
# Copyright (c) 2024 School of Biomedical Engineering & Imaging Sciences, King's College London
# Licensed under the MIT. You may obtain a copy at https://opensource.org/licenses/MIT.

from .measurement import Measurement
from .devices.digitiser.digitiser_card import SpectrumDigitiserCard
from .devices.digitiser.digitiser_channel import SpectrumDigitiserAnalogChannel, SpectrumDigitiserIOLine
from .devices.digitiser.digitiser_star_hub import SpectrumDigitiserStarHub
from .devices.awg.awg_card import SpectrumAWGCard
from .devices.awg.awg_channel import SpectrumAWGAnalogChannel, SpectrumAWGIOLine
from .devices.mocks import MockSpectrumDigitiserCard, MockSpectrumDigitiserStarHub, MockSpectrumAWGCard
from .devices.abstract_device import (
    AbstractSpectrumDevice,
    AbstractSpectrumCard,
    AbstractSpectrumChannel,
    AbstractSpectrumStarHub,
)
from .devices.digitiser.abstract_spectrum_digitiser import AbstractSpectrumDigitiser
from .features.pulse_generator.pulse_generator import PulseGenerator

__all__ = [
    "SpectrumDigitiserAnalogChannel",
    "SpectrumDigitiserIOLine",
    "SpectrumDigitiserCard",
    "SpectrumDigitiserStarHub",
    "MockSpectrumDigitiserCard",
    "MockSpectrumDigitiserStarHub",
    "AbstractSpectrumDigitiser",
    "AbstractSpectrumStarHub",
    "AbstractSpectrumCard",
    "AbstractSpectrumDevice",
    "AbstractSpectrumChannel",
    "settings",
    "features",
    "Measurement",
    "SpectrumAWGCard",
    "MockSpectrumAWGCard",
    "SpectrumAWGAnalogChannel",
    "SpectrumAWGIOLine",
    "PulseGenerator",
]


from . import _version

__version__ = _version.get_versions()["version"]  # type: ignore
```
```python
class SpectrumDigitiserAnalogChannel(AbstractSpectrumAnalogChannel, SpectrumDigitiserAnalogChannelInterface):
    """Class for controlling an individual channel of a spectrum digitiser. Channels are constructed automatically when
    a `SpectrumDigitiserCard` or `SpectrumDigitiserStarHub` is instantiated, and can then be accessed via the
    `.channels` property."""

    def __init__(self, channel_number: int, parent_device: SpectrumDigitiserInterface) -> None:

        if parent_device.type != CardType.SPCM_TYPE_AI:
            raise SpectrumCardIsNotADigitiser(parent_device.type)

        # pass unused args up the inheritance hierarchy
        super().__init__(channel_number=channel_number, parent_device=parent_device)

        self._full_scale_value = self._parent_device.read_spectrum_device_register(SPC_MIINST_MAXADCVALUE)
        # used frequently so store locally instead of reading from device each time:
        self._vertical_range_mv = self.vertical_range_in_mv
        self._vertical_offset_in_percent = self.vertical_offset_in_percent

    def _get_settings_as_dict(self) -> dict:
        return {
            SpectrumDigitiserAnalogChannel.input_path.__name__: self.input_path,
            SpectrumDigitiserAnalogChannel.input_coupling.__name__: self.input_coupling,
            SpectrumDigitiserAnalogChannel.input_impedance.__name__: self.input_impedance,
            SpectrumDigitiserAnalogChannel.vertical_range_in_mv.__name__: self.vertical_range_in_mv,
            SpectrumDigitiserAnalogChannel.vertical_offset_in_percent.__name__: self.vertical_offset_in_percent,
        }

    def _set_settings_from_dict(self, settings: dict) -> None:
        self.set_input_path(settings[SpectrumDigitiserAnalogChannel.input_path.__name__])
        self.set_input_coupling(settings[SpectrumDigitiserAnalogChannel.input_coupling.__name__])
        self.set_input_impedance(settings[SpectrumDigitiserAnalogChannel.input_impedance.__name__])
        self.set_vertical_range_in_mv(settings[SpectrumDigitiserAnalogChannel.vertical_range_in_mv.__name__])
        self.set_vertical_offset_in_percent(
            settings[SpectrumDigitiserAnalogChannel.vertical_offset_in_percent.__name__]
        )

    def convert_raw_waveform_to_voltage_waveform(self, raw_waveform: ndarray) -> ndarray:
        vertical_offset_mv = 0.01 * float(self._vertical_range_mv * self._vertical_offset_in_percent)
        return 1e-3 * (
            float(self._vertical_range_mv) * raw_waveform / float(self._full_scale_value) + vertical_offset_mv
        )

    @property
    def vertical_range_in_mv(self) -> int:
        """The currently set input range of the channel in mV.

        Returns:
            vertical_range (int): The currently set vertical range in mV.
        """
        self._vertical_range_mv = self._parent_device.read_spectrum_device_register(
            VERTICAL_RANGE_COMMANDS[self._number]
        )
        return self._vertical_range_mv

    def set_vertical_range_in_mv(self, vertical_range: int) -> None:
        """Set the input range of the channel in mV. See Spectrum documentation for valid values.

        Args:
            vertical_range (int): The desired vertical range in mV.
        """
        self._parent_device.write_to_spectrum_device_register(VERTICAL_RANGE_COMMANDS[self._number], vertical_range)
        self._vertical_range_mv = vertical_range

    @property
    def vertical_offset_in_percent(self) -> int:
        """The currently set input offset of the channel in percent of the vertical range.

        Returns:
            offset (int): The currently set vertical offset in percent.
        """
        self._vertical_offset_in_percent = self._parent_device.read_spectrum_device_register(
            VERTICAL_OFFSET_COMMANDS[self._number]
        )
        return self._vertical_offset_in_percent

    def set_vertical_offset_in_percent(self, offset: int) -> None:
        """Set the input offset of the channel in percent of the vertical range. See spectrum documentation for valid
        values.

        Args:
            offset (int): The desired vertical offset in percent.
        """
        self._parent_device.write_to_spectrum_device_register(VERTICAL_OFFSET_COMMANDS[self._number], offset)
        self._vertical_offset_in_percent = offset

    @property
    def input_impedance(self) -> InputImpedance:
        """The current input impedance setting of the channel (50 Ohm or 1 MOhm)"""
        impedance_binary_value = self._parent_device.read_spectrum_device_register(
            INPUT_IMPEDANCE_COMMANDS[self._number]
        )
        return InputImpedance(impedance_binary_value)

    def set_input_impedance(self, input_impedance: InputImpedance) -> None:
        self._parent_device.write_to_spectrum_device_register(
            INPUT_IMPEDANCE_COMMANDS[self._number], input_impedance.value
        )

    @property
    def input_coupling(self) -> InputCoupling:
        """The coupling (AC or DC) setting of the channel. Only available on some hardware."""
        coupling_binary_value = self._parent_device.read_spectrum_device_register(INPUT_COUPLING_COMMANDS[self._number])
        return InputCoupling(coupling_binary_value)

    def set_input_coupling(self, input_coupling: InputCoupling) -> None:
        self._parent_device.write_to_spectrum_device_register(
            INPUT_COUPLING_COMMANDS[self._number], input_coupling.value
        )

    @property
    def input_path(self) -> InputPath:
        """The input path setting of the channel. Only available on some hardware."""
        path_binary_value = self._parent_device.read_spectrum_device_register(INPUT_PATH_COMMANDS[self._number])
        return InputPath(path_binary_value)

    def set_input_path(self, input_path: InputPath) -> None:
        self._parent_device.write_to_spectrum_device_register(INPUT_PATH_COMMANDS[self._number], input_path.value)
```
Class for controlling an individual channel of a spectrum digitiser. Channels are constructed automatically when a `SpectrumDigitiserCard` or `SpectrumDigitiserStarHub` is instantiated, and can then be accessed via the `.channels` property.
```python
    def __init__(self, channel_number: int, parent_device: SpectrumDigitiserInterface) -> None:

        if parent_device.type != CardType.SPCM_TYPE_AI:
            raise SpectrumCardIsNotADigitiser(parent_device.type)

        # pass unused args up the inheritance hierarchy
        super().__init__(channel_number=channel_number, parent_device=parent_device)

        self._full_scale_value = self._parent_device.read_spectrum_device_register(SPC_MIINST_MAXADCVALUE)
        # used frequently so store locally instead of reading from device each time:
        self._vertical_range_mv = self.vertical_range_in_mv
        self._vertical_offset_in_percent = self.vertical_offset_in_percent
```
```python
    def convert_raw_waveform_to_voltage_waveform(self, raw_waveform: ndarray) -> ndarray:
        vertical_offset_mv = 0.01 * float(self._vertical_range_mv * self._vertical_offset_in_percent)
        return 1e-3 * (
            float(self._vertical_range_mv) * raw_waveform / float(self._full_scale_value) + vertical_offset_mv
        )
```
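The conversion above scales each raw sample by the vertical range over the full-scale ADC value, adds the offset (a percentage of the range, expressed in mV), and converts the result from mV to V. A minimal standalone sketch of that arithmetic follows. It uses plain Python lists rather than NumPy arrays, so it is not the library's implementation; the function name `raw_to_volts` and the example settings (a 1000 mV range and a full-scale value of 8192) are illustrative assumptions rather than values read from a device.

```python
from typing import List, Sequence


def raw_to_volts(
    raw_waveform: Sequence[int],
    vertical_range_mv: int,
    full_scale_value: int,
    vertical_offset_percent: int = 0,
) -> List[float]:
    """Convert raw ADC counts to volts with the same arithmetic as the method above."""
    # The offset is a percentage of the vertical range, so express it in mV first.
    vertical_offset_mv = 0.01 * float(vertical_range_mv * vertical_offset_percent)
    return [
        # Scale counts to mV, add the offset, then convert mV to V.
        1e-3 * (float(vertical_range_mv) * sample / float(full_scale_value) + vertical_offset_mv)
        for sample in raw_waveform
    ]


if __name__ == "__main__":
    # Hypothetical settings: 1000 mV range, full-scale count of 8192, no offset.
    print(raw_to_volts([0, 4096, -8192], vertical_range_mv=1000, full_scale_value=8192))
```

With these assumed settings, a raw value equal to the full-scale count maps to the full vertical range, and a non-zero offset shifts every sample by the same fraction of that range.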
```python
    @property
    def vertical_range_in_mv(self) -> int:
        """The currently set input range of the channel in mV.

        Returns:
            vertical_range (int): The currently set vertical range in mV.
        """
        self._vertical_range_mv = self._parent_device.read_spectrum_device_register(
            VERTICAL_RANGE_COMMANDS[self._number]
        )
        return self._vertical_range_mv
```
The currently set input range of the channel in mV.
Returns:
vertical_range (int): The currently set vertical range in mV.
```python
    def set_vertical_range_in_mv(self, vertical_range: int) -> None:
        """Set the input range of the channel in mV. See Spectrum documentation for valid values.

        Args:
            vertical_range (int): The desired vertical range in mV.
        """
        self._parent_device.write_to_spectrum_device_register(VERTICAL_RANGE_COMMANDS[self._number], vertical_range)
        self._vertical_range_mv = vertical_range
```
Set the input range of the channel in mV. See Spectrum documentation for valid values.
Arguments:
- vertical_range (int): The desired vertical range in mV.
```python
    @property
    def vertical_offset_in_percent(self) -> int:
        """The currently set input offset of the channel in percent of the vertical range.

        Returns:
            offset (int): The currently set vertical offset in percent.
        """
        self._vertical_offset_in_percent = self._parent_device.read_spectrum_device_register(
            VERTICAL_OFFSET_COMMANDS[self._number]
        )
        return self._vertical_offset_in_percent
```
The currently set input offset of the channel in percent of the vertical range.
Returns:
offset (int): The currently set vertical offset in percent.
```python
    def set_vertical_offset_in_percent(self, offset: int) -> None:
        """Set the input offset of the channel in percent of the vertical range. See spectrum documentation for valid
        values.

        Args:
            offset (int): The desired vertical offset in percent.
        """
        self._parent_device.write_to_spectrum_device_register(VERTICAL_OFFSET_COMMANDS[self._number], offset)
        self._vertical_offset_in_percent = offset
```
Set the input offset of the channel in percent of the vertical range. See spectrum documentation for valid values.
Arguments:
- offset (int): The desired vertical offset in percent.
```python
    @property
    def input_impedance(self) -> InputImpedance:
        """The current input impedance setting of the channel (50 Ohm or 1 MOhm)"""
        impedance_binary_value = self._parent_device.read_spectrum_device_register(
            INPUT_IMPEDANCE_COMMANDS[self._number]
        )
        return InputImpedance(impedance_binary_value)
```
The current input impedance setting of the channel (50 Ohm or 1 MOhm)
```python
    @property
    def input_coupling(self) -> InputCoupling:
        """The coupling (AC or DC) setting of the channel. Only available on some hardware."""
        coupling_binary_value = self._parent_device.read_spectrum_device_register(INPUT_COUPLING_COMMANDS[self._number])
        return InputCoupling(coupling_binary_value)
```
The coupling (AC or DC) setting of the channel. Only available on some hardware.
```python
    @property
    def input_path(self) -> InputPath:
        """The input path setting of the channel. Only available on some hardware."""
        path_binary_value = self._parent_device.read_spectrum_device_register(INPUT_PATH_COMMANDS[self._number])
        return InputPath(path_binary_value)
```
The input path setting of the channel. Only available on some hardware.
Inherited Members
- spectrumdevice.devices.abstract_device.channel_interfaces.SpectrumAnalogChannelInterface
- copy_settings_from_other_channel
```python
class SpectrumDigitiserIOLine(AbstractSpectrumIOLine, SpectrumDigitiserIOLineInterface):
    """Class for controlling multipurpose IO lines of a digitiser, e.g. X0, X1, X2 and X3."""

    def __init__(self, parent_device: AbstractSpectrumCard, **kwargs: Any) -> None:
        if parent_device.type != CardType.SPCM_TYPE_AI:
            raise SpectrumCardIsNotADigitiser(parent_device.type)
        super().__init__(parent_device=parent_device, **kwargs)  # pass unused args up the inheritance hierarchy

    def _get_io_line_mode_settings_mask(self, mode: IOLineMode) -> int:
        return 0  # no settings required for DigOut
```
Class for controlling multipurpose IO lines of a digitiser, e.g. X0, X1, X2 and X3.
Inherited Members
- spectrumdevice.devices.abstract_device.channel_interfaces.SpectrumIOLineInterface
- mode
- set_mode
- pulse_generator
```python
class SpectrumDigitiserCard(
    AbstractSpectrumCard[SpectrumDigitiserAnalogChannelInterface, SpectrumDigitiserIOLineInterface],
    AbstractSpectrumDigitiser,
):
    """Class for controlling individual Spectrum digitiser cards."""

    def __init__(self, device_number: int, ip_address: Optional[str] = None) -> None:
        """
        Args:
            device_number (int): Index of the card to control. If only one card is present, set to 0.
            ip_address (Optional[str]): If connecting to a networked card, provide the IP address here as a string.

        """
        # pass unused args up the inheritance hierarchy
        super().__init__(device_number=device_number, ip_address=ip_address)

        if self.type != CardType.SPCM_TYPE_AI:
            raise SpectrumCardIsNotADigitiser(self.type)
        self._acquisition_mode = self.acquisition_mode
        self._timestamper: Optional[Timestamper] = None
        self._batch_size = 1

    def _init_analog_channels(self) -> Sequence[SpectrumDigitiserAnalogChannelInterface]:
        num_modules = self.read_spectrum_device_register(SPC_MIINST_MODULES)
        num_channels_per_module = self.read_spectrum_device_register(SPC_MIINST_CHPERMODULE)
        total_channels = num_modules * num_channels_per_module
        return tuple(
            [SpectrumDigitiserAnalogChannel(channel_number=n, parent_device=self) for n in range(total_channels)]
        )

    def _init_io_lines(self) -> Sequence[SpectrumDigitiserIOLineInterface]:
        if (self.model_number.value & TYP_SERIESMASK) == TYP_M2PEXPSERIES:
            return tuple([SpectrumDigitiserIOLine(channel_number=n, parent_device=self) for n in range(4)])
        else:
            raise NotImplementedError("Don't know how many IO lines other types of card have. Only M2P series.")

    def enable_timestamping(self) -> None:
        self._timestamper = Timestamper(self, self._handle)

    def wait_for_acquisition_to_complete(self) -> None:
        """Blocks until the current acquisition has finished, or the timeout is reached.

        In Standard Single mode (SPC_REC_STD_SINGLE), this should be called after `start()`. Once the call
        to `wait_for_acquisition_to_complete()` returns, the newly acquired samples are in the on-device buffer and
        ready for transfer to the `TransferBuffer` using `start_transfer()`.

        In FIFO mode (SPC_REC_FIFO_MULTI), the card will continue to acquire samples until
        `stop()` is called, so `wait_for_acquisition_to_complete()` should not be used.

        """
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_WAITREADY)

    def get_raw_waveforms(self) -> List[List[NDArray[int16]]]:
        """Get a list of the most recently transferred waveforms, in channel order, as 16-bit integers.

        This method copies and reshapes the samples in the `TransferBuffer` into a list of lists of 1D NumPy arrays
        (waveforms) and returns the list.

        In Standard Single mode (SPC_REC_STD_SINGLE), `get_waveforms()` should be called after
        `wait_for_transfer_to_complete()` has returned.

        In FIFO mode (SPC_REC_FIFO_MULTI), while the card is continuously acquiring samples and transferring them to the
        `TransferBuffer`, this method should be called in a loop. The method will block until each new transfer is
        received, so the loop will run at the same rate as the acquisition (in SPC_REC_FIFO_MULTI mode, for example,
        this would be the rate at which your trigger source was running).

        Returns:
            waveforms (List[List[NDArray[int16]]]): A list of lists of 1D numpy arrays, one inner list per acquisition
              and one array per enabled channel, in channel order. To average the acquisitions:
                `np.array(waveforms).mean(axis=0)`

        """
        if self._transfer_buffer is None:
            raise SpectrumNoTransferBufferDefined("Cannot find a samples transfer buffer")

        num_read_bytes = 0
        num_samples_per_frame = self.acquisition_length_in_samples * len(self.enabled_analog_channel_nums)
        num_expected_bytes_per_frame = num_samples_per_frame * self._transfer_buffer.data_array.itemsize
        raw_samples = zeros(num_samples_per_frame * self._batch_size, dtype=self._transfer_buffer.data_array.dtype)

        if self.acquisition_mode in (AcquisitionMode.SPC_REC_STD_SINGLE, AcquisitionMode.SPC_REC_STD_AVERAGE):
            raw_samples = self._transfer_buffer.copy_contents()

        elif self.acquisition_mode in (AcquisitionMode.SPC_REC_FIFO_MULTI, AcquisitionMode.SPC_REC_FIFO_AVERAGE):
            self.wait_for_transfer_chunk_to_complete()

            while num_read_bytes < (num_expected_bytes_per_frame * self._batch_size):
                num_available_bytes = self.read_spectrum_device_register(SPC_DATA_AVAIL_USER_LEN)
                position_of_available_bytes = self.read_spectrum_device_register(SPC_DATA_AVAIL_USER_POS)

                # Don't allow reading over the end of the transfer buffer
                if (
                    position_of_available_bytes + num_available_bytes
                ) > self._transfer_buffer.data_array_length_in_bytes:
                    num_available_bytes = self._transfer_buffer.data_array_length_in_bytes - position_of_available_bytes

                # Don't allow reading over the end of the current acquisition:
                if (num_read_bytes + num_available_bytes) > (num_expected_bytes_per_frame * self._batch_size):
                    num_available_bytes = (num_expected_bytes_per_frame * self._batch_size) - num_read_bytes

                num_available_samples = num_available_bytes // self._transfer_buffer.data_array.itemsize
                num_read_samples = num_read_bytes // self._transfer_buffer.data_array.itemsize

                raw_samples[
                    num_read_samples : num_read_samples + num_available_samples
                ] = self._transfer_buffer.read_chunk(position_of_available_bytes, num_available_bytes)
                self.write_to_spectrum_device_register(SPC_DATA_AVAIL_CARD_LEN, num_available_bytes)

                num_read_bytes += num_available_bytes

        waveforms_in_columns = raw_samples.reshape(
            (self._batch_size, self.acquisition_length_in_samples, len(self.enabled_analog_channel_nums))
        )

        repeat_acquisitions = []
        for n in range(self._batch_size):
            repeat_acquisitions.append([waveform for waveform in waveforms_in_columns[n, :, :].T])

        return repeat_acquisitions

    def get_waveforms(self) -> List[List[NDArray[float64]]]:
        """Get a list of the most recently transferred waveforms, in channel order, in Volts as floats.

        See get_raw_waveforms() for details.

        Returns:
            waveforms (List[List[NDArray[float64]]]): A list of lists of 1D numpy arrays, one inner list per acquisition
              and one array per enabled channel, in channel order. To average the acquisitions:
                `np.array(waveforms).mean(axis=0)`

        """
        raw_repeat_acquisitions = self.get_raw_waveforms()
        repeat_acquisitions = []
        for n in range(self._batch_size):
            repeat_acquisitions.append(
                [
                    cast(
                        SpectrumDigitiserAnalogChannel, self.analog_channels[ch_num]
                    ).convert_raw_waveform_to_voltage_waveform(squeeze(waveform))
                    for ch_num, waveform in zip(self.enabled_analog_channel_nums, raw_repeat_acquisitions[n])
                ]
            )
        return repeat_acquisitions

    def get_timestamp(self) -> Optional[datetime.datetime]:
        """Get timestamp for the last acquisition"""
        if self._timestamper is not None:
            return self._timestamper.get_timestamp()
        else:
            return None

    @property
    def acquisition_length_in_samples(self) -> int:
        """The current recording length (per channel) in samples.

        Returns:
            length_in_samples (int): The current recording length ('acquisition length') in samples."""
        return self.read_spectrum_device_register(SPC_MEMSIZE)

    def set_acquisition_length_in_samples(self, length_in_samples: int) -> None:
        """Change the recording length (per channel). In FIFO mode, it will be quantised according to the step size
          allowed by the connected card type.

        Args:
            length_in_samples (int): The desired recording length ('acquisition length'), in samples.
        """
        length_in_samples = self._coerce_num_samples_if_fifo(length_in_samples)
        self.write_to_spectrum_device_register(SPC_SEGMENTSIZE, length_in_samples)
        self.write_to_spectrum_device_register(SPC_MEMSIZE, length_in_samples)

    @property
    def post_trigger_length_in_samples(self) -> int:
        """The number of samples of the recording that will contain data received after the trigger event.

        Returns:
            length_in_samples (int): The currently set post trigger length in samples.
        """
        return self.read_spectrum_device_register(SPC_POSTTRIGGER)

    def set_post_trigger_length_in_samples(self, length_in_samples: int) -> None:
        """Change the number of samples of the recording that will contain data received after the trigger event.
        In FIFO mode, this will be quantised according to the minimum step size allowed by the connected card.

        Args:
            length_in_samples (int): The desired post trigger length in samples."""
        length_in_samples = self._coerce_num_samples_if_fifo(length_in_samples)
        if self.acquisition_mode == AcquisitionMode.SPC_REC_FIFO_MULTI:
            if (self.acquisition_length_in_samples - length_in_samples) < get_memsize_step_size(self._model_number):
                logger.warning(
                    "FIFO mode: coercing post trigger length to maximum allowed value (step-size samples less than "
                    "the acquisition length)."
                )
                length_in_samples = self.acquisition_length_in_samples - get_memsize_step_size(self._model_number)
        self.write_to_spectrum_device_register(SPC_POSTTRIGGER, length_in_samples)

    def _coerce_num_samples_if_fifo(self, value: int) -> int:
        if self.acquisition_mode == AcquisitionMode.SPC_REC_FIFO_MULTI:
            if mod(value, get_memsize_step_size(self._model_number)) != 0:
                logger.warning(
                    f"FIFO mode: coercing length to nearest {get_memsize_step_size(self._model_number)}" f" samples"
                )
                value = int(value - mod(value, get_memsize_step_size(self._model_number)))
        return value

    @property
    def number_of_averages(self) -> int:
        return self.read_spectrum_device_register(SPC_AVERAGES)

    def set_number_of_averages(self, num_averages: int) -> None:
        if num_averages > 0:
            self.write_to_spectrum_device_register(SPC_AVERAGES, num_averages)
        else:
            raise ValueError("Number of averages must be greater than 0.")

    @property
    def acquisition_mode(self) -> AcquisitionMode:
        """The currently enabled card mode. Will raise an exception if the current mode is not supported by
        `spectrumdevice`.

        Returns:
            mode (`AcquisitionMode`): The currently enabled card acquisition mode."""
        return AcquisitionMode(self.read_spectrum_device_register(SPC_CARDMODE))

    def set_acquisition_mode(self, mode: AcquisitionMode) -> None:
        """Change the currently enabled card mode. See `AcquisitionMode` and the Spectrum documentation
        for the available modes.

        Args:
            mode (`AcquisitionMode`): The desired acquisition mode."""
        self.write_to_spectrum_device_register(SPC_CARDMODE, mode.value)

    @property
    def batch_size(self) -> int:
        return self._batch_size

    def set_batch_size(self, batch_size: int) -> None:
        self._batch_size = batch_size

    def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None:
        """Create or provide a `TransferBuffer` object for receiving acquired samples from the device.

        If no buffer is provided, and no buffer has previously been defined, then one will be created: in FIFO mode,
        with a notify size of 10 pages or the size of the acquisition, whichever is smaller; in Standard Single mode,
        one with the correct length and no notify size. A separate buffer for transferring Timestamps will also be
        created using the Timestamper class.

        Args:
            buffer (Optional[List[`TransferBuffer`]]): A length-1 list containing a pre-constructed
                `TransferBuffer` set up for card-to-PC transfer of samples ("data"). The size of the buffer should be
                chosen according to the current number of active channels, the acquisition length and the number
                of acquisitions which you intend to download at a time using get_waveforms().
        """
        self._set_or_update_transfer_buffer_attribute(buffer)
        if self._transfer_buffer is not None:
            set_transfer_buffer(self._handle, self._transfer_buffer)

    def _set_or_update_transfer_buffer_attribute(self, buffer: Optional[Sequence[TransferBuffer]]) -> None:
        if buffer:
            self._transfer_buffer = buffer[0]
            if self._transfer_buffer.direction != BufferDirection.SPCM_DIR_CARDTOPC:
                raise ValueError("Digitisers need a transfer buffer with direction BufferDirection.SPCM_DIR_CARDTOPC")
            if self._transfer_buffer.type != BufferType.SPCM_BUF_DATA:
                raise ValueError("Digitisers need a transfer buffer with type BufferType.SPCM_BUF_DATA")
        elif self._transfer_buffer is None:
            if self.acquisition_mode in (AcquisitionMode.SPC_REC_FIFO_MULTI, AcquisitionMode.SPC_REC_FIFO_AVERAGE):
                samples_per_batch = (
                    self.acquisition_length_in_samples * len(self.enabled_analog_channel_nums) * self._batch_size
                )
                pages_per_batch = samples_per_batch * self.bytes_per_sample / PAGE_SIZE_IN_BYTES

                if pages_per_batch < DEFAULT_NOTIFY_SIZE_IN_PAGES:
                    notify_size = pages_per_batch
                else:
                    notify_size = DEFAULT_NOTIFY_SIZE_IN_PAGES

                # Make transfer buffer big enough to hold all samples in the batch
                self._transfer_buffer = create_samples_acquisition_transfer_buffer(
                    size_in_samples=samples_per_batch,
                    notify_size_in_pages=notify_size,
                    bytes_per_sample=self.bytes_per_sample,
                )
            elif self.acquisition_mode in (AcquisitionMode.SPC_REC_STD_SINGLE, AcquisitionMode.SPC_REC_STD_AVERAGE):
                self._transfer_buffer = create_samples_acquisition_transfer_buffer(
                    size_in_samples=self.acquisition_length_in_samples * len(self.enabled_analog_channel_nums),
                    notify_size_in_pages=0,
                    bytes_per_sample=self.bytes_per_sample,
                )
            else:
                raise ValueError("AcquisitionMode not recognised")

    def __str__(self) -> str:
        return f"Card {self._visa_string}"
```
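The FIFO-mode length handling in `_coerce_num_samples_if_fifo` and `set_post_trigger_length_in_samples` can be hard to follow inside the class, so here is a minimal standalone sketch of the same arithmetic. The function names are hypothetical, and the step size of 32 used in the example is an assumed value; the real step size comes from `get_memsize_step_size()` and depends on the card model. Note that, as in the source, a length is rounded down to a multiple of the step size, even though the log message says "nearest".

```python
def coerce_length_to_step(length_in_samples: int, step_size: int) -> int:
    """Round a length down to a whole multiple of the step size."""
    if step_size <= 0:
        raise ValueError("step_size must be positive")
    return length_in_samples - (length_in_samples % step_size)


def coerce_post_trigger_length(post_trigger_length: int, acquisition_length: int, step_size: int) -> int:
    """Quantise the post-trigger length and keep it at least one step below the acquisition length."""
    post_trigger_length = coerce_length_to_step(post_trigger_length, step_size)
    if (acquisition_length - post_trigger_length) < step_size:
        # Too close to the end of the recording: clamp to the maximum allowed value.
        post_trigger_length = acquisition_length - step_size
    return post_trigger_length


if __name__ == "__main__":
    STEP = 32  # assumed step size, for illustration only
    print(coerce_length_to_step(1000, STEP))
    print(coerce_post_trigger_length(1024, 1024, STEP))
```

In this sketch a requested post-trigger length equal to the acquisition length is pulled back by one step, which matches the warning text in the source about being "step-size samples less than the acquisition length".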
Class for controlling individual Spectrum digitiser cards.
    def __init__(self, device_number: int, ip_address: Optional[str] = None) -> None:
        """
        Args:
            device_number (int): Index of the card to control. If only one card is present, set to 0.
            ip_address (Optional[str]): If connecting to a networked card, provide the IP address here as a string.

        """
        # pass unused args up the inheritance hierarchy
        super().__init__(device_number=device_number, ip_address=ip_address)

        if self.type != CardType.SPCM_TYPE_AI:
            raise SpectrumCardIsNotADigitiser(self.type)
        self._acquisition_mode = self.acquisition_mode
        self._timestamper: Optional[Timestamper] = None
        self._batch_size = 1
Arguments:
- device_number (int): Index of the card to control. If only one card is present, set to 0.
- ip_address (Optional[str]): If connecting to a networked card, provide the IP address here as a string.
    def wait_for_acquisition_to_complete(self) -> None:
        """Blocks until the current acquisition has finished, or the timeout is reached.

        In Standard Single mode (SPC_REC_STD_SINGLE), this should be called after `start()`. Once the call
        to `wait_for_acquisition_to_complete()` returns, the newly acquired samples are in the on-device buffer and
        ready for transfer to the `TransferBuffer` using `start_transfer()`.

        In FIFO mode (SPC_REC_FIFO_MULTI), the card will continue to acquire samples until
        `stop()` is called, so `wait_for_acquisition_to_complete()` should not be used.

        """
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_WAITREADY)
Blocks until the current acquisition has finished, or the timeout is reached.
In Standard Single mode (SPC_REC_STD_SINGLE), this should be called after `start()`. Once the call to `wait_for_acquisition_to_complete()` returns, the newly acquired samples are in the on-device buffer and ready for transfer to the `TransferBuffer` using `start_transfer()`.
In FIFO mode (SPC_REC_FIFO_MULTI), the card will continue to acquire samples until `stop()` is called, so `wait_for_acquisition_to_complete()` should not be used.
    def get_raw_waveforms(self) -> List[List[NDArray[int16]]]:
        """Get a list of the most recently transferred waveforms, in channel order, as 16-bit integers.

        This method copies and reshapes the samples in the `TransferBuffer` into a list of lists of 1D NumPy arrays
        (waveforms) and returns the list.

        In Standard Single mode (SPC_REC_STD_SINGLE), `get_waveforms()` should be called after
        `wait_for_transfer_to_complete()` has returned.

        In FIFO mode (SPC_REC_FIFO_MULTI), while the card is continuously acquiring samples and transferring them to the
        `TransferBuffer`, this method should be called in a loop. The method will block until each new transfer is
        received, so the loop will run at the same rate as the acquisition (in SPC_REC_FIFO_MULTI mode, for example,
        this would be the rate at which your trigger source was running).

        Returns:
            waveforms (List[List[NDArray[int16]]]): A list of lists of 1D numpy arrays, one inner list per acquisition
              and one array per enabled channel, in channel order. To average the acquisitions:
                `np.array(waveforms).mean(axis=0)`

        """
        if self._transfer_buffer is None:
            raise SpectrumNoTransferBufferDefined("Cannot find a samples transfer buffer")

        num_read_bytes = 0
        num_samples_per_frame = self.acquisition_length_in_samples * len(self.enabled_analog_channel_nums)
        num_expected_bytes_per_frame = num_samples_per_frame * self._transfer_buffer.data_array.itemsize
        raw_samples = zeros(num_samples_per_frame * self._batch_size, dtype=self._transfer_buffer.data_array.dtype)

        if self.acquisition_mode in (AcquisitionMode.SPC_REC_STD_SINGLE, AcquisitionMode.SPC_REC_STD_AVERAGE):
            raw_samples = self._transfer_buffer.copy_contents()

        elif self.acquisition_mode in (AcquisitionMode.SPC_REC_FIFO_MULTI, AcquisitionMode.SPC_REC_FIFO_AVERAGE):
            self.wait_for_transfer_chunk_to_complete()

            while num_read_bytes < (num_expected_bytes_per_frame * self._batch_size):
                num_available_bytes = self.read_spectrum_device_register(SPC_DATA_AVAIL_USER_LEN)
                position_of_available_bytes = self.read_spectrum_device_register(SPC_DATA_AVAIL_USER_POS)

                # Don't allow reading over the end of the transfer buffer
                if (
                    position_of_available_bytes + num_available_bytes
                ) > self._transfer_buffer.data_array_length_in_bytes:
                    num_available_bytes = self._transfer_buffer.data_array_length_in_bytes - position_of_available_bytes

                # Don't allow reading over the end of the current acquisition:
                if (num_read_bytes + num_available_bytes) > (num_expected_bytes_per_frame * self._batch_size):
                    num_available_bytes = (num_expected_bytes_per_frame * self._batch_size) - num_read_bytes

                num_available_samples = num_available_bytes // self._transfer_buffer.data_array.itemsize
                num_read_samples = num_read_bytes // self._transfer_buffer.data_array.itemsize

                raw_samples[
                    num_read_samples : num_read_samples + num_available_samples
                ] = self._transfer_buffer.read_chunk(position_of_available_bytes, num_available_bytes)
                self.write_to_spectrum_device_register(SPC_DATA_AVAIL_CARD_LEN, num_available_bytes)

                num_read_bytes += num_available_bytes

        waveforms_in_columns = raw_samples.reshape(
            (self._batch_size, self.acquisition_length_in_samples, len(self.enabled_analog_channel_nums))
        )

        repeat_acquisitions = []
        for n in range(self._batch_size):
            repeat_acquisitions.append([waveform for waveform in waveforms_in_columns[n, :, :].T])

        return repeat_acquisitions
Get a list of the most recently transferred waveforms, in channel order, as 16-bit integers.
This method copies and reshapes the samples in the `TransferBuffer` into a list of lists of 1D NumPy arrays (waveforms) and returns the list.
In Standard Single mode (SPC_REC_STD_SINGLE), `get_waveforms()` should be called after `wait_for_transfer_to_complete()` has returned.
In FIFO mode (SPC_REC_FIFO_MULTI), while the card is continuously acquiring samples and transferring them to the `TransferBuffer`, this method should be called in a loop. The method will block until each new transfer is received, so the loop will run at the same rate as the acquisition (in SPC_REC_FIFO_MULTI mode, for example, this would be the rate at which your trigger source was running).
Returns:
waveforms (List[List[NDArray[int16]]]): A list of lists of 1D numpy arrays, one inner list per acquisition and one array per enabled channel, in channel order. To average the acquisitions: `np.array(waveforms).mean(axis=0)`
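The reshaping step described for `get_raw_waveforms()` can be tried without hardware. The sketch below is a pure-Python stand-in, not the library's implementation (which reshapes a NumPy array and transposes it). It assumes the samples of each acquisition arrive interleaved by channel, as the `(batch_size, acquisition_length, num_channels)` reshape in the source suggests. The function name `split_interleaved_samples` is hypothetical.

```python
from typing import List, Sequence


def split_interleaved_samples(
    raw_samples: Sequence[int], batch_size: int, acquisition_length: int, num_channels: int
) -> List[List[List[int]]]:
    """Return one inner list per acquisition, each holding one waveform per channel."""
    samples_per_acquisition = acquisition_length * num_channels
    if len(raw_samples) != batch_size * samples_per_acquisition:
        raise ValueError("raw_samples has the wrong length for the requested shape")

    acquisitions = []
    for n in range(batch_size):
        start = n * samples_per_acquisition
        frame = raw_samples[start : start + samples_per_acquisition]
        # Channel c owns every num_channels-th sample of the frame, starting at offset c.
        acquisitions.append([list(frame[c::num_channels]) for c in range(num_channels)])
    return acquisitions


# Two acquisitions of three samples each from two channels (channel 0: 1x, channel 1: 2x).
raw = [10, 20, 11, 21, 12, 22, 13, 23, 14, 24, 15, 25]
waveforms = split_interleaved_samples(raw, batch_size=2, acquisition_length=3, num_channels=2)
print(waveforms[0])  # [[10, 11, 12], [20, 21, 22]]
print(waveforms[1])  # [[13, 14, 15], [23, 24, 25]]
```

The outer index selects the acquisition and the inner index selects the channel, which matches the "one inner list per acquisition and one array per enabled channel" layout described in the Returns entry.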
    def get_waveforms(self) -> List[List[NDArray[float64]]]:
        """Get a list of the most recently transferred waveforms, in channel order, in Volts as floats.

        See get_raw_waveforms() for details.

        Returns:
            waveforms (List[List[NDArray[float64]]]): A list of lists of 1D numpy arrays, one inner list per acquisition
              and one array per enabled channel, in channel order. To average the acquisitions:
                `np.array(waveforms).mean(axis=0)`

        """
        raw_repeat_acquisitions = self.get_raw_waveforms()
        repeat_acquisitions = []
        for n in range(self._batch_size):
            repeat_acquisitions.append(
                [
                    cast(
                        SpectrumDigitiserAnalogChannel, self.analog_channels[ch_num]
                    ).convert_raw_waveform_to_voltage_waveform(squeeze(waveform))
                    for ch_num, waveform in zip(self.enabled_analog_channel_nums, raw_repeat_acquisitions[n])
                ]
            )
        return repeat_acquisitions
Get a list of the most recently transferred waveforms, in channel order, in Volts as floats.
See get_raw_waveforms() for details.
Returns:
waveforms (List[List[NDArray[float64]]]): A list of lists of 1D numpy arrays, one inner list per acquisition and one array per enabled channel, in channel order. To average the acquisitions: `np.array(waveforms).mean(axis=0)`
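To make the averaging expression concrete, the sketch below computes the same per-channel, sample-by-sample mean across acquisitions using only the standard library. It is an illustration of what `mean(axis=0)` does to the nested list layout, not code from `spectrumdevice`; the helper name `average_acquisitions` and the example voltages are made up.

```python
from statistics import fmean
from typing import List, Sequence


def average_acquisitions(waveforms: Sequence[Sequence[Sequence[float]]]) -> List[List[float]]:
    """Average sample-by-sample across acquisitions, keeping one waveform per channel."""
    num_channels = len(waveforms[0])
    averaged = []
    for ch in range(num_channels):
        # zip(*...) groups the k-th sample of this channel from every acquisition.
        per_sample = zip(*(acquisition[ch] for acquisition in waveforms))
        averaged.append([fmean(samples) for samples in per_sample])
    return averaged


# Two acquisitions, two channels, three samples per waveform (values in volts).
batch = [
    [[0.0, 1.0, 2.0], [4.0, 4.0, 4.0]],
    [[2.0, 3.0, 4.0], [0.0, 2.0, 4.0]],
]
print(average_acquisitions(batch))  # [[1.0, 2.0, 3.0], [2.0, 3.0, 4.0]]
```

The acquisition axis is collapsed, so the result has one averaged waveform per enabled channel.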
    def get_timestamp(self) -> Optional[datetime.datetime]:
        """Get timestamp for the last acquisition"""
        if self._timestamper is not None:
            return self._timestamper.get_timestamp()
        else:
            return None
Get timestamp for the last acquisition
    @property
    def acquisition_length_in_samples(self) -> int:
        """The current recording length (per channel) in samples.

        Returns:
            length_in_samples (int): The current recording length ('acquisition length') in samples."""
        return self.read_spectrum_device_register(SPC_MEMSIZE)
The current recording length (per channel) in samples.
Returns:
length_in_samples (int): The current recording length ('acquisition length') in samples.
    def set_acquisition_length_in_samples(self, length_in_samples: int) -> None:
        """Change the recording length (per channel). In FIFO mode, it will be quantised according to the step size
          allowed by the connected card type.

        Args:
            length_in_samples (int): The desired recording length ('acquisition length'), in samples.
        """
        length_in_samples = self._coerce_num_samples_if_fifo(length_in_samples)
        self.write_to_spectrum_device_register(SPC_SEGMENTSIZE, length_in_samples)
        self.write_to_spectrum_device_register(SPC_MEMSIZE, length_in_samples)
Change the recording length (per channel). In FIFO mode, it will be quantised according to the step size allowed by the connected card type.
Arguments:
- length_in_samples (int): The desired recording length ('acquisition length'), in samples.
    @property
    def post_trigger_length_in_samples(self) -> int:
        """The number of samples of the recording that will contain data received after the trigger event.

        Returns:
            length_in_samples (int): The currently set post trigger length in samples.
        """
        return self.read_spectrum_device_register(SPC_POSTTRIGGER)
The number of samples of the recording that will contain data received after the trigger event.
Returns:
length_in_samples (int): The currently set post trigger length in samples.
    def set_post_trigger_length_in_samples(self, length_in_samples: int) -> None:
        """Change the number of samples of the recording that will contain data received after the trigger event.
        In FIFO mode, this will be quantised according to the minimum step size allowed by the connected card.

        Args:
            length_in_samples (int): The desired post trigger length in samples."""
        length_in_samples = self._coerce_num_samples_if_fifo(length_in_samples)
        if self.acquisition_mode == AcquisitionMode.SPC_REC_FIFO_MULTI:
            if (self.acquisition_length_in_samples - length_in_samples) < get_memsize_step_size(self._model_number):
                logger.warning(
                    "FIFO mode: coercing post trigger length to maximum allowed value (step-size samples less than "
                    "the acquisition length)."
                )
                length_in_samples = self.acquisition_length_in_samples - get_memsize_step_size(self._model_number)
        self.write_to_spectrum_device_register(SPC_POSTTRIGGER, length_in_samples)
Change the number of samples of the recording that will contain data received after the trigger event. In FIFO mode, this will be quantised according to the minimum step size allowed by the connected card.
Arguments:
- length_in_samples (int): The desired post trigger length in samples.
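The FIFO-mode limit applied by `set_post_trigger_length_in_samples()` can be sketched as a small pure function. This mirrors only the clamping branch visible in the source: the post trigger length is capped at one step size less than the acquisition length. It leaves out the quantisation done by `_coerce_num_samples_if_fifo()`, whose behaviour is not shown here. The function name and the step size of 8 are illustrative assumptions; the real step size depends on the card model.

```python
def clamp_post_trigger_length(requested: int, acquisition_length: int, step_size: int) -> int:
    """Cap the post trigger length at (acquisition_length - step_size), as the FIFO branch does."""
    if acquisition_length - requested < step_size:
        return acquisition_length - step_size
    return requested


STEP = 8  # hypothetical step size, for illustration only
print(clamp_post_trigger_length(1000, acquisition_length=1024, step_size=STEP))  # 1000
print(clamp_post_trigger_length(1020, acquisition_length=1024, step_size=STEP))  # 1016
```

In the second call the request leaves fewer than `STEP` pre-trigger samples, so it is reduced to the largest allowed value.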
    @property
    def acquisition_mode(self) -> AcquisitionMode:
        """The currently enabled card mode. Will raise an exception if the current mode is not supported by
        `spectrumdevice`.

        Returns:
            mode (`AcquisitionMode`): The currently enabled card acquisition mode."""
        return AcquisitionMode(self.read_spectrum_device_register(SPC_CARDMODE))
The currently enabled card mode. Will raise an exception if the current mode is not supported by `spectrumdevice`.
Returns:
mode (`AcquisitionMode`): The currently enabled card acquisition mode.
    def set_acquisition_mode(self, mode: AcquisitionMode) -> None:
        """Change the currently enabled card mode. See `AcquisitionMode` and the Spectrum documentation
        for the available modes.

        Args:
            mode (`AcquisitionMode`): The desired acquisition mode."""
        self.write_to_spectrum_device_register(SPC_CARDMODE, mode.value)
Change the currently enabled card mode. See `AcquisitionMode` and the Spectrum documentation for the available modes.
Arguments:
- mode (`AcquisitionMode`): The desired acquisition mode.
    def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None:
        """Create or provide a `TransferBuffer` object for receiving acquired samples from the device.

        If no buffer is provided, and no buffer has previously been defined, then one will be created: in FIFO mode,
        with a notify size of 10 pages or the size of the acquisition, whichever is smaller; in Standard Single mode,
        one with the correct length and no notify size. A separate buffer for transferring Timestamps will also be
        created using the Timestamper class.

        Args:
            buffer (Optional[List[`TransferBuffer`]]): A length-1 list containing a pre-constructed
                `TransferBuffer` set up for card-to-PC transfer of samples ("data"). The size of the buffer should be
                chosen according to the current number of active channels, the acquisition length and the number
                of acquisitions which you intend to download at a time using get_waveforms().
        """
        self._set_or_update_transfer_buffer_attribute(buffer)
        if self._transfer_buffer is not None:
            set_transfer_buffer(self._handle, self._transfer_buffer)
Create or provide a `TransferBuffer` object for receiving acquired samples from the device.
If no buffer is provided, and no buffer has previously been defined, then one will be created: in FIFO mode, with a notify size of 10 pages or the size of the acquisition, whichever is smaller; in Standard Single mode, one with the correct length and no notify size. A separate buffer for transferring Timestamps will also be created using the Timestamper class.
Arguments:
- buffer (Optional[List[`TransferBuffer`]]): A length-1 list containing a pre-constructed `TransferBuffer` set up for card-to-PC transfer of samples ("data"). The size of the buffer should be chosen according to the current number of active channels, the acquisition length and the number of acquisitions which you intend to download at a time using get_waveforms().
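The default FIFO notify size described above ("10 pages or the size of the acquisition, whichever is smaller") follows directly from the batch size in bytes. The sketch below reproduces that arithmetic as a standalone function. The page size of 4096 bytes is an assumption made for illustration, since the value of `PAGE_SIZE_IN_BYTES` is not shown in this section, and the function name is hypothetical.

```python
PAGE_SIZE_IN_BYTES = 4096  # assumed value, for illustration only
DEFAULT_NOTIFY_SIZE_IN_PAGES = 10  # "10 pages", per the docstring


def fifo_notify_size_in_pages(
    acquisition_length: int, num_channels: int, batch_size: int, bytes_per_sample: int
) -> float:
    """Return the smaller of the batch size in pages and the default notify size."""
    samples_per_batch = acquisition_length * num_channels * batch_size
    pages_per_batch = samples_per_batch * bytes_per_sample / PAGE_SIZE_IN_BYTES
    return min(pages_per_batch, DEFAULT_NOTIFY_SIZE_IN_PAGES)


# 1024 samples x 2 channels x 2 bytes = 4096 bytes, i.e. a single page.
print(fifo_notify_size_in_pages(1024, num_channels=2, batch_size=1, bytes_per_sample=2))  # 1.0
# 65536 samples x 4 channels x 2 bytes = 128 pages, so the default of 10 pages applies.
print(fifo_notify_size_in_pages(65536, num_channels=4, batch_size=1, bytes_per_sample=2))  # 10
```

Small acquisitions therefore trigger a notification once per batch, while large ones are delivered in 10-page chunks.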
Inherited Members
- AbstractSpectrumDigitiser
  - configure_acquisition
  - execute_standard_single_acquisition
  - execute_finite_fifo_acquisition
  - execute_continuous_fifo_acquisition
- AbstractSpectrumCard
  - model_number
  - reconnect
  - status
  - start_transfer
  - stop_transfer
  - wait_for_transfer_chunk_to_complete
  - transfer_buffers
  - disconnect
  - connected
  - analog_channels
  - io_lines
  - enabled_analog_channel_nums
  - set_enabled_analog_channels
  - trigger_sources
  - set_trigger_sources
  - external_trigger_mode
  - set_external_trigger_mode
  - external_trigger_level_in_mv
  - set_external_trigger_level_in_mv
  - external_trigger_pulse_width_in_samples
  - set_external_trigger_pulse_width_in_samples
  - apply_channel_enabling
  - timeout_in_ms
  - set_timeout_in_ms
  - clock_mode
  - set_clock_mode
  - available_io_modes
  - feature_list
  - sample_rate_in_hz
  - set_sample_rate_in_hz
  - type
  - force_trigger
  - bytes_per_sample
class SpectrumDigitiserStarHub(
    AbstractSpectrumStarHub[
        SpectrumDigitiserCard, SpectrumDigitiserAnalogChannelInterface, SpectrumDigitiserIOLineInterface
    ],
    AbstractSpectrumDigitiser,
):
    """Composite class of `SpectrumDigitiserCard` for controlling a StarHub digitiser device, for example the Spectrum
    NetBox. StarHub digitiser devices are composites of more than one Spectrum digitiser card. Acquisition from the
    child cards of a StarHub is synchronised, aggregating the channels of all child cards. This class enables the
    control of a StarHub device as if it were a single Spectrum card."""

    def __init__(self, device_number: int, child_cards: tuple[SpectrumDigitiserCard, ...], master_card_index: int):
        """
        Args:
            device_number (int): The index of the StarHub to connect to. If only one StarHub is present, set to 0.
            child_cards (Sequence[`SpectrumDigitiserCard`]): A list of `SpectrumCard` objects defining the child cards
                located within the StarHub, correctly constructed with their IP addresses and/or device numbers.
            master_card_index (int): The position within child_cards where the master card (the card which controls the
                clock) is located.
        """
        super().__init__(device_number=device_number, child_cards=child_cards, master_card_index=master_card_index)
        self._acquisition_mode = self.acquisition_mode

    def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None:
        """Create or provide `CardToPCDataTransferBuffer` objects for receiving acquired samples from the child cards.
        If no buffers are provided, they will be created with the correct size and a board_memory_offset_bytes of 0. See
        `SpectrumDigitiserCard.define_transfer_buffer()` for more information.

        Args:
            buffer (Optional[`CardToPCDataTransferBuffer`]): A list containing pre-constructed
                `CardToPCDataTransferBuffer` objects, one for each child card. The size of the buffers should be chosen
                according to the current number of active channels in each card and the acquisition length.
        """
        if buffer:
            for card, buff in zip(self._child_cards, buffer):
                card.define_transfer_buffer([buff])
        else:
            for card in self._child_cards:
                card.define_transfer_buffer()

    def wait_for_acquisition_to_complete(self) -> None:
        """Wait for each card to finish its acquisition. See `SpectrumDigitiserCard.wait_for_acquisition_to_complete()`
        for more information."""
        for card in self._child_cards:
            card.wait_for_acquisition_to_complete()

    def get_waveforms(self) -> List[List[NDArray[float64]]]:
        """Get a list of the most recently transferred waveforms, as floating point voltages.

        This method gets the waveforms from each child card and joins them into a new list, ordered by channel number.
        See `SpectrumDigitiserCard.get_waveforms()` for more information.

        Returns:
            waveforms (List[List[NDArray[float64]]]): A list of lists of 1D numpy arrays, one inner list per acquisition,
              and one array per enabled channel, in channel order.
        """
        return self._get_waveforms_in_threads(SpectrumDigitiserCard.get_waveforms)

    def get_raw_waveforms(self) -> List[List[NDArray[int16]]]:
        """Get a list of the most recently transferred waveforms, as integers.

        This method gets the waveforms from each child card and joins them into a new list, ordered by channel number.
        See `SpectrumDigitiserCard.get_waveforms()` for more information.

        Returns:
            waveforms (List[List[NDArray[int16]]]): A list of lists of 1D numpy arrays, one inner list per acquisition,
              and one array per enabled channel, in channel order.
        """
        return self._get_waveforms_in_threads(SpectrumDigitiserCard.get_raw_waveforms)

    def _get_waveforms_in_threads(
        self, get_waveforms_method: Callable[[SpectrumDigitiserCard], List[List[WAVEFORM_TYPE_VAR]]]
    ) -> List[List[WAVEFORM_TYPE_VAR]]:
        """Gets waveforms from child cards in separate threads, using the SpectrumDigitiserCard method provided."""

        card_ids_and_waveform_sets: Dict[str, list[list[WAVEFORM_TYPE_VAR]]] = {}

        def _get_waveforms(digitiser_card: SpectrumDigitiserCard) -> None:
            this_cards_waveforms = get_waveforms_method(digitiser_card)
            card_ids_and_waveform_sets[str(digitiser_card)] = this_cards_waveforms

        threads = [Thread(target=_get_waveforms, args=(card,)) for card in self._child_cards]

        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

        waveform_sets_all_cards_ordered = []
        for n in range(self.batch_size):
            waveforms_in_this_batch = []
            for card in self._child_cards:
                waveforms_in_this_batch += card_ids_and_waveform_sets[str(card)][n]
            waveform_sets_all_cards_ordered.append(waveforms_in_this_batch)

        return waveform_sets_all_cards_ordered

    def get_timestamp(self) -> Optional[datetime.datetime]:
        """Get timestamp for the last acquisition"""
        return self._triggering_card.get_timestamp()

    def enable_timestamping(self) -> None:
        self._triggering_card.enable_timestamping()

    @property
    def acquisition_length_in_samples(self) -> int:
        """The currently set recording length, which should be the same for all child cards. If different recording
        lengths are set, an exception is raised. See `SpectrumDigitiserCard.acquisition_length_in_samples` for more
        information.

        Returns:
            length_in_samples: The currently set acquisition length in samples."""
        lengths = []
        for d in self._child_cards:
            lengths.append(d.acquisition_length_in_samples)
        return check_settings_constant_across_devices(lengths, __name__)

    def set_acquisition_length_in_samples(self, length_in_samples: int) -> None:
        """Set a new recording length for all child cards. See `SpectrumDigitiserCard.set_acquisition_length_in_samples()`
        for more information.

        Args:
            length_in_samples (int): The desired acquisition length in samples."""
        for d in self._child_cards:
            d.set_acquisition_length_in_samples(length_in_samples)

    @property
    def post_trigger_length_in_samples(self) -> int:
        """The number of samples recorded after a trigger is received. This should be consistent across all child
        cards. If different values are found across the child cards, an exception is raised. See
        `SpectrumDigitiserCard.post_trigger_length_in_samples` for more information.

        Returns:
            length_in_samples (int): The current post trigger length in samples.
        """
        lengths = []
        for d in self._child_cards:
            lengths.append(d.post_trigger_length_in_samples)
        return check_settings_constant_across_devices(lengths, __name__)

    def set_post_trigger_length_in_samples(self, length_in_samples: int) -> None:
        """Set a new post trigger length for all child cards. See `SpectrumDigitiserCard.set_post_trigger_length_in_samples()`
        for more information.

        Args:
            length_in_samples (int): The desired post trigger length in samples.
        """
        for d in self._child_cards:
            d.set_post_trigger_length_in_samples(length_in_samples)

    @property
    def acquisition_mode(self) -> AcquisitionMode:
        """The acquisition mode, which should be the same for all child cards. If it's not, an exception is raised.
        See `SpectrumDigitiserCard.acquisition_mode` for more information.

        Returns:
            mode (`AcquisitionMode`): The currently enabled acquisition mode.
        """
        modes = []
        for d in self._child_cards:
            modes.append(d.acquisition_mode)
        return AcquisitionMode(check_settings_constant_across_devices([m.value for m in modes], __name__))

    def set_acquisition_mode(self, mode: AcquisitionMode) -> None:
        """Change the acquisition mode for all child cards. See `SpectrumDigitiserCard.set_acquisition_mode()` for more
        information.

        Args:
            mode (`AcquisitionMode`): The desired acquisition mode."""
        for d in self._child_cards:
            d.set_acquisition_mode(mode)

    @property
    def batch_size(self) -> int:
        batch_sizes = []
        for d in self._child_cards:
            batch_sizes.append(d.batch_size)
        return check_settings_constant_across_devices(batch_sizes, __name__)

    def set_batch_size(self, batch_size: int) -> None:
        for d in self._child_cards:
            d.set_batch_size(batch_size)

    def force_trigger(self) -> None:
        for d in self._child_cards:
            d.force_trigger()

    @property
    def type(self) -> CardType:
        return self._child_cards[0].type

    @property
    def model_number(self) -> ModelNumber:
        return self._child_cards[0].model_number

    @property
    def analog_channels(self) -> Sequence[SpectrumDigitiserAnalogChannelInterface]:
        """A tuple containing all the channels of the child cards of the hub. See `AbstractSpectrumCard.channels` for
        more information.

        Returns:
            channels (Sequence[`SpectrumDigitiserAnalogChannelInterface`]):
                A tuple of `SpectrumDigitiserAnalogChannelInterface` objects.
        """
        return super().analog_channels
Composite class of `SpectrumDigitiserCard` for controlling a StarHub digitiser device, for example the Spectrum NetBox. StarHub digitiser devices are composites of more than one Spectrum digitiser card. Acquisition from the child cards of a StarHub is synchronised, aggregating the channels of all child cards. This class enables the control of a StarHub device as if it were a single Spectrum card.
    def __init__(self, device_number: int, child_cards: tuple[SpectrumDigitiserCard, ...], master_card_index: int):
        """
        Args:
            device_number (int): The index of the StarHub to connect to. If only one StarHub is present, set to 0.
            child_cards (Sequence[`SpectrumDigitiserCard`]): A list of `SpectrumCard` objects defining the child cards
                located within the StarHub, correctly constructed with their IP addresses and/or device numbers.
            master_card_index (int): The position within child_cards where the master card (the card which controls the
                clock) is located.
        """
        super().__init__(device_number=device_number, child_cards=child_cards, master_card_index=master_card_index)
        self._acquisition_mode = self.acquisition_mode
Arguments:
- device_number (int): The index of the StarHub to connect to. If only one StarHub is present, set to 0.
- child_cards (Sequence[`SpectrumDigitiserCard`]): A list of `SpectrumCard` objects defining the child cards located within the StarHub, correctly constructed with their IP addresses and/or device numbers.
- master_card_index (int): The position within child_cards where the master card (the card which controls the clock) is located.
    def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None:
        """Create or provide `CardToPCDataTransferBuffer` objects for receiving acquired samples from the child cards.
        If no buffers are provided, they will be created with the correct size and a board_memory_offset_bytes of 0. See
        `SpectrumDigitiserCard.define_transfer_buffer()` for more information.

        Args:
            buffer (Optional[`CardToPCDataTransferBuffer`]): A list containing pre-constructed
                `CardToPCDataTransferBuffer` objects, one for each child card. The size of the buffers should be chosen
                according to the current number of active channels in each card and the acquisition length.
        """
        if buffer:
            for card, buff in zip(self._child_cards, buffer):
                card.define_transfer_buffer([buff])
        else:
            for card in self._child_cards:
                card.define_transfer_buffer()
Create or provide `CardToPCDataTransferBuffer` objects for receiving acquired samples from the child cards. If no buffers are provided, they will be created with the correct size and a board_memory_offset_bytes of 0. See `SpectrumDigitiserCard.define_transfer_buffer()` for more information.
Arguments:
- buffer (Optional[`CardToPCDataTransferBuffer`]): A list containing pre-constructed `CardToPCDataTransferBuffer` objects, one for each child card. The size of the buffers should be chosen according to the current number of active channels in each card and the acquisition length.
    def wait_for_acquisition_to_complete(self) -> None:
        """Wait for each card to finish its acquisition. See `SpectrumDigitiserCard.wait_for_acquisition_to_complete()`
        for more information."""
        for card in self._child_cards:
            card.wait_for_acquisition_to_complete()
Wait for each card to finish its acquisition. See `SpectrumDigitiserCard.wait_for_acquisition_to_complete()` for more information.
    def get_waveforms(self) -> List[List[NDArray[float64]]]:
        """Get a list of the most recently transferred waveforms, as floating point voltages.

        This method gets the waveforms from each child card and joins them into a new list, ordered by channel number.
        See `SpectrumDigitiserCard.get_waveforms()` for more information.

        Returns:
            waveforms (List[List[NDArray[float64]]]): A list of lists of 1D numpy arrays, one inner list per acquisition,
              and one array per enabled channel, in channel order.
        """
        return self._get_waveforms_in_threads(SpectrumDigitiserCard.get_waveforms)
Get a list of the most recently transferred waveforms, as floating point voltages.
This method gets the waveforms from each child card and joins them into a new list, ordered by channel number. See `SpectrumDigitiserCard.get_waveforms()` for more information.
Returns:
waveforms (List[List[NDArray[float64]]]): A list of lists of 1D numpy arrays, one inner list per acquisition, and one array per enabled channel, in channel order.
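The joining step can be illustrated with stand-in cards. The sketch below follows the pattern of `_get_waveforms_in_threads()` from the class source: each card is read in its own thread, then the per-card channel lists are concatenated in card order for every acquisition. `FakeCard` and `gather_waveforms` are hypothetical names used only for this example; real cards would block on hardware transfers instead of returning canned data.

```python
from threading import Thread
from typing import Dict, List


class FakeCard:
    """Hypothetical stand-in for a digitiser card; returns canned waveforms."""

    def __init__(self, name: str, waveforms: List[List[List[float]]]) -> None:
        self.name = name
        self._waveforms = waveforms

    def get_waveforms(self) -> List[List[List[float]]]:
        return self._waveforms


def gather_waveforms(cards: List[FakeCard], batch_size: int) -> List[List[List[float]]]:
    results: Dict[str, List[List[List[float]]]] = {}

    def _fetch(card: FakeCard) -> None:
        # Each thread writes to its own key, so the threads never overwrite each other.
        results[card.name] = card.get_waveforms()

    threads = [Thread(target=_fetch, args=(card,)) for card in cards]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    # For each acquisition, concatenate the channels of every card in card order.
    merged = []
    for n in range(batch_size):
        acquisition: List[List[float]] = []
        for card in cards:
            acquisition += results[card.name][n]
        merged.append(acquisition)
    return merged


card_a = FakeCard("card 0", [[[0.1, 0.2], [0.3, 0.4]]])  # one acquisition, two channels
card_b = FakeCard("card 1", [[[0.5, 0.6]]])  # one acquisition, one channel
print(gather_waveforms([card_a, card_b], batch_size=1))
# [[[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]]]
```

Because the merge iterates over the cards in their original order after all threads have joined, the channel order of the result does not depend on which thread finishes first.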
    def get_raw_waveforms(self) -> List[List[NDArray[int16]]]:
        """Get a list of the most recently transferred waveforms, as integers.

        This method gets the waveforms from each child card and joins them into a new list, ordered by channel number.
        See `SpectrumDigitiserCard.get_waveforms()` for more information.

        Returns:
            waveforms (List[List[NDArray[int16]]]): A list of lists of 1D numpy arrays, one inner list per acquisition,
              and one array per enabled channel, in channel order.
        """
        return self._get_waveforms_in_threads(SpectrumDigitiserCard.get_raw_waveforms)
Get a list of the most recently transferred waveforms, as integers.
This method gets the waveforms from each child card and joins them into a new list, ordered by channel number. See `SpectrumDigitiserCard.get_waveforms()` for more information.
Returns:
waveforms (List[List[NDArray[int16]]]): A list of lists of 1D numpy arrays, one inner list per acquisition, and one array per enabled channel, in channel order.
    def get_timestamp(self) -> Optional[datetime.datetime]:
        """Get timestamp for the last acquisition"""
        return self._triggering_card.get_timestamp()
Get timestamp for the last acquisition
    @property
    def acquisition_length_in_samples(self) -> int:
        """The currently set recording length, which should be the same for all child cards. If different recording
        lengths are set, an exception is raised. See `SpectrumDigitiserCard.acquisition_length_in_samples` for more
        information.

        Returns:
            length_in_samples: The currently set acquisition length in samples."""
        lengths = []
        for d in self._child_cards:
            lengths.append(d.acquisition_length_in_samples)
        return check_settings_constant_across_devices(lengths, __name__)
The currently set recording length, which should be the same for all child cards. If different recording lengths are set, an exception is raised. See `SpectrumDigitiserCard.acquisition_length_in_samples` for more information.
Returns:
length_in_samples: The currently set acquisition length in samples.
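The "same for all child cards, otherwise raise" behaviour can be sketched as a small helper. The real `check_settings_constant_across_devices()` is not shown in this section, so the function below is a hypothetical stand-in: its name, its use of `ValueError`, and its error messages are assumptions chosen for illustration, not the library's actual behaviour.

```python
from typing import List, TypeVar

T = TypeVar("T")


def require_constant(values: List[T], setting_name: str) -> T:
    """Return the value shared by all devices, or raise if the devices disagree."""
    if not values:
        raise ValueError(f"No values supplied for {setting_name}")
    if any(value != values[0] for value in values[1:]):
        # ValueError is a placeholder; the library may raise a different exception type.
        raise ValueError(f"{setting_name} differs across child cards: {values}")
    return values[0]


print(require_constant([4096, 4096], "acquisition_length_in_samples"))  # 4096
```

Calling it with `[4096, 8192]` raises instead of silently picking one card's value, which is the behaviour the property documentation describes for mismatched child cards.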
    def set_acquisition_length_in_samples(self, length_in_samples: int) -> None:
        """Set a new recording length for all child cards. See `SpectrumDigitiserCard.set_acquisition_length_in_samples()`
        for more information.

        Args:
            length_in_samples (int): The desired acquisition length in samples."""
        for d in self._child_cards:
            d.set_acquisition_length_in_samples(length_in_samples)
Set a new recording length for all child cards. See `SpectrumDigitiserCard.set_acquisition_length_in_samples()` for more information.
Arguments:
- length_in_samples (int): The desired acquisition length in samples.
    @property
    def post_trigger_length_in_samples(self) -> int:
        """The number of samples recorded after a trigger is received. This should be consistent across all child
        cards. If different values are found across the child cards, an exception is raised. See
        `SpectrumDigitiserCard.post_trigger_length_in_samples` for more information.

        Returns:
            length_in_samples (int): The current post trigger length in samples.
        """
        lengths = []
        for d in self._child_cards:
            lengths.append(d.post_trigger_length_in_samples)
        return check_settings_constant_across_devices(lengths, __name__)
The number of samples recorded after a trigger is received. This should be consistent across all child
cards. If different values are found across the child cards, an exception is raised. See
SpectrumDigitiserCard.post_trigger_length_in_samples
for more information.
Returns:
length_in_samples (int): The current post trigger length in samples.
    def set_post_trigger_length_in_samples(self, length_in_samples: int) -> None:
        """Set a new post trigger length for all child cards. See `SpectrumDigitiserCard.set_post_trigger_length_in_samples()`
        for more information.

        Args:
            length_in_samples (int): The desired post trigger length in samples.
        """
        for d in self._child_cards:
            d.set_post_trigger_length_in_samples(length_in_samples)

Set a new post trigger length for all child cards. See SpectrumDigitiserCard.set_post_trigger_length_in_samples() for more information.
Arguments:
- length_in_samples (int): The desired post trigger length in samples.
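The source of `AbstractSpectrumDigitiser.configure_acquisition()` on this page derives the post trigger length by subtracting the pre trigger length from the acquisition length. The arithmetic can be sketched as below. The helper name `post_trigger_length` and its range check are assumptions made for illustration and are not part of the library.

```python
def post_trigger_length(acquisition_length_in_samples: int, pre_trigger_length_in_samples: int) -> int:
    """Number of samples recorded after the trigger, given the total and pre trigger lengths."""
    # Assumed sanity check for this sketch: the pre trigger portion cannot exceed the recording.
    if not 0 <= pre_trigger_length_in_samples <= acquisition_length_in_samples:
        raise ValueError("Pre trigger length must lie between 0 and the acquisition length")
    return acquisition_length_in_samples - pre_trigger_length_in_samples


# A 4096-sample recording with 512 samples kept from before the trigger.
post_length = post_trigger_length(4096, 512)
```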
    @property
    def acquisition_mode(self) -> AcquisitionMode:
        """The acquisition mode, which should be the same for all child cards. If it's not, an exception is raised.
        See `SpectrumDigitiserCard.acquisition_mode` for more information.

        Returns:
            mode (`AcquisitionMode`): The currently enabled acquisition mode.
        """
        modes = []
        for d in self._child_cards:
            modes.append(d.acquisition_mode)
        return AcquisitionMode(check_settings_constant_across_devices([m.value for m in modes], __name__))

The acquisition mode, which should be the same for all child cards. If it's not, an exception is raised. See SpectrumDigitiserCard.acquisition_mode for more information.
Returns:
mode (AcquisitionMode): The currently enabled acquisition mode.
    def set_acquisition_mode(self, mode: AcquisitionMode) -> None:
        """Change the acquisition mode for all child cards. See `SpectrumDigitiserCard.set_acquisition_mode()` for more
        information.

        Args:
            mode (`AcquisitionMode`): The desired acquisition mode."""
        for d in self._child_cards:
            d.set_acquisition_mode(mode)

Change the acquisition mode for all child cards. See SpectrumDigitiserCard.set_acquisition_mode() for more information.
Arguments:
- mode (AcquisitionMode): The desired acquisition mode.
    @property
    def analog_channels(self) -> Sequence[SpectrumDigitiserAnalogChannelInterface]:
        """A tuple containing all the channels of the child cards of the hub. See `AbstractSpectrumCard.channels` for
        more information.

        Returns:
            channels (Sequence[`SpectrumDigitiserAnalogChannelInterface`]):
                A tuple of `SpectrumDigitiserAnalogChannelInterface` objects.
        """
        return super().analog_channels

A tuple containing all the channels of the child cards of the hub. See AbstractSpectrumCard.channels for more information.
Returns:
channels (Sequence[SpectrumDigitiserAnalogChannelInterface]): A tuple of SpectrumDigitiserAnalogChannelInterface objects.
Inherited Members
- AbstractSpectrumDigitiser
  - configure_acquisition
  - execute_standard_single_acquisition
  - execute_finite_fifo_acquisition
  - execute_continuous_fifo_acquisition
- AbstractSpectrumStarHub
  - disconnect
  - reconnect
  - status
  - start_transfer
  - stop_transfer
  - wait_for_transfer_chunk_to_complete
  - connected
  - set_triggering_card
  - clock_mode
  - set_clock_mode
  - sample_rate_in_hz
  - set_sample_rate_in_hz
  - trigger_sources
  - set_trigger_sources
  - external_trigger_mode
  - set_external_trigger_mode
  - external_trigger_level_in_mv
  - set_external_trigger_level_in_mv
  - external_trigger_pulse_width_in_samples
  - set_external_trigger_pulse_width_in_samples
  - apply_channel_enabling
  - enabled_analog_channel_nums
  - set_enabled_analog_channels
  - transfer_buffers
  - io_lines
  - timeout_in_ms
  - set_timeout_in_ms
  - feature_list
  - available_io_modes
  - bytes_per_sample
class MockSpectrumDigitiserCard(MockAbstractSpectrumDigitiser, MockAbstractSpectrumCard, SpectrumDigitiserCard):
    """A mock spectrum card, for testing software written to use the `SpectrumDigitiserCard` class.

    This class overrides methods of `SpectrumDigitiserCard` that communicate with hardware with mocked implementations,
    allowing software to be tested without Spectrum hardware connected or drivers installed, e.g. during CI. It overrides
    methods to set up a mock 'on-device buffer' attribute into which a mock waveform source will write
    samples. It also uses a MockTimestamper to generate timestamps for mock waveforms.
    """

    def __init__(
        self,
        device_number: int,
        model: ModelNumber,
        mock_source_frame_rate_hz: float,
        num_modules: int,
        num_channels_per_module: int,
        card_features: Optional[list[CardFeature]] = None,
        advanced_card_features: Optional[list[AdvancedCardFeature]] = None,
    ):
        """
        Args:
            device_number (int): The index of the mock device to create. Used to create a name for the device which is
                used internally.
            model (ModelNumber): The model of card to mock. Affects the allowed acquisition and post-trigger lengths.
            mock_source_frame_rate_hz (float): Rate at which waveforms will be generated by the mock source providing
                data to the mock spectrum card.
            num_modules (int): The number of internal modules to assign the mock card, e.g. 2. On real hardware, this
                is read from the device so does not need to be set. See the Spectrum documentation to work out how many
                modules your hardware has.
            num_channels_per_module (int): The number of channels per module, e.g. 4 (giving 8 channels in total with
                2 modules). On real hardware, this is read from the device so does not need to be set.
            card_features (list[CardFeature]): List of available features of the mock device.
            advanced_card_features (list[AdvancedCardFeature]): List of available advanced features of the mock device.

        """

        super().__init__(
            device_number=device_number,
            model=model,
            mock_source_frame_rate_hz=mock_source_frame_rate_hz,
            num_modules=num_modules,
            num_channels_per_module=num_channels_per_module,
            card_type=CardType.SPCM_TYPE_AI,
            card_features=card_features if card_features is not None else [],
            advanced_card_features=advanced_card_features if advanced_card_features is not None else [],
        )
        self._connect(self._visa_string)
        self._acquisition_mode = self.acquisition_mode
        self._previous_transfer_chunk_count = 0
        self._param_dict[TRANSFER_CHUNK_COUNTER] = 0

    def enable_timestamping(self) -> None:
        self._timestamper: MockTimestamper = MockTimestamper(self, self._handle)

    def set_acquisition_mode(self, mode: AcquisitionMode) -> None:
        """Mock timestamper needs to be recreated if the acquisition mode is changed."""
        super().set_acquisition_mode(mode)
        self._timestamper = MockTimestamper(self, self._handle)

    def set_sample_rate_in_hz(self, rate: int) -> None:
        """Mock timestamper needs to be recreated if the sample rate is changed."""
        super().set_sample_rate_in_hz(rate)
        self._timestamper = MockTimestamper(self, self._handle)

    def set_acquisition_length_in_samples(self, length_in_samples: int) -> None:
        """Set length of mock recording (per channel). In FIFO mode, this will be quantised to the nearest 8 samples.
        See `SpectrumDigitiserCard` for more information. This method is overridden here only so that the internal
        attributes related to the mock on-device buffer can be set.

        Args:
            length_in_samples (int): Number of samples in each generated mock waveform
        """
        super().set_acquisition_length_in_samples(length_in_samples)

    def set_enabled_analog_channels(self, channels_nums: List[int]) -> None:
        """Set the channels to enable for the mock acquisition. See `SpectrumDigitiserCard` for more information. This
        method is overridden here only so that the internal attributes related to the mock on-device buffer
        can be set.

        Args:
            channels_nums (List[int]): List of mock channel indices to enable, e.g. [0, 1, 2].

        """
        if len(list(filter(lambda x: 0 <= x < len(self.analog_channels), channels_nums))) == len(channels_nums):
            super().set_enabled_analog_channels(channels_nums)
        else:
            raise SpectrumSettingsMismatchError("Not enough channels in mock device configuration.")

    def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None:
        """Create or provide a `TransferBuffer` object for receiving acquired samples from the device.

        See SpectrumDigitiserCard.define_transfer_buffer(). This mock implementation is identical except that it
        does not write to any hardware device."""
        self._set_or_update_transfer_buffer_attribute(buffer)

    def start_transfer(self) -> None:
        """See `SpectrumDigitiserCard.start_transfer()`."""
        pass

    def stop_transfer(self) -> None:
        """See `SpectrumDigitiserCard.stop_transfer()`."""
        pass

    def wait_for_transfer_chunk_to_complete(self) -> None:
        """See `SpectrumDigitiserCard.wait_for_transfer_chunk_to_complete()`. This mock implementation blocks until a
        new mock transfer has been completed by waiting for a change to TRANSFER_CHUNK_COUNTER."""
        if self._transfer_buffer:
            t0 = perf_counter()
            t_elapsed = 0.0
            while (
                self._previous_transfer_chunk_count == self._param_dict[TRANSFER_CHUNK_COUNTER]
            ) and t_elapsed < MOCK_TRANSFER_TIMEOUT_IN_S:
                sleep(0.1)
                t_elapsed = perf_counter() - t0
            self._previous_transfer_chunk_count = self._param_dict[TRANSFER_CHUNK_COUNTER]
        else:
            raise SpectrumNoTransferBufferDefined("No transfer in progress.")

    def wait_for_acquisition_to_complete(self) -> None:
        """See `SpectrumDigitiserCard.wait_for_acquisition_to_complete()`. This mock implementation blocks until a mock
        acquisition has been completed (i.e. the acquisition thread has shut down) or the request has timed out
        according to the `timeout_in_ms` property."""
        if self._acquisition_thread is not None:
            self._acquisition_thread.join(timeout=1e-3 * self.timeout_in_ms)
            if self._acquisition_thread.is_alive():
                logger.warning("A timeout occurred while waiting for mock acquisition to complete.")
        else:
            logger.warning("No acquisition in progress. Wait for acquisition to complete has no effect")
A mock spectrum card, for testing software written to use the SpectrumDigitiserCard class.
This class overrides methods of SpectrumDigitiserCard that communicate with hardware with mocked implementations, allowing software to be tested without Spectrum hardware connected or drivers installed, e.g. during CI. It overrides methods to set up a mock 'on-device buffer' attribute into which a mock waveform source will write samples. It also uses a MockTimestamper to generate timestamps for mock waveforms.
    def __init__(
        self,
        device_number: int,
        model: ModelNumber,
        mock_source_frame_rate_hz: float,
        num_modules: int,
        num_channels_per_module: int,
        card_features: Optional[list[CardFeature]] = None,
        advanced_card_features: Optional[list[AdvancedCardFeature]] = None,
    ):
        """
        Args:
            device_number (int): The index of the mock device to create. Used to create a name for the device which is
                used internally.
            model (ModelNumber): The model of card to mock. Affects the allowed acquisition and post-trigger lengths.
            mock_source_frame_rate_hz (float): Rate at which waveforms will be generated by the mock source providing
                data to the mock spectrum card.
            num_modules (int): The number of internal modules to assign the mock card, e.g. 2. On real hardware, this
                is read from the device so does not need to be set. See the Spectrum documentation to work out how many
                modules your hardware has.
            num_channels_per_module (int): The number of channels per module, e.g. 4 (giving 8 channels in total with
                2 modules). On real hardware, this is read from the device so does not need to be set.
            card_features (list[CardFeature]): List of available features of the mock device.
            advanced_card_features (list[AdvancedCardFeature]): List of available advanced features of the mock device.

        """

        super().__init__(
            device_number=device_number,
            model=model,
            mock_source_frame_rate_hz=mock_source_frame_rate_hz,
            num_modules=num_modules,
            num_channels_per_module=num_channels_per_module,
            card_type=CardType.SPCM_TYPE_AI,
            card_features=card_features if card_features is not None else [],
            advanced_card_features=advanced_card_features if advanced_card_features is not None else [],
        )
        self._connect(self._visa_string)
        self._acquisition_mode = self.acquisition_mode
        self._previous_transfer_chunk_count = 0
        self._param_dict[TRANSFER_CHUNK_COUNTER] = 0

Arguments:
- device_number (int): The index of the mock device to create. Used to create a name for the device which is used internally.
- model (ModelNumber): The model of card to mock. Affects the allowed acquisition and post-trigger lengths.
- mock_source_frame_rate_hz (float): Rate at which waveforms will be generated by the mock source providing data to the mock spectrum card.
- num_modules (int): The number of internal modules to assign the mock card, e.g. 2. On real hardware, this is read from the device so does not need to be set. See the Spectrum documentation to work out how many modules your hardware has.
- num_channels_per_module (int): The number of channels per module, e.g. 4 (giving 8 channels in total with 2 modules). On real hardware, this is read from the device so does not need to be set.
- card_features (list[CardFeature]): List of available features of the mock device.
- advanced_card_features (list[AdvancedCardFeature]): List of available advanced features of the mock device.
    def set_acquisition_mode(self, mode: AcquisitionMode) -> None:
        """Mock timestamper needs to be recreated if the acquisition mode is changed."""
        super().set_acquisition_mode(mode)
        self._timestamper = MockTimestamper(self, self._handle)

Mock timestamper needs to be recreated if the acquisition mode is changed.
    def set_sample_rate_in_hz(self, rate: int) -> None:
        """Mock timestamper needs to be recreated if the sample rate is changed."""
        super().set_sample_rate_in_hz(rate)
        self._timestamper = MockTimestamper(self, self._handle)

Mock timestamper needs to be recreated if the sample rate is changed.
    def set_acquisition_length_in_samples(self, length_in_samples: int) -> None:
        """Set length of mock recording (per channel). In FIFO mode, this will be quantised to the nearest 8 samples.
        See `SpectrumDigitiserCard` for more information. This method is overridden here only so that the internal
        attributes related to the mock on-device buffer can be set.

        Args:
            length_in_samples (int): Number of samples in each generated mock waveform
        """
        super().set_acquisition_length_in_samples(length_in_samples)

Set length of mock recording (per channel). In FIFO mode, this will be quantised to the nearest 8 samples. See SpectrumDigitiserCard for more information. This method is overridden here only so that the internal attributes related to the mock on-device buffer can be set.
Arguments:
- length_in_samples (int): Number of samples in each generated mock waveform
    def set_enabled_analog_channels(self, channels_nums: List[int]) -> None:
        """Set the channels to enable for the mock acquisition. See `SpectrumDigitiserCard` for more information. This
        method is overridden here only so that the internal attributes related to the mock on-device buffer
        can be set.

        Args:
            channels_nums (List[int]): List of mock channel indices to enable, e.g. [0, 1, 2].

        """
        if len(list(filter(lambda x: 0 <= x < len(self.analog_channels), channels_nums))) == len(channels_nums):
            super().set_enabled_analog_channels(channels_nums)
        else:
            raise SpectrumSettingsMismatchError("Not enough channels in mock device configuration.")

Set the channels to enable for the mock acquisition. See SpectrumDigitiserCard for more information. This method is overridden here only so that the internal attributes related to the mock on-device buffer can be set.
Arguments:
- channels_nums (List[int]): List of mock channel indices to enable, e.g. [0, 1, 2].
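The index check performed by `set_enabled_analog_channels()` accepts a request only if every index refers to an existing channel. A minimal standalone sketch of that rule is shown below. The function name `validate_channel_nums` and the use of `ValueError` are assumptions for illustration; the mock class itself raises `SpectrumSettingsMismatchError`.

```python
from typing import List


def validate_channel_nums(channel_nums: List[int], num_channels: int) -> List[int]:
    """Return the requested indices unchanged if each lies in [0, num_channels)."""
    invalid = [n for n in channel_nums if not 0 <= n < num_channels]
    if invalid:
        raise ValueError(f"Channel indices {invalid} are not available on a {num_channels}-channel device")
    return list(channel_nums)


# A mock card with 2 modules of 4 channels each has indices 0 to 7.
enabled = validate_channel_nums([0, 1, 2], num_channels=8)
```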
    def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None:
        """Create or provide a `TransferBuffer` object for receiving acquired samples from the device.

        See SpectrumDigitiserCard.define_transfer_buffer(). This mock implementation is identical except that it
        does not write to any hardware device."""
        self._set_or_update_transfer_buffer_attribute(buffer)

Create or provide a TransferBuffer object for receiving acquired samples from the device.
See SpectrumDigitiserCard.define_transfer_buffer(). This mock implementation is identical except that it does not write to any hardware device.
    def wait_for_transfer_chunk_to_complete(self) -> None:
        """See `SpectrumDigitiserCard.wait_for_transfer_chunk_to_complete()`. This mock implementation blocks until a
        new mock transfer has been completed by waiting for a change to TRANSFER_CHUNK_COUNTER."""
        if self._transfer_buffer:
            t0 = perf_counter()
            t_elapsed = 0.0
            while (
                self._previous_transfer_chunk_count == self._param_dict[TRANSFER_CHUNK_COUNTER]
            ) and t_elapsed < MOCK_TRANSFER_TIMEOUT_IN_S:
                sleep(0.1)
                t_elapsed = perf_counter() - t0
            self._previous_transfer_chunk_count = self._param_dict[TRANSFER_CHUNK_COUNTER]
        else:
            raise SpectrumNoTransferBufferDefined("No transfer in progress.")

See SpectrumDigitiserCard.wait_for_transfer_chunk_to_complete(). This mock implementation blocks until a new mock transfer has been completed by waiting for a change to TRANSFER_CHUNK_COUNTER.
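The polling pattern used here (wait until a counter changes, but give up after a timeout) can be sketched independently of the library. In this example `wait_for_change`, the `state` dictionary and the timer that stands in for a mock waveform source are all hypothetical; only `time.perf_counter`, `time.sleep` and `threading.Timer` are real standard-library calls.

```python
import threading
from time import perf_counter, sleep
from typing import Callable


def wait_for_change(read_counter: Callable[[], int], previous: int, timeout_s: float, poll_interval_s: float = 0.01) -> int:
    """Block until read_counter() differs from previous, or until timeout_s has elapsed."""
    t0 = perf_counter()
    while read_counter() == previous and (perf_counter() - t0) < timeout_s:
        sleep(poll_interval_s)
    # Return the latest count so the caller can remember it for the next wait.
    return read_counter()


# A timer plays the role of the mock source: it bumps the chunk counter after 50 ms.
state = {"chunks": 0}
timer = threading.Timer(0.05, lambda: state.update(chunks=1))
timer.start()
new_count = wait_for_change(lambda: state["chunks"], previous=0, timeout_s=2.0)
timer.join()
```

Storing the returned count and passing it as `previous` on the next call mirrors how the mock card tracks `_previous_transfer_chunk_count`.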
    def wait_for_acquisition_to_complete(self) -> None:
        """See `SpectrumDigitiserCard.wait_for_acquisition_to_complete()`. This mock implementation blocks until a mock
        acquisition has been completed (i.e. the acquisition thread has shut down) or the request has timed out
        according to the `timeout_in_ms` property."""
        if self._acquisition_thread is not None:
            self._acquisition_thread.join(timeout=1e-3 * self.timeout_in_ms)
            if self._acquisition_thread.is_alive():
                logger.warning("A timeout occurred while waiting for mock acquisition to complete.")
        else:
            logger.warning("No acquisition in progress. Wait for acquisition to complete has no effect")

See SpectrumDigitiserCard.wait_for_acquisition_to_complete(). This mock implementation blocks until a mock acquisition has been completed (i.e. the acquisition thread has shut down) or the request has timed out according to the timeout_in_ms property.
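Because `Thread.join(timeout=...)` returns silently whether or not the thread finished, the mock implementation checks `is_alive()` afterwards to detect a timeout. The sketch below isolates that idiom. The helper `wait_for_thread` is a hypothetical name, and the two threads are stand-ins for a mock acquisition thread.

```python
import threading
import time


def wait_for_thread(thread: threading.Thread, timeout_in_ms: int) -> bool:
    """Join with a millisecond timeout; return True if the thread finished in time."""
    thread.join(timeout=1e-3 * timeout_in_ms)
    return not thread.is_alive()


# A short task that completes well within the timeout.
fast = threading.Thread(target=time.sleep, args=(0.01,))
fast.start()
finished = wait_for_thread(fast, timeout_in_ms=1000)

# A task that blocks until released, so the 50 ms wait times out.
release = threading.Event()
slow = threading.Thread(target=release.wait)
slow.start()
timed_out = not wait_for_thread(slow, timeout_in_ms=50)
release.set()
slow.join()
```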
Inherited Members
- AbstractSpectrumDigitiser
  - configure_acquisition
  - execute_standard_single_acquisition
  - execute_finite_fifo_acquisition
  - execute_continuous_fifo_acquisition
- spectrumdevice.devices.mocks.mock_abstract_devices.MockAbstractSpectrumDigitiser
  - start
  - stop
- spectrumdevice.devices.mocks.mock_abstract_devices.MockAbstractSpectrumDevice
  - write_to_spectrum_device_register
  - read_spectrum_device_register
- SpectrumDigitiserCard
  - get_raw_waveforms
  - get_waveforms
  - get_timestamp
  - acquisition_length_in_samples
  - post_trigger_length_in_samples
  - set_post_trigger_length_in_samples
  - number_of_averages
  - set_number_of_averages
  - acquisition_mode
  - batch_size
  - set_batch_size
- AbstractSpectrumCard
  - model_number
  - reconnect
  - status
  - transfer_buffers
  - disconnect
  - connected
  - analog_channels
  - io_lines
  - enabled_analog_channel_nums
  - trigger_sources
  - set_trigger_sources
  - external_trigger_mode
  - set_external_trigger_mode
  - external_trigger_level_in_mv
  - set_external_trigger_level_in_mv
  - external_trigger_pulse_width_in_samples
  - set_external_trigger_pulse_width_in_samples
  - apply_channel_enabling
  - timeout_in_ms
  - set_timeout_in_ms
  - clock_mode
  - set_clock_mode
  - available_io_modes
  - feature_list
  - sample_rate_in_hz
  - type
  - force_trigger
  - bytes_per_sample
class MockSpectrumDigitiserStarHub(MockAbstractSpectrumStarHub, SpectrumDigitiserStarHub):
    """A mock spectrum StarHub, for testing software written to use the `SpectrumDigitiserStarHub` class.

    Overrides methods of `SpectrumDigitiserStarHub` and `AbstractSpectrumDigitiser` that communicate with hardware
    with mocked implementations, allowing software to be tested without Spectrum hardware connected or drivers
    installed, e.g. during CI."""

    def __init__(self, **kwargs: Any):
        """
        Args:
            child_cards (Sequence[`MockSpectrumDigitiserCard`]): A list of `MockSpectrumDigitiserCard` objects defining the
                properties of the child cards located within the mock hub.
            master_card_index (int): The position within child_cards where the master card (the card which controls the
                clock) is located.
        """
        super().__init__(param_dict=None, **kwargs)
        self._visa_string = "/mock" + self._visa_string
        self._connect(self._visa_string)
        self._acquisition_mode = self.acquisition_mode

    def start(self) -> None:
        """Start a mock acquisition

        See `AbstractSpectrumDevice.start()`. With a hardware device, StarHubs only need to be sent a single
        instruction to start acquisition, which they automatically relay to their child cards - hence why
        `start` is implemented in `AbstractSpectrumDevice` (base class to both `SpectrumDigitiserCard` and
        `SpectrumDigitiserStarHub`) rather than in `SpectrumDigitiserStarHub`. In this mock implementation, each
        card's acquisition is started individually.

        """
        for card in self._child_cards:
            card.start()

    def stop(self) -> None:
        """Stop a mock acquisition

        See `AbstractSpectrumDevice.stop()`. With a hardware device, StarHubs only need to be sent a single
        instruction to stop acquisition, which they automatically relay to their child cards - hence why
        `stop()` is implemented in `AbstractSpectrumDevice` (base class to both `SpectrumDigitiserCard` and
        `SpectrumDigitiserStarHub`) rather than in `SpectrumDigitiserStarHub`. In this mock implementation, each
        card's acquisition is stopped individually.

        """
        for card in self._child_cards:
            card.stop()
A mock spectrum StarHub, for testing software written to use the SpectrumDigitiserStarHub class.
Overrides methods of SpectrumDigitiserStarHub and AbstractSpectrumDigitiser that communicate with hardware with mocked implementations, allowing software to be tested without Spectrum hardware connected or drivers installed, e.g. during CI.
    def __init__(self, **kwargs: Any):
        """
        Args:
            child_cards (Sequence[`MockSpectrumDigitiserCard`]): A list of `MockSpectrumDigitiserCard` objects defining the
                properties of the child cards located within the mock hub.
            master_card_index (int): The position within child_cards where the master card (the card which controls the
                clock) is located.
        """
        super().__init__(param_dict=None, **kwargs)
        self._visa_string = "/mock" + self._visa_string
        self._connect(self._visa_string)
        self._acquisition_mode = self.acquisition_mode

Arguments:
- child_cards (Sequence[MockSpectrumDigitiserCard]): A list of MockSpectrumDigitiserCard objects defining the properties of the child cards located within the mock hub.
- master_card_index (int): The position within child_cards where the master card (the card which controls the clock) is located.
    def start(self) -> None:
        """Start a mock acquisition

        See `AbstractSpectrumDevice.start()`. With a hardware device, StarHubs only need to be sent a single
        instruction to start acquisition, which they automatically relay to their child cards - hence why
        `start` is implemented in `AbstractSpectrumDevice` (base class to both `SpectrumDigitiserCard` and
        `SpectrumDigitiserStarHub`) rather than in `SpectrumDigitiserStarHub`. In this mock implementation, each
        card's acquisition is started individually.

        """
        for card in self._child_cards:
            card.start()

Start a mock acquisition
See AbstractSpectrumDevice.start(). With a hardware device, StarHubs only need to be sent a single instruction to start acquisition, which they automatically relay to their child cards - hence why start is implemented in AbstractSpectrumDevice (base class to both SpectrumDigitiserCard and SpectrumDigitiserStarHub) rather than in SpectrumDigitiserStarHub. In this mock implementation, each card's acquisition is started individually.
    def stop(self) -> None:
        """Stop a mock acquisition

        See `AbstractSpectrumDevice.stop()`. With a hardware device, StarHubs only need to be sent a single
        instruction to stop acquisition, which they automatically relay to their child cards - hence why
        `stop()` is implemented in `AbstractSpectrumDevice` (base class to both `SpectrumDigitiserCard` and
        `SpectrumDigitiserStarHub`) rather than in `SpectrumDigitiserStarHub`. In this mock implementation, each
        card's acquisition is stopped individually.

        """
        for card in self._child_cards:
            card.stop()

Stop a mock acquisition
See AbstractSpectrumDevice.stop(). With a hardware device, StarHubs only need to be sent a single instruction to stop acquisition, which they automatically relay to their child cards - hence why stop() is implemented in AbstractSpectrumDevice (base class to both SpectrumDigitiserCard and SpectrumDigitiserStarHub) rather than in SpectrumDigitiserStarHub. In this mock implementation, each card's acquisition is stopped individually.
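The difference between the hardware behaviour (one instruction relayed by the hub) and the mock behaviour (each child started or stopped in turn) comes down to a simple fan-out loop. The sketch below uses two hypothetical stand-in classes, `DemoCard` and `DemoHub`, rather than the library's mock classes, so it runs without `spectrumdevice` installed.

```python
from typing import List, Sequence


class DemoCard:
    """Hypothetical stand-in for a mock card; it only tracks whether it is running."""

    def __init__(self) -> None:
        self.running = False

    def start(self) -> None:
        self.running = True

    def stop(self) -> None:
        self.running = False


class DemoHub:
    """Hypothetical stand-in for a mock hub that relays commands to each child card."""

    def __init__(self, child_cards: Sequence[DemoCard]) -> None:
        self._child_cards: List[DemoCard] = list(child_cards)

    def start(self) -> None:
        for card in self._child_cards:
            card.start()

    def stop(self) -> None:
        for card in self._child_cards:
            card.stop()


cards = [DemoCard(), DemoCard()]
hub = DemoHub(cards)
hub.start()
all_running = all(card.running for card in cards)
hub.stop()
all_stopped = not any(card.running for card in cards)
```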
Inherited Members
- AbstractSpectrumDigitiser
  - configure_acquisition
  - execute_standard_single_acquisition
  - execute_finite_fifo_acquisition
  - execute_continuous_fifo_acquisition
- spectrumdevice.devices.mocks.mock_abstract_devices.MockAbstractSpectrumDevice
  - write_to_spectrum_device_register
  - read_spectrum_device_register
- SpectrumDigitiserStarHub
  - define_transfer_buffer
  - wait_for_acquisition_to_complete
  - get_waveforms
  - get_raw_waveforms
  - get_timestamp
  - enable_timestamping
  - acquisition_length_in_samples
  - set_acquisition_length_in_samples
  - post_trigger_length_in_samples
  - set_post_trigger_length_in_samples
  - acquisition_mode
  - set_acquisition_mode
  - batch_size
  - set_batch_size
  - force_trigger
  - type
  - model_number
  - analog_channels
- AbstractSpectrumStarHub
  - disconnect
  - reconnect
  - status
  - start_transfer
  - stop_transfer
  - wait_for_transfer_chunk_to_complete
  - connected
  - set_triggering_card
  - clock_mode
  - set_clock_mode
  - sample_rate_in_hz
  - set_sample_rate_in_hz
  - trigger_sources
  - set_trigger_sources
  - external_trigger_mode
  - set_external_trigger_mode
  - external_trigger_level_in_mv
  - set_external_trigger_level_in_mv
  - external_trigger_pulse_width_in_samples
  - set_external_trigger_pulse_width_in_samples
  - apply_channel_enabling
  - enabled_analog_channel_nums
  - set_enabled_analog_channels
  - transfer_buffers
  - io_lines
  - timeout_in_ms
  - set_timeout_in_ms
  - feature_list
  - available_io_modes
  - bytes_per_sample
23class AbstractSpectrumDigitiser( 24 AbstractSpectrumDevice[SpectrumDigitiserAnalogChannelInterface, SpectrumDigitiserIOLineInterface], 25 SpectrumDigitiserInterface, 26 ABC, 27): 28 """Abstract superclass which implements methods common to all Spectrum digitiser devices. Instances of this class 29 cannot be constructed directly. Instead, construct instances of the concrete classes (`SpectrumDigitiserCard`, 30 `SpectrumDigitiserStarHub` or their mock equivalents) which inherit the methods defined here. Note that 31 the mock devices override several of the methods defined here.""" 32 33 def configure_acquisition(self, settings: AcquisitionSettings) -> None: 34 """Apply all the settings contained in an `AcquisitionSettings` dataclass to the device. 35 36 Args: 37 settings (`AcquisitionSettings`): An `AcquisitionSettings` dataclass containing the setting values to apply. 38 """ 39 if settings.batch_size > 1 and settings.acquisition_mode == AcquisitionMode.SPC_REC_STD_SINGLE: 40 raise ValueError("In standard single mode, only 1 acquisition can be downloaded at a time.") 41 self._acquisition_mode = settings.acquisition_mode 42 self.set_batch_size(settings.batch_size) 43 self.set_acquisition_mode(settings.acquisition_mode) 44 self.set_sample_rate_in_hz(settings.sample_rate_in_hz) 45 self.set_acquisition_length_in_samples(settings.acquisition_length_in_samples) 46 self.set_post_trigger_length_in_samples( 47 settings.acquisition_length_in_samples - settings.pre_trigger_length_in_samples 48 ) 49 self.set_timeout_in_ms(settings.timeout_in_ms) 50 self.set_enabled_analog_channels(settings.enabled_channels) 51 52 # Apply channel dependent settings 53 for channel_num, v_range, v_offset, impedance in zip( 54 self.enabled_analog_channel_nums, 55 settings.vertical_ranges_in_mv, 56 settings.vertical_offsets_in_percent, 57 settings.input_impedances, 58 ): 59 channel = self.analog_channels[channel_num] 60 channel.set_vertical_range_in_mv(v_range) 61 
            channel.set_vertical_offset_in_percent(v_offset)
            channel.set_input_impedance(impedance)

        # Only some hardware has software programmable input coupling, so coupling can be None
        if settings.input_couplings is not None:
            for channel, coupling in zip(self.analog_channels, settings.input_couplings):
                channel.set_input_coupling(coupling)

        # Only some hardware has software programmable input paths, so it can be None
        if settings.input_paths is not None:
            for channel, path in zip(self.analog_channels, settings.input_paths):
                channel.set_input_path(path)

        # Write the configuration to the card
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP)

        if settings.timestamping_enabled:
            self.enable_timestamping()

    def execute_standard_single_acquisition(self, raw: bool = False) -> Measurement:
        """Carry out a single measurement in standard single mode and return the acquired waveforms.

        This method automatically carries out a standard single mode acquisition, including handling the creation
        of a `TransferBuffer` and the retrieval of the acquired waveforms. After being called, it will wait until a
        trigger event is received before carrying out the acquisition and then transferring and returning the acquired
        waveforms. The device must be configured in SPC_REC_STD_SINGLE acquisition mode.

        Args:
            raw (bool, optional): Set to true to obtain raw (i.e. 16-bit integer) waveforms, instead of floating point
                voltage waveforms.

        Returns:
            measurement (Measurement): A Measurement object. The `.waveforms` attribute of `measurement` will be a list
                of 1D NumPy arrays, each array containing the waveform data received on one channel, in channel order.
                The Measurement object also has a `timestamp` attribute, which (if timestamping was enabled in
                acquisition settings) contains the time at which the acquisition was triggered.
        """
        if self._acquisition_mode != AcquisitionMode.SPC_REC_STD_SINGLE:
            raise SpectrumWrongAcquisitionMode(
                "Set the acquisition mode to SPC_REC_STD_SINGLE using "
                "configure_acquisition() or set_acquisition_mode() before executing "
                "a standard single mode acquisition."
            )
        self.start()
        self.wait_for_acquisition_to_complete()
        self.define_transfer_buffer()
        self.start_transfer()
        self.wait_for_transfer_chunk_to_complete()
        waveforms: list[RawWaveformType] | list[VoltageWaveformType] = (
            self.get_raw_waveforms()[0] if raw else self.get_waveforms()[0]
        )
        self.stop()  # Only strictly required for Mock devices. Should not affect hardware.
        return Measurement(waveforms=waveforms, timestamp=self.get_timestamp())

    def execute_finite_fifo_acquisition(self, num_measurements: int, raw: bool = False) -> List[Measurement]:
        """Carry out a finite number of FIFO mode measurements and then stop the acquisitions.

        This method automatically carries out a defined number of measurements in Multi FIFO mode, including handling
        the creation of a `TransferBuffer`, streaming the acquired waveforms to the PC, terminating the acquisition and
        returning the acquired waveforms. After being called, it will wait for the requested number of triggers to be
        received, generating the correct number of measurements. It retrieves each measurement's waveforms from the
        `TransferBuffer` as they arrive. Once the requested number of measurements have been received, the acquisition
        is terminated and the waveforms are returned. The device must be configured in SPC_REC_FIFO_MULTI or
        SPC_REC_FIFO_AVERAGE acquisition mode.

        Args:
            num_measurements (int): The number of measurements to carry out.
            raw (bool, optional): Set to true to obtain raw (i.e. 16-bit integer) waveforms, instead of floating point
                voltage waveforms.
        Returns:
            measurements (List[Measurement]): A list of Measurement objects with length `num_measurements`. Each
                Measurement object has a `waveforms` attribute containing a list of 1D NumPy arrays. Each array is a
                waveform acquired from one channel. The arrays are in channel order. The Measurement objects also have
                a `timestamp` attribute, which (if timestamping was enabled in acquisition settings) contains the time
                at which the acquisition was triggered.
        """
        if (num_measurements % self.batch_size) != 0:
            raise ValueError(
                "Number of measurements in a finite FIFO acquisition must be a multiple of the "
                "batch size configured using AbstractSpectrumDigitiser.configure_acquisition()."
            )
        self.execute_continuous_fifo_acquisition()
        measurements = []
        for _ in range(num_measurements // self.batch_size):
            waveforms: list[list[RawWaveformType]] | list[list[VoltageWaveformType]] = (
                self.get_raw_waveforms() if raw else self.get_waveforms()
            )
            measurements += [Measurement(waveforms=frame, timestamp=self.get_timestamp()) for frame in waveforms]
        self.stop()
        return measurements

    def execute_continuous_fifo_acquisition(self) -> None:
        """Start a continuous FIFO mode acquisition.

        This method automatically starts acquiring and streaming samples in FIFO mode, including handling the
        creation of a `TransferBuffer` and streaming the acquired waveforms to the PC. It will return almost
        instantaneously. The acquired waveforms must then be read out of the transfer buffer in a loop using the
        `get_waveforms()` method. Waveforms must be read at least as fast as they are being acquired.
        The FIFO acquisition and streaming will continue until `stop()` is called. The device
        must be configured in SPC_REC_FIFO_MULTI or SPC_REC_FIFO_AVERAGE acquisition mode."""
        if self._acquisition_mode not in (AcquisitionMode.SPC_REC_FIFO_MULTI, AcquisitionMode.SPC_REC_FIFO_AVERAGE):
            raise SpectrumWrongAcquisitionMode(
                "Set the acquisition mode to SPC_REC_FIFO_MULTI or SPC_REC_FIFO_AVERAGE using "
                "configure_acquisition() or set_acquisition_mode() before executing "
                "a FIFO mode acquisition."
            )
        self.define_transfer_buffer()
        self.start()
        self.start_transfer()
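The batch-size rule enforced by execute_finite_fifo_acquisition() can be illustrated in isolation. The following is a minimal sketch rather than the library's code: batches_needed is a hypothetical helper name, and it only reproduces the arithmetic (the requested number of measurements must be a whole multiple of the batch size, and the read loop runs once per batch).

```python
def batches_needed(num_measurements: int, batch_size: int) -> int:
    """Return how many batches must be read to collect num_measurements.

    Hypothetical helper for illustration only. Raises ValueError when the
    request is not a whole multiple of the batch size.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    if num_measurements % batch_size != 0:
        raise ValueError(
            "Number of measurements must be a multiple of the batch size."
        )
    return num_measurements // batch_size


# 20 measurements with a batch size of 5 means the read loop runs 4 times.
print(batches_needed(20, 5))
```

A request such as batches_needed(7, 5) raises ValueError, so the caller either changes the batch size or rounds the request to a multiple of it.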
Abstract superclass which implements methods common to all Spectrum digitiser devices. Instances of this class
cannot be constructed directly. Instead, construct instances of the concrete classes (SpectrumDigitiserCard,
SpectrumDigitiserStarHub or their mock equivalents) which inherit the methods defined here. Note that
the mock devices override several of the methods defined here.
Inherited Members
- spectrumdevice.devices.digitiser.digitiser_interface.SpectrumDigitiserInterface
- wait_for_acquisition_to_complete
- configure_acquisition
- execute_standard_single_acquisition
- execute_finite_fifo_acquisition
- execute_continuous_fifo_acquisition
- get_raw_waveforms
- get_waveforms
- get_timestamp
- enable_timestamping
- acquisition_length_in_samples
- set_acquisition_length_in_samples
- post_trigger_length_in_samples
- set_post_trigger_length_in_samples
- acquisition_mode
- set_acquisition_mode
- batch_size
- set_batch_size
- AbstractSpectrumDevice
- reset
- start
- stop
- configure_trigger
- configure_channel_pairing
- write_to_spectrum_device_register
- read_spectrum_device_register
- spectrumdevice.devices.abstract_device.device_interface.SpectrumDeviceInterface
- connected
- reconnect
- status
- start_transfer
- stop_transfer
- wait_for_transfer_chunk_to_complete
- disconnect
- transfer_buffers
- define_transfer_buffer
- analog_channels
- io_lines
- enabled_analog_channel_nums
- set_enabled_analog_channels
- trigger_sources
- set_trigger_sources
- external_trigger_mode
- set_external_trigger_mode
- external_trigger_level_in_mv
- set_external_trigger_level_in_mv
- external_trigger_pulse_width_in_samples
- set_external_trigger_pulse_width_in_samples
- apply_channel_enabling
- clock_mode
- set_clock_mode
- sample_rate_in_hz
- set_sample_rate_in_hz
- available_io_modes
- feature_list
- timeout_in_ms
- set_timeout_in_ms
- force_trigger
- bytes_per_sample
- type
- model_number
class AbstractSpectrumStarHub(
    AbstractSpectrumDevice, Generic[CardType, AnalogChannelInterfaceType, IOLineInterfaceType], ABC
):
    """Composite abstract class of `AbstractSpectrumCard` implementing methods common to all StarHubs. StarHubs are
    composites of more than one Spectrum card. Acquisition and generation from the child cards of a StarHub
    is synchronised, aggregating the channels of all child cards."""

    def __init__(self, device_number: int, child_cards: Sequence[CardType], master_card_index: int, **kwargs: Any):
        """
        Args:
            device_number (int): The index of the StarHub to connect to. If only one StarHub is present, set to 0.
            child_cards (Sequence[`SpectrumDeviceInterface`]): A list of objects representing the child cards located
                within the StarHub, correctly constructed with their IP addresses and/or device numbers.
            master_card_index (int): The position within child_cards where the master card (the card which controls the
                clock) is located.
        """
        self._child_cards: Sequence[CardType] = child_cards
        self._master_card = child_cards[master_card_index]
        self._triggering_card = child_cards[master_card_index]
        child_card_logical_indices = (2**n for n in range(len(self._child_cards)))
        self._visa_string = f"sync{device_number}"
        self._connect(self._visa_string)
        all_cards_binary_mask = reduce(or_, child_card_logical_indices)
        self.write_to_spectrum_device_register(SPC_SYNC_ENABLEMASK, all_cards_binary_mask)

    def disconnect(self) -> None:
        """Disconnects from each child card and terminates connection to the hub itself."""
        if self._connected:
            destroy_handle(self._handle)
        for card in self._child_cards:
            card.disconnect()
        self._connected = False

    def reconnect(self) -> None:
        """Reconnects to the hub after a `disconnect()`, and reconnects to each child card."""
        self._connect(self._visa_string)
        for card in self._child_cards:
            card.reconnect()

    @property
    def status(self) -> DEVICE_STATUS_TYPE:
        """The statuses of each child card, in a list. See `SpectrumDigitiserCard.status` for more information.
        Returns:
            statuses (List[List[`CardStatus`]]): A list of lists of `CardStatus` (each card has a list of statuses).
        """
        return [card.status[0] for card in self._child_cards]

    def start_transfer(self) -> None:
        """Start the transfer of data between the on-device buffer of each child card and its `TransferBuffer`. See
        `AbstractSpectrumCard.start_transfer()` for more information."""
        for card in self._child_cards:
            card.start_transfer()

    def stop_transfer(self) -> None:
        """Stop the transfer of data between each card and its `TransferBuffer`. See
        `AbstractSpectrumCard.stop_transfer()` for more information."""
        for card in self._child_cards:
            card.stop_transfer()

    def wait_for_transfer_chunk_to_complete(self) -> None:
        """Wait for all cards to stop transferring data to/from their `TransferBuffers`. See
        `AbstractSpectrumCard.wait_for_transfer_chunk_to_complete()` for more information."""
        for card in self._child_cards:
            card.wait_for_transfer_chunk_to_complete()

    @property
    def connected(self) -> bool:
        """True if the StarHub is connected, False if not."""
        return self._connected

    def set_triggering_card(self, card_index: int) -> None:
        """Change the index of the child card responsible for receiving a trigger. During construction, this is set
        equal to the index of the master card but in some situations it may be necessary to change it.

        Args:
            card_index (int): The index of the StarHub's triggering card within the list of child cards provided on
                __init__().
        """
        self._triggering_card = self._child_cards[card_index]

    @property
    def clock_mode(self) -> ClockMode:
        """The clock mode currently configured on the master card.

        Returns:
            mode (`ClockMode`): The currently configured clock mode."""
        return self._master_card.clock_mode

    def set_clock_mode(self, mode: ClockMode) -> None:
        """Change the clock mode configured on the master card.

        Args:
            mode (`ClockMode`): The desired clock mode."""
        self._master_card.set_clock_mode(mode)

    @property
    def sample_rate_in_hz(self) -> int:
        """The sample rate configured on the master card.

        Returns:
            rate (int): The current sample rate of the master card in Hz.
        """
        return self._master_card.sample_rate_in_hz

    def set_sample_rate_in_hz(self, rate: int) -> None:
        """Change the sample rate of the child cards (including the master card).
        Args:
            rate (int): The desired sample rate of the child cards in Hz.
        """
        for card in self._child_cards:
            card.set_sample_rate_in_hz(rate)

    @property
    def trigger_sources(self) -> List[TriggerSource]:
        """The trigger sources configured on the triggering card, which by default is the master card. See
        `AbstractSpectrumCard.trigger_sources()` for more information.

        Returns:
            sources (List[`TriggerSource`]): A list of the currently enabled trigger sources."""
        return self._triggering_card.trigger_sources

    def set_trigger_sources(self, sources: List[TriggerSource]) -> None:
        """Change the trigger sources configured on the triggering card, which by default is the master card. See
        `AbstractSpectrumCard.set_trigger_sources()` for more information.

        Args:
            sources (List[`TriggerSource`]): The trigger sources to enable, in a list."""
        self._triggering_card.set_trigger_sources(sources)
        for card in self._child_cards:
            if card is not self._triggering_card:
                card.set_trigger_sources([TriggerSource.SPC_TMASK_NONE])

    @property
    def external_trigger_mode(self) -> ExternalTriggerMode:
        """The trigger mode configured on the triggering card, which by default is the master card. See
        `AbstractSpectrumCard.external_trigger_mode()` for more information.

        Returns:
            mode (`ExternalTriggerMode`): The currently set external trigger mode.
        """
        return self._triggering_card.external_trigger_mode

    def set_external_trigger_mode(self, mode: ExternalTriggerMode) -> None:
        """Change the trigger mode configured on the triggering card, which by default is the master card. See
        `AbstractSpectrumCard.set_external_trigger_mode()` for more information.

        Args:
            mode (`ExternalTriggerMode`): The desired external trigger mode."""
        self._triggering_card.set_external_trigger_mode(mode)

    @property
    def external_trigger_level_in_mv(self) -> int:
        """The external trigger level configured on the triggering card, which by default is the master card. See
        `AbstractSpectrumCard.external_trigger_level_in_mv()` for more information.

        Returns:
            level (int): The external trigger level in mV.
        """
        return self._triggering_card.external_trigger_level_in_mv

    def set_external_trigger_level_in_mv(self, level: int) -> None:
        """Change the external trigger level configured on the triggering card, which by default is the master card.
        See `AbstractSpectrumCard.set_external_trigger_level_in_mv()` for more information.

        Args:
            level (int): The desired external trigger level in mV.
        """
        self._triggering_card.set_external_trigger_level_in_mv(level)

    @property
    def external_trigger_pulse_width_in_samples(self) -> int:
        """The trigger pulse width (samples) configured on the triggering card, which by default is the master card.
        See `AbstractSpectrumCard.external_trigger_pulse_width_in_samples()` for more information.

        Returns:
            width (int): The current trigger pulse width in samples.
        """
        return self._triggering_card.external_trigger_pulse_width_in_samples

    def set_external_trigger_pulse_width_in_samples(self, width: int) -> None:
        """Change the trigger pulse width (samples) configured on the triggering card, which by default is the master
        card. See `AbstractSpectrumCard.set_external_trigger_pulse_width_in_samples()` for more information.

        Args:
            width (int): The desired trigger pulse width in samples.
        """
        self._triggering_card.set_external_trigger_pulse_width_in_samples(width)

    def apply_channel_enabling(self) -> None:
        """Apply the enabled channels chosen using `set_enabled_analog_channels()`. This happens automatically and
        does not usually need to be called."""
        for d in self._child_cards:
            d.apply_channel_enabling()

    @property
    def enabled_analog_channel_nums(self) -> List[int]:
        """The currently enabled channel indices, indexed over the whole hub (from 0 to N-1, where N is the total
        number of channels available to the hub).

        Returns:
            channel_nums (List[int]): The currently enabled channel indices.
        """
        enabled_channels = []
        n_channels_in_previous_card = 0
        for card in self._child_cards:
            enabled_channels += [
                channel_num + n_channels_in_previous_card for channel_num in card.enabled_analog_channel_nums
            ]
            n_channels_in_previous_card = len(card.analog_channels)
        return enabled_channels

    def set_enabled_analog_channels(self, channels_nums: List[int]) -> None:
        """Change the currently enabled channel indices, indexed over the whole hub (from 0 to N-1, where N is the total
        number of channels available to the hub).

        Args:
            channels_nums (List[int]): The indices to enable.
        """
        channels_nums.sort()
        channels_to_enable_all_cards = channels_nums

        for child_card in self._child_cards:
            n_channels_in_card = len(child_card.analog_channels)
            channels_to_enable_this_card = list(set(arange(n_channels_in_card)) & set(channels_to_enable_all_cards))
            num_channels_to_enable_this_card = len(channels_to_enable_this_card)
            child_card.set_enabled_analog_channels(channels_to_enable_this_card)
            channels_to_enable_all_cards = [
                num - n_channels_in_card for num in channels_nums[num_channels_to_enable_this_card:]
            ]

    @property
    def transfer_buffers(self) -> List[TransferBuffer]:
        """The `TransferBuffer`s of all the child cards of the hub. See `AbstractSpectrumCard.transfer_buffers` for more
        information.

        Returns:
            buffers (List[`TransferBuffer`]): A list of the transfer buffers for each child card."""
        return [card.transfer_buffers[0] for card in self._child_cards]

    @property
    def analog_channels(self) -> Sequence[AnalogChannelInterfaceType]:
        """A tuple containing all the channels of the child cards of the hub. See
        `AbstractSpectrumCard.analog_channels` for more information.

        Returns:
            channels (Sequence[`SpectrumChannelInterface`]): A tuple of `SpectrumDigitiserChannel` objects.
        """
        channels: List[AnalogChannelInterfaceType] = []
        for device in self._child_cards:
            channels += device.analog_channels
        return tuple(channels)

    @property
    def io_lines(self) -> Sequence[IOLineInterfaceType]:
        """A tuple containing all the Multipurpose IO Lines of the child cards of the hub.

        Returns:
            channels (Sequence[`SpectrumIOLineInterface`]): A tuple of `SpectrumIOLineInterface` objects.
        """
        io_lines: List[IOLineInterfaceType] = []
        for device in self._child_cards:
            io_lines += device.io_lines
        return tuple(io_lines)  # todo: this is probably wrong. I don't think both cards in a netbox have IO lines

    @property
    def timeout_in_ms(self) -> int:
        """The time for which the card will wait for a trigger to be received after a device has started
        before returning an error. This should be the same for all child cards. If it's not, an exception is raised.

        Returns:
            timeout_ms (int): The currently set timeout in ms.
        """
        timeouts = []
        for d in self._child_cards:
            timeouts.append(d.timeout_in_ms)
        return check_settings_constant_across_devices(timeouts, __name__)

    def set_timeout_in_ms(self, timeout_ms: int) -> None:
        """Change the timeout value for all child cards.

        Args:
            timeout_ms (int): The desired timeout setting in ms."""
        for d in self._child_cards:
            d.set_timeout_in_ms(timeout_ms)

    @property
    def feature_list(self) -> List[Tuple[List[CardFeature], List[AdvancedCardFeature]]]:
        """Get a list of the features of the child cards. See `CardFeature`, `AdvancedCardFeature` and the Spectrum
        documentation for more information.

        Returns:
            features (List[Tuple[List[`CardFeature`], List[`AdvancedCardFeature`]]]): A list of tuples, one per child
                card. Each tuple contains a list of features and a list of advanced features for that card.
        """
        return [card.feature_list[0] for card in self._child_cards]

    @property
    def available_io_modes(self) -> AvailableIOModes:
        """For each multipurpose IO line on the master card, read the available modes. See `IOLineMode` and the Spectrum
        Documentation for all possible available modes and their meanings.

        Returns:
            modes (AvailableIOModes): An `AvailableIOModes` dataclass containing the modes available for each IO line.
        """
        return self._master_card.available_io_modes

    @property
    def bytes_per_sample(self) -> int:
        bytes_per_sample_each_card = []
        for d in self._child_cards:
            bytes_per_sample_each_card.append(d.bytes_per_sample)
        return check_settings_constant_across_devices(bytes_per_sample_each_card, __name__)

    def __str__(self) -> str:
        return f"StarHub {self._visa_string}"
Composite abstract class of AbstractSpectrumCard implementing methods common to all StarHubs. StarHubs are
composites of more than one Spectrum card. Acquisition and generation from the child cards of a StarHub
is synchronised, aggregating the channels of all child cards.
    def __init__(self, device_number: int, child_cards: Sequence[CardType], master_card_index: int, **kwargs: Any):
        """
        Args:
            device_number (int): The index of the StarHub to connect to. If only one StarHub is present, set to 0.
            child_cards (Sequence[`SpectrumDeviceInterface`]): A list of objects representing the child cards located
                within the StarHub, correctly constructed with their IP addresses and/or device numbers.
            master_card_index (int): The position within child_cards where the master card (the card which controls the
                clock) is located.
        """
        self._child_cards: Sequence[CardType] = child_cards
        self._master_card = child_cards[master_card_index]
        self._triggering_card = child_cards[master_card_index]
        child_card_logical_indices = (2**n for n in range(len(self._child_cards)))
        self._visa_string = f"sync{device_number}"
        self._connect(self._visa_string)
        all_cards_binary_mask = reduce(or_, child_card_logical_indices)
        self.write_to_spectrum_device_register(SPC_SYNC_ENABLEMASK, all_cards_binary_mask)
Arguments:
- device_number (int): The index of the StarHub to connect to. If only one StarHub is present, set to 0.
- child_cards (Sequence[SpectrumDeviceInterface]): A list of objects representing the child cards located within the StarHub, correctly constructed with their IP addresses and/or device numbers.
- master_card_index (int): The position within child_cards where the master card (the card which controls the clock) is located.
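The constructor builds the value written to SPC_SYNC_ENABLEMASK by OR-ing one bit per child card. A minimal standalone sketch of that arithmetic follows; sync_enable_mask is a hypothetical helper name used only for illustration, and no hardware or driver call is involved.

```python
from functools import reduce
from operator import or_


def sync_enable_mask(num_cards: int) -> int:
    """Return a bitmask with bit n set for each child card n.

    Hypothetical helper mirroring the reduce(or_, ...) expression in the
    constructor source shown above.
    """
    if num_cards < 1:
        raise ValueError("A StarHub needs at least one child card.")
    return reduce(or_, (2**n for n in range(num_cards)))


# Three child cards enable bits 0, 1 and 2.
print(bin(sync_enable_mask(3)))
```

For N cards the result equals (1 << N) - 1, so every child card passed to the constructor takes part in the synchronisation.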
    def disconnect(self) -> None:
        """Disconnects from each child card and terminates connection to the hub itself."""
        if self._connected:
            destroy_handle(self._handle)
        for card in self._child_cards:
            card.disconnect()
        self._connected = False
Disconnects from each child card and terminates connection to the hub itself.
    def reconnect(self) -> None:
        """Reconnects to the hub after a `disconnect()`, and reconnects to each child card."""
        self._connect(self._visa_string)
        for card in self._child_cards:
            card.reconnect()
Reconnects to the hub after a disconnect(), and reconnects to each child card.
    @property
    def status(self) -> DEVICE_STATUS_TYPE:
        """The statuses of each child card, in a list. See `SpectrumDigitiserCard.status` for more information.
        Returns:
            statuses (List[List[`CardStatus`]]): A list of lists of `CardStatus` (each card has a list of statuses).
        """
        return [card.status[0] for card in self._child_cards]
The statuses of each child card, in a list. See SpectrumDigitiserCard.status for more information.
Returns:
statuses (List[List[CardStatus]]): A list of lists of CardStatus (each card has a list of statuses).
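The hub's status is assembled by taking the first (and, for a single card, only) entry of each child card's own status list. The sketch below shows that flattening on plain data. It is an illustration only: aggregate_hub_status is a hypothetical name, and the strings are stand-ins for CardStatus members rather than values read from a device.

```python
from typing import List, Sequence


def aggregate_hub_status(per_card_status: Sequence[List[List[str]]]) -> List[List[str]]:
    """Collect one status list per child card.

    Each card reports a list containing a single list of statuses, so the
    hub-level result takes element 0 from every card.
    """
    return [card_status[0] for card_status in per_card_status]


# Illustrative stand-ins for CardStatus values (assumed names).
card_a = [["READY"]]
card_b = [["READY", "TRIGGERED"]]
print(aggregate_hub_status([card_a, card_b]))
```

The result has one inner list per child card, in the same order as the cards were given to the hub.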
    def start_transfer(self) -> None:
        """Start the transfer of data between the on-device buffer of each child card and its `TransferBuffer`. See
        `AbstractSpectrumCard.start_transfer()` for more information."""
        for card in self._child_cards:
            card.start_transfer()
Start the transfer of data between the on-device buffer of each child card and its TransferBuffer. See
AbstractSpectrumCard.start_transfer() for more information.
    def stop_transfer(self) -> None:
        """Stop the transfer of data between each card and its `TransferBuffer`. See
        `AbstractSpectrumCard.stop_transfer()` for more information."""
        for card in self._child_cards:
            card.stop_transfer()
Stop the transfer of data between each card and its TransferBuffer. See
AbstractSpectrumCard.stop_transfer() for more information.
    def wait_for_transfer_chunk_to_complete(self) -> None:
        """Wait for all cards to stop transferring data to/from their `TransferBuffers`. See
        `AbstractSpectrumCard.wait_for_transfer_chunk_to_complete()` for more information."""
        for card in self._child_cards:
            card.wait_for_transfer_chunk_to_complete()
Wait for all cards to stop transferring data to/from their TransferBuffers. See
AbstractSpectrumCard.wait_for_transfer_chunk_to_complete() for more information.
    @property
    def connected(self) -> bool:
        """True if the StarHub is connected, False if not."""
        return self._connected
True if the StarHub is connected, False if not.
    def set_triggering_card(self, card_index: int) -> None:
        """Change the index of the child card responsible for receiving a trigger. During construction, this is set
        equal to the index of the master card but in some situations it may be necessary to change it.

        Args:
            card_index (int): The index of the StarHub's triggering card within the list of child cards provided on
                __init__().
        """
        self._triggering_card = self._child_cards[card_index]
Change the index of the child card responsible for receiving a trigger. During construction, this is set equal to the index of the master card but in some situations it may be necessary to change it.
Arguments:
- card_index (int): The index of the StarHub's triggering card within the list of child cards provided on __init__().
    @property
    def clock_mode(self) -> ClockMode:
        """The clock mode currently configured on the master card.

        Returns:
            mode (`ClockMode`): The currently configured clock mode."""
        return self._master_card.clock_mode
The clock mode currently configured on the master card.
Returns:
mode (ClockMode): The currently configured clock mode.
    def set_clock_mode(self, mode: ClockMode) -> None:
        """Change the clock mode configured on the master card.

        Args:
            mode (`ClockMode`): The desired clock mode."""
        self._master_card.set_clock_mode(mode)
Change the clock mode configured on the master card.
Arguments:
- mode (ClockMode): The desired clock mode.
    @property
    def sample_rate_in_hz(self) -> int:
        """The sample rate configured on the master card.

        Returns:
            rate (int): The current sample rate of the master card in Hz.
        """
        return self._master_card.sample_rate_in_hz
The sample rate configured on the master card.
Returns:
rate (int): The current sample rate of the master card in Hz.
    def set_sample_rate_in_hz(self, rate: int) -> None:
        """Change the sample rate of the child cards (including the master card).
        Args:
            rate (int): The desired sample rate of the child cards in Hz.
        """
        for card in self._child_cards:
            card.set_sample_rate_in_hz(rate)
Change the sample rate of the child cards (including the master card).
Arguments:
- rate (int): The desired sample rate of the child cards in Hz.
    @property
    def trigger_sources(self) -> List[TriggerSource]:
        """The trigger sources configured on the triggering card, which by default is the master card. See
        `AbstractSpectrumCard.trigger_sources()` for more information.

        Returns:
            sources (List[`TriggerSource`]): A list of the currently enabled trigger sources."""
        return self._triggering_card.trigger_sources
The trigger sources configured on the triggering card, which by default is the master card. See
AbstractSpectrumCard.trigger_sources() for more information.
Returns:
sources (List[TriggerSource]): A list of the currently enabled trigger sources.
    def set_trigger_sources(self, sources: List[TriggerSource]) -> None:
        """Change the trigger sources configured on the triggering card, which by default is the master card. See
        `AbstractSpectrumCard.set_trigger_sources()` for more information.

        Args:
            sources (List[`TriggerSource`]): The trigger sources to enable, in a list."""
        self._triggering_card.set_trigger_sources(sources)
        for card in self._child_cards:
            if card is not self._triggering_card:
                card.set_trigger_sources([TriggerSource.SPC_TMASK_NONE])
Change the trigger sources configured on the triggering card, which by default is the master card. All other child cards have their trigger sources set to SPC_TMASK_NONE. See
AbstractSpectrumCard.set_trigger_sources() for more information.
Arguments:
- sources (List[TriggerSource]): The trigger sources to enable, in a list.
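The routing performed by set_trigger_sources() (requested sources go to the triggering card, every other child card is set to "no trigger") can be sketched without hardware. Everything below is an assumption made for illustration: FakeCard and route_trigger_sources are hypothetical names, and the NO_TRIGGER string stands in for TriggerSource.SPC_TMASK_NONE.

```python
from typing import List, Sequence

NO_TRIGGER = "SPC_TMASK_NONE"  # stand-in for TriggerSource.SPC_TMASK_NONE


class FakeCard:
    """Hypothetical stand-in for a child card; it only stores trigger sources."""

    def __init__(self) -> None:
        self.trigger_sources: List[str] = []

    def set_trigger_sources(self, sources: Sequence[str]) -> None:
        self.trigger_sources = list(sources)


def route_trigger_sources(cards: Sequence[FakeCard], triggering_index: int, sources: Sequence[str]) -> None:
    """Give the sources to the triggering card and disable triggering elsewhere."""
    triggering_card = cards[triggering_index]
    triggering_card.set_trigger_sources(sources)
    for card in cards:
        # Identity comparison, as in the source above: only the exact
        # triggering card object keeps the requested sources.
        if card is not triggering_card:
            card.set_trigger_sources([NO_TRIGGER])


cards = [FakeCard(), FakeCard(), FakeCard()]
route_trigger_sources(cards, triggering_index=1, sources=["SPC_TMASK_EXT0"])
print([card.trigger_sources for card in cards])
```

Changing triggering_index in this sketch corresponds to calling set_triggering_card() on the hub before setting the trigger sources.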
    @property
    def external_trigger_mode(self) -> ExternalTriggerMode:
        """The trigger mode configured on the triggering card, which by default is the master card. See
        `AbstractSpectrumCard.external_trigger_mode()` for more information.

        Returns:
            mode (`ExternalTriggerMode`): The currently set external trigger mode.
        """
        return self._triggering_card.external_trigger_mode
The trigger mode configured on the triggering card, which by default is the master card. See
AbstractSpectrumCard.external_trigger_mode() for more information.
Returns:
mode (ExternalTriggerMode): The currently set external trigger mode.
    def set_external_trigger_mode(self, mode: ExternalTriggerMode) -> None:
        """Change the trigger mode configured on the triggering card, which by default is the master card. See
        `AbstractSpectrumCard.set_external_trigger_mode()` for more information.

        Args:
            mode (`ExternalTriggerMode`): The desired external trigger mode."""
        self._triggering_card.set_external_trigger_mode(mode)
Change the trigger mode configured on the triggering card, which by default is the master card. See
AbstractSpectrumCard.set_external_trigger_mode() for more information.
Arguments:
- mode (ExternalTriggerMode): The desired external trigger mode.
    @property
    def external_trigger_level_in_mv(self) -> int:
        """The external trigger level configured on the triggering card, which by default is the master card. See
        `AbstractSpectrumCard.external_trigger_level_in_mv()` for more information.

        Returns:
            level (int): The external trigger level in mV.
        """
        return self._triggering_card.external_trigger_level_in_mv
The external trigger level configured on the triggering card, which by default is the master card. See
AbstractSpectrumCard.external_trigger_level_in_mv() for more information.
Returns:
level (int): The external trigger level in mV.
    def set_external_trigger_level_in_mv(self, level: int) -> None:
        """Change the external trigger level configured on the triggering card, which by default is the master card.
        See `AbstractSpectrumCard.set_external_trigger_level_in_mv()` for more information.

        Args:
            level (int): The desired external trigger level in mV.
        """
        self._triggering_card.set_external_trigger_level_in_mv(level)
Change the external trigger level configured on the triggering card, which by default is the master card.
See AbstractSpectrumCard.set_external_trigger_level_in_mv() for more information.
Arguments:
- level (int): The desired external trigger level in mV.
    @property
    def external_trigger_pulse_width_in_samples(self) -> int:
        """The trigger pulse width (samples) configured on the triggering card, which by default is the master card.
        See `AbstractSpectrumCard.external_trigger_pulse_width_in_samples()` for more information.

        Returns:
            width (int): The current trigger pulse width in samples.
        """
        return self._triggering_card.external_trigger_pulse_width_in_samples
The trigger pulse width (samples) configured on the triggering card, which by default is the master card.
See AbstractSpectrumCard.external_trigger_pulse_width_in_samples() for more information.
Returns:
width (int): The current trigger pulse width in samples.
    def set_external_trigger_pulse_width_in_samples(self, width: int) -> None:
        """Change the trigger pulse width (samples) configured on the triggering card, which by default is the master
        card. See `AbstractSpectrumCard.set_external_trigger_pulse_width_in_samples()` for more information.

        Args:
            width (int): The desired trigger pulse width in samples.
        """
        self._triggering_card.set_external_trigger_pulse_width_in_samples(width)
Change the trigger pulse width (samples) configured on the triggering card, which by default is the master
card. See AbstractSpectrumCard.set_external_trigger_pulse_width_in_samples() for more information.
Arguments:
- width (int): The desired trigger pulse width in samples.
    def apply_channel_enabling(self) -> None:
        """Apply the enabled channels chosen using `set_enabled_analog_channels()`. This happens automatically and
        does not usually need to be called."""
        for d in self._child_cards:
            d.apply_channel_enabling()
Apply the enabled channels chosen using set_enabled_analog_channels(). This happens automatically and does not
usually need to be called.
    @property
    def enabled_analog_channel_nums(self) -> List[int]:
        """The currently enabled channel indices, indexed over the whole hub (from 0 to N-1, where N is the total
        number of channels available to the hub).

        Returns:
            channel_nums (List[int]): The currently enabled channel indices.
        """
        enabled_channels = []
        n_channels_in_previous_card = 0
        for card in self._child_cards:
            enabled_channels += [
                channel_num + n_channels_in_previous_card for channel_num in card.enabled_analog_channel_nums
            ]
            n_channels_in_previous_card = len(card.analog_channels)
        return enabled_channels
The currently enabled channel indices, indexed over the whole hub (from 0 to N-1, where N is the total number of channels available to the hub).
Returns:
channel_nums (List[int]): The currently enabled channel indices.
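Hub-wide indexing (0 to N-1 across all child cards) amounts to adding an offset to each card's local channel numbers. The sketch below is a standalone illustration on plain lists, not the library's implementation: hub_wide_enabled_channels is a hypothetical name, and it assumes each card's offset is the total channel count of all cards before it.

```python
from typing import List, Sequence


def hub_wide_enabled_channels(
    channels_per_card: Sequence[int],
    enabled_per_card: Sequence[Sequence[int]],
) -> List[int]:
    """Convert per-card local channel numbers into hub-wide indices.

    channels_per_card[i] is the number of analog channels on card i and
    enabled_per_card[i] lists the locally enabled channel numbers on it.
    """
    enabled: List[int] = []
    offset = 0  # total number of channels on all preceding cards
    for n_channels, local_enabled in zip(channels_per_card, enabled_per_card):
        enabled += [offset + num for num in local_enabled]
        offset += n_channels
    return enabled


# Two 4-channel cards: local channels 1 and 3 on the second card become 5 and 7.
print(hub_wide_enabled_channels([4, 4], [[0, 2], [1, 3]]))
```

With three or more cards the running offset keeps growing, so channel 0 of the third 4-channel card maps to hub index 8.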
251 def set_enabled_analog_channels(self, channels_nums: List[int]) -> None: 252 """Change the currently enabled channel indices, indexed over the whole hub (from 0 to N-1, where N is the total 253 number of channels available to the hub). 254 255 Returns: 256 channel_nums (List[int]): The indices to enable. 257 """ 258 channels_nums.sort() 259 channels_to_enable_all_cards = channels_nums 260 261 for child_card in self._child_cards: 262 n_channels_in_card = len(child_card.analog_channels) 263 channels_to_enable_this_card = list(set(arange(n_channels_in_card)) & set(channels_to_enable_all_cards)) 264 num_channels_to_enable_this_card = len(channels_to_enable_this_card) 265 child_card.set_enabled_analog_channels(channels_to_enable_this_card) 266 channels_to_enable_all_cards = [ 267 num - n_channels_in_card for num in channels_nums[num_channels_to_enable_this_card:] 268 ]
Change the currently enabled channel indices, indexed over the whole hub (from 0 to N-1, where N is the total number of channels available to the hub).
Arguments:
- channels_nums (List[int]): The indices to enable.
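Going the other way, a list of hub-wide indices has to be split into one card-local list per child card. The sketch below shows one way to do that split; it is a simplified illustration rather than the method's actual code, and `split_by_card` is a hypothetical name.

```python
from typing import List


def split_by_card(hub_indices: List[int], channels_per_card: List[int]) -> List[List[int]]:
    """Split hub-wide channel indices into one card-local list per card (hypothetical helper)."""
    per_card: List[List[int]] = []
    offset = 0  # hub-wide index of the current card's channel 0
    for n_channels in channels_per_card:
        per_card.append(sorted(i - offset for i in hub_indices if offset <= i < offset + n_channels))
        offset += n_channels
    return per_card


# Hub channels 0, 1, 4 and 7 on a hub of two 4-channel cards.
print(split_by_card([7, 0, 4, 1], [4, 4]))  # [[0, 1], [0, 3]]
```

Each resulting list could then be passed to the corresponding card, which enforces its own limit on how many channels may be enabled at once.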
    @property
    def transfer_buffers(self) -> List[TransferBuffer]:
        """The `TransferBuffer`s of all the child cards of the hub. See `AbstractSpectrumCard.transfer_buffers` for more
        information.

        Returns:
            buffers (List[`TransferBuffer`]): A list of the transfer buffers for each child card."""
        return [card.transfer_buffers[0] for card in self._child_cards]
The TransferBuffers of all the child cards of the hub. See AbstractSpectrumCard.transfer_buffers for more information.
Returns:
buffers (List[TransferBuffer]): A list of the transfer buffers for each child card.
    @property
    def analog_channels(self) -> Sequence[AnalogChannelInterfaceType]:
        """A tuple containing all the channels of the child cards of the hub. See
        `AbstractSpectrumCard.analog_channels` for more information.

        Returns:
            channels (Sequence[`SpectrumChannelInterface`]): A tuple of `SpectrumDigitiserChannel` objects.
        """
        channels: List[AnalogChannelInterfaceType] = []
        for device in self._child_cards:
            channels += device.analog_channels
        return tuple(channels)
A tuple containing all the channels of the child cards of the hub. See AbstractSpectrumCard.analog_channels for more information.
Returns:
channels (Sequence[SpectrumChannelInterface]): A tuple of SpectrumDigitiserChannel objects.
    @property
    def io_lines(self) -> Sequence[IOLineInterfaceType]:
        """A tuple containing all the Multipurpose IO Lines of the child cards of the hub.

        Returns:
            channels (Sequence[`SpectrumIOLineInterface`]): A tuple of `SpectrumIOLineInterface` objects.
        """
        io_lines: List[IOLineInterfaceType] = []
        for device in self._child_cards:
            io_lines += device.io_lines
        return tuple(io_lines)  # todo: this is probably wrong. I don't think both cards in a netbox have IO lines
A tuple containing all the Multipurpose IO Lines of the child cards of the hub.
Returns:
channels (Sequence[SpectrumIOLineInterface]): A tuple of SpectrumIOLineInterface objects.
    @property
    def timeout_in_ms(self) -> int:
        """The time for which the card will wait for a trigger to be received after a device has started
        before returning an error. This should be the same for all child cards. If it's not, an exception is raised.

        Returns:
            timeout_ms (int): The currently set timeout in ms.
        """
        timeouts = []
        for d in self._child_cards:
            timeouts.append(d.timeout_in_ms)
        return check_settings_constant_across_devices(timeouts, __name__)
The time for which the card will wait for a trigger to be received after a device has started before returning an error. This should be the same for all child cards. If it's not, an exception is raised.
Returns:
timeout_ms (int): The currently set timeout in ms.
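The "same for all child cards, otherwise raise" behaviour can be sketched as follows. This is only an illustration of the idea: `require_constant` is a hypothetical stand-in for the library's `check_settings_constant_across_devices` helper, whose real signature and exception type are not shown here, so a plain `ValueError` is assumed.

```python
from typing import List, TypeVar

T = TypeVar("T")


def require_constant(values: List[T], setting_name: str) -> T:
    """Return the shared value of a setting, or raise if the devices disagree (hypothetical helper)."""
    if not values:
        raise ValueError(f"No values were read for {setting_name}.")
    if any(value != values[0] for value in values):
        # Assumption: the real library raises its own exception type here.
        raise ValueError(f"Devices have different {setting_name} settings: {values}")
    return values[0]


print(require_constant([5000, 5000], "timeout_in_ms"))  # 5000
```

Calling `set_timeout_in_ms()` on the hub writes the same value to every child card, which restores a consistent state if the cards have drifted apart.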
    def set_timeout_in_ms(self, timeout_ms: int) -> None:
        """Change the timeout value for all child cards.

        Args:
            timeout_ms (int): The desired timeout setting in ms."""
        for d in self._child_cards:
            d.set_timeout_in_ms(timeout_ms)
Change the timeout value for all child cards.
Arguments:
- timeout_ms (int): The desired timeout setting in ms.
    @property
    def feature_list(self) -> List[Tuple[List[CardFeature], List[AdvancedCardFeature]]]:
        """Get a list of the features of the child cards. See `CardFeature`, `AdvancedCardFeature` and the Spectrum
        documentation for more information.

        Returns:
            features (List[Tuple[List[`CardFeature`], List[`AdvancedCardFeature`]]]): A list of tuples, one per child
                card. Each tuple contains a list of features and a list of advanced features for that card.
        """
        return [card.feature_list[0] for card in self._child_cards]
Get a list of the features of the child cards. See CardFeature, AdvancedCardFeature and the Spectrum documentation for more information.
Returns:
features (List[Tuple[List[CardFeature], List[AdvancedCardFeature]]]): A list of tuples, one per child card. Each tuple contains a list of features and a list of advanced features for that card.
    @property
    def available_io_modes(self) -> AvailableIOModes:
        """For each multipurpose IO line on the master card, read the available modes. See `IOLineMode` and the Spectrum
        Documentation for all possible available modes and their meanings.

        Returns:
            modes (AvailableIOModes): An `AvailableIOModes` dataclass containing the modes available for each IO line.
        """
        return self._master_card.available_io_modes
For each multipurpose IO line on the master card, read the available modes. See IOLineMode and the Spectrum Documentation for all possible available modes and their meanings.
Returns:
modes (AvailableIOModes): An AvailableIOModes dataclass containing the modes available for each IO line.
Inherited Members
- AbstractSpectrumDevice
- reset
- start
- stop
- configure_trigger
- configure_channel_pairing
- write_to_spectrum_device_register
- read_spectrum_device_register
- spectrumdevice.devices.abstract_device.device_interface.SpectrumDeviceInterface
- define_transfer_buffer
- force_trigger
- type
- model_number
77class AbstractSpectrumCard(AbstractSpectrumDevice[AnalogChannelInterfaceType, IOLineInterfaceType], ABC): 78 """Abstract superclass implementing methods common to all individual "card" devices (as opposed to "hub" devices).""" 79 80 def __init__(self, device_number: int, ip_address: Optional[str] = None, **kwargs: Any): 81 """ 82 Args: 83 device_number (int): Index of the card to control. If only one card is present, set to 0. 84 ip_address (Optional[str]): If connecting to a networked card, provide the IP address here as a string. 85 86 """ 87 super().__init__() # required for proper MRO resolution 88 if ip_address is not None: 89 self._visa_string = _create_visa_string_from_ip(ip_address, device_number) 90 else: 91 self._visa_string = f"/dev/spcm{device_number}" 92 self._connect(self._visa_string) 93 self._model_number = ModelNumber(self.read_spectrum_device_register(SPC_PCITYP)) 94 self._trigger_sources: List[TriggerSource] = [] 95 self._analog_channels = self._init_analog_channels() 96 self._io_lines = self._init_io_lines() 97 self._enabled_analog_channels: List[int] = [0] 98 self._transfer_buffer: Optional[TransferBuffer] = None 99 self.apply_channel_enabling() 100 101 @property 102 def model_number(self) -> ModelNumber: 103 return self._model_number 104 105 def reconnect(self) -> None: 106 """Reconnect to the card after disconnect() has been called.""" 107 self._connect(self._visa_string) 108 109 @property 110 def status(self) -> DEVICE_STATUS_TYPE: 111 """Read the current status of the card. 112 Returns: 113 Statuses (`List[List[StatusCode]]`): A length-1 list containing a list of `StatusCode` Enums describing the 114 current acquisition status of the card. See `StatusCode` (and the Spectrum documentation) for the list off 115 possible acquisition statuses. 116 """ 117 return [decode_status(self.read_spectrum_device_register(SPC_M2STATUS))] 118 119 def start_transfer(self) -> None: 120 """Transfer between the on-device buffer and the `TransferBuffer`. 
121 122 Requires that a `TransferBuffer` has been defined (see `define_transfer_buffer()`). 123 124 For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), `start_transfer()` should be called after each 125 acquisition has completed to transfer the acquired waveforms from the device to the `TransferBuffer`. 126 127 For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), `start_transfer()` should be called immediately after 128 `start()` has been called, so that the waveform data can be continuously streamed into the transfer buffer as it 129 is acquired. 130 131 # todo: docstring for AWG transfers 132 """ 133 self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_DATA_STARTDMA) 134 135 def stop_transfer(self) -> None: 136 """Stop the transfer of data between the on-device buffer and the `TransferBuffer`. 137 138 Transfer is usually stopped automatically when an acquisition or stream of acquisitions completes, so this 139 method is rarely required. It may invalidate transferred samples. 140 141 For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), transfer will automatically stop once all acquired 142 samples have been transferred, so `stop_transfer()` should not be used. Instead, call 143 `wait_for_transfer_to_complete()` after `start_transfer()`. 144 145 For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), samples are transferred continuously during acquisition, 146 and transfer will automatically stop when `stop()` is called as there will be no more 147 samples to transfer, so `stop_transfer()` should not be used. 148 149 # todo: docstring for AWG 150 """ 151 self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_DATA_STOPDMA) 152 153 def wait_for_transfer_chunk_to_complete(self) -> None: 154 """Blocks until the currently active transfer between the on-device buffer and the TransferBuffer is 155 complete. This will be when there at least TransferBuffer.notify_size_in_pages pages available in the buffer. 
156 157 For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), use after starting a transfer. Once the method 158 returns, all acquired waveforms have been transferred from the on-device buffer to the `TransferBuffer` and can 159 be read using the `get_waveforms()` method. 160 161 For digitisers in FIFO mode (SPC_REC_FIFO_MULTI) this method is internally used by get_waveforms(). 162 163 # todo: update the above docstring to take into account cases where notify size < data lemgth 164 # todo: docstring for AWG 165 """ 166 self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_DATA_WAITDMA) 167 168 @property 169 def transfer_buffers(self) -> List[TransferBuffer]: 170 """Return the `TransferBuffer` configured for transferring data between the card and the software. 171 172 Returns: 173 buffer (List[`TransferBuffer`]): A length-1 list containing the `TransferBuffer` object. Any data within 174 the `TransferBuffer` can be accessed using its own interface, but the samples are stored as a 1D array, 175 with the samples of each channel interleaved as per the Spectrum user manual. For digitisers, it is more 176 convenient to read waveform data using the `get_waveforms()` method. 
177 """ 178 if self._transfer_buffer is not None: 179 return [self._transfer_buffer] 180 else: 181 raise SpectrumNoTransferBufferDefined("Cannot find TransferBuffer.") 182 183 def disconnect(self) -> None: 184 """Terminate the connection to the card.""" 185 if self.connected: 186 destroy_handle(self._handle) 187 self._connected = False 188 189 @property 190 def connected(self) -> bool: 191 """Returns True if a card is currently connected, False if not.""" 192 return self._connected 193 194 def __eq__(self, other: object) -> bool: 195 if isinstance(other, self.__class__): 196 return self._handle == other._handle 197 else: 198 raise NotImplementedError(f"Cannot compare {self.__class__} with {other.__class__}") 199 200 @property 201 def analog_channels(self) -> Sequence[AnalogChannelInterfaceType]: 202 """A tuple containing the channels that belong to the card. 203 204 Properties of the individual channels can be set by calling the methods of the 205 returned objects directly. See `SpectrumDigitiserChannel` and `SpectrumAWGChannel` for more information. 206 207 Returns: 208 channels (Sequence[`SpectrumChannelInterface`]): A tuple of objects conforming to the 209 `SpectrumChannelInterface` interface. 210 """ 211 return self._analog_channels 212 213 @property 214 def io_lines(self) -> Sequence[IOLineInterfaceType]: 215 """A tuple containing the Multipurpose IO Lines that belong to the card. 216 217 Properties of the individual channels can be set by calling the methods of the 218 returned objects directly. 219 220 Returns: 221 channels (Sequence[`SpectrumIOLineInterface`]): A tuple of objects conforming to the 222 `SpectrumIOLineInterface` interface. 223 """ 224 return self._io_lines 225 226 @property 227 def enabled_analog_channel_nums(self) -> List[int]: 228 """The indices of the currently enabled channels. 229 Returns: 230 enabled_channels (List[int]): The indices of the currently enabled channels. 
231 """ 232 return self._enabled_analog_channels 233 234 def set_enabled_analog_channels(self, channels_nums: List[int]) -> None: 235 """Change which channels are enabled. 236 237 Args: 238 channels_nums (List[int]): The integer channel indices to enable. 239 """ 240 if len(channels_nums) in [1, 2, 4, 8]: 241 self._enabled_analog_channels = channels_nums 242 self.apply_channel_enabling() 243 else: 244 raise SpectrumInvalidNumberOfEnabledChannels(f"{len(channels_nums)} cannot be enabled at once.") 245 246 @property 247 def trigger_sources(self) -> List[TriggerSource]: 248 """The currently enabled trigger sources 249 250 Returns: 251 sources (List[`TriggerSource`]): A list of TriggerSources. 252 """ 253 or_of_sources = self.read_spectrum_device_register(SPC_TRIG_ORMASK) 254 self._trigger_sources = decode_trigger_sources(or_of_sources) 255 return self._trigger_sources 256 257 def set_trigger_sources(self, sources: List[TriggerSource]) -> None: 258 """Change the enabled trigger sources. 259 260 Args: 261 sources (List[`TriggerSource`]): The TriggerSources to enable. 262 """ 263 self._trigger_sources = sources 264 or_of_sources = reduce(or_, [s.value for s in sources]) 265 self.write_to_spectrum_device_register(SPC_TRIG_ORMASK, or_of_sources) 266 self.write_to_spectrum_device_register(SPC_TRIG_ANDMASK, 0) 267 268 @property 269 def external_trigger_mode(self) -> ExternalTriggerMode: 270 """The currently enabled external trigger mode. An external trigger source must be enabled. 
271 272 Returns: 273 sources (`ExternalTriggerMode`): The currently enabled `ExternalTriggerMode`.""" 274 if len(self._active_external_triggers) == 0: 275 raise SpectrumExternalTriggerNotEnabled("Cannot get external trigger mode.") 276 else: 277 first_trig_source = self._active_external_triggers[0] 278 try: 279 return ExternalTriggerMode( 280 self.read_spectrum_device_register(EXTERNAL_TRIGGER_MODE_COMMANDS[first_trig_source.value]) 281 ) 282 except KeyError: 283 raise SpectrumTriggerOperationNotImplemented(f"Cannot get trigger mode of {first_trig_source.name}.") 284 285 def set_external_trigger_mode(self, mode: ExternalTriggerMode) -> None: 286 """Change the currently enabled trigger mode. An external trigger source must be enabled. 287 288 Args: 289 mode (`ExternalTriggerMode`): The `ExternalTriggerMode` to apply. 290 """ 291 if len(self._active_external_triggers) == 0: 292 raise SpectrumExternalTriggerNotEnabled("Cannot set external trigger mode.") 293 else: 294 for trigger_source in self._active_external_triggers: 295 try: 296 self.write_to_spectrum_device_register( 297 EXTERNAL_TRIGGER_MODE_COMMANDS[trigger_source.value], mode.value 298 ) 299 except KeyError: 300 raise SpectrumTriggerOperationNotImplemented(f"Cannot set trigger mode of {trigger_source.name}.") 301 302 @property 303 def _active_external_triggers(self) -> List[TriggerSource]: 304 return [ 305 TriggerSource(val) 306 for val in list( 307 set(EXTERNAL_TRIGGER_MODE_COMMANDS.keys()) & set([source.value for source in self._trigger_sources]) 308 ) 309 ] 310 311 @property 312 def external_trigger_level_in_mv(self) -> int: 313 """The signal level (mV) needed to trigger an event using an external trigger source. An external 314 trigger source must be enabled. 315 316 Returns: 317 level (int): The currently set trigger level in mV. 
318 """ 319 if len(self._active_external_triggers) == 0: 320 raise SpectrumExternalTriggerNotEnabled("Cannot get external trigger level.") 321 else: 322 first_trig_source = self._active_external_triggers[0] 323 try: 324 return self.read_spectrum_device_register(EXTERNAL_TRIGGER_LEVEL_COMMANDS[first_trig_source.value]) 325 except KeyError: 326 raise SpectrumTriggerOperationNotImplemented(f"Cannot get trigger level of {first_trig_source.name}.") 327 328 def set_external_trigger_level_in_mv(self, level: int) -> None: 329 """Change the signal level (mV) needed to trigger an event using an external trigger source. An external 330 trigger source must be enabled. 331 332 Args: 333 level (int): The trigger level to set in mV. 334 """ 335 if len(self._active_external_triggers) == 0: 336 raise SpectrumExternalTriggerNotEnabled("Cannot set external trigger level.") 337 else: 338 for trigger_source in self._active_external_triggers: 339 try: 340 self.write_to_spectrum_device_register(EXTERNAL_TRIGGER_LEVEL_COMMANDS[trigger_source.value], level) 341 except KeyError: 342 raise SpectrumTriggerOperationNotImplemented(f"Cannot set trigger level of {trigger_source.name}.") 343 344 @property 345 def external_trigger_pulse_width_in_samples(self) -> int: 346 """The pulse width (in samples) needed to trigger an event using an external trigger source, if 347 SPC_TM_PW_SMALLER or SPC_TM_PW_GREATER `ExternalTriggerMode` is selected. An external trigger source must be 348 enabled. 349 350 Returns: 351 width (int): The current trigger pulse width in samples. 
352 """ 353 if len(self._active_external_triggers) == 0: 354 raise SpectrumExternalTriggerNotEnabled("Cannot get external trigger pulse width.") 355 else: 356 first_trig_source = self._active_external_triggers[0] 357 try: 358 return self.read_spectrum_device_register( 359 EXTERNAL_TRIGGER_PULSE_WIDTH_COMMANDS[first_trig_source.value] 360 ) 361 except KeyError: 362 raise SpectrumTriggerOperationNotImplemented(f"Cannot get pulse width of {first_trig_source.name}.") 363 364 def set_external_trigger_pulse_width_in_samples(self, width: int) -> None: 365 """Change the pulse width (samples) needed to trigger an event using an external trigger source if 366 SPC_TM_PW_SMALLER or SPC_TM_PW_GREATER `ExternalTriggerMode` is selected. An external trigger source must be 367 enabled. 368 369 Args: 370 width (int): The trigger pulse width to set, in samples.""" 371 if len(self._active_external_triggers) == 0: 372 raise SpectrumExternalTriggerNotEnabled("Cannot set external trigger pulse width.") 373 else: 374 for trigger_source in self._active_external_triggers: 375 try: 376 self.write_to_spectrum_device_register( 377 EXTERNAL_TRIGGER_PULSE_WIDTH_COMMANDS[trigger_source.value], width 378 ) 379 except KeyError: 380 raise SpectrumTriggerOperationNotImplemented(f"Cannot set pulse width of {trigger_source.name}.") 381 382 def apply_channel_enabling(self) -> None: 383 """Apply the enabled channels chosen using set_enable_channels(). 
This happens automatically and does not 384 usually need to be called.""" 385 enabled_channel_spectrum_values = [self.analog_channels[i].name.value for i in self._enabled_analog_channels] 386 if len(enabled_channel_spectrum_values) in [1, 2, 4, 8]: 387 bitwise_or_of_enabled_channels = reduce(or_, enabled_channel_spectrum_values) 388 self.write_to_spectrum_device_register(SPC_CHENABLE, bitwise_or_of_enabled_channels) 389 else: 390 raise SpectrumInvalidNumberOfEnabledChannels( 391 f"Cannot enable {len(enabled_channel_spectrum_values)} " f"channels on one card." 392 ) 393 394 @abstractmethod 395 def _init_analog_channels(self) -> Sequence[AnalogChannelInterfaceType]: 396 raise NotImplementedError() 397 398 @abstractmethod 399 def _init_io_lines(self) -> Sequence[IOLineInterfaceType]: 400 raise NotImplementedError() 401 402 @property 403 def timeout_in_ms(self) -> int: 404 """The time for which the card will wait for a trigger to be received after the device has been started 405 before returning an error. 406 407 Returns: 408 timeout_in_ms (in)t: The currently set timeout in ms. 409 """ 410 return self.read_spectrum_device_register(SPC_TIMEOUT) 411 412 def set_timeout_in_ms(self, timeout_in_ms: int) -> None: 413 """Change the time for which the card will wait for a trigger to tbe received after the device has started 414 before returning an error. 415 416 Args: 417 timeout_in_ms (int): The desired timeout in ms. 418 """ 419 self.write_to_spectrum_device_register(SPC_TIMEOUT, timeout_in_ms) 420 421 @property 422 def clock_mode(self) -> ClockMode: 423 """The currently enabled clock mode. 424 425 Returns: 426 mode (`ClockMode`): The currently set clock mode. 427 """ 428 return ClockMode(self.read_spectrum_device_register(SPC_CLOCKMODE)) 429 430 def set_clock_mode(self, mode: ClockMode) -> None: 431 """Change the clock mode. See `ClockMode` and the Spectrum documentation for available modes. 432 433 Args: 434 mode (`ClockMode`): The desired clock mode. 
435 """ 436 self.write_to_spectrum_device_register(SPC_CLOCKMODE, mode.value) 437 438 @property 439 def available_io_modes(self) -> AvailableIOModes: 440 """For each multipurpose IO line on the card, read the available modes. See IOLineMode and the Spectrum 441 Documentation for all possible available modes and their meanings. 442 443 Returns: 444 modes (`AvailableIOModes`): An `AvailableIOModes` dataclass containing the modes for each IO line.""" 445 return AvailableIOModes( 446 X0=decode_available_io_modes(self.read_spectrum_device_register(SPCM_X0_AVAILMODES)), 447 X1=decode_available_io_modes(self.read_spectrum_device_register(SPCM_X1_AVAILMODES)), 448 X2=decode_available_io_modes(self.read_spectrum_device_register(SPCM_X2_AVAILMODES)), 449 X3=decode_available_io_modes(self.read_spectrum_device_register(SPCM_X3_AVAILMODES)), 450 ) 451 452 @property 453 def feature_list(self) -> List[Tuple[List[CardFeature], List[AdvancedCardFeature]]]: 454 """Get a list of the features of the card. See `CardFeature`, `AdvancedCardFeature` and the Spectrum 455 documentation for more information. 456 457 Returns: 458 features (List[Tuple[List[`CardFeature`], List[`AdvancedCardFeature`]]]): A tuple of two lists - of features 459 and advanced features respectively - wrapped in a list. 460 """ 461 normal_features = decode_card_features(self.read_spectrum_device_register(SPC_PCIFEATURES)) 462 advanced_features = decode_advanced_card_features(self.read_spectrum_device_register(SPC_PCIEXTFEATURES)) 463 return [(normal_features, advanced_features)] 464 465 @property 466 def sample_rate_in_hz(self) -> int: 467 """The rate at which samples will be acquired or generated, in Hz. 468 469 Returns: 470 rate (int): The currently set sample rate in Hz. 471 """ 472 return self.read_spectrum_device_register(SPC_SAMPLERATE, SpectrumRegisterLength.SIXTY_FOUR) 473 474 def set_sample_rate_in_hz(self, rate: int) -> None: 475 """Change the rate at which samples will be acquired or generated, in Hz. 
476 Args: 477 rate (int): The desired sample rate in Hz. 478 """ 479 self.write_to_spectrum_device_register(SPC_SAMPLERATE, rate, SpectrumRegisterLength.SIXTY_FOUR) 480 481 def __str__(self) -> str: 482 return f"Card {self._visa_string} (model {self.model_number.name})." 483 484 @property 485 def type(self) -> CardType: 486 return CardType(self.read_spectrum_device_register(SPC_FNCTYPE)) 487 488 def force_trigger(self) -> None: 489 """Force a trigger event to occur""" 490 self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_FORCETRIGGER) 491 492 @property 493 def bytes_per_sample(self) -> int: 494 return self.read_spectrum_device_register(SPC_MIINST_BYTESPERSAMPLE)
Abstract superclass implementing methods common to all individual "card" devices (as opposed to "hub" devices).
    def __init__(self, device_number: int, ip_address: Optional[str] = None, **kwargs: Any):
        """
        Args:
            device_number (int): Index of the card to control. If only one card is present, set to 0.
            ip_address (Optional[str]): If connecting to a networked card, provide the IP address here as a string.

        """
        super().__init__()  # required for proper MRO resolution
        if ip_address is not None:
            self._visa_string = _create_visa_string_from_ip(ip_address, device_number)
        else:
            self._visa_string = f"/dev/spcm{device_number}"
        self._connect(self._visa_string)
        self._model_number = ModelNumber(self.read_spectrum_device_register(SPC_PCITYP))
        self._trigger_sources: List[TriggerSource] = []
        self._analog_channels = self._init_analog_channels()
        self._io_lines = self._init_io_lines()
        self._enabled_analog_channels: List[int] = [0]
        self._transfer_buffer: Optional[TransferBuffer] = None
        self.apply_channel_enabling()
Arguments:
- device_number (int): Index of the card to control. If only one card is present, set to 0.
- ip_address (Optional[str]): If connecting to a networked card, provide the IP address here as a string.
    def reconnect(self) -> None:
        """Reconnect to the card after disconnect() has been called."""
        self._connect(self._visa_string)
Reconnect to the card after disconnect() has been called.
    @property
    def status(self) -> DEVICE_STATUS_TYPE:
        """Read the current status of the card.

        Returns:
            Statuses (`List[List[StatusCode]]`): A length-1 list containing a list of `StatusCode` Enums describing the
            current acquisition status of the card. See `StatusCode` (and the Spectrum documentation) for the list of
            possible acquisition statuses.
        """
        return [decode_status(self.read_spectrum_device_register(SPC_M2STATUS))]
Read the current status of the card.
Returns:
Statuses (List[List[StatusCode]]): A length-1 list containing a list of StatusCode Enums describing the current acquisition status of the card. See StatusCode (and the Spectrum documentation) for the list of possible acquisition statuses.
    def start_transfer(self) -> None:
        """Start the transfer of data between the on-device buffer and the `TransferBuffer`.

        Requires that a `TransferBuffer` has been defined (see `define_transfer_buffer()`).

        For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), `start_transfer()` should be called after each
        acquisition has completed to transfer the acquired waveforms from the device to the `TransferBuffer`.

        For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), `start_transfer()` should be called immediately after
        `start()` has been called, so that the waveform data can be continuously streamed into the transfer buffer as it
        is acquired.

        # todo: docstring for AWG transfers
        """
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_DATA_STARTDMA)
Start the transfer of data between the on-device buffer and the TransferBuffer.
Requires that a TransferBuffer has been defined (see define_transfer_buffer()).
For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), start_transfer() should be called after each acquisition has completed to transfer the acquired waveforms from the device to the TransferBuffer.
For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), start_transfer() should be called immediately after start() has been called, so that the waveform data can be continuously streamed into the transfer buffer as it is acquired.
todo: docstring for AWG transfers
    def stop_transfer(self) -> None:
        """Stop the transfer of data between the on-device buffer and the `TransferBuffer`.

        Transfer is usually stopped automatically when an acquisition or stream of acquisitions completes, so this
        method is rarely required. It may invalidate transferred samples.

        For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), transfer will automatically stop once all acquired
        samples have been transferred, so `stop_transfer()` should not be used. Instead, call
        `wait_for_transfer_chunk_to_complete()` after `start_transfer()`.

        For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), samples are transferred continuously during acquisition,
        and transfer will automatically stop when `stop()` is called as there will be no more
        samples to transfer, so `stop_transfer()` should not be used.

        # todo: docstring for AWG
        """
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_DATA_STOPDMA)
Stop the transfer of data between the on-device buffer and the TransferBuffer.
Transfer is usually stopped automatically when an acquisition or stream of acquisitions completes, so this method is rarely required. It may invalidate transferred samples.
For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), transfer will automatically stop once all acquired samples have been transferred, so stop_transfer() should not be used. Instead, call wait_for_transfer_chunk_to_complete() after start_transfer().
For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), samples are transferred continuously during acquisition, and transfer will automatically stop when stop() is called as there will be no more samples to transfer, so stop_transfer() should not be used.
todo: docstring for AWG
    def wait_for_transfer_chunk_to_complete(self) -> None:
        """Blocks until the currently active transfer between the on-device buffer and the TransferBuffer is
        complete. This will be when there are at least TransferBuffer.notify_size_in_pages pages available in the
        buffer.

        For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), use after starting a transfer. Once the method
        returns, all acquired waveforms have been transferred from the on-device buffer to the `TransferBuffer` and can
        be read using the `get_waveforms()` method.

        For digitisers in FIFO mode (SPC_REC_FIFO_MULTI) this method is internally used by get_waveforms().

        # todo: update the above docstring to take into account cases where notify size < data length
        # todo: docstring for AWG
        """
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_DATA_WAITDMA)
Blocks until the currently active transfer between the on-device buffer and the TransferBuffer is complete. This will be when there are at least TransferBuffer.notify_size_in_pages pages available in the buffer.
For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), use after starting a transfer. Once the method returns, all acquired waveforms have been transferred from the on-device buffer to the TransferBuffer and can be read using the get_waveforms() method.
For digitisers in FIFO mode (SPC_REC_FIFO_MULTI) this method is internally used by get_waveforms().
todo: update the above docstring to take into account cases where notify size < data length
todo: docstring for AWG
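The call order described for Standard Single mode can be summarised in a short sketch. `RecordingCard` is a hypothetical stand-in that only records which methods were called, so the example runs without drivers or hardware. It is not part of spectrumdevice, and its method signatures are simplified assumptions (the real methods may take arguments).

```python
from typing import List


class RecordingCard:
    """Hypothetical stand-in for a digitiser card. It only records the order of calls."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def define_transfer_buffer(self) -> None:
        self.calls.append("define_transfer_buffer")

    def start(self) -> None:
        self.calls.append("start")

    def start_transfer(self) -> None:
        self.calls.append("start_transfer")

    def wait_for_transfer_chunk_to_complete(self) -> None:
        self.calls.append("wait_for_transfer_chunk_to_complete")

    def get_waveforms(self) -> List[List[float]]:
        self.calls.append("get_waveforms")
        return [[0.0, 0.0]]  # placeholder data: one channel, two samples


def acquire_standard_single(card: RecordingCard) -> List[List[float]]:
    """Run one Standard Single acquisition in the order the docstrings describe."""
    card.define_transfer_buffer()  # a TransferBuffer must exist before any transfer
    card.start()
    # A real program would wait here until the acquisition has completed.
    card.start_transfer()
    card.wait_for_transfer_chunk_to_complete()  # blocks until the samples have arrived
    return card.get_waveforms()


card = RecordingCard()
waveforms = acquire_standard_single(card)
print(card.calls)
```

In FIFO mode the order differs: `start_transfer()` follows `start()` immediately, and `get_waveforms()` performs the waiting internally.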
    @property
    def transfer_buffers(self) -> List[TransferBuffer]:
        """Return the `TransferBuffer` configured for transferring data between the card and the software.

        Returns:
            buffer (List[`TransferBuffer`]): A length-1 list containing the `TransferBuffer` object. Any data within
                the `TransferBuffer` can be accessed using its own interface, but the samples are stored as a 1D array,
                with the samples of each channel interleaved as per the Spectrum user manual. For digitisers, it is more
                convenient to read waveform data using the `get_waveforms()` method.
        """
        if self._transfer_buffer is not None:
            return [self._transfer_buffer]
        else:
            raise SpectrumNoTransferBufferDefined("Cannot find TransferBuffer.")
Return the TransferBuffer configured for transferring data between the card and the software.
Returns:
buffer (List[TransferBuffer]): A length-1 list containing the TransferBuffer object. Any data within the TransferBuffer can be accessed using its own interface, but the samples are stored as a 1D array, with the samples of each channel interleaved as per the Spectrum user manual. For digitisers, it is more convenient to read waveform data using the get_waveforms() method.
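To show what "interleaved" means for a 1D sample array, the sketch below separates a flat list into one list per channel. It assumes sample-by-sample interleaving in order of enabled channel (ch0, ch1, ch0, ch1, ...); check the Spectrum user manual for the layout your card actually uses. `deinterleave` is a hypothetical helper and operates on a plain Python sequence, not on a real `TransferBuffer`.

```python
from typing import List, Sequence


def deinterleave(samples: Sequence[int], n_channels: int) -> List[List[int]]:
    """Split a flat, channel-interleaved sample sequence into one list per channel."""
    if n_channels < 1:
        raise ValueError("n_channels must be at least 1.")
    if len(samples) % n_channels != 0:
        raise ValueError("The number of samples must be a multiple of n_channels.")
    # Channel k owns every n_channels-th sample, starting at position k.
    return [list(samples[channel::n_channels]) for channel in range(n_channels)]


# Two enabled channels, three samples each.
print(deinterleave([10, 20, 11, 21, 12, 22], 2))  # [[10, 11, 12], [20, 21, 22]]
```

For digitisers, `get_waveforms()` remains the more convenient route, as noted above.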
    def disconnect(self) -> None:
        """Terminate the connection to the card."""
        if self.connected:
            destroy_handle(self._handle)
            self._connected = False
Terminate the connection to the card.
    @property
    def connected(self) -> bool:
        """Returns True if a card is currently connected, False if not."""
        return self._connected
Returns True if a card is currently connected, False if not.
    @property
    def analog_channels(self) -> Sequence[AnalogChannelInterfaceType]:
        """A tuple containing the channels that belong to the card.

        Properties of the individual channels can be set by calling the methods of the
        returned objects directly. See `SpectrumDigitiserChannel` and `SpectrumAWGChannel` for more information.

        Returns:
            channels (Sequence[`SpectrumChannelInterface`]): A tuple of objects conforming to the
            `SpectrumChannelInterface` interface.
        """
        return self._analog_channels
A tuple containing the channels that belong to the card.
Properties of the individual channels can be set by calling the methods of the
returned objects directly. See SpectrumDigitiserChannel
and SpectrumAWGChannel
for more information.
Returns:
channels (Sequence[
SpectrumChannelInterface
]): A tuple of objects conforming to theSpectrumChannelInterface
interface.
@property
def io_lines(self) -> Sequence[IOLineInterfaceType]:
    """A tuple containing the Multipurpose IO Lines that belong to the card.

    Properties of the individual IO lines can be set by calling the methods of the
    returned objects directly.

    Returns:
        channels (Sequence[`SpectrumIOLineInterface`]): A tuple of objects conforming to the
            `SpectrumIOLineInterface` interface.
    """
    return self._io_lines
A tuple containing the Multipurpose IO Lines that belong to the card.
Properties of the individual IO lines can be set by calling the methods of the returned objects directly.
Returns:
channels (Sequence[SpectrumIOLineInterface]): A tuple of objects conforming to the SpectrumIOLineInterface interface.
@property
def enabled_analog_channel_nums(self) -> List[int]:
    """The indices of the currently enabled channels.
    Returns:
        enabled_channels (List[int]): The indices of the currently enabled channels.
    """
    return self._enabled_analog_channels
The indices of the currently enabled channels.
Returns:
enabled_channels (List[int]): The indices of the currently enabled channels.
def set_enabled_analog_channels(self, channels_nums: List[int]) -> None:
    """Change which channels are enabled.

    Args:
        channels_nums (List[int]): The integer channel indices to enable.
    """
    if len(channels_nums) in [1, 2, 4, 8]:
        self._enabled_analog_channels = channels_nums
        self.apply_channel_enabling()
    else:
        raise SpectrumInvalidNumberOfEnabledChannels(f"{len(channels_nums)} cannot be enabled at once.")
Change which channels are enabled.
Arguments:
- channels_nums (List[int]): The integer channel indices to enable.
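The source for this method accepts only 1, 2, 4 or 8 enabled channels and raises SpectrumInvalidNumberOfEnabledChannels for any other count. A minimal, self-contained sketch of that check follows. It uses a stand-in exception class and a hypothetical helper, `validate_enabled_channels`, rather than the library itself, so it runs without drivers or hardware.

```python
from typing import List

VALID_CHANNEL_COUNTS = (1, 2, 4, 8)  # the counts accepted by the method's source


class InvalidNumberOfEnabledChannels(Exception):
    """Stand-in for spectrumdevice's SpectrumInvalidNumberOfEnabledChannels."""


def validate_enabled_channels(channel_nums: List[int]) -> List[int]:
    """Return channel_nums unchanged if its length is an allowed channel count."""
    if len(channel_nums) not in VALID_CHANNEL_COUNTS:
        raise InvalidNumberOfEnabledChannels(f"{len(channel_nums)} cannot be enabled at once.")
    return channel_nums


print(validate_enabled_channels([0, 1]))  # [0, 1]
try:
    validate_enabled_channels([0, 1, 2])
except InvalidNumberOfEnabledChannels as error:
    print(error)  # 3 cannot be enabled at once.
```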
@property
def trigger_sources(self) -> List[TriggerSource]:
    """The currently enabled trigger sources.

    Returns:
        sources (List[`TriggerSource`]): A list of TriggerSources.
    """
    or_of_sources = self.read_spectrum_device_register(SPC_TRIG_ORMASK)
    self._trigger_sources = decode_trigger_sources(or_of_sources)
    return self._trigger_sources
The currently enabled trigger sources.
Returns:
sources (List[TriggerSource]): A list of TriggerSources.
def set_trigger_sources(self, sources: List[TriggerSource]) -> None:
    """Change the enabled trigger sources.

    Args:
        sources (List[`TriggerSource`]): The TriggerSources to enable.
    """
    self._trigger_sources = sources
    or_of_sources = reduce(or_, [s.value for s in sources])
    self.write_to_spectrum_device_register(SPC_TRIG_ORMASK, or_of_sources)
    self.write_to_spectrum_device_register(SPC_TRIG_ANDMASK, 0)
Change the enabled trigger sources.
Arguments:
- sources (List[TriggerSource]): The TriggerSources to enable.
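The setter combines the selected sources into a single OR mask with `reduce(or_, ...)`, and the getter decodes such a mask back into a list. The sketch below illustrates that round trip with a placeholder enum. The flag values are invented for the example and are not the real Spectrum constants, and `from_or_mask` is only an assumption about what a decoder could look like, not the library's `decode_trigger_sources`.

```python
from enum import Enum
from functools import reduce
from operator import or_
from typing import List


class FakeTriggerSource(Enum):
    """Placeholder bit flags; the real TriggerSource values come from the Spectrum register header."""

    SOFTWARE = 0x1
    EXT0 = 0x2
    EXT1 = 0x4


def to_or_mask(sources: List[FakeTriggerSource]) -> int:
    """Combine the selected sources into a single bit mask (0 if the list is empty)."""
    return reduce(or_, (s.value for s in sources), 0)


def from_or_mask(mask: int) -> List[FakeTriggerSource]:
    """Recover the sources whose bit is set in the mask."""
    return [s for s in FakeTriggerSource if mask & s.value]


mask = to_or_mask([FakeTriggerSource.SOFTWARE, FakeTriggerSource.EXT1])
print(hex(mask))  # 0x5
print(from_or_mask(mask))
```

Note that the library source calls `reduce` without an initial value, so passing it an empty list would raise a TypeError; the sketch passes 0 as the initial value purely to keep the example total.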
@property
def external_trigger_mode(self) -> ExternalTriggerMode:
    """The currently enabled external trigger mode. An external trigger source must be enabled.

    Returns:
        mode (`ExternalTriggerMode`): The currently enabled `ExternalTriggerMode`."""
    if len(self._active_external_triggers) == 0:
        raise SpectrumExternalTriggerNotEnabled("Cannot get external trigger mode.")
    else:
        first_trig_source = self._active_external_triggers[0]
        try:
            return ExternalTriggerMode(
                self.read_spectrum_device_register(EXTERNAL_TRIGGER_MODE_COMMANDS[first_trig_source.value])
            )
        except KeyError:
            raise SpectrumTriggerOperationNotImplemented(f"Cannot get trigger mode of {first_trig_source.name}.")
The currently enabled external trigger mode. An external trigger source must be enabled.
Returns:
mode (ExternalTriggerMode): The currently enabled ExternalTriggerMode.
def set_external_trigger_mode(self, mode: ExternalTriggerMode) -> None:
    """Change the currently enabled trigger mode. An external trigger source must be enabled.

    Args:
        mode (`ExternalTriggerMode`): The `ExternalTriggerMode` to apply.
    """
    if len(self._active_external_triggers) == 0:
        raise SpectrumExternalTriggerNotEnabled("Cannot set external trigger mode.")
    else:
        for trigger_source in self._active_external_triggers:
            try:
                self.write_to_spectrum_device_register(
                    EXTERNAL_TRIGGER_MODE_COMMANDS[trigger_source.value], mode.value
                )
            except KeyError:
                raise SpectrumTriggerOperationNotImplemented(f"Cannot set trigger mode of {trigger_source.name}.")
Change the currently enabled trigger mode. An external trigger source must be enabled.
Arguments:
- mode (ExternalTriggerMode): The ExternalTriggerMode to apply.
@property
def external_trigger_level_in_mv(self) -> int:
    """The signal level (mV) needed to trigger an event using an external trigger source. An external
    trigger source must be enabled.

    Returns:
        level (int): The currently set trigger level in mV.
    """
    if len(self._active_external_triggers) == 0:
        raise SpectrumExternalTriggerNotEnabled("Cannot get external trigger level.")
    else:
        first_trig_source = self._active_external_triggers[0]
        try:
            return self.read_spectrum_device_register(EXTERNAL_TRIGGER_LEVEL_COMMANDS[first_trig_source.value])
        except KeyError:
            raise SpectrumTriggerOperationNotImplemented(f"Cannot get trigger level of {first_trig_source.name}.")
The signal level (mV) needed to trigger an event using an external trigger source. An external trigger source must be enabled.
Returns:
level (int): The currently set trigger level in mV.
def set_external_trigger_level_in_mv(self, level: int) -> None:
    """Change the signal level (mV) needed to trigger an event using an external trigger source. An external
    trigger source must be enabled.

    Args:
        level (int): The trigger level to set in mV.
    """
    if len(self._active_external_triggers) == 0:
        raise SpectrumExternalTriggerNotEnabled("Cannot set external trigger level.")
    else:
        for trigger_source in self._active_external_triggers:
            try:
                self.write_to_spectrum_device_register(EXTERNAL_TRIGGER_LEVEL_COMMANDS[trigger_source.value], level)
            except KeyError:
                raise SpectrumTriggerOperationNotImplemented(f"Cannot set trigger level of {trigger_source.name}.")
Change the signal level (mV) needed to trigger an event using an external trigger source. An external trigger source must be enabled.
Arguments:
- level (int): The trigger level to set in mV.
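The external-trigger setters all follow the same pattern: fail if no external trigger source is active, then look up a per-source register for each active source and write the value, reporting sources that have no such register. The sketch below reproduces that control flow with stand-in exceptions and a plain dict in place of the hardware. The source names and register numbers are invented for illustration.

```python
from typing import Dict, List


class ExternalTriggerNotEnabled(Exception):
    """Stand-in for SpectrumExternalTriggerNotEnabled."""


class TriggerOperationNotImplemented(Exception):
    """Stand-in for SpectrumTriggerOperationNotImplemented."""


# Hypothetical mapping of trigger-source name to level-register id; the numbers are made up.
LEVEL_REGISTERS: Dict[str, int] = {"EXT0": 100, "EXT1": 101}


def set_level_on_active_triggers(active: List[str], level_mv: int, registers: Dict[int, int]) -> None:
    """Write level_mv to the level register of every active external trigger source."""
    if not active:
        raise ExternalTriggerNotEnabled("Cannot set external trigger level.")
    for source in active:
        try:
            register = LEVEL_REGISTERS[source]
        except KeyError:
            raise TriggerOperationNotImplemented(f"Cannot set trigger level of {source}.")
        registers[register] = level_mv  # stands in for write_to_spectrum_device_register()


fake_registers: Dict[int, int] = {}
set_level_on_active_triggers(["EXT0", "EXT1"], 1500, fake_registers)
print(fake_registers)  # {100: 1500, 101: 1500}
```

The corresponding getters read only the first active source, so they implicitly assume that all active external sources share the same setting.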
@property
def external_trigger_pulse_width_in_samples(self) -> int:
    """The pulse width (in samples) needed to trigger an event using an external trigger source, if
    SPC_TM_PW_SMALLER or SPC_TM_PW_GREATER `ExternalTriggerMode` is selected. An external trigger source must be
    enabled.

    Returns:
        width (int): The current trigger pulse width in samples.
    """
    if len(self._active_external_triggers) == 0:
        raise SpectrumExternalTriggerNotEnabled("Cannot get external trigger pulse width.")
    else:
        first_trig_source = self._active_external_triggers[0]
        try:
            return self.read_spectrum_device_register(
                EXTERNAL_TRIGGER_PULSE_WIDTH_COMMANDS[first_trig_source.value]
            )
        except KeyError:
            raise SpectrumTriggerOperationNotImplemented(f"Cannot get pulse width of {first_trig_source.name}.")
The pulse width (in samples) needed to trigger an event using an external trigger source, if SPC_TM_PW_SMALLER or SPC_TM_PW_GREATER ExternalTriggerMode is selected. An external trigger source must be enabled.
Returns:
width (int): The current trigger pulse width in samples.
def set_external_trigger_pulse_width_in_samples(self, width: int) -> None:
    """Change the pulse width (samples) needed to trigger an event using an external trigger source if
    SPC_TM_PW_SMALLER or SPC_TM_PW_GREATER `ExternalTriggerMode` is selected. An external trigger source must be
    enabled.

    Args:
        width (int): The trigger pulse width to set, in samples."""
    if len(self._active_external_triggers) == 0:
        raise SpectrumExternalTriggerNotEnabled("Cannot set external trigger pulse width.")
    else:
        for trigger_source in self._active_external_triggers:
            try:
                self.write_to_spectrum_device_register(
                    EXTERNAL_TRIGGER_PULSE_WIDTH_COMMANDS[trigger_source.value], width
                )
            except KeyError:
                raise SpectrumTriggerOperationNotImplemented(f"Cannot set pulse width of {trigger_source.name}.")
Change the pulse width (samples) needed to trigger an event using an external trigger source if SPC_TM_PW_SMALLER or SPC_TM_PW_GREATER ExternalTriggerMode is selected. An external trigger source must be enabled.
Arguments:
- width (int): The trigger pulse width to set, in samples.
def apply_channel_enabling(self) -> None:
    """Apply the enabled channels chosen using set_enabled_analog_channels(). This happens automatically and does not
    usually need to be called."""
    enabled_channel_spectrum_values = [self.analog_channels[i].name.value for i in self._enabled_analog_channels]
    if len(enabled_channel_spectrum_values) in [1, 2, 4, 8]:
        bitwise_or_of_enabled_channels = reduce(or_, enabled_channel_spectrum_values)
        self.write_to_spectrum_device_register(SPC_CHENABLE, bitwise_or_of_enabled_channels)
    else:
        raise SpectrumInvalidNumberOfEnabledChannels(
            f"Cannot enable {len(enabled_channel_spectrum_values)} " f"channels on one card."
        )
Apply the enabled channels chosen using set_enabled_analog_channels(). This happens automatically and does not usually need to be called.
@property
def timeout_in_ms(self) -> int:
    """The time for which the card will wait for a trigger to be received after the device has been started
    before returning an error.

    Returns:
        timeout_in_ms (int): The currently set timeout in ms.
    """
    return self.read_spectrum_device_register(SPC_TIMEOUT)
The time for which the card will wait for a trigger to be received after the device has been started before returning an error.
Returns:
timeout_in_ms (int): The currently set timeout in ms.
def set_timeout_in_ms(self, timeout_in_ms: int) -> None:
    """Change the time for which the card will wait for a trigger to be received after the device has started
    before returning an error.

    Args:
        timeout_in_ms (int): The desired timeout in ms.
    """
    self.write_to_spectrum_device_register(SPC_TIMEOUT, timeout_in_ms)
Change the time for which the card will wait for a trigger to be received after the device has started before returning an error.
Arguments:
- timeout_in_ms (int): The desired timeout in ms.
@property
def clock_mode(self) -> ClockMode:
    """The currently enabled clock mode.

    Returns:
        mode (`ClockMode`): The currently set clock mode.
    """
    return ClockMode(self.read_spectrum_device_register(SPC_CLOCKMODE))
The currently enabled clock mode.
Returns:
mode (ClockMode): The currently set clock mode.
def set_clock_mode(self, mode: ClockMode) -> None:
    """Change the clock mode. See `ClockMode` and the Spectrum documentation for available modes.

    Args:
        mode (`ClockMode`): The desired clock mode.
    """
    self.write_to_spectrum_device_register(SPC_CLOCKMODE, mode.value)
Change the clock mode. See ClockMode and the Spectrum documentation for available modes.
Arguments:
- mode (ClockMode): The desired clock mode.
@property
def available_io_modes(self) -> AvailableIOModes:
    """For each multipurpose IO line on the card, read the available modes. See IOLineMode and the Spectrum
    Documentation for all possible available modes and their meanings.

    Returns:
        modes (`AvailableIOModes`): An `AvailableIOModes` dataclass containing the modes for each IO line."""
    return AvailableIOModes(
        X0=decode_available_io_modes(self.read_spectrum_device_register(SPCM_X0_AVAILMODES)),
        X1=decode_available_io_modes(self.read_spectrum_device_register(SPCM_X1_AVAILMODES)),
        X2=decode_available_io_modes(self.read_spectrum_device_register(SPCM_X2_AVAILMODES)),
        X3=decode_available_io_modes(self.read_spectrum_device_register(SPCM_X3_AVAILMODES)),
    )
For each multipurpose IO line on the card, read the available modes. See IOLineMode and the Spectrum Documentation for all possible available modes and their meanings.
Returns:
modes (AvailableIOModes): An AvailableIOModes dataclass containing the modes for each IO line.
@property
def feature_list(self) -> List[Tuple[List[CardFeature], List[AdvancedCardFeature]]]:
    """Get a list of the features of the card. See `CardFeature`, `AdvancedCardFeature` and the Spectrum
    documentation for more information.

    Returns:
        features (List[Tuple[List[`CardFeature`], List[`AdvancedCardFeature`]]]): A tuple of two lists - of features
            and advanced features respectively - wrapped in a list.
    """
    normal_features = decode_card_features(self.read_spectrum_device_register(SPC_PCIFEATURES))
    advanced_features = decode_advanced_card_features(self.read_spectrum_device_register(SPC_PCIEXTFEATURES))
    return [(normal_features, advanced_features)]
Get a list of the features of the card. See CardFeature, AdvancedCardFeature and the Spectrum documentation for more information.
Returns:
features (List[Tuple[List[CardFeature], List[AdvancedCardFeature]]]): A tuple of two lists - of features and advanced features respectively - wrapped in a list.
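The nested return type is easy to misread, so here is a short sketch of unpacking it. Placeholder strings stand in for CardFeature and AdvancedCardFeature members, and the literal below is invented for the example. The remark that a hub might return one tuple per card is an assumption suggested by the list wrapper, not something stated in the docstring.

```python
from typing import List, Tuple

# Placeholder strings stand in for CardFeature and AdvancedCardFeature members.
feature_list: List[Tuple[List[str], List[str]]] = [(["MULTI", "GATE"], ["ADVANCED_A"])]

# A single card yields a one-element list, so take the first (and only) tuple.
normal_features, advanced_features = feature_list[0]
print(normal_features)    # ['MULTI', 'GATE']
print(advanced_features)  # ['ADVANCED_A']

# Iterating also works, e.g. if the list were to hold one tuple per card (an assumption).
for card_index, (normal, advanced) in enumerate(feature_list):
    print(card_index, len(normal), len(advanced))  # 0 2 1
```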
@property
def sample_rate_in_hz(self) -> int:
    """The rate at which samples will be acquired or generated, in Hz.

    Returns:
        rate (int): The currently set sample rate in Hz.
    """
    return self.read_spectrum_device_register(SPC_SAMPLERATE, SpectrumRegisterLength.SIXTY_FOUR)
The rate at which samples will be acquired or generated, in Hz.
Returns:
rate (int): The currently set sample rate in Hz.
def set_sample_rate_in_hz(self, rate: int) -> None:
    """Change the rate at which samples will be acquired or generated, in Hz.
    Args:
        rate (int): The desired sample rate in Hz.
    """
    self.write_to_spectrum_device_register(SPC_SAMPLERATE, rate, SpectrumRegisterLength.SIXTY_FOUR)
Change the rate at which samples will be acquired or generated, in Hz.
Arguments:
- rate (int): The desired sample rate in Hz.
def force_trigger(self) -> None:
    """Force a trigger event to occur."""
    self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_FORCETRIGGER)
Force a trigger event to occur.
Inherited Members
- AbstractSpectrumDevice
- reset
- start
- stop
- configure_trigger
- configure_channel_pairing
- write_to_spectrum_device_register
- read_spectrum_device_register
- spectrumdevice.devices.abstract_device.device_interface.SpectrumDeviceInterface
- define_transfer_buffer
class AbstractSpectrumDevice(SpectrumDeviceInterface[AnalogChannelInterfaceType, IOLineInterfaceType], ABC):
    """Abstract superclass which implements methods common to all Spectrum devices. Instances of this class
    cannot be constructed directly. Instead, construct instances of the concrete classes listed in
    spectrumdevice/__init__.py, which inherit the methods defined here. Note that the concrete mock devices override
    several of the methods defined here."""

    def _connect(self, visa_string: str) -> None:
        self._handle = spectrum_handle_factory(visa_string)
        self._connected = True

    def reset(self) -> None:
        """Perform a software and hardware reset.

        All settings are set to hardware default values. The data in the board’s on-board memory will no longer be
        valid. Any output signals (including triggers and clocks) will be disabled."""
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_RESET)

    def start(self) -> None:
        """Start the device.

        For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), this will need to be called once for each
        acquisition. In-between calls, waveforms must be manually transferred from the device to a `TransferBuffer`
        using `start_transfer()`. The `TransferBuffer` need not be defined until after `start` is called.

        For digitisers in Multi FIFO mode (SPC_REC_FIFO_MULTI), it needs to be called only once, immediately followed by
        a call to `start_transfer()`. Frames will then be continuously streamed to the `TransferBuffer`, which must have
        already been defined.

        # todo: docstring for different AWG modes
        """
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_START | M2CMD_CARD_ENABLETRIGGER)

    def stop(self) -> None:
        """Stop the device.

        For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), this stops the continuous acquisition of waveform data that
        occurs after calling `start()`. Does not need to be called in Standard Single mode (SPC_REC_STD_SINGLE).

        # todo: docstring for AWG
        """
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_STOP)

    def configure_trigger(self, settings: TriggerSettings) -> None:
        """Apply all the trigger settings contained in a `TriggerSettings` dataclass to the device.

        Args:
            settings (`TriggerSettings`): A `TriggerSettings` dataclass containing the setting values to apply."""
        self.set_trigger_sources(settings.trigger_sources)
        if len(set(self.trigger_sources) & set(EXTERNAL_TRIGGER_SOURCES)) > 0:
            if settings.external_trigger_mode is not None:
                self.set_external_trigger_mode(settings.external_trigger_mode)
            if settings.external_trigger_level_in_mv is not None:
                self.set_external_trigger_level_in_mv(settings.external_trigger_level_in_mv)
            if settings.external_trigger_pulse_width_in_samples is not None:
                self.set_external_trigger_pulse_width_in_samples(settings.external_trigger_pulse_width_in_samples)

        # Write the configuration to the card
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP)

    def configure_channel_pairing(self, channel_pair: ChannelPair, mode: ChannelPairingMode) -> None:
        """Configures a pair of consecutive channels to operate either independently, in differential mode or
        in double mode. If enabling differential or double mode, then the odd-numbered channel will be automatically
        configured to be identical to the even-numbered channel, and the odd-numbered channel will be disabled as is
        required by the Spectrum API.

        Args:
            channel_pair (ChannelPair): The pair of channels to configure
            mode (ChannelPairingMode): The mode to enable: SINGLE, DOUBLE, or DIFFERENTIAL
        """

        doubling_enabled = int(mode == ChannelPairingMode.DOUBLE)
        differential_mode_enabled = int(mode == ChannelPairingMode.DIFFERENTIAL)

        if doubling_enabled and channel_pair in (channel_pair.CHANNEL_4_AND_5, channel_pair.CHANNEL_6_AND_7):
            raise ValueError("Doubling can only be enabled for channel pairs CHANNEL_0_AND_1 or CHANNEL_2_AND_3.")

        if doubling_enabled or differential_mode_enabled:
            self._mirror_even_channel_settings_on_odd_channel(channel_pair)
            self._disable_odd_channel(channel_pair)

        self.write_to_spectrum_device_register(
            DIFFERENTIAL_CHANNEL_PAIR_COMMANDS[channel_pair], differential_mode_enabled
        )
        self.write_to_spectrum_device_register(DOUBLING_CHANNEL_PAIR_COMMANDS[channel_pair], doubling_enabled)

    def _disable_odd_channel(self, channel_pair: ChannelPair) -> None:
        try:
            enabled_channels = copy(self.enabled_analog_channel_nums)
            enabled_channels.remove(channel_pair.value + 1)
            self.set_enabled_analog_channels(enabled_channels)
        except ValueError:
            pass  # odd numbered channel was not enabled, so no need to disable it.

    def _mirror_even_channel_settings_on_odd_channel(self, channel_pair: ChannelPair) -> None:
        self.analog_channels[channel_pair.value + 1].copy_settings_from_other_channel(
            self.analog_channels[channel_pair.value]
        )

    def write_to_spectrum_device_register(
        self,
        spectrum_register: int,
        value: int,
        length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO,
    ) -> None:
        """Set the value of a register on the Spectrum device.

        This method is used internally by `AbstractSpectrumDigitiser` and its subclasses to configure a hardware
        device, but can also be used to set the value of registers that are not implemented in
        `AbstractSpectrumDigitiser` and its subclasses.

        Args:
            spectrum_register (int): Identifier of the register to set. This should be a global constant imported from
                regs.py in the spectrum_gmbh package.
            value (int): Value to write to the register. This should be a global constant imported from
                regs.py in the spectrum_gmbh package.
            length (`SpectrumRegisterLength`): A `SpectrumRegisterLength` object specifying the length of the register
                to set, in bits.
        """
        if not SPECTRUM_DRIVERS_FOUND:
            raise SpectrumDriversNotFound(
                "Cannot communicate with hardware. For testing on a system without drivers or connected hardware, use"
                " MockSpectrumDigitiserCard instead."
            )
        if self.connected:
            if length == SpectrumRegisterLength.THIRTY_TWO:
                set_spectrum_i32_api_param(self._handle, spectrum_register, value)
            elif length == SpectrumRegisterLength.SIXTY_FOUR:
                set_spectrum_i64_api_param(self._handle, spectrum_register, value)
            else:
                raise ValueError("Spectrum integer length not recognised.")
        else:
            raise SpectrumDeviceNotConnected("The device has been disconnected.")

    def read_spectrum_device_register(
        self,
        spectrum_register: int,
        length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO,
    ) -> int:
        """Get the value of a register on the Spectrum device.

        This method is used internally by `AbstractSpectrumDigitiser` and its subclasses to read the configuration of a
        hardware device, but can also be used to get the value of registers that are not implemented in
        `AbstractSpectrumDigitiser` and its subclasses.

        Args:
            spectrum_register (int): Identifier of the register to read. This should be a global constant imported from
                spectrum_gmbh.py_header.regs.
            length (`SpectrumRegisterLength`): A `SpectrumRegisterLength` object specifying the length of the register
                to read, in bits.

        Returns:
            value (int): Value of the register. This can be matched to a global constant imported from
                spectrum_gmbh.py_header.regs, usually using one of the Enums defined in the settings module.
        """
        if not SPECTRUM_DRIVERS_FOUND:
            raise SpectrumDriversNotFound(
                "Cannot communicate with hardware. For testing on a system without drivers or connected hardware, use"
                " a mock device instead (e.g. MockSpectrumDigitiserCard or MockSpectrumStarHub)."
            )
        if self.connected:
            if length == SpectrumRegisterLength.THIRTY_TWO:
                return get_spectrum_i32_api_param(self._handle, spectrum_register)
            elif length == SpectrumRegisterLength.SIXTY_FOUR:
                return get_spectrum_i64_api_param(self._handle, spectrum_register)
            else:
                raise ValueError("Spectrum integer length not recognised.")
        else:
            raise SpectrumDeviceNotConnected("The device has been disconnected.")

    def __repr__(self) -> str:
        return str(self)
Abstract superclass which implements methods common to all Spectrum devices. Instances of this class cannot be constructed directly. Instead, construct instances of the concrete classes listed in spectrumdevice/__init__.py, which inherit the methods defined here. Note that the concrete mock devices override several of the methods defined here.
def reset(self) -> None:
    """Perform a software and hardware reset.

    All settings are set to hardware default values. The data in the board’s on-board memory will no longer be
    valid. Any output signals (including triggers and clocks) will be disabled."""
    self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_RESET)
Perform a software and hardware reset.
All settings are set to hardware default values. The data in the board’s on-board memory will no longer be valid. Any output signals (including triggers and clocks) will be disabled.
def start(self) -> None:
    """Start the device.

    For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), this will need to be called once for each
    acquisition. In-between calls, waveforms must be manually transferred from the device to a `TransferBuffer`
    using `start_transfer()`. The `TransferBuffer` need not be defined until after `start` is called.

    For digitisers in Multi FIFO mode (SPC_REC_FIFO_MULTI), it needs to be called only once, immediately followed by
    a call to `start_transfer()`. Frames will then be continuously streamed to the `TransferBuffer`, which must have
    already been defined.

    # todo: docstring for different AWG modes
    """
    self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_START | M2CMD_CARD_ENABLETRIGGER)
Start the device.
For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), this will need to be called once for each acquisition. In-between calls, waveforms must be manually transferred from the device to a TransferBuffer using start_transfer(). The TransferBuffer need not be defined until after start is called.
For digitisers in Multi FIFO mode (SPC_REC_FIFO_MULTI), it needs to be called only once, immediately followed by a call to start_transfer(). Frames will then be continuously streamed to the TransferBuffer, which must have already been defined.
todo: docstring for different AWG modes
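The two digitiser modes described for start() differ mainly in call order. The sketch below makes that order explicit using a hypothetical `RecordingDevice` stand-in that only records method names; it does not talk to hardware. Waiting for an acquisition or transfer to finish, and the real arguments of define_transfer_buffer(), are deliberately left out.

```python
from typing import List


class RecordingDevice:
    """Hypothetical stand-in that records the order of calls instead of touching hardware."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def define_transfer_buffer(self) -> None:
        self.calls.append("define_transfer_buffer")

    def start(self) -> None:
        self.calls.append("start")

    def start_transfer(self) -> None:
        self.calls.append("start_transfer")

    def stop(self) -> None:
        self.calls.append("stop")


def acquire_standard_single(device: RecordingDevice, num_acquisitions: int) -> None:
    """Standard Single: start() once per acquisition, then transfer the waveform."""
    # Defined up front here; in this mode it could equally be defined after the first start().
    device.define_transfer_buffer()
    for _ in range(num_acquisitions):
        device.start()
        device.start_transfer()


def acquire_multi_fifo(device: RecordingDevice) -> None:
    """Multi FIFO: buffer first, then a single start() immediately followed by start_transfer()."""
    device.define_transfer_buffer()
    device.start()
    device.start_transfer()
    # ... frames stream continuously until stop() is called ...
    device.stop()


single = RecordingDevice()
acquire_standard_single(single, num_acquisitions=2)
print(single.calls)

fifo = RecordingDevice()
acquire_multi_fifo(fifo)
print(fifo.calls)
```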
def stop(self) -> None:
    """Stop the device.

    For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), this stops the continuous acquisition of waveform data that
    occurs after calling `start()`. Does not need to be called in Standard Single mode (SPC_REC_STD_SINGLE).

    # todo: docstring for AWG
    """
    self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_STOP)
Stop the device.
For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), this stops the continuous acquisition of waveform data that occurs after calling start(). Does not need to be called in Standard Single mode (SPC_REC_STD_SINGLE).
todo: docstring for AWG
def configure_trigger(self, settings: TriggerSettings) -> None:
    """Apply all the trigger settings contained in a `TriggerSettings` dataclass to the device.

    Args:
        settings (`TriggerSettings`): A `TriggerSettings` dataclass containing the setting values to apply."""
    self.set_trigger_sources(settings.trigger_sources)
    if len(set(self.trigger_sources) & set(EXTERNAL_TRIGGER_SOURCES)) > 0:
        if settings.external_trigger_mode is not None:
            self.set_external_trigger_mode(settings.external_trigger_mode)
        if settings.external_trigger_level_in_mv is not None:
            self.set_external_trigger_level_in_mv(settings.external_trigger_level_in_mv)
        if settings.external_trigger_pulse_width_in_samples is not None:
            self.set_external_trigger_pulse_width_in_samples(settings.external_trigger_pulse_width_in_samples)

    # Write the configuration to the card
    self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP)
Apply all the trigger settings contained in a TriggerSettings dataclass to the device.
Arguments:
- settings (TriggerSettings): A TriggerSettings dataclass containing the setting values to apply.
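In the source for configure_trigger(), the external-trigger fields are applied only when an external source is among the enabled sources, and only when the field is not None. The sketch below mirrors that logic with a hypothetical dataclass, `SketchTriggerSettings`, and placeholder source names. It returns the list of setter calls that would be made instead of writing to a device.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class SketchTriggerSettings:
    """Hypothetical mirror of the fields that configure_trigger() reads."""

    trigger_sources: List[str]
    external_trigger_mode: Optional[str] = None
    external_trigger_level_in_mv: Optional[int] = None
    external_trigger_pulse_width_in_samples: Optional[int] = None


EXTERNAL_SOURCES = {"EXT0", "EXT1"}  # placeholder names, not the real enum members


def plan_trigger_writes(settings: SketchTriggerSettings) -> List[str]:
    """List the setter calls that would be made, in order."""
    calls = [f"set_trigger_sources({settings.trigger_sources})"]
    if set(settings.trigger_sources) & EXTERNAL_SOURCES:
        if settings.external_trigger_mode is not None:
            calls.append(f"set_external_trigger_mode({settings.external_trigger_mode})")
        if settings.external_trigger_level_in_mv is not None:
            calls.append(f"set_external_trigger_level_in_mv({settings.external_trigger_level_in_mv})")
        if settings.external_trigger_pulse_width_in_samples is not None:
            calls.append(
                f"set_external_trigger_pulse_width_in_samples({settings.external_trigger_pulse_width_in_samples})"
            )
    calls.append("write setup to card")
    return calls


external_plan = plan_trigger_writes(SketchTriggerSettings(["EXT0"], external_trigger_level_in_mv=1000))
software_plan = plan_trigger_writes(SketchTriggerSettings(["SOFTWARE"], external_trigger_level_in_mv=1000))
print(external_plan)
print(software_plan)  # the level is skipped because no external source is enabled
```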
def configure_channel_pairing(self, channel_pair: ChannelPair, mode: ChannelPairingMode) -> None:
    """Configures a pair of consecutive channels to operate either independently, in differential mode or
    in double mode. If enabling differential or double mode, then the odd-numbered channel will be automatically
    configured to be identical to the even-numbered channel, and the odd-numbered channel will be disabled as is
    required by the Spectrum API.

    Args:
        channel_pair (ChannelPair): The pair of channels to configure
        mode (ChannelPairingMode): The mode to enable: SINGLE, DOUBLE, or DIFFERENTIAL
    """

    doubling_enabled = int(mode == ChannelPairingMode.DOUBLE)
    differential_mode_enabled = int(mode == ChannelPairingMode.DIFFERENTIAL)

    if doubling_enabled and channel_pair in (channel_pair.CHANNEL_4_AND_5, channel_pair.CHANNEL_6_AND_7):
        raise ValueError("Doubling can only be enabled for channel pairs CHANNEL_0_AND_1 or CHANNEL_2_AND_3.")

    if doubling_enabled or differential_mode_enabled:
        self._mirror_even_channel_settings_on_odd_channel(channel_pair)
        self._disable_odd_channel(channel_pair)

    self.write_to_spectrum_device_register(
        DIFFERENTIAL_CHANNEL_PAIR_COMMANDS[channel_pair], differential_mode_enabled
    )
    self.write_to_spectrum_device_register(DOUBLING_CHANNEL_PAIR_COMMANDS[channel_pair], doubling_enabled)
Configures a pair of consecutive channels to operate either independently, in differential mode or in double mode. If enabling differential or double mode, then the odd-numbered channel will be automatically configured to be identical to the even-numbered channel, and the odd-numbered channel will be disabled as is required by the Spectrum API.
Arguments:
- channel_pair (ChannelPair): The pair of channels to configure
- mode (ChannelPairingMode): The mode to enable: SINGLE, DOUBLE, or DIFFERENTIAL
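The method's source turns the pairing mode into two integer flags, one for the differential register and one for the doubling register, and rejects doubling on the pairs starting at channels 4 and 6. A minimal sketch of that mapping follows. `PairingMode` and `pairing_flags` are hypothetical stand-ins, and the even-channel check is an addition for the sketch (the library's ChannelPair enum already restricts the choice to valid pairs).

```python
from enum import Enum
from typing import Tuple


class PairingMode(Enum):
    """Stand-in for ChannelPairingMode."""

    SINGLE = "single"
    DOUBLE = "double"
    DIFFERENTIAL = "differential"


def pairing_flags(first_channel: int, mode: PairingMode) -> Tuple[int, int]:
    """Return (differential_enabled, doubling_enabled) for the pair starting at first_channel."""
    if first_channel % 2 != 0:
        raise ValueError("A pair starts on an even-numbered channel.")
    doubling = int(mode == PairingMode.DOUBLE)
    differential = int(mode == PairingMode.DIFFERENTIAL)
    if doubling and first_channel >= 4:
        raise ValueError("Doubling can only be enabled for channels 0 and 1 or 2 and 3.")
    return differential, doubling


print(pairing_flags(0, PairingMode.DIFFERENTIAL))  # (1, 0)
print(pairing_flags(2, PairingMode.DOUBLE))        # (0, 1)
print(pairing_flags(4, PairingMode.SINGLE))        # (0, 0)
```

Writing both flags on every call, as the library source does, means that switching a pair back to SINGLE clears whichever mode was previously set.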
def write_to_spectrum_device_register(
    self,
    spectrum_register: int,
    value: int,
    length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO,
) -> None:
    """Set the value of a register on the Spectrum device.

    This method is used internally by `AbstractSpectrumDigitiser` and its subclasses to configure a hardware
    device, but can also be used to set the value of registers that are not implemented in
    `AbstractSpectrumDigitiser` and its subclasses.

    Args:
        spectrum_register (int): Identifier of the register to set. This should be a global constant imported from
            regs.py in the spectrum_gmbh package.
        value (int): Value to write to the register. This should be a global constant imported from
            regs.py in the spectrum_gmbh package.
        length (`SpectrumRegisterLength`): A `SpectrumRegisterLength` object specifying the length of the register
            to set, in bits.
    """
    if not SPECTRUM_DRIVERS_FOUND:
        raise SpectrumDriversNotFound(
            "Cannot communicate with hardware. For testing on a system without drivers or connected hardware, use"
            " MockSpectrumDigitiserCard instead."
        )
    if self.connected:
        if length == SpectrumRegisterLength.THIRTY_TWO:
            set_spectrum_i32_api_param(self._handle, spectrum_register, value)
        elif length == SpectrumRegisterLength.SIXTY_FOUR:
            set_spectrum_i64_api_param(self._handle, spectrum_register, value)
        else:
            raise ValueError("Spectrum integer length not recognised.")
    else:
        raise SpectrumDeviceNotConnected("The device has been disconnected.")
Set the value of a register on the Spectrum device.
This method is used internally by AbstractSpectrumDigitiser and its subclasses to configure a hardware device, but can also be used to set the value of registers that are not implemented in AbstractSpectrumDigitiser and its subclasses.
Arguments:
- spectrum_register (int): Identifier of the register to set. This should be a global constant imported from regs.py in the spectrum_gmbh package.
- value (int): Value to write to the register. This should be a global constant imported from regs.py in the spectrum_gmbh package.
- length (SpectrumRegisterLength): A SpectrumRegisterLength object specifying the length of the register to set, in bits.
176 def read_spectrum_device_register( 177 self, 178 spectrum_register: int, 179 length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO, 180 ) -> int: 181 """Get the value of a register on the Spectrum digitiser. 182 183 This method is used internally by `AbstractSpectrumDigitiser` and its subclasses to read the configuration of a 184 hardware device, but can be also used to get the value of registers that are not implemented in 185 `AbstractSpectrumDigitiser` and its subclasses. 186 187 Args: 188 spectrum_register (int): Identifier of the register to set. This should be a global constant imported from 189 spectrum_gmbh.py_header.regs. 190 length (`SpectrumRegisterLength`): A `SpectrumRegisterLength` object specifying the length of the register 191 to set, in bits. 192 193 Returns: 194 value (int): Value of the register. This can be matched to a global constant imported from 195 spectrum_gmbh.py_header.regs, usually using one of the Enums defined in the settings module. 196 """ 197 if not SPECTRUM_DRIVERS_FOUND: 198 raise SpectrumDriversNotFound( 199 "Cannot communicate with hardware. For testing on a system without drivers or connected hardware, use" 200 " a mock device instead (e.g. MockSpectrumDigitiserCard or MockSpectrumStarHub)." 201 ) 202 if self.connected: 203 if length == SpectrumRegisterLength.THIRTY_TWO: 204 return get_spectrum_i32_api_param(self._handle, spectrum_register) 205 elif length == SpectrumRegisterLength.SIXTY_FOUR: 206 return get_spectrum_i64_api_param(self._handle, spectrum_register) 207 else: 208 raise ValueError("Spectrum integer length not recognised.") 209 else: 210 raise SpectrumDeviceNotConnected("The device has been disconnected.")
Get the value of a register on the Spectrum digitiser.
This method is used internally by AbstractSpectrumDigitiser and its subclasses to read the configuration of a hardware device, but can also be used to get the value of registers that are not implemented in AbstractSpectrumDigitiser and its subclasses.
Arguments:
- spectrum_register (int): Identifier of the register to read. This should be a global constant imported from spectrum_gmbh.py_header.regs.
- length (SpectrumRegisterLength): A SpectrumRegisterLength object specifying the length of the register to read, in bits.
Returns:
value (int): Value of the register. This can be matched to a global constant imported from spectrum_gmbh.py_header.regs, usually using one of the Enums defined in the settings module.
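The way a read is dispatched on the register length can be sketched without drivers or hardware. This is only an illustration: `FakeRegisterLength`, `FakeDevice`, the register numbers and the 32-bit masking are hypothetical stand-ins chosen for the sketch, not part of spectrumdevice or the Spectrum driver.

```python
from enum import Enum


class FakeRegisterLength(Enum):
    """Hypothetical stand-in for SpectrumRegisterLength."""

    THIRTY_TWO = 32
    SIXTY_FOUR = 64


class FakeDevice:
    """Hypothetical in-memory device; a real card is read through the driver."""

    def __init__(self) -> None:
        self.connected = True
        # Made-up register identifiers and values, for illustration only.
        self._registers = {100: 7, 200: 2**40}

    def read_register(
        self, register: int, length: FakeRegisterLength = FakeRegisterLength.THIRTY_TWO
    ) -> int:
        if not self.connected:
            raise RuntimeError("The device has been disconnected.")
        value = self._registers[register]
        if length == FakeRegisterLength.THIRTY_TWO:
            # Assumption of this sketch: a 32-bit read keeps only the low 32 bits.
            return value & 0xFFFFFFFF
        elif length == FakeRegisterLength.SIXTY_FOUR:
            return value
        raise ValueError("Register length not recognised.")


device = FakeDevice()
small = device.read_register(100)
large = device.read_register(200, FakeRegisterLength.SIXTY_FOUR)
```

The point of the sketch is that a value wider than 32 bits is only recovered when the 64-bit length is requested explicitly, since the 32-bit length is the default.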
Inherited Members
- spectrumdevice.devices.abstract_device.device_interface.SpectrumDeviceInterface
- connected
- reconnect
- status
- start_transfer
- stop_transfer
- wait_for_transfer_chunk_to_complete
- disconnect
- transfer_buffers
- define_transfer_buffer
- analog_channels
- io_lines
- enabled_analog_channel_nums
- set_enabled_analog_channels
- trigger_sources
- set_trigger_sources
- external_trigger_mode
- set_external_trigger_mode
- external_trigger_level_in_mv
- set_external_trigger_level_in_mv
- external_trigger_pulse_width_in_samples
- set_external_trigger_pulse_width_in_samples
- apply_channel_enabling
- clock_mode
- set_clock_mode
- sample_rate_in_hz
- set_sample_rate_in_hz
- available_io_modes
- feature_list
- timeout_in_ms
- set_timeout_in_ms
- force_trigger
- bytes_per_sample
- type
- model_number
class AbstractSpectrumChannel(SpectrumChannelInterface, Generic[ChannelNameType]):
    """Partially implemented abstract superclass containing code common to controlling an individual channel or IO
    Line of all spectrum devices."""

    def __init__(self, channel_number: int, parent_device: SpectrumDeviceInterface, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self._name = self._make_name(channel_number)
        self._parent_device = parent_device
        self._enabled = True

    @property
    @abstractmethod
    def _name_prefix(self) -> str:
        raise NotImplementedError

    @abstractmethod
    def _make_name(self, channel_number: int) -> ChannelNameType:
        raise NotImplementedError

    @property
    def name(self) -> ChannelNameType:
        """The identifier assigned by the spectrum driver, formatted as an Enum by the settings package.

        Returns:
            name (`SpectrumChannelName`): The name of the channel, as assigned by the driver."""
        return self._name

    @property
    def _number(self) -> int:
        return int(self.name.name.split(self._name_prefix)[-1])

    def write_to_parent_device_register(
        self,
        spectrum_register: int,
        value: int,
        length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO,
    ) -> None:
        self._parent_device.write_to_spectrum_device_register(spectrum_register, value, length)

    def read_parent_device_register(
        self,
        spectrum_register: int,
        length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO,
    ) -> int:
        return self._parent_device.read_spectrum_device_register(spectrum_register, length)

    def __eq__(self, other: object) -> bool:
        if isinstance(other, AbstractSpectrumChannel):
            return (self.name == other.name) and (self._parent_device == other._parent_device)
        else:
            raise NotImplementedError()

    def __str__(self) -> str:
        return f"{self._name.name} of {self._parent_device}"

    def __repr__(self) -> str:
        return str(self)
Partially implemented abstract superclass containing code common to controlling an individual channel or IO Line of all spectrum devices.
Inherited Members
- spectrumdevice.devices.abstract_device.channel_interfaces.SpectrumChannelInterface
- name
- write_to_parent_device_register
- read_parent_device_register
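The channel number is recovered from the channel's Enum name by splitting on the name prefix. A minimal sketch of that step follows; `FakeChannelName`, its values and the `"CHANNEL"` prefix are hypothetical and chosen only for illustration.

```python
from enum import Enum


class FakeChannelName(Enum):
    """Hypothetical stand-in for the channel-name Enums of the settings package."""

    CHANNEL0 = 1
    CHANNEL1 = 2
    CHANNEL7 = 128


def channel_number(name: FakeChannelName, name_prefix: str = "CHANNEL") -> int:
    # Same idea as int(self.name.name.split(self._name_prefix)[-1]) in the class source:
    # "CHANNEL7".split("CHANNEL") gives ["", "7"], and the last element is the number.
    return int(name.name.split(name_prefix)[-1])


number = channel_number(FakeChannelName.CHANNEL7)
```

Note that the number comes from the Enum member's name, not from its value, so the value can be any driver constant.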
@dataclass
class Measurement:
    """Measurement is a dataclass for storing a set of waveforms generated by a single acquisition, with a timestamp."""

    waveforms: list[VoltageWaveformType] | list[RawWaveformType]
    """Contains the acquired waveforms as a list of 1D NumPy arrays of either floats or ints"""
    timestamp: datetime | None
    """The time at which the acquisition was triggered, as a datetime.datetime object"""
Measurement is a dataclass for storing a set of waveforms generated by a single acquisition, with a timestamp.
class SpectrumAWGCard(
    AbstractSpectrumCard[SpectrumAWGAnalogChannelInterface, SpectrumAWGIOLineInterface], AbstractSpectrumAWG
):
    """Class for controlling individual Spectrum AWG cards."""

    def _init_analog_channels(self) -> Sequence[SpectrumAWGAnalogChannelInterface]:
        num_modules = self.read_spectrum_device_register(SPC_MIINST_MODULES)
        num_channels_per_module = self.read_spectrum_device_register(SPC_MIINST_CHPERMODULE)
        total_channels = num_modules * num_channels_per_module
        return tuple([SpectrumAWGAnalogChannel(channel_number=n, parent_device=self) for n in range(total_channels)])

    def _init_io_lines(self) -> Sequence[SpectrumAWGIOLineInterface]:
        if (self.model_number.value & TYP_SERIESMASK) == TYP_M2PEXPSERIES:
            return tuple([SpectrumAWGIOLine(channel_number=n, parent_device=self) for n in range(4)])
        else:
            raise NotImplementedError("Don't know how many IO lines other types of card have. Only M2P series.")

    def transfer_waveform(self, waveform: NDArray[int16]) -> None:
        """Write an arbitrary waveform to the card's on-board memory.

        Args:
            waveform (NDArray[int16]): A numpy array of signed 16-bit integers representing the samples of the
                waveform to transfer. The amplitude and offset of the generated signals can be set per-channel (see
                SpectrumAWGAnalogChannel), so the waveform provided here should be scaled to the full range of int16
                (i.e. -32767 to 32767). Must be at least 16 samples long. If the waveform length is not a multiple of
                8 samples, the waveform will be zero-padded so its length is the next multiple of 8.

        """
        if len(waveform) < 16:
            raise ValueError("Waveform must be at least 16 samples long")
        step_size = get_memsize_step_size(self._model_number)
        remainder = len(waveform) % step_size
        if remainder > 0:
            logger.warning(
                "Length of waveform transmitted to AWG is not a multiple of 8 samples. Waveform in card memory will be "
                "zero-padded to the next multiple of 8."
            )
        coerced_mem_size = len(waveform) if remainder == 0 else len(waveform) + (step_size - remainder)

        buffer = transfer_buffer_factory(
            buffer_type=BufferType.SPCM_BUF_DATA,
            direction=BufferDirection.SPCM_DIR_PCTOCARD,
            size_in_samples=coerced_mem_size,
            bytes_per_sample=self.bytes_per_sample,
        )
        buffer.data_array[:] = concatenate([waveform, zeros(coerced_mem_size - len(waveform), dtype=int16)])
        self.define_transfer_buffer((buffer,))
        self.write_to_spectrum_device_register(SPC_MEMSIZE, coerced_mem_size)
        self.start_transfer()
        self.wait_for_transfer_chunk_to_complete()

    def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None:
        """Provide a `TransferBuffer` object for transferring samples to the card. This is called internally when
        transfer_waveform is used to send a single waveform to the card.

        Args:
            buffer (Optional[List[`TransferBuffer`]]): A length-1 list containing a pre-constructed
                `TransferBuffer`. The buffer should have buffer_type=BufferType.SPCM_BUF_DATA and
                direction=BufferDirection.SPCM_DIR_PCTOCARD. The size of the buffer should be chosen according to the
                length of the data to transfer.
        """
        if buffer is None:
            raise ValueError(
                "You must provide a preconfigured buffer for transferring samples to an AWG because the "
                "buffer size cannot be inferred."
            )
        self._transfer_buffer = buffer[0]
        set_transfer_buffer(self._handle, self._transfer_buffer)
Class for controlling individual Spectrum AWG cards.
    def transfer_waveform(self, waveform: NDArray[int16]) -> None:
        """Write an arbitrary waveform to the card's on-board memory.

        Args:
            waveform (NDArray[int16]): A numpy array of signed 16-bit integers representing the samples of the
                waveform to transfer. The amplitude and offset of the generated signals can be set per-channel (see
                SpectrumAWGAnalogChannel), so the waveform provided here should be scaled to the full range of int16
                (i.e. -32767 to 32767). Must be at least 16 samples long. If the waveform length is not a multiple of
                8 samples, the waveform will be zero-padded so its length is the next multiple of 8.

        """
        if len(waveform) < 16:
            raise ValueError("Waveform must be at least 16 samples long")
        step_size = get_memsize_step_size(self._model_number)
        remainder = len(waveform) % step_size
        if remainder > 0:
            logger.warning(
                "Length of waveform transmitted to AWG is not a multiple of 8 samples. Waveform in card memory will be "
                "zero-padded to the next multiple of 8."
            )
        coerced_mem_size = len(waveform) if remainder == 0 else len(waveform) + (step_size - remainder)

        buffer = transfer_buffer_factory(
            buffer_type=BufferType.SPCM_BUF_DATA,
            direction=BufferDirection.SPCM_DIR_PCTOCARD,
            size_in_samples=coerced_mem_size,
            bytes_per_sample=self.bytes_per_sample,
        )
        buffer.data_array[:] = concatenate([waveform, zeros(coerced_mem_size - len(waveform), dtype=int16)])
        self.define_transfer_buffer((buffer,))
        self.write_to_spectrum_device_register(SPC_MEMSIZE, coerced_mem_size)
        self.start_transfer()
        self.wait_for_transfer_chunk_to_complete()
Write an arbitrary waveform to the card's on-board memory.
Arguments:
- waveform (NDArray[int16]): A numpy array of signed 16-bit integers representing the samples of the waveform to transfer. The amplitude and offset of the generated signals can be set per-channel (see SpectrumAWGAnalogChannel), so the waveform provided here should be scaled to the full range of int16 (i.e. -32767 to 32767). Must be at least 16 samples long. If the waveform length is not a multiple of 8 samples, the waveform will be zero-padded so its length is the next multiple of 8.
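The length rules for a waveform (at least 16 samples, zero-padded to the next multiple of the step size) can be checked with plain Python before any transfer. The sketch below is an illustration only: `coerced_length` and `full_scale_sine` are hypothetical helpers, lists stand in for NumPy arrays, and the step size of 8 is the value quoted in the description above (the card class looks it up per model).

```python
import math


def coerced_length(num_samples: int, step_size: int = 8) -> int:
    """Length of the waveform after zero-padding to the next multiple of step_size."""
    if num_samples < 16:
        raise ValueError("Waveform must be at least 16 samples long")
    remainder = num_samples % step_size
    return num_samples if remainder == 0 else num_samples + (step_size - remainder)


def full_scale_sine(num_samples: int, cycles: float = 1.0) -> list[int]:
    """A sine burst scaled to the int16 range quoted above (-32767 to 32767)."""
    return [
        round(32767 * math.sin(2 * math.pi * cycles * n / num_samples))
        for n in range(num_samples)
    ]


samples = full_scale_sine(20)
# 20 is not a multiple of 8, so four zeros are appended to reach 24 samples.
padded = samples + [0] * (coerced_length(len(samples)) - len(samples))
```

Choosing a waveform length that is already a multiple of the step size avoids the padding and the associated warning.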
    def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None:
        """Provide a `TransferBuffer` object for transferring samples to the card. This is called internally when
        transfer_waveform is used to send a single waveform to the card.

        Args:
            buffer (Optional[List[`TransferBuffer`]]): A length-1 list containing a pre-constructed
                `TransferBuffer`. The buffer should have buffer_type=BufferType.SPCM_BUF_DATA and
                direction=BufferDirection.SPCM_DIR_PCTOCARD. The size of the buffer should be chosen according to the
                length of the data to transfer.
        """
        if buffer is None:
            raise ValueError(
                "You must provide a preconfigured buffer for transferring samples to an AWG because the "
                "buffer size cannot be inferred."
            )
        self._transfer_buffer = buffer[0]
        set_transfer_buffer(self._handle, self._transfer_buffer)
Provide a TransferBuffer object for transferring samples to the card. This is called internally when transfer_waveform is used to send a single waveform to the card.
Arguments:
- buffer (Optional[List[TransferBuffer]]): A length-1 list containing a pre-constructed TransferBuffer. The buffer should have buffer_type=BufferType.SPCM_BUF_DATA and direction=BufferDirection.SPCM_DIR_PCTOCARD. The size of the buffer should be chosen according to the length of the data to transfer.
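The contract described above (a missing buffer is an error, and only the first element of the sequence is used) can be sketched in isolation. In this illustration a `bytearray` is a hypothetical stand-in for a TransferBuffer, and `make_sample_buffer` and `select_transfer_buffer` are made-up helper names, not spectrumdevice functions.

```python
from typing import Optional, Sequence


def make_sample_buffer(num_samples: int, bytes_per_sample: int = 2) -> bytearray:
    """Hypothetical stand-in for a TransferBuffer: zeroed bytes sized for the data."""
    return bytearray(num_samples * bytes_per_sample)


def select_transfer_buffer(buffer: Optional[Sequence[bytearray]] = None) -> bytearray:
    # A missing buffer is an error because its size cannot be inferred.
    if buffer is None:
        raise ValueError("A preconfigured buffer must be provided; its size cannot be inferred.")
    # Only the first element of the sequence is used.
    return buffer[0]


# The buffer is wrapped in a length-1 tuple, as transfer_waveform does.
chosen = select_transfer_buffer((make_sample_buffer(24),))
```

Sizing the buffer from the padded sample count, rather than the original waveform length, keeps it consistent with the memory size written to the card.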
Inherited Members
- AbstractSpectrumCard
- model_number
- reconnect
- status
- start_transfer
- stop_transfer
- wait_for_transfer_chunk_to_complete
- transfer_buffers
- disconnect
- connected
- analog_channels
- io_lines
- enabled_analog_channel_nums
- set_enabled_analog_channels
- trigger_sources
- set_trigger_sources
- external_trigger_mode
- set_external_trigger_mode
- external_trigger_level_in_mv
- set_external_trigger_level_in_mv
- external_trigger_pulse_width_in_samples
- set_external_trigger_pulse_width_in_samples
- apply_channel_enabling
- timeout_in_ms
- set_timeout_in_ms
- clock_mode
- set_clock_mode
- available_io_modes
- feature_list
- sample_rate_in_hz
- set_sample_rate_in_hz
- type
- force_trigger
- bytes_per_sample
- spectrumdevice.devices.awg.abstract_spectrum_awg.AbstractSpectrumAWG
- configure_generation
- generation_mode
- set_generation_mode
- num_loops
- set_num_loops
class MockSpectrumAWGCard(MockAbstractSpectrumAWG, MockAbstractSpectrumCard, SpectrumAWGCard):
    """A mock AWG card."""

    def __init__(
        self,
        device_number: int,
        model: ModelNumber,
        num_modules: int,
        num_channels_per_module: int,
        card_features: Optional[list[CardFeature]] = None,
        advanced_card_features: Optional[list[AdvancedCardFeature]] = None,
    ) -> None:
        """
        Args:
            device_number (int): The index of the mock device to create. Used to create a name for the device which is
                used internally.
            model (ModelNumber): The model of card to mock.
            num_modules (int): The number of internal modules to assign the mock card. Default 2. On real hardware, this
                is read from the device so does not need to be set. See the Spectrum documentation to work out how many
                modules your hardware has.
            num_channels_per_module (int): The number of channels per module. Default 4 (so 8 channels in total). On
                real hardware, this is read from the device so does not need to be set.
            card_features (list[CardFeature]): List of available features of the mock device
            advanced_card_features (list[AdvancedCardFeature]): List of available advanced features of the mock device
        """
        super().__init__(
            card_type=CardType.SPCM_TYPE_AO,
            device_number=device_number,
            model=model,
            num_modules=num_modules,
            num_channels_per_module=num_channels_per_module,
            card_features=card_features if card_features is not None else [],
            advanced_card_features=advanced_card_features if advanced_card_features is not None else [],
        )
        self._connect(self._visa_string)

    def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None:
        """Provide a `TransferBuffer` object for transferring samples to the device.

        See SpectrumAWGCard.define_transfer_buffer(). This mock implementation is identical except that it
        does not write to any hardware device."""
        if buffer is None:
            raise ValueError(
                "You must provide a preconfigured buffer for transferring samples to an AWG because the "
                "buffer size cannot be inferred."
            )
        self._transfer_buffer = buffer[0]
A mock AWG card.
    def __init__(
        self,
        device_number: int,
        model: ModelNumber,
        num_modules: int,
        num_channels_per_module: int,
        card_features: Optional[list[CardFeature]] = None,
        advanced_card_features: Optional[list[AdvancedCardFeature]] = None,
    ) -> None:
        """
        Args:
            device_number (int): The index of the mock device to create. Used to create a name for the device which is
                used internally.
            model (ModelNumber): The model of card to mock.
            num_modules (int): The number of internal modules to assign the mock card. Default 2. On real hardware, this
                is read from the device so does not need to be set. See the Spectrum documentation to work out how many
                modules your hardware has.
            num_channels_per_module (int): The number of channels per module. Default 4 (so 8 channels in total). On
                real hardware, this is read from the device so does not need to be set.
            card_features (list[CardFeature]): List of available features of the mock device
            advanced_card_features (list[AdvancedCardFeature]): List of available advanced features of the mock device
        """
        super().__init__(
            card_type=CardType.SPCM_TYPE_AO,
            device_number=device_number,
            model=model,
            num_modules=num_modules,
            num_channels_per_module=num_channels_per_module,
            card_features=card_features if card_features is not None else [],
            advanced_card_features=advanced_card_features if advanced_card_features is not None else [],
        )
        self._connect(self._visa_string)
Arguments:
- device_number (int): The index of the mock device to create. Used to create a name for the device which is used internally.
- model (ModelNumber): The model of card to mock.
- num_modules (int): The number of internal modules to assign the mock card. Default 2. On real hardware, this is read from the device so does not need to be set. See the Spectrum documentation to work out how many modules your hardware has.
- num_channels_per_module (int): The number of channels per module. Default 4 (so 8 channels in total). On real hardware, this is read from the device so does not need to be set.
- card_features (list[CardFeature]): List of available features of the mock device
- advanced_card_features (list[AdvancedCardFeature]): List of available advanced features of the mock device
    def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None:
        """Provide a `TransferBuffer` object for transferring samples to the device.

        See SpectrumAWGCard.define_transfer_buffer(). This mock implementation is identical except that it
        does not write to any hardware device."""
        if buffer is None:
            raise ValueError(
                "You must provide a preconfigured buffer for transferring samples to an AWG because the "
                "buffer size cannot be inferred."
            )
        self._transfer_buffer = buffer[0]
Provide a TransferBuffer object for transferring samples to the device.
See SpectrumAWGCard.define_transfer_buffer(). This mock implementation is identical except that it does not write to any hardware device.
Inherited Members
- spectrumdevice.devices.awg.abstract_spectrum_awg.AbstractSpectrumAWG
- configure_generation
- generation_mode
- set_generation_mode
- num_loops
- set_num_loops
- spectrumdevice.devices.mocks.mock_abstract_devices.MockAbstractSpectrumDevice
- write_to_spectrum_device_register
- read_spectrum_device_register
- AbstractSpectrumCard
- model_number
- reconnect
- status
- start_transfer
- stop_transfer
- wait_for_transfer_chunk_to_complete
- transfer_buffers
- disconnect
- connected
- analog_channels
- io_lines
- enabled_analog_channel_nums
- set_enabled_analog_channels
- trigger_sources
- set_trigger_sources
- external_trigger_mode
- set_external_trigger_mode
- external_trigger_level_in_mv
- set_external_trigger_level_in_mv
- external_trigger_pulse_width_in_samples
- set_external_trigger_pulse_width_in_samples
- apply_channel_enabling
- timeout_in_ms
- set_timeout_in_ms
- clock_mode
- set_clock_mode
- available_io_modes
- feature_list
- sample_rate_in_hz
- set_sample_rate_in_hz
- type
- force_trigger
- bytes_per_sample
class SpectrumAWGAnalogChannel(AbstractSpectrumAnalogChannel, SpectrumAWGAnalogChannelInterface):
    """Class for controlling analog channels of an AWG."""

    def __init__(self, parent_device: SpectrumAWGInterface, **kwargs: Any) -> None:
        if parent_device.type != CardType.SPCM_TYPE_AO:
            raise SpectrumCardIsNotAnAWG(parent_device.type)
        super().__init__(parent_device=parent_device, **kwargs)  # pass unused args up the inheritance hierarchy

    def _get_settings_as_dict(self) -> dict:
        return {
            SpectrumAWGAnalogChannel.signal_amplitude_in_mv.__name__: self.signal_amplitude_in_mv,
            SpectrumAWGAnalogChannel.dc_offset_in_mv.__name__: self.dc_offset_in_mv,
            SpectrumAWGAnalogChannel.output_filter.__name__: self.output_filter,
            SpectrumAWGAnalogChannel.stop_level_mode.__name__: self.stop_level_mode,
            SpectrumAWGAnalogChannel.stop_level_custom_value.__name__: self.stop_level_custom_value,
        }

    def _set_settings_from_dict(self, settings: dict) -> None:
        self.set_signal_amplitude_in_mv(settings[SpectrumAWGAnalogChannel.signal_amplitude_in_mv.__name__])
        self.set_dc_offset_in_mv(settings[SpectrumAWGAnalogChannel.dc_offset_in_mv.__name__])
        self.set_output_filter(settings[SpectrumAWGAnalogChannel.output_filter.__name__])
        self.set_stop_level_mode(settings[SpectrumAWGAnalogChannel.stop_level_mode.__name__])
        self.set_stop_level_custom_value(settings[SpectrumAWGAnalogChannel.stop_level_custom_value.__name__])

    @property
    def is_switched_on(self) -> bool:
        """Returns "True" if the output channel is switched on, or "False" if it is muted."""
        return bool(self._parent_device.read_spectrum_device_register(OUTPUT_CHANNEL_ENABLED_COMMANDS[self._number]))

    def set_is_switched_on(self, is_enabled: bool) -> None:
        """Switches the output channel on ("True") or off ("False")."""
        self._parent_device.write_to_spectrum_device_register(
            OUTPUT_CHANNEL_ENABLED_COMMANDS[self._number], int(is_enabled)
        )

    @property
    def dc_offset_in_mv(self) -> int:
        """The current output signal DC offset in mV.

        Returns:
            dc_offset (int): The currently set output signal DC offset in mV.
        """
        return self._parent_device.read_spectrum_device_register(OUTPUT_DC_OFFSET_COMMANDS[self._number])

    def set_dc_offset_in_mv(self, dc_offset: int) -> None:
        if dc_offset > OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]:
            raise ValueError(
                f"Max allowed signal DC offset for card {self._parent_device.model_number} is "
                f"{OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]} mV, "
                f"so {dc_offset} mV is too high."
            )
        self._parent_device.write_to_spectrum_device_register(OUTPUT_DC_OFFSET_COMMANDS[self._number], dc_offset)

    @property
    def signal_amplitude_in_mv(self) -> int:
        """The current output signal amplitude in mV.

        Returns:
            amplitude (int): The currently set output signal amplitude in mV.
        """
        return self._parent_device.read_spectrum_device_register(OUTPUT_AMPLITUDE_COMMANDS[self._number])

    def set_signal_amplitude_in_mv(self, amplitude: int) -> None:
        if amplitude > OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]:
            raise ValueError(
                f"Max allowed signal amplitude for card {self._parent_device.model_number} is "
                f"{OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]} mV, "
                f"so {amplitude} mV is too high."
            )
        self._parent_device.write_to_spectrum_device_register(OUTPUT_AMPLITUDE_COMMANDS[self._number], amplitude)

    @property
    def output_filter(self) -> OutputChannelFilter:
        """The current output filter setting.

        Returns:
            output_filter (OutputChannelFilter): The currently set output filter.
        """
        return OutputChannelFilter(
            self._parent_device.read_spectrum_device_register(OUTPUT_FILTER_COMMANDS[self._number])
        )

    def set_output_filter(self, output_filter: OutputChannelFilter) -> None:
        self._parent_device.write_to_spectrum_device_register(OUTPUT_FILTER_COMMANDS[self._number], output_filter.value)

    @property
    def stop_level_mode(self) -> OutputChannelStopLevelMode:
        """Sets the behavior of the channel when the output is stopped or playback finished."""
        return OutputChannelStopLevelMode(
            self._parent_device.read_spectrum_device_register(OUTPUT_STOP_LEVEL_MODE_COMMANDS[self._number])
        )

    def set_stop_level_mode(self, mode: OutputChannelStopLevelMode) -> None:
        self._parent_device.write_to_spectrum_device_register(OUTPUT_STOP_LEVEL_MODE_COMMANDS[self._number], mode.value)

    @property
    def stop_level_custom_value(self) -> int16:
        """Sets the level to which the output will be set when the output is stopped or playback finished and
        stop_level_mode is set to `OutputChannelStopLevelMode.SPCM_STOPLVL_CUSTOM`."""
        return int16(
            self._parent_device.read_spectrum_device_register(OUTPUT_STOP_LEVEL_CUSTOM_VALUE_COMMANDS[self._number])
        )

    def set_stop_level_custom_value(self, value: int16) -> None:
        self._parent_device.write_to_spectrum_device_register(
            OUTPUT_STOP_LEVEL_CUSTOM_VALUE_COMMANDS[self._number], int(value)
        )
Class for controlling analog channels of an AWG.
    @property
    def is_switched_on(self) -> bool:
        """Returns "True" if the output channel is switched on, or "False" if it is muted."""
        return bool(self._parent_device.read_spectrum_device_register(OUTPUT_CHANNEL_ENABLED_COMMANDS[self._number]))
Returns "True" if the output channel is switched on, or "False" if it is muted.
    def set_is_switched_on(self, is_enabled: bool) -> None:
        """Switches the output channel on ("True") or off ("False")."""
        self._parent_device.write_to_spectrum_device_register(
            OUTPUT_CHANNEL_ENABLED_COMMANDS[self._number], int(is_enabled)
        )
Switches the output channel on ("True") or off ("False").
    @property
    def dc_offset_in_mv(self) -> int:
        """The current output signal DC offset in mV.

        Returns:
            dc_offset (int): The currently set output signal DC offset in mV.
        """
        return self._parent_device.read_spectrum_device_register(OUTPUT_DC_OFFSET_COMMANDS[self._number])
The current output signal DC offset in mV.
Returns:
dc_offset (int): The currently set output signal DC offset in mV.
    def set_dc_offset_in_mv(self, dc_offset: int) -> None:
        if dc_offset > OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]:
            raise ValueError(
                f"Max allowed signal DC offset for card {self._parent_device.model_number} is "
                f"{OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]} mV, "
                f"so {dc_offset} mV is too high."
            )
        self._parent_device.write_to_spectrum_device_register(OUTPUT_DC_OFFSET_COMMANDS[self._number], dc_offset)
    @property
    def signal_amplitude_in_mv(self) -> int:
        """The current output signal amplitude in mV.

        Returns:
            amplitude (int): The currently set output signal amplitude in mV.
        """
        return self._parent_device.read_spectrum_device_register(OUTPUT_AMPLITUDE_COMMANDS[self._number])
The current output signal amplitude in mV.
Returns:
amplitude (int): The currently set output signal amplitude in mV.
    def set_signal_amplitude_in_mv(self, amplitude: int) -> None:
        if amplitude > OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]:
            raise ValueError(
                f"Max allowed signal amplitude for card {self._parent_device.model_number} is "
                f"{OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]} mV, "
                f"so {amplitude} mV is too high."
            )
        self._parent_device.write_to_spectrum_device_register(OUTPUT_AMPLITUDE_COMMANDS[self._number], amplitude)
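The amplitude and DC offset setters share the same pattern: look up a per-model limit and raise ValueError before anything is written to the card. The sketch below illustrates that pattern only; `FAKE_AMPLITUDE_LIMITS_IN_MV`, the model names and the limit values are invented for the example and are not taken from spectrumdevice.

```python
# Hypothetical per-model limits in mV; real limits depend on the card model.
FAKE_AMPLITUDE_LIMITS_IN_MV = {"MODEL_A": 3000, "MODEL_B": 6000}


def validate_amplitude_in_mv(model: str, amplitude: int) -> int:
    """Return the amplitude unchanged if it does not exceed the model's limit."""
    limit = FAKE_AMPLITUDE_LIMITS_IN_MV[model]
    if amplitude > limit:
        raise ValueError(
            f"Max allowed signal amplitude for card {model} is {limit} mV, "
            f"so {amplitude} mV is too high."
        )
    return amplitude


accepted = validate_amplitude_in_mv("MODEL_A", 1000)
```

As in the setters shown in this section, only the upper bound is checked, so a caller who also needs a lower bound has to enforce it separately.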
    @property
    def output_filter(self) -> OutputChannelFilter:
        """The current output filter setting.

        Returns:
            output_filter (OutputChannelFilter): The currently set output filter.
        """
        return OutputChannelFilter(
            self._parent_device.read_spectrum_device_register(OUTPUT_FILTER_COMMANDS[self._number])
        )
The current output filter setting.
Returns:
output_filter (OutputChannelFilter): The currently set output filter.
    @property
    def stop_level_mode(self) -> OutputChannelStopLevelMode:
        """Sets the behavior of the channel when the output is stopped or playback finished."""
        return OutputChannelStopLevelMode(
            self._parent_device.read_spectrum_device_register(OUTPUT_STOP_LEVEL_MODE_COMMANDS[self._number])
        )
Sets the behavior of the channel when the output is stopped or playback finished.
    @property
    def stop_level_custom_value(self) -> int16:
        """Sets the level to which the output will be set when the output is stopped or playback finished and
        stop_level_mode is set to `OutputChannelStopLevelMode.SPCM_STOPLVL_CUSTOM`."""
        return int16(
            self._parent_device.read_spectrum_device_register(OUTPUT_STOP_LEVEL_CUSTOM_VALUE_COMMANDS[self._number])
        )
Sets the level to which the output will be set when the output is stopped or playback finished and stop_level_mode is set to OutputChannelStopLevelMode.SPCM_STOPLVL_CUSTOM.
Inherited Members
- spectrumdevice.devices.abstract_device.channel_interfaces.SpectrumAnalogChannelInterface
- copy_settings_from_other_channel
class SpectrumAWGIOLine(AbstractSpectrumIOLine, SpectrumAWGIOLineInterface):
    """Class for controlling Multipurpose IO lines of an AWG (e.g. X0, X1, X2 and X3)"""

    def __init__(self, parent_device: AbstractSpectrumCard, **kwargs: Any) -> None:
        if parent_device.type != CardType.SPCM_TYPE_AO:
            raise SpectrumCardIsNotAnAWG(parent_device.type)
        super().__init__(parent_device=parent_device, **kwargs)  # pass unused args up the inheritance hierarchy
        self._dig_out_settings = DigOutIOLineModeSettings(
            source_channel=DigOutSourceChannel.SPCM_XMODE_DIGOUTSRC_CH0,
            source_bit=DigOutSourceBit.SPCM_XMODE_DIGOUTSRC_BIT15,
        )

    @property
    def dig_out_settings(self) -> DigOutIOLineModeSettings:
        return self._dig_out_settings

    def set_dig_out_settings(self, dig_out_settings: DigOutIOLineModeSettings) -> None:
        self._dig_out_settings = dig_out_settings

    def _get_io_line_mode_settings_mask(self, mode: IOLineMode) -> int:
        if mode == IOLineMode.SPCM_XMODE_DIGOUT:
            return self._dig_out_settings.source_channel.value | self._dig_out_settings.source_bit.value
        else:
            return 0
Class for controlling Multipurpose IO lines of an AWG (e.g. X0, X1, X2 and X3)
    def __init__(self, parent_device: AbstractSpectrumCard, **kwargs: Any) -> None:
        if parent_device.type != CardType.SPCM_TYPE_AO:
            raise SpectrumCardIsNotAnAWG(parent_device.type)
        super().__init__(parent_device=parent_device, **kwargs)  # pass unused args up the inheritance hierarchy
        self._dig_out_settings = DigOutIOLineModeSettings(
            source_channel=DigOutSourceChannel.SPCM_XMODE_DIGOUTSRC_CH0,
            source_bit=DigOutSourceBit.SPCM_XMODE_DIGOUTSRC_BIT15,
        )
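In digital-output mode, the IO line's settings mask is the bitwise OR of the chosen source channel and source bit; for any other mode the mask is 0. A minimal sketch of that combination follows. The Enum classes and every numeric value are hypothetical placeholders, not the real driver constants.

```python
from dataclasses import dataclass
from enum import Enum


class FakeSourceChannel(Enum):
    """Hypothetical source-channel flags (values invented for illustration)."""

    CH0 = 0x00000000
    CH1 = 0x01000000


class FakeSourceBit(Enum):
    """Hypothetical source-bit flags (values invented for illustration)."""

    BIT15 = 0x00000000
    BIT14 = 0x00100000


@dataclass
class FakeDigOutSettings:
    source_channel: FakeSourceChannel
    source_bit: FakeSourceBit


def settings_mask(settings: FakeDigOutSettings, dig_out_mode: bool) -> int:
    # Both fields occupy separate bits, so OR-ing them yields one register value.
    if dig_out_mode:
        return settings.source_channel.value | settings.source_bit.value
    return 0


settings = FakeDigOutSettings(FakeSourceChannel.CH1, FakeSourceBit.BIT14)
mask = settings_mask(settings, dig_out_mode=True)
```

Keeping the settings in a small dataclass means they can be changed independently of the mode and are only folded into the mask when the digital-output mode is actually selected.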
Inherited Members
- spectrumdevice.devices.abstract_device.channel_interfaces.SpectrumIOLineInterface
- mode
- set_mode
- pulse_generator
class PulseGenerator(PulseGeneratorInterface):
    """Class for controlling pulse generators associated with IO lines (requires firmware option be enabled)."""

    def __init__(self, parent: SpectrumIOLineInterface):
        self._parent_io_line = parent
        # last char of IO line name is IO line channel number, which is used to set pulse generator number
        self._number = int(parent.name.name[-1])
        available_advanced_features = decode_advanced_card_features(
            self.read_parent_device_register(SPC_PCIEXTFEATURES)
        )
        if AdvancedCardFeature.SPCM_FEAT_EXTFW_PULSEGEN not in available_advanced_features:
            raise SpectrumFeatureNotSupportedByCard(
                call_description=self.__str__() + ".__init__()",
                message="Pulse generator firmware option not installed on device.",
            )
        self._multiplexer_1 = PulseGeneratorMultiplexer1(parent=self)
        self._multiplexer_2 = PulseGeneratorMultiplexer2(parent=self)

    def configure_output(
        self, settings: PulseGeneratorOutputSettings, coerce: bool = True
    ) -> PulseGeneratorOutputSettings:
        """Configure all pulse generator output settings at once. By default, all values are coerced to the
        nearest values allowed by the hardware, and the coerced values are returned."""
        self.set_output_inversion(settings.output_inversion)
        coerced_settings = PulseGeneratorOutputSettings(
            period_in_seconds=self.set_period_in_seconds(settings.period_in_seconds, coerce=coerce),
            duty_cycle=self.set_duty_cycle(settings.duty_cycle, coerce=coerce),
            num_pulses=self.set_num_pulses(settings.num_pulses, coerce=coerce),
            delay_in_seconds=self.set_delay_in_seconds(settings.delay_in_seconds, coerce=coerce),
            output_inversion=settings.output_inversion,
        )
        self.write_to_parent_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP)
        return coerced_settings

    def configure_trigger(self, settings: PulseGeneratorTriggerSettings) -> None:
        """Configure all pulse generator trigger settings at once."""
        self.set_trigger_mode(settings.trigger_mode)
        self.set_trigger_detection_mode(settings.trigger_detection_mode)
        self.multiplexer_1.set_trigger_source(settings.multiplexer_1_source)
        self.multiplexer_2.set_trigger_source(settings.multiplexer_2_source)
        self.multiplexer_1.set_output_inversion(settings.multiplexer_1_output_inversion)
        self.multiplexer_2.set_output_inversion(settings.multiplexer_2_output_inversion)
        self.write_to_parent_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP)

    def force_trigger(self) -> None:
        """Generates a pulse when the pulse generator trigger source (mux 2) is set to 'software'."""
        if (
            self._multiplexer_2.trigger_source
            != PulseGeneratorMultiplexer2TriggerSource.SPCM_PULSEGEN_MUX2_SRC_SOFTWARE
        ):
            raise SpectrumIOError("Force trigger can only be used if trigger source (mux 2) is set to 'software'")
        self.write_to_parent_device_register(SPC_XIO_PULSEGEN_COMMAND, SPCM_PULSEGEN_CMD_FORCE)

    @property
    def number(self) -> int:
        """The index of the pulse generator. Corresponds to the index of the IO line to which it belongs."""
        return self._number

    @property
    def multiplexer_1(self) -> PulseGeneratorMultiplexer1:
        """Change the trigger source of this multiplexer to control when it is possible to trigger the pulse generator."""
        return self._multiplexer_1

    @property
    def multiplexer_2(self) -> PulseGeneratorMultiplexer2:
        """Change the trigger source of this multiplexer to control how the pulse generator is triggered."""
        return self._multiplexer_2

    def read_parent_device_register(
        self, spectrum_register: int, length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO
    ) -> int:
        return self._parent_io_line.read_parent_device_register(spectrum_register, length)

    def write_to_parent_device_register(
        self,
        spectrum_register: int,
        value: int,
        length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO,
    ) -> None:
        self._parent_io_line.write_to_parent_device_register(spectrum_register, value, length)

    def _convert_clock_cycles_to_seconds(self, clock_cycles: int) -> float:
        return clock_cycles * self.clock_period_in_seconds

    def _convert_seconds_to_clock_cycles(self, seconds: float) -> float:
        # round to nearest milli-cycle to avoid floating point precision problems
        return round(seconds * self.clock_rate_in_hz * 1e3) / 1e3

    def _get_enabled_pulse_generator_ids(self) -> list[int]:
        return decode_enabled_pulse_gens(self.read_parent_device_register(SPC_XIO_PULSEGEN_ENABLE))

    @property
    def clock_rate_in_hz(self) -> int:
        """The current pulse generator clock rate. Affected by the sample rate of the parent card, and the number of
        channels enabled. Affects the precision with which pulse timings can be set, and their min and max values."""
        return self.read_parent_device_register(SPC_XIO_PULSEGEN_CLOCK)

    @property
    def clock_period_in_seconds(self) -> float:
        """The reciprocal of the clock rate, in seconds."""
        return 1 / self.clock_rate_in_hz

    @property
    def enabled(self) -> bool:
        """True if the pulse generator is currently enabled."""
        return PULSE_GEN_ENABLE_COMMANDS[self._number] in self._get_enabled_pulse_generator_ids()

    def enable(self) -> None:
        """Enable the pulse generator. Note that the mode of the parent IO Line must also be set to
        IOLineMode.SPCM_XMODE_PULSEGEN."""
        current_register_value = self.read_parent_device_register(SPC_XIO_PULSEGEN_ENABLE)
        new_register_value = toggle_bitmap_value(current_register_value, PULSE_GEN_ENABLE_COMMANDS[self._number], True)
        self.write_to_parent_device_register(SPC_XIO_PULSEGEN_ENABLE, new_register_value)

    def disable(self) -> None:
        """Disable the pulse generator."""
        current_register_value = self.read_parent_device_register(SPC_XIO_PULSEGEN_ENABLE)
        new_register_value = toggle_bitmap_value(current_register_value, PULSE_GEN_ENABLE_COMMANDS[self._number], False)
        self.write_to_parent_device_register(SPC_XIO_PULSEGEN_ENABLE, new_register_value)

    @property
    def output_inversion(self) -> bool:
        currently_enabled_config_options = decode_pulse_gen_config(
            self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number])
        )
        return SPCM_PULSEGEN_CONFIG_INVERT in currently_enabled_config_options

    def set_output_inversion(self, inverted: bool) -> None:
        current_register_value = self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number])
        new_register_value = toggle_bitmap_value(current_register_value, SPCM_PULSEGEN_CONFIG_INVERT, inverted)
        self.write_to_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number],
new_register_value) 186 187 @property 188 def trigger_detection_mode(self) -> PulseGeneratorTriggerDetectionMode: 189 """How the pulse generator trigger circuit responds to a trigger signal, .e.g rising edge...""" 190 currently_enabled_config_options = decode_pulse_gen_config( 191 self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number]) 192 ) 193 if PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH.value in currently_enabled_config_options: 194 return PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH 195 else: 196 return PulseGeneratorTriggerDetectionMode.RISING_EDGE 197 198 def set_trigger_detection_mode(self, mode: PulseGeneratorTriggerDetectionMode) -> None: 199 """e.g. rising edge, high-voltage...""" 200 current_register_value = self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number]) 201 high_voltage_mode_value = PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH.value 202 new_register_value = toggle_bitmap_value( 203 current_register_value, 204 high_voltage_mode_value, 205 mode == PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH, 206 ) 207 self.write_to_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number], new_register_value) 208 209 @property 210 def trigger_mode(self) -> PulseGeneratorTriggerMode: 211 """Gated, triggered or single-shot. See PulseGeneratorTriggerMode for more information.""" 212 return PulseGeneratorTriggerMode( 213 self.read_parent_device_register(PULSE_GEN_TRIGGER_MODE_COMMANDS[self._number]) 214 ) 215 216 def set_trigger_mode(self, mode: PulseGeneratorTriggerMode) -> None: 217 """Gated, triggered or single-shot. 
See PulseGeneratorTriggerMode for more information.""" 218 self.write_to_parent_device_register(PULSE_GEN_TRIGGER_MODE_COMMANDS[self._number], mode.value) 219 220 @property 221 def min_allowed_period_in_seconds(self) -> float: 222 """Minimum allowed pulse period in seconds, given the current clock rate.""" 223 reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLEN_MIN) 224 reg_val = 0 if reg_val < 0 else reg_val 225 return self._convert_clock_cycles_to_seconds(reg_val) 226 227 @property 228 def max_allowed_period_in_seconds(self) -> float: 229 """Maximum allowed pulse period in seconds, given the current clock rate.""" 230 reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLEN_MAX) 231 reg_val = iinfo(int16).max if reg_val < 0 else reg_val 232 return self._convert_clock_cycles_to_seconds(reg_val) 233 234 @property 235 def _allowed_period_step_size_in_clock_cycles(self) -> int: 236 return self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLEN_STEP) 237 238 @property 239 def allowed_period_step_size_in_seconds(self) -> float: 240 """Resolution with which the pulse period can be set, given the current clock rate.""" 241 return self._convert_clock_cycles_to_seconds(self._allowed_period_step_size_in_clock_cycles) 242 243 @property 244 def period_in_seconds(self) -> float: 245 """The pulse length in seconds, including both the high-voltage and low-voltage sections.""" 246 return self._convert_clock_cycles_to_seconds( 247 self.read_parent_device_register(PULSE_GEN_PULSE_PERIOD_COMMANDS[self._number]) 248 ) 249 250 def set_period_in_seconds(self, period: float, coerce: bool = False) -> float: 251 """Set the time between the start of each generated pulse in seconds. If coerce is True, the requested value 252 will be coerced according to min_allowed_period_in_seconds, max_allowed_period_in_seconds and 253 allowed_period_step_size_in_seconds and the coerced value is returned. 
Otherwise, when an invalid value is 254 requested a SpectrumInvalidParameterValue will be raised. The allowed values are affected by the number of 255 active channels and the sample rate.""" 256 period_in_clock_cycles = self._convert_seconds_to_clock_cycles(period) 257 coerced_period = _coerce_fractional_value_to_allowed_integer( 258 period_in_clock_cycles, 259 int(self._convert_seconds_to_clock_cycles(self.min_allowed_period_in_seconds)), 260 int(self._convert_seconds_to_clock_cycles(self.max_allowed_period_in_seconds)), 261 self._allowed_period_step_size_in_clock_cycles, 262 ) 263 if not coerce and coerced_period != period_in_clock_cycles: 264 raise SpectrumInvalidParameterValue( 265 "pulse generator period", 266 period, 267 self.min_allowed_period_in_seconds, 268 self.max_allowed_period_in_seconds, 269 self.allowed_period_step_size_in_seconds, 270 ) 271 272 self.write_to_parent_device_register(PULSE_GEN_PULSE_PERIOD_COMMANDS[self._number], int(coerced_period)) 273 return self._convert_clock_cycles_to_seconds(coerced_period) 274 275 @property 276 def min_allowed_high_voltage_duration_in_seconds(self) -> float: 277 """Minimum allowed duration of the high-voltage part of the pulse in seconds, given the current clock rate.""" 278 reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILHIGH_MIN) 279 reg_val = 0 if reg_val < 0 else reg_val 280 return self._convert_clock_cycles_to_seconds(reg_val) 281 282 @property 283 def max_allowed_high_voltage_duration_in_seconds(self) -> float: 284 """Maximum allowed duration of the high-voltage part of the pulse in seconds, given the current clock rate.""" 285 reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILHIGH_MAX) 286 reg_val = iinfo(int16).max if reg_val < 0 else reg_val 287 return self._convert_clock_cycles_to_seconds(reg_val) 288 289 @property 290 def _allowed_high_voltage_duration_step_size_in_clock_cycles(self) -> int: 291 return self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILHIGH_STEP) 
292 293 @property 294 def allowed_high_voltage_duration_step_size_in_seconds(self) -> float: 295 """Resolution with which the high-voltage duration can be set, in seconds, given the current clock rate.""" 296 return self._convert_clock_cycles_to_seconds(self._allowed_high_voltage_duration_step_size_in_clock_cycles) 297 298 @property 299 def duration_of_high_voltage_in_seconds(self) -> float: 300 """The length of the high-voltage part of a pulse, in seconds. Equal to the pulse duration * duty cycle.""" 301 return self._convert_clock_cycles_to_seconds( 302 self.read_parent_device_register(PULSE_GEN_HIGH_DURATION_COMMANDS[self._number]) 303 ) 304 305 @property 306 def duration_of_low_voltage_in_seconds(self) -> float: 307 """The length of the low-voltage part of a pulse, in seconds. Equal to the pulse duration * (1 - duty cycle).""" 308 return self.period_in_seconds - self.duration_of_high_voltage_in_seconds 309 310 @property 311 def duty_cycle(self) -> float: 312 """The ratio between the high-voltage and low-voltage parts of the pulse.""" 313 return self.duration_of_high_voltage_in_seconds / self.period_in_seconds 314 315 def set_duty_cycle(self, duty_cycle: float, coerce: bool = False) -> float: 316 """Set the duty cycle. If coerce is True, the requested value will be coerced to be within allowed range and 317 use allowed step size and then the coerced value wll be returned. Otherwise, when an invalid value is requested 318 an SpectrumInvalidParameterValue will be raised. The allowed values are affected by the number of active 319 channels and the sample rate. 
320 """ 321 requested_high_v_duration_in_clock_cycles = self._convert_seconds_to_clock_cycles( 322 self.period_in_seconds * duty_cycle 323 ) 324 clipped_duration = _coerce_fractional_value_to_allowed_integer( 325 requested_high_v_duration_in_clock_cycles, 326 int(self._convert_seconds_to_clock_cycles(self.min_allowed_high_voltage_duration_in_seconds)), 327 int(self._convert_seconds_to_clock_cycles(self.max_allowed_high_voltage_duration_in_seconds)), 328 self._allowed_high_voltage_duration_step_size_in_clock_cycles, 329 ) 330 if not coerce and clipped_duration != requested_high_v_duration_in_clock_cycles: 331 raise SpectrumInvalidParameterValue( 332 "high-voltage duration", 333 self.period_in_seconds * duty_cycle, 334 self.min_allowed_high_voltage_duration_in_seconds, 335 self.max_allowed_high_voltage_duration_in_seconds, 336 self.allowed_high_voltage_duration_step_size_in_seconds, 337 ) 338 self.write_to_parent_device_register(PULSE_GEN_HIGH_DURATION_COMMANDS[self._number], clipped_duration) 339 return self._convert_clock_cycles_to_seconds(clipped_duration) / self.period_in_seconds 340 341 @property 342 def min_allowed_pulses(self) -> int: 343 """Minimum allowed number of pulses to transmit.""" 344 return self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLOOPS_MIN) 345 346 @property 347 def max_allowed_pulses(self) -> int: 348 """Maximum allowed number of pulses to transmit.""" 349 reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLOOPS_MAX) 350 # my card has this register set to -2, which I assume means no limit (can't work it out from the docs) 351 return reg_val if reg_val > 0 else iinfo(int16).max 352 353 @property 354 def allowed_num_pulses_step_size(self) -> int: 355 """Resolution with which the number of pulses to transmit can be set.""" 356 return self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLOOPS_STEP) 357 358 @property 359 def num_pulses(self) -> int: 360 """The number of pulses to generate on receipt of a trigger. 
If 0, pulses will be generated continuously.""" 361 return self.read_parent_device_register(PULSE_GEN_NUM_REPEATS_COMMANDS[self._number]) 362 363 def set_num_pulses(self, num_pulses: int, coerce: bool = False) -> int: 364 """Set the number of pulses to generate on receipt of a trigger. If 0 or negative, pulses will be generated 365 continuously. If coerce if True, the requested number of pulses will be coerced according to min_allowed_pulses, 366 max_allowed_pulses and allowed_num_pulses_step_size and the coerced value is returned. Otherwise, a 367 SpectrumInvalidParameterValue exception is raised if an invalid number of pulses is requested.""" 368 369 num_pulses = max(0, num_pulses) # make negative value 0 to enable continuous pulse generation 370 371 coerced_num_pulses = _coerce_fractional_value_to_allowed_integer( 372 float(num_pulses), self.min_allowed_pulses, self.max_allowed_pulses, self.allowed_num_pulses_step_size 373 ) 374 375 if not coerce and coerced_num_pulses != num_pulses: 376 raise SpectrumInvalidParameterValue( 377 "number of pulses", 378 num_pulses, 379 self.min_allowed_pulses, 380 self.max_allowed_pulses, 381 self.allowed_num_pulses_step_size, 382 ) 383 384 self.write_to_parent_device_register(PULSE_GEN_NUM_REPEATS_COMMANDS[self._number], coerced_num_pulses) 385 return coerced_num_pulses 386 387 @property 388 def min_allowed_delay_in_seconds(self) -> float: 389 """Minimum allowed delay between the trigger event and pulse generation, in seconds, given the current clock 390 rate.""" 391 reg_value = self.read_parent_device_register(602007) # SPC_XIO_PULSEGEN_AVAILDELAY_MIN not in regs.py 392 reg_value = 0 if reg_value == -1 else reg_value 393 return self._convert_clock_cycles_to_seconds(reg_value) 394 395 @property 396 def max_allowed_delay_in_seconds(self) -> float: 397 """Maximum allowed delay between the trigger event and pulse generation, in seconds, given the current clock 398 rate.""" 399 reg_value = self.read_parent_device_register(602008) # 
SPC_XIO_PULSEGEN_AVAILDELAY_MAX not in regs.py 400 reg_value = iinfo(int16).max if reg_value == -1 else reg_value 401 return self._convert_clock_cycles_to_seconds(reg_value) 402 403 @property 404 def allowed_delay_step_size_in_seconds(self) -> float: 405 """resolution with which the delay between the trigger event and pulse generation can be set, in seconds, given 406 the current clock rate.""" 407 return self._convert_clock_cycles_to_seconds( 408 self.read_parent_device_register(602009) # SPC_XIO_PULSEGEN_AVAILDELAY_STEP not in regs.py 409 ) 410 411 @property 412 def delay_in_seconds(self) -> float: 413 """The delay between the trigger and the first pulse transmission""" 414 return self._convert_clock_cycles_to_seconds( 415 self.read_parent_device_register(PULSE_GEN_DELAY_COMMANDS[self._number]) 416 ) 417 418 def set_delay_in_seconds(self, delay_in_seconds: float, coerce: bool = False) -> float: 419 """Set the delay between the trigger and the first pulse transmission. If coerce=True, the requested value is 420 coerced according to min_allowed_delay_in_seconds, max_allowed_delay_in_seconds and 421 allowed_delay_step_size_in_seconds, and then the coerced value is returned. 
Otherwise, an ValueError is raised 422 if the requested value is invalid.""" 423 424 requested_delay_in_clock_cycles = self._convert_seconds_to_clock_cycles(delay_in_seconds) 425 clipped_delay_in_clock_cycles = _coerce_fractional_value_to_allowed_integer( 426 requested_delay_in_clock_cycles, 427 int(self._convert_seconds_to_clock_cycles(self.min_allowed_delay_in_seconds)), 428 int(self._convert_seconds_to_clock_cycles(self.max_allowed_delay_in_seconds)), 429 int(self._convert_seconds_to_clock_cycles(self.allowed_delay_step_size_in_seconds)), 430 ) 431 432 if not coerce and clipped_delay_in_clock_cycles != requested_delay_in_clock_cycles: 433 raise SpectrumInvalidParameterValue( 434 "delay in seconds", 435 requested_delay_in_clock_cycles, 436 self.min_allowed_delay_in_seconds, 437 self.max_allowed_delay_in_seconds, 438 self.allowed_delay_step_size_in_seconds, 439 ) 440 441 self.write_to_parent_device_register(PULSE_GEN_DELAY_COMMANDS[self._number], clipped_delay_in_clock_cycles) 442 return self._convert_clock_cycles_to_seconds(clipped_delay_in_clock_cycles) 443 444 def __str__(self) -> str: 445 return f"Pulse generator {self._number} of {self._parent_io_line}."
Class for controlling pulse generators associated with IO lines (requires the firmware option to be enabled).
    def __init__(self, parent: SpectrumIOLineInterface):
        self._parent_io_line = parent
        # last char of IO line name is IO line channel number, which is used to set pulse generator number
        self._number = int(parent.name.name[-1])
        available_advanced_features = decode_advanced_card_features(
            self.read_parent_device_register(SPC_PCIEXTFEATURES)
        )
        if AdvancedCardFeature.SPCM_FEAT_EXTFW_PULSEGEN not in available_advanced_features:
            raise SpectrumFeatureNotSupportedByCard(
                call_description=self.__str__() + ".__init__()",
                message="Pulse generator firmware option not installed on device.",
            )
        self._multiplexer_1 = PulseGeneratorMultiplexer1(parent=self)
        self._multiplexer_2 = PulseGeneratorMultiplexer2(parent=self)
    def configure_output(
        self, settings: PulseGeneratorOutputSettings, coerce: bool = True
    ) -> PulseGeneratorOutputSettings:
        """Configure all pulse generator output settings at once. By default, all values are coerced to the
        nearest values allowed by the hardware, and the coerced values are returned."""
        self.set_output_inversion(settings.output_inversion)
        coerced_settings = PulseGeneratorOutputSettings(
            period_in_seconds=self.set_period_in_seconds(settings.period_in_seconds, coerce=coerce),
            duty_cycle=self.set_duty_cycle(settings.duty_cycle, coerce=coerce),
            num_pulses=self.set_num_pulses(settings.num_pulses, coerce=coerce),
            delay_in_seconds=self.set_delay_in_seconds(settings.delay_in_seconds, coerce=coerce),
            output_inversion=settings.output_inversion,
        )
        self.write_to_parent_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP)
        return coerced_settings
Configure all pulse generator output settings at once. By default, all values are coerced to the nearest values allowed by the hardware, and the coerced values are returned.
    def configure_trigger(self, settings: PulseGeneratorTriggerSettings) -> None:
        """Configure all pulse generator trigger settings at once."""
        self.set_trigger_mode(settings.trigger_mode)
        self.set_trigger_detection_mode(settings.trigger_detection_mode)
        self.multiplexer_1.set_trigger_source(settings.multiplexer_1_source)
        self.multiplexer_2.set_trigger_source(settings.multiplexer_2_source)
        self.multiplexer_1.set_output_inversion(settings.multiplexer_1_output_inversion)
        self.multiplexer_2.set_output_inversion(settings.multiplexer_2_output_inversion)
        self.write_to_parent_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP)
Configure all pulse generator trigger settings at once.
    def force_trigger(self) -> None:
        """Generates a pulse when the pulse generator trigger source (mux 2) is set to 'software'."""
        if (
            self._multiplexer_2.trigger_source
            != PulseGeneratorMultiplexer2TriggerSource.SPCM_PULSEGEN_MUX2_SRC_SOFTWARE
        ):
            raise SpectrumIOError("Force trigger can only be used if trigger source (mux 2) is set to 'software'")
        self.write_to_parent_device_register(SPC_XIO_PULSEGEN_COMMAND, SPCM_PULSEGEN_CMD_FORCE)
Generates a pulse when the pulse generator trigger source (mux 2) is set to 'software'.
    @property
    def number(self) -> int:
        """The index of the pulse generator. Corresponds to the index of the IO line to which it belongs."""
        return self._number
The index of the pulse generator. Corresponds to the index of the IO line to which it belongs.
    @property
    def multiplexer_1(self) -> PulseGeneratorMultiplexer1:
        """Change the trigger source of this multiplexer to control when it is possible to trigger the pulse generator."""
        return self._multiplexer_1
Change the trigger source of this multiplexer to control when it is possible to trigger the pulse generator.
    @property
    def multiplexer_2(self) -> PulseGeneratorMultiplexer2:
        """Change the trigger source of this multiplexer to control how the pulse generator is triggered."""
        return self._multiplexer_2
Change the trigger source of this multiplexer to control how the pulse generator is triggered.
    @property
    def clock_rate_in_hz(self) -> int:
        """The current pulse generator clock rate. Affected by the sample rate of the parent card, and the number of
        channels enabled. Affects the precision with which pulse timings can be set, and their min and max values."""
        return self.read_parent_device_register(SPC_XIO_PULSEGEN_CLOCK)
The current pulse generator clock rate. Affected by the sample rate of the parent card, and the number of channels enabled. Affects the precision with which pulse timings can be set, and their min and max values.
    @property
    def clock_period_in_seconds(self) -> float:
        """The reciprocal of the clock rate, in seconds."""
        return 1 / self.clock_rate_in_hz
The reciprocal of the clock rate, in seconds.
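The clock rate and clock period are what the class uses to translate between seconds and clock cycles. A minimal standalone sketch of that conversion is given here, mirroring the private `_convert_clock_cycles_to_seconds` and `_convert_seconds_to_clock_cycles` helpers in the class source. The free-function names and the 100 MHz clock rate are assumptions made for the example; no hardware register is read.

```python
def clock_cycles_to_seconds(clock_cycles: float, clock_rate_in_hz: int) -> float:
    """Convert a number of clock cycles to seconds for the given clock rate."""
    clock_period_in_seconds = 1 / clock_rate_in_hz
    return clock_cycles * clock_period_in_seconds


def seconds_to_clock_cycles(seconds: float, clock_rate_in_hz: int) -> float:
    """Convert seconds to clock cycles, rounded to the nearest milli-cycle.

    Rounding to three decimal places hides floating point noise while still
    leaving genuinely fractional requests (e.g. 1.5 cycles) visible.
    """
    return round(seconds * clock_rate_in_hz * 1e3) / 1e3


if __name__ == "__main__":
    clock_rate = 100_000_000  # hypothetical 100 MHz pulse generator clock
    print(seconds_to_clock_cycles(1e-6, clock_rate))
    print(clock_cycles_to_seconds(100, clock_rate))
```

Because the result of the seconds-to-cycles conversion can still be fractional, the setters compare it against the coerced integer value to decide whether a request was valid.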
    @property
    def enabled(self) -> bool:
        """True if the pulse generator is currently enabled."""
        return PULSE_GEN_ENABLE_COMMANDS[self._number] in self._get_enabled_pulse_generator_ids()
True if the pulse generator is currently enabled.
    def enable(self) -> None:
        """Enable the pulse generator. Note that the mode of the parent IO Line must also be set to
        IOLineMode.SPCM_XMODE_PULSEGEN."""
        current_register_value = self.read_parent_device_register(SPC_XIO_PULSEGEN_ENABLE)
        new_register_value = toggle_bitmap_value(current_register_value, PULSE_GEN_ENABLE_COMMANDS[self._number], True)
        self.write_to_parent_device_register(SPC_XIO_PULSEGEN_ENABLE, new_register_value)
Enable the pulse generator. Note that the mode of the parent IO Line must also be set to IOLineMode.SPCM_XMODE_PULSEGEN.
    def disable(self) -> None:
        """Disable the pulse generator."""
        current_register_value = self.read_parent_device_register(SPC_XIO_PULSEGEN_ENABLE)
        new_register_value = toggle_bitmap_value(current_register_value, PULSE_GEN_ENABLE_COMMANDS[self._number], False)
        self.write_to_parent_device_register(SPC_XIO_PULSEGEN_ENABLE, new_register_value)
Disable the pulse generator.
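Both `enable` and `disable` follow a read-modify-write pattern: read the shared enable register, set or clear this generator's flag, and write the result back so that the other generators are left untouched. The sketch below illustrates that pattern on a plain integer. The body of `toggle_bitmap_value` is an assumption (the library's own helper is not shown in this section), and the flag values are hypothetical.

```python
def toggle_bitmap_value(bitmap: int, flag: int, enabled: bool) -> int:
    """Return bitmap with the bits of flag set (enabled=True) or cleared (enabled=False).

    Assumed behaviour, for illustration only.
    """
    if enabled:
        return bitmap | flag
    return bitmap & ~flag


def is_flag_set(bitmap: int, flag: int) -> bool:
    """True if every bit of flag is set in bitmap."""
    return bitmap & flag == flag


# Hypothetical enable flags for four pulse generators, one bit each.
HYPOTHETICAL_ENABLE_FLAGS = (0x1, 0x2, 0x4, 0x8)

if __name__ == "__main__":
    register = 0
    register = toggle_bitmap_value(register, HYPOTHETICAL_ENABLE_FLAGS[0], True)  # enable generator 0
    register = toggle_bitmap_value(register, HYPOTHETICAL_ENABLE_FLAGS[2], True)  # enable generator 2
    register = toggle_bitmap_value(register, HYPOTHETICAL_ENABLE_FLAGS[2], False)  # disable generator 2
    print(bin(register), is_flag_set(register, HYPOTHETICAL_ENABLE_FLAGS[0]))
```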
    def set_output_inversion(self, inverted: bool) -> None:
        current_register_value = self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number])
        new_register_value = toggle_bitmap_value(current_register_value, SPCM_PULSEGEN_CONFIG_INVERT, inverted)
        self.write_to_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number], new_register_value)
    @property
    def trigger_detection_mode(self) -> PulseGeneratorTriggerDetectionMode:
        """How the pulse generator trigger circuit responds to a trigger signal, e.g. rising edge."""
        currently_enabled_config_options = decode_pulse_gen_config(
            self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number])
        )
        if PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH.value in currently_enabled_config_options:
            return PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH
        else:
            return PulseGeneratorTriggerDetectionMode.RISING_EDGE
How the pulse generator trigger circuit responds to a trigger signal, e.g. rising edge.
    def set_trigger_detection_mode(self, mode: PulseGeneratorTriggerDetectionMode) -> None:
        """Set how the pulse generator trigger circuit responds to a trigger signal, e.g. rising edge or high voltage."""
        current_register_value = self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number])
        high_voltage_mode_value = PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH.value
        new_register_value = toggle_bitmap_value(
            current_register_value,
            high_voltage_mode_value,
            mode == PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH,
        )
        self.write_to_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number], new_register_value)
Set how the pulse generator trigger circuit responds to a trigger signal, e.g. rising edge or high voltage.
    @property
    def trigger_mode(self) -> PulseGeneratorTriggerMode:
        """Gated, triggered or single-shot. See PulseGeneratorTriggerMode for more information."""
        return PulseGeneratorTriggerMode(
            self.read_parent_device_register(PULSE_GEN_TRIGGER_MODE_COMMANDS[self._number])
        )
Gated, triggered or single-shot. See PulseGeneratorTriggerMode for more information.
    def set_trigger_mode(self, mode: PulseGeneratorTriggerMode) -> None:
        """Gated, triggered or single-shot. See PulseGeneratorTriggerMode for more information."""
        self.write_to_parent_device_register(PULSE_GEN_TRIGGER_MODE_COMMANDS[self._number], mode.value)
Gated, triggered or single-shot. See PulseGeneratorTriggerMode for more information.
    @property
    def min_allowed_period_in_seconds(self) -> float:
        """Minimum allowed pulse period in seconds, given the current clock rate."""
        reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLEN_MIN)
        reg_val = 0 if reg_val < 0 else reg_val
        return self._convert_clock_cycles_to_seconds(reg_val)
Minimum allowed pulse period in seconds, given the current clock rate.
    @property
    def max_allowed_period_in_seconds(self) -> float:
        """Maximum allowed pulse period in seconds, given the current clock rate."""
        reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLEN_MAX)
        reg_val = iinfo(int16).max if reg_val < 0 else reg_val
        return self._convert_clock_cycles_to_seconds(reg_val)
Maximum allowed pulse period in seconds, given the current clock rate.
    @property
    def allowed_period_step_size_in_seconds(self) -> float:
        """Resolution with which the pulse period can be set, given the current clock rate."""
        return self._convert_clock_cycles_to_seconds(self._allowed_period_step_size_in_clock_cycles)
Resolution with which the pulse period can be set, given the current clock rate.
    @property
    def period_in_seconds(self) -> float:
        """The pulse length in seconds, including both the high-voltage and low-voltage sections."""
        return self._convert_clock_cycles_to_seconds(
            self.read_parent_device_register(PULSE_GEN_PULSE_PERIOD_COMMANDS[self._number])
        )
The pulse length in seconds, including both the high-voltage and low-voltage sections.
    def set_period_in_seconds(self, period: float, coerce: bool = False) -> float:
        """Set the time between the start of each generated pulse in seconds. If coerce is True, the requested value
        will be coerced according to min_allowed_period_in_seconds, max_allowed_period_in_seconds and
        allowed_period_step_size_in_seconds and the coerced value is returned. Otherwise, when an invalid value is
        requested a SpectrumInvalidParameterValue will be raised. The allowed values are affected by the number of
        active channels and the sample rate."""
        period_in_clock_cycles = self._convert_seconds_to_clock_cycles(period)
        coerced_period = _coerce_fractional_value_to_allowed_integer(
            period_in_clock_cycles,
            int(self._convert_seconds_to_clock_cycles(self.min_allowed_period_in_seconds)),
            int(self._convert_seconds_to_clock_cycles(self.max_allowed_period_in_seconds)),
            self._allowed_period_step_size_in_clock_cycles,
        )
        if not coerce and coerced_period != period_in_clock_cycles:
            raise SpectrumInvalidParameterValue(
                "pulse generator period",
                period,
                self.min_allowed_period_in_seconds,
                self.max_allowed_period_in_seconds,
                self.allowed_period_step_size_in_seconds,
            )

        self.write_to_parent_device_register(PULSE_GEN_PULSE_PERIOD_COMMANDS[self._number], int(coerced_period))
        return self._convert_clock_cycles_to_seconds(coerced_period)
Set the time between the start of each generated pulse in seconds. If coerce is True, the requested value will be coerced according to min_allowed_period_in_seconds, max_allowed_period_in_seconds and allowed_period_step_size_in_seconds and the coerced value is returned. Otherwise, when an invalid value is requested a SpectrumInvalidParameterValue will be raised. The allowed values are affected by the number of active channels and the sample rate.
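The coerce-or-raise behaviour described above can be sketched without any hardware. In the example below, `coerce_to_allowed_integer` is a hypothetical stand-in for the library's private `_coerce_fractional_value_to_allowed_integer`, whose implementation is not shown in this section; it assumes that allowed values are multiples of the step size lying between the minimum and maximum. A plain `ValueError` stands in for `SpectrumInvalidParameterValue`, and all values are in clock cycles.

```python
def coerce_to_allowed_integer(value: float, minimum: int, maximum: int, step: int) -> int:
    """Clip value to [minimum, maximum], then round to the nearest multiple of step.

    Hypothetical stand-in for the library's private coercion helper.
    """
    clipped = min(max(value, minimum), maximum)
    stepped = int(round(clipped / step)) * step
    # Rounding onto the step grid can overshoot a limit, so move back inside the range.
    if stepped < minimum:
        stepped += step
    if stepped > maximum:
        stepped -= step
    return stepped


def set_period_in_clock_cycles(requested: float, minimum: int, maximum: int, step: int, coerce: bool = False) -> int:
    """Return the period that would be written, or raise if it is invalid and coerce is False."""
    coerced = coerce_to_allowed_integer(requested, minimum, maximum, step)
    if not coerce and coerced != requested:
        # The library raises SpectrumInvalidParameterValue here; ValueError is used for the sketch.
        raise ValueError(f"{requested} is not an allowed period (min {minimum}, max {maximum}, step {step})")
    return coerced


if __name__ == "__main__":
    print(set_period_in_clock_cycles(10.4, minimum=2, maximum=100, step=2, coerce=True))
    try:
        set_period_in_clock_cycles(10.4, minimum=2, maximum=100, step=2)
    except ValueError as error:
        print(error)
```

Returning the coerced value lets the caller see what was actually applied, which is why `configure_output` collects the return values of the individual setters.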
    @property
    def min_allowed_high_voltage_duration_in_seconds(self) -> float:
        """Minimum allowed duration of the high-voltage part of the pulse in seconds, given the current clock rate."""
        reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILHIGH_MIN)
        reg_val = 0 if reg_val < 0 else reg_val
        return self._convert_clock_cycles_to_seconds(reg_val)
Minimum allowed duration of the high-voltage part of the pulse in seconds, given the current clock rate.
    @property
    def max_allowed_high_voltage_duration_in_seconds(self) -> float:
        """Maximum allowed duration of the high-voltage part of the pulse in seconds, given the current clock rate."""
        reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILHIGH_MAX)
        reg_val = iinfo(int16).max if reg_val < 0 else reg_val
        return self._convert_clock_cycles_to_seconds(reg_val)
Maximum allowed duration of the high-voltage part of the pulse in seconds, given the current clock rate.
    @property
    def allowed_high_voltage_duration_step_size_in_seconds(self) -> float:
        """Resolution with which the high-voltage duration can be set, in seconds, given the current clock rate."""
        return self._convert_clock_cycles_to_seconds(self._allowed_high_voltage_duration_step_size_in_clock_cycles)
Resolution with which the high-voltage duration can be set, in seconds, given the current clock rate.
    @property
    def duration_of_high_voltage_in_seconds(self) -> float:
        """The length of the high-voltage part of a pulse, in seconds. Equal to the pulse period * duty cycle."""
        return self._convert_clock_cycles_to_seconds(
            self.read_parent_device_register(PULSE_GEN_HIGH_DURATION_COMMANDS[self._number])
        )
The length of the high-voltage part of a pulse, in seconds. Equal to the pulse period * duty cycle.
    @property
    def duration_of_low_voltage_in_seconds(self) -> float:
        """The length of the low-voltage part of a pulse, in seconds. Equal to the pulse period * (1 - duty cycle)."""
        return self.period_in_seconds - self.duration_of_high_voltage_in_seconds
The length of the low-voltage part of a pulse, in seconds. Equal to the pulse period * (1 - duty cycle).
    @property
    def duty_cycle(self) -> float:
        """The ratio of the high-voltage duration to the pulse period."""
        return self.duration_of_high_voltage_in_seconds / self.period_in_seconds
The ratio of the high-voltage duration to the pulse period.
    def set_duty_cycle(self, duty_cycle: float, coerce: bool = False) -> float:
        """Set the duty cycle. If coerce is True, the requested value will be coerced to be within the allowed range
        and to use the allowed step size, and the coerced value will be returned. Otherwise, when an invalid value is
        requested, a SpectrumInvalidParameterValue will be raised. The allowed values are affected by the number of
        active channels and the sample rate.
        """
        requested_high_v_duration_in_clock_cycles = self._convert_seconds_to_clock_cycles(
            self.period_in_seconds * duty_cycle
        )
        clipped_duration = _coerce_fractional_value_to_allowed_integer(
            requested_high_v_duration_in_clock_cycles,
            int(self._convert_seconds_to_clock_cycles(self.min_allowed_high_voltage_duration_in_seconds)),
            int(self._convert_seconds_to_clock_cycles(self.max_allowed_high_voltage_duration_in_seconds)),
            self._allowed_high_voltage_duration_step_size_in_clock_cycles,
        )
        if not coerce and clipped_duration != requested_high_v_duration_in_clock_cycles:
            raise SpectrumInvalidParameterValue(
                "high-voltage duration",
                self.period_in_seconds * duty_cycle,
                self.min_allowed_high_voltage_duration_in_seconds,
                self.max_allowed_high_voltage_duration_in_seconds,
                self.allowed_high_voltage_duration_step_size_in_seconds,
            )
        self.write_to_parent_device_register(PULSE_GEN_HIGH_DURATION_COMMANDS[self._number], clipped_duration)
        return self._convert_clock_cycles_to_seconds(clipped_duration) / self.period_in_seconds
Set the duty cycle. If coerce is True, the requested value will be coerced to be within the allowed range and to use the allowed step size, and the coerced value will be returned. Otherwise, when an invalid value is requested, a SpectrumInvalidParameterValue will be raised. The allowed values are affected by the number of active channels and the sample rate.
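The body of `_coerce_fractional_value_to_allowed_integer` is not shown on this page, so the following is only a minimal sketch of what "coerce to the allowed range and step size" might look like. The names `coerce_to_allowed_integer` and `choose_value` are hypothetical, the sketch assumes allowed values are multiples of the step size, and `ValueError` stands in for `SpectrumInvalidParameterValue`.

```python
import math


def coerce_to_allowed_integer(value: float, minimum: int, maximum: int, step: int) -> int:
    """Hypothetical stand-in: snap value to a multiple of step, then clamp it into [minimum, maximum]."""
    lowest = math.ceil(minimum / step) * step  # smallest allowed multiple of step
    highest = math.floor(maximum / step) * step  # largest allowed multiple of step
    snapped = round(value / step) * step
    return int(min(max(snapped, lowest), highest))


def choose_value(requested: float, minimum: int, maximum: int, step: int, coerce: bool = False) -> int:
    """Mirror the coerce flag: return the coerced value, or raise if coercion would change the request."""
    coerced = coerce_to_allowed_integer(requested, minimum, maximum, step)
    if not coerce and coerced != requested:
        # ValueError stands in for SpectrumInvalidParameterValue in this sketch.
        raise ValueError(f"{requested} is not an allowed value")
    return coerced


print(choose_value(13.2, minimum=2, maximum=100, step=4, coerce=True))  # snapped to a multiple of 4
print(choose_value(1000, minimum=2, maximum=100, step=4, coerce=True))  # clamped to the maximum
```

With `coerce=False`, any request that does not already sit exactly on an allowed value raises, which matches the `clipped_duration != requested_high_v_duration_in_clock_cycles` comparison in the listing above.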
    @property
    def min_allowed_pulses(self) -> int:
        """Minimum allowed number of pulses to transmit."""
        return self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLOOPS_MIN)
Minimum allowed number of pulses to transmit.
    @property
    def max_allowed_pulses(self) -> int:
        """Maximum allowed number of pulses to transmit."""
        reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLOOPS_MAX)
        # my card has this register set to -2, which I assume means no limit (can't work it out from the docs)
        return reg_val if reg_val > 0 else iinfo(int16).max
Maximum allowed number of pulses to transmit.
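The fallback in `max_allowed_pulses` can be illustrated without hardware. This sketch assumes `iinfo` and `int16` are the NumPy names, in which case `iinfo(int16).max` equals 2**15 - 1; the helper name `interpret_max_pulses_register` is hypothetical.

```python
INT16_MAX = 2**15 - 1  # 32767, the same value as numpy's iinfo(int16).max


def interpret_max_pulses_register(reg_val: int) -> int:
    """Hypothetical helper: treat any non-positive register value as 'no limit reported'."""
    return reg_val if reg_val > 0 else INT16_MAX


print(interpret_max_pulses_register(-2))    # 32767
print(interpret_max_pulses_register(1000))  # 1000
```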
    @property
    def allowed_num_pulses_step_size(self) -> int:
        """Resolution with which the number of pulses to transmit can be set."""
        return self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLOOPS_STEP)
Resolution with which the number of pulses to transmit can be set.
    @property
    def num_pulses(self) -> int:
        """The number of pulses to generate on receipt of a trigger. If 0, pulses will be generated continuously."""
        return self.read_parent_device_register(PULSE_GEN_NUM_REPEATS_COMMANDS[self._number])
The number of pulses to generate on receipt of a trigger. If 0, pulses will be generated continuously.
    def set_num_pulses(self, num_pulses: int, coerce: bool = False) -> int:
        """Set the number of pulses to generate on receipt of a trigger. If 0 or negative, pulses will be generated
        continuously. If coerce is True, the requested number of pulses will be coerced according to
        min_allowed_pulses, max_allowed_pulses and allowed_num_pulses_step_size, and the coerced value is returned.
        Otherwise, a SpectrumInvalidParameterValue exception is raised if an invalid number of pulses is requested."""

        num_pulses = max(0, num_pulses)  # make negative value 0 to enable continuous pulse generation

        coerced_num_pulses = _coerce_fractional_value_to_allowed_integer(
            float(num_pulses), self.min_allowed_pulses, self.max_allowed_pulses, self.allowed_num_pulses_step_size
        )

        if not coerce and coerced_num_pulses != num_pulses:
            raise SpectrumInvalidParameterValue(
                "number of pulses",
                num_pulses,
                self.min_allowed_pulses,
                self.max_allowed_pulses,
                self.allowed_num_pulses_step_size,
            )

        self.write_to_parent_device_register(PULSE_GEN_NUM_REPEATS_COMMANDS[self._number], coerced_num_pulses)
        return coerced_num_pulses
Set the number of pulses to generate on receipt of a trigger. If 0 or negative, pulses will be generated continuously. If coerce is True, the requested number of pulses will be coerced according to min_allowed_pulses, max_allowed_pulses and allowed_num_pulses_step_size, and the coerced value is returned. Otherwise, a SpectrumInvalidParameterValue exception is raised if an invalid number of pulses is requested.
    @property
    def min_allowed_delay_in_seconds(self) -> float:
        """Minimum allowed delay between the trigger event and pulse generation, in seconds, given the current clock
        rate."""
        reg_value = self.read_parent_device_register(602007)  # SPC_XIO_PULSEGEN_AVAILDELAY_MIN not in regs.py
        reg_value = 0 if reg_value == -1 else reg_value
        return self._convert_clock_cycles_to_seconds(reg_value)
Minimum allowed delay between the trigger event and pulse generation, in seconds, given the current clock rate.
    @property
    def max_allowed_delay_in_seconds(self) -> float:
        """Maximum allowed delay between the trigger event and pulse generation, in seconds, given the current clock
        rate."""
        reg_value = self.read_parent_device_register(602008)  # SPC_XIO_PULSEGEN_AVAILDELAY_MAX not in regs.py
        reg_value = iinfo(int16).max if reg_value == -1 else reg_value
        return self._convert_clock_cycles_to_seconds(reg_value)
Maximum allowed delay between the trigger event and pulse generation, in seconds, given the current clock rate.
    @property
    def allowed_delay_step_size_in_seconds(self) -> float:
        """Resolution with which the delay between the trigger event and pulse generation can be set, in seconds,
        given the current clock rate."""
        return self._convert_clock_cycles_to_seconds(
            self.read_parent_device_register(602009)  # SPC_XIO_PULSEGEN_AVAILDELAY_STEP not in regs.py
        )
Resolution with which the delay between the trigger event and pulse generation can be set, in seconds, given the current clock rate.
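The delay limits and step size are read from the device in clock cycles and reported in seconds, which is why they depend on the current clock rate. The implementations of `_convert_clock_cycles_to_seconds` and `_convert_seconds_to_clock_cycles` are not shown here, so the sketch below simply assumes the usual relation seconds = cycles / clock rate; the function names and clock rates are illustrative, not part of spectrumdevice.

```python
def clock_cycles_to_seconds(clock_cycles: float, clock_rate_in_hz: float) -> float:
    """Assumed conversion: one clock cycle lasts 1 / clock_rate_in_hz seconds."""
    return clock_cycles / clock_rate_in_hz


def seconds_to_clock_cycles(seconds: float, clock_rate_in_hz: float) -> float:
    """Inverse of the assumed conversion; the result may be fractional."""
    return seconds * clock_rate_in_hz


# A step size of one clock cycle corresponds to a finer time resolution at a higher clock rate.
for clock_rate_in_hz in (125e6, 250e6):
    step_in_seconds = clock_cycles_to_seconds(1, clock_rate_in_hz)
    print(f"{clock_rate_in_hz / 1e6:.0f} MHz clock: step of {step_in_seconds * 1e9:.1f} ns")
```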
    @property
    def delay_in_seconds(self) -> float:
        """The delay between the trigger and the first pulse transmission, in seconds."""
        return self._convert_clock_cycles_to_seconds(
            self.read_parent_device_register(PULSE_GEN_DELAY_COMMANDS[self._number])
        )
The delay between the trigger and the first pulse transmission, in seconds.
    def set_delay_in_seconds(self, delay_in_seconds: float, coerce: bool = False) -> float:
        """Set the delay between the trigger and the first pulse transmission. If coerce=True, the requested value is
        coerced according to min_allowed_delay_in_seconds, max_allowed_delay_in_seconds and
        allowed_delay_step_size_in_seconds, and then the coerced value is returned. Otherwise, a
        SpectrumInvalidParameterValue is raised if the requested value is invalid."""

        requested_delay_in_clock_cycles = self._convert_seconds_to_clock_cycles(delay_in_seconds)
        clipped_delay_in_clock_cycles = _coerce_fractional_value_to_allowed_integer(
            requested_delay_in_clock_cycles,
            int(self._convert_seconds_to_clock_cycles(self.min_allowed_delay_in_seconds)),
            int(self._convert_seconds_to_clock_cycles(self.max_allowed_delay_in_seconds)),
            int(self._convert_seconds_to_clock_cycles(self.allowed_delay_step_size_in_seconds)),
        )

        if not coerce and clipped_delay_in_clock_cycles != requested_delay_in_clock_cycles:
            raise SpectrumInvalidParameterValue(
                "delay in seconds",
                delay_in_seconds,
                self.min_allowed_delay_in_seconds,
                self.max_allowed_delay_in_seconds,
                self.allowed_delay_step_size_in_seconds,
            )

        self.write_to_parent_device_register(PULSE_GEN_DELAY_COMMANDS[self._number], clipped_delay_in_clock_cycles)
        return self._convert_clock_cycles_to_seconds(clipped_delay_in_clock_cycles)
Set the delay between the trigger and the first pulse transmission. If coerce=True, the requested value is coerced according to min_allowed_delay_in_seconds, max_allowed_delay_in_seconds and allowed_delay_step_size_in_seconds, and then the coerced value is returned. Otherwise, a SpectrumInvalidParameterValue is raised if the requested value is invalid.
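Because `set_delay_in_seconds` returns the value actually applied, a caller using `coerce=True` can compare it with the request. The sketch below shows that calling pattern against a stand-in object: `FakePulseGenerator`, its 8 ns step and `set_delay_and_report` are hypothetical and exist only so the example runs without hardware; only the method name and signature come from the listing above.

```python
from typing import Tuple


class FakePulseGenerator:
    """Hypothetical stand-in with an 8 ns delay resolution; not part of spectrumdevice."""

    step_in_seconds = 8e-9

    def set_delay_in_seconds(self, delay_in_seconds: float, coerce: bool = False) -> float:
        coerced = round(delay_in_seconds / self.step_in_seconds) * self.step_in_seconds
        if not coerce and abs(coerced - delay_in_seconds) > 1e-15:
            # ValueError stands in for SpectrumInvalidParameterValue in this sketch.
            raise ValueError(f"{delay_in_seconds} s is not an allowed delay")
        return coerced


def set_delay_and_report(generator, requested_delay_in_seconds: float) -> Tuple[float, float]:
    """Apply a delay with coercion and return (applied delay, applied minus requested)."""
    applied = generator.set_delay_in_seconds(requested_delay_in_seconds, coerce=True)
    return applied, applied - requested_delay_in_seconds


applied, error = set_delay_and_report(FakePulseGenerator(), 21e-9)
print(f"applied {applied * 1e9:.1f} ns (off by {error * 1e9:+.1f} ns)")
```

Passing `coerce=False` instead is the stricter choice when the exact delay matters, since an unreachable value then raises rather than being silently adjusted.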