pymagewell

pymagewell is a Python library for interfacing with Magewell ProCapture frame grabbers.

It is based on (and includes) Magewell's Windows SDK and is therefore Windows only. However, it provides a mock class for testing and development that does not depend on the SDK's Windows .dll files, so pymagewell can also be installed on macOS and Linux.

Installation

pip install pymagewell

Example of use

A full working example is provided in example_script.py.

 1# Christian Baker, King's College London
 2# Copyright (c) 2022 School of Biomedical Engineering & Imaging Sciences, King's College London
 3# Licensed under the MIT. You may obtain a copy at https://opensource.org/licenses/MIT.
 4"""
 5`pymagewell` is a Python library for interfacing with Magewell ProCapture frame grabbers.
 6
 7It is based on (and includes) Magewell's Windows SDK and is therefore Windows only. However, it provides a mock class
 8for testing and development that does not depend on the SDK's Windows .dll files, so `pymagewell` can also be
 9installed on macOS and Linux.
10
11* [GitHub page](https://github.com/KCL-BMEIS/pymagewell)
12* [API reference documentation](https://kcl-bmeis.github.io/pymagewell/)
13* [PyPI page](https://pypi.org/project/pymagewell/)
14
15### Installation
16
17```bash
18pip install pymagewell
19```
20
21### Example of use
22
23A full working example is provided in
24[`example_script.py`](https://github.com/KCL-BMEIS/pymagewell/blob/main/example_script.py).
25
26"""
27from . import _version
28from .pro_capture_controller import ProCaptureController
29
30from .pro_capture_device import ProCaptureDevice
31from .pro_capture_device.device_settings import ProCaptureSettings, TransferMode, ImageSizeInPixels, ColourFormat
32from .pro_capture_device.mock_pro_capture_device import MockProCaptureDevice
33
34__version__ = _version.get_versions()["version"]  # type: ignore
35
36__all__ = [
37    "ProCaptureDevice",
38    "MockProCaptureDevice",
39    "ProCaptureController",
40    "ProCaptureSettings",
41    "TransferMode",
42    "ImageSizeInPixels",
43    "ColourFormat",
44]
class ProCaptureDevice(pymagewell.pro_capture_device.pro_capture_device_impl.ProCaptureDeviceImpl, mwcapture.libmwcapture.mw_capture):
 53class ProCaptureDevice(ProCaptureDeviceImpl, mw_capture):
 54    """`ProCaptureDevice` represents a physical Magewell ProCapture device. It is intended to be controlled by a
 55    `ProCaptureController`.
 56
 57    ProCaptureDevice configures and initiates frame acquisition and transfer. It is responsible for constructing and
 58    registering events with the Magewell driver. It also provides methods for accessing information about the video
 59    source connected to the device. ProCaptureDevice inherits from the `mw_capture` class provided by Magewell's
 60    Windows SDK.
 61    """
 62
 63    def __init__(self, settings: ProCaptureSettings):
 64        """
 65        Args:
 66            settings: A `ProCaptureSettings` object containing the settings for the device.
 67        """
 68        ProCaptureDeviceImpl.__init__(self, settings)
 69        mw_capture.__init__(self)  # type: ignore
 70
 71        self.mw_capture_init_instance()  # type: ignore
 72        self.mw_refresh_device()  # type: ignore
 73        self._channel = _create_channel(self)
 74
 75        self._device_init_time = DeviceInitTime(
 76            device_time_in_s=self._get_device_time_in_s(), system_time_as_datetime=datetime.now()
 77        )
 78
 79        self._timer = _FrameTimer(self, self._channel, self._register_timer_event(TimerEvent()))
 80
 81        self._signal_change_event = cast(SignalChangeEvent, self._register_event(SignalChangeEvent()))
 82
 83        self._frame_buffered_event = FrameBufferedEvent()
 84        self._frame_buffering_event = FrameBufferingEvent()
 85
 86        if self._settings.transfer_mode == TransferMode.NORMAL:
 87            self._frame_buffered_event = cast(FrameBufferedEvent, self._register_event(self._frame_buffered_event))
 88            self._transfer_complete_event: TransferCompleteEvent = FrameTransferCompleteEvent()
 89
 90        elif self._settings.transfer_mode == TransferMode.LOW_LATENCY:
 91            self._frame_buffering_event = cast(FrameBufferingEvent, self._register_event(self._frame_buffering_event))
 92            self._transfer_complete_event = PartialFrameTransferCompleteEvent()
 93
 94        elif self._settings.transfer_mode.TIMER:
 95            self._transfer_complete_event = FrameTransferCompleteEvent()
 96
 97    @property
 98    def events(self) -> ProCaptureEvents:
 99        """events property
100        Returns:
101            A `ProCaptureEvents` object containing handles to the events generated by the device during frame grabbing.
102        """
103        return ProCaptureEvents(
104            transfer_complete=self._transfer_complete_event,
105            signal_change=self._signal_change_event,
106            frame_buffered=self._frame_buffered_event,
107            frame_buffering=self._frame_buffering_event,
108            timer_event=self._timer.event,
109        )
110
111    def _register_event(self, event: RegisterableEvent) -> RegisterableEvent:
112        notification_handle = self.mw_register_notify(self._channel, event.win32_event, event.registration_token)  # type: ignore
113        event.register(Notification(notification_handle, self._channel))
114        return event
115
116    def _register_timer_event(self, event: TimerEvent) -> TimerEvent:
117        """The _FrameTimer class handles constructing TimerEvents and registering them here."""
118        notification_handle = self.mw_register_timer(self._channel, event.win32_event)  # type: ignore
119        event.register(Notification(notification_handle, self._channel))
120        return event
121
122    def schedule_timer_event(self) -> None:
123        """For use if the TransferMode is TransferMode.TIMER, this method schedules a timer event to trigger the transfer of the
124        next frame."""
125        self._timer.schedule_timer_event(self._get_device_time_in_ticks())
126
127    @property
128    def buffer_status(self) -> OnDeviceBufferStatus:
129        """buffer_status property
130        Returns:
131            An `OnDeviceBufferStatus` object containing information about the device's buffer.
132        """
133        buffer_info = mwcap_video_buffer_info()
134        self.mw_get_video_buffer_info(self._channel, buffer_info)  # type: ignore
135        return OnDeviceBufferStatus.from_mwcap_video_buffer_info(buffer_info)
136
137    @property
138    def frame_info(self) -> FrameInfo:
139        """frame_info property
140        Returns:
141            A `FrameInfo` object containing information about the most recent frame.
142        """
143        frame_info = mwcap_video_frame_info()
144        self.mw_get_video_frame_info(self._channel, self.buffer_status.last_buffered_frame_index, frame_info)  # type: ignore
145        return FrameInfo.from_mwcap_video_frame_info(frame_info, self._device_init_time)
146
147    @property
148    def signal_status(self) -> SignalStatus:
149        """signal_status property
150        Returns:
151            A `SignalStatus` object containing information about the signal connected to the device.
152        """
153        mw_signal_status = mw_video_signal_status()
154        self.mw_get_video_signal_status(self._channel, mw_signal_status)  # type: ignore
155        return SignalStatus.from_mw_video_signal_status(mw_signal_status)
156
157    def start_grabbing(self) -> None:
158        """Starts the hardware acquiring frames."""
159        start_capture_result = self.mw_start_video_capture(self._channel, self.events.transfer_complete.win32_event)  # type: ignore
160        if start_capture_result != MW_SUCCEEDED:
161            raise ProCaptureError(f"Start capture failed (error code {start_capture_result}).")
162        if self.signal_status.state == SignalState.NONE:
163            logger.warning("Input signal status: None")
164        elif self.signal_status.state == SignalState.UNSUPPORTED:
165            logger.warning("Input signal status: Unsupported")
166        elif self.signal_status.state == SignalState.LOCKING:
167            logger.info("Input signal status: Locking")
168        elif self.signal_status.state == SignalState.LOCKED:
169            logger.info("Input signal status: Locked")
170
171        # Exit if signal not locked
172        if self.signal_status.state != SignalState.LOCKED:
173            self.mw_stop_video_capture(self._channel)  # type: ignore
174            self.shutdown()
175            raise ProCaptureError("Signal not locked. It is likely that no video signal is present.")
176
177    def stop_grabbing(self) -> None:
178        """Stops the hardware acquiring frames."""
179        self.mw_stop_video_capture(self._channel)  # type: ignore
180
181    @property
182    def transfer_status(self) -> TransferStatus:
183        """transfer_status property
184        Returns:
185            A `TransferStatus` object containing information about the transfer of frames from the device to the PC,
186            including the number of frames transferred, and the number of lines of the current frame that have been
187            transferred.
188        """
189        mw_capture_status = mw_video_capture_status()
190        self.mw_get_video_capture_status(self._channel, mw_capture_status)  # type: ignore
191        return TransferStatus.from_mw_video_capture_status(mw_capture_status)
192
193    def _get_device_time_in_ticks(self) -> mw_device_time:
194        """Read a timestamp from the device."""
195        time = mw_device_time()  # type: ignore
196        result = self.mw_get_device_time(self._channel, time)  # type: ignore
197        if result != MW_SUCCEEDED:
198            raise ProCaptureError("Failed to read time from device")
199        else:
200            return time
201
202    def _get_device_time_in_s(self) -> float:
203        return int(self._get_device_time_in_ticks().m_ll_device_time.value) * DEVICE_CLOCK_TICK_PERIOD_IN_SECONDS
204
205    def start_a_frame_transfer(self, frame_buffer: Array[c_char]) -> datetime:
206        """start_a_frame_transfer starts the transfer of lines from the device to a buffer in PC memory.
207        Args:
208            frame_buffer (Array[c_char]): A buffer in PC memory to which the lines of the current frame will be
209            transferred.
210        Returns:
211            The time (datetime.datetime) at which the transfer of the first line of the current frame started.
212        """
213        in_low_latency_mode = self.transfer_mode == TransferMode.LOW_LATENCY
214        notify_size = self._settings.num_lines_per_chunk if in_low_latency_mode else 0
215
216        seconds_since_init = self._get_device_time_in_s() - self._device_init_time.device_time_in_s
217        frame_timestamp = self._device_init_time.system_time_as_datetime + timedelta(seconds=seconds_since_init)
218        result = self.mw_capture_video_frame_to_virtual_address_ex(  # type: ignore
219            hchannel=self._channel,
220            iframe=self.buffer_status.last_buffered_frame_index,
221            pbframe=addressof(frame_buffer),
222            cbframe=self._settings.image_size_in_bytes,
223            cbstride=self._settings.min_stride,
224            bbottomup=False,  # this is True in the C++ example, but false in python example,
225            pvcontext=0,
226            dwfourcc=self._settings.color_format.value,  # color format of captured frames
227            cx=self._settings.dimensions.cols,
228            cy=self._settings.dimensions.rows,
229            dwprocessswitchs=0,
230            cypartialnotify=notify_size,
231            hosdimage=0,
232            posdrects=0,
233            cosdrects=0,
234            scontrast=100,
235            sbrightness=0,
236            ssaturation=100,
237            shue=0,
238            deinterlacemode=MWCAP_VIDEO_DEINTERLACE_BLEND,
239            aspectratioconvertmode=MWCAP_VIDEO_ASPECT_RATIO_CROPPING,
240            prectsrc=0,  # 0 in C++ example, but configured using CLIP settings in python example,
241            prectdest=0,
242            naspectx=0,
243            naspecty=0,
244            colorformat=MWCAP_VIDEO_COLOR_FORMAT_UNKNOWN,
245            quantrange=MWCAP_VIDEO_QUANTIZATION_UNKNOWN,
246            satrange=MWCAP_VIDEO_SATURATION_UNKNOWN,
247        )
248        if result != MW_SUCCEEDED:
249            raise ProCaptureError(f"Frame grab failed with error code {result}")
250        else:
251            return frame_timestamp
252
253    def shutdown(self) -> None:
254        """shutdown releases the hardware resources used by the device."""
255        self._timer.shutdown()
256        self._signal_change_event.destroy()
257        self._transfer_complete_event.destroy()
258        self._frame_buffered_event.destroy()
259        self._frame_buffering_event.destroy()
260
261        self.mw_close_channel(self._channel)  # type: ignore
262        self.mw_capture_exit_instance()  # type: ignore

