# This class is responsible for handling all asynchronous Logz.io's
|
|
# communication
|
|
import sys
|
|
|
|
import json
|
|
|
|
from time import sleep
|
|
from datetime import datetime
|
|
from threading import Thread, enumerate
|
|
|
|
import requests
|
|
|
|
if sys.version[0] == '2':
|
|
import Queue as queue
|
|
else:
|
|
import queue as queue
|
|
|
|
|
|
MAX_BULK_SIZE_IN_BYTES = 1 * 1024 * 1024 # 1 MB
|
|
|
|
|
|
def backup_logs(logs):
|
|
timestamp = datetime.now().strftime("%d%m%Y-%H%M%S")
|
|
print("Backing up your logs to logzio-failures-{0}.txt".format(timestamp))
|
|
with open("logzio-failures-{0}.txt".format(timestamp), "a") as f:
|
|
f.writelines('\n'.join(logs))
|
|
|
|
|
|
class LogzioSender:
|
|
|
|
def __init__(self,
|
|
token, url="https://listener.logz.io:8071",
|
|
logs_drain_timeout=5,
|
|
debug=False):
|
|
self.token = token
|
|
self.url = "{0}/?token={1}".format(url, token)
|
|
self.logs_drain_timeout = logs_drain_timeout
|
|
self.debug = debug
|
|
|
|
# Function to see if the main thread is alive
|
|
self.is_main_thread_active = lambda: any(
|
|
(i.name == "MainThread") and i.is_alive() for i in enumerate())
|
|
|
|
# Create a queue to hold logs
|
|
self.queue = queue.Queue()
|
|
|
|
self.sending_thread = Thread(target=self._drain_queue)
|
|
self.sending_thread.daemon = False
|
|
self.sending_thread.name = "logzio-sending-thread"
|
|
self.sending_thread.start()
|
|
|
|
def append(self, logs_message):
|
|
# Queue lib is thread safe, no issue here
|
|
self.queue.put(json.dumps(logs_message))
|
|
|
|
def flush(self):
|
|
self._flush_queue()
|
|
|
|
def _debug(self, message):
|
|
if self.debug:
|
|
print(str(message))
|
|
|
|
def _drain_queue(self):
|
|
last_try = False
|
|
|
|
while not last_try:
|
|
# If main is exited, we should run one last time and try to remove
|
|
# all logs
|
|
if not self.is_main_thread_active():
|
|
self._debug(
|
|
"Identified quit of main thread, sending logs one "
|
|
"last time")
|
|
last_try = True
|
|
|
|
try:
|
|
self._flush_queue()
|
|
|
|
except Exception as e:
|
|
self._debug(
|
|
"Unexpected exception while draining queue to Logz.io, "
|
|
"swallowing. Exception: " + str(e))
|
|
|
|
if not last_try:
|
|
sleep(self.logs_drain_timeout)
|
|
|
|
def _flush_queue(self):
|
|
# Sending logs until queue is empty
|
|
while not self.queue.empty():
|
|
logs_list = self._get_messages_up_to_max_allowed_size()
|
|
self._debug("Starting to drain " +
|
|
str(len(logs_list)) + " logs to Logz.io")
|
|
|
|
# Not configurable from the outside
|
|
sleep_between_retries = 2
|
|
number_of_retries = 4
|
|
|
|
should_backup_to_disk = True
|
|
headers = {"Content-type": "text/plain"}
|
|
|
|
for current_try in range(number_of_retries):
|
|
should_retry = False
|
|
try:
|
|
response = requests.post(
|
|
self.url, headers=headers, data='\n'.join(logs_list))
|
|
if response.status_code != 200:
|
|
if response.status_code == 400:
|
|
print("Got 400 code from Logz.io. This means that "
|
|
"some of your logs are too big, or badly "
|
|
"formatted. response: {0}".format(
|
|
response.text))
|
|
should_backup_to_disk = False
|
|
break
|
|
|
|
if response.status_code == 401:
|
|
print(
|
|
"You are not authorized with Logz.io! Token "
|
|
"OK? dropping logs...")
|
|
should_backup_to_disk = False
|
|
break
|
|
else:
|
|
print(
|
|
"Got {} while sending logs to Logz.io, "
|
|
"Try ({}/{}). Response: {}".format(
|
|
response.status_code,
|
|
current_try + 1,
|
|
number_of_retries,
|
|
response.text))
|
|
should_retry = True
|
|
else:
|
|
self._debug("Successfully sent bulk of " +
|
|
str(len(logs_list)) + " logs to Logz.io!")
|
|
should_backup_to_disk = False
|
|
break
|
|
|
|
except Exception as e:
|
|
print("Got exception while sending logs to Logz.io, "
|
|
"Try ({}/{}). Message: {}".format(
|
|
current_try + 1, number_of_retries, e))
|
|
should_retry = True
|
|
|
|
if should_retry:
|
|
sleep(sleep_between_retries)
|
|
sleep_between_retries *= 2
|
|
|
|
if should_backup_to_disk:
|
|
# Write to file
|
|
print("Could not send logs to Logz.io after " +
|
|
str(number_of_retries) +
|
|
" tries, backing up to local file system.")
|
|
backup_logs(logs_list)
|
|
|
|
def _get_messages_up_to_max_allowed_size(self):
|
|
logs_list = []
|
|
current_size = 0
|
|
while not self.queue.empty():
|
|
current_log = self.queue.get()
|
|
current_size += sys.getsizeof(current_log)
|
|
logs_list.append(current_log)
|
|
if current_size >= MAX_BULK_SIZE_IN_BYTES:
|
|
break
|
|
return logs_list
|