00001
00002
00003
00004 package org.nees.rbnb;
00005
00006 import java.io.IOException;
00007
00008 import com.rbnb.sapi.*;
00009 import COM.Creare.Utility.ArgHandler;
00010 import COM.Creare.Utility.RBNBProcess;
00011
00012
00013
00017 public class StringSink {
00018
00019 private static final String SERVER = "localhost:3333";
00020 private static final String SINK_NAME = "GetSting";
00021 private static final String SOURCE_NAME = "Command";
00022 private static final String CHANNEL_NAME = "CommandData";
00023
00024 private String server = SERVER;
00025 private String sinkName = SINK_NAME;
00026 private String sourceName = SOURCE_NAME;
00027 private String channelName = CHANNEL_NAME;
00028 private String requestPath = sourceName + "/" + channelName;
00029
00030 Sink sink = null;
00031 ChannelMap sMap;
00032 int index;
00033 boolean connected = false;
00034
00035 Thread stringDataThread;
00036 boolean runit = false;
00037
00038 public static void main(String[] args) {
00039 StringSink w = new StringSink(args);
00040 w.exec();
00041 w.startThread();
00042 }
00043
00044 private void printUsage() {
00045 System.out.println("StringSink: usage is...");
00046 System.out.println("StringSink ");
00047 System.out.println("[-s server_hostname *" + SERVER + "] ");
00048 System.out.println("[-k Sink Name *" + SINK_NAME + " ]");
00049 System.out.println("[-n source_name *" + SOURCE_NAME + "] ");
00050 System.out.println("[-c channel_name *" + CHANNEL_NAME + "] ");
00051 }
00052
00053 public StringSink(String[] args) {
00054
00055 try {
00056 ArgHandler ah=new ArgHandler(args);
00057 if (ah.checkFlag('h')) {
00058 printUsage();
00059 RBNBProcess.exit(0);
00060 }
00061 if (ah.checkFlag('s')) {
00062 String a=ah.getOption('s');
00063 if (a!=null) server=a;
00064 }
00065 if (ah.checkFlag('n')) {
00066 String a=ah.getOption('n');
00067 if (a!=null) sourceName=a;
00068 }
00069 if (ah.checkFlag('c')) {
00070 String a=ah.getOption('c');
00071 if (a!=null) channelName=a;
00072 }
00073 if (ah.checkFlag('k')) {
00074 String a=ah.getOption('k');
00075 if (a!=null) sinkName=a;
00076 }
00077 } catch (Exception e) {
00078 System.err.println("StringSink argument exception "+e.getMessage());
00079 e.printStackTrace();
00080 RBNBProcess.exit(0);
00081 }
00082
00083 requestPath = sourceName + "/" + channelName;
00084
00085 System.out.println("Starting StringSink on " + server + " as " + sinkName);
00086 System.out.println(" Requesting " + requestPath);
00087 System.out.println(" Use StringSink -h to see optional parameters");
00088 }
00089
00090 public void exec()
00091 {
00092 try {
00093
00094 sink=new Sink();
00095 sink.OpenRBNBConnection(server,sinkName);
00096 sMap = new ChannelMap();
00097 index = sMap.Add(requestPath);
00098 sink.Subscribe(sMap,"newest");
00099 connected = true;
00100 System.out.println("StringSink: Connection made to sever = "
00101 + server + " as " + sinkName
00102 + " requesting " + requestPath + ".");
00103 } catch (SAPIException se) { se.printStackTrace(); }
00104 }
00105
00106 public void startThread()
00107 {
00108
00109 if (!connected) return;
00110
00111
00112 Runnable r = new Runnable() {
00113 public void run() {
00114 runWork();
00115 }
00116 };
00117 runit = true;
00118 stringDataThread = new Thread(r, "StringData");
00119 stringDataThread.start();
00120 System.out.println("StringSink: Started thread.");
00121 }
00122
00123 public void stopThread()
00124 {
00125 runit = false;
00126 stringDataThread.interrupt();
00127 System.out.println("StringSink: Stopped thread.");
00128 }
00129
00130 private void runWork ()
00131 {
00132 try {
00133 while(isRunning())
00134 {
00135 ChannelMap m = sink.Fetch(-1);
00136 if (m == null)
00137 {
00138 System.out.println("Data fetch failed.");
00139 continue;
00140 }
00141 String[] st = m.GetDataAsString(index);
00142 System.out.println("Command(s) Received: ");
00143 for (int i = 0; i < st.length; i++)
00144 {
00145 System.out.println(st[i]);
00146 }
00147 }
00148 } catch (SAPIException se) {
00149 se.printStackTrace();
00150 }
00151 stringDataThread = null;
00152 }
00153
00154 public boolean isRunning()
00155 {
00156 return (connected && runit);
00157 }
00158
00159
00160 }