spectrumdevice

A high-level, object-oriented Python library for controlling Spectrum Instrumentation devices.

spectrumdevice can connect to individual cards or StarHubs (e.g. the NetBox). spectrumdevice provides the following classes for controlling devices:

Hardware Classes

Name Purpose
SpectrumDigitiserCard Controlling individual digitiser cards
SpectrumDigitiserStarHub Controlling digitiser cards aggregated with a StarHub
SpectrumDigitiserAnalogChannel Controlling analog channels of a digitiser
SpectrumDigitiserIOLine Controlling multipurpose IO lines of a digitiser
SpectrumAWGCard Controlling individual AWG cards
SpectrumAWGStarHub (not yet implemented)
SpectrumAWGAnalogChannel Controlling analog channels of an AWG
SpectrumAWGIOLine Controlling multipurpose IO lines of an AWG
PulseGenerator Controlling pulse generators belonging to IO lines

Mock Classes

spectrumdevice also includes mock classes for testing software without drivers installed or hardware connected:

Name Purpose
MockSpectrumDigitiserCard Mocking individual digitiser cards
MockSpectrumDigitiserStarHub Mocking digitiser cards aggregated with a StarHub
MockSpectrumAWGCard Mocking individual AWG cards
MockSpectrumAWGStarHub Mocking AWG cards aggregated with a StarHub

Abstract Classes

The following abstract classes provide common implementations for methods whose functionality is identical across different Spectrum devices. They cannot be instantiated themselves, but are included here as they contain documentation for methods inherited by the concrete (and therefore instantiatable) classes above.

Name Purpose
AbstractSpectrumDevice Implements methods common to all devices
AbstractSpectrumCard Implements methods common to all card
AbstractSpectrumStarHub Implements methods common to all hubs
AbstractSpectrumChannel Implements methods common to all channels
AbstractSpectrumDigitiser Implements methods common to all digitisers
AbstractSpectrumAWG Implements methods common to all AWGs

Settings

The submodule spectrumdevice.settings provides Enums and Dataclasses wrapping the register values provided by the Spectrum API, to be used for configuring hardware and interpreting responses received from hardware.

Resources

Reference Documentation

View Source
"""
A high-level, object-oriented Python library for controlling Spectrum Instrumentation devices.

`spectrumdevice` can connect to individual cards or
[StarHubs](https://spectrum-instrumentation.com/en/m4i-star-hub) (e.g. the
[NetBox](https://spectrum-instrumentation.com/en/digitizernetbox)). `spectrumdevice` provides the following classes
for controlling devices:

### Hardware Classes
| Name                             | Purpose                                                 |
|----------------------------------|---------------------------------------------------------|
| `SpectrumDigitiserCard`          | Controlling individual digitiser cards                  |
| `SpectrumDigitiserStarHub`       | Controlling digitiser cards aggregated with a StarHub   |
| `SpectrumDigitiserAnalogChannel` | Controlling analog channels of a digitiser              |
| `SpectrumDigitiserIOLine`        | Controlling multipurpose IO lines of a digitiser        |
| `SpectrumAWGCard`                | Controlling individual AWG cards                        |
| `SpectrumAWGStarHub`             | (not yet implemented)                                   |
| `SpectrumAWGAnalogChannel`       | Controlling analog channels of an AWG                   |
| `SpectrumAWGIOLine`              | Controlling multipurpose IO lines of an AWG             |
| `PulseGenerator`                 | Controlling pulse generators belonging to IO lines      |

### Mock Classes
`spectrumdevice` also includes mock classes for testing software without drivers installed or hardware connected:

| Name                           | Purpose                                             |
|--------------------------------|-----------------------------------------------------|
| `MockSpectrumDigitiserCard`    | Mocking individual digitiser cards                  |
| `MockSpectrumDigitiserStarHub` | Mocking digitiser cards aggregated with a StarHub   |
| `MockSpectrumAWGCard`          | Mocking individual AWG cards                        |
| `MockSpectrumAWGStarHub`       | Mocking AWG cards aggregated with a StarHub         |

### Abstract Classes
The following abstract classes provide common implementations for methods whose functionality is identical across
different Spectrum devices. They cannot be instantiated themselves, but are included here as they contain
documentation for methods inherited by the concrete (and therefore instantiatable) classes above.

| Name                           | Purpose                                             |
|--------------------------------|-----------------------------------------------------|
| `AbstractSpectrumDevice`       | Implements methods common to all devices            |
| `AbstractSpectrumCard`         | Implements methods common to all card               |
| `AbstractSpectrumStarHub`      | Implements methods common to all hubs               |
| `AbstractSpectrumChannel`      | Implements methods common to all channels           |
| `AbstractSpectrumDigitiser`    | Implements methods common to all digitisers         |
| `AbstractSpectrumAWG`          | Implements methods common to all AWGs               |

### Settings
The submodule `spectrumdevice.settings` provides Enums and Dataclasses wrapping the register values provided by the
Spectrum API, to be used for configuring hardware and interpreting responses received from hardware.

### Resources
* [Source on GitHub](https://github.com/KCL-BMEIS/spectrumdevice)
* [README including quickstart](https://github.com/KCL-BMEIS/spectrumdevice/blob/main/README.md)
* [Examples](https://github.com/KCL-BMEIS/spectrumdevice/tree/main/example_scripts)
* [PyPi](https://pypi.org/project/spectrumdevice/)
* [API reference documentation](https://kcl-bmeis.github.io/spectrumdevice/)

### Reference Documentation
"""

# Christian Baker, King's College London
# Copyright (c) 2021 School of Biomedical Engineering & Imaging Sciences, King's College London
# Licensed under the MIT. You may obtain a copy at https://opensource.org/licenses/MIT.

from .measurement import Measurement
from .devices.digitiser.digitiser_card import SpectrumDigitiserCard
from .devices.digitiser.digitiser_channel import SpectrumDigitiserAnalogChannel, SpectrumDigitiserIOLine
from .devices.digitiser.digitiser_star_hub import SpectrumDigitiserStarHub
from .devices.awg.awg_card import SpectrumAWGCard
from .devices.awg.awg_channel import SpectrumAWGAnalogChannel, SpectrumAWGIOLine
from .devices.mocks import MockSpectrumDigitiserCard, MockSpectrumDigitiserStarHub, MockSpectrumAWGCard
from .devices.abstract_device import (
    AbstractSpectrumDevice,
    AbstractSpectrumCard,
    AbstractSpectrumChannel,
    AbstractSpectrumStarHub,
)
from .devices.digitiser.abstract_spectrum_digitiser import AbstractSpectrumDigitiser
from .features.pulse_generator.pulse_generator import PulseGenerator

__all__ = [
    "SpectrumDigitiserAnalogChannel",
    "SpectrumDigitiserIOLine",
    "SpectrumDigitiserCard",
    "SpectrumDigitiserStarHub",
    "MockSpectrumDigitiserCard",
    "MockSpectrumDigitiserStarHub",
    "AbstractSpectrumDigitiser",
    "AbstractSpectrumStarHub",
    "AbstractSpectrumCard",
    "AbstractSpectrumDevice",
    "AbstractSpectrumChannel",
    "settings",
    "features",
    "Measurement",
    "SpectrumAWGCard",
    "MockSpectrumAWGCard",
    "SpectrumAWGAnalogChannel",
    "SpectrumAWGIOLine",
    "PulseGenerator",
]


from . import _version

__version__ = _version.get_versions()["version"]  # type: ignore
#   class SpectrumDigitiserAnalogChannel(spectrumdevice.AbstractSpectrumChannel[spectrumdevice.settings.channel.SpectrumAnalogChannelName], spectrumdevice.devices.abstract_device.channel_interfaces.SpectrumAnalogChannelInterface, abc.ABC):
View Source
class SpectrumDigitiserAnalogChannel(AbstractSpectrumAnalogChannel, SpectrumDigitiserAnalogChannelInterface):
    """Class for controlling an individual channel of a spectrum digitiser. Channels are constructed automatically when
    a `SpectrumDigitiserCard` or `SpectrumDigitiserStarHub` is instantiated, and can then be accessed via the
    `.channels` property."""

    def __init__(self, channel_number: int, parent_device: SpectrumDigitiserInterface) -> None:

        if parent_device.type != CardType.SPCM_TYPE_AI:
            raise SpectrumCardIsNotADigitiser(parent_device.type)

        # pass unused args up the inheritance hierarchy
        super().__init__(channel_number=channel_number, parent_device=parent_device)

        self._full_scale_value = self._parent_device.read_spectrum_device_register(SPC_MIINST_MAXADCVALUE)
        # used frequently so store locally instead of reading from device each time:
        self._vertical_range_mv = self.vertical_range_in_mv
        self._vertical_offset_in_percent = self.vertical_offset_in_percent

    def _get_settings_as_dict(self) -> dict:
        return {
            SpectrumDigitiserAnalogChannel.input_path.__name__: self.input_path,
            SpectrumDigitiserAnalogChannel.input_coupling.__name__: self.input_coupling,
            SpectrumDigitiserAnalogChannel.input_impedance.__name__: self.input_impedance,
            SpectrumDigitiserAnalogChannel.vertical_range_in_mv.__name__: self.vertical_range_in_mv,
            SpectrumDigitiserAnalogChannel.vertical_offset_in_percent.__name__: self.vertical_offset_in_percent,
        }

    def _set_settings_from_dict(self, settings: dict) -> None:
        self.set_input_path(settings[SpectrumDigitiserAnalogChannel.input_path.__name__])
        self.set_input_coupling(settings[SpectrumDigitiserAnalogChannel.input_coupling.__name__])
        self.set_input_impedance(settings[SpectrumDigitiserAnalogChannel.input_impedance.__name__])
        self.set_vertical_range_in_mv(settings[SpectrumDigitiserAnalogChannel.vertical_range_in_mv.__name__])
        self.set_vertical_offset_in_percent(
            settings[SpectrumDigitiserAnalogChannel.vertical_offset_in_percent.__name__]
        )

    def convert_raw_waveform_to_voltage_waveform(self, raw_waveform: ndarray) -> ndarray:
        vertical_offset_mv = 0.01 * float(self._vertical_range_mv * self._vertical_offset_in_percent)
        return 1e-3 * (
            float(self._vertical_range_mv) * raw_waveform / float(self._full_scale_value) + vertical_offset_mv
        )

    @property
    def vertical_range_in_mv(self) -> int:
        """The currently set input range of the channel in mV.

        Returns:
            vertical_range (int): The currently set vertical range in mV.
        """
        self._vertical_range_mv = self._parent_device.read_spectrum_device_register(
            VERTICAL_RANGE_COMMANDS[self._number]
        )
        return self._vertical_range_mv

    def set_vertical_range_in_mv(self, vertical_range: int) -> None:
        """Set the input range of the channel in mV. See Spectrum documentation for valid values.

        Args:
            vertical_range (int): The desired vertical range in mV.
        """
        self._parent_device.write_to_spectrum_device_register(VERTICAL_RANGE_COMMANDS[self._number], vertical_range)
        self._vertical_range_mv = vertical_range

    @property
    def vertical_offset_in_percent(self) -> int:
        """The currently set input offset of the channel in percent of the vertical range.

        Returns:
            offset (int): The currently set vertical offset in percent.
        """
        self._vertical_offset_in_percent = self._parent_device.read_spectrum_device_register(
            VERTICAL_OFFSET_COMMANDS[self._number]
        )
        return self._vertical_offset_in_percent

    def set_vertical_offset_in_percent(self, offset: int) -> None:
        """Set the input offset of the channel in percent of the vertical range. See spectrum documentation for valid
        values.

        Args:
            offset (int): The desired vertical offset in percent.
        """
        self._parent_device.write_to_spectrum_device_register(VERTICAL_OFFSET_COMMANDS[self._number], offset)
        self._vertical_offset_in_percent = offset

    @property
    def input_impedance(self) -> InputImpedance:
        """The current input impedance setting of the channel (50 Ohm or 1 MOhm)"""
        impedance_binary_value = self._parent_device.read_spectrum_device_register(
            INPUT_IMPEDANCE_COMMANDS[self._number]
        )
        return InputImpedance(impedance_binary_value)

    def set_input_impedance(self, input_impedance: InputImpedance) -> None:
        self._parent_device.write_to_spectrum_device_register(
            INPUT_IMPEDANCE_COMMANDS[self._number], input_impedance.value
        )

    @property
    def input_coupling(self) -> InputCoupling:
        """The coupling (AC or DC) setting of the channel. Only available on some hardware."""
        coupling_binary_value = self._parent_device.read_spectrum_device_register(INPUT_COUPLING_COMMANDS[self._number])
        return InputCoupling(coupling_binary_value)

    def set_input_coupling(self, input_coupling: InputCoupling) -> None:
        self._parent_device.write_to_spectrum_device_register(
            INPUT_COUPLING_COMMANDS[self._number], input_coupling.value
        )

    @property
    def input_path(self) -> InputPath:
        """The input path setting of the channel. Only available on some hardware."""
        path_binary_value = self._parent_device.read_spectrum_device_register(INPUT_PATH_COMMANDS[self._number])
        return InputPath(path_binary_value)

    def set_input_path(self, input_path: InputPath) -> None:
        self._parent_device.write_to_spectrum_device_register(INPUT_PATH_COMMANDS[self._number], input_path.value)

Class for controlling an individual channel of a spectrum digitiser. Channels are constructed automatically when a SpectrumDigitiserCard or SpectrumDigitiserStarHub is instantiated, and can then be accessed via the .channels property.

#   SpectrumDigitiserAnalogChannel( channel_number: int, parent_device: spectrumdevice.devices.digitiser.digitiser_interface.SpectrumDigitiserInterface )
View Source
    def __init__(self, channel_number: int, parent_device: SpectrumDigitiserInterface) -> None:

        if parent_device.type != CardType.SPCM_TYPE_AI:
            raise SpectrumCardIsNotADigitiser(parent_device.type)

        # pass unused args up the inheritance hierarchy
        super().__init__(channel_number=channel_number, parent_device=parent_device)

        self._full_scale_value = self._parent_device.read_spectrum_device_register(SPC_MIINST_MAXADCVALUE)
        # used frequently so store locally instead of reading from device each time:
        self._vertical_range_mv = self.vertical_range_in_mv
        self._vertical_offset_in_percent = self.vertical_offset_in_percent
#   def convert_raw_waveform_to_voltage_waveform(self, raw_waveform: numpy.ndarray) -> numpy.ndarray:
View Source
    def convert_raw_waveform_to_voltage_waveform(self, raw_waveform: ndarray) -> ndarray:
        vertical_offset_mv = 0.01 * float(self._vertical_range_mv * self._vertical_offset_in_percent)
        return 1e-3 * (
            float(self._vertical_range_mv) * raw_waveform / float(self._full_scale_value) + vertical_offset_mv
        )
#   vertical_range_in_mv: int

The currently set input range of the channel in mV.

Returns

vertical_range (int): The currently set vertical range in mV.

#   def set_vertical_range_in_mv(self, vertical_range: int) -> None:
View Source
    def set_vertical_range_in_mv(self, vertical_range: int) -> None:
        """Set the input range of the channel in mV. See Spectrum documentation for valid values.

        Args:
            vertical_range (int): The desired vertical range in mV.
        """
        self._parent_device.write_to_spectrum_device_register(VERTICAL_RANGE_COMMANDS[self._number], vertical_range)
        self._vertical_range_mv = vertical_range

Set the input range of the channel in mV. See Spectrum documentation for valid values.

Args
  • vertical_range (int): The desired vertical range in mV.
#   vertical_offset_in_percent: int

The currently set input offset of the channel in percent of the vertical range.

Returns

offset (int): The currently set vertical offset in percent.

#   def set_vertical_offset_in_percent(self, offset: int) -> None:
View Source
    def set_vertical_offset_in_percent(self, offset: int) -> None:
        """Set the input offset of the channel in percent of the vertical range. See spectrum documentation for valid
        values.

        Args:
            offset (int): The desired vertical offset in percent.
        """
        self._parent_device.write_to_spectrum_device_register(VERTICAL_OFFSET_COMMANDS[self._number], offset)
        self._vertical_offset_in_percent = offset

Set the input offset of the channel in percent of the vertical range. See spectrum documentation for valid values.

Args
  • offset (int): The desired vertical offset in percent.

The current input impedance setting of the channel (50 Ohm or 1 MOhm)

#   def set_input_impedance( self, input_impedance: spectrumdevice.settings.channel.InputImpedance ) -> None:
View Source
    def set_input_impedance(self, input_impedance: InputImpedance) -> None:
        self._parent_device.write_to_spectrum_device_register(
            INPUT_IMPEDANCE_COMMANDS[self._number], input_impedance.value
        )

The coupling (AC or DC) setting of the channel. Only available on some hardware.

#   def set_input_coupling( self, input_coupling: spectrumdevice.settings.channel.InputCoupling ) -> None:
View Source
    def set_input_coupling(self, input_coupling: InputCoupling) -> None:
        self._parent_device.write_to_spectrum_device_register(
            INPUT_COUPLING_COMMANDS[self._number], input_coupling.value
        )

The input path setting of the channel. Only available on some hardware.

#   def set_input_path(self, input_path: spectrumdevice.settings.channel.InputPath) -> None:
View Source
    def set_input_path(self, input_path: InputPath) -> None:
        self._parent_device.write_to_spectrum_device_register(INPUT_PATH_COMMANDS[self._number], input_path.value)
Inherited Members
AbstractSpectrumChannel
name
write_to_parent_device_register
read_parent_device_register
spectrumdevice.devices.abstract_device.channel_interfaces.SpectrumAnalogChannelInterface
copy_settings_from_other_channel
#   class SpectrumDigitiserIOLine(spectrumdevice.devices.abstract_device.channel_interfaces.SpectrumIOLineInterface, spectrumdevice.AbstractSpectrumChannel[spectrumdevice.settings.io_lines.SpectrumIOLineName], abc.ABC):
View Source
class SpectrumDigitiserIOLine(AbstractSpectrumIOLine, SpectrumDigitiserIOLineInterface):
    """Class for controlling multipurpose IO lines of a digitiser, e.g. X0, X1, X2 and X3."""

    def __init__(self, parent_device: AbstractSpectrumCard, **kwargs: Any) -> None:
        if parent_device.type != CardType.SPCM_TYPE_AI:
            raise SpectrumCardIsNotADigitiser(parent_device.type)
        super().__init__(parent_device=parent_device, **kwargs)  # pass unused args up the inheritance hierarchy

    def _get_io_line_mode_settings_mask(self, mode: IOLineMode) -> int:
        return 0  # no settings required for DigOut

Class for controlling multipurpose IO lines of a digitiser, e.g. X0, X1, X2 and X3.

#   SpectrumDigitiserIOLine( parent_device: spectrumdevice.devices.abstract_device.abstract_spectrum_card.AbstractSpectrumCard, **kwargs: Any )
View Source
    def __init__(self, parent_device: AbstractSpectrumCard, **kwargs: Any) -> None:
        if parent_device.type != CardType.SPCM_TYPE_AI:
            raise SpectrumCardIsNotADigitiser(parent_device.type)
        super().__init__(parent_device=parent_device, **kwargs)  # pass unused args up the inheritance hierarchy
Inherited Members
spectrumdevice.devices.abstract_device.abstract_spectrum_io_line.AbstractSpectrumIOLine
mode
set_mode
pulse_generator
AbstractSpectrumChannel
name
write_to_parent_device_register
read_parent_device_register
View Source
class SpectrumDigitiserCard(
    AbstractSpectrumCard[SpectrumDigitiserAnalogChannelInterface, SpectrumDigitiserIOLineInterface],
    AbstractSpectrumDigitiser,
):
    """Class for controlling individual Spectrum digitiser cards."""

    def __init__(self, device_number: int, ip_address: Optional[str] = None) -> None:
        """
        Args:
            device_number (int): Index of the card to control. If only one card is present, set to 0.
            ip_address (Optional[str]): If connecting to a networked card, provide the IP address here as a string.

        """
        # pass unused args up the inheritance hierarchy
        super().__init__(device_number=device_number, ip_address=ip_address)

        if self.type != CardType.SPCM_TYPE_AI:
            raise SpectrumCardIsNotADigitiser(self.type)
        self._acquisition_mode = self.acquisition_mode
        self._timestamper: Optional[Timestamper] = None
        self._batch_size = 1

    def _init_analog_channels(self) -> Sequence[SpectrumDigitiserAnalogChannelInterface]:
        num_modules = self.read_spectrum_device_register(SPC_MIINST_MODULES)
        num_channels_per_module = self.read_spectrum_device_register(SPC_MIINST_CHPERMODULE)
        total_channels = num_modules * num_channels_per_module
        return tuple(
            [SpectrumDigitiserAnalogChannel(channel_number=n, parent_device=self) for n in range(total_channels)]
        )

    def _init_io_lines(self) -> Sequence[SpectrumDigitiserIOLineInterface]:
        if (self.model_number.value & TYP_SERIESMASK) == TYP_M2PEXPSERIES:
            return tuple([SpectrumDigitiserIOLine(channel_number=n, parent_device=self) for n in range(4)])
        else:
            raise NotImplementedError("Don't know how many IO lines other types of card have. Only M2P series.")

    def enable_timestamping(self) -> None:
        self._timestamper = Timestamper(self, self._handle)

    def wait_for_acquisition_to_complete(self) -> None:
        """Blocks until the current acquisition has finished, or the timeout is reached.

        In Standard Single mode (SPC_REC_STD_SINGLE), this should be called after `start()`. Once the call
            to `wait_for_acquisition_to_complete()` returns, the newly acquired samples are in the on_device buffer and
            ready for transfer to the `TransferBuffer` using `start_transfer()`.

        In FIFO mode (SPC_REC_FIFO_MULTI), the card will continue to acquire samples until
            `stop()` is called, so `wait_for_acquisition_to_complete()` should not be used.

        """
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_WAITREADY)

    def get_waveforms(self) -> List[List[NDArray[float_]]]:
        """Get a list of the most recently transferred waveforms, in channel order.

        This method copies and reshapes the samples in the `TransferBuffer` into a list of lists of 1D NumPy arrays
        (waveforms) and returns the list.

        In Standard Single mode (SPC_REC_STD_SINGLE), `get_waveforms()` should be called after
        `wait_for_transfer_to_complete()` has returned.

        In FIFO mode (SPC_REC_FIFO_MULTI), while the card is continuously acquiring samples and transferring them to the
        `TransferBuffer`, this method should be called in a loop . The method will block until each new transfer is
        received, so the loop will run at the same rate as the acquisition (in SPC_REC_FIFO_MULTI mode, for example,
        this would the rate at which your trigger source was running).

        Returns:
             waveforms (List[List[NDArray[float_]]]): A list of lists of 1D numpy arrays, one inner list per acquisition
             and one array per enabled channel, in channel order. To average the acquisitions:
                `np.array(waveforms).mean(axis=0)`

        """
        if self._transfer_buffer is None:
            raise SpectrumNoTransferBufferDefined("Cannot find a samples transfer buffer")

        num_read_bytes = 0
        num_samples_per_frame = self.acquisition_length_in_samples * len(self.enabled_analog_channel_nums)
        num_expected_bytes_per_frame = num_samples_per_frame * self._transfer_buffer.data_array.itemsize
        raw_samples = zeros(num_samples_per_frame * self._batch_size, dtype=self._transfer_buffer.data_array.dtype)

        if self.acquisition_mode in (AcquisitionMode.SPC_REC_STD_SINGLE, AcquisitionMode.SPC_REC_STD_AVERAGE):
            raw_samples = self._transfer_buffer.copy_contents()

        elif self.acquisition_mode in (AcquisitionMode.SPC_REC_FIFO_MULTI, AcquisitionMode.SPC_REC_FIFO_AVERAGE):
            self.wait_for_transfer_chunk_to_complete()

            while num_read_bytes < (num_expected_bytes_per_frame * self._batch_size):
                num_available_bytes = self.read_spectrum_device_register(SPC_DATA_AVAIL_USER_LEN)
                position_of_available_bytes = self.read_spectrum_device_register(SPC_DATA_AVAIL_USER_POS)

                # Don't allow reading over the end of the transfer buffer
                if (
                    position_of_available_bytes + num_available_bytes
                ) > self._transfer_buffer.data_array_length_in_bytes:
                    num_available_bytes = self._transfer_buffer.data_array_length_in_bytes - position_of_available_bytes

                # Don't allow reading over the end of the current acquisition:
                if (num_read_bytes + num_available_bytes) > (num_expected_bytes_per_frame * self._batch_size):
                    num_available_bytes = (num_expected_bytes_per_frame * self._batch_size) - num_read_bytes

                num_available_samples = num_available_bytes // self._transfer_buffer.data_array.itemsize
                num_read_samples = num_read_bytes // self._transfer_buffer.data_array.itemsize

                raw_samples[
                    num_read_samples : num_read_samples + num_available_samples
                ] = self._transfer_buffer.read_chunk(position_of_available_bytes, num_available_bytes)
                self.write_to_spectrum_device_register(SPC_DATA_AVAIL_CARD_LEN, num_available_bytes)

                num_read_bytes += num_available_bytes

        waveforms_in_columns = raw_samples.reshape(
            (self._batch_size, self.acquisition_length_in_samples, len(self.enabled_analog_channel_nums))
        )

        repeat_acquisitions = []
        for n in range(self._batch_size):
            repeat_acquisitions.append(
                [
                    cast(
                        SpectrumDigitiserAnalogChannel, self.analog_channels[ch_num]
                    ).convert_raw_waveform_to_voltage_waveform(squeeze(waveform))
                    for ch_num, waveform in zip(self.enabled_analog_channel_nums, waveforms_in_columns[n, :, :].T)
                ]
            )

        return repeat_acquisitions

    def get_timestamp(self) -> Optional[datetime.datetime]:
        """Get timestamp for the last acquisition"""
        if self._timestamper is not None:
            return self._timestamper.get_timestamp()
        else:
            return None

    @property
    def acquisition_length_in_samples(self) -> int:
        """The current recording length (per channel) in samples.

        Returns:
            length_in_samples (int): The current recording length ('acquisition length') in samples."""
        return self.read_spectrum_device_register(SPC_MEMSIZE)

    def set_acquisition_length_in_samples(self, length_in_samples: int) -> None:
        """Change the recording length (per channel). In FIFO mode, it will be quantised according to the step size
          allowed by the connected card type.

        Args:
            length_in_samples (int): The desired recording length ('acquisition length'), in samples.
        """
        length_in_samples = self._coerce_num_samples_if_fifo(length_in_samples)
        self.write_to_spectrum_device_register(SPC_SEGMENTSIZE, length_in_samples)
        self.write_to_spectrum_device_register(SPC_MEMSIZE, length_in_samples)

    @property
    def post_trigger_length_in_samples(self) -> int:
        """The number of samples of the recording that will contain data received after the trigger event.

        Returns:
            length_in_samples (int): The currently set post trigger length in samples.
        """
        return self.read_spectrum_device_register(SPC_POSTTRIGGER)

    def set_post_trigger_length_in_samples(self, length_in_samples: int) -> None:
        """Change the number of samples of the recording that will contain data received after the trigger event.
        In FIFO mode, this will be quantised according to the minimum step size allowed by the connected card.

        Args:
            length_in_samples (int): The desired post trigger length in samples."""
        length_in_samples = self._coerce_num_samples_if_fifo(length_in_samples)
        if self.acquisition_mode == AcquisitionMode.SPC_REC_FIFO_MULTI:
            if (self.acquisition_length_in_samples - length_in_samples) < get_memsize_step_size(self._model_number):
                logger.warning(
                    "FIFO mode: coercing post trigger length to maximum allowed value (step-size samples less than "
                    "the acquisition length)."
                )
                length_in_samples = self.acquisition_length_in_samples - get_memsize_step_size(self._model_number)
        self.write_to_spectrum_device_register(SPC_POSTTRIGGER, length_in_samples)

    def _coerce_num_samples_if_fifo(self, value: int) -> int:
        if self.acquisition_mode == AcquisitionMode.SPC_REC_FIFO_MULTI:
            if value != mod(value, get_memsize_step_size(self._model_number)):
                logger.warning(
                    f"FIFO mode: coercing length to nearest {get_memsize_step_size(self._model_number)}" f" samples"
                )
                value = int(value - mod(value, get_memsize_step_size(self._model_number)))
        return value

    @property
    def number_of_averages(self) -> int:
        return self.read_spectrum_device_register(SPC_AVERAGES)

    def set_number_of_averages(self, num_averages: int) -> None:
        if num_averages > 0:
            self.write_to_spectrum_device_register(SPC_AVERAGES, num_averages)
        else:
            raise ValueError("Number of averages must be greater than 0.")

    @property
    def acquisition_mode(self) -> AcquisitionMode:
        """The currently enabled card mode. Will raise an exception if the current mode is not supported by
        `spectrumdevice`.

        Returns:
            mode (`AcquisitionMode`): The currently enabled card acquisition mode."""
        return AcquisitionMode(self.read_spectrum_device_register(SPC_CARDMODE))

    def set_acquisition_mode(self, mode: AcquisitionMode) -> None:
        """Change the currently enabled card mode. See `AcquisitionMode` and the Spectrum documentation
        for the available modes.

        Args:
            mode (`AcquisitionMode`): The desired acquisition mode."""
        self.write_to_spectrum_device_register(SPC_CARDMODE, mode.value)

    @property
    def batch_size(self) -> int:
        return self._batch_size

    def set_batch_size(self, batch_size: int) -> None:
        self._batch_size = batch_size

    def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None:
        """Create or provide a `TransferBuffer` object for receiving acquired samples from the device.

        If no buffer is provided, and no buffer has previously been defined, then one will be created: in FIFO mode,
         with a notify size of 10 pages or the size of the acquisition, whichever is smaller; in Standard Single mode,
         one with the correct length and no notify size. A separate buffer for transferring Timestamps will also be
         created using the Timestamper class.

        Args:
            buffer (Optional[List[`TransferBuffer`]]): A length-1 list containing a pre-constructed
                `TransferBuffer` set up for card-to-PC transfer of samples ("data"). The size of the buffer should be
                chosen according to the current number of active channels, the acquisition length and the number
                of acquisitions which you intend to download at a time using get_waveforms().
        """
        self._set_or_update_transfer_buffer_attribute(buffer)
        if self._transfer_buffer is not None:
            set_transfer_buffer(self._handle, self._transfer_buffer)

    def _set_or_update_transfer_buffer_attribute(self, buffer: Optional[Sequence[TransferBuffer]]) -> None:
        if buffer:
            self._transfer_buffer = buffer[0]
            if self._transfer_buffer.direction != BufferDirection.SPCM_DIR_CARDTOPC:
                raise ValueError("Digitisers need a transfer buffer with direction BufferDirection.SPCM_DIR_CARDTOPC")
            if self._transfer_buffer.type != BufferType.SPCM_BUF_DATA:
                raise ValueError("Digitisers need a transfer buffer with type BufferDirection.SPCM_BUF_DATA")
        elif self._transfer_buffer is None:
            if self.acquisition_mode in (AcquisitionMode.SPC_REC_FIFO_MULTI, AcquisitionMode.SPC_REC_FIFO_AVERAGE):
                samples_per_batch = (
                    self.acquisition_length_in_samples * len(self.enabled_analog_channel_nums) * self._batch_size
                )
                pages_per_batch = samples_per_batch * self.bytes_per_sample / PAGE_SIZE_IN_BYTES

                if pages_per_batch < DEFAULT_NOTIFY_SIZE_IN_PAGES:
                    notify_size = pages_per_batch
                else:
                    notify_size = DEFAULT_NOTIFY_SIZE_IN_PAGES

                # Make transfer buffer big enough to hold all samples in the batch
                self._transfer_buffer = create_samples_acquisition_transfer_buffer(
                    size_in_samples=samples_per_batch,
                    notify_size_in_pages=notify_size,
                    bytes_per_sample=self.bytes_per_sample,
                )
            elif self.acquisition_mode in (AcquisitionMode.SPC_REC_STD_SINGLE, AcquisitionMode.SPC_REC_STD_AVERAGE):
                self._transfer_buffer = create_samples_acquisition_transfer_buffer(
                    size_in_samples=self.acquisition_length_in_samples * len(self.enabled_analog_channel_nums),
                    notify_size_in_pages=0,
                    bytes_per_sample=self.bytes_per_sample,
                )
            else:
                raise ValueError("AcquisitionMode not recognised")

    def __str__(self) -> str:
        return f"Card {self._visa_string}"

