Skip to content

Commit f6f25e3

Browse files
authored
Section 6
1 parent d0e5ad5 commit f6f25e3

18 files changed

+1171
-0
lines changed

β€ŽSection 6/README.mdβ€Ž

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Section 6
2+
3+
- `main.py`: section 6.2, client GUI and websocket client
4+
- `tests/`: section 5 unit tests
5+
- `test_rx.py`: section 5.1 basic reactive test
6+
- `test_transaction_form.py`: section 5.2 reactive GUI test
7+
- `test_server.py`: section 5.3 reactive Tornado server test
8+
- `server/` directory contains code from:
9+
- `matching_server.py`: section 4.3 web server, section 6.1
10+
managing orders on the web server
11+
- `transaction_server.py`: section 5.4 cluster testing
12+
- `client/` directory contains code from:
13+
- section 3 reactive GUIs and data flows
14+
- `client.py`: section 4.4 real-time async client receiver/sender
15+
(used in section 5.4 cluster testing)
16+
17+
Server-side Streams:
18+
19+
* orders per user
20+
* all orders (combination of orders per user)
21+
* prices (aggregate of orders grouped by stock symbol)
22+
23+
Client-side Streams:
24+
25+
* prices (from server)
26+
* orders (from user input or from bot)
27+
28+
Cluster test:
29+
30+
cd server
31+
python matching_server.py 8888 &
32+
python matching_server.py 7777 &
33+
python transaction_server.py &
34+
cd ..
35+
cd client
36+
python client.py

