Add pipeline to improve/enhance data
TL;DR
Create a new file {improver_name}.py inside vulnerabilities/pipelines/.
Create a new improver pipeline by inheriting VulnerableCodePipeline defined in vulnerabilities.pipelines.
Implement the steps classmethod to define what functions to run and in which order.
Implement the individual functions defined in steps.
Add the newly created pipeline to the improvers registry at vulnerabilities/improvers/__init__.py.
Pipeline
We use aboutcode.pipeline for importing and improving data. At a very high level, a working pipeline contains a steps classmethod that defines which steps to run and in what order. These steps are essentially just functions. Pipeline provides an easy and effective way to log events inside these steps, and it automatically handles rendering and dissemination of these logs.
It also includes a built-in progress indicator, which is essential since some of the jobs we run in the pipeline are long-running. Pipeline provides a way to seamlessly record progress, and it automatically takes care of rendering and disseminating it.
Additionally, the pipeline offers a consistent structure, making it easy to run pipeline steps with a message queue like RQ and to store all events related to a particular pipeline for debugging and improvement.
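To make the steps idea concrete, here is a minimal, self-contained sketch of the pattern. It does not use aboutcode.pipeline: MiniPipeline, its execute method, and the log message format are hypothetical stand-ins, meant only to show how a steps classmethod can drive ordered execution with logging.

```python
import time


class MiniPipeline:
    """Hypothetical stand-in for a pipeline base class (not aboutcode.pipeline)."""

    def __init__(self):
        self.messages = []

    @classmethod
    def steps(cls):
        # Subclasses return the step functions in the order they should run.
        raise NotImplementedError

    def log(self, message):
        # Collect messages; a real pipeline would also render and store them.
        self.messages.append(message)

    def execute(self):
        for step in self.steps():
            start = time.monotonic()
            self.log(f"Step [{step.__name__}] starting")
            step(self)  # steps are plain functions that receive the instance
            elapsed = time.monotonic() - start
            self.log(f"Step [{step.__name__}] completed in {elapsed:.2f} seconds")


class Greeter(MiniPipeline):
    @classmethod
    def steps(cls):
        return (cls.collect, cls.report)

    def collect(self):
        self.names = ["alice", "bob"]

    def report(self):
        self.log(f"Collected {len(self.names)} names")


pipeline = Greeter()
pipeline.execute()
```

Because steps returns ordinary functions, reordering or adding a step is a one-line change in the returned tuple, and each step can share state with later steps through attributes on the instance.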
This tutorial contains all the things one should know to quickly implement an improver pipeline.
Prerequisites
The new improver design lets you do all sorts of cool improvements and enhancements. Some of those are:
Let’s suppose you have a certain number of packages and vulnerabilities in your database, and you want to make sure that the packages being shown in VulnerableCode do indeed exist upstream. Oftentimes, we come across advisory data that contains made-up package versions. We can write (well, we already have) a pipeline that iterates through all the packages in VulnerableCode and labels them as ghost packages if they don’t exist upstream.
A basic security advisory only contains CVE/aliases, summary, fixed/affected version, and severity. But now we can use the new pipeline to enhance the vulnerability info with exploits from various sources like ExploitDB, Metasploit, etc.
Likewise, we can have more pipelines to flag malicious/yanked packages.
So you see, the new improver pipeline is very powerful in what you can achieve, but as always, with great power comes great responsibility. By design, the new improvers are unconstrained, so you must be absolutely sure of what you're doing and should have robust tests in place for these pipelines.
Writing an Improver Pipeline
Scenario: Suppose we come across a source that curates and stores a list of packages that don't exist upstream and makes it available through the REST API endpoint https://example.org/api/non-existent-packages, which returns a JSON response with a list of non-existent packages.
Let's write a pipeline that uses this source to flag these non-existent packages as ghost packages.
Create file for the new improver pipeline
All pipelines, including the improver pipeline, are located in the vulnerabilities/pipelines/ directory.
The improver pipeline is implemented by subclassing VulnerableCodePipeline.
Specify the improver license
If the improver pipeline scrapes data off the internet, we need to track the license for the scraped data to make sure that we can legally use it.
Populate the spdx_license_expression with the appropriate value. The SPDX license identifiers can be found at ScanCode LicenseDB.
Note
An SPDX license identifier by itself is a valid license expression. In case you need more complex expressions, see https://spdx.github.io/spdx-spec/v2.3/SPDX-license-expressions/
Add skeleton for new pipeline
In this scenario, the pipeline needs to do two things: fetch the raw data, and use that data to flag the listed packages.
At this point, the improver will look like this:
from vulnerabilities.pipelines import VulnerableCodePipeline


class FlagGhostPackagesWithExampleOrg(VulnerableCodePipeline):
    """Example improver pipeline to flag ghost packages."""

    pipeline_id = "flag_ghost_package_with_example_org"

    license_url = "https://example.org/license/"
    spdx_license_expression = "CC-BY-4.0"

    @classmethod
    def steps(cls):
        return (
            cls.fetch_response,
            cls.flag_ghost_packages,
        )

    def fetch_response(self):
        raise NotImplementedError

    def flag_ghost_packages(self):
        raise NotImplementedError
Implement the steps
We will evolve our high-level design by implementing the fetch_response and flag_ghost_packages methods.
from vulnerabilities.models import Package
from vulnerabilities.pipelines import VulnerableCodePipeline


class FlagGhostPackagesWithExampleOrg(VulnerableCodePipeline):
    """Example improver pipeline to flag ghost packages."""

    pipeline_id = "flag_ghost_package_with_example_org"

    license_url = "https://example.org/license/"
    spdx_license_expression = "CC-BY-4.0"

    @classmethod
    def steps(cls):
        return (
            cls.fetch_response,
            cls.flag_ghost_packages,
        )

    def fetch_response(self):
        # Since this is an imaginary source, we mock the response.
        # In an actual implementation you would use the requests library to get the data.
        mock_response = {
            "non-existent": [
                "pkg:npm/626@1.1.1",
                "pkg:npm/bootstrap-tagsinput@0.8.0",
                "pkg:npm/dojo@1.0.0",
                "pkg:npm/dojo@1.1.0",
                "pkg:npm/electron@1.8.0",
            ]
        }
        self.fetched_data = mock_response

    def flag_ghost_packages(self):
        non_existent_packages = self.fetched_data.get("non-existent", [])

        # Select only the packages in the database that the source reports as non-existent.
        ghost_packages = Package.objects.filter(package_url__in=non_existent_packages)
        ghost_package_count = ghost_packages.count()

        # Flag all matching packages with a single UPDATE query.
        ghost_packages.update(is_ghost=True)

        self.log(f"Successfully flagged {ghost_package_count:,d} ghost Packages")
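In a real pipeline, the mocked response would come from the network. The sketch below shows one way the payload could be downloaded and validated using only the standard library. The helper names fetch_non_existent_purls and parse_non_existent_purls are hypothetical, the endpoint is the imaginary one from the scenario, and the response shape is assumed to match the mocked response in fetch_response.

```python
import json
from urllib.request import urlopen

# Imaginary endpoint from the scenario; it does not actually exist.
API_URL = "https://example.org/api/non-existent-packages"


def parse_non_existent_purls(payload):
    """Return the list of package URLs from a decoded JSON payload."""
    purls = payload.get("non-existent", [])
    if not isinstance(purls, list):
        raise ValueError("Expected 'non-existent' to be a list of package URLs")
    # Keep only strings that look like package URLs.
    return [purl for purl in purls if isinstance(purl, str) and purl.startswith("pkg:")]


def fetch_non_existent_purls(url=API_URL, timeout=30):
    """Download the JSON document at `url` and extract the package URLs."""
    with urlopen(url, timeout=timeout) as response:
        payload = json.load(response)
    return parse_non_existent_purls(payload)


# Offline demonstration with a payload shaped like the mocked response.
sample = json.loads('{"non-existent": ["pkg:npm/dojo@1.0.0", "pkg:npm/dojo@1.1.0", 42]}')
purls = parse_non_existent_purls(sample)
```

Keeping the parsing separate from the download makes the validation easy to unit test without network access; only fetch_non_existent_purls touches the network.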
Attention
Implement on_failure to handle cleanup in case of pipeline failure.
Cleanup of downloaded archives or cloned repos is necessary to avoid potential resource leakage.
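A cleanup hook might look like the following sketch. To keep it runnable outside VulnerableCode, it uses a hypothetical stand-in base class whose execute method calls on_failure when a step raises; how the real base class invokes the hook may differ, and the working directory and step names are illustrative assumptions.

```python
import shutil
import tempfile
from pathlib import Path


class StandInPipeline:
    """Hypothetical base class: runs steps and calls on_failure if one raises."""

    @classmethod
    def steps(cls):
        raise NotImplementedError

    def on_failure(self):
        # Default hook does nothing; subclasses override it for cleanup.
        pass

    def execute(self):
        try:
            for step in self.steps():
                step(self)
        except Exception:
            self.on_failure()
            return 1  # non-zero status signals failure
        return 0


class DownloadingPipeline(StandInPipeline):
    @classmethod
    def steps(cls):
        return (cls.download_archive, cls.process_archive)

    def download_archive(self):
        # Pretend to download an archive into a temporary working directory.
        self.workdir = Path(tempfile.mkdtemp(prefix="improver-"))
        (self.workdir / "archive.zip").write_bytes(b"not a real archive")

    def process_archive(self):
        raise RuntimeError("simulated processing failure")

    def on_failure(self):
        # Remove downloaded files so a failed run leaves nothing behind.
        workdir = getattr(self, "workdir", None)
        if workdir is not None:
            shutil.rmtree(workdir, ignore_errors=True)


pipeline = DownloadingPipeline()
status = pipeline.execute()
```

The getattr guard matters because the failure may happen before the directory is created, in which case there is nothing to remove.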
Note
Use make valid to format your new code using black and isort automatically.
Use make check to check for formatting errors.
Register the Improver Pipeline
Finally, register your improver in the improver registry at vulnerabilities/improvers/__init__.py:
from vulnerabilities.pipelines import VulnerableCodePipeline
from vulnerabilities.pipelines import enhance_with_kev
from vulnerabilities.pipelines import flag_ghost_package_with_example_org

IMPROVERS_REGISTRY = [
    enhance_with_kev.VulnerabilityKevPipeline,
    flag_ghost_package_with_example_org.FlagGhostPackagesWithExampleOrg,
]

# Key pipelines by pipeline_id and any other improvers by qualified_name.
IMPROVERS_REGISTRY = {
    x.pipeline_id if issubclass(x, VulnerableCodePipeline) else x.qualified_name: x
    for x in IMPROVERS_REGISTRY
}
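The dictionary comprehension keys each entry by pipeline_id when it is a pipeline class and by qualified_name otherwise. The following self-contained sketch reproduces that rule with stand-in classes so the resulting mapping can be inspected without a VulnerableCode checkout. LegacyImprover and its qualified_name value are invented for illustration, and the VulnerableCodePipeline defined here is only a placeholder for the real base class.

```python
class VulnerableCodePipeline:
    """Stand-in for the real base class; only used for the issubclass check."""

    pipeline_id = None


class FlagGhostPackagesWithExampleOrg(VulnerableCodePipeline):
    pipeline_id = "flag_ghost_package_with_example_org"


class LegacyImprover:
    """Hypothetical non-pipeline improver identified by qualified_name."""

    qualified_name = "example.improvers.LegacyImprover"


registry = [FlagGhostPackagesWithExampleOrg, LegacyImprover]

# Same keying rule as in the registry module: pipeline_id for pipelines,
# qualified_name for everything else.
registry = {
    x.pipeline_id if issubclass(x, VulnerableCodePipeline) else x.qualified_name: x
    for x in registry
}

# Look up a class by its identifier.
improver_class = registry["flag_ghost_package_with_example_org"]
```

Since the identifier becomes the dictionary key, two pipelines that share a pipeline_id would silently overwrite one another, so each pipeline_id should be unique.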
Congratulations! You have written your first improver pipeline.
Run Your First Improver Pipeline
If everything went well, you will see your improver in the list of available improvers.
$ ./manage.py improve --list
Vulnerability data can be processed by these available improvers:
enhance_with_kev
flag_ghost_package_with_example_org
Now, run the improver.
$ ./manage.py improve flag_ghost_package_with_example_org
Improving data using flag_ghost_package_with_example_org
INFO 2024-10-17 14:37:54.482 Pipeline [FlagGhostPackagesWithExampleOrg] starting
INFO 2024-10-17 14:37:54.482 Step [fetch_response] starting
INFO 2024-10-17 14:37:54.482 Step [fetch_response] completed in 0 seconds
INFO 2024-10-17 14:37:54.482 Step [flag_ghost_packages] starting
INFO 2024-10-17 14:37:54.488 Successfully flagged 5 ghost Packages
INFO 2024-10-17 14:37:54.488 Step [flag_ghost_packages] completed in 0 seconds
INFO 2024-10-17 14:37:54.488 Pipeline completed in 0 seconds
See Command Line Interface for command line usage instructions.
Tip
If you need to improve package-vulnerability relations created by a certain pipeline, use its pipeline_id to select only those items. For example, if you want to improve only those AffectedByPackageRelatedVulnerability entries that were created by the npm_importer pipeline, you can do so with the following query:
AffectedByPackageRelatedVulnerability.objects.filter(created_by=NpmImporterPipeline.pipeline_id)
Note
Make sure to use properly optimized query sets, and wherever needed, use paginated query sets.
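One way to keep queries bounded is to process identifiers in fixed-size batches. The sketch below is plain Python with a hypothetical helper named batched_purls; in a Django-backed pipeline each batch could feed a package_url__in filter, while QuerySet.iterator(chunk_size=...) and django.core.paginator.Paginator are the built-in tools for streaming or paginating large query sets.

```python
from itertools import islice


def batched_purls(purls, batch_size=1000):
    """Yield lists of at most `batch_size` items from any iterable."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(purls)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Illustrative package URLs; these are made up for the demonstration.
purls = [f"pkg:npm/example@1.0.{patch}" for patch in range(5)]
batches = list(batched_purls(purls, batch_size=2))
```

Batching keeps each IN clause small and avoids loading every identifier into a single query, at the cost of issuing several queries instead of one.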