Browse Source

Merge pull request #19 from nir0s/various-optimizations

Various optimizations
opensearch
Roi Rav-Hon 6 years ago
committed by GitHub
parent
commit
a417ce51ea
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 229 additions and 78 deletions
  1. +58
    -2
      .gitignore
  2. +21
    -9
      .travis.yml
  3. +22
    -13
      README.md
  4. +2
    -0
      logzio/exceptions.py
  5. +33
    -22
      logzio/handler.py
  6. +9
    -0
      logzio/logger.py
  7. +61
    -32
      logzio/sender.py
  8. +23
    -0
      tox.ini

+ 58
- 2
.gitignore View File

@ -1,3 +1,59 @@
*.pyc
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*egg-info/
# C extensions
*.so
# Distribution / packaging
.Python
/env/
/bin/
/build/
/build_docs/
/develop-eggs/
/dist/
/eggs/
/lib/
/lib64/
/parts/
/sdist/
/var/
/*.egg-info/
.installed.cfg
*.egg
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
/htmlcov/
/.tox/
.coverage
.cache
nosetests.xml
coverage.xml
.pytest_cache/
# Translations
*.mo
# Mr Developer
.mr.developer.cfg
.project
.pydevproject
# Rope
.ropeproject
# Django stuff:
*.log
*.pot
# Sphinx documentation
/docs/_build/
*.iml
*COMMIT_MSG

+ 21
- 9
.travis.yml View File

@ -1,12 +1,24 @@
sudo: false
language: python
python:
- "2.7"
- "3.3"
- "3.4"
- "3.5"
- "3.6"
install:
- pip install requests future
matrix:
include:
- python: 2.7
env: TOXENV=flake8
- python: 3.6
env: TOXENV=py3flake8
- python: 2.7
env: TOXENV=py27
- python: 3.3
env: TOXENV=py33
- python: 3.4
env: TOXENV=py34
- python: 3.5
env: TOXENV=py35
- python: 3.6
env: TOXENV=py36
script: py.test
install:
- pip install tox
script:
- tox

+ 22
- 13
README.md View File

@ -1,11 +1,11 @@
[![PyPI version](https://badge.fury.io/py/logzio-python-handler.svg)](https://badge.fury.io/py/logzio-python-handler) [![Build Status](https://travis-ci.org/logzio/logzio-python-handler.svg?branch=master)](https://travis-ci.org/logzio/logzio-python-handler)
# The Logz.io Python Handler
This is a Python handler that sends logs in bulk over HTTPS to Logz.io.
The handler uses a subclass named LogzioSender (which can be used without this handler as well, to ship raw data).
The LogzioSender class opens a new Thread, that consumes from the logs queue. Each iteration (its frequency of which can be configured by the logs_drain_timeout parameter), will try to consume the queue in its entirety.
Logs will get divided into separate bulks, based on their size.
LogzioSender will check if the main thread is alive. In case the main thread quits, it will try to consume the queue one last time, and then exit. So your program can hang for a few seconds, until the logs are drained.
This is a Python handler that sends logs in bulk over HTTPS to Logz.io.
The handler uses a subclass named LogzioSender (which can be used without this handler as well, to ship raw data).
The LogzioSender class opens a new Thread, that consumes from the logs queue. Each iteration (its frequency of which can be configured by the logs_drain_timeout parameter), will try to consume the queue in its entirety.
Logs will get divided into separate bulks, based on their size.
LogzioSender will check if the main thread is alive. In case the main thread quits, it will try to consume the queue one last time, and then exit. So your program can hang for a few seconds, until the logs are drained.
In case the logs failed to be sent to Logz.io after a couple of tries, they will be written to the local file system. You can later upload them to Logz.io using curl.
## Installation
@ -20,9 +20,18 @@ Travis CI will build this handler and test against:
- "3.4"
- "3.5"
- "3.6"
We can't ensure compatibility to any other version, as we can't test it automatically.
To run tests:
```bash
$ pip install tox
$ tox
...
```
## Python configuration
#### Config File
```
@ -53,8 +62,8 @@ format={"additional_field": "value"}
- Time to sleep between draining attempts (defaults to "3")
- Logz.io Listener address (defaults to "https://listener.logz.io:8071")
- Debug flag. Set to True, will print debug messages to stdout. (defaults to "False")
Please note, that you have to configure those parameters by this exact order.
Please note, that you have to configure those parameters by this exact order.
i.e. you cannot set Debug to true, without configuring all of the previous parameters as well.
#### Code Example
@ -76,10 +85,10 @@ except:
```
#### Extra Fields
In case you need to dynamic metadata to your logger, other then the constant metadata from the formatter, you can use the "extra" parameter.
All key values in the dictionary passed in "extra" will be presented in Logz.io as new fields in the log you are sending.
Please note, that you cannot override default fields by the python logger (i.e. lineno, thread, etc..)
For example:
In case you need to dynamic metadata to your logger, other then the constant metadata from the formatter, you can use the "extra" parameter.
All key values in the dictionary passed in "extra" will be presented in Logz.io as new fields in the log you are sending.
Please note, that you cannot override default fields by the python logger (i.e. lineno, thread, etc..)
For example:
```
@ -145,7 +154,7 @@ LOGGING = {
- 2.0.3 - Fix bug that consumed more logs while draining than Logz.io's bulk limit
- 2.0.2 - Support for formatted messages (Thanks @johnraz!)
- 2.0.1 - Added __all__ to __init__.py, so support * imports
- 2.0.0 - Production, stable release.
- 2.0.0 - Production, stable release.
- *BREAKING* - Configuration option logs_drain_count was removed, and the order of the parameters has changed for better simplicity. Please review the parameters section above.
- Introducing the LogzioSender class, which is generic and can be used without the handler wrap to ship raw data to Logz.io. Just create a new instance of the class, and use the append() method.
- Simplifications and Robustness


+ 2
- 0
logzio/exceptions.py View File

@ -0,0 +1,2 @@
class LogzioException(Exception):
pass

+ 33
- 22
logzio/handler.py View File

@ -1,36 +1,47 @@
import datetime
import sys
import json
import logging
import logging.handlers
import datetime
import traceback
import sys
import logging.handlers
from .sender import LogzioSender
from .exceptions import LogzioException
class LogzioHandler(logging.Handler):
def __init__(self, token, logzio_type="python", logs_drain_timeout=3,
url="https://listener.logz.io:8071", debug=False):
def __init__(self,
token,
logzio_type="python",
logs_drain_timeout=3,
url="https://listener.logz.io:8071",
debug=False):
if token is "":
raise Exception("Logz.io Token must be provided")
if not token:
raise LogzioException('Logz.io Token must be provided')
self.logzio_type = logzio_type
self.logzio_sender = LogzioSender(token=token, url=url, logs_drain_timeout=logs_drain_timeout, debug=debug)
self.logzio_sender = LogzioSender(
token=token,
url=url,
logs_drain_timeout=logs_drain_timeout,
debug=debug)
logging.Handler.__init__(self)
def extra_fields(self, message):
not_allowed_keys = (
'args', 'asctime', 'created', 'exc_info', 'stack_info', 'exc_text', 'filename',
'funcName', 'levelname', 'levelno', 'lineno', 'module',
'args', 'asctime', 'created', 'exc_info', 'stack_info', 'exc_text',
'filename', 'funcName', 'levelname', 'levelno', 'lineno', 'module',
'msecs', 'msecs', 'message', 'msg', 'name', 'pathname', 'process',
'processName', 'relativeCreated', 'thread', 'threadName')
if sys.version_info < (3, 0):
var_type = (basestring, bool, dict, float, int, long, list, type(None))
# long and basestring don't exist in py3 so, NOQA
var_type = (basestring, bool, dict, float, # NOQA
int, long, list, type(None)) # NOQA
else:
var_type = (str, bool, dict, float, int, list, type(None))
@ -60,29 +71,29 @@ class LogzioHandler(logging.Handler):
def format_message(self, message):
now = datetime.datetime.utcnow()
timestamp = now.strftime("%Y-%m-%dT%H:%M:%S") + ".%03d" % (now.microsecond / 1000) + "Z"
timestamp = now.strftime('%Y-%m-%dT%H:%M:%S') + \
'.%03d' % (now.microsecond / 1000) + 'Z'
return_json = {
"logger": message.name,
"line_number": message.lineno,
"path_name": message.pathname,
"log_level": message.levelname,
"type": self.logzio_type,
"message": message.getMessage(),
"@timestamp": timestamp
'logger': message.name,
'line_number': message.lineno,
'path_name': message.pathname,
'log_level': message.levelname,
'type': self.logzio_type,
'message': message.getMessage(),
'@timestamp': timestamp
}
if message.exc_info:
return_json["exception"] = self.format_exception(message.exc_info)
return_json['exception'] = self.format_exception(message.exc_info)
else:
formatted_message = self.format(message)
return_json.update(self.extra_fields(message))
if isinstance(formatted_message, dict):
return_json.update(formatted_message)
else:
return_json["message"] = formatted_message
return_json['message'] = formatted_message
return return_json


+ 9
- 0
logzio/logger.py View File

@ -0,0 +1,9 @@
import sys
import logging
def get_logger(debug):
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG if debug else logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
return logger

+ 61
- 32
logzio/sender.py View File

@ -1,36 +1,46 @@
# This class is responsible for handling all asynchronous Logz.io's communication
# This class is responsible for handling all asynchronous Logz.io's
# communication
import sys
import requests
import json
from threading import Thread, enumerate
from datetime import datetime
from time import sleep
from datetime import datetime
from threading import Thread, enumerate
import requests
from .logger import get_logger
if sys.version[0] == '2':
import Queue as queue
else:
import queue as queue
MAX_BULK_SIZE_IN_BYTES = 1 * 1024 * 1024 # 1 MB
def backup_logs(logs):
timestamp = datetime.now().strftime("%d%m%Y-%H%M%S")
print("Backing up your logs to logzio-failures-{0}.txt".format(timestamp))
with open("logzio-failures-{0}.txt".format(timestamp), "a") as f:
def backup_logs(logs, logger):
timestamp = datetime.now().strftime('%d%m%Y-%H%M%S')
logger.info(
'Backing up your logs to logzio-failures-%s.txt', timestamp)
with open('logzio-failures-{}.txt'.format(timestamp), 'a') as f:
f.writelines('\n'.join(logs))
class LogzioSender:
def __init__(self, token, url="https://listener.logz.io:8071", logs_drain_timeout=5, debug=False):
def __init__(self,
token, url='https://listener.logz.io:8071',
logs_drain_timeout=5,
debug=False):
self.token = token
self.url = "{0}/?token={1}".format(url, token)
self.url = '{}/?token={}'.format(url, token)
self.logs_drain_timeout = logs_drain_timeout
self.debug = debug
self.logger = get_logger(debug)
# Function to see if the main thread is alive
self.is_main_thread_active = lambda: any((i.name == "MainThread") and i.is_alive() for i in enumerate())
self.is_main_thread_active = lambda: any(
(i.name == 'MainThread') and i.is_alive() for i in enumerate())
# Create a queue to hold logs
self.queue = queue.Queue()
@ -39,7 +49,7 @@ class LogzioSender:
def _initialize_sending_thread(self):
self.sending_thread = Thread(target=self._drain_queue)
self.sending_thread.daemon = False
self.sending_thread.name = "logzio-sending-thread"
self.sending_thread.name = 'logzio-sending-thread'
self.sending_thread.start()
def append(self, logs_message):
@ -52,24 +62,24 @@ class LogzioSender:
def flush(self):
self._flush_queue()
def _debug(self, message):
if self.debug:
print(str(message))
def _drain_queue(self):
last_try = False
while not last_try:
# If main is exited, we should run one last time and try to remove all logs
# If main is exited, we should run one last time and try to remove
# all logs
if not self.is_main_thread_active():
self._debug("Identified quit of main thread, sending logs one last time")
self.logger.debug(
'Identified quit of main thread, sending logs one '
'last time')
last_try = True
try:
self._flush_queue()
except Exception as e:
self._debug("Unexpected exception while draining queue to Logz.io, swallowing. Exception: " + str(e))
self.logger.debug(
'Unexpected exception while draining queue to Logz.io, '
'swallowing. Exception: %s', e)
if not last_try:
sleep(self.logs_drain_timeout)
@ -78,7 +88,8 @@ class LogzioSender:
# Sending logs until queue is empty
while not self.queue.empty():
logs_list = self._get_messages_up_to_max_allowed_size()
self._debug("Starting to drain " + str(len(logs_list)) + " logs to Logz.io")
self.logger.debug(
'Starting to drain %s logs to Logz.io', len(logs_list))
# Not configurable from the outside
sleep_between_retries = 2
@ -90,27 +101,43 @@ class LogzioSender:
for current_try in range(number_of_retries):
should_retry = False
try:
response = requests.post(self.url, headers=headers, data='\n'.join(logs_list))
response = requests.post(
self.url, headers=headers, data='\n'.join(logs_list))
if response.status_code != 200:
if response.status_code == 400:
print("Got 400 code from Logz.io. This means that some of your logs are too big, or badly formatted. response: {0}".format(response.text))
self.logger.info(
'Got 400 code from Logz.io. This means that '
'some of your logs are too big, or badly '
'formatted. response: %s', response.text)
should_backup_to_disk = False
break
if response.status_code == 401:
print("You are not authorized with Logz.io! Token OK? dropping logs...")
self.logger.info(
'You are not authorized with Logz.io! Token '
'OK? dropping logs...')
should_backup_to_disk = False
break
else:
print("Got {} while sending logs to Logz.io, Try ({}/{}). Response: {}".format(response.status_code, current_try + 1, number_of_retries, response.text))
self.logger.info(
'Got %s while sending logs to Logz.io, '
'Try (%s/%s). Response: %s',
response.status_code,
current_try + 1,
number_of_retries,
response.text)
should_retry = True
else:
self._debug("Successfully sent bulk of " + str(len(logs_list)) + " logs to Logz.io!")
self.logger.debug(
'Successfully sent bulk of %s logs to '
'Logz.io!', len(logs_list))
should_backup_to_disk = False
break
except Exception as e:
print("Got exception while sending logs to Logz.io, Try ({}/{}). Message: {}".format(current_try + 1, number_of_retries, e))
self.logger.error(
'Got exception while sending logs to Logz.io, '
'Try (%s/%s). Message: %s',
current_try + 1, number_of_retries, e)
should_retry = True
if should_retry:
@ -119,8 +146,10 @@ class LogzioSender:
if should_backup_to_disk:
# Write to file
print("Could not send logs to Logz.io after " + str(number_of_retries) + " tries, backing up to local file system.")
backup_logs(logs_list)
self.logger.info(
'Could not send logs to Logz.io after %s tries, '
'backing up to local file system', number_of_retries)
backup_logs(logs_list, self.logger)
def _get_messages_up_to_max_allowed_size(self):
logs_list = []


+ 23
- 0
tox.ini View File

@ -0,0 +1,23 @@
[tox]
minversion = 1.7.2
envlist = flake8, py3flake8, py27, py33, py34, py35, py36
skip_missing_interpreters = true
[testenv]
deps =
future
requests
pytest
pytest-cov
passenv = CI TRAVIS TRAVIS_*
commands = pytest --cov-report term-missing --cov logzio tests -v
[testenv:flake8]
basepython = python2.7
deps = flake8
commands = flake8 logzio
[testenv:py3flake8]
basepython = python3.6
deps = flake8
commands = flake8 logzio

Loading…
Cancel
Save