Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
Micropython
#1
Can I program the KC868-A6 PLC with micropython?
Reply
#2
sure, ESP32 support program code by micopython.
Reply
#3
I have some Micropython code working with the KC868-A6.  For some reason the clock can be set and the time read back fine with Micropython when it can't be set reliably with any of the Arduino code I've tested on this board.

This is a simple get/set time script. I tried various other more common get/set time methods without success but this one works well. To set the time remove the # before this line in the code below  -  #await set_time(i2c, year, month, day, hour, minute, second), then enter the current time in the section below this (# Set initial time using variables) and run the script. To read the time replace the # and run the script. 


Code:
from machine import Pin, SoftI2C
import time
import uasyncio as asyncio  # Use uasyncio for MicroPython

# 7-bit I2C address of the DS1307 real-time clock; set_time() and get_time()
# address every RTC transfer to it.
DS1307_ADDRESS = 0x68

def bcd_to_decimal(bcd):
    """Convert one packed-BCD byte to an integer.

    The high nibble is the tens digit and the low nibble the units digit,
    so 0x59 becomes 59.
    """
    tens = bcd >> 4
    units = bcd & 0x0F
    return tens * 10 + units

def decimal_to_bcd(decimal):
    """Pack an integer 0-99 into one BCD byte (tens in the high nibble)."""
    tens, units = divmod(decimal, 10)
    return (tens << 4) | units

async def safe_writeto_mem(i2c, address, mem_address, data):
    """Write ``data`` to a device register, reporting errors instead of raising.

    Args:
        i2c: Bus object exposing ``writeto_mem(addr, memaddr, buf)``.
        address: I2C address of the target device.
        mem_address: Register address the write starts at.
        data: Iterable of byte values; converted with ``bytes()``.

    Any exception (including a failed ``bytes()`` conversion) is printed and
    swallowed, so the caller always gets ``None`` back.
    """
    try:
        payload = bytes(data)
        i2c.writeto_mem(address, mem_address, payload)
    except Exception as err:
        print("Error writing to I2C:", err)

async def safe_readfrom_mem(i2c, address, mem_address, num_bytes):
    """Read ``num_bytes`` from a device register, or return None on failure.

    Args:
        i2c: Bus object exposing ``readfrom_mem(addr, memaddr, nbytes)``.
        address: I2C address of the target device.
        mem_address: Register address the read starts at.
        num_bytes: Number of bytes to read.

    Returns:
        Whatever ``readfrom_mem`` returns, or ``None`` after printing the
        error if the transfer raised.
    """
    try:
        result = i2c.readfrom_mem(address, mem_address, num_bytes)
    except Exception as err:
        print("Error reading from I2C:", err)
        result = None
    return result

async def set_time(i2c, year, month, day, hour, minute, second):
    """Program the DS1307 with the given date and time and start its clock.

    Args:
        i2c: Bus object passed through to the safe_* I2C helpers.
        year: Four-digit year; stored in the chip as an offset from 2000.
        month: Month of the year.
        day: Day of the month.
        hour: Hour of the day.
        minute: Minute of the hour.
        second: Second of the minute.

    I2C failures are printed by the safe_* helpers and otherwise ignored, so
    this coroutine never raises for a bus error.
    """
    # Whole days since the platform epoch, folded to 0-6, give a stable
    # day-of-week index.  The DS1307 day register only accepts 1-7 (which
    # value means which weekday is left to the user), so shift the index up
    # by one rather than writing an out-of-range 0.
    days = int(time.mktime((year, month, day, hour, minute, second, 0, 0, -1))) // 86400
    weekday = days % 7 + 1

    # Registers 0x00-0x06 in order: seconds, minutes, hours, day-of-week,
    # date, month, year - all BCD encoded.
    data = [
        decimal_to_bcd(second),
        decimal_to_bcd(minute),
        decimal_to_bcd(hour),
        decimal_to_bcd(weekday),
        decimal_to_bcd(day),
        decimal_to_bcd(month),
        decimal_to_bcd(year - 2000),
    ]

    await safe_writeto_mem(i2c, DS1307_ADDRESS, 0x00, data)

    # Read the seconds register back and rewrite it with bit 7 cleared.
    # Per the DS1307 datasheet bit 7 of register 0x00 is the clock-halt flag;
    # clearing it makes sure the oscillator is running.
    control = await safe_readfrom_mem(i2c, DS1307_ADDRESS, 0x00, 1)
    if control is not None:
        control = control[0] & 0x7F
        await safe_writeto_mem(i2c, DS1307_ADDRESS, 0x00, [control])

async def get_time(i2c):
    """Read the current date and time from the DS1307.

    Returns:
        ``(year, month, day, hour, minute, second)`` as integers, or a tuple
        of six ``None`` values when the I2C read failed.
    """
    raw = await safe_readfrom_mem(i2c, DS1307_ADDRESS, 0x00, 7)
    if raw is None:
        return None, None, None, None, None, None

    # raw[3] (day-of-week) is read but not used.
    second = bcd_to_decimal(raw[0] & 0x7F)  # drop bit 7 before decoding
    minute = bcd_to_decimal(raw[1])
    hour = bcd_to_decimal(raw[2] & 0x3F)    # keep only the low six bits
    day = bcd_to_decimal(raw[4])
    month = bcd_to_decimal(raw[5])
    year = 2000 + bcd_to_decimal(raw[6])
    return year, month, day, hour, minute, second

# Initialize SoftI2C
# Software (bit-banged) I2C bus on pin 15 (SCL) and pin 4 (SDA) at 100 kHz.
scl_pin = Pin(15)
sda_pin = Pin(4)
i2c = SoftI2C(scl=scl_pin, sda=sda_pin, freq=100000)

# Set initial time using variables
# These module-level values are what the (commented-out) set_time() call in
# main() would write to the RTC.  main() declares them global and overwrites
# them with every reading taken from the clock.
year = 2025
month = 3
day = 16
hour = 9
minute = 26
second = 6

async def main():
    """Optionally set the RTC once, then print its time once per second.

    The six date/time names are module globals: they supply the values for
    the optional set_time() call and are then overwritten with each reading.
    """
    global year, month, day, hour, minute, second  # Declare as global
    # Set the time asynchronously
    #await set_time(i2c, year, month, day, hour, minute, second)

    # Poll the clock forever, yielding to the scheduler between readings
    while True:
        reading = await get_time(i2c)
        year, month, day, hour, minute, second = reading
        if year is not None:  # a failed read yields a tuple of None values
            print(f"Time: {hour:02}:{minute:02}:{second:02}, Date: {year:04}/{month:02}/{day:02}")
        await asyncio.sleep(1)

# Run the main loop
# Starts the asyncio scheduler with main(); main() contains an endless loop,
# so this call is not expected to return.
asyncio.run(main())
Reply
#4
here is DS1307 RTC arduino demo code for KC868-A6: https://www.kincony.com/forum/showthread.php?tid=1863
Reply
#5
This script is a temperature controller using a DHT22 to switch a cooling device (fan, AC etc.) and DS18B20 to control a heating device (Heater, incubator, heat mat etc) at the same time. They could both be used to control a heating device or a cooling device, the logic in the code would just need to be adjusted/inverted and the serial messages altered to suit.

The DHT22 is connected to IO-2 (switches relay 3)
The DSB1820 is connected to IO-1 (switches relay 2)

Code:
from machine import Pin, SoftI2C
import onewire
import ds18x20
import dht
import uasyncio as asyncio  # Use uasyncio for MicroPython

# PCF8574 I2C addresses
PCF8574_OUTPUT_ADDRESS = 0x24  # Address for controlling relays
# NOTE: the input expander address is defined but not used by this script.
PCF8574_INPUT_ADDRESS = 0x22    # Address for reading inputs

# Initialize SoftI2C
# Software (bit-banged) I2C bus on pin 15 (SCL) and pin 4 (SDA) at 100 kHz.
scl_pin = Pin(15) 
sda_pin = Pin(4)  
i2c = SoftI2C(scl=scl_pin, sda=sda_pin, freq=100000)

# Initialize OneWire and DS18B20
ow_pin = Pin(32)  # Adjust according to your wiring for the DS18B20
ow = onewire.OneWire(ow_pin)
ds = ds18x20.DS18X20(ow)

# Initialize DHT22
dht_pin = Pin(33)  # Adjust according to your wiring for the DHT22
dht_sensor = dht.DHT22(dht_pin)

# Temperature thresholds for DS18B20
# Relay 2 is switched on at or below the ON value and off at or above the
# OFF value; between the two it is left unchanged.
TEMP_THRESHOLD_ON_DS18B20 = 26.0  # Temperature to turn on relay 2 (in °C)
TEMP_THRESHOLD_OFF_DS18B20 = 27.5  # Temperature to turn off relay 2 (in °C)

# Temperature thresholds for DHT22
TEMP_THRESHOLD_ON_DHT22 = 29  # Temperature to turn on relay 3 (in °C)
TEMP_THRESHOLD_OFF_DHT22 = 27.5  # Temperature to turn off relay 3 (in °C)

# Humidity thresholds for DHT22
HUMIDITY_THRESHOLD_ON_DHT22 = 85.0  # Humidity to turn on relay 3 (in %)
HUMIDITY_THRESHOLD_OFF_DHT22 = 80.0  # Humidity to turn off relay 3 (in %)

async def digital_write(pin, value):
    """Set one PCF8574 output pin HIGH (value == 1) or LOW (anything else).

    The whole port byte is read, the single bit for ``pin`` is changed, and
    the byte is written back so the other pins keep their state.
    """
    port_byte = i2c.readfrom(PCF8574_OUTPUT_ADDRESS, 1)[0]
    bit = 1 << pin
    port_byte = (port_byte | bit) if value == 1 else (port_byte & ~bit)
    i2c.writeto(PCF8574_OUTPUT_ADDRESS, bytes([port_byte]))

async def turn_off_all_relays():
    """Drive expander pins 0-5 HIGH, which switches the active-LOW relays off."""
    relay_pin = 0
    while relay_pin < 6:  # only the first six pins are touched
        await digital_write(relay_pin, 1)
        relay_pin += 1
    print("All relays are turned OFF.")

async def read_ds18b20_temperature():
    """Read every DS18B20 on the bus and drive relay 2 from the result.

    Relay 2 (expander pin 1, active LOW) is switched on at or below
    TEMP_THRESHOLD_ON_DS18B20 and off at or above TEMP_THRESHOLD_OFF_DS18B20;
    between the two thresholds it is left as it is.

    Any sensor or bus error is printed and swallowed - the same policy as
    read_dht22() - so one bad reading does not stop the control loop.  The
    relay keeps its previous state in that case.
    """
    try:
        roms = ds.scan()  # Scan for devices on the OneWire bus
        if not roms:
            print("No DS18B20 sensor found.")
            return

        ds.convert_temp()  # Start temperature conversion
        await asyncio.sleep(1)  # Wait for conversion to complete
        for rom in roms:
            temperature = ds.read_temp(rom)  # Read temperature
            print("DS18B20 Temperature: {:.2f} °C".format(temperature))

            # Control relay 2 based on temperature
            if temperature <= TEMP_THRESHOLD_ON_DS18B20:
                await digital_write(1, 0)  # Turn on relay 2 (active LOW)
                print("Relay 2 is ON due to low temperature.")
            elif temperature >= TEMP_THRESHOLD_OFF_DS18B20:
                await digital_write(1, 1)  # Turn off relay 2 (active LOW)
                print("Relay 2 is OFF due to high temperature.")
    except Exception as e:
        print("Error reading DS18B20 sensor:", e)

async def read_dht22():
    """Read the DHT22 and drive relay 3 with hysteresis.

    Relay 3 (expander pin 2, active LOW) is switched:
      * ON  when temperature >= TEMP_THRESHOLD_ON_DHT22 or
            humidity >= HUMIDITY_THRESHOLD_ON_DHT22;
      * OFF when temperature <= TEMP_THRESHOLD_OFF_DHT22 and
            humidity <= HUMIDITY_THRESHOLD_OFF_DHT22;
      * otherwise it is left unchanged, so a reading hovering near an ON
        threshold does not make the relay chatter.

    Any sensor or bus error is printed and swallowed; the relay keeps its
    previous state in that case.
    """
    try:
        dht_sensor.measure()  # Trigger a measurement
        temperature = dht_sensor.temperature()  # Get temperature in °C
        humidity = dht_sensor.humidity()  # Get humidity in %

        print("DHT22 Temperature: {:.2f} °C, Humidity: {:.2f} %".format(temperature, humidity))

        too_hot = temperature >= TEMP_THRESHOLD_ON_DHT22
        too_humid = humidity >= HUMIDITY_THRESHOLD_ON_DHT22
        cool_enough = temperature <= TEMP_THRESHOLD_OFF_DHT22
        dry_enough = humidity <= HUMIDITY_THRESHOLD_OFF_DHT22

        # Report which condition is asking for what
        if too_hot:
            print("Relay 3 is ON due to high temp.")
        elif cool_enough:
            print("Temperature is below threshold, but checking humidity.")

        if too_humid:
            print("Relay 3 is ON due to high humidity.")
        elif dry_enough:
            print("Humidity is below threshold.")

        # Either condition may switch the relay on; both must have dropped
        # to their OFF thresholds before it is switched off again.
        if too_hot or too_humid:
            await digital_write(2, 0)  # Turn on relay 3 (active LOW)
            print("Relay 3 is ON.")
        elif cool_enough and dry_enough:
            await digital_write(2, 1)  # Turn off relay 3 (active LOW)
            print("Relay 3 is OFF.")
        else:
            print("Relay 3 unchanged (between ON and OFF thresholds).")

    except Exception as e:
        print("Error reading DHT22 sensor:", e)