ProCaptureDevice represents a physical Magewell ProCapture device. It is intended to be controlled by a ProCaptureController.

ProCaptureDevice configures and initiates frame acquisition and transfer. It is responsible for constructing and registering events with the Magewell driver. It also provides methods for accessing information about the video source connected to the device. ProCaptureDevice inherits from the mw_capture class provided by Magewell's Windows SDK.

ProCaptureDevice( settings: pymagewell.ProCaptureSettings)
63    def __init__(self, settings: ProCaptureSettings):
64        """
65        Args:
66            settings: A `ProCaptureSettings` object containing the settings for the device.
67        """
68        ProCaptureDeviceImpl.__init__(self, settings)
69        mw_capture.__init__(self)  # type: ignore
70
71        self.mw_capture_init_instance()  # type: ignore
72        self.mw_refresh_device()  # type: ignore
73        self._channel = _create_channel(self)
74
75        self._device_init_time = DeviceInitTime(
76            device_time_in_s=self._get_device_time_in_s(), system_time_as_datetime=datetime.now()
77        )
78
79        self._timer = _FrameTimer(self, self._channel, self._register_timer_event(TimerEvent()))
80
81        self._signal_change_event = cast(SignalChangeEvent, self._register_event(SignalChangeEvent()))
82
83        self._frame_buffered_event = FrameBufferedEvent()
84        self._frame_buffering_event = FrameBufferingEvent()
85
86        if self._settings.transfer_mode == TransferMode.NORMAL:
87            self._frame_buffered_event = cast(FrameBufferedEvent, self._register_event(self._frame_buffered_event))
88            self._transfer_complete_event: TransferCompleteEvent = FrameTransferCompleteEvent()
89
90        elif self._settings.transfer_mode == TransferMode.LOW_LATENCY:
91            self._frame_buffering_event = cast(FrameBufferingEvent, self._register_event(self._frame_buffering_event))
92            self._transfer_complete_event = PartialFrameTransferCompleteEvent()
93
94        elif self._settings.transfer_mode.TIMER:
95            self._transfer_complete_event = FrameTransferCompleteEvent()
Args
events: pymagewell.pro_capture_device.device_interface.ProCaptureEvents

