From 6b31517848182be4aacfef6b37f350b3e3d00c09 Mon Sep 17 00:00:00 2001 From: Roi Rav-Hon Date: Tue, 28 Mar 2017 18:53:50 +0300 Subject: [PATCH] Rewrite most of the handler, tests were introduced --- .cache/v/cache/lastfailed | 1 + .travis.yml | 12 ++ README.md | 46 +++-- tests/unit/__init__.py => __init__.py | 0 logzio/__init__.py | 1 - logzio/handler.py | 159 ++---------------- logzio/sender.py | 116 +++++++++++++ setup.py | 4 +- tests/__init__.py | 12 -- tests/mockLogzioListener/__init__.py | 0 tests/mockLogzioListener/listener.py | 82 +++++++++ tests/mockLogzioListener/logsList.py | 9 + tests/mockLogzioListener/persistentFlags.py | 18 ++ .../handler_test.py => test_logzioHandler.py} | 23 ++- tests/test_logzioSender.py | 108 ++++++++++++ 15 files changed, 399 insertions(+), 192 deletions(-) create mode 100644 .cache/v/cache/lastfailed create mode 100644 .travis.yml rename tests/unit/__init__.py => __init__.py (100%) create mode 100644 logzio/sender.py create mode 100644 tests/mockLogzioListener/__init__.py create mode 100644 tests/mockLogzioListener/listener.py create mode 100644 tests/mockLogzioListener/logsList.py create mode 100644 tests/mockLogzioListener/persistentFlags.py rename tests/{unit/handler_test.py => test_logzioHandler.py} (83%) create mode 100644 tests/test_logzioSender.py diff --git a/.cache/v/cache/lastfailed b/.cache/v/cache/lastfailed new file mode 100644 index 0000000..9e26dfe --- /dev/null +++ b/.cache/v/cache/lastfailed @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..f16d1b8 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,12 @@ +language: python +python: + - "2.6" + - "2.7" + - "3.2" + - "3.3" + - "3.4" + +install: + - pip install requests + +script: pytest \ No newline at end of file diff --git a/README.md b/README.md index 867417f..38d2459 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,12 @@ [![PyPI version](https://badge.fury.io/py/logzio-python-handler.svg)](https://badge.fury.io/py/logzio-python-handler) # The Logz.io Python Handler -This is a Python handler that sends logs in bulk over HTTPS to Logz.io. The handler uses an internal buffer, and you can choose the drain timeout as well as the number of messages to hold in the queue before the drain. Everything works in threads, so if the main program exists, the threads will continue to work until all logs are drained. - -**This is in BETA. We currently use this handler internally. We will provide tests soon** +This is a Python handler that sends logs in bulk over HTTPS to Logz.io. +The handler uses a subclass named LogzioSender (which can be used without this handler as well, to ship raw data). +The LogzioSender class opens a new Thread, that consumes from the logs queue. Each iteration (its frequency of which can be configured by the logs_drain_timeout parameter), will try to consume the queue in its entirety. +Logs will get divided into separate bulks, based on their size. +LogzioSender will check if the main thread is alive. In case the main thread quits, it will try to consume the queue one last time, and then exit. So your program can hang for a few seconds, until the logs are drained. +In case the logs failed to be sent to Logz.io after a couple of tries, they will be written to the local file system. You can later upload them to Logz.io using curl. ## Installation ```bash @@ -18,11 +21,11 @@ keys=LogzioHandler [handler_LogzioHandler] class=logzio.handler.LogzioHandler -formatter=jsonFormat -args=('token', 10, 20) +formatter=logzioFormat +args=('token', 'my_type') [formatters] -keys=jsonFormat +keys=logzioFormat [loggers] keys=root @@ -31,15 +34,18 @@ keys=root handlers=LogzioHandler level=INFO -[formatter_jsonFormat] -format={ "loggerName":"%(name)s", "functionName":"%(funcName)s", "lineNo":"%(lineno)d", "levelName":"%(levelname)s", "message":"%(message)s"} +[formatter_logzioFormat] +format={"additional_field": "value"} ``` *args=() arguments, by order* - Your logz.io token - - Number of logs to keep in buffer before draining - - Time to wait before draining, regardless of the previouse setting - Log type, for searching in logz.io (defaults to "python") + - Time to sleep between draining attempts (defaults to "3") - Logz.io Listener address (defaults to "https://listener.logz.io:8071") + - Debug flag. Set to True, will print debug messages to stdout. (defaults to "False") + + Please note, that you have to configure those parameters by this exact order. + i.e. you cannot set Debug to true, without configuring all of the previous parameters as well. #### Code Example ```python @@ -68,8 +74,8 @@ LOGGING = { 'verbose': { 'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s' }, - 'json': { - 'format': '{ "loggerName":"%(name)s", "functionName":"%(funcName)s", "lineNo":"%(lineno)d", "levelName":"%(levelname)s", "message":"%(message)s"}' + 'logzioFormat': { + 'format': '{"additional_field": "value"}' } }, 'handlers': { @@ -81,12 +87,12 @@ LOGGING = { 'logzio': { 'class': 'logzio.handler.LogzioHandler', 'level': 'INFO', - 'formatter': 'json', + 'formatter': 'logzioFormat', 'token': 'token', - 'url': 'https://listener.logz.io:8071' - 'logs_drain_count': 10, + 'logzio_type': "django", 'logs_drain_timeout': 5, - 'logzio_type': "django" + 'url': 'https://listener.logz.io:8071', + 'debug': True }, }, 'loggers': { @@ -109,3 +115,11 @@ LOGGING = { - logs_drain_timeout - Time to wait before draining, regardless of the previouse setting - logzio_type - Log type, for searching in logz.io (defaults to "python") - appname - Your django app + +## Release Notes +- 2.0.0 - Production, stable release. + - *BREAKING* - Configuration option logs_drain_count was removed, and the order of the parameters has changed for better simplicity. Please review the parameters section above. + - Introducing the LogzioSender class, which is generic and can be used without the handler wrap to ship raw data to Logz.io. Just create a new instance of the class, and use the append() method. + - Simplifications and Robustness + - Full testing framework +- 1.X - Beta versions \ No newline at end of file diff --git a/tests/unit/__init__.py b/__init__.py similarity index 100% rename from tests/unit/__init__.py rename to __init__.py diff --git a/logzio/__init__.py b/logzio/__init__.py index 4a211c1..e69de29 100644 --- a/logzio/__init__.py +++ b/logzio/__init__.py @@ -1 +0,0 @@ -__author__ = 'roiravhon' diff --git a/logzio/handler.py b/logzio/handler.py index c5a4842..1c19562 100644 --- a/logzio/handler.py +++ b/logzio/handler.py @@ -1,106 +1,23 @@ +import datetime +import json import logging import logging.handlers -import requests import traceback -import datetime -import json -import os -from threading import Event, Thread, Condition, Lock, enumerate -from time import sleep +from .sender import LogzioSender class LogzioHandler(logging.Handler): - - # Hold all logs buffered - logs = [] - - # Event for locking buffer additions while draining - buffer_event = Event() - - # Condition to count log messages - logs_counter_condition = Condition() - - # Lock to only drain logs once - drain_lock = Lock() - - def __init__(self, token, logs_drain_count=100, logs_drain_timeout=10, - logzio_type="python", url="https://listener.logz.io:8071"): + def __init__(self, token, logzio_type="python", logs_drain_timeout=3, + url="https://listener.logz.io:8071", debug=False): if token is "": raise Exception("Logz.io Token must be provided") - logging.Handler.__init__(self) - self.logs_drain_count = logs_drain_count - self.logs_drain_timeout = logs_drain_timeout self.logzio_type = logzio_type - self.url = "{0}/?token={1}".format(url, token) - - self.is_main_thread_active = lambda: any((i.name == "MainThread") and i.is_alive() for i in enumerate()) - - self.buffer_event.set() - - # Create threads - timeout_thread = Thread(target=self.wait_to_timeout_and_drain) - counter_thread = Thread(target=self.count_logs_and_drain) - - # And start them - timeout_thread.start() - counter_thread.start() - - def wait_to_timeout_and_drain(self): - - while True: - sleep(self.logs_drain_timeout) - if len(self.logs) > 0: - self.drain_messages() - - if not self.is_main_thread_active(): - # Signal the counter thread so it would exit as well - try: - self.logs_counter_condition.acquire() - self.logs_counter_condition.notify() - finally: - self.logs_counter_condition.release() - break - - def count_logs_and_drain(self): - try: - # Acquire the condition - self.logs_counter_condition.acquire() - - # Running indefinite - while True: - - # Waiting for new log lines to come - self.logs_counter_condition.wait() - - if not self.is_main_thread_active(): - break - # Do we have enough logs to drain? - if len(self.logs) >= self.logs_drain_count: - self.drain_messages() - - finally: - self.logs_counter_condition.release() - - def add_to_buffer(self, message): - - # Check if we are currently draining buffer so we wont loose logs - self.buffer_event.wait() - - try: - # Acquire the condition - self.logs_counter_condition.acquire() - self.logs.append(json.dumps(message)) - - # Notify watcher for a new log coming in - self.logs_counter_condition.notify() - - finally: - # Release the condition - self.logs_counter_condition.release() + self.logzio_sender = LogzioSender(token=token, url=url, logs_drain_timeout=logs_drain_timeout, debug=debug) + logging.Handler.__init__(self) def format(self, record): message = super(LogzioHandler, self).format(record) @@ -109,7 +26,7 @@ class LogzioHandler(logging.Handler): except (TypeError, ValueError): return message - def formatException(self, exc_info): + def format_exception(self, exc_info): return '\n'.join(traceback.format_exception(*exc_info)) def format_message(self, message): @@ -122,11 +39,12 @@ class LogzioHandler(logging.Handler): "path_name": message.pathname, "log_level": message.levelname, "type": self.logzio_type, + "message": message.msg, "@timestamp": timestamp } if message.exc_info: - return_json["message"] = self.formatException(message.exc_info) + return_json["exception"] = self.format_exception(message.exc_info) else: formatted_message = self.format(message) if isinstance(formatted_message, dict): @@ -136,60 +54,5 @@ class LogzioHandler(logging.Handler): return return_json - def backup_logs(self, logs): - timestamp = datetime.datetime.now().strftime("%d%m%Y-%H%M%S") - print("Backing up your logs to logzio-failures-{0}.txt".format(timestamp)) - with open("logzio-failures-{0}.txt".format(timestamp), "a") as f: - f.writelines('\n'.join(logs)) - - def drain_messages(self): - try: - self.buffer_event.clear() - self.drain_lock.acquire() - - # Copy buffer - temp_logs = list(self.logs) - self.logs = [] - - # Release the event - self.buffer_event.set() - - # Not configurable from the outside - sleep_between_retries = 2 - number_of_retries = 4 - - success_in_send = False - headers = {"Content-type": "text/plain"} - - for current_try in range(number_of_retries): - try: - response = requests.post(self.url, headers=headers, data='\n'.join(temp_logs)) - - if response.status_code != 200: # 429 400, on 400 print stdout - if response.status_code == 400: - - print("Got unexpected 400 code from logz.io, response: {0}".format(response.text)) - self.backup_logs(temp_logs) - - if response.status_code == 401: - print("You are not authorized with logz.io! dropping..") - break - except Exception as e: - print("Got exception while sending logs to Logz.io, Try ({}/{}). Message: {}".format(current_try + 1, number_of_retries, e.message)) - - sleep(sleep_between_retries) - sleep_between_retries *= 2 - else: - success_in_send = True - break - - if not success_in_send: - # Write to file - self.backup_logs(temp_logs) - - finally: - self.buffer_event.set() - self.drain_lock.release() - def emit(self, record): - self.add_to_buffer(self.format_message(record)) + self.logzio_sender.append(self.format_message(record)) diff --git a/logzio/sender.py b/logzio/sender.py new file mode 100644 index 0000000..371d23d --- /dev/null +++ b/logzio/sender.py @@ -0,0 +1,116 @@ +# This class is responsible for handling all asynchronous Logz.io's communication +import sys +import requests +import json +from threading import Thread, enumerate +from datetime import datetime +from Queue import Queue +from time import sleep + + +MAX_BULK_SIZE_IN_BYTES = 3 * 1024 * 1024 # 3 MB + + +def backup_logs(logs): + timestamp = datetime.now().strftime("%d%m%Y-%H%M%S") + print("Backing up your logs to logzio-failures-{0}.txt".format(timestamp)) + with open("logzio-failures-{0}.txt".format(timestamp), "a") as f: + f.writelines('\n'.join(logs)) + + +class LogzioSender: + def __init__(self, token, url="https://listener.logz.io:8071", logs_drain_timeout=5, debug=False): + self.token = token + self.url = "{0}/?token={1}".format(url, token) + self.logs_drain_timeout = logs_drain_timeout + self.debug = debug + + # Function to see if the main thread is alive + self.is_main_thread_active = lambda: any((i.name == "MainThread") and i.is_alive() for i in enumerate()) + + # Create a queue to hold logs + self.queue = Queue() + + self.sending_thread = Thread(target=self._drain_queue) + self.sending_thread.daemon = False + self.sending_thread.name = "logzio-sending-thread" + self.sending_thread.start() + + def append(self, logs_message): + # Queue lib is thread safe, no issue here + self.queue.put(json.dumps(logs_message)) + + def _debug(self, message): + if self.debug: + print(str(message)) + + def _drain_queue(self): + last_try = False + + while not last_try: + # If main is exited, we should run one last time and try to remove all logs + if not self.is_main_thread_active(): + self._debug("Identified quit of main thread, sending logs one last time") + last_try = True + + try: + # Sending logs until queue is empty + while not self.queue.empty(): + logs_list = self._get_messages_up_to_max_allowed_size() + self._debug("Starting to drain " + str(len(logs_list)) + " logs to Logz.io") + + # Not configurable from the outside + sleep_between_retries = 2 + number_of_retries = 4 + + should_backup_to_disk = True + headers = {"Content-type": "text/plain"} + + for current_try in range(number_of_retries): + should_retry = False + try: + response = requests.post(self.url, headers=headers, data='\n'.join(logs_list)) + if response.status_code != 200: + if response.status_code == 400: + print("Got 400 code from Logz.io. This means that some of your logs are too big, or badly formatted. response: {0}".format(response.text)) + should_backup_to_disk = False + break + + if response.status_code == 401: + print("You are not authorized with Logz.io! Token OK? dropping logs...") + should_backup_to_disk = False + break + else: + print("Got {} while sending logs to Logz.io, Try ({}/{}). Response: {}".format(response.status_code, current_try + 1, number_of_retries, response.text)) + should_retry = True + else: + self._debug("Successfully sent bulk of " + str(len(logs_list)) + " logs to Logz.io!") + should_backup_to_disk = False + break + + except Exception as e: + print("Got exception while sending logs to Logz.io, Try ({}/{}). Message: {}".format(current_try + 1, number_of_retries, e.message)) + should_retry = True + + if should_retry: + sleep(sleep_between_retries) + sleep_between_retries *= 2 + + if should_backup_to_disk: + # Write to file + print("Could not send logs to Logz.io after " + str(number_of_retries) + " tries, backing up to local file system.") + backup_logs(logs_list) + + except Exception as e: + self._debug("Unexpected exception while draining queue to Logz.io, swallowing. Exception: " + str(e)) + + if not last_try: + sleep(self.logs_drain_timeout) + + def _get_messages_up_to_max_allowed_size(self): + logs_list = [] + while not self.queue.empty(): + logs_list.append(self.queue.get()) + if sys.getsizeof(logs_list) >= MAX_BULK_SIZE_IN_BYTES: + break + return logs_list diff --git a/setup.py b/setup.py index 0cfbb12..6e3559d 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ from setuptools import setup, find_packages setup( name="logzio-python-handler", - version='1.0.5', + version='2.0.0', description="Logging handler to send logs to your Logz.io account with bulk SSL", keywords="logging handler logz.io bulk https", author="roiravhon", @@ -17,7 +17,7 @@ setup( ], include_package_data=True, classifiers=[ - 'Development Status :: 4 - Beta', + 'Development Status :: 5 - Production/Stable', 'Intended Audience :: Developers', 'Programming Language :: Python :: 2.7' ] diff --git a/tests/__init__.py b/tests/__init__.py index 7152ed1..e69de29 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,12 +0,0 @@ -import os -import pkgutil - -def load_tests(loader, suite, pattern): - for imp, modname, _ in pkgutil.walk_packages(__path__): - mod = imp.find_module(modname).load_module(modname) - for test in loader.loadTestsFromModule(mod): - print("Running TestCase: {}".format(modname)) - suite.addTests(test) - - print("=" * 70) - return suite diff --git a/tests/mockLogzioListener/__init__.py b/tests/mockLogzioListener/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/mockLogzioListener/listener.py b/tests/mockLogzioListener/listener.py new file mode 100644 index 0000000..755b2e1 --- /dev/null +++ b/tests/mockLogzioListener/listener.py @@ -0,0 +1,82 @@ +import socket +from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer +from threading import Thread +from .logsList import logs_list +from .persistentFlags import persistent_flags + + +class ListenerHandler(BaseHTTPRequestHandler): + def do_POST(self): + try: + content_length = int(self.headers.get("Content-Length")) + all_logs = self.rfile.read(content_length).split('\n') + if len(all_logs) == 0: + self.send_response(400, "Bad Request") + return + + for log in all_logs: + if log != "": + if persistent_flags.get_server_error(): + self.send_response(500, "Issue!!!!!!!") + return + + logs_list.list.append(log) + + self.send_response(200, "Ok") + return + + except IndexError: + self.send_response(400, "Bad Request") + return + + +class MockLogzioListener: + def __init__(self): + self.port = find_available_port() + self.host = "localhost" + + self.server = HTTPServer((self.host, self.port), ListenerHandler) + + self.listening_thread = Thread(target=self._start_listening) + self.listening_thread.daemon = True + self.listening_thread.name = "mock-logzio-listener" + self.listening_thread.start() + self.logs_list = logs_list.list + self.persistent_flags = persistent_flags + + def _start_listening(self): + self.server.serve_forever() + + def get_port(self): + return self.port + + def get_host(self): + return self.host + + def find_log(self, search_log): + for current_log in self.logs_list: + if search_log in current_log: + return True + + return False + + def get_number_of_logs(self): + return len(self.logs_list) + + def clear_logs_buffer(self): + self.logs_list = [] + + def set_server_error(self): + self.persistent_flags.set_server_error() + + def clear_server_error(self): + self.persistent_flags.clear_server_error() + + +def find_available_port(): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind(("", 0)) + sock.listen(1) + port = sock.getsockname()[1] + sock.close() + return port diff --git a/tests/mockLogzioListener/logsList.py b/tests/mockLogzioListener/logsList.py new file mode 100644 index 0000000..9732487 --- /dev/null +++ b/tests/mockLogzioListener/logsList.py @@ -0,0 +1,9 @@ +# Modules imported only once, this is effectively a singleton + + +class LogsList: + def __init__(self): + self.list = [] + + +logs_list = LogsList() diff --git a/tests/mockLogzioListener/persistentFlags.py b/tests/mockLogzioListener/persistentFlags.py new file mode 100644 index 0000000..b21e995 --- /dev/null +++ b/tests/mockLogzioListener/persistentFlags.py @@ -0,0 +1,18 @@ +# Modules imported only once, this is effectively a singleton + + +class PersistentFlags: + + def __init__(self): + self.server_error = False + + def set_server_error(self): + self.server_error = True + + def clear_server_error(self): + self.server_error = False + + def get_server_error(self): + return self.server_error + +persistent_flags = PersistentFlags() diff --git a/tests/unit/handler_test.py b/tests/test_logzioHandler.py similarity index 83% rename from tests/unit/handler_test.py rename to tests/test_logzioHandler.py index 09c77d4..89e67b2 100644 --- a/tests/unit/handler_test.py +++ b/tests/test_logzioHandler.py @@ -1,18 +1,17 @@ -import ast -import unittest +import os +from unittest import TestCase + import logging + import sys + import re -import os from logzio.handler import LogzioHandler -def dummy_drain_messages(): - pass -class TestHandler(unittest.TestCase): +class TestLogzioHandler(TestCase): def setUp(self): self.handler = LogzioHandler('moo') - self.handler.drain_messages = dummy_drain_messages; def test_json(self): formatter = logging.Formatter( @@ -79,7 +78,6 @@ class TestHandler(unittest.TestCase): ) def test_exc(self): - exc_info = None try: raise ValueError("oops.") except: @@ -99,8 +97,8 @@ class TestHandler(unittest.TestCase): formatted_message = self.handler.format_message(record) formatted_message["@timestamp"] = None - formatted_message["message"] = formatted_message["message"].replace(os.path.abspath(__file__), "") - formatted_message["message"] = re.sub(r", line \d+", "", formatted_message["message"]) + formatted_message["exception"] = formatted_message["exception"].replace(os.path.abspath(__file__), "") + formatted_message["exception"] = re.sub(r", line \d+", "", formatted_message["exception"]) self.assertDictEqual( { @@ -108,12 +106,11 @@ class TestHandler(unittest.TestCase): 'line_number': 10, 'log_level': 'NOTSET', 'logger': 'my-logger', - 'message': 'Traceback (most recent call last):\n\n File "", in test_exc\n raise ValueError("oops.")\n\nValueError: oops.\n', + 'message': 'this is a test: moo.', + 'exception': 'Traceback (most recent call last):\n\n File "", in test_exc\n raise ValueError("oops.")\n\nValueError: oops.\n', 'path_name': 'handler_test.py', 'type': 'python' }, formatted_message ) -if __name__ == '__main__': - unittest.main() diff --git a/tests/test_logzioSender.py b/tests/test_logzioSender.py new file mode 100644 index 0000000..0db4c8e --- /dev/null +++ b/tests/test_logzioSender.py @@ -0,0 +1,108 @@ +import fnmatch +import logging.config +import os +import time +from unittest import TestCase + +from .mockLogzioListener import listener + + +def _find(pattern, path): + result = [] + for root, dirs, files in os.walk(path): + for name in files: + if fnmatch.fnmatch(name, pattern): + result.append(os.path.join(root, name)) + + break # Not descending recursively + return result + + +class TestLogzioSender(TestCase): + def setUp(self): + self.logzio_listener = listener.MockLogzioListener() + self.logzio_listener.clear_server_error() + self.logs_drain_timeout = 1 + + logging_configuration = { + "version": 1, + "formatters": { + "logzio": { + "format": '{"key": "value"}' + } + }, + "handlers": { + "LogzioHandler": { + "class": "logzio.handler.LogzioHandler", + "formatter": "logzio", + "level": "DEBUG", + "token": "token", + 'logzio_type': "type", + 'logs_drain_timeout': self.logs_drain_timeout, + 'url': "http://" + self.logzio_listener.get_host() + ":" + str(self.logzio_listener.get_port()), + 'debug': True + } + }, + "loggers": { + "test": { + "handlers": ["LogzioHandler"], + "level": "DEBUG" + } + } + } + + logging.config.dictConfig(logging_configuration) + self.logger = logging.getLogger('test') + + for curr_file in _find("logzio-failures-*.txt", "."): + os.remove(curr_file) + + def test_simple_log_drain(self): + log_message = "Test simple log drain" + self.logger.info(log_message) + time.sleep(self.logs_drain_timeout * 2) + self.assertTrue(self.logzio_listener.find_log(log_message)) + + def test_multiple_lines_drain(self): + logs_num = 50 + for counter in range(0, logs_num): + self.logger.info("Test " + str(counter)) + time.sleep(self.logs_drain_timeout * 2) + + for counter in range(0, logs_num): + self.logger.info("Test " + str(counter)) + time.sleep(self.logs_drain_timeout * 2) + + self.assertEqual(self.logzio_listener.get_number_of_logs(), logs_num * 2) + + def test_server_failure(self): + log_message = "Failing log message" + self.logzio_listener.set_server_error() + self.logger.info(log_message) + + time.sleep(self.logs_drain_timeout * 2) + + self.assertFalse(self.logzio_listener.find_log(log_message)) + + self.logzio_listener.clear_server_error() + + time.sleep(self.logs_drain_timeout * 2 * 4) # Longer, because of the retry + + self.assertTrue(self.logzio_listener.find_log(log_message)) + + def test_local_file_backup(self): + log_message = "Backup to local filesystem" + self.logzio_listener.set_server_error() + self.logger.info(log_message) + + # Make sure no file is present + self.assertEqual(len(_find("logzio-failures-*.txt", ".")), 0) + + time.sleep(2 * 2 * 2 * 2 * 2) # All of the retries + + failure_files = _find("logzio-failures-*.txt", ".") + self.assertEqual(len(failure_files), 1) + + with open(failure_files[0], "r") as f: + line = f.readline() + self.assertTrue(log_message in line)