# ---------------------------------------------------------------------------
# apply_summary.pl -- summarize TARGET_APPLY activity in a Replicate task log.
# Tunable defaults (overridable via command-line options below).
# ---------------------------------------------------------------------------
my $DATA = q(\Program Files\Attunity\Replicate\data); # hack!
my $MAX_TABLES = 20;       # default for -T: limit per-table report to Top N
my $LINES = 2*10**6;       # progress heartbeat interval (lines read)
my $PREPARE_LIMIT = 60;    # default for -p: report "Start handling table" longer than this (seconds)
my $CT_SUFFIX = q(__ct);   # default for -s: Store-Changes change-table suffix

my $usage = qq(Usage:\n\tperl $0 reptask_xxx.log [ > redirected_output_file ]\n\t-h for help\n);
my $help = << "EOF";
apply_summary.pl        Hein van den Heuvel, (ex-Attunity, ex-Qlik)02-02-2022

$usage
This script can be used to summarize the Target Apply activity for an
Attunity Replicate CDC task log.
The Attunity Replicate task log must be provided as the input file and the
LOGGING LEVEL for TARGET_APPLY must have been set to TRACE (was DEBUG).

These are the main lines the script looks for:

[TARGET_APPLY ]D: Going to run insert statement, from seq 1 to seq 1
# INSERT INTO [dbo].[authservice] ([createid],[updateid]...

Target applies are done in Batches, which are pre-loaded into a helper table
on the table with a name like attrep_changes92857F920498243B.
Each row in that table has a SEQ number and we apply in ranges for that SEQ number.
Seeing 'from seq 1' implies we finished one batch, and we report on that,
and we start a next batch.

In 2021 (v7) the MERGE statement appeared without 'from seq X to Y' but
joining on table_id instead. in contrast, all other SQL statements
(INSERT, UPDATE DELETE) dissappeared at that time.

Detailed output when 'verbose' is requested colums are constructed as:
logtime_hhmmss, thread, duration, gap, verb, from, to, count, table

Note: That gap is the time in seconds between the current line and when the
prior line started and thus refers to the prior command, not the current.

Arguments:
	Input file specification. Mandatory

Options:
	-d	Data directory ( used to access dynamic_metadata to look up table number )
		Defaults to $DATA
	-b	Report summary detail for each batch
	-c	CSV output for individual
	-o	Report details for One-By-One data
	-v	Verbose Output with each individual apply (includes -b and -o)
	-t	Per Table totals, including table for one-by-one apply statistics if those occured.
	-x X	eXtra high volume greater or equal than X - start and stop reporting.
	-A	After [[[yyyy-]mm-]ddT]hh[:mm[:ss]] Defaults to First Date in log hh:00:00
	-B	Before [[[yyyy-]mm-]ddT]hh[:mm[:ss]] Defaults to First Date in log, or After-day, hh:00:00
	-h	Display this Help text
	-a	Display creation of attrep_changes table
	-T N	Limit tables to Top N
	-s	STORE Changes Change Table suffix. Defaults to $CT_SUFFIX
	-p	Prepare Limit = Start Handling Table time. Defaults to $PREPARE_LIMIT
EOF

use DBI;
use strict;
use warnings;
use Time::Local;

# Program argument and help and usage handling.
use Getopt::Std;
my %opt;
# FIX: -d is documented (and used below) as taking a data-directory VALUE,
# so it must be declared as 'd:'; the original boolean 'd' made $DATA become 1.
if (!getopts ('abmc:d:hHvotA:B:x:T:s:p:',\%opt)) { print STDERR $usage; exit; };
if ($opt{h} || $opt{H}) { print STDERR $help; exit; };

my $show_attrep_changes = $opt{a};
my $verbose = $opt{v};
my $log = shift or die "Must have a REPTASK LOG file, with TRACE level APPLY_TARGET, to work one";
my $batch_report = $opt{b};
my $csv = $opt{c};
my $one_by_one = $opt{o};
my $data = $opt{d};
my $metadata = $opt{m};
my $report_per_table = $opt{t};
my $after_time = $opt{A};
my $before_time = $opt{B};
my $max_tables = $opt{T};
my $ct_suffix = $opt{s};
my $prepare_limit = $opt{p};
my $x = $opt{x}; # extra_high_volume

$max_tables = $MAX_TABLES unless $max_tables;
$ct_suffix = $CT_SUFFIX unless $ct_suffix;
$DATA = $data if $data;
$prepare_limit = $PREPARE_LIMIT unless $prepare_limit;

# ---------------------------------------------------------------------------
# File-scope working state. (All counters explicitly zeroed below; several
# list-assignments in the original were shorter than their variable lists,
# leaving trailing variables undef and triggering warnings on "+=" later.)
# ---------------------------------------------------------------------------
my ($i, $from, $to, $to_last, $verb, $verbs, $verbs_in_batch, $table, @data,
    %per_table, %one_table, %per_tableX, $rows);
my ($one_by_one_time_max, $one_by_one_switches, $one_by_one_events,
    $one_by_one_time_total, $one_by_one_active, $one_by_one_size);
my ($failed_to_execute, $failed_to_execute_total, $affected, $csv_header,
    $batch_finish, $sequential);
my ($gap, $gap_batch_max, $gap_batch_max_text, $gap_max, $gap_max_text,
    $batches, $max_batch, $one_by_one_max_text);
my ($logtime, $logtime_one_by_one, $logtime_last, $logtime_last_hhmmss,
    $logtime_batch, $logtime_first, $batch_time, $batch_time_max,
    $logtime_hhmmss, $logtime_first_hhmmss);
my ($changes, $changes_batch, $changes_batch_max, $changes_max, $changes_total,
    $apply, $singles, $closing_log);
my ($no_bulk, $no_bulk_total, %no_bulk, $no_pk, $no_pk_total, %no_pk,
    $table_name, %table_names, %table_numbers, %pk_change);
my ($bulk_finished_line, $v7_table, $v7_verb);
my ($bulk_applied_time, $bulk_prepare, $bulk_prepare_total) = (0, 0, 0);  # FIX: was (0, 0)
my ($master_thread, $thread, $thread_number, $reattach);
my (%merge_started, %apply_start, %apply_start_hhmmss, %apply_end,
    %thread_names, %thread_counts, %logtime_last, %logtime_last_hhmmss);
my ($send_file_total_count, $send_file_total_time, $send_file_count,
    $send_file_time, $send_file_logtime) = (0, 0, 0, 0, 0);              # FIX: was = 0
my ($truncate_start, $truncate_table, $truncate_time, $truncate_time_total,
    $truncate_total, $sqlserver_truncate);
my ($attrep_changes_inserts, $attrep_changes_start, $attrep_changes_time,
    $statement_not_found, $noPK_time, $noPK_start_time, $noPK_total_time) = (0,0,0,0,0,0,0);
my ($start_handling, $start_handling_logtime, $start_handling_seconds,
    $start_handling_count, $start_handling_table, %start_handling_table) = (0,0,0,0,0);  # FIX: was (0,0,0,0)
