您好,欢迎访问一九零五行业门户网

golang操作Redis&Mysql&RabbitMQ的方法介绍

golang操作redis&mysql&rabbitmq:
reids
安装导入
go get github.com/garyburd/redigo/redisimport github.com/garyburd/redigo/redis
使用
连接
import github.com/garyburd/redigo/redisfunc main() {    c, err := redis.dial(tcp, localhost:6379)    if err != nil {        fmt.println(conn redis failed, err:, err)        return    }    defer c.close()}
set & get
       _, err = c.do(set, name, nick)    if err != nil {        fmt.println(err)        return    }    r, err := redis.string(c.do(get, name))    if err != nil {        fmt.println(err)        return    }    fmt.println(r)
mset & mget
批量设置
      _, err = c.do(mset, name, nick, age, 18)    if err != nil {        fmt.println(mset error: , err)        return    }    r2, err := redis.strings(c.do(mget, name, age))    if err != nil {        fmt.println(mget error: , err)        return    }    fmt.println(r2)
hset & hget
hash操作
   _, err = c.do(hset, names, nick, suoning)    if err != nil {        fmt.println(hset error: , err)        return    }    r, err = redis.string(c.do(hget, names, nick))    if err != nil {        fmt.println(hget error: , err)        return    }    fmt.println(r)
expire
设置过期时间
   _, err = c.do(expire, names, 5)    if err != nil {        fmt.println(expire error: , err)        return    }
lpush & lpop & llen
队列
 // 队列    _, err = c.do(lpush, queue, nick, dawn, 9)    if err != nil {        fmt.println(lpush error: , err)        return    }    for {        r, err = redis.string(c.do(lpop, queue))        if err != nil {            fmt.println(lpop error: , err)            break        }        fmt.println(r)    }    r3, err := redis.int(c.do(llen, queue))    if err != nil {        fmt.println(llen error: , err)        return    }
连接池
各参数的解释如下:
maxidle:最大的空闲连接数,表示即使没有redis连接时依然可以保持n个空闲的连接,而不被清除,随时处于待命状态。
maxactive:最大的激活连接数,表示同时最多有n个连接
idletimeout:最大的空闲连接等待时间,超过此时间后,空闲连接将被关闭
  pool := &redis.pool{        maxidle:     16,        maxactive:   1024,        idletimeout: 300,        dial: func() (redis.conn, error) {            return redis.dial(tcp, localhost:6379)        },    }
