# Copyright (C) 2020 Daniel Richardson richardson.daniel@hotmail.co.uk
#
# This file is part of stockTicker project for justinodunn.
#
# stockTicker can not be copied and/or distributed without the express
# permission of Daniel Richardson

import sys, select
|
||
import os
|
||
import threading
|
||
from PIL import Image, ImageDraw, ImageFont
|
||
import time
|
||
import csv
|
||
import requests
|
||
import pexpect
|
||
from rgbmatrix import RGBMatrix, RGBMatrixOptions
|
||
from rgbmatrix.graphics import *
|
||
from multiprocessing import Process
|
||
import traceback
|
||
import json
|
||
from datetime import datetime
|
||
|
||
|
||
def getInput(Block=False):
    '''
    Read a single character command from standard input.

    Block -- when truthy, read one character unconditionally (waiting for it
             if necessary). When falsy, poll stdin with a zero timeout and
             only read if it is reported readable.

    Returns the character read, or '' when polling found nothing to read.
    '''
    if not Block:
        # Zero timeout: select() reports the current state without waiting.
        poll_result = select.select([sys.stdin], [], [], 0)
        if poll_result != ([sys.stdin], [], []):
            return ''

    return sys.stdin.read(1)
|
||
|
||
|
||
|
||
class StockTicker():
    '''
    Scrolls stocks, crypto prices, news, weather, user text, images and GIFs
    across an RGB LED matrix.

    Display modes are selected with the single-character commands handled by
    process_msg(). The scrolling loops poll getInput() between frames, so a
    new command (or 'K' to stop) can interrupt whatever is being shown.
    '''

    # Weather condition (or, for 'Clouds', its description) -> icon file stem
    # inside logos/weather_icons/weather_type_icons.
    WEATHER_ICON_IDS = {
        'Clear': '01', 'few clouds': '02', 'scattered clouds': '03',
        'broken clouds': '04', 'overcast clouds': '04', 'Drizzle': '09',
        'Rain': '10', 'Thunderstorm': '11', 'Snow': '13', 'Mist': '50',
        'Smoke': '50', 'Haze': '50', 'Dust': '50', 'Fog': '50',
        'Sand': '50', 'Ash': '50', 'Squall': '50', 'Tornado': '50',
    }

    WEEKDAYS = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
    MONTHS = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
              'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']

    # Speed commands -> delay in seconds between scroll frames.
    SPEED_DELAYS = {'s': 0.03, 'm': 0.01, 'f': 0.005}

    # Characters replaced in news headlines before rendering.
    HEADLINE_REPLACEMENTS = str.maketrans({
        "^": ",", "’": "'", "‘": "'", '“': '"', '”': '"',
    })

    def __init__(self):
        '''Set the default display state and open the LED matrix.'''
        # Symbols/coins and their price data; (re)filled by readCSV().
        self.symbols = []
        self.stock_info = {}
        self.coins = []
        self.coin_info = {}

        # Colour used for price/change text; updated by getArrow().
        self.greenORred = (255, 255, 255)
        # Zero-width spacer inserted before every entry of the ticker strip.
        self.blank = Image.new('RGB', (0, 32))
        self.running = True
        # Counter touched by runStockTicker()/stopStockTicker().
        self.keySwapper = 0
        self.brightness = 1.0  # multiplier applied to every colour channel
        self.delay = 0.02      # seconds between scroll frames
        self.points = True     # display crypto change in points or percent

        # Configuration for the matrix
        options = RGBMatrixOptions()
        options.rows = 32
        options.cols = 64
        options.chain_length = 2
        options.parallel = 1
        options.hardware_mapping = 'adafruit-hat'  # If you have an Adafruit HAT: 'adafruit-hat'
        options.gpio_slowdown = 3
        self.matrix = RGBMatrix(options=options)

    # ------------------------------------------------------------------
    # small private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _loadImage(path):
        '''Return an in-memory copy of the image at path, closing the file.'''
        with Image.open(path) as source:
            return source.copy()

    @classmethod
    def _dateLabel(cls, now=None):
        '''Return a label such as 'Dec 25, Sat' for now (default: current local time).'''
        if now is None:
            now = datetime.now()
        # datetime.month is 1-based while MONTHS is 0-based.
        return cls.MONTHS[now.month - 1] + ' ' + str(now.day) + ', ' + cls.WEEKDAYS[now.weekday()]

    @staticmethod
    def _readRows(path, field_count):
        '''
        Return the data rows of the CSV file at path that have exactly
        field_count fields. The first row is treated as a header and skipped;
        rows with any other number of fields are ignored as incomplete.
        '''
        with open(path, 'r') as f:
            reader = csv.reader(f)
            next(reader, None)  # header; tolerate a completely empty file
            return [row for row in reader if len(row) == field_count]

    def _blankFrom(self, first_x):
        '''Switch off every pixel in the columns from first_x to the right edge.'''
        for x in range(first_x, self.matrix.width):
            for y in range(self.matrix.height):
                self.matrix.SetPixel(x, y, 0, 0, 0)

    def _pollForKill(self):
        '''
        Poll stdin once without blocking.

        Returns True (after blanking the matrix) when the kill command 'K'
        was read. Any other input is forwarded to process_msg() and False is
        returned. A KeyboardInterrupt raised while doing so is swallowed after
        flushing stdout, as the scrolling loops did originally.
        '''
        try:
            msg = getInput()
            if msg == 'K':
                self.resetMatrix()
                return True
            self.process_msg(msg)
        except KeyboardInterrupt:
            sys.stdout.flush()
        return False

    def _withLogo(self, logo_path, frame):
        '''Return frame with the logo at logo_path stitched on its left, or frame alone if that fails.'''
        try:
            return self.stitchImage([self._loadImage(logo_path), frame])
        except Exception:
            # Best effort: a missing or unreadable logo must not stop the ticker.
            return frame

    # ------------------------------------------------------------------
    # drawing primitives
    # ------------------------------------------------------------------
    def openImage(self, image_file):
        '''Open image_file and return it converted to an RGB image.'''
        # convert() produces a new image, so the source file can be closed
        # as soon as the conversion is done.
        with Image.open(image_file) as source:
            return source.convert('RGB')

    def setImage(self, image, offset_x = 0, offset_y = 0, unsafe=False):
        '''
        Draw image on the matrix with its top-left corner at (offset_x, offset_y).

        Only the part of the image that falls inside the matrix is drawn. In
        the default (safe) mode every channel is scaled by self.brightness;
        the unsafe mode hands the image to SetPixelsPillow without scaling.

        Raises ValueError (an Exception subclass) when image is not in RGB mode.
        '''
        if image.mode != "RGB":
            raise ValueError("Currently, only RGB mode is supported for SetImage(). Please create images with mode 'RGB' or convert first with image = image.convert('RGB'). Pull requests to support more modes natively are also welcome :)")

        img_width, img_height = image.size

        if unsafe:
            #In unsafe mode we directly access the underlying PIL image array
            #in cython, which is considered unsafe pointer access,
            #however it's super fast and seems to work fine
            #https://groups.google.com/forum/#!topic/cython-users/Dc1ft5W6KM4
            print('args', img_width, img_height, offset_x, offset_y, image)
            self.matrix.SetPixelsPillow(offset_x, offset_y, img_width, img_height, image)
            return

        # First implementation of a SetImage(). OPTIMIZE_ME: A more native
        # implementation that directly reads the buffer and calls the underlying
        # C functions can certainly be faster.
        pixels = image.load()
        brightness = self.brightness
        x_range = range(max(0, -offset_x), min(img_width, self.matrix.width - offset_x))
        y_range = range(max(0, -offset_y), min(img_height, self.matrix.height - offset_y))
        for x in x_range:
            for y in y_range:
                r, g, b = pixels[x, y]
                # Scaled channels are floats; pass whole numbers to the matrix.
                self.matrix.SetPixel(x + offset_x, y + offset_y,
                                     int(r * brightness), int(g * brightness), int(b * brightness))

    def resetMatrix(self):
        '''Switch off every pixel of the matrix.'''
        self._blankFrom(0)

    # ------------------------------------------------------------------
    # scrolling
    # ------------------------------------------------------------------
    def scrollImage(self, image, offset_x = 0, offset_y = 0):
        '''
        Scroll image leftwards, one pixel per frame, starting at offset_x
        until it has completely left the matrix.

        Returns True if the kill command was received while scrolling,
        otherwise False.
        '''
        img_width, img_height = image.size

        while offset_x > -img_width:
            offset_x -= 1
            self.setImage(image, offset_x = offset_x, offset_y = offset_y)

            # remove the pixels behind the image, to stop trailing
            self._blankFrom(offset_x + img_width)

            if self._pollForKill():
                return True

            time.sleep(self.delay)
        return False

    def scrollImageStacked(self, image, offset_x = 0, offset_y = 0):
        '''
        Scroll image as two rows: one copy shifted right by the matrix width
        at offset_y, and a second copy 20 pixels lower at offset_x.

        Returns True if the kill command was received, otherwise False.
        '''
        img_width, img_height = image.size
        matrix_width = self.matrix.width

        while offset_x > -img_width - matrix_width:
            offset_x -= 1

            self.setImage(image, offset_x = offset_x + matrix_width, offset_y = offset_y)
            self.setImage(image, offset_x = offset_x, offset_y = offset_y + 20)

            if self._pollForKill():
                return True

            time.sleep(self.delay)
        return False

    def scrollImageTransition(self, image_files, offset_x = 0, offset_y = 0, stocks = True):
        '''
        Scroll two image files end to end, forever, with a seamless transition
        between them, until the kill command is received.

        image_files -- sequence of two image paths.
        stocks      -- when True, every pass re-opens the files (swapping their
                       order on alternate passes) and regenerates the stock
                       image in a separate process while the pass is scrolling.
        '''
        current_img = 1
        image1 = self.openImage(image_files[0])
        image2 = self.openImage(image_files[1])
        update_process = None
        kill = False

        while True:
            if stocks:
                if current_img == 1:
                    leading, trailing = image_files[0], image_files[1]
                else:
                    leading, trailing = image_files[1], image_files[0]
                # Load the files before the refresh process starts, so an
                # image is never read while it is being rewritten.
                image1 = self.openImage(leading)
                image2 = self.openImage(trailing)

                update_process = Process(target = self.getFullStockImage, args = (current_img,))
                update_process.start()

            img_width, img_height = image1.size

            if img_width <= 0:
                # Nothing to scroll: still honour commands rather than spin.
                time.sleep(self.delay)
                kill = self._pollForKill()

            while offset_x > -img_width:
                offset_x -= 1

                self.setImage(image1, offset_x = offset_x, offset_y = offset_y)

                if offset_x + img_width < self.matrix.width: # if the image is ending
                    self.setImage(image2, offset_x = offset_x + img_width, offset_y = offset_y)

                time.sleep(self.delay)

                if self._pollForKill():
                    kill = True
                    break

            if update_process is not None:
                # Always reap the refresh process, also when stopping, so no
                # child process is left behind.
                update_process.join()
                update_process = None

            if stocks or kill:
                image1.close()
                image2.close()

            if kill:
                break

            current_img = 2 if current_img == 1 else 1
            offset_x = 0

    # ------------------------------------------------------------------
    # text rendering
    # ------------------------------------------------------------------
    def textImage(self, text, font, r = 255, g = 255, b = 255, matrix_height = False, buff = 0):
        '''
        creates and returns an image containing the text in the supplied font and colour

        matrix_height -- when True the text height is taken as 32 instead of
                         the measured height.
        buff          -- extra width in pixels added to the right of the text.

        The image is 3 pixels wider and taller than the text area.
        '''
        width, height = self.get_text_dimensions(text, font)

        if matrix_height:
            height = 32
        print(text)
        print('dims:', width, height)

        img = Image.new('RGB', (width + buff + 3, height + 3))
        ImageDraw.Draw(img).text((0, 0), text, fill=(r, g, b), font=font)
        return img

    def get_text_dimensions(self, text_string, font):
        '''
        Return (width, height) of text_string rendered in font.

        The text is drawn on a scratch canvas and measured through its
        bounding box; text that draws nothing (e.g. an empty string) measures
        (0, 0).
        '''
        canvas = Image.new('RGB', (10000, 100))
        ImageDraw.Draw(canvas).text((0, 0), text_string, font=font, fill=(255, 255, 255))

        bbox = canvas.getbbox()
        if bbox is None:
            # getbbox() returns None for a completely black canvas.
            return 0, 0
        return bbox[2] - bbox[0], bbox[3] - bbox[1]

    #Draw Ticker, current and change onto one image
    def textToImage(self, TICKER, CURRENT, CHANGE, ARROW):
        '''
        Return a 32 pixel high image with TICKER on the first line and
        CURRENT, the ARROW image and CHANGE on the second line. Price and
        change are drawn in the colour last chosen by getArrow().
        '''
        font = ImageFont.load("./fonts/10x20.pil")
        text_width_current, text_height = self.get_text_dimensions(CURRENT, font)
        text_width_change, text_height = self.get_text_dimensions(CHANGE, font)
        text_width_ticker, text_height = self.get_text_dimensions(TICKER, font)

        change_x = text_width_current + 29

        # The image keeps its historical width (price width + 100) unless the
        # ticker or the change text would not fit, in which case it grows so
        # that nothing is clipped. (The original called img.crop() here but
        # discarded the result, so the image was never actually cropped.)
        width = max(text_width_current + 100, change_x + text_width_change, 4 + text_width_ticker)
        img = Image.new('RGB', (width, 32))
        d = ImageDraw.Draw(img)

        d.text((4, 0), TICKER, fill=(255, 255, 255), font=font)
        d.text((4, 16), CURRENT, fill=self.greenORred, font=font)

        img.paste(ARROW, (text_width_current + 9, 18))
        d.text((change_x, 16), CHANGE, fill=self.greenORred, font=font)

        return img

    #Stitch the logo & prices picture into one image
    def stitchImage(self, image_list):
        '''
        Return a new RGB image with the images of image_list placed side by
        side, left to right, aligned at the top.

        Raises ValueError when image_list is empty.
        '''
        if not image_list:
            raise ValueError('stitchImage() needs at least one image')

        total_width = sum(im.size[0] for im in image_list)
        max_height = max(im.size[1] for im in image_list)
        new_im = Image.new('RGB', (total_width, max_height))

        x_offset = 0
        for im in image_list:
            print(x_offset)
            new_im.paste(im, (x_offset, 0))
            x_offset += im.size[0]

        return new_im

    #Using change between min and day price give appropriate arrow
    #and set the overall change colour
    def getArrow(self, CHANGE):
        '''
        Return (arrow image, magnitude of CHANGE) and set self.greenORred.

        A positive CHANGE gives the 'up' arrow and green; anything else gives
        the 'down' arrow and red.
        '''
        logos_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logos')

        if CHANGE > 0:
            Arrow = self._loadImage(os.path.join(logos_path, 'up.png'))
            self.greenORred = (0, 255, 0)
            return Arrow, CHANGE

        Arrow = self._loadImage(os.path.join(logos_path, 'down.png'))
        self.greenORred = (255, 0, 0)
        # abs() rather than * -1, so a zero change is not turned into -0.0
        # (which would be formatted as '-0.00').
        return Arrow, abs(CHANGE)

    # ------------------------------------------------------------------
    # display modes
    # ------------------------------------------------------------------
    def displayUserText(self):
        '''
        displays the text entered in the webpage by the user.

        The first row of csv/scroll_text.csv supplies the text and its r, g, b
        colour. The rendered text is saved as scroll_text.ppm and scrolled
        until the kill command is received.
        '''
        with open('csv/scroll_text.csv', 'r') as f:
            text, r, g, b = next(csv.reader(f))

        font = ImageFont.load("./fonts/texgyre-27.pil")

        img = self.textImage(text, font, int(r), int(g), int(b), True, buff = 50)
        img.save('scroll_text.ppm')

        self.scrollImageTransition(['scroll_text.ppm', 'scroll_text.ppm'],
                                   offset_x = self.matrix.width, offset_y = 0, stocks = False)

    def _readNews(self):
        '''Return a list of (headline, 'source: date time') pairs read from csv/news.csv.'''
        return [(headline, source + ': ' + date + ' ' + clock)
                for headline, source, date, clock in self._readRows('csv/news.csv', 4)]

    def _newsImage(self, headline_img, source_img):
        '''
        Return the image for one news item: the logo followed by the headline
        with the source line underneath. If that fails, fall back to the two
        text lines without a logo.
        '''
        # Wide enough for whichever of the two lines is longer.
        width = max(headline_img.size[0], source_img.size[0])
        try:
            logos_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logos', 'news_logos')
            logo = self._loadImage(os.path.join(logos_path, 'techcrunch' + '.png'))
            print(logo.size)

            img = Image.new('RGB', (width, 32))
            img.paste(headline_img, (2, -3))
            img.paste(source_img, (2, 16))
            return self.stitchImage([logo, img])
        except Exception as e:
            print(e)
            img = Image.new('RGB', (width, 32))
            img.paste(headline_img, (0, 0))
            img.paste(source_img, (0, 16))
            img.save('image.ppm')
            return img

    def displayNews(self):
        '''
        Scroll the headlines of csv/news.csv one after another, re-reading the
        file after every full pass, until the kill command is received.
        '''
        headline_font = ImageFont.load("./fonts/10x20.pil")
        source_font = ImageFont.load("./fonts/10x20.pil")

        while True:
            stories = self._readNews()

            if not stories:
                print('new failed')
                # No headlines yet: stay responsive to commands and retry
                # shortly instead of failing or spinning.
                if self._pollForKill():
                    return
                time.sleep(1)
                continue

            for headline, source_line in stories:
                headline = headline.translate(self.HEADLINE_REPLACEMENTS)

                headline_img = self.textImage(headline, headline_font, matrix_height = True)
                source_img = self.textImage(source_line, source_font, r=255, g=255, b=0, matrix_height = True)

                img = self._newsImage(headline_img, source_img)

                if self.scrollImage(img, offset_x = self.matrix.width, offset_y = 0):
                    return

    def displayGIF(self, gif):
        '''
        Show the frames of the open GIF image gif in place, one every half
        second, looping until the kill command is received (which also closes
        gif).
        '''
        i = 0
        while True:
            print(gif.tell())
            try:
                gif.seek(i)
            except EOFError:
                # Past the last frame: start again from the first one.
                print('finished')
                i = 0
                gif.seek(i)

            self.setImage(gif.convert('RGB'))
            time.sleep(0.5)
            i += 1

            if self._pollForKill():
                gif.close()
                break

    def scrollGIF(self, gif, offset_x = 0, offset_y = 0):
        '''
        Scroll the open GIF image gif leftwards from offset_x until it has
        left the matrix, advancing one frame every 20 pixels.

        Returns True if the kill command was received (gif is then closed),
        otherwise False.
        '''
        i = 0
        img_width, img_height = gif.size

        while offset_x > -img_width:
            offset_x -= 1

            try:
                gif.seek(i)
            except EOFError:
                # Past the last frame: start again from the first one.
                print('finished')
                i = 0
                gif.seek(i)

            self.setImage(gif.convert('RGB'), offset_x = offset_x, offset_y = offset_y)
            time.sleep(self.delay)

            if offset_x % 20 == 0:
                i += 1

            # remove the pixels behind the image, to stop trailing
            self._blankFrom(offset_x + img_width)

            if self._pollForKill():
                gif.close()
                return True
        return False

    # ------------------------------------------------------------------
    # stock / crypto strip
    # ------------------------------------------------------------------
    #Connect all the pieces togeather creating 1 long final stock image
    def getFullStockImage(self, updated_img):
        '''
        Re-read the CSV data, render every stock and coin (logo, ticker,
        price, arrow and change) and stitch them into one long strip.

        updated_img -- 1 saves the strip as final.ppm, 2 as final1.ppm.

        Entries whose prices are not numeric are skipped. If nothing can be
        rendered no file is written.
        '''
        self.readCSV()

        base_dir = os.path.dirname(os.path.abspath(__file__))
        image_list = []

        for symbol in self.symbols:
            info = self.stock_info[symbol]
            try:
                current = float(info[0])
                opening = float(info[1])
            except ValueError:
                continue  # we dont want to include incomplete information

            arrow, change = self.getArrow(current - opening)
            midFrame = self.textToImage(symbol, '%.2f' % current, '%.2f' % change, arrow)

            image_list.append(self.blank)
            image_list.append(self._withLogo(os.path.join(base_dir, 'logos', symbol + '.png'), midFrame))

        for coin in self.coins:
            info = self.coin_info[coin]
            print(info)
            ticker = info[0]
            base = info[1].upper()
            try:
                current = float(info[2])
                change = float(info[3])
            except ValueError:
                continue

            if self.points:
                # convert percent to points
                change = change / 100 * current

            arrow, change = self.getArrow(change)
            midFrame = self.textToImage(ticker + '(' + base + ')', '%.2f' % current, '%.2f' % change, arrow)

            image_list.append(self.blank)
            image_list.append(self._withLogo(os.path.join(base_dir, 'logos', 'crypto', ticker + '.png'), midFrame))

        if not image_list:
            print('no stock or crypto data to display')
            return

        finalDisplayImage = self.stitchImage(image_list)

        if updated_img == 1:
            finalDisplayImage.save('final.ppm')
        elif updated_img == 2:
            finalDisplayImage.save('final1.ppm')

    #Send the final stitched image to the display for set amount of time
    def displayStocks(self):
        '''Scroll final.ppm continuously, refreshing the stock data on every pass, until killed.'''
        # NOTE(review): both entries are final.ppm although getFullStockImage(2)
        # writes final1.ppm - confirm whether the second file was meant here.
        self.scrollImageTransition(['final.ppm', 'final.ppm'], offset_x = 0, offset_y = 0)

    #Retrieve symbols and stock info from the csv file
    def readCSV(self):
        '''
        Load csv/tickers.csv into self.symbols / self.stock_info and
        csv/crypto.csv into self.coins / self.coin_info.

        Both files start with a header row. Ticker rows are
        symbol, current price, opening price; crypto rows are
        symbol, coin, base, current price, day change. Rows with a different
        number of fields are ignored. Values are stored as strings.
        '''
        self.symbols = []
        self.stock_info = {}
        for symbol, current_price, opening_price in self._readRows('csv/tickers.csv', 3):
            self.symbols.append(symbol)
            self.stock_info[symbol] = [current_price, opening_price]

        self.coins = []
        self.coin_info = {}
        for symbol, coin, base, current_price, day_change in self._readRows('csv/crypto.csv', 5):
            self.coins.append(coin)
            self.coin_info[coin] = [symbol, base, current_price, day_change]

    # ------------------------------------------------------------------
    # weather
    # ------------------------------------------------------------------
    def getTodayWeatherImage(self):
        '''
        Return a 225x32 image summarising csv/current_weather.json: location,
        weather icon, temperature, feels-like, min/max, humidity, UV, date,
        clouds, wind speed and visibility.
        '''
        location = 'London'

        with open('csv/current_weather.json', 'r') as f:
            current_weather = json.load(f)

        small_font = ImageFont.load("./fonts/5x7.pil")
        large_font = ImageFont.load("./fonts/10x20.pil")
        weather_dir = './logos/weather_icons'

        img = Image.new('RGB', (225, 32))

        def paste_text(text, position, font=small_font, **colour):
            rendered = self.textImage(text, font, **colour)
            img.paste(rendered, position)
            return rendered

        def paste_icon(file_name, position):
            img.paste(self._loadImage(weather_dir + '/' + file_name), position)

        paste_text(location, (5, 0))

        # Cloudy weather is split into several icons by its description.
        main = current_weather['main_weather']
        if main == 'Clouds':
            main = current_weather['description']
        paste_icon('weather_type_icons/' + self.WEATHER_ICON_IDS[main] + '.png', (5, 12))

        temp_img = paste_text("{0:.0f}".format(current_weather['temp']), (50, 9), font=large_font)
        print(temp_img.size)
        paste_text('o', (70, 8))  # small 'o' used as the degree sign

        paste_text(current_weather['main_weather'], (45, 26))
        paste_text('Feels like:' + "{0:.0f}".format(current_weather['feels_like']), (41, 0))
        paste_text("{0:.0f}".format(current_weather['min_temp']), (75, 15), r=0, g=0, b=255)
        paste_text("{0:.0f}".format(current_weather['max_temp']), (90, 15), r=255, g=0, b=0)

        paste_icon('humidity.png', (107, 8))
        paste_text(str(current_weather['humidity']) + '%', (120, 10))

        paste_icon('uv.png', (109, 20))
        paste_text(str(current_weather['uv']), (120, 23))

        paste_text(self._dateLabel(), (132, 0))

        paste_icon('rain-chance.png', (143, 8))
        # NOTE(review): the rain-chance figure is read from the 'clouds' key,
        # the same value shown next to the cloud icon below - confirm the
        # intended key.
        paste_text(str(current_weather['clouds']) + '%', (156, 10))

        paste_icon('clouds.png', (143, 20))
        paste_text(str(current_weather['clouds']) + '%', (156, 22))

        paste_icon('wind.png', (177, 8))
        paste_text(str(current_weather['wind_speed']) + 'm/s', (190, 10))

        paste_icon('visibility.png', (177, 20))
        paste_text(str(current_weather['visibility'] / 1000) + 'km', (190, 22))

        return img

    def getDailyWeatherImage(self):
        '''
        Return a 32 pixel high image with one column per entry of
        csv/daily_weather.json: the weather icon with the minimum (blue) and
        maximum (red) temperature underneath.
        '''
        with open('csv/daily_weather.json', 'r') as f:
            daily_weather = json.load(f)

        small_font = ImageFont.load("./fonts/5x7.pil")
        weather_dir = './logos/weather_icons'

        print(len(daily_weather))

        img = Image.new('RGB', (1000, 32))
        x_offset = 4

        for weather in daily_weather:
            # Cloudy weather is split into several icons by its description.
            main = weather['main_weather']
            if main == 'Clouds':
                main = weather['description']

            weather_img = self._loadImage(
                weather_dir + '/weather_type_icons/' + self.WEATHER_ICON_IDS[main] + '.png')
            min_img = self.textImage("{0:.0f}".format(weather['min_temp']), small_font, r=0, g=0, b=255)
            max_img = self.textImage("{0:.0f}".format(weather['max_temp']), small_font, r=255, g=0, b=0)

            img.paste(weather_img, (x_offset, 0))
            img.paste(min_img, (x_offset, 26))
            img.paste(max_img, (x_offset + 12, 26))

            x_offset += weather_img.size[0] + 5

        # Trim the unused part of the 1000 pixel wide working canvas.
        return img.crop((0, 0, x_offset, 32))

    # ------------------------------------------------------------------
    # control
    # ------------------------------------------------------------------
    #Main run definition called by server
    def runStockTicker(self, runtime, delay, speedtime):
        '''
        Build the stock image and display it repeatedly while self.running is
        True, pausing int(runtime) - int(delay) seconds (never less than 0)
        between rounds. speedtime is accepted but not used.

        NOTE(review): this method originally called getSymbols(),
        GetfullStockImage(), displayMatrix() and getfullStockImage(), none of
        which exist in this class. They are mapped here to getFullStockImage(1)
        (which re-reads the CSV files itself) and displayStocks(); confirm
        that this matches the intended behaviour.
        '''
        self.getFullStockImage(1)
        self.keySwapper += 1
        self.running = True

        pause = max(0, int(runtime) - int(delay))

        while self.running:
            self.displayStocks()
            time.sleep(pause)
            self.getFullStockImage(1)

    #Change running to false stopping refresh at next checkpoint
    def stopStockTicker(self):
        '''Ask runStockTicker() to stop at its next check of self.running.'''
        self.keySwapper = 0
        self.running = False
        print('MATRIX DISPLAY STOP CALLED')

    def process_msg(self, msg):
        '''
        Execute a single-character command:

        'S' stocks, 'N' news, 'T' user text, 'I' image, 'G' gif,
        'W' today's weather, 'D' daily weather, 'K' blank the matrix,
        's' / 'm' / 'f' set the frame delay to 0.03 / 0.01 / 0.005 seconds,
        '0'-'9' set the brightness to 0.1 ... 1.0.

        Anything else (including the empty string) is ignored.
        '''
        base_dir = os.path.dirname(os.path.abspath(__file__))

        if msg == 'S': # stocks
            self.getFullStockImage(1)
            self.displayStocks()

        elif msg == 'N': # news
            self.displayNews()

        elif msg in self.SPEED_DELAYS: # speed settings
            self.delay = self.SPEED_DELAYS[msg]

        elif msg in ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']: # brightness settings
            self.brightness = min(1.0, float(msg)/10 + 0.1)

        elif msg == 'T': # text
            self.displayUserText()

        elif msg == 'I': # image
            image = self.openImage(os.path.join(base_dir, 'display_image'))
            # Keep scrolling the image until the kill command arrives.
            while not self.scrollImage(image, offset_x = self.matrix.width):
                pass

        elif msg == 'G': # gif
            # Kept open (not copied) because the frames are read on demand;
            # scrollGIF() closes it when the kill command arrives.
            gif = Image.open(os.path.join(base_dir, 'display_gif'))
            while not self.scrollGIF(gif, offset_x = self.matrix.width):
                pass

        elif msg == 'W': # weather
            img = self.getTodayWeatherImage()
            img.save('weather.ppm')
            self.scrollImageTransition(['weather.ppm', 'weather.ppm'])

        elif msg == 'D': # daily weather
            img = self.getDailyWeatherImage()
            img.save('weather.ppm')
            self.scrollImageTransition(['weather.ppm', 'weather.ppm'])

        elif msg == 'K': # kill
            self.resetMatrix()
|
||
|
||
if __name__ == '__main__':
    # Script entry point: create the ticker and feed it the single-character
    # commands arriving on stdin. Any exception that ends the loop is written
    # to log.txt (the file is truncated on every start) and then re-raised.
    with open('log.txt', "w") as log:
        try:
            # Kept as a module-level name: code elsewhere in this file may
            # refer to the global stock_ticker.
            stock_ticker = StockTicker()

            while True:
                msg = getInput()
                if msg:
                    stock_ticker.process_msg(msg)
                else:
                    # Nothing to read: back off briefly instead of polling
                    # stdin in a tight loop at full CPU.
                    time.sleep(0.01)

        except Exception as e:
            exc_type, exc_obj, exc_tb = sys.exc_info()
            fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
            log.write(str(e))
            log.write('. file: ' + fname)
            log.write('. line: ' + str(exc_tb.tb_lineno))
            log.write('. type: ' + str(exc_type))
            log.write('\n ' + traceback.format_exc())
            # Bare raise re-raises the active exception unchanged.
            raise
|
||
|
||
|