Class for controlling individual Spectrum digitiser cards.

#   SpectrumDigitiserCard(device_number: int, ip_address: Optional[str] = None)
View Source
    def __init__(self, device_number: int, ip_address: Optional[str] = None) -> None:
        """
        Args:
            device_number (int): Index of the card to control. If only one card is present, set to 0.
            ip_address (Optional[str]): If connecting to a networked card, provide the IP address here as a string.

        """
        # pass unused args up the inheritance hierarchy
        super().__init__(device_number=device_number, ip_address=ip_address)

        if self.type != CardType.SPCM_TYPE_AI:
            raise SpectrumCardIsNotADigitiser(self.type)
        self._acquisition_mode = self.acquisition_mode
        self._timestamper: Optional[Timestamper] = None
        self._batch_size = 1
Args
  • device_number (int): Index of the card to control. If only one card is present, set to 0.
  • ip_address (Optional[str]): If connecting to a networked card, provide the IP address here as a string.
#   def enable_timestamping(self) -> None:
View Source
    def enable_timestamping(self) -> None:
        self._timestamper = Timestamper(self, self._handle)
#   def wait_for_acquisition_to_complete(self) -> None:
View Source
    def wait_for_acquisition_to_complete(self) -> None:
        """Blocks until the current acquisition has finished, or the timeout is reached.

        In Standard Single mode (SPC_REC_STD_SINGLE), this should be called after `start()`. Once the call
            to `wait_for_acquisition_to_complete()` returns, the newly acquired samples are in the on_device buffer and
            ready for transfer to the `TransferBuffer` using `start_transfer()`.

        In FIFO mode (SPC_REC_FIFO_MULTI), the card will continue to acquire samples until
            `stop()` is called, so `wait_for_acquisition_to_complete()` should not be used.

        """
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_WAITREADY)

Blocks until the current acquisition has finished, or the timeout is reached.

In Standard Single mode (SPC_REC_STD_SINGLE), this should be called after start(). Once the call to wait_for_acquisition_to_complete() returns, the newly acquired samples are in the on_device buffer and ready for transfer to the TransferBuffer using start_transfer().

In FIFO mode (SPC_REC_FIFO_MULTI), the card will continue to acquire samples until stop() is called, so wait_for_acquisition_to_complete() should not be used.

#   def get_waveforms(self) -> List[List[numpy.ndarray[Any, numpy.dtype[numpy.float64]]]]:
View Source
    def get_waveforms(self) -> List[List[NDArray[float_]]]:
        """Get a list of the most recently transferred waveforms, in channel order.

        This method copies and reshapes the samples in the `TransferBuffer` into a list of lists of 1D NumPy arrays
        (waveforms) and returns the list.

        In Standard Single mode (SPC_REC_STD_SINGLE), `get_waveforms()` should be called after
        `wait_for_transfer_to_complete()` has returned.

        In FIFO mode (SPC_REC_FIFO_MULTI), while the card is continuously acquiring samples and transferring them to the
        `TransferBuffer`, this method should be called in a loop . The method will block until each new transfer is
        received, so the loop will run at the same rate as the acquisition (in SPC_REC_FIFO_MULTI mode, for example,
        this would the rate at which your trigger source was running).

        Returns:
             waveforms (List[List[NDArray[float_]]]): A list of lists of 1D numpy arrays, one inner list per acquisition
             and one array per enabled channel, in channel order. To average the acquisitions:
                `np.array(waveforms).mean(axis=0)`

        """
        if self._transfer_buffer is None:
            raise SpectrumNoTransferBufferDefined("Cannot find a samples transfer buffer")

        num_read_bytes = 0
        num_samples_per_frame = self.acquisition_length_in_samples * len(self.enabled_analog_channel_nums)
        num_expected_bytes_per_frame = num_samples_per_frame * self._transfer_buffer.data_array.itemsize
        raw_samples = zeros(num_samples_per_frame * self._batch_size, dtype=self._transfer_buffer.data_array.dtype)

        if self.acquisition_mode in (AcquisitionMode.SPC_REC_STD_SINGLE, AcquisitionMode.SPC_REC_STD_AVERAGE):
            raw_samples = self._transfer_buffer.copy_contents()

        elif self.acquisition_mode in (AcquisitionMode.SPC_REC_FIFO_MULTI, AcquisitionMode.SPC_REC_FIFO_AVERAGE):
            self.wait_for_transfer_chunk_to_complete()

            while num_read_bytes < (num_expected_bytes_per_frame * self._batch_size):
                num_available_bytes = self.read_spectrum_device_register(SPC_DATA_AVAIL_USER_LEN)
                position_of_available_bytes = self.read_spectrum_device_register(SPC_DATA_AVAIL_USER_POS)

                # Don't allow reading over the end of the transfer buffer
                if (
                    position_of_available_bytes + num_available_bytes
                ) > self._transfer_buffer.data_array_length_in_bytes:
                    num_available_bytes = self._transfer_buffer.data_array_length_in_bytes - position_of_available_bytes

                # Don't allow reading over the end of the current acquisition:
                if (num_read_bytes + num_available_bytes) > (num_expected_bytes_per_frame * self._batch_size):
                    num_available_bytes = (num_expected_bytes_per_frame * self._batch_size) - num_read_bytes

                num_available_samples = num_available_bytes // self._transfer_buffer.data_array.itemsize
                num_read_samples = num_read_bytes // self._transfer_buffer.data_array.itemsize

                raw_samples[
                    num_read_samples : num_read_samples + num_available_samples
                ] = self._transfer_buffer.read_chunk(position_of_available_bytes, num_available_bytes)
                self.write_to_spectrum_device_register(SPC_DATA_AVAIL_CARD_LEN, num_available_bytes)

                num_read_bytes += num_available_bytes

        waveforms_in_columns = raw_samples.reshape(
            (self._batch_size, self.acquisition_length_in_samples, len(self.enabled_analog_channel_nums))
        )

        repeat_acquisitions = []
        for n in range(self._batch_size):
            repeat_acquisitions.append(
                [
                    cast(
                        SpectrumDigitiserAnalogChannel, self.analog_channels[ch_num]
                    ).convert_raw_waveform_to_voltage_waveform(squeeze(waveform))
                    for ch_num, waveform in zip(self.enabled_analog_channel_nums, waveforms_in_columns[n, :, :].T)
                ]
            )

        return repeat_acquisitions

Get a list of the most recently transferred waveforms, in channel order.

This method copies and reshapes the samples in the TransferBuffer into a list of lists of 1D NumPy arrays (waveforms) and returns the list.

In Standard Single mode (SPC_REC_STD_SINGLE), get_waveforms() should be called after wait_for_transfer_to_complete() has returned.

In FIFO mode (SPC_REC_FIFO_MULTI), while the card is continuously acquiring samples and transferring them to the TransferBuffer, this method should be called in a loop . The method will block until each new transfer is received, so the loop will run at the same rate as the acquisition (in SPC_REC_FIFO_MULTI mode, for example, this would the rate at which your trigger source was running).

Returns

waveforms (List[List[NDArray[float_]]]): A list of lists of 1D numpy arrays, one inner list per acquisition and one array per enabled channel, in channel order. To average the acquisitions: np.array(waveforms).mean(axis=0)

#   def get_timestamp(self) -> Optional[datetime.datetime]:
View Source
    def get_timestamp(self) -> Optional[datetime.datetime]:
        """Get timestamp for the last acquisition"""
        if self._timestamper is not None:
            return self._timestamper.get_timestamp()
        else:
            return None

Get timestamp for the last acquisition

#   acquisition_length_in_samples: int

The current recording length (per channel) in samples.

Returns

length_in_samples (int): The current recording length ('acquisition length') in samples.

#   def set_acquisition_length_in_samples(self, length_in_samples: int) -> None:
View Source
    def set_acquisition_length_in_samples(self, length_in_samples: int) -> None:
        """Change the recording length (per channel). In FIFO mode, it will be quantised according to the step size
          allowed by the connected card type.

        Args:
            length_in_samples (int): The desired recording length ('acquisition length'), in samples.
        """
        length_in_samples = self._coerce_num_samples_if_fifo(length_in_samples)
        self.write_to_spectrum_device_register(SPC_SEGMENTSIZE, length_in_samples)
        self.write_to_spectrum_device_register(SPC_MEMSIZE, length_in_samples)

Change the recording length (per channel). In FIFO mode, it will be quantised according to the step size allowed by the connected card type.

Args
  • length_in_samples (int): The desired recording length ('acquisition length'), in samples.
#   post_trigger_length_in_samples: int

The number of samples of the recording that will contain data received after the trigger event.

Returns

length_in_samples (int): The currently set post trigger length in samples.

#   def set_post_trigger_length_in_samples(self, length_in_samples: int) -> None:
View Source
    def set_post_trigger_length_in_samples(self, length_in_samples: int) -> None:
        """Change the number of samples of the recording that will contain data received after the trigger event.
        In FIFO mode, this will be quantised according to the minimum step size allowed by the connected card.

        Args:
            length_in_samples (int): The desired post trigger length in samples."""
        length_in_samples = self._coerce_num_samples_if_fifo(length_in_samples)
        if self.acquisition_mode == AcquisitionMode.SPC_REC_FIFO_MULTI:
            if (self.acquisition_length_in_samples - length_in_samples) < get_memsize_step_size(self._model_number):
                logger.warning(
                    "FIFO mode: coercing post trigger length to maximum allowed value (step-size samples less than "
                    "the acquisition length)."
                )
                length_in_samples = self.acquisition_length_in_samples - get_memsize_step_size(self._model_number)
        self.write_to_spectrum_device_register(SPC_POSTTRIGGER, length_in_samples)

Change the number of samples of the recording that will contain data received after the trigger event. In FIFO mode, this will be quantised according to the minimum step size allowed by the connected card.

Args
  • length_in_samples (int): The desired post trigger length in samples.
#   number_of_averages: int
#   def set_number_of_averages(self, num_averages: int) -> None:
View Source
    def set_number_of_averages(self, num_averages: int) -> None:
        if num_averages > 0:
            self.write_to_spectrum_device_register(SPC_AVERAGES, num_averages)
        else:
            raise ValueError("Number of averages must be greater than 0.")

The currently enabled card mode. Will raise an exception if the current mode is not supported by spectrumdevice.

Returns

mode (AcquisitionMode): The currently enabled card acquisition mode.

#   def set_acquisition_mode( self, mode: spectrumdevice.settings.device_modes.AcquisitionMode ) -> None:
View Source
    def set_acquisition_mode(self, mode: AcquisitionMode) -> None:
        """Change the currently enabled card mode. See `AcquisitionMode` and the Spectrum documentation
        for the available modes.

        Args:
            mode (`AcquisitionMode`): The desired acquisition mode."""
        self.write_to_spectrum_device_register(SPC_CARDMODE, mode.value)

Change the currently enabled card mode. See AcquisitionMode and the Spectrum documentation for the available modes.

Args
  • mode (AcquisitionMode): The desired acquisition mode.
#   batch_size: int
#   def set_batch_size(self, batch_size: int) -> None:
View Source
    def set_batch_size(self, batch_size: int) -> None:
        self._batch_size = batch_size
#   def define_transfer_buffer( self, buffer: Optional[Sequence[spectrumdevice.settings.transfer_buffer.TransferBuffer]] = None ) -> None:
View Source
    def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None:
        """Create or provide a `TransferBuffer` object for receiving acquired samples from the device.

        If no buffer is provided, and no buffer has previously been defined, then one will be created: in FIFO mode,
         with a notify size of 10 pages or the size of the acquisition, whichever is smaller; in Standard Single mode,
         one with the correct length and no notify size. A separate buffer for transferring Timestamps will also be
         created using the Timestamper class.

        Args:
            buffer (Optional[List[`TransferBuffer`]]): A length-1 list containing a pre-constructed
                `TransferBuffer` set up for card-to-PC transfer of samples ("data"). The size of the buffer should be
                chosen according to the current number of active channels, the acquisition length and the number
                of acquisitions which you intend to download at a time using get_waveforms().
        """
        self._set_or_update_transfer_buffer_attribute(buffer)
        if self._transfer_buffer is not None:
            set_transfer_buffer(self._handle, self._transfer_buffer)

Create or provide a TransferBuffer object for receiving acquired samples from the device.

If no buffer is provided, and no buffer has previously been defined, then one will be created: in FIFO mode, with a notify size of 10 pages or the size of the acquisition, whichever is smaller; in Standard Single mode, one with the correct length and no notify size. A separate buffer for transferring Timestamps will also be created using the Timestamper class.

Args
  • buffer (Optional[List[TransferBuffer]]): A length-1 list containing a pre-constructed TransferBuffer set up for card-to-PC transfer of samples ("data"). The size of the buffer should be chosen according to the current number of active channels, the acquisition length and the number of acquisitions which you intend to download at a time using get_waveforms().
View Source
class SpectrumDigitiserStarHub(
    AbstractSpectrumStarHub[
        SpectrumDigitiserCard, SpectrumDigitiserAnalogChannelInterface, SpectrumDigitiserIOLineInterface
    ],
    AbstractSpectrumDigitiser,
):
    """Composite class of `SpectrumDigitiserCard` for controlling a StarHub digitiser device, for example the Spectrum
    NetBox. StarHub digitiser devices are composites of more than one Spectrum digitiser card. Acquisition from the
    child cards of a StarHub is synchronised, aggregating the channels of all child cards. This class enables the
    control of a StarHub device as if it were a single Spectrum card."""

    def __init__(self, device_number: int, child_cards: tuple[SpectrumDigitiserCard, ...], master_card_index: int):
        """
        Args:
            device_number (int): The index of the StarHub to connect to. If only one StarHub is present, set to 0.
            child_cards (Sequence[`SpectrumDigitiserCard`]): A list of `SpectrumCard` objects defining the child cards
                located within the StarHub, correctly constructed with their IP addresses and/or device numbers.
            master_card_index (int): The position within child_cards where the master card (the card which controls the
                clock) is located.
        """
        super().__init__(device_number=device_number, child_cards=child_cards, master_card_index=master_card_index)
        self._acquisition_mode = self.acquisition_mode

    def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None:
        """Create or provide `CardToPCDataTransferBuffer` objects for receiving acquired samples from the child cards.
        If no buffers are provided, they will be created with the correct size and a board_memory_offset_bytes of 0. See
        `SpectrumDigitiserCard.define_transfer_buffer()` for more information

        Args:
            buffer (Optional[`CardToPCDataTransferBuffer`]): A list containing pre-constructed
            `CardToPCDataTransferBuffer` objects, one for each child card. The size of the buffers should be chosen
            according to the current number of active channels in each card and the acquisition length.
        """
        if buffer:
            for card, buff in zip(self._child_cards, buffer):
                card.define_transfer_buffer([buff])
        else:
            for card in self._child_cards:
                card.define_transfer_buffer()

    def wait_for_acquisition_to_complete(self) -> None:
        """Wait for each card to finish its acquisition. See `SpectrumDigitiserCard.wait_for_acquisition_to_complete()`
        for more information."""
        for card in self._child_cards:
            card.wait_for_acquisition_to_complete()

    def get_waveforms(self) -> List[List[NDArray[float_]]]:
        """Get a list of the most recently transferred waveforms.

        This method gets the waveforms from each child card and joins them into a new list, ordered by channel number.
        See `SpectrumDigitiserCard.get_waveforms()` for more information.

        Args:
            num_acquisitions (int): For FIFO mode:  the number of acquisitions (i.e. trigger events) to wait for and
            copy. Acquiring in batches (num_acquisitions > 1) can improve performance.

        Returns:
            waveforms (List[List[NDArray[float_]]]): A list lists of 1D numpy arrays, one inner list per acquisition,
              and one array per enabled channel, in channel order.
        """
        card_ids_and_waveform_sets: Dict[str, list[list[NDArray[float_]]]] = {}

        def _get_waveforms(digitiser_card: SpectrumDigitiserCard) -> None:
            this_cards_waveforms = digitiser_card.get_waveforms()
            card_ids_and_waveform_sets[str(digitiser_card)] = this_cards_waveforms

        threads = [Thread(target=_get_waveforms, args=(card,)) for card in self._child_cards]

        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

        waveform_sets_all_cards_ordered = []
        for n in range(self.batch_size):
            waveforms_in_this_batch = []
            for card in self._child_cards:
                waveforms_in_this_batch += card_ids_and_waveform_sets[str(card)][n]
            waveform_sets_all_cards_ordered.append(waveforms_in_this_batch)

        return waveform_sets_all_cards_ordered

    def get_timestamp(self) -> Optional[datetime.datetime]:
        """Get timestamp for the last acquisition"""
        return self._triggering_card.get_timestamp()

    def enable_timestamping(self) -> None:
        self._triggering_card.enable_timestamping()

    @property
    def acquisition_length_in_samples(self) -> int:
        """The currently set recording length, which should be the same for all child cards. If different recording
        lengths are set, an exception is raised. See `SpectrumDigitiserCard.acquisition_length_in_samples` for more
        information.

        Returns:
            length_in_samples: The currently set acquisition length in samples."""
        lengths = []
        for d in self._child_cards:
            lengths.append(d.acquisition_length_in_samples)
        return check_settings_constant_across_devices(lengths, __name__)

    def set_acquisition_length_in_samples(self, length_in_samples: int) -> None:
        """Set a new recording length for all child cards. See `SpectrumDigitiserCard.set_acquisition_length_in_samples()`
        for more information.

        Args:
            length_in_samples (int): The desired acquisition length in samples."""
        for d in self._child_cards:
            d.set_acquisition_length_in_samples(length_in_samples)

    @property
    def post_trigger_length_in_samples(self) -> int:
        """The number of samples recorded after a trigger is received. This should be consistent across all child
        cards. If different values are found across the child cards, an exception is raised. See
        `SpectrumDigitiserCard.post_trigger_length_in_samples` for more information.

        Returns:
            length_in_samples (int): The current post trigger length in samples.
        """
        lengths = []
        for d in self._child_cards:
            lengths.append(d.post_trigger_length_in_samples)
        return check_settings_constant_across_devices(lengths, __name__)

    def set_post_trigger_length_in_samples(self, length_in_samples: int) -> None:
        """Set a new post trigger length for all child cards. See `SpectrumDigitiserCard.set_post_trigger_length_in_samples()`
        for more information.

        Args:
            length_in_samples (int): The desired post trigger length in samples.
        """
        for d in self._child_cards:
            d.set_post_trigger_length_in_samples(length_in_samples)

    @property
    def acquisition_mode(self) -> AcquisitionMode:
        """The acquisition mode, which should be the same for all child cards. If it's not, an exception is raised.
        See `SpectrumDigitiserCard.acquisition_mode` for more information.

        Returns:
            mode (`AcquisitionMode`): The currently enabled acquisition mode.
        """
        modes = []
        for d in self._child_cards:
            modes.append(d.acquisition_mode)
        return AcquisitionMode(check_settings_constant_across_devices([m.value for m in modes], __name__))

    def set_acquisition_mode(self, mode: AcquisitionMode) -> None:
        """Change the acquisition mode for all child cards. See `SpectrumDigitiserCard.set_acquisition_mode()` for more
        information.

        Args:
            mode (`AcquisitionMode`): The desired acquisition mode."""
        for d in self._child_cards:
            d.set_acquisition_mode(mode)

    @property
    def batch_size(self) -> int:
        batch_sizes = []
        for d in self._child_cards:
            batch_sizes.append(d.batch_size)
        return check_settings_constant_across_devices(batch_sizes, __name__)

    def set_batch_size(self, batch_size: int) -> None:
        for d in self._child_cards:
            d.set_batch_size(batch_size)

    def force_trigger(self) -> None:
        for d in self._child_cards:
            d.force_trigger()

    @property
    def type(self) -> CardType:
        return self._child_cards[0].type

    @property
    def model_number(self) -> ModelNumber:
        return self._child_cards[0].model_number

    @property
    def analog_channels(self) -> Sequence[SpectrumDigitiserAnalogChannelInterface]:
        """A tuple containing of all the channels of the child cards of the hub. See `AbstractSpectrumCard.channels` for
        more information.

        Returns:
            channels (Sequence[`SpectrumDigitiserAnalogChannelInterface`]):
            A tuple of `SpectrumDigitiserAnalogChannelInterface` objects.
        """
        return super().analog_channels

Composite class of SpectrumDigitiserCard for controlling a StarHub digitiser device, for example the Spectrum NetBox. StarHub digitiser devices are composites of more than one Spectrum digitiser card. Acquisition from the child cards of a StarHub is synchronised, aggregating the channels of all child cards. This class enables the control of a StarHub device as if it were a single Spectrum card.

#   SpectrumDigitiserStarHub( device_number: int, child_cards: tuple[spectrumdevice.devices.digitiser.digitiser_card.SpectrumDigitiserCard, ...], master_card_index: int )
View Source
    def __init__(self, device_number: int, child_cards: tuple[SpectrumDigitiserCard, ...], master_card_index: int):
        """
        Args:
            device_number (int): The index of the StarHub to connect to. If only one StarHub is present, set to 0.
            child_cards (Sequence[`SpectrumDigitiserCard`]): A list of `SpectrumCard` objects defining the child cards
                located within the StarHub, correctly constructed with their IP addresses and/or device numbers.
            master_card_index (int): The position within child_cards where the master card (the card which controls the
                clock) is located.
        """
        super().__init__(device_number=device_number, child_cards=child_cards, master_card_index=master_card_index)
        self._acquisition_mode = self.acquisition_mode
Args
  • device_number (int): The index of the StarHub to connect to. If only one StarHub is present, set to 0.
  • child_cards (Sequence[SpectrumDigitiserCard]): A list of SpectrumCard objects defining the child cards located within the StarHub, correctly constructed with their IP addresses and/or device numbers.
  • master_card_index (int): The position within child_cards where the master card (the card which controls the clock) is located.
#   def define_transfer_buffer( self, buffer: Optional[Sequence[spectrumdevice.settings.transfer_buffer.TransferBuffer]] = None ) -> None:
View Source
    def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None:
        """Create or provide `CardToPCDataTransferBuffer` objects for receiving acquired samples from the child cards.
        If no buffers are provided, they will be created with the correct size and a board_memory_offset_bytes of 0. See
        `SpectrumDigitiserCard.define_transfer_buffer()` for more information

        Args:
            buffer (Optional[`CardToPCDataTransferBuffer`]): A list containing pre-constructed
            `CardToPCDataTransferBuffer` objects, one for each child card. The size of the buffers should be chosen
            according to the current number of active channels in each card and the acquisition length.
        """
        if buffer:
            for card, buff in zip(self._child_cards, buffer):
                card.define_transfer_buffer([buff])
        else:
            for card in self._child_cards:
                card.define_transfer_buffer()

Create or provide CardToPCDataTransferBuffer objects for receiving acquired samples from the child cards. If no buffers are provided, they will be created with the correct size and a board_memory_offset_bytes of 0. See SpectrumDigitiserCard.define_transfer_buffer() for more information

Args
  • buffer (Optional[CardToPCDataTransferBuffer]): A list containing pre-constructed
  • CardToPCDataTransferBuffer objects, one for each child card. The size of the buffers should be chosen
  • according to the current number of active channels in each card and the acquisition length.
#   def wait_for_acquisition_to_complete(self) -> None:
View Source
    def wait_for_acquisition_to_complete(self) -> None:
        """Wait for each card to finish its acquisition. See `SpectrumDigitiserCard.wait_for_acquisition_to_complete()`
        for more information."""
        for card in self._child_cards:
            card.wait_for_acquisition_to_complete()

Wait for each card to finish its acquisition. See SpectrumDigitiserCard.wait_for_acquisition_to_complete() for more information.

#   def get_waveforms(self) -> List[List[numpy.ndarray[Any, numpy.dtype[numpy.float64]]]]:
View Source
    def get_waveforms(self) -> List[List[NDArray[float_]]]:
        """Get a list of the most recently transferred waveforms.

        This method gets the waveforms from each child card and joins them into a new list, ordered by channel number.
        See `SpectrumDigitiserCard.get_waveforms()` for more information.

        Args:
            num_acquisitions (int): For FIFO mode:  the number of acquisitions (i.e. trigger events) to wait for and
            copy. Acquiring in batches (num_acquisitions > 1) can improve performance.

        Returns:
            waveforms (List[List[NDArray[float_]]]): A list lists of 1D numpy arrays, one inner list per acquisition,
              and one array per enabled channel, in channel order.
        """
        card_ids_and_waveform_sets: Dict[str, list[list[NDArray[float_]]]] = {}

        def _get_waveforms(digitiser_card: SpectrumDigitiserCard) -> None:
            this_cards_waveforms = digitiser_card.get_waveforms()
            card_ids_and_waveform_sets[str(digitiser_card)] = this_cards_waveforms

        threads = [Thread(target=_get_waveforms, args=(card,)) for card in self._child_cards]

        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

        waveform_sets_all_cards_ordered = []
        for n in range(self.batch_size):
            waveforms_in_this_batch = []
            for card in self._child_cards:
                waveforms_in_this_batch += card_ids_and_waveform_sets[str(card)][n]
            waveform_sets_all_cards_ordered.append(waveforms_in_this_batch)

        return waveform_sets_all_cards_ordered

Get a list of the most recently transferred waveforms.

This method gets the waveforms from each child card and joins them into a new list, ordered by channel number. See SpectrumDigitiserCard.get_waveforms() for more information.

Args
  • num_acquisitions (int): For FIFO mode: the number of acquisitions (i.e. trigger events) to wait for and
  • copy. Acquiring in batches (num_acquisitions > 1) can improve performance.
Returns

waveforms (List[List[NDArray[float_]]]): A list lists of 1D numpy arrays, one inner list per acquisition, and one array per enabled channel, in channel order.

#   def get_timestamp(self) -> Optional[datetime.datetime]:
View Source
    def get_timestamp(self) -> Optional[datetime.datetime]:
        """Get timestamp for the last acquisition"""
        return self._triggering_card.get_timestamp()

Get timestamp for the last acquisition

#   def enable_timestamping(self) -> None:
View Source
    def enable_timestamping(self) -> None:
        self._triggering_card.enable_timestamping()
#   acquisition_length_in_samples: int

The currently set recording length, which should be the same for all child cards. If different recording lengths are set, an exception is raised. See SpectrumDigitiserCard.acquisition_length_in_samples for more information.

Returns

length_in_samples: The currently set acquisition length in samples.

#   def set_acquisition_length_in_samples(self, length_in_samples: int) -> None:
View Source
    def set_acquisition_length_in_samples(self, length_in_samples: int) -> None:
        """Set a new recording length for all child cards. See `SpectrumDigitiserCard.set_acquisition_length_in_samples()`
        for more information.

        Args:
            length_in_samples (int): The desired acquisition length in samples."""
        for d in self._child_cards:
            d.set_acquisition_length_in_samples(length_in_samples)

Set a new recording length for all child cards. See SpectrumDigitiserCard.set_acquisition_length_in_samples() for more information.

Args
  • length_in_samples (int): The desired acquisition length in samples.
#   post_trigger_length_in_samples: int

The number of samples recorded after a trigger is received. This should be consistent across all child cards. If different values are found across the child cards, an exception is raised. See SpectrumDigitiserCard.post_trigger_length_in_samples for more information.

Returns

length_in_samples (int): The current post trigger length in samples.

#   def set_post_trigger_length_in_samples(self, length_in_samples: int) -> None:
View Source
    def set_post_trigger_length_in_samples(self, length_in_samples: int) -> None:
        """Set a new post trigger length for all child cards. See `SpectrumDigitiserCard.set_post_trigger_length_in_samples()`
        for more information.

        Args:
            length_in_samples (int): The desired post trigger length in samples.
        """
        for d in self._child_cards:
            d.set_post_trigger_length_in_samples(length_in_samples)

Set a new post trigger length for all child cards. See SpectrumDigitiserCard.set_post_trigger_length_in_samples() for more information.

Args
  • length_in_samples (int): The desired post trigger length in samples.

The acquisition mode, which should be the same for all child cards. If it's not, an exception is raised. See SpectrumDigitiserCard.acquisition_mode for more information.

Returns

mode (AcquisitionMode): The currently enabled acquisition mode.

#   def set_acquisition_mode( self, mode: spectrumdevice.settings.device_modes.AcquisitionMode ) -> None:
View Source
    def set_acquisition_mode(self, mode: AcquisitionMode) -> None:
        """Change the acquisition mode for all child cards. See `SpectrumDigitiserCard.set_acquisition_mode()` for more
        information.

        Args:
            mode (`AcquisitionMode`): The desired acquisition mode."""
        for d in self._child_cards:
            d.set_acquisition_mode(mode)

Change the acquisition mode for all child cards. See SpectrumDigitiserCard.set_acquisition_mode() for more information.

Args
  • mode (AcquisitionMode): The desired acquisition mode.
#   batch_size: int
#   def set_batch_size(self, batch_size: int) -> None:
View Source
    def set_batch_size(self, batch_size: int) -> None:
        for d in self._child_cards:
            d.set_batch_size(batch_size)
#   def force_trigger(self) -> None:
View Source
    def force_trigger(self) -> None:
        for d in self._child_cards:
            d.force_trigger()

A tuple containing of all the channels of the child cards of the hub. See AbstractSpectrumCard.channels for more information.

Returns

channels (Sequence[SpectrumDigitiserAnalogChannelInterface]): A tuple of SpectrumDigitiserAnalogChannelInterface objects.

