From aebbb06116184ead8e5f98a4c7588d75e5e3a54f Mon Sep 17 00:00:00 2001 From: ronanstokes-db Date: Sat, 25 Mar 2023 17:18:57 -0700 Subject: [PATCH 1/4] wip --- docs/source/extending_text_generation.rst | 6 +- docs/source/generating_cdc_data.rst | 53 ++++++------- docs/source/generating_json_data.rst | 91 +++++++++++----------- docs/source/multi_table_data.rst | 91 +++++++++++----------- docs/source/troubleshooting.rst | 93 ++++++++++++----------- 5 files changed, 174 insertions(+), 160 deletions(-) diff --git a/docs/source/extending_text_generation.rst b/docs/source/extending_text_generation.rst index 7f0df8a4..f59e7171 100644 --- a/docs/source/extending_text_generation.rst +++ b/docs/source/extending_text_generation.rst @@ -38,7 +38,7 @@ extended syntax. .withColumn("address", text=fakerText("address" )) .withColumn("email", text=fakerText("ascii_company_email") ) .withColumn("ip_address", text=fakerText("ipv4_private" )) - .withColumn("faker_text", text=fakerText("sentence", ext_word_list=my_word_list) ) + .withColumn("faker_text", text=fakerText("sentence", ext_word_list=my_word_list)) ) dfFakerOnly = fakerDataspec.build() @@ -91,7 +91,9 @@ The following code shows use of a custom Python function to generate text: pluginDataspec = (DataGenerator(spark, rows=data_rows, partitions=partitions_requested, randomSeedMethod="hash_fieldname") - .withColumn("text", text=PyfuncText(text_generator, initFn=initPluginContext)) + .withColumn("text", + text=PyfuncText(text_generator, + initFn=initPluginContext)) ) dfPlugin = pluginDataspec.build() diff --git a/docs/source/generating_cdc_data.rst b/docs/source/generating_cdc_data.rst index 1633ce41..ccbf16b4 100644 --- a/docs/source/generating_cdc_data.rst +++ b/docs/source/generating_cdc_data.rst @@ -1,7 +1,7 @@ .. Test Data Generator documentation master file, created by - sphinx-quickstart on Sun Jun 21 10:54:30 2020. - You can adapt this file completely to your liking, but it should at least - contain the root `toctree` directive. +sphinx-quickstart on Sun Jun 21 10:54:30 2020. +You can adapt this file completely to your liking, but it should at least +contain the root `toctree` directive. Generating Change Data Capture Data =================================== @@ -47,28 +47,30 @@ We'll add a timestamp for when the row was generated and a memo field to mark wh uniqueCustomers = 10 * 1000000 - dataspec = (dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested) - .withColumn("customer_id","long", uniqueValues=uniqueCustomers) - .withColumn("name", percentNulls=0.01, template=r'\\w \\w|\\w a. \\w') - .withColumn("alias", percentNulls=0.01, template=r'\\w \\w|\\w a. 
\\w') - .withColumn("payment_instrument_type", values=['paypal', 'Visa', 'Mastercard', - 'American Express', 'discover', 'branded visa', 'branded mastercard'], - random=True, distribution="normal") - .withColumn("int_payment_instrument", "int", minValue=0000, maxValue=9999, baseColumn="customer_id", - baseColumnType="hash", omit=True) - .withColumn("payment_instrument", expr="format_number(int_payment_instrument, '**** ****** *####')", - baseColumn="int_payment_instrument") - .withColumn("email", template=r'\\w.\\w@\\w.com|\\w-\\w@\\w') - .withColumn("email2", template=r'\\w.\\w@\\w.com') - .withColumn("ip_address", template=r'\\n.\\n.\\n.\\n') - .withColumn("md5_payment_instrument", - expr="md5(concat(payment_instrument_type, ':', payment_instrument))", - base_column=['payment_instrument_type', 'payment_instrument']) - .withColumn("customer_notes", text=dg.ILText(words=(1,8))) - .withColumn("created_ts", "timestamp", expr="now()") - .withColumn("modified_ts", "timestamp", expr="now()") - .withColumn("memo", expr="'original data'") - ) + dataspec = ( + dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested) + .withColumn("customer_id","long", uniqueValues=uniqueCustomers) + .withColumn("name", percentNulls=0.01, template=r'\\w \\w|\\w a. \\w') + .withColumn("alias", percentNulls=0.01, template=r'\\w \\w|\\w a. \\w') + .withColumn("payment_instrument_type", values=['paypal', 'Visa', 'Mastercard', + 'American Express', 'discover', 'branded visa', 'branded mastercard'], + random=True, distribution="normal") + .withColumn("int_payment_instrument", "int", minValue=0000, maxValue=9999, + baseColumn="customer_id", baseColumnType="hash", omit=True) + .withColumn("payment_instrument", + expr="format_number(int_payment_instrument, '**** ****** *####')", + baseColumn="int_payment_instrument") + .withColumn("email", template=r'\\w.\\w@\\w.com|\\w-\\w@\\w') + .withColumn("email2", template=r'\\w.\\w@\\w.com') + .withColumn("ip_address", template=r'\\n.\\n.\\n.\\n') + .withColumn("md5_payment_instrument", + expr="md5(concat(payment_instrument_type, ':', payment_instrument))", + base_column=['payment_instrument_type', 'payment_instrument']) + .withColumn("customer_notes", text=dg.ILText(words=(1,8))) + .withColumn("created_ts", "timestamp", expr="now()") + .withColumn("modified_ts", "timestamp", expr="now()") + .withColumn("memo", expr="'original data'") + ) df1 = dataspec.build() # write table @@ -168,7 +170,6 @@ values of the columns from the source table will be used. ]) print(sqlStmt) - spark.sql(sqlStmt) That's all that's required to perform merges with the data generation framework. diff --git a/docs/source/generating_json_data.rst b/docs/source/generating_json_data.rst index 56989d76..a26db3ee 100644 --- a/docs/source/generating_json_data.rst +++ b/docs/source/generating_json_data.rst @@ -195,51 +195,52 @@ functions such as `named_struct` and `to_json`. 
lines = ['delta', 'xyzzy', 'lakehouse', 'gadget', 'droid'] - testDataSpec = (dg.DataGenerator(spark, name="device_data_set", rows=1000000, - partitions=8, - randomSeedMethod='hash_fieldname') - .withIdOutput() - # we'll use hash of the base field to generate the ids to - # avoid a simple incrementing sequence - .withColumn("internal_device_id", LongType(), minValue=0x1000000000000, - uniqueValues=device_population, omit=True, baseColumnType="hash") - - # note for format strings, we must use "%lx" not "%x" as the - # underlying value is a long - .withColumn("device_id", StringType(), format="0x%013x", - baseColumn="internal_device_id") - - # the device / user attributes will be the same for the same device id - # so lets use the internal device id as the base column for these attribute - .withColumn("country", StringType(), values=country_codes, - weights=country_weights, - baseColumn="internal_device_id") - - .withColumn("manufacturer", StringType(), values=manufacturers, - baseColumn="internal_device_id", omit=True) - .withColumn("line", StringType(), values=lines, baseColumn="manufacturer", - baseColumnType="hash", omit=True) - .withColumn("manufacturer_info", "string", - expr="to_json(named_struct('line', line, 'manufacturer', manufacturer))", - baseColumn=['manufacturer', 'line']) - - - .withColumn("model_ser", IntegerType(), minValue=1, maxValue=11, - baseColumn="device_id", - baseColumnType="hash", omit=True) - - .withColumn("event_type", StringType(), - values=["activation", "deactivation", "plan change", - "telecoms activity", "internet activity", "device error"], - random=True, omit=True) - .withColumn("event_ts", "timestamp", - begin="2020-01-01 01:00:00", end="2020-12-31 23:59:00", - interval="1 minute", random=True, omit=True) - - .withColumn("event_info", "string", - expr="to_json(named_struct('event_type', event_type, 'event_ts', event_ts))", - baseColumn=['event_type', 'event_ts']) - ) + testDataSpec = ( + dg.DataGenerator(spark, name="device_data_set", rows=1000000, + partitions=8, + randomSeedMethod='hash_fieldname') + .withIdOutput() + # we'll use hash of the base field to generate the ids to + # avoid a simple incrementing sequence + .withColumn("internal_device_id", LongType(), minValue=0x1000000000000, + uniqueValues=device_population, omit=True, baseColumnType="hash") + + # note for format strings, we must use "%lx" not "%x" as the + # underlying value is a long + .withColumn("device_id", StringType(), format="0x%013x", + baseColumn="internal_device_id") + + # the device / user attributes will be the same for the same device id + # so lets use the internal device id as the base column for these attribute + .withColumn("country", StringType(), values=country_codes, + weights=country_weights, + baseColumn="internal_device_id") + + .withColumn("manufacturer", StringType(), values=manufacturers, + baseColumn="internal_device_id", omit=True) + .withColumn("line", StringType(), values=lines, baseColumn="manufacturer", + baseColumnType="hash", omit=True) + .withColumn("manufacturer_info", "string", + expr="to_json(named_struct('line', line, 'manufacturer', manufacturer))", + baseColumn=['manufacturer', 'line']) + + + .withColumn("model_ser", IntegerType(), minValue=1, maxValue=11, + baseColumn="device_id", + baseColumnType="hash", omit=True) + + .withColumn("event_type", StringType(), + values=["activation", "deactivation", "plan change", + "telecoms activity", "internet activity", "device error"], + random=True, omit=True) + .withColumn("event_ts", "timestamp", + 
begin="2020-01-01 01:00:00", end="2020-12-31 23:59:00", + interval="1 minute", random=True, omit=True) + + .withColumn("event_info", "string", + expr="to_json(named_struct('event_type', event_type, 'event_ts', event_ts))", + baseColumn=['event_type', 'event_ts']) + ) dfTestData = testDataSpec.build() diff --git a/docs/source/multi_table_data.rst b/docs/source/multi_table_data.rst index 5eab313d..d5aa5ec2 100644 --- a/docs/source/multi_table_data.rst +++ b/docs/source/multi_table_data.rst @@ -1,7 +1,7 @@ .. Test Data Generator documentation master file, created by - sphinx-quickstart on Sun Jun 21 10:54:30 2020. - You can adapt this file completely to your liking, but it should at least - contain the root `toctree` directive. +sphinx-quickstart on Sun Jun 21 10:54:30 2020. +You can adapt this file completely to your liking, but it should at least +contain the root `toctree` directive. Generating and Using Data with Multiple Tables ============================================== @@ -73,7 +73,9 @@ Here we use a simple sequence for our plan ids. import dbldatagen as dg import pyspark.sql.functions as F - spark.catalog.clearCache() # clear cache so that if we run multiple times to check performance, we're not relying on cache + # clear cache so that if we run multiple times to check performance, + # we're not relying on cache + spark.catalog.clearCache() UNIQUE_PLANS = 20 PLAN_MIN_VALUE = 100 @@ -87,36 +89,35 @@ Here we use a simple sequence for our plan ids. spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000) - plan_dataspec = (dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested) - .withColumn("plan_id","int", minValue=PLAN_MIN_VALUE, uniqueValues=UNIQUE_PLANS) - # use plan_id as root value - .withColumn("plan_name", prefix="plan", baseColumn="plan_id") - - # note default step is 1 so you must specify a step for small number ranges, - .withColumn("cost_per_mb", "decimal(5,3)", minValue=0.005, maxValue=0.050, - step=0.005, random=True) - .withColumn("cost_per_message", "decimal(5,3)", minValue=0.001, maxValue=0.02, - step=0.001, random=True) - .withColumn("cost_per_minute", "decimal(5,3)", minValue=0.001, maxValue=0.01, - step=0.001, random=True) - - # we're modelling long distance and international prices simplistically - - # each is a multiplier thats applied to base rate - .withColumn("ld_multiplier", "decimal(5,3)", minValue=1.5, maxValue=3, step=0.05, - random=True, distribution="normal", omit=True) - .withColumn("ld_cost_per_minute", "decimal(5,3)", - expr="cost_per_minute * ld_multiplier", - baseColumns=['cost_per_minute', 'ld_multiplier']) - .withColumn("intl_multiplier", "decimal(5,3)", minValue=2, maxValue=4, step=0.05, - random=True, distribution="normal", omit=True) - .withColumn("intl_cost_per_minute", "decimal(5,3)", - expr="cost_per_minute * intl_multiplier", - baseColumns=['cost_per_minute', 'intl_multiplier']) + plan_dataspec = ( + dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested) + .withColumn("plan_id","int", minValue=PLAN_MIN_VALUE, uniqueValues=UNIQUE_PLANS) + # use plan_id as root value + .withColumn("plan_name", prefix="plan", baseColumn="plan_id") + + # note default step is 1 so you must specify a step for small number ranges, + .withColumn("cost_per_mb", "decimal(5,3)", minValue=0.005, maxValue=0.050, + step=0.005, random=True) + .withColumn("cost_per_message", "decimal(5,3)", minValue=0.001, maxValue=0.02, + step=0.001, random=True) + .withColumn("cost_per_minute", "decimal(5,3)", minValue=0.001, 
maxValue=0.01, + step=0.001, random=True) + + # we're modelling long distance and international prices simplistically - + # each is a multiplier thats applied to base rate + .withColumn("ld_multiplier", "decimal(5,3)", minValue=1.5, maxValue=3, step=0.05, + random=True, distribution="normal", omit=True) + .withColumn("ld_cost_per_minute", "decimal(5,3)", + expr="cost_per_minute * ld_multiplier", + baseColumns=['cost_per_minute', 'ld_multiplier']) + .withColumn("intl_multiplier", "decimal(5,3)", minValue=2, maxValue=4, step=0.05, + random=True, distribution="normal", omit=True) + .withColumn("intl_cost_per_minute", "decimal(5,3)", + expr="cost_per_minute * intl_multiplier", + baseColumns=['cost_per_minute', 'intl_multiplier']) ) - df_plans = (plan_dataspec.build() - .cache() - ) + df_plans = plan_dataspec.build().cache() display(df_plans) @@ -195,10 +196,11 @@ when using hashed values, the range of the hashes produced can be large. effective_customers = df_customers.count() - print(stripMargin(f"""revised customers : {df_customers.count()}, - | unique customers: {df_customers.select(F.countDistinct('customer_id')).take(1)[0][0]}, - | unique device ids: {df_customers.select(F.countDistinct('device_id')).take(1)[0][0]}, - | unique phone numbers: {df_customers.select(F.countDistinct('phone_number')).take(1)[0][0]}""") + print(stripMargin( + f"""revised customers : {df_customers.count()}, + | unique customers: {df_customers.select(F.countDistinct('customer_id')).take(1)[0][0]}, + | unique device ids: {df_customers.select(F.countDistinct('device_id')).take(1)[0][0]}, + | unique phone numbers: {df_customers.select(F.countDistinct('phone_number')).take(1)[0][0]}""") ) display(df_customers) @@ -247,7 +249,8 @@ A simple approach is simply to multiply the # use random seed method of 'hash_fieldname' for better spread - default in later builds events_dataspec = (dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested, randomSeed=42, randomSeedMethod="hash_fieldname") - # use same logic as per customers dataset to ensure matching keys - but make them random + # use same logic as per customers dataset to ensure matching keys + # but make them random .withColumn("device_id_base","decimal(10)", minValue=CUSTOMER_MIN_VALUE, uniqueValues=UNIQUE_CUSTOMERS, random=True, omit=True) @@ -260,12 +263,14 @@ A simple approach is simply to multiply the weights=[50, 50, 20, 10, 5 ], random=True) # use Gamma distribution for skew towards short calls - .withColumn("base_minutes","decimal(7,2)", minValue=1.0, maxValue=100.0, step=0.1, + .withColumn("base_minutes","decimal(7,2)", + minValue=1.0, maxValue=100.0, step=0.1, distribution=dg.distributions.Gamma(shape=1.5, scale=2.0), random=True, omit=True) # use Gamma distribution for skew towards short transfers - .withColumn("base_bytes_transferred","decimal(12)", minValue=K_1, maxValue=MB_100, + .withColumn("base_bytes_transferred","decimal(12)", + minValue=K_1, maxValue=MB_100, distribution=dg.distributions.Gamma(shape=0.75, scale=2.0), random=True, omit=True) @@ -308,8 +313,7 @@ Let's compute the customers and associated plans import pyspark.sql.functions as F import pyspark.sql.types as T - df_customer_pricing = df_customers.join(df_plans, - df_plans.plan_id == df_customers.plan) + df_customer_pricing = df_customers.join(df_plans, df_plans.plan_id == df_customers.plan) display(df_customer_pricing) @@ -365,8 +369,9 @@ now let's compute the invoices .. 
code-block:: python - df_customer_summary = (df_customer_pricing.join(df_summary, - df_customer_pricing.device_id == df_summary.device_id ) + df_customer_summary = ( + df_customer_pricing.join(df_summary, + df_customer_pricing.device_id == df_summary.device_id ) .createOrReplaceTempView("customer_summary")) df_invoices = spark.sql(""" diff --git a/docs/source/troubleshooting.rst b/docs/source/troubleshooting.rst index 386d35ab..83d721b3 100644 --- a/docs/source/troubleshooting.rst +++ b/docs/source/troubleshooting.rst @@ -165,50 +165,55 @@ In these cases, we use the `baseColumn` attribute to ensure the correct column b lines = ['delta', 'xyzzy', 'lakehouse', 'gadget', 'droid'] - testDataSpec = (dg.DataGenerator(spark, name="device_data_set", rows=1000000, - partitions=8, - randomSeedMethod='hash_fieldname') - # we'll use hash of the base field to generate the ids to - # avoid a simple incrementing sequence - .withColumn("internal_device_id", "long", minValue=0x1000000000000, - uniqueValues=device_population, omit=True, baseColumnType="hash") - - # note for format strings, we must use "%lx" not "%x" as the - # underlying value is a long - .withColumn("device_id", "string", format="0x%013x", - baseColumn="internal_device_id") - - # the device / user attributes will be the same for the same device id - # so lets use the internal device id as the base column for these attribute - .withColumn("country", "string", values=country_codes, - weights=country_weights, - baseColumn="internal_device_id") - - .withColumn("manufacturer", "string", values=manufacturers, - baseColumn="internal_device_id", omit=True) - - .withColumn("line", StringType(), values=lines, baseColumn="manufacturer", - baseColumnType="hash", omit=True) - - # note use of baseColumn to control column build ordering - .withColumn("manufacturer_info", "string", - expr="to_json(named_struct('line', line, 'manufacturer', manufacturer))", - baseColumn=["line", "manufacturer"] - ) - - .withColumn("event_type", "string", - values=["activation", "deactivation", "plan change", - "telecoms activity", "internet activity", "device error"], - random=True, omit=True) - - .withColumn("event_ts", "timestamp", begin="2020-01-01 01:00:00", end="2020-12-31 23:59:00", - interval="1 minute", random=True, omit=True) - - # note use of baseColumn to control column build ordering - .withColumn("event_info", "string", - expr="to_json(named_struct('event_type', event_type, 'event_ts', event_ts))", - baseColumn=["event_type", "event_ts"]) - ) + testDataSpec = ( + dg.DataGenerator(spark, name="device_data_set", rows=1000000, + partitions=8, + randomSeedMethod='hash_fieldname') + # we'll use hash of the base field to generate the ids to + # avoid a simple incrementing sequence + .withColumn("internal_device_id", "long", minValue=0x1000000000000, + uniqueValues=device_population, omit=True, baseColumnType="hash") + + # note for format strings, we must use "%lx" not "%x" as the + # underlying value is a long + .withColumn("device_id", "string", format="0x%013x", + baseColumn="internal_device_id") + + # the device / user attributes will be the same for the same device id + # so lets use the internal device id as the base column for these attribute + .withColumn("country", "string", values=country_codes, + weights=country_weights, + baseColumn="internal_device_id") + + .withColumn("manufacturer", "string", values=manufacturers, + baseColumn="internal_device_id", omit=True) + + .withColumn("line", StringType(), values=lines, baseColumn="manufacturer", + 
baseColumnType="hash", omit=True) + + # note use of baseColumn to control column build ordering + .withColumn("manufacturer_info", "string", + expr="to_json(named_struct('line', line, 'manufacturer', manufacturer))", + baseColumn=["line", "manufacturer"] + ) + + .withColumn("event_type", "string", + values=["activation", "deactivation", "plan change", + "telecoms activity", "internet activity", "device error"], + random=True, omit=True) + + .withColumn("event_ts", "timestamp", + begin="2020-01-01 01:00:00", + end="2020-12-31 23:59:00", + interval="1 minute", + random=True, + omit=True) + + # note use of baseColumn to control column build ordering + .withColumn("event_info", "string", + expr="to_json(named_struct('event_type', event_type, 'event_ts', event_ts))", + baseColumn=["event_type", "event_ts"]) + ) dfTestData = testDataSpec.build() From c4fdc3bc8e77a7eeaaa2ff03797cf160675cb79b Mon Sep 17 00:00:00 2001 From: ronanstokes-db Date: Tue, 9 May 2023 11:59:45 -0700 Subject: [PATCH 2/4] wip --- python/dev_require.txt | 1 - python/require.txt | 1 - 2 files changed, 2 deletions(-) diff --git a/python/dev_require.txt b/python/dev_require.txt index 8d53d810..a34ed3b2 100644 --- a/python/dev_require.txt +++ b/python/dev_require.txt @@ -31,7 +31,6 @@ pypandoc ipython==7.22.0 recommonmark sphinx-markdown-builder -rst2pdf==0.98 Jinja2 < 3.1 sphinx-copybutton diff --git a/python/require.txt b/python/require.txt index 53f80fde..5f0e30a4 100644 --- a/python/require.txt +++ b/python/require.txt @@ -30,7 +30,6 @@ pypandoc ipython==7.22.0 recommonmark sphinx-markdown-builder -rst2pdf==0.98 Jinja2 < 3.1 sphinx-copybutton From e53f8fe3d13344c64e6b3b61de8e6028af164425 Mon Sep 17 00:00:00 2001 From: ronanstokes-db Date: Thu, 13 Jul 2023 09:38:27 -0700 Subject: [PATCH 3/4] changes for release --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dac8deb7..b3ab0403 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,7 @@ ## Change History All notable changes to the Databricks Labs Data Generator will be documented in this file. -### Unreleased +### Version 0.3.5 #### Changed * Added formatting of generated code as Html for script methods From 23760baa9778ed5ceb636c3c7f3b26a024bc6c81 Mon Sep 17 00:00:00 2001 From: ronanstokes-db Date: Mon, 28 Aug 2023 15:57:57 -0700 Subject: [PATCH 4/4] wip --- dbldatagen/data_analyzer.py | 52 +++++++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 16 deletions(-) diff --git a/dbldatagen/data_analyzer.py b/dbldatagen/data_analyzer.py index 5aec5245..310d7e48 100644 --- a/dbldatagen/data_analyzer.py +++ b/dbldatagen/data_analyzer.py @@ -5,7 +5,10 @@ """ This module defines the ``DataAnalyzer`` class. -This code is experimental and both APIs and code generated is liable to change in future versions. + .. warning:: + Experimental + This code is experimental and both APIs and code generated is liable to change in future versions. 
+ """ from pyspark.sql.types import LongType, FloatType, IntegerType, StringType, DoubleType, BooleanType, ShortType, \ TimestampType, DateType, DecimalType, ByteType, BinaryType, StructType, ArrayType, DataType @@ -14,6 +17,7 @@ import pyspark.sql.functions as F from .utils import strip_margins +from .html_utils import HtmlUtils from .spark_singleton import SparkSingleton @@ -58,6 +62,11 @@ def __init__(self, df=None, sparkSession=None): self._sparkSession = sparkSession self._dataSummary = None + @property + def sourceDf(self): + """ Get source dataframe""" + return self._df + def _displayRow(self, row): """Display details for row""" results = [] @@ -116,21 +125,21 @@ def summarizeToDF(self): 'schema', summaryExpr=f"""to_json(named_struct('column_count', {len(dtypes)}))""", fieldExprs=[f"'{dtype[1]}' as {dtype[0]}" for dtype in dtypes], - dfData=self._df) + dfData=self.sourceDf) # count dfDataSummary = self._addMeasureToSummary( 'count', summaryExpr=f"{total_count}", fieldExprs=[f"string(count({dtype[0]})) as {dtype[0]}" for dtype in dtypes], - dfData=self._df, + dfData=self.sourceDf, dfSummary=dfDataSummary) dfDataSummary = self._addMeasureToSummary( 'null_probability', fieldExprs=[f"""string( round( ({total_count} - count({dtype[0]})) /{total_count}, 2)) as {dtype[0]}""" for dtype in dtypes], - dfData=self._df, + dfData=self.sourceDf, dfSummary=dfDataSummary) # distinct count @@ -138,20 +147,20 @@ def summarizeToDF(self): 'distinct_count', summaryExpr="count(distinct *)", fieldExprs=[f"string(count(distinct {dtype[0]})) as {dtype[0]}" for dtype in dtypes], - dfData=self._df, + dfData=self.sourceDf, dfSummary=dfDataSummary) # min dfDataSummary = self._addMeasureToSummary( 'min', fieldExprs=[f"string(min({dtype[0]})) as {dtype[0]}" for dtype in dtypes], - dfData=self._df, + dfData=self.sourceDf, dfSummary=dfDataSummary) dfDataSummary = self._addMeasureToSummary( 'max', fieldExprs=[f"string(max({dtype[0]})) as {dtype[0]}" for dtype in dtypes], - dfData=self._df, + dfData=self.sourceDf, dfSummary=dfDataSummary) descriptionDf = self._df.describe().where("summary in ('mean', 'stddev')") @@ -169,20 +178,20 @@ def summarizeToDF(self): dfDataSummary = self._addMeasureToSummary( measure, fieldExprs=[f"'{values[dtype[0]]}'" for dtype in dtypes], - dfData=self._df, + dfData=self.sourceDf, dfSummary=dfDataSummary) # string characteristics for strings and string representation of other values dfDataSummary = self._addMeasureToSummary( 'print_len_min', fieldExprs=[f"min(length(string({dtype[0]}))) as {dtype[0]}" for dtype in dtypes], - dfData=self._df, + dfData=self.sourceDf, dfSummary=dfDataSummary) dfDataSummary = self._addMeasureToSummary( 'print_len_max', fieldExprs=[f"max(length(string({dtype[0]}))) as {dtype[0]}" for dtype in dtypes], - dfData=self._df, + dfData=self.sourceDf, dfSummary=dfDataSummary) return dfDataSummary @@ -359,7 +368,7 @@ def _scriptDataGeneratorCode(cls, schema, dataSummary=None, sourceDf=None, suppr return "\n".join(stmts) @classmethod - def scriptDataGeneratorFromSchema(cls, schema, suppressOutput=False, name=None): + def scriptDataGeneratorFromSchema(cls, schema, suppressOutput=False, name=None, asHtml=False): """ Generate outline data generator code from an existing dataframe @@ -375,14 +384,20 @@ def scriptDataGeneratorFromSchema(cls, schema, suppressOutput=False, name=None): :param schema: Pyspark schema - i.e manually constructed StructType or return value from `dataframe.schema` :param suppressOutput: Suppress printing of generated code if True :param name: Optional 
name for data generator - :return: String containing skeleton code + :param asHtml: If True, will generate Html suitable for notebook ``displayHtml``. If true, suppresses output + :return: String containing skeleton code (in Html form if `asHtml` is True) """ - return cls._scriptDataGeneratorCode(schema, + generated_code = cls._scriptDataGeneratorCode(schema, suppressOutput=suppressOutput, name=name) - def scriptDataGeneratorFromData(self, suppressOutput=False, name=None): + if asHtml: + generated_code = HtmlUtils.formatCodeAsHtml(generated_code) + + return generated_code + + def scriptDataGeneratorFromData(self, suppressOutput=False, name=None, asHtml=False): """ Generate outline data generator code from an existing dataframe @@ -411,8 +426,13 @@ def scriptDataGeneratorFromData(self, suppressOutput=False, name=None): row_key_pairs = row.asDict() self._dataSummary[row['measure_']] = row_key_pairs - return self._scriptDataGeneratorCode(self._df.schema, + generated_code = self._scriptDataGeneratorCode(self._df.schema, suppressOutput=suppressOutput, name=name, dataSummary=self._dataSummary, - sourceDf=self._df) + sourceDf=self.sourceDf) + + if asHtml: + generated_code = HtmlUtils.formatCodeAsHtml(generated_code) + + return generated_code
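
Usage sketch for the new ``asHtml`` option added to ``DataAnalyzer`` in PATCH 4/4 above. This is a minimal, non-authoritative example, assuming ``DataAnalyzer`` is importable from the package root, that ``spark`` is an active SparkSession, and that ``displayHtml`` is available (as in Databricks notebooks); the sample dataframe and the generator name are illustrative only.

.. code-block:: python

    import dbldatagen as dg

    # build a small sample dataframe to analyze (illustrative only)
    df_source = spark.range(1000).withColumnRenamed("id", "customer_id")

    analyzer = dg.DataAnalyzer(df=df_source, sparkSession=spark)

    # plain-text skeleton generator code, as in earlier releases
    code_text = analyzer.scriptDataGeneratorFromData(suppressOutput=True, name="customers")

    # new in this patch: the same skeleton code formatted as HTML for notebook rendering
    code_html = analyzer.scriptDataGeneratorFromData(suppressOutput=True, name="customers",
                                                     asHtml=True)
    displayHtml(code_html)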