Source code for lsru

import os
import json
import datetime
from pprint import pprint
from configparser import ConfigParser
import warnings

import requests

from .utils import url_retrieve, url_retrieve_and_unpack

__version__ = "0.6.2"


class Usgs(object):
    """Interface to the Usgs API

    See documentation of the ``search`` method for basic usage

    Args:
        version (str): API version to use, defaults to ``'stable'``
        conf (str): Path of the configuration file containing usgs login
            credentials

    Attributes:
        USER (str): Usgs username
        PASSWORD (str): Usgs password
        endpoint (str): API endpoint
        key (str): API key. Required to perform a search and obtained by
            running the ``login()`` method
        key_dt (datetime.datetime): Time at which the key was generated
    """

    def __init__(self, version='stable', conf=os.path.expanduser('~/.lsru')):
        try:
            config = ConfigParser()
            config.read(conf)
            self.USER = config['usgs']['username']
            self.PASSWORD = config['usgs']['password']
            self.endpoint = '/'.join(['https://earthexplorer.usgs.gov/inventory/json/v',
                                      version])
            self.key = None
            self.key_dt = None
        except Exception as e:
            raise FileNotFoundError('There must be a valid configuration file '
                                    'to instantiate this class')

    @property
    def key_age(self):
        """Determines the age of the API key

        Returns:
            datetime.timedelta
        """
        if self.key_dt is None:
            raise ValueError('key_age is not defined, you probably need to '
                             'run login()')
        return datetime.datetime.now() - self.key_dt

    @staticmethod
    def get_collection_name(num):
        """Get Earth Explorer Landsat collection names

        Args:
            num (int): Landsat spacecraft number (4, 5, 7 or 8)

        Returns:
            str: Earth Explorer collection name, formatted to be passed to
                the search method
        """
        collections = {4: 'LANDSAT_TM_C1',
                       5: 'LANDSAT_TM_C1',
                       7: 'LANDSAT_ETM_C1',
                       8: 'LANDSAT_8_C1'}
        return collections[num]
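
    # Illustrative doctest-style sketch (not in the original module): mapping
    # a spacecraft number to its Earth Explorer collection name.
    # >>> Usgs.get_collection_name(8)
    # 'LANDSAT_8_C1'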

    def login(self):
        """Login to the Usgs api

        This method is necessary to obtain an API key (automatically saved in
        the ``key`` attribute), and send other queries to the API

        Return:
            bool: True if query was successful, False otherwise
        """
        login_endpoint = '/'.join([self.endpoint, 'login'])
        r = requests.post(login_endpoint,
                          data={'jsonRequest': json.dumps({'username': self.USER,
                                                           'password': self.PASSWORD})})
        if r.json()['errorCode'] is not None:
            return False
        self.key = r.json()['data']
        self.key_dt = datetime.datetime.now()
        return True

    def search(self, collection, bbox, begin=None, end=None,
               max_cloud_cover=100, months=None, starting_number=1,
               max_results=50000):
        """Perform a spatio-temporal query on the Landsat catalog

        Args:
            collection (str): Landsat collection to query. Use LANDSAT_8_C1,
                LANDSAT_ETM_C1 and LANDSAT_TM_C1 for OLI, ETM+, and TM
                respectively
            bbox (tuple): A bounding box in the form of a tuple
                (left, bottom, right, top)
            begin (datetime.datetime): Optional begin date
            end (datetime.datetime): Optional end date
            max_cloud_cover (int): Cloud cover threshold to use for the query
            months (list): List of month indices (1-12) to restrict the query
                to those months only
            max_results (int): Maximum number of scenes to return
            starting_number (int): Used to determine the result number to
                start returning from. Is meant to be used when the total
                number of hits is higher than ``max_results``, to return
                results in a paginated fashion

        Example:
            >>> from lsru import Usgs
            >>> import datetime
            >>> usgs = Usgs()
            >>> usgs.login()
            >>> scene_list = usgs.search(collection='LANDSAT_8_C1',
            ...                          bbox=(3.5, 43.4, 4, 44),
            ...                          begin=datetime.datetime(2012,1,1),
            ...                          end=datetime.datetime(2016,1,1))
            >>> print(scene_list)

        Returns:
            list: List of scenes with complete metadata
        """
        if self.key_age > datetime.timedelta(0, 3600):
            raise ValueError('Api key has probably expired (1 hr), re-run '
                             'the login method')
        search_endpoint = '/'.join([self.endpoint, 'search'])
        params = {'apiKey': self.key,
                  'node': 'EE',
                  'datasetName': collection,
                  'maxCloudCover': max_cloud_cover,
                  'lowerLeft': {'latitude': bbox[1],
                                'longitude': bbox[0]},
                  'upperRight': {'latitude': bbox[3],
                                 'longitude': bbox[2]},
                  'maxResults': max_results,
                  'startingNumber': starting_number}
        if begin is not None:
            params.update(startDate=begin.isoformat())
        if end is not None:
            params.update(endDate=end.isoformat())
        if months is not None:
            params.update(months=months)
        r = requests.post(search_endpoint,
                          data={'jsonRequest': json.dumps(params)})
        return r.json()['data']['results']
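
# Illustrative sketch (not part of the original module): paging through a large
# result set with ``starting_number``. ``page_size``, ``all_scenes`` and
# ``start`` are hypothetical names used only for this example.
# >>> usgs = Usgs()
# >>> usgs.login()
# >>> page_size = 500
# >>> all_scenes = []
# >>> start = 1
# >>> while True:
# ...     page = usgs.search(collection='LANDSAT_8_C1',
# ...                        bbox=(3.5, 43.4, 4, 44),
# ...                        starting_number=start,
# ...                        max_results=page_size)
# ...     all_scenes.extend(page)
# ...     if len(page) < page_size:
# ...         break
# ...     start += page_size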


class _EspaBase(object):
    """Interface to the Espa API (base class)

    Espa is a platform providing on demand pre-processing of Landsat surface
    data. This class uses the API of the espa platform to query and place
    orders programmatically

    Attributes:
        USER (str): Usgs username
        PASSWORD (str): Usgs password
        host (str): API host url

    Args:
        conf (str): Path of the config file containing usgs credentials
    """
    def __init__(self, conf):
        try:
            config = ConfigParser()
            config.read(conf)
            self.USER = config['usgs']['username']
            self.PASSWORD = config['usgs']['password']
            self.host = 'https://espa.cr.usgs.gov/api/v1'
            self.conf = conf
        except Exception as e:
            raise FileNotFoundError('There must be a valid configuration file '
                                    'to instantiate this class')

    def _request(self, endpoint, verb='get', body=None):
        """Generic interface to the ESPA api

        Adapted from Jake Brinkmann's example in
        https://github.com/USGS-EROS/espa-api/blob/master/examples/api_demo.ipynb

        Args:
            endpoint (str): Api endpoint to call
            verb (str): Request verb (get, post, put ...)
            body (dict): Data to pass to the request
        """
        auth_tup = (self.USER, self.PASSWORD)
        response = getattr(requests, verb)('/'.join([self.host, endpoint]),
                                           auth=auth_tup, json=body)
        data = response.json()
        if isinstance(data, dict):
            messages = data.pop("messages", None)
            if messages:
                pprint(messages)
        response.raise_for_status()
        return data
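
# Illustrative sketch (not part of the original module): the public methods of
# the subclasses below all go through ``_request``, which joins the endpoint
# onto ``self.host`` and sends an authenticated request (``params`` here stands
# for any request body dict).
# >>> espa = Espa()
# >>> espa._request('projections')  # GET https://espa.cr.usgs.gov/api/v1/projections
# >>> espa._request('order', verb='post', body=params)  # POST with a json body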


class Espa(_EspaBase):
    """Interface to the Espa API

    Espa is a platform providing on demand pre-processing of Landsat surface
    data. This class uses the API of the espa platform to query and place
    orders programmatically

    Attributes:
        USER (str): Usgs username
        PASSWORD (str): Usgs password
        host (str): API host url

    Args:
        conf (str): Path of the config file containing usgs credentials
    """

    def __init__(self, conf=os.path.expanduser('~/.lsru')):
        super(Espa, self).__init__(conf=conf)
        self._projections = None
        self._formats = None
        self._resampling_methods = None
        self._user = None

    def order(self, scene_list, products, format='gtiff', note=None,
              resampling='nn', resolution=None, projection=None, extent=None,
              extent_units='dd', verbose=False):
        """Place a pre-processing order to espa

        Args:
            scene_list (list): List of Landsat scene ids
            products (list): List of products to order for pre-processing.
                See ``Espa.get_available_products()`` to get information on
                available products
            format (str): Pre-processing file format. See ``Espa.formats``
                for information on available formats
            note (str): Optional human readable message to pass to the order
            resampling (str): Resampling method to be used when reprojecting
                or resizing ordered images. See ``Espa.resampling_methods``
                for valid values.
            resolution (float): Output resolution (optional). If specified,
                the pre-processing order will be resized to the specified
                resolution. If set to None (default), no resizing is performed
                and products are processed at their original resolution
                (usually 30m).
            projection (dict): Optional dictionary with projection name and
                projection parameter values. Ordered products are re-projected
                to the specified projection when set. See ``Espa.projections``
                for the list and format of supported projections
            extent (tuple): Bounding box used to crop the pre-processed
                products. The bounding box is in the form of a
                (left, bottom, right, top) tuple. This is optional and
                requires a projection to be set.
            extent_units (str): Units of the provided extent. ``'dd'``
                (decimal degrees) is the default. If ``'meters'``, bounds are
                specified according to the coordinate reference system space.
            verbose (bool): Prints the json body being sent. Useful for
                debugging purposes

        Example:
            >>> from lsru import Espa, Usgs
            >>> import datetime
            >>> espa = Espa()
            >>> usgs = Usgs()
            >>> usgs.login()
            >>> scene_list = usgs.search(collection='LANDSAT_8_C1',
            ...                          bbox=(3.5, 43.4, 4, 44),
            ...                          begin=datetime.datetime(2014,1,1),
            ...                          end=datetime.datetime(2018,1,1))
            >>> scene_list = [x['displayId'] for x in scene_list]
            >>> order = espa.order(scene_list, products=['sr', 'pixel_qa'])

        Return:
            lsru.Order: The method is mostly used for its side effect of
                placing a pre-processing order on the espa platform. It also
                returns the ``lsru.Order`` instance corresponding to the order
        """
        if note is None:
            note = 'order placed on %s' % datetime.datetime.now().isoformat()
        prods = self.get_available_products(scene_list)
        prods.pop('not_implemented', None)
        # There may be unavailable scenes for ordered products (remove them)
        if 'date_restricted' in prods:
            date_restricted = prods.pop('date_restricted')
            for k, v in date_restricted.items():
                if k in products:
                    for scene_id in v:
                        for collection in prods.keys():
                            try:
                                prods[collection]['inputs'].remove(scene_id)
                                warnings.warn('%s removed from order; reason: '
                                              '%s date restriction' % (scene_id, k))
                            except ValueError:
                                pass

        def prepare_dict(d):
            d['products'] = products
            return d

        params = {k: prepare_dict(v) for k, v in prods.items()}
        params.update(format=format, note=note, resampling_method=resampling)
        if resolution is not None:
            params.update(resize={'pixel_size': resolution,
                                  'pixel_size_units': 'meters'})
        if projection is not None:
            params.update(projection=projection)
        if extent is not None:
            extent_dict = dict(zip(('west', 'south', 'east', 'north'), extent))
            extent_dict.update(units=extent_units)
            params.update(image_extents=extent_dict)
        if verbose:
            pprint(params)
        order_meta = self._request('order', verb='post', body=params)
        return Order(order_meta['orderid'], conf=self.conf)
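
    # Illustrative doctest-style sketch (not in the original module): ordering
    # with reprojection, resizing and cropping. The ``{'lonlat': None}``
    # projection spec is an assumption; check ``espa.projections`` for the
    # formats actually supported.
    # >>> order = espa.order(scene_list, products=['sr', 'pixel_qa'],
    # ...                    projection={'lonlat': None},
    # ...                    extent=(3.5, 43.4, 4, 44), extent_units='dd',
    # ...                    resolution=60)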

    @property
    def projections(self):
        """Get a dictionary of projections supported by the platform

        Return:
            dict: Dictionary with projection names as keys and projection
                attributes as values
        """
        if self._projections is None:
            self._projections = self._request('projections')
        return self._projections

    def get_available_products(self, scene_list):
        """Get the list of available products for each element of a list of
        scene ids

        Args:
            scene_list (list): List of scene ids

        Example:
            >>> from lsru import Espa
            >>> espa = Espa()
            >>> print(espa.get_available_products([
            ...     'LE07_L1TP_029030_20170221_20170319_01_T1'
            ... ]))

        Return:
            dict: Information on products available for each element of the
                input list provided
        """
        return self._request('available-products', body={'inputs': scene_list})

    @property
    def formats(self):
        """Get a list of file formats supported by the platform

        Returns:
            list: List of strings corresponding to the format names
        """
        if self._formats is None:
            self._formats = self._request('formats')
        return self._formats

    @property
    def resampling_methods(self):
        """Get a list of resampling methods supported by the platform

        Returns:
            list: List of resampling methods
        """
        if self._resampling_methods is None:
            self._resampling_methods = self._request('resampling-methods')
        return self._resampling_methods

    @property
    def user(self):
        """Get Usgs user details

        Returns:
            dict: Usgs user information
        """
        if self._user is None:
            self._user = self._request('user')
        return self._user

    @property
    def orders(self):
        """Get a list of current orders

        Returns:
            list: List of ``lsru.Order``, each one corresponding to an order
                with ordered or complete status (purged orders are not listed)
        """
        order_list = self._request('list-orders',
                                   body={'status': ['complete', 'ordered']})
        return [Order(x, conf=self.conf) for x in order_list]
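
# Illustrative sketch (not part of the original module): inspecting platform
# capabilities and listing current orders from an ``Espa`` instance.
# >>> espa = Espa()
# >>> pprint(espa.formats)
# >>> pprint(espa.resampling_methods)
# >>> for order in espa.orders:
# ...     print(order.orderid, order.status)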


class Order(_EspaBase):
    """Class to deal with espa orders

    Attributes:
        orderid (str): Espa order ID

    Args:
        orderid (str): Espa order ID
        conf (str): Path to file containing usgs credentials
    """

    def __init__(self, orderid, conf=os.path.expanduser('~/.lsru')):
        super(Order, self).__init__(conf=conf)
        self.orderid = orderid

    @property
    def status(self):
        """Get the current status of the order

        Return:
            str: Order status (e.g. ``ordered``, ``complete``, ``purged``)
        """
        return self._request('order-status/%s' % self.orderid)['status']

    @property
    def is_complete(self):
        """Check if order has status ``complete``

        Return:
            bool
        """
        return True if self.status == 'complete' else False

    @property
    def items_status(self):
        """Get the processing status of each item of the order

        Return:
            list: Status information for each item of the order
        """
        return self._request('item-status/%s' % self.orderid)[self.orderid]

    @property
    def urls_completed(self):
        """Get the download urls of items whose status is complete

        Return:
            list: A list of download urls
        """
        item_list = self.items_status
        url_list = [x['product_dload_url'] for x in item_list
                    if x['status'] == 'complete']
        return url_list

    def cancel(self):
        """Cancel the order

        Orders are processed in the order they were placed. Cancelling an
        order may be useful when the order is blocking other orders

        Return:
            dict: The response of the API to the cancellation order
        """
        cancel_request = {"orderid": self.orderid, "status": "cancelled"}
        return self._request('order', verb='put', body=cancel_request)

    def download_all_complete(self, path, unpack=False, overwrite=False,
                              check_complete=True):
        """Download all completed scenes of the order to a folder

        Args:
            path (str): Directory where data are to be downloaded
            unpack (bool): Unpack downloaded archives on the fly
            overwrite (bool): Force overwriting files even when they already
                exist locally. Defaults to False
            check_complete (bool): When local files exist and overwrite is set
                to ``False``, check whether local and remote file sizes match.
                The file is re-downloaded when the sizes differ. Only makes
                sense if overwrite is set to ``False``. Defaults to ``True``.
                Also note that checking file size takes time (probably a few
                milliseconds), so you can save time by setting this argument
                to ``False`` when you are sure previous downloads are
                complete. Note that this option does not work when ``unpack``
                is set to True

        Returns:
            Used for its side effect of batch downloading data, no return
        """
        for url in self.urls_completed:
            filename = url.split('/')[-1]
            try:
                print('Downloading %s' % filename)
                if unpack:
                    url_retrieve_and_unpack(url, path, overwrite=overwrite)
                else:
                    dst = os.path.join(path, filename)
                    url_retrieve(url, dst, overwrite=overwrite,
                                 check_complete=check_complete)
            except Exception as e:
                print('%s skipped. Reason: %s' % (filename, e))
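
# Illustrative end-to-end sketch (not part of the original module): search,
# order, poll until completion, then download. The polling interval and the
# download directory are arbitrary choices for this example.
# >>> import time
# >>> usgs = Usgs()
# >>> usgs.login()
# >>> scenes = usgs.search(collection='LANDSAT_8_C1', bbox=(3.5, 43.4, 4, 44),
# ...                      begin=datetime.datetime(2014, 1, 1),
# ...                      end=datetime.datetime(2018, 1, 1))
# >>> espa = Espa()
# >>> order = espa.order([x['displayId'] for x in scenes],
# ...                    products=['sr', 'pixel_qa'])
# >>> while not order.is_complete:
# ...     time.sleep(300)
# >>> order.download_all_complete('/tmp/espa_downloads', unpack=True)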