diff --git a/README.md b/README.md index 189784e..1c0cfe2 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,20 @@ -[![PyPI version](https://badge.fury.io/py/logzio-python-handler.svg)](https://badge.fury.io/py/logzio-python-handler) [![Build Status](https://travis-ci.org/logzio/logzio-python-handler.svg?branch=master)](https://travis-ci.org/logzio/logzio-python-handler) +# The OpenSearch Python Handler + +### Fork notice + +This project has been forked from [The Logz.io Python Handler](https://github.com/logzio/logzio-python-handler), version 4.0.1. +After using the logz.io hosted and managed services to collect and visualize the logs of our Python applications, we moved to +a self-hosted on-premise OpenSearch cluster. + +To reduce the burden of adapting our applications, we modified `logzio-python-handler` to send the log stream directly to our OpenSearch +cluster. This fork works as a drop-in replacement for `logzio-python-handler`; the names of the classes are unaltered at the moment. + +Instead of the parameters `url` and `token`, the LogzioHandler constructor now requires `host`, `port`, `username`, `password` of the +OpenSearch cluster. + +Only basic HTTP auth is supported at the moment, and a behavior has been hard-coded to produce new indices for every month. + -# The Logz.io Python Handler
diff --git a/logzio/handler.py b/logzio/handler.py index e9f21e4..9690145 100644 --- a/logzio/handler.py +++ b/logzio/handler.py @@ -14,7 +14,7 @@ from opentelemetry.instrumentation.logging import LoggingInstrumentor class LogzioHandler(logging.Handler): def __init__(self, - token, + host, port, username, password, logzio_type="python", logs_drain_timeout=3, url="https://listener.logz.io:8071", @@ -25,17 +25,13 @@ class LogzioHandler(logging.Handler): retry_timeout=2, add_context=False): - if not token: - raise LogzioException('Logz.io Token must be provided') - self.logzio_type = logzio_type if add_context: LoggingInstrumentor().instrument(set_logging_format=True) self.logzio_sender = LogzioSender( - token=token, - url=url, + host=host, port=port, username=username, password=password, logs_drain_timeout=logs_drain_timeout, debug=debug, backup_logs=backup_logs, diff --git a/logzio/sender.py b/logzio/sender.py index 5a7d16c..6cbf8f7 100644 --- a/logzio/sender.py +++ b/logzio/sender.py @@ -8,6 +8,17 @@ from datetime import datetime from threading import Thread, enumerate import requests +from datetime import datetime + +import opensearchpy + +import logging + +opensearch_logger = logging.getLogger('opensearch') +opensearch_logger.propagate = False +opensearch_logger.handlers.clear() +opensearch_logger.setLevel(logging.WARNING) + from .logger import get_logger from .logger import get_stdout_logger @@ -30,15 +41,26 @@ def backup_logs(logs, logger): class LogzioSender: def __init__(self, - token, url='https://listener.logz.io:8071', + host, port, username, password, + # token, url='https://listener.logz.io:8071', logs_drain_timeout=5, debug=False, backup_logs=True, network_timeout=10.0, number_of_retries=4, retry_timeout=2): - self.token = token - self.url = '{}/?token={}'.format(url, token) + # self.token = token + # self.url = '{}/?token={}'.format(url, token) + + self.client = opensearchpy.OpenSearch( + hosts = [{'host': host, 'port': port}], + http_compress = True, 
# enables gzip compression for request bodies + http_auth = (username, password), + use_ssl = True, + verify_certs = True, + ssl_assert_hostname = False, + ssl_show_warn = False, + ) self.logs_drain_timeout = logs_drain_timeout self.stdout_logger = get_stdout_logger(debug) self.backup_logs = backup_logs @@ -71,7 +93,8 @@ class LogzioSender: self._initialize_sending_thread() # Queue lib is thread safe, no issue here - self.queue.put(json.dumps(logs_message)) + # self.queue.put(json.dumps(logs_message)) + self.queue.put(logs_message) def flush(self): self._flush_queue() @@ -115,39 +138,48 @@ class LogzioSender: for current_try in range(self.number_of_retries): should_retry = False try: - response = self.requests_session.post( - self.url, headers=headers, data='\n'.join(logs_list), - timeout=self.network_timeout) - if response.status_code != 200: - if response.status_code == 400: - self.stdout_logger.info( - 'Got 400 code from Logz.io. This means that ' - 'some of your logs are too big, or badly ' - 'formatted. response: %s', response.text) - should_backup_to_disk = False - break - - if response.status_code == 401: - self.stdout_logger.info( - 'You are not authorized with Logz.io! Token ' - 'OK? dropping logs...') - should_backup_to_disk = False - break - else: - self.stdout_logger.info( - 'Got %s while sending logs to Logz.io, ' - 'Try (%s/%s). 
Response: %s', - response.status_code, - current_try + 1, - self.number_of_retries, - response.text) - should_retry = True - else: - self.stdout_logger.debug( - 'Successfully sent bulk of %s logs to ' - 'Logz.io!', len(logs_list)) - should_backup_to_disk = False - break + index_name = f"backendlog-{datetime.utcnow():%Y-%m}" + index_body = {'settings': {'index': {'number_of_shards': 1, 'number_of_replicas': 0}}} + self.client.indices.create(index_name, body=index_body, ignore=400) + respose = opensearchpy.helpers.bulk( + self.client, + [{'_index': index_name, **entry} for entry in logs_list], + max_retries=3 + ) + + # response = self.requests_session.post( + # self.url, headers=headers, data='\n'.join(logs_list), + # timeout=self.network_timeout) + # if response.status_code != 200: + # if response.status_code == 400: + # self.stdout_logger.info( + # 'Got 400 code from Logz.io. This means that ' + # 'some of your logs are too big, or badly ' + # 'formatted. response: %s', response.text) + # should_backup_to_disk = False + # break + + # if response.status_code == 401: + # self.stdout_logger.info( + # 'You are not authorized with Logz.io! Token ' + # 'OK? dropping logs...') + # should_backup_to_disk = False + # break + # else: + # self.stdout_logger.info( + # 'Got %s while sending logs to Logz.io, ' + # 'Try (%s/%s). 
Response: %s', + # response.status_code, + # current_try + 1, + # self.number_of_retries, + # response.text) + # should_retry = True + # else: + self.stdout_logger.debug( + 'Successfully sent bulk of %s logs to ' + 'Logz.io!', len(logs_list)) + should_backup_to_disk = False + break except Exception as e: self.stdout_logger.warning( 'Got exception while sending logs to Logz.io, ' diff --git a/requirements.txt b/requirements.txt index 3169b13..c757919 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,6 @@ requests>=2.27.0 protobuf>=3.20.2 opentelemetry-instrumentation-logging==0.32b0 -setuptools>=65.5.1 # not directly required, pinned by Snyk to avoid a vulnerability \ No newline at end of file +setuptools>=65.5.1 # not directly required, pinned by Snyk to avoid a vulnerability +opensearch-py + diff --git a/setup.py b/setup.py index d88aada..f24b67e 100644 --- a/setup.py +++ b/setup.py @@ -2,20 +2,21 @@ from setuptools import setup, find_packages setup( - name="logzio-python-handler", + name="opensearch-python-handler", version='4.0.1', - description="Logging handler to send logs to your Logz.io account with bulk SSL", - keywords="logging handler logz.io bulk https", - author="roiravhon", - maintainer="tamir-michaeli", - mail="tamir.michaeli@logz.io", - url="https://github.com/logzio/logzio-python-handler/", + description="Logging handler to send logs to your OpenSearch cluster with bulk SSL. Forked from https://github.com/logzio/logzio-python-handler", + keywords="logging handler opensearch bulk https", + author="Lorenzo Zolfanelli", + maintainer="Lorenzo Zolfanelli", + mail="dev@zolfa.nl", + url="https://projects.lilik.it/zolfa/opensearch-python-handler", license="Apache License 2", packages=find_packages(), install_requires=[ "requests>=2.27.0", "protobuf>=3.20.2", - "opentelemetry-instrumentation-logging==0.32b0" + "opentelemetry-instrumentation-logging==0.32b0", + "opensearch-py" ], test_requires=[ "future"