Monitor running services using systemd and HomeAssistant

I was having issues with my Raspberry Pi, specifically the Bluetooth service running through systemd.  So naturally I wanted to be able to track when the service was reporting as failed or offline/disabled, and that manifested in another custom sensor setup for HomeAssistant.  This is a python script that will iterate through system services given in a list and create sensors in your HomeAssistant instance and have their state mapped to the sensor state (running = on, failed/stopped = off).  I have it set up to run every few minutes through a cron job, and its worked flawlessly so far.

import requests
import json
import subprocess
import time
import os

# systemd-service-sensor

# URL for homeassistant instance
hass_url = "http://HA_URL/api/states/sensor."

# Headers for homeassistant instance
hass_headers = {'Accept': 'application/json',
                'Content-Type': 'application/json',
                'x-ha-access': 'HA_PASSWORD'
               }

# Services I want to track
services = ["bluetooth.service","cron.service",
            "dasher.service","nginx.service",
            "ntp.service","ssh.service","supervisor.service"]

for service in services:

  # Get status information from systemctl
  service_info = subprocess.check_output("systemctl is-active " + service + "; exit 0",
                                         stderr=subprocess.STDOUT, 
                                         stdin=open(os.devnull), 
                                         shell=True).decode('utf-8').replace("\n","")

  # Generate payload for HASS
  hass_payload = {
        "state": service_info,
        "attributes": {
             "friendly_name": service.replace("."," ")
        }
  }

  # Formate sensor name to be *service*_service instead of *service*.service
  hass_sensor = service.replace(".","_")
  hass_sensor = hass_sensor.replace("-","_")

  # POST data to HASS
  hass_request = requests.post(hass_url + hass_sensor,
                               headers=hass_headers,
                               data=json.dumps(hass_payload))

 

Monitor Your Car with HomeAssistant and Dash

Recently I picked up a nice new bluetooth OBDII adapter for my car, specifically this one. I wanted the ability to automatically check and record information about my car (average trip distance, odometer rating, etc.) without having to do anything but getting in the car and driving.

The first part of the process was automatically recording my drives and uploading them to Dash. I achieved this by creating a Tasker profile to automatically launch the Dash app and start tracking my drive. The app talks to the OBDII adapter and uploads the information over my cell phone’s data connection to Dash’s servers.

The next part was getting access to the data uploaded through Dash’s API. The process was fairly simple, I used Hurl.it to set up and acquire an oauth token for use with the API. They have a lot of information that you can pull from their API endpoints, but I was most interested in the quantitative data (more specifically the odometer reading, distance traveled, fuel consumed).

I wrote a python script that I have set to run every 10 minutes on my raspberry pi to poll the data I wanted, and upload it to my HomeAssistant instance for tracking. The source code is below:

import requests
import json

# Custom Dash sensor python script

# Replace with your HomeAssistant Instance's URL
hass_url = "http://my-HA.com"

# Replace with your api_password you defined in configuration.yaml
hass_password = "hunter2"

# Make a request to the dash.by API
dash_url = "https://dash.by/api/chassis/v1/"
dash_header = {'Authorization': 'Bearer Token goes here'}

dash_request_user = requests.get(dash_url + "user", headers=dash_header)
dash_request_trip = requests.get(dash_url + "trips", headers=dash_header)

json_resp = dash_request_user.json()
trip_resp = dash_request_trip.json()

driver_score = str(json_resp["overallScore"])

vehicle_odometer = str(round(int(json_resp["currentVehicle"]["odometer"]), 0))

last_trip_time = str(round(int(trip_resp["result"][0]["stats"]["timeDriven"]), 0))
last_trip_average_speed = str(trip_resp["result"][0]["stats"]["averageSpeed"])
last_trip_distance_driven = str(round(int(trip_resp["result"][0]["stats"]["distanceDriven"]), 0))

driver_sensor = {
       "sensor_name": "sensor.dash_driver_score",
       "state": driver_score,
       "attributes": {"friendly_name": "Dash Drive Score", "unit_of_measurement": "points"}
}

vehicle_sensor = {
       "sensor_name": "sensor.dash_current_vehicle_odometer",
       "state": vehicle_odometer,
       "attributes": {"friendly_name": "Current Vehicle Odometer",
                      "unit_of_measurement": "miles"}
}

trip_duration_sensor = {
       "sensor_name": "sensor.dash_last_trip_time",
       "state": last_trip_time,
       "attributes": {"friendly_name": "Last Trip Duration", "unit_of_measurement": "min"}
}

trip_average_speed_sensor = {
       "sensor_name": "sensor.dash_last_trip_average_speed",
       "state": last_trip_average_speed,
       "attributes": {"friendly_name": "Last Trip Average Speed",
                      "unit_of_measurement": "mph"}
}

trip_distance_driven_sensor = {
       "sensor_name": "sensor.dash_last_trip_distance_driven",
       "state": last_trip_distance_driven,
       "attributes": {"friendly_name": "Last Trip Distance Driven",
                      "unit_of_measurement": "miles"}
}


sensor_list = []
sensor_list.append(driver_sensor)
sensor_list.append(vehicle_sensor)
sensor_list.append(trip_duration_sensor)
sensor_list.append(trip_average_speed_sensor)
sensor_list.append(trip_distance_driven_sensor)

for sensor in sensor_list:

  # Build the URL based on variables defined above
  hass_endpoint = hass_url + "/api/states/" + sensor["sensor_name"]

  # Define headers for HASS
  hass_headers = {'Accept': 'application/json',
                  'Content-Type': 'application/json',
                  'x-ha-access': hass_password}

  # Generate payload for HASS
  hass_payload = {
        "state": sensor["state"],
        "attributes": sensor["attributes"]
  }

  # POST data to HASS
  hass_request = requests.post(hass_endpoint,
                               headers=hass_headers,
                               data=json.dumps(hass_payload))