Raspberry Pi 4 のUSBカメラで顔認識後スマートプラグで家電の電源制御


今回掲載のソースコードは生成AIが出力したもので、詳細はチェックしていないため動作保証対象外です。
参考程度にご覧ください。

制作の経緯

中学生の息子がもうすぐ冬休みになります。息子はテレビが大好きで、約束した時間をオーバーしてしまいます。(親としては、もう中学生なので時間くらいは守ってもらいたいのですが)

そこで、テレビを見る際にテレビの前で顔認証を行い、認証されたらテレビの電源が通電するようスマートプラグで制御します。

スマートプラグとは、Wifiに繋がる電源コンセントで通常はスマートフォンにインストールした専用アプリを使って制御するのですが、今回はRaspberry PiからPythonを使って制御します。

使用したスマートプラグは、「Tapo P105」です。

こちらのスマートプラグは、Wifi専用のためRaspberry PiもWifi接続する必要がありますが、他のスマートプラグだとBluteooth対応のものもあるため、そちらであれば、オフラインで構築することも可能と思います。

機材一覧

  • Raspberry Pi 4 Model B
  • USBカメラ LOGICOOL ウェブカム HD画質 120万画素 C270
  • 7インチ HDMIモニタ

処理内容

顔認証学習

今回のシステムでは複数人の顔認証に対応します。1人につき3枚の画像を撮影し、その画像を元に顔認証データの学習を行います。

顔認証

学習完了後は自動的に顔認証モードになります。

登録済みの人の場合には、スマートプラグの電源をONにし、テレビが映るようにします。
未登録の場合にはスマートプラグの電源がOFFのままなので、テレビは映りません。

顔認証処理は常時行われ、最後に認証した時点から1時間経過するとスマートプラグの電源をOFFにします。

特定人物の顔認証処理と電源管理

特定人物(今回は息子)の場合には、1日に30分間のみテレビを見られるように特殊処理を行います。
(ちなみに、息子は自分のPCを持っており、そちらで動画なども視聴するためテレビは30分にしています)

毎日0時になるとリセットされ、30分の視聴が可能になります。

デモ映像

テスト用に顔認識後、ライトを点灯し、人がいなくなってから15秒後に消灯するようにしたデモ動画です。

ソースコード

冒頭にも記載しましたが今回のコースコードは生成AIが作成したもので、あまり処理内容をチェックしていないため参考程度にご覧ください。

最初はTapoの制御に「tapoPlugApi」を利用するコードになっていましたが、実行時にエラーが発生していたため「PyP100」に切り替えました。

import cv2
import numpy as np
import os
from PyP100 import PyP100
import time
from datetime import datetime, timedelta

# Tapo接続情報
# スマートプラグのIPアドレス, アカウントメールアドレス, アカウントパスワード
tapo = PyP100.P100('xxx.xxx.xxx.xxx', 'xxxxxx@xxxxxxxxxx', 'xxxxxxxxx')

# Tapoの電源状態を保持する変数(初期値はoff)
tapo_power_state = False

# 最後に登録済み人物を検知した時刻
last_detection_time = None

# satoshiの1日の使用時間を管理する変数
satoshi_usage_time = timedelta()
satoshi_last_on_time = None

# satoshiの1日の使用時間制限(分)
SATOSHI_DAILY_LIMIT = 30

# 顔検出器の初期化
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

# 顔認識器の初期化
recognizer = cv2.face.LBPHFaceRecognizer_create()

# 画像保存ディレクトリ
image_dir = 'faces'
if not os.path.exists(image_dir):
    os.makedirs(image_dir)

# 登録済み人物のリスト
people = []

def train_model():
    faces = []
    labels = []
    if not os.listdir(image_dir):
        print('学習用データが存在しません。')
        return False
        
    for person in os.listdir(image_dir):
        person_dir = os.path.join(image_dir, person)
        if os.path.isdir(person_dir):
            people.append(person)
            for image_file in os.listdir(person_dir):
                image_path = os.path.join(person_dir, image_file)
                image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
                if image is None:
                    continue
                faces.append(image)
                labels.append(people.index(person))
    
    if len(faces) > 0:
        recognizer.train(faces, np.array(labels))
        recognizer.write('trained_model.yml')
        return True
    return False

def capture_images(name):
    cap = cv2.VideoCapture(0)
    count = 0
    while count < 3:
        ret, frame = cap.read()
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = face_cascade.detectMultiScale(gray, 1.3, 5)
        
        for (x, y, w, h) in faces:
            cv2.rectangle(frame, (x, y), (x+w, y+h), (255, 0, 0), 2)
        
        cv2.imshow('Capture', frame)
        
        key = cv2.waitKey(1)
        if key == ord('c'):
            person_dir = os.path.join(image_dir, name)
            if not os.path.exists(person_dir):
                os.makedirs(person_dir)
            img_path = os.path.join(person_dir, f'{name}_{count}.jpg')
            cv2.imwrite(img_path, gray[y:y+h, x:x+w])
            print(f'画像保存: {img_path}')
            count += 1
        elif key == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()

def reset_usage_time():
    global satoshi_usage_time
    now = datetime.now()
    if now.hour == 0 and now.minute == 0:
        satoshi_usage_time = timedelta()
        print("satoshiの使用時間をリセットしました")

def recognize_face():
    global tapo_power_state, last_detection_time, satoshi_usage_time, satoshi_last_on_time
    cap = cv2.VideoCapture(0)
    while True:
        reset_usage_time()
        ret, frame = cap.read()
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = face_cascade.detectMultiScale(gray, 1.3, 5)
        
        registered_person_detected = False
        
        for (x, y, w, h) in faces:
            roi_gray = gray[y:y+h, x:x+w]
            id_, conf = recognizer.predict(roi_gray)
            if conf < 70:
                name = people[id_]
                cv2.putText(frame, name, (x, y), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                print(f'認識された人物: {name}')
                registered_person_detected = True
                last_detection_time = datetime.now()
                
                if name == 'satoshi':
                    if not tapo_power_state and satoshi_usage_time < timedelta(minutes=SATOSHI_DAILY_LIMIT):
                        tapo.turnOn()
                        tapo_power_state = True
                        satoshi_last_on_time = datetime.now()
                        print('Tapo P105の電源をONにしました')
                    elif tapo_power_state:
                        current_usage = datetime.now() - satoshi_last_on_time
                        satoshi_usage_time += current_usage
                        satoshi_last_on_time = datetime.now()
                        if satoshi_usage_time >= timedelta(minutes=SATOSHI_DAILY_LIMIT):
                            tapo.turnOff()
                            tapo_power_state = False
                            print('satoshiの使用時間が30分を超えたため、Tapo P105の電源をOFFにしました')
                else:
                    if not tapo_power_state:
                        tapo.turnOn()
                        tapo_power_state = True
                        print('Tapo P105の電源をONにしました')
            else:
                cv2.putText(frame, 'Unknown', (x, y), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            
            cv2.rectangle(frame, (x, y), (x+w, y+h), (255, 0, 0), 2)
        
        if not registered_person_detected and last_detection_time is not None:
            time_since_last_detection = datetime.now() - last_detection_time
            if time_since_last_detection > timedelta(hours=1) and tapo_power_state:
                tapo.turnOff()
                tapo_power_state = False
                print('1時間経過したため、Tapo P105の電源をOFFにしました')
        
        cv2.imshow('Face Recognition', frame)
        
        key = cv2.waitKey(1)
        if key == ord('q'):
            break
        elif key == ord('t'):
            name = input('登録する人物の名前を入力してください: ')
            capture_images(name)
            train_model()
    
    cap.release()
    cv2.destroyAllWindows()

def register_new_person():
    print('新規登録を開始します')
    name = input('登録する人物の名前を入力してください: ')
    capture_images(name)
    return train_model()

def start_recognition():
    print('顔認証を開始します')
    recognize_face()

if __name__ == '__main__':
    model_exists = False
    
    if os.path.exists('trained_model.yml'):
        recognizer.read('trained_model.yml')
        if os.listdir(image_dir):
            train_model()  # 既存モデルの更新
        model_exists = True
    elif os.listdir(image_dir):
        model_exists = train_model()
    
    if not model_exists:
        print('学習モデルが存在しないため、登録処理を開始します')
        if register_new_person():
            print('登録が完了しました')
            start_recognition()
        else:
            print('登録に失敗しました')
            exit(1)
    else:
        start_recognition()