β€ŽSection 6/client/client.pyβ€Ž

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import asyncio
2+
import datetime
3+
from random import choice
4+
5+
from rx import Observable
6+
from rx.subjects import Subject
7+
from rx.concurrency import IOLoopScheduler
8+
9+
from tornado.ioloop import IOLoop
10+
from tornado.websocket import websocket_connect
11+
12+
class Client:
13+
def __init__(self, host='localhost', port='8888'):
14+
self._url = 'ws://{}:{}/exchange'.format(host, port)
15+
self.conn = None
16+
self.opened = Subject()
17+
self.messages = Subject()
18+
# self.messages.subscribe(lambda msg: print('received: {}'.format(msg)))
19+
20+
def connect(self):
21+
def on_connect(conn):
22+
print('on_connect')
23+
self.conn = conn
24+
self.opened.on_next(conn)
25+
self.opened.on_completed()
26+
self.opened.dispose()
27+
28+
def on_message_callback(message):
29+
# print('on_message_callback')
30+
self.messages.on_next(message)
31+
32+
print('connect to server')
33+
future = websocket_connect(
34+
self._url,
35+
on_message_callback=on_message_callback,
36+
)
37+
Observable.from_future(future).subscribe(on_connect)
38+
39+
def write_message(self, message):
40+
self.conn.write_message(message)
41+
42+
if __name__ == '__main__':
43+
scheduler = IOLoopScheduler(IOLoop.current())
44+
45+
def make_say_hello(client, client_id):
46+
def say_hello():
47+
print('{} client #{} is sending orders'.format(
48+
datetime.datetime.now(), client_id))
49+
symbols = ['ABC', 'DEF', 'GHI', 'A', 'GS', 'GO']
50+
quantities = [90, 100, 110]
51+
prices = [1.20, 1.21, 1.22, 1.23, 1.24, 1.25]
52+
client.write_message(
53+
'order,{},{},buy,{},{}'.format(client_id, choice(symbols), choice(quantities), choice(prices)))
54+
client.write_message(
55+
'order,{},{},sell,{},{}'.format(client_id, choice(symbols), choice(quantities), choice(prices)))
56+
57+
def schedule_say_hello(conn):
58+
sleep = 5000
59+
Observable \
60+
.interval(sleep, scheduler=scheduler) \
61+
.subscribe(lambda value: say_hello())
62+
return schedule_say_hello
63+
64+
clients = []
65+
for client_id in range(10):
66+
client = Client(port='9999')
67+
client.opened.subscribe(make_say_hello(client, client_id))
68+
clients.append(client)
69+
70+
for client in clients:
71+
client.connect()
72+
IOLoop.current().start()
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from PyQt5.QtWidgets import QWidget, QGridLayout
2+
3+
from rx.subjects import Subject
4+
5+
from client.transaction_form import TransactionForm
6+
from client.stock_overview_table import StockOverviewTable
7+
8+
class ClientWindow(QWidget):
9+
def __init__(self, *args, **kwargs):
10+
stock_prices_stream = kwargs.pop('stock_prices_stream')
11+
QWidget.__init__(self, *args, **kwargs)
12+
self.events = Subject()
13+
self._setup_window()
14+
self._layout = QGridLayout(self)
15+
self._overview_table = StockOverviewTable(stock_prices_stream=stock_prices_stream)
16+
self._form = TransactionForm()
17+
self._layout.addWidget(self._overview_table, 0, 0)
18+
self._layout.addWidget(self._form, 0, 2)
19+
self._layout.setColumnStretch(0, 2)
20+
21+
def _setup_window(self):
22+
self.resize(640, 320)
23+
self.move(350, 200)
24+
self.setWindowTitle('client: stock exchange')
25+
26+
def get_orders(self):
27+
return self._form.orders_stream
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from PyQt5 import QtCore
2+
from PyQt5.QtWidgets import QTableWidget, QTableWidgetItem
3+
4+
class StockOverviewTable(QTableWidget):
5+
def __init__(self, *args, **kwargs):
6+
stock_prices_stream = kwargs.pop('stock_prices_stream')
7+
QTableWidget.__init__(self, *args, **kwargs)
8+
self.setRowCount(0)
9+
self.setColumnCount(4)
10+
self.setHorizontalHeaderLabels(['Symbol', 'Name', 'Buy Price', 'Sell Price'])
11+
self.setColumnWidth(0, 50)
12+
self.setColumnWidth(1, 200)
13+
self.setColumnWidth(2, 100)
14+
self.setColumnWidth(3, 100)
15+
self.horizontalHeader().setStretchLastSection(True)
16+
self.setSortingEnabled(True)
17+
stock_prices_stream.subscribe(self._create_or_update_stock_row)
18+
19+
def _find_matching_row_index(self, stock_row):
20+
matches = self.findItems(stock_row[0], QtCore.Qt.MatchExactly)
21+
if len(matches) == 0:
22+
self.setRowCount(self.rowCount() + 1)
23+
return self.rowCount() - 1
24+
return self.indexFromItem(matches[0]).row()
25+
26+
def _create_or_update_stock_row(self, stock_row):
27+
row = self._find_matching_row_index(stock_row)
28+
29+
column_index = 0
30+
for column in stock_row:
31+
self.setItem(row, column_index, QTableWidgetItem(str(column)))
32+
column_index += 1
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
from PyQt5.QtWidgets import QFormLayout, QLabel, QLineEdit, QPushButton, QWidget
2+
3+
from rx.subjects import Subject
4+
5+
from client.transaction_validator import TransactionValidator
6+
7+
class TransactionForm(QWidget):
8+
def __init__(self, *args, **kwargs):
9+
self.orders_stream = Subject()
10+
self._validator = TransactionValidator()
11+
QWidget.__init__(self, *args, **kwargs)
12+
self._setup_form()
13+
self._connect_events()
14+
self._subscribe_to_streams()
15+
16+
def _setup_form(self):
17+
self._symbol_input = QLineEdit()
18+
self._symbol_error_label = QLabel()
19+
self._price_input = QLineEdit()
20+
self._price_error_label = QLabel()
21+
self._ok_button = QPushButton('Post Order')
22+
self._layout = QFormLayout(self)
23+
self._layout.addRow('Symbol:', self._symbol_input)
24+
self._layout.addRow(self._symbol_error_label)
25+
self._layout.addRow('Price:', self._price_input)
26+
self._layout.addRow(self._price_error_label)
27+
self._layout.addRow(self._ok_button)
28+
self._clear_form()
29+
30+
def _connect_events(self):
31+
self._ok_button.clicked.connect(self.submit_order)
32+
self._symbol_input.textChanged.connect(self._symbol_text_changed)
33+
self._price_input.textChanged.connect(self._price_text_changed)
34+
35+
def _subscribe_to_streams(self):
36+
def invalid_field(error):
37+
self.set_error_label(*error)
38+
self._validator.on_error(invalid_field)
39+
40+
def enable_order_submission(order):
41+
self._ok_button.setEnabled(True)
42+
self._validator.on_valid_order(enable_order_submission)
43+
44+
def set_error_label(self, field, text):
45+
label = '_{}_error_label'.format(field)
46+
getattr(self, label).setText(text)
47+
48+
def _clear_form(self):
49+
for widget_name in ['symbol_input', 'symbol_error_label', 'price_input', 'price_error_label']:
50+
widget = '_{}'.format(widget_name)
51+
getattr(self, widget).setText('')
52+
self._ok_button.setEnabled(False)
53+
54+
def submit_order(self):
55+
self.orders_stream.on_next(self._validator.latest_valid_order)
56+
self._clear_form()
57+
58+
def _symbol_text_changed(self):
59+
self._symbol_error_label.setText('')
60+
value = self._symbol_input.text()
61+
self._validator.next_symbol(value)
62+
63+
def _price_text_changed(self):
64+
self._price_error_label.setText('')
65+
price_string = self._price_input.text()
66+
self._validator.next_price(price_string)
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import re
2+
3+
from rx import Observable
4+
from rx.subjects import Subject
5+
6+
class TransactionValidator:
7+
def __init__(self):
8+
self._error_stream = Subject()
9+
self._price_stream = Subject()
10+
self._symbol_stream = Subject()
11+
self.latest_valid_order = None
12+
13+
def next_error(self, field, error_text):
14+
self._error_stream.on_next([field, error_text])
15+
16+
# We're going to check if the price text has changed to a valid
17+
# value. If it is an empty string or not a number that's greater
18+
# than 0.0 we're going to emit an error in the error stream. If
19+
# it's valid, we will emit the value to the price stream.
20+
def next_price(self, value):
21+
if len(value) == 0:
22+
self.next_error('price', 'cannot be blank')
23+
else:
24+
try:
25+
price = float(value)
26+
if price < 0.01:
27+
self.next_error('price', 'must be greater than 0.00')
28+
else:
29+
self._price_stream.on_next(price)
30+
except ValueError:
31+
if len(re.sub('[a-zA-Z ]', '', value)) == 0:
32+
self.next_error('price', 'must be a number')
33+
34+
# We're going to check if the symbol text has changed. It's a
35+
# simple check, we want to make sure it isn't an empty string. If
36+
# it's valid, we will emit the value on the symbol stream. If it's
37+
# invalid, we'll emit the error on the error stream.
38+
def next_symbol(self, value):
39+
if len(value) == 0 or len(re.sub('[0-9 ]', '', value)) == 0:
40+
self.next_error('symbol', 'cannot be blank')
41+
else:
42+
self._symbol_stream.on_next(value)
43+
44+
def on_error(self, func):
45+
self._error_stream.subscribe(func)
46+
47+
def on_valid_order(self, func):
48+
# Combine latest will emit items when any of the observables
49+
# have an item to emit. We want the latest valid values that
50+
# the user has entered in the symbol and price input
51+
# boxes. When we have that, we can enable the "submit order"
52+
# button. The form inputs and buttons are manipulated through
53+
# the subscription function that was passed in.
54+
def store_order_and_send_to_subscriber(order):
55+
self.latest_valid_order = order
56+
func(order)
57+
58+
Observable.combine_latest(
59+
self._symbol_stream,
60+
self._price_stream,
61+
lambda symbol, price: { 'symbol': symbol, 'price': price }
62+
).subscribe(store_order_and_send_to_subscriber)

