00001
00013 #include <sys/types.h>
00014 #include <sys/socket.h>
00015 #include <netinet/in.h>
00016 #include <arpa/inet.h>
00017 #include <netdb.h>
00018
00019 #include <assert.h>
00020 #include <stdlib.h>
00021 #include <stdio.h>
00022 #include <math.h>
00023 #include <unistd.h>
00024 #include <stdint.h>
00025 #include <string.h>
00026 #include <signal.h>
00027 #include <time.h>
00028 #include <pthread.h>
00029 #include <getopt.h>
00030
00031 #include <sys/termios.h>
00032 #include <fcntl.h>
00033
00034 #include "nsds_util.h"
00035 #include "flog.h"
00036 #include "adxl_util.h"
00037
00038
00039
00040
00042 const int DAQ_PORT = 55050;
00043
00045 bool streaming_active;
00046
00048 const int TCP_Q_LEN = 1;
00049
00051 struct
00052 {
00053 pthread_mutex_t mutex;
00054 bool active[NUM_ADXL_CHANNELS];
00055 int num_active;
00056 } chan_struct;
00057
00059 typedef struct
00060 {
00061 char *serial_port;
00062 int socket_fd;
00063 int delay_usec;
00064 } thread_params_t;
00065
00066
00067
00074 int data_active_count(void)
00075 {
00076 char me[] = "data_active_count";
00077 int rc;
00078 int count;
00079
00080
00081 rc = pthread_mutex_lock(&chan_struct.mutex);
00082 if(rc != 0)
00083 {
00084 flog_usr(FLOG_ERROR, FL_ERR_MUTEX_FAILURE, me,
00085 "Error locking mutex!");
00086 return(-1);
00087 }
00088
00089
00090 count = chan_struct.num_active;
00091
00092 rc = pthread_mutex_unlock(&chan_struct.mutex);
00093 if(rc != 0)
00094 flog_usr(FLOG_ERROR, FL_ERR_MUTEX_FAILURE, me,
00095 "Error unlocking mutex!");
00096
00097 return(count);
00098 }
00099
00100
00112 bool data_channel_enabled(const int channel_id)
00113 {
00114 char me[] = "data_channel_enabled";
00115 int rc;
00116 bool is_act = false;
00117
00118
00119 if((channel_id < 0) || (channel_id >= NUM_ADXL_CHANNELS))
00120 {
00121 flog_usr(FLOG_ERROR, FL_ERR_BAD_PARAM, me,
00122 "Error, invalid channel ID %d", channel_id);
00123 return(false);
00124 }
00125
00126
00127 rc = pthread_mutex_lock(&chan_struct.mutex);
00128 if(rc != 0)
00129 {
00130 flog_usr(FLOG_ERROR, FL_ERR_MUTEX_FAILURE, me,
00131 "Error locking mutex!");
00132 return(false);
00133 }
00134
00135 is_act = chan_struct.active[channel_id];
00136
00137 rc = pthread_mutex_unlock(&chan_struct.mutex);
00138 if(rc != 0)
00139 flog_usr(FLOG_ERROR, FL_ERR_MUTEX_FAILURE, me,
00140 "Error unlocking mutex!");
00141
00142 return(is_act);
00143 }
00144
00145
00159 int data_channel_flag(const int channel_id, const bool subscribe)
00160 {
00161 char me[] = "data_channel_flag";
00162 int rc;
00163 bool old_setting;
00164
00165
00166 if((channel_id < 0) || (channel_id >= NUM_ADXL_CHANNELS))
00167 {
00168 flog_usr(FLOG_ERROR, FL_ERR_BAD_PARAM, me,
00169 "Error, invalid channel ID %d", channel_id);
00170 return(-1);
00171 }
00172
00173
00174 rc = pthread_mutex_lock(&chan_struct.mutex);
00175 if(rc != 0)
00176 {
00177 flog_usr(FLOG_ERROR, FL_ERR_MUTEX_FAILURE, me,
00178 "Error locking mutex!");
00179 return(rc);
00180 }
00181
00182
00183 old_setting = chan_struct.active[channel_id];
00184
00185
00186 chan_struct.active[channel_id] = subscribe;
00187
00188 if(subscribe != old_setting)
00189 {
00190 if(subscribe)
00191 chan_struct.num_active++;
00192 else
00193 chan_struct.num_active--;
00194
00195 if(chan_struct.num_active < 0)
00196 {
00197 flog_usr(FLOG_ERROR, FL_ERR_NOCANDO, me,
00198 "Underflow in active channel count; code error!");
00199 chan_struct.num_active = 0;
00200 }
00201
00202 if(chan_struct.num_active > NUM_ADXL_CHANNELS)
00203 {
00204 flog_usr(FLOG_ERROR, FL_ERR_NOCANDO, me,
00205 "Overflow in active channel count; code error!");
00206 chan_struct.num_active = (NUM_ADXL_CHANNELS - 1);
00207 }
00208 }
00209
00210 rc = pthread_mutex_unlock(&chan_struct.mutex);
00211 if(rc != 0)
00212 {
00213 flog_usr(FLOG_ERROR, FL_ERR_MUTEX_FAILURE, me,
00214 "Error unlocking mutex!");
00215 return(rc);
00216 }
00217
00218
00219 if((subscribe) && (old_setting != subscribe))
00220 {
00221 flog_usr(FLOG_L2BUG, 0, me,
00222 "Channel %d marked as active", channel_id);
00223 }
00224 else if(old_setting != subscribe)
00225 {
00226 flog_usr(FLOG_L2BUG, 0, me,
00227 "Channel %d marked as inactive", channel_id);
00228 }
00229
00230 return(0);
00231 }
00232
00233
00234
00245 void *daq_thread_main(void *arg)
00246 {
00247 char me[] = "daq_thread_main";
00248 thread_params_t *params = (thread_params_t *) arg;
00249 int data_socket;
00250 uint32_t idx = 0;
00251 uint32_t data_idx = 0;
00252 uint32_t byte_count = 0;
00253 char data_buf[1024] = "";
00254 char datum_buf[ADXL_DATUM_LEN] = "";
00255 int rc;
00256 time_t t_start, t_delta;
00257 int COMM_FD = -1;
00258 adxl_reading_t data;
00259
00260
00261
00262 if(params == NULL)
00263 {
00264 flog_usr(FLOG_ERROR, FL_ERR_NULL_PTR, me, "Null pointer to thread!");
00265 return(NULL);
00266 }
00267
00268
00269 COMM_FD = adxl_open(params->serial_port);
00270 if(COMM_FD <= 0)
00271 {
00272 flog_usr(FLOG_ERROR, FL_ERR_UNKNOWN, me,
00273 "Unable to open ADXL serial port");
00274 return(NULL);
00275 }
00276
00277
00278 data_socket = params->socket_fd;
00279
00280 flog_usr(FLOG_NOTICE, 0, me, "Data thread running");
00281
00282 t_start = time(NULL);
00283
00284
00285 while((streaming_active) && (!control_break))
00286 {
00287
00288 rc = adxl_read(COMM_FD, &data);
00289 if(rc != 0)
00290 {
00291 flog_usr(FLOG_ERROR, FL_ERR_UNKNOWN, me,
00292 "Error %d reading from ADXL", rc);
00293 goto thread_bailout;
00294 }
00295
00296
00297 if(data_active_count() <= 0)
00298 continue;
00299
00300
00301 strcpy(data_buf, gen_timestamp());
00302
00303
00304 for(idx = 0; idx < NUM_ADXL_CHANNELS; idx++)
00305 {
00306 if(data_channel_enabled(idx))
00307 {
00308
00309 sprintf(datum_buf, "\t%d\t%6.5f",
00310 idx, data.G[idx]);
00311
00312
00313 strcat(data_buf, datum_buf);
00314 }
00315 }
00316
00317
00318 rc = tcp_nl_write(data_socket, data_buf);
00319 if(rc != strlen(data_buf))
00320 {
00321 flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00322 "Error sending data, short write");
00323 goto thread_bailout;
00324 }
00325
00326
00327 byte_count += rc;
00328 data_idx++;
00329
00330
00331
00332
00333
00334 if(params->delay_usec > 0)
00335 usleep(params->delay_usec);
00336 }
00337
00338 thread_bailout:
00339
00340
00341 t_delta = time(NULL) - t_start;
00342 if(t_delta == 0)
00343 t_delta = 1;
00344
00345 flog_usr(FLOG_NOTICE, 0, me,
00346 "Data thread exiting, %d points sent", data_idx + 1);
00347 flog_usr(FLOG_CNOTICE, 0, me,
00348 "appx %3.1f kb/sec and %5.1f samples/sec",
00349 (float) (byte_count >> 10) / t_delta,
00350 (data_idx + 1.0) / (float) t_delta);
00351
00352
00353 streaming_active = false;
00354
00355 adxl_close(COMM_FD);
00356
00357 return(NULL);
00358 }
00359
00360
00361
00375 int daq_do_work(const int control_socket, const int data_socket,
00376 const thread_params_t *thread_params)
00377 {
00378 char me[] = "daq_do_work";
00379 int rc, bytes_read, idx;
00380 time_t req_timeout = 0;
00381 char cmd_buf[MAX_CMD_LEN] = "";
00382 char daq_stopped[] = "stopped";
00383 char daq_running[] = "streaming";
00384 char *daq_status = NULL;
00385 char wtf[] = "Unknown command";
00386 char strm_ok[] = "Streaming data on data channel from port";
00387 char strm_stop[] = "Stopping data on data channel from port";
00388 pthread_t data_thread;
00389 char *cptr = NULL;
00390 char *s_ptr = NULL;
00391 char reply_buf[128] = "";
00392 int chan_id;
00393 char tok_buf[MAX_CMD_LEN] = "";
00394 char channel_list[NUM_ADXL_CHANNELS * 5] = "";
00395
00396
00397 flog_usr(FLOG_PROGRESS, 0, me, "Waiting for request (indefinite wait)...");
00398
00399
00400 memset(cmd_buf, 0x00, MAX_CMD_LEN);
00401 memset(channel_list, 0x00, NUM_ADXL_CHANNELS * 4);
00402
00403
00404 for(idx = 0; idx < NUM_ADXL_CHANNELS; idx++)
00405 {
00406 if(idx < (NUM_ADXL_CHANNELS - 1))
00407 sprintf(reply_buf, "%d,", idx);
00408 else
00409 sprintf(reply_buf, "%d", idx);
00410
00411 strcat(channel_list, reply_buf);
00412 }
00413
00414
00415 assert(strlen(channel_list) < ((NUM_ADXL_CHANNELS * 4) - 1));
00416
00417
00418 bytes_read = tcp_nl_read(control_socket, cmd_buf, req_timeout);
00419 if(bytes_read <= 0)
00420 {
00421 flog_usr(FLOG_NOTICE, FL_ERR_TRANSCEIVE, me,
00422 "Error on request read");
00423 return(1);
00424 }
00425
00426
00427 if(strstr(cmd_buf, "list-channels") != NULL)
00428 {
00429 flog_usr(FLOG_PROGRESS, 0, me,
00430 "Got request for channel list");
00431
00432
00433 rc = tcp_nl_write(control_socket, channel_list);
00434 if(rc != strlen(channel_list))
00435 {
00436 flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00437 "Error writing status response on control channel");
00438 return(2);
00439 }
00440 }
00441 else if(strstr(cmd_buf, "daq-status") != NULL)
00442 {
00443 if(streaming_active == true)
00444 daq_status = daq_running;
00445 else
00446 daq_status = daq_stopped;
00447
00448 flog_usr(FLOG_L2BUG, 0, me,
00449 "Got status request '%s' OK, responding with '%s'",
00450 cmd_buf, daq_status);
00451
00452 rc = tcp_nl_write(control_socket, daq_status);
00453 if(rc != strlen(daq_status))
00454 {
00455 flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00456 "Error writing status response on control channel");
00457 return(2);
00458 }
00459 }
00460 else if(strstr(cmd_buf, "open-port") != NULL)
00461 {
00462
00463 strcpy(tok_buf, cmd_buf);
00464
00465
00466 cptr = strtok_r(tok_buf, CMD_BUF_DELIM_CHARS, &s_ptr);
00467 cptr = strtok_r(NULL, CMD_BUF_DELIM_CHARS, &s_ptr);
00468
00469 if(cptr == NULL)
00470 {
00471 flog_usr(FLOG_ERROR, FL_ERR_BAD_PARAM, me,
00472 "Invalid data port open, channel ID not found");
00473
00474
00475 tcp_nl_write(control_socket, BAD_PORT);
00476 return(-1);
00477 }
00478
00479
00480 chan_id = atoi(cptr);
00481
00482 flog_usr(FLOG_L2BUG, 0, me,
00483 "Got data subscription request for channel %d", chan_id);
00484
00485
00486 if(streaming_active == false)
00487 {
00488 flog_usr(FLOG_CL2BUG, 0, me, "Starting data thread");
00489
00490
00491 streaming_active = true;
00492
00493
00494 for(idx = 0; idx < NUM_ADXL_CHANNELS; idx++)
00495 data_channel_flag(idx, false);
00496
00497
00498 rc = data_channel_flag(chan_id, true);
00499 if(rc != 0)
00500 {
00501 flog_usr(FLOG_ERROR, FL_ERR_BAD_PARAM, me,
00502 "Error %d activating channel %d", rc, chan_id);
00503 return(rc);
00504 }
00505
00506
00507 rc = pthread_create(&data_thread, NULL, daq_thread_main,
00508 (void *) thread_params);
00509 if(rc != 0)
00510 {
00511 flog_usr(FLOG_ERROR, FL_ERR_SYSTEM, me,
00512 "Error on thread creation!");
00513 return(3);
00514 }
00515
00516
00517 pthread_detach(data_thread);
00518 pthread_setconcurrency(2);
00519
00520 flog_usr(FLOG_CL2BUG, 0, me, "Data thread started OK");
00521 }
00522 else
00523 {
00524 flog_usr(FLOG_CL2BUG, 0, me, "adding to active list");
00525
00526
00527 rc = data_channel_flag(chan_id, true);
00528 if(rc != 0)
00529 {
00530 flog_usr(FLOG_ERROR, FL_ERR_BAD_PARAM, me,
00531 "Error %d activating channel %d", rc, chan_id);
00532 return(rc);
00533 }
00534 }
00535
00536
00537 sprintf(reply_buf, "%s %d", strm_ok, chan_id);
00538
00539
00540 rc = tcp_nl_write(control_socket, reply_buf);
00541 if(rc != strlen(reply_buf))
00542 {
00543 flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00544 "Error writing response on control channel");
00545 return(2);
00546 }
00547 }
00548 else if(strstr(cmd_buf, "close-port") != NULL)
00549 {
00550
00551 strcpy(tok_buf, cmd_buf);
00552
00553
00554 cptr = strtok_r(tok_buf, CMD_BUF_DELIM_CHARS, &s_ptr);
00555 cptr = strtok_r(NULL, CMD_BUF_DELIM_CHARS, &s_ptr);
00556
00557 if(cptr == NULL)
00558 {
00559 flog_usr(FLOG_ERROR, FL_ERR_BAD_PARAM, me,
00560 "Invalid data port close, channel ID not found");
00561 tcp_nl_write(control_socket, BAD_PORT);
00562 return(-1);
00563 }
00564
00565
00566 chan_id = atoi(cptr);
00567
00568 flog_usr(FLOG_NOTICE, 0, me,
00569 "Got port close request on channel %d", chan_id);
00570
00571
00572 rc = data_channel_flag(chan_id, false);
00573
00574
00575 if(data_active_count() <= 0)
00576 streaming_active = false;
00577
00578
00579 sprintf(reply_buf, "%s %d", strm_stop, chan_id);
00580
00581
00582 rc = tcp_nl_write(control_socket, reply_buf);
00583 if(rc != strlen(reply_buf))
00584 {
00585 flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00586 "Error writing response on control channel");
00587 return(2);
00588 }
00589 }
00590 else
00591 {
00592 flog_usr(FLOG_ERROR, FL_ERR_NOCANDO, me,
00593 "Got unknown command '%s'", cmd_buf);
00594 rc = tcp_nl_write(control_socket, wtf);
00595 if(rc != strlen(wtf))
00596 {
00597 flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00598 "Error writing response on control channel");
00599 return(2);
00600 }
00601 }
00602
00603
00604 fsync(control_socket);
00605
00606 flog_usr(FLOG_L2BUG, 0, me, "done");
00607
00608 return(0);
00609 }
00610
00611
00624 void daq_main_loop(const int daq_port,
00625 const char *serial_portname, const int sample_delay)
00626 {
00627 char me[] = "daq_main_loop";
00628 int ctrl_master, data_master;
00629 int control_socket, data_socket;
00630 struct sockaddr_in fsin;
00631 int addr_len = sizeof(fsin);
00632 bool do_init = false;
00633 int cmd_count = 0;
00634 thread_params_t thread_params;
00635
00636
00637 ctrl_master = tcp_socket_make(daq_port, TCP_Q_LEN);
00638 if(ctrl_master <= 0)
00639 return;
00640
00641
00642 data_master = tcp_socket_make(daq_port + 1, TCP_Q_LEN);
00643 if(data_master <= 0)
00644 return;
00645
00646
00647
00648 while(control_break == false)
00649 {
00650
00651 if(do_init == true)
00652 {
00653 flog_usr(FLOG_PROGRESS, 0, me,
00654 "Re-init DAQ, closing connection");
00655
00656 tcp_close(control_socket);
00657 tcp_close(data_socket);
00658
00659
00660 streaming_active = false;
00661 }
00662 else
00663 do_init = true;
00664
00665
00666
00667 flog_usr(FLOG_L1BUG, 0, me, "Waiting for driver control connection...");
00668
00669 control_socket = accept(ctrl_master,
00670 (struct sockaddr *) &fsin, &addr_len);
00671
00672 flog_usr(FLOG_CL1BUG, 0, me,
00673 "Driver '%s' connected to control channel",
00674 tcp_peername(control_socket));
00675
00676 flog_usr(FLOG_L1BUG, 0, me, "Waiting for driver data connection...");
00677
00678 data_socket = accept(data_master,
00679 (struct sockaddr *) &fsin, &addr_len);
00680
00681
00682 flog_usr(FLOG_CL1BUG, 0, me,
00683 "Driver '%s' connected to data channel",
00684 tcp_peername(data_socket));
00685
00686
00687 thread_params.serial_port = (char *) serial_portname;
00688 thread_params.socket_fd = data_socket;
00689 thread_params.delay_usec = sample_delay;
00690
00691
00692
00693
00694
00695 while(daq_do_work(control_socket, data_socket, &thread_params) == 0)
00696 {
00697 flog_usr(FLOG_L2BUG, 0, me,
00698 "Command #%d completed OK", cmd_count++);
00699 }
00700 }
00701
00702 return;
00703 }
00704
00705
00719 int main(int argc, char *argv[])
00720 {
00721 char me[] = "main";
00722 uint16_t tcp_port = 0;
00723 int help_flag = 0;
00724 int idx, rc;
00725 char def_serial[] = "/dev/ttyS1";
00726 int sample_rate = 0;
00727 char *serial_portname = def_serial;
00728 uint32_t sample_delay = 0;
00729 struct option long_options[] =
00730 {
00731 {"port", required_argument, 0, 'p'},
00732 {"adxl_port", required_argument, 0, 'a'},
00733 {"rate", required_argument, 0, 'r'},
00734 {"help", no_argument, &help_flag, 'h'},
00735 {0, 0, 0, 0}
00736 };
00737
00738
00739 tcp_port = DAQ_PORT;
00740 sample_rate = 0;
00741
00742
00743
00744 flog_set_report(FLOG_L3BUG, FLOG_QUIET, FLOG_QUIET);
00745 flog_set_style(0x2019, 0x0038, 0x6008);
00746
00747
00748 while(1)
00749 {
00750 rc = getopt_long(argc, argv, "p:r:a:h",
00751 long_options, &idx);
00752 if(rc == -1)
00753 break;
00754
00755 switch(rc)
00756 {
00757 case 0:
00758
00759 break;
00760
00761 case 'p':
00762 tcp_port = atoi(optarg);
00763 flog_usr(FLOG_NOTICE, 0, me,
00764 "Setting TCP server port to %d", tcp_port);
00765 break;
00766
00767 case 'a':
00768 serial_portname = optarg;
00769 flog_usr(FLOG_NOTICE, 0, me, "Serial port set to '%s'",
00770 serial_portname);
00771 break;
00772
00773 case 'r':
00774 sample_rate = atoi(optarg);
00775
00776 flog_usr(FLOG_NOTICE, 0, me,
00777 "Setting sample rate to %d Hz", sample_rate);
00778 break;
00779
00780 case '?':
00781
00782 break;
00783
00784 case 'h':
00785 help_flag = 1;
00786 break;
00787
00788 default:
00789 flog_usr(FLOG_ERROR, FL_ERR_SYSTEM, me,
00790 "Unreachable case in getopt_long parse");
00791 exit(1);
00792 }
00793 }
00794
00795 if(help_flag != 0)
00796 {
00797 print_args(long_options);
00798 return(1);
00799 }
00800
00801
00802 if(sample_rate > 0)
00803 {
00804 if(sample_rate > ADXL_MAX_RATE_HZ)
00805 {
00806 flog_usr(FLOG_NOTICE, FL_ERR_BAD_PARAM, me,
00807 "Sample rate %d Hz too high - setting to device max %d",
00808 sample_rate, ADXL_MAX_RATE_HZ);
00809
00810 sample_rate = 0;
00811 }
00812
00813
00814 sample_delay = 1000000 / sample_rate;
00815 }
00816 else
00817 sample_delay = 0;
00818
00819
00820 flog_usr(FLOG_NOTICE, 0, me,
00821 "Compiled for %d data channels max", NUM_ADXL_CHANNELS);
00822
00823
00824 flog_usr(FLOG_NOTICE, 0, me, "Installing signal handler");
00825 control_break = false;
00826 signal(SIGINT, adxl_sighandler);
00827 signal(SIGPIPE, adxl_sighandler);
00828
00829
00830 daq_main_loop(tcp_port, serial_portname, sample_delay);
00831
00832
00833 flog_usr(FLOG_NOTICE, 0, me, "Done");
00834 return(0);
00835 }