Browse Source

refactored a bit

jim 10 years ago
4 changed files with 364 additions and 357 deletions
  1. +7
  2. +43
  3. +311
  4. +3

+ 7
- 338
nd2reader/ View File

@ -1,342 +1,11 @@
import array
import numpy as np
import struct
from collections import namedtuple
from StringIO import StringIO
from nd2reader.model import Channel
from pprint import pprint
import logging
from nd2reader.service import BaseNd2
chunk = namedtuple('Chunk', ['location', 'length'])
field_of_view = namedtuple('FOV', ['number', 'x', 'y', 'z', 'pfs_offset'])
log = logging.getLogger("nd2reader")
class Nd2(object):
class Nd2(BaseNd2):
def __init__(self, filename):
self._parser = Nd2Parser(filename)
def timepoint_count(self):
return len(self._parser.metadata['ImageEvents']['RLxExperimentRecord']['pEvents'][''])
def height(self):
return self._parser.metadata['ImageAttributes']['SLxImageAttributes']['uiHeight']
def width(self):
return self._parser.metadata['ImageAttributes']['SLxImageAttributes']['uiWidth']
def fields_of_view(self):
Fields of view are the various places in the xy-plane where images were taken.
# Grab all the metadata about fields of view
fov_metadata = self.metadata['ImageMetadata']['SLxExperiment']['ppNextLevelEx']['']
# The attributes include x, y, and z coordinates, and perfect focus (PFS) offset
fov_attributes = fov_metadata['uLoopPars']['Points']['']
# If you crop fields of view from your ND2 file, the metadata is retained and only this list is
# updated to indicate that the fields of view have been deleted.
fov_validity = fov_metadata['pItemValid']
# We only yield valid (i.e. uncropped) fields of view
for number, (fov, valid) in enumerate(zip(fov_attributes, fov_validity)):
if valid:
yield field_of_view(number=number + 1,
def fov_count(self):
The metadata contains information about fields of view, but it contains it even if some fields
of view were cropped. We can't find anything that states which fields of view are actually
in the image data, so we have to calculate it. There probably is something somewhere, since
NIS Elements can figure it out, but we haven't found it yet.
return sum(self.metadata['ImageMetadata']['SLxExperiment']['ppNextLevelEx']['']['pItemValid'])
def channels(self):
metadata = self.metadata['ImageMetadataSeq']['SLxPictureMetadata']['sPicturePlanes']
validity = self.metadata['ImageMetadata']['SLxExperiment']['ppNextLevelEx']['']['ppNextLevelEx']['']['pItemValid']
# Channel information is contained in dictionaries with the keys a0, where the number
# indicates the order in which the channel is stored. So by sorting the dicts alphabetically
# we get the correct order.
for (label, chan), valid in zip(sorted(metadata['sPlaneNew'].items()), validity):
if not valid:
name = chan['sDescription']
exposure_time = metadata['sSampleSetting'][label]['dExposureTime']
camera = metadata['sSampleSetting'][label]['pCameraSetting']['CameraUserName']
yield Channel(name, camera, exposure_time)
def channel_count(self):
return self.metadata['ImageAttributes']["SLxImageAttributes"]["uiComp"]
def zoom_levels(self):
for i in self.metadata['ImageMetadata']['SLxExperiment']['ppNextLevelEx']['']['ppNextLevelEx'][''].items():
yield i
def z_level_count(self):
The number of different z-axis levels.
return 1
def metadata(self):
return self._parser.metadata
def get_images(self, fov_number, channel_name, z_axis):
def get_image(self, nr):
d = self._parser._read_chunk(self._parser._label_map["ImageDataSeq|%d!" % nr].location)
timestamp = struct.unpack("d", d[:8])[0]
res = [timestamp]
# The images for the various channels are interleaved within each other.
for i in range(self.channel_count):
a = array.array("H", d)
# TODO: Are you missing a zoom level? Is there extra data here? Can you get timestamps now?
return res
class Nd2Parser(object):
Reads .nd2 files, provides an interface to the metadata, and generates numpy arrays from the image data.
def __init__(self, filename):
self._filename = filename
self._file_handler = None
self._chunk_map_start_location = None
self._label_map = {}
self._metadata = {}
def fh(self):
if self._file_handler is None:
self._file_handler = open(self._filename, "rb")
return self._file_handler
def _parse_dict_data(self):
# TODO: Don't like this name
for label in self._top_level_dict_labels:
chunk_location = self._label_map[label].location
data = self._read_chunk(chunk_location)
stop = label.index("LV")
self._metadata[label[:stop]] = self.read_lv_encoding(data, 1)
def metadata(self):
return self._metadata
def _top_level_dict_labels(self):
# TODO: I don't like this name either
for label in self._label_map.keys():
if label.endswith("LV!") or "LV|" in label:
yield label
def _read_map(self):
Every label ends with an exclamation point, however, we can't directly search for those to find all the labels
as some of the bytes contain the value 33, which is the ASCII code for "!". So we iteratively find each label,
grab the subsequent data (always 16 bytes long), advance to the next label and repeat.
raw_text = self._get_raw_chunk_map_text()
label_start = self._find_first_label_offset(raw_text)
while True:
data_start = self._get_data_start(label_start, raw_text)
label, value = self._extract_map_key(label_start, data_start, raw_text)
if label == "ND2 CHUNK MAP SIGNATURE 0000001!":
# We've reached the end of the chunk map
self._label_map[label] = value
label_start = data_start + 16
def _find_first_label_offset(raw_text):
The chunk map starts with some number of (seemingly) useless bytes, followed
by "ND2 FILEMAP SIGNATURE NAME 0001!". We return the location of the first character after this sequence,
which is the actual beginning of the chunk map.
return raw_text.index("ND2 FILEMAP SIGNATURE NAME 0001!") + 32
def _get_data_start(label_start, raw_text):
The data for a given label begins immediately after the first exclamation point
return raw_text.index("!", label_start) + 1
def _extract_map_key(label_start, data_start, raw_text):
Chunk map entries are a string label of arbitrary length followed by 16 bytes of data, which represent
the byte offset from the beginning of the file where that data can be found.
key = raw_text[label_start: data_start]
location, length = struct.unpack("QQ", raw_text[data_start: data_start + 16])
return key, chunk(location=location, length=length)
def chunk_map_start_location(self):
The position in bytes from the beginning of the file where the chunk map begins.
The chunk map is a series of string labels followed by the position (in bytes) of the respective data.
if self._chunk_map_start_location is None:
# Put the cursor 8 bytes before the end of the file, 2)
# Read the last 8 bytes of the file
self._chunk_map_start_location = struct.unpack("Q",[0]
return self._chunk_map_start_location
def _read_chunk(self, chunk_location):
Gets the data for a given chunk pointer
chunk_data = self._read_chunk_metadata()
header, relative_offset, data_length = self._parse_chunk_metadata(chunk_data)
return self._read_chunk_data(chunk_location, relative_offset, data_length)
def _read_chunk_metadata(self):
Gets the chunks metadata, which is always 16 bytes
def _read_chunk_data(self, chunk_location, relative_offset, data_length):
Reads the actual data for a given chunk
# We start at the location of the chunk metadata, skip over the metadata, and then proceed to the
# start of the actual data field, which is at some arbitrary place after the metadata. + 16 + relative_offset)
def _parse_chunk_metadata(chunk_data):
Finds out everything about a given chunk. Every chunk begins with the same value, so if that's ever
different we can assume the file has suffered some kind of damage.
header, relative_offset, data_length = struct.unpack("IIQ", chunk_data)
if header != 0xabeceda:
raise ValueError("The ND2 file seems to be corrupted.")
return header, relative_offset, data_length
def _get_raw_chunk_map_text(self):
Reads the entire chunk map and returns it as a string.
def as_numpy_array(arr):
return np.frombuffer(arr)
def read_lv_encoding(self, data, count):
data = StringIO(data)
res = {}
for c in range(count):
lastpos = data.tell()
hdr =
if not hdr:
typ = ord(hdr[0])
bname =*ord(hdr[1]))
name = bname.decode("utf16")[:-1].encode("utf8")
if typ == 1:
value, = struct.unpack("B",
elif typ in [2, 3]:
value, = struct.unpack("I",
elif typ == 5:
value, = struct.unpack("Q",
elif typ == 6:
value, = struct.unpack("d",
elif typ == 8:
value =
while value[-2:] != "\x00\x00":
value +=
value = value.decode("utf16")[:-1].encode("utf8")
elif typ == 9:
cnt, = struct.unpack("Q",
value = array.array("B",
elif typ == 11:
newcount, length = struct.unpack("<IQ",
length -= data.tell()-lastpos
nextdata =
value = self.read_lv_encoding(nextdata, newcount)
# XXX do not know for what these offsets? are
unknown = array.array("I",*8))
assert 0, "%s hdr %x:%x unknown" % (name, ord(hdr[0]), ord(hdr[1]))
if not name in res:
res[name] = value
if not isinstance(res[name], list):
res[name] = [res[name]]
x =
assert not x, "skip %d %s" % (len(x), repr(x[:30]))
return res
# class LVLine(object):
# def __init__(self, line):
# self._line = line
# self._extract()
# def _extract(self):
# if self._type == 11:
# count, length = struct.unpack("<IQ", self._line[self._name_end: self._name_end + 12])
# newline = self._line[self._name_end + 12:]
# @property
# def name(self):
# return self._line[2: self._name_end].decode("utf16").encode("utf8")
# @property
# def _type(self):
# return ord(self._line[0])
# @property
# def _name_end(self):
# """
# Length is given as number of characters, but since it's unicode (which is two-bytes per character) we return
# twice the number.
# """
# return ord(self._line[1]) * 2
# class LVData(object):
# def __init__(self, data):
# self._extracted_data = LVLine(data)
super(Nd2, self).__init__(filename)

+ 43
- 2
nd2reader/model/ View File

@ -1,3 +1,7 @@
import numpy as np
class Channel(object):
def __init__(self, name, camera, exposure_time):
self._name = name
@ -19,6 +23,43 @@ class Channel(object):
return self._exposure_time
class Image(object):
class ImageSet(object):
A group of images that share the same timestamp. NIS Elements doesn't store a unique timestamp for every
image, rather, it stores one for each set of images that share the same field of view and z-axis level.
def __init__(self):
self.timestamp = None
self._images = []
def add(self, image):
:type image: nd2reader.model.Image()
def __iter__(self):
for image in self._images:
yield image
class Image(object):
def __init__(self, timestamp, raw_array, height, width):
self._timestamp = timestamp
self._raw_data = raw_array
self._height = height
self._width = width
def timestamp(self):
# TODO: Convert to datetime object
return self._timestamp
def data(self):
return np.reshape(self._raw_data, (self._height, self._width))
def show(self):

+ 311
- 0
nd2reader/service/ View File

@ -0,0 +1,311 @@
import array
import numpy as np
import struct
from StringIO import StringIO
from collections import namedtuple
import logging
from nd2reader.model import Channel, ImageSet, Image
log = logging.getLogger("nd2reader")
chunk = namedtuple('Chunk', ['location', 'length'])
field_of_view = namedtuple('FOV', ['number', 'x', 'y', 'z', 'pfs_offset'])
class BaseNd2(object):
def __init__(self, filename):
self._parser = Nd2Reader(filename)
def height(self):
return self._parser.metadata['ImageAttributes']['SLxImageAttributes']['uiHeight']
def width(self):
return self._parser.metadata['ImageAttributes']['SLxImageAttributes']['uiWidth']
def _get_timepoint_count(self):
return len(self._parser.metadata['ImageEvents']['RLxExperimentRecord']['pEvents'][''])
def _fields_of_view(self):
Fields of view are the various places in the xy-plane where images were taken.
# Grab all the metadata about fields of view
fov_metadata = self._metadata['ImageMetadata']['SLxExperiment']['ppNextLevelEx']['']
# The attributes include x, y, and z coordinates, and perfect focus (PFS) offset
fov_attributes = fov_metadata['uLoopPars']['Points']['']
# If you crop fields of view from your ND2 file, the metadata is retained and only this list is
# updated to indicate that the fields of view have been deleted.
fov_validity = fov_metadata['pItemValid']
# We only yield valid (i.e. uncropped) fields of view
for number, (fov, valid) in enumerate(zip(fov_attributes, fov_validity)):
if valid:
yield field_of_view(number=number + 1,
def _fov_count(self):
The metadata contains information about fields of view, but it contains it even if some fields
of view were cropped. We can't find anything that states which fields of view are actually
in the image data, so we have to calculate it. There probably is something somewhere, since
NIS Elements can figure it out, but we haven't found it yet.
return sum(self._metadata['ImageMetadata']['SLxExperiment']['ppNextLevelEx']['']['pItemValid'])
def _channels(self):
metadata = self._metadata['ImageMetadataSeq']['SLxPictureMetadata']['sPicturePlanes']
validity = self._metadata['ImageMetadata']['SLxExperiment']['ppNextLevelEx']['']['ppNextLevelEx']['']['pItemValid']
# Channel information is contained in dictionaries with the keys a0, where the number
# indicates the order in which the channel is stored. So by sorting the dicts alphabetically
# we get the correct order.
for (label, chan), valid in zip(sorted(metadata['sPlaneNew'].items()), validity):
if not valid:
name = chan['sDescription']
exposure_time = metadata['sSampleSetting'][label]['dExposureTime']
camera = metadata['sSampleSetting'][label]['pCameraSetting']['CameraUserName']
yield Channel(name, camera, exposure_time)
def _channel_count(self):
return self._metadata['ImageAttributes']["SLxImageAttributes"]["uiComp"]
# @property
# def _z_levels(self):
# for i in self._metadata['ImageMetadata']['SLxExperiment']['ppNextLevelEx']['']['ppNextLevelEx'][''].items():
# yield i
# @property
# def _z_level_count(self):
# """
# The number of different z-axis levels.
# """
# return 1
def _metadata(self):
return self._parser.metadata
def _get_image_set(self, nr):
chunk = self._parser._label_map["ImageDataSeq|%d!" % nr]
d = self._parser._read_chunk(chunk.location)
timestamp = struct.unpack("d", d[:8])[0]
image_set = ImageSet()
# The images for the various channels are interleaved within each other.
for i in range(self._channel_count):
image_data = array.array("H", d)
image = Image(timestamp, image_data[4+i::self._channel_count], self.height, self.width)
return image_set
class Nd2Reader(object):
Reads .nd2 files, provides an interface to the metadata, and generates numpy arrays from the image data.
def __init__(self, filename):
self._filename = filename
self._file_handler = None
self._chunk_map_start_location = None
self._label_map = {}
self._metadata = {}
def fh(self):
if self._file_handler is None:
self._file_handler = open(self._filename, "rb")
return self._file_handler
def _parse_dict_data(self):
# TODO: Don't like this name
for label in self._top_level_dict_labels:
chunk_location = self._label_map[label].location
data = self._read_chunk(chunk_location)
stop = label.index("LV")
self._metadata[label[:stop]] = self.read_lv_encoding(data, 1)
def metadata(self):
return self._metadata
def _top_level_dict_labels(self):
# TODO: I don't like this name either
for label in self._label_map.keys():
if label.endswith("LV!") or "LV|" in label:
yield label
def _read_map(self):
Every label ends with an exclamation point, however, we can't directly search for those to find all the labels
as some of the bytes contain the value 33, which is the ASCII code for "!". So we iteratively find each label,
grab the subsequent data (always 16 bytes long), advance to the next label and repeat.
raw_text = self._get_raw_chunk_map_text()
label_start = self._find_first_label_offset(raw_text)
while True:
data_start = self._get_data_start(label_start, raw_text)
label, value = self._extract_map_key(label_start, data_start, raw_text)
if label == "ND2 CHUNK MAP SIGNATURE 0000001!":
# We've reached the end of the chunk map
self._label_map[label] = value
label_start = data_start + 16
def _find_first_label_offset(raw_text):
The chunk map starts with some number of (seemingly) useless bytes, followed
by "ND2 FILEMAP SIGNATURE NAME 0001!". We return the location of the first character after this sequence,
which is the actual beginning of the chunk map.
return raw_text.index("ND2 FILEMAP SIGNATURE NAME 0001!") + 32
def _get_data_start(label_start, raw_text):
The data for a given label begins immediately after the first exclamation point
return raw_text.index("!", label_start) + 1
def _extract_map_key(label_start, data_start, raw_text):
Chunk map entries are a string label of arbitrary length followed by 16 bytes of data, which represent
the byte offset from the beginning of the file where that data can be found.
key = raw_text[label_start: data_start]
location, length = struct.unpack("QQ", raw_text[data_start: data_start + 16])
return key, chunk(location=location, length=length)
def chunk_map_start_location(self):
The position in bytes from the beginning of the file where the chunk map begins.
The chunk map is a series of string labels followed by the position (in bytes) of the respective data.
if self._chunk_map_start_location is None:
# Put the cursor 8 bytes before the end of the file, 2)
# Read the last 8 bytes of the file
self._chunk_map_start_location = struct.unpack("Q",[0]
return self._chunk_map_start_location
def _read_chunk(self, chunk_location):
Gets the data for a given chunk pointer
chunk_data = self._read_chunk_metadata()
header, relative_offset, data_length = self._parse_chunk_metadata(chunk_data)
return self._read_chunk_data(chunk_location, relative_offset, data_length)
def _read_chunk_metadata(self):
Gets the chunks metadata, which is always 16 bytes
def _read_chunk_data(self, chunk_location, relative_offset, data_length):
Reads the actual data for a given chunk
# We start at the location of the chunk metadata, skip over the metadata, and then proceed to the
# start of the actual data field, which is at some arbitrary place after the metadata. + 16 + relative_offset)
def _parse_chunk_metadata(chunk_data):
Finds out everything about a given chunk. Every chunk begins with the same value, so if that's ever
different we can assume the file has suffered some kind of damage.
header, relative_offset, data_length = struct.unpack("IIQ", chunk_data)
if header != 0xabeceda:
raise ValueError("The ND2 file seems to be corrupted.")
return header, relative_offset, data_length
def _get_raw_chunk_map_text(self):
Reads the entire chunk map and returns it as a string.
def as_numpy_array(arr):
return np.frombuffer(arr)
def read_lv_encoding(self, data, count):
data = StringIO(data)
res = {}
total_count = 0
for c in range(count):
lastpos = data.tell()
total_count += 1
# log.debug("%s: %s" % (total_count, lastpos))
hdr =
if not hdr:
typ = ord(hdr[0])
bname =*ord(hdr[1]))
name = bname.decode("utf16")[:-1].encode("utf8")
if typ == 1:
value, = struct.unpack("B",
elif typ in [2, 3]:
value, = struct.unpack("I",
elif typ == 5:
value, = struct.unpack("Q",
elif typ == 6:
value, = struct.unpack("d",
elif typ == 8:
value =
while value[-2:] != "\x00\x00":
value +=
value = value.decode("utf16")[:-1].encode("utf8")
elif typ == 9:
cnt, = struct.unpack("Q",
value = array.array("B",
elif typ == 11:
newcount, length = struct.unpack("<IQ",
length -= data.tell()-lastpos
nextdata =
value = self.read_lv_encoding(nextdata, newcount)
# Skip some offsets * 8)
assert 0, "%s hdr %x:%x unknown" % (name, ord(hdr[0]), ord(hdr[1]))
if not name in res:
res[name] = value
if not isinstance(res[name], list):
res[name] = [res[name]]
x =
assert not x, "skip %d %s" % (len(x), repr(x[:30]))
return res

+ 3
- 17 View File

@ -5,20 +5,6 @@ from skimage import io
# n = Nd2("/home/jim/Desktop/nd2hacking/test-141111.nd2")
n = Nd2("/home/jim/Desktop/nd2hacking/YFP-dsRed-GFP-BF.nd2")
# print("Height: ", n.height)
# print("Width: ", n.width)
# for fov in n.fields_of_view:
# print(fov.number, fov.x, fov.y, fov.z, fov.pfs_offset)
for chan in n.channels:
# pprint(len(n.metadata['ImageMetadata']['SLxExperiment']['uLoopPars']['pPeriod']['']))
# res = n.get_image(6)
# print(res[0])
# arr = np.reshape(res[1], (n.height, n.width))
# io.imshow(arr)
# pprint(n.metadata)
image_set = n._get_image_set(3)
for image in image_set:
