Automatically Export SwitchBot Temperature & Humidity Data to CSV Using Python

PC

Automatically Fetch and Save SwitchBot Temperature and Humidity Data Every Hour with Python [Using API]

This post is a memo-style tutorial on how to automatically retrieve temperature and humidity data from SwitchBot devices via the cloud, and log them every hour using Python.

📌 Key Features of This Method

  • Utilizes the official SwitchBot API
  • Supports parallel processing for multiple devices
  • Compatible with Power BI and other visualization tools
  • Includes duplicate record prevention logic

💡 Requirements & Setup

  • SwitchBot Temperature and Humidity Meter + Hub
  • SwitchBot API token
  • Python 3.8 or higher (Windows/Linux)
  • Device IDs and a folder path for saving CSV files

💻 Full Python Script (with dummy values)

The following code collects temperature and humidity data every hour and appends it to CSV files with duplicate check logic:

import requests
import csv
import time
import os
import shutil
from datetime import datetime
import multiprocessing

API_TOKEN = os.getenv("SWITCHBOT_API_TOKEN")

DEVICE_INFO = {
    'AAAAAAAAAA': 'aaaa.csv',
    'BBBBBBBBBB': 'bbbb.csv',
    'CCCCCCCCCC': 'cccc.csv',
    'DDDDDDDDDD': 'dddd.csv',
}

URL_TEMPLATE = 'https://api.switch-bot.com/v1.0/devices/{}/status'

HEADERS = {
    'Authorization': API_TOKEN,
    'Content-Type': 'application/json; charset=utf8'
}

SOURCE_FOLDER = './data'
DESTINATION_FOLDER = './data'

def get_temperature_humidity(device_id):
    url = URL_TEMPLATE.format(device_id)
    response = requests.get(url, headers=HEADERS)
    data = response.json()
    temperature = data['body']['temperature']
    humidity = data['body']['humidity']
    return temperature, humidity

def read_last_two_timestamps(filename):
    last_two = []
    if os.path.exists(filename):
        with open(filename, 'r', newline='') as file:
            reader = csv.reader(file)
            next(reader)
            for row in reader:
                last_two.append(row[0])
    return last_two[-2:]

def save_to_csv(device_id, temperature, humidity):
    now = datetime.now().replace(minute=0, second=0, microsecond=0)
    timestamp = now.strftime('%Y-%m-%d %H:%M:%S')
    filename = f'temperature_humidity_data_{device_id}.csv'
    file_path = os.path.join(SOURCE_FOLDER, filename)
    file_exists = os.path.isfile(file_path)

    with open(file_path, mode='a', newline='') as file:
        writer = csv.writer(file)
        if not file_exists:
            writer.writerow(['Timestamp', 'Temperature (°C)', 'Humidity (%)'])
        writer.writerow([timestamp, temperature, humidity])

    copy_csv(filename, DEVICE_INFO[device_id])

    last_two = read_last_two_timestamps(file_path)
    if len(last_two) == 2 and last_two[0] == last_two[1]:
        remove_last_row(file_path)

    copied_file_path = os.path.join(DESTINATION_FOLDER, DEVICE_INFO[device_id])
    last_two_copied = read_last_two_timestamps(copied_file_path)
    if len(last_two_copied) == 2 and last_two_copied[0] == last_two_copied[1]:
        remove_last_row(copied_file_path)

def copy_csv(source_filename, destination_filename):
    if not os.path.exists(DESTINATION_FOLDER):
        os.makedirs(DESTINATION_FOLDER)
    shutil.copy(
        os.path.join(SOURCE_FOLDER, source_filename),
        os.path.join(DESTINATION_FOLDER, destination_filename)
    )

def remove_last_row(filename):
    with open(filename, 'r+', newline='') as file:
        lines = file.readlines()
        file.seek(0)
        file.writelines(lines[:-1])
        file.truncate()

def process_device(device_id):
    while True:
        try:
            temperature, humidity = get_temperature_humidity(device_id)
            save_to_csv(device_id, temperature, humidity)
            print(f'{device_id}: {temperature}°C, {humidity}% saved at {datetime.now()}')
        except Exception as e:
            print(f'Error ({device_id}): {e}')
        time.sleep(3600)

def main():
    processes = []
    for device_id in DEVICE_INFO.keys():
        p = multiprocessing.Process(target=process_device, args=(device_id,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

if __name__ == '__main__':
    main()

🔐 Security & Privacy Notes

Replace the following placeholders with your actual settings:

PlaceholderDescription
os.getenv("SWITCHBOT_API_TOKEN")Your SwitchBot API token
'AAAAAAAAAA' etc.Your individual device IDs
'aaaa.csv' etc.Your target CSV file names

📊 Visualization Ideas & Applications

Run on a Raspberry Pi or similar device for 24/7 tracking

Load the CSV data into Power BI Desktop for visualization

Sync to Google Sheets for cloud-based monitoring

Monitor multiple rooms in your home simultaneously

コメント

Copied title and URL