Guides & Examples
Advanced guides for configuring TorScan, implementing custom solutions, and following security best practices.
Advanced Configuration
Tor Circuit Management
Optimize Tor circuit rotation for better anonymity and performance.
# config/tor_settings.yaml
# Tor settings for TorScan: circuit rotation, timeouts, bridges and entry guards.
# NOTE(review): nesting below is reconstructed from a single-line listing; the four
# sections are assumed to sit directly under `tor:` — confirm against the real file.
tor:
  # Circuit rotation settings
  circuit_rotation:
    enabled: true
    rotation_interval: 300  # seconds
    max_circuits: 10
    countries_exclude: ["US", "GB", "CA", "AU", "NZ"]  # Five Eyes
    countries_prefer: ["CH", "IS", "RO", "CZ"]

  # Performance tuning
  performance:
    concurrent_circuits: 5
    connection_timeout: 30  # presumably seconds — TODO confirm
    read_timeout: 60        # presumably seconds — TODO confirm
    retry_attempts: 3

  # Security settings
  security:
    use_bridges: true
    bridge_type: "obfs4"
    isolate_streams: true
    new_circuit_on_error: true

  # Entry guards
  entry_guards:
    num_guards: 3
    guard_lifetime: 60  # days
    restrict_guards: true
Implementing Circuit Monitoring
# plugins/circuit_monitor.py
import time

from stem import Signal
from stem.control import Controller


class CircuitMonitor:
    """Inspect and rotate Tor circuits through the Tor control port (via stem)."""

    def __init__(self, control_port=9051, password='your_password'):
        """Connect to the Tor control port and authenticate.

        Args:
            control_port: Tor ControlPort to connect to.
            password: Control-port password. The default is a placeholder
                and must be replaced with the real value.

        Raises:
            Whatever stem raises when the connection or authentication fails.
        """
        self.controller = Controller.from_port(port=control_port)
        try:
            self.controller.authenticate(password=password)
        except Exception:
            # Do not leak the control socket when authentication fails.
            self.controller.close()
            raise
        self.circuit_times = {}

    def get_circuit_info(self):
        """Get current circuit information.

        Returns:
            A list with one dict per BUILT circuit, holding the keys
            'id', 'path_length', 'exit_country', 'build_time' and 'purpose'.
        """
        circuit_info = []
        for circuit in self.controller.get_circuits():
            if circuit.status != 'BUILT':
                continue

            # stem exposes circuit.path as (fingerprint, nickname) tuples.
            path = [fingerprint for fingerprint, _nickname in circuit.path]
            exit_country = self._exit_country(path[-1]) if path else 'Unknown'

            circuit_info.append({
                'id': circuit.id,
                'path_length': len(path),
                'exit_country': exit_country,
                'build_time': circuit.created,
                'purpose': circuit.purpose,
            })
        return circuit_info

    def _exit_country(self, fingerprint):
        """Return the upper-case country code of a relay, or 'Unknown'."""
        # default=None keeps an unknown relay from raising.
        relay = self.controller.get_network_status(fingerprint, default=None)
        if relay is None:
            return 'Unknown'

        # Tor resolves the country from its GeoIP data; '??' means "not known".
        country = self.controller.get_info(
            f'ip-to-country/{relay.address}', default=None
        )
        if not country or country == '??':
            return 'Unknown'
        return country.upper()

    def rotate_circuit(self):
        """Force new Tor circuit.

        Returns:
            The circuit information gathered after the NEWNYM signal.
        """
        # NEWNYM is rate limited by Tor: wait until it will be honoured.
        pending = self.controller.get_newnym_wait()
        if pending > 0:
            time.sleep(pending)

        self.controller.signal(Signal.NEWNYM)
        time.sleep(self.controller.get_newnym_wait())
        return self.get_circuit_info()

    def monitor_performance(self):
        """Track circuit performance metrics.

        Returns:
            A dict with 'total_circuits', 'countries' (sorted list of distinct
            exit countries) and 'avg_path_length' (0.0 when there are no
            circuits).
        """
        circuits = self.get_circuit_info()
        total_hops = sum(circuit['path_length'] for circuit in circuits)
        return {
            'total_circuits': len(circuits),
            # Sorted so that repeated calls produce a stable ordering.
            'countries': sorted({circuit['exit_country'] for circuit in circuits}),
            'avg_path_length': total_hops / len(circuits) if circuits else 0.0,
        }
