OpenCPN Partial API docs
Loading...
Searching...
No Matches
comm_drv_n0183_net.cpp
Go to the documentation of this file.
1/**************************************************************************
2 * Copyright (C) 2022 David Register *
3 * Copyright (C) 2022 Alec Leamas *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, see <https://www.gnu.org/licenses/>. *
17 **************************************************************************/
18
25#include <ctime>
26#include <deque>
27
28#ifdef __MSVC__
29#include "winsock2.h"
30#include <wx/msw/winundef.h>
31#include <ws2tcpip.h>
32#endif
33
34#ifndef _WIN32
35#include <arpa/inet.h>
36#include <netinet/tcp.h>
37#endif
38
39#include <wx/wxprec.h>
40#ifndef WX_PRECOMP
41#include <wx/wx.h>
42#endif
43
44#include <wx/datetime.h>
45#include <wx/socket.h>
46#include <wx/log.h>
47#include <wx/memory.h>
48#include <wx/chartype.h>
49#include <wx/sckaddr.h>
50
54#include "model/config_vars.h"
56#include "model/idents.h"
57#include "model/logger.h"
58#include "model/sys_events.h"
59
60#include "observable.h"
61
62using namespace std::literals::chrono_literals;
63
64#define N_DOG_TIMEOUT 8
65
/**
 * Return true if all host bits of addr are set, i.e. addr is the broadcast
 * address of a network with the given prefix length.
 *
 * The masks are built with plain integer shifts, so the comparison is a
 * purely numeric one.
 * NOTE(review): this presumably expects addr in host byte order -- confirm
 * against callers.
 *
 * @param addr 32-bit IPv4 address.
 * @param netmask_bits Number of leading network bits, 0..32. Values above
 *     32 trip the assert; in release builds they are treated as 32.
 * @return true if every bit outside the network prefix is set in addr.
 */
static bool IsBroadcastAddr(unsigned addr, unsigned netmask_bits) {
  assert(netmask_bits <= 32);
  const unsigned prefix_bits = netmask_bits > 32 ? 32u : netmask_bits;
  // Shifting a 32-bit value by 32 is undefined behaviour, so the
  // "no network bits at all" case must not go through the shift.
  const uint32_t netmask =
      prefix_bits == 0
          ? 0u
          : static_cast<uint32_t>(0xffffffffu << (32 - prefix_bits));
  const uint32_t host_mask = ~netmask;
  return (addr & host_mask) == host_mask;
}
73
/**
 * Wrapper around the ip_mreq structure handed to the socket when setting
 * the IP_ADD_MEMBERSHIP / IP_DROP_MEMBERSHIP options.
 */
class MrqContainer {
public:
  /** Membership request, zero-initialised until SetMrqAddr() is called. */
  struct ip_mreq m_mrq {};

