22
33from __future__ import annotations
44
5+ import ipaddress
56import json
67import logging
7- import re
88from typing import Callable , Iterable , Optional
9- from urllib .parse import urlencode , urljoin
9+ from urllib .parse import urlencode , urljoin , urlunparse
1010
1111import requests
1212from requests import Session , Response
1919# noinspection PyUnresolvedReferences
2020urllib3 .disable_warnings (urllib3 .exceptions .InsecureRequestWarning )
2121
22+ HTTP = "http"
2223HTTPS = "https"
2324PORT_443 = 443
2425PORT_80 = 80
@@ -59,7 +60,7 @@ def __init__(self, **kwargs):
5960 :param bool logging_error: Logging only the REST API response with error.
6061 `True` - Enable errors logging, `False` - otherwise. Default is `False`.
6162 """
62- self .host = str ( kwargs . get ( "host" ) )
63+ self .host = _init_host ( ** kwargs )
6364 self .username = str (kwargs .get ("username" ))
6465 self .password = str (kwargs .get ("password" ))
6566 self .token = _init_token (** kwargs )
@@ -81,7 +82,7 @@ def __repr__(self):
8182 host = self .host
8283 username = self .username
8384 scheme = self .scheme
84- port = self .port if not ( scheme == HTTPS and self . port == PORT_443 ) else ""
85+ port = self .port
8586 timeout = self .timeout
8687 verify = self .verify
8788 vdom = self .vdom
@@ -123,11 +124,8 @@ def is_connected(self) -> bool:
123124 @property
124125 def url (self ) -> str :
125126 """Return URL to the Fortigate."""
126- if self .scheme == HTTPS and self .port == 443 :
127- return f"{ self .scheme } ://{ self .host } "
128- if self .scheme == "http" and self .port == 80 :
129- return f"{ self .scheme } ://{ self .host } "
130- return f"{ self .scheme } ://{ self .host } :{ self .port } "
127+ components = (self .scheme , f"{ self .host } :{ self .port } " , "/" , "" , "" , "" )
128+ return urlunparse (components )
131129
132130 # ============================ login =============================
133131
@@ -145,7 +143,7 @@ def login(self) -> None:
145143 if self .token :
146144 try :
147145 response : Response = session .get (
148- url = f" { self .url } /api/v2/monitor/system/status" ,
146+ url = urljoin ( self .url , " /api/v2/monitor/system/status") ,
149147 headers = self ._bearer_token (),
150148 verify = self .verify ,
151149 )
@@ -158,7 +156,7 @@ def login(self) -> None:
158156 # password
159157 try :
160158 response = session .post (
161- url = f" { self .url } /logincheck" ,
159+ url = urljoin ( self .url , " /logincheck") ,
162160 data = urlencode ([("username" , self .username ), ("secretkey" , self .password )]),
163161 timeout = self .timeout ,
164162 verify = self .verify ,
@@ -182,13 +180,12 @@ def logout(self) -> None:
182180 if not self .token :
183181 try :
184182 self ._session .get (
185- url = f" { self .url } /logout" ,
183+ url = urljoin ( self .url , " /logout") ,
186184 timeout = self .timeout ,
187185 verify = self .verify ,
188186 )
189187 except SSLError :
190188 pass
191- del self ._session
192189 self ._session = None
193190
194191 # =========================== helpers ============================
@@ -251,7 +248,7 @@ def _init_port(self, **kwargs) -> int:
251248 """Init port, 443 for scheme=`https`, 80 for scheme=`http`."""
252249 if port := int (kwargs .get ("port" ) or 0 ):
253250 return port
254- if self .scheme == "http" :
251+ if self .scheme == HTTP :
255252 return PORT_80
256253 return PORT_443
257254
@@ -281,7 +278,7 @@ def _response(self, method: Method, url: str, data: ODAny = None) -> Response:
281278 :rtype: Response
282279 """
283280 params : DAny = {
284- "url" : self ._valid_url ( url ),
281+ "url" : urljoin ( self .url , url ),
285282 "params" : urlencode ([("vdom" , self .vdom )]),
286283 "timeout" : self .timeout ,
287284 "verify" : self .verify ,
@@ -299,26 +296,37 @@ def _response(self, method: Method, url: str, data: ODAny = None) -> Response:
299296 raise self ._hide_secret_ex (ex )
300297 return response
301298
302- def _valid_url (self , url : str ) -> str :
303- """Return a valid URL string.
304299
305- Add `https://` to `url` if it is absent and remove trailing `/` character.
306- """
307- if re .match ("http(s)?://" , url ):
308- return url .rstrip ("/" )
309- path = url .strip ("/" )
310- return urljoin (self .url , path )
300+ # =========================== helpers ============================
311301
312302
313- # =========================== helpers ============================
303+ def _init_host (** kwargs ) -> str :
304+ """Init host: valid hostname or valid IPv4/IPv6 address."""
305+ host = str (kwargs .get ("host" , "" )).strip ()
306+ if not host :
307+ raise ValueError (f"{ host = !r} , hostname is not specified." )
308+
309+ # Strip brackets if provided (IPv6 URL style)
310+ if host .startswith ("[" ) and host .endswith ("]" ):
311+ host = host [1 :- 1 ]
312+
313+ # Try IP validation first
314+ try :
315+ ip = ipaddress .ip_address (host )
316+ if isinstance (ip , ipaddress .IPv6Address ):
317+ host = f"[{ host } ]"
318+ except ValueError :
319+ pass # hostname
320+
321+ return host
314322
315323
316324def _init_scheme (** kwargs ) -> str :
317325 """Init scheme `https` or `http`."""
318326 scheme = str (kwargs .get ("scheme" ) or HTTPS )
319- expected = ["https" , "http" ]
327+ expected = [HTTPS , HTTP ]
320328 if scheme not in expected :
321- raise ValueError (f"{ scheme = } , { expected = } ." )
329+ raise ValueError (f"{ scheme = !r } , { expected = !r } ." )
322330 return scheme
323331
324332
@@ -328,7 +336,7 @@ def _init_token(**kwargs) -> str:
328336 if not token :
329337 return ""
330338 if kwargs .get ("username" ):
331- raise ValueError ("Mutually excluded: username, token." )
339+ raise ValueError ("A username and a token are mutually exclusive ." )
332340 if kwargs .get ("password" ):
333- raise ValueError ("Mutually excluded: password, token." )
341+ raise ValueError ("A password and a token are mutually exclusive ." )
334342 return token
0 commit comments