You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

353 lines
8.9 KiB

import os
import struct
import array
from datetime import datetime
import six
import re
from nd2reader.exceptions import InvalidVersionError
def get_version(fh):
    """Reads the signature string of an ND2 file and extracts its version.

    Args:
        fh: File handle of the .nd2 file (reads must return bytes)

    Returns:
        tuple: Major and minor version

    Raises:
        InvalidVersionError: propagated from parse_version when the signature is not recognised

    """
    signature_offset = 16
    signature_length = 38
    # The first 16 bytes seem to have no meaning, so the signature starts right after them.
    fh.seek(signature_offset)
    # Unlike most of the ND2, the signature string is encoded as UTF-8.
    signature = fh.read(signature_length).decode("utf8")
    return parse_version(signature)
def parse_version(data):
    """Extracts the major and minor version from the ND2 signature string.

    Args:
        data (unicode): the signature text, i.e. the 38 characters that get_version reads
            starting at byte offset 16 (the 17th through 54th byte of the ND2)

    Returns:
        tuple: Major and minor version, both as int

    Raises:
        InvalidVersionError: if the text does not match the expected signature

    """
    match = re.search(r"""^ND2 FILE SIGNATURE CHUNK NAME01!Ver(?P<major>\d)\.(?P<minor>\d)$""", data)
    if not match:
        raise InvalidVersionError("The version of the ND2 you specified is not supported.")
    # We haven't seen a lot of ND2s but the ones we have seen conform to this
    major, minor = match.group('major', 'minor')
    return int(major), int(minor)
def read_chunk(fh, chunk_location):
    """Reads a piece of data given the location of its pointer.

    Args:
        fh: an open file handle to the ND2
        chunk_location (int): location to read

    Returns:
        bytes: the data at the chunk location, or None if either argument is None

    Raises:
        ValueError: if the chunk does not start with the expected magic number

    """
    if chunk_location is None or fh is None:
        return None
    fh.seek(chunk_location)
    # The chunk metadata is always 16 bytes long
    chunk_metadata = fh.read(16)
    # Explicit little-endian, unpadded layout (4 + 4 + 8 bytes) so the result does not depend on the
    # byte order or alignment rules of the machine running the parser.
    header, relative_offset, data_length = struct.unpack("<IIQ", chunk_metadata)
    if header != 0xabeceda:
        raise ValueError("The ND2 file seems to be corrupted.")
    # We start at the location of the chunk metadata, skip over the metadata, and then proceed to the
    # start of the actual data field, which is at some arbitrary place after the metadata.
    fh.seek(chunk_location + 16 + relative_offset)
    return fh.read(data_length)
def read_array(fh, kind, chunk_location):
    """Reads a chunk and interprets its bytes as a typed array.

    Args:
        fh: File handle of the nd2 file
        kind: data type, can be one of 'double', 'int' or 'float'
        chunk_location: the location of the array chunk in the binary nd2 file

    Returns:
        array.array: an array of the data, or None if the chunk could not be read

    Raises:
        ValueError: if kind is not one of the supported type names

    """
    type_codes = {'double': 'd', 'int': 'i', 'float': 'f'}
    type_code = type_codes.get(kind)
    # The kind is validated before any file access happens.
    if type_code is None:
        raise ValueError('You attempted to read an array of an unknown type.')
    raw_data = read_chunk(fh, chunk_location)
    return None if raw_data is None else array.array(type_code, raw_data)
def _parse_unsigned_char(data):
"""
Args:
data: binary data
Returns:
char: the data converted to unsigned char
"""
return struct.unpack("B", data.read(1))[0]
def _parse_unsigned_int(data):
"""
Args:
data: binary data
Returns:
int: the data converted to unsigned int
"""
return struct.unpack("I", data.read(4))[0]
def _parse_unsigned_long(data):
"""
Args:
data: binary data
Returns:
long: the data converted to unsigned long
"""
return struct.unpack("Q", data.read(8))[0]
def _parse_double(data):
"""
Args:
data: binary data
Returns:
double: the data converted to double
"""
return struct.unpack("d", data.read(8))[0]
def _parse_string(data):
"""
Args:
data: binary data
Returns:
string: the data converted to string
"""
value = data.read(2)
# the string ends at the first instance of \x00\x00
while not value.endswith(six.b("\x00\x00")):
next_data = data.read(2)
if len(next_data) == 0:
break
value += next_data
try:
decoded = value.decode("utf16")[:-1].encode("utf8")
except UnicodeDecodeError:
decoded = value.decode('utf8').encode("utf8")
return decoded
def _parse_char_array(data):
"""
Args:
data: binary data
Returns:
array.array: the data converted to an array
"""
array_length = struct.unpack("Q", data.read(8))[0]
return array.array("B", data.read(array_length))
def parse_date(text_info):
    """Finds the date and time when acquisition began.

    Every value of text_info is tried against the known date formats; the first value that parses
    wins. Values that are not text, or cannot be decoded, are skipped.

    Args:
        text_info: a dictionary whose values may contain the date and time information

    Returns:
        datetime: the date and time of the acquisition, or None if no value could be parsed

    """
    # ND2s seem to randomly switch between 12- and 24-hour representations.
    possible_formats = ("%m/%d/%Y %H:%M:%S", "%m/%d/%Y %I:%M:%S %p", "%d/%m/%Y %H:%M:%S")
    for line in text_info.values():
        if isinstance(line, bytes):
            try:
                line = line.decode("utf8")
            except UnicodeDecodeError:
                # Undecodable bytes cannot be a date string.
                continue
        for date_format in possible_formats:
            try:
                # A TypeError here means the value is not text at all (e.g. None or a list).
                return datetime.strptime(line, date_format)
            except (TypeError, ValueError):
                continue
    return None
