python实战spark(五)常用API
常用APIclass pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)用于控制RDD存储。每个StorageLevel记录:是否使用内存,如果内存不足,是否将RDD放到磁盘上,是否以特定于java的序列化格式将数据保存在内存中,以及是否在多个节点上复制RDD分区。还包含一些常用存...
常用API
class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)
用于控制RDD存储。每个StorageLevel记录:
是否使用内存,如果内存不足,是否将RDD放到磁盘上,是否以特定于java的序列化格式将数据保存在内存中,以及是否在多个节点上复制RDD分区。还包含一些常用存储级别的静态常量,MEMORY_ONLY。由于数据总是在Python端序列化,所以所有常量都使用序列化格式。
class pyspark.Broadcast(sc=None, value=None, pickle_registry=None, path=None, sock_file=None)
使用SparkContext.broadcast()
创建广播变量。通过值.value
访问值。
>>> from pyspark.context import SparkContext
>>> sc = SparkContext('local', 'test')
>>> b = sc.broadcast([1, 2, 3, 4, 5])
>>> b.value
[1, 2, 3, 4, 5]
>>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
>>> b.unpersist()
>>> large_broadcast = sc.broadcast(range(10000))
1.destroy()
删除与此广播变量相关的所有数据和元数据。一旦广播变量被销毁,就不能再使用它。此方法阻塞,直到删除完成。
2.dump(value,f)
3.load(file)
4.load_from_path(path)
5.unpersist(blocking=False)
删除执行器上此广播的缓存副本。如果在调用后使用广播,则需要将其重新发送给每个执行程序。
参数:
blocking-- 是否阻塞,直到完成非持久化
6.property value
class pyspark.Accumulator(aid, value, accum_param)
可以累积的共享变量,具有可交换和可关联的“add”操作。Spark集群上的工作任务可以使用+=operator
向累加器添加值,但是只有驱动程序可以使用value访问它的值。来自worker的更新会自动传播到driver。
虽然SparkContext支持基本数据类型(如int和float)的累加器,但是用户也可以通过提供一个自定义的AccumulatorParam对象来为自定义类型定义累加器。以该模块的doctest为例。
1.add(term)
2.property value
class pyspark.AccumulatorParam
定义如何累积给定类型的值的helper对象。
1.addInPlace(value1, value2)
添加累加器数据类型的两个值,返回一个新值;为了提高效率,还可以在适当的地方更新value1并返回它。
2.zero(value)
为类型提供一个“零值”,在维度上与提供的值兼容(例如,一个零向量)
class pyspark.MarshalSerializer
使用Python的Marshal序列化对象。该序列化更快,但支持少量数据
1.dumps(obj)
2.loads(obj)
class pyspark.PickleSerializer
该序列化器支持几乎所有Python对象,但可能不像其他专用的序列化器那么快。
1.dumps(obj)
2.loads(obj)
class pyspark.StatusTracker(jtracker)
用于监视job和stage progress的低级状态报告api。
这些api有意提供非常弱的一致性语义;这些api的使用者应该准备好处理空的/丢失的信息。例如,作业的stage id可能是已知的,但是状态API可能没有关于这些stage细节的任何信息,因此getStageInfo
可能会为有效的stage id返回None。
为了限制内存使用,这些api只提供关于最近jobs/stages的信息。这些api将为最后一个spark.ui.retainedStages
和spark.ui.retainedJobs
提供信息。
1.getActiveJobsIds()
返回一个包含所有活跃jobs的id的数组
2.getActiveStageIds()
返回一个包含所有活跃stages的id的数组
3.getJobIdsForGroup(jobGroup=None)
返回特定作业group中所有已知作业的列表。如果jobGroup为None,则返回所有与作业组无关的已知作业。
返回的列表可能包含正在运行、失败和已完成的作业,并且在此方法的不同调用中可能有所不同。此方法不保证其结果中元素的顺序。
4.getJobInfo(jobId)
返回SparkJobInfo对象,如果找不到作业信息或作业信息已被垃圾收集,则返回None。
5.getStageInfo(stageId)
返回SparkStageInfo对象,如果找不到作业信息或作业信息已被垃圾收集,则返回None。
class pyspark.SparkJobInfo
暴露有关Spark作业的信息。
class pyspark.SparkStageInfo
暴露有关Spark阶段的信息。
class pyspark.Profiler(ctx)
PySpark支持自定义分析器,这是为了允许使用不同的分析器,以及输出到不同的格式,而不是在BasicProfiler中提供的。
自定义分析器必须定义或继承以下方法:
- profile–将生成某种类型的系统配置文件。
- stats–返回收集到的统计信息。
- dump–将概要文件转储到路径
- add --将概要文件添加到现有的累积概要文件
创建SparkContext时选择profiler类
>>> from pyspark import SparkConf, SparkContext
>>> from pyspark import BasicProfiler
>>> class MyCustomProfiler(BasicProfiler):
... def show(self, id):
... print("My custom profiles for RDD:%s" % id)
...
>>> conf = SparkConf().set("spark.python.profile", "true")
>>> sc = SparkContext('local', 'test', conf=conf, profiler_cls=MyCustomProfiler)
>>> sc.parallelize(range(1000)).map(lambda x: 2 * x).take(10)
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
>>> sc.parallelize(range(1000)).count()
1000
>>> sc.show_profiles()
My custom profiles for RDD:1
My custom profiles for RDD:3
>>> sc.stop()
1.dump(id, path)
将profile转储到path中,id是RDD id
2.profile(func)
利用函数分析
3.show(id)
打印profile状态到输出
4.stats()
返回收集到的分析状态
class pyspark.BasicProfiler(ctx)
BasicProfiler是默认的profiler,它是基于cProfile和累加器实现的
1.profile(func)
运行并配置传入的方法to_profile。返回一个profile对象。
2.stats()
返回收集到的profiling统计信息(pstats.Stats)
class pyspark.TaskContext
任务的上下文信息,可以在执行过程中读取或修改。要访问正在运行的任务的TaskContext通过TaskContext.get()
。
1.attemptNumber()
“这个任务已经尝试了多少次了。第一次任务尝试将被分配为尝试号= 0,后续尝试的尝试号将不断增加。
2.classmethod get()
返回当前活动的TaskContext。这可以在用户函数内部调用,以访问有关正在运行的任务的上下文信息。
注意:必须是called on worker,而不是driver。如果没有初始化,则返回None。
3.getLocalProperty(key)
在driver的上游设置一个本地属性,如果它丢失,则不设置。
4.partitionId()
此任务计算的RDD分区的ID。
5.stageId()
此任务所属的阶段的ID。
6.taskAttemptId()
此任务尝试的唯一ID(在相同的SparkContext中,没有两个任务尝试的尝试id不同)。这大致相当于Hadoop的TaskAttemptID。
class pyspark.RDDBarrier(rdd)
将RDD包装在barrier阶段中,这迫使Spark一起启动这个阶段的任务。RDDBarrier
实例是由RDD.barrier()
创建的。
1.mapPartitions(f, preservesPartitioning=False)
通过将一个函数应用到包装好的RDD的每个分区,返回一个新的RDD,其中的任务一起在barrier阶段启动。该接口与RDD.mapPartitions()相同。
class pyspark.BarrierTaskContext
在barrier stage中带有额外上下文信息和工具的TaskContext。使用BarrierTaskContext.get()
获取正在运行的barrier任务的barrier上下文。
1.barrier()
设置全局屏障并等待,直到此阶段的所有任务都遇到此屏障。与MPI中的MPI_Barrier函数类似,这个函数会阻塞,直到同一阶段的所有任务都到达这个例程为止。
在barrier阶段,每个任务在所有可能的代码分支中调用barrier()的次数都是相同的。否则,可能会挂起作业或超时后出现SparkException。
2.classmethod get()
返回当前活动的BarrierTaskContext。这可以在用户函数内部调用,以访问有关正在运行的任务的上下文信息。
注意:必须在worker调用而不是driver。如果没有初始化,则返回None。
3.getTaskInfos()
返回此屏障阶段中所有任务的BarrierTaskInfo,按分区ID排序。
class pyspark.BarrierTaskInfo(address)
携带一个barrier任务的所有任务信息。
变量:
address—barrier任务运行的执行程序的IPv4地址(主机:端口)
更多推荐
所有评论(0)