Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import sys
import time
import logging
import platform
from optparse import OptionParser
from threading import Thread, Event

Expand All @@ -20,6 +21,7 @@
from config import config
from config.providers import FileConfigProvider
from config.default import DEFAULT_PATH
from config.config import AGENT_VERSION

from utils.logs import initialize_logging
from utils.hostname import HostnameException, get_hostname
Expand All @@ -28,6 +30,7 @@
from utils.pidfile import PidFile
from utils.network import get_proxy, get_site_url
from utils.flare import Flare
from utils.platform import get_os
from metadata import get_metadata

from collector import Collector
Expand All @@ -43,7 +46,6 @@


# Globals
AGENT_VERSION = '1.1.6'
PID_NAME = 'datadog-unix-agent'

log = logging.getLogger('agent')
Expand Down Expand Up @@ -120,6 +122,7 @@ class Agent(Daemon):
'restart': False,
'status': False,
'flare': False,
'version': False,
}

STATUS_TIMEOUT = 5
Expand Down Expand Up @@ -200,22 +203,19 @@ def run(self):
logging.info("Starting the Forwarder")
api_key = config.get('api_key')
dd_url = config.get('dd_url')
forwarder_timeout = config.get("forwarder_timeout")
if not dd_url:
logging.error('No Datadog URL configured - cannot continue')
sys.exit(1)
if not api_key:
logging.error('No API key configured - cannot continue')
sys.exit(1)

# get proxy settings
proxies = get_proxy()
logging.debug('Proxy configuration used: %s', proxies)

# get site url
forwarder = Forwarder(
api_key,
get_site_url(dd_url, site=config.get('site')),
proxies=proxies,
forwarder_timeout=forwarder_timeout,
)
forwarder.start()

Expand Down Expand Up @@ -377,6 +377,12 @@ def main():
elif 'status' == command:
agent.status(config)

elif 'version' == command:
os_name = get_os()
os_release = platform.release()
py_version = sys.version.split()[0]
print(f"Datadog Unix Agent: {AGENT_VERSION} - Python: {py_version} - OS: {os_name} {os_release}")

elif 'flare' == command:
case_id = input('Do you have a support case id? Please enter it here (otherwise just hit enter): ').lower()
agent.flare(config, case_id)
Expand Down
3 changes: 3 additions & 0 deletions config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

from .providers import ConfigProvider

# CONSTANTS
AGENT_VERSION = "1.1.6"


log = logging.getLogger(__name__)

