#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
ROS Scanner module.
:author Alias Robotics SL (https://aliasrobotics.com)
"""
import asyncio
import aiohttp
import logging
import re
from aiohttp_xmlrpc.client import ServerProxy
from aztarna.ros.commons import CommunicationROS
from aztarna.commons import RobotAdapter
from aztarna.ros.helpers import HelpersROS
from aztarna.ros.ros.helpers import Node, Topic, Service
from aztarna.ros.ros.helpers import ROSHost
import sys
from ipaddress import IPv4Address
[docs]class ROSScanner(RobotAdapter):
"""
ROSScanner class, an extension of BaseScanner for ROS.
"""
def __init__(self):
super().__init__()
self.timeout = aiohttp.ClientTimeout(total=3)
self.hosts = []
self.logger = logging.getLogger(__name__)
[docs] async def analyze_nodes(self, address, port):
"""
Scan a node and gather all its data including topics, services and Communications.
:param address: address of the ROS master
:param port: port of the ROS master
"""
async with aiohttp.ClientSession(loop=asyncio.get_event_loop(), timeout=self.timeout) as client:
full_host = 'http://' + str(address) + ':' + str(port)
ros_master_client = ServerProxy(full_host, loop=asyncio.get_event_loop(), client=client)
ros_host = ROSHost(address, port)
async with self.semaphore:
try:
code, msg, val = await ros_master_client.getSystemState('')
if code == 1:
self.hosts.append(ros_host)
if self.extended:
publishers_array = val[0]
subscribers_array = val[1]
services_array = val[2]
found_topics = await self.analyze_topic_types(ros_master_client) # In order to analyze the nodes topics are needed
self.extract_nodes(publishers_array, found_topics, 'pub', ros_host)
self.extract_nodes(subscribers_array, found_topics, 'sub', ros_host)
self.extract_services(services_array, ros_host)
for topic_name, topic_type in found_topics.items(): # key, value
current_topic = Topic(topic_name, topic_type)
comm = CommunicationROS(current_topic)
for node in ros_host.nodes:
if next((x for x in node.published_topics if x.name == current_topic.name), None) is not None:
comm.publishers.append(node)
if next((x for x in node.subscribed_topics if x.name == current_topic.name), None) is not None:
comm.subscribers.append(node)
ros_host.communications.append(comm)
await self.set_xmlrpcuri_node(ros_master_client, ros_host)
await client.close()
self.logger.warning('[+] ROS Host found at {}:{}'.format(ros_host.address, ros_host.port))
else:
self.logger.critical('[-] Error getting system state. Probably not a ROS Master')
except Exception as e:
# self.logger.error('[-] Error connecting to host ' + str(ros_host.address) + ':' + str(ros_host.port) + ' -> '+str(e) + '\n\tNot a ROS host')
pass # do not log anything to ensure a clean output
[docs] @staticmethod
def get_create_node(node_name, host):
"""
Generate new :class:`aztarna.ros.helpers.Node` objects, and if they exist just return them.
:param node_name: The name of the node to create or return
:param host: Current ROS host
:return: The newly created node or an existing that matches :attr:`node_name`
"""
node_name_attrs = [o.name for o in host.nodes]
if node_name not in node_name_attrs:
ret_node = Node(node_name)
host.nodes.append(ret_node)
else:
ret_node = next((x for x in host.nodes if x.name == node_name), None)
return ret_node
[docs] async def set_xmlrpcuri_node(self, ros_master_client, host):
"""
Once all node data is collected, set the xml.
:param ros_master_client: xml-rpc object for the ROS Master Client
"""
for node in host.nodes:
uri = await ros_master_client.lookupNode('', node.name)
if uri[2] != '':
regexp = re.compile(r'http://(?P<host>\S+):(?P<port>[0-9]{1,5})')
uri_groups = regexp.search(uri[2])
node.address = uri_groups.group('host')
node.port = uri_groups.group('port')
[docs] @staticmethod
async def analyze_topic_types(ros_master_client):
"""
Extract topic from ROS Master and disassemble them into topic name and topic type.
:param ros_master_client: xml-rpc object for the ROS Master Client
:return: A dictionary of topics. Key is the topic name and value the topic type
"""
topic_types = await ros_master_client.getTopicTypes('')
topics = {}
for topic_type_element in topic_types[2]:
topic_name = topic_type_element[0]
topic_type = topic_type_element[1]
topics[topic_name] = topic_type
return topics
[docs] async def scan_network(self):
"""
Scan the provided network (from args) searching for ROS nodes.
"""
try:
results = []
for port in self.ports:
for address in self.host_list:
results.append(self.analyze_nodes(address, port))
for result in await asyncio.gather(*results):
pass
except ValueError as e:
self.logger.error('Invalid address entered')
raise e
[docs] def scan(self):
"""
Call to :meth:`aztarna.ros.scanner.scan_network` asynchronously
"""
asyncio.get_event_loop().run_until_complete(self.scan_network())
[docs] async def scan_pipe(self):
async for line in RobotAdapter.stream_as_generator(asyncio.get_event_loop(), sys.stdin):
str_line = (line.decode()).rstrip('\n')
for port in self.ports:
await self.analyze_nodes(str_line, port)
[docs] def scan_pipe_main(self):
asyncio.get_event_loop().run_until_complete(self.scan_pipe())
[docs] def print_results(self):
"""
Print the information of a ROS system.
"""
for host in self.hosts:
for node in host.nodes:
print('\nNode: ' + str(node))
print('\n\t Published topics:')
for topic in node.published_topics:
print('\t\t * ' + str(topic))
print('\n\t Subscribed topics:')
for topic in node.subscribed_topics:
print('\t\t * ' + str(topic))
print('\n\t Services:')
for service in node.services:
print('\t\t * ' + str(service))
print('\nCommunications: ')
for i in range(len(host.communications)):
comm = host.communications[i]
print('\n\t Communication ' + str(i) + ':')
print('\t\t - Publishers:')
for node in comm.publishers:
print('\t\t\t' + str(node))
print('\t\t - Topic: ' + str(comm.topic))
print('\t\t - Subscribers:')
for node in comm.subscribers:
print('\t\t\t' + str(node))
print('\n\n')
[docs] def write_to_file(self, out_file):
"""
Write the information of a ROS system into the provided file.
:param out_file: The file where to write the results
"""
lines = []
header = 'Master Address;Master port;Node Name;Node Address;Port;Published Topics;Subscribed Topics;Services\n'
lines.append(header)
for host in self.hosts:
line = '{};{};;;;;;\n'.format(host.address, host.port)
if len(host.nodes) > 0:
for node in host.nodes:
for ptopic in node.published_topics:
for stopic in node.subscribed_topics:
for service in node.services:
line = '{};{};{};{};{};{};{};{}\n'.format(host.address, host.port, node.name, node.address, node.port, ptopic,
stopic, service)
lines.append(line)
with open(out_file, 'w') as file:
file.writelines(lines)