Map Reduce
Djangae contains functionality to allow Google's mapreduce library to run over instances of Django models.
Instructions:
- Add the mapreduce library to your project, e.g.
$ pip install GoogleAppEngineMapReduce
or from https://github.com/GoogleCloudPlatform/appengine-mapreduce. - Ensure that you have included
djangae.urls
in your URL config. - Add
'djangae.contrib.processing.mapreduce'
toINSTALLED_APPS
in your Django settings. - Subclass
djangae.contrib.mappers.pipes.MapReduceTask
, and define yourmap
method. - Your class can also override attributes such as
shard_count
,job_name
, andqueue_name
(see the code for all options). - The model to map over can either be defined as a attribute on the class or can be passed in when you instantiate it.
- In your code, call
YourMapreduceClass().start()
to trigger the mapping of themap
method over all of the instances of your model. Optionally you can use thequeue_name
keyword argument to specify a task queue that will be used (don't forget to define the queue in queue.yaml). - You can optionally pass any additional args and/or kwargs to the
.start()
method, which will then be passed to each call of the.map()
method for you.
Note that currently only the 'map' stage is implemented. There is currently no reduce stage, but you could contribute it :-).
Helpful functions
djangae.contrib.mappers.defer_iteration
This function takes a queryset
and a callback
, and also optionally a shard_size
and a _queue
. It
defers background tasks to iterate over the entire queryset on the specified task queue calling your
callback function on each Django model instance in the queryset.
- The shard size is the number of instance which it will attempt to process in a single task, and defaults to 500.
- Depending on how long your callback function is likely to take on each instance, you should reduce the shard size in order that it can safely process that many instances in App Engine's task time limit of 10 minutes.
- There is no exception handling around the callback function, so an exception is raised when processing one of the instances then that task will fail and be retried, thus re-processing any prior instances in that same shard.
- Your callback function should be idempotent.