Fixed RMS calculation

This commit is contained in:
2026-04-10 16:16:32 +02:00
commit cb7ce3aac4
7 changed files with 366 additions and 0 deletions

29
README.md Normal file
View File

@@ -0,0 +1,29 @@
## Flashing Micropython
Download the latest firmware from
https://micropython.org/download/?port=esp8266
(ESP8266 with 2MiB+ flash)
Flash Wemos D1 mini with (note the -fm dout option)
esptool.py --port /dev/ttyUSB0 --baud 460800 write_flash -fm dout --flash_size=detect 0 esp8266-20220618-v1.19.1.bin
## Access webrepl
minicom -D /dev/ttyUSB0
Password for webrepl: badkamer
When the program is running, webrepl is not responsive. Get the prompt >>> with Ctrl+C.
## Flashing source code
ampy --port /dev/ttyUSB0 put . /
Flash Wemos D1 mini with (note the -fm dout option)
esptool.py --port /dev/ttyUSB0 --baud 460800 write_flash -fm dout --flash_size=detect 0 esp8266-20220618-v1.19.1.bin

11
boot.py Normal file
View File

@@ -0,0 +1,11 @@
# Configuration is stored in a separate config.py
from config import WIFI_ESSID, WIFI_PASSWORD, HOSTNAME
from wifi import wifi_connect, disable_wifi_ap
import webrepl
# setup network
disable_wifi_ap()
wifi_connect(WIFI_ESSID, WIFI_PASSWORD,HOSTNAME)
# Enabel webrepl
webrepl.start()

4
config.py Normal file
View File

@@ -0,0 +1,4 @@
WIFI_ESSID='van-halteren-iot'
WIFI_PASSWORD='M0squ1ttoFh@m'
HOSTNAME = 'badkamer'
DELAY=160000

91
delayedswitch.py Normal file
View File

@@ -0,0 +1,91 @@
import machine
import time
from machine import Timer
import urequests as requests
#
# Class for handling alarms and calling FHEM
#
class DelayedSwitch:
def __init__(self,delay,rtc):
self.__delay = delay
self.__alarm = None
self.__is_light_on = False
self.__rtc = rtc
def _get_switch_status(self):
# check status of light
r = self._call_ccu3("http://10.0.10.81/addons/red/switch/state")
try:
data = r.json()
self.__is_light_on = data.get('state', False)
except:
print("Error parsing JSON response")
self.__is_light_on = False
r.close()
if self.__is_light_on:
print("Light is already on.")
else:
print("Light is off.")
#
# Switch of the light through FHEM
def _switch_off_handler(self, alarm):
print("Switching light off")
r = self._call_ccu3("http://10.0.10.81/addons/red/switch/off")
self.__is_light_on = False
# close the response object
r.close()
#
# Call CCU3
#
def _call_ccu3(self,url):
try:
r = requests.get(url)
except OSError as oer:
print("Resetting. We received OSSeror: ", oer)
machine.reset()
return r
def is_light_on(self):
return self.__is_light_on
def set_delay(self,d):
self.__delay = d
def absence(self):
# cancel any previous time (if any)
if self.__alarm != None:
print("Canceling previous alarm")
self.__alarm.deinit()
print("Alarm started to switch off in:", self.__delay)
self.__alarm = Timer(-1)
self.__alarm.init(period=self.__delay, mode=Timer.ONE_SHOT, callback=self._switch_off_handler)
#self.__alarm = Timer.Alarm(self._switch_off_handler, self.__delay, periodic=False)
def presence(self):
# cancel any previous time (if any)
if self.__alarm != None:
print("Canceling previous alarm")
self.__alarm.deinit()
# Check if we light is really on
# (in case somebody manually switched it off)
if self.__is_light_on:
self._get_switch_status()
#
# Switch light on
#
if not self.__is_light_on:
print("Switching light on")
r = self._call_ccu3("http://10.0.10.81/addons/red/switch/on")
self.__is_light_on = True
# close the response object
r.close()

75
main.py Normal file
View File

