Actual combat | Python write port scanner

**Introduction: **

This study note will record the learning route of using python to write Scan, record the entire python scanner writing process, record from the first line of code to the latest version, and explain in detail the technology used in each version update

Version 1.0 (socket library)

Use socket library for port scanning:

Update log:

Call the library in the socket to scan the target and count the open status of the target port

#! /usr/bin/python
# - *- coding: UTF-8-*-import sys

from socket import*
# import socket

# Port scanning module
def portScan(ip,portStart,portEnd):
 open_ports=[]for port inrange(int(portStart),int(portEnd)+1):
 # Show scan percentage
 percent =float(port)*100/float(int(portEnd))
 sys.stdout.write("%.2f"%percent)
 sys.stdout.write("%\r")
 sys.stdout.flush()
 # Send data, try to establish a connection
 sock =socket(AF_INET, SOCK_STREAM)
 # sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 sock.settimeout(10)
 result = sock.connect_ex((ip,port))if result ==0:
  open_ports.append(port)
 pass
 print open_ports
 pass

# Get ip and port scan range
def main():
 ip = sys.argv[1]
 port = sys.argv[2].split("-")
 portStart = port[0]
 portEnd = port[1]portScan(ip,portStart,portEnd)if __name__ =='__main__':main()

Version 1.1 (Threadpool multithreading)

Use Threadpool for multi-threaded port scanning:

Update log:

Call the Threadpool module in python, set the port of multi-thread and multi-target to scan, increase the efficiency of scanning

#! /usr/bin/python
# - *- coding: UTF-8-*-import socket
import sys
from datetime import datetime
from multiprocessing.dummy import Pool as ThreadPool

remote_server = sys.argv[1]
targetport = sys.argv[2].split("-")
startPort = targetport[0]
endPort = targetport[1]
remote_server_ip = socket.gethostbyname(remote_server)
ports =[]

print '-'*60
print 'Targeting:', remote_server_ip +'To scan'
print '-'*60

socket.setdefaulttimeout(0.5)

def scan_port(port):try:
  s = socket.socket(2,1)
  res = s.connect_ex((remote_server_ip,port))if res ==0: #If the port is open, send hello to get the banner
   print 'Port {}: OPEN'.format(port)
  s.close()
 except Exception,e:
  print str(e.message)for i inrange(int(startPort),int(endPort)+1):
 ports.append(i)

# Check what time the scan started
t1 = datetime.now()

# Create thread
pool =ThreadPool(processes =32) #Set the number of threads
results = pool.map(scan_port,ports) #Set the name of the function that needs to use multithreading, and pass the set of parameters. The function will pass the set of passed parameters to the function in sections.
pool.close()
pool.join()

print 'When this port scan is shared', datetime.now()- t1

Demo:

Version 1.2 (optparse library)

Use optparse to parse the commands of the python process

Update log:

Call python's optparse library to specify parameter names using "–host" and other methods in the process of running the script

#! /usr/bin/python
# - *- coding: UTF-8-*-import optparse
import socket
import sys
from datetime import datetime
from multiprocessing.dummy import Pool as ThreadPool

print "------------------------------------------------------------------------------------------"
print "|             ___              _   _                            _                        |"
print "|   ___ ___  / _ \  ___  _ __ | |_| |_ _   _   _ __   ___  _ __| |_ ___  ___ __ _ _ __   |"
print "|  / __/ _ \| | | |/ _ \| '_ \| __| __| | | | | '_ \ / _ \| '__| __/ __|/ __/ _` | '_ \  |"
print "| | (_| (_) | |_| | (_) | | | | |_| |_| |_| | | |_) | (_) | |  | |_\__ \ (_| (_| | | | | |"
print "|  \___\___/ \___/ \___/|_| |_|\__|\__|\__, | | .__/ \___/|_|   \__|___/\___\__,_|_| |_| |"
print "|                                      |___/  |_|                                        |"
print "------------------------------------------------------------------------------------------"

parse=optparse.OptionParser(usage='python portscan.py -H 127.0.0.1 -P 60,90 -T 32',version="co0ontty portscan version:1.2")
parse.add_option('-H','--Host',dest='host',action='store',type=str,metavar='host',help='Enter Host!!')
parse.add_option('-P','--Port',dest='port',type=str,metavar='port',default='1,10000',help='Enter Port!!')
parse.add_option('-T','--Thread',dest='thread',type=int,metavar='thread')
parse.set_defaults(thread=32)  
options,args=parse.parse_args()

# optparse.OptionParser usage=''Introduce how to use
# dest='host',Pass parameters to a variable named host
# type='str',The type of parameters passed
# metavar='host',The name after the parameter in help
# help='', The statement in help
# parse.set_defaults(thread=32)Another way to set parameter default values
# When you have defined all the command line parameters, we need to call parse_args()Method add_option()Parameters passed by the function in turn: options,args=parse.parse_args()

portList = options.port.split(",")
startPort = portList[0]
endPort = portList[1]
remote_server_ip = socket.gethostbyname(options.host)
# remote_server_info = socket.gethostbyname_ex(host)
ports =[]
openPort =[]

print 'Targeting:'+remote_server_ip +'get on'+str(options.thread)+'Thread scan'
socket.setdefaulttimeout(0.5)

