Source code for childcount.models.Patient

#!/usr/bin/env python
# vim: ai ts=4 sts=4 et sw=4 coding=utf-8
# maintainer: ukanga

"""A model representing a single resident of the
catchment area. 
We do not document the model fields, because you
can find those yourself in the code!
"""

from datetime import date, timedelta

from django import db
from django.db import models
from django.db import connection
from django.db.models import F
from django.db.models import Count
from django.db.models.query import QuerySet
from django.utils.translation import ugettext as _
from django.forms import CharField
import reversion

from reporters.models import Reporter
from locations.models import Location

from indicator.cache import cache_simple

import childcount.models.Clinic
import childcount.models.CHW


[docs]class PatientManager(models.Manager): """Custom manager for :class:`Patient` that return our custom :class:`Patient.QuerySet` model. """ def get_query_set(self): return self.model.QuerySet(self.model)
[docs]class Patient(models.Model): """Holds the patient details, properties and methods related to it """ class Meta: app_label = 'childcount' db_table = 'cc_patient' verbose_name = _(u"Patient") verbose_name_plural = _(u"Patients") ordering = ('health_id', ) GENDER_MALE = 'M' GENDER_FEMALE = 'F' GENDER_CHOICES = ( (GENDER_MALE, _(u"Male")), (GENDER_FEMALE, _(u"Female"))) """Patient genders""" STATUS_ACTIVE = 1 STATUS_INACTIVE = 0 STATUS_DEAD = -1 STATUS_CHOICES = ( (STATUS_ACTIVE, _(u"Alive")), (STATUS_INACTIVE, _(u"Relocated")), (STATUS_DEAD, _(u"Dead"))) """Status options for Patients""" objects = PatientManager() health_id = models.CharField(_(u"Health ID"), max_length=6, blank=True, \ null=True, db_index=True, unique=True, \ help_text=_(u"Unique Health ID")) created_on = models.DateTimeField(_(u"Created on"), auto_now_add=True, \ help_text=_(u"When the patient record " \ "was created"),\ db_index=True) updated_on = models.DateTimeField(auto_now=True) first_name = models.CharField(_(u"First name"), max_length=100) last_name = models.CharField(_(u"Last name"), max_length=50, \ help_text=_(u"Family name or surname")) gender = models.CharField(_(u"Gender"), max_length=1, \ choices=GENDER_CHOICES,\ db_index=True) dob = models.DateField(_(u"Date of Birth"), null=True, blank=True,\ db_index=True) estimated_dob = models.BooleanField(_(u"Estimated DOB"), \ help_text=_(u"True or false: the " \ "date of birth is only " \ "an approximation.")) mother = models.ForeignKey('self', blank=True, null=True, \ verbose_name=_(u"Mother or Guardian."), \ related_name='child') household = models.ForeignKey('self', blank=True, null=True, \ verbose_name=_(u"Head of House"), \ help_text=_(u"The primary caregiver in " \ "this person's household " \ "(self if primary caregiver)"),\ related_name='household_member') chw = models.ForeignKey('CHW', db_index=True, verbose_name=_(u"Community Health Worker")) location = models.ForeignKey(Location, blank=True, null=True, \ related_name='resident', \ verbose_name=_(u"Location"), \ help_text=_(u"The location this person " \ "lives within")) clinic = models.ForeignKey('Clinic', blank=True, null=True, \ verbose_name=_(u"Health facility"), \ help_text=_(u"The primary health facility " \ "that this patient visits")) mobile = models.CharField(_(u"Mobile phone number"), max_length=16, \ blank=True, null=True) status = models.SmallIntegerField(_(u"Status"), choices=STATUS_CHOICES, \ default=STATUS_ACTIVE,\ db_index=True) hiv_status = models.NullBooleanField(_(u"HIV Status"), blank=True, null=True) hiv_exposed = models.NullBooleanField(_(u"HIV Exposed?"), blank=True, null=True) latitude = models.DecimalField(max_digits=8, decimal_places=6, \ blank=True, null=True, \ help_text=_("The physical latitude of this person's household")) longitude = models.DecimalField(max_digits=8, decimal_places=6, \ blank=True, null=True, \ help_text=_("The physical longitude of this person's household")) elevation = models.DecimalField(max_digits=8, decimal_places=2, \ blank=True, null=True, \ help_text=_("The physical elevation (meters) of this "\ "person's household"))
[docs] def is_head_of_household(self): """Check if patient is the head his/her own household""" return self.household == self
def get_dictionary(self): days, months = self.age_in_days_months() return {'full_name': '%s %s' % (self.first_name, self.last_name), 'age': '%sm' % months, 'days': days, 'clinic': self.clinic, 'mobile': self.mobile, 'status': self.STATUS_CHOICES.index(self.status), 'chw': self.chw, 'gender': self.gender, 'guardian': self.guardian}
[docs] def age_in_days_weeks_months(self, relative_to=date.today()): """return the age of the patient in days and in months :param relative_to: Date from which to compute the patient's age :type relative_to: :class:`datetime.date` """ days = (relative_to - self.dob).days weeks = days / 7 months = int(days / 30.4375) return days, weeks, months
[docs] def years(self): """Calculate patient's age in years""" days, weeks, months = self.age_in_days_weeks_months() return months / 12
[docs] def humanised_age(self): """return a string containing a human readable age""" days, weeks, months = self.age_in_days_weeks_months() if days < 21: return _(u"%(days)sd") % {'days': days} elif weeks < 12: return _(u"%(weeks)sw") % {'weeks': weeks} elif months < 60: return _(u"%(months)sm") % {'months': months} else: years = months / 12 return _(u"%(years)sy") % {'years': years}
[docs] def full_name(self): """Return the patients first and last names""" return ' '.join([self.first_name, self.last_name])
def __unicode__(self): return u'%s %s %s/%s' % (self.health_id.upper(), self.full_name(), \ self.gender, self.humanised_age()) @classmethod
[docs] def is_valid_health_id(cls, health_id): """Naive check if a health ID is valid :param health_id: Health ID to check :type health_id: str :returns: bool """ MIN_LENGTH = 4 MAX_LENGTH = 4 BASE_CHARACTERS = '0123456789acdefghjklmnprtuvwxy' try: health_id = unicode(health_id) health_id = health_id.lower() except: return False if len(health_id) < MIN_LENGTH or len(health_id) > MAX_LENGTH: return False for char in health_id: if char not in BASE_CHARACTERS: return False # TODO checkbit return True
@classmethod
[docs] def registrations_by_date(cls): """Number of patients registered per day :returns: Iterable of (`datetime.date`, n_registrations) tuples """ conn = connection.cursor() by_date = conn.execute( 'SELECT DATE(`created_on`), COUNT(*) FROM `cc_patient` \ GROUP BY DATE(`created_on`) ORDER BY DATE(`created_on`) ASC;') # Data comes back in an iterable of (date, count) tuples raw_data = conn.fetchall() dates = [] counts = [] agg = 0 for pair in raw_data: dates.append((pair[0] - date.today()).days) agg += pair[1] counts.append(agg) return (dates, counts)
@classmethod def table_columns(cls): columns = [] columns.append( {'name': cls._meta.get_field('household').verbose_name, \ 'bit': '{{object.household.health_id}}'}) columns.append( {'name': cls._meta.get_field('health_id').verbose_name, \ 'bit': '{{object.health_id}}'}) columns.append( {'name': _("Name"), \ 'bit': '{{object.last_name}} {{object.first_name}}'}) columns.append( {'name': cls._meta.get_field('gender').verbose_name, \ 'bit': '{{object.gender}}'}) columns.append( {'name': _(u"Age"), \ 'bit': '{{object.humanised_age}}'}) columns.append( {'name': cls._meta.get_field('chw').verbose_name, \ 'bit': '{{object.chw}}'}) sub_columns = None return columns, sub_columns # # BEGIN Indicator Code # objects = PatientManager()
[docs] class QuerySet(QuerySet): """This QuerySet extends the default django QuerySet with useful filters. .. hint:: All of these filters take standard `start` and `end` arguments for consistency's sake. These parameters are defined as: - `start`: start date of time period (inclusive) -- :class:`datetime.datetime` - `end`: end date of time period (inclusive) -- :class:`datetime.datetime` """
[docs] def created_before(self, cutoff): """Patients who have an encounter before the specified date. The :meth:`Patient.created_on` field does not work, since that just indicates when the DB row was created -- not the encounter date when the patient was created. :param cutoff: Method returns all patients with encounters on or before this date. :type cutoff: :class:`datetime.datetime` """ pks = self\ .filter(encounter__encounter_date__lte=cutoff)\ .values('pk')\ .distinct() return self\ .filter(pk__in=pks)
[docs] def alive(self, start, end): """Patients who were alive at `end`. """ return self\ .exclude(status=Patient.STATUS_INACTIVE)\ .exclude(encounter__ccreport__deathreport__death_date__lte=end) # # Age-related filters #
[docs] def neonatal(self, start, end): """0 days <= age < 28 days""" return self.age(start, end, 0, 28)
[docs] def under_six_months(self, start, end): """0 days <= age < 180 days""" return self.age(start, end, 0, 30*6)
[docs] def under_nine_months(self, start, end): """0 days <= age < 9 months (270 days)""" return self.age(start, end, 0, 30*9)
[docs] def muac_eligible(self, start, end): """180 days <= age < 5 years (5*365 days)""" return self.age(start, end, 6*30, 5*365)
[docs] def under_one(self, start, end): """0 days <= age < 365 days""" return self.age(start, end, 0, 365)
[docs] def under_five(self, start, end): """0 days <= age < 5 years (5*365 days)""" return self.age(start, end, 0, 5*365)
[docs] def under_nine(self, start, end): """0 days <= age < 9 years (9*365 days)""" return self.age(start, end, 0, 9*365)
[docs] def over_five(self, start, end): """5 years (5*365 days) < age""" return self.age(start, end, 5*365, None)
[docs] def age(self, start, end, min_days, max_days): """Living patients whose age (in days) was `min_days` <= `patient__age` < `max_days` at the datetime `end` """ filter_on = {} if min_days: filter_on['dob__lte'] = end-timedelta(days=min_days) if max_days: filter_on['dob__gt'] = end-timedelta(days=max_days) # We filter on dob__lte=end because we never want to # include people who were not born at date end return self\ .alive(start, end)\ .filter(**filter_on)\ .filter(dob__lte=end)
[docs] def household(self, start, end): """Living people who are/were household heads at `end`""" return self\ .alive(start, end)\ .filter(pk=F('household__pk')) # # Pregnancy filters #
[docs] def pregnant(self, start, end): """Patients who were between 0 and 9 months pregnant at date `end` """ return self.pregnant_months(start, end, 0.0, 9.0, False, False)
@cache_simple def pregnant_data(self): """Collect all of the pregnancy reports into a single data structure. We can cache this data structure and use it for running various queries about pregnant women. """ data = {} # Get all of the potentially pregant women women_pks = Patient\ .objects\ .filter(gender=Patient.GENDER_FEMALE, encounter__ccreport__pregnancyreport__pregnancy_month__isnull=False) # Remove duplicate women women = Patient\ .objects\ .filter(pk__in=women_pks)\ .order_by('pk') for p in women: edate = date(2050,01,01) data[p.pk] = [] # Look for all pregnancies b/c a woman can # be pregnant many times (duh) while True: # Check if there is a PregnancyReport before edate reps = p\ .encounter_set\ .filter(ccreport__pregnancyreport__pregnancy_month__isnull=False,\ encounter_date__lte=edate) if not reps.count(): break # If so, record the estimated conception date try: pr = reps\ .latest('ccreport__pregnancyreport__encounter__encounter_date')\ .ccreport_set\ .filter(polymorphic_ctype__model='pregnancyreport')[0] except IndexError: pr = reps\ .latest('ccreport__pregnancyreport__encounter__encounter_date')\ .ccreport_set\ .filter(polymorphic_ctype__model='spregnancy')[0] days_preg = timedelta(30.4375 * pr.pregnancy_month) row = {} row["start_date"] = pr.encounter.encounter_date - days_preg # Look for a pregnancy starting at least 6 months before # the estimated conception date edate = pr.encounter.encounter_date - days_preg - timedelta(30*6) # Set date range for births and miscarriages drange = (row['start_date'], row['start_date'] + timedelta(10*30.4375)) # Look for births birth = Patient\ .objects\ .filter(dob__range=drange, mother__pk=p.pk) if birth.count() > 0: row['birth_date'] = birth[0].dob data[p.pk].append(row) continue # Look for stillbirths/miscarriages sbm = Patient\ .objects\ .get(pk=p.pk)\ .encounter_set\ .filter(ccreport__stillbirthmiscarriagereport__incident_date__range=drange) if sbm.count() > 0: row['sbm_date'] = sbm[0].encounter_date data[p.pk].append(row) continue # If there's no birth or stillbirth/miscarriage report, then # just guess the end date row['end_date'] = row['start_date'] + timedelta(9*30.4375) data[p.pk].append(row) print data return data
[docs] def pregnant_months(self, start, end, start_month, end_month, include_delivered, include_stillbirth): """Uses the cached pregnant_data() call to return women who are between `start_month` and `end_month` months pregnant (inclusive). :param start_month: Include women who are at least `start_month` months pregant :param end_month: Include women who are at most `end_month` months pregant :type start_month: float :type end_month: float :param include_delivered: Include women who have delivered their babies by time `end`? :type include_delivered: bool :param include_stillbirth: Include women who have had a stillbirth or miscarriage before the time `end`? :type include_stillbirth: bool """ data = self.pregnant_data() pks = set() for pk in data: pregs = data[pk] for preg in pregs: days_preg = (end - preg['start_date']).days current_month = days_preg/30.4375 #print "Considering patient [%d]" % pk if not (current_month >= start_month and current_month <= end_month): # print 'not in right month... %f' % current_month continue if not include_delivered: if current_month > 9.5: continue if ('birth_date' in preg) and (preg['birth_date'] <= end.date()): continue if not include_stillbirth and \ ('sbm_date' in preg) and (preg['sbm_date'] <= end): continue #print "In month %f patient %d" % (current_month, pk) pks.add(pk) #print pks return self.filter(pk__in=pks)
""" def pregnant_months2(self, start, end, start_month, end_month, include_delivered, include_stillbirth): assert start_month >= 0, _("Start month must be >= 0") assert start_month < end_month, \ _("Start month must be < end_month") assert isinstance(start_month, float), _("Start month must be float") assert isinstance(end_month, float), _("End month must be float") pregs = self\ .filter(gender=Patient.GENDER_FEMALE, encounter__ccreport__pregnancyreport__pregnancy_month__isnull=False,\ encounter__encounter_date__lte=end)\ .values('pk')\ .distinct() pks = set() patients = Patient\ .objects\ .filter(pk__in=[item['pk'] for item in pregs])\ .iterator() for p in patients: pr = p\ .encounter_set\ .filter(ccreport__pregnancyreport__pregnancy_month__isnull=False,\ encounter_date__lte=end+timedelta(30.4375*end_month))\ .latest('ccreport__pregnancyreport__encounter__encounter_date')\ .ccreport_set\ .filter(polymorphic_ctype__model='pregnancyreport')[0] days_since = (end - pr.encounter.encounter_date).days months_since = days_since/30.4375 current_month = months_since + pr.pregnancy_month print "Considering patient [%s]" % p.health_id.upper() if not (current_month >= start_month and current_month <= end_month): print 'not in right month... %f' % current_month continue # If we are not including women who have already delivered, # check to see that the lady has not yet delivered if not include_delivered: # Skip women who are more than 9.5 months pregnant # b/c we assume that they have delivered if current_month >= 9.5: continue b = p\ .child\ .filter(dob__lte=end, dob__gte=(pr.encounter.encounter_date -\ timedelta(pr.pregnancy_month*30.4375))) # If the birthreport has been found, then she's no longer # pregnant if b.count() > 0: print 'birth at on %s' % b[0].dob continue if not include_stillbirth: # Look for a stillbirth/miscarriage sbm = p\ .encounter_set\ .filter(encounter_date__lte=end, ccreport__stillbirthmiscarriagereport__incident_date__gte=\ pr.encounter.encounter_date-timedelta(pr.pregnancy_month*30.4375),\ ccreport__stillbirthmiscarriagereport__incident_date__lte=end) # If there was a stillbirth/misscariage, then she's # no longer pregnant if sbm.count() > 0: print 'stillbirth' continue print '*PR submitted now %f (was %f on %s #%d)' % \ (current_month, pr.pregnancy_month, \ pr.encounter.encounter_date, pr.pk) pks.add(p.pk) #print pks db.reset_queries() return self.filter(pk__in=pks) """
[docs] def pregnant_recently(self, start, end): """Pregnant or within 42 days of delivery""" return self.pregnant_months(start, end, 0.0, 10.4, True, False)
[docs] def over_five_not_pregnant_recently(self, start, end): """More than five years of age and not in :meth:`.pregnant_recently`""" pr = self.pregnant_recently(start, end) return self\ .over_five(start, end)\ .exclude(pk__in=[p.pk for p in pr])
[docs] def pregnant_during_interval(self, start, end): """Women who were pregnant at some point between `start` and `end`""" ilen = (end - start).days imon = ilen/30.4375 return self.pregnant_months(start, end, 0.0, 9.0 + imon, True, True)
[docs] def delivered(self, start, end): """Women who delivered between `start` and `end`""" return self.pregnant_months(start, end, 9.0, 10.0, True, False)
def pk_list(self): return [p[0] for p in self.order_by('pk').values_list('pk')] def to_list(self): out = [] out.append(str(type(self))) out.append(str(self.model)) out += [str(p) for p in self.pk_list()] return out
reversion.register(Patient)