Browse Source

server: Remove server4/6 and integrate serve logic

This removes the use of dhcp/server{4,6}.Server from coredhcp (but keeps
the connection establishment from NewIpv?UDPConn); instead taking in the
serving logic.

This allows modifying the API between the socket and the handlers
easily, which we do to:
 * Use `ipv4/6.PacketConn.ReadFrom`, to obtain additional info on the
   packets, notably the incoming interface, to fix the outstanding bug
   documented in #52
 * Simplify the kludge of listening on multiple addresses
 * Move the parsing of the DHCP packet into the per-request goroutine;
   so that the routine listening on the connection only copies the bits
   before going to the next packet

As a incidental bonus, we can rip out the big kitchen-sink Server struct

Signed-off-by: Anatole Denis <anatole@unverle.fr>
Anatole Denis 6 years ago
parent
commit
cd8dc74571
5 changed files with 200 additions and 74 deletions
  1. 2 2
      cmds/coredhcp/main.go
  2. 1 0
      go.mod
  3. 2 0
      go.sum
  4. 81 37
      server/handle.go
  5. 114 35
      server/serve.go

+ 2 - 2
cmds/coredhcp/main.go

@@ -89,8 +89,8 @@ func main() {
 	}
 
 	// start server
-	srv := server.NewServer(config)
-	if err := srv.Start(); err != nil {
+	srv, err := server.Start(config)
+	if err != nil {
 		log.Fatal(err)
 	}
 	if err := srv.Wait(); err != nil {

+ 1 - 0
go.mod

@@ -18,4 +18,5 @@ require (
 	github.com/spf13/viper v1.4.0
 	github.com/u-root/u-root v6.0.0+incompatible // indirect
 	github.com/x-cray/logrus-prefixed-formatter v0.5.2 // indirect
+	golang.org/x/net v0.0.0-20190930134127-c5a3c61f89f3
 )

+ 2 - 0
go.sum

@@ -305,6 +305,8 @@ golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g=
 golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20190930134127-c5a3c61f89f3 h1:6KET3Sqa7fkVfD63QnAM81ZeYg5n4HwApOJkufONnHA=
+golang.org/x/net v0.0.0-20190930134127-c5a3c61f89f3/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=

+ 81 - 37
server/handle.go

@@ -7,48 +7,39 @@ package server
 import (
 	"fmt"
 	"net"
+	"sync"
+
+	"golang.org/x/net/ipv4"
+	"golang.org/x/net/ipv6"
 
-	"github.com/coredhcp/coredhcp/config"
-	"github.com/coredhcp/coredhcp/handler"
 	"github.com/insomniacslk/dhcp/dhcpv4"
-	"github.com/insomniacslk/dhcp/dhcpv4/server4"
 	"github.com/insomniacslk/dhcp/dhcpv6"
-	"github.com/insomniacslk/dhcp/dhcpv6/server6"
 )
 
-// Server is a CoreDHCP server structure that holds information about
-// DHCPv6 and DHCPv4 servers, and their respective handlers.
-type Server struct {
-	Handlers6 []handler.Handler6
-	Handlers4 []handler.Handler4
-	Config    *config.Config
-	Servers6  []*server6.Server
-	Servers4  []*server4.Server
-	errors    chan error
-}
-
 // BUG(Natolumin): Servers not bound to a specific interface may send responses
 // on the wrong interface as they will use the default route.
 // See https://github.com/coredhcp/coredhcp/issues/52
 
-// MainHandler6 runs for every received DHCPv6 packet. It will run every
+// HandleMsg6 runs for every received DHCPv6 packet. It will run every
 // registered handler in sequence, and reply with the resulting response.
 // It will not reply if the resulting response is `nil`.
-func (s *Server) MainHandler6(conn net.PacketConn, peer net.Addr, req dhcpv6.DHCPv6) {
-	var (
-		resp dhcpv6.DHCPv6
-		stop bool
-		err  error
-	)
+func (l *listener6) HandleMsg6(buf []byte, oob *ipv6.ControlMessage, peer *net.UDPAddr) {
+	d, err := dhcpv6.FromBytes(buf)
+	bufpool.Put(&buf)
+	if err != nil {
+		log.Printf("Error parsing DHCPv6 request: %v", err)
+		return
+	}
 
 	// decapsulate the relay message
-	msg, err := req.GetInnerMessage()
+	msg, err := d.GetInnerMessage()
 	if err != nil {
 		log.Warningf("DHCPv6: cannot get inner message: %v", err)
 		return
 	}
 
 	// Create a suitable basic response packet
+	var resp dhcpv6.DHCPv6
 	switch msg.Type() {
 	case dhcpv6.MessageTypeSolicit:
 		if msg.GetOneOption(dhcpv6.OptionRapidCommit) != nil {
@@ -62,13 +53,14 @@ func (s *Server) MainHandler6(conn net.PacketConn, peer net.Addr, req dhcpv6.DHC
 	default:
 		err = fmt.Errorf("MainHandler6: message type %d not supported", msg.Type())
 	}
-
 	if err != nil {
 		log.Printf("MainHandler6: NewReplyFromDHCPv6Message failed: %v", err)
 		return
 	}
-	for _, handler := range s.Handlers6 {
-		resp, stop = handler(req, resp)
+
+	var stop bool
+	for _, handler := range l.handlers {
+		resp, stop = handler(d, resp)
 		if stop {
 			break
 		}
@@ -79,27 +71,38 @@ func (s *Server) MainHandler6(conn net.PacketConn, peer net.Addr, req dhcpv6.DHC
 	}
 
 	// if the request was relayed, re-encapsulate the response
-	if req.IsRelay() {
-		tmp, err := dhcpv6.NewRelayReplFromRelayForw(req.(*dhcpv6.RelayMessage), resp.(*dhcpv6.Message))
-		if err != nil {
-			log.Warningf("DHCPv6: cannot create relay-repl from relay-forw: %v", err)
-			return
+	if d.IsRelay() {
+		if rmsg, ok := resp.(*dhcpv6.Message); !ok {
+			log.Warningf("DHCPv6: response is a relayed message, not reencapsulating")
+		} else {
+			tmp, err := dhcpv6.NewRelayReplFromRelayForw(d.(*dhcpv6.RelayMessage), rmsg)
+			if err != nil {
+				log.Warningf("DHCPv6: cannot create relay-repl from relay-forw: %v", err)
+				return
+			}
+			resp = tmp
 		}
-		resp = tmp
 	}
 
-	if _, err := conn.WriteTo(resp.ToBytes(), peer); err != nil {
+	if _, err := l.WriteTo(resp.ToBytes(), nil, peer); err != nil {
 		log.Printf("MainHandler6: conn.Write to %v failed: %v", peer, err)
 	}
 }
 
-// MainHandler4 is like MainHandler6, but for DHCPv4 packets.
-func (s *Server) MainHandler4(conn net.PacketConn, _peer net.Addr, req *dhcpv4.DHCPv4) {
+func (l *listener4) HandleMsg4(buf []byte, oob *ipv4.ControlMessage, _peer net.Addr) {
 	var (
 		resp, tmp *dhcpv4.DHCPv4
 		err       error
 		stop      bool
 	)
+
+	req, err := dhcpv4.FromBytes(buf)
+	bufpool.Put(&buf)
+	if err != nil {
+		log.Printf("Error parsing DHCPv6 request: %v", err)
+		return
+	}
+
 	if req.OpCode != dhcpv4.OpcodeBootRequest {
 		log.Printf("MainHandler4: unsupported opcode %d. Only BootRequest (%d) is supported", req.OpCode, dhcpv4.OpcodeBootRequest)
 		return
@@ -120,7 +123,7 @@ func (s *Server) MainHandler4(conn net.PacketConn, _peer net.Addr, req *dhcpv4.D
 	}
 
 	resp = tmp
-	for _, handler := range s.Handlers4 {
+	for _, handler := range l.handlers {
 		resp, stop = handler(req, resp)
 		if stop {
 			break
@@ -149,7 +152,7 @@ func (s *Server) MainHandler4(conn net.PacketConn, _peer net.Addr, req *dhcpv4.D
 			peer = &net.UDPAddr{IP: net.IPv4bcast, Port: dhcpv4.ClientPort}
 		}
 
-		if _, err := conn.WriteTo(resp.ToBytes(), peer); err != nil {
+		if _, err := l.WriteTo(resp.ToBytes(), nil, peer); err != nil {
 			log.Printf("MainHandler4: conn.Write to %v failed: %v", peer, err)
 		}
 
@@ -157,3 +160,44 @@ func (s *Server) MainHandler4(conn net.PacketConn, _peer net.Addr, req *dhcpv4.D
 		log.Print("MainHandler4: dropping request because response is nil")
 	}
 }
+
+// XXX: performance-wise, Pool may or may not be good (see https://github.com/golang/go/issues/23199)
+// Interface is good for what we want. Maybe "just" trust the GC and we'll be fine ?
+var bufpool = sync.Pool{New: func() interface{} { r := make([]byte, MaxDatagram); return &r }}
+
+// MaxDatagram is the maximum length of message that can be received.
+const MaxDatagram = 1 << 16
+
+// XXX: investigate using RecvMsgs to batch messages and reduce syscalls
+
+// Serve6 handles datagrams received on conn and passes them to the pluginchain
+func (l *listener6) Serve() error {
+	log.Printf("Listen %s", l.LocalAddr())
+	for {
+		b := *bufpool.Get().(*[]byte)
+		b = b[:MaxDatagram] //Reslice to max capacity in case the buffer in pool was resliced smaller
+
+		n, oob, peer, err := l.ReadFrom(b)
+		if err != nil {
+			log.Printf("Error reading from connection: %v", err)
+			return err
+		}
+		go l.HandleMsg6(b[:n], oob, peer.(*net.UDPAddr))
+	}
+}
+
+// Serve6 handles datagrams received on conn and passes them to the pluginchain
+func (l *listener4) Serve() error {
+	log.Printf("Listen %s", l.LocalAddr())
+	for {
+		b := *bufpool.Get().(*[]byte)
+		b = b[:MaxDatagram] //Reslice to max capacity in case the buffer in pool was resliced smaller
+
+		n, oob, peer, err := l.ReadFrom(b)
+		if err != nil {
+			log.Printf("Error reading from connection: %v", err)
+			return err
+		}
+		go l.HandleMsg4(b[:n], oob, peer.(*net.UDPAddr))
+	}
+}

+ 114 - 35
server/serve.go

@@ -5,7 +5,15 @@
 package server
 
 import (
+	"fmt"
+	"io"
+	"net"
+
+	"golang.org/x/net/ipv4"
+	"golang.org/x/net/ipv6"
+
 	"github.com/coredhcp/coredhcp/config"
+	"github.com/coredhcp/coredhcp/handler"
 	"github.com/coredhcp/coredhcp/logger"
 	"github.com/coredhcp/coredhcp/plugins"
 	"github.com/insomniacslk/dhcp/dhcpv4/server4"
@@ -14,68 +22,139 @@ import (
 
 var log = logger.GetLogger("server")
 
-// Start will start the server asynchronously. See `Wait` to wait until
-// the execution ends.
-func (s *Server) Start() error {
+type listener6 struct {
+	*ipv6.PacketConn
+	handlers []handler.Handler6
+}
+
+type listener4 struct {
+	*ipv4.PacketConn
+	handlers []handler.Handler4
+}
+
+type listener interface {
+	io.Closer
+}
+
+// Servers contains state for a running server (with possibly multiple interfaces/listeners)
+type Servers struct {
+	listeners []listener
+	errors    chan error
+}
+
+func listen4(a *net.UDPAddr) (*listener4, error) {
 	var err error
+	l4 := listener4{}
+	udpConn, err := server4.NewIPv4UDPConn(a.Zone, a)
+	if err != nil {
+		return nil, err
+	}
+	l4.PacketConn = ipv4.NewPacketConn(udpConn)
+	var ifi *net.Interface
+	if a.Zone != "" {
+		ifi, err = net.InterfaceByName(a.Zone)
+		if err != nil {
+			return nil, fmt.Errorf("DHCPv4: Listen could not find interface %s: %v", a.Zone, err)
+		}
+	}
+
+	if a.IP.IsMulticast() {
+		err = l4.JoinGroup(ifi, a)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return &l4, nil
+}
 
-	s.Handlers4, s.Handlers6, err = plugins.LoadPlugins(s.Config)
+func listen6(a *net.UDPAddr) (*listener6, error) {
+	l6 := listener6{}
+	udpconn, err := server6.NewIPv6UDPConn(a.Zone, a)
 	if err != nil {
-		return err
+		return nil, err
+	}
+	l6.PacketConn = ipv6.NewPacketConn(udpconn)
+	var ifi *net.Interface
+	if a.Zone != "" {
+		ifi, err = net.InterfaceByName(a.Zone)
+		if err != nil {
+			return nil, fmt.Errorf("DHCPv4: Listen could not find interface %s: %v", a.Zone, err)
+		}
+	}
+
+	if a.IP.IsMulticast() {
+		err = l6.JoinGroup(ifi, a)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return &l6, nil
+}
+
+// Start will start the server asynchronously. See `Wait` to wait until
+// the execution ends.
+func Start(config *config.Config) (*Servers, error) {
+	handlers4, handlers6, err := plugins.LoadPlugins(config)
+	if err != nil {
+		return nil, err
+	}
+	srv := Servers{
+		errors: make(chan error),
 	}
 
 	// listen
-	if s.Config.Server6 != nil {
+	if config.Server6 != nil {
 		log.Println("Starting DHCPv6 server")
-		for _, l := range s.Config.Server6.Addresses {
-			s6, err := server6.NewServer(l.Zone, l, s.MainHandler6)
+		for _, addr := range config.Server6.Addresses {
+			var l6 *listener6
+			l6, err = listen6(addr)
 			if err != nil {
-				return err
+				goto cleanup
 			}
-			s.Servers6 = append(s.Servers6, s6)
-			log.Infof("Listen %s", l)
+			l6.handlers = handlers6
+			srv.listeners = append(srv.listeners, l6)
 			go func() {
-				s.errors <- s6.Serve()
+				srv.errors <- l6.Serve()
 			}()
 		}
 	}
 
-	if s.Config.Server4 != nil {
+	if config.Server4 != nil {
 		log.Println("Starting DHCPv4 server")
-		for _, l := range s.Config.Server4.Addresses {
-			s4, err := server4.NewServer(l.Zone, l, s.MainHandler4)
+		for _, addr := range config.Server4.Addresses {
+			var l4 *listener4
+			l4, err = listen4(addr)
 			if err != nil {
-				return err
+				goto cleanup
 			}
-			s.Servers4 = append(s.Servers4, s4)
-			log.Infof("Listen %s", l)
+			l4.handlers = handlers4
+			srv.listeners = append(srv.listeners, l4)
 			go func() {
-				s.errors <- s4.Serve()
+				srv.errors <- l4.Serve()
 			}()
 		}
 	}
 
-	return nil
+	return &srv, nil
+
+cleanup:
+	srv.Close()
+	return nil, err
 }
 
 // Wait waits until the end of the execution of the server.
-func (s *Server) Wait() error {
-	log.Print("Waiting")
+func (s *Servers) Wait() error {
+	log.Debug("Waiting")
 	err := <-s.errors
-	for _, s6 := range s.Servers6 {
-		if s6 != nil {
-			s6.Close()
-		}
-	}
-	for _, s4 := range s.Servers4 {
-		if s4 != nil {
-			s4.Close()
-		}
-	}
+	s.Close()
 	return err
 }
 
-// NewServer creates a Server instance with the provided configuration.
-func NewServer(config *config.Config) *Server {
-	return &Server{Config: config, errors: make(chan error, 1)}
+// Close closes all listening connections
+func (s *Servers) Close() {
+	for _, srv := range s.listeners {
+		if srv != nil {
+			srv.Close()
+		}
+	}
 }