Browse Source

Fixed complexity

zolfa-add_slices_loading
Gabriele Girelli 4 years ago
committed by GitHub
parent
commit
9d40a12f96
1 changed files with 243 additions and 135 deletions
  1. +243
    -135
      nd2reader/reader.py

+ 243
- 135
nd2reader/reader.py View File

@ -1,207 +1,315 @@
from pims import Frame
from pims.base_frames import FramesSequenceND
# -*- coding: utf-8 -*-
import struct
from nd2reader.exceptions import EmptyFileError, InvalidFileType
from nd2reader.parser import Parser
import array
import six
import warnings
from pims.base_frames import Frame
import numpy as np
from nd2reader.common import get_version, read_chunk
from nd2reader.exceptions import InvalidVersionError
from nd2reader.label_map import LabelMap
from nd2reader.raw_metadata import RawMetadata
class ND2Reader(FramesSequenceND):
"""PIMS wrapper for the ND2 parser.
This is the main class: use this to process your .nd2 files.
"""
class_priority = 12
class Parser(object):
"""Parses ND2 files and creates a Metadata and driver object.
def __init__(self, filename):
super(ND2Reader, self).__init__()
"""
CHUNK_HEADER = 0xabeceda
CHUNK_MAP_START = six.b("ND2 FILEMAP SIGNATURE NAME 0001!")
CHUNK_MAP_END = six.b("ND2 CHUNK MAP SIGNATURE 0000001!")
if not filename.endswith(".nd2"):
raise InvalidFileType("The file %s you want to read with nd2reader does not have extension .nd2." % filename)
supported_file_versions = {(3, None): True}
self.filename = filename
def __init__(self, fh):
self._fh = fh
self._label_map = None
self._raw_metadata = None
self.metadata = None
# first use the parser to parse the file
self._fh = open(filename, "rb")
self._parser = Parser(self._fh)
# First check the file version
self.supported = self._check_version_supported()
# Setup metadata
self.metadata = self._parser.metadata
# Parse the metadata
self._parse_metadata()
# Set data type
self._dtype = self._parser.get_dtype_from_metadata()
def calculate_image_properties(self, index):
"""Calculate FOV, channels and z_levels
# Setup the axes
self._setup_axes()
Args:
index(int): the index (which is simply the order in which the image was acquired)
# Other properties
self._timesteps = None
Returns:
tuple: tuple of the field of view, the channel and the z level
@classmethod
def class_exts(cls):
"""Let PIMS open function use this reader for opening .nd2 files
"""
field_of_view = self._calculate_field_of_view(index)
channel = self._calculate_channel(index)
z_level = self._calculate_z_level(index)
return field_of_view, channel, z_level
def get_image(self, index):
"""
return {'nd2'} | super(ND2Reader, cls).class_exts()
Creates an Image object and adds its metadata, based on the index (which is simply the order in which the image
was acquired). May return None if the ND2 contains multiple channels and not all were taken in each cycle (for
example, if you take bright field images every minute, and GFP images every five minutes, there will be some
indexes that do not contain an image. The reason for this is complicated, but suffice it to say that we hope to
eliminate this possibility in future releases. For now, you'll need to check if your image is None if you're
doing anything out of the ordinary.
def close(self):
"""Correctly close the file handle
Args:
index(int): the index (which is simply the order in which the image was acquired)
"""
if self._fh is not None:
self._fh.close()
Returns:
Frame: the image
def _get_default(self, coord):
"""
field_of_view, channel, z_level = self.calculate_image_properties(index)
channel_offset = index % len(self.metadata["channels"])
image_group_number = int(index / len(self.metadata["channels"]))
frame_number = self._calculate_frame_number(image_group_number, field_of_view, z_level)
try:
return self.default_coords[coord]
except KeyError:
return 0
timestamp, image = self._get_raw_image_data(image_group_number, channel_offset, self.metadata["height"],
self.metadata["width"])
except (TypeError):
return Frame([], frame_no=frame_number, metadata=self._get_frame_metadata())
else:
return Frame(image, frame_no=frame_number, metadata=self._get_frame_metadata())
def get_image_by_attributes(self, frame_number, field_of_view, channel, z_level, height, width):
"""Gets an image based on its attributes alone
def get_frame_2D(self, c=0, t=0, z=0, x=0, y=0, v=0):
"""Gets a given frame using the parser
Args:
x: The x-index (pims expects this)
y: The y-index (pims expects this)
c: The color channel number
t: The frame number
z: The z stack number
v: The field of view index
frame_number: the frame number
field_of_view: the field of view
channel_name: the color channel name
z_level: the z level
height: the height of the image
width: the width of the image
Returns:
pims.Frame: The requested frame
Frame: the requested image
"""
# This needs to be set to width/height to return an image
x = self.metadata["width"]
y = self.metadata["height"]
frame_number = 0 if frame_number is None else frame_number
field_of_view = 0 if field_of_view is None else field_of_view
channel = 0 if channel is None else channel
z_level = 0 if z_level is None else z_level
return self._parser.get_image_by_attributes(t, v, c, z, y, x)
image_group_number = self._calculate_image_group_number(frame_number, field_of_view, z_level)
try:
timestamp, raw_image_data = self._get_raw_image_data(image_group_number, channel,
height, width)
except (TypeError):
return Frame([], frame_no=frame_number, metadata=self._get_frame_metadata())
else:
return Frame(raw_image_data, frame_no=frame_number, metadata=self._get_frame_metadata())
@staticmethod
def get_dtype_from_metadata():
"""Determine the data type from the metadata.
For now, always use float64 to prevent unexpected overflow errors when manipulating the data (calculating sums/
means/etc.)
@property
def parser(self):
"""
Returns the parser object.
Returns:
Parser: the parser object
"""
return self._parser
return np.float64
@property
def pixel_type(self):
"""Return the pixel data type
def _check_version_supported(self):
"""Checks if the ND2 file version is supported by this reader.
Returns:
dtype: the pixel data type
bool: True on supported
"""
return self._dtype
major_version, minor_version = get_version(self._fh)
supported = self.supported_file_versions.get(
(major_version, minor_version)) or self.supported_file_versions.get((major_version, None))
@property
def timesteps(self):
"""Get the timesteps of the experiment
if not supported:
print("Warning: No parser is available for your current ND2 version (%d.%d). " % (
major_version, minor_version) + "This might lead to unexpected behaviour.")
Returns:
np.ndarray: an array of times in milliseconds.
return supported
def _parse_metadata(self):
"""Reads all metadata and instantiates the Metadata object.
"""
if self._timesteps is None:
return self.get_timesteps()
return self._timesteps
# Retrieve raw metadata from the label mapping
self._label_map = self._build_label_map()
self._raw_metadata = RawMetadata(self._fh, self._label_map)
self.metadata = self._raw_metadata.__dict__
self.acquisition_times = self._raw_metadata.acquisition_times
@property
def events(self):
"""Get the events of the experiment
def _build_label_map(self):
"""
Every label ends with an exclamation point, however, we can't directly search for those to find all the labels
as some of the bytes contain the value 33, which is the ASCII code for "!". So we iteratively find each label,
grab the subsequent data (always 16 bytes long), advance to the next label and repeat.
Returns:
iterator of events as dict
LabelMap: the computed label map
"""
# go 8 bytes back from file end
self._fh.seek(-8, 2)
chunk_map_start_location = struct.unpack("Q", self._fh.read(8))[0]
self._fh.seek(chunk_map_start_location)
raw_text = self._fh.read(-1)
return LabelMap(raw_text)
return self._get_metadata_property("events")
def _calculate_field_of_view(self, index):
"""Determines what field of view was being imaged for a given image.
Args:
index(int): the index (which is simply the order in which the image was acquired)
@property
def frame_rate(self):
"""The (average) frame rate
Returns:
float: the (average) frame rate in frames per second
int: the field of view
"""
total_duration = 0.0
images_per_cycle = len(self.metadata["z_levels"]) * len(self.metadata["channels"])
return int((index - (index % images_per_cycle)) / images_per_cycle) % len(self.metadata["fields_of_view"])
for loop in self.metadata['experiment']['loops']:
total_duration += loop['duration']
def _calculate_channel(self, index):
"""Determines what channel a particular image is.
if total_duration == 0:
total_duration = self.timesteps[-1]
Args:
index(int): the index (which is simply the order in which the image was acquired)
if total_duration == 0:
raise ValueError('Total measurement duration could not be determined from loops')
Returns:
string: the name of the color channel
return self.metadata['num_frames'] / (total_duration/1000.0)
"""
return self.metadata["channels"][index % len(self.metadata["channels"])]
def _get_metadata_property(self, key, default=None):
if self.metadata is None:
return default
def _calculate_z_level(self, index):
"""Determines the plane in the z-axis a given image was taken in.
if key not in self.metadata:
return default
In the future, this will be replaced with the actual offset in micrometers.
if self.metadata[key] is None:
return default
Args:
index(int): the index (which is simply the order in which the image was acquired)
return self.metadata[key]
Returns:
The z level
def _setup_axes(self):
"""Setup the xyctz axes, iterate over t axis by default
"""
return self.metadata["z_levels"][int(
((index - (index % len(self.metadata["channels"]))) / len(self.metadata["channels"])) % len(
self.metadata["z_levels"]))]
def _calculate_image_group_number(self, frame_number, fov, z_level):
"""
self._init_axis_if_exists('x', self._get_metadata_property("width", default=0))
self._init_axis_if_exists('y', self._get_metadata_property("height", default=0))
self._init_axis_if_exists('c', len(self._get_metadata_property("channels", default=[])), min_size=2)
self._init_axis_if_exists('t', len(self._get_metadata_property("frames", default=[])))
self._init_axis_if_exists('z', len(self._get_metadata_property("z_levels", default=[])), min_size=2)
self._init_axis_if_exists('v', len(self._get_metadata_property("fields_of_view", default=[])), min_size=2)
Images are grouped together if they share the same time index, field of view, and z-level.
if len(self.sizes) == 0:
raise EmptyFileError("No axes were found for this .nd2 file.")
Args:
frame_number: the time index
fov: the field of view number
z_level: the z level number
# provide the default
self.iter_axes = self._guess_default_iter_axis()
Returns:
int: the image group number
self._register_get_frame(self.get_frame_2D, 'yx')
"""
z_length = len(self.metadata['z_levels'])
z_length = z_length if z_length > 0 else 1
fields_of_view = len(self.metadata["fields_of_view"])
fields_of_view = fields_of_view if fields_of_view > 0 else 1
def _init_axis_if_exists(self, axis, size, min_size=1):
if size >= min_size:
self._init_axis(axis, size)
return frame_number * fields_of_view * z_length + (fov * z_length + z_level)
def _guess_default_iter_axis(self):
def _calculate_frame_number(self, image_group_number, field_of_view, z_level):
"""
Guesses the default axis to iterate over based on axis sizes.
Images are in the same frame if they share the same group number and field of view and are taken sequentially.
Args:
image_group_number: the image group number (see _calculate_image_group_number)
field_of_view: the field of view number
z_level: the z level number
Returns:
the axis to iterate over
"""
priority = ['t', 'z', 'c', 'v']
found_axes = []
for axis in priority:
try:
current_size = self.sizes[axis]
except KeyError:
continue
return (image_group_number - (field_of_view * len(self.metadata["z_levels"]) + z_level)) / (
len(self.metadata["fields_of_view"]) * len(self.metadata["z_levels"]))
if current_size > 1:
return axis
@property
def _channel_offset(self):
"""
Image data is interleaved for each image set. That is, if there are four images in a set, the first image
will consist of pixels 1, 5, 9, etc, the second will be pixels 2, 6, 10, and so forth.
found_axes.append(axis)
Returns:
dict: the channel offset for each channel
return found_axes[0]
"""
return {channel: n for n, channel in enumerate(self.metadata["channels"])}
def _remove_unwanted_bytes(self, image_group_data, image_data_start, height, width):
# Remove unwanted 0-bytes that can appear in stitched images
number_of_true_channels = int(len(image_group_data[4:]) / (height * width))
unwanted_bytes_len = (len(image_group_data[image_data_start:]))%(height*width)
if unwanted_bytes_len:
warnings.warn('Identified unwanted bytes in the ND2 file, possibly stitched.')
byte_ids = range(image_data_start+height*number_of_true_channels, len(image_group_data)-unwanted_bytes_len+1, height*number_of_true_channels)
if all([0 == image_group_data[byte_ids[i]+i] for i in range(len(byte_ids))]):
warnings.warn('All unwanted bytes are zero-bytes, correctly removed.')
for i in range(len(byte_ids)):
del image_group_data[byte_ids[i]]
def _get_raw_image_data(self, image_group_number, channel_offset, height, width):
"""Reads the raw bytes and the timestamp of an image.
def get_timesteps(self):
"""Get the timesteps of the experiment
Args:
image_group_number: the image group number (see _calculate_image_group_number)
channel_offset: the number of the color channel
height: the height of the image
width: the width of the image
Returns:
np.ndarray: an array of times in milliseconds.
"""
if self._timesteps is not None and len(self._timesteps) > 0:
return self._timesteps
chunk = self._label_map.get_image_data_location(image_group_number)
data = read_chunk(self._fh, chunk)
# All images in the same image group share the same timestamp! So if you have complicated image data,
# your timestamps may not be entirely accurate. Practically speaking though, they'll only be off by a few
# seconds unless you're doing something super weird.
timestamp = struct.unpack("d", data[:8])[0]
image_group_data = array.array("H", data)
image_data_start = 4 + channel_offset
# The images for the various channels are interleaved within the same array. For example, the second image
# of a four image group will be composed of bytes 2, 6, 10, etc. If you understand why someone would design
# a data structure that way, please send the author of this library a message.
number_of_true_channels = int(len(image_group_data[4:]) / (height * width))
self._remove_unwanted_bytes(image_group_data, image_data_start, height, width)
try:
image_data = np.reshape(image_group_data[image_data_start::number_of_true_channels], (height, width))
except ValueError:
image_data = np.reshape(image_group_data[image_data_start::number_of_true_channels], (height, int(round(len(image_group_data[image_data_start::number_of_true_channels])/height))))
# Skip images that are all zeros! This is important, since NIS Elements creates blank "gap" images if you
# don't have the same number of images each cycle. We discovered this because we only took GFP images every
# other cycle to reduce phototoxicity, but NIS Elements still allocated memory as if we were going to take
# them every cycle.
if np.any(image_data):
return timestamp, image_data
# If a blank "gap" image is encountered, generate an array of corresponding height and width to avoid
# errors with ND2-files with missing frames. Array is filled with nan to reflect that data is missing.
else:
empty_frame = np.full((height, width), np.nan)
warnings.warn('ND2 file contains gap frames which are represented by np.nan-filled arrays; to convert to zeros use e.g. np.nan_to_num(array)')
return timestamp, image_data
def _get_frame_metadata(self):
"""Get the metadata for one frame
self._timesteps = np.array(list(self._parser._raw_metadata.acquisition_times), dtype=np.float) * 1000.0
Returns:
dict: a dictionary containing the parsed metadata
return self._timesteps
"""
return self.metadata

Loading…
Cancel
Save