Logging handler to send logs to your OpenSearch cluster with bulk SSL. Forked from https://github.com/logzio/logzio-python-handler
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

126 lines
4.9 KiB

7 years ago
  1. # This class is responsible for handling all asynchronous Logz.io's communication
  2. import sys
  3. import requests
  4. import json
  5. from threading import Thread, enumerate
  6. from datetime import datetime
  7. from time import sleep
  8. if sys.version[0] == '2':
  9. import Queue as queue
  10. else:
  11. import queue as queue
  12. MAX_BULK_SIZE_IN_BYTES = 1 * 1024 * 1024 # 1 MB
  13. def backup_logs(logs):
  14. timestamp = datetime.now().strftime("%d%m%Y-%H%M%S")
  15. print("Backing up your logs to logzio-failures-{0}.txt".format(timestamp))
  16. with open("logzio-failures-{0}.txt".format(timestamp), "a") as f:
  17. f.writelines('\n'.join(logs))
  18. class LogzioSender:
  19. def __init__(self, token, url="https://listener.logz.io:8071", logs_drain_timeout=5, debug=False):
  20. self.token = token
  21. self.url = "{0}/?token={1}".format(url, token)
  22. self.logs_drain_timeout = logs_drain_timeout
  23. self.debug = debug
  24. # Function to see if the main thread is alive
  25. self.is_main_thread_active = lambda: any((i.name == "MainThread") and i.is_alive() for i in enumerate())
  26. # Create a queue to hold logs
  27. self.queue = queue.Queue()
  28. self.sending_thread = Thread(target=self._drain_queue)
  29. self.sending_thread.daemon = False
  30. self.sending_thread.name = "logzio-sending-thread"
  31. self.sending_thread.start()
  32. def append(self, logs_message):
  33. # Queue lib is thread safe, no issue here
  34. self.queue.put(json.dumps(logs_message))
  35. def _debug(self, message):
  36. if self.debug:
  37. print(str(message))
  38. def _drain_queue(self):
  39. last_try = False
  40. while not last_try:
  41. # If main is exited, we should run one last time and try to remove all logs
  42. if not self.is_main_thread_active():
  43. self._debug("Identified quit of main thread, sending logs one last time")
  44. last_try = True
  45. try:
  46. self._flush_the_queue()
  47. except Exception as e:
  48. self._debug("Unexpected exception while draining queue to Logz.io, swallowing. Exception: " + str(e))
  49. if not last_try:
  50. sleep(self.logs_drain_timeout)
  51. def _flush_the_queue(self):
  52. # Sending logs until queue is empty
  53. while not self.queue.empty():
  54. logs_list = self._get_messages_up_to_max_allowed_size()
  55. self._debug("Starting to drain " + str(len(logs_list)) + " logs to Logz.io")
  56. # Not configurable from the outside
  57. sleep_between_retries = 2
  58. number_of_retries = 4
  59. should_backup_to_disk = True
  60. headers = {"Content-type": "text/plain"}
  61. for current_try in range(number_of_retries):
  62. should_retry = False
  63. try:
  64. response = requests.post(self.url, headers=headers, data='\n'.join(logs_list))
  65. if response.status_code != 200:
  66. if response.status_code == 400:
  67. print("Got 400 code from Logz.io. This means that some of your logs are too big, or badly formatted. response: {0}".format(response.text))
  68. should_backup_to_disk = False
  69. break
  70. if response.status_code == 401:
  71. print("You are not authorized with Logz.io! Token OK? dropping logs...")
  72. should_backup_to_disk = False
  73. break
  74. else:
  75. print("Got {} while sending logs to Logz.io, Try ({}/{}). Response: {}".format(response.status_code, current_try + 1, number_of_retries, response.text))
  76. should_retry = True
  77. else:
  78. self._debug("Successfully sent bulk of " + str(len(logs_list)) + " logs to Logz.io!")
  79. should_backup_to_disk = False
  80. break
  81. except Exception as e:
  82. print("Got exception while sending logs to Logz.io, Try ({}/{}). Message: {}".format(current_try + 1, number_of_retries, e))
  83. should_retry = True
  84. if should_retry:
  85. sleep(sleep_between_retries)
  86. sleep_between_retries *= 2
  87. if should_backup_to_disk:
  88. # Write to file
  89. print("Could not send logs to Logz.io after " + str(number_of_retries) + " tries, backing up to local file system.")
  90. backup_logs(logs_list)
  91. def _get_messages_up_to_max_allowed_size(self):
  92. logs_list = []
  93. current_size = 0
  94. while not self.queue.empty():
  95. current_log = self.queue.get()
  96. current_size += sys.getsizeof(current_log)
  97. logs_list.append(current_log)
  98. if current_size >= MAX_BULK_SIZE_IN_BYTES:
  99. break
  100. return logs_list