View Source
class MockSpectrumDigitiserCard(MockAbstractSpectrumDigitiser, MockAbstractSpectrumCard, SpectrumDigitiserCard):
    """A mock spectrum card, for testing software written to use the `SpectrumDigitiserCard` class.

    This class overrides methods of `SpectrumDigitiserCard` that communicate with hardware with mocked implementations,
    allowing software to be tested without Spectrum hardware connected or drivers installed, e.g. during CI. It overrides
    methods to use to set up a mock 'on-device buffer' attribute into which a mock waveform source will write
    samples. It also uses a MockTimestamper to generated timestamps for mock waveforms.
    """

    def __init__(
        self,
        device_number: int,
        model: ModelNumber,
        mock_source_frame_rate_hz: float,
        num_modules: int,
        num_channels_per_module: int,
        card_features: Optional[list[CardFeature]] = None,
        advanced_card_features: Optional[list[AdvancedCardFeature]] = None,
    ):
        """
        Args:
            device_number (int): The index of the mock device to create. Used to create a name for the device which is
                used internally.
            model (ModelNumber): The model of card to mock. Affects the allowed acquisition and post-trigger lengths.
            mock_source_frame_rate_hz (float): Rate at which waveforms will be generated by the mock source providing
                data to the mock spectrum card.
            num_modules (int): The number of internal modules to assign the mock card. Default 2. On real hardware, this
                is read from the device so does not need to be set. See the Spectrum documentation to work out how many
                modules your hardware has.
            num_channels_per_module (int): The number of channels per module. Default 4 (so 8 channels in total). On
                real hardware, this is read from the device so does not need to be set.
            card_features (list[CardFeature]): List of available features of the mock device
            advanced_card_features (list[AdvancedCardFeature]): List of available advanced features of the mock device

        """

        super().__init__(
            device_number=device_number,
            model=model,
            mock_source_frame_rate_hz=mock_source_frame_rate_hz,
            num_modules=num_modules,
            num_channels_per_module=num_channels_per_module,
            card_type=CardType.SPCM_TYPE_AI,
            card_features=card_features if card_features is not None else [],
            advanced_card_features=advanced_card_features if advanced_card_features is not None else [],
        )
        self._connect(self._visa_string)
        self._acquisition_mode = self.acquisition_mode
        self._previous_transfer_chunk_count = 0
        self._param_dict[TRANSFER_CHUNK_COUNTER] = 0

    def enable_timestamping(self) -> None:
        self._timestamper: MockTimestamper = MockTimestamper(self, self._handle)

    def set_acquisition_mode(self, mode: AcquisitionMode) -> None:
        """Mock timestamper needs to be recreated if the acquisition mode is changed."""
        super().set_acquisition_mode(mode)
        self._timestamper = MockTimestamper(self, self._handle)

    def set_sample_rate_in_hz(self, rate: int) -> None:
        """Mock timestamper needs to be recreated if the sample rate is changed."""
        super().set_sample_rate_in_hz(rate)
        self._timestamper = MockTimestamper(self, self._handle)

    def set_acquisition_length_in_samples(self, length_in_samples: int) -> None:
        """Set length of mock recording (per channel). In FIFO mode, this will be quantised to the nearest 8 samples.
        See `SpectrumDigitiserCard` for more information. This method is overridden here only so that the internal
        attributes related to the mock on-device buffer can be set.

        Args:
            length_in_samples (int): Number of samples in each generated mock waveform
        """
        super().set_acquisition_length_in_samples(length_in_samples)

    def set_enabled_analog_channels(self, channels_nums: List[int]) -> None:
        """Set the channels to enable for the mock acquisition. See `SpectrumDigitiserCard` for more information. This
        method is overridden here only so that the internal attributes related to the mock on-device buffer
        can be set.

        Args:
            channels_nums (List[int]): List of mock channel indices to enable, e.g. [0, 1, 2].

        """
        if len(list(filter(lambda x: 0 <= x < len(self.analog_channels), channels_nums))) == len(channels_nums):
            super().set_enabled_analog_channels(channels_nums)
        else:
            raise SpectrumSettingsMismatchError("Not enough channels in mock device configuration.")

    def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None:
        """Create or provide a `TransferBuffer` object for receiving acquired samples from the device.

        See SpectrumDigitiserCard.define_transfer_buffer(). This mock implementation is identical apart from that it
        does not write to any hardware device."""
        self._set_or_update_transfer_buffer_attribute(buffer)

    def start_transfer(self) -> None:
        """See `SpectrumDigitiserCard.start_transfer()`."""
        pass

    def stop_transfer(self) -> None:
        """See `SpectrumDigitiserCard.stop_transfer()`."""
        pass

    def wait_for_transfer_chunk_to_complete(self) -> None:
        """See `SpectrumDigitiserCard.wait_for_transfer_chunk_to_complete()`. This mock implementation blocks until a
        new mock transfer has been completed by waiting for a change to TRANSFER_CHUNK_COUNTER."""
        if self._transfer_buffer:
            t0 = perf_counter()
            t_elapsed = 0.0
            while (
                self._previous_transfer_chunk_count == self._param_dict[TRANSFER_CHUNK_COUNTER]
            ) and t_elapsed < MOCK_TRANSFER_TIMEOUT_IN_S:
                sleep(0.1)
                t_elapsed = perf_counter() - t0
            self._previous_transfer_chunk_count = self._param_dict[TRANSFER_CHUNK_COUNTER]
        else:
            raise SpectrumNoTransferBufferDefined("No transfer in progress.")

    def wait_for_acquisition_to_complete(self) -> None:
        """See `SpectrumDigitiserCard.wait_for_acquisition_to_complete()`. This mock implementation blocks until a mock
        acquisition has been completed (i.e. the acquisition thread has shut down) or the request has timed out
        according to the `self.timeout_ms attribute`."""
        if self._acquisition_thread is not None:
            self._acquisition_thread.join(timeout=1e-3 * self.timeout_in_ms)
            if self._acquisition_thread.is_alive():
                logger.warning("A timeout occurred while waiting for mock acquisition to complete.")
        else:
            logger.warning("No acquisition in progress. Wait for acquisition to complete has no effect")

A mock spectrum card, for testing software written to use the SpectrumDigitiserCard class.

This class overrides methods of SpectrumDigitiserCard that communicate with hardware with mocked implementations, allowing software to be tested without Spectrum hardware connected or drivers installed, e.g. during CI. It overrides methods to use to set up a mock 'on-device buffer' attribute into which a mock waveform source will write samples. It also uses a MockTimestamper to generated timestamps for mock waveforms.

#   MockSpectrumDigitiserCard( device_number: int, model: spectrumdevice.settings.card_dependent_properties.ModelNumber, mock_source_frame_rate_hz: float, num_modules: int, num_channels_per_module: int, card_features: Optional[list[spectrumdevice.settings.card_features.CardFeature]] = None, advanced_card_features: Optional[list[spectrumdevice.settings.card_features.AdvancedCardFeature]] = None )
View Source
    def __init__(
        self,
        device_number: int,
        model: ModelNumber,
        mock_source_frame_rate_hz: float,
        num_modules: int,
        num_channels_per_module: int,
        card_features: Optional[list[CardFeature]] = None,
        advanced_card_features: Optional[list[AdvancedCardFeature]] = None,
    ):
        """
        Args:
            device_number (int): The index of the mock device to create. Used to create a name for the device which is
                used internally.
            model (ModelNumber): The model of card to mock. Affects the allowed acquisition and post-trigger lengths.
            mock_source_frame_rate_hz (float): Rate at which waveforms will be generated by the mock source providing
                data to the mock spectrum card.
            num_modules (int): The number of internal modules to assign the mock card. Default 2. On real hardware, this
                is read from the device so does not need to be set. See the Spectrum documentation to work out how many
                modules your hardware has.
            num_channels_per_module (int): The number of channels per module. Default 4 (so 8 channels in total). On
                real hardware, this is read from the device so does not need to be set.
            card_features (list[CardFeature]): List of available features of the mock device
            advanced_card_features (list[AdvancedCardFeature]): List of available advanced features of the mock device

        """

        super().__init__(
            device_number=device_number,
            model=model,
            mock_source_frame_rate_hz=mock_source_frame_rate_hz,
            num_modules=num_modules,
            num_channels_per_module=num_channels_per_module,
            card_type=CardType.SPCM_TYPE_AI,
            card_features=card_features if card_features is not None else [],
            advanced_card_features=advanced_card_features if advanced_card_features is not None else [],
        )
        self._connect(self._visa_string)
        self._acquisition_mode = self.acquisition_mode
        self._previous_transfer_chunk_count = 0
        self._param_dict[TRANSFER_CHUNK_COUNTER] = 0
Args
  • device_number (int): The index of the mock device to create. Used to create a name for the device which is used internally.
  • model (ModelNumber): The model of card to mock. Affects the allowed acquisition and post-trigger lengths.
  • mock_source_frame_rate_hz (float): Rate at which waveforms will be generated by the mock source providing data to the mock spectrum card.
  • num_modules (int): The number of internal modules to assign the mock card. Default 2. On real hardware, this is read from the device so does not need to be set. See the Spectrum documentation to work out how many modules your hardware has.
  • num_channels_per_module (int): The number of channels per module. Default 4 (so 8 channels in total). On real hardware, this is read from the device so does not need to be set.
  • card_features (list[CardFeature]): List of available features of the mock device
  • advanced_card_features (list[AdvancedCardFeature]): List of available advanced features of the mock device
#   def enable_timestamping(self) -> None:
View Source
    def enable_timestamping(self) -> None:
        self._timestamper: MockTimestamper = MockTimestamper(self, self._handle)
#   def set_acquisition_mode( self, mode: spectrumdevice.settings.device_modes.AcquisitionMode ) -> None:
View Source
    def set_acquisition_mode(self, mode: AcquisitionMode) -> None:
        """Mock timestamper needs to be recreated if the acquisition mode is changed."""
        super().set_acquisition_mode(mode)
        self._timestamper = MockTimestamper(self, self._handle)

Mock timestamper needs to be recreated if the acquisition mode is changed.

#   def set_sample_rate_in_hz(self, rate: int) -> None:
View Source
    def set_sample_rate_in_hz(self, rate: int) -> None:
        """Mock timestamper needs to be recreated if the sample rate is changed."""
        super().set_sample_rate_in_hz(rate)
        self._timestamper = MockTimestamper(self, self._handle)

Mock timestamper needs to be recreated if the sample rate is changed.

#   def set_acquisition_length_in_samples(self, length_in_samples: int) -> None:
View Source
    def set_acquisition_length_in_samples(self, length_in_samples: int) -> None:
        """Set length of mock recording (per channel). In FIFO mode, this will be quantised to the nearest 8 samples.
        See `SpectrumDigitiserCard` for more information. This method is overridden here only so that the internal
        attributes related to the mock on-device buffer can be set.

        Args:
            length_in_samples (int): Number of samples in each generated mock waveform
        """
        super().set_acquisition_length_in_samples(length_in_samples)

Set length of mock recording (per channel). In FIFO mode, this will be quantised to the nearest 8 samples. See SpectrumDigitiserCard for more information. This method is overridden here only so that the internal attributes related to the mock on-device buffer can be set.

Args
  • length_in_samples (int): Number of samples in each generated mock waveform
#   def set_enabled_analog_channels(self, channels_nums: List[int]) -> None:
View Source
    def set_enabled_analog_channels(self, channels_nums: List[int]) -> None:
        """Set the channels to enable for the mock acquisition. See `SpectrumDigitiserCard` for more information. This
        method is overridden here only so that the internal attributes related to the mock on-device buffer
        can be set.

        Args:
            channels_nums (List[int]): List of mock channel indices to enable, e.g. [0, 1, 2].

        """
        if len(list(filter(lambda x: 0 <= x < len(self.analog_channels), channels_nums))) == len(channels_nums):
            super().set_enabled_analog_channels(channels_nums)
        else:
            raise SpectrumSettingsMismatchError("Not enough channels in mock device configuration.")

Set the channels to enable for the mock acquisition. See SpectrumDigitiserCard for more information. This method is overridden here only so that the internal attributes related to the mock on-device buffer can be set.

Args
  • channels_nums (List[int]): List of mock channel indices to enable, e.g. [0, 1, 2].
#   def define_transfer_buffer( self, buffer: Optional[Sequence[spectrumdevice.settings.transfer_buffer.TransferBuffer]] = None ) -> None:
View Source
    def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None:
        """Create or provide a `TransferBuffer` object for receiving acquired samples from the device.

        See SpectrumDigitiserCard.define_transfer_buffer(). This mock implementation is identical apart from that it
        does not write to any hardware device."""
        self._set_or_update_transfer_buffer_attribute(buffer)

Create or provide a TransferBuffer object for receiving acquired samples from the device.

See SpectrumDigitiserCard.define_transfer_buffer(). This mock implementation is identical apart from that it does not write to any hardware device.

#   def start_transfer(self) -> None:
View Source
    def start_transfer(self) -> None:
        """See `SpectrumDigitiserCard.start_transfer()`."""
        pass
#   def stop_transfer(self) -> None:
View Source
    def stop_transfer(self) -> None:
        """See `SpectrumDigitiserCard.stop_transfer()`."""
        pass
#   def wait_for_transfer_chunk_to_complete(self) -> None:
View Source
    def wait_for_transfer_chunk_to_complete(self) -> None:
        """See `SpectrumDigitiserCard.wait_for_transfer_chunk_to_complete()`. This mock implementation blocks until a
        new mock transfer has been completed by waiting for a change to TRANSFER_CHUNK_COUNTER."""
        if self._transfer_buffer:
            t0 = perf_counter()
            t_elapsed = 0.0
            while (
                self._previous_transfer_chunk_count == self._param_dict[TRANSFER_CHUNK_COUNTER]
            ) and t_elapsed < MOCK_TRANSFER_TIMEOUT_IN_S:
                sleep(0.1)
                t_elapsed = perf_counter() - t0
            self._previous_transfer_chunk_count = self._param_dict[TRANSFER_CHUNK_COUNTER]
        else:
            raise SpectrumNoTransferBufferDefined("No transfer in progress.")

See SpectrumDigitiserCard.wait_for_transfer_chunk_to_complete(). This mock implementation blocks until a new mock transfer has been completed by waiting for a change to TRANSFER_CHUNK_COUNTER.

#   def wait_for_acquisition_to_complete(self) -> None:
View Source
    def wait_for_acquisition_to_complete(self) -> None:
        """See `SpectrumDigitiserCard.wait_for_acquisition_to_complete()`. This mock implementation blocks until a mock
        acquisition has been completed (i.e. the acquisition thread has shut down) or the request has timed out
        according to the `self.timeout_ms attribute`."""
        if self._acquisition_thread is not None:
            self._acquisition_thread.join(timeout=1e-3 * self.timeout_in_ms)
            if self._acquisition_thread.is_alive():
                logger.warning("A timeout occurred while waiting for mock acquisition to complete.")
        else:
            logger.warning("No acquisition in progress. Wait for acquisition to complete has no effect")

See SpectrumDigitiserCard.wait_for_acquisition_to_complete(). This mock implementation blocks until a mock acquisition has been completed (i.e. the acquisition thread has shut down) or the request has timed out according to the self.timeout_ms attribute.

View Source
class MockSpectrumDigitiserStarHub(MockAbstractSpectrumStarHub, SpectrumDigitiserStarHub):
    """A mock spectrum StarHub, for testing software written to use the `SpectrumStarHub` class.

    Overrides methods of `SpectrumStarHub` and `AbstractSpectrumDigitiser` that communicate with hardware with mocked
    implementations allowing software to be tested without Spectrum hardware connected or drivers installed, e.g. during
    CI."""

    def __init__(self, **kwargs: Any):
        """
        Args:
            child_cards (Sequence[`MockSpectrumDigitiserCard`]): A list of `MockSpectrumDigitiserCard` objects defining the
                properties of the child cards located within the mock hub.
            master_card_index (int): The position within child_cards where the master card (the card which controls the
                clock) is located.
        """
        super().__init__(param_dict=None, **kwargs)
        self._visa_string = "/mock" + self._visa_string
        self._connect(self._visa_string)
        self._acquisition_mode = self.acquisition_mode

    def start(self) -> None:
        """Start a mock acquisition

        See `AbstractSpectrumDevice.start()`. With a hardware device, StarHub's only need to be sent a single
        instruction to start acquisition, which they automatically relay to their child cards - hence why
        `start` is implemented in `AbstractSpectrumDevice` (base class to both `SpectrumDigitiserCard` and
        `SpectrumStarHub`) rather than in `SpectrumStarHub`. In this mock `implementation`, each card's acquisition is
        started individually.

        """
        for card in self._child_cards:
            card.start()

    def stop(self) -> None:
        """Stop a mock acquisition

        See `AbstractSpectrumDevice.stop_acquisition`. With a hardware device, StarHub's only need to be sent a single
        instruction to stop acquisition, which they automatically relay to their child cards - hence why
        `stop_acquisition()` is implemented in `AbstractSpectrumDevice` (base class to both `SpectrumDigitiserCard` and
        `SpectrumStarHub`) rather than in `SpectrumStarHub`. In this mock implementation, each card's acquisition is
        stopped individually.

        """
        for card in self._child_cards:
            card.stop()

A mock spectrum StarHub, for testing software written to use the SpectrumStarHub class.

Overrides methods of SpectrumStarHub and AbstractSpectrumDigitiser that communicate with hardware with mocked implementations allowing software to be tested without Spectrum hardware connected or drivers installed, e.g. during CI.

#   MockSpectrumDigitiserStarHub(**kwargs: Any)
View Source
    def __init__(self, **kwargs: Any):
        """
        Args:
            child_cards (Sequence[`MockSpectrumDigitiserCard`]): A list of `MockSpectrumDigitiserCard` objects defining the
                properties of the child cards located within the mock hub.
            master_card_index (int): The position within child_cards where the master card (the card which controls the
                clock) is located.
        """
        super().__init__(param_dict=None, **kwargs)
        self._visa_string = "/mock" + self._visa_string
        self._connect(self._visa_string)
        self._acquisition_mode = self.acquisition_mode
Args
  • child_cards (Sequence[MockSpectrumDigitiserCard]): A list of MockSpectrumDigitiserCard objects defining the properties of the child cards located within the mock hub.
  • master_card_index (int): The position within child_cards where the master card (the card which controls the clock) is located.
#   def start(self) -> None:
View Source
    def start(self) -> None:
        """Start a mock acquisition

        See `AbstractSpectrumDevice.start()`. With a hardware device, StarHub's only need to be sent a single
        instruction to start acquisition, which they automatically relay to their child cards - hence why
        `start` is implemented in `AbstractSpectrumDevice` (base class to both `SpectrumDigitiserCard` and
        `SpectrumStarHub`) rather than in `SpectrumStarHub`. In this mock `implementation`, each card's acquisition is
        started individually.

        """
        for card in self._child_cards:
            card.start()

Start a mock acquisition

See AbstractSpectrumDevice.start(). With a hardware device, StarHub's only need to be sent a single instruction to start acquisition, which they automatically relay to their child cards - hence why start is implemented in AbstractSpectrumDevice (base class to both SpectrumDigitiserCard and SpectrumStarHub) rather than in SpectrumStarHub. In this mock implementation, each card's acquisition is started individually.

#   def stop(self) -> None:
View Source
    def stop(self) -> None:
        """Stop a mock acquisition

        See `AbstractSpectrumDevice.stop_acquisition`. With a hardware device, StarHub's only need to be sent a single
        instruction to stop acquisition, which they automatically relay to their child cards - hence why
        `stop_acquisition()` is implemented in `AbstractSpectrumDevice` (base class to both `SpectrumDigitiserCard` and
        `SpectrumStarHub`) rather than in `SpectrumStarHub`. In this mock implementation, each card's acquisition is
        stopped individually.

        """
        for card in self._child_cards:
            card.stop()

Stop a mock acquisition

See AbstractSpectrumDevice.stop_acquisition. With a hardware device, StarHub's only need to be sent a single instruction to stop acquisition, which they automatically relay to their child cards - hence why stop_acquisition() is implemented in AbstractSpectrumDevice (base class to both SpectrumDigitiserCard and SpectrumStarHub) rather than in SpectrumStarHub. In this mock implementation, each card's acquisition is stopped individually.

View Source
class AbstractSpectrumDigitiser(
    AbstractSpectrumDevice[SpectrumDigitiserAnalogChannelInterface, SpectrumDigitiserIOLineInterface],
    SpectrumDigitiserInterface,
    ABC,
):
    """Abstract superclass which implements methods common to all Spectrum digitiser devices. Instances of this class
    cannot be constructed directly. Instead, construct instances of the concrete classes (`SpectrumDigitiserCard`,
    `SpectrumDigitiserStarHub` or their mock equivalents) which inherit the methods defined here. Note that
    the mock devices override several of the methods defined here."""

    def configure_acquisition(self, settings: AcquisitionSettings) -> None:
        """Apply all the settings contained in an `AcquisitionSettings` dataclass to the device.

        Args:
            settings (`AcquisitionSettings`): An `AcquisitionSettings` dataclass containing the setting values to apply.
        """
        if settings.batch_size > 1 and settings.acquisition_mode == AcquisitionMode.SPC_REC_STD_SINGLE:
            raise ValueError("In standard single mode, only 1 acquisition can be downloaded at a time.")
        self._acquisition_mode = settings.acquisition_mode
        self.set_batch_size(settings.batch_size)
        self.set_acquisition_mode(settings.acquisition_mode)
        self.set_sample_rate_in_hz(settings.sample_rate_in_hz)
        self.set_acquisition_length_in_samples(settings.acquisition_length_in_samples)
        self.set_post_trigger_length_in_samples(
            settings.acquisition_length_in_samples - settings.pre_trigger_length_in_samples
        )
        self.set_timeout_in_ms(settings.timeout_in_ms)
        self.set_enabled_analog_channels(settings.enabled_channels)

        # Apply channel dependent settings
        for channel_num, v_range, v_offset, impedance in zip(
            self.enabled_analog_channel_nums,
            settings.vertical_ranges_in_mv,
            settings.vertical_offsets_in_percent,
            settings.input_impedances,
        ):
            channel = self.analog_channels[channel_num]
            channel.set_vertical_range_in_mv(v_range)
            channel.set_vertical_offset_in_percent(v_offset)
            channel.set_input_impedance(impedance)

        # Only some hardware has software programmable input coupling, so coupling can be None
        if settings.input_couplings is not None:
            for channel, coupling in zip(self.analog_channels, settings.input_couplings):
                channel.set_input_coupling(coupling)

        # Only some hardware has software programmable input paths, so it can be None
        if settings.input_paths is not None:
            for channel, path in zip(self.analog_channels, settings.input_paths):
                channel.set_input_path(path)

        # Write the configuration to the card
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP)

        if settings.timestamping_enabled:
            self.enable_timestamping()

    def execute_standard_single_acquisition(self) -> Measurement:
        """Carry out a single measurement in standard single mode and return the acquired waveforms.

        This method automatically carries out a standard single mode acquisition, including handling the creation
        of a `TransferBuffer` and the retrieval of the acquired waveforms. After being called, it will wait until a
        trigger event is received before carrying out the acquisition and then transferring and returning the acquired
        waveforms. The device must be configured in SPC_REC_STD_SINGLE acquisition mode.

        Returns:
            measurement (Measurement): A Measurement object. The `.waveforms` attribute of `measurement` will be a list
                of 1D NumPy arrays, each array containing the waveform data received on one channel, in channel order.
                The Waveform object also has a timestamp attribute, which (if timestamping was enabled in acquisition
                settings) contains the time at which the acquisition was triggered.
        """
        if self._acquisition_mode != AcquisitionMode.SPC_REC_STD_SINGLE:
            raise SpectrumWrongAcquisitionMode(
                "Set the acquisition mode to SPC_REC_STD_SINGLE using "
                "configure_acquisition() or set_acquisition_mode() before executing "
                "a standard single mode acquisition."
            )
        self.start()
        self.wait_for_acquisition_to_complete()
        self.define_transfer_buffer()
        self.start_transfer()
        self.wait_for_transfer_chunk_to_complete()
        waveforms = self.get_waveforms()[0]
        self.stop()  # Only strictly required for Mock devices. Should not affect hardware.
        return Measurement(waveforms=waveforms, timestamp=self.get_timestamp())

    def execute_finite_fifo_acquisition(self, num_measurements: int) -> List[Measurement]:
        """Carry out a finite number of FIFO mode measurements and then stop the acquisitions.

        This method automatically carries out a defined number of measurement in Multi FIFO mode, including handling the
        creation of a `TransferBuffer`, streaming the acquired waveforms to the PC, terminating the acquisition and
        returning the acquired waveforms. After being called, it will wait for the requested number of triggers to be
        received, generating the correct number of measurements. It retrieves each measurement's waveforms from the
        `TransferBuffer` as they arrive. Once the requested number of measurements have been received, the acquisition
        is terminated and the waveforms are returned. The device must be configured in SPC_REC_FIFO_MULTI or
        SPC_REC_FIFO_AVERAGE acquisition mode.

        Args:
            num_measurements (int): The number of measurements to carry out.
        Returns:
            measurements (List[Measurement]): A list of Measurement objects with length `num_measurements`. Each
                Measurement object has a `waveforms` attribute containing a list of 1D NumPy arrays. Each array is a
                waveform acquired from one channel. The arrays are in channel order. The Waveform objects also have a
                timestamp attribute, which (if timestamping was enabled in acquisition settings) contains the time at
                which the acquisition was triggered.
        """
        if (num_measurements % self.batch_size) != 0:
            raise ValueError(
                "Number of measurements in a finite FIFO acquisition must be a multiple of the "
                " batch size configured using AbstractSpectrumDigitiser.configure_acquisition()."
            )
        self.execute_continuous_fifo_acquisition()
        measurements = []
        for _ in range(num_measurements // self.batch_size):
            measurements += [
                Measurement(waveforms=frame, timestamp=self.get_timestamp()) for frame in self.get_waveforms()
            ]
        self.stop()
        return measurements

    def execute_continuous_fifo_acquisition(self) -> None:
        """Start a continuous FIFO mode acquisition.

        This method automatically starts acquiring and streaming samples in FIFO mode, including handling the
        creation of a `TransferBuffer` and streaming the acquired waveforms to the PC. It will return almost
        instantaneously. The acquired waveforms must then be read out of the transfer buffer in a loop using the
        `get_waveforms()` method. Waveforms must be read at least as fast as they are being acquired.
        The FIFO acquisition and streaming will continue until `stop_acquisition()` is called. The device
        must be configured in SPC_REC_FIFO_MULTI or SPC_REC_FIFO_AVERAGE acquisition mode."""
        if self._acquisition_mode not in (AcquisitionMode.SPC_REC_FIFO_MULTI, AcquisitionMode.SPC_REC_FIFO_AVERAGE):
            raise SpectrumWrongAcquisitionMode(
                "Set the acquisition mode to SPC_REC_FIFO_MULTI or SPC_REC_FIFO_AVERAGE using "
                "configure_acquisition() or set_acquisition_mode() before executing "
                "a fifo mode acquisition."
            )
        self.define_transfer_buffer()
        self.start()
        self.start_transfer()

Abstract superclass which implements methods common to all Spectrum digitiser devices. Instances of this class cannot be constructed directly. Instead, construct instances of the concrete classes (SpectrumDigitiserCard, SpectrumDigitiserStarHub or their mock equivalents) which inherit the methods defined here. Note that the mock devices override several of the methods defined here.

#   def configure_acquisition(self, settings: spectrumdevice.settings.AcquisitionSettings) -> None:
View Source
    def configure_acquisition(self, settings: AcquisitionSettings) -> None:
        """Apply all the settings contained in an `AcquisitionSettings` dataclass to the device.

        Args:
            settings (`AcquisitionSettings`): An `AcquisitionSettings` dataclass containing the setting values to apply.
        """
        if settings.batch_size > 1 and settings.acquisition_mode == AcquisitionMode.SPC_REC_STD_SINGLE:
            raise ValueError("In standard single mode, only 1 acquisition can be downloaded at a time.")
        self._acquisition_mode = settings.acquisition_mode
        self.set_batch_size(settings.batch_size)
        self.set_acquisition_mode(settings.acquisition_mode)
        self.set_sample_rate_in_hz(settings.sample_rate_in_hz)
        self.set_acquisition_length_in_samples(settings.acquisition_length_in_samples)
        self.set_post_trigger_length_in_samples(
            settings.acquisition_length_in_samples - settings.pre_trigger_length_in_samples
        )
        self.set_timeout_in_ms(settings.timeout_in_ms)
        self.set_enabled_analog_channels(settings.enabled_channels)

        # Apply channel dependent settings
        for channel_num, v_range, v_offset, impedance in zip(
            self.enabled_analog_channel_nums,
            settings.vertical_ranges_in_mv,
            settings.vertical_offsets_in_percent,
            settings.input_impedances,
        ):
            channel = self.analog_channels[channel_num]
            channel.set_vertical_range_in_mv(v_range)
            channel.set_vertical_offset_in_percent(v_offset)
            channel.set_input_impedance(impedance)

        # Only some hardware has software programmable input coupling, so coupling can be None
        if settings.input_couplings is not None:
            for channel, coupling in zip(self.analog_channels, settings.input_couplings):
                channel.set_input_coupling(coupling)

        # Only some hardware has software programmable input paths, so it can be None
        if settings.input_paths is not None:
            for channel, path in zip(self.analog_channels, settings.input_paths):
                channel.set_input_path(path)

        # Write the configuration to the card
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP)

        if settings.timestamping_enabled:
            self.enable_timestamping()

Apply all the settings contained in an AcquisitionSettings dataclass to the device.

Args
  • settings (AcquisitionSettings): An AcquisitionSettings dataclass containing the setting values to apply.
#   def execute_standard_single_acquisition(self) -> spectrumdevice.measurement.Measurement:
View Source
    def execute_standard_single_acquisition(self) -> Measurement:
        """Carry out a single measurement in standard single mode and return the acquired waveforms.

        This method automatically carries out a standard single mode acquisition, including handling the creation
        of a `TransferBuffer` and the retrieval of the acquired waveforms. After being called, it will wait until a
        trigger event is received before carrying out the acquisition and then transferring and returning the acquired
        waveforms. The device must be configured in SPC_REC_STD_SINGLE acquisition mode.

        Returns:
            measurement (Measurement): A Measurement object. The `.waveforms` attribute of `measurement` will be a list
                of 1D NumPy arrays, each array containing the waveform data received on one channel, in channel order.
                The Waveform object also has a timestamp attribute, which (if timestamping was enabled in acquisition
                settings) contains the time at which the acquisition was triggered.
        """
        if self._acquisition_mode != AcquisitionMode.SPC_REC_STD_SINGLE:
            raise SpectrumWrongAcquisitionMode(
                "Set the acquisition mode to SPC_REC_STD_SINGLE using "
                "configure_acquisition() or set_acquisition_mode() before executing "
                "a standard single mode acquisition."
            )
        self.start()
        self.wait_for_acquisition_to_complete()
        self.define_transfer_buffer()
        self.start_transfer()
        self.wait_for_transfer_chunk_to_complete()
        waveforms = self.get_waveforms()[0]
        self.stop()  # Only strictly required for Mock devices. Should not affect hardware.
        return Measurement(waveforms=waveforms, timestamp=self.get_timestamp())

Carry out a single measurement in standard single mode and return the acquired waveforms.