Elasticsearch Optimization
Configure Elasticsearch for optimal dark web content indexing.
# Create optimized index mapping
# Edge n-grams are produced at index time only: `content` and `title` are indexed with
# dark_web_analyzer and queried with dark_web_search_analyzer (same chain without the
# edge n-gram filter), so a query term is not itself expanded into prefixes.
# `title` is mapped because the search example boosts it.
PUT /torscan_content
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1,
    "analysis": {
      "analyzer": {
        "dark_web_analyzer": {
          "type": "custom",
          "tokenizer": "standard",
          "filter": [
            "lowercase",
            "stop",
            "dark_web_synonyms",
            "edge_ngram_filter"
          ]
        },
        "dark_web_search_analyzer": {
          "type": "custom",
          "tokenizer": "standard",
          "filter": [
            "lowercase",
            "stop",
            "dark_web_synonyms"
          ]
        }
      },
      "filter": {
        "dark_web_synonyms": {
          "type": "synonym",
          "synonyms": [
            "cc, credit card",
            "ssn, social security",
            "btc, bitcoin",
            "onion, tor, hidden service"
          ]
        },
        "edge_ngram_filter": {
          "type": "edge_ngram",
          "min_gram": 3,
          "max_gram": 20
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "url": {
        "type": "keyword",
        "fields": {
          "text": { "type": "text" }
        }
      },
      "title": {
        "type": "text",
        "analyzer": "dark_web_analyzer",
        "search_analyzer": "dark_web_search_analyzer"
      },
      "content": {
        "type": "text",
        "analyzer": "dark_web_analyzer",
        "search_analyzer": "dark_web_search_analyzer",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "found_at": { "type": "date" },
      "matched_patterns": { "type": "keyword" },
      "iocs": {
        "type": "nested",
        "properties": {
          "type": {"type": "keyword"},
          "value": {"type": "keyword"},
          "context": {"type": "text"}
        }
      },
      "metadata": {
        "type": "object",
        "properties": {
          "language": {"type": "keyword"},
          "tor_exit": {"type": "ip"},
          "page_size": {"type": "long"},
          "response_time": {"type": "float"}
        }
      }
    }
  }
}
Search Query Optimization
# Advanced search with aggregations
# Query: fuzzy multi-field match, filtered to documents found in the last 7 days that
# carry at least one of the listed pattern tags.
# Aggregations: a per-day histogram with the top 5 patterns per day, and a count of
# nested IOC entries by type.
# Highlight: up to 3 fragments of 150 characters from `content`.
POST /torscan_content/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "multi_match": {
            "query": "data breach credit card",
            "fields": ["content^2", "url.text", "title^3"],
            "type": "best_fields",
            "fuzziness": "AUTO"
          }
        }
      ],
      "filter": [
        {
          "range": {
            "found_at": {
              "gte": "now-7d/d"
            }
          }
        },
        {
          "terms": {
            "matched_patterns": ["credit_card", "personal_data"]
          }
        }
      ]
    }
  },
  "aggs": {
    "patterns_over_time": {
      "date_histogram": {
        "field": "found_at",
        "calendar_interval": "day"
      },
      "aggs": {
        "top_patterns": {
          "terms": {
            "field": "matched_patterns",
            "size": 5
          }
        }
      }
    },
    "ioc_types": {
      "nested": {
        "path": "iocs"
      },
      "aggs": {
        "by_type": {
          "terms": {
            "field": "iocs.type"
          }
        }
      }
    }
  },
  "highlight": {
    "fields": {
      "content": {
        "fragment_size": 150,
        "number_of_fragments": 3
      }
    }
  }
}
Custom Pattern Development
Creating Advanced Detection Patterns
Financial Data Patterns
# patterns/financial.yaml
# Detection patterns for financial data. Patterns are written to match values embedded
# in page text, so they use word boundaries rather than whole-string anchors.
patterns:
  # Credit Card Detection (with BIN validation)
  - name: credit_card_full
    # (?x) switches the regex to verbose mode so the line breaks and the inline
    # "# ..." brand notes below are ignored by the regex engine instead of being
    # treated as literal pattern text.
    regex: |
      (?x)(?:4[0-9]{12}(?:[0-9]{3})?|      # Visa
      5[1-5][0-9]{14}|                     # MasterCard
      3[47][0-9]{13}|                      # Amex
      3(?:0[0-5]|[68][0-9])[0-9]{11}|      # Diners
      6(?:011|5[0-9]{2})[0-9]{12}|         # Discover
      (?:2131|1800|35\d{3})\d{11})         # JCB
    validation: luhn
    severity: critical
    extract_context: 50

  # IBAN Detection
  - name: iban
    # Single-quoted one-liner: a `|` block scalar would append a newline to the
    # pattern. The shortest IBAN in use is 15 characters (2 + 2 + 11).
    regex: '\b[A-Z]{2}[0-9]{2}[A-Z0-9]{11,30}\b'
    validation: iban_checksum
    countries: ["DE", "GB", "CH", "FR"]

  # Cryptocurrency Addresses
  - name: crypto_addresses
    # \b instead of ^...$ so addresses are found inside surrounding text.
    patterns:
      bitcoin: '\b[13][a-km-zA-HJ-NP-Z1-9]{25,34}\b'
      ethereum: '\b0x[a-fA-F0-9]{40}\b'
      monero: '\b4[0-9AB][0-9a-zA-Z]{93}\b'
    validation: address_checksum
    track_transactions: true
