From 066f73264d7aed408bd923782b91b7ad063454c0 Mon Sep 17 00:00:00 2001 From: Jotham Apaloo Date: Mon, 19 Feb 2018 12:17:38 -0800 Subject: [PATCH 1/5] Make logzio handler log correctly after os.fork() --- .gitignore | 1 + logzio/sender.py | 4 ++++ tests/test_logzioSender.py | 13 +++++++++++++ 3 files changed, 18 insertions(+) diff --git a/.gitignore b/.gitignore index 2f78cf5..6416bf8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ *.pyc +*egg-info/ diff --git a/logzio/sender.py b/logzio/sender.py index 7975f63..3a40dd0 100644 --- a/logzio/sender.py +++ b/logzio/sender.py @@ -34,7 +34,9 @@ class LogzioSender: # Create a queue to hold logs self.queue = queue.Queue() + self._initialize_sending_thread() + def _initialize_sending_thread(self): self.sending_thread = Thread(target=self._drain_queue) self.sending_thread.daemon = False self.sending_thread.name = "logzio-sending-thread" @@ -42,6 +44,8 @@ class LogzioSender: def append(self, logs_message): # Queue lib is thread safe, no issue here + if not self.sending_thread.is_alive(): + self._initialize_sending_thread() self.queue.put(json.dumps(logs_message)) def flush(self): diff --git a/tests/test_logzioSender.py b/tests/test_logzioSender.py index 0db4c8e..d7b136c 100644 --- a/tests/test_logzioSender.py +++ b/tests/test_logzioSender.py @@ -106,3 +106,16 @@ class TestLogzioSender(TestCase): with open(failure_files[0], "r") as f: line = f.readline() self.assertTrue(log_message in line) + + def test_can_send_after_fork(self): + childpid = os.fork() + log_message = 'logged from child process' + + if childpid == 0: + # Log from the child process + self.logger.info(log_message) + time.sleep(self.logs_drain_timeout * 2) + os._exit(0) + # Wait for the child process to finish + os.waitpid(childpid, 0) + self.assertTrue(self.logzio_listener.find_log(log_message)) From fcac03ae7f0d72060b81df4b7f4390e5528329c8 Mon Sep 17 00:00:00 2001 From: Jotham Apaloo Date: Mon, 19 Feb 2018 12:20:15 -0800 Subject: [PATCH 2/5] Ensure parent process messages still received after fork --- tests/test_logzioSender.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/tests/test_logzioSender.py b/tests/test_logzioSender.py index d7b136c..9556e7f 100644 --- a/tests/test_logzioSender.py +++ b/tests/test_logzioSender.py @@ -109,13 +109,21 @@ class TestLogzioSender(TestCase): def test_can_send_after_fork(self): childpid = os.fork() - log_message = 'logged from child process' + child_log_message = 'logged from child process' + parent_log_message = 'logged from parent process' if childpid == 0: # Log from the child process - self.logger.info(log_message) + self.logger.info(child_log_message) time.sleep(self.logs_drain_timeout * 2) os._exit(0) # Wait for the child process to finish os.waitpid(childpid, 0) - self.assertTrue(self.logzio_listener.find_log(log_message)) + + # log from the parent process + self.logger.info(parent_log_message) + time.sleep(self.logs_drain_timeout * 2) + + # Ensure listener receive all log messages + self.assertTrue(self.logzio_listener.find_log(child_log_message)) + self.assertTrue(self.logzio_listener.find_log(parent_log_message)) From b1950d5038e9328e077dd1d88e28c56e620d31d0 Mon Sep 17 00:00:00 2001 From: Jotham Apaloo Date: Mon, 19 Feb 2018 14:08:49 -0800 Subject: [PATCH 3/5] Fix logs buffer clearing and clear it before each test --- tests/mockLogzioListener/listener.py | 2 +- tests/test_logzioSender.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/mockLogzioListener/listener.py b/tests/mockLogzioListener/listener.py index df97218..213756f 100644 --- a/tests/mockLogzioListener/listener.py +++ b/tests/mockLogzioListener/listener.py @@ -73,7 +73,7 @@ class MockLogzioListener: return len(self.logs_list) def clear_logs_buffer(self): - self.logs_list = [] + self.logs_list[:] = [] def set_server_error(self): self.persistent_flags.set_server_error() diff --git a/tests/test_logzioSender.py b/tests/test_logzioSender.py index 9556e7f..420688b 100644 --- a/tests/test_logzioSender.py +++ b/tests/test_logzioSender.py @@ -21,6 +21,7 @@ def _find(pattern, path): class TestLogzioSender(TestCase): def setUp(self): self.logzio_listener = listener.MockLogzioListener() + self.logzio_listener.clear_logs_buffer() self.logzio_listener.clear_server_error() self.logs_drain_timeout = 1 From d41ceabe82c2c459232b68438564817e7d2ef768 Mon Sep 17 00:00:00 2001 From: Jotham Apaloo Date: Tue, 20 Feb 2018 09:14:41 -0800 Subject: [PATCH 4/5] Leave initialization in c'tor, just call start() where needed --- logzio/sender.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/logzio/sender.py b/logzio/sender.py index 3a40dd0..8ed7c0d 100644 --- a/logzio/sender.py +++ b/logzio/sender.py @@ -34,9 +34,6 @@ class LogzioSender: # Create a queue to hold logs self.queue = queue.Queue() - self._initialize_sending_thread() - - def _initialize_sending_thread(self): self.sending_thread = Thread(target=self._drain_queue) self.sending_thread.daemon = False self.sending_thread.name = "logzio-sending-thread" @@ -45,7 +42,7 @@ class LogzioSender: def append(self, logs_message): # Queue lib is thread safe, no issue here if not self.sending_thread.is_alive(): - self._initialize_sending_thread() + self.sending_thread.start() self.queue.put(json.dumps(logs_message)) def flush(self): From c6c8171a681cb2d7a4abad8d1346e200b73d8800 Mon Sep 17 00:00:00 2001 From: Jotham Apaloo Date: Tue, 20 Feb 2018 16:31:27 -0800 Subject: [PATCH 5/5] Revert "Leave initialization in c'tor, just call start() where needed" This reverts commit d41ceabe82c2c459232b68438564817e7d2ef768. Which I suspect does not work because the thread is stateful, i.e. after os.fork the thread in copy of the process thinks it has already been started, even though it is not alive. Probably safest to just get a new thread instance if a thread is not alive. --- logzio/sender.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/logzio/sender.py b/logzio/sender.py index 8ed7c0d..eb88a5a 100644 --- a/logzio/sender.py +++ b/logzio/sender.py @@ -34,15 +34,19 @@ class LogzioSender: # Create a queue to hold logs self.queue = queue.Queue() + self._initialize_sending_thread() + + def _initialize_sending_thread(self): self.sending_thread = Thread(target=self._drain_queue) self.sending_thread.daemon = False self.sending_thread.name = "logzio-sending-thread" self.sending_thread.start() def append(self, logs_message): - # Queue lib is thread safe, no issue here if not self.sending_thread.is_alive(): - self.sending_thread.start() + self._initialize_sending_thread() + + # Queue lib is thread safe, no issue here self.queue.put(json.dumps(logs_message)) def flush(self):