Logging handler to send logs to your OpenSearch cluster with bulk SSL. Forked from https://github.com/logzio/logzio-python-handler
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

182 lines
5.6 KiB

  1. import logging
  2. import logging.handlers
  3. import requests
  4. import traceback
  5. import datetime
  6. import json
  7. from threading import Event, Thread, Condition, Lock, enumerate
  8. from time import sleep
  9. class LogzioHandler(logging.Handler):
  10. # Hold all logs buffered
  11. logs = []
  12. # Event for locking buffer additions while draining
  13. buffer_event = Event()
  14. # Condition to count log messages
  15. logs_counter_condition = Condition()
  16. # Lock to only drain logs once
  17. drain_lock = Lock()
  18. def __init__(self, token, logs_drain_count=100, logs_drain_timeout=10,
  19. logzio_type="python", url="https://listener.logz.io:8071/"):
  20. if token is "":
  21. raise Exception("Logz.io Token must be provided")
  22. logging.Handler.__init__(self)
  23. self.logs_drain_count = logs_drain_count
  24. self.logs_drain_timeout = logs_drain_timeout
  25. self.logzio_type = logzio_type
  26. self.url = "{0}?token={1}".format(url, token)
  27. self.is_main_thread_active = lambda: any((i.name == "MainThread") and i.is_alive() for i in enumerate())
  28. self.buffer_event.set()
  29. # Create threads
  30. timeout_thread = Thread(target=self.wait_to_timeout_and_drain)
  31. counter_thread = Thread(target=self.count_logs_and_drain)
  32. # And start them
  33. timeout_thread.start()
  34. counter_thread.start()
  35. def wait_to_timeout_and_drain(self):
  36. while True:
  37. sleep(self.logs_drain_timeout)
  38. if len(self.logs) > 0:
  39. self.drain_messages()
  40. if not self.is_main_thread_active():
  41. # Signal the counter thread so it would exit as well
  42. try:
  43. self.logs_counter_condition.acquire()
  44. self.logs_counter_condition.notify()
  45. finally:
  46. self.logs_counter_condition.release()
  47. break
  48. def count_logs_and_drain(self):
  49. try:
  50. # Acquire the condition
  51. self.logs_counter_condition.acquire()
  52. # Running indefinite
  53. while True:
  54. # Waiting for new log lines to come
  55. self.logs_counter_condition.wait()
  56. if not self.is_main_thread_active():
  57. break
  58. # Do we have enough logs to drain?
  59. if len(self.logs) >= self.logs_drain_count:
  60. self.drain_messages()
  61. finally:
  62. self.logs_counter_condition.release()
  63. def add_to_buffer(self, message):
  64. # Check if we are currently draining buffer so we wont loose logs
  65. self.buffer_event.wait()
  66. try:
  67. # Acquire the condition
  68. self.logs_counter_condition.acquire()
  69. self.logs.append(json.dumps(message))
  70. # Notify watcher for a new log coming in
  71. self.logs_counter_condition.notify()
  72. finally:
  73. # Release the condition
  74. self.logs_counter_condition.release()
  75. def handle_exceptions(self, message):
  76. if message.exc_info:
  77. return '\n'.join(traceback.format_exception(*message.exc_info))
  78. else:
  79. return message.getMessage()
  80. def format_message(self, message):
  81. message_field = self.handle_exceptions(message)
  82. now = datetime.datetime.utcnow()
  83. timestamp = now.strftime("%Y-%m-%dT%H:%M:%S") + ".%03d" % (now.microsecond / 1000) + "Z"
  84. return_json = {
  85. "logger": message.name,
  86. "line_number": message.lineno,
  87. "path_name": message.pathname,
  88. "log_level": message.levelname,
  89. "message": message_field,
  90. "type": self.logzio_type,
  91. "@timestamp": timestamp
  92. }
  93. return return_json
  94. def backup_logs(self, logs):
  95. timestamp = datetime.datetime.now().strftime("%d%m%Y-%H%M%S")
  96. print("Backing up your logs to logzio-failures-{0}.txt".format(timestamp))
  97. with open("logzio-failures-{0}.txt".format(timestamp), "a") as f:
  98. f.writelines('\n'.join(logs))
  99. def drain_messages(self):
  100. try:
  101. self.buffer_event.clear()
  102. self.drain_lock.acquire()
  103. # Copy buffer
  104. temp_logs = list(self.logs)
  105. self.logs = []
  106. # Release the event
  107. self.buffer_event.set()
  108. # Not configurable from the outside
  109. sleep_between_retries = 2000
  110. number_of_retries = 4
  111. success_in_send = False
  112. headers = {"Content-type": "text/plain"}
  113. for current_try in range(number_of_retries):
  114. response = requests.post(self.url, headers=headers, data='\n'.join(temp_logs))
  115. if response.status_code != 200: # 429 400, on 400 print stdout
  116. if response.status_code == 400:
  117. print("Got unexpected 400 code from logz.io, response: {0}".format(response.text))
  118. self.backup_logs(temp_logs)
  119. if response.status_code == 401:
  120. print("You are not authorized with logz.io! dropping..")
  121. break
  122. sleep(sleep_between_retries)
  123. sleep_between_retries *= 2
  124. else:
  125. success_in_send = True
  126. break
  127. if not success_in_send:
  128. # Write to file
  129. self.backup_logs(temp_logs)
  130. finally:
  131. self.buffer_event.set()
  132. self.drain_lock.release()
  133. def emit(self, record):
  134. self.add_to_buffer(self.format_message(record))