Browse Source

#66 began enormous refactor to support multiple versions of ND2s in an elegant way

master
jim 9 years ago
parent
commit
949d0612d4
15 changed files with 949 additions and 876 deletions
  1. +1
    -152
      nd2reader/__init__.py
  2. +1
    -0
      nd2reader/driver/__init__.py
  3. +11
    -0
      nd2reader/driver/driver.py
  4. +0
    -0
      nd2reader/driver/v2.py
  5. +386
    -0
      nd2reader/driver/v3.py
  6. +36
    -0
      nd2reader/driver/version.py
  7. +154
    -0
      nd2reader/interface.py
  8. +2
    -155
      nd2reader/model/__init__.py
  9. +37
    -0
      nd2reader/model/group.py
  10. +115
    -0
      nd2reader/model/image.py
  11. +0
    -386
      nd2reader/parser.py
  12. +183
    -183
      tests/__init__.py
  13. +0
    -0
      tests/driver/__init__.py
  14. +7
    -0
      tests/driver/driver.py
  15. +16
    -0
      tests/driver/version.py

+ 1
- 152
nd2reader/__init__.py View File

@ -1,152 +1 @@
# -*- coding: utf-8 -*-
from nd2reader.model import Image, ImageSet
from nd2reader.parser import Nd2Parser
import six
class Nd2(Nd2Parser):
"""
Allows easy access to NIS Elements .nd2 image files.
"""
def __init__(self, filename):
super(Nd2, self).__init__(filename)
self._filename = filename
def __repr__(self):
return "\n".join(["<ND2 %s>" % self._filename,
"Created: %s" % self.absolute_start.strftime("%Y-%m-%d %H:%M:%S"),
"Image size: %sx%s (HxW)" % (self.height, self.width),
"Image cycles: %s" % len(self.time_indexes),
"Channels: %s" % ", ".join(["'%s'" % str(channel) for channel in self.channels]),
"Fields of View: %s" % len(self.fields_of_view),
"Z-Levels: %s" % len(self.z_levels)
])
def __len__(self):
"""
This should be the total number of images in the ND2, but it may be inaccurate. If the ND2 contains a
different number of images in a cycle (i.e. there are "gap" images) it will be higher than reality.
:rtype: int
"""
return self._total_images_per_channel * len(self.channels)
def __getitem__(self, item):
"""
Allows slicing ND2s.
>>> nd2 = Nd2("my_images.nd2")
>>> image = nd2[16] # gets 17th frame
>>> for image in nd2[100:200]: # iterate over the 100th to 200th images
>>> do_something(image.data)
>>> for image in nd2[::-1]: # iterate backwards
>>> do_something(image.data)
>>> for image in nd2[37:422:17]: # do something super weird if you really want to
>>> do_something(image.data)
:type item: int or slice
:rtype: nd2reader.model.Image() or generator
"""
if isinstance(item, int):
try:
channel_offset = item % len(self.channels)
fov = self._calculate_field_of_view(item)
channel = self._calculate_channel(item)
z_level = self._calculate_z_level(item)
image_group_number = int(item / len(self.channels))
frame_number = self._calculate_frame_number(image_group_number, fov, z_level)
timestamp, raw_image_data = self._get_raw_image_data(image_group_number, channel_offset)
image = Image(timestamp, frame_number, raw_image_data, fov, channel, z_level, self.height, self.width)
except (TypeError, ValueError):
return None
except KeyError:
raise IndexError("Invalid frame number.")
else:
return image
elif isinstance(item, slice):
return self._slice(item.start, item.stop, item.step)
raise IndexError
def _slice(self, start, stop, step):
"""
Allows for iteration over a selection of the entire dataset.
:type start: int
:type stop: int
:type step: int
:rtype: nd2reader.model.Image() or None
"""
start = start if start is not None else 0
step = step if step is not None else 1
stop = stop if stop is not None else len(self)
# This weird thing with the step allows you to iterate backwards over the images
for i in range(start, stop)[::step]:
yield self[i]
@property
def image_sets(self):
"""
Iterates over groups of related images. This is useful if your ND2 contains multiple fields of view.
A typical use case might be that you have, say, four areas of interest that you're monitoring, and every
minute you take a bright field and GFP image of each one. For each cycle, this method would produce four
ImageSet objects, each containing one bright field and one GFP image.
:return: model.ImageSet()
"""
for time_index in self.time_indexes:
image_set = ImageSet()
for fov in self.fields_of_view:
for channel_name in self.channels:
for z_level in self.z_levels:
image = self.get_image(time_index, fov, channel_name, z_level)
if image is not None:
image_set.add(image)
yield image_set
@property
def height(self):
"""
:return: height of each image, in pixels
:rtype: int
"""
return self.metadata[six.b('ImageAttributes')][six.b('SLxImageAttributes')][six.b('uiHeight')]
@property
def width(self):
"""
:return: width of each image, in pixels
:rtype: int
"""
return self.metadata[six.b('ImageAttributes')][six.b('SLxImageAttributes')][six.b('uiWidth')]
def get_image(self, frame_number, field_of_view, channel_name, z_level):
"""
Returns an Image if data exists for the given parameters, otherwise returns None. In general, you should avoid
using this method unless you're very familiar with the structure of ND2 files.
:type frame_number: int
:param field_of_view: the label for the place in the XY-plane where this image was taken.
:type field_of_view: int
:param channel_name: the name of the color of this image
:type channel_name: str
:param z_level: the label for the location in the Z-plane where this image was taken.
:type z_level: int
:rtype: nd2reader.model.Image() or None
"""
image_group_number = self._calculate_image_group_number(frame_number, field_of_view, z_level)
try:
timestamp, raw_image_data = self._get_raw_image_data(image_group_number, self._channel_offset[channel_name])
image = Image(timestamp, frame_number, raw_image_data, field_of_view, channel_name, z_level, self.height, self.width)
except TypeError:
return None
else:
return image
from nd2reader.interface import Nd2

+ 1
- 0
nd2reader/driver/__init__.py View File

@ -0,0 +1 @@
from nd2reader.driver.driver import get_driver

+ 11
- 0
nd2reader/driver/driver.py View File

@ -0,0 +1,11 @@
def get_driver(filename, version):
"""
Instantiates the correct driver for the ND2, which allows us to parse metadata and access images.
:param filename: the path to the ND2
:type filename: str
:param version: the version of the ND2. Note that this is different than the version of NIS Elements used to create the ND2.
:type version: tuple
"""
return 1

+ 0
- 0
nd2reader/driver/v2.py View File


+ 386
- 0
nd2reader/driver/v3.py View File

