Skip to content

Commit

Permalink
Merge pull request #18 from securityinmobility/main
Browse files Browse the repository at this point in the history
Add ISO15118 discovery module for detecting charging stations / vehicles in a network
  • Loading branch information
eder-lukas authored Sep 19, 2024
2 parents c04d639 + 86067f3 commit 452d237
Show file tree
Hide file tree
Showing 6 changed files with 346 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/.vscode
venv/
.venv/

*.pyc
__pycache__
Expand Down
4 changes: 4 additions & 0 deletions autosec/core/ressources/ip.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,10 @@ def __eq__(self, other) -> bool:
tmp_3 = self.get_url() == other.get_url()
return tmp_1 and tmp_2 and tmp_3

def __str__(self):
#return str(self.get_port())
#return str(self.get_name())
return str(self.get_service_name())

class InternetConnection(AutosecRessource):
_service: InternetService
Expand Down
212 changes: 212 additions & 0 deletions autosec/modules/ev_charging_station_discovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
from scapy.all import conf
from autosec.core.ressources.ip import InternetInterface, InternetService, InternetDevice
from autosec.core.ressources.base import AutosecRessource
from autosec.core.autosec_module import AutosecModule, AutosecModuleInformation
from typing import List, Optional, Tuple
from asyncio import DatagramProtocol, DatagramTransport
import socket
import struct
import asyncio
import ipaddress

# Define a UDP client class for communication
class UDPClient(DatagramProtocol):
# Initialize the UDP client with a network interface
def __init__(self, iface: str):
self.iface = iface # Store the network interface name
self._transport: Optional[DatagramTransport] = None # Will hold the transport for sending/receiving data
self._rcv_queue: asyncio.Queue = asyncio.Queue() # Queue to handle received data asynchronously
self.received_data = {} # Dictionary to store received data keyed by IPv6 address

# Static method to create and configure a socket for IPv6 UDP communication
@staticmethod
def _create_socket(iface: str) -> socket.socket:
# Create an IPv6 UDP socket
sock = socket.socket(family=socket.AF_INET6, type=socket.SOCK_DGRAM)
# Allow address reuse
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Set the TTL (Time To Live) for multicast packets to 1 (restrict to local network)
ttl = struct.pack("@i", 1)
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl)
# Get the index of the network interface
interface_index = socket.if_nametoindex(iface)
# Set the interface to be used for outgoing multicast packets
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, interface_index)
return sock

# Start the UDP client and set up the transport layer
async def start(self):
loop = asyncio.get_running_loop()
# Create a datagram endpoint using the custom socket
self._transport, _ = await loop.create_datagram_endpoint(
protocol_factory=lambda: self,
sock=self._create_socket(self.iface),
)

# Send a broadcast packet to discover EV-Charging stations
async def send_broadcast(self):
# Predefined broadcast packet to send
BROADCAST_PACKET = bytes.fromhex("01fe9000000000021000")
SDP_MULTICAST_GROUP = 'ff02::1' # IPv6 multicast group for all nodes on the local network
UDP_PORT = 15118 # UDP port to send the packet to
# Send the packet via the transport layer
self._transport.sendto(BROADCAST_PACKET, (SDP_MULTICAST_GROUP, UDP_PORT))
print(f"\nSent broadcast packet to {SDP_MULTICAST_GROUP}:{UDP_PORT}")

# Handle received datagrams
def datagram_received(self, data: bytes, addr: Tuple[str, int, int, int]):
ipv6_address, port, flowinfo, scope_id = addr # Unpack the address information
data_hex = data.hex() # Convert the data to hexadecimal
self._rcv_queue.put_nowait((data, addr)) # Put the received data in the queue
self.received_data[ipv6_address] = data_hex # Store the data associated with the IPv6 address

# Handle errors that occur during transmission/reception
def error_received(self, exc):
print(f"Error received: {exc}")

# Handle connection loss events
def connection_lost(self, exc):
print(f"Connection closed: {exc}")

# Static method to parse the received response data
@staticmethod
def parse_response(data_hex):
data = bytes.fromhex(data_hex) # Convert the hexadecimal string back to bytes

if len(data) < 28: # Check if the data length is sufficient
raise ValueError("Data too short to contain all necessary fields")

# Extract fields from the received data
version = data[0]
msg_type = data[1]
msg_length = int.from_bytes(data[2:4], byteorder='big')
reserved = int.from_bytes(data[4:8], byteorder='big')
ip_address = ipaddress.IPv6Address(data[8:24]) # Extract IPv6 address
port = int.from_bytes(data[24:26], byteorder='big') # Extract port number
security = data[26]
protocol = data[27]

