154 lines
5.1 KiB
Python
154 lines
5.1 KiB
Python
from __future__ import absolute_import
|
|
from __future__ import division
|
|
from __future__ import print_function
|
|
from __future__ import unicode_literals
|
|
import time
|
|
|
|
|
|
# low level methods
|
|
def coordinate_in_path(latitude, longitude, p):
    """Report whether a single coordinate lies inside the path ``p``.

    The coordinate is handed to ``p.contains_points`` as a one-element list
    holding the tuple ``(latitude, longitude)`` (in that order), and the
    first element of the returned sequence is passed back unchanged.

    NOTE(review): ``p`` is presumably a ``matplotlib.path.Path`` -- confirm
    against the callers; only its ``contains_points`` method is used here.
    """
    point = (latitude, longitude)
    hits = p.contains_points([point])
    return hits[0]
|
|
|
|
|
|
class InvalidTrajectoryError(Exception):
    """Signals that a trajectory does not go through the requested path.

    The single constructor argument is kept on the ``value`` attribute, and
    converting the exception to a string yields ``repr(value)``.
    """

    def __init__(self, value):  # pragma: no cover
        self.value = value

    def __str__(self):  # pragma: no cover
        return "{!r}".format(self.value)
|
|
|
|
|
|
def _log_pass_result(logfile, name, maxmin, getall, count):
    """Append one timestamped summary record of a pass search to ``logfile``.

    The record starts with a newline and holds the local timestamp, the
    polygon name, the search mode, the ``getall`` flag and the number of
    passes, each followed by a space, and ends with a verdict phrase.
    """
    if count > 1:
        verdict = b' passes found'
    elif count == 1:
        verdict = b' pass found'
    else:
        verdict = b' pass not found'

    stamp = time.strftime('%b-%d-%Y_%H%M', time.localtime())
    fields = (stamp, name, maxmin, getall, count)
    # str() first, so a non-string polygon name cannot break the logging.
    record = b'\n' + b''.join(
        str(field).encode('utf-8') + b' ' for field in fields
    ) + verdict

    # Binary append mode: earlier records in the file are preserved.
    with open(logfile, 'ab') as handle:
        handle.write(record)


def time_in_path(df, p, maxmin='max', getall=False, name='unknown', logfile=None):
    """Find when (and how far along) a trajectory passes through a path.

    Every row of ``df`` is tested with ``coordinate_in_path`` and the result
    is stored in a new ``'inpolygon'`` column of ``df`` (the caller's frame
    is modified in place).  A row counts as a "pass" when it is inside the
    path and its neighbouring row is outside:

    * ``maxmin == 'max'``: the *following* row is outside, i.e. the last
      inside sample before the trajectory leaves the path;
    * any other value: the *preceding* row is outside, i.e. the first
      inside sample after the trajectory enters the path.

    The first/last row has no such neighbour and therefore never qualifies.

    Args:
        df: DataFrame with ``latitude``, ``longitude``, ``time`` and
            ``cum_dist`` columns.
        p: Path object accepted by ``coordinate_in_path``.
        maxmin: Search mode, see above.
        getall: If true, return every pass instead of only the smallest
            ``time`` / ``cum_dist`` values.
        name: Label written to the log record.
        logfile: Path of a file to append a summary record to, or ``None``
            to disable logging.

    Returns:
        ``(time, cum_dist)``: the column minima over all passes, or the two
        full Series of pass values when ``getall`` is true.

        NOTE(review): for an empty ``df`` the scalar ``0`` is returned, not
        a 2-tuple, so callers that unpack the result will fail on empty
        input.  Kept as is for backward compatibility -- confirm intent.

    Raises:
        InvalidTrajectoryError: If no row qualifies as a pass.
    """
    if df.empty:  # pragma: no cover
        return 0

    df['inpolygon'] = df.apply(
        lambda row: coordinate_in_path(row['latitude'], row['longitude'], p),
        axis=1,
    )
    inside = df['inpolygon']

    # Shift the "outside" flags so each row lines up with its neighbour.
    if maxmin == 'max':
        neighbour_outside = (~inside).shift(-1)
    else:  # pragma: no cover
        neighbour_outside = (~inside).shift(1)

    # True + True == 2 marks "inside here, outside at the neighbour"; the
    # NaN introduced by shift() keeps the edge row from ever matching.
    passes = df[(neighbour_outside + inside) == 2]
    count = len(passes)

    if logfile is not None:  # pragma: no cover
        _log_pass_result(logfile, name, maxmin, getall, count)

    if not count:
        raise InvalidTrajectoryError(
            "Trajectory doesn't go through path")  # pragma: no cover

    if getall:  # pragma: no cover
        return passes['time'], passes['cum_dist']
    return passes['time'].min(), passes['cum_dist'].min()
