-
Notifications
You must be signed in to change notification settings - Fork 56
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add abandoned initial attempt at interception proxy
Implemented entirely as an mitmproxy addon. Abandoned because it didn’t give me sufficient control over WebSocket connection lifetimes.
- Loading branch information
1 parent
82c746b
commit eeebf5d
Showing
5 changed files
with
335 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
from base64 import b64encode, b64decode | ||
from dataclasses import dataclass | ||
from intercepted_messages_queue import DropMessageAction, ReplaceMessageAction | ||
from typing import Literal | ||
import logging | ||
|
||
@dataclass
class JSONRPCRequest:
    """Base class for JSON-RPC 2.0 requests; carries only the request ID."""

    id: str

    def create_dto(self):
        """Return the envelope fields shared by every request as a plain dict."""
        envelope = dict(jsonrpc="2.0", id=self.id)
        return envelope
|
||
@dataclass
class TransformInterceptedMessageJSONRPCRequest(JSONRPCRequest):
    """Request asking a control client what to do with one intercepted message."""

    type: Literal["binary", "text"]
    data: bytes | str
    from_client: bool

    def create_dto(self):
        """Return the "transformInterceptedMessage" request as a plain dict.

        Binary payloads are base64-encoded into a string; text payloads are
        passed through unchanged. Any other ``type`` yields ``None`` as data.
        """
        if self.type == "binary":
            encoded = b64encode(self.data).decode('utf-8')
        elif self.type == "text":
            encoded = self.data
        else:
            encoded = None

        dto = super().create_dto()
        dto["method"] = "transformInterceptedMessage"
        dto["params"] = {'type': self.type, 'data': encoded, 'fromClient': self.from_client}
        return dto
|
||
def json_rpc_response_from_dto(dto):
    """Build a response object from the decoded JSON-RPC response *dto*.

    Every response is currently interpreted as the answer to a
    transformInterceptedMessage request.
    """
    # TODO when we add more methods we’ll need a way to know which method the request corresponds to, via the ID
    response_type = TransformInterceptedMessageJSONRPCResponse
    return response_type.from_dto(dto)
|
||
@dataclass
class JSONRPCResponse:
    """Base class for JSON-RPC responses; carries the ID of the request being answered."""

    id: str
|
||
@dataclass | ||
class TransformInterceptedMessageJSONRPCResponse(JSONRPCResponse): | ||
result: DropMessageAction | ReplaceMessageAction | ||
|
||
def from_dto(dto): | ||
match dto['result']['action']: | ||
case 'drop': | ||
result = DropMessageAction() | ||
case 'replace': | ||
type = dto['result']['type'] | ||
match type: | ||
case 'binary': | ||
data = b64decode(dto['result']['data']) | ||
case 'text': | ||
data = dto['result']['data'] | ||
result = ReplaceMessageAction(type, data) | ||
|
||
return TransformInterceptedMessageJSONRPCResponse(id = dto['id'], result = result) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
import websockets | ||
import asyncio | ||
import logging | ||
|
||
class ControlServer:
    """WebSocket server over which control clients exchange messages with the proxy.

    Each incoming text message is handed to the interception context; the list
    of currently connected clients is kept so that requests can be broadcast.
    """

    def __init__(self, interception_context, host="", port=8001):
        """Create a server that is not yet listening.

        Args:
            interception_context: object whose ``on_websocket_message(message)``
                is called for every message received from a control client.
            host: host passed to ``websockets.serve``.
            port: port passed to ``websockets.serve``.
        """
        self._websocket_connections = []
        self._interception_context = interception_context
        self._host = host
        self._port = port

    # TODO make it not keep trying to restart this server every time I modify this file
    async def run(self):
        """Serve control connections until the surrounding task is cancelled."""
        async with websockets.serve(self._handle_websocket_connection, self._host, self._port):
            await asyncio.Future()  # run forever

    async def _handle_websocket_connection(self, websocket):
        """Register *websocket*, forward its messages, and unregister it when it ends."""
        logging.info(f'TestProxy handle_websocket_connection {websocket}, open {websocket.open}')
        # Store the connection so we can broadcast to it later.
        self._websocket_connections.append(websocket)
        logging.info(f'TestProxy now has websockets {self._websocket_connections}')

        try:
            async for message in websocket:
                logging.info(f'TestProxy received control message: {message}')
                try:
                    self._interception_context.on_websocket_message(message)
                except Exception as err:
                    # A bad control message must not tear down the connection.
                    logging.info(f'TestProxy got error handling control message: {err}')
        finally:
            # Forget the connection however it ended, so that later broadcasts
            # are not sent to a closed socket.
            self._websocket_connections.remove(websocket)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