my ($source_start, $source_end, $source_min, $source_max,
    $target_start, $target_end, $target_min, $target_max, $current_latency)
    = (undef,0,999999,0, undef,0,999999,0);
my ($first_line_hhmmss, $longest_sql_length, $longest_sql_text, $longest_sql) = ('??',0,'','');  # FIX: was 3 values

$to = $to_last = $changes_batch = $changes_max = $changes_total = $gap_max = $batch_time_max = $gap_batch_max = $closing_log = 0;
$i = $singles = $no_bulk = $no_bulk_total = $no_pk = $no_pk_total = $affected = $csv_header = $sequential = $bulk_finished_line = 0;
$one_by_one_time_max = $one_by_one_switches = $one_by_one_events = $one_by_one_time_total = $failed_to_execute = $failed_to_execute_total = $verbs = 0;
$table = '?';
$batch_finish = ' OK';
$gap_batch_max_text = " ";
$truncate_start = $truncate_table = $truncate_time = $truncate_time_total = $truncate_total = $changes_batch_max = $reattach = 0;
$gap_max_text = "gap=N.A.";
$table_names{0} = q();
$table_numbers{TOTAL} = 0;

open (LOG, '<', $log) or die "Failed to open file: $log\n$!";

# First line example:
# 00109048: 2015-02-11T22:28:19 [SERVER ]I: Task Server Log - TSK_ATT06_NZP_PLANDATA_FL_PRD
#   (V4.0.0.138 DC01ATTEDWPV06.molina.mhc Microsoft Windows 64-bit, PID: 141436)
#   started at Wed Feb 11 22:28:19 2015 (logger.c:448)
$_ = <LOG>;  # FIX: the "<LOG>" readline had been stripped from the source; restored.
my $n = $log;
$n =~ s/^reptask_//;
$n =~ s/__\d+//;
$n =~ s/\.log$//;
# Task name from the first log line when present, else derived from the file name.
my $task_name = (/Task Server Log -\s+(\S+)/)? $1 : $n;
my ($year,$mon,$day);
if (/ (\d+)-(\d\d)-(\d\d)T(\S+)/) {
    $first_line_hhmmss = qq($2/$3 $4);
    ($year,$mon,$day) = ($1,$2,$3);
} else {
    die "Bad Hair Day : $_";
}
# Normalize -A/-B to full yyyy-mm-ddThh[:mm[:ss]] strings, filling missing
# leading parts from the first date seen in the log (or from the -A day for -B).
if ($after_time) {
    $after_time = $day . q(T) . $after_time unless $after_time =~ /\d\dT/;
    $after_time = $mon . q(-) . $after_time unless $after_time =~ /\d\d-\d\dT/;
    $after_time = $year . q(-) . $after_time unless $after_time =~ /\d\d\d\d-\d\d-\d\dT/;
    $after_time =~ m/(\d+)-(\d\d)-(\d\d)/;
    ($year,$mon,$day) = ($1,$2,$3);
}
if ($before_time) {
    $before_time = $day . q(T) . $before_time unless $before_time =~ /\d\dT/;
    $before_time = $mon . q(-) . $before_time unless $before_time =~ /\d\d-\d\dT/;
    $before_time = $year . q(-) . $before_time unless $before_time =~ /\d\d\d\d-\d\d-\d\dT/;
}
#print "$year/$mon/$day - $after_time - $before_time\n"; exit 0;

open (CSV, '>', $csv) or die "Failed to create CSV file: $csv\n$!" if $csv;

