00001
00002
00003
00004 package org.nees.rbnb;
00005
00006 import java.io.IOException;
00007 import java.util.Date;
00008 import java.util.TimeZone;
00009 import java.text.SimpleDateFormat;
00010
00011 import com.rbnb.sapi.*;
00012 import COM.Creare.Utility.ArgHandler;
00013 import COM.Creare.Utility.RBNBProcess;
00014
00015
00016
00020 public class NumberSink {
00021
00022 private static final String SERVER_NAME = "localhost";
00023 private static final String SERVER_PORT = "3333";
00024 private static final String SINK_NAME = "GetNumber";
00025 private static final String SOURCE_NAME = "RandomWalk";
00026 private static final String CHANNEL_NAME = "RandomWalkData";
00027
00028 private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("MMM d, yyyy h:mm aa");
00029 private static final TimeZone TZ = TimeZone.getTimeZone("GMT");
00030
00031 static
00032 {
00033 DATE_FORMAT.setTimeZone(TZ);
00034 }
00035
00036 private String serverName = SERVER_NAME;
00037 private String serverPort = SERVER_PORT;
00038 private String server = serverName + ":" + serverPort;
00039 private String sinkName = SINK_NAME;
00040 private String sourceName = SOURCE_NAME;
00041 private String channelName = CHANNEL_NAME;
00042 private String requestPath = sourceName + "/" + channelName;
00043
00044 Sink sink = null;
00045 ChannelMap sMap;
00046 int index;
00047 boolean connected = false;
00048
00049 Thread stringDataThread;
00050 boolean runit = false;
00051
00052 public static void main(String[] args) {
00053 NumberSink s = new NumberSink(args);
00054 s.exec();
00055 s.startThread();
00056 }
00057
00058 private void printUsage() {
00059 System.out.println("NumberSink: usage is...");
00060 System.out.println("NumberSink ");
00061 System.out.println("[-s Server Hostname *" + SERVER_NAME + "] ");
00062 System.out.println("[-o Server Port Number *" + SERVER_PORT + "] ");
00063 System.out.println("[-k Sink Name *" + SINK_NAME + " ]");
00064 System.out.println("[-n Source Name *" + SOURCE_NAME + "] ");
00065 System.out.println("[-c Source Channel Name *" + CHANNEL_NAME + "] ");
00066 }
00067
00068 public NumberSink(String[] args) {
00069
00070 try {
00071 ArgHandler ah=new ArgHandler(args);
00072 if (ah.checkFlag('h')) {
00073 printUsage();
00074 RBNBProcess.exit(0);
00075 }
00076 if (ah.checkFlag('s')) {
00077 String a=ah.getOption('s');
00078 if (a!=null) serverName=a;
00079 }
00080 if (ah.checkFlag('p')) {
00081 String a=ah.getOption('p');
00082 if (a!=null) serverPort=a;
00083 }
00084 if (ah.checkFlag('n')) {
00085 String a=ah.getOption('n');
00086 if (a!=null) sourceName=a;
00087 }
00088 if (ah.checkFlag('c')) {
00089 String a=ah.getOption('c');
00090 if (a!=null) channelName=a;
00091 }
00092 if (ah.checkFlag('k')) {
00093 String a=ah.getOption('k');
00094 if (a!=null) sinkName=a;
00095 }
00096 } catch (Exception e) {
00097 System.err.println("NumberSink argument exception "+e.getMessage());
00098 e.printStackTrace();
00099 RBNBProcess.exit(0);
00100 }
00101
00102 requestPath = sourceName + "/" + channelName;
00103 server = serverName + ":" + serverPort;
00104
00105 System.out.println("Starting NumberSink on " + server + " as " + sinkName);
00106 System.out.println(" Requesting " + requestPath);
00107 System.out.println(" Use NumberSink -h to see optional parameters");
00108 }
00109
00110 public void exec()
00111 {
00112 try {
00113
00114 sink=new Sink();
00115 sink.OpenRBNBConnection(server,sinkName);
00116 sMap = new ChannelMap();
00117 index = sMap.Add(requestPath);
00118 sink.Subscribe(sMap,"newest");
00119 connected = true;
00120 System.out.println("NumberSink: Connection made to sever = "
00121 + server + " as " + sinkName
00122 + " requesting " + requestPath + ".");
00123 } catch (SAPIException se) { se.printStackTrace(); }
00124 }
00125
00126 public void startThread()
00127 {
00128
00129 if (!connected) return;
00130
00131
00132 Runnable r = new Runnable() {
00133 public void run() {
00134 runWork();
00135 }
00136 };
00137 runit = true;
00138 stringDataThread = new Thread(r, "StringData");
00139 stringDataThread.start();
00140 System.out.println("NumberSink: Started thread.");
00141 }
00142
00143 public void stopThread()
00144 {
00145 runit = false;
00146 stringDataThread.interrupt();
00147 System.out.println("NumberSink: Stopped thread.");
00148 }
00149
00150 private void runWork ()
00151 {
00152 try {
00153 while(isRunning())
00154 {
00155 ChannelMap m = sink.Fetch(-1);
00156 if (m == null)
00157 {
00158 System.out.println("Data fetch failed.");
00159 continue;
00160 }
00161 double timeStamp = m.GetTimeStart(index);
00162 double[] data = m.GetDataAsFloat64(index);
00163 long unixTime = (long)(timeStamp * 1000.0);
00164 String time = DATE_FORMAT.format(new Date(unixTime));
00165 System.out.println("Data Received (for " + time + " GMT): ");
00166 for (int i = 0; i < data.length; i++)
00167 {
00168 System.out.println(" " + i + ": " + data[i]);
00169 }
00170 }
00171 } catch (SAPIException se) {
00172 se.printStackTrace();
00173 }
00174 stringDataThread = null;
00175 }
00176
00177 public boolean isRunning()
00178 {
00179 return (connected && runit);
00180 }
00181
00182
00183 }