# fintic-tracker/stockTicker.py
#
# 574 lines
# 19 KiB
# Python

# Copyright (C) 2020 Daniel Richardson richardson.daniel@hotmail.co.uk
#
# This file is part of stockTicker project for justinodunn.
#
# stockTicker can not be copied and/or distributed without the express
# permission of Daniel Richardson
import sys, select
import os
import threading
from PIL import Image, ImageDraw, ImageFont
import time
import csv
import requests
import pexpect
from rgbmatrix import RGBMatrix, RGBMatrixOptions
from rgbmatrix.graphics import *
from multiprocessing import Process
def getInput(Block=False):
    """Read a single character command from standard input.

    When ``Block`` is truthy the call waits in ``sys.stdin.read(1)`` until a
    character (or end-of-file) is available.  Otherwise stdin is polled with
    a zero timeout and an empty string is returned if nothing is pending.
    """
    if not Block:
        # Zero-timeout poll: only read when stdin already has data waiting.
        ready, _, _ = select.select([sys.stdin], [], [], 0)
        if not ready:
            return ''
    return sys.stdin.read(1)
class StockTicker():
    """Scrolling stock / crypto / news / text display for an RGB LED matrix.

    Quote data is read from ``csv/tickers.csv`` and ``csv/crypto.csv``,
    rendered into one long image saved as ``final.ppm`` / ``final1.ppm`` and
    scrolled across the matrix.  While scrolling, single-character commands
    are polled from stdin (via the module-level ``getInput``) and dispatched
    through ``process_msg``; ``'K'`` stops the current display.
    """

    def __init__(self):
        """Initialise display state and configure the LED matrix."""
        # Quote data, filled in by readCSV().
        self.symbols = []
        self.stock_info = {}
        self.coins = []
        self.coin_info = {}
        # Colour used for the price/change text; updated by getArrow().
        self.greenORred = (255, 255, 255)
        # Zero-width spacer placed before every entry of the stitched image.
        self.blank = Image.new('RGB', (0, 32))
        self.running = True
        # Touched by runStockTicker()/stopStockTicker(); initialised here so
        # it exists before either of them is called.
        self.keySwapper = 0
        self.brightness = 1.0
        self.delay = 0.02

        # Configuration for the matrix
        options = RGBMatrixOptions()
        options.rows = 32
        options.cols = 64
        options.chain_length = 2
        options.parallel = 1
        options.hardware_mapping = 'adafruit-hat'  # If you have an Adafruit HAT: 'adafruit-hat'
        options.gpio_slowdown = 3
        self.matrix = RGBMatrix(options=options)

    def _logosPath(self):
        """Return the ``logos`` directory located next to this source file."""
        return os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logos')

    def openImage(self, image_file):
        """Open ``image_file`` and return it as an in-memory RGB image."""
        # convert() loads the pixel data into a new image, so the source
        # file can be closed as soon as it returns.
        with Image.open(image_file) as image:
            return image.convert('RGB')

    def setImage(self, image, offset_x=0, offset_y=0, unsafe=False):
        """Draw an RGB ``image`` on the matrix at the given offset.

        Only the part of the image that falls inside the matrix is drawn.
        In the default (safe) mode each channel is scaled by
        ``self.brightness``; ``unsafe=True`` hands the whole image to
        ``SetPixelsPillow`` without brightness scaling.

        Raises:
            ValueError: if ``image`` is not in mode ``'RGB'``.
        """
        if image.mode != "RGB":
            raise ValueError("Currently, only RGB mode is supported for SetImage(). Please create images with mode 'RGB' or convert first with image = image.convert('RGB'). Pull requests to support more modes natively are also welcome :)")

        img_width, img_height = image.size
        if unsafe:
            # In unsafe mode the underlying PIL image array is accessed
            # directly in cython, which is considered unsafe pointer access,
            # however it's super fast and seems to work fine
            # https://groups.google.com/forum/#!topic/cython-users/Dc1ft5W6KM4
            print('args', img_width, img_height, offset_x, offset_y, image)
            self.matrix.SetPixelsPillow(offset_x, offset_y, img_width, img_height, image)
            return

        # First implementation of a SetImage(). OPTIMIZE_ME: A more native
        # implementation that directly reads the buffer and calls the
        # underlying C functions can certainly be faster.
        pixels = image.load()
        matrix = self.matrix
        brightness = self.brightness
        x_range = range(max(0, -offset_x), min(img_width, matrix.width - offset_x))
        y_range = range(max(0, -offset_y), min(img_height, matrix.height - offset_y))
        for x in x_range:
            for y in y_range:
                r, g, b = pixels[x, y]
                # Scaling by a float brightness yields floats; truncate so
                # SetPixel always receives integer channel values.
                matrix.SetPixel(x + offset_x, y + offset_y,
                                int(r * brightness), int(g * brightness), int(b * brightness))

    def scrollImage(self, image_file, offset_x=0, offset_y=0):
        """Scroll ``image_file`` once from ``offset_x`` until it leaves the left edge.

        Returns:
            True if a ``'K'`` (kill) command arrived while scrolling,
            False if the image scrolled all the way off.
        """
        image = self.openImage(image_file)
        try:
            img_width, _ = image.size
            while offset_x > -img_width:
                offset_x -= 1
                self.setImage(image, offset_x=offset_x, offset_y=offset_y)
                try:
                    msg = getInput()
                    if msg == 'K':
                        self.resetMatrix()
                        return True
                    self.process_msg(msg)
                except KeyboardInterrupt:
                    sys.stdout.flush()
                time.sleep(self.delay)
            return False
        finally:
            image.close()

    def scrollImageTransition(self, image_files, offset_x=0, offset_y=0, stocks=True):
        """Scroll two images in alternation with a seamless hand-over.

        ``image_files`` holds two paths.  On each pass one image is scrolled
        and, once its tail is on screen, the other is drawn directly behind
        it; then the roles swap.  When ``stocks`` is true a background
        process regenerates the stock image for the current slot during the
        pass.  Runs until a ``'K'`` command is received.
        """
        current_img = 1
        kill = False
        while not kill:
            update_process = None
            if stocks:
                update_process = Process(target=self.getFullStockImage, args=(current_img,))
                update_process.start()
            try:
                if current_img == 1:
                    first_file, second_file = image_files[0], image_files[1]
                else:
                    first_file, second_file = image_files[1], image_files[0]
                image1 = self.openImage(first_file)
                image2 = self.openImage(second_file)
                try:
                    img_width, _ = image1.size
                    while offset_x > -img_width:
                        offset_x -= 1
                        self.setImage(image1, offset_x=offset_x, offset_y=offset_y)
                        if offset_x + img_width < self.matrix.width:  # if the image is ending
                            self.setImage(image2, offset_x=offset_x + img_width, offset_y=offset_y)
                        time.sleep(self.delay)
                        try:
                            msg = getInput()
                            if msg == 'K':
                                self.resetMatrix()
                                kill = True
                                break
                            self.process_msg(msg)
                        except KeyboardInterrupt:
                            sys.stdout.flush()
                finally:
                    image1.close()
                    image2.close()
            finally:
                # Always reap the updater, including when the display was
                # killed, so no child process is left behind.
                if update_process is not None:
                    update_process.join()
            current_img = 2 if current_img == 1 else 1
            offset_x = 0

    def _renderScrollText(self, text, font, fill):
        """Render ``text`` in colour ``fill`` and save it as ``scroll_text.ppm``."""
        width, height = self.get_text_dimensions(text, font)
        print(text)
        print('dims:', width, height)
        # 50px of black after the text separates consecutive passes.
        img = Image.new('RGB', (width + 50, 32))
        ImageDraw.Draw(img).text((4, 0), text, fill=fill, font=font)
        img.save('scroll_text.ppm')

    def displayTextRepeating(self):
        """Scroll the text from ``csv/scroll_text.csv`` repeatedly until killed.

        The first row of the file must be ``text, r, g, b``.
        """
        with open('csv/scroll_text.csv', 'r') as f:
            first_row = next(csv.reader(f), None)
        if first_row is None:
            print('csv/scroll_text.csv is empty; nothing to display')
            return
        text, r, g, b = first_row
        font = ImageFont.load("./fonts/texgyre-27.pil")
        self._renderScrollText(text, font, (int(r), int(g), int(b)))
        self.scrollImageTransition(['scroll_text.ppm', 'scroll_text.ppm'],
                                   offset_x=128, offset_y=0, stocks=False)

    def displayText(self, text, font):
        """Scroll ``text`` once in white; return True if killed while scrolling."""
        self._renderScrollText(text, font, (255, 255, 255))
        return self.scrollImage('scroll_text.ppm', offset_x=128, offset_y=0)

    def displayNews(self):
        """Scroll the headlines from ``csv/news.csv`` in a loop until killed.

        The file is re-read after every full pass so updated headlines are
        picked up.  The first row is treated as a header and skipped.
        """
        font = ImageFont.load("./fonts/8x13.pil")
        while True:
            with open('csv/news.csv', 'r') as f:
                reader = csv.reader(f)
                next(reader, None)
                headlines = [','.join(row) for row in reader]
            if not headlines:
                # Nothing to scroll means no chance to poll for 'K'; bail out
                # instead of spinning on the file forever.
                print('csv/news.csv contains no headlines')
                return
            for headline in headlines:
                if self.displayText(headline, font):
                    return

    def displayGIF(self, gif_file):
        """Play ``gif_file`` frame by frame (0.5s per frame) until killed.

        Any error raised while playing is written to ``log.txt``.
        """
        with open('log.txt', "w") as log:
            try:
                with Image.open(gif_file) as im:
                    frame = 0
                    while True:
                        print(im.tell())
                        try:
                            im.seek(frame)
                        except EOFError:
                            # Ran past the last frame: restart the animation.
                            print('finished')
                            frame = 0
                            im.seek(frame)
                        self.setImage(im.convert('RGB'))
                        time.sleep(0.5)
                        frame += 1
                        try:
                            msg = getInput()
                            if msg == 'K':
                                self.resetMatrix()
                                break
                            self.process_msg(msg)
                        except KeyboardInterrupt:
                            sys.stdout.flush()
            except Exception as e:
                log.write(str(e))

    # Using change between min and day price give appropriate arrow
    # and set the overall change colour
    def getArrow(self, CHANGE):
        """Pick the arrow image and text colour for a price change.

        Sets ``self.greenORred`` to green for a positive change and red
        otherwise.

        Returns:
            ``(arrow_image, magnitude)`` where ``magnitude`` is the absolute
            value of ``CHANGE``.
        """
        if CHANGE > 0:
            self.greenORred = (0, 255, 0)
            arrow_file = 'up.png'
        else:
            self.greenORred = (255, 0, 0)
            arrow_file = 'down.png'
        arrow = Image.open(os.path.join(self._logosPath(), arrow_file))
        # abs() rather than "* -1" so a zero change is not formatted "-0.00".
        return arrow, abs(CHANGE)

    def get_text_dimensions(self, text_string, font):
        """Return ``(width, height)`` of the inked area of ``text_string``.

        The text is drawn on a scratch canvas and its bounding box measured.
        Text that draws nothing (empty or blank) measures ``(0, 0)``.
        """
        canvas = Image.new('RGB', (10000, 100))
        ImageDraw.Draw(canvas).text((10, 10), text_string, font=font, fill=(255, 255, 255))
        bbox = canvas.getbbox()
        if bbox is None:
            return 0, 0
        return bbox[2] - bbox[0], bbox[3] - bbox[1]

    # Draw Ticker, current and change onto one image
    def textToImage(self, TICKER, CURRENT, CHANGE, ARROW):
        """Render one quote: ticker on top, price / arrow / change below.

        The image is 32px high and at least 150px wide, and grows when the
        text needs more room.  The original fixed 150px canvas cut off long
        quotes, and its ``crop`` call had no effect because the result was
        discarded; 150px is kept as the minimum so the spacing of ordinary
        quotes is unchanged.
        """
        font = ImageFont.load("./fonts/10x20.pil")
        text_width_ticker, _ = self.get_text_dimensions(TICKER, font)
        text_width_current, _ = self.get_text_dimensions(CURRENT, font)
        text_width_change, _ = self.get_text_dimensions(CHANGE, font)

        arrow_x = text_width_current + 9
        change_x = text_width_current + 29
        # 4px margin on the right mirrors the 4px left margin of the text.
        needed_width = max(4 + text_width_ticker, change_x + text_width_change) + 4
        img = Image.new('RGB', (max(150, needed_width), 32))

        d = ImageDraw.Draw(img)
        d.text((4, 0), TICKER, fill=(255, 255, 255), font=font)
        d.text((4, 16), CURRENT, fill=self.greenORred, font=font)
        img.paste(ARROW, (arrow_x, 18))
        d.text((change_x, 16), CHANGE, fill=self.greenORred, font=font)
        return img

    # Stitch the logo & prices picture into one image
    def stitchImage(self, image_list):
        """Paste the images side by side, left to right, into a new image.

        Raises:
            ValueError: if ``image_list`` is empty.
        """
        if not image_list:
            raise ValueError('stitchImage() needs at least one image')
        total_width = sum(im.size[0] for im in image_list)
        max_height = max(im.size[1] for im in image_list)
        new_im = Image.new('RGB', (total_width, max_height))
        x_offset = 0
        for im in image_list:
            new_im.paste(im, (x_offset, 0))
            x_offset += im.size[0]
        return new_im

    def resetMatrix(self):
        """Turn every pixel of the matrix off."""
        for x in range(self.matrix.width):
            for y in range(self.matrix.height):
                self.matrix.SetPixel(x, y, 0, 0, 0)

    def _renderQuote(self, ticker, current, change):
        """Build the image for one entry: optional logo plus the quote text."""
        arrow, magnitude = self.getArrow(change)
        midFrame = self.textToImage(ticker, '%.2f' % current, '%.2f' % magnitude, arrow)
        try:
            logo = Image.open(os.path.join(self._logosPath(), ticker + '.png'))
            return self.stitchImage([logo, midFrame])
        except (OSError, ValueError):
            # No usable logo for this ticker: show the text on its own.
            return midFrame

    # Connect all the pieces together creating 1 long final stock image
    def getFullStockImage(self, updated_img):
        """Re-read the CSV data and render all quotes into one long image.

        The result is saved as ``final.ppm`` when ``updated_img`` is 1 and as
        ``final1.ppm`` when it is 2.  Entries whose price data is missing or
        not numeric are skipped; if nothing can be rendered no file is
        written.
        """
        self.readCSV()
        image_list = []

        for symbol in self.symbols:
            info = self.stock_info[symbol]
            try:
                current = float(info[0])
                # Stocks store the opening price; the change is derived.
                change = current - float(info[1])
            except (IndexError, ValueError):
                print('Skipping stock with incomplete data:', symbol)
                continue
            image_list.append(self.blank)
            image_list.append(self._renderQuote(symbol, current, change))

        for coin in self.coins:
            info = self.coin_info[coin]
            try:
                current = float(info[0])
                # Coins store the day change directly.
                change = float(info[1])
            except (IndexError, ValueError):
                print('Skipping coin with incomplete data:', coin)
                continue
            image_list.append(self.blank)
            image_list.append(self._renderQuote(coin.upper(), current, change))

        if not image_list:
            print('No stock or crypto data available to render')
            return

        finalDisplayImage = self.stitchImage(image_list)
        if updated_img == 1:
            finalDisplayImage.save('final.ppm')
        elif updated_img == 2:
            finalDisplayImage.save('final1.ppm')

    # Send the final stitched image to the display
    def displayStocks(self):
        """Scroll the rendered stock image until a ``'K'`` command arrives."""
        # NOTE(review): both slots point at final.ppm, so the final1.ppm
        # written by getFullStockImage(2) is never shown - confirm whether
        # the second entry was meant to be 'final1.ppm'.
        self.scrollImageTransition(['final.ppm', 'final.ppm'], offset_x=0, offset_y=0)

    def _readQuoteCSV(self, path):
        """Parse a ``name, value, value`` CSV file, skipping its header row.

        Returns:
            ``(names, info)`` where ``names`` lists the first column in file
            order and ``info`` maps each name to its two value strings, or to
            ``[]`` when the row does not have exactly three columns.
        """
        names = []
        info = {}
        with open(path, 'r') as f:
            reader = csv.reader(f)
            next(reader, None)
            for row in reader:
                if not row:
                    continue
                name = row[0]
                names.append(name)
                info[name] = list(row[1:]) if len(row) == 3 else []
        return names, info

    # Retrieve symbols and stock info from the csv file
    def readCSV(self):
        """Load stock data from ``csv/tickers.csv`` and coins from ``csv/crypto.csv``.

        Populates ``self.symbols`` / ``self.stock_info`` (current and opening
        price) and ``self.coins`` / ``self.coin_info`` (current price and day
        change).
        """
        self.symbols, self.stock_info = self._readQuoteCSV('csv/tickers.csv')
        self.coins, self.coin_info = self._readQuoteCSV('csv/crypto.csv')

    # Main run definition called by server
    def runStockTicker(self, runtime, delay, speedtime):
        """Render and display the stock ticker until ``stopStockTicker`` is called.

        The original body called methods that do not exist on this class
        (``getSymbols``, ``GetfullStockImage``, ``displayMatrix``,
        ``getfullStockImage``); they are mapped to what appear to be their
        current equivalents - TODO confirm with the caller.  ``speedtime``
        is accepted for compatibility and not used.
        """
        self.getFullStockImage(1)
        self.keySwapper += 1
        self.running = True
        # A delay longer than the runtime must not produce a negative sleep.
        pause = max(0, int(runtime) - int(delay))
        while self.running:
            self.displayStocks()
            time.sleep(pause)
            self.getFullStockImage(1)

    # Change running to false stopping refresh at next checkpoint
    def stopStockTicker(self):
        """Ask ``runStockTicker`` to stop at its next loop check."""
        self.keySwapper = 0
        self.running = False
        print('MATRIX DISPLAY STOP CALLED')

    def process_msg(self, msg):
        """Dispatch a single-character command.

        ``S`` stocks, ``N`` news, ``T`` text, ``I`` image, ``G`` gif and
        ``K`` clear the matrix; ``s`` / ``m`` / ``f`` set the scroll delay;
        a digit sets the brightness.  Anything else (including the empty
        string) is ignored.
        """
        base_dir = os.path.dirname(os.path.abspath(__file__))
        if msg == 'S':  # stocks
            self.getFullStockImage(1)
            self.displayStocks()
        elif msg == 'N':  # news
            self.displayNews()
        # speed settings
        elif msg == 's':
            self.delay = 0.03
        elif msg == 'm':
            self.delay = 0.01
        elif msg == 'f':
            self.delay = 0.005
        elif msg in ('0', '1', '2', '3', '4', '5', '6', '7', '8', '9'):  # brightness settings
            self.brightness = min(1.0, float(msg) / 10 + 0.1)
        elif msg == 'T':  # text
            self.displayTextRepeating()
        elif msg == 'I':  # image
            image = self.openImage(os.path.join(base_dir, 'display_image'))
            self.setImage(image)
        elif msg == 'G':  # gif
            self.displayGIF(os.path.join(base_dir, 'display_gif'))
        elif msg == 'K':  # kill
            self.resetMatrix()
if __name__ == '__main__':
    # Entry point: create the ticker and dispatch single-character commands
    # read from stdin (see StockTicker.process_msg for the command set).
    stock_ticker = StockTicker()
    while True:
        msg = getInput()
        if msg:
            stock_ticker.process_msg(msg)
        else:
            # getInput() is non-blocking; pause briefly when idle so this
            # loop does not spin at full CPU while waiting for a command.
            time.sleep(0.05)