corelink.corelink

Python client for Corelink.

corelink.hsrn.nyu.edu

  1"""
  2Python client for Corelink.\n
  3corelink.hsrn.nyu.edu
  4"""
  5
  6import websockets
  7import ssl
  8import asyncio
  9import json
 10from .resources import reqs, processing, variables
 11from .resources.control import *
 12from .resources.variables import log
 13
 14def run(function):
 15    """Runs the user function in an event loop, and catches keyboard interrupt."""
 16    try:
 17        variables.loop = asyncio.get_event_loop()
 18        variables.loop.run_until_complete(function)
 19        print("Closing down Corelink...")
 20    except KeyboardInterrupt:
 21        print("Closing from keyboard interrupt.")
 22    finally:
 23        if variables.is_open:
 24            variables.loop.run_until_complete(_exit())
 25
 26async def keep_open():
 27    """Called by user to ensure that the program stays open while awaiting data."""
 28    asyncio.create_task(processing.maintain())
 29
 30async def close():
 31    """Called by user to end a receiver session (used after calling `keep_open()`)."""
 32    variables.shut_down = True
 33
 34async def connect(username: str, password: str, host: str, port: str):
 35    """Connects to server and authenticates.
 36    :param username: username registered with Corelink server
 37    :param password: password associated with username
 38    :param host: host address to connect to
 39    :param port: host port to connect to
 40    :return: token
 41    """
 42    variables.user = username
 43    variables.password = password
 44    variables.host = host
 45    variables.port = port
 46    # variables.protocol = protocol
 47    variables.connection = await websockets.connect(f"wss://{variables.host}:{variables.port}", ssl=ssl.SSLContext(ssl.PROTOCOL_TLS))
 48    variables.receiver_task = asyncio.create_task(processing.ws_control_receiver(variables.connection))
 49    await auth()
 50
 51async def set_data_callback(callback):
 52    """User should pass their callback function into this.
 53    The function is expected to take:
 54        param1 message: bytes,
 55        param2 streamID: int,
 56        param3 header: dict (sometimes empty)"""
 57    variables.user_cb = callback
 58
 59async def set_server_callback(callback, key: str):
 60    """Sets a callback function for server messages of the given key:
 61        options: 'update', 'subscriber', 'stale', 'dropped'
 62    callback should expect dict message with the server message (details in docs),
 63                        and str key listing what the message type is."""
 64    variables.server_callbacks[key] = callback
 65
 66async def active_streams() -> list:
 67    """Returns a list of current streamIDs."""
 68    return list(variables.streams) + list(variables.receiver)
 69
 70async def create_sender(workspace, protocol: str = "tcp", streamID="", data_type='', metadata='', sender="", ip="", port="") -> int:
 71    """Requests a sender from the server and opens the connection
 72    :return: streamID used to send
 73    """
 74    protocol = protocol.lower()
 75    if protocol not in variables.valid_proto:
 76        raise ValueError("protocol: protocol must be ws, udp, or tcp.")
 77    request = {
 78        "function": "sender",
 79        "workspace": workspace,
 80        "senderID": streamID,
 81        "proto": protocol,
 82        "IP": ip,
 83        "port": port,
 84        "alert": False,
 85        "type": data_type,
 86        "meta": metadata,
 87        "from": sender,
 88        "token": variables.token
 89    }
 90    sender = reqs.retrieve(await reqs.request_func(request), ret=True)
 91    variables.streams[sender['streamID']] = sender
 92    variables.streams[sender['streamID']]['protocol'] = request['proto']
 93    
 94    await processing.connect_sender(sender['streamID'])
 95    log(type(variables.streams[sender['streamID']]))
 96    return sender['streamID']
 97
 98async def create_receiver(workspace, protocol, data_type="", metadata="", alert: bool = False, echo: bool = False, receiver_id="", stream_ids=[], ip=None, port=0, subscribe=True) -> int:
 99    """Requests a receiver from the server and opens the connection.
100    :return: streamID used to receive
101    """
102    protocol = protocol.lower()
103    if protocol not in variables.valid_proto:
104        raise ValueError("protocol: protocol must be ws, udp, or tcp.")
105    if ip is None:
106        ip = variables.user_ip
107    request = {
108        "function": "receiver",
109        "workspace": workspace,
110        "receiverID": receiver_id,
111        "streamIDs": stream_ids,
112        "proto": protocol,
113        "type": data_type,
114        "alert": alert,
115        "echo": echo,
116        "IP": ip,
117        "port": port,
118        "meta": metadata,
119        "subscribe": subscribe,
120        "token": variables.token
121    }
122    receiver = reqs.retrieve(await reqs.request_func(request), ret=True)
123    log("receiver: " + str(receiver))
124    variables.receiver[receiver['streamID']] = receiver
125    await processing.connect_receiver(receiver['streamID']) # WHY ASYNC
126    return receiver['streamID']
127
async def send(streamID, data, user_header: dict = None):
    """Sends data to streamID's stream (the stream must be in ``variables.streams``,
    i.e. created with `create_sender()`).

    :param streamID: ID of the sender stream to use
    :param data: payload; str (encoded before sending) or bytes-like
    :param user_header: optional dict, serialised to JSON and sent ahead of
        the payload; an empty or missing dict sends no header
    :raises KeyError: if streamID is not a known sender stream
    :raises TypeError: if data is neither str nor bytes-like
    :raises OverflowError: if the header length, payload length or streamID
        does not fit in 16 bits

    Message layout: an 8-byte prefix holding three little-endian 16-bit values
    (JSON header length, payload length, streamID) followed by two zero bytes,
    then the JSON header, then the payload.
    """
    stream = variables.streams[streamID]  # for convenience

    # Encode first so the lengths written below are byte counts; character
    # counts would be wrong for any non-ASCII text.
    header_bytes = json.dumps(user_header).encode() if user_header else b""
    if isinstance(data, str):
        payload = data.encode()
    elif isinstance(data, (bytes, bytearray, memoryview)):
        payload = bytes(data)
    else:
        raise TypeError("data must be str or bytes")

    prefix = bytearray(8)
    prefix[0:2] = len(header_bytes).to_bytes(2, 'little')
    prefix[2:4] = len(payload).to_bytes(2, 'little')
    prefix[4:6] = int(streamID).to_bytes(2, 'little')
    message = bytes(prefix) + header_bytes + payload

    log(f"Sending message on protocol {stream['protocol']} for streamID {streamID}")
    log(message)
    if stream['protocol'] == 'ws':
        # Awaited rather than spawned as an untracked task, so send errors
        # reach the caller and the log line below is truthful.
        await stream['connection'].send(message)
        log("[ws] data sent")
    elif stream['protocol'] == 'tcp':
        loop = asyncio.get_running_loop()
        await loop.sock_sendall(stream['connection'], message)  # Use sock_sendall to ensure all data is sent
        log("[tcp] data sent")
    else:
        stream['connection'].send(message)
152
async def disconnect_senders(streamIDs: list):
    """Closes the data connection of each listed sender, then asks the server
    to disconnect those streams via ``disconnect_streams``.

    :param streamIDs: streamIDs of senders held in ``variables.streams``
    :raises KeyError: if a streamID is not a known sender
    """
    for stream_id in streamIDs:
        log(f"disconnecting {stream_id}")
        stream = variables.streams[stream_id]
        if stream['protocol'] != 'ws':
            stream['connection'].close()
            continue
        # websocket connections close asynchronously
        await stream['connection'].close()
    await disconnect_streams(stream_ids=streamIDs)
162
async def disconnect_receivers(streamIDs: list):
    """Closes the data connection of each listed receiver, then asks the server
    to disconnect those streams via ``disconnect_streams``.

    :param streamIDs: streamIDs of receivers held in ``variables.receiver``
    :raises KeyError: if a streamID is not a known receiver
    """
    for stream_id in streamIDs:
        log(f"disconnecting receiver {stream_id}")
        entry = variables.receiver[stream_id]
        if entry['proto'] != 'ws':
            # non-ws receivers keep their connection object at index 0
            entry['connection'][0].close()
            continue
        await entry['connection'].close()
        # NOTE(review): connect() stores the control-channel receiver task in
        # variables.receiver_task; confirm this cancels the intended task.
        variables.receiver_task.cancel()
    await disconnect_streams(stream_ids=streamIDs)
