Main Page   Data Structures   File List   Data Fields   Globals  

driver.c

Go to the documentation of this file.
00001 
00076 #include <sys/types.h>
00077 #include <sys/socket.h>
00078 #include <netinet/in.h>
00079 #include <arpa/inet.h>
00080 #include <netdb.h>
00081 
00082 #include <stdlib.h>
00083 #include <stdio.h>
00084 #include <math.h>
00085 #include <unistd.h>
00086 #include <stdint.h>
00087 #include <string.h>
00088 #include <signal.h>
00089 #include <time.h>
00090 #include <pthread.h>
00091 #include <getopt.h>
00092 
00093 #include "nsds_util.h"
00094 #include "flog.h"
00095 
00096 /* Globals */
00098 const int SRV_PORT = 42420;
00099 
00101 const int QUEUE_LENGTH = 1;
00102 
00104 const int NUM_THREADS = 2;
00105 
00107 const size_t DATA_READ_SIZE = 256;
00108 
00110 const time_t RATE_UPDATE_PERIOD = 10;
00111 
00113 bool control_break;
00114 
00116 bool streaming_active;
00117 
00119 typedef struct 
00120 {
00122     int daq_data;  
00124     int nsds_data; 
00125 } data_thread_arg;
00126 
00127 
00128 // ---------------------------------------------------------------------
00138 void sighandler(int signal)
00139 {
00140     char    me[] = "sighandler";
00141 
00142     flog_usr(FLOG_NOTICE, 0, me, "Got signal %d, exiting", signal);
00143     
00144     // New 10/17/02, let pipe errors propagate into socket layer
00145     if(signal == SIGPIPE)
00146     {
00147     flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00148          "Got SIGPIPE, ignoring");
00149     return;
00150     }
00151     
00152     // Signal must have been a break
00153     control_break = true;
00154 
00155     return;
00156 }
00157 
00158 // ---------------------------------------------------------------------
00175 int tcp_send_driverids(const uint16_t control_socket, 
00176                const uint16_t data_socket, 
00177                const char *daq_name)
00178 {
00179     char               me[] = "tcp_send_driverids";
00180     int                rc1, rc2;
00181     char               driver_uniq[128] = "";
00182     char               snd_len = 0;
00183     
00184     // Build unique driver id
00185     sprintf(driver_uniq, "driverid drv-lv-%s-ctr", daq_name);
00186     snd_len = strlen(driver_uniq);
00187     
00188     // Send driver id and so forth
00189     rc1 = tcp_nl_write(control_socket, driver_uniq);
00190 
00191     sprintf(driver_uniq, "driverid drv-lv-%s-data", daq_name);
00192     snd_len += strlen(driver_uniq);
00193     
00194     rc2 = tcp_nl_write(data_socket, driver_uniq);
00195 
00196     // Did it work?
00197     if((rc1 + rc2) == snd_len)
00198     return(0);
00199     else
00200     return(1);
00201 
00202     flog_usr(FLOG_NOLOG, 0, me, "Done");
00203 }
00204 
00205 // ---------------------------------------------------------------------
00217 void *data_thread_main(void *arg)
00218 {
00219     char              me[] = "data_thread_main";
00220     data_thread_arg   *s_args = arg;
00221     char              rcv_buf[MAX_CMD_LEN];
00222     int32_t           byte_count = 0;
00223     int               rc;
00224     data_thread_arg   l_args;
00225     int               rcv_count = 0;
00226     time_t            t_start = time(NULL);
00227     time_t            t_delta = 0;
00228     time_t            t_last = time(NULL);
00229     
00230     
00231     if(s_args == NULL)
00232     {
00233     flog_usr(FLOG_ERROR, FL_ERR_NULL_PTR, me,
00234          "Error - void thread argument pointer!");
00235     return(NULL);
00236     }
00237 
00238     // Save arguments, may be on the stack
00239     memcpy(&l_args, s_args, sizeof(l_args));
00240 
00241     // Blocking calls, so no delays are needed
00242     while((streaming_active) && (!control_break))
00243     {
00244     // Read a byte, write a byte.
00245     /* This is pretty inefficient - if we need to, we can
00246        use fdopen() and get line buffering from the OS.
00247        Test then decide.
00248     */
00249     rcv_count = read(l_args.daq_data, rcv_buf, DATA_READ_SIZE);
00250     if(rcv_count <= 0)
00251     {
00252         flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00253              "Error reading data from DAQ");
00254         goto thread_bail;
00255     }
00256 
00268     // Write data out to NSDS
00269     rc = write(l_args.nsds_data, rcv_buf, rcv_count);
00270     if(rc != rcv_count)
00271     {
00272         flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00273              "Error sending data to NSDS");
00274         goto thread_bail;
00275     }
00276     
00277     byte_count += rcv_count;
00278 
00279     // DDT Statistics
00280     t_delta = time(NULL) - t_last;
00281     if(t_delta >= RATE_UPDATE_PERIOD)
00282     {
00283         t_last = time(NULL);
00284         flog_usr(FLOG_NOTICE, 0, me, 
00285              "Average data rate so far: %f for %d kb",
00286              ((float) (byte_count >> 10) / (float) (time(NULL) - t_start)),
00287              (byte_count >> 10));
00288     }
00289     }
00290 
00291   thread_bail:
00292     // See if can appx the rate
00293     t_delta = time(NULL) - t_start;
00294 
00295     // If less than one second, accept the error in return for not /0
00296     if(t_delta == 0)
00297     t_delta = 1;
00298     
00299     flog_usr(FLOG_NOTICE, 0, me, 
00300          "Thread exiting, %d bytes transferred, appx %f kb/sec", 
00301          byte_count, ((float) (byte_count >> 10) / (float) t_delta));
00302 
00303     // Tell main we're offline
00304     streaming_active = false;
00305     
00306     return(NULL);
00307 }
00308     
00309 // ---------------------------------------------------------------------
00320 int data_thread_startup(data_thread_arg d_arg)
00321 {
00322     char        me[] = "data_thread_startup";
00323     int         rc;
00324     pthread_t   data_thread;
00325 
00326     // Just in case, this should restart thread
00327     streaming_active = false;
00328     sleep(1);
00329     
00330     streaming_active = true;
00331         
00332     rc = pthread_create(&data_thread, NULL, data_thread_main,
00333             (void *) &d_arg);
00334     if(rc != 0)
00335     {
00336     flog_usr(FLOG_ERROR, FL_ERR_SYSTEM, me,
00337          "Error on thread creation");
00338     return(rc);
00339     }
00340     
00341     // Let 'er rip
00342     pthread_detach(data_thread);
00343     pthread_setconcurrency(NUM_THREADS);
00344 
00345     // Give it time to start
00346     usleep(1000);
00347     
00348     flog_usr(FLOG_NOTICE, 0, me, "Data thread started OK");
00349 
00350     return(0);
00351 }
00352 
00353 // ---------------------------------------------------------------------
00367 int cmd_handler(const uint16_t nsds_s_ctrl, 
00368         const uint16_t nsds_s_data,
00369         const uint16_t daq_control,
00370         const uint16_t daq_data, 
00371         const char     *daq_name)
00372 {
00373     char               me[] = "cmd_handler";
00374     char               cmd_buf[MAX_CMD_LEN] = "";
00375     int                bytes_read = 0;
00376     int                rc;
00377     const time_t       normal_timeout = 30;
00378     bool               got_cmd = false;
00379 
00380     /* Loop until:
00381        1) Get a command from the NSDS
00382        2) Get an error on control or data sockets
00383        3) User aborts
00384     */
00385     while(!got_cmd)
00386     {
00387        flog_usr(FLOG_PROGRESS, 0, me, "Waiting for command...");
00388         
00389        // Get command from NSDS control channel, wait 30 secs
00390        bytes_read = tcp_nl_read(nsds_s_ctrl, cmd_buf, normal_timeout);
00391        if(bytes_read < 0)
00392        {
00393        flog_usr(FLOG_WARNING, FL_ERR_TRANSCEIVE, me,
00394             "Error reading command from control channel");
00395        return(1);
00396        }
00397        else if(bytes_read == 0)
00398        {
00399        flog_usr(FLOG_L3BUG, 0, me,
00400             "No command before timeout, checking status");
00401        // Check for dead streaming thread
00402        if(streaming_active == false)
00403        {
00404            flog_usr(FLOG_WARNING, FL_ERR_BAD_VALUE, me,
00405                 "Data thread has died! Remaking connections.");
00406            return(2);
00407        }
00408        }
00409        else // Got command!
00410        {
00411        flog_usr(FLOG_PROGRESS, 0, me, "Command received OK from NSDS");
00412        got_cmd = true;
00413        }
00414     }
00415 
00416     // Here we intercept the welcome command and return our compiled-in
00417     // string (plus cmd-line modifier if present). For this we have 1KLOC of C.
00418     if(strstr(cmd_buf, "Welcome") != NULL)
00419     {
00420     flog_usr(FLOG_L2BUG, 0, me,
00421          "Got welcome string '%s'", cmd_buf);
00422     flog_usr(FLOG_CL2BUG, 0, me,
00423          "Sending driver ident to control and data channels...");
00424 
00425     rc = tcp_send_driverids(nsds_s_ctrl, nsds_s_data, daq_name);
00426     if(rc == 0)
00427     {
00428         flog_usr(FLOG_PROGRESS, 0, me,
00429              "Driver IDs sent OK, command complete");
00430     }
00431     else
00432     {
00433         flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00434              "Error sending driver IDs");
00435     }
00436     
00437     return(rc);
00438     }
00439 
00440     // If it ain't 'Welcome', we forward it as-is, ditto for the reply. We R Netcat.
00441 
00442     flog_usr(FLOG_L2BUG, 0, me, 
00443          "Got request '%s', forwarding to DAQ", cmd_buf);
00444     
00445     rc = tcp_nl_write(daq_control, cmd_buf);
00446     if(rc != strlen(cmd_buf))
00447     {
00448     flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00449          "Error sending command to DAQ");
00450     return(1);
00451     }
00452     
00453     flog_usr(FLOG_L2BUG, 0, me,"Reading reply from DAQ");
00454     
00455     // Get reply from DAQ
00456     bytes_read = tcp_nl_read(daq_control, cmd_buf, normal_timeout);
00457     if(bytes_read <= 0)
00458     {
00459     flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00460          "Error reading DAQ reply");
00461     return(1);
00462     }
00463     
00464     // DDT 
00465     flog_usr(FLOG_L2BUG, 0, me, 
00466          "Got reply '%s', forwarding to NSDS", cmd_buf);
00467     
00468     // Forward to NSDS
00469     rc = tcp_nl_write(nsds_s_ctrl, cmd_buf);
00470     if(rc != bytes_read)
00471     {
00472     flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00473          "Error writing to NSDS control channel");
00474     return(1);
00475     }
00476     
00477     flog_usr(FLOG_L2BUG, 0, me,"Query completed OK");
00478     flog_usr(FLOG_CL2BUG, 0, me, "Done with cmd_handler");
00479     
00480     return(0);
00481 }
00482 
00483 // ---------------------------------------------------------------------
00502 void main_loop(const char *daq_machine, const uint16_t daq_port,
00503            const char *nsds_machine, const uint16_t nsds_port,
00504            const char *driver_name)
00505 {
00506     char               me[] = "main_loop";
00507     int                nsds_s_data, nsds_s_ctrl, daq_s_ctrl, daq_s_data;
00508     int                init_count = 0;
00509     time_t             wait_time = 10;
00510     time_t             end_interval = 3 * wait_time;
00511     int                rc = 0;
00512     static data_thread_arg d_arg;
00513     
00514     
00515     // ------------------------
00516     // Main loop, runs until control-c
00517     while(control_break == false)
00518     {
00519     // If restarting, close all first
00520     if(init_count > 0)
00521     {
00522         flog_usr(FLOG_NOTICE, 0, me,
00523              "Re-init driver, closing connections");
00524 
00525         // Tell data thread to exit, if running
00526         streaming_active = false;
00527 
00528         // Shut down all TCP connections
00529         tcp_close(daq_s_ctrl);
00530         tcp_close(daq_s_data);
00531         
00532         tcp_close(nsds_s_ctrl);
00533         tcp_close(nsds_s_data);
00534 
00535         // Mark as invalid FDs
00536         daq_s_ctrl = daq_s_data = nsds_s_ctrl = nsds_s_data = -1;
00537 
00538         // Let labview finish. Do NOT REMOVE THIS. It's a hack to workaround
00539         // a really stupid LV bug. You Have Been Warned.
00540         sleep(2);
00541     }
00542 
00543     init_count++;
00544 
00545     flog_usr(FLOG_NOTICE, 0, me,
00546          "Beginning connect attempt #%d, DAQ first", init_count);
00547     
00548     // Connect to DAQ control channel, wait forever for same
00549     daq_s_ctrl = tcp_connect_retry(daq_machine, daq_port, 
00550                        wait_time, (time_t) 0,
00551                        "DAQ control port");
00552     // If error, start over
00553     if(daq_s_ctrl <= 0)
00554         continue;
00555     
00556     // Did they hit control-c?
00557     if(control_break == true)
00558         break;
00559 
00560     flog_usr(FLOG_CNOTICE, 0, me,
00561          "Connected to DAQ machine '%s' control channel", 
00562          tcp_peername(daq_s_ctrl));
00563 
00564     // DDT labview likes a bit of a delay between TCP connects
00565     sleep(1);
00566 
00567     // Connect to DAQ data channel, wait usual time
00568     daq_s_data = tcp_connect_retry(daq_machine, daq_port + 1, 
00569                        wait_time, (time(NULL) + end_interval),
00570                        "DAQ data port");
00571     // If error, start over
00572     if(daq_s_data <= 0)
00573         continue;
00574     
00575     // Did they hit control-c?
00576     if(control_break == true)
00577         break;
00578 
00579     flog_usr(FLOG_CNOTICE, 0, me,
00580          "Connected to DAQ machine '%s' data channel", 
00581          tcp_peername(daq_s_ctrl));
00582 
00583     // Connect to NSDS control, end at now + 30 secs
00584     nsds_s_ctrl = tcp_connect_retry(nsds_machine, nsds_port,
00585                     wait_time, 
00586                     (time(NULL) + end_interval),
00587                     "NSDS control port");
00588     if(nsds_s_ctrl <= 0)
00589     {
00590         flog_usr(FLOG_NOTICE, 0, me, "No NSDS control connection, restarting");
00591         continue;
00592     }
00593     if(control_break == true)
00594         break;
00595 
00596     flog_usr(FLOG_CNOTICE, 0, me,
00597          "Connected to NSDS control channel on %s",
00598          tcp_peername(nsds_s_ctrl));
00599 
00600     // ---------------
00601     // Second TCP connection to NSDS - data channel, wait 30 secs
00602     //  note use of adjacent TCP port for connection!
00603     nsds_s_data = tcp_connect_retry(nsds_machine, (nsds_port + 1),
00604                     wait_time, 
00605                     (time(NULL) + end_interval),
00606                     "NSDS data port");
00607     if(nsds_s_data <= 0)
00608     {
00609         flog_usr(FLOG_NOTICE, 0, me, 
00610              "No NSDS data connection, restarting");
00611         continue;
00612     }
00613     if(control_break == true)
00614         break;
00615 
00616     flog_usr(FLOG_CNOTICE, 0, me,
00617          "Connected to NSDS data channel on %s", 
00618          tcp_peername(nsds_s_data));
00619 
00620     flog_usr(FLOG_PROGRESS, 0, me,
00621          "All network connections in place, proceeding");
00622     
00623     // New 10/21/02, start data thread now and let it run
00624     flog_usr(FLOG_PROGRESS, 0, me, "Starting data thread");
00625     
00626     d_arg.nsds_data = nsds_s_data;
00627     d_arg.daq_data = daq_s_data;
00628     
00629     rc = data_thread_startup(d_arg);
00630     if(rc != 0)
00631     {
00632         flog_usr(FLOG_ERROR, FL_ERR_SYSTEM, me,
00633              "Error on data thread creation!");
00634         return;
00635     }
00636     
00637     // Do work (read commands and respond) until error
00638     do
00639     {
00640         rc = cmd_handler(nsds_s_ctrl, nsds_s_data, daq_s_ctrl, 
00641                  daq_s_data, driver_name);
00642     } while(rc == 0);
00643     }
00644     
00645     return;
00646 }
00647     
00648 // ---------------------------------------------------------------------
00661 int main(int argc, char *argv[])
00662 {
00663     char          me[] = "main";
00664     char          daq_machine[512] = "";
00665     char          nsds_machine[512] = "";
00666     const char    daq_predef[] = "localhost";
00667     const int     daq_preport = 55055;
00668     const char    nsds_predef[] = "localhost";
00669     const int     nsds_preport = 42420;
00670     char          driver_name[512] = "";
00671     int           rc, idx;
00672     int           help_flag = 0;
00673     uint16_t      daq_port = daq_preport;
00674     uint16_t      nsds_port = nsds_preport;
00675     struct option long_options[] = 
00676     {
00677         {"daq_port", required_argument, 0, 'd'},
00678         {"daq_machine", required_argument, 0, 'm'},
00679         {"nsds_port", required_argument, 0, 'n'},
00680         {"nsds_machine", required_argument, 0, 'l'},
00681         {"help", no_argument, &help_flag, 'h'},
00682         {"driver_name", required_argument, 0, 'x'},
00683         {0, 0, 0, 0}
00684     };
00685         
00686 
00687     // Need to set logging options before any printouts
00688     flog_set_report(FLOG_L3BUG, FLOG_QUIET, FLOG_QUIET);
00689     flog_set_style(0x2019, 0x0038, 0x6008);    
00690 
00691     // Set defaults
00692     strcpy(nsds_machine, nsds_predef);
00693     strcpy(daq_machine, daq_predef);
00694     
00695     daq_port = daq_preport;
00696     nsds_port = nsds_preport;
00697     
00698 
00699     // Check command line
00700     while(1)
00701     {
00702     rc = getopt_long(argc, argv, "d:m:n:l:x:h",
00703              long_options, &idx);
00704     if(rc == -1)
00705         break;
00706     
00707     switch(rc)
00708     {
00709       case 0: // Flag set, just bail
00710         break;
00711         
00712       case 'd': // daq_port
00713         daq_port = atoi(optarg);
00714         flog_usr(FLOG_NOTICE, 0, me, 
00715              "Setting DAQ port to %d", daq_port);
00716         break;
00717 
00718       case 'm': // Daq machine name
00719         strcpy(daq_machine, optarg);
00720         flog_usr(FLOG_NOTICE, 0, me, 
00721              "Set DAQ machine name to '%s'", daq_machine);
00722         break;
00723         
00724       case 'n': // NSDS port
00725         nsds_port = atoi(optarg);
00726         flog_usr(FLOG_NOTICE, 0, me, 
00727              "Set NSDS port to %d", nsds_port);
00728         break;
00729         
00730       case 'l': // NSDS machine name
00731         strcpy(nsds_machine, optarg);
00732         flog_usr(FLOG_NOTICE, 0, me,
00733              "Set NSDS machine name to '%s'", nsds_machine);
00734         break;
00735         
00736       case 'x': // Driver name
00737         strcpy(driver_name, optarg);
00738         flog_usr(FLOG_NOTICE, 0, me, 
00739              "Set driver name to '%s'", driver_name);
00740         break;
00741         
00742       default:
00743       case 'h': // Help!
00744         help_flag = 1;
00745         break;
00746     }
00747     }
00748     
00749     if(help_flag)
00750     {
00751     flog_usr(FLOG_NOTICE, 0, me, 
00752          "Options are --daq_port, --daq_machine,");
00753     flog_usr(FLOG_CNOTICE, 0, me, 
00754          "--nsds_port, --nsds_machine, --driver_name");
00755     flog_usr(FLOG_CNOTICE, 0, me,
00756          "All have arguments, either  hostname or TCP port number");
00757     flog_usr(FLOG_CNOTICE, 0, me,
00758          "driver_name is for multiple instances per host");
00759     exit(1);
00760     }
00761     
00762     flog_usr(FLOG_NOTICE, 0, me, "Installing signal handler.");
00763 
00764     control_break = false;
00765     signal(SIGINT, sighandler);
00766     signal(SIGPIPE, sighandler);
00767 
00768     // Set driver name for NSDS if necessary
00769     if(driver_name[0] == 0x00)
00770     strcpy(driver_name, daq_machine);
00771 
00772     // Call main routine
00773     main_loop(daq_machine, daq_port, nsds_machine, nsds_port, driver_name);
00774     
00775     flog_usr(FLOG_NOTICE, 0, me, "Done OK");
00776     
00777     return(0);
00778 }

Generated on Fri Dec 6 14:33:16 2002 for NSDS Driver by doxygen1.2.18