@ -0,0 +1,386 @@
# -*- coding: utf-8 -*-
import array
from datetime import datetime
import numpy as np
import re
import struct
import six
class Nd2Parser(object):
"""
Reads .nd2 files, provides an interface to the metadata, and generates numpy arrays from the image data.
You should not ever need to instantiate this class manually unless you're a developer.
"""
CHUNK_HEADER = 0xabeceda
CHUNK_MAP_START = six.b("ND2 FILEMAP SIGNATURE NAME 0001!")
CHUNK_MAP_END = six.b("ND2 CHUNK MAP SIGNATURE 0000001!")
def __init__(self, filename):
self._absolute_start = None
self._filename = filename
self._fh = None
self._channels = None
self._channel_count = None
self._chunk_map_start_location = None
self._cursor_position = 0
self._dimension_text = None
self._fields_of_view = None
self._label_map = {}
self.metadata = {}
self._read_map()
self._time_indexes = None
self._parse_metadata()
self._z_levels = None
@property
def absolute_start(self):
"""
The date and time when acquisition began.
:rtype: datetime.datetime()
"""
if self._absolute_start is None:
for line in self.metadata[six.b('ImageTextInfo')][six.b('SLxImageTextInfo')].values():
line = line.decode("utf8")
absolute_start_12 = None
absolute_start_24 = None
# ND2s seem to randomly switch between 12- and 24-hour representations.
try:
absolute_start_24 = datetime.strptime(line, "%m/%d/%Y %H:%M:%S")
except (TypeError, ValueError):
pass
try:
absolute_start_12 = datetime.strptime(line, "%m/%d/%Y %I:%M:%S %p")
except (TypeError, ValueError):
pass
if not absolute_start_12 and not absolute_start_24:
continue
return absolute_start_12 if absolute_start_12 else absolute_start_24
raise ValueError("This ND2 has no recorded start time. This is probably a bug.")
return self._absolute_start
@property
def channels(self):
"""
These are labels created by the NIS Elements user. Typically they may a short description of the filter cube
used (e.g. "bright field", "GFP", etc.)
:rtype: list
"""
if not self._channels:
self._channels = []
metadata = self.metadata[six.b('ImageMetadataSeq')][six.b('SLxPictureMetadata')][six.b('sPicturePlanes')]
try:
validity = self.metadata[six.b('ImageMetadata')][six.b('SLxExperiment')][six.b('ppNextLevelEx')][six.b('')][0][six.b('ppNextLevelEx')][six.b('')][0][six.b('pItemValid')]
except KeyError:
# If none of the channels have been deleted, there is no validity list, so we just make one
validity = [True for _ in metadata]
# Channel information is contained in dictionaries with the keys a0, a1...an where the number
# indicates the order in which the channel is stored. So by sorting the dicts alphabetically
# we get the correct order.
for (label, chan), valid in zip(sorted(metadata[six.b('sPlaneNew')].items()), validity):
if not valid:
continue
self._channels.append(chan[six.b('sDescription')].decode("utf8"))
return self._channels
@property
def fields_of_view(self):
"""
The metadata contains information about fields of view, but it contains it even if some fields
of view were cropped. We can't find anything that states which fields of view are actually
in the image data, so we have to calculate it. There probably is something somewhere, since
NIS Elements can figure it out, but we haven't found it yet.
:rtype: list
"""
if self._fields_of_view is None:
self._fields_of_view = self._parse_dimension_text(r""".*?XY\((\d+)\).*?""")
return self._fields_of_view
@property
def frames(self):
"""
The number of cycles.
:rtype: list
"""
if self._time_indexes is None:
self._time_indexes = self._parse_dimension_text(r""".*?T'\((\d+)\).*?""")
return self._time_indexes
@property
def z_levels(self):
"""
The different levels in the Z-plane. Just a sequence from 0 to n.
:rtype: list
"""
if self._z_levels is None:
self._z_levels = self._parse_dimension_text(r""".*?Z\((\d+)\).*?""")
return self._z_levels
def _calculate_field_of_view(self, frame_number):
images_per_cycle = len(self.z_levels) * len(self.channels)
return int((frame_number - (frame_number % images_per_cycle)) / images_per_cycle) % len(self.fields_of_view)
def _calculate_channel(self, frame_number):
return self.channels[frame_number % len(self.channels)]
def _calculate_z_level(self, frame_number):
return self.z_levels[int(((frame_number - (frame_number % len(self.channels))) / len(self.channels)) % len(self.z_levels))]
@property
def _file_handle(self):
if self._fh is None:
self._fh = open(self._filename, "rb")
return self._fh
def _get_raw_image_data(self, image_group_number, channel_offset):
"""
Reads the raw bytes and the timestamp of an image.
:param image_group_number: groups are made of images with the same time index, field of view and z-level.
:type image_group_number: int
:param channel_offset: the offset in the array where the bytes for this image are found.
:type channel_offset: int
:return: (int, array.array()) or None
"""
chunk = self._label_map[six.b("ImageDataSeq|%d!" % image_group_number)]
data = self._read_chunk(chunk)
# All images in the same image group share the same timestamp! So if you have complicated image data,
# your timestamps may not be entirely accurate. Practically speaking though, they'll only be off by a few
# seconds unless you're doing something super weird.
timestamp = struct.unpack("d", data[:8])[0]
image_group_data = array.array("H", data)
image_data_start = 4 + channel_offset
# The images for the various channels are interleaved within the same array. For example, the second image
# of a four image group will be composed of bytes 2, 6, 10, etc. If you understand why someone would design
# a data structure that way, please send the author of this library a message.
image_data = image_group_data[image_data_start::len(self.channels)]
# Skip images that are all zeros! This is important, since NIS Elements creates blank "gap" images if you
# don't have the same number of images each cycle. We discovered this because we only took GFP images every
# other cycle to reduce phototoxicity, but NIS Elements still allocated memory as if we were going to take
# them every cycle.
if np.any(image_data):
return timestamp, image_data
return None
@property
def _dimensions(self):
"""
While there are metadata values that represent a lot of what we want to capture, they seem to be unreliable.
Sometimes certain elements don't exist, or change their data type randomly. However, the human-readable text
is always there and in the same exact format, so we just parse that instead.
:rtype: str
"""
if self._dimension_text is None:
for line in self.metadata[six.b('ImageTextInfo')][six.b('SLxImageTextInfo')].values():
if six.b("Dimensions:") in line:
metadata = line
break
else:
raise ValueError("Could not parse metadata dimensions!")
for line in metadata.split(six.b("\r\n")):
if line.startswith(six.b("Dimensions:")):
self._dimension_text = line
break
else:
raise ValueError("Could not parse metadata dimensions!")
return self._dimension_text
def _calculate_image_group_number(self, time_index, fov, z_level):
"""
Images are grouped together if they share the same time index, field of view, and z-level.
:type time_index: int
:type fov: int
:type z_level: int
:rtype: int
"""
return time_index * len(self.fields_of_view) * len(self.z_levels) + (fov * len(self.z_levels) + z_level)
def _calculate_frame_number(self, image_group_number, fov, z_level):
return (image_group_number - (fov * len(self.z_levels) + z_level)) / (len(self.fields_of_view) * len(self.z_levels))
@property
def _channel_offset(self):
"""
Image data is interleaved for each image set. That is, if there are four images in a set, the first image
will consist of pixels 1, 5, 9, etc, the second will be pixels 2, 6, 10, and so forth.
:rtype: dict
"""
channel_offset = {}
for n, channel in enumerate(self._channels):
channel_offset[channel] = n
return channel_offset
def _parse_dimension_text(self, pattern):
try:
count = int(re.match(pattern, self._dimensions).group(1))
except AttributeError:
return [0]
except TypeError:
match = re.match(pattern, self._dimensions.decode("utf8"))
if not match:
return [0]
return list(range(int(match.group(1))))
else:
return list(range(count))
@property
def _total_images_per_channel(self):
"""
The total number of images per channel. Warning: this may be inaccurate as it includes "gap" images.
:rtype: int
"""
return self.metadata[six.b('ImageAttributes')][six.b('SLxImageAttributes')][six.b('uiSequenceCount')]
def _parse_metadata(self):
"""
Reads all metadata.
"""
for label in self._label_map.keys():
if label.endswith(six.b("LV!")) or six.b("LV|") in label:
data = self._read_chunk(self._label_map[label])
stop = label.index(six.b("LV"))
self.metadata[label[:stop]] = self._read_metadata(data, 1)
def _read_map(self):
"""
Every label ends with an exclamation point, however, we can't directly search for those to find all the labels
as some of the bytes contain the value 33, which is the ASCII code for "!". So we iteratively find each label,
grab the subsequent data (always 16 bytes long), advance to the next label and repeat.
"""
self._file_handle.seek(-8, 2)
chunk_map_start_location = struct.unpack("Q", self._file_handle.read(8))[0]
self._file_handle.seek(chunk_map_start_location)
raw_text = self._file_handle.read(-1)
label_start = raw_text.index(Nd2Parser.CHUNK_MAP_START) + 32
while True:
data_start = raw_text.index(six.b("!"), label_start) + 1
key = raw_text[label_start: data_start]
location, length = struct.unpack("QQ", raw_text[data_start: data_start + 16])
if key == Nd2Parser.CHUNK_MAP_END:
# We've reached the end of the chunk map
break
self._label_map[key] = location
label_start = data_start + 16
def _read_chunk(self, chunk_location):
"""
Gets the data for a given chunk pointer
"""
self._file_handle.seek(chunk_location)
# The chunk metadata is always 16 bytes long
chunk_metadata = self._file_handle.read(16)
header, relative_offset, data_length = struct.unpack("IIQ", chunk_metadata)
if header != Nd2Parser.CHUNK_HEADER:
raise ValueError("The ND2 file seems to be corrupted.")
# We start at the location of the chunk metadata, skip over the metadata, and then proceed to the
# start of the actual data field, which is at some arbitrary place after the metadata.
self._file_handle.seek(chunk_location + 16 + relative_offset)
return self._file_handle.read(data_length)
def _parse_unsigned_char(self, data):
return struct.unpack("B", data.read(1))[0]
def _parse_unsigned_int(self, data):
return struct.unpack("I", data.read(4))[0]
def _parse_unsigned_long(self, data):
return struct.unpack("Q", data.read(8))[0]
def _parse_double(self, data):
return struct.unpack("d", data.read(8))[0]
def _parse_string(self, data):
value = data.read(2)
while not value.endswith(six.b("\x00\x00")):
# the string ends at the first instance of \x00\x00
value += data.read(2)
return value.decode("utf16")[:-1].encode("utf8")
def _parse_char_array(self, data):
array_length = struct.unpack("Q", data.read(8))[0]
return array.array("B", data.read(array_length))
def _parse_metadata_item(self, data):
"""
Reads hierarchical data, analogous to a Python dict.
"""
new_count, length = struct.unpack("<IQ", data.read(12))
length -= data.tell() - self._cursor_position
next_data_length = data.read(length)
value = self._read_metadata(next_data_length, new_count)
# Skip some offsets
data.read(new_count * 8)
return value
def _get_value(self, data, data_type):
"""
ND2s use various codes to indicate different data types, which we translate here.
"""
parser = {1: self._parse_unsigned_char,
2: self._parse_unsigned_int,
3: self._parse_unsigned_int,
5: self._parse_unsigned_long,
6: self._parse_double,
8: self._parse_string,
9: self._parse_char_array,
11: self._parse_metadata_item}
return parser[data_type](data)
def _read_metadata(self, data, count):
"""
Iterates over each element some section of the metadata and parses it.
"""
data = six.BytesIO(data)
metadata = {}
for _ in range(count):
self._cursor_position = data.tell()
header = data.read(2)
if not header:
# We've reached the end of some hierarchy of data
break
if six.PY3:
header = header.decode("utf8")
data_type, name_length = map(ord, header)
name = data.read(name_length * 2).decode("utf16")[:-1].encode("utf8")
value = self._get_value(data, data_type)
if name not in metadata.keys():
metadata[name] = value
else:
if not isinstance(metadata[name], list):
# We have encountered this key exactly once before. Since we're seeing it again, we know we
# need to convert it to a list before proceeding.
metadata[name] = [metadata[name]]
# We've encountered this key before so we're guaranteed to be dealing with a list. Thus we append
# the value to the already-existing list.
metadata[name].append(value)
return metadata

