-- jl-
-- 2/28/2016 - 1:45 PM
--
-- SQL, PL/pgSQL and PL/Perl code to store FTP logs (received from rsyslog) in a PostgreSQL table.

--
-- PostgreSQL database dump
--

-- Session settings applied before any object is created.
SET statement_timeout = 0;  -- 0 disables the statement timeout for this session
SET client_encoding = 'UTF8';
SET standard_conforming_strings = on;  -- backslashes inside '...' literals are ordinary characters
SET check_function_bodies = false;  -- function bodies are not validated when CREATE FUNCTION runs
SET client_min_messages = warning;  -- only WARNING and above are sent to the client

--
-- Name: plperlu; Type: EXTENSION; Schema: -; Owner: 
--

-- Untrusted PL/Perl: required by delete_file() below, which calls unlink().
CREATE EXTENSION IF NOT EXISTS plperlu WITH SCHEMA pg_catalog;


--
-- Name: EXTENSION plperlu; Type: COMMENT; Schema: -; Owner: 
--

COMMENT ON EXTENSION plperlu IS 'PL/PerlU untrusted procedural language';


--
-- Name: plpgsql; Type: EXTENSION; Schema: -; Owner: 
--

-- PL/pgSQL: required by the LANGUAGE plpgsql functions below.
CREATE EXTENSION IF NOT EXISTS plpgsql WITH SCHEMA pg_catalog;


--
-- Name: EXTENSION plpgsql; Type: COMMENT; Schema: -; Owner: 
--

COMMENT ON EXTENSION plpgsql IS 'PL/pgSQL procedural language';


-- Unqualified names in the statements that follow are created in, and
-- resolved against, schema public first.
SET search_path = public, pg_catalog;

--
-- Name: video_date_range; Type: TYPE; Schema: public; Owner: syslog_writer
--

-- Composite type holding a pair of instants; it is the return type of
-- video_date_range_by_camera_id(integer).
CREATE TYPE video_date_range AS (
	begins_at timestamp with time zone,
	ends_at timestamp with time zone
);


ALTER TYPE public.video_date_range OWNER TO syslog_writer;

--
-- Name: assign_images(); Type: FUNCTION; Schema: public; Owner: syslog_writer
--

-- Row-level trigger function (attached to videos by the trigger
-- assign_images_from_video, AFTER INSERT).
-- Flags every image of the new video's camera whose uploaded_at lies in the
-- closed interval [NEW.begins_at, NEW.ends_at] as assigned.
-- Returns NEW.
CREATE FUNCTION assign_images() RETURNS trigger
    LANGUAGE plpgsql
    AS $$
  BEGIN
      UPDATE images AS img
         SET assigned = TRUE
       WHERE img.camera_id   = NEW.camera_id
         AND img.uploaded_at >= NEW.begins_at
         AND img.uploaded_at <= NEW.ends_at;

      RETURN NEW;
  END;
$$;


ALTER FUNCTION public.assign_images() OWNER TO syslog_writer;

--
-- Name: delete_file(); Type: FUNCTION; Schema: public; Owner: pgsql
--

-- Trigger function written in untrusted PL/Perl (attached to images_logs by
-- the trigger delete_image, AFTER DELETE).
-- Unlinks the file named by the deleted row's "path" column; a failed unlink
-- only produces a warning, it does not raise an error.
-- Always returns "SKIP"; PostgreSQL ignores the return value of AFTER row
-- triggers.
CREATE FUNCTION delete_file() RETURNS trigger
    LANGUAGE plperlu
    AS $_X$
 # $_TD->{old} is the row that was deleted.
 my $f = $_TD->{old}{path};
 unless (unlink $f) {
     warn "delete_file(): could not unlink $f: $!\n";
 }
 return "SKIP";
$_X$;


ALTER FUNCTION public.delete_file() OWNER TO pgsql;

--
-- Name: delete_images(); Type: FUNCTION; Schema: public; Owner: syslog_writer
--

