Source code for nevis._os_terrain_50

#!/usr/bin/env python3
"""
Hidden module that provides methods to download, unpack, and process the OS
Terrain 50 data set.
"""
# The zip contains several grid squares, using the lettering described here:
# https://en.wikipedia.org/wiki/Ordnance_Survey_National_Grid
#
# Inspired by
# https://scipython.com/blog/processing-uk-ordnance-survey-terrain-data/
#
import io
import os
import sys
import urllib.request
import zipfile

# Patch the zipfile module to support the PKZIP proprietary DEFLATE64 format
# This is used by two files in the May 2022 release of OS Terrain 50
# (data/sd/sd67_OST50GRID_20220506.zip and data/sd/sd68_OST50GRID_20220506.zip)
import zipfile_deflate64  # noqa

import numpy as np

import nevis


# Base name of the big zip file to read (e.g. to read 'terr50_gagg_gb.zip' we
# put 'terr50_gagg_gb').
terrain_file = 'terr50_gagg_gb'
terrain_file_zip = os.path.join(nevis._DIR_DATA, terrain_file + '.zip')
terrain_file_npy = os.path.join(nevis._DIR_DATA, terrain_file + '.npy')
not_sea_file = 'not_sea'
not_sea_file_npy = os.path.join(nevis._DIR_DATA, not_sea_file + '.npy')

# URL to download it from
url = ('https://api.os.uk/downloads/v1/products/Terrain50/downloads?'
       'area=GB&format=ASCII+Grid+and+GML+%28Grid%29&redirect')

# Resolution: distance between neighbours in the grid
resolution = 50

# .asc file encoding
ENC = 'utf-8'

# Cached heights and not-sea-mask
_heights = None
_not_sea = None             # y * width + x
_not_sea_unpacked = None    # array of y, x indices


