pymagewell
pymagewell
is a Python library for interfacing with Magewell ProCapture frame grabbers.
It is based on (and includes) Magewell's Windows SDK and is therefore Windows only. However, it provides a mock class
for testing and development that does not depend on the SDK's Windows .dll files, so pymagewell
can also be
installed on macOS and Linux.
Installation
pip install pymagewell
Example of use
A full working example is provided in
example_script.py
.
1# Christian Baker, King's College London 2# Copyright (c) 2022 School of Biomedical Engineering & Imaging Sciences, King's College London 3# Licensed under the MIT. You may obtain a copy at https://opensource.org/licenses/MIT. 4""" 5`pymagewell` is a Python library for interfacing with Magewell ProCapture frame grabbers. 6 7It is based on (and includes) Magewell's Windows SDK and is therefore Windows only. However, it provides a mock class 8that for testing and development that does not depend on the SDK's Windows .dll files, so `pymagwell` can also be 9installed on macOS and Linux. 10 11* [GitHub page](https://github.com/KCL-BMEIS/pymagewell) 12* [API reference documentation](https://kcl-bmeis.github.io/pymagewell/) 13* [PyPI page](https://pypi.org/project/pymagewell/) 14 15### Installation 16 17```bash 18pip install pymagewell 19``` 20 21### Example of use 22 23A full working example is provided in 24[`example_script.py`](https://github.com/KCL-BMEIS/pymagewell/blob/main/example_script.py). 25 26""" 27from . import _version 28from .pro_capture_controller import ProCaptureController 29 30from .pro_capture_device import ProCaptureDevice 31from .pro_capture_device.device_settings import ProCaptureSettings, TransferMode, ImageSizeInPixels, ColourFormat 32from .pro_capture_device.mock_pro_capture_device import MockProCaptureDevice 33 34__version__ = _version.get_versions()["version"] # type: ignore 35 36__all__ = [ 37 "ProCaptureDevice", 38 "MockProCaptureDevice", 39 "ProCaptureController", 40 "ProCaptureSettings", 41 "TransferMode", 42 "ImageSizeInPixels", 43 "ColourFormat", 44]
class ProCaptureDevice(ProCaptureDeviceImpl, mw_capture):
    """`ProCaptureDevice` represents a physical Magewell ProCapture device. It is intended to be controlled by a
    `ProCaptureController`.

    ProCaptureDevice configures and initiates frame acquisition and transfer. It is responsible for constructing and
    registering events with the Magewell driver. It also provides methods for accessing information about the video
    source connected to the device. ProCaptureDevice inherits from the `mw_capture` class provided by Magewell's
    Windows SDK.
    """

    def __init__(self, settings: ProCaptureSettings):
        """
        Args:
            settings: A `ProCaptureSettings` object containing the settings for the device.
        Raises:
            NotImplementedError: If `settings.transfer_mode` is not NORMAL, LOW_LATENCY or TIMER.
        """
        ProCaptureDeviceImpl.__init__(self, settings)
        mw_capture.__init__(self)  # type: ignore

        self.mw_capture_init_instance()  # type: ignore
        self.mw_refresh_device()  # type: ignore
        self._channel = _create_channel(self)

        # Pair a device clock reading with the system clock so that later device times can be converted to datetimes.
        self._device_init_time = DeviceInitTime(
            device_time_in_s=self._get_device_time_in_s(), system_time_as_datetime=datetime.now()
        )

        self._timer = _FrameTimer(self, self._channel, self._register_timer_event(TimerEvent()))

        self._signal_change_event = cast(SignalChangeEvent, self._register_event(SignalChangeEvent()))

        # Both events always exist; only the one relevant to the transfer mode is registered with the driver.
        self._frame_buffered_event = FrameBufferedEvent()
        self._frame_buffering_event = FrameBufferingEvent()

        if self._settings.transfer_mode == TransferMode.NORMAL:
            self._frame_buffered_event = cast(FrameBufferedEvent, self._register_event(self._frame_buffered_event))
            self._transfer_complete_event: TransferCompleteEvent = FrameTransferCompleteEvent()

        elif self._settings.transfer_mode == TransferMode.LOW_LATENCY:
            self._frame_buffering_event = cast(FrameBufferingEvent, self._register_event(self._frame_buffering_event))
            self._transfer_complete_event = PartialFrameTransferCompleteEvent()

        # Compare against the enum member: `transfer_mode.TIMER` on its own is an attribute access, not a test of
        # the configured mode.
        elif self._settings.transfer_mode == TransferMode.TIMER:
            self._transfer_complete_event = FrameTransferCompleteEvent()

        else:
            # Fail here rather than leaving `_transfer_complete_event` unset for `events`/`shutdown` to trip over.
            raise NotImplementedError("Invalid transfer mode.")

    @property
    def events(self) -> ProCaptureEvents:
        """events property
        Returns:
            A `ProCaptureEvents` object containing handles to the events generated by the device during frame grabbing.
        """
        return ProCaptureEvents(
            transfer_complete=self._transfer_complete_event,
            signal_change=self._signal_change_event,
            frame_buffered=self._frame_buffered_event,
            frame_buffering=self._frame_buffering_event,
            timer_event=self._timer.event,
        )

    def _register_event(self, event: RegisterableEvent) -> RegisterableEvent:
        """Register `event` for driver notifications on this device's channel and return the same event."""
        notification_handle = self.mw_register_notify(self._channel, event.win32_event, event.registration_token)  # type: ignore
        event.register(Notification(notification_handle, self._channel))
        return event

    def _register_timer_event(self, event: TimerEvent) -> TimerEvent:
        """The _FrameTimer class handles constructing TimerEvents and registering them here."""
        notification_handle = self.mw_register_timer(self._channel, event.win32_event)  # type: ignore
        event.register(Notification(notification_handle, self._channel))
        return event

    def schedule_timer_event(self) -> None:
        """For use if the TransferMode is TransferMode.TIMER, this method schedules a timer event to trigger the transfer of the
        next frame."""
        self._timer.schedule_timer_event(self._get_device_time_in_ticks())

    @property
    def buffer_status(self) -> OnDeviceBufferStatus:
        """buffer_status property
        Returns:
            A `OnDeviceBufferStatus` object containing information about the device's buffer.
        """
        buffer_info = mwcap_video_buffer_info()
        self.mw_get_video_buffer_info(self._channel, buffer_info)  # type: ignore
        return OnDeviceBufferStatus.from_mwcap_video_buffer_info(buffer_info)

    @property
    def frame_info(self) -> FrameInfo:
        """frame_info property
        Returns:
            A `FrameInfo` object containing information about the most recent frame.
        """
        frame_info = mwcap_video_frame_info()
        # The frame queried is the one at the buffer's `last_buffered_frame_index`.
        self.mw_get_video_frame_info(self._channel, self.buffer_status.last_buffered_frame_index, frame_info)  # type: ignore
        return FrameInfo.from_mwcap_video_frame_info(frame_info, self._device_init_time)

    @property
    def signal_status(self) -> SignalStatus:
        """signal_status property
        Returns:
            A `SignalStatus` object containing information about the signal connected to the device.
        """
        mw_signal_status = mw_video_signal_status()
        self.mw_get_video_signal_status(self._channel, mw_signal_status)  # type: ignore
        return SignalStatus.from_mw_video_signal_status(mw_signal_status)

    def start_grabbing(self) -> None:
        """Starts the hardware acquiring frames.

        Raises:
            ProCaptureError: If the driver reports that capture could not be started, or if the input signal is not
                locked. In the latter case capture is stopped and the device is shut down before raising.
        """
        start_capture_result = self.mw_start_video_capture(self._channel, self.events.transfer_complete.win32_event)  # type: ignore
        if start_capture_result != MW_SUCCEEDED:
            raise ProCaptureError(f"Start capture failed (error code {start_capture_result}).")

        # Query the driver once so the logged state and the lock check below refer to the same reading.
        signal_state = self.signal_status.state
        if signal_state == SignalState.NONE:
            logger.warning("Input signal status: None")
        elif signal_state == SignalState.UNSUPPORTED:
            logger.warning("Input signal status: Unsupported")
        elif signal_state == SignalState.LOCKING:
            logger.info("Input signal status: Locking")
        elif signal_state == SignalState.LOCKED:
            logger.info("Input signal status: Locked")

        # Exit if signal not locked
        if signal_state != SignalState.LOCKED:
            self.mw_stop_video_capture(self._channel)  # type: ignore
            self.shutdown()
            raise ProCaptureError("Signal not locked. It is likely that no video signal is present.")

    def stop_grabbing(self) -> None:
        """Stops the hardware acquiring frames."""
        self.mw_stop_video_capture(self._channel)  # type: ignore

    @property
    def transfer_status(self) -> TransferStatus:
        """transfer_status property
        Returns:
            A `TransferStatus` object containing information about the transfer of frames from the device to the PC,
            including the number of frames transferred, and the number of lines of the current frame that have been
            transferred.
        """
        mw_capture_status = mw_video_capture_status()
        self.mw_get_video_capture_status(self._channel, mw_capture_status)  # type: ignore
        return TransferStatus.from_mw_video_capture_status(mw_capture_status)

    def _get_device_time_in_ticks(self) -> mw_device_time:
        """Read a timestamp from the device.

        Raises:
            ProCaptureError: If the driver reports that the time could not be read.
        """
        time = mw_device_time()  # type: ignore
        result = self.mw_get_device_time(self._channel, time)  # type: ignore
        if result != MW_SUCCEEDED:
            raise ProCaptureError("Failed to read time from device")
        return time

    def _get_device_time_in_s(self) -> float:
        """Read the device time and convert it from ticks to seconds."""
        return int(self._get_device_time_in_ticks().m_ll_device_time.value) * DEVICE_CLOCK_TICK_PERIOD_IN_SECONDS

    def start_a_frame_transfer(self, frame_buffer: Array[c_char]) -> datetime:
        """start_a_frame_transfer starts the transfer of lines from the device to a buffer in PC memory.
        Args:
            frame_buffer (Array[c_char]): A buffer in PC memory to which the lines of the current frame will be
                transferred.
        Returns:
            The time (datetime.datetime) at which the transfer of the first line of the current frame started.
        Raises:
            ProCaptureError: If the driver reports that the frame grab failed.
        """
        in_low_latency_mode = self.transfer_mode == TransferMode.LOW_LATENCY
        # Partial-frame notifications are only requested in low latency mode; 0 is passed otherwise.
        notify_size = self._settings.num_lines_per_chunk if in_low_latency_mode else 0

        # The timestamp is taken immediately before the transfer is requested: the device time elapsed since
        # initialisation is added to the system time recorded at initialisation.
        seconds_since_init = self._get_device_time_in_s() - self._device_init_time.device_time_in_s
        frame_timestamp = self._device_init_time.system_time_as_datetime + timedelta(seconds=seconds_since_init)
        result = self.mw_capture_video_frame_to_virtual_address_ex(  # type: ignore
            hchannel=self._channel,
            iframe=self.buffer_status.last_buffered_frame_index,
            pbframe=addressof(frame_buffer),
            cbframe=self._settings.image_size_in_bytes,
            cbstride=self._settings.min_stride,
            bbottomup=False,  # this is True in the C++ example, but false in python example,
            pvcontext=0,
            dwfourcc=self._settings.color_format.value,  # color format of captured frames
            cx=self._settings.dimensions.cols,
            cy=self._settings.dimensions.rows,
            dwprocessswitchs=0,
            cypartialnotify=notify_size,
            hosdimage=0,
            posdrects=0,
            cosdrects=0,
            scontrast=100,
            sbrightness=0,
            ssaturation=100,
            shue=0,
            deinterlacemode=MWCAP_VIDEO_DEINTERLACE_BLEND,
            aspectratioconvertmode=MWCAP_VIDEO_ASPECT_RATIO_CROPPING,
            prectsrc=0,  # 0 in C++ example, but configured using CLIP settings in python example,
            prectdest=0,
            naspectx=0,
            naspecty=0,
            colorformat=MWCAP_VIDEO_COLOR_FORMAT_UNKNOWN,
            quantrange=MWCAP_VIDEO_QUANTIZATION_UNKNOWN,
            satrange=MWCAP_VIDEO_SATURATION_UNKNOWN,
        )
        if result != MW_SUCCEEDED:
            raise ProCaptureError(f"Frame grab failed with error code {result}")
        return frame_timestamp

    def shutdown(self) -> None:
        """shutdown releases the hardware resources used by the device."""
        self._timer.shutdown()
        self._signal_change_event.destroy()
        self._transfer_complete_event.destroy()
        self._frame_buffered_event.destroy()
        self._frame_buffering_event.destroy()

        self.mw_close_channel(self._channel)  # type: ignore
        self.mw_capture_exit_instance()  # type: ignore
