OpenCPN Partial API docs
Loading...
Searching...
No Matches
comm_drv_signalk_net.cpp
1/***************************************************************************
2 *
3 * Project: OpenCPN
4 * Purpose:
5 * Author: David Register, Alec Leamas
6 *
7 ***************************************************************************
8 * Copyright (C) 2022 by David Register, Alec Leamas *
9 * *
10 * This program is free software; you can redistribute it and/or modify *
11 * it under the terms of the GNU General Public License as published by *
12 * the Free Software Foundation; either version 2 of the License, or *
13 * (at your option) any later version. *
14 * *
15 * This program is distributed in the hope that it will be useful, *
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
18 * GNU General Public License for more details. *
19 * *
20 * You should have received a copy of the GNU General Public License *
21 * along with this program; if not, write to the *
22 * Free Software Foundation, Inc., *
23 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
24 **************************************************************************/
25
26#include <vector>
27#include <mutex> // std::mutex
28#include <queue> // std::queue
29#include <chrono>
30#include <thread>
31
32#include "rapidjson/document.h"
33
34#include "model/comm_drv_signalk_net.h"
37#include "model/geodesic.h"
38#include "model/sys_events.h"
39#include "wxServDisc.h"
40
41#include "observable.h"
42
43#include "ixwebsocket/IXNetSystem.h"
44#include "ixwebsocket/IXWebSocket.h"
45#include "ixwebsocket/IXUserAgent.h"
46#include "ixwebsocket/IXSocketTLSOptions.h"
47
// Timer id passed to m_socketread_watchdog_timer.SetOwner() in the
// CommDriverSignalKNet constructor.
48const int kTimerSocket = 9006;
49
50class CommDriverSignalKNetEvent; // forward declaration; the class is defined further down
51
// Worker-thread declaration derived from wxThread. Its private interface is
// expressed in terms of a physical COM port (open/close/write helpers, a baud
// rate, raw receive-buffer pointers).
// NOTE(review): none of these members is defined or used anywhere in the
// visible part of this file, and the two parameter lines directly after
// `public:` have no function name in front of them -- the constructor (and
// probably destructor) declaration lines appear to be missing from this
// listing. Confirm against the original source before relying on this class.
52class CommDriverSignalKNetThread : public wxThread {
53public:
55 const wxString& PortName,
56 const wxString& strBaudRate);
57
59 void* Entry();
60 bool SetOutMsg(const wxString& msg);
61 void OnExit(void);
62
63private:
64 void ThreadMessage(const wxString& msg);
65 bool OpenComPortPhysical(const wxString& com_name, int baud_rate);
66 void CloseComPortPhysical();
67 size_t WriteComPortPhysical(std::vector<unsigned char> msg);
68 size_t WriteComPortPhysical(unsigned char* msg, size_t length);
69 void SetGatewayOperationMode(void);
70
71 CommDriverSignalKNet* m_pParentDriver;
72 wxString m_PortName;
73 wxString m_FullPortName;
74
75 unsigned char* put_ptr;
76 unsigned char* tak_ptr;
77
78 unsigned char* rx_buffer;
79
80 int m_baud;
81 int m_n_timeout;
82
83 // n2k_atomic_queue<char*> out_que;
84};
85
87wxDECLARE_EVENT(wxEVT_COMMDRIVER_SIGNALK_NET, CommDriverSignalKNetEvent);
88
89class CommDriverSignalKNetEvent : public wxEvent {
90public:
91 CommDriverSignalKNetEvent(wxEventType commandType = wxEVT_NULL, int id = 0)
92 : wxEvent(id, commandType) {};
94
95 // accessors
96 void SetPayload(std::shared_ptr<std::string> data) { m_payload = data; }
97 std::shared_ptr<std::string> GetPayload() { return m_payload; }
98
99 // required for sending with wxPostEvent()
100 wxEvent* Clone() const {
102 newevent->m_payload = this->m_payload;
103 return newevent;
104 };
105
106private:
107 std::shared_ptr<std::string> m_payload;
108};
109
110// WebSocket implementation
111
112class WebSocketThread : public wxThread {
113public:
114 WebSocketThread(CommDriverSignalKNet* parent, wxIPV4address address,
115 wxEvtHandler* consumer, const std::string& token);
116 virtual void* Entry();
117
118private:
119 void HandleMessage(const std::string& message);
120 wxEvtHandler* s_wsSKConsumer;
121 wxIPV4address m_address;
122 wxEvtHandler* m_consumer;
123 CommDriverSignalKNet* m_parentStream;
124 std::string m_token;
125 ix::WebSocket ws;
126 ObsListener resume_listener;
127};
128
129WebSocketThread::WebSocketThread(CommDriverSignalKNet* parent,
130 wxIPV4address address, wxEvtHandler* consumer,
131 const std::string& token)
132 : m_address(address),
133 m_consumer(consumer),
134 m_parentStream(parent),
135 m_token(token) {
136 resume_listener.Init(SystemEvents::GetInstance().evt_resume,
137 [&](ObservedEvt& ev) {
138 ws.stop();
139 ws.start();
140 wxLogDebug("WebSocketThread: restarted");
141 });
142}
143
144void* WebSocketThread::Entry() {
145 using namespace std::chrono_literals;
146 bool not_done = true;
147
148 m_parentStream->SetThreadRunning(true);
149
150 s_wsSKConsumer = m_consumer;
151
152 wxString host = m_address.IPAddress();
153 int port = m_address.Service();
154
155 // Craft the address string
156 std::stringstream wsAddress;
157 wsAddress << "ws://" << host << ":" << port
158 << "/signalk/v1/stream?subscribe=all&sendCachedValues=false";
159 std::stringstream wssAddress;
160 wssAddress << "wss://" << host << ":" << port
161 << "/signalk/v1/stream?subscribe=all&sendCachedValues=false";
162
163 if (!m_token.empty()) {
164 wsAddress << "&token=" << m_token;
165 wssAddress << "&token=" << m_token;
166 }
167
168 ws.setUrl(wssAddress.str());
169 ix::SocketTLSOptions opt;
170 opt.disable_hostname_validation = true;
171 opt.caFile = "NONE";
172 ws.setTLSOptions(opt);
173 ws.setPingInterval(30);
174
175 auto message_callback = [&](const ix::WebSocketMessagePtr& msg) {
176 if (msg->type == ix::WebSocketMessageType::Message) {
177 HandleMessage(msg->str);
178 } else if (msg->type == ix::WebSocketMessageType::Open) {
179 wxLogDebug("websocket: Connection established");
180 } else if (msg->type == ix::WebSocketMessageType::Close) {
181 wxLogDebug("websocket: Connection disconnected");
182 } else if (msg->type == ix::WebSocketMessageType::Error) {
183 wxLogDebug("websocket: error: %s", msg->errorInfo.reason.c_str());
184 ws.getUrl() == wsAddress.str() ? ws.setUrl(wssAddress.str())
185 : ws.setUrl(wsAddress.str());
186 }
187 };
188
189 ws.setOnMessageCallback(message_callback);
190 ws.start();
191
192 while (m_parentStream->m_Thread_run_flag > 0) {
193 std::this_thread::sleep_for(100ms);
194 }
195
196 ws.stop();
197 m_parentStream->SetThreadRunning(false);
198 m_parentStream->m_Thread_run_flag = -1;
199
200 return 0;
201}
202
203void WebSocketThread::HandleMessage(const std::string& message) {
204 if (s_wsSKConsumer) {
205 CommDriverSignalKNetEvent signalKEvent(wxEVT_COMMDRIVER_SIGNALK_NET, 0);
206 auto buffer = std::make_shared<std::string>(message);
207
208 signalKEvent.SetPayload(buffer);
209 s_wsSKConsumer->AddPendingEvent(signalKEvent);
210 }
211}
212
213//========================================================================
214/* CommDriverSignalKNet implementation
215 * */
216
// Definition of the event type posted by WebSocketThread::HandleMessage() and
// bound to handle_SK_sentence() in the CommDriverSignalKNet constructor.
217wxDEFINE_EVENT(wxEVT_COMMDRIVER_SIGNALK_NET, CommDriverSignalKNetEvent);
218
219CommDriverSignalKNet::CommDriverSignalKNet(const ConnectionParams* params,
220 DriverListener& listener)
221 : CommDriverSignalK(params->GetStrippedDSPort()),
222 m_Thread_run_flag(-1),
223 m_params(*params),
224 m_listener(listener) {
225 // Prepare the wxEventHandler to accept events from the actual hardware thread
226 Bind(wxEVT_COMMDRIVER_SIGNALK_NET, &CommDriverSignalKNet::handle_SK_sentence,
227 this);
228
229 m_addr.Hostname(params->NetworkAddress);
230 m_addr.Service(params->NetworkPort);
231 m_token = params->AuthToken;
232 m_socketread_watchdog_timer.SetOwner(this, kTimerSocket);
233 m_wsThread = NULL;
234 m_threadActive = false;
235
236 Open();
237}
238
239CommDriverSignalKNet::~CommDriverSignalKNet() { Close(); }
240
241void CommDriverSignalKNet::Open(void) {
242 wxString discoveredIP;
243#if 0
244 int discoveredPort;
245#endif
246
247 // if (m_useWebSocket)
248 {
249 std::string serviceIdent =
250 std::string("_signalk-ws._tcp.local."); // Works for node.js server
251#if 0
252 if (m_params->AutoSKDiscover) {
253 if (DiscoverSKServer(serviceIdent, discoveredIP, discoveredPort,
254 1)) { // 1 second scan
255 wxLogDebug(wxString::Format(
256 _T("SK server autodiscovery finds WebSocket service: %s:%d"),
257 discoveredIP.c_str(), discoveredPort));
258 m_addr.Hostname(discoveredIP);
259 m_addr.Service(discoveredPort);
260
261 // Update the connection params, by pointer to item in global params
262 // array
263 ConnectionParams *params = (ConnectionParams *)m_params; // non-const
264 params->NetworkAddress = discoveredIP;
265 params->NetworkPort = discoveredPort;
266 } else
267 wxLogDebug(_T("SK server autodiscovery finds no WebSocket server."));
268 }
269#endif
270 OpenWebSocket();
271 }
272}
273void CommDriverSignalKNet::Close() { CloseWebSocket(); }
274
275bool CommDriverSignalKNet::DiscoverSKServer(std::string serviceIdent,
276 wxString& ip, int& port, int tSec) {
277 wxServDisc* servscan =
278 new wxServDisc(0, wxString(serviceIdent.c_str()), QTYPE_PTR);
279
280 for (int i = 0; i < 10; i++) {
281 if (servscan->getResultCount()) {
282 auto result = servscan->getResults().at(0);
283 delete servscan;
284
285 wxServDisc* namescan = new wxServDisc(0, result.name, QTYPE_SRV);
286 for (int j = 0; j < 10; j++) {
287 if (namescan->getResultCount()) {
288 auto namescanResult = namescan->getResults().at(0);
289 port = namescanResult.port;
290 delete namescan;
291
292 wxServDisc* addrscan =
293 new wxServDisc(0, namescanResult.name, QTYPE_A);
294 for (int k = 0; k < 10; k++) {
295 if (addrscan->getResultCount()) {
296 auto addrscanResult = addrscan->getResults().at(0);
297 ip = addrscanResult.ip;
298 delete addrscan;
299 return true;
300 break;
301 } else {
302 wxYield();
303 wxMilliSleep(1000 * tSec / 10);
304 }
305 }
306 delete addrscan;
307 return false;
308 } else {
309 wxYield();
310 wxMilliSleep(1000 * tSec / 10);
311 }
312 }
313 delete namescan;
314 return false;
315 } else {
316 wxYield();
317 wxMilliSleep(1000 * tSec / 10);
318 }
319 }
320
321 delete servscan;
322 return false;
323}
324
325void CommDriverSignalKNet::OpenWebSocket() {
326 // printf("OpenWebSocket\n");
327 wxLogMessage(wxString::Format(_T("Opening Signal K WebSocket client: %s"),
328 m_params.GetDSPort().c_str()));
329
330 // Start a thread to run the client without blocking
331
332 m_wsThread = new WebSocketThread(this, GetAddr(), this, m_token);
333 if (m_wsThread->Create() != wxTHREAD_NO_ERROR) {
334 wxLogError(wxT("Can't create WebSocketThread!"));
335
336 return;
337 }
338
339 ResetWatchdog();
340 GetSocketThreadWatchdogTimer()->Start(1000,
341 wxTIMER_ONE_SHOT); // Start the dog
342 SetThreadRunFlag(1);
343
344 m_wsThread->Run();
345}
346
347void CommDriverSignalKNet::CloseWebSocket() {
348 if (m_wsThread) {
349 if (IsThreadRunning()) {
350 wxLogMessage(_T("Stopping Secondary SignalK Thread"));
351
352 m_Thread_run_flag = 0;
353 int tsec = 10;
354 while (IsThreadRunning() && tsec) {
355 wxSleep(1);
356 tsec--;
357 }
358
359 wxString msg;
360 if (m_Thread_run_flag <= 0)
361 msg.Printf(_T("Stopped in %d sec."), 10 - tsec);
362 else
363 msg.Printf(_T("Not Stopped after 10 sec."));
364 wxLogMessage(msg);
365 }
366
367 wxMilliSleep(100);
368
369#if 0
370 m_thread_run_flag = 0;
371 printf("sending delete\n");
372 m_wsThread->Delete();
373 wxMilliSleep(100);
374
375 int nDeadman = 0;
376 while (IsThreadRunning() && (++nDeadman < 200)) { // spin for max 2 secs.
377 wxMilliSleep(10);
378 }
379 printf("Closed in %d\n", nDeadman);
380 wxMilliSleep(100);
381#endif
382 }
383}
384
385void CommDriverSignalKNet::handle_SK_sentence(
387 rapidjson::Document root;
388
389 // LOG_DEBUG("%s\n", msg.c_str());
390
391 std::string* msg = event.GetPayload().get();
392 std::string msgTerminated = *msg;
393 msgTerminated.append("\r\n");
394
395 root.Parse(*msg);
396 if (root.HasParseError()) {
397 wxLogMessage(wxString::Format(
398 _T("SignalKDataStream ERROR: the JSON document is not well-formed:%d"),
399 root.GetParseError()));
400 return;
401 }
402
403 // Decode just enough of string to extract some identifiers
404 // such as the sK version, "self" context, and target context
405 if (root.HasMember("version")) {
406 wxString msg = _T("Connected to Signal K server version: ");
407 msg << (root["version"].GetString());
408 wxLogMessage(msg);
409 }
410
411 if (root.HasMember("self")) {
412 if (strncmp(root["self"].GetString(), "vessels.", 8) == 0)
413 m_self = (root["self"].GetString()); // for java server, and OpenPlotter
414 // node.js server 1.20
415 else
416 m_self = std::string("vessels.")
417 .append(root["self"].GetString()); // for Node.js server
418 }
419
420 if (root.HasMember("context") && root["context"].IsString()) {
421 m_context = root["context"].GetString();
422 }
423
424 // Notify all listeners
425 auto pos = iface.find(":");
426 std::string comm_interface = "";
427 if (pos != std::string::npos) comm_interface = iface.substr(pos + 1);
428 auto navmsg = std::make_shared<const SignalkMsg>(
429 m_self, m_context, msgTerminated, comm_interface);
430 m_listener.Notify(std::move(navmsg));
431}
432
433void CommDriverSignalKNet::initIXNetSystem() { ix::initNetSystem(); };
434
435void CommDriverSignalKNet::uninitIXNetSystem() { ix::uninitNetSystem(); };
436
437#if 0
438void CommDriverSignalKNet::handleUpdate(wxJSONValue &update) {
439 wxString sfixtime = "";
440
441 if (update.HasMember("timestamp")) {
442 sfixtime = update["timestamp"].AsString();
443 }
444 if (update.HasMember("values") && update["values"].IsArray()) {
445 for (int j = 0; j < update["values"].Size(); ++j) {
446 wxJSONValue &item = update["values"][j];
447 updateItem(item, sfixtime);
448 }
449 }
450}
451
452void CommDriverSignalKNet::updateItem(wxJSONValue &item,
453 wxString &sfixtime) {
454 if (item.HasMember("path") && item.HasMember("value")) {
455 const wxString &update_path = item["path"].AsString();
456 wxJSONValue &value = item["value"];
457
458 if (update_path == _T("navigation.position") && !value.IsNull()) {
459 updateNavigationPosition(value, sfixtime);
460 } else if (update_path == _T("navigation.speedOverGround") &&
461 m_bGPSValid_SK && !value.IsNull()) {
462 updateNavigationSpeedOverGround(value, sfixtime);
463 } else if (update_path == _T("navigation.courseOverGroundTrue") &&
464 m_bGPSValid_SK && !value.IsNull()) {
465 updateNavigationCourseOverGround(value, sfixtime);
466 } else if (update_path == _T("navigation.courseOverGroundMagnetic")) {
467 }
468 else if (update_path ==
469 _T("navigation.gnss.satellites")) // From GGA sats in use
470 {
471 /*if (g_priSats >= 2)*/ updateGnssSatellites(value, sfixtime);
472 } else if (update_path ==
473 _T("navigation.gnss.satellitesInView")) // From GSV sats in view
474 {
475 /*if (g_priSats >= 3)*/ updateGnssSatellites(value, sfixtime);
476 } else if (update_path == _T("navigation.headingTrue")) {
477 if(!value.IsNull())
478 updateHeadingTrue(value, sfixtime);
479 } else if (update_path == _T("navigation.headingMagnetic")) {
480 if(!value.IsNull())
481 updateHeadingMagnetic(value, sfixtime);
482 } else if (update_path == _T("navigation.magneticVariation")) {
483 if(!value.IsNull())
484 updateMagneticVariance(value, sfixtime);
485 } else {
486 // wxLogMessage(wxString::Format(_T("** Signal K unhandled update: %s"),
487 // update_path));
488#if 0
489 wxString dbg;
490 wxJSONWriter writer;
491 writer.Write(item, dbg);
492 wxString msg( _T("update: ") );
493 msg.append(dbg);
494 wxLogMessage(msg);
495#endif
496 }
497 }
498}
499
500void CommDriverSignalKNet::updateNavigationPosition(
501 wxJSONValue &value, const wxString &sfixtime) {
502 if ((value.HasMember("latitude" && value["latitude"].IsDouble())) &&
503 (value.HasMember("longitude") && value["longitude"].IsDouble())) {
504 // wxLogMessage(_T(" ***** Position Update"));
505 m_lat = value["latitude"].AsDouble();
506 m_lon = value["longitude"].AsDouble();
507 m_bGPSValid_SK = true;
508 } else {
509 m_bGPSValid_SK = false;
510 }
511}
512
513
514void CommDriverSignalKNet::updateNavigationSpeedOverGround(
515 wxJSONValue &value, const wxString &sfixtime){
516 double sog_ms = value.AsDouble();
517 double sog_knot = sog_ms * ms_to_knot_factor;
518 // wxLogMessage(wxString::Format(_T(" ***** SOG: %f, %f"), sog_ms, sog_knot));
519 m_sog = sog_knot;
520}
521
522void CommDriverSignalKNet::updateNavigationCourseOverGround(
523 wxJSONValue &value, const wxString &sfixtime) {
524 double cog_rad = value.AsDouble();
525 double cog_deg = GEODESIC_RAD2DEG(cog_rad);
526 // wxLogMessage(wxString::Format(_T(" ***** COG: %f, %f"), cog_rad, cog_deg));
527 m_cog = cog_deg;
528}
529
530void CommDriverSignalKNet::updateGnssSatellites(wxJSONValue &value,
531 const wxString &sfixtime) {
532#if 0
533 if (value.IsInt()) {
534 if (value.AsInt() > 0) {
535 m_frame->setSatelitesInView(value.AsInt());
536 g_priSats = 2;
537 }
538 } else if ((value.HasMember("count") && value["count"].IsInt())) {
539 m_frame->setSatelitesInView(value["count"].AsInt());
540 g_priSats = 3;
541 }
542#endif
543}
544
545void CommDriverSignalKNet::updateHeadingTrue(wxJSONValue &value,
546 const wxString &sfixtime) {
547 m_hdt = GEODESIC_RAD2DEG(value.AsDouble());
548}
549
550void CommDriverSignalKNet::updateHeadingMagnetic(
551 wxJSONValue &value, const wxString &sfixtime) {
552 m_hdm = GEODESIC_RAD2DEG(value.AsDouble());
553}
554
555void CommDriverSignalKNet::updateMagneticVariance(
556 wxJSONValue &value, const wxString &sfixtime) {
557 m_var = GEODESIC_RAD2DEG(value.AsDouble());
558}
559
560#endif
562
563// std::vector<unsigned char>* payload = p.get();
564//
565// // Extract the NMEA0183 sentence
566// std::string full_sentence = std::string(payload->begin(), payload->end());
const std::string iface
Physical device for 0183, else a unique string.
Definition comm_driver.h:88
Interface implemented by transport layer and possible other parties like test code which should handl...
Definition comm_driver.h:48
virtual void Notify(std::shared_ptr< const NavMsg > message)=0
Handle a received message.
Define an action to be performed when a KeyProvider is notified.
Definition observable.h:228
void Init(const KeyProvider &kp, std::function< void(ObservedEvt &ev)> action)
Initiate an object yet not listening.
Definition observable.h:255
Adds a std::shared<void> element to wxCommandEvent.
The JSON value class implementation.
Definition jsonval.h:84
int Size() const
Return the size of the array or map stored in this value.
Definition jsonval.cpp:1332
bool HasMember(unsigned index) const
Return TRUE if the object contains an element at the specified index.
Definition jsonval.cpp:1298
wxString AsString() const
Return the stored value as a wxWidget's string.
Definition jsonval.cpp:872
The JSON document writer.
Definition jsonwriter.h:50
void Write(const wxJSONValue &value, wxString &str)
Write the JSONvalue object to a JSON text.
Driver registration container, a singleton.
Raw messages layer, supports sending and receiving navmsg messages.
Suspend/resume and new devices events exchange point.