This method automatically carries out a standard single mode acquisition, including handling the creation of a TransferBuffer and the retrieval of the acquired waveforms. After being called, it will wait until a trigger event is received before carrying out the acquisition and then transferring and returning the acquired waveforms. The device must be configured in SPC_REC_STD_SINGLE acquisition mode.

Returns

measurement (Measurement): A Measurement object. The .waveforms attribute of measurement will be a list of 1D NumPy arrays, each array containing the waveform data received on one channel, in channel order. The Waveform object also has a timestamp attribute, which (if timestamping was enabled in acquisition settings) contains the time at which the acquisition was triggered.

#   def execute_finite_fifo_acquisition( self, num_measurements: int ) -> List[spectrumdevice.measurement.Measurement]:
View Source
    def execute_finite_fifo_acquisition(self, num_measurements: int) -> List[Measurement]:
        """Carry out a finite number of FIFO mode measurements and then stop the acquisitions.

        This method automatically carries out a defined number of measurement in Multi FIFO mode, including handling the
        creation of a `TransferBuffer`, streaming the acquired waveforms to the PC, terminating the acquisition and
        returning the acquired waveforms. After being called, it will wait for the requested number of triggers to be
        received, generating the correct number of measurements. It retrieves each measurement's waveforms from the
        `TransferBuffer` as they arrive. Once the requested number of measurements have been received, the acquisition
        is terminated and the waveforms are returned. The device must be configured in SPC_REC_FIFO_MULTI or
        SPC_REC_FIFO_AVERAGE acquisition mode.

        Args:
            num_measurements (int): The number of measurements to carry out.
        Returns:
            measurements (List[Measurement]): A list of Measurement objects with length `num_measurements`. Each
                Measurement object has a `waveforms` attribute containing a list of 1D NumPy arrays. Each array is a
                waveform acquired from one channel. The arrays are in channel order. The Waveform objects also have a
                timestamp attribute, which (if timestamping was enabled in acquisition settings) contains the time at
                which the acquisition was triggered.
        """
        if (num_measurements % self.batch_size) != 0:
            raise ValueError(
                "Number of measurements in a finite FIFO acquisition must be a multiple of the "
                " batch size configured using AbstractSpectrumDigitiser.configure_acquisition()."
            )
        self.execute_continuous_fifo_acquisition()
        measurements = []
        for _ in range(num_measurements // self.batch_size):
            measurements += [
                Measurement(waveforms=frame, timestamp=self.get_timestamp()) for frame in self.get_waveforms()
            ]
        self.stop()
        return measurements

Carry out a finite number of FIFO mode measurements and then stop the acquisitions.

This method automatically carries out a defined number of measurement in Multi FIFO mode, including handling the creation of a TransferBuffer, streaming the acquired waveforms to the PC, terminating the acquisition and returning the acquired waveforms. After being called, it will wait for the requested number of triggers to be received, generating the correct number of measurements. It retrieves each measurement's waveforms from the TransferBuffer as they arrive. Once the requested number of measurements have been received, the acquisition is terminated and the waveforms are returned. The device must be configured in SPC_REC_FIFO_MULTI or SPC_REC_FIFO_AVERAGE acquisition mode.

Args
  • num_measurements (int): The number of measurements to carry out.
Returns

measurements (List[Measurement]): A list of Measurement objects with length num_measurements. Each Measurement object has a waveforms attribute containing a list of 1D NumPy arrays. Each array is a waveform acquired from one channel. The arrays are in channel order. The Waveform objects also have a timestamp attribute, which (if timestamping was enabled in acquisition settings) contains the time at which the acquisition was triggered.

#   def execute_continuous_fifo_acquisition(self) -> None:
View Source
    def execute_continuous_fifo_acquisition(self) -> None:
        """Start a continuous FIFO mode acquisition.

        This method automatically starts acquiring and streaming samples in FIFO mode, including handling the
        creation of a `TransferBuffer` and streaming the acquired waveforms to the PC. It will return almost
        instantaneously. The acquired waveforms must then be read out of the transfer buffer in a loop using the
        `get_waveforms()` method. Waveforms must be read at least as fast as they are being acquired.
        The FIFO acquisition and streaming will continue until `stop_acquisition()` is called. The device
        must be configured in SPC_REC_FIFO_MULTI or SPC_REC_FIFO_AVERAGE acquisition mode."""
        if self._acquisition_mode not in (AcquisitionMode.SPC_REC_FIFO_MULTI, AcquisitionMode.SPC_REC_FIFO_AVERAGE):
            raise SpectrumWrongAcquisitionMode(
                "Set the acquisition mode to SPC_REC_FIFO_MULTI or SPC_REC_FIFO_AVERAGE using "
                "configure_acquisition() or set_acquisition_mode() before executing "
                "a fifo mode acquisition."
            )
        self.define_transfer_buffer()
        self.start()
        self.start_transfer()

Start a continuous FIFO mode acquisition.

This method automatically starts acquiring and streaming samples in FIFO mode, including handling the creation of a TransferBuffer and streaming the acquired waveforms to the PC. It will return almost instantaneously. The acquired waveforms must then be read out of the transfer buffer in a loop using the get_waveforms() method. Waveforms must be read at least as fast as they are being acquired. The FIFO acquisition and streaming will continue until stop_acquisition() is called. The device must be configured in SPC_REC_FIFO_MULTI or SPC_REC_FIFO_AVERAGE acquisition mode.

Inherited Members
AbstractSpectrumDevice
reset
start
stop
configure_trigger
configure_channel_pairing
write_to_spectrum_device_register
read_spectrum_device_register
spectrumdevice.devices.digitiser.digitiser_interface.SpectrumDigitiserInterface
wait_for_acquisition_to_complete
get_waveforms
get_timestamp
enable_timestamping
acquisition_length_in_samples
set_acquisition_length_in_samples
post_trigger_length_in_samples
set_post_trigger_length_in_samples
acquisition_mode
set_acquisition_mode
batch_size
set_batch_size
spectrumdevice.devices.abstract_device.device_interface.SpectrumDeviceInterface
connected
reconnect
status
start_transfer
stop_transfer
wait_for_transfer_chunk_to_complete
disconnect
transfer_buffers
define_transfer_buffer
analog_channels
io_lines
enabled_analog_channel_nums
set_enabled_analog_channels
trigger_sources
set_trigger_sources
external_trigger_mode
set_external_trigger_mode
external_trigger_level_in_mv
set_external_trigger_level_in_mv
external_trigger_pulse_width_in_samples
set_external_trigger_pulse_width_in_samples
apply_channel_enabling
clock_mode
set_clock_mode
sample_rate_in_hz
set_sample_rate_in_hz
available_io_modes
feature_list
timeout_in_ms
set_timeout_in_ms
force_trigger
bytes_per_sample
type
model_number
#   class AbstractSpectrumStarHub(spectrumdevice.AbstractSpectrumDevice, typing.Generic[~CardType, ~AnalogChannelInterfaceType, ~IOLineInterfaceType], abc.ABC):
View Source
class AbstractSpectrumStarHub(
    AbstractSpectrumDevice, Generic[CardType, AnalogChannelInterfaceType, IOLineInterfaceType], ABC
):
    """Composite abstract class of `AbstractSpectrumCard` implementing methods common to all StarHubs. StarHubs are
    composites of more than one Spectrum card. Acquisition and generation from the child cards of a StarHub
    is synchronised, aggregating the channels of all child cards."""

    def __init__(self, device_number: int, child_cards: Sequence[CardType], master_card_index: int, **kwargs: Any):
        """
        Args:
            device_number (int): The index of the StarHub to connect to. If only one StarHub is present, set to 0.
            child_cards (Sequence[`SpectrumDeviceInterface`]): A list of objects representing the child cards located
                within the StarHub, correctly constructed with their IP addresses and/or device numbers.
            master_card_index (int): The position within child_cards where the master card (the card which controls the
                clock) is located.
        """
        self._child_cards: Sequence[CardType] = child_cards
        self._master_card = child_cards[master_card_index]
        self._triggering_card = child_cards[master_card_index]
        child_card_logical_indices = (2**n for n in range(len(self._child_cards)))
        self._visa_string = f"sync{device_number}"
        self._connect(self._visa_string)
        all_cards_binary_mask = reduce(or_, child_card_logical_indices)
        self.write_to_spectrum_device_register(SPC_SYNC_ENABLEMASK, all_cards_binary_mask)

    def disconnect(self) -> None:
        """Disconnects from each child card and terminates connection to the hub itself."""
        if self._connected:
            destroy_handle(self._handle)
        for card in self._child_cards:
            card.disconnect()
        self._connected = False

    def reconnect(self) -> None:
        """Reconnects to the hub after a `disconnect()`, and reconnects to each child card."""
        self._connect(self._visa_string)
        for card in self._child_cards:
            card.reconnect()

    @property
    def status(self) -> DEVICE_STATUS_TYPE:
        """The statuses of each child card, in a list. See `SpectrumDigitiserCard.status` for more information.
        Returns:
            statuses (List[List[`CardStatus`]]): A list of lists of `CardStatus` (each card has a list of statuses).
        """
        return DEVICE_STATUS_TYPE([card.status[0] for card in self._child_cards])

    def start_transfer(self) -> None:
        """Start the transfer of data between the on-device buffer of each child card and its `TransferBuffer`. See
        `AbstractSpectrumCard.start_transfer()` for more information."""
        for card in self._child_cards:
            card.start_transfer()

    def stop_transfer(self) -> None:
        """Stop the transfer of data between each card and its `TransferBuffer`. See
        `AbstractSpectrumCard.stop_transfer()` for more information."""
        for card in self._child_cards:
            card.stop_transfer()

    def wait_for_transfer_chunk_to_complete(self) -> None:
        """Wait for all cards to stop transferring data to/from their `TransferBuffers`. See
        `AbstractSpectrumCard.wait_for_transfer_to_complete()` for more information."""
        for card in self._child_cards:
            card.wait_for_transfer_chunk_to_complete()

    @property
    def connected(self) -> bool:
        """True if the StarHub is connected, False if not."""
        return self._connected

    def set_triggering_card(self, card_index: int) -> None:
        """Change the index of the child card responsible for receiving a trigger. During construction, this is set
        equal to the index of the master card but in some situations it may be necessary to change it.

        Args:
            card_index (int): The index of the StarHub's triggering card within the list of child cards provided on
                __init__().
        """
        self._triggering_card = self._child_cards[card_index]

    @property
    def clock_mode(self) -> ClockMode:
        """The clock mode currently configured on the master card.

        Returns:
            mode (`ClockMode`): The currently configured clock mode."""
        return self._master_card.clock_mode

    def set_clock_mode(self, mode: ClockMode) -> None:
        """Change the clock mode configured on the master card.

        Args:
            mode (`ClockMode`): The desired clock mode."""
        self._master_card.set_clock_mode(mode)

    @property
    def sample_rate_in_hz(self) -> int:
        """The sample rate configured on the master card.

        Returns:
            rate (int): The current sample rate of the master card in Hz.
        """
        return self._master_card.sample_rate_in_hz

    def set_sample_rate_in_hz(self, rate: int) -> None:
        """Change the sample rate of the child cards (including the master card).
        Args:
            rate (int): The desired sample rate of the child cards in Hz.
        """
        for card in self._child_cards:
            card.set_sample_rate_in_hz(rate)

    @property
    def trigger_sources(self) -> List[TriggerSource]:
        """The trigger sources configured on the triggering card, which by default is the master card. See
        `AbstractSpectrumCard.trigger_sources()` for more information.

        Returns:
            sources (List[`TriggerSource`]): A list of the currently enabled trigger sources."""
        return self._triggering_card.trigger_sources

    def set_trigger_sources(self, sources: List[TriggerSource]) -> None:
        """Change the trigger sources configured on the triggering card, which by default is the master card. See
        `AbstractSpectrumCard.trigger_sources()` for more information.

        Args:
            sources (List[`TriggerSource`]): The trigger sources to enable, in a list."""
        self._triggering_card.set_trigger_sources(sources)
        for card in self._child_cards:
            if card is not self._triggering_card:
                card.set_trigger_sources([TriggerSource.SPC_TMASK_NONE])

    @property
    def external_trigger_mode(self) -> ExternalTriggerMode:
        """The trigger mode configured on the triggering card, which by default is the master card. See
        `AbstractSpectrumCard.external_trigger_mode()` for more information.

        Returns:
            mode (`ExternalTriggerMode`): The currently set external trigger mode.
        """
        return self._triggering_card.external_trigger_mode

    def set_external_trigger_mode(self, mode: ExternalTriggerMode) -> None:
        """Change the trigger mode configured on the triggering card, which by default is the master card. See
        `AbstractSpectrumCard.set_external_trigger_mode()` for more information.

        Args:
            mode (`ExternalTriggerMode`): The desired external trigger mode."""
        self._triggering_card.set_external_trigger_mode(mode)

    @property
    def external_trigger_level_in_mv(self) -> int:
        """The external trigger level configured on the triggering card, which by default is the master card. See
        `AbstractSpectrumCard.external_trigger_level_mv()` for more information.

        Returns:
            level (int): The external trigger level in mV.
        """
        return self._triggering_card.external_trigger_level_in_mv

    def set_external_trigger_level_in_mv(self, level: int) -> None:
        """Change the external trigger level configured on the triggering card, which by default is the master card.
        See `AbstractSpectrumCard.set_external_trigger_level_mv()` for more information.

        Args:
            level (int): The desired external trigger level in mV.
        """
        self._triggering_card.set_external_trigger_level_in_mv(level)

    @property
    def external_trigger_pulse_width_in_samples(self) -> int:
        """The trigger pulse width (samples) configured on the triggering card, which by default is the master card.
        See `AbstractSpectrumCard.external_trigger_pulse_width_in_samples()` for more information.

        Returns:
            width (int): The current trigger pulse width in samples.
        """
        return self._triggering_card.external_trigger_pulse_width_in_samples

    def set_external_trigger_pulse_width_in_samples(self, width: int) -> None:
        """Change the trigger pulse width (samples) configured on the triggering card, which by default is the master
        card. See `AbstractSpectrumCard.set_external_trigger_pulse_width_in_samples()` for more information.

        Args:
            width (int): The desired trigger pulse width in samples.
        """
        self._triggering_card.set_external_trigger_pulse_width_in_samples(width)

    def apply_channel_enabling(self) -> None:
        """Apply the enabled channels chosen using `set_enable_channels()`. This happens automatically and does not
        usually need to be called."""
        for d in self._child_cards:
            d.apply_channel_enabling()

    @property
    def enabled_analog_channel_nums(self) -> List[int]:
        """The currently enabled channel indices, indexed over the whole hub (from 0 to N-1, where N is the total
        number of channels available to the hub).

        Returns:
            channel_nums (List[int]): The currently enabled channel indices.
        """
        enabled_channels = []
        n_channels_in_previous_card = 0
        for card in self._child_cards:
            enabled_channels += [
                channel_num + n_channels_in_previous_card for channel_num in card.enabled_analog_channel_nums
            ]
            n_channels_in_previous_card = len(card.analog_channels)
        return enabled_channels

    def set_enabled_analog_channels(self, channels_nums: List[int]) -> None:
        """Change the currently enabled channel indices, indexed over the whole hub (from 0 to N-1, where N is the total
        number of channels available to the hub).

        Returns:
            channel_nums (List[int]): The indices to enable.
        """
        channels_nums.sort()
        channels_to_enable_all_cards = channels_nums

        for child_card in self._child_cards:
            n_channels_in_card = len(child_card.analog_channels)
            channels_to_enable_this_card = list(set(arange(n_channels_in_card)) & set(channels_to_enable_all_cards))
            num_channels_to_enable_this_card = len(channels_to_enable_this_card)
            child_card.set_enabled_analog_channels(channels_to_enable_this_card)
            channels_to_enable_all_cards = [
                num - n_channels_in_card for num in channels_nums[num_channels_to_enable_this_card:]
            ]

    @property
    def transfer_buffers(self) -> List[TransferBuffer]:
        """The `TransferBuffer`s of all the child cards of the hub. See `AbstractSpectrumCard.transfer_buffers` for more
        information.

        Returns:
            buffers (List[`TransferBuffer`]): A list of the transfer buffers for each child card."""
        return [card.transfer_buffers[0] for card in self._child_cards]

    @property
    def analog_channels(self) -> Sequence[AnalogChannelInterfaceType]:
        """A tuple containing of all the channels of the child cards of the hub. See `AbstractSpectrumCard.channels` for
        more information.

        Returns:
            channels (Sequence[`SpectrumChannelInterface`]): A tuple of `SpectrumDigitiserChannel` objects.
        """
        channels: List[AnalogChannelInterfaceType] = []
        for device in self._child_cards:
            channels += device.analog_channels
        return tuple(channels)

    @property
    def io_lines(self) -> Sequence[IOLineInterfaceType]:
        """A tuple containing of all the Multipurpose IO Lines of the child cards of the hub.

        Returns:
            channels (Sequence[`SpectrumIOLineInterface`]): A tuple of `SpectrumIOLineInterface` objects.
        """
        io_lines: List[IOLineInterfaceType] = []
        for device in self._child_cards:
            io_lines += device.io_lines
        return tuple(io_lines)  # todo: this is probably wrong. I don't think both cards in a netbox have IO lines

    @property
    def timeout_in_ms(self) -> int:
        """The time for which the card will wait for a trigger to be received after a device has started
        before returning an error. This should be the same for all child cards. If it's not, an exception is raised.

        Returns:
            timeout_ms (int): The currently set timeout in ms.
        """
        timeouts = []
        for d in self._child_cards:
            timeouts.append(d.timeout_in_ms)
        return check_settings_constant_across_devices(timeouts, __name__)

    def set_timeout_in_ms(self, timeout_ms: int) -> None:
        """Change the timeout value for all child cards.

        Args:
            timeout_ms (int): The desired timeout setting in seconds."""
        for d in self._child_cards:
            d.set_timeout_in_ms(timeout_ms)

    @property
    def feature_list(self) -> List[Tuple[List[CardFeature], List[AdvancedCardFeature]]]:
        """Get a list of the features of the child cards. See `CardFeature`, `AdvancedCardFeature` and the Spectrum
        documentation for more information.

        Returns:
            features (List[Tuple[List[`CardFeature`], List[`AdvancedCardFeature`]]]): A list of tuples, one per child
                card. Each tuple contains a list of features and a list of advanced features for that card.
        """
        return [card.feature_list[0] for card in self._child_cards]

    @property
    def available_io_modes(self) -> AvailableIOModes:
        """For each multipurpose IO line on the master card, read the available modes. See `IOLineMode` and the Spectrum
        Documentation for all possible available modes and their meanings.

        Returns:
            modes (AvailableIOModes): An `AvailableIOModes` dataclass containing the modes available for each IO line.
        """
        return self._master_card.available_io_modes

    @property
    def bytes_per_sample(self) -> int:
        bytes_per_sample_each_card = []
        for d in self._child_cards:
            bytes_per_sample_each_card.append(d.bytes_per_sample)
        return check_settings_constant_across_devices(bytes_per_sample_each_card, __name__)

    def __str__(self) -> str:
        return f"StarHub {self._visa_string}"

Composite abstract class of AbstractSpectrumCard implementing methods common to all StarHubs. StarHubs are composites of more than one Spectrum card. Acquisition and generation from the child cards of a StarHub is synchronised, aggregating the channels of all child cards.

#   AbstractSpectrumStarHub( device_number: int, child_cards: Sequence[~CardType], master_card_index: int, **kwargs: Any )
View Source
    def __init__(self, device_number: int, child_cards: Sequence[CardType], master_card_index: int, **kwargs: Any):
        """
        Args:
            device_number (int): The index of the StarHub to connect to. If only one StarHub is present, set to 0.
            child_cards (Sequence[`SpectrumDeviceInterface`]): A list of objects representing the child cards located
                within the StarHub, correctly constructed with their IP addresses and/or device numbers.
            master_card_index (int): The position within child_cards where the master card (the card which controls the
                clock) is located.
        """
        self._child_cards: Sequence[CardType] = child_cards
        self._master_card = child_cards[master_card_index]
        self._triggering_card = child_cards[master_card_index]
        child_card_logical_indices = (2**n for n in range(len(self._child_cards)))
        self._visa_string = f"sync{device_number}"
        self._connect(self._visa_string)
        all_cards_binary_mask = reduce(or_, child_card_logical_indices)
        self.write_to_spectrum_device_register(SPC_SYNC_ENABLEMASK, all_cards_binary_mask)
Args
  • device_number (int): The index of the StarHub to connect to. If only one StarHub is present, set to 0.
  • child_cards (Sequence[SpectrumDeviceInterface]): A list of objects representing the child cards located within the StarHub, correctly constructed with their IP addresses and/or device numbers.
  • master_card_index (int): The position within child_cards where the master card (the card which controls the clock) is located.
#   def disconnect(self) -> None:
View Source
    def disconnect(self) -> None:
        """Disconnects from each child card and terminates connection to the hub itself."""
        if self._connected:
            destroy_handle(self._handle)
        for card in self._child_cards:
            card.disconnect()
        self._connected = False

Disconnects from each child card and terminates connection to the hub itself.

#   def reconnect(self) -> None:
View Source
    def reconnect(self) -> None:
        """Reconnects to the hub after a `disconnect()`, and reconnects to each child card."""
        self._connect(self._visa_string)
        for card in self._child_cards:
            card.reconnect()

Reconnects to the hub after a disconnect(), and reconnects to each child card.

The statuses of each child card, in a list. See SpectrumDigitiserCard.status for more information.

Returns

statuses (List[List[CardStatus]]): A list of lists of CardStatus (each card has a list of statuses).

#   def start_transfer(self) -> None:
View Source
    def start_transfer(self) -> None:
        """Start the transfer of data between the on-device buffer of each child card and its `TransferBuffer`. See
        `AbstractSpectrumCard.start_transfer()` for more information."""
        for card in self._child_cards:
            card.start_transfer()

Start the transfer of data between the on-device buffer of each child card and its TransferBuffer. See AbstractSpectrumCard.start_transfer() for more information.

#   def stop_transfer(self) -> None:
View Source
    def stop_transfer(self) -> None:
        """Stop the transfer of data between each card and its `TransferBuffer`. See
        `AbstractSpectrumCard.stop_transfer()` for more information."""
        for card in self._child_cards:
            card.stop_transfer()

Stop the transfer of data between each card and its TransferBuffer. See AbstractSpectrumCard.stop_transfer() for more information.

#   def wait_for_transfer_chunk_to_complete(self) -> None:
View Source
    def wait_for_transfer_chunk_to_complete(self) -> None:
        """Wait for all cards to stop transferring data to/from their `TransferBuffers`. See
        `AbstractSpectrumCard.wait_for_transfer_to_complete()` for more information."""
        for card in self._child_cards:
            card.wait_for_transfer_chunk_to_complete()

Wait for all cards to stop transferring data to/from their TransferBuffers. See AbstractSpectrumCard.wait_for_transfer_to_complete() for more information.

#   connected: bool

True if the StarHub is connected, False if not.

#   def set_triggering_card(self, card_index: int) -> None:
View Source
    def set_triggering_card(self, card_index: int) -> None:
        """Change the index of the child card responsible for receiving a trigger. During construction, this is set
        equal to the index of the master card but in some situations it may be necessary to change it.

        Args:
            card_index (int): The index of the StarHub's triggering card within the list of child cards provided on
                __init__().
        """
        self._triggering_card = self._child_cards[card_index]

Change the index of the child card responsible for receiving a trigger. During construction, this is set equal to the index of the master card but in some situations it may be necessary to change it.

Args
  • card_index (int): The index of the StarHub's triggering card within the list of child cards provided on __init__().

The clock mode currently configured on the master card.

Returns

mode (ClockMode): The currently configured clock mode.

#   def set_clock_mode(self, mode: spectrumdevice.settings.device_modes.ClockMode) -> None:
View Source
    def set_clock_mode(self, mode: ClockMode) -> None:
        """Change the clock mode configured on the master card.

        Args:
            mode (`ClockMode`): The desired clock mode."""
        self._master_card.set_clock_mode(mode)

Change the clock mode configured on the master card.

Args
  • mode (ClockMode): The desired clock mode.
#   sample_rate_in_hz: int

The sample rate configured on the master card.

Returns

rate (int): The current sample rate of the master card in Hz.

#   def set_sample_rate_in_hz(self, rate: int) -> None:
View Source
    def set_sample_rate_in_hz(self, rate: int) -> None:
        """Change the sample rate of the child cards (including the master card).
        Args:
            rate (int): The desired sample rate of the child cards in Hz.
        """
        for card in self._child_cards:
            card.set_sample_rate_in_hz(rate)

Change the sample rate of the child cards (including the master card).

Args
  • rate (int): The desired sample rate of the child cards in Hz.

The trigger sources configured on the triggering card, which by default is the master card. See AbstractSpectrumCard.trigger_sources() for more information.

Returns

sources (List[TriggerSource]): A list of the currently enabled trigger sources.

#   def set_trigger_sources( self, sources: List[spectrumdevice.settings.triggering.TriggerSource] ) -> None:
View Source
    def set_trigger_sources(self, sources: List[TriggerSource]) -> None:
        """Change the trigger sources configured on the triggering card, which by default is the master card. See
        `AbstractSpectrumCard.trigger_sources()` for more information.

        Args:
            sources (List[`TriggerSource`]): The trigger sources to enable, in a list."""
        self._triggering_card.set_trigger_sources(sources)
        for card in self._child_cards:
            if card is not self._triggering_card:
                card.set_trigger_sources([TriggerSource.SPC_TMASK_NONE])

Change the trigger sources configured on the triggering card, which by default is the master card. See AbstractSpectrumCard.trigger_sources() for more information.

Args
  • sources (List[TriggerSource]): The trigger sources to enable, in a list.

The trigger mode configured on the triggering card, which by default is the master card. See AbstractSpectrumCard.external_trigger_mode() for more information.

Returns

mode (ExternalTriggerMode): The currently set external trigger mode.

#   def set_external_trigger_mode( self, mode: spectrumdevice.settings.triggering.ExternalTriggerMode ) -> None:
View Source
    def set_external_trigger_mode(self, mode: ExternalTriggerMode) -> None:
        """Change the trigger mode configured on the triggering card, which by default is the master card. See
        `AbstractSpectrumCard.set_external_trigger_mode()` for more information.

        Args:
            mode (`ExternalTriggerMode`): The desired external trigger mode."""
        self._triggering_card.set_external_trigger_mode(mode)

Change the trigger mode configured on the triggering card, which by default is the master card. See AbstractSpectrumCard.set_external_trigger_mode() for more information.

Args
  • mode (ExternalTriggerMode): The desired external trigger mode.
#   external_trigger_level_in_mv: int

The external trigger level configured on the triggering card, which by default is the master card. See AbstractSpectrumCard.external_trigger_level_mv() for more information.

Returns

level (int): The external trigger level in mV.

#   def set_external_trigger_level_in_mv(self, level: int) -> None:
View Source
    def set_external_trigger_level_in_mv(self, level: int) -> None:
        """Change the external trigger level configured on the triggering card, which by default is the master card.
        See `AbstractSpectrumCard.set_external_trigger_level_mv()` for more information.

        Args:
            level (int): The desired external trigger level in mV.
        """
        self._triggering_card.set_external_trigger_level_in_mv(level)

Change the external trigger level configured on the triggering card, which by default is the master card. See AbstractSpectrumCard.set_external_trigger_level_mv() for more information.

Args
  • level (int): The desired external trigger level in mV.
#   external_trigger_pulse_width_in_samples: int

The trigger pulse width (samples) configured on the triggering card, which by default is the master card. See AbstractSpectrumCard.external_trigger_pulse_width_in_samples() for more information.

Returns

width (int): The current trigger pulse width in samples.

#   def set_external_trigger_pulse_width_in_samples(self, width: int) -> None:
View Source
    def set_external_trigger_pulse_width_in_samples(self, width: int) -> None:
        """Change the trigger pulse width (samples) configured on the triggering card, which by default is the master
        card. See `AbstractSpectrumCard.set_external_trigger_pulse_width_in_samples()` for more information.

        Args:
            width (int): The desired trigger pulse width in samples.
        """
        self._triggering_card.set_external_trigger_pulse_width_in_samples(width)

Change the trigger pulse width (samples) configured on the triggering card, which by default is the master card. See AbstractSpectrumCard.set_external_trigger_pulse_width_in_samples() for more information.

Args
  • width (int): The desired trigger pulse width in samples.
#   def apply_channel_enabling(self) -> None:
View Source
    def apply_channel_enabling(self) -> None:
        """Apply the enabled channels chosen using `set_enable_channels()`. This happens automatically and does not
        usually need to be called."""
        for d in self._child_cards:
            d.apply_channel_enabling()

Apply the enabled channels chosen using set_enable_channels(). This happens automatically and does not usually need to be called.

#   enabled_analog_channel_nums: List[int]

The currently enabled channel indices, indexed over the whole hub (from 0 to N-1, where N is the total number of channels available to the hub).

Returns

channel_nums (List[int]): The currently enabled channel indices.

#   def set_enabled_analog_channels(self, channels_nums: List[int]) -> None:
View Source
    def set_enabled_analog_channels(self, channels_nums: List[int]) -> None:
        """Change the currently enabled channel indices, indexed over the whole hub (from 0 to N-1, where N is the total
        number of channels available to the hub).

        Returns:
            channel_nums (List[int]): The indices to enable.
        """
        channels_nums.sort()
        channels_to_enable_all_cards = channels_nums

        for child_card in self._child_cards:
            n_channels_in_card = len(child_card.analog_channels)
            channels_to_enable_this_card = list(set(arange(n_channels_in_card)) & set(channels_to_enable_all_cards))
            num_channels_to_enable_this_card = len(channels_to_enable_this_card)
            child_card.set_enabled_analog_channels(channels_to_enable_this_card)
            channels_to_enable_all_cards = [
                num - n_channels_in_card for num in channels_nums[num_channels_to_enable_this_card:]
            ]

Change the currently enabled channel indices, indexed over the whole hub (from 0 to N-1, where N is the total number of channels available to the hub).

Returns

channel_nums (List[int]): The indices to enable.

The TransferBuffers of all the child cards of the hub. See AbstractSpectrumCard.transfer_buffers for more information.

Returns

buffers (List[TransferBuffer]): A list of the transfer buffers for each child card.

#   analog_channels: Sequence[~AnalogChannelInterfaceType]

A tuple containing of all the channels of the child cards of the hub. See AbstractSpectrumCard.channels for more information.

Returns

channels (Sequence[SpectrumChannelInterface]): A tuple of SpectrumDigitiserChannel objects.

#   io_lines: Sequence[~IOLineInterfaceType]

A tuple containing of all the Multipurpose IO Lines of the child cards of the hub.

Returns

channels (Sequence[SpectrumIOLineInterface]): A tuple of SpectrumIOLineInterface objects.

#   timeout_in_ms: int

The time for which the card will wait for a trigger to be received after a device has started before returning an error. This should be the same for all child cards. If it's not, an exception is raised.

Returns

timeout_ms (int): The currently set timeout in ms.

#   def set_timeout_in_ms(self, timeout_ms: int) -> None:
View Source
    def set_timeout_in_ms(self, timeout_ms: int) -> None:
        """Change the timeout value for all child cards.

        Args:
            timeout_ms (int): The desired timeout setting in seconds."""
        for d in self._child_cards:
            d.set_timeout_in_ms(timeout_ms)

Change the timeout value for all child cards.

Args
  • timeout_ms (int): The desired timeout setting in seconds.

Get a list of the features of the child cards. See CardFeature, AdvancedCardFeature and the Spectrum documentation for more information.

Returns

features (List[Tuple[List[CardFeature], List[AdvancedCardFeature]]]): A list of tuples, one per child card. Each tuple contains a list of features and a list of advanced features for that card.

For each multipurpose IO line on the master card, read the available modes. See IOLineMode and the Spectrum Documentation for all possible available modes and their meanings.

Returns

modes (AvailableIOModes): An AvailableIOModes dataclass containing the modes available for each IO line.

#   bytes_per_sample: int
Inherited Members
AbstractSpectrumDevice
reset
start
stop
configure_trigger
configure_channel_pairing
write_to_spectrum_device_register
read_spectrum_device_register
spectrumdevice.devices.abstract_device.device_interface.SpectrumDeviceInterface
define_transfer_buffer
force_trigger
type
model_number
View Source
class AbstractSpectrumCard(AbstractSpectrumDevice[AnalogChannelInterfaceType, IOLineInterfaceType], ABC):
    """Abstract superclass implementing methods common to all individual "card" devices (as opposed to "hub" devices)."""

    def __init__(self, device_number: int, ip_address: Optional[str] = None, **kwargs: Any):
        """
        Args:
            device_number (int): Index of the card to control. If only one card is present, set to 0.
            ip_address (Optional[str]): If connecting to a networked card, provide the IP address here as a string.

        """
        super().__init__()  # required for proper MRO resolution
        if ip_address is not None:
            self._visa_string = _create_visa_string_from_ip(ip_address, device_number)
        else:
            self._visa_string = f"/dev/spcm{device_number}"
        self._connect(self._visa_string)
        self._model_number = ModelNumber(self.read_spectrum_device_register(SPC_PCITYP))
        self._trigger_sources: List[TriggerSource] = []
        self._analog_channels = self._init_analog_channels()
        self._io_lines = self._init_io_lines()
        self._enabled_analog_channels: List[int] = [0]
        self._transfer_buffer: Optional[TransferBuffer] = None
        self.apply_channel_enabling()

    @property
    def model_number(self) -> ModelNumber:
        return self._model_number

    def reconnect(self) -> None:
        """Reconnect to the card after disconnect() has been called."""
        self._connect(self._visa_string)

    @property
    def status(self) -> DEVICE_STATUS_TYPE:
        """Read the current status of the card.
        Returns:
            Statuses (`List[List[StatusCode]]`): A length-1 list containing a list of `StatusCode` Enums describing the
            current acquisition status of the card. See `StatusCode` (and the Spectrum documentation) for the list off
            possible acquisition statuses.
        """
        return [decode_status(self.read_spectrum_device_register(SPC_M2STATUS))]

    def start_transfer(self) -> None:
        """Transfer between the on-device buffer and the `TransferBuffer`.

        Requires that a `TransferBuffer` has been defined (see `define_transfer_buffer()`).

        For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), `start_transfer()` should be called after each
        acquisition has completed to transfer the acquired waveforms from the device to the `TransferBuffer`.

        For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), `start_transfer()` should be called immediately after
        `start()` has been called, so that the waveform data can be continuously streamed into the transfer buffer as it
        is acquired.

        # todo: docstring for AWG transfers
        """
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_DATA_STARTDMA)

    def stop_transfer(self) -> None:
        """Stop the transfer of data between the on-device buffer and the `TransferBuffer`.

        Transfer is usually stopped automatically when an acquisition or stream of acquisitions completes, so this
        method is rarely required. It may invalidate transferred samples.

        For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), transfer will automatically stop once all acquired
        samples have been transferred, so `stop_transfer()` should not be used. Instead, call
        `wait_for_transfer_to_complete()` after `start_transfer()`.

        For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), samples are transferred continuously during acquisition,
        and transfer will automatically stop when `stop()` is called as there will be no more
        samples to transfer, so `stop_transfer()` should not be used.

        # todo: docstring for AWG
        """
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_DATA_STOPDMA)

    def wait_for_transfer_chunk_to_complete(self) -> None:
        """Blocks until the currently active transfer between the on-device buffer and the TransferBuffer is
        complete. This will be when there at least TransferBuffer.notify_size_in_pages pages available in the buffer.

        For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), use after starting a transfer. Once the method
        returns, all acquired waveforms have been transferred from the on-device buffer to the `TransferBuffer` and can
        be read using the `get_waveforms()` method.

        For digitisers in FIFO mode (SPC_REC_FIFO_MULTI) this method is internally used by get_waveforms().

        # todo: update the above docstring to take into account cases where notify size < data lemgth
        # todo: docstring for AWG
        """
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_DATA_WAITDMA)

    @property
    def transfer_buffers(self) -> List[TransferBuffer]:
        """Return the `TransferBuffer` configured for transferring data between the card and the software.

        Returns:
            buffer (List[`TransferBuffer`]): A length-1 list containing the `TransferBuffer` object. Any data within
                the `TransferBuffer` can be accessed using its own interface, but the samples are stored as a 1D array,
                with the samples of each channel interleaved as per the Spectrum user manual. For digitisers, it is more
                convenient to read waveform data using the `get_waveforms()` method.
        """
        if self._transfer_buffer is not None:
            return [self._transfer_buffer]
        else:
            raise SpectrumNoTransferBufferDefined("Cannot find TransferBuffer.")

    def disconnect(self) -> None:
        """Terminate the connection to the card."""
        if self.connected:
            destroy_handle(self._handle)
            self._connected = False

    @property
    def connected(self) -> bool:
        """Returns True if a card is currently connected, False if not."""
        return self._connected

    def __eq__(self, other: object) -> bool:
        if isinstance(other, self.__class__):
            return self._handle == other._handle
        else:
            raise NotImplementedError(f"Cannot compare {self.__class__} with {other.__class__}")

    @property
    def analog_channels(self) -> Sequence[AnalogChannelInterfaceType]:
        """A tuple containing the channels that belong to the card.

        Properties of the individual channels can be set by calling the methods of the
            returned objects directly. See `SpectrumDigitiserChannel` and `SpectrumAWGChannel` for more information.

        Returns:
            channels (Sequence[`SpectrumChannelInterface`]): A tuple of objects conforming to the
            `SpectrumChannelInterface` interface.
        """
        return self._analog_channels

    @property
    def io_lines(self) -> Sequence[IOLineInterfaceType]:
        """A tuple containing the Multipurpose IO Lines that belong to the card.

        Properties of the individual channels can be set by calling the methods of the
            returned objects directly.

        Returns:
            channels (Sequence[`SpectrumIOLineInterface`]): A tuple of objects conforming to the
            `SpectrumIOLineInterface` interface.
        """
        return self._io_lines

    @property
    def enabled_analog_channel_nums(self) -> List[int]:
        """The indices of the currently enabled channels.
        Returns:
            enabled_channels (List[int]): The indices of the currently enabled channels.
        """
        return self._enabled_analog_channels

    def set_enabled_analog_channels(self, channels_nums: List[int]) -> None:
        """Change which channels are enabled.

        Args:
            channels_nums (List[int]): The integer channel indices to enable.
        """
        if len(channels_nums) in [1, 2, 4, 8]:
            self._enabled_analog_channels = channels_nums
            self.apply_channel_enabling()
        else:
            raise SpectrumInvalidNumberOfEnabledChannels(f"{len(channels_nums)} cannot be enabled at once.")

    @property
    def trigger_sources(self) -> List[TriggerSource]:
        """The currently enabled trigger sources

        Returns:
            sources (List[`TriggerSource`]): A list of TriggerSources.
        """
        or_of_sources = self.read_spectrum_device_register(SPC_TRIG_ORMASK)
        self._trigger_sources = decode_trigger_sources(or_of_sources)
        return self._trigger_sources

    def set_trigger_sources(self, sources: List[TriggerSource]) -> None:
        """Change the enabled trigger sources.

        Args:
            sources (List[`TriggerSource`]): The TriggerSources to enable.
        """
        self._trigger_sources = sources
        or_of_sources = reduce(or_, [s.value for s in sources])
        self.write_to_spectrum_device_register(SPC_TRIG_ORMASK, or_of_sources)
        self.write_to_spectrum_device_register(SPC_TRIG_ANDMASK, 0)

    @property
    def external_trigger_mode(self) -> ExternalTriggerMode:
        """The currently enabled external trigger mode. An external trigger source must be enabled.

        Returns:
            sources (`ExternalTriggerMode`): The currently enabled `ExternalTriggerMode`."""
        if len(self._active_external_triggers) == 0:
            raise SpectrumExternalTriggerNotEnabled("Cannot get external trigger mode.")
        else:
            first_trig_source = self._active_external_triggers[0]
            try:
                return ExternalTriggerMode(
                    self.read_spectrum_device_register(EXTERNAL_TRIGGER_MODE_COMMANDS[first_trig_source.value])
                )
            except KeyError:
                raise SpectrumTriggerOperationNotImplemented(f"Cannot get trigger mode of {first_trig_source.name}.")

    def set_external_trigger_mode(self, mode: ExternalTriggerMode) -> None:
        """Change the currently enabled trigger mode. An external trigger source must be enabled.

        Args:
            mode (`ExternalTriggerMode`): The `ExternalTriggerMode` to apply.
        """
        if len(self._active_external_triggers) == 0:
            raise SpectrumExternalTriggerNotEnabled("Cannot set external trigger mode.")
        else:
            for trigger_source in self._active_external_triggers:
                try:
                    self.write_to_spectrum_device_register(
                        EXTERNAL_TRIGGER_MODE_COMMANDS[trigger_source.value], mode.value
                    )
                except KeyError:
                    raise SpectrumTriggerOperationNotImplemented(f"Cannot set trigger mode of {trigger_source.name}.")

    @property
    def _active_external_triggers(self) -> List[TriggerSource]:
        return [
            TriggerSource(val)
            for val in list(
                set(EXTERNAL_TRIGGER_MODE_COMMANDS.keys()) & set([source.value for source in self._trigger_sources])
            )
        ]

    @property
    def external_trigger_level_in_mv(self) -> int:
        """The signal level (mV) needed to trigger an event using an external trigger source. An external
        trigger source must be enabled.

        Returns:
            level (int): The currently set trigger level in mV.
        """
        if len(self._active_external_triggers) == 0:
            raise SpectrumExternalTriggerNotEnabled("Cannot get external trigger level.")
        else:
            first_trig_source = self._active_external_triggers[0]
            try:
                return self.read_spectrum_device_register(EXTERNAL_TRIGGER_LEVEL_COMMANDS[first_trig_source.value])
            except KeyError:
                raise SpectrumTriggerOperationNotImplemented(f"Cannot get trigger level of {first_trig_source.name}.")

    def set_external_trigger_level_in_mv(self, level: int) -> None:
        """Change the signal level (mV) needed to trigger an event using an external trigger source. An external
        trigger source must be enabled.

        Args:
            level (int): The trigger level to set in mV.
        """
        if len(self._active_external_triggers) == 0:
            raise SpectrumExternalTriggerNotEnabled("Cannot set external trigger level.")
        else:
            for trigger_source in self._active_external_triggers:
                try:
                    self.write_to_spectrum_device_register(EXTERNAL_TRIGGER_LEVEL_COMMANDS[trigger_source.value], level)
                except KeyError:
                    raise SpectrumTriggerOperationNotImplemented(f"Cannot set trigger level of {trigger_source.name}.")

    @property
    def external_trigger_pulse_width_in_samples(self) -> int:
        """The pulse width (in samples) needed to trigger an event using an external trigger source, if
        SPC_TM_PW_SMALLER or SPC_TM_PW_GREATER `ExternalTriggerMode` is selected. An external trigger source must be
        enabled.

        Returns:
            width (int): The current trigger pulse width in samples.
        """
        if len(self._active_external_triggers) == 0:
            raise SpectrumExternalTriggerNotEnabled("Cannot get external trigger pulse width.")
        else:
            first_trig_source = self._active_external_triggers[0]
            try:
                return self.read_spectrum_device_register(
                    EXTERNAL_TRIGGER_PULSE_WIDTH_COMMANDS[first_trig_source.value]
                )
            except KeyError:
                raise SpectrumTriggerOperationNotImplemented(f"Cannot get pulse width of {first_trig_source.name}.")

    def set_external_trigger_pulse_width_in_samples(self, width: int) -> None:
        """Change the pulse width (samples) needed to trigger an event using an external trigger source if
        SPC_TM_PW_SMALLER or SPC_TM_PW_GREATER `ExternalTriggerMode` is selected. An external trigger source must be
        enabled.

        Args:
            width (int): The trigger pulse width to set, in samples."""
        if len(self._active_external_triggers) == 0:
            raise SpectrumExternalTriggerNotEnabled("Cannot set external trigger pulse width.")
        else:
            for trigger_source in self._active_external_triggers:
                try:
                    self.write_to_spectrum_device_register(
                        EXTERNAL_TRIGGER_PULSE_WIDTH_COMMANDS[trigger_source.value], width
                    )
                except KeyError:
                    raise SpectrumTriggerOperationNotImplemented(f"Cannot set pulse width of {trigger_source.name}.")

    def apply_channel_enabling(self) -> None:
        """Apply the enabled channels chosen using set_enable_channels(). This happens automatically and does not
        usually need to be called."""
        enabled_channel_spectrum_values = [self.analog_channels[i].name.value for i in self._enabled_analog_channels]
        if len(enabled_channel_spectrum_values) in [1, 2, 4, 8]:
            bitwise_or_of_enabled_channels = reduce(or_, enabled_channel_spectrum_values)
            self.write_to_spectrum_device_register(SPC_CHENABLE, bitwise_or_of_enabled_channels)
        else:
            raise SpectrumInvalidNumberOfEnabledChannels(
                f"Cannot enable {len(enabled_channel_spectrum_values)} " f"channels on one card."
            )

    @abstractmethod
    def _init_analog_channels(self) -> Sequence[AnalogChannelInterfaceType]:
        raise NotImplementedError()

    @abstractmethod
    def _init_io_lines(self) -> Sequence[IOLineInterfaceType]:
        raise NotImplementedError()

    @property
    def timeout_in_ms(self) -> int:
        """The time for which the card will wait for a trigger to be received after the device has been started
        before returning an error.

        Returns:
            timeout_in_ms (in)t: The currently set timeout in ms.
        """
        return self.read_spectrum_device_register(SPC_TIMEOUT)

    def set_timeout_in_ms(self, timeout_in_ms: int) -> None:
        """Change the time for which the card will wait for a trigger to tbe received after the device has started
        before returning an error.

        Args:
            timeout_in_ms (int): The desired timeout in ms.
        """
        self.write_to_spectrum_device_register(SPC_TIMEOUT, timeout_in_ms)

    @property
    def clock_mode(self) -> ClockMode:
        """The currently enabled clock mode.

        Returns:
            mode (`ClockMode`): The currently set clock mode.
        """
        return ClockMode(self.read_spectrum_device_register(SPC_CLOCKMODE))

    def set_clock_mode(self, mode: ClockMode) -> None:
        """Change the clock mode. See `ClockMode` and the Spectrum documentation for available modes.

        Args:
            mode (`ClockMode`): The desired clock mode.
        """
        self.write_to_spectrum_device_register(SPC_CLOCKMODE, mode.value)

    @property
    def available_io_modes(self) -> AvailableIOModes:
        """For each multipurpose IO line on the card, read the available modes. See IOLineMode and the Spectrum
        Documentation for all possible available modes and their meanings.

        Returns:
            modes (`AvailableIOModes`): An `AvailableIOModes` dataclass containing the modes for each IO line."""
        return AvailableIOModes(
            X0=decode_available_io_modes(self.read_spectrum_device_register(SPCM_X0_AVAILMODES)),
            X1=decode_available_io_modes(self.read_spectrum_device_register(SPCM_X1_AVAILMODES)),
            X2=decode_available_io_modes(self.read_spectrum_device_register(SPCM_X2_AVAILMODES)),
            X3=decode_available_io_modes(self.read_spectrum_device_register(SPCM_X3_AVAILMODES)),
        )

    @property
    def feature_list(self) -> List[Tuple[List[CardFeature], List[AdvancedCardFeature]]]:
        """Get a list of the features of the card. See `CardFeature`, `AdvancedCardFeature` and the Spectrum
        documentation for more information.

        Returns:
            features (List[Tuple[List[`CardFeature`], List[`AdvancedCardFeature`]]]): A tuple of two lists - of features
                and advanced features respectively - wrapped in a list.
        """
        normal_features = decode_card_features(self.read_spectrum_device_register(SPC_PCIFEATURES))
        advanced_features = decode_advanced_card_features(self.read_spectrum_device_register(SPC_PCIEXTFEATURES))
        return [(normal_features, advanced_features)]

    @property
    def sample_rate_in_hz(self) -> int:
        """The rate at which samples will be acquired or generated, in Hz.

        Returns:
            rate (int): The currently set sample rate in Hz.
        """
        return self.read_spectrum_device_register(SPC_SAMPLERATE, SpectrumRegisterLength.SIXTY_FOUR)

    def set_sample_rate_in_hz(self, rate: int) -> None:
        """Change the rate at which samples will be acquired or generated, in Hz.
        Args:
            rate (int): The desired sample rate in Hz.
        """
        self.write_to_spectrum_device_register(SPC_SAMPLERATE, rate, SpectrumRegisterLength.SIXTY_FOUR)

    def __str__(self) -> str:
        return f"Card {self._visa_string} (model {self.model_number.name})."

    @property
    def type(self) -> CardType:
        return CardType(self.read_spectrum_device_register(SPC_FNCTYPE))

    def force_trigger(self) -> None:
        """Force a trigger event to occur"""
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_FORCETRIGGER)

    @property
    def bytes_per_sample(self) -> int:
        return self.read_spectrum_device_register(SPC_MIINST_BYTESPERSAMPLE)

