Skip to content

Commit 19f3a0b

Browse files
committed
several updates, moved json and datalink processors to separate functions
1 parent 11762ec commit 19f3a0b

File tree

3 files changed

+238
-160
lines changed

3 files changed

+238
-160
lines changed

fornax/access.py

Lines changed: 183 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,58 @@
11
import pyvo
2+
import json
23

3-
from astropy.table import Table
4+
from astropy.table import Table, unique
45
from astropy.io import votable
56

67
from .prem import AccessPoint
78
from .aws import AWSAccessPoint
89

910

1011
__all__ = ['AccessManager', 'DataHandler']
12+
13+
14+
ACCESS_POINTS = [
15+
AccessPoint,
16+
AWSAccessPoint
17+
]
18+
class_mapper = {ap.name: ap for ap in ACCESS_POINTS}
1119

1220

1321
class AccessManager:
1422
"""AccessPoint container and manager"""
1523

16-
def __init__(self, base_access):
17-
"""Initilize an AccessManager with a basic AccessPoint.
24+
def __init__(self, access_point):
25+
"""Initilize an AccessManager with a some AccessPoint.
1826
1927
Parameters
2028
----------
21-
base_access: AccessPoint
29+
access_point: AccessPoint or a subclass
2230
a minimum access point with a simple url.
2331
2432
"""
2533

26-
if not isinstance(base_access, AccessPoint):
34+
if not isinstance(access_point, AccessPoint):
2735
raise ValueError(
2836
f'type(base_access) is expected with be '
2937
f'AccessApoint not {type(base_access)}'
3038
)
3139

32-
self.access_points = {base_access.type: [base_access]}
40+
self.access_points = {access_point.name: [access_point]}
3341

3442
# the default is the one to use. One of access_points
35-
self.default_access_point = {base_access.type: base_access}
36-
43+
self.default_access_point = {access_point.name: access_point}
44+
45+
46+
def __repr__(self):
47+
summary = ', '.join([f'{k}:{len(g)}' for k,g in self.access_points.items()])
48+
return f'<Access: {summary}>'
49+
50+
51+
@property
52+
def ids(self):
53+
"""Return a list of current access id's """
54+
return [ap.id for aplist in self.access_points.values() for ap in aplist]
55+
3756

3857
def add_access_point(self, access_point):
3958
"""Add a new AccessPoint to the manager
@@ -56,20 +75,45 @@ def add_access_point(self, access_point):
5675
f'a subclass or a list not {type(base_access)}'
5776
)
5877

59-
ap_type = access_point.type
60-
if not ap_type in self.access_points:
61-
self.access_points[ap_type] = []
62-
self.access_points[ap_type].append(access_point)
78+
ap_name = access_point.name
79+
if not ap_name in self.access_points:
80+
self.access_points[ap_name] = []
81+
if not access_point.id in self.ids:
82+
self.access_points[ap_name].append(access_point)
83+
84+
85+
def summary(self):
86+
"""Print a summary of the access points"""
87+
88+
text = ''
89+
for name,apoints in self.access_points.items():
90+
text += '\n'
91+
text += '\n'.join([str(ap) for ap in apoints])
92+
print(text)
6393

6494

6595
class DataHandler:
6696
"""A container for multiple AccessPoint instances"""
6797

68-
def __init__(self, data_product, **kwargs):
98+
def __init__(self,
99+
data_product,
100+
source='prem',
101+
fallback=True,
102+
url_column=None,
103+
**kwargs
104+
):
69105
"""
70106
Parameters
71107
----------
72108
data_product: astropy.table or pyvo.dal.DALResults
109+
The data to be accessed or downloaded
110+
source: str
111+
The source of the data. prem | aws
112+
fallback: bool
113+
Fallback to prem if other source fail
114+
url_column: str or None
115+
Name of the column that contains the direct url.
116+
If None, attempt to figure it out following VO standards
73117
74118
kwargs: keywrods arguments used to initialize the AccessPoint
75119
instance or its subclasses.
@@ -80,88 +124,142 @@ def __init__(self, data_product, **kwargs):
80124
raise ValueError(f'data_prodcut should be either '
81125
'astropy.table.Table or '
82126
'pyvo.dal.DALResults')
127+
128+
if source not in class_mapper.keys():
129+
raise ValueError(f'Expected prem or aws for source. Found {source}')
83130

