00001
00076 #include <sys/types.h>
00077 #include <sys/socket.h>
00078 #include <netinet/in.h>
00079 #include <arpa/inet.h>
00080 #include <netdb.h>
00081
00082 #include <stdlib.h>
00083 #include <stdio.h>
00084 #include <math.h>
00085 #include <unistd.h>
00086 #include <stdint.h>
00087 #include <string.h>
00088 #include <signal.h>
00089 #include <time.h>
00090 #include <pthread.h>
00091 #include <getopt.h>
00092
00093 #include "nsds_util.h"
00094 #include "flog.h"
00095
00096
00098 const int SRV_PORT = 42420;
00099
00101 const int QUEUE_LENGTH = 1;
00102
00104 const int NUM_THREADS = 2;
00105
00107 const size_t DATA_READ_SIZE = 256;
00108
00110 const time_t RATE_UPDATE_PERIOD = 10;
00111
00113 bool control_break;
00114
00116 bool streaming_active;
00117
00119 typedef struct
00120 {
00122 int daq_data;
00124 int nsds_data;
00125 } data_thread_arg;
00126
00127
00128
00138 void sighandler(int signal)
00139 {
00140 char me[] = "sighandler";
00141
00142 flog_usr(FLOG_NOTICE, 0, me, "Got signal %d, exiting", signal);
00143
00144
00145 if(signal == SIGPIPE)
00146 {
00147 flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00148 "Got SIGPIPE, ignoring");
00149 return;
00150 }
00151
00152
00153 control_break = true;
00154
00155 return;
00156 }
00157
00158
00175 int tcp_send_driverids(const uint16_t control_socket,
00176 const uint16_t data_socket,
00177 const char *daq_name)
00178 {
00179 char me[] = "tcp_send_driverids";
00180 int rc1, rc2;
00181 char driver_uniq[128] = "";
00182 char snd_len = 0;
00183
00184
00185 sprintf(driver_uniq, "driverid drv-lv-%s-ctr", daq_name);
00186 snd_len = strlen(driver_uniq);
00187
00188
00189 rc1 = tcp_nl_write(control_socket, driver_uniq);
00190
00191 sprintf(driver_uniq, "driverid drv-lv-%s-data", daq_name);
00192 snd_len += strlen(driver_uniq);
00193
00194 rc2 = tcp_nl_write(data_socket, driver_uniq);
00195
00196
00197 if((rc1 + rc2) == snd_len)
00198 return(0);
00199 else
00200 return(1);
00201
00202 flog_usr(FLOG_NOLOG, 0, me, "Done");
00203 }
00204
00205
00217 void *data_thread_main(void *arg)
00218 {
00219 char me[] = "data_thread_main";
00220 data_thread_arg *s_args = arg;
00221 char rcv_buf[MAX_CMD_LEN];
00222 int32_t byte_count = 0;
00223 int rc;
00224 data_thread_arg l_args;
00225 int rcv_count = 0;
00226 time_t t_start = time(NULL);
00227 time_t t_delta = 0;
00228 time_t t_last = time(NULL);
00229
00230
00231 if(s_args == NULL)
00232 {
00233 flog_usr(FLOG_ERROR, FL_ERR_NULL_PTR, me,
00234 "Error - void thread argument pointer!");
00235 return(NULL);
00236 }
00237
00238
00239 memcpy(&l_args, s_args, sizeof(l_args));
00240
00241
00242 while((streaming_active) && (!control_break))
00243 {
00244
00245
00246
00247
00248
00249 rcv_count = read(l_args.daq_data, rcv_buf, DATA_READ_SIZE);
00250 if(rcv_count <= 0)
00251 {
00252 flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00253 "Error reading data from DAQ");
00254 goto thread_bail;
00255 }
00256
00268
00269 rc = write(l_args.nsds_data, rcv_buf, rcv_count);
00270 if(rc != rcv_count)
00271 {
00272 flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00273 "Error sending data to NSDS");
00274 goto thread_bail;
00275 }
00276
00277 byte_count += rcv_count;
00278
00279
00280 t_delta = time(NULL) - t_last;
00281 if(t_delta >= RATE_UPDATE_PERIOD)
00282 {
00283 t_last = time(NULL);
00284 flog_usr(FLOG_NOTICE, 0, me,
00285 "Average data rate so far: %f for %d kb",
00286 ((float) (byte_count >> 10) / (float) (time(NULL) - t_start)),
00287 (byte_count >> 10));
00288 }
00289 }
00290
00291 thread_bail:
00292
00293 t_delta = time(NULL) - t_start;
00294
00295
00296 if(t_delta == 0)
00297 t_delta = 1;
00298
00299 flog_usr(FLOG_NOTICE, 0, me,
00300 "Thread exiting, %d bytes transferred, appx %f kb/sec",
00301 byte_count, ((float) (byte_count >> 10) / (float) t_delta));
00302
00303
00304 streaming_active = false;
00305
00306 return(NULL);
00307 }
00308
00309
00320 int data_thread_startup(data_thread_arg d_arg)
00321 {
00322 char me[] = "data_thread_startup";
00323 int rc;
00324 pthread_t data_thread;
00325
00326
00327 streaming_active = false;
00328 sleep(1);
00329
00330 streaming_active = true;
00331
00332 rc = pthread_create(&data_thread, NULL, data_thread_main,
00333 (void *) &d_arg);
00334 if(rc != 0)
00335 {
00336 flog_usr(FLOG_ERROR, FL_ERR_SYSTEM, me,
00337 "Error on thread creation");
00338 return(rc);
00339 }
00340
00341
00342 pthread_detach(data_thread);
00343 pthread_setconcurrency(NUM_THREADS);
00344
00345
00346 usleep(1000);
00347
00348 flog_usr(FLOG_NOTICE, 0, me, "Data thread started OK");
00349
00350 return(0);
00351 }
00352
00353
00367 int cmd_handler(const uint16_t nsds_s_ctrl,
00368 const uint16_t nsds_s_data,
00369 const uint16_t daq_control,
00370 const uint16_t daq_data,
00371 const char *daq_name)
00372 {
00373 char me[] = "cmd_handler";
00374 char cmd_buf[MAX_CMD_LEN] = "";
00375 int bytes_read = 0;
00376 int rc;
00377 const time_t normal_timeout = 30;
00378 bool got_cmd = false;
00379
00380
00381
00382
00383
00384
00385 while(!got_cmd)
00386 {
00387 flog_usr(FLOG_PROGRESS, 0, me, "Waiting for command...");
00388
00389
00390 bytes_read = tcp_nl_read(nsds_s_ctrl, cmd_buf, normal_timeout);
00391 if(bytes_read < 0)
00392 {
00393 flog_usr(FLOG_WARNING, FL_ERR_TRANSCEIVE, me,
00394 "Error reading command from control channel");
00395 return(1);
00396 }
00397 else if(bytes_read == 0)
00398 {
00399 flog_usr(FLOG_L3BUG, 0, me,
00400 "No command before timeout, checking status");
00401
00402 if(streaming_active == false)
00403 {
00404 flog_usr(FLOG_WARNING, FL_ERR_BAD_VALUE, me,
00405 "Data thread has died! Remaking connections.");
00406 return(2);
00407 }
00408 }
00409 else
00410 {
00411 flog_usr(FLOG_PROGRESS, 0, me, "Command received OK from NSDS");
00412 got_cmd = true;
00413 }
00414 }
00415
00416
00417
00418 if(strstr(cmd_buf, "Welcome") != NULL)
00419 {
00420 flog_usr(FLOG_L2BUG, 0, me,
00421 "Got welcome string '%s'", cmd_buf);
00422 flog_usr(FLOG_CL2BUG, 0, me,
00423 "Sending driver ident to control and data channels...");
00424
00425 rc = tcp_send_driverids(nsds_s_ctrl, nsds_s_data, daq_name);
00426 if(rc == 0)
00427 {
00428 flog_usr(FLOG_PROGRESS, 0, me,
00429 "Driver IDs sent OK, command complete");
00430 }
00431 else
00432 {
00433 flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00434 "Error sending driver IDs");
00435 }
00436
00437 return(rc);
00438 }
00439
00440
00441
00442 flog_usr(FLOG_L2BUG, 0, me,
00443 "Got request '%s', forwarding to DAQ", cmd_buf);
00444
00445 rc = tcp_nl_write(daq_control, cmd_buf);
00446 if(rc != strlen(cmd_buf))
00447 {
00448 flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00449 "Error sending command to DAQ");
00450 return(1);
00451 }
00452
00453 flog_usr(FLOG_L2BUG, 0, me,"Reading reply from DAQ");
00454
00455
00456 bytes_read = tcp_nl_read(daq_control, cmd_buf, normal_timeout);
00457 if(bytes_read <= 0)
00458 {
00459 flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00460 "Error reading DAQ reply");
00461 return(1);
00462 }
00463
00464
00465 flog_usr(FLOG_L2BUG, 0, me,
00466 "Got reply '%s', forwarding to NSDS", cmd_buf);
00467
00468
00469 rc = tcp_nl_write(nsds_s_ctrl, cmd_buf);
00470 if(rc != bytes_read)
00471 {
00472 flog_usr(FLOG_ERROR, FL_ERR_TRANSCEIVE, me,
00473 "Error writing to NSDS control channel");
00474 return(1);
00475 }
00476
00477 flog_usr(FLOG_L2BUG, 0, me,"Query completed OK");
00478 flog_usr(FLOG_CL2BUG, 0, me, "Done with cmd_handler");
00479
00480 return(0);
00481 }
00482
00483
00502 void main_loop(const char *daq_machine, const uint16_t daq_port,
00503 const char *nsds_machine, const uint16_t nsds_port,
00504 const char *driver_name)
00505 {
00506 char me[] = "main_loop";
00507 int nsds_s_data, nsds_s_ctrl, daq_s_ctrl, daq_s_data;
00508 int init_count = 0;
00509 time_t wait_time = 10;
00510 time_t end_interval = 3 * wait_time;
00511 int rc = 0;
00512 static data_thread_arg d_arg;
00513
00514
00515
00516
00517 while(control_break == false)
00518 {
00519
00520 if(init_count > 0)
00521 {
00522 flog_usr(FLOG_NOTICE, 0, me,
00523 "Re-init driver, closing connections");
00524
00525
00526 streaming_active = false;
00527
00528
00529 tcp_close(daq_s_ctrl);
00530 tcp_close(daq_s_data);
00531
00532 tcp_close(nsds_s_ctrl);
00533 tcp_close(nsds_s_data);
00534
00535
00536 daq_s_ctrl = daq_s_data = nsds_s_ctrl = nsds_s_data = -1;
00537
00538
00539
00540 sleep(2);
00541 }
00542
00543 init_count++;
00544
00545 flog_usr(FLOG_NOTICE, 0, me,
00546 "Beginning connect attempt #%d, DAQ first", init_count);
00547
00548
00549 daq_s_ctrl = tcp_connect_retry(daq_machine, daq_port,
00550 wait_time, (time_t) 0,
00551 "DAQ control port");
00552
00553 if(daq_s_ctrl <= 0)
00554 continue;
00555
00556
00557 if(control_break == true)
00558 break;
00559
00560 flog_usr(FLOG_CNOTICE, 0, me,
00561 "Connected to DAQ machine '%s' control channel",
00562 tcp_peername(daq_s_ctrl));
00563
00564
00565 sleep(1);
00566
00567
00568 daq_s_data = tcp_connect_retry(daq_machine, daq_port + 1,
00569 wait_time, (time(NULL) + end_interval),
00570 "DAQ data port");
00571
00572 if(daq_s_data <= 0)
00573 continue;
00574
00575
00576 if(control_break == true)
00577 break;
00578
00579 flog_usr(FLOG_CNOTICE, 0, me,
00580 "Connected to DAQ machine '%s' data channel",
00581 tcp_peername(daq_s_ctrl));
00582
00583
00584 nsds_s_ctrl = tcp_connect_retry(nsds_machine, nsds_port,
00585 wait_time,
00586 (time(NULL) + end_interval),
00587 "NSDS control port");
00588 if(nsds_s_ctrl <= 0)
00589 {
00590 flog_usr(FLOG_NOTICE, 0, me, "No NSDS control connection, restarting");
00591 continue;
00592 }
00593 if(control_break == true)
00594 break;
00595
00596 flog_usr(FLOG_CNOTICE, 0, me,
00597 "Connected to NSDS control channel on %s",
00598 tcp_peername(nsds_s_ctrl));
00599
00600
00601
00602
00603 nsds_s_data = tcp_connect_retry(nsds_machine, (nsds_port + 1),
00604 wait_time,
00605 (time(NULL) + end_interval),
00606 "NSDS data port");
00607 if(nsds_s_data <= 0)
00608 {
00609 flog_usr(FLOG_NOTICE, 0, me,
00610 "No NSDS data connection, restarting");
00611 continue;
00612 }
00613 if(control_break == true)
00614 break;
00615
00616 flog_usr(FLOG_CNOTICE, 0, me,
00617 "Connected to NSDS data channel on %s",
00618 tcp_peername(nsds_s_data));
00619
00620 flog_usr(FLOG_PROGRESS, 0, me,
00621 "All network connections in place, proceeding");
00622
00623
00624 flog_usr(FLOG_PROGRESS, 0, me, "Starting data thread");
00625
00626 d_arg.nsds_data = nsds_s_data;
00627 d_arg.daq_data = daq_s_data;
00628
00629 rc = data_thread_startup(d_arg);
00630 if(rc != 0)
00631 {
00632 flog_usr(FLOG_ERROR, FL_ERR_SYSTEM, me,
00633 "Error on data thread creation!");
00634 return;
00635 }
00636
00637
00638 do
00639 {
00640 rc = cmd_handler(nsds_s_ctrl, nsds_s_data, daq_s_ctrl,
00641 daq_s_data, driver_name);
00642 } while(rc == 0);
00643 }
00644
00645 return;
00646 }
00647
00648
00661 int main(int argc, char *argv[])
00662 {
00663 char me[] = "main";
00664 char daq_machine[512] = "";
00665 char nsds_machine[512] = "";
00666 const char daq_predef[] = "localhost";
00667 const int daq_preport = 55055;
00668 const char nsds_predef[] = "localhost";
00669 const int nsds_preport = 42420;
00670 char driver_name[512] = "";
00671 int rc, idx;
00672 int help_flag = 0;
00673 uint16_t daq_port = daq_preport;
00674 uint16_t nsds_port = nsds_preport;
00675 struct option long_options[] =
00676 {
00677 {"daq_port", required_argument, 0, 'd'},
00678 {"daq_machine", required_argument, 0, 'm'},
00679 {"nsds_port", required_argument, 0, 'n'},
00680 {"nsds_machine", required_argument, 0, 'l'},
00681 {"help", no_argument, &help_flag, 'h'},
00682 {"driver_name", required_argument, 0, 'x'},
00683 {0, 0, 0, 0}
00684 };
00685
00686
00687
00688 flog_set_report(FLOG_L3BUG, FLOG_QUIET, FLOG_QUIET);
00689 flog_set_style(0x2019, 0x0038, 0x6008);
00690
00691
00692 strcpy(nsds_machine, nsds_predef);
00693 strcpy(daq_machine, daq_predef);
00694
00695 daq_port = daq_preport;
00696 nsds_port = nsds_preport;
00697
00698
00699
00700 while(1)
00701 {
00702 rc = getopt_long(argc, argv, "d:m:n:l:x:h",
00703 long_options, &idx);
00704 if(rc == -1)
00705 break;
00706
00707 switch(rc)
00708 {
00709 case 0:
00710 break;
00711
00712 case 'd':
00713 daq_port = atoi(optarg);
00714 flog_usr(FLOG_NOTICE, 0, me,
00715 "Setting DAQ port to %d", daq_port);
00716 break;
00717
00718 case 'm':
00719 strcpy(daq_machine, optarg);
00720 flog_usr(FLOG_NOTICE, 0, me,
00721 "Set DAQ machine name to '%s'", daq_machine);
00722 break;
00723
00724 case 'n':
00725 nsds_port = atoi(optarg);
00726 flog_usr(FLOG_NOTICE, 0, me,
00727 "Set NSDS port to %d", nsds_port);
00728 break;
00729
00730 case 'l':
00731 strcpy(nsds_machine, optarg);
00732 flog_usr(FLOG_NOTICE, 0, me,
00733 "Set NSDS machine name to '%s'", nsds_machine);
00734 break;
00735
00736 case 'x':
00737 strcpy(driver_name, optarg);
00738 flog_usr(FLOG_NOTICE, 0, me,
00739 "Set driver name to '%s'", driver_name);
00740 break;
00741
00742 default:
00743 case 'h':
00744 help_flag = 1;
00745 break;
00746 }
00747 }
00748
00749 if(help_flag)
00750 {
00751 flog_usr(FLOG_NOTICE, 0, me,
00752 "Options are --daq_port, --daq_machine,");
00753 flog_usr(FLOG_CNOTICE, 0, me,
00754 "--nsds_port, --nsds_machine, --driver_name");
00755 flog_usr(FLOG_CNOTICE, 0, me,
00756 "All have arguments, either hostname or TCP port number");
00757 flog_usr(FLOG_CNOTICE, 0, me,
00758 "driver_name is for multiple instances per host");
00759 exit(1);
00760 }
00761
00762 flog_usr(FLOG_NOTICE, 0, me, "Installing signal handler.");
00763
00764 control_break = false;
00765 signal(SIGINT, sighandler);
00766 signal(SIGPIPE, sighandler);
00767
00768
00769 if(driver_name[0] == 0x00)
00770 strcpy(driver_name, daq_machine);
00771
00772
00773 main_loop(daq_machine, daq_port, nsds_machine, nsds_port, driver_name);
00774
00775 flog_usr(FLOG_NOTICE, 0, me, "Done OK");
00776
00777 return(0);
00778 }