rewrite qmi-nmea in python

This commit is contained in:
Daniel Barlow 2025-05-12 00:22:50 +01:00
parent 3a8043a0cb
commit 0ae8c85966
3 changed files with 176 additions and 23 deletions

View File

@ -1,21 +1,9 @@
PREFIX?=/usr/local
CFLAGS=\
-DPACKAGE_VERSION=0.1 -I. \
$(shell $(PKG_CONFIG) --cflags qmi-glib) \
$(shell $(PKG_CONFIG) --cflags mbim-glib) \
$(shell $(PKG_CONFIG) --cflags glib-2.0)
LDFLAGS=\
$(shell $(PKG_CONFIG) --libs qmi-glib) \
$(shell $(PKG_CONFIG) --libs mbim-glib) \
$(shell $(PKG_CONFIG) --libs glib-2.0)
default: qmi-nmea
SRCS=qmi-nmea.c qmicli-loc.c qmi-nmea.c
qmi-nmea: $(patsubst %.c,%.o,$(SRCS))
qmi-nmea: qmi-nmea.py
cp $< $@
install:
install -D -t $(PREFIX)/bin qmi-nmea

View File

@ -1,16 +1,24 @@
{
stdenv,
libqmi,
libmbim,
glib,
pkg-config
stdenv, libqmi, gobject-introspection, python3,
python3Packages,
wrapGAppsHook3
}:
stdenv.mkDerivation {
name = "qmi-nmea";
src = ./.;
nativeBuildInputs = [ pkg-config ];
buildInputs = [ libqmi libmbim glib ];
PACKAGE_VERSION="1";
makeFlags = [ "PREFIX=${placeholder "out"}" ];
buildInputs = [
(python3.withPackages (ppkgs: [
ppkgs.pygobject3
]))
gobject-introspection.dev
libqmi
];
nativeBuildInputs = [
gobject-introspection
wrapGAppsHook3
];
}

157
pkgs/qmi-nmea/qmi-nmea.py Normal file
View File

@ -0,0 +1,157 @@
#!/usr/bin/env python
import sys, signal, gi
from math import ceil
from collections import namedtuple
from sys import exit
import traceback
gi.require_version('Qmi', '1.0')
from gi.repository import GLib, Gio, Qmi
main_loop = None
device = None
def die(message):
sys.stderr.write("Quitting")
exit(1)
def signal_handler(data):
main_loop.quit()
def device_close_ready(dev,result,user_data=None):
try:
dev.close_finish(result)
except GLib.GError as error:
die(sys.stderr.write("error: couldn't close QMI device: %s\n" % error.message))
main_loop.quit()
def device_close():
device.close_async(10, None, device_close_ready, None)
def release_client_ready(dev,result,user_data=None):
try:
dev.release_client_finish(result)
except GLib.GError as error:
sys.stderr.write("error: couldn't release QMI client: %s\n" % error.message)
device_close()
def release_client(client):
device.release_client(client, Qmi.DeviceReleaseClientFlags.RELEASE_CID, 10, None, release_client_ready, None)
def on_operation_mode(client, output, task):
print("on operation mode ", client, output, task)
print(output.get_operation_mode())
def get_operation_mode_ready(client, result, user_data=None):
try:
output = client.get_operation_mode_finish(result)
# should be None for no error
print("operation mode result", output.get_result())
client.connect("get-operation-mode", on_operation_mode)
except GLib.GError as error:
sys.stderr.write("error: couldn't get operation mode: %s\n" % error.message)
def on_gnss_data(client, output, user_data=None):
sys.stdout.write(output.get_nmea_string())
def register_events_ready(client, result, user_data=None):
try:
output = client.register_events_finish(result)
print("output", output.get_result())
client.connect("nmea", on_gnss_data)
except GLib.GError as error:
sys.stderr.write("error: couldn't register events: %s\n" % error.message)
def start_streaming_ready(client, result, user_data=None):
print("streaming ready", client, result)
try:
output = client.start_finish(result)
if output.get_result() == None:
events = Qmi.MessageLocRegisterEventsInput()
events.set_event_registration_mask(Qmi.LocEventRegistrationFlag.NMEA)
print("events, dear boy", events)
client.register_events(events, 10, None, register_events_ready, None)
except GLib.GError as error:
sys.stderr.write("error: couldn't start: %s\n" % error.message)
def start_streaming(client):
fix_type = Qmi.LocFixRecurrenceType.PERIODIC_FIXES
nput = Qmi.MessageLocStartInput()
nput.set_minimum_interval_between_position_reports(1000) # milliseconds?
nput.set_intermediate_report_state(True)
nput.set_fix_recurrence_type(fix_type)
nput.set_session_id(9)
client.start(nput, 10, None, start_streaming_ready, None)
def on_get_operation_mode(client, output):
print("operation mode indication ", output.get_operation_mode())
start_streaming(client)
def allocate_client_ready(dev,result,user_data=None):
try:
client = dev.allocate_client_finish(result)
except GLib.GError as error:
sys.stderr.write("error: couldn't allocate QMI client: %s\n" % error.message)
device_close()
return
print("client id ", client.get_cid())
client.connect("get-operation-mode", on_get_operation_mode)
client.get_operation_mode(None, 10, None, get_operation_mode_ready, None)
def open_ready(dev,result,user_data=None):
try:
dev.open_finish(result)
except GLib.GError as error:
sys.stderr.write("error: couldn't open QMI device: %s\n" % error.message)
main_loop.quit()
return
device.allocate_client(Qmi.Service.LOC, Qmi.CID_NONE, 10, None, allocate_client_ready, None)
def new_ready(unused,result,user_data=None):
try:
global device
device = Qmi.Device.new_finish(result)
except GLib.GError as error:
sys.stderr.write("error: couldn't create QMI device: %s\n" % error.message)
main_loop.quit()
return
device.open(Qmi.DeviceOpenFlags.PROXY | Qmi.DeviceOpenFlags.AUTO, 10, None, open_ready, None)
if __name__ == "__main__":
# Process input arguments
if len(sys.argv) != 2:
sys.stderr.write('error: wrong number of arguments\n')
sys.stdout.write('usage: qmi-nmea <DEVICE>\n')
sys.exit(1)
# Create Qmi device asynchronously
file = Gio.File.new_for_path(sys.argv[1])
Qmi.Device.new (file, None, new_ready, None)
# Main loop
main_loop = GLib.MainLoop()
GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGHUP, signal_handler, None)
GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGTERM, signal_handler, None)
try:
main_loop.run()
except TypeError:
print(traceback.format_exc())
except KeyboardInterrupt:
pass