#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A wrapper for GroupedData to behave like pandas GroupBy."""

from abc import ABCMeta, abstractmethod
import inspect
from collections import defaultdict, namedtuple
from distutils.version import LooseVersion
from functools import partial
from itertools import product
from typing import (
    Any,
    Callable,
    Dict,
    Generic,
    Iterator,
    Mapping,
    List,
    Optional,
    Sequence,
    Set,
    Tuple,
    Type,
    Union,
    cast,
    TYPE_CHECKING,
)
import warnings

import pandas as pd
from pandas.api.types import is_number, is_hashable, is_list_like  # type: ignore[attr-defined]

if LooseVersion(pd.__version__) >= LooseVersion("1.3.0"):
    from pandas.core.common import _builtin_table  # type: ignore[attr-defined]
else:
    from pandas.core.base import SelectionMixin

    _builtin_table = SelectionMixin._builtin_table  # type: ignore[attr-defined]

from pyspark.sql import Column, DataFrame as SparkDataFrame, Window, functions as F
from pyspark.sql.types import (
    BooleanType,
    DataType,
    DoubleType,
    NumericType,
    StructField,
    StructType,
    StringType,
)

from pyspark import pandas as ps  # For running doctests and reference resolution in PyCharm.
from pyspark.pandas._typing import Axis, FrameLike, Label, Name
from pyspark.pandas.typedef import infer_return_type, DataFrameType, ScalarType, SeriesType
from pyspark.pandas.frame import DataFrame
from pyspark.pandas.internal import (
    InternalField,
    InternalFrame,
    HIDDEN_COLUMNS,
    NATURAL_ORDER_COLUMN_NAME,
    SPARK_INDEX_NAME_FORMAT,
    SPARK_DEFAULT_SERIES_NAME,
    SPARK_INDEX_NAME_PATTERN,
)
from pyspark.pandas.missing.groupby import (
    MissingPandasLikeDataFrameGroupBy,
    MissingPandasLikeSeriesGroupBy,
)
from pyspark.pandas.series import Series, first_series
from pyspark.pandas.spark import functions as SF
from pyspark.pandas.config import get_option
from pyspark.pandas.utils import (
    align_diff_frames,
    is_name_like_tuple,
    is_name_like_value,
    name_like_string,
    same_anchor,
    scol_for,
    verify_temp_column_name,
    log_advice,
)
from pyspark.pandas.spark.utils import as_nullable_spark_type, force_decimal_precision_scale
from pyspark.pandas.exceptions import DataError

if TYPE_CHECKING:
    from pyspark.pandas.window import RollingGroupby, ExpandingGroupby, ExponentialMovingGroupby


# to keep it the same as pandas
NamedAgg = namedtuple("NamedAgg", ["column", "aggfunc"])


class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
    """
    :ivar _psdf: The parent dataframe that is used to perform the groupby
    :type _psdf: DataFrame
    :ivar _groupkeys: The list of keys that will be used to perform the grouping
    :type _groupkeys: List[Series]
"""def__init__(self,psdf:DataFrame,groupkeys:List[Series],as_index:bool,dropna:bool,column_labels_to_exclude:Set[Label],agg_columns_selected:bool,agg_columns:List[Series],):self._psdf=psdfself._groupkeys=groupkeysself._as_index=as_indexself._dropna=dropnaself._column_labels_to_exclude=column_labels_to_excludeself._agg_columns_selected=agg_columns_selectedself._agg_columns=agg_columns@propertydef_groupkeys_scols(self)->List[Column]:return[s.spark.columnforsinself._groupkeys]@propertydef_agg_columns_scols(self)->List[Column]:return[s.spark.columnforsinself._agg_columns]@abstractmethoddef_apply_series_op(self,op:Callable[["SeriesGroupBy"],Series],should_resolve:bool=False,numeric_only:bool=False,)->FrameLike:pass@abstractmethoddef_handle_output(self,psdf:DataFrame)->FrameLike:pass# TODO: Series support is not implemented yet.# TODO: not all arguments are implemented comparing to pandas' for now.defaggregate(self,func_or_funcs:Optional[Union[str,List[str],Dict[Name,Union[str,List[str]]]]]=None,*args:Any,**kwargs:Any,)->DataFrame:"""Aggregate using one or more operations over the specified axis. Parameters ---------- func_or_funcs : dict, str or list a dict mapping from column name (string) to aggregate functions (string or list of strings). Returns ------- Series or DataFrame The return can be: * Series : when DataFrame.agg is called with a single function * DataFrame : when DataFrame.agg is called with several functions Return Series or DataFrame. Notes ----- `agg` is an alias for `aggregate`. Use the alias. See Also -------- pyspark.pandas.Series.groupby pyspark.pandas.DataFrame.groupby Examples -------- >>> df = ps.DataFrame({'A': [1, 1, 2, 2], ... 'B': [1, 2, 3, 4], ... 'C': [0.362, 0.227, 1.267, -0.562]}, ... columns=['A', 'B', 'C']) >>> df A B C 0 1 1 0.362 1 1 2 0.227 2 2 3 1.267 3 2 4 -0.562 Different aggregations per column >>> aggregated = df.groupby('A').agg({'B': 'min', 'C': 'sum'}) >>> aggregated[['B', 'C']].sort_index() # doctest: +NORMALIZE_WHITESPACE B C A 1 1 0.589 2 3 0.705 >>> aggregated = df.groupby('A').agg({'B': ['min', 'max']}) >>> aggregated.sort_index() # doctest: +NORMALIZE_WHITESPACE B min max A 1 1 2 2 3 4 >>> aggregated = df.groupby('A').agg('min') >>> aggregated.sort_index() # doctest: +NORMALIZE_WHITESPACE B C A 1 1 0.227 2 3 -0.562 >>> aggregated = df.groupby('A').agg(['min', 'max']) >>> aggregated.sort_index() # doctest: +NORMALIZE_WHITESPACE B C min max min max A 1 1 2 0.227 0.362 2 3 4 -0.562 1.267 To control the output names with different aggregations per column, pandas-on-Spark also supports 'named aggregation' or nested renaming in .agg. It can also be used when applying multiple aggregation functions to specific columns. 
        >>> aggregated = df.groupby('A').agg(b_max=ps.NamedAgg(column='B', aggfunc='max'))
        >>> aggregated.sort_index()  # doctest: +NORMALIZE_WHITESPACE
             b_max
        A
        1        2
        2        4

        >>> aggregated = df.groupby('A').agg(b_max=('B', 'max'), b_min=('B', 'min'))
        >>> aggregated.sort_index()  # doctest: +NORMALIZE_WHITESPACE
             b_max   b_min
        A
        1        2       1
        2        4       3

        >>> aggregated = df.groupby('A').agg(b_max=('B', 'max'), c_min=('C', 'min'))
        >>> aggregated.sort_index()  # doctest: +NORMALIZE_WHITESPACE
             b_max   c_min
        A
        1        2   0.227
        2        4  -0.562
        """
        # The current implementation of func and arguments in pandas-on-Spark for
        # aggregate is different from pandas'; once more arguments are added, this
        # could be removed.
        if func_or_funcs is None and not kwargs:
            raise ValueError("No aggregation argument or function specified.")

        relabeling = func_or_funcs is None and is_multi_agg_with_relabel(**kwargs)
        if relabeling:
            (
                func_or_funcs,
                columns,
                order,
            ) = normalize_keyword_aggregation(  # type: ignore[assignment]
                kwargs
            )

        if not isinstance(func_or_funcs, (str, list)):
            if not isinstance(func_or_funcs, dict) or not all(
                is_name_like_value(key)
                and (
                    isinstance(value, str)
                    or isinstance(value, list)
                    and all(isinstance(v, str) for v in value)
                )
                for key, value in func_or_funcs.items()
            ):
                raise ValueError(
                    "aggs must be a dict mapping from column name "
                    "to aggregate functions (string or list of strings)."
                )

        else:
            agg_cols = [col.name for col in self._agg_columns]
            func_or_funcs = {col: func_or_funcs for col in agg_cols}

        psdf: DataFrame = DataFrame(
            GroupBy._spark_groupby(self._psdf, func_or_funcs, self._groupkeys)
        )

        if self._dropna:
            psdf = DataFrame(
                psdf._internal.with_new_sdf(
                    psdf._internal.spark_frame.dropna(
                        subset=psdf._internal.index_spark_column_names
                    )
                )
            )

        if not self._as_index:
            should_drop_index = set(
                i for i, gkey in enumerate(self._groupkeys) if gkey._psdf is not self._psdf
            )
            if len(should_drop_index) > 0:
                psdf = psdf.reset_index(level=should_drop_index, drop=True)
            if len(should_drop_index) < len(self._groupkeys):
                psdf = psdf.reset_index()

        if relabeling:
            psdf = psdf[order]
            psdf.columns = columns  # type: ignore[assignment]
        return psdf

    agg = aggregate

    @staticmethod
    def _spark_groupby(
        psdf: DataFrame,
        func: Mapping[Name, Union[str, List[str]]],
        groupkeys: Sequence[Series] = (),
    ) -> InternalFrame:
        groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(groupkeys))]
        groupkey_scols = [s.spark.column.alias(name) for s, name in zip(groupkeys, groupkey_names)]

        multi_aggs = any(isinstance(v, list) for v in func.values())
        reordered = []
        data_columns = []
        column_labels = []
        for key, value in func.items():
            label = key if is_name_like_tuple(key) else (key,)
            if len(label) != psdf._internal.column_labels_level:
                raise TypeError(
                    "The length of the key must be the same as the column label level."
                )
            for aggfunc in [value] if isinstance(value, str) else value:
                column_label = tuple(list(label) + [aggfunc]) if multi_aggs else label
                column_labels.append(column_label)

                data_col = name_like_string(column_label)
                data_columns.append(data_col)

                col_name = psdf._internal.spark_column_name_for(label)
                if aggfunc == "nunique":
                    reordered.append(
                        F.expr("count(DISTINCT `{0}`) as `{1}`".format(col_name, data_col))
                    )
                # Implement "quartiles" aggregate function for ``describe``.
                elif aggfunc == "quartiles":
                    reordered.append(
                        F.expr(
                            "percentile_approx(`{0}`, array(0.25, 0.5, 0.75)) as `{1}`".format(
                                col_name, data_col
                            )
                        )
                    )
                else:
                    reordered.append(
                        F.expr("{1}(`{0}`) as `{2}`".format(col_name, aggfunc, data_col))
                    )

        sdf = psdf._internal.spark_frame.select(groupkey_scols + psdf._internal.data_spark_columns)
        sdf = sdf.groupby(*groupkey_names).agg(*reordered)

        return InternalFrame(
            spark_frame=sdf,
            index_spark_columns=[scol_for(sdf, col) for col in groupkey_names],
            index_names=[psser._column_label for psser in groupkeys],
            index_fields=[
                psser._internal.data_fields[0].copy(name=name)
                for psser, name in zip(groupkeys, groupkey_names)
            ],
            column_labels=column_labels,
            data_spark_columns=[scol_for(sdf, col) for col in data_columns],
        )
    def count(self) -> FrameLike:
        """
        Compute count of group, excluding missing values.

        See Also
        --------
        pyspark.pandas.Series.groupby
        pyspark.pandas.DataFrame.groupby

        Examples
        --------
        >>> df = ps.DataFrame({'A': [1, 1, 2, 1, 2],
        ...                    'B': [np.nan, 2, 3, 4, 5],
        ...                    'C': [1, 2, 1, 1, 2]}, columns=['A', 'B', 'C'])
        >>> df.groupby('A').count().sort_index()  # doctest: +NORMALIZE_WHITESPACE
            B  C
        A
        1   2  3
        2   2  2
        """
        return self._reduce_for_stat_function(F.count)
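    # Editorial note (illustrative, not in the original source): the null-excluding
    # behaviour comes from Spark's ``F.count``, which only counts non-null values.
    # In the doctest frame above, group ``A == 1`` has ``B = [NaN, 2, 4]`` and so
    # counts as 2, while ``C = [1, 2, 1]`` counts as 3.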
[docs]deffirst(self,numeric_only:Optional[bool]=False,min_count:int=-1)->FrameLike:""" Compute first of group values. .. versionadded:: 3.3.0 Parameters ---------- numeric_only : bool, default False Include only float, int, boolean columns. If None, will attempt to use everything, then use only numeric data. .. versionadded:: 3.4.0 min_count : int, default -1 The required number of valid values to perform the operation. If fewer than ``min_count`` non-NA values are present the result will be NA. .. versionadded:: 3.4.0 See Also -------- pyspark.pandas.Series.groupby pyspark.pandas.DataFrame.groupby Examples -------- >>> df = ps.DataFrame({"A": [1, 2, 1, 2], "B": [True, False, False, True], ... "C": [3, 3, 4, 4], "D": ["a", "b", "a", "a"]}) >>> df A B C D 0 1 True 3 a 1 2 False 3 b 2 1 False 4 a 3 2 True 4 a >>> df.groupby("A").first().sort_index() B C D A 1 True 3 a 2 False 3 b Include only float, int, boolean columns when set numeric_only True. >>> df.groupby("A").first(numeric_only=True).sort_index() B C A 1 True 3 2 False 3 >>> df.groupby("D").first().sort_index() A B C D a 1 True 3 b 2 False 3 >>> df.groupby("D").first(min_count=3).sort_index() A B C D a 1.0 True 3.0 b NaN None NaN """ifnotisinstance(min_count,int):raiseTypeError("min_count must be integer")returnself._reduce_for_stat_function(lambdacol:F.first(col,ignorenulls=True),accepted_spark_types=(NumericType,BooleanType)ifnumeric_onlyelseNone,min_count=min_count,)
[docs]deflast(self,numeric_only:Optional[bool]=False,min_count:int=-1)->FrameLike:""" Compute last of group values. .. versionadded:: 3.3.0 Parameters ---------- numeric_only : bool, default False Include only float, int, boolean columns. If None, will attempt to use everything, then use only numeric data. .. versionadded:: 3.4.0 min_count : int, default -1 The required number of valid values to perform the operation. If fewer than ``min_count`` non-NA values are present the result will be NA. .. versionadded:: 3.4.0 See Also -------- pyspark.pandas.Series.groupby pyspark.pandas.DataFrame.groupby Examples -------- >>> df = ps.DataFrame({"A": [1, 2, 1, 2], "B": [True, False, False, True], ... "C": [3, 3, 4, 4], "D": ["a", "a", "b", "a"]}) >>> df A B C D 0 1 True 3 a 1 2 False 3 a 2 1 False 4 b 3 2 True 4 a >>> df.groupby("A").last().sort_index() B C D A 1 False 4 b 2 True 4 a Include only float, int, boolean columns when set numeric_only True. >>> df.groupby("A").last(numeric_only=True).sort_index() B C A 1 False 4 2 True 4 >>> df.groupby("D").last().sort_index() A B C D a 2 True 4 b 1 False 4 >>> df.groupby("D").last(min_count=3).sort_index() A B C D a 2.0 True 4.0 b NaN None NaN """ifnotisinstance(min_count,int):raiseTypeError("min_count must be integer")returnself._reduce_for_stat_function(lambdacol:F.last(col,ignorenulls=True),accepted_spark_types=(NumericType,BooleanType)ifnumeric_onlyelseNone,min_count=min_count,)
[docs]defmax(self,numeric_only:Optional[bool]=False,min_count:int=-1)->FrameLike:""" Compute max of group values. .. versionadded:: 3.3.0 Parameters ---------- numeric_only : bool, default False Include only float, int, boolean columns. If None, will attempt to use everything, then use only numeric data. .. versionadded:: 3.4.0 min_count : bool, default -1 The required number of valid values to perform the operation. If fewer than min_count non-NA values are present the result will be NA. .. versionadded:: 3.4.0 See Also -------- pyspark.pandas.Series.groupby pyspark.pandas.DataFrame.groupby Examples -------- >>> df = ps.DataFrame({"A": [1, 2, 1, 2], "B": [True, False, False, True], ... "C": [3, 4, 3, 4], "D": ["a", "a", "b", "a"]}) >>> df.groupby("A").max().sort_index() B C D A 1 True 3 b 2 True 4 a Include only float, int, boolean columns when set numeric_only True. >>> df.groupby("A").max(numeric_only=True).sort_index() B C A 1 True 3 2 True 4 >>> df.groupby("D").max().sort_index() A B C D a 2 True 4 b 1 False 3 >>> df.groupby("D").max(min_count=3).sort_index() A B C D a 2.0 True 4.0 b NaN None NaN """ifnotisinstance(min_count,int):raiseTypeError("min_count must be integer")returnself._reduce_for_stat_function(F.max,accepted_spark_types=(NumericType,BooleanType)ifnumeric_onlyelseNone,min_count=min_count,)
    def mean(self, numeric_only: Optional[bool] = True) -> FrameLike:
        """
        Compute mean of groups, excluding missing values.

        Parameters
        ----------
        numeric_only : bool, default True
            Include only float, int, boolean columns. If None, will attempt to use
            everything, then use only numeric data.

            .. versionadded:: 3.4.0

        Returns
        -------
        pyspark.pandas.Series or pyspark.pandas.DataFrame

        See Also
        --------
        pyspark.pandas.Series.groupby
        pyspark.pandas.DataFrame.groupby

        Examples
        --------
        >>> df = ps.DataFrame({'A': [1, 1, 2, 1, 2],
        ...                    'B': [np.nan, 2, 3, 4, 5],
        ...                    'C': [1, 2, 1, 1, 2],
        ...                    'D': [True, False, True, False, True]})

        Groupby one column and return the mean of the remaining columns in
        each group.

        >>> df.groupby('A').mean().sort_index()  # doctest: +NORMALIZE_WHITESPACE
             B         C         D
        A
        1  3.0  1.333333  0.333333
        2  4.0  1.500000  1.000000
        """
        self._validate_agg_columns(numeric_only=numeric_only, function_name="mean")

        warnings.warn(
            "Default value of `numeric_only` will be changed to `False` "
            "instead of `True` in 4.0.0.",
            FutureWarning,
        )

        return self._reduce_for_stat_function(
            F.mean, accepted_spark_types=(NumericType,), bool_to_numeric=True
        )
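    # Editorial note (illustrative, not in the original source): because
    # ``bool_to_numeric=True`` above, boolean columns are aggregated as 0/1 values.
    # In the doctest frame, group ``A == 1`` has ``D = [True, False, False]``, hence
    # the mean 0.333333; likewise ``B = [NaN, 2, 4]`` averages to 3.0 because the
    # missing value is excluded.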
# TODO: 'q' accepts list like type
    def quantile(self, q: float = 0.5, accuracy: int = 10000) -> FrameLike:
        """
        Return group values at the given quantile.

        .. versionadded:: 3.4.0

        Parameters
        ----------
        q : float, default 0.5 (50% quantile)
            Value between 0 and 1 providing the quantile to compute.
        accuracy : int, optional
            Default accuracy of approximation. Larger value means better accuracy.
            The relative error can be deduced by 1.0 / accuracy.
            This is a pandas-on-Spark specific parameter.

        Returns
        -------
        pyspark.pandas.Series or pyspark.pandas.DataFrame
            Return type determined by caller of GroupBy object.

        Notes
        -----
        Unlike pandas, `quantile` in pandas-on-Spark uses a distributed percentile
        approximation algorithm, so the result may differ from pandas; the
        `interpolation` parameter is not supported yet.

        See Also
        --------
        pyspark.pandas.Series.quantile
        pyspark.pandas.DataFrame.quantile
        pyspark.sql.functions.percentile_approx

        Examples
        --------
        >>> df = ps.DataFrame([
        ...     ['a', 1], ['a', 2], ['a', 3],
        ...     ['b', 1], ['b', 3], ['b', 5]
        ... ], columns=['key', 'val'])

        Groupby one column and return the quantile of the remaining columns in
        each group.

        >>> df.groupby('key').quantile()
             val
        key
        a    2.0
        b    3.0
        """
        if is_list_like(q):
            raise NotImplementedError("q doesn't support for list like type for now")
        if not is_number(q):
            raise TypeError("must be real number, not %s" % type(q).__name__)
        if not 0 <= q <= 1:
            raise ValueError("'q' must be between 0 and 1. Got '%s' instead" % q)
        return self._reduce_for_stat_function(
            lambda col: F.percentile_approx(col.cast(DoubleType()), q, accuracy),
            accepted_spark_types=(NumericType, BooleanType),
            bool_to_numeric=True,
        )
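    # Editorial note (illustrative, not in the original source): ``accuracy`` is
    # passed straight through to ``F.percentile_approx`` above, so a call such as
    #
    #   df.groupby('key')['val'].quantile(q=0.25, accuracy=100)
    #
    # tolerates a relative error of roughly 1.0 / 100 in the approximated quantile,
    # trading precision for speed on large groups.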
[docs]defmin(self,numeric_only:Optional[bool]=False,min_count:int=-1)->FrameLike:""" Compute min of group values. .. versionadded:: 3.3.0 Parameters ---------- numeric_only : bool, default False Include only float, int, boolean columns. If None, will attempt to use everything, then use only numeric data. .. versionadded:: 3.4.0 min_count : bool, default -1 The required number of valid values to perform the operation. If fewer than min_count non-NA values are present the result will be NA. .. versionadded:: 3.4.0 See Also -------- pyspark.pandas.Series.groupby pyspark.pandas.DataFrame.groupby Examples -------- >>> df = ps.DataFrame({"A": [1, 2, 1, 2], "B": [True, False, False, True], ... "C": [3, 4, 3, 4], "D": ["a", "a", "b", "a"]}) >>> df.groupby("A").min().sort_index() B C D A 1 False 3 a 2 False 4 a Include only float, int, boolean columns when set numeric_only True. >>> df.groupby("A").min(numeric_only=True).sort_index() B C A 1 False 3 2 False 4 >>> df.groupby("D").min().sort_index() A B C D a 1 False 3 b 1 False 3 >>> df.groupby("D").min(min_count=3).sort_index() A B C D a 1.0 False 3.0 b NaN None NaN """ifnotisinstance(min_count,int):raiseTypeError("min_count must be integer")returnself._reduce_for_stat_function(F.min,accepted_spark_types=(NumericType,BooleanType)ifnumeric_onlyelseNone,min_count=min_count,)
# TODO: sync the doc.
    def std(self, ddof: int = 1) -> FrameLike:
        """
        Compute standard deviation of groups, excluding missing values.

        .. versionadded:: 3.3.0

        Parameters
        ----------
        ddof : int, default 1
            Delta Degrees of Freedom. The divisor used in calculations is N - ddof,
            where N represents the number of elements.

            .. versionchanged:: 3.4.0
               Supported including arbitrary integers.

        Examples
        --------
        >>> df = ps.DataFrame({"A": [1, 2, 1, 2], "B": [True, False, False, True],
        ...                    "C": [3, 4, 3, 4], "D": ["a", "b", "b", "a"]})

        >>> df.groupby("A").std()
                  B    C
        A
        1  0.707107  0.0
        2  0.707107  0.0

        See Also
        --------
        pyspark.pandas.Series.groupby
        pyspark.pandas.DataFrame.groupby
        """
        if not isinstance(ddof, int):
            raise TypeError("ddof must be integer")

        # Raise the TypeError when all aggregation columns are of unaccepted data types
        any_accepted = any(
            isinstance(_agg_col.spark.data_type, (NumericType, BooleanType))
            for _agg_col in self._agg_columns
        )
        if not any_accepted:
            raise TypeError(
                "Unaccepted data types of aggregation columns; numeric or bool expected."
            )

        def std(col: Column) -> Column:
            return SF.stddev(col, ddof)

        return self._reduce_for_stat_function(
            std,
            accepted_spark_types=(NumericType,),
            bool_to_numeric=True,
        )
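    # Editorial note (illustrative, not in the original source): with the default
    # ``ddof=1`` this is the sample standard deviation, and booleans are treated as
    # 0/1 (``bool_to_numeric=True``). In the doctest frame, group ``A == 1`` has
    # ``B = [True, False]`` -> [1, 0], giving sqrt(0.5) ~= 0.707107, while
    # ``C = [3, 3]`` gives 0.0.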
[docs]defsum(self,numeric_only:Optional[bool]=True,min_count:int=0)->FrameLike:""" Compute sum of group values .. versionadded:: 3.3.0 Parameters ---------- numeric_only : bool, default False Include only float, int, boolean columns. If None, will attempt to use everything, then use only numeric data. It takes no effect since only numeric columns can be support here. .. versionadded:: 3.4.0 min_count : int, default 0 The required number of valid values to perform the operation. If fewer than min_count non-NA values are present the result will be NA. .. versionadded:: 3.4.0 Examples -------- >>> df = ps.DataFrame({"A": [1, 2, 1, 2], "B": [True, False, False, True], ... "C": [3, 4, 3, 4], "D": ["a", "a", "b", "a"]}) >>> df.groupby("A").sum().sort_index() B C A 1 1 6 2 1 8 >>> df.groupby("D").sum().sort_index() A B C D a 5 2 11 b 1 0 3 >>> df.groupby("D").sum(min_count=3).sort_index() A B C D a 5.0 2.0 11.0 b NaN NaN NaN Notes ----- There is a behavior difference between pandas-on-Spark and pandas: * when there is a non-numeric aggregation column, it will be ignored even if `numeric_only` is False. See Also -------- pyspark.pandas.Series.groupby pyspark.pandas.DataFrame.groupby """warnings.warn("Default value of `numeric_only` will be changed to `False` ""instead of `True` in 4.0.0.",FutureWarning,)ifnumeric_onlyisnotNoneandnotisinstance(numeric_only,bool):raiseTypeError("numeric_only must be None or bool")ifnotisinstance(min_count,int):raiseTypeError("min_count must be integer")ifnumeric_onlyisnotNoneandnotnumeric_only:unsupported=[col.nameforcolinself._agg_columnsifnotisinstance(col.spark.data_type,(NumericType,BooleanType))]iflen(unsupported)>0:log_advice("GroupBy.sum() can only support numeric and bool columns even if"f"numeric_only=False, skip unsupported columns: {unsupported}")returnself._reduce_for_stat_function(F.sum,accepted_spark_types=(NumericType,BooleanType),bool_to_numeric=True,min_count=min_count,)
# TODO: sync the doc.
    def var(self, ddof: int = 1) -> FrameLike:
        """
        Compute variance of groups, excluding missing values.

        .. versionadded:: 3.3.0

        Parameters
        ----------
        ddof : int, default 1
            Delta Degrees of Freedom. The divisor used in calculations is N - ddof,
            where N represents the number of elements.

            .. versionchanged:: 3.4.0
               Supported including arbitrary integers.

        Examples
        --------
        >>> df = ps.DataFrame({"A": [1, 2, 1, 2], "B": [True, False, False, True],
        ...                    "C": [3, 4, 3, 4], "D": ["a", "b", "b", "a"]})

        >>> df.groupby("A").var()
             B    C
        A
        1  0.5  0.0
        2  0.5  0.0

        See Also
        --------
        pyspark.pandas.Series.groupby
        pyspark.pandas.DataFrame.groupby
        """
        if not isinstance(ddof, int):
            raise TypeError("ddof must be integer")

        def var(col: Column) -> Column:
            return SF.var(col, ddof)

        return self._reduce_for_stat_function(
            var,
            accepted_spark_types=(NumericType,),
            bool_to_numeric=True,
        )
    def skew(self) -> FrameLike:
        """
        Compute skewness of groups, excluding missing values.

        .. versionadded:: 3.4.0

        Examples
        --------
        >>> df = ps.DataFrame({"A": [1, 2, 1, 1], "B": [True, False, False, True],
        ...                    "C": [3, 4, 3, 4], "D": ["a", "b", "b", "a"]})

        >>> df.groupby("A").skew()
                  B         C
        A
        1 -1.732051  1.732051
        2       NaN       NaN

        See Also
        --------
        pyspark.pandas.Series.groupby
        pyspark.pandas.DataFrame.groupby
        """
        return self._reduce_for_stat_function(
            SF.skew,
            accepted_spark_types=(NumericType,),
            bool_to_numeric=True,
        )

    # TODO: 'axis', 'skipna', 'level' parameter should be implemented.
[docs]defmad(self)->FrameLike:""" Compute mean absolute deviation of groups, excluding missing values. .. versionadded:: 3.4.0 .. deprecated:: 3.4.0 Examples -------- >>> df = ps.DataFrame({"A": [1, 2, 1, 1], "B": [True, False, False, True], ... "C": [3, 4, 3, 4], "D": ["a", "b", "b", "a"]}) >>> df.groupby("A").mad() B C A 1 0.444444 0.444444 2 0.000000 0.000000 >>> df.B.groupby(df.A).mad() A 1 0.444444 2 0.000000 Name: B, dtype: float64 See Also -------- pyspark.pandas.Series.groupby pyspark.pandas.DataFrame.groupby """warnings.warn("The 'mad' method is deprecated and will be removed in a future version. ""To compute the same result, you may do `(group_df - group_df.mean()).abs().mean()`.",FutureWarning,)groupkey_names=[SPARK_INDEX_NAME_FORMAT(i)foriinrange(len(self._groupkeys))]internal,agg_columns,sdf=self._prepare_reduce(groupkey_names=groupkey_names,accepted_spark_types=(NumericType,BooleanType),bool_to_numeric=False,)psdf:DataFrame=DataFrame(internal)iflen(psdf._internal.column_labels)>0:window=Window.partitionBy(groupkey_names).rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)new_agg_scols={}new_stat_scols=[]foragg_columninagg_columns:# it is not able to directly use 'self._reduce_for_stat_function', due to# 'it is not allowed to use a window function inside an aggregate function'.# so we need to create temporary columns to compute the 'abs(x - avg(x))' here.agg_column_name=agg_column._internal.data_spark_column_names[0]new_agg_column_name=verify_temp_column_name(psdf._internal.spark_frame,"__tmp_agg_col_{}__".format(agg_column_name))casted_agg_scol=F.col(agg_column_name).cast("double")new_agg_scols[new_agg_column_name]=F.abs(casted_agg_scol-F.avg(casted_agg_scol).over(window))new_stat_scols.append(F.avg(F.col(new_agg_column_name)).alias(agg_column_name))sdf=(psdf._internal.spark_frame.withColumns(new_agg_scols).groupby(groupkey_names).agg(*new_stat_scols))else:sdf=sdf.select(*groupkey_names).distinct()internal=internal.copy(spark_frame=sdf,index_spark_columns=[scol_for(sdf,col)forcolingroupkey_names],data_spark_columns=[scol_for(sdf,col)forcolininternal.data_spark_column_names],data_fields=None,)returnself._prepare_return(DataFrame(internal))
[docs]defsem(self,ddof:int=1)->FrameLike:""" Compute standard error of the mean of groups, excluding missing values. .. versionadded:: 3.4.0 Parameters ---------- ddof : int, default 1 Delta Degrees of Freedom. The divisor used in calculations is N - ddof, where N represents the number of elements. Examples -------- >>> df = ps.DataFrame({"A": [1, 2, 1, 1], "B": [True, False, False, True], ... "C": [3, None, 3, 4], "D": ["a", "b", "b", "a"]}) >>> df.groupby("A").sem() B C A 1 0.333333 0.333333 2 NaN NaN >>> df.groupby("D").sem(ddof=1) A B C D a 0.0 0.0 0.5 b 0.5 0.0 NaN >>> df.B.groupby(df.A).sem() A 1 0.333333 2 NaN Name: B, dtype: float64 See Also -------- pyspark.pandas.Series.sem pyspark.pandas.DataFrame.sem """ifnotisinstance(ddof,int):raiseTypeError("ddof must be integer")# Raise the TypeError when all aggregation columns are of unaccepted data typesany_accepted=any(isinstance(_agg_col.spark.data_type,(NumericType,BooleanType))for_agg_colinself._agg_columns)ifnotany_accepted:raiseTypeError("Unaccepted data types of aggregation columns; numeric or bool expected.")defsem(col:Column)->Column:returnSF.stddev(col,ddof)/F.sqrt(F.count(col))returnself._reduce_for_stat_function(sem,accepted_spark_types=(NumericType,BooleanType),bool_to_numeric=True,)
# TODO: 1, 'n' accepts list and slice; 2, implement 'dropna' parameter
[docs]defnth(self,n:int)->FrameLike:""" Take the nth row from each group. .. versionadded:: 3.4.0 Parameters ---------- n : int A single nth value for the row Returns ------- Series or DataFrame Notes ----- There is a behavior difference between pandas-on-Spark and pandas: * when there is no aggregation column, and `n` not equal to 0 or -1, the returned empty dataframe may have an index with different lenght `__len__`. Examples -------- >>> df = ps.DataFrame({'A': [1, 1, 2, 1, 2], ... 'B': [np.nan, 2, 3, 4, 5]}, columns=['A', 'B']) >>> g = df.groupby('A') >>> g.nth(0) B A 1 NaN 2 3.0 >>> g.nth(1) B A 1 2.0 2 5.0 >>> g.nth(-1) B A 1 4.0 2 5.0 See Also -------- pyspark.pandas.Series.groupby pyspark.pandas.DataFrame.groupby """ifisinstance(n,slice)oris_list_like(n):raiseNotImplementedError("n doesn't support slice or list for now")ifnotisinstance(n,int):raiseTypeError("Invalid index %s"%type(n).__name__)groupkey_names=[SPARK_INDEX_NAME_FORMAT(i)foriinrange(len(self._groupkeys))]internal,agg_columns,sdf=self._prepare_reduce(groupkey_names=groupkey_names,accepted_spark_types=None,bool_to_numeric=False,)psdf:DataFrame=DataFrame(internal)iflen(psdf._internal.column_labels)>0:window1=Window.partitionBy(*groupkey_names).orderBy(NATURAL_ORDER_COLUMN_NAME)tmp_row_number_col=verify_temp_column_name(sdf,"__tmp_row_number_col__")ifn>=0:sdf=(psdf._internal.spark_frame.withColumn(tmp_row_number_col,F.row_number().over(window1)).where(F.col(tmp_row_number_col)==n+1).drop(tmp_row_number_col))else:window2=Window.partitionBy(*groupkey_names).rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)tmp_group_size_col=verify_temp_column_name(sdf,"__tmp_group_size_col__")sdf=(psdf._internal.spark_frame.withColumn(tmp_group_size_col,F.count(F.lit(0)).over(window2)).withColumn(tmp_row_number_col,F.row_number().over(window1)).where(F.col(tmp_row_number_col)==F.col(tmp_group_size_col)+1+n).drop(tmp_group_size_col,tmp_row_number_col))else:sdf=sdf.select(*groupkey_names).distinct()internal=internal.copy(spark_frame=sdf,index_spark_columns=[scol_for(sdf,col)forcolingroupkey_names],data_spark_columns=[scol_for(sdf,col)forcolininternal.data_spark_column_names],data_fields=None,)returnself._prepare_return(DataFrame(internal))
[docs]defprod(self,numeric_only:Optional[bool]=True,min_count:int=0)->FrameLike:""" Compute prod of groups. .. versionadded:: 3.4.0 Parameters ---------- numeric_only : bool, default False Include only float, int, boolean columns. If None, will attempt to use everything, then use only numeric data. min_count : int, default 0 The required number of valid values to perform the operation. If fewer than min_count non-NA values are present the result will be NA. Returns ------- Series or DataFrame Computed prod of values within each group. See Also -------- pyspark.pandas.Series.groupby pyspark.pandas.DataFrame.groupby Examples -------- >>> import numpy as np >>> df = ps.DataFrame( ... { ... "A": [1, 1, 2, 1, 2], ... "B": [np.nan, 2, 3, 4, 5], ... "C": [1, 2, 1, 1, 2], ... "D": [True, False, True, False, True], ... } ... ) Groupby one column and return the prod of the remaining columns in each group. >>> df.groupby('A').prod().sort_index() B C D A 1 8.0 2 0 2 15.0 2 1 >>> df.groupby('A').prod(min_count=3).sort_index() B C D A 1 NaN 2.0 0.0 2 NaN NaN NaN """ifnotisinstance(min_count,int):raiseTypeError("min_count must be integer")warnings.warn("Default value of `numeric_only` will be changed to `False` ""instead of `True` in 4.0.0.",FutureWarning,)self._validate_agg_columns(numeric_only=numeric_only,function_name="prod")returnself._reduce_for_stat_function(lambdacol:SF.product(col,True),accepted_spark_types=(NumericType,BooleanType),bool_to_numeric=True,min_count=min_count,)
[docs]defall(self,skipna:bool=True)->FrameLike:""" Returns True if all values in the group are truthful, else False. Parameters ---------- skipna : bool, default True Flag to ignore NA(nan/null) values during truth testing. See Also -------- pyspark.pandas.Series.groupby pyspark.pandas.DataFrame.groupby Examples -------- >>> df = ps.DataFrame({'A': [1, 1, 2, 2, 3, 3, 4, 4, 5, 5], ... 'B': [True, True, True, False, False, ... False, None, True, None, False]}, ... columns=['A', 'B']) >>> df A B 0 1 True 1 1 True 2 2 True 3 2 False 4 3 False 5 3 False 6 4 None 7 4 True 8 5 None 9 5 False >>> df.groupby('A').all().sort_index() # doctest: +NORMALIZE_WHITESPACE B A 1 True 2 False 3 False 4 True 5 False >>> df.groupby('A').all(skipna=False).sort_index() # doctest: +NORMALIZE_WHITESPACE B A 1 True 2 False 3 False 4 False 5 False """groupkey_names=[SPARK_INDEX_NAME_FORMAT(i)foriinrange(len(self._groupkeys))]internal,_,sdf=self._prepare_reduce(groupkey_names)psdf:DataFrame=DataFrame(internal)defsfun(scol:Column,scol_type:DataType)->Column:ifisinstance(scol_type,NumericType)orskipna:# np.nan takes no effect to the result; None takes no effect if `skipna`all_col=F.min(F.coalesce(scol.cast("boolean"),F.lit(True)))else:# Take None as False when not `skipna`all_col=F.min(F.when(scol.isNull(),F.lit(False)).otherwise(scol.cast("boolean")))returnall_coliflen(psdf._internal.column_labels)>0:stat_exprs=[]forlabelinpsdf._internal.column_labels:psser=psdf._psser_for(label)stat_exprs.append(sfun(psser._dtype_op.nan_to_null(psser).spark.column,psser.spark.data_type).alias(psser._internal.data_spark_column_names[0]))sdf=sdf.groupby(*groupkey_names).agg(*stat_exprs)else:sdf=sdf.select(*groupkey_names).distinct()internal=internal.copy(spark_frame=sdf,index_spark_columns=[scol_for(sdf,col)forcolingroupkey_names],data_spark_columns=[scol_for(sdf,col)forcolininternal.data_spark_column_names],data_fields=None,)returnself._prepare_return(DataFrame(internal))
# TODO: skipna should be implemented.
[docs]defany(self)->FrameLike:""" Returns True if any value in the group is truthful, else False. See Also -------- pyspark.pandas.Series.groupby pyspark.pandas.DataFrame.groupby Examples -------- >>> df = ps.DataFrame({'A': [1, 1, 2, 2, 3, 3, 4, 4, 5, 5], ... 'B': [True, True, True, False, False, ... False, None, True, None, False]}, ... columns=['A', 'B']) >>> df A B 0 1 True 1 1 True 2 2 True 3 2 False 4 3 False 5 3 False 6 4 None 7 4 True 8 5 None 9 5 False >>> df.groupby('A').any().sort_index() # doctest: +NORMALIZE_WHITESPACE B A 1 True 2 True 3 False 4 True 5 False """returnself._reduce_for_stat_function(lambdacol:F.max(F.coalesce(col.cast("boolean"),F.lit(False))))
    # TODO: groupby with multiple columns should be implemented.
[docs]defdiff(self,periods:int=1)->FrameLike:""" First discrete difference of element. Calculates the difference of a DataFrame element compared with another element in the DataFrame group (default is the element in the same column of the previous row). Parameters ---------- periods : int, default 1 Periods to shift for calculating difference, accepts negative values. Returns ------- diffed : DataFrame or Series See Also -------- pyspark.pandas.Series.groupby pyspark.pandas.DataFrame.groupby Examples -------- >>> df = ps.DataFrame({'a': [1, 2, 3, 4, 5, 6], ... 'b': [1, 1, 2, 3, 5, 8], ... 'c': [1, 4, 9, 16, 25, 36]}, columns=['a', 'b', 'c']) >>> df a b c 0 1 1 1 1 2 1 4 2 3 2 9 3 4 3 16 4 5 5 25 5 6 8 36 >>> df.groupby(['b']).diff().sort_index() a c 0 NaN NaN 1 1.0 3.0 2 NaN NaN 3 NaN NaN 4 NaN NaN 5 NaN NaN Difference with previous column in a group. >>> df.groupby(['b'])['a'].diff().sort_index() 0 NaN 1 1.0 2 NaN 3 NaN 4 NaN 5 NaN Name: a, dtype: float64 """returnself._apply_series_op(lambdasg:sg._psser._diff(periods,part_cols=sg._groupkeys_scols),should_resolve=True)
    def cumcount(self, ascending: bool = True) -> Series:
        """
        Number each item in each group from 0 to the length of that group - 1.

        Essentially this is equivalent to

        .. code-block:: python

            self.apply(lambda x: pd.Series(np.arange(len(x)), x.index))

        Parameters
        ----------
        ascending : bool, default True
            If False, number in reverse, from length of group - 1 to 0.

        Returns
        -------
        Series
            Sequence number of each element within each group.

        Examples
        --------
        >>> df = ps.DataFrame([['a'], ['a'], ['a'], ['b'], ['b'], ['a']],
        ...                   columns=['A'])
        >>> df
           A
        0  a
        1  a
        2  a
        3  b
        4  b
        5  a
        >>> df.groupby('A').cumcount().sort_index()
        0    0
        1    1
        2    2
        3    0
        4    1
        5    3
        dtype: int64
        >>> df.groupby('A').cumcount(ascending=False).sort_index()
        0    3
        1    2
        2    1
        3    1
        4    0
        5    0
        dtype: int64
        """
        ret = (
            self._groupkeys[0]
            .rename()
            .spark.transform(lambda _: F.lit(0))
            ._cum(F.count, True, part_cols=self._groupkeys_scols, ascending=ascending)
            - 1
        )
        internal = ret._internal.resolved_copy
        return first_series(DataFrame(internal))
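    # Editorial note (illustrative, not in the original source): for
    # ``ascending=True`` the cumulative count above behaves roughly like the raw
    # Spark window expression
    #
    #   F.row_number().over(
    #       Window.partitionBy(*group_columns).orderBy(NATURAL_ORDER_COLUMN_NAME)
    #   ) - 1
    #
    # i.e. a zero-based position of each row within its group, in the frame's
    # natural row order; ``ascending=False`` numbers from the other end.
    # ``group_columns`` here is a hypothetical list of the group-key columns.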
[docs]defcummax(self)->FrameLike:""" Cumulative max for each group. Returns ------- Series or DataFrame See Also -------- Series.cummax DataFrame.cummax Examples -------- >>> df = ps.DataFrame( ... [[1, None, 4], [1, 0.1, 3], [1, 20.0, 2], [4, 10.0, 1]], ... columns=list('ABC')) >>> df A B C 0 1 NaN 4 1 1 0.1 3 2 1 20.0 2 3 4 10.0 1 By default, iterates over rows and finds the sum in each column. >>> df.groupby("A").cummax().sort_index() B C 0 NaN 4 1 0.1 4 2 20.0 4 3 10.0 1 It works as below in Series. >>> df.C.groupby(df.A).cummax().sort_index() 0 4 1 4 2 4 3 1 Name: C, dtype: int64 """returnself._apply_series_op(lambdasg:sg._psser._cum(F.max,True,part_cols=sg._groupkeys_scols),should_resolve=True,numeric_only=True,)
[docs]defcummin(self)->FrameLike:""" Cumulative min for each group. Returns ------- Series or DataFrame See Also -------- Series.cummin DataFrame.cummin Examples -------- >>> df = ps.DataFrame( ... [[1, None, 4], [1, 0.1, 3], [1, 20.0, 2], [4, 10.0, 1]], ... columns=list('ABC')) >>> df A B C 0 1 NaN 4 1 1 0.1 3 2 1 20.0 2 3 4 10.0 1 By default, iterates over rows and finds the sum in each column. >>> df.groupby("A").cummin().sort_index() B C 0 NaN 4 1 0.1 3 2 0.1 2 3 10.0 1 It works as below in Series. >>> df.B.groupby(df.A).cummin().sort_index() 0 NaN 1 0.1 2 0.1 3 10.0 Name: B, dtype: float64 """returnself._apply_series_op(lambdasg:sg._psser._cum(F.min,True,part_cols=sg._groupkeys_scols),should_resolve=True,numeric_only=True,)
[docs]defcumprod(self)->FrameLike:""" Cumulative product for each group. Returns ------- Series or DataFrame See Also -------- Series.cumprod DataFrame.cumprod Examples -------- >>> df = ps.DataFrame( ... [[1, None, 4], [1, 0.1, 3], [1, 20.0, 2], [4, 10.0, 1]], ... columns=list('ABC')) >>> df A B C 0 1 NaN 4 1 1 0.1 3 2 1 20.0 2 3 4 10.0 1 By default, iterates over rows and finds the sum in each column. >>> df.groupby("A").cumprod().sort_index() B C 0 NaN 4 1 0.1 12 2 2.0 24 3 10.0 1 It works as below in Series. >>> df.B.groupby(df.A).cumprod().sort_index() 0 NaN 1 0.1 2 2.0 3 10.0 Name: B, dtype: float64 """returnself._apply_series_op(lambdasg:sg._psser._cumprod(True,part_cols=sg._groupkeys_scols),should_resolve=True,numeric_only=True,)
[docs]defcumsum(self)->FrameLike:""" Cumulative sum for each group. Returns ------- Series or DataFrame See Also -------- Series.cumsum DataFrame.cumsum Examples -------- >>> df = ps.DataFrame( ... [[1, None, 4], [1, 0.1, 3], [1, 20.0, 2], [4, 10.0, 1]], ... columns=list('ABC')) >>> df A B C 0 1 NaN 4 1 1 0.1 3 2 1 20.0 2 3 4 10.0 1 By default, iterates over rows and finds the sum in each column. >>> df.groupby("A").cumsum().sort_index() B C 0 NaN 4 1 0.1 7 2 20.1 9 3 10.0 1 It works as below in Series. >>> df.B.groupby(df.A).cumsum().sort_index() 0 NaN 1 0.1 2 20.1 3 10.0 Name: B, dtype: float64 """returnself._apply_series_op(lambdasg:sg._psser._cumsum(True,part_cols=sg._groupkeys_scols),should_resolve=True,numeric_only=True,)
[docs]defapply(self,func:Callable,*args:Any,**kwargs:Any)->Union[DataFrame,Series]:""" Apply function `func` group-wise and combine the results together. The function passed to `apply` must take a DataFrame as its first argument and return a DataFrame. `apply` will then take care of combining the results back together into a single dataframe. `apply` is therefore a highly flexible grouping method. While `apply` is a very flexible method, its downside is that using it can be quite a bit slower than using more specific methods like `agg` or `transform`. pandas-on-Spark offers a wide range of method that will be much faster than using `apply` for their specific purposes, so try to use them before reaching for `apply`. .. note:: this API executes the function once to infer the type which is potentially expensive, for instance, when the dataset is created after aggregations or sorting. To avoid this, specify return type in ``func``, for instance, as below: >>> def pandas_div(x) -> ps.DataFrame[int, [float, float]]: ... return x[['B', 'C']] / x[['B', 'C']] If the return type is specified, the output column names become `c0, c1, c2 ... cn`. These names are positionally mapped to the returned DataFrame in ``func``. To specify the column names, you can assign them in a NumPy compound type style as below: >>> def pandas_div(x) -> ps.DataFrame[("index", int), [("a", float), ("b", float)]]: ... return x[['B', 'C']] / x[['B', 'C']] >>> pdf = pd.DataFrame({'B': [1.], 'C': [3.]}) >>> def plus_one(x) -> ps.DataFrame[ ... (pdf.index.name, pdf.index.dtype), zip(pdf.columns, pdf.dtypes)]: ... return x[['B', 'C']] / x[['B', 'C']] .. note:: the dataframe within ``func`` is actually a pandas dataframe. Therefore, any pandas API within this function is allowed. Parameters ---------- func : callable A callable that takes a DataFrame as its first argument, and returns a dataframe. *args Positional arguments to pass to func. **kwargs Keyword arguments to pass to func. Returns ------- applied : DataFrame or Series See Also -------- aggregate : Apply aggregate function to the GroupBy object. DataFrame.apply : Apply a function to a DataFrame. Series.apply : Apply a function to a Series. Examples -------- >>> df = ps.DataFrame({'A': 'a a b'.split(), ... 'B': [1, 2, 3], ... 'C': [4, 6, 5]}, columns=['A', 'B', 'C']) >>> g = df.groupby('A') Notice that ``g`` has two groups, ``a`` and ``b``. Calling `apply` in various ways, we can get different grouping results: Below the functions passed to `apply` takes a DataFrame as its argument and returns a DataFrame. `apply` combines the result for each group together into a new DataFrame: >>> def plus_min(x): ... return x + x.min() >>> g.apply(plus_min).sort_index() # doctest: +SKIP A B C 0 aa 2 8 1 aa 3 10 2 bb 6 10 >>> g.apply(sum).sort_index() # doctest: +NORMALIZE_WHITESPACE A B C A a aa 3 10 b b 3 5 >>> g.apply(len).sort_index() # doctest: +NORMALIZE_WHITESPACE A a 2 b 1 dtype: int64 You can specify the type hint and prevent schema inference for better performance. >>> def pandas_div(x) -> ps.DataFrame[int, [float, float]]: ... return x[['B', 'C']] / x[['B', 'C']] >>> g.apply(pandas_div).sort_index() # doctest: +SKIP c0 c1 0 1.0 1.0 1 1.0 1.0 2 1.0 1.0 >>> def pandas_div(x) -> ps.DataFrame[("index", int), [("f1", float), ("f2", float)]]: ... return x[['B', 'C']] / x[['B', 'C']] >>> g.apply(pandas_div).sort_index() # doctest: +SKIP f1 f2 index 0 1.0 1.0 1 1.0 1.0 2 1.0 1.0 In case of Series, it works as below. >>> def plus_max(x) -> ps.Series[int]: ... 
return x + x.max() >>> df.B.groupby(df.A).apply(plus_max).sort_index() # doctest: +SKIP 0 6 1 3 2 4 Name: B, dtype: int64 >>> def plus_min(x): ... return x + x.min() >>> df.B.groupby(df.A).apply(plus_min).sort_index() # doctest: +SKIP 0 2 1 3 2 6 Name: B, dtype: int64 You can also return a scalar value as an aggregated value of the group: >>> def plus_length(x) -> int: ... return len(x) >>> df.B.groupby(df.A).apply(plus_length).sort_index() # doctest: +SKIP 0 1 1 2 Name: B, dtype: int64 The extra arguments to the function can be passed as below. >>> def calculation(x, y, z) -> int: ... return len(x) + y * z >>> df.B.groupby(df.A).apply(calculation, 5, z=10).sort_index() # doctest: +SKIP 0 51 1 52 Name: B, dtype: int64 """ifnotcallable(func):raiseTypeError("%s object is not callable"%type(func).__name__)spec=inspect.getfullargspec(func)return_sig=spec.annotations.get("return",None)should_infer_schema=return_sigisNoneshould_retain_index=should_infer_schemais_series_groupby=isinstance(self,SeriesGroupBy)psdf=self._psdfifself._agg_columns_selected:agg_columns=self._agg_columnselse:agg_columns=[psdf._psser_for(label)forlabelinpsdf._internal.column_labelsiflabelnotinself._column_labels_to_exclude]psdf,groupkey_labels,groupkey_names=GroupBy._prepare_group_map_apply(psdf,self._groupkeys,agg_columns)ifis_series_groupby:name=psdf.columns[-1]pandas_apply=_builtin_table.get(func,func)else:f=_builtin_table.get(func,func)defpandas_apply(pdf:pd.DataFrame,*a:Any,**k:Any)->Any:returnf(pdf.drop(groupkey_names,axis=1),*a,**k)should_return_series=Falseifshould_infer_schema:# Here we execute with the first 1000 to get the return type.log_advice("If the type hints is not specified for `groupby.apply`, ""it is expensive to infer the data type internally.")limit=get_option("compute.shortcut_limit")# Ensure sampling rows >= 2 to make sure apply's infer schema is accurate# See related: https://github.com/pandas-dev/pandas/issues/46893sample_limit=limit+1iflimitelse2pdf=psdf.head(sample_limit)._to_internal_pandas()groupkeys=[pdf[groupkey_name].rename(psser.name)forgroupkey_name,psserinzip(groupkey_names,self._groupkeys)]grouped=pdf.groupby(groupkeys)ifis_series_groupby:pser_or_pdf=grouped[name].apply(pandas_apply,*args,**kwargs)else:pser_or_pdf=grouped.apply(pandas_apply,*args,**kwargs)psser_or_psdf=ps.from_pandas(pser_or_pdf.infer_objects())iflen(pdf)<=limit:ifisinstance(psser_or_psdf,ps.Series)andis_series_groupby:psser_or_psdf=psser_or_psdf.rename(cast(SeriesGroupBy,self)._psser.name)returncast(Union[Series,DataFrame],psser_or_psdf)iflen(grouped)<=1:withwarnings.catch_warnings():warnings.simplefilter("always")warnings.warn("The amount of data for return type inference might not be large enough. ""Consider increasing an option `compute.shortcut_limit`.")ifisinstance(psser_or_psdf,Series):should_return_series=Truepsdf_from_pandas=psser_or_psdf._psdfelse:psdf_from_pandas=cast(DataFrame,psser_or_psdf)index_fields=[field.normalize_spark_type()forfieldinpsdf_from_pandas._internal.index_fields]data_fields=[field.normalize_spark_type()forfieldinpsdf_from_pandas._internal.data_fields]return_schema=StructType([field.struct_fieldforfieldinindex_fields+data_fields])else:return_type=infer_return_type(func)ifnotis_series_groupbyandisinstance(return_type,SeriesType):raiseTypeError("Series as a return type hint at frame groupby is not supported ""currently; however got [%s]. 
Use DataFrame type hint instead."%return_sig)ifisinstance(return_type,DataFrameType):data_fields=return_type.data_fieldsreturn_schema=return_type.spark_typeindex_fields=return_type.index_fieldsshould_retain_index=len(index_fields)>0psdf_from_pandas=Noneelse:should_return_series=Truedtype=cast(Union[SeriesType,ScalarType],return_type).dtypespark_type=cast(Union[SeriesType,ScalarType],return_type).spark_typeifis_series_groupby:data_fields=[InternalField(dtype=dtype,struct_field=StructField(name=name,dataType=spark_type))]else:data_fields=[InternalField(dtype=dtype,struct_field=StructField(name=SPARK_DEFAULT_SERIES_NAME,dataType=spark_type),)]return_schema=StructType([field.struct_fieldforfieldindata_fields])defpandas_groupby_apply(pdf:pd.DataFrame)->pd.DataFrame:ifis_series_groupby:pdf_or_ser=pdf.groupby(groupkey_names)[name].apply(pandas_apply,*args,**kwargs)else:pdf_or_ser=pdf.groupby(groupkey_names).apply(pandas_apply,*args,**kwargs)ifshould_return_seriesandisinstance(pdf_or_ser,pd.DataFrame):pdf_or_ser=pdf_or_ser.stack()ifnotisinstance(pdf_or_ser,pd.DataFrame):returnpd.DataFrame(pdf_or_ser)else:returnpdf_or_sersdf=GroupBy._spark_group_map_apply(psdf,pandas_groupby_apply,[psdf._internal.spark_column_for(label)forlabelingroupkey_labels],return_schema,retain_index=should_retain_index,)ifshould_retain_index:# If schema is inferred, we can restore indexes too.ifpsdf_from_pandasisnotNone:internal=psdf_from_pandas._internal.with_new_sdf(spark_frame=sdf,index_fields=index_fields,data_fields=data_fields)else:index_names:Optional[List[Optional[Tuple[Any,...]]]]=Noneindex_spark_columns=[scol_for(sdf,index_field.struct_field.name)forindex_fieldinindex_fields]ifnotany([SPARK_INDEX_NAME_PATTERN.match(index_field.struct_field.name)forindex_fieldinindex_fields]):index_names=[(index_field.struct_field.name,)forindex_fieldinindex_fields]internal=InternalFrame(spark_frame=sdf,index_names=index_names,index_spark_columns=index_spark_columns,index_fields=index_fields,data_fields=data_fields,)else:# Otherwise, it loses index.internal=InternalFrame(spark_frame=sdf,index_spark_columns=None,data_fields=data_fields)ifshould_return_series:psser=first_series(DataFrame(internal))ifis_series_groupby:psser=psser.rename(cast(SeriesGroupBy,self)._psser.name)returnpsserelse:returnDataFrame(internal)
# TODO: implement 'dropna' parameter
[docs]deffilter(self,func:Callable[[FrameLike],FrameLike])->FrameLike:""" Return a copy of a DataFrame excluding elements from groups that do not satisfy the boolean criterion specified by func. Parameters ---------- f : function Function to apply to each subframe. Should return True or False. dropna : Drop groups that do not pass the filter. True by default; if False, groups that evaluate False are filled with NaNs. Returns ------- filtered : DataFrame or Series Notes ----- Each subframe is endowed the attribute 'name' in case you need to know which group you are working on. Examples -------- >>> df = ps.DataFrame({'A' : ['foo', 'bar', 'foo', 'bar', ... 'foo', 'bar'], ... 'B' : [1, 2, 3, 4, 5, 6], ... 'C' : [2.0, 5., 8., 1., 2., 9.]}, columns=['A', 'B', 'C']) >>> grouped = df.groupby('A') >>> grouped.filter(lambda x: x['B'].mean() > 3.) A B C 1 bar 2 5.0 3 bar 4 1.0 5 bar 6 9.0 >>> df.B.groupby(df.A).filter(lambda x: x.mean() > 3.) 1 2 3 4 5 6 Name: B, dtype: int64 """ifnotcallable(func):raiseTypeError("%s object is not callable"%type(func).__name__)is_series_groupby=isinstance(self,SeriesGroupBy)psdf=self._psdfifself._agg_columns_selected:agg_columns=self._agg_columnselse:agg_columns=[psdf._psser_for(label)forlabelinpsdf._internal.column_labelsiflabelnotinself._column_labels_to_exclude]data_schema=(psdf[agg_columns]._internal.resolved_copy.spark_frame.drop(*HIDDEN_COLUMNS).schema)psdf,groupkey_labels,groupkey_names=GroupBy._prepare_group_map_apply(psdf,self._groupkeys,agg_columns)ifis_series_groupby:defpandas_filter(pdf:pd.DataFrame)->pd.DataFrame:returnpd.DataFrame(pdf.groupby(groupkey_names)[pdf.columns[-1]].filter(func))else:f=_builtin_table.get(func,func)defwrapped_func(pdf:pd.DataFrame)->pd.DataFrame:returnf(pdf.drop(groupkey_names,axis=1))defpandas_filter(pdf:pd.DataFrame)->pd.DataFrame:returnpdf.groupby(groupkey_names).filter(wrapped_func).drop(groupkey_names,axis=1)sdf=GroupBy._spark_group_map_apply(psdf,pandas_filter,[psdf._internal.spark_column_for(label)forlabelingroupkey_labels],data_schema,retain_index=True,)psdf=DataFrame(self._psdf[agg_columns]._internal.with_new_sdf(sdf))ifis_series_groupby:returncast(FrameLike,first_series(psdf))else:returncast(FrameLike,psdf)
@staticmethoddef_prepare_group_map_apply(psdf:DataFrame,groupkeys:List[Series],agg_columns:List[Series])->Tuple[DataFrame,List[Label],List[str]]:groupkey_labels:List[Label]=[verify_temp_column_name(psdf,"__groupkey_{}__".format(i))foriinrange(len(groupkeys))]psdf=psdf[[s.rename(label)fors,labelinzip(groupkeys,groupkey_labels)]+agg_columns]groupkey_names=[labeliflen(label)>1elselabel[0]forlabelingroupkey_labels]returnDataFrame(psdf._internal.resolved_copy),groupkey_labels,groupkey_names@staticmethoddef_spark_group_map_apply(psdf:DataFrame,func:Callable[[pd.DataFrame],pd.DataFrame],groupkeys_scols:List[Column],return_schema:StructType,retain_index:bool,)->SparkDataFrame:output_func=GroupBy._make_pandas_df_builder_func(psdf,func,return_schema,retain_index)sdf=psdf._internal.spark_frame.drop(*HIDDEN_COLUMNS)returnsdf.groupby(*groupkeys_scols).applyInPandas(output_func,return_schema)@staticmethoddef_make_pandas_df_builder_func(psdf:DataFrame,func:Callable[[pd.DataFrame],pd.DataFrame],return_schema:StructType,retain_index:bool,)->Callable[[pd.DataFrame],pd.DataFrame]:""" Creates a function that can be used inside the pandas UDF. This function can construct the same pandas DataFrame as if the pandas-on-Spark DataFrame is collected to driver side. The index, column labels, etc. are re-constructed within the function. """frompyspark.sql.utilsimportis_timestamp_ntz_preferredarguments_for_restore_index=psdf._internal.arguments_for_restore_indexprefer_timestamp_ntz=is_timestamp_ntz_preferred()defrename_output(pdf:pd.DataFrame)->pd.DataFrame:pdf=InternalFrame.restore_index(pdf.copy(),**arguments_for_restore_index)pdf=func(pdf)# If schema should be inferred, we don't restore the index. pandas seems to restore# the index in some cases.# When Spark output type is specified, without executing it, we don't know# if we should restore the index or not. For instance, see the example in# https://github.com/databricks/koalas/issues/628.pdf,_,_,_,_=InternalFrame.prepare_pandas_frame(pdf,retain_index=retain_index,prefer_timestamp_ntz=prefer_timestamp_ntz)# Just positionally map the column names to given schema's.pdf.columns=return_schema.namesreturnpdfreturnrename_output
[docs]defrank(self,method:str="average",ascending:bool=True)->FrameLike:""" Provide the rank of values within each group. Parameters ---------- method : {'average', 'min', 'max', 'first', 'dense'}, default 'average' * average: average rank of group * min: lowest rank in group * max: highest rank in group * first: ranks assigned in order they appear in the array * dense: like 'min', but rank always increases by 1 between groups ascending : boolean, default True False for ranks by high (1) to low (N) Returns ------- DataFrame with ranking of values within each group Examples -------- >>> df = ps.DataFrame({ ... 'a': [1, 1, 1, 2, 2, 2, 3, 3, 3], ... 'b': [1, 2, 2, 2, 3, 3, 3, 4, 4]}, columns=['a', 'b']) >>> df a b 0 1 1 1 1 2 2 1 2 3 2 2 4 2 3 5 2 3 6 3 3 7 3 4 8 3 4 >>> df.groupby("a").rank().sort_index() b 0 1.0 1 2.5 2 2.5 3 1.0 4 2.5 5 2.5 6 1.0 7 2.5 8 2.5 >>> df.b.groupby(df.a).rank(method='max').sort_index() 0 1.0 1 3.0 2 3.0 3 1.0 4 3.0 5 3.0 6 1.0 7 3.0 8 3.0 Name: b, dtype: float64 """returnself._apply_series_op(lambdasg:sg._psser._rank(method,ascending,part_cols=sg._groupkeys_scols),should_resolve=True,)
# TODO: add axis parameter
[docs]defidxmax(self,skipna:bool=True)->FrameLike:""" Return index of first occurrence of maximum over requested axis in group. NA/null values are excluded. Parameters ---------- skipna : boolean, default True Exclude NA/null values. If an entire row/column is NA, the result will be NA. See Also -------- Series.idxmax DataFrame.idxmax pyspark.pandas.Series.groupby pyspark.pandas.DataFrame.groupby Examples -------- >>> df = ps.DataFrame({'a': [1, 1, 2, 2, 3], ... 'b': [1, 2, 3, 4, 5], ... 'c': [5, 4, 3, 2, 1]}, columns=['a', 'b', 'c']) >>> df.groupby(['a'])['b'].idxmax().sort_index() # doctest: +NORMALIZE_WHITESPACE a 1 1 2 3 3 4 Name: b, dtype: int64 >>> df.groupby(['a']).idxmax().sort_index() # doctest: +NORMALIZE_WHITESPACE b c a 1 1 0 2 3 2 3 4 4 """ifself._psdf._internal.index_level!=1:raiseValueError("idxmax only support one-level index now")groupkey_names=["__groupkey_{}__".format(i)foriinrange(len(self._groupkeys))]sdf=self._psdf._internal.spark_framefors,nameinzip(self._groupkeys,groupkey_names):sdf=sdf.withColumn(name,s.spark.column)index=self._psdf._internal.index_spark_column_names[0]stat_exprs=[]forpsser,scolinzip(self._agg_columns,self._agg_columns_scols):name=psser._internal.data_spark_column_names[0]ifskipna:order_column=scol.desc_nulls_last()else:order_column=scol.desc_nulls_first()window=Window.partitionBy(*groupkey_names).orderBy(order_column,NATURAL_ORDER_COLUMN_NAME)sdf=sdf.withColumn(name,F.when(F.row_number().over(window)==1,scol_for(sdf,index)).otherwise(None))stat_exprs.append(F.max(scol_for(sdf,name)).alias(name))sdf=sdf.groupby(*groupkey_names).agg(*stat_exprs)internal=InternalFrame(spark_frame=sdf,index_spark_columns=[scol_for(sdf,col)forcolingroupkey_names],index_names=[psser._column_labelforpsserinself._groupkeys],index_fields=[psser._internal.data_fields[0].copy(name=name)forpsser,nameinzip(self._groupkeys,groupkey_names)],column_labels=[psser._column_labelforpsserinself._agg_columns],data_spark_columns=[scol_for(sdf,psser._internal.data_spark_column_names[0])forpsserinself._agg_columns],)returnself._handle_output(DataFrame(internal))
# TODO: add axis parameter
[docs]defidxmin(self,skipna:bool=True)->FrameLike:""" Return index of first occurrence of minimum over requested axis in group. NA/null values are excluded. Parameters ---------- skipna : boolean, default True Exclude NA/null values. If an entire row/column is NA, the result will be NA. See Also -------- Series.idxmin DataFrame.idxmin pyspark.pandas.Series.groupby pyspark.pandas.DataFrame.groupby Examples -------- >>> df = ps.DataFrame({'a': [1, 1, 2, 2, 3], ... 'b': [1, 2, 3, 4, 5], ... 'c': [5, 4, 3, 2, 1]}, columns=['a', 'b', 'c']) >>> df.groupby(['a'])['b'].idxmin().sort_index() # doctest: +NORMALIZE_WHITESPACE a 1 0 2 2 3 4 Name: b, dtype: int64 >>> df.groupby(['a']).idxmin().sort_index() # doctest: +NORMALIZE_WHITESPACE b c a 1 0 1 2 2 3 3 4 4 """ifself._psdf._internal.index_level!=1:raiseValueError("idxmin only support one-level index now")groupkey_names=["__groupkey_{}__".format(i)foriinrange(len(self._groupkeys))]sdf=self._psdf._internal.spark_framefors,nameinzip(self._groupkeys,groupkey_names):sdf=sdf.withColumn(name,s.spark.column)index=self._psdf._internal.index_spark_column_names[0]stat_exprs=[]forpsser,scolinzip(self._agg_columns,self._agg_columns_scols):name=psser._internal.data_spark_column_names[0]ifskipna:order_column=scol.asc_nulls_last()else:order_column=scol.asc_nulls_first()window=Window.partitionBy(*groupkey_names).orderBy(order_column,NATURAL_ORDER_COLUMN_NAME)sdf=sdf.withColumn(name,F.when(F.row_number().over(window)==1,scol_for(sdf,index)).otherwise(None))stat_exprs.append(F.max(scol_for(sdf,name)).alias(name))sdf=sdf.groupby(*groupkey_names).agg(*stat_exprs)internal=InternalFrame(spark_frame=sdf,index_spark_columns=[scol_for(sdf,col)forcolingroupkey_names],index_names=[psser._column_labelforpsserinself._groupkeys],index_fields=[psser._internal.data_fields[0].copy(name=name)forpsser,nameinzip(self._groupkeys,groupkey_names)],column_labels=[psser._column_labelforpsserinself._agg_columns],data_spark_columns=[scol_for(sdf,psser._internal.data_spark_column_names[0])forpsserinself._agg_columns],)returnself._handle_output(DataFrame(internal))
[docs]deffillna(self,value:Optional[Any]=None,method:Optional[str]=None,axis:Optional[Axis]=None,inplace:bool=False,limit:Optional[int]=None,)->FrameLike:"""Fill NA/NaN values in group. Parameters ---------- value : scalar, dict, Series Value to use to fill holes. alternately a dict/Series of values specifying which value to use for each column. DataFrame is not supported. method : {'backfill', 'bfill', 'pad', 'ffill', None}, default None Method to use for filling holes in reindexed Series pad / ffill: propagate last valid observation forward to next valid backfill / bfill: use NEXT valid observation to fill gap axis : {0 or `index`} 1 and `columns` are not supported. inplace : boolean, default False Fill in place (do not create a new object) limit : int, default None If method is specified, this is the maximum number of consecutive NaN values to forward/backward fill. In other words, if there is a gap with more than this number of consecutive NaNs, it will only be partially filled. If method is not specified, this is the maximum number of entries along the entire axis where NaNs will be filled. Must be greater than 0 if not None Returns ------- DataFrame DataFrame with NA entries filled. Examples -------- >>> df = ps.DataFrame({ ... 'A': [1, 1, 2, 2], ... 'B': [2, 4, None, 3], ... 'C': [None, None, None, 1], ... 'D': [0, 1, 5, 4] ... }, ... columns=['A', 'B', 'C', 'D']) >>> df A B C D 0 1 2.0 NaN 0 1 1 4.0 NaN 1 2 2 NaN NaN 5 3 2 3.0 1.0 4 We can also propagate non-null values forward or backward in group. >>> df.groupby(['A'])['B'].fillna(method='ffill').sort_index() 0 2.0 1 4.0 2 NaN 3 3.0 Name: B, dtype: float64 >>> df.groupby(['A']).fillna(method='bfill').sort_index() B C D 0 2.0 NaN 0 1 4.0 NaN 1 2 3.0 1.0 5 3 3.0 1.0 4 """returnself._apply_series_op(lambdasg:sg._psser._fillna(value=value,method=method,axis=axis,limit=limit,part_cols=sg._groupkeys_scols),should_resolve=(methodisnotNone),)
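    # The examples above exercise ``method``; a scalar ``value`` is accepted as well, in
    # which case the fill does not depend on the grouping. A hedged sketch reusing the
    # frame from the docstring above (assuming scalar values behave as in
    # ``DataFrame.fillna``):
    #
    # >>> df.groupby(['A'])['C'].fillna(0).sort_index()  # doctest: +SKIP
    # 0    0.0
    # 1    0.0
    # 2    0.0
    # 3    1.0
    # Name: C, dtype: float64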
    def bfill(self, limit: Optional[int] = None) -> FrameLike:
        """
        Synonym for `DataFrame.fillna()` with ``method='bfill'``.

        Parameters
        ----------
        limit : int, default None
            This is the maximum number of consecutive NaN values to backward fill.
            In other words, if there is a gap with more than this number of consecutive
            NaNs, it will only be partially filled. Must be greater than 0 if not None.

        Returns
        -------
        DataFrame
            DataFrame with NA entries filled.

        Examples
        --------
        >>> df = ps.DataFrame({
        ...     'A': [1, 1, 2, 2],
        ...     'B': [2, 4, None, 3],
        ...     'C': [None, None, None, 1],
        ...     'D': [0, 1, 5, 4]
        ...     },
        ...     columns=['A', 'B', 'C', 'D'])
        >>> df
           A    B    C  D
        0  1  2.0  NaN  0
        1  1  4.0  NaN  1
        2  2  NaN  NaN  5
        3  2  3.0  1.0  4

        Propagate non-null values backward.

        >>> df.groupby(['A']).bfill().sort_index()
             B    C  D
        0  2.0  NaN  0
        1  4.0  NaN  1
        2  3.0  1.0  5
        3  3.0  1.0  4
        """
        return self.fillna(method="bfill", limit=limit)
    def backfill(self, limit: Optional[int] = None) -> FrameLike:
        """
        Alias for bfill.

        .. deprecated:: 3.4.0
        """
        warnings.warn(
            "The GroupBy.backfill method is deprecated "
            "and will be removed in a future version. "
            "Use GroupBy.bfill instead.",
            FutureWarning,
        )
        return self.bfill(limit=limit)
    def ffill(self, limit: Optional[int] = None) -> FrameLike:
        """
        Synonym for `DataFrame.fillna()` with ``method='ffill'``.

        Parameters
        ----------
        limit : int, default None
            This is the maximum number of consecutive NaN values to forward fill.
            In other words, if there is a gap with more than this number of consecutive
            NaNs, it will only be partially filled. Must be greater than 0 if not None.

        Returns
        -------
        DataFrame
            DataFrame with NA entries filled.

        Examples
        --------
        >>> df = ps.DataFrame({
        ...     'A': [1, 1, 2, 2],
        ...     'B': [2, 4, None, 3],
        ...     'C': [None, None, None, 1],
        ...     'D': [0, 1, 5, 4]
        ...     },
        ...     columns=['A', 'B', 'C', 'D'])
        >>> df
           A    B    C  D
        0  1  2.0  NaN  0
        1  1  4.0  NaN  1
        2  2  NaN  NaN  5
        3  2  3.0  1.0  4

        Propagate non-null values forward.

        >>> df.groupby(['A']).ffill().sort_index()
             B    C  D
        0  2.0  NaN  0
        1  4.0  NaN  1
        2  NaN  NaN  5
        3  3.0  1.0  4
        """
        return self.fillna(method="ffill", limit=limit)
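    # ``limit`` caps how many consecutive NaNs are filled within each group. A hedged
    # sketch on a made-up frame (not part of the doctests above):
    #
    # >>> df2 = ps.DataFrame({'key': ['a', 'a', 'a'], 'val': [1.0, None, None]})
    # >>> df2.groupby('key')['val'].ffill(limit=1).sort_index()  # doctest: +SKIP
    # 0    1.0
    # 1    1.0
    # 2    NaN
    # Name: val, dtype: float64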
defpad(self,limit:Optional[int]=None)->FrameLike:""" Alias for ffill. .. deprecated:: 3.4.0 """warnings.warn("The GroupBy.pad method is deprecated ""and will be removed in a future version. ""Use GroupBy.ffill instead.",FutureWarning,)returnself.ffill(limit=limit)def_limit(self,n:int,asc:bool)->FrameLike:""" Private function for tail and head. """psdf=self._psdfifself._agg_columns_selected:agg_columns=self._agg_columnselse:agg_columns=[psdf._psser_for(label)forlabelinpsdf._internal.column_labelsiflabelnotinself._column_labels_to_exclude]psdf,groupkey_labels,_=GroupBy._prepare_group_map_apply(psdf,self._groupkeys,agg_columns,)groupkey_scols=[psdf._internal.spark_column_for(label)forlabelingroupkey_labels]sdf=psdf._internal.spark_framewindow=Window.partitionBy(*groupkey_scols)# This part is handled differently depending on whether it is a tail or a head.ordered_window=(window.orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).asc())ifascelsewindow.orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).desc()))ifn>=0orLooseVersion(pd.__version__)<LooseVersion("1.4.0"):tmp_row_num_col=verify_temp_column_name(sdf,"__row_number__")sdf=(sdf.withColumn(tmp_row_num_col,F.row_number().over(ordered_window)).filter(F.col(tmp_row_num_col)<=n).drop(tmp_row_num_col))else:# Pandas supports Groupby positional indexing since v1.4.0# https://pandas.pydata.org/docs/whatsnew/v1.4.0.html#groupby-positional-indexing## To support groupby positional indexing, we need add a `__tmp_lag__` column to help# us filtering rows before the specified offset row.## For example for the dataframe:# >>> df = ps.DataFrame([["g", "g0"],# ... ["g", "g1"],# ... ["g", "g2"],# ... ["g", "g3"],# ... ["h", "h0"],# ... ["h", "h1"]], columns=["A", "B"])# >>> df.groupby("A").head(-1)## Below is a result to show the `__tmp_lag__` column for above df, the limit n is# `-1`, the `__tmp_lag__` will be set to `0` in rows[:-1], and left will be set to# `null`:## >>> sdf.withColumn(tmp_lag_col, F.lag(F.lit(0), -1).over(ordered_window))# +-----------------+--------------+---+---+-----------------+-----------+# |__index_level_0__|__groupkey_0__| A| B|__natural_order__|__tmp_lag__|# +-----------------+--------------+---+---+-----------------+-----------+# | 0| g| g| g0| 0| 0|# | 1| g| g| g1| 8589934592| 0|# | 2| g| g| g2| 17179869184| 0|# | 3| g| g| g3| 25769803776| null|# | 4| h| h| h0| 34359738368| 0|# | 5| h| h| h1| 42949672960| null|# +-----------------+--------------+---+---+-----------------+-----------+#tmp_lag_col=verify_temp_column_name(sdf,"__tmp_lag__")sdf=(sdf.withColumn(tmp_lag_col,F.lag(F.lit(0),n).over(ordered_window)).where(~F.isnull(F.col(tmp_lag_col))).drop(tmp_lag_col))internal=psdf._internal.with_new_sdf(sdf)returnself._handle_output(DataFrame(internal).drop(groupkey_labels,axis=1))
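    # A standalone sketch of the ``F.lag(F.lit(0), n)`` trick `_limit` uses for negative
    # ``n`` (illustrative only; ``spark`` is an existing SparkSession and the column names
    # are made up). The lag is NULL exactly for the last ``abs(n)`` rows of each ordered
    # group, and those rows are then filtered out:
    #
    # >>> from pyspark.sql import Window, functions as F
    # >>> sdf = spark.createDataFrame(
    # ...     [("g", "g0", 0), ("g", "g1", 1), ("g", "g2", 2), ("h", "h0", 3)],
    # ...     ["A", "B", "ord"])
    # >>> w = Window.partitionBy("A").orderBy(F.col("ord").asc())
    # >>> sdf.withColumn("__tmp_lag__", F.lag(F.lit(0), -1).over(w)).show()  # doctest: +SKIP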
[docs]deftail(self,n:int=5)->FrameLike:""" Return last n rows of each group. Similar to `.apply(lambda x: x.tail(n))`, but it returns a subset of rows from the original DataFrame with original index and order preserved (`as_index` flag is ignored). Does not work for negative values of n. Returns ------- DataFrame or Series Examples -------- >>> df = ps.DataFrame({'a': [1, 1, 1, 1, 2, 2, 2, 3, 3, 3], ... 'b': [2, 3, 1, 4, 6, 9, 8, 10, 7, 5], ... 'c': [3, 5, 2, 5, 1, 2, 6, 4, 3, 6]}, ... columns=['a', 'b', 'c'], ... index=[7, 2, 3, 1, 3, 4, 9, 10, 5, 6]) >>> df a b c 7 1 2 3 2 1 3 5 3 1 1 2 1 1 4 5 3 2 6 1 4 2 9 2 9 2 8 6 10 3 10 4 5 3 7 3 6 3 5 6 >>> df.groupby('a').tail(2).sort_index() a b c 1 1 4 5 3 1 1 2 4 2 9 2 5 3 7 3 6 3 5 6 9 2 8 6 >>> df.groupby('a')['b'].tail(2).sort_index() 1 4 3 1 4 9 5 7 6 5 9 8 Name: b, dtype: int64 Supports Groupby positional indexing Since pandas on Spark 3.4 (with pandas 1.4+): >>> df = ps.DataFrame([["g", "g0"], ... ["g", "g1"], ... ["g", "g2"], ... ["g", "g3"], ... ["h", "h0"], ... ["h", "h1"]], columns=["A", "B"]) >>> df.groupby("A").tail(-1) # doctest: +SKIP A B 3 g g3 2 g g2 1 g g1 5 h h1 """returnself._limit(n,asc=False)
[docs]defshift(self,periods:int=1,fill_value:Optional[Any]=None)->FrameLike:""" Shift each group by periods observations. Parameters ---------- periods : integer, default 1 number of periods to shift fill_value : optional Returns ------- Series or DataFrame Object shifted within each group. Examples -------- >>> df = ps.DataFrame({ ... 'a': [1, 1, 1, 2, 2, 2, 3, 3, 3], ... 'b': [1, 2, 2, 2, 3, 3, 3, 4, 4]}, columns=['a', 'b']) >>> df a b 0 1 1 1 1 2 2 1 2 3 2 2 4 2 3 5 2 3 6 3 3 7 3 4 8 3 4 >>> df.groupby('a').shift().sort_index() # doctest: +SKIP b 0 NaN 1 1.0 2 2.0 3 NaN 4 2.0 5 3.0 6 NaN 7 3.0 8 4.0 >>> df.groupby('a').shift(periods=-1, fill_value=0).sort_index() # doctest: +SKIP b 0 2 1 2 2 0 3 3 4 3 5 0 6 4 7 4 8 0 """returnself._apply_series_op(lambdasg:sg._psser._shift(periods,fill_value,part_cols=sg._groupkeys_scols),should_resolve=True,)
[docs]deftransform(self,func:Callable[...,pd.Series],*args:Any,**kwargs:Any)->FrameLike:""" Apply function column-by-column to the GroupBy object. The function passed to `transform` must take a Series as its first argument and return a Series. The given function is executed for each series in each grouped data. While `transform` is a very flexible method, its downside is that using it can be quite a bit slower than using more specific methods like `agg` or `transform`. pandas-on-Spark offers a wide range of method that will be much faster than using `transform` for their specific purposes, so try to use them before reaching for `transform`. .. note:: this API executes the function once to infer the type which is potentially expensive, for instance, when the dataset is created after aggregations or sorting. To avoid this, specify return type in ``func``, for instance, as below: >>> def convert_to_string(x) -> ps.Series[str]: ... return x.apply("a string {}".format) When the given function has the return type annotated, the original index of the GroupBy object will be lost, and a default index will be attached to the result. Please be careful about configuring the default index. See also `Default Index Type <https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/options.html#default-index-type>`_. .. note:: the series within ``func`` is actually a pandas series. Therefore, any pandas API within this function is allowed. Parameters ---------- func : callable A callable that takes a Series as its first argument, and returns a Series. *args Positional arguments to pass to func. **kwargs Keyword arguments to pass to func. Returns ------- applied : DataFrame See Also -------- aggregate : Apply aggregate function to the GroupBy object. Series.apply : Apply a function to a Series. Examples -------- >>> df = ps.DataFrame({'A': [0, 0, 1], ... 'B': [1, 2, 3], ... 'C': [4, 6, 5]}, columns=['A', 'B', 'C']) >>> g = df.groupby('A') Notice that ``g`` has two groups, ``0`` and ``1``. Calling `transform` in various ways, we can get different grouping results: Below the functions passed to `transform` takes a Series as its argument and returns a Series. `transform` applies the function on each series in each grouped data, and combine them into a new DataFrame: >>> def convert_to_string(x) -> ps.Series[str]: ... return x.apply("a string {}".format) >>> g.transform(convert_to_string) # doctest: +NORMALIZE_WHITESPACE B C 0 a string 1 a string 4 1 a string 2 a string 6 2 a string 3 a string 5 >>> def plus_max(x) -> ps.Series[int]: ... return x + x.max() >>> g.transform(plus_max) # doctest: +NORMALIZE_WHITESPACE B C 0 3 10 1 4 12 2 6 10 You can omit the type hint and let pandas-on-Spark infer its type. >>> def plus_min(x): ... return x + x.min() >>> g.transform(plus_min) # doctest: +NORMALIZE_WHITESPACE B C 0 2 8 1 3 10 2 6 10 In case of Series, it works as below. >>> df.B.groupby(df.A).transform(plus_max) 0 3 1 4 2 6 Name: B, dtype: int64 >>> (df * -1).B.groupby(df.A).transform(abs) 0 1 1 2 2 3 Name: B, dtype: int64 You can also specify extra arguments to pass to the function. >>> def calculation(x, y, z) -> ps.Series[int]: ... 
return x + x.min() + y + z >>> g.transform(calculation, 5, z=20) # doctest: +NORMALIZE_WHITESPACE B C 0 27 33 1 28 35 2 31 35 """ifnotcallable(func):raiseTypeError("%s object is not callable"%type(func).__name__)spec=inspect.getfullargspec(func)return_sig=spec.annotations.get("return",None)psdf,groupkey_labels,groupkey_names=GroupBy._prepare_group_map_apply(self._psdf,self._groupkeys,agg_columns=self._agg_columns)defpandas_transform(pdf:pd.DataFrame)->pd.DataFrame:returnpdf.groupby(groupkey_names).transform(func,*args,**kwargs)should_infer_schema=return_sigisNoneifshould_infer_schema:# Here we execute with the first 1000 to get the return type.# If the records were less than 1000, it uses pandas API directly for a shortcut.log_advice("If the type hints is not specified for `groupby.transform`, ""it is expensive to infer the data type internally.")limit=get_option("compute.shortcut_limit")pdf=psdf.head(limit+1)._to_internal_pandas()pdf=pdf.groupby(groupkey_names).transform(func,*args,**kwargs)psdf_from_pandas:DataFrame=DataFrame(pdf)return_schema=force_decimal_precision_scale(as_nullable_spark_type(psdf_from_pandas._internal.spark_frame.drop(*HIDDEN_COLUMNS).schema))iflen(pdf)<=limit:returnself._handle_output(psdf_from_pandas)sdf=GroupBy._spark_group_map_apply(psdf,pandas_transform,[psdf._internal.spark_column_for(label)forlabelingroupkey_labels],return_schema,retain_index=True,)# If schema is inferred, we can restore indexes too.internal=psdf_from_pandas._internal.with_new_sdf(sdf,index_fields=[field.copy(nullable=True)forfieldinpsdf_from_pandas._internal.index_fields],data_fields=[field.copy(nullable=True)forfieldinpsdf_from_pandas._internal.data_fields],)else:return_type=infer_return_type(func)ifnotisinstance(return_type,SeriesType):raiseTypeError("Expected the return type of this function to be of Series type, ""but found type {}".format(return_type))dtype=return_type.dtypespark_type=return_type.spark_typedata_fields=[InternalField(dtype=dtype,struct_field=StructField(name=c,dataType=spark_type))forcinpsdf._internal.data_spark_column_namesifcnotingroupkey_names]return_schema=StructType([field.struct_fieldforfieldindata_fields])sdf=GroupBy._spark_group_map_apply(psdf,pandas_transform,[psdf._internal.spark_column_for(label)forlabelingroupkey_labels],return_schema,retain_index=False,)# Otherwise, it loses index.internal=InternalFrame(spark_frame=sdf,index_spark_columns=None,data_fields=data_fields)returnself._handle_output(DataFrame(internal))
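    # As the note above says, when the return type is annotated the original index is not
    # restored and a default index is attached instead, so the row order of the result may
    # differ from the input. A hedged illustration reusing ``g`` from the docstring above
    # (output intentionally skipped because the attached index depends on the default
    # index type):
    #
    # >>> def plus_one(x) -> ps.Series[int]:
    # ...     return x + 1
    # >>> g.transform(plus_one)  # doctest: +SKIP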
[docs]defnunique(self,dropna:bool=True)->FrameLike:""" Return DataFrame with number of distinct observations per group for each column. Parameters ---------- dropna : boolean, default True Don’t include NaN in the counts. Returns ------- nunique : DataFrame or Series Examples -------- >>> df = ps.DataFrame({'id': ['spam', 'egg', 'egg', 'spam', ... 'ham', 'ham'], ... 'value1': [1, 5, 5, 2, 5, 5], ... 'value2': list('abbaxy')}, columns=['id', 'value1', 'value2']) >>> df id value1 value2 0 spam 1 a 1 egg 5 b 2 egg 5 b 3 spam 2 a 4 ham 5 x 5 ham 5 y >>> df.groupby('id').nunique().sort_index() # doctest: +SKIP value1 value2 id egg 1 1 ham 1 2 spam 2 1 >>> df.groupby('id')['value1'].nunique().sort_index() # doctest: +NORMALIZE_WHITESPACE id egg 1 ham 1 spam 2 Name: value1, dtype: int64 """ifdropna:defstat_function(col:Column)->Column:returnF.countDistinct(col)else:defstat_function(col:Column)->Column:returnF.countDistinct(col)+F.when(F.count(F.when(col.isNull(),1).otherwise(None))>=1,1).otherwise(0)returnself._reduce_for_stat_function(stat_function)
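    # With ``dropna=False`` the implementation above adds one to the distinct count when a
    # group contains any null, so NaN counts as one extra distinct value. A hedged sketch
    # on a made-up frame:
    #
    # >>> df2 = ps.DataFrame({'id': ['x', 'x', 'y'], 'v': [1.0, None, 3.0]})
    # >>> df2.groupby('id')['v'].nunique(dropna=False).sort_index()  # doctest: +SKIP
    # id
    # x    2
    # y    1
    # Name: v, dtype: int64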
defrolling(self,window:int,min_periods:Optional[int]=None)->"RollingGroupby[FrameLike]":""" Return an rolling grouper, providing rolling functionality per group. .. note:: 'min_periods' in pandas-on-Spark works as a fixed window size unlike pandas. Unlike pandas, NA is also counted as the period. This might be changed soon. Parameters ---------- window : int, or offset Size of the moving window. This is the number of observations used for calculating the statistic. Each window will be a fixed size. min_periods : int, default 1 Minimum number of observations in window required to have a value (otherwise result is NA). See Also -------- Series.groupby DataFrame.groupby """frompyspark.pandas.windowimportRollingGroupbyreturnRollingGroupby(self,window,min_periods=min_periods)defexpanding(self,min_periods:int=1)->"ExpandingGroupby[FrameLike]":""" Return an expanding grouper, providing expanding functionality per group. .. note:: 'min_periods' in pandas-on-Spark works as a fixed window size unlike pandas. Unlike pandas, NA is also counted as the period. This might be changed soon. Parameters ---------- min_periods : int, default 1 Minimum number of observations in window required to have a value (otherwise result is NA). See Also -------- Series.groupby DataFrame.groupby """frompyspark.pandas.windowimportExpandingGroupbyreturnExpandingGroupby(self,min_periods=min_periods)# TODO: 'adjust', 'axis', 'method' parameter should be implemented.
[docs]defewm(self,com:Optional[float]=None,span:Optional[float]=None,halflife:Optional[float]=None,alpha:Optional[float]=None,min_periods:Optional[int]=None,ignore_na:bool=False,)->"ExponentialMovingGroupby[FrameLike]":""" Return an ewm grouper, providing ewm functionality per group. .. note:: 'min_periods' in pandas-on-Spark works as a fixed window size unlike pandas. Unlike pandas, NA is also counted as the period. This might be changed soon. .. versionadded:: 3.4.0 Parameters ---------- com : float, optional Specify decay in terms of center of mass. alpha = 1 / (1 + com), for com >= 0. span : float, optional Specify decay in terms of span. alpha = 2 / (span + 1), for span >= 1. halflife : float, optional Specify decay in terms of half-life. alpha = 1 - exp(-ln(2) / halflife), for halflife > 0. alpha : float, optional Specify smoothing factor alpha directly. 0 < alpha <= 1. min_periods : int, default None Minimum number of observations in window required to have a value (otherwise result is NA). ignore_na : bool, default False Ignore missing values when calculating weights. - When ``ignore_na=False`` (default), weights are based on absolute positions. For example, the weights of :math:`x_0` and :math:`x_2` used in calculating the final weighted average of [:math:`x_0`, None, :math:`x_2`] are :math:`(1-\alpha)^2` and :math:`1` if ``adjust=True``, and :math:`(1-\alpha)^2` and :math:`\alpha` if ``adjust=False``. - When ``ignore_na=True``, weights are based on relative positions. For example, the weights of :math:`x_0` and :math:`x_2` used in calculating the final weighted average of [:math:`x_0`, None, :math:`x_2`] are :math:`1-\alpha` and :math:`1` if ``adjust=True``, and :math:`1-\alpha` and :math:`\alpha` if ``adjust=False``. """frompyspark.pandas.windowimportExponentialMovingGroupbyreturnExponentialMovingGroupby(self,com=com,span=span,halflife=halflife,alpha=alpha,min_periods=min_periods,ignore_na=ignore_na,)
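    # ``com``, ``span``, ``halflife`` and ``alpha`` are alternative parameterizations of the
    # same smoothing factor: for example ``com=1`` corresponds to ``alpha = 1 / (1 + 1) = 0.5``
    # and ``span=3`` to ``alpha = 2 / (3 + 1) = 0.5``. A hedged usage sketch (typically
    # exactly one of the four is given; output skipped):
    #
    # >>> df = ps.DataFrame({'a': [1, 1, 2, 2], 'b': [1.0, 2.0, 3.0, 4.0]})
    # >>> df.groupby('a')['b'].ewm(com=1).mean().sort_index()      # doctest: +SKIP
    # >>> df.groupby('a')['b'].ewm(alpha=0.5).mean().sort_index()  # doctest: +SKIP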
[docs]defget_group(self,name:Union[Name,List[Name]])->FrameLike:""" Construct DataFrame from group with provided name. Parameters ---------- name : object The name of the group to get as a DataFrame. Returns ------- group : same type as obj Examples -------- >>> psdf = ps.DataFrame([('falcon', 'bird', 389.0), ... ('parrot', 'bird', 24.0), ... ('lion', 'mammal', 80.5), ... ('monkey', 'mammal', np.nan)], ... columns=['name', 'class', 'max_speed'], ... index=[0, 2, 3, 1]) >>> psdf name class max_speed 0 falcon bird 389.0 2 parrot bird 24.0 3 lion mammal 80.5 1 monkey mammal NaN >>> psdf.groupby("class").get_group("bird").sort_index() name class max_speed 0 falcon bird 389.0 2 parrot bird 24.0 >>> psdf.groupby("class").get_group("mammal").sort_index() name class max_speed 1 monkey mammal NaN 3 lion mammal 80.5 """groupkeys=self._groupkeysifnotis_hashable(name):raiseTypeError("unhashable type: '{}'".format(type(name).__name__))eliflen(groupkeys)>1:ifnotisinstance(name,tuple):raiseValueError("must supply a tuple to get_group with multiple grouping keys")iflen(groupkeys)!=len(name):raiseValueError("must supply a same-length tuple to get_group with multiple grouping keys")ifnotis_list_like(name):name=[name]cond=F.lit(True)forgroupkey,iteminzip(groupkeys,name):scol=groupkey.spark.columncond=cond&(scol==item)ifself._agg_columns_selected:internal=self._psdf._internalspark_frame=internal.spark_frame.select(internal.index_spark_columns+self._agg_columns_scols).filter(cond)internal=internal.copy(spark_frame=spark_frame,index_spark_columns=[scol_for(spark_frame,col)forcolininternal.index_spark_column_names],column_labels=[s._column_labelforsinself._agg_columns],data_spark_columns=[scol_for(spark_frame,s._internal.data_spark_column_names[0])forsinself._agg_columns],data_fields=[s._internal.data_fields[0]forsinself._agg_columns],)else:internal=self._psdf._internal.with_filter(cond)ifinternal.spark_frame.head()isNone:raiseKeyError(name)returnself._handle_output(DataFrame(internal))
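    # With multiple grouping keys, ``name`` must be a tuple with one value per key, as the
    # validation above enforces. A hedged sketch reusing ``psdf`` from the docstring above:
    #
    # >>> psdf.groupby(["class", "name"]).get_group(("bird", "falcon"))  # doctest: +SKIP
    #      name class  max_speed
    # 0  falcon  bird      389.0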
    def median(self, numeric_only: Optional[bool] = True, accuracy: int = 10000) -> FrameLike:
        """
        Compute median of groups, excluding missing values.

        For multiple groupings, the result index will be a MultiIndex.

        .. note:: Unlike pandas, the median in pandas-on-Spark is an approximated median based upon
            approximate percentile computation because computing median across a large dataset
            is extremely expensive.

        Parameters
        ----------
        numeric_only : bool, default True
            Include only float, int, boolean columns. If None, will attempt to use
            everything, then use only numeric data.

            .. versionadded:: 3.4.0
        accuracy : int, default 10000
            Accuracy of the approximation. A larger value yields a better (but more
            expensive) approximation.

        Returns
        -------
        Series or DataFrame
            Median of values within each group.

        Examples
        --------
        >>> psdf = ps.DataFrame({'a': [1., 1., 1., 1., 2., 2., 2., 3., 3., 3.],
        ...                      'b': [2., 3., 1., 4., 6., 9., 8., 10., 7., 5.],
        ...                      'c': [3., 5., 2., 5., 1., 2., 6., 4., 3., 6.]},
        ...                     columns=['a', 'b', 'c'],
        ...                     index=[7, 2, 4, 1, 3, 4, 9, 10, 5, 6])
        >>> psdf
              a     b    c
        7   1.0   2.0  3.0
        2   1.0   3.0  5.0
        4   1.0   1.0  2.0
        1   1.0   4.0  5.0
        3   2.0   6.0  1.0
        4   2.0   9.0  2.0
        9   2.0   8.0  6.0
        10  3.0  10.0  4.0
        5   3.0   7.0  3.0
        6   3.0   5.0  6.0

        DataFrameGroupBy

        >>> psdf.groupby('a').median().sort_index()  # doctest: +NORMALIZE_WHITESPACE
               b    c
        a
        1.0  2.0  3.0
        2.0  8.0  2.0
        3.0  7.0  4.0

        SeriesGroupBy

        >>> psdf.groupby('a')['b'].median().sort_index()
        a
        1.0    2.0
        2.0    8.0
        3.0    7.0
        Name: b, dtype: float64
        """
        if not isinstance(accuracy, int):
            raise TypeError(
                "accuracy must be an integer; however, got [%s]" % type(accuracy).__name__
            )

        self._validate_agg_columns(numeric_only=numeric_only, function_name="median")

        warnings.warn(
            "Default value of `numeric_only` will be changed to `False` "
            "instead of `True` in 4.0.0.",
            FutureWarning,
        )

        def stat_function(col: Column) -> Column:
            return F.percentile_approx(col, 0.5, accuracy)

        return self._reduce_for_stat_function(
            stat_function,
            accepted_spark_types=(NumericType,),
            bool_to_numeric=True,
        )
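    # The approximation above is Spark's ``percentile_approx`` evaluated at 0.5 with the
    # given ``accuracy``. A standalone hedged sketch of the same primitive (``spark`` is an
    # existing SparkSession; the data is made up):
    #
    # >>> from pyspark.sql import functions as F
    # >>> sdf = spark.createDataFrame([(1, 2.0), (1, 3.0), (1, 1.0)], ["a", "b"])
    # >>> sdf.groupBy("a").agg(
    # ...     F.percentile_approx("b", 0.5, 10000).alias("median")).show()  # doctest: +SKIP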
def_validate_agg_columns(self,numeric_only:Optional[bool],function_name:str)->None:"""Validate aggregation columns and raise an error or a warning following pandas."""has_non_numeric=Falsefor_agg_colinself._agg_columns:ifnotisinstance(_agg_col.spark.data_type,(NumericType,BooleanType)):has_non_numeric=Truebreakifhas_non_numeric:ifisinstance(self,SeriesGroupBy):raiseTypeError("Only numeric aggregation column is accepted.")ifnotnumeric_onlyandhas_non_numeric:warnings.warn("Dropping invalid columns in DataFrameGroupBy.%s is deprecated. ""In a future version, a TypeError will be raised. ""Before calling .%s, select only columns which should be ""valid for the function."%(function_name,function_name),FutureWarning,)def_reduce_for_stat_function(self,sfun:Callable[[Column],Column],accepted_spark_types:Optional[Tuple[Type[DataType],...]]=None,bool_to_numeric:bool=False,**kwargs:Any,)->FrameLike:"""Apply an aggregate function `sfun` per column and reduce to a FrameLike. Parameters ---------- sfun : The aggregate function to apply per column. accepted_spark_types: Accepted spark types of columns to be aggregated; default None means all spark types are accepted. bool_to_numeric: If True, boolean columns are converted to numeric columns, which are accepted for all statistical functions regardless of `accepted_spark_types`. """groupkey_names=[SPARK_INDEX_NAME_FORMAT(i)foriinrange(len(self._groupkeys))]internal,_,sdf=self._prepare_reduce(groupkey_names,accepted_spark_types,bool_to_numeric)psdf:DataFrame=DataFrame(internal)iflen(psdf._internal.column_labels)>0:min_count=kwargs.get("min_count",0)stat_exprs=[]forlabelinpsdf._internal.column_labels:psser=psdf._psser_for(label)input_scol=psser._dtype_op.nan_to_null(psser).spark.columnoutput_scol=sfun(input_scol)ifmin_count>0:output_scol=F.when(F.count(F.when(~F.isnull(input_scol),F.lit(0)))>=min_count,output_scol)stat_exprs.append(output_scol.alias(psser._internal.data_spark_column_names[0]))sdf=sdf.groupby(*groupkey_names).agg(*stat_exprs)else:sdf=sdf.select(*groupkey_names).distinct()internal=internal.copy(spark_frame=sdf,index_spark_columns=[scol_for(sdf,col)forcolingroupkey_names],data_spark_columns=[scol_for(sdf,col)forcolininternal.data_spark_column_names],data_fields=None,)psdf=DataFrame(internal)returnself._prepare_return(psdf)def_prepare_return(self,psdf:DataFrame)->FrameLike:ifself._dropna:psdf=DataFrame(psdf._internal.with_new_sdf(psdf._internal.spark_frame.dropna(subset=psdf._internal.index_spark_column_names)))ifnotself._as_index:should_drop_index=set(ifori,gkeyinenumerate(self._groupkeys)ifgkey._psdfisnotself._psdf)iflen(should_drop_index)>0:psdf=psdf.reset_index(level=should_drop_index,drop=True)iflen(should_drop_index)<len(self._groupkeys):psdf=psdf.reset_index()returnself._handle_output(psdf)def_prepare_reduce(self,groupkey_names:List,accepted_spark_types:Optional[Tuple[Type[DataType],...]]=None,bool_to_numeric:bool=False,)->Tuple[InternalFrame,List[Series],SparkDataFrame]:groupkey_scols=[s.alias(name)fors,nameinzip(self._groupkeys_scols,groupkey_names)]agg_columns=[]forpsserinself._agg_columns:ifbool_to_numericandisinstance(psser.spark.data_type,BooleanType):agg_columns.append(psser.astype(int))elif(accepted_spark_typesisNone)orisinstance(psser.spark.data_type,accepted_spark_types):agg_columns.append(psser)sdf=self._psdf._internal.spark_frame.select(*groupkey_scols,*[psser.spark.columnforpsserinagg_columns])internal=InternalFrame(spark_frame=sdf,index_spark_columns=[scol_for(sdf,col)forcolingroupkey_names],index_names=[psser._column_labelfor
psserinself._groupkeys],index_fields=[psser._internal.data_fields[0].copy(name=name)forpsser,nameinzip(self._groupkeys,groupkey_names)],data_spark_columns=[scol_for(sdf,psser._internal.data_spark_column_names[0])forpsserinagg_columns],column_labels=[psser._column_labelforpsserinagg_columns],data_fields=[psser._internal.data_fields[0]forpsserinagg_columns],column_label_names=self._psdf._internal.column_label_names,)returninternal,agg_columns,sdf@staticmethoddef_resolve_grouping_from_diff_dataframes(psdf:DataFrame,by:List[Union[Series,Label]])->Tuple[DataFrame,List[Series],Set[Label]]:column_labels_level=psdf._internal.column_labels_levelcolumn_labels=[]additional_pssers=[]additional_column_labels=[]tmp_column_labels=set()fori,col_or_sinenumerate(by):ifisinstance(col_or_s,Series):ifcol_or_s._psdfispsdf:column_labels.append(col_or_s._column_label)elifsame_anchor(col_or_s,psdf):temp_label=verify_temp_column_name(psdf,"__tmp_groupkey_{}__".format(i))column_labels.append(temp_label)additional_pssers.append(col_or_s.rename(temp_label))additional_column_labels.append(temp_label)else:temp_label=verify_temp_column_name(psdf,tuple(([""]*(column_labels_level-1))+["__tmp_groupkey_{}__".format(i)]),)column_labels.append(temp_label)tmp_column_labels.add(temp_label)elifisinstance(col_or_s,tuple):psser=psdf[col_or_s]ifnotisinstance(psser,Series):raiseValueError(name_like_string(col_or_s))column_labels.append(col_or_s)else:raiseValueError(col_or_s)psdf=DataFrame(psdf._internal.with_new_columns([psdf._psser_for(label)forlabelinpsdf._internal.column_labels]+additional_pssers))defassign_columns(psdf:DataFrame,this_column_labels:List[Label],that_column_labels:List[Label])->Iterator[Tuple[Series,Label]]:raiseNotImplementedError("Duplicated labels with groupby() and ""'compute.ops_on_diff_frames' option is not supported currently ""Please use unique labels in series and 
frames.")forcol_or_s,labelinzip(by,column_labels):iflabelintmp_column_labels:psser=col_or_spsdf=align_diff_frames(assign_columns,psdf,psser.rename(label),fillna=False,how="inner",preserve_order_column=True,)tmp_column_labels|=set(additional_column_labels)new_by_series=[]forcol_or_s,labelinzip(by,column_labels):iflabelintmp_column_labels:psser=col_or_snew_by_series.append(psdf._psser_for(label).rename(psser.name))else:new_by_series.append(psdf._psser_for(label))returnpsdf,new_by_series,tmp_column_labels@staticmethoddef_resolve_grouping(psdf:DataFrame,by:List[Union[Series,Label]])->List[Series]:new_by_series=[]forcol_or_sinby:ifisinstance(col_or_s,Series):new_by_series.append(col_or_s)elifisinstance(col_or_s,tuple):psser=psdf[col_or_s]ifnotisinstance(psser,Series):raiseValueError(name_like_string(col_or_s))new_by_series.append(psser)else:raiseValueError(col_or_s)returnnew_by_seriesclassDataFrameGroupBy(GroupBy[DataFrame]):@staticmethoddef_build(psdf:DataFrame,by:List[Union[Series,Label]],as_index:bool,dropna:bool)->"DataFrameGroupBy":ifany(isinstance(col_or_s,Series)andnotsame_anchor(psdf,col_or_s)forcol_or_sinby):(psdf,new_by_series,column_labels_to_exclude,)=GroupBy._resolve_grouping_from_diff_dataframes(psdf,by)else:new_by_series=GroupBy._resolve_grouping(psdf,by)column_labels_to_exclude=set()returnDataFrameGroupBy(psdf,new_by_series,as_index=as_index,dropna=dropna,column_labels_to_exclude=column_labels_to_exclude,)def__init__(self,psdf:DataFrame,by:List[Series],as_index:bool,dropna:bool,column_labels_to_exclude:Set[Label],agg_columns:List[Label]=None,):agg_columns_selected=agg_columnsisnotNoneifagg_columns_selected:forlabelinagg_columns:iflabelincolumn_labels_to_exclude:raiseKeyError(label)else:agg_columns=[labelforlabelinpsdf._internal.column_labelsifnotany(label==key._column_labelandkey._psdfispsdfforkeyinby)andlabelnotincolumn_labels_to_exclude]super().__init__(psdf=psdf,groupkeys=by,as_index=as_index,dropna=dropna,column_labels_to_exclude=column_labels_to_exclude,agg_columns_selected=agg_columns_selected,agg_columns=[psdf[label]forlabelinagg_columns],)def__getattr__(self,item:str)->Any:ifhasattr(MissingPandasLikeDataFrameGroupBy,item):property_or_func=getattr(MissingPandasLikeDataFrameGroupBy,item)ifisinstance(property_or_func,property):returnproperty_or_func.fget(self)else:returnpartial(property_or_func,self)returnself.__getitem__(item)def__getitem__(self,item:Any)->GroupBy:ifself._as_indexandis_name_like_value(item):returnSeriesGroupBy(self._psdf._psser_for(itemifis_name_like_tuple(item)else(item,)),self._groupkeys,dropna=self._dropna,)else:ifis_name_like_tuple(item):item=[item]elifis_name_like_value(item):item=[(item,)]else:item=[iifis_name_like_tuple(i)else(i,)foriinitem]ifnotself._as_index:groupkey_names=set(key._column_labelforkeyinself._groupkeys)fornameinitem:ifnameingroupkey_names:raiseValueError("cannot insert {}, already exists".format(name_like_string(name)))returnDataFrameGroupBy(self._psdf,self._groupkeys,as_index=self._as_index,dropna=self._dropna,column_labels_to_exclude=self._column_labels_to_exclude,agg_columns=item,)def_apply_series_op(self,op:Callable[["SeriesGroupBy"],Series],should_resolve:bool=False,numeric_only:bool=False,)->DataFrame:applied=[]forcolumninself._agg_columns:applied.append(op(column.groupby(self._groupkeys)))ifnumeric_only:applied=[colforcolinappliedifisinstance(col.spark.data_type,NumericType)]ifnotapplied:raiseDataError("No numeric types to 
aggregate")internal=self._psdf._internal.with_new_columns(applied,keep_order=False)ifshould_resolve:internal=internal.resolved_copyreturnDataFrame(internal)def_handle_output(self,psdf:DataFrame)->DataFrame:returnpsdf# TODO: Implement 'percentiles', 'include', and 'exclude' arguments.# TODO: Add ``DataFrame.select_dtypes`` to See Also when 'include'# and 'exclude' arguments are implemented.
[docs]defdescribe(self)->DataFrame:""" Generate descriptive statistics that summarize the central tendency, dispersion and shape of a dataset's distribution, excluding ``NaN`` values. Analyzes both numeric and object series, as well as ``DataFrame`` column sets of mixed data types. The output will vary depending on what is provided. Refer to the notes below for more detail. .. note:: Unlike pandas, the percentiles in pandas-on-Spark are based upon approximate percentile computation because computing percentiles across a large dataset is extremely expensive. Returns ------- DataFrame Summary statistics of the DataFrame provided. See Also -------- DataFrame.count DataFrame.max DataFrame.min DataFrame.mean DataFrame.std Examples -------- >>> df = ps.DataFrame({'a': [1, 1, 3], 'b': [4, 5, 6], 'c': [7, 8, 9]}) >>> df a b c 0 1 4 7 1 1 5 8 2 3 6 9 Describing a ``DataFrame``. By default only numeric fields are returned. >>> described = df.groupby('a').describe() >>> described.sort_index() # doctest: +NORMALIZE_WHITESPACE b c count mean std min 25% 50% 75% max count mean std min 25% 50% 75% max a 1 2.0 4.5 0.707107 4.0 4.0 4.0 5.0 5.0 2.0 7.5 0.707107 7.0 7.0 7.0 8.0 8.0 3 1.0 6.0 NaN 6.0 6.0 6.0 6.0 6.0 1.0 9.0 NaN 9.0 9.0 9.0 9.0 9.0 """forcolinself._agg_columns:ifisinstance(col.spark.data_type,StringType):raiseNotImplementedError("DataFrameGroupBy.describe() doesn't support for string type for now")psdf=self.aggregate(["count","mean","std","min","quartiles","max"])sdf=psdf._internal.spark_frameagg_column_labels=[col._column_labelforcolinself._agg_columns]formatted_percentiles=["25%","50%","75%"]# Split "quartiles" columns into first, second, and third quartiles.forlabelinagg_column_labels:quartiles_col=name_like_string(tuple(list(label)+["quartiles"]))fori,percentileinenumerate(formatted_percentiles):sdf=sdf.withColumn(name_like_string(tuple(list(label)+[percentile])),scol_for(sdf,quartiles_col)[i],)sdf=sdf.drop(quartiles_col)# Reorder columns lexicographically by agg column followed by stats.stats=["count","mean","std","min"]+formatted_percentiles+["max"]column_labels=[tuple(list(label)+[s])forlabel,sinproduct(agg_column_labels,stats)]data_columns=map(name_like_string,column_labels)# Reindex the DataFrame to reflect initial grouping and agg columns.internal=psdf._internal.copy(spark_frame=sdf,column_labels=column_labels,data_spark_columns=[scol_for(sdf,col)forcolindata_columns],data_fields=None,)# Cast columns to ``"float64"`` to match `pandas.DataFrame.groupby`.returnDataFrame(internal).astype("float64")
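    # ``describe`` above raises ``NotImplementedError`` for string-typed aggregation
    # columns; a hedged workaround is to select only numeric columns before grouping:
    #
    # >>> df2 = ps.DataFrame({'a': [1, 1, 3], 'b': [4, 5, 6], 's': ['x', 'y', 'z']})
    # >>> df2[['a', 'b']].groupby('a').describe().sort_index()  # doctest: +SKIP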
classSeriesGroupBy(GroupBy[Series]):@staticmethoddef_build(psser:Series,by:List[Union[Series,Label]],as_index:bool,dropna:bool)->"SeriesGroupBy":ifany(isinstance(col_or_s,Series)andnotsame_anchor(psser,col_or_s)forcol_or_sinby):psdf,new_by_series,_=GroupBy._resolve_grouping_from_diff_dataframes(psser.to_frame(),by)returnSeriesGroupBy(first_series(psdf).rename(psser.name),new_by_series,as_index=as_index,dropna=dropna,)else:new_by_series=GroupBy._resolve_grouping(psser._psdf,by)returnSeriesGroupBy(psser,new_by_series,as_index=as_index,dropna=dropna)def__init__(self,psser:Series,by:List[Series],as_index:bool=True,dropna:bool=True):ifnotas_index:raiseTypeError("as_index=False only valid with DataFrame")super().__init__(psdf=psser._psdf,groupkeys=by,as_index=True,dropna=dropna,column_labels_to_exclude=set(),agg_columns_selected=True,agg_columns=[psser],)self._psser=psserdef__getattr__(self,item:str)->Any:ifhasattr(MissingPandasLikeSeriesGroupBy,item):property_or_func=getattr(MissingPandasLikeSeriesGroupBy,item)ifisinstance(property_or_func,property):returnproperty_or_func.fget(self)else:returnpartial(property_or_func,self)raiseAttributeError(item)def_apply_series_op(self,op:Callable[["SeriesGroupBy"],Series],should_resolve:bool=False,numeric_only:bool=False,)->Series:ifnumeric_onlyandnotisinstance(self._agg_columns[0].spark.data_type,NumericType):raiseDataError("No numeric types to aggregate")psser=op(self)ifshould_resolve:internal=psser._internal.resolved_copyreturnfirst_series(DataFrame(internal))else:returnpsser.copy()def_handle_output(self,psdf:DataFrame)->Series:returnfirst_series(psdf).rename(self._psser.name)defagg(self,*args:Any,**kwargs:Any)->None:returnMissingPandasLikeSeriesGroupBy.agg(self,*args,**kwargs)defaggregate(self,*args:Any,**kwargs:Any)->None:returnMissingPandasLikeSeriesGroupBy.aggregate(self,*args,**kwargs)defsize(self)->Series:returnsuper().size().rename(self._psser.name)size.__doc__=GroupBy.size.__doc__# TODO: add keep parameter
[docs]defnsmallest(self,n:int=5)->Series:""" Return the smallest `n` elements. Parameters ---------- n : int Number of items to retrieve. See Also -------- pyspark.pandas.Series.nsmallest pyspark.pandas.DataFrame.nsmallest Examples -------- >>> df = ps.DataFrame({'a': [1, 1, 1, 2, 2, 2, 3, 3, 3], ... 'b': [1, 2, 2, 2, 3, 3, 3, 4, 4]}, columns=['a', 'b']) >>> df.groupby(['a'])['b'].nsmallest(1).sort_index() # doctest: +NORMALIZE_WHITESPACE a 1 0 1 2 3 2 3 6 3 Name: b, dtype: int64 """ifself._psser._internal.index_level>1:raiseValueError("nsmallest do not support multi-index now")groupkey_col_names=[SPARK_INDEX_NAME_FORMAT(i)foriinrange(len(self._groupkeys))]sdf=self._psser._internal.spark_frame.select(*[scol.alias(name)forscol,nameinzip(self._groupkeys_scols,groupkey_col_names)],*[scol.alias(SPARK_INDEX_NAME_FORMAT(i+len(self._groupkeys)))fori,scolinenumerate(self._psser._internal.index_spark_columns)],self._psser.spark.column,NATURAL_ORDER_COLUMN_NAME,)window=Window.partitionBy(*groupkey_col_names).orderBy(scol_for(sdf,self._psser._internal.data_spark_column_names[0]).asc(),NATURAL_ORDER_COLUMN_NAME,)temp_rank_column=verify_temp_column_name(sdf,"__rank__")sdf=(sdf.withColumn(temp_rank_column,F.row_number().over(window)).filter(F.col(temp_rank_column)<=n).drop(temp_rank_column)).drop(NATURAL_ORDER_COLUMN_NAME)internal=InternalFrame(spark_frame=sdf,index_spark_columns=([scol_for(sdf,col)forcolingroupkey_col_names]+[scol_for(sdf,SPARK_INDEX_NAME_FORMAT(i+len(self._groupkeys)))foriinrange(self._psdf._internal.index_level)]),index_names=([psser._column_labelforpsserinself._groupkeys]+self._psdf._internal.index_names),index_fields=([psser._internal.data_fields[0].copy(name=name)forpsser,nameinzip(self._groupkeys,groupkey_col_names)]+[field.copy(name=SPARK_INDEX_NAME_FORMAT(i+len(self._groupkeys)))fori,fieldinenumerate(self._psdf._internal.index_fields)]),column_labels=[self._psser._column_label],data_spark_columns=[scol_for(sdf,self._psser._internal.data_spark_column_names[0])],data_fields=[self._psser._internal.data_fields[0]],)returnfirst_series(DataFrame(internal))
# TODO: add keep parameter
[docs]defnlargest(self,n:int=5)->Series:""" Return the first n rows ordered by columns in descending order in group. Return the first n rows with the smallest values in columns, in descending order. The columns that are not specified are returned as well, but not used for ordering. Parameters ---------- n : int Number of items to retrieve. See Also -------- pyspark.pandas.Series.nlargest pyspark.pandas.DataFrame.nlargest Examples -------- >>> df = ps.DataFrame({'a': [1, 1, 1, 2, 2, 2, 3, 3, 3], ... 'b': [1, 2, 2, 2, 3, 3, 3, 4, 4]}, columns=['a', 'b']) >>> df.groupby(['a'])['b'].nlargest(1).sort_index() # doctest: +NORMALIZE_WHITESPACE a 1 1 2 2 4 3 3 7 4 Name: b, dtype: int64 """ifself._psser._internal.index_level>1:raiseValueError("nlargest do not support multi-index now")groupkey_col_names=[SPARK_INDEX_NAME_FORMAT(i)foriinrange(len(self._groupkeys))]sdf=self._psser._internal.spark_frame.select(*[scol.alias(name)forscol,nameinzip(self._groupkeys_scols,groupkey_col_names)],*[scol.alias(SPARK_INDEX_NAME_FORMAT(i+len(self._groupkeys)))fori,scolinenumerate(self._psser._internal.index_spark_columns)],self._psser.spark.column,NATURAL_ORDER_COLUMN_NAME,)window=Window.partitionBy(*groupkey_col_names).orderBy(scol_for(sdf,self._psser._internal.data_spark_column_names[0]).desc(),NATURAL_ORDER_COLUMN_NAME,)temp_rank_column=verify_temp_column_name(sdf,"__rank__")sdf=(sdf.withColumn(temp_rank_column,F.row_number().over(window)).filter(F.col(temp_rank_column)<=n).drop(temp_rank_column)).drop(NATURAL_ORDER_COLUMN_NAME)internal=InternalFrame(spark_frame=sdf,index_spark_columns=([scol_for(sdf,col)forcolingroupkey_col_names]+[scol_for(sdf,SPARK_INDEX_NAME_FORMAT(i+len(self._groupkeys)))foriinrange(self._psdf._internal.index_level)]),index_names=([psser._column_labelforpsserinself._groupkeys]+self._psdf._internal.index_names),index_fields=([psser._internal.data_fields[0].copy(name=name)forpsser,nameinzip(self._groupkeys,groupkey_col_names)]+[field.copy(name=SPARK_INDEX_NAME_FORMAT(i+len(self._groupkeys)))fori,fieldinenumerate(self._psdf._internal.index_fields)]),column_labels=[self._psser._column_label],data_spark_columns=[scol_for(sdf,self._psser._internal.data_spark_column_names[0])],data_fields=[self._psser._internal.data_fields[0]],)returnfirst_series(DataFrame(internal))
# TODO: add bins, normalize parameter
    def value_counts(
        self, sort: Optional[bool] = None, ascending: Optional[bool] = None, dropna: bool = True
    ) -> Series:
        """
        Compute the counts of unique values within each group.

        Parameters
        ----------
        sort : boolean, default None
            Sort by frequencies.
        ascending : boolean, default None
            Sort in ascending order.
        dropna : boolean, default True
            Don't include counts of NaN.

        See Also
        --------
        pyspark.pandas.Series.groupby
        pyspark.pandas.DataFrame.groupby

        Examples
        --------
        >>> df = ps.DataFrame({'A': [1, 2, 2, 3, 3, 3],
        ...                    'B': [1, 1, 2, 3, 3, np.nan]},
        ...                   columns=['A', 'B'])
        >>> df
           A    B
        0  1  1.0
        1  2  1.0
        2  2  2.0
        3  3  3.0
        4  3  3.0
        5  3  NaN

        >>> df.groupby('A')['B'].value_counts().sort_index()  # doctest: +NORMALIZE_WHITESPACE
        A  B
        1  1.0    1
        2  1.0    1
           2.0    1
        3  3.0    2
        Name: B, dtype: int64

        Include counts of NaN when `dropna` is False.

        >>> df.groupby('A')['B'].value_counts(
        ...     dropna=False).sort_index()  # doctest: +NORMALIZE_WHITESPACE
        A  B
        1  1.0    1
        2  1.0    1
           2.0    1
        3  3.0    2
           NaN    1
        Name: B, dtype: int64
        """
        warnings.warn(
            "The resulting Series will have a fixed name of 'count' from 4.0.0.",
            FutureWarning,
        )
        groupkeys = self._groupkeys + self._agg_columns
        groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(groupkeys))]
        groupkey_cols = [s.spark.column.alias(name) for s, name in zip(groupkeys, groupkey_names)]

        sdf = self._psdf._internal.spark_frame

        agg_column = self._agg_columns[0]._internal.data_spark_column_names[0]
        sdf = sdf.groupby(*groupkey_cols).count().withColumnRenamed("count", agg_column)

        if self._dropna:
            _groupkey_column_names = groupkey_names[: len(self._groupkeys)]
            sdf = sdf.dropna(subset=_groupkey_column_names)

        if dropna:
            _agg_columns_names = groupkey_names[len(self._groupkeys) :]
            sdf = sdf.dropna(subset=_agg_columns_names)

        if sort:
            if ascending:
                sdf = sdf.orderBy(scol_for(sdf, agg_column).asc())
            else:
                sdf = sdf.orderBy(scol_for(sdf, agg_column).desc())

        internal = InternalFrame(
            spark_frame=sdf,
            index_spark_columns=[scol_for(sdf, col) for col in groupkey_names],
            index_names=[psser._column_label for psser in groupkeys],
            index_fields=[
                psser._internal.data_fields[0].copy(name=name)
                for psser, name in zip(groupkeys, groupkey_names)
            ],
            column_labels=[self._agg_columns[0]._column_label],
            data_spark_columns=[scol_for(sdf, agg_column)],
        )
        return first_series(DataFrame(internal))
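    # ``sort=True`` orders the result by count, and ``ascending`` controls the direction,
    # as the implementation above shows. A hedged sketch reusing ``df`` from the docstring
    # (row order is the point being demonstrated, so output is skipped):
    #
    # >>> df.groupby('A')['B'].value_counts(sort=True, ascending=False)  # doctest: +SKIP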
    def unique(self) -> Series:
        """
        Return unique values in group.

        Uniques are returned in an arbitrary order; the result is NOT sorted.

        See Also
        --------
        pyspark.pandas.Series.unique
        pyspark.pandas.Index.unique

        Examples
        --------
        >>> df = ps.DataFrame({'a': [1, 1, 1, 2, 2, 2, 3, 3, 3],
        ...                    'b': [1, 2, 2, 2, 3, 3, 3, 4, 4]}, columns=['a', 'b'])

        >>> df.groupby(['a'])['b'].unique().sort_index()  # doctest: +SKIP
        a
        1    [1, 2]
        2    [2, 3]
        3    [3, 4]
        Name: b, dtype: object
        """
        return self._reduce_for_stat_function(F.collect_set)
defis_multi_agg_with_relabel(**kwargs:Any)->bool:""" Check whether the kwargs pass to .agg look like multi-agg with relabling. Parameters ---------- **kwargs : dict Returns ------- bool Examples -------- >>> is_multi_agg_with_relabel(a='max') False >>> is_multi_agg_with_relabel(a_max=('a', 'max'), ... a_min=('a', 'min')) True >>> is_multi_agg_with_relabel() False """ifnotkwargs:returnFalsereturnall(isinstance(v,tuple)andlen(v)==2forvinkwargs.values())defnormalize_keyword_aggregation(kwargs:Dict[str,Tuple[Name,str]],)->Tuple[Dict[Name,List[str]],List[str],List[Tuple]]:""" Normalize user-provided kwargs. Transforms from the new ``Dict[str, NamedAgg]`` style kwargs to the old defaultdict[str, List[scalar]]. Parameters ---------- kwargs : dict Returns ------- aggspec : dict The transformed kwargs. columns : List[str] The user-provided keys. order : List[Tuple[str, str]] Pairs of the input and output column names. Examples -------- >>> normalize_keyword_aggregation({'output': ('input', 'sum')}) (defaultdict(<class 'list'>, {'input': ['sum']}), ['output'], [('input', 'sum')]) """aggspec:Dict[Union[Any,Tuple],List[str]]=defaultdict(list)order:List[Tuple]=[]columns,pairs=zip(*kwargs.items())forcolumn,aggfuncinpairs:ifcolumninaggspec:aggspec[column].append(aggfunc)else:aggspec[column]=[aggfunc]order.append((column,aggfunc))# For MultiIndex, we need to flatten the tuple, e.g. (('y', 'A'), 'max') needs to be# flattened to ('y', 'A', 'max'), it won't do anything on normal Index.ifisinstance(order[0][0],tuple):order=[(*levs,method)forlevs,methodinorder]returnaggspec,list(columns),orderdef_test()->None:importosimportdoctestimportsysimportnumpyfrompyspark.sqlimportSparkSessionimportpyspark.pandas.groupbyos.chdir(os.environ["SPARK_HOME"])globs=pyspark.pandas.groupby.__dict__.copy()globs["np"]=numpyglobs["ps"]=pyspark.pandasspark=(SparkSession.builder.master("local[4]").appName("pyspark.pandas.groupby tests").getOrCreate())(failure_count,test_count)=doctest.testmod(pyspark.pandas.groupby,globs=globs,optionflags=doctest.ELLIPSIS|doctest.NORMALIZE_WHITESPACE,)spark.stop()iffailure_count:sys.exit(-1)if__name__=="__main__":_test()
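# The helpers ``is_multi_agg_with_relabel`` and ``normalize_keyword_aggregation`` above back
# the "named aggregation" form of ``GroupBy.agg``. A hedged usage sketch of the kwargs they
# normalize (made-up data; output skipped):
#
# >>> df = ps.DataFrame({'A': [1, 1, 2], 'B': [1, 2, 3]})
# >>> df.groupby('A').agg(b_max=('B', 'max'), b_min=('B', 'min')).sort_index()  # doctest: +SKIP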