Implementing a new Processor

A processor is used to process log messages. It is called for every incoming log message (see Pipelines), which it can then modify. When implementing a new processor, the following guidelines have to be considered.

Concurrency/IPC

Processors can run in parallel on multiple different system processes. Therefore, it is not guaranteed that a specific processor instance will see all incoming log messages. Inter-process communication (IPC) must be used if information from log messages has to be shared between multiple processor instances. IPC is relatively slow and cannot be used if the processor instances are located on different machines. In those cases you should reconsider whether sharing that information is actually necessary, and whether the functionality is sensible in the context of this framework at all.

Getting started

class logprep.util.processor_generator.ProcessorCodeGenerator

If you want to implement a new processor, we have a tiny helper to generate the needed boilerplate code for you. You can run it in a python shell with:

from logprep.util.processor_generator import ProcessorCodeGenerator
processor_config = {"name": "NewProcessor", "base_class": "FieldManager"}
generator = ProcessorCodeGenerator(**processor_config)
generator.generate()

After the code is generated you have the following new folders and files:

  • logprep/processor/<processor name> with a file processor.py and a file rule.py

  • tests/unit/processor/<processor name> with a file test_<processor name>.py and a file test_<processor name>_rule.py.

After registering your processor in logprep/registry.py you can start implementing tests and the _apply_rules method as explained in the following sections. Do not forget to add the processor and rule configuration to

  • doc/source/user_manual/configuration/processor.rst

  • doc/source/user_manual/configuration/rule.rst
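The registration itself is typically a single entry in the registry's mapping. The exact layout of logprep/registry.py may differ between versions, so treat the following as a sketch (the import path assumes the folders created by the generator above):

```python
# sketch of logprep/registry.py (exact layout may differ between versions)
from logprep.processor.new_processor.processor import NewProcessor  # your generated module


class Registry:
    """maps the `type` field of the pipeline configuration to component classes"""

    mapping = {
        # ... existing processors ...
        "new_processor": NewProcessor,
    }
```

The key is the string used as `type` in the pipeline configuration.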

Processor

Processors must implement the interface Processor. If you want your processor to have processor specific configuration parameters, you have to create a Config class inside your processor class definition. This Config class has to inherit from Processor.Config and has to be written as an attrs dataclass with kw_only set to True.

"""
NewProcessor
------------

Write your processor description here. It will be rendered in the processor documentation.

Example
^^^^^^^
..  code-block:: yaml
    :linenos:

    - newprocessorname:
        type: new_processor
        specific_rules:
            - tests/testdata/rules/specific/
        generic_rules:
            - tests/testdata/rules/generic/
        new_config_parameter: config_value

"""
from attrs import define, field, validators

from logprep.abc.processor import Processor


class NewProcessor(Processor):
    """short docstring for new_processor"""

    @define(kw_only=True)
    class Config(Processor.Config):
        """NewProcessor config"""

        new_config_parameter: str = field(validator=validators.instance_of(str))
        """the new processor specific config parameter"""

    __slots__ = ["processor_attribute"]

    processor_attribute: list

    def __init__(self, name, configuration, logger):
        super().__init__(name, configuration, logger)
        self.processor_attribute = []

    def _apply_rules(self, event, rule):
        """your implemented workload"""
        ...

The rules must implement the interface Rule.

setup, shut_down

The method setup() is called before the first log message is processed, and the method shut_down() after the last log message has been processed.

These methods can be implemented to create additional data structures and to release them again after processing has finished.

process

This method is implemented in the Processor and is called for every log message. To process the event it invokes the processor's _apply_rules method. If you want to do something with the event after all rules have been applied, you can override this method and implement your code after calling super().process(event). The log message is passed as a dictionary and modified ‘in place’, meaning that modifications are performed directly on the input event.

def process(self, event: dict):
    super().process(event)
    if self.new_config_parameter:
        self._do_more_stuff()

Warning

It is possible to cancel the processing of a log message and to discard it by deleting all of its fields. This could be used if large amounts of useless logs are being generated, but it does not conform to the goals of Logprep and should be avoided.

Exceptions/Error Handling

An exception should be thrown if an error occurs during the processing of a log message. All exceptions are logged and should return a helpful error message via str(exception). Exceptions derived from ProcessorWarningError have no impact on the operation of the processor. Other exceptions stop the processing of the log message. However, the log message will be stored separately as failed (see Output, store_failed).

Metrics

To implement new processor specific metrics you have to add an embedded class Metrics inside the processor class which inherits from Component.Metrics. For further information about metrics see the reference implementation in the Amides Processor.

The following code example highlights an implementation of processor specific metrics, aligned with the general implementation of a new processor seen in Getting started.

"""Processor Documentation"""
from attrs import define, field

from logprep.abc.component import Component
from logprep.abc.processor import Processor
from logprep.metrics.metrics import CounterMetric


class NewProcessor(Processor):
    """short docstring for new_processor"""

    @define(kw_only=True)
    class Config(Processor.Config):
        """NewProcessor config"""
        ...

    @define(kw_only=True)
    class Metrics(Component.Metrics):
        """Tracks statistics about the NewProcessor"""

        new_metric: CounterMetric = field(
            factory=lambda: CounterMetric(
                description="Short description of this metric",
                name="new_metric",
            )
        )
        """Short description of this metric"""

    __slots__ = ["processor_attribute"]

    processor_attribute: list

    def __init__(self, name, configuration, logger):
        super().__init__(name, configuration, logger)
        self.processor_attribute = []
        self.metrics = self.Metrics(labels=self.metric_labels)

    def _apply_rules(self, event, rule):
        """your implemented workload"""
        ...

After initializing these new metrics it is necessary to increase or change them accordingly. This can simply be done by accessing the attribute and changing its value. For example, the following code increases the metric inside the _apply_rules method:

def _apply_rules(self, event, rule):
    """your implemented workload"""
    ...
    if something_happens:
        self.metrics.new_metric += 1

If the processor already has processor specific metrics and only one new metric value is needed, it can simply be created by adding a new attribute to the Metrics class. Once the new attribute exists, it can be accessed and updated when needed. The exporter will automatically recognize it as a new metric and expose it as such.

Tests

While developing the new processor you have to create a test class under tests.unit.processor.yourprocessor_package.processor. Your test class has to inherit from BaseProcessorTestCase, which helps you implement the necessary methods correctly. All tests should pass successfully. Appropriate tests for the processor specific functions have to be implemented independently.