# fintic-tracker/stockTicker.py (Python, 239 lines, 8.0 KiB)

# Copyright (C) 2020 Daniel Richardson richardson.daniel@hotmail.co.uk
#
# This file is part of stockTicker project for justinodunn.
#
# stockTicker can not be copied and/or distributed without the express
# permission of Daniel Richardson
from twelvedata import TDClient
import sys
import os
import threading
from PIL import Image, ImageDraw, ImageFont
import time
import csv
#Define global resources
# Module-level state shared by every function in this file.

# Rows loaded from csv/tickers.csv by GetSymbols(); each entry is a one-element
# list holding a comma-joined string of ticker symbols.
symbols = []
# Two sets of three API keys passed to TDClient. keySwapper picks the set and
# GetfullStockImage() uses one key per request within the set.
# NOTE(review): credentials are hard-coded in source control - consider loading
# them from the environment or an untracked config file.
ApiKeys = [["b1be6696d54342bdb5c8f59a052854fd","237d183faf984e7eba708a647c55153a","3012b3cb19784ed4be2b02e9907e53cb"],["f835e85df2a74df1904fe6dc53bcc17a","41ec268496744c34a96e3c408081300b","2d566fc71ade46d688713b13730c5219"]]
#ApiKeys = [["6172b84d1f464ad88b95ed52e4391bce","979f9b8c9ee748fbbb57cb490ccaaf42","593fa818a44144699b75433aafd92d15"]
# Rows appended by getStockPrices(): [symbol, 1min close, 1day close].
retrievedList = []
# RGB colour used by textToImage() for the price and change text; getArrow()
# switches it to green or red. Starts as white.
greenORred = (255, 255, 255)
# Images collected by GetfullStockImage() before they are stitched together.
ListStocks = []
# Image placed before each stock in the final strip. Loaded at import time, so
# importing this module requires logos/blank.png to exist.
blank = Image.open('logos/blank.png')
# Set to True by getStockPrices() when a fetch fails; reset at the start of
# every GetfullStockImage() call.
ApiCalledError = False
# Index of the ApiKeys set currently in use.
keySwapper = 0
# Value passed to the demo binary's -t option; also used by runStockTicker()
# to time the next refresh (presumably seconds - TODO confirm).
displayTime = 60
# Value passed to the demo binary's -m option.
speedDisplay = 25
# Loop flag for runStockTicker(); stopStockTicker() clears it.
running = True
# Subtracted from displayTime to decide how long runStockTicker() sleeps
# before fetching the next batch.
Delay = 20
# Offset into symbols of the first row fetched by GetfullStockImage().
parseCSV = 0
# len(symbols) - 3, i.e. the largest parseCSV value that still leaves three rows.
symbolLength = 0
#Get the logo from file that is the same as ticker name
def getLogo(Ticker):
    """Return the logo image for ``Ticker``, or the default logo as a fallback.

    The logo is read from ``logos/<Ticker>.png``. When that file is missing or
    cannot be opened, a message is printed and ``logos/default.png`` is
    returned instead.

    Args:
        Ticker: Ticker symbol string; also the logo file's base name.

    Returns:
        The opened PIL image.

    Raises:
        OSError: If the default logo cannot be opened either.
    """
    try:
        return Image.open('logos/' + Ticker + '.png')
    except (OSError, ValueError):
        # OSError covers missing/unreadable/unidentifiable files; ValueError
        # covers malformed paths. Anything else (including Ctrl-C) propagates
        # instead of being silently swallowed.
        print('Could not find logo for: ' + Ticker + ' using default')
        return Image.open('logos/default.png')
#Using change between min and day price give appropriate arrow
#and set the overall change colour
def getArrow(CHANGE):
    """Pick the arrow image and text colour for a price change.

    Sets the module-level ``greenORred`` colour as a side effect: green for a
    positive change, red otherwise (a change of exactly zero is treated as
    "down").

    Args:
        CHANGE: Numeric price change.

    Returns:
        A tuple ``(arrow_image, magnitude)`` where ``magnitude`` is the
        non-negative size of the change.
    """
    global greenORred
    if CHANGE > 0:
        Arrow = Image.open('logos/up.png')
        greenORred = (0, 255, 0)
        return Arrow, CHANGE
    Arrow = Image.open('logos/down.png')
    greenORred = (255, 0, 0)
    # abs() rather than "* -1": multiplying 0.0 by -1 yields -0.0, which the
    # caller would format as "-0.00".
    return Arrow, abs(CHANGE)