async def main():
    """Switch all relays off, then poll both sensors every five seconds."""
    await turn_off_all_relays()
    while True:
        # DS18B20 first, then DHT22 - each poll also drives its own relay
        for poll_sensor in (read_ds18b20_temperature, read_dht22):
            await poll_sensor()
        await asyncio.sleep(5)  # pause between polling rounds

# Run the main loop
# Starts the asyncio scheduler with main(); main() contains an endless loop,
# so this call is not expected to return.
asyncio.run(main())

(03-15-2025, 11:20 PM)admin Wrote: here is DS1307 RTC arduino demo code for KC868-A6: https://www.kincony.com/forum/showthread.php?tid=1863

That code doesn't work on the boards I have, I bought three boards and they're all the same. All I get is garbage in the serial monitor and no time is displayed.  The KCS firmware is useless to me too because the RTC cannot be set. That's why I'm using Micropython, it's the only way I can get any sense out of the clock and salvage some use out of the boards.

If there's a way to configure the KCS firmware to use another RTC on the I2C port then I might be able to use that but without that all three boards may as well go in the rubbish. Without a functional clock they are useless to me. They might be OK with home assistant but I don't need all that extra overhead, I just want a standalone system with a working clock.
Reply
#6
A few more code snippets which could come in handy for someone.

Cycle relays
Code:
from machine import SoftI2C, Pin
import uasyncio as asyncio
# Set I2C address and initialize SoftI2C
# Software (bit-banged) I2C bus; no freq is given, so the driver default is used.
i2c = SoftI2C(scl=Pin(15), sda=Pin(4))  # Adjust the pins as needed
# I2C address of the PCF8574 expander whose pins this script toggles.
PCF8574_ADDRESS = 0x24
async def digital_write(pin, value):
    """Set one PCF8574 pin HIGH when value == 1, LOW otherwise.

    Performs a read-modify-write of the port byte so the other pins are
    written back with the level that was just read.
    """
    port_byte = i2c.readfrom(PCF8574_ADDRESS, 1)[0]
    bit = 1 << pin
    if value == 1:
        port_byte = port_byte | bit
    else:
        port_byte = port_byte & ~bit
    i2c.writeto(PCF8574_ADDRESS, bytes([port_byte]))

async def init_pcf8574():
    """Probe the PCF8574 with a one-byte read and print "OK" or "KO"."""
    print("Init PCF8574...")
    try:
        # The value is discarded; only whether the read succeeds matters.
        i2c.readfrom(PCF8574_ADDRESS, 1)
    except OSError:
        print("KO")
    else:
        print("OK")

async def main():
    """Probe the expander, then sweep pins 0-5 HIGH and LOW forever."""
    await init_pcf8574()
    while True:
        # One pass with every pin set HIGH, then one pass with every pin LOW
        for level in (1, 0):
            for pin in range(6):
                await digital_write(pin, level)
                await asyncio.sleep(0.2)

# Run the main function
# Starts the asyncio scheduler with main(); main() contains an endless loop,
# so this call is not expected to return.
asyncio.run(main())

All relays off
Code:
from machine import Pin, SoftI2C
import uasyncio as asyncio  # Use uasyncio for MicroPython

# PCF8574 I2C address
# Expander whose output pins this script drives HIGH.
PCF8574_ADDRESS = 0x24

# Initialize SoftI2C
# Software (bit-banged) I2C bus at 100 kHz.
scl_pin = Pin(15)  # Adjust according to your wiring
sda_pin = Pin(4)   # Adjust according to your wiring
i2c = SoftI2C(scl=scl_pin, sda=sda_pin, freq=100000)

