You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

178 lines
7.1 KiB

  1. # -*- coding: utf-8 -*-
  2. import array
  3. from collections import namedtuple
  4. import struct
  5. from StringIO import StringIO
  # Lightweight record type (class name 'FOV') with the fields number, x, y, z and pfs_offset.
  field_of_view = namedtuple('FOV', 'number x y z pfs_offset')
  class Nd2Parser(object):
      """
      Reads .nd2 files, provides an interface to the metadata, and generates numpy arrays from the image data.

      Constructing a parser reads the chunk map (which runs from a stored offset to the end of the file) and
      then parses every top-level "...LV!" chunk into the ``metadata`` dictionary. Chunk labels and metadata
      names are kept as byte strings, and all fixed-width fields are decoded as little-endian values.

      """
      # Magic number that starts the 16-byte header of every chunk.
      CHUNK_HEADER = 0xabeceda
      # Signature that precedes the first chunk map entry.
      CHUNK_MAP_START = b"ND2 FILEMAP SIGNATURE NAME 0001!"
      # Label that terminates the chunk map.
      CHUNK_MAP_END = b"ND2 CHUNK MAP SIGNATURE 0000001!"

      def __init__(self, filename):
          """
          Builds the chunk map and parses the metadata of the given file.

          :param filename: path of the .nd2 file; it is opened lazily in binary mode

          """
          self._absolute_start = None
          self._filename = filename
          self._fh = None
          self._chunk_map_start_location = None
          self._cursor_position = None
          self._dimension_text = None
          self._label_map = {}
          self.metadata = {}
          self._read_map()
          self._parse_metadata()

      @property
      def _file_handle(self):
          """
          The binary file handle, opened on first access and reused afterwards. Nothing in this class closes it.

          """
          if self._fh is None:
              self._fh = open(self._filename, "rb")
          return self._fh

      @property
      def _dimensions(self):
          """
          The line of the image text info that starts with "Dimensions:". It is located once and then cached.

          :raises ValueError: if no text info entry mentions the dimensions, or the matching entry has no line
                              starting with "Dimensions:"

          """
          if self._dimension_text is None:
              text_info = self.metadata[b'ImageTextInfo'][b'SLxImageTextInfo']
              for entry in text_info.values():
                  if b"Dimensions:" in entry:
                      break
              else:
                  raise ValueError("Could not parse metadata dimensions!")
              for line in entry.split(b"\r\n"):
                  if line.startswith(b"Dimensions:"):
                      self._dimension_text = line
                      break
              else:
                  raise ValueError("Could not parse metadata dimensions!")
          return self._dimension_text

      @property
      def _image_count(self):
          """
          The value stored under ImageAttributes / SLxImageAttributes / uiSequenceCount.

          """
          return self.metadata[b'ImageAttributes'][b'SLxImageAttributes'][b'uiSequenceCount']

      @property
      def _sequence_count(self):
          """
          The value stored under ImageEvents / RLxExperimentRecord / uiCount.

          """
          return self.metadata[b'ImageEvents'][b'RLxExperimentRecord'][b'uiCount']

      def _parse_metadata(self):
          """
          Parses every chunk whose label ends with "LV!" (and does not contain "LV|") into self.metadata, keyed
          by the part of the label that precedes "LV".

          """
          for label, location in self._label_map.items():
              if not label.endswith(b"LV!") or b"LV|" in label:
                  continue
              stop = label.index(b"LV")
              self.metadata[label[:stop]] = self._read_metadata(self._read_chunk(location), 1)

      def _read_map(self):
          """
          Every label ends with an exclamation point, however, we can't directly search for those to find all the
          labels as some of the bytes contain the value 33, which is the ASCII code for "!". So we iteratively find
          each label, grab the subsequent data (always 16 bytes long), advance to the next label and repeat.

          The last 8 bytes of the file hold the offset at which the chunk map starts.

          :raises ValueError: if a signature or the end of the chunk map cannot be found

          """
          self._file_handle.seek(-8, 2)
          chunk_map_start_location = struct.unpack("<Q", self._file_handle.read(8))[0]
          self._file_handle.seek(chunk_map_start_location)
          raw_text = self._file_handle.read(-1)
          label_start = raw_text.index(Nd2Parser.CHUNK_MAP_START) + 32

          while True:
              try:
                  data_start = raw_text.index(b"!", label_start) + 1
              except ValueError:
                  raise ValueError("The ND2 chunk map is missing its end signature.")
              key = raw_text[label_start: data_start]
              if key == Nd2Parser.CHUNK_MAP_END:
                  # We've reached the end of the chunk map. Whatever follows the end signature is not an
                  # entry, so it is deliberately not unpacked (it may be shorter than 16 bytes).
                  break
              # Each entry is (location, length); only the location is needed to find the chunk again.
              location = struct.unpack("<QQ", raw_text[data_start: data_start + 16])[0]
              self._label_map[key] = location
              label_start = data_start + 16

      def _read_chunk(self, chunk_location):
          """
          Gets the data for a given chunk pointer

          :param chunk_location: absolute file offset of the chunk's 16-byte header
          :returns: the raw bytes of the chunk's data field
          :raises ValueError: if the header does not start with CHUNK_HEADER

          """
          self._file_handle.seek(chunk_location)
          # The chunk metadata is always 16 bytes long
          chunk_metadata = self._file_handle.read(16)
          header, relative_offset, data_length = struct.unpack("<IIQ", chunk_metadata)
          if header != Nd2Parser.CHUNK_HEADER:
              raise ValueError("The ND2 file seems to be corrupted.")
          # We start at the location of the chunk metadata, skip over the metadata, and then proceed to the
          # start of the actual data field, which is at some arbitrary place after the metadata.
          self._file_handle.seek(chunk_location + 16 + relative_offset)
          return self._file_handle.read(data_length)

      def _z_level_count(self):
          """
          The number of doubles stored in the "CustomData|Z!" chunk.

          """
          st = self._read_chunk(self._label_map[b"CustomData|Z!"])
          return len(array.array("d", st))

      def _parse_unsigned_char(self, data):
          """Reads one byte from the stream as an unsigned integer."""
          return struct.unpack("<B", data.read(1))[0]

      def _parse_unsigned_int(self, data):
          """Reads four bytes from the stream as a little-endian unsigned integer."""
          return struct.unpack("<I", data.read(4))[0]

      def _parse_unsigned_long(self, data):
          """Reads eight bytes from the stream as a little-endian unsigned integer."""
          return struct.unpack("<Q", data.read(8))[0]

      def _parse_double(self, data):
          """Reads eight bytes from the stream as a little-endian double."""
          return struct.unpack("<d", data.read(8))[0]

      def _parse_string(self, data):
          """
          Reads a UTF-16 string that is terminated by a NUL code unit and returns it UTF-8 encoded, without
          the terminator.

          :raises ValueError: if the stream ends before the terminator is found

          """
          value = data.read(2)
          while not value.endswith(b"\x00\x00"):
              # the string ends at the first instance of \x00\x00
              code_unit = data.read(2)
              if not code_unit:
                  # Without this check an exhausted stream would keep returning nothing and loop forever.
                  raise ValueError("Unterminated string in ND2 metadata.")
              value += code_unit
          # NOTE(review): "utf16" without a byte order mark decodes in the platform's native byte order.
          return value.decode("utf16")[:-1].encode("utf8")

      def _parse_char_array(self, data):
          """Reads an 8-byte length followed by that many bytes, returned as an array of unsigned chars."""
          array_length = struct.unpack("<Q", data.read(8))[0]
          return array.array("B", data.read(array_length))

      def _parse_metadata_item(self, args):
          """
          Reads a nested metadata block.

          :param args: a (stream, item_start) tuple, where item_start is the stream position at which the
                       current item's two-byte header began
          :returns: the dictionary produced by parsing the nested block

          """
          data, cursor_position = args
          new_count, length = struct.unpack("<IQ", data.read(12))
          # The stored length is counted from the start of the item, so remove what has been consumed already.
          length -= data.tell() - cursor_position
          nested_data = data.read(length)
          value = self._read_metadata(nested_data, new_count)
          # Skip some offsets
          data.read(new_count * 8)
          return value

      def _get_value(self, data, data_type):
          """
          Reads one value of the given type code from the stream.

          :raises KeyError: if the type code is not one of 1, 2, 3, 5, 6, 8, 9 or 11

          """
          if data_type == 11:
              return self._parse_metadata_item((data, self._cursor_position))
          parsers = {1: self._parse_unsigned_char,
                     2: self._parse_unsigned_int,
                     3: self._parse_unsigned_int,
                     5: self._parse_unsigned_long,
                     6: self._parse_double,
                     8: self._parse_string,
                     9: self._parse_char_array}
          return parsers[data_type](data)

      def _read_metadata(self, data, count):
          """
          Parses up to `count` consecutive metadata items from a block of bytes.

          Each item is a type byte, a name length byte (in UTF-16 code units, terminator included), the name
          and the value. Parsing stops early if the block runs out of data.

          :param data: the raw bytes of the block
          :param count: the maximum number of items to read
          :returns: a dictionary of name -> value; names that occur more than once map to a list of values

          """
          # Imported here so the class does not depend on the Python 2 only StringIO module.
          from io import BytesIO

          data = BytesIO(data)
          metadata = {}
          remaining = count
          while remaining > 0:
              remaining -= 1
              self._cursor_position = data.tell()
              header = data.read(2)
              if not header:
                  break
              # bytearray yields integers on both Python 2 and Python 3
              data_type, name_length = bytearray(header)
              name = data.read(name_length * 2).decode("utf16")[:-1].encode("utf8")
              value = self._get_value(data, data_type)
              if name not in metadata:
                  metadata[name] = value
              else:
                  if not isinstance(metadata[name], list):
                      # We have encountered this key exactly once before. Since we're seeing it again, we know we
                      # need to convert it to a list before proceeding.
                      metadata[name] = [metadata[name]]
                  # We've encountered this key before so we're guaranteed to be dealing with a list. Thus we append
                  # the value to the already-existing list.
                  metadata[name].append(value)
          return metadata