Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • CVE-Shodan-Update-5-2
  • Pass-Email-Breach
  • Pass_strength
  • Shodan
  • breach-module
  • breach-module-2-23
  • main
  • master
  • module/breachv2
9 results

Target

Select target project
  • data-guardian/senior-project
1 result
Select Git revision
  • CVE-Shodan-Update-5-2
  • Pass-Email-Breach
  • Pass_strength
  • Shodan
  • breach-module
  • breach-module-2-23
  • main
  • master
  • module/breachv2
9 results
Show changes
Commits on Source (2)
Showing
with 450 additions and 2 deletions
import requests
import netifaces
import ipaddress
import threading
import itertools
import time
import sys
import nmap
def list_network_interfaces():
"""
List all network interfaces and their associated IP addresses.
:return: Dictionary of interfaces with their IP addresses and subnet masks.
"""
interfaces = {}
for interface in netifaces.interfaces():
addrs = netifaces.ifaddresses(interface)
if netifaces.AF_INET in addrs:
ipv4_info = addrs[netifaces.AF_INET][0]
ip = ipv4_info.get('addr')
netmask = ipv4_info.get('netmask')
if ip and netmask:
network = ipaddress.ip_network(f"{ip}/{netmask}", strict=False)
interfaces[interface] = f"{network}"
return interfaces
def get_user_selected_interface(interfaces):
"""
Prompt user to select an interface from a list.
:param interfaces: Dictionary of interfaces and their IP ranges.
:return: CIDR notation of the selected network range.
"""
print("Available network interfaces and their IP ranges:")
for idx, (interface, ip_range) in enumerate(interfaces.items(), start=1):
print(f"{idx}. Interface: {interface} - IP Range: {ip_range}")
while True:
choice = input("Enter the number of the interface you want to use: ")
if choice.isdigit() and 1 <= int(choice) <= len(interfaces):
selected_interface = list(interfaces.keys())[int(choice) - 1]
return interfaces[selected_interface]
else:
print("Invalid selection, please try again.")
done = False
def spinner():
spinner_chars = itertools.cycle(['-', '/', '|', '\\'])
while not done:
sys.stdout.write(next(spinner_chars) + '\r') # write the next character and return to start of line
sys.stdout.flush() # flush stdout buffer
time.sleep(0.1)
sys.stdout.write('Done scanning!\n')
def perform_scan(target, options="-sV"):
global done
thread = threading.Thread(target=spinner)
thread.start()
scanner = nmap.PortScanner()
try:
scanner.scan(hosts=target, arguments=options)
print(f"Scan on {target} completed.")
except Exception as e:
print(f"Scanning failed: {e}")
finally:
done = True
thread.join() # Wait for the spinner to finish
hosts_list = list(scanner.all_hosts())
scan_results = []
for host in hosts_list:
if scanner[host].state() == "up":
host_data = {
"ip_address": host,
"hostname": scanner[host].hostname() or "None",
"mac_address": scanner[host]['addresses'].get('mac') or "None",
"services": []
}
for proto in scanner[host].all_protocols():
for port in scanner[host][proto].keys():
service = scanner[host][proto][port]
service_data = {
"port": port,
"name": service.get('name') or "None",
"state": service.get('state') or "None",
"product": service.get('product') or "None",
"version": service.get('version') or "None",
"extra_info": service.get('extrainfo') or "None"
}
host_data["services"].append(service_data)
scan_results.append(host_data)
return scan_results
def send_data_to_backend(api_url, login_url, data, username, password):
"""
Send scan data to the Django backend API.
:param api_url: URL of the Django API endpoint.
:param data: Data to be sent as JSON.
:param username: Username for authentication.
:param password: Password for authentication.
"""
token = authenticate(login_url, username, password)
if not token:
print("Failed to authenticate.")
return
headers = {'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'}
response = requests.post(api_url, json={"devices": data}, headers=headers)
if response.status_code == 200:
print("Data sent successfully.")
elif response.status_code == 401:
print("Authentication failed or token expired. Re-authenticating...")
token = authenticate(username, password)
if not token:
print("Failed to re-authenticate.")
return
headers['Authorization'] = f'Bearer {token}'
response = requests.post(api_url, json={"devices": data}, headers=headers)
if response.status_code == 200:
print("Data sent successfully after re-authentication.")
else:
print(f"Failed to send data after re-authentication: {response.status_code} - {response.text}")
else:
print(f"Failed to send data: {response.status_code} - {response.text}")
def authenticate(login_url, username, password):
"""
Authenticate with the Django backend and retrieve a token.
:param login_url: URL of the Django API login endpoint.
:param username: Username for login.
:param password: Password for login.
:return: Token or None if authentication fails.
"""
try:
response = requests.post(login_url, json={"username": username, "password": password})
if response.status_code == 200:
return response.json().get("access") # Match the key to your JavaScript handling
else:
print(f"Authentication failed: {response.status_code} - {response.text}")
except requests.exceptions.RequestException as e:
print(f"Failed to connect to {login_url}: {e}")
return None
def main():
base_url = "http://localhost:8000/api/auth/"
login_url = f"{base_url}login/"
api_url = "http://localhost:8000/api/networkscanner/results/"
print("Please log in to the system.")
username = input("Username: ")
password = input("Password: ")
token = authenticate(login_url, username, password)
if token:
interfaces = list_network_interfaces()
if interfaces:
selected_network_range = get_user_selected_interface(interfaces)
print(f"Selected network range: {selected_network_range}")
print("Starting the network scan...")
scan_results = perform_scan(selected_network_range, "-sV")
if scan_results:
print("Scan complete, sending data to backend...")
send_data_to_backend(api_url, login_url, scan_results, username, password) # Pass username and password
else:
print("No devices found during scan.")
else:
print("No valid network interfaces found. Please check your network connection.")
else:
print("Exiting due to failed authentication.")
if __name__ == "__main__":
main()
......@@ -6,11 +6,14 @@ from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.permissions import AllowAny
# This view extends TokenObtainPairView to handle token creation on user login.
class MyTokenObtainPairView(TokenObtainPairView):
permission_classes = (permissions.AllowAny,) # Allow any user to access this view
# You can customize this view further if needed, e.g., adding extra data to the token response.
class RegisterView(APIView):
permission_classes = (AllowAny,)
......
No preview for this file type
from django.contrib import admin
from .models import ScanResult, Service, Recommendation
# Register your models here.
admin.site.register(ScanResult)
admin.site.register(Service)
admin.site.register(Recommendation)
from django.apps import AppConfig
class NetworkScannerConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "network_scanner"
# backend/networkscanner/models.py
from django.db import models
class ScanResult(models.Model):
ip_address = models.CharField(max_length=100)
hostname = models.CharField(max_length=255)
mac_address = models.CharField(max_length=100)
def __str__(self):
return f"{self.ip_address} - {self.hostname}"
class Service(models.Model):
scan_result = models.ForeignKey(ScanResult, related_name='services', on_delete=models.CASCADE)
port = models.IntegerField()
name = models.CharField(max_length=100)
state = models.CharField(max_length=50)
product = models.CharField(max_length=100, blank=True)
version = models.CharField(max_length=50, blank=True)
extra_info = models.CharField(max_length=200, blank=True)
def __str__(self):
return f"Port {self.port} - {self.name}"
class Recommendation(models.Model):
scan_result = models.ForeignKey(ScanResult, related_name='recommendations', on_delete=models.CASCADE)
recommendation_text = models.TextField()
def __str__(self):
return f"Recommendation for {self.scan_result.ip_address}"
# backend/networkscanner/serializers.py
from rest_framework import serializers
from .models import ScanResult, Service, Recommendation
class ServiceSerializer(serializers.ModelSerializer):
class Meta:
model = Service
fields = ['port', 'name', 'state', 'product', 'version', 'extra_info']
class ScanResultSerializer(serializers.ModelSerializer):
services = ServiceSerializer(many=True)
class Meta:
model = ScanResult
fields = ['ip_address', 'hostname', 'mac_address', 'services']
def create(self, validated_data):
print("Creating ScanResult with data:", validated_data)
services_data = validated_data.pop('services', [])
scan_result = ScanResult.objects.create(**validated_data)
print("Created ScanResult, now creating services...")
for service_data in services_data:
print("Creating service with data:", service_data)
Service.objects.create(scan_result=scan_result, **service_data)
print("All services created.")
return scan_result
class RecommendationSerializer(serializers.ModelSerializer):
class Meta:
model = Recommendation
fields = '__all__'
\ No newline at end of file
from django.test import TestCase
# Create your tests here.
from django.urls import path
from .views import recommendations_view, ResultsView #fetch_recommendations
urlpatterns = [
path('results/', ResultsView.as_view(), name='results'),
path('recommendations/', recommendations_view, name='recommendations_view'),
#path('fetch-recommendations/', fetch_recommendations, name='fetch_recommendations'),
# Add other URL patterns as needed
]
import openai
def prepare_scan_data_for_openai(scan_result):
"""
Format the scan result data into a structured summary that the AI can understand.
"""
services_info = []
for service in scan_result.services.all():
service_info = f"Service: {service.name}, Port: {service.port}, Product: {service.product}, Version: {service.version}"
services_info.append(service_info)
scan_data = f"Analyze the following network scan and provide network recommendations for IP {scan_result.ip_address}: " + ", ".join(services_info)
return scan_data
def get_recommendations_from_openai(scan_result):
"""
Uses OpenAI's chat completions to generate security recommendations based on scan results.
"""
openai.api_key = 'sk-proj-mWLHYmnxOBQUZoTRzVbHT3BlbkFJYZn5zyGI6ZCvTzHUwsAh' # Ensure this is securely managed
prompt = prepare_scan_data_for_openai(scan_result)
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo", # Specify the appropriate chat model
messages=[
{"role": "system", "content": "You are an AI trained to provide network security advice."},
{"role": "user", "content": prompt}
]
)
return response['choices'][0]['message']['content'].strip()
except Exception as e:
print(f"Failed to generate recommendations: {str(e)}")
return "Failed to generate recommendations."
from rest_framework.views import APIView
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework import status
from .models import ScanResult, Recommendation
from .serializers import ScanResultSerializer
from .utils import get_recommendations_from_openai
class ResultsView(APIView):
permission_classes = [IsAuthenticated]
def post(self, request):
serializer = ScanResultSerializer(data=request.data.get('devices', []), many=True)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def recommendations_view(request):
all_scans = ScanResult.objects.all()
if not all_scans:
return Response({'error': 'No scan results available'}, status=status.HTTP_404_NOT_FOUND)
all_recommendations = []
for scan in all_scans:
recommendations = Recommendation.objects.filter(scan_result=scan)
if recommendations.exists():
all_recommendations.extend([{
'id': rec.id,
'ip_address': rec.scan_result.ip_address,
'recommendation_text': rec.recommendation_text
} for rec in recommendations])
else:
# Generate recommendations if none exist
recommendation_text = get_recommendations_from_openai(scan)
if recommendation_text and not recommendation_text.startswith("Failed"):
new_rec = Recommendation.objects.create(scan_result=scan, recommendation_text=recommendation_text)
all_recommendations.append({
'id': new_rec.id,
'ip_address': scan.ip_address,
'recommendation_text': recommendation_text
})
if all_recommendations:
return Response(all_recommendations)
else:
return Response({'error': 'Failed to generate recommendations for any scans'},
status=status.HTTP_500_INTERNAL_SERVER_ERROR)
......@@ -35,6 +35,7 @@ INSTALLED_APPS = [
"rest_framework_simplejwt",
"breach_check",
"shodan_scan",
"network_scanner"
]
......
......@@ -12,5 +12,6 @@ urlpatterns = [
path('haveibeenpwned/<str:identifier>/', have_i_been_pwned, name='have_i_been_pwned'),
path('haveibeenpwned/passwords/<str:identifier>/', breached_password_info, name='breached_password_info'),
path('', include('shodan_scan.urls')),
path('api/networkscanner/', include('network_scanner.urls')),
# Add other URL patterns as needed
]
......@@ -69,6 +69,12 @@ const _nav = [
to: '/scan',
icon: <CIcon icon={cilShieldAlt} customClassName="nav-icon" />,
},
{
component: CNavItem,
name: 'Network Recommendations',
to: '/recommendations',
icon: <CIcon icon={cilShieldAlt} customClassName="nav-icon" />,
},
{
component: CNavTitle,
name: 'Theme',
......
......@@ -13,8 +13,9 @@ axios.interceptors.response.use(
(response) => response,
(error) => {
if (error.response && error.response.status === 401) {
console.error('Unauthorized access detected.')
// Redirect to the login page
window.location.href = '/login'
//window.location.href = '/login' FIX THIS LATER, not the answer to protected routes.
}
return Promise.reject(error)
},
......
......@@ -56,6 +56,9 @@ const PasswordBreaches = React.lazy(() => import('./views/passwords/Breaches'))
const TrackedPasswords = React.lazy(() => import('./views/passwords/BreachesPasswords'))
const Profile = React.lazy(() => import('./views/pages/profile/Profile'))
const shodan_scan = React.lazy(() => import('./views/shodan/shodanscan'))
const RecommendationsPage = React.lazy(() =>
import('./views/NetworkScanner/RecommendationComponent'),
)
const routes = [
{ path: '/', exact: true, name: 'Home' },
......@@ -108,6 +111,7 @@ const routes = [
{ path: '/password-breaches/view', name: 'View Breached Passwords', element: TrackedPasswords },
{ path: '/profile', name: 'Profile', element: Profile },
{ path: '/scan', name: 'Shodan Scan', element: shodan_scan },
{ path: '/recommendations', name: 'Recommendations', element: RecommendationsPage },
]
export default routes
import React, { useState, useEffect } from 'react'
import axios from 'axios'
const RecommendationsPage = () => {
const [recommendations, setRecommendations] = useState([])
useEffect(() => {
const fetchRecommendations = async () => {
const token = localStorage.getItem('token')
console.log('Token:', token)
if (!token) {
console.error('No token available.')
return
}
try {
const response = await axios.get(
'http://localhost:8000/api/networkscanner/recommendations/',
{
headers: {
Authorization: `Bearer ${token}`,
},
},
)
setRecommendations(response.data)
} catch (error) {
console.error('Error fetching recommendations:', error)
// Handle different errors appropriately here without redirecting
if (error.response) {
console.error('Error status:', error.response.status) // Log specific error status
}
}
}
fetchRecommendations()
}, [])
return (
<div>
<h2>Network Security Recommendations</h2>
<table className="table table-striped table-responsive mt-3">
<thead>
<tr>
<th>IP Address</th>
<th>Port</th>
<th>Service Name</th>
<th>Recommendation</th>
</tr>
</thead>
<tbody>
{recommendations.map((rec) => (
<tr key={rec.id}>
<td>{rec.ip_address}</td>
<td>{rec.port}</td>
<td>{rec.service_name}</td>
<td>{rec.recommendation_text}</td>
</tr>
))}
</tbody>
</table>
</div>
)
}
export default RecommendationsPage