golang操作redis&mysql&rabbitmq:
redis
安装导入
go get github.com/garyburd/redigo/redis
import "github.com/garyburd/redigo/redis"
使用
连接
import github.com/garyburd/redigo/redisfunc main() { c, err := redis.dial(tcp, localhost:6379) if err != nil { fmt.println(conn redis failed, err:, err) return } defer c.close()}
set & get
// Write a string key, then read it back. c and err come from the connection
// snippet that precedes this fragment.
_, err = c.Do("SET", "name", "nick")
if err != nil {
	fmt.Println(err)
	return
}

// redis.String converts the reply of GET into a Go string.
r, err := redis.String(c.Do("GET", "name"))
if err != nil {
	fmt.Println(err)
	return
}
fmt.Println(r)
mset & mget
批量设置
// Set several keys in one round trip, then fetch them in one round trip.
_, err = c.Do("MSET", "name", "nick", "age", 18)
if err != nil {
	fmt.Println("mset error: ", err)
	return
}

// redis.Strings converts the array reply of MGET into a []string.
r2, err := redis.Strings(c.Do("MGET", "name", "age"))
if err != nil {
	fmt.Println("mget error: ", err)
	return
}
fmt.Println(r2)
hset & hget
hash操作
// Store one field in the hash "names", then read that field back.
_, err = c.Do("HSET", "names", "nick", "suoning")
if err != nil {
	fmt.Println("hset error: ", err)
	return
}

