Browse Source

Merge pull request #20 from OspreyInformatics/master

Ensure sending thread is alive before appending log message
opensearch
Roi Rav-Hon 7 years ago
committed by GitHub
parent
commit
25318785a6
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 29 additions and 1 deletions
  1. +1
    -0
      .gitignore
  2. +5
    -0
      logzio/sender.py
  3. +1
    -1
      tests/mockLogzioListener/listener.py
  4. +22
    -0
      tests/test_logzioSender.py

+ 1
- 0
.gitignore View File

@ -1,2 +1,3 @@
*.pyc *.pyc
*egg-info/

+ 5
- 0
logzio/sender.py View File

@ -34,13 +34,18 @@ class LogzioSender:
# Create a queue to hold logs # Create a queue to hold logs
self.queue = queue.Queue() self.queue = queue.Queue()
self._initialize_sending_thread()
def _initialize_sending_thread(self):
self.sending_thread = Thread(target=self._drain_queue) self.sending_thread = Thread(target=self._drain_queue)
self.sending_thread.daemon = False self.sending_thread.daemon = False
self.sending_thread.name = "logzio-sending-thread" self.sending_thread.name = "logzio-sending-thread"
self.sending_thread.start() self.sending_thread.start()
def append(self, logs_message): def append(self, logs_message):
if not self.sending_thread.is_alive():
self._initialize_sending_thread()
# Queue lib is thread safe, no issue here # Queue lib is thread safe, no issue here
self.queue.put(json.dumps(logs_message)) self.queue.put(json.dumps(logs_message))


+ 1
- 1
tests/mockLogzioListener/listener.py View File

@ -73,7 +73,7 @@ class MockLogzioListener:
return len(self.logs_list) return len(self.logs_list)
def clear_logs_buffer(self): def clear_logs_buffer(self):
self.logs_list = []
self.logs_list[:] = []
def set_server_error(self): def set_server_error(self):
self.persistent_flags.set_server_error() self.persistent_flags.set_server_error()


+ 22
- 0
tests/test_logzioSender.py View File

@ -21,6 +21,7 @@ def _find(pattern, path):
class TestLogzioSender(TestCase): class TestLogzioSender(TestCase):
def setUp(self): def setUp(self):
self.logzio_listener = listener.MockLogzioListener() self.logzio_listener = listener.MockLogzioListener()
self.logzio_listener.clear_logs_buffer()
self.logzio_listener.clear_server_error() self.logzio_listener.clear_server_error()
self.logs_drain_timeout = 1 self.logs_drain_timeout = 1
@ -106,3 +107,24 @@ class TestLogzioSender(TestCase):
with open(failure_files[0], "r") as f: with open(failure_files[0], "r") as f:
line = f.readline() line = f.readline()
self.assertTrue(log_message in line) self.assertTrue(log_message in line)
def test_can_send_after_fork(self):
childpid = os.fork()
child_log_message = 'logged from child process'
parent_log_message = 'logged from parent process'
if childpid == 0:
# Log from the child process
self.logger.info(child_log_message)
time.sleep(self.logs_drain_timeout * 2)
os._exit(0)
# Wait for the child process to finish
os.waitpid(childpid, 0)
# log from the parent process
self.logger.info(parent_log_message)
time.sleep(self.logs_drain_timeout * 2)
# Ensure listener receive all log messages
self.assertTrue(self.logzio_listener.find_log(child_log_message))
self.assertTrue(self.logzio_listener.find_log(parent_log_message))

Loading…
Cancel
Save