# Copyright (C) 2020 Daniel Richardson richardson.daniel@hotmail.co.uk # # This file is part of stockTicker project for justinodunn. # # stockTicker can not be copied and/or distributed without the express # permission of Daniel Richardson from twelvedata import TDClient import sys import os import threading from PIL import Image, ImageDraw, ImageFont import time import csv class StockTicker(): def __init__(self): #Define global resources self.symbols = [] self.ApiKeys = [["b1be6696d54342bdb5c8f59a052854fd","237d183faf984e7eba708a647c55153a","3012b3cb19784ed4be2b02e9907e53cb"],["f835e85df2a74df1904fe6dc53bcc17a","41ec268496744c34a96e3c408081300b","2d566fc71ade46d688713b13730c5219"]] #self.ApiKeys = [["6172b84d1f464ad88b95ed52e4391bce","979f9b8c9ee748fbbb57cb490ccaaf42","593fa818a44144699b75433aafd92d15"] self.retrievedList = [] self.greenORred = (255, 255, 255) self.ListStocks = [] self.blank = Image.open('logos/blank.png') self.ApiCalledError = False self.keySwapper = 0 self.displayTime = 60 self.speedDisplay = 25 self.running = True self.Delay = 20 self.parseCSV = 0 self.symbolLength = 0 #Get the logo from file that is the same as ticker name def getLogo(self, Ticker): try: Logo = Image.open('logos/' + Ticker + '.png') except: print('Could not find logo for: ' + Ticker + ' using default') Logo = Image.open('logos/default.png') return Logo #Using change between min and day price give appropriate arrow #and set the overall change colour def getArrow(self, CHANGE): self.greenORred if(CHANGE>0): Arrow = Image.open('logos/up.png') self.greenORred = (0, 255, 0) return Arrow, CHANGE Arrow = Image.open('logos/down.png') self.greenORred = (255, 0, 0) CHANGE = (CHANGE * -1) return Arrow, CHANGE def get_text_dimensions(self, text_string, font): canvas = Image.new('RGB', (100,100)) draw = ImageDraw.Draw(canvas) monospace = font text = text_string white = (255,255,255) draw.text((10, 10), text, font=monospace, fill=white) bbox = canvas.getbbox() width = bbox[2]-bbox[0] height = bbox[3]-bbox[1] return width,height #Draw Ticker, current and change onto one image def textToImage(self, TICKER, CURRENT, CHANGE, ARROW): font = ImageFont.truetype("EncodeSans.ttf", 18) img = Image.new('RGB', (150, 32)) d = ImageDraw.Draw(img) d.text((4, -4), TICKER, fill=(255, 255, 255), font=font) d.text((4, 13), CURRENT, fill=self.greenORred, font=font) text_width_current, text_height = self.get_text_dimensions(CURRENT, font) img.paste(ARROW, ((text_width_current + 9),18)) d.text(((text_width_current+29), 13), CHANGE, fill=self.greenORred, font=font) text_width_change, text_height = self.get_text_dimensions(CHANGE, font) newWidth = (text_width_current+29) + (text_width_change) img.crop((0,0,newWidth,32)) return img #Stitch the logo & prices picture into one image def stitchImage(self, IMAGES): widths, heights = zip(*(i.size for i in IMAGES)) total_width = sum(widths) max_height = max(heights) new_im = Image.new('RGB', (total_width, max_height)) x_offset = 0 for im in IMAGES[0:2]: new_im.paste(im, (x_offset,0)) x_offset += im.size[0] return new_im #Display the image onto the matrix def displayImage(self, IMAGE): print('k') return 0 #Get current prices and day prices for the ticker then format the response into an array def getStockPrices(self, APIKEY, SYMBOLS): try: symbols_list = SYMBOLS.split(",") td = TDClient(apikey=APIKEY) tmin = td.time_series(symbol=SYMBOLS, interval="1min", outputsize='1').as_json() tday = td.time_series(symbol=SYMBOLS, interval="1day", outputsize='2').as_json() for i in range(len(symbols_list)): currentMinPriceList = tmin[symbols_list[i]] currentDayPriceList = list(tday[symbols_list[i]]) del currentDayPriceList[0] currentMinPriceClose = tmin[symbols_list[i]][0]['close'] currentDayPriceClose = currentDayPriceList[0]['close'] self.retrievedList.append([]) self.retrievedList[i].append(symbols_list[i]) self.retrievedList[i].append(currentMinPriceClose) self.retrievedList[i].append(currentDayPriceClose) except: self.ApiCalledError = True print("Could not fetch data - API CALLS REACHED? - Will display old image") #Connect all the pieces togeather creating 1 long final stock image def GetfullStockImage(self): self.ApiCalledError = False self.ListStocks = [] self.retrievedList = [] start = time.time() print('elapsed time: ' + str((time.time()-start))) j=0 while (j<3): print('API KEY: ' + str(self.ApiKeys[self.keySwapper][j])) self.getStockPrices(self.ApiKeys[self.keySwapper][j], self.symbols[(j+self.parseCSV)][0]) if (self.ApiCalledError == False): j+=1 print('elapsed time after api call: ' + str((time.time()-start))) for i in range(len(self.retrievedList)): print('elapsed timethrough loop: ' + str((time.time()-start))) Change = float(self.retrievedList[i][1])-float(self.retrievedList[i][2]) #TEXT Ticker = self.retrievedList[i][0] #TEXT Current = '%.2f' % float(self.retrievedList[i][1]) #TEXT Logo = self.getLogo(self.retrievedList[i][0]) Arrow, Change = self.getArrow(Change) Change = '%.2f' % Change MidFrame = self.textToImage(Ticker, Current, Change, Arrow) #IMAGE THE TEXT StitchedStock = self.stitchImage([Logo,MidFrame]) self.ListStocks.append(self.blank) self.ListStocks.append(StitchedStock) self.retrievedList = [] else: break print('elapsed time loop ended: ' + str((time.time()-start))) if (self.ApiCalledError == False): FinalDisplayImage = self.stitchImage(self.ListStocks) FinalDisplayImage.save('final.ppm') print(self.retrievedList) #Send the final stitched image to the display for set amount of time def displayMatrix(self): #os.system("sudo ./demo -D1 final.ppm -t " + str(displayTime) +" -m "+ str(speedDisplay) +" --led-gpio-mapping=adafruit-hat --led-rows=32 --led-cols=256") os.system("sudo ./demo -D1 final.ppm -t " + str(self.displayTime) +" -m "+ str(self.speedDisplay) +" --led-gpio-mapping=adafruit-hat --led-rows=64 --led-cols=64 --led-slowdown-gpio=4 ") #Retrieve symbols from the CSV file def GetSymbols(self): self.symbols = [] CSV = csv.reader(open('csv/tickers.csv', 'r')) z = 0 for row in CSV: self.symbols.append([]) endLine = str(row[0]+','+row[1]+','+row[2]+','+row[3]+','+row[4]+','+row[5]+','+row[6]+','+row[7]+','+row[8]+','+row[9]+','+row[10]) self.symbols[z].append(endLine) z+=1 print(self.symbols) #Main run definition called by server def runStockTicker(self, RUNTIME, DELAY, SPEEDTIME): self.parseCSV = 0 self.Delay = DELAY self.displayTime = RUNTIME self.speedDisplay = SPEEDTIME self.GetSymbols() self.symbolLength = (len(self.symbols) - 3) self.GetfullStockImage() self.keySwapper += 1 self.running = True self.parseCSV += 3 while (True): if (self.running == True): if (self.keySwapper<=1): if(self.parseCSV <= self.symbolLength): th = threading.Thread(target=self.displayMatrix) th.start() #displayMatrix() time.sleep((int(self.displayTime) - int(self.Delay))) self.GetfullStockImage() self.keySwapper += 1 th.join() self.parseCSV += 3 else: self.parseCSV = 0 else: self.keySwapper = 0 else: break; #Change running to false stopping refresh at next checkpoint def stopStockTicker(self): self.keySwapper = 0 self.running = False print('MATRIX DISPLAY STOP CALLED')