Devuan fork of gpsd
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

309 lines
12 KiB

  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. # Copyright (c) 2019 Grand Joldes (grandwork2@yahoo.com).
  4. #
  5. # This file is Copyright (c) 2019 by the GPSD project
  6. #
  7. # SPDX-License-Identifier: BSD-2-clause
  8. # This code run compatibly under Python 3.x for x >= 6.
  9. """aiogps.py -- Asyncio Python interface to GPSD.
  10. This module adds asyncio support to the Python gps interface. It runs on
  11. Python versions >= 3.6 and provides the following benefits:
  12. - easy integration in asyncio applications (all I/O operations done through
  13. non-blocking coroutines, async context manager, async iterator);
  14. - support for cancellation (all operations are cancellable);
  15. - support for timeouts (on both read and connect);
  16. - support for connection keep-alive (using the TCP keep alive mechanism)
  17. - support for automatic re-connection;
  18. - configurable connection parameters;
  19. - configurable exeption handling (internally or by application);
  20. - logging support (logger name: 'gps.aiogps').
  21. The use of timeouts, keepalive and automatic reconnection make possible easy
  22. handling of GPSD connections over unreliable networks.
  23. Examples:
  24. import logging
  25. import gps.aiogps
  26. # configuring logging
  27. logging.basicConfig()
  28. logging.root.setLevel(logging.INFO)
  29. # Example of setting up logging level for the aiogps logger
  30. logging.getLogger('gps.aiogps').setLevel(logging.ERROR)
  31. # using default parameters
  32. async with gps.aiogps.aiogps() as gpsd:
  33. async for msg in gpsd:
  34. # Log last message
  35. logging.info(f'Received: {msg}')
  36. # Log updated GPS status
  37. logging.info(f'\nGPS status:\n{gpsd}')
  38. # using custom parameters
  39. try:
  40. async with gps.aiogps.aiogps(
  41. connection_args = {
  42. 'host': '192.168.10.116',
  43. 'port': 2947
  44. },
  45. connection_timeout = 5,
  46. reconnect = 0, # do not try to reconnect, raise exceptions
  47. alive_opts = {
  48. 'rx_timeout': 5
  49. }
  50. ) as gpsd:
  51. async for msg in gpsd:
  52. logging.info(msg)
  53. except asyncio.CancelledError:
  54. return
  55. except asyncio.IncompleteReadError:
  56. logging.info('Connection closed by server')
  57. except asyncio.TimeoutError:
  58. logging.error('Timeout waiting for gpsd to respond')
  59. except Exception as exc:
  60. logging.error(f'Error: {exc}')
  61. """
  62. __all__ = ['aiogps', ]
  63. import logging
  64. import asyncio
  65. import socket
  66. from typing import Optional, Union, Awaitable
  67. from .client import gpsjson, dictwrapper
  68. from .gps import gps, gpsdata, WATCH_ENABLE, PACKET_SET
  69. from .misc import polystr, polybytes
  70. class aiogps(gps): # pylint: disable=R0902
  71. """An asyncio gps client.
  72. Reimplements all gps IO methods using asyncio coros. Adds connection
  73. management, an asyncio context manager and an asyncio iterator.
  74. The class uses a logger named 'gps.aiogps' to record events. The logger is
  75. configured with a NullHandler to disable any message logging until the
  76. application configures another handler.
  77. """
  78. def __init__(self, # pylint: disable=W0231
  79. connection_args: Optional[dict] = None,
  80. connection_timeout: Optional[float] = None,
  81. reconnect: Optional[float] = 2,
  82. alive_opts: Optional[dict] = None) -> None:
  83. """
  84. Arguments:
  85. connection_args: arguments needed for opening a connection.
  86. These will be passed directly to asyncio.open_connection.
  87. If set to None, a connection to the default gps host and port
  88. will be attempded.
  89. connection_timeout: time to wait for a connection to complete
  90. (seconds). Set to None to disable.
  91. reconnect: configures automatic reconnections:
  92. - 0: reconnection is not attempted in case of an error and the
  93. error is raised to the user;
  94. - number > 0: delay until next reconnection attempt (seconds).
  95. alive_opts: options related to detection of disconnections.
  96. Two mecanisms are supported: TCP keepalive (default, may not be
  97. available on all platforms) and Rx timeout, through the
  98. following options:
  99. - rx_timeout: Rx timeout (seconds). Set to None to disable.
  100. - SO_KEEPALIVE: socket keepalive and related parameters:
  101. - TCP_KEEPIDLE
  102. - TCP_KEEPINTVL
  103. - TCP_KEEPCNT
  104. """
  105. # If connection_args are not specified use defaults
  106. self.connection_args = connection_args or {
  107. 'host': self.host,
  108. 'port': self.port
  109. }
  110. self.connection_timeout = connection_timeout
  111. assert reconnect >= 0
  112. self.reconnect = reconnect
  113. # If alive_opts are not specified use defaults
  114. self.alive_opts = alive_opts or {
  115. 'rx_timeout': None,
  116. 'SO_KEEPALIVE': 1,
  117. 'TCP_KEEPIDLE': 2,
  118. 'TCP_KEEPINTVL': 2,
  119. 'TCP_KEEPCNT': 3
  120. }
  121. # Connection access streams
  122. self.reader: Optional[asyncio.StreamReader] = None
  123. self.writer: Optional[asyncio.StreamWriter] = None
  124. # Set up logging
  125. self.logger = logging.getLogger(__name__)
  126. # Set the Null handler - prevents logging message handling unless the
  127. # application sets up a handler.
  128. self.logger.addHandler(logging.NullHandler())
  129. # Init gps parents
  130. gpsdata.__init__(self) # pylint: disable=W0233
  131. gpsjson.__init__(self) # pylint: disable=W0233
  132. # Provide the response in both 'str' and 'bytes' form
  133. self.bresponse = b''
  134. self.response = polystr(self.bresponse)
  135. # Default stream command
  136. self.stream_command = self.generate_stream_command(WATCH_ENABLE)
  137. self.loop = self.connection_args.get('loop', asyncio.get_event_loop())
  138. def __del__(self) -> None:
  139. """ Destructor """
  140. self.close()
  141. async def _open_connection(self) -> None:
  142. """
  143. Opens a connection to the GPSD server and configures the TCP socket.
  144. """
  145. self.logger.info(
  146. f"Connecting to gpsd at {self.connection_args['host']}" +
  147. (f":{self.connection_args['port']}"
  148. if self.connection_args['port'] else ''))
  149. self.reader, self.writer = await asyncio.wait_for(
  150. asyncio.open_connection(**self.connection_args),
  151. self.connection_timeout,
  152. loop=self.loop)
  153. # Set socket options
  154. sock = self.writer.get_extra_info('socket')
  155. if sock is not None:
  156. if 'SO_KEEPALIVE' in self.alive_opts:
  157. sock.setsockopt(socket.SOL_SOCKET,
  158. socket.SO_KEEPALIVE,
  159. self.alive_opts['SO_KEEPALIVE'])
  160. if hasattr(
  161. sock,
  162. 'TCP_KEEPIDLE') and 'TCP_KEEPIDLE' in self.alive_opts:
  163. sock.setsockopt(socket.IPPROTO_TCP,
  164. socket.TCP_KEEPIDLE, # pylint: disable=E1101
  165. self.alive_opts['TCP_KEEPIDLE'])
  166. if hasattr(
  167. sock,
  168. 'TCP_KEEPINTVL') and 'TCP_KEEPINTVL' in self.alive_opts:
  169. sock.setsockopt(socket.IPPROTO_TCP,
  170. socket.TCP_KEEPINTVL, # pylint: disable=E1101
  171. self.alive_opts['TCP_KEEPINTVL'])
  172. if hasattr(
  173. sock,
  174. 'TCP_KEEPCNT') and 'TCP_KEEPCNT' in self.alive_opts:
  175. sock.setsockopt(socket.IPPROTO_TCP,
  176. socket.TCP_KEEPCNT,
  177. self.alive_opts['TCP_KEEPCNT'])
  178. def close(self) -> None:
  179. """ Closes connection to GPSD server """
  180. if self.writer:
  181. try:
  182. self.writer.close()
  183. except Exception: # pylint: disable=W0703
  184. pass
  185. self.writer = None
  186. def waiting(self) -> bool: # pylint: disable=W0221
  187. """ Mask the blocking waiting method from gpscommon """
  188. return True
  189. async def read(self) -> Union[dictwrapper, str]:
  190. """ Reads data from GPSD server """
  191. while True:
  192. await self.connect()
  193. try:
  194. rx_timeout = self.alive_opts.get('rx_timeout', None)
  195. reader = self.reader.readuntil(separator=b'\n')
  196. self.bresponse = await asyncio.wait_for(reader,
  197. rx_timeout,
  198. loop=self.loop)
  199. self.response = polystr(self.bresponse)
  200. if self.response.startswith(
  201. "{") and self.response.endswith("}\r\n"):
  202. self.unpack(self.response)
  203. self._oldstyle_shim()
  204. self.valid |= PACKET_SET
  205. return self.data
  206. return self.response
  207. except asyncio.CancelledError:
  208. self.close()
  209. raise
  210. except Exception as exc: # pylint: disable=W0703
  211. error = 'timeout' if isinstance(
  212. exc, asyncio.TimeoutError) else exc
  213. self.logger.warning(
  214. f'Failed to get message from GPSD: {error}')
  215. self.close()
  216. if self.reconnect:
  217. # Try again later
  218. await asyncio.sleep(self.reconnect)
  219. else:
  220. raise
  221. async def connect(self) -> None: # pylint: disable=W0221
  222. """ Connects to GPSD server and starts streaming data """
  223. while not self.writer:
  224. try:
  225. await self._open_connection()
  226. await self.stream()
  227. self.logger.info('Connected to gpsd')
  228. except asyncio.CancelledError:
  229. self.close()
  230. raise
  231. except Exception as exc: # pylint: disable=W0703
  232. error = 'timeout' if isinstance(
  233. exc, asyncio.TimeoutError) else exc
  234. self.logger.error(f'Failed to connect to GPSD: {error}')
  235. self.close()
  236. if self.reconnect:
  237. # Try again later
  238. await asyncio.sleep(self.reconnect)
  239. else:
  240. raise
  241. async def send(self, commands) -> None:
  242. """ Sends commands """
  243. bcommands = polybytes(commands + "\n")
  244. if self.writer:
  245. self.writer.write(bcommands)
  246. await self.writer.drain()
  247. async def stream(self, flags: Optional[int] = 0,
  248. devpath: Optional[str] = None) -> None:
  249. """ Creates and sends the stream command """
  250. if flags > 0:
  251. # Update the stream command
  252. self.stream_command = self.generate_stream_command(flags, devpath)
  253. if self.stream_command:
  254. self.logger.info(f'Sent stream as: {self.stream_command}')
  255. await self.send(self.stream_command)
  256. else:
  257. raise TypeError(f'Invalid streaming command: {flags}')
  258. async def __aenter__(self) -> 'aiogps':
  259. """ Context manager entry """
  260. return self
  261. async def __aexit__(self, exc_type, exc, traceback) -> None:
  262. """ Context manager exit: close connection """
  263. self.close()
  264. def __aiter__(self) -> 'aiogps':
  265. """ Async iterator interface """
  266. return self
  267. async def __anext__(self) -> Union[dictwrapper, str]:
  268. """ Returns next message from GPSD """
  269. data = await self.read()
  270. return data
  271. def __next__(self) -> Awaitable:
  272. """
  273. Reimplementation of the blocking iterator from gps.
  274. Returns an awaitable which returns the next message from GPSD.
  275. """
  276. return self.read()