+ 36
- 0
nd2reader/driver/version.py View File

@ -0,0 +1,36 @@
import re
class InvalidVersionError(Exception):
pass
def get_version(filename):
"""
Determines what version the ND2 is.
:param filename: the path (absolute or relative) to the ND2
:type filename: str
"""
with open(filename, 'rb') as f:
# the first 16 bytes seem to have no meaning, so we skip them
f.seek(16)
# the next 38 bytes contain the string that we want to parse. Unlike most of the ND2, this is in UTF-8
data = f.read(38).decode("utf8")
return parse_version(data)
def parse_version(data):
"""
Parses a string with the version data in it.
:param data: the 19th through 54th byte of the ND2, representing the version
:type data: unicode
"""
match = re.search(r"""^ND2 FILE SIGNATURE CHUNK NAME01!Ver(?P<major>\d)\.(?P<minor>\d)$""", data)
if match:
# We haven't seen a lot of ND2s but the ones we have seen conform to this
return int(match.group('major')), int(match.group('minor'))
raise InvalidVersionError("The version of the ND2 you specified is not supported.")

+ 154
- 0
nd2reader/interface.py View File

@ -0,0 +1,154 @@
# -*- coding: utf-8 -*-
from nd2reader.model import Image, ImageGroup
from nd2reader.driver import get_driver
from nd2reader.driver.version import get_version
import six
class Nd2(object):
"""
Allows easy access to NIS Elements .nd2 image files.
"""
def __init__(self, filename):
version = get_version(filename)
self._driver = get_driver(filename, version)
self._metadata = self._driver.get_metadata()
def __repr__(self):
return "\n".join(["<ND2 %s>" % self._driver._filename,
"Created: %s" % self._driver.absolute_start,
"Image size: %sx%s (HxW)" % (self.height, self.width),
"Frames: %s" % len(self.frames),
"Channels: %s" % ", ".join(["'%s'" % str(channel) for channel in self.channels]),
"Fields of View: %s" % len(self.fields_of_view),
"Z-Levels: %s" % len(self.z_levels)
])
def __len__(self):
"""
This should be the total number of images in the ND2, but it may be inaccurate. If the ND2 contains a
different number of images in a cycle (i.e. there are "gap" images) it will be higher than reality.
:rtype: int
"""
return self._driver.total_images_per_channel * len(self.channels)
def __getitem__(self, item):
"""
Allows slicing ND2s.
>>> nd2 = Nd2("my_images.nd2")
>>> image = nd2[16] # gets 17th frame
>>> for image in nd2[100:200]: # iterate over the 100th to 200th images
>>> do_something(image.data)
>>> for image in nd2[::-1]: # iterate backwards
>>> do_something(image.data)
>>> for image in nd2[37:422:17]: # do something super weird if you really want to
>>> do_something(image.data)
:type item: int or slice
:rtype: nd2reader.model.Image() or generator
"""
if isinstance(item, int):
try:
channel_offset = item % len(self.channels)
fov = self._calculate_field_of_view(item)
channel = self._calculate_channel(item)
z_level = self._calculate_z_level(item)
image_group_number = int(item / len(self.channels))
frame_number = self._calculate_frame_number(image_group_number, fov, z_level)
timestamp, raw_image_data = self._get_raw_image_data(image_group_number, channel_offset)
image = Image(timestamp, frame_number, raw_image_data, fov, channel, z_level, self.height, self.width)
except (TypeError, ValueError):
return None
except KeyError:
raise IndexError("Invalid frame number.")
else:
return image
elif isinstance(item, slice):
return self._slice(item.start, item.stop, item.step)
raise IndexError
def _slice(self, start, stop, step):
"""
Allows for iteration over a selection of the entire dataset.
:type start: int
:type stop: int
:type step: int
:rtype: nd2reader.model.Image() or None
"""
start = start if start is not None else 0
step = step if step is not None else 1
stop = stop if stop is not None else len(self)
# This weird thing with the step allows you to iterate backwards over the images
for i in range(start, stop)[::step]:
yield self[i]
@property
def image_sets(self):
"""
Iterates over groups of related images. This is useful if your ND2 contains multiple fields of view.
A typical use case might be that you have, say, four areas of interest that you're monitoring, and every
minute you take a bright field and GFP image of each one. For each cycle, this method would produce four
ImageSet objects, each containing one bright field and one GFP image.
:return: model.ImageSet()
"""
for time_index in self.time_indexes:
image_set = ImageGroup()
for fov in self.fields_of_view:
for channel_name in self.channels:
for z_level in self.z_levels:
image = self.get_image(time_index, fov, channel_name, z_level)
if image is not None:
image_set.add(image)
yield image_set
@property
def height(self):
"""
:return: height of each image, in pixels
:rtype: int
"""
return self.metadata[six.b('ImageAttributes')][six.b('SLxImageAttributes')][six.b('uiHeight')]
@property
def width(self):
"""
:return: width of each image, in pixels
:rtype: int
"""
return self.metadata[six.b('ImageAttributes')][six.b('SLxImageAttributes')][six.b('uiWidth')]
def get_image(self, frame_number, field_of_view, channel_name, z_level):
"""
Returns an Image if data exists for the given parameters, otherwise returns None. In general, you should avoid
using this method unless you're very familiar with the structure of ND2 files.
:type frame_number: int
:param field_of_view: the label for the place in the XY-plane where this image was taken.
:type field_of_view: int
:param channel_name: the name of the color of this image
:type channel_name: str
:param z_level: the label for the location in the Z-plane where this image was taken.
:type z_level: int
:rtype: nd2reader.model.Image() or None
"""
image_group_number = self._calculate_image_group_number(frame_number, field_of_view, z_level)
try:
timestamp, raw_image_data = self._get_raw_image_data(image_group_number, self._channel_offset[channel_name])
image = Image(timestamp, frame_number, raw_image_data, field_of_view, channel_name, z_level, self.height, self.width)
except TypeError:
return None
else:
return image