async def digital_write(pin, value):
    """Set one PCF8574 pin HIGH when value == 1, LOW otherwise.

    The port byte is read, the bit for ``pin`` is updated, and the byte is
    written back.
    """
    port_byte = i2c.readfrom(PCF8574_ADDRESS, 1)[0]
    bit = 1 << pin
    port_byte = (port_byte | bit) if value == 1 else (port_byte & ~bit)
    i2c.writeto(PCF8574_ADDRESS, bytes([port_byte]))

async def turn_off_all_relays():
    """Drive all eight expander pins HIGH, switching the active-LOW relays off."""
    pin = 0
    while pin < 8:  # every pin of the 8-bit port
        await digital_write(pin, 1)
        pin += 1
    print("All relays are turned OFF.")

async def main():
    """Entry coroutine: switch every relay off once, then finish."""
    await turn_off_all_relays()

# Run the main loop
# Despite the wording above there is no loop here: main() performs a single
# pass and asyncio.run() returns as soon as it completes.
asyncio.run(main())

All relays on
Code:
from machine import Pin, SoftI2C
import uasyncio as asyncio  # Use uasyncio for MicroPython

# PCF8574 I2C address
# Expander whose output pins this script drives LOW.
PCF8574_ADDRESS = 0x24

# Initialize SoftI2C
# Software (bit-banged) I2C bus at 100 kHz.
scl_pin = Pin(15)  # Adjust according to your wiring
sda_pin = Pin(4)   # Adjust according to your wiring
i2c = SoftI2C(scl=scl_pin, sda=sda_pin, freq=100000)

async def digital_write(pin, value):
    """Set one PCF8574 pin HIGH when value == 1, LOW otherwise.

    Read-modify-write of the port byte: only the bit for ``pin`` is changed.
    """
    port_byte = i2c.readfrom(PCF8574_ADDRESS, 1)[0]
    bit = 1 << pin
    if value == 1:
        port_byte |= bit
    else:
        port_byte &= ~bit
    new_state = bytes([port_byte])
    i2c.writeto(PCF8574_ADDRESS, new_state)

async def turn_on_all_relays():
    """Drive expander pins 0-5 LOW, which switches the active-LOW relays ON.

    Only the first six pins are touched (presumably the six relay channels
    of the KC868-A6 - confirm against the board schematic).
    """
    for pin in range(6):
        await digital_write(pin, 0)  # LOW = relay energised
    print("All relays are turned ON.")


# Backward-compatible alias: this routine was originally published under the
# misleading name turn_off_all_relays even though it switches the relays ON.
# Existing callers keep working; new code should call turn_on_all_relays().
turn_off_all_relays = turn_on_all_relays

async def main():
    """Entry coroutine: switch every relay ON once, then finish.

    Despite its name, the turn_off_all_relays() helper in this script drives
    the relay pins LOW, which switches the relays ON.
    """
    await turn_off_all_relays()

# Run the main loop
# There is no loop here: main() performs a single pass and asyncio.run()
# returns as soon as it completes.
asyncio.run(main())

I had to do a bit of black magic to get the onboard RTC working with Micropython, so the method to read the time and the method to set the RTC are slightly unusual. Quite a few other methods gave me the same unreliable results I was seeing with Arduino code to use clock functions on this board but this method is working for me with no hardware changes.

24 hour relay timer and working onboard RTC
Code:
from machine import Pin, SoftI2C
import uasyncio as asyncio  # Use uasyncio for MicroPython

# DS1307 I2C address
# Real-time clock that get_time() reads.
DS1307_ADDRESS = 0x68
# PCF8574 I2C address
# I/O expander whose pins drive the (active-LOW) relays.
PCF8574_ADDRESS = 0x24

def bcd_to_decimal(bcd):
    """Decode a packed-BCD byte: high nibble is tens, low nibble is units."""
    high_digit = bcd >> 4
    low_digit = bcd & 0x0F
    return 10 * high_digit + low_digit

async def safe_writeto_mem(i2c, address, mem_address, data):
    """Write ``data`` via ``i2c.writeto_mem``; print and swallow any error.

    Args:
        i2c: Bus object exposing ``writeto_mem(addr, memaddr, buf)``.
        address: I2C address of the target device.
        mem_address: Memory/register address passed to ``writeto_mem``.
        data: Iterable of byte values; converted with ``bytes()``.
    """
    try:
        buffer = bytes(data)
        i2c.writeto_mem(address, mem_address, buffer)
    except Exception as err:
        print("Error writing to I2C:", err)

