Database Design: Audit Log, Tracking Changes to Column Data Value in PostgreSQL (PART 1)
What is an Audit trail (also commonly called an Audit log)? It is basically a record-keeping of changes in the database. For instance, if a user updates a record, an audit log should record who made the changes and what the changes were. Audit logging is the common requirement in many enterprise applications, logging of data changes in a database, what data has changed, and who changed them and when.
A common mistake of audit logging is to make a duplicate history table that mimics the table you want to log. If you have multiple tables, you end up making multiple history tables. This is a bad design.
The suitable and recommended approach is to make a singular history table of all the changes. It is a single table that can account for any database table you plan to use. It is an “Amorphous” table; meaning it is consolidated and links by references to whatever you need to log. This logging process can be done as a Trigger or a Concurrent Action for your application.
General Benefit of Audit Log
An audit log can help administrators to accomplish several objectives such as individual accountability, reconstruction of events, intrusion detection, and problem analysis:
- Individual Accountability: Users are personally accountable for their actions which are tracked by an audit log because audit log record “before” and “after” versions of records. This can help the administrator to determine if errors were made by the user, by the system or application software, or by some other source
- Reconstruction of Events: Audit logs can be used to reconstruct events after a problem has occurred. It can reconstruct the series of steps taken by the system, the users, and the application
- Intrusion Detection: Refers to the process of identifying attempts to penetrate a system and gain unauthorized access. If an audit log has been designed and implemented to record appropriate information, it can assist in intrusion detection
- Problem Analysis: Refers to real-time auditing or monitoring. An analysis of the audit log may be able to verify that the system operated normally
Audit Log using Trigger (Drawback)
A common method to audit changes in a database is to create a trigger that monitors INSERTS, UPDATE, DELETES. Unfortunately, most web applications usually only have a single user login for the app itself. Users are often stored in a user table. If your users are managed by a web front end with a user table, triggers won’t work for this type of scenario.
2ndQuadrant Audit Trigger
The generic trigger function is used for recording changes to tables into an audit log table. It will record the old and new records, the table affected, the user who made the change, and the timestamp for each change. Row values are recorded as hstore fields rather than as flat text. This allows much more sophisticated querying against the audit history and allows the audit system to record only changed fields for updates. Auditing can be done at a statement-level or finely at a row level. Control is per audited table. The information recorded is as the following table:
This trigger cannot track:
- DDL like ALTER TABLE
- Changes to system catalogs
Configuration and Script
- Step 1: Create extension hstore
CREATE EXTENSION IF NOT EXISTS hstore;
If you cannot install hstore extension, you can install PostgreSQL contrib package as below:
yum install postgresql-contrib (Postgresql 9) yum install postgresql11-contrib (Postgresql 11)
- Step 2: Create schema audit
CREATE SCHEMA audit; REVOKE ALL ON SCHEMA audit FROM public; COMMENT ON SCHEMA audit IS 'Out-of-table audit/history logging tables and trigger functions';
- Step 3: Create a table to record column value changes
CREATE TABLE audit.logged_actions ( event_id bigserial primary key, schema_name text not null, table_name text not null, relid oid not null, session_user_name text, action_tstamp_tx TIMESTAMP WITH TIME ZONE NOT NULL, action_tstamp_stm TIMESTAMP WITH TIME ZONE NOT NULL, action_tstamp_clk TIMESTAMP WITH TIME ZONE NOT NULL, transaction_id bigint, application_name text, client_addr inet, client_port integer, client_query text, action TEXT NOT NULL CHECK (action IN ('I','D','U', 'T')), row_data hstore, changed_fields hstore, statement_only boolean not null ); REVOKE ALL ON audit.logged_actions FROM public; COMMENT ON TABLE audit.logged_actions IS 'History of auditable actions on audited tables, from audit.if_modified_func()'; COMMENT ON COLUMN audit.logged_actions.event_id IS 'Unique identifier for each auditable event'; COMMENT ON COLUMN audit.logged_actions.schema_name IS 'Database schema audited table for this event is in'; COMMENT ON COLUMN audit.logged_actions.table_name IS 'Non-schema-qualified table name of table event occured in'; COMMENT ON COLUMN audit.logged_actions.relid IS 'Table OID. Changes with drop/create. Get with ''tablename''::regclass'; COMMENT ON COLUMN audit.logged_actions.session_user_name IS 'Login / session user whose statement caused the audited event'; COMMENT ON COLUMN audit.logged_actions.action_tstamp_tx IS 'Transaction start timestamp for tx in which audited event occurred'; COMMENT ON COLUMN audit.logged_actions.action_tstamp_stm IS 'Statement start timestamp for tx in which audited event occurred'; COMMENT ON COLUMN audit.logged_actions.action_tstamp_clk IS 'Wall clock time at which audited event''s trigger call occurred'; COMMENT ON COLUMN audit.logged_actions.transaction_id IS 'Identifier of transaction that made the change. May wrap, but unique paired with action_tstamp_tx.'; COMMENT ON COLUMN audit.logged_actions.client_addr IS 'IP address of client that issued query. Null for unix domain socket.'; COMMENT ON COLUMN audit.logged_actions.client_port IS 'Remote peer IP port address of client that issued query. Undefined for unix socket.'; COMMENT ON COLUMN audit.logged_actions.client_query IS 'Top-level query that caused this auditable event. May be more than one statement.'; COMMENT ON COLUMN audit.logged_actions.application_name IS 'Application name set when this audit event occurred. Can be changed in-session by client.'; COMMENT ON COLUMN audit.logged_actions.action IS 'Action type; I = insert, D = delete, U = update, T = truncate'; COMMENT ON COLUMN audit.logged_actions.row_data IS 'Record value. Null for statement-level trigger. For INSERT this is the new tuple. For DELETE and UPDATE it is the old tuple.'; COMMENT ON COLUMN audit.logged_actions.changed_fields IS 'New values of fields changed by UPDATE. Null except for row-level UPDATE events.'; COMMENT ON COLUMN audit.logged_actions.statement_only IS '''t'' if audit event is from an FOR EACH STATEMENT trigger, ''f'' for FOR EACH ROW';
- Step 4: Create an index on created table
CREATE INDEX logged_actions_relid_idx ON audit.logged_actions(relid); CREATE INDEX logged_actions_action_tstamp_tx_stm_idx ON audit.logged_actions(action_tstamp_stm); CREATE INDEX logged_actions_action_idx ON audit.logged_actions(action);
- Step 5: Create trigger function (if_modified_func)
CREATE OR REPLACE FUNCTION audit.if_modified_func() RETURNS TRIGGER AS $body$ DECLARE audit_row audit.logged_actions; include_values boolean; log_diffs boolean; h_old hstore; h_new hstore; excluded_cols text = ARRAY::text; BEGIN IF TG_WHEN <> 'AFTER' THEN RAISE EXCEPTION 'audit.if_modified_func() may only run as an AFTER trigger'; END IF; audit_row = ROW( nextval('audit.logged_actions_event_id_seq'), -- event_id TG_TABLE_SCHEMA::text, -- schema_name TG_TABLE_NAME::text, -- table_name TG_RELID, -- relation OID for much quicker searches session_user::text, -- session_user_name current_timestamp, -- action_tstamp_tx statement_timestamp(), -- action_tstamp_stm clock_timestamp(), -- action_tstamp_clk txid_current(), -- transaction ID current_setting('application_name'), -- client application inet_client_addr(), -- client_addr inet_client_port(), -- client_port current_query(), -- top-level query or queries (if multistatement) from client substring(TG_OP,1,1), -- action NULL, NULL, -- row_data, changed_fields 'f' -- statement_only ); IF NOT TG_ARGV::boolean IS DISTINCT FROM 'f'::boolean THEN audit_row.client_query = NULL; END IF; IF TG_ARGV IS NOT NULL THEN excluded_cols = TG_ARGV::text; END IF; IF (TG_OP = 'UPDATE' AND TG_LEVEL = 'ROW') THEN audit_row.row_data = hstore(OLD.*) - excluded_cols; audit_row.changed_fields = (hstore(NEW.*) - audit_row.row_data) - excluded_cols; IF audit_row.changed_fields = hstore('') THEN -- All changed fields are ignored. Skip this update. RETURN NULL; END IF; ELSIF (TG_OP = 'DELETE' AND TG_LEVEL = 'ROW') THEN audit_row.row_data = hstore(OLD.*) - excluded_cols; ELSIF (TG_OP = 'INSERT' AND TG_LEVEL = 'ROW') THEN audit_row.row_data = hstore(NEW.*) - excluded_cols; ELSIF (TG_LEVEL = 'STATEMENT' AND TG_OP IN ('INSERT','UPDATE','DELETE','TRUNCATE')) THEN audit_row.statement_only = 't'; ELSE RAISE EXCEPTION '[audit.if_modified_func] - Trigger func added as trigger for unhandled case: %, %',TG_OP, TG_LEVEL; RETURN NULL; END IF; INSERT INTO audit.logged_actions VALUES (audit_row.*); RETURN NULL; END; $body$ LANGUAGE plpgsql SECURITY DEFINER SET search_path = pg_catalog, public; COMMENT ON FUNCTION audit.if_modified_func() IS $body$ Track changes to a table at the statement and/or row level. Optional parameters to trigger in CREATE TRIGGER call: param 0: boolean, whether to log the query text. Default 't'. param 1: text, columns to ignore in updates. Default . Updates to ignored cols are omitted from changed_fields. Updates with only ignored cols changed are not inserted into the audit log. Almost all the processing work is still done for updates that ignored. If you need to save the load, you need to use WHEN clause on the trigger instead. No warning or error is issued if ignored_cols contains columns that do not exist in the target table. This lets you specify a standard set of ignored columns. There is no parameter to disable logging of values. Add this trigger as a 'FOR EACH STATEMENT' rather than 'FOR EACH ROW' trigger if you do not want to log row values. Note that the user name logged is the login role for the session. The audit trigger cannot obtain the active role because it is reset by the SECURITY DEFINER invocation of the audit trigger its self. $body$;
- Step 6: Create function to attach trigger to specified table (audit_table)
CREATE OR REPLACE FUNCTION audit.audit_table(target_table regclass, audit_rows boolean, audit_query_text boolean, ignored_cols text) RETURNS void AS $body$ DECLARE stm_targets text = 'INSERT OR UPDATE OR DELETE OR TRUNCATE'; _q_txt text; _ignored_cols_snip text = ''; BEGIN EXECUTE 'DROP TRIGGER IF EXISTS audit_trigger_row ON ' || target_table; EXECUTE 'DROP TRIGGER IF EXISTS audit_trigger_stm ON ' || target_table; IF audit_rows THEN IF array_length(ignored_cols,1) > 0 THEN _ignored_cols_snip = ', ' || quote_literal(ignored_cols); END IF; _q_txt = 'CREATE TRIGGER audit_trigger_row AFTER INSERT OR UPDATE OR DELETE ON ' || target_table || ' FOR EACH ROW EXECUTE PROCEDURE audit.if_modified_func(' || quote_literal(audit_query_text) || _ignored_cols_snip || ');'; RAISE NOTICE '%',_q_txt; EXECUTE _q_txt; stm_targets = 'TRUNCATE'; ELSE END IF; _q_txt = 'CREATE TRIGGER audit_trigger_stm AFTER ' || stm_targets || ' ON ' || target_table || ' FOR EACH STATEMENT EXECUTE PROCEDURE audit.if_modified_func('|| quote_literal(audit_query_text) || ');'; RAISE NOTICE '%',_q_txt; EXECUTE _q_txt; END; $body$ language 'plpgsql'; COMMENT ON FUNCTION audit.audit_table(regclass, boolean, boolean, text) IS $body$ Add auditing support to a table. Arguments: target_table: Table name, schema qualified if not on search_path audit_rows: Record each row change, or only audit at a statement level audit_query_text: Record the text of the client query that triggered the audit event? ignored_cols: Columns to exclude from update diffs, ignore updates that change only ignored cols. $body$;
- Step 7: Create overloading function audit_table
CREATE OR REPLACE FUNCTION audit.audit_table(target_table regclass, audit_rows boolean, audit_query_text boolean) RETURNS void AS $body$ SELECT audit.audit_table($1, $2, $3, ARRAY::text); $body$ LANGUAGE SQL;
CREATE OR REPLACE FUNCTION audit.audit_table(target_table regclass) RETURNS void AS $body$ SELECT audit.audit_table($1, BOOLEAN 't', BOOLEAN 't'); $body$ LANGUAGE 'sql'; COMMENT ON FUNCTION audit.audit_table(regclass) IS $body$ Add auditing support to the given table. Row-level changes will be logged with full client query text. No cols are ignored. $body$;
- Step 8: Create View for listing tables which have attached trigger to do audit logs
CREATE OR REPLACE VIEW audit.tableslist AS SELECT DISTINCT triggers.trigger_schema AS schema, triggers.event_object_table AS auditedtable FROM information_schema.triggers WHERE triggers.trigger_name::text IN ('audit_trigger_row'::text, 'audit_trigger_stm'::text) ORDER BY schema, auditedtable; COMMENT ON VIEW audit.tableslist IS $body$ View showing all tables with auditing set up. Ordered by schema, then table. $body$;
**** GitHub LINK *****
As of now, we have done the audit log configuration with PostgreSQL for tracking value changes that happened by insert, update, and delete transactions in the system. In the next blog post, you will learn about audit log basic usage and the experiment in PostgreSQL 13 as well. STAY TUNE !!!!!!!!!