|
@ -10,13 +10,13 @@ from threading import Thread, enumerate |
|
|
import requests |
|
|
import requests |
|
|
|
|
|
|
|
|
from .logger import get_logger |
|
|
from .logger import get_logger |
|
|
|
|
|
from .logger import get_stdout_logger |
|
|
|
|
|
|
|
|
if sys.version[0] == '2': |
|
|
if sys.version[0] == '2': |
|
|
import Queue as queue |
|
|
import Queue as queue |
|
|
else: |
|
|
else: |
|
|
import queue as queue |
|
|
import queue as queue |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
MAX_BULK_SIZE_IN_BYTES = 1 * 1024 * 1024 # 1 MB |
|
|
MAX_BULK_SIZE_IN_BYTES = 1 * 1024 * 1024 # 1 MB |
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -34,14 +34,19 @@ class LogzioSender: |
|
|
logs_drain_timeout=5, |
|
|
logs_drain_timeout=5, |
|
|
debug=False, |
|
|
debug=False, |
|
|
backup_logs=True, |
|
|
backup_logs=True, |
|
|
network_timeout=10.0): |
|
|
|
|
|
|
|
|
network_timeout=10.0, |
|
|
|
|
|
number_of_retries=4, |
|
|
|
|
|
retry_timeout=2): |
|
|
self.token = token |
|
|
self.token = token |
|
|
self.url = '{}/?token={}'.format(url, token) |
|
|
self.url = '{}/?token={}'.format(url, token) |
|
|
self.logs_drain_timeout = logs_drain_timeout |
|
|
self.logs_drain_timeout = logs_drain_timeout |
|
|
self.logger = get_logger(debug) |
|
|
self.logger = get_logger(debug) |
|
|
|
|
|
self.stdout_logger = get_stdout_logger(debug) |
|
|
self.backup_logs = backup_logs |
|
|
self.backup_logs = backup_logs |
|
|
self.network_timeout = network_timeout |
|
|
self.network_timeout = network_timeout |
|
|
self.requests_session = requests.Session() |
|
|
self.requests_session = requests.Session() |
|
|
|
|
|
self.number_of_retries = number_of_retries |
|
|
|
|
|
self.retry_timeout = retry_timeout |
|
|
|
|
|
|
|
|
# Function to see if the main thread is alive |
|
|
# Function to see if the main thread is alive |
|
|
self.is_main_thread_active = lambda: any( |
|
|
self.is_main_thread_active = lambda: any( |
|
@ -53,6 +58,7 @@ class LogzioSender: |
|
|
|
|
|
|
|
|
def __del__(self): |
|
|
def __del__(self): |
|
|
del self.logger |
|
|
del self.logger |
|
|
|
|
|
del self.stdout_logger |
|
|
del self.backup_logs |
|
|
del self.backup_logs |
|
|
del self.queue |
|
|
del self.queue |
|
|
|
|
|
|
|
@ -79,7 +85,7 @@ class LogzioSender: |
|
|
# If main is exited, we should run one last time and try to remove |
|
|
# If main is exited, we should run one last time and try to remove |
|
|
# all logs |
|
|
# all logs |
|
|
if not self.is_main_thread_active(): |
|
|
if not self.is_main_thread_active(): |
|
|
self.logger.debug( |
|
|
|
|
|
|
|
|
self.stdout_logger.debug( |
|
|
'Identified quit of main thread, sending logs one ' |
|
|
'Identified quit of main thread, sending logs one ' |
|
|
'last time') |
|
|
'last time') |
|
|
last_try = True |
|
|
last_try = True |
|
@ -98,17 +104,17 @@ class LogzioSender: |
|
|
# Sending logs until queue is empty |
|
|
# Sending logs until queue is empty |
|
|
while not self.queue.empty(): |
|
|
while not self.queue.empty(): |
|
|
logs_list = self._get_messages_up_to_max_allowed_size() |
|
|
logs_list = self._get_messages_up_to_max_allowed_size() |
|
|
self.logger.debug( |
|
|
|
|
|
|
|
|
self.stdout_logger.debug( |
|
|
'Starting to drain %s logs to Logz.io', len(logs_list)) |
|
|
'Starting to drain %s logs to Logz.io', len(logs_list)) |
|
|
|
|
|
|
|
|
# Not configurable from the outside |
|
|
# Not configurable from the outside |
|
|
sleep_between_retries = 2 |
|
|
|
|
|
number_of_retries = 4 |
|
|
|
|
|
|
|
|
sleep_between_retries = self.retry_timeout |
|
|
|
|
|
self.number_of_retries = self.number_of_retries |
|
|
|
|
|
|
|
|
should_backup_to_disk = True |
|
|
should_backup_to_disk = True |
|
|
headers = {"Content-type": "text/plain"} |
|
|
headers = {"Content-type": "text/plain"} |
|
|
|
|
|
|
|
|
for current_try in range(number_of_retries): |
|
|
|
|
|
|
|
|
for current_try in range(self.number_of_retries): |
|
|
should_retry = False |
|
|
should_retry = False |
|
|
try: |
|
|
try: |
|
|
response = self.requests_session.post( |
|
|
response = self.requests_session.post( |
|
@ -116,7 +122,7 @@ class LogzioSender: |
|
|
timeout=self.network_timeout) |
|
|
timeout=self.network_timeout) |
|
|
if response.status_code != 200: |
|
|
if response.status_code != 200: |
|
|
if response.status_code == 400: |
|
|
if response.status_code == 400: |
|
|
self.logger.info( |
|
|
|
|
|
|
|
|
self.stdout_logger.info( |
|
|
'Got 400 code from Logz.io. This means that ' |
|
|
'Got 400 code from Logz.io. This means that ' |
|
|
'some of your logs are too big, or badly ' |
|
|
'some of your logs are too big, or badly ' |
|
|
'formatted. response: %s', response.text) |
|
|
'formatted. response: %s', response.text) |
|
@ -124,42 +130,41 @@ class LogzioSender: |
|
|
break |
|
|
break |
|
|
|
|
|
|
|
|
if response.status_code == 401: |
|
|
if response.status_code == 401: |
|
|
self.logger.info( |
|
|
|
|
|
|
|
|
self.stdout_logger.info( |
|
|
'You are not authorized with Logz.io! Token ' |
|
|
'You are not authorized with Logz.io! Token ' |
|
|
'OK? dropping logs...') |
|
|
'OK? dropping logs...') |
|
|
should_backup_to_disk = False |
|
|
should_backup_to_disk = False |
|
|
break |
|
|
break |
|
|
else: |
|
|
else: |
|
|
self.logger.info( |
|
|
|
|
|
|
|
|
self.stdout_logger.info( |
|
|
'Got %s while sending logs to Logz.io, ' |
|
|
'Got %s while sending logs to Logz.io, ' |
|
|
'Try (%s/%s). Response: %s', |
|
|
'Try (%s/%s). Response: %s', |
|
|
response.status_code, |
|
|
response.status_code, |
|
|
current_try + 1, |
|
|
current_try + 1, |
|
|
number_of_retries, |
|
|
|
|
|
|
|
|
self.number_of_retries, |
|
|
response.text) |
|
|
response.text) |
|
|
should_retry = True |
|
|
should_retry = True |
|
|
else: |
|
|
else: |
|
|
self.logger.debug( |
|
|
|
|
|
|
|
|
self.stdout_logger.debug( |
|
|
'Successfully sent bulk of %s logs to ' |
|
|
'Successfully sent bulk of %s logs to ' |
|
|
'Logz.io!', len(logs_list)) |
|
|
'Logz.io!', len(logs_list)) |
|
|
should_backup_to_disk = False |
|
|
should_backup_to_disk = False |
|
|
break |
|
|
break |
|
|
except Exception as e: |
|
|
except Exception as e: |
|
|
self.logger.warning( |
|
|
|
|
|
|
|
|
self.stdout_logger.warning( |
|
|
'Got exception while sending logs to Logz.io, ' |
|
|
'Got exception while sending logs to Logz.io, ' |
|
|
'Try (%s/%s). Message: %s', |
|
|
'Try (%s/%s). Message: %s', |
|
|
current_try + 1, number_of_retries, e) |
|
|
|
|
|
|
|
|
current_try + 1, self.number_of_retries, e) |
|
|
should_retry = True |
|
|
should_retry = True |
|
|
|
|
|
|
|
|
if should_retry: |
|
|
if should_retry: |
|
|
sleep(sleep_between_retries) |
|
|
sleep(sleep_between_retries) |
|
|
sleep_between_retries *= 2 |
|
|
|
|
|
|
|
|
|
|
|
if should_backup_to_disk and self.backup_logs: |
|
|
if should_backup_to_disk and self.backup_logs: |
|
|
# Write to file |
|
|
# Write to file |
|
|
self.logger.error( |
|
|
|
|
|
|
|
|
self.stdout_logger.error( |
|
|
'Could not send logs to Logz.io after %s tries, ' |
|
|
'Could not send logs to Logz.io after %s tries, ' |
|
|
'backing up to local file system', number_of_retries) |
|
|
|
|
|
|
|
|
'backing up to local file system', self.number_of_retries) |
|
|
backup_logs(logs_list, self.logger) |
|
|
backup_logs(logs_list, self.logger) |
|
|
|
|
|
|
|
|
del logs_list |
|
|
del logs_list |
|
|