#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Poll a Huawei HiLink device for unread SMS and forward them.

Every 30 seconds the script checks the device at ``GW_IP``, unlocks the SIM
with ``PIN`` when the PIN status endpoint reports state 260, and, if there
are unread messages, reads the SMS list, appends each message to
``data/sms.json`` and ``data/sms.txt`` next to this file, sends a Pushbullet
note, POSTs the message as JSON to https://sms.cabillot.eu and finally
deletes the messages from the device.

Environment variables read: ``PB_APIKEY``, ``PIN``, ``GW_IP``.
"""
import json
import os
import sys
import time
from xml.parsers.expat import ExpatError

import requests
import xmltodict
from pushbullet import Pushbullet

pb = Pushbullet(os.environ['PB_APIKEY'])
pin = os.environ['PIN']

# NOTE(review): the three request templates below contain no XML markup,
# only bare values. The markup may have been stripped when this file was
# extracted -- confirm the payloads against the device API before relying
# on them. The literals are kept exactly as found.
PIN_ENTER_TEMPLATE = ''' 0 ''' + pin + ''' '''
SMS_LIST_TEMPLATE = ''' 1 20 1 0 0 0 '''
SMS_DEL_TEMPLATE = '{index}'
# Unused template kept for reference (was disabled in the original file):
# SMS_SEND_TEMPLATE = ''' -1 {phone} {content} {length} 1 {timestamp} '''

# Directory containing this script; the ``data`` folder is created below it.
__location__ = os.path.realpath(
    os.path.join(os.getcwd(), os.path.dirname(__file__)))


def isHilink(device_ip):
    """Return True if ``/api/device/information`` answers with HTTP 200.

    Any ``requests`` error (connection refused, timeout, ...) is treated as
    "no device" and yields False.
    """
    try:
        r = requests.get(url='http://' + device_ip + '/api/device/information',
                         timeout=(2.0, 2.0))
    except requests.exceptions.RequestException:
        return False
    return r.status_code == 200


def getHeaders(device_ip):
    """Fetch a session/token pair and return it as a request-header dict.

    The result always has the keys ``__RequestVerificationToken`` and
    ``Cookie``. A value is None when the request failed, the body was not
    valid XML, or the corresponding element was missing from the response,
    so the return value can always be passed as ``headers=``.
    """
    headers = {'__RequestVerificationToken': None, 'Cookie': None}
    try:
        r = requests.get(
            url='http://' + device_ip + '/api/webserver/SesTokInfo')
    except requests.exceptions.RequestException:
        return headers

    try:
        d = xmltodict.parse(r.text, xml_attribs=True)
    except ExpatError:
        # Body was not well-formed XML: report "no token" instead of failing.
        return headers

    response = d.get('response') if isinstance(d, dict) else None
    if isinstance(response, dict):
        headers['__RequestVerificationToken'] = response.get('TokInfo')
        headers['Cookie'] = response.get('SesInfo')
    return headers


def getSMS(device_ip, headers):
    """Read the SMS list from the device.

    Returns a tuple ``(messages, messagesR)``: ``messages`` holds the
    formatted text lines built by ``getContent`` and ``messagesR`` the raw
    message dicts parsed from the response. Both are empty lists when the
    response contains no message.
    """
    r = requests.post(url='http://' + device_ip + '/api/sms/sms-list',
                      data=SMS_LIST_TEMPLATE, headers=headers)
    d = xmltodict.parse(r.text, xml_attribs=True)

    # xmltodict yields None for an empty element, a dict for a single child
    # and a list for repeated children; normalise all three to a list.
    container = d['response']['Messages']
    messagesR = container.get('Message') if container else None
    if messagesR is None:
        messagesR = []
    elif not isinstance(messagesR, list):
        messagesR = [messagesR]

    # Use the real payload length so both returned lists always line up.
    messages = getContent(messagesR, len(messagesR))
    return messages, messagesR


def statusPin(device_ip, headers):
    """Return the integer ``SimState`` reported by ``/api/pin/status``."""
    r = requests.get(url='http://' + device_ip + '/api/pin/status',
                     headers=headers)
    d = xmltodict.parse(r.text, xml_attribs=True)
    return int(d['response']['SimState'])


def enterPin(pin, device_ip, headers):
    """POST the PIN payload to ``/api/pin/operate`` and print the response.

    The ``pin`` argument is not used: the payload is ``PIN_ENTER_TEMPLATE``,
    which already embeds the module-level ``pin`` value.
    """
    r = requests.post(url='http://' + device_ip + '/api/pin/operate',
                      data=PIN_ENTER_TEMPLATE, headers=headers)
    d = xmltodict.parse(r.text, xml_attribs=True)
    print(d['response'])


def getContent(data, numMessages):
    """Format the first ``numMessages`` entries of ``data`` as text lines.

    Each entry must provide the keys ``Phone``, ``Content`` and ``Date``.
    """
    messages = []
    for i in range(numMessages):
        message = data[i]
        messages.append('Message from ' + message['Phone'] + ' recieved '
                        + message['Date'] + ' : ' + str(message['Content']))
    return messages


def delMessage(device_ip, headers, ind):
    """POST a delete request for message ``ind`` and print the response."""
    r = requests.post(url='http://' + device_ip + '/api/sms/delete-sms',
                      data=SMS_DEL_TEMPLATE.format(index=ind),
                      headers=headers)
    d = xmltodict.parse(r.text, xml_attribs=True)
    print(d['response'])


def getUnread(device_ip, headers):
    """Return the integer ``UnreadMessage`` count from check-notifications."""
    r = requests.get(
        url='http://' + device_ip + '/api/monitoring/check-notifications',
        headers=headers)
    d = xmltodict.parse(r.text, xml_attribs=True)
    return int(d['response']['UnreadMessage'])


def _process_unread(device_ip, headers):
    """Archive, forward and then delete the messages listed by the device."""
    messages, messagesR = getSMS(device_ip, headers)

    data_dir = os.path.join(__location__, 'data')
    os.makedirs(data_dir, exist_ok=True)
    json_path = os.path.join(data_dir, "sms.json")
    text_path = os.path.join(data_dir, "sms.txt")

    # Load the existing archive; start from scratch when the file is
    # missing, unreadable or does not contain valid JSON.
    try:
        with open(json_path, 'r') as f1:
            data = json.load(f1)
    except (OSError, ValueError):
        data = []

    with open(text_path, 'a') as f3:
        for text, raw in zip(messages, messagesR):
            # JSON archive
            print('# Log JSON')
            data.append(raw)
            # Plain-text log
            f3.write(text + '\n')
            # Pushbullet
            print('# Notif pushbullet')
            pb.push_note("SMS", "From: %s\nDate: %s\n%s"
                         % (raw['Phone'], raw['Date'], raw['Content']))
            # HTTP log
            print('# Notif HTTP')
            datam = {'sender': raw['Phone'],
                     'date': raw['Date'],
                     'content': raw['Content']}
            requests.post(url="https://sms.cabillot.eu",
                          headers={'Content-Type': 'application/json'},
                          json=datam)

        # Save the updated archive
        with open(json_path, 'w') as f2:
            json.dump(data, f2)

    # Delete from the device; fresh headers are fetched before every delete,
    # as the original code did.
    for raw in messagesR:
        headers = getHeaders(device_ip)
        delMessage(device_ip, headers, raw['Index'])


def main():
    """Run the polling loop forever; exit with -1 if no device answers."""
    while True:
        device_ip = os.environ['GW_IP']
        if not isHilink(device_ip):
            print("Can't find a Huawei HiLink device on " + device_ip
                  + ", exit")
            print('')
            sys.exit(-1)

        headers = getHeaders(device_ip)

        # The PIN has to be entered to unlock the device
        if statusPin(device_ip, headers) == 260:
            print('# Configure PIN')
            enterPin(os.environ['PIN'], device_ip, headers)

        unread = getUnread(device_ip, headers)
        print('# Unread message(s) : %d' % unread)
        if unread != 0:
            _process_unread(device_ip, headers)

        time.sleep(30)


if __name__ == "__main__":
    main()