From ec940d1d9144711f907c55c5a218c292071630a2 Mon Sep 17 00:00:00 2001 From: Hermann Stolte <hstolte@users.noreply.github.com> Date: Tue, 29 Oct 2024 16:49:35 +0100 Subject: [PATCH] revision --- .../AstroCheckResultWithoutSOUNDCase.yaml | 8 +- ...martGridAnomalyCheckResultNOSOUNDCase.yaml | 7 +- reproduce/plot.py | 150 ++--- reproduce/plot_checkresult.py | 550 +++++++++++++++--- reproduce/sound/complete.sh | 7 +- reproduce/sound/figure9.sh | 28 + reproduce/va/binary_sequence_check.py | 144 +++++ .../provenance/util/ExperimentSettings.java | 5 + .../soundcheck/core/VA2CheckResult.java | 16 +- .../core/VABinarySequenceCheck.java | 18 +- .../core/sinks/WindowedDataSeriesSink.java | 61 ++ .../soundcheck/usecases/astro/CaseWithVA.java | 6 +- .../usecases/astro/CaseWithVASinks.java | 252 ++++++++ 13 files changed, 1094 insertions(+), 158 deletions(-) create mode 100755 reproduce/sound/figure9.sh create mode 100644 reproduce/va/binary_sequence_check.py create mode 100644 src/main/java/io/stolther/soundcheck/core/sinks/WindowedDataSeriesSink.java create mode 100644 src/main/java/io/stolther/soundcheck/usecases/astro/CaseWithVASinks.java diff --git a/experiments/checkresults/AstroCheckResultWithoutSOUNDCase.yaml b/experiments/checkresults/AstroCheckResultWithoutSOUNDCase.yaml index 0668647..0ecd68a 100644 --- a/experiments/checkresults/AstroCheckResultWithoutSOUNDCase.yaml +++ b/experiments/checkresults/AstroCheckResultWithoutSOUNDCase.yaml @@ -5,7 +5,7 @@ executor_script: './scripts/flink_do_run.sh' query: 'AstroOverhead' -CI: 997 +CI: 990 dimensions: schema: variant.nSamples @@ -16,16 +16,18 @@ dimensions: # - 3 # # - 0 nSamples: - - 0 + # - 0 - 200 +manual_sparsity: 0 + variants: - name: SOUND spe_command: > {flink_cmd} io.stolther.soundcheck.usecases.astro.CaseWithSparsity {query_jar} --inputFolder {input_folder} --statisticsFolder {statistics_folder} {args} {experiment_args} {extra_args} - --manual_sparsity 0 --parallelism {parallelism} --CI {CI} + --manual_sparsity {manual_sparsity} --parallelism {parallelism} --CI {CI} args: '' flink_cmd: "{repo_dir}/flink-1.14.0/bin/flink run --class" diff --git a/experiments/checkresults/SmartGridAnomalyCheckResultNOSOUNDCase.yaml b/experiments/checkresults/SmartGridAnomalyCheckResultNOSOUNDCase.yaml index 58e0a6d..bd5e7ed 100644 --- a/experiments/checkresults/SmartGridAnomalyCheckResultNOSOUNDCase.yaml +++ b/experiments/checkresults/SmartGridAnomalyCheckResultNOSOUNDCase.yaml @@ -5,15 +5,14 @@ executor_script: './scripts/flink_do_run.sh' query: 'SmartGridAnomalyCaseWithUncertainty' -manual_value_uncertainty: 1 -CI: 997 +manual_value_uncertainty: 0 +CI: 990 dimensions: schema: variant.nSamples bufferDelay: - 0 nSamples: - - 0 - 200 variants: @@ -31,7 +30,7 @@ input_folder: "{repo_dir}/data/input/sg-debs" experiment_args: "--inputFile sg-debs-1G" -parallelism: 2 +parallelism: 1 diff --git a/reproduce/plot.py b/reproduce/plot.py index 514ab0e..cf8e09e 100755 --- a/reproduce/plot.py +++ b/reproduce/plot.py @@ -96,7 +96,7 @@ PARAMETER_LABELS = { # Discard warmup and cooldown WARMUP_PERCENTAGE = 15 COOLDOWN_PERCENTAGE = 15 -VARIANT_NOSOUND = "BASELINE" +VARIANT_NOSOUND = "BASE_NOM" VARIANT_SOUND = "SOUND-w/serialization" VARIANT_SOUNDNOOP = "SOUND" VARIANT_EREBUS_WP = "EB+W" @@ -186,7 +186,7 @@ def sum_dropna(a, **kwargs): def aggregate_node_rep( - data, parameter, dimensions, t_agg=np.mean, node_agg=None, new_name=None + data, parameter, dimensions, t_agg=np.mean, node_agg=None, new_name=None ): result = ( data[data.parameter == parameter] @@ -202,7 +202,7 @@ 
def aggregate_node_rep( def aggregate_rep( - data, parameter, dimensions, t_agg=np.mean, node_agg=np.mean, new_name=None + data, parameter, dimensions, t_agg=np.mean, node_agg=np.mean, new_name=None ): result = ( aggregate_node_rep(data, parameter, dimensions, t_agg, new_name) @@ -268,10 +268,10 @@ def pivot_table(data, index, parameter, dimensions): def create_tables( - data, - index=["variant"], - parameters=["throughput", "latency"], - dimensions=["provenanceActivator"], + data, + index=["variant"], + parameters=["throughput", "latency"], + dimensions=["provenanceActivator"], ): for parameter in parameters: try: @@ -316,7 +316,7 @@ def time_series(data, parameters, extra_group=None): def set_axis_info( - axes, idx, title, xlabel, ylabel="", yscale="linear", title_size=14, label_size=12 + axes, idx, title, xlabel, ylabel="", yscale="linear", title_size=14, label_size=12 ): axes.flat[idx].set_title(title, fontsize=title_size, pad=8) axes.flat[idx].set_xlabel(xlabel, fontsize=label_size) @@ -336,7 +336,7 @@ def display_percentage_diffs(axes, ref_idx=None, end_idx=None): # ax.patches[4].set_hatch('///') # ax.patches[5].set_hatch('///') # ax.patches[6].set_hatch('///') - for p in ax.patches[ref_idx + 1: end_idx]: + for p in ax.patches[ref_idx + 1 : end_idx]: height = p.get_height() if not np.isfinite(height): continue @@ -530,8 +530,8 @@ def loadData(folder): ] /= 1e3 # Convert latency to seconds, memory to GB DATA.loc[ ( - DATA["parameter"].isin(["latency", "provsize", "past-buffer-size"]) - & (DATA["value"] < 0) + DATA["parameter"].isin(["latency", "provsize", "past-buffer-size"]) + & (DATA["value"] < 0) ), "value", ] = np.nan # Negative average values mean missing @@ -629,11 +629,11 @@ def percent_diffs(row, baseline="NP/none"): def aggregated_per_rep( - data, - parameters, - dimensions=["predicate"], - selectivity_dimensions=["predicate", "variant"], - selectivity_column=True, + data, + parameters, + dimensions=["predicate"], + selectivity_dimensions=["predicate", "variant"], + selectivity_column=True, ): if "predicate-selectivity" in parameters: if not "predicate-in" in parameters or not "predicate-out" in parameters: @@ -694,7 +694,7 @@ def comparison_table(data, parameters, dimensions, baseline=f"NP/none"): def answer_percentages( - data, dimensions=["predicate"], rename_operators=None, round_digits=1 + data, dimensions=["predicate"], rename_operators=None, round_digits=1 ): aggregated = aggregate_node_rep( data, "predicate-out", dimensions=dimensions + ["node-name"], t_agg=sum_dropna @@ -709,7 +709,7 @@ def answer_percentages( ) pv[ pv == 0 - ] = np.nan # Differentiate between very few and zero outputs after dividing + ] = np.nan # Differentiate between very few and zero outputs after dividing for col in pv.columns: pv[col] = 100 * pv[col] / pv[col].sum() pv = pv.round(round_digits) @@ -725,13 +725,13 @@ def answer_percentages( def auto_set_axis_info( - g, - xlabel, - title_key="col_name", - title_fontsize=AXIS_TITLE_SIZE, - label_fontsize=AXIS_LABEL_SIZE, - title_labels=PARAMETER_LABELS, - title_pad=8, + g, + xlabel, + title_key="col_name", + title_fontsize=AXIS_TITLE_SIZE, + label_fontsize=AXIS_LABEL_SIZE, + title_labels=PARAMETER_LABELS, + title_pad=8, ): g.set_titles("{" + title_key + "}") @@ -751,17 +751,17 @@ def total_selectivity(data, variant, predicate): def predicate_selectivity_plot( - data, - variants, - dimensions=[], - no_annotations=[VARIANT_NOSOUND], - export=False, - bbox=(0.75, 0), - bottom=0.05, - ncol=4, - adjusttext_args={}, - custom_annotations={}, - 
annotate_predicates=False, + data, + variants, + dimensions=[], + no_annotations=[VARIANT_NOSOUND], + export=False, + bbox=(0.75, 0), + bottom=0.05, + ncol=4, + adjusttext_args={}, + custom_annotations={}, + annotate_predicates=False, ): parameters = ["rate", "latency", "predicate-in", "memory", "cpu", "predicate-out"] @@ -843,9 +843,9 @@ def predicate_selectivity_plot( if not np.isfinite(x) or not np.isfinite(y): continue if ( - annotate_predicates - and parameter == "rate" - and row["variant"] == VARIANT_SOUND + annotate_predicates + and parameter == "rate" + and row["variant"] == VARIANT_SOUND ): predicate_paper_names = { "Q1": "P1", @@ -926,7 +926,7 @@ NO_ANNOTATIONS["LinearRoadAccident"] = [VARIANT_NOSOUND, VARIANT_EREBUS_WP] def predicate_parallelism_plot( - data, variants, export=False, bbox=(0.75, 0), bottom=0.35, ncol=4 + data, variants, export=False, bbox=(0.75, 0), bottom=0.35, ncol=4 ): parameters = ["rate", "latency", "predicate-in"] dimensions = ["parallelism"] @@ -994,14 +994,14 @@ def set_xlim_to_min_of_max_values(ax): def predicate_time_series_compact( - data, - predicateDelay, - bufferDelay, - predicate, - trange=(-np.inf, np.inf), - bbox=(0.97, 0.725), - bottom=0.1, - export=False, + data, + predicateDelay, + bufferDelay, + predicate, + trange=(-np.inf, np.inf), + bbox=(0.97, 0.725), + bottom=0.1, + export=False, ): parameters = [ "rate", @@ -1089,7 +1089,7 @@ def predicate_time_series_compact( ax = axes[plot_ax[parameter]] label = variant m = marker[variant] - ax.set_title(PARAMETER_LABELS[parameter])#, fontsize=AXIS_TITLE_SIZE) + ax.set_title(PARAMETER_LABELS[parameter]) # , fontsize=AXIS_TITLE_SIZE) # if variant == VARIANT_NOSOUND: # color = "C0" # elif variant == VARIANT_SOUND: @@ -1358,18 +1358,18 @@ def jmh_plot(jmh_df, bbox_x=0.0, bbox_y=0.0, bottom=0.1, export=False): def overheadbynsamples_plot( - data, - variants, - dimensions=["bufferDelay"], - export=False, - no_annotations=[VARIANT_NOSOUND], - bbox=(0.75, 0), - bottom=0.05, - ncol=4, - adjusttext_args={}, - custom_annotations={}, - xlabel="n_samples", - baseline="NS/100", + data, + variants, + dimensions=["bufferDelay"], + export=False, + no_annotations=[VARIANT_NOSOUND], + bbox=(0.75, 0), + bottom=0.05, + ncol=4, + adjusttext_args={}, + custom_annotations={}, + xlabel="n_samples", + baseline="NS/100", ): TUPLE_SIZE_BYTES = ( 176 # Experiment-specific, computed using Java Object Layout (JOL) @@ -1446,7 +1446,7 @@ def overheadbynsamples_plot( # Define a formatter function def format_func(value, tick_number): # Divide the tick value by 1000 and format it - return f'{value / 1000}' + return f"{value / 1000}" # Create a FuncFormatter and apply it to the x-axis ax.xaxis.set_major_formatter(FuncFormatter(format_func)) @@ -1456,9 +1456,9 @@ def overheadbynsamples_plot( sns.despine() - - - auto_set_axis_info(g, xlabel.replace("CI", "$c$").replace("n_samples", "$N$"), title_pad=14) + auto_set_axis_info( + g, xlabel.replace("CI", "$c$").replace("n_samples", "$N$"), title_pad=14 + ) # if xlabel == "CI": # # Assuming 'g' is your Seaborn FacetGrid object @@ -1477,16 +1477,16 @@ def overheadbynsamples_plot( def buffer_size_plot( - data, - variants, - dimensions=["bufferDelay"], - export=False, - no_annotations=[VARIANT_NOSOUND], - bbox=(0.75, 0), - bottom=0.05, - ncol=4, - adjusttext_args={}, - custom_annotations={}, + data, + variants, + dimensions=["bufferDelay"], + export=False, + no_annotations=[VARIANT_NOSOUND], + bbox=(0.75, 0), + bottom=0.05, + ncol=4, + adjusttext_args={}, + custom_annotations={}, ): 
TUPLE_SIZE_BYTES = ( 176 # Experiment-specific, computed using Java Object Layout (JOL) diff --git a/reproduce/plot_checkresult.py b/reproduce/plot_checkresult.py index b4a35b4..5a7fd11 100755 --- a/reproduce/plot_checkresult.py +++ b/reproduce/plot_checkresult.py @@ -14,6 +14,7 @@ from sklearn.metrics import ( classification_report, cohen_kappa_score, ) +from tqdm import tqdm from plot import save_fig, LEGEND_LABEL_SIZE, LEGEND_TITLE_SIZE from pathlib import Path @@ -21,6 +22,13 @@ import os import yaml from data_prep.astro_data import sources +from va.binary_sequence_check import ( + binary_sequence_check, + CONSTRAINTS, + DataSeries, + create_lookup_table, + OUTCOMES, +) # Plot visual config plt.style.use("ggplot") @@ -352,11 +360,6 @@ def make_VA_plot( # print(b) # print("-" * 80) - df_by_group = { - k: pd.read_csv(path).assign(group_key=k) for k, path in path_by_groupkey.items() - } - df = list(df_by_group.values())[0] - case_keys = [ # "4FGL_J0035.9+59504FGL_J0035.9+5950", "4FGL_J0035.8+61314FGL_J0035.8+6131", @@ -365,6 +368,11 @@ def make_VA_plot( if constraint_key not in case_keys: return + df_by_group = { + k: pd.read_csv(path).assign(group_key=k) for k, path in path_by_groupkey.items() + } + df = list(df_by_group.values())[0] + # select only constraint key df = df[df["key"] == constraint_key] @@ -518,74 +526,79 @@ def make_VA_plot( VA_df = sdf.loc[:idx_first_violation] - v0_diff = VA_df.iloc[-1]["valuemean_0"] / VA_df.iloc[-2]["valuemean_0"] - v1_diff = VA_df.iloc[-1]["valuemean_1"] / VA_df.iloc[-2]["valuemean_1"] - - u0_diff = VA_df.iloc[-1]["meanuncertainty_0"] / VA_df.iloc[-2]["meanuncertainty_0"] - u1_diff = VA_df.iloc[-1]["meanuncertainty_1"] / VA_df.iloc[-2]["meanuncertainty_1"] - - print(f"{v0_diff=}") - print(f"{v1_diff=}") - print(f"{u0_diff=}") - print(f"{u1_diff=}") - - # print("df_before_violation:") - # print(df_before_violation) - # print("df_violation:") - # print(df_violation) - Path.mkdir(Path(out_path).parent, exist_ok=True, parents=True) VA_df.to_csv(VA_out_path) - x_change_point = np.mean( - [ - sdf.loc[idx_before, COL_TIMESTAMP], - sdf.loc[idx_first_violation, COL_TIMESTAMP], - ] - ) + if len(VA_df) > 1: + v0_diff = VA_df.iloc[-1]["valuemean_0"] / VA_df.iloc[-2]["valuemean_0"] + v1_diff = VA_df.iloc[-1]["valuemean_1"] / VA_df.iloc[-2]["valuemean_1"] - for ax in axes.flat: - # Add vertical line at x position 2643041396 - ax.axvline(x=x_change_point, color="grey", linestyle="--") - - # Add text label at the vertical line - # axes.flat[1].text( - # x_change_point, - # 0.5, # This centers the text vertically - # "Change Point", - # rotation=90, - # verticalalignment="center", - # fontsize=label_size * 0.8, - # transform=axes.flat[1].get_xaxis_transform(), - # # horizontalalignment="left", - # # This sets the x-coordin ate in data units and the y-coordinate in axes units - # ) - # Add text label at the vertical line with some spacing - # axes.flat[1].annotate( - # "Change Point", - # xy=(x_change_point, 0.5), - # xycoords=axes.flat[1].get_xaxis_transform(), - # xytext=(3, 0), # 3 points offset from the line - # textcoords="offset points", - # rotation=90, - # verticalalignment="center", - # fontsize=label_size * 0.8, - # color="grey", - # ) + u0_diff = ( + VA_df.iloc[-1]["meanuncertainty_0"] / VA_df.iloc[-2]["meanuncertainty_0"] + ) + u1_diff = ( + VA_df.iloc[-1]["meanuncertainty_1"] / VA_df.iloc[-2]["meanuncertainty_1"] + ) - axes.flat[0].annotate( - "Change Point", - xy=(x_change_point, 1 - 0.12), - xycoords=axes.flat[0].get_xaxis_transform(), - xytext=(-4, 0), 
- textcoords="offset points", - rotation=0, - verticalalignment="top", - horizontalalignment="right", - fontsize=label_size, - color="grey", - ) + print(f"{v0_diff=}") + print(f"{v1_diff=}") + print(f"{u0_diff=}") + print(f"{u1_diff=}") + + # print("df_before_violation:") + # print(df_before_violation) + # print("df_violation:") + # print(df_violation) + + x_change_point = np.mean( + [ + sdf.loc[idx_before, COL_TIMESTAMP], + sdf.loc[idx_first_violation, COL_TIMESTAMP], + ] + ) + + for ax in axes.flat: + # Add vertical line at x position 2643041396 + ax.axvline(x=x_change_point, color="grey", linestyle="--") + + # Add text label at the vertical line + # axes.flat[1].text( + # x_change_point, + # 0.5, # This centers the text vertically + # "Change Point", + # rotation=90, + # verticalalignment="center", + # fontsize=label_size * 0.8, + # transform=axes.flat[1].get_xaxis_transform(), + # # horizontalalignment="left", + # # This sets the x-coordin ate in data units and the y-coordinate in axes units + # ) + # Add text label at the vertical line with some spacing + # axes.flat[1].annotate( + # "Change Point", + # xy=(x_change_point, 0.5), + # xycoords=axes.flat[1].get_xaxis_transform(), + # xytext=(3, 0), # 3 points offset from the line + # textcoords="offset points", + # rotation=90, + # verticalalignment="center", + # fontsize=label_size * 0.8, + # color="grey", + # ) + + axes.flat[0].annotate( + "Change Point", + xy=(x_change_point, 1 - 0.12), + xycoords=axes.flat[0].get_xaxis_transform(), + xytext=(-4, 0), + textcoords="offset points", + rotation=0, + verticalalignment="top", + horizontalalignment="right", + fontsize=label_size, + color="grey", + ) fig.savefig( out_path, @@ -1186,6 +1199,396 @@ def baseline_comparison(constraints): ) +def vaeval( + path, + panel_groups_param, + constraint, +): + with open(os.path.join(arguments.path, "experiment.yaml"), mode="r") as f: + experiment = yaml.safe_load(f) + + group_keys = experiment["dimensions"][panel_groups_param] + + path_by_groupkey = { + key: os.path.join( + path, + experiment["query"] + + "_" + + get_variant_name(experiment, panel_groups_param, key), + "1", + f"checkresult_{constraint}_0.csv", + ) + for key in group_keys + } + + df_by_group = { + k: pd.read_csv(path).assign(group_key=k) for k, path in path_by_groupkey.items() + } + df = list(df_by_group.values())[0] + + lookup_table = create_lookup_table( + experiment["n_samples"], experiment["CI"] / 1000, 0.5 + ) + + change_points = [] + + for key, grouped in tqdm(df.groupby("key")): + # print(constraint, key, grouped["outcome"].value_counts()) + + # extract change points in validation outcome + # that is, where outcome changes in consecutive rows from OUTCOME_VIOLATED to OUTCOME_SATISFIED or vice versa + + keyed_change_points = [] + + for i in range(1, len(grouped)): + first = grouped.iloc[i - 1] + second = grouped.iloc[i] + + if ( + first["outcome"] == second["outcome"] + or first["outcome"] == OUTCOME_INCONCLUSIVE + or second["outcome"] == OUTCOME_INCONCLUSIVE + ): + continue + + satisfied_row = first if first["outcome"] == OUTCOME_SATISFIED else second + violated_row = first if first["outcome"] == OUTCOME_VIOLATED else second + assert satisfied_row["outcome"] == OUTCOME_SATISFIED + assert violated_row["outcome"] == OUTCOME_VIOLATED + + E2 = E3 = False + E4 = E5 = False + E6 = False + + def parse_array(string): + try: + return np.fromstring(string.replace(" ", "").strip("[]"), sep=",") + except AttributeError: + return None + + satisfied = { + 0: DataSeries( + 
v=parse_array(satisfied_row[f"values_{0}"]), + sigma=parse_array(satisfied_row[f"uncertainties_{0}"]), + ), + 1: DataSeries( + v=parse_array(satisfied_row[f"values_{1}"]), + sigma=parse_array(satisfied_row[f"uncertainties_{1}"]), + ), + } + violated = { + 0: DataSeries( + v=parse_array(violated_row[f"values_{0}"]), + sigma=parse_array(violated_row[f"uncertainties_{0}"]), + ), + 1: DataSeries( + v=parse_array(violated_row[f"values_{1}"]), + sigma=parse_array(violated_row[f"uncertainties_{1}"]), + ), + } + + assert all(np.isfinite(violated[k].sigma).all() for k in [0, 1]) + + violated_rescaled_sigmas = { + k: ( + violated[k].sigma + * np.mean(satisfied[k].sigma) + / np.mean(violated[k].sigma) + ) + if (np.mean(violated[k].sigma) > 0) + and np.isfinite(violated[k].sigma).all() + else None + for k in [0, 1] + } + + violated_adjusted = { + k: DataSeries( + v=violated[k].v, + sigma=violated_rescaled_sigmas[k] + if violated_rescaled_sigmas[k] is not None + else violated[k].sigma, + ) + for k in [0, 1] + } + + if np.isfinite(np.mean(violated_adjusted[0].sigma)): + assert np.isclose( + np.mean(violated_adjusted[0].sigma), + np.mean(satisfied[0].sigma), + rtol=np.mean(satisfied[0].sigma), + ) + + if np.isfinite(np.mean(violated_adjusted[1].sigma)): + assert np.isclose( + np.mean(violated_adjusted[1].sigma), + np.mean(satisfied[1].sigma), + rtol=np.mean(satisfied[1].sigma), + ) + + # result = binary_sequence_check( + # CONSTRAINTS[constraint], + # satisfied0, + # satisfied1, + # experiment["n_samples"], + # lookup_table, + # ) + # + # assert result["outcome"] == OUTCOME_SATISFIED + + # resultViolated = binary_sequence_check( + # CONSTRAINTS[constraint], + # violated[0], + # violated[1], + # experiment["n_samples"], + # lookup_table, + # ) + + # re-evaluate constraint on the violated window with the adjusted sigma to see if it is satisfied, + # then an explanation is E4 + + resultAdjusted = binary_sequence_check( + CONSTRAINTS[constraint], + violated_adjusted[0], + violated_adjusted[1], + experiment["n_samples"], + lookup_table, + ) + + if resultAdjusted["outcome"] == OUTCOMES[OUTCOME_SATISFIED]: + if any( + np.mean(violated[k].sigma) > np.mean(satisfied[k].sigma) + for k in [0, 1] + ): + E4 = True + elif any( + np.mean(violated[k].sigma) < np.mean(satisfied[k].sigma) + for k in [0, 1] + ): + E5 = True + + E6 = violated_row["blockoutcome"] == OUTCOME_SATISFIED + + keyed_change_points.append( + dict( + t=violated_row.timestamp, + E1=not E4 and not E5 and not E6, + E4=E4, + E5=E5, + E6=E6, + ) + ) + + # if grouped["outcome"].iloc[i] != grouped["outcome"].iloc[i - 1]: + # print(grouped["timestamp"].iloc[i]) + + change_points.extend(keyed_change_points) + + df_changepoints = pd.DataFrame(change_points) + + df_changepoints.to_csv( + os.path.join( + path, + f"changepoints_{constraint}.csv", + ) + ) + + df_changepoints.drop(columns=["t"]).value_counts().to_csv( + os.path.join( + path, + f"changepoints_{constraint}_value_counts.csv", + ) + ) + + df_changepoints.drop(columns=["t"]).sum().to_csv( + os.path.join( + path, + f"changepoints_{constraint}_sum.csv", + ) + ) + + print( + constraint, + len(change_points), + (df_changepoints["E4"].sum() / len(change_points)) + if len(change_points) > 0 + else "N/A", + (df_changepoints["E5"].sum() / len(change_points)) + if len(change_points) > 0 + else "N/A", + (df_changepoints["E6"].sum() / len(change_points)) + if len(change_points) > 0 + else "N/A", + ) + + # n_upstream_series_A3 = + # n_upstream_series_A4 = + + +def vaeval_qualitytable(path, constraints): + dfs = [] 
+ for constraint in constraints: + df = pd.read_csv( + os.path.join(path, f"changepoints_{constraint}_sum.csv"), index_col=[0] + ).T + df = df.assign(E2=0, E3=0) + df = df.reindex(sorted(df.columns), axis=1) + df = df.assign( + baseline_fp=lambda row: row.drop(columns=["E1"]).sum(axis=1) + / row.sum(axis=1), + ).assign(constraint=constraint) + + dfs.append(df) + df = pd.concat(dfs).reset_index(drop=True).set_index("constraint") + + df.to_latex( + os.path.join(path, "quality_table.tex"), + ) + print(df) + + +def vaeval_efficiency(path, constraints, panel_groups_param): + dfs = [] + for constraint in constraints: + df = pd.read_csv(os.path.join(path, f"changepoints_{constraint}.csv")).assign( + constraint=constraint + ) + dfs.append(df) + + df = pd.concat(dfs).reset_index(drop=True) + + # sound efficiency: t-windowed in 50 time bins, the number of E1 == True + + ## group entire time range in 50 time bins + ## count the number of E1 == True in each time bin + + # Bin the `t` column into 50 equal size time windows + df["time_bin"] = pd.cut(df["t"], bins=50) + + # Group by the time bins and count the number of `E1 == True` in each bin + bin_counts = ( + df[df["E1"] == True].groupby("time_bin").size().reset_index(name="count") + ) + + # Calculate the center of each time window + bin_counts["time_center"] = bin_counts["time_bin"].apply(lambda x: x.mid) + + sound_counts = bin_counts[["time_center", "count"]] + + # baseline efficiency: t-windowed in 50 time bins, the number checkresults + with open(os.path.join(arguments.path, "experiment.yaml"), mode="r") as f: + experiment = yaml.safe_load(f) + + group_keys = experiment["dimensions"][panel_groups_param] + + path_by_groupkey = { + key: os.path.join( + path, + experiment["query"] + + "_" + + get_variant_name(experiment, panel_groups_param, key), + "1", + f"checkresult_{constraint}_0.csv", + ) + for key in group_keys + } + + df_by_group = { + k: pd.read_csv(path).assign(group_key=k) for k, path in path_by_groupkey.items() + } + df = list(df_by_group.values())[0] + + df["time_bin"] = pd.cut(df["timestamp"], bins=50) + bin_counts = df.groupby("time_bin").size().reset_index(name="count") + bin_counts["time_center"] = bin_counts["time_bin"].apply(lambda x: x.mid) + baseline_counts = bin_counts[["time_center", "count"]] + + n_checks_per_window = ( + 2 + 3 + ) # two for the binary constraint series, and 3 for upstream series + + # plot both as lines + # Plot visual config + palette = sns.color_palette("Set2", n_colors=14, desat=0.9) + plt.rcParams["axes.prop_cycle"] = plt.cycler(color=palette) + + fig, ax = plt.subplots( + nrows=1, + ncols=1, + figsize=(6.4 / 1.5, (4.8 / 3)), + sharey=False, + sharex=True, + ) + + ax.plot( + sound_counts["time_center"], + sound_counts["count"] * n_checks_per_window, + label=r"$\mathrm{SOUND}$", + color=(144 / 255, 161 / 255, 200 / 255), + ) + + ax.plot( + baseline_counts["time_center"], + baseline_counts["count"] * n_checks_per_window, + label=r"$\mathtt{BASE\_VA}$", + color=(107 / 255, 189 / 255, 164 / 255), + linestyle="--", + ) + + # ax.bar( + # x=sound_counts["time_center"], + # height=sound_counts["count"] * n_checks_per_window, + # width=sound_counts["time_center"].diff()[1], + # label="SOUND", + # color=(144 / 255, 161 / 255, 200 / 255), + # ) + # + # ax.bar( + # x=baseline_counts["time_center"], + # height=baseline_counts["count"] * n_checks_per_window, + # width=baseline_counts["time_center"].diff()[1], + # label="NAIVE_VA", + # color=(107 / 255, 189 / 255, 164 / 255), + # # linestyle="--", + # ) + 
ax.set_xlabel("Event Time (JD)") + ax.set_title(r"# Evaluations of $\phi^2_{\mathrm{change}}$") + ax.tick_params(axis="both", which="major", labelsize=11) + ax.set_xticks([]) + ax.spines[["right", "top"]].set_visible(False) + ax.set_yscale("log") + ax.set_ylim( + 0.8 * ax.get_ylim()[0], + 1.6 * ax.get_ylim()[1], + ) + fig.tight_layout() + ax.legend( + fontsize=LEGEND_LABEL_SIZE, + frameon=True, + columnspacing=1, + loc="center left", + bbox_to_anchor=(1.01, 0.5), + bbox_transform=ax.transAxes, + handles=reversed(ax.get_legend_handles_labels()[0]), + labels=reversed(ax.get_legend_handles_labels()[1]), + ) + + fig.savefig( + os.path.join(path, "efficiency.pdf"), + pad_inches=0.1, + bbox_inches="tight", + ) + + print( + "SOUND", + np.sum(sound_counts["count"] * n_checks_per_window), + "BASE_VA", + np.sum(baseline_counts["count"] * n_checks_per_window), + "ratio", + np.sum(sound_counts["count"] * n_checks_per_window) + / (np.sum(baseline_counts["count"] * n_checks_per_window)), + ) + + if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( @@ -1339,6 +1742,19 @@ if __name__ == "__main__": ) exit(0) + elif arguments.plot == "VAEval": + constraints = ["A-3", "A-4"] + # for constraint in constraints: + # vaeval( + # arguments.path, + # panel_groups_param="manual_sparsity", + # constraint=constraint, + # ) + # vaeval_qualitytable(arguments.path, constraints) + vaeval_efficiency( + arguments.path, constraints, panel_groups_param="manual_sparsity" + ) + checkresult_file_paths = list(find_check_result_files(arguments.path)) with open(os.path.join(arguments.path, "experiment.yaml"), mode="r") as f: diff --git a/reproduce/sound/complete.sh b/reproduce/sound/complete.sh index f2d97fd..d6bf5ae 100755 --- a/reproduce/sound/complete.sh +++ b/reproduce/sound/complete.sh @@ -22,7 +22,10 @@ # Figure 8 ./reproduce/sound/figure8.sh 1 4 -# Section 5.3 (Effectiveness / Baseline) +# Table V ./reproduce/sound/figure_checkresult_comparison_NOSOUND_case.sh 1 8 -./reproduce/sound/smartgrid_NOSOUND_case.sh 1 8 \ No newline at end of file +./reproduce/sound/smartgrid_NOSOUND_case.sh 1 8 + +# Table VI + Figure 9 +./reproduce/sound/figure10.sh 1 1.5 diff --git a/reproduce/sound/figure9.sh b/reproduce/sound/figure9.sh new file mode 100755 index 0000000..43adffa --- /dev/null +++ b/reproduce/sound/figure9.sh @@ -0,0 +1,28 @@ +#!/usr/bin/env bash + +function usage() { + echo "Usage: $0 #reps #duration_min" + exit 1 +} + +[[ -z $1 ]] && usage +[[ -z $2 ]] && usage +#[[ -z $3 ]] && usage + +REPS="$1" +DURATION="$2" +#KAFKA_HOST="$3" +DATE_CODE=$(date +%j_%H%M) +COMMIT_CODE=$(git rev-parse --short HEAD) +EXPERIMENT_FOLDER="${DATE_CODE}_${COMMIT_CODE}_$(hostname)" + +echo "Starting experiment. 
Results will be stored in data/output/$EXPERIMENT_FOLDER" +sleep 2 + +./scripts/run.py ./experiments/checkresults/AstroCheckResultVACase.yaml -d "$DURATION" -r "$REPS" -c "$DATE_CODE" + + +cmd="./reproduce/plot_checkresult.py --path \"data/output/$EXPERIMENT_FOLDER\" --plot VAEval" + +echo "To re-run only the plot, execute: $cmd" +bash -c "$cmd" diff --git a/reproduce/va/binary_sequence_check.py b/reproduce/va/binary_sequence_check.py new file mode 100644 index 0000000..e890659 --- /dev/null +++ b/reproduce/va/binary_sequence_check.py @@ -0,0 +1,144 @@ +from dataclasses import dataclass + +import numpy as np + +from scipy.stats import beta + +OUTCOME_INCONCLUSIVE = 0 +OUTCOME_SATISFIED = 1 +OUTCOME_VIOLATED = 2 + +OUTCOMES = { + 0: "INCONCLUSIVE", + 1: "SATISFIED", + 2: "VIOLATED", +} + + +class DataSeries: + v: np.ndarray + sigma: np.ndarray + + def __init__(self, v: np.ndarray, sigma: np.ndarray): + self.v = v + self.sigma = sigma + + if sigma is None: + self.sigma = np.zeros_like(v) + + +def binary_sequence_check( + constraint, + s1: DataSeries, + s2: DataSeries, + n_samples, + decision_lookup_table, + value_uncertainty_n_sigma=2.0, +): + n_satisfied = 0 + n_violated = 0 + n_samples_seen = 1 + outcome = 0 + + n_observations = len(s1.v) + block_length = int(np.floor(np.sqrt(n_observations))) + num_blocks = int(np.floor(n_observations / block_length)) + + bootstrap_sample1 = np.zeros(n_observations) + bootstrap_sample2 = np.zeros(n_observations) + + for _ in range(n_samples): + sample_index = 0 + + for _ in range(num_blocks): + random_block_start = np.random.randint(0, n_observations - block_length + 1) + + for i in range(block_length): + current_index = random_block_start + i + + bootstrap_sample1[sample_index] = sample_value( + s1, current_index, value_uncertainty_n_sigma + ) + bootstrap_sample2[sample_index] = sample_value( + s2, current_index, value_uncertainty_n_sigma + ) + + sample_index += 1 + + result = constraint(bootstrap_sample1, bootstrap_sample2) + n_satisfied += 1 if result else 0 + n_violated = n_samples_seen - n_satisfied + outcome = decision_lookup_table[1 + n_satisfied][1 + n_violated] + if outcome > 0: + break + + n_samples_seen += 1 + + return { + "naive_outcome": "SATISFIED" if constraint(s1.v, s2.v) else "VIOLATED", + "outcome": OUTCOMES[outcome], + "alpha": 1 + n_violated, + "beta": 1 + n_satisfied, + "n_samples_seen": n_samples_seen, + } + + +def sample_value(series: DataSeries, index, value_uncertainty_n_sigma): + if series.sigma is not None: + return np.random.normal( + series.v[index], series.sigma[index] * value_uncertainty_n_sigma + ) + return series.v[index] + + +def A3(x: np.ndarray, y: np.ndarray) -> bool: + if x.size < 2 or y.size < 2: + return True + + if not np.isfinite(x).all() or not np.isfinite(y).all(): + print(x, y) + with np.errstate(invalid="ignore"): + corr = np.corrcoef(x, y)[0, 1] + if np.isnan(corr): + return True + else: + return corr > 0.2 + + +def A4(x: np.ndarray, y: np.ndarray) -> bool: + def average_delta(array: np.ndarray) -> float: + if len(array) < 2: + raise ValueError("Array should have at least two elements.") + sum_delta = np.sum(np.abs(np.diff(array))) + return sum_delta / (len(array) - 1) + + avg_delta_x = average_delta(x) + avg_delta_y = average_delta(y) + return avg_delta_x < avg_delta_y + + +def create_lookup_table(max_sample_size, credibility_level, decision_threshold): + max_sample_size += 1 + + lookup_table = np.zeros((max_sample_size + 1, max_sample_size + 1), dtype=int) + + for alpha_post in range(1, max_sample_size + 
1): + for beta_post in range(1, max_sample_size - alpha_post + 2): + beta_dist = beta(alpha_post, beta_post) + lower_bound = beta_dist.ppf((1 - credibility_level) / 2) + upper_bound = beta_dist.ppf(1 - (1 - credibility_level) / 2) + + if lower_bound > decision_threshold: + lookup_table[alpha_post][beta_post] = OUTCOME_SATISFIED + elif upper_bound < decision_threshold: + lookup_table[alpha_post][beta_post] = OUTCOME_VIOLATED + else: + lookup_table[alpha_post][beta_post] = OUTCOME_INCONCLUSIVE + + return lookup_table + + +CONSTRAINTS = { + "A-3": A3, + "A-4": A4, +} diff --git a/src/main/java/io/palyvos/provenance/util/ExperimentSettings.java b/src/main/java/io/palyvos/provenance/util/ExperimentSettings.java index 0e8a985..7937246 100644 --- a/src/main/java/io/palyvos/provenance/util/ExperimentSettings.java +++ b/src/main/java/io/palyvos/provenance/util/ExperimentSettings.java @@ -42,6 +42,7 @@ public class ExperimentSettings implements Serializable { public static final String LATENCY_FILE = "latency"; public static final String CHECKRESULT_FILE = "checkresult"; public static final String WINDOWDATA_FILE = "windowdata"; + public static final String WINDOWSERIES_FILE = "windowseries"; public static final String THROUGHPUT_FILE = "rate"; public static final String KAFKA_THROUGHPUT_FILE = "kafka-rate"; public static final String SINK_THROUGHPUT_FILE = "sink-rate"; @@ -355,6 +356,10 @@ public class ExperimentSettings implements Serializable { return statisticsFile(operator, taskIndex, statisticsFolder(), String.valueOf(id) + "_" + WINDOWDATA_FILE, EXTENSION_CSV); } + public String windowSeriesFile(int taskIndex, String operator, String id) { + return statisticsFile(operator, taskIndex, statisticsFolder(), WINDOWSERIES_FILE + "_" + id, EXTENSION_CSV); + } + public String latencyFile(int taskIndex, String operator) { return statisticsFile(operator, taskIndex, statisticsFolder(), LATENCY_FILE, EXTENSION_CSV); } diff --git a/src/main/java/io/stolther/soundcheck/core/VA2CheckResult.java b/src/main/java/io/stolther/soundcheck/core/VA2CheckResult.java index 5e1efec..44d6734 100644 --- a/src/main/java/io/stolther/soundcheck/core/VA2CheckResult.java +++ b/src/main/java/io/stolther/soundcheck/core/VA2CheckResult.java @@ -1,10 +1,13 @@ package io.stolther.soundcheck.core; +import java.util.Arrays; + public class VA2CheckResult { public long timestamp; Outcome outcome; + Outcome blockoutcome; int alpha; int beta; int n_samples_tot; // 0 if no sampling was required to determine outcome @@ -12,6 +15,10 @@ public class VA2CheckResult { public final double[] values_0; public final double[] values_1; + + public final double[] uncertainties_0; + public final double[] uncertainties_1; + public long nPoints_0; public long nPoints_1; @@ -23,8 +30,9 @@ public class VA2CheckResult { public String key; - public VA2CheckResult(long timestamp, Outcome outcome, int alpha, int beta, int n_samples_tot, String key, long nPoints_0, double meanUncertainty_0, double dataMean_0, long nPoints_1, double meanUncertainty_1, double dataMean_1, double[] values_0, double[] values_1) { + public VA2CheckResult(long timestamp, Outcome outcome, Outcome blockoutcome, int alpha, int beta, int n_samples_tot, String key, long nPoints_0, double meanUncertainty_0, double dataMean_0, long nPoints_1, double meanUncertainty_1, double dataMean_1, double[] values_0, double[] values_1, double[] uncertainties_0, double[] uncertainties_1) { this.outcome = outcome; + this.blockoutcome = blockoutcome; this.alpha = alpha; this.beta = beta; 
this.n_samples_tot = n_samples_tot; @@ -38,10 +46,12 @@ public class VA2CheckResult { this.dataMean_1 = dataMean_1; this.values_0 = values_0; this.values_1 = values_1; + this.uncertainties_0 = uncertainties_0; + this.uncertainties_1 = uncertainties_1; } public static String get_csv_header() { - return "timestamp,outcome,alpha,beta,n_samples_tot,key,n_points_0,meanuncertainty_0,valuemean_0,n_points_1,meanuncertainty_1,valuemean_1"; + return "timestamp,outcome,blockoutcome,alpha,beta,n_samples_tot,key,n_points_0,meanuncertainty_0,valuemean_0,n_points_1,meanuncertainty_1,valuemean_1,values_0,values_1,uncertainties_0,uncertainties_1"; } @Override @@ -63,7 +73,7 @@ public class VA2CheckResult { } public String to_csv() { - return timestamp + "," + outcome.getValue() + "," + alpha + "," + beta + "," + n_samples_tot + "," + key + "," + nPoints_0 + "," + meanUncertainty_0 + "," + dataMean_0 + "," + nPoints_1 + "," + meanUncertainty_1 + "," + dataMean_1; + return timestamp + "," + outcome.getValue() + "," + blockoutcome.getValue() + "," + alpha + "," + beta + "," + n_samples_tot + "," + key + "," + nPoints_0 + "," + meanUncertainty_0 + "," + dataMean_0 + "," + nPoints_1 + "," + meanUncertainty_1 + "," + dataMean_1 + ",\"" + Arrays.toString(values_0) + "\",\"" + Arrays.toString(values_1) + "\",\"" + Arrays.toString(uncertainties_0) + "\",\"" + Arrays.toString(uncertainties_1) + "\""; } public double[] get_values(int id) { diff --git a/src/main/java/io/stolther/soundcheck/core/VABinarySequenceCheck.java b/src/main/java/io/stolther/soundcheck/core/VABinarySequenceCheck.java index 144d958..8a85c53 100644 --- a/src/main/java/io/stolther/soundcheck/core/VABinarySequenceCheck.java +++ b/src/main/java/io/stolther/soundcheck/core/VABinarySequenceCheck.java @@ -13,6 +13,9 @@ public abstract class VABinarySequenceCheck extends Check implements MapFunction DataSeries s1 = dataSeriesDataSeriesTuple2.f0; DataSeries s2 = dataSeriesDataSeriesTuple2.f1; + Outcome blockOutcome = Outcome.SATISFIED; + + int n_satisfied = 0; int n_violated = 0; int n_samples_seen = 1; @@ -22,6 +25,9 @@ public abstract class VABinarySequenceCheck extends Check implements MapFunction int block_length = (int) Math.floor(Math.sqrt(n_observations)); int numBlocks = (int) Math.floor((double) n_observations / block_length); + double[] blockSample1 = new double[block_length]; + double[] blockSample2 = new double[block_length]; + for (; n_samples_seen < N_SAMPLES + 1; n_samples_seen++) { double[] bootstrapSample1 = new double[n_observations]; @@ -37,8 +43,15 @@ public abstract class VABinarySequenceCheck extends Check implements MapFunction bootstrapSample1[sampleIndex] = sampleValue(s1, currentIndex); bootstrapSample2[sampleIndex] = sampleValue(s2, currentIndex); + blockSample1[i] = bootstrapSample1[sampleIndex]; + blockSample2[i] = bootstrapSample2[sampleIndex]; + sampleIndex++; } + + if (blockOutcome == Outcome.SATISFIED && !constraint(blockSample1, blockSample2)) { + blockOutcome = Outcome.VIOLATED; + } } boolean result = constraint(bootstrapSample1, bootstrapSample2); @@ -53,6 +66,7 @@ public abstract class VABinarySequenceCheck extends Check implements MapFunction VA2CheckResult result = new VA2CheckResult( Math.max(s1.t, s2.t), Outcome.fromValue(outcome), + blockOutcome, n_violated, n_satisfied, n_samples_seen, @@ -64,7 +78,9 @@ public abstract class VABinarySequenceCheck extends Check implements MapFunction (s2.sigma != null) ? 
MathUtils.mean(s2.sigma) : 0.0, MathUtils.mean(s2.v), s1.v, - s2.v + s2.v, + s1.sigma, + s2.sigma ); return result; diff --git a/src/main/java/io/stolther/soundcheck/core/sinks/WindowedDataSeriesSink.java b/src/main/java/io/stolther/soundcheck/core/sinks/WindowedDataSeriesSink.java new file mode 100644 index 0000000..011fe88 --- /dev/null +++ b/src/main/java/io/stolther/soundcheck/core/sinks/WindowedDataSeriesSink.java @@ -0,0 +1,61 @@ +package io.stolther.soundcheck.core.sinks; + +import io.palyvos.provenance.util.ExperimentSettings; +import io.stolther.soundcheck.core.DataSeries; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; + +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Arrays; + +public class WindowedDataSeriesSink extends RichSinkFunction<DataSeries> { + private final ExperimentSettings settings; + private transient PrintWriter pw_v; + private transient PrintWriter pw_t; + private transient PrintWriter pw_s; + private final String name; + + + public WindowedDataSeriesSink(String name, ExperimentSettings settings) { + this.settings = settings; + this.name = name; + } + + @Override + public void open(Configuration parameters) throws Exception { + final int taskIndex = getRuntimeContext().getIndexOfThisSubtask(); + try { + pw_v = new PrintWriter(new FileWriter(settings.windowSeriesFile(taskIndex, name, "v")), settings.autoFlush()); + pw_t = new PrintWriter(new FileWriter(settings.windowSeriesFile(taskIndex, name, "t")), settings.autoFlush()); + pw_s = new PrintWriter(new FileWriter(settings.windowSeriesFile(taskIndex, name, "s")), settings.autoFlush()); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + + super.open(parameters); + + } + + @Override + public void invoke(DataSeries ds, Context context) { + pw_v.println(Arrays.toString(ds.v) + .replace("[", "") + .replace("]", "") + .trim()); + + pw_t.println(Arrays.toString(new double[]{ds.t}) + .replace("[", "") + .replace("]", "") + .trim()); + + pw_s.println(Arrays.toString(ds.sigma) + .replace("[", "") + .replace("]", "") + .trim()); + + } + + +} diff --git a/src/main/java/io/stolther/soundcheck/usecases/astro/CaseWithVA.java b/src/main/java/io/stolther/soundcheck/usecases/astro/CaseWithVA.java index 195f539..e47201f 100644 --- a/src/main/java/io/stolther/soundcheck/usecases/astro/CaseWithVA.java +++ b/src/main/java/io/stolther/soundcheck/usecases/astro/CaseWithVA.java @@ -184,9 +184,9 @@ public class CaseWithVA { }; A4.init(settings); twoSeriesStream.map(A4).addSink(new VA2CheckResultSink("A-4", settings)); - twoSeriesStream.map(A4).addSink(new WindowValuesSink("A-4", 0, settings)); - twoSeriesStream.map(A4).addSink(new WindowValuesSink("A-4", 1, settings)); - twoSeriesStream.map(A4).addSink(new WindowValuesSink("A-4", 2, settings)); +// twoSeriesStream.map(A4).addSink(new WindowValuesSink("A-4", 0, settings)); +// twoSeriesStream.map(A4).addSink(new WindowValuesSink("A-4", 1, settings)); +// twoSeriesStream.map(A4).addSink(new WindowValuesSink("A-4", 2, settings)); UnarySetCheck APP_2 = new UnarySetCheck() { diff --git a/src/main/java/io/stolther/soundcheck/usecases/astro/CaseWithVASinks.java b/src/main/java/io/stolther/soundcheck/usecases/astro/CaseWithVASinks.java new file mode 100644 index 0000000..d34fdd4 --- /dev/null +++ b/src/main/java/io/stolther/soundcheck/usecases/astro/CaseWithVASinks.java @@ -0,0 +1,252 @@ +package io.stolther.soundcheck.usecases.astro; + 
+import io.palyvos.provenance.util.ExperimentSettings; +import io.palyvos.provenance.util.FlinkSerializerActivator; +import io.palyvos.provenance.util.LatencyLoggingSink; +import io.stolther.soundcheck.core.*; +import io.stolther.soundcheck.core.sinks.WindowedDataSeriesSink; +import org.apache.commons.math3.linear.ArrayRealVector; +import org.apache.commons.math3.linear.RealVector; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.WindowFunction; +import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; + +public class CaseWithVASinks { + + public static SinkFunction<CheckResult> get_check_sink(String name, ExperimentSettings settings) { + return new NoOpCheckResultSink(); + } + + public static void main(String[] args) throws Exception { + + ExperimentSettings settings = ExperimentSettings.newInstance(args); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableObjectReuse(); + + FlinkSerializerActivator.NOPROVENANCE.activate(env, settings) + .register(AstroWindowTuple.class, new AstroWindowTuple.KryoSerializer()) + .register(AstroDataPoint.class, new AstroDataPoint.KryoSerializer()) + .register(DataSeries.class, new DataSeries.KryoSerializer()); + + final SingleOutputStreamOperator<AstroDataPoint> sourceStream = + env.addSource(new AstroDataPointFileSource(settings)) + .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()); + + + DataStream<AstroDataPoint> filtered = sourceStream.filter(new FilterFunction<AstroDataPoint>() { + @Override + public boolean filter(AstroDataPoint astroDataPoint) throws Exception { + return astroDataPoint.is_upper_lim == 0 && astroDataPoint.relative_error < 5.0; + } + }); + + + DataStream<Tuple2<AstroDataPoint, Double>> smoothed = filtered + .keyBy(x -> x.sourceName) + .flatMap(new Query.SmoothingFunction()); + + DataStream<Tuple3<DataSeries, DataSeries, Long>> comparisonSteam = smoothed + .keyBy(value -> value.f0.sourceName) + .countWindow(10) + .apply(new WindowFunction<Tuple2<AstroDataPoint, Double>, Tuple3<DataSeries, DataSeries, Long>, String, GlobalWindow>() { + @Override + public void apply(String s, GlobalWindow globalWindow, Iterable<Tuple2<AstroDataPoint, Double>> iterable, Collector<Tuple3<DataSeries, DataSeries, Long>> collector) throws Exception { + ArrayList<Double> sList = new ArrayList<>(); + ArrayList<Double> sigmaList = new ArrayList<>(); + ArrayList<Double> oList = new ArrayList<>(); + + long maxStimulus = 0; + + long t = 0; + + + for (Tuple2<AstroDataPoint, Double> point : iterable) { + sList.add(point.f1); + oList.add(point.f0.flux); + sigmaList.add(point.f0.sigma); + maxStimulus = Math.max(maxStimulus, point.f0.stimulus); + + t = Math.max(t, point.f0.time); + } + + + 
DataSeries smoothed = new DataSeries(t, sList.stream().mapToDouble(Double::doubleValue).toArray(), null, s); + DataSeries original = new DataSeries(t, oList.stream().mapToDouble(Double::doubleValue).toArray(), sigmaList.stream().mapToDouble(Double::doubleValue).toArray(), s); + + collector.collect(Tuple3.of(original, smoothed, maxStimulus)); + } + }); + + + DataStream<Tuple2<DataSeries, Long>> resultStream = comparisonSteam + .keyBy(value -> value.f0.key) + .map(new MapFunction<Tuple3<DataSeries, DataSeries, Long>, Tuple2<DataSeries, Long>>() { + + + @Override + public Tuple2<DataSeries, Long> map(Tuple3<DataSeries, DataSeries, Long> t) throws Exception { + RealVector vector1 = new ArrayRealVector(t.f0.v); + RealVector vector2 = new ArrayRealVector(t.f1.v); + + RealVector resultVector = vector1.subtract(vector2); + double[] resultArray = resultVector.toArray(); + + DataSeries result = new DataSeries(t.f0.t, resultArray, null, t.f0.key); + return Tuple2.of(result, t.f2); + } + }); + + + resultStream.addSink(LatencyLoggingSink.newInstance("SINK", t -> t.f1, settings)) + .setParallelism(settings.parallelism()); + + // Additional sanity checks + + UnaryPointBasedCheck APP_1 = new UnaryPointBasedCheck() { + @Override + protected boolean constraint(double x) { + return x < 1 && 0 < x; + } + }; + APP_1.init(settings); + filtered + .map((MapFunction<AstroDataPoint, DataPoint>) t -> new DataPoint(t.time, t.flux, t.sigma)) + .map(APP_1) + .addSink(get_check_sink("A-1", settings)) + .setParallelism(settings.parallelism()); + + + VABinarySequenceCheck A3 = new VABinarySequenceCheck() { + @Override + public boolean constraint(double[] x, double[] y) { + if (x.length < 2 || y.length < 2) { + return true; + } + double corr = MathUtils.pearsonCorrelation(x, y); + if (Double.isNaN(corr)) { + return true; + } else { + return corr > 0.2; + } + } + }; + A3.init(settings); + + DataStream<Tuple2<DataSeries, DataSeries>> twoSeriesStream = comparisonSteam +// .filter(new FilterFunction<Tuple3<DataSeries, DataSeries, Long>>() { +// @Override +// public boolean filter(Tuple3<DataSeries, DataSeries, Long> t) throws Exception { +// return t.f0.v.length > 3; +// } +// }) + .map(new MapFunction<Tuple3<DataSeries, DataSeries, Long>, Tuple2<DataSeries, DataSeries>>() { + @Override + public Tuple2<DataSeries, DataSeries> map(Tuple3<DataSeries, DataSeries, Long> t) throws Exception { + return Tuple2.of(t.f0.shorten(settings.manual_sparsity), t.f1.shorten(settings.manual_sparsity)); + } + }); + + twoSeriesStream.map(A3).addSink(new VA2CheckResultSink("A-3", settings)); + + // Write out windowed data series of A-3 + + twoSeriesStream.map(new MapFunction<Tuple2<DataSeries, DataSeries>, DataSeries>() { + @Override + public DataSeries map(Tuple2<DataSeries, DataSeries> t) throws Exception { + return t.f0; + } + }).addSink(new WindowedDataSeriesSink("A3-S1", settings)); + + twoSeriesStream.map(new MapFunction<Tuple2<DataSeries, DataSeries>, DataSeries>() { + @Override + public DataSeries map(Tuple2<DataSeries, DataSeries> t) throws Exception { + return t.f1; + } + }).addSink(new WindowedDataSeriesSink("A3-S2", settings)); + + + VABinarySequenceCheck A4 = new VABinarySequenceCheck() { + @Override + public boolean constraint(double[] x, double[] y) { + double avgDeltaX = averageDelta(x); + double avgDeltaY = averageDelta(y); + return avgDeltaX < avgDeltaY; + } + + public double averageDelta(double[] array) { + if (array.length < 2) { + throw new IllegalArgumentException("Array should have at least two elements."); + } + double 
sumDelta = 0.0; + for (int i = 1; i < array.length; i++) { + sumDelta += Math.abs(array[i] - array[i - 1]); + } + return sumDelta / (array.length - 1); + } + + }; + A4.init(settings); + twoSeriesStream.map(A4).addSink(new VA2CheckResultSink("A-4", settings)); +// twoSeriesStream.map(A4).addSink(new WindowValuesSink("A-4", 0, settings)); +// twoSeriesStream.map(A4).addSink(new WindowValuesSink("A-4", 1, settings)); +// twoSeriesStream.map(A4).addSink(new WindowValuesSink("A-4", 2, settings)); + + + UnarySetCheck APP_2 = new UnarySetCheck() { + @Override + public boolean constraint(double[] x) { + return MathUtils.std(x) > 0; + } + }; + APP_2.init(settings); + + DataStream<DataSeries> a2Stream = filtered + .keyBy(new KeySelector<AstroDataPoint, String>() { + @Override + public String getKey(AstroDataPoint astroDataPoint) throws Exception { + return astroDataPoint.sourceName; + } + }) + .countWindow(3) + .process(new ProcessWindowFunction<AstroDataPoint, DataSeries, String, GlobalWindow>() { + @Override + public void process(String key, Context context, Iterable<AstroDataPoint> events, Collector<DataSeries> out) { + int size = 3; + double[] values = new double[size]; + double[] sigmas = new double[size]; + int i = 0; + + for (AstroDataPoint event : events) { + values[i] = event.flux; + sigmas[i] = event.sigma; + i++; + } + + long windowEnd = context.window().maxTimestamp(); + out.collect(new DataSeries(windowEnd, values, sigmas, key)); + } + }); + + a2Stream.map(APP_2).addSink(get_check_sink("A-2", settings)) + .setParallelism(settings.parallelism()); + + + env.execute(QuerySoundNoOpSink.class.getSimpleName()); + + } + +} -- GitLab
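
A minimal, illustrative usage sketch of the new reproduce/va/binary_sequence_check.py module (not part of the patch). It assumes the snippet is run from the reproduce/ directory so that the va package resolves, mirroring the import in plot_checkresult.py, and it reuses the nSamples=200 / CI=990 / 0.5-threshold values that vaeval() reads from the experiment YAML:

    import numpy as np

    from va.binary_sequence_check import (
        CONSTRAINTS,
        DataSeries,
        binary_sequence_check,
        create_lookup_table,
    )

    # Two positively correlated example series with small per-point uncertainties.
    rng = np.random.default_rng(0)
    x = rng.normal(size=25)
    y = 0.8 * x + rng.normal(scale=0.3, size=25)
    s1 = DataSeries(v=x, sigma=np.full(25, 0.05))
    s2 = DataSeries(v=y, sigma=np.full(25, 0.05))

    # Decision table for up to 200 bootstrap samples, 99.0% credibility, 0.5 threshold.
    lookup = create_lookup_table(200, 0.990, 0.5)

    # Sequential block-bootstrap evaluation of the A-3 constraint (correlation > 0.2);
    # sampling stops as soon as the Beta-posterior decision table is conclusive.
    result = binary_sequence_check(CONSTRAINTS["A-3"], s1, s2, 200, lookup)
    print(result["outcome"], result["n_samples_seen"])

The Python module mirrors the block-bootstrap and Beta-posterior decision logic of the Java VABinarySequenceCheck, so a sketch like this can be used to sanity-check constraint outcomes offline before re-running the Flink experiments.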