start new data flow
This commit is contained in:
149
rowers/dataflow.py
Normal file
149
rowers/dataflow.py
Normal file
@@ -0,0 +1,149 @@
|
||||
from rowers.celery import app
|
||||
from rowers.utils import myqueue
|
||||
import zipfile
|
||||
import os
|
||||
from rowingdata import get_file_type
|
||||
import django_rq
|
||||
|
||||
# Queue handles used when handing work to the background workers.
# NOTE(review): queuehigh is bound to the 'default' queue, exactly like
# queue -- confirm that this is intended.
queue, queuelow, queuehigh = (
    django_rq.get_queue(queue_name)
    for queue_name in ('default', 'low', 'default')
)
|
||||
|
||||
from django.conf import settings
|
||||
from rowers.forms import DocumentsForm, TeamUploadOptionsForm
|
||||
from rowers.models import TeamInviteForm, Workout
|
||||
from uuid import uuid4
|
||||
|
||||
def generate_job_id():
    """Return a fresh job identifier.

    The identifier is the string form of a newly generated ``uuid4`` value.
    """
    job_uuid = uuid4()
    return str(job_uuid)
|
||||
|
||||
def valid_uploadoptions(uploadoptions):
    """Check whether ``uploadoptions`` describes an acceptable upload.

    The options are accepted when all of the following hold:

    * the ``secret`` entry equals ``settings.UPLOAD_SERVICE_SECRET``;
    * the ``file`` entry is present and names an existing file on disk;
    * ``DocumentsForm``, ``TeamUploadOptionsForm`` and ``TeamInviteForm``
      (with its ``email`` field removed) all validate against the options.

    Returns True when every check passes, False otherwise.
    """
    if uploadoptions.get('secret', '') != settings.UPLOAD_SERVICE_SECRET:
        return False

    upload_path = uploadoptions.get('file', None)
    # the file has to be named and has to exist on disk
    if upload_path is None or not os.path.isfile(upload_path):
        return False

    documents_form = DocumentsForm(uploadoptions)
    options_form = TeamUploadOptionsForm(uploadoptions)
    rower_form = TeamInviteForm(uploadoptions)
    rower_form.fields.pop('email')  # we don't need email here

    # The generator keeps the left-to-right short-circuit: a later form is
    # only validated when every earlier one passed.
    return all(
        candidate.is_valid()
        for candidate in (documents_form, options_form, rower_form)
    )
|
||||
|
||||
def is_zipfile(file_path):
    """Tell whether ``get_file_type`` reports ``file_path`` as a zip archive.

    Returns True when the first element of the detected file type equals
    ``'zip'``.
    """
    # NOTE(review): the indexing assumes get_file_type returns a sequence
    # whose first item is 'zip' for archives -- confirm against rowingdata.
    leading_item = get_file_type(file_path)[0]
    return leading_item == 'zip'
|
||||
|
||||
# Detected file types that are rejected outright, mapped to the message
# reported back to the uploader.
_REJECTED_FILE_TYPES = {
    "imageformat": "Image files are not supported for upload.",
    "json": "JSON files are not supported for upload.",
    "c2log": "Concept2 log files are not supported for upload.",
    "nostrokes": "No stroke data found in the file.",
    "kml": "KML files are not supported for upload.",
    "notgzip": "The gzip file appears to be corrupted.",
    "rowprolog": "RowPro logbook summary files are not supported for upload.",
    "gpx": "GPX files are not supported for upload.",
}


def is_invalid_file(file_path, notify_email=None):
    """Classify ``file_path`` and report whether it may be uploaded.

    Despite the name, the first element of the result is True when the file
    is acceptable.

    Args:
        file_path: path of the uploaded file, passed to ``get_file_type``.
        notify_email: optional address. When given and the file type is
            "unknown", a copy of the file is made and an "unrecognized
            file" notification job is queued for that address. When omitted
            no copy is made and nothing is queued.

    Returns:
        A ``(is_valid, message)`` tuple: ``(True, "")`` for an acceptable
        file, otherwise ``(False, reason)``.
    """
    fileformat = get_file_type(file_path)

    # Only plain string results can match the rejected types.
    if isinstance(fileformat, str):
        rejection = _REJECTED_FILE_TYPES.get(fileformat)
        if rejection is not None:
            return False, rejection

    if fileformat == "unknown":
        if notify_email is not None:
            from shutil import copyfile

            # Build "<root>a<extension>" next to the original. For gzipped
            # files the inner extension is kept as well, so "x.csv.gz"
            # becomes "xa.csv.gz".
            root, extension = os.path.splitext(file_path)
            if extension == '.gz':
                root, inner_extension = os.path.splitext(root)
                extension = inner_extension + extension
            copy_path = root + 'a' + extension
            copyfile(file_path, copy_path)

            # NOTE(review): handle_sendemail_unrecognized is not imported in
            # this module; it has to be brought into scope (presumably from
            # the project's task module -- confirm) before this runs.
            _ = myqueue(queuehigh,
                        handle_sendemail_unrecognized,
                        copy_path,
                        notify_email)
        return False, "The file format is not recognized or supported."

    return True, ""
