import time # low level methods def coordinate_in_path(latitude, longitude, p): return p.contains_points([(latitude, longitude)])[0] class InvalidTrajectoryError(Exception): def __init__(self, value): # pragma: no cover self.value = value def __str__(self): # pragma: no cover return repr(self.value) def time_in_path(df, p, maxmin='max', getall=False, name='unknown', logfile=None): if df.empty: # pragma: no cover return 0 def f(x): return coordinate_in_path(x['latitude'], x['longitude'], p) inpolygon = df.apply(lambda row:f(row), axis=1).copy() if maxmin == 'max': b = (~inpolygon).shift(-1)+inpolygon else: # pragma: no cover b = (~inpolygon).shift(1)+inpolygon if len(df[b == 2]): if logfile is not None: # pragma: no cover t = time.localtime() timestamp = bytes('{t}'.format( t=time.strftime('%b-%d-%Y_%H%M', t)), 'utf-8') with open(logfile, 'ab') as f: f.write(b'\n') f.write(timestamp) f.write(b' ') f.write(bytes(name, 'utf-8')) f.write(b' ') f.write(bytes(maxmin, 'utf-8')) f.write(b' ') f.write(bytes(str(getall), 'utf-8')) f.write(b' ') f.write(bytes(str(len(df[b == 2])), 'utf-8')) f.write(b' ') if len(df[b == 2]) > 1: f.write(b' passes found') else: f.write(b' pass found') if getall: # pragma: no cover return df[b == 2]['time'], df[b == 2]['cum_dist'] else: return df[b == 2]['time'].min(), df[b == 2]['cum_dist'].min() if logfile is not None: # pragma: no cover t = time.localtime() timestamp = bytes('{t}'.format( t=time.strftime('%b-%d-%Y_%H%M', t)), 'utf-8') with open(logfile, 'ab') as f: f.write(b'\n') f.write(timestamp) f.write(b' ') f.write(bytes(name, 'utf-8')) f.write(b' ') f.write(bytes(maxmin, 'utf-8')) f.write(b' ') f.write(bytes(str(getall), 'utf-8')) f.write(b' ') f.write(bytes(str(len(df[b == 2])), 'utf-8')) f.write(b' ') f.write(b' pass not found') raise InvalidTrajectoryError( "Trajectory doesn't go through path") # pragma: no cover return 0 # pragma: no cover def coursetime_first(data, paths, polygons=[], logfile=None): entrytime = data['time'].max() entrydistance = data['cum_dist'].max() coursecompleted = False if len(polygons) == 0: polygons = [(0, str(i)) for i in range(len(paths))] try: entrytime, entrydistance = time_in_path( data, paths[0], maxmin='max', name=str(polygons[0]), logfile=logfile) coursecompleted = True except InvalidTrajectoryError: # pragma: no cover entrytime = data['time'].max() entrydistance = data['cum_dist'].max() coursecompleted = False return entrytime, entrydistance, coursecompleted def coursetime_paths(data, paths, finalmaxmin='min', polygons=[], logfile=None): entrytime = data['time'].max() entrydistance = data['cum_dist'].max() coursecompleted = False if len(polygons) == 0: polygons = [(0, str(i)) for i in range(len(paths))] # corner case - empty list of paths if len(paths) == 0: # pragma: no cover return 0, 0, True # end - just the Finish polygon if len(paths) == 1: try: ( entrytime, entrydistance ) = time_in_path(data, paths[0], maxmin=finalmaxmin, name=str(polygons[0]), logfile=logfile) coursecompleted = True except InvalidTrajectoryError: # pragma: no cover entrytime = data['time'].max() entrydistance = data['cum_dist'].max() coursecompleted = False return entrytime, entrydistance, coursecompleted if len(paths) > 1: try: time, dist = time_in_path( data, paths[0], name=str(polygons[0]), logfile=logfile) data2 = data[data['time'] > time].copy() data2['time'] = data2['time'].apply(lambda x: x-time) data2['cum_dist'] = data2['cum_dist'].apply(lambda x: x-dist) ( timenext, distnext, coursecompleted ) = coursetime_paths(data2, paths[1:], polygons=polygons[1:], logfile=logfile) return time+timenext, dist+distnext, coursecompleted except InvalidTrajectoryError: # pragma: no cover entrytime = data['time'].max() entrydistance = data['cum_dist'].max() coursecompleted = False return entrytime, entrydistance, coursecompleted # pragma: no cover