Tuesday, November 10, 2020

EKS - Alpaca market data on Grafana and PostgreSQL

Following a number of experiments with Prometheus and Grafana for collecting and viewing logs from internal systems, I decided to put together a proof of concept for something more visually fun.

The objective is to subscribe to a market data feed and display historic prices in the form of a graph. Needed for this are:

1. Market data - Alpaca offers a free data API. It requires an active trading account, but no cash deposit is necessary. The data is supplied by third-party vendors, which would matter if we were building trading strategies, but for display purposes it is perfectly sufficient

2. Visualization - the choices are numerous, from Python libraries like Matplotlib and Plotly to dashboards such as Kibana, but the aim here is to demonstrate Grafana's ease of use

3. Data store - PostgreSQL integrates with Grafana out of the box and works well as a simple data store

4. Host - a Kubernetes cluster on Amazon Elastic Kubernetes Service (AWS EKS) decouples the services into individual components and serves as an effective hosting option

IMPLEMENTATION

Those in need of a refresher on Kubernetes concepts, please see this article.

EKS CLUSTER

The prerequisites are: an AWS account, the AWS command line tools, kubectl and eksctl. A cluster can then be created from the yaml below with eksctl create cluster -f cluster.yaml.

Note that we are intentionally using a single node group on a t2.medium EC2 instance to save on hosting fees. Resilience is not a concern yet - this is a demo exercise, so a single instance hosts all the services. The instance size does matter, though: t2.medium offers a greater number of private IPs than micro or nano and is thus better suited to host a multi-service cluster.
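The IP-address constraint can be made concrete. Under the default AWS VPC CNI, a node's pod capacity works out to ENIs * (IPv4 addresses per ENI - 1) + 2. A sketch using ENI figures taken from the AWS documentation (double-check them against eni-max-pods.txt for your instance generation):

```python
# Rough sketch: derive the EKS per-node pod limit from the published
# ENI and per-ENI IPv4 figures (values copied from AWS docs - verify).
ENI_LIMITS = {
    # instance type: (max ENIs, IPv4 addresses per ENI)
    "t2.nano":   (2, 2),
    "t2.micro":  (2, 2),
    "t2.medium": (3, 6),
}

def max_pods(instance_type: str) -> int:
    """Default VPC CNI formula: ENIs * (IPs per ENI - 1) + 2."""
    enis, ips = ENI_LIMITS[instance_type]
    return enis * (ips - 1) + 2

if __name__ == "__main__":
    for itype in ENI_LIMITS:
        print(itype, max_pods(itype))
```

By this arithmetic a t2.medium supports 17 pods where a t2.micro or t2.nano supports only 4, which is why the smaller sizes struggle to host a multi-service cluster.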

apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
 
metadata:
  name: marketeye-cluster
  region: us-east-2
 
nodeGroups:
  - name: ng-1
    instanceType: t2.medium
    desiredCapacity: 1

PostgreSQL NODE

We first create a postgres-conf.yaml with a ConfigMap exposing the database credentials. These become available as environment variables on the other pods.

apiVersion: v1
kind: ConfigMap
metadata:
  name: postgres-config
  labels:
    app: postgres
data:
  POSTGRES_DB: postgresdb
  POSTGRES_USER: postgresadmin
  POSTGRES_PASSWORD: admin123

Define a volume in postgres-vol.yaml - persistent storage outside of the pod that preserves the data if the database pod is lost. The two kinds below are: PersistentVolumeClaim - a dedicated resource representing a request for storage of a specific size (i.e. WHAT we want); PersistentVolume - the actual storage in the cluster (i.e. HOW it is implemented), which can specify a path and size as well.

kind: PersistentVolume
apiVersion: v1
metadata:
  name: postgres-pv-volume
  labels:
    type: local
    app: postgres
spec:
  storageClassName: manual
  capacity:
    storage: 1G
  accessModes:
    - ReadWriteMany
  hostPath:
    path: "/mnt/data"
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: postgres-pv-claim
  labels:
    app: postgres
spec:
  storageClassName: manual
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 1G

Add a postgres-svc.yaml defining the Service - a networking abstraction that exposes an application running on a pod. The ClusterIP type indicates that the service is reachable only from within the cluster.

apiVersion: v1
kind: Service
metadata:
  name: postgres
  labels:
    app: postgres
spec:
  type: ClusterIP
  ports:
   - port: 5432
     name: postgres-serv
  selector:
   app: postgres

Add a postgres-deployment.yaml defining a Deployment - the mechanism for deploying pods that provides scaling and resource availability. image below points to the repository containing the latest PostgreSQL image; postgres-config maps in the ConfigMap defined previously. Note that the Service needs to be defined and applied before the Deployment if these are loaded via distinct yaml files with one "kubectl apply -f" per file.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: postgres
spec:
  selector:
    matchLabels:
      app: postgres
  replicas: 1
  template:
    metadata:
      labels:
        app: postgres
    spec:
      containers:
        - name: postgres
          image: postgres:latest
          imagePullPolicy: "IfNotPresent"
          ports:
            - containerPort: 5432
          envFrom:
            - configMapRef:
                name: postgres-config
          env:
            - name: POSTGRES_HOST
              value: postgres
            - name: POSTGRES_PORT
              value: "5432"
          volumeMounts:
            - mountPath: /var/lib/postgresql/data
              name: postgredb
      volumes:
        - name: postgredb
          persistentVolumeClaim:
            claimName: postgres-pv-claim

GRAFANA

Grafana's source code is open for all to explore, and Docker-friendly installation instructions are available here. The grafana.yaml below was derived from this very worthy post. As with Postgres above, there is a Service for connectivity and a Deployment for pod management.

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: grafana
  name: grafana-deployment
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      component: grafana
  template:
    metadata:
      labels:
        component: grafana
    spec:
      volumes:
      - name: grafana-claim
        persistentVolumeClaim:
          claimName: grafana-claim
      containers:
      - name: grafana
        image: grafana/grafana:5.0.0
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 3000
        resources:
          limits:
            cpu: 500m
            memory: 2500Mi
          requests:
            cpu: 100m
            memory: 100Mi
        volumeMounts:
        - mountPath: /var/lib/grafana
          name: grafana-claim
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  creationTimestamp: null
  labels:
    component: grafana
  name: grafana-claim
  namespace: default
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
---
apiVersion: v1
kind: Service
metadata:
  name: grafana-ip-service
  namespace: default
spec:
  type: LoadBalancer
  selector:
    component: grafana
  ports:
  - port: 3000
    targetPort: 3000

ALPACA API

Since we are deploying on Kubernetes, we need a custom container with the following:
- Code to interrogate the API and write data into the Postgres DB
- OS and all dependencies needed for the code to run
- Secret containing API credentials

Alpaca offers client libraries for API interaction in C#, Go, JavaScript and Python. A number of Python examples are available in the AlpacaHQ Git repo. Streaming live price updates are available via WebSockets as described here.
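To make the stream format concrete, here is a sketch parsing a single v1 trade message. The payload shape and the short field names ("T" for symbol, "p" for price, "s" for size) follow the Polygon-style v1 schema and should be treated as assumptions to verify against the current Alpaca documentation:

```python
import json

# Hypothetical sample of a v1 trade message - field names are assumed
# from the Polygon-style schema, not copied from live Alpaca output.
raw = '{"stream": "T.MSFT", "data": {"ev": "T", "T": "MSFT", "p": 228.5, "s": 100}}'

def parse_trade(message: str):
    """Extract (symbol, price, size) from a raw v1 trade message."""
    payload = json.loads(message)
    data = payload["data"]
    return data["T"], float(data["p"]), int(data["s"])

symbol, price, size = parse_trade(raw)
print(symbol, price, size)
```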

The Dockerfile below describes the container:

FROM python:3.7.9
RUN apt-get update
COPY requirements.txt /requirements.txt
RUN pip install -r /requirements.txt
 
RUN useradd --create-home alpaca
WORKDIR /home/alpaca
USER alpaca
 
COPY alpaca-bot.py .
COPY alpaca_db.py .
 
ENTRYPOINT [ "python3", "alpaca-bot.py" ]

alpaca-bot.py and alpaca_db.py are the code files residing in the local directory. requirements.txt lists the libraries that need to be installed in the image:

alpaca-trade-api
psycopg2-binary

A secrets.yaml with the API credentials obtained from Alpaca needs to be applied before the container is deployed:

apiVersion: v1
kind: Secret
metadata:
  name: api-secret
data:
  api_key_id: <.....>
  api_secret_key: <.....>
  api_base_url: <.....>
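Note that values under data: in a Secret must be base64-encoded (stringData: accepts plain text instead). A small sketch generating the encoded values - the credentials shown are placeholders, not real keys:

```python
import base64

# Kubernetes Secret values under "data:" must be base64-encoded.
# These credentials are illustrative placeholders only.
creds = {
    "api_key_id": "PKEXAMPLEKEY",
    "api_secret_key": "examplesecret",
    "api_base_url": "https://paper-api.alpaca.markets",
}

for key, value in creds.items():
    encoded = base64.b64encode(value.encode()).decode()
    print(f"  {key}: {encoded}")
```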

Define the database class in alpaca_db.py:

import psycopg2
from psycopg2 import Error
 
class PostgresDB():
    def __init__(self, db, user, pwd, host, port):
        self.conn = psycopg2.connect(database=db, user=user, password=pwd, host=host, port=port)
        self.conn.autocommit = True
        self.cur = self.conn.cursor()
 
    def execute(self, statement):
        try:
            self.cur.execute(statement)
        except (Exception, Error) as error:
            print("Error while connecting to PostgreSQL:", error)
 
    def fetch(self, statement):
        try:
            self.cur.execute(statement)
            rows = self.cur.fetchall()
            for row in rows:
                print(row)
        except (Exception, Error) as error:
            print("Error while connecting to PostgreSQL:", error)
 
    def close(self):
        self.cur.close()
        self.conn.close()
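The class above requires a live PostgreSQL server. The same connect/execute/fetchall/close lifecycle - here with parameterized statements, which the bot's raw string interpolation would also benefit from - can be exercised locally with the standard-library sqlite3 module. An illustrative sketch, not the author's code:

```python
import sqlite3

# Same cursor lifecycle as PostgresDB, demonstrated against an
# in-memory SQLite database so it runs without a server.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute(
    "CREATE TABLE trade_audit (id TEXT PRIMARY KEY, symbol TEXT, trade_price REAL)"
)
# Parameter binding (?) avoids the quoting pitfalls of %-interpolation
cur.execute(
    "INSERT INTO trade_audit VALUES (?, ?, ?)",
    ("2020-11-10T12:00:00", "MSFT", 228.5),
)
cur.execute("SELECT symbol, trade_price FROM trade_audit")
rows = cur.fetchall()
print(rows)
cur.close()
conn.close()
```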

Add the Alpaca stream subscription code in alpaca-bot.py - in part based on the post here.

import alpaca_trade_api as tradeapi
import threading
from alpaca_trade_api import StreamConn
import datetime
import os
import asyncio
import alpaca_db
 
# Get environment variables
key_id = os.getenv('APCA_API_KEY_ID')
secret_key = os.environ.get('APCA_API_SECRET_KEY')
base_url =  os.environ.get('APCA_API_BASE_URL')
data_stream = 'alpacadatav1'
ws_url = 'wss://data.alpaca.markets'
 
database = os.environ['POSTGRES_DB']
dbUser = os.environ['POSTGRES_USER']
dbPwd = os.environ['POSTGRES_PASSWORD']
dbHost = os.environ['POSTGRES_SERVICE_HOST']
dbPort = os.environ['POSTGRES_SERVICE_PORT']
 
script_start = datetime.datetime.now()
symbol = ['MSFT']
 
# Initialise and connect to the database
dbInstance = alpaca_db.PostgresDB(database, dbUser, dbPwd, dbHost, dbPort)
 
# Connect to the Alpaca API
connApi = StreamConn(
    key_id, secret_key, base_url=base_url, data_url=ws_url, data_stream=data_stream
)
 
# Extend the connection interval as needed
async def handle_signal(channel, symbol, value):
    if (datetime.datetime.now() - script_start).seconds > 30:
        print('Closing connection for ' + str(symbol))
        await connApi.unsubscribe([r'^T.' + str(symbol), r'^AM.' + str(symbol)])
        dbInstance.close()
    else:
        print(str(channel) + '.' + str(symbol))
        print(str(value))
        print(' ')
 
# T stands for a trade event
@connApi.on(r'^T.*$', symbol)
async def on_trade(connApi, channel, trade):
    symbol = str(trade.symbol)
    price = float(trade.price)
    print('-- T Hit --')
    timeNow = datetime.datetime.now()
    command = "INSERT INTO TRADE_AUDIT(ID, SYMBOL, TRADE_PRICE) VALUES ('%s', '%s', %s);" % (timeNow, symbol, price)
    dbInstance.execute(command)
    await handle_signal(channel, symbol, price)
 
 
async def kickoff_trades():
    # Subscribe to one trade channel per symbol, e.g. 'T.MSFT'
    await connApi.subscribe(['T.%s' % s for s in symbol])
 
 
if __name__ == '__main__':
    dbInstance.execute('CREATE TABLE IF NOT EXISTS TRADE_AUDIT(\
            ID          TEXT  NOT NULL,\
            SYMBOL      TEXT  NOT NULL,\
            TRADE_PRICE FLOAT NOT NULL,\
            PRIMARY KEY( ID ));')
    loop = asyncio.new_event_loop()
    loop.create_task(kickoff_trades())
    loop.run_forever()
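The event-loop bootstrap at the end of alpaca-bot.py is easy to get wrong. A minimal, self-contained sketch of the same pattern - scheduling a coroutine on a fresh loop and driving it (here to completion, where the real bot keeps the loop running indefinitely). fake_stream and its messages are hypothetical stand-ins for the websocket feed:

```python
import asyncio

async def fake_stream(messages):
    """Stand-in for the websocket subscription: collects canned trades."""
    received = []
    for msg in messages:
        await asyncio.sleep(0)  # yield control, as a real socket read would
        received.append(msg)
    return received

# run_until_complete drives the coroutine to completion on a fresh loop;
# a never-ending stream would use loop.run_forever() instead.
loop = asyncio.new_event_loop()
result = loop.run_until_complete(fake_stream([("MSFT", 228.5), ("MSFT", 228.6)]))
loop.close()
print(result)
```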

The image containing all of the above can be built and pushed to the custom Docker Hub repo (<.....>) using the commands below:

sudo docker login --username=<.....> --password <.....>
sudo docker build -f Dockerfile -t alpaca-app:latest .
sudo docker tag alpaca-app:latest <.....>/alpaca-app:latest
sudo docker push <.....>/alpaca-app

And, finally, the last piece to tie it all together - alpaca-app.yaml - the yaml to deploy the custom Alpaca app. It pulls the application image from the custom Docker Hub repo (fill in the name in place of <.....>), maps in the database config (configMapRef) and the Alpaca credentials (env). Note that the sleep command overrides the image's ENTRYPOINT and merely keeps the pod alive so the bot can be started manually via kubectl exec; remove it to have alpaca-bot.py run on startup.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: alpaca
  labels:
    app: alpaca-app
spec:
  replicas: 1
  selector:
    matchLabels:
      app: alpaca-app
  template:
    metadata:
      labels:
        app: alpaca-app
    spec:
      containers:
      - name: alpaca-app
        image: <.....>/alpaca-app
        command: ["sleep", "123456"]
        envFrom:
          - configMapRef:
              name: postgres-config
        env:
        - name: APCA_API_KEY_ID
          valueFrom:
            secretKeyRef:
              name: api-secret
              key: api_key_id
        - name: APCA_API_SECRET_KEY
          valueFrom:
            secretKeyRef:
              name: api-secret
              key: api_secret_key
        - name: APCA_API_BASE_URL
          valueFrom:
            secretKeyRef:
              name: api-secret
              key: api_base_url
      nodeSelector:
        beta.kubernetes.io/os: linux


DEPLOY IT ALL!

kubectl apply -f secrets.yaml
kubectl apply -f postgres-conf.yaml
kubectl apply -f postgres-vol.yaml
kubectl apply -f postgres-svc.yaml
kubectl apply -f postgres-deployment.yaml
kubectl apply -f grafana.yaml
kubectl apply -f alpaca-app.yaml

Once applied, run kubectl get all to see all deployed components, or kubectl get svc to see only the services. See the EXTERNAL-IP and PORT(S) column values in the grafana-ip-service row below - that is the URL and port of the load balancer service sitting in front of Grafana. In addition, the CLUSTER-IP value of the postgres service is the internal in-cluster IP of the database instance.


Log into Grafana using the above URL and the "admin/admin" credentials, and change the admin password when prompted. Define a Data Source using PostgreSQL as the Type, the above service IP as the Host, and the database login credentials as defined in postgres-conf.yaml.


Create a new Dashboard, selecting the data source defined above and a simple SQL statement to pull the data in.
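A hypothetical panel query against the TRADE_AUDIT table the bot creates - the column aliases and the cast of the text ID column to a timestamp are assumptions to satisfy Grafana's expectation of a time column:

```sql
-- Sketch of a Grafana time-series query (verify column names against
-- the TRADE_AUDIT table defined in alpaca-bot.py)
SELECT
  ID::timestamp AS "time",
  TRADE_PRICE AS "MSFT"
FROM TRADE_AUDIT
WHERE SYMBOL = 'MSFT'
ORDER BY 1;
```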