|
||||
|
||||
|
||||
def upload_handler(filename, uploadoptions):
    """Validate an upload and queue the job that will process it.

    The upload options are checked with ``valid_uploadoptions`` and the file
    with ``is_invalid_file``. If either check fails an error response is
    returned and nothing is queued. Otherwise a zip archive is handed to
    ``unzip_and_process`` and any other file to ``process_single_file``,
    both on ``queuehigh``.

    Args:
        filename: path of the uploaded file.
        uploadoptions: mapping with the upload options; it is forwarded
            unchanged to the queued job.

    Returns:
        A dict with the keys ``status`` ("error" or "processing"),
        ``job_id`` (None on error) and ``message``.
    """
    if not valid_uploadoptions(uploadoptions):
        return {
            "status": "error",
            "job_id": None,
            "message": "Invalid upload options or file not found."
        }

    is_valid, message = is_invalid_file(filename)
    if not is_valid:
        return {
            "status": "error",
            "job_id": None,
            "message": message
        }

    # The archive check must look at the uploaded file itself.
    if is_zipfile(filename):
        task = unzip_and_process
        message = "Your zip file is being processed. You will be notified when it is complete."
    else:
        task = process_single_file
        message = "Your file is being processed. You will be notified when it is complete."

    job_id = generate_job_id()
    # myqueue is called directly with the queue as its first argument, the
    # same way the unrecognized-file notification in this module calls it.
    _ = myqueue(queuehigh, task, filename, uploadoptions, job_id)

    return {
        "status": "processing",
        "job_id": job_id,
        "message": message
    }
|
||||
|
||||
@app.task
def unzip_and_process(zip_filepath, uploadoptions, parent_job_id):
    """Extract every file of a zip archive and queue one job per file.

    Each archive member that is not a directory is extracted below
    ``media/`` and handed to ``process_single_file`` on ``queuehigh`` with
    its own job id and its own copy of the upload options. In that copy
    ``file`` points at the extracted file. The first file keeps the original
    title; every later one gets " Part <n>" appended to the original title,
    with n starting at 2.

    Args:
        zip_filepath: path of the zip archive.
        uploadoptions: mapping with the upload options; it is not modified.
        parent_job_id: job id reported back in the result.

    Returns:
        A dict with ``status`` "completed", the ``parent_job_id`` and a
        message.
    """
    # Read the title once so the suffixes do not pile up from file to file.
    base_title = uploadoptions.get('title') or ''

    with zipfile.ZipFile(zip_filepath, 'r') as zip_ref:
        # Directory entries carry no data of their own, so they are skipped.
        members = [info for info in zip_ref.infolist() if not info.is_dir()]
        for index, member in enumerate(members):
            datafile = zip_ref.extract(member, path='media/')

            # Give each job its own options so later iterations cannot
            # change what an earlier job was queued with.
            file_options = dict(uploadoptions)
            if index > 0:
                file_options['title'] = base_title + " Part {part}".format(part=index + 1)
            file_options['file'] = datafile

            job_id = generate_job_id()
            # myqueue is called directly with the queue as first argument.
            _ = myqueue(
                queuehigh,
                process_single_file,
                datafile,
                file_options,
                job_id)

    return {
        "status": "completed",
        "job_id": parent_job_id,
        "message": "All files from the zip have been processed."
    }
|
||||
|
||||
|
||||
@app.task
def process_single_file(file_path, uploadoptions, job_id):
    """Placeholder task for handling one uploaded file.

    No processing is implemented yet: the arguments are accepted and
    ignored, and the task always returns True.
    """
    # placeholder
    return True
|
||||
|
||||
# process data to create df
|
||||
|
||||
#
|
||||
|
||||
Reference in New Issue
Block a user