def scan_port(port):try:
  s = socket.socket(2,1)
  res = s.connect_ex((remote_server_ip,port))if res ==0: 
   openPort.append(port)
  s.close()
 except Exception,e:
  print str(e.message)for i inrange(int(startPort),int(endPort)+1):
 ports.append(i)

# Scan start
t1 = datetime.now()
# Create thread
pool =ThreadPool(processes =int(options.thread))
results = pool.map(scan_port,ports)
pool.close()
pool.join()

print openPort
print 'When this port scan is shared', datetime.now()- t1

Version 1.3 (gethostbyname_ex)

Use the gethostbyname_ex function to get the target's domain name, ip and other information

Update log:

1、 Use the gethostbyname_ex function to resolve the entered domain name

2、 Use the -D parameter to pass the domain name, the scanner will perform port scanning on the ip related to the domain name

3、 Modularized the code

#! /usr/bin/python
# - *- coding: UTF-8-*-import socket,sys,optparse
from datetime import datetime
from multiprocessing.dummy import Pool as ThreadPool

print "------------------------------------------------------------------------------------------"
print "|             ___              _   _                            _                        |"
print "|   ___ ___  / _ \  ___  _ __ | |_| |_ _   _   _ __   ___  _ __| |_ ___  ___ __ _ _ __   |"
print "|  / __/ _ \| | | |/ _ \| '_ \| __| __| | | | | '_ \ / _ \| '__| __/ __|/ __/ _` | '_ \  |"
print "| | (_| (_) | |_| | (_) | | | | |_| |_| |_| | | |_) | (_) | |  | |_\__ \ (_| (_| | | | | |"
print "|  \___\___/ \___/ \___/|_| |_|\__|\__|\__, | | .__/ \___/|_|   \__|___/\___\__,_|_| |_| |"
print "|                                      |___/  |_|                                        |"
print "|                             Blog: https://co0ontty.github.io                           |"
print "------------------------------------------------------------------------------------------"

def Ip_scan_port(port):
 socket.setdefaulttimeout(0.5)
 remote_server_ip = socket.gethostbyname(Ip_target)try:
 s = socket.socket(2,1)
 res = s.connect_ex((remote_server_ip,port))if res ==0: 
  openPort.append(port)
 s.close()
 except Exception,e:
 print str(e.message)
def Domain_scan_port(port):
 socket.setdefaulttimeout(0.5)for remote_server_ip in Ip_from_domain:try:
  s = socket.socket(2,1)
  res = s.connect_ex((remote_server_ip,port))if res ==0:
  # Domain_res =str(remote_server_ip)+":"+str(port)
  Domain_result.append(str(remote_server_ip)+":"+str(port))
  s.close()
  pass
 except Exception as e:
  print str(e.message)    

def moreInfo(domainName):
 global Ip_from_domain
 Ip_from_domain =[]
 domainNames = socket.gethostbyname_ex(domainName)
 print "[+]Start domain Scan"for x in domainNames:iftype(x)== list:for i in x:
  print "Find : "+str(i)+"\n"+" IP :"+str(socket.gethostbyname(i))
  Ip_from_domain.append(socket.gethostbyname(i))else:
  Ip_from_domain.append(socket.gethostbyname(x))
 Ip_from_domain =list(set(Ip_from_domain)) #Deduplication
 start_domain_pool()

def start_IP_Pool():
 pool =ThreadPool(processes =int(thread))
 results = pool.map(Ip_scan_port,ports)
 pool.close()
 pool.join()
 print openPort

def start_domain_pool():
 print "[+] Start portscan on those IP from "+str(startPort)+" to "+str(endPort)
 pool =ThreadPool(processes =int(thread))
 results = pool.map(Domain_scan_port,ports)
 pool.close()
 pool.join()for x in Domain_result:
 print "Find open port :"+str(x)
 pass

def main():
 parse=optparse.OptionParser(usage='python portscan.py -H 127.0.0.1 -P 60,90 -T 32 or python portscan.py -D www.baidu.com -P 60,90 -T 32 ',version="co0ontty portscan version:1.0")
 parse.add_option('-H','--Host',dest='host',action='store',type=str,default="0")
 parse.add_option('-P','--Port',dest='port',type=str,default='1,10000')
 parse.add_option('-T','--Thread',dest='thread',type=int)
 parse.add_option('-D','--Domain',dest='domainName',type=str,default="0")
 parse.set_defaults(thread=32)  
 options,args=parse.parse_args()

 global remote_server_ip,openPort,domainName,Ip_target,thread,openPort,ports,Domain_result,startPort,endPort
 Ip_target = options.host
 domainName = options.domainName
 portList = options.port.split(",")
 thread = options.thread
 startPort = portList[0]
 endPort = portList[1]
 ports =[]
 openPort =[]
 Domain_result =[]for i inrange(int(startPort),int(endPort)+1):
  ports.append(i)if domainName =="0":
 print "[+]port scan :"+Ip_target
 start_IP_Pool()else:moreInfo(domainName)
 pass
if __name__ =='__main__':main()

Original contributing authors: co0ontty

Author's blog: co0ontty.github.io

Recommended Posts

Actual combat | Python write port scanner
Python write Tetris
Write gui in python
Python open read and write
Python multi-threaded port scanning tool
Python write breakpoint download software