Logging handler to send logs to your OpenSearch cluster with bulk SSL. Forked from https://github.com/logzio/logzio-python-handler
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

195 lines
6.2 KiB

  1. import logging
  2. import logging.handlers
  3. import requests
  4. import traceback
  5. import datetime
  6. import json
  7. import os
  8. from threading import Event, Thread, Condition, Lock, enumerate
  9. from time import sleep
  10. class LogzioHandler(logging.Handler):
  11. # Hold all logs buffered
  12. logs = []
  13. # Event for locking buffer additions while draining
  14. buffer_event = Event()
  15. # Condition to count log messages
  16. logs_counter_condition = Condition()
  17. # Lock to only drain logs once
  18. drain_lock = Lock()
  19. def __init__(self, token, logs_drain_count=100, logs_drain_timeout=10,
  20. logzio_type="python", url="https://listener.logz.io:8071"):
  21. if token is "":
  22. raise Exception("Logz.io Token must be provided")
  23. logging.Handler.__init__(self)
  24. self.logs_drain_count = logs_drain_count
  25. self.logs_drain_timeout = logs_drain_timeout
  26. self.logzio_type = logzio_type
  27. self.url = "{0}/?token={1}".format(url, token)
  28. self.is_main_thread_active = lambda: any((i.name == "MainThread") and i.is_alive() for i in enumerate())
  29. self.buffer_event.set()
  30. # Create threads
  31. timeout_thread = Thread(target=self.wait_to_timeout_and_drain)
  32. counter_thread = Thread(target=self.count_logs_and_drain)
  33. # And start them
  34. timeout_thread.start()
  35. counter_thread.start()
  36. def wait_to_timeout_and_drain(self):
  37. while True:
  38. sleep(self.logs_drain_timeout)
  39. if len(self.logs) > 0:
  40. self.drain_messages()
  41. if not self.is_main_thread_active():
  42. # Signal the counter thread so it would exit as well
  43. try:
  44. self.logs_counter_condition.acquire()
  45. self.logs_counter_condition.notify()
  46. finally:
  47. self.logs_counter_condition.release()
  48. break
  49. def count_logs_and_drain(self):
  50. try:
  51. # Acquire the condition
  52. self.logs_counter_condition.acquire()
  53. # Running indefinite
  54. while True:
  55. # Waiting for new log lines to come
  56. self.logs_counter_condition.wait()
  57. if not self.is_main_thread_active():
  58. break
  59. # Do we have enough logs to drain?
  60. if len(self.logs) >= self.logs_drain_count:
  61. self.drain_messages()
  62. finally:
  63. self.logs_counter_condition.release()
  64. def add_to_buffer(self, message):
  65. # Check if we are currently draining buffer so we wont loose logs
  66. self.buffer_event.wait()
  67. try:
  68. # Acquire the condition
  69. self.logs_counter_condition.acquire()
  70. self.logs.append(json.dumps(message))
  71. # Notify watcher for a new log coming in
  72. self.logs_counter_condition.notify()
  73. finally:
  74. # Release the condition
  75. self.logs_counter_condition.release()
  76. def format(self, record):
  77. message = super(LogzioHandler, self).format(record)
  78. try:
  79. return json.loads(message)
  80. except (TypeError, ValueError):
  81. return message
  82. def formatException(self, exc_info):
  83. return '\n'.join(traceback.format_exception(*exc_info))
  84. def format_message(self, message):
  85. now = datetime.datetime.utcnow()
  86. timestamp = now.strftime("%Y-%m-%dT%H:%M:%S") + ".%03d" % (now.microsecond / 1000) + "Z"
  87. return_json = {
  88. "logger": message.name,
  89. "line_number": message.lineno,
  90. "path_name": message.pathname,
  91. "log_level": message.levelname,
  92. "type": self.logzio_type,
  93. "@timestamp": timestamp
  94. }
  95. if message.exc_info:
  96. return_json["message"] = self.formatException(message.exc_info)
  97. else:
  98. formatted_message = self.format(message)
  99. if isinstance(formatted_message, dict):
  100. return_json.update(formatted_message)
  101. else:
  102. return_json["message"] = formatted_message
  103. return return_json
  104. def backup_logs(self, logs):
  105. timestamp = datetime.datetime.now().strftime("%d%m%Y-%H%M%S")
  106. print("Backing up your logs to logzio-failures-{0}.txt".format(timestamp))
  107. with open("logzio-failures-{0}.txt".format(timestamp), "a") as f:
  108. f.writelines('\n'.join(logs))
  109. def drain_messages(self):
  110. try:
  111. self.buffer_event.clear()
  112. self.drain_lock.acquire()
  113. # Copy buffer
  114. temp_logs = list(self.logs)
  115. self.logs = []
  116. # Release the event
  117. self.buffer_event.set()
  118. # Not configurable from the outside
  119. sleep_between_retries = 2
  120. number_of_retries = 4
  121. success_in_send = False
  122. headers = {"Content-type": "text/plain"}
  123. for current_try in range(number_of_retries):
  124. try:
  125. response = requests.post(self.url, headers=headers, data='\n'.join(temp_logs))
  126. if response.status_code != 200: # 429 400, on 400 print stdout
  127. if response.status_code == 400:
  128. print("Got unexpected 400 code from logz.io, response: {0}".format(response.text))
  129. self.backup_logs(temp_logs)
  130. if response.status_code == 401:
  131. print("You are not authorized with logz.io! dropping..")
  132. break
  133. except Exception as e:
  134. print("Got exception while sending logs to Logz.io, Try ({}/{}). Message: {}".format(current_try + 1, number_of_retries, e.message))
  135. sleep(sleep_between_retries)
  136. sleep_between_retries *= 2
  137. else:
  138. success_in_send = True
  139. break
  140. if not success_in_send:
  141. # Write to file
  142. self.backup_logs(temp_logs)
  143. finally:
  144. self.buffer_event.set()
  145. self.drain_lock.release()
  146. def emit(self, record):
  147. self.add_to_buffer(self.format_message(record))