|
|
- From 4f639231c83b09ea004c03e95c702b7750bf9930 Mon Sep 17 00:00:00 2001
- From: Ander Juaristi <a@juaristi.eus>
- Date: Fri, 26 Apr 2019 09:58:06 +0200
- Subject: IPFIX: Add IPFIX output plugin
-
- This patch adds an IPFIX output plugin to ulogd2. It generates NetFlow/IPFIX
- traces and sends them to a remote server (collector) via TCP or UDP.
-
- Based on original work by Holger Eitzenberger <holger@eitzenberger.org>.
-
- How to test this
- ----------------
-
- I am currently testing this with the NFCT input and Wireshark.
-
- Place the following in ulogd.conf:
-
- # this will print all flows on screen
- loglevel=1
-
- # load NFCT and IPFIX plugins
- plugin="/lib/ulogd/ulogd_inpflow_NFCT.so"
- plugin="/lib/ulogd/ulogd_output_IPFIX.so"
-
- stack=ct1:NFCT,ipfix1:IPFIX
-
- [ct1]
- netlink_socket_buffer_size=217088
- netlink_socket_buffer_maxsize=1085440
- accept_proto_filter=tcp,sctp
-
- [ipfix1]
- oid=1
- host="127.0.0.1"
- #port=4739
- #send_template="once"
-
- I am currently testing it by launching a plain NetCat listener on port
- 4739 (the default for IPFIX) and then running Wireshark and see that it
- dissects the IPFIX/NetFlow traffic correctly (obviously this relies on
- the Wireshark NetFlow dissector being correct).
-
- First:
-
- nc -vvvv -l 127.0.0.1 4739
-
- Then:
-
- sudo ulogd -vc ulogd.conf
-
- Signed-off-by: Ander Juaristi <a@juaristi.eus>
- Signed-off-by: Pablo Neira Ayuso <pablo@netfilter.org>
- ---
- configure.ac | 2 +-
- include/ulogd/ulogd.h | 5 +
- input/flow/ulogd_inpflow_IPFIX.c | 2 -
- output/Makefile.am | 2 +-
- output/ipfix/Makefile.am | 7 +
- output/ipfix/ipfix.c | 141 ++++++++++
- output/ipfix/ipfix.h | 89 +++++++
- output/ipfix/ulogd_output_IPFIX.c | 503 +++++++++++++++++++++++++++++++++++
- output/ulogd_output_IPFIX.c | 546 --------------------------------------
- 9 files changed, 747 insertions(+), 550 deletions(-)
- delete mode 100644 input/flow/ulogd_inpflow_IPFIX.c
- create mode 100644 output/ipfix/Makefile.am
- create mode 100644 output/ipfix/ipfix.c
- create mode 100644 output/ipfix/ipfix.h
- create mode 100644 output/ipfix/ulogd_output_IPFIX.c
- delete mode 100644 output/ulogd_output_IPFIX.c
-
- --- a/configure.ac
- +++ b/configure.ac
- @@ -179,7 +179,7 @@ AC_CONFIG_FILES(include/Makefile include
- input/sum/Makefile \
- filter/Makefile filter/raw2packet/Makefile filter/packet2flow/Makefile \
- output/Makefile output/pcap/Makefile output/mysql/Makefile output/pgsql/Makefile output/sqlite3/Makefile \
- - output/dbi/Makefile \
- + output/dbi/Makefile output/ipfix/Makefile \
- src/Makefile Makefile Rules.make)
- AC_OUTPUT
-
- --- a/include/ulogd/ulogd.h
- +++ b/include/ulogd/ulogd.h
- @@ -28,6 +28,11 @@
-
- /* types without length */
- #define ULOGD_RET_NONE 0x0000
- +#define __packed __attribute__((packed))
- +#define __noreturn __attribute__((noreturn))
- +#define __cold __attribute__((cold))
- +
- +#define __packed __attribute__((packed))
-
- #define ULOGD_RET_INT8 0x0001
- #define ULOGD_RET_INT16 0x0002
- --- a/output/Makefile.am
- +++ b/output/Makefile.am
- @@ -2,7 +2,7 @@ AM_CPPFLAGS = -I$(top_srcdir)/include ${
- ${LIBNETFILTER_CONNTRACK_CFLAGS} ${LIBNETFILTER_LOG_CFLAGS}
- AM_CFLAGS = ${regular_CFLAGS}
-
- -SUBDIRS= pcap mysql pgsql sqlite3 dbi
- +SUBDIRS= pcap mysql pgsql sqlite3 dbi ipfix
-
- pkglib_LTLIBRARIES = ulogd_output_LOGEMU.la ulogd_output_SYSLOG.la \
- ulogd_output_OPRINT.la ulogd_output_GPRINT.la \
- --- /dev/null
- +++ b/output/ipfix/Makefile.am
- @@ -0,0 +1,7 @@
- +AM_CPPFLAGS = -I$(top_srcdir)/include
- +AM_CFLAGS = $(regular_CFLAGS)
- +
- +pkglib_LTLIBRARIES = ulogd_output_IPFIX.la
- +
- +ulogd_output_IPFIX_la_SOURCES = ulogd_output_IPFIX.c ipfix.c
- +ulogd_output_IPFIX_la_LDFLAGS = -avoid-version -module
- --- /dev/null
- +++ b/output/ipfix/ipfix.c
- @@ -0,0 +1,141 @@
- +/*
- + * ipfix.c
- + *
- + * Holger Eitzenberger, 2009.
- + */
- +
- +/* These forward declarations are needed since ulogd.h doesn't like to be the first */
- +#include <ulogd/linuxlist.h>
- +
- +#define __packed __attribute__((packed))
- +
- +#include "ipfix.h"
- +
- +#include <ulogd/ulogd.h>
- +#include <ulogd/common.h>
- +
- +struct ipfix_msg *ipfix_msg_alloc(size_t len, uint32_t oid)
- +{
- + struct ipfix_msg *msg;
- + struct ipfix_hdr *hdr;
- +
- + if (len < IPFIX_HDRLEN + IPFIX_SET_HDRLEN)
- + return NULL;
- +
- + msg = malloc(sizeof(struct ipfix_msg) + len);
- + memset(msg, 0, sizeof(struct ipfix_msg));
- + msg->tail = msg->data + IPFIX_HDRLEN;
- + msg->end = msg->data + len;
- +
- + hdr = ipfix_msg_hdr(msg);
- + memset(hdr, 0, IPFIX_HDRLEN);
- + hdr->version = htons(IPFIX_VERSION);
- + hdr->oid = htonl(oid);
- +
- + return msg;
- +}
- +
- +void ipfix_msg_free(struct ipfix_msg *msg)
- +{
- + if (!msg)
- + return;
- +
- + if (msg->nrecs > 0)
- + ulogd_log(ULOGD_DEBUG, "%s: %d flows have been lost\n", __func__,
- + msg->nrecs);
- +
- + free(msg);
- +}
- +
- +struct ipfix_hdr *ipfix_msg_hdr(const struct ipfix_msg *msg)
- +{
- + return (struct ipfix_hdr *)msg->data;
- +}
- +
- +void *ipfix_msg_data(struct ipfix_msg *msg)
- +{
- + return msg->data;
- +}
- +
- +size_t ipfix_msg_len(const struct ipfix_msg *msg)
- +{
- + return msg->tail - msg->data;
- +}
- +
- +struct ipfix_set_hdr *ipfix_msg_add_set(struct ipfix_msg *msg, uint16_t sid)
- +{
- + struct ipfix_set_hdr *shdr;
- +
- + if (msg->end - msg->tail < (int) IPFIX_SET_HDRLEN)
- + return NULL;
- +
- + shdr = (struct ipfix_set_hdr *)msg->tail;
- + shdr->id = sid;
- + shdr->len = IPFIX_SET_HDRLEN;
- + msg->tail += IPFIX_SET_HDRLEN;
- + msg->last_set = shdr;
- + return shdr;
- +}
- +
- +struct ipfix_set_hdr *ipfix_msg_get_set(const struct ipfix_msg *msg)
- +{
- + return msg->last_set;
- +}
- +
- +/**
- + * Add data record to an IPFIX message. The data is accounted properly.
- + *
- + * @return pointer to data or %NULL if not that much space left.
- + */
- +void *ipfix_msg_add_data(struct ipfix_msg *msg, size_t len)
- +{
- + void *data;
- +
- + if (!msg->last_set) {
- + ulogd_log(ULOGD_FATAL, "msg->last_set is NULL\n");
- + return NULL;
- + }
- +
- + if ((ssize_t) len > msg->end - msg->tail)
- + return NULL;
- +
- + data = msg->tail;
- + msg->tail += len;
- + msg->nrecs++;
- + msg->last_set->len += len;
- +
- + return data;
- +}
- +
- +/* check and dump message */
- +int ipfix_dump_msg(const struct ipfix_msg *msg)
- +{
- + const struct ipfix_hdr *hdr = ipfix_msg_hdr(msg);
- + const struct ipfix_set_hdr *shdr = (struct ipfix_set_hdr *) hdr->data;
- +
- + if (ntohs(hdr->len) < IPFIX_HDRLEN) {
- + ulogd_log(ULOGD_FATAL, "Invalid IPFIX message header length\n");
- + return -1;
- + }
- + if (ipfix_msg_len(msg) != IPFIX_HDRLEN + ntohs(shdr->len)) {
- + ulogd_log(ULOGD_FATAL, "Invalid IPFIX message length\n");
- + return -1;
- + }
- +
- + ulogd_log(ULOGD_DEBUG, "msg: ver=%#x len=%#x t=%#x seq=%#x oid=%d\n",
- + ntohs(hdr->version), ntohs(hdr->len), htonl(hdr->time),
- + ntohl(hdr->seqno), ntohl(hdr->oid));
- +
- + return 0;
- +}
- +
- +/* template management */
- +size_t ipfix_rec_len(uint16_t sid)
- +{
- + if (sid != htons(VY_IPFIX_SID)) {
- + ulogd_log(ULOGD_FATAL, "Invalid SID\n");
- + return 0;
- + }
- +
- + return sizeof(struct vy_ipfix_data);
- +}
- --- /dev/null
- +++ b/output/ipfix/ipfix.h
- @@ -0,0 +1,89 @@
- +/*
- + * ipfix.h
- + *
- + * Holger Eitzenberger <holger@eitzenberger.org>, 2009.
- + */
- +#ifndef IPFIX_H
- +#define IPFIX_H
- +
- +#include <stdint.h>
- +#include <netinet/in.h>
- +
- +
- +struct ipfix_hdr {
- +#define IPFIX_VERSION 0xa
- + uint16_t version;
- + uint16_t len;
- + uint32_t time;
- + uint32_t seqno;
- + uint32_t oid; /* Observation Domain ID */
- + uint8_t data[];
- +} __packed;
- +
- +#define IPFIX_HDRLEN sizeof(struct ipfix_hdr)
- +
- +/*
- + * IDs 0-255 are reserved for Template Sets. IDs of Data Sets are > 255.
- + */
- +struct ipfix_templ_hdr {
- + uint16_t id;
- + uint16_t cnt;
- + uint8_t data[];
- +} __packed;
- +
- +struct ipfix_set_hdr {
- +#define IPFIX_SET_TEMPL 2
- +#define IPFIX_SET_OPT_TEMPL 3
- + uint16_t id;
- + uint16_t len;
- + uint8_t data[];
- +} __packed;
- +
- +#define IPFIX_SET_HDRLEN sizeof(struct ipfix_set_hdr)
- +
- +struct ipfix_msg {
- + struct llist_head link;
- + uint8_t *tail;
- + uint8_t *end;
- + unsigned nrecs;
- + struct ipfix_set_hdr *last_set;
- + uint8_t data[];
- +};
- +
- +struct vy_ipfix_data {
- + struct in_addr saddr;
- + struct in_addr daddr;
- + uint16_t ifi_in;
- + uint16_t ifi_out;
- + uint32_t packets;
- + uint32_t bytes;
- + uint32_t start; /* Unix time */
- + uint32_t end; /* Unix time */
- + uint16_t sport;
- + uint16_t dport;
- + uint32_t aid; /* Application ID */
- + uint8_t l4_proto;
- + uint8_t dscp;
- + uint16_t __padding;
- +} __packed;
- +
- +#define VY_IPFIX_SID 256
- +
- +#define VY_IPFIX_FLOWS 36
- +#define VY_IPFIX_PKT_LEN (IPFIX_HDRLEN + IPFIX_SET_HDRLEN \
- + + VY_IPFIX_FLOWS * sizeof(struct vy_ipfix_data))
- +
- +/* template management */
- +size_t ipfix_rec_len(uint16_t);
- +
- +/* message handling */
- +struct ipfix_msg *ipfix_msg_alloc(size_t, uint32_t);
- +void ipfix_msg_free(struct ipfix_msg *);
- +struct ipfix_hdr *ipfix_msg_hdr(const struct ipfix_msg *);
- +size_t ipfix_msg_len(const struct ipfix_msg *);
- +void *ipfix_msg_data(struct ipfix_msg *);
- +struct ipfix_set_hdr *ipfix_msg_add_set(struct ipfix_msg *, uint16_t);
- +void *ipfix_msg_add_data(struct ipfix_msg *, size_t);
- +int ipfix_dump_msg(const struct ipfix_msg *);
- +
- +#endif /* IPFIX_H */
- --- /dev/null
- +++ b/output/ipfix/ulogd_output_IPFIX.c
- @@ -0,0 +1,503 @@
- +/*
- + * ulogd_output_IPFIX.c
- + *
- + * ulogd IPFIX Exporter plugin.
- + *
- + * This program is distributed in the hope that it will be useful,
- + * but WITHOUT ANY WARRANTY; without even the implied warranty of
- + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- + * GNU General Public License for more details.
- + *
- + * You should have received a copy of the GNU General Public License
- + * along with this program; if not, write to the Free Software
- + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- + *
- + * Holger Eitzenberger <holger@eitzenberger.org> Astaro AG 2009
- + */
- +#include <unistd.h>
- +#include <time.h>
- +#include <sys/types.h>
- +#include <sys/socket.h>
- +#include <arpa/inet.h>
- +#include <netdb.h>
- +#include <ulogd/ulogd.h>
- +#include <ulogd/common.h>
- +
- +#include "ipfix.h"
- +
- +#define DEFAULT_MTU 512 /* RFC 5101, 10.3.3 */
- +#define DEFAULT_PORT 4739 /* RFC 5101, 10.3.4 */
- +#define DEFAULT_SPORT 4740
- +
- +enum {
- + OID_CE = 0,
- + HOST_CE,
- + PORT_CE,
- + PROTO_CE,
- + MTU_CE,
- +};
- +
- +#define oid_ce(x) (x->ces[OID_CE])
- +#define host_ce(x) (x->ces[HOST_CE])
- +#define port_ce(x) (x->ces[PORT_CE])
- +#define proto_ce(x) (x->ces[PROTO_CE])
- +#define mtu_ce(x) (x->ces[MTU_CE])
- +
- +static const struct config_keyset ipfix_kset = {
- + .num_ces = 5,
- + .ces = {
- + {
- + .key = "oid",
- + .type = CONFIG_TYPE_INT,
- + .u.value = 0
- + },
- + {
- + .key = "host",
- + .type = CONFIG_TYPE_STRING,
- + .u.string = ""
- + },
- + {
- + .key = "port",
- + .type = CONFIG_TYPE_INT,
- + .u.value = DEFAULT_PORT
- + },
- + {
- + .key = "proto",
- + .type = CONFIG_TYPE_STRING,
- + .u.string = "tcp"
- + },
- + {
- + .key = "mtu",
- + .type = CONFIG_TYPE_INT,
- + .u.value = DEFAULT_MTU
- + }
- + }
- +};
- +
- +struct ipfix_templ {
- + struct ipfix_templ *next;
- +};
- +
- +struct ipfix_priv {
- + struct ulogd_fd ufd;
- + uint32_t seqno;
- + struct ipfix_msg *msg; /* current message */
- + struct llist_head list;
- + struct ipfix_templ *templates;
- + int proto;
- + struct ulogd_timer timer;
- + struct sockaddr_in sa;
- +};
- +
- +enum {
- + InIpSaddr = 0,
- + InIpDaddr,
- + InRawInPktCount,
- + InRawInPktLen,
- + InRawOutPktCount,
- + InRawOutPktLen,
- + InFlowStartSec,
- + InFlowStartUsec,
- + InFlowEndSec,
- + InFlowEndUsec,
- + InL4SPort,
- + InL4DPort,
- + InIpProto,
- + InCtMark
- +};
- +
- +static struct ulogd_key ipfix_in_keys[] = {
- + [InIpSaddr] = {
- + .type = ULOGD_RET_IPADDR,
- + .name = "orig.ip.saddr"
- + },
- + [InIpDaddr] = {
- + .type = ULOGD_RET_IPADDR,
- + .name = "orig.ip.daddr"
- + },
- + [InRawInPktCount] = {
- + .type = ULOGD_RET_UINT64,
- + .name = "orig.raw.pktcount"
- + },
- + [InRawInPktLen] = {
- + .type = ULOGD_RET_UINT64,
- + .name = "orig.raw.pktlen"
- + },
- + [InRawOutPktCount] = {
- + .type = ULOGD_RET_UINT64,
- + .name = "reply.raw.pktcount"
- + },
- + [InRawOutPktLen] = {
- + .type = ULOGD_RET_UINT64,
- + .name = "reply.raw.pktlen"
- + },
- + [InFlowStartSec] = {
- + .type = ULOGD_RET_UINT32,
- + .name = "flow.start.sec"
- + },
- + [InFlowStartUsec] = {
- + .type = ULOGD_RET_UINT32,
- + .name = "flow.start.usec"
- + },
- + [InFlowEndSec] = {
- + .type = ULOGD_RET_UINT32,
- + .name = "flow.end.sec"
- + },
- + [InFlowEndUsec] = {
- + .type = ULOGD_RET_UINT32,
- + .name = "flow.end.usec"
- + },
- + [InL4SPort] = {
- + .type = ULOGD_RET_UINT16,
- + .name = "orig.l4.sport"
- + },
- + [InL4DPort] = {
- + .type = ULOGD_RET_UINT16,
- + .name = "orig.l4.dport"
- + },
- + [InIpProto] = {
- + .type = ULOGD_RET_UINT8,
- + .name = "orig.ip.protocol"
- + },
- + [InCtMark] = {
- + .type = ULOGD_RET_UINT32,
- + .name = "ct.mark"
- + }
- +};
- +
- +/* do some polishing and enqueue it */
- +static void enqueue_msg(struct ipfix_priv *priv, struct ipfix_msg *msg)
- +{
- + struct ipfix_hdr *hdr = ipfix_msg_data(msg);
- +
- + if (!msg)
- + return;
- +
- + hdr->time = htonl(time(NULL));
- + hdr->seqno = htonl(priv->seqno += msg->nrecs);
- + if (msg->last_set) {
- + msg->last_set->id = htons(msg->last_set->id);
- + msg->last_set->len = htons(msg->last_set->len);
- + msg->last_set = NULL;
- + }
- + hdr->len = htons(ipfix_msg_len(msg));
- +
- + llist_add(&msg->link, &priv->list);
- +}
- +
- +/**
- + * @return %ULOGD_IRET_OK or error value
- + */
- +static int send_msgs(struct ulogd_pluginstance *pi)
- +{
- + struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
- + struct llist_head *curr, *tmp;
- + struct ipfix_msg *msg;
- + int ret = ULOGD_IRET_OK, sent;
- +
- + llist_for_each_prev(curr, &priv->list) {
- + msg = llist_entry(curr, struct ipfix_msg, link);
- +
- + sent = send(priv->ufd.fd, ipfix_msg_data(msg), ipfix_msg_len(msg), 0);
- + if (sent < 0) {
- + ulogd_log(ULOGD_ERROR, "send: %m\n");
- + ret = ULOGD_IRET_ERR;
- + goto done;
- + }
- +
- + /* TODO handle short send() for other protocols */
- + if ((size_t) sent < ipfix_msg_len(msg))
- + ulogd_log(ULOGD_ERROR, "short send: %d < %d\n",
- + sent, ipfix_msg_len(msg));
- + }
- +
- + llist_for_each_safe(curr, tmp, &priv->list) {
- + msg = llist_entry(curr, struct ipfix_msg, link);
- + llist_del(curr);
- + msg->nrecs = 0;
- + ipfix_msg_free(msg);
- + }
- +
- +done:
- + return ret;
- +}
- +
- +static int ipfix_ufd_cb(int fd, unsigned what, void *arg)
- +{
- + struct ulogd_pluginstance *pi = arg;
- + struct ipfix_priv *priv = (struct ipfix_priv *) pi->private;
- + ssize_t nread;
- + char buf[16];
- +
- + if (what & ULOGD_FD_READ) {
- + nread = recv(priv->ufd.fd, buf, sizeof(buf), MSG_DONTWAIT);
- + if (nread < 0) {
- + ulogd_log(ULOGD_ERROR, "recv: %m\n");
- + } else if (!nread) {
- + ulogd_log(ULOGD_INFO, "connection reset by peer\n");
- + ulogd_unregister_fd(&priv->ufd);
- + } else
- + ulogd_log(ULOGD_INFO, "unexpected data (%d bytes)\n", nread);
- + }
- +
- + return 0;
- +}
- +
- +static void ipfix_timer_cb(struct ulogd_timer *t, void *data)
- +{
- + struct ulogd_pluginstance *pi = data;
- + struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
- +
- + if (priv->msg && priv->msg->nrecs > 0) {
- + enqueue_msg(priv, priv->msg);
- + priv->msg = NULL;
- + send_msgs(pi);
- + }
- +}
- +
- +static int ipfix_configure(struct ulogd_pluginstance *pi, struct ulogd_pluginstance_stack *stack)
- +{
- + struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
- + int oid, port, mtu, ret;
- + char *host, *proto;
- + char addr[16];
- +
- + ret = config_parse_file(pi->id, pi->config_kset);
- + if (ret < 0)
- + return ret;
- +
- + oid = oid_ce(pi->config_kset).u.value;
- + host = host_ce(pi->config_kset).u.string;
- + port = port_ce(pi->config_kset).u.value;
- + proto = proto_ce(pi->config_kset).u.string;
- + mtu = mtu_ce(pi->config_kset).u.value;
- +
- + if (!oid) {
- + ulogd_log(ULOGD_FATAL, "invalid Observation ID\n");
- + return ULOGD_IRET_ERR;
- + }
- + if (!host || !strcmp(host, "")) {
- + ulogd_log(ULOGD_FATAL, "no destination host specified\n");
- + return ULOGD_IRET_ERR;
- + }
- +
- + if (!strcmp(proto, "udp")) {
- + priv->proto = IPPROTO_UDP;
- + } else if (!strcmp(proto, "tcp")) {
- + priv->proto = IPPROTO_TCP;
- + } else {
- + ulogd_log(ULOGD_FATAL, "unsupported protocol '%s'\n", proto);
- + return ULOGD_IRET_ERR;
- + }
- +
- + memset(&priv->sa, 0, sizeof(priv->sa));
- + priv->sa.sin_family = AF_INET;
- + priv->sa.sin_port = htons(port);
- + ret = inet_pton(AF_INET, host, &priv->sa.sin_addr);
- + if (ret <= 0) {
- + ulogd_log(ULOGD_FATAL, "inet_pton: %m\n");
- + return ULOGD_IRET_ERR;
- + }
- +
- + INIT_LLIST_HEAD(&priv->list);
- +
- + ulogd_init_timer(&priv->timer, pi, ipfix_timer_cb);
- +
- + ulogd_log(ULOGD_INFO, "using IPFIX Collector at %s:%d (MTU %d)\n",
- + inet_ntop(AF_INET, &priv->sa.sin_addr, addr, sizeof(addr)),
- + port, mtu);
- +
- + return ULOGD_IRET_OK;
- +}
- +
- +static int tcp_connect(struct ulogd_pluginstance *pi)
- +{
- + struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
- + int ret = ULOGD_IRET_ERR;
- +
- + if ((priv->ufd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- + ulogd_log(ULOGD_FATAL, "socket: %m\n");
- + return ULOGD_IRET_ERR;
- + }
- +
- + if (connect(priv->ufd.fd, (struct sockaddr *) &priv->sa, sizeof(priv->sa)) < 0) {
- + ulogd_log(ULOGD_ERROR, "connect: %m\n");
- + ret = ULOGD_IRET_ERR;
- + goto err_close;
- + }
- +
- + return ULOGD_IRET_OK;
- +
- +err_close:
- + close(priv->ufd.fd);
- + return ret;
- +}
- +
- +static int udp_connect(struct ulogd_pluginstance *pi)
- +{
- + struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
- +
- + if ((priv->ufd.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
- + ulogd_log(ULOGD_FATAL, "socket: %m\n");
- + return ULOGD_IRET_ERR;
- + }
- +
- + if (connect(priv->ufd.fd, (struct sockaddr *) &priv->sa, sizeof(priv->sa)) < 0) {
- + ulogd_log(ULOGD_ERROR, "connect: %m\n");
- + return ULOGD_IRET_ERR;
- + }
- +
- + return 0;
- +}
- +
- +static int ipfix_start(struct ulogd_pluginstance *pi)
- +{
- + struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
- + char addr[16];
- + int port, ret;
- +
- + switch (priv->proto) {
- + case IPPROTO_UDP:
- + if ((ret = udp_connect(pi)) < 0)
- + return ret;
- + break;
- + case IPPROTO_TCP:
- + if ((ret = tcp_connect(pi)) < 0)
- + return ret;
- + break;
- +
- + default:
- + break;
- + }
- +
- + priv->seqno = 0;
- +
- + port = port_ce(pi->config_kset).u.value;
- + ulogd_log(ULOGD_INFO, "connected to %s:%d\n",
- + inet_ntop(AF_INET, &priv->sa.sin_addr, addr, sizeof(addr)),
- + port);
- +
- + /* Register the socket FD */
- + priv->ufd.when = ULOGD_FD_READ;
- + priv->ufd.cb = ipfix_ufd_cb;
- + priv->ufd.data = pi;
- +
- + if (ulogd_register_fd(&priv->ufd) < 0)
- + return ULOGD_IRET_ERR;
- +
- + /* Add a 1 second timer */
- + ulogd_add_timer(&priv->timer, 1);
- +
- + return ULOGD_IRET_OK;
- +}
- +
- +static int ipfix_stop(struct ulogd_pluginstance *pi)
- +{
- + struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
- +
- + ulogd_unregister_fd(&priv->ufd);
- + close(priv->ufd.fd);
- + priv->ufd.fd = -1;
- +
- + ulogd_del_timer(&priv->timer);
- +
- + ipfix_msg_free(priv->msg);
- + priv->msg = NULL;
- +
- + return 0;
- +}
- +
- +static int ipfix_interp(struct ulogd_pluginstance *pi)
- +{
- + struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
- + struct vy_ipfix_data *data;
- + int oid, mtu, ret;
- + char addr[16];
- +
- + if (!(GET_FLAGS(pi->input.keys, InIpSaddr) & ULOGD_RETF_VALID))
- + return ULOGD_IRET_OK;
- +
- + oid = oid_ce(pi->config_kset).u.value;
- + mtu = mtu_ce(pi->config_kset).u.value;
- +
- +again:
- + if (!priv->msg) {
- + priv->msg = ipfix_msg_alloc(mtu, oid);
- + if (!priv->msg) {
- + /* just drop this flow */
- + ulogd_log(ULOGD_ERROR, "out of memory, dropping flow\n");
- + return ULOGD_IRET_OK;
- + }
- + ipfix_msg_add_set(priv->msg, VY_IPFIX_SID);
- + }
- +
- + data = ipfix_msg_add_data(priv->msg, sizeof(struct vy_ipfix_data));
- + if (!data) {
- + enqueue_msg(priv, priv->msg);
- + priv->msg = NULL;
- + /* can't loop because the next will definitely succeed */
- + goto again;
- + }
- +
- + data->ifi_in = data->ifi_out = 0;
- +
- + data->saddr.s_addr = ikey_get_u32(&pi->input.keys[InIpSaddr]);
- + data->daddr.s_addr = ikey_get_u32(&pi->input.keys[InIpDaddr]);
- +
- + data->packets = htonl((uint32_t) (ikey_get_u64(&pi->input.keys[InRawInPktCount])
- + + ikey_get_u64(&pi->input.keys[InRawOutPktCount])));
- + data->bytes = htonl((uint32_t) (ikey_get_u64(&pi->input.keys[InRawInPktLen])
- + + ikey_get_u64(&pi->input.keys[InRawOutPktLen])));
- +
- + data->start = htonl(ikey_get_u32(&pi->input.keys[InFlowStartSec]));
- + data->end = htonl(ikey_get_u32(&pi->input.keys[InFlowEndSec]));
- +
- + if (GET_FLAGS(pi->input.keys, InL4SPort) & ULOGD_RETF_VALID) {
- + data->sport = htons(ikey_get_u16(&pi->input.keys[InL4SPort]));
- + data->dport = htons(ikey_get_u16(&pi->input.keys[InL4DPort]));
- + }
- +
- + data->aid = 0;
- + if (GET_FLAGS(pi->input.keys, InCtMark) & ULOGD_RETF_VALID)
- + data->aid = htonl(ikey_get_u32(&pi->input.keys[InCtMark]));
- +
- + data->l4_proto = ikey_get_u8(&pi->input.keys[InIpProto]);
- + data->__padding = 0;
- +
- + ulogd_log(ULOGD_DEBUG, "Got new packet (packets = %u, bytes = %u, flow = (%u, %u), saddr = %s, daddr = %s, sport = %u, dport = %u)\n",
- + ntohl(data->packets), ntohl(data->bytes), ntohl(data->start), ntohl(data->end),
- + inet_ntop(AF_INET, &data->saddr.s_addr, addr, sizeof(addr)),
- + inet_ntop(AF_INET, &data->daddr.s_addr, addr, sizeof(addr)),
- + ntohs(data->sport), ntohs(data->dport));
- +
- + if ((ret = send_msgs(pi)) < 0)
- + return ret;
- +
- + return ULOGD_IRET_OK;
- +}
- +
- +static struct ulogd_plugin ipfix_plugin = {
- + .name = "IPFIX",
- + .input = {
- + .keys = ipfix_in_keys,
- + .num_keys = ARRAY_SIZE(ipfix_in_keys),
- + .type = ULOGD_DTYPE_PACKET | ULOGD_DTYPE_FLOW | ULOGD_DTYPE_SUM
- + },
- + .output = {
- + .type = ULOGD_DTYPE_SINK
- + },
- + .config_kset = (struct config_keyset *) &ipfix_kset,
- + .priv_size = sizeof(struct ipfix_priv),
- + .configure = ipfix_configure,
- + .start = ipfix_start,
- + .stop = ipfix_stop,
- + .interp = ipfix_interp,
- + .version = VERSION,
- +};
- +
- +void __attribute__ ((constructor)) init(void);
- +
- +void init(void)
- +{
- + ulogd_register_plugin(&ipfix_plugin);
- +}
|