Logging handler that ships logs to your OpenSearch cluster via the bulk API over SSL. Forked from https://github.com/logzio/logzio-python-handler
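A minimal usage sketch (illustrative, not part of the module; the host, port, and credentials are placeholders, and it assumes each appended record is a dict, since the sender spreads every entry into an OpenSearch bulk action):

    sender = LogzioSender(host='opensearch.example.com', port=9200,
                          username='admin', password='admin')
    sender.append({'message': 'service started', 'level': 'INFO'})
    sender.flush()  # drain immediately instead of waiting logs_drain_timeout
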
# This class is responsible for all asynchronous communication with the
# OpenSearch cluster (forked from the Logz.io handler)
import json
import logging
import queue  # the f-strings below require Python 3.6+, so no Python 2 shim
import sys
from datetime import datetime
from threading import Thread, enumerate  # threading.enumerate, shadows the builtin
from time import sleep

import opensearchpy
import opensearchpy.helpers

from .logger import get_stdout_logger

# Silence the chatty transport logger used by opensearch-py
opensearch_logger = logging.getLogger('opensearch')
opensearch_logger.propagate = False
opensearch_logger.handlers.clear()
opensearch_logger.setLevel(logging.WARNING)

MAX_BULK_SIZE_IN_BYTES = 1 * 1024 * 1024  # 1 MB
def backup_logs(logs, logger):
    timestamp = datetime.now().strftime('%d%m%Y-%H%M%S')
    logger.info(
        'Backing up your logs to logzio-failures-%s.txt', timestamp)
    with open('logzio-failures-{}.txt'.format(timestamp), 'a') as f:
        # Queued entries are dicts (see append()), so serialize them first;
        # a plain '\n'.join(logs) would raise TypeError on dicts
        f.writelines('\n'.join(json.dumps(log) for log in logs))
class LogzioSender:
    def __init__(self,
                 host, port, username, password,
                 logs_drain_timeout=5,
                 debug=False,
                 backup_logs=True,
                 network_timeout=10.0,
                 number_of_retries=4,
                 retry_timeout=2):
        self.client = opensearchpy.OpenSearch(
            hosts=[{'host': host, 'port': port}],
            http_compress=True,  # enable gzip compression for request bodies
            http_auth=(username, password),
            use_ssl=True,
            verify_certs=True,
            ssl_assert_hostname=False,
            ssl_show_warn=False,
            timeout=network_timeout,  # previously unused; now applied to requests
        )
        self.logs_drain_timeout = logs_drain_timeout
        self.stdout_logger = get_stdout_logger(debug)
        self.backup_logs = backup_logs
        self.network_timeout = network_timeout
        self.number_of_retries = number_of_retries
        self.retry_timeout = retry_timeout
        # Check whether the main thread is still alive
        self.is_main_thread_active = lambda: any(
            (i.name == 'MainThread') and i.is_alive() for i in enumerate())
        # Queue holding pending logs; queue.Queue is thread safe
        self.queue = queue.Queue()
        self._initialize_sending_thread()
    def __del__(self):
        del self.stdout_logger
        del self.backup_logs
        del self.queue

    def _initialize_sending_thread(self):
        self.sending_thread = Thread(target=self._drain_queue)
        self.sending_thread.daemon = False
        self.sending_thread.name = 'logzio-sending-thread'
        self.sending_thread.start()

    def append(self, logs_message):
        if not self.sending_thread.is_alive():
            self._initialize_sending_thread()
        # queue.Queue is thread safe, so no locking is needed here. Unlike
        # the upstream handler (which queued json.dumps output), the raw dict
        # is queued because _flush_queue() spreads each entry into a bulk
        # action.
        self.queue.put(logs_message)

    def flush(self):
        self._flush_queue()
    def _drain_queue(self):
        last_try = False
        while not last_try:
            # If main has exited, run one last time and try to flush all
            # remaining logs
            if not self.is_main_thread_active():
                self.stdout_logger.debug(
                    'Identified quit of main thread, sending logs one '
                    'last time')
                last_try = True
            try:
                self._flush_queue()
            except Exception as e:
                self.stdout_logger.debug(
                    'Unexpected exception while draining queue to '
                    'OpenSearch, swallowing. Exception: %s', e)
            if not last_try:
                sleep(self.logs_drain_timeout)
    def _flush_queue(self):
        # Keep sending until the queue is empty
        while not self.queue.empty():
            logs_list = self._get_messages_up_to_max_allowed_size()
            self.stdout_logger.debug(
                'Starting to drain %s logs to OpenSearch', len(logs_list))
            # Not configurable from the outside
            sleep_between_retries = self.retry_timeout
            should_backup_to_disk = True
            for current_try in range(self.number_of_retries):
                should_retry = False
                try:
                    # One index per UTC month, e.g. backendlog-2023-04
                    index_name = f"backendlog-{datetime.utcnow():%Y-%m}"
                    index_body = {'settings': {'index': {
                        'number_of_shards': 1, 'number_of_replicas': 0}}}
                    # ignore=400 makes "index already exists" a no-op
                    self.client.indices.create(
                        index_name, body=index_body, ignore=400)
                    # The upstream requests-based HTTP transport (with its
                    # per-status-code handling) was replaced by the
                    # OpenSearch bulk helper; it returns the number of
                    # successfully executed actions
                    success_count, _ = opensearchpy.helpers.bulk(
                        self.client,
                        [{'_index': index_name, **entry}
                         for entry in logs_list],
                        max_retries=3)
                    self.stdout_logger.debug(
                        'Successfully sent bulk of %s logs to '
                        'OpenSearch!', success_count)
                    should_backup_to_disk = False
                    break
                except Exception as e:
                    self.stdout_logger.warning(
                        'Got exception while sending logs to OpenSearch, '
                        'Try (%s/%s). Message: %s',
                        current_try + 1, self.number_of_retries, e)
                    should_retry = True
                if should_retry:
                    sleep(sleep_between_retries)

            if should_backup_to_disk and self.backup_logs:
                # Write the failed batch to a local file
                self.stdout_logger.error(
                    'Could not send logs to OpenSearch after %s tries, '
                    'backing up to local file system', self.number_of_retries)
                backup_logs(logs_list, self.stdout_logger)

            del logs_list
    def _get_messages_up_to_max_allowed_size(self):
        logs_list = []
        current_size = 0
        while not self.queue.empty():
            current_log = self.queue.get()
            try:
                current_size += sys.getsizeof(current_log)
            except TypeError:
                # PyPy does not support sys.getsizeof; fall back to a
                # rough estimate
                current_size += len(current_log) * 4
            logs_list.append(current_log)
            if current_size >= MAX_BULK_SIZE_IN_BYTES:
                break
        return logs_list
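
# For reference, a sketch of the bulk action each queued dict becomes
# (illustrative field values; the index name is derived from the current
# UTC month):
#
#     {'_index': 'backendlog-2023-04',
#      'message': 'service started',
#      'level': 'INFO'}
#
# opensearchpy.helpers.bulk() treats '_index' as action metadata and the
# remaining keys as the document source when building the _bulk request.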