
Friday, August 28, 2009

javacvs – NetBeans lib to access CVS in Java, Tutorial

Well, I didn't expect to have a topic to write about so soon after joining the MS course. But thanks to Dr. Li, I had a chance to work with Java again! I love Java! So here is a tutorial that explains how to use the javacvs library.

I will explain the important snippets here and provide the complete source at the end of the tutorial.

Step 1: Create the CVSRoot object, which holds the metadata such as username, host, etc. To do this, we have to read the CVS/Root file.

File root = new File(cvsPath + PATH_DELIM + cvsRoot);        
if(true == root.exists()){
BufferedReader bufferedReader = new BufferedReader(new FileReader(root));
if(null != bufferedReader){
System.out.println(MSG_CAPTURE_CVS_DETAIL);
String cvsRootData = bufferedReader.readLine();
CVSRoot rootData = CVSRoot.parse(cvsRootData);//Create the CVS Root object defining the meta data such as username, host etc
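
To make the shape of that line concrete, here is a minimal sketch of what a pserver entry in CVS/Root looks like and how it breaks down into method, user, host and repository. This is a hand-rolled parser for illustration only, not the library's CVSRoot.parse. The class name CvsRootSketch, the host cvs.example.org and the repository path are made up, and the sketch assumes the simple :method:user@host:/repository form without a port or password.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of javacvs: splits a simple CVSROOT line into its parts.
final class CvsRootSketch {
    // Matches only the ":method:user@host:/repository" form (assumption: no port, no password).
    private static final Pattern SIMPLE_ROOT =
            Pattern.compile("^:([^:]+):([^@:]+)@([^:]+):(/.*)$");

    final String method;
    final String userName;
    final String hostName;
    final String repository;

    private CvsRootSketch(String method, String userName, String hostName, String repository) {
        this.method = method;
        this.userName = userName;
        this.hostName = hostName;
        this.repository = repository;
    }

    static CvsRootSketch parse(String line) {
        Matcher m = SIMPLE_ROOT.matcher(line.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported CVSROOT form: " + line);
        }
        return new CvsRootSketch(m.group(1), m.group(2), m.group(3), m.group(4));
    }

    public static void main(String[] args) {
        // Made-up example line; a real CVS/Root file holds one such line.
        CvsRootSketch root = parse(":pserver:anonymous@cvs.example.org:/cvsroot/project");
        System.out.println("method=" + root.method + " user=" + root.userName
                + " host=" + root.hostName + " repository=" + root.repository);
    }
}
```

In the tutorial code the library does this parsing for you; the sketch is only meant to show what ends up in getMethod(), getUserName(), getHostName() and getRepository().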


Step 2: Set the global options, needed for executing commands.
GlobalOptions globalOptions = new GlobalOptions();
globalOptions.setCVSRoot(cvsRootData);

Step 3: Obtain the connection with the credentials and the details provided and then open the connection.
PServerConnection connection = new PServerConnection(rootData);
connection.open();

Step 4: Create a client that can execute the cvs log command, and register the listener to be invoked for CVS outputs. The LogListener is explained at the end.
Client client = new Client(connection, new StandardAdminHandler());     
client.setLocalPath(cvsPath);
client.getEventManager().addCVSListener(new LogListener());

Step 5: Create the log command, which does the job of getting the log for every file in the directory. The log builder is responsible for firing file info events on the LogListener.
LogCommand command = new LogCommand();
command.setRecursive(true);
Builder builder = new LogBuilder(client.getEventManager(), command);
command.setBuilder(builder);

Step 6: Initialize the writers, execute the command, and close the writers.
xl = new File(F_OUTPUT_CSV);
xlWriter = new BufferedWriter(new FileWriter(xl));
xlWriter.write(HEADER);
client.executeCommand(command, globalOptions);
xlWriter.close();
bufferedReader.close();

LogListener
It extends BasicListener and overrides messageSent with an empty definition. It also overrides the fileInfoGenerated method, where the actual processing of each file is done, that is, where the log of each file is written. The snippet is below.
//Handle control to Super class.
super.fileInfoGenerated(fileinfoevent);

//Get the log information for the current file event.
LogInformation infoContainer = (LogInformation) fileinfoevent.getInfoContainer();
try {
//Log to Excel in csv format.
logToExcel(infoContainer);
} catch (IOException e) {
//Just print trace, and try logging the next file event.
e.printStackTrace();
}

logToExcel is a simple method that writes the log to the file. The LogInformation data structure holds data related to the file being processed: all the revisions of the file, the head revision, the message associated with each revision and a lot more details. I use only the ones I mentioned.
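
One thing to watch when writing revision messages into a CSV file: the complete listing further down strips new lines from each message but writes commas through unchanged, so a message containing a comma spills into extra columns. Here is a minimal sketch of the usual CSV quoting convention (wrap the field in double quotes and double any embedded quote). The class CsvSketch and its methods are hypothetical helpers of mine, not part of the tool.

```java
// Hypothetical helper: quotes fields so that commas, quotes and line breaks stay inside one CSV cell.
final class CsvSketch {

    // Returns the field unchanged when it is safe, otherwise wraps it in double quotes.
    static String quote(String field) {
        if (field == null) {
            return "";
        }
        boolean needsQuotes = field.contains(",") || field.contains("\"")
                || field.contains("\n") || field.contains("\r");
        if (!needsQuotes) {
            return field;
        }
        // Embedded double quotes are escaped by doubling them.
        return "\"" + field.replace("\"", "\"\"") + "\"";
    }

    // Joins the quoted fields with commas and terminates the row with a new line.
    static String row(String... fields) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(quote(fields[i]));
        }
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        // Same column layout as the HEADER constant: file, package, revision, bug no., description.
        System.out.print(row("", "", "1.4", "12345", "Fix NPE in handler, add test"));
    }
}
```

With quoting in place, the description stays in a single cell even when the commit message contains commas.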
The complete code, including the LogListener, is below. Ignore the logic in the logToExcel method; that is something related to my work. The point is that the FileInfoEvent carries a LogInformation, which can be used to see the data. You can write different listeners and builders in a similar way.
/**
*
*/
package com.ssb.nb.cvs;
/*
* Java SE Packages
*/
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.StringTokenizer;

/*
* Java CVS Packages
*/
import org.netbeans.lib.cvsclient.CVSRoot;
import org.netbeans.lib.cvsclient.Client;
import org.netbeans.lib.cvsclient.admin.StandardAdminHandler;
import org.netbeans.lib.cvsclient.command.Builder;
import org.netbeans.lib.cvsclient.command.CommandException;
import org.netbeans.lib.cvsclient.command.GlobalOptions;
import org.netbeans.lib.cvsclient.command.log.LogBuilder;
import org.netbeans.lib.cvsclient.command.log.LogCommand;
import org.netbeans.lib.cvsclient.command.log.LogInformation;
import org.netbeans.lib.cvsclient.command.log.LogInformation.Revision;
import org.netbeans.lib.cvsclient.commandLine.BasicListener;
import org.netbeans.lib.cvsclient.connection.AuthenticationException;
import org.netbeans.lib.cvsclient.connection.PServerConnection;
import org.netbeans.lib.cvsclient.event.FileInfoEvent;
import org.netbeans.lib.cvsclient.event.MessageEvent;

/**
* @author Shreyas Purohit
*
*/
public class NBCvs {
/*
* Private Static variables
*/
private static File xl;
private static BufferedWriter xlWriter;
private static String cvsPath = "F:/EWorkspace/org.eclipse.core.commands";
private static String F_OUTPUT_CSV = "F:/output.csv";

/*
* Private Constants
*/
private static final String APP_NAME = "CVS2l : ";
private static final String MSG_METHOD = " method: ";
private static final String MSG_USERNAME = " username: ";
private static final String MSG_REPOSITORY = " repository: ";
private static final String MSG_PORT = " port: ";
private static final String MSG_HOST = "Using host: ";
private static final String cvsRoot = "CVS/Root";
private static final String PATH_DELIM = "/";
private static final String MSG_EXIT_SUCCESS = APP_NAME + "Completed successfully, terminating process.";
private static final String MSG_EXEC_LOG = APP_NAME + "Executing log command";
private static final String MSG_INIT_OUTPUT = APP_NAME + "Initializing output file";
private static final String MSG_CONNECTING_CVS = APP_NAME + "Connecting to CVS";
private static final String MSG_CAPTURE_CVS_DETAIL = APP_NAME + "Capturing CVS metadata";
private static final String MSG_LOADED_PROP = APP_NAME + "Loaded required properties";
private static final String MSG_PROP_NOT_READABLE = APP_NAME + "Could not read tool.properties, make sure it is not being used by some other application";
private static final String MSG_EXITING = APP_NAME + "Exiting..";
private static final String MSG_PROP_NOT_FOUND = APP_NAME + "Could not find tool.properties, make sure it exists in same directory as the cvs2l.jar";
private static final String TOOL_PROPERTIES = "tool.properties";
private static final String MSG_INIT = APP_NAME + "Initializing";
private static final String HEADER = "File Name,Package,Revision,Bug no.,Description\n";

/*
* Static block to load properties.
*/
static{
System.out.println(MSG_INIT);
Properties prop = new Properties();
try {
prop.load(new FileInputStream(TOOL_PROPERTIES));
} catch (FileNotFoundException e) {
System.out.println(MSG_PROP_NOT_FOUND);
splashExitMsg();
systemExit();
} catch (IOException e) {
System.out.println(MSG_PROP_NOT_READABLE);
splashExitMsg();
systemExit();
}
cvsPath = prop.getProperty("CVS_PATH");
F_OUTPUT_CSV = prop.getProperty("OUTPUT");
System.out.println(MSG_LOADED_PROP);
}

/**
* Displays exit message
*/
private static void splashExitMsg() {
System.out.println(MSG_EXITING);
}

/**
* Exits system
*/
private static void systemExit() {
System.exit(1);
}

/**
* @param args
* @throws IOException
* @throws FileNotFoundException
* @throws AuthenticationException
* @throws CommandException
*/
public static void main(String[] args) throws FileNotFoundException, IOException, AuthenticationException, CommandException {
/*
* Get the path to the CVS Root folder, connect to CVS and execute the log command to retrieve the
* file revision, package, bug number and description.
*/
File root = new File(cvsPath + PATH_DELIM + cvsRoot);
if(true == root.exists()){
BufferedReader bufferedReader = new BufferedReader(new FileReader(root));
if(null != bufferedReader){
System.out.println(MSG_CAPTURE_CVS_DETAIL);
String cvsRootData = bufferedReader.readLine();
CVSRoot rootData = CVSRoot.parse(cvsRootData);//Create the CVS Root object defining the meta data such as username, host etc

GlobalOptions globalOptions = new GlobalOptions();
globalOptions.setCVSRoot(cvsRootData); //Set the global options, needed for executing commands.

System.out.println(APP_NAME + MSG_HOST + rootData.getHostName() + MSG_PORT + rootData.getPort() + MSG_REPOSITORY + rootData.getRepository() + MSG_USERNAME + rootData.getUserName() + MSG_METHOD + rootData.getMethod());
System.out.println(MSG_CONNECTING_CVS);
PServerConnection connection = new PServerConnection(rootData);// Obtain the connection with the credentials and the details provided.
connection.open(); //Open the connection.

/*
* Create a client that can execute the cvs log command, and register the listener to be invoked for CVS outputs.
*/
Client client = new Client(connection, new StandardAdminHandler());
client.setLocalPath(cvsPath);
client.getEventManager().addCVSListener(new LogListener());

/*
* The log command that does the job of getting the log for every file in the
* directory.
* The log builder is responsible for firing file info events on the LogListener.
*/
LogCommand command = new LogCommand();
command.setRecursive(true);
Builder builder = new LogBuilder(client.getEventManager(), command);
command.setBuilder(builder);

/*
* Initialize the writers, and execute the command, close the writers..
*/
System.out.println(MSG_INIT_OUTPUT);
xl = new File(F_OUTPUT_CSV);
xlWriter = new BufferedWriter(new FileWriter(xl));
xlWriter.write(HEADER);
System.out.println(MSG_EXEC_LOG);
client.executeCommand(command, globalOptions);
xlWriter.close();
bufferedReader.close();
System.out.println(MSG_EXIT_SUCCESS);
}
}
}
/**
* The Listener class for the log command to be executed.
*
* @author Shreyas Purohit
*
*/
public static class LogListener extends BasicListener{
private static final String DOT_STR = ".";
/*
* Private static constants.
*/
private static final String BUG_FOLLOWED_SPACE_STR = "Bug ";
private static final String BUG_STR = "Bug";
private static final String SPACE = " ";
private static final String NEW_LINE = "\n";
private static final String DOUBLE_COMMA = ",,";
private static final String COMMA_STR = ",";
private static final String EMPTY_STR = "";
private static final String JAVA_STR = "java";
private static final String SRC_STR = "src";
private static final String BACKSLASH = "\\";
private static final String SRC = "\\src\\";

@Override
public void messageSent(MessageEvent e) {
/*
* Override the super class, so as to prevent from logging to console.
*/
}

@Override
public void fileInfoGenerated(FileInfoEvent fileinfoevent) {
//Handle control to Super class.
super.fileInfoGenerated(fileinfoevent);

//Get the log information for the current file event.
LogInformation infoContainer = (LogInformation) fileinfoevent.getInfoContainer();
try {
//Log to Excel in csv format.
logToExcel(infoContainer);
} catch (IOException e) {
//Just print trace, and try logging the next file event.
e.printStackTrace();
}
}

public void logToExcel(LogInformation info) throws IOException{
System.out.print(NEW_LINE);
System.out.println("------------------Processing---------------------------");
System.out.println("File : " + info.getRepositoryFilename());
System.out.println("Available Revisions...");

String path = info.getFile().getPath();
String strPackage = EMPTY_STR;
/*
* Extract package information from the path information.
*/
if(path.indexOf(SRC) >= 0 ){
String packagePath = path.substring(path.indexOf(SRC));
StringTokenizer tokenizer = new StringTokenizer(packagePath,BACKSLASH);
while(tokenizer.hasMoreTokens()){
String nextToken = tokenizer.nextToken();
if(!nextToken.equalsIgnoreCase(SRC_STR) && !(nextToken.indexOf(JAVA_STR)>=0)){
strPackage += nextToken + DOT_STR;
}
}
if(!strPackage.isEmpty()){ //Compare by content; != on Strings compares references.
strPackage = strPackage.substring(0, strPackage.length()-1);
}
}
boolean fileNameWritten = false;

/*
* Process all the revisions for the current file.
*/
List<Revision> revisionList = info.getRevisionList();
for(Revision revision : revisionList){
System.out.print(revision.getNumber() + SPACE);
//Remove new line character from the message else it hinders rendering in the Excel sheet.
String message = (revision.getMessage() != null && revision.getMessage().indexOf(NEW_LINE) >= 0 ? revision.getMessage().replaceAll(NEW_LINE, SPACE) : revision.getMessage());
String bugNumber = null;

/*
* Extract the bug number from the message string if any.
*/
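if(message != null && message.indexOf(BUG_FOLLOWED_SPACE_STR) >= 0){
StringTokenizer tokenizer = new StringTokenizer(message,SPACE);
while(tokenizer.hasMoreTokens()){
//Only read a bug number if a token actually follows "Bug".
if(BUG_STR.equalsIgnoreCase(tokenizer.nextToken()) && tokenizer.hasMoreTokens()){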
if(bugNumber != null){
bugNumber += SPACE + tokenizer.nextToken();
}else{
bugNumber = tokenizer.nextToken();
}
if(null != bugNumber){
bugNumber = bugNumber.replaceAll(COMMA_STR, SPACE);
}
}
}

}
/*
* Write to CSV only if the message for this revision mentions a bug.
*/
if(null != bugNumber){
/*
* Write the filename only once, and repeat the revisions for the same file.
*/
if(false == fileNameWritten){
xlWriter.write(info.getFile().getName() + COMMA_STR + strPackage + DOUBLE_COMMA+ NEW_LINE);
fileNameWritten = true;
}
xlWriter.write(EMPTY_STR+ DOUBLE_COMMA + revision.getNumber() + COMMA_STR + (bugNumber != null ? bugNumber : SPACE) + COMMA_STR + message + NEW_LINE);
}
}
}
}
}
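
The bug number extraction inside logToExcel can also be looked at in isolation. The sketch below is not the code above: it swaps the StringTokenizer loop for a regular expression, which is my own substitution, and the class name BugNumberSketch is hypothetical. It assumes the same convention as the listing, namely that a bug reference is the word "Bug" (any case) followed by whitespace and a token, and it drops commas from the token.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: collects the tokens that follow the word "Bug" in a commit message.
final class BugNumberSketch {
    // "bug" as a whole word, then whitespace, then the next non-blank token.
    private static final Pattern BUG_REF =
            Pattern.compile("\\bbug\\s+(\\S+)", Pattern.CASE_INSENSITIVE);

    static List<String> extract(String message) {
        List<String> ids = new ArrayList<>();
        if (message == null) {
            return ids; // a revision without a message has no bug references
        }
        Matcher m = BUG_REF.matcher(message);
        while (m.find()) {
            // Drop commas so the id cannot break the CSV layout.
            String id = m.group(1).replace(",", "");
            if (!id.isEmpty()) {
                ids.add(id);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(extract("Bug 12345, Bug 678 fixed NPE in handler"));
    }
}
```

A message that ends with the word "Bug" simply yields no match here, so there is no token to read past the end of the message.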

Monday, July 27, 2009

A student again!

I quit my company, Tavant, and have decided to pursue my higher education, an MS in Computer Science at the University of Alabama in Huntsville. So this blog may not be updated very frequently. When I work on something cool, I will definitely post the tech details here.

If you are interested, you can follow my other blog at http://sprouting-beans.blogspot.com/

Thursday, June 18, 2009

Streaming Data using Flex and XSocket

I have worked with a couple of push technologies, namely BlazeDS and Lightstreamer, and always wondered how they work. Here I am concerned only with streaming in Flex and nothing else. You may think I am a little biased towards Flex, but the truth is Flex rocks, and since I entered the world of Flex I don't care about other UI technologies anymore! Anyway, this idea for streaming data to Flex from a backend came to me just by chance, and it took two days of effort to get the crappy thing below going. I read a blog on the Internet that used URLStream to load image data in Flex progressively. If you can load an image progressively, you should be able to do the same with any data. So, here I present a Flex and XSocket combination to subscribe to and load data from a backend for a watchlist. There are a few things I did not resolve, as I did not have time to dedicate to them. I will explain those at the end.

JAVA

Let's start with the Java backend. I use the XSocket XLightweb library to create an NIO server that accepts any symbol as a subscription and pushes randomly generated quotes to the client.

I will not be explaining XSocket here. Please visit http://xsocket.sourceforge.net/ for tutorials and downloads.

Let's write a request handler. In XLightweb, a request handler implements IHttpRequestHandler.

public class ERequestHandler implements IHttpRequestHandler{ 
//Timer that generates the quotes at the specified interval
private final Timer timer = new Timer("timer", true);
public static Map<String, SymbolSender> state = new HashMap<String, SymbolSender>();

Let's implement the onRequest method. The same handler is used both to receive commands (add symbol, remove symbol) and to receive the data request; we use the URI to differentiate between the two kinds of request.
    public void onRequest(IHttpExchange exchange) throws IOException, 
BadMessageException {
IHttpRequest req = exchange.getRequest();
String ipAddress = exchange.getConnection().getRemoteAddress().getHostAddress();
if (req.getRequestURI().endsWith("/link1")) {//The Action link
String symbol = req.getParameter("symbol");
String command = req.getParameter("action");

Every ipAddress is given its own handler, so obtain the handler, i.e. the SymbolSender, and delegate the action to it. The SymbolSender is a private class that is given a little further below. A client has to connect to this server before it sends any action; otherwise no SymbolSender exists yet for its ipAddress and the lookup returns null.
            SymbolSender symbolSender = state.get(ipAddress); 
if(command.equalsIgnoreCase("add")){
symbolSender.addSymbol(symbol);
}else if(command.equalsIgnoreCase("delete")){
symbolSender.removeSymbol(symbol);
}else if(command.equalsIgnoreCase("stop")){
symbolSender.stop();
}

The action has been delegated, so close the channel with a 200 response header, as in the snippet below.
            IHttpResponseHeader respHdr = new HttpResponseHeader(200, "text/plain"); 
BodyDataSink outChannel = exchange.send(respHdr);
outChannel.close();
}else if(req.getRequestURI().endsWith("/link2")){//The initial Data link, that is used to send data back.
String command = req.getParameter("action");
Initialize a new SymbolSender if none exists for the given ipAddress, and start the handler using the timer. Here it is scheduled at a fixed rate of every 500 ms.

if(!state.keySet().contains(ipAddress) || state.get(ipAddress) == null){
state.put(ipAddress, new SymbolSender(exchange));
}
SymbolSender symbolSender = state.get(ipAddress);
if(command.equalsIgnoreCase("start")){
timer.scheduleAtFixedRate(symbolSender,new Date(), 500);
}
}
}
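
A note on the state map: it is a plain HashMap that is touched both by the server's request threads and by the timer thread (stop() removes entries). The check-then-put above can be collapsed into a single atomic step with ConcurrentHashMap.computeIfAbsent. The sketch below shows only that idea with a hypothetical ClientRegistrySketch class; it does not use any XLightweb types, and the session type is left generic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical registry: one session object per client key (here, the client's IP address).
final class ClientRegistrySketch<S> {
    private final Map<String, S> sessions = new ConcurrentHashMap<>();

    // Creates the session on first use; later calls with the same key return the same instance.
    S getOrCreate(String clientKey, Function<String, S> factory) {
        return sessions.computeIfAbsent(clientKey, factory);
    }

    // Returns null when the client has not connected yet.
    S find(String clientKey) {
        return sessions.get(clientKey);
    }

    void remove(String clientKey) {
        sessions.remove(clientKey);
    }

    public static void main(String[] args) {
        ClientRegistrySketch<StringBuilder> registry = new ClientRegistrySketch<>();
        StringBuilder first = registry.getOrCreate("10.0.0.1", key -> new StringBuilder(key));
        StringBuilder again = registry.getOrCreate("10.0.0.1", key -> new StringBuilder("unused"));
        System.out.println(first == again);
        System.out.println(registry.find("10.0.0.2"));
    }
}
```

Keying by IP address also means that two clients behind the same address share one sender, which is fine for this demo but worth keeping in mind.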

Let's define the SymbolSender timer task, which is a separate handler for each IP address.
     private class SymbolSender extends TimerTask { 
private IHttpExchange exchange; //The exchange handler
private List<String> symbolList = new ArrayList<String>();
private BodyDataSink outChannel;
private String ipAddress;
public SymbolSender(IHttpExchange exchange) {
this.exchange = exchange;
IHttpResponseHeader respHdr = new HttpResponseHeader(200, "text/plain");
try {
outChannel = this.exchange.send(respHdr);
outChannel.setEncoding("utf-8");
} catch (IllegalStateException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
ipAddress = exchange.getConnection().getRemoteAddress().getHostAddress();
}

API for adding and removing the subscription symbol from this ipAddress.
        public void addSymbol(String symbol){ 
symbolList.add(symbol);
}

public void removeSymbol(String symbol){
symbolList.remove(symbol);
}

The run method where the random data is generated for the symbols subscribed from this ipAddress, and written to the client.
 @Override 
public void run() {
try {
if(symbolList.size() > 0){
for(String symbol : symbolList){
String data = "|symbol=" + symbol + "#lastPrice=" + randomToTwoDecimal() + "#lastAsk=" + randomToTwoDecimal() + "#lastBid=" + randomToTwoDecimal() + "|\r\n";
outChannel.write(data);
}
}else{
outChannel.write("STAY ALIVE\r\n");
}
} catch (BufferOverflowException e) {
e.printStackTrace();
stop();
} catch (IOException e) {
e.printStackTrace();
stop();
} catch(Exception e){
e.printStackTrace();
stop();
}

}

A simple way of generating a random xx.yy number.
        private String randomToTwoDecimal(){ 
Double p1 = Math.random() * 100;
String value1 = String.valueOf(p1);
Double p2 = Math.random() * 100;
String value2 = String.valueOf(p2);
String randNumber = value1.substring(0, value1.indexOf(".")) + "." + value2.substring(0, value2.indexOf("."));
return randNumber;
}
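
The method above glues together the integer parts of two random numbers, so the part after the dot can come out as a single digit (for example 42.7). If you want exactly two decimals every time, one option is to draw a random number of cents and format it. This is a sketch of that alternative, not the code I ran; the class name RandomPriceSketch is made up.

```java
import java.util.Locale;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical alternative: random price between 0.00 and 99.99 with exactly two decimals.
final class RandomPriceSketch {

    static String randomToTwoDecimal() {
        // Draw whole cents so that no rounding can push the value to 100.00.
        int cents = ThreadLocalRandom.current().nextInt(10_000);
        // Locale.ROOT keeps the format independent of the machine's locale settings.
        return String.format(Locale.ROOT, "%d.%02d", cents / 100, cents % 100);
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            System.out.println(randomToTwoDecimal());
        }
    }
}
```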

Stopping this handler is achieved by cancelling the task and removing the handler from the state for the ipAddress.
        public void stop(){ 
this.cancel();
try {
outChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
ERequestHandler.state.remove(ipAddress);
}
}
}

The main class that starts the server is given below.
public class Streamer { 
public static void runServer() throws IOException{
System.setProperty("org.xlightweb.showDetailedError", "true");
Context rootCtx = new Context("");
IHttpRequestHandler handler = new ERequestHandler();
Add the two contexts that this server listens to.

rootCtx.addHandler("/link1",handler );
rootCtx.addHandler("/link2", handler);
// creates the server by passing over the port number & the server handler
HttpServer server = new HttpServer(9980, rootCtx);
ConnectionUtils.start(server);
}
public static void main(String[] args) throws Exception {
runServer();
}
}

The Java part of the streaming server is over. Now let's go ahead with the Flex part.

FLEX


For the Flex part, let's create an application.
<mx:Application xmlns:mx="http://www.adobe.com/2006/mxml" layout="absolute" creationComplete="init()"> 
<mx:Script>
<![CDATA[
import mx.collections.ArrayCollection;
import mx.controls.Alert;

Let's declare two variables, one for streaming data and another for sending actions to the XSocket server we just wrote above.
            private var dataStream:URLStream = new URLStream();
private var actionStream:URLLoader = new URLLoader();

The string that we will be getting from the backend is appended here.
private var dataStr:String = ""; 

Let's declare the remaining things, i.e. a data provider, a dictionary that maps each symbol to its object, and a timer that will be used to parse the data and create objects for the data provider.
[Bindable] 
private var watchListData:ArrayCollection = new ArrayCollection();
private var symbolObjectMap:Dictionary = new Dictionary();
private static var timer:Timer = new Timer(60);

Let's write the init method that is called on creation complete. We add a progress listener, which is the one really useful to us; the remaining listeners are just for time pass.
private function init():void{                
dataStream.addEventListener(ProgressEvent.PROGRESS, streamListener);
dataStream.addEventListener(Event.COMPLETE, listener);
dataStream.addEventListener(Event.ACTIVATE, listener);
dataStream.addEventListener(Event.DEACTIVATE, listener);
dataStream.addEventListener(Event.OPEN, listener);
dataStream.addEventListener(HTTPStatusEvent.HTTP_STATUS, listener);
dataStream.addEventListener(IOErrorEvent.IO_ERROR, listener);
dataStream.addEventListener(SecurityErrorEvent.SECURITY_ERROR, listener);
timer.addEventListener(TimerEvent.TIMER, parse, false, 0, true);
timer.start();
}

The stream listener for the data is given below. The logic is very simple: read the multi-byte data, which the server has already written as utf-8, and append it to the data string. Flex calls this stream listener whenever some amount of data is available, and it does not make sense to parse the data into objects at that rate. Instead, as you will see in the next code, the timer invokes the parse method every 60 ms, which then parses the contents of the data string into the appropriate VO.
private function streamListener(event:ProgressEvent):void{ 
if(dataStream.bytesAvailable == 0)return;
if(dataStream.connected){
while(dataStream.bytesAvailable >= 1){
dataStr = dataStr + dataStream.readMultiByte(dataStream.bytesAvailable, "utf-8");
}
trace(dataStr);
}
}

Now, here is the time-pass listener. Comment out the alerts if you do not want them, or use trace instead.
private function listener(event:Event):void{ 
Alert.show("Here at other side of the world :(");
}

Now, here is the parse method for the data I stream from the server. Actually, I do not like to call it streaming, as it is just a normal read and write to streams. Heck, that's why we call it streaming! He he.
private function parse(event:TimerEvent):void{ 
if(this.dataStr != null
&& this.dataStr.indexOf("|") >=0){
var skipLast:Boolean = false;
var streamData:Array = this.dataStr.split("\r\n");

Find out whether the data was written partially, that is, whether the last piece does not end with "|". If so, keep that last piece in the data string so that it is appended to the next data arriving from the backend. It will be parsed once that data has arrived, after the 60 ms time lag.
var lastChunk:String = String(streamData[streamData.length-1]);
if(lastChunk.charAt(lastChunk.length-1) != "|"){ //Index by the chunk's own length, not the array length.
dataStr = lastChunk;
skipLast = true;
streamData[streamData.length-1] = "STAY ALIVE";
}

Parse all the data other than the "STAY ALIVE" pings. I am not sure whether you can stop sending STAY ALIVE or reduce its frequency, and I am not doing that R&D either. My purpose remains just to stream data to Flex, only the positive use case :) The rest of the for loop just parses the data and creates or updates the StreamerVO object.
for each(var string:String in streamData){ 
if(string != "STAY ALIVE"){
string = string.substring(1,string.length-1);
var nodes:Array = string.split("#");
var symbolAttribute:String = String(nodes[0]);
var symbolData:Array = symbolAttribute.split("=");
if(String(symbolData[0]) == "symbol"){
var symbol:String = String(symbolData[1]);
if(!symbolObjectMap.hasOwnProperty(symbol)){
symbolObjectMap[symbol] = new StreamerVO();
watchListData.addItem(symbolObjectMap[symbol]);
}
var vo:StreamerVO = StreamerVO(symbolObjectMap[symbol]);
for each(var node:String in nodes){
var nodeData : Array = node.split("=");
vo[nodeData[0]] = nodeData[1];
}
}
}
}
}
}
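
If you want to try the buffering logic outside Flash, the same idea can be sketched in plain Java: append every chunk to a pending buffer, cut off complete lines ending in "\r\n", and leave any incomplete tail in the buffer for the next chunk. This is only a sketch of the technique under the wire format used above (|key=value#key=value|), not a port of the Flex code; the class name QuoteFrameBuffer is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical buffer: collects streamed text and emits only complete "|...|" quote lines.
final class QuoteFrameBuffer {
    private final StringBuilder pending = new StringBuilder();

    // Appends a newly received chunk and returns the quotes completed by it.
    List<Map<String, String>> feed(String chunk) {
        pending.append(chunk);
        List<Map<String, String>> quotes = new ArrayList<>();
        int end;
        while ((end = pending.indexOf("\r\n")) >= 0) {
            String line = pending.substring(0, end);
            pending.delete(0, end + 2); // drop the line and its terminator
            if (line.length() >= 2 && line.startsWith("|") && line.endsWith("|")) {
                quotes.add(parseFields(line.substring(1, line.length() - 1)));
            }
            // Any other line, such as "STAY ALIVE", is a keep-alive and is skipped.
        }
        return quotes; // an unterminated tail stays in 'pending' for the next call
    }

    private static Map<String, String> parseFields(String body) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String pair : body.split("#")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                fields.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        QuoteFrameBuffer buffer = new QuoteFrameBuffer();
        System.out.println(buffer.feed("STAY ALIVE\r\n|symbol=IBM#lastPrice=12.34#last"));
        System.out.println(buffer.feed("Ask=12.40#lastBid=12.30|\r\n"));
    }
}
```

Because a line is only handed out once its terminator has arrived, a quote that is split across two network reads is never parsed half-finished.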

Next, let's write the 4 supported actions, namely add, delete, start and stop. The URLs are in sync with the actions supported by the respective links in the backend code. The GET parameters are constructed manually as shown below.
            private function subscribe():void{ 
var urlReq:URLRequest = new URLRequest("http://localhost:9980/link1");
urlReq.url += "?action=add&symbol=" + txtSymbol.text;
actionStream = new URLLoader();
actionStream.load(urlReq);
}

private function unSubscribe():void{
var urlReq:URLRequest = new URLRequest("http://localhost:9980/link1");
urlReq.url += "?action=delete&symbol=" + txtSymbol.text;
actionStream = new URLLoader();
actionStream.load(urlReq);
}
private function connect():void{
var urlReq:URLRequest = new URLRequest("http://localhost:9980/link2");
urlReq.url += "?action=start";
dataStream.load(urlReq);
}
private function disconnect():void{
var urlReq:URLRequest = new URLRequest("http://localhost:9980/link1");
urlReq.url += "?action=stop";
actionStream = new URLLoader();
actionStream.load(urlReq);
}

Now, let's create the view, or UI, to do the job. I don't think any explanation is needed for this part, as it can't get simpler than this.
 ]]> 
</mx:Script>
<mx:VBox>
<mx:TextInput id="txtSymbol"/>
<mx:HBox>
<mx:Button label="Connect" click="connect()"/>
<mx:Button label="DisConnect" click="disconnect()"/>
<mx:Button label="Subscribe" click="subscribe()"/>
<mx:Button label="UnSubscribe" click="unSubscribe()"/>
</mx:HBox>
<mx:DataGrid id="watchList" dataProvider="{watchListData}">
<mx:columns>
<mx:DataGridColumn headerText="Symbol" dataField="symbol"/>
<mx:DataGridColumn headerText="LastPrice" dataField="lastPrice"/>
<mx:DataGridColumn headerText="LastAsk" dataField="lastAsk"/>
<mx:DataGridColumn headerText="LastBid" dataField="lastBid"/>
</mx:columns>
</mx:DataGrid>
</mx:VBox>
</mx:Application>

And that's all, folks. The backend writes the data to the channel, and the Flex front end reads and displays it. Well, there are a few bugs in the code I presented.

The data does not start to flow to the front end for the initial 20 seconds or so. Try to connect and quickly subscribe to, say, 3-4 symbols: IBM, GOOG, MSFT, CSCO. I guess the problem is with the operating system buffer, because even when I flushed manually in the XSocket code (the application buffer) no effect could be seen; or the problem could be in Flex, I am not sure. As I said earlier, my goal is not to solve these problems but rather to get streaming up, and it does come up after an initial delay, which is fine with me right now.

I had to use two links so that I could add symbols to the subscription without re-connecting the link on which the data is being pushed. I could not figure out a way to do it using a single link. You guys can give it a shot and let me know. I leave this here for now, as I am in no mood to write a complete infrastructure for push when several exist.

Below is a small video of the Flex UI using the code above showing streaming data.


Wednesday, June 17, 2009

Open MBeans Tutorial

Writing an MBean in Java is very simple. But when you want the return type to be a table of data (say, some metrics), simple MBeans are not of much use. One way to get a table of data as a return type is to use an Open MBean. This tutorial gives a simple example of how to write an Open MBean that returns a table of data.

Step 1: Define a dynamic MBean by implementing the interface "DynamicMBean".
public class QuoteMXBean implements DynamicMBean { 
public Object getAttribute(String attribute) throws AttributeNotFoundException,
MBeanException, ReflectionException{
//TODO
}
public void setAttribute(Attribute attribute) throws AttributeNotFoundException,
InvalidAttributeValueException, MBeanException, ReflectionException{
//TODO
}
public AttributeList getAttributes(String[] attributes){
//TODO
}
public AttributeList setAttributes(AttributeList attributes){
//TODO
}
public Object invoke(String actionName, Object params[], String signature[])
throws MBeanException, ReflectionException{
//TODO
}
public MBeanInfo getMBeanInfo(){
//TODO
}
}

Step 2:
A table is made up of rows, and each row is made up of columns. We have to give exactly this description when defining a table.
String[] itemNames = { "symbol", "localTimeStamp", "latestResponse"}; 
String[] itemDescriptions = { "Symbol",
"The Timestamp for the response as on server", "The response String from Quoteserver"};
OpenType[] itemTypes = { SimpleType.STRING,
SimpleType.STRING, SimpleType.STRING};
CompositeType snapshotType = new CompositeType("snapshot", "Quote Data",
itemNames, itemDescriptions, itemTypes);
All the variables can be made private and initialized either in a static block or in the constructor. The CompositeType just defined says that each row will contain the columns "symbol", "localTimeStamp" and "latestResponse". Now let's define the table type itself.
String[] index = { "symbol" }; 
TabularType quoteTableType = new TabularType("quoteSnapshots",
"List of Quotes", snapshotType,
index);
That is, the table type "quoteTableType" is a collection of rows described by the composite type "snapshotType". The "index" names the column(s) that uniquely identify each row of the table.
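
Put together as something you can compile and run on its own, the two type definitions look roughly like this. It is a sketch using only the javax.management.openmbean classes from the JDK; the wrapper class QuoteTypesSketch and its method names are mine, and the checked OpenDataException is wrapped so the helper methods can be called without a throws clause.

```java
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.OpenType;
import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularType;

// Hypothetical wrapper around the row type and table type from Step 2.
final class QuoteTypesSketch {
    static final String[] ITEM_NAMES = {"symbol", "localTimeStamp", "latestResponse"};

    // One row: three string columns.
    static CompositeType rowType() {
        String[] descriptions = {"Symbol",
                "The Timestamp for the response as on server",
                "The response String from Quoteserver"};
        OpenType<?>[] types = {SimpleType.STRING, SimpleType.STRING, SimpleType.STRING};
        try {
            return new CompositeType("snapshot", "Quote Data", ITEM_NAMES, descriptions, types);
        } catch (OpenDataException e) {
            throw new IllegalStateException(e);
        }
    }

    // The table: rows of rowType(), uniquely identified by the "symbol" column.
    static TabularType tableType() {
        try {
            return new TabularType("quoteSnapshots", "List of Quotes",
                    rowType(), new String[] {"symbol"});
        } catch (OpenDataException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        TabularType table = tableType();
        System.out.println("columns: " + table.getRowType().keySet());
        System.out.println("index:   " + table.getIndexNames());
    }
}
```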


Step 3:
Let's declare the table with type "quoteTableType".
private TabularDataSupport quoteSnapshot = new TabularDataSupport(quoteTableType); 

Please add the getter and setter for it.


Step 4:
Let's implement getMBeanInfo().
OpenMBeanInfoSupport info;
OpenMBeanAttributeInfoSupport[] attributes = new OpenMBeanAttributeInfoSupport[1];
OpenMBeanConstructorInfoSupport[] constructors = new OpenMBeanConstructorInfoSupport[1];
//Size the array to the number of operations actually defined, so that it holds no null entries.
OpenMBeanOperationInfoSupport[] operations = new OpenMBeanOperationInfoSupport[1];
MBeanNotificationInfo[] notifications = new MBeanNotificationInfo[0];

//Just one attribute: readable, not writable, not an "is" getter. Its type is the table type from Step 2.
attributes[0] = new OpenMBeanAttributeInfoSupport("QuoteSnapshot",
"Table of quotes data", quoteTableType, true, false,
false);
//No arg constructor
constructors[0] = new OpenMBeanConstructorInfoSupport(
"QuoteMXBean",
"Constructs a QuoteMXBean instance.",
new OpenMBeanParameterInfoSupport[0]);

//Just one operation, which takes no parameters and returns the table type from Step 2.
OpenMBeanParameterInfo[] params = new OpenMBeanParameterInfoSupport[0];
operations[0] = new OpenMBeanOperationInfoSupport(
"resetAndGetQuoteSnapshot",
"Reset and get the latest available data for the Quotes",
params, quoteTableType,
MBeanOperationInfo.INFO);

//Build the info and return it.
info = new OpenMBeanInfoSupport(this.getClass().getName(),
"Quote - Open - MBean", attributes, constructors,
operations, notifications);
return info;

Step 5:
Implement getAttribute() and getAttributes().
 public Object getAttribute(String attributeName) throws AttributeNotFoundException, 
MBeanException, ReflectionException {
if (attributeName.equals("QuoteSnapshot")) {
return (TabularData)getQuoteSnapshot();
}
throw new AttributeNotFoundException("Cannot find "
+ attributeName + " attribute ");
}
public AttributeList getAttributes(String[] attributeNames) {
AttributeList resultList = new AttributeList();
if (attributeNames.length == 0)
return resultList;
for (int i = 0; i < attributeNames.length; i++) {
try {
Object value = getAttribute(attributeNames[i]);
resultList.add(new Attribute(attributeNames[i], value));
} catch (Exception e) {
e.printStackTrace();
}
}
return (resultList);
}

Step 6:
Implement setAttribute().
There is nothing to set in this example.
    public void setAttribute(Attribute arg0) throws AttributeNotFoundException, 
InvalidAttributeValueException, MBeanException, ReflectionException {
throw new AttributeNotFoundException(
"No attribute can be set in this MBean");
}

@Override
public AttributeList setAttributes(AttributeList arg0) {
return new AttributeList();
}

Step 7:
Implement invoke().
public Object invoke(String operationName, Object[] params, String[] signature) 
throws MBeanException, ReflectionException {
if (operationName.equals("resetAndGetQuoteSnapshot")) {
try {
return buildSnapshot(); //This is where you delegate the invocation to your original method.
} catch (OpenDataException e) {
throw new MBeanException(e, "invoking resetAndGetQuoteSnapshot: "
+ e.getClass().getName() + " caught ["
+ e.getMessage() + "]");
}
}
else{
throw new ReflectionException(new NoSuchMethodException(
operationName), "Cannot find the operation "
+ operationName);
}
}
public Object buildSnapshot() throws OpenDataException {
quoteSnapshot = new TabularDataSupport(quoteTableType);
SomeVo vo = //get the data
Object[] itemValues = { vo.symbol, vo.localDate, vo.response};
CompositeData result = new CompositeDataSupport(snapshotType,
itemNames, itemValues);
quoteSnapshot.put(result);
return quoteSnapshot;
}

This is all that is needed to build an Open MBean that supports tabular data as output. You can use jconsole to view the output. Obviously, you have to deploy the MBean in an MBean server. I use Spring to do that job. The Spring XML is given below.
<bean id="exporter" class="org.springframework.jmx.export.MBeanExporter" lazy-init="false"> 
<property name="beans">
<map>
<entry key="com.ssb.mbean:type=QuoteMXBean" value-ref="quoteMXBean"/>
</map>
</property>
</bean>
<bean id="quoteMXBean" class="com.ssb.mbean.QuoteMXBean"/>
<bean id="registry" class="org.springframework.remoting.rmi.RmiRegistryFactoryBean"
p:port="9999" />
<bean id="serverConnector" class="org.springframework.jmx.support.ConnectorServerFactoryBean"
p:objectName="connector:name=rmi"
p:serviceUrl="service:jmx:rmi://localhost/jndi/rmi://localhost:9999/quotesconnector"
p:threaded="true"
p:daemon="true"/>

The above example displays the latest quote data that is available to the system.
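
If you just want to try an Open MBean without Spring, the JDK's platform MBean server can host it too. The following is only a sketch under stated assumptions: TinyQuoteBean is a cut-down, read-only stand-in for the bean described above (not the real QuoteMXBean), and the object name "com.example.sketch:type=TinyQuoteBean", the item names and the sample row are made up for illustration.

```java
import java.lang.management.ManagementFactory;
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.AttributeNotFoundException;
import javax.management.DynamicMBean;
import javax.management.MBeanInfo;
import javax.management.MBeanNotificationInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import javax.management.openmbean.*;

class PlatformRegistrationSketch {

    // Hypothetical, minimal read-only dynamic MBean exposing one tabular attribute.
    static class TinyQuoteBean implements DynamicMBean {
        private final TabularDataSupport table;

        TinyQuoteBean(TabularDataSupport table) { this.table = table; }

        public Object getAttribute(String name) throws AttributeNotFoundException {
            if ("QuoteSnapshot".equals(name)) return table;
            throw new AttributeNotFoundException("Cannot find " + name);
        }
        public AttributeList getAttributes(String[] names) {
            AttributeList list = new AttributeList();
            for (String n : names) {
                try { list.add(new Attribute(n, getAttribute(n))); }
                catch (AttributeNotFoundException ignored) { /* skip unknown names */ }
            }
            return list;
        }
        public void setAttribute(Attribute attribute) throws AttributeNotFoundException {
            throw new AttributeNotFoundException("No attribute can be set");
        }
        public AttributeList setAttributes(AttributeList attributes) { return new AttributeList(); }
        public Object invoke(String op, Object[] params, String[] sig) throws ReflectionException {
            throw new ReflectionException(new NoSuchMethodException(op));
        }
        public MBeanInfo getMBeanInfo() {
            OpenMBeanAttributeInfo[] attrs = {
                new OpenMBeanAttributeInfoSupport("QuoteSnapshot", "Table of quotes data",
                        table.getTabularType(), true, false, false)
            };
            return new OpenMBeanInfoSupport(getClass().getName(), "Tiny quote MBean", attrs,
                    new OpenMBeanConstructorInfo[0], new OpenMBeanOperationInfo[0],
                    new MBeanNotificationInfo[0]);
        }
    }

    // Registers the bean, reads the attribute back through the server, returns the row count.
    static int registerAndRead() {
        try {
            String[] items = {"symbol", "response"};
            CompositeType rowType = new CompositeType("snapshotType", "One quote row", items, items,
                    new OpenType<?>[] {SimpleType.STRING, SimpleType.STRING});
            TabularType tableType = new TabularType("quoteTableType", "Table of quotes",
                    rowType, new String[] {"symbol"});
            TabularDataSupport table = new TabularDataSupport(tableType);
            table.put(new CompositeDataSupport(rowType, items, new Object[] {"IBM", "101.25"}));

            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = new ObjectName("com.example.sketch:type=TinyQuoteBean");
            if (server.isRegistered(name)) server.unregisterMBean(name);
            server.registerMBean(new TinyQuoteBean(table), name);
            try {
                TabularData data = (TabularData) server.getAttribute(name, "QuoteSnapshot");
                return data.size();
            } finally {
                server.unregisterMBean(name); // keep the sketch re-runnable
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("rows visible through the MBean server: " + registerAndRead());
    }
}
```

In a real application you would leave the bean registered and keep the JVM running, so that jconsole can attach to the process and show the table.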

Saturday, April 04, 2009

Lightstreamer : A tutorial for java Adapters and Flex clients : Part 2

I know I am posting this second part pretty late, but I can't help it, I have loads of work at hand. So, here goes the next part of the Lightstreamer tutorial, which explains the Flex part. Remember, the Flex client swc is available only if you download the evaluation version of Lightstreamer. It is not packaged with the Moderato version.

This tutorial explains how to use the native AS3 Flex client lib to subscribe to data from the adapter that I presented in the previous post. Strictly speaking, this post just provides a starting point for LS. We have developed a complete framework on the Flex side, which is used by around 9 modules transparently, without them knowing that LS exists. Well, I will not, and I am not supposed to, provide any more details about it.

First, change the port on which the LS server runs: edit $LS_HOME/conf/lightstreamer_conf.xml, change <port>8080</port> to <port>8787</port> in the HTTP server configuration, and restart the server.

Step 1:

Use the LSClient to connect to the LS server which has the adapter deployed.
var lsClient:LSClient = new LSClient(); 
lsClient.addEventListener(StatusChangeEvent.STATUS_CHANGE,onLSStatusChange); // The LS sends status changes. The method onLSStatusChange handles it.

var connectionInfo:ConnectionInfo = new ConnectionInfo();
connectionInfo.setServer("localhost");// Place where the server is running
connectionInfo.setAdapter("MyJavaAdapter"); // This is the name of the adapter we configured in the adapters.xml
connectionInfo.setControlPort("8787");// The port we configured above.
connectionInfo.setPort("8787");// The port we configured above.
connectionInfo.setControlProtocol("http");// The protocol to use http or https. For quotes data, http is preferred.
connectionInfo.setProtocol("http");

var connectionPolicy:ConnectionPolicy = new ConnectionPolicy(); //These are more configurations, as needed by the application.
connectionPolicy.setTimeoutForStalled(2);
connectionPolicy.setTimeoutForReconnect(15);
connectionPolicy.setRetryTimeout(5);

//Connect to Lightstreamer Server
try {
lsClient.openConnection(connectionInfo,connectionPolicy);
} catch(e:Error) {
Alert.show(e.message);
}

public function onLSStatusChange(event:StatusChangeEvent):void{
trace(event.previousStatus + " => " + event.status);
//The status can change as below. You can display a popup or some indicators to the user showing what is happening.
/* "DISCONNECTED" => "CONNECTING"
"CONNECTING" => "STREAMING"
"STREAMING" => "CONNECTING"
"STALLED" => "CONNECTING"
"CONNECTING" => "POLLING"
*/
}

Step 2:
Subscribe to the data. You can use a Visual or a NonVisual table. We use only the NonVisual table. OK, I am sure you did not understand what a table is. In simple terms, a table contains rows and columns. In LS terms, the columns are the fields you subscribe for, e.g. LastPrice. The rows are the items you subscribe to, e.g. quote data for IBM. So, each row is a unique subscription to the fields present as columns. A Non-Visual table is not a component that is displayed on the UI; you handle the display yourself. I feel this is more flexible and powerful than the Visual Table.
var nonVisualTable:NonVisualTable = new NonVisualTable(new Array("IBM"),new Array("LastAsk","LastBid","LastPrice","Symbol"),"MERGE"); 
nonVisualTable.setDataAdapter("MYJAVAADAPTER");
nonVisualTable.setSnapshotRequired(false);
nonVisualTable.addEventListener(NonVisualItemUpdateEvent.NON_VISUAL_ITEM_UPDATE,handleUpdate);
nonVisualTable.setRequestedMaxFrequency(1);
lsClient.subscribeTable(nonVisualTable);

OK, here I will explain some terms.
The last parameter of the NonVisualTable constructor is the management mode. There are four modes, namely RAW, DISTINCT, MERGE and COMMAND. You can look at the documentation for a detailed explanation of all of them.
In simple terms, if your data is constantly changing, like quotes, LS sends only the fields that have changed. A field that has not changed is not sent, and its state is kept as the previous (last received) value. This way of managing the data is called MERGE mode.
If you need the data as it is generated, i.e. each piece of data should be pushed to the front end, like news data, then you use DISTINCT mode.
RAW mode is similar to DISTINCT: each and every piece of data is sent to the client. I am not so sure, but the only difference the documentation mentions is that in DISTINCT mode not all of the events may be sent to the client.
COMMAND is more complicated and has better performance; please read the documentation for it. So far I have used MERGE and DISTINCT modes successfully.
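
To picture what MERGE mode means, here is a toy model in plain Java. It is not Lightstreamer code and uses no LS API; the class and method names are made up for illustration. It only mimics the idea described above: remember the last value of every field and pass on just the fields that changed.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

class MergeModeSketch {
    // Last known value of every field of one item (e.g. the quote for IBM).
    private final Map<String, String> lastState = new HashMap<>();

    // Returns only the fields whose value differs from the last known state,
    // and records the new values as the current state.
    Map<String, String> delta(Map<String, String> update) {
        Map<String, String> changed = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : update.entrySet()) {
            if (!Objects.equals(e.getValue(), lastState.get(e.getKey()))) {
                changed.put(e.getKey(), e.getValue());
                lastState.put(e.getKey(), e.getValue());
            }
        }
        return changed;
    }

    // The full picture a client can rebuild: unchanged fields keep their previous value.
    Map<String, String> currentState() {
        return new HashMap<>(lastState);
    }

    public static void main(String[] args) {
        MergeModeSketch ibm = new MergeModeSketch();
        Map<String, String> first = new HashMap<>();
        first.put("LastAsk", "10.1");
        first.put("LastBid", "10.0");
        System.out.println(ibm.delta(first));   // both fields are new

        Map<String, String> second = new HashMap<>();
        second.put("LastAsk", "10.1");           // unchanged
        second.put("LastBid", "10.2");           // changed
        System.out.println(ibm.delta(second));  // only the changed field
    }
}
```

In DISTINCT or RAW mode there is no such merging of state; each event stands on its own, which is why those modes suit news-like data.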

The DataAdapter is the one you configured in adapters.xml, in our case "MYJAVAADAPTER".
You can request a snapshot if the data-adapter supports one.

The requested max frequency limits the updates I get from the LS server. I do not need more than 1 update per item per second, so it is 1 in my case. Make sure you use this with MERGE mode only.
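
The idea of a per-item frequency cap can be sketched in a few lines of plain Java. This is a toy model with made-up names, not the way the LS server implements it; it only shows the decision "has enough time passed since the last update for this item?". Timestamps are passed in explicitly so the behaviour is easy to follow.

```java
import java.util.HashMap;
import java.util.Map;

class MaxFrequencySketch {
    private final long minIntervalMillis;
    // Time of the last update let through, per item name.
    private final Map<String, Long> lastSent = new HashMap<>();

    MaxFrequencySketch(double maxUpdatesPerSecond) {
        this.minIntervalMillis = Math.round(1000.0 / maxUpdatesPerSecond);
    }

    // True if an update for this item may go out at time nowMillis.
    boolean shouldSend(String item, long nowMillis) {
        Long last = lastSent.get(item);
        if (last != null && nowMillis - last < minIntervalMillis) {
            return false; // too soon after the previous update for this item
        }
        lastSent.put(item, nowMillis);
        return true;
    }

    public static void main(String[] args) {
        MaxFrequencySketch cap = new MaxFrequencySketch(1.0); // 1 update per item per second
        System.out.println(cap.shouldSend("IBM", 0));
        System.out.println(cap.shouldSend("IBM", 500));
        System.out.println(cap.shouldSend("IBM", 1000));
    }
}
```

With MERGE mode an update that is held back is not a loss, because the next update that goes through still carries the latest state. With news-like data every event matters, which fits the advice above to use the frequency limit with MERGE mode only.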

handleUpdate gets the updates from the LS. It is given below. The code is self explanatory.
public function handleUpdate(event:NonVisualItemUpdateEvent):void{ 
var eventType:String = event.item; //This provides the update event name, in our case IBM.
trace(extractFieldData(event,"LastAsk"));
trace(extractFieldData(event,"LastBid"));
trace(extractFieldData(event,"LastPrice"));
}
private function extractFieldData(event:NonVisualItemUpdateEvent,field:*):String {
var value:String;
if (event.isFieldChanged(field)) {
value = event.getFieldValue(field);
} else {
value = event.getOldFieldValue(field);
}
return value;
}

Step 3: When all done, please un-subscribe the table.
lsClient.unsubscribeTable(nonVisualTable); 

This is all you need to get going on the Flex side with LS. We started from here, and you won't believe how far we have come. The LS forums are really, really helpful. Please post any queries there; I have always got sensible and useful replies. Try going through lightstreamer_conf.xml, there are a lot of things you can configure there. Any developer with some experience will understand it. If you need more help, just go to the forums.

This ends the Lightstreamer tutorials from my side. Try this technology, I am sure you will love it...

Monday, February 09, 2009

Lightstreamer : A tutorial for java Adapters and Flex clients : Part 1

I think there is a shortage of tutorials on the web related to Lightstreamer, which is one awesome technology. Initially you may be overwhelmed by the terminology while trying to understand each and every part of it. I use Lightstreamer as part of the architecture of a trading platform, and I have used it pretty extensively within the limits of that application. Hence, I would like to give this tutorial for people who want to use this push software in their apps. I will cover the concepts with some examples, the way that made ME understand Lightstreamer. :)

The basic requirement for understanding this tutorial is that you know the concept of push technology first. I used BlazeDS Messaging initially, so if you are a Flex developer, maybe check out BlazeDS messaging first. Then understand the concept of push technology. Then come to this tutorial to make more sense out of it.

In Lightstreamer, there are two parts for the push (I will call it 'streaming') to work. A backend and a front end. The backend is explained in java and front end is explained in Flex.

This 1st part of Tutorial is about Java Backend. I will explain the Flex Client side UI in the next part of the Tutorial.

Installation:

Download the zip from http://www.lightstreamer.com/. Extract it into a directory, which I will call $LS_HOME, and read the GETTING_STARTED.txt in $LS_HOME. It is neatly and clearly written, so there should not be a problem understanding it. I would suggest you download a trial version rather than using Moderato, the free one. And also buy it if you find it very useful :) Anyway, the trial is valid for 60 days, which is more than enough for you to evaluate it and understand its capability.

The Backend:

Structure: The first thing is where to put the code you have written, and how. There are two approaches.
The first is to compile your Java code and put the resulting .class files under $LS_HOME/adapters/myadapter/classes.
The second is to create a jar out of your class files and put it in $LS_HOME/adapters/myadapter/lib.

Create the myadapter folder and, inside it, the classes or lib folder as needed by your approach.

Once the structure is ready, copy the adapters.xml from the samples given in $LS_HOME/adapters/Demo.

So, what is an Adapter? In the simplest terms, it is what is responsible for pushing your data to your UI.

Let's write an adapter. Before that, there are three more terms we need to know.

    2. itemName: Any String that uniquely identifies the type of data the client has requested from the server. The server, i.e. this adapter, must understand which data the client UI is asking to have pushed. In the example below it is as simple as the symbol IBM, for which the server pushes data to the UI.
    3. ItemEventListener: An interface defined by Lightstreamer (LS). LS hands an implementation of it to the adapter through setListener when the LS server is started. It has a method 'update' that is used to push data to the UI.
    4. Snapshot: If live data is not yet available to be pushed but a snapshot of it is, the isSnapshotAvailable method can return true, and the ItemEventListener's update method can be called with true as its last argument to mark the event as a snapshot. The UI can request a snapshot if it needs one. In the trading sample app that I give below, a snapshot is not of much use, so it is defined as false.


And, this is it, we are good to go for an example.

It's very simple: create a class MyJavaAdapter that implements com.lightstreamer.interfaces.data.DataProvider. That is it.. :)

Below is a simple adapter that can be used generically with any class that extends the DataGenerator abstract class and is capable of generating data from any source, either a database or another server that pumps data over a network. You can define your own implementation.

In the code sample below, MyJavaAdapter delegates its work to any class that extends DataGenerator. DataGenerator does the job of storing and working with the listener given by LS. Any class that extends DataGenerator is shielded from the intricacies of LS. The push method in DataGenerator does the actual job of pushing data to the UI. The data is sent in the form of key=value pairs. The extending class has to worry only about its business logic, i.e. where to get the data from, converting it to a map and pushing it to the server. The sample below is a very, very simple design and can be heavily modified or replaced to make it more generic and better. I hope you get the point.

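import java.util.Map;

import com.lightstreamer.interfaces.data.ItemEventListener;
import com.lightstreamer.interfaces.data.SubscriptionException;

public abstract class DataGenerator {
    private ItemEventListener listener; // handed over by the LS server through the adapter

    // Called once at start-up; subclasses set up their data source here.
    public abstract void init();

    // No snapshots by default; subclasses may override.
    public boolean isSnapshotAvailable(String arg0)
            throws SubscriptionException {
        return false;
    }

    public final void setListener(ItemEventListener listener) {
        this.listener = listener;
    }

    public abstract void subscribe(String itemName, boolean arg1);

    public abstract void unsubscribe(String itemName);

    // Pushes one update for itemName to the UI as key=value pairs.
    public void push(String itemName, Map<String, String> values, boolean isSnapShot) {
        listener.update(itemName, values, isSnapShot);
    }
}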

The sample QuotesStubber that extends DataGenerator is given below. It uses a timer to generate data at a regular interval and push it to the front end. If you observe, the push happens in the run method of the timer task.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CopyOnWriteArrayList;

public class QuotesStubber extends DataGenerator {
    private static final Timer dispatcher = new Timer();
    private final Random random = new Random();
    // Thread-safe list: subscribe/unsubscribe can be called while the timer thread iterates.
    private final List<String> subscribedSymbols = new CopyOnWriteArrayList<String>();

    public void init() {
        run();
    }

    public void subscribe(String symbol, boolean arg1) {
        subscribedSymbols.add(symbol);
    }

    public void unsubscribe(String symbol) {
        subscribedSymbols.remove(symbol);
    }

    // Every 500 ms, push a freshly generated quote for each subscribed symbol.
    private void run() {
        dispatcher.scheduleAtFixedRate(new TimerTask() {
            public void run() {
                for (String symbol : subscribedSymbols) {
                    push(symbol, generateQuotesResponse(symbol), false);
                }
            }
        }, 0, 500);
    }

    private Map<String,String> generateQuotesResponse(String symbol) {
        Map<String,String> response = new HashMap<String,String>();
        response.put("LastAsk", getRandom(2, 2));
        response.put("LastBid", getRandom(2, 2));
        response.put("LastPrice", getRandom(2, 2));
        response.put("Symbol", symbol);
        return response;
    }

    // Builds a random number string such as "42.17" with the requested digit counts.
    private String getRandom(int digitB4Decimal, int digitAfterDecimal) {
        return randomDigits(digitB4Decimal) + "." + randomDigits(digitAfterDecimal);
    }

    private String randomDigits(int count) {
        StringBuilder digits = new StringBuilder();
        for (int i = 0; i < count; i++) {
            digits.append(random.nextInt(10));
        }
        return digits.toString();
    }
}

The Adapter is below; it is responsible for interacting with the DataGenerator given above. The methods defined here are called by the LS server at appropriate times. The reference to the DataGenerator in the adapter below can either be injected or instantiated, as noted in the comment.
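import java.io.File;
import java.util.Map;

import com.lightstreamer.interfaces.data.DataProvider;
import com.lightstreamer.interfaces.data.DataProviderException;
import com.lightstreamer.interfaces.data.FailureException;
import com.lightstreamer.interfaces.data.ItemEventListener;
import com.lightstreamer.interfaces.data.SubscriptionException;

public class MyJavaAdapter implements DataProvider {

    private DataGenerator generator; //injected using spring or can be instantiated in the init method below

    public void init(Map arg0, File arg1) throws DataProviderException {
        if (generator == null) {
            generator = new QuotesStubber(); // fallback when nothing was injected
        }
        generator.init();
    }

    public boolean isSnapshotAvailable(String arg0)
            throws SubscriptionException {
        return generator.isSnapshotAvailable(arg0);
    }

    public void setListener(ItemEventListener listener) {
        generator.setListener(listener);
    }

    public void subscribe(String itemName, boolean arg1)
            throws SubscriptionException, FailureException {
        generator.subscribe(itemName, arg1);
    }

    public void unsubscribe(String itemName) throws SubscriptionException,
            FailureException {
        generator.unsubscribe(itemName);
    }
}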
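
Before deploying to an LS server, the subscribe/push/unsubscribe flow can be tried out in isolation. The sketch below is self-contained and does not use the LS jar: UpdateSink is a hypothetical stand-in for the one ItemEventListener method used in this tutorial, and FixedQuoteGenerator is a cut-down generator with a manual tick() instead of a timer and a fixed price instead of random data. All these names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AdapterDryRunSketch {

    // Hypothetical stand-in for ItemEventListener.update(itemName, values, isSnapshot).
    interface UpdateSink {
        void update(String itemName, Map<String, String> values, boolean isSnapshot);
    }

    // Cut-down generator with the same subscribe/unsubscribe/push shape as above.
    static class FixedQuoteGenerator {
        private final List<String> symbols = new ArrayList<>();
        private UpdateSink sink;

        void setListener(UpdateSink sink) { this.sink = sink; }
        void subscribe(String symbol) { symbols.add(symbol); }
        void unsubscribe(String symbol) { symbols.remove(symbol); }

        // One "timer tick": push a fixed quote for every subscribed symbol.
        void tick() {
            for (String symbol : symbols) {
                Map<String, String> values = new HashMap<>();
                values.put("Symbol", symbol);
                values.put("LastPrice", "42.00");
                sink.update(symbol, values, false);
            }
        }
    }

    // Records what would have been pushed to the UI during a short session.
    static List<String> dryRun() {
        List<String> pushed = new ArrayList<>();
        FixedQuoteGenerator generator = new FixedQuoteGenerator();
        generator.setListener((item, values, snapshot) ->
                pushed.add(item + ":" + values.get("LastPrice")));
        generator.subscribe("IBM");
        generator.tick();              // one update for IBM
        generator.unsubscribe("IBM");
        generator.tick();              // nothing subscribed, nothing pushed
        return pushed;
    }

    public static void main(String[] args) {
        System.out.println(dryRun());
    }
}
```

The same trick works for a real DataGenerator subclass if you write a small test class that implements ItemEventListener and records the calls it receives.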

Finally, open the adapters.xml that you copied before, and define a data provider as:
<data_provider name="MYJAVAADAPTER"> 
<adapter_class>MyJavaAdapter</adapter_class>
</data_provider>

You will use the 'name' attribute defined here from the Flex UI when you start requesting data to be pushed to it.
The Flex part of it will be provided in my next post/tutorial as a continuation to this one.