async def safe_readfrom_mem(i2c, address, mem_address, num_bytes):
    """Read via ``i2c.readfrom_mem``; return None after printing any error.

    Args:
        i2c: Bus object exposing ``readfrom_mem(addr, memaddr, nbytes)``.
        address: I2C address of the target device.
        mem_address: Memory/register address passed to ``readfrom_mem``.
        num_bytes: Number of bytes to read.
    """
    try:
        received = i2c.readfrom_mem(address, mem_address, num_bytes)
    except Exception as err:
        print("Error reading from I2C:", err)
        received = None
    return received

async def get_time(i2c):
    """Read the DS1307 and return (year, month, day, hour, minute, second).

    All six values are ``None`` when the I2C read failed.
    """
    registers = await safe_readfrom_mem(i2c, DS1307_ADDRESS, 0x00, 7)
    if registers is None:
        return None, None, None, None, None, None

    # registers[3] (day-of-week) is not used.
    second = bcd_to_decimal(registers[0] & 0x7F)  # bit 7 is masked off
    minute = bcd_to_decimal(registers[1])
    hour = bcd_to_decimal(registers[2] & 0x3F)    # low six bits only
    day = bcd_to_decimal(registers[4])
    month = bcd_to_decimal(registers[5])
    year = 2000 + bcd_to_decimal(registers[6])
    return year, month, day, hour, minute, second

# Initialize SoftI2C
# Software (bit-banged) I2C bus on pin 15 (SCL) and pin 4 (SDA) at 100 kHz,
# shared by the RTC and the relay expander.
scl_pin = Pin(15)
sda_pin = Pin(4)
i2c = SoftI2C(scl=scl_pin, sda=sda_pin, freq=100000)

async def digital_write(pin, value):
    """Switch one relay: value == 1 turns it ON, anything else turns it OFF.

    The relays are active LOW, so ON clears the pin's bit and OFF sets it.

    The PCF8574 has no internal registers: it is a single 8-bit port that is
    accessed with plain reads and writes.  The register-style helpers
    (readfrom_mem / writeto_mem) must not be used here, because the
    "register address" byte they send first is latched onto the port as
    data - 0x00 would briefly pull every pin LOW (all relays ON), and the
    subsequent read-back would then always be 0x00.

    I2C errors are printed and the call returns without reporting a relay
    change.
    """
    try:
        current_state = i2c.readfrom(PCF8574_ADDRESS, 1)[0]
    except OSError as e:
        print("Error reading from I2C:", e)
        return

    mask = 1 << pin
    if value == 1:  # Turn ON the relay (set pin LOW)
        current_state &= ~mask
    else:  # Turn OFF the relay (set pin HIGH)
        current_state |= mask

    try:
        i2c.writeto(PCF8574_ADDRESS, bytes([current_state]))
    except OSError as e:
        print("Error writing to I2C:", e)
        return

    print(f"Relay {pin} is {'ON' if value == 1 else 'OFF'}")

def _write_relay_port(state):
    """Write one raw byte to the PCF8574 port; return True on success.

    A plain write is used on purpose: the PCF8574 has no registers, so a
    register-style write would latch its leading 0x00 "address" byte onto
    the port and momentarily pull every relay pin LOW (relays ON).
    """
    try:
        i2c.writeto(PCF8574_ADDRESS, bytes([state]))
    except OSError as e:
        print("Error writing to I2C:", e)
        return False
    return True


async def main():
    """Daily timer: relay 0 goes ON at the ON time and OFF at the OFF time.

    The RTC is polled every 30 seconds, so each target minute is seen at
    least once.  The relays are active LOW: 0xFF means all off, 0xFE means
    only relay 0 on.
    """
    # Turn off all relays initially
    if _write_relay_port(0xFF):
        print("All relays are turned OFF.")

    # Define the target time to turn on and off the relay
    target_on_hour = 16  # Set to the desired hour to turn ON
    target_on_minute = 32  # Set to the desired minute to turn ON
    target_off_hour = 16  # Set to the desired hour to turn OFF
    target_off_minute = 34  # Set to the desired minute to turn OFF

    # Loop to read time asynchronously
    while True:
        year, month, day, hour, minute, second = await get_time(i2c)
        if year is not None:
            print(f"Time: {hour:02}:{minute:02}:{second:02}, Date: {year:04}/{month:02}/{day:02}")

            # Check if the current time matches the target time to turn ON the relay
            if hour == target_on_hour and minute == target_on_minute:
                # 0xFE drives pin 0 LOW and every other pin HIGH in one
                # write, so no separate "all off" write is needed first.
                if _write_relay_port(0xFE):
                    print("Relay 0 is turned ON.")

            # Check if the current time matches the target time to turn OFF the relay
            if hour == target_off_hour and minute == target_off_minute:
                if _write_relay_port(0xFF):
                    print("Relay 0 is turned OFF.")

        # Sleep on every pass - including after a failed RTC read - so the
        # loop always yields instead of spinning and flooding the console.
        await asyncio.sleep(30)  # Check every 30 seconds

