
Rewrite most of the handler, introduce tests

opensearch
Roi Rav-Hon 8 years ago
parent
commit
6b31517848
15 changed files with 399 additions and 192 deletions
  1. +1
    -0
      .cache/v/cache/lastfailed
  2. +12
    -0
      .travis.yml
  3. +30
    -16
      README.md
  4. +0
    -0
      __init__.py
  5. +0
    -1
      logzio/__init__.py
  6. +11
    -148
      logzio/handler.py
  7. +116
    -0
      logzio/sender.py
  8. +2
    -2
      setup.py
  9. +0
    -12
      tests/__init__.py
  10. +0
    -0
      tests/mockLogzioListener/__init__.py
  11. +82
    -0
      tests/mockLogzioListener/listener.py
  12. +9
    -0
      tests/mockLogzioListener/logsList.py
  13. +18
    -0
      tests/mockLogzioListener/persistentFlags.py
  14. +10
    -13
      tests/test_logzioHandler.py
  15. +108
    -0
      tests/test_logzioSender.py

+ 1
- 0
.cache/v/cache/lastfailed

@@ -0,0 +1 @@
{}

+ 12
- 0
.travis.yml

@@ -0,0 +1,12 @@
language: python
python:
- "2.6"
- "2.7"
- "3.2"
- "3.3"
- "3.4"
install:
- pip install requests
script: pytest

+ 30
- 16
README.md

@@ -1,9 +1,12 @@
[![PyPI version](https://badge.fury.io/py/logzio-python-handler.svg)](https://badge.fury.io/py/logzio-python-handler)
# The Logz.io Python Handler
This is a Python handler that sends logs in bulk over HTTPS to Logz.io. The handler uses an internal buffer, and you can choose the drain timeout as well as the number of messages to hold in the queue before the drain. Everything works in threads, so if the main program exits, the threads will continue to work until all logs are drained.
**This is in BETA. We currently use this handler internally. We will provide tests soon**
This is a Python handler that sends logs in bulk over HTTPS to Logz.io.
The handler uses a companion class named LogzioSender (which can also be used on its own, without this handler, to ship raw data).
The LogzioSender class opens a new thread that consumes from the logs queue. On each iteration (whose frequency is configured by the logs_drain_timeout parameter), it tries to consume the queue in its entirety.
Logs are divided into separate bulks, based on their size.
LogzioSender checks whether the main thread is alive. If the main thread quits, it tries to consume the queue one last time and then exits, so your program can hang for a few seconds until the logs are drained.
If the logs fail to be sent to Logz.io after a couple of tries, they will be written to the local file system. You can later upload them to Logz.io using curl.
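For reference, a failed-logs backup file can be re-uploaded with curl (a sketch only; the token and file name below are placeholders, and the request shape mirrors the sender's plain-text, newline-separated bulk format):

```bash
# Hypothetical re-upload of a local backup file; replace the token
# and file name with your own.
curl -X POST "https://listener.logz.io:8071/?token=YOUR_LOGZIO_TOKEN" \
     -H "Content-type: text/plain" \
     --data-binary @logzio-failures-01012017-120000.txt
```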
## Installation
```bash
@@ -18,11 +21,11 @@ keys=LogzioHandler
[handler_LogzioHandler]
class=logzio.handler.LogzioHandler
formatter=jsonFormat
args=('token', 10, 20)
formatter=logzioFormat
args=('token', 'my_type')
[formatters]
keys=jsonFormat
keys=logzioFormat
[loggers]
keys=root
@@ -31,15 +34,18 @@ keys=root
handlers=LogzioHandler
level=INFO
[formatter_jsonFormat]
format={ "loggerName":"%(name)s", "functionName":"%(funcName)s", "lineNo":"%(lineno)d", "levelName":"%(levelname)s", "message":"%(message)s"}
[formatter_logzioFormat]
format={"additional_field": "value"}
```
*args=() arguments, in order*
- Your logz.io token
- Number of logs to keep in buffer before draining
- Time to wait before draining, regardless of the previous setting
- Log type, for searching in logz.io (defaults to "python")
- Time to sleep between draining attempts (defaults to "3")
- Logz.io Listener address (defaults to "https://listener.logz.io:8071")
- Debug flag. When set to True, debug messages are printed to stdout (defaults to "False")
Please note that you have to configure these parameters in this exact order,
i.e. you cannot set Debug to true without configuring all of the previous parameters as well.
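The positional mapping can be sketched as follows (logzio_handler_args is a hypothetical helper that mirrors the documented signature and defaults; it is not part of the library):

```python
# Hypothetical helper mirroring the handler's positional signature,
# per the parameter order documented above.
def logzio_handler_args(token, logzio_type="python", logs_drain_timeout=3,
                        url="https://listener.logz.io:8071", debug=False):
    return {"token": token, "logzio_type": logzio_type,
            "logs_drain_timeout": logs_drain_timeout,
            "url": url, "debug": debug}

# args=('token', 'my_type') from the config above maps to:
print(logzio_handler_args('token', 'my_type'))
```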
#### Code Example
```python
@@ -68,8 +74,8 @@ LOGGING = {
'verbose': {
'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s'
},
'json': {
'format': '{ "loggerName":"%(name)s", "functionName":"%(funcName)s", "lineNo":"%(lineno)d", "levelName":"%(levelname)s", "message":"%(message)s"}'
'logzioFormat': {
'format': '{"additional_field": "value"}'
}
},
'handlers': {
@@ -81,12 +87,12 @@ LOGGING = {
'logzio': {
'class': 'logzio.handler.LogzioHandler',
'level': 'INFO',
'formatter': 'json',
'formatter': 'logzioFormat',
'token': 'token',
'url': 'https://listener.logz.io:8071'
'logs_drain_count': 10,
'logzio_type': "django",
'logs_drain_timeout': 5,
'logzio_type': "django"
'url': 'https://listener.logz.io:8071',
'debug': True
},
},
'loggers': {
@@ -109,3 +115,11 @@ LOGGING = {
- logs_drain_timeout - Time to wait before draining, regardless of the previous setting
- logzio_type - Log type, for searching in logz.io (defaults to "python")
- appname - Your django app
## Release Notes
- 2.0.0 - Production, stable release.
- *BREAKING* - Configuration option logs_drain_count was removed, and the order of the parameters has changed for better simplicity. Please review the parameters section above.
- Introducing the LogzioSender class, which is generic and can be used without the handler wrap to ship raw data to Logz.io. Just create a new instance of the class, and use the append() method.
- Simplifications and Robustness
- Full testing framework
- 1.X - Beta versions
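The queue-and-background-thread pattern described above can be sketched with the stdlib alone (a simplified illustration, not the library code; LogzioSender ships each bulk over HTTPS instead of collecting it in a list):

```python
import json
import threading

try:
    from Queue import Queue  # Python 2
except ImportError:
    from queue import Queue  # Python 3

# A background thread drains a thread-safe queue of JSON-encoded
# messages, the same shape append() produces.
drained = []

def drain(queue):
    while not queue.empty():
        drained.append(json.loads(queue.get()))

q = Queue()
q.put(json.dumps({"message": "raw data"}))
q.put(json.dumps({"message": "more raw data"}))

worker = threading.Thread(target=drain, args=(q,))
worker.start()
worker.join()
print(len(drained))  # 2
```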

tests/unit/__init__.py → __init__.py


+ 0
- 1
logzio/__init__.py

@@ -1 +0,0 @@
__author__ = 'roiravhon'

+ 11
- 148
logzio/handler.py

@@ -1,106 +1,23 @@
import datetime
import json
import logging
import logging.handlers
import requests
import traceback
import datetime
import json
import os
from threading import Event, Thread, Condition, Lock, enumerate
from time import sleep
from .sender import LogzioSender
class LogzioHandler(logging.Handler):
# Hold all logs buffered
logs = []
# Event for locking buffer additions while draining
buffer_event = Event()
# Condition to count log messages
logs_counter_condition = Condition()
# Lock to only drain logs once
drain_lock = Lock()
def __init__(self, token, logs_drain_count=100, logs_drain_timeout=10,
logzio_type="python", url="https://listener.logz.io:8071"):
def __init__(self, token, logzio_type="python", logs_drain_timeout=3,
url="https://listener.logz.io:8071", debug=False):
if token == "":
raise Exception("Logz.io Token must be provided")
logging.Handler.__init__(self)
self.logs_drain_count = logs_drain_count
self.logs_drain_timeout = logs_drain_timeout
self.logzio_type = logzio_type
self.url = "{0}/?token={1}".format(url, token)
self.is_main_thread_active = lambda: any((i.name == "MainThread") and i.is_alive() for i in enumerate())
self.buffer_event.set()
# Create threads
timeout_thread = Thread(target=self.wait_to_timeout_and_drain)
counter_thread = Thread(target=self.count_logs_and_drain)
# And start them
timeout_thread.start()
counter_thread.start()
def wait_to_timeout_and_drain(self):
while True:
sleep(self.logs_drain_timeout)
if len(self.logs) > 0:
self.drain_messages()
if not self.is_main_thread_active():
# Signal the counter thread so it would exit as well
try:
self.logs_counter_condition.acquire()
self.logs_counter_condition.notify()
finally:
self.logs_counter_condition.release()
break
def count_logs_and_drain(self):
try:
# Acquire the condition
self.logs_counter_condition.acquire()
# Running indefinite
while True:
# Waiting for new log lines to come
self.logs_counter_condition.wait()
if not self.is_main_thread_active():
break
# Do we have enough logs to drain?
if len(self.logs) >= self.logs_drain_count:
self.drain_messages()
finally:
self.logs_counter_condition.release()
def add_to_buffer(self, message):
# Check if we are currently draining buffer so we wont loose logs
self.buffer_event.wait()
try:
# Acquire the condition
self.logs_counter_condition.acquire()
self.logs.append(json.dumps(message))
# Notify watcher for a new log coming in
self.logs_counter_condition.notify()
finally:
# Release the condition
self.logs_counter_condition.release()
self.logzio_sender = LogzioSender(token=token, url=url, logs_drain_timeout=logs_drain_timeout, debug=debug)
logging.Handler.__init__(self)
def format(self, record):
message = super(LogzioHandler, self).format(record)
@@ -109,7 +26,7 @@ class LogzioHandler(logging.Handler):
except (TypeError, ValueError):
return message
def formatException(self, exc_info):
def format_exception(self, exc_info):
return '\n'.join(traceback.format_exception(*exc_info))
def format_message(self, message):
@@ -122,11 +39,12 @@ class LogzioHandler(logging.Handler):
"path_name": message.pathname,
"log_level": message.levelname,
"type": self.logzio_type,
"message": message.msg,
"@timestamp": timestamp
}
if message.exc_info:
return_json["message"] = self.formatException(message.exc_info)
return_json["exception"] = self.format_exception(message.exc_info)
else:
formatted_message = self.format(message)
if isinstance(formatted_message, dict):
@@ -136,60 +54,5 @@ class LogzioHandler(logging.Handler):
return return_json
def backup_logs(self, logs):
timestamp = datetime.datetime.now().strftime("%d%m%Y-%H%M%S")
print("Backing up your logs to logzio-failures-{0}.txt".format(timestamp))
with open("logzio-failures-{0}.txt".format(timestamp), "a") as f:
f.writelines('\n'.join(logs))
def drain_messages(self):
try:
self.buffer_event.clear()
self.drain_lock.acquire()
# Copy buffer
temp_logs = list(self.logs)
self.logs = []
# Release the event
self.buffer_event.set()
# Not configurable from the outside
sleep_between_retries = 2
number_of_retries = 4
success_in_send = False
headers = {"Content-type": "text/plain"}
for current_try in range(number_of_retries):
try:
response = requests.post(self.url, headers=headers, data='\n'.join(temp_logs))
if response.status_code != 200: # 429 400, on 400 print stdout
if response.status_code == 400:
print("Got unexpected 400 code from logz.io, response: {0}".format(response.text))
self.backup_logs(temp_logs)
if response.status_code == 401:
print("You are not authorized with logz.io! dropping..")
break
except Exception as e:
print("Got exception while sending logs to Logz.io, Try ({}/{}). Message: {}".format(current_try + 1, number_of_retries, e.message))
sleep(sleep_between_retries)
sleep_between_retries *= 2
else:
success_in_send = True
break
if not success_in_send:
# Write to file
self.backup_logs(temp_logs)
finally:
self.buffer_event.set()
self.drain_lock.release()
def emit(self, record):
self.add_to_buffer(self.format_message(record))
self.logzio_sender.append(self.format_message(record))

+ 116
- 0
logzio/sender.py

@@ -0,0 +1,116 @@
# This class is responsible for handling all asynchronous communication with Logz.io
import sys
import requests
import json
from threading import Thread, enumerate
from datetime import datetime
try:
    from Queue import Queue  # Python 2
except ImportError:
    from queue import Queue  # Python 3
from time import sleep
MAX_BULK_SIZE_IN_BYTES = 3 * 1024 * 1024 # 3 MB
def backup_logs(logs):
timestamp = datetime.now().strftime("%d%m%Y-%H%M%S")
print("Backing up your logs to logzio-failures-{0}.txt".format(timestamp))
with open("logzio-failures-{0}.txt".format(timestamp), "a") as f:
f.writelines('\n'.join(logs))
class LogzioSender:
def __init__(self, token, url="https://listener.logz.io:8071", logs_drain_timeout=5, debug=False):
self.token = token
self.url = "{0}/?token={1}".format(url, token)
self.logs_drain_timeout = logs_drain_timeout
self.debug = debug
# Function to see if the main thread is alive
self.is_main_thread_active = lambda: any((i.name == "MainThread") and i.is_alive() for i in enumerate())
# Create a queue to hold logs
self.queue = Queue()
self.sending_thread = Thread(target=self._drain_queue)
self.sending_thread.daemon = False
self.sending_thread.name = "logzio-sending-thread"
self.sending_thread.start()
def append(self, logs_message):
# Queue lib is thread safe, no issue here
self.queue.put(json.dumps(logs_message))
def _debug(self, message):
if self.debug:
print(str(message))
def _drain_queue(self):
last_try = False
while not last_try:
# If main is exited, we should run one last time and try to remove all logs
if not self.is_main_thread_active():
self._debug("Identified quit of main thread, sending logs one last time")
last_try = True
try:
# Sending logs until queue is empty
while not self.queue.empty():
logs_list = self._get_messages_up_to_max_allowed_size()
self._debug("Starting to drain " + str(len(logs_list)) + " logs to Logz.io")
# Not configurable from the outside
sleep_between_retries = 2
number_of_retries = 4
should_backup_to_disk = True
headers = {"Content-type": "text/plain"}
for current_try in range(number_of_retries):
should_retry = False
try:
response = requests.post(self.url, headers=headers, data='\n'.join(logs_list))
if response.status_code != 200:
if response.status_code == 400:
print("Got 400 code from Logz.io. This means that some of your logs are too big, or badly formatted. response: {0}".format(response.text))
should_backup_to_disk = False
break
if response.status_code == 401:
print("You are not authorized with Logz.io! Token OK? dropping logs...")
should_backup_to_disk = False
break
else:
print("Got {} while sending logs to Logz.io, Try ({}/{}). Response: {}".format(response.status_code, current_try + 1, number_of_retries, response.text))
should_retry = True
else:
self._debug("Successfully sent bulk of " + str(len(logs_list)) + " logs to Logz.io!")
should_backup_to_disk = False
break
except Exception as e:
print("Got exception while sending logs to Logz.io, Try ({}/{}). Message: {}".format(current_try + 1, number_of_retries, str(e)))
should_retry = True
if should_retry:
sleep(sleep_between_retries)
sleep_between_retries *= 2
if should_backup_to_disk:
# Write to file
print("Could not send logs to Logz.io after " + str(number_of_retries) + " tries, backing up to local file system.")
backup_logs(logs_list)
except Exception as e:
self._debug("Unexpected exception while draining queue to Logz.io, swallowing. Exception: " + str(e))
if not last_try:
sleep(self.logs_drain_timeout)
    def _get_messages_up_to_max_allowed_size(self):
        logs_list = []
        current_size = 0
        while not self.queue.empty():
            current_log = self.queue.get()
            # sys.getsizeof(list) only measures the list object itself, not
            # the strings it holds, so sum message lengths to approximate
            # the payload size
            current_size += len(current_log)
            logs_list.append(current_log)
            if current_size >= MAX_BULK_SIZE_IN_BYTES:
                break
        return logs_list
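The size-capped batching step can be sketched in isolation (a self-contained sketch; the 3 MB cap matches MAX_BULK_SIZE_IN_BYTES above, and payload size is approximated by summing message lengths):

```python
try:
    from Queue import Queue  # Python 2
except ImportError:
    from queue import Queue  # Python 3

MAX_BULK_SIZE_IN_BYTES = 3 * 1024 * 1024  # 3 MB, as above

def take_bulk(queue, max_bytes=MAX_BULK_SIZE_IN_BYTES):
    # Pull messages until the accumulated payload reaches the cap.
    bulk, size = [], 0
    while not queue.empty():
        message = queue.get()
        bulk.append(message)
        size += len(message)
        if size >= max_bytes:
            break
    return bulk

q = Queue()
for _ in range(5):
    q.put("x" * 1024 * 1024)  # five 1 MB messages
first = take_bulk(q)
print(len(first))  # 3 messages reach the 3 MB cap
```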

+ 2
- 2
setup.py

@@ -4,7 +4,7 @@ from setuptools import setup, find_packages
setup(
name="logzio-python-handler",
version='1.0.5',
version='2.0.0',
description="Logging handler to send logs to your Logz.io account with bulk SSL",
keywords="logging handler logz.io bulk https",
author="roiravhon",
@@ -17,7 +17,7 @@ setup(
],
include_package_data=True,
classifiers=[
'Development Status :: 4 - Beta',
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',
'Programming Language :: Python :: 2.7'
]


+ 0
- 12
tests/__init__.py

@@ -1,12 +0,0 @@
import os
import pkgutil
def load_tests(loader, suite, pattern):
for imp, modname, _ in pkgutil.walk_packages(__path__):
mod = imp.find_module(modname).load_module(modname)
for test in loader.loadTestsFromModule(mod):
print("Running TestCase: {}".format(modname))
suite.addTests(test)
print("=" * 70)
return suite

+ 0
- 0
tests/mockLogzioListener/__init__.py


+ 82
- 0
tests/mockLogzioListener/listener.py

@@ -0,0 +1,82 @@
import socket
try:
    from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer  # Python 2
except ImportError:
    from http.server import BaseHTTPRequestHandler, HTTPServer  # Python 3
from threading import Thread
from .logsList import logs_list
from .persistentFlags import persistent_flags
class ListenerHandler(BaseHTTPRequestHandler):
def do_POST(self):
try:
content_length = int(self.headers.get("Content-Length"))
all_logs = self.rfile.read(content_length).decode("utf-8").split('\n')
if len(all_logs) == 0:
self.send_response(400, "Bad Request")
return
for log in all_logs:
if log != "":
if persistent_flags.get_server_error():
self.send_response(500, "Issue!!!!!!!")
return
logs_list.list.append(log)
self.send_response(200, "Ok")
return
except IndexError:
self.send_response(400, "Bad Request")
return
class MockLogzioListener:
def __init__(self):
self.port = find_available_port()
self.host = "localhost"
self.server = HTTPServer((self.host, self.port), ListenerHandler)
self.listening_thread = Thread(target=self._start_listening)
self.listening_thread.daemon = True
self.listening_thread.name = "mock-logzio-listener"
self.listening_thread.start()
self.logs_list = logs_list.list
self.persistent_flags = persistent_flags
def _start_listening(self):
self.server.serve_forever()
def get_port(self):
return self.port
def get_host(self):
return self.host
def find_log(self, search_log):
for current_log in self.logs_list:
if search_log in current_log:
return True
return False
def get_number_of_logs(self):
return len(self.logs_list)
def clear_logs_buffer(self):
    del self.logs_list[:]  # clear in place, so the shared logs_list.list is emptied too
def set_server_error(self):
self.persistent_flags.set_server_error()
def clear_server_error(self):
self.persistent_flags.clear_server_error()
def find_available_port():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("", 0))
sock.listen(1)
port = sock.getsockname()[1]
sock.close()
return port

+ 9
- 0
tests/mockLogzioListener/logsList.py

@@ -0,0 +1,9 @@
# Modules imported only once, this is effectively a singleton
class LogsList:
def __init__(self):
self.list = []
logs_list = LogsList()

+ 18
- 0
tests/mockLogzioListener/persistentFlags.py

@@ -0,0 +1,18 @@
# Modules imported only once, this is effectively a singleton
class PersistentFlags:
def __init__(self):
self.server_error = False
def set_server_error(self):
self.server_error = True
def clear_server_error(self):
self.server_error = False
def get_server_error(self):
return self.server_error
persistent_flags = PersistentFlags()
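The singleton-via-module trick used by persistent_flags (and logs_list) relies on Python caching modules in sys.modules; a minimal demonstration (flags_demo is a throwaway module created just for this sketch):

```python
import sys
import types

# Create a module object and register it; subsequent imports resolve
# from the sys.modules cache, returning the very same object.
mod = types.ModuleType("flags_demo")
mod.server_error = False
sys.modules["flags_demo"] = mod

import flags_demo
flags_demo.server_error = True

import flags_demo as again
print(again.server_error)  # True: both names refer to one module instance
```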

tests/unit/handler_test.py → tests/test_logzioHandler.py


+ 108
- 0
tests/test_logzioSender.py

@@ -0,0 +1,108 @@
import fnmatch
import logging.config
import os
import time
from unittest import TestCase
from .mockLogzioListener import listener
def _find(pattern, path):
result = []
for root, dirs, files in os.walk(path):
for name in files:
if fnmatch.fnmatch(name, pattern):
result.append(os.path.join(root, name))
break # Not descending recursively
return result
class TestLogzioSender(TestCase):
def setUp(self):
self.logzio_listener = listener.MockLogzioListener()
self.logzio_listener.clear_server_error()
self.logs_drain_timeout = 1
logging_configuration = {
"version": 1,
"formatters": {
"logzio": {
"format": '{"key": "value"}'
}
},
"handlers": {
"LogzioHandler": {
"class": "logzio.handler.LogzioHandler",
"formatter": "logzio",
"level": "DEBUG",
"token": "token",
'logzio_type': "type",
'logs_drain_timeout': self.logs_drain_timeout,
'url': "http://" + self.logzio_listener.get_host() + ":" + str(self.logzio_listener.get_port()),
'debug': True
}
},
"loggers": {
"test": {
"handlers": ["LogzioHandler"],
"level": "DEBUG"
}
}
}
logging.config.dictConfig(logging_configuration)
self.logger = logging.getLogger('test')
for curr_file in _find("logzio-failures-*.txt", "."):
os.remove(curr_file)
def test_simple_log_drain(self):
log_message = "Test simple log drain"
self.logger.info(log_message)
time.sleep(self.logs_drain_timeout * 2)
self.assertTrue(self.logzio_listener.find_log(log_message))
def test_multiple_lines_drain(self):
logs_num = 50
for counter in range(0, logs_num):
self.logger.info("Test " + str(counter))
time.sleep(self.logs_drain_timeout * 2)
for counter in range(0, logs_num):
self.logger.info("Test " + str(counter))
time.sleep(self.logs_drain_timeout * 2)
self.assertEqual(self.logzio_listener.get_number_of_logs(), logs_num * 2)
def test_server_failure(self):
log_message = "Failing log message"
self.logzio_listener.set_server_error()
self.logger.info(log_message)
time.sleep(self.logs_drain_timeout * 2)
self.assertFalse(self.logzio_listener.find_log(log_message))
self.logzio_listener.clear_server_error()
time.sleep(self.logs_drain_timeout * 2 * 4) # Longer, because of the retry
self.assertTrue(self.logzio_listener.find_log(log_message))
def test_local_file_backup(self):
log_message = "Backup to local filesystem"
self.logzio_listener.set_server_error()
self.logger.info(log_message)
# Make sure no file is present
self.assertEqual(len(_find("logzio-failures-*.txt", ".")), 0)
time.sleep(2 * 2 * 2 * 2 * 2)  # 32s, covering the 2 + 4 + 8 second backoff across all retries
failure_files = _find("logzio-failures-*.txt", ".")
self.assertEqual(len(failure_files), 1)
with open(failure_files[0], "r") as f:
line = f.readline()
self.assertTrue(log_message in line)
