From df76f91e049587d3d0c9f52bf8c2afd89f795190 Mon Sep 17 00:00:00 2001 From: Oren Mazor Date: Tue, 30 Jan 2018 15:30:56 -0500 Subject: [PATCH 1/4] extract the queue draining action out of the loop. this way it can be manually called externally --- .cache/v/cache/lastfailed | 1 - logzio/handler.py | 3 ++ logzio/sender.py | 95 ++++++++++++++++++++------------------- 3 files changed, 52 insertions(+), 47 deletions(-) delete mode 100644 .cache/v/cache/lastfailed diff --git a/.cache/v/cache/lastfailed b/.cache/v/cache/lastfailed deleted file mode 100644 index 9e26dfe..0000000 --- a/.cache/v/cache/lastfailed +++ /dev/null @@ -1 +0,0 @@ -{} \ No newline at end of file diff --git a/logzio/handler.py b/logzio/handler.py index 0a2e52f..44bbf6c 100644 --- a/logzio/handler.py +++ b/logzio/handler.py @@ -44,6 +44,9 @@ class LogzioHandler(logging.Handler): return extra_fields + def flush(self): + self.logzio_sender._flush_queue() + def format(self, record): message = super(LogzioHandler, self).format(record) try: diff --git a/logzio/sender.py b/logzio/sender.py index 35bfb93..9a715d5 100644 --- a/logzio/sender.py +++ b/logzio/sender.py @@ -57,52 +57,7 @@ class LogzioSender: last_try = True try: - # Sending logs until queue is empty - while not self.queue.empty(): - logs_list = self._get_messages_up_to_max_allowed_size() - self._debug("Starting to drain " + str(len(logs_list)) + " logs to Logz.io") - - # Not configurable from the outside - sleep_between_retries = 2 - number_of_retries = 4 - - should_backup_to_disk = True - headers = {"Content-type": "text/plain"} - - for current_try in range(number_of_retries): - should_retry = False - try: - response = requests.post(self.url, headers=headers, data='\n'.join(logs_list)) - if response.status_code != 200: - if response.status_code == 400: - print("Got 400 code from Logz.io. This means that some of your logs are too big, or badly formatted. response: {0}".format(response.text)) - should_backup_to_disk = False - break - - if response.status_code == 401: - print("You are not authorized with Logz.io! Token OK? dropping logs...") - should_backup_to_disk = False - break - else: - print("Got {} while sending logs to Logz.io, Try ({}/{}). Response: {}".format(response.status_code, current_try + 1, number_of_retries, response.text)) - should_retry = True - else: - self._debug("Successfully sent bulk of " + str(len(logs_list)) + " logs to Logz.io!") - should_backup_to_disk = False - break - - except Exception as e: - print("Got exception while sending logs to Logz.io, Try ({}/{}). Message: {}".format(current_try + 1, number_of_retries, e)) - should_retry = True - - if should_retry: - sleep(sleep_between_retries) - sleep_between_retries *= 2 - - if should_backup_to_disk: - # Write to file - print("Could not send logs to Logz.io after " + str(number_of_retries) + " tries, backing up to local file system.") - backup_logs(logs_list) + self._flush_the_queue() except Exception as e: self._debug("Unexpected exception while draining queue to Logz.io, swallowing. Exception: " + str(e)) @@ -110,6 +65,54 @@ class LogzioSender: if not last_try: sleep(self.logs_drain_timeout) + def _flush_the_queue(self): + # Sending logs until queue is empty + while not self.queue.empty(): + logs_list = self._get_messages_up_to_max_allowed_size() + self._debug("Starting to drain " + str(len(logs_list)) + " logs to Logz.io") + + # Not configurable from the outside + sleep_between_retries = 2 + number_of_retries = 4 + + should_backup_to_disk = True + headers = {"Content-type": "text/plain"} + + for current_try in range(number_of_retries): + should_retry = False + try: + response = requests.post(self.url, headers=headers, data='\n'.join(logs_list)) + if response.status_code != 200: + if response.status_code == 400: + print("Got 400 code from Logz.io. This means that some of your logs are too big, or badly formatted. response: {0}".format(response.text)) + should_backup_to_disk = False + break + + if response.status_code == 401: + print("You are not authorized with Logz.io! Token OK? dropping logs...") + should_backup_to_disk = False + break + else: + print("Got {} while sending logs to Logz.io, Try ({}/{}). Response: {}".format(response.status_code, current_try + 1, number_of_retries, response.text)) + should_retry = True + else: + self._debug("Successfully sent bulk of " + str(len(logs_list)) + " logs to Logz.io!") + should_backup_to_disk = False + break + + except Exception as e: + print("Got exception while sending logs to Logz.io, Try ({}/{}). Message: {}".format(current_try + 1, number_of_retries, e)) + should_retry = True + + if should_retry: + sleep(sleep_between_retries) + sleep_between_retries *= 2 + + if should_backup_to_disk: + # Write to file + print("Could not send logs to Logz.io after " + str(number_of_retries) + " tries, backing up to local file system.") + backup_logs(logs_list) + def _get_messages_up_to_max_allowed_size(self): logs_list = [] current_size = 0 From 7b2040706f04f83d85fed5091bdb47e6055badc9 Mon Sep 17 00:00:00 2001 From: Oren Mazor Date: Tue, 30 Jan 2018 15:33:43 -0500 Subject: [PATCH 2/4] PEP8 --- logzio/handler.py | 2 +- logzio/sender.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/logzio/handler.py b/logzio/handler.py index 44bbf6c..d3abc7c 100644 --- a/logzio/handler.py +++ b/logzio/handler.py @@ -9,6 +9,7 @@ from .sender import LogzioSender class LogzioHandler(logging.Handler): + def __init__(self, token, logzio_type="python", logs_drain_timeout=3, url="https://listener.logz.io:8071", debug=False): @@ -87,4 +88,3 @@ class LogzioHandler(logging.Handler): def emit(self, record): self.logzio_sender.append(self.format_message(record)) - diff --git a/logzio/sender.py b/logzio/sender.py index 9a715d5..d16a3c6 100644 --- a/logzio/sender.py +++ b/logzio/sender.py @@ -22,6 +22,7 @@ def backup_logs(logs): class LogzioSender: + def __init__(self, token, url="https://listener.logz.io:8071", logs_drain_timeout=5, debug=False): self.token = token self.url = "{0}/?token={1}".format(url, token) From 27f76c2136b9ed246fa26132df019358ad18ccf4 Mon Sep 17 00:00:00 2001 From: Oren Mazor Date: Tue, 30 Jan 2018 16:12:07 -0500 Subject: [PATCH 3/4] typo --- logzio/handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logzio/handler.py b/logzio/handler.py index d3abc7c..60a90c9 100644 --- a/logzio/handler.py +++ b/logzio/handler.py @@ -46,7 +46,7 @@ class LogzioHandler(logging.Handler): return extra_fields def flush(self): - self.logzio_sender._flush_queue() + self.logzio_sender._flush_the_queue() def format(self, record): message = super(LogzioHandler, self).format(record) From cc31353d250c2a4a432a981cf7bc6bf75432e16b Mon Sep 17 00:00:00 2001 From: Oren Mazor Date: Wed, 31 Jan 2018 06:49:58 -0500 Subject: [PATCH 4/4] fix semantics I rushed through the first time. the handler shouldn't call a private method on the sender --- logzio/handler.py | 2 +- logzio/sender.py | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/logzio/handler.py b/logzio/handler.py index 60a90c9..71f55f8 100644 --- a/logzio/handler.py +++ b/logzio/handler.py @@ -46,7 +46,7 @@ class LogzioHandler(logging.Handler): return extra_fields def flush(self): - self.logzio_sender._flush_the_queue() + self.logzio_sender.flush() def format(self, record): message = super(LogzioHandler, self).format(record) diff --git a/logzio/sender.py b/logzio/sender.py index d16a3c6..7975f63 100644 --- a/logzio/sender.py +++ b/logzio/sender.py @@ -44,6 +44,9 @@ class LogzioSender: # Queue lib is thread safe, no issue here self.queue.put(json.dumps(logs_message)) + def flush(self): + self._flush_queue() + def _debug(self, message): if self.debug: print(str(message)) @@ -58,7 +61,7 @@ class LogzioSender: last_try = True try: - self._flush_the_queue() + self._flush_queue() except Exception as e: self._debug("Unexpected exception while draining queue to Logz.io, swallowing. Exception: " + str(e)) @@ -66,7 +69,7 @@ class LogzioSender: if not last_try: sleep(self.logs_drain_timeout) - def _flush_the_queue(self): + def _flush_queue(self): # Sending logs until queue is empty while not self.queue.empty(): logs_list = self._get_messages_up_to_max_allowed_size()