Logging handler to send logs to your OpenSearch cluster with bulk SSL. Forked from https://github.com/logzio/logzio-python-handler
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

163 lines
5.8 KiB

  1. # This class is responsible for handling all asynchronous Logz.io's
  2. # communication
  3. import sys
  4. import json
  5. from time import sleep
  6. from datetime import datetime
  7. from threading import Thread, enumerate
  8. import requests
  9. if sys.version[0] == '2':
  10. import Queue as queue
  11. else:
  12. import queue as queue
  13. MAX_BULK_SIZE_IN_BYTES = 1 * 1024 * 1024 # 1 MB
  14. def backup_logs(logs):
  15. timestamp = datetime.now().strftime('%d%m%Y-%H%M%S')
  16. print('Backing up your logs to logzio-failures-{}.txt'.format(timestamp))
  17. with open('logzio-failures-{}.txt'.format(timestamp), 'a') as f:
  18. f.writelines('\n'.join(logs))
  19. class LogzioSender:
  20. def __init__(self,
  21. token, url='https://listener.logz.io:8071',
  22. logs_drain_timeout=5,
  23. debug=False):
  24. self.token = token
  25. self.url = '{}/?token={}'.format(url, token)
  26. self.logs_drain_timeout = logs_drain_timeout
  27. self.debug = debug
  28. # Function to see if the main thread is alive
  29. self.is_main_thread_active = lambda: any(
  30. (i.name == 'MainThread') and i.is_alive() for i in enumerate())
  31. # Create a queue to hold logs
  32. self.queue = queue.Queue()
  33. self.sending_thread = Thread(target=self._drain_queue)
  34. self.sending_thread.daemon = False
  35. self.sending_thread.name = 'logzio-sending-thread'
  36. self.sending_thread.start()
  37. def append(self, logs_message):
  38. # Queue lib is thread safe, no issue here
  39. self.queue.put(json.dumps(logs_message))
  40. def flush(self):
  41. self._flush_queue()
  42. def _debug(self, message):
  43. if self.debug:
  44. print(str(message))
  45. def _drain_queue(self):
  46. last_try = False
  47. while not last_try:
  48. # If main is exited, we should run one last time and try to remove
  49. # all logs
  50. if not self.is_main_thread_active():
  51. self._debug(
  52. 'Identified quit of main thread, sending logs one '
  53. 'last time')
  54. last_try = True
  55. try:
  56. self._flush_queue()
  57. # TODO: Which exception?
  58. except Exception as e:
  59. self._debug(
  60. 'Unexpected exception while draining queue to Logz.io, '
  61. 'swallowing. Exception: {}'.format(e))
  62. if not last_try:
  63. sleep(self.logs_drain_timeout)
  64. def _flush_queue(self):
  65. # TODO: Break this down. This function is crazy.
  66. # Sending logs until queue is empty
  67. while not self.queue.empty():
  68. logs_list = self._get_messages_up_to_max_allowed_size()
  69. self._debug(
  70. 'Starting to drain {} logs to Logz.io'.format(len(logs_list)))
  71. # Not configurable from the outside
  72. sleep_between_retries = 2
  73. number_of_retries = 4
  74. should_backup_to_disk = True
  75. headers = {"Content-type": "text/plain"}
  76. for current_try in range(number_of_retries):
  77. should_retry = False
  78. try:
  79. response = requests.post(
  80. self.url, headers=headers, data='\n'.join(logs_list))
  81. if response.status_code != 200:
  82. if response.status_code == 400:
  83. print('Got 400 code from Logz.io. This means that '
  84. 'some of your logs are too big, or badly '
  85. 'formatted. response: {}'.format(
  86. response.text))
  87. should_backup_to_disk = False
  88. break
  89. if response.status_code == 401:
  90. print(
  91. 'You are not authorized with Logz.io! Token '
  92. 'OK? dropping logs...')
  93. should_backup_to_disk = False
  94. break
  95. else:
  96. print(
  97. 'Got {} while sending logs to Logz.io, '
  98. 'Try ({}/{}). Response: {}'.format(
  99. response.status_code,
  100. current_try + 1,
  101. number_of_retries,
  102. response.text))
  103. should_retry = True
  104. else:
  105. self._debug(
  106. 'Successfully sent bulk of {} logs to '
  107. 'Logz.io!'.format(len(logs_list)))
  108. should_backup_to_disk = False
  109. break
  110. # TODO: Which exception?
  111. except Exception as e:
  112. print('Got exception while sending logs to Logz.io, '
  113. 'Try ({}/{}). Message: {}'.format(
  114. current_try + 1, number_of_retries, e))
  115. should_retry = True
  116. if should_retry:
  117. sleep(sleep_between_retries)
  118. sleep_between_retries *= 2
  119. if should_backup_to_disk:
  120. # Write to file
  121. print('Could not send logs to Logz.io after {} tries, '
  122. 'backing up to local file system'.format(
  123. number_of_retries))
  124. backup_logs(logs_list)
  125. def _get_messages_up_to_max_allowed_size(self):
  126. logs_list = []
  127. current_size = 0
  128. while not self.queue.empty():
  129. current_log = self.queue.get()
  130. current_size += sys.getsizeof(current_log)
  131. logs_list.append(current_log)
  132. if current_size >= MAX_BULK_SIZE_IN_BYTES:
  133. break
  134. return logs_list