PYTHON CODINGS
- Khoo Jacky

- Aug 7, 2020
- 2 min read
Updated: Aug 13, 2020
For this particular project, I am using the Raspberry Pi Model 4 B which I purchased from
HOW IT WORKS......
Python is the official programming language of the Raspberry Pi, and IDLE 3 is a Python Integrated Development Environment, so that is what I am using to program my RPi.
MAIN PROGRAM:
Now I will split the code into different sections to make it easier to explain.
# --- Standard library -------------------------------------------------------
import sys                      # provides functions and variables (sys.path below)
import http.client              # enables the connection to ThingSpeak
import urllib.parse             # urlencode() for data logging purposes
import time                     # provides time related functions
from time import sleep          # allows us to call sleep() directly
from datetime import datetime   # allows for display of time

# --- Third-party ------------------------------------------------------------
import schedule                 # allows for the hourly Twitter updates
import Adafruit_DHT             # DHT22 sensor library
import RPi.GPIO as GPIO         # refer to RPi.GPIO as GPIO
from twython import Twython     # tweet bot

# --- Local ------------------------------------------------------------------
# The read and write keys needed to send tweets from a specific account are
# stored in a separate module.
from auth import (
    consumer_key,
    consumer_secret,
    access_token,
    access_token_secret
)

sys.path.append('/home/pi/Downloads')   # make sure the lcd module is on the path
import lcd                              # import the module from the path above

# Initialise the Twitter client with the keys and keep it for later use.
twitter = Twython(
    consumer_key,
    consumer_secret,
    access_token,
    access_token_secret
)

lcd.lcd_init()
i = datetime.now()              # timestamp captured once, at start-up

GPIO.setwarnings(False)         # disable RPi.GPIO warnings

# Both sensors are DHT22s; they differ only in the pin they are wired to.
DHT_SENSOR = Adafruit_DHT.DHT22
RPI_SENSOR = Adafruit_DHT.DHT22
DHT_PIN = 4
RPI_PIN = 27

# BCM pin numbers of the remaining peripherals.
buzzer = 23
ledR = 17
waterlvl = 25
switch = 16
valve = 22

# NOTE(review): the ThingSpeak write key is committed in source; consider
# loading it from the environment or from the same module as the Twitter keys.
key ="A6IN0O2EUXCW4ER8" #thingspeak.key

GPIO.setmode(GPIO.BCM)
GPIO.setup(waterlvl, GPIO.OUT, initial=GPIO.LOW)
GPIO.setup(buzzer, GPIO.OUT, initial=GPIO.LOW)
GPIO.setup(ledR, GPIO.OUT, initial=GPIO.LOW)
# The valve pin is driven with GPIO.output() later on, so it has to be
# configured as an output too; RPi.GPIO refuses to write to an unconfigured pin.
GPIO.setup(valve, GPIO.OUT, initial=GPIO.LOW)
GPIO.setup(switch, GPIO.IN, pull_up_down=GPIO.PUD_UP)
def buzz(noteFreq, duration):  # buzzer config
    """Drive the buzzer pin with a square wave.

    Args:
        noteFreq: Tone frequency in Hz. A value of zero or less is treated
            as a rest: nothing is played and the call just waits.
        duration: Length of the note (or rest) in seconds.
    """
    if noteFreq <= 0:
        # A zero frequency would divide by zero below; interpret it as silence.
        time.sleep(max(duration, 0))
        return

    halveWaveTime = 1 / (noteFreq * 2)      # half of one wave period, in seconds
    waves = int(duration * noteFreq)        # whole periods that fit in the note
    for _ in range(waves):
        GPIO.output(buzzer, True)
        time.sleep(halveWaveTime)
        GPIO.output(buzzer, False)
        time.sleep(halveWaveTime)
def overheat():  # buzzer tune
    """Play the overheating alarm melody on the buzzer."""
    pitches = [262,294,330,262,262,294,330,262,330,349,392,330,349,392,392,440,392,349,330,262,392,440,392,349,330,262,262,196,262,262,196,262]
    lengths = [0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5,1,0.5,0.5,1,0.25,0.25,0.25,0.25,0.5,0.5,0.25,0.25,0.25,0.25,0.5,0.5,0.5,0.5,1,0.5,0.5,1]
    for pitch, length in zip(pitches, lengths):
        buzz(pitch, length)
        # Pause for a tenth of the note length before the next note.
        time.sleep(length * 0.1)
def wire():  # buzzer tune
    """Play the two-tone wiring/sensor-failure alarm on the buzzer."""
    pitches = [262,330,262,330,262,330,262,330,262,330,262,330]
    lengths = [0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5]
    for pitch, length in zip(pitches, lengths):
        buzz(pitch, length)
        # Pause for a tenth of the note length before the next note.
        time.sleep(length * 0.1)
def update():
    """Tweet the current temperature and humidity as the hourly status.

    Reads the module-level ``temperature`` and ``humidity`` values set by the
    main loop and posts them through the module-level ``twitter`` client.
    """
    # Take the time when the tweet is sent; the start-up timestamp ``i`` never
    # changes, so using it would stamp every tweet with the same time.
    now = datetime.now().strftime('%Y/%m/%d %H:%M:%S ')
    message = now + ("Temp={0:0.1f}C Humidity={1:0.1f}% Your RPI Hourly Update".format(temperature, humidity))
    twitter.update_status(status=message)
    print("Twitter Updated")


# Run update() once every hour (executed by schedule.run_pending()).
schedule.every(1).hour.do(update)
def openValve():
    """Open the valve for two seconds, then close it again.

    Assumes the ``valve`` pin has been configured as a GPIO output.
    """
    GPIO.output(valve, GPIO.HIGH)
    try:
        sleep(2)
    finally:
        # Always close the valve, even if the wait is interrupted.
        GPIO.output(valve, GPIO.LOW)


# Run openValve() every four hours. The plural unit is required here:
# schedule only accepts the singular ``.hour`` with an interval of 1.
schedule.every(4).hours.do(openValve)
OVERHEAT_LIMIT_C = 30   # RPi-sensor temperature above which the alarm is raised


def _show_on_lcd(line_1, line_2):
    """Write one string to each of the two LCD rows."""
    lcd.lcd_byte(lcd.LCD_LINE_1, lcd.LCD_CMD)
    lcd.lcd_string(line_1, 2)
    lcd.lcd_byte(lcd.LCD_LINE_2, lcd.LCD_CMD)
    lcd.lcd_string(line_2, 2)


def _sound_wiring_alarm():
    """Light the red LED while the wiring-failure tune plays."""
    GPIO.output(ledR, GPIO.HIGH)
    wire()
    GPIO.output(ledR, GPIO.LOW)


def _upload_to_thingspeak(fields):
    """POST the given field values to ThingSpeak; report failures, never raise.

    Args:
        fields: Mapping of ThingSpeak field name to reading. Readings that are
            ``None`` are left out instead of being sent as the text "None".
    """
    payload = {name: value for name, value in fields.items() if value is not None}
    payload['key'] = key
    # define parameters
    params = urllib.parse.urlencode(payload)
    # headers configuration/address connection
    headers = {"Content-type": "application/x-www-form-urlencoded", "Accept": "text/plain"}
    # The timeout keeps a stalled network from freezing the monitoring loop.
    conn = http.client.HTTPConnection("api.thingspeak.com:80", timeout=10)
    # connection to thingspeak and send data
    try:
        conn.request("POST", "/update", params, headers)
        response = conn.getresponse()
        print(response.status, response.reason)
        response.read()     # drain the body so the connection finishes cleanly
    except (OSError, http.client.HTTPException):
        print("Failed to connect")
    finally:
        conn.close()


# Main monitoring loop: mirror the float switch onto the water-level output,
# read both DHT22 sensors, show and upload the readings, and raise alarms.
while True:
    GPIO.output(waterlvl, GPIO.input(switch))
    sleep(1)
    humidity, temperature = Adafruit_DHT.read(DHT_SENSOR, DHT_PIN)
    RPIhumid, RPItemp = Adafruit_DHT.read(RPI_SENSOR, RPI_PIN)

    if humidity is None or temperature is None:
        print("Sensor failure. Check wiring.")
        _show_on_lcd("Sensor Failure", "Check Wiring")
        _sound_wiring_alarm()
        time.sleep(3)
        continue

    # Scheduled jobs read ``temperature``/``humidity``, so they only run while
    # a valid reading is available.
    schedule.run_pending()
    print("Temp={0:0.1f}C Humidity={1:0.1f}%".format(temperature, humidity))
    _show_on_lcd("Temp={0:0.1f}C".format(temperature),
                 "Humidity={0:0.1f}%".format(humidity))

    # Re-read with retries before logging the values.
    humidity, temperature = Adafruit_DHT.read_retry(DHT_SENSOR, DHT_PIN)
    RPIhumid, RPItemp = Adafruit_DHT.read_retry(RPI_SENSOR, RPI_PIN)
    _upload_to_thingspeak({
        'field1': temperature,
        'field2': humidity,
        'field3': RPItemp,
        'field4': RPIhumid,
    })

    if RPItemp is None:
        print("Sensor failure. Check wiring.")
        _show_on_lcd("RPI SENSOR", "Not Working")
        _sound_wiring_alarm()
        sleep(2)
        continue

    # Report the RPi sensor's own reading, not the ambient sensor's.
    print("RPI Temp={0:0.1f}C".format(RPItemp))

    if RPItemp > OVERHEAT_LIMIT_C:
        print("OVERHEATING!!!")
        _show_on_lcd("RPI OVERHEATING!", "Tweet Alert")
        GPIO.output(ledR, GPIO.HIGH)
        overheat()  # play music
        # Use the current time so each alert carries its own timestamp.
        now = datetime.now().strftime('%Y/%m/%d %H:%M:%S ')
        message = now + ("RPI IS OVER HEATING!!!Temp={0:0.1f}°C ".format(RPItemp))
        twitter.update_status(status=message)
        print("Tweeted: " + message)
        GPIO.output(ledR, GPIO.LOW)


Comments