import fnmatch import logging.config import os import time from unittest import TestCase from .mockLogzioListener import listener def _find(pattern, path): result = [] for root, dirs, files in os.walk(path): for name in files: if fnmatch.fnmatch(name, pattern): result.append(os.path.join(root, name)) break # Not descending recursively return result class TestLogzioSender(TestCase): def setUp(self): self.logzio_listener = listener.MockLogzioListener() self.logzio_listener.clear_logs_buffer() self.logzio_listener.clear_server_error() self.logs_drain_timeout = 1 logging_configuration = { "version": 1, "formatters": { "logzio": { "format": '{"key": "value"}' } }, "handlers": { "LogzioHandler": { "class": "logzio.handler.LogzioHandler", "formatter": "logzio", "level": "DEBUG", "token": "token", 'logzio_type': "type", 'logs_drain_timeout': self.logs_drain_timeout, 'url': "http://" + self.logzio_listener.get_host() + ":" + str(self.logzio_listener.get_port()), 'debug': True } }, "loggers": { "test": { "handlers": ["LogzioHandler"], "level": "DEBUG" } } } logging.config.dictConfig(logging_configuration) self.logger = logging.getLogger('test') for curr_file in _find("logzio-failures-*.txt", "."): os.remove(curr_file) def test_simple_log_drain(self): log_message = "Test simple log drain" self.logger.info(log_message) time.sleep(self.logs_drain_timeout * 2) self.assertTrue(self.logzio_listener.find_log(log_message)) def test_multiple_lines_drain(self): logs_num = 50 for counter in range(0, logs_num): self.logger.info("Test " + str(counter)) time.sleep(self.logs_drain_timeout * 2) for counter in range(0, logs_num): self.logger.info("Test " + str(counter)) time.sleep(self.logs_drain_timeout * 2) self.assertEqual(self.logzio_listener.get_number_of_logs(), logs_num * 2) def test_server_failure(self): log_message = "Failing log message" self.logzio_listener.set_server_error() self.logger.info(log_message) time.sleep(self.logs_drain_timeout * 2) self.assertFalse(self.logzio_listener.find_log(log_message)) self.logzio_listener.clear_server_error() time.sleep(self.logs_drain_timeout * 2 * 4) # Longer, because of the retry self.assertTrue(self.logzio_listener.find_log(log_message)) def test_local_file_backup(self): log_message = "Backup to local filesystem" self.logzio_listener.set_server_error() self.logger.info(log_message) # Make sure no file is present self.assertEqual(len(_find("logzio-failures-*.txt", ".")), 0) time.sleep(2 * 2 * 2 * 2 * 2) # All of the retries failure_files = _find("logzio-failures-*.txt", ".") self.assertEqual(len(failure_files), 1) with open(failure_files[0], "r") as f: line = f.readline() self.assertTrue(log_message in line) def test_local_file_backup_disabled(self): log_message = "Backup to local filesystem" self.logzio_listener.set_server_error() self.logger.handlers[0].logzio_sender.backup_logs = False self.logger.info(log_message) # Make sure no file is present self.assertEqual(len(_find("logzio-failures-*.txt", ".")), 0) time.sleep(2 * 2 * 2 * 2 * 2) # All of the retries # Make sure no file was created self.assertEqual(len(_find("logzio-failures-*.txt", ".")), 0) def test_can_send_after_fork(self): childpid = os.fork() child_log_message = 'logged from child process' parent_log_message = 'logged from parent process' if childpid == 0: # Log from the child process self.logger.info(child_log_message) time.sleep(self.logs_drain_timeout * 2) os._exit(0) # Wait for the child process to finish os.waitpid(childpid, 0) # log from the parent process self.logger.info(parent_log_message) time.sleep(self.logs_drain_timeout * 2) # Ensure listener receive all log messages self.assertTrue(self.logzio_listener.find_log(child_log_message)) self.assertTrue(self.logzio_listener.find_log(parent_log_message))