+ 2
- 155
nd2reader/model/__init__.py View File

@ -1,155 +1,2 @@
# -*- coding: utf-8 -*-
import collections
import numpy as np
import logging
log = logging.getLogger(__name__)
class Image(object):
def __init__(self, timestamp, frame_number, raw_array, field_of_view, channel, z_level, height, width):
"""
A wrapper around the raw pixel data of an image.
:param timestamp: The frame number relative to the .
:type timestamp: int
:param timestamp: The number of milliseconds after the beginning of the acquisition that this image was taken.
:type timestamp: int
:param raw_array: The raw sequence of bytes that represents the image.
:type raw_array: array.array()
:param field_of_view: The label for the place in the XY-plane where this image was taken.
:type field_of_view: int
:param channel: The name of the color of this image
:type channel: str
:param z_level: The label for the location in the Z-plane where this image was taken.
:type z_level: int
:param height: The height of the image in pixels.
:type height: int
:param width: The width of the image in pixels.
:type width: int
"""
self._timestamp = timestamp
self._frame_number = int(frame_number)
self._raw_data = raw_array
self._field_of_view = field_of_view
self._channel = channel
self._z_level = z_level
self._height = height
self._width = width
self._data = None
def __repr__(self):
return "\n".join(["<ND2 Image>",
"%sx%s (HxW)" % (self._height, self._width),
"Timestamp: %s" % self.timestamp,
"Frame: %s" % self._frame_number,
"Field of View: %s" % self.field_of_view,
"Channel: %s" % self.channel,
"Z-Level: %s" % self.z_level,
])
@property
def data(self):
"""
The actual image data.
:rtype np.array()
"""
if self._data is None:
# The data is just a 1-dimensional array originally.
# We convert it to a 2D image here.
self._data = np.reshape(self._raw_data, (self._height, self._width))
return self._data
@property
def field_of_view(self):
"""
Which of the fixed locations this image was taken at.
:rtype int:
"""
return self._field_of_view
@property
def timestamp(self):
"""
The number of seconds after the beginning of the acquisition that the image was taken. Note that for a given
field of view and z-level offset, if you have images of multiple channels, they will all be given the same
timestamp. No, this doesn't make much sense. But that's how ND2s are structured, so if your experiment depends
on millisecond accuracy, you need to find an alternative imaging system.
:rtype float:
"""
return self._timestamp / 1000.0
@property
def frame_number(self):
return self._frame_number
@property
def channel(self):
"""
The name of the filter used to acquire this image. These are user-supplied in NIS Elements.
:rtype str:
"""
return self._channel
@property
def z_level(self):
"""
The vertical offset of the image. These are simple integers starting from 0, where the 0 is the lowest
z-level and each subsequent level incremented by 1.
For example, if you acquired images at -3 µm, 0 µm, and +3 µm, your z-levels would be:
-3 µm: 0
0 µm: 1
+3 µm: 2
:rtype int:
"""
return self._z_level
class ImageSet(object):
"""
A group of images that were taken at roughly the same time.
"""
def __init__(self):
self._images = collections.defaultdict(dict)
def __len__(self):
""" The number of images in the image set. """
return sum([len(channel) for channel in self._images.values()])
def __repr__(self):
return "\n".join(["<ND2 Image Set>",
"Image count: %s" % len(self)])
def get(self, channel, z_level=0):
"""
Retrieve an image with a given channel and z-level. For most users, z_level will always be 0.
:type channel: str
:type z_level: int
"""
return self._images.get(channel).get(z_level)
def add(self, image):
"""
Stores an image.
:type image: nd2reader.model.Image()
"""
self._images[image.channel][image.z_level] = image
from nd2reader.model.image import Image
from nd2reader.model.group import ImageGroup

+ 37
- 0
nd2reader/model/group.py View File

@ -0,0 +1,37 @@
import collections
class ImageGroup(object):
"""
A group of images that were taken at roughly the same time and in the same field of view.
"""
def __init__(self):
self._images = collections.defaultdict(dict)
def __len__(self):
""" The number of images in the image set. """
return sum([len(channel) for channel in self._images.values()])
def __repr__(self):
return "\n".join(["<ND2 Image Group>",
"Image count: %s" % len(self)])
def get(self, channel, z_level=0):
"""
Retrieve an image with a given channel and z-level. For most users, z_level will always be 0.
:type channel: str
:type z_level: int
"""
return self._images.get(channel).get(z_level)
def add(self, image):
"""
Stores an image.
:type image: nd2reader.model.Image()
"""
self._images[image.channel][image.z_level] = image

+ 115
- 0
nd2reader/model/image.py View File