174
async def _exit():
    """Disconnects all open streams and closes connection.
    Automatically called by run().

    Any error raised while disconnecting streams still propagates, but the
    control connection is closed and ``variables.is_open`` is cleared first.
    """
    try:
        if variables.streams:
            await disconnect_senders(list(variables.streams))
        if variables.receiver:
            await disconnect_receivers(list(variables.receiver))
    finally:
        # Always release the websockets control stream, even when a stream
        # disconnect above failed.
        await variables.connection.close()
        variables.is_open = False
        print("Closed.")
def run(function):
    """Runs the user's awaitable in an event loop, and catches keyboard interrupt.

    :param function: awaitable to drive to completion (typically the coroutine
        object obtained by calling the user's ``async def`` entry point)
    :return: None

    The loop in use is stored in ``variables.loop``. Whether the awaitable
    finishes or Ctrl-C is pressed, ``_exit()`` is run on the same loop as long
    as ``variables.is_open`` is still truthy.
    """
    try:
        try:
            variables.loop = asyncio.get_event_loop()
        except RuntimeError:
            # Newer Python releases raise here instead of implicitly creating
            # a loop when the thread has none, so create and install one.
            variables.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(variables.loop)
        variables.loop.run_until_complete(function)
        print("Closing down Corelink...")
    except KeyboardInterrupt:
        print("Closing from keyboard interrupt.")
    finally:
        if variables.is_open:
            variables.loop.run_until_complete(_exit())

Runs the user function in an event loop, and catches keyboard interrupt.

async def keep_open():
    """Called by user to ensure that the program stays open while awaiting data.

    Schedules ``processing.maintain()`` as a background task on the running
    event loop and returns immediately; it does not wait for that task.
    """
    # The event loop only keeps weak references to tasks, so hold a strong one
    # (as connect() does with variables.receiver_task) to stop the task from
    # being garbage-collected before it finishes.
    variables.maintain_task = asyncio.create_task(processing.maintain())

Called by user to ensure that the program stays open while awaiting data.

async def close():
    """Called by user to end a receiver session (used after calling `keep_open()`).

    Only raises the ``variables.shut_down`` flag and returns; presumably the
    task started by `keep_open()` watches that flag -- confirm in processing.
    """
    variables.shut_down = True

Called by user to end a receiver session (used after calling keep_open()).

async def connect(username: str, password: str, host: str, port: str):
    """Connects to server and authenticates.

    :param username: username registered with Corelink server
    :param password: password associated with username
    :param host: host address to connect to
    :param port: host port to connect to
    :return: None. Other functions in this module read the session token from
        ``variables.token`` (presumably stored there by ``auth()`` -- confirm).

    Side effects: stores the credentials and endpoint on ``variables``, opens
    the ``wss://`` control connection into ``variables.connection`` and starts
    ``processing.ws_control_receiver`` as ``variables.receiver_task``.
    """
    variables.user = username
    variables.password = password
    variables.host = host
    variables.port = port
    # PROTOCOL_TLS_CLIENT replaces the deprecated PROTOCOL_TLS. Hostname and
    # certificate checks are switched off explicitly to keep the behaviour the
    # old context had by default.
    # NOTE(review): this accepts any server certificate; consider
    # ssl.create_default_context() once servers present verifiable certs.
    tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    tls_context.check_hostname = False  # must be cleared before verify_mode
    tls_context.verify_mode = ssl.CERT_NONE
    variables.connection = await websockets.connect(f"wss://{variables.host}:{variables.port}", ssl=tls_context)
    variables.receiver_task = asyncio.create_task(processing.ws_control_receiver(variables.connection))
    await auth()

Connects to server and authenticates.

Parameters
  • username: username registered with Corelink server
  • password: password associated with username
  • host: host address to connect to
  • port: host port to connect to
Returns

token

