| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405 |
- --- a/lib/ipfix.c
- +++ b/lib/ipfix.c
- @@ -37,6 +37,9 @@ $$LIC$$
- #ifdef SCTPSUPPORT
- #include <netinet/sctp.h>
- #endif
- +#ifndef NOTHREADS
- +#include <pthread.h>
- +#endif
- #include <fcntl.h>
- #include <netdb.h>
-
- @@ -123,6 +126,18 @@ static uint16_t g_lasttid;
- static ipfix_datarecord_t g_data = { NULL, NULL, 0 }; /* ipfix_export */
-
- static ipfix_field_t *g_ipfix_fields;
- +#ifndef NOTHREADS
- +static pthread_mutex_t g_mutex;
- +#define mod_lock() { \
- + if ( pthread_mutex_lock( &g_mutex ) !=0 ) \
- + mlogf( 0, "[ipfix] mutex_lock() failed: %s\n", \
- + strerror( errno ) ); \
- + }
- +#define mod_unlock() { pthread_mutex_unlock( &g_mutex ); }
- +#else
- +#define mod_lock()
- +#define mod_unlock()
- +#endif
-
- /*----- prototypes -------------------------------------------------------*/
-
- @@ -133,6 +148,7 @@ int _ipfix_send_message( ipfix_t *ifh,
- ipfix_message_t *message );
- int _ipfix_write_msghdr( ipfix_t *ifh, ipfix_message_t *msg, iobuf_t *buf );
- void _ipfix_disconnect( ipfix_collector_t *col );
- +int _ipfix_export_flush( ipfix_t *ifh );
-
-
- /* name : do_writeselect
- @@ -576,16 +592,18 @@ int ipfix_decode_float( void *in, void *
-
- int ipfix_snprint_float( char *str, size_t size, void *data, size_t len )
- {
- - float tmp32;
- - double tmp64;
- + uint32_t tmp32;
- + uint64_t tmp64;
-
- switch ( len ) {
- case 4:
- - ipfix_decode_float( data, &tmp32, 4);
- - return snprintf( str, size, "%f", tmp32 );
- + memcpy( &tmp32, data, len );
- + tmp32 = htonl( tmp32 );
- + return snprintf( str, size, "%f", (float)tmp32 );
- case 8:
- - ipfix_decode_float( data, &tmp64, 8);
- - return snprintf( str, size, "%lf", tmp64);
- + memcpy( &tmp64, data, len );
- + tmp64 = HTONLL( tmp64 );
- + return snprintf( str, size, "%lf", (double)tmp64 );
- default:
- break;
- }
- @@ -682,12 +700,19 @@ int ipfix_get_eno_ieid( char *field, int
- * parameters:
- * remarks: init module, read field type info.
- */
- -int ipfix_init ( void )
- +int ipfix_init( void )
- {
- if ( g_tstart ) {
- ipfix_cleanup();
- }
-
- +#ifndef NOTHREADS
- + if ( pthread_mutex_init( &g_mutex, NULL ) !=0 ) {
- + mlogf( 0, "[ipfix] pthread_mutex_init() failed: %s\n",
- + strerror(errno) );
- + return -1;
- + }
- +#endif
- g_tstart = time(NULL);
- signal( SIGPIPE, SIG_IGN );
- g_lasttid = 255;
- @@ -806,6 +831,9 @@ void ipfix_cleanup ( void )
- g_data.maxfields = 0;
- g_data.lens = NULL;
- g_data.addrs = NULL;
- +#ifndef NOTHREADS
- + (void)pthread_mutex_destroy( &g_mutex );
- +#endif
- }
-
- int _ipfix_connect ( ipfix_collector_t *col )
- @@ -1465,7 +1493,7 @@ int _ipfix_write_template( ipfix_t
- default:
- /* check space */
- if ( tsize+ifh->offset > IPFIX_DEFAULT_BUFLEN ) {
- - if ( ipfix_export_flush( ifh ) < 0 )
- + if ( _ipfix_export_flush( ifh ) < 0 )
- return -1;
- if ( tsize+ifh->offset > IPFIX_DEFAULT_BUFLEN )
- return -1;
- @@ -1474,6 +1502,8 @@ int _ipfix_write_template( ipfix_t
- /* write template prior to data */
- if ( ifh->offset > 0 ) {
- memmove( ifh->buffer + tsize, ifh->buffer, ifh->offset );
- + if ( ifh->cs_tid )
- + ifh->cs_header += tsize;
- }
-
- buf = ifh->buffer;
- @@ -1615,8 +1645,11 @@ int ipfix_open( ipfix_t **ipfixh, int so
- return -1;
- }
- node->ifh = i;
- +
- + mod_lock();
- node->next = g_ipfixlist;
- g_ipfixlist = node;
- + mod_unlock();
-
- *ipfixh = i;
- return 0;
- @@ -1633,7 +1666,8 @@ void ipfix_close( ipfix_t *h )
- {
- ipfix_node_t *l, *n;
-
- - ipfix_export_flush( h );
- + mod_lock();
- + _ipfix_export_flush( h );
-
- while( h->collectors )
- _ipfix_drop_collector( (ipfix_collector_t**)&h->collectors );
- @@ -1659,6 +1693,7 @@ void ipfix_close( ipfix_t *h )
- #endif
- free(h->buffer);
- free(h);
- + mod_unlock();
- }
- }
-
- @@ -2156,6 +2191,22 @@ void ipfix_release_template( ipfix_t *if
- ipfix_delete_template( ifh, templ );
- }
-
- +static void _finish_cs( ipfix_t *ifh )
- +{
- + size_t buflen;
- + uint8_t *buf;
- +
- + /* finish current dataset */
- + if ( (buf=ifh->cs_header) ==NULL )
- + return;
- + buflen = 0;
- + INSERTU16( buf+buflen, buflen, ifh->cs_tid );
- + INSERTU16( buf+buflen, buflen, ifh->cs_bytes );
- + ifh->cs_bytes = 0;
- + ifh->cs_header = NULL;
- + ifh->cs_tid = 0;
- +}
- +
- int ipfix_export( ipfix_t *ifh, ipfix_template_t *templ, ... )
- {
- int i;
- @@ -2199,13 +2250,14 @@ int ipfix_export( ipfix_t *ifh, ipfix_te
- g_data.addrs, g_data.lens );
- }
-
- -int ipfix_export_array( ipfix_t *ifh,
- - ipfix_template_t *templ,
- - int nfields,
- - void **fields,
- - uint16_t *lengths )
- +static int
- +_ipfix_export_array( ipfix_t *ifh,
- + ipfix_template_t *templ,
- + int nfields,
- + void **fields,
- + uint16_t *lengths )
- {
- - int i;
- + int i, newset_f=0;
- size_t buflen, datasetlen;
- uint8_t *p, *buf;
-
- @@ -2249,7 +2301,19 @@ int ipfix_export_array( ipfix_t
-
- /** get size of data set, check space
- */
- - for ( i=0, datasetlen=4; i<nfields; i++ ) {
- + if ( templ->tid == ifh->cs_tid ) {
- + newset_f = 0;
- + datasetlen = 0;
- + }
- + else {
- + if ( ifh->cs_tid > 0 ) {
- + _finish_cs( ifh );
- + }
- + newset_f = 1;
- + datasetlen = 4;
- + }
- +
- + for ( i=0; i<nfields; i++ ) {
- if ( templ->fields[i].flength == IPFIX_FT_VARLEN ) {
- if ( lengths[i]>254 )
- datasetlen += 3;
- @@ -2263,21 +2327,29 @@ int ipfix_export_array( ipfix_t
- }
- datasetlen += lengths[i];
- }
- - if ( ((ifh->offset + datasetlen) > IPFIX_DEFAULT_BUFLEN )
- - && (ipfix_export_flush( ifh ) <0) ) {
- - return -1;
- +
- + if ( (ifh->offset + datasetlen) > IPFIX_DEFAULT_BUFLEN ) {
- + if ( ifh->cs_tid )
- + _finish_cs( ifh );
- + newset_f = 1;
- +
- + if ( _ipfix_export_flush( ifh ) <0 )
- + return -1;
- }
-
- - /* fill buffer
- - */
- + /* fill buffer */
- buf = (uint8_t*)(ifh->buffer) + ifh->offset;
- buflen = 0;
-
- - /* insert data set
- - */
- - ifh->nrecords ++;
- - INSERTU16( buf+buflen, buflen, templ->tid );
- - INSERTU16( buf+buflen, buflen, datasetlen );
- + if ( newset_f ) {
- + /* insert data set
- + */
- + ifh->cs_bytes = 0;
- + ifh->cs_header = buf;
- + ifh->cs_tid = templ->tid;
- + INSERTU16( buf+buflen, buflen, templ->tid );
- + INSERTU16( buf+buflen, buflen, 4 );
- + }
-
- /* insert data record
- */
- @@ -2303,7 +2375,9 @@ int ipfix_export_array( ipfix_t
- buflen += lengths[i];
- }
-
- + ifh->nrecords ++;
- ifh->offset += buflen;
- + ifh->cs_bytes += buflen;
- if ( ifh->version == IPFIX_VERSION )
- ifh->seqno ++;
- return 0;
- @@ -2313,7 +2387,7 @@ int ipfix_export_array( ipfix_t
- * parameters:
- * remarks: rewrite this func!
- */
- -int ipfix_export_flush( ipfix_t *ifh )
- +int _ipfix_export_flush( ipfix_t *ifh )
- {
- iobuf_t *buf;
- ipfix_collector_t *col;
- @@ -2322,8 +2396,14 @@ int ipfix_export_flush( ipfix_t *ifh )
- if ( (ifh==NULL) || (ifh->offset==0) )
- return 0;
-
- - if ( (buf=_ipfix_getbuf()) ==NULL )
- + if ( ifh->cs_tid > 0 ) {
- + /* finish current dataset */
- + _finish_cs( ifh );
- + }
- +
- + if ( (buf=_ipfix_getbuf()) ==NULL ) {
- return -1;
- + }
-
- #ifdef DEBUG
- mlogf( 0, "[ipfix_export_flush] msg has %d records, %d bytes\n",
- @@ -2350,3 +2430,30 @@ int ipfix_export_flush( ipfix_t *ifh )
- _ipfix_freebuf( buf );
- return ret;
- }
- +
- +int ipfix_export_array( ipfix_t *ifh,
- + ipfix_template_t *templ,
- + int nfields,
- + void **fields,
- + uint16_t *lengths )
- +{
- + int ret;
- +
- + mod_lock();
- + ret = _ipfix_export_array( ifh, templ, nfields, fields, lengths );
- + mod_unlock();
- +
- + return ret;
- +}
- +
- +int ipfix_export_flush( ipfix_t *ifh )
- +{
- + int ret;
- +
- + mod_lock();
- + ret = _ipfix_export_flush( ifh );
- + mod_unlock();
- +
- + return ret;
- +}
- +
- --- a/lib/ipfix.h
- +++ b/lib/ipfix.h
- @@ -142,6 +142,12 @@ typedef struct
- int nrecords; /* no. of records in buffer */
- size_t offset; /* output buffer fill level */
- uint32_t seqno; /* sequence no. of next message */
- +
- + /* experimental */
- + int cs_tid; /* template id of current dataset */
- + int cs_bytes; /* size of current set */
- + uint8_t *cs_header; /* start of current set */
- +
- } ipfix_t;
-
- /** exporter funcs
- --- a/lib/ipfix_col.c
- +++ b/lib/ipfix_col.c
- @@ -907,7 +907,7 @@ int ipfix_decode_datarecord( ipfixt_node
- return 0;
- }
-
- -static void do_free_datarecord( ipfix_datarecord_t *data )
- +void ipfix_free_datarecord( ipfix_datarecord_t *data )
- {
- if ( data ) {
- if ( data->addrs )
- @@ -925,6 +925,7 @@ int ipfix_parse_msg( ipfix_input_t *inpu
- ipfix_hdr_t hdr; /* ipfix packet header */
- ipfixs_node_t *s;
- ipfix_datarecord_t data = { NULL, NULL, 0 };
- + ipfixe_node_t *e;
- uint8_t *buf; /* ipfix payload */
- uint16_t setid, setlen; /* set id, set lenght */
- int i, nread, offset; /* counter */
- @@ -1042,6 +1043,12 @@ int ipfix_parse_msg( ipfix_input_t *inpu
- err_flag = 1;
- }
- else {
- + for ( e=g_exporter; e!=NULL; e=e->next ) {
- + if ( e->elem->export_dset )
- + (void) e->elem->export_dset( t, buf+nread, setlen,
- + e->elem->data );
- + }
- +
- /** read data records
- */
- for ( offset=nread, bytesleft=setlen; bytesleft>4; ) {
- @@ -1076,11 +1083,11 @@ int ipfix_parse_msg( ipfix_input_t *inpu
- goto errend;
-
- end:
- - do_free_datarecord( &data );
- + ipfix_free_datarecord( &data );
- return nread;
-
- errend:
- - do_free_datarecord( &data );
- + ipfix_free_datarecord( &data );
- return -1;
- }
-
- @@ -1093,7 +1100,7 @@ void process_client_tcp( int fd, int mas
- tcp_conn_t *tcon = (tcp_conn_t*)data;
- char *func = "process_client_tcp";
-
- - mlogf( 3, "[%s] fd %d mask %d called.\n", func, fd, mask );
- + mlogf( 4, "[%s] fd %d mask %d called.\n", func, fd, mask );
-
- /** read ipfix header
- */
- --- a/lib/ipfix_col.h
- +++ b/lib/ipfix_col.h
- @@ -88,6 +88,7 @@ typedef struct ipfix_col_info
- int (*export_newsource)(ipfixs_node_t*,void*);
- int (*export_newmsg)(ipfixs_node_t*,ipfix_hdr_t*,void*);
- int (*export_trecord)(ipfixs_node_t*,ipfixt_node_t*,void*);
- + int (*export_dset)(ipfixt_node_t*,uint8_t*,size_t,void*);
- int (*export_drecord)(ipfixs_node_t*,ipfixt_node_t*,
- ipfix_datarecord_t*,void*);
- void (*export_cleanup)(void*);
- --- a/lib/ipfix_col_files.c
- +++ b/lib/ipfix_col_files.c
- @@ -68,7 +68,7 @@ static int export_newsource_file( ipfixs
- return -1;
- }
- snprintf( s->fname+strlen(s->fname), PATH_MAX-strlen(s->fname),
- - "/%u", s->odid );
- + "/%u", (unsigned int)s->odid );
- if ( (access( s->fname, R_OK ) <0 )
- && (mkdir( s->fname, S_IRWXU ) <0) ) {
- mlogf( 0, "[%s] cannot access dir '%s': %s\n",
|