# Run the main loop
# Starts the asyncio scheduler with main(); main() contains an endless loop,
# so this call is not expected to return.
asyncio.run(main())

So set the RTC first with the get/set script (in my first post in this thread) and then adjust the on & off times in the code above. All these scripts can be saved on the 868-A6 at the same time, so upload the ones you need and run the get/set time script to set the clock.

I plan on integrating these functions into a package with a webserver interface but I don't have the time to work on it right now but if anyone else buys this board and has trouble with the RTC it could give you a few options. Just piece together the functions you need into one script.

A Micropython I2C scanner, a handy little utility that I upload to any microcontroller I'm working with along with whatever code I'm working on. SCL & SDA pins set for the 868_A6 board.

I2C scanner
Code:
# I2C Scanner MicroPython
from machine import Pin, SoftI2C

# Software I2C bus to scan; any other pair of I2C-capable pins can be used
i2c = SoftI2C(scl=Pin(15), sda=Pin(4))

print('I2C SCANNER')
devices = i2c.scan()  # addresses of the devices that answered

if devices:
    print('i2c devices found:', len(devices))

    for device in devices:
        print("I2C hexadecimal address: ", hex(device))
else:
    print("No i2c device !")
Reply
#7
thanks share your code.
Reply
#8
This is to create an access point with a webpage that serves a simple "hello world" using asyncio. I try to use asyncio whenever possible so I don't have blocking code stopping everything and causing timing issues.

Code:
import socket
import network
#import esp
import uasyncio as asyncio
import gc

# Disabled together with the commented-out `import esp` above.
#esp.osdebug(None)
# Run a garbage collection before the network stack is brought up.
gc.collect()

# Access point credentials
ssid = 'New AP'
password = '123456789'

# Configure the access-point interface, then switch it on.
ap = network.WLAN(network.AP_IF)
ap.config(essid=ssid, authmode=network.AUTH_WPA_WPA2_PSK, password=password)
ap.active(True)

# Busy-wait (blocking) until the interface reports that it is active.
while not ap.active():
    pass

print(('Access point started -'), ssid)
# Print the interface's network settings as reported by ifconfig().
print(ap.ifconfig())

def web_page():
    """Return the static "Hello, World!" HTML document as a string."""
    return """<html><head><meta name="viewport" content="width=device-width, initial-scale=1"></head>
    <body><h1>Hello, World!</h1></body></html>"""

async def handle_client(conn):
    """Serve one HTTP client on an already-accepted socket, then close it.

    Args:
        conn: Connected client socket as returned by ``socket.accept()``.

    ``socket.recv()`` is an ordinary blocking call, not a coroutine, so it
    cannot be awaited.  The socket is switched to non-blocking mode and
    polled instead, yielding to the scheduler between attempts.  The reply
    carries a proper HTTP status line and headers, and the socket is closed
    even when an error occurs.
    """
    print('Got a connection')
    try:
        conn.setblocking(False)
        request = b''
        for _ in range(100):  # wait up to roughly 2 s for the request
            try:
                request = conn.recv(1024)
                break
            except OSError:
                # Nothing received yet - let other tasks run and retry.
                await asyncio.sleep(0.02)
        print('Content = %s' % str(request))

        header = 'HTTP/1.0 200 OK\r\nContent-Type: text/html\r\nConnection: close\r\n\r\n'
        response = header + web_page()
        conn.setblocking(True)  # the reply is tiny; send it in one go
        conn.sendall(response.encode())
    except OSError as e:
        print('Client error:', e)
    finally:
        conn.close()


