You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

879 lines
22 KiB

  1. From 4f639231c83b09ea004c03e95c702b7750bf9930 Mon Sep 17 00:00:00 2001
  2. From: Ander Juaristi <a@juaristi.eus>
  3. Date: Fri, 26 Apr 2019 09:58:06 +0200
  4. Subject: IPFIX: Add IPFIX output plugin
  5. This patch adds an IPFIX output plugin to ulogd2. It generates NetFlow/IPFIX
  6. traces and sends them to a remote server (collector) via TCP or UDP.
  7. Based on original work by Holger Eitzenberger <holger@eitzenberger.org>.
  8. How to test this
  9. ----------------
  10. I am currently testing this with the NFCT input and Wireshark.
  11. Place the following in ulogd.conf:
  12. # this will print all flows on screen
  13. loglevel=1
  14. # load NFCT and IPFIX plugins
  15. plugin="/lib/ulogd/ulogd_inpflow_NFCT.so"
  16. plugin="/lib/ulogd/ulogd_output_IPFIX.so"
  17. stack=ct1:NFCT,ipfix1:IPFIX
  18. [ct1]
  19. netlink_socket_buffer_size=217088
  20. netlink_socket_buffer_maxsize=1085440
  21. accept_proto_filter=tcp,sctp
  22. [ipfix1]
  23. oid=1
  24. host="127.0.0.1"
  25. #port=4739
  26. #send_template="once"
  27. I am currently testing it by launching a plain NetCat listener on port
  28. 4739 (the default for IPFIX) and then running Wireshark to check that it
  29. dissects the IPFIX/NetFlow traffic correctly (obviously this relies on
  30. the Wireshark NetFlow dissector being correct).
  31. First:
  32. nc -vvvv -l 127.0.0.1 4739
  33. Then:
  34. sudo ulogd -vc ulogd.conf
  35. Signed-off-by: Ander Juaristi <a@juaristi.eus>
  36. Signed-off-by: Pablo Neira Ayuso <pablo@netfilter.org>
  37. ---
  38. configure.ac | 2 +-
  39. include/ulogd/ulogd.h | 5 +
  40. input/flow/ulogd_inpflow_IPFIX.c | 2 -
  41. output/Makefile.am | 2 +-
  42. output/ipfix/Makefile.am | 7 +
  43. output/ipfix/ipfix.c | 141 ++++++++++
  44. output/ipfix/ipfix.h | 89 +++++++
  45. output/ipfix/ulogd_output_IPFIX.c | 503 +++++++++++++++++++++++++++++++++++
  46. output/ulogd_output_IPFIX.c | 546 --------------------------------------
  47. 9 files changed, 747 insertions(+), 550 deletions(-)
  48. delete mode 100644 input/flow/ulogd_inpflow_IPFIX.c
  49. create mode 100644 output/ipfix/Makefile.am
  50. create mode 100644 output/ipfix/ipfix.c
  51. create mode 100644 output/ipfix/ipfix.h
  52. create mode 100644 output/ipfix/ulogd_output_IPFIX.c
  53. delete mode 100644 output/ulogd_output_IPFIX.c
  54. diff --git a/configure.ac b/configure.ac
  55. index 3aa0624..48b4995 100644
  56. --- a/configure.ac
  57. +++ b/configure.ac
  58. @@ -179,7 +179,7 @@ AC_CONFIG_FILES(include/Makefile include/ulogd/Makefile include/libipulog/Makefi
  59. input/sum/Makefile \
  60. filter/Makefile filter/raw2packet/Makefile filter/packet2flow/Makefile \
  61. output/Makefile output/pcap/Makefile output/mysql/Makefile output/pgsql/Makefile output/sqlite3/Makefile \
  62. - output/dbi/Makefile \
  63. + output/dbi/Makefile output/ipfix/Makefile \
  64. src/Makefile Makefile Rules.make)
  65. AC_OUTPUT
  66. diff --git a/include/ulogd/ulogd.h b/include/ulogd/ulogd.h
  67. index 2e38195..1636a8c 100644
  68. --- a/include/ulogd/ulogd.h
  69. +++ b/include/ulogd/ulogd.h
  70. @@ -28,6 +28,11 @@
  71. /* types without length */
  72. #define ULOGD_RET_NONE 0x0000
  73. +#define __packed __attribute__((packed))
  74. +#define __noreturn __attribute__((noreturn))
  75. +#define __cold __attribute__((cold))
  76. +
  77. +#define __packed __attribute__((packed))
  78. #define ULOGD_RET_INT8 0x0001
  79. #define ULOGD_RET_INT16 0x0002
  80. diff --git a/output/Makefile.am b/output/Makefile.am
  81. index ff851ad..7ba8217 100644
  82. --- a/output/Makefile.am
  83. +++ b/output/Makefile.am
  84. @@ -2,7 +2,7 @@ AM_CPPFLAGS = -I$(top_srcdir)/include ${LIBNETFILTER_ACCT_CFLAGS} \
  85. ${LIBNETFILTER_CONNTRACK_CFLAGS} ${LIBNETFILTER_LOG_CFLAGS}
  86. AM_CFLAGS = ${regular_CFLAGS}
  87. -SUBDIRS= pcap mysql pgsql sqlite3 dbi
  88. +SUBDIRS= pcap mysql pgsql sqlite3 dbi ipfix
  89. pkglib_LTLIBRARIES = ulogd_output_LOGEMU.la ulogd_output_SYSLOG.la \
  90. ulogd_output_OPRINT.la ulogd_output_GPRINT.la \
  91. diff --git a/output/ipfix/Makefile.am b/output/ipfix/Makefile.am
  92. new file mode 100644
  93. index 0000000..cacda26
  94. --- /dev/null
  95. +++ b/output/ipfix/Makefile.am
  96. @@ -0,0 +1,7 @@
  97. +AM_CPPFLAGS = -I$(top_srcdir)/include
  98. +AM_CFLAGS = $(regular_CFLAGS)
  99. +
  100. +pkglib_LTLIBRARIES = ulogd_output_IPFIX.la
  101. +
  102. +ulogd_output_IPFIX_la_SOURCES = ulogd_output_IPFIX.c ipfix.c
  103. +ulogd_output_IPFIX_la_LDFLAGS = -avoid-version -module
  104. diff --git a/output/ipfix/ipfix.c b/output/ipfix/ipfix.c
  105. new file mode 100644
  106. index 0000000..60a4c7f
  107. --- /dev/null
  108. +++ b/output/ipfix/ipfix.c
  109. @@ -0,0 +1,141 @@
  110. +/*
  111. + * ipfix.c
  112. + *
  113. + * Holger Eitzenberger, 2009.
  114. + */
  115. +
  116. +/* These forward declarations are needed since ulogd.h doesn't like to be the first */
  117. +#include <ulogd/linuxlist.h>
  118. +
  119. +#define __packed __attribute__((packed))
  120. +
  121. +#include "ipfix.h"
  122. +
  123. +#include <ulogd/ulogd.h>
  124. +#include <ulogd/common.h>
  125. +
  126. +struct ipfix_msg *ipfix_msg_alloc(size_t len, uint32_t oid)
  127. +{
  128. + struct ipfix_msg *msg;
  129. + struct ipfix_hdr *hdr;
  130. +
  131. + /* NULL if @len cannot hold header + set header, or if allocation fails */
  132. + if (len < IPFIX_HDRLEN + IPFIX_SET_HDRLEN)
  133. + return NULL;
  134. +
  135. + /* zeroed, so fields a caller leaves unset never carry stale heap bytes */
  136. + msg = calloc(1, sizeof(*msg) + len);
  137. + if (!msg)
  138. + return NULL;
  139. + msg->tail = msg->data + IPFIX_HDRLEN;
  140. + msg->end = msg->data + len;
  141. + hdr = ipfix_msg_hdr(msg);
  142. + hdr->version = htons(IPFIX_VERSION);
  143. + hdr->oid = htonl(oid);
  144. + return msg;
  145. +}
  146. +
  147. +void ipfix_msg_free(struct ipfix_msg *msg)
  148. +{
  149. + if (!msg)
  150. + return;
  151. +
  152. + if (msg->nrecs > 0)
  153. + ulogd_log(ULOGD_DEBUG, "%s: %d flows have been lost\n", __func__,
  154. + msg->nrecs);
  155. +
  156. + free(msg);
  157. +}
  158. +
  159. +struct ipfix_hdr *ipfix_msg_hdr(const struct ipfix_msg *msg)
  160. +{
  161. + return (struct ipfix_hdr *)msg->data;
  162. +}
  163. +
  164. +void *ipfix_msg_data(struct ipfix_msg *msg)
  165. +{
  166. + return msg->data;
  167. +}
  168. +
  169. +size_t ipfix_msg_len(const struct ipfix_msg *msg)
  170. +{
  171. + return msg->tail - msg->data;
  172. +}
  173. +
  174. +struct ipfix_set_hdr *ipfix_msg_add_set(struct ipfix_msg *msg, uint16_t sid)
  175. +{
  176. + struct ipfix_set_hdr *shdr;
  177. +
  178. + if (msg->end - msg->tail < (int) IPFIX_SET_HDRLEN)
  179. + return NULL;
  180. +
  181. + shdr = (struct ipfix_set_hdr *)msg->tail;
  182. + shdr->id = sid;
  183. + shdr->len = IPFIX_SET_HDRLEN;
  184. + msg->tail += IPFIX_SET_HDRLEN;
  185. + msg->last_set = shdr;
  186. + return shdr;
  187. +}
  188. +
  189. +struct ipfix_set_hdr *ipfix_msg_get_set(const struct ipfix_msg *msg)
  190. +{
  191. + return msg->last_set;
  192. +}
  193. +
  194. +/**
  195. + * Add data record to an IPFIX message. The data is accounted properly.
  196. + *
  197. + * @return pointer to data or %NULL if not that much space left.
  198. + */
  199. +void *ipfix_msg_add_data(struct ipfix_msg *msg, size_t len)
  200. +{
  201. + void *data;
  202. +
  203. + if (!msg->last_set) {
  204. + ulogd_log(ULOGD_FATAL, "msg->last_set is NULL\n");
  205. + return NULL;
  206. + }
  207. +
  208. + if ((ssize_t) len > msg->end - msg->tail)
  209. + return NULL;
  210. +
  211. + data = msg->tail;
  212. + msg->tail += len;
  213. + msg->nrecs++;
  214. + msg->last_set->len += len;
  215. +
  216. + return data;
  217. +}
  218. +
  219. +/* check and dump message; returns 0 if the lengths are consistent, else -1 */
  220. +int ipfix_dump_msg(const struct ipfix_msg *msg)
  221. +{
  222. + const struct ipfix_hdr *hdr = ipfix_msg_hdr(msg);
  223. + const struct ipfix_set_hdr *shdr = (struct ipfix_set_hdr *) hdr->data;
  224. +
  225. + if (ntohs(hdr->len) < IPFIX_HDRLEN) {
  226. + ulogd_log(ULOGD_FATAL, "Invalid IPFIX message header length\n");
  227. + return -1;
  228. + }
  229. + if (ipfix_msg_len(msg) != IPFIX_HDRLEN + ntohs(shdr->len)) {
  230. + ulogd_log(ULOGD_FATAL, "Invalid IPFIX message length\n");
  231. + return -1;
  232. + }
  233. +
  234. + ulogd_log(ULOGD_DEBUG, "msg: ver=%#x len=%#x t=%#x seq=%#x oid=%u\n",
  235. + ntohs(hdr->version), ntohs(hdr->len), ntohl(hdr->time), /* wire -> host */
  236. + ntohl(hdr->seqno), ntohl(hdr->oid));
  237. +
  238. + return 0;
  239. +}
  240. +
  241. +/* template management */
  242. +size_t ipfix_rec_len(uint16_t sid)
  243. +{
  244. + if (sid != htons(VY_IPFIX_SID)) {
  245. + ulogd_log(ULOGD_FATAL, "Invalid SID\n");
  246. + return 0;
  247. + }
  248. +
  249. + return sizeof(struct vy_ipfix_data);
  250. +}
  251. diff --git a/output/ipfix/ipfix.h b/output/ipfix/ipfix.h
  252. new file mode 100644
  253. index 0000000..cdb5a6f
  254. --- /dev/null
  255. +++ b/output/ipfix/ipfix.h
  256. @@ -0,0 +1,89 @@
  257. +/*
  258. + * ipfix.h
  259. + *
  260. + * Holger Eitzenberger <holger@eitzenberger.org>, 2009.
  261. + */
  262. +#ifndef IPFIX_H
  263. +#define IPFIX_H
  264. +
  265. +#include <stdint.h>
  266. +#include <netinet/in.h>
  267. +
  268. +
  269. +struct ipfix_hdr {
  270. +#define IPFIX_VERSION 0xa
  271. + uint16_t version;
  272. + uint16_t len;
  273. + uint32_t time;
  274. + uint32_t seqno;
  275. + uint32_t oid; /* Observation Domain ID */
  276. + uint8_t data[];
  277. +} __packed;
  278. +
  279. +#define IPFIX_HDRLEN sizeof(struct ipfix_hdr)
  280. +
  281. +/*
  282. + * IDs 0-255 are reserved for Template Sets. IDs of Data Sets are > 255.
  283. + */
  284. +struct ipfix_templ_hdr {
  285. + uint16_t id;
  286. + uint16_t cnt;
  287. + uint8_t data[];
  288. +} __packed;
  289. +
  290. +struct ipfix_set_hdr {
  291. +#define IPFIX_SET_TEMPL 2
  292. +#define IPFIX_SET_OPT_TEMPL 3
  293. + uint16_t id;
  294. + uint16_t len;
  295. + uint8_t data[];
  296. +} __packed;
  297. +
  298. +#define IPFIX_SET_HDRLEN sizeof(struct ipfix_set_hdr)
  299. +
  300. +struct ipfix_msg {
  301. + struct llist_head link;
  302. + uint8_t *tail;
  303. + uint8_t *end;
  304. + unsigned nrecs;
  305. + struct ipfix_set_hdr *last_set;
  306. + uint8_t data[];
  307. +};
  308. +
  309. +struct vy_ipfix_data {
  310. + struct in_addr saddr;
  311. + struct in_addr daddr;
  312. + uint16_t ifi_in;
  313. + uint16_t ifi_out;
  314. + uint32_t packets;
  315. + uint32_t bytes;
  316. + uint32_t start; /* Unix time */
  317. + uint32_t end; /* Unix time */
  318. + uint16_t sport;
  319. + uint16_t dport;
  320. + uint32_t aid; /* Application ID */
  321. + uint8_t l4_proto;
  322. + uint8_t dscp;
  323. + uint16_t __padding;
  324. +} __packed;
  325. +
  326. +#define VY_IPFIX_SID 256
  327. +
  328. +#define VY_IPFIX_FLOWS 36
  329. +#define VY_IPFIX_PKT_LEN (IPFIX_HDRLEN + IPFIX_SET_HDRLEN \
  330. + + VY_IPFIX_FLOWS * sizeof(struct vy_ipfix_data))
  331. +
  332. +/* template management */
  333. +size_t ipfix_rec_len(uint16_t);
  334. +
  335. +/* message handling */
  336. +struct ipfix_msg *ipfix_msg_alloc(size_t, uint32_t);
  337. +void ipfix_msg_free(struct ipfix_msg *);
  338. +struct ipfix_hdr *ipfix_msg_hdr(const struct ipfix_msg *);
  339. +size_t ipfix_msg_len(const struct ipfix_msg *);
  340. +void *ipfix_msg_data(struct ipfix_msg *);
  341. +struct ipfix_set_hdr *ipfix_msg_add_set(struct ipfix_msg *, uint16_t);
  342. +void *ipfix_msg_add_data(struct ipfix_msg *, size_t);
  343. +int ipfix_dump_msg(const struct ipfix_msg *);
  344. +
  345. +#endif /* IPFIX_H */
  346. diff --git a/output/ipfix/ulogd_output_IPFIX.c b/output/ipfix/ulogd_output_IPFIX.c
  347. new file mode 100644
  348. index 0000000..ec143b1
  349. --- /dev/null
  350. +++ b/output/ipfix/ulogd_output_IPFIX.c
  351. @@ -0,0 +1,503 @@
  352. +/*
  353. + * ulogd_output_IPFIX.c
  354. + *
  355. + * ulogd IPFIX Exporter plugin.
  356. + *
  357. + * This program is distributed in the hope that it will be useful,
  358. + * but WITHOUT ANY WARRANTY; without even the implied warranty of
  359. + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  360. + * GNU General Public License for more details.
  361. + *
  362. + * You should have received a copy of the GNU General Public License
  363. + * along with this program; if not, write to the Free Software
  364. + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  365. + *
  366. + * Holger Eitzenberger <holger@eitzenberger.org> Astaro AG 2009
  367. + */
  368. +#include <unistd.h>
  369. +#include <time.h>
  370. +#include <sys/types.h>
  371. +#include <sys/socket.h>
  372. +#include <arpa/inet.h>
  373. +#include <netdb.h>
  374. +#include <ulogd/ulogd.h>
  375. +#include <ulogd/common.h>
  376. +
  377. +#include "ipfix.h"
  378. +
  379. +#define DEFAULT_MTU 512 /* RFC 5101, 10.3.3 */
  380. +#define DEFAULT_PORT 4739 /* RFC 5101, 10.3.4 */
  381. +#define DEFAULT_SPORT 4740
  382. +
  383. +enum {
  384. + OID_CE = 0,
  385. + HOST_CE,
  386. + PORT_CE,
  387. + PROTO_CE,
  388. + MTU_CE,
  389. +};
  390. +
  391. +#define oid_ce(x) (x->ces[OID_CE])
  392. +#define host_ce(x) (x->ces[HOST_CE])
  393. +#define port_ce(x) (x->ces[PORT_CE])
  394. +#define proto_ce(x) (x->ces[PROTO_CE])
  395. +#define mtu_ce(x) (x->ces[MTU_CE])
  396. +
  397. +static const struct config_keyset ipfix_kset = {
  398. + .num_ces = 5,
  399. + .ces = {
  400. + {
  401. + .key = "oid",
  402. + .type = CONFIG_TYPE_INT,
  403. + .u.value = 0
  404. + },
  405. + {
  406. + .key = "host",
  407. + .type = CONFIG_TYPE_STRING,
  408. + .u.string = ""
  409. + },
  410. + {
  411. + .key = "port",
  412. + .type = CONFIG_TYPE_INT,
  413. + .u.value = DEFAULT_PORT
  414. + },
  415. + {
  416. + .key = "proto",
  417. + .type = CONFIG_TYPE_STRING,
  418. + .u.string = "tcp"
  419. + },
  420. + {
  421. + .key = "mtu",
  422. + .type = CONFIG_TYPE_INT,
  423. + .u.value = DEFAULT_MTU
  424. + }
  425. + }
  426. +};
  427. +
  428. +struct ipfix_templ {
  429. + struct ipfix_templ *next;
  430. +};
  431. +
  432. +struct ipfix_priv {
  433. + struct ulogd_fd ufd;
  434. + uint32_t seqno;
  435. + struct ipfix_msg *msg; /* current message */
  436. + struct llist_head list;
  437. + struct ipfix_templ *templates;
  438. + int proto;
  439. + struct ulogd_timer timer;
  440. + struct sockaddr_in sa;
  441. +};
  442. +
  443. +enum {
  444. + InIpSaddr = 0,
  445. + InIpDaddr,
  446. + InRawInPktCount,
  447. + InRawInPktLen,
  448. + InRawOutPktCount,
  449. + InRawOutPktLen,
  450. + InFlowStartSec,
  451. + InFlowStartUsec,
  452. + InFlowEndSec,
  453. + InFlowEndUsec,
  454. + InL4SPort,
  455. + InL4DPort,
  456. + InIpProto,
  457. + InCtMark
  458. +};
  459. +
  460. +static struct ulogd_key ipfix_in_keys[] = {
  461. + [InIpSaddr] = {
  462. + .type = ULOGD_RET_IPADDR,
  463. + .name = "orig.ip.saddr"
  464. + },
  465. + [InIpDaddr] = {
  466. + .type = ULOGD_RET_IPADDR,
  467. + .name = "orig.ip.daddr"
  468. + },
  469. + [InRawInPktCount] = {
  470. + .type = ULOGD_RET_UINT64,
  471. + .name = "orig.raw.pktcount"
  472. + },
  473. + [InRawInPktLen] = {
  474. + .type = ULOGD_RET_UINT64,
  475. + .name = "orig.raw.pktlen"
  476. + },
  477. + [InRawOutPktCount] = {
  478. + .type = ULOGD_RET_UINT64,
  479. + .name = "reply.raw.pktcount"
  480. + },
  481. + [InRawOutPktLen] = {
  482. + .type = ULOGD_RET_UINT64,
  483. + .name = "reply.raw.pktlen"
  484. + },
  485. + [InFlowStartSec] = {
  486. + .type = ULOGD_RET_UINT32,
  487. + .name = "flow.start.sec"
  488. + },
  489. + [InFlowStartUsec] = {
  490. + .type = ULOGD_RET_UINT32,
  491. + .name = "flow.start.usec"
  492. + },
  493. + [InFlowEndSec] = {
  494. + .type = ULOGD_RET_UINT32,
  495. + .name = "flow.end.sec"
  496. + },
  497. + [InFlowEndUsec] = {
  498. + .type = ULOGD_RET_UINT32,
  499. + .name = "flow.end.usec"
  500. + },
  501. + [InL4SPort] = {
  502. + .type = ULOGD_RET_UINT16,
  503. + .name = "orig.l4.sport"
  504. + },
  505. + [InL4DPort] = {
  506. + .type = ULOGD_RET_UINT16,
  507. + .name = "orig.l4.dport"
  508. + },
  509. + [InIpProto] = {
  510. + .type = ULOGD_RET_UINT8,
  511. + .name = "orig.ip.protocol"
  512. + },
  513. + [InCtMark] = {
  514. + .type = ULOGD_RET_UINT32,
  515. + .name = "ct.mark"
  516. + }
  517. +};
  518. +
  519. +/* finalise @msg (timestamp, seqno, byte order, length) and queue it */
  520. +static void enqueue_msg(struct ipfix_priv *priv, struct ipfix_msg *msg)
  521. +{
  522. + struct ipfix_hdr *hdr;
  523. +
  524. + if (!msg)
  525. + return;
  526. + hdr = ipfix_msg_data(msg); /* fetched only once msg is known non-NULL */
  527. + hdr->time = htonl(time(NULL));
  528. + hdr->seqno = htonl(priv->seqno += msg->nrecs);
  529. + if (msg->last_set) {
  530. + msg->last_set->id = htons(msg->last_set->id);
  531. + msg->last_set->len = htons(msg->last_set->len);
  532. + msg->last_set = NULL;
  533. + }
  534. + hdr->len = htons(ipfix_msg_len(msg));
  535. +
  536. + llist_add(&msg->link, &priv->list);
  537. +}
  538. +
  539. +/** Send every queued message, walking the queue tail-to-head, then free it.
  540. + * @return %ULOGD_IRET_OK, or %ULOGD_IRET_ERR (queue left untouched) if send() fails
  541. + */
  542. +static int send_msgs(struct ulogd_pluginstance *pi)
  543. +{
  544. + struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
  545. + struct llist_head *curr, *tmp;
  546. + struct ipfix_msg *msg;
  547. + int ret = ULOGD_IRET_OK, sent;
  548. +
  549. + llist_for_each_prev(curr, &priv->list) {
  550. + msg = llist_entry(curr, struct ipfix_msg, link);
  551. +
  552. + sent = send(priv->ufd.fd, ipfix_msg_data(msg), ipfix_msg_len(msg), 0);
  553. + if (sent < 0) {
  554. + ulogd_log(ULOGD_ERROR, "send: %m\n");
  555. + ret = ULOGD_IRET_ERR;
  556. + goto done;
  557. + }
  558. +
  559. + /* TODO handle short send() for other protocols */
  560. + if ((size_t) sent < ipfix_msg_len(msg))
  561. + ulogd_log(ULOGD_ERROR, "short send: %d < %zu\n",
  562. + sent, ipfix_msg_len(msg)); /* size_t needs %zu, not %d */
  563. + }
  564. +
  565. + llist_for_each_safe(curr, tmp, &priv->list) {
  566. + msg = llist_entry(curr, struct ipfix_msg, link);
  567. + llist_del(curr);
  568. + msg->nrecs = 0; /* sent: keeps ipfix_msg_free() from logging lost flows */
  569. + ipfix_msg_free(msg);
  570. + }
  571. +
  572. +done:
  573. + return ret;
  574. +}
  575. +
  576. +/* fd callback: nothing is expected from the collector; notice EOF and drop the fd */
  577. +static int ipfix_ufd_cb(int fd, unsigned what, void *arg)
  578. +{
  579. + struct ulogd_pluginstance *pi = arg;
  580. + struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
  581. + ssize_t nread;
  582. + char buf[16];
  583. +
  584. + if (what & ULOGD_FD_READ) {
  585. + nread = recv(priv->ufd.fd, buf, sizeof(buf), MSG_DONTWAIT);
  586. + if (nread < 0) {
  587. + ulogd_log(ULOGD_ERROR, "recv: %m\n");
  588. + } else if (!nread) {
  589. + ulogd_log(ULOGD_INFO, "connection reset by peer\n");
  590. + ulogd_unregister_fd(&priv->ufd);
  591. + } else
  592. + ulogd_log(ULOGD_INFO, "unexpected data (%zd bytes)\n", nread);
  593. + }
  594. + return 0;
  595. +}
  596. +
  597. +static void ipfix_timer_cb(struct ulogd_timer *t, void *data)
  598. +{
  599. + struct ulogd_pluginstance *pi = data;
  600. + struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
  601. + /* flush a partially filled message rather than wait for it to fill up */
  602. + if (priv->msg && priv->msg->nrecs > 0) {
  603. + enqueue_msg(priv, priv->msg);
  604. + priv->msg = NULL;
  605. + send_msgs(pi);
  606. + }
  607. +}
  608. +
  609. +static int ipfix_configure(struct ulogd_pluginstance *pi, struct ulogd_pluginstance_stack *stack)
  610. +{
  611. + struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
  612. + int oid, port, mtu, ret;
  613. + char *host, *proto;
  614. + char addr[16];
  615. +
  616. + ret = config_parse_file(pi->id, pi->config_kset);
  617. + if (ret < 0)
  618. + return ret;
  619. +
  620. + oid = oid_ce(pi->config_kset).u.value;
  621. + host = host_ce(pi->config_kset).u.string;
  622. + port = port_ce(pi->config_kset).u.value;
  623. + proto = proto_ce(pi->config_kset).u.string;
  624. + mtu = mtu_ce(pi->config_kset).u.value;
  625. +
  626. + if (!oid) {
  627. + ulogd_log(ULOGD_FATAL, "invalid Observation ID\n");
  628. + return ULOGD_IRET_ERR;
  629. + }
  630. + if (!host || !strcmp(host, "")) {
  631. + ulogd_log(ULOGD_FATAL, "no destination host specified\n");
  632. + return ULOGD_IRET_ERR;
  633. + }
  634. +
  635. + if (!strcmp(proto, "udp")) {
  636. + priv->proto = IPPROTO_UDP;
  637. + } else if (!strcmp(proto, "tcp")) {
  638. + priv->proto = IPPROTO_TCP;
  639. + } else {
  640. + ulogd_log(ULOGD_FATAL, "unsupported protocol '%s'\n", proto);
  641. + return ULOGD_IRET_ERR;
  642. + }
  643. +
  644. + memset(&priv->sa, 0, sizeof(priv->sa));
  645. + priv->sa.sin_family = AF_INET;
  646. + priv->sa.sin_port = htons(port);
  647. + ret = inet_pton(AF_INET, host, &priv->sa.sin_addr);
  648. + if (ret <= 0) {
  649. + ulogd_log(ULOGD_FATAL, "inet_pton: %m\n");
  650. + return ULOGD_IRET_ERR;
  651. + }
  652. +
  653. + INIT_LLIST_HEAD(&priv->list);
  654. +
  655. + ulogd_init_timer(&priv->timer, pi, ipfix_timer_cb);
  656. +
  657. + ulogd_log(ULOGD_INFO, "using IPFIX Collector at %s:%d (MTU %d)\n",
  658. + inet_ntop(AF_INET, &priv->sa.sin_addr, addr, sizeof(addr)),
  659. + port, mtu);
  660. +
  661. + return ULOGD_IRET_OK;
  662. +}
  663. +
  664. +static int tcp_connect(struct ulogd_pluginstance *pi)
  665. +{
  666. + struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
  667. + int ret = ULOGD_IRET_ERR;
  668. +
  669. + if ((priv->ufd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
  670. + ulogd_log(ULOGD_FATAL, "socket: %m\n");
  671. + return ULOGD_IRET_ERR;
  672. + }
  673. +
  674. + if (connect(priv->ufd.fd, (struct sockaddr *) &priv->sa, sizeof(priv->sa)) < 0) {
  675. + ulogd_log(ULOGD_ERROR, "connect: %m\n");
  676. + ret = ULOGD_IRET_ERR;
  677. + goto err_close;
  678. + }
  679. +
  680. + return ULOGD_IRET_OK;
  681. +
  682. +err_close:
  683. + close(priv->ufd.fd);
  684. + return ret;
  685. +}
  686. +
  687. +static int udp_connect(struct ulogd_pluginstance *pi)
  688. +{
  689. + struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
  690. +
  691. + if ((priv->ufd.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
  692. + ulogd_log(ULOGD_FATAL, "socket: %m\n");
  693. + return ULOGD_IRET_ERR;
  694. + }
  695. +
  696. + if (connect(priv->ufd.fd, (struct sockaddr *) &priv->sa, sizeof(priv->sa)) < 0) {
  697. + ulogd_log(ULOGD_ERROR, "connect: %m\n");
  698. + close(priv->ufd.fd); /* after the log so %m is intact; mirrors tcp_connect() */
  699. + return ULOGD_IRET_ERR;
  700. + }
  701. + return 0;
  702. +}
  703. +
  704. +/* open the collector socket, register it for reads and arm the flush timer */
  705. +static int ipfix_start(struct ulogd_pluginstance *pi)
  706. +{
  707. + struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
  708. + char addr[16];
  709. + int port, ret;
  710. +
  711. + switch (priv->proto) {
  712. + case IPPROTO_UDP:
  713. + if ((ret = udp_connect(pi)) < 0)
  714. + return ret;
  715. + break;
  716. + case IPPROTO_TCP:
  717. + if ((ret = tcp_connect(pi)) < 0)
  718. + return ret;
  719. + break;
  720. + default:
  721. + return ULOGD_IRET_ERR; /* no socket was opened, nothing to register */
  722. + }
  723. +
  724. + priv->seqno = 0;
  725. + port = port_ce(pi->config_kset).u.value;
  726. + ulogd_log(ULOGD_INFO, "connected to %s:%d\n",
  727. + inet_ntop(AF_INET, &priv->sa.sin_addr, addr, sizeof(addr)),
  728. + port);
  729. +
  730. + /* Register the socket FD */
  731. + priv->ufd.when = ULOGD_FD_READ;
  732. + priv->ufd.cb = ipfix_ufd_cb;
  733. + priv->ufd.data = pi;
  734. +
  735. + if (ulogd_register_fd(&priv->ufd) < 0) {
  736. + close(priv->ufd.fd); /* don't leak the freshly connected socket */
  737. + return ULOGD_IRET_ERR;
  738. + }
  739. +
  740. + /* Add a 1 second timer */
  741. + ulogd_add_timer(&priv->timer, 1);
  742. + return ULOGD_IRET_OK;
  743. +}
  744. +
  745. +static int ipfix_stop(struct ulogd_pluginstance *pi)
  746. +{
  747. + struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
  748. + /* undo ipfix_start(): drop the fd and the timer, then free the open message */
  749. + ulogd_unregister_fd(&priv->ufd);
  750. + close(priv->ufd.fd);
  751. + priv->ufd.fd = -1;
  752. +
  753. + ulogd_del_timer(&priv->timer);
  754. + /* NOTE(review): messages still linked on priv->list are not freed here */
  755. + ipfix_msg_free(priv->msg);
  756. + priv->msg = NULL;
  757. +
  758. + return 0;
  759. +}
  760. +
  761. +/* turn one flow from the input keys into a record, then flush queued messages */
  762. +static int ipfix_interp(struct ulogd_pluginstance *pi)
  763. +{
  764. + struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
  765. + struct vy_ipfix_data *data;
  766. + int oid, mtu, ret;
  767. + char saddr[16], daddr[16];
  768. +
  769. + if (!(GET_FLAGS(pi->input.keys, InIpSaddr) & ULOGD_RETF_VALID))
  770. + return ULOGD_IRET_OK;
  771. +
  772. + oid = oid_ce(pi->config_kset).u.value;
  773. + mtu = mtu_ce(pi->config_kset).u.value;
  774. + if (mtu < (int) (IPFIX_HDRLEN + IPFIX_SET_HDRLEN + sizeof(*data))) {
  775. + /* no record would ever fit: the retry below would spin forever */
  776. + ulogd_log(ULOGD_ERROR, "mtu %d too small for a flow record\n", mtu);
  777. + return ULOGD_IRET_OK;
  778. + }
  779. +
  780. +again:
  781. + if (!priv->msg) {
  782. + priv->msg = ipfix_msg_alloc(mtu, oid);
  783. + if (!priv->msg) {
  784. + /* just drop this flow */
  785. + ulogd_log(ULOGD_ERROR, "out of memory, dropping flow\n");
  786. + return ULOGD_IRET_OK;
  787. + }
  788. + ipfix_msg_add_set(priv->msg, VY_IPFIX_SID);
  789. + }
  790. +
  791. + data = ipfix_msg_add_data(priv->msg, sizeof(struct vy_ipfix_data));
  792. + if (!data) {
  793. + enqueue_msg(priv, priv->msg);
  794. + priv->msg = NULL;
  795. + /* message full: retry on a fresh one, which fits given the mtu check */
  796. + goto again;
  797. + }
  798. +
  799. + /* zero ifi_*, ports, aid, dscp and padding before the selective fill */
  800. + memset(data, 0, sizeof(*data));
  801. + data->saddr.s_addr = ikey_get_u32(&pi->input.keys[InIpSaddr]);
  802. + data->daddr.s_addr = ikey_get_u32(&pi->input.keys[InIpDaddr]);
  803. +
  804. + data->packets = htonl((uint32_t) (ikey_get_u64(&pi->input.keys[InRawInPktCount])
  805. + + ikey_get_u64(&pi->input.keys[InRawOutPktCount])));
  806. + data->bytes = htonl((uint32_t) (ikey_get_u64(&pi->input.keys[InRawInPktLen])
  807. + + ikey_get_u64(&pi->input.keys[InRawOutPktLen])));
  808. + data->start = htonl(ikey_get_u32(&pi->input.keys[InFlowStartSec]));
  809. + data->end = htonl(ikey_get_u32(&pi->input.keys[InFlowEndSec]));
  810. +
  811. + if (GET_FLAGS(pi->input.keys, InL4SPort) & ULOGD_RETF_VALID) {
  812. + data->sport = htons(ikey_get_u16(&pi->input.keys[InL4SPort]));
  813. + data->dport = htons(ikey_get_u16(&pi->input.keys[InL4DPort]));
  814. + }
  815. + if (GET_FLAGS(pi->input.keys, InCtMark) & ULOGD_RETF_VALID)
  816. + data->aid = htonl(ikey_get_u32(&pi->input.keys[InCtMark]));
  817. + data->l4_proto = ikey_get_u8(&pi->input.keys[InIpProto]);
  818. +
  819. + ulogd_log(ULOGD_DEBUG, "Got new packet (packets = %u, bytes = %u, flow = (%u, %u), saddr = %s, daddr = %s, sport = %u, dport = %u)\n",
  820. + ntohl(data->packets), ntohl(data->bytes), ntohl(data->start), ntohl(data->end),
  821. + inet_ntop(AF_INET, &data->saddr.s_addr, saddr, sizeof(saddr)),
  822. + inet_ntop(AF_INET, &data->daddr.s_addr, daddr, sizeof(daddr)),
  823. + ntohs(data->sport), ntohs(data->dport));
  824. +
  825. + if ((ret = send_msgs(pi)) < 0)
  826. + return ret;
  827. + return ULOGD_IRET_OK;
  828. +}
  829. +
  830. +static struct ulogd_plugin ipfix_plugin = {
  831. + .name = "IPFIX",
  832. + .input = {
  833. + .keys = ipfix_in_keys,
  834. + .num_keys = ARRAY_SIZE(ipfix_in_keys),
  835. + .type = ULOGD_DTYPE_PACKET | ULOGD_DTYPE_FLOW | ULOGD_DTYPE_SUM
  836. + },
  837. + .output = {
  838. + .type = ULOGD_DTYPE_SINK
  839. + },
  840. + .config_kset = (struct config_keyset *) &ipfix_kset,
  841. + .priv_size = sizeof(struct ipfix_priv),
  842. + .configure = ipfix_configure,
  843. + .start = ipfix_start,
  844. + .stop = ipfix_stop,
  845. + .interp = ipfix_interp,
  846. + .version = VERSION,
  847. +};
  848. +
  849. +void __attribute__ ((constructor)) init(void);
  850. +
  851. +void init(void)
  852. +{
  853. + ulogd_register_plugin(&ipfix_plugin);
  854. +}
  855. --
  856. cgit v1.2.1