2using System.Collections.Concurrent;
4using System.Net.Security;
5using System.Net.WebSockets;
6using System.Security.Cryptography.X509Certificates;
9using System.Threading.Tasks;
24 private ClientWebSocket ws;
25 private UTF8Encoding encoder;
26 private const UInt64 MAXREADSIZE = 1 * 1024 * 1024;
28 private Uri serverUri;
32 public BlockingCollection<ArraySegment<byte>>
sendQueue {
get; }
35 private Thread receiveThread {
get;
set; }
36 private Thread sendThread {
get;
set; }
44 encoder =
new UTF8Encoding();
45 ws =
new ClientWebSocket();
51 serverUri =
new Uri(
"wss://" + serverURL +
":20012");
54 receiveThread =
new Thread(runReceive);
55 receiveThread.Start();
57 sendQueue =
new BlockingCollection<ArraySegment<byte>>();
58 sendThread =
new Thread(runSend);
74 ws.ConnectAsync(serverUri, CancellationToken.None);
78 Task.Delay(100).Wait();
99 ws.CloseAsync(WebSocketCloseStatus.NormalClosure,
null, CancellationToken.None);
100 while (!(ws.State == WebSocketState.CloseReceived))
103 Task.Delay(50).Wait();
115 return ws.State == WebSocketState.Connecting;
124 return ws.State == WebSocketState.Open;
137 byte[] buffer = encoder.GetBytes(message);
139 var sendBuf =
new ArraySegment<byte>(buffer);
157 throw new Exception(
"Timeout error");
159 cqueue.TryDequeue(out msg);
166 private async
void runSend()
169 ArraySegment<byte> msg;
176 await ws.SendAsync(msg, WebSocketMessageType.Text,
true , CancellationToken.None);
190 private async Task<string> receive(UInt64 maxSize = MAXREADSIZE)
193 byte[] buf =
new byte[4 * 1024];
194 var ms =
new MemoryStream();
195 ArraySegment<byte> arrayBuf =
new ArraySegment<byte>(buf);
196 WebSocketReceiveResult chunkResult =
null;
202 chunkResult = await ws.ReceiveAsync(arrayBuf, CancellationToken.None);
203 ms.Write(arrayBuf.Array, arrayBuf.Offset, chunkResult.Count);
205 if ((UInt64)(chunkResult.Count) > MAXREADSIZE)
207 Console.Error.WriteLine(
"Warning: Message is bigger than expected!");
209 }
while (!chunkResult.EndOfMessage);
210 ms.Seek(0, SeekOrigin.Begin);
213 if (chunkResult.MessageType == WebSocketMessageType.Text)
224 private async
void runReceive()
226 Control.Print(
"WebSocket Message Receiver looping.");
231 result = await receive();
232 if (result !=
null && result.Length > 0)
234 JSONNode response = JSON.Parse(result);
235 if (response[
"function"])
237 string function = response[
"function"].Value;
238 if (
function.Equals(
"subscriber"))
242 else if (
function.Equals(
"update"))
246 else if (
function.Equals(
"stale"))
250 else if (
function.Equals(
"dropped"))
256 Control.Print(
"Unknown callback. Maybe this library is outdated?");
267 Task.Delay(50).Wait();
283 string readString =
"";
284 if (encoding == Encoding.UTF8)
286 using (var reader =
new StreamReader(ms, encoding))
288 readString = reader.ReadToEnd();
Singleton class to communicate with Corelink.
static int timeoutIterations
In some callbacks, the code is waiting on a response from the server. This specifies the number of 10...
static void Print(string message)
Abstract class for a control stream that contains the function prototypes for the server callback fun...
virtual void onUpdate(string msg)
virtual void onSubscriber(string msg)
virtual void onStale(string msg)
virtual void onDropped(string msg)
CorelinkWebSocket(string serverURL)
Initializes a new instance of the T:WsClient class.
bool isConnectionOpen()
Return if connection with server is open.
BlockingCollection< ArraySegment< byte > > sendQueue
ConcurrentQueue< String > receiveQueue
bool isConnecting()
Return if is connecting to the server.
override void sendMessage(string message)
Method used to send a message to the server.
static string streamToString(MemoryStream ms, Encoding encoding)
Converts memory stream into string.
async Task connect()
Method which connects client to the server.
override void stopConnection()