Index: gw/bb_store.h =================================================================== RCS file: /home/cvs/gateway/gw/bb_store.h,v retrieving revision 1.3 diff -u -r1.3 bb_store.h --- gw/bb_store.h 24 Apr 2008 14:44:59 -0000 1.3 +++ gw/bb_store.h 4 Oct 2008 06:14:26 -0000 @@ -99,7 +99,7 @@ /* initialize system. Return -1 if fname is bad (too long). */ int store_init(const Octstr *type, const Octstr *fname, long dump_freq, - void *pack_func, void *unpack_func); + void *pack_func, void *unpack_func, Cfg *cfg); /* init shutdown (system dies when all acks have been processed) */ extern void (*store_shutdown)(void); @@ -112,7 +112,9 @@ */ int store_spool_init(const Octstr *fname); int store_file_init(const Octstr *fname, long dump_freq); - +#ifdef HAVE_MYSQL +int store_mysql_init(Cfg *cfg); +#endif #endif /*BB_STORE_H_*/ Index: gw/bb_store.c =================================================================== RCS file: /home/cvs/gateway/gw/bb_store.c,v retrieving revision 1.46 diff -u -r1.46 bb_store.c --- gw/bb_store.c 24 Apr 2008 14:44:59 -0000 1.46 +++ gw/bb_store.c 4 Oct 2008 06:14:26 -0000 @@ -79,7 +79,7 @@ int store_init(const Octstr *type, const Octstr *fname, long dump_freq, - void *pack_func, void *unpack_func) + void *pack_func, void *unpack_func, Cfg *cfg) { int ret; @@ -90,6 +90,10 @@ ret = store_file_init(fname, dump_freq); } else if (octstr_str_compare(type, "spool") == 0) { ret = store_spool_init(fname); +#ifdef HAVE_MYSQL + } else if (octstr_str_compare(type, "mysql") == 0) { + ret = store_mysql_init(cfg); +#endif } else { error(0, "Unknown 'store-type' defined."); ret = -1; Index: gw/bearerbox.c =================================================================== RCS file: /home/cvs/gateway/gw/bearerbox.c,v retrieving revision 1.168 diff -u -r1.168 bearerbox.c --- gw/bearerbox.c 21 Sep 2008 15:52:33 -0000 1.168 +++ gw/bearerbox.c 4 Oct 2008 06:14:28 -0000 @@ -415,7 +415,7 @@ log = cfg_get(grp, octstr_imm("store-location")); val = cfg_get(grp, octstr_imm("store-type")); } - if (store_init(val, log, store_dump_freq, msg_pack, msg_unpack_wrapper) == -1) + if (store_init(val, log, store_dump_freq, msg_pack, msg_unpack_wrapper, cfg) == -1) panic(0, "Could not start with store init failed."); octstr_destroy(val); octstr_destroy(log); Index: gwlib/cfg.def =================================================================== RCS file: /home/cvs/gateway/gwlib/cfg.def,v retrieving revision 1.134 diff -u -r1.134 cfg.def --- gwlib/cfg.def 26 Jul 2008 09:21:17 -0000 1.134 +++ gwlib/cfg.def 4 Oct 2008 06:14:50 -0000 @@ -569,6 +569,13 @@ OCTSTR(field-boxc-id) ) +SINGLE_GROUP(store-db, + OCTSTR(id) + OCTSTR(table) + OCTSTR(field-uuid) + OCTSTR(field-message) +) + SINGLE_GROUP(radius-acct, OCTSTR(our-host) Index: gw/bb_store_mysql.c =================================================================== RCS file: gw/bb_store_mysql.c diff -N gw/bb_store_mysql.c --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ gw/bb_store_mysql.c 1 Jan 1970 00:00:00 -0000 @@ -0,0 +1,585 @@ +/* ==================================================================== + * The Kannel Software License, Version 1.0 + * + * Copyright (c) 2001-2008 Kannel Group + * Copyright (c) 1998-2001 WapIT Ltd. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * + * 3. The end-user documentation included with the redistribution, + * if any, must include the following acknowledgment: + * "This product includes software developed by the + * Kannel Group (http://www.kannel.org/)." + * Alternately, this acknowledgment may appear in the software itself, + * if and wherever such third-party acknowledgments normally appear. + * + * 4. The names "Kannel" and "Kannel Group" must not be used to + * endorse or promote products derived from this software without + * prior written permission. For written permission, please + * contact org@kannel.org. + * + * 5. Products derived from this software may not be called "Kannel", + * nor may "Kannel" appear in their name, without prior written + * permission of the Kannel Group. + * + * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, + * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT + * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, + * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Kannel Group. For more information on + * the Kannel Group, please see . + * + * Portions of this software are based upon software originally written at + * WapIT Ltd., Helsinki, Finland for the Kannel project. + */ + +/** + * bb_store_mysql.c - bearerbox box SMS storage/retrieval module using a mysql database + * + * Author: Alejandro Guerrieri, 2008 + */ + +#include "gw-config.h" + +#include +#include +#include +#include +#include +#include + +#include "gwlib/gwlib.h" +#include "msg.h" +#include "sms.h" +#include "bearerbox.h" +#include "bb_store.h" + + +#ifdef HAVE_MYSQL +#include +#include "gwlib/dbpool.h" + +static Counter *counter; +static List *loaded; +/* + * Our connection pool to mysql. + */ +static DBPool *pool = NULL; + +/* + * Will be used by DB based storage types. + * We have helper init function also. + */ +struct store_db_fields { + Octstr *table; + Octstr *field_uuid; + Octstr *field_message; +}; + +/* + * Database fields, which we are use. + */ +static struct store_db_fields *fields = NULL; + +static int store_mysql_dump() +{ + /* nothing to do */ + return 0; +} + + +static long store_mysql_messages() +{ + return counter ? counter_value(counter) : -1; +} + +static MYSQL_RES* mysql_select(const Octstr *sql) +{ + int state; + MYSQL_RES *result = NULL; + DBPoolConn *pc; + +#if defined(DLR_TRACE) + debug("store.mysql", 0, "sql: %s", octstr_get_cstr(sql)); +#endif + + pc = dbpool_conn_consume(pool); + if (pc == NULL) { + error(0, "MYSQL: Database pool got no connection! DB update failed!"); + return NULL; + } + + state = mysql_query(pc->conn, octstr_get_cstr(sql)); + if (state != 0) { + error(0, "MYSQL: %s", mysql_error(pc->conn)); + } else { + result = mysql_store_result(pc->conn); + } + + dbpool_conn_produce(pc); + + return result; +} + +static void mysql_update(const Octstr *sql) +{ + int state; + DBPoolConn *pc; + +#if defined(DLR_TRACE) + debug("store.mysql", 0, "sql: %s", octstr_get_cstr(sql)); +#endif + + pc = dbpool_conn_consume(pool); + if (pc == NULL) { + error(0, "MYSQL: Database pool got no connection! DB update failed!"); + return; + } + + state = mysql_query(pc->conn, octstr_get_cstr(sql)); + if (state != 0) + error(0, "MYSQL: %s", mysql_error(pc->conn)); + + dbpool_conn_produce(pc); +} + +static void store_mysql_add( Octstr *id, Octstr *os ) +{ + Octstr *sql; + octstr_binary_to_base64( os ); + sql = octstr_format("INSERT INTO %s (%s, %s) VALUES " + "('%s', '%s');", + octstr_get_cstr(fields->table), octstr_get_cstr(fields->field_uuid), + octstr_get_cstr(fields->field_message), + octstr_get_cstr( id ), octstr_get_cstr( os )); + + mysql_update(sql); + octstr_destroy(sql); +} + +static void store_mysql_delete( Octstr *uuid ) { + Octstr *sql; + + sql = octstr_format("DELETE FROM %s WHERE %s = '%s'", octstr_get_cstr(fields->table), + octstr_get_cstr(fields->field_uuid), octstr_get_cstr( uuid )); + + mysql_update(sql); + octstr_destroy(sql); +} + +/* + * Load all configuration directives that are common for all database + * types that use the 'store-db' group to define which attributes are + * used in the table + */ +struct store_db_fields *store_db_fields_create(CfgGroup *grp) +{ + struct store_db_fields *ret = NULL; + + ret = gw_malloc(sizeof(*ret)); + gw_assert(ret != NULL); + memset(ret, 0, sizeof(*ret)); + + if (!(ret->table = cfg_get(grp, octstr_imm("table")))) + panic(0, "Store-MySQL: DB: directive 'table' is not specified!"); + if (!(ret->field_uuid = cfg_get(grp, octstr_imm("field-uuid")))) + panic(0, "Store-MySQL: DB: directive 'field-uuid' is not specified!"); + if (!(ret->field_message = cfg_get(grp, octstr_imm("field-message")))) + panic(0, "Store-MySQL: DB: directive 'field-message' is not specified!"); + + return ret; +} + +void store_db_fields_destroy(struct store_db_fields *fields) +{ + /* sanity check */ + if (fields == NULL) + return; + +#define O_DELETE(a) { if (a) octstr_destroy(a); a = NULL; } + + O_DELETE(fields->table); + O_DELETE(fields->field_uuid); + O_DELETE(fields->field_message); + +#undef O_DELETE + + gw_free(fields); +} + +static int store_mysql_getall(int ignore_err, void(*cb)(Octstr*, void*), void *data) +{ + Octstr *sql; + Octstr *os; + MYSQL_RES *result; + MYSQL_ROW row; + + sql = octstr_format("SELECT %s, %s from %s", + octstr_get_cstr(fields->field_uuid), octstr_get_cstr(fields->field_message), + octstr_get_cstr(fields->table)); + + result = mysql_select(sql); + octstr_destroy(sql); + + if (result == NULL) { + if ( !ignore_err) { + return -1; + } + } + if (mysql_num_rows(result) < 1) { + debug("store.mysql", 0, "no messages found in store"); + } else { + while ( row = mysql_fetch_row(result) ) { + debug("store.mysql", 0, "Found entry %s", row[0] ); + + os = octstr_create(row[1]); + octstr_base64_to_binary(os); + if (os == NULL) { + info(0, "Could not decode message %s", row[0] ); + } else { + cb(os, data); + } + octstr_destroy(os); + } + } + mysql_free_result(result); + + return 0; +} + + +struct status { + const char *format; + Octstr *status; +}; + +static void status_cb(Octstr *msg_s, void *d) +{ + struct status *data = d; + struct tm tm; + char id[UUID_STR_LEN + 1]; + Msg *msg; + + msg = store_msg_unpack(msg_s); + + if (msg == NULL) + return; + /* transform the time value */ +#if LOG_TIMESTAMP_LOCALTIME + tm = gw_localtime(msg->sms.time); +#else + tm = gw_gmtime(msg->sms.time); +#endif + if (msg->sms.udhdata) + octstr_binary_to_hex(msg->sms.udhdata, 1); + if (msg->sms.msgdata && + (msg->sms.coding == DC_8BIT || msg->sms.coding == DC_UCS2 || + (msg->sms.coding == DC_UNDEF && msg->sms.udhdata))) + octstr_binary_to_hex(msg->sms.msgdata, 1); + + uuid_unparse(msg->sms.id, id); + + octstr_format_append(data->status, data->format, + id, + (msg->sms.sms_type == mo ? "MO" : + msg->sms.sms_type == mt_push ? "MT-PUSH" : + msg->sms.sms_type == mt_reply ? "MT-REPLY" : + msg->sms.sms_type == report_mo ? "DLR-MO" : + msg->sms.sms_type == report_mt ? "DLR-MT" : ""), + tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, + tm.tm_hour, tm.tm_min, tm.tm_sec, + (msg->sms.sender ? octstr_get_cstr(msg->sms.sender) : ""), + (msg->sms.receiver ? octstr_get_cstr(msg->sms.receiver) : ""), + (msg->sms.smsc_id ? octstr_get_cstr(msg->sms.smsc_id) : ""), + (msg->sms.boxc_id ? octstr_get_cstr(msg->sms.boxc_id) : ""), + (msg->sms.udhdata ? octstr_get_cstr(msg->sms.udhdata) : ""), + (msg->sms.msgdata ? octstr_get_cstr(msg->sms.msgdata) : "")); + +} + +static Octstr *store_mysql_status(int status_type) +{ + Octstr *ret = octstr_create(""); + const char *format; + struct status data; + + /* check if we are active */ + if (pool == NULL) + return ret; + + /* set the type based header */ + if (status_type == BBSTATUS_HTML) { + octstr_append_cstr(ret, "\n" + "SMS IDTypeTimeSenderReceiver" + "SMSC IDBOX IDUDHMessage" + "\n"); + + format = "%s%s" + "%04d-%02d-%02d %02d:%02d:%02d" + "%s%s%s" + "%s%s%s\n"; + } else if (status_type == BBSTATUS_XML) { + format = "\n\t%s\n\t%s\n\t" + "%04d-%02d-%02d %02d:%02d:%02d\n\t" + "%s\n\t" + "%s\n\t%s\n\t" + "%s\n\t" + "%s\n\t%s\n\t" + "\n"; + } else { + octstr_append_cstr(ret, "[SMS ID] [Type] [Time] [Sender] [Receiver] [SMSC ID] [BOX ID] [UDH] [Message]\n"); + format = "[%s] [%s] [%04d-%02d-%02d %02d:%02d:%02d] [%s] [%s] [%s] [%s] [%s] [%s]\n"; + } + + data.format = format; + data.status = ret; + /* ignore error because files may disappear */ + store_mysql_getall(1, status_cb, &data); + + /* set the type based footer */ + if (status_type == BBSTATUS_HTML) { + octstr_append_cstr(ret,""); + } + + return ret; +} + +static void dispatch(Octstr *msg_s, void *data) +{ + Msg *msg; + void(*receive_msg)(Msg*) = data; + + if (msg_s == NULL) + return; + msg = store_msg_unpack(msg_s); + //octstr_destroy(msg_s); + if (msg != NULL) { + receive_msg(msg); + counter_increase(counter); + } else { + error(0, "Could not unpack message"); + } + //msg_destroy(msg); +} + +static int store_mysql_load(void(*receive_msg)(Msg*)) +{ + int rc; + + /* check if we are active */ + if (pool == NULL) + return 0; + + /* sanity check */ + if (receive_msg == NULL) + return -1; + + rc = store_mysql_getall(0, dispatch, receive_msg); + + info(0, "Loaded %ld messages from store.", counter_value(counter)); + + /* allow using of storage */ + gwlist_remove_producer(loaded); + + return rc; +} + + +static int store_mysql_save(Msg *msg) +{ + char id[UUID_STR_LEN + 1]; + Octstr *id_s; + + /* always set msg id and timestamp */ + if (msg_type(msg) == sms && uuid_is_null(msg->sms.id)) + uuid_generate(msg->sms.id); + + if (msg_type(msg) == sms && msg->sms.time == MSG_PARAM_UNDEFINED) + time(&msg->sms.time); + + if (pool == NULL) + return 0; + + /* block here if store still not loaded */ + gwlist_consume(loaded); + + switch(msg_type(msg)) { + case sms: + { + Octstr *os = store_msg_pack(msg); + + if (os == NULL) { + error(0, "Could not pack message."); + return -1; + } + uuid_unparse(msg->sms.id, id); + id_s = octstr_create(id); + store_mysql_add( id_s, os ); + octstr_destroy(id_s); + counter_increase(counter); + octstr_destroy(os); + break; + } + case ack: + { + //Octstr *filename; + uuid_unparse(msg->ack.id, id); + id_s = octstr_create(id); + store_mysql_delete( id_s ); + octstr_destroy(id_s); + counter_decrease(counter); + break; + } + default: + return -1; + } + + return 0; +} + +static int store_mysql_save_ack(Msg *msg, ack_status_t status) +{ + int ret; + Msg *nack = msg_create(ack); + + nack->ack.nack = status; + uuid_copy(nack->ack.id, msg->sms.id); + nack->ack.time = msg->sms.time; + ret = store_mysql_save(nack); + msg_destroy(nack); + + return ret; +} + + +static void store_mysql_shutdown() +{ + dbpool_destroy(pool); + store_db_fields_destroy(fields); + + counter_destroy(counter); + gwlist_destroy(loaded, NULL); +} + + +int store_mysql_init(Cfg *cfg) +{ + CfgGroup *grp; + List *grplist; + Octstr *mysql_host, *mysql_user, *mysql_pass, *mysql_db, *mysql_id; + long mysql_port = 0; + Octstr *p = NULL; + long pool_size; + DBConf *db_conf = NULL; + + store_messages = store_mysql_messages; + store_save = store_mysql_save; + store_save_ack = store_mysql_save_ack; + store_load = store_mysql_load; + store_dump = store_mysql_dump; + store_shutdown = store_mysql_shutdown; + store_status = store_mysql_status; + + /* + * check for all mandatory directives that specify the field names + * of the used MySQL table + */ + if (!(grp = cfg_get_single_group(cfg, octstr_imm("store-db")))) + panic(0, "Store-MySQL: MySQL: group 'dlr-db' is not specified!"); + + if (!(mysql_id = cfg_get(grp, octstr_imm("id")))) + panic(0, "Store-MySQL: MySQL: directive 'id' is not specified!"); + + fields = store_db_fields_create(grp); + gw_assert(fields != NULL); + + /* + * now grap the required information from the 'mysql-connection' group + * with the mysql-id we just obtained + * + * we have to loop through all available MySQL connection definitions + * and search for the one we are looking for + */ + + grplist = cfg_get_multi_group(cfg, octstr_imm("mysql-connection")); + while (grplist && (grp = gwlist_extract_first(grplist)) != NULL) { + p = cfg_get(grp, octstr_imm("id")); + if (p != NULL && octstr_compare(p, mysql_id) == 0) { + goto found; + } + if (p != NULL) octstr_destroy(p); + } + panic(0, "DLR: MySQL: connection settings for id '%s' are not specified!", + octstr_get_cstr(mysql_id)); + + found: + octstr_destroy(p); + gwlist_destroy(grplist, NULL); + + if (cfg_get_integer(&pool_size, grp, octstr_imm("max-connections")) == -1 || pool_size == 0) + pool_size = 1; + + if (!(mysql_host = cfg_get(grp, octstr_imm("host")))) + panic(0, "DLR: MySQL: directive 'host' is not specified!"); + if (!(mysql_user = cfg_get(grp, octstr_imm("username")))) + panic(0, "DLR: MySQL: directive 'username' is not specified!"); + if (!(mysql_pass = cfg_get(grp, octstr_imm("password")))) + panic(0, "DLR: MySQL: directive 'password' is not specified!"); + if (!(mysql_db = cfg_get(grp, octstr_imm("database")))) + panic(0, "DLR: MySQL: directive 'database' is not specified!"); + cfg_get_integer(&mysql_port, grp, octstr_imm("port")); /* optional */ + + /* + * ok, ready to connect to MySQL + */ + db_conf = gw_malloc(sizeof(DBConf)); + gw_assert(db_conf != NULL); + + db_conf->mysql = gw_malloc(sizeof(MySQLConf)); + gw_assert(db_conf->mysql != NULL); + + db_conf->mysql->host = mysql_host; + db_conf->mysql->port = mysql_port; + db_conf->mysql->username = mysql_user; + db_conf->mysql->password = mysql_pass; + db_conf->mysql->database = mysql_db; + + pool = dbpool_create(DBPOOL_MYSQL, db_conf, pool_size); + gw_assert(pool != NULL); + + /* + * XXX should a failing connect throw panic?! + */ + if (dbpool_conn_count(pool) == 0) + panic(0,"DLR: MySQL: database pool has no connections!"); + + octstr_destroy(mysql_id); + loaded = gwlist_create(); + gwlist_add_producer(loaded); + counter = counter_create(); + + return 0; + +} +#endif