@@ -0,0 +1,75 @@
import utime
import gc
from delayedswitch import DelayedSwitch
from ntptime import settime
from config import DELAY
from machine import ADC
import machine
from machine import Pin
# Wemos D1 pin14 = D5
d5 = Pin(14, machine.Pin.IN, machine.Pin.PULL_UP)
# voltage meter ZMPT101B is attached to pin A0
ad0 = ADC(0)
def read_voltage():
nsamples = 50
total = 0
sq_total = 0
for _ in range(nsamples):
v = ad0.read()
total += v
sq_total += v * v
utime.sleep_ms(2)
mean = total / nsamples
return (sq_total / nsamples - mean * mean) ** 0.5
# check if voltage is over a threshold
# read_voltage returns between 2-3 when there is no AC input
# read_voltage returns between 123-127 when there is AC input
# it seems 100 is a reasonable threshold
def high_voltage():
v = read_voltage()
hv = v > 100
# if hv:
# print("High voltage: ", v)
return hv
# setup rtc
settime()
rtc = machine.RTC()
utime.sleep_ms(750)
print('\nRTC Set from NTP to UTC:', rtc.datetime())
def pir_thread(switch):
pir_on = False
while True: # change this!
utime.sleep_ms(20)
if high_voltage():
# was PIR on in previous cycle?
if not pir_on:
# print some debug info
if not switch.is_light_on():
d = "%d %d %d %d, %02d:%02d:%02d" % rtc.datetime()[0:7]
print("Presence detected at: %s." % d)
switch.presence()
pir_on = True
else:
if pir_on:
d = "%d %d %d %d, %02d:%02d:%02d" % rtc.datetime()[0:7]
print("Absense detected at: %s." % d)
switch.absence()
pir_on = False
gc.collect()
print("Starting pir thread")
switch = DelayedSwitch(DELAY,rtc) # delay in ms
pir_thread(switch)
# Not reached
print("Done")

114
urequests.py Normal file
View File

@@ -0,0 +1,114 @@
import usocket
class Response:
def __init__(self, f):
self.raw = f
self.encoding = "utf-8"
self._cached = None
def close(self):
if self.raw:
self.raw.close()
self.raw = None
self._cached = None
@property
def content(self):
if self._cached is None:
self._cached = self.raw.read()
self.raw.close()
self.raw = None
return self._cached
@property
def text(self):
return str(self.content, self.encoding)
def json(self):
import ujson
return ujson.loads(self.content)
def request(method, url, data=None, json=None, headers={}, stream=None):
try:
proto, dummy, host, path = url.split("/", 3)
except ValueError:
proto, dummy, host = url.split("/", 2)
path = ""
if proto == "http:":
port = 80
elif proto == "https:":
import ussl
port = 443
else:
raise ValueError("Unsupported protocol: " + proto)
if ":" in host:
host, port = host.split(":", 1)
port = int(port)
ai = usocket.getaddrinfo(host, port)
addr = ai[0][-1]
s = usocket.socket()
s.connect(addr)
if proto == "https:":
s = ussl.wrap_socket(s, server_hostname=host)
s.write(b"%s /%s HTTP/1.0\r\n" % (method, path))
if not "Host" in headers:
s.write(b"Host: %s\r\n" % host)
# Iterate over keys to avoid tuple alloc
for k in headers:
s.write(k)
s.write(b": ")
s.write(headers[k])
s.write(b"\r\n")
if json is not None:
assert data is None
import ujson
data = ujson.dumps(json)
if data:
s.write(b"Content-Length: %d\r\n" % len(data))
s.write(b"\r\n")
if data:
s.write(data)
l = s.readline()
protover, status, msg = l.split(None, 2)
status = int(status)
#print(protover, status, msg)
while True:
l = s.readline()
if not l or l == b"\r\n":
break
#print(l)
if l.startswith(b"Transfer-Encoding:"):
if b"chunked" in l:
raise ValueError("Unsupported " + l)
elif l.startswith(b"Location:") and not 200 <= status <= 299:
raise NotImplementedError("Redirects not yet supported")
resp = Response(s)
resp.status_code = status
resp.reason = msg.rstrip()
return resp
def head(url, **kw):
return request("HEAD", url, **kw)
def get(url, **kw):
return request("GET", url, **kw)
def post(url, **kw):
return request("POST", url, **kw)
def put(url, **kw):
return request("PUT", url, **kw)
def patch(url, **kw):
return request("PATCH", url, **kw)
def delete(url, **kw):
return request("DELETE", url, **kw)

42
wifi.py Normal file
View File

@@ -0,0 +1,42 @@
import network
import utime
def wifi_connect(essid, password,hostname):
# Connect to the wifi. Based on the example in the micropython
# documentation.
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
# Set DHCP host name to recognize the device in your router
wlan.config(dhcp_hostname=hostname)
if not wlan.isconnected():
print('connecting to network ' + essid + '...')
wlan.connect(essid, password)
# connect() appears to be async - waiting for it to complete
while not wlan.isconnected():
print('waiting for connection...')
utime.sleep(4)
print('checking connection...')
print('Wifi connect successful, network config: %s' % repr(wlan.ifconfig()))
else:
# Note that connection info is stored in non-volatile memory. If
# you are connected to the wrong network, do an explicity disconnect()
# and then reconnect.
print('Wifi already connected, network config: %s' % repr(wlan.ifconfig()))
def wifi_disconnect():
# Disconnect from the current network. You may have to
# do this explicitly if you switch networks, as the params are stored
# in non-volatile memory.
wlan = network.WLAN(network.STA_IF)
if wlan.isconnected():
print("Disconnecting...")
wlan.disconnect()
else:
print("Wifi not connected.")
def disable_wifi_ap():
# Disable the built-in access point.
wlan = network.WLAN(network.AP_IF)
wlan.active(False)
print('Disabled access point, network status is %s' %
wlan.status())