00001
00002
00003
00004
package org.nees.rbnb;
00005
00006
import java.io.IOException;
00007
import java.io.PrintWriter;
00008
import java.io.FileWriter;
00009
import java.io.File;
00010
import java.util.Vector;
00011
import java.util.Iterator;
00012
import java.util.Stack;
00013
import java.util.Date;
00014
import java.util.TimeZone;
00015
import java.text.SimpleDateFormat;
00016
00017
import java.util.regex.*;
00018
00019
import com.rbnb.sapi.*;
00020
00021
import com.rbnb.utility.ArgHandler;
00022
00026 public class GrabDataMultipleSink {
00027
00028
/** Default server host name; replaced by the value of the -s option. */
private static final String SERVER_NAME = "localhost";

/** Default server port; replaced by the value of the -p option. */
private static final String SERVER_PORT = "3333";

/** Default sink name; replaced by the value of the -k option. */
private static final String SINK_NAME = "GrabDataMutiple";

/** Default regular expression matched against full channel names (-c). */
private static final String SEARCH_CHANNEL_PATH = ".*";

/** Default root directory under which channel files are created (-d). */
private static final String ARCHIVE_DIRECTORY = ".";

/** Format used for the human-readable time range printed to the console. */
private static final SimpleDateFormat DATE_FORMAT =
    new SimpleDateFormat("MMM d, yyyy h:mm aa");

/** Both formats are interpreted in, and rendered as, GMT. */
private static final TimeZone TZ = TimeZone.getTimeZone("GMT");

/**
 * Format accepted for the -a / -z time arguments.
 *
 * The hour field is HH (hour of day, 0-23). The previous pattern used hh
 * (hour in am/pm, 1-12) without an AM/PM marker, which made an input such
 * as "2005-01-01;12:30:00.000" parse as 00:30 instead of 12:30.
 */
private static final SimpleDateFormat INPUT_FORMAT =
    new SimpleDateFormat("yyyy-MM-dd;HH:mm:ss.SSS");

static
{
    DATE_FORMAT.setTimeZone(TZ);
    INPUT_FORMAT.setTimeZone(TZ);
}
00044
00045
private String serverName = SERVER_NAME;
00046
private String serverPort = SERVER_PORT;
00047
private String server = serverName +
":" + serverPort;
00048
private String sinkName = SINK_NAME;
00049
private String searchChannelPath = SEARCH_CHANNEL_PATH;
00050
private String archiveDirectory = ARCHIVE_DIRECTORY;
00051
private boolean includeHidden =
false;
00052
00053
private Sink sink = null;
00054
private ChannelMap sMap;
00055
private int index;
00056
00057
private boolean useStart =
false;
00058
private double startTime;
00059
private boolean useDuration =
false;
00060
private double endTime;
00061
private double duration;
00062
00063
public static void main(String[] args) {
00064
GrabDataMultipleSink s =
new GrabDataMultipleSink(args);
00065 s.
exec();
00066 }
00067
00068
private void printUsage() {
00069 System.out.println(
"GrabDataMultipleSink: usage is...");
00070 System.out.println(
"GrabDataMultipleSink ");
00071 System.out.println(
"[-s Server Hostname *" + SERVER_NAME +
"] ");
00072 System.out.println(
"[-o Server Port Number *" + SERVER_PORT +
"] ");
00073 System.out.println(
"[-k Sink Name *" + SINK_NAME +
" ]");
00074 System.out.println(
"[-c Channel Path Pattern *" + SEARCH_CHANNEL_PATH +
"] ");
00075 System.out.println(
"[-d Archive directory root *" + ARCHIVE_DIRECTORY +
"]");
00076 System.out.println(
"[-x Include Hidden (true|false) *false]");
00077 System.out.println(
"[-a Start Time *now]");
00078 System.out.println(
"[-z End Time *<unspecified>]");
00079 System.out.println(
"Note: times can either be yyyy-mm-dd;hh:mm:ss.nnn or");
00080 System.out.println(
"an arbitraty floating point number");
00081 }
00082
00083
/**
 * Reads the command-line options into the instance fields and prints a
 * start-up banner.
 *
 * Recognized flags: -h (print usage and exit), -s host, -p port,
 * -c channel pattern, -k sink name, -d archive directory,
 * -x true|false (include hidden), -a start time, -z end time.
 *
 * A time value that cannot be parsed, or an -x value that is neither
 * "true" nor "false", is ignored as before, but the user is now told about
 * it instead of the problem being swallowed silently. If argument handling
 * itself fails, the process exits with a non-zero status (it used to exit
 * with 0, which reports success to the caller).
 *
 * @param args command-line arguments
 */
public GrabDataMultipleSink(String[] args) {
    try {
        ArgHandler ah = new ArgHandler(args);
        if (ah.checkFlag('h')) {
            printUsage();
            System.exit(0);
        }
        if (ah.checkFlag('s')) {
            String a = ah.getOption('s');
            if (a != null) serverName = a;
        }
        if (ah.checkFlag('p')) {
            String a = ah.getOption('p');
            if (a != null) serverPort = a;
        }
        if (ah.checkFlag('c')) {
            String a = ah.getOption('c');
            if (a != null) searchChannelPath = a;
        }
        if (ah.checkFlag('k')) {
            String a = ah.getOption('k');
            if (a != null) sinkName = a;
        }
        if (ah.checkFlag('d')) {
            String a = ah.getOption('d');
            if (a != null) archiveDirectory = a;
        }
        if (ah.checkFlag('x')) {
            String a = ah.getOption('x');
            if (a != null) {
                if (a.equals("true")) {
                    includeHidden = true;
                } else if (a.equals("false")) {
                    includeHidden = false;
                } else {
                    // Keep the current setting, but say so.
                    System.out.println("Ignoring unrecognized -x value '" + a
                        + "'; expected true or false.");
                }
            }
        }
        if (ah.checkFlag('a')) {
            String a = ah.getOption('a');
            if (a != null) {
                try {
                    startTime = getTimeOrDouble(a);
                    useStart = true;
                } catch (Exception badTime) {
                    // Best effort: run without a start time, but report why.
                    System.out.println("Ignoring Start Time: " + badTime.getMessage());
                }
            }
        }
        if (ah.checkFlag('z')) {
            String a = ah.getOption('z');
            if (a != null) {
                try {
                    endTime = getTimeOrDouble(a);
                    useDuration = true;
                } catch (Exception badTime) {
                    // Best effort: run without an end time, but report why.
                    System.out.println("Ignoring End Time: " + badTime.getMessage());
                }
            }
        }
    } catch (Exception e) {
        System.err.println("GrabDataMultipleSink argument exception " + e.getMessage());
        e.printStackTrace();
        // Non-zero status: argument handling failed.
        System.exit(1);
    }

    // An end time is only meaningful relative to a start time.
    if (useDuration && !useStart)
    {
        useDuration = false;
        System.out.println("Got End Time without a Start Time; ignored End Time.");
    }
    if (useStart && useDuration && (endTime < startTime))
    {
        useStart = false;
        useDuration = false;
        System.out.println("End Time is less then Start Time; ignoring both.");
    }

    if (useDuration)
    {
        duration = endTime - startTime;
    }

    // Rebuild "host:port" now that the overrides have been applied.
    server = serverName + ":" + serverPort;

    System.out.println("Starting GrabDataMultipleSink on " + server + " as " + sinkName);
    System.out.println(" Requesting " + searchChannelPath);
    System.out.println(" Use GrabDataMultipleSink -h to see optional parameters");
}
00174
00175
public void exec()
00176 {
00177
try {
00178
00179 sink=
new Sink();
00180 sink.OpenRBNBConnection(server,sinkName);
00181
00182
00183 Vector collection =
new Vector();
00184 sMap =
new ChannelMap();
00185 sink.RequestRegistration();
00186 sMap = sink.Fetch(-1,sMap);
00187 ChannelTree tree = ChannelTree.createFromChannelMap(sMap);
00188
00189 Pattern p = Pattern.compile(searchChannelPath);
00190
00191
00192 Iterator nodes = tree.iterator();
00193
while (nodes.hasNext())
00194 {
00195 ChannelTree.Node n = (ChannelTree.Node)nodes.next();
00196 System.out.println(
"Checking " + n.getFullName() +
";" + n.getName());
00197
if (!includeHidden && n.getFullName().startsWith(
"_"))
continue;
00198
if (n.getType() != ChannelTree.CHANNEL)
continue;
00199 String name = n.getFullName();
00200 Matcher m = p.matcher(name);
00201
if (m.matches())
00202 {
00203 System.out.println(
"Matches");
00204
boolean isSource =
false;
00205 ChannelTree.Node upNode = n.getParent();
00206
while ((!isSource) || (upNode != null))
00207 {
00208
if (upNode.getType() == ChannelTree.SOURCE) isSource =
true;
00209 upNode = upNode.getParent();
00210 }
00211
if (isSource)
00212 {
00213 System.out.println(
"... and is a source.");
00214 collection.add(n);
00215 }
00216
else
00217 System.out.println(
"... and is NOT a source.");
00218 }
00219 }
00220
00221 Iterator channels = collection.iterator();
00222
00223
if (channels.hasNext())
00224 {
00225 System.out.println(
"GrabDataMultipleSink: Intending connectsions to server = "
00226 + server +
" as " + sinkName +
"monotering channels: ");
00227
while (channels.hasNext())
00228 {
00229 ChannelTree.Node candidate = (ChannelTree.Node)channels.next();
00230 System.out.println(candidate.getFullName());
00231
new GrabDataSink(candidate);
00232 }
00233 }
00234
else
00235 System.out.println(
"GrabDataMultipleSink: No channels to monitor.");
00236 sink.CloseRBNBConnection();
00237
00238 }
catch (SAPIException se) { se.printStackTrace(); }
00239 }
00240
00241
private double getTimeOrDouble(String arg)
throws Exception
00242 {
00243
double value = 0.0;
00244
boolean gotit =
false;
00245
00246
try{
00247 Date d = INPUT_FORMAT.parse(arg);
00248
long t = d.getTime();
00249 value = ((
double)t)/1000.0;
00250 gotit =
true;
00251 }
catch (Exception ignore)
00252 {
00253 gotit =
false;
00254 }
00255
00256
if (!gotit)
00257
try {
00258 value = Double.parseDouble(arg);
00259 gotit =
true;
00260 }
catch (Exception ignore)
00261 {
00262 gotit =
false;
00263 }
00264
00265
if (!gotit)
throw(
new Exception(
"Failed to parse time " + arg));
00266
00267
return value;
00268
00269 }
00270
00271
private class GrabDataSink
00272 {
00273 Sink sink = null;
00274 ChannelMap sMap;
00275
int index;
00276
00277 String name;
00278 PrintWriter out = null;
00279
00280
boolean connected =
false;
00281
00282 Thread stringDataThread;
00283
boolean runit =
false;
00284
00285 ChannelTree.Node itsNode;
00286
00287
public GrabDataSink(ChannelTree.Node node)
00288 {
00289 itsNode = node;
00290 name = itsNode.getFullName().replace(
'/',
'_');
00291
if (connect(name))
00292 {
00293 connected =
true;
00294
if (createFile())
00295 writeFileHeader();
00296 startThread();
00297 }
00298 }
00299
00300
private void writeFileHeader() {
00301
00302
00303 }
00304
00305
private boolean connect(String name)
00306 {
00307
try
00308 {
00309 sink=
new Sink();
00310 sink.OpenRBNBConnection(server,sinkName +
"_" + name);
00311 sMap =
new ChannelMap();
00312 index = sMap.Add(itsNode.getFullName());
00313 sink.Subscribe(sMap,
"newest");
00314 }
00315
catch (Throwable th)
00316 {
00317 System.out.println(
"Failed connection for " + name
00318 +
"; reason: " + th);
00319
return false;
00320 }
00321 System.out.println(
"Sucessful connection for " + name);
00322
return true;
00323 }
00324
00325
private boolean createFile()
00326 {
00327 ChannelTree.Node p = itsNode.getParent();
00328 Stack st =
new Stack();
00329
00330
00331
while (p != null)
00332 {
00333 st.push(p);
00334 p = p.getParent();
00335 }
00336
00337
00338 String path = archiveDirectory;
00339
while (!st.empty())
00340 {
00341 p = (ChannelTree.Node)st.pop();
00342 path = path + File.separator + p.getName();
00343 }
00344 File f =
new File(path);
00345 System.out.println(
"Appempting to create the directory path " + path);
00346 f.mkdirs();
00347
00348
00349 path = path + File.separator + itsNode.getName();
00350 System.out.println(
"Trying to create file for path "+ path);
00351
try {
00352 out =
new PrintWriter(
new FileWriter(path));
00353 }
catch (IOException e) {
00354 System.out.println(
"Can not create file " + e);
00355
return false;
00356 }
00357
return true;
00358 }
00359
00360
private void startThread()
00361 {
00362
00363
if (!connected)
return;
00364
00365
00366 Runnable r =
new Runnable() {
00367
public void run() {
00368 runWork();
00369 }
00370 };
00371 runit =
true;
00372 stringDataThread =
new Thread(r, itsNode.getName());
00373 stringDataThread.start();
00374 System.out.println(
"GrabDataSink: Started thread for " + itsNode.getFullName());
00375 }
00376
00377
public void stopThread()
00378 {
00379 runit =
false;
00380 stringDataThread.interrupt();
00381 System.out.println(
"GrabDataSink: Stopped thread for " + itsNode.getFullName());
00382 }
00383
00384
private void runWork ()
00385 {
00386
try {
00387
while(isRunning())
00388 {
00389 ChannelMap m = sink.Fetch(-1);
00390
if (m == null)
00391 {
00392 System.out.println(
"Data fetch failed for " + itsNode.getFullName());
00393
continue;
00394 }
00395
double startTime = m.GetTimeStart(index);
00396
double duration = m.GetTimeDuration(index);
00397
double[] data = m.GetDataAsFloat64(index);
00398
long unixTime = (
long)(startTime * 1000.0);
00399 String time1 = DATE_FORMAT.format(
new Date(unixTime));
00400 unixTime = (
long)((startTime + duration) * 1000.0);
00401 String time2 = DATE_FORMAT.format(
new Date(unixTime));
00402 System.out.println(
"Data Received (for "
00403 + time1 +
" to " + time2
00404 +
"GMT -- " + itsNode.getFullName() +
"): ");
00405 out.print(startTime +
" " + duration);
00406
for (
int i = 0; i < data.length; i++)
00407 {
00408 System.out.println(
" " + i +
": " + data[i]);
00409 out.print(
" " + data[i]);
00410 }
00411 out.println();
00412 out.flush();
00413 }
00414 }
catch (SAPIException se) {
00415 se.printStackTrace();
00416 }
00417 stringDataThread = null;
00418 }
00419
00420
private boolean isRunning()
00421 {
00422
return (connected && runit);
00423 }
00424
00425 }
00426
00427 }