forked from meshtastic/python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtcp_interface.py
More file actions
117 lines (99 loc) · 3.87 KB
/
tcp_interface.py
File metadata and controls
117 lines (99 loc) · 3.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""TCPInterface class for interfacing with http endpoint
"""
# pylint: disable=R0917
import contextlib
import logging
import socket
import time
from typing import Optional
from meshtastic.stream_interface import StreamInterface
DEFAULT_TCP_PORT = 4403
logger = logging.getLogger(__name__)
class TCPInterface(StreamInterface):
"""Interface class for meshtastic devices over a TCP link"""
def __init__(
self,
hostname: str,
debugOut=None,
noProto: bool=False,
connectNow: bool=True,
portNumber: int=DEFAULT_TCP_PORT,
noNodes:bool=False,
timeout: int = 300,
):
"""Constructor, opens a connection to a specified IP address/hostname
Keyword Arguments:
hostname {string} -- Hostname/IP address of the device to connect to
timeout -- How long to wait for replies (default: 300 seconds)
"""
self.stream = None
self.hostname: str = hostname
self.portNumber: int = portNumber
self.socket: Optional[socket.socket] = None
if connectNow:
self.myConnect()
else:
self.socket = None
super().__init__(debugOut=debugOut, noProto=noProto, connectNow=connectNow, noNodes=noNodes, timeout=timeout)
def __repr__(self):
rep = f"TCPInterface({self.hostname!r}"
if self.debugOut is not None:
rep += f", debugOut={self.debugOut!r}"
if self.noProto:
rep += ", noProto=True"
if self.socket is None:
rep += ", connectNow=False"
if self.portNumber != DEFAULT_TCP_PORT:
rep += f", portNumber={self.portNumber!r}"
if self.noNodes:
rep += ", noNodes=True"
rep += ")"
return rep
def _socket_shutdown(self) -> None:
"""Shutdown the socket.
Note: Broke out this line so the exception could be unit tested.
"""
if self.socket is not None:
self.socket.shutdown(socket.SHUT_RDWR)
def myConnect(self) -> None:
"""Connect to socket"""
logger.debug(f"Connecting to {self.hostname}") # type: ignore[str-bytes-safe]
server_address = (self.hostname, self.portNumber)
self.socket = socket.create_connection(server_address)
def close(self) -> None:
"""Close a connection to the device"""
logger.debug("Closing TCP stream")
super().close()
# Sometimes the socket read might be blocked in the reader thread.
# Therefore we force the shutdown by closing the socket here
self._wantExit = True
if self.socket is not None:
with contextlib.suppress(Exception): # Ignore errors in shutdown, because we might have a race with the server
self._socket_shutdown()
self.socket.close()
self.socket = None
def _writeBytes(self, b: bytes) -> None:
"""Write an array of bytes to our stream and flush"""
if self.socket is not None:
self.socket.send(b)
def _readBytes(self, length) -> Optional[bytes]:
"""Read an array of bytes from our stream"""
if self.socket is not None:
data = self.socket.recv(length)
# empty byte indicates a disconnected socket,
# we need to handle it to avoid an infinite loop reading from null socket
if data == b'':
logger.debug("dead socket, re-connecting")
# cleanup and reconnect socket without breaking reader thread
with contextlib.suppress(Exception):
self._socket_shutdown()
self.socket.close()
self.socket = None
time.sleep(1)
self.myConnect()
self._startConfig()
return None
return data
# no socket, break reader thread
self._wantExit = True
return None