代码开发

UnitTest

Spark的任务需要允许在Spark的上下文中,可以用一个叫做FindSpark的包来辅助:

pip install findspark

在初始化上下文之前,需要调用findspark.init()方法,它会自动地根据本地的SPARK_HOME环境变量来定位到Spark的Python依赖包。

import unittest2
import logging
import findspark
findspark.init()
from pyspark.context import SparkContext
class ExampleTest(unittest2.TestCase):
def setUp(self):
self.sc = SparkContext('local[4]')
quiet_logs(self.sc)
def tearDown(self):
self.sc.stop()
def test_something(self):
# start by creating a mockup dataset
l = [(1, 'hello'), (2, 'world'), (3, 'world')]
# and create a RDD out of it
rdd = self.sc.parallelize(l)
# pass it to the transformation you're unit testing
result = non_trivial_transform(rdd)
# collect the results
output = result.collect()
# since it's unit test let's make an assertion
self.assertEqual(output[0][1], 2)
def non_trivial_transform(rdd):
""" a transformation to unit test (word count) - defined here for convenience only"""
return rdd.map(lambda x: (x[1], 1)).reduceByKey(lambda a, b: a + b)
if __name__ == "__main__":
unittest2.main()