def get_text_dimensions(text_string, font):
    """Measure the rendered (ink) size of ``text_string`` in ``font``.

    The text is drawn in white on a black scratch canvas and the bounding box
    of the non-black pixels is measured. The canvas starts at 100x100 and is
    enlarged whenever the text reaches its right or bottom edge, so long
    strings are not silently clipped.

    Args:
        text_string: Text to measure.
        font: PIL font object used for drawing.

    Returns:
        ``(width, height)`` in pixels; ``(0, 0)`` when nothing is drawn
        (for example an empty or whitespace-only string).
    """
    canvas_width, canvas_height = 100, 100
    while True:
        canvas = Image.new('RGB', (canvas_width, canvas_height))
        draw = ImageDraw.Draw(canvas)
        draw.text((10, 10), text_string, font=font, fill=(255, 255, 255))
        bbox = canvas.getbbox()
        if bbox is None:
            # getbbox() returns None for an all-black image.
            return 0, 0
        left, top, right, bottom = bbox
        clipped_x = right >= canvas_width
        clipped_y = bottom >= canvas_height
        if not (clipped_x or clipped_y):
            return right - left, bottom - top
        # Ink touches an edge, so it may have been cut off: grow and redraw.
        if clipped_x:
            canvas_width *= 2
        if clipped_y:
            canvas_height *= 2
#Draw Ticker, current and change onto one image
def textToImage(TICKER, CURRENT, CHANGE, ARROW):
    """Render ticker, current price, arrow and change into one 32px-high image.

    The ticker is drawn in white on the top row; the current price, the arrow
    and the change are drawn on the second row using the module-level
    ``greenORred`` colour. The image is then cropped to the width actually
    used.

    Args:
        TICKER: Ticker symbol text.
        CURRENT: Current price, already formatted as text.
        CHANGE: Price change, already formatted as text.
        ARROW: PIL image pasted between the price and the change.

    Returns:
        A new RGB PIL image, 32 pixels high.
    """
    font = ImageFont.truetype("EncodeSans.ttf", 18)
    img = Image.new('RGB', (150, 32))
    d = ImageDraw.Draw(img)
    d.text((4, -4), TICKER, fill=(255, 255, 255), font=font)
    d.text((4, 13), CURRENT, fill=greenORred, font=font)
    text_width_current, _ = get_text_dimensions(CURRENT, font)
    img.paste(ARROW, ((text_width_current + 9), 18))
    change_x = text_width_current + 29
    d.text((change_x, 13), CHANGE, fill=greenORred, font=font)
    text_width_change, _ = get_text_dimensions(CHANGE, font)
    newWidth = change_x + text_width_change
    # Never crop into drawn pixels (e.g. a ticker wider than the price row).
    ink_box = img.getbbox()
    if ink_box is not None:
        newWidth = max(newWidth, ink_box[2])
    # Image.crop() returns a new image; the original code discarded it, so the
    # image was never actually cropped.
    return img.crop((0, 0, newWidth, 32))
#Stitch the logo & prices picture into one image
def stitchImage(IMAGES):
    """Join images left to right into a single new image.

    The result is as wide as all inputs combined and as tall as the tallest
    input; every image is pasted with its top edge at y=0.

    Args:
        IMAGES: Non-empty sequence of PIL images.

    Returns:
        A new RGB PIL image containing every input image side by side.

    Raises:
        ValueError: If ``IMAGES`` is empty.
    """
    widths, heights = zip(*(i.size for i in IMAGES))
    new_im = Image.new('RGB', (sum(widths), max(heights)))
    x_offset = 0
    # Paste every image: the canvas is sized for all of them, so pasting only
    # the first two left the remainder of the strip black.
    for im in IMAGES:
        new_im.paste(im, (x_offset, 0))
        x_offset += im.size[0]
    return new_im