Abstract superclass implementing methods common to all individual "card" devices (as opposed to "hub" devices).

#   AbstractSpectrumCard(device_number: int, ip_address: Optional[str] = None, **kwargs: Any)
View Source
    def __init__(self, device_number: int, ip_address: Optional[str] = None, **kwargs: Any):
        """
        Args:
            device_number (int): Index of the card to control. If only one card is present, set to 0.
            ip_address (Optional[str]): If connecting to a networked card, provide the IP address here as a string.

        """
        super().__init__()  # required for proper MRO resolution
        if ip_address is not None:
            self._visa_string = _create_visa_string_from_ip(ip_address, device_number)
        else:
            self._visa_string = f"/dev/spcm{device_number}"
        self._connect(self._visa_string)
        self._model_number = ModelNumber(self.read_spectrum_device_register(SPC_PCITYP))
        self._trigger_sources: List[TriggerSource] = []
        self._analog_channels = self._init_analog_channels()
        self._io_lines = self._init_io_lines()
        self._enabled_analog_channels: List[int] = [0]
        self._transfer_buffer: Optional[TransferBuffer] = None
        self.apply_channel_enabling()
Args
  • device_number (int): Index of the card to control. If only one card is present, set to 0.
  • ip_address (Optional[str]): If connecting to a networked card, provide the IP address here as a string.
#   def reconnect(self) -> None:
View Source
    def reconnect(self) -> None:
        """Reconnect to the card after disconnect() has been called."""
        self._connect(self._visa_string)

Reconnect to the card after disconnect() has been called.

Read the current status of the card.

Returns

Statuses (List[List[StatusCode]]): A length-1 list containing a list of StatusCode Enums describing the current acquisition status of the card. See StatusCode (and the Spectrum documentation) for the list off possible acquisition statuses.

#   def start_transfer(self) -> None:
View Source
    def start_transfer(self) -> None:
        """Transfer between the on-device buffer and the `TransferBuffer`.

        Requires that a `TransferBuffer` has been defined (see `define_transfer_buffer()`).

        For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), `start_transfer()` should be called after each
        acquisition has completed to transfer the acquired waveforms from the device to the `TransferBuffer`.

        For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), `start_transfer()` should be called immediately after
        `start()` has been called, so that the waveform data can be continuously streamed into the transfer buffer as it
        is acquired.

        # todo: docstring for AWG transfers
        """
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_DATA_STARTDMA)

Transfer between the on-device buffer and the TransferBuffer.

Requires that a TransferBuffer has been defined (see define_transfer_buffer()).

For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), start_transfer() should be called after each acquisition has completed to transfer the acquired waveforms from the device to the TransferBuffer.

For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), start_transfer() should be called immediately after start() has been called, so that the waveform data can be continuously streamed into the transfer buffer as it is acquired.

todo: docstring for AWG transfers

#   def stop_transfer(self) -> None:
View Source
    def stop_transfer(self) -> None:
        """Stop the transfer of data between the on-device buffer and the `TransferBuffer`.

        Transfer is usually stopped automatically when an acquisition or stream of acquisitions completes, so this
        method is rarely required. It may invalidate transferred samples.

        For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), transfer will automatically stop once all acquired
        samples have been transferred, so `stop_transfer()` should not be used. Instead, call
        `wait_for_transfer_to_complete()` after `start_transfer()`.

        For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), samples are transferred continuously during acquisition,
        and transfer will automatically stop when `stop()` is called as there will be no more
        samples to transfer, so `stop_transfer()` should not be used.

        # todo: docstring for AWG
        """
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_DATA_STOPDMA)

Stop the transfer of data between the on-device buffer and the TransferBuffer.

Transfer is usually stopped automatically when an acquisition or stream of acquisitions completes, so this method is rarely required. It may invalidate transferred samples.

For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), transfer will automatically stop once all acquired samples have been transferred, so stop_transfer() should not be used. Instead, call wait_for_transfer_to_complete() after start_transfer().

For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), samples are transferred continuously during acquisition, and transfer will automatically stop when stop() is called as there will be no more samples to transfer, so stop_transfer() should not be used.

todo: docstring for AWG

#   def wait_for_transfer_chunk_to_complete(self) -> None:
View Source
    def wait_for_transfer_chunk_to_complete(self) -> None:
        """Blocks until the currently active transfer between the on-device buffer and the TransferBuffer is
        complete. This will be when there at least TransferBuffer.notify_size_in_pages pages available in the buffer.

        For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), use after starting a transfer. Once the method
        returns, all acquired waveforms have been transferred from the on-device buffer to the `TransferBuffer` and can
        be read using the `get_waveforms()` method.

        For digitisers in FIFO mode (SPC_REC_FIFO_MULTI) this method is internally used by get_waveforms().

        # todo: update the above docstring to take into account cases where notify size < data lemgth
        # todo: docstring for AWG
        """
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_DATA_WAITDMA)

Blocks until the currently active transfer between the on-device buffer and the TransferBuffer is complete. This will be when there at least TransferBuffer.notify_size_in_pages pages available in the buffer.

For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), use after starting a transfer. Once the method returns, all acquired waveforms have been transferred from the on-device buffer to the TransferBuffer and can be read using the get_waveforms() method.

For digitisers in FIFO mode (SPC_REC_FIFO_MULTI) this method is internally used by get_waveforms().

todo: update the above docstring to take into account cases where notify size < data lemgth

todo: docstring for AWG

Return the TransferBuffer configured for transferring data between the card and the software.

Returns

buffer (List[TransferBuffer]): A length-1 list containing the TransferBuffer object. Any data within the TransferBuffer can be accessed using its own interface, but the samples are stored as a 1D array, with the samples of each channel interleaved as per the Spectrum user manual. For digitisers, it is more convenient to read waveform data using the get_waveforms() method.

#   def disconnect(self) -> None:
View Source
    def disconnect(self) -> None:
        """Terminate the connection to the card."""
        if self.connected:
            destroy_handle(self._handle)
            self._connected = False

Terminate the connection to the card.

#   connected: bool

Returns True if a card is currently connected, False if not.

#   analog_channels: Sequence[~AnalogChannelInterfaceType]

A tuple containing the channels that belong to the card.

Properties of the individual channels can be set by calling the methods of the returned objects directly. See SpectrumDigitiserChannel and SpectrumAWGChannel for more information.

Returns

channels (Sequence[SpectrumChannelInterface]): A tuple of objects conforming to the SpectrumChannelInterface interface.

#   io_lines: Sequence[~IOLineInterfaceType]

A tuple containing the Multipurpose IO Lines that belong to the card.

Properties of the individual channels can be set by calling the methods of the returned objects directly.

Returns

channels (Sequence[SpectrumIOLineInterface]): A tuple of objects conforming to the SpectrumIOLineInterface interface.

#   enabled_analog_channel_nums: List[int]

The indices of the currently enabled channels.

Returns

enabled_channels (List[int]): The indices of the currently enabled channels.

#   def set_enabled_analog_channels(self, channels_nums: List[int]) -> None:
View Source
    def set_enabled_analog_channels(self, channels_nums: List[int]) -> None:
        """Change which channels are enabled.

        Args:
            channels_nums (List[int]): The integer channel indices to enable.
        """
        if len(channels_nums) in [1, 2, 4, 8]:
            self._enabled_analog_channels = channels_nums
            self.apply_channel_enabling()
        else:
            raise SpectrumInvalidNumberOfEnabledChannels(f"{len(channels_nums)} cannot be enabled at once.")

Change which channels are enabled.

Args
  • channels_nums (List[int]): The integer channel indices to enable.

The currently enabled trigger sources

Returns

sources (List[TriggerSource]): A list of TriggerSources.

#   def set_trigger_sources( self, sources: List[spectrumdevice.settings.triggering.TriggerSource] ) -> None:
View Source
    def set_trigger_sources(self, sources: List[TriggerSource]) -> None:
        """Change the enabled trigger sources.

        Args:
            sources (List[`TriggerSource`]): The TriggerSources to enable.
        """
        self._trigger_sources = sources
        or_of_sources = reduce(or_, [s.value for s in sources])
        self.write_to_spectrum_device_register(SPC_TRIG_ORMASK, or_of_sources)
        self.write_to_spectrum_device_register(SPC_TRIG_ANDMASK, 0)

Change the enabled trigger sources.

Args
  • sources (List[TriggerSource]): The TriggerSources to enable.

The currently enabled external trigger mode. An external trigger source must be enabled.

Returns

sources (ExternalTriggerMode): The currently enabled ExternalTriggerMode.

#   def set_external_trigger_mode( self, mode: spectrumdevice.settings.triggering.ExternalTriggerMode ) -> None:
View Source
    def set_external_trigger_mode(self, mode: ExternalTriggerMode) -> None:
        """Change the currently enabled trigger mode. An external trigger source must be enabled.

        Args:
            mode (`ExternalTriggerMode`): The `ExternalTriggerMode` to apply.
        """
        if len(self._active_external_triggers) == 0:
            raise SpectrumExternalTriggerNotEnabled("Cannot set external trigger mode.")
        else:
            for trigger_source in self._active_external_triggers:
                try:
                    self.write_to_spectrum_device_register(
                        EXTERNAL_TRIGGER_MODE_COMMANDS[trigger_source.value], mode.value
                    )
                except KeyError:
                    raise SpectrumTriggerOperationNotImplemented(f"Cannot set trigger mode of {trigger_source.name}.")

Change the currently enabled trigger mode. An external trigger source must be enabled.

Args
  • mode (ExternalTriggerMode): The ExternalTriggerMode to apply.
#   external_trigger_level_in_mv: int

The signal level (mV) needed to trigger an event using an external trigger source. An external trigger source must be enabled.

Returns

level (int): The currently set trigger level in mV.

#   def set_external_trigger_level_in_mv(self, level: int) -> None:
View Source
    def set_external_trigger_level_in_mv(self, level: int) -> None:
        """Change the signal level (mV) needed to trigger an event using an external trigger source. An external
        trigger source must be enabled.

        Args:
            level (int): The trigger level to set in mV.
        """
        if len(self._active_external_triggers) == 0:
            raise SpectrumExternalTriggerNotEnabled("Cannot set external trigger level.")
        else:
            for trigger_source in self._active_external_triggers:
                try:
                    self.write_to_spectrum_device_register(EXTERNAL_TRIGGER_LEVEL_COMMANDS[trigger_source.value], level)
                except KeyError:
                    raise SpectrumTriggerOperationNotImplemented(f"Cannot set trigger level of {trigger_source.name}.")

Change the signal level (mV) needed to trigger an event using an external trigger source. An external trigger source must be enabled.

Args
  • level (int): The trigger level to set in mV.
#   external_trigger_pulse_width_in_samples: int

The pulse width (in samples) needed to trigger an event using an external trigger source, if SPC_TM_PW_SMALLER or SPC_TM_PW_GREATER ExternalTriggerMode is selected. An external trigger source must be enabled.

Returns

width (int): The current trigger pulse width in samples.

#   def set_external_trigger_pulse_width_in_samples(self, width: int) -> None:
View Source
    def set_external_trigger_pulse_width_in_samples(self, width: int) -> None:
        """Change the pulse width (samples) needed to trigger an event using an external trigger source if
        SPC_TM_PW_SMALLER or SPC_TM_PW_GREATER `ExternalTriggerMode` is selected. An external trigger source must be
        enabled.

        Args:
            width (int): The trigger pulse width to set, in samples."""
        if len(self._active_external_triggers) == 0:
            raise SpectrumExternalTriggerNotEnabled("Cannot set external trigger pulse width.")
        else:
            for trigger_source in self._active_external_triggers:
                try:
                    self.write_to_spectrum_device_register(
                        EXTERNAL_TRIGGER_PULSE_WIDTH_COMMANDS[trigger_source.value], width
                    )
                except KeyError:
                    raise SpectrumTriggerOperationNotImplemented(f"Cannot set pulse width of {trigger_source.name}.")

Change the pulse width (samples) needed to trigger an event using an external trigger source if SPC_TM_PW_SMALLER or SPC_TM_PW_GREATER ExternalTriggerMode is selected. An external trigger source must be enabled.

Args
  • width (int): The trigger pulse width to set, in samples.
#   def apply_channel_enabling(self) -> None:
View Source
    def apply_channel_enabling(self) -> None:
        """Apply the enabled channels chosen using set_enable_channels(). This happens automatically and does not
        usually need to be called."""
        enabled_channel_spectrum_values = [self.analog_channels[i].name.value for i in self._enabled_analog_channels]
        if len(enabled_channel_spectrum_values) in [1, 2, 4, 8]:
            bitwise_or_of_enabled_channels = reduce(or_, enabled_channel_spectrum_values)
            self.write_to_spectrum_device_register(SPC_CHENABLE, bitwise_or_of_enabled_channels)
        else:
            raise SpectrumInvalidNumberOfEnabledChannels(
                f"Cannot enable {len(enabled_channel_spectrum_values)} " f"channels on one card."
            )

Apply the enabled channels chosen using set_enable_channels(). This happens automatically and does not usually need to be called.

#   timeout_in_ms: int

The time for which the card will wait for a trigger to be received after the device has been started before returning an error.

Returns

timeout_in_ms (in)t: The currently set timeout in ms.

