You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

210 lines
5.9 KiB

  1. import struct
  2. import array
  3. from datetime import datetime
  4. import six
  5. import re
  6. from nd2reader.exceptions import InvalidVersionError
  def get_version(fh):
      """
      Read the version signature from an ND2 and hand it to ``parse_version``.

      :param fh: an open file handle to the ND2
      :type fh: file

      :return: the result of ``parse_version`` for the decoded signature
      """
      # The first 16 bytes seem to have no meaning, so the signature is read from offset 16.
      signature_offset = 16
      # The signature occupies the next 38 bytes. Unlike most of the ND2, it is UTF-8.
      signature_length = 38
      fh.seek(signature_offset)
      raw_signature = fh.read(signature_length)
      return parse_version(raw_signature.decode("utf8"))
  def parse_version(data):
      """
      Extract the major and minor version numbers from the ND2 signature text.

      :param data: the version signature text taken from the start of the ND2
      :type data: unicode

      :return: the version as a ``(major, minor)`` pair of ints
      :rtype: tuple
      :raises InvalidVersionError: if the text does not match the expected signature
      """
      signature = re.search(r"""^ND2 FILE SIGNATURE CHUNK NAME01!Ver(?P<major>\d)\.(?P<minor>\d)$""", data)
      if signature is None:
          raise InvalidVersionError("The version of the ND2 you specified is not supported.")
      # We haven't seen a lot of ND2s, but the ones we have seen all use single-digit versions.
      major, minor = signature.group('major', 'minor')
      return int(major), int(minor)
  def read_chunk(fh, chunk_location):
      """
      Reads a piece of data given the location of its pointer.

      :param fh: an open file handle to the ND2
      :param chunk_location: a pointer
      :type chunk_location: int

      :return: the chunk's data, or None if ``chunk_location`` is None
      :rtype: bytes
      :raises ValueError: if the chunk does not start with the expected magic number
      """
      if chunk_location is None:
          return None
      fh.seek(chunk_location)
      # The chunk metadata is always 16 bytes long
      chunk_metadata = fh.read(16)
      # "<" pins the layout to little-endian with standard sizes, matching the "<IQ" used by
      # _parse_metadata_item, so the result does not depend on the host's byte order.
      header, relative_offset, data_length = struct.unpack("<IIQ", chunk_metadata)
      if header != 0xabeceda:
          raise ValueError("The ND2 file seems to be corrupted.")
      # We start at the location of the chunk metadata, skip over the metadata, and then proceed to the
      # start of the actual data field, which is at some arbitrary place after the metadata.
      fh.seek(chunk_location + 16 + relative_offset)
      return fh.read(data_length)
  def read_array(fh, kind, chunk_location):
      """
      Read a chunk and interpret its bytes as an array of numbers.

      :param fh: an open file handle to the ND2
      :param kind: the element type: 'double', 'int' or 'float'
      :type kind: str
      :param chunk_location: a pointer to the chunk
      :type chunk_location: int

      :return: the chunk's contents as an array, or None if ``read_chunk`` returned None
      :rtype: array.array or None
      :raises ValueError: if ``kind`` is not one of the supported names
      """
      typecodes = {'double': 'd', 'int': 'i', 'float': 'f'}
      typecode = typecodes.get(kind)
      # The type is validated before anything is read from the file.
      if typecode is None:
          raise ValueError('You attempted to read an array of an unknown type.')
      raw_data = read_chunk(fh, chunk_location)
      return None if raw_data is None else array.array(typecode, raw_data)
  def _parse_unsigned_char(data):
      """
      Consume one byte from the stream and return it as an unsigned integer.

      :param data: a binary stream to read from
      :rtype: int
      """
      (value,) = struct.unpack("B", data.read(1))
      return value
  def _parse_unsigned_int(data):
      """
      Consume four bytes from the stream and return them as a little-endian unsigned integer.

      :param data: a binary stream to read from
      :rtype: int
      """
      # "<" fixes the byte order to little-endian, matching the "<IQ" used by _parse_metadata_item,
      # instead of depending on the byte order of the machine running the parser.
      (value,) = struct.unpack("<I", data.read(4))
      return value
  def _parse_unsigned_long(data):
      """
      Consume eight bytes from the stream and return them as a little-endian unsigned integer.

      :param data: a binary stream to read from
      :rtype: int
      """
      # "<" fixes the byte order to little-endian, matching the "<IQ" used by _parse_metadata_item,
      # instead of depending on the byte order of the machine running the parser.
      (value,) = struct.unpack("<Q", data.read(8))
      return value
  def _parse_double(data):
      """
      Consume eight bytes from the stream and return them as a little-endian double.

      :param data: a binary stream to read from
      :rtype: float
      """
      # "<" fixes the byte order to little-endian, matching the "<IQ" used by _parse_metadata_item,
      # instead of depending on the byte order of the machine running the parser.
      (value,) = struct.unpack("<d", data.read(8))
      return value
  def _parse_string(data):
      """
      Read a UTF-16 string that ends with a two-byte zero terminator and return it as UTF-8.

      The stream is consumed two bytes at a time up to and including the terminator; the
      terminator itself is not part of the result.

      :param data: a binary stream positioned at the start of the string
      :return: the string, re-encoded as UTF-8
      :rtype: bytes
      :raises ValueError: if the stream ends before the terminator is found
      """
      terminator = b"\x00\x00"
      pieces = []
      while True:
          code_unit = data.read(2)
          if len(code_unit) < 2:
              # An exhausted stream keeps returning b"", so without this check a string with
              # no terminator would make the loop spin forever.
              raise ValueError("The ND2 file seems to be corrupted.")
          pieces.append(code_unit)
          if code_unit == terminator:
              # the string ends at the first instance of \x00\x00
              break
      # [:-1] drops the decoded terminator character.
      return b"".join(pieces).decode("utf16")[:-1].encode("utf8")
  def _parse_char_array(data):
      """
      Read a length-prefixed run of bytes from the stream.

      The first eight bytes hold the length as a little-endian unsigned integer; that many
      bytes are then read and returned.

      :param data: a binary stream to read from
      :return: the bytes that follow the length prefix
      :rtype: array.array
      """
      # "<" fixes the byte order to little-endian, matching the "<IQ" used by _parse_metadata_item,
      # instead of depending on the byte order of the machine running the parser.
      (array_length,) = struct.unpack("<Q", data.read(8))
      return array.array("B", data.read(array_length))
  def parse_date(text_info):
      """
      The date and time when acquisition began.

      Every value in ``text_info`` is tried in turn against the 24-hour and the 12-hour
      timestamp formats, and the first value that parses is returned. Values that cannot be
      decoded or do not look like a timestamp are skipped.

      :param text_info: mapping whose values are UTF-8 encoded lines of text
      :type text_info: dict

      :rtype: datetime.datetime() or None
      """
      # ND2s seem to randomly switch between 12- and 24-hour representations.
      timestamp_formats = ("%m/%d/%Y %H:%M:%S", "%m/%d/%Y %I:%M:%S %p")
      for line in text_info.values():
          if isinstance(line, bytes):
              try:
                  line = line.decode("utf8")
              except UnicodeDecodeError:
                  # Bytes that are not valid UTF-8 cannot be the timestamp line.
                  continue
          for timestamp_format in timestamp_formats:
              try:
                  return datetime.strptime(line, timestamp_format)
              except (TypeError, ValueError):
                  continue
      # Nothing parsed (or text_info was empty).
      return None
  def _parse_metadata_item(data, cursor_position):
      """
      Reads hierarchical data, analogous to a Python dict.

      :param data: a binary stream to read the item from
      :param cursor_position: stream position recorded by the caller before this item was read
      :type cursor_position: int

      :return: the nested metadata produced by ``read_metadata``
      """
      item_count, stored_length = struct.unpack("<IQ", data.read(12))
      # Reduce the stored length by the number of bytes consumed since cursor_position.
      already_consumed = data.tell() - cursor_position
      nested_bytes = data.read(stored_length - already_consumed)
      parsed = read_metadata(nested_bytes, item_count)
      # Skip some offsets (eight bytes per item)
      data.read(item_count * 8)
      return parsed
  def _get_value(data, data_type, cursor_position):
      """
      Parse one value from the stream using the parser registered for its type code.

      ND2s use various codes to indicate different data types, which we translate here.

      :param data: a binary stream positioned at the start of the value
      :param data_type: the numeric type code of the value
      :type data_type: int
      :param cursor_position: stream position where the current element began; only passed on
          to parsers for type codes of 11 and above
      :type cursor_position: int

      :return: whatever the selected parser returns
      :raises KeyError: if there is no parser for ``data_type``
      """
      parsers = {
          1: _parse_unsigned_char,
          2: _parse_unsigned_int,
          3: _parse_unsigned_int,
          5: _parse_unsigned_long,
          6: _parse_double,
          8: _parse_string,
          9: _parse_char_array,
          11: _parse_metadata_item,
      }
      parse = parsers[data_type]
      if data_type < 11:
          return parse(data)
      return parse(data, cursor_position)
  def read_metadata(data, count):
      """
      Iterates over each element of some section of the metadata and parses it.

      :param data: the raw bytes of the metadata section, or None
      :type data: bytes
      :param count: the maximum number of elements to read
      :type count: int

      :return: the parsed elements keyed by name, or None if ``data`` is None
      :rtype: dict or None
      """
      if data is None:
          return None
      from io import BytesIO
      stream = BytesIO(data)
      metadata = {}
      for _ in range(count):
          cursor_position = stream.tell()
          header = stream.read(2)
          if not header:
              # We've reached the end of some hierarchy of data
              break
          # The header is two raw bytes: the type code and the name length. bytearray yields
          # their integer values on both Python 2 and 3. Decoding them as UTF-8 first would
          # fail or give wrong numbers for any byte of 0x80 or above.
          data_type, name_length = bytearray(header)
          # Names are UTF-16 with a trailing NUL character, which [:-1] removes.
          name = stream.read(name_length * 2).decode("utf16")[:-1].encode("utf8")
          value = _get_value(stream, data_type, cursor_position)
          metadata = _add_to_metadata(metadata, name, value)
      return metadata
  def _add_to_metadata(metadata, name, value):
      """
      Store a value under a name, gathering values of repeated names into a list.

      :param metadata: the dict being built up; it is modified in place
      :param name: the key to store the value under
      :param value: the value to store
      :return: the same ``metadata`` dict that was passed in
      """
      if name in metadata:
          existing = metadata[name]
          if isinstance(existing, list):
              # The key already holds a list, so the new value simply joins it.
              existing.append(value)
          else:
              # Second occurrence of this key: promote the lone value to a list of both.
              metadata[name] = [existing, value]
      else:
          # First occurrence of this key: store the bare value.
          metadata[name] = value
      return metadata
  155. return metadata