28#include <wx/msw/winundef.h>
43#include <netinet/tcp.h>
46#include <wx/datetime.h>
50#include <wx/chartype.h>
51#include <wx/sckaddr.h>
56#include "model/config_vars.h"
57#include "model/garmin_protocol_mgr.h"
58#include "model/idents.h"
62#include "observable.h"
64using namespace std::literals::chrono_literals;
66#define N_DOG_TIMEOUT 8
69static bool IsBroadcastAddr(
unsigned addr,
unsigned netmask_bits) {
70 assert(netmask_bits <= 32);
71 uint32_t netmask = 0xffffffff << (32 - netmask_bits);
72 uint32_t host_mask = ~netmask;
73 return (addr & host_mask) == host_mask;
79 void SetMrqAddr(
unsigned int addr) {
80 m_mrq.imr_multiaddr.s_addr = addr;
81 m_mrq.imr_interface.s_addr = INADDR_ANY;
85static bool SetOutputSocketOptions(wxSocketBase* tsock) {
95 ret = tsock->SetOption(IPPROTO_TCP, TCP_NODELAY, &nagleDisable,
96 sizeof(nagleDisable));
102 unsigned long outbuf_size = 1024;
103 return (tsock->SetOption(SOL_SOCKET, SO_SNDBUF, &outbuf_size,
104 sizeof(outbuf_size)) &&
116 m_listener(listener),
119 m_socket_server(nullptr),
120 m_is_multicast(false),
121 m_stats_timer(*this, 2s),
124 m_rx_connect_event(false),
125 m_socket_timer(*this),
126 m_socketread_watchdog_timer(*this),
128 m_is_conn_err_reported(false) {
129 m_addr.Hostname(params->NetworkAddress);
130 m_addr.Service(params->NetworkPort);
131 this->attributes[
"netAddress"] = params->NetworkAddress.ToStdString();
132 this->attributes[
"netPort"] = std::to_string(params->NetworkPort);
133 this->attributes[
"userComment"] = params->UserComment.ToStdString();
134 this->attributes[
"ioDirection"] = DsPortTypeToString(params->IOSelect);
135 m_driver_stats.driver_bus = NavAddr::Bus::N0183;
136 m_driver_stats.driver_iface = params->GetStrippedDSPort();
138 m_mrq_container = std::make_unique<MrqContainer>();
141 resume_listener.Init(SystemEvents::GetInstance().evt_resume,
143 Bind(wxEVT_SOCKET, &CommDriverN0183Net::OnSocketEvent,
this, DS_SOCKET_ID);
144 Bind(wxEVT_SOCKET, &CommDriverN0183Net::OnServerSocketEvent,
this,
150CommDriverN0183Net::~CommDriverN0183Net() { Close(); }
152void CommDriverN0183Net::HandleN0183Msg(
const std::string& sentence) {
154 if ((sentence[0] ==
'$' || sentence[0] ==
'!') && sentence.size() > 5) {
155 m_driver_stats.
rx_count += sentence.size();
156 std::string identifier;
158 identifier = sentence.substr(1, 5);
163 std::make_shared<const Nmea0183Msg>(identifier, sentence, GetAddress());
164 auto msg_all = std::make_shared<const Nmea0183Msg>(*msg,
"ALL");
166 if (m_params.SentencePassesFilter(sentence, FILTER_INPUT))
167 m_listener.
Notify(std::move(msg));
168 m_listener.
Notify(std::move(msg_all));
172void CommDriverN0183Net::Open() {
175 ((
struct sockaddr_in*)m_addr.GetAddressData())->sin_addr.s_addr;
177 unsigned int addr = inet_addr(m_addr.IPAddress().mb_str());
180 switch (m_params.NetProtocol) {
186 OpenNetworkTcp(addr);
190 OpenNetworkUdp(addr);
199void CommDriverN0183Net::OpenNetworkUdp(
unsigned int addr) {
200 if (m_params.IOSelect != DS_TYPE_OUTPUT) {
203 wxIPV4address conn_addr;
204 conn_addr.Service(std::to_string(m_params.NetworkPort));
205 conn_addr.AnyAddress();
206 conn_addr.AnyAddress();
208 new wxDatagramSocket(conn_addr, wxSOCKET_NOWAIT | wxSOCKET_REUSEADDR);
211 if ((ntohl(addr) & 0xf0000000) == 0xe0000000) {
212 m_is_multicast =
true;
213 m_mrq_container->SetMrqAddr(addr);
214 m_sock->SetOption(IPPROTO_IP, IP_ADD_MEMBERSHIP, &m_mrq_container->m_mrq,
215 sizeof(m_mrq_container->m_mrq));
218 m_sock->SetEventHandler(*
this, DS_SOCKET_ID);
220 m_sock->SetNotify(wxSOCKET_CONNECTION_FLAG | wxSOCKET_INPUT_FLAG |
222 m_sock->Notify(TRUE);
223 m_sock->SetTimeout(1);
224 m_driver_stats.available =
true;
228 if (m_params.IOSelect != DS_TYPE_INPUT) {
229 wxIPV4address tconn_addr;
230 tconn_addr.Service(0);
231 tconn_addr.AnyAddress();
233 new wxDatagramSocket(tconn_addr, wxSOCKET_NOWAIT | wxSOCKET_REUSEADDR);
238 if (!m_is_multicast && IsBroadcastAddr(addr, g_netmask_bits)) {
239 int broadcastEnable = 1;
240 m_tsock->SetOption(SOL_SOCKET, SO_BROADCAST, &broadcastEnable,
241 sizeof(broadcastEnable));
246 m_connect_time = std::chrono::steady_clock::now();
249void CommDriverN0183Net::OpenNetworkTcp(
unsigned int addr) {
250 if (addr == INADDR_ANY) {
251 MESSAGE_LOG <<
"Listening for TCP connections on " << INADDR_ANY;
252 m_socket_server =
new wxSocketServer(m_addr, wxSOCKET_REUSEADDR);
253 m_socket_server->SetEventHandler(*
this, DS_SERVERSOCKET_ID);
254 m_socket_server->SetNotify(wxSOCKET_CONNECTION_FLAG);
255 m_socket_server->Notify(TRUE);
256 m_socket_server->SetTimeout(1);
258 MESSAGE_LOG <<
"Opening TCP connection to " << m_params.NetworkAddress
259 <<
":" << m_params.NetworkPort;
260 m_sock =
new wxSocketClient();
261 m_sock->SetEventHandler(*
this, DS_SOCKET_ID);
262 int notify_flags = (wxSOCKET_CONNECTION_FLAG | wxSOCKET_LOST_FLAG);
263 if (m_params.IOSelect != DS_TYPE_INPUT)
264 notify_flags |= wxSOCKET_OUTPUT_FLAG;
265 if (m_params.IOSelect != DS_TYPE_OUTPUT)
266 notify_flags |= wxSOCKET_INPUT_FLAG;
267 m_sock->SetNotify(notify_flags);
268 m_sock->Notify(
true);
269 m_sock->SetTimeout(1);
271 m_rx_connect_event =
false;
272 m_socket_timer.Start(100, wxTIMER_ONE_SHOT);
276 m_connect_time = std::chrono::steady_clock::now();
279void CommDriverN0183Net::OpenNetworkGpsd() {
280 m_sock =
new wxSocketClient();
281 m_sock->SetEventHandler(*
this, DS_SOCKET_ID);
282 m_sock->SetNotify(wxSOCKET_CONNECTION_FLAG | wxSOCKET_INPUT_FLAG |
284 m_sock->Notify(TRUE);
285 m_sock->SetTimeout(1);
287 auto* tcp_socket =
dynamic_cast<wxSocketClient*
>(m_sock);
288 tcp_socket->Connect(m_addr,
false);
289 m_rx_connect_event =
false;
292void CommDriverN0183Net::OnSocketReadWatchdogTimer() {
295 if (m_dog_value <= 0) {
296 if (GetParams().NoDataReconnect) {
298 if (m_params.NetProtocol == TCP) {
299 auto* tcp_socket =
dynamic_cast<wxSocketClient*
>(m_sock);
300 if (tcp_socket) tcp_socket->Close();
302 int n_reconnect_delay = wxMax(N_DOG_TIMEOUT - 2, 2);
303 wxLogMessage(
"Reconnection scheduled in %d seconds.",
305 m_socket_timer.Start(n_reconnect_delay * 1000, wxTIMER_ONE_SHOT);
308 m_socketread_watchdog_timer.Stop();
314void CommDriverN0183Net::OnTimerSocket() {
316 using namespace std::chrono;
317 auto* tcp_socket =
dynamic_cast<wxSocketClient*
>(m_sock);
319 if (tcp_socket->IsDisconnected()) {
320 m_driver_stats.available =
false;
321 wxLogDebug(
"Attempting reconnection...");
322 m_rx_connect_event =
false;
324 m_socketread_watchdog_timer.Stop();
325 tcp_socket->Connect(m_addr,
false);
328 int n_reconnect_delay = N_DOG_TIMEOUT;
329 m_socket_timer.Start(n_reconnect_delay * 1000, wxTIMER_ONE_SHOT);
332 if (m_connect_time == time_point<steady_clock>())
return;
333 auto since_connect = steady_clock::now() - m_connect_time;
334 if (since_connect > 10s && !m_is_conn_err_reported) {
335 std::stringstream ss;
336 ss <<
"Cannot connect to remote server " << m_params.NetworkAddress
337 <<
":" << m_params.NetworkPort;
339 m_is_conn_err_reported =
true;
346void CommDriverN0183Net::HandleResume() {
348 auto* tcp_socket =
dynamic_cast<wxSocketClient*
>(m_sock);
350 m_socketread_watchdog_timer.Stop();
355 int n_reconnect_delay = wxMax(N_DOG_TIMEOUT - 2, 2);
356 wxLogMessage(
"Reconnection scheduled in %d seconds.", n_reconnect_delay);
358 m_socket_timer.Start(n_reconnect_delay * 1000, wxTIMER_ONE_SHOT);
362bool CommDriverN0183Net::SendMessage(std::shared_ptr<const NavMsg> msg,
363 std::shared_ptr<const NavAddr> addr) {
364 auto msg_0183 = std::dynamic_pointer_cast<const Nmea0183Msg>(msg);
365 std::string payload(msg_0183->payload);
366 if (!ocpn::endswith(payload,
"\r\n")) payload +=
"\r\n";
367 m_driver_stats.
tx_count += payload.size();
368 return SendSentenceNetwork(payload.c_str());
371void CommDriverN0183Net::OnSocketEvent(wxSocketEvent& event) {
372#define RD_BUF_SIZE 4096
376 switch (event.GetSocketEvent()) {
391 uint8_t buff[RD_BUF_SIZE + 1];
392 event.GetSocket()->Read(buff, RD_BUF_SIZE);
393 if (!event.GetSocket()->Error()) {
394 unsigned count =
event.GetSocket()->LastCount();
395 for (
unsigned i = 0; i < count; i += 1) n0183_buffer.
Put(buff[i]);
397 HandleN0183Msg(n0183_buffer.
GetSentence() +
"\r\n");
400 m_dog_value = N_DOG_TIMEOUT;
404 case wxSOCKET_LOST: {
405 m_driver_stats.available =
false;
406 using namespace std::chrono;
407 if (m_params.NetProtocol == TCP || m_params.NetProtocol == GPSD) {
408 if (m_rx_connect_event) {
409 MESSAGE_LOG <<
"NetworkDataStream connection lost: "
410 << m_params.GetDSPort();
412 if (m_socket_server) {
417 auto since_connect = 10s;
419 auto now = steady_clock::now();
420 if (m_connect_time != time_point<steady_clock>())
421 since_connect = duration_cast<seconds>(now - m_connect_time);
423 auto retry_time = 5s;
427 if (!m_rx_connect_event && (since_connect < 5s)) retry_time = 10s;
429 m_socketread_watchdog_timer.Stop();
432 m_socket_timer.Start(duration_cast<milliseconds>(retry_time).count(),
438 case wxSOCKET_CONNECTION: {
439 if (m_params.NetProtocol == GPSD) {
444 char cmd[] = R
"--(?WATCH={"class":"WATCH", "nmea":true})--";
445 m_sock->Write(cmd, strlen(cmd));
446 } else if (m_params.NetProtocol == TCP) {
447 MESSAGE_LOG <<
"TCP NetworkDataStream connection established: "
448 << m_params.GetDSPort();
450 m_dog_value = N_DOG_TIMEOUT;
451 if (m_params.IOSelect != DS_TYPE_OUTPUT) {
453 if (GetParams().NoDataReconnect)
454 m_socketread_watchdog_timer.Start(1000);
457 if (m_params.IOSelect != DS_TYPE_INPUT && GetSock()->IsOk())
458 (void)SetOutputSocketOptions(m_sock);
459 m_socket_timer.Stop();
460 m_rx_connect_event =
true;
463 m_driver_stats.available =
true;
464 m_connect_time = std::chrono::steady_clock::now();
473void CommDriverN0183Net::OnServerSocketEvent(wxSocketEvent& event) {
474 switch (event.GetSocketEvent()) {
475 case wxSOCKET_CONNECTION: {
476 m_sock = m_socket_server->Accept(
false);
479 m_sock->SetTimeout(2);
481 m_sock->SetEventHandler(*
this, DS_SOCKET_ID);
482 int notify_flags = (wxSOCKET_CONNECTION_FLAG | wxSOCKET_LOST_FLAG);
483 if (m_params.IOSelect != DS_TYPE_INPUT) {
484 notify_flags |= wxSOCKET_OUTPUT_FLAG;
485 (void)SetOutputSocketOptions(m_sock);
487 if (m_params.IOSelect != DS_TYPE_OUTPUT)
488 notify_flags |= wxSOCKET_INPUT_FLAG;
489 m_sock->SetNotify(notify_flags);
490 m_sock->Notify(
true);
499bool CommDriverN0183Net::SendSentenceNetwork(
const wxString& payload) {
506 wxDatagramSocket* udp_socket;
507 switch (m_params.NetProtocol) {
509 if (GetSock() && GetSock()->IsOk()) {
510 m_sock->Write(payload.mb_str(), strlen(payload.mb_str()));
511 if (GetSock()->Error()) {
512 if (m_socket_server) {
516 auto* tcp_socket =
dynamic_cast<wxSocketClient*
>(m_sock);
517 if (tcp_socket) tcp_socket->Close();
518 if (!m_socket_timer.IsRunning())
519 m_socket_timer.Start(5000, wxTIMER_ONE_SHOT);
521 m_socketread_watchdog_timer.Stop();
530 udp_socket =
dynamic_cast<wxDatagramSocket*
>(m_tsock);
531 if (udp_socket && udp_socket->IsOk()) {
532 udp_socket->SendTo(m_addr, payload.mb_str(), payload.size());
533 if (udp_socket->Error()) ret =
false;
537 m_driver_stats.available = ret;
549void CommDriverN0183Net::Close() {
550 MESSAGE_LOG <<
"Closing NMEA NetworkDataStream " << m_params.NetworkPort;
554 m_sock->SetOption(IPPROTO_IP, IP_DROP_MEMBERSHIP, &m_mrq_container->m_mrq,
555 sizeof(m_mrq_container->m_mrq));
556 m_sock->Notify(FALSE);
561 m_tsock->Notify(FALSE);
565 if (m_socket_server) {
566 m_socket_server->Notify(FALSE);
567 m_socket_server->Destroy();
570 m_socket_timer.Stop();
571 m_socketread_watchdog_timer.Stop();
572 m_driver_stats.available =
false;
NMEA0183 drivers common part.
EventVar evt_driver_msg
Notified for messages from drivers.
Interface implemented by transport layer and possible other parties like test code which should handl...
virtual void Notify(std::shared_ptr< const NavMsg > message)=0
Handle a received message.
const void Notify()
Notify all listeners, no data supplied.
bool HasSentence() const
Return true if a sentence is available to be returned by GetSentence()
std::string GetSentence()
Retrieve a sentence from buffer.
void Put(uint8_t ch)
Add a single character, possibly making a sentence available.
Where messages are sent to or received from.
Adds a std::shared<void> element to wxCommandEvent.
Driver registration container, a singleton.
Raw messages layer, supports sending and recieving navmsg messages.
Enhanced logging interface on top of wx/log.h.
unsigned tx_count
Number of bytes sent since program start.
unsigned rx_count
Number of bytes received since program start.
unsigned error_count
Number of detected errors since program start.
Suspend/resume and new devices events exchange point.