00001
00002
00003
00004
package org.nees.rbnb;
00005
00006
import java.io.IOException;
00007
import java.util.Date;
00008
import java.util.TimeZone;
00009
import java.text.SimpleDateFormat;
00010
00011
import com.rbnb.sapi.*;
00012
00013
import com.rbnb.utility.ArgHandler;
00014
00015
00016
import com.rbnb.utility.RBNBProcess;
00017
00018
00022 public class NumberSink {
00023
00024
private static final String SERVER_NAME =
"localhost";
00025
private static final String SERVER_PORT =
"3333";
00026
private static final String SINK_NAME =
"GetNumber";
00027
private static final String SOURCE_NAME =
"RandomWalk";
00028
private static final String CHANNEL_NAME =
"RandomWalkData";
00029
00030
private static final SimpleDateFormat DATE_FORMAT =
new SimpleDateFormat(
"MMM d, yyyy h:mm aa");
00031
private static final TimeZone TZ = TimeZone.getTimeZone(
"GMT");
00032
00033
static
00034 {
00035 DATE_FORMAT.setTimeZone(TZ);
00036 }
00037
00038
private String serverName = SERVER_NAME;
00039
private String serverPort = SERVER_PORT;
00040
private String server = serverName +
":" + serverPort;
00041
private String sinkName = SINK_NAME;
00042
private String sourceName = SOURCE_NAME;
00043
private String channelName = CHANNEL_NAME;
00044
private String requestPath = sourceName +
"/" + channelName;
00045
00046 Sink sink = null;
00047 ChannelMap sMap;
00048
int index;
00049
boolean connected =
false;
00050
00051 Thread stringDataThread;
00052
boolean runit =
false;
00053
00054
public static void main(String[] args) {
00055
NumberSink s =
new NumberSink(args);
00056 s.
exec();
00057 s.
startThread();
00058 }
00059
00060
private void printUsage() {
00061 System.out.println(
"NumberSink: usage is...");
00062 System.out.println(
"NumberSink ");
00063 System.out.println(
"[-s Server Hostname *" + SERVER_NAME +
"] ");
00064 System.out.println(
"[-o Server Port Number *" + SERVER_PORT +
"] ");
00065 System.out.println(
"[-k Sink Name *" + SINK_NAME +
" ]");
00066 System.out.println(
"[-n Source Name *" + SOURCE_NAME +
"] ");
00067 System.out.println(
"[-c Source Channel Name *" + CHANNEL_NAME +
"] ");
00068 }
00069
00070
public NumberSink(String[] args) {
00071
00072
try {
00073 ArgHandler ah=
new ArgHandler(args);
00074
if (ah.checkFlag(
'h')) {
00075 printUsage();
00076 RBNBProcess.exit(0);
00077 }
00078
if (ah.checkFlag(
's')) {
00079 String a=ah.getOption(
's');
00080
if (a!=null) serverName=a;
00081 }
00082
if (ah.checkFlag(
'p')) {
00083 String a=ah.getOption(
'p');
00084
if (a!=null) serverPort=a;
00085 }
00086
if (ah.checkFlag(
'n')) {
00087 String a=ah.getOption(
'n');
00088
if (a!=null) sourceName=a;
00089 }
00090
if (ah.checkFlag(
'c')) {
00091 String a=ah.getOption(
'c');
00092
if (a!=null) channelName=a;
00093 }
00094
if (ah.checkFlag(
'k')) {
00095 String a=ah.getOption(
'k');
00096
if (a!=null) sinkName=a;
00097 }
00098 }
catch (Exception e) {
00099 System.err.println(
"NumberSink argument exception "+e.getMessage());
00100 e.printStackTrace();
00101 RBNBProcess.exit(0);
00102 }
00103
00104 requestPath = sourceName +
"/" + channelName;
00105 server = serverName +
":" + serverPort;
00106
00107 System.out.println(
"Starting NumberSink on " + server +
" as " + sinkName);
00108 System.out.println(
" Requesting " + requestPath);
00109 System.out.println(
" Use NumberSink -h to see optional parameters");
00110 }
00111
00112
public void exec()
00113 {
00114
try {
00115
00116 sink=
new Sink();
00117 sink.OpenRBNBConnection(server,sinkName);
00118 sMap =
new ChannelMap();
00119 index = sMap.Add(requestPath);
00120 sink.Subscribe(sMap,
"newest");
00121 connected =
true;
00122 System.out.println(
"NumberSink: Connection made to server = "
00123 + server +
" as " + sinkName
00124 +
" requesting " + requestPath +
".");
00125 }
catch (SAPIException se) { se.printStackTrace(); }
00126 }
00127
00128
public void startThread()
00129 {
00130
00131
if (!connected)
return;
00132
00133
00134 Runnable r =
new Runnable() {
00135
public void run() {
00136 runWork();
00137 }
00138 };
00139 runit =
true;
00140 stringDataThread =
new Thread(r,
"StringData");
00141 stringDataThread.start();
00142 System.out.println(
"NumberSink: Started thread.");
00143 }
00144
00145
public void stopThread()
00146 {
00147 runit =
false;
00148 stringDataThread.interrupt();
00149 System.out.println(
"NumberSink: Stopped thread.");
00150 }
00151
00152
private void runWork ()
00153 {
00154
try {
00155
while(isRunning())
00156 {
00157 ChannelMap m = sink.Fetch(-1);
00158
if (m == null)
00159 {
00160 System.out.println(
"Data fetch failed.");
00161
continue;
00162 }
00163
double timeStamp = m.GetTimeStart(index);
00164
double[] data = m.GetDataAsFloat64(index);
00165
long unixTime = (
long)(timeStamp * 1000.0);
00166 String time = DATE_FORMAT.format(
new Date(unixTime));
00167 System.out.println(
"Data Received (for " + time +
" GMT): ");
00168
for (
int i = 0; i < data.length; i++)
00169 {
00170 System.out.println(
" " + i +
": " + data[i]);
00171 }
00172 }
00173 }
catch (SAPIException se) {
00174 se.printStackTrace();
00175 }
00176 stringDataThread = null;
00177 }
00178
00179
public boolean isRunning()
00180 {
00181
return (connected && runit);
00182 }
00183
00184
00185 }