31 KiB
PySpark 和大数据处理的第一步
面对数据量太大而无法在单台机器上处理的情况越来越常见。幸运的是,Apache Spark、Hadoop 和其他技术已经被开发出来解决这个问题。使用 PySpark 可以直接从 Python 中利用这些系统的能力!
高效地处理千兆字节甚至更多的数据集是任何 Python 开发人员都能够做到的事情,无论你是数据科学家、web 开发人员,还是介于两者之间的任何人。
在本教程中,您将学习:
- 哪些 Python 概念可以应用于大数据
- 如何使用 Apache Spark 和 PySpark
- 如何编写基本的 PySpark 程序
- 如何在本地小数据集上运行 PySpark 程序
- 将您的 PySpark 技能带到分布式系统的下一步该去哪里
免费下载: 从 Python 技巧中获取一个示例章节:这本书用简单的例子向您展示了 Python 的最佳实践,您可以立即应用它来编写更漂亮的+Python 代码。
Python 中的大数据概念
尽管作为只是一种脚本语言而广受欢迎,Python 公开了几种编程范例,如面向数组编程、面向对象编程、异步编程以及许多其他编程范例。有抱负的大数据专业人士特别感兴趣的一个范例是函数式编程。
在处理大数据时,函数式编程是一种常见的范式。以函数的方式编写会导致令人尴尬的并行代码。这意味着更容易获得代码,并让它在几个 CPU 上运行,甚至在完全不同的机器上运行。通过同时在多个系统上运行,您可以避开单个工作站的物理内存和 CPU 限制。
这就是 PySpark 生态系统的强大之处,它允许您获取功能代码,并自动将其分发到整个计算机集群。
幸运的是,对于 Python 程序员来说,函数式编程的许多核心思想都可以在 Python 的标准库和内置库中找到。您可以学习大数据处理所需的许多概念,而无需离开 Python 的舒适环境。
函数式编程的核心思想是数据应该由函数操作,而不需要维护任何外部状态。这意味着你的代码避免了全局变量,并且总是返回新数据,而不是就地操作数据。
函数式编程中另一个常见的想法是匿名函数。Python 使用 lambda关键字公开匿名函数,不要与 AWS Lambda 函数混淆。
现在您已经了解了一些术语和概念,您可以探索这些概念是如何在 Python 生态系统中体现的。
λ函数
Python 中的 lambda 函数是内联定义的,仅限于单个表达式。你可能在使用内置的 sorted() 函数时见过lambda函数:
>>> x = ['Python', 'programming', 'is', 'awesome!']
>>> print(sorted(x))
['Python', 'awesome!', 'is', 'programming']
>>> print(sorted(x, key=lambda arg: arg.lower()))
['awesome!', 'is', 'programming', 'Python']
为可迭代中的每一项调用sorted的key参数。这使得排序不区分大小写,在排序发生之前,将所有的字符串都变成小写的*。*
这是lambda函数的常见用例,小型匿名函数不维护外部状态。
Python 中还存在其他常见的函数式编程函数,如filter()、、map()、和reduce()。所有这些功能都可以以类似的方式使用lambda功能或用def定义的标准功能。
filter()、map()、reduce()、T3】
内置的 filter() 、 map() 、 reduce() 函数都是函数式编程中常见的。您将很快看到这些概念可以构成 PySpark 程序功能的重要部分。
在核心 Python 上下文中理解这些函数很重要。然后,您将能够将这些知识转化为 PySpark 程序和 Spark API。
filter()根据条件从 iterable 中过滤出项目,通常表示为lambda函数:
>>> x = ['Python', 'programming', 'is', 'awesome!']
>>> print(list(filter(lambda arg: len(arg) < 8, x)))
['Python', 'is']
filter()接受一个 iterable,对每个项目调用lambda函数,并返回lambda返回True的项目。
**注意:**调用list()是必需的,因为filter()也是可迭代的。filter()只给出你循环时的值。list()将所有项目一次强制存入内存,而不是使用循环。
你可以想象使用filter()来替换一个常见的 for循环模式,如下所示:
def is_less_than_8_characters(item):
return len(item) < 8
x = ['Python', 'programming', 'is', 'awesome!']
results = []
for item in x:
if is_less_than_8_characters(item):
results.append(item)
print(results)
这段代码收集所有少于 8 个字符的字符串。代码比filter()示例更加冗长,但是它执行相同的功能,得到相同的结果。
filter()的另一个不太明显的好处是它返回一个 iterable。这意味着filter()不需要你的计算机有足够的内存来一次保存 iterable 中的所有条目。对于可以快速增长到几千兆字节大小的大数据集来说,这变得越来越重要。
map()与filter()的相似之处在于,它将函数应用于 iterable 中的每一项,但它总是产生原始项的一对一映射。map()返回的新 iterable 将始终具有与原始 iterable 相同数量的元素,而filter()则不是这样:
>>> x = ['Python', 'programming', 'is', 'awesome!']
>>> print(list(map(lambda arg: arg.upper(), x)))
['PYTHON', 'PROGRAMMING', 'IS', 'AWESOME!']
map()自动调用所有项目上的lambda函数,有效地取代了如下的for循环:
results = []
x = ['Python', 'programming', 'is', 'awesome!']
for item in x:
results.append(item.upper())
print(results)
for循环的结果与map()示例相同,它以大写形式收集所有项目。然而,与filter()的例子一样,map()返回一个 iterable,这又使得处理大到无法完全放入内存的大型数据集成为可能。
最后,Python 标准库中最后一个函数三重奏是 reduce() 。与filter()和map()一样,reduce()将函数应用于 iterable 中的元素。
同样,所应用的函数可以是使用 def关键字创建的标准 Python 函数,也可以是lambda函数。
然而,reduce()并不返回新的 iterable。相反,reduce()使用名为的函数将 iterable 简化为一个值:
>>> from functools import reduce
>>> x = ['Python', 'programming', 'is', 'awesome!']
>>> print(reduce(lambda val1, val2: val1 + val2, x))
Pythonprogrammingisawesome!
这段代码将 iterable 中的所有项目从左到右组合成一个项目。这里没有对list()的调用,因为reduce()已经返回了一个条目。
注: Python 3.x 将内置的reduce()函数移到了functools包中。
lambda、map()、filter()和reduce()是存在于许多语言中的概念,可以在常规的 Python 程序中使用。很快,您将看到这些概念扩展到 PySpark API 来处理大量数据。
设置
集合是标准 Python 中的另一个常见功能,在大数据处理中非常有用。集合与列表非常相似,只是它们没有任何顺序,并且不能包含重复值。你可以把集合想象成类似于 Python 字典中的键。
PySpark 的 hello World
如同任何优秀的编程教程一样,您会希望从一个Hello World示例开始。下面是 PySpark 当量:
import pyspark
sc = pyspark.SparkContext('local[*]')
txt = sc.textFile('file:////usr/share/doc/python/copyright')
print(txt.count())
python_lines = txt.filter(lambda line: 'python' in line.lower())
print(python_lines.count())
先不要担心所有的细节。主要思想是记住 PySpark 程序与常规 Python 程序没有太大区别。
**注意:**如果您还没有安装 PySpark 或者没有指定的copyright文件,这个程序可能会在您的系统上引发一个异常,稍后您将看到如何做。
你很快就会了解到这个程序的所有细节,不过还是好好看看吧。该程序计算总行数和在名为copyright的文件中包含单词python的行数。
请记住,PySpark 程序与常规 Python 程序没有太大的不同,但是的执行模型可能与常规 Python 程序有很大的不同,尤其是当您在集群上运行时。
如果您在一个集群上,在后台可能会发生许多事情,将处理分布在多个节点上。但是,现在,请将该程序视为使用 PySpark 库的 Python 程序。
既然您已经看到了 Python 中存在的一些常见函数概念以及一个简单的 PySpark 程序,那么是时候更深入地研究 Spark 和 PySpark 了。
什么是火花?
Apache Spark 是由几个组件组成的,所以描述它可能会很困难。从本质上来说,Spark 是一个通用的 T2 引擎,用于处理大量数据。
Spark 用 Scala 编写,运行在 JVM 上。Spark 内置了用于处理流数据、机器学习、图形处理甚至通过 SQL 与数据交互的组件。
在本指南中,您将仅了解用于处理大数据的核心 Spark 组件。然而,所有其他组件,比如机器学习、SQL 等等,都可以通过 PySpark 用于 Python 项目。
PySpark 是什么?
Spark 是在 Scala 中实现的,Scala 是一种在 JVM 上运行的语言,那么如何通过 Python 访问所有这些功能呢?
PySpark 就是答案。
PySpark 的当前版本是 2.4.3,可以与 Python 2.7、3.3 和更高版本一起工作。
您可以将 PySpark 视为 Scala API 之上的一个基于 Python 的包装器。这意味着您有两套文档可供参考:
PySpark API 文档中有一些例子,但是通常你会想要参考 Scala 文档,并为你的 PySpark 程序将代码翻译成 Python 语法。幸运的是,Scala 是一种可读性很强的基于函数的编程语言。
PySpark 通过 Py4J 库与基于 Spark Scala 的 API 通信。Py4J 不是 PySpark 或 Spark 特有的。Py4J 允许任何 Python 程序与基于 JVM 的代码对话。
PySpark 基于函数范式有两个原因:
- Spark 的原生语言 Scala 是基于函数的。
- 功能代码更容易并行化。
另一种看待 PySpark 的方式是一个允许在单台机器或一组机器上处理大量数据的库。
在 Python 环境中,想想 PySpark 有一种处理并行处理的方法,不需要threading或multiprocessing模块。所有线程、进程甚至不同 CPU 之间复杂的通信和同步都由 Spark 处理。
PySpark API 和数据结构
要与 PySpark 交互,您需要创建称为弹性分布式数据集 (RDDs)的专用数据结构。
如果您在集群上运行,rdd 隐藏了调度程序在多个节点之间自动转换和分发数据的所有复杂性。
为了更好地理解 PySpark 的 API 和数据结构,回想一下前面提到的Hello World程序:
import pyspark
sc = pyspark.SparkContext('local[*]')
txt = sc.textFile('file:////usr/share/doc/python/copyright')
print(txt.count())
python_lines = txt.filter(lambda line: 'python' in line.lower())
print(python_lines.count())
任何 PySpark 程序的入口点都是一个SparkContext对象。该对象允许您连接到 Spark 集群并创建 rdd。local[*]字符串是一个特殊的字符串,表示您正在使用一个本地集群,也就是说您正在单机模式下运行。*告诉 Spark 在你的机器上创建和逻辑内核一样多的工作线程。
当您使用集群时,创建一个SparkContext会更加复杂。要连接到 Spark 集群,您可能需要处理认证和其他一些特定于集群的信息。您可以按如下方式设置这些详细信息:
conf = pyspark.SparkConf()
conf.setMaster('spark://head_node:56887')
conf.set('spark.authenticate', True)
conf.set('spark.authenticate.secret', 'secret-key')
sc = SparkContext(conf=conf)
一旦有了SparkContext,就可以开始创建 rdd。
可以用多种方式创建 rdd,但一种常见的方式是 PySpark parallelize()函数。parallelize()可以将一些 Python 数据结构(如列表和元组)转换成 rdd,这为您提供了使它们容错和分布式的功能。
为了更好地理解 rdd,考虑另一个例子。下面的代码创建了一个包含 10,000 个元素的迭代器,然后使用parallelize()将数据分布到两个分区中:
>>> big_list = range(10000)
>>> rdd = sc.parallelize(big_list, 2)
>>> odds = rdd.filter(lambda x: x % 2 != 0)
>>> odds.take(5)
[1, 3, 5, 7, 9]
parallelize()将迭代器转换成一个分布式数字集,并为您提供 Spark 基础设施的所有功能。
请注意,这段代码使用了 RDD 的filter()方法,而不是 Python 的内置filter(),您在前面已经看到了。结果是一样的,但是幕后发生的事情却截然不同。通过使用 RDD filter()方法,该操作以分布式方式在几个 CPU 或计算机上进行。
再一次,把这想象成 Spark 为你做multiprocessing工作,所有这些都封装在 RDD 数据结构中。
take()是查看 RDD 内容的一种方式,但只是一个很小的子集。take()将数据子集从分布式系统拉到一台机器上。
take()对于调试很重要,因为在一台机器上检查整个数据集是不可能的。rdd 针对大数据进行了优化,因此在现实世界中,一台机器可能没有足够的 RAM 来容纳整个数据集。
**注意:**在 shell 中运行这样的例子时,Spark 会暂时将信息打印到stdout中,您很快就会看到如何做了。您的stdout可能会暂时显示类似于[Stage 0:> (0 + 1) / 1]的内容。
stdout文本演示了 Spark 如何将 rdd 拆分,并跨不同的 CPU 和机器将您的数据处理成多个阶段。
创建 rdd 的另一种方法是用textFile()读入一个文件,这在前面的例子中已经看到过。rdd 是使用 PySpark 的基础数据结构之一,因此 API 中的许多函数都返回 rdd。
rdd 和其他数据结构之间的一个关键区别是处理被延迟,直到请求结果。这类似于一个 Python 生成器。Python 生态系统中的开发人员通常使用术语懒惰评估来解释这种行为。
您可以在同一个 RDD 上堆叠多个转换,而不进行任何处理。这个功能是可能的,因为 Spark 维护了一个转换的有向无环图。底层图形只有在请求最终结果时才被激活。在前面的例子中,在您通过调用take()请求结果之前,不会进行任何计算。
有多种方法可以从自动 RDD 获取结果。通过在 RDD 上使用collect(),您可以显式地请求对结果进行评估并收集到单个集群节点。您还可以通过各种方式隐式请求结果,其中一种方式是使用前面看到的count()。
**注意:**使用这些方法时要小心,因为它们会将整个数据集拉入内存,如果数据集太大而无法放入单台机器的 RAM 中,这种方法就不起作用。
再次,参考 PySpark API 文档以获得更多关于所有可能功能的细节。
安装 PySpark
通常,您将在 Hadoop 集群上运行 PySpark 程序,但是也支持其他集群部署选项。你可以阅读 Spark 的集群模式概述了解更多详情。
**注意:**设置其中一个集群可能会很困难,这超出了本指南的范围。理想情况下,你的团队有一些向导开发人员工程师来帮助工作。如果没有,Hadoop 发布了指南来帮助你。
在本指南中,您将看到在本地机器上运行 PySpark 程序的几种方法。这对于测试和学习非常有用,但是您很快就会想要使用新程序并在集群上运行它们,以真正处理大数据。
有时,由于所有必需的依赖项,单独设置 PySpark 也很有挑战性。
PySpark 运行在 JVM 之上,需要大量底层的 Java 基础设施才能运行。也就是说,我们生活在码头工人的时代,这使得 PySpark 的实验更加容易。
更好的是, Jupyter 背后令人惊叹的开发人员已经为您完成了所有繁重的工作。他们发布了一个 Dockerfile ,其中包含了所有 PySpark 依赖项以及 Jupyter。所以,你可以直接在 Jupyter 笔记本上做实验!
注: Jupyter 笔记本的功能很多。查看 Jupyter 笔记本:简介了解更多关于如何有效使用笔记本的细节。
首先,你需要安装 Docker。看看 Docker 的运行——更健康、更快乐、更有效率,如果你还没有安装 Docker 的话。
**注意:**Docker 图像可能会非常大,所以请确保您可以使用大约 5gb 的磁盘空间来使用 PySpark 和 Jupyter。
接下来,您可以运行以下命令来下载并自动启动一个 Docker 容器,其中包含一个预构建的 PySpark 单节点设置。这个命令可能需要几分钟,因为它直接从 DockerHub 下载图像以及 Spark、PySpark 和 Jupyter 的所有要求:
$ docker run -p 8888:8888 jupyter/pyspark-notebook
一旦该命令停止打印输出,您就拥有了一个正在运行的容器,其中包含了在单节点环境中测试 PySpark 程序所需的一切。
要停止你的容器,在你输入docker run命令的同一个窗口中输入 Ctrl + C 。
现在终于要运行一些程序了!
运行 PySpark 程序
执行 PySpark 程序的方法有很多种,这取决于您喜欢命令行还是更直观的界面。对于命令行界面,您可以使用spark-submit命令、标准 Python shell 或专门的 PySpark shell。
首先,你会看到 Jupyter 笔记本更直观的界面。
Jupyter 笔记型电脑
您可以在 Jupyter 笔记本中运行您的程序,方法是运行以下命令来启动您之前下载的 Docker 容器(如果它尚未运行):
$ docker run -p 8888:8888 jupyter/pyspark-notebook
Executing the command: jupyter notebook
[I 08:04:22.869 NotebookApp] Writing notebook server cookie secret to /home/jovyan/.local/share/jupyter/runtime/notebook_cookie_secret
[I 08:04:25.022 NotebookApp] JupyterLab extension loaded from /opt/conda/lib/python3.7/site-packages/jupyterlab
[I 08:04:25.022 NotebookApp] JupyterLab application directory is /opt/conda/share/jupyter/lab
[I 08:04:25.027 NotebookApp] Serving notebooks from local directory: /home/jovyan
[I 08:04:25.028 NotebookApp] The Jupyter Notebook is running at:
[I 08:04:25.029 NotebookApp] http://(4d5ab7a93902 or 127.0.0.1):8888/?token=80149acebe00b2c98242aa9b87d24739c78e562f849e4437
[I 08:04:25.029 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 08:04:25.037 NotebookApp]
To access the notebook, open this file in a browser:
file:///home/jovyan/.local/share/jupyter/runtime/nbserver-6-open.html
Or copy and paste one of these URLs:
http://(4d5ab7a93902 or 127.0.0.1):8888/?token=80149acebe00b2c98242aa9b87d24739c78e562f849e4437
现在您有了一个运行 PySpark 的容器。注意,docker run命令输出的结尾提到了一个本地 URL。
注意:docker命令的输出在每台机器上会略有不同,因为令牌、容器 id 和容器名称都是随机生成的。
您需要使用该 URL 在 web 浏览器中连接到运行 Jupyter 的 Docker 容器。将 URL 从您的输出中直接复制并粘贴到您的网络浏览器中。以下是您可能会看到的 URL 示例:
$ http://127.0.0.1:8888/?token=80149acebe00b2c98242aa9b87d24739c78e562f849e4437
以下命令中的 URL 在您的计算机上可能会稍有不同,但是一旦您在浏览器中连接到该 URL,您就可以访问 Jupyter 笔记本环境,该环境应该类似于以下内容:
从 Jupyter 笔记本页面,您可以使用最右边的新建按钮来创建一个新的 Python 3 shell。然后你可以测试一些代码,就像之前的Hello World例子:
import pyspark
sc = pyspark.SparkContext('local[*]')
txt = sc.textFile('file:////usr/share/doc/python/copyright')
print(txt.count())
python_lines = txt.filter(lambda line: 'python' in line.lower())
print(python_lines.count())
下面是 Jupyter 笔记本中运行该代码的样子:
这里的幕后发生了很多事情,所以您的结果可能需要几秒钟才能显示。单击单元格后,答案不会立即出现。
命令行界面
命令行界面提供了多种提交 PySpark 程序的方式,包括 PySpark shell 和spark-submit命令。要使用这些 CLI 方法,首先需要连接到安装了 PySpark 的系统的 CLI。
要连接到 Docker 设置的 CLI,您需要像以前一样启动容器,然后连接到该容器。同样,要启动容器,您可以运行以下命令:
$ docker run -p 8888:8888 jupyter/pyspark-notebook
一旦运行了 Docker 容器,就需要通过 shell 而不是 Jupyter 笔记本来连接它。为此,运行以下命令来查找容器名称:
$ docker container ls
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
4d5ab7a93902 jupyter/pyspark-notebook "tini -g -- start-no…" 12 seconds ago Up 10 seconds 0.0.0.0:8888->8888/tcp kind_edison
这个命令将显示所有正在运行的容器。找到运行jupyter/pyspark-notebook映像的容器的CONTAINER ID,用它连接到容器内的bash外壳*:*
$ docker exec -it 4d5ab7a93902 bash
jovyan@4d5ab7a93902:~$
现在你应该连接到容器内的bash提示*。您可以验证事情正在运行,因为您的 shell 的提示符将变成类似于jovyan@4d5ab7a93902的东西,但是使用您的容器的惟一 ID。*
**注意:**用您机器上使用的CONTAINER ID替换4d5ab7a93902。
集群
您可以使用与 Spark 一起安装的spark-submit命令,通过命令行向集群提交 PySpark 代码。该命令采用 PySpark 或 Scala 程序,并在集群上执行。这可能是您执行真正的大数据处理工作的方式。
**注意:**这些命令的路径取决于 Spark 的安装位置,并且可能只在使用引用的 Docker 容器时才起作用。
要使用正在运行的 Docker 容器运行Hello World示例(或任何 PySpark 程序),首先如上所述访问 shell。一旦你进入容器的外壳环境,你可以使用纳米文本编辑器创建文件。
要在当前文件夹中创建文件,只需使用您想要创建的文件名启动nano:
$ nano hello_world.py
键入Hello World示例的内容,并通过键入 Ctrl + X 并按照保存提示保存文件:
最后,您可以使用pyspark-submit命令通过 Spark 运行代码:
$ /usr/local/spark/bin/spark-submit hello_world.py
默认情况下,这个命令会导致大量的输出,所以可能很难看到你程序的输出。通过改变SparkContext变量的级别,可以在 PySpark 程序内部控制日志的详细程度。为此,请将这一行放在脚本顶部附近:
sc.setLogLevel('WARN')
这将省略spark-submit输出中的部分,这样你可以更清楚地看到你程序的输出。然而,在真实的场景中,您会希望将任何输出放入文件、数据库或其他存储机制中,以便于以后的调试。
幸运的是,PySpark 程序仍然可以访问 Python 的所有标准库,因此将结果保存到文件中不成问题:
import pyspark
sc = pyspark.SparkContext('local[*]')
txt = sc.textFile('file:////usr/share/doc/python/copyright')
python_lines = txt.filter(lambda line: 'python' in line.lower())
with open('results.txt', 'w') as file_obj:
file_obj.write(f'Number of lines: {txt.count()}\n')
file_obj.write(f'Number of lines with python: {python_lines.count()}\n')
现在,您的结果保存在一个名为results.txt的单独文件中,以便于以后参考。
**注意:**上面的代码使用了 f 串,这是在 Python 3.6 中引入的。
PySpark 外壳
另一种特定于 PySpark 的运行程序的方式是使用 PySpark 本身提供的 shell。同样,使用 Docker 设置,您可以如上所述连接到容器的 CLI。然后,您可以使用以下命令运行专门的 Python shell:
$ /usr/local/spark/bin/pyspark
Python 3.7.3 | packaged by conda-forge | (default, Mar 27 2019, 23:01:00)
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.1
/_/
Using Python version 3.7.3 (default, Mar 27 2019 23:01:00)
SparkSession available as 'spark'.
现在您处于 Docker 容器中的 Pyspark shell 环境*,您可以测试类似于 Jupyter 笔记本示例的代码:*
>>> txt = sc.textFile('file:////usr/share/doc/python/copyright')
>>> print(txt.count())
316
现在,您可以在 Pyspark shell 中工作,就像使用普通 Python shell 一样。
**注意:**您不必在 Pyspark shell 示例中创建一个SparkContext变量。PySpark shell 自动创建一个变量sc,在单节点模式下将您连接到 Spark 引擎。
当用spark-submit或 Jupyter 笔记本提交真正的 PySpark 程序时,你必须创建你自己的 SparkContext。
只要 PySpark 安装在 Python 环境中,您也可以使用标准的 Python shell 来执行您的程序。您一直在使用的 Docker 容器没有为标准 Python 环境启用 PySpark。因此,您必须使用前面的方法之一在 Docker 容器中使用 PySpark。
将 PySpark 与其他工具结合使用
正如您已经看到的,PySpark 附带了额外的库来完成机器学习和大型数据集的 SQL 式操作。然而,你也可以使用其他常见的科学图书馆,如 NumPy 和 Pandas 。
您必须在每个集群节点上的相同环境中安装它们,然后您的程序可以照常使用它们。然后,你可以自由地使用你已经知道的所有熟悉的惯用熊猫把戏。
记住: 熊猫数据帧被急切地评估,所以所有的数据将需要在一台机器上适合内存**。**
真正大数据处理的后续步骤
在学习 PySpark 基础知识后不久,您肯定会想要开始分析大量数据,这些数据在您使用单机模式时可能无法工作。安装和维护 Spark cluster 超出了本指南的范围,它本身可能是一项全职工作。
因此,现在可能是时候拜访您办公室的 it 部门,或者研究托管的 Spark 集群解决方案了。一个潜在的托管解决方案是数据块。
Databricks 允许你用微软 Azure 或 AWS 托管你的数据,并有14 天免费试用。
有了一个工作的 Spark 集群后,您会希望将所有数据放入该集群进行分析。Spark 有多种导入数据的方法:
- 亚马逊 S3
- Apache Hive 数据仓库
- 任何带有 JDBC 或 ODBC 接口的数据库
您甚至可以直接从网络文件系统中读取数据,这就是前面的例子的工作方式。
无论您是使用 Databricks 这样的托管解决方案还是您自己的机器集群,都不缺少访问所有数据的方法。
结论
PySpark 是大数据处理的一个很好的切入点。
在本教程中,您了解到,如果您熟悉一些函数式编程概念,如map()、filter()和 basic Python ,那么您就不必花很多时间预先学习。事实上,您可以在 PySpark 程序中直接使用所有您已经知道的 Python,包括熟悉的工具,如 NumPy 和 Pandas。
您现在能够:
- 了解适用于大数据的内置 Python 概念
- 编写基本 PySpark 程序
- 使用本地机器在小型数据集上运行 PySpark 程序
- 探索更强大的大数据解决方案,如 Spark 集群或其他定制的托管解决方案*******