import uuid
from dataclasses import dataclass, field
from typing import Literal

import mitmproxy
|
||
@dataclass
class InterceptedMessagePredicate:
    """Identifies one direction of one intercepted connection.

    Instances are hashable and compare by (flow, from_client), so they can be
    used as dictionary keys.
    """

    # Quoted so the annotation is not evaluated when the class is created.
    flow: "mitmproxy.http.HTTPFlow"
    from_client: bool

    # https://stackoverflow.com/a/4901847

    def __hash__(self):
        return hash((self.flow, self.from_client))

    def __eq__(self, other):
        if not isinstance(other, InterceptedMessagePredicate):
            # Let Python fall back to its default comparison instead of
            # raising AttributeError on unrelated objects.
            return NotImplemented
        return (self.flow, self.from_client) == (other.flow, other.from_client)
|
||
@dataclass
class InterceptedMessageHandle:
    """A handle for locating a message within InterceptedMessagesQueue."""

    predicate: InterceptedMessagePredicate
    message_id: uuid.UUID
|
||
class DropMessageAction:
    """Action meaning the intercepted message is to be dropped."""
|
||
@dataclass | ||
class ReplaceMessageAction: | ||
type: Literal["binary", "text"] | ||
data: bytes | str | ||
|
||
@dataclass
class InterceptedMessage:
    """An intercepted message together with its queue ID and the action chosen for it.

    Annotations are quoted so they are not evaluated when the class is created.
    """

    message: "mitmproxy.websocket.WebSocketMessage"
    # A fresh UUID per instance. A plain `= uuid.uuid4()` default would be
    # evaluated once at class creation and shared by every message.
    id: uuid.UUID = field(default_factory=uuid.uuid4)
    # None until a control client has decided what to do with the message.
    action: "None | DropMessageAction | ReplaceMessageAction" = None
|
||
# Per-connection, per-direction message queue. We use it to queue intercepted messages whilst waiting for a control server message telling us what to do with the message at the head of the queue. | ||
class InterceptedMessagesQueue:
    """A set of FIFO queues of intercepted messages, one per predicate.

    Messages wait here until the one at the head of its queue has been dealt with.
    """

    def __init__(self):
        # Maps an InterceptedMessagePredicate to a list of messages, oldest first.
        self.queues = {}

    def _messages_for(self, predicate, create_if_needed=False):
        """Return the list for *predicate*.

        When no list exists, a new empty one is returned; it is stored in
        ``self.queues`` only if *create_if_needed* is true.
        """
        if create_if_needed:
            return self.queues.setdefault(predicate, [])
        return self.queues.get(predicate, [])

    def pop(self, predicate):
        """Remove and return the oldest message for *predicate*.

        Raises:
            IndexError: if there is no message queued for *predicate*.
        """
        return self._messages_for(predicate).pop(0)

    def has_messages(self, predicate):
        """Return True if at least one message is queued for *predicate*."""
        return bool(self._messages_for(predicate))

    def append(self, message: "InterceptedMessage", predicate):
        """Add *message* to the end of the queue for *predicate*, creating the queue if needed."""
        self._messages_for(predicate, create_if_needed=True).append(message)

    def count(self, predicate):
        """Return the number of messages queued for *predicate*."""
        return len(self._messages_for(predicate))

    def is_head(self, handle: "InterceptedMessageHandle"):
        """Return True if *handle* refers to the oldest message of its queue.

        Returns False (rather than raising IndexError) when that queue is empty.
        """
        messages = self._messages_for(handle.predicate)
        return bool(messages) and messages[0].id == handle.message_id

    def get_head(self, predicate: "InterceptedMessagePredicate"):
        """Return, without removing it, the oldest message for *predicate*.

        Raises:
            IndexError: if there is no message queued for *predicate*.
        """
        return self._messages_for(predicate)[0]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