@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-
import numpy as np
class Image(object):
def __init__(self, timestamp, frame_number, raw_array, field_of_view, channel, z_level, height, width):
"""
A wrapper around the raw pixel data of an image.
:param timestamp: The frame number relative to the .
:type timestamp: int
:param timestamp: The number of milliseconds after the beginning of the acquisition that this image was taken.
:type timestamp: int
:param raw_array: The raw sequence of bytes that represents the image.
:type raw_array: array.array()
:param field_of_view: The label for the place in the XY-plane where this image was taken.
:type field_of_view: int
:param channel: The name of the color of this image
:type channel: str
:param z_level: The label for the location in the Z-plane where this image was taken.
:type z_level: int
:param height: The height of the image in pixels.
:type height: int
:param width: The width of the image in pixels.
:type width: int
"""
self._timestamp = timestamp
self._frame_number = int(frame_number)
self._raw_data = raw_array
self._field_of_view = field_of_view
self._channel = channel
self._z_level = z_level
self._height = height
self._width = width
self._data = None
def __repr__(self):
return "\n".join(["<ND2 Image>",
"%sx%s (HxW)" % (self._height, self._width),
"Timestamp: %s" % self.timestamp,
"Frame: %s" % self._frame_number,
"Field of View: %s" % self.field_of_view,
"Channel: %s" % self.channel,
"Z-Level: %s" % self.z_level,
])
@property
def data(self):
"""
The actual image data.
:rtype np.array()
"""
if self._data is None:
# The data is just a 1-dimensional array originally.
# We convert it to a 2D image here.
self._data = np.reshape(self._raw_data, (self._height, self._width))
return self._data
@property
def field_of_view(self):
"""
Which of the fixed locations this image was taken at.
:rtype int:
"""
return self._field_of_view
@property
def timestamp(self):
"""
The number of seconds after the beginning of the acquisition that the image was taken. Note that for a given
field of view and z-level offset, if you have images of multiple channels, they will all be given the same
timestamp. No, this doesn't make much sense. But that's how ND2s are structured, so if your experiment depends
on millisecond accuracy, you need to find an alternative imaging system.
:rtype float:
"""
return self._timestamp / 1000.0
@property
def frame_number(self):
return self._frame_number
@property
def channel(self):
"""
The name of the filter used to acquire this image. These are user-supplied in NIS Elements.
:rtype str:
"""
return self._channel
@property
def z_level(self):
"""
The vertical offset of the image. These are simple integers starting from 0, where the 0 is the lowest
z-level and each subsequent level incremented by 1.
For example, if you acquired images at -3 µm, 0 µm, and +3 µm, your z-levels would be:
-3 µm: 0
0 µm: 1
+3 µm: 2
:rtype int:
"""
return self._z_level

+ 0
- 386
nd2reader/parser.py View File