  /**
   * Store the multicast group address; the interface is left as INADDR_ANY.
   * @param addr Group address, stored verbatim in imr_multiaddr.s_addr.
   */
  void SetMrqAddr(unsigned int addr) {
    m_mrq.imr_multiaddr.s_addr = addr;
    m_mrq.imr_interface.s_addr = INADDR_ANY;
  }
};
82
83static bool SetOutputSocketOptions(wxSocketBase* tsock) {
84 int ret;
85
86 // Disable nagle algorithm on outgoing connection
87 // Doing this here rather than after the accept() is
88 // pointless on platforms where TCP_NODELAY is
89 // not inherited. However, none of OpenCPN's currently
90 // supported platforms fall into that category.
91
92 int nagleDisable = 1;
93 ret = tsock->SetOption(IPPROTO_TCP, TCP_NODELAY, &nagleDisable,
94 sizeof(nagleDisable));
95
96 // Drastically reduce the size of the socket output buffer
97 // so that when client goes away without properly closing, the stream will
98 // quickly fill the output buffer, and thus fail the write() call
99 // within a few seconds.
100 unsigned long outbuf_size = 1024; // Smallest allowable value on Linux
101 return (tsock->SetOption(SOL_SOCKET, SO_SNDBUF, &outbuf_size,
102 sizeof(outbuf_size)) &&
103 ret);
104}
105
106//========================================================================
107/*
108 * CommDriverN0183Net implementation
109 */
110CommDriverN0183Net::CommDriverN0183Net(const ConnectionParams* params,
111 DriverListener& listener)
112 : CommDriverN0183(NavAddr::Bus::N0183, params->GetStrippedDSPort()),
113 m_params(*params),
114 m_listener(listener),
115 m_sock(nullptr),
116 m_tsock(nullptr),
117 m_socket_server(nullptr),
118 m_is_multicast(false),
119 m_stats_timer(*this, 2s),
120 m_txenter(0),
121 m_dog_value(0),
122 m_rx_connect_event(false),
123 m_socket_timer(*this),
124 m_socketread_watchdog_timer(*this),
125 m_ok(false),
126 m_is_conn_err_reported(false) {
127 m_addr.Hostname(params->NetworkAddress);
128 m_addr.Service(params->NetworkPort);
129 this->attributes["netAddress"] = params->NetworkAddress.ToStdString();
130 this->attributes["netPort"] = std::to_string(params->NetworkPort);
131 this->attributes["userComment"] = params->UserComment.ToStdString();
132 this->attributes["ioDirection"] = DsPortTypeToString(params->IOSelect);
133 m_driver_stats.driver_bus = NavAddr::Bus::N0183;
134 m_driver_stats.driver_iface = params->GetStrippedDSPort();
135
136 m_mrq_container = std::make_unique<MrqContainer>();
137
138 // Establish event listeners
139 resume_listener.Init(SystemEvents::GetInstance().evt_resume,
140 [&](ObservedEvt&) { HandleResume(); });
141 Bind(wxEVT_SOCKET, &CommDriverN0183Net::OnSocketEvent, this, DS_SOCKET_ID);
142 Bind(wxEVT_SOCKET, &CommDriverN0183Net::OnServerSocketEvent, this,
143 DS_SERVERSOCKET_ID);
144
145 Open();
146}
147
148CommDriverN0183Net::~CommDriverN0183Net() { Close(); }
149
150void CommDriverN0183Net::HandleN0183Msg(const std::string& sentence) {
151 // Sanity check
152 m_driver_stats.rx_count += sentence.size();
153 SendToListener(sentence, m_listener, m_params);
154}
155
156void CommDriverN0183Net::Open() {
157#ifdef __UNIX__
158 in_addr_t addr =
159 ((struct sockaddr_in*)m_addr.GetAddressData())->sin_addr.s_addr;
160#else
161 unsigned int addr = inet_addr(m_addr.IPAddress().mb_str());
162#endif
163 // Create the socket
164 switch (m_params.NetProtocol) {
165 case GPSD: {
166 OpenNetworkGpsd();
167 break;
168 }
169 case TCP: {
170 OpenNetworkTcp(addr);
171 break;
172 }
173 case UDP: {
174 OpenNetworkUdp(addr);
175 break;
176 }
177 default:
178 break;
179 }
180 m_ok = true;
181}
182
183void CommDriverN0183Net::OpenNetworkUdp(unsigned int addr) {
184 if (m_params.IOSelect != DS_TYPE_OUTPUT) {
185 // We need a local (bindable) address to create the Datagram receive socket
186 // Set up the reception socket
187 wxIPV4address conn_addr;
188 conn_addr.Service(std::to_string(m_params.NetworkPort));
189 conn_addr.AnyAddress();
190 conn_addr.AnyAddress();
191 m_sock =
192 new wxDatagramSocket(conn_addr, wxSOCKET_NOWAIT | wxSOCKET_REUSEADDR);
193
194 // Test if address is IPv4 multicast
195 if ((ntohl(addr) & 0xf0000000) == 0xe0000000) {
196 m_is_multicast = true;
197 m_mrq_container->SetMrqAddr(addr);
198 m_sock->SetOption(IPPROTO_IP, IP_ADD_MEMBERSHIP, &m_mrq_container->m_mrq,
199 sizeof(m_mrq_container->m_mrq));
200 }
201
202 m_sock->SetEventHandler(*this, DS_SOCKET_ID);
203
204 m_sock->SetNotify(wxSOCKET_CONNECTION_FLAG | wxSOCKET_INPUT_FLAG |
205 wxSOCKET_LOST_FLAG);
206 m_sock->Notify(TRUE);
207 m_sock->SetTimeout(1); // Short timeout
208 m_driver_stats.available = true;
209 }
210
211 // Set up another socket for transmit
212 if (m_params.IOSelect != DS_TYPE_INPUT) {
213 wxIPV4address tconn_addr;
214 tconn_addr.Service(0); // use ephemeral out port
215 tconn_addr.AnyAddress();
216 m_tsock =
217 new wxDatagramSocket(tconn_addr, wxSOCKET_NOWAIT | wxSOCKET_REUSEADDR);
218 // Here would be the place to disable multicast loopback
219 // but for consistency with broadcast behaviour, we will
220 // instead rely on setting priority levels to ignore
221 // sentences read back that have just been transmitted
222 if (!m_is_multicast && IsBroadcastAddr(addr, g_netmask_bits)) {
223 int broadcastEnable = 1;
224 m_tsock->SetOption(SOL_SOCKET, SO_BROADCAST, &broadcastEnable,
225 sizeof(broadcastEnable));
226 m_driver_stats.available = true;
227 }
228 }
229
230 // In case the connection is lost before acquired....
231 m_connect_time = std::chrono::steady_clock::now();
232}
233
234void CommDriverN0183Net::OpenNetworkTcp(unsigned int addr) {
235 if (addr == INADDR_ANY) {
236 MESSAGE_LOG << "Listening for TCP connections on " << INADDR_ANY;
237 m_socket_server = new wxSocketServer(m_addr, wxSOCKET_REUSEADDR);
238 m_socket_server->SetEventHandler(*this, DS_SERVERSOCKET_ID);
239 m_socket_server->SetNotify(wxSOCKET_CONNECTION_FLAG);
240 m_socket_server->Notify(TRUE);
241 m_socket_server->SetTimeout(1); // Short timeout
242 } else {
243 MESSAGE_LOG << "Opening TCP connection to " << m_params.NetworkAddress
244 << ":" << m_params.NetworkPort;
245 m_sock = new wxSocketClient();
246 m_sock->SetEventHandler(*this, DS_SOCKET_ID);
247 int notify_flags = (wxSOCKET_CONNECTION_FLAG | wxSOCKET_LOST_FLAG);
248 if (m_params.IOSelect != DS_TYPE_INPUT)
249 notify_flags |= wxSOCKET_OUTPUT_FLAG;
250 if (m_params.IOSelect != DS_TYPE_OUTPUT)
251 notify_flags |= wxSOCKET_INPUT_FLAG;
252 m_sock->SetNotify(notify_flags);
253 m_sock->Notify(true);
254 m_sock->SetTimeout(1); // Short timeout
255
256 m_rx_connect_event = false;
257 m_socket_timer.Start(100, wxTIMER_ONE_SHOT); // schedule a connection
258 }
259
260 // In case the connection is lost before acquired....
261 m_connect_time = std::chrono::steady_clock::now();
262}
263
264void CommDriverN0183Net::OpenNetworkGpsd() {
265 m_sock = new wxSocketClient();
266 m_sock->SetEventHandler(*this, DS_SOCKET_ID);
267 m_sock->SetNotify(wxSOCKET_CONNECTION_FLAG | wxSOCKET_INPUT_FLAG |
268 wxSOCKET_LOST_FLAG);
269 m_sock->Notify(TRUE);
270 m_sock->SetTimeout(1); // Short timeout
271
272 auto* tcp_socket = dynamic_cast<wxSocketClient*>(m_sock);
273 tcp_socket->Connect(m_addr, false);
274 m_rx_connect_event = false;
275}
276
277void CommDriverN0183Net::OnSocketReadWatchdogTimer() {
278 m_dog_value--;
279
280 if (m_dog_value <= 0) { // No receive in n seconds
281 if (GetParams().NoDataReconnect) {
282 // Reconnect on NO DATA is true, so try to reconnect now.
283 if (m_params.NetProtocol == TCP) {
284 auto* tcp_socket = dynamic_cast<wxSocketClient*>(m_sock);
285 if (tcp_socket) tcp_socket->Close();
286
287 int n_reconnect_delay = wxMax(N_DOG_TIMEOUT - 2, 2);
288 wxLogMessage("Reconnection scheduled in %d seconds.",
289 n_reconnect_delay);
290 m_socket_timer.Start(n_reconnect_delay * 1000, wxTIMER_ONE_SHOT);
291
292 // Stop DATA watchdog, will be restarted on successful connection.
293 m_socketread_watchdog_timer.Stop();
294 }
295 }
296 }
297}
298
299void CommDriverN0183Net::OnTimerSocket() {
300 // Attempt a connection
301 using namespace std::chrono;
302 auto* tcp_socket = dynamic_cast<wxSocketClient*>(m_sock);
303 if (tcp_socket) {
304 if (tcp_socket->IsDisconnected()) {
305 m_driver_stats.available = false;
306 wxLogDebug("Attempting reconnection...");
307 m_rx_connect_event = false;
308 // Stop DATA watchdog, may be restarted on successful connection.
309 m_socketread_watchdog_timer.Stop();
310 tcp_socket->Connect(m_addr, false);
311
312 // schedule another connection attempt, in case this one fails
313 int n_reconnect_delay = N_DOG_TIMEOUT;
314 m_socket_timer.Start(n_reconnect_delay * 1000, wxTIMER_ONE_SHOT);
315
316 // Possibly report connect error to GUI.
317 if (m_connect_time == time_point<steady_clock>()) return;
318 auto since_connect = steady_clock::now() - m_connect_time;
319 if (since_connect > 10s && !m_is_conn_err_reported) {
320 std::stringstream ss;
321 ss << _("Cannot connect to remote server ") << m_params.NetworkAddress
322 << ":" << m_params.NetworkPort;
323 CommDriverRegistry::GetInstance().evt_driver_msg.Notify(ss.str());
324 m_is_conn_err_reported = true;
325 m_driver_stats.error_count++;
326 }
327 }
328 }
329}
330
331void CommDriverN0183Net::HandleResume() {
332 // Attempt a stop and restart of connection
333 auto* tcp_socket = dynamic_cast<wxSocketClient*>(m_sock);
334 if (tcp_socket) {
335 m_socketread_watchdog_timer.Stop();
336
337 tcp_socket->Close();
338
339 // schedule reconnect attempt
340 int n_reconnect_delay = wxMax(N_DOG_TIMEOUT - 2, 2);
341 wxLogMessage("Reconnection scheduled in %d seconds.", n_reconnect_delay);
342
343 m_socket_timer.Start(n_reconnect_delay * 1000, wxTIMER_ONE_SHOT);
344 }
345}
346
347bool CommDriverN0183Net::SendMessage(std::shared_ptr<const NavMsg> msg,
348 std::shared_ptr<const NavAddr> addr) {
349 auto msg_0183 = std::dynamic_pointer_cast<const Nmea0183Msg>(msg);
350 std::string payload(msg_0183->payload);
351 if (!ocpn::endswith(payload, "\r\n")) payload += "\r\n";
352 m_driver_stats.tx_count += payload.size();
353 return SendSentenceNetwork(payload.c_str());
354}
355
356void CommDriverN0183Net::OnSocketEvent(wxSocketEvent& event) {
357#define RD_BUF_SIZE 4096
358 // Allows handling of high volume data streams, such as a National AIS
359 // stream with 100s of msgs a second.
360
361 switch (event.GetSocketEvent()) {
362 case wxSOCKET_INPUT: // from gpsd Daemon
363 {
364 // TODO determine if the following SetFlags needs to be done at every
365 // socket event or only once when socket is created, it it needs to be
366 // done at all!
367 // m_sock->SetFlags(wxSOCKET_WAITALL | wxSOCKET_BLOCK); // was
368 // (wxSOCKET_NOWAIT);
369
370 // We use wxSOCKET_BLOCK to avoid Yield() reentrancy problems
371 // if a long ProgressDialog is active, as in S57 SENC creation.
372
373 // Disable input event notifications to preclude re-entrancy on
374 // non-blocking socket
375 // m_sock->SetNotify(wxSOCKET_LOST_FLAG);
376 uint8_t buff[RD_BUF_SIZE + 1];
377 event.GetSocket()->Read(buff, RD_BUF_SIZE);
378 if (!event.GetSocket()->Error()) {
379 unsigned count = event.GetSocket()->LastCount();
380 for (unsigned i = 0; i < count; i += 1) n0183_buffer.Put(buff[i]);
381 while (n0183_buffer.HasSentence()) {
382 HandleN0183Msg(n0183_buffer.GetSentence() + "\r\n");
383 }
384 }
385 m_dog_value = N_DOG_TIMEOUT; // feed the dog
386 break;
387 }
388
389 case wxSOCKET_LOST: {
390 m_driver_stats.available = false;
391 using namespace std::chrono;
392 if (m_params.NetProtocol == TCP || m_params.NetProtocol == GPSD) {
393 if (m_rx_connect_event) {
394 MESSAGE_LOG << "NetworkDataStream connection lost: "
395 << m_params.GetDSPort();
396 }
397 if (m_socket_server) {
398 m_sock->Destroy();
399 m_sock = nullptr;
400 break;
401 }
402 auto since_connect = 10s;
403 // ten secs assumed, if connect time is uninitialized
404 auto now = steady_clock::now();
405 if (m_connect_time != time_point<steady_clock>())
406 since_connect = duration_cast<seconds>(now - m_connect_time);
407
408 auto retry_time = 5s; // default
409 // If the socket has never connected, and it is a short interval since
410 // the connect request then stretch the time a bit. This happens on
411 // Windows if there is no default IP on any interface
412 if (!m_rx_connect_event && (since_connect < 5s)) retry_time = 10s;
413
414 m_socketread_watchdog_timer.Stop();
415
416 // Schedule a re-connect attempt
417 m_socket_timer.Start(duration_cast<milliseconds>(retry_time).count(),
418 wxTIMER_ONE_SHOT);
419 }
420 break;
421 }
422
423 case wxSOCKET_CONNECTION: {
424 if (m_params.NetProtocol == GPSD) {
425 // Sign up for watcher mode, Cooked NMEA
426 // Note that SIRF devices will be converted by gpsd into
427 // pseudo-NMEA
428
429 char cmd[] = R"--(?WATCH={"class":"WATCH", "nmea":true})--";
430 m_sock->Write(cmd, strlen(cmd));
431 } else if (m_params.NetProtocol == TCP) {
432 MESSAGE_LOG << "TCP NetworkDataStream connection established: "
433 << m_params.GetDSPort();
434
435 m_dog_value = N_DOG_TIMEOUT; // feed the dog
436 if (m_params.IOSelect != DS_TYPE_OUTPUT) {
437 // start the DATA watchdog only if NODATA Reconnect is desired
438 if (GetParams().NoDataReconnect)
439 m_socketread_watchdog_timer.Start(1000);
440 }
441
442 if (m_params.IOSelect != DS_TYPE_INPUT && GetSock()->IsOk())
443 (void)SetOutputSocketOptions(m_sock);
444 m_socket_timer.Stop();
445 m_rx_connect_event = true;
446 }
447
448 m_driver_stats.available = true;
449 m_connect_time = std::chrono::steady_clock::now();
450 break;
451 }
452
453 default:
454 break;
455 }
456}
457
458void CommDriverN0183Net::OnServerSocketEvent(wxSocketEvent& event) {
459 switch (event.GetSocketEvent()) {
460 case wxSOCKET_CONNECTION: {
461 m_sock = m_socket_server->Accept(false);
462
463 if (GetSock()) {
464 m_sock->SetTimeout(2);
465 // GetSock()->SetFlags(wxSOCKET_BLOCK);
466 m_sock->SetEventHandler(*this, DS_SOCKET_ID);
467 int notify_flags = (wxSOCKET_CONNECTION_FLAG | wxSOCKET_LOST_FLAG);
468 if (m_params.IOSelect != DS_TYPE_INPUT) {
469 notify_flags |= wxSOCKET_OUTPUT_FLAG;
470 (void)SetOutputSocketOptions(m_sock);
471 }
472 if (m_params.IOSelect != DS_TYPE_OUTPUT)
473 notify_flags |= wxSOCKET_INPUT_FLAG;
474 m_sock->SetNotify(notify_flags);
475 m_sock->Notify(true);
476 }
477 break;
478 }
479 default:
480 break;
481 }
482}
483
484bool CommDriverN0183Net::SendSentenceNetwork(const wxString& payload) {
485 if (m_txenter)
486 return false; // do not allow recursion, could happen with non-blocking
487 // sockets
488 m_txenter++;
489
490 bool ret = true;
491 wxDatagramSocket* udp_socket;
492 switch (m_params.NetProtocol) {
493 case TCP:
494 if (GetSock() && GetSock()->IsOk()) {
495 m_sock->Write(payload.mb_str(), strlen(payload.mb_str()));
496 m_dog_value = N_DOG_TIMEOUT; // feed the dog
497 if (GetSock()->Error()) {
498 if (m_socket_server) {
499 m_sock->Destroy();
500 m_sock = nullptr;
501 } else {
502 auto* tcp_socket = dynamic_cast<wxSocketClient*>(m_sock);
503 if (tcp_socket) tcp_socket->Close();
504 if (!m_socket_timer.IsRunning())
505 m_socket_timer.Start(5000, wxTIMER_ONE_SHOT);
506 // schedule a reconnect
507 m_socketread_watchdog_timer.Stop();
508 }
509 ret = false;
510 }
511
512 } else
513 ret = false;
514 m_driver_stats.available = ret;
515
516 break;
517 case UDP:
518 udp_socket = dynamic_cast<wxDatagramSocket*>(m_tsock);
519 if (udp_socket && udp_socket->IsOk()) {
520 udp_socket->SendTo(m_addr, payload.mb_str(), payload.size());
521 m_dog_value = N_DOG_TIMEOUT; // feed the dog
522 if (udp_socket->Error()) ret = false;
523 } else {
524 ret = false;
525 }
526 m_driver_stats.available = ret;
527 break;
528
529 case GPSD:
530 default:
531 ret = false;
532 break;
533 }
534 m_txenter--;
535 return ret;
536}
537
538void CommDriverN0183Net::Close() {
539 MESSAGE_LOG << "Closing NMEA NetworkDataStream " << m_params.NetworkPort;
540 m_stats_timer.Stop();
541 // Kill off the TCP Socket if alive
542 if (m_sock) {
543 if (m_is_multicast)
544 m_sock->SetOption(IPPROTO_IP, IP_DROP_MEMBERSHIP, &m_mrq_container->m_mrq,
545 sizeof(m_mrq_container->m_mrq));
546 m_sock->Notify(FALSE);
547 m_sock->Destroy();
548 }
549
550 if (m_tsock) {
551 m_tsock->Notify(FALSE);
552 m_tsock->Destroy();
553 }
554
555 if (m_socket_server) {
556 m_socket_server->Notify(FALSE);
557 m_socket_server->Destroy();
558 }
559
560 m_socket_timer.Stop();
561 m_socketread_watchdog_timer.Stop();
562 m_driver_stats.available = false;
563}
NMEA0183 basic parsing common parts:
void SendToListener(const std::string &payload, DriverListener &listener, const ConnectionParams &params)
Wrap argument string in NavMsg pointer, forward to listener.
EventVar evt_driver_msg
Notified for messages from drivers.
Interface for handling incoming messages.
Definition comm_driver.h:50
void Notify() override
Notify all listeners, no data supplied.
bool HasSentence() const
Return true if a sentence is available to be returned by GetSentence()
std::string GetSentence()
Retrieve a sentence from buffer.
void Put(uint8_t ch)
Add a single character, possibly making a sentence available.
Where messages are sent to or received from.
Custom event class for OpenCPN's notification system.
NMEA0183 over IP driver.
Driver registration container, a singleton.
Raw messages layer, supports sending and receiving navmsg messages.
Global variables stored in configuration file.
std::string DsPortTypeToString(dsPortType type)
Return textual representation for use in driver ioDirection attribute.
NMEA Data Object.
GUI constant definitions.
Enhanced logging interface on top of wx/log.h.
bool endswith(const std::string &str, const std::string &suffix)
Return true if s ends with given suffix.
General observable implementation with several specializations.
unsigned tx_count
Number of bytes sent since program start.
unsigned rx_count
Number of bytes received since program start.
unsigned error_count
Number of detected errors since program start.
Suspend/resume and new devices events exchange point.