Corelink CPP Client
Corelink C++ client library
 
Loading...
Searching...
No Matches
corelink_client.hpp
Go to the documentation of this file.
1#pragma once
2
6#include "utils/json.hpp"
8#include "utils/system.hpp"
16
17namespace corelink
18{
19 namespace client
20 {
21 class CORELINK_EXPORT corelink_classic_client
22 {
23 private:
24 struct CORELINK_EXPORT request_completion_pack
25 {
27 corelink_functions function;
28 request_response_handler request_response_handler_descriptor{};
29 std::shared_ptr<request_response::requests::corelink_server_request_base> request{};
31
32 request_completion_pack()
33 {}
34
35 request_completion_pack(clvref<request_completion_pack> rhs) :
36 channel_id(rhs.channel_id), function(rhs.function),
37 request_response_handler_descriptor(rhs.request_response_handler_descriptor),
38 request(rhs.request),
39 completion_handler(rhs.completion_handler)
40 {}
41
42 request_completion_pack(rvref<request_completion_pack> rhs) noexcept:
43 channel_id(rhs.channel_id), function(rhs.function),
44 request_response_handler_descriptor(std::move(rhs.request_response_handler_descriptor)),
45 request(std::move(rhs.request)),
46 completion_handler(std::move(rhs.completion_handler))
47 {}
48 };
49
54 std::shared_ptr<core::network::corelink_data_xchg_protocol>> m_protocol_managers{};
55
59 std::unordered_map<core::network::channel_id_type,
60 std::shared_ptr<corelink_client_channel_base_descriptor>> m_corelink_channels{};
61
62#if defined(CORELINK_USE_TCP) || defined(CORELINK_USE_UDP)
66 std::shared_ptr<core::network::corelink_data_xchg_raw_socket_protocol_context_manager> m_context_manager{};
67#endif
68
72 std::unordered_map<std::string, request_completion_pack> m_completion_handlers{};
73
75
76 void handle_authenticate(core::network::channel_id_type channel_id,
77 in<utils::json> response,
79 )
80 {
81 auto response_msg = std::make_shared<request_response::responses::corelink_server_response_base>();
82 response_msg->message = response.get_str("message");
83 response_msg->status_code = response.get_int("statusCode", -1);
84
85 if (response_msg->status_code == 0)
86 {
87 auto control_channel_descriptor = std::static_pointer_cast<corelink_client_control_channel_descriptor>(
88 m_corelink_channels.at(channel_id));
89 control_channel_descriptor->auth_token = response()["token"].GetString();
90 control_channel_descriptor->client_ip = response()["IP"].GetString();
91 }
92
93 if (rq_rp_params.completion_handler)
94 rq_rp_params.completion_handler(channel_id, "", response_msg);
95 }
96
97 void handle_create_sender(core::network::channel_id_type /*channel_id*/,
98 in<utils::json> response,
99 in<request_completion_pack> rq_rp_params,
100 in<std::string> remote_server_ep)
101 {
102 auto mod_data_stream_req =
103 std::static_pointer_cast<request_response::requests::modify_data_stream_request_base>(
104 rq_rp_params.request);
105 auto proto_manager = m_protocol_managers[mod_data_stream_req->protocol()];
106 corelink_client_data_channel_descriptor data_channel_descriptor(mod_data_stream_req->protocol);
107 data_channel_descriptor.stream_id = response.get_int("streamID");
108 data_channel_descriptor.max_tx_unit = response.get_int("MTU");
109
110 std::shared_ptr<core::network::ip_protocol_channel_descriptor> descriptor;
111 switch (mod_data_stream_req->protocol())
112 {
113#ifdef CORELINK_USE_WEBSOCKET
114 case core::network::constants::protocols::websocket():
115 {
116
117 descriptor = std::make_shared<core::network::websocket_protocol_per_channel_descriptor>(
118 mod_data_stream_req->client_certificate_path);
119 break;
120 }
121#endif
122#ifdef CORELINK_USE_TCP
123 case core::network::constants::protocols::tcp():
124 {
125 descriptor = std::make_shared<core::network::tcp_protocol_per_channel_descriptor>();
126 break;
127 }
128#endif
129#ifdef CORELINK_USE_UDP
130 case core::network::constants::protocols::udp():
131 {
132 descriptor = std::make_shared<core::network::udp_protocol_per_channel_descriptor>();
133 break;
134 }
135#endif
136 default:
137 break;
138 }
139 descriptor->on_init = mod_data_stream_req->on_init;
140 descriptor->on_uninit = mod_data_stream_req->on_uninit;
141 auto sender_stream_req = std::static_pointer_cast<request_response::requests::modify_sender_stream_request>(
142 mod_data_stream_req);
143 descriptor->on_send = sender_stream_req->on_send;
144 descriptor->on_receive = nullptr;
145 descriptor->port_number = response.get_int("port");
146 descriptor->endpoint = remote_server_ep.empty() ? response.get_str("IP") : remote_server_ep;
147 descriptor->on_error = mod_data_stream_req->on_error;
148 core::network::channel_id_type data_channel_channel_id;
149 std::dynamic_pointer_cast<core::network::corelink_data_xchg_ip_proto_base>(
150 proto_manager)->add_and_init_channel(std::move(descriptor), data_channel_channel_id);
151
152 m_corelink_channels.insert(
153 {
154 data_channel_channel_id,
155 std::make_shared<corelink_client_data_channel_descriptor>(
156 std::move(data_channel_descriptor))
157 });
158
159 if (rq_rp_params.completion_handler)
160 {
161 rq_rp_params.completion_handler(data_channel_channel_id, "", nullptr);
162 }
163 }
164
165 void handle_create_receiver(core::network::channel_id_type /*channel_id*/,
166 in<utils::json> response,
167 in<request_completion_pack> rq_rp_params,
168 in<std::string> remote_server_ep)
169 {
170 auto receiver_stream_req =
171 std::static_pointer_cast<request_response::requests::modify_receiver_stream_request>(
172 rq_rp_params.request);
173 corelink_client_data_channel_descriptor data_channel_descriptor(receiver_stream_req->protocol);
174 auto proto_manager = m_protocol_managers[receiver_stream_req->protocol()];
175 data_channel_descriptor.stream_id = response.get_int("streamID");
176 data_channel_descriptor.max_tx_unit = response.get_int("MTU");
177
178 if (response().HasMember("streamList"))
179 {
180 auto modified_streams = response()["streamList"].GetArray();
181 for (auto &stream: modified_streams)
182 {
183 data_channel_descriptor.stream_list.emplace_back(
184 stream.HasMember("streamID") ? stream["streamID"].GetInt() : 0);
185 }
186 }
187
188 std::shared_ptr<core::network::ip_protocol_channel_descriptor> descriptor;
189 switch (receiver_stream_req->protocol())
190 {
191#ifdef CORELINK_USE_WEBSOCKET
192 case core::network::constants::protocols::websocket():
193 {
194 descriptor = std::make_shared<core::network::websocket_protocol_per_channel_descriptor>(
195 receiver_stream_req->client_certificate_path);
196 break;
197 }
198#endif
199#ifdef CORELINK_USE_TCP
200 case core::network::constants::protocols::tcp():
201 {
202 descriptor = std::make_shared<core::network::tcp_protocol_per_channel_descriptor>(true);
203 break;
204 }
205#endif
206#ifdef CORELINK_USE_UDP
207 case core::network::constants::protocols::udp():
208 {
209 descriptor = std::make_shared<core::network::udp_protocol_per_channel_descriptor>(true);
210 break;
211 }
212#endif
213 default:
214 break;
215 }
216 descriptor->on_send = nullptr;
217 data_channel_descriptor.user_receive_handler = receiver_stream_req->on_receive;
218 auto self = this;
219 descriptor->on_receive = [self]
220 (core::network::channel_id_type receiving_channel_id,
221 out<std::vector<uint8_t>> stream_data)
222 {
223 // if we received a "packet" less than 8 bytes in size, drop it.
224 if (stream_data.size() < 8)
225 return;
226
227 auto channel = self->m_corelink_channels.find(receiving_channel_id);
228 if (channel != self->m_corelink_channels.end())
229 {
230 auto data_channel_dsc = std::static_pointer_cast<corelink_client_data_channel_descriptor>(
231 channel->second);
232 bool can_have_multipart_frame = false;
233#ifdef CORELINK_USE_TCP
234 can_have_multipart_frame =
235 data_channel_dsc->protocol == core::network::constants::protocols::tcp;
236#endif
238 uint16_t header_size = 0, data_size = 0;
239 // if we have anything pending previously, coalesce that with
240 // the new frame and see if things work out.
241 // if not, we throw it out it
242 if (can_have_multipart_frame && !data_channel_dsc->incomplete_packet_buffer.empty())
243 {
244 stream_data.insert(
245 stream_data.begin(),
246 data_channel_dsc->incomplete_packet_buffer.begin(),
247 data_channel_dsc->incomplete_packet_buffer.end()
248 );
249 data_channel_dsc->incomplete_packet_buffer.erase(
250 data_channel_dsc->incomplete_packet_buffer.begin(),
251 data_channel_dsc->incomplete_packet_buffer.end()
252 );
253 }
254
255 for (size_t pkt_pos = 0;;)
256 {
258 decltype(header_size),
259 utils::system::endianness::little>
260 (std::vector<uint8_t>(
261 stream_data.begin() + pkt_pos,
262 stream_data.begin() + pkt_pos + 2
263 )
264 );
266 decltype(data_size),
267 utils::system::endianness::little>
268 (std::vector<uint8_t>(
269 stream_data.begin() + pkt_pos + 2,
270 stream_data.begin() + pkt_pos + 4
271 )
272 );
273
275 decltype(stream_id),
276 utils::system::endianness::little>
277 (std::vector<uint8_t>(
278 stream_data.begin() + pkt_pos + 4,
279 stream_data.begin() + pkt_pos + 8
280 )
281 );
282
283 if ((pkt_pos + 8 + header_size + data_size) <= stream_data.size())
284 {
285 std::string header_str;
286 if ((header_size > 0))
287 {
288 header_str = std::string(stream_data.begin() + pkt_pos + 8,
289 stream_data.begin() + pkt_pos + 8 + header_size);
290 }
291
292 std::vector<uint8_t> data_buff;
293 if (data_size > 0)
294 {
295 data_buff.insert(data_buff.end(),
296 (stream_data.begin() + pkt_pos + 8 + header_size),
297 stream_data.begin() + pkt_pos + 8 + header_size + data_size);
298 }
299
300
301 // call the user defined receive function
302 if (data_channel_dsc->user_receive_handler)
303 {
304 // call the user callback
305 data_channel_dsc->user_receive_handler(
306 receiving_channel_id,
307 stream_id,
308 utils::json(header_str),
309 data_buff
310 );
311 }
312 if (!can_have_multipart_frame) break;
313
314 // set the packet pos to next packet
315 pkt_pos += 8 + header_size + data_size;
316 }
317 else
318 {
319 if (can_have_multipart_frame)
320 {
321 // looks like we got an incomplete chunk in this frame.
322 // we take the incomplete buffer items and append it to a temp buffer
323 data_channel_dsc->incomplete_packet_buffer.insert(
324 data_channel_dsc->incomplete_packet_buffer.end(),
325 stream_data.begin() + pkt_pos,
326 stream_data.end());
327 }
328 break;
329 }
330 }
331 }
332 };
333
334 descriptor->port_number = response.get_int("port");
335 descriptor->endpoint = remote_server_ep.empty() ? response.get_str("IP") : remote_server_ep;
336 descriptor->on_error = receiver_stream_req->on_error;
337 descriptor->on_init = [receiver_stream_req, self]
339 {
340 // ping the server
341 self->send_data(ch_id, std::vector<uint8_t>(), utils::json());
342 if (receiver_stream_req->on_init)
343 receiver_stream_req->on_init(ch_id);
344 };
345 descriptor->on_uninit = receiver_stream_req->on_uninit;
346 core::network::channel_id_type data_channel_channel_id;
347 if (!std::dynamic_pointer_cast<core::network::corelink_data_xchg_ip_proto_base>(
348 proto_manager)->add_and_init_channel(std::move(descriptor), data_channel_channel_id))
349 {
350 rq_rp_params.completion_handler(data_channel_channel_id, "Failed to create data channel!", nullptr);
351 return;
352 }
353 m_corelink_channels.insert(
354 {
355 data_channel_channel_id,
356 std::make_shared<corelink_client_data_channel_descriptor>(
357 std::move(data_channel_descriptor))
358 });
359 if (rq_rp_params.completion_handler)
360 {
361 rq_rp_params.completion_handler(data_channel_channel_id, "", nullptr);
362 }
363 }
364
365 void destroy()
366 {
367 for (auto &proto_mgr: m_protocol_managers)
368 {
369 std::dynamic_pointer_cast<core::network::corelink_data_xchg_ip_proto_base>(
370 proto_mgr.second)->teardown_all();
371 }
372#if defined(CORELINK_USE_TCP) || defined (CORELINK_USE_UDP)
373 m_context_manager->stop_context_manager();
374#endif
375 }
376
377 public:
379
381
383
385
387
395 {
396 return m_corelink_channels.at(channel_id);
397 }
398
407 0
408 #ifdef CORELINK_USE_TCP
409 | (1 << core::network::constants::protocols::tcp())
410 #endif
411 #ifdef CORELINK_USE_UDP
412 | (1 << core::network::constants::protocols::udp())
413 #endif
414 #ifdef CORELINK_USE_WEBSOCKET
415 | (1 << core::network::constants::protocols::websocket())
416#endif
417 )
418 {
419 if (protocols == 0)
420 return false;
421
422 // check if it is a raw socket protocol from ASIO C++
423 // if yes, then switch on the ASIO context manager "service"
424
425 if (false
426 #if defined (CORELINK_USE_TCP)
427 || protocols.test(core::network::constants::protocols::tcp())
428 #endif
429 #if defined (CORELINK_USE_UDP)
430 || protocols.test(core::network::constants::protocols::udp())
431#endif
432 )
433 {
434 // create context manager
435 m_context_manager = std::make_shared<core::network::corelink_data_xchg_raw_socket_protocol_context_manager>();
436 // start it
437 m_context_manager->start_context_manager();
438 }
439
440#if defined(CORELINK_USE_TCP)
441 // check if we want to use TCP protocol
442 if (protocols.test(core::network::constants::protocols::tcp()))
443 {
444 m_protocol_managers.insert(
445 {
446 core::network::constants::protocols::tcp(),
447 std::make_shared<core::network::corelink_data_xchg_tcp_protocol_manager>(
448 m_context_manager)
449 });
450 }
451#endif
452#if defined(CORELINK_USE_UDP)
453 // check if we want to use the UDP protocol
454 if (protocols.test(core::network::constants::protocols::udp()))
455 {
456 m_protocol_managers.insert(
457 {
458 core::network::constants::protocols::udp(),
459 std::make_shared<core::network::corelink_data_xchg_udp_protocol_manager>(
460 m_context_manager)
461 });
462 }
463#endif
464#ifdef CORELINK_USE_WEBSOCKET
465 // check if we want to use the Websocket protocol.
466 if (protocols.test(core::network::constants::protocols::websocket()))
467 {
468
469 auto ws_proto_manager = std::make_shared<core::network::corelink_data_xchg_websocket_protocol_manager>();
470 ws_proto_manager->start_context_manager();
471 m_protocol_managers.insert(
472 {
473 core::network::constants::protocols::websocket(),
474 std::move(ws_proto_manager)
475 });
476 }
477#endif
478
479 return true;
480 }
481
482
485 in<core::network::on_error_type_1> on_error = nullptr,
486 in<core::network::on_init_type_1> on_connection_init = nullptr,
487 in<core::network::on_uninit_type_1> on_connection_uninit = nullptr
488 )
489 {
490 return add_control_channel(
491 conn_info.protocol,
492 conn_info.endpoint,
493 conn_info.port_number,
494 conn_info.client_certificate_path,
495 on_error,
496 on_connection_init,
497 on_connection_uninit
498 );
499 }
500
503 in<std::string> endpoint,
504 in<uint16_t> port,
505 in<std::string> cert_path = "",
506 in<core::network::on_error_type_1> on_error = nullptr,
507 in<core::network::on_init_type_1> on_connection_init = nullptr,
508 in<core::network::on_uninit_type_1> on_connection_uninit = nullptr
509 )
510 {
511 // step 1: find the protocol using which we need to set up the control channel.
512 auto proto_manager = std::dynamic_pointer_cast<core::network::corelink_data_xchg_ip_proto_base>(
513 m_protocol_managers[channel_protocol()]);
514
515 if (!proto_manager && (on_error != nullptr))
516 {
517 on_error(0,
518 "Error: Could not locate a proto manager for supplied control channel protocol\n Skipping creation of control channel");
519 return 0;
520 }
521
522 // step 2: construct the appropriate descriptor
523 std::shared_ptr<corelink::core::network::ip_protocol_channel_descriptor> descriptor;
524 core::network::channel_id_type channel_id = 0;
525 switch (channel_protocol())
526 {
527#ifdef CORELINK_USE_WEBSOCKET
528 case core::network::constants::protocols::websocket():
529 {
530 descriptor = std::make_shared<corelink::core::network::websocket_protocol_per_channel_descriptor>(
531 cert_path);
532 break;
533 }
534#endif
535#ifdef CORELINK_USE_TCP
536 case core::network::constants::protocols::tcp():
537 {
538 descriptor = std::make_shared<corelink::core::network::tcp_protocol_per_channel_descriptor>(
539 true,
540 true);
541 break;
542 }
543#endif
544 default:
545 descriptor = nullptr;
546 break;
547 }
548
549 if (descriptor == nullptr)
550 {
551 throw std::runtime_error(
552 "The chosen protocol is not supported to construct a control channel");
553 }
554 // assign all the common parameters
555 descriptor->port_number = port;
556 descriptor->endpoint = endpoint;
557 descriptor->on_init = on_connection_init;
558 descriptor->on_error = on_error;
559 descriptor->on_uninit = on_connection_uninit;
560 // this is slightly involved, but I have tried to make it is as straightforward for a first write.
561 // I am sure I can make this less verbose in a future version.
562 auto self = this;
563 descriptor->on_receive = [self](
566 )
567 {
568 if (data.empty())
569 return;
570
571 auto descriptor = std::static_pointer_cast<corelink_client_control_channel_descriptor>(
572 self->m_corelink_channels.at(channel_id));
573
574 auto channel_descriptor =
575 std::dynamic_pointer_cast<core::network::corelink_data_xchg_ip_proto_base>(
576 self->m_protocol_managers[descriptor->protocol()])->get_channel(channel_id);
577
578 // the data that we got from the server is not guaranteed to be the whole message.
579 // or in fact it could be a mixture of multiple responses
580 // we collect it in a stream, and then, split it based on some obvious JSON parsing rules.
581
582 corelink::commons::exclusive_lock lock(descriptor->response_stream_sync);
583 std::string response_string(data.begin(), data.end());
584 descriptor->response_stream += response_string;
585
586 int16_t count = 0, current_idx = 0;
587 try
588 {
589 for (;; ++current_idx)
590 {
591 if (current_idx >=
592 static_cast<decltype(current_idx)>(descriptor->response_stream.size()))
593 break;
594 if (descriptor->response_stream[current_idx] == '{') ++count;
595 else if (descriptor->response_stream[current_idx] == '}') --count;
596
597 if (count == 0 && (descriptor->response_stream.find('{') != std::string::npos))
598 {
599 std::string complete_response_str =
600 current_idx ==
601 static_cast<decltype(current_idx)>(descriptor->response_stream.size() - 1) ?
602 descriptor->response_stream :
603 descriptor->response_stream.substr(0, current_idx + 1);
604
605 descriptor->response_stream.erase(0, current_idx + 1);
606 current_idx = -1;
607
608 // on receiving a response, the first step is to try and read the JSON response.
609 // This might be partial, but we need to make a try.
610 utils::json response(complete_response_str);
611 // locate a response handler
612 std::string handler_id = response.get_str("ID");
613 // check if this is a response to a request, or a server init function
614 if (handler_id.empty()) handler_id = response.get_str("function");
615 if (handler_id.empty()) return; // if handler is empty at this point that means there is something wrong. return
616
617 auto handler = self->m_completion_handlers.find(handler_id);
618 if (handler == self->m_completion_handlers.end())
619 {
620 channel_descriptor->on_error(channel_id,
621 "Could not locate a handler for the response sent by the server.");
622 return;
623 }
624
625 switch (handler->second.function)
626 {
627 case corelink_functions::create_sender:
628 {
629 self->handle_create_sender(
630 channel_id,
631 response,
632 handler->second,
633 channel_descriptor->endpoint
634 );
635 break;
636 }
637 case corelink_functions::create_receiver:
638 {
639 self->handle_create_receiver(
640 channel_id,
641 response,
642 handler->second,
643 channel_descriptor->endpoint
644 );
645 break;
646 }
647 case corelink_functions::authenticate:
648 {
649 self->handle_authenticate(
650 channel_id,
651 response,
652 handler->second
653 );
654 break;
655 }
656 default:
657 {
658 if (handler->second.request_response_handler_descriptor.response_handler)
659 {
660 handler->second.request_response_handler_descriptor.response_handler(
661 channel_id,
662 response,
663 handler->second.completion_handler);
664 }
665 break;
666 }
667 }
668
669 if (response().HasMember("ID"))
670 {
671 self->m_completion_handlers.erase(handler_id);
672 descriptor->channel_in_use = false;
673 }
674 }
675 }
676
677 if (!self->m_outbound_control_msg_queue.empty())
678 {
679 auto &req = self->m_outbound_control_msg_queue.peek();
680 self->request(req.channel_id, req.function, req.request, req.completion_handler);
681 self->m_outbound_control_msg_queue.pop();
682 }
683 }
684 catch (lvref<std::exception> ex)
685 {
686 channel_descriptor->on_error(
687 1,
688 std::string("Error while processing control channel message received: ") + ex.what()
689 );
690 }
691 };
692
693 proto_manager->add_and_init_channel(descriptor, channel_id);
694 if (m_corelink_channels.find(channel_id) == m_corelink_channels.end())
695 {
696 m_corelink_channels.insert(
697 {channel_id,
698 std::make_shared<corelink_client_control_channel_descriptor>(channel_protocol)});
699 }
700
701 return channel_id;
702 }
703
713 core::network::channel_id_type control_channel_channel_id,
715 in<std::shared_ptr<request_response::requests::corelink_server_request_base>> request_params,
717 )
718 {
719 // locate the request response handler.
720 auto function_descriptor = corelink_functions_request_response_handlers.find(func);
721 if (function_descriptor == corelink_functions_request_response_handlers.end())
722 throw std::runtime_error("The requested function is invalid");
723
724 // locate the control channel we wish to send a control message on
725 auto control_channel_descriptor = std::static_pointer_cast<corelink_client_control_channel_descriptor>(
726 m_corelink_channels.at(control_channel_channel_id));
727 // locate the protocol manager
728 auto proto_manager = std::dynamic_pointer_cast<core::network::corelink_data_xchg_ip_proto_base>(
729 m_protocol_managers.at(control_channel_descriptor->protocol()));
730 // locate the channel based on the
731 auto channel_descriptor =
732 std::dynamic_pointer_cast<corelink::core::network::ip_protocol_channel_descriptor>(
733 proto_manager->get_channel(control_channel_channel_id));
734
735 // setup data for managing the callback.
736 request_completion_pack p;
737 p.function = func;
738 p.request_response_handler_descriptor = function_descriptor->second;
739 p.request = request_params;
740 p.completion_handler = completion_handler;
741 p.channel_id = control_channel_channel_id;
742
743 switch (func)
744 {
745 case corelink_functions::server_callback_on_dropped:
746 {
747 m_completion_handlers.insert({"dropped", std::move(p)});
748 break;
749 }
750 case corelink_functions::server_callback_on_update:
751 {
752 m_completion_handlers.insert({"update", std::move(p)});
753 break;
754 }
755 case corelink_functions::server_callback_on_subscribed:
756 {
757 m_completion_handlers.insert({"subscriber", std::move(p)});
758 break;
759 }
760 case corelink_functions::server_callback_on_stale:
761 {
762 m_completion_handlers.insert({"stale", std::move(p)});
763 break;
764 }
765 default:
766 {
767 // Okay I detest the next 4 lines, however, currently this is the only solution I have, without opening
768 // multiple sockets
769 // this is specifically required for TCP based control channel, but I don't want
770 // because of 2 reasons. First, if you rapidly send out multiple requests, TCP write buffer doesn't flush
771 // and the kernel sometimes bunches messages together (c.f. Nagel's algorithm).
772 // As a result the server sometimes ignores our messages
773 // so what we do is build a request-response style mechanism in such a way that we only dispatch our next
774 // request when we get a response. It seems to work well without any significant drawbacks.
775 if (control_channel_descriptor->channel_in_use)
776 {
777 m_outbound_control_msg_queue.push(p);
778 return;
779 }
780
781 control_channel_descriptor->channel_in_use = true;
782 if (function_descriptor->second.request_handler)
783 {
784 // generate request data
785 auto request = function_descriptor->second.request_handler(control_channel_descriptor,
786 request_params);
787
788 // check if the request data has the ID. If not, add a randomly generated ID
789 std::string request_id;
790 if (!request().HasMember("ID"))
791 {
792 do
793 {
794 request_id = std::to_string(utils::random_numbers::get_random_int());
795 } while (m_completion_handlers.find(request_id) != m_completion_handlers.end());
796
797 request.append("ID", request_id);
798 }
799 else
800 {
801 request_id = request()["ID"].GetString();
802 }
803 // generate the request string
804 in<std::string> request_string = request.to_string();
805 m_completion_handlers.insert({request_id, std::move(p)});
806 // post the request to the control channel protocol manager
807 proto_manager->send_data(
808 std::vector<uint8_t>(request_string.begin(), request_string.end()),
809 control_channel_channel_id
810 );
811 }
812 break;
813 }
814 }
815 }
816
826 core::network::channel_id_type data_channel_channel_id,
827 rvref<std::vector<uint8_t>> packet,
828 rvref<utils::json> headers
829 )
830 {
831 // locate the control channel we wish to send a control message on
832 auto data_channel_descriptor = std::static_pointer_cast<corelink_client_data_channel_descriptor>(
833 m_corelink_channels.at(data_channel_channel_id));
834
835 if (!data_channel_descriptor)
836 {
837
838 }
839 // locate the protocol manager
840 auto proto_manager = std::dynamic_pointer_cast<core::network::corelink_data_xchg_ip_proto_base>(
841 m_protocol_managers.at(data_channel_descriptor->protocol()));
842 // locate the channel based on the
843 auto channel_descriptor =
844 std::dynamic_pointer_cast<corelink::core::network::ip_protocol_channel_descriptor>(
845 proto_manager->get_channel(data_channel_channel_id));
846
847 const std::string header_string = headers.to_string();
848 const size_t data_size = packet.size();
849
850 // if we try to send more data than the MTU, then flag the packet and drop it.
851 if ((data_size + header_string.size() + 8) > data_channel_descriptor->max_tx_unit)
852 {
853 if (channel_descriptor->on_error)
854 {
855 std::stringstream err;
856 err << "Unable to send data packet as MTU for this channel is "
857 << data_channel_descriptor->max_tx_unit
858 << ", whereas attempted to pass " << (data_size + header_string.size())
859 << " bytes of data excluding corelink headers.\n";
860 channel_descriptor->on_error(1, err.str());
861 }
862 return;
863 }
864
865 auto header_size_bytes = corelink::utils::system::to_bytes<
866 decltype(header_string.size()),
867 2,
869 (header_string.size());
870 auto data_size_bytes = corelink::utils::system::to_bytes<
871 decltype(data_size),
872 2,
874 (data_size);
875
876 auto stream_id_bytes = corelink::utils::system::to_bytes<
877 decltype(data_channel_descriptor->stream_id),
878 4,
880 (data_channel_descriptor->stream_id);
881
882
883 // We are stacking elements. The order is
884 // |2 bytes |2 bytes |4 bytes |header_len bytes |data_len bytes|
885 // |header_len|data_len|stream_id|header |data |
886 // All data has to be LE aligned.
887 if (!header_string.empty())
888 {
889 packet.insert(
890 packet.begin(),
891 std::make_move_iterator(header_string.begin()),
892 std::make_move_iterator(header_string.end())
893 );
894 }
895 // insert the stream ID
896 packet.insert(packet.begin(), std::make_move_iterator(stream_id_bytes.begin()),
897 std::make_move_iterator(stream_id_bytes.end()));
898 // insert data size
899 packet.insert(packet.begin(), std::make_move_iterator(data_size_bytes.begin()),
900 std::make_move_iterator(data_size_bytes.end()));
901 // insert header size
902 packet.insert(packet.begin(), std::make_move_iterator(header_size_bytes.begin()),
903 std::make_move_iterator(header_size_bytes.end()));
904 proto_manager->send_data(std::move(packet), data_channel_channel_id);
905 }
906
908 {
909 destroy();
910 }
911 };
912 }
913}