Custom Pattern Plugin
# plugins/custom_matcher.py
import re
from typing import List, Dict, Any
import hashlib


class AdvancedMatcher:
    """Regex-based matcher that enriches each hit with context and nearby IOCs.

    NOTE(review): `_compile_patterns`, `_load_ml_model` and `_extract_keywords`
    are called below but are not defined in this listing; they must be supplied
    before the class can be instantiated or `match` can run.
    """

    # Number of characters searched on each side of a match for related IOCs.
    _RELATED_WINDOW = 500

    # The TLD class is [A-Za-z]; a "|" inside a character class is a literal pipe.
    _EMAIL_RE = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[A-Za-z]{2,}')

    def __init__(self, config: Dict[str, Any]):
        """Build the matcher from a config holding 'patterns' and optional 'ml_model'."""
        self.patterns = self._compile_patterns(config['patterns'])
        self.ml_model = self._load_ml_model(config.get('ml_model'))

    def match(self, text: str) -> List[Dict]:
        """Return one dict per regex hit in *text*.

        Each dict holds 'type', 'value', 'position' (start, end), 'confidence',
        'context' and 'related'. Confidence is 1.0 unless an ML model is
        loaded, in which case it is the model's prediction for the text within
        100 characters of the hit.
        """
        matches = []

        # Regex pattern matching
        for pattern_name, pattern in self.patterns.items():
            for hit in pattern.finditer(text):
                match_data = {
                    'type': pattern_name,
                    'value': hit.group(),
                    'position': hit.span(),
                    'confidence': 1.0,
                }

                # Apply ML-based validation
                if self.ml_model:
                    context = text[max(0, hit.start() - 100):hit.end() + 100]
                    match_data['confidence'] = self.ml_model.predict(context)

                # Extract additional context
                match_data['context'] = self._extract_context(text, hit)

                # Check for related IOCs
                match_data['related'] = self._find_related_iocs(text, hit)

                matches.append(match_data)

        return matches

    def _extract_context(self, text: str, match) -> Dict:
        """Extract meaningful context around match.

        Returns a dict with the surrounding 200 characters on each side
        ('full'), the sentences of that window containing the matched value
        ('sentences'), and the keywords extracted from the window ('keywords').
        """
        start = max(0, match.start() - 200)
        end = min(len(text), match.end() + 200)
        context = text[start:end]

        # Extract sentences containing the match
        sentences = re.split(r'[.!?]', context)
        relevant_sentences = [s for s in sentences if match.group() in s]

        return {
            'full': context,
            'sentences': relevant_sentences,
            'keywords': self._extract_keywords(context),
        }

    def _find_related_iocs(self, text: str, match) -> List[Dict]:
        """Find other IOCs near this match.

        Looks for email addresses within `_RELATED_WINDOW` characters of the
        match. 'distance' is the absolute character offset between the start
        of the email and the start of the match, both measured in *text*.
        """
        # Clamp at 0: a negative slice start would wrap around to the end of text.
        window_start = max(0, match.start() - self._RELATED_WINDOW)
        window = text[window_start:match.end() + self._RELATED_WINDOW]

        # Look for emails near financial data
        related = []
        for email in self._EMAIL_RE.finditer(window):
            # email.start() is relative to the window; shift it back to text offsets.
            email_pos = window_start + email.start()
            related.append({
                'type': 'email',
                'value': email.group(),
                'distance': abs(email_pos - match.start()),
            })

        return related
Threat Intelligence Integration
MISP Integration Guide
# config/misp_integration.py
from pymisp import PyMISP, MISPEvent, MISPAttribute
import json


