11package com .rainmatter .ticker ;
22
3+ /**
4+ * Created by H1ccup on 10/09/16.
5+ */
6+
37import com .neovisionaries .ws .client .*;
4- import com .rainmatter .models .Depth ;
5- import com .rainmatter .models .Tick ;
68import com .rainmatter .kiteconnect .KiteConnect ;
79import com .rainmatter .kiteconnect .Routes ;
10+ import com .rainmatter .kitehttp .exceptions .KiteException ;
11+ import com .rainmatter .models .Depth ;
12+ import com .rainmatter .models .Tick ;
813import org .json .JSONArray ;
914import org .json .JSONException ;
1015import org .json .JSONObject ;
1520import java .util .*;
1621
1722/**
18- * Provides capability to establish a websocket connection to get live quotes.
23+ * Ticker provider sends tokens to com.rainmatter.ticker server and get ticks from com.rainmatter.ticker server. Ticker server sends data in bytes. This Class
24+ * gets ticks and converts into readable format which includes our own business logic.
1925 */
2026public class KiteTicker {
2127
@@ -25,8 +31,8 @@ public class KiteTicker {
2531 private OnDisconnect onDisconnectedListener ;
2632 private WebSocket ws ;
2733
28- //private Thread mThread;
2934 private KiteConnect _kiteSdk ;
35+ private KiteTicker tickerProvider ;
3036
3137 public final int NseCM = 1 ,
3238 NseFO = 2 ,
@@ -42,12 +48,60 @@ public class KiteTicker {
4248 mUnSubscribe = "unsubscribe" ,
4349 mSetMode = "mode" ;
4450
45-
46- private boolean connection = false ;
4751 public static String modeFull = "full" , // Full quote inluding market depth. 172 bytes.
4852 modeQuote = "quote" , // Quote excluding market depth. 52 bytes.
4953 modeLTP = "ltp" ; // Only LTP. 4 bytes.;
5054
55+ private long lastTickArrivedAt = 0 ;
56+ private long timeIntervalForReconnection = 5 ;
57+ private Set <Long > subscribedTokens = new HashSet <>();
58+ private int maxRetries = 50 ;
59+ private int count = 0 ;
60+ private Timer timer = null ;
61+ private boolean tryReconnection = false ;
62+
63+ /** Returns task which performs check every second for reconnection*/
64+ private TimerTask getTask (){
65+ TimerTask checkForRestartTask = new TimerTask () {
66+ @ Override
67+ public void run () {
68+ Date currentDate = new Date ();
69+ long timeInterval = (currentDate .getTime () - lastTickArrivedAt )/1000 ;
70+ if (lastTickArrivedAt > 0 ) {
71+ if (timeInterval >= timeIntervalForReconnection ) {
72+ if (maxRetries == -1 || count <= maxRetries ) {
73+ reconnect (new ArrayList <>(subscribedTokens ));
74+ count ++;
75+ }else if (maxRetries != -1 && count > maxRetries ){
76+ if (timer != null ){
77+ timer .cancel ();
78+ }
79+ }
80+ }
81+ }
82+ }
83+ };
84+ return checkForRestartTask ;
85+ }
86+
87+ /** Set tryReconnection, to instruct KiteTicker that it has to reconnect, if ticker is disconnected*/
88+ public void setTryReconnection (boolean retry ){
89+ tryReconnection = retry ;
90+ }
91+
92+ /** Set minimum time interval after which ticker has to restart in seconds*/
93+ public void setTimeIntervalForReconnection (int interval ) throws KiteException {
94+ if (interval >= 5 ) {
95+ timeIntervalForReconnection = interval ;
96+ }else
97+ throw new KiteException ("reconnection interval can't be less than five seconds" , 00 );
98+ }
99+
100+ /** Set max number of retries for reconnection, for infinite retries set value as -1 */
101+ public void setMaxRetries (int maxRetries ){
102+ this .maxRetries = maxRetries ;
103+ }
104+
51105 public KiteTicker (KiteConnect kiteSdk ){
52106 _kiteSdk = kiteSdk ;
53107 createUrl ();
@@ -86,9 +140,8 @@ public void setOnDisconnectedListener(OnDisconnect listener){
86140 /** Establishes a web socket connection.*/
87141 public void connect () throws WebSocketException , IOException {
88142
89- if (connection ){
90- disconnect ();
91- connection = false ;
143+ if (isConnectionOpen ()){
144+ return ;
92145 }
93146
94147 if (wsuri == null ){
@@ -99,10 +152,18 @@ public void connect() throws WebSocketException, IOException {
99152
100153 @ Override
101154 public void onConnected (WebSocket websocket , Map <String , List <String >> headers ) {
102- connection = true ;
155+ // connection = true;
103156 if (onConnectedListener != null ) {
104157 onConnectedListener .onConnected ();
105158 }
159+ if (tryReconnection ) {
160+ if (timer != null ) {
161+ timer .cancel ();
162+ }
163+ timer = new Timer ();
164+ timer .scheduleAtFixedRate (getTask (), 0 , 1000 );
165+
166+ }
106167 }
107168
108169 @ Override
@@ -118,6 +179,9 @@ public void onBinaryMessage(WebSocket websocket, byte[] binary) {
118179 if (onTickerArrivalListener != null ) {
119180 onTickerArrivalListener .onTick (tickerData );
120181 }
182+
183+ Date date = new Date ();
184+ lastTickArrivedAt = date .getTime ();
121185 }
122186
123187 /**
@@ -133,24 +197,35 @@ public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame,
133197 if (onDisconnectedListener != null ){
134198 onDisconnectedListener .onDisconnected ();
135199 }
136- return ;
200+ //connection = false;
201+ return ;
137202 }
138203
139204 @ Override
140205 public void onError (WebSocket websocket , WebSocketException cause ) {
141206 super .onError (websocket , cause );
142- }
207+ }
208+
143209 });
144210 ws .connect ();
145211 }
146212
147213 /** Disconnects websocket connection.*/
148214 public void disconnect (){
215+ if (timer != null ){
216+ timer .cancel ();
217+ }
149218 if (ws != null && ws .isOpen ()) {
150219 ws .disconnect ();
220+ subscribedTokens = new HashSet <>();
151221 }
152222 }
153223
224+ /** Disconnects websocket connection only for internal use*/
225+ private void nonUserDisconnect (){
226+ ws .disconnect ();
227+ }
228+
154229 /** Returns true if websocket connection is open.
155230 * @return boolean*/
156231 public boolean isConnectionOpen (){
@@ -192,16 +267,17 @@ public void setMode(ArrayList<Long> tokens, String mode){
192267
193268 /** Subscribes for list of tokens.
194269 * @param tokens is list of tokens to be subscribed for.*/
195- public void subscribe (ArrayList <Long > tokens ) throws IOException , WebSocketException {
270+ public void subscribe (ArrayList <Long > tokens ) throws IOException , WebSocketException , KiteException {
196271 if (ws != null ) {
197272 if (ws .isOpen ()) {
198273 createTickerJsonObject (tokens , mSubscribe );
199274 ws .sendText (createTickerJsonObject (tokens , mSubscribe ).toString ());
200275 setMode (tokens , modeQuote );
201276 }else
202- connect ( );
277+ throw new KiteException ( "ticker is not connected" , 504 );
203278 }else
204- connect ();
279+ throw new KiteException ("ticker is not connected" , 504 );
280+ subscribedTokens .addAll (tokens );
205281 }
206282
207283 private JSONObject createTickerJsonObject (ArrayList <Long > tokens , String action ) {
@@ -225,6 +301,7 @@ public void unsubscribe(ArrayList<Long> tokens){
225301 if (ws != null ) {
226302 if (ws .isOpen ()) {
227303 ws .sendText (createTickerJsonObject (tokens , mUnSubscribe ).toString ());
304+ subscribedTokens .removeAll (tokens );
228305 }
229306 }
230307 }
@@ -299,14 +376,13 @@ private Tick getNseIndeciesData(byte[] bin, int x){
299376 tick .setOpenPrice (convertToDouble (getBytes (bin , 16 , 20 )) / dec );
300377 tick .setClosePrice (convertToDouble (getBytes (bin , 20 , 24 )) / dec );
301378 tick .setNetPriceChangeFromClosingPrice (convertToDouble (getBytes (bin , 24 , 28 )) / dec );
302- return tick ;
303379 }else {
304380 tick .setMode (modeLTP );
305381 tick .setTradable (false );
306382 tick .setToken (x );
307383 tick .setLastTradedPrice (convertToDouble (getBytes (bin , 4 , 8 )) / dec );
308- return tick ;
309384 }
385+ return tick ;
310386 }
311387
312388 private Tick getLtpQuote (byte [] bin , int x , int dec1 ){
@@ -412,4 +488,33 @@ private int getLengthFromByteArray(byte[] bin){
412488 return bb .getShort ();
413489 }
414490
491+ /** Disconnects and reconnects ticker*/
492+ private void reconnect (final ArrayList <Long > tokens ) {
493+ try {
494+ nonUserDisconnect ();
495+ connect ();
496+ lastTickArrivedAt = 0 ;
497+ final OnConnect onUsersConnectedListener = this .onConnectedListener ;
498+ setOnConnectedListener (new OnConnect () {
499+ @ Override
500+ public void onConnected () {
501+ try {
502+ subscribe (tokens );
503+ onConnectedListener = onUsersConnectedListener ;
504+ } catch (IOException e ) {
505+ e .printStackTrace ();
506+ } catch (WebSocketException e ) {
507+ e .printStackTrace ();
508+ } catch (KiteException e ) {
509+ e .printStackTrace ();
510+ }
511+ }
512+ });
513+ }catch (WebSocketException we ){
514+ we .printStackTrace ();
515+ }catch (IOException ie ){
516+ ie .printStackTrace ();
517+ }
518+ }
519+
415520}
0 commit comments