连接池例子:
package mainimport (    fmt    github.com/garyburd/redigo/redis)var pool *redis.poolfunc init() {    pool = &redis.pool{        maxidle:     16,        maxactive:   1024,        idletimeout: 300,        dial: func() (redis.conn, error) {            return redis.dial(tcp, localhost:6379)        },    }}func main() {    c := pool.get()    defer c.close()    _, err := c.do(set, name, nick)    if err != nil {        fmt.println(err)        return    }    r, err := redis.string(c.do(get, name))    if err != nil {        fmt.println(err)        return    }    fmt.println(r)}
管道操作
请求/响应服务可以实现持续处理新请求,客户端可以发送多个命令到服务器而无需等待响应,最后在一次读取多个响应。
使用send(),flush(),receive()方法支持管道化操作
send向连接的输出缓冲中写入命令。
flush将连接的输出缓冲清空并写入服务器端。
recevie按照fifo顺序依次读取服务器的响应。
func main() {    c, err := redis.dial(tcp, localhost:6379)    if err != nil {        fmt.println(conn redis failed, err:, err)        return    }    defer c.close()    c.send(set, name1, sss1)    c.send(set, name2, sss2)    c.flush()    v, err := c.receive()    fmt.printf(v:%v,err:%v\n, v, err)    v, err = c.receive()    fmt.printf(v:%v,err:%v\n, v, err)    v, err = c.receive()    // 夯住,一直等待    fmt.printf(v:%v,err:%v\n, v, err)}
mysql
安装导入
go get github.com/go-sql-driver/mysqlgo get github.com/jmoiron/sqlximport (    _ github.com/go-sql-driver/mysql    github.com/jmoiron/sqlx)
连接
import (    _ github.com/go-sql-driver/mysql    github.com/jmoiron/sqlx)var db *sqlx.dbfunc init() {    database, err := sqlx.open(mysql, root:@tcp(127.0.0.1:3306)/test)    if err != nil {        fmt.println(open mysql failed,, err)        return    }    db = database}
建表
create table `person` (  `user_id` int(128) default null,  `username` varchar(255) default null,  `sex` varchar(16) default null,  `email` varchar(128) default null) engine=innodb default charset=utf8
(insert)
package mainimport (    fmt    _ github.com/go-sql-driver/mysql    github.com/jmoiron/sqlx)type person struct {    userid   int    `db:user_id`    username string `db:username`    sex      string `db:sex`    email    string `db:email`}var db *sqlx.dbfunc init() {    database, err := sqlx.open(mysql, root:@tcp(127.0.0.1:3306)/test)    if err != nil {        fmt.println(open mysql failed,, err)        return    }    db = database}func main() {    r, err := db.exec(insert into person(username, sex, email)values(?, ?, ?), suoning, man, suoning@net263.com)    if err != nil {        fmt.println(exec failed, , err)        return    }    id, err := r.lastinsertid()    if err != nil {        fmt.println(exec failed, , err)        return    }    fmt.println(insert succ:, id)}
(update)
package mainimport (    fmt    _ github.com/go-sql-driver/mysql    github.com/jmoiron/sqlx)type person struct {    userid   int    `db:user_id`    username string `db:username`    sex      string `db:sex`    email    string `db:email`}var db *sqlx.dbfunc init() {    database, err := sqlx.open(mysql, root:@tcp(127.0.0.1:3306)/test)    if err != nil {        fmt.println(open mysql failed,, err)        return    }    db = database}func main() {    _, err := db.exec(update person set user_id=? where username=?, 20170808, suoning)    if err != nil {        fmt.println(exec failed, , err)        return    }}
(select)
package mainimport (    fmt    _ github.com/go-sql-driver/mysql    github.com/jmoiron/sqlx)type person struct {    userid   int    `db:user_id`    username string `db:username`    sex      string `db:sex`    email    string `db:email`}type place struct {    country string `db:country`    city    string `db:city`    telcode int    `db:telcode`}var db *sqlx.dbfunc init() {    database, err := sqlx.open(mysql, root:@tcp(127.0.0.1:3306)/test)    if err != nil {        fmt.println(open mysql failed,, err)        return    }    db = database}func main() {    var person []person    err := db.select(&person, select user_id, username, sex, email from person where user_id=?, 1)    if err != nil {        fmt.println(exec failed, , err)        return    }    fmt.println(select succ:, person)    people := []person{}    db.select(&people, select * from person order by user_id asc)    fmt.println(people)    jason, john := people[0], people[1]    fmt.printf(%#v\n%#v, jason, john)}
(delete)
package mainimport (    fmt    _ github.com/go-sql-driver/mysql    github.com/jmoiron/sqlx)type person struct {    userid   int    `db:user_id`    username string `db:username`    sex      string `db:sex`    email    string `db:email`}var db *sqlx.dbfunc init() {    database, err := sqlx.open(mysql, root:@tcp(127.0.0.1:3306)/test)    if err != nil {        fmt.println(open mysql failed,, err)        return    }    db = database}func main() {    _, err := db.exec(delete from person where username=? limit 1, suoning)    if err != nil {        fmt.println(exec failed, , err)        return    }    fmt.println(delete succ)}
事务
package mainimport (    github.com/astaxie/beego/logs    _ github.com/go-sql-driver/mysql    github.com/jmoiron/sqlx)var db *sqlx.dbfunc init() {    database, err := sqlx.open(mysql, root:@tcp(127.0.0.1:3306)/test)    if err != nil {        logs.error(open mysql failed,, err)        return    }    db = database}func main()  {    conn, err := db.begin()    if err != nil {        logs.warn(db.begin failed, err:%v, err)        return    }    defer func() {        if err != nil {            conn.rollback()            return        }        conn.commit()    }()    // do something}
rabbitmq
安装
go get github.com/streadway/amqp
普通模式
生产者:
package mainimport (    fmt    log    os    strings    github.com/streadway/amqp    time)/*默认点对点模式*/func failonerror(err error, msg string) {    if err != nil {        log.fatalf(%s: %s, msg, err)        panic(fmt.sprintf(%s: %s, msg, err))    }}func main() {    // 连接    conn, err := amqp.dial(amqp://guest:guest@localhost:5672/)    failonerror(err, failed to connect to rabbitmq)    defer conn.close()    // 打开一个并发服务器通道来处理消息    ch, err := conn.channel()    failonerror(err, failed to open a channel)    defer ch.close()    // 申明一个队列    q, err := ch.queuedeclare(        task_queue, // name        true,         // durable  持久性的,如果事前已经声明了该队列,不能重复声明        false,        // delete when unused        false,        // exclusive 如果是真,连接一断开,队列删除        false,        // no-wait        nil,          // arguments    )    failonerror(err, failed to declare a queue)    body := bodyfrom(os.args)    // 发布    err = ch.publish(        ,     // exchange 默认模式,exchange为空        q.name,           // routing key 默认模式路由到同名队列,即是task_queue        false,  // mandatory        false,        amqp.publishing{            // 持久性的发布,因为队列被声明为持久的,发布消息必须加上这个(可能不用),但消息还是可能会丢,如消息到缓存但mq挂了来不及持久化。            deliverymode: amqp.persistent,            contenttype:  text/plain,            body:         []byte(body),        })    failonerror(err, failed to publish a message)    log.printf( [x] sent %s, body)}func bodyfrom(args []string) string {    var s string    if (len(args) < 2) || os.args[1] ==  {        s = fmt.sprintf(%s-%v,hello, time.now())    } else {        s = strings.join(args[1:],  )    }    return s}
消费者:
package mainimport (    bytes    fmt    github.com/streadway/amqp    log    time)/*默认点对点模式工作方,多个,拿发布方的消息*/func failonerror(err error, msg string) {    if err != nil {        log.fatalf(%s: %s, msg, err)        panic(fmt.sprintf(%s: %s, msg, err))    }}func main() {    conn, err := amqp.dial(amqp://guest:guest@localhost:5672/)    failonerror(err, failed to connect to rabbitmq)    defer conn.close()    ch, err := conn.channel()    failonerror(err, failed to open a channel)    defer ch.close()    // 指定队列!    q, err := ch.queuedeclare(        task_queue, // name        true,         // durable        false,        // delete when unused        false,        // exclusive        false,        // no-wait        nil,          // arguments    )    failonerror(err, failed to declare a queue)    // fair dispatch 预取,每个工作方每次拿一个消息,确认后才拿下一次,缓解压力    err = ch.qos(        1,     // prefetch count        0,     // prefetch size        false, // global    )    failonerror(err, failed to set qos)    // 消费根据队列名    msgs, err := ch.consume(        q.name, // queue        ,     // consumer        false,  // auto-ack   设置为真自动确认消息        false,  // exclusive        false,  // no-local        false,  // no-wait        nil,    // args    )    failonerror(err, failed to register a consumer)    forever := make(chan bool)    go func() {        for d := range msgs {            log.printf(received a message: %s, d.body)            dot_count := bytes.count(d.body, []byte(.))            t := time.duration(dot_count)            time.sleep(t * time.second)            log.printf(done)            // 确认消息被收到!!如果为真的,那么同在一个channel,在该消息之前未确认的消息都会确认,适合批量处理            // 真时场景:每十条消息确认一次,类似            d.ack(false)        }    }()    log.printf( [*] waiting for messages. to exit press ctrl+c)    <-forever}
订阅模式
订阅 生产者:
package mainimport (    fmt    github.com/streadway/amqp    log    os    strings    time)/*广播模式发布方*/func failonerror(err error, msg string) {    if err != nil {        log.fatalf(%s: %s, msg, err)        panic(fmt.sprintf(%s: %s, msg, err))    }}func main() {    conn, err := amqp.dial(amqp://guest:guest@localhost:5672/)    failonerror(err, failed to connect to rabbitmq)    defer conn.close()    ch, err := conn.channel()    failonerror(err, failed to open a channel)    defer ch.close()    // 默认模式有默认交换机,广播自己定义一个交换机,交换机可与队列进行绑定    err = ch.exchangedeclare(        logs,   // name        fanout, // type 广播模式        true,     // durable        false,    // auto-deleted        false,    // internal        false,    // no-wait        nil,      // arguments    )    failonerror(err, failed to declare an exchange)    body := bodyfrom(os.args)    // 发布    err = ch.publish(        logs, // exchange 消息发送到交换机,这个时候没队列绑定交换机,消息会丢弃        ,     // routing key  广播模式不需要这个,它会把所有消息路由到绑定的所有队列        false,  // mandatory        false,  // immediate        amqp.publishing{            contenttype: text/plain,            body:        []byte(body),        })    failonerror(err, failed to publish a message)    log.printf( [x] sent %s, body)}func bodyfrom(args []string) string {    var s string    if (len(args) < 2) || os.args[1] ==  {        s = fmt.sprintf(%s-%v,hello, time.now())    } else {        s = strings.join(args[1:],  )    }    return s}
订阅 消费者:
package mainimport (    fmt    github.com/streadway/amqp    log)/*广播模式订阅方*/func failonerror(err error, msg string) {    if err != nil {        log.fatalf(%s: %s, msg, err)        panic(fmt.sprintf(%s: %s, msg, err))    }}func main() {    conn, err := amqp.dial(amqp://guest:guest@localhost:5672/)    failonerror(err, failed to connect to rabbitmq)    defer conn.close()    ch, err := conn.channel()    failonerror(err, failed to open a channel)    defer ch.close()    // 同样要申明交换机    err = ch.exchangedeclare(        logs,   // name        fanout, // type        true,     // durable        false,    // auto-deleted        false,    // internal        false,    // no-wait        nil,      // arguments    )    failonerror(err, failed to declare an exchange)    // 新建队列,这个队列没名字,随机生成一个名字    q, err := ch.queuedeclare(        ,    // name        false, // durable        false, // delete when usused        true,  // exclusive  表示连接一断开,这个队列自动删除        false, // no-wait        nil,   // arguments    )    failonerror(err, failed to declare a queue)    // 队列和交换机绑定,即是队列订阅了发到这个交换机的消息    err = ch.queuebind(        q.name, // queue name  队列的名字        ,     // routing key  广播模式不需要这个        logs, // exchange  交换机名字        false,        nil)    failonerror(err, failed to bind a queue)    // 开始消费消息,可开多个订阅方,因为队列是临时生成的,所有每个订阅方都能收到同样的消息    msgs, err := ch.consume(        q.name, // queue  队列名字        ,     // consumer        true,   // auto-ack  自动确认        false,  // exclusive        false,  // no-local        false,  // no-wait        nil,    // args    )    failonerror(err, failed to register a consumer)    forever := make(chan bool)    go func() {        for d := range msgs {            log.printf( [x] %s, d.body)        }    }()    log.printf( [*] waiting for logs. to exit press ctrl+c)    <-forever}
rpc模式
rpc 应答方:
package mainimport (    fmt    log    strconv    github.com/streadway/amqp)/*rpc模式应答方*/func failonerror(err error, msg string) {    if err != nil {        log.fatalf(%s: %s, msg, err)        panic(fmt.sprintf(%s: %s, msg, err))    }}func fib(n int) int {    if n == 0 {        return 0    } else if n == 1 {        return 1    } else {        return fib(n-1) + fib(n-2)    }}func main() {    conn, err := amqp.dial(amqp://guest:guest@localhost:5672/)    failonerror(err, failed to connect to rabbitmq)    defer conn.close()    ch, err := conn.channel()    failonerror(err, failed to open a channel)    defer ch.close()    q, err := ch.queuedeclare(        rpc_queue, // name        false,       // durable        false,       // delete when usused        false,       // exclusive        false,       // no-wait        nil,         // arguments    )    failonerror(err, failed to declare a queue)    // 公平分发 没有这个则round-robbin    err = ch.qos(        1,     // prefetch count        0,     // prefetch size        false, // global    )    failonerror(err, failed to set qos)    // 消费,等待请求    msgs, err := ch.consume(        q.name, // queue        ,     // consumer        false,  // auto-ack        false,  // exclusive        false,  // no-local        false,  // no-wait        nil,    // args    )    failonerror(err, failed to register a consumer)    forever := make(chan bool)    go func() {        //请求来了        for d := range msgs {            n, err := strconv.atoi(string(d.body))            failonerror(err, failed to convert body to integer)            log.printf( [.] fib(%d), n)            // 计算            response := fib(n)            // 回答            err = ch.publish(                ,        // exchange                d.replyto, // routing key                false,     // mandatory                false,     // immediate                amqp.publishing{                    contenttype:   text/plain,                    correlationid: d.correlationid,  //序列号                    body:          []byte(strconv.itoa(response)),                })            failonerror(err, failed to publish a message)            // 确认回答完毕            d.ack(false)        }    }()    log.printf( [*] awaiting rpc requests)    <-forever}
rpc 请求方:
package mainimport (    fmt    log    math/rand    os    strconv    strings    time    github.com/streadway/amqp)/*rpc模式请求方*/func failonerror(err error, msg string) {    if err != nil {        log.fatalf(%s: %s, msg, err)        panic(fmt.sprintf(%s: %s, msg, err))    }}func randomstring(l int) string {    bytes := make([]byte, l)    for i := 0; i < l; i++ {        bytes[i] = byte(randint(65, 90))    }    return string(bytes)}func randint(min int, max int) int {    return min + rand.intn(max-min)}func fibonaccirpc(n int) (res int, err error) {    conn, err := amqp.dial(amqp://guest:guest@localhost:5672/)    failonerror(err, failed to connect to rabbitmq)    defer conn.close()    ch, err := conn.channel()    failonerror(err, failed to open a channel)    defer ch.close()    // 队列声明    q, err := ch.queuedeclare(        ,    // name        false, // durable        false, // delete when usused        true,  // exclusive 为真即连接断开就删除        false, // nowait        nil,   // arguments    )    failonerror(err, failed to declare a queue)    msgs, err := ch.consume(        q.name, // queue        ,     // consumer        true,   // auto-ack        false,  // exclusive   这个为真,服务器会认为这是该队列唯一的消费者        false,  // no-local        false,  // no-wait        nil,    // args    )    failonerror(err, failed to register a consumer)    corrid := randomstring(32)    err = ch.publish(        ,          // exchange        rpc_queue, // routing key        false,       // mandatory        false,       // immediate        amqp.publishing{            contenttype:   text/plain,            correlationid: corrid,            replyto:       q.name,            body:          []byte(strconv.itoa(n)),        })    failonerror(err, failed to publish a message)    for d := range msgs {        if corrid == d.correlationid {            res, err = strconv.atoi(string(d.body))            failonerror(err, failed to convert body to integer)            break        }    }    return}func main() {    rand.seed(time.now().utc().unixnano())    n := bodyfrom(os.args)    log.printf( [x] requesting fib(%d), n)    res, err := fibonaccirpc(n)    failonerror(err, failed to handle rpc request)    log.printf( [.] got %d, res)}func bodyfrom(args []string) int {    var s string    if (len(args) < 2) || os.args[1] ==  {        s = 30    } else {        s = strings.join(args[1:],  )    }    n, err := strconv.atoi(s)    failonerror(err, failed to convert arg to integer)    return n}
推荐:golang教程
以上就是golang操作redis&mysql&rabbitmq的方法介绍的详细内容。
其它类似信息

推荐信息