Source code for aztarna.helpers

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
from platform import system as system_name  # Returns the system/OS name
from subprocess import call as system_call  # Execute a shell command

class HelpersLINQ:
    """A helper class emulating useful .NET (LINQ-style) methods."""

    @staticmethod
    def distinct(sequence):
        """
        Lazily yield the items of ``sequence`` with duplicates removed.

        The first occurrence of every item is kept and the original order is
        preserved. Items are tracked in a ``set``, so they must be hashable;
        an unhashable item raises ``TypeError`` when it is reached.

        :param sequence: Any iterable of hashable items
        :return: A generator over the unique items, in first-seen order
        """
        seen = set()
        for item in sequence:
            if item not in seen:
                seen.add(item)
                yield item
class HelpersNetWorking:
    """A helper class that checks networking related data."""

    @staticmethod
    def ping(host):
        """
        Replicate the command line ping utility by sending a single echo request.

        The system ``ping`` executable is run with a count of one packet. Its
        standard output is discarded; standard error is left untouched.

        :param host: Host to ping to
        :return: ``True`` if the ping command exits with status 0 (the
                 destination was reached), ``False`` otherwise
        :raises OSError: If the ``ping`` executable cannot be started
        """
        # Imported locally so this method does not depend on changes to the
        # module-level import block.
        from subprocess import DEVNULL

        # Ping count option depends on the OS: "-n" on Windows, "-c" elsewhere.
        count_flag = '-n' if system_name().lower() == 'windows' else '-c'

        # Building the command. Ex: "ping -c 1 google.com"
        command = ['ping', count_flag, '1', host]

        # DEVNULL is portable; the literal path "/dev/null" does not exist on
        # Windows, which this method otherwise explicitly supports.
        return system_call(command, stdout=DEVNULL) == 0
class PortScanner:
    """A base class that provides methods to check correct port scans."""

    @staticmethod
    async def check_port(ip, port):
        """
        Check if a certain port is open by attempting a TCP connection.

        The connection attempt is given at most 3 seconds. Any failure to
        connect (refused, unreachable, timeout, invalid address...) is
        reported as "not open". Task cancellation is propagated to the caller.

        :param ip: The host's IP address
        :param port: The host's port
        :return: The scanned port if open, else None
        """
        conn = asyncio.open_connection(ip, port)
        try:
            _reader, writer = await asyncio.wait_for(conn, timeout=3)
        except asyncio.CancelledError:
            # Never swallow cancellation. On Python < 3.8 CancelledError is a
            # subclass of Exception, so it has to be re-raised explicitly
            # before the generic handler below.
            raise
        except Exception:
            # Best-effort probe: every connection failure means "not open".
            # Unlike a bare ``except:``, this lets KeyboardInterrupt and
            # SystemExit through.
            return None
        writer.close()
        return port

    @staticmethod
    async def check_port_sem(sem, ip, port):
        """
        Call :meth:`check_port` while holding a semaphore, to avoid too many
        simultaneously open connections.

        :param sem: Semaphore (async context manager) limiting concurrency
        :param ip: The host's IP address
        :param port: The host's port
        :return: The scanned port if open, else None
        """
        async with sem:
            return await PortScanner.check_port(ip, port)

    @staticmethod
    async def scan_host(address, start_port, end_port, max_conns=400):
        """
        Scan a range of ports on a host concurrently.

        :param address: IPv4 address to scan
        :param start_port: First port value to scan (inclusive)
        :param end_port: Upper bound of the scan; the ports are taken from
                         ``range(start_port, end_port)``, so ``end_port``
                         itself is NOT scanned
        :param max_conns: Maximum simultaneous number of connections
        :return: List of the open ports found, in ascending port order
        """
        sem = asyncio.Semaphore(max_conns)
        probes = [
            PortScanner.check_port_sem(sem, address, port)
            for port in range(start_port, end_port)
        ]
        # gather() schedules every probe and returns results in input order.
        responses = await asyncio.gather(*probes)
        return [port for port in responses if port is not None]