while ($log) {
    while (<LOG>) {  # FIX: the "<LOG>" here had also been stripped; restored.
        # Progress heartbeat on STDERR every $LINES lines.
        print STDERR sprintf(qq(%02d:%02d:%02d), (localtime)[2,1,0]) . " $. lines...\n" unless $. % $LINES;
        # Thread id + timestamp prefix.
        # verbose logging adds microseconds - removed check for space.
        if (/^(\w+):.*?(\d+)-(\d\d)-(\d\d)T(\d\d):(\d\d):(\d\d)/) {
            $thread_number = $1;
            $thread = $thread_names{$thread_number};
            $logtime_hhmmss = "$3/$4 $5:$6:$7";
            $logtime = timelocal($7,$6,$5,$4,$3-1,$2);
            my $t = "$2-$3-$4T$5:$6:$7";
            next if $after_time and ($t lt $after_time);    # before the -A window
            last if $before_time and ($t ge $before_time);  # past the -B window
        }
        # 6.3 changed layout - no change though:
        # [PERFORMANCE ]T: Source latency 1.73 seconds, Target latency 17234.73 seconds, Handling latency 17233.00 seconds
        if (/PERFORMANCE.*?latency ([0-9.]+)\s.*\s([0-9.]+)/) {
            # [PERFORMANCE ]T: Source latency 0.00 seconds, Target latency 0.00 seconds (replicationtask.c:2380)
            $current_latency = $1 + $2;
            # Look for first non-zero as tasks initially report zero which is not the real minimum.
            $source_start = $1 if ($1 > 0) && !defined($source_start);
            $target_start = $2 if !defined($target_start); # Target/Handling latency can easily be 0.
            $source_end = $1;
            $target_end = $2;
            $source_min = $1 if ($source_start) && ($1 < $source_min);
            $target_min = $2 if ($target_start) && ($2 < $target_min);
            $source_max = $1 if $1 > $source_max;
            $target_max = $2 if $2 > $target_max;
            next;
        }
        # Truncate of the attrep_changes helper table. Oracle logs an explicit
        # "After truncating changes table" line; SQLserver does not, so we also
        # accept the line following a SQLserver TRUNCATE, and "loading to changes".
        # 00006972: 2016-01-07T14:27:25 [TARGET_APPLY ]D: Going to run truncate for changes table (oracle_endpoint_bulk.c:1136)
        # 00006972: 2016-01-07T14:29:41 [TARGET_APPLY ]D: After truncating changes table (oracle_endpoint_bulk.c:1145)
        # Truncate statement TRUNCATE TABLE [attrep_changesA83ACD2086C324A8] (sqlserver_endpoint_imp.c:3025)
        # [TARGET_APPLY ]T: Start loading to changes table
        if (/After truncating ch/ || $sqlserver_truncate || /loading to changes/ ) {
            # For SQLserver, just take the next line, some Oracle also.
if ($truncate_start) {
    # Close out a pending truncate: accumulate its duration and report slow ones.
    $truncate_total++;
    $truncate_time = $logtime - $truncate_start;
    $truncate_time_total += $truncate_time;
    printf ("%s%5d truncated %s\n", $logtime_hhmmss, $truncate_time, $truncate_table) if $truncate_time > 2;
    $truncate_time = $truncate_start = 0;
}
$sqlserver_truncate = 0;
}
# Time the INSERTs into the attrep_changes helper table: a pending start is
# closed by whatever log line comes next.
if ($attrep_changes_start) {
    $attrep_changes_time += ($logtime - $attrep_changes_start);
    #print STDERR "== $logtime" if $i++ < 50;
    $attrep_changes_start = 0;
}
if (/INSERT INTO .attrep_changes/) {
    $attrep_changes_start = $logtime;
    $attrep_changes_inserts++;
    #print STDERR "-- $attrep_changes_time -- $attrep_changes_inserts\n" if $i++ < 50;
}
# 00004555: 2018-12-04T13:54:06 [TARGET_APPLY ]T: Truncate statement TRUNCATE TABLE "ATTUNITY"."attrep_changes5C126B94_0000118" (oracle_endpoint_load.c:806)
if (/TRUNCATE.*(attrep_changes\w+)/) {
    $truncate_start = $logtime;
    $truncate_table = $1;
    $sqlserver_truncate = 1 if /sqlserver/; # Oracle endpoint gives explicit AFTER line, SQLserver does not.
}
# NOTE(review): this duplicates the combined "After truncating" handling above;
# by the time control reaches here $truncate_start has already been cleared,
# so the inner block can never fire. Kept for fidelity.
if (/After truncating ch/) {
    if ($truncate_start) {
        $truncate_total++;
        $truncate_time = $logtime - $truncate_start;
        $truncate_time_total += $truncate_time;
        printf ("%s%5d truncated %s\n", $logtime_hhmmss, $truncate_time, $truncate_table) if $truncate_time > 2;
        $truncate_time = $truncate_start = 0;
    }
}
# 00225784: 2015-01-22T15:17:54 [TARGET_APPLY ]D: Create table statement:
#   CREATE TABLE "EDW"."attrep_changes92857F920498243B" ( "seq" INTEGER NOT NULL,
#   "col1" VARCHAR(20000), ... "col42 ...
if ($show_attrep_changes) {
    if (/^\S+\s+(\S+).*Create table.*?(attrep_changes\S+).\s.*(col\d+)/) {
        print "$1 create table $2 - $3\n";
        next;
    }
}
# Apply no Bulk Event - 'UPDATE (3)' for table 714 (bulk_apply.c:1747)
if (/Apply no Bulk.*table\s+(\d+)/) {
    $no_bulk++;
    $no_bulk_total++;
    $no_bulk{$1}++;
    next;
}
if (/no PK for table (\d+)/) {
    $no_pk++;
    $no_pk_total++;
    $no_pk{$1}++;
    next;
}
$closing_log = 1 if /Closing log/;
# [METADATA_MANAGER]W: Target table 'EDW.CAPMESSAGEQUEUE' does not have a primary key or a unique index.
if (/Target table\s+(\S+)/) {
    $table_name = $1;
    next;
}
# 00012980: 2015-08-20T18:22:53 [TARGET_APPLY ]D: Execute: 'select id from sysobjects o where schema_name(o.uid) = N'dbo' and o.name= N'benegroup' and o.type=N'U'' (ar_odbc_stmt.c:2069)
if ($metadata && /Execute.*sysobjects.*name= N.(\w+)/) {
    print ("$logtime_hhmmss - get metadata for : $1\n");
}
# Move Inserts with no PK for table 742 to events with no PK list.
if (/for table\s+(\d+)/ && $table_name) {
    # Many customers have tablename transformation, the above is the SOURCE name.
    # Target may well be different. Let's convert to lower and hope for the best.
    $table_names{$1} = lc($table_name);
    $table_numbers{lc($table_name)} = $1;
    $table_name = '';
    next;
}
$task_name = $1 if /TASK_MANAGER.*Task '([^']+)/;
#
# Statements beyond this point expect logtime and logtime_hhmmss.
#
if (/was reattached/) { %thread_names = () ; $reattach++ } # Clear names, keep counts.
next unless /TARGET_APPLY/;
# next if /After get rows affected, rows affected/;
$batch_finish = q( OK) if /First bulk event/;
# Start applying of 'DELETE (5)' events for table 'dbo'.'T_APPRADDONFEE' (28).
if (/Start applying.*\.'(.*)'\s+\((\d+)/) {
    # Close out the preparation interval that started when the prior bulk
    # reported "The current bulk was applied".
    if ( $bulk_applied_time ) {
        $bulk_prepare = $logtime - $bulk_applied_time;
        $bulk_prepare_total += $bulk_prepare;
        $bulk_applied_time = 0;
    }
    # Many customers have tablename transformation; the above is the SOURCE
    # name and the target may well differ. Lowercase and hope for the best.
    if ( !$table_names{$2} ) {
        $table_names{$2} = lc($1);
        $table_numbers{lc($1)} = $2;
        # $table_names{$2} = $1;
        # $table_numbers{$1} = $2;
    }
}
# Skip the Execute line that may accompany the start:
#   Replicate V4 - [TARGET_APPLY ]D: Execute: 'UPDATE "staging"."employercontribution"
#   Replicate V5+ - [TARGET_APPLY ]V: Execute immediate
#   Replicate V6 - [TARGET_APPLY ]V: Construct statement execute internal: 'UPDATE
next if /]V:.*execute/i;
# INSERT statement for table 'attrep_changesFAB8606F7E18D889' was not found in the pool
if (/not found in the pool/) {
    $statement_not_found++;
    next; # Still potentially in start handling
}
# Seeing a new Target_apply must mean any prior start_handling is done. Count it.
if ($start_handling) {
    my $elapsed = $logtime - $start_handling_logtime;
    printf (qq(%s%5d handle table %d\n), $logtime_hhmmss,$elapsed,$start_handling_table) if $elapsed > $prepare_limit;
    $start_handling_table{$start_handling_table} += $elapsed; # by number
    if ($elapsed > 1) {
        $start_handling_count++;
        $start_handling_seconds += $elapsed;
    }
    $start_handling = $start_handling_table = 0;
}
if (/Start handling table (\d+)/) {
    $start_handling_table = $1;
    $start_handling_logtime = $logtime;
    $start_handling = 1;
}
# 00006328: 2021-11-08T14:37:00 [TARGET_APPLY ]T: Finished applying of 1 'UNKNOWN (0)'
#   events for table 'GLOGOWNER'.'SHIPMENT_INVOLVED_PARTY' (8). (bulk_operations_applier.c:374)
# For now we believe UNKNOWN is used for MERGE.
if (/Finished applying of (\d+) 'UNKNOWN/ && $merge_started{$thread}) {
    my $xrows = $1;
    my ($xlogtime, $xlogtime_hhmmss, $xtable) = split /,/, $merge_started{$thread};
    $apply_start{$thread} = $xlogtime;
    $thread_counts{$thread}++;
    # The %%5d placeholders stay literal here; handle_duration fills them in later.
    $apply_start_hhmmss{$thread} = ($csv)
        ? qq($logtime_hhmmss,$thread,%%d,%%d,MERGE,X,X,$xrows,$xtable\n)
        : sprintf( qq(%s %s%%5d %%5d %s %6s %6s %5d %s\n),
                   $xlogtime_hhmmss, $thread, 'merge ', 'X', 'X', $xrows, $xtable);
    delete $merge_started{$thread};
}
if (/The current bulk was applied/) {
    $bulk_applied_time = $logtime;
    # Finalize every thread that still has an open apply, oldest start first.
    for my $thread (sort {$apply_start{$a} <=> $apply_start{$b}} keys %apply_start) {
        # print "handle - $thread - $apply_start{$thread}\n"; # - $apply_start_hhmmss{$thread}";
        next unless $apply_start{$thread};
        &handle_duration($thread);
    }
} else {
    &handle_duration($thread) if $thread && $apply_start{$thread}; # also handles MERGE
}
$failed_to_execute = 0 if /Applying \S+ one-by-one/;
$failed_to_execute++ if /Failed to execute statement/;
$affected = $1 if /rows affected (\d+)/;
# 00130208: 2015-02-16T16:31:05 [TARGET_APPLY ]I: Applying UPDATES one-by-one for table 'dbo'.'authservice'
# 00130208: 2015-02-16T16:31:19 [TARGET_APPLY ]I: Switch back to bulk apply mode (bulk_apply.c:3907)
# 00000324: 2016-06-16T16:33:50 [TARGET_APPLY ]I: Bulk resend requested. Switch to one-by-one applying mode (bulk_apply.c:1843)
if (/(Applying\s+\S+\s+one-by-one for table\s+)(\S+)/i) {
    # Size of the one-by-one range comes from the last seen seq range, if any.
    $one_by_one_size = (defined($to) && defined($from) && ($to > $from)) ? $to - $from + 1 : 0;
    $one_by_one_active = sprintf ("%s for %s events", $1.$2,
        defined($one_by_one_size) ? $one_by_one_size : '?');
    $table = $2;
    $one_table{"$table TOTAL"} += $one_by_one_size;
    $logtime_one_by_one = $logtime;
    # $logtime_last{thread} = $logtime unless $logtime_last{$thread};
    # $logtime_first = $logtime unless $logtime_first;
    # $logtime_first_hhmmss = $logtime_hhmmss unless $logtime_first_hhmmss;
    next;
}
# 00025184: 2019-01-15T04:09:12 [TARGET_APPLY ]T: Got non-Insert change for table 164 that has no PK. Going to move the Inserts in the current
if (/non-Insert change for table (\d+)/) {
    #print "== $_"; for (keys %table_names){ print qq($_ $table_names{$_}\n)}; last;
    $table = $table_names{$1};
    $table = qq(Table_$1) unless $table;
    $per_table{"$table DURATION"} += 0;
    $per_table{"$table TOTAL"} += 1;
    $per_table{"$table noPK"} += 1;
    $per_table{" DURATION"} += 0;
    $per_table{" TOTAL"} += 1;
    $per_table{" noPK"} += 1;
    $logtime_first = $logtime unless $logtime_first; # Just in case.
    $logtime_first_hhmmss = $logtime_hhmmss unless $logtime_first_hhmmss;
    $logtime_last = $logtime; # Just in case.
}
# 00025184: 2019-01-15T04:09:12 [TARGET_APPLY ]T: BEGIN sent for tables with no bookmark
# 00025184: 2019-01-15T04:30:34 [TARGET_APPLY ]T: COMMIT sent.
#   Working in auto commit mode, split tables with no bookmark transaction
$noPK_start_time = $logtime if /BEGIN.*no book/;
if (/COMMIT.*no book/) {
    # Duration of the no-bookmark (no-PK) transaction.
    $noPK_time = $logtime - $noPK_start_time if $noPK_start_time;
}
if (/back to bulk/) {
    if ($logtime_one_by_one) {
        # One-by-one episode ended: account its duration, failures and size.
        my $one_by_one_time = $logtime - $logtime_one_by_one;
        if ($one_by_one_time > $one_by_one_time_max) {
            $one_by_one_time_max = $one_by_one_time;
            $one_by_one_max_text = " table $table @ $logtime_hhmmss";
        }
        $one_by_one_time_total += $one_by_one_time;
        $failed_to_execute_total += $failed_to_execute;
        $one_table{"$table DURATION"} += $one_by_one_time;
        $one_table{"$table FAIL"} += $failed_to_execute;
        $one_by_one_switches++;
        $one_by_one_events += $one_by_one_size if defined($one_by_one_size);
        $one_by_one_active = '';
        $affected = 0;
        printf( qq(%s%5d one-by-one mode - %3d Failed to execute out of %3s for %s\n),
            $logtime_hhmmss, $one_by_one_time, $failed_to_execute,
            defined($one_by_one_size) ? $one_by_one_size : '?', $table) if $verbose or $one_by_one;
        $failed_to_execute = 0;
        $to = $from = $one_by_one_size = undef;
        $table = '?';
    } else {
        print qq($logtime_hhmmss Back to bulk, but start not seen.\n);
    }
    next;
}
# Early versions of this script triggered the end of one batch when the beginning
# of the next batch was seen as "from seq 1 ". Look for "Bulk finished" instead.
# Still... that really means finished preparing for the next bulk, starting to
# apply, but the script uses it as the end. Example triggers:
#   INSERT (1) on table 311 has the same PK as a PK that was previously updated in the same bulk
#   Update on table 311 changes PK to a PK that was previously updated in the same bulk
#   Update on table 311 changes PK to a previously deleted one in the same bulk
#   [TARGET_APPLY ]D: Bulk finished because memory usage has exceeded the limit (bulk_apply.c:1710)
#   [TARGET_APPLY ]D: Finish bulk because stream timeout (bulk_apply.c:660)
#   [TARGET_APPLY ]D: Finish Bulk because Bulk timeout (bulk_apply.c:615)
#   [TARGET_APPLY ]D: Finish Bulk after resume at the same point as the last not confirmed
#   [TARGET_APPLY ]D: Bulk finished. Total sequencial events #0 (bulk_apply.c:1715)
if (/table (\d+) .*same bulk/) {
    $no_bulk = $1; # Overload for reporting!! Sorry.
    $batch_finish = q(PKi) if /INSERT/;
    $batch_finish = q(PKu) if /Update.*updated/i;
    $batch_finish = q(PKd) if /deleted/;
    $pk_change{qq($no_bulk $batch_finish)}++;
    $pk_change{qq($no_bulk TOTAL)}++;
    $pk_change{qq(0 $batch_finish)}++;
}
# 0025184: 2019-01-15T04:09:12 [TARGET_APPLY ]T: Bulk finished. Total sequential events #458 (bulk_apply.c:2167)
if (/inish/) { # FINISHED?
    $batch_finish = q(MEM) if /because memory/;
    $batch_finish = q(TIM) if /stream timeout/;
    $batch_finish = q(TMO) if /Bulk timeout/;
    $batch_finish = q(RES) if /resume at the same/;
    # $batch_finish = q(SNG) if /tables with PK/; #SiNGle <<--- this is finished applying, not finished making a bulk.
    # Both spellings occur in the wild, hence [ct].
    $sequential = $1 if /sequen[ct]ial events #(\d+)/;
}
if (/Bulk finished\./ && $. > $bulk_finished_line) { # (/Finished applying bulk changes/) {
    $bulk_finished_line = $. + 1; # Catch adjacent finished messages.
$batches++;
$to = $from = undef;
$master_thread = $thread_number unless $master_thread;
# When a bulk is finished, all threads must have finished, but there is not
# always an explicit message for that ("The current bulk was applied").
# Finalize each active thread here using the end times of the worst of them.
# Unfortunately this reports excessive duration for all but the last one to finish.
for my $thread (sort {$apply_start{$a} <=> $apply_start{$b}} keys %apply_start) {
    next unless $apply_start{$thread};
    print "finish - $thread - $apply_start{$thread}\n"; # - $apply_start_hhmmss{$thread}";
    &handle_duration($thread);
}
if ($logtime_batch) {
    # Report the batch that just ended.
    $batch_time = $logtime - $logtime_batch;
    $batch_time_max = $batch_time if $batch_time > $batch_time_max;
    my $extra = q();
    $extra .= sprintf( qq(, latency=%d), $current_latency) if $current_latency;
    $extra .= qq(, seq=$sequential) if $sequential;
    $extra .= qq(, nobulk=$no_bulk) if $no_bulk;
    $extra .= qq(, nopk=$noPK_time) if $noPK_time;
    $extra .= qq(, $send_file_count CSV in $send_file_time) if ($send_file_count);
    printf("%s $batch_finish Batch %2d. %4d seconds, %5d Changes, %2d applies, $bulk_prepare prep time, ${extra}$gap_batch_max_text\n",
        $logtime_hhmmss, $batches, $batch_time, $changes_batch, $verbs_in_batch,
    ) if $verbose or $batch_report;
    $current_latency = undef;
}
# Reset per-batch state for the next batch.
$logtime_batch = $logtime;
$noPK_total_time += $noPK_time;
$gap_batch_max = $changes_batch = $to = $from = $to_last = $verbs_in_batch
    = $no_bulk = $no_pk = $noPK_time = $send_file_count = $send_file_time = $bulk_prepare = 0;
$batch_finish = q(!!!);
}
# 00007904: 2019-02-21T07:35:57 [TARGET_APPLY ]T: Going to send file C:\...\CDC00000009.csv to S3, target file ... (cloud_bulk.c:693)
# 00007904: 2019-02-21T07:36:01 [TARGET_APPLY ]T: File CDC00000009.csv was copied successfully to table STAGE.attrep_changes9AC89EEA2F16FA94 from external stage (cloud_imp.c:4689)
if (/Going to send file/) {
    $send_file_logtime = $logtime;
}
if (/was copied successfully/) {
    if ($send_file_logtime) {
        my $duration = $logtime - $send_file_logtime;
        $send_file_count++;
        $send_file_total_count++;
        $send_file_total_time += $duration;
        $send_file_time += $duration;
    } else {
        print STDERR qq(-- $logtime_hhmmss Found CSV file copied without prior send @ line $.\n);
    }
}
# Replicate V7 no longer has the SQL statement. Instead look for:
#   Start applying of 'INSERT (1)' events for table 'ATT_USER'.'PJ_PROJECT' (1).
#   Start applying of 'DELETE (5)' events for table 'ATT_USER'.'PJ_PROJECT' (1).
#   Start applying of 'UPDATE (3)' events for table 'ATT_USER'.'PJ_PROJECT' (1).
#   Bulk UPDATE statement for table 'HEIN.PJ_PROJECT' was not found in the pool,
#   Going to run update statement, from seq 2 to seq 2
# 00006328: 2021-11-08T14:36:58 [TARGET_APPLY ]T: Start applying of 'UNKNOWN (0)' events for table 'GLOGOWNER'.'SHIPMENT_INVOLVED_PARTY' (8).
#   (bulk_operations_applier.c:342)
if (/Start applying of '(\w+) .*'\.'([^']+)/) {
    # Remember the V7-style verb/table; used below instead of parsing SQL.
    $v7_verb = $1;
    $v7_table = $2;
}
# 00006328: 2021-11-08T14:37:51 [TARGET_APPLY ]T: Merge table statement: MERGE INTO "GLOGOWNER"."SHIPMENT_STOP" T
# The offset-75 substr skips the fixed log-line prefix before scanning the SQL.
if (substr($_, 75, 75) =~ /MERGE INTO.*?\.['"]([^'"]+)/) {
    $table = $1;
    if (!$thread && /TARGET_APPLY/) {
        $thread = chr( 0x41 + scalar keys %thread_names); # threads named A..Z
        $thread_names{$thread_number} = $thread;
    }
    $merge_started{$thread} = qq($logtime,$logtime_hhmmss,$table);
    if (length > $longest_sql_length) {
        $longest_sql_length = length;
        $longest_sql_text = "Line $. @ $logtime_hhmmss";
        $longest_sql = substr($_, 75);
    }
}
next unless / (\w+) statement, from seq (\d+) to seq (\d+)/;
$to_last = $to;
$verb = $1;
$from = $2;
$to = $3;
$rows = $to - $from + 1;
if (!$thread && /TARGET_APPLY/) {
    $thread = chr( 0x41 + scalar keys %thread_names); # threads named A..Z
    $thread_names{$thread_number} = $thread;
    # $thread_counts{$thread} = 0;
}
$apply_start{$thread} = $logtime;
$thread_counts{$thread}++;
$changes_batch_max = $changes_batch if $changes_batch > $changes_batch_max;
$changes = $to - $from + 1;
$changes_total += $changes;
$changes_batch += $changes;
$changes_batch_max = $changes_batch if $changes_batch > $changes_batch_max;
$changes_max = $changes if $changes > $changes_max;
$singles++ if $from == $to;
$verbs++;
$verbs_in_batch++;
# if (/CREATE TABLE.*?attrep_changes.*(col\d+)/) {
#     print STDERR "$logtime_hhmmss create table attrep_changes - $1\n";
# }
# Netezza: 0108664: 2015-01-27T00:43:03 [TARGET_APPLY ]D: Going to run delete statement, from seq 1 to seq 250
#   DELETE FROM "EDW"."CLAIM" WHERE ( "CLAIMID" ) IN ( SELECT CAST( ... ) FROM
#   "EDW"."attrep_changes92857F920498243B" WHERE ..."seq" >= ? and
if ($v7_table) { # Replicate V7 or later?
    $table = $v7_table;
} else {
    # Older Replicate versions have a full SQL statement on the following line.
    my $curpos = tell(LOG);
    $_ = <LOG>;  # FIX: read the actual statement; the stripped "<LOG>" was restored.
    if (length > $longest_sql_length) {
        $longest_sql_length = length;
        $longest_sql_text = "Line $. @ $logtime_hhmmss";
        $longest_sql = $_;
    }
    if (/^[-0-9]+:\s/) {
        # Did NOT read a statement, but a regular line starting with a thread ID.
        $table = "N.A.";
        seek(LOG, $curpos, 0); # Reset record location to read again in the main loop.
    } else {
        # Oracle - INSERT /*+ PARALLEL("QUORUM_STG_OWNER"."I$CLSFC_CATEGORY_ITEM__ct") */ INTO ...
        # s/\/\*.*\*\///;
        if (/^MERGE INTO "[^"]+"\."([^"]+)/) {              # EXASOL
            $verb = uc($verb); # from 'from seq' line.
            $table = $1;
        } elsif (/(\w+).*?\.`([^`]+)`/) {                   # mySQL - INSERT INTO `db`.`table` (...)
            $verb = $1;
            $table = $2;
        } elsif (/(\w+).*?\[(\w+)\]\s/) {                   # SQLserver
            $verb = $1;
            $table = $2;
        } elsif (/([A-Za-z\$_0-9]+).*?"\."([A-Za-z\$_0-9]+)"[\s|\)]/) {
            # Netezza --- used to use \w+ until we had an Oracle with a $ in the name
            $verb = $1;
            $table = $2;
        }
    }
} # Replicate V7 or later ?
# The %%d placeholders stay literal; handle_duration fills them in later.
$apply_start_hhmmss{$thread} = ($csv)
    ? qq($logtime_hhmmss,$thread,%%d,%%d,$verb,$from,$to,$rows,$table\n)
    : sprintf( qq(%s %s%%5d %%5d %s %6d %6d %5d %s\n),
               $logtime_hhmmss, $thread, $verb, $from, $to, $rows, $table);
$logtime_last = $logtime;
if ($from == 1) {
    if (! $to_last) {
        # Pick up first and last time in case this is the only apply in the batch.
        if (!$logtime_first) {
            $logtime_first = $logtime;
            $logtime_batch = $logtime;
            $logtime_first_hhmmss = $logtime_hhmmss;
        }
        if (! $logtime_last{$thread}) { # First time is special
            $logtime_last{$thread} = $logtime;
            $logtime_last_hhmmss{$thread} = $logtime_hhmmss;
        }
        next;
    }
    next if $batch_finish eq q(!!!); # No batch start seen.
# --- End-of-batch bookkeeping ---------------------------------------------
# (This excerpt starts inside an enclosing block opened before this chunk;
# the stray closing braces below match constructs opened earlier.)
$gap_batch_max = 0;
$batches++;
if ($logtime_batch) {
    $batch_time = $logtime - $logtime_batch;
    $batch_time_max = $batch_time if $batch_time > $batch_time_max;
    # Per-batch one-liner. This printf was flagged "-HEIN- fix this line!":
    # the prep/no-bulk/no-pk counters were interpolated straight into the
    # printf FORMAT string. They are now passed as ordinary %d/%s arguments.
    printf("%s Batch %2d. %4d seconds, %5d Changes, %2d applies. Prep %d, %d no bulk, %d no pk, %s\n",
           $logtime_hhmmss, $batches, $batch_time, $to_last, $verbs_in_batch,
           $bulk_prepare, $no_bulk, $no_pk, $gap_batch_max_text ) if $verbose or $batch_report;
}
# Reset the per-batch counters for the batch that just started.
$logtime_batch = $logtime;
$bulk_prepare = 0;
$verbs_in_batch = 0;
$no_bulk = 0;
$no_pk = 0;
$from = 0;
} # closes a block opened before this excerpt
# print "$logtime_hhmmss $from $to $verb $table\n" ; #if $i++ < 10; # Debug
# Track the per-thread gap between consecutive apply lines; remember the
# worst gap overall and the worst gap inside the current batch.
if ($logtime_last{$thread}) { # First time is special
    $gap = $logtime - $logtime_last{$thread};
    $logtime_last{$thread} = $logtime;
    $logtime_last_hhmmss{$thread} = $logtime_hhmmss;
    if ($gap > $gap_max) {
        $gap_max = $gap;
        $gap_max_text = ", gap $gap @ $logtime_hhmmss";
    }
    if ($gap > $gap_batch_max) {
        $gap_batch_max = $gap;
        $gap_batch_max_text = ", gap $gap @ $logtime_hhmmss";
    }
} else {
    # Very first apply line seen for this thread: seed all the timestamps.
    $logtime_batch = $logtime;
    $logtime_last{$thread} = $logtime;
    $logtime_first = $logtime;
    $logtime_last_hhmmss = $logtime_hhmmss;
    $logtime_first_hhmmss = $logtime_hhmmss;
}
} # closes a block opened before this excerpt
# Move on to the next log file given on the command line, if any.
$log = shift;
if ($log) {
    print STDERR "Next log: $log\n";
    close LOG;
    # Three-argument open (was 2-arg "<$log"); LOG stays a bareword handle
    # because the rest of the file reads from LOG.
    open (LOG, '<', $log) or die "Failed to open file: $log\n$!";
    # Derive a fallback task name from the file name: reptask_<name>__NNN.log
    my $n = $log;
    $n =~ s/^reptask_//;
    $n =~ s/__\d+//;
    $n =~ s/\.log$//;
    #00109048: 2015-02-11T22:28:19 [SERVER ]I: Task Server Log - TSK_ATT06_NZP_PLANDATA_FL_PRD (V4.0.0.138 DC01ATTEDWPV06.molina.mhc
    # Microsoft Windows 64-bit, PID: 141436) started at Wed Feb 11 22:28:19 2015 (logger.c:448)
    # Read the first line of the freshly opened log to pick up its task name.
    # (This readline was mangled to "$_ = ;" -- restored to read from LOG.)
    $_ = <LOG>;
    my $new_task_name = (/Task Server Log -\s+(\S+)/)? $1 : $n ;
    print STDERR "Old task name = $task_name, New task name = $new_task_name\n" unless $task_name eq $new_task_name;
}
} # closes a block opened before this excerpt

# --- Final summary report --------------------------------------------------
$batches++;
$_ = ($one_by_one_size && $affected && ($affected < $one_by_one_size)) ? $one_by_one_size - $affected : '?';
print "-- $one_by_one_active $failed_to_execute Failed so far. $_ expected.\n" if $one_by_one_active;
if ($logtime_batch) {
    $batch_time = $logtime_last - $logtime_batch; # can't take the last record time.
    $no_pk_total = $per_table{" noPK"} unless $no_pk_total; # No batch seen?
    $no_pk_total = 0 unless $no_pk_total;
    my $duration = $logtime - $logtime_first;
    my $changes_per_second = ($duration) ? sprintf ( "%.1f", $changes_total/$duration) : 'NAN';
    print "Last batch # $batches. $batch_time seconds, $to Changes. ", ($closing_log) ? 'Log Closed' : 'Could still be active', "($.)\n\n";
    print "Task <$task_name> from $logtime_first_hhmmss thru $logtime_hhmmss. ($duration seconds)\n";
    print "Total $batches Batches in log, largest $changes_batch_max, longest duration $batch_time_max, Total preparation $bulk_prepare_total\n";
    print "$changes_per_second changes/second overall. $changes_total total$gap_max_text. $statement_not_found Statement not found.\n";
    print "no_bulk:$no_bulk_total, no pk:$no_pk_total, $verbs applies, $singles singles, max $changes_max. noPK-time=$noPK_total_time\n\n";
    printf(qq(%d CSV Files Send, taking %d seconds total. %.1f seconds/file, %.1f%% of total time.\n),
           $send_file_total_count, $send_file_total_time,
           $send_file_total_time/$send_file_total_count,
           ($duration)? 100*$send_file_total_time/$duration : 0) if $send_file_total_count;
    # Per-thread apply counts -- only interesting with more than one thread.
    $thread = keys %thread_names;
    if ($thread > 1) {
        my @thread_activity;
        for (sort {$thread_names{$a} cmp $thread_names{$b}} keys %thread_names ) {
            my $thread = $thread_names{$_};
            push @thread_activity, qq($thread=$_:$thread_counts{$thread});
        }
        print "$thread Apply threads were active. $reattach reattaches. Master Thread=$master_thread.\nApply Counts: " . join (q(, ), @thread_activity) . "\n";
    }
    if ($one_by_one_events) {
        print "$one_by_one_time_total Seconds for $one_by_one_events One-By-One applies recording $failed_to_execute_total errors\n";
        print "$one_by_one_switches One-By-One switches. Worst case $one_by_one_time_max seconds for$one_by_one_max_text\n";
    }
    if ($truncate_total) {
        print "$truncate_total truncates for attrep_changes tables taking $truncate_time_total seconds\n";
    }
    if ($start_handling_seconds > 10) {
        print "\"Start Handling Table\" took $start_handling_count times more than 1 second. Total $start_handling_seconds seconds\n";
    }
    if ($attrep_changes_time > 10) {
        printf "\"INSERT INTO [attrep_changes\" executed %d times and took %d seconds, average %5.1f\n",
               $attrep_changes_inserts, $attrep_changes_time, $attrep_changes_time / $attrep_changes_inserts;
    }
    # NOTE(review): this re-declaration masks the variables used just above
    # and looks like a misplaced leftover. Left in place so any later code
    # keeps seeing the fresh lexicals -- confirm intent and remove.
    my ($attrep_changes_inserts, $attrep_changes_start, $attrep_changes_time);
    if ($longest_sql_length > 20000) {
        # "my $x = $1 if COND" has undefined behaviour in Perl; declare
        # first, then assign conditionally.
        my $columns;
        $columns = $1 if $longest_sql =~ /.*col(\d+)/;
        $columns = 'Unknown' unless $columns;
        print qq(Longest SQL statement was $longest_sql_length bytes, $columns columns $longest_sql_text\n);
    }
} else {
    print STDERR "Not enough, or no, TARGET_APPLY data in log for batch summary.\n";
    $first_line_hhmmss = "No-Start-time" unless $first_line_hhmmss;
    $logtime_hhmmss = "No Target Apply line" unless $logtime_hhmmss;
    print STDERR "$. lines read. $first_line_hhmmss - $logtime_hhmmss\n";
}
# --- Source/Target latency table -------------------------------------------
$source_start = 0 unless $source_start;
$target_start = 0 unless $target_start;
if ($source_max || $target_max ) {
    printf qq(\n LATENCY| Begin| End| Min| Max| Net|\n);
    printf qq( -------|-------|-------|-------|-------|-------|\n);
    printf qq( %-7s|%7d|%7d|%7d|%7d|%7d|\n), 'Source', $source_start, $source_end, $source_min, $source_max, $source_end - $source_start;
    printf qq( %-7s|%7d|%7d|%7d|%7d|%7d|\n), 'Target', $target_start, $target_end, $target_min, $target_max, $target_end - $target_start;
    printf qq( %-7s|%7s|%7s|%7s|%7s|%7d|\n\n), 'Elapsed','','','','', ($logtime_first)? $logtime - $logtime_first: 0;
}
#print "\n\n table_numbers\n\n";
#for (sort keys %table_numbers) { print qq($table_numbers{$_}\t$_\n) };
#
#print "\n\n table_names\n\n";
#for (sort keys %table_names) { print qq($_\t$table_names{$_}\n) };
#print "\n";
# --- Per-table apply totals (-t) -------------------------------------------
if (($report_per_table) and keys %per_table) {
    my $format = qq( %4s %-30s %9s %9s %9s %9s %5s %6s %7s %7s%s\n);
    print "\n";
    printf $format, qw(Num Table_Name Total Insert Update Delete noPK Prep TotTime AvgTime), '';
    printf $format, '----', '-'x30, '-'x9, '-'x9, '-'x9, '-'x9, '-'x5, '-'x6, sprintf("%7d",$logtime - $logtime_first), '-'x9, '';
    my $count = 0;
    for (sort {$per_table{$b} <=> $per_table{$a}} keys %per_table) {
        next unless /\sTOTAL/; # only
        my $n = 0;
        my $table = (split)[0];
        if ( $table =~ s/$ct_suffix$//i ) {
            # If we stripped the suffix, then it was a change table
            $n = $table_numbers{lc($table)};
            # Use negative base table ID for make-up change table.
            $n = 0 unless $n;
            $n = - ($n % 10000) if $n > 10000;
            $table .= $ct_suffix; # restore
        } else {
            $n = $table_numbers{lc($table)};
        }
        $n = 0 unless $n;
        $n = ($n % 10000) if $n > 10000; # Change tables ??
        my $i = $per_table{"$table INSERT"};
        my $u = $per_table{"$table UPDATE"};
        my $m = $per_table{"$table MERGE"};
        my $d = $per_table{"$table DELETE"};
        my $t = $per_table{"$table TOTAL"};
        my $s = $per_table{"$table DURATION"};
        my $p = $per_table{"$table noPK"};
        my $h = ($n) ? $start_handling_table{$n} : 0;
        # Default every counter to zero; MERGE counts stand in for UPDATE.
        $i = 0 unless $i;
        $u = 0 unless $u;
        $m = 0 unless $m;
        $u = $m unless $u;
        $d = 0 unless $d;
        $h = 0 unless $h;
        $p = 0 unless $p;
        $s = 0 unless $s;
        my $xx = '';
        if ($x && $per_tableX{"$table X_start"} ) { $xx = ' ' . $per_tableX{"$table X_start"} . ' - ' . $per_tableX{"$table X_end"} };
        printf $format, $n, substr($table,0,30), $per_table{"$_"}, $i , $u, $d, $p, $h, $s, sprintf (qq(%5.4f),($s and $t) ? ($s / $t) : 0), $xx;
        last if ++$count > $max_tables;
    }
}
# --- Per-table one-by-one (row at a time) statistics -----------------------
if (($report_per_table) and keys %one_table) {
    my $format = qq(%-30s %9s %10s %9s %9s %9s\n);
    print "\n";
    printf $format, qw(Table_Name Changes One-by-One Failed Time AvgTime);
    printf $format, '-'x30, '-'x9, '-'x10, '-'x9, '-'x9, '-'x9;
    for (sort {$one_table{$b} <=> $one_table{$a}} keys %one_table) {
        next unless /\sTOTAL/; # only
        my $table = (split)[0];
        my $table_source = $table;
        $table_source = $1 if /(\w+)['"] TOTAL/; #'dbo'.'ClaimDetailPriceCode'
        my $o = $one_table{"$table TOTAL"};
        # The per_table key may differ in case from the one_table key; try
        # as-is, then upper-, then lower-case.
        my $t = $per_table{"$table_source TOTAL"};
        $t = $per_table{uc($table_source)." TOTAL"} unless $t;
        $t = $per_table{lc($table_source)." TOTAL"} unless $t;
        my $f = $one_table{"$table FAIL"};
        my $s = $one_table{"$table DURATION"};
        $s = 0 unless $s;
        $f = 0 unless $f;
        $t = 0 unless $t;
        printf $format, $table_source, $t, $o, $f, $s, sprintf (qq(%5.4f),($s and $o) ? ($s / $o) : 0);
    }
}
# --- Per-table primary-key change statistics -------------------------------
if (($report_per_table) and keys %pk_change) {
    my $format = qq(%5s %-30s %9s %9s %9s\n);
    print "\n";
    printf $format, qw(Num Table_Name PK_update PK_insert PK_delete);
    printf $format, '-'x4, '-'x30, '-'x9, '-'x9, '-'x9;
    # debug for (sort keys %pk_change) {print "$_ - $pk_change{$_}\n"}
    for (sort {$pk_change{$b} <=> $pk_change{$a}} keys %pk_change) {
        next unless /TOTAL/;
        my ($table,$operation) = split;
        my $table_source = $table_names{$table};
        $table_source = ('table #' . $table) unless $table_source;
        my $u = $pk_change{"$table PKu"};
        my $i = $pk_change{"$table PKi"};
        my $d = $pk_change{"$table PKd"};
        $u = 0 unless $u;
        $i = 0 unless $i;
        $d = 0 unless $d;
        printf $format, $table, $table_source, $u, $i, $d;
    }
}
# Connect to SQLite database in TASK directory to map table number to names.
# Resolve table numbers to owner.name strings via the task's
# dynamic_metadata SQLite database, then report the tables that applied
# without bulk and/or without a primary key.
if ($no_bulk_total or $no_pk_total) {
    my $dbh = DBI->connect("dbi:SQLite:${DATA}\\tasks\\${task_name}\\dynamic_metadata.sqlite", "", "",
                           { RaiseError => 1, AutoCommit => 1 });
    my $sth = $dbh->prepare('SELECT table_owner_and_name from table_definitions WHERE table_id = ?')
        or die "Couldn't prepare statement: " . $dbh->errstr;
    # Look up each table id once; cache the answer in %table_names.
    for (keys %no_bulk, keys %no_pk) {
        if (!$table_names{$_}){
            $sth->execute($_) # Execute the query
                or print STDERR "Couldn't get name for table $_ executing statement: " . $sth->errstr . "\n";
            $table_names{$_} = $data[0] if (@data = $sth->fetchrow_array());
        }
    }
    if ($no_bulk_total) {
        print "\nTable No Bulk - Table Name\n";
        for (sort {$no_bulk{$b} <=> $no_bulk{$a}} keys %no_bulk) {
            printf "%5d%9d %s\n", $_, $no_bulk{$_}, ($table_names{$_}) ? $table_names{$_} : 'N.A.';
        }
    }
    if ($no_pk_total) {
        print "\nTable No PK - Table Name\n";
        for (sort {$no_pk{$b} <=> $no_pk{$a}} keys %no_pk) {
            printf "%5d%9d %s\n", $_, $no_pk{$_}, ($table_names{$_}) ? $table_names{$_} : 'N.A.';
        }
    }
}

# handle_duration THREAD
# Close out the apply that was in flight on THREAD: compute its duration,
# fold the row counts into the per-table totals, remember extra-high-volume
# start/end times (-x), emit the per-apply detail line (-c CSV or -v
# verbose), and record the apply-end time for gap calculation.
# Reads/updates file-level state: %apply_start, %apply_start_hhmmss,
# %apply_end, %per_table, %per_tableX, $logtime, $logtime_hhmmss.
# (The original empty prototype "()" was wrong -- the sub takes one
# argument -- and has been removed.)
sub handle_duration {
    my $thread = shift @_;
    my $apply_start = $apply_start{$thread};
    my $duration = $logtime - $apply_start;
    #02/21 07:40:15 E 2 1 INSERT 11255 11257 3 OUTPT_ORDER_LINE_ITEM_ATT_PROD
    # The stored start line doubles as the data source: fields 5 (verb),
    # 8 (rows) and 9 (table) are split back out of it.
    my ($xd,$xt,$xu,$xl,$verb,$xf,$dummy,$rows,$table) = split /\s+/,$apply_start_hhmmss{$thread};
    $verb = uc($verb);
    #print "Handling $thread - $apply_start - $duration - $logtime_hhmmss\n";
    # Per-table totals, plus grand totals under the ' ' (single space) key.
    $per_table{"$table DURATION"} += $duration;
    $per_table{"$table TOTAL"} += $rows;
    $per_table{"$table $verb"} += $rows;
    $per_table{" DURATION"} += $duration;
    $per_table{" TOTAL"} += $rows;
    $per_table{" $verb"} += $rows;
    # Gap = idle time on this thread between the previous apply's end and
    # this apply's start (0 for the thread's first apply).
    $gap = ($apply_start and $apply_end{$thread})? $apply_start - $apply_end{$thread} : 0;
    if ($x && $rows >= $x) {
        # Extra-high-volume tracking: remember first and last time this
        # table crossed the -x threshold.
        $per_tableX{"$table X_start"} = $logtime_hhmmss unless $per_tableX{"$table X_start"};
        $per_tableX{"$table X_end"} = $logtime_hhmmss;
    }
    if ($csv) {
        print CSV "LogStartTime,Thread,Duration,Gap,Verb,From,To,Rows,Table\n" unless $csv_header++;
        # NOTE(review): the stored line is used as the printf FORMAT here;
        # presumably %-codes were embedded when it was captured -- confirm,
        # otherwise a literal % in a table name would corrupt the output.
        printf CSV $apply_start_hhmmss{$thread}, $duration, $gap;
    } else {
        #02/20 19:51:20 x 9 1 UPDATE 72 73 2 WM_INVENTORY_ATT_PROD
        if ($verbose) {
            print "Log Start Time # Time Gap Verb From To Rows Table\n" unless $csv_header;
            print "-------------- - ---- ----- ------ ------ ----- ---- ----------------\n" unless $csv_header++;
            printf ($apply_start_hhmmss{$thread}, $duration, $gap);
        }
    }
    $apply_end{$thread} = $logtime;
    $apply_start{$thread} = 0; # mark this thread as idle
}