84131
# if we have an astropy table, convert to a pyvo.dal.DALResults
85132
if isinstance(data_product, Table):
86133
vot = votable.from_table(data_product)
87-
dal_result = pyvo.dal.DALResults(vot)
134+
dal_product = pyvo.dal.DALResults(vot)
88135
else:
89-
dal_result = data_product
136+
dal_product = data_product
90137

138+
# if url_column is not a column, fail
139+
if url_column is not None and url_column not in dal_product.fieldnames:
140+
raise ValueError(f'No column named {url_column} in the data product')
91141

92142
## column name with direct access url
93-
# SIA v1
94-
url_colname = dal_result.fieldname_with_ucd('VOX:Image_AccessReference')
95-
if url_colname is None:
96-
# SIA v2
97-
if 'access_url' in dal_result.fieldnames:
98-
url_colname = 'access_url'
99-
else:
100-
# try by ucd as a final resort
101-
url_colname = dal_result.fieldname_with_ucd('meta.ref.url')
102-
103-
# if still None, raise
104-
# TODO allow the user the pass the name to avoid failing
105-
if url_colname is None:
106-
raise ValueError(f'Could not figure out the column with direct access url')
107-
108-
# AccessPoint
109-
# - self.access_points: some type of ap manager (could be a simple container; e.g. list)
110-
# - each row has its access_points manager.
111-
# - If we have a cloud_access column, use it. it is easier than datalinks as it does not
112-
# require a new call to server.
113-
# - elif we have datalinks, get all data links in one call, then pass on to AWS ap.
114-
# - else: no aws info; fall back to on-prem
115-
116-
# minimum access point that uses on-prem data
117-
self.nrows = len(dal_result)
118-
self.access_manager = [AccessManager(AccessPoint(url)) for url in dal_result[url_colname]]
119-
120-
121-
# if there is a 'cloud_access' json column, give it priority
122-
# as it does not require a new call to the server
123-
if 'cloud_access' in dal_result.fieldnames:
124-
profile = kwargs.get('profile', None)
125-
126-
for irow in range(self.nrows):
127-
jsontxt = dal_result[irow]['cloud_access']
128-
awsAp = AWSAccessPoint.from_json(jsontxt, profile=profile)
129-
self.access_manager[irow].add_access_point(awsAp)
143+
if url_column is None:
144+
# SIA v1
145+
url_column = dal_product.fieldname_with_ucd('VOX:Image_AccessReference')
146+
if url_column is None:
147+
# SIA v2
148+
if 'access_url' in dal_product.fieldnames:
149+
url_column = 'access_url'
150+
else:
151+
# try by ucd as a final attempt
152+
url_column = dal_product.fieldname_with_ucd('meta.ref.url')
130153

