Following a number of experiments with Prometheus and Grafana for collecting and viewing logs from internal systems, I decided to put together a proof of concept for something more visually fun.
The objective is to subscribe to a market data feed and display historic prices as a graph. Needed for this are:
1. Market data - Alpaca offers a free data API. It requires an active trading account, but no cash deposit is necessary. The data is supplied by third-party vendors, which would matter if we were building trading strategies, but for display purposes it is perfectly sufficient
2. Visualization - the choices are numerous, from Python libraries like Matplotlib and Plotly to Kibana dashboards, but the aim here is to demonstrate Grafana's ease of use
3. Data store - PostgreSQL integrates with Grafana out of the box and works as a sufficient data store
4. Host - a Kubernetes cluster on Amazon Elastic Kubernetes Service (AWS EKS) helps decouple services into individual components and serves as an effective hosting option
IMPLEMENTATION
Those in need of a refresher on Kubernetes concepts, please see this article.
EKS CLUSTER
The prerequisites are: an AWS account, the AWS CLI, kubectl and eksctl. A cluster can be created from the yaml below with: eksctl create cluster -f cluster.yaml.
Note that we are intentionally using a single node group on a t2.medium EC2 instance to save on hosting fees. We are not yet concerned with resilience as this is a demo exercise - thus only a single instance hosts all services. The size matters as well - t2.medium offers a greater number of private IPs than micro and nano, and is thus more suitable for hosting a multi-service cluster.
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
  name: marketeye-cluster
  region: us-east-2
nodeGroups:
  - name: ng-1
    instanceType: t2.medium
    desiredCapacity: 1
POSTGRES
Define the database name and credentials in a postgres-config.yaml ConfigMap:
apiVersion: v1
kind: ConfigMap
metadata:
  name: postgres-config
  labels:
    app: postgres
data:
  POSTGRES_DB: postgresdb
  POSTGRES_USER: postgresadmin
  POSTGRES_PASSWORD: admin123
Define a volume in postgres-vol.yaml - persistent storage outside of the pod that preserves the data in case the database pod is lost. The two kinds below stand for: PersistentVolumeClaim - a dedicated resource representing a request for storage of a specific size (i.e. WHAT we want). PersistentVolume - the actual storage in the cluster (i.e. HOW we want it implemented); it can specify path and size as well.
apiVersion: v1
kind: PersistentVolume
metadata:
  name: postgres-pv-volume
  labels:
    type: local
    app: postgres
spec:
  storageClassName: manual
  capacity:
    storage: 1G
  accessModes:
    - ReadWriteMany
  hostPath:
    path: "/mnt/data"
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: postgres-pv-claim
  labels:
    app: postgres
spec:
  storageClassName: manual
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 1G
Add a postgres-svc.yaml defining the Service - a networking abstraction that exposes an application running on a pod. The ClusterIP type indicates that the service is available only to requests originating within the cluster.
apiVersion: v1
kind: Service
metadata:
  name: postgres
  labels:
    app: postgres
spec:
  type: ClusterIP
  ports:
    - port: 5432
      name: postgres-serv
  selector:
    app: postgres
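For reference, a ClusterIP service named postgres in the default namespace becomes reachable in-cluster via Kubernetes DNS. A small sketch (names taken from the manifests above) of building a libpq-style connection string from those conventions:

```python
def cluster_dsn(service="postgres", namespace="default", port=5432, db="postgresdb"):
    # ClusterIP services are resolvable in-cluster as
    # <service>.<namespace>.svc.cluster.local; within the same
    # namespace the short name ("postgres") works too.
    host = f"{service}.{namespace}.svc.cluster.local"
    return f"host={host} port={port} dbname={db}"

print(cluster_dsn())  # host=postgres.default.svc.cluster.local port=5432 dbname=postgresdb
```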
Add a postgres-deployment.yaml defining a Deployment - the mechanism for deploying pods that provides scaling and resource availability. The image below maps to the repo containing the latest PostgreSQL image, and postgres-config maps to the ConfigMap defined previously. Note that the Service needs to be defined and applied prior to the Deployment if these are loaded via distinct yaml files with a "kubectl apply -f" per file.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: postgres
spec:
  selector:
    matchLabels:
      app: postgres
  replicas: 1
  template:
    metadata:
      labels:
        app: postgres
    spec:
      containers:
        - name: postgres
          image: postgres:latest
          imagePullPolicy: "IfNotPresent"
          ports:
            - containerPort: 5432
          envFrom:
            - configMapRef:
                name: postgres-config
          env:
            - name: POSTGRES_HOST
              value: postgres-service
            - name: POSTGRES_PORT
              value: "5432"
          volumeMounts:
            - mountPath: /var/lib/postgresql/data
              name: postgredb
      volumes:
        - name: postgredb
          persistentVolumeClaim:
            claimName: postgres-pv-claim
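Assuming the file names used above, the Postgres pieces can then be applied in dependency order (config and volume first, then Service, then Deployment):

```shell
# Apply in dependency order; the config and Service must exist before the Deployment
kubectl apply -f postgres-config.yaml
kubectl apply -f postgres-vol.yaml
kubectl apply -f postgres-svc.yaml
kubectl apply -f postgres-deployment.yaml

# Verify the pod and its bound volume claim
kubectl get pods -l app=postgres
kubectl get pvc postgres-pv-claim
```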
GRAFANA
Grafana's source code is open for all to explore, and Docker-friendly installation instructions are available here. The grafana.yaml below was derived from this very worthy post. As with Postgres above, there is a Service for connectivity and a Deployment for pod management.
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: grafana
  name: grafana-deployment
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      component: grafana
  template:
    metadata:
      labels:
        component: grafana
    spec:
      volumes:
        - name: grafana-claim
          persistentVolumeClaim:
            claimName: grafana-claim
      containers:
        - name: grafana
          image: grafana/grafana:5.0.0
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 3000
          resources:
            limits:
              cpu: 500m
              memory: 2500Mi
            requests:
              cpu: 100m
              memory: 100Mi
          volumeMounts:
            - mountPath: /var/lib/grafana
              name: grafana-claim
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  labels:
    component: grafana
  name: grafana-claim
  namespace: default
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
---
apiVersion: v1
kind: Service
metadata:
  name: grafana-ip-service
  namespace: default
spec:
  type: LoadBalancer
  selector:
    component: grafana
  ports:
    - port: 3000
      targetPort: 3000
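Once Grafana is up, the Postgres service can be registered as a data source either via the UI or via Grafana's HTTP API (POST /api/datasources). A sketch of building that request body - the field values mirror the manifests above, and the data source name is an assumption:

```python
import json

def postgres_datasource(host="postgres", port=5432):
    # Request body for Grafana's "create data source" API; the host is the
    # ClusterIP service name from postgres-svc.yaml, resolvable in-cluster.
    return {
        "name": "market-data",
        "type": "postgres",
        "access": "proxy",
        "url": f"{host}:{port}",
        "database": "postgresdb",
        "user": "postgresadmin",
        "secureJsonData": {"password": "admin123"},
    }

print(json.dumps(postgres_datasource(), indent=2))
```

This body would then be POSTed to http://<EXTERNAL-IP>:3000/api/datasources with the Grafana admin login; since Grafana runs in the same cluster, the short service name postgres resolves without the full DNS suffix.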
ALPACA API
The final piece is a containerized Python application comprising:
- Code to interrogate the API and write data into the Postgres DB
- OS and all dependencies needed for the code to run
- A Secret containing API credentials
Alpaca offers SDKs for various languages: C#, Go, Javascript, Python. A number of Python examples are available in the AlpacaHQ Git repo. Streaming live price updates are available via Websockets as described here.
The below Dockerfile describes the container (a python:3 base image is assumed here):
FROM python:3
RUN apt-get update
COPY requirements.txt /
RUN pip install -r /requirements.txt
RUN useradd --create-home alpaca
WORKDIR /home/alpaca
USER alpaca
COPY alpaca-bot.py .
COPY alpaca_db.py .
ENTRYPOINT [ "python3", "alpaca-bot.py" ]
alpaca-bot.py and alpaca_db.py are the code files residing in the local directory. requirements.txt lists the libraries that need to be installed in the image:
alpaca-trade-api
psycopg2-binary
A secrets.yaml with API credentials obtained from Alpaca needs to be applied before the container is deployed (the values under data must be base64-encoded):
apiVersion: v1
kind: Secret
metadata:
  name: api-secret
data:
  api_key_id: <.....>
  api_secret_key: <.....>
  api_base_url: <.....>
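Since Secret data values are base64-encoded (not encrypted), a quick way to produce them - the sample value below is purely illustrative:

```python
import base64

def encode_secret(value: str) -> str:
    # Kubernetes Secret 'data' fields hold base64-encoded strings
    return base64.b64encode(value.encode()).decode()

print(encode_secret("my-api-key"))  # bXktYXBpLWtleQ==
```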
Define the database class in alpaca_db.py:
import psycopg2
from psycopg2 import Error

class PostgresDB():
    def __init__(self, db, user, pwd, host, port):
        self.conn = psycopg2.connect(database=db, user=user, password=pwd, host=host, port=port)
        self.conn.autocommit = True
        self.cur = self.conn.cursor()

    def execute(self, statement):
        try:
            self.cur.execute(statement)
        except (Exception, Error) as error:
            print("Error while connecting to PostgreSQL:", error)

    def fetch(self, statement):
        try:
            self.cur.execute(statement)
            rows = self.cur.fetchall()
            for row in rows:
                print(row)
        except (Exception, Error) as error:
            print("Error while connecting to PostgreSQL:", error)

    def close(self):
        self.cur.close()
        self.conn.close()
Add the Alpaca stream subscription code in alpaca_bot.py - in part based on the post here.
from alpaca_trade_api import StreamConn
import datetime
import os
import asyncio
import alpaca_db

# Get environment variables
key_id = os.getenv('APCA_API_KEY_ID')
secret_key = os.environ.get('APCA_API_SECRET_KEY')
base_url = os.environ.get('APCA_API_BASE_URL')
data_stream = 'alpacadatav1'
ws_url = 'wss://data.alpaca.markets'
database = os.environ['POSTGRES_DB']
dbUser = os.environ['POSTGRES_USER']
dbPwd = os.environ['POSTGRES_PASSWORD']
dbHost = os.environ['POSTGRES_SERVICE_HOST']
dbPort = os.environ['POSTGRES_SERVICE_PORT']
script_start = datetime.datetime.now()
symbol = ['MSFT']

# Initialise and connect to the database
dbInstance = alpaca_db.PostgresDB(database, dbUser, dbPwd, dbHost, dbPort)

# Connect to the Alpaca API
connApi = StreamConn(
    key_id, secret_key, base_url=base_url, data_url=ws_url, data_stream=data_stream
)
async def handle_signal(channel, symbol, value):
    if (datetime.datetime.now() - script_start).seconds > 30:
        print('Closing connection for ' + str(symbol))
        await connApi.unsubscribe([r'^T.' + str(symbol), r'^AM.' + str(symbol)])
        dbInstance.close()
    else:
        print(str(channel) + '.' + str(symbol))
        print(str(value))
        print(' ')

# T stands for a trade event
@connApi.on(r'^T.*$', symbol)
async def on_trade(connApi, channel, trade):
    symbol = str(trade.symbol)
    price = float(trade.price)
    print('-- T Hit --')
    timeNow = datetime.datetime.now()
    command = "INSERT INTO TRADE_AUDIT(ID, SYMBOL, TRADE_PRICE) VALUES ('%s', '%s', %s);" % (timeNow, symbol, price)
    dbInstance.execute(command)
    await handle_signal(channel, symbol, price)
async def kickoff_trades():
    # Subscribe to trade (T.*) and minute-bar (AM.*) channels for our symbols
    await connApi.subscribe(['T.' + s for s in symbol] + ['AM.' + s for s in symbol])

if __name__ == '__main__':
    dbInstance.execute('CREATE TABLE IF NOT EXISTS TRADE_AUDIT(\
        ID TEXT NOT NULL,\
        SYMBOL TEXT NOT NULL,\
        TRADE_PRICE FLOAT NOT NULL,\
        PRIMARY KEY( ID ));')
    loop = asyncio.new_event_loop()
    loop.create_task(kickoff_trades())
    loop.run_forever()
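As an aside, the INSERT in on_trade builds SQL via string formatting. A safer sketch using psycopg2's parameter substitution - the values here are illustrative and no live connection is opened:

```python
import datetime

# Parameterised form of the INSERT used in on_trade; psycopg2 performs the
# quoting and escaping when the tuple is passed to cursor.execute().
command = "INSERT INTO TRADE_AUDIT(ID, SYMBOL, TRADE_PRICE) VALUES (%s, %s, %s);"
params = (str(datetime.datetime.now()), 'MSFT', 305.21)

# With a live PostgresDB instance this would be:
#   dbInstance.cur.execute(command, params)
print(command)
```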
The image containing all of the above can be built and pushed to the custom Docker Hub repo (<.....>) using the commands below:
sudo docker build -f Dockerfile -t alpaca-app:latest .
sudo docker tag <.build id produced in the step above.> <.....>/alpaca-app:latest
sudo docker push <.....>/alpaca-app
Finally, a Deployment runs the container, wiring in the Postgres config and the API credentials from the Secret:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: alpaca
  labels:
    app: alpaca-app
spec:
  replicas: 1
  selector:
    matchLabels:
      app: alpaca-app
  template:
    metadata:
      labels:
        app: alpaca-app
    spec:
      containers:
        - name: alpaca-app
          image: <.....>/alpaca-app
          envFrom:
            - configMapRef:
                name: postgres-config
          env:
            - name: APCA_API_KEY_ID
              valueFrom:
                secretKeyRef:
                  name: api-secret
                  key: api_key_id
            - name: APCA_API_SECRET_KEY
              valueFrom:
                secretKeyRef:
                  name: api-secret
                  key: api_secret_key
            - name: APCA_API_BASE_URL
              valueFrom:
                secretKeyRef:
                  name: api-secret
                  key: api_base_url
      nodeSelector:
        beta.kubernetes.io/os: linux
Once applied, run kubectl get all to see all components deployed, or kubectl get svc to see only the services. Note the EXTERNAL-IP and PORT(S) column values in the grafana-ip-service row - that is the URL and port of the load balancer service sitting in front of Grafana. In addition, the CLUSTER-IP value of the postgres service is the internal in-cluster IP of the database instance.