|
|
|
|
|
|
def coursetime_first(data, paths, polygons=None, logfile=None):
    """Find the pass through the first path of a course.

    Delegates to ``time_in_path`` on ``paths[0]`` with ``maxmin='max'``.

    Args:
        data: DataFrame with at least ``time`` and ``cum_dist`` columns
            (plus whatever ``time_in_path`` requires).
        paths: Sequence of path objects; only ``paths[0]`` is used.
        polygons: Optional sequence parallel to ``paths`` whose items carry
            a label at index 1, used as the name in the log record.  When
            omitted or empty, labels ``'0'``, ``'1'``, ... are generated.
        logfile: Forwarded to ``time_in_path``.

    Returns:
        ``(entrytime, entrydistance, coursecompleted)``.  When the
        trajectory does not pass through ``paths[0]`` the maxima of the
        ``time`` and ``cum_dist`` columns are returned with
        ``coursecompleted`` set to ``False``.
    """
    # None replaces the former mutable ``[]`` default; an explicitly passed
    # empty sequence is still treated the same way.
    if polygons is None or len(polygons) == 0:
        polygons = [(0, str(i)) for i in range(len(paths))]

    try:
        entrytime, entrydistance = time_in_path(
            data, paths[0], maxmin='max', name=polygons[0][1], logfile=logfile)
    except InvalidTrajectoryError:  # pragma: no cover
        return data['time'].max(), data['cum_dist'].max(), False

    return entrytime, entrydistance, True
|
|
|
|
|
|
def coursetime_paths(data, paths, finalmaxmin='min', polygons=None, logfile=None):
    """Accumulate time and distance needed to pass through ``paths`` in order.

    The first path is located with ``time_in_path``; the trajectory is then
    cut to the rows after that pass, re-based so that time and distance
    start at zero, and the remaining paths are handled recursively.  The
    last path is searched with ``maxmin=finalmaxmin``; all earlier paths use
    the default mode of ``time_in_path``.

    Args:
        data: DataFrame with at least ``time`` and ``cum_dist`` columns
            (plus whatever ``time_in_path`` requires).
        paths: Sequence of path objects, in the order they must be passed.
        finalmaxmin: Search mode for the last path.
        polygons: Optional sequence parallel to ``paths`` whose items carry
            a label at index 1, used as the name in the log record.  When
            omitted or empty, labels ``'0'``, ``'1'``, ... are generated.
        logfile: Forwarded to ``time_in_path``.

    Returns:
        ``(time, distance, coursecompleted)``.  For an empty ``paths`` this
        is ``(0, 0, True)``.  When a path is not passed, the maxima of the
        ``time`` and ``cum_dist`` columns of the (remaining) data are used
        for that leg and ``coursecompleted`` is ``False``.
    """
    # None replaces the former mutable ``[]`` default; an explicitly passed
    # empty sequence is still treated the same way.
    if polygons is None or len(polygons) == 0:
        polygons = [(0, str(i)) for i in range(len(paths))]

    # corner case - empty list of paths
    if len(paths) == 0:  # pragma: no cover
        return 0, 0, True

    try:
        # end - just the Finish polygon
        if len(paths) == 1:
            entrytime, entrydistance = time_in_path(
                data, paths[0], maxmin=finalmaxmin,
                name=polygons[0][1], logfile=logfile)
            return entrytime, entrydistance, True

        # Named passtime/passdist so the imported ``time`` module is not
        # shadowed inside this function.
        passtime, passdist = time_in_path(
            data, paths[0], name=polygons[0][1], logfile=logfile)

        # Continue with the rows after the pass, re-based to start at zero.
        remaining = data[data['time'] > passtime].copy()
        remaining['time'] = remaining['time'] - passtime
        remaining['cum_dist'] = remaining['cum_dist'] - passdist

        # finalmaxmin must be forwarded, otherwise the last path would
        # always be searched with the default mode.
        timenext, distnext, coursecompleted = coursetime_paths(
            remaining, paths[1:], finalmaxmin=finalmaxmin,
            polygons=polygons[1:], logfile=logfile)
        return passtime + timenext, passdist + distnext, coursecompleted
    except InvalidTrajectoryError:  # pragma: no cover
        return data['time'].max(), data['cum_dist'].max(), False
|