#Display the image onto the matrix
def displayImage(IMAGE):
    """Placeholder display hook: prints a marker and reports success.

    Args:
        IMAGE: Ignored by this stub.

    Returns:
        Always ``0``.
    """
    status = 0
    print('k')
    return status
#Get current prices and day prices for the ticker then format the response into an array
def getStockPrices(APIKEY, SYMBOLS):
    """Fetch prices for a batch of symbols and append them to ``retrievedList``.

    For each symbol a row ``[symbol, minute_close, day_close]`` is appended to
    the module-level ``retrievedList``. ``minute_close`` is the ``close`` of
    the single 1min bar requested; ``day_close`` is the ``close`` of the second
    of the two 1day bars requested (presumably the previous trading day -
    confirm against the API's ordering).

    On any failure nothing is appended, the module-level ``ApiCalledError`` is
    set to True and a message is printed; no exception is raised.

    Args:
        APIKEY: API key passed to ``TDClient``.
        SYMBOLS: Comma-separated ticker symbols.
    """
    global ApiCalledError
    try:
        symbols_list = SYMBOLS.split(",")
        td = TDClient(apikey=APIKEY)
        tmin = td.time_series(symbol=SYMBOLS, interval="1min", outputsize='1').as_json()
        tday = td.time_series(symbol=SYMBOLS, interval="1day", outputsize='2').as_json()
        # Build the rows locally so a failure part-way through leaves
        # retrievedList untouched, and so the result does not depend on the
        # list being empty on entry.
        rows = []
        for symbol in symbols_list:
            currentMinPriceClose = tmin[symbol][0]['close']
            currentDayPriceClose = list(tday[symbol])[1]['close']
            rows.append([symbol, currentMinPriceClose, currentDayPriceClose])
    except Exception as err:
        # Best-effort fetch: the caller keeps showing the previous image.
        ApiCalledError = True
        print("Could not fetch data - API CALLS REACHED? - Will display old image")
        print('Reason: ' + repr(err))
    else:
        retrievedList.extend(rows)
#Connect all the pieces together, creating one long final stock image
def GetfullStockImage():
    """Fetch three symbol batches and render them into ``final.ppm``.

    Resets ``ApiCalledError``, ``ListStocks`` and ``retrievedList``, then makes
    up to three requests: request ``j`` uses key ``ApiKeys[keySwapper][j]`` and
    symbol row ``symbols[j + parseCSV]``. Every fetched stock is rendered
    (logo + text) and appended to ``ListStocks`` after a ``blank`` image. The
    first failed request stops the loop, and in that case ``final.ppm`` is not
    rewritten. Timing information and the API key in use are printed.
    """
    global ListStocks
    global retrievedList
    global ApiCalledError
    ApiCalledError = False
    ListStocks = []
    retrievedList = []
    start = time.time()
    print('elapsed time: ' + str((time.time()-start)))
    for j in range(3):
        api_key = ApiKeys[keySwapper][j]
        print('API KEY: ' + str(api_key))
        getStockPrices(ApiKeys[keySwapper][j], symbols[(j+parseCSV)][0])
        if ApiCalledError:
            # Give up on the remaining batches; the old image stays on disk.
            break
        print('elapsed time after api call: ' + str((time.time()-start)))
        for row in retrievedList:
            print('elapsed timethrough loop: ' + str((time.time()-start)))
            latest = float(row[1])
            reference = float(row[2])
            delta = latest - reference
            symbol = row[0]
            current_text = '%.2f' % latest
            logo = getLogo(row[0])
            arrow, delta = getArrow(delta)
            change_text = '%.2f' % delta
            text_frame = textToImage(symbol, current_text, change_text, arrow)
            stock_image = stitchImage([logo, text_frame])
            ListStocks.append(blank)
            ListStocks.append(stock_image)
        # Start the next batch from an empty list.
        retrievedList = []
    print('elapsed time loop ended: ' + str((time.time()-start)))
    if not ApiCalledError:
        stitchImage(ListStocks).save('final.ppm')
    print(retrievedList)