@ -1,386 +0,0 @@
# -*- coding: utf-8 -*-
import array
from datetime import datetime
import numpy as np
import re
import struct
import six
class Nd2Parser(object):
"""
Reads .nd2 files, provides an interface to the metadata, and generates numpy arrays from the image data.
You should not ever need to instantiate this class manually unless you're a developer.
"""
CHUNK_HEADER = 0xabeceda
CHUNK_MAP_START = six.b("ND2 FILEMAP SIGNATURE NAME 0001!")
CHUNK_MAP_END = six.b("ND2 CHUNK MAP SIGNATURE 0000001!")
def __init__(self, filename):
self._absolute_start = None
self._filename = filename
self._fh = None
self._channels = None
self._channel_count = None
self._chunk_map_start_location = None
self._cursor_position = 0
self._dimension_text = None
self._fields_of_view = None
self._label_map = {}
self.metadata = {}
self._read_map()
self._time_indexes = None
self._parse_metadata()
self._z_levels = None
@property
def absolute_start(self):
"""
The date and time when acquisition began.
:rtype: datetime.datetime()
"""
if self._absolute_start is None:
for line in self.metadata[six.b('ImageTextInfo')][six.b('SLxImageTextInfo')].values():
line = line.decode("utf8")
absolute_start_12 = None
absolute_start_24 = None
# ND2s seem to randomly switch between 12- and 24-hour representations.
try:
absolute_start_24 = datetime.strptime(line, "%m/%d/%Y %H:%M:%S")
except (TypeError, ValueError):
pass
try:
absolute_start_12 = datetime.strptime(line, "%m/%d/%Y %I:%M:%S %p")
except (TypeError, ValueError):
pass
if not absolute_start_12 and not absolute_start_24:
continue
return absolute_start_12 if absolute_start_12 else absolute_start_24
raise ValueError("This ND2 has no recorded start time. This is probably a bug.")
return self._absolute_start
@property
def channels(self):
"""
These are labels created by the NIS Elements user. Typically they may a short description of the filter cube
used (e.g. "bright field", "GFP", etc.)
:rtype: list
"""
if not self._channels:
self._channels = []
metadata = self.metadata[six.b('ImageMetadataSeq')][six.b('SLxPictureMetadata')][six.b('sPicturePlanes')]
try:
validity = self.metadata[six.b('ImageMetadata')][six.b('SLxExperiment')][six.b('ppNextLevelEx')][six.b('')][0][six.b('ppNextLevelEx')][six.b('')][0][six.b('pItemValid')]
except KeyError:
# If none of the channels have been deleted, there is no validity list, so we just make one
validity = [True for _ in metadata]
# Channel information is contained in dictionaries with the keys a0, a1...an where the number
# indicates the order in which the channel is stored. So by sorting the dicts alphabetically
# we get the correct order.
for (label, chan), valid in zip(sorted(metadata[six.b('sPlaneNew')].items()), validity):
if not valid:
continue
self._channels.append(chan[six.b('sDescription')].decode("utf8"))
return self._channels
@property
def fields_of_view(self):
"""
The metadata contains information about fields of view, but it contains it even if some fields
of view were cropped. We can't find anything that states which fields of view are actually
in the image data, so we have to calculate it. There probably is something somewhere, since
NIS Elements can figure it out, but we haven't found it yet.
:rtype: list
"""
if self._fields_of_view is None:
self._fields_of_view = self._parse_dimension_text(r""".*?XY\((\d+)\).*?""")
return self._fields_of_view
@property
def time_indexes(self):
"""
The number of cycles.
:rtype: list
"""
if self._time_indexes is None:
self._time_indexes = self._parse_dimension_text(r""".*?T'\((\d+)\).*?""")
return self._time_indexes
@property
def z_levels(self):
"""
The different levels in the Z-plane. Just a sequence from 0 to n.
:rtype: list
"""
if self._z_levels is None:
self._z_levels = self._parse_dimension_text(r""".*?Z\((\d+)\).*?""")
return self._z_levels
def _calculate_field_of_view(self, frame_number):
images_per_cycle = len(self.z_levels) * len(self.channels)
return int((frame_number - (frame_number % images_per_cycle)) / images_per_cycle) % len(self.fields_of_view)
def _calculate_channel(self, frame_number):
return self.channels[frame_number % len(self.channels)]
def _calculate_z_level(self, frame_number):
return self.z_levels[int(((frame_number - (frame_number % len(self.channels))) / len(self.channels)) % len(self.z_levels))]
@property
def _file_handle(self):
if self._fh is None:
self._fh = open(self._filename, "rb")
return self._fh
def _get_raw_image_data(self, image_group_number, channel_offset):
"""
Reads the raw bytes and the timestamp of an image.
:param image_group_number: groups are made of images with the same time index, field of view and z-level.
:type image_group_number: int
:param channel_offset: the offset in the array where the bytes for this image are found.
:type channel_offset: int
:return: (int, array.array()) or None
"""
chunk = self._label_map[six.b("ImageDataSeq|%d!" % image_group_number)]
data = self._read_chunk(chunk)
# All images in the same image group share the same timestamp! So if you have complicated image data,
# your timestamps may not be entirely accurate. Practically speaking though, they'll only be off by a few
# seconds unless you're doing something super weird.
timestamp = struct.unpack("d", data[:8])[0]
image_group_data = array.array("H", data)
image_data_start = 4 + channel_offset
# The images for the various channels are interleaved within the same array. For example, the second image
# of a four image group will be composed of bytes 2, 6, 10, etc. If you understand why someone would design
# a data structure that way, please send the author of this library a message.
image_data = image_group_data[image_data_start::len(self.channels)]
# Skip images that are all zeros! This is important, since NIS Elements creates blank "gap" images if you
# don't have the same number of images each cycle. We discovered this because we only took GFP images every
# other cycle to reduce phototoxicity, but NIS Elements still allocated memory as if we were going to take
# them every cycle.
if np.any(image_data):
return timestamp, image_data
return None
@property
def _dimensions(self):
"""
While there are metadata values that represent a lot of what we want to capture, they seem to be unreliable.
Sometimes certain elements don't exist, or change their data type randomly. However, the human-readable text
is always there and in the same exact format, so we just parse that instead.
:rtype: str
"""
if self._dimension_text is None:
for line in self.metadata[six.b('ImageTextInfo')][six.b('SLxImageTextInfo')].values():
if six.b("Dimensions:") in line:
metadata = line
break
else:
raise ValueError("Could not parse metadata dimensions!")
for line in metadata.split(six.b("\r\n")):
if line.startswith(six.b("Dimensions:")):
self._dimension_text = line
break
else:
raise ValueError("Could not parse metadata dimensions!")
return self._dimension_text
def _calculate_image_group_number(self, time_index, fov, z_level):
"""
Images are grouped together if they share the same time index, field of view, and z-level.
:type time_index: int
:type fov: int
:type z_level: int
:rtype: int
"""
return time_index * len(self.fields_of_view) * len(self.z_levels) + (fov * len(self.z_levels) + z_level)
def _calculate_frame_number(self, image_group_number, fov, z_level):
return (image_group_number - (fov * len(self.z_levels) + z_level)) / (len(self.fields_of_view) * len(self.z_levels))
@property
def _channel_offset(self):
"""
Image data is interleaved for each image set. That is, if there are four images in a set, the first image
will consist of pixels 1, 5, 9, etc, the second will be pixels 2, 6, 10, and so forth.
:rtype: dict
"""
channel_offset = {}
for n, channel in enumerate(self._channels):
channel_offset[channel] = n
return channel_offset
def _parse_dimension_text(self, pattern):
try:
count = int(re.match(pattern, self._dimensions).group(1))
except AttributeError:
return [0]
except TypeError:
match = re.match(pattern, self._dimensions.decode("utf8"))
if not match:
return [0]
return list(range(int(match.group(1))))
else:
return list(range(count))
@property
def _total_images_per_channel(self):
"""
The total number of images per channel. Warning: this may be inaccurate as it includes "gap" images.
:rtype: int
"""
return self.metadata[six.b('ImageAttributes')][six.b('SLxImageAttributes')][six.b('uiSequenceCount')]
def _parse_metadata(self):
"""
Reads all metadata.
"""
for label in self._label_map.keys():
if label.endswith(six.b("LV!")) or six.b("LV|") in label:
data = self._read_chunk(self._label_map[label])
stop = label.index(six.b("LV"))
self.metadata[label[:stop]] = self._read_metadata(data, 1)
def _read_map(self):
"""
Every label ends with an exclamation point, however, we can't directly search for those to find all the labels
as some of the bytes contain the value 33, which is the ASCII code for "!". So we iteratively find each label,
grab the subsequent data (always 16 bytes long), advance to the next label and repeat.
"""
self._file_handle.seek(-8, 2)
chunk_map_start_location = struct.unpack("Q", self._file_handle.read(8))[0]
self._file_handle.seek(chunk_map_start_location)
raw_text = self._file_handle.read(-1)
label_start = raw_text.index(Nd2Parser.CHUNK_MAP_START) + 32
while True:
data_start = raw_text.index(six.b("!"), label_start) + 1
key = raw_text[label_start: data_start]
location, length = struct.unpack("QQ", raw_text[data_start: data_start + 16])
if key == Nd2Parser.CHUNK_MAP_END:
# We've reached the end of the chunk map
break
self._label_map[key] = location
label_start = data_start + 16
def _read_chunk(self, chunk_location):
"""
Gets the data for a given chunk pointer
"""
self._file_handle.seek(chunk_location)
# The chunk metadata is always 16 bytes long
chunk_metadata = self._file_handle.read(16)
header, relative_offset, data_length = struct.unpack("IIQ", chunk_metadata)
if header != Nd2Parser.CHUNK_HEADER:
raise ValueError("The ND2 file seems to be corrupted.")
# We start at the location of the chunk metadata, skip over the metadata, and then proceed to the
# start of the actual data field, which is at some arbitrary place after the metadata.
self._file_handle.seek(chunk_location + 16 + relative_offset)
return self._file_handle.read(data_length)
def _parse_unsigned_char(self, data):
return struct.unpack("B", data.read(1))[0]
def _parse_unsigned_int(self, data):
return struct.unpack("I", data.read(4))[0]
def _parse_unsigned_long(self, data):
return struct.unpack("Q", data.read(8))[0]
def _parse_double(self, data):
return struct.unpack("d", data.read(8))[0]
def _parse_string(self, data):
value = data.read(2)
while not value.endswith(six.b("\x00\x00")):
# the string ends at the first instance of \x00\x00
value += data.read(2)
return value.decode("utf16")[:-1].encode("utf8")
def _parse_char_array(self, data):
array_length = struct.unpack("Q", data.read(8))[0]
return array.array("B", data.read(array_length))
def _parse_metadata_item(self, data):
"""
Reads hierarchical data, analogous to a Python dict.
"""
new_count, length = struct.unpack("<IQ", data.read(12))
length -= data.tell() - self._cursor_position
next_data_length = data.read(length)
value = self._read_metadata(next_data_length, new_count)
# Skip some offsets
data.read(new_count * 8)
return value
def _get_value(self, data, data_type):
"""
ND2s use various codes to indicate different data types, which we translate here.
"""
parser = {1: self._parse_unsigned_char,
2: self._parse_unsigned_int,
3: self._parse_unsigned_int,
5: self._parse_unsigned_long,
6: self._parse_double,
8: self._parse_string,
9: self._parse_char_array,
11: self._parse_metadata_item}
return parser[data_type](data)
def _read_metadata(self, data, count):
"""
Iterates over each element some section of the metadata and parses it.
"""
data = six.BytesIO(data)
metadata = {}
for _ in range(count):
self._cursor_position = data.tell()
header = data.read(2)
if not header:
# We've reached the end of some hierarchy of data
break
if six.PY3:
header = header.decode("utf8")
data_type, name_length = map(ord, header)
name = data.read(name_length * 2).decode("utf16")[:-1].encode("utf8")
value = self._get_value(data, data_type)
if name not in metadata.keys():
metadata[name] = value
else:
if not isinstance(metadata[name], list):
# We have encountered this key exactly once before. Since we're seeing it again, we know we
# need to convert it to a list before proceeding.
metadata[name] = [metadata[name]]
# We've encountered this key before so we're guaranteed to be dealing with a list. Thus we append
# the value to the already-existing list.
metadata[name].append(value)
return metadata

