diff --git a/test/mitmproxy-addon/src/control_rpc.py b/test/mitmproxy-addon/src/control_rpc.py new file mode 100644 index 000000000..df9c47ab7 --- /dev/null +++ b/test/mitmproxy-addon/src/control_rpc.py @@ -0,0 +1,56 @@ +from base64 import b64encode, b64decode +from dataclasses import dataclass +from intercepted_messages_queue import DropMessageAction, ReplaceMessageAction +from typing import Literal +import logging + +@dataclass +class JSONRPCRequest: + id: str + + def create_dto(self): + return { "jsonrpc": "2.0", "id": self.id } + +@dataclass +class TransformInterceptedMessageJSONRPCRequest(JSONRPCRequest): + type: Literal["binary", "text"] + data: bytes | str + from_client: bool + + def create_dto(self): + data_param = None + match self.type: + case "binary": + data_param = b64encode(self.data).decode('utf-8') + case "text": + data_param = self.data + + params = { 'type': self.type, 'data': data_param, 'fromClient': self.from_client } + return { **super().create_dto(), "method": "transformInterceptedMessage", "params": params } + +def json_rpc_response_from_dto(dto): + # TODO when we add more methods we’ll need a way to know which method the request corresponds to, via the ID + return TransformInterceptedMessageJSONRPCResponse.from_dto(dto) + +@dataclass +class JSONRPCResponse: + id: str + +@dataclass +class TransformInterceptedMessageJSONRPCResponse(JSONRPCResponse): + result: DropMessageAction | ReplaceMessageAction + + def from_dto(dto): + match dto['result']['action']: + case 'drop': + result = DropMessageAction() + case 'replace': + type = dto['result']['type'] + match type: + case 'binary': + data = b64decode(dto['result']['data']) + case 'text': + data = dto['result']['data'] + result = ReplaceMessageAction(type, data) + + return TransformInterceptedMessageJSONRPCResponse(id = dto['id'], result = result) diff --git a/test/mitmproxy-addon/src/control_server.py b/test/mitmproxy-addon/src/control_server.py new file mode 100644 index 000000000..9b2a9b63a --- /dev/null +++ b/test/mitmproxy-addon/src/control_server.py @@ -0,0 +1,26 @@ +import websockets +import asyncio +import logging + +class ControlServer: + def __init__(self, interception_context): + self._websocket_connections = [] + self._interception_context = interception_context + + # TODO make it not keep trying to restart this server every time I modify this file + async def run(self): + async with websockets.serve(self._handle_websocket_connection, "", 8001): + await asyncio.Future() # run forever + + async def _handle_websocket_connection(self, websocket): + logging.info(f'TestProxy handle_websocket_connection {websocket}, open {websocket.open}') + # Store the connection so we can broadcast to it later. + self._websocket_connections.append(websocket) + logging.info(f'TestProxy now has websockets {self._websocket_connections}') + + async for message in websocket: + logging.info(f'TestProxy received control message: {message}') + try: + self._interception_context.on_websocket_message(message) + except Exception as err: + logging.info(f'TestProxy got error handling control message: {err}') diff --git a/test/mitmproxy-addon/src/intercepted_messages_queue.py b/test/mitmproxy-addon/src/intercepted_messages_queue.py new file mode 100644 index 000000000..b4097958f --- /dev/null +++ b/test/mitmproxy-addon/src/intercepted_messages_queue.py @@ -0,0 +1,71 @@ +import mitmproxy +import uuid +from dataclasses import dataclass +from typing import Literal + +@dataclass +class InterceptedMessagePredicate: + flow: mitmproxy.http.HTTPFlow + from_client: bool + + # https://stackoverflow.com/a/4901847 + + def __hash__(self): + return hash((self.flow, self.from_client)) + + def __eq__(self, other): + return (self.flow, self.from_client) == (other.flow, other.from_client) + +@dataclass +# A handle for locating a message within InterceptedMessageQueue. +class InterceptedMessageHandle: + predicate: InterceptedMessagePredicate + message_id: uuid.UUID + +class DropMessageAction: + pass + +@dataclass +class ReplaceMessageAction: + type: Literal["binary", "text"] + data: bytes | str + +@dataclass +class InterceptedMessage: + message: mitmproxy.websocket.WebSocketMessage + id: uuid.UUID = uuid.uuid4() + action: None | DropMessageAction | ReplaceMessageAction = None + +# Per-connection, per-direction message queue. We use it to queue intercepted messages whilst waiting for a control server message telling us what to do with the message at the head of the queue. +class InterceptedMessagesQueue: + def __init__(self): + # Maps an InterceptedMessagePredicate to a queue + self.queues = {} + + def _messages_for(self, predicate, create_if_needed = False): + if predicate in self.queues: + return self.queues[predicate] + else: + result = [] + if create_if_needed: + self.queues[predicate] = result + return result + + def pop(self, predicate): + return self._messages_for(predicate).pop(0) + + def has_messages(self, predicate): + return len(self._messages_for(predicate)) != 0 + + def append(self, message: InterceptedMessage, predicate): + self._messages_for(predicate, create_if_needed = True).append(message) + + def count(self, predicate): + return len(self._messages_for(predicate)) + + def is_head(self, handle: InterceptedMessageHandle): + head = self._messages_for(handle.predicate)[0] + return head.id == handle.message_id + + def get_head(self, predicate: InterceptedMessagePredicate): + return self._messages_for(predicate)[0] diff --git a/test/mitmproxy-addon/src/interception_context.py b/test/mitmproxy-addon/src/interception_context.py new file mode 100644 index 000000000..cf3f27187 --- /dev/null +++ b/test/mitmproxy-addon/src/interception_context.py @@ -0,0 +1,115 @@ +import logging +import json +import uuid +import asyncio +import mitmproxy + +from control_rpc import JSONRPCRequest, TransformInterceptedMessageJSONRPCRequest, JSONRPCResponse, TransformInterceptedMessageJSONRPCResponse, json_rpc_response_from_dto +from intercepted_messages_queue import InterceptedMessagesQueue, InterceptedMessagePredicate, InterceptedMessageHandle, DropMessageAction, ReplaceMessageAction, InterceptedMessage + +class InterceptionContext: + def __init__(self): + self._intercepted_messages_queue = InterceptedMessagesQueue() + self._json_rpc_request_ids_to_handles = {} + self.control_server = None + + # control API currently only works with text frames + def on_websocket_message(self, payload: str): + dto = json.loads(payload) + + if 'error' in dto: + raise Exception(f'TestProxy not expecting there to be an error in JSON-RPC response') + elif 'result' in dto: + response = json_rpc_response_from_dto(dto) + self.handle_json_rpc_response(response) + else: + raise Exception(f'TestProxy got unrecognised control API message {dto}') + + def handle_json_rpc_response(self, response: JSONRPCResponse): + logging.info(f'TestProxy got JSON-RPC response: {response}, type: {type(response)}') + match response: + case TransformInterceptedMessageJSONRPCResponse(): + self.handle_transform_intercepted_message_response(response) + case _: + raise Exception(f'TestProxy got unknown response {response}') + + def handle_transform_intercepted_message_response(self, response: TransformInterceptedMessageJSONRPCResponse): + json_rpc_request_id = uuid.UUID(response.id) + handle = self._json_rpc_request_ids_to_handles[json_rpc_request_id] + + if handle is None: + raise Exception(f'TestProxy doesn’t recognise response ID {json_rpc_request_id}, enqueued, messages are {self._intercepted_messages_queue}') + + del self._json_rpc_request_ids_to_handles[json_rpc_request_id] + + if not self._intercepted_messages_queue.is_head(handle): + raise Exception(f'TestProxy got response for an intercepted message that’s not at head of queue; shouldn’t be possible {json_rpc_request_id}') + + intercepted_message = self._intercepted_messages_queue.get_head(handle.predicate) + + if intercepted_message.action is not None: + raise Exception(f'TestProxy was asked to set the action for a message that already has an action set; shouldn’t happen.') + + intercepted_message.action = response.result + + self._dequeue_intercepted_message(handle.predicate) + + def _dequeue_intercepted_message(self, predicate): + logging.info(f'TestProxy dequeueing intercepted message') + + message = self._intercepted_messages_queue.pop(predicate) + + if message.action is None: + raise Exception(f'TestProxy attempted to dequeue {message} but it doesn’t have action') + + match message.action: + case ReplaceMessageAction(type, data): + # inject the replacement + # https://docs.mitmproxy.org/stable/addons-examples/#websocket-inject-message + logging.info(f'TestProxy re-injecting message {message} with new type {type} and new data {data}') + # https://github.com/mitmproxy/mitmproxy/blob/e834259215dc4dd6f1b58dee8e0f84943a002db6/mitmproxy/addons/proxyserver.py#L308-L322 + mitmproxy.ctx.master.commands.call("inject.websocket", predicate.flow, not predicate.from_client, data.encode() if type == 'text' else data, type == 'text') + case DropMessageAction: + # drop the message + logging.info(f'TestProxy dropping message {message}') + + if self._intercepted_messages_queue.has_messages(predicate): + self._broadcast_next_message(predicate) + + def _broadcast_json_rpc_request(self, request: JSONRPCRequest): + data = json.dumps(request.create_dto()) + + logging.info(f'TestProxy broadcast request JSON {data}') + # TODO tidy up - have a method on server + for websocket in self.control_server._websocket_connections: + logging.info(f'TestProxy broadcast to connection {websocket}, open {websocket.open}') + asyncio.get_running_loop().create_task(websocket.send(data)) + + def _broadcast_next_message(self, predicate): + intercepted_message = self._intercepted_messages_queue.get_head(predicate) + + json_rpc_request_id = uuid.uuid4() + + handle = InterceptedMessageHandle(predicate, intercepted_message.id) + self._json_rpc_request_ids_to_handles[json_rpc_request_id] = handle + + # Broadcast to everyone connected to the control server. + # TODO I think would be better for there to be one client who sends an explicit message to become the active client, or to only allow a single connection at a time; not important now though + logging.info(f'TestProxy broadcast message {intercepted_message!r}') + + json_rpc_request = TransformInterceptedMessageJSONRPCRequest(id = str(json_rpc_request_id), type = "text" if intercepted_message.message.is_text else "binary", data = intercepted_message.message.text if intercepted_message.message.is_text else intercepted_message.message.content, from_client = intercepted_message.message.from_client) + self._broadcast_json_rpc_request(json_rpc_request) + + def enqueue_message(self, message, flow): + predicate = InterceptedMessagePredicate(flow, message.from_client) + + intercepted_message = InterceptedMessage(message) + self._intercepted_messages_queue.append(intercepted_message, predicate) + + # drop the message; we’ll insert it later when the client tells us what to do with it + message.drop() + + if self._intercepted_messages_queue.count(predicate) == 1: + self._broadcast_next_message(predicate) + else: + logging.info(f'TestProxy enqueued message {message} since there are {self._intercepted_messages_queue.count(predicate) - 1} pending messages') diff --git a/test/mitmproxy-addon/src/mitmproxy_addon.py b/test/mitmproxy-addon/src/mitmproxy_addon.py new file mode 100644 index 000000000..985f558fc --- /dev/null +++ b/test/mitmproxy-addon/src/mitmproxy_addon.py @@ -0,0 +1,67 @@ +import logging +import mitmproxy +import asyncio + +from control_server import ControlServer +from interception_context import InterceptionContext + +# TODO cleanup as control server connections go away +# TODO cleanup as intercepted connections go away + +class AblyInterceptionProxyAddon: + # Called when an addon is first loaded. This event receives a Loader object, which contains methods for adding options and commands. This method is where the addon configures itself. + def load(self, loader: mitmproxy.addonmanager.Loader): + logging.info("TestProxy load") + self._interception_context = InterceptionContext() + self._control_server = ControlServer(self._interception_context) + self._interception_context.control_server = self._control_server + self._control_server_task = asyncio.get_running_loop().create_task(self._control_server.run()) + +# events I observe when the script is hot-reloaded upon changing this file: +# +# [17:46:19.517] Loading script test-proxy.py +# +# presumably the old one: +# [17:46:19.519] TestProxy done +# +# presumably the new one: +# [17:46:19.525] TestProxy load +# [17:46:19.526] TestProxy configure +# [17:46:19.526] TestProxy running +# +# want to make sure the server get shut down, because it didn't before and I got errors + + def done(self): + logging.info("TestProxy done") + # wrote this before seeing https://websockets.readthedocs.io/en/stable/faq/server.html#how-do-i-stop-a-server, will keep what I’ve got here until I understand that incantation + # hmm, doesn't actually seem to be working, look into it further + self._control_server_task.cancel() + self._control_server = None + + def configure(self, updated): + logging.info("TestProxy configure") + + def running(self): + logging.info("TestProxy running") + + # A WebSocket connection has commenced. + def websocket_start(self, flow: mitmproxy.http.HTTPFlow): + logging.info("TestProxy websocket_start") + + # Called when a WebSocket message is received from the client or server. The most recent message will be flow.messages[-1]. The message is user-modifiable. Currently there are two types of messages, corresponding to the BINARY and TEXT frame types. + # TODO do we need to think about fragmentation? + def websocket_message(self, flow: mitmproxy.http.HTTPFlow): + message = flow.websocket.messages[-1] + + if message.injected: + logging.info("TestProxy re-received injected message; not doing anything to it") + return + + logging.info("TestProxy websocket_message") + self._interception_context.enqueue_message(message, flow) + + # A WebSocket connection has ended. You can check flow.websocket.close_code to determine why it ended. + def websocket_end(self, flow: mitmproxy.http.HTTPFlow): + logging.info("TestProxy websocket_end") + +addons = [AblyInterceptionProxyAddon()]