Add a new pipeline to import advisories

TL;DR

  1. Create a new file {name}_importer.py inside vulnerabilities/pipelines/.

  2. Create a new importer pipeline by inheriting VulnerableCodeBaseImporterPipeline defined in vulnerabilities.pipelines. By convention, the name of the importer pipeline class should end with ImporterPipeline.

  3. Specify the license of upstream data being imported.

  4. Implement the advisories_count and collect_advisories methods.

  5. Add the newly created importer pipeline to the importers registry at vulnerabilities/importers/__init__.py

Pipeline

We use aboutcode.pipeline for importing and improving data. At a very high level, a working pipeline contains a classmethod named steps that defines which steps to run and in what order. These steps are essentially just functions. Pipeline provides an easy and effective way to log events inside these steps (it automatically handles rendering and dissemination of these logs).

It also includes a built-in progress indicator, which is essential since some of the jobs we run in the pipeline are long-running tasks. Pipeline provides a way to seamlessly record progress (it automatically takes care of rendering and disseminating the progress information).

Additionally, the pipeline offers a consistent structure, making it easy to run these pipeline steps with a message queue like RQ and to store all events related to a particular pipeline for debugging and improvements.
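To make the idea of ordered steps concrete, here is a minimal sketch of a pipeline whose steps classmethod returns plain functions that are then run in order. ToyPipeline, ToyImporter and execute are hypothetical stand-ins written for this illustration; they are not the aboutcode.pipeline API, which also handles logging and progress reporting for you.

```python
import logging

logger = logging.getLogger("toy_pipeline")


class ToyPipeline:
    """Hypothetical stand-in for a pipeline base class (not aboutcode.pipeline)."""

    @classmethod
    def steps(cls):
        # Subclasses return the step functions in the order they should run.
        raise NotImplementedError

    def execute(self):
        executed = []
        for step in self.steps():
            # Each step is a plain function that receives the pipeline instance.
            logger.info("Step [%s] starting", step.__name__)
            step(self)
            logger.info("Step [%s] completed", step.__name__)
            executed.append(step.__name__)
        return executed


class ToyImporter(ToyPipeline):
    @classmethod
    def steps(cls):
        return (cls.collect, cls.store)

    def collect(self):
        self.items = ["advisory-1", "advisory-2"]

    def store(self):
        # Relies on collect having run first, which is why step order matters.
        self.stored = list(self.items)


toy = ToyImporter()
executed_steps = toy.execute()
```

Because store depends on the result of collect, swapping the two entries in steps would fail, which is the reason the order of the returned tuple is significant.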

This tutorial contains everything you need to know to quickly implement an importer pipeline. Many internal details about the importer pipeline can be found in the vulnerabilities/pipelines/__init__.py file.

Prerequisites

Before writing a pipeline to import advisories, it is important to familiarize yourself with the following concepts.

PackageURL

VulnerableCode extensively uses Package URLs to identify a package. See the PackageURL specification and its Python implementation for more details.

Example usage:

from packageurl import PackageURL
purl = PackageURL(name="ffmpeg", type="deb", version="1.2.3")

AdvisoryData

AdvisoryData is an intermediate data format: your importer is expected to convert the raw scraped data into AdvisoryData objects. All the fields in the AdvisoryData dataclass are optional; it is the importer’s responsibility to ensure that each object contains meaningful information about a vulnerability.

AffectedPackage

The AffectedPackage data type is used to store a range of affected versions and a fixed version of a given package. For all version-related data, the univers library is used.

Univers

univers is a Python implementation of the vers specification. It can parse and compare all the package versions and all the ranges, from debian, npm, pypi, ruby and more. It processes all the version range specs and expressions.
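As a rough intuition for what a version range check involves, the toy sketch below tests whether a dotted numeric version falls inside a "low-high" range string. It is not univers and handles none of the real-world cases that univers covers (pre-releases, epochs, ecosystem-specific ordering). The helper names and the assumption that both bounds are inclusive are made up for this illustration; a real importer should rely on univers instead.

```python
def parse_version(text):
    """Turn a dotted numeric version such as '1.20.0' into a comparable tuple."""
    return tuple(int(part) for part in text.split("."))


def in_inclusive_range(version, native_range):
    """Check a version against a 'low-high' range, assuming inclusive bounds."""
    low, high = native_range.split("-")
    return parse_version(low) <= parse_version(version) <= parse_version(high)


is_affected = in_inclusive_range("1.19.3", "0.6.18-1.20.0")
is_fixed_release_affected = in_inclusive_range("1.20.1", "0.6.18-1.20.0")
```

Comparing tuples of integers avoids the classic string-comparison mistake in which "1.9.0" sorts after "1.20.0".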

Writing an Importer Pipeline

Create file for the new importer pipeline

All pipelines, including the importer pipeline, are located in the vulnerabilities/pipelines/ directory.

The importer pipeline is implemented by subclassing VulnerableCodeBaseImporterPipeline and implementing the unimplemented methods. Since most tasks, such as inserting AdvisoryData into the database and creating package-vulnerability relationships, are the same regardless of the source of the advisory, these tasks are already taken care of in the base importer pipeline, i.e., VulnerableCodeBaseImporterPipeline. You can simply focus on collecting the raw data and parsing it to create proper AdvisoryData objects.

Specify the importer license

The pipeline scrapes data off the internet. To make sure the data is usable, a license must be provided.

Populate the spdx_license_expression attribute with the appropriate value. The SPDX license identifiers can be found at ScanCode LicenseDB.

Note

An SPDX license identifier by itself is a valid license expression. In case you need more complex expressions, see https://spdx.github.io/spdx-spec/v2.3/SPDX-license-expressions/

Implement the advisories_count method

The advisories_count method returns the total number of advisories that will be collected by this pipeline.

Suppose the upstream data is a single JSON file containing a list of security advisories; in that case, you can simply return the count of security advisories in the JSON file, and that’s it.
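A minimal sketch of that case, using only the standard library: SAMPLE_PAYLOAD and count_advisories are hypothetical names, and the payload stands in for whatever the upstream source would actually return.

```python
import json

# Hypothetical upstream payload: a JSON document holding a list of advisories.
SAMPLE_PAYLOAD = '[{"id": "CVE-2021-23017"}, {"id": "CVE-2021-1234"}]'


def count_advisories(payload: str) -> int:
    """Return the number of advisories in a JSON list payload."""
    advisories = json.loads(payload)
    return len(advisories)


advisory_total = count_advisories(SAMPLE_PAYLOAD)
```

In an importer, the body of advisories_count could delegate to a helper like this one after fetching the payload.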

Note

In some cases, it can be difficult to get the exact total number of advisories that would be collected without actually processing them. In such cases, returning the best estimate will also work.

advisories_count is used to enable a proper progress indicator and is not used beyond that. If it is impossible (a super rare case) to compute the total advisory count beforehand, just return 0.
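One possible way to produce an estimate is sketched below: count the advisory files in a downloaded directory without parsing them, and fall back to 0 when nothing can be counted. The function name, the one-file-per-advisory layout and the *.json pattern are assumptions made for this illustration.

```python
import tempfile
from pathlib import Path


def estimate_advisory_count(repo_dir) -> int:
    """Estimate the advisory count as the number of *.json files, unparsed."""
    root = Path(repo_dir)
    if not root.is_dir():
        # Nothing to count: fall back to 0, which only affects the progress display.
        return 0
    return sum(1 for _ in root.rglob("*.json"))


# Demonstration against a throwaway directory with two advisory files.
with tempfile.TemporaryDirectory() as tmp:
    for name in ("a.json", "b.json", "README.txt"):
        (Path(tmp) / name).write_text("{}")
    estimated = estimate_advisory_count(tmp)

# The temporary directory is gone at this point, so the fallback applies.
missing = estimate_advisory_count(Path(tmp) / "does-not-exist")
```

The estimate may be off when a file holds several advisories or none, which is acceptable because the value only drives the progress indicator.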

Implement the collect_advisories method

The collect_advisories method collects and parses the advisories from the data source and yields AdvisoryData objects.

At this point, an example importer will look like this:

vulnerabilities/pipelines/example_importer.py
from typing import Iterable

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline


class ExampleImporterPipeline(VulnerableCodeBaseImporterPipeline):
    """Collect advisories Example."""

    pipeline_id = "example_importer"

    root_url = "https://example.org/path/to/advisories/"
    license_url = "https://example.org/license/"
    spdx_license_expression = "CC-BY-4.0"
    importer_name = "Example Importer"

    @classmethod
    def steps(cls):
        return (
            cls.collect_and_store_advisories,
            cls.import_new_advisories,
        )

    def advisories_count(self) -> int:
        raise NotImplementedError

    def collect_advisories(self) -> Iterable[AdvisoryData]:
        raise NotImplementedError

This pipeline is only a valid skeleton and does not import anything at all.

Let us implement a working pipeline that actually imports some data.

Here we have a dummy_package that uses NginxVersionRange and SemverVersion from univers for version management.

Note

It is possible that the versioning scheme you are targeting has not yet been implemented in the univers library. If this is the case, you will need to head over there and implement one.

vulnerabilities/pipelines/example_importer.py
from datetime import datetime
from datetime import timezone
from typing import Iterable

from packageurl import PackageURL
from univers.version_range import NginxVersionRange
from univers.versions import SemverVersion

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackage
from vulnerabilities.importer import Reference
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline
from vulnerabilities.severity_systems import SCORING_SYSTEMS


class ExampleImporterPipeline(VulnerableCodeBaseImporterPipeline):
    """Collect advisories Example."""

    pipeline_id = "example_importer"

    root_url = "https://example.org/path/to/advisories/"
    license_url = "https://example.org/license/"
    spdx_license_expression = "CC-BY-4.0"
    importer_name = "Example Importer"

    @classmethod
    def steps(cls):
        return (
            cls.collect_and_store_advisories,
            cls.import_new_advisories,
        )

    def advisories_count(self) -> int:
        return len(fetch_advisory_data())

    def collect_advisories(self) -> Iterable[AdvisoryData]:
        raw_data = fetch_advisory_data()
        for data in raw_data:
            yield parse_advisory_data(data)


def fetch_advisory_data():
    return [
        {
            "id": "CVE-2021-23017",
            "summary": "1-byte memory overwrite in resolver",
            "advisory_severity": "medium",
            "vulnerable": "0.6.18-1.20.0",
            "fixed": "1.20.1",
            "reference": "http://mailman.nginx.org/pipermail/nginx-announce/2021/000300.html",
            "published_on": "14-02-2021 UTC",
        },
        {
            "id": "CVE-2021-1234",
            "summary": "Dummy advisory",
            "advisory_severity": "high",
            "vulnerable": "0.6.18-1.20.0",
            "fixed": "1.20.1",
            "reference": "http://example.org/cve-2021-1234",
            "published_on": "06-10-2021 UTC",
        },
    ]


def parse_advisory_data(raw_data) -> AdvisoryData:
    purl = PackageURL(type="example", name="dummy_package")
    affected_version_range = NginxVersionRange.from_native(raw_data["vulnerable"])
    fixed_version = SemverVersion(raw_data["fixed"])
    affected_package = AffectedPackage(
        package=purl, affected_version_range=affected_version_range, fixed_version=fixed_version
    )
    severity = VulnerabilitySeverity(
        system=SCORING_SYSTEMS["generic_textual"], value=raw_data["advisory_severity"]
    )
    references = [Reference(url=raw_data["reference"], severities=[severity])]
    date_published = datetime.strptime(raw_data["published_on"], "%d-%m-%Y %Z").replace(
        tzinfo=timezone.utc
    )
    advisory_url = f"https://example.org/advisory/{raw_data['id']}"

    return AdvisoryData(
        aliases=[raw_data["id"]],
        summary=raw_data["summary"],
        affected_packages=[affected_package],
        references=references,
        url=advisory_url,
        date_published=date_published,
    )
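One detail of the example worth isolating is the date handling. With the %Z directive alone, datetime.strptime returns a naive datetime, which is why the example attaches timezone.utc explicitly. The standalone sketch below shows just that step; parse_published_on is a hypothetical helper name, and it assumes the upstream dates are always expressed in UTC.

```python
from datetime import datetime
from datetime import timezone


def parse_published_on(text: str) -> datetime:
    """Parse a 'DD-MM-YYYY UTC' string into a timezone-aware datetime."""
    # strptime accepts the literal "UTC" for %Z but leaves tzinfo unset.
    naive = datetime.strptime(text, "%d-%m-%Y %Z")
    # Assumption: the source always reports UTC, so attach it explicitly.
    return naive.replace(tzinfo=timezone.utc)


published = parse_published_on("14-02-2021 UTC")
```

Storing aware datetimes avoids ambiguity when advisories from different sources are compared or sorted by publication date.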

Important

Steps should include collect_and_store_advisories and import_new_advisories in the order shown above. They are defined in VulnerableCodeBaseImporterPipeline.

The collect_and_store_advisories step is responsible for calling collect_advisories and advisories_count, so these two methods should never be added to steps directly.

Attention

Implement on_failure to handle cleanup in case of pipeline failure. Cleanup of downloaded archives or cloned repos is necessary to avoid potential resource leakage.
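A minimal sketch of such a cleanup hook follows. ToyDownloadingPipeline, fetch and download_dir are hypothetical names used only for this illustration; in a real importer, on_failure would be defined on your VulnerableCodeBaseImporterPipeline subclass and would remove whatever that importer actually downloaded or cloned.

```python
import shutil
import tempfile
from pathlib import Path


class ToyDownloadingPipeline:
    """Hypothetical stand-in for an importer that downloads data to disk."""

    def __init__(self):
        self.download_dir = None

    def fetch(self):
        # Simulate downloading an archive into a temporary working directory.
        self.download_dir = Path(tempfile.mkdtemp(prefix="advisories-"))
        (self.download_dir / "advisories.json").write_text("[]")

    def on_failure(self):
        # Remove downloaded data so a failed run does not leak disk space.
        if self.download_dir and self.download_dir.exists():
            shutil.rmtree(self.download_dir)
        self.download_dir = None


pipeline = ToyDownloadingPipeline()
pipeline.fetch()
downloaded_to = pipeline.download_dir
pipeline.on_failure()
```

Guarding on download_dir keeps the hook safe to call even when the failure happened before anything was downloaded.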

Note

Use make valid to format your new code using black and isort automatically.
Use make check to check for formatting errors.

Register the Importer Pipeline

Finally, register your pipeline in the importer registry at vulnerabilities/importers/__init__.py.

vulnerabilities/importers/__init__.py
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline
from vulnerabilities.pipelines import example_importer
from vulnerabilities.pipelines import nginx_importer

IMPORTERS_REGISTRY = [
    nginx_importer.NginxImporterPipeline,
    example_importer.ExampleImporterPipeline,
]

IMPORTERS_REGISTRY = {
    x.pipeline_id if issubclass(x, VulnerableCodeBaseImporterPipeline) else x.qualified_name: x
    for x in IMPORTERS_REGISTRY
}
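The dictionary comprehension in the registry keys each entry by pipeline_id when the class is a pipeline subclass and by qualified_name otherwise. The toy sketch below reproduces that keying logic with stand-in classes; BasePipeline, NewStylePipeline and OtherImporter are hypothetical names, and the qualified_name value is made up for illustration.

```python
class BasePipeline:
    """Stand-in for VulnerableCodeBaseImporterPipeline."""

    pipeline_id = None


class NewStylePipeline(BasePipeline):
    pipeline_id = "example_importer"


class OtherImporter:
    """Stand-in for an entry that does not subclass the base pipeline."""

    qualified_name = "vulnerabilities.importers.other.OtherImporter"


entries = [NewStylePipeline, OtherImporter]

# Same keying rule as the registry: pipeline_id for pipelines, else qualified_name.
registry = {
    (x.pipeline_id if issubclass(x, BasePipeline) else x.qualified_name): x
    for x in entries
}
```

This keying is why the pipeline_id you choose is the name you later pass on the command line.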

Congratulations! You have written your first importer pipeline.

Run Your First Importer Pipeline

If everything went well, you will see your pipeline in the list of available importers.

 $ ./manage.py import --list

 Vulnerability data can be imported from the following importers:
 nginx_importer
 example_importer

Now, run the importer.

$ ./manage.py import example_importer

Importing data using example_importer
INFO 2024-10-16 10:15:10.483 Pipeline [ExampleImporterPipeline] starting
INFO 2024-10-16 10:15:10.483 Step [collect_and_store_advisories] starting
INFO 2024-10-16 10:15:10.483 Collecting 2 advisories
INFO 2024-10-16 10:15:10.498 Successfully collected 2 advisories
INFO 2024-10-16 10:15:10.498 Step [collect_and_store_advisories] completed in 0 seconds
INFO 2024-10-16 10:15:10.498 Step [import_new_advisories] starting
INFO 2024-10-16 10:15:10.499 Importing 2 new advisories
INFO 2024-10-16 10:15:10.562 Successfully imported 2 new advisories
INFO 2024-10-16 10:15:10.563 Step [import_new_advisories] completed in 0 seconds
INFO 2024-10-16 10:15:10.563 Pipeline completed in 0 seconds

See Command Line Interface for command line usage instructions.