Browse Source

#125: refactored to make adding more metadata easier

zolfa-add_slices_loading
Jim Rybarski 9 years ago
parent
commit
1436068d4c
4 changed files with 259 additions and 145 deletions
  1. +97
    -0
      nd2reader/common/v3.py
  2. +3
    -3
      nd2reader/interface.py
  3. +5
    -0
      nd2reader/model/label.py
  4. +154
    -142
      nd2reader/parser/v3.py

+ 97
- 0
nd2reader/common/v3.py View File

@ -1,5 +1,6 @@
import struct
import array
import six
def read_chunk(fh, chunk_location):
@ -13,6 +14,8 @@ def read_chunk(fh, chunk_location):
:rtype: bytes
"""
if chunk_location is None:
return None
fh.seek(chunk_location)
# The chunk metadata is always 16 bytes long
chunk_metadata = fh.read(16)
@ -32,4 +35,98 @@ def read_array(fh, kind, chunk_location):
if kind not in kinds:
raise ValueError('You attempted to read an array of an unknown type.')
raw_data = read_chunk(fh, chunk_location)
if raw_data is None:
return None
return array.array(kinds[kind], raw_data)
def _parse_unsigned_char(data):
return struct.unpack("B", data.read(1))[0]
def _parse_unsigned_int(data):
return struct.unpack("I", data.read(4))[0]
def _parse_unsigned_long(data):
return struct.unpack("Q", data.read(8))[0]
def _parse_double(data):
return struct.unpack("d", data.read(8))[0]
def _parse_string(data):
value = data.read(2)
while not value.endswith(six.b("\x00\x00")):
# the string ends at the first instance of \x00\x00
value += data.read(2)
return value.decode("utf16")[:-1].encode("utf8")
def _parse_char_array(data):
array_length = struct.unpack("Q", data.read(8))[0]
return array.array("B", data.read(array_length))
def _parse_metadata_item(data, cursor_position):
"""
Reads hierarchical data, analogous to a Python dict.
"""
new_count, length = struct.unpack("<IQ", data.read(12))
length -= data.tell() - cursor_position
next_data_length = data.read(length)
value = read_metadata(next_data_length, new_count)
# Skip some offsets
data.read(new_count * 8)
return value
def _get_value(data, data_type, cursor_position):
"""
ND2s use various codes to indicate different data types, which we translate here.
"""
parser = {1: _parse_unsigned_char,
2: _parse_unsigned_int,
3: _parse_unsigned_int,
5: _parse_unsigned_long,
6: _parse_double,
8: _parse_string,
9: _parse_char_array,
11: _parse_metadata_item}
return parser[data_type](data) if data_type < 11 else parser[data_type](data, cursor_position)
def read_metadata(data, count):
"""
Iterates over each element some section of the metadata and parses it.
"""
if data is None:
return None
data = six.BytesIO(data)
metadata = {}
for _ in range(count):
cursor_position = data.tell()
header = data.read(2)
if not header:
# We've reached the end of some hierarchy of data
break
if six.PY3:
header = header.decode("utf8")
data_type, name_length = map(ord, header)
name = data.read(name_length * 2).decode("utf16")[:-1].encode("utf8")
value = _get_value(data, data_type, cursor_position)
if name not in metadata.keys():
metadata[name] = value
else:
if not isinstance(metadata[name], list):
# We have encountered this key exactly once before. Since we're seeing it again, we know we
# need to convert it to a list before proceeding.
metadata[name] = [metadata[name]]
# We've encountered this key before so we're guaranteed to be dealing with a list. Thus we append
# the value to the already-existing list.
metadata[name].append(value)
return metadata

+ 3
- 3
nd2reader/interface.py View File

@ -11,9 +11,9 @@ class Nd2(object):
self._filename = filename
self._fh = open(filename, "rb")
major_version, minor_version = get_version(self._fh)
parser = get_parser(self._fh, major_version, minor_version)
self._driver = parser.driver
self._metadata = parser.metadata
self._parser = get_parser(self._fh, major_version, minor_version)
self._driver = self._parser.driver
self._metadata = self._parser.metadata
def __enter__(self):
return self


+ 5
- 0
nd2reader/model/label.py View File

@ -5,11 +5,16 @@ import re
class LabelMap(object):
"""
Contains pointers to metadata. This might only be valid for V3 files.
"""
def __init__(self, raw_binary_data):
self._data = raw_binary_data
self._image_data = {}
def image_attributes(self):
return self._get_location(six.b("ImageAttributesLV!"))
def _get_location(self, label):
try:
label_location = self._data.index(label) + len(label)


+ 154
- 142
nd2reader/parser/v3.py View File

@ -1,17 +1,128 @@
# -*- coding: utf-8 -*-
import array
from datetime import datetime
from nd2reader.model.metadata import Metadata
from nd2reader.model.label import LabelMap
from nd2reader.parser.base import BaseParser
from nd2reader.driver.v3 import V3Driver
from nd2reader.common.v3 import read_chunk, read_array
from nd2reader.common.v3 import read_chunk, read_array, read_metadata
import re
import six
import struct
def ignore_missing(func):
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except:
return None
return wrapper
class V3RawMetadata(object):
def __init__(self, fh, label_map):
self._fh = fh
self._label_map = label_map
@property
@ignore_missing
def image_text_info(self):
return read_metadata(read_chunk(self._fh, self._label_map.image_text_info), 1)
@property
@ignore_missing
def image_metadata_sequence(self):
return read_metadata(read_chunk(self._fh, self._label_map.image_metadata_sequence), 1)
@property
@ignore_missing
def image_calibration(self):
return read_metadata(read_chunk(self._fh, self._label_map.image_calibration), 1)
@property
@ignore_missing
def image_attributes(self):
return read_metadata(read_chunk(self._fh, self._label_map.image_attributes), 1)
@property
@ignore_missing
def x_data(self):
return read_array(self._fh, 'double', self._label_map.x_data)
@property
@ignore_missing
def y_data(self):
return read_array(self._fh, 'double', self._label_map.y_data)
@property
@ignore_missing
def z_data(self):
return read_array(self._fh, 'double', self._label_map.z_data)
@property
@ignore_missing
def roi_metadata(self):
return read_metadata(read_chunk(self._fh, self._label_map.roi_metadata), 1)
@property
@ignore_missing
def pfs_status(self):
return read_array(self._fh, 'int', self._label_map.pfs_status)
@property
@ignore_missing
def pfs_offset(self):
return read_array(self._fh, 'int', self._label_map.pfs_offset)
@property
@ignore_missing
def camera_exposure_time(self):
return read_array(self._fh, 'double', self._label_map.camera_exposure_time)
@property
@ignore_missing
def lut_data(self):
return read_chunk(self._fh, self._label_map.lut_data)
@property
@ignore_missing
def grabber_settings(self):
return read_chunk(self._fh, self._label_map.grabber_settings)
@property
@ignore_missing
def custom_data(self):
return read_chunk(self._fh, self._label_map.custom_data)
@property
@ignore_missing
def app_info(self):
return read_chunk(self._fh, self._label_map.app_info)
@property
@ignore_missing
def camera_temp(self):
camera_temp = read_array(self._fh, 'double', self._label_map.camera_temp)
if camera_temp:
for temp in map(lambda x: round(x * 100.0, 2), camera_temp):
yield temp
@property
@ignore_missing
def acquisition_times(self):
acquisition_times = read_array(self._fh, 'double', self._label_map.acquisition_times)
if acquisition_times:
for acquisition_time in map(lambda x: x / 1000.0, acquisition_times):
yield acquisition_time
@property
@ignore_missing
def image_metadata(self):
if self._label_map.image_metadata:
return read_metadata(read_chunk(self._fh, self._label_map.image_metadata), 1)
class V3Parser(BaseParser):
""" Parses ND2 files and creates a Metadata and driver object. """
CHUNK_HEADER = 0xabeceda
@ -25,6 +136,7 @@ class V3Parser(BaseParser):
"""
self._fh = fh
self._metadata = None
self._raw_metadata = None
self._label_map = None
@property
@ -41,56 +153,37 @@ class V3Parser(BaseParser):
def driver(self):
return V3Driver(self.metadata, self._label_map, self._fh)
def _build_metadata_dict(self):
self._label_map = self._build_label_map()
raw_data = {"image_text_info": self._read_metadata(read_chunk(self._fh, self._label_map.image_text_info), 1),
"image_metadata_sequence": self._read_metadata(read_chunk(self._fh, self._label_map.image_metadata_sequence), 1),
"image_calibration": self._read_metadata(read_chunk(self._fh, self._label_map.image_calibration), 1),
"image_attributes": self._read_metadata(read_chunk(self._fh, self._label_map.image_attributes), 1),
"x_data": read_array(self._fh, 'double', self._label_map.x_data),
"y_data": read_array(self._fh, 'double', self._label_map.y_data),
"z_data": read_array(self._fh, 'double', self._label_map.z_data),
"roi_metadata": read_chunk(self._fh, self._label_map.roi_metadata),
"pfs_status": read_array(self._fh, 'int', self._label_map.pfs_status),
"pfs_offset": read_array(self._fh, 'int', self._label_map.pfs_offset),
"camera_exposure_time": read_array(self._fh, 'double', self._label_map.camera_exposure_time),
"camera_temp": map(lambda x: round(x * 100.0, 2), read_array(self._fh, 'double', self._label_map.camera_temp)),
"acquisition_times": map(lambda x: x / 1000.0, read_array(self._fh, 'double', self._label_map.acquisition_times)),
"lut_data": read_chunk(self._fh, self._label_map.lut_data),
"grabber_settings": read_chunk(self._fh, self._label_map.grabber_settings),
"custom_data": read_chunk(self._fh, self._label_map.custom_data),
"app_info": read_chunk(self._fh, self._label_map.app_info),
}
if self._label_map.image_metadata:
raw_data["image_metadata"] = self._read_metadata(read_chunk(self._fh, self._label_map.image_metadata), 1)
return raw_data
@property
def raw_metadata(self):
if not self._raw_metadata:
self._label_map = self._build_label_map()
self._raw_metadata = V3RawMetadata(self._fh, self._label_map)
return self._raw_metadata
def _parse_metadata(self):
"""
Reads all metadata and instantiates the Metadata object.
"""
metadata_dict = self._build_metadata_dict()
height = metadata_dict['image_attributes'][six.b('SLxImageAttributes')][six.b('uiHeight')]
width = metadata_dict['image_attributes'][six.b('SLxImageAttributes')][six.b('uiWidth')]
channels = self._parse_channels(metadata_dict)
date = self._parse_date(metadata_dict)
fields_of_view = self._parse_fields_of_view(metadata_dict)
frames = self._parse_frames(metadata_dict)
z_levels = self._parse_z_levels(metadata_dict)
total_images_per_channel = self._parse_total_images_per_channel(metadata_dict)
height = self.raw_metadata.image_attributes[six.b('SLxImageAttributes')][six.b('uiHeight')]
width = self.raw_metadata.image_attributes[six.b('SLxImageAttributes')][six.b('uiWidth')]
channels = self._parse_channels(self.raw_metadata)
date = self._parse_date(self.raw_metadata)
fields_of_view = self._parse_fields_of_view(self.raw_metadata)
frames = self._parse_frames(self.raw_metadata)
z_levels = self._parse_z_levels(self.raw_metadata)
total_images_per_channel = self._parse_total_images_per_channel(self.raw_metadata)
self._metadata = Metadata(height, width, channels, date, fields_of_view, frames, z_levels, total_images_per_channel)
def _parse_date(self, metadata_dict):
def _parse_date(self, raw_metadata):
"""
The date and time when acquisition began.
:type metadata_dict: dict
:type raw_metadata: V3RawMetadata
:rtype: datetime.datetime() or None
"""
for line in metadata_dict['image_text_info'][six.b('SLxImageTextInfo')].values():
for line in raw_metadata.image_text_info[six.b('SLxImageTextInfo')].values():
line = line.decode("utf8")
absolute_start_12 = None
absolute_start_24 = None
@ -108,20 +201,20 @@ class V3Parser(BaseParser):
return absolute_start_12 if absolute_start_12 else absolute_start_24
return None
def _parse_channels(self, metadata_dict):
def _parse_channels(self, raw_metadata):
"""
These are labels created by the NIS Elements user. Typically they may a short description of the filter cube
used (e.g. "bright field", "GFP", etc.)
:type metadata_dict: dict
:type raw_metadata: V3RawMetadata
:rtype: list
"""
channels = []
metadata = metadata_dict['image_metadata_sequence'][six.b('SLxPictureMetadata')][six.b('sPicturePlanes')]
metadata = raw_metadata.image_metadata_sequence[six.b('SLxPictureMetadata')][six.b('sPicturePlanes')]
try:
validity = metadata_dict['image_metadata'][six.b('SLxExperiment')][six.b('ppNextLevelEx')][six.b('')][0][six.b('ppNextLevelEx')][six.b('')][0][six.b('pItemValid')]
except KeyError:
validity = raw_metadata.image_metadata[six.b('SLxExperiment')][six.b('ppNextLevelEx')][six.b('')][0][six.b('ppNextLevelEx')][six.b('')][0][six.b('pItemValid')]
except (KeyError, TypeError):
# If none of the channels have been deleted, there is no validity list, so we just make one
validity = [True for _ in metadata]
# Channel information is contained in dictionaries with the keys a0, a1...an where the number
@ -133,50 +226,50 @@ class V3Parser(BaseParser):
channels.append(chan[six.b('sDescription')].decode("utf8"))
return channels
def _parse_fields_of_view(self, metadata_dict):
def _parse_fields_of_view(self, raw_metadata):
"""
The metadata contains information about fields of view, but it contains it even if some fields
of view were cropped. We can't find anything that states which fields of view are actually
in the image data, so we have to calculate it. There probably is something somewhere, since
NIS Elements can figure it out, but we haven't found it yet.
:type metadata_dict: dict
:type raw_metadata: V3RawMetadata
:rtype: list
"""
return self._parse_dimension(r""".*?XY\((\d+)\).*?""", metadata_dict)
return self._parse_dimension(r""".*?XY\((\d+)\).*?""", raw_metadata)
def _parse_frames(self, metadata_dict):
def _parse_frames(self, raw_metadata):
"""
The number of cycles.
:type metadata_dict: dict
:type raw_metadata: V3RawMetadata
:rtype: list
"""
return self._parse_dimension(r""".*?T'?\((\d+)\).*?""", metadata_dict)
return self._parse_dimension(r""".*?T'?\((\d+)\).*?""", raw_metadata)
def _parse_z_levels(self, metadata_dict):
def _parse_z_levels(self, raw_metadata):
"""
The different levels in the Z-plane. Just a sequence from 0 to n.
:type metadata_dict: dict
:type raw_metadata: V3RawMetadata
:rtype: list
"""
return self._parse_dimension(r""".*?Z\((\d+)\).*?""", metadata_dict)
return self._parse_dimension(r""".*?Z\((\d+)\).*?""", raw_metadata)
def _parse_dimension_text(self, metadata_dict):
def _parse_dimension_text(self, raw_metadata):
"""
While there are metadata values that represent a lot of what we want to capture, they seem to be unreliable.
Sometimes certain elements don't exist, or change their data type randomly. However, the human-readable text
is always there and in the same exact format, so we just parse that instead.
:type metadata_dict: dict
:type raw_metadata: V3RawMetadata
:rtype: str
"""
for line in metadata_dict['image_text_info'][six.b('SLxImageTextInfo')].values():
for line in raw_metadata.image_text_info[six.b('SLxImageTextInfo')].values():
if six.b("Dimensions:") in line:
metadata = line
break
@ -190,16 +283,16 @@ class V3Parser(BaseParser):
return six.b("")
return dimension_text
def _parse_dimension(self, pattern, metadata_dict):
def _parse_dimension(self, pattern, raw_metadata):
"""
:param pattern: a valid regex pattern
:type pattern: str
:type metadata_dict: dict
:type raw_metadata: V3RawMetadata
:rtype: list of int
"""
dimension_text = self._parse_dimension_text(metadata_dict)
dimension_text = self._parse_dimension_text(raw_metadata)
if six.PY3:
dimension_text = dimension_text.decode("utf8")
match = re.match(pattern, dimension_text)
@ -208,15 +301,15 @@ class V3Parser(BaseParser):
count = int(match.group(1))
return list(range(count))
def _parse_total_images_per_channel(self, metadata_dict):
def _parse_total_images_per_channel(self, raw_metadata):
"""
The total number of images per channel. Warning: this may be inaccurate as it includes "gap" images.
:type metadata_dict: dict
:type raw_metadata: V3RawMetadata
:rtype: int
"""
return metadata_dict['image_attributes'][six.b('SLxImageAttributes')][six.b('uiSequenceCount')]
return raw_metadata.image_attributes[six.b('SLxImageAttributes')][six.b('uiSequenceCount')]
def _build_label_map(self):
"""
@ -232,84 +325,3 @@ class V3Parser(BaseParser):
self._fh.seek(chunk_map_start_location)
raw_text = self._fh.read(-1)
return LabelMap(raw_text)
def _parse_unsigned_char(self, data):
return struct.unpack("B", data.read(1))[0]
def _parse_unsigned_int(self, data):
return struct.unpack("I", data.read(4))[0]
def _parse_unsigned_long(self, data):
return struct.unpack("Q", data.read(8))[0]
def _parse_double(self, data):
return struct.unpack("d", data.read(8))[0]
def _parse_string(self, data):
value = data.read(2)
while not value.endswith(six.b("\x00\x00")):
# the string ends at the first instance of \x00\x00
value += data.read(2)
return value.decode("utf16")[:-1].encode("utf8")
def _parse_char_array(self, data):
array_length = struct.unpack("Q", data.read(8))[0]
return array.array("B", data.read(array_length))
def _parse_metadata_item(self, data):
"""
Reads hierarchical data, analogous to a Python dict.
"""
new_count, length = struct.unpack("<IQ", data.read(12))
length -= data.tell() - self._cursor_position
next_data_length = data.read(length)
value = self._read_metadata(next_data_length, new_count)
# Skip some offsets
data.read(new_count * 8)
return value
def _get_value(self, data, data_type):
"""
ND2s use various codes to indicate different data types, which we translate here.
"""
parser = {1: self._parse_unsigned_char,
2: self._parse_unsigned_int,
3: self._parse_unsigned_int,
5: self._parse_unsigned_long,
6: self._parse_double,
8: self._parse_string,
9: self._parse_char_array,
11: self._parse_metadata_item}
return parser[data_type](data)
def _read_metadata(self, data, count):
"""
Iterates over each element some section of the metadata and parses it.
"""
data = six.BytesIO(data)
metadata = {}
for _ in range(count):
self._cursor_position = data.tell()
header = data.read(2)
if not header:
# We've reached the end of some hierarchy of data
break
if six.PY3:
header = header.decode("utf8")
data_type, name_length = map(ord, header)
name = data.read(name_length * 2).decode("utf16")[:-1].encode("utf8")
value = self._get_value(data, data_type)
if name not in metadata.keys():
metadata[name] = value
else:
if not isinstance(metadata[name], list):
# We have encountered this key exactly once before. Since we're seeing it again, we know we
# need to convert it to a list before proceeding.
metadata[name] = [metadata[name]]
# We've encountered this key before so we're guaranteed to be dealing with a list. Thus we append
# the value to the already-existing list.
metadata[name].append(value)
return metadata

Loading…
Cancel
Save