Python SIEM Query Utils with Ibis support for scalable data processing.
- Azure Sentinel Integration: Query Log Analytics workspaces with KQL
- Multiple Output Formats: Support for pandas DataFrame, CSV, JSON, list, and Ibis expressions
- Scalable Processing: Built on Ibis for efficient data transformations
- Modern Architecture: Built on uv for fast dependency management
Install from PyPI:

pip install wagov-squ

Or clone the repository for development:

git clone https://github.com/wagov/nbdev-squ.git
cd nbdev-squ
uv sync --dev

Set the SQU_CONFIG environment variable to specify the Azure Key Vault configuration:
import os
os.environ["SQU_CONFIG"] = "keyvault_name/tenant_id"You can also configure services directly with environment variables:
export SQU_JIRA_URL="https://yourorg.atlassian.net"
export SQU_JIRA_USERNAME="[email protected]"
export SQU_JIRA_PASSWORD="your-api-token"
export SQU_RUNZERO_APITOKEN="your-runzero-token"
export SQU_ABUSEIPDB_API_KEY="your-abuseipdb-key"
export SQU_TENABLE_ACCESS_KEY="your-tenable-key"
export SQU_TENABLE_SECRET_KEY="your-tenable-secret"

Basic usage:

from wagov_squ import list_workspaces, query_all, Fmt
# List Azure Sentinel workspaces (returns pandas DataFrame by default)
workspaces = list_workspaces()
print(workspaces)
# Get data in different formats
csv_data = list_workspaces(fmt=Fmt.csv)
json_data = list_workspaces(fmt=Fmt.json)
list_data = list_workspaces(fmt=Fmt.list)
# Execute KQL queries
results = query_all("SecurityEvent | take 10")

from wagov_squ import list_workspaces, Fmt
# Get data as Ibis expression for advanced processing
ibis_expr = list_workspaces(fmt=Fmt.ibis)
# Use Ibis for complex transformations
filtered = ibis_expr.filter(ibis_expr.alias == "MyAgency")
result = filtered.to_pandas()  # Convert back to pandas when needed

Key functions:

- list_workspaces(fmt="df", agency="ALL") - List Azure Sentinel workspaces
- list_securityinsights(fmt="df") - List Security Insights resources
- query_all(query, fmt="df", timespan=14d) - Execute KQL queries across workspaces
- clients.jira - Access Jira API client
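A short combined sketch of these entry points follows; the SigninLogs query, the JQL string, the project key, and the result limit are illustrative placeholders, the top-level import of clients is an assumption, and clients.jira is assumed to behave like a standard atlassian-python-api Jira client:

from wagov_squ import query_all, clients, Fmt

# Run a KQL query across all configured workspaces (placeholder query).
signins = query_all("SigninLogs | take 100", fmt=Fmt.pandas)
print(signins.head())

# clients.jira gives access to the configured Jira client; the .jql() call
# assumes an atlassian-python-api style interface and a placeholder project key.
issues = clients.jira.jql("project = SOC ORDER BY created DESC", limit=5)
print(issues)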
Output format options:

- Fmt.pandas or "df" - pandas DataFrame (default)
- Fmt.csv or "csv" - CSV string
- Fmt.json or "json" - List of dictionaries
- Fmt.list or "list" - List of lists
- Fmt.ibis or "ibis" - Ibis expression for advanced processing
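The enum members and their string aliases are interchangeable; a quick sketch using list_workspaces from the quickstart above:

from wagov_squ import list_workspaces, Fmt

# Both calls request the default pandas DataFrame output.
df_from_string = list_workspaces(fmt="df")
df_from_enum = list_workspaces(fmt=Fmt.pandas)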
wagov-squ includes a built-in dbt-duckdb plugin for seamless integration. Configure your dbt profiles.yml:
your_project:
  target: dev
  outputs:
    dev:
      type: duckdb
      path: 'dbt.duckdb'
      plugins:
        - module: 'wagov_squ.api'
          alias: 'squ'

Then use in your dbt models:
-- models/security/persistence_hunting.sql
{{ config(materialized='view') }}
SELECT * FROM (
  SELECT * FROM squ(kql_path='queries/persistence_detection.kql', timespan='14d')
)

Create queries/persistence_detection.kql:
DeviceProcessEvents
| where ActionType == "ProcessCreated"
| where ProcessCommandLine has_all (dynamic(['reg',' ADD', @'Software\Microsoft\Windows\CurrentVersion\Run']))
| where InitiatingProcessFileName !in (dynamic(['Discord.exe','Skype.exe']))
| project Timestamp, DeviceName, ProcessCommandLine, InitiatingProcessFileName

Or query workspaces directly:
SELECT * FROM squ(list_workspaces=true)
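With the profile and model in place, a regular dbt invocation builds the view (the model name matches the example file above):

dbt run --select persistence_hunting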
You can also call wagov-squ from SQLMesh Python models:

# models/security_events.py
from sqlmesh import model
from wagov_squ import query_all, Fmt

@model(
    "security.persistence_events",
    kind="view",
    cron="@daily"
)
def persistence_events(context):
    kql_query = """
    DeviceProcessEvents
    | where ActionType == "ProcessCreated"
    | where ProcessCommandLine contains "reg add"
    | project Timestamp, DeviceName, ProcessCommandLine
    """
    # Get data as Ibis expression for further processing
    data = query_all(kql_query, fmt=Fmt.ibis)
    return data.filter(data.Timestamp >= context.start_date)

Configuration secrets are cached securely in the user_cache_dir with restricted permissions. Ensure the following (a minimal permissions check is sketched after this list):
- System runs on encrypted disk
- User cache directory access is restricted
- No external logging of cache directory
- Use isolated VMs/workstations for sensitive activities
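As a minimal sketch of that permissions check, assuming the cache lives under platformdirs' user_cache_dir (the "squ" application name below is a placeholder, not necessarily the exact directory wagov-squ uses):

import os
import stat
from pathlib import Path

from platformdirs import user_cache_dir

# Placeholder app name; confirm the actual cache location used by wagov-squ.
cache_dir = Path(user_cache_dir("squ"))

if cache_dir.exists():
    mode = stat.S_IMODE(cache_dir.stat().st_mode)
    # Cached secrets should not be readable by group or other users.
    if mode & 0o077:
        print(f"Tightening permissions on {cache_dir} (was {oct(mode)})")
        os.chmod(cache_dir, 0o700)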
Development commands (run with just):

just install # Install development dependencies
just test # Run all tests
just test-fast # Run unit tests only
just test-integration # Run integration tests (requires SQU_CONFIG)
just lint # Format and lint code
just build # Build package
just complexity # Analyze code complexity

Integration tests require Azure authentication via SQU_CONFIG:
export SQU_CONFIG="keyvault_name/tenant_id"
just test-integration

License: Apache-2.0