# Return a dictionary with the parsed fields
decoded_message = {
'Version': version,
'Message Type': msg_type,
'Message Length': msg_length,
'Reserved': reserved,
'SECC IP Address': str(ip_address),
'SECC Port': port,
'Security': security,
'Transport Protocol': protocol,
}

return decoded_message

# Define the Autosec module class for finding EV charging stations
class EVChargingStationFinder(AutosecModule):

# Initialize the module
def __init__(self):
super().__init__()

# Provide information about the module
def get_info(self):
return AutosecModuleInformation(
name="EVChargingStationFinder", # Name of the module
description="Module to find EV-Charging Stations using UDP broadcast.", # Description of the module
dependencies=[], # Dependencies (none in this case)
tags=["IP", "EV-Charging", "UDP", "Scan"], # Tags associated with the module
)

# Define the outputs produced by the module
def get_produced_outputs(self) -> List[InternetService]:
return [InternetService] # The module outputs a list of InternetService objects

# Define the resources required by the module
def get_required_ressources(self) -> List[AutosecRessource]:
return [InternetInterface] # The module requires an InternetInterface resource

# Run the exploit to find EV charging stations
async def run_exploit(self, iface: str) -> List[InternetService]:
loop = asyncio.get_running_loop()

# Create a socket to listen for UDP responses on port 15119
udp_listen_socket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
udp_listen_socket.bind(('::', 15119)) # Bind to all IPv6 addresses on port 15119
udp_listen_socket.setblocking(True) # Set the socket to blocking mode

# Create a UDP client and start it
udp_client = UDPClient(iface)
await udp_client.start()
await udp_client.send_broadcast() # Send the broadcast packet

processed_responses = set() # Set to keep track of processed responses

await asyncio.sleep(5) # Wait for 5 seconds to receive responses

services = []
# Process each received response
while not udp_client._rcv_queue.empty():
broadcast_data, addr = await udp_client._rcv_queue.get()
response_hex = broadcast_data.hex()
if response_hex in processed_responses: # Skip already processed responses
continue

processed_responses.add(response_hex)

try:
# Parse the response and create an InternetService object
response = UDPClient.parse_response(response_hex)
service = InternetService(
device=InternetDevice(
interface=InternetInterface(
interface=iface,
ipv4_address="", # Not used, as we are dealing with IPv6
subnet_length=64 # Assuming a default subnet length
),
ipv6=response['SECC IP Address'] # Assign the parsed IPv6 address
),
port=response['SECC Port'], # Assign the parsed port
service_name="EV-Charging Station" # Assign a service name
)
services.append(service) # Add the service to the list
except ValueError as e:
print(f"Failed to parse response: {e}") # Handle any errors during parsing

return services # Return the list of discovered services

# Run the module and return the discovered services
def run(self, inputs: List[AutosecRessource]) -> List[InternetService]:
interface = self.get_ressource(inputs, InternetInterface) # Get the network interface resource
iface = interface.get_interface_name() # Get the interface name
services = asyncio.run(self.run_exploit(iface)) # Run the exploit and get services

if services and len(services) > 0:
print(f"\nCharging Station Found: \n")
else:
print(f"\nNo Charging Station Found.\n")

return services # Return the discovered services

# Provide the module to the Autosec framework
def load_module():
return [EVChargingStationFinder()] # Return the module instance



'''
test.py
from autosec.core.ressources.ip import InternetInterface
from autosec.modules.ev_charging_station_discovery import EVChargingStationFinder
inet_iface = InternetInterface("eth0", ipv4_address="0.0.0.0", subnet_length=20)
results = EVChargingStationFinder().run([inet_iface])
for result in results:
secc_ip = result.get_device().get_ipv6()
tcp_port = result.get_port()
print(f"IP Address of the Charging Station: {secc_ip}")
print(f"\nTCP Port number of the Charging Station: {tcp_port}\n")
'''

13 changes: 12 additions & 1 deletion autosec/test/test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
''' Module description
Diese Modul ist nur zum testen von pylint in der CI chain gedacht'''

'''
def summe(erster_summand, zweiter_summand):
'''Function docstring'''
Function docstring
return erster_summand + zweiter_summand
'''

from autosec.core.ressources.ip import InternetDevice, InternetInterface
from autosec.modules.ev_charging_station_discovery import EVChargingStationFinder

inet_iface = InternetInterface("eth0", ipv4_address="172.27.106.161", subnet_length=20)
# inet_device = InternetDevice(interface=inet_iface, ipv6="fe80::215:5dff:fe9f:e97b")
results = EVChargingStationFinder().run([inet_iface])

print(results)
107 changes: 107 additions & 0 deletions documentation/modules/ev_charging_station_discovery.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# EV Charging Station Discovery Module

