Browse Source

Fix some flake8 stuff and organize imports

opensearch
nir0s 7 years ago
parent
commit
b511e9ed21
2 changed files with 69 additions and 27 deletions
  1. +20
    -10
      logzio/handler.py
  2. +49
    -17
      logzio/sender.py

+ 20
- 10
logzio/handler.py View File

@ -1,36 +1,45 @@
import datetime
import sys
import json import json
import logging import logging
import logging.handlers
import datetime
import traceback import traceback
import sys
import logging.handlers
from .sender import LogzioSender from .sender import LogzioSender
class LogzioHandler(logging.Handler): class LogzioHandler(logging.Handler):
def __init__(self, token, logzio_type="python", logs_drain_timeout=3,
url="https://listener.logz.io:8071", debug=False):
def __init__(self,
token,
logzio_type="python",
logs_drain_timeout=3,
url="https://listener.logz.io:8071",
debug=False):
if token is "": if token is "":
raise Exception("Logz.io Token must be provided") raise Exception("Logz.io Token must be provided")
self.logzio_type = logzio_type self.logzio_type = logzio_type
self.logzio_sender = LogzioSender(token=token, url=url, logs_drain_timeout=logs_drain_timeout, debug=debug)
self.logzio_sender = LogzioSender(
token=token,
url=url,
logs_drain_timeout=logs_drain_timeout,
debug=debug)
logging.Handler.__init__(self) logging.Handler.__init__(self)
def extra_fields(self, message): def extra_fields(self, message):
not_allowed_keys = ( not_allowed_keys = (
'args', 'asctime', 'created', 'exc_info', 'stack_info', 'exc_text', 'filename',
'funcName', 'levelname', 'levelno', 'lineno', 'module',
'args', 'asctime', 'created', 'exc_info', 'stack_info', 'exc_text',
'filename', 'funcName', 'levelname', 'levelno', 'lineno', 'module',
'msecs', 'msecs', 'message', 'msg', 'name', 'pathname', 'process', 'msecs', 'msecs', 'message', 'msg', 'name', 'pathname', 'process',
'processName', 'relativeCreated', 'thread', 'threadName') 'processName', 'relativeCreated', 'thread', 'threadName')
if sys.version_info < (3, 0): if sys.version_info < (3, 0):
var_type = (basestring, bool, dict, float, int, long, list, type(None))
var_type = (basestring, bool, dict, float,
int, long, list, type(None))
else: else:
var_type = (str, bool, dict, float, int, list, type(None)) var_type = (str, bool, dict, float, int, list, type(None))
@ -60,7 +69,8 @@ class LogzioHandler(logging.Handler):
def format_message(self, message): def format_message(self, message):
now = datetime.datetime.utcnow() now = datetime.datetime.utcnow()
timestamp = now.strftime("%Y-%m-%dT%H:%M:%S") + ".%03d" % (now.microsecond / 1000) + "Z"
timestamp = now.strftime("%Y-%m-%dT%H:%M:%S") + \
".%03d" % (now.microsecond / 1000) + "Z"
return_json = { return_json = {
"logger": message.name, "logger": message.name,


+ 49
- 17
logzio/sender.py View File

@ -1,16 +1,21 @@
# This class is responsible for handling all asynchronous Logz.io's communication
# This class is responsible for handling all asynchronous Logz.io's
# communication
import sys import sys
import requests
import json import json
from threading import Thread, enumerate
from datetime import datetime
from time import sleep from time import sleep
from datetime import datetime
from threading import Thread, enumerate
import requests
if sys.version[0] == '2': if sys.version[0] == '2':
import Queue as queue import Queue as queue
else: else:
import queue as queue import queue as queue
MAX_BULK_SIZE_IN_BYTES = 1 * 1024 * 1024 # 1 MB MAX_BULK_SIZE_IN_BYTES = 1 * 1024 * 1024 # 1 MB
@ -23,14 +28,18 @@ def backup_logs(logs):
class LogzioSender: class LogzioSender:
def __init__(self, token, url="https://listener.logz.io:8071", logs_drain_timeout=5, debug=False):
def __init__(self,
token, url="https://listener.logz.io:8071",
logs_drain_timeout=5,
debug=False):
self.token = token self.token = token
self.url = "{0}/?token={1}".format(url, token) self.url = "{0}/?token={1}".format(url, token)
self.logs_drain_timeout = logs_drain_timeout self.logs_drain_timeout = logs_drain_timeout
self.debug = debug self.debug = debug
# Function to see if the main thread is alive # Function to see if the main thread is alive
self.is_main_thread_active = lambda: any((i.name == "MainThread") and i.is_alive() for i in enumerate())
self.is_main_thread_active = lambda: any(
(i.name == "MainThread") and i.is_alive() for i in enumerate())
# Create a queue to hold logs # Create a queue to hold logs
self.queue = queue.Queue() self.queue = queue.Queue()
@ -55,16 +64,21 @@ class LogzioSender:
last_try = False last_try = False
while not last_try: while not last_try:
# If main is exited, we should run one last time and try to remove all logs
# If main is exited, we should run one last time and try to remove
# all logs
if not self.is_main_thread_active(): if not self.is_main_thread_active():
self._debug("Identified quit of main thread, sending logs one last time")
self._debug(
"Identified quit of main thread, sending logs one "
"last time")
last_try = True last_try = True
try: try:
self._flush_queue() self._flush_queue()
except Exception as e: except Exception as e:
self._debug("Unexpected exception while draining queue to Logz.io, swallowing. Exception: " + str(e))
self._debug(
"Unexpected exception while draining queue to Logz.io, "
"swallowing. Exception: " + str(e))
if not last_try: if not last_try:
sleep(self.logs_drain_timeout) sleep(self.logs_drain_timeout)
@ -73,7 +87,8 @@ class LogzioSender:
# Sending logs until queue is empty # Sending logs until queue is empty
while not self.queue.empty(): while not self.queue.empty():
logs_list = self._get_messages_up_to_max_allowed_size() logs_list = self._get_messages_up_to_max_allowed_size()
self._debug("Starting to drain " + str(len(logs_list)) + " logs to Logz.io")
self._debug("Starting to drain " +
str(len(logs_list)) + " logs to Logz.io")
# Not configurable from the outside # Not configurable from the outside
sleep_between_retries = 2 sleep_between_retries = 2
@ -85,27 +100,42 @@ class LogzioSender:
for current_try in range(number_of_retries): for current_try in range(number_of_retries):
should_retry = False should_retry = False
try: try:
response = requests.post(self.url, headers=headers, data='\n'.join(logs_list))
response = requests.post(
self.url, headers=headers, data='\n'.join(logs_list))
if response.status_code != 200: if response.status_code != 200:
if response.status_code == 400: if response.status_code == 400:
print("Got 400 code from Logz.io. This means that some of your logs are too big, or badly formatted. response: {0}".format(response.text))
print("Got 400 code from Logz.io. This means that "
"some of your logs are too big, or badly "
"formatted. response: {0}".format(
response.text))
should_backup_to_disk = False should_backup_to_disk = False
break break
if response.status_code == 401: if response.status_code == 401:
print("You are not authorized with Logz.io! Token OK? dropping logs...")
print(
"You are not authorized with Logz.io! Token "
"OK? dropping logs...")
should_backup_to_disk = False should_backup_to_disk = False
break break
else: else:
print("Got {} while sending logs to Logz.io, Try ({}/{}). Response: {}".format(response.status_code, current_try + 1, number_of_retries, response.text))
print(
"Got {} while sending logs to Logz.io, "
"Try ({}/{}). Response: {}".format(
response.status_code,
current_try + 1,
number_of_retries,
response.text))
should_retry = True should_retry = True
else: else:
self._debug("Successfully sent bulk of " + str(len(logs_list)) + " logs to Logz.io!")
self._debug("Successfully sent bulk of " +
str(len(logs_list)) + " logs to Logz.io!")
should_backup_to_disk = False should_backup_to_disk = False
break break
except Exception as e: except Exception as e:
print("Got exception while sending logs to Logz.io, Try ({}/{}). Message: {}".format(current_try + 1, number_of_retries, e))
print("Got exception while sending logs to Logz.io, "
"Try ({}/{}). Message: {}".format(
current_try + 1, number_of_retries, e))
should_retry = True should_retry = True
if should_retry: if should_retry:
@ -114,7 +144,9 @@ class LogzioSender:
if should_backup_to_disk: if should_backup_to_disk:
# Write to file # Write to file
print("Could not send logs to Logz.io after " + str(number_of_retries) + " tries, backing up to local file system.")
print("Could not send logs to Logz.io after " +
str(number_of_retries) +
" tries, backing up to local file system.")
backup_logs(logs_list) backup_logs(logs_list)
def _get_messages_up_to_max_allowed_size(self): def _get_messages_up_to_max_allowed_size(self):


Loading…
Cancel
Save