#   def set_timeout_in_ms(self, timeout_in_ms: int) -> None:
View Source
    def set_timeout_in_ms(self, timeout_in_ms: int) -> None:
        """Change the time for which the card will wait for a trigger to tbe received after the device has started
        before returning an error.

        Args:
            timeout_in_ms (int): The desired timeout in ms.
        """
        self.write_to_spectrum_device_register(SPC_TIMEOUT, timeout_in_ms)

Change the time for which the card will wait for a trigger to tbe received after the device has started before returning an error.

Args
  • timeout_in_ms (int): The desired timeout in ms.

The currently enabled clock mode.

Returns

mode (ClockMode): The currently set clock mode.

#   def set_clock_mode(self, mode: spectrumdevice.settings.device_modes.ClockMode) -> None:
View Source
    def set_clock_mode(self, mode: ClockMode) -> None:
        """Change the clock mode. See `ClockMode` and the Spectrum documentation for available modes.

        Args:
            mode (`ClockMode`): The desired clock mode.
        """
        self.write_to_spectrum_device_register(SPC_CLOCKMODE, mode.value)

Change the clock mode. See ClockMode and the Spectrum documentation for available modes.

Args
  • mode (ClockMode): The desired clock mode.

For each multipurpose IO line on the card, read the available modes. See IOLineMode and the Spectrum Documentation for all possible available modes and their meanings.

Returns

modes (AvailableIOModes): An AvailableIOModes dataclass containing the modes for each IO line.

Get a list of the features of the card. See CardFeature, AdvancedCardFeature and the Spectrum documentation for more information.

Returns

features (List[Tuple[List[CardFeature], List[AdvancedCardFeature]]]): A tuple of two lists - of features and advanced features respectively - wrapped in a list.

#   sample_rate_in_hz: int

The rate at which samples will be acquired or generated, in Hz.

Returns

rate (int): The currently set sample rate in Hz.

#   def set_sample_rate_in_hz(self, rate: int) -> None:
View Source
    def set_sample_rate_in_hz(self, rate: int) -> None:
        """Change the rate at which samples will be acquired or generated, in Hz.
        Args:
            rate (int): The desired sample rate in Hz.
        """
        self.write_to_spectrum_device_register(SPC_SAMPLERATE, rate, SpectrumRegisterLength.SIXTY_FOUR)

Change the rate at which samples will be acquired or generated, in Hz.

Args
  • rate (int): The desired sample rate in Hz.
#   def force_trigger(self) -> None:
View Source
    def force_trigger(self) -> None:
        """Force a trigger event to occur"""
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_FORCETRIGGER)

Force a trigger event to occur

#   bytes_per_sample: int
Inherited Members
AbstractSpectrumDevice
reset
start
stop
configure_trigger
configure_channel_pairing
write_to_spectrum_device_register
read_spectrum_device_register
spectrumdevice.devices.abstract_device.device_interface.SpectrumDeviceInterface
define_transfer_buffer
#   class AbstractSpectrumDevice(spectrumdevice.devices.abstract_device.device_interface.SpectrumDeviceInterface[~AnalogChannelInterfaceType, ~IOLineInterfaceType], abc.ABC):
View Source
class AbstractSpectrumDevice(SpectrumDeviceInterface[AnalogChannelInterfaceType, IOLineInterfaceType], ABC):
    """Abstract superclass which implements methods common to all Spectrum devices. Instances of this class
    cannot be constructed directly. Instead, construct instances of the concrete classes listed in
    spectrumdevice/__init__.py, which inherit the methods defined here. Note that the concrete mock devices override
    several of the methods defined here."""

    def _connect(self, visa_string: str) -> None:
        self._handle = spectrum_handle_factory(visa_string)
        self._connected = True

    def reset(self) -> None:
        """Perform a software and hardware reset.

        All settings are set to hardware default values. The data in the board’s on-board memory will be no longer
        valid. Any output signals (including triggers and clocks) will be disabled."""
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_RESET)

    def start(self) -> None:
        """Start the device.

        For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), this will need to be called once for each
        acquisition. In-between calls, waveforms must be manually transferred from the device to a `TransferBuffer`
        using `start_transfer()`. The `TransferBuffer` need not be defined until after `start` is called.

        For digitisers in Multi FIFO mode (SPC_REC_FIFO_MULTI), it needs to be called only once, immediately followed by
        a call to `start_transfer()`. Frames will then be continuously streamed to the `TransferBuffer`, which must have
        already been defined.

        # todo: docstring for different AWG modes
        """
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_START | M2CMD_CARD_ENABLETRIGGER)

    def stop(self) -> None:
        """Stop the device.

        For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), this stops the continuous acquisition of waveform data that
        occurs after calling `start()`. Does not need to be called in Standard Single mode (SPC_REC_STD_SINGLE).

        # todo: docstring for AWG
        """
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_STOP)

    def configure_trigger(self, settings: TriggerSettings) -> None:
        """Apply all the trigger settings contained in a `TriggerSettings` dataclass to the device.

        Args:
            settings (`TriggerSettings`): A `TriggerSettings` dataclass containing the setting values to apply."""
        self.set_trigger_sources(settings.trigger_sources)
        if len(set(self.trigger_sources) & set(EXTERNAL_TRIGGER_SOURCES)) > 0:
            if settings.external_trigger_mode is not None:
                self.set_external_trigger_mode(settings.external_trigger_mode)
            if settings.external_trigger_level_in_mv is not None:
                self.set_external_trigger_level_in_mv(settings.external_trigger_level_in_mv)
            if settings.external_trigger_pulse_width_in_samples is not None:
                self.set_external_trigger_pulse_width_in_samples(settings.external_trigger_pulse_width_in_samples)

        # Write the configuration to the card
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP)

    def configure_channel_pairing(self, channel_pair: ChannelPair, mode: ChannelPairingMode) -> None:
        """Configures a pair of consecutive channels to operate either independently, in differential mode or
        in double  mode. If enabling differential or double mode, then the odd-numbered channel will be automatically
        configured to be identical to the even-numbered channel, and the odd-numbered channel will be disabled as is
        required by the Spectrum API.

        Args:
            channel_pair (ChannelPair): The pair of channels to configure
            mode (ChannelPairingMode): The mode to enable: SINGLE, DOUBLE, or DIFFERENTIAL
        """

        doubling_enabled = int(mode == ChannelPairingMode.DOUBLE)
        differential_mode_enabled = int(mode == ChannelPairingMode.DIFFERENTIAL)

        if doubling_enabled and channel_pair in (channel_pair.CHANNEL_4_AND_5, channel_pair.CHANNEL_6_AND_7):
            raise ValueError("Doubling can only be enabled for channel pairs CHANNEL_0_AND_1 or CHANNEL_2_AND_3.")

        if doubling_enabled or differential_mode_enabled:
            self._mirror_even_channel_settings_on_odd_channel(channel_pair)
            self._disable_odd_channel(channel_pair)

        self.write_to_spectrum_device_register(
            DIFFERENTIAL_CHANNEL_PAIR_COMMANDS[channel_pair], differential_mode_enabled
        )
        self.write_to_spectrum_device_register(DOUBLING_CHANNEL_PAIR_COMMANDS[channel_pair], doubling_enabled)

    def _disable_odd_channel(self, channel_pair: ChannelPair) -> None:
        try:
            enabled_channels = copy(self.enabled_analog_channel_nums)
            enabled_channels.remove(channel_pair.value + 1)
            self.set_enabled_analog_channels(enabled_channels)
        except ValueError:
            pass  # odd numbered channel was not enable, so no need to disable it.

    def _mirror_even_channel_settings_on_odd_channel(self, channel_pair: ChannelPair) -> None:
        self.analog_channels[channel_pair.value + 1].copy_settings_from_other_channel(
            self.analog_channels[channel_pair.value]
        )

    def write_to_spectrum_device_register(
        self,
        spectrum_register: int,
        value: int,
        length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO,
    ) -> None:
        """Set the value of a register on the Spectrum device.

        This method is used internally by `AbstractSpectrumDigitiser` and its subclasses to configure a hardware
        device, but can also be used to set the value of registers that are not implemented in
        `AbstractSpectrumDigitiser` and its subclasses.

        Args:
            spectrum_register (int): Identifier of the register to set. This should be a global constant imported from
                regs.py in the spectrum_gmbh package.
            value (int): Value to write to the register. This should be a global constant imported from
                regs.py in the spectrum_gmbh package.
            length (`SpectrumRegisterLength`): A `SpectrumRegisterLength` object specifying the length of the register
                to set, in bits.
        """
        if not SPECTRUM_DRIVERS_FOUND:
            raise SpectrumDriversNotFound(
                "Cannot communicate with hardware. For testing on a system without drivers or connected hardware, use"
                " MockSpectrumDigitiserCard instead."
            )
        if self.connected:
            if length == SpectrumRegisterLength.THIRTY_TWO:
                set_spectrum_i32_api_param(self._handle, spectrum_register, value)
            elif length == SpectrumRegisterLength.SIXTY_FOUR:
                set_spectrum_i64_api_param(self._handle, spectrum_register, value)
            else:
                raise ValueError("Spectrum integer length not recognised.")
        else:
            raise SpectrumDeviceNotConnected("The device has been disconnected.")

    def read_spectrum_device_register(
        self,
        spectrum_register: int,
        length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO,
    ) -> int:
        """Get the value of a register on the Spectrum digitiser.

        This method is used internally by `AbstractSpectrumDigitiser` and its subclasses to read the configuration of a
        hardware device, but can be also used to get the value of registers that are not implemented in
        `AbstractSpectrumDigitiser` and its subclasses.

        Args:
            spectrum_register (int): Identifier of the register to set. This should be a global constant imported from
                spectrum_gmbh.py_header.regs.
            length (`SpectrumRegisterLength`): A `SpectrumRegisterLength` object specifying the length of the register
                to set, in bits.

        Returns:
            value (int): Value of the register. This can be matched to a global constant imported from
                spectrum_gmbh.py_header.regs, usually using one of the Enums defined in the settings module.
        """
        if not SPECTRUM_DRIVERS_FOUND:
            raise SpectrumDriversNotFound(
                "Cannot communicate with hardware. For testing on a system without drivers or connected hardware, use"
                " a mock device instead (e.g. MockSpectrumDigitiserCard or MockSpectrumStarHub)."
            )
        if self.connected:
            if length == SpectrumRegisterLength.THIRTY_TWO:
                return get_spectrum_i32_api_param(self._handle, spectrum_register)
            elif length == SpectrumRegisterLength.SIXTY_FOUR:
                return get_spectrum_i64_api_param(self._handle, spectrum_register)
            else:
                raise ValueError("Spectrum integer length not recognised.")
        else:
            raise SpectrumDeviceNotConnected("The device has been disconnected.")

    def __repr__(self) -> str:
        return str(self)

Abstract superclass which implements methods common to all Spectrum devices. Instances of this class cannot be constructed directly. Instead, construct instances of the concrete classes listed in spectrumdevice/__init__.py, which inherit the methods defined here. Note that the concrete mock devices override several of the methods defined here.

#   def reset(self) -> None:
View Source
    def reset(self) -> None:
        """Perform a software and hardware reset.

        All settings are set to hardware default values. The data in the board’s on-board memory will be no longer
        valid. Any output signals (including triggers and clocks) will be disabled."""
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_RESET)

Perform a software and hardware reset.

All settings are set to hardware default values. The data in the board’s on-board memory will be no longer valid. Any output signals (including triggers and clocks) will be disabled.

#   def start(self) -> None:
View Source
    def start(self) -> None:
        """Start the device.

        For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), this will need to be called once for each
        acquisition. In-between calls, waveforms must be manually transferred from the device to a `TransferBuffer`
        using `start_transfer()`. The `TransferBuffer` need not be defined until after `start` is called.

        For digitisers in Multi FIFO mode (SPC_REC_FIFO_MULTI), it needs to be called only once, immediately followed by
        a call to `start_transfer()`. Frames will then be continuously streamed to the `TransferBuffer`, which must have
        already been defined.

        # todo: docstring for different AWG modes
        """
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_START | M2CMD_CARD_ENABLETRIGGER)

Start the device.

For digitisers in Standard Single mode (SPC_REC_STD_SINGLE), this will need to be called once for each acquisition. In-between calls, waveforms must be manually transferred from the device to a TransferBuffer using start_transfer(). The TransferBuffer need not be defined until after start is called.

For digitisers in Multi FIFO mode (SPC_REC_FIFO_MULTI), it needs to be called only once, immediately followed by a call to start_transfer(). Frames will then be continuously streamed to the TransferBuffer, which must have already been defined.

todo: docstring for different AWG modes

#   def stop(self) -> None:
View Source
    def stop(self) -> None:
        """Stop the device.

        For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), this stops the continuous acquisition of waveform data that
        occurs after calling `start()`. Does not need to be called in Standard Single mode (SPC_REC_STD_SINGLE).

        # todo: docstring for AWG
        """
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_STOP)

Stop the device.

For digitisers in FIFO mode (SPC_REC_FIFO_MULTI), this stops the continuous acquisition of waveform data that occurs after calling start(). Does not need to be called in Standard Single mode (SPC_REC_STD_SINGLE).

todo: docstring for AWG

#   def configure_trigger(self, settings: spectrumdevice.settings.TriggerSettings) -> None:
View Source
    def configure_trigger(self, settings: TriggerSettings) -> None:
        """Apply all the trigger settings contained in a `TriggerSettings` dataclass to the device.

        Args:
            settings (`TriggerSettings`): A `TriggerSettings` dataclass containing the setting values to apply."""
        self.set_trigger_sources(settings.trigger_sources)
        if len(set(self.trigger_sources) & set(EXTERNAL_TRIGGER_SOURCES)) > 0:
            if settings.external_trigger_mode is not None:
                self.set_external_trigger_mode(settings.external_trigger_mode)
            if settings.external_trigger_level_in_mv is not None:
                self.set_external_trigger_level_in_mv(settings.external_trigger_level_in_mv)
            if settings.external_trigger_pulse_width_in_samples is not None:
                self.set_external_trigger_pulse_width_in_samples(settings.external_trigger_pulse_width_in_samples)

        # Write the configuration to the card
        self.write_to_spectrum_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP)

Apply all the trigger settings contained in a TriggerSettings dataclass to the device.

Args
  • settings (TriggerSettings): A TriggerSettings dataclass containing the setting values to apply.
View Source
    def configure_channel_pairing(self, channel_pair: ChannelPair, mode: ChannelPairingMode) -> None:
        """Configures a pair of consecutive channels to operate either independently, in differential mode or
        in double  mode. If enabling differential or double mode, then the odd-numbered channel will be automatically
        configured to be identical to the even-numbered channel, and the odd-numbered channel will be disabled as is
        required by the Spectrum API.

        Args:
            channel_pair (ChannelPair): The pair of channels to configure
            mode (ChannelPairingMode): The mode to enable: SINGLE, DOUBLE, or DIFFERENTIAL
        """

        doubling_enabled = int(mode == ChannelPairingMode.DOUBLE)
        differential_mode_enabled = int(mode == ChannelPairingMode.DIFFERENTIAL)

        if doubling_enabled and channel_pair in (channel_pair.CHANNEL_4_AND_5, channel_pair.CHANNEL_6_AND_7):
            raise ValueError("Doubling can only be enabled for channel pairs CHANNEL_0_AND_1 or CHANNEL_2_AND_3.")

        if doubling_enabled or differential_mode_enabled:
            self._mirror_even_channel_settings_on_odd_channel(channel_pair)
            self._disable_odd_channel(channel_pair)

        self.write_to_spectrum_device_register(
            DIFFERENTIAL_CHANNEL_PAIR_COMMANDS[channel_pair], differential_mode_enabled
        )
        self.write_to_spectrum_device_register(DOUBLING_CHANNEL_PAIR_COMMANDS[channel_pair], doubling_enabled)

Configures a pair of consecutive channels to operate either independently, in differential mode or in double mode. If enabling differential or double mode, then the odd-numbered channel will be automatically configured to be identical to the even-numbered channel, and the odd-numbered channel will be disabled as is required by the Spectrum API.

Args
  • channel_pair (ChannelPair): The pair of channels to configure
  • mode (ChannelPairingMode): The mode to enable: SINGLE, DOUBLE, or DIFFERENTIAL
#   def write_to_spectrum_device_register( self, spectrum_register: int, value: int, length: spectrumdevice.settings.SpectrumRegisterLength = THIRTY_TWO ) -> None:
View Source
    def write_to_spectrum_device_register(
        self,
        spectrum_register: int,
        value: int,
        length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO,
    ) -> None:
        """Set the value of a register on the Spectrum device.

        This method is used internally by `AbstractSpectrumDigitiser` and its subclasses to configure a hardware
        device, but can also be used to set the value of registers that are not implemented in
        `AbstractSpectrumDigitiser` and its subclasses.

        Args:
            spectrum_register (int): Identifier of the register to set. This should be a global constant imported from
                regs.py in the spectrum_gmbh package.
            value (int): Value to write to the register. This should be a global constant imported from
                regs.py in the spectrum_gmbh package.
            length (`SpectrumRegisterLength`): A `SpectrumRegisterLength` object specifying the length of the register
                to set, in bits.
        """
        if not SPECTRUM_DRIVERS_FOUND:
            raise SpectrumDriversNotFound(
                "Cannot communicate with hardware. For testing on a system without drivers or connected hardware, use"
                " MockSpectrumDigitiserCard instead."
            )
        if self.connected:
            if length == SpectrumRegisterLength.THIRTY_TWO:
                set_spectrum_i32_api_param(self._handle, spectrum_register, value)
            elif length == SpectrumRegisterLength.SIXTY_FOUR:
                set_spectrum_i64_api_param(self._handle, spectrum_register, value)
            else:
                raise ValueError("Spectrum integer length not recognised.")
        else:
            raise SpectrumDeviceNotConnected("The device has been disconnected.")

Set the value of a register on the Spectrum device.

This method is used internally by AbstractSpectrumDigitiser and its subclasses to configure a hardware device, but can also be used to set the value of registers that are not implemented in AbstractSpectrumDigitiser and its subclasses.

Args
  • spectrum_register (int): Identifier of the register to set. This should be a global constant imported from regs.py in the spectrum_gmbh package.
  • value (int): Value to write to the register. This should be a global constant imported from regs.py in the spectrum_gmbh package.
  • length (SpectrumRegisterLength): A SpectrumRegisterLength object specifying the length of the register to set, in bits.
#   def read_spectrum_device_register( self, spectrum_register: int, length: spectrumdevice.settings.SpectrumRegisterLength = THIRTY_TWO ) -> int:
View Source
    def read_spectrum_device_register(
        self,
        spectrum_register: int,
        length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO,
    ) -> int:
        """Get the value of a register on the Spectrum digitiser.

        This method is used internally by `AbstractSpectrumDigitiser` and its subclasses to read the configuration of a
        hardware device, but can be also used to get the value of registers that are not implemented in
        `AbstractSpectrumDigitiser` and its subclasses.

        Args:
            spectrum_register (int): Identifier of the register to set. This should be a global constant imported from
                spectrum_gmbh.py_header.regs.
            length (`SpectrumRegisterLength`): A `SpectrumRegisterLength` object specifying the length of the register
                to set, in bits.

        Returns:
            value (int): Value of the register. This can be matched to a global constant imported from
                spectrum_gmbh.py_header.regs, usually using one of the Enums defined in the settings module.
        """
        if not SPECTRUM_DRIVERS_FOUND:
            raise SpectrumDriversNotFound(
                "Cannot communicate with hardware. For testing on a system without drivers or connected hardware, use"
                " a mock device instead (e.g. MockSpectrumDigitiserCard or MockSpectrumStarHub)."
            )
        if self.connected:
            if length == SpectrumRegisterLength.THIRTY_TWO:
                return get_spectrum_i32_api_param(self._handle, spectrum_register)
            elif length == SpectrumRegisterLength.SIXTY_FOUR:
                return get_spectrum_i64_api_param(self._handle, spectrum_register)
            else:
                raise ValueError("Spectrum integer length not recognised.")
        else:
            raise SpectrumDeviceNotConnected("The device has been disconnected.")

Get the value of a register on the Spectrum digitiser.

This method is used internally by AbstractSpectrumDigitiser and its subclasses to read the configuration of a hardware device, but can be also used to get the value of registers that are not implemented in AbstractSpectrumDigitiser and its subclasses.

Args
  • spectrum_register (int): Identifier of the register to set. This should be a global constant imported from spectrum_gmbh.py_header.regs.
  • length (SpectrumRegisterLength): A SpectrumRegisterLength object specifying the length of the register to set, in bits.
Returns

value (int): Value of the register. This can be matched to a global constant imported from spectrum_gmbh.py_header.regs, usually using one of the Enums defined in the settings module.

