24 struct CORELINK_EXPORT request_completion_pack
29 std::shared_ptr<request_response::requests::corelink_server_request_base> request{};
32 request_completion_pack()
36 channel_id(rhs.channel_id), function(rhs.function),
37 request_response_handler_descriptor(rhs.request_response_handler_descriptor),
39 completion_handler(rhs.completion_handler)
43 channel_id(rhs.channel_id), function(rhs.function),
44 request_response_handler_descriptor(std::move(rhs.request_response_handler_descriptor)),
45 request(std::move(rhs.request)),
46 completion_handler(std::move(rhs.completion_handler))
54 std::shared_ptr<core::network::corelink_data_xchg_protocol>> m_protocol_managers{};
60 std::shared_ptr<corelink_client_channel_base_descriptor>> m_corelink_channels{};
62#if defined(CORELINK_USE_TCP) || defined(CORELINK_USE_UDP)
66 std::shared_ptr<core::network::corelink_data_xchg_raw_socket_protocol_context_manager> m_context_manager{};
72 std::unordered_map<std::string, request_completion_pack> m_completion_handlers{};
81 auto response_msg = std::make_shared<request_response::responses::corelink_server_response_base>();
82 response_msg->message = response.get_str(
"message");
83 response_msg->status_code = response.get_int(
"statusCode", -1);
85 if (response_msg->status_code == 0)
87 auto control_channel_descriptor = std::static_pointer_cast<corelink_client_control_channel_descriptor>(
88 m_corelink_channels.at(channel_id));
89 control_channel_descriptor->auth_token = response()[
"token"].GetString();
90 control_channel_descriptor->client_ip = response()[
"IP"].GetString();
93 if (rq_rp_params.completion_handler)
94 rq_rp_params.completion_handler(channel_id,
"", response_msg);
102 auto mod_data_stream_req =
103 std::static_pointer_cast<request_response::requests::modify_data_stream_request_base>(
104 rq_rp_params.request);
105 auto proto_manager = m_protocol_managers[mod_data_stream_req->protocol()];
107 data_channel_descriptor.
stream_id = response.get_int(
"streamID");
108 data_channel_descriptor.
max_tx_unit = response.get_int(
"MTU");
110 std::shared_ptr<core::network::ip_protocol_channel_descriptor> descriptor;
111 switch (mod_data_stream_req->protocol())
113#ifdef CORELINK_USE_WEBSOCKET
114 case core::network::constants::protocols::websocket():
117 descriptor = std::make_shared<core::network::websocket_protocol_per_channel_descriptor>(
118 mod_data_stream_req->client_certificate_path);
122#ifdef CORELINK_USE_TCP
123 case core::network::constants::protocols::tcp():
125 descriptor = std::make_shared<core::network::tcp_protocol_per_channel_descriptor>();
129#ifdef CORELINK_USE_UDP
130 case core::network::constants::protocols::udp():
132 descriptor = std::make_shared<core::network::udp_protocol_per_channel_descriptor>();
139 descriptor->on_init = mod_data_stream_req->on_init;
140 descriptor->on_uninit = mod_data_stream_req->on_uninit;
141 auto sender_stream_req = std::static_pointer_cast<request_response::requests::modify_sender_stream_request>(
142 mod_data_stream_req);
143 descriptor->on_send = sender_stream_req->on_send;
144 descriptor->on_receive =
nullptr;
145 descriptor->port_number = response.get_int(
"port");
146 descriptor->endpoint = remote_server_ep.empty() ? response.get_str(
"IP") : remote_server_ep;
147 descriptor->on_error = mod_data_stream_req->on_error;
149 std::dynamic_pointer_cast<core::network::corelink_data_xchg_ip_proto_base>(
150 proto_manager)->add_and_init_channel(std::move(descriptor), data_channel_channel_id);
152 m_corelink_channels.insert(
154 data_channel_channel_id,
155 std::make_shared<corelink_client_data_channel_descriptor>(
156 std::move(data_channel_descriptor))
159 if (rq_rp_params.completion_handler)
161 rq_rp_params.completion_handler(data_channel_channel_id,
"",
nullptr);
170 auto receiver_stream_req =
171 std::static_pointer_cast<request_response::requests::modify_receiver_stream_request>(
172 rq_rp_params.request);
174 auto proto_manager = m_protocol_managers[receiver_stream_req->protocol()];
175 data_channel_descriptor.
stream_id = response.get_int(
"streamID");
176 data_channel_descriptor.
max_tx_unit = response.get_int(
"MTU");
178 if (response().HasMember(
"streamList"))
180 auto modified_streams = response()[
"streamList"].GetArray();
181 for (
auto &stream: modified_streams)
184 stream.HasMember(
"streamID") ? stream[
"streamID"].GetInt() : 0);
188 std::shared_ptr<core::network::ip_protocol_channel_descriptor> descriptor;
189 switch (receiver_stream_req->protocol())
191#ifdef CORELINK_USE_WEBSOCKET
192 case core::network::constants::protocols::websocket():
194 descriptor = std::make_shared<core::network::websocket_protocol_per_channel_descriptor>(
195 receiver_stream_req->client_certificate_path);
199#ifdef CORELINK_USE_TCP
200 case core::network::constants::protocols::tcp():
202 descriptor = std::make_shared<core::network::tcp_protocol_per_channel_descriptor>(
true);
206#ifdef CORELINK_USE_UDP
207 case core::network::constants::protocols::udp():
209 descriptor = std::make_shared<core::network::udp_protocol_per_channel_descriptor>(
true);
216 descriptor->on_send =
nullptr;
219 descriptor->on_receive = [self]
224 if (stream_data.size() < 8)
227 auto channel = self->m_corelink_channels.find(receiving_channel_id);
228 if (channel != self->m_corelink_channels.end())
230 auto data_channel_dsc = std::static_pointer_cast<corelink_client_data_channel_descriptor>(
232 bool can_have_multipart_frame =
false;
233#ifdef CORELINK_USE_TCP
234 can_have_multipart_frame =
235 data_channel_dsc->protocol == core::network::constants::protocols::tcp;
238 uint16_t header_size = 0, data_size = 0;
242 if (can_have_multipart_frame && !data_channel_dsc->incomplete_packet_buffer.empty())
246 data_channel_dsc->incomplete_packet_buffer.begin(),
247 data_channel_dsc->incomplete_packet_buffer.end()
249 data_channel_dsc->incomplete_packet_buffer.erase(
250 data_channel_dsc->incomplete_packet_buffer.begin(),
251 data_channel_dsc->incomplete_packet_buffer.end()
255 for (
size_t pkt_pos = 0;;)
258 decltype(header_size),
259 utils::system::endianness::little>
260 (std::vector<uint8_t>(
261 stream_data.begin() + pkt_pos,
262 stream_data.begin() + pkt_pos + 2
267 utils::system::endianness::little>
268 (std::vector<uint8_t>(
269 stream_data.begin() + pkt_pos + 2,
270 stream_data.begin() + pkt_pos + 4
276 utils::system::endianness::little>
277 (std::vector<uint8_t>(
278 stream_data.begin() + pkt_pos + 4,
279 stream_data.begin() + pkt_pos + 8
283 if ((pkt_pos + 8 + header_size + data_size) <= stream_data.size())
285 std::string header_str;
286 if ((header_size > 0))
288 header_str = std::string(stream_data.begin() + pkt_pos + 8,
289 stream_data.begin() + pkt_pos + 8 + header_size);
292 std::vector<uint8_t> data_buff;
295 data_buff.insert(data_buff.end(),
296 (stream_data.begin() + pkt_pos + 8 + header_size),
297 stream_data.begin() + pkt_pos + 8 + header_size + data_size);
302 if (data_channel_dsc->user_receive_handler)
305 data_channel_dsc->user_receive_handler(
306 receiving_channel_id,
312 if (!can_have_multipart_frame)
break;
315 pkt_pos += 8 + header_size + data_size;
319 if (can_have_multipart_frame)
323 data_channel_dsc->incomplete_packet_buffer.insert(
324 data_channel_dsc->incomplete_packet_buffer.end(),
325 stream_data.begin() + pkt_pos,
334 descriptor->port_number = response.get_int(
"port");
335 descriptor->endpoint = remote_server_ep.empty() ? response.get_str(
"IP") : remote_server_ep;
336 descriptor->on_error = receiver_stream_req->on_error;
337 descriptor->on_init = [receiver_stream_req, self]
341 self->send_data(ch_id, std::vector<uint8_t>(),
utils::json());
342 if (receiver_stream_req->on_init)
343 receiver_stream_req->on_init(ch_id);
345 descriptor->on_uninit = receiver_stream_req->on_uninit;
347 if (!std::dynamic_pointer_cast<core::network::corelink_data_xchg_ip_proto_base>(
348 proto_manager)->add_and_init_channel(std::move(descriptor), data_channel_channel_id))
350 rq_rp_params.completion_handler(data_channel_channel_id,
"Failed to create data channel!",
nullptr);
353 m_corelink_channels.insert(
355 data_channel_channel_id,
356 std::make_shared<corelink_client_data_channel_descriptor>(
357 std::move(data_channel_descriptor))
359 if (rq_rp_params.completion_handler)
361 rq_rp_params.completion_handler(data_channel_channel_id,
"",
nullptr);
367 for (
auto &proto_mgr: m_protocol_managers)
369 std::dynamic_pointer_cast<core::network::corelink_data_xchg_ip_proto_base>(
370 proto_mgr.second)->teardown_all();
372#if defined(CORELINK_USE_TCP) || defined (CORELINK_USE_UDP)
373 m_context_manager->stop_context_manager();
396 return m_corelink_channels.at(channel_id);
408 #ifdef CORELINK_USE_TCP
409 | (1 << core::network::constants::protocols::tcp())
411 #ifdef CORELINK_USE_UDP
412 | (1 << core::network::constants::protocols::udp())
414 #ifdef CORELINK_USE_WEBSOCKET
415 | (1 << core::network::constants::protocols::websocket())
426 #
if defined (CORELINK_USE_TCP)
427 || protocols.test(core::network::constants::protocols::tcp())
429 #
if defined (CORELINK_USE_UDP)
430 || protocols.test(core::network::constants::protocols::udp())
435 m_context_manager = std::make_shared<core::network::corelink_data_xchg_raw_socket_protocol_context_manager>();
437 m_context_manager->start_context_manager();
440#if defined(CORELINK_USE_TCP)
442 if (protocols.test(core::network::constants::protocols::tcp()))
444 m_protocol_managers.insert(
446 core::network::constants::protocols::tcp(),
447 std::make_shared<core::network::corelink_data_xchg_tcp_protocol_manager>(
452#if defined(CORELINK_USE_UDP)
454 if (protocols.test(core::network::constants::protocols::udp()))
456 m_protocol_managers.insert(
458 core::network::constants::protocols::udp(),
459 std::make_shared<core::network::corelink_data_xchg_udp_protocol_manager>(
464#ifdef CORELINK_USE_WEBSOCKET
466 if (protocols.test(core::network::constants::protocols::websocket()))
469 auto ws_proto_manager = std::make_shared<core::network::corelink_data_xchg_websocket_protocol_manager>();
470 ws_proto_manager->start_context_manager();
471 m_protocol_managers.insert(
473 core::network::constants::protocols::websocket(),
474 std::move(ws_proto_manager)
490 return add_control_channel(
493 conn_info.port_number,
494 conn_info.client_certificate_path,
512 auto proto_manager = std::dynamic_pointer_cast<core::network::corelink_data_xchg_ip_proto_base>(
513 m_protocol_managers[channel_protocol()]);
515 if (!proto_manager && (on_error !=
nullptr))
518 "Error: Could not locate a proto manager for supplied control channel protocol\n Skipping creation of control channel");
523 std::shared_ptr<corelink::core::network::ip_protocol_channel_descriptor> descriptor;
525 switch (channel_protocol())
527#ifdef CORELINK_USE_WEBSOCKET
528 case core::network::constants::protocols::websocket():
530 descriptor = std::make_shared<corelink::core::network::websocket_protocol_per_channel_descriptor>(
535#ifdef CORELINK_USE_TCP
536 case core::network::constants::protocols::tcp():
538 descriptor = std::make_shared<corelink::core::network::tcp_protocol_per_channel_descriptor>(
545 descriptor =
nullptr;
549 if (descriptor ==
nullptr)
551 throw std::runtime_error(
552 "The chosen protocol is not supported to construct a control channel");
555 descriptor->port_number = port;
556 descriptor->endpoint = endpoint;
557 descriptor->on_init = on_connection_init;
558 descriptor->on_error = on_error;
559 descriptor->on_uninit = on_connection_uninit;
563 descriptor->on_receive = [self](
571 auto descriptor = std::static_pointer_cast<corelink_client_control_channel_descriptor>(
572 self->m_corelink_channels.at(channel_id));
574 auto channel_descriptor =
575 std::dynamic_pointer_cast<core::network::corelink_data_xchg_ip_proto_base>(
576 self->m_protocol_managers[descriptor->protocol()])->get_channel(channel_id);
583 std::string response_string(data.begin(), data.end());
584 descriptor->response_stream += response_string;
586 int16_t count = 0, current_idx = 0;
589 for (;; ++current_idx)
592 static_cast<decltype(current_idx)
>(descriptor->response_stream.size()))
594 if (descriptor->response_stream[current_idx] ==
'{') ++count;
595 else if (descriptor->response_stream[current_idx] ==
'}') --count;
597 if (count == 0 && (descriptor->response_stream.find(
'{') != std::string::npos))
599 std::string complete_response_str =
601 static_cast<decltype(current_idx)
>(descriptor->response_stream.size() - 1) ?
602 descriptor->response_stream :
603 descriptor->response_stream.substr(0, current_idx + 1);
605 descriptor->response_stream.erase(0, current_idx + 1);
612 std::string handler_id = response.
get_str(
"ID");
614 if (handler_id.empty()) handler_id = response.
get_str(
"function");
615 if (handler_id.empty())
return;
617 auto handler = self->m_completion_handlers.find(handler_id);
618 if (handler == self->m_completion_handlers.end())
620 channel_descriptor->on_error(channel_id,
621 "Could not locate a handler for the response sent by the server.");
625 switch (handler->second.function)
627 case corelink_functions::create_sender:
629 self->handle_create_sender(
633 channel_descriptor->endpoint
637 case corelink_functions::create_receiver:
639 self->handle_create_receiver(
643 channel_descriptor->endpoint
647 case corelink_functions::authenticate:
649 self->handle_authenticate(
658 if (handler->second.request_response_handler_descriptor.response_handler)
660 handler->second.request_response_handler_descriptor.response_handler(
663 handler->second.completion_handler);
669 if (response().HasMember(
"ID"))
671 self->m_completion_handlers.erase(handler_id);
672 descriptor->channel_in_use =
false;
677 if (!self->m_outbound_control_msg_queue.empty())
679 auto &req = self->m_outbound_control_msg_queue.peek();
680 self->request(req.channel_id, req.function, req.request, req.completion_handler);
681 self->m_outbound_control_msg_queue.pop();
686 channel_descriptor->on_error(
688 std::string(
"Error while processing control channel message received: ") + ex.what()
693 proto_manager->add_and_init_channel(descriptor, channel_id);
694 if (m_corelink_channels.find(channel_id) == m_corelink_channels.end())
696 m_corelink_channels.insert(
698 std::make_shared<corelink_client_control_channel_descriptor>(channel_protocol)});
715 in<std::shared_ptr<request_response::requests::corelink_server_request_base>> request_params,
720 auto function_descriptor = corelink_functions_request_response_handlers.find(func);
721 if (function_descriptor == corelink_functions_request_response_handlers.end())
722 throw std::runtime_error(
"The requested function is invalid");
725 auto control_channel_descriptor = std::static_pointer_cast<corelink_client_control_channel_descriptor>(
726 m_corelink_channels.at(control_channel_channel_id));
728 auto proto_manager = std::dynamic_pointer_cast<core::network::corelink_data_xchg_ip_proto_base>(
729 m_protocol_managers.at(control_channel_descriptor->protocol()));
731 auto channel_descriptor =
732 std::dynamic_pointer_cast<corelink::core::network::ip_protocol_channel_descriptor>(
733 proto_manager->get_channel(control_channel_channel_id));
736 request_completion_pack p;
738 p.request_response_handler_descriptor = function_descriptor->second;
739 p.request = request_params;
740 p.completion_handler = completion_handler;
741 p.channel_id = control_channel_channel_id;
745 case corelink_functions::server_callback_on_dropped:
747 m_completion_handlers.insert({
"dropped", std::move(p)});
750 case corelink_functions::server_callback_on_update:
752 m_completion_handlers.insert({
"update", std::move(p)});
755 case corelink_functions::server_callback_on_subscribed:
757 m_completion_handlers.insert({
"subscriber", std::move(p)});
760 case corelink_functions::server_callback_on_stale:
762 m_completion_handlers.insert({
"stale", std::move(p)});
775 if (control_channel_descriptor->channel_in_use)
777 m_outbound_control_msg_queue.
push(p);
781 control_channel_descriptor->channel_in_use =
true;
782 if (function_descriptor->second.request_handler)
785 auto request = function_descriptor->second.request_handler(control_channel_descriptor,
789 std::string request_id;
790 if (!request().HasMember(
"ID"))
794 request_id = std::to_string(utils::random_numbers::get_random_int());
795 }
while (m_completion_handlers.find(request_id) != m_completion_handlers.end());
797 request.append(
"ID", request_id);
801 request_id = request()[
"ID"].GetString();
805 m_completion_handlers.insert({request_id, std::move(p)});
807 proto_manager->send_data(
808 std::vector<uint8_t>(request_string.begin(), request_string.end()),
809 control_channel_channel_id
827 rvref<std::vector<uint8_t>> packet,
832 auto data_channel_descriptor = std::static_pointer_cast<corelink_client_data_channel_descriptor>(
833 m_corelink_channels.at(data_channel_channel_id));
835 if (!data_channel_descriptor)
840 auto proto_manager = std::dynamic_pointer_cast<core::network::corelink_data_xchg_ip_proto_base>(
841 m_protocol_managers.at(data_channel_descriptor->protocol()));
843 auto channel_descriptor =
844 std::dynamic_pointer_cast<corelink::core::network::ip_protocol_channel_descriptor>(
845 proto_manager->get_channel(data_channel_channel_id));
847 const std::string header_string = headers.to_string();
848 const size_t data_size = packet.size();
851 if ((data_size + header_string.size() + 8) > data_channel_descriptor->max_tx_unit)
853 if (channel_descriptor->on_error)
855 std::stringstream err;
856 err <<
"Unable to send data packet as MTU for this channel is "
857 << data_channel_descriptor->max_tx_unit
858 <<
", whereas attempted to pass " << (data_size + header_string.size())
859 <<
" bytes of data excluding corelink headers.\n";
860 channel_descriptor->on_error(1, err.str());
866 decltype(header_string.size()),
869 (header_string.size());
877 decltype(data_channel_descriptor->stream_id),
880 (data_channel_descriptor->stream_id);
887 if (!header_string.empty())
891 std::make_move_iterator(header_string.begin()),
892 std::make_move_iterator(header_string.end())
896 packet.insert(packet.begin(), std::make_move_iterator(stream_id_bytes.begin()),
897 std::make_move_iterator(stream_id_bytes.end()));
899 packet.insert(packet.begin(), std::make_move_iterator(data_size_bytes.begin()),
900 std::make_move_iterator(data_size_bytes.end()));
902 packet.insert(packet.begin(), std::make_move_iterator(header_size_bytes.begin()),
903 std::make_move_iterator(header_size_bytes.end()));
904 proto_manager->send_data(std::move(packet), data_channel_channel_id);