[docs] class DataNotFoundError(RuntimeError): """ Raised when the OS Terrain 50 data is not found. """
[docs] def spacing(): """ Returns the spacing between any two grid points. """ return resolution
[docs] def gb(downsampling=None): """ Returns an array of elevation data for Great Britain, with height in meters and grid points spaced 50m apart. Array indices are (height, width), also known as (northings, eastings). A downsampled version can be returned for testing purposes, by setting ``downsampling`` to any integer greater than one. """ global _heights, _not_sea # Load data (or retrieve from cache) if _heights is None or _not_sea is None: # Check files exists if not os.path.isfile(terrain_file_npy): raise DataNotFoundError( f'OS Terrain 50 data not found in {nevis._DIR_DATA}.' ' Please use nevis.download_os_terrain_50() to download and' ' process this data set.' ) # Don't load the heights data yet, may have to be rebuild it anyway! if not os.path.isfile(not_sea_file_npy): raise DataNotFoundError( 'List of below sea-level inland points not found in' f' {nevis._DIR_DATA}. Please call' ' nevis.download_os_terrain_50() to create this file.' ) # Load both files _heights = np.load(terrain_file_npy) _heights.setflags(write=False) _not_sea = np.load(not_sea_file_npy) _not_sea.setflags(write=False) # Return downsampled version for testing (but keep full version in cache) if downsampling is not None: downsampling = int(downsampling) if downsampling > 1: return _heights[::downsampling, ::downsampling] return _heights
def inland_below_sea_level_indices(): """ Returns a list of indices in the heights data that are below sea-level, but which nevis's preprocessing did not consider to be "sea". """ global _not_sea_unpacked if _not_sea_unpacked is None: if _not_sea is None: gb() x = _not_sea % _heights.shape[1] y = _not_sea // _heights.shape[1] _not_sea_unpacked = np.vstack((y, x)).T return _not_sea_unpacked
[docs] def inland_below_sea_level(): """ Returns a list of points (x, y) in meters that are below sea-level, but which nevis's preprocessing did not consider to be "sea". """ ij = inland_below_sea_level_indices() * resolution return np.vstack((ij[:, 1], ij[:, 0])).T
[docs] def is_sea(coords): """ Returns ``True`` if and only if the given :class:`nevis.Coords` were classified as "sea" by nevis's pre-processing. """ if _heights is None: gb() x, y = coords.grid // nevis.spacing() try: if _heights[y, x] >= 0: return False except IndexError: # Anything off-map counts as sea return True return (y * _heights.shape[1] + x) not in _not_sea
[docs] def download_os_terrain_50(force=False): """ Downloads, unpacks and processes the OS Terrain 50 data set. If a previously downloaded zip file is found then the downloading step is skipped, unless ``force`` is set to ``True``. If a previously unpacked and processed file is found, this method does nothing unless ``force`` is set to ``True``. """ global _heights # Already done and not forcing? Then return have_terrain = os.path.isfile(terrain_file_npy) have_not_sea = os.path.isfile(not_sea_file_npy) if (not force) and have_terrain and have_not_sea: print('Downloaded, unpacked, and processed file already found:' ' Skipping.') return # Check directory exists, show message saying it will be created and how to # change its location. make_dirs = not os.path.isdir(nevis._DIR_DATA) if make_dirs: msg = ( f'This method will create a directory: {nevis._DIR_DATA}\n' 'This will be used to store the OS Terrain 50 data set in both a' ' compressed (160mb) and uncompressed (1.5gb) form, as well as' ' additional files that can reach several gb in size.\n' 'These files will NOT be deleted automatically when nevis is' ' uninstalled.\n' ) if nevis._ENV_DATA in os.environ: msg += 'This path was' else: msg += 'An alternative directory can be' msg += f' specified with the environment variable: {nevis._ENV_DATA}' print(msg) # Download zip file if force or not os.path.isfile(terrain_file_zip): print('The OS Terrain 50 database will be downloaded from') print(f' {url}') print() yesno = input('Continue? (y/n) ') if yesno.lower() not in ['y', 'yes']: print('Halted.') return if make_dirs: os.makedirs(nevis._DIR_DATA) download_gagg(url, terrain_file_zip) # Unpack / process if force or not have_terrain or not have_not_sea: # Temporary sea-mask level s = -100 # Create empty array width, height = nevis.dimensions() nx, ny = width // resolution, height // resolution heights = np.empty((ny, nx), dtype=np.float32) heights.fill(np.nan) # Fill it up extract(terrain_file, heights, resolution) # Replace missing values by far-below-sea level heights[np.isnan(heights)] = s # Fix odd squares fix_sea_levels_in_odd_squares(heights) # Add a few fake damns save_cambridgeshire(heights) # Create a "sea mask" in the loaded data, indicating sea with -100 set_sea_level(heights, s) # Store below-zero points that are not in the sea, so that we can # later detect if something has been labelled sea or not. not_sea = inland_below_sea_level_points(heights, s) print(f'Saving to {not_sea_file_npy}...') np.save(not_sea_file_npy, not_sea) # Make the points labelled as sea slope towards the land, to make it # more findable. add_sea_slope(heights, s) print(f'Saving to {terrain_file_npy}...') np.save(terrain_file_npy, heights) # Store _heights = heights _heights.setflags(write=False) _not_sea = not_sea _not_sea.setflags(write=False)
def download_gagg(url, fname, print_to_screen=True): """ Downloads the (160mb) ``gagg`` file containing the OS50 data from the Ordnance Survey. """ if print_to_screen: print('Downloading terrain data...') url = urllib.request.urlopen(url) try: raw_data = url.read() finally: url.close() if print_to_screen: print('Writing...') with open(fname, 'wb') as f: f.write(raw_data) def extract(basename, heights, resolution, print_to_screen=True): """ Extracts data from an "ASCII Grid and GML (Grid)" file at ``basename``.zip, and extracts the data into numpy array ``heights``. The resolution of each file inside the zip must be the same, and must be specified as ``resolution``. """ # Open zip file if print_to_screen: print(f'Reading from {terrain_file_zip}') i = 0 t = nevis.Timer() with zipfile.ZipFile(terrain_file_zip, 'r') as f: # Find inner zip files zips = [] for name in f.namelist(): if os.path.splitext(name)[1] == '.zip': zips.append(name) zips.sort() # Read nested zip files for name in zips: read_nested_zip(f, name, heights, resolution) if print_to_screen: i += 1 print('.', end=(None if i % 79 == 0 else '')) sys.stdout.flush() if print_to_screen: print(f'\nFinished, after {t.format()}') def read_nested_zip(parent, name, heights, resolution): """ Opens a zip-in-a-zip and reads and extracts any ``.asc`` files inside it. """ # Open zip-in-a-zip with parent.open(name, 'r') as par: nested = io.BytesIO(par.read()) with zipfile.ZipFile(nested) as f: # Scan for asc files for path in f.namelist(): if os.path.splitext(path)[1] == '.asc': # Read internal asc with f.open(path, 'r') as asc: try: read_asc(asc, heights, resolution) except Exception as e: raise Exception( f'Error reading {path} in {name}: {str(e)}.') def read_asc(handle, heights, resolution): """ Extracts data from a handle pointing to an already opened ``.asc`` file. """ # Grab all text in one go and decode lines = handle.read().decode(ENC).splitlines() # Head lines def header(line, field): start = field.strip() + ' ' n = len(start) if line[:n] != start: raise Exception( f'Unexpected header line. Got "{line}", expecting "{start}".') return int(line[n:]) # Ncols and nrows ncols = header(lines[0], 'ncols') nrows = header(lines[1], 'nrows') # Offset xll = header(lines[2], 'xllcorner') // resolution yll = header(lines[3], 'yllcorner') // resolution # Resolution cellsize = header(lines[4], 'cellsize') if cellsize != resolution: raise Exception( f'Unexpected resolution. Got {cellsize}, expecting {resolution}.') # Optional missing value indicator missing = None offset = 5 if lines[offset].startswith('nodata_value '): missing = lines[offset].split()[1:2] offset += 1 # Read data data = np.genfromtxt( lines[offset:], delimiter=' ', missing_values=missing, filling_values=[np.nan], max_rows=nrows, ) assert data.shape == (nrows, ncols) # Insert data into vector heights[yll:yll + nrows, xll:xll + ncols] = data[::-1, :] def fix_sea_levels_in_odd_squares(heights): """ Manually "lower" some areas just off the coast that have above sea-level heights in the data set. """ # Fix sea level in NT68 (last checked on 2022-06-27) x, w = nevis.Coords.from_square_with_size('NT68') x, y = x.grid[0] // resolution, x.grid[1] // resolution w = w // resolution view = heights[y:y + w, x:x + w] view[view < 2.5] -= 2.1 # Fix sea level in NR24, 34, 44 (last checked on 2022-06-27) x, w = nevis.Coords.from_square_with_size('NR24') x, y = x.grid[0] // resolution, x.grid[1] // resolution w = w // resolution view = heights[y:y + w, x:x + 3 * w] view[view < 0.2] -= 10 # Fix sea level in NR33 (last checked on 2022-06-27) x, w = nevis.Coords.from_square_with_size('NR33') x, y = x.grid[0] // resolution, x.grid[1] // resolution w = w // resolution view = heights[y:y + w, x:x + w] view[view < 0.2] -= 10 # Fix sea level in NR35 (last checked on 2022-06-27) x, w = nevis.Coords.from_square_with_size('NR35') x, y = x.grid[0] // resolution, x.grid[1] // resolution w = w // resolution view = heights[y:y + w, x:x + w] view[view < 0.2] -= 0.5 # Fix sea level in NR56 (last checked on 2022-06-27) x, w = nevis.Coords.from_square_with_size('NR56') x, y = x.grid[0] // resolution, x.grid[1] // resolution w = w // resolution view = heights[y:y + w, x:x + w] view[view < 0.1] = -10 # Fix sea level in NR57 (last checked on 2022-06-27) x, w = nevis.Coords.from_square_with_size('NR57') x, y = x.grid[0] // resolution, x.grid[1] // resolution w = w // resolution view = heights[y:y + w, x:x + w] view[view < 0.1] -= 0.5 # Fix sea level in NR76 (last checked on 2022-06-27) x, w = nevis.Coords.from_square_with_size('NR76') x, y = x.grid[0] // resolution, x.grid[1] // resolution w = w // resolution view = heights[y:y + w, x:x + w] view[view < 0.1] -= 0.5 # Fix sea level in NR65,75,64,74 (last checked on 2022-06-27) x, w = nevis.Coords.from_square_with_size('NR64') x, y = x.grid[0] // resolution, x.grid[1] // resolution w = w // resolution view = heights[y:y + 2 * w, x:x + 2 * w] view[view < 0.1] = -10 del view def save_cambridgeshire(heights): """ Artificially raise the level of two river beds to stop the sea-level setting algorithm from treating e.g. Holme fen as "sea". """ # Last checked 2023-08-27 # Block river Great Ouse in TF 50, stopping a lot of flooding in # cambridgeshire. #heights[6047, 11183] = 0.01 # Worked 2022-06-27, but not 2023-08-31 heights[6196, 11197] = 0.01 # Block river Yare in TG 50, and Oulton Dyke in TM59, stopping lots of # flooding near Norwich. # Note: Both must be set to see an effect. heights[6151, 13041] = 0.01 # TG50 heights[5851, 13013] = 0.01 # TM59 def inland_below_sea_level_points(heights, s, print_to_screen=True): """ Return an array of (x, y) coordinates that are below sea-level, but not to be treated as sea. """ y, x = np.nonzero(np.logical_and(heights < 0, heights > s)) p = y * heights.shape[1] + x assert np.max(p) < 2**32, 'Not-sea point does not fit uint32' return np.array(p, dtype=np.uint32) def set_sea_level(heights, s, print_to_screen=True): """ Create a "mask" on the heights map, by setting all entries suspected to be sea to a fixed (very low) "sea level" value ``s``. """ # Find a "sea mask", set all pixels to s if print_to_screen: print('Creating sea bitmask...') t = nevis.Timer() # Treat each square separately, starting bottom left and spiraling inwards # To iterate in this way, we first create a list of squares d = 80 # Trial and error shows this is near optimal squares = _spiral_squares(heights, d) # The edges of the map are all definitely sea, so set to s e = 1 heights[:e, :] = s heights[-e:, :] = s heights[:, :e] = s heights[:, -e:] = s # Iterate over squares, and apply "sea mask" in each individually iters = 0 ny, nx = heights.shape zmax = 100 for z in range(zmax): skip = [] changed = False for i, sq in enumerate(squares): if print_to_screen: iters += 1 if iters % 100 == 0: print('.', end=(None if (iters // 100) % 79 == 0 else '')) sys.stdout.flush() # Create views of center of square, plus neighbours x0, y0 = sq x1, y1 = x0 + d, y0 + d # At the edge? Then translate by 1 pixel x0, y0 = max(x0, 1), max(y0, 1) x1, y1 = min(x1, nx - 1), min(y1, ny - 1) # Create view and views of neighbours v = heights[y0: y1, x0: x1] v0 = heights[y0 + 1: y1 + 1, x0: x1] # Above v1 = heights[y0 - 1: y1 - 1, x0: x1] # Below v2 = heights[y0: y1, x0 - 1: x1 - 1] # Left v3 = heights[y0: y1, x0 + 1: x1 + 1] # Right # Skip easy squares if np.all(v < 0): if (np.any(v == s) | np.any(v0 == s) | np.any(v1 == s) | np.any(v2 == s) | np.any(v3 == s)): v[:] = s skip.append(i) changed = True # Don't skip if < 0 but no s neighbours _yet_ continue elif np.all(v > 0): skip.append(i) continue kmax = 1000 for k in range(1000): n = (v <= 0) & (v != s) & ( (v0 == s) | (v1 == s) | (v2 == s) | (v3 == s)) if not np.any(n): break v[n] = s changed = True if k + 1 == kmax: print('WARNING: Reached kmax') for i in reversed(skip): del squares[i] if not squares: break if not changed: break if z + 1 == zmax: print('WARNING: Reached zmax') if print_to_screen: print(f'\nFinished, after {t.format()}') def add_sea_slope(heights, s, print_to_screen=True): """ Make the sea slope downwards, as you move further away from the coast. The input should be a ``heights`` array in which all points known to be sea (and only those points) have been set to height ``s``. """ print('Adding slope to sea bed') t = nevis.Timer() h = 0.01 # slope per square ny, nx = heights.shape # Create arrays of all neighbours above, below, left, and right of points v0 = heights[1:] v1 = heights[:-1] v2 = heights[:, 1:] v3 = heights[:, :-1] # Select all points next to a "sea" point, that are not themselves sea. yd, xd = np.nonzero(np.logical_and(v1 == s, v0 != s)) yu, xu = np.nonzero(np.logical_and(v0 == s, v1 != s)) yl, xl = np.nonzero(np.logical_and(v3 == s, v2 != s)) yr, xr = np.nonzero(np.logical_and(v2 == s, v3 != s)) y = np.concatenate((yd, yu + 1, yl, yr)) x = np.concatenate((xd, xu, xl, xr + 1)) # And set all these points heights to s (sea mask value) minus h heights[y, x] = s - h # Iteratively find all neighbours of the current selection that are still # at level s, or at a value lower than what this pass would set them to. # Then set these points to the current selection's height minus s. j = 0 for i in range(ny * nx): div = 1 if len(x) > 300000 else (10 if len(x) > 50000 else 100) if i % div == 0: j += 1 print('.', end=(None if j % 79 == 0 else '')) sys.stdout.flush() # Move down ok = np.nonzero(y > 0) yd, xd = y[ok] - 1, x[ok] ok = np.nonzero((heights[yd, xd] == s) | (heights[yd, xd] < heights[yd + 1, xd] - h)) yd, xd = yd[ok], xd[ok] heights[yd, xd] = heights[yd + 1, xd] - h # Move up ok = np.nonzero(y < ny - 1) yu, xu = y[ok] + 1, x[ok] ok = np.nonzero((heights[yu, xu] == s) | (heights[yu, xu] < heights[yu - 1, xu] - h)) yu, xu = yu[ok], xu[ok] heights[yu, xu] = heights[yu - 1, xu] - h # Move left ok = np.nonzero(x > 0) yl, xl = y[ok], x[ok] - 1 ok = np.nonzero((heights[yl, xl] == s) | (heights[yl, xl] < heights[yl, xl + 1] - h)) yl, xl = yl[ok], xl[ok] heights[yl, xl] = heights[yl, xl + 1] - h # Move right ok = np.nonzero(x < nx - 1) yr, xr = y[ok], x[ok] + 1 ok = np.nonzero((heights[yr, xr] == s) | (heights[yr, xr] < heights[yr, xr - 1] - h)) yr, xr = yr[ok], xr[ok] heights[yr, xr] = heights[yr, xr - 1] - h y = np.concatenate((yd, yu, yl, yr)) x = np.concatenate((xd, xu, xl, xr)) if len(y) == 0: break heights[heights < s] -= s print(f'\nFinished, after {t.format()}') def _spiral_squares(heights, d): """ Returns a list of squares (indicated by the lower-left corner) that cover the map, starting lower-left and spiraling inward. Each square has size ``d``. """ squares = [] x0, y0 = (0, 0) y1, x1 = heights.shape while x1 > x0: for x in range(x0, x1, d): squares.append((x, y0)) y0 += d if y1 <= y0: break for y in range(y0, y1, d): squares.append((x1 - d, y)) x1 -= d if x1 <= x0: break for x in range(x1 - d, x0 - d, -d): squares.append((x, y1 - d)) y1 -= d if y1 <= y0: break for y in range(y1 - d, y0 - d, -d): squares.append((x0, y)) x0 += d return squares