Inherited Members
spectrumdevice.devices.abstract_device.device_interface.SpectrumDeviceInterface
connected
reconnect
status
start_transfer
stop_transfer
wait_for_transfer_chunk_to_complete
disconnect
transfer_buffers
define_transfer_buffer
analog_channels
io_lines
enabled_analog_channel_nums
set_enabled_analog_channels
trigger_sources
set_trigger_sources
external_trigger_mode
set_external_trigger_mode
external_trigger_level_in_mv
set_external_trigger_level_in_mv
external_trigger_pulse_width_in_samples
set_external_trigger_pulse_width_in_samples
apply_channel_enabling
clock_mode
set_clock_mode
sample_rate_in_hz
set_sample_rate_in_hz
available_io_modes
feature_list
timeout_in_ms
set_timeout_in_ms
force_trigger
bytes_per_sample
type
model_number
#   class AbstractSpectrumChannel(spectrumdevice.devices.abstract_device.channel_interfaces.SpectrumChannelInterface, typing.Generic[~ChannelNameType]):
View Source
class AbstractSpectrumChannel(SpectrumChannelInterface, Generic[ChannelNameType]):
    """Partially implemented abstract superclass contain code common for controlling an individual channel or IO Line of
    all spectrum devices."""

    def __init__(self, channel_number: int, parent_device: SpectrumDeviceInterface, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self._name = self._make_name(channel_number)
        self._parent_device = parent_device
        self._enabled = True

    @property
    @abstractmethod
    def _name_prefix(self) -> str:
        raise NotImplementedError

    @abstractmethod
    def _make_name(self, channel_number: int) -> ChannelNameType:
        raise NotImplementedError

    @property
    def name(self) -> ChannelNameType:
        """The identifier assigned by the spectrum driver, formatted as an Enum by the settings package.

        Returns:
            name (`SpectrumChannelName`): The name of the channel, as assigned by the driver."""
        return self._name

    @property
    def _number(self) -> int:
        return int(self.name.name.split(self._name_prefix)[-1])

    def write_to_parent_device_register(
        self,
        spectrum_register: int,
        value: int,
        length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO,
    ) -> None:
        self._parent_device.write_to_spectrum_device_register(spectrum_register, value, length)

    def read_parent_device_register(
        self,
        spectrum_register: int,
        length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO,
    ) -> int:
        return self._parent_device.read_spectrum_device_register(spectrum_register, length)

    def __eq__(self, other: object) -> bool:
        if isinstance(other, AbstractSpectrumChannel):
            return (self.name == other.name) and (self._parent_device == other._parent_device)
        else:
            raise NotImplementedError()

    def __str__(self) -> str:
        return f"{self._name.name} of {self._parent_device}"

    def __repr__(self) -> str:
        return str(self)

Partially implemented abstract superclass contain code common for controlling an individual channel or IO Line of all spectrum devices.

#   name: ~ChannelNameType

The identifier assigned by the spectrum driver, formatted as an Enum by the settings package.

Returns

name (SpectrumChannelName): The name of the channel, as assigned by the driver.

#   def write_to_parent_device_register( self, spectrum_register: int, value: int, length: spectrumdevice.settings.SpectrumRegisterLength = THIRTY_TWO ) -> None:
View Source
    def write_to_parent_device_register(
        self,
        spectrum_register: int,
        value: int,
        length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO,
    ) -> None:
        self._parent_device.write_to_spectrum_device_register(spectrum_register, value, length)
#   def read_parent_device_register( self, spectrum_register: int, length: spectrumdevice.settings.SpectrumRegisterLength = THIRTY_TWO ) -> int:
View Source
    def read_parent_device_register(
        self,
        spectrum_register: int,
        length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO,
    ) -> int:
        return self._parent_device.read_spectrum_device_register(spectrum_register, length)
View Source
"""Provides Enums and Dataclasses wrapping the register values provided by the Spectrum API, to be used for configuring
hardware and interpreting responses received from hardware."""

# Christian Baker, King's College London
# Copyright (c) 2021 School of Biomedical Engineering & Imaging Sciences, King's College London
# Licensed under the MIT. You may obtain a copy at https://opensource.org/licenses/MIT.

from dataclasses import dataclass
from enum import Enum
from typing import List, Optional

from numpy import int16
from numpy.typing import NDArray

from spectrumdevice.settings.card_dependent_properties import ModelNumber
from spectrumdevice.settings.card_features import CardFeature, AdvancedCardFeature
from spectrumdevice.settings.channel import (
    InputImpedance,
    InputCoupling,
    InputPath,
    OutputChannelFilter,
    OutputChannelStopLevelMode,
)
from spectrumdevice.settings.device_modes import AcquisitionMode, ClockMode, GenerationMode
from spectrumdevice.settings.io_lines import IOLineMode, AvailableIOModes
from spectrumdevice.settings.transfer_buffer import (
    TransferBuffer,
)
from spectrumdevice.settings.triggering import TriggerSource, ExternalTriggerMode
from spectrumdevice.settings.status import CARD_STATUS_TYPE, DEVICE_STATUS_TYPE, StatusCode
from spectrumdevice.settings.pulse_generator import (
    PulseGeneratorTriggerSettings,
    PulseGeneratorTriggerMode,
    PulseGeneratorTriggerDetectionMode,
    PulseGeneratorMultiplexer1TriggerSource,
    PulseGeneratorMultiplexer2TriggerSource,
    PulseGeneratorOutputSettings,
)


__all__ = [
    "AcquisitionSettings",
    "TriggerSettings",
    "AcquisitionMode",
    "ClockMode",
    "CardFeature",
    "AdvancedCardFeature",
    "IOLineMode",
    "AvailableIOModes",
    "TransferBuffer",
    "TriggerSource",
    "ExternalTriggerMode",
    "CARD_STATUS_TYPE",
    "DEVICE_STATUS_TYPE",
    "StatusCode",
    "SpectrumRegisterLength",
    "ModelNumber",
    "GenerationSettings",
    "OutputChannelFilter",
    "OutputChannelStopLevelMode",
    "GenerationMode",
    "PulseGeneratorTriggerSettings",
    "PulseGeneratorTriggerMode",
    "PulseGeneratorTriggerDetectionMode",
    "PulseGeneratorMultiplexer1TriggerSource",
    "PulseGeneratorMultiplexer2TriggerSource",
    "PulseGeneratorOutputSettings",
]


@dataclass
class TriggerSettings:
    """A dataclass collecting all settings related to triggering generation and acquisition. See Spectrum documentation.
    Note that pulse generators have their own trigger options."""

    trigger_sources: List[TriggerSource]
    """The trigger sources to enable"""
    external_trigger_mode: Optional[ExternalTriggerMode] = None
    """The external trigger mode (if an external trigger is enabled)."""
    external_trigger_level_in_mv: Optional[int] = None
    """The level an external signal must reach to cause a trigger event (if an external trigger is enabled)."""
    external_trigger_pulse_width_in_samples: Optional[int] = None
    """The required width of an external trigger pulse (if an external trigger is enabled)."""


@dataclass
class AcquisitionSettings:
    """A dataclass collecting all settings required to configure an acquisition. See Spectrum documentation."""

    acquisition_mode: AcquisitionMode
    """Standard Single mode, Multi FIF mode or an averaging mode."""
    sample_rate_in_hz: int
    """Acquisition rate in samples per second."""
    acquisition_length_in_samples: int
    """The length of the recording in samples per channel."""
    pre_trigger_length_in_samples: int
    """The number of samples of the recording that will have been acquired before the trigger event."""
    timeout_in_ms: int
    """How long to wait for a trigger event before timing out."""
    enabled_channels: List[int]
    """The channel indices to enable for the acquisition."""
    vertical_ranges_in_mv: List[int]
    """The voltage range to apply to each enabled channel in mW."""
    vertical_offsets_in_percent: List[int]
    """The DC offset to apply to each enabled channel as percentages of their vertical ranges."""
    input_impedances: List[InputImpedance]
    """The input impedance settings to apply to each channel"""
    timestamping_enabled: bool
    """If True, Measurements will include the time at which the acquisition was triggered. Increases latency by ~10 ms.
    """
    batch_size: int = 1
    """The number of acquisitions to transfer to the PC before the resulting waveforms are returned by
      SpectrumDigitiserCard.get_waveforms()."""
    number_of_averages: int = 1
    """If an averaging AcquisitionMode is selected, this defines the number of averages."""
    input_couplings: Optional[List[InputCoupling]] = None
    """The coupling (AC or DC) to apply to each channel. Only available on some hardware, so default is None."""
    input_paths: Optional[List[InputPath]] = None
    """The input path (HF or Buffered) to apply to each channel. Only available on some hardware, so default is None."""


@dataclass
class GenerationSettings:
    """A dataclass collecting all settings required to configure signal generation. See Spectrum documentation."""

    generation_mode: GenerationMode
    """SPC_REP_STD_SINGLE , SPC_REP_STD_SINGLERESTART"""
    waveform: NDArray[int16]
    """The waveform to generate."""
    sample_rate_in_hz: int
    """Generation rate in samples per second."""
    num_loops: int
    """In SPC_REP_STD_SINGLE mode: the number of times to repeat the waveform after a trigger is received. In
     SPC_REP_STD_SINGLERESTART: The number of times to wait for a trigger and generate waveform once."""
    enabled_channels: list[int]
    """List of analog channel indices to enable for signal generation"""
    signal_amplitudes_in_mv: list[int]
    """The amplitude of each enabled channel."""
    dc_offsets_in_mv: list[int]
    """The dc offset of each enabled channel."""
    output_filters: list[OutputChannelFilter]
    """The output filter setting for each enabled channel."""
    stop_level_modes: list[OutputChannelStopLevelMode]
    """The behavior of each enabled channel after the waveform ends."""
    custom_stop_levels: Optional[list[Optional[int]]] = None
    """The stop level each channel will use it stop level mode is set to custom."""


class SpectrumRegisterLength(Enum):
    """Enum defining the possible lengths of a spectrum register."""

    THIRTY_TWO = 0
    """32 bit register"""
    SIXTY_FOUR = 1
    """64 bit register"""

    def __repr__(self) -> str:
        return self.name

Provides Enums and Dataclasses wrapping the register values provided by the Spectrum API, to be used for configuring hardware and interpreting responses received from hardware.

View Source
"""Provides Enums and Dataclasses wrapping the register values provided by the Spectrum API, to be used for configuring
hardware and interpreting responses received from hardware."""

# Christian Baker, King's College London
# Copyright (c) 2024 School of Biomedical Engineering & Imaging Sciences, King's College London
# Licensed under the MIT. You may obtain a copy at https://opensource.org/licenses/MIT.

Provides Enums and Dataclasses wrapping the register values provided by the Spectrum API, to be used for configuring hardware and interpreting responses received from hardware.

#  
@dataclass
class Measurement:
View Source
@dataclass
class Measurement:
    """Measurement is a dataclass for storing a set of waveforms generated by a single acquisition, with a timestamp."""

    waveforms: List[NDArray[float_]]
    """Contains the acquired waveforms as a list of 1D NumPy arrays"""
    timestamp: Optional[datetime]
    """The time at which the acquisition was triggered, as a datetime.datetime object"""

Measurement is a dataclass for storing a set of waveforms generated by a single acquisition, with a timestamp.

#   Measurement( waveforms: List[numpy.ndarray[Any, numpy.dtype[numpy.float64]]], timestamp: Optional[datetime.datetime] )
#   waveforms: List[numpy.ndarray[Any, numpy.dtype[numpy.float64]]]

Contains the acquired waveforms as a list of 1D NumPy arrays

#   timestamp: Optional[datetime.datetime]

The time at which the acquisition was triggered, as a datetime.datetime object

View Source
class SpectrumAWGCard(
    AbstractSpectrumCard[SpectrumAWGAnalogChannelInterface, SpectrumAWGIOLineInterface], AbstractSpectrumAWG
):
    """Class for controlling individual Spectrum AWG cards."""

    def _init_analog_channels(self) -> Sequence[SpectrumAWGAnalogChannelInterface]:
        num_modules = self.read_spectrum_device_register(SPC_MIINST_MODULES)
        num_channels_per_module = self.read_spectrum_device_register(SPC_MIINST_CHPERMODULE)
        total_channels = num_modules * num_channels_per_module
        return tuple([SpectrumAWGAnalogChannel(channel_number=n, parent_device=self) for n in range(total_channels)])

    def _init_io_lines(self) -> Sequence[SpectrumAWGIOLineInterface]:
        if (self.model_number.value & TYP_SERIESMASK) == TYP_M2PEXPSERIES:
            return tuple([SpectrumAWGIOLine(channel_number=n, parent_device=self) for n in range(4)])
        else:
            raise NotImplementedError("Don't know how many IO lines other types of card have. Only M2P series.")

    def transfer_waveform(self, waveform: NDArray[int16]) -> None:
        """ "Write an arbitrary waveform to the card's on-board memory.

        Args:
            waveform (NDArray[int16]): A numpy array of signed 16-bit integers representing the samples of the
                waveform to transfer. The amplitude and offset of the generated signals can be set per-channel (see
                SpectrumAWGAnalogChannel), so the waveform provided here should be scaled to the full range of int16
                (i.e. -32767 to 32767). Must be at least 16 samples long. If the waveform length is not a multiple of
                8 samples, the waveform will be zero-padded so its length is the next multiple of 8.

        """
        if len(waveform) < 16:
            raise ValueError("Waveform must be at least 16 samples long")
        step_size = get_memsize_step_size(self._model_number)
        remainder = len(waveform) % step_size
        if remainder > 0:
            logger.warning(
                "Length of waveform transmitted to AWG is not a multiple of 8 samples. Waveform in card memory will be "
                "zero-padded to the next multiple of 8."
            )
        coerced_mem_size = len(waveform) if remainder == 0 else len(waveform) + (step_size - remainder)

        buffer = transfer_buffer_factory(
            buffer_type=BufferType.SPCM_BUF_DATA,
            direction=BufferDirection.SPCM_DIR_PCTOCARD,
            size_in_samples=coerced_mem_size,
            bytes_per_sample=self.bytes_per_sample,
        )
        buffer.data_array[:] = concatenate([waveform, zeros(coerced_mem_size - len(waveform), dtype=int16)])
        self.define_transfer_buffer((buffer,))
        self.write_to_spectrum_device_register(SPC_MEMSIZE, coerced_mem_size)
        self.start_transfer()
        self.wait_for_transfer_chunk_to_complete()

    def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None:
        """Provide a `TransferBuffer` object for transferring samples to the card. This is called internally when
        transfer_waveform is used to send a single waveform to the card.

        Args:
            buffer (Optional[List[`TransferBuffer`]]): A length-1 list containing a pre-constructed
                `TransferBuffer`  The buffer should have buffer_type=BufferType.SPCM_BUF_DATA and
                BufferDirection.SPCM_DIR_PCTOCARD. The size of the buffer should be chosen according to the
                length of the data to transfer.
        """
        if buffer is None:
            raise ValueError(
                "You must provide a preconfigured buffer for transferring samples to an AWG because the"
                "buffer size cannot be inferred."
            )
        self._transfer_buffer = buffer[0]
        set_transfer_buffer(self._handle, self._transfer_buffer)

Class for controlling individual Spectrum AWG cards.

#   def transfer_waveform( self, waveform: numpy.ndarray[typing.Any, numpy.dtype[numpy.int16]] ) -> None:
View Source
    def transfer_waveform(self, waveform: NDArray[int16]) -> None:
        """ "Write an arbitrary waveform to the card's on-board memory.

        Args:
            waveform (NDArray[int16]): A numpy array of signed 16-bit integers representing the samples of the
                waveform to transfer. The amplitude and offset of the generated signals can be set per-channel (see
                SpectrumAWGAnalogChannel), so the waveform provided here should be scaled to the full range of int16
                (i.e. -32767 to 32767). Must be at least 16 samples long. If the waveform length is not a multiple of
                8 samples, the waveform will be zero-padded so its length is the next multiple of 8.

        """
        if len(waveform) < 16:
            raise ValueError("Waveform must be at least 16 samples long")
        step_size = get_memsize_step_size(self._model_number)
        remainder = len(waveform) % step_size
        if remainder > 0:
            logger.warning(
                "Length of waveform transmitted to AWG is not a multiple of 8 samples. Waveform in card memory will be "
                "zero-padded to the next multiple of 8."
            )
        coerced_mem_size = len(waveform) if remainder == 0 else len(waveform) + (step_size - remainder)

        buffer = transfer_buffer_factory(
            buffer_type=BufferType.SPCM_BUF_DATA,
            direction=BufferDirection.SPCM_DIR_PCTOCARD,
            size_in_samples=coerced_mem_size,
            bytes_per_sample=self.bytes_per_sample,
        )
        buffer.data_array[:] = concatenate([waveform, zeros(coerced_mem_size - len(waveform), dtype=int16)])
        self.define_transfer_buffer((buffer,))
        self.write_to_spectrum_device_register(SPC_MEMSIZE, coerced_mem_size)
        self.start_transfer()
        self.wait_for_transfer_chunk_to_complete()

"Write an arbitrary waveform to the card's on-board memory.

Args
  • waveform (NDArray[int16]): A numpy array of signed 16-bit integers representing the samples of the waveform to transfer. The amplitude and offset of the generated signals can be set per-channel (see SpectrumAWGAnalogChannel), so the waveform provided here should be scaled to the full range of int16 (i.e. -32767 to 32767). Must be at least 16 samples long. If the waveform length is not a multiple of 8 samples, the waveform will be zero-padded so its length is the next multiple of 8.
#   def define_transfer_buffer( self, buffer: Optional[Sequence[spectrumdevice.settings.transfer_buffer.TransferBuffer]] = None ) -> None:
View Source
    def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None:
        """Provide a `TransferBuffer` object for transferring samples to the card. This is called internally when
        transfer_waveform is used to send a single waveform to the card.

        Args:
            buffer (Optional[List[`TransferBuffer`]]): A length-1 list containing a pre-constructed
                `TransferBuffer`  The buffer should have buffer_type=BufferType.SPCM_BUF_DATA and
                BufferDirection.SPCM_DIR_PCTOCARD. The size of the buffer should be chosen according to the
                length of the data to transfer.
        """
        if buffer is None:
            raise ValueError(
                "You must provide a preconfigured buffer for transferring samples to an AWG because the"
                "buffer size cannot be inferred."
            )
        self._transfer_buffer = buffer[0]
        set_transfer_buffer(self._handle, self._transfer_buffer)

Provide a TransferBuffer object for transferring samples to the card. This is called internally when transfer_waveform is used to send a single waveform to the card.

Args
  • buffer (Optional[List[TransferBuffer]]): A length-1 list containing a pre-constructed TransferBuffer The buffer should have buffer_type=BufferType.SPCM_BUF_DATA and BufferDirection.SPCM_DIR_PCTOCARD. The size of the buffer should be chosen according to the length of the data to transfer.
View Source
class MockSpectrumAWGCard(MockAbstractSpectrumAWG, MockAbstractSpectrumCard, SpectrumAWGCard):
    """A mock AWG card."""

    def __init__(
        self,
        device_number: int,
        model: ModelNumber,
        num_modules: int,
        num_channels_per_module: int,
        card_features: Optional[list[CardFeature]] = None,
        advanced_card_features: Optional[list[AdvancedCardFeature]] = None,
    ) -> None:
        """
        Args:
            device_number (int): The index of the mock device to create. Used to create a name for the device which is
                used internally.
            model (ModelNumber): The model of card to mock.
            num_modules (int): The number of internal modules to assign the mock card. Default 2. On real hardware, this
                is read from the device so does not need to be set. See the Spectrum documentation to work out how many
                modules your hardware has.
            num_channels_per_module (int): The number of channels per module. Default 4 (so 8 channels in total). On
                real hardware, this is read from the device so does not need to be set.
            card_features (list[CardFeature]): List of available features of the mock device
            advanced_card_features (list[AdvancedCardFeature]): List of available advanced features of the mock device
        """
        super().__init__(
            card_type=CardType.SPCM_TYPE_AO,
            device_number=device_number,
            model=model,
            num_modules=num_modules,
            num_channels_per_module=num_channels_per_module,
            card_features=card_features if card_features is not None else [],
            advanced_card_features=advanced_card_features if advanced_card_features is not None else [],
        )
        self._connect(self._visa_string)

    def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None:
        """Create or provide a `TransferBuffer` object for transferring samples from the device.

        See SpectrumAWGCard.define_transfer_buffer(). This mock implementation is identical apart from that it
        does not write to any hardware device."""
        if buffer is None:
            raise ValueError(
                "You must provide a preconfigured buffer for transferring samples to an AWG because the"
                "buffer size cannot be inferred."
            )
        self._transfer_buffer = buffer[0]

A mock AWG card.

#   MockSpectrumAWGCard( device_number: int, model: spectrumdevice.settings.card_dependent_properties.ModelNumber, num_modules: int, num_channels_per_module: int, card_features: Optional[list[spectrumdevice.settings.card_features.CardFeature]] = None, advanced_card_features: Optional[list[spectrumdevice.settings.card_features.AdvancedCardFeature]] = None )
View Source
    def __init__(
        self,
        device_number: int,
        model: ModelNumber,
        num_modules: int,
        num_channels_per_module: int,
        card_features: Optional[list[CardFeature]] = None,
        advanced_card_features: Optional[list[AdvancedCardFeature]] = None,
    ) -> None:
        """
        Args:
            device_number (int): The index of the mock device to create. Used to create a name for the device which is
                used internally.
            model (ModelNumber): The model of card to mock.
            num_modules (int): The number of internal modules to assign the mock card. Default 2. On real hardware, this
                is read from the device so does not need to be set. See the Spectrum documentation to work out how many
                modules your hardware has.
            num_channels_per_module (int): The number of channels per module. Default 4 (so 8 channels in total). On
                real hardware, this is read from the device so does not need to be set.
            card_features (list[CardFeature]): List of available features of the mock device
            advanced_card_features (list[AdvancedCardFeature]): List of available advanced features of the mock device
        """
        super().__init__(
            card_type=CardType.SPCM_TYPE_AO,
            device_number=device_number,
            model=model,
            num_modules=num_modules,
            num_channels_per_module=num_channels_per_module,
            card_features=card_features if card_features is not None else [],
            advanced_card_features=advanced_card_features if advanced_card_features is not None else [],
        )
        self._connect(self._visa_string)
Args
  • device_number (int): The index of the mock device to create. Used to create a name for the device which is used internally.
  • model (ModelNumber): The model of card to mock.
  • num_modules (int): The number of internal modules to assign the mock card. Default 2. On real hardware, this is read from the device so does not need to be set. See the Spectrum documentation to work out how many modules your hardware has.
  • num_channels_per_module (int): The number of channels per module. Default 4 (so 8 channels in total). On real hardware, this is read from the device so does not need to be set.
  • card_features (list[CardFeature]): List of available features of the mock device
  • advanced_card_features (list[AdvancedCardFeature]): List of available advanced features of the mock device
#   def define_transfer_buffer( self, buffer: Optional[Sequence[spectrumdevice.settings.transfer_buffer.TransferBuffer]] = None ) -> None:
View Source
    def define_transfer_buffer(self, buffer: Optional[Sequence[TransferBuffer]] = None) -> None:
        """Create or provide a `TransferBuffer` object for transferring samples from the device.

        See SpectrumAWGCard.define_transfer_buffer(). This mock implementation is identical apart from that it
        does not write to any hardware device."""
        if buffer is None:
            raise ValueError(
                "You must provide a preconfigured buffer for transferring samples to an AWG because the"
                "buffer size cannot be inferred."
            )
        self._transfer_buffer = buffer[0]

Create or provide a TransferBuffer object for transferring samples from the device.

See SpectrumAWGCard.define_transfer_buffer(). This mock implementation is identical apart from that it does not write to any hardware device.

#   class SpectrumAWGAnalogChannel(spectrumdevice.AbstractSpectrumChannel[spectrumdevice.settings.channel.SpectrumAnalogChannelName], spectrumdevice.devices.abstract_device.channel_interfaces.SpectrumAnalogChannelInterface, abc.ABC):
View Source
class SpectrumAWGAnalogChannel(AbstractSpectrumAnalogChannel, SpectrumAWGAnalogChannelInterface):
    """Class for controlling analog channels of an AWG."""

    def __init__(self, parent_device: SpectrumAWGInterface, **kwargs: Any) -> None:
        if parent_device.type != CardType.SPCM_TYPE_AO:
            raise SpectrumCardIsNotAnAWG(parent_device.type)
        super().__init__(parent_device=parent_device, **kwargs)  # pass unused args up the inheritance hierarchy

    def _get_settings_as_dict(self) -> dict:
        return {
            SpectrumAWGAnalogChannel.signal_amplitude_in_mv.__name__: self.signal_amplitude_in_mv,
            SpectrumAWGAnalogChannel.dc_offset_in_mv.__name__: self.dc_offset_in_mv,
            SpectrumAWGAnalogChannel.output_filter.__name__: self.output_filter,
            SpectrumAWGAnalogChannel.stop_level_mode.__name__: self.stop_level_mode,
            SpectrumAWGAnalogChannel.stop_level_custom_value.__name__: self.stop_level_custom_value,
        }

    def _set_settings_from_dict(self, settings: dict) -> None:
        self.set_signal_amplitude_in_mv(settings[SpectrumAWGAnalogChannel.signal_amplitude_in_mv.__name__])
        self.set_dc_offset_in_mv(settings[SpectrumAWGAnalogChannel.dc_offset_in_mv.__name__])
        self.set_output_filter(settings[SpectrumAWGAnalogChannel.output_filter.__name__])
        self.set_stop_level_mode(settings[SpectrumAWGAnalogChannel.stop_level_mode.__name__])
        self.set_stop_level_custom_value(settings[SpectrumAWGAnalogChannel.stop_level_custom_value.__name__])

    @property
    def is_switched_on(self) -> bool:
        """Returns "True" if the output channel is switched on, or "False" if it is muted."""
        return bool(self._parent_device.read_spectrum_device_register(OUTPUT_CHANNEL_ENABLED_COMMANDS[self._number]))

    def set_is_switched_on(self, is_enabled: bool) -> None:
        """Switches the output channel on ("True") or off ("False")."""
        self._parent_device.write_to_spectrum_device_register(
            OUTPUT_CHANNEL_ENABLED_COMMANDS[self._number], int(is_enabled)
        )

    @property
    def dc_offset_in_mv(self) -> int:
        """The current output signal DC offset in mV.

        Returns:
            dc_offset (int): The currently set output signal DC offset in mV.
        """
        return self._parent_device.read_spectrum_device_register(OUTPUT_DC_OFFSET_COMMANDS[self._number])

    def set_dc_offset_in_mv(self, dc_offset: int) -> None:
        if dc_offset > OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]:
            raise ValueError(
                f"Max allowed signal DC offset for card {self._parent_device.model_number} is "
                f"{OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]} mV, "
                f"so {dc_offset} mV is too high."
            )
        self._parent_device.write_to_spectrum_device_register(OUTPUT_DC_OFFSET_COMMANDS[self._number], dc_offset)

    @property
    def signal_amplitude_in_mv(self) -> int:
        """The current output signal amplitude in mV.

        Returns:
            amplitude (int): The currently set output signal amplitude in mV.
        """
        return self._parent_device.read_spectrum_device_register(OUTPUT_AMPLITUDE_COMMANDS[self._number])

    def set_signal_amplitude_in_mv(self, amplitude: int) -> None:
        if amplitude > OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]:
            raise ValueError(
                f"Max allowed signal amplitude for card {self._parent_device.model_number} is "
                f"{OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]} mV, "
                f"so {amplitude} mV is too high."
            )
        self._parent_device.write_to_spectrum_device_register(OUTPUT_AMPLITUDE_COMMANDS[self._number], amplitude)

    @property
    def output_filter(self) -> OutputChannelFilter:
        """The current output filter setting.

        Returns:
            output_filter (OutputChannelFilter): The currently set output filter.
        """
        return OutputChannelFilter(
            self._parent_device.read_spectrum_device_register(OUTPUT_FILTER_COMMANDS[self._number])
        )

    def set_output_filter(self, output_filter: OutputChannelFilter) -> None:
        self._parent_device.write_to_spectrum_device_register(OUTPUT_FILTER_COMMANDS[self._number], output_filter.value)

    @property
    def stop_level_mode(self) -> OutputChannelStopLevelMode:
        """Sets the behavior of the channel when the output is stopped or playback finished."""
        return OutputChannelStopLevelMode(
            self._parent_device.read_spectrum_device_register(OUTPUT_STOP_LEVEL_MODE_COMMANDS[self._number])
        )

    def set_stop_level_mode(self, mode: OutputChannelStopLevelMode) -> None:
        self._parent_device.write_to_spectrum_device_register(OUTPUT_STOP_LEVEL_MODE_COMMANDS[self._number], mode.value)

    @property
    def stop_level_custom_value(self) -> int16:
        """Sets the level to which the output will be set when the output is stopped or playback finished and
        stop_level_mode is set to `OutputChannelStopLevelMode.SPCM_STOPLVL_CUSTOM`."""
        return int16(
            self._parent_device.read_spectrum_device_register(OUTPUT_STOP_LEVEL_CUSTOM_VALUE_COMMANDS[self._number])
        )

    def set_stop_level_custom_value(self, value: int16) -> None:
        self._parent_device.write_to_spectrum_device_register(
            OUTPUT_STOP_LEVEL_CUSTOM_VALUE_COMMANDS[self._number], int(value)
        )

Class for controlling analog channels of an AWG.

#   SpectrumAWGAnalogChannel( parent_device: spectrumdevice.devices.awg.awg_interface.SpectrumAWGInterface, **kwargs: Any )
View Source
    def __init__(self, parent_device: SpectrumAWGInterface, **kwargs: Any) -> None:
        if parent_device.type != CardType.SPCM_TYPE_AO:
            raise SpectrumCardIsNotAnAWG(parent_device.type)
        super().__init__(parent_device=parent_device, **kwargs)  # pass unused args up the inheritance hierarchy
#   is_switched_on: bool

Returns "True" if the output channel is switched on, or "False" if it is muted.

#   def set_is_switched_on(self, is_enabled: bool) -> None:
View Source
    def set_is_switched_on(self, is_enabled: bool) -> None:
        """Switches the output channel on ("True") or off ("False")."""
        self._parent_device.write_to_spectrum_device_register(
            OUTPUT_CHANNEL_ENABLED_COMMANDS[self._number], int(is_enabled)
        )

Switches the output channel on ("True") or off ("False").

#   dc_offset_in_mv: int

The current output signal DC offset in mV.

Returns

dc_offset (int): The currently set output signal DC offset in mV.

#   def set_dc_offset_in_mv(self, dc_offset: int) -> None:
View Source
    def set_dc_offset_in_mv(self, dc_offset: int) -> None:
        if dc_offset > OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]:
            raise ValueError(
                f"Max allowed signal DC offset for card {self._parent_device.model_number} is "
                f"{OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]} mV, "
                f"so {dc_offset} mV is too high."
            )
        self._parent_device.write_to_spectrum_device_register(OUTPUT_DC_OFFSET_COMMANDS[self._number], dc_offset)
#   signal_amplitude_in_mv: int

The current output signal amplitude in mV.

Returns

amplitude (int): The currently set output signal amplitude in mV.

#   def set_signal_amplitude_in_mv(self, amplitude: int) -> None:
View Source
    def set_signal_amplitude_in_mv(self, amplitude: int) -> None:
        if amplitude > OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]:
            raise ValueError(
                f"Max allowed signal amplitude for card {self._parent_device.model_number} is "
                f"{OUTPUT_AMPLITUDE_LIMITS_IN_MV[self._parent_device.model_number]} mV, "
                f"so {amplitude} mV is too high."
            )
        self._parent_device.write_to_spectrum_device_register(OUTPUT_AMPLITUDE_COMMANDS[self._number], amplitude)

The current output filter setting.

Returns

output_filter (OutputChannelFilter): The currently set output filter.

#   def set_output_filter( self, output_filter: spectrumdevice.settings.channel.OutputChannelFilter ) -> None:
View Source
    def set_output_filter(self, output_filter: OutputChannelFilter) -> None:
        self._parent_device.write_to_spectrum_device_register(OUTPUT_FILTER_COMMANDS[self._number], output_filter.value)

Sets the behavior of the channel when the output is stopped or playback finished.

#   def set_stop_level_mode( self, mode: spectrumdevice.settings.channel.OutputChannelStopLevelMode ) -> None:
View Source
    def set_stop_level_mode(self, mode: OutputChannelStopLevelMode) -> None:
        self._parent_device.write_to_spectrum_device_register(OUTPUT_STOP_LEVEL_MODE_COMMANDS[self._number], mode.value)
#   stop_level_custom_value: numpy.int16

Sets the level to which the output will be set when the output is stopped or playback finished and stop_level_mode is set to OutputChannelStopLevelMode.SPCM_STOPLVL_CUSTOM.

#   def set_stop_level_custom_value(self, value: numpy.int16) -> None:
View Source
    def set_stop_level_custom_value(self, value: int16) -> None:
        self._parent_device.write_to_spectrum_device_register(
            OUTPUT_STOP_LEVEL_CUSTOM_VALUE_COMMANDS[self._number], int(value)
        )
Inherited Members
AbstractSpectrumChannel
name
write_to_parent_device_register
read_parent_device_register
spectrumdevice.devices.abstract_device.channel_interfaces.SpectrumAnalogChannelInterface
copy_settings_from_other_channel
#   class SpectrumAWGIOLine(spectrumdevice.devices.abstract_device.channel_interfaces.SpectrumIOLineInterface, spectrumdevice.AbstractSpectrumChannel[spectrumdevice.settings.io_lines.SpectrumIOLineName], abc.ABC):
View Source
class SpectrumAWGIOLine(AbstractSpectrumIOLine, SpectrumAWGIOLineInterface):
    """Class for controlling Multipurpose IO lines of an AWG (e.g. X0, X1, X2 and X3)"""

    def __init__(self, parent_device: AbstractSpectrumCard, **kwargs: Any) -> None:
        if parent_device.type != CardType.SPCM_TYPE_AO:
            raise SpectrumCardIsNotAnAWG(parent_device.type)
        super().__init__(parent_device=parent_device, **kwargs)  # pass unused args up the inheritance hierarchy
        self._dig_out_settings = DigOutIOLineModeSettings(
            source_channel=DigOutSourceChannel.SPCM_XMODE_DIGOUTSRC_CH0,
            source_bit=DigOutSourceBit.SPCM_XMODE_DIGOUTSRC_BIT15,
        )

    @property
    def dig_out_settings(self) -> DigOutIOLineModeSettings:
        return self._dig_out_settings

    def set_dig_out_settings(self, dig_out_settings: DigOutIOLineModeSettings) -> None:
        self._dig_out_settings = dig_out_settings

    def _get_io_line_mode_settings_mask(self, mode: IOLineMode) -> int:
        if mode == IOLineMode.SPCM_XMODE_DIGOUT:
            return self._dig_out_settings.source_channel.value | self._dig_out_settings.source_bit.value
        else:
            return 0

Class for controlling Multipurpose IO lines of an AWG (e.g. X0, X1, X2 and X3)

View Source
    def __init__(self, parent_device: AbstractSpectrumCard, **kwargs: Any) -> None:
        if parent_device.type != CardType.SPCM_TYPE_AO:
            raise SpectrumCardIsNotAnAWG(parent_device.type)
        super().__init__(parent_device=parent_device, **kwargs)  # pass unused args up the inheritance hierarchy
        self._dig_out_settings = DigOutIOLineModeSettings(
            source_channel=DigOutSourceChannel.SPCM_XMODE_DIGOUTSRC_CH0,
            source_bit=DigOutSourceBit.SPCM_XMODE_DIGOUTSRC_BIT15,
        )
#   def set_dig_out_settings( self, dig_out_settings: spectrumdevice.settings.io_lines.DigOutIOLineModeSettings ) -> None:
View Source
    def set_dig_out_settings(self, dig_out_settings: DigOutIOLineModeSettings) -> None:
        self._dig_out_settings = dig_out_settings