async def main():
    """Listen on TCP port 80 and start a handle_client() task per client.

    The listening socket is non-blocking: a blocking ``accept()`` would
    never hand control back to the scheduler, so the client tasks created
    here would never get to run.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind(('', 80))
    s.listen(5)
    s.setblocking(False)

    while True:
        try:
            conn, addr = s.accept()
        except OSError:
            # No pending connection - yield so the client tasks can run.
            await asyncio.sleep(0.05)
            continue
        print('Got a connection from %s' % str(addr))
        asyncio.create_task(handle_client(conn))
        await asyncio.sleep(0)  # give the new task a chance to start

# Run the main function
# Starts the asyncio scheduler with main(); main() contains an endless accept
# loop, so this call is not expected to return.
asyncio.run(main())

This creates an asyncio access point with a static IP address. You can configure your own addressing scheme in the ap.config line.

Code:
import socket
import network
import uasyncio as asyncio

# Access point credentials
ssid = 'Access Point'
password = 'password'

# Configure the access-point interface.
ap = network.WLAN(network.AP_IF)
ap.config(essid=ssid, authmode=network.AUTH_WPA_WPA2_PSK, password=password)
# Static IP
# Tuple order is (IP address, subnet mask, gateway, DNS server) - the same
# order print_status() reads the values back in.
ap.ifconfig(('192.168.100.205', '255.255.255.0', '192.168.100.2', '8.8.8.8'))
ap.active(True)

async def wait_for_wifi():
    """Poll every 0.5 s until the access point reports active.

    Prints "waiting N" with a running counter before each pause.
    """
    attempts = 0
    while True:
        if ap.active():
            break
        print("waiting " + str(attempts))
        await asyncio.sleep(0.5)
        attempts += 1

async def print_status():
    """Print the access point's network settings after a one-second pause."""
    await asyncio.sleep(1)  # Give some time for the AP to be active
    print('WiFi active')
    status = ap.ifconfig()
    # (label, index into the ifconfig tuple), in the order they are printed
    fields = (
        ('subnet mask = ', 1),
        ('gateway  = ', 2),
        ('DNS server = ', 3),
        ('IP address = ', 0),
    )
    for label, index in fields:
        print(label + status[index])
    print('AP Name = ', ssid)

def web_page():
    """Return the static "Hello, World!" HTML document as a string."""
    return """<html><head><meta name="viewport" content="width=device-width, initial-scale=1"></head>
    <body><h1>Hello, World!</h1></body></html>"""

async def handle_client(conn):
    """Serve one HTTP client on an already-accepted socket, then close it.

    Args:
        conn: Connected client socket as returned by ``socket.accept()``.

    ``socket.recv()`` is an ordinary blocking call, not a coroutine, so it
    cannot be awaited.  The socket is switched to non-blocking mode and
    polled instead, yielding to the scheduler between attempts.  The reply
    carries a proper HTTP status line and headers, and the socket is closed
    even when an error occurs.
    """
    print('Got a connection')
    try:
        conn.setblocking(False)
        request = b''
        for _ in range(100):  # wait up to roughly 2 s for the request
            try:
                request = conn.recv(1024)
                break
            except OSError:
                # Nothing received yet - let other tasks run and retry.
                await asyncio.sleep(0.02)
        print('Content = %s' % str(request))

        header = 'HTTP/1.0 200 OK\r\nContent-Type: text/html\r\nConnection: close\r\n\r\n'
        response = header + web_page()
        conn.setblocking(True)  # the reply is tiny; send it in one go
        conn.sendall(response.encode())
    except OSError as e:
        print('Client error:', e)
    finally:
        conn.close()


async def main():
    """Wait for the access point, print its settings, then serve on port 80.

    The listening socket is non-blocking: a blocking ``accept()`` would
    never hand control back to the scheduler, so the client tasks created
    here would never get to run.
    """
    await wait_for_wifi()
    await print_status()

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind(('', 80))
    s.listen(5)
    s.setblocking(False)

    while True:
        try:
            conn, addr = s.accept()
        except OSError:
            # No pending connection - yield so the client tasks can run.
            await asyncio.sleep(0.05)
            continue
        print('Got a connection from %s' % str(addr))
        asyncio.create_task(handle_client(conn))
        await asyncio.sleep(0)  # give the new task a chance to start

# Run the main function
# Starts the asyncio scheduler with main(); main() contains an endless accept
# loop, so this call is not expected to return.
asyncio.run(main())
Reply


Forum Jump:


Users browsing this thread:
1 Guest(s)