def _parse_metadata_item(data, cursor_position):
    """Reads a nested group of metadata entries, analogous to a Python dict.

    Args:
        data: the binary stream, positioned at the item's 12-byte header
        cursor_position: the stream position recorded by the caller before this item was read

    Returns:
        dict: a dictionary containing the metadata item

    """
    item_header = data.read(12)
    entry_count, payload_length = struct.unpack("<IQ", item_header)
    # The stored length appears to be counted from cursor_position, so whatever has been consumed
    # since then is subtracted to get the size of what is left to read.
    already_consumed = data.tell() - cursor_position
    payload = data.read(payload_length - already_consumed)
    parsed = read_metadata(payload, entry_count)
    # Skip some offsets (eight bytes per entry follow the payload)
    data.read(entry_count * 8)
    return parsed
def _get_value(data, data_type, cursor_position):
    """Dispatches to the parser that matches an ND2 data type code.

    Args:
        data: the binary data
        data_type: the data type (unsigned char = 1, unsigned int = 2 or 3, unsigned long = 5, double = 6, string = 8,
            char array = 9, metadata item = 11)
        cursor_position: the cursor position in the binary nd2 file

    Returns:
        mixed: the parsed value, or None if the type code is unknown or the data could not be unpacked

    """
    parsers = {
        1: _parse_unsigned_char,
        2: _parse_unsigned_int,
        3: _parse_unsigned_int,
        5: _parse_unsigned_long,
        6: _parse_double,
        8: _parse_string,
        9: _parse_char_array,
        11: _parse_metadata_item,
    }
    try:
        if data_type < 11:
            return parsers[data_type](data)
        # Only the metadata item parser needs to know where the item started.
        return parsers[data_type](data, cursor_position)
    except (KeyError, struct.error):
        return None
def read_metadata(data, count):
    """Parses up to count entries from a section of binary metadata.

    Each entry starts with a two-byte header (type code and name length), followed by the UTF-16
    encoded name and then the value.

    Args:
        data: the metadata in binary form
        count: the number of metadata elements

    Returns:
        dict: a dictionary containing the parsed metadata, or None if data is None

    """
    if data is None:
        return None
    stream = six.BytesIO(data)
    metadata = {}
    for _ in range(count):
        entry_start = stream.tell()
        header = stream.read(2)
        if not header:
            # We've reached the end of some hierarchy of data
            break
        data_type, name_length = struct.unpack('BB', header)
        # The name length is counted in two-byte units; the last decoded character is dropped.
        raw_name = stream.read(name_length * 2)
        name = raw_name.decode("utf16")[:-1].encode("utf8")
        value = _get_value(stream, data_type, entry_start)
        metadata = _add_to_metadata(metadata, name, value)
    return metadata
def _add_to_metadata(metadata, name, value):
"""
Add the name value pair to the metadata dict
Args:
metadata (dict): a dictionary containing the metadata
name (string): the dictionary key
value: the value to add
Returns:
dict: the new metadata dictionary
"""
if name not in metadata.keys():
metadata[name] = value
else:
if not isinstance(metadata[name], list):
# We have encountered this key exactly once before. Since we're seeing it again, we know we
# need to convert it to a list before proceeding.
metadata[name] = [metadata[name]]
# We've encountered this key before so we're guaranteed to be dealing with a list. Thus we append
# the value to the already-existing list.
metadata[name].append(value)
return metadata
def get_from_dict_if_exists(key, dictionary, convert_key_to_binary=True):
    """Looks up a key in a dictionary, returning None when the key is absent.

    Args:
        key: key to lookup
        dictionary: dictionary to look in
        convert_key_to_binary: when true, the key is passed through six.b before the lookup

    Returns:
        the value of dictionary[key] or None

    """
    lookup_key = six.b(key) if convert_key_to_binary else key
    return dictionary[lookup_key] if lookup_key in dictionary else None
def check_or_make_dir(directory):
    """Makes sure a directory exists, creating it (and any missing parents) if necessary.

    Args:
        directory: the path to the directory

    Raises:
        OSError: if the directory could not be created and the path still does not exist

    """
    try:
        os.makedirs(directory)
    except OSError:
        # Creating first and checking afterwards avoids the race where another process creates the
        # directory between an existence check and the makedirs call. As before, a path that already
        # exists is left alone; any other failure is reported to the caller.
        if not os.path.exists(directory):
            raise