Browse Source

Merge pull request #35 from jimrybarski/pypi-refactor

Pypi refactor
zolfa-add_slices_loading
Jim Rybarski 9 years ago
parent
commit
e7a2f821c7
16 changed files with 709 additions and 728 deletions
  1. +4
    -0
      CONTRIBUTORS.txt
  2. +13
    -0
      Dockerfile
  3. +0
    -0
      LICENSE.txt
  4. +7
    -0
      Makefile
  5. +107
    -11
      README.md
  6. +86
    -29
      nd2reader/__init__.py
  7. +94
    -214
      nd2reader/model/__init__.py
  8. +378
    -0
      nd2reader/parser.py
  9. +0
    -439
      nd2reader/service/__init__.py
  10. +1
    -0
      requirements.txt
  11. +2
    -0
      setup.cfg
  12. +17
    -6
      setup.py
  13. +0
    -9
      tests.py
  14. +0
    -0
      tests/__init__.py
  15. +0
    -20
      tests/model/__init__.py
  16. +0
    -0
      tests/service/__init__.py

+ 4
- 0
CONTRIBUTORS.txt View File

@ -0,0 +1,4 @@
Author: Jim Rybarski <jim@rybarski.com>
nd2reader is based on the read_nd2 module from the SLOTH library (http://pythonhosted.org/SLOTH/_modules/sloth/read_nd2.html).
Thanks to M.Kauer and B.Kauer for solving the hardest part of parsing ND2s.

+ 13
- 0
Dockerfile View File

@ -0,0 +1,13 @@
FROM ubuntu
MAINTAINER Jim Rybarski <jim@rybarski.com>
RUN mkdir -p /var/nds2
RUN apt-get update && apt-get install -y \
python-numpy
COPY . /opt/nd2reader
WORKDIR /opt/nd2reader
RUN python setup.py install
WORKDIR /var/nd2s
CMD /usr/bin/python2.7

LICENSE → LICENSE.txt View File


+ 7
- 0
Makefile View File

@ -0,0 +1,7 @@
.PHONY: build shell
build:
docker build -t jimrybarski/nd2reader .
shell:
docker run --rm -v ~/Documents/nd2s:/var/nd2s -it jimrybarski/nd2reader

+ 107
- 11
README.md View File

@ -1,20 +1,116 @@
nd2reader
=========
# nd2reader
## Simple access to hierarchical .nd2 files
### About
# About
`nd2reader` is a pure-Python package that reads images produced by NIS Elements.
`nd2reader` is a pure-Python package that reads images produced by Nikon microscopes. Though it more or less works, it is currently under development and is not quite ready for use by the general public. Version 1.0 should be released in early 2015.
.nd2 files contain images and metadata, which can be split along multiple dimensions: time, fields of view (xy-plane), focus (z-plane), and filter channel.
.nd2 files contain images and metadata, which can be split along multiple dimensions: time, fields of view (xy-axis), focus (z-axis), and filter channel. `nd2reader` allows you to view any subset of images based on any or all of these dimensions.
`nd2reader` produces data in numpy arrays, which makes it trivial to use with the image analysis packages such as `scikit-image` and `OpenCV`.
`nd2reader` holds data in numpy arrays, which makes it trivial to use with the image analysis packages `scikit-image` and `OpenCV`.
### Installation
# Dependencies
Just use pip:
numpy
`pip install nd2reader`
# Installation
If you want to install via git, clone the repo and run:
I'll write this eventually.
`python setup.py install`
### ND2s
A quick summary of ND2 metadata can be obtained as shown below.
```python
>>> import nd2reader
>>> nd2 = nd2reader.Nd2("/path/to/my_images.nd2")
>>> nd2
<ND2 /path/to/my_images.nd2>
Created: 2014-11-11 15:59:19
Image size: 1280x800 (HxW)
Image cycles: 636
Channels: '', 'GFP'
Fields of View: 8
Z-Levels: 3
```
### Simple Iteration
For most cases, you'll just want to iterate over each image:
```python
import nd2reader
nd2 = nd2reader.Nd2("/path/to/my_images.nd2")
for image in nd2:
do_something(image.data)
```
### Image Sets
If you have complicated hierarchical data, it may be easier to use image sets, which groups images together if they
share the same time index and field of view:
```python
import nd2reader
nd2 = nd2reader.Nd2("/path/to/my_complicated_images.nd2")
for image_set in nd2.image_sets:
# you can select images by channel
gfp_image = image_set.get("GFP")
do_something_gfp_related(gfp_image)
# you can also specify the z-level. this defaults to 0 if not given
out_of_focus_image = image_set.get("Bright Field", z_level=1)
do_something_out_of_focus_related(out_of_focus_image)
```
### Direct Image Access
There is a method, `get_image`, which allows random access to images. This might not always return an image, however,
if you acquired different numbers of images in each cycle of a program. For example, if you acquire GFP images every
other minute, but acquire bright field images every minute, `get_image` will return `None` at certain time indexes.
### Images
`Image` objects provide several pieces of useful data.
```python
>>> import nd2reader
>>> nd2 = nd2reader.Nd2("/path/to/my_images.nd2")
>>> image = nd2.get_image(14, 2, "GFP", 1)
>>> image.data
array([[1809, 1783, 1830, ..., 1923, 1920, 1914],
[1687, 1855, 1792, ..., 1986, 1903, 1889],
[1758, 1901, 1849, ..., 1911, 2010, 1954],
...,
[3363, 3370, 3570, ..., 3565, 3601, 3459],
[3480, 3428, 3328, ..., 3542, 3461, 3575],
[3497, 3666, 3635, ..., 3817, 3867, 3779]])
>>> image.channel
'GFP'
>>> image.timestamp
1699.7947813408175
>>> image.field_of_view
2
>>> image.z_level
1
# You can also get a quick summary of image data:
>>> image
<ND2 Image>
1280x800 (HxW)
Timestamp: 1699.79478134
Field of View: 2
Channel: GFP
Z-Level: 1
```
### Bug Reports and Features
If this fails to work exactly as expected, please open a Github issue. If you get an unhandled exception, please
paste the entire stack trace into the issue as well.
### Contributing
Please feel free to submit a pull request with any new features you think would be useful. You can also create an
issue if you'd just like to propose or discuss a potential idea.

+ 86
- 29
nd2reader/__init__.py View File

@ -1,47 +1,104 @@
import logging
from nd2reader.service import BaseNd2
# -*- coding: utf-8 -*-
from nd2reader.model import Image, ImageSet
from nd2reader.parser import Nd2Parser
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
class Nd2(Nd2Parser):
"""
Allows easy access to NIS Elements .nd2 image files.
class Nd2(BaseNd2):
"""
def __init__(self, filename):
super(Nd2, self).__init__(filename)
self._filename = filename
def __repr__(self):
return "\n".join(["<ND2 %s>" % self._filename,
"Created: %s" % self._absolute_start.strftime("%Y-%m-%d %H:%M:%S"),
"Image size: %sx%s (HxW)" % (self.height, self.width),
"Image cycles: %s" % self._time_index_count,
"Channels: %s" % ", ".join(["'%s'" % channel for channel in self._channels]),
"Fields of View: %s" % self._field_of_view_count,
"Z-Levels: %s" % self._z_level_count
])
def get_image(self, time_index, fov, channel_name, z_level):
image_set_number = self._calculate_image_set_number(time_index, fov, z_level)
timestamp, raw_image_data = self._reader.get_raw_image_data(image_set_number, self.channel_offset[channel_name])
return Image(timestamp, raw_image_data, fov, channel_name, z_level, self.height, self.width)
@property
def height(self):
"""
:return: height of each image, in pixels
:rtype: int
"""
return self.metadata['ImageAttributes']['SLxImageAttributes']['uiHeight']
@property
def width(self):
"""
:return: width of each image, in pixels
:rtype: int
"""
return self.metadata['ImageAttributes']['SLxImageAttributes']['uiWidth']
def __iter__(self):
"""
Just return every image in order (might not be exactly the order that the images were physically taken, but it will
be within a few seconds). A better explanation is probably needed here.
Iterates over every image, in the order they were taken.
:return: model.Image()
"""
for i in range(self._image_count):
for fov in range(self.field_of_view_count):
for z_level in range(self.z_level_count):
for channel in self.channels:
image = self.get_image(i, fov, channel.name, z_level)
if image.is_valid:
for fov in range(self._field_of_view_count):
for z_level in range(self._z_level_count):
for channel_name in self._channels:
image = self.get_image(i, fov, channel_name, z_level)
if image is not None:
yield image
def image_sets(self, field_of_view, time_indices=None, channels=None, z_levels=None):
"""
Gets all the images for a given field of view and
@property
def image_sets(self):
"""
timepoint_set = xrange(self.time_index_count) if time_indices is None else time_indices
channel_set = [channel.name for channel in self.channels] if channels is None else channels
z_level_set = xrange(self.z_level_count) if z_levels is None else z_levels
Iterates over groups of related images. This is useful if your ND2 contains multiple fields of view.
A typical use case might be that you have, say, four areas of interest that you're monitoring, and every
minute you take a bright field and GFP image of each one. For each cycle, this method would produce four
ImageSet objects, each containing one bright field and one GFP image.
:return: model.ImageSet()
for timepoint in timepoint_set:
"""
for time_index in xrange(self._time_index_count):
image_set = ImageSet()
for channel_name in channel_set:
for z_level in z_level_set:
image = self.get_image(timepoint, field_of_view, channel_name, z_level)
if image.is_valid:
image_set.add(image)
yield image_set
for fov in range(self._field_of_view_count):
for channel_name in self._channels:
for z_level in xrange(self._z_level_count):
image = self.get_image(time_index, fov, channel_name, z_level)
if image is not None:
image_set.add(image)
yield image_set
def get_image(self, time_index, field_of_view, channel_name, z_level):
"""
Returns an Image if data exists for the given parameters, otherwise returns None. In general, you should avoid
using this method unless you're very familiar with the structure of ND2 files. If you have a use case that
cannot be met by the `__iter__` or `image_sets` methods above, please create an issue on Github.
:param time_index: the frame number
:type time_index: int
:param field_of_view: the label for the place in the XY-plane where this image was taken.
:type field_of_view: int
:param channel_name: the name of the color of this image
:type channel_name: str
:param z_level: the label for the location in the Z-plane where this image was taken.
:type z_level: int
:rtype: nd2reader.model.Image() or None
"""
image_set_number = self._calculate_image_group_number(time_index, field_of_view, z_level)
try:
timestamp, raw_image_data = self._get_raw_image_data(image_set_number, self._channel_offset[channel_name])
image = Image(timestamp, raw_image_data, field_of_view, channel_name, z_level, self.height, self.width)
except TypeError:
return None
else:
return image

+ 94
- 214
nd2reader/model/__init__.py View File

@ -1,56 +1,33 @@
# -*- coding: utf-8 -*-
import collections
import numpy as np
import skimage.io
import logging
from io import BytesIO
import array
import struct
log = logging.getLogger(__name__)
class Channel(object):
def __init__(self, name, camera, exposure_time):
self._name = name
self._camera = camera
self._exposure_time = exposure_time
@property
def name(self):
return self._name
@property
def camera(self):
return self._camera
@property
def exposure_time(self):
return self._exposure_time
class ImageSet(object):
"""
A group of images that share the same timestamp. NIS Elements doesn't store a unique timestamp for every
image, rather, it stores one for each set of images that share the same field of view and z-axis level.
"""
def __init__(self):
self._images = []
def add(self, image):
class Image(object):
def __init__(self, timestamp, raw_array, field_of_view, channel, z_level, height, width):
"""
:type image: nd2reader.model.Image()
A wrapper around the raw pixel data of an image.
:param timestamp: The number of milliseconds after the beginning of the acquisition that this image was taken.
:type timestamp: int
:param raw_array: The raw sequence of bytes that represents the image.
:type raw_array: array.array()
:param field_of_view: The label for the place in the XY-plane where this image was taken.
:type field_of_view: int
:param channel: The name of the color of this image
:type channel: str
:param z_level: The label for the location in the Z-plane where this image was taken.
:type z_level: int
:param height: The height of the image in pixels.
:type height: int
:param width: The width of the image in pixels.
:type width: int
"""
self._images.append(image)
def __iter__(self):
for image in self._images:
yield image
class Image(object):
def __init__(self, timestamp, raw_array, field_of_view, channel, z_level, height, width):
self._timestamp = timestamp
self._raw_data = raw_array
self._field_of_view = field_of_view
@ -60,208 +37,111 @@ class Image(object):
self._width = width
self._data = None
def __repr__(self):
return "\n".join(["<ND2 Image>",
"%sx%s (HxW)" % (self._height, self._width),
"Timestamp: %s" % self.timestamp,
"Field of View: %s" % self.field_of_view,
"Channel: %s" % self.channel,
"Z-Level: %s" % self.z_level,
])
@property
def data(self):
"""
The actual image data.
:rtype np.array()
"""
if self._data is None:
# The data is just a 1-dimensional array originally.
# We convert it to a 2D image here.
self._data = np.reshape(self._raw_data, (self._height, self._width))
return self._data
@property
def field_of_view(self):
"""
Which of the fixed locations this image was taken at.
:rtype int:
"""
return self._field_of_view
@property
def timestamp(self):
"""
The number of seconds after the beginning of the acquisition that the image was taken. Note that for a given field
of view and z-level offset, if you have images of multiple channels, they will all be given the same timestamp.
No, this doesn't make much sense. But that's how ND2s are structured, so if your experiment depends on millisecond
accuracy, you need to find an alternative imaging system.
The number of seconds after the beginning of the acquisition that the image was taken. Note that for a given
field of view and z-level offset, if you have images of multiple channels, they will all be given the same
timestamp. No, this doesn't make much sense. But that's how ND2s are structured, so if your experiment depends
on millisecond accuracy, you need to find an alternative imaging system.
:rtype float:
"""
return self._timestamp / 1000.0
@property
def channel(self):
"""
The name of the filter used to acquire this image. These are user-supplied in NIS Elements.
:rtype str:
"""
return self._channel
@property
def z_level(self):
return self._z_level
@property
def data(self):
if self._data is None:
# The data is just a flat, 1-dimensional array. We convert it to a 2D array and cast the data points as 16-bit integers
self._data = np.reshape(self._raw_data, (self._height, self._width)).astype(np.int64).astype(np.uint16)
return self._data
"""
The vertical offset of the image. These are simple integers starting from 0, where the 0 is the lowest
z-level and each subsequent level incremented by 1.
@property
def is_valid(self):
return np.any(self.data)
For example, if you acquired images at -3 µm, 0 µm, and +3 µm, your z-levels would be:
def show(self):
skimage.io.imshow(self.data)
skimage.io.show()
-3 µm: 0
0 µm: 1
+3 µm: 2
:rtype int:
class MetadataItem(object):
def __init__(self, start, data):
self._datatype = ord(data[start])
self._label_length = 2 * ord(data[start + 1])
self._data = data
"""
return self._z_level
@property
def is_valid(self):
return self._datatype > 0
@property
def key(self):
return self._data[2:self._label_length].decode("utf16").encode("utf8")
class ImageSet(object):
"""
A group of images that were taken at roughly the same time.
@property
def length(self):
return self._length
"""
def __init__(self):
self._images = collections.defaultdict(dict)
@property
def data_start(self):
return self._label_length + 2
def __len__(self):
""" The number of images in the image set. """
return sum([len(channel) for channel in self._images.values()])
@property
def _body(self):
"""
All data after the header.
def __repr__(self):
return "\n".join(["<ND2 Image Set>",
"Image count: %s" % len(self)])
def get(self, channel, z_level=0):
"""
return self._data[self.data_start:]
Retrieve an image with a given channel and z-level. For most users, z_level will always be 0.
def _get_bytes(self, count):
return self._data[self.data_start: self.data_start + count]
:type channel: str
:type z_level: int
@property
def value(self):
parser = {1: self._parse_unsigned_char,
2: self._parse_unsigned_int,
3: self._parse_unsigned_int,
5: self._parse_unsigned_long,
6: self._parse_double,
8: self._parse_string,
9: self._parse_char_array,
11: self._parse_metadata_item
}
return parser[self._datatype]()
def _parse_unsigned_char(self):
self._length = 1
return self._unpack("B", self._get_bytes(self._length))
def _parse_unsigned_int(self):
self._length = 4
return self._unpack("I", self._get_bytes(self._length))
def _parse_unsigned_long(self):
self._length = 8
return self._unpack("Q", self._get_bytes(self._length))
def _parse_double(self):
self._length = 8
return self._unpack("d", self._get_bytes(self._length))
def _parse_string(self):
# the string is of unknown length but ends at the first instance of \x00\x00
stop = self._body.index("\x00\x00")
self._length = stop
return self._body[:stop - 1].decode("utf16").encode("utf8")
def _parse_char_array(self):
array_length = self._unpack("Q", self._get_bytes(8))
self._length = array_length + 8
return array.array("B", self._body[8:array_length])
def _parse_metadata_item(self):
count, length = struct.unpack("<IQ", self._get_bytes(12))
metadata_set = MetadataSet(self._body, 0, count)
def _unpack(self, kind, data):
"""
:param kind: the datatype to interpret the bytes as (see: https://docs.python.org/2/library/struct.html#struct-format-strings)
:type kind: str
:param data: the bytes to be converted
:type data: bytes
Parses a sequence of bytes and converts them to a Python data type.
struct.unpack() returns a tuple but we only want the first element.
return self._images.get(channel).get(z_level)
def add(self, image):
"""
return struct.unpack(kind, data)[0]
class MetadataSet(object):
"""
A container of metadata items. Can contain other MetadataSet objects.
"""
def __init__(self, data, start, item_count):
self._items = []
self._parse(data, start, item_count)
def _parse(self, data, start, item_count):
for item in range(item_count):
metadata_item = MetadataItem(start, data)
if not metadata_item.is_valid:
break
start += metadata_item.length
Stores an image.
:type image: nd2reader.model.Image()
class Chunkmap(object):
def __init__(self):
pass
def read(self, filename):
with open(filename, "rb") as f:
data = f.read(-1)
self.parse(data, 1)
def parse(self, data, count):
data = BytesIO(data)
res = {}
total_count = 0
for c in range(count):
lastpos = data.tell()
total_count += 1
hdr = data.read(2)
if not hdr:
break
typ = ord(hdr[0])
bname = data.read(2*ord(hdr[1]))
name = bname.decode("utf16")[:-1].encode("utf8")
if typ == 1:
value, = struct.unpack("B", data.read(1))
elif typ in [2, 3]:
value, = struct.unpack("I", data.read(4))
elif typ == 5:
value, = struct.unpack("Q", data.read(8))
elif typ == 6:
value, = struct.unpack("d", data.read(8))
elif typ == 8:
value = data.read(2)
while value[-2:] != "\x00\x00":
value += data.read(2)
value = value.decode("utf16")[:-1].encode("utf8")
elif typ == 9:
cnt, = struct.unpack("Q", data.read(8))
value = array.array("B", data.read(cnt))
elif typ == 11:
curpos = data.tell()
newcount, length = struct.unpack("<IQ", data.read(12))
curpos = data.tell()
length -= data.tell()-lastpos
nextdata = data.read(length)
value = self.parse(nextdata, newcount)
# Skip some offsets
data.read(newcount * 8)
else:
assert 0, "%s hdr %x:%x unknown" % (name, ord(hdr[0]), ord(hdr[1]))
if not name in res:
res[name] = value
else:
if not isinstance(res[name], list):
res[name] = [res[name]]
res[name].append(value)
x = data.read()
assert not x, "skip %d %s" % (len(x), repr(x[:30]))
return res
"""
self._images[image.channel][image.z_level] = image

+ 378
- 0
nd2reader/parser.py View File

@ -0,0 +1,378 @@
# -*- coding: utf-8 -*-
import array
from collections import namedtuple
from datetime import datetime
import numpy as np
import re
import struct
from StringIO import StringIO
field_of_view = namedtuple('FOV', ['number', 'x', 'y', 'z', 'pfs_offset'])
class Nd2Parser(object):
"""
Reads .nd2 files, provides an interface to the metadata, and generates numpy arrays from the image data.
You should not ever need to instantiate this class manually unless you're a developer.
"""
CHUNK_HEADER = 0xabeceda
CHUNK_MAP_START = "ND2 FILEMAP SIGNATURE NAME 0001!"
CHUNK_MAP_END = "ND2 CHUNK MAP SIGNATURE 0000001!"
def __init__(self, filename):
self._filename = filename
self._fh = None
self._chunk_map_start_location = None
self._cursor_position = 0
self._dimension_text = None
self._label_map = {}
self.metadata = {}
self._read_map()
self._parse_metadata()
@property
def _file_handle(self):
if self._fh is None:
self._fh = open(self._filename, "rb")
return self._fh
def _get_raw_image_data(self, image_group_number, channel_offset):
"""
Reads the raw bytes and the timestamp of an image.
:param image_group_number: groups are made of images with the same time index, field of view and z-level.
:type image_group_number: int
:param channel_offset: the offset in the array where the bytes for this image are found.
:type channel_offset: int
:return: (int, array.array()) or None
"""
chunk = self._label_map["ImageDataSeq|%d!" % image_group_number]
data = self._read_chunk(chunk)
# All images in the same image group share the same timestamp! So if you have complicated image data,
# your timestamps may not be entirely accurate. Practically speaking though, they'll only be off by a few
# seconds unless you're doing something super weird.
timestamp = struct.unpack("d", data[:8])[0]
image_group_data = array.array("H", data)
image_data_start = 4 + channel_offset
# The images for the various channels are interleaved within the same array. For example, the second image
# of a four image group will be composed of bytes 2, 6, 10, etc. If you understand why someone would design
# a data structure that way, please send the author of this library a message.
image_data = image_group_data[image_data_start::self._channel_count]
# Skip images that are all zeros! This is important, since NIS Elements creates blank "gap" images if you
# don't have the same number of images each cycle. We discovered this because we only took GFP images every
# other cycle to reduce phototoxicity, but NIS Elements still allocated memory as if we were going to take
# them every cycle.
if np.any(image_data):
return timestamp, image_data
return None
@property
def _dimensions(self):
"""
While there are metadata values that represent a lot of what we want to capture, they seem to be unreliable.
Sometimes certain elements don't exist, or change their data type randomly. However, the human-readable text
is always there and in the same exact format, so we just parse that instead.
:rtype: str
"""
if self._dimension_text is None:
for line in self.metadata['ImageTextInfo']['SLxImageTextInfo'].values():
if "Dimensions:" in line:
metadata = line
break
else:
raise ValueError("Could not parse metadata dimensions!")
for line in metadata.split("\r\n"):
if line.startswith("Dimensions:"):
self._dimension_text = line
break
else:
raise ValueError("Could not parse metadata dimensions!")
return self._dimension_text
@property
def _channels(self):
"""
These are labels created by the NIS Elements user. Typically they may a short description of the filter cube
used (e.g. "bright field", "GFP", etc.)
:rtype: str
"""
metadata = self.metadata['ImageMetadataSeq']['SLxPictureMetadata']['sPicturePlanes']
try:
validity = self.metadata['ImageMetadata']['SLxExperiment']['ppNextLevelEx'][''][0]['ppNextLevelEx'][''][0]['pItemValid']
except KeyError:
# If none of the channels have been deleted, there is no validity list, so we just make one
validity = [True for _ in metadata]
# Channel information is contained in dictionaries with the keys a0, a1...an where the number
# indicates the order in which the channel is stored. So by sorting the dicts alphabetically
# we get the correct order.
for (label, chan), valid in zip(sorted(metadata['sPlaneNew'].items()), validity):
if not valid:
continue
yield chan['sDescription']
def _calculate_image_group_number(self, time_index, fov, z_level):
"""
Images are grouped together if they share the same time index, field of view, and z-level.
:type time_index: int
:type fov: int
:type z_level: int
:rtype: int
"""
return time_index * self._field_of_view_count * self._z_level_count + (fov * self._z_level_count + z_level)
@property
def _channel_offset(self):
"""
Image data is interleaved for each image set. That is, if there are four images in a set, the first image
will consist of pixels 1, 5, 9, etc, the second will be pixels 2, 6, 10, and so forth.
:rtype: dict
"""
channel_offset = {}
for n, channel in enumerate(self._channels):
channel_offset[channel] = n
return channel_offset
@property
def _absolute_start(self):
"""
The date and time when acquisition began.
:rtype: datetime.datetime()
"""
for line in self.metadata['ImageTextInfo']['SLxImageTextInfo'].values():
absolute_start_12 = None
absolute_start_24 = None
# ND2s seem to randomly switch between 12- and 24-hour representations.
try:
absolute_start_24 = datetime.strptime(line, "%m/%d/%Y %H:%M:%S")
except ValueError:
pass
try:
absolute_start_12 = datetime.strptime(line, "%m/%d/%Y %I:%M:%S %p")
except ValueError:
pass
if not absolute_start_12 and not absolute_start_24:
continue
return absolute_start_12 if absolute_start_12 else absolute_start_24
raise ValueError("This ND2 has no recorded start time. This is probably a bug.")
@property
def _channel_count(self):
"""
The number of different channels used, including bright field.
:rtype: int
"""
pattern = r""".*?λ\((\d+)\).*?"""
try:
count = int(re.match(pattern, self._dimensions).group(1))
except AttributeError:
return 1
else:
return count
@property
def _field_of_view_count(self):
"""
The metadata contains information about fields of view, but it contains it even if some fields
of view were cropped. We can't find anything that states which fields of view are actually
in the image data, so we have to calculate it. There probably is something somewhere, since
NIS Elements can figure it out, but we haven't found it yet.
:rtype: int
"""
pattern = r""".*?XY\((\d+)\).*?"""
try:
count = int(re.match(pattern, self._dimensions).group(1))
except AttributeError:
return 1
else:
return count
@property
def _time_index_count(self):
"""
The number of cycles.
:rtype: int
"""
pattern = r""".*?T'\((\d+)\).*?"""
try:
count = int(re.match(pattern, self._dimensions).group(1))
except AttributeError:
return 1
else:
return count
@property
def _z_level_count(self):
"""
The number of different levels in the Z-plane.
:rtype: int
"""
pattern = r""".*?Z\((\d+)\).*?"""
try:
count = int(re.match(pattern, self._dimensions).group(1))
except AttributeError:
return 1
else:
return count
@property
def _image_count(self):
"""
The total number of images in the ND2. Warning: this may be inaccurate as it includes "gap" images.
:rtype: int
"""
return self.metadata['ImageAttributes']['SLxImageAttributes']['uiSequenceCount']
def _parse_metadata(self):
"""
Reads all metadata.
"""
for label in self._label_map.keys():
if label.endswith("LV!") or "LV|" in label:
data = self._read_chunk(self._label_map[label])
stop = label.index("LV")
self.metadata[label[:stop]] = self._read_metadata(data, 1)
def _read_map(self):
"""
Every label ends with an exclamation point, however, we can't directly search for those to find all the labels
as some of the bytes contain the value 33, which is the ASCII code for "!". So we iteratively find each label,
grab the subsequent data (always 16 bytes long), advance to the next label and repeat.
"""
self._file_handle.seek(-8, 2)
chunk_map_start_location = struct.unpack("Q", self._file_handle.read(8))[0]
self._file_handle.seek(chunk_map_start_location)
raw_text = self._file_handle.read(-1)
label_start = raw_text.index(Nd2Parser.CHUNK_MAP_START) + 32
while True:
data_start = raw_text.index("!", label_start) + 1
key = raw_text[label_start: data_start]
location, length = struct.unpack("QQ", raw_text[data_start: data_start + 16])
if key == Nd2Parser.CHUNK_MAP_END:
# We've reached the end of the chunk map
break
self._label_map[key] = location
label_start = data_start + 16
def _read_chunk(self, chunk_location):
"""
Gets the data for a given chunk pointer
"""
self._file_handle.seek(chunk_location)
# The chunk metadata is always 16 bytes long
chunk_metadata = self._file_handle.read(16)
header, relative_offset, data_length = struct.unpack("IIQ", chunk_metadata)
if header != Nd2Parser.CHUNK_HEADER:
raise ValueError("The ND2 file seems to be corrupted.")
# We start at the location of the chunk metadata, skip over the metadata, and then proceed to the
# start of the actual data field, which is at some arbitrary place after the metadata.
self._file_handle.seek(chunk_location + 16 + relative_offset)
return self._file_handle.read(data_length)
def _parse_unsigned_char(self, data):
return struct.unpack("B", data.read(1))[0]
def _parse_unsigned_int(self, data):
return struct.unpack("I", data.read(4))[0]
def _parse_unsigned_long(self, data):
return struct.unpack("Q", data.read(8))[0]
def _parse_double(self, data):
return struct.unpack("d", data.read(8))[0]
def _parse_string(self, data):
value = data.read(2)
while not value.endswith("\x00\x00"):
# the string ends at the first instance of \x00\x00
value += data.read(2)
return value.decode("utf16")[:-1].encode("utf8")
def _parse_char_array(self, data):
array_length = struct.unpack("Q", data.read(8))[0]
return array.array("B", data.read(array_length))
def _parse_metadata_item(self, data):
"""
Reads hierarchical data, analogous to a Python dict.
"""
new_count, length = struct.unpack("<IQ", data.read(12))
length -= data.tell() - self._cursor_position
next_data_length = data.read(length)
value = self._read_metadata(next_data_length, new_count)
# Skip some offsets
data.read(new_count * 8)
return value
def _get_value(self, data, data_type):
"""
ND2s use various codes to indicate different data types, which we translate here.
"""
parser = {1: self._parse_unsigned_char,
2: self._parse_unsigned_int,
3: self._parse_unsigned_int,
5: self._parse_unsigned_long,
6: self._parse_double,
8: self._parse_string,
9: self._parse_char_array,
11: self._parse_metadata_item}
return parser[data_type](data)
def _read_metadata(self, data, count):
"""
Iterates over each element some section of the metadata and parses it.
"""
data = StringIO(data)
metadata = {}
for _ in xrange(count):
self._cursor_position = data.tell()
header = data.read(2)
if not header:
# We've reached the end of some hierarchy of data
break
data_type, name_length = map(ord, header)
name = data.read(name_length * 2).decode("utf16")[:-1].encode("utf8")
value = self._get_value(data, data_type)
if name not in metadata.keys():
metadata[name] = value
else:
if not isinstance(metadata[name], list):
# We have encountered this key exactly once before. Since we're seeing it again, we know we
# need to convert it to a list before proceeding.
metadata[name] = [metadata[name]]
# We've encountered this key before so we're guaranteed to be dealing with a list. Thus we append
# the value to the already-existing list.
metadata[name].append(value)
return metadata

+ 0
- 439
nd2reader/service/__init__.py View File

@ -1,439 +0,0 @@
# -*- coding: utf-8 -*-
import array
import numpy as np
import struct
import re
from StringIO import StringIO
from collections import namedtuple
import logging
from nd2reader.model import Channel
from datetime import datetime
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
chunk = namedtuple('Chunk', ['location', 'length'])
field_of_view = namedtuple('FOV', ['number', 'x', 'y', 'z', 'pfs_offset'])
class BaseNd2(object):
def __init__(self, filename):
self._reader = Nd2Reader(filename)
self._channel_offset = None
@property
def height(self):
"""
:return: height of each image, in pixels
"""
return self._metadata['ImageAttributes']['SLxImageAttributes']['uiHeight']
@property
def width(self):
"""
:return: width of each image, in pixels
"""
return self._metadata['ImageAttributes']['SLxImageAttributes']['uiWidth']
@property
def absolute_start(self):
return self._reader.absolute_start
@property
def channels(self):
metadata = self._metadata['ImageMetadataSeq']['SLxPictureMetadata']['sPicturePlanes']
try:
validity = self._metadata['ImageMetadata']['SLxExperiment']['ppNextLevelEx'][''][0]['ppNextLevelEx'][''][0]['pItemValid']
except KeyError:
# If none of the channels have been deleted, there is no validity list, so we just make one
validity = [True for i in metadata]
# Channel information is contained in dictionaries with the keys a0, a1...an where the number
# indicates the order in which the channel is stored. So by sorting the dicts alphabetically
# we get the correct order.
for (label, chan), valid in zip(sorted(metadata['sPlaneNew'].items()), validity):
if not valid:
continue
name = chan['sDescription']
exposure_time = metadata['sSampleSetting'][label]['dExposureTime']
camera = metadata['sSampleSetting'][label]['pCameraSetting']['CameraUserName']
yield Channel(name, camera, exposure_time)
@property
def channel_names(self):
"""
A convenience method for getting an alphabetized list of channel names.
:return: list[str]
"""
for channel in sorted(self.channels, key=lambda x: x.name):
yield channel.name
@property
def _image_count(self):
return self._metadata['ImageAttributes']['SLxImageAttributes']['uiSequenceCount']
@property
def _sequence_count(self):
return self._metadata['ImageEvents']['RLxExperimentRecord']['uiCount']
@property
def time_index_count(self):
"""
The number of images for a given field of view, channel, and z_level combination.
Effectively the number of frames.
:rtype: int
"""
return self._reader.time_index_count
@property
def z_level_count(self):
return self._reader.z_level_count
@property
def field_of_view_count(self):
"""
The metadata contains information about fields of view, but it contains it even if some fields
of view were cropped. We can't find anything that states which fields of view are actually
in the image data, so we have to calculate it. There probably is something somewhere, since
NIS Elements can figure it out, but we haven't found it yet.
"""
return self._reader.field_of_view_count
@property
def channel_count(self):
return self._reader.channel_count
@property
def channel_offset(self):
if self._channel_offset is None:
self._channel_offset = {}
for n, channel in enumerate(self.channels):
self._channel_offset[channel.name] = n
return self._channel_offset
@property
def _metadata(self):
return self._reader.metadata
def _calculate_image_set_number(self, time_index, fov, z_level):
return time_index * self.field_of_view_count * self.z_level_count + (fov * self.z_level_count + z_level)
class Nd2Reader(object):
"""
Reads .nd2 files, provides an interface to the metadata, and generates numpy arrays from the image data.
"""
def __init__(self, filename):
self._absolute_start = None
self._filename = filename
self._file_handler = None
self._chunk_map_start_location = None
self._label_map = {}
self._metadata = {}
self._read_map()
self._parse_dict_data()
self.__dimensions = None
@property
def _dimensions(self):
if self.__dimensions is None:
# The particular slot that this data shows up in changes (seemingly) randomly
for line in self._metadata['ImageTextInfo']['SLxImageTextInfo'].values():
if "Dimensions:" in line:
metadata = line
break
else:
raise Exception("Could not parse metadata dimensions!")
for line in metadata.split("\r\n"):
if line.startswith("Dimensions:"):
self.__dimensions = line
break
return self.__dimensions
@property
def absolute_start(self):
if self._absolute_start is None:
for line in self._metadata['ImageTextInfo']['SLxImageTextInfo'].values():
absolute_start_12 = None
absolute_start_24 = None
# ND2s seem to randomly switch between 12- and 24-hour representations.
try:
absolute_start_24 = datetime.strptime(line, "%m/%d/%Y %H:%M:%S")
except ValueError:
pass
try:
absolute_start_12 = datetime.strptime(line, "%m/%d/%Y %I:%M:%S %p")
except ValueError:
pass
if not absolute_start_12 and not absolute_start_24:
continue
self._absolute_start = absolute_start_12 if absolute_start_12 else absolute_start_24
return self._absolute_start
@property
def fh(self):
if self._file_handler is None:
self._file_handler = open(self._filename, "rb")
return self._file_handler
@property
def time_index_count(self):
"""
The number of images for a given field of view, channel, and z_level combination.
Effectively the number of frames.
:rtype: int
"""
pattern = r""".*?T'\((\d+)\).*?"""
try:
count = int(re.match(pattern, self._dimensions).group(1))
except AttributeError:
return 1
else:
return count
@property
def z_level_count(self):
pattern = r""".*?Z\((\d+)\).*?"""
try:
count = int(re.match(pattern, self._dimensions).group(1))
except AttributeError:
return 1
else:
return count
@property
def field_of_view_count(self):
"""
The metadata contains information about fields of view, but it contains it even if some fields
of view were cropped. We can't find anything that states which fields of view are actually
in the image data, so we have to calculate it. There probably is something somewhere, since
NIS Elements can figure it out, but we haven't found it yet.
"""
pattern = r""".*?XY\((\d+)\).*?"""
try:
count = int(re.match(pattern, self._dimensions).group(1))
except AttributeError:
return 1
else:
return count
@property
def channel_count(self):
pattern = r""".*?λ\((\d+)\).*?"""
try:
count = int(re.match(pattern, self._dimensions).group(1))
except AttributeError:
return 1
else:
return count
def get_raw_image_data(self, image_set_number, channel_offset):
chunk = self._label_map["ImageDataSeq|%d!" % image_set_number]
data = self._read_chunk(chunk.location)
timestamp = struct.unpack("d", data[:8])[0]
# The images for the various channels are interleaved within each other. Yes, this is an incredibly unintuitive and nonsensical way
# to store data.
image_data = array.array("H", data)
image_data_start = 4 + channel_offset
return timestamp, image_data[image_data_start::self.channel_count]
def _parse_dict_data(self):
# TODO: Don't like this name
for label in self._top_level_dict_labels:
chunk_location = self._label_map[label].location
data = self._read_chunk(chunk_location)
stop = label.index("LV")
self._metadata[label[:stop]] = self.read_lv_encoding(data, 1)
@property
def metadata(self):
return self._metadata
@property
def _top_level_dict_labels(self):
# TODO: I don't like this name either
for label in self._label_map.keys():
if label.endswith("LV!") or "LV|" in label:
yield label
def _read_map(self):
"""
Every label ends with an exclamation point, however, we can't directly search for those to find all the labels
as some of the bytes contain the value 33, which is the ASCII code for "!". So we iteratively find each label,
grab the subsequent data (always 16 bytes long), advance to the next label and repeat.
"""
raw_text = self._get_raw_chunk_map_text()
label_start = self._find_first_label_offset(raw_text)
while True:
data_start = self._get_data_start(label_start, raw_text)
label, value = self._extract_map_key(label_start, data_start, raw_text)
if label == "ND2 CHUNK MAP SIGNATURE 0000001!":
# We've reached the end of the chunk map
break
self._label_map[label] = value
label_start = data_start + 16
@staticmethod
def _find_first_label_offset(raw_text):
"""
The chunk map starts with some number of (seemingly) useless bytes, followed
by "ND2 FILEMAP SIGNATURE NAME 0001!". We return the location of the first character after this sequence,
which is the actual beginning of the chunk map.
"""
return raw_text.index("ND2 FILEMAP SIGNATURE NAME 0001!") + 32
@staticmethod
def _get_data_start(label_start, raw_text):
"""
The data for a given label begins immediately after the first exclamation point
"""
return raw_text.index("!", label_start) + 1
@staticmethod
def _extract_map_key(label_start, data_start, raw_text):
"""
Chunk map entries are a string label of arbitrary length followed by 16 bytes of data, which represent
the byte offset from the beginning of the file where that data can be found.
"""
key = raw_text[label_start: data_start]
location, length = struct.unpack("QQ", raw_text[data_start: data_start + 16])
return key, chunk(location=location, length=length)
@property
def chunk_map_start_location(self):
"""
The position in bytes from the beginning of the file where the chunk map begins.
The chunk map is a series of string labels followed by the position (in bytes) of the respective data.
"""
if self._chunk_map_start_location is None:
# Put the cursor 8 bytes before the end of the file
self.fh.seek(-8, 2)
# Read the last 8 bytes of the file
self._chunk_map_start_location = struct.unpack("Q", self.fh.read(8))[0]
return self._chunk_map_start_location
def _read_chunk(self, chunk_location):
"""
Gets the data for a given chunk pointer
"""
self.fh.seek(chunk_location)
chunk_data = self._read_chunk_metadata()
header, relative_offset, data_length = self._parse_chunk_metadata(chunk_data)
return self._read_chunk_data(chunk_location, relative_offset, data_length)
def _read_chunk_metadata(self):
"""
Gets the chunks metadata, which is always 16 bytes
"""
return self.fh.read(16)
def _read_chunk_data(self, chunk_location, relative_offset, data_length):
"""
Reads the actual data for a given chunk
"""
# We start at the location of the chunk metadata, skip over the metadata, and then proceed to the
# start of the actual data field, which is at some arbitrary place after the metadata.
self.fh.seek(chunk_location + 16 + relative_offset)
return self.fh.read(data_length)
@staticmethod
def _parse_chunk_metadata(chunk_data):
"""
Finds out everything about a given chunk. Every chunk begins with the same value, so if that's ever
different we can assume the file has suffered some kind of damage.
"""
header, relative_offset, data_length = struct.unpack("IIQ", chunk_data)
if header != 0xabeceda:
raise ValueError("The ND2 file seems to be corrupted.")
return header, relative_offset, data_length
def _get_raw_chunk_map_text(self):
"""
Reads the entire chunk map and returns it as a string.
"""
self.fh.seek(self.chunk_map_start_location)
return self.fh.read(-1)
@staticmethod
def as_numpy_array(arr):
return np.frombuffer(arr)
def _z_level_count(self):
name = "CustomData|Z!"
st = self._read_chunk(self._label_map[name].location)
res = array.array("d", st)
return len(res)
def read_lv_encoding(self, data, count):
data = StringIO(data)
res = {}
total_count = 0
for c in range(count):
lastpos = data.tell()
total_count += 1
hdr = data.read(2)
if not hdr:
break
typ = ord(hdr[0])
bname = data.read(2*ord(hdr[1]))
name = bname.decode("utf16")[:-1].encode("utf8")
if typ == 1:
value, = struct.unpack("B", data.read(1))
elif typ in [2, 3]:
value, = struct.unpack("I", data.read(4))
elif typ == 5:
value, = struct.unpack("Q", data.read(8))
elif typ == 6:
value, = struct.unpack("d", data.read(8))
elif typ == 8:
value = data.read(2)
while value[-2:] != "\x00\x00":
value += data.read(2)
value = value.decode("utf16")[:-1].encode("utf8")
elif typ == 9:
cnt, = struct.unpack("Q", data.read(8))
value = array.array("B", data.read(cnt))
elif typ == 11:
newcount, length = struct.unpack("<IQ", data.read(12))
length -= data.tell()-lastpos
nextdata = data.read(length)
value = self.read_lv_encoding(nextdata, newcount)
# Skip some offsets
data.read(newcount * 8)
else:
assert 0, "%s hdr %x:%x unknown" % (name, ord(hdr[0]), ord(hdr[1]))
if not name in res:
res[name] = value
else:
if not isinstance(res[name], list):
res[name] = [res[name]]
res[name].append(value)
x = data.read()
assert not x, "skip %d %s" % (len(x), repr(x[:30]))
return res

+ 1
- 0
requirements.txt View File

@ -0,0 +1 @@
numpy>=1.9.2

+ 2
- 0
setup.cfg View File

@ -0,0 +1,2 @@
[metadata]
description-file = README.md

+ 17
- 6
setup.py View File

@ -1,10 +1,21 @@
from setuptools import setup, find_packages
from distutils.core import setup
setup(
name="nd2reader",
packages=find_packages(),
version="0.9.7",
install_requires=[
'numpy',
],
packages=['nd2reader', 'nd2reader.model'],
version="1.0.0",
description='A tool for reading ND2 files produced by NIS Elements',
author='Jim Rybarski',
author_email='jim@rybarski.com',
url='https://github.com/jimrybarski/nd2reader',
download_url='https://github.com/jimrybarski/nd2reader/tarball/1.0.0',
keywords=['nd2', 'nikon', 'microscopy', 'NIS Elements'],
classifiers=['Development Status :: 5 - Production/Stable',
'Intended Audience :: Science/Research',
'License :: Freely Distributable',
'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)',
'Operating System :: POSIX :: Linux',
'Programming Language :: Python :: 2.7',
'Topic :: Scientific/Engineering',
]
)

+ 0
- 9
tests.py View File

@ -1,9 +0,0 @@
"""
Auto-discovers all unittests in the tests directory and runs them
"""
import unittest
loader = unittest.TestLoader()
tests = loader.discover('tests', pattern='*.py', top_level_dir='.')
testRunner = unittest.TextTestRunner()
testRunner.run(tests)

+ 0
- 0
tests/__init__.py View File


+ 0
- 20
tests/model/__init__.py View File

@ -1,20 +0,0 @@
import unittest
from nd2reader.model import MetadataSet, MetadataItem
class MetadataItemTests(unittest.TestCase):
def test_is_valid(self):
data = b'\x0b\x13S\x00L\x00x\x00P\x00i\x00c\x00t\x00u\x00r\x00e\x00M\x00e\x00t\x00a\x00d\x00a\x00t\x00a\x00\x00\x00!\x00\x00\x00\xd5]\x00\x00\x00\x00\x00\x00\x06\nd\x00T\x00i\x00m\x00e\x00M\x00S\x00e\x00c\x00\x00\x00\x00\xc0T\x1c\x9b#\xbb@\x06\x0ed\x00T\x00i\x00m\x00e\x00A\x00b\x00s\x00o\x00l\x00u\x00t\x00e\x00\x00\x00Sf\xf5\xa6\xa7\xbeBA\x02\x0ce\x00T\x00i\x00m\x00e\x00S\x00o\x00u\x00r\x00c\x00e\x00\x00\x00\x00\x00\x00\x00\x06\x06d\x00X\x00P\x00o\x00s\x00\x00\x00\x00\x00\x00\x00\x00$\x9d\xc0\x06\x06d\x00Y\x00P\x00o\x00s\x00\x00\x00\x00\x00\x00\x00\xe0\r\xe5@\x03\x06u\x00i\x00R\x00o\x00w\x00\x00\x00\x00\x00\x00\x00\x03\nu\x00i\x00C\x00o\x00n\x002\x000\x00(\x00L\x00\x00\x00\x00\x00\x00\x00\x06\x06d\x00Z\x00P\x00o\x00s\x00\x00\x00\x9a\x99\x99\x99Y\x8d\xb8@\x01\x0eb\x00Z\x00P\x00o\x00s\x00A\x00b\x00s\x00o\x00l\x00u\x00t\x00e\x00\x00\x00\x01\x06\x07d\x00A\x00n\x00g'
item = MetadataItem(0, data)
self.assertTrue(item.is_valid)
def test_key(self):
data = b'\x0b\x13S\x00L\x00x\x00P\x00i\x00c\x00t\x00u\x00r\x00e\x00M\x00e\x00t\x00a\x00d\x00a\x00t\x00a\x00\x00\x00!\x00\x00\x00\xd5]\x00\x00\x00\x00\x00\x00\x06\nd\x00T\x00i\x00m\x00e\x00M\x00S\x00e\x00c\x00\x00\x00\x00\xc0T\x1c\x9b#\xbb@\x06\x0ed\x00T\x00i\x00m\x00e\x00A\x00b\x00s\x00o\x00l\x00u\x00t\x00e\x00\x00\x00Sf\xf5\xa6\xa7\xbeBA\x02\x0ce\x00T\x00i\x00m\x00e\x00S\x00o\x00u\x00r\x00c\x00e\x00\x00\x00\x00\x00\x00\x00\x06\x06d\x00X\x00P\x00o\x00s\x00\x00\x00\x00\x00\x00\x00\x00$\x9d\xc0\x06\x06d\x00Y\x00P\x00o\x00s\x00\x00\x00\x00\x00\x00\x00\xe0\r\xe5@\x03\x06u\x00i\x00R\x00o\x00w\x00\x00\x00\x00\x00\x00\x00\x03\nu\x00i\x00C\x00o\x00n\x002\x000\x00(\x00L\x00\x00\x00\x00\x00\x00\x00\x06\x06d\x00Z\x00P\x00o\x00s\x00\x00\x00\x9a\x99\x99\x99Y\x8d\xb8@\x01\x0eb\x00Z\x00P\x00o\x00s\x00A\x00b\x00s\x00o\x00l\x00u\x00t\x00e\x00\x00\x00\x01\x06\x07d\x00A\x00n\x00g'
item = MetadataItem(0, data)
self.assertEqual(item.key, "SLxPictureMetadata")
def test_parse_double(self):
data = b'\x06\nd\x00T\x00i\x00m\x00e\x00M\x00S\x00e\x00c\x00\x00\x00\x00\xc0T\x1c\x9b#\xbb@\x06\x0e'
item = MetadataItem(0, data)
self.assertEqual(item.value, 6947.605901047587)

+ 0
- 0
tests/service/__init__.py View File


Loading…
Cancel
Save