From df76f91e049587d3d0c9f52bf8c2afd89f795190 Mon Sep 17 00:00:00 2001 From: Oren Mazor Date: Tue, 30 Jan 2018 15:30:56 -0500 Subject: [PATCH] extract the queue draining action out of the loop. this way it can be manually called externally --- .cache/v/cache/lastfailed | 1 - logzio/handler.py | 3 ++ logzio/sender.py | 95 ++++++++++++++++++++------------------- 3 files changed, 52 insertions(+), 47 deletions(-) delete mode 100644 .cache/v/cache/lastfailed diff --git a/.cache/v/cache/lastfailed b/.cache/v/cache/lastfailed deleted file mode 100644 index 9e26dfe..0000000 --- a/.cache/v/cache/lastfailed +++ /dev/null @@ -1 +0,0 @@ -{} \ No newline at end of file diff --git a/logzio/handler.py b/logzio/handler.py index 0a2e52f..44bbf6c 100644 --- a/logzio/handler.py +++ b/logzio/handler.py @@ -44,6 +44,9 @@ class LogzioHandler(logging.Handler): return extra_fields + def flush(self): + self.logzio_sender._flush_queue() + def format(self, record): message = super(LogzioHandler, self).format(record) try: diff --git a/logzio/sender.py b/logzio/sender.py index 35bfb93..9a715d5 100644 --- a/logzio/sender.py +++ b/logzio/sender.py @@ -57,52 +57,7 @@ class LogzioSender: last_try = True try: - # Sending logs until queue is empty - while not self.queue.empty(): - logs_list = self._get_messages_up_to_max_allowed_size() - self._debug("Starting to drain " + str(len(logs_list)) + " logs to Logz.io") - - # Not configurable from the outside - sleep_between_retries = 2 - number_of_retries = 4 - - should_backup_to_disk = True - headers = {"Content-type": "text/plain"} - - for current_try in range(number_of_retries): - should_retry = False - try: - response = requests.post(self.url, headers=headers, data='\n'.join(logs_list)) - if response.status_code != 200: - if response.status_code == 400: - print("Got 400 code from Logz.io. This means that some of your logs are too big, or badly formatted. response: {0}".format(response.text)) - should_backup_to_disk = False - break - - if response.status_code == 401: - print("You are not authorized with Logz.io! Token OK? dropping logs...") - should_backup_to_disk = False - break - else: - print("Got {} while sending logs to Logz.io, Try ({}/{}). Response: {}".format(response.status_code, current_try + 1, number_of_retries, response.text)) - should_retry = True - else: - self._debug("Successfully sent bulk of " + str(len(logs_list)) + " logs to Logz.io!") - should_backup_to_disk = False - break - - except Exception as e: - print("Got exception while sending logs to Logz.io, Try ({}/{}). Message: {}".format(current_try + 1, number_of_retries, e)) - should_retry = True - - if should_retry: - sleep(sleep_between_retries) - sleep_between_retries *= 2 - - if should_backup_to_disk: - # Write to file - print("Could not send logs to Logz.io after " + str(number_of_retries) + " tries, backing up to local file system.") - backup_logs(logs_list) + self._flush_the_queue() except Exception as e: self._debug("Unexpected exception while draining queue to Logz.io, swallowing. Exception: " + str(e)) @@ -110,6 +65,54 @@ class LogzioSender: if not last_try: sleep(self.logs_drain_timeout) + def _flush_the_queue(self): + # Sending logs until queue is empty + while not self.queue.empty(): + logs_list = self._get_messages_up_to_max_allowed_size() + self._debug("Starting to drain " + str(len(logs_list)) + " logs to Logz.io") + + # Not configurable from the outside + sleep_between_retries = 2 + number_of_retries = 4 + + should_backup_to_disk = True + headers = {"Content-type": "text/plain"} + + for current_try in range(number_of_retries): + should_retry = False + try: + response = requests.post(self.url, headers=headers, data='\n'.join(logs_list)) + if response.status_code != 200: + if response.status_code == 400: + print("Got 400 code from Logz.io. This means that some of your logs are too big, or badly formatted. response: {0}".format(response.text)) + should_backup_to_disk = False + break + + if response.status_code == 401: + print("You are not authorized with Logz.io! Token OK? dropping logs...") + should_backup_to_disk = False + break + else: + print("Got {} while sending logs to Logz.io, Try ({}/{}). Response: {}".format(response.status_code, current_try + 1, number_of_retries, response.text)) + should_retry = True + else: + self._debug("Successfully sent bulk of " + str(len(logs_list)) + " logs to Logz.io!") + should_backup_to_disk = False + break + + except Exception as e: + print("Got exception while sending logs to Logz.io, Try ({}/{}). Message: {}".format(current_try + 1, number_of_retries, e)) + should_retry = True + + if should_retry: + sleep(sleep_between_retries) + sleep_between_retries *= 2 + + if should_backup_to_disk: + # Write to file + print("Could not send logs to Logz.io after " + str(number_of_retries) + " tries, backing up to local file system.") + backup_logs(logs_list) + def _get_messages_up_to_max_allowed_size(self): logs_list = [] current_size = 0