class MISPIntegration:
    """Create MISP events from TorScan scan results."""

    # TorScan IOC type -> MISP attribute type; anything else is stored as 'text'.
    _IOC_TYPE_MAP = {
        'bitcoin_address': 'btc',
        'email': 'email-src',
        'ip': 'ip-src',
        'domain': 'domain',
        'md5': 'md5',
        'sha256': 'sha256',
        'credit_card': 'cc-number',
    }

    def __init__(self, url, key, verifycert=True):
        """Open a PyMISP client for the instance at *url* using API key *key*."""
        self.misp = PyMISP(url, key, verifycert)

    def create_event_from_scan(self, scan_results):
        """Create MISP event from TorScan results.

        Args:
            scan_results: dict with 'scan_id' and 'results'; each result has
                'url', 'found_at' and optionally 'iocs' (dicts with 'type',
                'value' and optional 'context'). 'correlation_data' is
                attached as a text attribute when present.

        Returns:
            The response of `PyMISP.add_event(..., pythonify=True)`.
        """
        event = MISPEvent()
        event.info = f"TorScan Dark Web Intelligence - {scan_results['scan_id']}"
        # NOTE(review): distribution 3 shares with all communities while the
        # event is tagged tlp:amber — confirm this combination is intended.
        event.distribution = 3  # All communities
        event.threat_level_id = 2  # Medium
        event.analysis = 1  # Ongoing

        # Add tags
        event.add_tag('tlp:amber')
        event.add_tag('dark-web')
        event.add_tag(f"torscan:scan_id={scan_results['scan_id']}")

        # Add attributes from results
        for result in scan_results['results']:
            # Add URL as attribute
            event.add_attribute(
                'url',
                result['url'],
                comment=f"Found: {result['found_at']}",
                to_ids=False,
            )

            # Add IOCs
            for ioc in result.get('iocs', []):
                # `or ''` also covers an explicit None context.
                context = ioc.get('context') or ''
                attr = event.add_attribute(
                    self._map_ioc_type(ioc['type']),
                    ioc['value'],
                    comment=f"Context: {context[:100]}",
                    to_ids=True,
                )

                # Add sighting. add_sighting() takes a sighting object or a
                # dict, not keyword arguments.
                attr.add_sighting({
                    'source': "TorScan",
                    'type': 0,  # Positive sighting
                    'timestamp': result['found_at'],
                })

        # Add correlation data (skipped when the scan produced none)
        correlation_data = scan_results.get('correlation_data')
        if correlation_data is not None:
            event.add_attribute(
                type='text',
                value=json.dumps(correlation_data),
                comment='TorScan correlation analysis',
            )

        # Create the event on the MISP server; this call does not publish it.
        response = self.misp.add_event(event, pythonify=True)
        return response

    def _map_ioc_type(self, torscan_type):
        """Map TorScan IOC types to MISP types ('text' when unmapped)."""
        return self._IOC_TYPE_MAP.get(torscan_type, 'text')
OpenCTI Integration
# config/opencti_connector.py
from pycti import OpenCTIApiClient
from datetime import datetime
import stix2


class OpenCTIConnector:
    """Send TorScan scan results to OpenCTI as a STIX 2.1 bundle.

    NOTE(review): `_build_description` and `_create_pattern` are called below
    but are not defined in this listing; they must be supplied before
    `create_incident_from_scan` can run.
    """

    def __init__(self, url, token):
        """Open an OpenCTI API client for *url* authenticated with *token*."""
        self.client = OpenCTIApiClient(url, token)

    def create_incident_from_scan(self, scan_data):
        """Create OpenCTI incident from scan results.

        Args:
            scan_data: dict with 'name', 'started_at', 'completed_at' and
                'results'; each result has 'url', 'found_at', 'confidence'
                and optionally 'iocs' (dicts with 'type' and 'value').

        Returns:
            The response of the OpenCTI bundle import.
        """
        # Create incident. first_seen / last_seen are not part of the STIX 2.1
        # Incident object, so they have to be passed as custom properties.
        incident = stix2.Incident(
            name=f"Dark Web Activity - {scan_data['name']}",
            description=self._build_description(scan_data),
            first_seen=scan_data['started_at'],
            last_seen=scan_data['completed_at'],
            confidence=85,
            labels=['dark-web', 'torscan', 'automated-detection'],
            allow_custom=True,
        )

        # Create indicators
        indicators = []
        for result in scan_data['results']:
            # STIX confidence is an integer in 0..100.
            # Assumes result['confidence'] is a 0..1 score — TODO confirm.
            confidence = max(0, min(100, round(result['confidence'] * 100)))
            for ioc in result.get('iocs', []):
                indicators.append(stix2.Indicator(
                    pattern=self._create_pattern(ioc),
                    pattern_type='stix',
                    name=f"{ioc['type']}: {ioc['value']}",
                    description=f"Found on {result['url']}",
                    confidence=confidence,
                    valid_from=result['found_at'],
                ))

        # Create relationships. "indicates" runs from the indicator to the
        # thing it indicates, so the indicator is the source.
        relationships = [
            stix2.Relationship(
                source_ref=indicator.id,
                target_ref=incident.id,
                relationship_type='indicates',
            )
            for indicator in indicators
        ]

        # Bundle and submit
        bundle = stix2.Bundle(
            incident,
            *indicators,
            *relationships,
            allow_custom=True,
        )

        # The importer works on serialized STIX, not on stix2 objects.
        response = self.client.stix2.import_bundle_from_json(bundle.serialize())
        return response
