Source code for aztarna.cmd

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import asyncio
import logging
import re
from argparse import ArgumentParser
import argcomplete
import uvloop

from aztarna.ros.industrial.scanner import ROSIndustrialScanner
from aztarna.industrialrouters.scanner import IndustrialRouterAdapter

# asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# Let this module's own logger emit DEBUG records regardless of the level
# configured later on the root logger.
_module_logger = logging.getLogger(__name__)
_module_logger.setLevel(logging.DEBUG)

def _build_parser():
    """
    Create the command line argument parser for aztarna.

    :return: A configured :class:`argparse.ArgumentParser`.
    """
    parser = ArgumentParser(description='Aztarna, a reconnaissance tool for robots and robot components.')
    parser.add_argument('-t', '--type',
                        help='<ROS/ros/SROS/sros/ROS2/ros2/IROUTERS/irouters> Scan ROS, SROS, ROS2 hosts or '
                             'Industrial routers',
                        required=True)
    parser.add_argument('-a', '--address', help='Single address or network range to scan.')
    parser.add_argument('-p', '--ports', help='Ports to scan (format: 13311 or 11111-11155 or 1,2,3,4)',
                        default='11311')
    parser.add_argument('-i', '--input_file', help='Input file of addresses to use for scanning')
    parser.add_argument('-o', '--out_file', help='Output file for the results')
    parser.add_argument('-e', '--extended', help='Extended scan of the hosts', action='store_true')
    parser.add_argument('-r', '--rate', help='Maximum simultaneous network connections', default=100, type=int)
    parser.add_argument('-d', '--domain',
                        help='ROS 2 DOMAIN ID (ROS_DOMAIN_ID environmental variable). Only applies to ROS 2.',
                        type=int)
    parser.add_argument('--daemon', help='Use rclpy daemon (coming from ros2cli).', action='store_true')
    parser.add_argument('--hidden', help='Show hidden ROS 2 nodes. By default filtering _ros2cli*',
                        action='store_true')
    parser.add_argument('--shodan', help='Use shodan for the scan types that support it.', action='store_true')
    parser.add_argument('--api-key', help='Shodan API Key')
    # NOTE(review): --verbose has no action, so it expects a value, and main()
    # never reads it. Kept unchanged to avoid altering the CLI.
    parser.add_argument('--verbose', help='Verbose output')
    parser.add_argument('--passive', help='Passive search for ROS2', action='store_true')
    return parser


def _parse_ports(spec):
    """
    Convert the ``--ports`` argument into a list of port numbers.

    Accepted forms are a single port (``"11311"``), a range (``"11111-11155"``,
    both ends included) and a comma separated list (``"1,2,3,4"``).

    :param spec: Raw port specification as typed on the command line.
    :return: List of integer ports, in the order given.
    :raises ValueError: If the text is not one of the accepted forms, a range
        is reversed, or a port lies outside 0-65535.
    """
    spec = spec.strip()
    if '-' in spec:
        low_text, _, high_text = spec.partition('-')
        low, high = int(low_text), int(high_text)
        if low > high:
            raise ValueError('invalid port range: {}'.format(spec))
        # Check the bounds before materialising the list so an absurd range
        # cannot allocate a huge list.
        candidates = (low, high)
        ports = range(low, high + 1)  # +1: the upper bound is part of the range
    else:
        candidates = ports = [int(port) for port in spec.split(',')]

    for port in candidates:
        if not 0 <= port <= 65535:
            raise ValueError('port out of range (0-65535): {}'.format(port))
    return list(ports)


def _create_scanner(scan_type, args):
    """
    Instantiate the scanner matching the requested scan type.

    :param scan_type: Upper-cased value of the ``--type`` argument.
    :param args: Parsed command line arguments; the Shodan options are read
        for the industrial routers scanner.
    :return: A scanner instance, or ``None`` when the type is not recognised.
    """
    # The ROS, SROS and ROS2 scanners are imported only when selected, as in
    # the original code.
    if scan_type == 'ROS':
        from aztarna.ros.ros import ROSScanner
        return ROSScanner()
    if scan_type == 'SROS':
        from aztarna.ros.sros import SROSScanner
        return SROSScanner()
    if scan_type == 'IROUTERS':
        scanner = IndustrialRouterAdapter()
        if args.shodan:
            scanner.use_shodan = True
            scanner.shodan_api_key = args.api_key
            scanner.initialize_shodan()
        return scanner
    if scan_type == 'ROSIN':
        return ROSIndustrialScanner()
    if scan_type == 'ROS2':
        from aztarna.ros.ros2.scanner import ROS2Scanner
        return ROS2Scanner()
    return None


def main():
    """
    Command line entry point.

    Parses the arguments, builds the scanner for the requested type, loads the
    targets, runs the scan and writes or prints the results. Any exception
    raised while doing so is logged and re-raised.
    """
    logging.basicConfig(level=logging.INFO, format="%(name)s - %(message)s")
    logger = logging.getLogger(__name__)

    parser = _build_parser()
    argcomplete.autocomplete(parser)
    args = parser.parse_args()

    # Match the type case-insensitively for every scanner, not only for some.
    scan_type = args.type.upper()

    try:
        scanner = _create_scanner(scan_type, args)
        if scanner is None:
            logger.critical('Invalid type selected')
            return

        if args.input_file:
            try:
                scanner.load_from_file(args.input_file)
            except FileNotFoundError:
                logger.critical('Input file not found')
                # Without the input file there are no targets to scan.
                return
        elif args.address:
            scanner.load_range(args.address)
        elif scan_type != 'ROS2':
            # No file and no address: every type except ROS2 hands control to
            # the scanner's pipe mode (presumably reading targets from stdin,
            # TODO confirm) and stops here. ROS2 continues to scan() below.
            # NOTE(review): this runs before ports/rate/extended are applied.
            scanner.scan_pipe_main()
            return

        try:
            scanner.ports = _parse_ports(args.ports)
        except ValueError as e:
            # Best effort, as before: report the problem and keep whatever
            # ports the scanner already has.
            logger.error('[-] Error: ' + str(e))

        scanner.extended = args.extended
        scanner.rate = args.rate
        scanner.domain = args.domain
        if args.daemon:
            scanner.use_daemon = True
        if args.hidden:
            scanner.hidden = True
        if args.passive:
            scanner.passive = True

        scanner.scan()

        if args.out_file:
            scanner.write_to_file(args.out_file)
        elif args.extended:
            # Without an output file, results are printed only for extended scans.
            scanner.print_results()
    except Exception:
        logger.critical('Exception occurred during execution')
        raise
# Run the command line interface only when this file is executed as a script.
if __name__ == '__main__':
    main()