diff --git a/README.md b/README.md index 37872be3..456d212e 100644 --- a/README.md +++ b/README.md @@ -346,16 +346,29 @@ python -m pytest ``` ### Real-time OrderBook -The ```OrderBook``` subscribes to a websocket and keeps a real-time record of -the orderbook for the product_id input. Please provide your feedback for future +The ```OrderBook``` is a convenient data structure to keep a real-time record of +the orderbook for the product_id input. It processes incoming messages from an +already existing WebsocketClient. Please provide your feedback for future improvements. ```python -import cbpro, time -order_book = cbpro.OrderBook(product_id='BTC-USD') -order_book.start() +import cbpro, time, Queue +class myWebsocketClient(cbpro.WebsocketClient): + def on_open(self): + self.products = ['BTC-USD', 'ETH-USD'] + self.order_book_btc = OrderBookConsole(product_id='BTC-USD') + self.order_book_eth = OrderBookConsole(product_id='ETH-USD') + def on_message(self, msg): + self.order_book_btc.process_message(msg) + self.order_book_eth.process_message(msg) + +wsClient = myWebsocketClient() +wsClient.start() time.sleep(10) -order_book.close() +while True: + print(wsClient.order_book_btc.get_ask()) + print(wsClient.order_book_eth.get_bid()) + time.sleep(1) ``` ### Testing diff --git a/cbpro/order_book.py b/cbpro/order_book.py index 0f393d9b..1a8525e5 100644 --- a/cbpro/order_book.py +++ b/cbpro/order_book.py @@ -12,10 +12,8 @@ from cbpro.websocket_client import WebsocketClient -class OrderBook(WebsocketClient): +class OrderBook(object): def __init__(self, product_id='BTC-USD', log_to=None): - super(OrderBook, self).__init__( - products=product_id, channels=['full']) self._asks = SortedDict() self._bids = SortedDict() self._client = PublicClient() @@ -24,18 +22,7 @@ def __init__(self, product_id='BTC-USD', log_to=None): if self._log_to: assert hasattr(self._log_to, 'write') self._current_ticker = None - - @property - def product_id(self): - ''' Currently OrderBook only supports a single product even though it is stored as a list of products. ''' - return self.products[0] - - def on_open(self): - self._sequence = -1 - print("-- Subscribed to OrderBook! --\n") - - def on_close(self): - print("\n-- OrderBook Socket Closed! --") + self.product_id = product_id def reset_book(self): self._asks = SortedDict() @@ -57,39 +44,37 @@ def reset_book(self): }) self._sequence = res['sequence'] - def on_message(self, message): - if self._log_to: - pickle.dump(message, self._log_to) + def process_message(self, message): + if message.get('product_id') == self.product_id: + if self._log_to: + pickle.dump(message, self._log_to) - sequence = message.get('sequence', -1) - if self._sequence == -1: - self.reset_book() - return - if sequence <= self._sequence: - # ignore older messages (e.g. before order book initialization from getProductOrderBook) - return - elif sequence > self._sequence + 1: - self.on_sequence_gap(self._sequence, sequence) - return + sequence = message.get('sequence', -1) + if self._sequence == -1: + self.reset_book() + return + if sequence <= self._sequence: + # ignore older messages (e.g. before order book initialization from getProductOrderBook) + return + elif sequence > self._sequence + 1: + self.on_sequence_gap(self._sequence, sequence) + return - msg_type = message['type'] - if msg_type == 'open': - self.add(message) - elif msg_type == 'done' and 'price' in message: - self.remove(message) - elif msg_type == 'match': - self.match(message) - self._current_ticker = message - elif msg_type == 'change': - self.change(message) + msg_type = message['type'] + if msg_type == 'open': + self.add(message) + elif msg_type == 'done' and 'price' in message: + self.remove(message) + elif msg_type == 'match': + self.match(message) + self._current_ticker = message + elif msg_type == 'change': + self.change(message) - self._sequence = sequence + self._sequence = sequence def on_sequence_gap(self, gap_start, gap_end): self.reset_book() - print('Error: messages missing ({} - {}). Re-initializing book at sequence.'.format( - gap_start, gap_end, self._sequence)) - def add(self, order): order = { @@ -249,7 +234,6 @@ def set_bids(self, price, bids): import time import datetime as dt - class OrderBookConsole(OrderBook): ''' Logs real-time changes to the bid-ask spread to the console ''' @@ -262,38 +246,55 @@ def __init__(self, product_id=None): self._bid_depth = None self._ask_depth = None - def on_message(self, message): - super(OrderBookConsole, self).on_message(message) - - # Calculate newest bid-ask spread - bid = self.get_bid() - bids = self.get_bids(bid) - bid_depth = sum([b['size'] for b in bids]) - ask = self.get_ask() - asks = self.get_asks(ask) - ask_depth = sum([a['size'] for a in asks]) - - if self._bid == bid and self._ask == ask and self._bid_depth == bid_depth and self._ask_depth == ask_depth: - # If there are no changes to the bid-ask spread since the last update, no need to print - pass - else: - # If there are differences, update the cache - self._bid = bid - self._ask = ask - self._bid_depth = bid_depth - self._ask_depth = ask_depth - print('{} {} bid: {:.3f} @ {:.2f}\task: {:.3f} @ {:.2f}'.format( - dt.datetime.now(), self.product_id, bid_depth, bid, ask_depth, ask)) - - order_book = OrderBookConsole() - order_book.start() + def process_message(self, message): + if message.get('product_id') == self.product_id: + super(OrderBookConsole, self).process_message(message) + + try: + # Calculate newest bid-ask spread + bid = self.get_bid() + bids = self.get_bids(bid) + bid_depth = sum([b['size'] for b in bids]) + ask = self.get_ask() + asks = self.get_asks(ask) + ask_depth = sum([a['size'] for a in asks]) + + if self._bid == bid and self._ask == ask and self._bid_depth == bid_depth and self._ask_depth == ask_depth: + # If there are no changes to the bid-ask spread since the last update, no need to print + pass + else: + # If there are differences, update the cache + self._bid = bid + self._ask = ask + self._bid_depth = bid_depth + self._ask_depth = ask_depth + print('{} {} bid: {:.3f} @ {:.2f}\task: {:.3f} @ {:.2f}'.format( + dt.datetime.now(), self.product_id, bid_depth, bid, ask_depth, ask)) + except Exception: + pass + + class WebsocketConsole(WebsocketClient): + def on_open(self): + self.products = ['BTC-USD', 'ETH-USD'] + self.order_book_btc = OrderBookConsole(product_id='BTC-USD') + self.order_book_eth = OrderBookConsole(product_id='ETH-USD') + + def on_message(self, msg): + self.order_book_btc.process_message(msg) + self.order_book_eth.process_message(msg) + + wsClient = WebsocketConsole() + wsClient.start() + time.sleep(10) try: while True: - time.sleep(10) + pass except KeyboardInterrupt: - order_book.close() + wsClient.close() + except Exception: + pass - if order_book.error: + if wsClient.error: sys.exit(1) else: sys.exit(0) diff --git a/contributors.txt b/contributors.txt index b1e5c495..3a6695b6 100644 --- a/contributors.txt +++ b/contributors.txt @@ -3,4 +3,5 @@ Leonard Lin Jeff Gibson David Caseria Paul Mestemaker -Drew Rice \ No newline at end of file +Drew Rice +Mike Cardillo \ No newline at end of file