ProCaptureDevice
represents a physical Magewell ProCapture device. It is intended to be controlled by a
ProCaptureController
.
ProCaptureDevice configures and initiates frame acquisition and transfer. It is responsible for constructing and
registering events with the Magewell driver. It also provides methods for accessing information about the video
source connected to the device. ProCaptureDevice inherits from the mw_capture
class provided by Magewell's
Windows SDK.
63 def __init__(self, settings: ProCaptureSettings): 64 """ 65 Args: 66 settings: A `ProCaptureSettings` object containing the settings for the device. 67 """ 68 ProCaptureDeviceImpl.__init__(self, settings) 69 mw_capture.__init__(self) # type: ignore 70 71 self.mw_capture_init_instance() # type: ignore 72 self.mw_refresh_device() # type: ignore 73 self._channel = _create_channel(self) 74 75 self._device_init_time = DeviceInitTime( 76 device_time_in_s=self._get_device_time_in_s(), system_time_as_datetime=datetime.now() 77 ) 78 79 self._timer = _FrameTimer(self, self._channel, self._register_timer_event(TimerEvent())) 80 81 self._signal_change_event = cast(SignalChangeEvent, self._register_event(SignalChangeEvent())) 82 83 self._frame_buffered_event = FrameBufferedEvent() 84 self._frame_buffering_event = FrameBufferingEvent() 85 86 if self._settings.transfer_mode == TransferMode.NORMAL: 87 self._frame_buffered_event = cast(FrameBufferedEvent, self._register_event(self._frame_buffered_event)) 88 self._transfer_complete_event: TransferCompleteEvent = FrameTransferCompleteEvent() 89 90 elif self._settings.transfer_mode == TransferMode.LOW_LATENCY: 91 self._frame_buffering_event = cast(FrameBufferingEvent, self._register_event(self._frame_buffering_event)) 92 self._transfer_complete_event = PartialFrameTransferCompleteEvent() 93 94 elif self._settings.transfer_mode.TIMER: 95 self._transfer_complete_event = FrameTransferCompleteEvent()
Args
- settings: A
ProCaptureSettings
object containing the settings for the device.
events property
Returns
A
ProCaptureEvents
object containing handles to the events generated by the device during frame grabbing.
122 def schedule_timer_event(self) -> None: 123 """For use if the TransferMode is TransferMode.TIMER, this method schedules a timer event to trigger the transfer of the 124 next frame.""" 125 self._timer.schedule_timer_event(self._get_device_time_in_ticks())
For use if the TransferMode is TransferMode.TIMER, this method schedules a timer event to trigger the transfer of the next frame.
buffer_status property
Returns
A
OnDeviceBufferStatus
object containing information about the device's buffer.
frame_info property
Returns
A
FrameInfo
object containing information about the most recent frame.
signal_status property
Returns
A
SignalStatus
object containing information about the signal connected to the device.
157 def start_grabbing(self) -> None: 158 """Starts the hardware acquiring frames.""" 159 start_capture_result = self.mw_start_video_capture(self._channel, self.events.transfer_complete.win32_event) # type: ignore 160 if start_capture_result != MW_SUCCEEDED: 161 raise ProCaptureError(f"Start capture failed (error code {start_capture_result}).") 162 if self.signal_status.state == SignalState.NONE: 163 logger.warning("Input signal status: None") 164 elif self.signal_status.state == SignalState.UNSUPPORTED: 165 logger.warning("Input signal status: Unsupported") 166 elif self.signal_status.state == SignalState.LOCKING: 167 logger.info("Input signal status: Locking") 168 elif self.signal_status.state == SignalState.LOCKED: 169 logger.info("Input signal status: Locked") 170 171 # Exit if signal not locked 172 if self.signal_status.state != SignalState.LOCKED: 173 self.mw_stop_video_capture(self._channel) # type: ignore 174 self.shutdown() 175 raise ProCaptureError("Signal not locked. It is likely that no video signal is present.")
Starts the hardware acquiring frames.
177 def stop_grabbing(self) -> None: 178 """Stops the hardware acquiring frames.""" 179 self.mw_stop_video_capture(self._channel) # type: ignore
Stops the hardware acquiring frames.
transfer_status property
Returns
A
TransferStatus
object containing information about the transfer of frames from the device to the PC, including the number of frames transferred, and the number of lines of the current frame that have been transferred.
205 def start_a_frame_transfer(self, frame_buffer: Array[c_char]) -> datetime: 206 """start_a_frame_transfer starts the transfer of lines from the device to a buffer in PC memory. 207 Args: 208 frame_buffer (Array[c_char]): A buffer in PC memory to which the lines of the current frame will be 209 transferred. 210 Returns: 211 The time (datetime.datetime) at which the transfer of the first line of the current frame started. 212 """ 213 in_low_latency_mode = self.transfer_mode == TransferMode.LOW_LATENCY 214 notify_size = self._settings.num_lines_per_chunk if in_low_latency_mode else 0 215 216 seconds_since_init = self._get_device_time_in_s() - self._device_init_time.device_time_in_s 217 frame_timestamp = self._device_init_time.system_time_as_datetime + timedelta(seconds=seconds_since_init) 218 result = self.mw_capture_video_frame_to_virtual_address_ex( # type: ignore 219 hchannel=self._channel, 220 iframe=self.buffer_status.last_buffered_frame_index, 221 pbframe=addressof(frame_buffer), 222 cbframe=self._settings.image_size_in_bytes, 223 cbstride=self._settings.min_stride, 224 bbottomup=False, # this is True in the C++ example, but false in python example, 225 pvcontext=0, 226 dwfourcc=self._settings.color_format.value, # color format of captured frames 227 cx=self._settings.dimensions.cols, 228 cy=self._settings.dimensions.rows, 229 dwprocessswitchs=0, 230 cypartialnotify=notify_size, 231 hosdimage=0, 232 posdrects=0, 233 cosdrects=0, 234 scontrast=100, 235 sbrightness=0, 236 ssaturation=100, 237 shue=0, 238 deinterlacemode=MWCAP_VIDEO_DEINTERLACE_BLEND, 239 aspectratioconvertmode=MWCAP_VIDEO_ASPECT_RATIO_CROPPING, 240 prectsrc=0, # 0 in C++ example, but configured using CLIP settings in python example, 241 prectdest=0, 242 naspectx=0, 243 naspecty=0, 244 colorformat=MWCAP_VIDEO_COLOR_FORMAT_UNKNOWN, 245 quantrange=MWCAP_VIDEO_QUANTIZATION_UNKNOWN, 246 satrange=MWCAP_VIDEO_SATURATION_UNKNOWN, 247 ) 248 if result != MW_SUCCEEDED: 249 raise ProCaptureError(f"Frame 
grab failed with error code {result}") 250 else: 251 return frame_timestamp
start_a_frame_transfer starts the transfer of lines from the device to a buffer in PC memory.
Args
- frame_buffer (Array[c_char]): A buffer in PC memory to which the lines of the current frame will be transferred.
Returns
The time (datetime.datetime) at which the transfer of the first line of the current frame started.
253 def shutdown(self) -> None: 254 """shutdown releases the hardware resources used by the device.""" 255 self._timer.shutdown() 256 self._signal_change_event.destroy() 257 self._transfer_complete_event.destroy() 258 self._frame_buffered_event.destroy() 259 self._frame_buffering_event.destroy() 260 261 self.mw_close_channel(self._channel) # type: ignore 262 self.mw_capture_exit_instance() # type: ignore
shutdown releases the hardware resources used by the device.
Inherited Members
- mwcapture.libmwcapture.mw_capture
- load_win_funcs
- mw_capture_init_instance
- mw_capture_exit_instance
- mw_get_version
- mw_refresh_device
- mw_get_channel_count
- mw_get_channel_info_by_index
- mw_get_channel_info
- mw_get_device_path
- mw_open_channel_by_path
- mw_close_channel
- mw_create_video_capture
- mw_destory_video_capture
- mw_create_audio_capture
- mw_destory_audio_capture
- mw_get_video_signal_status
- mw_get_audio_signal_status
- mw_get_video_caps
- mw_start_video_capture
- mw_register_notify
- mw_register_timer
- mw_get_device_time
- mw_pin_video_buffer
- mw_schedule_timer
- mw_get_video_buffer_info
- mw_get_video_frame_info
- mw_capture_video_frame_to_virtual_address_ex
- mw_unpin_video_buffer
- mw_unregister_notify
- mw_unregister_timer
- mw_stop_video_capture
- mw_get_video_capture_status
- mw_start_audio_capture
- mw_get_notify_status
- mw_capture_audio_frame
- mw_stop_audio_capture
- mw_start_video_eco_capture
- mw_get_video_eco_capture_status
- mwcapture_set_video_eco_frame
- mw_stop_video_eco_capture
class MockProCaptureDevice(ProCaptureDeviceImpl):
    """MockProCaptureDevice is intended to be used during testing, development and CI in the absence of a hardware frame
    grabber or Magewell Windows SDK. Does not require Magewell driver or hardware.

    Only TransferMode.TIMER is supported.

    The class generates test frames. The frame rate is limited to 2 frames per second because copying the mock frames
    to a provided PC transfer buffer takes a surprisingly long time (~0.11s).

    It's recommended to use ColourFormat.RGB24 only. You can use some other formats if you have ffmpeg installed, but
    this is quite slow."""

    def __init__(self, settings: ProCaptureSettings):
        """
        Args:
            settings (ProCaptureSettings): The settings to use for the mock device. settings.transfer_mode must be
                set to TransferMode.TIMER.
        Raises:
            ValueError: If settings.transfer_mode is not TransferMode.TIMER.
        """
        if settings.transfer_mode != TransferMode.TIMER:
            raise ValueError("MockProCaptureDevice only works in Timer transfer mode.")
        super().__init__(settings)
        self._is_grabbing = False

        self._events = ProCaptureEvents(
            transfer_complete=TransferCompleteEvent(),
            signal_change=SignalChangeEvent(),
            frame_buffered=FrameBufferedEvent(),
            frame_buffering=FrameBufferingEvent(),
            timer_event=TimerEvent(),
        )
        # Only these two events are registered, each with a placeholder Notification(0, 0).
        for registered_event in (self._events.signal_change, self._events.timer_event):
            registered_event.register(Notification(0, 0))

        self._mock_timer = _MockTimer(self._events.timer_event, MOCK_FRAME_RATE_HZ)

        self._frame_counter: int = 0

        # Build the test frames, then draw each frame's index at its centre.
        rgb_frames = [create_mock_frame() for _ in range(NUM_TEST_FRAMES)]
        for frame_number, rgb_frame in enumerate(rgb_frames):
            centre = (rgb_frame.shape[1] // 2, rgb_frame.shape[0] // 2)
            putText(rgb_frame, str(frame_number), centre, FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, LINE_AA)

        self._mock_frames: List[bytes] = []
        if self.frame_properties.format == ColourFormat.RGB24:
            # RGB24 frames are stored as the raw array bytes.
            self._mock_frames = [rgb_frame.tobytes() for rgb_frame in rgb_frames]
        else:
            # Any other colour format is produced by encoding each RGB24 array through FFMPEG.
            ffmpeg = FFMPEG("FFMPEG is required to use Mock mode with any colour format other than RGB24.")
            self._mock_frames = [
                ffmpeg.encode_rgb24_array(rgb_frame, self.frame_properties.format) for rgb_frame in rgb_frames
            ]

    @property
    def events(self) -> ProCaptureEvents:
        """events property
        Returns:
            A `ProCaptureEvents` object containing handles to the events generated by the device during frame grabbing.
        """
        return self._events

    def schedule_timer_event(self) -> None:
        """Schedule the next event on the mock timer."""
        self._mock_timer.schedule_event()

    @property
    def buffer_status(self) -> OnDeviceBufferStatus:
        """Return a fixed `OnDeviceBufferStatus` in which every field is 1."""
        return OnDeviceBufferStatus(
            buffer_size_in_frames=1,
            num_chunks_in_buffer=1,
            buffering_field_index=1,
            last_buffered_field_index=1,
            last_buffered_frame_index=1,
            num_fully_buffered_frames=1,
            num_chunks_being_buffered=1,
        )

    @property
    def frame_info(self) -> FrameInfo:
        """Return a `FrameInfo` describing a buffered, progressive mock frame timestamped with the current time."""
        return FrameInfo(
            state=FrameState.BUFFERED,
            interlaced=False,
            segmented=False,
            dimensions=MOCK_RESOLUTION,
            aspect_ratio=MOCK_ASPECT_RATIO,
            buffering_start_time=datetime.now(),
            buffering_complete_time=datetime.now(),
        )

    @property
    def signal_status(self) -> SignalStatus:
        """Return a `SignalStatus` that always reports a locked signal at the mock resolution and frame rate."""
        return SignalStatus(
            state=SignalState.LOCKED,
            start_position=ImageCoordinateInPixels(row=0, col=0),
            image_dimensions=MOCK_RESOLUTION,
            total_dimensions=MOCK_RESOLUTION,
            interlaced=False,
            frame_period_s=1 / MOCK_FRAME_RATE_HZ,
            aspect_ratio=MOCK_ASPECT_RATIO,
            segmented=False,
        )

    @property
    def transfer_status(self) -> TransferStatus:
        """Return a `TransferStatus` that always reports a fully transferred frame with frame index 0."""
        return TransferStatus(
            whole_frame_transferred=True,
            num_lines_transferred=MOCK_RESOLUTION.rows,
            num_lines_transferred_previously=MOCK_RESOLUTION.rows,
            frame_index=0,
        )

    def start_grabbing(self) -> None:
        """Mark the mock device as grabbing."""
        self._is_grabbing = True

    def stop_grabbing(self) -> None:
        """Mark the mock device as not grabbing."""
        self._is_grabbing = False

    def start_a_frame_transfer(self, frame_buffer: Array[c_char]) -> datetime:
        """start_a_frame_transfer immediately writes a mock frame to the provided buffer.
        Args:
            frame_buffer (Array[c_char]): The buffer to write the mock frame to.
        Returns:
            The time (datetime.datetime) the frame transfer was completed.
        """
        # Cycle through the pre-generated frames.
        next_frame = self._mock_frames[self._frame_counter % NUM_TEST_FRAMES]
        frame_buffer[: self.frame_properties.size_in_bytes] = next_frame  # type: ignore
        self.events.transfer_complete.set()
        self._frame_counter += 1
        return datetime.now()

    def shutdown(self) -> None:
        """Mark the mock device as not grabbing; there are no hardware resources to release."""
        self._is_grabbing = False
MockProCaptureDevice is intended to be used during testing, development and CI in the absence of a hardware frame grabber or Magewell Windows SDK. Does not require Magewell driver or hardware.
Only TransferMode.TIMER is supported.
The class generates test frames. The frame rate is limited to 2 frames per second because copying the mock frames to a provided PC transfer buffer takes a surprisingly long time (~0.11s).
It's recommended to use ColourFormat.RGB24 only. You can use some other formats if you have ffmpeg installed, but this is quite slow.
66 def __init__(self, settings: ProCaptureSettings): 67 """ 68 Args: 69 settings (ProCaptureSettings): The settings to use for the mock device. settings.transfer_mode must be 70 set to TransferMode.Timer. 71 """ 72 if settings.transfer_mode != TransferMode.TIMER: 73 raise ValueError("MockProCaptureDevice only works in Timer transfer mode.") 74 super().__init__(settings) 75 self._is_grabbing = False 76 self._events = ProCaptureEvents( 77 transfer_complete=TransferCompleteEvent(), 78 signal_change=SignalChangeEvent(), 79 frame_buffered=FrameBufferedEvent(), 80 frame_buffering=FrameBufferingEvent(), 81 timer_event=TimerEvent(), 82 ) 83 self._events.signal_change.register(Notification(0, 0)) 84 self._events.timer_event.register(Notification(0, 0)) 85 86 self._mock_timer = _MockTimer(self._events.timer_event, MOCK_FRAME_RATE_HZ) 87 88 self._frame_counter: int = 0 89 mock_frames_np_arrays = [create_mock_frame() for _ in range(NUM_TEST_FRAMES)] 90 for i, frame in enumerate(mock_frames_np_arrays): 91 putText( 92 frame, 93 str(i), 94 (frame.shape[1] // 2, frame.shape[0] // 2), 95 FONT_HERSHEY_SIMPLEX, 96 1, 97 (255, 255, 255), 98 1, 99 LINE_AA, 100 ) 101 self._mock_frames: List[bytes] = [] 102 if self.frame_properties.format == ColourFormat.RGB24: 103 self._mock_frames = [frame.tobytes() for frame in mock_frames_np_arrays] 104 else: 105 ffmpeg = FFMPEG("FFMPEG is required to use Mock mode with any colour format other than RGB24.") 106 self._mock_frames = [ 107 ffmpeg.encode_rgb24_array(frame, self.frame_properties.format) for frame in mock_frames_np_arrays 108 ]
Args
- settings (ProCaptureSettings): The settings to use for the mock device. settings.transfer_mode must be set to TransferMode.TIMER.
events property
Returns
A
ProCaptureEvents
object containing handles to the events generated by the device during frame grabbing.
173 def start_a_frame_transfer(self, frame_buffer: Array[c_char]) -> datetime: 174 """start_a_frame_transfer immediately writes a mock frame to the provided buffer. 175 Args: 176 frame_buffer (Array[c_char]): The buffer to write the mock frame to. 177 Returns: 178 The time (datetime.datetime) the frame transfer was completed. 179 """ 180 frame_buffer[: self.frame_properties.size_in_bytes] = self._mock_frames[ # type: ignore 181 self._frame_counter % NUM_TEST_FRAMES 182 ] 183 self.events.transfer_complete.set() 184 self._frame_counter += 1 185 return datetime.now()
start_a_frame_transfer immediately writes a mock frame to the provided buffer.
Args
- frame_buffer (Array[c_char]): The buffer to write the mock frame to.
Returns
The time (datetime.datetime) the frame transfer was completed.
25class ProCaptureController: 26 """ProCaptureController controls the transfer of frames from a ProCaptureDevice or MockProCaptureDevice to a PC.""" 27 28 def __init__(self, device: ProCaptureDeviceInterface): 29 """ 30 Args: 31 device (ProCaptureDeviceInterface): The implementation of ProCaptureDeviceInterface to use for frame 32 transfer. ProCaptureDevice or MockProCaptureDevice are both valid implementations. 33 """ 34 self._device = device 35 self._transfer_buffer = create_string_buffer(3840 * 2160 * 4) 36 self._device.start_grabbing() 37 38 def transfer_when_ready(self, timeout_ms: int = 2000) -> VideoFrame: 39 """transfer_when_ready wait for the device to be ready to start transferring, transfers it and returns it. 40 41 This method will block until the frame has been transferred or the timeout has been reached. 42 43 In TransferMode.TIMER and TransferMode.NORMAL, frame transfer will start after a whole frame has been grabbed 44 by the device. In TransferMode.LOW_LATENCY, frame transfer will start after a frame has started to be 45 buffered onto the device. 
46 """ 47 if self._device.transfer_mode == TransferMode.TIMER: 48 self._device.schedule_timer_event() 49 event = self._wait_for_event(timeout_ms=timeout_ms) 50 frame = self._handle_event(event) 51 if frame is None: 52 return self.transfer_when_ready() 53 else: 54 return frame 55 56 def _wait_for_event(self, timeout_ms: int) -> Event: 57 """Wait for events to be raised by the ProCaptureDevice or Timer, and return the raised event.""" 58 if self._device.transfer_mode == TransferMode.TIMER: 59 grab_event: Event = self._device.events.timer_event 60 elif self._device.transfer_mode == TransferMode.NORMAL: 61 grab_event = self._device.events.frame_buffered 62 elif self._device.transfer_mode == TransferMode.LOW_LATENCY: 63 grab_event = self._device.events.frame_buffering 64 else: 65 raise NotImplementedError("Invalid grab mode.") 66 67 events_to_wait_for = [grab_event, self._device.events.signal_change] 68 try: 69 event_that_occurred = wait_for_events(events_to_wait_for, timeout_ms=timeout_ms) 70 except WaitForEventTimeout as e: 71 self.shutdown() 72 raise e 73 return event_that_occurred 74 75 @singledispatchmethod 76 def _handle_event(self, event: Event) -> Optional[VideoFrame]: 77 """Handle a raised event, including transferring a frame if the event means one is ready.""" 78 raise NotImplementedError() 79 80 @_handle_event.register 81 def _(self, event: TimerEvent) -> Optional[VideoFrame]: 82 """If timer event received, then whole frame is on device. Only subscribed to in TIMER mode. 
This method 83 transfers the frame to a buffer in PC memory, makes a copy, marks the buffer memory as free and then returns 84 the copy.""" 85 transfer_started_timestamp = self._device.start_a_frame_transfer(self._transfer_buffer) 86 buffering_started_timestamp = self._device.frame_info.buffering_start_time 87 transfer_complete_timestamp = self._wait_for_frame_or_chunk_transfer_to_complete(timeout_ms=2000) 88 buffering_complete_timestamp = self._device.frame_info.buffering_complete_time 89 if not self._device.transfer_status.whole_frame_transferred: # this marks the buffer memory as free 90 raise ProCaptureError("Only part of frame has been acquired") 91 return self._format_frame( 92 timestamps=VideoFrameTimestamps( 93 transfer_started=transfer_started_timestamp, 94 transfer_complete=transfer_complete_timestamp, 95 buffering_started=buffering_started_timestamp, 96 buffering_complete=buffering_complete_timestamp, 97 ) 98 ) 99 100 @_handle_event.register 101 def _(self, event: FrameBufferedEvent) -> Optional[VideoFrame]: 102 """If FrameBufferedEvent event received, then whole frame is on device. This event is only subscribed to in 103 NORMAL mode. 
This method transfers it to a buffer in PC memory, makes a copy, marks the buffer memory as free 104 and then returns the copy.""" 105 transfer_started_timestamp = self._device.start_a_frame_transfer(self._transfer_buffer) 106 transfer_complete_timestamp = self._wait_for_frame_or_chunk_transfer_to_complete(timeout_ms=2000) 107 buffering_started_timestamp = self._device.frame_info.buffering_start_time 108 buffering_complete_timestamp = self._device.frame_info.buffering_complete_time 109 if not self._device.transfer_status.whole_frame_transferred: # this marks the buffer memory as free 110 raise ProCaptureError("Only part of frame has been acquired") 111 return self._format_frame( 112 timestamps=VideoFrameTimestamps( 113 transfer_started=transfer_started_timestamp, 114 transfer_complete=transfer_complete_timestamp, 115 buffering_started=buffering_started_timestamp, 116 buffering_complete=buffering_complete_timestamp, 117 ) 118 ) 119 120 @_handle_event.register 121 def _(self, event: FrameBufferingEvent) -> Optional[VideoFrame]: 122 """If FrameBufferingEvent event received, then a frame has started to be acquired by the card. This event is 123 only subscribed to in LOW_LATENCY mode. This method starts the transfer of the available lines to a buffer in 124 PC memory while the acquisition is still happening. 
It then waits until all lines have been received (this query 125 also frees the memory), copies the buffer contents and returns the copy.""" 126 transfer_started_timestamp = self._device.start_a_frame_transfer(self._transfer_buffer) 127 buffering_started_timestamp = self._device.frame_info.buffering_start_time 128 self._wait_for_frame_or_chunk_transfer_to_complete(timeout_ms=2000) 129 wait_start_t = time.perf_counter() 130 while ( 131 self._device.transfer_status.num_lines_transferred < self._device.frame_properties.dimensions.rows 132 and (time.perf_counter() - wait_start_t) < 1 133 ): 134 pass 135 transfer_complete_timestamp = datetime.now() 136 buffering_complete_timestamp = self._device.frame_info.buffering_complete_time 137 138 return self._format_frame( 139 timestamps=VideoFrameTimestamps( 140 transfer_started=transfer_started_timestamp, 141 transfer_complete=transfer_complete_timestamp, 142 buffering_started=buffering_started_timestamp, 143 buffering_complete=buffering_complete_timestamp, 144 ) 145 ) 146 147 @_handle_event.register 148 def _(self, event: SignalChangeEvent) -> None: 149 """If a SignalChangeEvent is received, then the source signal has changed and no frame is available.""" 150 logger.info("Frame grabber signal change detected") 151 152 def _format_frame(self, timestamps: VideoFrameTimestamps) -> VideoFrame: 153 """Copy the contents of the transfer buffer, and return it as a VideoFrame.""" 154 # Copy the acquired frame 155 string_buffer = string_at(self._transfer_buffer, self._device.frame_properties.size_in_bytes) 156 frame = VideoFrame( 157 string_buffer, 158 dimensions=self._device.frame_properties.dimensions, 159 timestamps=timestamps, 160 format=self._device.frame_properties.format, 161 ) 162 return frame 163 164 def _wait_for_frame_or_chunk_transfer_to_complete(self, timeout_ms: int) -> datetime: 165 """Waits until a whole frame (or chunk of a frame in low latency mode) has been transferred to the buffer in 166 PC memory.""" 167 try: 168 
wait_for_event(self._device.events.transfer_complete, timeout_ms=timeout_ms) 169 return datetime.now() 170 except WaitForEventTimeout as e: 171 self.shutdown() 172 raise e 173 174 def shutdown(self) -> None: 175 """Shuts down the frame grabber device.""" 176 self._device.stop_grabbing() 177 self._device.shutdown()
ProCaptureController controls the transfer of frames from a ProCaptureDevice or MockProCaptureDevice to a PC.
def __init__(self, device: ProCaptureDeviceInterface):
    """Create a controller for `device` and tell the device to start grabbing.

    Args:
        device (ProCaptureDeviceInterface): The implementation of ProCaptureDeviceInterface that frames will be
            transferred from. Both ProCaptureDevice and MockProCaptureDevice are valid implementations.
    """
    # NOTE(review): presumably sized for a 3840 x 2160 frame at 4 bytes per pixel - confirm this is the largest
    # frame the device can deliver.
    transfer_buffer_size_in_bytes = 3840 * 2160 * 4

    self._device = device
    # The buffer is allocated once here and handed to the device for each frame transfer.
    self._transfer_buffer = create_string_buffer(transfer_buffer_size_in_bytes)
    device.start_grabbing()
Args
- device (ProCaptureDeviceInterface): The implementation of ProCaptureDeviceInterface to use for frame transfer. ProCaptureDevice or MockProCaptureDevice are both valid implementations.
def transfer_when_ready(self, timeout_ms: int = 2000) -> VideoFrame:
    """Wait for the device to be ready to transfer a frame, then transfer the frame and return it.

    This method will block until a frame has been transferred or the timeout has been reached.

    In TransferMode.TIMER and TransferMode.NORMAL, frame transfer will start after a whole frame has been grabbed
    by the device. In TransferMode.LOW_LATENCY, frame transfer will start after a frame has started to be
    buffered onto the device.

    Args:
        timeout_ms (int): The timeout, in milliseconds, passed to each wait for a device event. If an event does
            not produce a frame, the wait is repeated with the same timeout.

    Returns:
        VideoFrame: The transferred frame.
    """
    # Some events (e.g. a change of the source signal) are handled without producing a frame, in which case the
    # handler returns None and we wait for the next event. A loop is used rather than a recursive call so that
    # the caller's timeout_ms is honoured on every attempt and repeated frame-less events cannot exhaust the
    # interpreter's recursion limit.
    while True:
        if self._device.transfer_mode == TransferMode.TIMER:
            # In TIMER mode a timer event is scheduled before every wait.
            self._device.schedule_timer_event()
        event = self._wait_for_event(timeout_ms=timeout_ms)
        frame = self._handle_event(event)
        if frame is not None:
            return frame
transfer_when_ready waits for the device to be ready to start transferring a frame, transfers it and returns it.
This method will block until the frame has been transferred or the timeout has been reached.
In TransferMode.TIMER and TransferMode.NORMAL, frame transfer will start after a whole frame has been grabbed by the device. In TransferMode.LOW_LATENCY, frame transfer will start after a frame has started to be buffered onto the device.
@dataclass
class ProCaptureSettings:
    """Settings for the ProCapture device."""

    dimensions: ImageSizeInPixels = ImageSizeInPixels(1920, 1080)
    """The dimensions of the frames to be acquired in pixels."""
    color_format: ColourFormat = ColourFormat.BGR24
    """The colour format of the frames to be acquired."""
    transfer_mode: TransferMode = TransferMode.NORMAL
    """The method to use for transferring frames from the device to the PC. See `TransferMode` for details."""
    num_lines_per_chunk: int = 64
    """The number of lines of a frame to transfer at a time (for `TransferMode.LOW_LATENCY` transfers)."""

    def __post_init__(self) -> None:
        """Validate `num_lines_per_chunk` as soon as the settings object has been constructed."""
        _check_valid_chunk_size(self.num_lines_per_chunk)

    @property
    def min_stride(self) -> int:
        """The minimum stride for the configured colour format and frame width, from `fourcc_calc_min_stride`."""
        # NOTE(review): the final argument (2) is presumably the stride alignment - confirm against the SDK.
        return cast(int, fourcc_calc_min_stride(self.color_format.value, self.dimensions.cols, 2))  # type: ignore

    @property
    def image_size_in_bytes(self) -> int:
        """The size in bytes of one frame with the configured dimensions and colour format."""
        if self.color_format == ColourFormat.NV12:
            return self.dimensions.cols * self.dimensions.rows * 2  # copied from line 223 of capture.py
        else:
            return cast(
                int,
                fourcc_calc_image_size(  # type: ignore
                    # Pass the FOURCC value rather than the enum member, as is done for the other FOURCC
                    # helpers (fourcc_calc_min_stride above and fourcc_get_bpp in ColourFormat).
                    self.color_format.value,
                    self.dimensions.cols,
                    self.dimensions.rows,
                    self.min_stride,
                ),
            )
Settings for the ProCapture device.
The method to use for transferring frames from the device to the PC. See TransferMode
for details.
The number of lines of a frame to transfer at a time (for TransferMode.LOW_LATENCY
transfers).
class TransferMode(Enum):
    """Enumeration of the supported methods for triggering the transfer of frames to the PC."""

    TIMER = 0
    """Transfers are triggered by a software timer event, allowing arbitrary frame rates. This is the only mode
    supported by MockProCaptureDevice."""
    NORMAL = 1
    """Transfers are triggered by a notification received from the device when a whole frame has been received, and
    therefore grabbing happens at the source frame rate."""
    LOW_LATENCY = 2
    """Transfers are triggered by a notification received from the device when the first chunk of a frame has been
    received. Grabbing happens at the source frame rate, but with a lower latency."""
Enumeration of the supported methods for triggering the transfer of frames to the PC.
Transfers are triggered by a software timer event, allowing arbitrary frame rates. This is the only mode supported by MockProCaptureDevice.
Transfers are triggered by a notification received from the device when a whole frame has been received, and therefore grabbing happens at the source frame rate.
Transfers are triggered by a notification received from the device when the first chunk of a frame has been received. Grabbing happens at the source frame rate, but with a lower latency.
Inherited Members
- enum.Enum
- name
- value
class ColourFormat(Enum):
    """Enumeration of the supported colour formats.

    The value of each member is the MWFOURCC_* constant of the same name.
    """

    UNK = MWFOURCC_UNK
    GREY = MWFOURCC_GREY
    Y800 = MWFOURCC_Y800
    Y8 = MWFOURCC_Y8
    Y16 = MWFOURCC_Y16
    RGB15 = MWFOURCC_RGB15
    RGB16 = MWFOURCC_RGB16
    RGB24 = MWFOURCC_RGB24
    RGBA = MWFOURCC_RGBA
    ARGB = MWFOURCC_ARGB
    BGR15 = MWFOURCC_BGR15
    BGR16 = MWFOURCC_BGR16
    BGR24 = MWFOURCC_BGR24
    BGRA = MWFOURCC_BGRA
    ABGR = MWFOURCC_ABGR
    NV16 = MWFOURCC_NV16
    NV61 = MWFOURCC_NV61
    I422 = MWFOURCC_I422
    YV16 = MWFOURCC_YV16
    YUY2 = MWFOURCC_YUY2
    YUYV = MWFOURCC_YUYV
    UYVY = MWFOURCC_UYVY
    YVYU = MWFOURCC_YVYU
    VYUY = MWFOURCC_VYUY
    I420 = MWFOURCC_I420
    IYUV = MWFOURCC_IYUV
    NV12 = MWFOURCC_NV12
    YV12 = MWFOURCC_YV12
    NV21 = MWFOURCC_NV21
    P010 = MWFOURCC_P010
    P210 = MWFOURCC_P210
    IYU2 = MWFOURCC_IYU2
    V308 = MWFOURCC_V308
    AYUV = MWFOURCC_AYUV
    UYVA = MWFOURCC_UYVA
    V408 = MWFOURCC_V408
    VYUA = MWFOURCC_VYUA
    V210 = MWFOURCC_V210
    Y410 = MWFOURCC_Y410
    V410 = MWFOURCC_V410
    RGB10 = MWFOURCC_RGB10
    BGR10 = MWFOURCC_BGR10

    @property
    def fourcc_string(self) -> str:
        """The member's value packed as a little-endian unsigned 32-bit integer and decoded as UTF-8 text."""
        packed_value = struct.pack("<I", self.value)
        return packed_value.decode("utf-8")

    @property
    def bits_per_pixel(self) -> int:
        """The number of bits per pixel reported by `fourcc_get_bpp` for this format's value."""
        bits = fourcc_get_bpp(self.value)  # type: ignore
        return cast(int, bits)

    @property
    def has_alpha_channel(self) -> bool:
        """Whether the letter "A" appears in `fourcc_string`."""
        return self.fourcc_string.find("A") != -1

    @property
    def num_channels(self) -> int:
        """1 for the greyscale formats, 4 for formats with an alpha channel, otherwise 3."""
        single_channel_formats = (ColourFormat.Y8, ColourFormat.Y16, ColourFormat.Y800, ColourFormat.GREY)
        if self in single_channel_formats:
            return 1
        return 4 if self.has_alpha_channel else 3

    @property
    def colour_space(self) -> ColourSpace:
        """The colour space of the format: UNKNOWN, GREY or RGB, with every remaining format reported as YUV."""
        if self == ColourFormat.UNK:
            return ColourSpace.UNKNOWN

        grey_formats = (ColourFormat.GREY, ColourFormat.Y800, ColourFormat.Y8, ColourFormat.Y16)
        if self in grey_formats:
            return ColourSpace.GREY

        rgb_formats = (
            ColourFormat.RGB24,
            ColourFormat.RGB10,
            ColourFormat.RGB15,
            ColourFormat.RGB16,
            ColourFormat.ARGB,
            ColourFormat.RGBA,
            ColourFormat.BGR24,
            ColourFormat.BGR10,
            ColourFormat.BGR15,
            ColourFormat.BGR16,
            ColourFormat.ABGR,
            ColourFormat.BGRA,
        )
        if self in rgb_formats:
            return ColourSpace.RGB

        # Anything not matched above is treated as a YUV format.
        return ColourSpace.YUV

    def channel_order(self) -> RGBChannelOrder:
        """Return whether the colour channels of an RGB-space format are ordered RGB or BGR.

        Raises:
            NotImplementedError: If the format is not in the RGB colour space, or is an RGB-space format that is
                in neither of the lists below.
        """
        if self.colour_space != ColourSpace.RGB:
            raise NotImplementedError("Channel order property only implemented for RGB colour formats")

        rgb_ordered_formats = (
            ColourFormat.RGB24,
            ColourFormat.ARGB,
            ColourFormat.RGBA,
            ColourFormat.RGB15,
            ColourFormat.RGB16,
            ColourFormat.RGB10,
        )
        if self in rgb_ordered_formats:
            return RGBChannelOrder.RGB

        bgr_ordered_formats = (
            ColourFormat.BGR24,
            ColourFormat.ABGR,
            ColourFormat.BGRA,
            ColourFormat.BGR15,
            ColourFormat.BGR16,
            ColourFormat.BGR10,
        )
        if self in bgr_ordered_formats:
            return RGBChannelOrder.BGR

        raise NotImplementedError(f"Channel order property not implemented for colour format {self}.")

    @property
    def alpha_channel_index(self) -> int:
        """The position of the letter "A" within `fourcc_string`.

        Raises:
            ValueError: If the format has no alpha channel, or the letter "A" cannot be located.
        """
        if not self.has_alpha_channel:
            raise ValueError(f"Colour format {self} does not have an alpha channel")
        position = self.fourcc_string.find("A")
        if position < 0:
            raise ValueError(f"Could not find index of alpha channel for colour format {self}")
        return position

    def as_ffmpeg_pixel_format(self) -> str:
        """Return the entry of `ffmpeg_pixel_formats` for this format.

        Raises:
            ValueError: If the format is `ColourFormat.UNK`.
        """
        if self == ColourFormat.UNK:
            raise ValueError("Colour format not known")
        pixel_format = ffmpeg_pixel_formats[self]
        return pixel_format

    @property
    def pixel_dtype(self) -> type:
        """`uint8` if each channel has at most 8 bits per sample, `uint16` if it has at most 16.

        Raises:
            ValueError: If the format is `ColourFormat.UNK`, or a channel has more than 16 bits per sample.
        """
        if self == ColourFormat.UNK:
            raise ValueError("Colour format not known")
        # Bits per pixel are shared out evenly between the channels, rounding down.
        bits_per_channel = int(floor(self.bits_per_pixel / self.num_channels))
        if bits_per_channel <= 8:
            return uint8
        if bits_per_channel <= 16:
            return uint16
        raise ValueError("ColourFormat has unrecognised structure.")
Enumeration of the supported colour formats.
def channel_order(self) -> RGBChannelOrder:
    """Return whether the colour channels of an RGB-space format are ordered RGB or BGR.

    Raises:
        NotImplementedError: If the format is not in the RGB colour space, or is an RGB-space format that is in
            neither of the lists below.
    """
    if self.colour_space != ColourSpace.RGB:
        raise NotImplementedError("Channel order property only implemented for RGB colour formats")

    rgb_ordered_formats = (
        ColourFormat.RGB24,
        ColourFormat.ARGB,
        ColourFormat.RGBA,
        ColourFormat.RGB15,
        ColourFormat.RGB16,
        ColourFormat.RGB10,
    )
    if self in rgb_ordered_formats:
        return RGBChannelOrder.RGB

    bgr_ordered_formats = (
        ColourFormat.BGR24,
        ColourFormat.ABGR,
        ColourFormat.BGRA,
        ColourFormat.BGR15,
        ColourFormat.BGR16,
        ColourFormat.BGR10,
    )
    if self in bgr_ordered_formats:
        return RGBChannelOrder.BGR

    raise NotImplementedError(f"Channel order property not implemented for colour format {self}.")
Inherited Members
- enum.Enum
- name
- value