From cb7ce3aac43a6099250d41d38a9294bcea978c09 Mon Sep 17 00:00:00 2001 From: Aart van Halteren Date: Fri, 10 Apr 2026 16:16:32 +0200 Subject: [PATCH] Fixed RMS calculation --- README.md | 29 ++++++++++++ boot.py | 11 +++++ config.py | 4 ++ delayedswitch.py | 91 +++++++++++++++++++++++++++++++++++++ main.py | 75 +++++++++++++++++++++++++++++++ urequests.py | 114 +++++++++++++++++++++++++++++++++++++++++++++++ wifi.py | 42 +++++++++++++++++ 7 files changed, 366 insertions(+) create mode 100644 README.md create mode 100644 boot.py create mode 100644 config.py create mode 100644 delayedswitch.py create mode 100644 main.py create mode 100644 urequests.py create mode 100644 wifi.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..5c99430 --- /dev/null +++ b/README.md @@ -0,0 +1,29 @@ +## Flashing Micropython + +Download the latest firmware from +https://micropython.org/download/?port=esp8266 +(ESP8266 with 2MiB+ flash) + +Flash Wemos D1 mini with (note the -fm dout option) + +esptool.py --port /dev/ttyUSB0 --baud 460800 write_flash -fm dout --flash_size=detect 0 esp8266-20220618-v1.19.1.bin + +## Access webrepl + +minicom -D /dev/ttyUSB0 + +Password for webrepl: badkamer + +When the program is running, webrepl is not responsive. Get the prompt >>> with Ctrl+C. + + +## Flashing source code + +ampy --port /dev/ttyUSB0 put . / + +Flash Wemos D1 mini with (note the -fm dout option) + +esptool.py --port /dev/ttyUSB0 --baud 460800 write_flash -fm dout --flash_size=detect 0 esp8266-20220618-v1.19.1.bin + + + diff --git a/boot.py b/boot.py new file mode 100644 index 0000000..3951a1c --- /dev/null +++ b/boot.py @@ -0,0 +1,11 @@ +# Configuration is stored in a separate config.py +from config import WIFI_ESSID, WIFI_PASSWORD, HOSTNAME +from wifi import wifi_connect, disable_wifi_ap +import webrepl + +# setup network +disable_wifi_ap() +wifi_connect(WIFI_ESSID, WIFI_PASSWORD,HOSTNAME) + +# Enabel webrepl +webrepl.start() diff --git a/config.py b/config.py new file mode 100644 index 0000000..b77e51b --- /dev/null +++ b/config.py @@ -0,0 +1,4 @@ +WIFI_ESSID='van-halteren-iot' +WIFI_PASSWORD='M0squ1ttoFh@m' +HOSTNAME = 'badkamer' +DELAY=160000 diff --git a/delayedswitch.py b/delayedswitch.py new file mode 100644 index 0000000..1e76f43 --- /dev/null +++ b/delayedswitch.py @@ -0,0 +1,91 @@ +import machine +import time +from machine import Timer +import urequests as requests + + +# +# Class for handling alarms and calling FHEM +# +class DelayedSwitch: + + def __init__(self,delay,rtc): + self.__delay = delay + self.__alarm = None + self.__is_light_on = False + self.__rtc = rtc + + + def _get_switch_status(self): + # check status of light + r = self._call_ccu3("http://10.0.10.81/addons/red/switch/state") + try: + data = r.json() + self.__is_light_on = data.get('state', False) + except: + print("Error parsing JSON response") + self.__is_light_on = False + r.close() + if self.__is_light_on: + print("Light is already on.") + else: + print("Light is off.") + + # + # Switch of the light through FHEM + def _switch_off_handler(self, alarm): + print("Switching light off") + r = self._call_ccu3("http://10.0.10.81/addons/red/switch/off") + self.__is_light_on = False + # close the response object + r.close() + + # + # Call CCU3 + # + def _call_ccu3(self,url): + try: + r = requests.get(url) + except OSError as oer: + print("Resetting. We received OSSeror: ", oer) + machine.reset() + return r + + def is_light_on(self): + return self.__is_light_on + + def set_delay(self,d): + self.__delay = d + + def absence(self): + # cancel any previous time (if any) + if self.__alarm != None: + print("Canceling previous alarm") + self.__alarm.deinit() + + print("Alarm started to switch off in:", self.__delay) + self.__alarm = Timer(-1) + self.__alarm.init(period=self.__delay, mode=Timer.ONE_SHOT, callback=self._switch_off_handler) + #self.__alarm = Timer.Alarm(self._switch_off_handler, self.__delay, periodic=False) + + + def presence(self): + # cancel any previous time (if any) + if self.__alarm != None: + print("Canceling previous alarm") + self.__alarm.deinit() + + # Check if we light is really on + # (in case somebody manually switched it off) + if self.__is_light_on: + self._get_switch_status() + + # + # Switch light on + # + if not self.__is_light_on: + print("Switching light on") + r = self._call_ccu3("http://10.0.10.81/addons/red/switch/on") + self.__is_light_on = True + # close the response object + r.close() diff --git a/main.py b/main.py new file mode 100644 index 0000000..6d1d7a1 --- /dev/null +++ b/main.py @@ -0,0 +1,75 @@ +import utime +import gc +from delayedswitch import DelayedSwitch +from ntptime import settime +from config import DELAY +from machine import ADC + +import machine + +from machine import Pin +# Wemos D1 pin14 = D5 +d5 = Pin(14, machine.Pin.IN, machine.Pin.PULL_UP) + +# voltage meter ZMPT101B is attached to pin A0 +ad0 = ADC(0) + +def read_voltage(): + nsamples = 50 + total = 0 + sq_total = 0 + for _ in range(nsamples): + v = ad0.read() + total += v + sq_total += v * v + utime.sleep_ms(2) + mean = total / nsamples + return (sq_total / nsamples - mean * mean) ** 0.5 + +# check if voltage is over a threshold +# read_voltage returns between 2-3 when there is no AC input +# read_voltage returns between 123-127 when there is AC input +# it seems 100 is a reasonable threshold +def high_voltage(): + v = read_voltage() + hv = v > 100 + # if hv: + # print("High voltage: ", v) + return hv + +# setup rtc +settime() +rtc = machine.RTC() +utime.sleep_ms(750) +print('\nRTC Set from NTP to UTC:', rtc.datetime()) + +def pir_thread(switch): + pir_on = False + while True: # change this! + utime.sleep_ms(20) + if high_voltage(): + # was PIR on in previous cycle? + if not pir_on: + # print some debug info + if not switch.is_light_on(): + d = "%d %d %d %d, %02d:%02d:%02d" % rtc.datetime()[0:7] + print("Presence detected at: %s." % d) + + switch.presence() + pir_on = True + else: + if pir_on: + d = "%d %d %d %d, %02d:%02d:%02d" % rtc.datetime()[0:7] + print("Absense detected at: %s." % d) + switch.absence() + pir_on = False + + gc.collect() + + +print("Starting pir thread") +switch = DelayedSwitch(DELAY,rtc) # delay in ms +pir_thread(switch) + +# Not reached +print("Done") diff --git a/urequests.py b/urequests.py new file mode 100644 index 0000000..0c4f399 --- /dev/null +++ b/urequests.py @@ -0,0 +1,114 @@ +import usocket + +class Response: + + def __init__(self, f): + self.raw = f + self.encoding = "utf-8" + self._cached = None + + def close(self): + if self.raw: + self.raw.close() + self.raw = None + self._cached = None + + @property + def content(self): + if self._cached is None: + self._cached = self.raw.read() + self.raw.close() + self.raw = None + return self._cached + + @property + def text(self): + return str(self.content, self.encoding) + + def json(self): + import ujson + return ujson.loads(self.content) + + +def request(method, url, data=None, json=None, headers={}, stream=None): + try: + proto, dummy, host, path = url.split("/", 3) + except ValueError: + proto, dummy, host = url.split("/", 2) + path = "" + if proto == "http:": + port = 80 + elif proto == "https:": + import ussl + port = 443 + else: + raise ValueError("Unsupported protocol: " + proto) + + if ":" in host: + host, port = host.split(":", 1) + port = int(port) + + ai = usocket.getaddrinfo(host, port) + addr = ai[0][-1] + s = usocket.socket() + s.connect(addr) + if proto == "https:": + s = ussl.wrap_socket(s, server_hostname=host) + s.write(b"%s /%s HTTP/1.0\r\n" % (method, path)) + if not "Host" in headers: + s.write(b"Host: %s\r\n" % host) + # Iterate over keys to avoid tuple alloc + for k in headers: + s.write(k) + s.write(b": ") + s.write(headers[k]) + s.write(b"\r\n") + if json is not None: + assert data is None + import ujson + data = ujson.dumps(json) + if data: + s.write(b"Content-Length: %d\r\n" % len(data)) + s.write(b"\r\n") + if data: + s.write(data) + + l = s.readline() + protover, status, msg = l.split(None, 2) + status = int(status) + #print(protover, status, msg) + while True: + l = s.readline() + if not l or l == b"\r\n": + break + #print(l) + if l.startswith(b"Transfer-Encoding:"): + if b"chunked" in l: + raise ValueError("Unsupported " + l) + elif l.startswith(b"Location:") and not 200 <= status <= 299: + raise NotImplementedError("Redirects not yet supported") + + resp = Response(s) + resp.status_code = status + resp.reason = msg.rstrip() + return resp + + +def head(url, **kw): + return request("HEAD", url, **kw) + +def get(url, **kw): + return request("GET", url, **kw) + +def post(url, **kw): + return request("POST", url, **kw) + +def put(url, **kw): + return request("PUT", url, **kw) + +def patch(url, **kw): + return request("PATCH", url, **kw) + +def delete(url, **kw): + return request("DELETE", url, **kw) + diff --git a/wifi.py b/wifi.py new file mode 100644 index 0000000..9cb17d3 --- /dev/null +++ b/wifi.py @@ -0,0 +1,42 @@ +import network +import utime + +def wifi_connect(essid, password,hostname): + # Connect to the wifi. Based on the example in the micropython + # documentation. + wlan = network.WLAN(network.STA_IF) + wlan.active(True) + # Set DHCP host name to recognize the device in your router + wlan.config(dhcp_hostname=hostname) + if not wlan.isconnected(): + print('connecting to network ' + essid + '...') + wlan.connect(essid, password) + # connect() appears to be async - waiting for it to complete + while not wlan.isconnected(): + print('waiting for connection...') + utime.sleep(4) + print('checking connection...') + print('Wifi connect successful, network config: %s' % repr(wlan.ifconfig())) + else: + # Note that connection info is stored in non-volatile memory. If + # you are connected to the wrong network, do an explicity disconnect() + # and then reconnect. + print('Wifi already connected, network config: %s' % repr(wlan.ifconfig())) + +def wifi_disconnect(): + # Disconnect from the current network. You may have to + # do this explicitly if you switch networks, as the params are stored + # in non-volatile memory. + wlan = network.WLAN(network.STA_IF) + if wlan.isconnected(): + print("Disconnecting...") + wlan.disconnect() + else: + print("Wifi not connected.") + +def disable_wifi_ap(): + # Disable the built-in access point. + wlan = network.WLAN(network.AP_IF) + wlan.active(False) + print('Disabled access point, network status is %s' % + wlan.status())