@@ -1,15 +1,11 @@
 # -*- coding: utf-8 -*-
 
 import array
 from collections import namedtuple
 from datetime import datetime
 import numpy as np
 import re
 import struct
-from StringIO import StringIO
-
-
-field_of_view = namedtuple('FOV', ['number', 'x', 'y', 'z', 'pfs_offset'])
-
+import six
 
 class Nd2Parser(object):
@@ -19,9 +15,9 @@ class Nd2Parser(object):
 
     """
    CHUNK_HEADER = 0xabeceda
-    CHUNK_MAP_START = "ND2 FILEMAP SIGNATURE NAME 0001!"
-    CHUNK_MAP_END = "ND2 CHUNK MAP SIGNATURE 0000001!"
+    CHUNK_MAP_START = six.b("ND2 FILEMAP SIGNATURE NAME 0001!")
+    CHUNK_MAP_END = six.b("ND2 CHUNK MAP SIGNATURE 0000001!")
 
     def __init__(self, filename):
         self._filename = filename
         self._fh = None
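
The remaining hunks apply the same pattern as the two constants above: any literal that is compared against raw, undecoded data read from the .nd2 file is wrapped in six.b() so that it is a bytes object on both Python 2 and Python 3. A minimal sketch of the idea:

import six

# six.b() yields a bytes object under both Python 2 and Python 3, so the
# signature can be compared directly against undecoded chunk data.
signature = six.b("ND2 FILEMAP SIGNATURE NAME 0001!")
assert isinstance(signature, bytes)
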
@@ -51,7 +47,7 @@ class Nd2Parser(object):
         :return: (int, array.array()) or None
 
         """
-        chunk = self._label_map["ImageDataSeq|%d!" % image_group_number]
+        chunk = self._label_map[six.b("ImageDataSeq|%d!" % image_group_number)]
         data = self._read_chunk(chunk)
         # All images in the same image group share the same timestamp! So if you have complicated image data,
         # your timestamps may not be entirely accurate. Practically speaking though, they'll only be off by a few
@@ -82,18 +78,20 @@ class Nd2Parser(object):
 
         """
         if self._dimension_text is None:
-            for line in self.metadata['ImageTextInfo']['SLxImageTextInfo'].values():
-                if "Dimensions:" in line:
+            for line in self.metadata[six.b('ImageTextInfo')][six.b('SLxImageTextInfo')].values():
+                if six.b("Dimensions:") in line:
                     metadata = line
                     break
             else:
                 raise ValueError("Could not parse metadata dimensions!")
-            for line in metadata.split("\r\n"):
-                if line.startswith("Dimensions:"):
+            for line in metadata.split(six.b("\r\n")):
+                if line.startswith(six.b("Dimensions:")):
                     self._dimension_text = line
                     break
             else:
                 raise ValueError("Could not parse metadata dimensions!")
+        if six.PY3:
+            return self._dimension_text.decode("utf8")
         return self._dimension_text
 
     @property
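
The split() and startswith() literals above need six.b() as well, because Python 3 refuses to mix bytes and str in those methods. A small self-contained sketch (the sample text is made up; the real value comes from the ImageTextInfo block):

import six

text = six.b("Dimensions: XY(4) x T(30)\r\nsome other line")  # hypothetical value
for line in text.split(six.b("\r\n")):       # text.split("\r\n") raises TypeError on Python 3
    if line.startswith(six.b("Dimensions:")):
        print(line)
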
@@ -105,19 +103,19 @@ class Nd2Parser(object):
         :rtype: str
 
         """
-        metadata = self.metadata['ImageMetadataSeq']['SLxPictureMetadata']['sPicturePlanes']
+        metadata = self.metadata[six.b('ImageMetadataSeq')][six.b('SLxPictureMetadata')][six.b('sPicturePlanes')]
         try:
-            validity = self.metadata['ImageMetadata']['SLxExperiment']['ppNextLevelEx'][''][0]['ppNextLevelEx'][''][0]['pItemValid']
+            validity = self.metadata[six.b('ImageMetadata')][six.b('SLxExperiment')][six.b('ppNextLevelEx')][six.b('')][0][six.b('ppNextLevelEx')][six.b('')][0][six.b('pItemValid')]
         except KeyError:
             # If none of the channels have been deleted, there is no validity list, so we just make one
             validity = [True for _ in metadata]
         # Channel information is contained in dictionaries with the keys a0, a1...an where the number
         # indicates the order in which the channel is stored. So by sorting the dicts alphabetically
         # we get the correct order.
-        for (label, chan), valid in zip(sorted(metadata['sPlaneNew'].items()), validity):
+        for (label, chan), valid in zip(sorted(metadata[six.b('sPlaneNew')].items()), validity):
             if not valid:
                 continue
-            yield chan['sDescription']
+            yield chan[six.b('sDescription')].decode("utf8")
 
     def _calculate_image_group_number(self, time_index, fov, z_level):
         """
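
The channel-ordering comment in this hunk can be illustrated with a made-up sPlaneNew-style mapping (keys and descriptions are hypothetical):

import six

# The a0, a1, ... keys encode storage order, so sorting the items alphabetically
# recovers that order before pairing each channel with its validity flag.
planes = {six.b('a1'): {six.b('sDescription'): six.b('GFP')},
          six.b('a0'): {six.b('sDescription'): six.b('BrightField')}}
validity = [True, False]
names = [chan[six.b('sDescription')].decode("utf8")
         for (label, chan), valid in zip(sorted(planes.items()), validity)
         if valid]
print(names)  # ['BrightField']
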
@@ -154,17 +152,18 @@ class Nd2Parser(object):
         :rtype: datetime.datetime()
 
         """
-        for line in self.metadata['ImageTextInfo']['SLxImageTextInfo'].values():
+        for line in self.metadata[six.b('ImageTextInfo')][six.b('SLxImageTextInfo')].values():
+            line = line.decode("utf8")
             absolute_start_12 = None
             absolute_start_24 = None
             # ND2s seem to randomly switch between 12- and 24-hour representations.
             try:
                 absolute_start_24 = datetime.strptime(line, "%m/%d/%Y %H:%M:%S")
-            except ValueError:
+            except (TypeError, ValueError):
                 pass
             try:
                 absolute_start_12 = datetime.strptime(line, "%m/%d/%Y %I:%M:%S %p")
-            except ValueError:
+            except (TypeError, ValueError):
                 pass
             if not absolute_start_12 and not absolute_start_24:
                 continue
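
A standalone sketch of the try-both-formats idea used above, with made-up timestamp strings (not the parser's exact code):

from datetime import datetime

for sample in ("10/27/2014 17:01:03", "10/27/2014 5:01:03 PM"):  # hypothetical values
    parsed = None
    for fmt in ("%m/%d/%Y %H:%M:%S", "%m/%d/%Y %I:%M:%S %p"):
        try:
            parsed = datetime.strptime(sample, fmt)
            break
        except ValueError:
            continue
    print(parsed)
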
@@ -182,7 +181,7 @@ class Nd2Parser(object):
         pattern = r""".*?λ\((\d+)\).*?"""
         try:
             count = int(re.match(pattern, self._dimensions).group(1))
-        except AttributeError:
+        except AttributeError as e:
             return 1
         else:
             return count
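
For reference, the λ(...) pattern above extracts the channel count from the dimension text; with a made-up Python 3 dimension string:

import re

pattern = r""".*?λ\((\d+)\).*?"""
dimension_text = "Dimensions: T(30) x XY(4) x λ(2) x Z(5)"  # hypothetical value
match = re.match(pattern, dimension_text)
channel_count = int(match.group(1)) if match else 1  # the parser falls back to 1
print(channel_count)  # 2
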
@@ -246,7 +245,7 @@ class Nd2Parser(object):
         :rtype: int
 
         """
-        return self.metadata['ImageAttributes']['SLxImageAttributes']['uiSequenceCount']
+        return self.metadata[six.b('ImageAttributes')][six.b('SLxImageAttributes')][six.b('uiSequenceCount')]
 
     def _parse_metadata(self):
         """
@@ -254,9 +253,9 @@ class Nd2Parser(object):
 
         """
         for label in self._label_map.keys():
-            if label.endswith("LV!") or "LV|" in label:
+            if label.endswith(six.b("LV!")) or six.b("LV|") in label:
                 data = self._read_chunk(self._label_map[label])
-                stop = label.index("LV")
+                stop = label.index(six.b("LV"))
                 self.metadata[label[:stop]] = self._read_metadata(data, 1)
 
     def _read_map(self):
@@ -273,7 +272,7 @@ class Nd2Parser(object):
         label_start = raw_text.index(Nd2Parser.CHUNK_MAP_START) + 32
 
         while True:
-            data_start = raw_text.index("!", label_start) + 1
+            data_start = raw_text.index(six.b("!"), label_start) + 1
             key = raw_text[label_start: data_start]
             location, length = struct.unpack("QQ", raw_text[data_start: data_start + 16])
             if key == Nd2Parser.CHUNK_MAP_END:
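
Each chunk-map entry is a label terminated by "!" followed by two unsigned 64-bit integers (the chunk's location and length), which is what struct.unpack("QQ", ...) reads above. A self-contained sketch with a fabricated entry:

import struct
import six

entry = six.b("ImageDataSeq|0!") + struct.pack("QQ", 4096, 2097152)  # hypothetical entry
data_start = entry.index(six.b("!")) + 1
key = entry[:data_start]
location, length = struct.unpack("QQ", entry[data_start:data_start + 16])
print(key, location, length)  # on Python 3: b'ImageDataSeq|0!' 4096 2097152
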
@@ -312,7 +311,7 @@ class Nd2Parser(object):
 
     def _parse_string(self, data):
         value = data.read(2)
-        while not value.endswith("\x00\x00"):
+        while not value.endswith(six.b("\x00\x00")):
             # the string ends at the first instance of \x00\x00
             value += data.read(2)
         return value.decode("utf16")[:-1].encode("utf8")
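
The _parse_string() loop above reads UTF-16 data two bytes at a time until it reaches the two-byte null terminator; a minimal sketch with a made-up value:

import six

raw = six.BytesIO(u"GFP".encode("utf-16-le") + six.b("\x00\x00"))  # hypothetical string data
value = raw.read(2)
while not value.endswith(six.b("\x00\x00")):
    value += raw.read(2)
print(value.decode("utf16")[:-1])  # GFP
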
@@ -354,14 +353,16 @@ class Nd2Parser(object):
         Iterates over each element some section of the metadata and parses it.
 
         """
-        data = StringIO(data)
+        data = six.BytesIO(data)
         metadata = {}
-        for _ in xrange(count):
+        for _ in range(count):
             self._cursor_position = data.tell()
             header = data.read(2)
             if not header:
                 # We've reached the end of some hierarchy of data
                 break
+            if six.PY3:
+                header = header.decode("utf8")
             data_type, name_length = map(ord, header)
             name = data.read(name_length * 2).decode("utf16")[:-1].encode("utf8")
             value = self._get_value(data, data_type)
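
The two-byte header decoded above packs the element's data type and name length into single bytes, and the name that follows is UTF-16 with a trailing null. A sketch with a fabricated element:

import six

data = six.BytesIO(six.b("\x03\x08") + u"uiWidth\x00".encode("utf-16-le"))  # hypothetical element
header = data.read(2)
if six.PY3:
    header = header.decode("utf8")
data_type, name_length = map(ord, header)  # 3, 8
name = data.read(name_length * 2).decode("utf16")[:-1]
print(data_type, name)  # on Python 3: 3 uiWidth
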