|
|
@ -1,9 +1,59 @@ |
|
|
|
import array |
|
|
|
import numpy as np |
|
|
|
import struct |
|
|
|
from collections import namedtuple |
|
|
|
from StringIO import StringIO |
|
|
|
|
|
|
|
chunk = namedtuple('Chunk', ['location', 'length']) |
|
|
|
field_of_view = namedtuple('FOV', ['x', 'y', 'z', 'pfs_offset']) |
|
|
|
channel = namedtuple('Channel', ['name', 'camera', 'exposure_time']) |
|
|
|
|
|
|
|
class Nd2Reader(object): |
|
|
|
|
|
|
|
class Nd2(object): |
|
|
|
def __init__(self, filename): |
|
|
|
self._parser = Nd2Parser(filename) |
|
|
|
|
|
|
|
@property |
|
|
|
def height(self): |
|
|
|
return self._parser.metadata['ImageAttributes']['SLxImageAttributes']['uiHeight'] |
|
|
|
|
|
|
|
@property |
|
|
|
def width(self): |
|
|
|
return self._parser.metadata['ImageAttributes']['SLxImageAttributes']['uiWidth'] |
|
|
|
|
|
|
|
@property |
|
|
|
def fields_of_view(self): |
|
|
|
for fov in self.metadata['ImageMetadata']['SLxExperiment']['ppNextLevelEx'][''][0]['uLoopPars']['Points']['']: |
|
|
|
yield field_of_view(x=fov['dPosX'], y=fov['dPosY'], z=fov['dPosZ'], pfs_offset=fov['dPFSOffset']) |
|
|
|
|
|
|
|
@property |
|
|
|
def fov_count(self): |
|
|
|
return len(list(self.fields_of_view)) |
|
|
|
|
|
|
|
@property |
|
|
|
def channels(self): |
|
|
|
metadata = self.metadata['ImageMetadataSeq']['SLxPictureMetadata']['sPicturePlanes'] |
|
|
|
for label, chan in metadata['sPlaneNew'].items(): |
|
|
|
name = chan['sDescription'] |
|
|
|
exposure_time = metadata['sSampleSetting'][label]['dExposureTime'] |
|
|
|
camera = metadata['sSampleSetting'][label]['pCameraSetting']['CameraUserName'] |
|
|
|
yield channel(name=name, exposure_time=exposure_time, camera=camera) |
|
|
|
|
|
|
|
@property |
|
|
|
def metadata(self): |
|
|
|
return self._parser.metadata |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Nd2Parser(object): |
|
|
|
""" |
|
|
|
Reads .nd2 files, provides an interface to the metadata, and generates numpy arrays from the image data. |
|
|
|
|
|
|
@ -13,7 +63,9 @@ class Nd2Reader(object): |
|
|
|
self._file_handler = None |
|
|
|
self._chunk_map_start_location = None |
|
|
|
self._label_map = {} |
|
|
|
self._metadata = {} |
|
|
|
self._read_map() |
|
|
|
self._parse_dict_data() |
|
|
|
|
|
|
|
@property |
|
|
|
def fh(self): |
|
|
@ -21,6 +73,25 @@ class Nd2Reader(object): |
|
|
|
self._file_handler = open(self._filename, "rb") |
|
|
|
return self._file_handler |
|
|
|
|
|
|
|
def _parse_dict_data(self): |
|
|
|
# TODO: Don't like this name |
|
|
|
for label in self._top_level_dict_labels: |
|
|
|
chunk_location = self._label_map[label].location |
|
|
|
data = self._read_chunk(chunk_location) |
|
|
|
stop = label.index("LV") |
|
|
|
self._metadata[label[:stop]] = self.read_lv_encoding(data, 1) |
|
|
|
|
|
|
|
@property |
|
|
|
def metadata(self): |
|
|
|
return self._metadata |
|
|
|
|
|
|
|
@property |
|
|
|
def _top_level_dict_labels(self): |
|
|
|
# TODO: I don't like this name either |
|
|
|
for label in self._label_map.keys(): |
|
|
|
if label.endswith("LV!") or "LV|" in label: |
|
|
|
yield label |
|
|
|
|
|
|
|
def _read_map(self): |
|
|
|
""" |
|
|
|
Every label ends with an exclamation point, however, we can't directly search for those to find all the labels |
|
|
@ -36,6 +107,7 @@ class Nd2Reader(object): |
|
|
|
if label == "ND2 CHUNK MAP SIGNATURE 0000001!": |
|
|
|
# We've reached the end of the chunk map |
|
|
|
break |
|
|
|
|
|
|
|
self._label_map[label] = value |
|
|
|
label_start = data_start + 16 |
|
|
|
|
|
|
@ -65,8 +137,8 @@ class Nd2Reader(object): |
|
|
|
|
|
|
|
""" |
|
|
|
key = raw_text[label_start: data_start] |
|
|
|
value = struct.unpack("QQ", raw_text[data_start: data_start + 16]) |
|
|
|
return key, value |
|
|
|
location, length = struct.unpack("QQ", raw_text[data_start: data_start + 16]) |
|
|
|
return key, chunk(location=location, length=length) |
|
|
|
|
|
|
|
@property |
|
|
|
def chunk_map_start_location(self): |
|
|
@ -131,4 +203,83 @@ class Nd2Reader(object): |
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def as_numpy_array(arr): |
|
|
|
return np.frombuffer(arr) |
|
|
|
return np.frombuffer(arr) |
|
|
|
|
|
|
|
def read_lv_encoding(self, data, count): |
|
|
|
data = StringIO(data) |
|
|
|
res = {} |
|
|
|
for c in range(count): |
|
|
|
lastpos = data.tell() |
|
|
|
hdr = data.read(2) |
|
|
|
if not hdr: |
|
|
|
break |
|
|
|
typ = ord(hdr[0]) |
|
|
|
bname = data.read(2*ord(hdr[1])) |
|
|
|
name = bname.decode("utf16")[:-1].encode("utf8") |
|
|
|
if typ == 1: |
|
|
|
value, = struct.unpack("B", data.read(1)) |
|
|
|
elif typ in [2, 3]: |
|
|
|
value, = struct.unpack("I", data.read(4)) |
|
|
|
elif typ == 5: |
|
|
|
value, = struct.unpack("Q", data.read(8)) |
|
|
|
elif typ == 6: |
|
|
|
value, = struct.unpack("d", data.read(8)) |
|
|
|
elif typ == 8: |
|
|
|
value = data.read(2) |
|
|
|
while value[-2:] != "\x00\x00": |
|
|
|
value += data.read(2) |
|
|
|
value = value.decode("utf16")[:-1].encode("utf8") |
|
|
|
elif typ == 9: |
|
|
|
cnt, = struct.unpack("Q", data.read(8)) |
|
|
|
value = array.array("B", data.read(cnt)) |
|
|
|
elif typ == 11: |
|
|
|
newcount, length = struct.unpack("<IQ", data.read(12)) |
|
|
|
length -= data.tell()-lastpos |
|
|
|
nextdata = data.read(length) |
|
|
|
value = self.read_lv_encoding(nextdata, newcount) |
|
|
|
# XXX do not know for what these offsets? are |
|
|
|
unknown = array.array("I", data.read(newcount*8)) |
|
|
|
else: |
|
|
|
assert 0, "%s hdr %x:%x unknown" % (name, ord(hdr[0]), ord(hdr[1])) |
|
|
|
if not name in res: |
|
|
|
res[name] = value |
|
|
|
else: |
|
|
|
if type(res[name]) != type([]): |
|
|
|
res[name] = [res[name]] |
|
|
|
res[name].append(value) |
|
|
|
x = data.read() |
|
|
|
assert not x, "skip %d %s" % (len(x), repr(x[:30])) |
|
|
|
return res |
|
|
|
|
|
|
|
|
|
|
|
class LVLine(object): |
|
|
|
def __init__(self, line): |
|
|
|
self._line = line |
|
|
|
self._extract() |
|
|
|
|
|
|
|
def _extract(self): |
|
|
|
if self._type == 11: |
|
|
|
count, length = struct.unpack("<IQ", self._line[self._name_end: self._name_end + 12]) |
|
|
|
newline = LVLine(self._line[self._name_end + 12:]) |
|
|
|
|
|
|
|
@property |
|
|
|
def name(self): |
|
|
|
return self._line[2: self._name_end].decode("utf16").encode("utf8") |
|
|
|
|
|
|
|
@property |
|
|
|
def _type(self): |
|
|
|
return ord(self._line[0]) |
|
|
|
|
|
|
|
@property |
|
|
|
def _name_end(self): |
|
|
|
""" |
|
|
|
Length is given as number of characters, but since it's unicode (which is two-bytes per character) we return |
|
|
|
twice the number. |
|
|
|
|
|
|
|
""" |
|
|
|
return ord(self._line[1]) * 2 |
|
|
|
|
|
|
|
|
|
|
|
class LVData(object): |
|
|
|
def __init__(self, data): |
|
|
|
self._extracted_data = LVLine(data) |