Nate Miller
Nate Miller

Reputation: 1

Pulling Vuln details for WPscan bot

I am working on a slack bot that will act as an automatic WPscan tool to check our websites that use WP to see if there is a vulnerability whenever an article mentioning WP pops up in our RSS feed. I am struggling to get it tuned just right so that it can automatically find and detect the plugin name for the keyword WP scan when it runs. I am also running into issues when pulling from the NVD in the form of api rate limiting. I tried to work around this using the Github Vuln Database with the NVD, but the method I was using to pull from the NVD doesn't work because github uses its own vuln number system. I'm a security guy, not a dev or a coder, so I may be missing something as I've been working on building this. The main part I've been troubleshooting is the 'ArticleParser' class in the code which I've provided below. I can post the full code if it helps, as I have my personal API keys hosted in a separate file for when/if I choose to post the code publicly on GitHub since it could be a useful automation for others. I just can't seem to solve the issue of pulling the info I want from the articles and/or the CVE reports. Any suggestions?

Code:

class ArticleParser:
    def __init__(self, github_token):
        self.common_article_selectors = [
            'article', 
            '.post-content', 
            '.entry-content',
            'main',
            '#content'
        ]
        self.github_headers = {
            'Accept': 'application/vnd.github.v3+json',
            'Authorization': f'token {github_token}'
        }

    def format_error_message(self, url, error):
        base_message = f"⚠️ Unable to parse article from {url}\n"
        solutions = ["Try manual keyword scan with /scan [plugin name]"]

        if isinstance(error, requests.exceptions.SSLError):
            solutions.append("Site's security certificate might be invalid")
        elif isinstance(error, requests.exceptions.ConnectionError):
            solutions.append("Check if the site is accessible")
        elif isinstance(error, requests.exceptions.Timeout):
            solutions.append("Site might be temporarily slow or unavailable")
        elif isinstance(error, requests.exceptions.HTTPError):
            if error.response.status_code == 403:
                solutions.append("Article might be behind a paywall")
            elif error.response.status_code == 404:
                solutions.append("Article might have been removed")

        return base_message + "\nPossible solutions:\n" + "\n".join(f"- {s}" for s in solutions)

    def extract_article_info(self, url):
        scraper_logger.info(f"Starting article extraction for: {url}")
        try:
            response = http_session.get(url)
            soup = BeautifulSoup(response.text, 'html.parser')
            
            content = None
            for selector in self.common_article_selectors:
                element = soup.select_one(selector)
                if element:
                    content = element.get_text()
                    break
            
            if not content:
                scraper_logger.warning(f"Could not find article content with common selectors: {url}")
                content = soup.get_text()

            vuln_info = self.extract_vulnerability_info(content)
            debug_logger.debug(f"Extracted vulnerability info: {vuln_info}")

            # Only fall back to article content plugin detection if we didn't get keywords from vuln source
            plugin_info = (
                {'plugin_name': ' '.join(vuln_info['plugin_keywords']), 'version': None} 
                if vuln_info['plugin_keywords'] 
                else self.extract_plugin_info(content)
            )

            debug_logger.debug(f"Extracted plugin info: {plugin_info}")

            return {
                **plugin_info,
                'vulnerability': vuln_info['description'] if vuln_info['description'] else "No vulnerability details found"
            }

        except Exception as e:
            error_message = self.format_error_message(url, e)
            scraper_logger.error(f"Article parsing error: {str(e)}", exc_info=True)
            return {'error': error_message}

    def extract_plugin_info(self, content):
        plugin_patterns = [
            r'(?i)"([^"]+)"\s+(?:wordpress\s+)?plugin',
            r'(?i)plugin\s+(?:called|named)\s+"([^"]+)"',
            r'(?i)(?:wordpress\s+)?plugin\s+([A-Za-z0-9\s-]+?)\s+(?:is|has|contains)',
            r'(?i)([A-Za-z0-9\s-]+?)\s+is\s+a\s+(?:wordpress\s+)?plugin',
            r'(?i)vulnerability\s+in\s+(?:the\s+)?([A-Za-z0-9\s-]+?)\s+plugin'
        ]
        
        version_patterns = [
            r'version\s*((?:\d+\.)?(?:\d+\.)?(?:\*|\d+))',
            r'v((?:\d+\.)?(?:\d+\.)?(?:\*|\d+))',
            r'affected\s+versions?\s*:?\s*((?:\d+\.)?(?:\d+\.)?(?:\*|\d+))'
        ]

        plugin_name = None
        version = None

        for pattern in plugin_patterns:
            match = re.search(pattern, content, re.I)
            if match:
                candidate = match.group(1).strip()
                if len(candidate.split()) <= 5 and not any(term in candidate.lower() for term in ['bug', 'issue', 'vulnerability']):
                    plugin_name = candidate
                    break

        for pattern in version_patterns:
            match = re.search(pattern, content, re.I)
            if match:
                version = match.group(1)
                break

        return {
            'plugin_name': plugin_name,
            'version': version
        }

    def extract_vulnerability_info(self, content):
        cve_pattern = r'CVE-\d{4}-\d{4,7}'
        cve_match = re.search(cve_pattern, content)
        
        vuln_info = {
            'description': None,
            'source': None,
            'plugin_keywords': []
        }
        
        if not cve_match:
            vuln_patterns = [
                r'vulnerability[^.]*allows[^.]*\.',
                r'security (issue|flaw)[^.]*\.',
                r'(?:plugin|theme)[^.]*vulnerable[^.]*\.'
            ]
            for pattern in vuln_patterns:
                match = re.search(pattern, content, re.I)
                if match:
                    scraper_logger.info("Vulnerability info extracted from article content")
                    vuln_info['description'] = match.group(0).strip()
                    vuln_info['source'] = 'article'
                    return vuln_info
            return vuln_info
            
        cve_number = cve_match.group(0)
        
        try:
            # Try GitHub Advisory Database
            scraper_logger.info(f"Attempting to fetch CVE data from GitHub Advisory Database for {cve_number}")
            search_url = f"https://api.github.com/search/advisories?q={cve_number}"
            response = requests.get(search_url, headers=self.github_headers)
            
            if response.status_code == HTTPStatus.OK:
                data = response.json()
                if data.get('items') and len(data['items']) > 0:
                    advisory = data['items'][0]
                    ghsa_id = advisory.get('ghsa_id')
                    if ghsa_id:
                        advisory_url = f"https://api.github.com/advisories/{ghsa_id}"
                        advisory_response = requests.get(advisory_url, headers=self.github_headers)
                        if advisory_response.status_code == HTTPStatus.OK:
                            advisory_data = advisory_response.json()
                            description = advisory_data.get('summary')
                            if description:
                                scraper_logger.info(f"Successfully retrieved vulnerability info from GitHub (GHSA: {ghsa_id})")
                                vuln_info['description'] = f"{cve_number}: {description}"
                                vuln_info['source'] = 'github'
                                # Extract first few words as potential plugin name
                                title_words = description.split()[:5]
                                vuln_info['plugin_keywords'] = [word.lower() for word in title_words 
                                                              if len(word) > 3 and word.lower() not in 
                                                              ['wordpress', 'the', 'and', 'for', 'with', 'plugin']]
                                return vuln_info
                                
            # If GitHub fails, try NVD API
            scraper_logger.info(f"Attempting to fetch CVE data from NVD API for {cve_number}")
            time.sleep(2)
            nvd_url = f"https://services.nvd.nist.gov/rest/json/cves/2.0?cveId={cve_number}"
            response = http_session.get(nvd_url)
            data = response.json() if response.status_code == 200 else {}
            
            if data and isinstance(data, dict):
                vulns = data.get('vulnerabilities', [])
                if vulns and len(vulns) > 0:
                    vuln = vulns[0]
                    if vuln and isinstance(vuln, dict):
                        cve_data = vuln.get('cve', {})
                        if cve_data and isinstance(cve_data, dict):
                            descriptions = [d.get('value', '') for d in cve_data.get('descriptions', [])
                                         if d.get('lang') == 'en' and len(d.get('value', '')) > 50]
                            if descriptions:
                                description = max(descriptions, key=len)
                                scraper_logger.info("Successfully retrieved vulnerability info from NVD API")
                                vuln_info['description'] = f"{cve_number}: {description}"
                                vuln_info['source'] = 'nvd'
                                # Extract first few words as potential plugin name
                                title_words = description.split()[:5]
                                vuln_info['plugin_keywords'] = [word.lower() for word in title_words 
                                                              if len(word) > 3 and word.lower() not in 
                                                              ['wordpress', 'the', 'and', 'for', 'with', 'plugin']]
                                return vuln_info
                    
        except Exception as e:
            scraper_logger.error(f"Error fetching CVE details: {str(e)}")
            # Extract from article as fallback
            vuln_patterns = [
                r'vulnerability[^.!?]*[.!?]',
                r'security (?:issue|flaw)[^.!?]*[.!?]',
                r'(?:plugin|theme)[^.!?]*vulnerable[^.!?]*[.!?]'
            ]
            full_description = []
            for pattern in vuln_patterns:
                matches = re.finditer(pattern, content, re.I)
                for match in matches:
                    full_description.append(match.group(0).strip())
            
            if full_description:
                scraper_logger.info("Using article content as fallback for vulnerability description")
                vuln_info['description'] = f"{cve_number}: {' '.join(full_description)}"
                vuln_info['source'] = 'article'
                return vuln_info
        
        return vuln_info

I've tried using the NVD api to pull the plugin name and summary, and that worked for a time, however in testing I've been rate limited due to testing my changes frequently.

I have tried switching to pulling from the Github Advisory Database, but the method I was using (www.vulndatabasesite.com/whatever-cve-number) doesn't work with it due to the GAV using a different numbering scheme. The CVE's can be searched on there and the pages are named after the CVE numbers, but the actual URL uses GitHub's own numbering scheme.

Upvotes: 0

Views: 23

Answers (0)

Related Questions