events property

Returns

A ProCaptureEvents object containing handles to the events generated by the device during frame grabbing.

def schedule_timer_event(self) -> None:
122    def schedule_timer_event(self) -> None:
123        """For use if the TransferMode is TransferMode.TIMER, this method schedules a timer event to trigger the transfer of the
124        next frame."""
125        self._timer.schedule_timer_event(self._get_device_time_in_ticks())

For use if the TransferMode is TransferMode.TIMER, this method schedules a timer event to trigger the transfer of the next frame.

buffer_status: pymagewell.pro_capture_device.device_status.OnDeviceBufferStatus

buffer_status property

Returns

An OnDeviceBufferStatus object containing information about the device's buffer.

frame_info: pymagewell.pro_capture_device.device_status.FrameInfo

frame_info property

Returns

A FrameInfo object containing information about the most recent frame.

signal_status: pymagewell.pro_capture_device.device_status.SignalStatus

signal_status property

Returns

A SignalStatus object containing information about the signal connected to the device.

def start_grabbing(self) -> None:
157    def start_grabbing(self) -> None:
158        """Starts the hardware acquiring frames."""
159        start_capture_result = self.mw_start_video_capture(self._channel, self.events.transfer_complete.win32_event)  # type: ignore
160        if start_capture_result != MW_SUCCEEDED:
161            raise ProCaptureError(f"Start capture failed (error code {start_capture_result}).")
162        if self.signal_status.state == SignalState.NONE:
163            logger.warning("Input signal status: None")
164        elif self.signal_status.state == SignalState.UNSUPPORTED:
165            logger.warning("Input signal status: Unsupported")
166        elif self.signal_status.state == SignalState.LOCKING:
167            logger.info("Input signal status: Locking")
168        elif self.signal_status.state == SignalState.LOCKED:
169            logger.info("Input signal status: Locked")
170
171        # Exit if signal not locked
172        if self.signal_status.state != SignalState.LOCKED:
173            self.mw_stop_video_capture(self._channel)  # type: ignore
174            self.shutdown()
175            raise ProCaptureError("Signal not locked. It is likely that no video signal is present.")

Starts the hardware acquiring frames.

def stop_grabbing(self) -> None:
177    def stop_grabbing(self) -> None:
178        """Stops the hardware acquiring frames."""
179        self.mw_stop_video_capture(self._channel)  # type: ignore

Stops the hardware acquiring frames.

transfer_status: pymagewell.pro_capture_device.device_status.TransferStatus

transfer_status property

Returns

A TransferStatus object containing information about the transfer of frames from the device to the PC, including the number of frames transferred, and the number of lines of the current frame that have been transferred.

def start_a_frame_transfer(self, frame_buffer: _ctypes.Array[ctypes.c_char]) -> datetime.datetime:
205    def start_a_frame_transfer(self, frame_buffer: Array[c_char]) -> datetime:
206        """start_a_frame_transfer starts the transfer of lines from the device to a buffer in PC memory.
207        Args:
208            frame_buffer (Array[c_char]): A buffer in PC memory to which the lines of the current frame will be
209            transferred.
210        Returns:
211            The time (datetime.datetime) at which the transfer of the first line of the current frame started.
212        """
213        in_low_latency_mode = self.transfer_mode == TransferMode.LOW_LATENCY
214        notify_size = self._settings.num_lines_per_chunk if in_low_latency_mode else 0
215
216        seconds_since_init = self._get_device_time_in_s() - self._device_init_time.device_time_in_s
217        frame_timestamp = self._device_init_time.system_time_as_datetime + timedelta(seconds=seconds_since_init)
218        result = self.mw_capture_video_frame_to_virtual_address_ex(  # type: ignore
219            hchannel=self._channel,
220            iframe=self.buffer_status.last_buffered_frame_index,
221            pbframe=addressof(frame_buffer),
222            cbframe=self._settings.image_size_in_bytes,
223            cbstride=self._settings.min_stride,
224            bbottomup=False,  # this is True in the C++ example, but false in python example,
225            pvcontext=0,
226            dwfourcc=self._settings.color_format.value,  # color format of captured frames
227            cx=self._settings.dimensions.cols,
228            cy=self._settings.dimensions.rows,
229            dwprocessswitchs=0,
230            cypartialnotify=notify_size,
231            hosdimage=0,
232            posdrects=0,
233            cosdrects=0,
234            scontrast=100,
235            sbrightness=0,
236            ssaturation=100,
237            shue=0,
238            deinterlacemode=MWCAP_VIDEO_DEINTERLACE_BLEND,
239            aspectratioconvertmode=MWCAP_VIDEO_ASPECT_RATIO_CROPPING,
240            prectsrc=0,  # 0 in C++ example, but configured using CLIP settings in python example,
241            prectdest=0,
242            naspectx=0,
243            naspecty=0,
244            colorformat=MWCAP_VIDEO_COLOR_FORMAT_UNKNOWN,
245            quantrange=MWCAP_VIDEO_QUANTIZATION_UNKNOWN,
246            satrange=MWCAP_VIDEO_SATURATION_UNKNOWN,
247        )
248        if result != MW_SUCCEEDED:
249            raise ProCaptureError(f"Frame grab failed with error code {result}")
250        else:
251            return frame_timestamp

start_a_frame_transfer starts the transfer of lines from the device to a buffer in PC memory.

Args
  • frame_buffer (Array[c_char]): A buffer in PC memory to which the lines of the current frame will be transferred.
