Add a pipeline to improve/enhance data

TL;DR

  1. Create a new file {improver_name}.py inside vulnerabilities/pipelines/.

  2. Create a new improver pipeline by inheriting from VulnerableCodePipeline, defined in vulnerabilities.pipelines.

  3. Implement the steps classmethod to define which functions to run and in what order.

  4. Implement the individual functions defined in steps.

  5. Add the newly created pipeline to the improvers registry at vulnerabilities/improvers/__init__.py.

Pipeline

We use aboutcode.pipeline for importing and improving data. At a very high level, a working pipeline is a class with a steps classmethod that defines which steps to run and in what order. These steps are essentially just functions. The pipeline provides an easy and effective way to log events inside these steps (it automatically handles the rendering and dissemination of these logs).

It also includes a built-in progress indicator, which is essential since some of the jobs we run in the pipeline are long-running tasks that require proper progress reporting. The pipeline provides a way to seamlessly record progress (it automatically takes care of the rendering and dissemination of these progress updates).

Additionally, the pipeline offers a consistent structure, making it easy to run pipeline steps with a message queue like RQ and to store all events related to a particular pipeline for debugging and improvement.
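The steps/log pattern described above can be sketched with a simplified stand-in class; this is not the real VulnerableCodePipeline base class, just an illustration of the shape:

```python
# Minimal sketch of the steps/log pattern, using a simplified stand-in
# base class. The real base class is VulnerableCodePipeline from
# vulnerabilities.pipelines, which also renders and disseminates logs
# and progress for you.
class MiniPipeline:
    def __init__(self):
        self.log_entries = []

    def log(self, message):
        # The real pipeline renders and stores these events automatically.
        self.log_entries.append(message)

    def execute(self):
        # Run each step in the order declared by steps().
        for step in self.steps():
            step(self)
            self.log(f"Step [{step.__name__}] completed")


class ExamplePipeline(MiniPipeline):
    @classmethod
    def steps(cls):
        return (cls.first_step, cls.second_step)

    def first_step(self):
        self.log("doing first step")

    def second_step(self):
        self.log("doing second step")


pipeline = ExamplePipeline()
pipeline.execute()
```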

This tutorial covers everything you need to know to quickly implement an improver pipeline.

Prerequisites

The new improver design lets you do all sorts of cool improvements and enhancements. Some of those are:

  • Let’s suppose you have a certain number of packages and vulnerabilities in your database, and you want to make sure that the packages being shown in VulnerableCode do indeed exist upstream. Oftentimes, we come across advisory data that contains made-up package versions. We can write (well, we already have) a pipeline that iterates through all the packages in VulnerableCode and labels them as ghost packages if they don’t exist upstream.

  • A basic security advisory only contains CVE/aliases, summary, fixed/affected version, and severity. But now we can use the new pipeline to enhance the vulnerability info with exploits from various sources like ExploitDB, Metasploit, etc.

  • Likewise, we can have more pipelines to flag malicious/yanked packages.

So you see, the new improver pipeline is very powerful in what you can achieve, but as always, with great power comes great responsibility. By design, the new improvers are unconstrained, so you must be absolutely sure of what you’re doing and have robust tests for these pipelines in place.

Writing an Improver Pipeline

Scenario: Suppose we come across a source that curates and stores a list of packages that don’t exist upstream and makes it available through the REST API endpoint https://example.org/api/non-existent-packages, which returns a JSON response with a list of non-existent packages.

Let’s write a pipeline that will use this source to flag these non-existent packages as ghost packages.
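The response from this hypothetical endpoint might be parsed as in the following sketch; the "non-existent" key and the purl strings here are assumptions from the scenario, not a real API:

```python
import json

# A hypothetical payload from https://example.org/api/non-existent-packages.
# The "non-existent" key and package URL (purl) format are assumptions
# made for this scenario.
sample_response = """
{
    "non-existent": [
        "pkg:npm/dojo@1.0.0",
        "pkg:npm/electron@1.8.0"
    ]
}
"""

payload = json.loads(sample_response)
non_existent_purls = payload.get("non-existent", [])
```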

Create file for the new improver pipeline

All pipelines, including improver pipelines, are located in the vulnerabilities/pipelines/ directory.

The improver pipeline is implemented by subclassing VulnerableCodePipeline.

Specify the improver license

If the improver pipeline scrapes data off the internet, we need to track the license for the scraped data to make sure that we can legally use it.

Populate the spdx_license_expression with the appropriate value. The SPDX license identifiers can be found at ScanCode LicenseDB.

Note

An SPDX license identifier by itself is a valid license expression. In case you need more complex expressions, see https://spdx.github.io/spdx-spec/v2.3/SPDX-license-expressions/

Add skeleton for new pipeline

In this scenario, the pipeline needs to do two things: fetch the raw data and use it to flag those packages.

At this point, the improver will look like this:

vulnerabilities/pipelines/flag_ghost_package_with_example_org.py
from vulnerabilities.pipelines import VulnerableCodePipeline


class FlagGhostPackagesWithExampleOrg(VulnerableCodePipeline):
    """Example improver pipeline to flag ghost packages."""

    pipeline_id = "flag_ghost_package_with_example_org"

    license_url = "https://example.org/license/"
    spdx_license_expression = "CC-BY-4.0"

    @classmethod
    def steps(cls):
        return (
            cls.fetch_response,
            cls.flag_ghost_packages,
        )

    def fetch_response(self):
        raise NotImplementedError

    def flag_ghost_packages(self):
        raise NotImplementedError

Implement the steps

We will evolve our high-level design by implementing the fetch_response and flag_ghost_packages methods.

vulnerabilities/pipelines/flag_ghost_package_with_example_org.py
from vulnerabilities.models import Package
from vulnerabilities.pipelines import VulnerableCodePipeline


class FlagGhostPackagesWithExampleOrg(VulnerableCodePipeline):
    """Example improver pipeline to flag ghost packages."""

    pipeline_id = "flag_ghost_package_with_example_org"

    license_url = "https://example.org/license/"
    spdx_license_expression = "CC-BY-4.0"

    @classmethod
    def steps(cls):
        return (
            cls.fetch_response,
            cls.flag_ghost_packages,
        )

    def fetch_response(self):
        # Since this is an imaginary source, we mock the response.
        # In an actual implementation, use the requests library to fetch the data.
        mock_response = {
            "non-existent": [
                "pkg:npm/626@1.1.1",
                "pkg:npm/bootstrap-tagsinput@0.8.0",
                "pkg:npm/dojo@1.0.0",
                "pkg:npm/dojo@1.1.0",
                "pkg:npm/electron@1.8.0",
            ]
        }
        self.fetched_data = mock_response

    def flag_ghost_packages(self):
        non_existent_packages = self.fetched_data.get("non-existent", [])

        ghost_packages = Package.objects.filter(package_url__in=non_existent_packages)
        ghost_package_count = ghost_packages.count()

        ghost_packages.update(is_ghost=True)

        self.log(f"Successfully flagged {ghost_package_count:,d} ghost Packages")
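For a real source, fetch_response would make an HTTP request instead of returning a mock. Here is a hedged sketch using only the standard library; the endpoint URL and response shape are the scenario’s hypothetical ones, and a production pipeline would also handle HTTP errors and timeouts:

```python
import json
from urllib.request import urlopen


def fetch_non_existent_packages(url="https://example.org/api/non-existent-packages"):
    # Hypothetical endpoint from the scenario; a real pipeline would add
    # error handling, timeouts, and retries here.
    with urlopen(url) as response:
        return json.loads(response.read())


def extract_purls(payload):
    # Pull the list of package URLs out of the (assumed) response shape.
    return payload.get("non-existent", [])
```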

Attention

Implement on_failure to handle cleanup in case of pipeline failure. Cleanup of downloaded archives or cloned repos is necessary to avoid potential resource leakage.
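A minimal sketch of such cleanup, using a hypothetical download_dir attribute; a real pipeline would clean up whatever resources its own steps create:

```python
import shutil
import tempfile
from pathlib import Path


class DownloadCleanupExample:
    """Sketch of cleanup for a pipeline step that downloads archives.

    The download_dir attribute is a hypothetical example for illustration.
    """

    def download_data(self):
        # A step that downloads archives into a temporary directory.
        self.download_dir = Path(tempfile.mkdtemp(prefix="improver-"))
        (self.download_dir / "advisories.json").write_text("{}")

    def on_failure(self):
        # Called when the pipeline fails; remove anything we downloaded
        # so failed runs do not leak disk space.
        if getattr(self, "download_dir", None) and self.download_dir.exists():
            shutil.rmtree(self.download_dir)
```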

Note

Use make valid to format your new code using black and isort automatically.
Use make check to check for formatting errors.

Register the Improver Pipeline

Finally, register your improver in the improver registry at vulnerabilities/improvers/__init__.py

vulnerabilities/improvers/__init__.py
from vulnerabilities.pipelines import VulnerableCodePipeline
from vulnerabilities.pipelines import enhance_with_kev
from vulnerabilities.pipelines import flag_ghost_package_with_example_org

IMPROVERS_REGISTRY = [
    enhance_with_kev.VulnerabilityKevPipeline,
    flag_ghost_package_with_example_org.FlagGhostPackagesWithExampleOrg,
]

IMPROVERS_REGISTRY = {
    x.pipeline_id if issubclass(x, VulnerableCodePipeline) else x.qualified_name: x
    for x in IMPROVERS_REGISTRY
}

Congratulations! You have written your first improver pipeline.

Run Your First Improver Pipeline

If everything went well, you will see your improver in the list of available improvers.

 $ ./manage.py improve --list

 Vulnerability data can be processed by these available improvers:
 enhance_with_kev
 flag_ghost_package_with_example_org

Now, run the improver.

$ ./manage.py improve flag_ghost_package_with_example_org

Improving data using flag_ghost_package_with_example_org
INFO 2024-10-17 14:37:54.482 Pipeline [FlagGhostPackagesWithExampleOrg] starting
INFO 2024-10-17 14:37:54.482 Step [fetch_response] starting
INFO 2024-10-17 14:37:54.482 Step [fetch_response] completed in 0 seconds
INFO 2024-10-17 14:37:54.482 Step [flag_ghost_packages] starting
INFO 2024-10-17 14:37:54.488 Successfully flagged 5 ghost Packages
INFO 2024-10-17 14:37:54.488 Step [flag_ghost_packages] completed in 0 seconds
INFO 2024-10-17 14:37:54.488 Pipeline completed in 0 seconds

See Command Line Interface for command line usage instructions.

Tip

If you need to improve package vulnerability relations created using a certain pipeline, simply use the pipeline_id to filter only those items. For example, if you want to improve only those AffectedByPackageRelatedVulnerability entries that were created by the npm_importer pipeline, you can do so with the following query:

AffectedByPackageRelatedVulnerability.objects.filter(created_by=NpmImporterPipeline.pipeline_id)

Note

Make sure to use properly optimized query sets, and wherever needed, use paginated query sets.
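The idea behind paginated processing can be sketched in plain Python; with real querysets, prefer QuerySet.iterator(chunk_size=...) or django.core.paginator.Paginator, which batch at the database level:

```python
def process_in_chunks(items, chunk_size=2000):
    """Yield fixed-size chunks so a large result set is processed in
    batches instead of being loaded into memory all at once."""
    chunk = []
    for item in items:
        chunk.append(item)
        if len(chunk) >= chunk_size:
            yield chunk
            chunk = []
    if chunk:
        # Yield the final, possibly smaller, chunk.
        yield chunk
```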