#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
SROS Scanner helpers classes module.
:author Alias Robotics SL (https://aliasrobotics.com)
"""
import asyncio
from scapy.all import *
from scapy.layers.tls.extensions import TLS_Ext_SupportedGroups, TLS_Ext_SupportedPointFormat, \
TLS_Ext_SignatureAlgorithms, TLS_Ext_Heartbeat, TLS_Ext_Padding
from scapy.layers.tls.handshake import TLSClientHello
from scapy.layers.tls.record import TLS
from aztarna.ros.commons import BaseHostROS, BaseNodeROS
load_layer('tls')
logger = logging.getLogger(__name__)
[docs]class SROSNode(BaseNodeROS):
"""
Class for keeping all the attributes of a SROS Node. Extends :class:`aztarna.commons.BaseNodeROS`
"""
def __init__(self):
super().__init__()
self.is_demo = False
self.policies = []
def __repr__(self):
return "Name: {}, Address: {}, Port: {}, Demo: {}, Policies: {}".format(self.name, self.address,
self.port, self.is_demo, self.policies)
[docs]class SROSHost(BaseHostROS):
"""
Class for keeping all the attributes of a SROS Node.Extends:class:`aztarna.commons.BaseHostROS`
"""
def __init__(self):
super().__init__()
def __repr__(self):
return "Address: {}, Nodes: {}".format(self.address, self.nodes)
[docs]class SROSPolicy:
"""
Class for representing a SROS Policy, containing the possible types for it, the value and its permissions.
"""
TYPE_SUBSCRIPTABLE_TOPICS = 'Subscriptable topics'
TYPE_PUBLISHABLE_TOPICS = 'Publishable topics'
TYPE_EXECUTABLE_SVCS = 'Executable services'
TYPE_READABLE_PARAMS = 'Readable parameters'
TYPE_UNKNOWN = 'Unknown'
POLICY_ALLOWED = True
POLICY_DENIED = False
def __init__(self):
self.type = SROSPolicy.TYPE_UNKNOWN
self.values = []
self.permissions = SROSPolicy.POLICY_DENIED
def __repr__(self):
return 'Type: {}, Values: {}, Permission: {}'.format(self.type, self.values, self.permissions)
[docs]def get_node_info(cert):
"""
Extract all the information for a node, based on it's certificate.
:param cert: The input certificate in X509 format.
:return: :class:`aztarna.sros.helpers.SROSNode` The extracted node info from the certificate.
"""
node = SROSNode()
ros_demo_fields = {'stateOrProvinceName': 'Sate',
'organizationName': 'Organization',
'countryName': 'ZZ',
'organizationUnitName': 'Organizational Unit',
'commonName': 'master',
'localityName': 'Locality'
}
try:
node.name = cert.subject['commonName']
if cert.issuer == ros_demo_fields:
node.is_demo = True
except Exception:
logger.warning('\t\tException when obtaining node info')
return None
else:
return node
[docs]def get_policies(cert):
"""
Get the related policies from an input SROS Node certificate.
:param cert: Certificate in X509 format.
:return: List containing all policies as instances of :class:`aztarna.sros.helpers.SROSPolicy`
"""
policies = []
try:
extensions = cert.tbsCertificate.extensions
policies_extension = list(filter(lambda ext: ext.extnID.val == '2.5.29.32', extensions))[0]
for cert_policy in policies_extension.extnValue.certificatePolicies:
id = cert_policy.policyIdentifier.val
policy = SROSPolicy()
if id[-3] == '1':
policy.type = SROSPolicy.TYPE_SUBSCRIPTABLE_TOPICS
elif id[-3] == '2':
policy.type = SROSPolicy.TYPE_PUBLISHABLE_TOPICS
elif id[-3] in ['3', '6']:
policy.type = SROSPolicy.TYPE_UNKNOWN
elif id[-3] == '4':
policy.type = SROSPolicy.TYPE_EXECUTABLE_SVCS
elif id[-3] == '5':
policy.type = SROSPolicy.TYPE_READABLE_PARAMS
if id[-1] == '1':
policy.permission = SROSPolicy.POLICY_ALLOWED
else:
policy.permission = SROSPolicy.POLICY_DENIED
for qualifier in cert_policy.policyQualifiers:
policy.values.append(qualifier.qualifier.val)
policies.append(policy)
except Exception:
logger.warning('\t\tException when capturing the certificate policies')
return policies
[docs]async def get_sros_certificate(address, port, timeout=3):
"""
Function to connect to a SROS Node, simulate the TLS handshake, and get it's server certificate on the process.
:param address: Address of the node.
:param port: Port of the node.
:param timeout: Timeout for the connection.
:return: A tuple containing the address, port and certificate if found, otherwise,
a tuple containing address, port and None.
"""
client_hello = TLS(version='TLS 1.0', msg=TLSClientHello(
ciphers=[49200, 49196, 49202, 49198, 49199, 49195, 49201, 49197, 165, 163, 161, 159, 164, 162, 160, 158, 49192,
49188, 49172, 49162, 49194, 49190, 49167, 49157, 107, 106, 105, 104, 57, 56, 55, 54, 49191, 49187,
49171, 49161, 49193, 49189, 49166, 49156, 103, 64, 63, 62, 51, 50, 49, 48, 136, 135, 134, 133, 69, 68,
67, 66, 49170, 49160, 49165, 49155, 22, 19, 16, 13, 157, 156, 61, 53, 60, 47, 132, 65, 10, 255],
comp=[0],
gmt_unix_time=12345566,
ext=[TLS_Ext_SupportedGroups(groups=[23, 25, 28, 27, 24, 26, 22, 14, 13, 11, 12, 9, 10]),
TLS_Ext_SupportedPointFormat(ecpl=[0, 1, 2]),
TLS_Ext_SignatureAlgorithms(
sig_algs=[1537, 1538, 1539, 1281, 1282, 1283, 1025, 1026, 1027, 769, 770, 771, 513, 514, 515]),
TLS_Ext_Heartbeat(heartbeat_mode=1),
TLS_Ext_Padding(padding=212 * b'\x00')]))
tls_header = b'\x16\x03\x03'
server_hello_done = b'\x0e\x00\x00\x00'
received_data = b''
is_sros = True
writer = None
try:
conn = asyncio.open_connection(str(address), port, loop=asyncio.get_event_loop())
reader, writer = await asyncio.wait_for(conn, timeout=timeout)
writer.write(bytes(client_hello))
await writer.drain()
while received_data[-4:] != server_hello_done:
received_data += await asyncio.wait_for(reader.read(1024), timeout=3)
if received_data[:3] != tls_header:
is_sros = False
break
except Exception as e:
logger.error('[-] Error connecting to host ' + str(address) + ': ' + str(e) + '\n\tNot a SROS host')
return address, port, None
else:
if is_sros:
server_hello = TLS(received_data)
cert = server_hello.payload.msg[0].certs[0][1]
logger.warning('[+] SROS host found!!!')
return address, port, cert
finally:
if writer:
writer.close()
if sys.version_info >= (3, 7, 0):
await writer.wait_closed()
return address, port, None
[docs]async def check_port(ip, port):
"""
Check if a port is open.
:param ip: Address of the host
:param port: Port to check
:return: Returns the port if it is open. None otherwise.
"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(0.1)
await asyncio.wait_for(asyncio.get_event_loop().sock_connect(sock, (str(ip), port,)), timeout=0.1)
logger.warning('Scanning host {}:{}'.format(ip, port))
return port
except Exception as e:
pass
finally:
try:
sock.close()
except:
pass
[docs]async def check_port_sem(sem, ip, port):
"""
Check ports from a host, limiting concurrency with a semaphore.
:param sem: Asyncio semaphore.
:param ip: Address of the host.
:param port: Port to be checked.
:return: The result of :meth:`aztarna.sros.helpers.check_port`.
"""
async with sem:
return await check_port(ip, port)
[docs]async def find_node_ports(address, ports):
"""
Find all the open ports for a host.
:param address: IP address of the host.
:param ports: Port to check.
:return: A list of the found open ports.
"""
sem = asyncio.Semaphore(400) # Change this value for concurrency limitation
tasks = [asyncio.ensure_future(check_port_sem(sem, address, p)) for p in ports]
found_ports = []
for response in await asyncio.gather(*tasks):
if response:
found_ports.append(response)
return found_ports