| 
									
										
										
										
											2016-10-04 08:29:55 +08:00
										 |  |  | /* | 
					
						
							|  |  |  |  * Minio Cloud Storage, (C) 2014-2016 Minio, Inc. | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  * Licensed under the Apache License, Version 2.0 (the "License"); | 
					
						
							|  |  |  |  * you may not use this file except in compliance with the License. | 
					
						
							|  |  |  |  * You may obtain a copy of the License at | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  *     http://www.apache.org/licenses/LICENSE-2.0
 | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  * Unless required by applicable law or agreed to in writing, software | 
					
						
							|  |  |  |  * distributed under the License is distributed on an "AS IS" BASIS, | 
					
						
							|  |  |  |  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
					
						
							|  |  |  |  * See the License for the specific language governing permissions and | 
					
						
							|  |  |  |  * limitations under the License. | 
					
						
							|  |  |  |  */ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // PostgreSQL Notifier implementation. A table with a specific
 | 
					
						
							|  |  |  | // structure (column names, column types, and primary key/uniqueness
 | 
					
						
							|  |  |  | // constraint) is used. The user may set the table name in the
 | 
					
						
							|  |  |  | // configuration. A sample SQL command that creates a command with the
 | 
					
						
							|  |  |  | // required structure is:
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | //     CREATE TABLE myminio (
 | 
					
						
							|  |  |  | //         key VARCHAR PRIMARY KEY,
 | 
					
						
							|  |  |  | //         value JSONB
 | 
					
						
							|  |  |  | //     );
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // PostgreSQL's "INSERT ... ON CONFLICT ... DO UPDATE ..." feature
 | 
					
						
							|  |  |  | // (UPSERT) is used here, so the minimum version of PostgreSQL
 | 
					
						
							|  |  |  | // required is 9.5.
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // On each create or update object event in Minio Object storage
 | 
					
						
							|  |  |  | // server, a row is created or updated in the table in Postgres. On
 | 
					
						
							|  |  |  | // each object removal, the corresponding row is deleted from the
 | 
					
						
							|  |  |  | // table.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | package cmd | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"database/sql" | 
					
						
							|  |  |  | 	"encoding/json" | 
					
						
							|  |  |  | 	"fmt" | 
					
						
							|  |  |  | 	"io/ioutil" | 
					
						
							|  |  |  | 	"strings" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/Sirupsen/logrus" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// libpq db driver is usually imported blank - see examples in
 | 
					
						
							|  |  |  | 	// https://godoc.org/github.com/lib/pq
 | 
					
						
							|  |  |  | 	_ "github.com/lib/pq" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | const ( | 
					
						
							|  |  |  | 	upsertRow = `INSERT INTO %s (key, value) | 
					
						
							|  |  |  | VALUES ($1, $2) | 
					
						
							|  |  |  | ON CONFLICT (key) | 
					
						
							|  |  |  | DO UPDATE SET value = EXCLUDED.value;` | 
					
						
							|  |  |  | 	deleteRow = ` DELETE FROM %s | 
					
						
							|  |  |  | WHERE key = $1;` | 
					
						
							|  |  |  | 	createTable = `CREATE TABLE %s ( | 
					
						
							|  |  |  |     key VARCHAR PRIMARY KEY, | 
					
						
							|  |  |  |     value JSONB | 
					
						
							|  |  |  | );` | 
					
						
							|  |  |  | 	tableExists = `SELECT 1 FROM %s;` | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type postgreSQLNotify struct { | 
					
						
							|  |  |  | 	Enable bool `json:"enable"` | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// pass connection string in config directly. This string is
 | 
					
						
							|  |  |  | 	// formatted according to
 | 
					
						
							|  |  |  | 	// https://godoc.org/github.com/lib/pq#hdr-Connection_String_Parameters
 | 
					
						
							|  |  |  | 	ConnectionString string `json:"connectionString"` | 
					
						
							|  |  |  | 	// specifying a table name is required.
 | 
					
						
							|  |  |  | 	Table string `json:"table"` | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// uses the values below if no connection string is specified
 | 
					
						
							|  |  |  | 	// - however the connection string method offers more
 | 
					
						
							|  |  |  | 	// flexibility.
 | 
					
						
							|  |  |  | 	Host     string `json:"host"` | 
					
						
							|  |  |  | 	Port     string `json:"port"` | 
					
						
							|  |  |  | 	User     string `json:"user"` | 
					
						
							|  |  |  | 	Password string `json:"password"` | 
					
						
							|  |  |  | 	Database string `json:"database"` | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-03-16 07:30:34 +08:00
										 |  |  | func (p *postgreSQLNotify) Validate() error { | 
					
						
							|  |  |  | 	if !p.Enable { | 
					
						
							|  |  |  | 		return nil | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2017-03-17 02:44:01 +08:00
										 |  |  | 	if _, err := checkURL(p.Host); err != nil { | 
					
						
							| 
									
										
										
										
											2017-03-16 07:30:34 +08:00
										 |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-10-04 08:29:55 +08:00
										 |  |  | type pgConn struct { | 
					
						
							|  |  |  | 	connStr       string | 
					
						
							|  |  |  | 	table         string | 
					
						
							|  |  |  | 	preparedStmts map[string]*sql.Stmt | 
					
						
							|  |  |  | 	*sql.DB | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func dialPostgreSQL(pgN postgreSQLNotify) (pgConn, error) { | 
					
						
							|  |  |  | 	if !pgN.Enable { | 
					
						
							|  |  |  | 		return pgConn{}, errNotifyNotEnabled | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// check that table is specified
 | 
					
						
							|  |  |  | 	if pgN.Table == "" { | 
					
						
							|  |  |  | 		return pgConn{}, fmt.Errorf( | 
					
						
							|  |  |  | 			"PostgreSQL Notifier Error: Table was not specified in configuration") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	connStr := pgN.ConnectionString | 
					
						
							|  |  |  | 	// check if connection string is specified
 | 
					
						
							|  |  |  | 	if connStr == "" { | 
					
						
							|  |  |  | 		// build from other parameters
 | 
					
						
							|  |  |  | 		params := []string{} | 
					
						
							|  |  |  | 		if pgN.Host != "" { | 
					
						
							|  |  |  | 			params = append(params, "host="+pgN.Host) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		if pgN.Port != "" { | 
					
						
							|  |  |  | 			params = append(params, "port="+pgN.Port) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		if pgN.User != "" { | 
					
						
							|  |  |  | 			params = append(params, "user="+pgN.User) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		if pgN.Password != "" { | 
					
						
							|  |  |  | 			params = append(params, "password="+pgN.Password) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		if pgN.Database != "" { | 
					
						
							|  |  |  | 			params = append(params, "dbname="+pgN.Database) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		connStr = strings.Join(params, " ") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	db, err := sql.Open("postgres", connStr) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return pgConn{}, fmt.Errorf( | 
					
						
							|  |  |  | 			"PostgreSQL Notifier Error: Connection opening failure (connectionString=%s): %v", | 
					
						
							|  |  |  | 			connStr, err, | 
					
						
							|  |  |  | 		) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// ping to check that server is actually reachable.
 | 
					
						
							|  |  |  | 	err = db.Ping() | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return pgConn{}, fmt.Errorf( | 
					
						
							|  |  |  | 			"PostgreSQL Notifier Error: Ping to server failed with: %v", | 
					
						
							|  |  |  | 			err, | 
					
						
							|  |  |  | 		) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// check that table exists - if not, create it.
 | 
					
						
							|  |  |  | 	_, err = db.Exec(fmt.Sprintf(tableExists, pgN.Table)) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		// most likely, table does not exist. try to create it:
 | 
					
						
							|  |  |  | 		_, errCreate := db.Exec(fmt.Sprintf(createTable, pgN.Table)) | 
					
						
							|  |  |  | 		if errCreate != nil { | 
					
						
							|  |  |  | 			// failed to create the table. error out.
 | 
					
						
							|  |  |  | 			return pgConn{}, fmt.Errorf( | 
					
						
							|  |  |  | 				"PostgreSQL Notifier Error: 'Select' failed with %v, then 'Create Table' failed with %v", | 
					
						
							|  |  |  | 				err, errCreate, | 
					
						
							|  |  |  | 			) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// create prepared statements
 | 
					
						
							|  |  |  | 	stmts := make(map[string]*sql.Stmt) | 
					
						
							|  |  |  | 	// insert or update statement
 | 
					
						
							|  |  |  | 	stmts["upsertRow"], err = db.Prepare(fmt.Sprintf(upsertRow, pgN.Table)) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return pgConn{}, | 
					
						
							|  |  |  | 			fmt.Errorf("PostgreSQL Notifier Error: create UPSERT prepared statement failed with: %v", err) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	stmts["deleteRow"], err = db.Prepare(fmt.Sprintf(deleteRow, pgN.Table)) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return pgConn{}, | 
					
						
							|  |  |  | 			fmt.Errorf("PostgreSQL Notifier Error: create DELETE prepared statement failed with: %v", err) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return pgConn{connStr, pgN.Table, stmts, db}, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func newPostgreSQLNotify(accountID string) (*logrus.Logger, error) { | 
					
						
							| 
									
										
										
										
											2017-02-10 07:20:54 +08:00
										 |  |  | 	pgNotify := serverConfig.Notify.GetPostgreSQLByID(accountID) | 
					
						
							| 
									
										
										
										
											2016-10-04 08:29:55 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Dial postgres
 | 
					
						
							|  |  |  | 	pgC, err := dialPostgreSQL(pgNotify) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	pgLog := logrus.New() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	pgLog.Out = ioutil.Discard | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	pgLog.Formatter = new(logrus.JSONFormatter) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	pgLog.Hooks.Add(pgC) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return pgLog, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (pgC pgConn) Close() { | 
					
						
							|  |  |  | 	// first close all prepared statements
 | 
					
						
							|  |  |  | 	for _, v := range pgC.preparedStmts { | 
					
						
							|  |  |  | 		_ = v.Close() | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	// close db connection
 | 
					
						
							|  |  |  | 	_ = pgC.DB.Close() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (pgC pgConn) Fire(entry *logrus.Entry) error { | 
					
						
							|  |  |  | 	// get event type by trying to convert to string
 | 
					
						
							|  |  |  | 	entryEventType, ok := entry.Data["EventType"].(string) | 
					
						
							|  |  |  | 	if !ok { | 
					
						
							|  |  |  | 		// ignore event if converting EventType to string
 | 
					
						
							|  |  |  | 		// fails.
 | 
					
						
							|  |  |  | 		return nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Check for event delete
 | 
					
						
							|  |  |  | 	if eventMatch(entryEventType, []string{"s3:ObjectRemoved:*"}) { | 
					
						
							|  |  |  | 		// delete row from the table
 | 
					
						
							|  |  |  | 		_, err := pgC.preparedStmts["deleteRow"].Exec(entry.Data["Key"]) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return fmt.Errorf( | 
					
						
							|  |  |  | 				"Error deleting event with key = %v - got postgres error - %v", | 
					
						
							|  |  |  | 				entry.Data["Key"], err, | 
					
						
							|  |  |  | 			) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} else { | 
					
						
							|  |  |  | 		// json encode the value for the row
 | 
					
						
							|  |  |  | 		value, err := json.Marshal(map[string]interface{}{ | 
					
						
							|  |  |  | 			"Records": entry.Data["Records"], | 
					
						
							|  |  |  | 		}) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return fmt.Errorf( | 
					
						
							|  |  |  | 				"Unable to encode event %v to JSON - got error - %v", | 
					
						
							|  |  |  | 				entry.Data["Records"], err, | 
					
						
							|  |  |  | 			) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// upsert row into the table
 | 
					
						
							|  |  |  | 		_, err = pgC.preparedStmts["upsertRow"].Exec(entry.Data["Key"], value) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return fmt.Errorf( | 
					
						
							|  |  |  | 				"Unable to upsert event with Key=%v and Value=%v - got postgres error - %v", | 
					
						
							|  |  |  | 				entry.Data["Key"], entry.Data["Records"], err, | 
					
						
							|  |  |  | 			) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (pgC pgConn) Levels() []logrus.Level { | 
					
						
							|  |  |  | 	return []logrus.Level{ | 
					
						
							|  |  |  | 		logrus.InfoLevel, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } |