Voici le fichier du programme principal. Comme nous avions des plantages système de manière aléatoire, nous avons ajouté un deuxième programme, que je transmets dans le prochain message.
#!/usr/bin/python3
import os
import time
import requests
import urllib.parse
import board
import busio
import adafruit_ds3231
from datetime import datetime, timedelta
from math import *
from picamera2 import Picamera2
from ina219 import INA219, DeviceRangeError
import logging
import logging.handlers
import subprocess
# --- Logging: every record goes to a rotating file and to the console. ---
fileLog = '/home/pi/script/log.txt'
logger = logging.getLogger('mylogger')
logger.setLevel(logging.DEBUG)
# Rotate once the file reaches 100 000 000 bytes, keeping one backup.
handler = logging.handlers.RotatingFileHandler(fileLog, maxBytes=100000000, backupCount=1)
stream = logging.StreamHandler()
stream.setLevel(logging.DEBUG)
logger.addHandler(stream)
logger.addHandler(handler)

# --- DS3231 real-time clock on the I2C bus. ---
i2c = busio.I2C(board.SCL, board.SDA)
rtc = adafruit_ds3231.DS3231(i2c)
# Enable both alarm interrupts and reset both alarm status flags.
rtc.alarm1_interrupt = True
rtc.alarm1_status = False
rtc.alarm2_interrupt = True
rtc.alarm2_status = False

# Copy the RTC time into the system clock before anything is timestamped.
now_ds3231 = rtc.datetime
cmd = f"sudo date -s '{now_ds3231.tm_year}-{now_ds3231.tm_mon:02d}-{now_ds3231.tm_mday:02d} {now_ds3231.tm_hour:02d}:{now_ds3231.tm_min:02d}:{now_ds3231.tm_sec:02d}'"
os.system(cmd)

# --- Per-device settings stored on the boot partition. ---
with open('/boot/id_materiel.txt') as f:
    id_materiel = f.read().strip()
with open('/boot/portssh.txt') as f:
    portssh = f.read().strip()
# ------------ START UTILS ------------
def log(type, message):
    """Write a timestamped line to the shared logger (console and file).

    Args:
        type: Severity label ("INFO", "ERROR", ...). It is only embedded in
            the text; every record is emitted at INFO level.
        message: Text to record. Non-string values are accepted and
            formatted with ``str()``.
    """
    timestamp = datetime.today().strftime('%Y-%m-%d %H:%M:%S')
    # An f-string (rather than "+") keeps a non-str argument from raising
    # TypeError in the middle of an error path.
    logger.info(f"{timestamp} - {type} - {message}")
def isInternetUp():
    """Report whether the NetworkManager connection named "gsm" is activated.

    This inspects the connection state reported by nmcli; it does not probe
    a remote host.

    Returns:
        True when nmcli reports the state "activated", False otherwise
        (including when the check itself fails).
    """
    try:
        check = subprocess.getoutput("sudo nmcli -f GENERAL.STATE con show gsm | awk '{print $2}'")
        if check == "activated":
            print("activated")
            return True
        print("not activated")
        return False
    except Exception as e:
        log("error", "no internet : " + str(e))
        return False
def syncTime():
    """Set the system clock and the RTC from an NTP server.

    Returns:
        True when both the system clock command was issued and the RTC was
        written, False when the NTP query or the RTC write failed.
    """
    try:
        # Imported here so a missing ntplib is logged instead of preventing
        # the whole script from starting.
        import ntplib
        client = ntplib.NTPClient()
        response = client.request('pool.ntp.org')
        ntpTime = time.localtime(response.tx_time)
        # "sudo" for consistency with the other privileged commands in this
        # script; setting the clock fails for an unprivileged user.
        os.system('sudo date ' + time.strftime('%m%d%H%M%Y.%S', ntpTime))
    except Exception as e:
        log("ERROR", "syncTime() : Can't sync with time server : " + str(e))
        return False

    try:
        rtc.datetime = ntpTime
        log("INFO", "New date set into system & RTC : " + time.strftime('%y/%m/%d %u %H:%M:%S', ntpTime))
    except Exception as e:
        log("ERROR", "syncTime - Write RTC : " + str(e))
        return False
    return True
def resetConnection():
    """Delete and recreate the NetworkManager "gsm" connection.

    The connection is rebuilt on ttyUSB0 with IPv6 disabled, autoconnect
    enabled and the APN, username and password all set to "orange".
    """
    setupCommands = (
        "sudo nmcli con add con-name gsm connection.type gsm ifname ttyUSB0",
        "sudo nmcli con modify gsm ipv6.method disabled",
        "sudo nmcli con modify gsm connection.autoconnect yes",
        "sudo nmcli con modify gsm gsm.apn orange",
        "sudo nmcli con modify gsm gsm.username orange",
        "sudo nmcli con modify gsm gsm.password orange",
    )
    try:
        # Best effort: the delete fails harmlessly when the connection does
        # not exist yet, so its exit status is not checked.
        os.system("sudo nmcli con delete gsm")
        for command in setupCommands:
            # os.system() reports failure through its return value, not an
            # exception, so the status must be checked explicitly.
            status = os.system(command)
            if status != 0:
                log("WARNING", "resetConnection() : exit status " + str(status) + " for : " + command)
    except Exception as e:
        log("ERROR", "resetConnection() : Can't reset connection : " + str(e))
def dec2bcd(dec):
    """Encode a non-negative decimal value as packed BCD.

    The tens go in the upper nibble and the units in the lower nibble,
    e.g. 59 -> 0x59. ``dec`` may be anything ``int()`` accepts.
    """
    # Integer divmod avoids the float round-trip of int(x / 10).
    tens, units = divmod(int(dec), 10)
    return tens * 16 + units
def bcd2dec(bcd):
    """Decode a packed-BCD byte (e.g. 0x59) into its decimal value (59)."""
    return 10 * (bcd >> 4) + (bcd & 0x0F)
def getUPS():
    """Read the UPS (INA219 at I2C address 0x45) as a query-string fragment.

    Returns:
        A string of the form "&current=..&power=..&voltage=.." ready to be
        appended to the API URL, or False when the sensor cannot be read.
        The power reading is divided by 1000 before formatting.
        NOTE(review): units are presumably mA / W / V per the ina219
        library - confirm.
    """
    try:
        # Construction and configuration already talk to the I2C bus, so
        # they sit inside the try: a missing UPS must not cancel the
        # caller's API request.
        ina = INA219(shunt_ohms=0.1, address=0x45)
        ina.configure()
        v = ina.voltage()
        i = ina.current()
        p = ina.power()
        # The original specs "0.1f" and "0.01f" both render one decimal
        # place; ".1f" produces the same output.
        return f"&current={i:.1f}&power={p / 1000:.1f}&voltage={v:.1f}"
    except Exception as e:  # includes DeviceRangeError
        log("ERROR", "getUPS() : " + str(e))
        return False
def writeWitty(date):
    """Program the RTC alarms that schedule the next start-up.

    Alarm 1 is set to ``date`` with the "monthly" match mode. Alarm 2 is
    always set to 08:00:00 with the "daily" match mode.

    Args:
        date: datetime of the next planned start-up.

    Returns:
        True when both alarms were written, False if writing raised.
    """
    try:
        alarm1_struct = time.localtime(time.mktime(date.timetuple()))
        # Only the time of day matters for a "daily" alarm; the date part
        # of this tuple is a placeholder.
        alarm2_struct = time.localtime(time.mktime((2024, 1, 1, 8, 0, 0, 1, 1, 0)))
        rtc.alarm1 = (alarm1_struct, "monthly")
        rtc.alarm2 = (alarm2_struct, "daily")
        log("INFO", "Next startup scheduled at " + date.strftime('%Y-%m-%d %H:%M:%S'))
        return True
    except Exception as e:
        log("ERROR", "schedueNextStartup - Write RTC : " + str(e))
        return False
def readWritty(date):
    """Check that RTC alarm 1 holds the expected start-up time.

    Only the day of month and the time of day are compared.

    Args:
        date: datetime that was previously written to alarm 1.

    Returns:
        True when alarm 1 matches ``date``, False on mismatch or read error.
    """
    try:
        dateAlarm = rtc.alarm1[0]
        dateAlarm1 = datetime(dateAlarm.tm_year, dateAlarm.tm_mon, dateAlarm.tm_mday,
                              dateAlarm.tm_hour, dateAlarm.tm_min, dateAlarm.tm_sec)
        stored = dateAlarm1.strftime('%d %H:%M:%S')
        expected = date.strftime('%d %H:%M:%S')
        if stored != expected:
            log("WARNING", "Ecriture Witty : Date different : " + stored + " au lieu de " + expected)
            return False
        return True
    except Exception as e:
        log("ERROR", "scheduleNextStartup - Read RTC : " + str(e))
        return False
def initSshReverse():
    """Start a background autossh reverse tunnel to the device's SSH port.

    Remote port ``portssh`` on the server is forwarded to 127.0.0.1:22.
    """
    try:
        log("INFO", "Init SSH reverse")
        # An argv list avoids the nested shell quoting of the original
        # command string. "-f" makes autossh put itself in the background,
        # so this call returns once it has forked.
        command = [
            "autossh", "-M", "0",
            "-o", "ServerAliveInterval 30",
            "-o", "ServerAliveCountMax 3",
            "-N", "-f",
            "-o", "StrictHostKeyChecking=accept-new",
            "-i", "/home/pi/.ssh/id_rsa",
            "-R", portssh + ":127.0.0.1:22",
            "-p", "PORTSSH",
            "USER@IPSERVER",
        ]
        result = subprocess.run(command, check=False)
        if result.returncode != 0:
            log("ERROR", "initSshReverse : autossh exit status " + str(result.returncode))
    except Exception as e:
        log("ERROR", "initSshReverse : " + str(e))
# ------------ END UTILS ------------
# API field -> local file caching the last known value of that field.
_SCHEDULE_FILES = {
    'heureDebut': '/home/pi/script/heure_debut.txt',
    'heureFin': '/home/pi/script/heure_fin.txt',
    'intervalle': '/home/pi/script/interval.txt',
    'nbPictureRotate': '/home/pi/script/nb_photo_rotate.txt',
}
# Upper bound on write/read-back cycles so a persistent RTC mismatch cannot
# keep the device awake forever.
_MAX_RTC_WRITE_ATTEMPTS = 5


def _parseHourMinute(value):
    """Split an "HH:MM" string into integer (hour, minute)."""
    parts = value.strip().split(":")
    return int(parts[0]), int(parts[1])


def _readScheduleFile(path):
    """Return the stripped content of one cached schedule file."""
    with open(path) as f:
        return f.read().strip()


def _computeNextStartup(dateNow, heureDebut, heureFin, interval):
    """Return the next start-up datetime inside the daily working window.

    Start-ups fall on ``heureDebut + k * interval`` minutes, no later than
    ``heureFin``; Saturdays and Sundays are skipped to Monday at
    ``heureDebut``.
    """
    if interval <= 0:
        raise ValueError("interval must be a positive number of minutes, got " + str(interval))
    startHour, startMinute = _parseHourMinute(heureDebut)
    endHour, endMinute = _parseHourMinute(heureFin)
    dateDebut = dateNow.replace(hour=startHour, minute=startMinute, second=0, microsecond=0)
    dateFin = dateNow.replace(hour=endHour, minute=endMinute, second=0, microsecond=0)
    # Window bounds are compared at minute resolution, as the original
    # '%H:%M' string comparison did.
    nowMinute = dateNow.replace(second=0, microsecond=0)

    if nowMinute >= dateFin:
        # Window is over for today: first slot tomorrow.
        dateNext = dateDebut + timedelta(days=1)
    elif nowMinute < dateDebut:
        # Window has not opened yet: first slot today.
        dateNext = dateDebut
    else:
        # Inside the window: next multiple of the interval after now.
        elapsed = (dateNow - dateDebut).total_seconds()
        nbIteration = ceil(elapsed / (interval * 60))
        dateNext = dateDebut + timedelta(minutes=interval * nbIteration)
        if dateNext <= dateNow:
            # Exactly on a slot boundary: an alarm set to "now" would be missed.
            dateNext += timedelta(minutes=interval)
        # Comparing full datetimes also catches a slot that rolled past midnight.
        if dateNext > dateFin:
            dateNext = dateDebut + timedelta(days=1)

    weekday = dateNext.weekday()  # Monday == 0 ... Sunday == 6
    if weekday >= 5:
        dateNext = (dateNext + timedelta(days=7 - weekday)).replace(
            hour=startHour, minute=startMinute, second=0, microsecond=0)
    return dateNext


def scheduleNextStartup():
    """Refresh the schedule from the API (when online) and arm the RTC alarm.

    When ``internetUp`` is true, the API is called (with the UPS readings
    when available) and its answer is cached in the files under
    /home/pi/script/. The next start-up is then computed from those cached
    files, so the device can still reschedule itself while offline.
    NOTE(review): heureDebut/heureFin are assumed to be "HH:MM" strings and
    intervalle a number of minutes - confirm against the API.
    """
    if internetUp:
        try:
            dataUPS = getUPS()
            # getUPS() returns False on failure; the request is then sent
            # without UPS data.
            url = 'URLAPI' + id_materiel + (dataUPS or '')
            # Without a timeout a stalled GSM link would block forever and
            # the device would never shut down.
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            data = response.json()
            # Extract every field BEFORE opening any file for writing: a
            # missing key must not leave truncated cache files behind.
            settings = {path: str(data[key]) for key, path in _SCHEDULE_FILES.items()}
            for path, value in settings.items():
                with open(path, 'w') as f:
                    f.write(value)
        except Exception as e:
            log("ERROR", "scheduleNextStartup - Get API & Write files : " + str(e))

    try:
        heureDebut = _readScheduleFile(_SCHEDULE_FILES['heureDebut'])
        heureFin = _readScheduleFile(_SCHEDULE_FILES['heureFin'])
        interval = int(_readScheduleFile(_SCHEDULE_FILES['intervalle']))
        dateNext = _computeNextStartup(datetime.now(), heureDebut, heureFin, interval)
    except Exception as e:
        # Unusable cached schedule: keep whatever alarms are already in the
        # RTC and let the rest of the run (picture, upload) proceed.
        log("ERROR", "scheduleNextStartup - Compute next startup : " + str(e))
        return

    for _ in range(_MAX_RTC_WRITE_ATTEMPTS):
        if not writeWitty(dateNext):
            return  # write failed and was already logged
        if readWritty(dateNext):
            return  # alarm verified
    log("ERROR", "scheduleNextStartup - RTC alarm still wrong after "
        + str(_MAX_RTC_WRITE_ATTEMPTS) + " attempts")
def takeNewPicture():
    """Capture one 4056x3040 still into /home/pi/photos/<timestamp>.jpg.

    Any camera error is logged and swallowed so the rest of the run
    (upload, shutdown) still happens.
    """
    picam2 = None
    try:
        # Created inside the try so that a camera that fails to initialise
        # is logged here instead of aborting the caller.
        picam2 = Picamera2()
        StringTimeNow = datetime.today().strftime('%Y-%m-%d_%H-%M-%S')
        camera_config = picam2.create_still_configuration(
            main={"size": (4056, 3040)},
            lores={"size": (640, 480)},
            display="lores",
        )
        picam2.configure(camera_config)
        picam2.start()
        # Pause between start and capture
        # (presumably to let exposure / white balance settle - confirm).
        time.sleep(2)
        picam2.capture_file("/home/pi/photos/" + StringTimeNow + ".jpg")
    except Exception as e:
        log("ERROR", "takeNewPicture : " + str(e))
    finally:
        if picam2 is not None:
            try:
                picam2.stop()
                picam2.close()
            except Exception as e:
                log("ERROR", "takeNewPicture - release camera : " + str(e))
def handleUploadRotate():
    """Upload the stored pictures in one request, then delete them locally.

    Nothing happens until the number of .jpg files in /home/pi/photos/
    reaches the threshold stored in nb_photo_rotate.txt. Files are removed
    only after the API answers HTTP 200 with ``success == True``; otherwise
    the failure is logged and reported to the API.
    """
    from contextlib import ExitStack

    dir_path = r'/home/pi/photos/'
    try:
        with open('/home/pi/script/nb_photo_rotate.txt') as f:
            nbFileRotate = int(f.read().strip())

        # List the directory once; the same snapshot is used for counting,
        # uploading and deleting.
        filesName = sorted(
            entry for entry in os.listdir(dir_path)
            if os.path.isfile(os.path.join(dir_path, entry)) and entry.endswith('.jpg')
        )
        nbFile = len(filesName)
        if nbFile < nbFileRotate:
            return
        if not filesName:
            log("ERROR", "No file in handleUploadRotate")
            return

        # ExitStack closes every picture file once the request is done,
        # whether it succeeded or raised.
        with ExitStack() as stack:
            files = [
                ('pictures[]', (fileName, stack.enter_context(open(dir_path + fileName, 'rb')), 'image/jpeg'))
                for fileName in filesName
            ]
            # (connect, read) timeouts: a stalled link must not keep the
            # device powered indefinitely.
            response = requests.post('URLAPI', data={'camera': id_materiel}, files=files, timeout=(30, 300))

        if response.status_code == 200 and response.json()['success'] == True:
            log("INFO", "handleUploadRotate - Upload success")
            for fileName in filesName:
                os.remove(dir_path + fileName)
            log("INFO", "handleUploadRotate - Files removed")
        else:
            # str() is required: both values are ints.
            message = 'nbfiles:' + str(nbFile) + ',statuscode:' + str(response.status_code)
            # Log first so the failure is recorded even if the report
            # request below fails too.
            log("ERROR", "handleUploadRotate - API answer bad : " + message + " - " + response.text[:200])
            requests.get('URLAPI' + id_materiel + '&message=' + urllib.parse.quote(message), timeout=30)
    except Exception as e:
        log("ERROR", "handleUploadRotate : " + str(e))
def handleShutdown():
    """Power the device off, but only when /home/pi/shutdown.txt exists.

    The marker file acts as a switch: while it is absent the device stays
    on after the script ends.
    """
    if os.path.exists("/home/pi/shutdown.txt"):
        os.system("sudo shutdown -h now")
    else:
        print("No shutdown file, skip shutdown")
# ------------ START SCRIPT ------------
# Main sequence: bring the modem up, sync the clock, schedule the next
# start-up, take a picture, upload when online, then shut down.
try:
    attempts = 0
    attemptsModem = 0
    # Module-level on purpose: scheduleNextStartup() reads this flag.
    internetUp = False

    # Start from a clean state, then load the USB serial drivers for the
    # modem (USB id 3566:2001).
    os.system("sudo nmcli con delete gsm")
    os.system("sudo modprobe usbserial vendor=0x3566 product=0x2001; sudo usb_modeswitch -v 3566 -p 2001 -X; sudo modprobe option;")

    # Wait (up to 50 checks, 2 s apart) for the modem to be registered.
    modemStateCmd = "sudo mmcli -m 0 | grep ' state' | awk '{print $3}'"
    checkModem = subprocess.getoutput(modemStateCmd)
    while attemptsModem < 50 and checkModem != "registered":
        log("INFO", "Modem not registered, retry in 2 seconds")
        time.sleep(2)
        checkModem = subprocess.getoutput(modemStateCmd)
        attemptsModem += 1

    # Up to 11 checks; after each failed check the connection is rebuilt
    # and given 10 s to come up.
    while attempts < 11 and not internetUp:
        internetUp = isInternetUp()
        if not internetUp:
            log("INFO", "Internet not up, retry in 10 seconds")
            resetConnection()
            time.sleep(10)
        attempts += 1

    if not internetUp:
        log("INFO", "Device don't have internet, do process without trying to access cloud")
    else:
        log("INFO", "Device have internet")
        syncTime()
        initSshReverse()

    scheduleNextStartup()
    takeNewPicture()
    if internetUp:
        handleUploadRotate()
    else:
        log("INFO", "No internet, skip handleUploadRotate")
except Exception as e:
    log("ERROR", "Main : " + str(e))

# Always reached, even after a failure above, so the device can power off.
handleShutdown()