A logging handler that ships logs to your OpenSearch cluster in bulk over SSL. Forked from https://github.com/logzio/logzio-python-handler
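For orientation, here is a minimal sketch of how a handler built on this sender is typically wired into the standard logging module via dictConfig. The handler class path logzio.handler.LogzioHandler, the parameter names, and the endpoint URL are assumptions carried over from the upstream logzio-python-handler and are not defined in the file below; substitute this fork's actual package name and your OpenSearch listener address.

    import logging
    import logging.config

    # Hedged sketch: the handler class path and parameter names follow the
    # upstream logzio-python-handler; this fork may expose them under a
    # different package name. The URL is a placeholder for your OpenSearch
    # listener endpoint.
    LOGGING = {
        'version': 1,
        'handlers': {
            'opensearch': {
                'class': 'logzio.handler.LogzioHandler',
                'level': 'INFO',
                'token': 'your-shipping-token',
                'logs_drain_timeout': 5,  # seconds between bulk drains
                'url': 'https://your-opensearch-listener:8071',
            },
        },
        'root': {'handlers': ['opensearch'], 'level': 'INFO'},
    }

    logging.config.dictConfig(LOGGING)
    logging.getLogger(__name__).info('Hello from the OpenSearch handler')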

# This class is responsible for handling all asynchronous communication
# with Logz.io
import sys
import json
from time import sleep
from datetime import datetime
from threading import Thread, enumerate

import requests

from .logger import get_logger

if sys.version[0] == '2':
    import Queue as queue
else:
    import queue as queue

MAX_BULK_SIZE_IN_BYTES = 1 * 1024 * 1024  # 1 MB


def backup_logs(logs, logger):
    timestamp = datetime.now().strftime('%d%m%Y-%H%M%S')
    logger.info(
        'Backing up your logs to logzio-failures-%s.txt', timestamp)
    with open('logzio-failures-{}.txt'.format(timestamp), 'a') as f:
        f.writelines('\n'.join(logs))


class LogzioSender:
    def __init__(self,
                 token, url='https://listener.logz.io:8071',
                 logs_drain_timeout=5,
                 debug=False):
        self.token = token
        self.url = '{}/?token={}'.format(url, token)
        self.logs_drain_timeout = logs_drain_timeout
        self.logger = get_logger(debug)

        # Function to see if the main thread is alive
        self.is_main_thread_active = lambda: any(
            (i.name == 'MainThread') and i.is_alive() for i in enumerate())

        # Create a queue to hold logs
        self.queue = queue.Queue()

        self.sending_thread = Thread(target=self._drain_queue)
        self.sending_thread.daemon = False
        self.sending_thread.name = 'logzio-sending-thread'
        self.sending_thread.start()

    def append(self, logs_message):
        # Queue lib is thread safe, no issue here
        self.queue.put(json.dumps(logs_message))

    def flush(self):
        self._flush_queue()

    def _drain_queue(self):
        last_try = False
        while not last_try:
            # If main is exited, we should run one last time and try to remove
            # all logs
            if not self.is_main_thread_active():
                self.logger.debug(
                    'Identified quit of main thread, sending logs one '
                    'last time')
                last_try = True

            try:
                self._flush_queue()
            # TODO: Which exception?
            except Exception as e:
                self.logger.debug(
                    'Unexpected exception while draining queue to Logz.io, '
                    'swallowing. Exception: %s', e)

            if not last_try:
                sleep(self.logs_drain_timeout)

    def _flush_queue(self):
        # TODO: Break this down. This function is crazy.
        # Sending logs until queue is empty
        while not self.queue.empty():
            logs_list = self._get_messages_up_to_max_allowed_size()
            self.logger.debug(
                'Starting to drain %s logs to Logz.io', len(logs_list))

            # Not configurable from the outside
            sleep_between_retries = 2
            number_of_retries = 4

            should_backup_to_disk = True
            headers = {"Content-type": "text/plain"}

            for current_try in range(number_of_retries):
                should_retry = False
                try:
                    response = requests.post(
                        self.url, headers=headers, data='\n'.join(logs_list))
                    if response.status_code != 200:
                        if response.status_code == 400:
                            self.logger.info(
                                'Got 400 code from Logz.io. This means that '
                                'some of your logs are too big, or badly '
                                'formatted. response: %s', response.text)
                            should_backup_to_disk = False
                            break

                        if response.status_code == 401:
                            self.logger.info(
                                'You are not authorized with Logz.io! Token '
                                'OK? dropping logs...')
                            should_backup_to_disk = False
                            break
                        else:
                            self.logger.info(
                                'Got %s while sending logs to Logz.io, '
                                'Try (%s/%s). Response: %s',
                                response.status_code,
                                current_try + 1,
                                number_of_retries,
                                response.text)
                            should_retry = True
                    else:
                        self.logger.debug(
                            'Successfully sent bulk of %s logs to '
                            'Logz.io!', len(logs_list))
                        should_backup_to_disk = False
                        break
                # TODO: Which exception?
                except Exception as e:
                    self.logger.error(
                        'Got exception while sending logs to Logz.io, '
                        'Try (%s/%s). Message: %s',
                        current_try + 1, number_of_retries, e)
                    should_retry = True

                if should_retry:
                    sleep(sleep_between_retries)
                    sleep_between_retries *= 2

            if should_backup_to_disk:
                # Write to file
                self.logger.info(
                    'Could not send logs to Logz.io after %s tries, '
                    'backing up to local file system', number_of_retries)
                backup_logs(logs_list, self.logger)

    def _get_messages_up_to_max_allowed_size(self):
        logs_list = []
        current_size = 0
        while not self.queue.empty():
            current_log = self.queue.get()
            current_size += sys.getsizeof(current_log)
            logs_list.append(current_log)
            if current_size >= MAX_BULK_SIZE_IN_BYTES:
                break
        return logs_list
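The LogzioSender class above can also be used on its own. Below is a minimal usage sketch of the append/flush flow it implements; the token and listener URL are placeholders, and the import path is hypothetical since the module's package name depends on this fork's layout.

    # Hypothetical import path; replace with the fork's actual package name.
    from sender import LogzioSender

    sender = LogzioSender(
        token='your-shipping-token',
        url='https://your-opensearch-listener:8071',  # placeholder endpoint
        logs_drain_timeout=5,
        debug=True)

    # append() takes a JSON-serializable dict; it is serialized and queued,
    # and the background logzio-sending-thread drains the queue in bulks of
    # up to MAX_BULK_SIZE_IN_BYTES every logs_drain_timeout seconds.
    sender.append({'message': 'service started', 'level': 'INFO'})

    # Force a synchronous drain instead of waiting for the background thread.
    sender.flush()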