Source code for senor_octopus.sources.udp.protocols.micron_bolt_mini_2

"""
An UDP protocol for the Micron Bolt Mini 2 GPS tracker.
"""

import asyncio
import logging
import re
from datetime import datetime, timezone
from typing import List, Optional, Tuple, TypedDict, cast

import requests

_logger = logging.getLogger(__name__)


[docs]def parse_timestamp(timestamp: str) -> datetime: """ Parse the GPS timestamp into a datetime object. """ return datetime.strptime(timestamp, "%Y%m%d%H%M%S").replace(tzinfo=timezone.utc)
[docs]class WifiAccessPointType(TypedDict): """ A WiFi access point. """ macAddress: str signalStrength: int
[docs]class MicronBoltMini2UDPProtocol(asyncio.DatagramProtocol): """ An UDP protocol for the Micron Bolt Mini 2 GPS tracker. To configure it, send the following commands to the tracker via SMS: - AT+GTQSS=AIR11,${apn},,,4,,1,${host1},${port1},${host2},${port2},,10,1,,,0001$ - AT+GTNMD=AIR11,8,2,4,15,240,,,,0002$ - AT+GTFRI=AIR11,1,1,,,0000,2359,86400,1,86400,1,001F,1000,1000,,,,,,,0003$ This will the device send GPS data once per day. If an API key is provided, the protocol will use the Google Geolocation service to determine the location of the device based on the WiFi access points it can see. """ def __init__(self, queue: asyncio.Queue, api_key: Optional[str] = None) -> None: self.queue = queue self.api_key = api_key self.transport: Optional[asyncio.DatagramTransport] = None
[docs] def connection_made(self, transport: asyncio.BaseTransport) -> None: self.transport = cast(asyncio.DatagramTransport, transport)
# pylint: disable=too-many-locals
[docs] def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None: """ Handle incoming data. """ if self.transport is None: return _logger.debug("Received %r from %s", data, addr) message = data.decode().strip() parts = message.split(",") count = parts[-1].rstrip("$") reply = f"+SACK:{count}$\r\n" # heartbeat if parts[0] == "+ACK:GTHBD": protocol = parts[1] reply = f"+SACK:GTHBD,{protocol},{count}$\r\n" # WIFI information elif self.api_key and parts[0] in {"+RESP:GTWIF", "+BUFF:GTWIF"}: wifi_tokens = parts[5:-7] wifi_access_points: List[WifiAccessPointType] = [] while wifi_tokens: mac_address, signal_strength = wifi_tokens[:2] wifi_tokens = wifi_tokens[5:] wifi_access_points.append( { "macAddress": re.sub(r"(..)", r"\1:", mac_address), "signalStrength": int(signal_strength), }, ) payload = { "considerIp": False, "wifiAccessPoints": wifi_access_points, } response = requests.post( "https://www.googleapis.com/geolocation/v1/geolocate", params={"key": self.api_key}, json=payload, timeout=60, ) if response.ok: result = response.json() value = { "id": parts[2], "latitude": result["location"]["lat"], "longitude": result["location"]["lng"], "accuracy": result["accuracy"], "battery": float(parts[-3]), "send_time": parse_timestamp(parts[-2]), "source": "wifi", } self.queue.put_nowait(value) # GPS information elif parts[0] in {"+RESP:GTFRI", "+BUFF:GTFRI"}: value = { "id": parts[2], "accuracy": float(parts[7]), "speed": float(parts[8]), "azimuth": float(parts[9]), "altitude": float(parts[10]), "longitude": float(parts[11]), "latitude": float(parts[12]), "fix_time": parse_timestamp(parts[13]), "battery": float(parts[-3]), "send_time": parse_timestamp(parts[-2]), "source": "gps", } self.queue.put_nowait(value) self.transport.sendto(reply.encode(), addr)