Skip to content

Commit 5930103

Browse files
author
Zach Moody
authored
Merge pull request #216 from tyler-8/threaded
Add threading to .all() and .filter() requests
2 parents 2523bd9 + 8301002 commit 5930103

File tree

4 files changed

+69
-0
lines changed

4 files changed

+69
-0
lines changed

README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,13 @@ nb.dcim.devices.all()
3838
[test1-leaf1, test1-leaf2]
3939
```
4040

41+
### Threading
42+
43+
pynetbox supports multithreaded calls (in Python 3 only) for `.filter()` and `.all()` queries. It is **highly recommended** you have `MAX_PAGE_SIZE` in your Netbox install set to anything *except* `0` or `None`. The default value of `1000` is usually a good value to use. To enable threading, add `threading=True` parameter to the `.api`:
44+
45+
```python
46+
nb = pynetbox.api(
47+
'http://localhost:8000',
48+
threading=True,
49+
)
50+
```

pynetbox/api.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
See the License for the specific language governing permissions and
1414
limitations under the License.
1515
"""
16+
import sys
17+
1618
import requests
1719

1820
from pynetbox.core.endpoint import Endpoint
@@ -152,6 +154,7 @@ def __init__(
152154
private_key=None,
153155
private_key_file=None,
154156
ssl_verify=True,
157+
threading=False,
155158
):
156159
if private_key and private_key_file:
157160
raise ValueError(
@@ -165,6 +168,10 @@ def __init__(
165168
self.ssl_verify = ssl_verify
166169
self.session_key = None
167170
self.http_session = requests.Session()
171+
if threading and sys.version_info.major == 2:
172+
raise NotImplementedError("Threaded pynetbox calls not supported \
173+
in Python 2")
174+
self.threading = threading
168175

169176
if self.private_key_file:
170177
with open(self.private_key_file, "r") as kf:

pynetbox/core/endpoint.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ def all(self):
9696
session_key=self.session_key,
9797
ssl_verify=self.ssl_verify,
9898
http_session=self.api.http_session,
99+
threading=self.api.threading,
99100
)
100101

101102
return [self._response_loader(i) for i in req.get()]
@@ -220,6 +221,7 @@ def filter(self, *args, **kwargs):
220221
session_key=self.session_key,
221222
ssl_verify=self.ssl_verify,
222223
http_session=self.api.http_session,
224+
threading=self.api.threading,
223225
)
224226

225227
ret = [self._response_loader(i) for i in req.get()]

pynetbox/core/query.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313
See the License for the specific language governing permissions and
1414
limitations under the License.
1515
"""
16+
try:
17+
import concurrent.futures as cf
18+
except ImportError:
19+
pass
1620
import json
1721
from six.moves.urllib.parse import urlencode
1822

@@ -28,6 +32,11 @@ def url_param_builder(param_dict):
2832
return "?{}".format(urlencode(param_dict))
2933

3034

35+
def calc_pages(limit, count):
36+
""" Calculate number of pages required for full results set. """
37+
return int(count / limit) + (limit % count > 0)
38+
39+
3140
class RequestError(Exception):
3241
"""Basic Request Exception
3342
@@ -141,6 +150,7 @@ def __init__(
141150
session_key=None,
142151
ssl_verify=True,
143152
url=None,
153+
threading=False,
144154
):
145155
"""
146156
Instantiates a new Request object
@@ -164,6 +174,7 @@ def __init__(
164174
self.ssl_verify = ssl_verify
165175
self.http_session = http_session
166176
self.url = self.base if not key else "{}{}/".format(self.base, key)
177+
self.threading = threading
167178

168179
def get_version(self):
169180
""" Gets the API version of NetBox.
@@ -262,6 +273,19 @@ def _make_call(
262273
else:
263274
raise RequestError(req)
264275

276+
def concurrent_get(self, ret, page_size, page_offsets):
277+
futures_to_results = []
278+
with cf.ThreadPoolExecutor(max_workers=4) as pool:
279+
for offset in page_offsets:
280+
new_params = {"offset": offset, "limit": page_size}
281+
futures_to_results.append(
282+
pool.submit(self._make_call, add_params=new_params)
283+
)
284+
285+
for future in cf.as_completed(futures_to_results):
286+
result = future.result()
287+
ret.extend(result["results"])
288+
265289
def get(self, add_params=None):
266290
"""Makes a GET request.
267291
@@ -297,6 +321,32 @@ def req_all():
297321
else:
298322
return req
299323

324+
def req_all_threaded(add_params):
325+
if add_params is None:
326+
# Limit must be 0 to discover the max page size
327+
add_params = {"limit": 0}
328+
req = self._make_call(add_params=add_params)
329+
if isinstance(req, dict) and req.get("results") is not None:
330+
ret = req["results"]
331+
if req.get("next"):
332+
page_size = len(req["results"])
333+
pages = calc_pages(page_size, req["count"])
334+
page_offsets = [
335+
increment * page_size for increment in range(1, pages)
336+
]
337+
if pages == 1:
338+
req = self._make_call(url_override=req.get("next"))
339+
ret.extend(req["results"])
340+
else:
341+
self.concurrent_get(ret, page_size, page_offsets)
342+
343+
return ret
344+
else:
345+
return req
346+
347+
if self.threading:
348+
return req_all_threaded(add_params)
349+
300350
return req_all()
301351

302352
def put(self, data):

0 commit comments

Comments
 (0)