fixing zip file processing
This commit is contained in:
@@ -255,7 +255,7 @@ class Command(BaseCommand):
|
|||||||
except Message.DoesNotExist:
|
except Message.DoesNotExist:
|
||||||
attachment.delete()
|
attachment.delete()
|
||||||
for rower in rowers:
|
for rower in rowers:
|
||||||
if extension == 'zip':
|
if 'zip' in extension:
|
||||||
try:
|
try:
|
||||||
zip_file = zipfile.ZipFile(attachment.document)
|
zip_file = zipfile.ZipFile(attachment.document)
|
||||||
for id,filename in enumerate(zip_file.namelist()):
|
for id,filename in enumerate(zip_file.namelist()):
|
||||||
|
|||||||
@@ -66,6 +66,67 @@ workout run
|
|||||||
w = ws[0]
|
w = ws[0]
|
||||||
self.assertEqual(w.workouttype,'Run')
|
self.assertEqual(w.workouttype,'Run')
|
||||||
|
|
||||||
|
@override_settings(TESTING=True)
|
||||||
|
class ZipEmailUpload(TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
redis_connection.publish('tasks','KILL')
|
||||||
|
u = User.objects.create_user('john',
|
||||||
|
'sander@ds.ds',
|
||||||
|
'koeinsloot')
|
||||||
|
r = Rower.objects.create(user=u,gdproptin=True,
|
||||||
|
gdproptindate=timezone.now()
|
||||||
|
)
|
||||||
|
|
||||||
|
self.theadmin = UserFactory(is_staff=True)
|
||||||
|
self.rtheadmin = Rower.objects.create(user=self.theadmin,
|
||||||
|
birthdate = faker.profile()['birthdate'],
|
||||||
|
gdproptin=True,
|
||||||
|
gdproptindate=timezone.now(),
|
||||||
|
rowerplan='coach')
|
||||||
|
|
||||||
|
nu = datetime.datetime.now()
|
||||||
|
workoutsbox = Mailbox.objects.create(name='workouts1')
|
||||||
|
workoutsbox.save()
|
||||||
|
failbox = Mailbox.objects.create(name='Failed')
|
||||||
|
failbox.save()
|
||||||
|
|
||||||
|
m = Message(mailbox=workoutsbox,
|
||||||
|
from_header = u.email,
|
||||||
|
subject = "Sprint",
|
||||||
|
body = """
|
||||||
|
workout water
|
||||||
|
""")
|
||||||
|
m.save()
|
||||||
|
a2 = 'media/mailbox_attachments/zipfile.zip'
|
||||||
|
copy('rowers/tests/testdata/zipfile.zip',a2)
|
||||||
|
a = MessageAttachment(message=m,document=a2[6:])
|
||||||
|
a.save()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
for filename in os.listdir('media/mailbox_attachments'):
|
||||||
|
path = os.path.join('media/mailbox_attachments/',filename)
|
||||||
|
if not os.path.isdir(path):
|
||||||
|
try:
|
||||||
|
os.remove(path)
|
||||||
|
except (IOError,FileNotFoundError,OSError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@patch('rowers.dataprep.create_engine')
|
||||||
|
@patch('rowers.polarstuff.get_polar_notifications')
|
||||||
|
@patch('rowers.c2stuff.requests.get', side_effect=mocked_requests)
|
||||||
|
@patch('rowers.c2stuff.requests.post', side_effect=mocked_requests)
|
||||||
|
def test_emailupload(
|
||||||
|
self, mocked_sqlalchemy,mocked_polar_notifications, mock_get, mock_post):
|
||||||
|
out = StringIO()
|
||||||
|
call_command('processemail', stdout=out,testing=True,mailbox='workouts1')
|
||||||
|
self.assertIn('Successfully processed email attachments',out.getvalue())
|
||||||
|
|
||||||
|
ws = Workout.objects.all()
|
||||||
|
|
||||||
|
self.assertEqual(len(ws),5)
|
||||||
|
w = ws[4]
|
||||||
|
self.assertEqual(w.name,'Sprint (5)')
|
||||||
|
|
||||||
|
|
||||||
@override_settings(TESTING=True)
|
@override_settings(TESTING=True)
|
||||||
class EmailBikeErgUpload(TestCase):
|
class EmailBikeErgUpload(TestCase):
|
||||||
|
|||||||
BIN
rowers/tests/testdata/testdata.csv.gz
vendored
BIN
rowers/tests/testdata/testdata.csv.gz
vendored
Binary file not shown.
BIN
rowers/tests/testdata/zipfile.zip
vendored
BIN
rowers/tests/testdata/zipfile.zip
vendored
Binary file not shown.
Reference in New Issue
Block a user