Pages

Thursday, June 18, 2009

Streaming Data using Flex and XSocket

I have worked on couple of push technologies namely blazeDS and Lightstreamer and always wondered how does it work. I am just concerned about streaming in Flex and not anything else. You can think I am little inclined towards flex, but the truth is Flex rocks, and after I entered the world of Flex, other UI technologies - I don't care anymore! Anyways, this logic of streaming data to Flex from a backend came to me just by chance! It took 2 days effort to get the below crappy thing going! I read a blog on the Internet that used URLStream to load image data in Flex progressively. Now, if you can load image progressively, then I should be able to do the same with any data. So, here I present Flex - XSocket combination to subscribe/load data from backend for a watchlist. There are a few things, that I did not resolve, as I did not have time to dedicate for it. I will explain those at the end.

JAVA

Lets start from the java backend. I use XScoket Xlightweb library to create a NIO server that can take any symbol as subscription and pushes randomly generated quotes to the client.

I will not be explaining XSockets here. Please visit http://xsocket.sourceforge.net/ for tutorials and downloads.

Lets write a request handler. In XLightweb, the request handler implements the IHttpRequestHandler.

public class ERequestHandler implements IHttpRequestHandler{ 
//Timer that generates the quotes at the specified interval
private final Timer timer = new Timer("timer", true);
public static Map<String, SymbolSender> state = new HashMap<String, SymbolSender>();

Lets implement the onRequest method. The same handler will be used to receive commands, like add symbol, remove symbol and also to receive data request, we use the URI to differentiate between the two request.
    public void onRequest(IHttpExchange exchange) throws IOException, 
BadMessageException {
IHttpRequest req = exchange.getRequest();
String ipAddress = exchange.getConnection().getRemoteAddress().getHostAddress();
if (req.getRequestURI().endsWith("/link1")) {//The Action link
String symbol = req.getParameter("symbol");
String command = req.getParameter("action");

Every ipAddress is given its own handler. So, obtain the handler ie SymbolSender and delegate the action to it. The SymbolSender is a private class that is given little below. Before any client sends an action, it has to connect to this server.
            SymbolSender symbolSender = state.get(ipAddress); 
if(command.equalsIgnoreCase("add")){
symbolSender.addSymbol(symbol);
}else if(command.equalsIgnoreCase("delete")){
symbolSender.removeSymbol(symbol);
}else if(command.equalsIgnoreCase("stop")){
symbolSender.stop();
}

The action has been delegated, so close the channel with a 200 Response header as in the below snippet.
            IHttpResponseHeader respHdr = new HttpResponseHeader(200, "text/plain"); 
BodyDataSink outChannel = exchange.send(respHdr);
outChannel.close();
}else if(req.getRequestURI().endsWith("/link2")){//The initial Data link, that is used to send data back.
String command = req.getParameter("action");
Initialize a new SymbolSender if none exist for the given ipAddress and start the handler using the timer. Here, it is scheduled at a fixed rate of every 500ms.

if(!state.keySet().contains(ipAddress) || state.get(ipAddress) == null){
state.put(ipAddress, new SymbolSender(exchange));
}
SymbolSender symbolSender = state.get(ipAddress);
if(command.equalsIgnoreCase("start")){
timer.scheduleAtFixedRate(symbolSender,new Date(), 500);
}
}
}

Lets define the SymbolSender timer task, which is a handler for each ip address separately.
     private class SymbolSender extends TimerTask { 
private IHttpExchange exchange; //The exchange handler
private List<String> symbolList = new ArrayList<String>();
private BodyDataSink outChannel;
private String ipAddress;
public SymbolSender(IHttpExchange exchange) {
this.exchange = exchange;
IHttpResponseHeader respHdr = new HttpResponseHeader(200, "text/plain");
try {
outChannel = this.exchange.send(respHdr);
outChannel.setEncoding("utf-8");
} catch (IllegalStateException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
ipAddress = exchange.getConnection().getRemoteAddress().getHostAddress();
}

API for adding and removing the subscription symbol from this ipAddress.
        public void addSymbol(String symbol){ 
symbolList.add(symbol);
}

public void removeSymbol(String symbol){
symbolList.remove(symbol);
}

The run method where the random data is generated for the symbols subscribed from this ipAddress, and written to the client.
 @Override 
public void run() {
try {
if(symbolList.size() > 0){
for(String symbol : symbolList){
String data = "|symbol=" + symbol + "#lastPrice=" + randomToTwoDecimal() + "#lastAsk=" + randomToTwoDecimal() + "#lastBid=" + randomToTwoDecimal() + "|\r\n";
outChannel.write(data);
}
}else{
outChannel.write("STAY ALIVE\r\n");
}
} catch (BufferOverflowException e) {
e.printStackTrace();
stop();
} catch (IOException e) {
e.printStackTrace();
stop();
} catch(Exception e){
e.printStackTrace();
stop();
}

}

A simple random way of generating xx.yy number.
        private String randomToTwoDecimal(){ 
Double p1 = Math.random() * 100;
String value1 = String.valueOf(p1);
Double p2 = Math.random() * 100;
String value2 = String.valueOf(p2);
String randNumber = value1.substring(0, value1.indexOf(".")) + "." + value2.substring(0, value2.indexOf("."));
return randNumber;
}

Stopping this handler is achieved by stopping the task, and removing the handler form the state for the ipAddress.
        public void stop(){ 
this.cancel();
try {
outChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
ERequestHandler.state.remove(ipAddress);
}
}
}

The main class that starts the server is given below.
public class Streamer { 
public static void runServer() throws IOException{
System.setProperty("org.xlightweb.showDetailedError", "true");
Context rootCtx = new Context("");
IHttpRequestHandler handler = new ERequestHandler();
Add the two contexts that this server listens to.

rootCtx.addHandler("/link1",handler );
rootCtx.addHandler("/link2", handler);
// creates the server by passing over the port number & the server handler
HttpServer server = new HttpServer(9980, rootCtx);
ConnectionUtils.start(server);
}
public static void main(String[] args) throws Exception {
runServer();
}
}

The java part for the streaming server is over. Now lets go ahead with the flex part.

FLEX


For the flex part lets create an application.
<mx:Application xmlns:mx="http://www.adobe.com/2006/mxml" layout="absolute" creationComplete="init()"> 
<mx:Script>
<![CDATA[
import mx.collections.ArrayCollection;
import mx.controls.Alert;

Lets declare two variables, one for streaming data, and another for writing actions to the xsocket server we just wrote above.
            private var dataStream:URLStream = new URLStream();
private var actionStream:URLLoader = new URLLoader();

The string that we will be getting from the backend is appended here.
private var dataStr:String = ""; 

Lets declare the remaining things i.e a dataprovider, dictionary to keep map of symbol to object and a timer that will be used to parse data and create objects for data provider.
[Bindable] 
private var watchListData:ArrayCollection = new ArrayCollection();
private var symbolObjectMap:Dictionary = new Dictionary();
private static var timer:Timer = new Timer(60);

Lets write the init method thats called on creation complete. We will add a progress listener, that will be really useful to us and the remaining listeners that is just for time pass.
private function init():void{                
dataStream.addEventListener(ProgressEvent.PROGRESS, streamListener);
dataStream.addEventListener(Event.COMPLETE, listener);
dataStream.addEventListener(Event.ACTIVATE, listener);
dataStream.addEventListener(Event.DEACTIVATE, listener);
dataStream.addEventListener(Event.OPEN, listener);
dataStream.addEventListener(HTTPStatusEvent.HTTP_STATUS, listener);
dataStream.addEventListener(IOErrorEvent.IO_ERROR, listener);
dataStream.addEventListener(SecurityErrorEvent.SECURITY_ERROR, listener);
timer.addEventListener(TimerEvent.TIMER, parse, false, 0, true);
timer.start();
}

The stream listener for the data is given below. Very simple logic, just read the multi byte which is already written in form of utf-8 by the server and append it to the data string. Now, this stream listener is called by Flex when there is some specific amount of data is present. And, it does not make sense for me to parse the data here to create objects at that rate. Instead as you will see next code, the timer will invoke the parse method every 60ms and then parse the data in the data string to an appropriate VO.
private function streamListener(event:ProgressEvent):void{ 
if(dataStream.bytesAvailable == 0)return;
if(dataStream.connected){
while(dataStream.bytesAvailable >= 1){
dataStr = dataStr + dataStream.readMultiByte(dataStream.bytesAvailable, "utf-8");
}
trace(dataStr);
}
}

Now, the timepass listener is here. Comment the alerts if you do not want, or use trace instead.
private function listener(event:Event):void{ 
Alert.show("Here at other side of the world :(");
}

Now, here is the parse method for the data I stream from the server. Actually, I do not like to call it streaming, as it is just a normal read and write to streams. Heck! that's why we call it streaming. He he..
private function parse(event:TimerEvent):void{ 
if(this.dataStr != null
&& this.dataStr.indexOf("|") >=0){
var skipLast:Boolean = false;
var streamData:Array = this.dataStr.split("\r\n");

Find out if the data is written partially. That means if its not ending with "|". If that is the case then keep the last piece of string in the data string for appending with the next data arrived from the backend. This will be parsed after the data has arrived and 60ms time lag.
if(String(streamData[streamData.length-1]).charAt(streamData.length-1) != "|"){ 
dataStr = String(streamData[streamData.length-1]);
skipLast = true;
streamData[streamData.length-1] = "STAY ALIVE";
}

Parse all the data other than "STAY ALIVE" pings. I am not sure if you can stop sending STAY ALIVE or delay its frequency. And, I am not doing that R & D either. My purpose remains just of stream data to Flex. Only the positive use case :) The rest of the for loop is just parsing data, creating or updating the StreamerVO object.
for each(var string:String in streamData){ 
if(string != "STAY ALIVE"){
string = string.substring(1,string.length-1);
var nodes:Array = string.split("#");
var symbolAttribute:String = String(nodes[0]);
var symbolData:Array = symbolAttribute.split("=");
if(String(symbolData[0]) == "symbol"){
var symbol:String = String(symbolData[1]);
if(!symbolObjectMap.hasOwnProperty(symbol)){
symbolObjectMap[symbol] = new StreamerVO();
watchListData.addItem(symbolObjectMap[symbol]);
}
var vo:StreamerVO = StreamerVO(symbolObjectMap[symbol]);
for each(var node:String in nodes){
var nodeData : Array = node.split("=");
vo[nodeData[0]] = nodeData[1];
}
}
}
}
}
}

Next lets write the 4 actions supported namely add, delete, start and stop. The url's are in sync with the actions supported by respective links given in the backend code. The get parameters are constructed manually as shown below.
            private function subscribe():void{ 
var urlReq:URLRequest = new URLRequest("http://localhost:9980/link1");
urlReq.url += "?action=add&symbol=" + txtSymbol.text;
actionStream = new URLLoader();
actionStream.load(urlReq);
}

private function unSubscribe():void{
var urlReq:URLRequest = new URLRequest("http://localhost:9980/link1");
urlReq.url += "?action=delete&symbol=" + txtSymbol.text;
actionStream = new URLLoader();
actionStream.load(urlReq);
}
private function connect():void{
var urlReq:URLRequest = new URLRequest("http://localhost:9980/link2");
urlReq.url += "?action=start";
dataStream.load(urlReq);
}
private function disconnect():void{
var urlReq:URLRequest = new URLRequest("http://localhost:9980/link1");
urlReq.url += "?action=stop";
actionStream = new URLLoader();
actionStream.load(urlReq);
}

Now, lets create the view or UI to do the job. I don't think any explanation is needed for this part, as it cant get simpler than this.
 ]]> 
</mx:Script>
<mx:VBox>
<mx:TextInput id="txtSymbol"/>
<mx:HBox>
<mx:Button label="Connect" click="connect()"/>
<mx:Button label="DisConnect" click="disconnect()"/>
<mx:Button label="Subscribe" click="subscribe()"/>
<mx:Button label="UnSubscribe" click="unSubscribe()"/>
</mx:HBox>
<mx:DataGrid id="watchList" dataProvider="{watchListData}">
<mx:columns>
<mx:DataGridColumn headerText="Symbol" dataField="symbol"/>
<mx:DataGridColumn headerText="LastPrice" dataField="lastPrice"/>
<mx:DataGridColumn headerText="LastAsk" dataField="lastAsk"/>
<mx:DataGridColumn headerText="LastBid" dataField="lastBid"/>
</mx:columns>
</mx:DataGrid>
</mx:VBox>
</mx:Application>

And, that's all folks. The backed is writing the data to the channel, and front end flex is reading it and displaying. Well, there are few bugs in this code I presented.

The data does not start to flow to the front end for initial 20 sec or so. Try to connect and subscribe say 3-4 symbols IBM, GOOG, MSFT, CSCO soon. I guess the problem is with the Operating system buffer because even when I said flush manually in the XSocket code (Application buffer) no effect could be seen or the problem could exist in Flex also, I am not sure. As I said earlier, my goal is not to solve these problems but to rather get streaming up, which gets up after a initial delay which is fine with me right now.

I had to use two links so that I could add symbols to be subscribed with out re-connecting to get the data that is being pushed. I could not figure out a way to do it using a single link. You guys can give a shot and let me know. I leave this here for now, as I am in no mood of writing a complete infrastructure for push when several exists.

Below is a small video of the Flex UI using the code above showing streaming data.


3 comments:

  1. Hi, thanks for this very nice Example, I am looking for way of making Java-Flex download manager.
    Java is streaming File to Client and flex is streaming this forward to the Local disc of the user (after the dialogue Interaction: save, cancel)
    But the FileReference.save() Method is not supporting streaming. Is there other way to realization it ?
    salute
    andrzej

    ReplyDelete
  2. What you are talking about is a normal file download from a server.. Try using download.. The FileReference.download() method prompts the user for a location to save the file and initiates downloading from a remote URL.. Not sure about the streaming part you are talking here..

    ReplyDelete
  3. Anonymous3:58 PM

    "state.keySet().contains(ipAddress)" should be replaced with "state.containsKey(ipAddress)"

    ReplyDelete