Security Best Practices
1. Operational Security (OPSEC)
- Never run TorScan from your main network; use isolated VMs
- Rotate API keys and credentials regularly
- Use dedicated infrastructure for dark web operations
- Implement strict access controls and audit logging
- Never store sensitive scan results unencrypted
2. Data Handling
# Secure data handling configuration
# NOTE(review): nesting below is reconstructed from a single-line listing; all four
# sections are assumed to sit under `security:` — confirm against the real file.
security:
  # Encryption of stored data and of network traffic
  encryption:
    at_rest:
      enabled: true
      algorithm: "AES-256-GCM"
      key_management: "HSM"  # Hardware Security Module
    in_transit:
      tls_version: "1.3"
      cipher_suites:
        - "TLS_AES_256_GCM_SHA384"
        - "TLS_CHACHA20_POLY1305_SHA256"

  # Retention periods per data class
  data_retention:
    scan_results: 90  # days
    logs: 30            # presumably days, like scan_results — TODO confirm
    sensitive_data: 7   # presumably days, like scan_results — TODO confirm

  # Fields replaced before storage, and the replacement method
  anonymization:
    enabled: true
    fields:
      - "user_data"
      - "personal_info"
      - "credit_cards"
    method: "tokenization"

  access_control:
    require_mfa: true
    ip_whitelist:
      - "10.0.0.0/8"
    session_timeout: 3600  # presumably seconds — TODO confirm
3. Legal Compliance
- Only scan sites you have authorization to monitor
- Comply with local laws regarding dark web access
- Implement data protection measures (GDPR, CCPA)
- Maintain chain of custody for evidence
- Document all scanning activities for audit trails
Real-World Use Cases
Brand Protection Monitoring
Monitor for counterfeit products, brand impersonation, and trademark violations.
# Brand monitoring configuration
# NOTE(review): nesting below is reconstructed from a single-line listing — confirm
# against the real file. "CompanyName" / "ProductN" are placeholders to replace.
scan_config:
  name: "Brand Protection Monitor"

  # Patterns searched for in crawled content
  patterns:
    - "CompanyName"
    - "Product1|Product2|Product3"
    - "official.*store|authorized.*dealer"
    - "(fake|counterfeit|replica).*(CompanyName|Product)"

  # Site lists to crawl
  sources:
    - "marketplace_sites.yaml"
    - "known_counterfeit_forums.yaml"

  # Patterns that raise an alert, grouped by priority
  alerts:
    high_priority:
      - pattern: "selling.*(CompanyName|Product)"
      - pattern: "crack|keygen|license"
    medium_priority:
      - pattern: "review|discussion"

  actions:
    on_match:
      - screenshot: true
      - archive_page: true
      - notify_legal: true
Data Breach Detection
Early detection of company data breaches and leaked credentials.
# Breach detection patterns
# "company.com" is a placeholder for the monitored organisation's domain.
patterns:
  employee_emails:
    # Single quotes: inside a double-quoted YAML scalar "\." is an invalid
    # escape sequence and the file fails to parse.
    regex: '[a-zA-Z0-9._%+-]+@company\.com'
    severity: critical
    auto_verify: true

  database_dumps:
    keywords:
      - "company database"
      - "SQL dump"
      - "customer records"
    file_extensions: [".sql", ".csv", ".json", ".txt"]

  credentials:
    patterns:
      - "username:.*password:"
      - "email:.*pass:"
    context_required: true

  sensitive_docs:
    keywords:
      - "confidential"
      - "internal only"
      - "proprietary"
    near: "@company.com"  # Within 50 chars

