|
|
@ -1,68 +1,210 @@ |
|
|
|
# -*- coding: utf-8 -*- |
|
|
|
|
|
|
|
import array |
|
|
|
from datetime import datetime |
|
|
|
from nd2reader.model.metadata import Metadata |
|
|
|
from nd2reader.model.metadata import Metadata, CameraSettings |
|
|
|
from nd2reader.model.label import LabelMap |
|
|
|
from nd2reader.parser.base import BaseParser |
|
|
|
from nd2reader.driver.v3 import V3Driver |
|
|
|
from nd2reader.common.v3 import read_chunk |
|
|
|
from nd2reader.common.v3 import read_chunk, read_array, read_metadata |
|
|
|
import re |
|
|
|
import six |
|
|
|
import struct |
|
|
|
import xmltodict |
|
|
|
|
|
|
|
|
|
|
|
def ignore_missing(func): |
|
|
|
def wrapper(*args, **kwargs): |
|
|
|
try: |
|
|
|
return func(*args, **kwargs) |
|
|
|
except: |
|
|
|
return None |
|
|
|
return wrapper |
|
|
|
|
|
|
|
|
|
|
|
class V3RawMetadata(object): |
|
|
|
def __init__(self, fh, label_map): |
|
|
|
self._fh = fh |
|
|
|
self._label_map = label_map |
|
|
|
|
|
|
|
@property |
|
|
|
@ignore_missing |
|
|
|
def image_text_info(self): |
|
|
|
return read_metadata(read_chunk(self._fh, self._label_map.image_text_info), 1) |
|
|
|
|
|
|
|
@property |
|
|
|
@ignore_missing |
|
|
|
def image_metadata_sequence(self): |
|
|
|
return read_metadata(read_chunk(self._fh, self._label_map.image_metadata_sequence), 1) |
|
|
|
|
|
|
|
@property |
|
|
|
@ignore_missing |
|
|
|
def image_calibration(self): |
|
|
|
return read_metadata(read_chunk(self._fh, self._label_map.image_calibration), 1) |
|
|
|
|
|
|
|
@property |
|
|
|
@ignore_missing |
|
|
|
def image_attributes(self): |
|
|
|
return read_metadata(read_chunk(self._fh, self._label_map.image_attributes), 1) |
|
|
|
|
|
|
|
@property |
|
|
|
@ignore_missing |
|
|
|
def x_data(self): |
|
|
|
return read_array(self._fh, 'double', self._label_map.x_data) |
|
|
|
|
|
|
|
@property |
|
|
|
@ignore_missing |
|
|
|
def y_data(self): |
|
|
|
return read_array(self._fh, 'double', self._label_map.y_data) |
|
|
|
|
|
|
|
@property |
|
|
|
@ignore_missing |
|
|
|
def z_data(self): |
|
|
|
return read_array(self._fh, 'double', self._label_map.z_data) |
|
|
|
|
|
|
|
@property |
|
|
|
@ignore_missing |
|
|
|
def roi_metadata(self): |
|
|
|
return read_metadata(read_chunk(self._fh, self._label_map.roi_metadata), 1) |
|
|
|
|
|
|
|
@property |
|
|
|
@ignore_missing |
|
|
|
def pfs_status(self): |
|
|
|
return read_array(self._fh, 'int', self._label_map.pfs_status) |
|
|
|
|
|
|
|
@property |
|
|
|
@ignore_missing |
|
|
|
def pfs_offset(self): |
|
|
|
return read_array(self._fh, 'int', self._label_map.pfs_offset) |
|
|
|
|
|
|
|
@property |
|
|
|
@ignore_missing |
|
|
|
def camera_exposure_time(self): |
|
|
|
return read_array(self._fh, 'double', self._label_map.camera_exposure_time) |
|
|
|
|
|
|
|
@property |
|
|
|
@ignore_missing |
|
|
|
def lut_data(self): |
|
|
|
return xmltodict.parse(read_chunk(self._fh, self._label_map.lut_data)) |
|
|
|
|
|
|
|
@property |
|
|
|
@ignore_missing |
|
|
|
def grabber_settings(self): |
|
|
|
return xmltodict.parse(read_chunk(self._fh, self._label_map.grabber_settings)) |
|
|
|
|
|
|
|
@property |
|
|
|
@ignore_missing |
|
|
|
def custom_data(self): |
|
|
|
return xmltodict.parse(read_chunk(self._fh, self._label_map.custom_data)) |
|
|
|
|
|
|
|
@property |
|
|
|
@ignore_missing |
|
|
|
def app_info(self): |
|
|
|
return xmltodict.parse(read_chunk(self._fh, self._label_map.app_info)) |
|
|
|
|
|
|
|
@property |
|
|
|
@ignore_missing |
|
|
|
def camera_temp(self): |
|
|
|
camera_temp = read_array(self._fh, 'double', self._label_map.camera_temp) |
|
|
|
if camera_temp: |
|
|
|
for temp in map(lambda x: round(x * 100.0, 2), camera_temp): |
|
|
|
yield temp |
|
|
|
|
|
|
|
@property |
|
|
|
@ignore_missing |
|
|
|
def acquisition_times(self): |
|
|
|
acquisition_times = read_array(self._fh, 'double', self._label_map.acquisition_times) |
|
|
|
if acquisition_times: |
|
|
|
for acquisition_time in map(lambda x: x / 1000.0, acquisition_times): |
|
|
|
yield acquisition_time |
|
|
|
|
|
|
|
@property |
|
|
|
@ignore_missing |
|
|
|
def image_metadata(self): |
|
|
|
if self._label_map.image_metadata: |
|
|
|
return read_metadata(read_chunk(self._fh, self._label_map.image_metadata), 1) |
|
|
|
|
|
|
|
|
|
|
|
class V3Parser(BaseParser): |
|
|
|
""" Parses ND2 files and creates a Metadata and ImageReader object. """ |
|
|
|
""" Parses ND2 files and creates a Metadata and driver object. """ |
|
|
|
CHUNK_HEADER = 0xabeceda |
|
|
|
CHUNK_MAP_START = six.b("ND2 FILEMAP SIGNATURE NAME 0001!") |
|
|
|
CHUNK_MAP_END = six.b("ND2 CHUNK MAP SIGNATURE 0000001!") |
|
|
|
|
|
|
|
def __init__(self, fh): |
|
|
|
self._fh = fh |
|
|
|
self._metadata = None |
|
|
|
self._label_map = None |
|
|
|
""" |
|
|
|
:type fh: file |
|
|
|
|
|
|
|
@property |
|
|
|
def metadata(self): |
|
|
|
if not self._metadata: |
|
|
|
self._parse_metadata() |
|
|
|
return self._metadata |
|
|
|
""" |
|
|
|
if six.PY3: |
|
|
|
super().__init__(fh) |
|
|
|
else: |
|
|
|
super(V3Parser, self).__init__(fh) |
|
|
|
self._label_map = self._build_label_map() |
|
|
|
self.raw_metadata = V3RawMetadata(self._fh, self._label_map) |
|
|
|
self._parse_camera_metadata() |
|
|
|
self._parse_metadata() |
|
|
|
|
|
|
|
@property |
|
|
|
def driver(self): |
|
|
|
""" |
|
|
|
Provides an object that knows how to look up and read images based on an index. |
|
|
|
|
|
|
|
""" |
|
|
|
return V3Driver(self.metadata, self._label_map, self._fh) |
|
|
|
|
|
|
|
def _parse_camera_metadata(self): |
|
|
|
""" |
|
|
|
Gets parsed data about the physical cameras used to produce images and throws them in a dictionary. |
|
|
|
|
|
|
|
""" |
|
|
|
self.camera_metadata = {} |
|
|
|
for camera_setting in self._parse_camera_settings(): |
|
|
|
self.camera_metadata[camera_setting.channel_name] = camera_setting |
|
|
|
|
|
|
|
def _parse_metadata(self): |
|
|
|
""" |
|
|
|
Reads all metadata. |
|
|
|
Reads all metadata and instantiates the Metadata object. |
|
|
|
|
|
|
|
""" |
|
|
|
metadata_dict = {} |
|
|
|
self._label_map = self._build_label_map() |
|
|
|
for label in self._label_map.keys(): |
|
|
|
if label.endswith(six.b("LV!")) or six.b("LV|") in label: |
|
|
|
data = read_chunk(self._fh, self._label_map[label]) |
|
|
|
stop = label.index(six.b("LV")) |
|
|
|
metadata_dict[label[:stop]] = self._read_metadata(data, 1) |
|
|
|
|
|
|
|
height = metadata_dict[six.b('ImageAttributes')][six.b('SLxImageAttributes')][six.b('uiHeight')] |
|
|
|
width = metadata_dict[six.b('ImageAttributes')][six.b('SLxImageAttributes')][six.b('uiWidth')] |
|
|
|
channels = self._parse_channels(metadata_dict) |
|
|
|
date = self._parse_date(metadata_dict) |
|
|
|
fields_of_view = self._parse_fields_of_view(metadata_dict) |
|
|
|
frames = self._parse_frames(metadata_dict) |
|
|
|
z_levels = self._parse_z_levels(metadata_dict) |
|
|
|
total_images_per_channel = self._parse_total_images_per_channel(metadata_dict) |
|
|
|
self._metadata = Metadata(height, width, channels, date, fields_of_view, frames, z_levels, total_images_per_channel) |
|
|
|
|
|
|
|
def _parse_date(self, metadata_dict): |
|
|
|
height = self.raw_metadata.image_attributes[six.b('SLxImageAttributes')][six.b('uiHeight')] |
|
|
|
width = self.raw_metadata.image_attributes[six.b('SLxImageAttributes')][six.b('uiWidth')] |
|
|
|
date = self._parse_date(self.raw_metadata) |
|
|
|
fields_of_view = self._parse_fields_of_view(self.raw_metadata) |
|
|
|
frames = self._parse_frames(self.raw_metadata) |
|
|
|
z_levels = self._parse_z_levels(self.raw_metadata) |
|
|
|
total_images_per_channel = self._parse_total_images_per_channel(self.raw_metadata) |
|
|
|
channels = sorted([key for key in self.camera_metadata.keys()]) |
|
|
|
self.metadata = Metadata(height, width, channels, date, fields_of_view, frames, z_levels, total_images_per_channel) |
|
|
|
|
|
|
|
def _parse_camera_settings(self): |
|
|
|
""" |
|
|
|
Looks up information in the raw metadata about the camera(s) and puts it into a CameraSettings object. |
|
|
|
Duplicate cameras can be returned if the same one was used for multiple channels. |
|
|
|
|
|
|
|
:return: |
|
|
|
""" |
|
|
|
for camera in self.raw_metadata.image_metadata_sequence[six.b('SLxPictureMetadata')][six.b('sPicturePlanes')][six.b('sSampleSetting')].values(): |
|
|
|
name = camera[six.b('pCameraSetting')][six.b('CameraUserName')] |
|
|
|
id = camera[six.b('pCameraSetting')][six.b('CameraUniqueName')] |
|
|
|
exposure = camera[six.b('dExposureTime')] |
|
|
|
x_binning = camera[six.b('pCameraSetting')][six.b('FormatFast')][six.b('fmtDesc')][six.b('dBinningX')] |
|
|
|
y_binning = camera[six.b('pCameraSetting')][six.b('FormatFast')][six.b('fmtDesc')][six.b('dBinningY')] |
|
|
|
optical_configs = camera[six.b('sOpticalConfigs')] |
|
|
|
if six.b('') in optical_configs.keys(): |
|
|
|
channel_name = optical_configs[six.b('')][six.b('sOpticalConfigName')] |
|
|
|
else: |
|
|
|
channel_name = None |
|
|
|
yield CameraSettings(name, id, exposure, x_binning, y_binning, channel_name) |
|
|
|
|
|
|
|
def _parse_date(self, raw_metadata): |
|
|
|
""" |
|
|
|
The date and time when acquisition began. |
|
|
|
|
|
|
|
:type raw_metadata: V3RawMetadata |
|
|
|
:rtype: datetime.datetime() or None |
|
|
|
|
|
|
|
""" |
|
|
|
for line in metadata_dict[six.b('ImageTextInfo')][six.b('SLxImageTextInfo')].values(): |
|
|
|
for line in raw_metadata.image_text_info[six.b('SLxImageTextInfo')].values(): |
|
|
|
line = line.decode("utf8") |
|
|
|
absolute_start_12 = None |
|
|
|
absolute_start_24 = None |
|
|
@ -80,19 +222,20 @@ class V3Parser(BaseParser): |
|
|
|
return absolute_start_12 if absolute_start_12 else absolute_start_24 |
|
|
|
return None |
|
|
|
|
|
|
|
def _parse_channels(self, metadata_dict): |
|
|
|
def _parse_channels(self, raw_metadata): |
|
|
|
""" |
|
|
|
These are labels created by the NIS Elements user. Typically they may a short description of the filter cube |
|
|
|
used (e.g. "bright field", "GFP", etc.) |
|
|
|
|
|
|
|
:type raw_metadata: V3RawMetadata |
|
|
|
:rtype: list |
|
|
|
|
|
|
|
""" |
|
|
|
channels = [] |
|
|
|
metadata = metadata_dict[six.b('ImageMetadataSeq')][six.b('SLxPictureMetadata')][six.b('sPicturePlanes')] |
|
|
|
metadata = raw_metadata.image_metadata_sequence[six.b('SLxPictureMetadata')][six.b('sPicturePlanes')] |
|
|
|
try: |
|
|
|
validity = metadata_dict[six.b('ImageMetadata')][six.b('SLxExperiment')][six.b('ppNextLevelEx')][six.b('')][0][six.b('ppNextLevelEx')][six.b('')][0][six.b('pItemValid')] |
|
|
|
except KeyError: |
|
|
|
validity = raw_metadata.image_metadata[six.b('SLxExperiment')][six.b('ppNextLevelEx')][six.b('')][0][six.b('ppNextLevelEx')][six.b('')][0][six.b('pItemValid')] |
|
|
|
except (KeyError, TypeError): |
|
|
|
# If none of the channels have been deleted, there is no validity list, so we just make one |
|
|
|
validity = [True for _ in metadata] |
|
|
|
# Channel information is contained in dictionaries with the keys a0, a1...an where the number |
|
|
@ -104,46 +247,50 @@ class V3Parser(BaseParser): |
|
|
|
channels.append(chan[six.b('sDescription')].decode("utf8")) |
|
|
|
return channels |
|
|
|
|
|
|
|
def _parse_fields_of_view(self, metadata_dict): |
|
|
|
def _parse_fields_of_view(self, raw_metadata): |
|
|
|
""" |
|
|
|
The metadata contains information about fields of view, but it contains it even if some fields |
|
|
|
of view were cropped. We can't find anything that states which fields of view are actually |
|
|
|
in the image data, so we have to calculate it. There probably is something somewhere, since |
|
|
|
NIS Elements can figure it out, but we haven't found it yet. |
|
|
|
|
|
|
|
:rtype: list |
|
|
|
:type raw_metadata: V3RawMetadata |
|
|
|
:rtype: list |
|
|
|
|
|
|
|
""" |
|
|
|
return self._parse_dimension(r""".*?XY\((\d+)\).*?""", metadata_dict) |
|
|
|
return self._parse_dimension(r""".*?XY\((\d+)\).*?""", raw_metadata) |
|
|
|
|
|
|
|
def _parse_frames(self, metadata_dict): |
|
|
|
def _parse_frames(self, raw_metadata): |
|
|
|
""" |
|
|
|
The number of cycles. |
|
|
|
|
|
|
|
:type raw_metadata: V3RawMetadata |
|
|
|
:rtype: list |
|
|
|
|
|
|
|
""" |
|
|
|
return self._parse_dimension(r""".*?T'?\((\d+)\).*?""", metadata_dict) |
|
|
|
return self._parse_dimension(r""".*?T'?\((\d+)\).*?""", raw_metadata) |
|
|
|
|
|
|
|
def _parse_z_levels(self, metadata_dict): |
|
|
|
def _parse_z_levels(self, raw_metadata): |
|
|
|
""" |
|
|
|
The different levels in the Z-plane. Just a sequence from 0 to n. |
|
|
|
|
|
|
|
:rtype: list |
|
|
|
:type raw_metadata: V3RawMetadata |
|
|
|
:rtype: list |
|
|
|
|
|
|
|
""" |
|
|
|
return self._parse_dimension(r""".*?Z\((\d+)\).*?""", metadata_dict) |
|
|
|
return self._parse_dimension(r""".*?Z\((\d+)\).*?""", raw_metadata) |
|
|
|
|
|
|
|
def _parse_dimension_text(self, metadata_dict): |
|
|
|
def _parse_dimension_text(self, raw_metadata): |
|
|
|
""" |
|
|
|
While there are metadata values that represent a lot of what we want to capture, they seem to be unreliable. |
|
|
|
Sometimes certain elements don't exist, or change their data type randomly. However, the human-readable text |
|
|
|
is always there and in the same exact format, so we just parse that instead. |
|
|
|
|
|
|
|
:rtype: str |
|
|
|
:type raw_metadata: V3RawMetadata |
|
|
|
:rtype: str |
|
|
|
|
|
|
|
""" |
|
|
|
for line in metadata_dict[six.b('ImageTextInfo')][six.b('SLxImageTextInfo')].values(): |
|
|
|
for line in raw_metadata.image_text_info[six.b('SLxImageTextInfo')].values(): |
|
|
|
if six.b("Dimensions:") in line: |
|
|
|
metadata = line |
|
|
|
break |
|
|
@ -157,8 +304,16 @@ class V3Parser(BaseParser): |
|
|
|
return six.b("") |
|
|
|
return dimension_text |
|
|
|
|
|
|
|
def _parse_dimension(self, pattern, metadata_dict): |
|
|
|
dimension_text = self._parse_dimension_text(metadata_dict) |
|
|
|
def _parse_dimension(self, pattern, raw_metadata): |
|
|
|
""" |
|
|
|
:param pattern: a valid regex pattern |
|
|
|
:type pattern: str |
|
|
|
:type raw_metadata: V3RawMetadata |
|
|
|
|
|
|
|
:rtype: list of int |
|
|
|
|
|
|
|
""" |
|
|
|
dimension_text = self._parse_dimension_text(raw_metadata) |
|
|
|
if six.PY3: |
|
|
|
dimension_text = dimension_text.decode("utf8") |
|
|
|
match = re.match(pattern, dimension_text) |
|
|
@ -167,14 +322,15 @@ class V3Parser(BaseParser): |
|
|
|
count = int(match.group(1)) |
|
|
|
return list(range(count)) |
|
|
|
|
|
|
|
def _parse_total_images_per_channel(self, metadata_dict): |
|
|
|
def _parse_total_images_per_channel(self, raw_metadata): |
|
|
|
""" |
|
|
|
The total number of images per channel. Warning: this may be inaccurate as it includes "gap" images. |
|
|
|
|
|
|
|
:type raw_metadata: V3RawMetadata |
|
|
|
:rtype: int |
|
|
|
|
|
|
|
""" |
|
|
|
return metadata_dict[six.b('ImageAttributes')][six.b('SLxImageAttributes')][six.b('uiSequenceCount')] |
|
|
|
return raw_metadata.image_attributes[six.b('SLxImageAttributes')][six.b('uiSequenceCount')] |
|
|
|
|
|
|
|
def _build_label_map(self): |
|
|
|
""" |
|
|
@ -182,104 +338,11 @@ class V3Parser(BaseParser): |
|
|
|
as some of the bytes contain the value 33, which is the ASCII code for "!". So we iteratively find each label, |
|
|
|
grab the subsequent data (always 16 bytes long), advance to the next label and repeat. |
|
|
|
|
|
|
|
:rtype: dict |
|
|
|
:rtype: LabelMap |
|
|
|
|
|
|
|
""" |
|
|
|
label_map = {} |
|
|
|
self._fh.seek(-8, 2) |
|
|
|
chunk_map_start_location = struct.unpack("Q", self._fh.read(8))[0] |
|
|
|
self._fh.seek(chunk_map_start_location) |
|
|
|
raw_text = self._fh.read(-1) |
|
|
|
label_start = raw_text.index(V3Parser.CHUNK_MAP_START) + 32 |
|
|
|
|
|
|
|
while True: |
|
|
|
data_start = raw_text.index(six.b("!"), label_start) + 1 |
|
|
|
key = raw_text[label_start: data_start] |
|
|
|
location, length = struct.unpack("QQ", raw_text[data_start: data_start + 16]) |
|
|
|
if key == V3Parser.CHUNK_MAP_END: |
|
|
|
# We've reached the end of the chunk map |
|
|
|
break |
|
|
|
label_map[key] = location |
|
|
|
label_start = data_start + 16 |
|
|
|
return label_map |
|
|
|
|
|
|
|
def _parse_unsigned_char(self, data): |
|
|
|
return struct.unpack("B", data.read(1))[0] |
|
|
|
|
|
|
|
def _parse_unsigned_int(self, data): |
|
|
|
return struct.unpack("I", data.read(4))[0] |
|
|
|
|
|
|
|
def _parse_unsigned_long(self, data): |
|
|
|
return struct.unpack("Q", data.read(8))[0] |
|
|
|
|
|
|
|
def _parse_double(self, data): |
|
|
|
return struct.unpack("d", data.read(8))[0] |
|
|
|
|
|
|
|
def _parse_string(self, data): |
|
|
|
value = data.read(2) |
|
|
|
while not value.endswith(six.b("\x00\x00")): |
|
|
|
# the string ends at the first instance of \x00\x00 |
|
|
|
value += data.read(2) |
|
|
|
return value.decode("utf16")[:-1].encode("utf8") |
|
|
|
|
|
|
|
def _parse_char_array(self, data): |
|
|
|
array_length = struct.unpack("Q", data.read(8))[0] |
|
|
|
return array.array("B", data.read(array_length)) |
|
|
|
|
|
|
|
def _parse_metadata_item(self, data): |
|
|
|
""" |
|
|
|
Reads hierarchical data, analogous to a Python dict. |
|
|
|
|
|
|
|
""" |
|
|
|
new_count, length = struct.unpack("<IQ", data.read(12)) |
|
|
|
length -= data.tell() - self._cursor_position |
|
|
|
next_data_length = data.read(length) |
|
|
|
value = self._read_metadata(next_data_length, new_count) |
|
|
|
# Skip some offsets |
|
|
|
data.read(new_count * 8) |
|
|
|
return value |
|
|
|
|
|
|
|
def _get_value(self, data, data_type): |
|
|
|
""" |
|
|
|
ND2s use various codes to indicate different data types, which we translate here. |
|
|
|
|
|
|
|
""" |
|
|
|
parser = {1: self._parse_unsigned_char, |
|
|
|
2: self._parse_unsigned_int, |
|
|
|
3: self._parse_unsigned_int, |
|
|
|
5: self._parse_unsigned_long, |
|
|
|
6: self._parse_double, |
|
|
|
8: self._parse_string, |
|
|
|
9: self._parse_char_array, |
|
|
|
11: self._parse_metadata_item} |
|
|
|
return parser[data_type](data) |
|
|
|
|
|
|
|
def _read_metadata(self, data, count): |
|
|
|
""" |
|
|
|
Iterates over each element some section of the metadata and parses it. |
|
|
|
|
|
|
|
""" |
|
|
|
data = six.BytesIO(data) |
|
|
|
metadata = {} |
|
|
|
for _ in range(count): |
|
|
|
self._cursor_position = data.tell() |
|
|
|
header = data.read(2) |
|
|
|
if not header: |
|
|
|
# We've reached the end of some hierarchy of data |
|
|
|
break |
|
|
|
if six.PY3: |
|
|
|
header = header.decode("utf8") |
|
|
|
data_type, name_length = map(ord, header) |
|
|
|
name = data.read(name_length * 2).decode("utf16")[:-1].encode("utf8") |
|
|
|
value = self._get_value(data, data_type) |
|
|
|
if name not in metadata.keys(): |
|
|
|
metadata[name] = value |
|
|
|
else: |
|
|
|
if not isinstance(metadata[name], list): |
|
|
|
# We have encountered this key exactly once before. Since we're seeing it again, we know we |
|
|
|
# need to convert it to a list before proceeding. |
|
|
|
metadata[name] = [metadata[name]] |
|
|
|
# We've encountered this key before so we're guaranteed to be dealing with a list. Thus we append |
|
|
|
# the value to the already-existing list. |
|
|
|
metadata[name].append(value) |
|
|
|
return metadata |
|
|
|
return LabelMap(raw_text) |