451 lines
15 KiB
Python
451 lines
15 KiB
Python
# Copyright (C) 2020 Daniel Richardson richardson.daniel@hotmail.co.uk
|
|
#
|
|
# This file is part of stockTicker project for justinodunn.
|
|
#
|
|
# stockTicker can not be copied and/or distributed without the express
|
|
# permission of Daniel Richardson
|
|
|
|
|
|
import sys, select
|
|
import os
|
|
import threading
|
|
from PIL import Image, ImageDraw, ImageFont
|
|
import time
|
|
import csv
|
|
import requests
|
|
import pexpect
|
|
from rgbmatrix import RGBMatrix, RGBMatrixOptions
|
|
from rgbmatrix.graphics import *
|
|
from multiprocessing import Process
|
|
|
|
def getInput(Block=False):
|
|
if Block or select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []):
|
|
msg = sys.stdin.read(1)
|
|
#sys.stdin.flush()
|
|
else:
|
|
msg = ''
|
|
return msg
|
|
|
|
|
|
|
|
class StockTicker():
|
|
def __init__(self):
|
|
#Define global resources
|
|
self.symbols = []
|
|
|
|
self.greenORred = (255, 255, 255)
|
|
#self.blank = Image.open('logos/blank.png')
|
|
self.blank = Image.new('RGB', (0, 32))
|
|
self.running = True
|
|
self.brightness = 1.0
|
|
self.delay = 0.02
|
|
|
|
# Configuration for the matrix
|
|
options = RGBMatrixOptions()
|
|
options.rows = 32
|
|
options.cols = 64
|
|
options.chain_length = 2
|
|
options.parallel = 1
|
|
options.hardware_mapping = 'adafruit-hat' # If you have an Adafruit HAT: 'adafruit-hat'
|
|
options.gpio_slowdown = 3
|
|
self.matrix = RGBMatrix(options = options)
|
|
|
|
|
|
def openImage(self, image_file):
|
|
image = Image.open(image_file)
|
|
# Make image fit our screen.
|
|
#image.thumbnail((self.matrix.width, self.matrix.height), Image.ANTIALIAS)
|
|
|
|
image = image.convert('RGB')
|
|
return image
|
|
|
|
|
|
def setImage(self, image, offset_x = 0, offset_y = 0, unsafe=False):
|
|
|
|
|
|
if (image.mode != "RGB"):
|
|
raise Exception("Currently, only RGB mode is supported for SetImage(). Please create images with mode 'RGB' or convert first with image = image.convert('RGB'). Pull requests to support more modes natively are also welcome :)")
|
|
|
|
if unsafe:
|
|
#In unsafe mode we directly acceshow to send commands to running python processs the underlying PIL image array
|
|
#in cython, which is considered unsafe pointer accecss,
|
|
#however it's super fast and seems to work fine
|
|
#https://groups.google.com/forum/#!topic/cython-users/Dc1ft5W6KM4
|
|
img_width, img_height = image.size
|
|
|
|
print('args', img_width, img_height, offset_x, offset_y, image)
|
|
self.matrix.SetPixelsPillow(offset_x, offset_y, img_width, img_height, image)
|
|
else:
|
|
# First implementation of a SetImage(). OPTIMIZE_ME: A more native
|
|
# implementation that directly reads the buffer and calls the underlying
|
|
# C functions can certainly be faster.
|
|
img_width, img_height = image.size
|
|
pixels = image.load()
|
|
for x in range(max(0, -offset_x), min(img_width, self.matrix.width - offset_x)):
|
|
for y in range(max(0, -offset_y), min(img_height, self.matrix.height - offset_y)):
|
|
(r, g, b) = pixels[x, y]
|
|
|
|
self.matrix.SetPixel(x + offset_x, y + offset_y, r*self.brightness, g*self.brightness, b*self.brightness)
|
|
|
|
def scrollImage(self, image_file, offset_x = 0, offset_y = 0):
|
|
image = self.openImage(image_file)
|
|
img_width, img_height = image.size
|
|
|
|
|
|
|
|
while offset_x > -img_width:
|
|
offset_x -= 1
|
|
|
|
self.setImage(image, offset_x = offset_x, offset_y = offset_y)
|
|
time.sleep(delay)
|
|
|
|
def scrollImageTransition(self, image_files, offset_x = 0, offset_y = 0, stocks = True):
|
|
# use two image files and switch between them with a seemless transition
|
|
current_img = 1
|
|
|
|
kill = False
|
|
while True:
|
|
|
|
if current_img == 1:
|
|
|
|
if stocks:
|
|
update_process = Process(target = self.getFullStockImage, args = (1,))
|
|
update_process.start()
|
|
image1 = self.openImage(image_files[0])
|
|
image2 = self.openImage(image_files[1])
|
|
|
|
elif current_img == 2:
|
|
|
|
if stocks:
|
|
update_process = Process(target = self.getFullStockImage, args = (2,))
|
|
update_process.start()
|
|
|
|
image1 = self.openImage(image_files[1])
|
|
image2 = self.openImage(image_files[0])
|
|
|
|
img_width, img_height = image1.size
|
|
|
|
|
|
while offset_x > -img_width:
|
|
offset_x -= 1
|
|
|
|
self.setImage(image1, offset_x = offset_x, offset_y = offset_y)
|
|
|
|
if offset_x + img_width < self.matrix.width: # if the image is ending
|
|
self.setImage(image2, offset_x = offset_x + img_width, offset_y = offset_y)
|
|
|
|
|
|
time.sleep(self.delay)
|
|
try:
|
|
msg = getInput()
|
|
if msg == 'K':
|
|
self.resetMatrix()
|
|
kill = True
|
|
break
|
|
|
|
self.process_msg(msg)
|
|
except KeyboardInterrupt:
|
|
sys.stdout.flush()
|
|
pass
|
|
|
|
|
|
if kill: break
|
|
|
|
if stocks:
|
|
update_process.join()
|
|
if current_img == 1:
|
|
current_img = 2
|
|
elif current_img == 2:
|
|
current_img = 1
|
|
offset_x = 0
|
|
|
|
def displayText(self):
|
|
f = open('csv/scroll_text.csv', 'r')
|
|
|
|
CSV = csv.reader(f)
|
|
text, r, g, b = next(CSV)
|
|
|
|
f.close()
|
|
|
|
|
|
font = ImageFont.load("./fonts/texgyre-27.pil")
|
|
|
|
width, height = self.get_text_dimensions(text, font)
|
|
print(text)
|
|
print('dims:', width, height)
|
|
img = Image.new('RGB', (width + 10, 32))
|
|
d = ImageDraw.Draw(img)
|
|
|
|
d.text((4, 0), text, fill=(int(r), int(g), int(b)), font=font)
|
|
|
|
img.save('scroll_text.ppm')
|
|
self.scrollImageTransition(['scroll_text.ppm', 'scroll_text.ppm'], offset_x = 128, offset_y = 0, stocks = False)
|
|
|
|
|
|
|
|
#Using change between min and day price give appropriate arrow
|
|
#and set the overall change colour
|
|
def getArrow(self, CHANGE):
|
|
self.greenORred
|
|
logos_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logos')
|
|
if(CHANGE>0):
|
|
Arrow = Image.open(os.path.join(logos_path, 'up.png'))
|
|
self.greenORred = (0, 255, 0)
|
|
return Arrow, CHANGE
|
|
Arrow = Image.open(os.path.join(logos_path, 'down.png'))
|
|
self.greenORred = (255, 0, 0)
|
|
CHANGE = (CHANGE * -1)
|
|
return Arrow, CHANGE
|
|
|
|
def get_text_dimensions(self, text_string, font):
|
|
canvas = Image.new('RGB', (10000,100))
|
|
|
|
draw = ImageDraw.Draw(canvas)
|
|
monospace = font
|
|
|
|
text = text_string
|
|
white = (255,255,255)
|
|
draw.text((10, 10), text, font=monospace, fill=white)
|
|
|
|
bbox = canvas.getbbox()
|
|
width = bbox[2]-bbox[0]
|
|
height = bbox[3]-bbox[1]
|
|
|
|
return width,height
|
|
|
|
#Draw Ticker, current and change onto one image
|
|
def textToImage(self, TICKER, CURRENT, CHANGE, ARROW):
|
|
font = ImageFont.load("./fonts/10x20.pil")
|
|
img = Image.new('RGB', (150, 32))
|
|
d = ImageDraw.Draw(img)
|
|
|
|
d.text((4, 0), TICKER, fill=(255, 255, 255), font=font)
|
|
d.text((4, 16), CURRENT, fill=self.greenORred, font=font)
|
|
|
|
text_width_current, text_height = self.get_text_dimensions(CURRENT, font)
|
|
|
|
img.paste(ARROW, ((text_width_current + 9),18))
|
|
d.text(((text_width_current+29), 16), CHANGE, fill=self.greenORred, font=font)
|
|
|
|
text_width_change, text_height = self.get_text_dimensions(CHANGE, font)
|
|
|
|
newWidth = (text_width_current+29) + (text_width_change)
|
|
|
|
img.crop((0,0,newWidth,32))
|
|
return img
|
|
|
|
#Stitch the logo & prices picture into one image
|
|
def stitchImage(self, image_list):
|
|
widths, heights = zip(*(i.size for i in image_list))
|
|
total_width = sum(widths)
|
|
max_height = max(heights)
|
|
new_im = Image.new('RGB', (total_width, max_height))
|
|
x_offset = 0
|
|
for im in image_list:
|
|
new_im.paste(im, (x_offset,0))
|
|
x_offset += im.size[0]
|
|
|
|
|
|
return new_im
|
|
|
|
def resetMatrix(self):
|
|
for x in range(self.matrix.width):
|
|
for y in range(self.matrix.height):
|
|
|
|
|
|
self.matrix.SetPixel(x , y , 0,0,0)
|
|
|
|
#Connect all the pieces togeather creating 1 long final stock image
|
|
def getFullStockImage(self, updated_img):
|
|
|
|
image_list = []
|
|
|
|
start = time.time()
|
|
self.readCSV()
|
|
|
|
|
|
for i, symbol in enumerate(self.symbols):
|
|
info = self.stock_info[symbol]
|
|
|
|
change = float(info[0])-float(info[1]) #TEXT
|
|
ticker = symbol #TEXT
|
|
current = '%.2f' % float(info[0]) #TEXT
|
|
|
|
arrow, change = self.getArrow(change)
|
|
change = '%.2f' % change
|
|
midFrame = self.textToImage(ticker, current, change, arrow) #IMAGE THE TEXT
|
|
|
|
|
|
try:
|
|
logos_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logos')
|
|
|
|
logo = Image.open(os.path.join(logos_path, ticker + '.png'))
|
|
stitchedStock = self.stitchImage([logo,midFrame])
|
|
except:
|
|
|
|
stitchedStock = midFrame
|
|
|
|
|
|
|
|
image_list.append(self.blank)
|
|
image_list.append(stitchedStock)
|
|
|
|
for i, coin in enumerate(self.coins):
|
|
info = self.coin_info[coin]
|
|
|
|
change = float(info[1]) #TEXT
|
|
ticker = coin.upper() #TEXT
|
|
current = '%.2f' % float(info[0]) #TEXT
|
|
|
|
|
|
|
|
arrow, change = self.getArrow(change)
|
|
change = '%.2f' % change
|
|
midFrame = self.textToImage(ticker, current, change, arrow) #IMAGE THE TEXT
|
|
|
|
|
|
try:
|
|
logos_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logos')
|
|
|
|
logo = Image.open(os.path.join(logos_path, ticker + '.png'))
|
|
stitchedStock = self.stitchImage([logo,midFrame])
|
|
except:
|
|
|
|
stitchedStock = midFrame
|
|
|
|
image_list.append(self.blank)
|
|
image_list.append(stitchedStock)
|
|
|
|
finalDisplayImage = self.stitchImage(image_list)
|
|
|
|
if updated_img == 1:
|
|
finalDisplayImage.save('final.ppm')
|
|
elif updated_img == 2:
|
|
finalDisplayImage.save('final1.ppm')
|
|
|
|
|
|
|
|
#Send the final stitched image to the display for set amount of time
|
|
def displayStocks(self):
|
|
#os.system("sudo ./demo -D1 final.ppm -t " + str(displayTime) +" -m "+ str(speedDisplay) +" --led-gpio-mapping=adafruit-hat --led-rows=32 --led-cols=256")
|
|
#os.system("sudo ./demo -D1 final.ppm -t " + str(self.displayTime) +" -m "+ str(self.speedDisplay) +" --led-gpio-mapping=adafruit-hat --led-rows=64 --led-cols=64 --led-slowdown-gpio=4 ")
|
|
self.scrollImageTransition(['final.ppm', 'final.ppm'], offset_x = 0, offset_y = 0)
|
|
#Retrieve symbols and stock info from the csv file
|
|
def readCSV(self):
|
|
|
|
self.symbols = []
|
|
self.stock_info = {}
|
|
f = open('csv/tickers.csv', 'r')
|
|
CSV = csv.reader(f)
|
|
next(CSV)
|
|
for row in CSV:
|
|
|
|
try:
|
|
symbol, current_price, opening_price = row
|
|
self.symbols.append(symbol)
|
|
self.stock_info[symbol] = [current_price, opening_price]
|
|
except:
|
|
symbol = row[0]
|
|
self.symbols.append(symbol)
|
|
self.stock_info[symbol] = []
|
|
|
|
f.close()
|
|
|
|
self.coins = []
|
|
self.coin_info = {}
|
|
f = open('csv/crypto.csv', 'r')
|
|
CSV = csv.reader(f)
|
|
next(CSV)
|
|
for row in CSV:
|
|
|
|
try:
|
|
coin, current_price, day_change = row
|
|
self.coins.append(coin)
|
|
self.coin_info[coin] = [current_price, day_change]
|
|
except:
|
|
symbol = row[0]
|
|
self.coins.append(coin)
|
|
self.coin_info[coin] = []
|
|
|
|
f.close()
|
|
|
|
#Main run definition called by server
|
|
def runStockTicker(self, runtime, delay, speedtime):
|
|
|
|
|
|
self.getSymbols()
|
|
|
|
self.GetfullStockImage()
|
|
self.keySwapper += 1
|
|
self.running = True
|
|
|
|
|
|
while (True):
|
|
if (self.running == True):
|
|
|
|
|
|
#th = threading.Thread(target=self.displayMatrix)
|
|
#th.start()
|
|
self.displayMatrix()
|
|
time.sleep((int(runtime) - int(delay)))
|
|
self.getfullStockImage()
|
|
|
|
#th.join()
|
|
|
|
|
|
else:
|
|
break;
|
|
|
|
#Change running to false stopping refresh at next checkpoint
|
|
def stopStockTicker(self):
|
|
|
|
|
|
self.keySwapper = 0
|
|
self.running = False
|
|
print('MATRIX DISPLAY STOP CALLED')
|
|
|
|
def process_msg(self, msg):
|
|
|
|
if msg == 'S':
|
|
self.getFullStockImage(1)
|
|
self.displayStocks()
|
|
|
|
|
|
elif msg == 's':
|
|
self.delay = 0.03
|
|
|
|
elif msg == 'm':
|
|
self.delay = 0.01
|
|
|
|
elif msg == 'f':
|
|
self.delay = 0.005
|
|
|
|
elif msg in ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']:
|
|
|
|
self.brightness = min(1.0, float(msg)/10 + 0.1)
|
|
elif msg == 'T':
|
|
self.displayText()
|
|
|
|
if __name__ == '__main__':
|
|
|
|
#print(sys.stdin.readlines())
|
|
stock_ticker = StockTicker()
|
|
|
|
#stock_ticker.process_msg(brightness)
|
|
#stock_ticker.process_msg(speed)
|
|
|
|
|
|
#stock_ticker.displayText()
|
|
#stock_ticker.getFullStockImage(1)
|
|
#stock_ticker.displayStocks()
|
|
|
|
stock_ticker.delay = 0.001
|
|
stock_ticker.scrollImageTransition(['final.ppm', 'final.ppm'], offset_x = 0, offset_y = 0)
|
|
|
|
while True:
|
|
msg = getInput()
|
|
stock_ticker.process_msg(msg)
|
|
|
|
|
|
|