Logging handler to send logs to your OpenSearch cluster with bulk SSL. Forked from https://github.com/logzio/logzio-python-handler
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

164 lines
5.8 KiB

# This class is responsible for handling all asynchronous Logz.io's
# communication
import sys
import json
from time import sleep
from datetime import datetime
from threading import Thread, enumerate
import requests
if sys.version[0] == '2':
import Queue as queue
else:
import queue as queue
MAX_BULK_SIZE_IN_BYTES = 1 * 1024 * 1024 # 1 MB
def backup_logs(logs):
timestamp = datetime.now().strftime("%d%m%Y-%H%M%S")
print("Backing up your logs to logzio-failures-{}.txt".format(timestamp))
with open("logzio-failures-{}.txt".format(timestamp), "a") as f:
f.writelines('\n'.join(logs))
class LogzioSender:
def __init__(self,
token, url="https://listener.logz.io:8071",
logs_drain_timeout=5,
debug=False):
self.token = token
self.url = "{}/?token={}".format(url, token)
self.logs_drain_timeout = logs_drain_timeout
self.debug = debug
# Function to see if the main thread is alive
self.is_main_thread_active = lambda: any(
(i.name == "MainThread") and i.is_alive() for i in enumerate())
# Create a queue to hold logs
self.queue = queue.Queue()
self.sending_thread = Thread(target=self._drain_queue)
self.sending_thread.daemon = False
self.sending_thread.name = "logzio-sending-thread"
self.sending_thread.start()
def append(self, logs_message):
# Queue lib is thread safe, no issue here
self.queue.put(json.dumps(logs_message))
def flush(self):
self._flush_queue()
def _debug(self, message):
if self.debug:
print(str(message))
def _drain_queue(self):
last_try = False
while not last_try:
# If main is exited, we should run one last time and try to remove
# all logs
if not self.is_main_thread_active():
self._debug(
"Identified quit of main thread, sending logs one "
"last time")
last_try = True
try:
self._flush_queue()
# TODO: Which exception?
except Exception as e:
self._debug(
"Unexpected exception while draining queue to Logz.io, "
"swallowing. Exception: {}".format(e))
if not last_try:
sleep(self.logs_drain_timeout)
def _flush_queue(self):
# TODO: Break this down. This function is crazy.
# Sending logs until queue is empty
while not self.queue.empty():
logs_list = self._get_messages_up_to_max_allowed_size()
self._debug(
"Starting to drain {} logs to Logz.io".format(len(logs_list)))
# Not configurable from the outside
sleep_between_retries = 2
number_of_retries = 4
should_backup_to_disk = True
headers = {"Content-type": "text/plain"}
for current_try in range(number_of_retries):
should_retry = False
try:
response = requests.post(
self.url, headers=headers, data='\n'.join(logs_list))
if response.status_code != 200:
if response.status_code == 400:
print("Got 400 code from Logz.io. This means that "
"some of your logs are too big, or badly "
"formatted. response: {}".format(
response.text))
should_backup_to_disk = False
break
if response.status_code == 401:
print(
"You are not authorized with Logz.io! Token "
"OK? dropping logs...")
should_backup_to_disk = False
break
else:
print(
"Got {} while sending logs to Logz.io, "
"Try ({}/{}). Response: {}".format(
response.status_code,
current_try + 1,
number_of_retries,
response.text))
should_retry = True
else:
self._debug(
"Successfully sent bulk of {} logs to "
"Logz.io!".format(len(logs_list)))
should_backup_to_disk = False
break
# TODO: Which exception?
except Exception as e:
print("Got exception while sending logs to Logz.io, "
"Try ({}/{}). Message: {}".format(
current_try + 1, number_of_retries, e))
should_retry = True
if should_retry:
sleep(sleep_between_retries)
sleep_between_retries *= 2
if should_backup_to_disk:
# Write to file
print("Could not send logs to Logz.io after {} tries, "
"backing up to local file system".format(
number_of_retries))
backup_logs(logs_list)
def _get_messages_up_to_max_allowed_size(self):
logs_list = []
current_size = 0
while not self.queue.empty():
current_log = self.queue.get()
current_size += sys.getsizeof(current_log)
logs_list.append(current_log)
if current_size >= MAX_BULK_SIZE_IN_BYTES:
break
return logs_list