#!/usr/bin/env python
"""Stream NMEA sentences from a QMI device's LOC service to stdout.

Usage: qmi-nmea <DEVICE>

Every step is asynchronous and chained through GLib callbacks:

    Qmi.Device.new -> open -> allocate LOC client -> get_operation_mode
    -> ("get-operation-mode" signal) start -> register NMEA events
    -> ("nmea" signal) write each sentence to stdout

SIGHUP, SIGTERM and Ctrl-C stop the main loop.
"""

import sys
import signal
import gi
from math import ceil
from collections import namedtuple
from sys import exit
import traceback

gi.require_version('Qmi', '1.0')
from gi.repository import GLib, Gio, Qmi

# Both are assigned at runtime: main_loop in the __main__ section,
# device in new_ready().
main_loop = None
device = None


def die(message):
    """Write *message* to stderr, announce that we are quitting, and exit 1.

    *message* is expected to be the complete, newline-terminated error text.
    """
    sys.stderr.write(message)
    sys.stderr.write("Quitting\n")
    sys.exit(1)


def signal_handler(data):
    """Stop the main loop when a registered UNIX signal is delivered."""
    main_loop.quit()


def device_close_ready(dev, result, user_data=None):
    """Finish the asynchronous device close, then stop the main loop."""
    try:
        dev.close_finish(result)
    except GLib.GError as error:
        die("error: couldn't close QMI device: %s\n" % error.message)
    main_loop.quit()


def device_close():
    """Start closing the global device; device_close_ready() finishes it."""
    device.close_async(10, None, device_close_ready, None)


def release_client_ready(dev, result, user_data=None):
    """Finish releasing the client, then close the device.

    A release failure is only reported; the device is closed regardless.
    """
    try:
        dev.release_client_finish(result)
    except GLib.GError as error:
        sys.stderr.write(
            "error: couldn't release QMI client: %s\n" % error.message)
    device_close()


def release_client(client):
    """Start releasing *client* (and its CID) on the global device."""
    device.release_client(
        client, Qmi.DeviceReleaseClientFlags.RELEASE_CID, 10, None,
        release_client_ready, None)


def get_operation_mode_ready(client, result, user_data=None):
    """Finish the get_operation_mode request and abort on any failure.

    The operation mode itself is handled by on_get_operation_mode(), which
    is connected to the client's "get-operation-mode" signal.
    """
    try:
        output = client.get_operation_mode_finish(result)
        outcome = output.get_result()
        # NOTE(review): this keeps the original "anything but None is
        # unexpected" check.  Confirm what get_result() returns on success
        # under PyGObject; failures already surface as GLib.GError below.
        if outcome is not None:
            die("unexpected result from get_operation_mode: %s\n"
                % (outcome,))
    except GLib.GError as error:
        die("error: couldn't get operation mode: %s\n" % error.message)


def on_gnss_data(client, output, user_data=None):
    """Write one NMEA string from the "nmea" signal to stdout."""
    sys.stdout.write(output.get_nmea_string())
    # Flush per sentence so a piped consumer is not stalled by buffering.
    sys.stdout.flush()


def register_events_ready(client, result, user_data=None):
    """Finish event registration and start listening for "nmea" signals."""
    try:
        output = client.register_events_finish(result)
        outcome = output.get_result()
        # NOTE(review): same None check as in get_operation_mode_ready();
        # confirm the success value of get_result().
        if outcome is not None:
            die("unexpected result from register_events: %s\n" % (outcome,))
        client.connect("nmea", on_gnss_data)
    except GLib.GError as error:
        die("error: couldn't register events: %s\n" % error.message)


def start_streaming_ready(client, result, user_data=None):
    """Finish the start request and register for NMEA events."""
    try:
        output = client.start_finish(result)
        # NOTE(review): registration only happens when get_result() is None,
        # exactly as in the original; any other value is silently ignored.
        if output.get_result() is None:
            events = Qmi.MessageLocRegisterEventsInput()
            events.set_event_registration_mask(
                Qmi.LocEventRegistrationFlag.NMEA)
            client.register_events(
                events, 10, None, register_events_ready, None)
    except GLib.GError as error:
        die("error: couldn't start: %s\n" % error.message)


def start_streaming(client):
    """Ask *client* to start a periodic-fix session (session id 9)."""
    fix_type = Qmi.LocFixRecurrenceType.PERIODIC_FIXES
    nput = Qmi.MessageLocStartInput()
    nput.set_minimum_interval_between_position_reports(1000)  # milliseconds?
    nput.set_intermediate_report_state(True)
    nput.set_fix_recurrence_type(fix_type)
    nput.set_session_id(9)
    client.start(nput, 10, None, start_streaming_ready, None)


def on_get_operation_mode(client, output):
    """Log the reported operation mode to stderr, then start streaming."""
    print("operation mode indication ", output.get_operation_mode(),
          file=sys.stderr)
    start_streaming(client)


def allocate_client_ready(dev, result, user_data=None):
    """Finish client allocation and request the operation mode.

    On allocation failure the error is reported and the device is closed.
    """
    try:
        client = dev.allocate_client_finish(result)
    except GLib.GError as error:
        sys.stderr.write(
            "error: couldn't allocate QMI client: %s\n" % error.message)
        device_close()
        return

    client.connect("get-operation-mode", on_get_operation_mode)
    client.get_operation_mode(None, 10, None, get_operation_mode_ready, None)


def open_ready(dev, result, user_data=None):
    """Finish opening the device and allocate a LOC service client.

    On failure the error is reported and the main loop is stopped.
    """
    try:
        dev.open_finish(result)
    except GLib.GError as error:
        sys.stderr.write(
            "error: couldn't open QMI device: %s\n" % error.message)
        main_loop.quit()
        return

    device.allocate_client(
        Qmi.Service.LOC, Qmi.CID_NONE, 10, None, allocate_client_ready, None)


def new_ready(unused, result, user_data=None):
    """Finish device creation, store it in the global, and open it.

    On failure the error is reported and the main loop is stopped.
    """
    global device
    try:
        device = Qmi.Device.new_finish(result)
    except GLib.GError as error:
        sys.stderr.write(
            "error: couldn't create QMI device: %s\n" % error.message)
        main_loop.quit()
        return

    device.open(
        Qmi.DeviceOpenFlags.PROXY | Qmi.DeviceOpenFlags.AUTO, 10, None,
        open_ready, None)


if __name__ == "__main__":

    # Process input arguments
    if len(sys.argv) != 2:
        sys.stderr.write('error: wrong number of arguments\n')
        sys.stdout.write('usage: qmi-nmea <DEVICE>\n')
        sys.exit(1)

    # Create Qmi device asynchronously
    device_file = Gio.File.new_for_path(sys.argv[1])
    Qmi.Device.new(device_file, None, new_ready, None)

    # Main loop
    main_loop = GLib.MainLoop()
    GLib.unix_signal_add(
        GLib.PRIORITY_HIGH, signal.SIGHUP, signal_handler, None)
    GLib.unix_signal_add(
        GLib.PRIORITY_HIGH, signal.SIGTERM, signal_handler, None)
    try:
        main_loop.run()
    except KeyboardInterrupt:
        pass