|
| 1 | +{ |
| 2 | + "cells": [ |
| 3 | + { |
| 4 | + "cell_type": "code", |
| 5 | + "execution_count": null, |
| 6 | + "id": "d14779b0", |
| 7 | + "metadata": {}, |
| 8 | + "outputs": [], |
| 9 | + "source": [ |
| 10 | + "import requests\n", |
| 11 | + "from typing import Any, Dict\n", |
| 12 | + "import schedule\n", |
| 13 | + "import time\n", |
| 14 | + "import threading\n", |
| 15 | + "import numpy as np\n", |
| 16 | + "import matplotlib.pyplot as plt\n", |
| 17 | + "import csv\n", |
| 18 | + "# Replace these with your camera's IP address, username, and password\n", |
| 19 | + "\n", |
| 20 | + "camera_ip = \"xx.xx.xx.xx\"\n", |
| 21 | + "username = \"username\"\n", |
| 22 | + "password = \"password\"\n", |
| 23 | + "\n", |
| 24 | + "\n", |
| 25 | + "class cameraTestSuite():\n", |
| 26 | + " def __init__(\n", |
| 27 | + " self: Any,\n", |
| 28 | + " camera_ip: str,\n", |
| 29 | + " username: str,\n", |
| 30 | + " password: str,\n", |
| 31 | + " ):\n", |
| 32 | + " self.url = f\"http://{camera_ip}/axis-cgi/com/ptz.cgi?\"\n", |
| 33 | + " self.auth = requests.auth.HTTPDigestAuth(username, password)\n", |
| 34 | + " self.pan = 0\n", |
| 35 | + " self.tilt = 0\n", |
| 36 | + " self.zoom = 0\n", |
| 37 | + " self.data = []\n", |
| 38 | + "\n", |
| 39 | + " def reset(self):\n", |
| 40 | + " self.pan = 0\n", |
| 41 | + " self.tilt = 0\n", |
| 42 | + " self.data = []\n", |
| 43 | + "\n", |
| 44 | + " # Axis PTZ control URL and parameters\n", |
| 45 | + " def moveCamera(self,pan, tilt):\n", |
| 46 | + " response = requests.get(f'{self.url}pan={pan}&tilt={tilt}', auth=self.auth)\n", |
| 47 | + " # Check response\n", |
| 48 | + " if response.status_code != 204:\n", |
| 49 | + " print(f\"Failed to move camera: {response.status_code}, Response: {response.text}\")\n", |
| 50 | + "\n", |
| 51 | + " def zoomCamera(self,zoom):\n", |
| 52 | + " response = requests.get(f'{self.url}zoom={zoom}', auth=self.auth)\n", |
| 53 | + " # Check response\n", |
| 54 | + " if response.status_code != 204:\n", |
| 55 | + " print(f\"Failed to move camera: {response.status_code}, Response: {response.text}\")\n", |
| 56 | + " \n", |
| 57 | + " \n", |
| 58 | + " def moveCameraRate(self, panRate=0, tiltRate=0):\n", |
| 59 | + " response = requests.get(f'{self.url}continuouspantiltmove={panRate},{tiltRate}', auth=self.auth)\n", |
| 60 | + " # Check response\n", |
| 61 | + " if response.status_code != 204:\n", |
| 62 | + " print(f\"Failed to move camera: {response.status_code}, Response: {response.text}\")\n", |
| 63 | + "\n", |
| 64 | + " def getCurrentPosition(self):\n", |
| 65 | + " params = {\n", |
| 66 | + " \"query\": \"position\" # This query parameter should be adjusted based on camera API documentation\n", |
| 67 | + " }\n", |
| 68 | + " # Send the request to get the camera's current position\n", |
| 69 | + " response_start = time.time()\n", |
| 70 | + " response = requests.get(self.url, params=params, auth=self.auth)\n", |
| 71 | + " response_time = time.time() - response_start\n", |
| 72 | + " if response.status_code != 200:\n", |
| 73 | + " print(f\"Failed to get camera position: {response.status_code}, Response: {response.text}\")\n", |
| 74 | + " \n", |
| 75 | + " response_str = response.content.decode('UTF-8').split('\\r\\n')\n", |
| 76 | + " response_dict = {}\n", |
| 77 | + " for line in response_str:\n", |
| 78 | + " if line:\n", |
| 79 | + " key, value = line.split('=')\n", |
| 80 | + " response_dict[key] = value\n", |
| 81 | + " self.pan = float(response_dict['pan'])\n", |
| 82 | + " self.tilt = float(response_dict['tilt'])\n", |
| 83 | + " self.zoom = float(response_dict['zoom'])\n", |
| 84 | + " response_dict['timestamp'] = time.time()\n", |
| 85 | + " response_dict['response_time'] = response_time\n", |
| 86 | + " self.data.append(response_dict)\n", |
| 87 | + " # Check response\n", |
| 88 | + " return response_dict\n", |
| 89 | + " \n", |
| 90 | + " def run_schedule(self):\n", |
| 91 | + " while True:\n", |
| 92 | + " schedule.run_pending()\n", |
| 93 | + " time.sleep(0.01)\n", |
| 94 | + "\n", |
| 95 | + " def start_pan(self, panRate=0, tiltRate=0):\n", |
| 96 | + " \n", |
| 97 | + " self.moveCameraRate(panRate, tiltRate)\n", |
| 98 | + " self.moveCamera(pan=0, tilt=0)\n", |
| 99 | + " #self.zoomCamera(1)\n", |
| 100 | + " time.sleep(10)\n", |
| 101 | + " self.moveCamera(pan=0, tilt=0)\n", |
| 102 | + " time.sleep(10)\n", |
| 103 | + " self.reset()\n", |
| 104 | + " schedule.every(0.1).seconds.do(self.getCurrentPosition)\n", |
| 105 | + " #print(\"Moving camera at panRate: \", panRate, \"tiltRate: \", tiltRate)\n", |
| 106 | + " self.moveCameraRate(tiltRate=0, panRate=panRate)\n", |
| 107 | + " \n", |
| 108 | + " def start_tilt(self, panRate=0, tiltRate=0):\n", |
| 109 | + " \n", |
| 110 | + " self.moveCameraRate(panRate, tiltRate)\n", |
| 111 | + " self.moveCamera(pan=0, tilt=-90)\n", |
| 112 | + " #self.zoomCamera(1)\n", |
| 113 | + " time.sleep(10)\n", |
| 114 | + " self.moveCamera(pan=0, tilt=-90)\n", |
| 115 | + " time.sleep(10)\n", |
| 116 | + " self.reset()\n", |
| 117 | + " schedule.every(0.1).seconds.do(self.getCurrentPosition)\n", |
| 118 | + " print(\"Moving camera at panRate: \", panRate, \"tiltRate: \", tiltRate)\n", |
| 119 | + " self.moveCameraRate(tiltRate=tiltRate, panRate=0)\n", |
| 120 | + " \n", |
| 121 | + " def stop(self):\n", |
| 122 | + " self.moveCameraRate(0, 0)\n", |
| 123 | + " schedule.clear()\n", |
| 124 | + "\n", |
| 125 | + " def main(self):\n", |
| 126 | + " self.moveCamera(pan=0, tilt=0)\n", |
| 127 | + " self.zoomCamera(1)\n", |
| 128 | + " #schedule.every(0.1).seconds.do(self.getCurrentPosition)\n", |
| 129 | + "\n", |
| 130 | + " # Start the scheduler thread\n", |
| 131 | + " scheduler_thread = threading.Thread(target=self.run_schedule)\n", |
| 132 | + " scheduler_thread.daemon = True\n", |
| 133 | + " scheduler_thread.start()\n", |
| 134 | + " \n", |
| 135 | + "camera = cameraTestSuite(camera_ip=camera_ip, username=username, password=password)\n", |
| 136 | + "camera.main()" |
| 137 | + ] |
| 138 | + }, |
| 139 | + { |
| 140 | + "cell_type": "code", |
| 141 | + "execution_count": null, |
| 142 | + "id": "e619e90b", |
| 143 | + "metadata": {}, |
| 144 | + "outputs": [], |
| 145 | + "source": [ |
| 146 | + "zoom=0\n", |
| 147 | + "experiment_time = 120\n", |
| 148 | + "\n", |
| 149 | + "pan_rate_range = range(100, -1, -1)\n", |
| 150 | + "tilt_rate = 0\n", |
| 151 | + "camera.zoomCamera(zoom)\n", |
| 152 | + "results = []\n", |
| 153 | + "for pan_rate in pan_rate_range:\n", |
| 154 | + " #camera.start_tilt(tiltRate=pan_rate)\n", |
| 155 | + " camera.start_pan(panRate=pan_rate)\n", |
| 156 | + " timestart = time.time()\n", |
| 157 | + " while time.time()-timestart < experiment_time:\n", |
| 158 | + " time.sleep(0.1)\n", |
| 159 | + " camera.stop()\n", |
| 160 | + " \n", |
| 161 | + " # Calculate pan rate from last 2 measurement\n", |
| 162 | + " # Get the last two position measurements\n", |
| 163 | + " timestamps = [d['timestamp'] - camera.data[0]['timestamp'] for d in camera.data]\n", |
| 164 | + " #pan_values = np.unwrap([float(d['tilt']) for d in camera.data], period=360)\n", |
| 165 | + " pan_values = np.unwrap([float(d['pan']) for d in camera.data], period=360)\n", |
| 166 | + "\n", |
| 167 | + " measured_pan_rate = (pan_values[-1]-pan_values[0])/(timestamps[-1]-timestamps[0])\n", |
| 168 | + " #print(f\"Measured tilt rate for {pan_rate}%: {measured_pan_rate:.3f} degrees/second\")\n", |
| 169 | + " print(f\"Measured pan rate for {pan_rate}%: {measured_pan_rate:.3f} degrees/second\")\n", |
| 170 | + " measurement = {\n", |
| 171 | + " 'pan_rate': pan_rate,\n", |
| 172 | + " 'measured_pan_rate': measured_pan_rate\n", |
| 173 | + " }\n", |
| 174 | + " results.append(measurement)\n", |
| 175 | + "pan_rates = [result['pan_rate'] for result in results]\n", |
| 176 | + "measured_pan_rates = [result['measured_pan_rate'] for result in results]" |
| 177 | + ] |
| 178 | + }, |
| 179 | + { |
| 180 | + "cell_type": "code", |
| 181 | + "execution_count": null, |
| 182 | + "id": "c9989437", |
| 183 | + "metadata": {}, |
| 184 | + "outputs": [], |
| 185 | + "source": [ |
| 186 | + "# Save data to CSV\n", |
| 187 | + "csv_filename = \"pan-rate-analysis.csv\"\n", |
| 188 | + "with open(csv_filename, mode=\"w\", newline=\"\") as file:\n", |
| 189 | + " writer = csv.writer(file)\n", |
| 190 | + " writer.writerow([\"Pan Rate (%)\", \"Measured Pan Rate (degrees/second)\"])\n", |
| 191 | + " writer.writerows(zip(pan_rates, measured_pan_rates))\n", |
| 192 | + "\n", |
| 193 | + "print(f\"Data saved to '{csv_filename}'.\")" |
| 194 | + ] |
| 195 | + }, |
| 196 | + { |
| 197 | + "cell_type": "code", |
| 198 | + "execution_count": null, |
| 199 | + "id": "a45bfba6", |
| 200 | + "metadata": {}, |
| 201 | + "outputs": [], |
| 202 | + "source": [ |
| 203 | + "plt.figure(figsize=(10, 6))\n", |
| 204 | + "plt.plot(pan_rates, measured_pan_rates, marker='o')\n", |
| 205 | + "plt.xlabel('Pan Rate (%)')\n", |
| 206 | + "plt.ylabel('Measured Pan Rate (degrees/second)')\n", |
| 207 | + "plt.title('Measured Pan Rate vs. Pan Rate Setting')\n", |
| 208 | + "plt.grid(True)" |
| 209 | + ] |
| 210 | + } |
| 211 | + ], |
| 212 | + "metadata": { |
| 213 | + "kernelspec": { |
| 214 | + "display_name": "Python 3 (ipykernel)", |
| 215 | + "language": "python", |
| 216 | + "name": "python3" |
| 217 | + }, |
| 218 | + "language_info": { |
| 219 | + "codemirror_mode": { |
| 220 | + "name": "ipython", |
| 221 | + "version": 3 |
| 222 | + }, |
| 223 | + "file_extension": ".py", |
| 224 | + "mimetype": "text/x-python", |
| 225 | + "name": "python", |
| 226 | + "nbconvert_exporter": "python", |
| 227 | + "pygments_lexer": "ipython3", |
| 228 | + "version": "3.10.12" |
| 229 | + } |
| 230 | + }, |
| 231 | + "nbformat": 4, |
| 232 | + "nbformat_minor": 5 |
| 233 | +} |
0 commit comments