Logging handler to send logs to your OpenSearch cluster with bulk SSL. Forked from https://github.com/logzio/logzio-python-handler
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

108 lines
3.4 KiB

  1. import fnmatch
  2. import logging.config
  3. import os
  4. import time
  5. from unittest import TestCase
  6. from .mockLogzioListener import listener
  7. def _find(pattern, path):
  8. result = []
  9. for root, dirs, files in os.walk(path):
  10. for name in files:
  11. if fnmatch.fnmatch(name, pattern):
  12. result.append(os.path.join(root, name))
  13. break # Not descending recursively
  14. return result
  15. class TestLogzioSender(TestCase):
  16. def setUp(self):
  17. self.logzio_listener = listener.MockLogzioListener()
  18. self.logzio_listener.clear_server_error()
  19. self.logs_drain_timeout = 1
  20. logging_configuration = {
  21. "version": 1,
  22. "formatters": {
  23. "logzio": {
  24. "format": '{"key": "value"}'
  25. }
  26. },
  27. "handlers": {
  28. "LogzioHandler": {
  29. "class": "logzio.handler.LogzioHandler",
  30. "formatter": "logzio",
  31. "level": "DEBUG",
  32. "token": "token",
  33. 'logzio_type': "type",
  34. 'logs_drain_timeout': self.logs_drain_timeout,
  35. 'url': "http://" + self.logzio_listener.get_host() + ":" + str(self.logzio_listener.get_port()),
  36. 'debug': True
  37. }
  38. },
  39. "loggers": {
  40. "test": {
  41. "handlers": ["LogzioHandler"],
  42. "level": "DEBUG"
  43. }
  44. }
  45. }
  46. logging.config.dictConfig(logging_configuration)
  47. self.logger = logging.getLogger('test')
  48. for curr_file in _find("logzio-failures-*.txt", "."):
  49. os.remove(curr_file)
  50. def test_simple_log_drain(self):
  51. log_message = "Test simple log drain"
  52. self.logger.info(log_message)
  53. time.sleep(self.logs_drain_timeout * 2)
  54. self.assertTrue(self.logzio_listener.find_log(log_message))
  55. def test_multiple_lines_drain(self):
  56. logs_num = 50
  57. for counter in range(0, logs_num):
  58. self.logger.info("Test " + str(counter))
  59. time.sleep(self.logs_drain_timeout * 2)
  60. for counter in range(0, logs_num):
  61. self.logger.info("Test " + str(counter))
  62. time.sleep(self.logs_drain_timeout * 2)
  63. self.assertEqual(self.logzio_listener.get_number_of_logs(), logs_num * 2)
  64. def test_server_failure(self):
  65. log_message = "Failing log message"
  66. self.logzio_listener.set_server_error()
  67. self.logger.info(log_message)
  68. time.sleep(self.logs_drain_timeout * 2)
  69. self.assertFalse(self.logzio_listener.find_log(log_message))
  70. self.logzio_listener.clear_server_error()
  71. time.sleep(self.logs_drain_timeout * 2 * 4) # Longer, because of the retry
  72. self.assertTrue(self.logzio_listener.find_log(log_message))
  73. def test_local_file_backup(self):
  74. log_message = "Backup to local filesystem"
  75. self.logzio_listener.set_server_error()
  76. self.logger.info(log_message)
  77. # Make sure no file is present
  78. self.assertEqual(len(_find("logzio-failures-*.txt", ".")), 0)
  79. time.sleep(2 * 2 * 2 * 2 * 2) # All of the retries
  80. failure_files = _find("logzio-failures-*.txt", ".")
  81. self.assertEqual(len(failure_files), 1)
  82. with open(failure_files[0], "r") as f:
  83. line = f.readline()
  84. self.assertTrue(log_message in line)