Location : South Delhi, India
Objective : Data collection, wrangling, conversion to CSV, insertion into a database, and analysis
#Python Library import statements
import xml.etree.cElementTree as ET
import pprint
import re
from collections import defaultdict
import csv
import cerberus
import schema
import codecs
Let's check how many different tags there are in the dataset.
#what tags are there and how many
file_name = 'ex_ciiyEL8cBe67FrpUrFy4RJdN59ihC.osm' #
def count_tags(filename):
tags = {}
for _,elem in ET.iterparse(filename):
if elem.tag in tags:
tags[elem.tag] +=1
else:
tags[elem.tag] =1
return tags
def test():
tags = count_tags('ex_ciiyEL8cBe67FrpUrFy4RJdN59ihC.osm')
pprint.pprint(tags)
if __name__ == "__main__":
test()
lower = re.compile(r'^([a-z]|_)*$')
lower_colon = re.compile(r'^([a-z]|_)*:([a-z]|_)*$')
problemchars = re.compile(r'[=\+/&<>;\'" \?%#$@\,\.\t\r\n]')
def key_type(element, keys):
if element.tag == "tag":
for tag in element.iter("tag"):
if lower.search(tag.attrib['k']):
keys['lower'] +=1
elif lower_colon.search(tag.attrib['k']):
keys['lower_colon'] += 1
elif problemchars.search(tag.attrib['k']):
keys["problemchars"] += 1
else :
keys["other"] += 1
return keys
def process_map(filename):
keys = {"lower": 0, "lower_colon": 0, "problemchars": 0, "other": 0}
for _, element in ET.iterparse(filename):
keys = key_type(element, keys)
return keys
keys = process_map(file_name)
pprint.pprint(keys)
We have written two functions, key_type() and process_map(), which segregate the tag keys using regex matching. The keys are separated into four groups: lower, lower_colon, problemchars, and other.
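For instance, a quick hypothetical check (the sample keys below are illustrative, not necessarily present in this extract) shows which bucket each kind of key falls into:
#Hypothetical sample keys, purely to illustrate the regex classification
sample_keys = ['building', 'addr:street', 'name:hi', 'odd key!', 'FIXME']
for k in sample_keys:
    if lower.search(k):
        print(k + " -> lower")
    elif lower_colon.search(k):
        print(k + " -> lower_colon")
    elif problemchars.search(k):
        print(k + " -> problemchars")
    else:
        print(k + " -> other")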
street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)
expected = ["Avenue", "Commons", "Court", "Drive", "Lane", "Parkway",
"Place", "Road", "Square", "Street", "Trail"]
#Corrections for inconsistent street-name abbreviations, e.g. a value ending in 'Ln' should be converted to 'Lane'. This reduces inconsistency in the data.
mapping = {'Ave' : 'Avenue',
'Dr' : 'Drive',
'Ln' : 'Lane',
'Pkwy' : 'Parkway',
'Rd' : 'Road',
'Rd.' : 'Road',
'St' : 'Street',
'street' :"Street",
'Cir' : "Circle",
'ave' : 'Avenue',
'Hwg' : 'Highway',
'Hwy' : 'Highway',
'Sq' : "Square",
'NO.' : "No.",
'Sec' : "Sector",
'nankpura' : "Nanakpura",
'Mg' : "Marg",
'Mr.' : "Marg",
"Have Khas" : "Hauzkhas",
"Hauz Khas" : "Hauzkhas",
}
def audit_street_type(street_types, street_name):
m = street_type_re.search(street_name)
if m:
street_type = m.group()
if street_type not in expected:
street_types[street_type].add(street_name)
def is_street_name(elem):
return (elem.attrib['k'] == "addr:street") #Function to check value to be a street name
def audit(osmfile):
osm_file = open(osmfile, "r")
street_types = defaultdict(set)
for event, elem in ET.iterparse(osm_file, events=("start",)):
if elem.tag == "node" or elem.tag == "way":
for tag in elem.iter("tag"):
if is_street_name(tag):
audit_street_type(street_types, tag.attrib['v'])
return street_types
sort_street_types = audit(file_name)
print 'Done'
audit_street_type() This function searches the input string for the street-type regex. If there is a match and it is not in the "expected" list, the match is added as a key and the full street name is added to that key's set.
is_street_name() This function checks whether the tag's key is "addr:street", i.e. a street-name record.
audit() This returns the dictionary built with the previous two functions. Using it we can understand and correct our street names.
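As a small hypothetical demonstration (the street names below are made up):
#Hypothetical demonstration of audit_street_type(); street names are made up
demo_types = defaultdict(set)
for demo_name in ["Outer Ring Rd", "Lodhi Road", "Africa Avenue"]:
    audit_street_type(demo_types, demo_name)
print(dict(demo_types))  #only 'Rd' is unexpected: {'Rd': set(['Outer Ring Rd'])}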
pprint.pprint(dict(sort_street_types))
#Function to update values with the mapping dictionary
def update_name(name, mapping, regex):
m = regex.search(name)
if m:
street_type = m.group()
if street_type in mapping:
name = re.sub(regex, mapping[street_type], name)
return name
return name
for street_type, ways in sort_street_types.iteritems():
for name in ways:
better_name = update_name(name, mapping, street_type_re)
if better_name:
print name, "=>", better_name
Using update_name() we have updated the street names with the correct values.
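For example (hypothetical names, purely for illustration):
#Hypothetical examples of the substitution performed by update_name()
print(update_name("Outer Ring Rd", mapping, street_type_re))        # -> Outer Ring Road
print(update_name("Nelson Mandela Marg", mapping, street_type_re))  # unchanged, 'Marg' is not in mapping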
def audit_zipcode(invalid_zipcodes, zipcode):
twoDigits = zipcode[0:4] #despite the name, this holds the first four characters of the postcode
if len(zipcode) != 6: #if zipcode is not of length =6, add it to invalid_zipcode dictionary
invalid_zipcodes[twoDigits].add(zipcode)
elif not twoDigits.isdigit(): #if zipcode is not made of just digits, add it to invalid_zipcode dictionary
invalid_zipcodes[twoDigits].add(zipcode)
elif twoDigits != '1100': #if zipcode does not start with 1100, add it to invalid_zipcode dictionary
invalid_zipcodes[twoDigits].add(zipcode)
def is_zipcode(elem):
return (elem.attrib['k'] == "addr:postcode")
def audit_zip(osmfile):
osm_file = open(osmfile, "r")
invalid_zipcodes = defaultdict(set)
for event, elem in ET.iterparse(osm_file, events=("start",)):
if elem.tag == "node" or elem.tag == "way":
for tag in elem.iter("tag"):
if is_zipcode(tag):
audit_zipcode(invalid_zipcodes,tag.attrib['v'])
return invalid_zipcodes
sort_zipcode = audit_zip(file_name)
pprint.pprint(dict(sort_zipcode))
audit_zipcode() This function checks the input zipcode against several conditions. If the zipcode does not satisfy them, its four-character prefix is added as a key and the zipcode is added to that key's set.
is_zipcode() This function checks whether the tag's key is "addr:postcode".
audit_zip() This returns the dictionary built with the previous two functions. Using it we can understand and correct our zipcodes.
update_zip() This function updates zipcode values with corrected forms; if a zipcode cannot be repaired it replaces the value with 'None'.
def update_zip(zipcode):
    if zipcode[0:4] == '110 ':                         #stray space, e.g. '110 016' -> '110016'
        zipcode = zipcode.replace(" ", "")
        return zipcode
    elif zipcode[0:4] != '1100':                       #not a Delhi '1100xx' code, cannot be repaired
        zipcode = 'None'
        return zipcode
    elif zipcode[0:4] == '1100' and len(zipcode) > 6:  #extra digit, e.g. '1100016' -> '110016'
        zipcode = zipcode.replace("11000", "1100")
        return zipcode
    return zipcode                                     #already a valid six-digit code, keep it unchanged
for street_type, ways in sort_zipcode.iteritems():
    for name in ways:
        better_name = update_zip(name)
        if better_name:
            print name, "=>", better_name
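To make the cleaning rules concrete, here are a few hypothetical postcodes and what update_zip() returns for them:
#Hypothetical postcodes, purely to illustrate the update_zip() rules
print(update_zip("110 016"))   #space removed            -> 110016
print(update_zip("1100016"))   #extra digit collapsed    -> 110016
print(update_zip("201301"))    #outside the 1100xx range -> 'None'
print(update_zip("110021"))    #already valid, unchanged -> 110021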
In the following code snippet we will clean the file and separate the data: node elements are saved to nodes.csv, their tags to nodes_tags.csv, way elements to ways.csv, each way's node references to ways_nodes.csv, and the tags under way elements to ways_tags.csv.
tags_clean() sets the id, key, value and type for each tag; the postcode and street-name corrections from the previous sections are applied here before the values are added to the dict.
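As a rough sketch of the target structure (the field values below are illustrative, not taken from the dataset), shape_element() turns a node element into a dict like this before it is written to CSV; ways are shaped the same way with 'way', 'way_nodes' and 'way_tags' keys:
#Illustrative shape of the dict produced for a node element (values are made up)
example_shaped_node = {
    'node': {'id': 123456, 'lat': 28.55, 'lon': 77.20, 'user': 'someuser',
             'uid': 42, 'version': '2', 'changeset': 987654,
             'timestamp': '2017-01-01T00:00:00Z'},
    'node_tags': [{'id': 123456, 'key': 'street', 'value': 'Africa Avenue', 'type': 'addr'}]
}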
OSM_PATH = file_name
NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "nodes_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "ways_nodes.csv"
WAY_TAGS_PATH = "ways_tags.csv"
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']
SCHEMA = schema.schema
LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')
def tags_clean(id , tag ):
node_tagss = {}
node_tagss['id'] = int(id)
if tag.attrib['k'] == "addr:street":
node_tagss['value'] = update_name(tag.attrib['v'], mapping, street_type_re)
elif tag.attrib['k'] == "addr:postcode":
node_tagss['value'] = update_zip(tag.attrib['v'])
else:
node_tagss['value'] = tag.attrib['v']
if ":" not in tag.attrib['k']:
node_tagss['key'] = tag.attrib['k']
node_tagss['type'] = 'regular'
else:
pcolon = tag.attrib['k'].index(":") + 1
node_tagss['key'] = tag.attrib['k'][pcolon:]
node_tagss['type'] = tag.attrib['k'][:pcolon - 1]
return node_tagss
def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
problem_chars=PROBLEMCHARS, default_tag_type='regular'):
"""Clean and shape node or way XML element to Python dict"""
node_attribs = {}
way_attribs = {}
way_nodes = []
tags = [] # Handle secondary tags the same way for both node and way elements
node_tagss = {}
if element.tag == 'node':
for node in NODE_FIELDS:
node_attribs[node] = element.attrib[node]
node_attribs['id']= int(node_attribs['id'])
node_attribs['uid']= int(node_attribs['uid'])
node_attribs['changeset']= int(node_attribs['changeset'])
node_attribs['lon']= float(node_attribs['lon'])
node_attribs['lat']= float(node_attribs['lat'])
        for tag in element.iter("tag"):
            if PROBLEMCHARS.search(tag.attrib['k']) is None:
                node_tagss = tags_clean(node_attribs['id'], tag)
                tags.append(node_tagss)
if node_attribs:
return {'node': node_attribs, 'node_tags': tags}
else:
return None
elif element.tag == 'way':
for way in WAY_FIELDS:
way_attribs[way] = element.attrib[way]
way_attribs['id']= int(way_attribs['id'])
way_attribs['uid']= int(way_attribs['uid'])
way_attribs['changeset']= int(way_attribs['changeset'])
        for tag in element.iter("tag"):
            if PROBLEMCHARS.search(tag.attrib['k']) is None:
                node_tagss = tags_clean(way_attribs['id'], tag)
                tags.append(node_tagss)
count =0
for nodes in element.iter("nd"):
wnd = {}
wnd['id'] = int(way_attribs['id'])
wnd['node_id'] = int(nodes.attrib['ref'])
wnd['position'] = count
count += 1
way_nodes.append(wnd)
if way_attribs:
return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}
else:
return None
# ================================================== #
# Helper Functions #
# ================================================== #
def get_element(osm_file, tags=('node', 'way', 'relation')):
"""Yield element if it is the right type of tag"""
context = ET.iterparse(osm_file, events=('start', 'end'))
_, root = next(context)
for event, elem in context:
if event == 'end' and elem.tag in tags:
yield elem
root.clear()
def validate_element(element, validator, schema=SCHEMA):
"""Raise ValidationError if element does not match schema"""
if validator.validate(element, schema) is not True:
field, errors = next(validator.errors.iteritems())
message_string = "\nElement of type '{0}' has the following errors:\n{1}"
error_strings = (
"{0}: {1}".format(k, v if isinstance(v, str) else ", ".join(v))
for k, v in errors.iteritems()
)
raise cerberus.ValidationError(
message_string.format(field, "\n".join(error_strings))
)
class UnicodeDictWriter(csv.DictWriter, object):
"""Extend csv.DictWriter to handle Unicode input"""
def writerow(self, row):
super(UnicodeDictWriter, self).writerow({
k: (v.encode('utf-8') if isinstance(v, unicode) else v) for k, v in row.iteritems()
})
def writerows(self, rows):
for row in rows:
self.writerow(row)
# ================================================== #
# Main Function #
# ================================================== #
def process_map(file_in, validate):
"""Iteratively process each XML element and write to csv(s)"""
with codecs.open(NODES_PATH, 'w') as nodes_file, \
codecs.open(NODE_TAGS_PATH, 'w') as nodes_tags_file, \
codecs.open(WAYS_PATH, 'w') as ways_file, \
codecs.open(WAY_NODES_PATH, 'w') as way_nodes_file, \
codecs.open(WAY_TAGS_PATH, 'w') as way_tags_file:
nodes_writer = UnicodeDictWriter(nodes_file, NODE_FIELDS)
node_tags_writer = UnicodeDictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
ways_writer = UnicodeDictWriter(ways_file, WAY_FIELDS)
way_nodes_writer = UnicodeDictWriter(way_nodes_file, WAY_NODES_FIELDS)
way_tags_writer = UnicodeDictWriter(way_tags_file, WAY_TAGS_FIELDS)
nodes_writer.writeheader()
node_tags_writer.writeheader()
ways_writer.writeheader()
way_nodes_writer.writeheader()
way_tags_writer.writeheader()
validator = cerberus.Validator()
for element in get_element(file_in, tags=('node', 'way')):
el = shape_element(element)
if el:
if validate is True:
validate_element(el, validator)
if element.tag == 'node':
nodes_writer.writerow(el['node'])
node_tags_writer.writerows(el['node_tags'])
elif element.tag == 'way':
ways_writer.writerow(el['way'])
way_nodes_writer.writerows(el['way_nodes'])
way_tags_writer.writerows(el['way_tags'])
if __name__ == '__main__':
# Note: Validation is ~ 10X slower. For the project consider using a small
# sample of the map when validating.
process_map(OSM_PATH, validate=False)
print "Done"
Now we have five CSV files containing the wrangled data. We can perform analysis on this dataset by importing the files into a SQLite database. In the next section we perform some analysis using SQLite.
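For reference, here is a minimal sketch of loading two of the CSVs into SQLite from Python; the table names match the queries below, but the database file name and the column types are assumptions, since the report does not define the SQL schema:
#Minimal sketch: load nodes.csv and nodes_tags.csv into a SQLite database
#The database file name and the column types are assumptions for illustration
import sqlite3
conn = sqlite3.connect("south_delhi.db")   #hypothetical database file
conn.text_factory = str                    #accept utf-8 byte strings from the csv module (Python 2)
cur = conn.cursor()
cur.execute("""CREATE TABLE IF NOT EXISTS nodes
               (id INTEGER PRIMARY KEY, lat REAL, lon REAL, user TEXT,
                uid INTEGER, version TEXT, changeset INTEGER, timestamp TEXT)""")
cur.execute("""CREATE TABLE IF NOT EXISTS nodes_tags
               (id INTEGER, key TEXT, value TEXT, type TEXT)""")
with open(NODES_PATH) as f:
    rows = [(r['id'], r['lat'], r['lon'], r['user'], r['uid'],
             r['version'], r['changeset'], r['timestamp'])
            for r in csv.DictReader(f)]
cur.executemany("INSERT INTO nodes VALUES (?, ?, ?, ?, ?, ?, ?, ?)", rows)
with open(NODE_TAGS_PATH) as f:
    rows = [(r['id'], r['key'], r['value'], r['type']) for r in csv.DictReader(f)]
cur.executemany("INSERT INTO nodes_tags VALUES (?, ?, ?, ?)", rows)
conn.commit()
conn.close()
The ways, ways_tags and ways_nodes files can be loaded the same way; the sqlite3 command-line shell's .mode csv and .import commands are an alternative.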
File Sizes
Number of nodes
sqlite> SELECT Count(*)
...> FROM nodes;
364336
Number of ways
sqlite> SELECT Count(*)
...> FROM ways;
66280
Number of users
sqlite> select count(distinct(user.uid))
...> from (select uid from nodes
...> union all
...> select uid from ways) user;
375
Top 5 Active users
sqlite> select user.user, count(*)
...> from (select user from nodes
...> union all
...> select user from ways) user
...> group by user.user
...> order by count(*) DESC
...> limit 5;
bindhu|38740
sdivya|38651
vamshikrishna|33491
Ashok09|29883
venkatkotha|23149
Number of users with a single post
sqlite> select count(*) from (select users.user, count(*)
...> from (select user from nodes
...> union all
...> select user from ways) users
...> group by users.user having count(*) ==1) ;
90
Number of Domino's outlets
sqlite> select count(*) from nodes_tags where value Like "domino%";
3
Number of KFC outlets
sqlite> select count(*) from nodes_tags where value Like "kfc_";
1
Banks in South Delhi Region
sqlite> select nt.value, count(*)
...> from nodes_tags as nt
...> join
...> ( select distinct(id)
...> from nodes_tags
...> where value ='bank') bank
...> on nt.id = bank.id
...> where nt.key = 'name'
...> group by nt.value
...> order by count(*) DESC;
Andhra Bank|1
CITI Bank|1
HDFC|1
HDFC Bank|1
ICICI Bank|1
ICICI, SBI, Citibank,|1
IDBI Bank|1
Indian Bank|1
Punjab National Bank|1
Standard Chartered Bank|1
State Bank of India|1
Number of Bus Stops
sqlite> select count(*)
...> from nodes_tags
...> where key="highway" and value ="bus_stop";
69
Number of Traffic Signals
sqlite> select count(*)
...> from nodes_tags
...> where key="highway" and value ="traffic_signals";
150
Cuisines in South Delhi Restaurants
sqlite> select nt.value, count(*)
...> from nodes_tags as nt
...> join
...> (select distinct(id)
...> from nodes_tags
...> where value='restaurant') res
...> on nt.id =res.id
...> where nt.key = 'cuisine'
...> group by nt.value
...> order by count(*) DESC;
indian|2
regional|2
thai|2
asian|1
pizza|1
Most of the people living in Delhi (the capital of India) are not native residents of the city, so there is huge diversity among its people. Here we have done some data wrangling and analysis work on the OSM dataset of the South Delhi region.
Cleaning scripts could be run two or three times a day to clean and update the collected data, which would help users work only with well-cleaned data. The data is updated by millions of people on a daily basis; if OSM provided a structured form for all input fields that matches the database structure, there would be far less need for this kind of cleaning. OSM could also turn data collection into a game by incentivizing the whole process, much like AWS Mechanical Turk, so that the published OSM data contains only validated entries.
Implementing these changes would help users of OpenStreetMap by providing them with clean data.
Data scientists would be able to get good insights from the data, and these changes would save the majority of their time since they would start from clean data.
There are some issues that may arise: writing the scripts and running them on large datasets takes time, resources and money. The scripts would need to be updated at regular intervals of roughly 10-15 days, which requires a specialized team of engineers. Incentivizing the whole process may lead to good results, but the money required for the incentives is a major issue, and it must be checked whether real contributors are actually improving the dataset; handling fraudulent users and cross-validating the data need to be done properly.
Udacity Discussion Forums