You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

360 lines
9.2 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. import os
  2. import struct
  3. import array
  4. import functools
  5. from datetime import datetime
  6. import six
  7. import re
  8. from nd2reader2.exceptions import InvalidVersionError
  9. def get_version(fh):
  10. """Determines what version the ND2 is.
  11. Args:
  12. fh: File handle of the .nd2 file
  13. Returns:
  14. tuple: Major and minor version
  15. """
  16. # the first 16 bytes seem to have no meaning, so we skip them
  17. fh.seek(16)
  18. # the next 38 bytes contain the string that we want to parse. Unlike most of the ND2, this is in UTF-8
  19. data = fh.read(38).decode("utf8")
  20. return parse_version(data)
  21. def parse_version(data):
  22. """Parses a string with the version data in it.
  23. Args:
  24. data (unicode): the 19th through 54th byte of the ND2, representing the version
  25. Returns:
  26. tuple: Major and minor version
  27. """
  28. match = re.search(r"""^ND2 FILE SIGNATURE CHUNK NAME01!Ver(?P<major>\d)\.(?P<minor>\d)$""", data)
  29. if match:
  30. # We haven't seen a lot of ND2s but the ones we have seen conform to this
  31. return int(match.group('major')), int(match.group('minor'))
  32. raise InvalidVersionError("The version of the ND2 you specified is not supported.")
  33. def read_chunk(fh, chunk_location):
  34. """Reads a piece of data given the location of its pointer.
  35. Args:
  36. fh: an open file handle to the ND2
  37. chunk_location (int): location to read
  38. Returns:
  39. bytes: the data at the chunk location
  40. """
  41. if chunk_location is None or fh is None:
  42. return None
  43. fh.seek(chunk_location)
  44. # The chunk metadata is always 16 bytes long
  45. chunk_metadata = fh.read(16)
  46. header, relative_offset, data_length = struct.unpack("IIQ", chunk_metadata)
  47. if header != 0xabeceda:
  48. raise ValueError("The ND2 file seems to be corrupted.")
  49. # We start at the location of the chunk metadata, skip over the metadata, and then proceed to the
  50. # start of the actual data field, which is at some arbitrary place after the metadata.
  51. fh.seek(chunk_location + 16 + relative_offset)
  52. return fh.read(data_length)
  53. def read_array(fh, kind, chunk_location):
  54. """
  55. Args:
  56. fh: File handle of the nd2 file
  57. kind: data type, can be one of 'double', 'int' or 'float'
  58. chunk_location: the location of the array chunk in the binary nd2 file
  59. Returns:
  60. array.array: an array of the data
  61. """
  62. kinds = {'double': 'd',
  63. 'int': 'i',
  64. 'float': 'f'}
  65. if kind not in kinds:
  66. raise ValueError('You attempted to read an array of an unknown type.')
  67. raw_data = read_chunk(fh, chunk_location)
  68. if raw_data is None:
  69. return None
  70. return array.array(kinds[kind], raw_data)
  71. def _parse_unsigned_char(data):
  72. """
  73. Args:
  74. data: binary data
  75. Returns:
  76. char: the data converted to unsigned char
  77. """
  78. return struct.unpack("B", data.read(1))[0]
  79. def _parse_unsigned_int(data):
  80. """
  81. Args:
  82. data: binary data
  83. Returns:
  84. int: the data converted to unsigned int
  85. """
  86. return struct.unpack("I", data.read(4))[0]
  87. def _parse_unsigned_long(data):
  88. """
  89. Args:
  90. data: binary data
  91. Returns:
  92. long: the data converted to unsigned long
  93. """
  94. return struct.unpack("Q", data.read(8))[0]
  95. def _parse_double(data):
  96. """
  97. Args:
  98. data: binary data
  99. Returns:
  100. double: the data converted to double
  101. """
  102. return struct.unpack("d", data.read(8))[0]
  103. def _parse_string(data, utf8encode=True):
  104. """
  105. Args:
  106. data: binary data
  107. Returns:
  108. string: the data converted to string
  109. """
  110. value = data.read(2)
  111. # the string ends at the first instance of \x00\x00
  112. while not value.endswith(six.b("\x00\x00")):
  113. next_data = data.read(2)
  114. if len(next_data) == 0:
  115. break
  116. value += next_data
  117. try:
  118. decoded = value.decode("utf16")[:-1]
  119. except UnicodeDecodeError:
  120. decoded = value.decode('utf8')
  121. if utf8encode:
  122. decoded = decoded.encode("utf8")
  123. return decoded
  124. def _parse_char_array(data):
  125. """
  126. Args:
  127. data: binary data
  128. Returns:
  129. array.array: the data converted to an array
  130. """
  131. array_length = struct.unpack("Q", data.read(8))[0]
  132. return array.array("B", data.read(array_length))
  133. def parse_date(text_info):
  134. """
  135. The date and time when acquisition began.
  136. Args:
  137. text_info: the text that contains the date and time information
  138. Returns:
  139. datetime: the date and time of the acquisition
  140. """
  141. for line in text_info.values():
  142. line = line.decode("utf8")
  143. # ND2s seem to randomly switch between 12- and 24-hour representations.
  144. possible_formats = ["%m/%d/%Y %H:%M:%S", "%m/%d/%Y %I:%M:%S %p", "%d/%m/%Y %H:%M:%S"]
  145. for date_format in possible_formats:
  146. try:
  147. absolute_start = datetime.strptime(line, date_format)
  148. except (TypeError, ValueError):
  149. continue
  150. return absolute_start
  151. return None
  152. def _parse_metadata_item(data, cursor_position, utf8encode=True):
  153. """Reads hierarchical data, analogous to a Python dict.
  154. Args:
  155. data: the binary data that needs to be parsed
  156. cursor_position: the position in the binary nd2 file
  157. Returns:
  158. dict: a dictionary containing the metadata item
  159. """
  160. new_count, length = struct.unpack("<IQ", data.read(12))
  161. length -= data.tell() - cursor_position
  162. next_data_length = data.read(length)
  163. value = read_metadata(next_data_length, new_count, utf8encode=utf8encode)
  164. # Skip some offsets
  165. data.read(new_count * 8)
  166. return value
  167. def _get_value(data, data_type, cursor_position, utf8encode=True):
  168. """ND2s use various codes to indicate different data types, which we translate here.
  169. Args:
  170. data: the binary data
  171. data_type: the data type (unsigned char = 1, unsigned int = 2 or 3, unsigned long = 5, double = 6, string = 8,
  172. char array = 9, metadata item = 11)
  173. cursor_position: the cursor position in the binary nd2 file
  174. Returns:
  175. mixed: the parsed value
  176. """
  177. parser = {1: _parse_unsigned_char,
  178. 2: _parse_unsigned_int,
  179. 3: _parse_unsigned_int,
  180. 5: _parse_unsigned_long,
  181. 6: _parse_double,
  182. 8: functools.partial(_parse_string, utf8encode=utf8encode),
  183. 9: _parse_char_array,
  184. 11: functools.partial(_parse_metadata_item, utf8encode=utf8encode)}
  185. try:
  186. value = parser[data_type](data) if data_type < 11 else parser[data_type](data, cursor_position)
  187. except (KeyError, struct.error):
  188. value = None
  189. return value
  190. def read_metadata(data, count, utf8encode=True):
  191. """
  192. Iterates over each element of some section of the metadata and parses it.
  193. Args:
  194. data: the metadata in binary form
  195. count: the number of metadata elements
  196. Returns:
  197. dict: a dictionary containing the parsed metadata
  198. """
  199. if data is None:
  200. return None
  201. data = six.BytesIO(data)
  202. metadata = {}
  203. for _ in range(count):
  204. cursor_position = data.tell()
  205. header = data.read(2)
  206. if not header:
  207. # We've reached the end of some hierarchy of data
  208. break
  209. data_type, name_length = struct.unpack('BB', header)
  210. name = data.read(name_length * 2).decode("utf16")[:-1]
  211. value = _get_value(data, data_type, cursor_position, utf8encode=utf8encode)
  212. if utf8encode:
  213. name = name.encode("utf8")
  214. metadata = _add_to_metadata(metadata, name, value)
  215. return metadata
  216. def _add_to_metadata(metadata, name, value):
  217. """
  218. Add the name value pair to the metadata dict
  219. Args:
  220. metadata (dict): a dictionary containing the metadata
  221. name (string): the dictionary key
  222. value: the value to add
  223. Returns:
  224. dict: the new metadata dictionary
  225. """
  226. if name not in metadata.keys():
  227. metadata[name] = value
  228. else:
  229. if not isinstance(metadata[name], list):
  230. # We have encountered this key exactly once before. Since we're seeing it again, we know we
  231. # need to convert it to a list before proceeding.
  232. metadata[name] = [metadata[name]]
  233. # We've encountered this key before so we're guaranteed to be dealing with a list. Thus we append
  234. # the value to the already-existing list.
  235. metadata[name].append(value)
  236. return metadata
  237. def get_from_dict_if_exists(key, dictionary, convert_key_to_binary=True):
  238. """
  239. Get the entry from the dictionary if it exists
  240. Args:
  241. key: key to lookup
  242. dictionary: dictionary to look in
  243. convert_key_to_binary: convert the key from string to binary if true
  244. Returns:
  245. the value of dictionary[key] or None
  246. """
  247. if convert_key_to_binary:
  248. key = six.b(key)
  249. if key not in dictionary:
  250. return None
  251. return dictionary[key]
  252. def check_or_make_dir(directory):
  253. """
  254. Check if a directory exists, if not, create it
  255. Args:
  256. directory: the path to the directory
  257. """
  258. if not os.path.exists(directory):
  259. os.makedirs(directory)