Browse Source

Merge pull request #18 from orenmazor/manual_draining

Manual draining
opensearch
Roi Rav-Hon 7 years ago
committed by GitHub
parent
commit
7b806081b8
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 57 additions and 48 deletions
  1. +0
    -1
      .cache/v/cache/lastfailed
  2. +4
    -1
      logzio/handler.py
  3. +53
    -46
      logzio/sender.py

+ 0
- 1
.cache/v/cache/lastfailed View File

@ -1 +0,0 @@
{}

+ 4
- 1
logzio/handler.py View File

@ -9,6 +9,7 @@ from .sender import LogzioSender
class LogzioHandler(logging.Handler):
def __init__(self, token, logzio_type="python", logs_drain_timeout=3,
url="https://listener.logz.io:8071", debug=False):
@ -44,6 +45,9 @@ class LogzioHandler(logging.Handler):
return extra_fields
def flush(self):
self.logzio_sender.flush()
def format(self, record):
message = super(LogzioHandler, self).format(record)
try:
@ -84,4 +88,3 @@ class LogzioHandler(logging.Handler):
def emit(self, record):
self.logzio_sender.append(self.format_message(record))

+ 53
- 46
logzio/sender.py View File

@ -22,6 +22,7 @@ def backup_logs(logs):
class LogzioSender:
def __init__(self, token, url="https://listener.logz.io:8071", logs_drain_timeout=5, debug=False):
self.token = token
self.url = "{0}/?token={1}".format(url, token)
@ -43,6 +44,9 @@ class LogzioSender:
# Queue lib is thread safe, no issue here
self.queue.put(json.dumps(logs_message))
def flush(self):
self._flush_queue()
def _debug(self, message):
if self.debug:
print(str(message))
@ -57,52 +61,7 @@ class LogzioSender:
last_try = True
try:
# Sending logs until queue is empty
while not self.queue.empty():
logs_list = self._get_messages_up_to_max_allowed_size()
self._debug("Starting to drain " + str(len(logs_list)) + " logs to Logz.io")
# Not configurable from the outside
sleep_between_retries = 2
number_of_retries = 4
should_backup_to_disk = True
headers = {"Content-type": "text/plain"}
for current_try in range(number_of_retries):
should_retry = False
try:
response = requests.post(self.url, headers=headers, data='\n'.join(logs_list))
if response.status_code != 200:
if response.status_code == 400:
print("Got 400 code from Logz.io. This means that some of your logs are too big, or badly formatted. response: {0}".format(response.text))
should_backup_to_disk = False
break
if response.status_code == 401:
print("You are not authorized with Logz.io! Token OK? dropping logs...")
should_backup_to_disk = False
break
else:
print("Got {} while sending logs to Logz.io, Try ({}/{}). Response: {}".format(response.status_code, current_try + 1, number_of_retries, response.text))
should_retry = True
else:
self._debug("Successfully sent bulk of " + str(len(logs_list)) + " logs to Logz.io!")
should_backup_to_disk = False
break
except Exception as e:
print("Got exception while sending logs to Logz.io, Try ({}/{}). Message: {}".format(current_try + 1, number_of_retries, e))
should_retry = True
if should_retry:
sleep(sleep_between_retries)
sleep_between_retries *= 2
if should_backup_to_disk:
# Write to file
print("Could not send logs to Logz.io after " + str(number_of_retries) + " tries, backing up to local file system.")
backup_logs(logs_list)
self._flush_queue()
except Exception as e:
self._debug("Unexpected exception while draining queue to Logz.io, swallowing. Exception: " + str(e))
@ -110,6 +69,54 @@ class LogzioSender:
if not last_try:
sleep(self.logs_drain_timeout)
def _flush_queue(self):
# Sending logs until queue is empty
while not self.queue.empty():
logs_list = self._get_messages_up_to_max_allowed_size()
self._debug("Starting to drain " + str(len(logs_list)) + " logs to Logz.io")
# Not configurable from the outside
sleep_between_retries = 2
number_of_retries = 4
should_backup_to_disk = True
headers = {"Content-type": "text/plain"}
for current_try in range(number_of_retries):
should_retry = False
try:
response = requests.post(self.url, headers=headers, data='\n'.join(logs_list))
if response.status_code != 200:
if response.status_code == 400:
print("Got 400 code from Logz.io. This means that some of your logs are too big, or badly formatted. response: {0}".format(response.text))
should_backup_to_disk = False
break
if response.status_code == 401:
print("You are not authorized with Logz.io! Token OK? dropping logs...")
should_backup_to_disk = False
break
else:
print("Got {} while sending logs to Logz.io, Try ({}/{}). Response: {}".format(response.status_code, current_try + 1, number_of_retries, response.text))
should_retry = True
else:
self._debug("Successfully sent bulk of " + str(len(logs_list)) + " logs to Logz.io!")
should_backup_to_disk = False
break
except Exception as e:
print("Got exception while sending logs to Logz.io, Try ({}/{}). Message: {}".format(current_try + 1, number_of_retries, e))
should_retry = True
if should_retry:
sleep(sleep_between_retries)
sleep_between_retries *= 2
if should_backup_to_disk:
# Write to file
print("Could not send logs to Logz.io after " + str(number_of_retries) + " tries, backing up to local file system.")
backup_logs(logs_list)
def _get_messages_up_to_max_allowed_size(self):
logs_list = []
current_size = 0


Loading…
Cancel
Save