Data Exploration using Hadoop MapReduce

PROBLEM STATEMENT :

From a given set of Flight departure and arrival data,

  1. Find maximum departure delay from each origin airport
  2. Find minimum arrival delay by origin-destination airport
  3. Find the average arrival delay by flight number.

File to use : Flight Data.

The MapReduce code is as follows :

Mapper code for maximum departure delay from each origin airport :

#!/usr/bin/env python

import sys
import csv

SEP = "\t"

class Mapper(object):
	"""Streaming mapper that turns CSV records into ``key<sep>value`` lines.

	For every CSV row read from ``stream`` it emits column 3 as the key and
	column 6 as the value (presumably origin airport and departure delay;
	confirm against the data set's column layout).

	Args:
		stream: Iterable of CSV text lines, e.g. ``sys.stdin``.
		sep: Separator written between key and value; defaults to a tab.
	"""

	def __init__(self, stream, sep = SEP):
		self.stream = stream
		# Use the caller's separator; previously the argument was ignored
		# and the module-level SEP was always used.
		self.sep = sep

	def emit(self, key, value):
		"""Write a single ``key<sep>value`` line to standard output."""
		sys.stdout.write("{0}{1}{2}\n".format(key, self.sep, value))

	def map(self):
		"""Emit one key/value line per CSV row that has enough columns."""
		for row in self:
			# Blank or truncated lines have fewer than 7 fields; skip them
			# instead of aborting the whole task with an IndexError.
			if len(row) < 7:
				continue
			self.emit(row[3], row[6])

	def __iter__(self):
		"""Iterate over the input stream parsed as CSV rows (lists of str)."""
		return iter(csv.reader(self.stream))

if __name__ == "__main__":
	mapper = Mapper(sys.stdin)
	mapper.map()
</pre>
<pre>

Reducer code for maximum departure delay from each origin airport :

</pre>
<pre>#!/usr/bin/env python

import sys

from itertools import groupby
from operator import itemgetter

SEP = "\t"

class Reducer(object):
	"""Streaming reducer that emits the maximum value seen for each key.

	Input lines have the form ``key<sep>value``. Lines with equal keys must
	be adjacent, because grouping is done with ``itertools.groupby``, which
	only merges consecutive items.

	Args:
		stream: Iterable of text lines, e.g. ``sys.stdin``.
		sep: Separator between key and value; defaults to a tab.
	"""

	def __init__(self, stream, sep = SEP):
		self.stream = stream
		self.sep = sep

	def emit(self, key, value):
		"""Write a single ``key<sep>value`` line to standard output."""
		sys.stdout.write("{0}{1}{2}\n".format(key, self.sep, value))

	def reduce(self):
		"""Emit ``key<sep>max(values)`` for every run of equal keys."""
		for current, group in groupby(self, itemgetter(0)):
			# max() over the real values avoids the old -100000 sentinel,
			# which produced a wrong answer for groups entirely below it.
			self.emit(current, max(value for _, value in group))

	def __iter__(self):
		"""Yield ``(key, float_value)`` pairs, skipping unparsable lines."""
		for line in self.stream:
			try:
				parts = line.split(self.sep)
				value = float(parts[1])
			except (IndexError, ValueError):
				# Missing value field or non-numeric value (e.g. a CSV
				# header or "NA"): ignore the line.
				continue
			# Yield outside the try block so that closing the generator
			# (GeneratorExit) is never swallowed.
			yield parts[0], value

if __name__ == '__main__':
	# Script entry point: reduce the key/value lines arriving on stdin.
	Reducer(sys.stdin).reduce()
</pre>
<pre>

Mapper code for the average arrival delay by flight number :

</pre>
<pre>#!/usr/bin/env python

import sys
import csv

SEP = "\t"

class Mapper(object):
	"""Streaming mapper that turns CSV records into ``key<sep>value`` lines.

	For every CSV row read from ``stream`` it emits column 2 as the key and
	column 8 as the value (presumably flight number and arrival delay;
	confirm against the data set's column layout).

	Args:
		stream: Iterable of CSV text lines, e.g. ``sys.stdin``.
		sep: Separator written between key and value; defaults to a tab.
	"""

	def __init__(self, stream, sep = SEP):
		self.stream = stream
		# Use the caller's separator; previously the argument was ignored
		# and the module-level SEP was always used.
		self.sep = sep

	def emit(self, key, value):
		"""Write a single ``key<sep>value`` line to standard output."""
		sys.stdout.write("{0}{1}{2}\n".format(key, self.sep, value))

	def map(self):
		"""Emit one key/value line per CSV row that has enough columns."""
		for row in self:
			# Blank or truncated lines have fewer than 9 fields; skip them
			# instead of aborting the whole task with an IndexError.
			if len(row) < 9:
				continue
			self.emit(row[2], row[8])

	def __iter__(self):
		"""Iterate over the input stream parsed as CSV rows (lists of str)."""
		return iter(csv.reader(self.stream))

if __name__ == "__main__":
	# Script entry point: map the CSV records arriving on stdin.
	Mapper(sys.stdin).map()
</pre>
<pre>
Reducer code for the average arrival delay by flight number :

