00001
00029 #include <sys/types.h>
00030 #include <sys/socket.h>
00031 #include <netinet/in.h>
00032 #include <arpa/inet.h>
00033 #include <netdb.h>
00034
00035 #include <assert.h>
00036 #include <stdlib.h>
00037 #include <stdio.h>
00038 #include <math.h>
00039 #include <unistd.h>
00040 #include <stdint.h>
00041 #include <string.h>
00042 #include <signal.h>
00043 #include <time.h>
00044 #include <pthread.h>
00045 #include <getopt.h>
00046
00047 #include "nsds_util.h"
00048 #include "flog.h"
00049
00050
00051
00052
/*
 * Compile-time configuration.
 */

/** Default control-channel TCP port; main() uses it unless a port option
 *  is supplied on the command line. */
const int DAQ_PORT = 55055;

/** Second argument handed to tcp_socket_make() for both listening sockets
 *  (presumably the listen backlog -- confirm against nsds_util). */
const int TCP_Q_LEN = 1;

/** Number of entries in the data thread's precomputed sine table, i.e. the
 *  number of samples that make up one full period. */
const int SINE_PERIOD = 100;

/*
 * Run-time state shared by the main thread, the data thread and the
 * signal handler.
 */

/** Set to true by sighandler(); polled by the accept loop and by the data
 *  thread as their shutdown request. */
bool control_break;

/** Requested sample rate in Hz; assigned in main() and read by the data
 *  thread when it starts. */
int sample_rate;

/** True while the data thread is expected to keep streaming; cleared to ask
 *  it to stop, and cleared by the thread itself when it exits. */
