fintic-tracker/stockTicker.py

292 lines
12 KiB
Python

# Copyright (C) 2020 Daniel Richardson richardson.daniel@hotmail.co.uk
#
# This file is part of stockTicker project for justinodunn.
#
# stockTicker can not be copied and/or distributed without the express
# permission of Daniel Richardson
import csv
import os
import subprocess
import sys
import threading
import time

from PIL import Image, ImageDraw, ImageFont
from rgbmatrix import RGBMatrix, RGBMatrixOptions
from twelvedata import TDClient
class StockTicker():
    """Build a scrolling stock-ticker strip and show it on an RGB LED matrix.

    The ticker reads batches of symbols from ``csv/tickers.csv``, fetches
    prices through the Twelve Data client, renders one image per symbol
    (logo, price, arrow, change), stitches them into ``final.ppm`` and hands
    that file to the external ``./demo`` binary for display.
    """

    # Number of CSV rows (one API request each) consumed per refresh.
    _ROWS_PER_REFRESH = 3

    def __init__(self):
        """Initialise ticker state and create the 64x64 RGB matrix."""
        # One-element list per CSV row: a comma-joined symbol string.
        self.symbols = []
        # NOTE(review): API keys are hard-coded in source; consider moving
        # them to configuration. Each inner list is one "key group";
        # self.keySwapper selects the group.
        self.ApiKeys = [["6172b84d1f464ad88b95ed52e4391bce","979f9b8c9ee748fbbb57cb490ccaaf42","593fa818a44144699b75433aafd92d15"]]
        # Rows of [symbol, latest 1min close, previous 1day close].
        self.retrievedList = []
        # Colour used for the price/change text; updated by getArrow().
        self.greenORred = (255, 255, 255)
        # Images (spacer + per-stock image) that make up the final strip.
        self.ListStocks = []
        self.blank = Image.open('logos/blank.png')
        # Set by getStockPrices() when a fetch fails.
        self.ApiCalledError = False
        self.keySwapper = 0
        # Passed to ./demo as "-t" and "-m" respectively.
        self.displayTime = 60
        self.speedDisplay = 25
        self.running = True
        # Subtracted from displayTime to decide how long to wait before
        # fetching the next batch.
        self.Delay = 20
        # Index of the first CSV row of the current batch.
        self.parseCSV = 0
        self.symbolLength = 0
        # Multiplier applied to each colour channel in SetImage(unsafe=False).
        self.brightness = 1.0
        # Configuration for the matrix
        options = RGBMatrixOptions()
        options.rows = 64
        options.cols = 64
        options.chain_length = 1
        options.parallel = 1
        options.hardware_mapping = 'adafruit-hat'  # If you have an Adafruit HAT: 'adafruit-hat'
        options.gpio_slowdown = 4
        self.matrix = RGBMatrix(options = options)

    def openImage(self, image_file):
        """Open ``image_file`` and return it converted to RGB mode."""
        image = Image.open(image_file)
        return image.convert('RGB')

    def SetImage(self, image, pos_x = 0, pos_y = 0, offset_x = 0, offset_y = 0, unsafe=True):
        """Draw an RGB image onto the matrix at the given offset.

        ``pos_x`` and ``pos_y`` are accepted for interface compatibility but
        are not used. With ``unsafe=True`` the whole image is handed to
        ``SetPixelsPillow``; otherwise pixels are copied one by one, clipped
        to the matrix bounds and scaled by ``self.brightness``.

        Raises:
            ValueError: if ``image`` is not in RGB mode.
        """
        if image.mode != "RGB":
            raise ValueError("Currently, only RGB mode is supported for SetImage(). Please create images with mode 'RGB' or convert first with image = image.convert('RGB'). Pull requests to support more modes natively are also welcome :)")
        img_width, img_height = image.size
        if unsafe:
            # In unsafe mode we directly access the underlying PIL image
            # array in cython, which is considered unsafe pointer access,
            # however it's super fast and seems to work fine
            # https://groups.google.com/forum/#!topic/cython-users/Dc1ft5W6KM4
            self.matrix.SetPixelsPillow(offset_x, offset_y, img_width, img_height, image)
            return
        # Slow path: copy pixel by pixel, visiting only the part of the
        # image that lands inside the matrix.
        pixels = image.load()
        scale = self.brightness
        for x in range(max(0, -offset_x), min(img_width, self.matrix.width - offset_x)):
            for y in range(max(0, -offset_y), min(img_height, self.matrix.height - offset_y)):
                r, g, b = pixels[x, y]
                # Channel values are passed as integers.
                self.matrix.SetPixel(x + offset_x, y + offset_y,
                                     int(r * scale), int(g * scale), int(b * scale))

    def ScrollImage(self, image_file, pos_x = 0, pos_y = 0, delay = 0.05):
        """Scroll ``image_file`` leftwards one pixel every ``delay`` seconds.

        NOTE: this loop has no exit condition, so the call never returns.
        ``pos_x`` and ``pos_y`` are unused.
        """
        image = self.openImage(image_file)
        shift = 0
        while True:
            self.SetImage(image, offset_x = -shift, offset_y = 0)
            time.sleep(delay)
            shift += 1

    #Get the logo from file that is the same as ticker name
    def getLogo(self, Ticker):
        """Return ``logos/<Ticker>.png``, or ``logos/default.png`` if it cannot be opened."""
        try:
            Logo = Image.open('logos/' + Ticker + '.png')
        except OSError:
            # Missing or unreadable logo file: fall back to the default.
            print('Could not find logo for: ' + Ticker + ' using default')
            Logo = Image.open('logos/default.png')
        return Logo

    #Using change between min and day price give appropriate arrow
    #and set the overall change colour
    def getArrow(self, CHANGE):
        """Pick the arrow image and text colour for a price change.

        Sets ``self.greenORred`` to green for a positive change and red
        otherwise (a change of exactly zero is treated as "down").

        Returns:
            ``(arrow_image, magnitude)`` where ``magnitude`` is the
            non-negative size of ``CHANGE``.
        """
        if CHANGE > 0:
            self.greenORred = (0, 255, 0)
            return Image.open('logos/up.png'), CHANGE
        self.greenORred = (255, 0, 0)
        # abs() rather than "* -1" so a zero change does not become -0.0,
        # which would be formatted as "-0.00".
        return Image.open('logos/down.png'), abs(CHANGE)

    def get_text_dimensions(self, text_string, font):
        """Return ``(width, height)`` of the pixels drawn for ``text_string``.

        The text is rendered onto a scratch 100x100 canvas and measured via
        its bounding box, so text extending beyond the canvas is clipped.
        Returns ``(0, 0)`` when nothing is drawn (e.g. empty text).
        """
        canvas = Image.new('RGB', (100, 100))
        ImageDraw.Draw(canvas).text((10, 10), text_string, font=font, fill=(255, 255, 255))
        bbox = canvas.getbbox()
        if bbox is None:
            return 0, 0
        left, top, right, bottom = bbox
        return right - left, bottom - top

    #Draw Ticker, current and change onto one image
    def textToImage(self, TICKER, CURRENT, CHANGE, ARROW):
        """Render ticker name, current price, arrow and change into one image.

        ``TICKER``, ``CURRENT`` and ``CHANGE`` are strings; ``ARROW`` is an
        image pasted between the price and the change. The returned image is
        32 pixels high and cropped to the width of the drawn content.
        """
        font = ImageFont.load("7x14.pil")
        img = Image.new('RGB', (150, 32))
        d = ImageDraw.Draw(img)
        d.text((4, -4), TICKER, fill=(255, 255, 255), font=font)
        d.text((4, 13), CURRENT, fill=self.greenORred, font=font)
        current_width, _ = self.get_text_dimensions(CURRENT, font)
        img.paste(ARROW, (current_width + 9, 18))
        change_x = current_width + 29
        d.text((change_x, 13), CHANGE, fill=self.greenORred, font=font)
        change_width, _ = self.get_text_dimensions(CHANGE, font)
        ticker_width, _ = self.get_text_dimensions(TICKER, font)
        # Keep whichever row is wider so the ticker name is never cut off.
        newWidth = max(change_x + change_width, 4 + ticker_width)
        # crop() returns a new image; the result must be kept.
        return img.crop((0, 0, newWidth, 32))

    #Stitch the logo & prices picture into one image
    def stitchImage(self, IMAGES):
        """Paste all ``IMAGES`` side by side, left to right, into a new RGB image.

        The result is as wide as the sum of the image widths and as tall as
        the tallest image.
        """
        widths, heights = zip(*(i.size for i in IMAGES))
        new_im = Image.new('RGB', (sum(widths), max(heights)))
        x_offset = 0
        # Paste every image: the canvas above is sized for all of them.
        for im in IMAGES:
            new_im.paste(im, (x_offset, 0))
            x_offset += im.size[0]
        return new_im

    #Display the image onto the matrix
    def displayImage(self, IMAGE):
        """Placeholder: prints a marker and returns 0; ``IMAGE`` is unused."""
        print('k')
        return 0

    #Get current prices and day prices for the ticker then format the response into an array
    def getStockPrices(self, APIKEY, SYMBOLS):
        """Fetch prices for a comma-separated ``SYMBOLS`` string.

        On success one ``[symbol, latest 1min close, previous 1day close]``
        row per symbol is appended to ``self.retrievedList``. On any failure
        ``self.ApiCalledError`` is set to True and no rows are appended.
        """
        # HACK: the key passed by the caller is overridden on purpose. The
        # note in GetfullStockImage says the rotated keys did not work while
        # this hard-coded one does. TODO: fix key rotation and remove this.
        APIKEY = "237d183faf984e7eba708a647c55153a"
        try:
            symbols_list = SYMBOLS.split(",")
            td = TDClient(apikey=APIKEY)
            tmin = td.time_series(symbol=SYMBOLS, interval="1min", outputsize='1').as_json()
            tday = td.time_series(symbol=SYMBOLS, interval="1day", outputsize='2').as_json()
            rows = []
            for symbol in symbols_list:
                latest_close = tmin[symbol][0]['close']
                # Two daily bars are requested; the second one is used as
                # the reference price for the change.
                reference_close = list(tday[symbol])[1]['close']
                rows.append([symbol, latest_close, reference_close])
        except Exception:
            self.ApiCalledError = True
            print("Could not fetch data - API CALLS REACHED? - Will display old image")
        else:
            self.retrievedList.extend(rows)

    #Connect all the pieces togeather creating 1 long final stock image
    def GetfullStockImage(self):
        """Fetch the current batch of CSV rows and write ``final.ppm``.

        Up to ``_ROWS_PER_REFRESH`` rows starting at ``self.parseCSV`` are
        fetched. If any fetch fails the previous ``final.ppm`` is left
        untouched and ``self.ApiCalledError`` stays True.
        """
        self.ApiCalledError = False
        self.ListStocks = []
        self.retrievedList = []
        start = time.time()
        print('elapsed time: ' + str((time.time()-start)))
        # Wrap the group index so it is always valid, however many key
        # groups are configured.
        key_group = self.ApiKeys[self.keySwapper % len(self.ApiKeys)]
        for j in range(self._ROWS_PER_REFRESH):
            row_index = j + self.parseCSV
            if row_index >= len(self.symbols):
                # Fewer rows left than a full batch.
                break
            api_key = key_group[j % len(key_group)]
            print('API KEY: ' + str(api_key))
            # NT: this weird API key thing doesnt work, when I hard code the API key in getStockPrices it does
            self.getStockPrices(api_key, self.symbols[row_index][0])
            if self.ApiCalledError:
                break
            print('elapsed time after api call: ' + str((time.time()-start)))
            for ticker, latest_raw, reference_raw in self.retrievedList:
                print('elapsed timethrough loop: ' + str((time.time()-start)))
                change = float(latest_raw) - float(reference_raw)
                current_text = '%.2f' % float(latest_raw)
                logo = self.getLogo(ticker)
                arrow, change = self.getArrow(change)
                mid_frame = self.textToImage(ticker, current_text, '%.2f' % change, arrow)
                self.ListStocks.append(self.blank)
                self.ListStocks.append(self.stitchImage([logo, mid_frame]))
            self.retrievedList = []
        print('elapsed time loop ended: ' + str((time.time()-start)))
        if not self.ApiCalledError and self.ListStocks:
            FinalDisplayImage = self.stitchImage(self.ListStocks)
            FinalDisplayImage.save('final.ppm')
        print(self.retrievedList)

    #Send the final stitched image to the display for set amount of time
    def displayMatrix(self):
        """Run the external ``./demo`` binary (via sudo) to scroll ``final.ppm``."""
        # Argument list instead of a shell string: displayTime and
        # speedDisplay come from runStockTicker()'s caller and must not be
        # interpreted by a shell.
        command = [
            "sudo", "./demo", "-D1", "final.ppm",
            "-t", str(self.displayTime),
            "-m", str(self.speedDisplay),
            "--led-gpio-mapping=adafruit-hat",
            "--led-rows=64",
            "--led-cols=64",
            "--led-slowdown-gpio=4",
        ]
        subprocess.run(command, check=False)

    #Retrieve symbols from the CSV file
    def GetSymbols(self):
        """Load ``csv/tickers.csv`` into ``self.symbols``.

        Each non-empty CSV row becomes a one-element list holding its first
        11 columns joined with commas.
        """
        self.symbols = []
        with open('csv/tickers.csv', 'r', newline='') as handle:
            for row in csv.reader(handle):
                if not row:
                    # Skip blank lines.
                    continue
                self.symbols.append([','.join(row[:11])])
        print(self.symbols)

    #Main run definition called by server
    def runStockTicker(self, RUNTIME, DELAY, SPEEDTIME):
        """Run the fetch/display loop until stopStockTicker() is called.

        Args:
            RUNTIME: stored as ``displayTime`` (the ``-t`` value for ./demo).
            DELAY: subtracted from RUNTIME to get the wait before the next
                fetch, which then overlaps the end of the current display.
            SPEEDTIME: stored as ``speedDisplay`` (the ``-m`` value for ./demo).
        """
        batch = self._ROWS_PER_REFRESH
        # Mark as running before the first fetch so a stop request issued
        # during that fetch is not overwritten.
        self.running = True
        self.parseCSV = 0
        self.Delay = DELAY
        self.displayTime = RUNTIME
        self.speedDisplay = SPEEDTIME
        self.GetSymbols()
        self.symbolLength = len(self.symbols) - batch
        self.GetfullStockImage()
        self.keySwapper = (self.keySwapper + 1) % len(self.ApiKeys)
        self.parseCSV += batch
        while self.running:
            if self.parseCSV > self.symbolLength:
                # Not enough rows left for a full batch: start over.
                self.parseCSV = 0
            th = threading.Thread(target=self.displayMatrix)
            th.start()
            # Prepare the next image while the current one is on screen.
            time.sleep(max(0, int(self.displayTime) - int(self.Delay)))
            self.GetfullStockImage()
            self.keySwapper = (self.keySwapper + 1) % len(self.ApiKeys)
            th.join()
            self.parseCSV += batch

    #Change running to false stopping refresh at next checkpoint
    def stopStockTicker(self):
        """Ask runStockTicker() to stop at its next loop check."""
        self.keySwapper = 0
        self.running = False
        print('MATRIX DISPLAY STOP CALLED')