Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 145 additions & 69 deletions src/pymodaq_plugins_utils/hardware/camera_base_pylablib.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import dataclasses
from typing import Type

import cv2

from pymodaq_data import DataToExport
from pymodaq_utils.logger import set_logger, get_module_name
from pymodaq_utils.utils import ThreadCommand
from pymodaq_gui.parameter import Parameter
Expand All @@ -10,15 +15,20 @@
from pymodaq.utils.data import DataFromPlugins, Axis
from pymodaq.control_modules.viewer_utility_classes import DAQ_Viewer_base, comon_parameters, main

from pylablib.devices.interface.camera import trim_frames

from qtpy import QtWidgets, QtCore
import numpy as np
from time import perf_counter


logger = set_logger(get_module_name(__file__))


cam_params = [
{'title': 'Camera name:', 'name': 'camera_name', 'type': 'str', 'value': '', 'readonly': True},
{'title': 'Sensor type:', 'name': 'sensor', 'type': 'list', 'limits': ['Monochrome', 'Bayer']},
{'title': 'Ouput Color:', 'name': 'output_color', 'type': 'list', 'limits': ['RGB', 'MonoChrome']},
{'title': 'Color Conversion:', 'name': 'color_conversion', 'type': 'list',
'limits': ['None', 'RGB2GRAY', 'BAYER_BG2RGB', 'BAYER_BG2GRAY']},
{'title': 'ROI', 'name': 'roi', 'type': 'group', 'children': [
{'title': 'Update ROI from Viewer', 'name': 'update_roi', 'type': 'led', 'value': False},
{'title': 'Apply ROI', 'name': 'apply_roi', 'type': 'led', 'value': False},
Expand All @@ -42,6 +52,60 @@
]


@dataclasses.dataclass
class Grab:
do_acquisition: bool = True
snap: bool = False
since: str = 'now'
nframes: int = 1
n_average: int = 1


class CameraCallback(QtCore.QObject):
"""Callback object """
data_sig = QtCore.Signal(np.ndarray)
error = QtCore.Signal()

def __init__(self, controller):
super().__init__()
# Set the wait function
self.controller = controller
self.do_acquisition = True

def set_do_grab(self, mode: Grab):
self.do_acquisition = mode.do_acquisition
if mode.do_acquisition:
self.wait_for_acquisition(mode)

def wait_for_acquisition(self, mode: Grab):
while self.do_acquisition:
try:
ind_average = 0
while ind_average < mode.n_average:
ind_frames = 0
while ind_frames < mode.nframes:
self.controller.wait_for_frame(since='now')
new_frames, rng = self.controller.read_multiple_images(missing_frame='skip', return_rng=True)
if ind_average == 0 and ind_frames == 0:
shape = list(new_frames.shape[1:])
shape = [mode.n_average, mode.nframes] + shape
frames = np.zeros(shape, dtype=new_frames.dtype)
nacq = rng[1] - rng[0]
frames[ind_average, ind_frames:nacq, ...] = new_frames
ind_frames += nacq
ind_average += 1
self.data_sig.emit(frames)
QtCore.QThread.msleep(10)
except Exception as e:
logger.exception(str(e))
self.error.emit()
break
QtWidgets.QApplication.processEvents()
if not self.do_acquisition or mode.snap:
break



class CameraBasePyLabLib(DAQ_Viewer_base):
"""
Base implementation for Camera using pylablib framework. Works for TSI and uc480 thorlabs camera and rpobaly others
Expand All @@ -52,13 +116,15 @@ class CameraBasePyLabLib(DAQ_Viewer_base):

params = comon_parameters + serial_params + cam_params

callback_signal = QtCore.Signal(bool)
callback_signal = QtCore.Signal(Grab)
live_mode_available = True

hardware_averaging = True

def ini_attributes(self):
self.controller = None
self.callback_thread: QtCore.QThread = None
self.is_live: bool = False
self.Naverage: int = 1

self.x_axis: Axis = None
self.y_axis: Axis = None
Expand Down Expand Up @@ -135,31 +201,28 @@ def commit_settings(self, param: Parameter):
if param.name() == "exposure_time":
self.controller.set_exposure(param.value()/1000)

if param.name() == "fps_on":
elif param.name() == "fps_on":
self.settings.child('timing_opts', 'fps').setOpts(visible=param.value())

if param.name() == "apply_roi":
elif param.name() == "apply_roi":
if param.value(): # Switching on ROI
self.apply_roi()
else:
self.clear_roi()

if param.name() in ['x_binning', 'y_binning']:
elif param.name() in ['x_binning', 'y_binning']:
# We handle ROI and binning separately for clarity
(x0, w, y0, h, *_) = self.controller.get_roi() # Get current ROI
xbin = self.settings['roi', 'x_binning']
ybin = self.settings['roi', 'y_binning']
new_roi = (x0, w, xbin, y0, h, ybin)
self.update_rois(new_roi)

if param.name() == "clear_roi":
elif param.name() == "clear_roi":
if param.value(): # Switching on ROI
self.clear_roi()
param.setValue(False)

if param.name() == 'sensor':
self.get_set_color()

def ini_detector_custom(self, controller=None):
raise NotImplementedError

Expand All @@ -178,12 +241,13 @@ def ini_detector(self, controller=None):
initialized: bool
False if initialization failed otherwise True
"""

self.ini_detector_custom(controller)

self.get_device_info()
self.get_set_color()
self.get_set_main_parameters()
self.setup_callback_thread()
self.controller.set_frame_format("array")

info = "Initialized camera"
initialized = True
Expand All @@ -199,13 +263,6 @@ def get_device_info(self):
elif hasattr(device_info, 'model'):
self.settings.child('camera_name').setValue(device_info.model)

def get_set_color(self):
if 'monochrome' in self.settings['sensor'].lower():
self.settings.child('output_color').setValue('MonoChrome')
self.settings.child('output_color').setOpts(visible=False)
else:
self.settings.child('output_color').setOpts(visible=True)

def get_set_main_parameters(self):
# Set exposure time
self.controller.set_exposure(self.settings['timing_opts', 'exposure_time']/1000)
Expand All @@ -227,25 +284,37 @@ def get_set_main_parameters(self):
self.settings.child('roi', 'roi_slices').setValue(str(slices))
self.compute_axes()

@property
def callback(self) -> Type[CameraCallback]:
""" Return the class handling the wait for acquisition and signal emission

Should be reimplement as well as CameraCallback if needed
"""
return CameraCallback

def setup_callback_thread(self):
# Way to define a wait function with arguments
wait_func = lambda: self.controller.wait_for_frame(since=self.settings['buffer', 'mode'],
nframes=1, timeout=20.0)
callback = CameraCallback(wait_func)
callback = CameraCallback(self.controller)
self.settings.child('buffer', 'mode').setReadonly(True)


self.callback_thread = QtCore.QThread() # creation of a Qt5 thread
callback.moveToThread(self.callback_thread) # callback object will live within this thread

callback.data_sig.connect(
self.emit_data) # when the wait for acquisition returns (with data taken), emit_data will be fired
callback.error.connect(self.handle_error)

self.callback_signal.connect(callback.set_do_grab)
self.callback_thread.callback = callback
self.callback_thread.start()

self._prepare_view()

def handle_error(self):
self.stop()

def _prepare_view(self):
"""Preparing a data viewer by emitting temporary data. Typically, needs to be called whenever the
Expand Down Expand Up @@ -280,14 +349,20 @@ def grab_data(self, Naverage=1, **kwargs):
"""
try:
# Warning, acquisition_in_progress returns 1,0 and not a real bool
if not kwargs.get('live', False):
self.emit_data(self.controller.snap())
else:
if not self.controller.acquisition_in_progress():
self.controller.clear_acquisition()
self.controller.start_acquisition(nframes=self.settings['buffer', 'size'])
#Then start the acquisition
self.callback_signal.emit(True) # will trigger the wait for acquisition
self.is_live = kwargs.get('live', False)
self.Naverage = Naverage

self.n_frames = 1

if not self.controller.acquisition_in_progress():
self.controller.clear_acquisition()
self.controller.start_acquisition(nframes=self.n_frames)
#Then start the acquisition
self.callback_signal.emit(Grab(do_acquisition=True,
snap=not self.is_live,
n_average=Naverage,
nframes=self.n_frames,
since=self.settings['buffer', 'mode']))

except Exception as e:
self.emit_status(ThreadCommand('Update_Status', [str(e), "log"]))
Expand All @@ -309,21 +384,48 @@ def emit_data(self, frame: np.ndarray=None):
if frame is None:
frame = self.controller.read_newest_image()
# Emit the frame.
if frame is not None: # happens for last frame when stopping camera
if self.settings['output_color'] == 'RGB':
rgb_image = cv2.cvtColor(frame, cv2.COLOR_BAYER_BG2RGB)
data_arrays = [np.atleast_1d(rgb_image[..., ind]) for ind in range(3)]
if frame is not None:
conversion_str = self.settings['color_conversion']
if conversion_str != "None":
for ind_average in range(frame.shape[0]):
for ind_frame in range(frame.shape[1]):
if ind_frame == 0 and ind_average == 0:
new_frame = cv2.cvtColor(frame[ind_average, ind_frame, ...],
getattr(cv2, f'COLOR_{conversion_str}'))
shape = [frame.shape[0], frame.shape[1]] + list(new_frame.shape)
out_frames = np.zeros(shape, dtype=new_frame.dtype)
out_frames[ind_average, ind_frame, ...] = new_frame
else:
cv2.cvtColor(frame[ind_average, ind_frame, ...],
getattr(cv2, f'COLOR_{conversion_str}'),
out_frames[ind_average, ind_frame, ...])
else:
out_frames = frame
if self.Naverage > 1:
out_frames = np.sum(out_frames, axis=0) / self.Naverage
else:
out_frames = out_frames[0, ...]

if self.n_frames > 1:
pass
#todo handle chunks of frames in ND data
else:
out_frames = out_frames[0, ...]

if out_frames.shape[-1] == 3:
data_arrays = [np.atleast_1d(out_frames[..., ind]) for ind in range(3)]
labels = ['Red', 'Green', 'Blue']
else:
if 'monochrome' in self.settings['sensor'].lower():
data_arrays = [np.atleast_1d(frame)]
else:
data_arrays = [np.atleast_1d(cv2.cvtColor(frame, cv2.COLOR_BAYER_BG2GRAY))]

self.data_grabed_signal.emit([DataFromPlugins(name='Thorlabs Camera',
data=data_arrays,
dim=self.data_shape,
labels=[f'ThorCam_{self.data_shape}'],
axes=[self.x_axis, self.y_axis])])
labels = ['Intensity']
data_arrays = [out_frames]

self.dte_signal.emit(
DataToExport('Camera',
data=[DataFromPlugins(name='Camera',
data=data_arrays,
dim=self.data_shape,
labels=labels,
axes=[self.y_axis, self.x_axis])]))
if self.settings.child('timing_opts', 'fps_on').value():
self.update_fps()

Expand Down Expand Up @@ -367,37 +469,11 @@ def close(self):

def stop(self):
"""Stop the acquisition."""
self.callback_signal.emit(False)
self.callback_signal.emit(Grab(do_acquisition=False))
QtWidgets.QApplication.processEvents()

self.controller.clear_acquisition()
return ''


class CameraCallback(QtCore.QObject):
"""Callback object """
data_sig = QtCore.Signal()

def __init__(self, wait_fn):
super().__init__()
# Set the wait function
self.wait_fn = wait_fn
self.do_grab = True

def set_do_grab(self, do_grab=True):
self.do_grab = do_grab
if do_grab:
self.wait_for_acquisition()

def wait_for_acquisition(self):
while self.do_grab:
try:
new_data = self.wait_fn()
if new_data is not False: # will be returned if the main thread called CancelWait
self.data_sig.emit()
except Exception as e:
pass
QtWidgets.QApplication.processEvents()



Loading