bool streaming_active;
00070
00072 struct
00073 {
00074 pthread_mutex_t mutex;
00075 bool active[NUM_CHANNELS];
00076 int num_active;
00077 } chan_struct;
00078
00079
00085 int data_active_count(void)
00086 {
00087 char me[] = "data_active_count";
00088 int rc;
00089 int count;
00090
00091
00092 rc = pthread_mutex_lock(&chan_struct.mutex);
00093 if(rc != 0)
00094 {
00095 flog_usr(FLOG_ERROR, FL_ERR_MUTEX_FAILURE, me,
00096 "Error locking mutex!");
00097 return(-1);
00098 }
00099
00100
00101 count = chan_struct.num_active;
00102
00103 rc = pthread_mutex_unlock(&chan_struct.mutex);
00104 if(rc != 0)
00105 flog_usr(FLOG_ERROR, FL_ERR_MUTEX_FAILURE, me,
00106 "Error unlocking mutex!");
00107
00108 return(count);
00109 }
00110
00111
00122 bool data_channel_enabled(const int channel_id)
00123 {
00124 char me[] = "data_channel_enabled";
00125 int rc;
00126 bool is_act = false;
00127
00128
00129 if((channel_id < 0) || (channel_id >= NUM_CHANNELS))
00130 {
00131 flog_usr(FLOG_ERROR, FL_ERR_BAD_PARAM, me,
00132 "Error, invalid channel ID %d", channel_id);
00133 return(false);
00134 }
00135
00136
00137 rc = pthread_mutex_lock(&chan_struct.mutex);
00138 if(rc != 0)
00139 {
00140 flog_usr(FLOG_ERROR, FL_ERR_MUTEX_FAILURE, me,
00141 "Error locking mutex!");
00142 return(false);
00143 }
00144
00145 is_act = chan_struct.active[channel_id];
00146
00147 rc = pthread_mutex_unlock(&chan_struct.mutex);
00148 if(rc != 0)
00149 flog_usr(FLOG_ERROR, FL_ERR_MUTEX_FAILURE, me,
00150 "Error unlocking mutex!");
00151
00152 return(is_act);
00153 }
00154
00155
00168 int data_channel_flag(const int channel_id, const bool subscribe)
00169 {
00170 char me[] = "data_channel_flag";
00171 int rc;
00172 bool old_setting;
00173
00174
00175 if((channel_id < 0) || (channel_id >= NUM_CHANNELS))
00176 {
00177 flog_usr(FLOG_ERROR, FL_ERR_BAD_PARAM, me,
00178 "Error, invalid channel ID %d", channel_id);
00179 return(-1);
00180 }
00181
00182
00183 rc = pthread_mutex_lock(&chan_struct.mutex);
00184 if(rc != 0)
00185 {
00186 flog_usr(FLOG_ERROR, FL_ERR_MUTEX_FAILURE, me,
00187 "Error locking mutex!");
00188 return(rc);
00189 }
00190
00191
00192 old_setting = chan_struct.active[channel_id];
00193
00194
00195 chan_struct.active[channel_id] = subscribe;
00196
00197 if(subscribe != old_setting)
00198 {
00199 if(subscribe)
00200 chan_struct.num_active++;
00201 else
00202 chan_struct.num_active--;
00203
00204 if(chan_struct.num_active < 0)
00205 {
00206 flog_usr(FLOG_ERROR, FL_ERR_NOCANDO, me,
00207 "Underflow in active channel count; code error!");
00208 chan_struct.num_active = 0;
00209 }
00210
00211 if(chan_struct.num_active > NUM_CHANNELS)
00212 {
00213 flog_usr(FLOG_ERROR, FL_ERR_NOCANDO, me,
00214 "Overflow in active channel count; code error!");
00215 chan_struct.num_active = (NUM_CHANNELS - 1);
00216 }
00217 }
00218
00219 rc = pthread_mutex_unlock(&chan_struct.mutex);
00220 if(rc != 0)
00221 {
00222 flog_usr(FLOG_ERROR, FL_ERR_MUTEX_FAILURE, me,
00223 "Error unlocking mutex!");
00224 return(rc);
00225 }
00226
00227
00228 if((subscribe) && (old_setting != subscribe))
00229 {
00230 flog_usr(FLOG_L2BUG, 0, me,
00231 "Channel %d marked as active", channel_id);
00232 }
00233 else if(old_setting != subscribe)
00234 {
00235 flog_usr(FLOG_L2BUG, 0, me,
00236 "Channel %d marked as inactive", channel_id);
00237 }
00238
00239 return(0);
00240 }
00241
00242
00250 double data_generate(const int index, const double sin_period)
00251 {
00252 double t_arg = 0.0;
00253 double t_per = 0.0;
00254
00255
00256 assert(sin_period != 0.0);
00257
00258
00259 t_per = (double) index / sin_period;
00260
00261
00262 t_arg = t_per * 2.0 * M_PI;
00263
00264 return(sin(t_arg));
00265 }
00266
00267
00275 void sighandler(int signal)
00276 {
00277 char me[] = "sighandler";
00278
00279 flog_usr(FLOG_NOTICE, 0, me, "Got signal %d, exiting", signal);
00280 control_break = true;
00281
00282 return;
00283 }
00284
00285
00294 void *daq_thread_main(void *arg)
00295 {
00296 char me[] = "daq_thread_main";
00297 int *iptr = (int *) arg;
00298 int data_socket;
00299 uint32_t idx = 0;
00300 uint32_t data_idx = 0;
00301 uint32_t byte_count = 0;
00302 char *data_buf = NULL;
00303 char datum_buf[DATUM_LEN] = "";
00304 int datum_idx = 0;
00305 int bufsize;
00306 double data[SINE_PERIOD];
00307 int rc;
00308 int sample_delay = 50000;
00309 time_t t_start, t_delta;
00310
00311
00312
00313 if(iptr == NULL)
00314 {
00315 flog_usr(FLOG_ERROR, FL_ERR_NULL_PTR, me, "Null pointer to thread!");
00316 return(NULL);
00317 }
00318
00319
00320 bufsize = (sizeof(char) * NUM_CHANNELS * (TSTAMP_LEN + DATUM_LEN + 4));
00321
00322
00323 data_buf = (char *) malloc(bufsize);
00324 if(data_buf == NULL)
00325 {
00326 flog_usr(FLOG_ERROR, FL_ERR_NO_MEMORY, me,
00327 "Malloc failure on %d bytes!", bufsize);
00328
00329
00330 streaming_active = false;
00331
00332 return(NULL);
00333 }
00334
00335
00336 for(idx = 0; idx < SINE_PERIOD; idx++)
00337 data[idx] = data_generate(idx, SINE_PERIOD);
00338
00339
00340
00341 if((sample_rate > 0) && (sample_rate < 100000))
00342 {
00343
00344 sample_delay = 1000000 / sample_rate;
00345 }
00346
00347
00348 data_socket = *iptr;
00349
00350 flog_usr(FLOG_NOTICE, 0, me, "Data thread running, %d Hz, %d usec delay",
00351 sample_rate, sample_delay);
00352
00353 t_start = time(NULL);
00354
00355
00356 while((streaming_active) && (!control_break))
00357 {
00358
00359 if(data_active_count() <= 0)
00360 {
00361 usleep(sample_delay);
00362 continue;
00363 }
00364
00365
00366 memset(data_buf, 0x00, bufsize);
00367
00368
00369 strcpy(data_buf, gen_timestamp());
00370
00371
00372 for(idx = 0; idx < NUM_CHANNELS; idx++)
00373 {
00374 if(data_channel_enabled(idx))
00375 {
00376
00377
00378 datum_idx++;
00379
00380
00381
00382 sprintf(datum_buf, "\t%d\t%f", idx, data[datum_idx]);
00383
00384
00385 strcat(data_buf, datum_buf);
00386 }
00387 }
00388
00389
00390 rc = tcp_nl_write(data_socket, data_buf);
00391 if(rc != strlen(data_buf))
00392 {
00393 flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00394 "Error sending data, short write");
00395 goto thread_bailout;
00396 }
00397
00398
00399 assert(rc <= bufsize);
00400
00401
00402 byte_count += rc;
00403
00404
00405
00406 }
00407
00408 thread_bailout:
00409
00410
00411 t_delta = time(NULL) - t_start;
00412 if(t_delta == 0)
00413 t_delta = 1;
00414
00415 flog_usr(FLOG_NOTICE, 0, me,
00416 "Data thread exiting, %d points sent, appx %f kb/sec",
00417 data_idx + 1, ((float) (byte_count >> 10) / t_delta));
00418
00419
00420 streaming_active = false;
00421
00422 free(data_buf);
00423
00424 return(NULL);
00425 }
00426
00427
00428
00439 int daq_do_work(const int control_socket, const int data_socket)
00440 {
00441 char me[] = "daq_do_work";
00442 int rc, bytes_read, idx;
00443 time_t req_timeout = 0;
00444 char cmd_buf[MAX_CMD_LEN] = "";
00445 char daq_stopped[] = "stopped";
00446 char daq_running[] = "streaming";
00447 char *daq_status = NULL;
00448 char wtf[] = "Unknown command";
00449 char strm_ok[] = "Streaming data on data channel from port";
00450 char strm_stop[] = "Stopping data on data channel from port";
00451 pthread_t data_thread;
00452 char *cptr = NULL;
00453 char *s_ptr = NULL;
00454 char reply_buf[128] = "";
00455 int chan_id;
00456 char tok_buf[MAX_CMD_LEN] = "";
00457 char channel_list[NUM_CHANNELS * 5] = "";
00458
00459
00460 flog_usr(FLOG_PROGRESS, 0, me, "Waiting for request (indefinite wait)...");
00461
00462
00463 memset(cmd_buf, 0x00, MAX_CMD_LEN);
00464 memset(channel_list, 0x00, NUM_CHANNELS * 4);
00465
00466
00467 for(idx = 0; idx < NUM_CHANNELS; idx++)
00468 {
00469 if(idx < (NUM_CHANNELS - 1))
00470 sprintf(reply_buf, "%d,", idx);
00471 else
00472 sprintf(reply_buf, "%d", idx);
00473
00474 strcat(channel_list, reply_buf);
00475 }
00476
00477
00478 assert(strlen(channel_list) < ((NUM_CHANNELS * 4) - 1));
00479
00480
00481 bytes_read = tcp_nl_read(control_socket, cmd_buf, req_timeout);
00482 if(bytes_read <= 0)
00483 {
00484 flog_usr(FLOG_NOTICE, FL_ERR_TRANSCEIVE, me,
00485 "Error on request read");
00486 return(1);
00487 }
00488
00489
00490 if(strstr(cmd_buf, "list-channels") != NULL)
00491 {
00492 flog_usr(FLOG_PROGRESS, 0, me,
00493 "Got request for channel list");
00494
00495
00496 rc = tcp_nl_write(control_socket, channel_list);
00497 if(rc != strlen(channel_list))
00498 {
00499 flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00500 "Error writing status response on control channel");
00501 return(2);
00502 }
00503 }
00504 else if(strstr(cmd_buf, "daq-status") != NULL)
00505 {
00506 if(streaming_active == true)
00507 daq_status = daq_running;
00508 else
00509 daq_status = daq_stopped;
00510
00511 flog_usr(FLOG_L2BUG, 0, me,
00512 "Got status request '%s' OK, responding with '%s'",
00513 cmd_buf, daq_status);
00514
00515 rc = tcp_nl_write(control_socket, daq_status);
00516 if(rc != strlen(daq_status))
00517 {
00518 flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00519 "Error writing status response on control channel");
00520 return(2);
00521 }
00522 }
00523 else if(strstr(cmd_buf, "open-port") != NULL)
00524 {
00525
00526 strcpy(tok_buf, cmd_buf);
00527
00528
00529 cptr = strtok_r(tok_buf, CMD_BUF_DELIM_CHARS, &s_ptr);
00530 cptr = strtok_r(NULL, CMD_BUF_DELIM_CHARS, &s_ptr);
00531
00532 if(cptr == NULL)
00533 {
00534 flog_usr(FLOG_ERROR, FL_ERR_BAD_PARAM, me,
00535 "Invalid data port open, channel ID not found");
00536
00537
00538 tcp_nl_write(control_socket, BAD_PORT);
00539 return(-1);
00540 }
00541
00542
00543 chan_id = atoi(cptr);
00544
00545 flog_usr(FLOG_L2BUG, 0, me,
00546 "Got data subscription request for channel %d", chan_id);
00547
00548 if(streaming_active == false)
00549 {
00550 flog_usr(FLOG_CL2BUG, 0, me, "Starting data thread");
00551
00552
00553 streaming_active = true;
00554
00555
00556 for(idx = 0; idx < NUM_CHANNELS; idx++)
00557 data_channel_flag(idx, false);
00558
00559
00560 rc = data_channel_flag(chan_id, true);
00561 if(rc != 0)
00562 {
00563 flog_usr(FLOG_ERROR, FL_ERR_BAD_PARAM, me,
00564 "Error %d activating channel %d", rc, chan_id);
00565 return(rc);
00566 }
00567
00568
00569 rc = pthread_create(&data_thread, NULL, daq_thread_main,
00570 (void *) &data_socket);
00571 if(rc != 0)
00572 {
00573 flog_usr(FLOG_ERROR, FL_ERR_SYSTEM, me,
00574 "Error on thread creation!");
00575 return(3);
00576 }
00577
00578
00579 pthread_detach(data_thread);
00580 pthread_setconcurrency(2);
00581
00582 flog_usr(FLOG_CL2BUG, 0, me, "Data thread started OK");
00583 }
00584 else
00585 {
00586 flog_usr(FLOG_CL2BUG, 0, me, "adding to active list");
00587
00588
00589 rc = data_channel_flag(chan_id, true);
00590 if(rc != 0)
00591 {
00592 flog_usr(FLOG_ERROR, FL_ERR_BAD_PARAM, me,
00593 "Error %d activating channel %d", rc, chan_id);
00594 return(rc);
00595 }
00596 }
00597
00598
00599 sprintf(reply_buf, "%s %d", strm_ok, chan_id);
00600
00601
00602 rc = tcp_nl_write(control_socket, reply_buf);
00603 if(rc != strlen(reply_buf))
00604 {
00605 flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00606 "Error writing response on control channel");
00607 return(2);
00608 }
00609 }
00610 else if(strstr(cmd_buf, "close-port") != NULL)
00611 {
00612
00613 strcpy(tok_buf, cmd_buf);
00614
00615
00616 cptr = strtok_r(tok_buf, CMD_BUF_DELIM_CHARS, &s_ptr);
00617 cptr = strtok_r(NULL, CMD_BUF_DELIM_CHARS, &s_ptr);
00618
00619 if(cptr == NULL)
00620 {
00621 flog_usr(FLOG_ERROR, FL_ERR_BAD_PARAM, me,
00622 "Invalid data port close, channel ID not found");
00623 tcp_nl_write(control_socket, BAD_PORT);
00624 return(-1);
00625 }
00626
00627
00628 chan_id = atoi(cptr);
00629
00630 flog_usr(FLOG_NOTICE, 0, me,
00631 "Got port close request on channel %d", chan_id);
00632
00633
00634 rc = data_channel_flag(chan_id, false);
00635
00636
00637 if(data_active_count() <= 0)
00638 streaming_active = false;
00639
00640
00641 sprintf(reply_buf, "%s %d", strm_stop, chan_id);
00642
00643
00644 rc = tcp_nl_write(control_socket, reply_buf);
00645 if(rc != strlen(reply_buf))
00646 {
00647 flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00648 "Error writing response on control channel");
00649 return(2);
00650 }
00651 }
00652 else
00653 {
00654 flog_usr(FLOG_ERROR, FL_ERR_NOCANDO, me,
00655 "Got unknown command '%s'", cmd_buf);
00656 rc = tcp_nl_write(control_socket, wtf);
00657 if(rc != strlen(wtf))
00658 {
00659 flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00660 "Error writing response on control channel");
00661 return(2);
00662 }
00663 }
00664
00665
00666 fsync(control_socket);
00667
00668 flog_usr(FLOG_L2BUG, 0, me, "done");
00669
00670 return(0);
00671 }
00672
00673
00674
00684 void daq_main_loop(const int daq_port)
00685 {
00686 char me[] = "daq_main_loop";
00687 int ctrl_master, data_master;
00688 int control_socket, data_socket;
00689 struct sockaddr_in fsin;
00690 int addr_len = sizeof(fsin);
00691 bool do_init = false;
00692 int cmd_count = 0;
00693
00694
00695 ctrl_master = tcp_socket_make(daq_port, TCP_Q_LEN);
00696 if(ctrl_master <= 0)
00697 return;
00698
00699
00700 data_master = tcp_socket_make(daq_port + 1, TCP_Q_LEN);
00701 if(data_master <= 0)
00702 return;
00703
00704
00705
00706 while(control_break == false)
00707 {
00708
00709 if(do_init == true)
00710 {
00711 flog_usr(FLOG_PROGRESS, 0, me,
00712 "Re-init DAQ, closing connection");
00713
00714 tcp_close(control_socket);
00715 tcp_close(data_socket);
00716
00717
00718 streaming_active = false;
00719 }
00720 else
00721 do_init = true;
00722
00723
00724
00725 flog_usr(FLOG_L1BUG, 0, me, "Waiting for NSDS control connection...");
00726
00727 control_socket = accept(ctrl_master,
00728 (struct sockaddr *) &fsin, &addr_len);
00729
00730 flog_usr(FLOG_CL1BUG, 0, me,
00731 "Driver '%s' connected to control channel",
00732 tcp_peername(control_socket));
00733
00734 flog_usr(FLOG_L1BUG, 0, me, "Waiting for NSDS data connection...");
00735
00736 data_socket = accept(data_master,
00737 (struct sockaddr *) &fsin, &addr_len);
00738
00739
00740 flog_usr(FLOG_CL1BUG, 0, me,
00741 "Driver '%s' connected to data channel",
00742 tcp_peername(data_socket));
00743
00744
00745
00746
00747 while(daq_do_work(control_socket, data_socket) == 0)
00748 {
00749 flog_usr(FLOG_L2BUG, 0, me,
00750 "Command #%d completed OK", cmd_count++);
00751 }
00752 }
00753
00754 return;
00755 }
00756
00757
00771 int main(int argc, char *argv[])
00772 {
00773 char me[] = "main";
00774 uint16_t daq_port = 0;
00775 int help_flag = 0;
00776 int idx, rc;
00777
00778 struct option long_options[] =
00779 {
00780 {"port", required_argument, 0, 'p'},
00781 {"rate", required_argument, 0, 'r'},
00782 {"help", no_argument, &help_flag, 'h'},
00783 {0, 0, 0, 0}
00784 };
00785
00786
00787 daq_port = DAQ_PORT;
00788 sample_rate = 20;
00789
00790
00791 flog_set_report(FLOG_L3BUG, FLOG_QUIET, FLOG_QUIET);
00792 flog_set_style(0x2019, 0x0038, 0x6008);
00793
00794
00795 while(1)
00796 {
00797 rc = getopt_long(argc, argv, "p:r:h",
00798 long_options, &idx);
00799 if(rc == -1)
00800 break;
00801
00802 switch(rc)
00803 {
00804 case 0:
00805
00806 break;
00807
00808 case 'p':
00809 daq_port = atoi(optarg);
00810 flog_usr(FLOG_NOTICE, 0, me,
00811 "Setting port to %d", daq_port);
00812 break;
00813
00814 case 'r':
00815 sample_rate = atoi(optarg);
00816
00817 flog_usr(FLOG_NOTICE, 0, me,
00818 "Setting sample rate to %d Hz", sample_rate);
00819 break;
00820
00821 case '?':
00822
00823 break;
00824
00825 case 'h':
00826 help_flag = 1;
00827 break;
00828
00829 default:
00830 flog_usr(FLOG_ERROR, FL_ERR_SYSTEM, me,
00831 "Unreachable case in getopt_long parse");
00832 exit(1);
00833 }
00834 }
00835
00836 if(help_flag != 0)
00837 {
00838 flog_usr(FLOG_NOTICE, 0, me, "Options are -port and / or -rate");
00839 flog_usr(FLOG_CNOTICE, 0, me,
00840 "Corresponding to driver TCP port and sample rate");
00841 return(1);
00842 }
00843
00844 flog_usr(FLOG_NOTICE, 0, me,
00845 "Compiled for %d data channels max", NUM_CHANNELS);
00846
00847
00848 flog_usr(FLOG_NOTICE, 0, me, "Installing signal handler");
00849 control_break = false;
00850 signal(SIGINT, sighandler);
00851 signal(SIGPIPE, sighandler);
00852
00853
00854 daq_main_loop(daq_port);
00855
00856
00857 flog_usr(FLOG_NOTICE, 0, me, "Done");
00858 return(0);
00859 }