Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -346,16 +346,29 @@ python -m pytest
```

### Real-time OrderBook
The ```OrderBook``` subscribes to a websocket and keeps a real-time record of
the orderbook for the product_id input. Please provide your feedback for future
The ```OrderBook``` is a convenient data structure to keep a real-time record of
the orderbook for the product_id input. It processes incoming messages from an
already existing WebsocketClient. Please provide your feedback for future
improvements.

```python
import cbpro, time
order_book = cbpro.OrderBook(product_id='BTC-USD')
order_book.start()
import cbpro, time, Queue
class myWebsocketClient(cbpro.WebsocketClient):
def on_open(self):
self.products = ['BTC-USD', 'ETH-USD']
self.order_book_btc = OrderBookConsole(product_id='BTC-USD')
self.order_book_eth = OrderBookConsole(product_id='ETH-USD')
def on_message(self, msg):
self.order_book_btc.process_message(msg)
self.order_book_eth.process_message(msg)

wsClient = myWebsocketClient()
wsClient.start()
time.sleep(10)
order_book.close()
while True:
print(wsClient.order_book_btc.get_ask())
print(wsClient.order_book_eth.get_bid())
time.sleep(1)
```

### Testing
Expand Down
143 changes: 72 additions & 71 deletions cbpro/order_book.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
from cbpro.websocket_client import WebsocketClient


class OrderBook(WebsocketClient):
class OrderBook(object):
def __init__(self, product_id='BTC-USD', log_to=None):
super(OrderBook, self).__init__(
products=product_id, channels=['full'])
self._asks = SortedDict()
self._bids = SortedDict()
self._client = PublicClient()
Expand All @@ -24,18 +22,7 @@ def __init__(self, product_id='BTC-USD', log_to=None):
if self._log_to:
assert hasattr(self._log_to, 'write')
self._current_ticker = None

@property
def product_id(self):
''' Currently OrderBook only supports a single product even though it is stored as a list of products. '''
return self.products[0]

def on_open(self):
self._sequence = -1
print("-- Subscribed to OrderBook! --\n")

def on_close(self):
print("\n-- OrderBook Socket Closed! --")
self.product_id = product_id

def reset_book(self):
self._asks = SortedDict()
Expand All @@ -57,39 +44,37 @@ def reset_book(self):
})
self._sequence = res['sequence']

def on_message(self, message):
if self._log_to:
pickle.dump(message, self._log_to)
def process_message(self, message):
if message.get('product_id') == self.product_id:
if self._log_to:
pickle.dump(message, self._log_to)

sequence = message.get('sequence', -1)
if self._sequence == -1:
self.reset_book()
return
if sequence <= self._sequence:
# ignore older messages (e.g. before order book initialization from getProductOrderBook)
return
elif sequence > self._sequence + 1:
self.on_sequence_gap(self._sequence, sequence)
return
sequence = message.get('sequence', -1)
if self._sequence == -1:
self.reset_book()
return
if sequence <= self._sequence:
# ignore older messages (e.g. before order book initialization from getProductOrderBook)
return
elif sequence > self._sequence + 1:
self.on_sequence_gap(self._sequence, sequence)
return

msg_type = message['type']
if msg_type == 'open':
self.add(message)
elif msg_type == 'done' and 'price' in message:
self.remove(message)
elif msg_type == 'match':
self.match(message)
self._current_ticker = message
elif msg_type == 'change':
self.change(message)
msg_type = message['type']
if msg_type == 'open':
self.add(message)
elif msg_type == 'done' and 'price' in message:
self.remove(message)
elif msg_type == 'match':
self.match(message)
self._current_ticker = message
elif msg_type == 'change':
self.change(message)

self._sequence = sequence
self._sequence = sequence

def on_sequence_gap(self, gap_start, gap_end):
self.reset_book()
print('Error: messages missing ({} - {}). Re-initializing book at sequence.'.format(
gap_start, gap_end, self._sequence))


def add(self, order):
order = {
Expand Down Expand Up @@ -249,7 +234,6 @@ def set_bids(self, price, bids):
import time
import datetime as dt


class OrderBookConsole(OrderBook):
''' Logs real-time changes to the bid-ask spread to the console '''

Expand All @@ -262,38 +246,55 @@ def __init__(self, product_id=None):
self._bid_depth = None
self._ask_depth = None

def on_message(self, message):
super(OrderBookConsole, self).on_message(message)

# Calculate newest bid-ask spread
bid = self.get_bid()
bids = self.get_bids(bid)
bid_depth = sum([b['size'] for b in bids])
ask = self.get_ask()
asks = self.get_asks(ask)
ask_depth = sum([a['size'] for a in asks])

if self._bid == bid and self._ask == ask and self._bid_depth == bid_depth and self._ask_depth == ask_depth:
# If there are no changes to the bid-ask spread since the last update, no need to print
pass
else:
# If there are differences, update the cache
self._bid = bid
self._ask = ask
self._bid_depth = bid_depth
self._ask_depth = ask_depth
print('{} {} bid: {:.3f} @ {:.2f}\task: {:.3f} @ {:.2f}'.format(
dt.datetime.now(), self.product_id, bid_depth, bid, ask_depth, ask))

order_book = OrderBookConsole()
order_book.start()
def process_message(self, message):
if message.get('product_id') == self.product_id:
super(OrderBookConsole, self).process_message(message)

try:
# Calculate newest bid-ask spread
bid = self.get_bid()
bids = self.get_bids(bid)
bid_depth = sum([b['size'] for b in bids])
ask = self.get_ask()
asks = self.get_asks(ask)
ask_depth = sum([a['size'] for a in asks])

if self._bid == bid and self._ask == ask and self._bid_depth == bid_depth and self._ask_depth == ask_depth:
# If there are no changes to the bid-ask spread since the last update, no need to print
pass
else:
# If there are differences, update the cache
self._bid = bid
self._ask = ask
self._bid_depth = bid_depth
self._ask_depth = ask_depth
print('{} {} bid: {:.3f} @ {:.2f}\task: {:.3f} @ {:.2f}'.format(
dt.datetime.now(), self.product_id, bid_depth, bid, ask_depth, ask))
except Exception:
pass

class WebsocketConsole(WebsocketClient):
def on_open(self):
self.products = ['BTC-USD', 'ETH-USD']
self.order_book_btc = OrderBookConsole(product_id='BTC-USD')
self.order_book_eth = OrderBookConsole(product_id='ETH-USD')

def on_message(self, msg):
self.order_book_btc.process_message(msg)
self.order_book_eth.process_message(msg)

wsClient = WebsocketConsole()
wsClient.start()
time.sleep(10)
try:
while True:
time.sleep(10)
pass
except KeyboardInterrupt:
order_book.close()
wsClient.close()
except Exception:
pass

if order_book.error:
if wsClient.error:
sys.exit(1)
else:
sys.exit(0)
3 changes: 2 additions & 1 deletion contributors.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ Leonard Lin
Jeff Gibson
David Caseria
Paul Mestemaker
Drew Rice
Drew Rice
Mike Cardillo