import logging | ||
import json | ||
import uuid | ||
import asyncio | ||
import mitmproxy | ||
|
||
from control_rpc import JSONRPCRequest, TransformInterceptedMessageJSONRPCRequest, JSONRPCResponse, TransformInterceptedMessageJSONRPCResponse, json_rpc_response_from_dto | ||
from intercepted_messages_queue import InterceptedMessagesQueue, InterceptedMessagePredicate, InterceptedMessageHandle, DropMessageAction, ReplaceMessageAction, InterceptedMessage | ||
|
||
class InterceptionContext: | ||
def __init__(self): | ||
self._intercepted_messages_queue = InterceptedMessagesQueue() | ||
self._json_rpc_request_ids_to_handles = {} | ||
self.control_server = None | ||
|
||
# control API currently only works with text frames | ||
def on_websocket_message(self, payload: str): | ||
dto = json.loads(payload) | ||
|
||
if 'error' in dto: | ||
raise Exception(f'TestProxy not expecting there to be an error in JSON-RPC response') | ||
elif 'result' in dto: | ||
response = json_rpc_response_from_dto(dto) | ||
self.handle_json_rpc_response(response) | ||
else: | ||
raise Exception(f'TestProxy got unrecognised control API message {dto}') | ||
|
||
def handle_json_rpc_response(self, response: JSONRPCResponse): | ||
logging.info(f'TestProxy got JSON-RPC response: {response}, type: {type(response)}') | ||
match response: | ||
case TransformInterceptedMessageJSONRPCResponse(): | ||
self.handle_transform_intercepted_message_response(response) | ||
case _: | ||
raise Exception(f'TestProxy got unknown response {response}') | ||
|
||
def handle_transform_intercepted_message_response(self, response: TransformInterceptedMessageJSONRPCResponse): | ||
json_rpc_request_id = uuid.UUID(response.id) | ||
handle = self._json_rpc_request_ids_to_handles[json_rpc_request_id] | ||
|
||
if handle is None: | ||
raise Exception(f'TestProxy doesn’t recognise response ID {json_rpc_request_id}, enqueued, messages are {self._intercepted_messages_queue}') | ||
|
||
del self._json_rpc_request_ids_to_handles[json_rpc_request_id] | ||
|
||
if not self._intercepted_messages_queue.is_head(handle): | ||
raise Exception(f'TestProxy got response for an intercepted message that’s not at head of queue; shouldn’t be possible {json_rpc_request_id}') | ||
|
||
intercepted_message = self._intercepted_messages_queue.get_head(handle.predicate) | ||
|
||
if intercepted_message.action is not None: | ||
raise Exception(f'TestProxy was asked to set the action for a message that already has an action set; shouldn’t happen.') | ||
|
||
intercepted_message.action = response.result | ||
|
||
self._dequeue_intercepted_message(handle.predicate) | ||
|
||
def _dequeue_intercepted_message(self, predicate): | ||
logging.info(f'TestProxy dequeueing intercepted message') | ||
|
||
message = self._intercepted_messages_queue.pop(predicate) | ||
|
||
if message.action is None: | ||
raise Exception(f'TestProxy attempted to dequeue {message} but it doesn’t have action') | ||
|
||
match message.action: | ||
case ReplaceMessageAction(type, data): | ||
# inject the replacement | ||
# https://docs.mitmproxy.org/stable/addons-examples/#websocket-inject-message | ||
logging.info(f'TestProxy re-injecting message {message} with new type {type} and new data {data}') | ||
# https://github.com/mitmproxy/mitmproxy/blob/e834259215dc4dd6f1b58dee8e0f84943a002db6/mitmproxy/addons/proxyserver.py#L308-L322 | ||
mitmproxy.ctx.master.commands.call("inject.websocket", predicate.flow, not predicate.from_client, data.encode() if type == 'text' else data, type == 'text') | ||
case DropMessageAction: | ||
# drop the message | ||
logging.info(f'TestProxy dropping message {message}') | ||
|
||
if self._intercepted_messages_queue.has_messages(predicate): | ||
self._broadcast_next_message(predicate) | ||
|
||
def _broadcast_json_rpc_request(self, request: JSONRPCRequest): | ||
data = json.dumps(request.create_dto()) | ||
|
||
logging.info(f'TestProxy broadcast request JSON {data}') | ||
# TODO tidy up - have a method on server | ||
for websocket in self.control_server._websocket_connections: | ||
logging.info(f'TestProxy broadcast to connection {websocket}, open {websocket.open}') | ||
asyncio.get_running_loop().create_task(websocket.send(data)) | ||
|
||
def _broadcast_next_message(self, predicate): | ||
intercepted_message = self._intercepted_messages_queue.get_head(predicate) | ||
|
||
json_rpc_request_id = uuid.uuid4() | ||
|
||
handle = InterceptedMessageHandle(predicate, intercepted_message.id) | ||
self._json_rpc_request_ids_to_handles[json_rpc_request_id] = handle | ||
|
||
# Broadcast to everyone connected to the control server. | ||
# TODO I think would be better for there to be one client who sends an explicit message to become the active client, or to only allow a single connection at a time; not important now though | ||
logging.info(f'TestProxy broadcast message {intercepted_message!r}') | ||
|
||
json_rpc_request = TransformInterceptedMessageJSONRPCRequest(id = str(json_rpc_request_id), type = "text" if intercepted_message.message.is_text else "binary", data = intercepted_message.message.text if intercepted_message.message.is_text else intercepted_message.message.content, from_client = intercepted_message.message.from_client) | ||
self._broadcast_json_rpc_request(json_rpc_request) | ||
|
||
def enqueue_message(self, message, flow): | ||
predicate = InterceptedMessagePredicate(flow, message.from_client) | ||
|
||
intercepted_message = InterceptedMessage(message) | ||
self._intercepted_messages_queue.append(intercepted_message, predicate) | ||
|
||
# drop the message; we’ll insert it later when the client tells us what to do with it | ||
message.drop() | ||
|
||
if self._intercepted_messages_queue.count(predicate) == 1: | ||
self._broadcast_next_message(predicate) | ||
else: | ||
logging.info(f'TestProxy enqueued message {message} since there are {self._intercepted_messages_queue.count(predicate) - 1} pending messages') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
import logging | ||
import mitmproxy | ||
import asyncio | ||
|
||
from control_server import ControlServer | ||
from interception_context import InterceptionContext | ||
|
||
# TODO cleanup as control server connections go away | ||
# TODO cleanup as intercepted connections go away | ||
|
||
class AblyInterceptionProxyAddon:
    """mitmproxy addon that hands every non-injected WebSocket message to an InterceptionContext."""

    def load(self, loader: mitmproxy.addonmanager.Loader):
        """Set up the interception context and start the control server.

        Called when an addon is first loaded. This event receives a Loader
        object, which contains methods for adding options and commands. This
        method is where the addon configures itself.
        """
        logging.info("TestProxy load")
        context = InterceptionContext()
        server = ControlServer(context)
        # The context needs the server in order to broadcast to its connections.
        context.control_server = server
        self._interception_context = context
        self._control_server = server
        self._control_server_task = asyncio.get_running_loop().create_task(server.run())

    # events I observe when the script is hot-reloaded upon changing this file:
    #
    # [17:46:19.517] Loading script test-proxy.py
    #
    # presumably the old one:
    # [17:46:19.519] TestProxy done
    #
    # presumably the new one:
    # [17:46:19.525] TestProxy load
    # [17:46:19.526] TestProxy configure
    # [17:46:19.526] TestProxy running
    #
    # want to make sure the server get shut down, because it didn't before and I got errors

    def done(self):
        """Cancel the control server task and forget the server."""
        logging.info("TestProxy done")
        # wrote this before seeing https://websockets.readthedocs.io/en/stable/faq/server.html#how-do-i-stop-a-server, will keep what I’ve got here until I understand that incantation
        # hmm, doesn't actually seem to be working, look into it further
        self._control_server_task.cancel()
        self._control_server = None

    def configure(self, updated):
        """Log the configure event; nothing else is done with it."""
        logging.info("TestProxy configure")

    def running(self):
        """Log the running event; nothing else is done with it."""
        logging.info("TestProxy running")

    def websocket_start(self, flow: mitmproxy.http.HTTPFlow):
        """A WebSocket connection has commenced."""
        logging.info("TestProxy websocket_start")

    # TODO do we need to think about fragmentation?
    def websocket_message(self, flow: mitmproxy.http.HTTPFlow):
        """Queue the newest message of *flow* for interception, unless we injected it ourselves.

        Called when a WebSocket message is received from the client or server.
        The most recent message will be flow.messages[-1]. The message is
        user-modifiable. Currently there are two types of messages,
        corresponding to the BINARY and TEXT frame types.
        """
        latest = flow.websocket.messages[-1]

        if not latest.injected:
            logging.info("TestProxy websocket_message")
            self._interception_context.enqueue_message(latest, flow)
            return

        logging.info("TestProxy re-received injected message; not doing anything to it")

    def websocket_end(self, flow: mitmproxy.http.HTTPFlow):
        """A WebSocket connection has ended.

        You can check flow.websocket.close_code to determine why it ended.
        """
        logging.info("TestProxy websocket_end")


# Module-level list of addon instances.
addons = [AblyInterceptionProxyAddon()]