00001
00002
00003
00004
package org.nees.rbnb;
00005
00006
import java.io.IOException;
00007
import java.util.Date;
00008
import java.util.TimeZone;
00009
import java.text.SimpleDateFormat;
00010
00011
import com.rbnb.sapi.*;
00012
00013
import com.rbnb.utility.ArgHandler;
00014
00019 public class NumberSubSample extends RBNBBase {
00020
00021
private static final String SINK_NAME_NAME =
"SinkName";
00022
private static final String SINK_NAME =
"NumberSubSampleSink";
00023
private static final String SOURCE_NAME_NAME =
"SourceName";
00024
private static final String SOURCE_NAME =
"NumberSubSampleSource";
00025
private static final String CHANNEL_NAME_NAME =
"ChannelName";
00026
private static final String CHANNEL_NAME =
"SubSample";
00027
private static final String SKIP_NAME =
"SamplesToSkip";
00028
private static final int SKIP = 4;
00029
private static final String REQUEST_PATH_NAME =
"RequestPath";
00030
private static final String REQUEST_PATH =
"undefined";
00031
private static final String CACHE_SIZE_NAME =
"CacheSize";
00032
private static final int DEFAULT_CACHE_SIZE=100;
00033
00034
private String sinkName = SINK_NAME;
00035
private String sourceName = SOURCE_NAME;
00036
private String channelName = CHANNEL_NAME;
00037
private String requestPath = REQUEST_PATH;
00038
private int skip = SKIP;
00039
private int count = 0;
00040
private int cacheSize = DEFAULT_CACHE_SIZE;
00041
00042 Sink sink = null;
00043 Source source = null;
00044
boolean sinkConnected =
false;
00045
boolean sourceConnected =
false;
00046
00047 Thread stringDataThread;
00048
boolean runit =
false;
00049
00050
private String[] parameterNameArray;
00051
private String[] parameterTypeArray;
00052
private Object[] parameterDefaultArray;
00053
00054
public static void main(String[] args) {
00055
NumberSubSample s =
new NumberSubSample();
00056
if (s.
setArgs(args))
00057 {
00058 s.
connect();
00059 s.
startThread();
00060 }
00061 }
00062
00063
public NumberSubSample()
00064 {
00065 setParamterArrays();
00066 initialize();
00067 }
00068
00069
public void printUsage() {
00070 System.out.println(
"NumberSubSample: usage is...");
00071 System.out.println(
"NumberSubSample ");
00072 super.printUsage();
00073 System.out.println(
"[-S Source Name *" + SOURCE_NAME +
"] ");
00074 System.out.println(
"[-C Source Channel Name *" + CHANNEL_NAME +
"] ");
00075 System.out.println(
"[-K Sink Name *" + SINK_NAME +
" ]");
00076 System.out.println(
"[-x Samples to skip *" + SKIP +
" ]");
00077 System.out.println(
"-P Request Path (required) e.g. \"Source/data\" ");
00078 }
00079
00080
00081
00082
00083
protected boolean setInstanceArgs(String[] args)
throws IllegalArgumentException
00084 {
00085
00086
try {
00087 ArgHandler ah=
new ArgHandler(args);
00088
if (ah.checkFlag(
'S')) {
00089 String a=ah.getOption(
'S');
00090
if (a!=null) sourceName=a;
00091 }
00092
if (ah.checkFlag(
'C')) {
00093 String a=ah.getOption(
'C');
00094
if (a!=null) channelName=a;
00095 }
00096
if (ah.checkFlag(
'K')) {
00097 String a=ah.getOption(
'K');
00098
if (a!=null) sinkName=a;
00099 }
00100
if (ah.checkFlag(
'P'))
00101 {
00102 String a=ah.getOption(
'P');
00103
if (a!=null) requestPath=a;
00104 }
00105 }
catch (Exception e) {
00106
throw new IllegalArgumentException(
"NumberSubSample argument exception "+e.getMessage());
00107 }
00108
00109
if(requestPath.equals(REQUEST_PATH))
00110 {
00111 System.out.println(
"Request Path is required");
00112 printUsage();
00113
return false;
00114 }
00115
00116 System.out.println(
"Starting NumberSubSample on " + getServer() +
" as " + sinkName);
00117 System.out.println(
" Requesting " + requestPath +
";\n supplying number as " +
00118
"source = " + sourceName +
", channel = " + channelName);
00119 System.out.println(
" Use NumberSubSample -h to see optional parameters");
00120
return true;
00121 }
00122
00123
00124
00125
00126
protected void setParamterArrays() {
00127 String [] pNameArray = super.getBaseParameterNameArray();
00128 String [] pTypeArray = super.getBaseParameterTypeArray();
00129 Object [] pDefaultArray = super.getBaseParameterDefaultArray();
00130
00131
int base = pNameArray.length;
00132
int numberOfparameters = 6;
00133 parameterNameArray =
new String[base + numberOfparameters];
00134 parameterTypeArray =
new String[base + numberOfparameters];
00135 parameterDefaultArray =
new Object[base + numberOfparameters];
00136
00137
for (
int i = 0; i < base; i++)
00138 {
00139 parameterNameArray[i] = pNameArray[i];
00140 parameterTypeArray[i] = pTypeArray[i];
00141 parameterDefaultArray[i] = pDefaultArray[i];
00142 }
00143
00144 parameterNameArray[base + 0] = SINK_NAME_NAME;
00145 parameterTypeArray[base + 0] =
RBNBBaseParameterHolder.STRING_TYPE;
00146 parameterDefaultArray[base + 0] =
new String(SINK_NAME);
00147
00148 parameterNameArray[base + 1] = SOURCE_NAME_NAME;
00149 parameterTypeArray[base + 1] =
RBNBBaseParameterHolder.STRING_TYPE;
00150 parameterDefaultArray[base + 1] =
new String(SOURCE_NAME);
00151
00152 parameterNameArray[base + 2] = CHANNEL_NAME_NAME;
00153 parameterTypeArray[base + 2] =
RBNBBaseParameterHolder.STRING_TYPE;
00154 parameterDefaultArray[base + 2] =
new String(CHANNEL_NAME);
00155
00156 parameterNameArray[base + 3] = SKIP_NAME;
00157 parameterTypeArray[base + 3] =
RBNBBaseParameterHolder.INTEGER_TYPE;
00158 parameterDefaultArray[base + 3] =
new Integer(SKIP);
00159
00160 parameterNameArray[base + 4] = REQUEST_PATH_NAME;
00161 parameterTypeArray[base + 4] =
RBNBBaseParameterHolder.STRING_TYPE;
00162 parameterDefaultArray[base + 4] =
new String(REQUEST_PATH);
00163
00164 parameterNameArray[base + 5] = CACHE_SIZE_NAME;
00165 parameterTypeArray[base + 5] =
RBNBBaseParameterHolder.INTEGER_TYPE;
00166 parameterDefaultArray[base + 5] =
new Integer(DEFAULT_CACHE_SIZE);
00167 }
00168
00169
00170
00171
00172 public void setVariablesFromParameters() {
00173 super.setBaseVarialbesFromParametes();
00174
00175 sinkName = SINK_NAME;
00176
try{
00177 Object obj = getValueWithDefault(SINK_NAME_NAME, SINK_NAME);
00178 sinkName = (String)obj;
00179 }
catch (Throwable ignore){}
00180
00181 sourceName = SOURCE_NAME;
00182
try{
00183 Object obj = getValueWithDefault(SOURCE_NAME_NAME, SOURCE_NAME);
00184 sourceName = (String)obj;
00185 }
catch (Throwable ignore){}
00186
00187 channelName = CHANNEL_NAME;
00188
try{
00189 Object obj = getValueWithDefault(CHANNEL_NAME_NAME, CHANNEL_NAME);
00190 channelName = (String)obj;
00191 }
catch (Throwable ignore){}
00192
00193 requestPath = REQUEST_PATH;
00194
try{
00195 Object obj = getValueWithDefault(REQUEST_PATH_NAME, REQUEST_PATH);
00196 requestPath = (String)obj;
00197 }
catch (Throwable ignore){}
00198
00199 skip = SKIP;
00200
try{
00201 Integer i =
new Integer(SKIP);
00202 Object obj = getValueWithDefault(SKIP_NAME, i);
00203 skip = ((Integer)obj).intValue();
00204 }
catch (Throwable ignore){}
00205
00206 cacheSize = DEFAULT_CACHE_SIZE;
00207
try{
00208 Integer i =
new Integer(DEFAULT_CACHE_SIZE);
00209 Object obj = getValueWithDefault(CACHE_SIZE_NAME, i);
00210 cacheSize = ((Integer)obj).intValue();
00211 }
catch (Throwable ignore){}
00212
00213 }
00214
00215
public void connect()
00216 {
00217
try {
00218
00219 sink =
new Sink();
00220 sink.OpenRBNBConnection(getServer(),sinkName);
00221 ChannelMap sMap =
new ChannelMap();
00222
int index = sMap.Add(requestPath);
00223 sink.Subscribe(sMap,
"newest");
00224 sinkConnected =
true;
00225 System.out.println(
"NumberSubSample: Sink connection made to server = "
00226 + getServer() +
" as " + sinkName
00227 +
" requesting " + requestPath +
".");
00228 }
catch (SAPIException se) { se.printStackTrace(); }
00229
00230
try {
00231
00232 source=
new Source(cacheSize,
"none", 0);
00233 source.OpenRBNBConnection(getServer(),sourceName);
00234 sourceConnected =
true;
00235 System.out.println(
"NumberSubSample: Source connection made to server = "
00236 + getServer() +
" as " + sourceName +
" with " + channelName +
".");
00237 }
catch (SAPIException se) { se.printStackTrace(); }
00238
00239 }
00240
00241
private void disconnect() {
00242 source.CloseRBNBConnection();
00243 source = null;
00244 sourceConnected =
false;
00245 sink.CloseRBNBConnection();
00246 sink = null;
00247 sinkConnected =
false;
00248 }
00249
00250
private boolean connected()
00251 {
00252
return sourceConnected && sinkConnected;
00253 }
00254
00255
public void startThread()
00256 {
00257
00258
if (!connected())
return;
00259
00260
00261 Runnable r =
new Runnable() {
00262
public void run() {
00263 runWork();
00264 }
00265 };
00266 runit =
true;
00267 stringDataThread =
new Thread(r,
"NumberSubSample");
00268 stringDataThread.start();
00269 System.out.println(
"NumberSubSample: Started thread.");
00270 }
00271
00272
public void stopThread()
00273 {
00274 runit =
false;
00275 stringDataThread.interrupt();
00276 System.out.println(
"NumberSubSample: Stopped thread.");
00277 }
00278
00279
private void runWork ()
00280 {
00281
try {
00282
while(
isRunning())
00283 {
00284 ChannelMap inMap = sink.Fetch(-1);
00285
double[] data = inMap.GetDataAsFloat64(0);
00286
double start = inMap.GetTimeStart(0);
00287
double duration = inMap.GetTimeDuration(0);
00288 count++;
00289
if (count > (skip + 1))
00290 {
00291 count = 0;
00292 ChannelMap outMap =
new ChannelMap();
00293
int index = outMap.Add(channelName);
00294 outMap.PutTime(start, duration);
00295 outMap.PutDataAsFloat64(index,data);
00296 source.Flush(outMap);
00297 }
00298 }
00299 }
catch (SAPIException se) {
00300 se.printStackTrace();
00301 }
00302 stringDataThread = null;
00303 }
00304
00305 public boolean isRunning()
00306 {
00307
return (connected() && runit);
00308 }
00309
00310 public String[]
getParameterNameArray() {
00311
return parameterNameArray;
00312 }
00313
00314 public String[]
getParameterTypeArray() {
00315
return parameterTypeArray;
00316 }
00317
00318 public Object[]
getParameterDefaultArray() {
00319
return parameterDefaultArray;
00320 }
00321
00322
00323
00324
00325 public boolean start() {
00326
if (
isRunning())
return false;
00327
if (connected()) disconnect();
00328
setVariablesFromParameters();
00329 connect();
00330
if (!connected())
return false;
00331 startThread();
00332
return true;
00333 }
00334
00335
00336
00337
00338 public boolean stop() {
00339
if (!
isRunning())
return false;
00340 stopThread();
00341 disconnect();
00342
return true;
00343 }
00344
00345 }