# Copyright (C) 2020 Daniel Richardson richardson.daniel@hotmail.co.uk
#
# This file is part of stockTicker project for justinodunn.
#
# stockTicker can not be copied and/or distributed without the express
# permission of Daniel Richardson

import sys, select
import os
import threading
from PIL import Image, ImageDraw, ImageFont
import time
import csv
import requests
import pexpect
from rgbmatrix import RGBMatrix, RGBMatrixOptions
from rgbmatrix.graphics import *
from multiprocessing import Process


def getInput(Block=False):
    '''
    Read a single character from stdin.

    When Block is False, stdin is polled with a zero timeout and '' is
    returned if nothing is ready to be read. When Block is True the read
    is issued unconditionally.
    '''
    if Block or select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []):
        msg = sys.stdin.read(1)
    else:
        msg = ''
    return msg


class StockTicker():
    '''
    Renders stock/crypto prices, news headlines, user text, images and GIFs
    on an RGB LED matrix, controlled by single-character commands read from
    stdin (see process_msg).
    '''

    # Characters accepted as brightness commands. Kept as a tuple of strings
    # (not one string) so that the empty "no input" message never matches.
    _BRIGHTNESS_KEYS = ('0', '1', '2', '3', '4', '5', '6', '7', '8', '9')

    def __init__(self):
        # Define global resources
        self.symbols = []
        self.stock_info = {}
        self.coins = []
        self.coin_info = {}
        self.greenORred = (255, 255, 255)
        # Zero-width separator placed before every stock frame.
        self.blank = Image.new('RGB', (0, 32))
        self.running = True
        self.brightness = 1.0
        self.delay = 0.02

        # Configuration for the matrix: two chained 64x32 panels.
        options = RGBMatrixOptions()
        options.rows = 32
        options.cols = 64
        options.chain_length = 2
        options.parallel = 1
        options.hardware_mapping = 'adafruit-hat'  # If you have an Adafruit HAT: 'adafruit-hat'
        options.gpio_slowdown = 3
        self.matrix = RGBMatrix(options = options)

    def openImage(self, image_file):
        '''
        Open image_file and return it converted to RGB.

        The source file is closed before returning; convert() yields an
        independent in-memory image.
        '''
        with Image.open(image_file) as image:
            return image.convert('RGB')

    def setImage(self, image, offset_x = 0, offset_y = 0, unsafe=False):
        '''
        Draw an RGB image on the matrix with its top-left corner at
        (offset_x, offset_y). Parts that fall outside the matrix are skipped.

        Raises Exception when the image is not in RGB mode.
        '''
        if (image.mode != "RGB"):
            raise Exception(
                "Currently, only RGB mode is supported for SetImage(). "
                "Please create images with mode 'RGB' or convert first with "
                "image = image.convert('RGB'). "
                "Pull requests to support more modes natively are also welcome :)")

        img_width, img_height = image.size

        if unsafe:
            # In unsafe mode we directly access the underlying PIL image array
            # in cython, which is considered unsafe pointer access,
            # however it's super fast and seems to work fine
            # https://groups.google.com/forum/#!topic/cython-users/Dc1ft5W6KM4
            print('args', img_width, img_height, offset_x, offset_y, image)
            self.matrix.SetPixelsPillow(offset_x, offset_y, img_width, img_height, image)
            return

        # First implementation of a SetImage(). OPTIMIZE_ME: A more native
        # implementation that directly reads the buffer and calls the underlying
        # C functions can certainly be faster.
        pixels = image.load()
        brightness = self.brightness
        x_range = range(max(0, -offset_x), min(img_width, self.matrix.width - offset_x))
        y_range = range(max(0, -offset_y), min(img_height, self.matrix.height - offset_y))
        for x in x_range:
            for y in y_range:
                (r, g, b) = pixels[x, y]
                # Scaled channels are floats; truncate explicitly so the
                # binding always receives integers.
                self.matrix.SetPixel(x + offset_x, y + offset_y,
                                     int(r * brightness),
                                     int(g * brightness),
                                     int(b * brightness))

    def _pollInput(self):
        '''
        Handle at most one pending stdin command.

        Returns True when the command was 'K' (kill); the matrix has then
        already been blanked. Any other command is forwarded to process_msg.
        A KeyboardInterrupt raised while handling is swallowed, as before.
        '''
        try:
            msg = getInput()
            if msg == 'K':
                self.resetMatrix()
                return True
            self.process_msg(msg)
        except KeyboardInterrupt:
            sys.stdout.flush()
        return False

    def scrollImage(self, image, offset_x = 0, offset_y = 0):
        '''
        Scroll image leftwards one pixel per step until it has left the
        display. Returns True if interrupted by the kill command, else False.
        '''
        img_width, img_height = image.size

        while offset_x > -img_width:
            offset_x -= 1
            self.setImage(image, offset_x = offset_x, offset_y = offset_y)
            if self._pollInput():
                return True
            time.sleep(self.delay)
        return False

    def scrollImageStacked(self, image, offset_x = 0, offset_y = 0):
        '''
        Scroll image across two stacked rows: one copy is drawn 128 pixels to
        the right at offset_y, a second copy at offset_x, 16 pixels lower.
        Returns True if interrupted by the kill command, else False.
        '''
        img_width, img_height = image.size

        while offset_x > -img_width - 128:
            offset_x -= 1
            self.setImage(image, offset_x = offset_x + 128, offset_y = offset_y)
            self.setImage(image, offset_x = offset_x, offset_y = offset_y + 16)
            if self._pollInput():
                return True
            time.sleep(self.delay)
        return False

    def scrollImageTransition(self, image_files, offset_x = 0, offset_y = 0, stocks = True):
        '''
        Scroll two image files in alternation with a seamless transition: as
        the current image runs out, the next one is drawn directly behind it.

        When stocks is True, every pass also starts a child process that
        regenerates the stock image (getFullStockImage) and waits for it once
        the pass is over. Loops until the kill command is received.
        '''
        current_img = 1

        while True:
            update_process = None
            if stocks:
                update_process = Process(target = self.getFullStockImage, args = (current_img,))
                update_process.start()

            if current_img == 1:
                leading, trailing = image_files[0], image_files[1]
            else:
                leading, trailing = image_files[1], image_files[0]
            image1 = self.openImage(leading)
            image2 = self.openImage(trailing)

            killed = False
            try:
                img_width, img_height = image1.size
                while offset_x > -img_width:
                    offset_x -= 1
                    self.setImage(image1, offset_x = offset_x, offset_y = offset_y)
                    if offset_x + img_width < self.matrix.width:
                        # the image is ending: start showing the next one
                        self.setImage(image2, offset_x = offset_x + img_width, offset_y = offset_y)
                    time.sleep(self.delay)
                    if self._pollInput():
                        killed = True
                        break
            finally:
                image1.close()
                image2.close()

            if killed:
                # As before, the updater is not joined here so that the kill
                # command takes effect without waiting for it.
                break

            if update_process is not None:
                update_process.join()

            current_img = 2 if current_img == 1 else 1
            offset_x = 0

    def textImage(self, text, font, r = 255, g = 255, b = 255):
        '''
        creates and returns an RGB image containing the text in the supplied
        font and colour, with 50 extra pixels of width after the text
        '''
        width, height = self.get_text_dimensions(text, font)
        print(text)
        print('dims:', width, height)
        img = Image.new('RGB', (width + 50, height))
        d = ImageDraw.Draw(img)
        d.text((4, 0), text, fill=(r, g, b), font=font)
        return img

    def displayUserText(self):
        '''
        displays the text entered in the webpage by the user.
        Text and colour are read from the first row of csv/scroll_text.csv.
        '''
        with open('csv/scroll_text.csv', 'r') as f:
            text, r, g, b = next(csv.reader(f))

        font = ImageFont.load("./fonts/texgyre-27.pil")
        img = self.textImage(text, font, int(r), int(g), int(b))
        img.save('scroll_text.ppm')

        self.scrollImageTransition(['scroll_text.ppm', 'scroll_text.ppm'],
                                   offset_x = 128, offset_y = 0, stocks = False)

    def displayNews(self):
        '''
        Scroll the headlines from csv/news.csv (first row is skipped as a
        header) in stacked mode, re-reading the file after every full pass.
        Returns on the kill command, or when the file holds no headlines.
        '''
        font = ImageFont.load("./fonts/8x13.pil")

        while True:
            with open('csv/news.csv', 'r') as f:
                reader = csv.reader(f)
                next(reader, None)  # header row
                headlines = [','.join(row) for row in reader]

            if not headlines:
                # Nothing to show; previously this path hit an unbound
                # variable. Return rather than spin on an empty file.
                return

            for headline in headlines:
                img = self.textImage(headline, font)
                if self.scrollImageStacked(img, offset_x = 128, offset_y = 0):
                    return

    def displayGIF(self, gif_file):
        '''
        Loop over the frames of gif_file, showing each for half a second,
        until the kill command is received. Any exception is written to
        log.txt instead of being propagated.
        '''
        with open('log.txt', "w") as log:
            try:
                with Image.open(gif_file) as im:
                    i = 0
                    while True:
                        print(im.tell())
                        try:
                            im.seek(i)
                        except EOFError:
                            # past the last frame: restart from the first
                            print('finished')
                            i = 0
                            im.seek(i)

                        self.setImage(im.convert('RGB'))
                        time.sleep(0.5)
                        i += 1
                        if self._pollInput():
                            break
            except Exception as e:
                log.write(str(e))

    # Using change between min and day price give appropriate arrow
    # and set the overall change colour
    def getArrow(self, CHANGE):
        '''
        Return (arrow image, magnitude of CHANGE) and set self.greenORred:
        up arrow and green for a positive change, down arrow and red
        otherwise.
        '''
        logos_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logos')

        if CHANGE > 0:
            self.greenORred = (0, 255, 0)
            return Image.open(os.path.join(logos_path, 'up.png')), CHANGE

        self.greenORred = (255, 0, 0)
        # abs() rather than * -1 so that a change of 0.0 does not become -0.0
        # (which would be rendered as "-0.00").
        return Image.open(os.path.join(logos_path, 'down.png')), abs(CHANGE)

    def get_text_dimensions(self, text_string, font):
        '''
        Return (width, height) of the pixels lit when text_string is rendered
        in font; (0, 0) when nothing is drawn (e.g. empty text).
        '''
        canvas = Image.new('RGB', (10000, 100))
        ImageDraw.Draw(canvas).text((10, 10), text_string, font=font, fill=(255, 255, 255))
        bbox = canvas.getbbox()
        if bbox is None:
            # getbbox() reports None for an all-black canvas
            return 0, 0
        left, top, right, bottom = bbox
        return right - left, bottom - top

    # Draw Ticker, current and change onto one image
    def textToImage(self, TICKER, CURRENT, CHANGE, ARROW):
        '''
        Build a 32 pixel high frame: TICKER on the top row; CURRENT, the
        ARROW image and CHANGE on the bottom row, coloured with
        self.greenORred.

        The frame is 150 pixels wide, or wider when the content needs it.
        '''
        font = ImageFont.load("./fonts/10x20.pil")

        ticker_width, _ = self.get_text_dimensions(TICKER, font)
        current_width, _ = self.get_text_dimensions(CURRENT, font)
        change_width, _ = self.get_text_dimensions(CHANGE, font)

        arrow_x = current_width + 9
        change_x = current_width + 29

        # The original cropped to the content width but discarded the result,
        # so frames were always 150 px wide. That width is kept as the minimum
        # to leave the layout unchanged; the canvas only grows so that long
        # values are no longer clipped.
        width = max(150, 4 + ticker_width, change_x + change_width)

        img = Image.new('RGB', (width, 32))
        d = ImageDraw.Draw(img)
        d.text((4, 0), TICKER, fill=(255, 255, 255), font=font)
        d.text((4, 16), CURRENT, fill=self.greenORred, font=font)
        img.paste(ARROW, (arrow_x, 18))
        d.text((change_x, 16), CHANGE, fill=self.greenORred, font=font)
        return img

    # Stitch the logo & prices picture into one image
    def stitchImage(self, image_list):
        '''
        Return a new RGB image with the images of image_list pasted side by
        side, left to right. Raises ValueError for an empty list.
        '''
        total_width = sum(im.size[0] for im in image_list)
        max_height = max(im.size[1] for im in image_list)

        new_im = Image.new('RGB', (total_width, max_height))
        x_offset = 0
        for im in image_list:
            new_im.paste(im, (x_offset, 0))
            x_offset += im.size[0]
        return new_im

    def resetMatrix(self):
        '''Set every pixel of the matrix to black.'''
        for x in range(self.matrix.width):
            for y in range(self.matrix.height):
                self.matrix.SetPixel(x, y, 0, 0, 0)

    def _buildFrame(self, ticker, price, change):
        '''
        Build the frame for one instrument: its logo (logos/<ticker>.png next
        to this file), when available, followed by the price text.
        '''
        arrow, change = self.getArrow(change)
        midFrame = self.textToImage(ticker, '%.2f' % price, '%.2f' % change, arrow)

        logos_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logos')
        try:
            logo = Image.open(os.path.join(logos_path, ticker + '.png'))
            return self.stitchImage([logo, midFrame])
        except Exception:
            # Best effort: a missing or unusable logo must not stop the ticker.
            return midFrame

    # Connect all the pieces together creating 1 long final stock image
    def getFullStockImage(self, updated_img):
        '''
        Re-read the CSV files and render all stocks and coins into one long
        image, saved as final.ppm (updated_img == 1) or final1.ppm
        (updated_img == 2).

        Entries whose prices are missing or not numeric are skipped.
        '''
        image_list = []
        self.readCSV()

        for symbol in self.symbols:
            info = self.stock_info[symbol]
            try:
                current = float(info[0])
                opening = float(info[1])
            except (IndexError, ValueError):
                continue
            image_list.append(self.blank)
            image_list.append(self._buildFrame(symbol, current, current - opening))

        for coin in self.coins:
            info = self.coin_info[coin]
            try:
                current = float(info[0])
                day_change = float(info[1])
            except (IndexError, ValueError):
                continue
            image_list.append(self.blank)
            image_list.append(self._buildFrame(coin.upper(), current, day_change))

        finalDisplayImage = self.stitchImage(image_list)

        if updated_img == 1:
            finalDisplayImage.save('final.ppm')
        elif updated_img == 2:
            finalDisplayImage.save('final1.ppm')

    # Send the final stitched image to the display
    def displayStocks(self):
        '''Scroll final.ppm continuously until the kill command is received.'''
        self.scrollImageTransition(['final.ppm', 'final.ppm'], offset_x = 0, offset_y = 0)

    @staticmethod
    def _readPriceTable(path):
        '''
        Read a CSV file with a header row and rows of exactly three fields
        (name plus two values). Returns (names, table) where table maps each
        name to its two values, or to [] when the row does not have three
        fields. Blank rows are ignored.
        '''
        names = []
        table = {}
        with open(path, 'r') as f:
            reader = csv.reader(f)
            next(reader, None)  # header row
            for row in reader:
                if not row:
                    continue
                name = row[0]
                names.append(name)
                table[name] = [row[1], row[2]] if len(row) == 3 else []
        return names, table

    # Retrieve symbols and stock info from the csv file
    def readCSV(self):
        '''
        Load csv/tickers.csv into self.symbols / self.stock_info
        ([current_price, opening_price]) and csv/crypto.csv into
        self.coins / self.coin_info ([current_price, day_change]).
        '''
        self.symbols, self.stock_info = self._readPriceTable('csv/tickers.csv')
        self.coins, self.coin_info = self._readPriceTable('csv/crypto.csv')

    # Main run definition called by server
    def runStockTicker(self, runtime, delay, speedtime):
        # NOTE(review): getSymbols, GetfullStockImage, displayMatrix and
        # getfullStockImage are not defined in this class as shown here, so
        # this method looks like a leftover from an earlier version. Left
        # untouched; confirm whether it is still called before repairing or
        # removing it.
        self.getSymbols()
        self.GetfullStockImage()
        self.keySwapper += 1
        self.running = True
        while self.running:
            self.displayMatrix()
            time.sleep((int(runtime) - int(delay)))
            self.getfullStockImage()

    # Change running to false stopping refresh at next checkpoint
    def stopStockTicker(self):
        self.keySwapper = 0
        self.running = False
        print('MATRIX DISPLAY STOP CALLED')

    def process_msg(self, msg):
        '''
        Execute a single-character command:

        S stocks, N news, T user text, I image, G gif, K blank the display,
        s/m/f scroll speed (slow/medium/fast), 0-9 brightness.
        Anything else (including '') is ignored.
        '''
        if msg == 'S':  # stocks
            self.getFullStockImage(1)
            self.displayStocks()

        elif msg == 'N':  # news
            self.displayNews()

        # speed settings
        elif msg == 's':
            self.delay = 0.03
        elif msg == 'm':
            self.delay = 0.01
        elif msg == 'f':
            self.delay = 0.005

        elif msg in self._BRIGHTNESS_KEYS:  # brightness settings
            self.brightness = min(1.0, float(msg)/10 + 0.1)

        elif msg == 'T':  # text
            self.displayUserText()

        elif msg == 'I':  # image
            here = os.path.dirname(os.path.abspath(__file__))
            self.setImage(self.openImage(os.path.join(here, 'display_image')))

        elif msg == 'G':  # gif
            here = os.path.dirname(os.path.abspath(__file__))
            self.displayGIF(os.path.join(here, 'display_gif'))

        elif msg == 'K':  # kill
            self.resetMatrix()


if __name__ == '__main__':
    stock_ticker = StockTicker()

    # Poll stdin forever and dispatch every character as a command.
    while True:
        msg = getInput()
        stock_ticker.process_msg(msg)