Logging handler that ships logs to your OpenSearch cluster in bulk over SSL. Forked from https://github.com/logzio/logzio-python-handler


import datetime
import json
import logging
import logging.handlers
import traceback
from threading import Condition, Event, Lock, Thread, enumerate as enumerate_threads
from time import sleep

import requests


class LogzioHandler(logging.Handler):

    # Holds all buffered logs
    logs = []
    # Event used to block buffer additions while draining
    buffer_event = Event()
    # Condition used to count incoming log messages
    logs_counter_condition = Condition()
    # Lock so the logs are only drained by one thread at a time
    drain_lock = Lock()

    def __init__(self, token, logs_drain_count=100, logs_drain_timeout=10,
                 logzio_type="python", url="https://listener.logz.io:8071/"):
        if not token:
            raise ValueError("Logz.io token must be provided")
        logging.Handler.__init__(self)
        self.logs_drain_count = logs_drain_count
        self.logs_drain_timeout = logs_drain_timeout
        self.logzio_type = logzio_type
        self.url = "{0}?token={1}".format(url, token)
        self.is_main_thread_active = lambda: any(
            (t.name == "MainThread") and t.is_alive() for t in enumerate_threads())
        self.buffer_event.set()
        # Create the background threads
        timeout_thread = Thread(target=self.wait_to_timeout_and_drain)
        counter_thread = Thread(target=self.count_logs_and_drain)
        # And start them
        timeout_thread.start()
        counter_thread.start()

    def wait_to_timeout_and_drain(self):
        while True:
            sleep(self.logs_drain_timeout)
            if len(self.logs) > 0:
                self.drain_messages()
            if not self.is_main_thread_active():
                # Signal the counter thread so it exits as well
                try:
                    self.logs_counter_condition.acquire()
                    self.logs_counter_condition.notify()
                finally:
                    self.logs_counter_condition.release()
                break

    def count_logs_and_drain(self):
        try:
            # Acquire the condition
            self.logs_counter_condition.acquire()
            # Run indefinitely
            while True:
                # Wait for new log lines to arrive
                self.logs_counter_condition.wait()
                if not self.is_main_thread_active():
                    break
                # Do we have enough logs to drain?
                if len(self.logs) >= self.logs_drain_count:
                    self.drain_messages()
        finally:
            self.logs_counter_condition.release()
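
    # Design note: add_to_buffer (below) acts as the producer and
    # count_logs_and_drain as the consumer, coordinating through
    # logs_counter_condition, while wait_to_timeout_and_drain flushes the
    # buffer on a timer even when fewer than logs_drain_count logs arrived.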

    def add_to_buffer(self, message):
        # Wait while the buffer is being drained, so we don't lose logs
        self.buffer_event.wait()
        try:
            # Acquire the condition
            self.logs_counter_condition.acquire()
            self.logs.append(json.dumps(message))
            # Notify the watcher that a new log has come in
            self.logs_counter_condition.notify()
        finally:
            # Release the condition
            self.logs_counter_condition.release()

    def handle_exceptions(self, message):
        if message.exc_info:
            return '\n'.join(traceback.format_exception(*message.exc_info))
        else:
            return message.getMessage()

    def format_message(self, message):
        message_field = self.handle_exceptions(message)
        now = datetime.datetime.utcnow()
        timestamp = now.strftime("%Y-%m-%dT%H:%M:%S") + ".%03d" % (now.microsecond // 1000) + "Z"
        return {
            "logger": message.name,
            "line_number": message.lineno,
            "path_name": message.pathname,
            "log_level": message.levelname,
            "message": message_field,
            "type": self.logzio_type,
            "@timestamp": timestamp
        }
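
    # For illustration only, a formatted record serializes (in add_to_buffer)
    # to a flat JSON document along these lines; every value below is made up:
    # {"logger": "my_app", "line_number": 42, "path_name": "/app/main.py",
    #  "log_level": "INFO", "message": "something happened",
    #  "type": "python", "@timestamp": "2016-01-01T12:00:00.000Z"}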

    def drain_messages(self):
        try:
            self.buffer_event.clear()
            self.drain_lock.acquire()
            # Retry policy, not configurable from the outside
            sleep_between_retries = 2  # seconds; doubled after every failed attempt
            number_of_retries = 4
            success_in_send = False
            headers = {"Content-type": "text/plain"}
            for current_try in range(number_of_retries):
                response = requests.post(self.url, headers=headers, data='\n'.join(self.logs))
                if response.status_code != 200:
                    sleep(sleep_between_retries)
                    sleep_between_retries *= 2
                else:
                    success_in_send = True
                    break
            # Only clear the buffer if we managed to send the logs
            if success_in_send:
                self.logs = []
        finally:
            self.buffer_event.set()
            self.drain_lock.release()

    def emit(self, record):
        self.add_to_buffer(self.format_message(record))
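
A minimal usage sketch, assuming the class above is importable from a module named logzio_handler (that module name, the token value, and the logger name are placeholders, not taken from this file):

    import logging

    from logzio_handler import LogzioHandler  # hypothetical module path

    logger = logging.getLogger("my_app")  # placeholder logger name
    logger.setLevel(logging.INFO)
    # "MY_SHIPPING_TOKEN" is a placeholder for your own shipping token
    logger.addHandler(LogzioHandler(token="MY_SHIPPING_TOKEN",
                                    logs_drain_count=50,
                                    logs_drain_timeout=5))

    logger.info("Application started")

Because __init__ starts the two drain threads immediately, construct the handler once and reuse it; attaching a fresh instance per logger would spawn duplicate background threads.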