β€ŽSection 6/main2.pyβ€Ž

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import sys
2+
import random
3+
import datetime
4+
import asyncio
5+
from threading import Thread
6+
7+
from PyQt5 import QtCore
8+
from PyQt5.QtWidgets import QApplication
9+
from tornado.ioloop import IOLoop
10+
11+
from rx import Observable
12+
from rx.subjects import Subject
13+
from rx.concurrency import QtScheduler, AsyncIOScheduler
14+
15+
from client.client import Client
16+
from client.client_window import ClientWindow
17+
18+
import utils
19+
20+
if __name__ == '__main__':
21+
app = QApplication(sys.argv)
22+
scheduler = QtScheduler(QtCore)
23+
stock_prices = Subject()
24+
client = Client(port='9999')
25+
26+
loop = asyncio.new_event_loop()
27+
asyncio_scheduler = AsyncIOScheduler(loop)
28+
29+
def run_client():
30+
asyncio.set_event_loop(loop)
31+
asyncio.get_event_loop().call_soon(
32+
utils.run_client_websocket, client, stock_prices)
33+
asyncio.get_event_loop().run_forever()
34+
35+
thread = Thread(target=run_client, daemon=True)
36+
thread.start()
37+
38+
client_window = ClientWindow(stock_prices_stream=stock_prices)
39+
client_window.get_orders() \
40+
.filter(utils.order_is_valid) \
41+
.subscribe(lambda x: print(x), lambda x: print('error'))
42+
client_window.show()
43+
sys.exit(app.exec_())

β€ŽSection 6/main3.pyβ€Ž

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import sys
2+
import random
3+
import datetime
4+
import asyncio
5+
from threading import Thread
6+
7+
from PyQt5 import QtCore
8+
from PyQt5.QtWidgets import QApplication
9+
from tornado.ioloop import IOLoop
10+
11+
from rx import Observable
12+
from rx.subjects import Subject
13+
from rx.concurrency import QtScheduler, AsyncIOScheduler
14+
15+
from client.client import Client
16+
from client.client_window import ClientWindow
17+
18+
import utils
19+
20+
if __name__ == '__main__':
21+
app = QApplication(sys.argv)
22+
scheduler = QtScheduler(QtCore)
23+
stock_prices = Subject()
24+
client = Client(port='9999')
25+
26+
loop = asyncio.new_event_loop()
27+
asyncio_scheduler = AsyncIOScheduler(loop)
28+
29+
def run_client():
30+
asyncio.set_event_loop(loop)
31+
asyncio.get_event_loop().call_soon(
32+
utils.run_client_websocket, client, stock_prices)
33+
asyncio.get_event_loop().run_forever()
34+
35+
thread = Thread(target=run_client, daemon=True)
36+
thread.start()
37+
38+
client_window = ClientWindow(stock_prices_stream=stock_prices)
39+
40+
def send_order(order):
41+
stock_order = {
42+
'stock_symbol': order['symbol'],
43+
'price': order['price'],
44+
'direction': 'buy',
45+
'quantity': 1,
46+
}
47+
utils.write_order(client, stock_order)
48+
49+
client_window.get_orders() \
50+
.filter(utils.order_is_valid) \
51+
.subscribe(send_order)
52+
53+
client_window.show()
54+
sys.exit(app.exec_())

0 commit comments

Comments
 (0)