// r was declared by the earlier GET snippet, so plain assignment is used here.
r, err = redis.String(c.Do("HGET", "names", "nick"))
if err != nil {
	fmt.Println("hget error: ", err)
	return
}
fmt.Println(r)
expire
设置过期时间
// Let the key "names" expire after 5 seconds.
_, err = c.Do("EXPIRE", "names", 5)
if err != nil {
	fmt.Println("expire error: ", err)
	return
}
lpush & lpop & llen
队列
// 队列 _, err = c.do(lpush, queue, nick, dawn, 9) if err != nil { fmt.println(lpush error: , err) return } for { r, err = redis.string(c.do(lpop, queue)) if err != nil { fmt.println(lpop error: , err) break } fmt.println(r) } r3, err := redis.int(c.do(llen, queue)) if err != nil { fmt.println(llen error: , err) return }
连接池
各参数的解释如下:
maxidle:最大的空闲连接数,表示即使没有redis连接时依然可以保持n个空闲的连接,而不被清除,随时处于待命状态。
maxactive:最大的激活连接数,表示同时最多有n个连接
idletimeout:最大的空闲连接等待时间,超过此时间后,空闲连接将被关闭
// Connection pool configuration. IdleTimeout is a time.Duration, so a bare
// 300 would mean 300 nanoseconds and idle connections would be closed almost
// immediately; 300 seconds is presumably what was intended. Requires the
// standard "time" package to be imported.
pool := &redis.Pool{
	MaxIdle:     16,
	MaxActive:   1024,
	IdleTimeout: 300 * time.Second,
	Dial: func() (redis.Conn, error) {
		return redis.Dial("tcp", "localhost:6379")
	},
}
连接池例子:
package mainimport ( fmt github.com/garyburd/redigo/redis)var pool *redis.poolfunc init() { pool = &redis.pool{ maxidle: 16, maxactive: 1024, idletimeout: 300, dial: func() (redis.conn, error) { return redis.dial(tcp, localhost:6379) }, }}func main() { c := pool.get() defer c.close() _, err := c.do(set, name, nick) if err != nil { fmt.println(err) return } r, err := redis.string(c.do(get, name)) if err != nil { fmt.println(err) return } fmt.println(r)}
管道操作
请求/响应服务可以实现持续处理新请求:客户端可以发送多个命令到服务器而无需等待响应,最后再一次性读取多个响应。
使用send(),flush(),receive()方法支持管道化操作
send向连接的输出缓冲中写入命令。
flush将连接的输出缓冲清空并写入服务器端。
Receive按照FIFO顺序依次读取服务器的响应。
func main() { c, err := redis.dial(tcp, localhost:6379) if err != nil { fmt.println(conn redis failed, err:, err) return } defer c.close() c.send(set, name1, sss1) c.send(set, name2, sss2) c.flush() v, err := c.receive() fmt.printf(v:%v,err:%v\n, v, err) v, err = c.receive() fmt.printf(v:%v,err:%v\n, v, err) v, err = c.receive() // 夯住,一直等待 fmt.printf(v:%v,err:%v\n, v, err)}
mysql
安装导入
go get github.com/go-sql-driver/mysql
go get github.com/jmoiron/sqlx
import (
	_ "github.com/go-sql-driver/mysql"
	"github.com/jmoiron/sqlx"
)
连接
import ( _ github.com/go-sql-driver/mysql github.com/jmoiron/sqlx)var db *sqlx.dbfunc init() { database, err := sqlx.open(mysql, root:@tcp(127.0.0.1:3306)/test) if err != nil { fmt.println(open mysql failed,, err) return } db = database}
建表
-- Table used by the insert/update/select/delete examples.
CREATE TABLE `person` (
    `user_id`  INT(128)     DEFAULT NULL,
    `username` VARCHAR(255) DEFAULT NULL,
    `sex`      VARCHAR(16)  DEFAULT NULL,
    `email`    VARCHAR(128) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8
(insert)
package mainimport ( fmt _ github.com/go-sql-driver/mysql github.com/jmoiron/sqlx)type person struct { userid int `db:user_id` username string `db:username` sex string `db:sex` email string `db:email`}var db *sqlx.dbfunc init() { database, err := sqlx.open(mysql, root:@tcp(127.0.0.1:3306)/test) if err != nil { fmt.println(open mysql failed,, err) return } db = database}func main() { r, err := db.exec(insert into person(username, sex, email)values(?, ?, ?), suoning, man, suoning@net263.com) if err != nil { fmt.println(exec failed, , err) return } id, err := r.lastinsertid() if err != nil { fmt.println(exec failed, , err) return } fmt.println(insert succ:, id)}
(update)
package mainimport ( fmt _ github.com/go-sql-driver/mysql github.com/jmoiron/sqlx)type person struct { userid int `db:user_id` username string `db:username` sex string `db:sex` email string `db:email`}var db *sqlx.dbfunc init() { database, err := sqlx.open(mysql, root:@tcp(127.0.0.1:3306)/test) if err != nil { fmt.println(open mysql failed,, err) return } db = database}func main() { _, err := db.exec(update person set user_id=? where username=?, 20170808, suoning) if err != nil { fmt.println(exec failed, , err) return }}
(select)
package mainimport ( fmt _ github.com/go-sql-driver/mysql github.com/jmoiron/sqlx)type person struct { userid int `db:user_id` username string `db:username` sex string `db:sex` email string `db:email`}type place struct { country string `db:country` city string `db:city` telcode int `db:telcode`}var db *sqlx.dbfunc init() { database, err := sqlx.open(mysql, root:@tcp(127.0.0.1:3306)/test) if err != nil { fmt.println(open mysql failed,, err) return } db = database}func main() { var person []person err := db.select(&person, select user_id, username, sex, email from person where user_id=?, 1) if err != nil { fmt.println(exec failed, , err) return } fmt.println(select succ:, person) people := []person{} db.select(&people, select * from person order by user_id asc) fmt.println(people) jason, john := people[0], people[1] fmt.printf(%#v\n%#v, jason, john)}
(delete)
package mainimport ( fmt _ github.com/go-sql-driver/mysql github.com/jmoiron/sqlx)type person struct { userid int `db:user_id` username string `db:username` sex string `db:sex` email string `db:email`}var db *sqlx.dbfunc init() { database, err := sqlx.open(mysql, root:@tcp(127.0.0.1:3306)/test) if err != nil { fmt.println(open mysql failed,, err) return } db = database}func main() { _, err := db.exec(delete from person where username=? limit 1, suoning) if err != nil { fmt.println(exec failed, , err) return } fmt.println(delete succ)}
事务
package mainimport ( github.com/astaxie/beego/logs _ github.com/go-sql-driver/mysql github.com/jmoiron/sqlx)var db *sqlx.dbfunc init() { database, err := sqlx.open(mysql, root:@tcp(127.0.0.1:3306)/test) if err != nil { logs.error(open mysql failed,, err) return } db = database}func main() { conn, err := db.begin() if err != nil { logs.warn(db.begin failed, err:%v, err) return } defer func() { if err != nil { conn.rollback() return } conn.commit() }() // do something}
rabbitmq
安装
go get github.com/streadway/amqp
普通模式
生产者:
package mainimport ( fmt log os strings github.com/streadway/amqp time)/*默认点对点模式*/func failonerror(err error, msg string) { if err != nil { log.fatalf(%s: %s, msg, err) panic(fmt.sprintf(%s: %s, msg, err)) }}func main() { // 连接 conn, err := amqp.dial(amqp://guest:guest@localhost:5672/) failonerror(err, failed to connect to rabbitmq) defer conn.close() // 打开一个并发服务器通道来处理消息 ch, err := conn.channel() failonerror(err, failed to open a channel) defer ch.close() // 申明一个队列 q, err := ch.queuedeclare( task_queue, // name true, // durable 持久性的,如果事前已经声明了该队列,不能重复声明 false, // delete when unused false, // exclusive 如果是真,连接一断开,队列删除 false, // no-wait nil, // arguments ) failonerror(err, failed to declare a queue) body := bodyfrom(os.args) // 发布 err = ch.publish( , // exchange 默认模式,exchange为空 q.name, // routing key 默认模式路由到同名队列,即是task_queue false, // mandatory false, amqp.publishing{ // 持久性的发布,因为队列被声明为持久的,发布消息必须加上这个(可能不用),但消息还是可能会丢,如消息到缓存但mq挂了来不及持久化。 deliverymode: amqp.persistent, contenttype: text/plain, body: []byte(body), }) failonerror(err, failed to publish a message) log.printf( [x] sent %s, body)}func bodyfrom(args []string) string { var s string if (len(args) < 2) || os.args[1] == { s = fmt.sprintf(%s-%v,hello, time.now()) } else { s = strings.join(args[1:], ) } return s}
消费者:
package mainimport ( bytes fmt github.com/streadway/amqp log time)/*默认点对点模式工作方,多个,拿发布方的消息*/func failonerror(err error, msg string) { if err != nil { log.fatalf(%s: %s, msg, err) panic(fmt.sprintf(%s: %s, msg, err)) }}func main() { conn, err := amqp.dial(amqp://guest:guest@localhost:5672/) failonerror(err, failed to connect to rabbitmq) defer conn.close() ch, err := conn.channel() failonerror(err, failed to open a channel) defer ch.close() // 指定队列! q, err := ch.queuedeclare( task_queue, // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failonerror(err, failed to declare a queue) // fair dispatch 预取,每个工作方每次拿一个消息,确认后才拿下一次,缓解压力 err = ch.qos( 1, // prefetch count 0, // prefetch size false, // global ) failonerror(err, failed to set qos) // 消费根据队列名 msgs, err := ch.consume( q.name, // queue , // consumer false, // auto-ack 设置为真自动确认消息 false, // exclusive false, // no-local false, // no-wait nil, // args ) failonerror(err, failed to register a consumer) forever := make(chan bool) go func() { for d := range msgs { log.printf(received a message: %s, d.body) dot_count := bytes.count(d.body, []byte(.)) t := time.duration(dot_count) time.sleep(t * time.second) log.printf(done) // 确认消息被收到!!如果为真的,那么同在一个channel,在该消息之前未确认的消息都会确认,适合批量处理 // 真时场景:每十条消息确认一次,类似 d.ack(false) } }() log.printf( [*] waiting for messages. to exit press ctrl+c) <-forever}
订阅模式
订阅 生产者:
package mainimport ( fmt github.com/streadway/amqp log os strings time)/*广播模式发布方*/func failonerror(err error, msg string) { if err != nil { log.fatalf(%s: %s, msg, err) panic(fmt.sprintf(%s: %s, msg, err)) }}func main() { conn, err := amqp.dial(amqp://guest:guest@localhost:5672/) failonerror(err, failed to connect to rabbitmq) defer conn.close() ch, err := conn.channel() failonerror(err, failed to open a channel) defer ch.close() // 默认模式有默认交换机,广播自己定义一个交换机,交换机可与队列进行绑定 err = ch.exchangedeclare( logs, // name fanout, // type 广播模式 true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failonerror(err, failed to declare an exchange) body := bodyfrom(os.args) // 发布 err = ch.publish( logs, // exchange 消息发送到交换机,这个时候没队列绑定交换机,消息会丢弃 , // routing key 广播模式不需要这个,它会把所有消息路由到绑定的所有队列 false, // mandatory false, // immediate amqp.publishing{ contenttype: text/plain, body: []byte(body), }) failonerror(err, failed to publish a message) log.printf( [x] sent %s, body)}func bodyfrom(args []string) string { var s string if (len(args) < 2) || os.args[1] == { s = fmt.sprintf(%s-%v,hello, time.now()) } else { s = strings.join(args[1:], ) } return s}
订阅 消费者:
package mainimport ( fmt github.com/streadway/amqp log)/*广播模式订阅方*/func failonerror(err error, msg string) { if err != nil { log.fatalf(%s: %s, msg, err) panic(fmt.sprintf(%s: %s, msg, err)) }}func main() { conn, err := amqp.dial(amqp://guest:guest@localhost:5672/) failonerror(err, failed to connect to rabbitmq) defer conn.close() ch, err := conn.channel() failonerror(err, failed to open a channel) defer ch.close() // 同样要申明交换机 err = ch.exchangedeclare( logs, // name fanout, // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failonerror(err, failed to declare an exchange) // 新建队列,这个队列没名字,随机生成一个名字 q, err := ch.queuedeclare( , // name false, // durable false, // delete when usused true, // exclusive 表示连接一断开,这个队列自动删除 false, // no-wait nil, // arguments ) failonerror(err, failed to declare a queue) // 队列和交换机绑定,即是队列订阅了发到这个交换机的消息 err = ch.queuebind( q.name, // queue name 队列的名字 , // routing key 广播模式不需要这个 logs, // exchange 交换机名字 false, nil) failonerror(err, failed to bind a queue) // 开始消费消息,可开多个订阅方,因为队列是临时生成的,所有每个订阅方都能收到同样的消息 msgs, err := ch.consume( q.name, // queue 队列名字 , // consumer true, // auto-ack 自动确认 false, // exclusive false, // no-local false, // no-wait nil, // args ) failonerror(err, failed to register a consumer) forever := make(chan bool) go func() { for d := range msgs { log.printf( [x] %s, d.body) } }() log.printf( [*] waiting for logs. to exit press ctrl+c) <-forever}
rpc模式
rpc 应答方:
package mainimport ( fmt log strconv github.com/streadway/amqp)/*rpc模式应答方*/func failonerror(err error, msg string) { if err != nil { log.fatalf(%s: %s, msg, err) panic(fmt.sprintf(%s: %s, msg, err)) }}func fib(n int) int { if n == 0 { return 0 } else if n == 1 { return 1 } else { return fib(n-1) + fib(n-2) }}func main() { conn, err := amqp.dial(amqp://guest:guest@localhost:5672/) failonerror(err, failed to connect to rabbitmq) defer conn.close() ch, err := conn.channel() failonerror(err, failed to open a channel) defer ch.close() q, err := ch.queuedeclare( rpc_queue, // name false, // durable false, // delete when usused false, // exclusive false, // no-wait nil, // arguments ) failonerror(err, failed to declare a queue) // 公平分发 没有这个则round-robbin err = ch.qos( 1, // prefetch count 0, // prefetch size false, // global ) failonerror(err, failed to set qos) // 消费,等待请求 msgs, err := ch.consume( q.name, // queue , // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failonerror(err, failed to register a consumer) forever := make(chan bool) go func() { //请求来了 for d := range msgs { n, err := strconv.atoi(string(d.body)) failonerror(err, failed to convert body to integer) log.printf( [.] fib(%d), n) // 计算 response := fib(n) // 回答 err = ch.publish( , // exchange d.replyto, // routing key false, // mandatory false, // immediate amqp.publishing{ contenttype: text/plain, correlationid: d.correlationid, //序列号 body: []byte(strconv.itoa(response)), }) failonerror(err, failed to publish a message) // 确认回答完毕 d.ack(false) } }() log.printf( [*] awaiting rpc requests) <-forever}
rpc 请求方:
package mainimport ( fmt log math/rand os strconv strings time github.com/streadway/amqp)/*rpc模式请求方*/func failonerror(err error, msg string) { if err != nil { log.fatalf(%s: %s, msg, err) panic(fmt.sprintf(%s: %s, msg, err)) }}func randomstring(l int) string { bytes := make([]byte, l) for i := 0; i < l; i++ { bytes[i] = byte(randint(65, 90)) } return string(bytes)}func randint(min int, max int) int { return min + rand.intn(max-min)}func fibonaccirpc(n int) (res int, err error) { conn, err := amqp.dial(amqp://guest:guest@localhost:5672/) failonerror(err, failed to connect to rabbitmq) defer conn.close() ch, err := conn.channel() failonerror(err, failed to open a channel) defer ch.close() // 队列声明 q, err := ch.queuedeclare( , // name false, // durable false, // delete when usused true, // exclusive 为真即连接断开就删除 false, // nowait nil, // arguments ) failonerror(err, failed to declare a queue) msgs, err := ch.consume( q.name, // queue , // consumer true, // auto-ack false, // exclusive 这个为真,服务器会认为这是该队列唯一的消费者 false, // no-local false, // no-wait nil, // args ) failonerror(err, failed to register a consumer) corrid := randomstring(32) err = ch.publish( , // exchange rpc_queue, // routing key false, // mandatory false, // immediate amqp.publishing{ contenttype: text/plain, correlationid: corrid, replyto: q.name, body: []byte(strconv.itoa(n)), }) failonerror(err, failed to publish a message) for d := range msgs { if corrid == d.correlationid { res, err = strconv.atoi(string(d.body)) failonerror(err, failed to convert body to integer) break } } return}func main() { rand.seed(time.now().utc().unixnano()) n := bodyfrom(os.args) log.printf( [x] requesting fib(%d), n) res, err := fibonaccirpc(n) failonerror(err, failed to handle rpc request) log.printf( [.] 
got %d, res)}func bodyfrom(args []string) int { var s string if (len(args) < 2) || os.args[1] == { s = 30 } else { s = strings.join(args[1:], ) } n, err := strconv.atoi(s) failonerror(err, failed to convert arg to integer) return n}
推荐:golang教程
以上就是golang操作redis&mysql&rabbitmq的方法介绍的详细内容。