Inherited Members
spectrumdevice.devices.abstract_device.abstract_spectrum_io_line.AbstractSpectrumIOLine
mode
set_mode
pulse_generator
AbstractSpectrumChannel
name
write_to_parent_device_register
read_parent_device_register
View Source
class PulseGenerator(PulseGeneratorInterface):
    """Class for controlling pulse generators associated with IO lines (requires firmware option be enabled)."""

    def __init__(self, parent: SpectrumIOLineInterface):
        self._parent_io_line = parent
        # last char of IO line name is IO line chanel number, which is used to set pulse generator number
        self._number = int(parent.name.name[-1])
        available_advanced_features = decode_advanced_card_features(
            self.read_parent_device_register(SPC_PCIEXTFEATURES)
        )
        if AdvancedCardFeature.SPCM_FEAT_EXTFW_PULSEGEN not in available_advanced_features:
            raise SpectrumFeatureNotSupportedByCard(
                call_description=self.__str__() + ".__init__()",
                message="Pulse generator firmware option not installed on device.",
            )
        self._multiplexer_1 = PulseGeneratorMultiplexer1(parent=self)
        self._multiplexer_2 = PulseGeneratorMultiplexer2(parent=self)

    def configure_output(
        self, settings: PulseGeneratorOutputSettings, coerce: bool = True
    ) -> PulseGeneratorOutputSettings:
        """Configure all pulse generator output settings at once. By default, all values are coerced to the
        nearest values allowed by the hardware, and the coerced values are returned."""
        self.set_output_inversion(settings.output_inversion)
        coerced_settings = PulseGeneratorOutputSettings(
            period_in_seconds=self.set_period_in_seconds(settings.period_in_seconds, coerce=coerce),
            duty_cycle=self.set_duty_cycle(settings.duty_cycle, coerce=coerce),
            num_pulses=self.set_num_pulses(settings.num_pulses, coerce=coerce),
            delay_in_seconds=self.set_delay_in_seconds(settings.delay_in_seconds, coerce=coerce),
            output_inversion=settings.output_inversion,
        )
        self.write_to_parent_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP)
        return coerced_settings

    def configure_trigger(self, settings: PulseGeneratorTriggerSettings) -> None:
        """Configure all pulse generator trigger settings at once."""
        self.set_trigger_mode(settings.trigger_mode)
        self.set_trigger_detection_mode(settings.trigger_detection_mode)
        self.multiplexer_1.set_trigger_source(settings.multiplexer_1_source)
        self.multiplexer_2.set_trigger_source(settings.multiplexer_2_source)
        self.multiplexer_1.set_output_inversion(settings.multiplexer_1_output_inversion)
        self.multiplexer_2.set_output_inversion(settings.multiplexer_2_output_inversion)
        self.write_to_parent_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP)

    def force_trigger(self) -> None:
        """Generates a pulse when the pulse generator trigger source (mux 2) is set to 'software'."""
        if (
            self._multiplexer_2.trigger_source
            != PulseGeneratorMultiplexer2TriggerSource.SPCM_PULSEGEN_MUX2_SRC_SOFTWARE
        ):
            raise SpectrumIOError("Force trigger can only be used if trigger source (mux 2) is set to 'software'")
        self.write_to_parent_device_register(SPC_XIO_PULSEGEN_COMMAND, SPCM_PULSEGEN_CMD_FORCE)

    @property
    def number(self) -> int:
        """The index of the pulse generator. Corresponds to the index of the IO line to which it belongs."""
        return self._number

    @property
    def multiplexer_1(self) -> PulseGeneratorMultiplexer1:
        """Change the trigger source of this multiplexer to control when it is possible to trigger the pulse generator."""
        return self._multiplexer_1

    @property
    def multiplexer_2(self) -> PulseGeneratorMultiplexer2:
        """Change the trigger source of this multiplexer to control how the pulse generator is triggered."""
        return self._multiplexer_2

    def read_parent_device_register(
        self, spectrum_register: int, length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO
    ) -> int:
        return self._parent_io_line.read_parent_device_register(spectrum_register, length)

    def write_to_parent_device_register(
        self,
        spectrum_register: int,
        value: int,
        length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO,
    ) -> None:
        self._parent_io_line.write_to_parent_device_register(spectrum_register, value, length)

    def _convert_clock_cycles_to_seconds(self, clock_cycles: int) -> float:
        return clock_cycles * self.clock_period_in_seconds

    def _convert_seconds_to_clock_cycles(self, seconds: float) -> float:
        # round to nearest milli-cycle to avoid floating point precision problems
        return round(seconds * self.clock_rate_in_hz * 1e3) / 1e3

    def _get_enabled_pulse_generator_ids(self) -> list[int]:
        return decode_enabled_pulse_gens(self.read_parent_device_register(SPC_XIO_PULSEGEN_ENABLE))

    @property
    def clock_rate_in_hz(self) -> int:
        """The current pulse generator clock rate. Affected by the sample rate of the parent card, and the number of
        channels enabled. Effects the precision with which pulse timings can be set, and their min and max values."""
        return self.read_parent_device_register(SPC_XIO_PULSEGEN_CLOCK)

    @property
    def clock_period_in_seconds(self) -> float:
        """The reciprocal of the clock rate, in seconds."""
        return 1 / self.clock_rate_in_hz

    @property
    def enabled(self) -> bool:
        """True if the pulse generator is currently enabled."""
        return PULSE_GEN_ENABLE_COMMANDS[self._number] in self._get_enabled_pulse_generator_ids()

    def enable(self) -> None:
        """Enable the pulse generator. Note that the mode of the parent IO Line must also be set to
        IOLineMOdO.SPCM_XMODE_PULSEGEN."""
        current_register_value = self.read_parent_device_register(SPC_XIO_PULSEGEN_ENABLE)
        new_register_value = toggle_bitmap_value(current_register_value, PULSE_GEN_ENABLE_COMMANDS[self._number], True)
        self.write_to_parent_device_register(SPC_XIO_PULSEGEN_ENABLE, new_register_value)

    def disable(self) -> None:
        """Disable the pulse generator."""
        current_register_value = self.read_parent_device_register(SPC_XIO_PULSEGEN_ENABLE)
        new_register_value = toggle_bitmap_value(current_register_value, PULSE_GEN_ENABLE_COMMANDS[self._number], False)
        self.write_to_parent_device_register(SPC_XIO_PULSEGEN_ENABLE, new_register_value)

    @property
    def output_inversion(self) -> bool:
        currently_enabled_config_options = decode_pulse_gen_config(
            self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number])
        )
        return SPCM_PULSEGEN_CONFIG_INVERT in currently_enabled_config_options

    def set_output_inversion(self, inverted: bool) -> None:
        current_register_value = self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number])
        new_register_value = toggle_bitmap_value(current_register_value, SPCM_PULSEGEN_CONFIG_INVERT, inverted)
        self.write_to_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number], new_register_value)

    @property
    def trigger_detection_mode(self) -> PulseGeneratorTriggerDetectionMode:
        """How the pulse generator trigger circuit responds to a trigger signal, .e.g rising edge..."""
        currently_enabled_config_options = decode_pulse_gen_config(
            self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number])
        )
        if PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH.value in currently_enabled_config_options:
            return PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH
        else:
            return PulseGeneratorTriggerDetectionMode.RISING_EDGE

    def set_trigger_detection_mode(self, mode: PulseGeneratorTriggerDetectionMode) -> None:
        """e.g. rising edge, high-voltage..."""
        current_register_value = self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number])
        high_voltage_mode_value = PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH.value
        new_register_value = toggle_bitmap_value(
            current_register_value,
            high_voltage_mode_value,
            mode == PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH,
        )
        self.write_to_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number], new_register_value)

    @property
    def trigger_mode(self) -> PulseGeneratorTriggerMode:
        """Gated, triggered or single-shot. See PulseGeneratorTriggerMode for more information."""
        return PulseGeneratorTriggerMode(
            self.read_parent_device_register(PULSE_GEN_TRIGGER_MODE_COMMANDS[self._number])
        )

    def set_trigger_mode(self, mode: PulseGeneratorTriggerMode) -> None:
        """Gated, triggered or single-shot. See PulseGeneratorTriggerMode for more information."""
        self.write_to_parent_device_register(PULSE_GEN_TRIGGER_MODE_COMMANDS[self._number], mode.value)

    @property
    def min_allowed_period_in_seconds(self) -> float:
        """Minimum allowed pulse period in seconds, given the current clock rate."""
        reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLEN_MIN)
        reg_val = 0 if reg_val < 0 else reg_val
        return self._convert_clock_cycles_to_seconds(reg_val)

    @property
    def max_allowed_period_in_seconds(self) -> float:
        """Maximum allowed pulse period in seconds, given the current clock rate."""
        reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLEN_MAX)
        reg_val = iinfo(int16).max if reg_val < 0 else reg_val
        return self._convert_clock_cycles_to_seconds(reg_val)

    @property
    def _allowed_period_step_size_in_clock_cycles(self) -> int:
        return self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLEN_STEP)

    @property
    def allowed_period_step_size_in_seconds(self) -> float:
        """Resolution with which the pulse period can be set, given the current clock rate."""
        return self._convert_clock_cycles_to_seconds(self._allowed_period_step_size_in_clock_cycles)

    @property
    def period_in_seconds(self) -> float:
        """The pulse length in seconds, including both the high-voltage and low-voltage sections."""
        return self._convert_clock_cycles_to_seconds(
            self.read_parent_device_register(PULSE_GEN_PULSE_PERIOD_COMMANDS[self._number])
        )

    def set_period_in_seconds(self, period: float, coerce: bool = False) -> float:
        """Set the time between the start of each generated pulse in seconds. If coerce is True, the requested value
        will be coerced according to min_allowed_period_in_seconds, max_allowed_period_in_seconds and
        allowed_period_step_size_in_seconds and the coerced value is returned. Otherwise, when an invalid value is
        requested a SpectrumInvalidParameterValue will be raised. The allowed values are affected by the number of
        active channels and the sample rate."""
        period_in_clock_cycles = self._convert_seconds_to_clock_cycles(period)
        coerced_period = _coerce_fractional_value_to_allowed_integer(
            period_in_clock_cycles,
            int(self._convert_seconds_to_clock_cycles(self.min_allowed_period_in_seconds)),
            int(self._convert_seconds_to_clock_cycles(self.max_allowed_period_in_seconds)),
            self._allowed_period_step_size_in_clock_cycles,
        )
        if not coerce and coerced_period != period_in_clock_cycles:
            raise SpectrumInvalidParameterValue(
                "pulse generator period",
                period,
                self.min_allowed_period_in_seconds,
                self.max_allowed_period_in_seconds,
                self.allowed_period_step_size_in_seconds,
            )

        self.write_to_parent_device_register(PULSE_GEN_PULSE_PERIOD_COMMANDS[self._number], int(coerced_period))
        return self._convert_clock_cycles_to_seconds(coerced_period)

    @property
    def min_allowed_high_voltage_duration_in_seconds(self) -> float:
        """Minimum allowed duration of the high-voltage part of the pulse in seconds, given the current clock rate."""
        reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILHIGH_MIN)
        reg_val = 0 if reg_val < 0 else reg_val
        return self._convert_clock_cycles_to_seconds(reg_val)

    @property
    def max_allowed_high_voltage_duration_in_seconds(self) -> float:
        """Maximum allowed duration of the high-voltage part of the pulse in seconds, given the current clock rate."""
        reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILHIGH_MAX)
        reg_val = iinfo(int16).max if reg_val < 0 else reg_val
        return self._convert_clock_cycles_to_seconds(reg_val)

    @property
    def _allowed_high_voltage_duration_step_size_in_clock_cycles(self) -> int:
        return self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILHIGH_STEP)

    @property
    def allowed_high_voltage_duration_step_size_in_seconds(self) -> float:
        """Resolution with which the high-voltage duration can be set, in seconds, given the current clock rate."""
        return self._convert_clock_cycles_to_seconds(self._allowed_high_voltage_duration_step_size_in_clock_cycles)

    @property
    def duration_of_high_voltage_in_seconds(self) -> float:
        """The length of the high-voltage part of a pulse, in seconds. Equal to the pulse duration * duty cycle."""
        return self._convert_clock_cycles_to_seconds(
            self.read_parent_device_register(PULSE_GEN_HIGH_DURATION_COMMANDS[self._number])
        )

    @property
    def duration_of_low_voltage_in_seconds(self) -> float:
        """The length of the low-voltage part of a pulse, in seconds. Equal to the pulse duration * (1 - duty cycle)."""
        return self.period_in_seconds - self.duration_of_high_voltage_in_seconds

    @property
    def duty_cycle(self) -> float:
        """The ratio between the high-voltage and low-voltage parts of the pulse."""
        return self.duration_of_high_voltage_in_seconds / self.period_in_seconds

    def set_duty_cycle(self, duty_cycle: float, coerce: bool = False) -> float:
        """Set the duty cycle. If coerce is True, the requested value will be coerced to be within allowed range and
        use allowed step size and then the coerced value wll be returned. Otherwise, when an invalid value is requested
        an SpectrumInvalidParameterValue will be raised. The allowed values are affected by the number of active
        channels and the sample rate.
        """
        requested_high_v_duration_in_clock_cycles = self._convert_seconds_to_clock_cycles(
            self.period_in_seconds * duty_cycle
        )
        clipped_duration = _coerce_fractional_value_to_allowed_integer(
            requested_high_v_duration_in_clock_cycles,
            int(self._convert_seconds_to_clock_cycles(self.min_allowed_high_voltage_duration_in_seconds)),
            int(self._convert_seconds_to_clock_cycles(self.max_allowed_high_voltage_duration_in_seconds)),
            self._allowed_high_voltage_duration_step_size_in_clock_cycles,
        )
        if not coerce and clipped_duration != requested_high_v_duration_in_clock_cycles:
            raise SpectrumInvalidParameterValue(
                "high-voltage duration",
                self.period_in_seconds * duty_cycle,
                self.min_allowed_high_voltage_duration_in_seconds,
                self.max_allowed_high_voltage_duration_in_seconds,
                self.allowed_high_voltage_duration_step_size_in_seconds,
            )
        self.write_to_parent_device_register(PULSE_GEN_HIGH_DURATION_COMMANDS[self._number], clipped_duration)
        return self._convert_clock_cycles_to_seconds(clipped_duration) / self.period_in_seconds

    @property
    def min_allowed_pulses(self) -> int:
        """Minimum allowed number of pulses to transmit."""
        return self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLOOPS_MIN)

    @property
    def max_allowed_pulses(self) -> int:
        """Maximum allowed number of pulses to transmit."""
        reg_val = self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLOOPS_MAX)
        # my card has this register set to -2, which I assume means no limit (can't work it out from the docs)
        return reg_val if reg_val > 0 else iinfo(int16).max

    @property
    def allowed_num_pulses_step_size(self) -> int:
        """Resolution with which the number of pulses to transmit can be set."""
        return self.read_parent_device_register(SPC_XIO_PULSEGEN_AVAILLOOPS_STEP)

    @property
    def num_pulses(self) -> int:
        """The number of pulses to generate on receipt of a trigger. If 0, pulses will be generated continuously."""
        return self.read_parent_device_register(PULSE_GEN_NUM_REPEATS_COMMANDS[self._number])

    def set_num_pulses(self, num_pulses: int, coerce: bool = False) -> int:
        """Set the number of pulses to generate on receipt of a trigger. If 0 or negative, pulses will be generated
        continuously. If coerce if True, the requested number of pulses will be coerced according to min_allowed_pulses,
        max_allowed_pulses and allowed_num_pulses_step_size and the coerced value is returned. Otherwise, a
        SpectrumInvalidParameterValue exception is raised if an invalid number of pulses is requested."""

        num_pulses = max(0, num_pulses)  # make negative value 0 to enable continuous pulse generation

        coerced_num_pulses = _coerce_fractional_value_to_allowed_integer(
            float(num_pulses), self.min_allowed_pulses, self.max_allowed_pulses, self.allowed_num_pulses_step_size
        )

        if not coerce and coerced_num_pulses != num_pulses:
            raise SpectrumInvalidParameterValue(
                "number of pulses",
                num_pulses,
                self.min_allowed_pulses,
                self.max_allowed_pulses,
                self.allowed_num_pulses_step_size,
            )

        self.write_to_parent_device_register(PULSE_GEN_NUM_REPEATS_COMMANDS[self._number], coerced_num_pulses)
        return coerced_num_pulses

    @property
    def min_allowed_delay_in_seconds(self) -> float:
        """Minimum allowed delay between the trigger event and pulse generation, in seconds, given the current clock
        rate."""
        reg_value = self.read_parent_device_register(602007)  # SPC_XIO_PULSEGEN_AVAILDELAY_MIN not in regs.py
        reg_value = 0 if reg_value == -1 else reg_value
        return self._convert_clock_cycles_to_seconds(reg_value)

    @property
    def max_allowed_delay_in_seconds(self) -> float:
        """Maximum allowed delay between the trigger event and pulse generation, in seconds, given the current clock
        rate."""
        reg_value = self.read_parent_device_register(602008)  # SPC_XIO_PULSEGEN_AVAILDELAY_MAX not in regs.py
        reg_value = iinfo(int16).max if reg_value == -1 else reg_value
        return self._convert_clock_cycles_to_seconds(reg_value)

    @property
    def allowed_delay_step_size_in_seconds(self) -> float:
        """resolution with which the delay between the trigger event and pulse generation can be set, in seconds, given
        the current clock rate."""
        return self._convert_clock_cycles_to_seconds(
            self.read_parent_device_register(602009)  # SPC_XIO_PULSEGEN_AVAILDELAY_STEP not in regs.py
        )

    @property
    def delay_in_seconds(self) -> float:
        """The delay between the trigger and the first pulse transmission"""
        return self._convert_clock_cycles_to_seconds(
            self.read_parent_device_register(PULSE_GEN_DELAY_COMMANDS[self._number])
        )

    def set_delay_in_seconds(self, delay_in_seconds: float, coerce: bool = False) -> float:
        """Set the delay between the trigger and the first pulse transmission. If coerce=True, the requested value is
        coerced according to min_allowed_delay_in_seconds, max_allowed_delay_in_seconds and
        allowed_delay_step_size_in_seconds, and then the coerced value is returned. Otherwise, an ValueError is raised
        if the requested value is invalid."""

        requested_delay_in_clock_cycles = self._convert_seconds_to_clock_cycles(delay_in_seconds)
        clipped_delay_in_clock_cycles = _coerce_fractional_value_to_allowed_integer(
            requested_delay_in_clock_cycles,
            int(self._convert_seconds_to_clock_cycles(self.min_allowed_delay_in_seconds)),
            int(self._convert_seconds_to_clock_cycles(self.max_allowed_delay_in_seconds)),
            int(self._convert_seconds_to_clock_cycles(self.allowed_delay_step_size_in_seconds)),
        )

        if not coerce and clipped_delay_in_clock_cycles != requested_delay_in_clock_cycles:
            raise SpectrumInvalidParameterValue(
                "delay in seconds",
                requested_delay_in_clock_cycles,
                self.min_allowed_delay_in_seconds,
                self.max_allowed_delay_in_seconds,
                self.allowed_delay_step_size_in_seconds,
            )

        self.write_to_parent_device_register(PULSE_GEN_DELAY_COMMANDS[self._number], clipped_delay_in_clock_cycles)
        return self._convert_clock_cycles_to_seconds(clipped_delay_in_clock_cycles)

    def __str__(self) -> str:
        return f"Pulse generator {self._number} of {self._parent_io_line}."

Class for controlling pulse generators associated with IO lines (requires firmware option be enabled).

View Source
    def __init__(self, parent: SpectrumIOLineInterface):
        self._parent_io_line = parent
        # last char of IO line name is IO line chanel number, which is used to set pulse generator number
        self._number = int(parent.name.name[-1])
        available_advanced_features = decode_advanced_card_features(
            self.read_parent_device_register(SPC_PCIEXTFEATURES)
        )
        if AdvancedCardFeature.SPCM_FEAT_EXTFW_PULSEGEN not in available_advanced_features:
            raise SpectrumFeatureNotSupportedByCard(
                call_description=self.__str__() + ".__init__()",
                message="Pulse generator firmware option not installed on device.",
            )
        self._multiplexer_1 = PulseGeneratorMultiplexer1(parent=self)
        self._multiplexer_2 = PulseGeneratorMultiplexer2(parent=self)
View Source
    def configure_output(
        self, settings: PulseGeneratorOutputSettings, coerce: bool = True
    ) -> PulseGeneratorOutputSettings:
        """Configure all pulse generator output settings at once. By default, all values are coerced to the
        nearest values allowed by the hardware, and the coerced values are returned."""
        self.set_output_inversion(settings.output_inversion)
        coerced_settings = PulseGeneratorOutputSettings(
            period_in_seconds=self.set_period_in_seconds(settings.period_in_seconds, coerce=coerce),
            duty_cycle=self.set_duty_cycle(settings.duty_cycle, coerce=coerce),
            num_pulses=self.set_num_pulses(settings.num_pulses, coerce=coerce),
            delay_in_seconds=self.set_delay_in_seconds(settings.delay_in_seconds, coerce=coerce),
            output_inversion=settings.output_inversion,
        )
        self.write_to_parent_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP)
        return coerced_settings

Configure all pulse generator output settings at once. By default, all values are coerced to the nearest values allowed by the hardware, and the coerced values are returned.

#   def configure_trigger( self, settings: spectrumdevice.settings.pulse_generator.PulseGeneratorTriggerSettings ) -> None:
View Source
    def configure_trigger(self, settings: PulseGeneratorTriggerSettings) -> None:
        """Configure all pulse generator trigger settings at once."""
        self.set_trigger_mode(settings.trigger_mode)
        self.set_trigger_detection_mode(settings.trigger_detection_mode)
        self.multiplexer_1.set_trigger_source(settings.multiplexer_1_source)
        self.multiplexer_2.set_trigger_source(settings.multiplexer_2_source)
        self.multiplexer_1.set_output_inversion(settings.multiplexer_1_output_inversion)
        self.multiplexer_2.set_output_inversion(settings.multiplexer_2_output_inversion)
        self.write_to_parent_device_register(SPC_M2CMD, M2CMD_CARD_WRITESETUP)

Configure all pulse generator trigger settings at once.

#   def force_trigger(self) -> None:
View Source
    def force_trigger(self) -> None:
        """Generates a pulse when the pulse generator trigger source (mux 2) is set to 'software'."""
        if (
            self._multiplexer_2.trigger_source
            != PulseGeneratorMultiplexer2TriggerSource.SPCM_PULSEGEN_MUX2_SRC_SOFTWARE
        ):
            raise SpectrumIOError("Force trigger can only be used if trigger source (mux 2) is set to 'software'")
        self.write_to_parent_device_register(SPC_XIO_PULSEGEN_COMMAND, SPCM_PULSEGEN_CMD_FORCE)

Generates a pulse when the pulse generator trigger source (mux 2) is set to 'software'.

#   number: int

The index of the pulse generator. Corresponds to the index of the IO line to which it belongs.

Change the trigger source of this multiplexer to control when it is possible to trigger the pulse generator.

Change the trigger source of this multiplexer to control how the pulse generator is triggered.

#   def read_parent_device_register( self, spectrum_register: int, length: spectrumdevice.settings.SpectrumRegisterLength = THIRTY_TWO ) -> int:
View Source
    def read_parent_device_register(
        self, spectrum_register: int, length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO
    ) -> int:
        return self._parent_io_line.read_parent_device_register(spectrum_register, length)
#   def write_to_parent_device_register( self, spectrum_register: int, value: int, length: spectrumdevice.settings.SpectrumRegisterLength = THIRTY_TWO ) -> None:
View Source
    def write_to_parent_device_register(
        self,
        spectrum_register: int,
        value: int,
        length: SpectrumRegisterLength = SpectrumRegisterLength.THIRTY_TWO,
    ) -> None:
        self._parent_io_line.write_to_parent_device_register(spectrum_register, value, length)
#   clock_rate_in_hz: int

The current pulse generator clock rate. Affected by the sample rate of the parent card, and the number of channels enabled. Effects the precision with which pulse timings can be set, and their min and max values.

#   clock_period_in_seconds: float

The reciprocal of the clock rate, in seconds.

#   enabled: bool

True if the pulse generator is currently enabled.

#   def enable(self) -> None:
View Source
    def enable(self) -> None:
        """Enable the pulse generator. Note that the mode of the parent IO Line must also be set to
        IOLineMOdO.SPCM_XMODE_PULSEGEN."""
        current_register_value = self.read_parent_device_register(SPC_XIO_PULSEGEN_ENABLE)
        new_register_value = toggle_bitmap_value(current_register_value, PULSE_GEN_ENABLE_COMMANDS[self._number], True)
        self.write_to_parent_device_register(SPC_XIO_PULSEGEN_ENABLE, new_register_value)

Enable the pulse generator. Note that the mode of the parent IO Line must also be set to IOLineMOdO.SPCM_XMODE_PULSEGEN.

#   def disable(self) -> None:
View Source
    def disable(self) -> None:
        """Disable the pulse generator."""
        current_register_value = self.read_parent_device_register(SPC_XIO_PULSEGEN_ENABLE)
        new_register_value = toggle_bitmap_value(current_register_value, PULSE_GEN_ENABLE_COMMANDS[self._number], False)
        self.write_to_parent_device_register(SPC_XIO_PULSEGEN_ENABLE, new_register_value)

Disable the pulse generator.

#   output_inversion: bool
#   def set_output_inversion(self, inverted: bool) -> None:
View Source
    def set_output_inversion(self, inverted: bool) -> None:
        current_register_value = self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number])
        new_register_value = toggle_bitmap_value(current_register_value, SPCM_PULSEGEN_CONFIG_INVERT, inverted)
        self.write_to_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number], new_register_value)

How the pulse generator trigger circuit responds to a trigger signal, .e.g rising edge...

#   def set_trigger_detection_mode( self, mode: spectrumdevice.settings.pulse_generator.PulseGeneratorTriggerDetectionMode ) -> None:
View Source
    def set_trigger_detection_mode(self, mode: PulseGeneratorTriggerDetectionMode) -> None:
        """e.g. rising edge, high-voltage..."""
        current_register_value = self.read_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number])
        high_voltage_mode_value = PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH.value
        new_register_value = toggle_bitmap_value(
            current_register_value,
            high_voltage_mode_value,
            mode == PulseGeneratorTriggerDetectionMode.SPCM_PULSEGEN_CONFIG_HIGH,
        )
        self.write_to_parent_device_register(PULSE_GEN_CONFIG_COMMANDS[self._number], new_register_value)

e.g. rising edge, high-voltage...

Gated, triggered or single-shot. See PulseGeneratorTriggerMode for more information.

#   def set_trigger_mode( self, mode: spectrumdevice.settings.pulse_generator.PulseGeneratorTriggerMode ) -> None:
View Source
    def set_trigger_mode(self, mode: PulseGeneratorTriggerMode) -> None:
        """Gated, triggered or single-shot. See PulseGeneratorTriggerMode for more information."""
        self.write_to_parent_device_register(PULSE_GEN_TRIGGER_MODE_COMMANDS[self._number], mode.value)

Gated, triggered or single-shot. See PulseGeneratorTriggerMode for more information.

#   min_allowed_period_in_seconds: float

Minimum allowed pulse period in seconds, given the current clock rate.

#   max_allowed_period_in_seconds: float

Maximum allowed pulse period in seconds, given the current clock rate.

#   allowed_period_step_size_in_seconds: float

Resolution with which the pulse period can be set, given the current clock rate.

#   period_in_seconds: float

The pulse length in seconds, including both the high-voltage and low-voltage sections.

#   def set_period_in_seconds(self, period: float, coerce: bool = False) -> float:
View Source
    def set_period_in_seconds(self, period: float, coerce: bool = False) -> float:
        """Set the time between the start of each generated pulse in seconds. If coerce is True, the requested value
        will be coerced according to min_allowed_period_in_seconds, max_allowed_period_in_seconds and
        allowed_period_step_size_in_seconds and the coerced value is returned. Otherwise, when an invalid value is
        requested a SpectrumInvalidParameterValue will be raised. The allowed values are affected by the number of
        active channels and the sample rate."""
        period_in_clock_cycles = self._convert_seconds_to_clock_cycles(period)
        coerced_period = _coerce_fractional_value_to_allowed_integer(
            period_in_clock_cycles,
            int(self._convert_seconds_to_clock_cycles(self.min_allowed_period_in_seconds)),
            int(self._convert_seconds_to_clock_cycles(self.max_allowed_period_in_seconds)),
            self._allowed_period_step_size_in_clock_cycles,
        )
        if not coerce and coerced_period != period_in_clock_cycles:
            raise SpectrumInvalidParameterValue(
                "pulse generator period",
                period,
                self.min_allowed_period_in_seconds,
                self.max_allowed_period_in_seconds,
                self.allowed_period_step_size_in_seconds,
            )

        self.write_to_parent_device_register(PULSE_GEN_PULSE_PERIOD_COMMANDS[self._number], int(coerced_period))
        return self._convert_clock_cycles_to_seconds(coerced_period)

Set the time between the start of each generated pulse in seconds. If coerce is True, the requested value will be coerced according to min_allowed_period_in_seconds, max_allowed_period_in_seconds and allowed_period_step_size_in_seconds and the coerced value is returned. Otherwise, when an invalid value is requested a SpectrumInvalidParameterValue will be raised. The allowed values are affected by the number of active channels and the sample rate.

#   min_allowed_high_voltage_duration_in_seconds: float

Minimum allowed duration of the high-voltage part of the pulse in seconds, given the current clock rate.

#   max_allowed_high_voltage_duration_in_seconds: float

Maximum allowed duration of the high-voltage part of the pulse in seconds, given the current clock rate.

#   allowed_high_voltage_duration_step_size_in_seconds: float

Resolution with which the high-voltage duration can be set, in seconds, given the current clock rate.

#   duration_of_high_voltage_in_seconds: float

The length of the high-voltage part of a pulse, in seconds. Equal to the pulse duration * duty cycle.

#   duration_of_low_voltage_in_seconds: float

The length of the low-voltage part of a pulse, in seconds. Equal to the pulse duration * (1 - duty cycle).

#   duty_cycle: float

The ratio between the high-voltage and low-voltage parts of the pulse.

#   def set_duty_cycle(self, duty_cycle: float, coerce: bool = False) -> float:
View Source
    def set_duty_cycle(self, duty_cycle: float, coerce: bool = False) -> float:
        """Set the duty cycle. If coerce is True, the requested value will be coerced to be within allowed range and
        use allowed step size and then the coerced value wll be returned. Otherwise, when an invalid value is requested
        an SpectrumInvalidParameterValue will be raised. The allowed values are affected by the number of active
        channels and the sample rate.
        """
        requested_high_v_duration_in_clock_cycles = self._convert_seconds_to_clock_cycles(
            self.period_in_seconds * duty_cycle
        )
        clipped_duration = _coerce_fractional_value_to_allowed_integer(
            requested_high_v_duration_in_clock_cycles,
            int(self._convert_seconds_to_clock_cycles(self.min_allowed_high_voltage_duration_in_seconds)),
            int(self._convert_seconds_to_clock_cycles(self.max_allowed_high_voltage_duration_in_seconds)),
            self._allowed_high_voltage_duration_step_size_in_clock_cycles,
        )
        if not coerce and clipped_duration != requested_high_v_duration_in_clock_cycles:
            raise SpectrumInvalidParameterValue(
                "high-voltage duration",
                self.period_in_seconds * duty_cycle,
                self.min_allowed_high_voltage_duration_in_seconds,
                self.max_allowed_high_voltage_duration_in_seconds,
                self.allowed_high_voltage_duration_step_size_in_seconds,
            )
        self.write_to_parent_device_register(PULSE_GEN_HIGH_DURATION_COMMANDS[self._number], clipped_duration)
        return self._convert_clock_cycles_to_seconds(clipped_duration) / self.period_in_seconds

Set the duty cycle. If coerce is True, the requested value will be coerced to be within allowed range and use allowed step size and then the coerced value wll be returned. Otherwise, when an invalid value is requested an SpectrumInvalidParameterValue will be raised. The allowed values are affected by the number of active channels and the sample rate.

#   min_allowed_pulses: int

Minimum allowed number of pulses to transmit.

#   max_allowed_pulses: int

Maximum allowed number of pulses to transmit.

#   allowed_num_pulses_step_size: int

Resolution with which the number of pulses to transmit can be set.

#   num_pulses: int

The number of pulses to generate on receipt of a trigger. If 0, pulses will be generated continuously.

#   def set_num_pulses(self, num_pulses: int, coerce: bool = False) -> int:
View Source
    def set_num_pulses(self, num_pulses: int, coerce: bool = False) -> int:
        """Set the number of pulses to generate on receipt of a trigger. If 0 or negative, pulses will be generated
        continuously. If coerce if True, the requested number of pulses will be coerced according to min_allowed_pulses,
        max_allowed_pulses and allowed_num_pulses_step_size and the coerced value is returned. Otherwise, a
        SpectrumInvalidParameterValue exception is raised if an invalid number of pulses is requested."""

        num_pulses = max(0, num_pulses)  # make negative value 0 to enable continuous pulse generation

        coerced_num_pulses = _coerce_fractional_value_to_allowed_integer(
            float(num_pulses), self.min_allowed_pulses, self.max_allowed_pulses, self.allowed_num_pulses_step_size
        )

        if not coerce and coerced_num_pulses != num_pulses:
            raise SpectrumInvalidParameterValue(
                "number of pulses",
                num_pulses,
                self.min_allowed_pulses,
                self.max_allowed_pulses,
                self.allowed_num_pulses_step_size,
            )

        self.write_to_parent_device_register(PULSE_GEN_NUM_REPEATS_COMMANDS[self._number], coerced_num_pulses)
        return coerced_num_pulses

Set the number of pulses to generate on receipt of a trigger. If 0 or negative, pulses will be generated continuously. If coerce if True, the requested number of pulses will be coerced according to min_allowed_pulses, max_allowed_pulses and allowed_num_pulses_step_size and the coerced value is returned. Otherwise, a SpectrumInvalidParameterValue exception is raised if an invalid number of pulses is requested.

#   min_allowed_delay_in_seconds: float

Minimum allowed delay between the trigger event and pulse generation, in seconds, given the current clock rate.

#   max_allowed_delay_in_seconds: float

Maximum allowed delay between the trigger event and pulse generation, in seconds, given the current clock rate.

#   allowed_delay_step_size_in_seconds: float

resolution with which the delay between the trigger event and pulse generation can be set, in seconds, given the current clock rate.

#   delay_in_seconds: float

The delay between the trigger and the first pulse transmission

#   def set_delay_in_seconds(self, delay_in_seconds: float, coerce: bool = False) -> float:
View Source
    def set_delay_in_seconds(self, delay_in_seconds: float, coerce: bool = False) -> float:
        """Set the delay between the trigger and the first pulse transmission. If coerce=True, the requested value is
        coerced according to min_allowed_delay_in_seconds, max_allowed_delay_in_seconds and
        allowed_delay_step_size_in_seconds, and then the coerced value is returned. Otherwise, an ValueError is raised
        if the requested value is invalid."""

        requested_delay_in_clock_cycles = self._convert_seconds_to_clock_cycles(delay_in_seconds)
        clipped_delay_in_clock_cycles = _coerce_fractional_value_to_allowed_integer(
            requested_delay_in_clock_cycles,
            int(self._convert_seconds_to_clock_cycles(self.min_allowed_delay_in_seconds)),
            int(self._convert_seconds_to_clock_cycles(self.max_allowed_delay_in_seconds)),
            int(self._convert_seconds_to_clock_cycles(self.allowed_delay_step_size_in_seconds)),
        )

        if not coerce and clipped_delay_in_clock_cycles != requested_delay_in_clock_cycles:
            raise SpectrumInvalidParameterValue(
                "delay in seconds",
                requested_delay_in_clock_cycles,
                self.min_allowed_delay_in_seconds,
                self.max_allowed_delay_in_seconds,
                self.allowed_delay_step_size_in_seconds,
            )

        self.write_to_parent_device_register(PULSE_GEN_DELAY_COMMANDS[self._number], clipped_delay_in_clock_cycles)
        return self._convert_clock_cycles_to_seconds(clipped_delay_in_clock_cycles)

Set the delay between the trigger and the first pulse transmission. If coerce=True, the requested value is coerced according to min_allowed_delay_in_seconds, max_allowed_delay_in_seconds and allowed_delay_step_size_in_seconds, and then the coerced value is returned. Otherwise, an ValueError is raised if the requested value is invalid.