async def set_data_callback(callback):
    """Registers the user's handler for incoming data in ``variables.user_cb``.

    :param callback: function expected to take, in order:
        message (bytes), streamID (int), header (dict, sometimes empty)
    """
    variables.user_cb = callback

Registers the user's data callback. The callback is expected to take three arguments, in order: message (bytes), streamID (int), and header (dict, sometimes empty).

async def set_server_callback(callback, key: str):
    """Registers a handler for one kind of server message.

    :param callback: function expected to take the server message (dict,
        details in docs) and the key (str) naming the message type
    :param key: message type to handle; options are
        'update', 'subscriber', 'stale', 'dropped'

    The handler is stored in ``variables.server_callbacks`` under ``key``,
    replacing any handler already stored for that key.
    """
    variables.server_callbacks[key] = callback

Sets a callback function for server messages of the given key. Options for the key are 'update', 'subscriber', 'stale', and 'dropped'. The callback should expect two arguments: a dict with the server message (details in docs) and a str key naming the message type.

async def active_streams() -> list:
    """Lists the streamIDs currently tracked: senders first, then receivers."""
    sender_ids = list(variables.streams)
    receiver_ids = list(variables.receiver)
    return sender_ids + receiver_ids

Returns a list of current streamIDs.

async def create_sender(workspace, protocol: str = "tcp", streamID="", data_type='', metadata='', sender="", ip="", port="") -> int:
    """Asks the server for a sender stream, then opens its data connection.

    The arguments are forwarded to the server in a "sender" request together
    with ``variables.token``. The server's reply is recorded in
    ``variables.streams`` under its streamID, tagged with the chosen protocol,
    and ``processing.connect_sender`` is awaited for that stream.

    :param protocol: case-insensitive; must be one of ``variables.valid_proto``
    :raises ValueError: if the protocol is not accepted
    :return: streamID used to send
    """
    proto = protocol.lower()
    if proto not in variables.valid_proto:
        raise ValueError("protocol: protocol must be ws, udp, or tcp.")

    payload = {
        "function": "sender",
        "workspace": workspace,
        "senderID": streamID,
        "proto": proto,
        "IP": ip,
        "port": port,
        "alert": False,
        "type": data_type,
        "meta": metadata,
        "from": sender,
        "token": variables.token
    }
    response = await reqs.request_func(payload)
    stream_info = reqs.retrieve(response, ret=True)
    stream_id = stream_info['streamID']

    variables.streams[stream_id] = stream_info
    variables.streams[stream_id]['protocol'] = proto

    await processing.connect_sender(stream_id)
    log(type(variables.streams[stream_id]))
    return stream_id

Requests a sender from the server and opens the connection

Returns

streamID used to send

async def create_receiver(workspace, protocol, data_type="", metadata="", alert: bool = False, echo: bool = False, receiver_id="", stream_ids=None, ip=None, port=0, subscribe=True) -> int:
    """Requests a receiver from the server and opens the connection.

    The arguments are forwarded to the server in a "receiver" request together
    with ``variables.token``. The server's reply is stored in
    ``variables.receiver`` under its streamID and
    ``processing.connect_receiver`` is awaited for that stream.

    :param protocol: case-insensitive; must be one of ``variables.valid_proto``
    :param stream_ids: streamIDs sent as "streamIDs"; omitted or None means
        an empty list
    :param ip: address sent as "IP"; defaults to ``variables.user_ip``
    :raises ValueError: if the protocol is not accepted
    :return: streamID used to receive
    """
    protocol = protocol.lower()
    if protocol not in variables.valid_proto:
        raise ValueError("protocol: protocol must be ws, udp, or tcp.")
    if ip is None:
        ip = variables.user_ip
    # A fresh list per call: a `[]` default would be one object shared by
    # every call that relies on the default.
    if stream_ids is None:
        stream_ids = []
    request = {
        "function": "receiver",
        "workspace": workspace,
        "receiverID": receiver_id,
        "streamIDs": stream_ids,
        "proto": protocol,
        "type": data_type,
        "alert": alert,
        "echo": echo,
        "IP": ip,
        "port": port,
        "meta": metadata,
        "subscribe": subscribe,
        "token": variables.token
    }
    receiver = reqs.retrieve(await reqs.request_func(request), ret=True)
    log("receiver: " + str(receiver))
    stream_id = receiver['streamID']
    variables.receiver[stream_id] = receiver
    await processing.connect_receiver(stream_id)
    return stream_id

