Skip to content
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,30 @@ for line in data:

```


### As a python logging.Filter

```python
import logging

from anonip import AnonipFilter

if __name__ == '__main__':
handler = logging.StreamHandler()
handler.addFilter(AnonipFilter())
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[handler]
)

logging.debug('192.0.2.123 - call from root logger')

logger = logging.getLogger('child')
logger.info('2001:db8:abcd:ef01:2345:6789:abcd:ef01 - call from child logger')
```


### Python 2 or 3?
For compatibility reasons, anonip uses the shebang `#! /usr/bin/env python`.
This will default to python2 on all Linux distributions except for Arch Linux.
Expand Down
45 changes: 45 additions & 0 deletions anonip.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import logging
import sys
from io import open
from collections import abc

try:
import ipaddress
Expand Down Expand Up @@ -260,6 +261,50 @@ def truncate_address(self, ip):
return ip.supernet(new_prefix=self._prefixes[ip.version])[0]


class AnonipFilter:
def __init__(self, args=None, extra=None, anonip=None):
"""
An implementation of Python logging.Filter using anonip.

:param args: list of log message args to filter. Defaults to []
:param extra: list of LogRecord attributes to filter. Defaults to []
:param anonip: dict of parameters for Anonip instance
"""
self.args = [] if args is None else args
self.extra = [] if extra is None else extra
self.anonip = Anonip(**(anonip or {}))

def filter(self, record):
"""
See logging.Filter.filter()
"""
if record.name != "anonip":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to make this method more readable. As a start we could exit early with

if record.name == "anonip":
    return True

for key in self.args:
if isinstance(record.args, abc.Mapping):
if key in record.args:
value = record.args[key]
if isinstance(value, str):
record.args[key] = self.anonip.process_line(value)
elif isinstance(record.args, abc.Sequence):
if key < len(record.args):
value = record.args[key]
if isinstance(value, str):
is_tuple = isinstance(record.args, tuple)
if is_tuple:
record.args = list(record.args)
record.args[key] = self.anonip.process_line(value)
if is_tuple:
record.args = tuple(record.args)

for key in self.extra:
if hasattr(record, key):
value = getattr(record, key)
if (isinstance(value, str)):
setattr(record, key, self.anonip.process_line(value))

return True


def _validate_ipmask(mask, bits=32):
"""
Verify if the supplied ip mask is valid.
Expand Down
77 changes: 77 additions & 0 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,3 +319,80 @@ def test_properties_columns():
assert a.columns == [0]
a.columns = [5, 6]
assert a.columns == [4, 5]


def test_logging_filter_defaults(caplog):
logging.disable(logging.NOTSET)
logging.getLogger("anonip").setLevel(logging.CRITICAL)

logger = logging.getLogger("filter_defaults")
logger.addFilter(anonip.AnonipFilter())
logger.setLevel(logging.INFO)

logger.info("192.168.100.200 string")
logger.info("1.2.3.4 string")
logger.info("2001:0db8:85a3:0000:0000:8a2e:0370:7334 string")
logger.info("2a00:1450:400a:803::200e string")

assert caplog.record_tuples == [
("filter_defaults", logging.INFO, "192.168.96.0 string"),
("filter_defaults", logging.INFO, "1.2.0.0 string"),
("filter_defaults", logging.INFO, "2001:db8:85a0:: string"),
("filter_defaults", logging.INFO, "2a00:1450:4000:: string"),
]

logging.disable(logging.CRITICAL)


def test_logging_filter_args(caplog):
logging.disable(logging.NOTSET)
logging.getLogger("anonip").setLevel(logging.CRITICAL)

logger = logging.getLogger("filter_args")
logger.addFilter(anonip.AnonipFilter(args=["ip", "non-existing-attr"], extra=[]))
logger.setLevel(logging.INFO)

logger.info("%(ip)s string", {"ip": "192.168.100.200"})
logger.info("string %(ip)s", {"ip": "1.2.3.4"})
logger.info("%(ip)s string", {"ip": "2001:0db8:85a3:0000:0000:8a2e:0370:7334"})
logger.info("string")

assert caplog.record_tuples == [
("filter_args", logging.INFO, "192.168.96.0 string"),
("filter_args", logging.INFO, "string 1.2.0.0"),
("filter_args", logging.INFO, "2001:db8:85a0:: string"),
("filter_args", logging.INFO, "string"),
]

logging.disable(logging.CRITICAL)


def test_logging_filter_extra(caplog):
logging.disable(logging.NOTSET)
logging.getLogger("anonip").setLevel(logging.CRITICAL)

logger = logging.getLogger("filter_args")
logger.addFilter(
anonip.AnonipFilter(
extra=["ip", "non-existing-key"], anonip={"ipv4mask": 16, "ipv6mask": 64}
)
)
logger.setLevel(logging.INFO)

logger.info("string", extra={"ip": "192.168.100.200"})
logger.info("string", extra={"ip": "1.2.3.4"})
logger.info("string", extra={"ip": "2001:0db8:85a3:0000:0000:8a2e:0370:7334"})
logger.info("string", extra={"ip": "2a00:1450:400a:803::200e"})

expected = [
"192.168.0.0",
"1.2.0.0",
"2001:db8:85a3::",
"2a00:1450:400a:803::",
]

actual = [record.ip for record in caplog.records]

assert actual == expected

logging.disable(logging.CRITICAL)