131-
else:
132-
# check for datalinks
133-
try:
134-
dlinks = dal_result.get_adhocservice_by_ivoid(
135-
pyvo.dal.adhoc.DATALINK_IVOID
136-
)
137-
except pyvo.DALServiceError:
138-
raise ValueError(
139-
'No cloud information available in either '
140-
'cloud_access column, or in datalinks'
141-
)
142-
# Look for the 'source' <PARAM> element inside the inputParams <GROUP> element.
143-
# pyvo already handles part of this.
144-
if not hasattr(dlinks, 'groups'):
145-
raise ValueError(
146-
'Datalinks resource does not have a group '
147-
'as required by the standard'
148-
)
149-
150-
# look for the 'source' param in the datalinks resource tree
151-
source_elems = [p for p in dlinks.groups[0].entries if p.name == 'source']
152-
if len(source_elems) == 0:
153-
raise ValueError(
154-
'No <PARAM> named "source" found in the Datalinks resource. '
155-
'No access points will be extracted'
156-
)
154+
# base prem access point
155+
access_manager = [AccessManager(AccessPoint(url)) for url in dal_product[url_column]]
156+
self.access_manager = access_manager
157+
158+
## ------------------- ##
159+
## other access points ##
160+
## ------------------- ##
161+
162+
# process the json column if it exists
163+
self.process_json_column(dal_product)
164+
165+
# process datalinks if they exist
166+
self.process_datalinks(dal_product)
167+
168+
169+
def __getitem__(self, item):
170+
"""Enable access to the access_manager list directly"""
171+
return self.access_manager[item]
172+
173+
174+
def process_json_column(self, dal_product, colname='cloud_access'):
175+
"""Process the json text in the access column
176+
177+
Parameters
178+
----------
179+
dal_product: pyvo.dal.DALResults
180+
A pyvo DALResults object containing the requested data product
181+
colname: str
182+
The name of the column that contains the access information
183+
in json format.
184+
185+
"""
186+
187+
# if no cloud_access column, there is nothing to do
188+
if colname not in dal_product.fieldnames:
189+
return
190+
191+
for irow, jsontxt in enumerate(dal_product[colname]):
192+
desc = json.loads(jsontxt)
193+
194+
# search for the known access types in desc
195+
for ap_name, APclass in class_mapper.items():
196+
197+
if ap_name not in desc:
198+
continue
199+
200+
# TEMPORARY
201+
if 'access' in desc[ap_name]:
202+
del desc[ap_name]['access']
203+
new_ap = APclass(**desc[ap_name])
204+
self.access_manager[irow].add_access_point(new_ap)
157205

158-
# we have a source parameters, process it
159-
source_elem = source_elems[0]
206+
207+
def process_datalinks(self, dal_product):
208+
"""Look for and process access point in datalinks
209+
210+
Parameters
211+
----------
212+
dal_product: pyvo.dal.DALResults
213+
A pyvo DALResults object containing the requested data product
214+
160215
216+
"""
217+
218+
# do we have datalinks?
219+
try:
220+
dlinks = dal_product.get_adhocservice_by_ivoid(
221+
pyvo.dal.adhoc.DATALINK_IVOID
222+
)
223+
except pyvo.DALServiceError:
224+
dlinks = None
225+
226+
# if no datalinks, there is nothing to do here
227+
if dlinks is None:
228+
return
229+
230+
# input parameters for the datalink call
231+
input_params = pyvo.dal.adhoc._get_input_params_from_resource(dlinks)
232+
dl_col_id = [p.ref for p in input_params.values() if p.ref is not None]
233+
dl_col_name = [f.name for f in dal_product.fielddescs if f.ID in dl_col_id]
234+
235+
236+
# proceed only if we have a PARAM named source,
237+
if 'source' in input_params.keys():
238+
# we have a 'source' element, process it
239+
source_elem = input_params['source']
240+
161241
# list the available options in the `source` element:
162242
access_options = source_elem.values.options
163-
164-
165-
from IPython import embed;embed();exit(0)
166-
base_ap = [AccessPoint(url) for url in dal_result[url_colname]]
167-
base_ap = AccessPoint()
243+
for description,option in access_options:
244+
245+
# TEMPORARY
246+
option = option.replace('main-server', 'prem')
247+
if option == 'prem': continue
248+
249+
soption = option.split(':')
250+
query = pyvo.dal.adhoc.DatalinkQuery.from_resource(
251+
dal_product, dlinks,
252+
source=option
253+
)
254+
255+
dl_result = query.execute()
256+
dl_table = dl_result.to_table()
257+
258+
ap_type = option.split(':')[0]
259+
ApClass = class_mapper[ap_type]
260+
for irow in range(len(dal_product)):
261+
dl_res = dl_table[dl_table['ID'] == dal_product[dl_col_name[0]][irow]]
262+
for dl_row in dl_res:
263+
ap = ApClass(uri=dl_row['access_url'])
264+
self.access_manager[irow].add_access_point(ap)
265+

0 commit comments

Comments
 (0)