
#11 eliminated the messy parts of the parser, mostly

master
jim · 10 years ago
commit 46b1cdb933
1 changed file with 64 additions and 43 deletions

nd2reader/reader.py  (+64 / -43)

@@ -13,6 +13,10 @@ field_of_view = namedtuple('FOV', ['number', 'x', 'y', 'z', 'pfs_offset'])
 class Nd2FileReader(object):
+    CHUNK_HEADER = 0xabeceda
+    CHUNK_MAP_START = "ND2 FILEMAP SIGNATURE NAME 0001!"
+    CHUNK_MAP_END = "ND2 CHUNK MAP SIGNATURE 0000001!"
     """
     Reads .nd2 files, provides an interface to the metadata, and generates numpy arrays from the image data.
@@ -23,12 +27,17 @@ class Nd2FileReader(object):
         self._file_handler = None
         self._channel_offset = None
         self._chunk_map_start_location = None
+        self._cursor_position = None
         self._label_map = {}
         self._metadata = {}
         self._read_map()
-        self._parse_dict_data()
+        self._parse_metadata()
         self.__dimensions = None

+    @staticmethod
+    def as_numpy_array(arr):
+        return np.frombuffer(arr)
+
     def get_image(self, time_index, fov, channel_name, z_level):
         image_set_number = self._calculate_image_set_number(time_index, fov, z_level)
         timestamp, raw_image_data = self.get_raw_image_data(image_set_number, self.channel_offset[channel_name])
@@ -168,14 +177,13 @@ class Nd2FileReader(object):
         image_data_start = 4 + channel_offset
         return timestamp, image_data[image_data_start::self.channel_count]

-    def _parse_dict_data(self):
-        # TODO: Don't like this name
+    def _parse_metadata(self):
         for label in self._label_map.keys():
             if not label.endswith("LV!") or "LV|" in label:
                 continue
             data = self._read_chunk(self._label_map[label])
             stop = label.index("LV")
-            self._metadata[label[:stop]] = self.read_lv_encoding(data, 1)
+            self._metadata[label[:stop]] = self._read_file(data, 1)

     def _read_map(self):
         """
@@ -188,13 +196,13 @@
         chunk_map_start_location = struct.unpack("Q", self.fh.read(8))[0]
         self.fh.seek(chunk_map_start_location)
         raw_text = self.fh.read(-1)
-        label_start = raw_text.index("ND2 FILEMAP SIGNATURE NAME 0001!") + 32
+        label_start = raw_text.index(Nd2FileReader.CHUNK_MAP_START) + 32
         while True:
             data_start = raw_text.index("!", label_start) + 1
             key = raw_text[label_start: data_start]
             location, length = struct.unpack("QQ", raw_text[data_start: data_start + 16])
-            if key == "ND2 CHUNK MAP SIGNATURE 0000001!":
+            if key == Nd2FileReader.CHUNK_MAP_END:
                 # We've reached the end of the chunk map
                 break
             self._label_map[key] = location
@@ -209,58 +217,73 @@ class Nd2FileReader(object):
         # The chunk metadata is always 16 bytes long
         chunk_metadata = self.fh.read(16)
         header, relative_offset, data_length = struct.unpack("IIQ", chunk_metadata)
-        if header != 0xabeceda:
+        if header != Nd2FileReader.CHUNK_HEADER:
             raise ValueError("The ND2 file seems to be corrupted.")
         # We start at the location of the chunk metadata, skip over the metadata, and then proceed to the
         # start of the actual data field, which is at some arbitrary place after the metadata.
         self.fh.seek(chunk_location + 16 + relative_offset)
         return self.fh.read(data_length)

-    @staticmethod
-    def as_numpy_array(arr):
-        return np.frombuffer(arr)
-
     def _z_level_count(self):
         name = "CustomData|Z!"
         st = self._read_chunk(self._label_map[name])
-        res = array.array("d", st)
-        return len(res)
-
-    def read_lv_encoding(self, data, count):
+        return len(array.array("d", st))
+
+    def _parse_unsigned_char(self, data):
+        return struct.unpack("B", data.read(1))[0]
+
+    def _parse_unsigned_int(self, data):
+        return struct.unpack("I", data.read(4))[0]
+
+    def _parse_unsigned_long(self, data):
+        return struct.unpack("Q", data.read(8))[0]
+
+    def _parse_double(self, data):
+        return struct.unpack("d", data.read(8))[0]
+
+    def _parse_string(self, data):
+        value = data.read(2)
+        while not value.endswith("\x00\x00"):
+            # the string ends at the first instance of \x00\x00
+            value += data.read(2)
+        return value.decode("utf16")[:-1].encode("utf8")
+
+    def _parse_char_array(self, data):
+        array_length = struct.unpack("Q", data.read(8))[0]
+        return array.array("B", data.read(array_length))
+
+    def _parse_metadata_item(self, args):
+        data, cursor_position = args
+        new_count, length = struct.unpack("<IQ", data.read(12))
+        length -= data.tell() - cursor_position
+        next_data_length = data.read(length)
+        value = self._read_file(next_data_length, new_count)
+        # Skip some offsets
+        data.read(new_count * 8)
+        return value
+
+    def _get_value(self, data, data_type):
+        parser = {1: {'method': self._parse_unsigned_char, 'args': data},
+                  2: {'method': self._parse_unsigned_int, 'args': data},
+                  3: {'method': self._parse_unsigned_int, 'args': data},
+                  5: {'method': self._parse_unsigned_long, 'args': data},
+                  6: {'method': self._parse_double, 'args': data},
+                  8: {'method': self._parse_string, 'args': data},
+                  9: {'method': self._parse_char_array, 'args': data},
+                  11: {'method': self._parse_metadata_item, 'args': (data, self._cursor_position)}}
+        return parser[data_type]['method'](parser[data_type]['args'])
+    def _read_file(self, data, count):
         data = StringIO(data)
         metadata = {}
+        total_count = 0
         for _ in xrange(count):
-            cursor_position = data.tell()
+            total_count += 1
+            self._cursor_position = data.tell()
             header = data.read(2)
             if not header:
                 break
             data_type, name_length = map(ord, header)
             name = data.read(name_length * 2).decode("utf16")[:-1].encode("utf8")
-            if data_type == 1:
-                value, = struct.unpack("B", data.read(1))
-            elif data_type in [2, 3]:
-                value, = struct.unpack("I", data.read(4))
-            elif data_type == 5:
-                value, = struct.unpack("Q", data.read(8))
-            elif data_type == 6:
-                value, = struct.unpack("d", data.read(8))
-            elif data_type == 8:
-                value = data.read(2)
-                while value[-2:] != "\x00\x00":
-                    value += data.read(2)
-                value = value.decode("utf16")[:-1].encode("utf8")
-            elif data_type == 9:
-                cnt, = struct.unpack("Q", data.read(8))
-                value = array.array("B", data.read(cnt))
-            elif data_type == 11:
-                new_count, length = struct.unpack("<IQ", data.read(12))
-                length -= data.tell() - cursor_position
-                next_data_length = data.read(length)
-                value = self.read_lv_encoding(next_data_length, new_count)
-                # Skip some offsets
-                data.read(new_count * 8)
+            value = self._get_value(data, data_type)
             if name not in metadata:
                 metadata[name] = value
             else:
@@ -271,6 +294,4 @@ class Nd2FileReader(object):
                 # We've encountered this key before so we're guaranteed to be dealing with a list. Thus we append
                 # the value to the already-existing list.
                 metadata[name].append(value)
-        # x = data.read()
-        # assert not x, "skip %d %s" % (len(x), repr(x[:30]))
         return metadata
