C# Client 0.0.0.7
C# Library to interface with Corelink
Loading...
Searching...
No Matches
CorelinkWebSocket.cs
Go to the documentation of this file.
1using System;
2using System.Collections.Concurrent;
3using System.IO;
4using System.Net.Security;
5using System.Net.WebSockets;
6using System.Security.Cryptography.X509Certificates;
7using System.Text;
8using System.Threading;
9using System.Threading.Tasks;
10using SimpleJSON;
11using UnityEngine;
12
13namespace CoreLink
14{
15 // Code from https://www.patrykgalach.com/2019/11/11/implementing-websocket-in-unity/
17 {
22
// Underlying socket, plus the encoder that turns outgoing text into UTF-8 bytes.
private ClientWebSocket ws;
private UTF8Encoding encoder;
// Default value of receive()'s maxSize parameter, in bytes (1 * 1024 * 1024).
private const ulong MAXREADSIZE = 1 * 1024 * 1024;
// Endpoint the socket connects to; built in the constructor.
private Uri serverUri;

// Incoming text messages waiting for getResponse(), and outgoing frames waiting for the send loop.
public ConcurrentQueue<string> receiveQueue { get; }
public BlockingCollection<ArraySegment<byte>> sendQueue { get; }

// Worker threads started by the constructor for the receive and send loops.
private Thread receiveThread { get; set; }
private Thread sendThread { get; set; }
37
/// <summary>
/// Creates the socket wrapper for the given server and starts the send and
/// receive worker threads. The socket is not connected here; call connect()
/// to open it.
/// </summary>
/// <param name="serverURL">Host part of the server address, without scheme or port.</param>
public CorelinkWebSocket(string serverURL)
{
    encoder = new UTF8Encoding();
    // No client certificates or validation callbacks are configured on the socket.
    ws = new ClientWebSocket();

    // The endpoint is always secure WebSocket on port 20012.
    serverUri = new Uri("wss://" + serverURL + ":20012");

    // Create both queues before any worker starts so neither loop can see a null queue.
    receiveQueue = new ConcurrentQueue<string>();
    sendQueue = new BlockingCollection<ArraySegment<byte>>();

    // Background threads: the send loop blocks indefinitely while waiting for a
    // message, and a foreground thread doing so would keep the process alive
    // after the application tries to exit.
    receiveThread = new Thread(runReceive);
    receiveThread.IsBackground = true;
    receiveThread.Start();

    sendThread = new Thread(runSend);
    sendThread.IsBackground = true;
    sendThread.Start();
}
61
/// <summary>
/// Starts the WebSocket handshake with the server and blocks the calling
/// thread, polling every 100 ms, until the attempt has finished. A failure
/// reported by the connect task is logged instead of being rethrown; use
/// isConnectionOpen() afterwards to find out whether the socket is usable.
/// </summary>
/// <returns>A task that has already finished by the time it is returned.</returns>
public async Task connect()
{
    Control.Print("Connecting to: " + serverUri);

    // Keep the task instead of discarding it so the failure reason can be reported.
    Task connectTask = ws.ConnectAsync(serverUri, CancellationToken.None);

    // Deliberately a blocking poll: callers rely on the connection attempt
    // being over when this method returns.
    while (!connectTask.IsCompleted)
    {
        Control.Print("Waiting to connect...");
        Task.Delay(100).Wait();
    }

    try
    {
        // The task is already complete, so this await continues synchronously;
        // it only serves to surface (and thereby observe) a failure.
        await connectTask;
    }
    catch (Exception e)
    {
        Control.Print("Connection failed: " + e.Message);
    }

    Control.Print("Connect status: " + ws.State);
}
83 //public static bool ValidateServerCertificate(
84 // object sender,
85 // X509Certificate certificate,
86 // X509Chain chain,
87 // SslPolicyErrors sslPolicyErrors)
88 //{
89 // if (sslPolicyErrors == SslPolicyErrors.None)
90 // return true;
91
92 // Console.WriteLine("Certificate error: {0}", sslPolicyErrors);
93
94 // // Do not allow this client to communicate with unauthenticated servers.
95 // return false;
96 //}
/// <summary>
/// Starts a normal-closure close handshake and blocks the calling thread,
/// polling every 50 ms, until the close call has finished or the socket has
/// reached CloseReceived, Closed or Aborted. Does nothing except log the
/// state when the socket is not in a state from which it can be closed.
/// </summary>
public override void stopConnection()
{
    WebSocketState initial = ws.State;
    bool closable = initial == WebSocketState.Open
        || initial == WebSocketState.CloseReceived
        || initial == WebSocketState.CloseSent;
    if (!closable)
    {
        // CloseAsync rejects a socket that was never connected or is already
        // closed; there is nothing to shut down in that case.
        Control.Print("Connect status: " + initial);
        return;
    }

    Task closeTask = ws.CloseAsync(WebSocketCloseStatus.NormalClosure, null, CancellationToken.None);

    // Waiting for CloseReceived alone can spin forever: once this side has
    // sent its close frame the socket normally ends up in Closed instead.
    while (!closeTask.IsCompleted
        && ws.State != WebSocketState.CloseReceived
        && ws.State != WebSocketState.Closed
        && ws.State != WebSocketState.Aborted)
    {
        Control.Print("Waiting to disconnect...");
        Task.Delay(50).Wait();
    }

    if (closeTask.IsFaulted)
    {
        Control.Print("Disconnect failed: " + closeTask.Exception.GetBaseException().Message);
    }

    Control.Print("Connect status: " + ws.State);
}
107 #region [Status]
108
/// <summary>Reports whether the socket is currently in the Connecting state.</summary>
/// <returns>True while the socket state equals WebSocketState.Connecting.</returns>
public bool isConnecting() => WebSocketState.Connecting == ws.State;
117
/// <summary>Reports whether the socket is currently in the Open state.</summary>
/// <returns>True while the socket state equals WebSocketState.Open.</returns>
public bool isConnectionOpen() => WebSocketState.Open == ws.State;
126
127 #endregion
128
129 #region [Send]
130
/// <summary>
/// Encodes a text message as UTF-8 and places it on the send queue; the send
/// loop transmits it later.
/// </summary>
/// <param name="message">Text to send to the server.</param>
public override void sendMessage(string message)
{
    byte[] payload = encoder.GetBytes(message);
    sendQueue.Add(new ArraySegment<byte>(payload));
}
/// <summary>
/// Takes the next message off the receive queue, blocking the calling thread
/// and checking every 100 ms until one is available or
/// Control.timeoutIterations checks have gone by without one.
/// </summary>
/// <returns>The oldest message in the receive queue.</returns>
/// <exception cref="TimeoutException">
/// No message arrived within the allowed number of polls. The message text is
/// "Timeout error".
/// </exception>
public string getResponse()
{
    int iterations = 0;
    while (true)
    {
        // Dequeue directly instead of peek-then-dequeue, and check before
        // testing the limit so a message that arrives during the final sleep
        // is returned rather than reported as a timeout.
        string msg;
        if (receiveQueue.TryDequeue(out msg))
        {
            return msg;
        }

        if (iterations >= Control.timeoutIterations)
        {
            break;
        }

        Thread.Sleep(100);
        iterations++;
    }

    // TimeoutException derives from Exception, so existing catch blocks still match.
    throw new TimeoutException("Timeout error");
}
/// <summary>
/// Send loop run by the send thread. Takes queued frames one at a time and
/// writes each to the socket as a complete text message. The loop ends once
/// the send queue has been marked complete and emptied.
/// </summary>
private async void runSend()
{
    Control.Print("WebSocket Message Sender looping.");

    // GetConsumingEnumerable blocks while the queue is empty and finishes when
    // the queue is completed, so no outer retry loop (which would busy-spin
    // after completion) is needed.
    foreach (ArraySegment<byte> msg in sendQueue.GetConsumingEnumerable())
    {
        try
        {
            await ws.SendAsync(msg, WebSocketMessageType.Text, true /* is last part of message */, CancellationToken.None);
        }
        catch (Exception e)
        {
            // This method is async void, so an exception that escapes here
            // cannot be caught by any caller. Log it and move on to the next
            // message; the failed message is dropped.
            Control.Print("WebSocket send failed: " + e.Message);
        }
    }
}
180
181 #endregion
182
183 #region [Receive]
184
/// <summary>
/// Reads one complete WebSocket message, assembling it from 4 KiB chunks.
/// </summary>
/// <param name="maxSize">
/// Size in bytes above which a warning is written to standard error. The
/// message is still read in full; nothing is truncated.
/// </param>
/// <returns>
/// The message decoded as UTF-8 when it is a text message; an empty string
/// when the socket is not open or the message is not text.
/// </returns>
private async Task<string> receive(UInt64 maxSize = MAXREADSIZE)
{
    if (!isConnectionOpen())
    {
        return "";
    }

    // A fixed read buffer, and a memory stream that collects however many
    // chunks the message is split into.
    byte[] buf = new byte[4 * 1024];
    ArraySegment<byte> arrayBuf = new ArraySegment<byte>(buf);

    using (var ms = new MemoryStream())
    {
        WebSocketReceiveResult chunkResult;
        bool warned = false;
        do
        {
            chunkResult = await ws.ReceiveAsync(arrayBuf, CancellationToken.None);
            ms.Write(arrayBuf.Array, arrayBuf.Offset, chunkResult.Count);

            // Compare the accumulated size: a single chunk can never exceed
            // the 4 KiB buffer, so checking chunkResult.Count would never fire.
            if (!warned && (UInt64)ms.Length > maxSize)
            {
                Console.Error.WriteLine("Warning: Message is bigger than expected!");
                warned = true;
            }
        } while (!chunkResult.EndOfMessage);

        // Only UTF-8 JSON text messages are of interest.
        if (chunkResult.MessageType != WebSocketMessageType.Text)
        {
            return "";
        }

        ms.Seek(0, SeekOrigin.Begin);
        return streamToString(ms, Encoding.UTF8);
    }
}
220
/// <summary>
/// Receive loop run by the receive thread. Reads messages forever. A message
/// whose JSON has a "function" field is passed to the matching callback
/// (subscriber, update, stale, dropped); every other message is put on the
/// receive queue. When nothing was received the loop waits 50 ms before
/// trying again.
/// </summary>
private async void runReceive()
{
    Control.Print("WebSocket Message Receiver looping.");
    while (true)
    {
        bool idle;
        try
        {
            string result = await receive();
            idle = string.IsNullOrEmpty(result);
            if (!idle)
            {
                JSONNode response = JSON.Parse(result);
                if (response["function"])
                {
                    switch (response["function"].Value)
                    {
                        case "subscriber":
                            onSubscriber(result);
                            break;
                        case "update":
                            onUpdate(result);
                            break;
                        case "stale":
                            onStale(result);
                            break;
                        case "dropped":
                            onDropped(result);
                            break;
                        default:
                            Control.Print("Unknown callback. Maybe this library is outdated?");
                            break;
                    }
                }
                else
                {
                    receiveQueue.Enqueue(result);
                }
            }
        }
        catch (Exception e)
        {
            // This method is async void, so an exception that escapes here
            // cannot be caught by any caller and would end the loop for good.
            // Log it and keep receiving.
            Control.Print("WebSocket receive failed: " + e.Message);
            idle = true;
        }

        if (idle)
        {
            // Awaited rather than blocked on, so no thread is held while idle.
            await Task.Delay(50);
        }
    }
}
271
272 #endregion
273
274 #region [Utility]
/// <summary>
/// Reads a memory stream from its current position to the end and decodes it
/// as text. Only UTF-8 is supported.
/// </summary>
/// <param name="ms">Stream to read; it is disposed when a UTF-8 encoding is given.</param>
/// <param name="encoding">Encoding to decode with.</param>
/// <returns>
/// The decoded text, or an empty string when <paramref name="encoding"/> is
/// null or not a UTF-8 encoding.
/// </returns>
public static string streamToString(MemoryStream ms, Encoding encoding)
{
    // Test the encoding's type rather than its identity: a reference
    // comparison with Encoding.UTF8 rejects every other UTF-8 encoder
    // instance, such as new UTF8Encoding().
    if (!(encoding is UTF8Encoding))
    {
        return "";
    }

    using (var reader = new StreamReader(ms, encoding))
    {
        return reader.ReadToEnd();
    }
}
293 #endregion
294 }
295}