Pages

Thursday, June 18, 2009

Streaming Data using Flex and XSocket

I have worked on couple of push technologies namely blazeDS and Lightstreamer and always wondered how does it work. I am just concerned about streaming in Flex and not anything else. You can think I am little inclined towards flex, but the truth is Flex rocks, and after I entered the world of Flex, other UI technologies - I don't care anymore! Anyways, this logic of streaming data to Flex from a backend came to me just by chance! It took 2 days effort to get the below crappy thing going! I read a blog on the Internet that used URLStream to load image data in Flex progressively. Now, if you can load image progressively, then I should be able to do the same with any data. So, here I present Flex - XSocket combination to subscribe/load data from backend for a watchlist. There are a few things, that I did not resolve, as I did not have time to dedicate for it. I will explain those at the end.

JAVA

Lets start from the java backend. I use XScoket Xlightweb library to create a NIO server that can take any symbol as subscription and pushes randomly generated quotes to the client.

I will not be explaining XSockets here. Please visit http://xsocket.sourceforge.net/ for tutorials and downloads.

Lets write a request handler. In XLightweb, the request handler implements the IHttpRequestHandler.

public class ERequestHandler implements IHttpRequestHandler{ 
//Timer that generates the quotes at the specified interval
private final Timer timer = new Timer("timer", true);
public static Map<String, SymbolSender> state = new HashMap<String, SymbolSender>();

Lets implement the onRequest method. The same handler will be used to receive commands, like add symbol, remove symbol and also to receive data request, we use the URI to differentiate between the two request.
    public void onRequest(IHttpExchange exchange) throws IOException, 
BadMessageException {
IHttpRequest req = exchange.getRequest();
String ipAddress = exchange.getConnection().getRemoteAddress().getHostAddress();
if (req.getRequestURI().endsWith("/link1")) {//The Action link
String symbol = req.getParameter("symbol");
String command = req.getParameter("action");

Every ipAddress is given its own handler. So, obtain the handler ie SymbolSender and delegate the action to it. The SymbolSender is a private class that is given little below. Before any client sends an action, it has to connect to this server.
            SymbolSender symbolSender = state.get(ipAddress); 
if(command.equalsIgnoreCase("add")){
symbolSender.addSymbol(symbol);
}else if(command.equalsIgnoreCase("delete")){
symbolSender.removeSymbol(symbol);
}else if(command.equalsIgnoreCase("stop")){
symbolSender.stop();
}

The action has been delegated, so close the channel with a 200 Response header as in the below snippet.
            IHttpResponseHeader respHdr = new HttpResponseHeader(200, "text/plain"); 
BodyDataSink outChannel = exchange.send(respHdr);
outChannel.close();
}else if(req.getRequestURI().endsWith("/link2")){//The initial Data link, that is used to send data back.
String command = req.getParameter("action");
Initialize a new SymbolSender if none exist for the given ipAddress and start the handler using the timer. Here, it is scheduled at a fixed rate of every 500ms.

if(!state.keySet().contains(ipAddress) || state.get(ipAddress) == null){
state.put(ipAddress, new SymbolSender(exchange));
}
SymbolSender symbolSender = state.get(ipAddress);
if(command.equalsIgnoreCase("start")){
timer.scheduleAtFixedRate(symbolSender,new Date(), 500);
}
}
}

Lets define the SymbolSender timer task, which is a handler for each ip address separately.
     private class SymbolSender extends TimerTask { 
private IHttpExchange exchange; //The exchange handler
private List<String> symbolList = new ArrayList<String>();
private BodyDataSink outChannel;
private String ipAddress;
public SymbolSender(IHttpExchange exchange) {
this.exchange = exchange;
IHttpResponseHeader respHdr = new HttpResponseHeader(200, "text/plain");
try {
outChannel = this.exchange.send(respHdr);
outChannel.setEncoding("utf-8");
} catch (IllegalStateException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
ipAddress = exchange.getConnection().getRemoteAddress().getHostAddress();
}

API for adding and removing the subscription symbol from this ipAddress.
        public void addSymbol(String symbol){ 
symbolList.add(symbol);
}

public void removeSymbol(String symbol){
symbolList.remove(symbol);
}

The run method where the random data is generated for the symbols subscribed from this ipAddress, and written to the client.
 @Override 
public void run() {
try {
if(symbolList.size() > 0){
for(String symbol : symbolList){
String data = "|symbol=" + symbol + "#lastPrice=" + randomToTwoDecimal() + "#lastAsk=" + randomToTwoDecimal() + "#lastBid=" + randomToTwoDecimal() + "|\r\n";
outChannel.write(data);
}
}else{
outChannel.write("STAY ALIVE\r\n");
}
} catch (BufferOverflowException e) {
e.printStackTrace();
stop();
} catch (IOException e) {
e.printStackTrace();
stop();
} catch(Exception e){
e.printStackTrace();
stop();
}

}

A simple random way of generating xx.yy number.
        private String randomToTwoDecimal(){ 
Double p1 = Math.random() * 100;
String value1 = String.valueOf(p1);
Double p2 = Math.random() * 100;
String value2 = String.valueOf(p2);
String randNumber = value1.substring(0, value1.indexOf(".")) + "." + value2.substring(0, value2.indexOf("."));
return randNumber;
}

Stopping this handler is achieved by stopping the task, and removing the handler form the state for the ipAddress.
        public void stop(){ 
this.cancel();
try {
outChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
ERequestHandler.state.remove(ipAddress);
}
}
}

The main class that starts the server is given below.
public class Streamer { 
public static void runServer() throws IOException{
System.setProperty("org.xlightweb.showDetailedError", "true");
Context rootCtx = new Context("");
IHttpRequestHandler handler = new ERequestHandler();
Add the two contexts that this server listens to.

rootCtx.addHandler("/link1",handler );
rootCtx.addHandler("/link2", handler);
// creates the server by passing over the port number & the server handler
HttpServer server = new HttpServer(9980, rootCtx);
ConnectionUtils.start(server);
}
public static void main(String[] args) throws Exception {
runServer();
}
}

The java part for the streaming server is over. Now lets go ahead with the flex part.

FLEX


For the flex part lets create an application.
<mx:Application xmlns:mx="http://www.adobe.com/2006/mxml" layout="absolute" creationComplete="init()"> 
<mx:Script>
<![CDATA[
import mx.collections.ArrayCollection;
import mx.controls.Alert;

Lets declare two variables, one for streaming data, and another for writing actions to the xsocket server we just wrote above.
            private var dataStream:URLStream = new URLStream();
private var actionStream:URLLoader = new URLLoader();

The string that we will be getting from the backend is appended here.
private var dataStr:String = ""; 

Lets declare the remaining things i.e a dataprovider, dictionary to keep map of symbol to object and a timer that will be used to parse data and create objects for data provider.
[Bindable] 
private var watchListData:ArrayCollection = new ArrayCollection();
private var symbolObjectMap:Dictionary = new Dictionary();
private static var timer:Timer = new Timer(60);

Lets write the init method thats called on creation complete. We will add a progress listener, that will be really useful to us and the remaining listeners that is just for time pass.
private function init():void{                
dataStream.addEventListener(ProgressEvent.PROGRESS, streamListener);
dataStream.addEventListener(Event.COMPLETE, listener);
dataStream.addEventListener(Event.ACTIVATE, listener);
dataStream.addEventListener(Event.DEACTIVATE, listener);
dataStream.addEventListener(Event.OPEN, listener);
dataStream.addEventListener(HTTPStatusEvent.HTTP_STATUS, listener);
dataStream.addEventListener(IOErrorEvent.IO_ERROR, listener);
dataStream.addEventListener(SecurityErrorEvent.SECURITY_ERROR, listener);
timer.addEventListener(TimerEvent.TIMER, parse, false, 0, true);
timer.start();
}

The stream listener for the data is given below. Very simple logic, just read the multi byte which is already written in form of utf-8 by the server and append it to the data string. Now, this stream listener is called by Flex when there is some specific amount of data is present. And, it does not make sense for me to parse the data here to create objects at that rate. Instead as you will see next code, the timer will invoke the parse method every 60ms and then parse the data in the data string to an appropriate VO.
private function streamListener(event:ProgressEvent):void{ 
if(dataStream.bytesAvailable == 0)return;
if(dataStream.connected){
while(dataStream.bytesAvailable >= 1){
dataStr = dataStr + dataStream.readMultiByte(dataStream.bytesAvailable, "utf-8");
}
trace(dataStr);
}
}

Now, the timepass listener is here. Comment the alerts if you do not want, or use trace instead.
private function listener(event:Event):void{ 
Alert.show("Here at other side of the world :(");
}

Now, here is the parse method for the data I stream from the server. Actually, I do not like to call it streaming, as it is just a normal read and write to streams. Heck! that's why we call it streaming. He he..
private function parse(event:TimerEvent):void{ 
if(this.dataStr != null
&& this.dataStr.indexOf("|") >=0){
var skipLast:Boolean = false;
var streamData:Array = this.dataStr.split("\r\n");

Find out if the data is written partially. That means if its not ending with "|". If that is the case then keep the last piece of string in the data string for appending with the next data arrived from the backend. This will be parsed after the data has arrived and 60ms time lag.
if(String(streamData[streamData.length-1]).charAt(streamData.length-1) != "|"){ 
dataStr = String(streamData[streamData.length-1]);
skipLast = true;
streamData[streamData.length-1] = "STAY ALIVE";
}

Parse all the data other than "STAY ALIVE" pings. I am not sure if you can stop sending STAY ALIVE or delay its frequency. And, I am not doing that R & D either. My purpose remains just of stream data to Flex. Only the positive use case :) The rest of the for loop is just parsing data, creating or updating the StreamerVO object.
for each(var string:String in streamData){ 
if(string != "STAY ALIVE"){
string = string.substring(1,string.length-1);
var nodes:Array = string.split("#");
var symbolAttribute:String = String(nodes[0]);
var symbolData:Array = symbolAttribute.split("=");
if(String(symbolData[0]) == "symbol"){
var symbol:String = String(symbolData[1]);
if(!symbolObjectMap.hasOwnProperty(symbol)){
symbolObjectMap[symbol] = new StreamerVO();
watchListData.addItem(symbolObjectMap[symbol]);
}
var vo:StreamerVO = StreamerVO(symbolObjectMap[symbol]);
for each(var node:String in nodes){
var nodeData : Array = node.split("=");
vo[nodeData[0]] = nodeData[1];
}
}
}
}
}
}

Next lets write the 4 actions supported namely add, delete, start and stop. The url's are in sync with the actions supported by respective links given in the backend code. The get parameters are constructed manually as shown below.
            private function subscribe():void{ 
var urlReq:URLRequest = new URLRequest("http://localhost:9980/link1");
urlReq.url += "?action=add&symbol=" + txtSymbol.text;
actionStream = new URLLoader();
actionStream.load(urlReq);
}

private function unSubscribe():void{
var urlReq:URLRequest = new URLRequest("http://localhost:9980/link1");
urlReq.url += "?action=delete&symbol=" + txtSymbol.text;
actionStream = new URLLoader();
actionStream.load(urlReq);
}
private function connect():void{
var urlReq:URLRequest = new URLRequest("http://localhost:9980/link2");
urlReq.url += "?action=start";
dataStream.load(urlReq);
}
private function disconnect():void{
var urlReq:URLRequest = new URLRequest("http://localhost:9980/link1");
urlReq.url += "?action=stop";
actionStream = new URLLoader();
actionStream.load(urlReq);
}

Now, lets create the view or UI to do the job. I don't think any explanation is needed for this part, as it cant get simpler than this.
 ]]> 
</mx:Script>
<mx:VBox>
<mx:TextInput id="txtSymbol"/>
<mx:HBox>
<mx:Button label="Connect" click="connect()"/>
<mx:Button label="DisConnect" click="disconnect()"/>
<mx:Button label="Subscribe" click="subscribe()"/>
<mx:Button label="UnSubscribe" click="unSubscribe()"/>
</mx:HBox>
<mx:DataGrid id="watchList" dataProvider="{watchListData}">
<mx:columns>
<mx:DataGridColumn headerText="Symbol" dataField="symbol"/>
<mx:DataGridColumn headerText="LastPrice" dataField="lastPrice"/>
<mx:DataGridColumn headerText="LastAsk" dataField="lastAsk"/>
<mx:DataGridColumn headerText="LastBid" dataField="lastBid"/>
</mx:columns>
</mx:DataGrid>
</mx:VBox>
</mx:Application>

And, that's all folks. The backed is writing the data to the channel, and front end flex is reading it and displaying. Well, there are few bugs in this code I presented.

The data does not start to flow to the front end for initial 20 sec or so. Try to connect and subscribe say 3-4 symbols IBM, GOOG, MSFT, CSCO soon. I guess the problem is with the Operating system buffer because even when I said flush manually in the XSocket code (Application buffer) no effect could be seen or the problem could exist in Flex also, I am not sure. As I said earlier, my goal is not to solve these problems but to rather get streaming up, which gets up after a initial delay which is fine with me right now.

I had to use two links so that I could add symbols to be subscribed with out re-connecting to get the data that is being pushed. I could not figure out a way to do it using a single link. You guys can give a shot and let me know. I leave this here for now, as I am in no mood of writing a complete infrastructure for push when several exists.

Below is a small video of the Flex UI using the code above showing streaming data.


Wednesday, June 17, 2009

Open Mbeans Tutorial

Writing an MBean in java is very simple. But, when you want the return type to a table of data(say, some metrics) then simple Mbeans are not of much use. One way to get table of data as a return type is using a Open Mbeans. This tutorial gives a simple example of how to write an Open Mbean that returns Table of data.

Step 1: Define a dynamic MXBean by implementing interface "DynamicMBean".
public class QuoteMXBean implements DynamicMBean { 
public Object getAttribute(String attribute) throws AttributeNotFoundException,
MBeanException, ReflectionException{
//TODO
}
public void setAttribute(Attribute attribute) throws AttributeNotFoundException,
InvalidAttributeValueException, MBeanException, ReflectionException{
//TODO
}
public AttributeList getAttributes(String[] attributes){
//TODO
}
public AttributeList setAttributes(AttributeList attributes){
//TODO
}
public Object invoke(String actionName, Object params[], String signature[])
throws MBeanException, ReflectionException{
//TODO
}
public MBeanInfo getMBeanInfo(){
//TODO
}
}

Step 2:
A table is made up of rows. And each row is made up of columns. We will have to give exactly this description defining a table.
String[] itemNames = { "symbol", "localTimeStamp", "latestResponse"}; 
String[] itemDescriptions = { "Symbol",
"The Timestamp for the response as on server", "The response String from Quoteserver"};
OpenType[] itemTypes = { SimpleType.STRING,
SimpleType.STRING, SimpleType.STRING};
CompositeType snapshotType = new CompositeType("snapshot", "Quote Data",
itemNames, itemDescriptions, itemTypes);
All the variables can be made private and initialized either in a static block or in the constructor. The CompositeType just defined that each row will contain the columns "symbol", "localTimeStamp", "latestResponse". Now lets define the table type it self.
String[] index = { "symbol" }; 
TabularType quoteTableType = new TabularType("quoteSnapshots",
"List of Quotes", snapshotType,
index);
That is, a table type "quoteTableType" is a collection of rows given by Composite type "snapshotType". The "index" gives, the unique identification for the table.


Step 3:
Lets declare the table with type "quoteTableType".
private TabularDataSupport quoteSnapshot = new TabularDataSupport(quoteTableType); 

Please add the getter and setters for the same.


Step 4:
Lets implement getMBeanInfo().
OpenMBeanInfoSupport info; 
OpenMBeanAttributeInfoSupport[] attributes = new OpenMBeanAttributeInfoSupport[1];
OpenMBeanConstructorInfoSupport[] constructors = new OpenMBeanConstructorInfoSupport[1];
OpenMBeanOperationInfoSupport[] operations = new OpenMBeanOperationInfoSupport[4];
MBeanNotificationInfo[] notifications = new MBeanNotificationInfo[0];

//Just one attribute
attributes[0] = new OpenMBeanAttributeInfoSupport("QuoteSnapshot",
"Table of quotes data", quoteSnapshotType, true, false,
false);
//No arg constructor
constructors[0] = new OpenMBeanConstructorInfoSupport(
"QuoteMXBean",
"Constructs a QuoteMXBean instance.",
new OpenMBeanParameterInfoSupport[0]);

//Just one operation

OpenMBeanParameterInfo[] params = new OpenMBeanParameterInfoSupport[0];
operations[0] = new OpenMBeanOperationInfoSupport(
"resetAndGetQuoteSnapshot",
"Reset and get the latest available data for the Quotes",
params, quoteSnapshotType,
MBeanOperationInfo.INFO);
//Build the info

info = new OpenMBeanInfoSupport(this .getClass().getName(),
"Quote - Open - MBean", attributes, constructors,
operations, notifications);

Step 5:
Implement getAttribute() and getAttributes().
 public Object getAttribute(String attributeName) throws AttributeNotFoundException, 
MBeanException, ReflectionException {
if (attributeName.equals("QuoteSnapshot")) {
return (TabularData)getQuoteSnapshot();
}
throw new AttributeNotFoundException("Cannot find "
+ attributeName + " attribute ");
}
public AttributeList getAttributes(String[] attributeNames) {
AttributeList resultList = new AttributeList();
if (attributeNames.length == 0)
return resultList;
for (int i = 0; i < attributeNames.length; i++) {
try {
Object value = getAttribute(attributeNames[i]);
resultList.add(new Attribute(attributeNames[i], value));
} catch (Exception e) {
e.printStackTrace();
}
}
return (resultList);
}

Step 6:
Implement setAttribute().
There is nothing to set in this example.
    public void setAttribute(Attribute arg0) throws AttributeNotFoundException, 
InvalidAttributeValueException, MBeanException, ReflectionException {
throw new AttributeNotFoundException(
"No attribute can be set in this MBean");
}

@Override
public AttributeList setAttributes(AttributeList arg0) {
return new AttributeList();
}

Step 7:
Implement invoke().
public Object invoke(String operationName, Object[] params, String[] signature) 
throws MBeanException, ReflectionException {
if (operationName.equals("resetAndGetQuoteSnapshot")) {
try {
return buildSnapshot(); //This is where you delegate the invokation to your original method.
} catch (OpenDataException e) {
throw new MBeanException(e, "invoking resetAndGetQuoteSnapshot: "
+ e.getClass().getName() + "caught ["
+ e.getMessage() + "]");
}
}
else{
throw new ReflectionException(new NoSuchMethodException(
operationName), "Cannot find the operation "
+ operationName);
}
}
public Object buildSnapshot() throws OpenDataException {
quoteSnapshot = new TabularDataSupport(quoteTableType);
SomeVo vo = //get the data
Object[] itemValues = { vo.symbol, vo.localDate, vo.response};
CompositeData result = new CompositeDataSupport(snapshotType,
itemNames, itemValues);
quoteSnapshot.put(result);
return quoteSnapshot;
}

This is all is needed to build an Open MBean that supports a Tabular data as output. You can use jconsole and view the output. Obviously, you have to deploy the MBean in mbean server. I use spring to do that job. The spring XML is given below.
<bean id="exporter" class="org.springframework.jmx.export.MBeanExporter" lazy-init="false"> 
<property name="beans">
<map>
<entry key="com.ssb.mbean:type=QuoteMXBean" value-ref="quoteMXBean"/>
</map>
</property>
</bean>
<bean id="quoteMXBean" class="com.ssb.mbean.QuoteMXBean"/>
<bean id="registry" class="org.springframework.remoting.rmi.RmiRegistryFactoryBean"
p:port="9999" />
<bean id="serverConnector" class="org.springframework.jmx.support.ConnectorServerFactoryBean"
p:objectName="connector:name=rmi"
p:serviceUrl="service:jmx:rmi://localhost/jndi/rmi://localhost:9999/quotesconnector"
p:threaded="true"
p:daemon="true"/>

The above example is trying to display the latest quote data that is available to the system.