Apache Spark는 대규모 분산 처리 환경에서 각 작업 노드 간 데이터와 변수를 효율적으로 공유하고 처리 성능을 최적화하기 위해 다양한 변수를 제공합니다. 이러한 변수는 네트워크 오버헤드를 줄이고 안전한 데이터 집계를 지원하여 대규모 작업에서도 일관된 결과를 보장합니다.


Closures

Closures는 드라이버에서 정의된 변수나 함수가 클러스터 내 Task 단위 작업에서 참조될 때 발생하는 개념입니다. 각 태스크는 독립적으로 실행되며, 드라이버에서 정의된 변수가 워커 노드의 태스크에서 사용될 수 있도록 전송됩니다.

  • 드라이버에서 정의된 변수를 클러스터 내 작업에서 사용
  • 불필요한 데이터 캡처는 성능 저하를 초래할 수 있음
  • 각 태스크는 독립적으로 실행됨
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.master("local").appName("Closure Example with UDF").getOrCreate()

# Closures
multiplier = 10

def multiply(x):
    return x * multiplier

multiply_udf = udf(multiply, IntegerType())

df = spark.createDataFrame([(1,), (2,), (3,), (4,)], ["value"])

result_df = df.withColumn("result", multiply_udf(df["value"]))
result_df.show()
+-----+------+
|value|result|
+-----+------+
|    1|    10|
|    2|    20|
|    3|    30|
|    4|    40|
+-----+------+

Broadcast Variables

Broadcast Variables는 데이터를 클러스터의 모든 Worker에 한 번만 전송하여 여러 작업에서 반복적으로 사용할 수 있게 해주는 변수입니다. 이를 통해 네트워크 비용을 절감하고 성능을 최적화할 수 있습니다.

  • 데이터를 한 번만 전송하여 여러 작업에서 사용
  • 네트워크 비용 절감
  • 대규모 데이터셋을 처리할 때 성능 최적화
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.master("local").appName("Broadcast Example with UDF").getOrCreate()

# Broadcast Variables
broadcast_var = spark.sparkContext.broadcast([1, 2, 3, 4, 5])

def add_broadcast_sum(x):
    return x + sum(broadcast_var.value)

add_broadcast_sum_udf = udf(add_broadcast_sum, IntegerType())

df = spark.createDataFrame([(10,), (20,), (30,), (40,)], ["value"])

result_df = df.withColumn("result", add_broadcast_sum_udf(df["value"]))
result_df.show()
+-----+------+
|value|result|
+-----+------+
|   10|    25|
|   20|    35|
|   30|    45|
|   40|    55|
+-----+------+

Accumulators

Accumulators는 값을 안전하게 누적할 수 있는 변수로, 주로 디버깅과 성능 분석에 사용됩니다. 값은 한 번에 하나의 작업에서만 수정되며, 작업 완료 후 드라이버에서 최종 값을 읽을 수 있습니다.

  • 값 안전하게 누적
  • 한 번에 하나의 작업만 수정
  • 최종 값은 드라이버에서 읽음
  • 작업 실패 시 재시도 시 누적값 중복 가능
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.master("local").appName("Accumulator Example with Global").getOrCreate()

# Accumulators
accum_even = spark.sparkContext.accumulator(0)
accum_sum = spark.sparkContext.accumulator(0)

def accumulate(x):
    if x % 2 == 0:
      accum_even.add(1) 
    return x * 2

def accumulate_sum(x):
    global accum_sum
    accum_sum += x["value"]

accumulate_udf = udf(accumulate, IntegerType())

df = spark.createDataFrame([(1,), (2,), (3,), (4,)], ["value"])

df.withColumn("accumulated", accumulate_udf(df["value"])).show()
print("Accumulator even after processing:", accum_even.value)

df.foreach(accumulate_sum)
print("Accumulator sum after processing:", accum_sum.value)
+-----+-----------+
|value|accumulated|
+-----+-----------+
|    1|          2|
|    2|          4|
|    3|          6|
|    4|          8|
+-----+-----------+

Accumulator even after processing: 2
Accumulator sum after processing: 10

References