Expand Down
1 change: 1 addition & 0 deletions config/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def init(config):
'aggregator_interval': DEFAULT_AGGREGATOR_INTERVAL,
'aggregator_expiry_seconds': DEFAULT_AGGREGATOR_EXPIRY_SECS,
'recent_point_threshold': DEFAULT_RECENT_POINT_THRESHOLD,
'skip_ssl_validation': False,
'proxy': {
'http': None,
'https': None,
Expand Down
1 change: 1 addition & 0 deletions datadog.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ api_key: <api_key_here> # DD API key: required.
# bind_host: localhost # address we'll be binding to
# port: 8125 # dogstatsd UDP listening port
# non_local_traffic: false # listen to non-local traffic
# skip_ssl_validation: false # Skip SSL certificate verification (insecure)
78 changes: 51 additions & 27 deletions forwarder/forwarder.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@

from .worker import Worker, RetryWorker
from .transaction import Transaction

from config.default import DEFAULT_FORWARDER_TO
from utils.stats import Stats
from utils.http import get_shared_requests

log = logging.getLogger(__name__)

Expand All @@ -20,12 +21,12 @@ class Forwarder(object):
V1_SERIES_ENDPOINT = "/api/v1/series"
V1_SERVICE_CHECKS_ENDPOINT = "/api/v1/check_run"

DD_API_HEADER = "DD-Api-Key"
DD_API_HEADER = "DD-API-KEY"

QUEUES_SIZE = 100
WORKER_JOIN_TIME = 2

def __init__(self, api_key, domain, nb_worker=4, proxies={}):
def __init__(self, api_key, domain, forwarder_timeout=DEFAULT_FORWARDER_TO, nb_worker=4):
self.api_key = api_key
self.domain = domain
self.stats = Stats()
Expand All @@ -34,10 +35,14 @@ def __init__(self, api_key, domain, nb_worker=4, proxies={}):
self.workers = []
self.nb_worker = nb_worker
self.retry_worker = None
self.proxies = proxies

self.forwarder_timeout = forwarder_timeout
# --------------------------------------------------------------------------
# Lifecycle management
# --------------------------------------------------------------------------
def start(self):
self.retry_worker = RetryWorker(self.input_queue, self.retry_queue, self.stats)
"""Start retry worker and all forwarder workers."""
self.retry_worker = RetryWorker(
self.input_queue, self.retry_queue, self.stats)
self.retry_worker.start()

for i in range(self.nb_worker):
Expand All @@ -46,6 +51,7 @@ def start(self):
self.workers.append(w)

def stop(self):
"""Stop all workers gracefully."""
self.retry_worker.stop()

for w in self.workers:
Expand All @@ -63,28 +69,46 @@ def stop(self):
log.error("Could not stop thread '%s'", w.name)
self.workers = []

# --------------------------------------------------------------------------
# Internal payload submission helper
# --------------------------------------------------------------------------
def _submit_payload(self, endpoint, payload, extra_headers=None):
    """Create a Transaction for ``payload`` and enqueue it for the workers.

    The request options start from a copy of the shared requests options,
    with ``timeout`` set to ``self.forwarder_timeout``. The API key is sent
    in the ``DD_API_HEADER`` header; it is not appended to the endpoint.

    Args:
        endpoint: API path, passed to the Transaction unchanged.
        payload: Body handed to the Transaction.
        extra_headers: Optional mapping of additional headers. It is merged
            into a fresh dict, so the caller's mapping is never mutated.

    If the input queue is full, the transaction is dropped and an error is
    logged; nothing is raised to the caller.
    """
    base_options = get_shared_requests().options.copy()
    base_options["timeout"] = self.forwarder_timeout

    # Build the headers in a new dict so neither the shared options nor the
    # caller's mapping is modified. ``or {}`` tolerates a None entry.
    headers = dict(base_options.get("headers") or {})
    if extra_headers:
        headers.update(extra_headers)
    # Set the API key last so an extra header can never replace it.
    headers[self.DD_API_HEADER] = self.api_key
    base_options["headers"] = headers

    t = Transaction(payload, self.domain, endpoint, options=base_options)

    try:
        self.input_queue.put_nowait(t)
    except queue.Full as e:
        log.error(
            "Could not submit transaction to '%s', queue is full (dropping it): %s",
            endpoint,
            e,
        )

# --------------------------------------------------------------------------
# Public submission helpers
# --------------------------------------------------------------------------
def submit_v1_series(self, payload, extra_headers=None):
    """Count a series payload and enqueue it for ``V1_SERIES_ENDPOINT``."""
    self.stats.inc_stat("series_payloads", 1)
    self._submit_payload(self.V1_SERIES_ENDPOINT, payload, extra_headers)

def submit_v1_intake(self, payload, extra_headers=None):
    """Count an intake payload and enqueue it for ``V1_ENDPOINT``."""
    self.stats.inc_stat("intake_payloads", 1)
    self._submit_payload(self.V1_ENDPOINT, payload, extra_headers)

def submit_v1_service_checks(self, payload, extra_headers=None):
    """Count a service-check payload and enqueue it for ``V1_SERVICE_CHECKS_ENDPOINT``."""
    self.stats.inc_stat("service_check_payloads", 1)
    self._submit_payload(
        self.V1_SERVICE_CHECKS_ENDPOINT, payload, extra_headers)
40 changes: 29 additions & 11 deletions forwarder/tests/test_forwarder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,20 @@
import queue

from forwarder import Forwarder
from config import config
from config.config import AGENT_VERSION


DOMAIN = "https://app.datadoghq.com"
TIMEOUT = config.get("forwarder_timeout")

def test_forwarder_creation():
    """A new Forwarder stores the api key, domain and timeout it was given."""
    f = Forwarder("api_key", DOMAIN, TIMEOUT)
    assert f.api_key == "api_key"
    assert f.domain == "https://app.datadoghq.com"
    # The timeout is the third positional argument of the constructor.
    assert f.forwarder_timeout == TIMEOUT

def test_forwarder_start_stop():
f = Forwarder("api_key", "https://datadog.com", 2)
f = Forwarder("api_key", "https://datadog.com", TIMEOUT, 2)
f.start()

assert len(f.workers) == 2
Expand All @@ -42,34 +45,49 @@ def get_transaction(f):
raise Exception("input_queue should not be empty")

def test_submit_payload_():
    """_submit_payload enqueues a Transaction carrying the API key as a header.

    The endpoint must be passed through untouched (no ``?api_key=`` suffix)
    and the headers live under ``t.options["headers"]``.
    """
    f = Forwarder("api_key", DOMAIN, TIMEOUT)

    # case 1: extra headers provided
    f._submit_payload("test", "data", {"test": 21})
    t = get_transaction(f)
    assert t.payload == "data"
    assert t.domain == DOMAIN
    assert t.endpoint == "test"

    expected_headers = {
        "User-Agent": f"datadog-unix-agent/{AGENT_VERSION}",
        "test": 21,
        Forwarder.DD_API_HEADER: "api_key",
    }
    # Only the expected keys are checked; other shared headers may be present.
    for k, v in expected_headers.items():
        assert t.options["headers"][k] == v

    # case 2: no extra headers
    f._submit_payload("test", "data")
    t = get_transaction(f)
    assert t.payload == "data"
    assert t.domain == DOMAIN
    assert t.endpoint == "test"

    expected_headers = {
        "User-Agent": f"datadog-unix-agent/{AGENT_VERSION}",
        Forwarder.DD_API_HEADER: "api_key",
    }
    for k, v in expected_headers.items():
        assert t.options["headers"][k] == v

def test_submit_v1_series():
    """submit_v1_series enqueues the payload for the series endpoint."""
    f = Forwarder("api_key", DOMAIN, TIMEOUT)
    f.submit_v1_series("data", None)
    t = get_transaction(f)

    # The API key travels in a header, so the endpoint has no query string.
    assert t.endpoint == "/api/v1/series"
    assert t.payload == "data"

def test_submit_v1_service_checks():
    """submit_v1_service_checks enqueues the payload for the check_run endpoint."""
    f = Forwarder("api_key", DOMAIN, TIMEOUT)
    f.submit_v1_service_checks("data", None)
    t = get_transaction(f)

    # The API key travels in a header, so the endpoint has no query string.
    assert t.endpoint == "/api/v1/check_run"
    assert t.payload == "data"
17 changes: 13 additions & 4 deletions forwarder/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,22 @@

def test_transaction_creation():
start = time.time()
t = Transaction("data", "https://datadog.com", "/v1/series", {"DD": "true", "Content-Type": "application/json"})

t = Transaction(
"data",
"https://datadog.com",
"/v1/series",
options={
"headers": {"DD": "true", "Content-Type": "application/json"},
"timeout": 20,
},
)
assert t.payload == "data"
assert t.domain == "https://datadog.com"
assert t.endpoint == "/v1/series"
assert t.headers == {"DD": "true", "Content-Type": "application/json"}
assert t.nb_try == 0
assert t.timeout == 20 # default value from config
assert t.options["headers"] == {"DD": "true", "Content-Type": "application/json"}
assert t.options["timeout"] == 20

# test created_at value
assert t.created_at >= start
Expand Down Expand Up @@ -73,7 +82,7 @@ def test_process_success(m):
assert t.nb_try == 1

def test_process_error(m):
t = Transaction("data", "https://datadog.com", "/v1/series", {"test": "21"})
t = Transaction("data", "https://datadog.com", "/v1/series", options={"headers": {"test": "21"}})
headers = {"test": "21"}

nb_try = 1
Expand Down
10 changes: 5 additions & 5 deletions forwarder/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ def test_worker_process_transactions(m):
retry_queue = queue.Queue(2)
w = Worker(input_queue, retry_queue, stats)

t_success = Transaction("data", "https://datadog.com", "/success", None)
t_error = Transaction("data", "https://datadog.com", "/error", None)
t_success = Transaction("data", "https://datadog.com", "/success")
t_error = Transaction("data", "https://datadog.com", "/error")
m.post("https://datadog.com/success", status_code=200)
m.post("https://datadog.com/error", status_code=402)

Expand Down Expand Up @@ -74,11 +74,11 @@ def test_retry_worker_flush():
retry_queue = queue.Queue(1)
w = RetryWorker(input_queue, retry_queue, stats)

t_ready = Transaction("data", "https://datadog.com", "/success", None)
t_ready = Transaction("data", "https://datadog.com", "/success")
t_ready.next_flush = time.time() - 10
w.transactions.append(t_ready)

t_not_ready = Transaction("data", "https://datadog.com", "/success", None)
t_not_ready = Transaction("data", "https://datadog.com", "/success")
t_not_ready.next_flush = time.time() + 1000
w.transactions.append(t_not_ready)

Expand All @@ -101,7 +101,7 @@ def test_retry_worker_process_transaction():
w = RetryWorker(input_queue, retry_queue, stats, flush_interval=1)

# test pulling 1 transaction without flushing
t1 = Transaction("data", "https://datadog.com", "/success", None)
t1 = Transaction("data", "https://datadog.com", "/success")
t1.next_flush = time.time()
retry_queue.put(t1)

Expand Down
Loading
Loading