Main Page | Class Hierarchy | Class List | File List | Class Members | Related Pages

GrabDataMultipleSink.java

00001 /* 00002 * Created on April 16, 2004 00003 */ 00004 package org.nees.rbnb; 00005 00006 import java.io.IOException; 00007 import java.io.PrintWriter; 00008 import java.io.FileWriter; 00009 import java.io.File; 00010 import java.util.Vector; 00011 import java.util.Iterator; 00012 import java.util.Stack; 00013 import java.util.Date; 00014 import java.util.TimeZone; 00015 import java.text.SimpleDateFormat; 00016 00017 import java.util.regex.*; 00018 00019 import com.rbnb.sapi.*; 00020 //import COM.Creare.Utility.ArgHandler; //for argument parsing 00021 import com.rbnb.utility.ArgHandler; //for argument parsing 00022 00026 public class GrabDataMultipleSink { 00027 00028 private static final String SERVER_NAME = "localhost"; 00029 private static final String SERVER_PORT = "3333"; 00030 private static final String SINK_NAME = "GrabDataMutiple"; 00031 private static final String SEARCH_CHANNEL_PATH = ".*"; 00032 private static final String ARCHIVE_DIRECTORY = "."; 00033 00034 private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("MMM d, yyyy h:mm aa"); 00035 private static final TimeZone TZ = TimeZone.getTimeZone("GMT"); 00036 00037 private static final SimpleDateFormat INPUT_FORMAT = new SimpleDateFormat("yyyy-MM-dd;hh:mm:ss.SSS"); 00038 00039 static 00040 { 00041 DATE_FORMAT.setTimeZone(TZ); 00042 INPUT_FORMAT.setTimeZone(TZ); 00043 } 00044 00045 private String serverName = SERVER_NAME; 00046 private String serverPort = SERVER_PORT; 00047 private String server = serverName + ":" + serverPort; 00048 private String sinkName = SINK_NAME; 00049 private String searchChannelPath = SEARCH_CHANNEL_PATH; 00050 private String archiveDirectory = ARCHIVE_DIRECTORY; 00051 private boolean includeHidden = false; 00052 00053 private Sink sink = null; 00054 private ChannelMap sMap; 00055 private int index; 00056 00057 private boolean useStart = false; 00058 private double startTime; 00059 private boolean useDuration = false; 00060 private double endTime; 00061 private double duration; 00062 00063 public static void main(String[] args) { 00064 GrabDataMultipleSink s = new GrabDataMultipleSink(args); 00065 s.exec(); 00066 } // main 00067 00068 private void printUsage() { 00069 System.out.println("GrabDataMultipleSink: usage is..."); 00070 System.out.println("GrabDataMultipleSink "); 00071 System.out.println("[-s Server Hostname *" + SERVER_NAME + "] "); 00072 System.out.println("[-o Server Port Number *" + SERVER_PORT + "] "); 00073 System.out.println("[-k Sink Name *" + SINK_NAME + " ]"); 00074 System.out.println("[-c Channel Path Pattern *" + SEARCH_CHANNEL_PATH + "] "); 00075 System.out.println("[-d Archive directory root *" + ARCHIVE_DIRECTORY + "]"); 00076 System.out.println("[-x Include Hidden (true|false) *false]"); 00077 System.out.println("[-a Start Time *now]"); 00078 System.out.println("[-z End Time *<unspecified>]"); 00079 System.out.println("Note: times can either be yyyy-mm-dd;hh:mm:ss.nnn or"); 00080 System.out.println("an arbitraty floating point number"); 00081 } // printUsage 00082 00083 public GrabDataMultipleSink(String[] args) { 00084 //parse args 00085 try { 00086 ArgHandler ah=new ArgHandler(args); 00087 if (ah.checkFlag('h')) { 00088 printUsage(); 00089 System.exit(0); 00090 } 00091 if (ah.checkFlag('s')) { 00092 String a=ah.getOption('s'); 00093 if (a!=null) serverName=a; 00094 } 00095 if (ah.checkFlag('p')) { 00096 String a=ah.getOption('p'); 00097 if (a!=null) serverPort=a; 00098 } 00099 if (ah.checkFlag('c')) { 00100 String a=ah.getOption('c'); 00101 if (a!=null) searchChannelPath=a; 00102 } 00103 if (ah.checkFlag('k')) { 00104 String a=ah.getOption('k'); 00105 if (a!=null) sinkName=a; 00106 } 00107 if (ah.checkFlag('d')) { 00108 String a=ah.getOption('d'); 00109 if (a!=null) archiveDirectory=a; 00110 } 00111 if (ah.checkFlag('x')) { 00112 String a=ah.getOption('x'); 00113 if (a!=null) 00114 { 00115 if (a.equals("true")) includeHidden = true; 00116 if (a.equals("false")) includeHidden = false; 00117 } 00118 } 00119 if (ah.checkFlag('a')) { 00120 String a=ah.getOption('a'); 00121 if (a!=null) 00122 { 00123 try 00124 { 00125 double value = getTimeOrDouble(a); 00126 startTime = value; 00127 useStart = true; 00128 } 00129 catch (Exception ingore) {} 00130 } 00131 } 00132 if (ah.checkFlag('z')) { 00133 String a=ah.getOption('z'); 00134 if (a!=null) 00135 { 00136 try 00137 { 00138 double value = getTimeOrDouble(a); 00139 endTime = value; 00140 useDuration = true; 00141 } 00142 catch (Exception ingore) {} 00143 } 00144 } 00145 } catch (Exception e) { 00146 System.err.println("GrabDataMultipleSink argument exception "+e.getMessage()); 00147 e.printStackTrace(); 00148 System.exit(0); 00149 } 00150 00151 if (useDuration && !useStart) // gat a endTime without a startTime 00152 { 00153 useDuration = false; 00154 System.out.println("Got End Time without a Start Time; ignored End Time."); 00155 } 00156 if (useStart && useDuration && (endTime < startTime)) 00157 { 00158 useStart = false; 00159 useDuration = false; 00160 System.out.println("End Time is less then Start Time; ignoring both."); 00161 } 00162 00163 if (useDuration) 00164 { 00165 duration = endTime - startTime; 00166 } 00167 00168 server = serverName + ":" + serverPort; 00169 00170 System.out.println("Starting GrabDataMultipleSink on " + server + " as " + sinkName); 00171 System.out.println(" Requesting " + searchChannelPath); 00172 System.out.println(" Use GrabDataMultipleSink -h to see optional parameters"); 00173 } // GrabDataMultipleSink 00174 00175 public void exec() 00176 { 00177 try { 00178 // Create a sink and connect: 00179 sink=new Sink(); 00180 sink.OpenRBNBConnection(server,sinkName); 00181 00182 // get all the channel paths that match the pattern 00183 Vector collection = new Vector(); 00184 sMap = new ChannelMap(); 00185 sink.RequestRegistration(); 00186 sMap = sink.Fetch(-1,sMap); 00187 ChannelTree tree = ChannelTree.createFromChannelMap(sMap); 00188 00189 Pattern p = Pattern.compile(searchChannelPath); 00190 // for each channel path, check match, collect matches... 00191 00192 Iterator nodes = tree.iterator(); 00193 while (nodes.hasNext()) 00194 { 00195 ChannelTree.Node n = (ChannelTree.Node)nodes.next(); 00196 System.out.println("Checking " + n.getFullName() + ";" + n.getName()); 00197 if (!includeHidden && n.getFullName().startsWith("_")) continue; 00198 if (n.getType() != ChannelTree.CHANNEL) continue; 00199 String name = n.getFullName(); 00200 Matcher m = p.matcher(name); 00201 if (m.matches()) 00202 { 00203 System.out.println("Matches"); 00204 boolean isSource = false; 00205 ChannelTree.Node upNode = n.getParent(); 00206 while ((!isSource) || (upNode != null)) 00207 { 00208 if (upNode.getType() == ChannelTree.SOURCE) isSource = true; 00209 upNode = upNode.getParent(); 00210 } 00211 if (isSource) 00212 { 00213 System.out.println("... and is a source."); 00214 collection.add(n); 00215 } 00216 else 00217 System.out.println("... and is NOT a source."); 00218 } 00219 } 00220 00221 Iterator channels = collection.iterator(); 00222 00223 if (channels.hasNext()) 00224 { 00225 System.out.println("GrabDataMultipleSink: Intending connectsions to server = " 00226 + server + " as " + sinkName + "monotering channels: "); 00227 while (channels.hasNext()) 00228 { 00229 ChannelTree.Node candidate = (ChannelTree.Node)channels.next(); 00230 System.out.println(candidate.getFullName()); 00231 new GrabDataSink(candidate); 00232 } 00233 } 00234 else 00235 System.out.println("GrabDataMultipleSink: No channels to monitor."); 00236 sink.CloseRBNBConnection(); 00237 00238 } catch (SAPIException se) { se.printStackTrace(); } 00239 } // exec 00240 00241 private double getTimeOrDouble(String arg) throws Exception 00242 { 00243 double value = 0.0; 00244 boolean gotit = false; 00245 00246 try{ 00247 Date d = INPUT_FORMAT.parse(arg); 00248 long t = d.getTime(); 00249 value = ((double)t)/1000.0; 00250 gotit = true; 00251 } catch (Exception ignore) 00252 { 00253 gotit = false; 00254 } 00255 00256 if (!gotit) 00257 try { 00258 value = Double.parseDouble(arg); 00259 gotit = true; 00260 } catch (Exception ignore) 00261 { 00262 gotit = false; 00263 } 00264 00265 if (!gotit) throw(new Exception("Failed to parse time " + arg)); 00266 00267 return value; 00268 00269 } // getTimeOrDouble 00270 00271 private class GrabDataSink 00272 { 00273 Sink sink = null; 00274 ChannelMap sMap; 00275 int index; 00276 00277 String name; 00278 PrintWriter out = null; 00279 00280 boolean connected = false; 00281 00282 Thread stringDataThread; 00283 boolean runit = false; 00284 00285 ChannelTree.Node itsNode; 00286 00287 public GrabDataSink(ChannelTree.Node node) 00288 { 00289 itsNode = node; 00290 name = itsNode.getFullName().replace('/','_'); 00291 if (connect(name)) 00292 { 00293 connected = true; 00294 if (createFile()) 00295 writeFileHeader(); 00296 startThread(); 00297 } 00298 } // GrabDataSink 00299 00300 private void writeFileHeader() { 00301 // TODO Auto-generated method stub 00302 00303 } // writeFileHeader() 00304 00305 private boolean connect(String name) 00306 { 00307 try 00308 { 00309 sink=new Sink(); 00310 sink.OpenRBNBConnection(server,sinkName + "_" + name); 00311 sMap = new ChannelMap(); 00312 index = sMap.Add(itsNode.getFullName()); 00313 sink.Subscribe(sMap,"newest"); 00314 } 00315 catch (Throwable th) 00316 { 00317 System.out.println("Failed connection for " + name 00318 + "; reason: " + th); 00319 return false; 00320 } 00321 System.out.println("Sucessful connection for " + name); 00322 return true; 00323 } // connect 00324 00325 private boolean createFile() 00326 { 00327 ChannelTree.Node p = itsNode.getParent(); 00328 Stack st = new Stack(); 00329 00330 // get the path for the file 00331 while (p != null) 00332 { 00333 st.push(p); 00334 p = p.getParent(); 00335 } 00336 00337 // make sure that the directories on the path exist 00338 String path = archiveDirectory; 00339 while (!st.empty()) 00340 { 00341 p = (ChannelTree.Node)st.pop(); 00342 path = path + File.separator + p.getName(); 00343 } 00344 File f = new File(path); 00345 System.out.println("Appempting to create the directory path " + path); 00346 f.mkdirs(); 00347 00348 // Open file on path 00349 path = path + File.separator + itsNode.getName(); 00350 System.out.println("Trying to create file for path "+ path); 00351 try { 00352 out = new PrintWriter(new FileWriter(path)); 00353 } catch (IOException e) { 00354 System.out.println("Can not create file " + e); 00355 return false; 00356 } 00357 return true; 00358 } // createFile 00359 00360 private void startThread() 00361 { 00362 00363 if (!connected) return; 00364 00365 // Use this inner class to hide the public run method 00366 Runnable r = new Runnable() { 00367 public void run() { 00368 runWork(); 00369 } 00370 }; 00371 runit = true; 00372 stringDataThread = new Thread(r, itsNode.getName()); 00373 stringDataThread.start(); 00374 System.out.println("GrabDataSink: Started thread for " + itsNode.getFullName()); 00375 } // startThread 00376 00377 public void stopThread() 00378 { 00379 runit = false; 00380 stringDataThread.interrupt(); 00381 System.out.println("GrabDataSink: Stopped thread for " + itsNode.getFullName()); 00382 } // stopThread 00383 00384 private void runWork () 00385 { 00386 try { 00387 while(isRunning()) 00388 { 00389 ChannelMap m = sink.Fetch(-1); 00390 if (m == null) 00391 { 00392 System.out.println("Data fetch failed for " + itsNode.getFullName()); 00393 continue; 00394 } 00395 double startTime = m.GetTimeStart(index); 00396 double duration = m.GetTimeDuration(index); 00397 double[] data = m.GetDataAsFloat64(index); 00398 long unixTime = (long)(startTime * 1000.0); // convert sec to millisec 00399 String time1 = DATE_FORMAT.format(new Date(unixTime)); 00400 unixTime = (long)((startTime + duration) * 1000.0); 00401 String time2 = DATE_FORMAT.format(new Date(unixTime)); 00402 System.out.println("Data Received (for " 00403 + time1 + " to " + time2 00404 + "GMT -- " + itsNode.getFullName() + "): "); 00405 out.print(startTime + " " + duration); 00406 for (int i = 0; i < data.length; i++) 00407 { 00408 System.out.println(" " + i + ": " + data[i]); 00409 out.print(" " + data[i]); 00410 } 00411 out.println(); 00412 out.flush(); 00413 } 00414 } catch (SAPIException se) { 00415 se.printStackTrace(); 00416 } 00417 stringDataThread = null; 00418 } // runWork 00419 00420 private boolean isRunning() 00421 { 00422 return (connected && runit); 00423 } // isRunning 00424 00425 } // class GrabDataSink 00426 00427 }

Generated on Tue Aug 24 11:12:25 2004 for Data turbine for NEESGrid by doxygen 1.3.7