"""
Module :mod:`pyesgf.search.results`
===================================
Search results are retrieved through the :class:`ResultSet` class. This class
hides paging of large result sets behind a client-side cache. Subclasses of
:class:`Result` represent results of different SOLR record types.
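
A minimal usage sketch (the index URL and facet values below are
illustrative only)::

    from pyesgf.search import SearchConnection

    conn = SearchConnection('https://esgf-node.llnl.gov/esg-search',
                            distrib=True)
    ctx = conn.new_context(project='CMIP5', query='humidity')
    results = ctx.search()          # returns a ResultSet
    print(len(results))             # total hit count from the index
    print(results[0].dataset_id)    # batches are fetched lazily on access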
"""
from collections import defaultdict
from collections.abc import Sequence
import re
from .consts import (DEFAULT_BATCH_SIZE, TYPE_DATASET, TYPE_FILE,
                     TYPE_AGGREGATION)

class ResultSet(Sequence):
"""
:ivar context: The search context object used to generate this resultset
    :property batch_size: The number of results that will be requested
        from esgf-search in a single call. This is fixed at creation and
        cannot be changed.
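
    :class:`ResultSet` implements the standard sequence protocol; a short
    sketch (assuming ``ctx`` is an existing search context)::

        results = ctx.search(batch_size=50)
        total = len(results)    # triggers at most one search call
        first = results[0]      # served from the cached first batch
        later = results[120]    # transparently fetches batch 2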
"""
def __init__(self, context, batch_size=DEFAULT_BATCH_SIZE, eager=True):
"""
:param context: The search context object used to generate this
resultset
:param batch_size: The number of results that will be requested from
esgf-search as one call.
:param eager: Boolean specifying whether to retrieve the first batch on
instantiation.
"""
self.context = context
self.__batch_size = batch_size
self.__batch_cache = {}
self.__len_cache = None
if eager:
self.__get_batch(0)
def __getitem__(self, index):
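        # Map the flat index onto a (batch, offset) pair so that results
        # are fetched one batch at a time and cached client-side.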
batch_i = index // self.batch_size
offset = index % self.batch_size
batch = self.__get_batch(batch_i)
search_type = self.context.search_type
ResultClass = _result_classes[search_type]
# !TODO: should probably wrap the json inside self.__batch_cache
return ResultClass(batch[offset], self.context)
def __len__(self):
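        # The total hit count (SOLR ``numFound``) is cached as a side
        # effect of retrieving the first batch.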
if self.__len_cache is None:
self.__get_batch(0)
return self.__len_cache
@property
def batch_size(self):
return self.__batch_size
def _build_result(self, result):
"""
Construct a result object from the raw json.
This method is designed to be overridden in subclasses if desired.
The default implementation simply returns the json.
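
        A sketch of a hypothetical override::

            class IdOnlyResultSet(ResultSet):
                def _build_result(self, result):
                    # Return only the record id rather than the full json
                    return result['id']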
"""
return result
def __get_batch(self, batch_i):
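        # Serve repeated requests for the same batch from the client-side
        # cache; otherwise issue one search call at the batch's offset.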
if batch_i in self.__batch_cache:
return self.__batch_cache[batch_i]
offset = self.batch_size * batch_i
limit = self.batch_size
query_dict = self.context._build_query()
response = (self.context.connection
.send_search(query_dict, limit=limit, offset=offset,
shards=self.context.shards))
if self.__len_cache is None:
self.__len_cache = response['response']['numFound']
# !TODO: strip out results
batch = response['response']['docs']
self.__batch_cache[batch_i] = batch
return batch

class BaseResult(object):
"""
Base class for results.
Subclasses represent different search types such as File and Dataset.
:ivar json: The original json representation of the result.
:ivar context: The SearchContext which generated this result.
:property urls: a dictionary of the form
``{service: [(url, mime_type), ...], ...}``
:property opendap_url: The url of an OPeNDAP endpoint for this result
if available
:property las_url: The url of an LAS endpoint for this result if available
:property download_url: The url for downloading the result by HTTP
if available
    :property gridftp_url: The url for downloading the result by GridFTP
        if available
:property globus_url: The url for downloading the result by Globus
if available (including endpoint)
    :property index_node: The index node where the metadata is stored.
        Calls to ``*_context()`` will optimise queries to address only
        this node.
"""
def __init__(self, json, context):
self.json = json
self.context = context
@property
def urls(self):
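        # Each entry of the SOLR ``url`` field is encoded as
        # ``url|mime_type|service``; an illustrative value:
        #   http://host/thredds/fileServer/data.nc|application/netcdf|HTTPServer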
url_dict = defaultdict(list)
for encoded in self.json['url']:
url, mime_type, service = encoded.split('|')
url_dict[service].append((url, mime_type))
return url_dict
@property
def opendap_url(self):
try:
url, mime = self.urls['OPENDAP'][0]
except (KeyError, IndexError):
return None
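        # The catalog publishes the OPeNDAP endpoint in its ``.html``
        # form; strip the suffix to get the base DAP URL.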
url = re.sub(r'.html$', '', url)
return url
@property
def las_url(self):
try:
url, mime = self.urls['LAS'][0]
except (KeyError, IndexError):
return None
return url
@property
def download_url(self):
try:
url, mime = self.urls['HTTPServer'][0]
except (KeyError, IndexError):
return None
return url
@property
def gridftp_url(self):
try:
url, mime = self.urls['GridFTP'][0]
except (KeyError, IndexError):
return None
return url
@property
def globus_url(self):
try:
url, mime = self.urls['Globus'][0]
except (KeyError, IndexError):
return None
return url
@property
def index_node(self):
try:
index_node = self.json['index_node']
except KeyError:
return None
return index_node

class DatasetResult(BaseResult):
"""
A result object for ESGF datasets.
    :property dataset_id: The SOLR dataset_id, which is unique throughout
        the system.
    :property number_of_files: The number of files in the dataset, as
        reported by the dataset record.
"""
@property
def dataset_id(self):
# !TODO: should we decode this into a tuple?
# self.json['id'].split('|')
return self.json['id']
@property
def number_of_files(self):
"""
Returns file count as reported by the dataset record.
"""
return self.json['number_of_files']
    def file_context(self):
"""
Return a SearchContext for searching for files within this dataset.
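
        A short sketch of drilling down from a dataset to its files
        (``ds`` being a :class:`DatasetResult`)::

            files = ds.file_context().search()
            for f in files:
                print(f.filename, f.size)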
"""
from .context import FileSearchContext
if self.context.connection.distrib:
            # If the index node is among the shards available to this
            # connection then restrict the search to that shard.
            # Otherwise leave shards unset so that replicated shards are
            # still searched.
available_shards = list(self.context.connection.get_shard_list().keys())
if self.index_node in available_shards:
shards = [self.index_node]
else:
shards = None
else:
shards = None
files_context = FileSearchContext(
connection=self.context.connection,
constraints={'dataset_id': self.dataset_id},
shards=shards,
)
return files_context
    def aggregation_context(self):
"""
Return a SearchContext for searching for aggregations within this
dataset.
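
        Usage mirrors :meth:`file_context`, e.g.
        ``ds.aggregation_context().search()``.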
"""
from .context import AggregationSearchContext
if self.context.connection.distrib:
            # If the index node is among the shards available to this
            # connection then restrict the search to that shard.
            # Otherwise leave shards unset so that replicated shards are
            # still searched.
available_shards = list(self.context.connection.get_shard_list().keys())
if self.index_node in available_shards:
shards = [self.index_node]
else:
shards = None
else:
shards = None
agg_context = AggregationSearchContext(
connection=self.context.connection,
constraints={'dataset_id': self.dataset_id},
shards=shards,
)
return agg_context

class FileResult(BaseResult):
"""
A result object for ESGF files. Properties from :class:`BaseResult` are
inherited.
:property file_id: The identifier for the file
:property checksum: The checksum of the file
:property checksum_type: The algorithm used for generating the checksum
:property filename: The filename
    :property size: The file size in bytes
    :property tracking_id: The tracking_id of the file if available
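
    A sketch of verifying a downloaded copy against the advertised
    checksum (``path`` is a hypothetical local file; assumes
    ``checksum_type`` names a hashlib algorithm such as SHA256)::

        import hashlib

        def verify(file_result, path):
            algo = hashlib.new(file_result.checksum_type.lower())
            with open(path, 'rb') as f:
                for chunk in iter(lambda: f.read(8192), b''):
                    algo.update(chunk)
            return algo.hexdigest() == file_result.checksum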
"""
@property
def file_id(self):
return self.json['id']
@property
def checksum(self):
try:
return self.json['checksum'][0]
except KeyError:
return None
@property
def checksum_type(self):
try:
return self.json['checksum_type'][0]
except KeyError:
return None
@property
def filename(self):
return self.json['title']
@property
def size(self):
return int(self.json['size'])
@property
def tracking_id(self):
try:
return self.json['tracking_id'][0]
except KeyError:
return None

class AggregationResult(BaseResult):
"""
A result object for ESGF aggregations. Properties from :class:`BaseResult`
are inherited.
:property aggregation_id: The aggregation id
"""
@property
def aggregation_id(self):
return self.json['id']
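
# Mapping from SOLR record type to the Result subclass used to wrap each
# returned search document.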
_result_classes = {
TYPE_DATASET: DatasetResult,
TYPE_FILE: FileResult,
TYPE_AGGREGATION: AggregationResult,
}