30#include "rapidjson/document.h"
31#include "ixwebsocket/IXNetSystem.h"
32#include "ixwebsocket/IXSocketTLSOptions.h"
33#include "ixwebsocket/IXWebSocket.h"
38#include "model/geodesic.h"
42#include "wxServDisc.h"
44using namespace std::literals::chrono_literals;
// File-level constants for the SignalK network driver.
// Owner id handed to m_socketread_watchdog_timer.SetOwner() in the driver
// constructor.
46constexpr int kTimerSocket = 9006;
// NOTE(review): not referenced in the visible part of this file; presumably a
// wx socket/event id -- confirm before removing.
47constexpr int kSignalkSocketId = 5011;
// NOTE(review): not referenced in the visible part of this file; by its name a
// watchdog reconnect timeout in seconds -- confirm.
48constexpr int kDogTimeoutReconnectSeconds = 10;
// Metres-per-second to knots: 3600 s/h divided by 1852 m per nautical mile.
50constexpr double kMsToKnotFactor = 1.9438444924406;
54 IoThread(
const std::string&
iface,
const wxIPV4address& address,
55 wxEvtHandler* consumer,
const std::string& token);
64 wxIPV4address m_address;
65 wxEvtHandler* m_consumer;
66 const std::string m_iface;
71 mutable std::mutex m_stats_mutex;
75static const wxEventTypeTag<CommDriverSignalKNet::InputEvt> SignalkEvtType(
80 addr.Hostname(params.NetworkAddress);
81 addr.Service(params.NetworkPort);
// Builds an event of type SignalkEvtType with id 0. The payload is taken by
// value and moved into m_payload, so callers may pass a temporary without an
// extra copy.
87 explicit InputEvt(std::string payload)
88 : wxEvent(0, SignalkEvtType), m_payload(std::move(payload)) {};
// Returns the stored payload. The return type is a value, so every call
// produces a copy of m_payload.
90 std::string GetPayload()
const {
return m_payload; }
// wxEvent override: returns a heap-allocated InputEvt carrying a copy of
// m_payload. The result is a raw owning pointer; presumably the wx event
// machinery deletes it -- confirm against the wxEvent::Clone contract.
93 wxEvent* Clone()
const override {
return new InputEvt(m_payload); };
96 const std::string m_payload;
// Stores the connection parameters and hooks the system "resume" event.
//   iface    - copied into m_iface (reported later as driver_iface by Run()).
//   address  - copied into m_address; Run() reads host and port from it.
//   consumer - event handler that Run() queues InputEvt objects to; stored as
//              a non-owning raw pointer.
//   token    - copied into m_token; Run() appends it to the stream URL when
//              non-empty.
// NOTE: this listing omits part of the body (the embedded numbers jump from
// 107 to 111), so the handler passed to m_resume_listener.Init() is only
// partly visible; it logs "WebSocketThread: restarted".
102CommDriverSignalKNet::IoThread::IoThread(
const std::string& iface,
103 const wxIPV4address& address,
104 wxEvtHandler* consumer,
105 const std::string& token)
106 : m_address(address), m_consumer(consumer), m_iface(iface), m_token(token) {
107 m_resume_listener.Init(SystemEvents::GetInstance().evt_resume,
111 wxLogDebug(
"WebSocketThread: restarted");
// Thread body for the SignalK websocket reader.
// Visible steps: initialise the shared driver stats, build both a ws:// and a
// wss:// stream URL, configure m_ws (URL, TLS options, ping interval), install
// the message callback, then poll KeepGoing() every 100 ms and finally mark
// the driver unavailable.
// NOTE: this listing omits several original lines (gaps in the embedded line
// numbers), so scope braces and the calls that start/stop m_ws are not
// visible here.
115void CommDriverSignalKNet::IoThread::Run() {
116 using namespace std::chrono_literals;
// m_driver_stats is also read through GetStats(), hence the mutex.
119 std::lock_guard lock(m_stats_mutex);
120 m_driver_stats.driver_bus = NavAddr::Bus::Signalk;
121 m_driver_stats.driver_iface = m_iface;
122 m_driver_stats.available =
false;
126 wxString host = m_address.IPAddress();
127 int port = m_address.Service();
// Plain-text stream URL. NOTE(review): a non-empty token is appended as a
// query parameter, so on the ws:// variant it is sent unencrypted.
128 std::stringstream wsAddress;
129 wsAddress <<
"ws://" << host <<
":" << port
130 <<
"/signalk/v1/stream?subscribe=all&sendCachedValues=false";
131 if (!m_token.empty()) wsAddress <<
"&token=" << m_token;
// TLS stream URL; identical to the one above apart from the scheme.
132 std::stringstream wssAddress;
133 wssAddress <<
"wss://" << host <<
":" << port
134 <<
"/signalk/v1/stream?subscribe=all&sendCachedValues=false";
135 if (!m_token.empty()) wssAddress <<
"&token=" << m_token;
// The first attempt uses wss://; the Error branch below alternates schemes.
137 m_ws.setUrl(wssAddress.str());
// NOTE(review): TLS hostname validation is switched off here -- presumably to
// accept servers addressed by IP or using self-signed certificates; confirm.
138 ix::SocketTLSOptions opt;
139 opt.disable_hostname_validation =
true;
141 m_ws.setTLSOptions(opt);
// Presumably seconds, per the IXWebSocket API -- confirm.
142 m_ws.setPingInterval(30);
// Captures by reference: uses the locals wsAddress/wssAddress as well as
// members, so the callback must not be invoked after Run() has returned.
144 auto message_callback = [&](
const ix::WebSocketMessagePtr& msg) {
145 if (msg->type == ix::WebSocketMessageType::Message) {
// Hand the raw payload to the consumer as a heap-allocated InputEvt.
146 m_consumer->QueueEvent(
new InputEvt(msg->str));
148 }
else if (msg->type == ix::WebSocketMessageType::Open) {
149 wxLogDebug(
"websocket: Connection to %s established",
150 m_ws.getUrl().c_str());
151 std::lock_guard lock(m_stats_mutex);
152 m_driver_stats.available =
true;
153 }
else if (msg->type == ix::WebSocketMessageType::Close) {
154 wxLogDebug(
"websocket: Connection disconnected");
155 std::lock_guard lock(m_stats_mutex);
156 m_driver_stats.available =
false;
157 }
else if (msg->type == ix::WebSocketMessageType::Error) {
// The statement this lock guards sits on a line missing from this listing
// (presumably an error counter update -- confirm).
158 std::lock_guard lock(m_stats_mutex);
160 wxLogDebug(
"websocket: error: %s", msg->errorInfo.reason.c_str());
// On error, flip between the ws:// and wss:// URL for the next attempt.
161 m_ws.getUrl() == wsAddress.str() ? m_ws.setUrl(wssAddress.str())
162 : m_ws.setUrl(wsAddress.str());
165 m_ws.setOnMessageCallback(message_callback);
// Idle until a stop is requested; all traffic is handled in the callback.
168 while (KeepGoing()) {
169 std::this_thread::sleep_for(100ms);
// Thread is winding down: report the driver as unavailable.
173 std::lock_guard lock(m_stats_mutex);
174 m_driver_stats.available =
false;
// Returns a copy of m_driver_stats taken while holding m_stats_mutex, the
// same mutex Run() locks when it updates the stats. m_stats_mutex is declared
// mutable so this const method can lock it.
180DriverStats CommDriverSignalKNet::IoThread::GetStats()
const {
181 std::lock_guard lock(m_stats_mutex);
182 return m_driver_stats;
189 m_listener(listener),
190 m_dog_value(kDogTimeoutSeconds),
191 m_io_thread(std::make_unique<IoThread>(params->GetStrippedDSPort(),
192 ParamsIpAddress(*params), this,
193 params->AuthToken.ToStdString())),
194 m_stats_timer(*this, 2s) {
196 Bind(SignalkEvtType, &CommDriverSignalKNet::HandleSkSentence,
this);
198 m_socketread_watchdog_timer.SetOwner(
this, kTimerSocket);
201 m_driver_stats.driver_bus = NavAddr::Bus::Signalk;
202 m_driver_stats.driver_iface = m_params.GetStrippedDSPort();
203 m_driver_stats.available =
false;
// Destructor: delegates to Close(), which in turn calls CloseWebSocket().
208CommDriverSignalKNet::~CommDriverSignalKNet() { Close(); }
211 if (m_std_thread.joinable())
212 return m_io_thread->GetStats();
214 return m_driver_stats;
// Opens the driver.
// NOTE: most of the body is missing from this listing (the embedded numbers
// jump from 218 to 225 and stop at 226); only two local declarations are
// visible.
217void CommDriverSignalKNet::Open() {
218 wxString discoveredIP;
// Service name for SignalK websocket discovery; presumably handed to
// DiscoverSkServer() on a line not shown here -- confirm.
225 std::string service_ident =
226 std::string(
"_signalk-ws._tcp.local.");
// Closes the driver by delegating to CloseWebSocket(). Also invoked from the
// destructor.
230void CommDriverSignalKNet::Close() { CloseWebSocket(); }
233 wxString& ip,
int& port,
int tSec) {
234 auto servscan = std::make_unique<wxServDisc>(
235 nullptr, wxString(service_ident.c_str()), QTYPE_PTR);
236 for (
int i = 0; i < 10; i++) {
237 if (servscan->getResultCount()) {
238 auto result = servscan->getResults().at(0);
240 std::make_unique<wxServDisc>(
nullptr, result.name, QTYPE_SRV);
241 for (
int j = 0; j < 10; j++) {
242 if (namescan->getResultCount()) {
243 auto namescan_result = namescan->getResults().at(0);
244 port = namescan_result.port;
245 auto addrscan = std::make_unique<wxServDisc>(
246 nullptr, namescan_result.name, QTYPE_A);
247 for (
int k = 0; k < 10; k++) {
248 if (addrscan->getResultCount()) {
249 auto addrscan_result = addrscan->getResults().at(0);
250 ip = addrscan_result.ip;
254 wxMilliSleep(1000 * tSec / 10);
260 wxMilliSleep(1000 * tSec / 10);
266 wxMilliSleep(1000 * tSec / 10);
// Starts the I/O thread: logs the target, launches m_io_thread->Run() on
// m_std_thread and arms the socket-read watchdog as a one-shot timer
// (1000 ms).
// NOTE: this listing omits lines 278-280, so the tail of the error branch is
// not visible.
272void CommDriverSignalKNet::OpenWebSocket() {
273 wxLogMessage(
"Opening Signal K WebSocket client: %s",
274 m_params.GetDSPort().c_str());
275 m_std_thread = std::thread([&] { m_io_thread->Run(); });
// NOTE(review): a std::thread constructed with a callable is always joinable;
// a failure to start is reported by the constructor throwing
// std::system_error. This branch therefore cannot trigger as written.
276 if (!m_std_thread.joinable()) {
277 wxLogError(
"Can't create WebSocketThread!");
281 m_socketread_watchdog_timer.Start(1000, wxTIMER_ONE_SHOT);
// Stops the I/O thread if one was started: stops the stats timer, requests a
// stop and waits up to 10 s for the thread to acknowledge, logging the
// outcome.
// NOTE: this listing omits several lines (292, 294, 296-299, 301-303), so the
// branch structure around the log statements and any join() call are not
// visible here.
284void CommDriverSignalKNet::CloseWebSocket() {
285 if (m_std_thread.joinable()) {
286 if (m_io_thread->IsRunning()) {
287 wxLogMessage(
"Stopping Secondary SignalK Thread");
288 m_stats_timer.Stop();
289 m_io_thread->RequestStop();
// Default-initialised; presumably filled in by WaitUntilStopped() -- confirm.
290 std::chrono::milliseconds stop_delay;
291 bool stop_ok = m_io_thread->WaitUntilStopped(10s, stop_delay);
// NOTE(review): "Stopped in" has no trailing space, so the logged text reads
// e.g. "Stopped in123 msec.".
293 MESSAGE_LOG <<
"Stopped in" << stop_delay.count() <<
" msec.";
295 WARNING_LOG <<
"Not stopped after 10 sec.";
300 WARNING_LOG <<
"Thread unexpectedly died";
// Handles one SignalK message delivered as an InputEvt: validates the JSON
// payload, picks up the server "version", "self" and "context" fields when
// present, then wraps the raw text in a SignalkMsg and passes it to
// m_listener.
// NOTE: this listing omits many original lines (gaps in the embedded line
// numbers), including the parse call, early returns, closing braces and the
// declaration of `iface`.
304void CommDriverSignalKNet::HandleSkSentence(
const InputEvt& event) {
305 rapidjson::Document root;
307 std::string msg =
event.GetPayload();
// Malformed JSON: the parse error code is logged.
309 if (root.HasParseError()) {
311 "SignalKDataStream ERROR: the JSON document is not well-formed: %d",
312 root.GetParseError());
// Only JSON objects are accepted as messages.
315 if (!root.IsObject()) {
316 wxLogMessage(
"SignalKDataStream ERROR: Message is not a JSON Object: %s",
// NOTE(review): GetString() is called without an IsString() check here and
// for "self" below, unlike the "context" field -- a non-string value would
// violate the rapidjson precondition.
323 if (root.HasMember(
"version")) {
324 wxString vers =
"Connected to Signal K server version: ";
325 vers << (root[
"version"].GetString());
// Store the own-vessel id in m_self, adding the "vessels." prefix when the
// server sent the id without it.
328 if (root.HasMember(
"self")) {
329 if (strncmp(root[
"self"].GetString(),
"vessels.", 8) == 0)
330 m_self = (root[
"self"].GetString());
333 m_self = std::string(
"vessels.")
334 .append(root[
"self"].GetString());
336 if (root.HasMember(
"context") && root[
"context"].IsString()) {
337 m_context = root[
"context"].GetString();
// comm_interface is the part of `iface` after the first ':' and stays empty
// when there is no ':'.
341 auto pos =
iface.find(
':');
342 std::string comm_interface;
343 if (pos != std::string::npos) comm_interface =
iface.substr(pos + 1);
// The remaining constructor arguments are on a line missing from this listing.
344 auto navmsg = std::make_shared<const SignalkMsg>(m_self, m_context, msg,
346 m_listener.
Notify(std::move(navmsg));
const std::string iface
Physical device for 0183, else a unique string.
static void uninitIXNetSystem()
ix::uninitIXNetSystem wrapper
DriverStats GetDriverStats() const override
Get the Driver Statistics.
static void initIXNetSystem()
ix::initIXNetSystem wrapper
static bool DiscoverSkServer(const std::string &service_ident, wxString &ip, int &port, int tSec)
Scan for a SignalK server on local network using mDNS.
Interface for handling incoming messages.
virtual void Notify(std::shared_ptr< const NavMsg > message)=0
Handle a received message.
Define an action to be performed when a KeyProvider is notified.
Custom event class for OpenCPN's notification system.
Thread mixin providing a "stop thread"/"wait until stopped" interface.
SignalK IP network driver.
Raw messages layer, supports sending and receiving navmsg messages.
Enhanced logging interface on top of wx/log.h.
General observable implementation with several specializations.
Driver statistics report.
unsigned rx_count
Number of bytes received since program start.
unsigned error_count
Number of detected errors since program start.
Suspend/resume and new devices events exchange point.
ThreadCtrl mixin class definition.