</pre>
<pre>#!/usr/bin/env python

import sys

from itertools import groupby
from operator import itemgetter

SEP = "\t"

class Reducer(object):
	"""Streaming reducer that emits the arithmetic mean of values per key.

	Input lines have the form ``key<sep>value``. Lines with equal keys must
	be adjacent, because grouping is done with ``itertools.groupby``, which
	only merges consecutive items.

	Args:
		stream: Iterable of text lines, e.g. ``sys.stdin``.
		sep: Separator between key and value; defaults to a tab.
	"""

	def __init__(self, stream, sep = SEP):
		self.stream = stream
		self.sep = sep

	def emit(self, key, value):
		"""Write a single ``key<sep>value`` line to standard output."""
		sys.stdout.write("{0}{1}{2}\n".format(key, self.sep, value))

	def reduce(self):
		"""Emit ``key<sep>mean(values)`` for every run of equal keys."""
		for current, group in groupby(self, itemgetter(0)):
			total = 0.0
			count = 0

			# groupby never produces an empty group, so count is >= 1
			# by the time the division happens.
			for _, value in group:
				total += value
				count += 1
			self.emit(current, total / count)

	def __iter__(self):
		"""Yield ``(key, float_value)`` pairs, skipping unparsable lines."""
		for line in self.stream:
			try:
				parts = line.split(self.sep)
				value = float(parts[1])
			except (IndexError, ValueError):
				# Missing value field or non-numeric value (e.g. a CSV
				# header or "NA"): ignore the line.
				continue
			# Yield outside the try block so that closing the generator
			# (GeneratorExit) is never swallowed.
			yield parts[0], value

if __name__ == '__main__':
	# Script entry point: reduce the key/value lines arriving on stdin.
	Reducer(sys.stdin).reduce()
</pre>
<pre>
Mapper code for minimum arrival delay by origin-destination airport :

</pre>
<pre>#!/usr/bin/env python

import sys
import csv

SEP = "\t"

class Mapper(object):
	"""Streaming mapper that turns CSV records into ``key<sep>value`` lines.

	For every CSV row read from ``stream`` it emits columns 3 and 4 joined
	by ``-`` as the key and column 8 as the value (presumably the
	origin-destination airport pair and the arrival delay; confirm against
	the data set's column layout).

	Args:
		stream: Iterable of CSV text lines, e.g. ``sys.stdin``.
		sep: Separator written between key and value; defaults to a tab.
	"""

	def __init__(self, stream, sep = SEP):
		self.stream = stream
		# Use the caller's separator; previously the argument was ignored
		# and the module-level SEP was always used.
		self.sep = sep

	def emit(self, key, value):
		"""Write a single ``key<sep>value`` line to standard output."""
		sys.stdout.write("{0}{1}{2}\n".format(key, self.sep, value))

	def map(self):
		"""Emit one key/value line per CSV row that has enough columns."""
		for row in self:
			# Blank or truncated lines have fewer than 9 fields; skip them
			# instead of aborting the whole task with an IndexError.
			if len(row) < 9:
				continue
			self.emit(row[3] + '-' + row[4], row[8])

	def __iter__(self):
		"""Iterate over the input stream parsed as CSV rows (lists of str)."""
		return iter(csv.reader(self.stream))

if __name__ == "__main__":
	mapper = Mapper(sys.stdin)
	mapper.map()
</pre>
<pre>
Reducer code for minimum arrival delay by origin-destination airport :

</pre>
<pre>#!/usr/bin/env python

import sys

from itertools import groupby
from operator import itemgetter

SEP = "\t"

class Reducer(object):
	"""Streaming reducer that emits the minimum value seen for each key.

	Input lines have the form ``key<sep>value``. Lines with equal keys must
	be adjacent, because grouping is done with ``itertools.groupby``, which
	only merges consecutive items.

	Args:
		stream: Iterable of text lines, e.g. ``sys.stdin``.
		sep: Separator between key and value; defaults to a tab.
	"""

	def __init__(self, stream, sep = SEP):
		self.stream = stream
		self.sep = sep

	def emit(self, key, value):
		"""Write a single ``key<sep>value`` line to standard output."""
		sys.stdout.write("{0}{1}{2}\n".format(key, self.sep, value))

	def reduce(self):
		"""Emit ``key<sep>min(values)`` for every run of equal keys."""
		for current, group in groupby(self, itemgetter(0)):
			# min() over the real values avoids the old 100000 sentinel,
			# which produced a wrong answer for groups entirely above it.
			self.emit(current, min(value for _, value in group))

	def __iter__(self):
		"""Yield ``(key, float_value)`` pairs, skipping unparsable lines."""
		for line in self.stream:
			try:
				parts = line.split(self.sep)
				value = float(parts[1])
			except (IndexError, ValueError):
				# Missing value field or non-numeric value (e.g. a CSV
				# header or "NA"): ignore the line.
				continue
			# Yield outside the try block so that closing the generator
			# (GeneratorExit) is never swallowed.
			yield parts[0], value

if __name__ == '__main__':
	# Script entry point: reduce the key/value lines arriving on stdin.
	Reducer(sys.stdin).reduce()
</pre>
<pre>
Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s