OpenCPN Partial API docs
Loading...
Searching...
No Matches
comm_drv_signalk_net.cpp
Go to the documentation of this file.
1/***************************************************************************
2 * Copyright (C) 2022 by David Register *
3 * Copyright (C) 2022 Alec Leamas *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, see <https://www.gnu.org/licenses/>. *
17 **************************************************************************/
18
25#include <chrono>
26#include <mutex> // std::mutex
27
28#include <wx/socket.h>
29
30#include "rapidjson/document.h"
31#include "ixwebsocket/IXNetSystem.h"
32#include "ixwebsocket/IXSocketTLSOptions.h"
33#include "ixwebsocket/IXWebSocket.h"
34#include "observable.h"
35
38#include "model/geodesic.h"
39#include "model/logger.h"
40#include "model/sys_events.h"
41#include "model/thread_ctrl.h"
42#include "wxServDisc.h"
43
44using namespace std::literals::chrono_literals;
45
46constexpr int kTimerSocket = 9006;
47constexpr int kSignalkSocketId = 5011;
48constexpr int kDogTimeoutReconnectSeconds = 10;
49
50constexpr double kMsToKnotFactor = 1.9438444924406;
51
53public:
54 IoThread(const std::string& iface, const wxIPV4address& address,
55 wxEvtHandler* consumer, const std::string& token);
56
57 ~IoThread() override = default;
58
59 void Run();
60
61 DriverStats GetStats() const;
62
63private:
64 wxIPV4address m_address;
65 wxEvtHandler* m_consumer;
66 const std::string m_iface;
67 std::string m_token;
68 ix::WebSocket m_ws;
69 ObsListener m_resume_listener;
70 DriverStats m_driver_stats;
71 mutable std::mutex m_stats_mutex;
72};
73
74// i. e. wxDEFINE_EVENT(), avoiding the evil macro.
75static const wxEventTypeTag<CommDriverSignalKNet::InputEvt> SignalkEvtType(
76 wxNewEventType());
77
78static wxIPV4address ParamsIpAddress(const ConnectionParams& params) {
79 wxIPV4address addr;
80 addr.Hostname(params.NetworkAddress);
81 addr.Service(params.NetworkPort);
82 return addr;
83}
84
85class CommDriverSignalKNet::InputEvt : public wxEvent {
86public:
87 explicit InputEvt(std::string payload)
88 : wxEvent(0, SignalkEvtType), m_payload(std::move(payload)) {};
89
90 std::string GetPayload() const { return m_payload; }
91
92 // required for sending with wxPostEvent()
93 wxEvent* Clone() const override { return new InputEvt(m_payload); };
94
95private:
96 const std::string m_payload;
97};
98
99//========================================================================
100// IoThread implementation
101//
102CommDriverSignalKNet::IoThread::IoThread(const std::string& iface,
103 const wxIPV4address& address,
104 wxEvtHandler* consumer,
105 const std::string& token)
106 : m_address(address), m_consumer(consumer), m_iface(iface), m_token(token) {
107 m_resume_listener.Init(SystemEvents::GetInstance().evt_resume,
108 [&](ObservedEvt& ev) {
109 m_ws.stop();
110 m_ws.start();
111 wxLogDebug("WebSocketThread: restarted");
112 });
113}
114
115void CommDriverSignalKNet::IoThread::Run() {
116 using namespace std::chrono_literals;
117
118 {
119 std::lock_guard lock(m_stats_mutex);
120 m_driver_stats.driver_bus = NavAddr::Bus::Signalk;
121 m_driver_stats.driver_iface = m_iface;
122 m_driver_stats.available = false;
123 }
124
125 // Craft the address strings
126 wxString host = m_address.IPAddress();
127 int port = m_address.Service();
128 std::stringstream wsAddress;
129 wsAddress << "ws://" << host << ":" << port
130 << "/signalk/v1/stream?subscribe=all&sendCachedValues=false";
131 if (!m_token.empty()) wsAddress << "&token=" << m_token;
132 std::stringstream wssAddress;
133 wssAddress << "wss://" << host << ":" << port
134 << "/signalk/v1/stream?subscribe=all&sendCachedValues=false";
135 if (!m_token.empty()) wssAddress << "&token=" << m_token;
136
137 m_ws.setUrl(wssAddress.str());
138 ix::SocketTLSOptions opt;
139 opt.disable_hostname_validation = true;
140 opt.caFile = "NONE";
141 m_ws.setTLSOptions(opt);
142 m_ws.setPingInterval(30);
143
144 auto message_callback = [&](const ix::WebSocketMessagePtr& msg) {
145 if (msg->type == ix::WebSocketMessageType::Message) {
146 m_consumer->QueueEvent(new InputEvt(msg->str));
147 m_driver_stats.rx_count++;
148 } else if (msg->type == ix::WebSocketMessageType::Open) {
149 wxLogDebug("websocket: Connection to %s established",
150 m_ws.getUrl().c_str());
151 std::lock_guard lock(m_stats_mutex);
152 m_driver_stats.available = true;
153 } else if (msg->type == ix::WebSocketMessageType::Close) {
154 wxLogDebug("websocket: Connection disconnected");
155 std::lock_guard lock(m_stats_mutex);
156 m_driver_stats.available = false;
157 } else if (msg->type == ix::WebSocketMessageType::Error) {
158 std::lock_guard lock(m_stats_mutex);
159 m_driver_stats.error_count++;
160 wxLogDebug("websocket: error: %s", msg->errorInfo.reason.c_str());
161 m_ws.getUrl() == wsAddress.str() ? m_ws.setUrl(wssAddress.str())
162 : m_ws.setUrl(wsAddress.str());
163 }
164 };
165 m_ws.setOnMessageCallback(message_callback);
166
167 m_ws.start();
168 while (KeepGoing()) {
169 std::this_thread::sleep_for(100ms);
170 }
171 m_ws.stop();
172 SignalExit();
173 std::lock_guard lock(m_stats_mutex);
174 m_driver_stats.available = false;
175}
176
177//========================================================================
178// CommDriverSignalKNet implementation
179//
180DriverStats CommDriverSignalKNet::IoThread::GetStats() const {
181 std::lock_guard lock(m_stats_mutex);
182 return m_driver_stats;
183}
184
185CommDriverSignalKNet::CommDriverSignalKNet(const ConnectionParams* params,
186 DriverListener& listener)
187 : CommDriverSignalK(params->GetStrippedDSPort()),
188 m_params(*params),
189 m_listener(listener),
190 m_dog_value(kDogTimeoutSeconds),
191 m_io_thread(std::make_unique<IoThread>(params->GetStrippedDSPort(),
192 ParamsIpAddress(*params), this,
193 params->AuthToken.ToStdString())),
194 m_stats_timer(*this, 2s) {
195 // Prepare the wxEventHandler to accept events from the actual hardware thread
196 Bind(SignalkEvtType, &CommDriverSignalKNet::HandleSkSentence, this);
197
198 m_socketread_watchdog_timer.SetOwner(this, kTimerSocket);
199
200 // Dummy Driver Stats, may be polled before worker thread is active
201 m_driver_stats.driver_bus = NavAddr::Bus::Signalk;
202 m_driver_stats.driver_iface = m_params.GetStrippedDSPort();
203 m_driver_stats.available = false;
204
205 Open();
206}
207
208CommDriverSignalKNet::~CommDriverSignalKNet() { Close(); }
209
211 if (m_std_thread.joinable())
212 return m_io_thread->GetStats();
213 else
214 return m_driver_stats;
215}
216
217void CommDriverSignalKNet::Open() {
218 wxString discoveredIP;
219#if 0
220 int discoveredPort;
221#endif
222
223 // if (m_useWebSocket)
224 {
225 std::string service_ident =
226 std::string("_signalk-ws._tcp.local."); // Works for node.js server
227 OpenWebSocket();
228 }
229}
230void CommDriverSignalKNet::Close() { CloseWebSocket(); }
231
232bool CommDriverSignalKNet::DiscoverSkServer(const std::string& service_ident,
233 wxString& ip, int& port, int tSec) {
234 auto servscan = std::make_unique<wxServDisc>(
235 nullptr, wxString(service_ident.c_str()), QTYPE_PTR);
236 for (int i = 0; i < 10; i++) {
237 if (servscan->getResultCount()) {
238 auto result = servscan->getResults().at(0);
239 auto namescan =
240 std::make_unique<wxServDisc>(nullptr, result.name, QTYPE_SRV);
241 for (int j = 0; j < 10; j++) {
242 if (namescan->getResultCount()) {
243 auto namescan_result = namescan->getResults().at(0);
244 port = namescan_result.port;
245 auto addrscan = std::make_unique<wxServDisc>(
246 nullptr, namescan_result.name, QTYPE_A);
247 for (int k = 0; k < 10; k++) {
248 if (addrscan->getResultCount()) {
249 auto addrscan_result = addrscan->getResults().at(0);
250 ip = addrscan_result.ip;
251 return true;
252 } else {
253 wxYield();
254 wxMilliSleep(1000 * tSec / 10);
255 }
256 }
257 return false;
258 } else {
259 wxYield();
260 wxMilliSleep(1000 * tSec / 10);
261 }
262 }
263 return false;
264 } else {
265 wxYield();
266 wxMilliSleep(1000 * tSec / 10);
267 }
268 }
269 return false;
270}
271
272void CommDriverSignalKNet::OpenWebSocket() {
273 wxLogMessage("Opening Signal K WebSocket client: %s",
274 m_params.GetDSPort().c_str());
275 m_std_thread = std::thread([&] { m_io_thread->Run(); });
276 if (!m_std_thread.joinable()) {
277 wxLogError("Can't create WebSocketThread!");
278 return;
279 }
280 ResetWatchdog();
281 m_socketread_watchdog_timer.Start(1000, wxTIMER_ONE_SHOT);
282}
283
284void CommDriverSignalKNet::CloseWebSocket() {
285 if (m_std_thread.joinable()) {
286 if (m_io_thread->IsRunning()) {
287 wxLogMessage("Stopping Secondary SignalK Thread");
288 m_stats_timer.Stop();
289 m_io_thread->RequestStop();
290 std::chrono::milliseconds stop_delay;
291 bool stop_ok = m_io_thread->WaitUntilStopped(10s, stop_delay);
292 if (stop_ok)
293 MESSAGE_LOG << "Stopped in" << stop_delay.count() << " msec.";
294 else
295 WARNING_LOG << "Not stopped after 10 sec.";
296 }
297 wxMilliSleep(100);
298 m_std_thread.join();
299 } else {
300 WARNING_LOG << "Thread unexpectedly died";
301 }
302}
303
304void CommDriverSignalKNet::HandleSkSentence(const InputEvt& event) {
305 rapidjson::Document root;
306
307 std::string msg = event.GetPayload();
308 root.Parse(msg);
309 if (root.HasParseError()) {
310 wxLogMessage(
311 "SignalKDataStream ERROR: the JSON document is not well-formed: %d",
312 root.GetParseError());
313 return;
314 }
315 if (!root.IsObject()) {
316 wxLogMessage("SignalKDataStream ERROR: Message is not a JSON Object: %s",
317 msg.c_str());
318 return;
319 }
320
321 // Decode just enough of string to extract some identifiers
322 // such as the sK version, "self" context, and target context
323 if (root.HasMember("version")) {
324 wxString vers = "Connected to Signal K server version: ";
325 vers << (root["version"].GetString());
326 wxLogMessage(vers);
327 }
328 if (root.HasMember("self")) {
329 if (strncmp(root["self"].GetString(), "vessels.", 8) == 0)
330 m_self = (root["self"].GetString()); // for java server, and OpenPlotter
331 // node.js server 1.20
332 else
333 m_self = std::string("vessels.")
334 .append(root["self"].GetString()); // for Node.js server
335 }
336 if (root.HasMember("context") && root["context"].IsString()) {
337 m_context = root["context"].GetString();
338 }
339
340 // Notify all listeners
341 auto pos = iface.find(':');
342 std::string comm_interface;
343 if (pos != std::string::npos) comm_interface = iface.substr(pos + 1);
344 auto navmsg = std::make_shared<const SignalkMsg>(m_self, m_context, msg,
345 comm_interface);
346 m_listener.Notify(std::move(navmsg));
347}
348
349void CommDriverSignalKNet::initIXNetSystem() { ix::initNetSystem(); };
350
351void CommDriverSignalKNet::uninitIXNetSystem() { ix::uninitNetSystem(); };
const std::string iface
Physical device for 0183, else a unique string.
Definition comm_driver.h:95
static void uninitIXNetSystem()
ix::uninitIXNetSystem wrapper
DriverStats GetDriverStats() const override
Get the Driver Statistics.
static void initIXNetSystem()
ix::initIXNetSystem wrapper
static bool DiscoverSkServer(const std::string &service_ident, wxString &ip, int &port, int tSec)
Scan for a SignalK server on local network using mDNS.
Interface for handling incoming messages.
Definition comm_driver.h:50
virtual void Notify(std::shared_ptr< const NavMsg > message)=0
Handle a received message.
Define an action to be performed when a KeyProvider is notified.
Definition observable.h:257
Custom event class for OpenCPN's notification system.
Thread mixin providing a "stop thread"/"wait until stopped" interface.
Definition thread_ctrl.h:31
SignalK IP network driver.
Raw messages layer, supports sending and recieving navmsg messages.
Enhanced logging interface on top of wx/log.h.
General observable implementation with several specializations.
Driver statistics report.
unsigned rx_count
Number of bytes received since program start.
unsigned error_count
Number of detected errors since program start.
Suspend/resume and new devices events exchange point.
ThreadCtrl mixin class definition.