# Steps to take per severity level
response_plan:
  critical:
    - isolate_affected_accounts
    - reset_credentials
    - notify_security_team
    - preserve_evidence
Threat Actor Tracking
Monitor known threat actors and their activities.
# Threat actor monitoring
# NOTE(review): nesting below is reconstructed from a single-line listing — confirm
# against the real file. Names, addresses and keys are illustrative placeholders.
actors:
  - name: "APT_Group_X"
    aliases: ["DarkHydra", "CyberPhantom"]

    # Identifiers attributed to this actor
    indicators:
      usernames: ["darkhydra2024", "phantom_cyber"]
      bitcoin_addresses: ["1A1zP1...", "3J98t1..."]
      pgp_keys: ["0x1234ABCD"]

    ttps:  # Tactics, Techniques, Procedures
      - "ransomware deployment"
      - "data exfiltration"
      - "supply chain attacks"

    # Where and what to watch for this actor
    monitoring:
      forums: ["forum1.onion", "market2.onion"]
      keywords: ["new operation", "selling access", "0day"]

tracking_config:
  correlation:
    time_window: 7  # days
    min_indicators: 2
  alerts:
    new_activity: true
    new_indicators: true
    tool_updates: true
Performance Tuning
Scaling Considerations
# Production scaling configuration
# NOTE(review): nesting below is reconstructed from a single-line listing — confirm
# against the real file.
scaling:
  crawler_workers:
    min: 5
    max: 50
    scale_factor: "queue_length / 100"

  tor_circuits:
    pool_size: 20
    rotation_workers: 5
    health_check_interval: 60  # presumably seconds — TODO confirm

  elasticsearch:
    index_shards: 5
    replicas: 2
    refresh_interval: "30s"

  redis:
    maxmemory: "4gb"
    # NOTE(review): allkeys-lru lets Redis evict any key under memory pressure;
    # confirm this instance does not also hold work queues that must not be dropped.
    maxmemory_policy: "allkeys-lru"

  mongodb:
    connection_pool: 100
    write_concern: "majority"

monitoring:
  metrics:
    - crawler_performance
    - tor_circuit_health
    - queue_lengths
    - error_rates
  # Thresholds that trigger an alert
  alerts:
    high_queue_length: 1000
    low_success_rate: 0.7
    high_error_rate: 0.1
Advanced Troubleshooting
Tor Circuit Issues
# Debug Tor connectivity
docker-compose exec tor-proxy tor-resolve check.torproject.org

# Check circuit status
curl -X GET http://localhost:5000/api/debug/tor/circuits \
  -H "Authorization: Bearer $TOKEN"

# Force new identity
curl -X POST http://localhost:5000/api/debug/tor/newnym \
  -H "Authorization: Bearer $TOKEN"

# Common fixes:
# 1. Restart Tor service
docker-compose restart tor-proxy

# 2. Clear Tor cache
# The glob must be expanded inside the container, hence `sh -c '...'`; unquoted, the
# host shell would expand (or pass through literally) /var/lib/tor/* before exec runs.
# Only the cached-* directory documents are removed: deleting everything would also
# wipe the `state` file (entry guards) and any keys stored in the data directory.
docker-compose exec tor-proxy sh -c 'rm -rf /var/lib/tor/cached-*'
docker-compose restart tor-proxy

# 3. Update bridge configuration
# Edit torrc and add new bridges from https://bridges.torproject.org/
Performance Diagnostics
# Enable debug mode
# Development and troubleshooting only: do not leave FLASK_DEBUG=1 set on an
# instance that is reachable by others, since it enables the interactive debugger.
export FLASK_DEBUG=1
export LOG_LEVEL=DEBUG

# Profile slow queries
docker-compose exec elasticsearch \
  curl -X GET "localhost:9200/_nodes/hot_threads"

# Monitor resource usage
docker stats --format "table {{.Container}}\t{{.CPUPerc}}\t{{.MemUsage}}"

# Check queue backlogs
# Each command is passed to redis-cli directly; a leading ">" typed in a shell would
# be an output redirection, not the redis-cli prompt.
docker-compose exec redis redis-cli LLEN crawl_queue
docker-compose exec redis redis-cli LLEN result_queue

# Analyze crawler performance
curl -X GET http://localhost:5000/api/debug/performance \
  -H "Authorization: Bearer $TOKEN"