## Overview

This project contains a Python script designed to discover Electric Vehicle (EV) charging stations on a local network using UDP broadcast over IPv6. The discovery process is encapsulated in the `EVChargingStationFinder` class, which is part of the `autosec` framework. The `test.py` script demonstrates how to use this module to find and list EV charging stations on the specified network interface.

## test.py

```python

'''
Required modules
'''
from autosec.core.ressources.ip import InternetInterface
from autosec.modules.ev_charging_station_discovery import EVChargingStationFinder

'''
The following is required to specify the interface, The ipv4 address and the subnet length are not required.
'''
inet_iface = InternetInterface("eth0", ipv4_address="0.0.0.0", subnet_length=20)

'''
The main function that should need to run
'''
results = EVChargingStationFinder().run([inet_iface])

'''
The following are the results that should be shown as outcomes from the ev_charging_station_discovery.py script
'''
for result in results:
secc_ip = result.get_device().get_ipv6()
tcp_port = result.get_port()
print(f"IP Address of the Charging Station: {secc_ip}")
print(f"\nTCP Port number of the Charging Station: {tcp_port}\n")

```

## Prerequisites

Before running the script, ensure you have the required modules installed and correctly configured. The script depends on the `autosec` framework, specifically the `InternetInterface` class and the `EVChargingStationFinder` module.

## Project Structure

- **ev_charging_station_discovery.py**: Contains the core logic for discovering EV charging stations.
- **test.py**: A test script that demonstrates how to use the discovery module to find and display EV charging stations.

## How It Works

### 1. Setup the Network Interface

The `test.py` script starts by defining the network interface that will be used to broadcast the UDP packet:

```python
inet_iface = InternetInterface("eth0", ipv4_address="0.0.0.0", subnet_length=20)
```

- **Interface Name**: `"eth0"` is specified as the network interface to be used. You can change this to another interface (e.g., `"wlan0"` for Wi-Fi).
- **IPv4 Address & Subnet Length**: The IPv4 address and subnet length are not necessary for the discovery process and can be set to defaults.

### 2. Discovering EV Charging Stations

The main functionality of the script is to use the `EVChargingStationFinder` module:

```python
results = EVChargingStationFinder().run([inet_iface])
```

- **Module Initialization**: The `EVChargingStationFinder` is initialized and executed. The `run` method triggers the discovery process on the provided network interface.
- **UDP Broadcast**: The script sends a specially crafted UDP broadcast packet to the local network, targeting all devices in the `ff02::1` multicast group (all IPv6 nodes on the local link).
- **Response Handling**: The module listens for responses from EV charging stations and parses the data to extract relevant information (such as the station's IPv6 address and TCP port).

### 3. Displaying Results

After running the discovery module, the script iterates over the discovered services and prints out the results:

```python
for result in results:
secc_ip = result.get_device().get_ipv6()
tcp_port = result.get_port()
print(f"IP Address of the Charging Station: {secc_ip}")
print(f"\nTCP Port number of the Charging Station: {tcp_port}\n")
```

- **Output**: For each discovered EV charging station, the script outputs the IPv6 address and TCP port, which are crucial for establishing further communication with the station.

### Example Output

When you run the script, you might see an output like this (depending on the network setup and availability of EV charging stations):

```
IP Address of the Charging Station: fe80::abcd:1234:5678:9abc
TCP Port number of the Charging Station: 45868
```

## Running the Script

To run the `test.py` script, follow these steps:

1. **Ensure dependencies are installed**: Ensure that the `autosec` framework and all necessary modules are installed in your environment.

2. **Modify the network interface**: If needed, change the network interface name in `test.py` to match your environment (e.g., replace `"eth0"` with your actual interface name).

3. **Execute the script**: Run the `test.py` script:

```bash
python test.py
```
16 changes: 10 additions & 6 deletions test.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from autosec.core.ressources.ip import InternetInterface, InternetDevice
from autosec.modules.port_scan import PortService
from autosec.core.ressources.ip import InternetInterface
from autosec.modules.ev_charging_station_discovery import EVChargingStationFinder

iface = InternetInterface("lo")
device = InternetDevice(iface, "127.0.0.1")
result = PortService().run([iface, device])
inet_iface = InternetInterface("eth0", ipv4_address="0.0.0.0", subnet_length=20)

print(result)
results = EVChargingStationFinder().run([inet_iface])

for result in results:
secc_ip = result.get_device().get_ipv6()
tcp_port = result.get_port()
print(f"IP Address of the Charging Station: {secc_ip}")
print(f"\nTCP Port number of the Charging Station: {tcp_port}\n")

0 comments on commit 452d237

Please sign in to comment.