OpenCPN Partial API docs
Loading...
Searching...
No Matches
comm_drv_n0183_net.cpp
Go to the documentation of this file.
1/**************************************************************************
2 * Copyright (C) 2022 David Register *
3 * Copyright (C) 2022 Alec Leamas *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
19 **************************************************************************/
20
26#ifdef __MSVC__
27#include "winsock2.h"
28#include <wx/msw/winundef.h>
29#include <ws2tcpip.h>
30#endif
31
32#include <wx/wxprec.h>
33
34#ifndef WX_PRECOMP
35#include <wx/wx.h>
36#endif
37
38#include <ctime>
39#include <deque>
40
41#ifndef __WXMSW__
42#include <arpa/inet.h>
43#include <netinet/tcp.h>
44#endif
45
46#include <wx/datetime.h>
47#include <wx/socket.h>
48#include <wx/log.h>
49#include <wx/memory.h>
50#include <wx/chartype.h>
51#include <wx/sckaddr.h>
52
56#include "model/config_vars.h"
57#include "model/garmin_protocol_mgr.h"
58#include "model/idents.h"
59#include "model/logger.h"
60#include "model/sys_events.h"
61
62#include "observable.h"
63
64using namespace std::literals::chrono_literals;
65
66#define N_DOG_TIMEOUT 8
67
69static bool IsBroadcastAddr(unsigned addr, unsigned netmask_bits) {
70 assert(netmask_bits <= 32);
71 uint32_t netmask = 0xffffffff << (32 - netmask_bits);
72 uint32_t host_mask = ~netmask;
73 return (addr & host_mask) == host_mask;
74}
75
77public:
78 struct ip_mreq m_mrq;
79 void SetMrqAddr(unsigned int addr) {
80 m_mrq.imr_multiaddr.s_addr = addr;
81 m_mrq.imr_interface.s_addr = INADDR_ANY;
82 }
83};
84
85static bool SetOutputSocketOptions(wxSocketBase* tsock) {
86 int ret;
87
88 // Disable nagle algorithm on outgoing connection
89 // Doing this here rather than after the accept() is
90 // pointless on platforms where TCP_NODELAY is
91 // not inherited. However, none of OpenCPN's currently
92 // supported platforms fall into that category.
93
94 int nagleDisable = 1;
95 ret = tsock->SetOption(IPPROTO_TCP, TCP_NODELAY, &nagleDisable,
96 sizeof(nagleDisable));
97
98 // Drastically reduce the size of the socket output buffer
99 // so that when client goes away without properly closing, the stream will
100 // quickly fill the output buffer, and thus fail the write() call
101 // within a few seconds.
102 unsigned long outbuf_size = 1024; // Smallest allowable value on Linux
103 return (tsock->SetOption(SOL_SOCKET, SO_SNDBUF, &outbuf_size,
104 sizeof(outbuf_size)) &&
105 ret);
106}
107
108//========================================================================
109/*
110 * CommDriverN0183Net implementation
111 */
112CommDriverN0183Net::CommDriverN0183Net(const ConnectionParams* params,
113 DriverListener& listener)
114 : CommDriverN0183(NavAddr::Bus::N0183, params->GetStrippedDSPort()),
115 m_params(*params),
116 m_listener(listener),
117 m_sock(nullptr),
118 m_tsock(nullptr),
119 m_socket_server(nullptr),
120 m_is_multicast(false),
121 m_stats_timer(*this, 2s),
122 m_txenter(0),
123 m_dog_value(0),
124 m_rx_connect_event(false),
125 m_socket_timer(*this),
126 m_socketread_watchdog_timer(*this),
127 m_ok(false),
128 m_is_conn_err_reported(false) {
129 m_addr.Hostname(params->NetworkAddress);
130 m_addr.Service(params->NetworkPort);
131 this->attributes["netAddress"] = params->NetworkAddress.ToStdString();
132 this->attributes["netPort"] = std::to_string(params->NetworkPort);
133 this->attributes["userComment"] = params->UserComment.ToStdString();
134 this->attributes["ioDirection"] = DsPortTypeToString(params->IOSelect);
135 m_driver_stats.driver_bus = NavAddr::Bus::N0183;
136 m_driver_stats.driver_iface = params->GetStrippedDSPort();
137
138 m_mrq_container = std::make_unique<MrqContainer>();
139
140 // Establish event listeners
141 resume_listener.Init(SystemEvents::GetInstance().evt_resume,
142 [&](ObservedEvt&) { HandleResume(); });
143 Bind(wxEVT_SOCKET, &CommDriverN0183Net::OnSocketEvent, this, DS_SOCKET_ID);
144 Bind(wxEVT_SOCKET, &CommDriverN0183Net::OnServerSocketEvent, this,
145 DS_SERVERSOCKET_ID);
146
147 Open();
148}
149
150CommDriverN0183Net::~CommDriverN0183Net() { Close(); }
151
152void CommDriverN0183Net::HandleN0183Msg(const std::string& sentence) {
153 // Sanity check
154 if ((sentence[0] == '$' || sentence[0] == '!') && sentence.size() > 5) {
155 m_driver_stats.rx_count += sentence.size();
156 std::string identifier;
157 // We notify based on full message, including the Talker ID
158 identifier = sentence.substr(1, 5);
159
160 // notify message listener and also "ALL" N0183 messages, to support plugin
161 // API using original talker id
162 auto msg =
163 std::make_shared<const Nmea0183Msg>(identifier, sentence, GetAddress());
164 auto msg_all = std::make_shared<const Nmea0183Msg>(*msg, "ALL");
165
166 if (m_params.SentencePassesFilter(sentence, FILTER_INPUT))
167 m_listener.Notify(std::move(msg));
168 m_listener.Notify(std::move(msg_all));
169 }
170}
171
172void CommDriverN0183Net::Open() {
173#ifdef __UNIX__
174 in_addr_t addr =
175 ((struct sockaddr_in*)m_addr.GetAddressData())->sin_addr.s_addr;
176#else
177 unsigned int addr = inet_addr(m_addr.IPAddress().mb_str());
178#endif
179 // Create the socket
180 switch (m_params.NetProtocol) {
181 case GPSD: {
182 OpenNetworkGpsd();
183 break;
184 }
185 case TCP: {
186 OpenNetworkTcp(addr);
187 break;
188 }
189 case UDP: {
190 OpenNetworkUdp(addr);
191 break;
192 }
193 default:
194 break;
195 }
196 m_ok = true;
197}
198
199void CommDriverN0183Net::OpenNetworkUdp(unsigned int addr) {
200 if (m_params.IOSelect != DS_TYPE_OUTPUT) {
201 // We need a local (bindable) address to create the Datagram receive socket
202 // Set up the reception socket
203 wxIPV4address conn_addr;
204 conn_addr.Service(std::to_string(m_params.NetworkPort));
205 conn_addr.AnyAddress();
206 conn_addr.AnyAddress();
207 m_sock =
208 new wxDatagramSocket(conn_addr, wxSOCKET_NOWAIT | wxSOCKET_REUSEADDR);
209
210 // Test if address is IPv4 multicast
211 if ((ntohl(addr) & 0xf0000000) == 0xe0000000) {
212 m_is_multicast = true;
213 m_mrq_container->SetMrqAddr(addr);
214 m_sock->SetOption(IPPROTO_IP, IP_ADD_MEMBERSHIP, &m_mrq_container->m_mrq,
215 sizeof(m_mrq_container->m_mrq));
216 }
217
218 m_sock->SetEventHandler(*this, DS_SOCKET_ID);
219
220 m_sock->SetNotify(wxSOCKET_CONNECTION_FLAG | wxSOCKET_INPUT_FLAG |
221 wxSOCKET_LOST_FLAG);
222 m_sock->Notify(TRUE);
223 m_sock->SetTimeout(1); // Short timeout
224 m_driver_stats.available = true;
225 }
226
227 // Set up another socket for transmit
228 if (m_params.IOSelect != DS_TYPE_INPUT) {
229 wxIPV4address tconn_addr;
230 tconn_addr.Service(0); // use ephemeral out port
231 tconn_addr.AnyAddress();
232 m_tsock =
233 new wxDatagramSocket(tconn_addr, wxSOCKET_NOWAIT | wxSOCKET_REUSEADDR);
234 // Here would be the place to disable multicast loopback
235 // but for consistency with broadcast behaviour, we will
236 // instead rely on setting priority levels to ignore
237 // sentences read back that have just been transmitted
238 if (!m_is_multicast && IsBroadcastAddr(addr, g_netmask_bits)) {
239 int broadcastEnable = 1;
240 m_tsock->SetOption(SOL_SOCKET, SO_BROADCAST, &broadcastEnable,
241 sizeof(broadcastEnable));
242 m_driver_stats.available = true;
243 }
244 }
245
246 // In case the connection is lost before acquired....
247 m_connect_time = std::chrono::steady_clock::now();
248}
249
250void CommDriverN0183Net::OpenNetworkTcp(unsigned int addr) {
251 if (addr == INADDR_ANY) {
252 MESSAGE_LOG << "Listening for TCP connections on " << INADDR_ANY;
253 m_socket_server = new wxSocketServer(m_addr, wxSOCKET_REUSEADDR);
254 m_socket_server->SetEventHandler(*this, DS_SERVERSOCKET_ID);
255 m_socket_server->SetNotify(wxSOCKET_CONNECTION_FLAG);
256 m_socket_server->Notify(TRUE);
257 m_socket_server->SetTimeout(1); // Short timeout
258 } else {
259 MESSAGE_LOG << "Opening TCP connection to " << m_params.NetworkAddress
260 << ":" << m_params.NetworkPort;
261 m_sock = new wxSocketClient();
262 m_sock->SetEventHandler(*this, DS_SOCKET_ID);
263 int notify_flags = (wxSOCKET_CONNECTION_FLAG | wxSOCKET_LOST_FLAG);
264 if (m_params.IOSelect != DS_TYPE_INPUT)
265 notify_flags |= wxSOCKET_OUTPUT_FLAG;
266 if (m_params.IOSelect != DS_TYPE_OUTPUT)
267 notify_flags |= wxSOCKET_INPUT_FLAG;
268 m_sock->SetNotify(notify_flags);
269 m_sock->Notify(true);
270 m_sock->SetTimeout(1); // Short timeout
271
272 m_rx_connect_event = false;
273 m_socket_timer.Start(100, wxTIMER_ONE_SHOT); // schedule a connection
274 }
275
276 // In case the connection is lost before acquired....
277 m_connect_time = std::chrono::steady_clock::now();
278}
279
280void CommDriverN0183Net::OpenNetworkGpsd() {
281 m_sock = new wxSocketClient();
282 m_sock->SetEventHandler(*this, DS_SOCKET_ID);
283 m_sock->SetNotify(wxSOCKET_CONNECTION_FLAG | wxSOCKET_INPUT_FLAG |
284 wxSOCKET_LOST_FLAG);
285 m_sock->Notify(TRUE);
286 m_sock->SetTimeout(1); // Short timeout
287
288 auto* tcp_socket = dynamic_cast<wxSocketClient*>(m_sock);
289 tcp_socket->Connect(m_addr, false);
290 m_rx_connect_event = false;
291}
292
293void CommDriverN0183Net::OnSocketReadWatchdogTimer() {
294 m_dog_value--;
295
296 if (m_dog_value <= 0) { // No receive in n seconds
297 if (GetParams().NoDataReconnect) {
298 // Reconnect on NO DATA is true, so try to reconnect now.
299 if (m_params.NetProtocol == TCP) {
300 auto* tcp_socket = dynamic_cast<wxSocketClient*>(m_sock);
301 if (tcp_socket) tcp_socket->Close();
302
303 int n_reconnect_delay = wxMax(N_DOG_TIMEOUT - 2, 2);
304 wxLogMessage("Reconnection scheduled in %d seconds.",
305 n_reconnect_delay);
306 m_socket_timer.Start(n_reconnect_delay * 1000, wxTIMER_ONE_SHOT);
307
308 // Stop DATA watchdog, will be restarted on successful connection.
309 m_socketread_watchdog_timer.Stop();
310 }
311 }
312 }
313}
314
315void CommDriverN0183Net::OnTimerSocket() {
316 // Attempt a connection
317 using namespace std::chrono;
318 auto* tcp_socket = dynamic_cast<wxSocketClient*>(m_sock);
319 if (tcp_socket) {
320 if (tcp_socket->IsDisconnected()) {
321 m_driver_stats.available = false;
322 wxLogDebug("Attempting reconnection...");
323 m_rx_connect_event = false;
324 // Stop DATA watchdog, may be restarted on successful connection.
325 m_socketread_watchdog_timer.Stop();
326 tcp_socket->Connect(m_addr, false);
327
328 // schedule another connection attempt, in case this one fails
329 int n_reconnect_delay = N_DOG_TIMEOUT;
330 m_socket_timer.Start(n_reconnect_delay * 1000, wxTIMER_ONE_SHOT);
331
332 // Possibly report connect error to GUI.
333 if (m_connect_time == time_point<steady_clock>()) return;
334 auto since_connect = steady_clock::now() - m_connect_time;
335 if (since_connect > 10s && !m_is_conn_err_reported) {
336 std::stringstream ss;
337 ss << _("Cannot connect to remote server ") << m_params.NetworkAddress
338 << ":" << m_params.NetworkPort;
339 CommDriverRegistry::GetInstance().evt_driver_msg.Notify(ss.str());
340 m_is_conn_err_reported = true;
341 m_driver_stats.error_count++;
342 }
343 }
344 }
345}
346
347void CommDriverN0183Net::HandleResume() {
348 // Attempt a stop and restart of connection
349 auto* tcp_socket = dynamic_cast<wxSocketClient*>(m_sock);
350 if (tcp_socket) {
351 m_socketread_watchdog_timer.Stop();
352
353 tcp_socket->Close();
354
355 // schedule reconnect attempt
356 int n_reconnect_delay = wxMax(N_DOG_TIMEOUT - 2, 2);
357 wxLogMessage("Reconnection scheduled in %d seconds.", n_reconnect_delay);
358
359 m_socket_timer.Start(n_reconnect_delay * 1000, wxTIMER_ONE_SHOT);
360 }
361}
362
363bool CommDriverN0183Net::SendMessage(std::shared_ptr<const NavMsg> msg,
364 std::shared_ptr<const NavAddr> addr) {
365 auto msg_0183 = std::dynamic_pointer_cast<const Nmea0183Msg>(msg);
366 std::string payload(msg_0183->payload);
367 if (!ocpn::endswith(payload, "\r\n")) payload += "\r\n";
368 m_driver_stats.tx_count += payload.size();
369 return SendSentenceNetwork(payload.c_str());
370}
371
372void CommDriverN0183Net::OnSocketEvent(wxSocketEvent& event) {
373#define RD_BUF_SIZE 4096
374 // Allows handling of high volume data streams, such as a National AIS
375 // stream with 100s of msgs a second.
376
377 switch (event.GetSocketEvent()) {
378 case wxSOCKET_INPUT: // from gpsd Daemon
379 {
380 // TODO determine if the following SetFlags needs to be done at every
381 // socket event or only once when socket is created, it it needs to be
382 // done at all!
383 // m_sock->SetFlags(wxSOCKET_WAITALL | wxSOCKET_BLOCK); // was
384 // (wxSOCKET_NOWAIT);
385
386 // We use wxSOCKET_BLOCK to avoid Yield() reentrancy problems
387 // if a long ProgressDialog is active, as in S57 SENC creation.
388
389 // Disable input event notifications to preclude re-entrancy on
390 // non-blocking socket
391 // m_sock->SetNotify(wxSOCKET_LOST_FLAG);
392 uint8_t buff[RD_BUF_SIZE + 1];
393 event.GetSocket()->Read(buff, RD_BUF_SIZE);
394 if (!event.GetSocket()->Error()) {
395 unsigned count = event.GetSocket()->LastCount();
396 for (unsigned i = 0; i < count; i += 1) n0183_buffer.Put(buff[i]);
397 while (n0183_buffer.HasSentence()) {
398 HandleN0183Msg(n0183_buffer.GetSentence() + "\r\n");
399 }
400 }
401 m_dog_value = N_DOG_TIMEOUT; // feed the dog
402 break;
403 }
404
405 case wxSOCKET_LOST: {
406 m_driver_stats.available = false;
407 using namespace std::chrono;
408 if (m_params.NetProtocol == TCP || m_params.NetProtocol == GPSD) {
409 if (m_rx_connect_event) {
410 MESSAGE_LOG << "NetworkDataStream connection lost: "
411 << m_params.GetDSPort();
412 }
413 if (m_socket_server) {
414 m_sock->Destroy();
415 m_sock = nullptr;
416 break;
417 }
418 auto since_connect = 10s;
419 // ten secs assumed, if connect time is uninitialized
420 auto now = steady_clock::now();
421 if (m_connect_time != time_point<steady_clock>())
422 since_connect = duration_cast<seconds>(now - m_connect_time);
423
424 auto retry_time = 5s; // default
425 // If the socket has never connected, and it is a short interval since
426 // the connect request then stretch the time a bit. This happens on
427 // Windows if there is no default IP on any interface
428 if (!m_rx_connect_event && (since_connect < 5s)) retry_time = 10s;
429
430 m_socketread_watchdog_timer.Stop();
431
432 // Schedule a re-connect attempt
433 m_socket_timer.Start(duration_cast<milliseconds>(retry_time).count(),
434 wxTIMER_ONE_SHOT);
435 }
436 break;
437 }
438
439 case wxSOCKET_CONNECTION: {
440 if (m_params.NetProtocol == GPSD) {
441 // Sign up for watcher mode, Cooked NMEA
442 // Note that SIRF devices will be converted by gpsd into
443 // pseudo-NMEA
444
445 char cmd[] = R"--(?WATCH={"class":"WATCH", "nmea":true})--";
446 m_sock->Write(cmd, strlen(cmd));
447 } else if (m_params.NetProtocol == TCP) {
448 MESSAGE_LOG << "TCP NetworkDataStream connection established: "
449 << m_params.GetDSPort();
450
451 m_dog_value = N_DOG_TIMEOUT; // feed the dog
452 if (m_params.IOSelect != DS_TYPE_OUTPUT) {
453 // start the DATA watchdog only if NODATA Reconnect is desired
454 if (GetParams().NoDataReconnect)
455 m_socketread_watchdog_timer.Start(1000);
456 }
457
458 if (m_params.IOSelect != DS_TYPE_INPUT && GetSock()->IsOk())
459 (void)SetOutputSocketOptions(m_sock);
460 m_socket_timer.Stop();
461 m_rx_connect_event = true;
462 }
463
464 m_driver_stats.available = true;
465 m_connect_time = std::chrono::steady_clock::now();
466 break;
467 }
468
469 default:
470 break;
471 }
472}
473
474void CommDriverN0183Net::OnServerSocketEvent(wxSocketEvent& event) {
475 switch (event.GetSocketEvent()) {
476 case wxSOCKET_CONNECTION: {
477 m_sock = m_socket_server->Accept(false);
478
479 if (GetSock()) {
480 m_sock->SetTimeout(2);
481 // GetSock()->SetFlags(wxSOCKET_BLOCK);
482 m_sock->SetEventHandler(*this, DS_SOCKET_ID);
483 int notify_flags = (wxSOCKET_CONNECTION_FLAG | wxSOCKET_LOST_FLAG);
484 if (m_params.IOSelect != DS_TYPE_INPUT) {
485 notify_flags |= wxSOCKET_OUTPUT_FLAG;
486 (void)SetOutputSocketOptions(m_sock);
487 }
488 if (m_params.IOSelect != DS_TYPE_OUTPUT)
489 notify_flags |= wxSOCKET_INPUT_FLAG;
490 m_sock->SetNotify(notify_flags);
491 m_sock->Notify(true);
492 }
493 break;
494 }
495 default:
496 break;
497 }
498}
499
500bool CommDriverN0183Net::SendSentenceNetwork(const wxString& payload) {
501 if (m_txenter)
502 return false; // do not allow recursion, could happen with non-blocking
503 // sockets
504 m_txenter++;
505
506 bool ret = true;
507 wxDatagramSocket* udp_socket;
508 switch (m_params.NetProtocol) {
509 case TCP:
510 if (GetSock() && GetSock()->IsOk()) {
511 m_sock->Write(payload.mb_str(), strlen(payload.mb_str()));
512 if (GetSock()->Error()) {
513 if (m_socket_server) {
514 m_sock->Destroy();
515 m_sock = nullptr;
516 } else {
517 auto* tcp_socket = dynamic_cast<wxSocketClient*>(m_sock);
518 if (tcp_socket) tcp_socket->Close();
519 if (!m_socket_timer.IsRunning())
520 m_socket_timer.Start(5000, wxTIMER_ONE_SHOT);
521 // schedule a reconnect
522 m_socketread_watchdog_timer.Stop();
523 }
524 ret = false;
525 }
526
527 } else
528 ret = false;
529 m_driver_stats.available = ret;
530
531 break;
532 case UDP:
533 udp_socket = dynamic_cast<wxDatagramSocket*>(m_tsock);
534 if (udp_socket && udp_socket->IsOk()) {
535 udp_socket->SendTo(m_addr, payload.mb_str(), payload.size());
536 if (udp_socket->Error()) ret = false;
537 } else {
538 ret = false;
539 }
540 m_driver_stats.available = ret;
541 break;
542
543 case GPSD:
544 default:
545 ret = false;
546 break;
547 }
548 m_txenter--;
549 return ret;
550}
551
552void CommDriverN0183Net::Close() {
553 MESSAGE_LOG << "Closing NMEA NetworkDataStream " << m_params.NetworkPort;
554 // Kill off the TCP Socket if alive
555 if (m_sock) {
556 if (m_is_multicast)
557 m_sock->SetOption(IPPROTO_IP, IP_DROP_MEMBERSHIP, &m_mrq_container->m_mrq,
558 sizeof(m_mrq_container->m_mrq));
559 m_sock->Notify(FALSE);
560 m_sock->Destroy();
561 }
562
563 if (m_tsock) {
564 m_tsock->Notify(FALSE);
565 m_tsock->Destroy();
566 }
567
568 if (m_socket_server) {
569 m_socket_server->Notify(FALSE);
570 m_socket_server->Destroy();
571 }
572
573 m_socket_timer.Stop();
574 m_socketread_watchdog_timer.Stop();
575 m_driver_stats.available = false;
576}
NMEA0183 drivers common part.
EventVar evt_driver_msg
Notified for messages from drivers.
Interface implemented by transport layer and possible other parties like test code which should handl...
Definition comm_driver.h:48
virtual void Notify(std::shared_ptr< const NavMsg > message)=0
Handle a received message.
const void Notify()
Notify all listeners, no data supplied.
bool HasSentence() const
Return true if a sentence is available to be returned by GetSentence()
std::string GetSentence()
Retrieve a sentence from buffer.
void Put(uint8_t ch)
Add a single character, possibly making a sentence available.
Where messages are sent to or received from.
Custom event class for OpenCPN's notification system.
NMEA0183 over IP driver.
Driver registration container, a singleton.
Raw messages layer, supports sending and recieving navmsg messages.
Enhanced logging interface on top of wx/log.h.
bool endswith(const std::string &str, const std::string &suffix)
Return true if s ends with given suffix.
unsigned tx_count
Number of bytes sent since program start.
unsigned rx_count
Number of bytes received since program start.
unsigned error_count
Number of detected errors since program start.
Suspend/resume and new devices events exchange point.