Private
Public Access
1
0

v1 using basic auth

This commit is contained in:
2024-11-26 08:37:36 +01:00
parent a8973b80b1
commit b94197a74b
4 changed files with 158 additions and 2 deletions

View File

@@ -254,6 +254,8 @@ urlpatterns = [
name='strokedatajson_v3'),
re_path(r'^api/TCX/workouts/$', views.strokedata_tcx,
name='strokedata_tcx'),
re_path(r'^api/rowingdata/workouts/$', views.strokedata_rowingdata,
name='strokedata_rowingdata'),
re_path(r'^api/courses/$', views.course_list, name='course_list'),
re_path(r'^api/courses/(?P<id>\d+)/$', views.get_crewnerd_kml, name='get_crewnerd_kml'),
re_path(r'^api/courses/kml/liked/$', views.get_crewnerd_liked, name='get_crewnerd_liked'),

View File

@@ -21,6 +21,8 @@ import rowingdata.tcxtools as tcxtools
from rowingdata import TCXParser, rowingdata
import arrow
import base64
class XMLParser(BaseParser):
media_type = "application/xml"
@@ -422,6 +424,52 @@ def get_crewnerd_liked(request):
# Stroke data views
@csrf_exempt
@logged_in_or_basicauth()
def strokedata_rowingdata(request):
"""
Upload a .csv file (rowingdata standard) through API, using Basic Auth
"""
r = getrower(request.user)
if r.rowerplan == 'freecoach':
return HttpResponseNotAllowed("This endpoint is for users, not for free coach accounts")
if request.method == 'POST':
form = DocumentsForm(request.POST, request.FILES)
if not form.is_valid():
return HttpResponseBadRequest(json.dumps(form.errors))
f = form.cleaned_data['file']
if f is None:
return HttpResponseBadRequest("Missing file")
filename, completefilename = handle_uploaded_file(f)
uploadoptions = {
'secret': settings.UPLOAD_SERVICE_SECRET,
'user': r.user.id,
'file': completefilename,
'workouttype': form.cleaned_data['workouttype'],
'boattype': form.cleaned_data['boattype'],
'title': form.cleaned_data['title'],
'rpe': form.cleaned_data['rpe'],
'notes': form.cleaned_data['notes']
}
url = settings.UPLOAD_SERVICE_URL
_ = myqueue(queuehigh,
handle_request_post,
url,
uploadoptions)
return JsonResponse(
{
"status": "success",
}
)
@csrf_exempt
#@login_required()
@api_view(["POST"])

View File

@@ -93,7 +93,8 @@ from django.http import (
HttpResponse, HttpResponseRedirect,
JsonResponse,
HttpResponseForbidden, HttpResponseNotAllowed,
HttpResponseNotFound, Http404
HttpResponseNotFound, Http404,
HttpResponseBadRequest,
)
from django.contrib.auth import authenticate, login, logout
from rowers.forms import (
@@ -302,6 +303,112 @@ from rowers.weather import get_wind_data, get_airport_code, get_metar_data
from oauth2_provider.models import Application, Grant, AccessToken
import base64
from django.http import HttpResponse
from django.contrib.auth import authenticate, login
#############################################################################
#
def view_or_basicauth(view, request, test_func, realm = "", *args, **kwargs):
"""
This is a helper function used by both 'logged_in_or_basicauth' and
'has_perm_or_basicauth' that does the nitty of determining if they
are already logged in or if they have provided proper http-authorization
and returning the view if all goes well, otherwise responding with a 401.
"""
if test_func(request.user):
# Already logged in, just return the view.
#
return view(request, *args, **kwargs)
# They are not logged in. See if they provided login credentials
#
if 'HTTP_AUTHORIZATION' in request.META:
auth = request.META['HTTP_AUTHORIZATION'].split()
if len(auth) == 2:
# NOTE: We are only support basic authentication for now.
#
if auth[0].lower() == "basic":
uname, passwd = base64.b64decode(auth[1]).decode("utf-8").split(':')
user = authenticate(username=uname, password=passwd)
if user is not None:
if user.is_active:
login(request, user)
request.user = user
return view(request, *args, **kwargs)
# Either they did not provide an authorization header or
# something in the authorization attempt failed. Send a 401
# back to them to ask them to authenticate.
#
response = HttpResponse()
response.status_code = 401
response['WWW-Authenticate'] = 'Basic realm="%s"' % realm
return response
#############################################################################
#
def logged_in_or_basicauth(realm = ""):
"""
A simple decorator that requires a user to be logged in. If they are not
logged in the request is examined for a 'authorization' header.
If the header is present it is tested for basic authentication and
the user is logged in with the provided credentials.
If the header is not present a http 401 is sent back to the
requestor to provide credentials.
The purpose of this is that in several django projects I have needed
several specific views that need to support basic authentication, yet the
web site as a whole used django's provided authentication.
The uses for this are for urls that are access programmatically such as
by rss feed readers, yet the view requires a user to be logged in. Many rss
readers support supplying the authentication credentials via http basic
auth (and they do NOT support a redirect to a form where they post a
username/password.)
Use is simple:
@logged_in_or_basicauth
def your_view:
...
You can provide the name of the realm to ask for authentication within.
"""
def view_decorator(func):
def wrapper(request, *args, **kwargs):
return view_or_basicauth(func, request,
lambda u: u.is_authenticated,
realm, *args, **kwargs)
return wrapper
return view_decorator
#############################################################################
#
def has_perm_or_basicauth(perm, realm = ""):
"""
This is similar to the above decorator 'logged_in_or_basicauth'
except that it requires the logged in user to have a specific
permission.
Use:
@logged_in_or_basicauth('asforums.view_forumcollection')
def your_view:
...
"""
def view_decorator(func):
def wrapper(request, *args, **kwargs):
return view_or_basicauth(func, request,
lambda u: u.has_perm(perm),
realm, *args, **kwargs)
return wrapper
return view_decorator
import django_rq
queue = django_rq.get_queue('default')
queuelow = django_rq.get_queue('low')

View File

@@ -4931,7 +4931,6 @@ def workout_upload_api(request):
post_data = {k: q.getlist(k) if len(
q.getlist(k)) > 1 else v for k, v in q.items()}
# only allow local host
hostt = request.get_host().split(':')
if hostt[0] not in ['localhost', '127.0.0.1', 'dev.rowsandall.com', 'rowsandall.com']: