32#include "rapidjson/document.h"
34#include "model/comm_drv_signalk_net.h"
37#include "model/geodesic.h"
39#include "wxServDisc.h"
41#include "observable.h"
43#include "ixwebsocket/IXNetSystem.h"
44#include "ixwebsocket/IXWebSocket.h"
45#include "ixwebsocket/IXUserAgent.h"
46#include "ixwebsocket/IXSocketTLSOptions.h"
48const int kTimerSocket = 9006;
55 const wxString& PortName,
56 const wxString& strBaudRate);
60 bool SetOutMsg(
const wxString& msg);
64 void ThreadMessage(
const wxString& msg);
65 bool OpenComPortPhysical(
const wxString& com_name,
int baud_rate);
66 void CloseComPortPhysical();
67 size_t WriteComPortPhysical(std::vector<unsigned char> msg);
68 size_t WriteComPortPhysical(
unsigned char* msg,
size_t length);
69 void SetGatewayOperationMode(
void);
73 wxString m_FullPortName;
75 unsigned char* put_ptr;
76 unsigned char* tak_ptr;
78 unsigned char* rx_buffer;
92 : wxEvent(
id, commandType) {};
96 void SetPayload(std::shared_ptr<std::string> data) { m_payload = data; }
97 std::shared_ptr<std::string> GetPayload() {
return m_payload; }
100 wxEvent* Clone()
const {
102 newevent->m_payload = this->m_payload;
107 std::shared_ptr<std::string> m_payload;
115 wxEvtHandler* consumer,
const std::string& token);
116 virtual void* Entry();
119 void HandleMessage(
const std::string& message);
120 wxEvtHandler* s_wsSKConsumer;
121 wxIPV4address m_address;
122 wxEvtHandler* m_consumer;
130 wxIPV4address address, wxEvtHandler* consumer,
131 const std::string& token)
132 : m_address(address),
133 m_consumer(consumer),
134 m_parentStream(parent),
136 resume_listener.
Init(SystemEvents::GetInstance().evt_resume,
140 wxLogDebug(
"WebSocketThread: restarted");
144void* WebSocketThread::Entry() {
145 using namespace std::chrono_literals;
146 bool not_done =
true;
148 m_parentStream->SetThreadRunning(
true);
150 s_wsSKConsumer = m_consumer;
152 wxString host = m_address.IPAddress();
153 int port = m_address.Service();
156 std::stringstream wsAddress;
157 wsAddress <<
"ws://" << host <<
":" << port
158 <<
"/signalk/v1/stream?subscribe=all&sendCachedValues=false";
159 std::stringstream wssAddress;
160 wssAddress <<
"wss://" << host <<
":" << port
161 <<
"/signalk/v1/stream?subscribe=all&sendCachedValues=false";
163 if (!m_token.empty()) {
164 wsAddress <<
"&token=" << m_token;
165 wssAddress <<
"&token=" << m_token;
168 ws.setUrl(wssAddress.str());
169 ix::SocketTLSOptions opt;
170 opt.disable_hostname_validation =
true;
172 ws.setTLSOptions(opt);
173 ws.setPingInterval(30);
175 auto message_callback = [&](
const ix::WebSocketMessagePtr& msg) {
176 if (msg->type == ix::WebSocketMessageType::Message) {
177 HandleMessage(msg->str);
178 }
else if (msg->type == ix::WebSocketMessageType::Open) {
179 wxLogDebug(
"websocket: Connection established");
180 }
else if (msg->type == ix::WebSocketMessageType::Close) {
181 wxLogDebug(
"websocket: Connection disconnected");
182 }
else if (msg->type == ix::WebSocketMessageType::Error) {
183 wxLogDebug(
"websocket: error: %s", msg->errorInfo.reason.c_str());
184 ws.getUrl() == wsAddress.str() ? ws.setUrl(wssAddress.str())
185 : ws.setUrl(wsAddress.str());
189 ws.setOnMessageCallback(message_callback);
192 while (m_parentStream->m_Thread_run_flag > 0) {
193 std::this_thread::sleep_for(100ms);
197 m_parentStream->SetThreadRunning(
false);
198 m_parentStream->m_Thread_run_flag = -1;
203void WebSocketThread::HandleMessage(
const std::string& message) {
204 if (s_wsSKConsumer) {
206 auto buffer = std::make_shared<std::string>(message);
208 signalKEvent.SetPayload(buffer);
209 s_wsSKConsumer->AddPendingEvent(signalKEvent);
222 m_Thread_run_flag(-1),
224 m_listener(listener) {
226 Bind(wxEVT_COMMDRIVER_SIGNALK_NET, &CommDriverSignalKNet::handle_SK_sentence,
229 m_addr.Hostname(params->NetworkAddress);
230 m_addr.Service(params->NetworkPort);
231 m_token = params->AuthToken;
232 m_socketread_watchdog_timer.SetOwner(
this, kTimerSocket);
234 m_threadActive =
false;
239CommDriverSignalKNet::~CommDriverSignalKNet() { Close(); }
241void CommDriverSignalKNet::Open(
void) {
242 wxString discoveredIP;
249 std::string serviceIdent =
250 std::string(
"_signalk-ws._tcp.local.");
252 if (m_params->AutoSKDiscover) {
253 if (DiscoverSKServer(serviceIdent, discoveredIP, discoveredPort,
255 wxLogDebug(wxString::Format(
256 _T(
"SK server autodiscovery finds WebSocket service: %s:%d"),
257 discoveredIP.c_str(), discoveredPort));
258 m_addr.Hostname(discoveredIP);
259 m_addr.Service(discoveredPort);
264 params->NetworkAddress = discoveredIP;
265 params->NetworkPort = discoveredPort;
267 wxLogDebug(_T(
"SK server autodiscovery finds no WebSocket server."));
273void CommDriverSignalKNet::Close() { CloseWebSocket(); }
275bool CommDriverSignalKNet::DiscoverSKServer(std::string serviceIdent,
276 wxString& ip,
int& port,
int tSec) {
277 wxServDisc* servscan =
278 new wxServDisc(0, wxString(serviceIdent.c_str()), QTYPE_PTR);
280 for (
int i = 0; i < 10; i++) {
281 if (servscan->getResultCount()) {
282 auto result = servscan->getResults().at(0);
285 wxServDisc* namescan =
new wxServDisc(0, result.name, QTYPE_SRV);
286 for (
int j = 0; j < 10; j++) {
287 if (namescan->getResultCount()) {
288 auto namescanResult = namescan->getResults().at(0);
289 port = namescanResult.port;
292 wxServDisc* addrscan =
293 new wxServDisc(0, namescanResult.name, QTYPE_A);
294 for (
int k = 0; k < 10; k++) {
295 if (addrscan->getResultCount()) {
296 auto addrscanResult = addrscan->getResults().at(0);
297 ip = addrscanResult.ip;
303 wxMilliSleep(1000 * tSec / 10);
310 wxMilliSleep(1000 * tSec / 10);
317 wxMilliSleep(1000 * tSec / 10);
325void CommDriverSignalKNet::OpenWebSocket() {
327 wxLogMessage(wxString::Format(_T(
"Opening Signal K WebSocket client: %s"),
328 m_params.GetDSPort().c_str()));
333 if (m_wsThread->Create() != wxTHREAD_NO_ERROR) {
334 wxLogError(wxT(
"Can't create WebSocketThread!"));
340 GetSocketThreadWatchdogTimer()->Start(1000,
347void CommDriverSignalKNet::CloseWebSocket() {
349 if (IsThreadRunning()) {
350 wxLogMessage(_T(
"Stopping Secondary SignalK Thread"));
352 m_Thread_run_flag = 0;
354 while (IsThreadRunning() && tsec) {
360 if (m_Thread_run_flag <= 0)
361 msg.Printf(_T(
"Stopped in %d sec."), 10 - tsec);
363 msg.Printf(_T(
"Not Stopped after 10 sec."));
370 m_thread_run_flag = 0;
371 printf(
"sending delete\n");
372 m_wsThread->Delete();
376 while (IsThreadRunning() && (++nDeadman < 200)) {
379 printf(
"Closed in %d\n", nDeadman);
385void CommDriverSignalKNet::handle_SK_sentence(
387 rapidjson::Document root;
391 std::string* msg =
event.GetPayload().get();
392 std::string msgTerminated = *msg;
393 msgTerminated.append(
"\r\n");
396 if (root.HasParseError()) {
397 wxLogMessage(wxString::Format(
398 _T(
"SignalKDataStream ERROR: the JSON document is not well-formed:%d"),
399 root.GetParseError()));
405 if (root.HasMember(
"version")) {
406 wxString msg = _T(
"Connected to Signal K server version: ");
407 msg << (root[
"version"].GetString());
411 if (root.HasMember(
"self")) {
412 if (strncmp(root[
"self"].GetString(),
"vessels.", 8) == 0)
413 m_self = (root[
"self"].GetString());
416 m_self = std::string(
"vessels.")
417 .append(root[
"self"].GetString());
420 if (root.HasMember(
"context") && root[
"context"].IsString()) {
421 m_context = root[
"context"].GetString();
425 auto pos =
iface.find(
":");
426 std::string comm_interface =
"";
427 if (pos != std::string::npos) comm_interface =
iface.substr(pos + 1);
428 auto navmsg = std::make_shared<const SignalkMsg>(
429 m_self, m_context, msgTerminated, comm_interface);
430 m_listener.
Notify(std::move(navmsg));
433void CommDriverSignalKNet::initIXNetSystem() { ix::initNetSystem(); };
435void CommDriverSignalKNet::uninitIXNetSystem() { ix::uninitNetSystem(); };
438void CommDriverSignalKNet::handleUpdate(
wxJSONValue &update) {
439 wxString sfixtime =
"";
442 sfixtime = update[
"timestamp"].
AsString();
444 if (update.
HasMember(
"values") && update[
"values"].IsArray()) {
445 for (
int j = 0; j < update[
"values"].
Size(); ++j) {
447 updateItem(item, sfixtime);
452void CommDriverSignalKNet::updateItem(
wxJSONValue &item,
453 wxString &sfixtime) {
455 const wxString &update_path = item[
"path"].
AsString();
458 if (update_path == _T(
"navigation.position") && !value.IsNull()) {
459 updateNavigationPosition(value, sfixtime);
460 }
else if (update_path == _T(
"navigation.speedOverGround") &&
461 m_bGPSValid_SK && !value.IsNull()) {
462 updateNavigationSpeedOverGround(value, sfixtime);
463 }
else if (update_path == _T(
"navigation.courseOverGroundTrue") &&
464 m_bGPSValid_SK && !value.IsNull()) {
465 updateNavigationCourseOverGround(value, sfixtime);
466 }
else if (update_path == _T(
"navigation.courseOverGroundMagnetic")) {
468 else if (update_path ==
469 _T(
"navigation.gnss.satellites"))
471 updateGnssSatellites(value, sfixtime);
472 }
else if (update_path ==
473 _T(
"navigation.gnss.satellitesInView"))
475 updateGnssSatellites(value, sfixtime);
476 }
else if (update_path == _T(
"navigation.headingTrue")) {
478 updateHeadingTrue(value, sfixtime);
479 }
else if (update_path == _T(
"navigation.headingMagnetic")) {
481 updateHeadingMagnetic(value, sfixtime);
482 }
else if (update_path == _T(
"navigation.magneticVariation")) {
484 updateMagneticVariance(value, sfixtime);
491 writer.
Write(item, dbg);
492 wxString msg( _T(
"update: ") );
500void CommDriverSignalKNet::updateNavigationPosition(
502 if ((value.HasMember(
"latitude" && value[
"latitude"].IsDouble())) &&
503 (value.HasMember(
"longitude") && value[
"longitude"].IsDouble())) {
505 m_lat = value[
"latitude"].AsDouble();
506 m_lon = value[
"longitude"].AsDouble();
507 m_bGPSValid_SK =
true;
509 m_bGPSValid_SK =
false;
514void CommDriverSignalKNet::updateNavigationSpeedOverGround(
516 double sog_ms = value.AsDouble();
517 double sog_knot = sog_ms * ms_to_knot_factor;
522void CommDriverSignalKNet::updateNavigationCourseOverGround(
524 double cog_rad = value.AsDouble();
525 double cog_deg = GEODESIC_RAD2DEG(cog_rad);
530void CommDriverSignalKNet::updateGnssSatellites(
wxJSONValue &value,
531 const wxString &sfixtime) {
534 if (value.AsInt() > 0) {
535 m_frame->setSatelitesInView(value.AsInt());
538 }
else if ((value.HasMember(
"count") && value[
"count"].IsInt())) {
539 m_frame->setSatelitesInView(value[
"count"].AsInt());
545void CommDriverSignalKNet::updateHeadingTrue(
wxJSONValue &value,
546 const wxString &sfixtime) {
547 m_hdt = GEODESIC_RAD2DEG(value.AsDouble());
550void CommDriverSignalKNet::updateHeadingMagnetic(
552 m_hdm = GEODESIC_RAD2DEG(value.AsDouble());
555void CommDriverSignalKNet::updateMagneticVariance(
557 m_var = GEODESIC_RAD2DEG(value.AsDouble());
const std::string iface
Physical device for 0183, else a unique string.
Interface implemented by transport layer and possible other parties like test code which should handl...
virtual void Notify(std::shared_ptr< const NavMsg > message)=0
Handle a received message.
Define an action to be performed when a KeyProvider is notified.
void Init(const KeyProvider &kp, std::function< void(ObservedEvt &ev)> action)
Initiate an object yet not listening.
Adds a std::shared<void> element to wxCommandEvent.
The JSON value class implementation.
int Size() const
Return the size of the array or map stored in this value.
bool HasMember(unsigned index) const
Return TRUE if the object contains an element at the specified index.
wxString AsString() const
Return the stored value as a wxWidget's string.
The JSON document writer.
void Write(const wxJSONValue &value, wxString &str)
Write the JSONvalue object to a JSON text.
Driver registration container, a singleton.
Raw messages layer, supports sending and recieving navmsg messages.
Suspend/resume and new devices events exchange point.