OpenCPN Partial API docs
Loading...
Searching...
No Matches
comm_drv_signalk_net.cpp
Go to the documentation of this file.
1/***************************************************************************
2 * Copyright (C) 2022 by David Register *
3 * Copyright (C) 2022 Alec Leamas *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, see <https://www.gnu.org/licenses/>. *
17 **************************************************************************/
18
#include <chrono>
#include <memory>
#include <mutex>  // std::mutex
#include <queue>  // std::queue
#include <thread>
#include <vector>
30
31#include "rapidjson/document.h"
32
36#include "model/geodesic.h"
37#include "model/sys_events.h"
38#include "wxServDisc.h"
39
40#include "observable.h"
41
42#include "ixwebsocket/IXNetSystem.h"
43#include "ixwebsocket/IXWebSocket.h"
44#include "ixwebsocket/IXUserAgent.h"
45#include "ixwebsocket/IXSocketTLSOptions.h"
46
47using namespace std::literals::chrono_literals;
48
/** Timer id handed to the socket read watchdog timer's SetOwner(). */
static constexpr int kTimerSocket = 9006;

// Event type carrying a received Signal K message; defined further down.
class CommDriverSignalKNetEvent;
52
53class CommDriverSignalKNetThread : public wxThread {
54public:
56 const wxString& PortName,
57 const wxString& strBaudRate);
58
60 void* Entry();
61 bool SetOutMsg(const wxString& msg);
62 void OnExit();
63
64private:
65 void ThreadMessage(const wxString& msg);
66 bool OpenComPortPhysical(const wxString& com_name, int baud_rate);
67 void CloseComPortPhysical();
68 size_t WriteComPortPhysical(std::vector<unsigned char> msg);
69 size_t WriteComPortPhysical(unsigned char* msg, size_t length);
70 void SetGatewayOperationMode();
71
72 CommDriverSignalKNet* m_pParentDriver;
73 wxString m_PortName;
74 wxString m_FullPortName;
75
76 unsigned char* put_ptr;
77 unsigned char* tak_ptr;
78
79 unsigned char* rx_buffer;
80
81 int m_baud;
82 int m_n_timeout;
83
84 // n2k_atomic_queue<char*> out_que;
85};
86
88wxDECLARE_EVENT(wxEVT_COMMDRIVER_SIGNALK_NET, CommDriverSignalKNetEvent);
89
90class CommDriverSignalKNetEvent : public wxEvent {
91public:
92 CommDriverSignalKNetEvent(wxEventType commandType = wxEVT_NULL, int id = 0)
93 : wxEvent(id, commandType) {};
95
96 // accessors
97 void SetPayload(std::shared_ptr<std::string> data) { m_payload = data; }
98 std::shared_ptr<std::string> GetPayload() { return m_payload; }
99
100 // required for sending with wxPostEvent()
101 wxEvent* Clone() const {
103 newevent->m_payload = this->m_payload;
104 return newevent;
105 };
106
107private:
108 std::shared_ptr<std::string> m_payload;
109};
110
111// WebSocket implementation
112
113class WebSocketThread : public wxThread {
114public:
115 WebSocketThread(CommDriverSignalKNet* parent, wxIPV4address address,
116 wxEvtHandler* consumer, const std::string& token);
117 virtual void* Entry();
118
119 DriverStats GetStats() const;
120
121private:
122 void HandleMessage(const std::string& message);
123 wxEvtHandler* s_wsSKConsumer;
124 wxIPV4address m_address;
125 wxEvtHandler* m_consumer;
126 CommDriverSignalKNet* m_parentStream;
127 std::string m_token;
128 ix::WebSocket ws;
129 ObsListener resume_listener;
130 DriverStats m_driver_stats;
131 mutable std::mutex m_stats_mutex;
132};
133
134WebSocketThread::WebSocketThread(CommDriverSignalKNet* parent,
135 wxIPV4address address, wxEvtHandler* consumer,
136 const std::string& token)
137 : m_address(address),
138 m_consumer(consumer),
139 m_parentStream(parent),
140 m_token(token) {
141 resume_listener.Init(SystemEvents::GetInstance().evt_resume,
142 [&](ObservedEvt& ev) {
143 ws.stop();
144 ws.start();
145 wxLogDebug("WebSocketThread: restarted");
146 });
147}
148
149void* WebSocketThread::Entry() {
150 using namespace std::chrono_literals;
151 bool not_done = true;
152
153 m_parentStream->SetThreadRunning(true);
154
155 s_wsSKConsumer = m_consumer;
156
157 wxString host = m_address.IPAddress();
158 int port = m_address.Service();
159
160 // Craft the address string
161 std::stringstream wsAddress;
162 wsAddress << "ws://" << host << ":" << port
163 << "/signalk/v1/stream?subscribe=all&sendCachedValues=false";
164 std::stringstream wssAddress;
165 wssAddress << "wss://" << host << ":" << port
166 << "/signalk/v1/stream?subscribe=all&sendCachedValues=false";
167
168 if (!m_token.empty()) {
169 wsAddress << "&token=" << m_token;
170 wssAddress << "&token=" << m_token;
171 }
172
173 ws.setUrl(wssAddress.str());
174 ix::SocketTLSOptions opt;
175 opt.disable_hostname_validation = true;
176 opt.caFile = "NONE";
177 ws.setTLSOptions(opt);
178 ws.setPingInterval(30);
179
180 auto message_callback = [&](const ix::WebSocketMessagePtr& msg) {
181 if (msg->type == ix::WebSocketMessageType::Message) {
182 HandleMessage(msg->str);
183 } else if (msg->type == ix::WebSocketMessageType::Open) {
184 wxLogDebug("websocket: Connection established");
185 std::lock_guard lock(m_stats_mutex);
186 m_driver_stats.available = true;
187 } else if (msg->type == ix::WebSocketMessageType::Close) {
188 wxLogDebug("websocket: Connection disconnected");
189 std::lock_guard lock(m_stats_mutex);
190 m_driver_stats.available = false;
191 } else if (msg->type == ix::WebSocketMessageType::Error) {
192 std::lock_guard lock(m_stats_mutex);
193 m_driver_stats.error_count++;
194 wxLogDebug("websocket: error: %s", msg->errorInfo.reason.c_str());
195 ws.getUrl() == wsAddress.str() ? ws.setUrl(wssAddress.str())
196 : ws.setUrl(wsAddress.str());
197 }
198 };
199
200 ws.setOnMessageCallback(message_callback);
201
202 {
203 std::lock_guard lock(m_stats_mutex);
204 m_driver_stats.driver_bus = NavAddr::Bus::Signalk;
205 m_driver_stats.driver_iface = m_parentStream->m_params.GetStrippedDSPort();
206 m_driver_stats.available = false;
207 }
208
209 ws.start();
210
211 while (m_parentStream->m_Thread_run_flag > 0) {
212 std::this_thread::sleep_for(100ms);
213 }
214
215 ws.stop();
216 m_parentStream->SetThreadRunning(false);
217 m_parentStream->m_Thread_run_flag = -1;
218 {
219 std::lock_guard lock(m_stats_mutex);
220 m_driver_stats.available = false;
221 }
222
223 return 0;
224}
225
226DriverStats WebSocketThread::GetStats() const {
227 std::lock_guard lock(m_stats_mutex);
228 return m_driver_stats;
229}
230
231void WebSocketThread::HandleMessage(const std::string& message) {
232 if (s_wsSKConsumer) {
233 CommDriverSignalKNetEvent signalKEvent(wxEVT_COMMDRIVER_SIGNALK_NET, 0);
234 auto buffer = std::make_shared<std::string>(message);
235
236 signalKEvent.SetPayload(buffer);
237 s_wsSKConsumer->AddPendingEvent(signalKEvent);
238 m_driver_stats.rx_count++;
239 }
240}
241
242//========================================================================
243/* CommDriverSignalKNet implementation
244 * */
245
246wxDEFINE_EVENT(wxEVT_COMMDRIVER_SIGNALK_NET, CommDriverSignalKNetEvent);
247
248CommDriverSignalKNet::CommDriverSignalKNet(const ConnectionParams* params,
249 DriverListener& listener)
250 : CommDriverSignalK(params->GetStrippedDSPort()),
251 m_Thread_run_flag(-1),
252 m_params(*params),
253 m_listener(listener),
254 m_stats_timer(*this, 2s) {
255 // Prepare the wxEventHandler to accept events from the actual hardware thread
256 Bind(wxEVT_COMMDRIVER_SIGNALK_NET, &CommDriverSignalKNet::handle_SK_sentence,
257 this);
258
259 m_addr.Hostname(params->NetworkAddress);
260 m_addr.Service(params->NetworkPort);
261 m_token = params->AuthToken;
262 m_socketread_watchdog_timer.SetOwner(this, kTimerSocket);
263 m_wsThread = NULL;
264 m_threadActive = false;
265
266 // Dummy Driver Stats, may be polled before worker thread is active
267 m_driver_stats.driver_bus = NavAddr::Bus::Signalk;
268 m_driver_stats.driver_iface = m_params.GetStrippedDSPort();
269 m_driver_stats.available = false;
270
271 Open();
272}
273
274CommDriverSignalKNet::~CommDriverSignalKNet() { Close(); }
275
277 if (m_wsThread)
278 return m_wsThread->GetStats();
279 else
280 return m_driver_stats;
281}
282
283void CommDriverSignalKNet::Open() {
284 wxString discoveredIP;
285#if 0
286 int discoveredPort;
287#endif
288
289 // if (m_useWebSocket)
290 {
291 std::string serviceIdent =
292 std::string("_signalk-ws._tcp.local."); // Works for node.js server
293 OpenWebSocket();
294 }
295}
296void CommDriverSignalKNet::Close() { CloseWebSocket(); }
297
298bool CommDriverSignalKNet::DiscoverSKServer(std::string serviceIdent,
299 wxString& ip, int& port, int tSec) {
300 wxServDisc* servscan =
301 new wxServDisc(0, wxString(serviceIdent.c_str()), QTYPE_PTR);
302
303 for (int i = 0; i < 10; i++) {
304 if (servscan->getResultCount()) {
305 auto result = servscan->getResults().at(0);
306 delete servscan;
307
308 wxServDisc* namescan = new wxServDisc(0, result.name, QTYPE_SRV);
309 for (int j = 0; j < 10; j++) {
310 if (namescan->getResultCount()) {
311 auto namescanResult = namescan->getResults().at(0);
312 port = namescanResult.port;
313 delete namescan;
314
315 wxServDisc* addrscan =
316 new wxServDisc(0, namescanResult.name, QTYPE_A);
317 for (int k = 0; k < 10; k++) {
318 if (addrscan->getResultCount()) {
319 auto addrscanResult = addrscan->getResults().at(0);
320 ip = addrscanResult.ip;
321 delete addrscan;
322 return true;
323 break;
324 } else {
325 wxYield();
326 wxMilliSleep(1000 * tSec / 10);
327 }
328 }
329 delete addrscan;
330 return false;
331 } else {
332 wxYield();
333 wxMilliSleep(1000 * tSec / 10);
334 }
335 }
336 delete namescan;
337 return false;
338 } else {
339 wxYield();
340 wxMilliSleep(1000 * tSec / 10);
341 }
342 }
343
344 delete servscan;
345 return false;
346}
347
348void CommDriverSignalKNet::OpenWebSocket() {
349 // printf("OpenWebSocket\n");
350 wxLogMessage(wxString::Format("Opening Signal K WebSocket client: %s",
351 m_params.GetDSPort().c_str()));
352
353 // Start a thread to run the client without blocking
354
355 m_wsThread = new WebSocketThread(this, GetAddr(), this, m_token);
356 if (m_wsThread->Create() != wxTHREAD_NO_ERROR) {
357 wxLogError("Can't create WebSocketThread!");
358
359 return;
360 }
361
362 ResetWatchdog();
363 GetSocketThreadWatchdogTimer()->Start(1000,
364 wxTIMER_ONE_SHOT); // Start the dog
365 SetThreadRunFlag(1);
366
367 m_wsThread->Run();
368}
369
370void CommDriverSignalKNet::CloseWebSocket() {
371 if (m_wsThread) {
372 if (IsThreadRunning()) {
373 wxLogMessage("Stopping Secondary SignalK Thread");
374 m_stats_timer.Stop();
375
376 m_Thread_run_flag = 0;
377 int tsec = 10;
378 while (IsThreadRunning() && tsec) {
379 wxSleep(1);
380 tsec--;
381 }
382
383 wxString msg;
384 if (m_Thread_run_flag <= 0)
385 msg.Printf("Stopped in %d sec.", 10 - tsec);
386 else
387 msg.Printf("Not Stopped after 10 sec.");
388 wxLogMessage(msg);
389 }
390
391 wxMilliSleep(100);
392 }
393}
394
395void CommDriverSignalKNet::handle_SK_sentence(
397 rapidjson::Document root;
398
399 // LOG_DEBUG("%s\n", msg.c_str());
400
401 std::string* msg = event.GetPayload().get();
402 std::string msgTerminated = *msg;
403 msgTerminated.append("\r\n");
404
405 root.Parse(*msg);
406 if (root.HasParseError()) {
407 wxLogMessage(wxString::Format(
408 "SignalKDataStream ERROR: the JSON document is not well-formed:%d",
409 root.GetParseError()));
410 return;
411 }
412
413 if (!root.IsObject()) {
414 wxLogMessage(wxString::Format(
415 "SignalKDataStream ERROR: Message is not a JSON Object: %s",
416 msg->c_str()));
417 return;
418 }
419
420 // Decode just enough of string to extract some identifiers
421 // such as the sK version, "self" context, and target context
422 if (root.HasMember("version")) {
423 wxString msg = "Connected to Signal K server version: ";
424 msg << (root["version"].GetString());
425 wxLogMessage(msg);
426 }
427
428 if (root.HasMember("self")) {
429 if (strncmp(root["self"].GetString(), "vessels.", 8) == 0)
430 m_self = (root["self"].GetString()); // for java server, and OpenPlotter
431 // node.js server 1.20
432 else
433 m_self = std::string("vessels.")
434 .append(root["self"].GetString()); // for Node.js server
435 }
436
437 if (root.HasMember("context") && root["context"].IsString()) {
438 m_context = root["context"].GetString();
439 }
440
441 // Notify all listeners
442 auto pos = iface.find(":");
443 std::string comm_interface = "";
444 if (pos != std::string::npos) comm_interface = iface.substr(pos + 1);
445 auto navmsg = std::make_shared<const SignalkMsg>(
446 m_self, m_context, msgTerminated, comm_interface);
447 m_listener.Notify(std::move(navmsg));
448}
449
450void CommDriverSignalKNet::initIXNetSystem() { ix::initNetSystem(); };
451
452void CommDriverSignalKNet::uninitIXNetSystem() { ix::uninitNetSystem(); };
453
455
456// std::vector<unsigned char>* payload = p.get();
457//
458// // Extract the NMEA0183 sentence
459// std::string full_sentence = std::string(payload->begin(), payload->end());
const std::string iface
Physical device for 0183, else a unique string.
Definition comm_driver.h:95
DriverStats GetDriverStats() const override
Get the Driver Statistics.
Interface for handling incoming messages.
Definition comm_driver.h:50
virtual void Notify(std::shared_ptr< const NavMsg > message)=0
Handle a received message.
Define an action to be performed when a KeyProvider is notified.
Definition observable.h:257
void Init(const KeyProvider &kp, const std::function< void(ObservedEvt &ev)> &action)
Initiate an object yet not listening.
Definition observable.h:295
Custom event class for OpenCPN's notification system.
Driver registration container, a singleton.
SignalK IP network driver.
Raw messages layer, supports sending and receiving navmsg messages.
General observable implementation with several specializations.
Driver statistics report.
unsigned rx_count
Number of bytes received since program start.
unsigned error_count
Number of detected errors since program start.
Suspend/resume and new devices events exchange point.