Requests a receiver from the server and opens the connection.

Returns

streamID used to receive

async def send(streamID, data, user_header: dict = None):
    """Sends data to streamID's stream (the stream must be in ``variables.streams``,
    i.e. created with `create_sender()`).

    :param streamID: ID of the sender stream to use
    :param data: payload; str (encoded before sending) or bytes-like
    :param user_header: optional dict, serialised to JSON and sent ahead of
        the payload; an empty or missing dict sends no header
    :raises KeyError: if streamID is not a known sender stream
    :raises TypeError: if data is neither str nor bytes-like
    :raises OverflowError: if the header length, payload length or streamID
        does not fit in 16 bits

    Message layout: an 8-byte prefix holding three little-endian 16-bit values
    (JSON header length, payload length, streamID) followed by two zero bytes,
    then the JSON header, then the payload.
    """
    stream = variables.streams[streamID]  # for convenience

    # Encode first so the lengths written below are byte counts; character
    # counts would be wrong for any non-ASCII text.
    header_bytes = json.dumps(user_header).encode() if user_header else b""
    if isinstance(data, str):
        payload = data.encode()
    elif isinstance(data, (bytes, bytearray, memoryview)):
        payload = bytes(data)
    else:
        raise TypeError("data must be str or bytes")

    prefix = bytearray(8)
    prefix[0:2] = len(header_bytes).to_bytes(2, 'little')
    prefix[2:4] = len(payload).to_bytes(2, 'little')
    prefix[4:6] = int(streamID).to_bytes(2, 'little')
    message = bytes(prefix) + header_bytes + payload

    log(f"Sending message on protocol {stream['protocol']} for streamID {streamID}")
    log(message)
    if stream['protocol'] == 'ws':
        # Awaited rather than spawned as an untracked task, so send errors
        # reach the caller and the log line below is truthful.
        await stream['connection'].send(message)
        log("[ws] data sent")
    elif stream['protocol'] == 'tcp':
        loop = asyncio.get_running_loop()
        await loop.sock_sendall(stream['connection'], message)  # Use sock_sendall to ensure all data is sent
        log("[tcp] data sent")
    else:
        stream['connection'].send(message)

Sends data to streamID's stream (user should first call connect_sender(streamID)). data should be either str or bytes.

async def disconnect_senders(streamIDs: list):
    """Closes the data connection of each listed sender, then asks the server
    to disconnect those streams via ``disconnect_streams``.

    :param streamIDs: streamIDs of senders held in ``variables.streams``
    :raises KeyError: if a streamID is not a known sender
    """
    for stream_id in streamIDs:
        log(f"disconnecting {stream_id}")
        stream = variables.streams[stream_id]
        if stream['protocol'] != 'ws':
            stream['connection'].close()
            continue
        # websocket connections close asynchronously
        await stream['connection'].close()
    await disconnect_streams(stream_ids=streamIDs)

Disconnects sender given list of streamIDs from server and removes streams.

async def disconnect_receivers(streamIDs: list):
    """Closes the data connection of each listed receiver, then asks the server
    to disconnect those streams via ``disconnect_streams``.

    :param streamIDs: streamIDs of receivers held in ``variables.receiver``
    :raises KeyError: if a streamID is not a known receiver
    """
    for stream_id in streamIDs:
        log(f"disconnecting receiver {stream_id}")
        entry = variables.receiver[stream_id]
        if entry['proto'] != 'ws':
            # non-ws receivers keep their connection object at index 0
            entry['connection'][0].close()
            continue
        await entry['connection'].close()
        # NOTE(review): connect() stores the control-channel receiver task in
        # variables.receiver_task; confirm this cancels the intended task.
        variables.receiver_task.cancel()
    await disconnect_streams(stream_ids=streamIDs)

Disconnects receiver given list of streamIDs from server and removes streams.