Returns

The time (datetime.datetime) at which the transfer of the first line of the current frame started.

def shutdown(self) -> None:
253    def shutdown(self) -> None:
254        """shutdown releases the hardware resources used by the device."""
255        self._timer.shutdown()
256        self._signal_change_event.destroy()
257        self._transfer_complete_event.destroy()
258        self._frame_buffered_event.destroy()
259        self._frame_buffering_event.destroy()
260
261        self.mw_close_channel(self._channel)  # type: ignore
262        self.mw_capture_exit_instance()  # type: ignore

shutdown releases the hardware resources used by the device.

Inherited Members
mwcapture.libmwcapture.mw_capture
load_win_funcs
mw_capture_init_instance
mw_capture_exit_instance
mw_get_version
mw_refresh_device
mw_get_channel_count
mw_get_channel_info_by_index
mw_get_channel_info
mw_get_device_path
mw_open_channel_by_path
mw_close_channel
mw_create_video_capture
mw_destory_video_capture
mw_create_audio_capture
mw_destory_audio_capture
mw_get_video_signal_status
mw_get_audio_signal_status
mw_get_video_caps
mw_start_video_capture
mw_register_notify
mw_register_timer
mw_get_device_time
mw_pin_video_buffer
mw_schedule_timer
mw_get_video_buffer_info
mw_get_video_frame_info
mw_capture_video_frame_to_virtual_address_ex
mw_unpin_video_buffer
mw_unregister_notify
mw_unregister_timer
mw_stop_video_capture
mw_get_video_capture_status
mw_start_audio_capture
mw_get_notify_status
mw_capture_audio_frame
mw_stop_audio_capture
mw_start_video_eco_capture
mw_get_video_eco_capture_status
mwcapture_set_video_eco_frame
mw_stop_video_eco_capture
class MockProCaptureDevice(pymagewell.pro_capture_device.pro_capture_device_impl.ProCaptureDeviceImpl):
 54class MockProCaptureDevice(ProCaptureDeviceImpl):
 55    """MockProCaptureDevice is intended to be used during testing, development and CI in the absence of a hardware frame
 56    grabber or Magewell Windows SDK. Does not require Magewell driver or hardware.
 57
 58    Only TransferMode.TIMER is supported.
 59
 60    The class generates test frames. The frame rate is limited to 2 frames per second because copying the mock frames
 61    to a provided PC transfer buffer takes a surprisingly long time (~0.11s).
 62
 63    It's recommended to use ColourFormat.RGB24 only. You can use some other formats if you have ffmpeg installed, but
 64    this is quite slow."""
 65
 66    def __init__(self, settings: ProCaptureSettings):
 67        """
 68        Args:
 69            settings (ProCaptureSettings): The settings to use for the mock device. settings.transfer_mode must be
 70              set to TransferMode.TIMER.
 71        """
 72        if settings.transfer_mode != TransferMode.TIMER:
 73            raise ValueError("MockProCaptureDevice only works in Timer transfer mode.")
 74        super().__init__(settings)
 75        self._is_grabbing = False
 76        self._events = ProCaptureEvents(
 77            transfer_complete=TransferCompleteEvent(),
 78            signal_change=SignalChangeEvent(),
 79            frame_buffered=FrameBufferedEvent(),
 80            frame_buffering=FrameBufferingEvent(),
 81            timer_event=TimerEvent(),
 82        )
 83        self._events.signal_change.register(Notification(0, 0))
 84        self._events.timer_event.register(Notification(0, 0))
 85
 86        self._mock_timer = _MockTimer(self._events.timer_event, MOCK_FRAME_RATE_HZ)
 87
 88        self._frame_counter: int = 0
 89        mock_frames_np_arrays = [create_mock_frame() for _ in range(NUM_TEST_FRAMES)]
 90        for i, frame in enumerate(mock_frames_np_arrays):
 91            putText(
 92                frame,
 93                str(i),
 94                (frame.shape[1] // 2, frame.shape[0] // 2),
 95                FONT_HERSHEY_SIMPLEX,
 96                1,
 97                (255, 255, 255),
 98                1,
 99                LINE_AA,
100            )
101        self._mock_frames: List[bytes] = []
102        if self.frame_properties.format == ColourFormat.RGB24:
103            self._mock_frames = [frame.tobytes() for frame in mock_frames_np_arrays]
104        else:
105            ffmpeg = FFMPEG("FFMPEG is required to use Mock mode with any colour format other than RGB24.")
106            self._mock_frames = [
107                ffmpeg.encode_rgb24_array(frame, self.frame_properties.format) for frame in mock_frames_np_arrays
108            ]
109
110    @property
111    def events(self) -> ProCaptureEvents:
112        """events property
113        Returns:
114            A `ProCaptureEvents` object containing handles to the events generated by the device during frame grabbing.
115        """
116        return self._events
117
118    def schedule_timer_event(self) -> None:
119        self._mock_timer.schedule_event()
120
121    @property
122    def buffer_status(self) -> OnDeviceBufferStatus:
123        return OnDeviceBufferStatus(
124            buffer_size_in_frames=1,
125            num_chunks_in_buffer=1,
126            buffering_field_index=1,
127            last_buffered_field_index=1,
128            last_buffered_frame_index=1,
129            num_fully_buffered_frames=1,
130            num_chunks_being_buffered=1,
131        )
132
133    @property
134    def frame_info(self) -> FrameInfo:
135        return FrameInfo(
136            state=FrameState.BUFFERED,
137            interlaced=False,
138            segmented=False,
139            dimensions=MOCK_RESOLUTION,
140            aspect_ratio=MOCK_ASPECT_RATIO,
141            buffering_start_time=datetime.now(),
142            buffering_complete_time=datetime.now(),
143        )
144
145    @property
146    def signal_status(self) -> SignalStatus:
147        return SignalStatus(
148            state=SignalState.LOCKED,
149            start_position=ImageCoordinateInPixels(row=0, col=0),
150            image_dimensions=MOCK_RESOLUTION,
151            total_dimensions=MOCK_RESOLUTION,
152            interlaced=False,
153            frame_period_s=1 / MOCK_FRAME_RATE_HZ,
154            aspect_ratio=MOCK_ASPECT_RATIO,
155            segmented=False,
156        )
157
158    @property
159    def transfer_status(self) -> TransferStatus:
160        return TransferStatus(
161            whole_frame_transferred=True,
162            num_lines_transferred=MOCK_RESOLUTION.rows,
163            num_lines_transferred_previously=MOCK_RESOLUTION.rows,
164            frame_index=0,
165        )
166
167    def start_grabbing(self) -> None:
168        self._is_grabbing = True
169
170    def stop_grabbing(self) -> None:
171        self._is_grabbing = False
172
173    def start_a_frame_transfer(self, frame_buffer: Array[c_char]) -> datetime:
174        """start_a_frame_transfer immediately writes a mock frame to the provided buffer.
175        Args:
176            frame_buffer (Array[c_char]): The buffer to write the mock frame to.
177        Returns:
178            The time (datetime.datetime) the frame transfer was completed.
179        """
180        frame_buffer[: self.frame_properties.size_in_bytes] = self._mock_frames[  # type: ignore
181            self._frame_counter % NUM_TEST_FRAMES
182        ]
183        self.events.transfer_complete.set()
184        self._frame_counter += 1
185        return datetime.now()
186
187    def shutdown(self) -> None:
188        self._is_grabbing = False

MockProCaptureDevice is intended to be used during testing, development and CI in the absence of a hardware frame grabber or Magewell Windows SDK. Does not require Magewell driver or hardware.

Only TransferMode.TIMER is supported.

The class generates test frames. The frame rate is limited to 2 frames per second because copying the mock frames to a provided PC transfer buffer takes a surprisingly long time (~0.11s).

It's recommended to use ColourFormat.RGB24 only. You can use some other formats if you have ffmpeg installed, but this is quite slow.

MockProCaptureDevice( settings: pymagewell.ProCaptureSettings)
 66    def __init__(self, settings: ProCaptureSettings):
 67        """
 68        Args:
 69            settings (ProCaptureSettings): The settings to use for the mock device. settings.transfer_mode must be
 70              set to TransferMode.TIMER.
 71        """
 72        if settings.transfer_mode != TransferMode.TIMER:
 73            raise ValueError("MockProCaptureDevice only works in Timer transfer mode.")
 74        super().__init__(settings)
 75        self._is_grabbing = False
 76        self._events = ProCaptureEvents(
 77            transfer_complete=TransferCompleteEvent(),
 78            signal_change=SignalChangeEvent(),
 79            frame_buffered=FrameBufferedEvent(),
 80            frame_buffering=FrameBufferingEvent(),
 81            timer_event=TimerEvent(),
 82        )
 83        self._events.signal_change.register(Notification(0, 0))
 84        self._events.timer_event.register(Notification(0, 0))
 85
 86        self._mock_timer = _MockTimer(self._events.timer_event, MOCK_FRAME_RATE_HZ)
 87
 88        self._frame_counter: int = 0
 89        mock_frames_np_arrays = [create_mock_frame() for _ in range(NUM_TEST_FRAMES)]
 90        for i, frame in enumerate(mock_frames_np_arrays):
 91            putText(
 92                frame,
 93                str(i),
 94                (frame.shape[1] // 2, frame.shape[0] // 2),
 95                FONT_HERSHEY_SIMPLEX,
 96                1,
 97                (255, 255, 255),
 98                1,
 99                LINE_AA,
100            )
101        self._mock_frames: List[bytes] = []
102        if self.frame_properties.format == ColourFormat.RGB24:
103            self._mock_frames = [frame.tobytes() for frame in mock_frames_np_arrays]
104        else:
105            ffmpeg = FFMPEG("FFMPEG is required to use Mock mode with any colour format other than RGB24.")
106            self._mock_frames = [
107                ffmpeg.encode_rgb24_array(frame, self.frame_properties.format) for frame in mock_frames_np_arrays
108            ]
Args
  • settings (ProCaptureSettings): The settings to use for the mock device. settings.transfer_mode must be set to TransferMode.TIMER.
events: pymagewell.pro_capture_device.device_interface.ProCaptureEvents

events property

Returns

A ProCaptureEvents object containing handles to the events generated by the device during frame grabbing.

def schedule_timer_event(self) -> None:
118    def schedule_timer_event(self) -> None:
119        self._mock_timer.schedule_event()
def start_grabbing(self) -> None:
167    def start_grabbing(self) -> None:
168        self._is_grabbing = True
def stop_grabbing(self) -> None:
170    def stop_grabbing(self) -> None:
171        self._is_grabbing = False
def start_a_frame_transfer(self, frame_buffer: _ctypes.Array[ctypes.c_char]) -> datetime.datetime:
173    def start_a_frame_transfer(self, frame_buffer: Array[c_char]) -> datetime:
174        """start_a_frame_transfer immediately writes a mock frame to the provided buffer.
175        Args:
176            frame_buffer (Array[c_char]): The buffer to write the mock frame to.
177        Returns:
178            The time (datetime.datetime) the frame transfer was completed.
179        """
180        frame_buffer[: self.frame_properties.size_in_bytes] = self._mock_frames[  # type: ignore
181            self._frame_counter % NUM_TEST_FRAMES
182        ]
183        self.events.transfer_complete.set()
184        self._frame_counter += 1
185        return datetime.now()

start_a_frame_transfer immediately writes a mock frame to the provided buffer.

Args
  • frame_buffer (Array[c_char]): The buffer to write the mock frame to.
Returns

The time (datetime.datetime) the frame transfer was completed.

def shutdown(self) -> None:
187    def shutdown(self) -> None:
188        self._is_grabbing = False
class ProCaptureController:
 25class ProCaptureController:
 26    """ProCaptureController controls the transfer of frames from a ProCaptureDevice or MockProCaptureDevice to a PC."""
 27
 28    def __init__(self, device: ProCaptureDeviceInterface):
 29        """
 30        Args:
 31            device (ProCaptureDeviceInterface): The implementation of ProCaptureDeviceInterface to use for frame
 32              transfer. ProCaptureDevice or MockProCaptureDevice are both valid implementations.
 33        """
 34        self._device = device
 35        self._transfer_buffer = create_string_buffer(3840 * 2160 * 4)
 36        self._device.start_grabbing()
 37
 38    def transfer_when_ready(self, timeout_ms: int = 2000) -> VideoFrame:
 39        """transfer_when_ready wait for the device to be ready to start transferring, transfers it and returns it.
 40
 41        This method will block until the frame has been transferred or the timeout has been reached.
 42
 43        In TransferMode.TIMER and TransferMode.NORMAL, frame transfer will start after a whole frame has been grabbed
 44          by the device. In TransferMode.LOW_LATENCY, frame transfer will start after a frame has started to be
 45          buffered onto the device.
 46        """
 47        if self._device.transfer_mode == TransferMode.TIMER:
 48            self._device.schedule_timer_event()
 49        event = self._wait_for_event(timeout_ms=timeout_ms)
 50        frame = self._handle_event(event)
 51        if frame is None:
 52            return self.transfer_when_ready()
 53        else:
 54            return frame
 55
 56    def _wait_for_event(self, timeout_ms: int) -> Event:
 57        """Wait for events to be raised by the ProCaptureDevice or Timer, and return the raised event."""
 58        if self._device.transfer_mode == TransferMode.TIMER:
 59            grab_event: Event = self._device.events.timer_event
 60        elif self._device.transfer_mode == TransferMode.NORMAL:
 61            grab_event = self._device.events.frame_buffered
 62        elif self._device.transfer_mode == TransferMode.LOW_LATENCY:
 63            grab_event = self._device.events.frame_buffering
 64        else:
 65            raise NotImplementedError("Invalid grab mode.")
 66
 67        events_to_wait_for = [grab_event, self._device.events.signal_change]
 68        try:
 69            event_that_occurred = wait_for_events(events_to_wait_for, timeout_ms=timeout_ms)
 70        except WaitForEventTimeout as e:
 71            self.shutdown()
 72            raise e
 73        return event_that_occurred
 74
 75    @singledispatchmethod
 76    def _handle_event(self, event: Event) -> Optional[VideoFrame]:
 77        """Handle a raised event, including transferring a frame if the event means one is ready."""
 78        raise NotImplementedError()
 79
 80    @_handle_event.register
 81    def _(self, event: TimerEvent) -> Optional[VideoFrame]:
 82        """If timer event received, then whole frame is on device. Only subscribed to in TIMER mode. This method
 83        transfers the frame to a buffer in PC memory, makes a copy, marks the buffer memory as free and then returns
 84        the copy."""
 85        transfer_started_timestamp = self._device.start_a_frame_transfer(self._transfer_buffer)
 86        buffering_started_timestamp = self._device.frame_info.buffering_start_time
 87        transfer_complete_timestamp = self._wait_for_frame_or_chunk_transfer_to_complete(timeout_ms=2000)
 88        buffering_complete_timestamp = self._device.frame_info.buffering_complete_time
 89        if not self._device.transfer_status.whole_frame_transferred:  # this marks the buffer memory as free
 90            raise ProCaptureError("Only part of frame has been acquired")
 91        return self._format_frame(
 92            timestamps=VideoFrameTimestamps(
 93                transfer_started=transfer_started_timestamp,
 94                transfer_complete=transfer_complete_timestamp,
 95                buffering_started=buffering_started_timestamp,
 96                buffering_complete=buffering_complete_timestamp,
 97            )
 98        )
 99
100    @_handle_event.register
101    def _(self, event: FrameBufferedEvent) -> Optional[VideoFrame]:
102        """If FrameBufferedEvent event received, then whole frame is on device. This event is only subscribed to in
103        NORMAL mode. This method transfers it to a buffer in PC memory, makes a copy, marks the buffer memory as free
104        and then returns the copy."""
105        transfer_started_timestamp = self._device.start_a_frame_transfer(self._transfer_buffer)
106        transfer_complete_timestamp = self._wait_for_frame_or_chunk_transfer_to_complete(timeout_ms=2000)
107        buffering_started_timestamp = self._device.frame_info.buffering_start_time
108        buffering_complete_timestamp = self._device.frame_info.buffering_complete_time
109        if not self._device.transfer_status.whole_frame_transferred:  # this marks the buffer memory as free
110            raise ProCaptureError("Only part of frame has been acquired")
111        return self._format_frame(
112            timestamps=VideoFrameTimestamps(
113                transfer_started=transfer_started_timestamp,
114                transfer_complete=transfer_complete_timestamp,
115                buffering_started=buffering_started_timestamp,
116                buffering_complete=buffering_complete_timestamp,
117            )
118        )
119
120    @_handle_event.register
121    def _(self, event: FrameBufferingEvent) -> Optional[VideoFrame]:
122        """If FrameBufferingEvent event received, then a frame has started to be acquired by the card. This event is
123        only subscribed to in LOW_LATENCY mode. This method starts the transfer of the available lines to a buffer in
124        PC memory while the acquisition is still happening. It then waits until all lines have been received (this query
125        also frees the memory), copies the buffer contents and returns the copy."""
126        transfer_started_timestamp = self._device.start_a_frame_transfer(self._transfer_buffer)
127        buffering_started_timestamp = self._device.frame_info.buffering_start_time
128        self._wait_for_frame_or_chunk_transfer_to_complete(timeout_ms=2000)
129        wait_start_t = time.perf_counter()
130        while (
131            self._device.transfer_status.num_lines_transferred < self._device.frame_properties.dimensions.rows
132            and (time.perf_counter() - wait_start_t) < 1
133        ):
134            pass
135        transfer_complete_timestamp = datetime.now()
136        buffering_complete_timestamp = self._device.frame_info.buffering_complete_time
137
138        return self._format_frame(
139            timestamps=VideoFrameTimestamps(
140                transfer_started=transfer_started_timestamp,
141                transfer_complete=transfer_complete_timestamp,
142                buffering_started=buffering_started_timestamp,
143                buffering_complete=buffering_complete_timestamp,
144            )
145        )
146
147    @_handle_event.register
148    def _(self, event: SignalChangeEvent) -> None:
149        """If a SignalChangeEvent is received, then the source signal has changed and no frame is available."""
150        logger.info("Frame grabber signal change detected")
151
152    def _format_frame(self, timestamps: VideoFrameTimestamps) -> VideoFrame:
153        """Copy the contents of the transfer buffer, and return it as a VideoFrame."""
154        # Copy the acquired frame
155        string_buffer = string_at(self._transfer_buffer, self._device.frame_properties.size_in_bytes)
156        frame = VideoFrame(
157            string_buffer,
158            dimensions=self._device.frame_properties.dimensions,
159            timestamps=timestamps,
160            format=self._device.frame_properties.format,
161        )
162        return frame
163
164    def _wait_for_frame_or_chunk_transfer_to_complete(self, timeout_ms: int) -> datetime:
165        """Waits until a whole frame (or chunk of a frame in low latency mode) has been transferred to the buffer in
166        PC memory."""
167        try:
168            wait_for_event(self._device.events.transfer_complete, timeout_ms=timeout_ms)
169            return datetime.now()
170        except WaitForEventTimeout as e:
171            self.shutdown()
172            raise e
173
174    def shutdown(self) -> None:
175        """Shuts down the frame grabber device."""
176        self._device.stop_grabbing()
177        self._device.shutdown()

ProCaptureController controls the transfer of frames from a ProCaptureDevice or MockProCaptureDevice to a PC.

ProCaptureController( device: pymagewell.pro_capture_device.device_interface.ProCaptureDeviceInterface)
28    def __init__(self, device: ProCaptureDeviceInterface):
29        """
30        Args:
31            device (ProCaptureDeviceInterface): The implementation of ProCaptureDeviceInterface to use for frame
32              transfer. ProCaptureDevice or MockProCaptureDevice are both valid implementations.
33        """
34        self._device = device
35        self._transfer_buffer = create_string_buffer(3840 * 2160 * 4)
36        self._device.start_grabbing()
Args
  • device (ProCaptureDeviceInterface): The implementation of ProCaptureDeviceInterface to use for frame transfer. ProCaptureDevice or MockProCaptureDevice are both valid implementations.
def transfer_when_ready(self, timeout_ms: int = 2000) -> pymagewell.video_frame.VideoFrame:
38    def transfer_when_ready(self, timeout_ms: int = 2000) -> VideoFrame:
39        """transfer_when_ready wait for the device to be ready to start transferring, transfers it and returns it.
40
41        This method will block until the frame has been transferred or the timeout has been reached.
42
43        In TransferMode.TIMER and TransferMode.NORMAL, frame transfer will start after a whole frame has been grabbed
44          by the device. In TransferMode.LOW_LATENCY, frame transfer will start after a frame has started to be
45          buffered onto the device.
46        """
47        if self._device.transfer_mode == TransferMode.TIMER:
48            self._device.schedule_timer_event()
49        event = self._wait_for_event(timeout_ms=timeout_ms)
50        frame = self._handle_event(event)
51        if frame is None:
52            return self.transfer_when_ready()
53        else:
54            return frame

transfer_when_ready waits for the device to be ready to start transferring a frame, transfers it and returns it.

This method will block until the frame has been transferred or the timeout has been reached.

In TransferMode.TIMER and TransferMode.NORMAL, frame transfer will start after a whole frame has been grabbed by the device. In TransferMode.LOW_LATENCY, frame transfer will start after a frame has started to be buffered onto the device.

def shutdown(self) -> None:
174    def shutdown(self) -> None:
175        """Shuts down the frame grabber device."""
176        self._device.stop_grabbing()
177        self._device.shutdown()

Shuts down the frame grabber device.

@dataclass
class ProCaptureSettings:
@dataclass
class ProCaptureSettings:
    """Settings for the ProCapture device."""

    dimensions: ImageSizeInPixels = ImageSizeInPixels(1920, 1080)
    """The dimensions of the frames to be acquired in pixels."""
    color_format: ColourFormat = ColourFormat.BGR24
    """The colour format of the frames to be acquired."""
    transfer_mode: TransferMode = TransferMode.NORMAL
    """The method to use for transferring frames from the device to the PC. See `TransferMode` for details."""
    num_lines_per_chunk: int = 64
    """The number of lines of a frame to transfer at a time (for `TransferMode.LOW_LATENCY` transfers)."""

    def __post_init__(self) -> None:
        # Validate the chunk size as soon as a settings object is constructed.
        _check_valid_chunk_size(self.num_lines_per_chunk)

    @property
    def min_stride(self) -> int:
        """The minimum stride for the configured colour format and frame width, as computed by
        `fourcc_calc_min_stride`."""
        # NOTE(review): the final argument (2) is presumably the stride alignment - confirm against the SDK.
        return cast(int, fourcc_calc_min_stride(self.color_format.value, self.dimensions.cols, 2))  # type: ignore

    @property
    def image_size_in_bytes(self) -> int:
        """The size in bytes of one frame with the configured dimensions and colour format.

        NV12 is special-cased as cols * rows * 2; every other format is delegated to `fourcc_calc_image_size`.
        """
        if self.color_format == ColourFormat.NV12:
            return self.dimensions.cols * self.dimensions.rows * 2  # copied from line 223 of capture.py
        else:
            return cast(
                int,
                fourcc_calc_image_size(  # type: ignore
                    # Pass the integer FOURCC code (not the enum member), as min_stride does.
                    self.color_format.value,
                    self.dimensions.cols,
                    self.dimensions.rows,
                    self.min_stride,
                ),
            )

Settings for the ProCapture device.

ProCaptureSettings( dimensions: pymagewell.ImageSizeInPixels = ImageSizeInPixels(cols=1920, rows=1080), color_format: pymagewell.ColourFormat = <ColourFormat.BGR24: 542263106>, transfer_mode: pymagewell.TransferMode = <TransferMode.NORMAL: 1>, num_lines_per_chunk: int = 64)
dimensions: pymagewell.ImageSizeInPixels = ImageSizeInPixels(cols=1920, rows=1080)

The dimensions of the frames to be acquired in pixels.

color_format: pymagewell.ColourFormat = <ColourFormat.BGR24: 542263106>

The colour format of the frames to be acquired.

The method to use for transferring frames from the device to the PC. See TransferMode for details.

num_lines_per_chunk: int = 64

The number of lines of a frame to transfer at a time (for TransferMode.LOW_LATENCY transfers).

class TransferMode(enum.Enum):
class TransferMode(Enum):
    """Enumeration of the supported methods for triggering the transfer of frames to the PC."""

    TIMER = 0
    """Transfers are triggered by a software timer event, allowing arbitrary frame rates. This is the only mode
    supported by MockProCaptureDevice."""
    NORMAL = 1
    """Transfers are triggered by a notification received from the device when a whole frame has been received, so
    grabbing happens at the source frame rate."""
    LOW_LATENCY = 2
    """Transfers are triggered by a notification received from the device when the first chunk of a frame has been
    received. Grabbing happens at the source frame rate, but with a lower latency."""

Enumeration of the supported methods for triggering the transfer of frames to the PC.

TIMER = <TransferMode.TIMER: 0>

Transfers are triggered by a software timer event, allowing arbitrary frame rates. This is the only mode supported by MockProCaptureDevice.

NORMAL = <TransferMode.NORMAL: 1>

Transfers are triggered by a notification received from the device when a whole frame has been received, and therefore grabbing happens at the source frame rate.

LOW_LATENCY = <TransferMode.LOW_LATENCY: 2>

Transfers are triggered by a notification received from the device when the first chunk of a frame has been received. Grabbing happens at the source frame rate, but with a lower latency.

Inherited Members
enum.Enum
name
value
@dataclass(frozen=True)
class ImageSizeInPixels:
@dataclass(frozen=True)
class ImageSizeInPixels:
    """Immutable image size, expressed as a number of pixel columns and rows."""

    # Number of pixel columns.
    cols: int
    # Number of pixel rows.
    rows: int
ImageSizeInPixels(cols: int, rows: int)
class ColourFormat(enum.Enum):
 75class ColourFormat(Enum):
 76    """Enumeration of the supported colour formats."""
 77
 78    UNK = MWFOURCC_UNK
 79    GREY = MWFOURCC_GREY
 80    Y800 = MWFOURCC_Y800
 81    Y8 = MWFOURCC_Y8
 82    Y16 = MWFOURCC_Y16
 83    RGB15 = MWFOURCC_RGB15
 84    RGB16 = MWFOURCC_RGB16
 85    RGB24 = MWFOURCC_RGB24
 86    RGBA = MWFOURCC_RGBA
 87    ARGB = MWFOURCC_ARGB
 88    BGR15 = MWFOURCC_BGR15
 89    BGR16 = MWFOURCC_BGR16
 90    BGR24 = MWFOURCC_BGR24
 91    BGRA = MWFOURCC_BGRA
 92    ABGR = MWFOURCC_ABGR
 93    NV16 = MWFOURCC_NV16
 94    NV61 = MWFOURCC_NV61
 95    I422 = MWFOURCC_I422
 96    YV16 = MWFOURCC_YV16
 97    YUY2 = MWFOURCC_YUY2
 98    YUYV = MWFOURCC_YUYV
 99    UYVY = MWFOURCC_UYVY
100    YVYU = MWFOURCC_YVYU
101    VYUY = MWFOURCC_VYUY
102    I420 = MWFOURCC_I420
103    IYUV = MWFOURCC_IYUV
104    NV12 = MWFOURCC_NV12
105    YV12 = MWFOURCC_YV12
106    NV21 = MWFOURCC_NV21
107    P010 = MWFOURCC_P010
108    P210 = MWFOURCC_P210
109    IYU2 = MWFOURCC_IYU2
110    V308 = MWFOURCC_V308
111    AYUV = MWFOURCC_AYUV
112    UYVA = MWFOURCC_UYVA
113    V408 = MWFOURCC_V408
114    VYUA = MWFOURCC_VYUA
115    V210 = MWFOURCC_V210
116    Y410 = MWFOURCC_Y410
117    V410 = MWFOURCC_V410
118    RGB10 = MWFOURCC_RGB10
119    BGR10 = MWFOURCC_BGR10
120
121    @property
122    def fourcc_string(self) -> str:
123        return struct.pack("<I", self.value).decode("utf-8")
124
125    @property
126    def bits_per_pixel(self) -> int:
127        return cast(int, fourcc_get_bpp(self.value))  # type: ignore
128
129    @property
130    def has_alpha_channel(self) -> bool:
131        return "A" in self.fourcc_string
132
133    @property
134    def num_channels(self) -> int:
135        if self in [ColourFormat.Y8, ColourFormat.Y16, ColourFormat.Y800, ColourFormat.GREY]:
136            return 1
137        elif self.has_alpha_channel:
138            return 4
139        else:
140            return 3
141
142    @property
143    def colour_space(self) -> ColourSpace:
144        if self == ColourFormat.UNK:
145            return ColourSpace.UNKNOWN
146        elif self in [ColourFormat.GREY, ColourFormat.Y800, ColourFormat.Y8, ColourFormat.Y16]:
147            return ColourSpace.GREY
148        elif self in [
149            ColourFormat.RGB24,
150            ColourFormat.RGB10,
151            ColourFormat.RGB15,
152            ColourFormat.RGB16,
153            ColourFormat.ARGB,
154            ColourFormat.RGBA,
155            ColourFormat.BGR24,
156            ColourFormat.BGR10,
157            ColourFormat.BGR15,
158            ColourFormat.BGR16,
159            ColourFormat.ABGR,
160            ColourFormat.BGRA,
161        ]:
162            return ColourSpace.RGB
163        else:
164            return ColourSpace.YUV
165
166    def channel_order(self) -> RGBChannelOrder:
167        if self.colour_space != ColourSpace.RGB:
168            raise NotImplementedError("Channel order property only implemented for RGB colour formats")
169        if self in [
170            ColourFormat.RGB24,
171            ColourFormat.ARGB,
172            ColourFormat.RGBA,
173            ColourFormat.RGB15,
174            ColourFormat.RGB16,
175            ColourFormat.RGB10,
176        ]:
177            return RGBChannelOrder.RGB
178        elif self in [
179            ColourFormat.BGR24,
180            ColourFormat.ABGR,
181            ColourFormat.BGRA,
182            ColourFormat.BGR15,
183            ColourFormat.BGR16,
184            ColourFormat.BGR10,
185        ]:
186            return RGBChannelOrder.BGR
187        else:
188            raise NotImplementedError(f"Channel order property not implemented for colour format {self}.")
189
190    @property
191    def alpha_channel_index(self) -> int:
192        if not self.has_alpha_channel:
193            raise ValueError(f"Colour format {self} does not have an alpha channel")
194        alpha_index = self.fourcc_string.find("A")
195        if alpha_index == -1:
196            raise ValueError(f"Could not find index of alpha channel for colour format {self}")
197        return alpha_index
198
199    def as_ffmpeg_pixel_format(self) -> str:
200        if self == ColourFormat.UNK:
201            raise ValueError("Colour format not known")
202        return ffmpeg_pixel_formats[self]
203
204    @property
205    def pixel_dtype(self) -> type:
206        if self == ColourFormat.UNK:
207            raise ValueError("Colour format not known")
208        bits_per_sample_per_channel = int(floor(self.bits_per_pixel / self.num_channels))
209        if bits_per_sample_per_channel <= 8:
210            return uint8
211        elif bits_per_sample_per_channel <= 16:
212            return uint16
213        else:
214            raise ValueError("ColourFormat has unrecognised structure.")

Enumeration of the supported colour formats.

def channel_order(self) -> pymagewell.pro_capture_device.device_settings.RGBChannelOrder:
166    def channel_order(self) -> RGBChannelOrder:
167        if self.colour_space != ColourSpace.RGB:
168            raise NotImplementedError("Channel order property only implemented for RGB colour formats")
169        if self in [
170            ColourFormat.RGB24,
171            ColourFormat.ARGB,
172            ColourFormat.RGBA,
173            ColourFormat.RGB15,
174            ColourFormat.RGB16,
175            ColourFormat.RGB10,
176        ]:
177            return RGBChannelOrder.RGB
178        elif self in [
179            ColourFormat.BGR24,
180            ColourFormat.ABGR,
181            ColourFormat.BGRA,
182            ColourFormat.BGR15,
183            ColourFormat.BGR16,
184            ColourFormat.BGR10,
185        ]:
186            return RGBChannelOrder.BGR
187        else:
188            raise NotImplementedError(f"Channel order property not implemented for colour format {self}.")
def as_ffmpeg_pixel_format(self) -> str:
199    def as_ffmpeg_pixel_format(self) -> str:
200        if self == ColourFormat.UNK:
201            raise ValueError("Colour format not known")
202        return ffmpeg_pixel_formats[self]
Inherited Members
enum.Enum
name
value