#Send the final stitched image to the display for set amount of time
def displayMatrix():
    """Run the external ``./demo`` binary under sudo to show ``final.ppm``.

    ``displayTime`` is passed as the ``-t`` option and ``speedDisplay`` as the
    ``-m`` option. The call blocks until the binary exits; its exit status is
    ignored. If the command cannot be started a message is printed instead of
    raising.
    """
    # Local import keeps this block self-contained.
    import subprocess

    # An argument list (no shell) means displayTime/speedDisplay - which come
    # from runStockTicker()'s caller - cannot inject extra shell commands into
    # a command that runs under sudo.
    # Earlier panel geometry was --led-rows=32 --led-cols=256.
    command = [
        "sudo", "./demo", "-D1", "final.ppm",
        "-t", str(displayTime),
        "-m", str(speedDisplay),
        "--led-gpio-mapping=adafruit-hat",
        "--led-rows=64",
        "--led-cols=64",
    ]
    try:
        subprocess.run(command)
    except OSError as err:
        # os.system() never raised when the command was missing; keep that
        # best-effort behaviour.
        print('Could not run matrix demo: ' + str(err))
#Retrieve symbols from the CSV file
def GetSymbols():
    """Load ticker batches from ``csv/tickers.csv`` into the global ``symbols``.

    Each non-empty CSV row becomes a one-element list holding the row's first
    eleven cells joined with commas (rows with fewer cells are joined as-is).
    Blank rows are skipped. The resulting list is printed.

    Raises:
        OSError: If the CSV file cannot be opened.
    """
    global symbols
    symbols = []
    # "with" closes the file handle; newline='' is what the csv module expects.
    with open('csv/tickers.csv', 'r', newline='') as csv_file:
        for row in csv.reader(csv_file):
            if not row:
                continue
            # Slicing instead of row[0]..row[10] avoids an IndexError on rows
            # that have fewer than eleven cells.
            symbols.append([','.join(row[:11])])
    print(symbols)
#Main run definition called by server
def runStockTicker(RUNTIME, DELAY, SPEEDTIME):
    """Run the fetch/display loop until ``stopStockTicker()`` clears ``running``.

    Stores the arguments in the module globals, loads the symbols, renders the
    first image, and then repeatedly: starts ``displayMatrix`` in a thread,
    sleeps ``RUNTIME - DELAY``, renders the next image while the display is
    still running, and waits for the display thread to finish. ``keySwapper``
    cycles through the API key sets and ``parseCSV`` advances three symbol
    rows at a time, wrapping back to 0 past ``symbolLength``.

    Args:
        RUNTIME: Stored in ``displayTime``; must be convertible with ``int()``.
        DELAY: Stored in ``Delay``; must be convertible with ``int()``.
        SPEEDTIME: Stored in ``speedDisplay``.
    """
    global keySwapper
    global running
    global displayTime
    global symbolLength
    global parseCSV
    global Delay
    global speedDisplay
    parseCSV = 0
    Delay = DELAY
    displayTime = RUNTIME
    speedDisplay = SPEEDTIME
    GetSymbols()
    symbolLength = (len(symbols) - 3)
    GetfullStockImage()
    keySwapper += 1
    running = True
    parseCSV += 3
    while running:
        if keySwapper > 1:
            # Ran past the last key set: wrap around.
            keySwapper = 0
            continue
        if parseCSV > symbolLength:
            # Fewer than three rows left: restart from the first row.
            parseCSV = 0
            continue
        th = threading.Thread(target=displayMatrix)
        th.start()
        # Clamp at 0: time.sleep() raises ValueError for a negative duration,
        # which would abort the loop with the display thread still running
        # whenever DELAY exceeds RUNTIME.
        time.sleep(max(0, int(displayTime) - int(Delay)))
        GetfullStockImage()
        keySwapper += 1
        th.join()
        parseCSV += 3
#Change running to false stopping refresh at next checkpoint
def stopStockTicker():
    """Ask the ticker loop to stop and reset the API key-set index.

    Sets the module globals ``running`` to False and ``keySwapper`` to 0, then
    prints a confirmation. The loop in runStockTicker() only notices the flag
    the next time it checks ``running``.
    """
    global keySwapper, running
    keySwapper, running = 0, False
    print('MATRIX DISPLAY STOP CALLED')