|
|
@ -8,6 +8,17 @@ from datetime import datetime |
|
|
|
from threading import Thread, enumerate |
|
|
|
|
|
|
|
import requests |
|
|
|
from datetime import datetime |
|
|
|
|
|
|
|
import opensearchpy |
|
|
|
|
|
|
|
import logging |
|
|
|
|
|
|
|
opensearch_logger = logging.getLogger('opensearch') |
|
|
|
opensearch_logger.propagate = False |
|
|
|
opensearch_logger.handlers.clear() |
|
|
|
opensearch_logger.setLevel(logging.WARNING) |
|
|
|
|
|
|
|
|
|
|
|
from .logger import get_logger |
|
|
|
from .logger import get_stdout_logger |
|
|
@ -30,15 +41,26 @@ def backup_logs(logs, logger): |
|
|
|
|
|
|
|
class LogzioSender: |
|
|
|
def __init__(self, |
|
|
|
token, url='https://listener.logz.io:8071', |
|
|
|
host, port, username, password, |
|
|
|
# token, url='https://listener.logz.io:8071', |
|
|
|
logs_drain_timeout=5, |
|
|
|
debug=False, |
|
|
|
backup_logs=True, |
|
|
|
network_timeout=10.0, |
|
|
|
number_of_retries=4, |
|
|
|
retry_timeout=2): |
|
|
|
self.token = token |
|
|
|
self.url = '{}/?token={}'.format(url, token) |
|
|
|
# self.token = token |
|
|
|
# self.url = '{}/?token={}'.format(url, token) |
|
|
|
|
|
|
|
self.client = opensearchpy.OpenSearch( |
|
|
|
hosts = [{'host': host, 'port': port}], |
|
|
|
http_compress = True, # enables gzip compression for request bodies |
|
|
|
http_auth = (username, password), |
|
|
|
use_ssl = True, |
|
|
|
verify_certs = True, |
|
|
|
ssl_assert_hostname = False, |
|
|
|
ssl_show_warn = False, |
|
|
|
) |
|
|
|
self.logs_drain_timeout = logs_drain_timeout |
|
|
|
self.stdout_logger = get_stdout_logger(debug) |
|
|
|
self.backup_logs = backup_logs |
|
|
@ -71,7 +93,8 @@ class LogzioSender: |
|
|
|
self._initialize_sending_thread() |
|
|
|
|
|
|
|
# Queue lib is thread safe, no issue here |
|
|
|
self.queue.put(json.dumps(logs_message)) |
|
|
|
# self.queue.put(json.dumps(logs_message)) |
|
|
|
self.queue.put(logs_message) |
|
|
|
|
|
|
|
def flush(self): |
|
|
|
self._flush_queue() |
|
|
@ -115,39 +138,48 @@ class LogzioSender: |
|
|
|
for current_try in range(self.number_of_retries): |
|
|
|
should_retry = False |
|
|
|
try: |
|
|
|
response = self.requests_session.post( |
|
|
|
self.url, headers=headers, data='\n'.join(logs_list), |
|
|
|
timeout=self.network_timeout) |
|
|
|
if response.status_code != 200: |
|
|
|
if response.status_code == 400: |
|
|
|
self.stdout_logger.info( |
|
|
|
'Got 400 code from Logz.io. This means that ' |
|
|
|
'some of your logs are too big, or badly ' |
|
|
|
'formatted. response: %s', response.text) |
|
|
|
should_backup_to_disk = False |
|
|
|
break |
|
|
|
|
|
|
|
if response.status_code == 401: |
|
|
|
self.stdout_logger.info( |
|
|
|
'You are not authorized with Logz.io! Token ' |
|
|
|
'OK? dropping logs...') |
|
|
|
should_backup_to_disk = False |
|
|
|
break |
|
|
|
else: |
|
|
|
self.stdout_logger.info( |
|
|
|
'Got %s while sending logs to Logz.io, ' |
|
|
|
'Try (%s/%s). Response: %s', |
|
|
|
response.status_code, |
|
|
|
current_try + 1, |
|
|
|
self.number_of_retries, |
|
|
|
response.text) |
|
|
|
should_retry = True |
|
|
|
else: |
|
|
|
self.stdout_logger.debug( |
|
|
|
'Successfully sent bulk of %s logs to ' |
|
|
|
'Logz.io!', len(logs_list)) |
|
|
|
should_backup_to_disk = False |
|
|
|
break |
|
|
|
index_name = f"backendlog-{datetime.utcnow():%Y-%m}" |
|
|
|
index_body = {'settings': {'index': {'number_of_shards': 1, 'number_of_replicas': 0}}} |
|
|
|
self.client.indices.create(index_name, body=index_body, ignore=400) |
|
|
|
respose = opensearchpy.helpers.bulk( |
|
|
|
self.client, |
|
|
|
[{'_index': index_name, **entry} for entry in logs_list], |
|
|
|
max_retries=3 |
|
|
|
) |
|
|
|
|
|
|
|
# response = self.requests_session.post( |
|
|
|
# self.url, headers=headers, data='\n'.join(logs_list), |
|
|
|
# timeout=self.network_timeout) |
|
|
|
# if response.status_code != 200: |
|
|
|
# if response.status_code == 400: |
|
|
|
# self.stdout_logger.info( |
|
|
|
# 'Got 400 code from Logz.io. This means that ' |
|
|
|
# 'some of your logs are too big, or badly ' |
|
|
|
# 'formatted. response: %s', response.text) |
|
|
|
# should_backup_to_disk = False |
|
|
|
# break |
|
|
|
|
|
|
|
# if response.status_code == 401: |
|
|
|
# self.stdout_logger.info( |
|
|
|
# 'You are not authorized with Logz.io! Token ' |
|
|
|
# 'OK? dropping logs...') |
|
|
|
# should_backup_to_disk = False |
|
|
|
# break |
|
|
|
# else: |
|
|
|
# self.stdout_logger.info( |
|
|
|
# 'Got %s while sending logs to Logz.io, ' |
|
|
|
# 'Try (%s/%s). Response: %s', |
|
|
|
# response.status_code, |
|
|
|
# current_try + 1, |
|
|
|
# self.number_of_retries, |
|
|
|
# response.text) |
|
|
|
# should_retry = True |
|
|
|
# else: |
|
|
|
self.stdout_logger.debug( |
|
|
|
'Successfully sent bulk of %s logs to ' |
|
|
|
'Logz.io!', len(logs_list)) |
|
|
|
should_backup_to_disk = False |
|
|
|
break |
|
|
|
except Exception as e: |
|
|
|
self.stdout_logger.warning( |
|
|
|
'Got exception while sending logs to Logz.io, ' |
|
|
|