diff --git a/nd2reader/driver/driver.py b/nd2reader/driver/driver.py deleted file mode 100644 index 8358870..0000000 --- a/nd2reader/driver/driver.py +++ /dev/null @@ -1,11 +0,0 @@ -def get_driver(filename, version): - """ - Instantiates the correct driver for the ND2, which allows us to parse metadata and access images. - - :param filename: the path to the ND2 - :type filename: str - :param version: the version of the ND2. Note that this is different than the version of NIS Elements used to create the ND2. - :type version: tuple - - """ - return 1 diff --git a/nd2reader/driver/v3.py b/nd2reader/driver/v3.py deleted file mode 100644 index c5a095b..0000000 --- a/nd2reader/driver/v3.py +++ /dev/null @@ -1,397 +0,0 @@ -# -*- coding: utf-8 -*- - -import array -from datetime import datetime -import numpy as np -import re -import struct -import six -from nd2reader.model.image import Image - - -class Nd2Parser(object): - """ - Reads .nd2 files, provides an interface to the metadata, and generates numpy arrays from the image data. - You should not ever need to instantiate this class manually unless you're a developer. - - """ - CHUNK_HEADER = 0xabeceda - CHUNK_MAP_START = six.b("ND2 FILEMAP SIGNATURE NAME 0001!") - CHUNK_MAP_END = six.b("ND2 CHUNK MAP SIGNATURE 0000001!") - - def __init__(self, filename): - self._absolute_start = None - self._filename = filename - self._fh = None - self._channels = None - self._channel_count = None - self._chunk_map_start_location = None - self._cursor_position = 0 - self._dimension_text = None - self._fields_of_view = None - self._label_map = {} - self._metadata = {} - self._read_map() - self._time_indexes = None - self._parse_metadata() - self._z_levels = None - - def get_image(self, index): - channel_offset = index % len(self._metadata.channels) - fov = self._calculate_field_of_view(index) - channel = self._calculate_channel(index) - z_level = self._calculate_z_level(index) - image_group_number = int(index / len(self._metadata.channels)) - frame_number = self._calculate_frame_number(image_group_number, fov, z_level) - timestamp, image = self._get_raw_image_data(image_group_number, channel_offset, self._metadata.height, self._metadata.width) - image.add_params(timestamp, frame_number, fov, channel, z_level) - - @property - def absolute_start(self): - """ - The date and time when acquisition began. - - :rtype: datetime.datetime() - - """ - if self._absolute_start is None: - for line in self.metadata[six.b('ImageTextInfo')][six.b('SLxImageTextInfo')].values(): - line = line.decode("utf8") - absolute_start_12 = None - absolute_start_24 = None - # ND2s seem to randomly switch between 12- and 24-hour representations. - try: - absolute_start_24 = datetime.strptime(line, "%m/%d/%Y %H:%M:%S") - except (TypeError, ValueError): - pass - try: - absolute_start_12 = datetime.strptime(line, "%m/%d/%Y %I:%M:%S %p") - except (TypeError, ValueError): - pass - if not absolute_start_12 and not absolute_start_24: - continue - return absolute_start_12 if absolute_start_12 else absolute_start_24 - raise ValueError("This ND2 has no recorded start time. This is probably a bug.") - return self._absolute_start - - @property - def channels(self): - """ - These are labels created by the NIS Elements user. Typically they may a short description of the filter cube - used (e.g. "bright field", "GFP", etc.) - - :rtype: list - - """ - if not self._channels: - self._channels = [] - metadata = self.metadata[six.b('ImageMetadataSeq')][six.b('SLxPictureMetadata')][six.b('sPicturePlanes')] - try: - validity = self.metadata[six.b('ImageMetadata')][six.b('SLxExperiment')][six.b('ppNextLevelEx')][six.b('')][0][six.b('ppNextLevelEx')][six.b('')][0][six.b('pItemValid')] - except KeyError: - # If none of the channels have been deleted, there is no validity list, so we just make one - validity = [True for _ in metadata] - # Channel information is contained in dictionaries with the keys a0, a1...an where the number - # indicates the order in which the channel is stored. So by sorting the dicts alphabetically - # we get the correct order. - for (label, chan), valid in zip(sorted(metadata[six.b('sPlaneNew')].items()), validity): - if not valid: - continue - self._channels.append(chan[six.b('sDescription')].decode("utf8")) - return self._channels - - @property - def fields_of_view(self): - """ - The metadata contains information about fields of view, but it contains it even if some fields - of view were cropped. We can't find anything that states which fields of view are actually - in the image data, so we have to calculate it. There probably is something somewhere, since - NIS Elements can figure it out, but we haven't found it yet. - - :rtype: list - - """ - if self._fields_of_view is None: - self._fields_of_view = self._parse_dimension_text(r""".*?XY\((\d+)\).*?""") - return self._fields_of_view - - @property - def frames(self): - """ - The number of cycles. - - :rtype: list - - """ - if self._time_indexes is None: - self._time_indexes = self._parse_dimension_text(r""".*?T'\((\d+)\).*?""") - return self._time_indexes - - @property - def z_levels(self): - """ - The different levels in the Z-plane. Just a sequence from 0 to n. - - :rtype: list - - """ - if self._z_levels is None: - self._z_levels = self._parse_dimension_text(r""".*?Z\((\d+)\).*?""") - return self._z_levels - - def _calculate_field_of_view(self, frame_number): - images_per_cycle = len(self.z_levels) * len(self.channels) - return int((frame_number - (frame_number % images_per_cycle)) / images_per_cycle) % len(self.fields_of_view) - - def _calculate_channel(self, frame_number): - return self.channels[frame_number % len(self.channels)] - - def _calculate_z_level(self, frame_number): - return self.z_levels[int(((frame_number - (frame_number % len(self.channels))) / len(self.channels)) % len(self.z_levels))] - - @property - def _file_handle(self): - if self._fh is None: - self._fh = open(self._filename, "rb") - return self._fh - - def _get_raw_image_data(self, image_group_number, channel_offset, height, width): - """ - Reads the raw bytes and the timestamp of an image. - - :param image_group_number: groups are made of images with the same time index, field of view and z-level. - :type image_group_number: int - :param channel_offset: the offset in the array where the bytes for this image are found. - :type channel_offset: int - - :return: (int, array.array()) or None - - """ - chunk = self._label_map[six.b("ImageDataSeq|%d!" % image_group_number)] - data = self._read_chunk(chunk) - # All images in the same image group share the same timestamp! So if you have complicated image data, - # your timestamps may not be entirely accurate. Practically speaking though, they'll only be off by a few - # seconds unless you're doing something super weird. - timestamp = struct.unpack("d", data[:8])[0] - image_group_data = array.array("H", data) - image_data_start = 4 + channel_offset - # The images for the various channels are interleaved within the same array. For example, the second image - # of a four image group will be composed of bytes 2, 6, 10, etc. If you understand why someone would design - # a data structure that way, please send the author of this library a message. - image_data = np.reshape(image_group_data[image_data_start::len(self.channels)], (height, width)) - # Skip images that are all zeros! This is important, since NIS Elements creates blank "gap" images if you - # don't have the same number of images each cycle. We discovered this because we only took GFP images every - # other cycle to reduce phototoxicity, but NIS Elements still allocated memory as if we were going to take - # them every cycle. - if np.any(image_data): - return timestamp, Image(image_data) - return None - - @property - def _dimensions(self): - """ - While there are metadata values that represent a lot of what we want to capture, they seem to be unreliable. - Sometimes certain elements don't exist, or change their data type randomly. However, the human-readable text - is always there and in the same exact format, so we just parse that instead. - - :rtype: str - - """ - if self._dimension_text is None: - for line in self.metadata[six.b('ImageTextInfo')][six.b('SLxImageTextInfo')].values(): - if six.b("Dimensions:") in line: - metadata = line - break - else: - raise ValueError("Could not parse metadata dimensions!") - for line in metadata.split(six.b("\r\n")): - if line.startswith(six.b("Dimensions:")): - self._dimension_text = line - break - else: - raise ValueError("Could not parse metadata dimensions!") - return self._dimension_text - - def _calculate_image_group_number(self, time_index, fov, z_level): - """ - Images are grouped together if they share the same time index, field of view, and z-level. - - :type time_index: int - :type fov: int - :type z_level: int - - :rtype: int - - """ - return time_index * len(self.fields_of_view) * len(self.z_levels) + (fov * len(self.z_levels) + z_level) - - def _calculate_frame_number(self, image_group_number, fov, z_level): - return (image_group_number - (fov * len(self.z_levels) + z_level)) / (len(self.fields_of_view) * len(self.z_levels)) - - @property - def _channel_offset(self): - """ - Image data is interleaved for each image set. That is, if there are four images in a set, the first image - will consist of pixels 1, 5, 9, etc, the second will be pixels 2, 6, 10, and so forth. - - :rtype: dict - - """ - channel_offset = {} - for n, channel in enumerate(self._channels): - channel_offset[channel] = n - return channel_offset - - def _parse_dimension_text(self, pattern): - try: - count = int(re.match(pattern, self._dimensions).group(1)) - except AttributeError: - return [0] - except TypeError: - match = re.match(pattern, self._dimensions.decode("utf8")) - if not match: - return [0] - return list(range(int(match.group(1)))) - else: - return list(range(count)) - - @property - def _total_images_per_channel(self): - """ - The total number of images per channel. Warning: this may be inaccurate as it includes "gap" images. - - :rtype: int - - """ - return self.metadata[six.b('ImageAttributes')][six.b('SLxImageAttributes')][six.b('uiSequenceCount')] - - def _parse_metadata(self): - """ - Reads all metadata. - - """ - for label in self._label_map.keys(): - if label.endswith(six.b("LV!")) or six.b("LV|") in label: - data = self._read_chunk(self._label_map[label]) - stop = label.index(six.b("LV")) - self.metadata[label[:stop]] = self._read_metadata(data, 1) - - def _read_map(self): - """ - Every label ends with an exclamation point, however, we can't directly search for those to find all the labels - as some of the bytes contain the value 33, which is the ASCII code for "!". So we iteratively find each label, - grab the subsequent data (always 16 bytes long), advance to the next label and repeat. - - """ - self._file_handle.seek(-8, 2) - chunk_map_start_location = struct.unpack("Q", self._file_handle.read(8))[0] - self._file_handle.seek(chunk_map_start_location) - raw_text = self._file_handle.read(-1) - label_start = raw_text.index(Nd2Parser.CHUNK_MAP_START) + 32 - - while True: - data_start = raw_text.index(six.b("!"), label_start) + 1 - key = raw_text[label_start: data_start] - location, length = struct.unpack("QQ", raw_text[data_start: data_start + 16]) - if key == Nd2Parser.CHUNK_MAP_END: - # We've reached the end of the chunk map - break - self._label_map[key] = location - label_start = data_start + 16 - - def _read_chunk(self, chunk_location): - """ - Gets the data for a given chunk pointer - - """ - self._file_handle.seek(chunk_location) - # The chunk metadata is always 16 bytes long - chunk_metadata = self._file_handle.read(16) - header, relative_offset, data_length = struct.unpack("IIQ", chunk_metadata) - if header != Nd2Parser.CHUNK_HEADER: - raise ValueError("The ND2 file seems to be corrupted.") - # We start at the location of the chunk metadata, skip over the metadata, and then proceed to the - # start of the actual data field, which is at some arbitrary place after the metadata. - self._file_handle.seek(chunk_location + 16 + relative_offset) - return self._file_handle.read(data_length) - - def _parse_unsigned_char(self, data): - return struct.unpack("B", data.read(1))[0] - - def _parse_unsigned_int(self, data): - return struct.unpack("I", data.read(4))[0] - - def _parse_unsigned_long(self, data): - return struct.unpack("Q", data.read(8))[0] - - def _parse_double(self, data): - return struct.unpack("d", data.read(8))[0] - - def _parse_string(self, data): - value = data.read(2) - while not value.endswith(six.b("\x00\x00")): - # the string ends at the first instance of \x00\x00 - value += data.read(2) - return value.decode("utf16")[:-1].encode("utf8") - - def _parse_char_array(self, data): - array_length = struct.unpack("Q", data.read(8))[0] - return array.array("B", data.read(array_length)) - - def _parse_metadata_item(self, data): - """ - Reads hierarchical data, analogous to a Python dict. - - """ - new_count, length = struct.unpack("