+ 183
- 183
tests/__init__.py View File

@ -1,183 +1,183 @@
from nd2reader.parser import Nd2Parser
import unittest
class MockNd2Parser(object):
def __init__(self, channels, fields_of_view, z_levels):
self.channels = channels
self.fields_of_view = fields_of_view
self.z_levels = z_levels
class TestNd2Parser(unittest.TestCase):
def test_calculate_field_of_view_simple(self):
""" With a single field of view, the field of view should always be the same number (0). """
nd2 = MockNd2Parser([''], [0], [0])
for frame_number in range(1000):
result = Nd2Parser._calculate_field_of_view(nd2, frame_number)
self.assertEqual(result, 0)
def test_calculate_field_of_view_two_channels(self):
nd2 = MockNd2Parser(['', 'GFP'], [0], [0])
for frame_number in range(1000):
result = Nd2Parser._calculate_field_of_view(nd2, frame_number)
self.assertEqual(result, 0)
def test_calculate_field_of_view_three_channels(self):
nd2 = MockNd2Parser(['', 'GFP', 'dsRed'], [0], [0])
for frame_number in range(1000):
result = Nd2Parser._calculate_field_of_view(nd2, frame_number)
self.assertEqual(result, 0)
def test_calculate_field_of_view_two_fovs(self):
nd2 = MockNd2Parser([''], [0, 1], [0])
for frame_number in range(1000):
result = Nd2Parser._calculate_field_of_view(nd2, frame_number)
self.assertEqual(result, frame_number % 2)
def test_calculate_field_of_view_two_fovs_two_zlevels(self):
nd2 = MockNd2Parser([''], [0, 1], [0, 1])
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 0), 0)
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 1), 0)
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 2), 1)
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 3), 1)
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 4), 0)
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 5), 0)
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 6), 1)
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 7), 1)
def test_calculate_field_of_view_two_everything(self):
nd2 = MockNd2Parser(['', 'GFP'], [0, 1], [0, 1])
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 0), 0)
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 1), 0)
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 2), 0)
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 3), 0)
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 4), 1)
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 5), 1)
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 6), 1)
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 7), 1)
def test_calculate_field_of_view_7c2f2z(self):
nd2 = MockNd2Parser(['', 'GFP', 'dsRed', 'dTomato', 'lulzBlue', 'jimbotronPurple', 'orange'], [0, 1], [0, 1])
for i in range(14):
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 0)
for i in range(14, 28):
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 1)
for i in range(28, 42):
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 0)
for i in range(42, 56):
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 1)
for i in range(56, 70):
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 0)
for i in range(70, 84):
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 1)
def test_calculate_field_of_view_2c3f5z(self):
""" All prime numbers to elucidate any errors that won't show up when numbers are multiples of each other """
nd2 = MockNd2Parser(['', 'GFP'], [0, 1, 2], [0, 1, 2, 3, 4])
for i in range(10):
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 0)
for i in range(10, 20):
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 1)
for i in range(20, 30):
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 2)
for i in range(30, 40):
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 0)
for i in range(40, 50):
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 1)
for i in range(50, 60):
self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 2)
def test_calculate_channel_simple(self):
nd2 = MockNd2Parser(['GFP'], [0], [0])
for i in range(1000):
self.assertEqual(Nd2Parser._calculate_channel(nd2, i), 'GFP')
def test_calculate_channel(self):
nd2 = MockNd2Parser(['', 'GFP', 'dsRed', 'dTomato', 'lulzBlue', 'jimbotronPurple', 'orange'], [0], [0])
for i in range(1000):
for n, channel in enumerate(['', 'GFP', 'dsRed', 'dTomato', 'lulzBlue', 'jimbotronPurple', 'orange'], start=i*7):
self.assertEqual(Nd2Parser._calculate_channel(nd2, n), channel)
def test_calculate_channel_7c2fov1z(self):
nd2 = MockNd2Parser(['', 'GFP', 'dsRed', 'dTomato', 'lulzBlue', 'jimbotronPurple', 'orange'], [0, 1], [0])
for i in range(1000):
for n, channel in enumerate(['', 'GFP', 'dsRed', 'dTomato', 'lulzBlue', 'jimbotronPurple', 'orange'], start=i*7):
self.assertEqual(Nd2Parser._calculate_channel(nd2, n), channel)
def test_calculate_channel_ludicrous_values(self):
nd2 = MockNd2Parser(['', 'GFP', 'dsRed', 'dTomato', 'lulzBlue', 'jimbotronPurple', 'orange'], list(range(31)), list(range(17)))
for i in range(10000):
for n, channel in enumerate(['', 'GFP', 'dsRed', 'dTomato', 'lulzBlue', 'jimbotronPurple', 'orange'], start=i*7):
self.assertEqual(Nd2Parser._calculate_channel(nd2, n), channel)
def test_calculate_z_level(self):
nd2 = MockNd2Parser([''], [0], [0])
for frame_number in range(1000):
result = Nd2Parser._calculate_z_level(nd2, frame_number)
self.assertEqual(result, 0)
def test_calculate_z_level_1c1f2z(self):
nd2 = MockNd2Parser([''], [0], [0, 1])
for frame_number in range(1000):
result = Nd2Parser._calculate_z_level(nd2, frame_number)
self.assertEqual(result, frame_number % 2)
def test_calculate_z_level_31c17f1z(self):
nd2 = MockNd2Parser(list(range(31)), list(range(17)), [0])
for frame_number in range(1000):
result = Nd2Parser._calculate_z_level(nd2, frame_number)
self.assertEqual(result, 0)
def test_calculate_z_level_2c1f2z(self):
nd2 = MockNd2Parser(['', 'GFP'], [0], [0, 1])
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 0), 0)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 1), 0)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 2), 1)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 3), 1)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 4), 0)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 5), 0)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 6), 1)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 7), 1)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 8), 0)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 9), 0)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 10), 1)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 11), 1)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 12), 0)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 13), 0)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 14), 1)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 15), 1)
def test_calculate_z_level_2c3f5z(self):
nd2 = MockNd2Parser(['', 'GFP'], [0, 1, 2], [0, 1, 2, 3, 4])
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 0), 0)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 1), 0)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 2), 1)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 3), 1)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 4), 2)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 5), 2)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 6), 3)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 7), 3)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 8), 4)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 9), 4)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 10), 0)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 11), 0)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 12), 1)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 13), 1)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 14), 2)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 15), 2)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 16), 3)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 17), 3)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 18), 4)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 19), 4)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 20), 0)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 21), 0)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 22), 1)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 23), 1)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 24), 2)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 25), 2)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 26), 3)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 27), 3)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 28), 4)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 29), 4)
self.assertEqual(Nd2Parser._calculate_z_level(nd2, 30), 0)
# from nd2reader.parser import Nd2Parser
# import unittest
#
#
# class MockNd2Parser(object):
# def __init__(self, channels, fields_of_view, z_levels):
# self.channels = channels
# self.fields_of_view = fields_of_view
# self.z_levels = z_levels
#
#
# class TestNd2Parser(unittest.TestCase):
# def test_calculate_field_of_view_simple(self):
# """ With a single field of view, the field of view should always be the same number (0). """
# nd2 = MockNd2Parser([''], [0], [0])
# for frame_number in range(1000):
# result = Nd2Parser._calculate_field_of_view(nd2, frame_number)
# self.assertEqual(result, 0)
#
# def test_calculate_field_of_view_two_channels(self):
# nd2 = MockNd2Parser(['', 'GFP'], [0], [0])
# for frame_number in range(1000):
# result = Nd2Parser._calculate_field_of_view(nd2, frame_number)
# self.assertEqual(result, 0)
#
# def test_calculate_field_of_view_three_channels(self):
# nd2 = MockNd2Parser(['', 'GFP', 'dsRed'], [0], [0])
# for frame_number in range(1000):
# result = Nd2Parser._calculate_field_of_view(nd2, frame_number)
# self.assertEqual(result, 0)
#
# def test_calculate_field_of_view_two_fovs(self):
# nd2 = MockNd2Parser([''], [0, 1], [0])
# for frame_number in range(1000):
# result = Nd2Parser._calculate_field_of_view(nd2, frame_number)
# self.assertEqual(result, frame_number % 2)
#
# def test_calculate_field_of_view_two_fovs_two_zlevels(self):
# nd2 = MockNd2Parser([''], [0, 1], [0, 1])
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 0), 0)
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 1), 0)
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 2), 1)
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 3), 1)
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 4), 0)
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 5), 0)
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 6), 1)
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 7), 1)
#
# def test_calculate_field_of_view_two_everything(self):
# nd2 = MockNd2Parser(['', 'GFP'], [0, 1], [0, 1])
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 0), 0)
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 1), 0)
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 2), 0)
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 3), 0)
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 4), 1)
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 5), 1)
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 6), 1)
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, 7), 1)
#
# def test_calculate_field_of_view_7c2f2z(self):
# nd2 = MockNd2Parser(['', 'GFP', 'dsRed', 'dTomato', 'lulzBlue', 'jimbotronPurple', 'orange'], [0, 1], [0, 1])
# for i in range(14):
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 0)
# for i in range(14, 28):
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 1)
# for i in range(28, 42):
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 0)
# for i in range(42, 56):
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 1)
# for i in range(56, 70):
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 0)
# for i in range(70, 84):
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 1)
#
# def test_calculate_field_of_view_2c3f5z(self):
# """ All prime numbers to elucidate any errors that won't show up when numbers are multiples of each other """
# nd2 = MockNd2Parser(['', 'GFP'], [0, 1, 2], [0, 1, 2, 3, 4])
# for i in range(10):
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 0)
# for i in range(10, 20):
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 1)
# for i in range(20, 30):
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 2)
# for i in range(30, 40):
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 0)
# for i in range(40, 50):
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 1)
# for i in range(50, 60):
# self.assertEqual(Nd2Parser._calculate_field_of_view(nd2, i), 2)
#
# def test_calculate_channel_simple(self):
# nd2 = MockNd2Parser(['GFP'], [0], [0])
# for i in range(1000):
# self.assertEqual(Nd2Parser._calculate_channel(nd2, i), 'GFP')
#
# def test_calculate_channel(self):
# nd2 = MockNd2Parser(['', 'GFP', 'dsRed', 'dTomato', 'lulzBlue', 'jimbotronPurple', 'orange'], [0], [0])
# for i in range(1000):
# for n, channel in enumerate(['', 'GFP', 'dsRed', 'dTomato', 'lulzBlue', 'jimbotronPurple', 'orange'], start=i*7):
# self.assertEqual(Nd2Parser._calculate_channel(nd2, n), channel)
#
# def test_calculate_channel_7c2fov1z(self):
# nd2 = MockNd2Parser(['', 'GFP', 'dsRed', 'dTomato', 'lulzBlue', 'jimbotronPurple', 'orange'], [0, 1], [0])
# for i in range(1000):
# for n, channel in enumerate(['', 'GFP', 'dsRed', 'dTomato', 'lulzBlue', 'jimbotronPurple', 'orange'], start=i*7):
# self.assertEqual(Nd2Parser._calculate_channel(nd2, n), channel)
#
# def test_calculate_channel_ludicrous_values(self):
# nd2 = MockNd2Parser(['', 'GFP', 'dsRed', 'dTomato', 'lulzBlue', 'jimbotronPurple', 'orange'], list(range(31)), list(range(17)))
# for i in range(10000):
# for n, channel in enumerate(['', 'GFP', 'dsRed', 'dTomato', 'lulzBlue', 'jimbotronPurple', 'orange'], start=i*7):
# self.assertEqual(Nd2Parser._calculate_channel(nd2, n), channel)
#
# def test_calculate_z_level(self):
# nd2 = MockNd2Parser([''], [0], [0])
# for frame_number in range(1000):
# result = Nd2Parser._calculate_z_level(nd2, frame_number)
# self.assertEqual(result, 0)
#
# def test_calculate_z_level_1c1f2z(self):
# nd2 = MockNd2Parser([''], [0], [0, 1])
# for frame_number in range(1000):
# result = Nd2Parser._calculate_z_level(nd2, frame_number)
# self.assertEqual(result, frame_number % 2)
#
# def test_calculate_z_level_31c17f1z(self):
# nd2 = MockNd2Parser(list(range(31)), list(range(17)), [0])
# for frame_number in range(1000):
# result = Nd2Parser._calculate_z_level(nd2, frame_number)
# self.assertEqual(result, 0)
#
# def test_calculate_z_level_2c1f2z(self):
# nd2 = MockNd2Parser(['', 'GFP'], [0], [0, 1])
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 0), 0)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 1), 0)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 2), 1)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 3), 1)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 4), 0)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 5), 0)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 6), 1)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 7), 1)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 8), 0)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 9), 0)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 10), 1)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 11), 1)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 12), 0)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 13), 0)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 14), 1)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 15), 1)
#
# def test_calculate_z_level_2c3f5z(self):
# nd2 = MockNd2Parser(['', 'GFP'], [0, 1, 2], [0, 1, 2, 3, 4])
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 0), 0)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 1), 0)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 2), 1)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 3), 1)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 4), 2)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 5), 2)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 6), 3)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 7), 3)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 8), 4)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 9), 4)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 10), 0)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 11), 0)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 12), 1)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 13), 1)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 14), 2)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 15), 2)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 16), 3)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 17), 3)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 18), 4)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 19), 4)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 20), 0)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 21), 0)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 22), 1)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 23), 1)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 24), 2)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 25), 2)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 26), 3)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 27), 3)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 28), 4)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 29), 4)
# self.assertEqual(Nd2Parser._calculate_z_level(nd2, 30), 0)

+ 0
- 0
tests/driver/__init__.py View File


+ 7
- 0
tests/driver/driver.py View File

@ -0,0 +1,7 @@
import unittest
from nd2reader.driver import get_driver
class TestDriver(unittest.TestCase):
def test_get_driver(self):
pass

+ 16
- 0
tests/driver/version.py View File

@ -0,0 +1,16 @@
import unittest
from nd2reader.driver.version import parse_version
class VersionTests(unittest.TestCase):
def test_parse_version_2(self):
data = 'ND2 FILE SIGNATURE CHUNK NAME01!Ver2.2'
actual = parse_version(data)
expected = (2, 2)
self.assertTupleEqual(actual, expected)
def test_parse_version_3(self):
data = 'ND2 FILE SIGNATURE CHUNK NAME01!Ver3.0'
actual = parse_version(data)
expected = (3, 0)
self.assertTupleEqual(actual, expected)

Loading…
Cancel
Save