-- Row-level trigger function (attached to videos by the trigger
-- delete_images_from_video, AFTER UPDATE).
-- Once a video row has uploaded = TRUE, deletes the assigned images of the
-- same camera whose uploaded_at lies in [NEW.begins_at, NEW.ends_at].
-- Returns NEW in every case.
CREATE FUNCTION delete_images() RETURNS trigger
    LANGUAGE plpgsql
    AS $$
  BEGIN
      -- Nothing to purge unless the video is flagged as uploaded.
      IF NEW.uploaded IS NOT TRUE THEN
        RETURN NEW;
      END IF;

      DELETE FROM images AS img
       WHERE img.assigned IS TRUE
         AND img.camera_id   = NEW.camera_id
         AND img.uploaded_at >= NEW.begins_at
         AND img.uploaded_at <= NEW.ends_at;

      RETURN NEW;
  END
$$;


ALTER FUNCTION public.delete_images() OWNER TO syslog_writer;

--
-- Name: find_camera_id(text); Type: FUNCTION; Schema: public; Owner: syslog_writer
--

-- Returns the id of the cameras row whose ip_address equals "ip", creating
-- the row first when none exists (find-or-create).
--
-- Parameters:
--   ip  camera IP address as text; a NULL value makes the INSERT fail on the
--       NOT NULL constraint of cameras.ip_address.
-- Returns: cameras.id of the existing or newly inserted row.
--
-- Concurrency: two sessions can both miss the row and both try to insert it.
-- The loser hits the UNIQUE constraint on cameras.ip_address; that error is
-- trapped and the row committed by the winner is read back instead of
-- propagating the failure to the caller. (Under the default READ COMMITTED
-- isolation the re-read sees the winner's row.)
CREATE FUNCTION find_camera_id(ip text) RETURNS integer
    LANGUAGE plpgsql
    AS $$
  DECLARE
    found_id INTEGER;
  BEGIN
    SELECT cameras.id INTO found_id FROM cameras WHERE cameras.ip_address = ip;

    IF found_id IS NOT NULL THEN
      RETURN found_id;
    END IF;

    -- The nested block (a subtransaction) is only entered on the rare
    -- "row does not exist yet" path.
    BEGIN
      INSERT INTO cameras(ip_address) VALUES (ip) RETURNING id INTO found_id;
    EXCEPTION
      WHEN unique_violation THEN
        -- Another session inserted the same ip_address first; use its row.
        SELECT cameras.id INTO found_id FROM cameras WHERE cameras.ip_address = ip;
    END;

    RETURN found_id;
  END;
$$;


ALTER FUNCTION public.find_camera_id(ip text) OWNER TO syslog_writer;

--
-- Name: find_worker_id(text); Type: FUNCTION; Schema: public; Owner: syslog_writer
--

-- Returns the id of the workers row whose ip_address equals "ip", creating
-- the row first when none exists (find-or-create).
--
-- Parameters:
--   ip  worker IP address as text; a NULL value makes the INSERT fail on the
--       NOT NULL constraint of workers.ip_address.
-- Returns: workers.id of the existing or newly inserted row.
--
-- Concurrency: two sessions can both miss the row and both try to insert it.
-- The loser hits the UNIQUE constraint on workers.ip_address; that error is
-- trapped and the row committed by the winner is read back instead of
-- propagating the failure to the caller. (Under the default READ COMMITTED
-- isolation the re-read sees the winner's row.)
CREATE FUNCTION find_worker_id(ip text) RETURNS integer
    LANGUAGE plpgsql
    AS $$
  DECLARE
    found_id INTEGER;
  BEGIN
    SELECT workers.id INTO found_id FROM workers WHERE workers.ip_address = ip;

    IF found_id IS NOT NULL THEN
      RETURN found_id;
    END IF;

    -- The nested block (a subtransaction) is only entered on the rare
    -- "row does not exist yet" path.
    BEGIN
      INSERT INTO workers(ip_address) VALUES (ip) RETURNING id INTO found_id;
    EXCEPTION
      WHEN unique_violation THEN
        -- Another session inserted the same ip_address first; use its row.
        SELECT workers.id INTO found_id FROM workers WHERE workers.ip_address = ip;
    END;

    RETURN found_id;
  END;
$$;


ALTER FUNCTION public.find_worker_id(ip text) OWNER TO syslog_writer;

--
-- Name: ftp_logs_to_images(); Type: FUNCTION; Schema: public; Owner: syslog_writer
--

-- Row-level trigger function (attached to ftp_logs by the trigger
-- export_ftp_logs_to_images, AFTER INSERT).
--
-- For log messages containing 'OK UPLOAD':
--   * the 6th space-separated field of NEW.msg (with '"' and ',' stripped)
--     is taken as the camera IP, the 7th as the uploaded file path;
--   * the 6th and 7th '/'-separated components of that path are read as a
--     year and a month;
--   * only when they equal the year and month of NEW.timegenerated is an
--     images row inserted and cameras.latest_upload_at updated.
-- NOTE(review): the exact log line layout is not visible here; the field
-- positions above are simply what the code reads - confirm against the FTP
-- server's log format.
--
-- Paths whose year/month components are not plain digits are skipped rather
-- than raising a cast error: an error in this trigger would abort the
-- INSERT into ftp_logs itself and lose the log row.
--
-- Returns NEW.
CREATE FUNCTION ftp_logs_to_images() RETURNS trigger
    LANGUAGE plpgsql
    AS $$
  DECLARE
    -- Locals carry a v_ prefix so they can never be confused with the
    -- identically named columns of images / cameras.
    v_camera_ip   TEXT;
    v_path        TEXT;
    v_year_text   TEXT;
    v_month_text  TEXT;
    v_camera_id   INTEGER;
    -- Same type as ftp_logs.timegenerated and images.uploaded_at, so the
    -- instant is stored unchanged; a plain TIMESTAMP would round-trip
    -- through the session time zone.
    v_uploaded_at TIMESTAMP WITH TIME ZONE;
  BEGIN
    IF ( NEW.msg ~ 'OK UPLOAD' ) THEN
      v_camera_ip   := regexp_replace(split_part(NEW.msg, ' ', 6), '\"|\,', '', 'g');
      v_path        := regexp_replace(split_part(NEW.msg, ' ', 7), '\"|\,', '', 'g');
      v_year_text   := split_part(v_path, '/', 6);
      v_month_text  := split_part(v_path, '/', 7);
      v_uploaded_at := NEW.timegenerated;

      -- Validate before casting; the casts live in a nested IF because SQL
      -- does not guarantee left-to-right evaluation of AND operands.
      -- {1,9} keeps the value within integer range.
      IF (v_year_text ~ '^[0-9]{1,9}$' AND v_month_text ~ '^[0-9]{1,9}$') THEN
        IF (CAST(v_year_text AS INTEGER) = CAST(date_part('year', v_uploaded_at) AS INTEGER)
            AND CAST(v_month_text AS INTEGER) = CAST(date_part('month', v_uploaded_at) AS INTEGER)) THEN
          v_camera_id := find_camera_id(v_camera_ip);

          INSERT INTO images(uploaded_at, camera_id, path)
          VALUES (v_uploaded_at, v_camera_id, v_path);

          UPDATE cameras
             SET latest_upload_at = v_uploaded_at
           WHERE cameras.id = v_camera_id;
        END IF;
      END IF;
    END IF;
    RETURN NEW;
  END;
$$;


ALTER FUNCTION public.ftp_logs_to_images() OWNER TO syslog_writer;

--
-- Name: log_images(); Type: FUNCTION; Schema: public; Owner: syslog_writer
--

-- Row-level trigger function (attached to images by the trigger
-- log_deleted_images, AFTER DELETE).
-- Copies the deleted images row into images_logs, storing the original
-- images.id in images_logs.old_id.
-- Each column is named explicitly so the copy does not depend on the
-- physical column order of images.
-- Returns NULL.
CREATE FUNCTION log_images() RETURNS trigger
    LANGUAGE plpgsql
    AS $$
  BEGIN
      INSERT INTO images_logs(old_id, uploaded_at, camera_id, path, assigned)
      VALUES (OLD.id, OLD.uploaded_at, OLD.camera_id, OLD.path, OLD.assigned);
      RETURN NULL;
  END
$$;


ALTER FUNCTION public.log_images() OWNER TO syslog_writer;

--
-- Name: video_date_range_by_camera_id(integer); Type: FUNCTION; Schema: public; Owner: syslog_writer
--

-- Computes the time window of the next video for one camera.
--
-- Parameters:
--   $1  cameras.id of the camera.
-- Returns: a video_date_range whose begins_at is the uploaded_at of the
--   earliest unassigned image chosen as described below, and whose ends_at
--   is begins_at plus the camera's video_duration (in minutes).
--   Both fields are NULL when the camera has no unassigned image.
--
-- Selection of begins_at:
--   1. the earliest unassigned image of the camera that is later than the
--      ends_at of at least one of the camera's videos;
--   2. if there is none, the earliest unassigned image of the camera.
CREATE FUNCTION video_date_range_by_camera_id(integer) RETURNS video_date_range
    LANGUAGE plpgsql
    AS $_$
  DECLARE
    date_range     video_date_range;
    duration       interval;
    first_image_at timestamp with time zone;
  BEGIN
       -- Per-camera video length; cameras.video_duration is a number of minutes.
       SELECT c.video_duration * INTERVAL '1 minute'
       INTO duration
       FROM cameras c
       WHERE c.id = $1;

       SELECT
         i.uploaded_at
       INTO first_image_at
       FROM
         images i
       INNER JOIN videos v
       ON
         v.camera_id = i.camera_id
         AND i.uploaded_at > v.ends_at
       WHERE
         i.camera_id = $1
         AND i.assigned IS FALSE
       ORDER BY
         i.uploaded_at ASC
       LIMIT 1;

       IF NOT FOUND THEN
         SELECT
           i.uploaded_at
         INTO first_image_at
         FROM
           images i
         WHERE
           i.camera_id = $1
           AND i.assigned IS FALSE
         ORDER BY
           i.uploaded_at ASC
         LIMIT 1;
       END IF;

       -- Fall back to 10 minutes (the column default of video_duration)
       -- when no cameras row exists for $1.
       date_range := ROW(first_image_at,
                         (first_image_at + COALESCE(duration, INTERVAL '10 minutes')));
    RETURN date_range;
  END;
$_$;


ALTER FUNCTION public.video_date_range_by_camera_id(integer) OWNER TO syslog_writer;

-- Storage defaults for the tables created below.
SET default_tablespace = '';  -- empty string: use the database's default tablespace

SET default_with_oids = false;  -- tables are created without OIDs

--
-- Name: cameras; Type: TABLE; Schema: public; Owner: syslog_writer; Tablespace: 
--

-- One row per camera, identified by its IP address.
-- Rows are created on demand by find_camera_id(text).
CREATE TABLE cameras (
    id integer NOT NULL,
    ip_address text NOT NULL,
    video_duration integer DEFAULT 10 NOT NULL,  -- minutes; read by video_date_range_by_camera_id(integer)
    latest_upload_at timestamp with time zone,  -- updated by ftp_logs_to_images() whenever it records an image
    has_worker boolean DEFAULT false NOT NULL
);


ALTER TABLE public.cameras OWNER TO syslog_writer;

--
-- Name: cameras_id_seq; Type: SEQUENCE; Schema: public; Owner: syslog_writer
--

-- Sequence supplying cameras.id values.
CREATE SEQUENCE cameras_id_seq
    START WITH 1
    INCREMENT BY 1
    NO MINVALUE
    NO MAXVALUE
    CACHE 1;


ALTER TABLE public.cameras_id_seq OWNER TO syslog_writer;

--
-- Name: cameras_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: syslog_writer
--

-- Ties the sequence's lifetime to the column: dropping cameras.id drops the sequence.
ALTER SEQUENCE cameras_id_seq OWNED BY cameras.id;


--
-- Name: ftp_logs; Type: TABLE; Schema: public; Owner: syslog_writer; Tablespace: 
--

-- Raw FTP log lines. Every inserted row is passed to ftp_logs_to_images()
-- by the AFTER INSERT trigger export_ftp_logs_to_images.
CREATE TABLE ftp_logs (
    id integer NOT NULL,
    timegenerated timestamp with time zone NOT NULL,  -- used as images.uploaded_at by ftp_logs_to_images()
    hostname character varying(60),
    tag text,
    msg text  -- parsed by ftp_logs_to_images() when it contains 'OK UPLOAD'
);


ALTER TABLE public.ftp_logs OWNER TO syslog_writer;

--
-- Name: ftp_logs_id_seq; Type: SEQUENCE; Schema: public; Owner: syslog_writer
--

-- Sequence supplying ftp_logs.id values.
CREATE SEQUENCE ftp_logs_id_seq
    START WITH 1
    INCREMENT BY 1
    NO MINVALUE
    NO MAXVALUE
    CACHE 1;


ALTER TABLE public.ftp_logs_id_seq OWNER TO syslog_writer;

--
-- Name: ftp_logs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: syslog_writer
--

-- Ties the sequence's lifetime to the column: dropping ftp_logs.id drops the sequence.
ALTER SEQUENCE ftp_logs_id_seq OWNED BY ftp_logs.id;


--
-- Name: images; Type: TABLE; Schema: public; Owner: syslog_writer; Tablespace: 
--

-- One row per uploaded image file, inserted by ftp_logs_to_images().
-- Deleted rows are copied to images_logs by the AFTER DELETE trigger
-- log_deleted_images.
CREATE TABLE images (
    id integer NOT NULL,
    uploaded_at timestamp with time zone NOT NULL,
    camera_id integer NOT NULL,
    path text NOT NULL,
    assigned boolean DEFAULT false  -- set to true by assign_images() when a video covering uploaded_at is inserted
);


ALTER TABLE public.images OWNER TO syslog_writer;

--
-- Name: images_id_seq; Type: SEQUENCE; Schema: public; Owner: syslog_writer
--

-- Sequence supplying images.id values.
CREATE SEQUENCE images_id_seq
    START WITH 1
    INCREMENT BY 1
    NO MINVALUE
    NO MAXVALUE
    CACHE 1;


ALTER TABLE public.images_id_seq OWNER TO syslog_writer;

--
-- Name: images_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: syslog_writer
--

-- Ties the sequence's lifetime to the column: dropping images.id drops the sequence.
ALTER SEQUENCE images_id_seq OWNED BY images.id;


--
-- Name: images_logs; Type: TABLE; Schema: public; Owner: syslog_writer; Tablespace: 
--

-- Archive of rows deleted from images, filled by log_images().
-- Deleting a row from this table fires the trigger delete_image, which
-- runs delete_file() on the row's path.
CREATE TABLE images_logs (
    id integer NOT NULL,
    old_id integer NOT NULL,  -- images.id of the deleted row
    uploaded_at timestamp with time zone NOT NULL,
    camera_id integer NOT NULL,
    path text NOT NULL,
    assigned boolean DEFAULT false
);


ALTER TABLE public.images_logs OWNER TO syslog_writer;

--
-- Name: images_logs_id_seq; Type: SEQUENCE; Schema: public; Owner: syslog_writer
--

-- Sequence supplying images_logs.id values.
CREATE SEQUENCE images_logs_id_seq
    START WITH 1
    INCREMENT BY 1
    NO MINVALUE
    NO MAXVALUE
    CACHE 1;


ALTER TABLE public.images_logs_id_seq OWNER TO syslog_writer;

--
-- Name: images_logs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: syslog_writer
--

-- Ties the sequence's lifetime to the column: dropping images_logs.id drops the sequence.
ALTER SEQUENCE images_logs_id_seq OWNED BY images_logs.id;


--
-- Name: videos; Type: TABLE; Schema: public; Owner: syslog_writer; Tablespace: 
--

-- One row per video covering [begins_at, ends_at] for a camera.
-- Inserting a row fires assign_images(); updating a row fires
-- delete_images(), which purges the covered images once uploaded is true.
-- NOTE(review): worker_id and camera_id have no foreign key in the visible
-- part of this file - confirm whether that is intentional.
CREATE TABLE videos (
    id integer NOT NULL,
    begins_at timestamp with time zone NOT NULL,
    ends_at timestamp with time zone NOT NULL,
    worker_id integer NOT NULL,
    camera_id integer NOT NULL,
    uploaded boolean DEFAULT false NOT NULL,
    created_at timestamp with time zone DEFAULT now() NOT NULL
);


ALTER TABLE public.videos OWNER TO syslog_writer;

--
-- Name: videos_id_seq; Type: SEQUENCE; Schema: public; Owner: syslog_writer
--

-- Sequence supplying videos.id values.
CREATE SEQUENCE videos_id_seq
    START WITH 1
    INCREMENT BY 1
    NO MINVALUE
    NO MAXVALUE
    CACHE 1;


ALTER TABLE public.videos_id_seq OWNER TO syslog_writer;

--
-- Name: videos_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: syslog_writer
--

-- Ties the sequence's lifetime to the column: dropping videos.id drops the sequence.
ALTER SEQUENCE videos_id_seq OWNED BY videos.id;


--
-- Name: workers; Type: TABLE; Schema: public; Owner: syslog_writer; Tablespace: 
--

-- One row per worker, identified by its IP address.
-- Rows are created on demand by find_worker_id(text).
CREATE TABLE workers (
    id integer NOT NULL,
    ip_address text NOT NULL
);


ALTER TABLE public.workers OWNER TO syslog_writer;

--
-- Name: workers_id_seq; Type: SEQUENCE; Schema: public; Owner: syslog_writer
--

-- Sequence supplying workers.id values.
CREATE SEQUENCE workers_id_seq
    START WITH 1
    INCREMENT BY 1
    NO MINVALUE
    NO MAXVALUE
    CACHE 1;


ALTER TABLE public.workers_id_seq OWNER TO syslog_writer;

--
-- Name: workers_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: syslog_writer
--

-- Ties the sequence's lifetime to the column: dropping workers.id drops the sequence.
ALTER SEQUENCE workers_id_seq OWNED BY workers.id;


--
-- Name: id; Type: DEFAULT; Schema: public; Owner: syslog_writer
--

-- Each table's id column defaults to the next value of its own sequence.
ALTER TABLE ONLY cameras ALTER COLUMN id SET DEFAULT nextval('cameras_id_seq'::regclass);


--
-- Name: id; Type: DEFAULT; Schema: public; Owner: syslog_writer
--

ALTER TABLE ONLY ftp_logs ALTER COLUMN id SET DEFAULT nextval('ftp_logs_id_seq'::regclass);


--
-- Name: id; Type: DEFAULT; Schema: public; Owner: syslog_writer
--

ALTER TABLE ONLY images ALTER COLUMN id SET DEFAULT nextval('images_id_seq'::regclass);


--
-- Name: id; Type: DEFAULT; Schema: public; Owner: syslog_writer
--

ALTER TABLE ONLY images_logs ALTER COLUMN id SET DEFAULT nextval('images_logs_id_seq'::regclass);


--
-- Name: id; Type: DEFAULT; Schema: public; Owner: syslog_writer
--

ALTER TABLE ONLY videos ALTER COLUMN id SET DEFAULT nextval('videos_id_seq'::regclass);


--
-- Name: id; Type: DEFAULT; Schema: public; Owner: syslog_writer
--

ALTER TABLE ONLY workers ALTER COLUMN id SET DEFAULT nextval('workers_id_seq'::regclass);


--
-- Name: cameras_ip_address; Type: CONSTRAINT; Schema: public; Owner: syslog_writer; Tablespace: 
--

-- Primary-key and uniqueness constraints for all tables.

-- At most one camera per IP address; find_camera_id(text) looks cameras up
-- by this column.
ALTER TABLE ONLY cameras
    ADD CONSTRAINT cameras_ip_address UNIQUE (ip_address);


--
-- Name: cameras_pkey; Type: CONSTRAINT; Schema: public; Owner: syslog_writer; Tablespace: 
--

ALTER TABLE ONLY cameras
    ADD CONSTRAINT cameras_pkey PRIMARY KEY (id);


--
-- Name: ftp_logs_pkey; Type: CONSTRAINT; Schema: public; Owner: syslog_writer; Tablespace: 
--

ALTER TABLE ONLY ftp_logs
    ADD CONSTRAINT ftp_logs_pkey PRIMARY KEY (id);


--
-- Name: images_ip_and_path; Type: CONSTRAINT; Schema: public; Owner: syslog_writer; Tablespace: 
--

-- A given path can be recorded only once per camera. A second 'OK UPLOAD'
-- log line for the same camera and path makes the INSERT in
-- ftp_logs_to_images() fail on this constraint.
ALTER TABLE ONLY images
    ADD CONSTRAINT images_ip_and_path UNIQUE (camera_id, path);


--
-- Name: images_logs_pkey; Type: CONSTRAINT; Schema: public; Owner: syslog_writer; Tablespace: 
--

ALTER TABLE ONLY images_logs
    ADD CONSTRAINT images_logs_pkey PRIMARY KEY (id);


--
-- Name: images_pkey; Type: CONSTRAINT; Schema: public; Owner: syslog_writer; Tablespace: 
--

ALTER TABLE ONLY images
    ADD CONSTRAINT images_pkey PRIMARY KEY (id);


--
-- Name: videos_pkey; Type: CONSTRAINT; Schema: public; Owner: syslog_writer; Tablespace: 
--

ALTER TABLE ONLY videos
    ADD CONSTRAINT videos_pkey PRIMARY KEY (id);


--
-- Name: worker_transactions_unique_keys; Type: CONSTRAINT; Schema: public; Owner: syslog_writer; Tablespace: 
--

-- The same worker cannot register the same time window of the same camera twice.
ALTER TABLE ONLY videos
    ADD CONSTRAINT worker_transactions_unique_keys UNIQUE (worker_id, camera_id, begins_at, ends_at);


--
-- Name: workers_ip_address; Type: CONSTRAINT; Schema: public; Owner: syslog_writer; Tablespace: 
--

-- At most one worker per IP address; find_worker_id(text) looks workers up
-- by this column.
ALTER TABLE ONLY workers
    ADD CONSTRAINT workers_ip_address UNIQUE (ip_address);


--
-- Name: workers_pkey; Type: CONSTRAINT; Schema: public; Owner: syslog_writer; Tablespace: 
--

ALTER TABLE ONLY workers
    ADD CONSTRAINT workers_pkey PRIMARY KEY (id);


--
-- Name: images_camera_id; Type: INDEX; Schema: public; Owner: syslog_writer; Tablespace: 
--

-- Secondary indexes.
-- NOTE(review): images_camera_id indexes a leading prefix of
-- images_camera_id_and_uploaded_at (and of the UNIQUE (camera_id, path)
-- constraint), so it is probably redundant - confirm with usage statistics
-- before dropping.
CREATE INDEX images_camera_id ON images USING btree (camera_id);


--
-- Name: images_camera_id_and_uploaded_at; Type: INDEX; Schema: public; Owner: syslog_writer; Tablespace: 
--

-- Matches the camera_id + uploaded_at range predicates used by
-- assign_images(), delete_images() and video_date_range_by_camera_id().
CREATE INDEX images_camera_id_and_uploaded_at ON images USING btree (camera_id, uploaded_at);


--
-- Name: images_logs_path_idx; Type: INDEX; Schema: public; Owner: syslog_writer; Tablespace: 
--

CREATE INDEX images_logs_path_idx ON images_logs USING btree (path);


--
-- Name: images_logs_uploaded_at; Type: INDEX; Schema: public; Owner: syslog_writer; Tablespace: 
--

CREATE INDEX images_logs_uploaded_at ON images_logs USING btree (uploaded_at);


--
-- Name: images_uploaded_at; Type: INDEX; Schema: public; Owner: syslog_writer; Tablespace: 
--

CREATE INDEX images_uploaded_at ON images USING btree (uploaded_at);


--
-- Name: assign_images_from_video; Type: TRIGGER; Schema: public; Owner: syslog_writer
--

-- Trigger wiring. Data flows through these triggers as follows:
--   INSERT ftp_logs    -> ftp_logs_to_images() inserts into images
--   INSERT videos      -> assign_images() flags covered images as assigned
--   UPDATE videos      -> delete_images() deletes assigned images once uploaded
--   DELETE images      -> log_images() copies the row into images_logs
--   DELETE images_logs -> delete_file() unlinks the file at the row's path

CREATE TRIGGER assign_images_from_video AFTER INSERT ON videos FOR EACH ROW EXECUTE PROCEDURE assign_images();


--
-- Name: delete_image; Type: TRIGGER; Schema: public; Owner: syslog_writer
--

-- NOTE(review): the file is unlinked while the deleting transaction is still
-- open; if that transaction later rolls back, the images_logs row comes back
-- but the file does not.
CREATE TRIGGER delete_image AFTER DELETE ON images_logs FOR EACH ROW EXECUTE PROCEDURE delete_file();


--
-- Name: delete_images_from_video; Type: TRIGGER; Schema: public; Owner: syslog_writer
--

-- Fires on every UPDATE of a videos row, not only when "uploaded" changes;
-- delete_images() itself checks NEW.uploaded.
CREATE TRIGGER delete_images_from_video AFTER UPDATE ON videos FOR EACH ROW EXECUTE PROCEDURE delete_images();


--
-- Name: export_ftp_logs_to_images; Type: TRIGGER; Schema: public; Owner: syslog_writer
--

-- An error raised inside ftp_logs_to_images() aborts the INSERT into ftp_logs.
CREATE TRIGGER export_ftp_logs_to_images AFTER INSERT ON ftp_logs FOR EACH ROW EXECUTE PROCEDURE ftp_logs_to_images();


--
-- Name: log_deleted_images; Type: TRIGGER; Schema: public; Owner: syslog_writer
--

CREATE TRIGGER log_deleted_images AFTER DELETE ON images FOR EACH ROW EXECUTE PROCEDURE log_images();


--
-- Name: fk_images_camera_id; Type: FK CONSTRAINT; Schema: public; Owner: syslog_writer
--

-- Every image must belong to an existing camera. Deleting a camera deletes
-- its images (ON DELETE CASCADE) and a change of cameras.id is propagated
-- (ON UPDATE CASCADE). DEFERRABLE allows the check to be postponed to
-- commit via SET CONSTRAINTS; it is still checked immediately by default.
ALTER TABLE ONLY images
    ADD CONSTRAINT fk_images_camera_id FOREIGN KEY (camera_id) REFERENCES cameras(id) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE;


--
-- Name: public; Type: ACL; Schema: -; Owner: pgsql
--

-- Privileges. Each section first revokes everything and then grants the
-- intended set, so the result does not depend on prior state.
-- The roles pgsql, syslog_writer and worker are not created in the visible
-- part of this file and must already exist.
REVOKE ALL ON SCHEMA public FROM PUBLIC;
REVOKE ALL ON SCHEMA public FROM pgsql;
GRANT ALL ON SCHEMA public TO pgsql;
GRANT ALL ON SCHEMA public TO PUBLIC;


--
-- Name: cameras; Type: ACL; Schema: public; Owner: syslog_writer
--

-- worker may read and update cameras but not insert or delete them.
REVOKE ALL ON TABLE cameras FROM PUBLIC;
REVOKE ALL ON TABLE cameras FROM syslog_writer;
GRANT ALL ON TABLE cameras TO syslog_writer;
GRANT SELECT,UPDATE ON TABLE cameras TO worker;


--
-- Name: images; Type: ACL; Schema: public; Owner: syslog_writer
--

-- worker has read-only access to images.
REVOKE ALL ON TABLE images FROM PUBLIC;
REVOKE ALL ON TABLE images FROM syslog_writer;
GRANT ALL ON TABLE images TO syslog_writer;
GRANT SELECT ON TABLE images TO worker;


--
-- Name: workers; Type: ACL; Schema: public; Owner: syslog_writer
--

-- worker may read and insert workers rows (find_worker_id(text) does both).
REVOKE ALL ON TABLE workers FROM PUBLIC;
REVOKE ALL ON TABLE workers FROM syslog_writer;
GRANT ALL ON TABLE workers TO syslog_writer;
GRANT SELECT,INSERT ON TABLE workers TO worker;


--
-- PostgreSQL database dump complete
--